Apache Spark, Zeppelin and geospatial big data processing

Given the interest at Cranfield University in the use of Big Data tools, and our parallel interest in all things geospatial, the question arises: how can Big Data tools process geospatial data? In this blog, we investigate the use of Apache Spark, Apache Zeppelin and a couple of geospatial libraries. In an earlier blog, we set up Spark and Zeppelin; now we extend that setup to use these additional tools. Note that this exercise was undertaken on a MacBook, although the instructions should work just as well on Linux. Few geospatial libraries for Big Data processing work with Spark/Hadoop; those that do include the Hadoop offering from ESRI, Magellan and GeoSpark.

GeoSpark

To set up GeoSpark, we downloaded the library ‘geospark-0.3.2-spark-2.x.jar’ from https://github.com/DataSystemsLab/GeoSpark/releases and saved it locally, e.g. to /Users/sparkuser/spark/jars/. Next, in the ‘conf’ folder of the Apache Spark installation, we copied the template file ‘spark-defaults.conf.template’ to ‘spark-defaults.conf’, ready for editing, since we need to tell Spark to use the GeoSpark jar library. We then added a line at the end of this configuration file to reference the jar, e.g.:
spark.jars /Users/sparkuser/spark/jars/geospark-0.3.2-spark-2.x.jar
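Spark reads spark-defaults.conf as a list of property names and values separated by whitespace, and spark.jars accepts a comma-separated list of jar paths. It can be useful to confirm that the edited file says what we intended. The minimal sketch below assumes the file follows java.util.Properties syntax (our understanding of how Spark loads it, not something verified in this post) and lists the configured jars; SparkDefaultsCheck is a hypothetical helper, not part of Spark.

```scala
import java.io.StringReader
import java.util.Properties

// Hypothetical helper (not part of Spark): parse spark-defaults.conf-style
// text and extract the comma-separated entries of spark.jars.
object SparkDefaultsCheck {
  // Assumption: java.util.Properties syntax, i.e. a key is separated from its
  // value by whitespace, '=' or ':', and lines starting with '#' are comments.
  def parse(text: String): Properties = {
    val props = new Properties()
    props.load(new StringReader(text))
    props
  }

  // Jar paths listed under spark.jars, or an empty list if the key is unset.
  def jars(props: Properties): List[String] =
    Option(props.getProperty("spark.jars"))
      .map(_.split(",").map(_.trim).filter(_.nonEmpty).toList)
      .getOrElse(Nil)
}

val conf = SparkDefaultsCheck.parse(
  """# Default system properties included when running spark-submit.
    |spark.jars /Users/sparkuser/spark/jars/geospark-0.3.2-spark-2.x.jar
    |""".stripMargin)
println(SparkDefaultsCheck.jars(conf)) // List(/Users/sparkuser/spark/jars/geospark-0.3.2-spark-2.x.jar)
```

To check the real file, pass in the contents of spark-defaults.conf instead, e.g. read with scala.io.Source.fromFile and mkString from your own Spark installation's conf folder.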

Sourcing data

We need some spatial data for our test. We downloaded the sample data files ‘zcta510-small.csv’ and ‘arealm-small.csv’ (available online from the GeoSpark GitHub repository above) to a local data location, e.g. /Users/sparkuser/spark/data/geospark. The datasets take the following form (a few rows of each are shown):
arealm-small.csv
-88.331492,32.324142
-88.175933,32.360763
-88.388954,32.357073
-88.221102,32.35078
-88.323995,32.950671
...
zcta510-small.csv
-155.940114,19.081331,-155.618917,19.5307
-155.335476,19.802474,-155.104434,19.93224
-155.85966,20.120695,-155.765027,20.268469
-155.396864,19.519641,-154.987674,19.800274
-155.98572,19.53958,-155.822977,19.70849
...
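Judging from the values, each row of arealm-small.csv is a point given as longitude,latitude, and each row of zcta510-small.csv holds four coordinates that appear to be two opposite corners of a rectangle (x1,y1,x2,y2). GeoSpark's own readers handle these files when building the PointRDD and RectangleRDD below; purely to make the layout concrete, here is a minimal plain-Scala sketch that parses such rows. The column interpretation and the names Pt, Rect and SampleParsing are our assumptions, not GeoSpark's.

```scala
// Hypothetical types for illustration; GeoSpark itself works with JTS geometries.
final case class Pt(x: Double, y: Double)
final case class Rect(minX: Double, maxX: Double, minY: Double, maxY: Double)

object SampleParsing {
  // Split a CSV row into doubles and fail clearly on an unexpected column count.
  private def numbers(line: String, expected: Int): Array[Double] = {
    val cols = line.split(",").map(_.trim.toDouble)
    require(cols.length == expected, s"expected $expected columns, got ${cols.length}: $line")
    cols
  }

  // arealm-small.csv row: longitude,latitude (assumed from the sample values).
  def parsePoint(line: String): Pt = {
    val c = numbers(line, 2)
    Pt(c(0), c(1))
  }

  // zcta510-small.csv row: assumed to be two opposite corners x1,y1,x2,y2,
  // normalised so that min <= max on each axis.
  def parseRect(line: String): Rect = {
    val c = numbers(line, 4)
    Rect(math.min(c(0), c(2)), math.max(c(0), c(2)), math.min(c(1), c(3)), math.max(c(1), c(3)))
  }
}

val p = SampleParsing.parsePoint("-88.331492,32.324142")
val r = SampleParsing.parseRect("-155.940114,19.081331,-155.618917,19.5307")
println(p)
println(r)
```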

The code

We then followed the GeoSpark example tutorial code exactly, in Scala. First, we need to ensure the correct libraries are imported:
import org.datasyslab.geospark.spatialOperator.RangeQuery
import org.datasyslab.geospark.spatialRDD.PointRDD
import org.datasyslab.geospark.spatialOperator.JoinQuery
import org.datasyslab.geospark.spatialRDD.RectangleRDD
import com.vividsolutions.jts.geom.Envelope
import org.datasyslab.geospark.spatialOperator.KNNQuery
import com.vividsolutions.jts.geom.Coordinate
import com.vividsolutions.jts.geom.GeometryFactory
import com.vividsolutions.jts.geom.Point
We can now run the following code; the output Zeppelin printed for each query follows its code.
// Start an example Spatial Range Query without Index
val queryEnvelope=new Envelope (-113.79,-109.73,32.99,35.08);
val objectRDD = new PointRDD(sc, "/Users/sparkuser/spark/data/geospark/arealm-small.csv", 0, "csv"); /* 0 means the spatial attribute starts at column 0 */
val resultSize = RangeQuery.SpatialRangeQuery(objectRDD, queryEnvelope, 0).getRawPointRDD().count(); /* 0 means a point is counted only if it is fully covered by the query window */
queryEnvelope: com.vividsolutions.jts.geom.Envelope = Env[-113.79 : -109.73, 32.99 : 35.08]
objectRDD: org.datasyslab.geospark.spatialRDD.PointRDD = org.datasyslab.geospark.spatialRDD.PointRDD@52b8d9a6
resultSize: Long = 445
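Conceptually, a range query without an index scans every point and keeps those inside the query window. Note that the JTS Envelope constructor takes (x1, x2, y1, y2), which is why the output prints Env[-113.79 : -109.73, 32.99 : 35.08]: longitudes -113.79 to -109.73 and latitudes 32.99 to 35.08. The minimal plain-Scala sketch below (no Spark) shows the full-scan idea; Window, Point2 and NaiveRangeQuery are hypothetical names, the point (-110.0, 34.0) is made up for illustration, and counting boundary points as covered is our assumption, since GeoSpark's exact boundary handling is not shown here.

```scala
final case class Point2(x: Double, y: Double)

// Mirrors the JTS Envelope argument order (x1, x2, y1, y2): x range first, then y range.
final case class Window(x1: Double, x2: Double, y1: Double, y2: Double) {
  val minX: Double = math.min(x1, x2)
  val maxX: Double = math.max(x1, x2)
  val minY: Double = math.min(y1, y2)
  val maxY: Double = math.max(y1, y2)

  // Assumption: points lying exactly on the boundary count as covered.
  def covers(p: Point2): Boolean =
    p.x >= minX && p.x <= maxX && p.y >= minY && p.y <= maxY
}

object NaiveRangeQuery {
  // Full scan: every point is tested against the window, O(n) per query.
  def query(points: Seq[Point2], w: Window): Seq[Point2] = points.filter(w.covers)
}

val window = Window(-113.79, -109.73, 32.99, 35.08)
val sample = Seq(Point2(-88.331492, 32.324142), Point2(-110.0, 34.0))
println(NaiveRangeQuery.query(sample, window)) // List(Point2(-110.0,34.0))
```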
// Start an example Spatial Range Query with Index
val queryEnvelope=new Envelope (-113.79,-109.73,32.99,35.08);
val objectRDD = new PointRDD(sc, "/Users/sparkuser/spark/data/geospark/arealm-small.csv", 0, "csv"); /* 0 means the spatial attribute starts at column 0 */
objectRDD.buildIndex("rtree"); /* Build R-Tree index */
val resultSize = RangeQuery.SpatialRangeQueryUsingIndex(objectRDD, queryEnvelope, 0).getRawPointRDD().count(); /* 0 means a point is counted only if it is fully covered by the query window */
queryEnvelope: com.vividsolutions.jts.geom.Envelope = Env[-113.79 : -109.73, 32.99 : 35.08]
objectRDD: org.datasyslab.geospark.spatialRDD.PointRDD = org.datasyslab.geospark.spatialRDD.PointRDD@2c3e8ebf
resultSize: Long = 445
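Building the index does not change the answer (445 in both runs); it only lets the query skip points that cannot match. GeoSpark builds an R-tree here; purely to illustrate the pruning idea, the sketch below swaps in a much simpler uniform grid index, which is not what GeoSpark does. GridIndex, P, Box and the cell size are hypothetical names and parameters.

```scala
final case class P(x: Double, y: Double)
final case class Box(minX: Double, maxX: Double, minY: Double, maxY: Double) {
  def covers(p: P): Boolean = p.x >= minX && p.x <= maxX && p.y >= minY && p.y <= maxY
}

// Hypothetical uniform-grid index: a simplified stand-in for an R-tree,
// not GeoSpark's implementation.
final class GridIndex(points: Seq[P], cellSize: Double) {
  require(cellSize > 0, "cellSize must be positive")

  private def cell(v: Double): Int = math.floor(v / cellSize).toInt

  // Built once: every point is bucketed by the grid cell it falls in.
  private val buckets: Map[(Int, Int), Seq[P]] =
    points.groupBy(p => (cell(p.x), cell(p.y)))

  // Visit only the cells overlapping the box, then apply the exact covers test.
  def query(b: Box): Seq[P] =
    for {
      cx <- cell(b.minX) to cell(b.maxX)
      cy <- cell(b.minY) to cell(b.maxY)
      p  <- buckets.getOrElse((cx, cy), Seq.empty)
      if b.covers(p)
    } yield p
}
```

Any point inside the box lies in one of the visited cells, so the grid query returns exactly what a full scan would; an R-tree achieves the same pruning while adapting better to unevenly distributed data.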
// Start an example Spatial KNN Query without Index
val fact=new GeometryFactory();
val queryPoint=fact.createPoint(new Coordinate(-109.73, 35.08));
val objectRDD = new PointRDD(sc, "/Users/sparkuser/spark/data/geospark/arealm-small.csv", 0, "csv"); /* 0 means the spatial attribute starts at column 0 */
val resultSize = KNNQuery.SpatialKnnQuery(objectRDD, queryPoint, 5); /* The number 5 means 5 nearest neighbors */
fact: com.vividsolutions.jts.geom.GeometryFactory = com.vividsolutions.jts.geom.GeometryFactory@35f6b599
queryPoint: com.vividsolutions.jts.geom.Point = POINT (-109.73 35.08)
objectRDD: org.datasyslab.geospark.spatialRDD.PointRDD = org.datasyslab.geospark.spatialRDD.PointRDD@76d6439b
resultSize: java.util.List[com.vividsolutions.jts.geom.Point] = [POINT (-109.538914 35.123446), POINT (-108.729849 37.196678), POINT (-117.105253 33.48551), POINT (-120.679839 35.25764), POINT (-120.860368 35.398047)]
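A KNN query without an index amounts to computing the distance from the query point to every point and keeping the k smallest. The sketch below does this in plain Scala using planar Euclidean distance on (longitude, latitude) in degrees, which we assume is comparable to the JTS-based distance GeoSpark used in this version; it is not a geodesic distance. Pnt and NaiveKnn are hypothetical names, and the sample points are taken from the output above.

```scala
final case class Pnt(x: Double, y: Double)

object NaiveKnn {
  // Planar Euclidean distance in coordinate units (degrees here), not metres.
  def dist(a: Pnt, b: Pnt): Double = math.hypot(a.x - b.x, a.y - b.y)

  // Full scan: compute each point's distance to q once, sort ascending, keep k.
  def knn(points: Seq[Pnt], q: Pnt, k: Int): Seq[Pnt] =
    points.map(p => (dist(p, q), p)).sortWith(_._1 < _._1).take(k).map(_._2)
}

val q = Pnt(-109.73, 35.08)
val sample = Seq(Pnt(-120.679839, 35.25764), Pnt(-109.538914, 35.123446), Pnt(-108.729849, 37.196678))
println(NaiveKnn.knn(sample, q, 2))
```

Sorting everything is the simplest correct version; for large inputs, one common alternative is a bounded max-heap of size k per partition, merging the per-partition winners at the end.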
// Start an example Spatial KNN Query with Index
val fact=new GeometryFactory();
val queryPoint=fact.createPoint(new Coordinate(-109.73, 35.08));
val objectRDD = new PointRDD(sc, "/Users/sparkuser/spark/data/geospark/arealm-small.csv", 0, "csv"); /* 0 means the spatial attribute starts at column 0 */
objectRDD.buildIndex("rtree"); /* Build R-Tree index */
val resultSize = KNNQuery.SpatialKnnQueryUsingIndex(objectRDD, queryPoint, 5); /* The number 5 means 5 nearest neighbors */
fact: com.vividsolutions.jts.geom.GeometryFactory = com.vividsolutions.jts.geom.GeometryFactory@24046396
queryPoint: com.vividsolutions.jts.geom.Point = POINT (-109.73 35.08)
objectRDD: org.datasyslab.geospark.spatialRDD.PointRDD = org.datasyslab.geospark.spatialRDD.PointRDD@6db7719d
resultSize: java.util.List[com.vividsolutions.jts.geom.Point] = [POINT (-109.538914 35.123446), POINT (-108.729849 37.196678), POINT (-108.135158 37.242491), POINT (-107.596572 37.000003), POINT (-107.79524 37.225479)]
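The two KNN runs return different neighbours for the same query point (-109.73, 35.08): both start with the same two points, but the last three points of the indexed run lie roughly 2.7 to 2.9 degrees away, whereas the last three of the non-indexed run lie roughly 7.5 to 11.1 degrees away. Since the indexed run shows that these nearer points exist in arealm-small.csv, the non-indexed result does not appear to be the true five nearest neighbours, at least under planar distance. The check below recomputes those distances using only the coordinates printed above, plain Euclidean distance in degrees (our assumption) and a hypothetical object name.

```scala
object KnnResultCheck {
  val query: (Double, Double) = (-109.73, 35.08)

  // Coordinates copied from the two KNN outputs above.
  val withoutIndex: Seq[(Double, Double)] = Seq(
    (-109.538914, 35.123446), (-108.729849, 37.196678), (-117.105253, 33.48551),
    (-120.679839, 35.25764), (-120.860368, 35.398047))
  val withIndex: Seq[(Double, Double)] = Seq(
    (-109.538914, 35.123446), (-108.729849, 37.196678), (-108.135158, 37.242491),
    (-107.596572, 37.000003), (-107.79524, 37.225479))

  // Planar Euclidean distance in degrees (an assumption; not geodesic).
  def dist(p: (Double, Double)): Double = math.hypot(p._1 - query._1, p._2 - query._2)

  def report(label: String, pts: Seq[(Double, Double)]): Unit =
    println(f"$label%-14s " + pts.map(p => f"${dist(p)}%.3f").mkString(", "))
}

KnnResultCheck.report("without index", KnnResultCheck.withoutIndex)
KnnResultCheck.report("with index", KnnResultCheck.withIndex)
```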
// Start an example Spatial Join Query without Index
val objectRDD = new PointRDD(sc, "/Users/sparkuser/spark/data/geospark/arealm-small.csv", 0, "csv", "rtree", 4); /* 0: the spatial attribute starts at column 0; "rtree": use an R-Tree spatial partitioning grid; 4: use 4 RDD partitions */
val rectangleRDD = new RectangleRDD(sc, "/Users/sparkuser/spark/data/geospark/zcta510-small.csv", 0, "csv"); /* 0 means the spatial attribute starts at column 0 */
val joinQuery = new JoinQuery(sc, objectRDD, rectangleRDD);
val resultSize = joinQuery.SpatialJoinQuery(objectRDD, rectangleRDD).count();
objectRDD.totalNumberOfRecords  /* see https://github.com/DataSystemsLab/GeoSpark/blob/master/src/main/java/org/datasyslab/geospark/spatialRDD/PointRDD.java for API */
objectRDD: org.datasyslab.geospark.spatialRDD.PointRDD = org.datasyslab.geospark.spatialRDD.PointRDD@730e3723
rectangleRDD: org.datasyslab.geospark.spatialRDD.RectangleRDD = org.datasyslab.geospark.spatialRDD.RectangleRDD@2bf31c8c
joinQuery: org.datasyslab.geospark.spatialOperator.JoinQuery = org.datasyslab.geospark.spatialOperator.JoinQuery@36cecee7
resultSize: Long = 9989
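At its simplest, a spatial join of points against rectangles compares every rectangle with every point and keeps the pairs where the rectangle contains the point. The nested-loop sketch below shows that baseline in plain Scala; Pt2, Rect2 and NestedLoopJoin are hypothetical names. It returns one entry per matching (rectangle, point) pair; GeoSpark's result in this version may group matches differently (for example by rectangle), so this count is not necessarily comparable with the 9989 above.

```scala
final case class Pt2(x: Double, y: Double)
final case class Rect2(minX: Double, maxX: Double, minY: Double, maxY: Double) {
  // Boundary points count as contained (an assumption for this sketch).
  def contains(p: Pt2): Boolean = p.x >= minX && p.x <= maxX && p.y >= minY && p.y <= maxY
}

object NestedLoopJoin {
  // Compare every rectangle with every point: O(|rects| * |points|).
  def join(points: Seq[Pt2], rects: Seq[Rect2]): Seq[(Rect2, Pt2)] =
    for {
      r <- rects
      p <- points
      if r.contains(p)
    } yield (r, p)
}
```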
// Start an example Spatial Join Query with Index
val objectRDD = new PointRDD(sc, "/Users/sparkuser/spark/data/geospark/arealm-small.csv", 0, "csv", "rtree", 4); /* 0: the spatial attribute starts at column 0; "rtree": use an R-Tree spatial partitioning grid; 4: use 4 RDD partitions */
val rectangleRDD = new RectangleRDD(sc, "/Users/sparkuser/spark/data/geospark/zcta510-small.csv", 0, "csv"); /* 0 means the spatial attribute starts at column 0 */
val joinQuery = new JoinQuery(sc, objectRDD, rectangleRDD);
objectRDD.buildIndex("rtree"); /* Build R-Tree index */
val resultSize = joinQuery.SpatialJoinQueryUsingIndex(objectRDD, rectangleRDD).count();
objectRDD: org.datasyslab.geospark.spatialRDD.PointRDD = org.datasyslab.geospark.spatialRDD.PointRDD@1301fbdd
rectangleRDD: org.datasyslab.geospark.spatialRDD.RectangleRDD = org.datasyslab.geospark.spatialRDD.RectangleRDD@ebfb5e7
joinQuery: org.datasyslab.geospark.spatialOperator.JoinQuery = org.datasyslab.geospark.spatialOperator.JoinQuery@197ff4a6
resultSize: Long = 9989
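With spatial partitioning, a join can run cell by cell instead of comparing everything with everything. The code above asks GeoSpark for an R-tree-based partitioning grid with 4 partitions plus an R-tree index; the sketch below substitutes a much simpler uniform n x n grid. Each rectangle is copied to every cell it overlaps and each point is placed in exactly one cell, so every matching pair is found exactly once, in the point's cell. It runs on local collections, not Spark, and JPt, JRect, GridPartitionedJoin and the grid size are hypothetical.

```scala
final case class JPt(x: Double, y: Double)
final case class JRect(minX: Double, maxX: Double, minY: Double, maxY: Double) {
  def contains(p: JPt): Boolean = p.x >= minX && p.x <= maxX && p.y >= minY && p.y <= maxY
}

// Hypothetical sketch: a uniform n x n grid stands in for GeoSpark's
// R-tree partitioning; everything runs on local collections, not Spark.
object GridPartitionedJoin {
  def join(points: Seq[JPt], rects: Seq[JRect], n: Int): Seq[(JRect, JPt)] =
    if (points.isEmpty || rects.isEmpty) Seq.empty
    else {
      // The grid extent covers every point and every rectangle.
      val xs = points.map(_.x) ++ rects.flatMap(r => Seq(r.minX, r.maxX))
      val ys = points.map(_.y) ++ rects.flatMap(r => Seq(r.minY, r.maxY))
      val x0 = xs.reduce(_ min _)
      val y0 = ys.reduce(_ min _)
      val w = math.max((xs.reduce(_ max _) - x0) / n, 1e-12) // avoid zero-width cells
      val h = math.max((ys.reduce(_ max _) - y0) / n, 1e-12)
      def cx(x: Double): Int = math.min(n - 1, ((x - x0) / w).toInt)
      def cy(y: Double): Int = math.min(n - 1, ((y - y0) / h).toInt)

      // Each point is assigned to exactly one cell.
      val pointCells: Map[(Int, Int), Seq[JPt]] = points.groupBy(p => (cx(p.x), cy(p.y)))

      // Each rectangle is copied to every cell it overlaps.
      val rectCells: Map[(Int, Int), Seq[JRect]] =
        rects
          .flatMap(r => for (i <- cx(r.minX) to cx(r.maxX); j <- cy(r.minY) to cy(r.maxY)) yield ((i, j), r))
          .groupBy(_._1)
          .map { case (cell, pairs) => cell -> pairs.map(_._2) }

      // Local join inside each cell; a matching pair is found once, in the point's cell.
      for {
        (cell, ps) <- pointCells.toSeq
        r <- rectCells.getOrElse(cell, Seq.empty)
        p <- ps
        if r.contains(p)
      } yield (r, p)
    }
}
```

Because cell assignment is monotonic in each coordinate, a rectangle containing a point always overlaps that point's cell, so the partitioned join returns the same pairs as a full nested loop.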