Prerequisites
- The tools available in the ArcGIS API for Python geoanalytics module require an ArcGIS Enterprise licensed and configured with the linux based ArcGIS GeoAnalytics server.
- When ArcGIS GeoAnalytics Server is installed on Linux, additional configuration steps are required before using the RunPythonScript operation. Install and configure Python 3.6 for Linux on each machine in your GeoAnalytics Server site, ensuring that Python is installed into the same directory on each machine. For this analysis, you also need to install FB Prophet library in the same environment. Then, update the ArcGIS Server Properties on your GeoAnalytics Server site with the pysparkPython property. The value of this property should be the path to the Python executable on your GeoAnalytics Server machines, for example, {"pysparkPython":"path/to/environment"}.
Introduction
While the arcgis.geoanalytics
module offers powerful spatial analysis tools, the pyspark package includes dozens of non-spatial distributed tools for classification, prediction, clustering, and more.
Using the power of distributed compute, we can analyze large datasets much faster than other non-distributed systems.
This notebook demonstrates the capability of spark-powered geoanalytics server to forecast hourly PM2.5 given the historic time series data for more than one time-dependent variable. The most common factors in the weather environment used in this analysis are PM2.5, PM10, wind speed, wind direction, and relative humidity. The levels of these pollutants are measured by the US Environmental Protection Agency (EPA), which controls overall air quality. We have used the dataset provided by EPA. With Spark we will learn how to customize and extend our analysis capabilities by:
- Querying and summarizing your data using SQL
- Turning analysis workflows into pipelines of GeoAnalytics tools
- Modeling data with included machine learning libraries
The forecasted result will then be displayed on a dashboard using the recently introduced arcgis.dashboard
module in ArcGIS API for Python.
Note:
- The ability to perform big data analysis is only available on ArcGIS Enterprise licensed with a GeoAnalytics server and not yet available on ArcGIS Online.
Necessary imports
from datetime import datetime as dt
import pandas as pd
import arcgis
from arcgis.gis import GIS
from arcgis.geoanalytics.manage_data import run_python_script
Connect to your ArcGIS Enterprise organization
gis = GIS('https://pythonapi.playground.esri.com/portal', 'arcgis_python', 'amazing_arcgis_123')
Ensure your GIS supports GeoAnalytics
After connecting to the Enterprise organization, we need to ensure an ArcGIS Enterprise GIS is set up with a licensed GeoAnalytics server. To do so, we will call the is_supported()
method.
arcgis.geoanalytics.is_supported()
True
Prepare data
To register a file share or an HDFS, we need to format datasets as subfolders within a single parent folder and register the parent folder. This parent folder becomes a datastore, and each subfolder becomes a dataset. Our folder hierarchy would look like below:
Learn more about preparing your big data file share datasets here.
The get_datastores()
method of the geoanalytics module returns a DatastoreManager
object that lets you search for and manage the big data file share items as Python API Datastore
objects on your GeoAnalytics server.
bigdata_datastore_manager = arcgis.geoanalytics.get_datastores()
bigdata_datastore_manager
<DatastoreManager for https://pythonapi.playground.esri.com/ga/admin>
We will register air quality data as a big data file share using the add_bigdata()
function on a DatastoreManager
object.
When we register a directory, all subdirectories under the specified folder are also registered with the server. Always register the parent folder (for example, \machinename\mydatashare) that contains one or more individual dataset folders as the big data file share item. To learn more, see register a big data file share
Note: You cannot browse directories in ArcGIS Server Manager. You must provide the full path to the folder you want to register, for example, \myserver\share\bigdata. Avoid using local paths, such as C:\bigdata, unless the same data folder is available on all nodes of the server site.
# data_item = bigdata_datastore_manager.add_bigdata("air_quality_17_18_19", r"/mnt/network/data")
bigdata_fileshares = bigdata_datastore_manager.search()
bigdata_fileshares
[<Datastore title:"/bigDataFileShares/NYC_taxi_data15" type:"bigDataFileShare">, <Datastore title:"/bigDataFileShares/GA_Data" type:"bigDataFileShare">, <Datastore title:"/bigDataFileShares/ServiceCallsOrleansTest" type:"bigDataFileShare">, <Datastore title:"/bigDataFileShares/calls" type:"bigDataFileShare">, <Datastore title:"/bigDataFileShares/all_hurricanes" type:"bigDataFileShare">, <Datastore title:"/bigDataFileShares/NYCdata" type:"bigDataFileShare">, <Datastore title:"/bigDataFileShares/hurricanes_1848_1900" type:"bigDataFileShare">, <Datastore title:"/bigDataFileShares/ServiceCallsOrleans" type:"bigDataFileShare">, <Datastore title:"/bigDataFileShares/hurricanes_dask_csv" type:"bigDataFileShare">, <Datastore title:"/bigDataFileShares/hurricanes_dask_shp" type:"bigDataFileShare">, <Datastore title:"/cloudStores/cloud_store" type:"cloudStore">]
Get data for analysis
Adding a big data file share to the Geoanalytics server adds a corresponding big data file share item on the portal. We can search for these types of items using the item_type parameter.
aqs_data = gis.content.search("bigDataFileShares_GA_Data", item_type = "big data file share")[0]
aqs_data
Querying the layers property of the item returns a featureLayer representing the data.
air_lyr = aqs_data.layers[0]
air_lyr.properties
{ "dataStoreID": "4f498adb-8e5a-4b7e-96d1-9accacb26a50", "fields": [ { "name": "State Code", "type": "esriFieldTypeInteger" }, { "name": "County Code", "type": "esriFieldTypeInteger" }, { "name": "Site Num", "type": "esriFieldTypeInteger" }, { "name": "Parameter Code", "type": "esriFieldTypeInteger" }, { "name": "POC", "type": "esriFieldTypeInteger" }, { "name": "Latitude", "type": "esriFieldTypeDouble" }, { "name": "Longitude", "type": "esriFieldTypeDouble" }, { "name": "Datum", "type": "esriFieldTypeString" }, { "name": "Parameter Name", "type": "esriFieldTypeString" }, { "name": "Date Local", "type": "esriFieldTypeString" }, { "name": "Time Local", "type": "esriFieldTypeString" }, { "name": "Date GMT", "type": "esriFieldTypeString" }, { "name": "Time GMT", "type": "esriFieldTypeString" }, { "name": "Sample Measurement", "type": "esriFieldTypeDouble" }, { "name": "Units of Measure", "type": "esriFieldTypeString" }, { "name": "MDL", "type": "esriFieldTypeDouble" }, { "name": "Uncertainty", "type": "esriFieldTypeString" }, { "name": "Qualifier", "type": "esriFieldTypeString" }, { "name": "Method Type", "type": "esriFieldTypeString" }, { "name": "Method Code", "type": "esriFieldTypeInteger" }, { "name": "Method Name", "type": "esriFieldTypeString" }, { "name": "State Name", "type": "esriFieldTypeString" }, { "name": "County Name", "type": "esriFieldTypeString" }, { "name": "Date of Last Change", "type": "esriFieldTypeString" } ], "name": "air_quality", "geometryType": "esriGeometryPoint", "type": "featureClass", "spatialReference": { "wkid": 102682, "latestWkid": 3452 }, "geometry": { "fields": [ { "name": "Longitude", "formats": [ "x" ] }, { "name": "Latitude", "formats": [ "y" ] } ] }, "time": { "timeType": "instant", "timeReference": { "timeZone": "UTC" }, "fields": [ { "name": "Date Local", "formats": [ "yyyy-MM-dd" ] } ] }, "currentVersion": 10.81, "children": [] }
Now we'll search for a feature layer depicting all the counties in the United States. We'll use the county polygons later in the notebook.
usa_counties = gis.content.get('5c6ef8ef57934990b543708f815d606e')
usa_counties
We will use the first item for our analysis. Since the item is a Feature Layer Collection, accessing the layers property will give us a list of Feature layer objects.
usa_counties_lyr = usa_counties.layers[0]
Uncover patterns in data
Describe data
The describe_dataset
method provides an overview of big data. By default, the tool outputs a table layer containing calculated field statistics and a dictionary outlining geometry and time settings for the input layer.
Optionally, the tool can output a feature layer
representing a sample set of features using the sample_size
parameter, or a single polygon feature layer representing the input feature layers' extent by setting the extent_output
parameter to True.
from arcgis.geoanalytics.summarize_data import describe_dataset
description = describe_dataset(input_layer=air_lyr,
extent_output=True,
sample_size=1000,
output_name="Description of air quality data" + str(dt.now().microsecond),
return_tuple=True)
description.output_json
{'datasetName': 'air_quality', 'datasetSource': 'Big Data File Share - Air_Auality_17_18_19', 'recordCount': 147535365, 'geometry': {'geometryType': 'Point', 'sref': {'wkid': 4326}, 'countNonEmpty': 147535365, 'countEmpty': 0, 'spatialExtent': {'xmin': -161.767, 'ymin': 17.953006, 'xmax': -65.915482, 'ymax': 64.84568999999999}}, 'time': {'timeType': 'Instant', 'countNonEmpty': 147535365, 'countEmpty': 0, 'temporalExtent': {'start': '2017-01-01 00:00:00.000', 'end': '2019-04-30 00:00:00.000'}}}
We can also use sql queries to return a subset of records by leveraging the ArcGIS API for Python's Feature Layer object itself. When you run a query()
on a FeatureLayer, you get back a FeatureSet object. Calling the sdf
property of the FeatureSet returns a Spatially Enabled DataFrame object.
description.sample_layer.query().sdf
State_Code | County_Code | Site_Num | Parameter_Code | POC | Latitude | Longitude | Datum | Parameter_Name | Date_Local | ... | Method_Type | Method_Code | Method_Name | State_Name | County_Name | Date_of_Last_Change | INSTANT_DATETIME | globalid | OBJECTID | SHAPE | |
---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
0 | 56 | 35 | 700 | 62201 | 1 | 42.486361 | -110.098861 | WGS84 | Relative Humidity | 2017-07-05 | ... | Non-FRM | 60 | Instrumental - Vaisala 435C RH/AT Sensor | Wyoming | Sublette | 2017-11-20 | 2017-07-05 | {4CB48700-32CB-6126-E342-B783681A14AF} | 1 | {"x": -110.098861, "y": 42.486360999999995, "s... |
1 | 9 | 9 | 27 | 88313 | 1 | 41.301400 | -72.902871 | WGS84 | Black Carbon PM2.5 at 880 nm | 2017-06-08 | ... | Non-FRM | 894 | Magee AE33/ TAPI M633 Aethalometer - Optical a... | Connecticut | New Haven | 2018-01-09 | 2017-06-08 | {3D99EDC1-EC50-6B1A-508F-A0034CD9DB02} | 2 | {"x": -72.90287099999999, "y": 41.3014, "spati... |
2 | 45 | 79 | 7 | 42601 | 2 | 34.093959 | -80.962304 | WGS84 | Nitric oxide (NO) | 2018-06-28 | ... | Non-FRM | 674 | Instrumental - Chemiluminescence Thermo Electr... | South Carolina | Richland | 2019-02-18 | 2018-06-28 | {04ED8FBB-2ADF-AA44-BE30-F61E7A939657} | 3 | {"x": -80.962304, "y": 34.093959, "spatialRefe... |
3 | 48 | 183 | 1 | 62101 | 1 | 32.378682 | -94.711811 | WGS84 | Outdoor Temperature | 2018-10-18 | ... | Non-FRM | 40 | INSTRUMENTAL - ELECTRONIC OR MACHINE AVG. | Texas | Gregg | 2019-02-18 | 2018-10-18 | {60E09F2E-891F-EEBD-1253-0FFA407BDEA4} | 4 | {"x": -94.711811, "y": 32.378682, "spatialRefe... |
4 | 19 | 153 | 30 | 44201 | 1 | 41.603159 | -93.643118 | WGS84 | Ozone | 2017-12-23 | ... | FEM | 47 | INSTRUMENTAL - ULTRA VIOLET | Iowa | Polk | 2018-01-12 | 2017-12-23 | {73E73030-0883-7A32-ED9C-BD59ACC00A2F} | 5 | {"x": -93.643118, "y": 41.603159, "spatialRefe... |
... | ... | ... | ... | ... | ... | ... | ... | ... | ... | ... | ... | ... | ... | ... | ... | ... | ... | ... | ... | ... | ... |
995 | 27 | 53 | 962 | 61104 | 1 | 44.965242 | -93.254759 | NAD83 | Wind Direction - Resultant | 2018-07-15 | ... | Non-FRM | 63 | Instrumental - Climatronics | Minnesota | Hennepin | 2019-01-17 | 2018-07-15 | {61736B2D-7C20-DF9E-37BA-21CF788F8FDD} | 1003 | {"x": -93.25475899999999, "y": 44.965241999999... |
996 | 48 | 43 | 101 | 44201 | 1 | 29.302650 | -103.177810 | WGS84 | Ozone | 2018-02-21 | ... | FEM | 47 | INSTRUMENTAL - ULTRA VIOLET | Texas | Brewster | 2018-04-19 | 2018-02-21 | {E739DA0C-7A97-8059-72A3-6505490F990D} | 1004 | {"x": -103.17781, "y": 29.30265, "spatialRefer... |
997 | 19 | 163 | 17 | 62101 | 1 | 41.467236 | -90.688451 | WGS84 | Outdoor Temperature | 2017-07-16 | ... | Non-FRM | 40 | INSTRUMENTAL - ELECTRONIC OR MACHINE AVG. | Iowa | Scott | 2017-08-10 | 2017-07-16 | {1B805C6B-7621-B9B2-9C25-E35DC16D466E} | 1006 | {"x": -90.688451, "y": 41.467236, "spatialRefe... |
998 | 39 | 145 | 21 | 62101 | 1 | 38.600611 | -82.829782 | NAD83 | Outdoor Temperature | 2017-12-20 | ... | Non-FRM | 40 | INSTRUMENTAL - ELECTRONIC OR MACHINE AVG. | Ohio | Scioto | 2018-01-12 | 2017-12-20 | {965A3811-00DB-88A4-A734-959041D2CF67} | 1007 | {"x": -82.829782, "y": 38.600611, "spatialRefe... |
999 | 6 | 31 | 4 | 61103 | 1 | 36.102244 | -119.565650 | NAD83 | Wind Speed - Resultant | 2017-09-28 | ... | Non-FRM | 20 | INSTRUMENTAL - VECTOR SUMMATION | California | Kings | 2018-03-05 | 2017-09-28 | {85F9DBB0-C17B-A74A-2717-29AACFCDB63B} | 1008 | {"x": -119.56565, "y": 36.102244, "spatialRefe... |
1000 rows × 28 columns
m1 = gis.map('USA')
m1
Locations of the Air pollution monitors
m1.add_layer(description.sample_layer)
m1.zoom_to_layer(description.sample_layer)
Commonly used methods of measurement
The 'Method Name' attribute contains information about the type of instrument used for measurement. 'Parameter Name' attribute tells about the name or description assigned in AQS to the parameter measured by the monitor. For more details, read here.
The function below groups data by both these parameters. This will help us know the most common type of instrument used to measure each type of parameter.
def measurement_type():
from datetime import datetime as dt
# Load the big data file share layer into a DataFrame
df = layers[0]
out = df.groupBy('Method Name','Parameter Name').count()
out.write.format("webgis").save("common_method_type" + str(dt.now().microsecond)) # Write the final result to our datastore.
The run_python_script
method executes a Python script directly in an ArcGIS GeoAnalytics server site . The script can create an analysis pipeline by chaining together multiple GeoAnalytics tools without writing intermediate results to a data store. The tool can also distribute Python functionality across the GeoAnalytics server site.
When using the geoanalytics and pyspark packages, most functions return analysis results as Spark DataFrame memory structures. You can write these data frames to a data store or process them in a script. This lets you chain multiple geoanalytics and pyspark tools while only writing out the final result, eliminating the need to create any bulky intermediate result layers.
run_python_script(code=measurement_type, layers=[air_lyr])
The result is saved as a feature layer. We can Search for the saved item using the search() method. Providing the search keyword same as the name we used for writing the result will retrieve the layer.
method_item = gis.content.search('common_method_type')[0]
Accessing the tables
property of the item will give us the tables object. We will then use query()
method to read the table as spatially enabled dataframe.
method_df = method_item.tables[0].query(as_df=True)
Sort the values in the decreasing order of the count field.
method_df.sort_values(by='count', ascending=False)
Method_Name | Parameter_Name | count | globalid | OBJECTID | |
---|---|---|---|---|---|
89 | INSTRUMENTAL - ULTRA VIOLET ABSORPTION | Ozone | 10650047 | {1C333EB9-AB7D-9ACF-3116-09924757FD90} | 112 |
6 | INSTRUMENTAL - ELECTRONIC OR MACHINE AVG. | Outdoor Temperature | 9808818 | {17DCDDD0-079F-5BF2-45C3-0BF485355EB0} | 8 |
158 | INSTRUMENTAL - ULTRA VIOLET | Ozone | 8634142 | {EDA2D065-CE9E-2175-F094-8D9C2F6DCEBD} | 198 |
290 | INSTRUMENTAL - VECTOR SUMMATION | Wind Direction - Resultant | 7348868 | {7154F73E-775C-4C1A-8775-FB99B0074348} | 503 |
277 | INSTRUMENTAL - VECTOR SUMMATION | Wind Speed - Resultant | 7267098 | {4CCC027A-BABE-DF05-FC6B-9B04F013AF14} | 455 |
... | ... | ... | ... | ... | ... |
100 | Cooper Environmental Services model Xact 620 -... | Vanadium PM10 LC | 105 | {ABFA136A-BE6E-79A7-FCA4-608CFABCB77F} | 126 |
234 | Cooper Environmental Services model Xact 620 -... | Tin PM10 LC | 105 | {A5B11059-6FD5-EC9A-65E4-6D4B06B25FFF} | 339 |
245 | Cooper Environmental Services model Xact 620 -... | Silver PM10 LC | 105 | {E56280AE-5F42-AD31-2D95-C3418CB40985} | 356 |
12 | Cooper Environmental Services model Xact 620 -... | Calcium PM10 LC | 105 | {6FC4B1BA-A7E4-A69C-BB44-CC26A6FD935B} | 17 |
38 | The | PM2.5 - Local Conditions | 1 | {E955A26F-4FC1-E8A3-40F1-D85F48D15ED9} | 49 |
338 rows × 5 columns
The table above shows that Ozone is measure using ULTRA VIOLET ABSORPTION method. We can filter the dataframe and search for the ones we are interested in.
Average PM 2.5 value by county
The function below filters the data by rows that give information about PM2.5 pollutant. To find the average PM2.5 value of each county, we will use join_features
tool. Finally, we will write the output to the datastore.
def average():
from datetime import datetime as dt
df = layers[0]
df = df.filter(df['Parameter Name'] == 'PM2.5 - Local Conditions')
res = geoanalytics.join_features(target_layer=layers[1],
join_layer=df,
join_operation="JoinOneToOne",
summary_fields=[{'statisticType' : 'mean', 'onStatisticField' : 'Sample Measurement'}],
spatial_relationship='Contains')
res.write.format("webgis").save("average_pm_by_county" + str(dt.now().microsecond))
run_python_script(average, [air_lyr, usa_counties_lyr])
average_pm_by_county = gis.content.search('average_pm_by_county')[0]
average_pm_by_county
avg_pm = average_pm_by_county.layers[0]
avg_pm.query(as_df=True).columns
Index(['fid', 'state_name', 'fips', 'population', 'pop_sqmi', 'SHAPE__Length1', 'SHAPE__Area1', 'COUNT', 'MEAN_Sample_Measurement', 'globalid', 'OBJECTID', 'SHAPE'], dtype='object')
m2 = gis.map('USA')
m2
m2.add_layer(avg_pm, {"type": "FeatureLayer",
"renderer":"ClassedColorRenderer",
"field_name":"MEAN_Sample_Measurement",
"class_breaks": 6})
m2.zoom_to_layer(avg_pm)
m2.legend=True
Prepare time series data
We have observed that the data is spread across US and comes from multiple stations. So we will create a dataframe that contains data points from one station. Additionally, for the purpose of sample, we will only use 2017 and 2018 data to train our model and foreacst on 2019 data.
The function below creates a column that gives a unique station id to each row of the data. We will then filter by one station id. The timeseries data is expected to have time-dependent variables as attributes of dataframe. For this, we will pivot the table along 'Parameter Name' column.
def data_processsing():
from datetime import datetime as dt
import pyspark.sql.functions as F
from pyspark.sql.functions import concat, col, lit
# Load the big data file share layer into a DataFrame.
df = layers[0] #converts feature layer to spark dataframe
cols = ['Site Num', 'County Code', 'State Code', 'Date Local', 'Time Local', 'Parameter Name', 'Sample Measurement']
df = df.select(cols) #create a subset of the dataset with only selected columns
df = df.withColumn('Site_Num', F.lpad(df['Site Num'], 4, '0'))
df = df.withColumn('County_Code', F.lpad(df['County Code'], 3, '0'))
df = df.withColumn('State_Code', F.lpad(df['State Code'], 2, '0'))
df = df.withColumn('unique_id', F.concat(F.col('State_Code'), F.col('County_Code'), F.col('Site_Num')))
# drop_cols = ['Site_Num', 'County_Code', 'State_Code', 'Site Num', 'County Code', 'State Code']
df = df.drop('Site_Num', 'County_Code', 'Staate_Code', 'Site Num', 'County Code', 'State Code')
df = df.withColumn('datetime', concat(col("Date Local"), lit(" "), col("Time Local")))
# drop_cols = ['Time Local', 'Date Local']
df = df.drop('Time Local', 'Date Local')
df = df.filter(df.unique_id == df.first().unique_id) #filter by only one station
# group the dataframe by datetime,unique_id field and pivot the table to get variables needed for prediction as columns
df = df.groupby(df['datetime'], df['unique_id']).pivot("Parameter Name").avg("Sample Measurement")
df.write.format("webgis").save("timeseries_data_17_18_19_1station" + str(dt.now().microsecond))
run_python_script(code=data_processsing, layers=[air_lyr], gis=gis)
data = gis.content.search('timeseries_data_17_18_19_1station')[0]
data
series_data = data.tables[0]
series_data.query(as_df=True)[:5]
Barometric_pressure | Carbon_monoxide | Nitric_oxide__NO_ | Nitrogen_dioxide__NO2_ | OBJECTID | Outdoor_Temperature | Oxides_of_nitrogen__NOx_ | Ozone | PM10_Total_0_10um_STP | PM2_5___Local_Conditions | Reactive_oxides_of_nitrogen__NOy_ | Relative_Humidity | Sulfur_dioxide | Wind_Direction___Resultant | Wind_Speed___Resultant | datetime | globalid | unique_id | |
---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
0 | 1004.3 | 0.133 | 0.80 | 1.8 | 1 | 54.3 | 2.6 | 0.039 | NaN | 3.6 | 2.5 | 35.0 | 0.3 | 73.0 | 4.4 | 2017-02-13 10:00 | {35E5A035-34CC-85FB-4955-22F1325C9A40} | 010730023 |
1 | 996.3 | NaN | 0.15 | 12.3 | 2 | 46.4 | 12.6 | 0.018 | NaN | 13.2 | 13.0 | 59.0 | 0.6 | 73.0 | 3.5 | 2017-12-14 22:00 | {B078A8A2-237E-4732-4C33-62B9D9F39D57} | 010730023 |
2 | 995.8 | NaN | 31.40 | 21.5 | 3 | 66.2 | 55.9 | 0.007 | 50.0 | 19.0 | 48.5 | 69.0 | 6.4 | 150.0 | 2.0 | 2017-05-26 06:00 | {F45C8958-8061-FA8D-DDC8-9325DB224B77} | 010730023 |
3 | 992.6 | NaN | -0.05 | 2.7 | 4 | 57.4 | 2.5 | 0.035 | NaN | NaN | 2.9 | 56.0 | 0.5 | 280.0 | 5.9 | 2018-01-22 22:00 | {85796BC1-2FE1-86C6-23BE-E70D9DABD1F7} | 010730023 |
4 | 1007.0 | 0.148 | 0.15 | 6.9 | 5 | 46.2 | 7.0 | 0.035 | NaN | 1.7 | 6.5 | 41.0 | 0.0 | 119.0 | 3.3 | 2017-02-09 17:00 | {61606AF2-0DC2-52DF-3B01-B979C11CB37E} | 010730023 |
Predict PM2.5 using Facebook's Prophet model
The functon below uses pyspark pandas UDF function fb-prophet model to foreast pm2.5 hourly value for the month of 2019 January.
def predict_pm25():
#imports
from arcgis.gis import GIS
gis = GIS(profile="your_enterprise_portal")
from datetime import datetime as dt
from pyspark.sql.functions import concat, col, lit
import pandas as pd
import numpy as np
from fbprophet import Prophet
from pyspark.sql.functions import pandas_udf, PandasUDFType
from pyspark.sql.types import StructType, StructField, DoubleType, IntegerType, FloatType, TimestampType
import warnings
warnings.filterwarnings('ignore')
df1 = layers[0] #converts laayer into spark dataframe
cols = ['Outdoor_Temperature', 'Ozone', 'PM10_Total_0_10um_STP',
'PM2_5___Local_Conditions',
'Wind_Direction___Resultant',
'Wind_Speed___Resultant', 'datetime']
df1 = df1.select(cols) #filter data by columns needed
df1 = df1.withColumn('flag', lit(1))
schema = StructType([StructField('ds', TimestampType(), True), #schema of the resulting dataframe
StructField('yhat_lower', FloatType(), True),
StructField('yhat_upper', FloatType(), True),
StructField('yhat', FloatType(), True),
StructField('y', FloatType(), True)])
@pandas_udf(schema, PandasUDFType.GROUPED_MAP)
def forecast_pm25(df):
#prepare data
df['Date'] = df['datetime'].astype('datetime64[ns]')
df['year'] = df['Date'].dt.year
df.set_index('Date', inplace=True)
df.sort_index(inplace=True)
v = pd.date_range(start='2016-12-31 23:00:00', periods=18265, freq='H', closed='right') #get date range
newdf = pd.DataFrame(index=v)
# Fill missing dates
historical=pd.merge(newdf, df, how='left', left_index=True, right_index=True)
historical.interpolate(method='time', inplace=True)
historical.reset_index(inplace=True)
historical.rename(columns={'index': 'ds', 'PM2_5___Local_Conditions': 'y'}, inplace=True)
historical.fillna(0, inplace=True)
# handle zero and negative values for pm
for i,item in enumerate(historical['y']):
if item<=0:
historical['y'].iloc[i]=historical['y'].iloc[i-1]
else:
historical['y'].iloc[i]=item
for i,item in enumerate(historical['PM10_Total_0_10um_STP']):
if item<=0:
historical['PM10_Total_0_10um_STP'].iloc[i]=historical['PM10_Total_0_10um_STP'].iloc[i-1]
else:
historical['PM10_Total_0_10um_STP'].iloc[i]=item
for i,item in enumerate(historical['Wind_Speed___Resultant']):
if item<=0:
historical['Wind_Speed___Resultant'].iloc[i]=historical['Wind_Speed___Resultant'].iloc[i-1]
else:
historical['Wind_Speed___Resultant'].iloc[i]=item
for i,item in enumerate(historical['Wind_Direction___Resultant']):
if item<=0:
historical['Wind_Direction___Resultant'].iloc[i]=historical['Wind_Direction___Resultant'].iloc[i-1]
else:
historical['Wind_Direction___Resultant'].iloc[i]=item
# split data into train and test
train_df = historical[historical.year != 2019]
test_df = historical[historical.year == 2019]
test_df.drop(columns='y', inplace=True)
# train model
m = Prophet(daily_seasonality=True,
weekly_seasonality=True)
m.add_regressor('PM10_Total_0_10um_STP')
m.add_regressor('Wind_Speed___Resultant')
m.add_regressor('Wind_Direction___Resultant')
m.fit(train_df);
# predict on test data
forecast = m.predict(test_df)
# save plots locally
plot1 = m.plot(forecast);
plot2 = m.plot_components(forecast);
plot1.savefig(r'/home/ags/localdatastore/fbdata/forecast.png')
plot2.savefig(r'/home/ags/localdatastore/fbdata/cmponents.png')
# Uncomment the following lines if you want to publish and visualize your graphs as as item.
# gis = GIS('https://machinename/portal', 'username', 'password')
# gis.content.add(item_properties={"type": "Image", "title": "Forecast Plot"}, data=r"/home/ags/localdatastore/fbdata/forecast.png")
# gis.content.add(item_properties={"type": "Image", "title": "Forecase Components2"}, data=r"/home/ags/localdatastore/fbdata/cmponents.png")
# create df with actual and predicted fields
cmp_df = forecast.set_index('ds')[['yhat', 'yhat_lower', 'yhat_upper']].join(historical.set_index('ds'))
cmp_df.reset_index(inplace=True)
cmp_df = cmp_df[['ds', 'yhat_lower', 'yhat_upper', 'yhat', 'y']]
return cmp_df
res = df1.groupby(['flag']).apply(forecast_pm25)
res.write.format("webgis").save("predicted_results_on_test_data" + str(dt.now().microsecond))
run_python_script(code=predict_pm25, layers=[series_data])
predicted_item = gis.content.search('predicted_results_on_test_data854829')[0]
predicted_item
predicted_df = predicted_item[0].tables[0].query().sdf
predicted_df.columns
Index(['ds', 'yhat_lower', 'yhat_upper', 'yhat', 'y', 'globalid', 'OBJECTID'], dtype='object')
predicted_df.head()
ds | yhat_lower | yhat_upper | yhat | y | globalid | OBJECTID | |
---|---|---|---|---|---|---|---|
0 | 2018-12-31 18:30:00 | 1.357769 | 15.941110 | 8.691076 | 5.5 | {7BCD0AF8-F5B1-4A7E-7118-88589E0A4DAD} | 44 |
1 | 2018-12-31 19:30:00 | 1.141512 | 16.265352 | 8.773848 | 5.7 | {747EF178-8A36-5CCC-74A3-4BAD71DB0553} | 244 |
2 | 2018-12-31 20:30:00 | 0.389374 | 15.927957 | 7.641061 | 5.4 | {7D6C9381-7279-AB10-C5F7-AE138BFD6AE0} | 444 |
3 | 2018-12-31 21:30:00 | -0.164141 | 15.033084 | 7.111085 | 4.4 | {B9BD44CF-F794-87F3-E389-E654CDCE8844} | 644 |
4 | 2018-12-31 22:30:00 | -0.587776 | 14.489634 | 7.002420 | 4.4 | {B7C8C5C7-1C9E-E66A-CA41-39E5F0D601FA} | 844 |
In the above table, y attribute shows the actual value of PM2.5 and yhat shows the values predicted by the trained model.
Visualize result on Dashboard
The arcgis.apps
module includes Dashboard submodule to create dashboards programmatically. The dashboard submodule contains classes for different widgets which can be configured and be used to publish a dashboard.
We want to visualize our predicted results on a dashboard. To learn more about Dashboards module, visit guide here.
from arcgis.apps.dashboard import SerialChart, add_column, add_row
from arcgis.apps.dashboard import Dashboard
from arcgis.apps.dashboard import SerialChart
chart = SerialChart(predicted_item, #Create a serial chart
categories_from="features",
title="Forecast of PM2.5 for Janumary 2019")
chart.data.category_field = "ds" #set category field
chart.category_axis.title = "datetime" #set title for x axis
chart.value_axis.title = "pm2.5" #set title for y axis
#set fields to visualize on y axis
chart.data.add_value_field('y', line_color='#CB4335')
chart.data.add_value_field('yhat', line_color='#2980B9')
chart.data.add_value_field('yhat_upper', line_color='#CACFD2')
chart.data.add_value_field('yhat_lower', line_color='#CACFD2')
chart.legend.visibility = True
chart.legend.placement = "side"
chart.category_axis.minimum_period = 'hours'
chart.data.labels = False
chart.data.hover_text = True
chart.data.hover_text = True
chart
agol_gis = GIS('home')
dashboard = Dashboard() #creates a Dashboard object
dashboard.layout = add_row([a_chart]) #adds one chat to the Dashboard
dashboard.save('pm2.5_dashboard_2019_jan', #publishes the dashboard to the portal
gis=agol_gis)
Conclusion
In this notebook, we learnt how to use spark with geoanalytics server in order to carry out distributed computation of big data analysis.