Reading and Writing Layers in pyspark

The Run Python Script task allows you to programmatically access and use ArcGIS Enterprise layers with both GeoAnalytics Tools and the pyspark package. Reading and writing ArcGIS Enterprise layers is described below with several examples. For more examples, see Examples: Scripting custom analysis with the Run Python Script task.

Reading layers

Run Python Script allows you to read in input layers for analysis. ArcGIS Enterprise layers must be converted to Spark DataFrames before they can be used by geoanalytics or pyspark functions. DataFrames have built-in operations that allow you to query your data, apply filters, change the schema, and more. For more information, see Spark's guide to DataFrame operations. There are two ways to convert layers to DataFrames:

  • Using the layers object—Layers listed in the inputLayers parameter of the Run Python Script task are automatically loaded into DataFrames when you run a script. These DataFrames are contained in a layers object. layers is a list of DataFrames where the index of each DataFrame matches the index of the corresponding layer in the JSON array provided for inputLayers.
  • Using a URL within the script—Layers can be loaded into DataFrames within the script by calling spark.read.format("webgis").load(<URL>) using the URL to a feature service or big data file share layer.

When a layer is converted to a DataFrame, the layer's geometry will be included in the DataFrame in a column called $geometry. If a layer is time enabled, the time will be included in a column called $time.
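
For example, here is a minimal sketch (assuming df is a DataFrame created from a time-enabled layer using either of the methods above) that inspects these columns:


# Print the DataFrame schema, which includes the $geometry and $time columns
df.printSchema()

# Preview the geometry and time of the first five features
df.select(df["$geometry"], df["$time"]).show(5)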

In the example script below, multiple input layers of crime events have been defined with the inputLayers parameter. These layers are all accessible as DataFrames within the script and can be queried using DataFrame operations. Here, the total count of burglaries across several datasets is found and printed out.

Accessing multiple input layers in the Run Python Script task with the layers object.


# Report the number of input layers that were provided
print("{0} input layers have been defined".format(len(layers)))

# Iterate through each layer and find the total count of features where "Type" is equal to "BURGLARY"
count_burglaries = 0

for layer in layers:
    count_burglaries += layer.filter(layer["Type"] == "BURGLARY").count()

print("Total number of burglaries in all input layers: " + str(count_burglaries))

In this example, a DataFrame is created using the URL of a big data file share layer containing sensor data. The DataFrame is then used to print the sensor ID of all features with a sample measurement value greater than 500.

Accessing ArcGIS Enterprise layers in the Run Python Script task with a layer URL.


sensor_data = "https://host.domain.com/arcgis/rest/DataStoreCatalogs/bigDataFileShares_OzoneData/BigDataCatalogServer/samples_2017"

# Load the big data file share layer into a DataFrame
sensor_data_df = spark.read.format("webgis").load(sensor_data)

# Show the sensor ID of all features where the value of field "sample_measurement" is greater than 500
sensor_data_df.filter(sensor_data_df["sample_measurement"] > 500).select("sensor_ID").show()

Filtering and extent

You can choose to filter a layer while converting it to a DataFrame using the option method. This allows you to read in only a subset of your data, reducing the runtime of your analysis. The "where" option can be used to filter the layer with an SQL query by calling spark.read.format("webgis").option("where", <SQL query string>).load(<URL>). You can also use the "fields" option to choose which fields are loaded into the DataFrame by calling spark.read.format("webgis").option("fields", <field names>).load(<URL>).

In this example, a DataFrame is created using the URL of a big data file share layer containing sensor data. The "where" and "fields" options are used to filter the layer and specify which fields should be included in the result DataFrame.

Filtering an ArcGIS Enterprise layer when creating a DataFrame.


sensor_data = "https://host.domain.com/arcgis/rest/DataStoreCatalogs/bigDataFileShares_OzoneData/BigDataCatalogServer/samples_2017"

# Load the big data file share layer into a DataFrame
sensor_data_df = spark.read.format("webgis").option("where", "sample_measurement > 500").option("fields", "Site Num,sample_measurement").load(sensor_data)

The "extent" option can be used to filter a layer spatially by calling spark.read.format("webgis").option("extent", <extent string>).load(<URL>).

In this example, a DataFrame is created using the URL of a feature service layer. The "extent" option is used to read in only the features within the specified extent.

Setting the extent of an ArcGIS Enterprise layer when creating a DataFrame.


input_data = "https://services.arcgis.com/P3ePLMYs2RVChkJx/arcgis/rest/services/Congressional_District_Demographics/FeatureServer/0"

# Define extent
extent_str = "{\"xmin\":-11535264.81,\"ymin\":1271271.06,\"xmax\":-5606197.40,\"ymax\":7337313.62,\"spatialReference\":{\"wkid\":102100}}"

# Load the feature service layer into a DataFrame using the extent
input_df = spark.read.format("webgis").option("extent", extent_str).load(input_data)
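
If you prefer not to escape the extent JSON by hand, the same string can be built with Python's standard json module. This is just an alternative way to construct extent_str, not a requirement of the task:


import json

# Build the extent as a dictionary and serialize it to a JSON string
extent = {
    "xmin": -11535264.81, "ymin": 1271271.06,
    "xmax": -5606197.40, "ymax": 7337313.62,
    "spatialReference": {"wkid": 102100}
}
extent_str = json.dumps(extent)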

Writing layers

There are three different ways you can write results:

  • Write as feature layers in your ArcGIS Enterprise portal
  • Write to a big data file share using templates
  • Write to data formats supported by Spark

DataFrames are only held in memory during the runtime of your Python script. To save a result to ArcGIS Enterprise you must call write.format("webgis").save(<output name>) on the DataFrame. The name of the output feature service is specified by <output name>.

In this example, a Living Atlas layer is copied to the ArcGIS Data Store of type spatiotemporal (the spatiotemporal big data store).

Writing a DataFrame to the spatiotemporal big data store.


# Load the Living Atlas layer into a DataFrame
url = "https://services.arcgis.com/P3ePLMYs2RVChkJx/ArcGIS/rest/services/USA_HistoricSites_2016/FeatureServer/0"
USA_HistoricSites = spark.read.format("webgis").load(url)

# Write the DataFrame to the spatiotemporal big data store
USA_HistoricSites.write.format("webgis").save("USA_HistoricSites_copy")

All layers will be written to the spatiotemporal big data store by default unless you use the option method with the "dataStore" option to specify a different data store. You can do this by calling write.format("webgis").option("dataStore",<data store>).save(<output name>).

In this example, a Living Atlas layer is copied to a big data file share called "OutputBDFS" using an output template called "shp_default".

Writing a DataFrame to a big data file share.


# Load the Living Atlas layer into a DataFrame
url = "https://services.arcgis.com/P3ePLMYs2RVChkJx/ArcGIS/rest/services/USA_HistoricSites_2016/FeatureServer/0"
USA_HistoricSites = spark.read.format("webgis").load(url)

# Write the DataFrame to a big data file share using an output template called "shp_default"
USA_HistoricSites.write.format("webgis").option("dataStore", "/bigDataFileShares/OutputBDFS:shp_default").save("USA_HistoricSites_copy")

To write a DataFrame to an existing hosted feature service, you can use the "item" option along with the existing service's item ID. The layer being saved must have the same spatial reference as the existing feature service. The new layer is written to the same data store as the existing service.

In this example, the result of Create Buffers is written as a new layer in an existing feature service, which is referenced by an item ID.

Writing a DataFrame to an existing feature service.


# Use Create Buffers to buffer input features by 1 mile
cb_result = geoanalytics.create_buffers(layers[0], distance = 1, distance_unit = "Miles")

# Define the item ID of a feature service in ArcGIS Enterprise
portal_item_id = "a28f401e92b34073b3d4a14c3dee88ee"

# Write the Create Buffers result to the feature service item with the specified ID
cb_result.write.format("webgis").option("item", portal_item_id).save("buffers_layer")

To write multiple layers to a single new feature service, you can use the "last" option with the value "true". This adds the new layer to the feature service created previously in the script. The write fails if no service has been created previously or if the spatial reference of the new layer doesn't match that of the last layer.

In this example, three different Find Hot Spots results are written to a single feature service by using the "last" keyword when writing the second and third layers.

Writing multiple DataFrames to a new feature service.


# Run Find Hot Spots three times using a different bin size and neighborhood size each time
hs_500ft = geoanalytics.find_hot_spots(layers[0], 500, "Feet", 1000, "Feet")
hs_1mile = geoanalytics.find_hot_spots(layers[0], 1, "Miles", 2, "Miles")
hs_10mile = geoanalytics.find_hot_spots(layers[0], 10, "Miles", 20, "Miles")

# Write the first result layer to a new feature service
hs_500ft.write.format("webgis").save("hotspots")

# Write the second and third result layers to the feature service just created using option("last", "true")
hs_1mile.write.format("webgis").option("last", "true").save("hotspots_1mi_2mi")
hs_10mile.write.format("webgis").option("last", "true").save("hotspots_10mi_20mi")

Only layers written with format("webgis") are available in ArcGIS Enterprise. To learn about writing to output formats other than ArcGIS Enterprise layers, see Spark's documentation for write(). In the example below, a list of customers who meet certain criteria is generated and written to a shared folder that the ArcGIS Server account has write access to. The result is not available in ArcGIS Enterprise and may be written in multiple parts. The number of parts depends on how Spark has distributed the computation.

Writing a file external to ArcGIS Enterprise.


# Load a big data file share layer of customers into a DataFrame
url = "https://host.domain.com/arcgis/rest/DataStoreCatalogs/bigDataFileShares_CustomerData/BigDataCatalogServer/active_time"
customers = spark.read.format("webgis").load(url)

# Find all customers who have been logged into an application for more than 300 minutes per week
IDs = customers.filter(customers["weekly_time_active"] > 300).select("ID")

# Write the result to a shared folder in JSON format
out_path = "\\\\data\\active_customerids"
IDs.write.format("json").save(out_path)
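
Spark writes one part per DataFrame partition. If a single output file is preferred, one option (standard Spark behavior, not specific to the Run Python Script task) is to coalesce the DataFrame to one partition before saving; this can slow the write for large results. The sketch below writes to a second, hypothetical output path so it does not collide with the path already used above.


# Optional: coalesce to one partition so the result is written as a single part
IDs.coalesce(1).write.format("json").save(out_path + "_single")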