Crime analysis and clustering using geoanalytics and pyspark.ml
Introduction¶
Many of the poorest neighborhoods in the City of Chicago face violent crime. As crime rises, the volume of crime data grows with it, so there is a strong need to identify crime patterns in order to reduce their occurrence. Data mining with some of the most powerful tools available in the ArcGIS API for Python is an effective way to analyze and detect patterns in data. In this sample, we demonstrate the utility of a number of geoanalytics tools, including find_hot_spots, aggregate_points, and calculate_density, to visually understand geographical patterns.
The pyspark module, available through the run_python_script tool, provides a collection of distributed analysis tools for data management, clustering, regression, and more. The run_python_script task automatically imports the pyspark module so you can interact with it directly. By calling the pyspark implementation of k-means in the run_python_script tool, we will cluster crime data into a predefined number of clusters. Such clusters are also useful for identifying crime patterns.
Further, based on the results of the analysis, the segmented crime map can be used to help dispatch officers efficiently throughout the city.
Necessary Imports¶
%matplotlib inline
import matplotlib.pyplot as plt
import pandas as pd
from datetime import datetime as dt
import arcgis
import arcgis.geoanalytics
from arcgis.gis import GIS
from arcgis.geoanalytics.summarize_data import describe_dataset, aggregate_points
from arcgis.geoanalytics.analyze_patterns import calculate_density, find_hot_spots
from arcgis.geoanalytics.manage_data import clip_layer, run_python_script
Connect to your ArcGIS Enterprise Organization¶
agol_gis = GIS('home')
gis = GIS('https://pythonapi.playground.esri.com/portal', 'arcgis_python', 'amazing_arcgis_123')
Ensure your GIS supports GeoAnalytics¶
Before executing a tool, we need to ensure an ArcGIS Enterprise GIS is set up with a licensed GeoAnalytics Server. To do so, call the is_supported() method after connecting to your Enterprise portal. See the Components of ArcGIS URLs documentation for details on the URLs to enter in the GIS parameters based on your particular Enterprise configuration.
arcgis.geoanalytics.is_supported()
Prepare the data¶
To register a file share or an HDFS, we need to format datasets as subfolders within a single parent folder and register the parent folder. This parent folder becomes a datastore, and each subfolder becomes a dataset. Our folder hierarchy would look like below:
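For example, a registered parent folder might be organized as follows (the folder and file names here are purely illustrative):
|---ChicagoCrimeData            <-- parent folder registered as the big data file share
    |---Chicago_Crime           <-- subfolder that becomes one dataset
    |   |---crimes_2001_2010.csv
    |   |---crimes_2011_2020.csv
    |---Chicago_Boundaries      <-- subfolder that becomes another dataset
        |---block_groups.shp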
Learn more about preparing your big data file share datasets here.
Register a big data file share¶
The get_datastores() method of the geoanalytics module returns a DatastoreManager object that lets you search for and manage the big data file share items as Python API Datastore objects on your GeoAnalytics server.
bigdata_datastore_manager = arcgis.geoanalytics.get_datastores()
bigdata_datastore_manager
We will register the Chicago crime data as a big data file share using the add_bigdata() function on a DatastoreManager object.
When we register a directory, all subdirectories under the specified folder are also registered with the server. Always register the parent folder (for example, \\machinename\mydatashare) that contains one or more individual dataset folders as the big data file share item. To learn more, see register a big data file share.
Note: You cannot browse directories in ArcGIS Server Manager. You must provide the full path to the folder you want to register, for example, \\myserver\share\bigdata. Avoid using local paths, such as C:\bigdata, unless the same data folder is available on all nodes of the server site.
# data_item = bigdata_datastore_manager.add_bigdata("Chicago_Crime_2001_2020", r"\\machine_name\data\chicago")
bigdata_fileshares = bigdata_datastore_manager.search(id='0e7a861d-c1c5-4acc-869d-05d2cebbdbee')
bigdata_fileshares
file_share_folder = bigdata_fileshares[0]
Once a big data file share is created, the GeoAnalytics server samples the datasets to generate a manifest, which outlines the data schema and specifies any time and geometry fields. This process can take a few minutes depending on the size of your data. Once processed, querying the manifest property returns the schema of the datasets in your big data file share.
manifest = file_share_folder.manifest['datasets'][1]
manifest
Get data for analysis¶
Adding a big data file share to the GeoAnalytics server adds a corresponding big data file share item on the portal. We can search for these types of items using the item_type parameter.
search_result = gis.content.search("bigDataFileShares_GA_Data", item_type = "big data file share")
search_result
ga_item = search_result[0]
ga_item
Querying the layers property of the item returns the datasets in the big data file share as Python API Layer objects.
ga_item.layers
crime_lyr = ga_item.layers[1]
illinois_blk_grps = agol_gis.content.get('a11d886be35149cb9dab0f7aac75a2af')
illinois_blk_grps
blk_lyr = illinois_blk_grps.layers[0]
We will filter the block groups by county code '031', the FIPS county code for Cook County, which contains Chicago.
blk_lyr.filter = "COUNTYFP = '031'"
m2 = gis.map('chicago')
m2
m2.add_layer(blk_lyr)
Describe data¶
The describe_dataset method provides an overview of your big data. By default, the tool outputs a table layer containing calculated field statistics and a dict outlining geometry and time settings for the input layer. Optionally, the tool can output a feature layer representing a sample set of features using the sample_size parameter, or a single polygon feature layer representing the input feature layer's extent by setting the extent_output parameter to True.
description = describe_dataset(input_layer=crime_lyr,
extent_output=True,
sample_size=1000,
output_name="Description of crime data" + str(dt.now().microsecond),
return_tuple=True)
description.output_json
sdf_desc_output = description.output.query(as_df=True)
sdf_desc_output.head()
description.sample_layer
sdf_slyr = description.sample_layer.query(as_df=True)
sdf_slyr.head()
m1 = gis.map('chicago')
m1
m1.add_layer(description.sample_layer)
m1.legend = True
Analyze patterns¶
The GeoAnalytics tools use a process spatial reference during execution. Analyses with square or hexagon bins require a projected coordinate system. We'll use WKID 26771, an Illinois State Plane (East) projected coordinate system, as found at http://epsg.io/?q=illinois%20kind%3APROJCRS.
arcgis.env.process_spatial_reference = 26771
Aggregate points¶
We can use the aggregate_points method in the arcgis.geoanalytics.summarize_data submodule to group crime features into individual block group features. The output polygon feature layer summarizes attribute information for all crimes that fall within each block group. If no crimes fall within a block group, that block group will not appear in the output.
The aggregation runs in the projected process spatial reference set above (WKID 26771). All results are stored in the spatiotemporal data store of the Enterprise in the WGS 84 spatial reference.
See the GeoAnalytics Documentation for a full explanation of analysis environment settings.
agg_result = aggregate_points(crime_lyr,
polygon_layer=blk_lyr,
output_name="aggregate results of crime" + str(dt.now().microsecond))
agg_result
m3 = gis.map('chicago')
m3
m3.add_layer(agg_result)
m3.legend = True
Calculate density¶
The calculate_density method creates a density map from point features by spreading known quantities of some phenomenon (represented as attributes of the points) across the map. The result is a layer of areas classified from least dense to most dense. In this example, we will create a density map by aggregating points within square bins of 1 kilometer. To learn more, please see here.
cal_density = calculate_density(crime_lyr,
weight='Uniform',
bin_type='Square',
bin_size=1,
bin_size_unit="Kilometers",
time_step_interval=1,
time_step_interval_unit="Years",
time_step_repeat_interval=1,
time_step_repeat_interval_unit="Months",
time_step_reference=dt(2001, 1, 1),
radius=1000,
radius_unit="Meters",
area_units='SquareKilometers',
output_name="calculate density of crime" + str(dt.now().microsecond))
m4 = gis.map('chicago')
m4
m4.add_layer(cal_density)
m4.legend = True
Find hot spots¶
The find_hot_spots tool analyzes point data and finds statistically significant spatial clustering of high (hot spots) and low (cold spots) numbers of incidents relative to the overall distribution of the data.
hot_spots = find_hot_spots(crime_lyr,
bin_size=100,
bin_size_unit='Meters',
neighborhood_distance=250,
neighborhood_distance_unit='Meters',
output_name="get hot spot areas of crime" + str(dt.now().microsecond))
m5 = gis.map('chicago')
m5
m5.add_layer(hot_spots)
m5.legend = True
The darkest red features indicate areas where you can state with 99 percent confidence that the clustering of crime features is not the result of random chance, but rather of some other variable that might be worth investigating. Similarly, the darkest blue features indicate with 99 percent confidence that the lack of crime incidents in those locations is not simply random. Features that are beige do not represent statistically significant clustering; the number of crimes there could very likely be the result of random processes and random chance.
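To inspect these confidence levels numerically, we could read the hot spot result into a dataframe. This is only a sketch: it assumes the output layer exposes the standard Gi_Bin field, where values of 3 and -3 mark hot and cold spots at 99 percent confidence.
hot_spots_sdf = hot_spots.layers[0].query(as_df=True)  # hot spot bins as a spatially enabled dataframe
hot_spots_sdf['Gi_Bin'].value_counts().sort_index()    # number of bins at each confidence level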
Use Spark DataFrames and Run Python Script¶
The run_python_script method executes a Python script directly on an ArcGIS GeoAnalytics server site. The script can create an analysis pipeline by chaining together multiple GeoAnalytics tools without writing intermediate results to a data store. The tool can also distribute Python functionality across the GeoAnalytics server site.
GeoAnalytics Server installs a Python 3.6 environment that this tool uses. The environment includes Spark 2.2.0, the compute platform that distributes analysis across multiple cores of one or more machines in your GeoAnalytics Server site, and the pyspark module, which provides a collection of distributed analysis tools for data management, clustering, regression, and more. The run_python_script task automatically imports the pyspark module so you can interact with it directly.
When using the geoanalytics and pyspark packages, most functions return analysis results as Spark DataFrame memory structures. You can write these data frames to a data store or process them in a script. This lets you chain multiple geoanalytics and pyspark tools while only writing out the final result, eliminating the need to create any bulky intermediate result layers. For more details, click here.
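As a simple illustration of such chaining, the hypothetical script below filters the crime data and aggregates it entirely in memory, writing only the final counts to the data store. The function and output names are illustrative; the field names match the crime layer used later in this sample.
def thefts_by_block():
    from datetime import datetime as dt
    # The input big data file share layer is exposed as a Spark DataFrame via the layers object
    df = layers[0]
    # Intermediate DataFrames stay in memory and are never written out
    thefts = df.filter(df['Primary Type'] == 'THEFT')
    out = thefts.groupBy('Block').count()
    # Only the final result is written to the data store
    out.write.format("webgis").save("thefts_by_block" + str(dt.now().microsecond))
# run_python_script(code=thefts_by_block, layers=[crime_lyr])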
The Location Description field describes the type of location where each crime occurred. We will write a function that groups our data by location description, which lets us count the number of crimes occurring at each location type.
def groupby_description():
    from datetime import datetime as dt
    # Crime data is stored in a feature service and accessed as a DataFrame via the layers object
    df = layers[0]
    # Group the dataframe by the Location Description field and count the number of crimes for each location type
    out = df.groupBy('Location Description').count()
    # Write the final result to our datastore
    out.write.format("webgis").save("groupby_location_description" + str(dt.now().microsecond))
run_python_script(code=groupby_description, layers=[crime_lyr])
The result is saved as an item in our GIS. We can search for the saved item using the search() method; providing a keyword matching the name we used when writing the result will retrieve it.
groupby_description = gis.content.search('groupby_location_description')[0]
groupby_description_lyr = groupby_description.tables[0] #retrieve table from the item
groupby_description_df = groupby_description_lyr.query(as_df=True) #read layer as dataframe
groupby_description_df.sort_values(by='count', ascending=False, inplace=True) #sort count field in decreasing order
Location of crime¶
groupby_description_df[:10].plot(x='Location_Description',
y='count', kind='barh')
plt.xticks(
rotation=45,
horizontalalignment='center',
fontweight='light',
fontsize='medium',
);
Street is the most frequent location of crime occurrence.
The Primary Type field contains the type of crime. Let's investigate the most frequent type of crime in Chicago by writing our own function:
def groupby_texttype():
    from datetime import datetime as dt
    # Crime data is stored in a feature service and accessed as a DataFrame via the layers object
    df = layers[0]
    # Group the dataframe by the Primary Type field and count the incidents for each crime type
    out = df.groupBy('Primary Type').count()
    # Write the final result to our datastore
    out.write.format("webgis").save("groupby_type_of_crime" + str(dt.now().microsecond))
run_python_script(code=groupby_texttype, layers=[crime_lyr])
groupby_texttype = gis.content.search('groupby_type_of_crime')[0]
groupby_texttype
groupby_texttype_df = groupby_texttype.tables[0].query(as_df=True)
groupby_texttype_df.head()
groupby_texttype_df.sort_values(by='count', ascending=False, inplace=True)
Type of crime¶
groupby_texttype_df.head(10).plot(x='Primary_Type', y='count', kind='barh')
plt.xticks(
rotation=45,
horizontalalignment='center',
fontweight='light',
fontsize='medium',
);
Theft is the most common type of crime in the city of Chicago.
theft = groupby_texttype_df[groupby_texttype_df['Primary_Type'] == 'THEFT']
theft
def theft_description():
    from datetime import datetime as dt
    # Crime data is stored in a feature service and accessed as a DataFrame via the layers object
    df = layers[0]
    # Keep only theft incidents before grouping
    df = df.filter(df['Primary Type'] == 'THEFT')
    # Group thefts by location type and count them
    out = df.groupBy('Location Description').count()
    # Write the final result to our datastore
    out.write.format("webgis").save("theft_description" + str(dt.now().microsecond))
run_python_script(code=theft_description, layers=[crime_lyr])
theft_description = gis.content.search('theft_description')[0]
theft_description_df = theft_description.tables[0].query(as_df=True)
theft_description_df.sort_values(by='count', ascending=False, inplace=True)
Location of theft¶
theft_description_df[:10].plot(x='Location_Description', y='count', kind='barh')
plt.xticks(
rotation=45,
horizontalalignment='center',
fontweight='light',
fontsize='medium',
);
This plot shows the relation between crime type and crime location. It indicates that most of the theft activities occur on streets.
def grpby_type_blkgrp():
    from datetime import datetime as dt
    # Load the big data file share layer into a DataFrame
    df = layers[0]
    # Count incidents for each combination of crime type and block
    out = df.groupBy('Primary Type', 'Block').count()
    out.write.format("webgis").save("grpby_type_blkgrp" + str(dt.now().microsecond))
run_python_script(code=grpby_type_blkgrp, layers=[crime_lyr])
grpby_cat_blk = gis.content.search('grpby_type_blkgrp')[0]
grpby_cat_blk
grpby_cat_blk_df = grpby_cat_blk.tables[0].query(as_df=True)
grpby_cat_blk_df.head()
Count of crime incidents by block group¶
grpby_cat_blk_df.sort_values(by='count', ascending=False, inplace=True)
grpby_cat_blk_df.head(10).plot(x='Block', y='count', kind='barh')
plt.xticks(
rotation=45,
horizontalalignment='center',
fontweight='light',
fontsize='medium',
);
Get crime types for a particular block group¶
blk_addr_high = grpby_cat_blk_df[grpby_cat_blk_df['Block'] == '001XX N STATE ST']
blk_addr_high.sort_values(by='count', ascending=False)[['Primary_Type', 'count']].head()
def crime_by_datetime():
    from datetime import datetime as dt
    from pyspark.sql import functions as F
    # Load the big data file share layer into a DataFrame
    df = layers[0]
    # Parse the Date field into a timestamp column; the pattern must match the date format of the source data
    out = df.withColumn('datetime', F.unix_timestamp('Date', 'dd/MM/yyyy hh:mm:ss a').cast('timestamp'))
    out.write.format("webgis").save("crime_by_datetime" + str(dt.now().microsecond))
run_python_script(code=crime_by_datetime, layers=[crime_lyr])
calls_with_datetime = gis.content.search('crime_by_datetime')[0]
calls_with_datetime_lyr = calls_with_datetime.layers[0]
def crime_with_added_date_time_cols():
    from datetime import datetime as dt
    from pyspark.sql.functions import year, month, hour
    # Access the input layer (crime data with the datetime column added) as a DataFrame
    df = layers[0]
    # Derive month and hour columns from the datetime column
    df = df.withColumn('month', month(df['datetime']))
    out = df.withColumn('hour', hour(df['datetime']))
    out.write.format("webgis").save("crime_with_added_date_time_cols" + str(dt.now().microsecond))
run_python_script(code=crime_with_added_date_time_cols, layers=[calls_with_datetime_lyr])
date_time_added_item = gis.content.search('crime_with_added_date_time_cols')
date_time_added_lyr = date_time_added_item[0].layers[0]
def grp_crime_by_hour():
    from datetime import datetime as dt
    # Access the input layer (crime data with the hour column) as a DataFrame
    df = layers[0]
    # Count crimes for each hour of the day
    out = df.groupBy('hour').count()
    out.write.format("webgis").save("grp_crime_by_hour" + str(dt.now().microsecond))
run_python_script(code=grp_crime_by_hour, layers=[date_time_added_lyr])
hour = gis.content.search('grp_crime_by_hour')[0]
grp_hour = hour.tables[0]
df_hour = grp_hour.query(as_df=True)
Crime distribution by the hour¶
(df_hour
.dropna()
.sort_values(by='hour')
.astype({'hour' : int})
.plot(x='hour', y='count', kind='bar'))
plt.xticks(
rotation=45,
horizontalalignment='center',
fontweight='light',
fontsize='medium',
);
This graph shows that crime activity peaks around 12 A.M. and 12 P.M.
Big data machine learning using pyspark.ml¶
Find the optimal number of clusters¶
The average silhouette approach measures the quality of a clustering. That is, it determines how well each object lies within its cluster. A high average silhouette width indicates a good clustering. To learn more about silhouette analysis, click here.
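As a toy illustration of the underlying measure: for a single point, the silhouette compares a, the mean distance to the other members of its own cluster, with b, the lowest mean distance to the members of any other cluster. The values below are made up purely for illustration.
a = 1.2                  # mean distance to points in the same cluster
b = 4.8                  # lowest mean distance to points in any other cluster
s = (b - a) / max(a, b)  # silhouette lies in [-1, 1]; values near 1 indicate a well-clustered point
print(round(s, 2))       # 0.75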
def optimal_k():
    import time
    import numpy as np
    import pandas as pd
    from pyspark.ml.feature import VectorAssembler
    from pyspark.ml.clustering import KMeans
    from datetime import datetime as dt
    from pyspark.ml.evaluation import ClusteringEvaluator
    from pyspark.sql.context import SQLContext
    from pyspark.sql.types import StructType, StructField, DoubleType, IntegerType, FloatType
    silh_lst = []
    k_lst = np.arange(3, 70)
    # Combine the x and y columns into a single "features" vector column
    crime_locations = layers[0]
    assembler = VectorAssembler(inputCols=["X Coordinate", "Y Coordinate"], outputCol="features")
    crime_locations = assembler.setHandleInvalid("skip").transform(crime_locations)
    for k in k_lst:
        silh_val = []
        for run in np.arange(1, 3):
            # Train a k-means model with k clusters and a random seed
            kmeans = KMeans().setK(int(k)).setSeed(int(np.random.randint(100, size=1)))
            model = kmeans.fit(crime_locations.select("features"))
            # Make predictions
            predictions = model.transform(crime_locations)
            # Evaluate clustering by computing the silhouette score
            evaluator = ClusteringEvaluator()
            silhouette = evaluator.evaluate(predictions)
            silh_val.append(silhouette)
        # Average the silhouette scores of the runs for this k
        silh_array = np.asanyarray(silh_val)
        silh_lst.append(silh_array.mean())
    # Collect the results into a pandas DataFrame and convert it to a Spark DataFrame
    silhouette = pd.DataFrame(list(zip(k_lst, silh_lst)), columns=['k', 'silhouette'])
    schema = StructType([StructField('k', IntegerType(), True), StructField('silhouette', FloatType(), True)])
    out = SQLContext(sparkContext=spark.sparkContext, sparkSession=spark).createDataFrame(silhouette, schema)
    # Write the result DataFrame to the relational data store
    out.write.format("webgis").option("dataStore", "relational").save("optimalKmeans" + str(dt.now().microsecond))
run_python_script(code=optimal_k, layers=[crime_lyr])
optimal_k = gis.content.search('optimalKmeans')[0]
optimal_k_tbl = optimal_k.tables[0]
k_df = optimal_k_tbl.query().sdf
k_df.sort_values(by='silhouette', ascending=False)
num_clusters = k_df.sort_values(by='silhouette', ascending=False).iloc[0]['k']
num_clusters
K-Means Clustering¶
def cluster_crimes():
    from pyspark.ml.feature import VectorAssembler
    from pyspark.ml.clustering import KMeans
    from datetime import datetime as dt
    # Crime data is stored in a feature service and accessed as a DataFrame via the layers object
    crime_locations = layers[0]
    # Combine the x and y columns in the DataFrame into a single column called "features"
    assembler = VectorAssembler(inputCols=["X Coordinate", "Y Coordinate"], outputCol="features")
    crime_locations = assembler.setHandleInvalid("skip").transform(crime_locations)
    # Fit a k-means model with 15 clusters using the "features" column of the crime locations
    kmeans = KMeans(k=15)
    model = kmeans.fit(crime_locations.select("features"))
    cost = model.computeCost(crime_locations)
    # Add the cluster labels from the k-means model to the original DataFrame
    crime_locations_clusters = model.transform(crime_locations)
    # Write the result DataFrame to the data store
    crime_locations_clusters.write.format("webgis").save("Crime_Clusters_KMeans" + str(dt.now().microsecond))
run_python_script(code=cluster_crimes, layers=[crime_lyr])
clusters = gis.content.search('Crime_Clusters_KMeans')[0]
clusters
By symbolizing the result layer on the predictions made by the k-means model, we can visualize the clustered crime events. One possible way to do this is sketched below.
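This is a minimal sketch rather than the definitive approach: it assumes the saved layer keeps the k-means output column (named prediction by Spark's KMeans by default) and uses the ClassedColorRenderer option of the map widget's add_layer method.
cluster_lyr = clusters.layers[0]
m6 = gis.map('chicago')
m6
m6.add_layer(cluster_lyr, {"renderer": "ClassedColorRenderer", "field_name": "prediction"})
m6.legend = True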
Conclusion¶
In this sample, we covered how to chain together geoanalytics and pyspark tools to analyze big data while writing only the final result to a data store, eliminating the need to create intermediate result layers. We also saw how data mining and clustering can help manage a large volume of crime data and extract useful information from it.