Forecasting PM2.5 using big data analysis

Prerequisites

  • The tools available in the ArcGIS API for Python geoanalytics module require an ArcGIS Enterprise deployment licensed and configured with a Linux-based ArcGIS GeoAnalytics Server.
  • When ArcGIS GeoAnalytics Server is installed on Linux, additional configuration steps are required before using the RunPythonScript operation. Install and configure Python 3.6 for Linux on each machine in your GeoAnalytics Server site, ensuring that Python is installed into the same directory on each machine. For this analysis, you also need to install the fbprophet library in the same environment. Then, update the ArcGIS Server Properties on your GeoAnalytics Server site with the pysparkPython property. The value of this property should be the path to the Python executable on your GeoAnalytics Server machines, for example, {"pysparkPython":"path/to/environment"}.

Introduction

While the arcgis.geoanalytics module offers powerful spatial analysis tools, the pyspark package includes dozens of non-spatial distributed tools for classification, prediction, clustering, and more. Using the power of distributed compute, we can analyze large datasets much faster than we could with non-distributed systems.

This notebook demonstrates the capability of the Spark-powered GeoAnalytics Server to forecast hourly PM2.5 given historic time series data for more than one time-dependent variable. The variables used in this analysis are PM2.5, PM10, wind speed, wind direction, and relative humidity. These measurements are collected by the US Environmental Protection Agency (EPA), which monitors overall air quality, and we use the dataset provided by the EPA. With Spark, we will learn how to customize and extend our analysis capabilities by:

  • Querying and summarizing your data using SQL
  • Turning analysis workflows into pipelines of GeoAnalytics tools
  • Modeling data with included machine learning libraries

The forecasted result will then be displayed on a dashboard using the recently introduced arcgis.apps.dashboard module in the ArcGIS API for Python.

Note:

  • The ability to perform big data analysis is only available on ArcGIS Enterprise licensed with a GeoAnalytics server and not yet available on ArcGIS Online.

Necessary imports

Input
from datetime import datetime as dt
import pandas as pd

import arcgis
from arcgis.gis import GIS
from arcgis.geoanalytics.manage_data import run_python_script

Connect to your ArcGIS Enterprise organization

Input
gis = GIS('https://pythonapi.playground.esri.com/portal', 'arcgis_python', 'amazing_arcgis_123')

Ensure your GIS supports GeoAnalytics

After connecting to the Enterprise organization, we need to ensure an ArcGIS Enterprise GIS is set up with a licensed GeoAnalytics server. To do so, we will call the is_supported() method.

Input
arcgis.geoanalytics.is_supported()
Output
True

Prepare data

To register a file share or an HDFS, we need to format datasets as subfolders within a single parent folder and register the parent folder. This parent folder becomes a datastore, and each subfolder becomes a dataset. Our folder hierarchy would look like the example below.
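For example, a parent folder registered as the big data file share might be organized as follows (the folder and dataset names are illustrative):

|---data                (parent folder registered as the big data file share)
   |---air_quality      (one dataset: a folder of delimited files that share a schema)
   |---other_dataset    (each additional subfolder becomes another dataset)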

Learn more about preparing your big data file share datasets here.

The get_datastores() method of the geoanalytics module returns a DatastoreManager object that lets you search for and manage the big data file share items as Python API Datastore objects on your GeoAnalytics server.

Input
bigdata_datastore_manager = arcgis.geoanalytics.get_datastores()
bigdata_datastore_manager
Output
<DatastoreManager for https://pythonapi.playground.esri.com/ga/admin>

We will register air quality data as a big data file share using the add_bigdata() function on a DatastoreManager object.

When we register a directory, all subdirectories under the specified folder are also registered with the server. Always register the parent folder (for example, \\machinename\mydatashare) that contains one or more individual dataset folders as the big data file share item. To learn more, see register a big data file share.

Note: You cannot browse directories in ArcGIS Server Manager. You must provide the full path to the folder you want to register, for example, \\myserver\share\bigdata. Avoid using local paths, such as C:\bigdata, unless the same data folder is available on all nodes of the server site.

Input
# data_item = bigdata_datastore_manager.add_bigdata("air_quality_17_18_19", r"/mnt/network/data")
Input
bigdata_fileshares = bigdata_datastore_manager.search()
bigdata_fileshares
Output
[<Datastore title:"/bigDataFileShares/NYC_taxi_data15" type:"bigDataFileShare">,
 <Datastore title:"/bigDataFileShares/GA_Data" type:"bigDataFileShare">,
 <Datastore title:"/bigDataFileShares/ServiceCallsOrleansTest" type:"bigDataFileShare">,
 <Datastore title:"/bigDataFileShares/calls" type:"bigDataFileShare">,
 <Datastore title:"/bigDataFileShares/all_hurricanes" type:"bigDataFileShare">,
 <Datastore title:"/bigDataFileShares/NYCdata" type:"bigDataFileShare">,
 <Datastore title:"/bigDataFileShares/hurricanes_1848_1900" type:"bigDataFileShare">,
 <Datastore title:"/bigDataFileShares/ServiceCallsOrleans" type:"bigDataFileShare">,
 <Datastore title:"/bigDataFileShares/hurricanes_dask_csv" type:"bigDataFileShare">,
 <Datastore title:"/bigDataFileShares/hurricanes_dask_shp" type:"bigDataFileShare">,
 <Datastore title:"/cloudStores/cloud_store" type:"cloudStore">]

Get data for analysis

Adding a big data file share to the GeoAnalytics Server adds a corresponding big data file share item on the portal. We can search for these items using the item_type parameter.

Input
aqs_data = gis.content.search("bigDataFileShares_GA_Data", item_type = "big data file share")[0]
aqs_data
Output
bigDataFileShares_GA_Data
Big Data File Share by arcgis_python
Last Modified: May 27, 2021
0 comments, 0 views

Accessing the layers property of the item returns a list of layer objects; we will use the first one, which represents the air quality data.

Input
air_lyr = aqs_data.layers[0]
Input
air_lyr.properties
Output
{
  "dataStoreID": "4f498adb-8e5a-4b7e-96d1-9accacb26a50",
  "fields": [
    {
      "name": "State Code",
      "type": "esriFieldTypeInteger"
    },
    {
      "name": "County Code",
      "type": "esriFieldTypeInteger"
    },
    {
      "name": "Site Num",
      "type": "esriFieldTypeInteger"
    },
    {
      "name": "Parameter Code",
      "type": "esriFieldTypeInteger"
    },
    {
      "name": "POC",
      "type": "esriFieldTypeInteger"
    },
    {
      "name": "Latitude",
      "type": "esriFieldTypeDouble"
    },
    {
      "name": "Longitude",
      "type": "esriFieldTypeDouble"
    },
    {
      "name": "Datum",
      "type": "esriFieldTypeString"
    },
    {
      "name": "Parameter Name",
      "type": "esriFieldTypeString"
    },
    {
      "name": "Date Local",
      "type": "esriFieldTypeString"
    },
    {
      "name": "Time Local",
      "type": "esriFieldTypeString"
    },
    {
      "name": "Date GMT",
      "type": "esriFieldTypeString"
    },
    {
      "name": "Time GMT",
      "type": "esriFieldTypeString"
    },
    {
      "name": "Sample Measurement",
      "type": "esriFieldTypeDouble"
    },
    {
      "name": "Units of Measure",
      "type": "esriFieldTypeString"
    },
    {
      "name": "MDL",
      "type": "esriFieldTypeDouble"
    },
    {
      "name": "Uncertainty",
      "type": "esriFieldTypeString"
    },
    {
      "name": "Qualifier",
      "type": "esriFieldTypeString"
    },
    {
      "name": "Method Type",
      "type": "esriFieldTypeString"
    },
    {
      "name": "Method Code",
      "type": "esriFieldTypeInteger"
    },
    {
      "name": "Method Name",
      "type": "esriFieldTypeString"
    },
    {
      "name": "State Name",
      "type": "esriFieldTypeString"
    },
    {
      "name": "County Name",
      "type": "esriFieldTypeString"
    },
    {
      "name": "Date of Last Change",
      "type": "esriFieldTypeString"
    }
  ],
  "name": "air_quality",
  "geometryType": "esriGeometryPoint",
  "type": "featureClass",
  "spatialReference": {
    "wkid": 102682,
    "latestWkid": 3452
  },
  "geometry": {
    "fields": [
      {
        "name": "Longitude",
        "formats": [
          "x"
        ]
      },
      {
        "name": "Latitude",
        "formats": [
          "y"
        ]
      }
    ]
  },
  "time": {
    "timeType": "instant",
    "timeReference": {
      "timeZone": "UTC"
    },
    "fields": [
      {
        "name": "Date Local",
        "formats": [
          "yyyy-MM-dd"
        ]
      }
    ]
  },
  "currentVersion": 10.81,
  "children": []
}

Now we'll search for a feature layer depicting all the counties in the United States. We'll use the county polygons later in the notebook.

Input
usa_counties = gis.content.get('5c6ef8ef57934990b543708f815d606e')
Input
usa_counties
Output
usaCounties
Feature Layer Collection by api_data_owner
Last Modified: May 27, 2021
0 comments, 2 views

Since the item is a Feature Layer Collection, accessing its layers property will give us a list of Feature Layer objects. We will use the first layer for our analysis.

Input
usa_counties_lyr = usa_counties.layers[0]

Uncover patterns in data

Describe data

The describe_dataset method provides an overview of big data. By default, the tool outputs a table layer containing calculated field statistics and a dictionary outlining geometry and time settings for the input layer.

Optionally, the tool can output a feature layer representing a sample set of features using the sample_size parameter, or a single polygon feature layer representing the input feature layers' extent by setting the extent_output parameter to True.

Input
from arcgis.geoanalytics.summarize_data import describe_dataset
Input
description = describe_dataset(input_layer=air_lyr,
                               extent_output=True,
                               sample_size=1000,
                               output_name="Description of air quality data" + str(dt.now().microsecond),
                               return_tuple=True)
Input
description.output_json
Output
{'datasetName': 'air_quality',
 'datasetSource': 'Big Data File Share - Air_Auality_17_18_19',
 'recordCount': 147535365,
 'geometry': {'geometryType': 'Point',
  'sref': {'wkid': 4326},
  'countNonEmpty': 147535365,
  'countEmpty': 0,
  'spatialExtent': {'xmin': -161.767,
   'ymin': 17.953006,
   'xmax': -65.915482,
   'ymax': 64.84568999999999}},
 'time': {'timeType': 'Instant',
  'countNonEmpty': 147535365,
  'countEmpty': 0,
  'temporalExtent': {'start': '2017-01-01 00:00:00.000',
   'end': '2019-04-30 00:00:00.000'}}}

We can also use sql queries to return a subset of records by leveraging the ArcGIS API for Python's Feature Layer object itself. When you run a query() on a FeatureLayer, you get back a FeatureSet object. Calling the sdf property of the FeatureSet returns a Spatially Enabled DataFrame object.

Input
description.sample_layer.query().sdf
Output
State_Code County_Code Site_Num Parameter_Code POC Latitude Longitude Datum Parameter_Name Date_Local ... Method_Type Method_Code Method_Name State_Name County_Name Date_of_Last_Change INSTANT_DATETIME globalid OBJECTID SHAPE
0 56 35 700 62201 1 42.486361 -110.098861 WGS84 Relative Humidity 2017-07-05 ... Non-FRM 60 Instrumental - Vaisala 435C RH/AT Sensor Wyoming Sublette 2017-11-20 2017-07-05 {4CB48700-32CB-6126-E342-B783681A14AF} 1 {"x": -110.098861, "y": 42.486360999999995, "s...
1 9 9 27 88313 1 41.301400 -72.902871 WGS84 Black Carbon PM2.5 at 880 nm 2017-06-08 ... Non-FRM 894 Magee AE33/ TAPI M633 Aethalometer - Optical a... Connecticut New Haven 2018-01-09 2017-06-08 {3D99EDC1-EC50-6B1A-508F-A0034CD9DB02} 2 {"x": -72.90287099999999, "y": 41.3014, "spati...
2 45 79 7 42601 2 34.093959 -80.962304 WGS84 Nitric oxide (NO) 2018-06-28 ... Non-FRM 674 Instrumental - Chemiluminescence Thermo Electr... South Carolina Richland 2019-02-18 2018-06-28 {04ED8FBB-2ADF-AA44-BE30-F61E7A939657} 3 {"x": -80.962304, "y": 34.093959, "spatialRefe...
3 48 183 1 62101 1 32.378682 -94.711811 WGS84 Outdoor Temperature 2018-10-18 ... Non-FRM 40 INSTRUMENTAL - ELECTRONIC OR MACHINE AVG. Texas Gregg 2019-02-18 2018-10-18 {60E09F2E-891F-EEBD-1253-0FFA407BDEA4} 4 {"x": -94.711811, "y": 32.378682, "spatialRefe...
4 19 153 30 44201 1 41.603159 -93.643118 WGS84 Ozone 2017-12-23 ... FEM 47 INSTRUMENTAL - ULTRA VIOLET Iowa Polk 2018-01-12 2017-12-23 {73E73030-0883-7A32-ED9C-BD59ACC00A2F} 5 {"x": -93.643118, "y": 41.603159, "spatialRefe...
... ... ... ... ... ... ... ... ... ... ... ... ... ... ... ... ... ... ... ... ... ...
995 27 53 962 61104 1 44.965242 -93.254759 NAD83 Wind Direction - Resultant 2018-07-15 ... Non-FRM 63 Instrumental - Climatronics Minnesota Hennepin 2019-01-17 2018-07-15 {61736B2D-7C20-DF9E-37BA-21CF788F8FDD} 1003 {"x": -93.25475899999999, "y": 44.965241999999...
996 48 43 101 44201 1 29.302650 -103.177810 WGS84 Ozone 2018-02-21 ... FEM 47 INSTRUMENTAL - ULTRA VIOLET Texas Brewster 2018-04-19 2018-02-21 {E739DA0C-7A97-8059-72A3-6505490F990D} 1004 {"x": -103.17781, "y": 29.30265, "spatialRefer...
997 19 163 17 62101 1 41.467236 -90.688451 WGS84 Outdoor Temperature 2017-07-16 ... Non-FRM 40 INSTRUMENTAL - ELECTRONIC OR MACHINE AVG. Iowa Scott 2017-08-10 2017-07-16 {1B805C6B-7621-B9B2-9C25-E35DC16D466E} 1006 {"x": -90.688451, "y": 41.467236, "spatialRefe...
998 39 145 21 62101 1 38.600611 -82.829782 NAD83 Outdoor Temperature 2017-12-20 ... Non-FRM 40 INSTRUMENTAL - ELECTRONIC OR MACHINE AVG. Ohio Scioto 2018-01-12 2017-12-20 {965A3811-00DB-88A4-A734-959041D2CF67} 1007 {"x": -82.829782, "y": 38.600611, "spatialRefe...
999 6 31 4 61103 1 36.102244 -119.565650 NAD83 Wind Speed - Resultant 2017-09-28 ... Non-FRM 20 INSTRUMENTAL - VECTOR SUMMATION California Kings 2018-03-05 2017-09-28 {85F9DBB0-C17B-A74A-2717-29AACFCDB63B} 1008 {"x": -119.56565, "y": 36.102244, "spatialRefe...

1000 rows × 28 columns

Input
m1 = gis.map('USA')
m1
Output

Locations of the Air pollution monitors

Input
m1.add_layer(description.sample_layer)
Input
m1.zoom_to_layer(description.sample_layer)

Commonly used methods of measurement

The 'Method Name' attribute contains information about the type of instrument used for measurement. The 'Parameter Name' attribute gives the name or description assigned in AQS to the parameter measured by the monitor. For more details, read here.

The function below groups the data by both of these attributes. This will help us identify the most common type of instrument used to measure each parameter.

Input
def measurement_type():
    from datetime import datetime as dt
    # Load the big data file share layer into a DataFrame
    df = layers[0]
    out = df.groupBy('Method Name','Parameter Name').count()
    out.write.format("webgis").save("common_method_type" + str(dt.now().microsecond)) # Write the final result to our datastore.

The run_python_script method executes a Python script directly in an ArcGIS GeoAnalytics Server site. The script can create an analysis pipeline by chaining together multiple GeoAnalytics tools without writing intermediate results to a data store. The tool can also distribute Python functionality across the GeoAnalytics Server site.

When using the geoanalytics and pyspark packages, most functions return analysis results as Spark DataFrame memory structures. You can write these data frames to a data store or process them in a script. This lets you chain multiple geoanalytics and pyspark tools while only writing out the final result, eliminating the need to create any bulky intermediate result layers.

Input
run_python_script(code=measurement_type, layers=[air_lyr])

The result is saved as a layer item in the portal. We can search for the saved item using the search() method, providing the same name we used when writing the result.

Input
method_item = gis.content.search('common_method_type')[0]

Accessing the tables property of the item gives us a list of Table objects. We will then use the query() method to read the table as a dataframe.

Input
method_df = method_item.tables[0].query(as_df=True)

Sort the values in the decreasing order of the count field.

Input
method_df.sort_values(by='count', ascending=False)
Output
Method_Name Parameter_Name count globalid OBJECTID
89 INSTRUMENTAL - ULTRA VIOLET ABSORPTION Ozone 10650047 {1C333EB9-AB7D-9ACF-3116-09924757FD90} 112
6 INSTRUMENTAL - ELECTRONIC OR MACHINE AVG. Outdoor Temperature 9808818 {17DCDDD0-079F-5BF2-45C3-0BF485355EB0} 8
158 INSTRUMENTAL - ULTRA VIOLET Ozone 8634142 {EDA2D065-CE9E-2175-F094-8D9C2F6DCEBD} 198
290 INSTRUMENTAL - VECTOR SUMMATION Wind Direction - Resultant 7348868 {7154F73E-775C-4C1A-8775-FB99B0074348} 503
277 INSTRUMENTAL - VECTOR SUMMATION Wind Speed - Resultant 7267098 {4CCC027A-BABE-DF05-FC6B-9B04F013AF14} 455
... ... ... ... ... ...
100 Cooper Environmental Services model Xact 620 -... Vanadium PM10 LC 105 {ABFA136A-BE6E-79A7-FCA4-608CFABCB77F} 126
234 Cooper Environmental Services model Xact 620 -... Tin PM10 LC 105 {A5B11059-6FD5-EC9A-65E4-6D4B06B25FFF} 339
245 Cooper Environmental Services model Xact 620 -... Silver PM10 LC 105 {E56280AE-5F42-AD31-2D95-C3418CB40985} 356
12 Cooper Environmental Services model Xact 620 -... Calcium PM10 LC 105 {6FC4B1BA-A7E4-A69C-BB44-CC26A6FD935B} 17
38 The PM2.5 - Local Conditions 1 {E955A26F-4FC1-E8A3-40F1-D85F48D15ED9} 49

338 rows × 5 columns

The table above shows that Ozone is most commonly measured using the ULTRA VIOLET ABSORPTION method. We can also filter the dataframe to inspect the parameters we are interested in, as shown below.
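For instance, to see which instruments are used to measure PM2.5, we can filter the dataframe on the Parameter_Name column (a minimal sketch using the column names returned above; the parameter value matches the one used later in this notebook):

Input
# Keep only the rows describing PM2.5 measurements and list the most common methods.
pm25_methods = method_df[method_df['Parameter_Name'] == 'PM2.5 - Local Conditions']
pm25_methods.sort_values(by='count', ascending=False).head()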

Average PM 2.5 value by county

The function below filters the data to rows that report the PM2.5 pollutant. To find the average PM2.5 value for each county, we will use the join_features tool. Finally, we will write the output to the datastore.

Input
def average():
    from datetime import datetime as dt
    df = layers[0] # the air quality big data file share layer as a Spark DataFrame
    df = df.filter(df['Parameter Name'] == 'PM2.5 - Local Conditions') # keep only PM2.5 records
    # Average the sample measurements within each county polygon.
    res = geoanalytics.join_features(target_layer=layers[1], 
                                     join_layer=df, 
                                     join_operation="JoinOneToOne",
                                     summary_fields=[{'statisticType' : 'mean', 'onStatisticField' : 'Sample Measurement'}],
                                     spatial_relationship='Contains')
    res.write.format("webgis").save("average_pm_by_county" + str(dt.now().microsecond)) # write the final result to our datastore
Input
run_python_script(average, [air_lyr, usa_counties_lyr])
Input
average_pm_by_county = gis.content.search('average_pm_by_county')[0]
Input
average_pm_by_county
Output
average_pm_by_county63006
Feature Layer Collection by admin
Last Modified: July 02, 2020
0 comments, 0 views
Input
avg_pm = average_pm_by_county.layers[0]
Input
avg_pm.query(as_df=True).columns
Output
Index(['fid', 'state_name', 'fips', 'population', 'pop_sqmi', 'SHAPE__Length1',
       'SHAPE__Area1', 'COUNT', 'MEAN_Sample_Measurement', 'globalid',
       'OBJECTID', 'SHAPE'],
      dtype='object')
Input
m2 = gis.map('USA')
m2
Output
Input
m2.add_layer(avg_pm, {"type": "FeatureLayer",
                      "renderer":"ClassedColorRenderer",
                      "field_name":"MEAN_Sample_Measurement",
                      "class_breaks": 6})
Input
m2.zoom_to_layer(avg_pm)
m2.legend=True

Prepare time series data

We have observed that the data is spread across the US and comes from multiple stations, so we will create a dataframe that contains data points from a single station. Additionally, for the purposes of this sample, we will train our model on the 2017 and 2018 data and forecast on the 2019 data.

The function below creates a column that assigns a unique station id to each row of the data and then filters by one station id. The time series data is expected to have the time-dependent variables as columns of the dataframe, so we pivot the table on the 'Parameter Name' column.

Input
def data_processing():
    from datetime import datetime as dt
    import pyspark.sql.functions as F
    from pyspark.sql.functions import concat, col, lit
    # Load the big data file share layer into a DataFrame.
    df = layers[0] # converts the feature layer to a Spark DataFrame
    cols = ['Site Num', 'County Code', 'State Code', 'Date Local', 'Time Local', 'Parameter Name', 'Sample Measurement']
    df = df.select(cols) # create a subset of the dataset with only the selected columns
    # Zero-pad the station identifiers and combine them into a unique station id.
    df = df.withColumn('Site_Num', F.lpad(df['Site Num'], 4, '0'))
    df = df.withColumn('County_Code', F.lpad(df['County Code'], 3, '0'))
    df = df.withColumn('State_Code', F.lpad(df['State Code'], 2, '0'))
    df = df.withColumn('unique_id', F.concat(F.col('State_Code'), F.col('County_Code'), F.col('Site_Num')))
    df = df.drop('Site_Num', 'County_Code', 'State_Code', 'Site Num', 'County Code', 'State Code')
    # Combine the local date and time into a single datetime column.
    df = df.withColumn('datetime', concat(col("Date Local"), lit(" "), col("Time Local")))
    df = df.drop('Time Local', 'Date Local')
    df = df.filter(df.unique_id == df.first().unique_id) # filter by only one station
    # Group the dataframe by the datetime and unique_id fields and pivot the table
    # to get the variables needed for prediction as columns.
    df = df.groupby(df['datetime'], df['unique_id']).pivot("Parameter Name").avg("Sample Measurement")

    df.write.format("webgis").save("timeseries_data_17_18_19_1station" + str(dt.now().microsecond))
Input
run_python_script(code=data_processing, layers=[air_lyr], gis=gis)
Input
data = gis.content.search('timeseries_data_17_18_19_1station')[0]
Input
data
Output
timeseries_data_17_18_19_1station
Table Layer by admin
Last Modified: December 07, 2020
0 comments, 2 views
Input
series_data = data.tables[0]
Input
series_data.query(as_df=True)[:5]
Output
Barometric_pressure Carbon_monoxide Nitric_oxide__NO_ Nitrogen_dioxide__NO2_ OBJECTID Outdoor_Temperature Oxides_of_nitrogen__NOx_ Ozone PM10_Total_0_10um_STP PM2_5___Local_Conditions Reactive_oxides_of_nitrogen__NOy_ Relative_Humidity Sulfur_dioxide Wind_Direction___Resultant Wind_Speed___Resultant datetime globalid unique_id
0 1004.3 0.133 0.80 1.8 1 54.3 2.6 0.039 NaN 3.6 2.5 35.0 0.3 73.0 4.4 2017-02-13 10:00 {35E5A035-34CC-85FB-4955-22F1325C9A40} 010730023
1 996.3 NaN 0.15 12.3 2 46.4 12.6 0.018 NaN 13.2 13.0 59.0 0.6 73.0 3.5 2017-12-14 22:00 {B078A8A2-237E-4732-4C33-62B9D9F39D57} 010730023
2 995.8 NaN 31.40 21.5 3 66.2 55.9 0.007 50.0 19.0 48.5 69.0 6.4 150.0 2.0 2017-05-26 06:00 {F45C8958-8061-FA8D-DDC8-9325DB224B77} 010730023
3 992.6 NaN -0.05 2.7 4 57.4 2.5 0.035 NaN NaN 2.9 56.0 0.5 280.0 5.9 2018-01-22 22:00 {85796BC1-2FE1-86C6-23BE-E70D9DABD1F7} 010730023
4 1007.0 0.148 0.15 6.9 5 46.2 7.0 0.035 NaN 1.7 6.5 41.0 0.0 119.0 3.3 2017-02-09 17:00 {61606AF2-0DC2-52DF-3B01-B979C11CB37E} 010730023

Predict PM2.5 using Facebook's Prophet model

The function below uses a pyspark pandas UDF to apply Facebook's Prophet (fbprophet) model and forecast hourly PM2.5 values for January 2019.

Input
def predict_pm25():
    #imports
    from arcgis.gis import GIS
    gis = GIS(profile="your_enterprise_portal")
    from datetime import datetime as dt
    from pyspark.sql.functions import concat, col, lit
    import pandas as pd
    import numpy as np
    from fbprophet import Prophet
    from pyspark.sql.functions import pandas_udf, PandasUDFType
    from pyspark.sql.types import StructType, StructField, DoubleType, IntegerType, FloatType, TimestampType
    import warnings
    warnings.filterwarnings('ignore')
    
    df1 = layers[0] # converts the layer into a Spark DataFrame
    cols = ['Outdoor_Temperature', 'Ozone', 'PM10_Total_0_10um_STP',
        'PM2_5___Local_Conditions',
        'Wind_Direction___Resultant',
        'Wind_Speed___Resultant', 'datetime']
    df1 = df1.select(cols) #filter data by columns needed
    df1 = df1.withColumn('flag', lit(1))
    schema = StructType([StructField('ds', TimestampType(), True), #schema of the resulting dataframe
                         StructField('yhat_lower', FloatType(), True),
                         StructField('yhat_upper', FloatType(), True),
                         StructField('yhat', FloatType(), True),
                         StructField('y', FloatType(), True)])
    
    @pandas_udf(schema, PandasUDFType.GROUPED_MAP)
    def forecast_pm25(df):
        #prepare data 
        df['Date'] = df['datetime'].astype('datetime64[ns]')
        df['year'] = df['Date'].dt.year
        df.set_index('Date', inplace=True) 
        df.sort_index(inplace=True)
        v = pd.date_range(start='2016-12-31 23:00:00', periods=18265, freq='H', closed='right') #get date range
        newdf = pd.DataFrame(index=v) 
        # Fill missing dates 
        historical=pd.merge(newdf, df, how='left', left_index=True, right_index=True)
        historical.interpolate(method='time', inplace=True)
        historical.reset_index(inplace=True)
        historical.rename(columns={'index': 'ds', 'PM2_5___Local_Conditions': 'y'}, inplace=True)
        historical.fillna(0, inplace=True)
        # handle zero and negative values for pm
        for i,item in enumerate(historical['y']):
            if item<=0:
                historical['y'].iloc[i]=historical['y'].iloc[i-1]
            else:
                historical['y'].iloc[i]=item
                
        for i,item in enumerate(historical['PM10_Total_0_10um_STP']):
            if item<=0:
                historical['PM10_Total_0_10um_STP'].iloc[i]=historical['PM10_Total_0_10um_STP'].iloc[i-1]
            else:
                historical['PM10_Total_0_10um_STP'].iloc[i]=item        
         
        for i,item in enumerate(historical['Wind_Speed___Resultant']):
            if item<=0:
                historical['Wind_Speed___Resultant'].iloc[i]=historical['Wind_Speed___Resultant'].iloc[i-1]
            else:
                historical['Wind_Speed___Resultant'].iloc[i]=item
        
        for i,item in enumerate(historical['Wind_Direction___Resultant']):
            if item<=0:
                historical['Wind_Direction___Resultant'].iloc[i]=historical['Wind_Direction___Resultant'].iloc[i-1]
            else:
                historical['Wind_Direction___Resultant'].iloc[i]=item
        # split data into train and test        
        train_df = historical[historical.year != 2019]
        test_df = historical[historical.year == 2019]
        test_df.drop(columns='y', inplace=True)        
        # train model    
        m = Prophet(daily_seasonality=True,
                    weekly_seasonality=True)
        m.add_regressor('PM10_Total_0_10um_STP')
        m.add_regressor('Wind_Speed___Resultant')
        m.add_regressor('Wind_Direction___Resultant')
        m.fit(train_df);
        # predict on test data
        forecast = m.predict(test_df)
        # save plots locally
        plot1 = m.plot(forecast);
        plot2 = m.plot_components(forecast);
        plot1.savefig(r'/home/ags/localdatastore/fbdata/forecast.png')
        plot2.savefig(r'/home/ags/localdatastore/fbdata/cmponents.png')
        # Uncomment the following lines if you want to publish and visualize your graphs as an item.
#         gis = GIS('https://machinename/portal', 'username', 'password')
#         gis.content.add(item_properties={"type": "Image", "title": "Forecast Plot"}, data=r"/home/ags/localdatastore/fbdata/forecast.png")
#         gis.content.add(item_properties={"type": "Image", "title": "Forecast Components"}, data=r"/home/ags/localdatastore/fbdata/cmponents.png")
        # create df with actual and predicted fields
        cmp_df = forecast.set_index('ds')[['yhat', 'yhat_lower', 'yhat_upper']].join(historical.set_index('ds'))
        cmp_df.reset_index(inplace=True)
        cmp_df = cmp_df[['ds', 'yhat_lower', 'yhat_upper', 'yhat', 'y']]
        return cmp_df
    res = df1.groupby(['flag']).apply(forecast_pm25)

    res.write.format("webgis").save("predicted_results_on_test_data" + str(dt.now().microsecond))
Input
run_python_script(code=predict_pm25, layers=[series_data])
Input
predicted_item = gis.content.search('predicted_results_on_test_data854829')[0]
Input
predicted_item
Output
predicted_results_on_test_data854829
Table Layer by admin
Last Modified: June 24, 2020
0 comments, 366 views
Input
predicted_df = predicted_item.tables[0].query().sdf
Input
predicted_df.columns
Output
Index(['ds', 'yhat_lower', 'yhat_upper', 'yhat', 'y', 'globalid', 'OBJECTID'], dtype='object')
Input
predicted_df.head()
Output
ds yhat_lower yhat_upper yhat y globalid OBJECTID
0 2018-12-31 18:30:00 1.357769 15.941110 8.691076 5.5 {7BCD0AF8-F5B1-4A7E-7118-88589E0A4DAD} 44
1 2018-12-31 19:30:00 1.141512 16.265352 8.773848 5.7 {747EF178-8A36-5CCC-74A3-4BAD71DB0553} 244
2 2018-12-31 20:30:00 0.389374 15.927957 7.641061 5.4 {7D6C9381-7279-AB10-C5F7-AE138BFD6AE0} 444
3 2018-12-31 21:30:00 -0.164141 15.033084 7.111085 4.4 {B9BD44CF-F794-87F3-E389-E654CDCE8844} 644
4 2018-12-31 22:30:00 -0.587776 14.489634 7.002420 4.4 {B7C8C5C7-1C9E-E66A-CA41-39E5F0D601FA} 844

In the table above, the y attribute shows the actual PM2.5 values and the yhat attribute shows the values predicted by the trained model.
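As a quick sanity check of forecast quality, we can compare the actual and predicted values, for example by computing the mean absolute error and the share of actual values that fall inside the forecast interval (a minimal sketch, assuming predicted_df is the dataframe loaded above):

Input
# Mean absolute error between the actual (y) and forecast (yhat) values.
mae = (predicted_df['y'] - predicted_df['yhat']).abs().mean()
# Fraction of actual values covered by the [yhat_lower, yhat_upper] interval.
coverage = ((predicted_df['y'] >= predicted_df['yhat_lower']) &
            (predicted_df['y'] <= predicted_df['yhat_upper'])).mean()
print(f"Mean absolute error: {mae:.2f}")
print(f"Actual values within the forecast interval: {coverage:.0%}")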

Visualize result on Dashboard

The arcgis.apps module includes a dashboard submodule for creating dashboards programmatically. The dashboard submodule contains classes for different widgets that can be configured and used to publish a dashboard.

We want to visualize our predicted results on a dashboard. To learn more about the dashboard module, visit the guide here.

Input
from arcgis.apps.dashboard import Dashboard, SerialChart, add_row
Input
chart = SerialChart(predicted_item,  #Create a serial chart
                    categories_from="features", 
                    title="Forecast of PM2.5 for January 2019") 

chart.data.category_field = "ds" #set category field

chart.category_axis.title = "datetime" #set title for x axis

chart.value_axis.title = "pm2.5" #set title for y axis

#set fields to visualize on y axis
chart.data.add_value_field('y', line_color='#CB4335') 
chart.data.add_value_field('yhat', line_color='#2980B9')
chart.data.add_value_field('yhat_upper', line_color='#CACFD2')
chart.data.add_value_field('yhat_lower', line_color='#CACFD2')

chart.legend.visibility = True 
chart.legend.placement = "side"
chart.category_axis.minimum_period = 'hours'
chart.data.labels = False
chart.data.hover_text = True
chart
Output
Input
agol_gis = GIS('home')
Input
dashboard = Dashboard() #creates a Dashboard object
dashboard.layout = add_row([chart]) #adds one chart to the Dashboard
dashboard.save('pm2.5_dashboard_2019_jan',  #publishes the dashboard to the portal
               gis=agol_gis)
Output
pm2.5_dashboard_2019_jan
Dashboard by api_data_owner
Last Modified: December 02, 2020
0 comments, 0 views

Conclusion

In this notebook, we learned how to use Spark with GeoAnalytics Server to carry out distributed analysis of big data.
