TRK_CollapseDwells takes a track, a distance threshold, and a duration threshold and returns a track. The result collapses each dwell detected using the distance and duration thresholds, preserving the start and end points of the dwell and removing the remaining dwell points from the input track.
The distance threshold is the maximum distance between point observations in a track for them to be considered a single dwell. The duration threshold is the minimum duration that those observations must span to be considered a single dwell.
A dwell is a sequence of observations within a track that shows little or no movement over a certain period of time.
Tracks are linestrings that represent the change in an entity's location over time. Each vertex in the linestring has a timestamp (stored as the M-value) and the vertices are ordered sequentially.
For more information on using tracks in GeoAnalytics for Microsoft Fabric, see the core concept topic on tracks.
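To make the definitions above concrete, here is a minimal pure-Python sketch of dwell collapsing that does not use the GeoAnalytics API: a run of consecutive observations that stay within the distance threshold for at least the duration threshold is reduced to its first and last observation. The grouping rule used here (measuring distance from the run's first point) is an illustrative assumption; the library's actual dwell detection may differ in detail.

from datetime import datetime, timedelta
from math import hypot

def collapse_dwells_sketch(points, max_dist, min_duration):
    """points: list of (x, y, datetime) tuples ordered by time.
    Collapses each dwell -- a run of consecutive observations that stay within
    max_dist of the run's first point for at least min_duration -- down to the
    run's first and last observation. Illustrative only; the library's own
    detection rule may differ."""
    out, i, n = [], 0, len(points)
    while i < n:
        j = i
        # Grow the candidate dwell while observations stay near points[i]
        while j + 1 < n and hypot(points[j + 1][0] - points[i][0],
                                  points[j + 1][1] - points[i][1]) <= max_dist:
            j += 1
        if j > i and points[j][2] - points[i][2] >= min_duration:
            out += [points[i], points[j]]  # keep only the dwell's start and end
            i = j + 1
        else:
            out.append(points[i])          # not a dwell; keep the point as-is
            i += 1
    return out

# Example: a 3-unit distance threshold and a 1-minute duration threshold
track = [(0, 0, datetime(2021, 10, 5, 10, 30)),
         (0.5, 0, datetime(2021, 10, 5, 10, 31)),
         (1, 0.5, datetime(2021, 10, 5, 10, 32)),
         (9, 0, datetime(2021, 10, 5, 10, 33))]
print(collapse_dwells_sketch(track, 3, timedelta(minutes=1)))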
Function | Syntax |
---|---|
Python | collapse_dwells(track, distance_threshold, duration_threshold) |
SQL | TRK_CollapseDwells(track, distance_threshold, duration_threshold) |
Scala | collapseDwells(track, distanceThreshold, durationThreshold) |
For more details, go to the GeoAnalytics for Microsoft Fabric API reference for collapse_dwells.
Python and SQL Examples
from geoanalytics_fabric.tracks import functions as TRK
from geoanalytics_fabric.sql import functions as ST
from pyspark.sql import functions as F
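# Point observations for a single entity (id 1), one observation per minute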
data = [(1, "POINT (0 0)", "2021-10-05 10:30:00"),
(1, "POINT (5 0)", "2021-10-05 10:31:00"),
(1, "POINT (10 0)", "2021-10-05 10:32:00"),
(1, "POINT (11.2 1.1)", "2021-10-05 10:33:00"),
(1, "POINT (12.2 -0.2)", "2021-10-05 10:34:00"),
(1, "POINT (11.2 -1.2)", "2021-10-05 10:35:00"),
(1, "POINT (10.5 -0.9)", "2021-10-05 10:36:00"),
(1, "POINT (10.6 0.5)", "2021-10-05 10:37:00"),
(1, "POINT (11.5 0)", "2021-10-05 10:38:00"),
(1, "POINT (11.6 -0.5)", "2021-10-05 10:39:00"),
(1, "POINT (10.5 -0.3)", "2021-10-05 10:40:00"),
(1, "POINT (12 0)", "2021-10-05 10:41:00"),
(1, "POINT (12.5 0)", "2021-10-05 10:42:00"),
(1, "POINT (18 0)", "2021-10-05 10:43:00"),
(1, "POINT (20 0)", "2021-10-05 10:44:00")]
df = spark.createDataFrame(data, ["id", "wkt", "datetime_str"]) \
.withColumn("point", ST.point_from_text("wkt", 3857)) \
.withColumn("timestamp", F.to_timestamp("datetime_str")) \
.drop("wkt", "datetime_str")
df_trk = df.groupBy("id").agg(TRK.aggr_create_track("point", "timestamp").alias("track"))
trk_collapse_dwells = df_trk.select(TRK.collapse_dwells("track", (3, 'Meters'), (1, 'Minutes')).alias("collapse_dwells"))
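# Plot the input track and the collapsed result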
axes = df_trk.st.plot("track", edgecolor="lightgrey", linewidths=10, figsize=(15, 8))
trk_collapse_dwells.select("collapse_dwells").st.plot(ax=axes, edgecolor="greenyellow", linewidths=3)
axes.legend(['Input track','Result track'], loc='lower right', fontsize='large', bbox_to_anchor=(1, -0.1))
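The same operation can be run in SQL with TRK_CollapseDwells. The snippet below is a minimal sketch that assumes the TRK_ SQL functions are registered in the session and that the thresholds can be passed as structs with value and units fields, mirroring the Scala example that follows; see the API reference linked above for the exact SQL signature.

df_trk.createOrReplaceTempView("tracks")

spark.sql("""
    SELECT TRK_CollapseDwells(track,
                              named_struct('value', 3, 'units', 'Meters'),
                              named_struct('value', 1, 'units', 'Minutes')) AS collapse_dwells
    FROM tracks
""").show(truncate=False)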

Scala Example
import com.esri.geoanalytics.sql.{functions => ST}
import com.esri.geoanalytics.sql.{trackFunctions => TRK}
import org.apache.spark.sql.{functions => F}
import java.sql.Timestamp
import spark.implicits._ // enables the $"column" syntax used below
case class pointRow(id: Int, pointWkt: String, timestamp: Timestamp)
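// Point observations for a single entity (id 1), one observation per minute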
val data = Seq(pointRow(1, "POINT (0 0)", Timestamp.valueOf("2021-10-05 10:30:00")),
pointRow(1, "POINT (5 0)", Timestamp.valueOf("2021-10-05 10:31:00")),
pointRow(1, "POINT (10 0)", Timestamp.valueOf("2021-10-05 10:32:00")),
pointRow(1, "POINT (11.2 1.1)", Timestamp.valueOf("2021-10-05 10:33:00")),
pointRow(1, "POINT (12.2 -0.2)", Timestamp.valueOf("2021-10-05 10:34:00")),
pointRow(1, "POINT (11.2 -1.2)", Timestamp.valueOf("2021-10-05 10:35:00")),
pointRow(1, "POINT (10.5 -0.9)", Timestamp.valueOf("2021-10-05 10:36:00")),
pointRow(1, "POINT (10.6 0.5)", Timestamp.valueOf("2021-10-05 10:37:00")),
pointRow(1, "POINT (11.5 0)", Timestamp.valueOf("2021-10-05 10:38:00")),
pointRow(1, "POINT (11.6 -0.5)", Timestamp.valueOf("2021-10-05 10:39:00")),
pointRow(1, "POINT (10.5 -0.3)", Timestamp.valueOf("2021-10-05 10:40:00")),
pointRow(1, "POINT (12 0)", Timestamp.valueOf("2021-10-05 10:41:00")),
pointRow(1, "POINT (12.5 0)", Timestamp.valueOf("2021-10-05 10:42:00")),
pointRow(1, "POINT (18 0)", Timestamp.valueOf("2021-10-05 10:43:00")),
pointRow(1, "POINT (20 0)", Timestamp.valueOf("2021-10-05 10:44:00")))
val points = spark.createDataFrame(data)
.withColumn("point", ST.pointFromText($"pointWkt", F.lit(3857)))
val tracks = points.groupBy("id").agg(TRK.aggrCreateTrack($"point", $"timestamp").alias("track"))
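// Collapse dwells; the thresholds are passed as structs with "value" and "units" fields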
val collapsedTracks = tracks.select(TRK.collapseDwells($"track",
  F.struct(F.lit(3).as("value"), F.lit("Meters").as("units")),
  F.struct(F.lit(1).as("value"), F.lit("Minutes").as("units"))).alias("collapse_dwells"))
collapsedTracks.show(truncate = false)
+-----------------------------------------------------------------------------------------------------------------------------------------------+
|collapse_dwells |
+-----------------------------------------------------------------------------------------------------------------------------------------------+
|{"hasM":true,"paths":[[[0,0,1.633455e9],[5,0,1.63345506e9],[10,0,1.63345512e9],[12.5,0,1.63345572e9],[18,0,1.63345578e9],[20,0,1.63345584e9]]]}|
+-----------------------------------------------------------------------------------------------------------------------------------------------+
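In the result, the observations from 10:32 through 10:42 stay within the 3-meter distance threshold for longer than the 1-minute duration threshold, so that dwell is collapsed to its start and end points, (10 0) and (12.5 0), while the vertices outside the dwell are kept unchanged.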
Version table
Release | Notes |
---|---|
1.0.0-beta | Python, SQL, and Scala functions introduced |