GeoAnalytics Engine uses Spark DataFrames along with custom geometry data types to represent spatial data. A Spark DataFrame is like a Pandas DataFrame or a table in a relational database, but is optimized for distributed queries.
GeoAnalytics Engine comes with several DataFrame extensions for reading from spatial data sources like shapefiles and feature services, in addition to any data source that Spark supports. When reading from a feature service, file geodatabase (FileGDB), GeoJSON, GeoParquet, or shapefile, a geometry column is created automatically. For other data sources, a geometry column can be created from text or binary columns using GeoAnalytics Engine functions, as in the sketch below.
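For example, a shapefile can be loaded directly with the spatial data source reader, and a geometry column can be derived from a well-known text (WKT) column using an ST function. The sketch below is illustrative only: the paths are placeholders, and the "shapefile" format name and the ST.geom_from_text function shown here are assumptions to verify against the API reference.

# Illustrative sketch -- paths, the "shapefile" format name, and ST.geom_from_text are assumptions
from geoanalytics.sql import functions as ST

# Spatial sources such as shapefiles get a geometry column automatically
myShapefileDF = spark.read.format("shapefile").load(r"c:\MyData\MyShapefileFolder")

# For plain sources, derive a geometry column from a WKT text column
myWKTDF = spark.read.csv(r"c:\MyData\MyWKTData.csv", header=True)
myGeomDF = myWKTDF.withColumn("geometry", ST.geom_from_text("wkt", 4326))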
In this tutorial, you will learn how to create a DataFrame from your own definition, as well as from an existing file. You will also learn how to enable time and geometry for a DataFrame, and filter for certain records and columns.
Create a DataFrame
Set up the workspace
- Import the required modules.
# Import the required modules
from geoanalytics.sql import functions as ST
from geoanalytics import extensions
from pyspark.sql.functions import to_timestamp
Create a DataFrame from an existing file
- Specify an existing CSV file from which to create a DataFrame. The CSV contains GPS point readings with information like point ID, latitude and longitude, and a timestamp.
# Specify your input dataset location
myPointCSV = r"c:\MyData\MyGPSPoints.csv"
- Using your CSV file, create a DataFrame with column names taken from the CSV header.
# Using your CSV file, create a DataFrame with column names taken from the header
myDataFrame = spark.read.csv(myPointCSV, header=True)
- Return and review the schema for your DataFrame.
# Review the schema of your DataFrame
myDataFrame.printSchema()
Result
root
 |-- id: long (nullable = true)
 |-- trackid: long (nullable = true)
 |-- longitude: long (nullable = true)
 |-- latitude: long (nullable = true)
 |-- mytime: string (nullable = true)
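Note that, depending on how the CSV is read, Spark may load every column as a string unless it is asked to infer types. The standard PySpark options below are optional variants of the read above: infer types while reading, or cast a column explicitly afterward.

# Optional variant: infer column types while reading the CSV
myDataFrame = spark.read.csv(myPointCSV, header=True, inferSchema=True)

# Or cast an individual column explicitly after reading
from pyspark.sql.functions import col
myDataFrame = myDataFrame.withColumn("latitude", col("latitude").cast("double"))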
Create a DataFrame using your own definition
- Create your own dataset definition containing values for ID, latitude and longitude information, and a timestamp.
# Define your own dataset
myPoints = [(0, -4655711.2806, 222503.076, "01/22/2020 1:00:00 PM"),
            (1, -4570473.292, 322503.076, "01/22/2020 1:15:00 PM"),
            (2, -4830838.089, 146545.398, "01/22/2020 1:30:00 PM"),
            (3, -4570771.608, 116617.112, "01/22/2020 1:45:00 PM"),
            (4, -4682228.671, 173377.654, "01/22/2020 2:00:00 PM")]
fields = ["id", "latitude", "longitude", "mytime"]
- Create a DataFrame from your point definition, using the field names you defined as column names.
# Create a DataFrame
myDataFrame = spark.createDataFrame(myPoints, fields)
- Review your DataFrame. It should reflect the records you defined above.
# Review your DataFrame
myDataFrame.show()
Result
+---+-------------+----------+--------------------+
| id|     latitude| longitude|              mytime|
+---+-------------+----------+--------------------+
|  0|-4655711.2806|222503.076|01/22/2020 1:00:0...|
|  1| -4570473.292|322503.076|01/22/2020 1:15:0...|
|  2| -4830838.089|146545.398|01/22/2020 1:30:0...|
|  3| -4570771.608|116617.112|01/22/2020 1:45:0...|
|  4| -4682228.671|173377.654|01/22/2020 2:00:0...|
+---+-------------+----------+--------------------+
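If you want explicit control over column types and nullability, you can pass a schema to createDataFrame instead of a list of field names. This is standard PySpark and is shown here only as an optional variant of the step above.

# Optional variant: create the DataFrame with an explicit schema
from pyspark.sql.types import StructType, StructField, LongType, DoubleType, StringType

schema = StructType([
    StructField("id", LongType(), False),
    StructField("latitude", DoubleType(), True),
    StructField("longitude", DoubleType(), True),
    StructField("mytime", StringType(), True),
])
myDataFrame = spark.createDataFrame(myPoints, schema)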
Set geometry and time for a DataFrame
Define geometry and time
- For this example, the geometry will be of type point, defined by the latitude and longitude columns. The spatial reference ID for the coordinates is 4326. Time is defined using the mytime column and the format MM/dd/yyyy h:mm:ss a. You will be able to use the result (df) as a time- and geometry-enabled DataFrame for input to spatial and temporal functions.

# Define geometry and time for your DataFrame
df = myDataFrame.withColumn("geometry", ST.srid(ST.point("longitude", "latitude"), 4326)) \
                .st.set_geometry_field("geometry") \
                .withColumn("timestamp", to_timestamp("mytime", "MM/dd/yyyy h:mm:ss a")) \
                .st.set_time_fields("timestamp")
df.show()
Result
+---+-------------+----------+--------------------+--------------------+-------------------+
| id|     latitude| longitude|              mytime|            geometry|          timestamp|
+---+-------------+----------+--------------------+--------------------+-------------------+
|  0|-4655711.2806|222503.076|01/22/2020 1:00:0...|{"x":222503.076,"...|2020-01-22 13:00:00|
|  1| -4570473.292|322503.076|01/22/2020 1:15:0...|{"x":322503.076,"...|2020-01-22 13:15:00|
|  2| -4830838.089|146545.398|01/22/2020 1:30:0...|{"x":146545.398,"...|2020-01-22 13:30:00|
|  3| -4570771.608|116617.112|01/22/2020 1:45:0...|{"x":116617.112,"...|2020-01-22 13:45:00|
|  4| -4682228.671|173377.654|01/22/2020 2:00:0...|{"x":173377.654,"...|2020-01-22 14:00:00|
+---+-------------+----------+--------------------+--------------------+-------------------+
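With the geometry field set, df can be passed to spatial functions. As a small illustration, the points could be projected to another spatial reference; the ST.transform call used below is an assumption to confirm in the function reference before use.

# Illustrative sketch -- ST.transform is assumed to be available in geoanalytics.sql.functions
projectedDF = df.withColumn("geometry_3857", ST.transform("geometry", 3857))
projectedDF.select("id", "geometry_3857").show()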
Investigate and summarize your DataFrame
The examples below show common operations you can perform on DataFrames.
Summarize and understand your data
- Create a DataFrame from an existing ORC file.
# Create a DataFrame from an existing ORC file
myData = r"c:\MyData\MyORCFile"
df = spark.read.format("orc").load(myData)
- Get the count of rows within your DataFrame.
# Get the count of rows in your DataFrame
df.count()
Result
253
- Generate a sample of your data.
# Sample your DataFrame
sample = df.rdd.takeSample(withReplacement=False, num=3)
print(sample)
Result (Python)
[Row(serial_num='1899215N12329', season=1899, num=3, basin='NA', sub_basin='MM', name='NOT NAMED', iso_time='1899-08-30 18:00:00', nature='TS', latitude=40.2, longitude=-41.2, wind_wmo_=40.0, pres_wmo_=0, center='atcf', wind_wmo1=42.322, pres_wmo1=-100.0, track_type='main', size='40000', Wind=40000, x=-41.2, y=40.2, INSTANT_DATETIME='1899-08-30 18:00:00.000'),
 Row(serial_num='1899215N12329', season=1899, num=3, basin='NA', sub_basin='MM', name='NOT NAMED', iso_time='1899-08-17 12:00:00', nature='TS', latitude=34.5, longitude=-74.5, wind_wmo_=105.0, pres_wmo_=0, center='atcf', wind_wmo1=94.425, pres_wmo1=-100.0, track_type='main', size='105000', Wind=105000, x=-74.5, y=34.5, INSTANT_DATETIME='1899-08-17 12:00:00.000'),
 Row(serial_num='1994222N11267', season=1994, num=11, basin='EP', sub_basin='CP', name='JOHN', iso_time='9/10/1994 0:00', nature='TS', latitude=41.5, longitude=-171.0, wind_wmo_=60.0, pres_wmo_=0, center='atcf', wind_wmo1=68.854, pres_wmo1=-100.0, track_type='main', size='60000', Wind=60000, x=-171.0, y=41.5, INSTANT_DATETIME='1994-09-10 00:00:00.000')]
Result (Scala)
[1899215N12329,1899,3,NA,MM,NOT NAMED,1899-08-30 18:00:00,TS,40.2,-41.2,40.0,0,atcf,42.322,-100.0,main,40000,40000,-41.2,40.2,1899-08-30 18:00:00.000]
[1899215N12329,1899,3,NA,MM,NOT NAMED,1899-08-17 12:00:00,TS,34.5,-74.5,105.0,0,atcf,94.425,-100.0,main,105000,105000,-74.5,34.5,1899-08-17 12:00:00.000]
[1994222N11267,1994,11,EP,CP,JOHN,9/10/1994 0:00,TS,41.5,-171.0,60.0,0,atcf,68.854,-100.0,main,60000,60000,-171.0,41.5,1994-09-10 00:00:00.000]
- Group your data based on column values.
# Group your DataFrame by serial number
groups = df.groupBy("serial_num")
- Calculate average, maximum, and minimum statistics for your grouped data.
# Group your data and calculate group statistics
groups.avg("wind_wmo_").toDF("SerialNum", "AvgWind").show()
groups.max("pres_wmo_").toDF("SerialNum", "MaxPres").show()
groups.min("pres_wmo_").toDF("SerialNum", "MinPres").show()
Result
+-------------+-----------------+
|    SerialNum|          AvgWind|
+-------------+-----------------+
|1994222N11267|67.97520661157024|
|1899215N12329|71.43939393939394|
+-------------+-----------------+

+-------------+-------+
|    SerialNum|MaxPres|
+-------------+-------+
|1994222N11267|   1010|
|1899215N12329|    995|
+-------------+-------+

+-------------+-------+
|    SerialNum|MinPres|
+-------------+-------+
|1994222N11267|     13|
|1899215N12329|      5|
+-------------+-------+
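The three aggregations above can also be computed in a single pass with agg. This uses standard PySpark aggregate functions and is shown only as an optional variant of the previous step.

# Optional variant: compute all three statistics in one aggregation
from pyspark.sql import functions as F

groups.agg(F.avg("wind_wmo_").alias("AvgWind"),
           F.max("pres_wmo_").alias("MaxPres"),
           F.min("pres_wmo_").alias("MinPres")).show()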
Filter and preview your data
- Filter your data for records with a pressure value greater than or equal to 1000.
# Filter your data
myFilteredDF = df.filter(df["pres_wmo_"] >= 1000)
- Query your filtered result for the columns of interest. .collect() returns the values within the columns as a list of Row values, while .show() prints them as a table.

# Select certain columns and collect the data within them
myFilteredDF.selectExpr("latitude", "longitude").collect()
Result (Python)
[Row(latitude=11.1, longitude=-97.1), Row(latitude=11.2, longitude=-98.2), Row(latitude=11.2, longitude=-99.4), Row(latitude=11.3, longitude=-100.6), Row(latitude=11.5, longitude=-101.7), Row(latitude=14.0, longitude=-108.4), Row(latitude=14.0, longitude=-109.1), Row(latitude=13.9, longitude=-109.9), Row(latitude=13.8, longitude=-111.0), Row(latitude=13.7, longitude=-114.4), Row(latitude=13.7, longitude=-115.4), Row(latitude=14.6, longitude=-118.8), Row(latitude=15.0, longitude=-119.8), Row(latitude=15.3, longitude=-120.9), Row(latitude=15.3, longitude=-122.3), Row(latitude=15.3, longitude=-124.0), Row(latitude=15.3, longitude=-125.7), Row(latitude=15.3, longitude=-127.5), Row(latitude=15.2, longitude=-129.2)]
Result (Scala)
[11.1, -97.1] [11.2, -98.2] [11.2, -99.4] [11.3, -100.6] [11.5, -101.7] [14.0, -108.4] [14.0, -109.1] [13.9, -109.9] [13.8, -111.0] [13.7, -114.4] [13.7, -115.4] [14.6, -118.8] [15.0, -119.8] [15.3, -120.9] [15.3, -122.3] [15.3, -124.0] [15.3, -125.7] [15.3, -127.5] [15.2, -129.2]
# Print the columns you have filtered and selected for
myFilteredDF.selectExpr("latitude", "longitude").show()
Result
+--------+---------+
|latitude|longitude|
+--------+---------+
|    11.1|    -97.1|
|    11.2|    -98.2|
|    11.2|    -99.4|
|    11.3|   -100.6|
|    11.5|   -101.7|
|    14.0|   -108.4|
|    14.0|   -109.1|
|    13.9|   -109.9|
|    13.8|   -111.0|
|    13.7|   -114.4|
|    13.7|   -115.4|
|    14.6|   -118.8|
|    15.0|   -119.8|
|    15.3|   -120.9|
|    15.3|   -122.3|
|    15.3|   -124.0|
|    15.3|   -125.7|
|    15.3|   -127.5|
|    15.2|   -129.2|
+--------+---------+
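The same filter and selection can also be written in SQL by registering the DataFrame as a temporary view. This is standard Spark SQL and is shown only as an optional variant of the steps above; the view name used below is arbitrary.

# Optional variant: express the filter and selection in Spark SQL
df.createOrReplaceTempView("storm_points")  # view name is arbitrary
spark.sql("""
    SELECT latitude, longitude
    FROM storm_points
    WHERE pres_wmo_ >= 1000
""").show()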
What's next?
Learn more about how to set up your data and run tools and SQL functions: