Forecasting PM2.5 using big data analysis

Prerequisites

  • The tools available in the ArcGIS API for Python geoanalytics module require an ArcGIS Enterprise deployment licensed and configured with a Linux-based ArcGIS GeoAnalytics Server.
  • When ArcGIS GeoAnalytics Server is installed on Linux, additional configuration steps are required before using the RunPythonScript operation. Install and configure Python 3.6 for Linux on each machine in your GeoAnalytics Server site, ensuring that Python is installed into the same directory on each machine. For this analysis, you also need to install the FB Prophet (fbprophet) library in the same environment. Then, update the ArcGIS Server properties of your GeoAnalytics Server site with the pysparkPython property. The value of this property should be the path to the Python executable on your GeoAnalytics Server machines, for example, {"pysparkPython":"path/to/environment"}. A sketch of setting this property through the ArcGIS Server Administrator API is shown after this list.
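The snippet below is a minimal sketch of setting the pysparkPython property programmatically through the ArcGIS Server Administrator API. The server URL, credentials, and Python path are placeholders for your own GeoAnalytics Server site, and the exact admin endpoint and port may differ in your deployment.

import requests

admin_url = "https://gaserver.domain.com:6443/arcgis/admin"  # placeholder admin URL of your GeoAnalytics Server site
username, password = "admin_user", "admin_password"          # placeholder administrator credentials

# Generate an administrative token.
token = requests.post(f"{admin_url}/generateToken",
                      data={"username": username, "password": password,
                            "client": "requestip", "f": "json"},
                      verify=False).json()["token"]  # verify=False only if the site uses a self-signed certificate

# Point pysparkPython at the Python environment that has fbprophet installed
# (the path must be identical on every machine in the site).
properties = '{"pysparkPython": "/opt/miniconda3/envs/prophet/bin/python"}'
response = requests.post(f"{admin_url}/system/properties/update",
                         data={"properties": properties, "token": token, "f": "json"},
                         verify=False).json()
print(response)  # expect {"status": "success"} once the property is applied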

Introduction

While the arcgis.geoanalytics module offers powerful spatial analysis tools, the pyspark package includes dozens of non-spatial distributed tools for classification, prediction, clustering, and more. Using the power of distributed compute, we can analyze large datasets much faster than non-distributed systems can.

This notebook demonstrates the capability of the Spark-powered GeoAnalytics Server to forecast hourly PM2.5 values given historic time series data for more than one time-dependent variable. The variables used in this analysis are PM2.5, PM10, wind speed, wind direction, and relative humidity. The levels of these pollutants are measured by the US Environmental Protection Agency (EPA), which monitors and regulates overall air quality, and we use the dataset provided by the EPA. With Spark, we will learn how to customize and extend our analysis capabilities by:

  • Querying and summarizing your data using SQL
  • Turning analysis workflows into pipelines of GeoAnalytics tools
  • Modeling data with included machine learning libraries

The forecasted results will then be displayed on a dashboard using the recently introduced arcgis.apps.dashboard module in the ArcGIS API for Python.

Note:

  • The ability to perform big data analysis is only available on ArcGIS Enterprise licensed with a GeoAnalytics server and not yet available on ArcGIS Online.

Necessary imports

from datetime import datetime as dt
import pandas as pd

import arcgis
from arcgis.gis import GIS
from arcgis.geoanalytics.manage_data import run_python_script

Connect to your ArcGIS Enterprise organization

gis = GIS('https://pythonapi.playground.esri.com/portal', 'arcgis_python', 'amazing_arcgis_123')

Ensure your GIS supports GeoAnalytics

After connecting to the Enterprise organization, we need to ensure an ArcGIS Enterprise GIS is set up with a licensed GeoAnalytics server. To do so, we will call the is_supported() method.

arcgis.geoanalytics.is_supported()
True

Prepare data

To register a file share or an HDFS, we need to format datasets as subfolders within a single parent folder and register the parent folder. This parent folder becomes a datastore, and each subfolder becomes a dataset. Our folder hierarchy would look similar to the following example:
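The layout below is only illustrative; your dataset folder and file names will differ:

air_quality_datastore        <- parent folder registered as the big data file share
├── air_quality              <- becomes one dataset
│   ├── air_quality_2017.csv
│   ├── air_quality_2018.csv
│   └── air_quality_2019.csv
└── weather_stations         <- becomes another dataset
    └── stations.csv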

Learn more about preparing your big data file share datasets here.

The get_datastores() method of the geoanalytics module returns a DatastoreManager object that lets you search for and manage the big data file share items as Python API Datastore objects on your GeoAnalytics server.

bigdata_datastore_manager = arcgis.geoanalytics.get_datastores()
bigdata_datastore_manager
<DatastoreManager for https://pythonapi.playground.esri.com/ga/admin>

We will register air quality data as a big data file share using the add_bigdata() function on a DatastoreManager object.

When we register a directory, all subdirectories under the specified folder are also registered with the server. Always register the parent folder (for example, \\machinename\mydatashare) that contains one or more individual dataset folders as the big data file share item. To learn more, see register a big data file share.

Note: You cannot browse directories in ArcGIS Server Manager. You must provide the full path to the folder you want to register, for example, \\myserver\share\bigdata. Avoid using local paths, such as C:\bigdata, unless the same data folder is available on all nodes of the server site.

# data_item = bigdata_datastore_manager.add_bigdata("air_quality_17_18_19", r"/mnt/network/data")
bigdata_fileshares = bigdata_datastore_manager.search()
bigdata_fileshares
[<Datastore title:"/bigDataFileShares/NYC_taxi_data15" type:"bigDataFileShare">,
 <Datastore title:"/bigDataFileShares/GA_Data" type:"bigDataFileShare">,
 <Datastore title:"/bigDataFileShares/ServiceCallsOrleansTest" type:"bigDataFileShare">,
 <Datastore title:"/bigDataFileShares/calls" type:"bigDataFileShare">,
 <Datastore title:"/bigDataFileShares/all_hurricanes" type:"bigDataFileShare">,
 <Datastore title:"/bigDataFileShares/NYCdata" type:"bigDataFileShare">,
 <Datastore title:"/bigDataFileShares/hurricanes_1848_1900" type:"bigDataFileShare">,
 <Datastore title:"/bigDataFileShares/ServiceCallsOrleans" type:"bigDataFileShare">,
 <Datastore title:"/bigDataFileShares/hurricanes_dask_csv" type:"bigDataFileShare">,
 <Datastore title:"/bigDataFileShares/hurricanes_dask_shp" type:"bigDataFileShare">,
 <Datastore title:"/cloudStores/cloud_store" type:"cloudStore">]

Get data for analysis

Adding a big data file share to the GeoAnalytics server adds a corresponding big data file share item on the portal. We can search for these types of items using the item_type parameter.

aqs_data = gis.content.search("bigDataFileShares_GA_Data", item_type = "big data file share")[0]
aqs_data
bigDataFileShares_GA_Data
Big Data File Share by arcgis_python
Last Modified: May 27, 2021
0 comments, 0 views

Accessing the layers property of the item returns a list of layer objects; the first layer represents the air quality data.

air_lyr = aqs_data.layers[0]
air_lyr.properties
{
  "dataStoreID": "4f498adb-8e5a-4b7e-96d1-9accacb26a50",
  "fields": [
    {
      "name": "State Code",
      "type": "esriFieldTypeInteger"
    },
    {
      "name": "County Code",
      "type": "esriFieldTypeInteger"
    },
    {
      "name": "Site Num",
      "type": "esriFieldTypeInteger"
    },
    {
      "name": "Parameter Code",
      "type": "esriFieldTypeInteger"
    },
    {
      "name": "POC",
      "type": "esriFieldTypeInteger"
    },
    {
      "name": "Latitude",
      "type": "esriFieldTypeDouble"
    },
    {
      "name": "Longitude",
      "type": "esriFieldTypeDouble"
    },
    {
      "name": "Datum",
      "type": "esriFieldTypeString"
    },
    {
      "name": "Parameter Name",
      "type": "esriFieldTypeString"
    },
    {
      "name": "Date Local",
      "type": "esriFieldTypeString"
    },
    {
      "name": "Time Local",
      "type": "esriFieldTypeString"
    },
    {
      "name": "Date GMT",
      "type": "esriFieldTypeString"
    },
    {
      "name": "Time GMT",
      "type": "esriFieldTypeString"
    },
    {
      "name": "Sample Measurement",
      "type": "esriFieldTypeDouble"
    },
    {
      "name": "Units of Measure",
      "type": "esriFieldTypeString"
    },
    {
      "name": "MDL",
      "type": "esriFieldTypeDouble"
    },
    {
      "name": "Uncertainty",
      "type": "esriFieldTypeString"
    },
    {
      "name": "Qualifier",
      "type": "esriFieldTypeString"
    },
    {
      "name": "Method Type",
      "type": "esriFieldTypeString"
    },
    {
      "name": "Method Code",
      "type": "esriFieldTypeInteger"
    },
    {
      "name": "Method Name",
      "type": "esriFieldTypeString"
    },
    {
      "name": "State Name",
      "type": "esriFieldTypeString"
    },
    {
      "name": "County Name",
      "type": "esriFieldTypeString"
    },
    {
      "name": "Date of Last Change",
      "type": "esriFieldTypeString"
    }
  ],
  "name": "air_quality",
  "geometryType": "esriGeometryPoint",
  "type": "featureClass",
  "spatialReference": {
    "wkid": 102682,
    "latestWkid": 3452
  },
  "geometry": {
    "fields": [
      {
        "name": "Longitude",
        "formats": [
          "x"
        ]
      },
      {
        "name": "Latitude",
        "formats": [
          "y"
        ]
      }
    ]
  },
  "time": {
    "timeType": "instant",
    "timeReference": {
      "timeZone": "UTC"
    },
    "fields": [
      {
        "name": "Date Local",
        "formats": [
          "yyyy-MM-dd"
        ]
      }
    ]
  },
  "currentVersion": 10.81,
  "children": []
}

Now we'll search for a feature layer depicting all the counties in the United States. We'll use the county polygons later in the notebook.

usa_counties = gis.content.get('5c6ef8ef57934990b543708f815d606e')
usa_counties
usaCounties
Feature Layer Collection by api_data_owner
Last Modified: May 27, 2021
0 comments, 2 views

We will use this item for our analysis. Since the item is a Feature Layer Collection, accessing its layers property will give us a list of FeatureLayer objects.

usa_counties_lyr = usa_counties.layers[0]

Uncover patterns in data

Describe data

The describe_dataset method provides an overview of big data. By default, the tool outputs a table layer containing calculated field statistics and a dictionary outlining geometry and time settings for the input layer.

Optionally, the tool can output a feature layer representing a sample set of features using the sample_size parameter, or a single polygon feature layer representing the input feature layers' extent by setting the extent_output parameter to True.

from arcgis.geoanalytics.summarize_data import describe_dataset
description = describe_dataset(input_layer=air_lyr,
                               extent_output=True,
                               sample_size=1000,
                               output_name="Description of air quality data" + str(dt.now().microsecond),
                               return_tuple=True)
description.output_json
{'datasetName': 'air_quality',
 'datasetSource': 'Big Data File Share - Air_Auality_17_18_19',
 'recordCount': 147535365,
 'geometry': {'geometryType': 'Point',
  'sref': {'wkid': 4326},
  'countNonEmpty': 147535365,
  'countEmpty': 0,
  'spatialExtent': {'xmin': -161.767,
   'ymin': 17.953006,
   'xmax': -65.915482,
   'ymax': 64.84568999999999}},
 'time': {'timeType': 'Instant',
  'countNonEmpty': 147535365,
  'countEmpty': 0,
  'temporalExtent': {'start': '2017-01-01 00:00:00.000',
   'end': '2019-04-30 00:00:00.000'}}}
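Since output_json is a regular Python dictionary, individual statistics can be read from it directly, for example:

record_count = description.output_json['recordCount']                   # total number of records
spatial_extent = description.output_json['geometry']['spatialExtent']   # bounding box of the data
temporal_extent = description.output_json['time']['temporalExtent']     # time range covered by the data
print(record_count, temporal_extent)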

We can also use SQL queries to return a subset of records by leveraging the ArcGIS API for Python's FeatureLayer object itself. When you run query() on a FeatureLayer, you get back a FeatureSet object. Calling the sdf property of the FeatureSet returns a Spatially Enabled DataFrame object.

description.sample_layer.query().sdf
State_CodeCounty_CodeSite_NumParameter_CodePOCLatitudeLongitudeDatumParameter_NameDate_Local...Method_TypeMethod_CodeMethod_NameState_NameCounty_NameDate_of_Last_ChangeINSTANT_DATETIMEglobalidOBJECTIDSHAPE
0563570062201142.486361-110.098861WGS84Relative Humidity2017-07-05...Non-FRM60Instrumental - Vaisala 435C RH/AT SensorWyomingSublette2017-11-202017-07-05{4CB48700-32CB-6126-E342-B783681A14AF}1{"x": -110.098861, "y": 42.486360999999995, "s...
1992788313141.301400-72.902871WGS84Black Carbon PM2.5 at 880 nm2017-06-08...Non-FRM894Magee AE33/ TAPI M633 Aethalometer - Optical a...ConnecticutNew Haven2018-01-092017-06-08{3D99EDC1-EC50-6B1A-508F-A0034CD9DB02}2{"x": -72.90287099999999, "y": 41.3014, "spati...
24579742601234.093959-80.962304WGS84Nitric oxide (NO)2018-06-28...Non-FRM674Instrumental - Chemiluminescence Thermo Electr...South CarolinaRichland2019-02-182018-06-28{04ED8FBB-2ADF-AA44-BE30-F61E7A939657}3{"x": -80.962304, "y": 34.093959, "spatialRefe...
348183162101132.378682-94.711811WGS84Outdoor Temperature2018-10-18...Non-FRM40INSTRUMENTAL - ELECTRONIC OR MACHINE AVG.TexasGregg2019-02-182018-10-18{60E09F2E-891F-EEBD-1253-0FFA407BDEA4}4{"x": -94.711811, "y": 32.378682, "spatialRefe...
4191533044201141.603159-93.643118WGS84Ozone2017-12-23...FEM47INSTRUMENTAL - ULTRA VIOLETIowaPolk2018-01-122017-12-23{73E73030-0883-7A32-ED9C-BD59ACC00A2F}5{"x": -93.643118, "y": 41.603159, "spatialRefe...
..................................................................
995275396261104144.965242-93.254759NAD83Wind Direction - Resultant2018-07-15...Non-FRM63Instrumental - ClimatronicsMinnesotaHennepin2019-01-172018-07-15{61736B2D-7C20-DF9E-37BA-21CF788F8FDD}1003{"x": -93.25475899999999, "y": 44.965241999999...
996484310144201129.302650-103.177810WGS84Ozone2018-02-21...FEM47INSTRUMENTAL - ULTRA VIOLETTexasBrewster2018-04-192018-02-21{E739DA0C-7A97-8059-72A3-6505490F990D}1004{"x": -103.17781, "y": 29.30265, "spatialRefer...
997191631762101141.467236-90.688451WGS84Outdoor Temperature2017-07-16...Non-FRM40INSTRUMENTAL - ELECTRONIC OR MACHINE AVG.IowaScott2017-08-102017-07-16{1B805C6B-7621-B9B2-9C25-E35DC16D466E}1006{"x": -90.688451, "y": 41.467236, "spatialRefe...
998391452162101138.600611-82.829782NAD83Outdoor Temperature2017-12-20...Non-FRM40INSTRUMENTAL - ELECTRONIC OR MACHINE AVG.OhioScioto2018-01-122017-12-20{965A3811-00DB-88A4-A734-959041D2CF67}1007{"x": -82.829782, "y": 38.600611, "spatialRefe...
999631461103136.102244-119.565650NAD83Wind Speed - Resultant2017-09-28...Non-FRM20INSTRUMENTAL - VECTOR SUMMATIONCaliforniaKings2018-03-052017-09-28{85F9DBB0-C17B-A74A-2717-29AACFCDB63B}1008{"x": -119.56565, "y": 36.102244, "spatialRefe...

1000 rows × 28 columns
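To work with only a subset of records, a SQL where clause can also be passed to query(). A minimal example, assuming the sample layer behaves like a standard feature layer and using the State_Name field shown in the output above:

california_df = description.sample_layer.query(where="State_Name = 'California'", as_df=True)
california_df.head()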

m1 = gis.map('USA')
m1

Locations of the Air pollution monitors

m1.add_layer(description.sample_layer)
m1.zoom_to_layer(description.sample_layer)

Commonly used methods of measurement

The 'Method Name' attribute contains information about the type of instrument used for the measurement. The 'Parameter Name' attribute gives the name or description assigned in AQS to the parameter measured by the monitor. For more details, read here.

The function below groups the data by both of these attributes. This will help us identify the most common type of instrument used to measure each parameter.

def measurement_type():
    from datetime import datetime as dt
    # 'layers' is injected into the script's scope by run_python_script;
    # layers[0] is the big data file share layer loaded as a Spark DataFrame.
    df = layers[0]
    # Count the number of records for each instrument/parameter combination.
    out = df.groupBy('Method Name', 'Parameter Name').count()
    out.write.format("webgis").save("common_method_type" + str(dt.now().microsecond)) # Write the final result to our datastore.

The run_python_script method executes a Python script directly in an ArcGIS GeoAnalytics Server site. The script can create an analysis pipeline by chaining together multiple GeoAnalytics tools without writing intermediate results to a data store. The tool can also distribute Python functionality across the GeoAnalytics Server site.

When using the geoanalytics and pyspark packages, most functions return analysis results as Spark DataFrame memory structures. You can write these data frames to a data store or process them in a script. This lets you chain multiple geoanalytics and pyspark tools while only writing out the final result, eliminating the need to create any bulky intermediate result layers.

run_python_script(code=measurement_type, layers=[air_lyr])

The result is saved as a layer item in the portal. We can search for the saved item using the search() method; providing the same keyword we used when writing the result will retrieve it.

method_item = gis.content.search('common_method_type')[0]

Accessing the tables property of the item gives us a list of table objects. We will then use the query() method to read the table as a spatially enabled DataFrame.

method_df = method_item.tables[0].query(as_df=True)

Sort the values in the decreasing order of the count field.

method_df.sort_values(by='count', ascending=False)
     Method_Name                                        Parameter_Name              count     globalid                                OBJECTID
89   INSTRUMENTAL - ULTRA VIOLET ABSORPTION             Ozone                       10650047  {1C333EB9-AB7D-9ACF-3116-09924757FD90}  112
6    INSTRUMENTAL - ELECTRONIC OR MACHINE AVG.          Outdoor Temperature         9808818   {17DCDDD0-079F-5BF2-45C3-0BF485355EB0}  8
158  INSTRUMENTAL - ULTRA VIOLET                        Ozone                       8634142   {EDA2D065-CE9E-2175-F094-8D9C2F6DCEBD}  198
290  INSTRUMENTAL - VECTOR SUMMATION                    Wind Direction - Resultant  7348868   {7154F73E-775C-4C1A-8775-FB99B0074348}  503
277  INSTRUMENTAL - VECTOR SUMMATION                    Wind Speed - Resultant      7267098   {4CCC027A-BABE-DF05-FC6B-9B04F013AF14}  455
...  ...                                                ...                         ...       ...                                     ...
100  Cooper Environmental Services model Xact 620 -...  Vanadium PM10 LC            105       {ABFA136A-BE6E-79A7-FCA4-608CFABCB77F}  126
234  Cooper Environmental Services model Xact 620 -...  Tin PM10 LC                 105       {A5B11059-6FD5-EC9A-65E4-6D4B06B25FFF}  339
245  Cooper Environmental Services model Xact 620 -...  Silver PM10 LC              105       {E56280AE-5F42-AD31-2D95-C3418CB40985}  356
12   Cooper Environmental Services model Xact 620 -...  Calcium PM10 LC             105       {6FC4B1BA-A7E4-A69C-BB44-CC26A6FD935B}  17
38   The                                                PM2.5 - Local Conditions    1         {E955A26F-4FC1-E8A3-40F1-D85F48D15ED9}  49

338 rows × 5 columns

The table above shows that Ozone is measured using the ULTRA VIOLET ABSORPTION method. We can filter the dataframe to look up the methods for the parameters we are interested in, for example:
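A minimal sketch of such a filter, using the column names shown in the table above:

# Measurement methods used for PM2.5, most common first
pm25_methods = method_df[method_df['Parameter_Name'] == 'PM2.5 - Local Conditions']
pm25_methods.sort_values(by='count', ascending=False).head()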

Average PM 2.5 value by county

The function below filters the data to rows that contain information about the PM2.5 pollutant. To find the average PM2.5 value for each county, we will use the join_features tool. Finally, we will write the output to the datastore.

def average():
    from datetime import datetime as dt
    # layers[0] is the air quality layer; layers[1] is the USA counties layer.
    df = layers[0]
    # Keep only the PM2.5 records.
    df = df.filter(df['Parameter Name'] == 'PM2.5 - Local Conditions')
    # The 'geoanalytics' module is available inside run_python_script. Join the PM2.5 points
    # to the county polygons that contain them and calculate the mean sample measurement per county.
    res = geoanalytics.join_features(target_layer=layers[1], 
                                     join_layer=df, 
                                     join_operation="JoinOneToOne",
                                     summary_fields=[{'statisticType' : 'mean', 'onStatisticField' : 'Sample Measurement'}],
                                     spatial_relationship='Contains')
    # Write the final result to our datastore.
    res.write.format("webgis").save("average_pm_by_county" + str(dt.now().microsecond))
run_python_script(average, [air_lyr, usa_counties_lyr])
average_pm_by_county = gis.content.search('average_pm_by_county')[0]
average_pm_by_county
average_pm_by_county63006
Feature Layer Collection by admin
Last Modified: July 02, 2020
0 comments, 0 views
avg_pm = average_pm_by_county.layers[0]
avg_pm.query(as_df=True).columns
Index(['fid', 'state_name', 'fips', 'population', 'pop_sqmi', 'SHAPE__Length1',
       'SHAPE__Area1', 'COUNT', 'MEAN_Sample_Measurement', 'globalid',
       'OBJECTID', 'SHAPE'],
      dtype='object')
m2 = gis.map('USA')
m2
m2.add_layer(avg_pm, {"type": "FeatureLayer",
                      "renderer":"ClassedColorRenderer",
                      "field_name":"MEAN_Sample_Measurement",
                      "class_breaks": 6})
m2.zoom_to_layer(avg_pm)
m2.legend=True

Prepare time series data

We have observed that the data is spread across the US and comes from multiple stations, so we will create a dataframe that contains the data points from a single station. Additionally, for the purposes of this sample, we will only use the 2017 and 2018 data to train our model and forecast the 2019 data.

The function below creates a column that assigns a unique station id to each row of the data, and we then filter by one station id. The time series data is expected to have the time-dependent variables as attributes of the dataframe, so we pivot the table along the 'Parameter Name' column.

def data_processing():
    from datetime import datetime as dt
    import pyspark.sql.functions as F
    from pyspark.sql.functions import concat, col, lit
    # Load the big data file share layer into a Spark DataFrame.
    df = layers[0]
    # Create a subset of the dataset with only the columns we need.
    cols = ['Site Num', 'County Code', 'State Code', 'Date Local', 'Time Local', 'Parameter Name', 'Sample Measurement']
    df = df.select(cols)
    # Zero-pad the codes and concatenate them into a unique station id.
    df = df.withColumn('Site_Num', F.lpad(df['Site Num'], 4, '0'))
    df = df.withColumn('County_Code', F.lpad(df['County Code'], 3, '0'))
    df = df.withColumn('State_Code', F.lpad(df['State Code'], 2, '0'))
    df = df.withColumn('unique_id', F.concat(F.col('State_Code'), F.col('County_Code'), F.col('Site_Num')))
    df = df.drop('Site_Num', 'County_Code', 'State_Code', 'Site Num', 'County Code', 'State Code')
    # Combine the local date and time into a single datetime column.
    df = df.withColumn('datetime', concat(col("Date Local"), lit(" "), col("Time Local")))
    df = df.drop('Time Local', 'Date Local')
    df = df.filter(df.unique_id == df.first().unique_id) # filter by only one station
    # Group the dataframe by the datetime and unique_id fields and pivot the table
    # to get the variables needed for prediction as columns.
    df = df.groupby(df['datetime'], df['unique_id']).pivot("Parameter Name").avg("Sample Measurement")

    df.write.format("webgis").save("timeseries_data_17_18_19_1station" + str(dt.now().microsecond))
run_python_script(code=data_processing, layers=[air_lyr], gis=gis)
data = gis.content.search('timeseries_data_17_18_19_1station')[0]
data
timeseries_data_17_18_19_1station
Table Layer by admin
Last Modified: December 07, 2020
0 comments, 2 views
series_data = data.tables[0]
series_data.query(as_df=True)[:5]
Barometric_pressureCarbon_monoxideNitric_oxide__NO_Nitrogen_dioxide__NO2_OBJECTIDOutdoor_TemperatureOxides_of_nitrogen__NOx_OzonePM10_Total_0_10um_STPPM2_5___Local_ConditionsReactive_oxides_of_nitrogen__NOy_Relative_HumiditySulfur_dioxideWind_Direction___ResultantWind_Speed___Resultantdatetimeglobalidunique_id
01004.30.1330.801.8154.32.60.039NaN3.62.535.00.373.04.42017-02-13 10:00{35E5A035-34CC-85FB-4955-22F1325C9A40}010730023
1996.3NaN0.1512.3246.412.60.018NaN13.213.059.00.673.03.52017-12-14 22:00{B078A8A2-237E-4732-4C33-62B9D9F39D57}010730023
2995.8NaN31.4021.5366.255.90.00750.019.048.569.06.4150.02.02017-05-26 06:00{F45C8958-8061-FA8D-DDC8-9325DB224B77}010730023
3992.6NaN-0.052.7457.42.50.035NaNNaN2.956.00.5280.05.92018-01-22 22:00{85796BC1-2FE1-86C6-23BE-E70D9DABD1F7}010730023
41007.00.1480.156.9546.27.00.035NaN1.76.541.00.0119.03.32017-02-09 17:00{61606AF2-0DC2-52DF-3B01-B979C11CB37E}010730023

Predict PM2.5 using Facebook's Prophet model

The function below uses a pyspark pandas UDF together with the fbprophet Prophet model to forecast hourly PM2.5 values for January 2019.

def predict_pm25():
    #imports
    from arcgis.gis import GIS
    gis = GIS(profile="your_enterprise_portal")
    from datetime import datetime as dt
    from pyspark.sql.functions import concat, col, lit
    import pandas as pd
    import numpy as np
    from fbprophet import Prophet
    from pyspark.sql.functions import pandas_udf, PandasUDFType
    from pyspark.sql.types import StructType, StructField, DoubleType, IntegerType, FloatType, TimestampType
    import warnings
    warnings.filterwarnings('ignore')
    
    df1 = layers[0] # converts the layer into a Spark DataFrame
    cols = ['Outdoor_Temperature', 'Ozone', 'PM10_Total_0_10um_STP',
        'PM2_5___Local_Conditions',
        'Wind_Direction___Resultant',
        'Wind_Speed___Resultant', 'datetime']
    df1 = df1.select(cols) #filter data by columns needed
    df1 = df1.withColumn('flag', lit(1))
    schema = StructType([StructField('ds', TimestampType(), True), #schema of the resulting dataframe
                         StructField('yhat_lower', FloatType(), True),
                         StructField('yhat_upper', FloatType(), True),
                         StructField('yhat', FloatType(), True),
                         StructField('y', FloatType(), True)])
    
    @pandas_udf(schema, PandasUDFType.GROUPED_MAP)
    def forecast_pm25(df):
        #prepare data 
        df['Date'] = df['datetime'].astype('datetime64[ns]')
        df['year'] = df['Date'].dt.year
        df.set_index('Date', inplace=True) 
        df.sort_index(inplace=True)
        v = pd.date_range(start='2016-12-31 23:00:00', periods=18265, freq='H', closed='right') #get date range
        newdf = pd.DataFrame(index=v) 
        # Fill missing dates 
        historical=pd.merge(newdf, df, how='left', left_index=True, right_index=True)
        historical.interpolate(method='time', inplace=True)
        historical.reset_index(inplace=True)
        historical.rename(columns={'index': 'ds', 'PM2_5___Local_Conditions': 'y'}, inplace=True)
        historical.fillna(0, inplace=True)
        # handle zero and negative values for pm
        for i,item in enumerate(historical['y']):
            if item<=0:
                historical['y'].iloc[i]=historical['y'].iloc[i-1]
            else:
                historical['y'].iloc[i]=item
                
        for i,item in enumerate(historical['PM10_Total_0_10um_STP']):
            if item<=0:
                historical['PM10_Total_0_10um_STP'].iloc[i]=historical['PM10_Total_0_10um_STP'].iloc[i-1]
            else:
                historical['PM10_Total_0_10um_STP'].iloc[i]=item        
         
        for i,item in enumerate(historical['Wind_Speed___Resultant']):
            if item<=0:
                historical['Wind_Speed___Resultant'].iloc[i]=historical['Wind_Speed___Resultant'].iloc[i-1]
            else:
                historical['Wind_Speed___Resultant'].iloc[i]=item
        
        for i,item in enumerate(historical['Wind_Direction___Resultant']):
            if item<=0:
                historical['Wind_Direction___Resultant'].iloc[i]=historical['Wind_Direction___Resultant'].iloc[i-1]
            else:
                historical['Wind_Direction___Resultant'].iloc[i]=item
        # split data into train and test        
        train_df = historical[historical.year != 2019]
        test_df = historical[historical.year == 2019]
        test_df.drop(columns='y', inplace=True)        
        # train model    
        m = Prophet(daily_seasonality=True,
                    weekly_seasonality=True)
        m.add_regressor('PM10_Total_0_10um_STP')
        m.add_regressor('Wind_Speed___Resultant')
        m.add_regressor('Wind_Direction___Resultant')
        m.fit(train_df);
        # predict on test data
        forecast = m.predict(test_df)
        # save plots locally
        plot1 = m.plot(forecast);
        plot2 = m.plot_components(forecast);
        plot1.savefig(r'/home/ags/localdatastore/fbdata/forecast.png')
        plot2.savefig(r'/home/ags/localdatastore/fbdata/components.png')
        # Uncomment the following lines if you want to publish and visualize your plots as items.
#         gis = GIS('https://machinename/portal', 'username', 'password')
#         gis.content.add(item_properties={"type": "Image", "title": "Forecast Plot"}, data=r"/home/ags/localdatastore/fbdata/forecast.png")
#         gis.content.add(item_properties={"type": "Image", "title": "Forecast Components"}, data=r"/home/ags/localdatastore/fbdata/components.png")
        # create df with actual and predicted fields
        cmp_df = forecast.set_index('ds')[['yhat', 'yhat_lower', 'yhat_upper']].join(historical.set_index('ds'))
        cmp_df.reset_index(inplace=True)
        cmp_df = cmp_df[['ds', 'yhat_lower', 'yhat_upper', 'yhat', 'y']]
        return cmp_df
    res = df1.groupby(['flag']).apply(forecast_pm25)

    res.write.format("webgis").save("predicted_results_on_test_data" + str(dt.now().microsecond))
run_python_script(code=predict_pm25, layers=[series_data])
predicted_item = gis.content.search('predicted_results_on_test_data854829')[0]
predicted_item
predicted_results_on_test_data854829
Table Layer by admin
Last Modified: June 24, 2020
0 comments, 366 views
predicted_df = predicted_item.tables[0].query().sdf
predicted_df.columns
Index(['ds', 'yhat_lower', 'yhat_upper', 'yhat', 'y', 'globalid', 'OBJECTID'], dtype='object')
predicted_df.head()
   ds                   yhat_lower  yhat_upper  yhat      y    globalid                                OBJECTID
0  2018-12-31 18:30:00    1.357769   15.941110  8.691076  5.5  {7BCD0AF8-F5B1-4A7E-7118-88589E0A4DAD}  44
1  2018-12-31 19:30:00    1.141512   16.265352  8.773848  5.7  {747EF178-8A36-5CCC-74A3-4BAD71DB0553}  244
2  2018-12-31 20:30:00    0.389374   15.927957  7.641061  5.4  {7D6C9381-7279-AB10-C5F7-AE138BFD6AE0}  444
3  2018-12-31 21:30:00   -0.164141   15.033084  7.111085  4.4  {B9BD44CF-F794-87F3-E389-E654CDCE8844}  644
4  2018-12-31 22:30:00   -0.587776   14.489634  7.002420  4.4  {B7C8C5C7-1C9E-E66A-CA41-39E5F0D601FA}  844

In the above table, the y attribute shows the actual PM2.5 value and yhat shows the value predicted by the trained model; yhat_lower and yhat_upper give the uncertainty interval around the prediction.
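Because the actual and predicted values sit side by side, we can also quantify how close the forecast is; for example, a simple mean absolute error computed with pandas:

# Mean absolute error between the actual (y) and predicted (yhat) PM2.5 values
mae = (predicted_df['y'] - predicted_df['yhat']).abs().mean()
print(f"Mean absolute error of the January 2019 forecast: {mae:.2f}")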

Visualize result on Dashboard

The arcgis.apps module includes the dashboard submodule for creating dashboards programmatically. The submodule contains classes for the different widgets, which can be configured and used to publish a dashboard.

We want to visualize our predicted results on a dashboard. To learn more about the dashboard module, see the guide here.

from arcgis.apps.dashboard import SerialChart, add_column, add_row
from arcgis.apps.dashboard import Dashboard
chart = SerialChart(predicted_item,  #Create a serial chart
                    categories_from="features", 
                    title="Forecast of PM2.5 for January 2019") 

chart.data.category_field = "ds" #set category field

chart.category_axis.title = "datetime" #set title for x axis

chart.value_axis.title = "pm2.5" #set title for y axis

#set fields to visualize on y axis
chart.data.add_value_field('y', line_color='#CB4335') 
chart.data.add_value_field('yhat', line_color='#2980B9')
chart.data.add_value_field('yhat_upper', line_color='#CACFD2')
chart.data.add_value_field('yhat_lower', line_color='#CACFD2')

chart.legend.visibility = True 
chart.legend.placement = "side"
chart.category_axis.minimum_period = 'hours'
chart.data.labels = False
chart.data.hover_text = True
chart
agol_gis = GIS('home')
dashboard = Dashboard() #creates a Dashboard object
dashboard.layout = add_row([chart]) #adds the chart to the dashboard layout
dashboard.save('pm2.5_dashboard_2019_jan',  #publishes the dashboard to the portal
               gis=agol_gis)
pm2.5_dashboard_2019_jan
Dashboard by api_data_owner
Last Modified: December 02, 2020
0 comments, 0 views

Conclusion

In this notebook, we learned how to use Spark with GeoAnalytics Server to carry out distributed analysis of big data.
