
Visualizing taxi trips between NYC neighborhoods with Spark and Microsoft R Server

[This article was first published on Revolutions, and kindly contributed to R-bloggers.]

by Ali Zaidi, Data Scientist at Microsoft

In a previous post, we showcased the use of the sparklyr package for manipulating large datasets using a familiar dplyr syntax on top of Spark HDInsight clusters.

In this post, we will take a look at the RxSpark API for R, part of the RevoScaleR package and the Microsoft R Server distribution of R on HDInsight. We'll use RxSpark to visualize a dataset of 140M taxi rides between boroughs in New York City.

Dealing with data in distributed storage and programming with concurrent systems often requires learning complicated new paradigms and techniques. Statisticians and data scientists familiar with R are unlikely to have much experience with such systems. Fortunately, the RevoScaleR package abstracts away the difficult portions of distributed computation and allows the user to focus on building R code that can be automatically deployed in distributed environments.

WODA – Write Once, Deploy Anywhere

In a similar spirit to how sparklyr allowed us to reuse our functions from the dplyr package to manipulate Spark DataFrames, the RxSpark API allows a data scientist to develop code that can be deployed in a multitude of environments. This allows the developer to shift their focus from writing code that’s specific to a certain environment, and instead focus on the complex analysis of their data science problem. We call this flexibility Write Once, Deploy Anywhere, or WODA for the acronym lovers.

For a deeper dive into the RevoScaleR package, I recommend you take a look at the online course, Analyzing Big Data with Microsoft R Server. Much of this blogpost follows along the last section of the course, on deployment to Spark.

NYC Taxi Data

In this section, we will examine the ubiquitous NYC Taxi Dataset and showcase how we can develop data analysis pipelines that are platform-invariant.

As we will see in this post, we can reuse code that we developed in a local compute environment (i.e., a single machine), and have it deploy automagically in our new Spark environment. RevoScaleR automatically transfers the computation from a single local machine to a network of concurrent systems, without requiring any concurrency expertise from the developer.

The aforementioned online course provides detailed examples of examining the NYC Taxi Dataset in a local compute context. A compute context is an object from the RevoScaleR package that describes a compute platform, informing RevoScaleR where to conduct its computations. By default, it is an RxLocalSeq environment, which means RevoScaleR will conduct its analysis using the resources of the local machine, and do so sequentially (i.e., if you have multiple jobs, they are run one after another, not in parallel). In our case, we have access to a cluster of machines on HDInsight and a distributed compute platform through Spark, so we should inform RevoScaleR of our good fortune and ask it to use more than a single machine for its computations. Fortunately, we can take the code directly from the course demonstrations and apply it to a larger dataset through Spark.

Defining our Spark Compute Context

One of the major niceties of the RevoScaleR package is that it allows a developer to move seamlessly between environments by simply specifying which compute platform the package should use for its next computations. When you start working with RevoScaleR, you are in a local compute context, which you can see by calling the function rxGetComputeContext:

rxGetComputeContext()
## RxLocalSeq Compute Context

If you ever change your compute environment to a different platform, you can always revert back to a local compute context by simply setting the compute context back to “local”:

rxSetComputeContext("local")

In our case, we would like to work in a Spark compute context, which is defined by using the RxSpark object. Moreover, we also want to specify to RevoScaleR the location of our data store, which in this case, will be HDFS. This is done through the RxHdfsFileSystem object:

hdfsFS <- RxHdfsFileSystem()

spark_cc <- RxSpark(persistentRun = TRUE,
                    extraSparkConfig = "--conf spark.speculation=true")

rxSetComputeContext(spark_cc)

rxGetComputeContext()
## RevoScaleR Hadoop Spark Local Object
## ------------------------------------
## hdfsShareDir : "/user/RevoShare/alizaidi" 
## clientShareDir : "/tmp" 
## hadoopRPath : "/usr/bin/Revo64" 
## hadoopSwitches : "" 
## sshUsername : "alizaidi" 
## sshHostname : NULL 
## sshSwitches : "" 
## sshProfileScript : NULL 
## sshClientDir : "" 
## remoteMrsVersion : structure(list(major = 9L, minor = 0L, patch = 1L), .Names = c("major", "minor", "patch"), class = "version") 
## sshStrictHostKeyChecking : "ask" 
## usingRunAsUserMode : FALSE 
## nameNode : "default" 
## jobTrackerURL : NULL 
## port : 0 
## onClusterNode : TRUE 
## showOutputWhileWaiting : TRUE 
## fileSystem : NULL 
## numExecutors : 4 
## executorCores : 11 
## executorMem : "19280m" 
## driverMem : "4g" 
## executorOverheadMem : "19280m" 
## extraSparkConfig : "--conf spark.speculation=true" 
## idleTimeout : 3600 
## persistentRun : TRUE 
## sparkReduceMethod : "auto" 
## suppressWarning : TRUE 
## user : "alizaidi" 
## rSessionPid : 62825L 
## appId : "1C7804591FEA4EEEB12E2FC5963C556B" 
## jobId : "" 
## shareDir : "/var/RevoShare/alizaidi" 
## revoPath : "/usr/bin/Revo64" 
## wait : TRUE 
## consoleOutput : FALSE 
## autoCleanup : TRUE 
## workingDir : NULL 
## dataPath : NULL 
## outDataPath : NULL 
## packagesToLoad : NULL 
## email : NULL 
## resultsTimeout : 15 
## description : "spark" 
## version : "1.0-1" 
## compatibilityRequest : <environment>

For simplicity, we have used all the default arguments when defining the Spark and HDFS environmental variables, except for insisting that RevoScaleR reuse the existing Spark application whenever possible (the persistentRun parameter), and that Spark attempt to restart tasks that appear to be lagging (the extraSparkConfig value).
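
Since we set persistentRun = TRUE, the Spark application stays alive between rx calls rather than being torn down after each one. When the analysis session is over, it can be shut down explicitly; a minimal sketch, assuming your RevoScaleR version provides the rxStopEngine function:

# stop the persistent Spark application once the analysis is finished
rxStopEngine(spark_cc)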

Now that we have moved from a local environment to a Spark compute environment, our subsequent calls to rx functions will kick off a Spark application that handles the distributed nature of our computations. As the developer/data scientist, we can proceed as though we were still working on a local machine, and let RevoScaleR handle the complexities of concurrent computation. Our data is saved in HDFS at the file path defined through the taxi_path object below. Let's first create a path where we would like to save our XDF files and a pointer to the existing directory of CSV files that we want to import. Lastly, we will save a sample data.frame that we can use for testing:

data_path <- file.path("/user/RevoShare/alizaidi")
taxi_path <- file.path(data_path, "nyctaxi/data")
hdfs_ls <- paste0("hadoop fs -ls ", taxi_path)
system(hdfs_ls)

taxi_xdf_path <- file.path(data_path, "TaxiXdf")

taxi_text <- RxTextData(taxi_path, fileSystem = hdfsFS)
taxi_xdf <- RxXdfData(taxi_xdf_path, fileSystem = hdfsFS)

### Also save a sample as a data.frame
nyc_sample_df <- read.csv("data/yellow_tripdata_2016-05.csv", nrows = 1000)

XDF is short for eXternal Data Frame, the fundamental data structure for RevoScaleR algorithms. It is an on-disk file format optimized to work with RevoScaleR, and will be our default data structure for this blogpost. When we save XDFs on an HDFS data store, we will use the acronym XDFD, to emphasize the distributed nature of the XDF object. For more information about XDFs and XDFDs, particularly when stored on HDFS, please take a look at the documentation here.

Now that we have our pointers to the original data source (taxi_text), and the file path to where we want to save our data to (taxi_xdf), we can directly import our data using the simplest of RevoScaleR functions:

rxImport(inData = taxi_text, outFile = taxi_xdf)

Despite the fact that the data is stored in a set of directories partitioned across data nodes in a distributed cluster, our import step is the exact call that we would make when working with data stored locally!
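
For comparison, here is the same pattern pointed at local storage instead; a minimal sketch with hypothetical local file names, where nothing changes but the data sources:

# the identical import pattern against the local file system (hypothetical paths)
local_csv <- RxTextData("data/yellow_tripdata_2016-05.csv")
local_xdf <- RxXdfData("data/yellow_tripdata_2016-05.xdf")
rxImport(inData = local_csv, outFile = local_xdf, overwrite = TRUE)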

rxGetInfo(taxi_xdf, getVarInfo = TRUE, numRows = 5)
## File name: /user/RevoShare/alizaidi/TaxiXdf 
## Number of composite data files: 48 
## Number of observations: 138413407 
## Number of variables: 19 
## Number of blocks: 289 
## Compression type: zlib 
## Variable information: 
## Var 1: VendorID, Type: integer, Low/High: (1, 2)
## Var 2: tpep_pickup_datetime, Type: character
## Var 3: tpep_dropoff_datetime, Type: character
## Var 4: passenger_count, Type: integer, Low/High: (0, 9)
## Var 5: trip_distance, Type: numeric, Storage: float32, Low/High: (-3390583.7500, 198623008.0000)
## Var 6: pickup_longitude, Type: numeric, Storage: float32, Low/High: (-736.6166, 172.6000)
## Var 7: pickup_latitude, Type: numeric, Storage: float32, Low/High: (-78.1947, 80.6025)
## Var 8: RatecodeID, Type: integer, Low/High: (1, 99)
## Var 9: store_and_fwd_flag, Type: character
## Var 10: dropoff_longitude, Type: numeric, Storage: float32, Low/High: (-781.8333, 172.6000)
## Var 11: dropoff_latitude, Type: numeric, Storage: float32, Low/High: (-78.1947, 480.7333)
## Var 12: payment_type, Type: integer, Low/High: (1, 5)
## Var 13: fare_amount, Type: numeric, Storage: float32, Low/High: (-957.6000, 825998.6250)
## Var 14: extra, Type: numeric, Storage: float32, Low/High: (-58.5000, 648.8700)
## Var 15: mta_tax, Type: numeric, Storage: float32, Low/High: (-3.0000, 91.0000)
## Var 16: tip_amount, Type: numeric, Storage: float32, Low/High: (-440.0000, 1200.8000)
## Var 17: tolls_amount, Type: numeric, Storage: float32, Low/High: (-99.9900, 1901.4000)
## Var 18: improvement_surcharge, Type: numeric, Storage: float32, Low/High: (-0.3000, 137.6300)
## Var 19: total_amount, Type: numeric, Storage: float32, Low/High: (-958.4000, 826040.0000)
## Data (5 rows starting with row 1):
##   VendorID tpep_pickup_datetime tpep_dropoff_datetime passenger_count
## 1        1  2015-07-01 00:00:00   2015-07-01 00:15:26               1
## 2        1  2015-07-01 00:00:00   2015-07-01 00:22:22               1
## 3        1  2015-07-01 00:00:00   2015-07-01 00:07:42               1
## 4        1  2015-07-01 00:00:00   2015-07-01 00:39:37               1
## 5        1  2015-07-01 00:00:00   2015-07-01 00:05:34               1
##   trip_distance pickup_longitude pickup_latitude RatecodeID
## 1           3.5        -73.99416        40.75113          1
## 2           3.9        -73.98466        40.76849          1
## 3           2.3        -73.97889        40.76229          1
## 4           9.2        -73.99279        40.74276          1
## 5           1.1        -73.91243        40.76981          1
##   store_and_fwd_flag dropoff_longitude dropoff_latitude payment_type
## 1                  N         -73.97682         40.78857            2
## 2                  N         -74.00013         40.73490            2
## 3                  N         -74.00422         40.75253            2
## 4                  N         -73.97151         40.63715            1
## 5                  N         -73.92033         40.75744            1
##   fare_amount extra mta_tax tip_amount tolls_amount improvement_surcharge
## 1          14   0.5     0.5       0.00            0                   0.3
## 2          17   0.5     0.5       0.00            0                   0.3
## 3           9   0.5     0.5       0.00            0                   0.3
## 4          33   0.5     0.5       8.55            0                   0.3
## 5           6   0.5     0.5       2.00            0                   0.3
##   total_amount
## 1        15.30
## 2        18.30
## 3        10.30
## 4        42.85
## 5         9.30

Even though the data is now saved across multiple nodes in a distributed environment, it is compressed to improve read performance, and metadata is saved alongside it, speeding up queries for simple statistics.
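
For example, summary statistics can be computed chunk by chunk across the cluster without pulling any data back to the client; a minimal sketch on a single numeric column:

# runs as a distributed job under the current Spark compute context
rxSummary(~ fare_amount, data = taxi_xdf)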

Transformations with XDFDs

An important distinction between data stored in a distributed file system like HDFS and data residing on a single disk is mutability. In general, it is much more difficult to overwrite data in distributed storage, as doing so requires rewriting multiple non-contiguous blocks.

Therefore, when working with XDFDs it is better practice to write to a new location than to overwrite existing directories. The fundamental function for data manipulation with RevoScaleR is rxDataStep, which adds new columns by taking in a list of transform expressions, as shown in the example below:

taxi_tip <- RxXdfData("/user/RevoShare/alizaidi/taxitipXdf",
                      fileSystem = hdfsFS)
rxDataStep(taxi_xdf, taxi_tip,
           transforms = list(tip_percent = ifelse(fare_amount > 0, 
                                                  tip_amount/fare_amount,
                                                  NA)))

Examining results:

rxGetInfo(taxi_tip, getVarInfo = TRUE)
## File name: /user/RevoShare/alizaidi/taxitipXdf 
## Number of composite data files: 48 
## Number of observations: 138413407 
## Number of variables: 20 
## Number of blocks: 289 
## Compression type: zlib 
## Variable information: 
## Var 1: VendorID, Type: integer, Low/High: (1, 2)
## Var 2: tpep_pickup_datetime, Type: character
## Var 3: tpep_dropoff_datetime, Type: character
## Var 4: passenger_count, Type: integer, Low/High: (0, 9)
## Var 5: trip_distance, Type: numeric, Storage: float32, Low/High: (-3390583.7500, 198623008.0000)
## Var 6: pickup_longitude, Type: numeric, Storage: float32, Low/High: (-736.6166, 172.6000)
## Var 7: pickup_latitude, Type: numeric, Storage: float32, Low/High: (-78.1947, 80.6025)
## Var 8: RatecodeID, Type: integer, Low/High: (1, 99)
## Var 9: store_and_fwd_flag, Type: character
## Var 10: dropoff_longitude, Type: numeric, Storage: float32, Low/High: (-781.8333, 172.6000)
## Var 11: dropoff_latitude, Type: numeric, Storage: float32, Low/High: (-78.1947, 480.7333)
## Var 12: payment_type, Type: integer, Low/High: (1, 5)
## Var 13: fare_amount, Type: numeric, Storage: float32, Low/High: (-957.6000, 825998.6250)
## Var 14: extra, Type: numeric, Storage: float32, Low/High: (-58.5000, 648.8700)
## Var 15: mta_tax, Type: numeric, Storage: float32, Low/High: (-3.0000, 91.0000)
## Var 16: tip_amount, Type: numeric, Storage: float32, Low/High: (-440.0000, 1200.8000)
## Var 17: tolls_amount, Type: numeric, Storage: float32, Low/High: (-99.9900, 1901.4000)
## Var 18: improvement_surcharge, Type: numeric, Storage: float32, Low/High: (-0.3000, 137.6300)
## Var 19: total_amount, Type: numeric, Storage: float32, Low/High: (-958.4000, 826040.0000)
## Var 20: tip_percent, Type: numeric, Low/High: (-1.0000, 54900.0012)

More Complicated Transforms

Creating Temporal Features

The previous section showed how we could create new columns by defining a list of transform expressions. For more complicated data manipulation steps, we can create a user-defined function (UDF) that is applied to each chunk of our data to create new columns. This greatly expands the possibilities of data manipulation for large datasets: we are no longer limited to handcrafting single expressions, but can write complex functions that may even rely on external R packages. The function below uses the lubridate package to create a set of temporal features: the pickup and dropoff hour and day of week, as well as the trip duration:

xforms <- function(data) {
  # transformation function for extracting date and time features;
  # lubridate is loaded on the workers via rxDataStep's transformPackages argument
  weekday_labels <- c('Sun', 'Mon', 'Tue', 'Wed', 'Thu', 'Fri', 'Sat')
  cut_levels <- c(1, 5, 9, 12, 16, 18, 22)
  hour_labels <- c('1AM-5AM', '5AM-9AM', '9AM-12PM', '12PM-4PM', '4PM-6PM', '6PM-10PM', '10PM-1AM')
  
  # bucket the pickup time into hour bands and label the day of week
  pickup_datetime <- lubridate::ymd_hms(data$tpep_pickup_datetime, tz = "UTC")
  pickup_hour <- addNA(cut(hour(pickup_datetime), cut_levels))
  pickup_dow <- factor(wday(pickup_datetime), levels = 1:7, labels = weekday_labels)
  levels(pickup_hour) <- hour_labels
  
  # the same features for the dropoff time
  dropoff_datetime <- lubridate::ymd_hms(data$tpep_dropoff_datetime, tz = "UTC")
  dropoff_hour <- addNA(cut(hour(dropoff_datetime), cut_levels))
  dropoff_dow <- factor(wday(dropoff_datetime), levels = 1:7, labels = weekday_labels)
  levels(dropoff_hour) <- hour_labels
  
  data$pickup_hour <- pickup_hour
  data$pickup_dow <- pickup_dow
  data$dropoff_hour <- dropoff_hour
  data$dropoff_dow <- dropoff_dow
  # trip duration in seconds
  data$trip_duration <- as.integer(lubridate::interval(pickup_datetime, dropoff_datetime))
  
  return(data)
}

Having defined our function, we can now apply it to our dataset. To ensure it works, let's first try it out on a small local sample by reverting to the local compute context:

x <- head(taxi_tip)
rxSetComputeContext("local")

rxDataStep(inData = x, 
           outFile = NULL, 
           transformFunc = xforms, 
           transformPackages = "lubridate")
##   VendorID tpep_pickup_datetime tpep_dropoff_datetime passenger_count
## 1        1  2015-07-01 00:00:00   2015-07-01 00:15:26               1
## 2        1  2015-07-01 00:00:00   2015-07-01 00:22:22               1
## 3        1  2015-07-01 00:00:00   2015-07-01 00:07:42               1
## 4        1  2015-07-01 00:00:00   2015-07-01 00:39:37               1
## 5        1  2015-07-01 00:00:00   2015-07-01 00:05:34               1
## 6        1  2015-07-01 00:00:00   2015-07-01 00:06:46               2
##   trip_distance pickup_longitude pickup_latitude RatecodeID
## 1           3.5        -73.99416        40.75113          1
## 2           3.9        -73.98466        40.76849          1
## 3           2.3        -73.97889        40.76229          1
## 4           9.2        -73.99279        40.74276          1
## 5           1.1        -73.91243        40.76981          1
## 6           1.0        -73.95916        40.77343          1
##   store_and_fwd_flag dropoff_longitude dropoff_latitude payment_type
## 1                  N         -73.97682         40.78857            2
## 2                  N         -74.00013         40.73490            2
## 3                  N         -74.00422         40.75253            2
## 4                  N         -73.97151         40.63715            1
## 5                  N         -73.92033         40.75744            1
## 6                  N         -73.96935         40.76925            2
##   fare_amount extra mta_tax tip_amount tolls_amount improvement_surcharge
## 1        14.0   0.5     0.5       0.00            0                   0.3
## 2        17.0   0.5     0.5       0.00            0                   0.3
## 3         9.0   0.5     0.5       0.00            0                   0.3
## 4        33.0   0.5     0.5       8.55            0                   0.3
## 5         6.0   0.5     0.5       2.00            0                   0.3
## 6         6.5   0.5     0.5       0.00            0                   0.3
##   total_amount tip_percent pickup_hour pickup_dow dropoff_hour dropoff_dow
## 1        15.30   0.0000000    10PM-1AM        Wed     10PM-1AM         Wed
## 2        18.30   0.0000000    10PM-1AM        Wed     10PM-1AM         Wed
## 3        10.30   0.0000000    10PM-1AM        Wed     10PM-1AM         Wed
## 4        42.85   0.2590909    10PM-1AM        Wed     10PM-1AM         Wed
## 5         9.30   0.3333333    10PM-1AM        Wed     10PM-1AM         Wed
## 6         7.80   0.0000000    10PM-1AM        Wed     10PM-1AM         Wed
##   trip_duration
## 1           926
## 2          1342
## 3           462
## 4          2377
## 5           334
## 6           406
rxSetComputeContext(spark_cc)

We used the local compute context to try our function on the sample, then reverted to the Spark context to run the transformation on the cluster:

taxi_date <- RxXdfData("/user/RevoShare/alizaidi/TaxiDatesTranf",
                       fileSystem = hdfsFS)
rxDataStep(inData = taxi_tip, 
           outFile = taxi_date, 
           transformFunc = xforms, 
           transformPackages = "lubridate")
rxGetInfo(taxi_date, numRows = 5, getVarInfo = TRUE)
## File name: /user/RevoShare/alizaidi/TaxiDatesTranf 
## Number of composite data files: 48 
## Number of observations: 138413407 
## Number of variables: 25 
## Number of blocks: 289 
## Compression type: zlib 
## Variable information: 
## Var 1: VendorID, Type: integer, Low/High: (1, 2)
## Var 2: tpep_pickup_datetime, Type: character
## Var 3: tpep_dropoff_datetime, Type: character
## Var 4: passenger_count, Type: integer, Low/High: (0, 9)
## Var 5: trip_distance, Type: numeric, Storage: float32, Low/High: (-3390583.7500, 198623008.0000)
## Var 6: pickup_longitude, Type: numeric, Storage: float32, Low/High: (-736.6166, 172.6000)
## Var 7: pickup_latitude, Type: numeric, Storage: float32, Low/High: (-78.1947, 80.6025)
## Var 8: RatecodeID, Type: integer, Low/High: (1, 99)
## Var 9: store_and_fwd_flag, Type: character
## Var 10: dropoff_longitude, Type: numeric, Storage: float32, Low/High: (-781.8333, 172.6000)
## Var 11: dropoff_latitude, Type: numeric, Storage: float32, Low/High: (-78.1947, 480.7333)
## Var 12: payment_type, Type: integer, Low/High: (1, 5)
## Var 13: fare_amount, Type: numeric, Storage: float32, Low/High: (-957.6000, 825998.6250)
## Var 14: extra, Type: numeric, Storage: float32, Low/High: (-58.5000, 648.8700)
## Var 15: mta_tax, Type: numeric, Storage: float32, Low/High: (-3.0000, 91.0000)
## Var 16: tip_amount, Type: numeric, Storage: float32, Low/High: (-440.0000, 1200.8000)
## Var 17: tolls_amount, Type: numeric, Storage: float32, Low/High: (-99.9900, 1901.4000)
## Var 18: improvement_surcharge, Type: numeric, Storage: float32, Low/High: (-0.3000, 137.6300)
## Var 19: total_amount, Type: numeric, Storage: float32, Low/High: (-958.4000, 826040.0000)
## Var 20: tip_percent, Type: numeric, Low/High: (-1.0000, 54900.0012)
## Var 21: pickup_hour
##        7 factor levels: 1AM-5AM 5AM-9AM 9AM-12PM 12PM-4PM 4PM-6PM 6PM-10PM 10PM-1AM
## Var 22: pickup_dow
##        7 factor levels: Sun Mon Tue Wed Thu Fri Sat
## Var 23: dropoff_hour
##        7 factor levels: 1AM-5AM 5AM-9AM 9AM-12PM 12PM-4PM 4PM-6PM 6PM-10PM 10PM-1AM
## Var 24: dropoff_dow
##        7 factor levels: Sun Mon Tue Wed Thu Fri Sat
## Var 25: trip_duration, Type: integer, Low/High: (-631148790, 29227264)
## Data (5 rows starting with row 1):
##   VendorID tpep_pickup_datetime tpep_dropoff_datetime passenger_count
## 1        1  2015-07-01 00:00:00   2015-07-01 00:15:26               1
## 2        1  2015-07-01 00:00:00   2015-07-01 00:22:22               1
## 3        1  2015-07-01 00:00:00   2015-07-01 00:07:42               1
## 4        1  2015-07-01 00:00:00   2015-07-01 00:39:37               1
## 5        1  2015-07-01 00:00:00   2015-07-01 00:05:34               1
##   trip_distance pickup_longitude pickup_latitude RatecodeID
## 1           3.5        -73.99416        40.75113          1
## 2           3.9        -73.98466        40.76849          1
## 3           2.3        -73.97889        40.76229          1
## 4           9.2        -73.99279        40.74276          1
## 5           1.1        -73.91243        40.76981          1
##   store_and_fwd_flag dropoff_longitude dropoff_latitude payment_type
## 1                  N         -73.97682         40.78857            2
## 2                  N         -74.00013         40.73490            2
## 3                  N         -74.00422         40.75253            2
## 4                  N         -73.97151         40.63715            1
## 5                  N         -73.92033         40.75744            1
##   fare_amount extra mta_tax tip_amount tolls_amount improvement_surcharge
## 1          14   0.5     0.5       0.00            0                   0.3
## 2          17   0.5     0.5       0.00            0                   0.3
## 3           9   0.5     0.5       0.00            0                   0.3
## 4          33   0.5     0.5       8.55            0                   0.3
## 5           6   0.5     0.5       2.00            0                   0.3
##   total_amount tip_percent pickup_hour pickup_dow dropoff_hour dropoff_dow
## 1        15.30   0.0000000    10PM-1AM        Wed     10PM-1AM         Wed
## 2        18.30   0.0000000    10PM-1AM        Wed     10PM-1AM         Wed
## 3        10.30   0.0000000    10PM-1AM        Wed     10PM-1AM         Wed
## 4        42.85   0.2590909    10PM-1AM        Wed     10PM-1AM         Wed
## 5         9.30   0.3333333    10PM-1AM        Wed     10PM-1AM         Wed
##   trip_duration
## 1           926
## 2          1342
## 3           462
## 4          2377
## 5           334

As you can see from the output of the chunk above, our function creates a set of columns capturing useful temporal features. Our hope is that these features will give us a better understanding of how trips are distributed over time.

For example, we can now examine how tip percentages vary as a function of day of week and pickup hour:

tip_dist_df <- rxCube(tip_percent ~ pickup_hour + pickup_dow, data = taxi_date, returnDataFrame = TRUE)

library(ggplot2)
library(magrittr)

tip_dist_df %>% ggplot(aes(x = pickup_hour, y = pickup_dow, fill = tip_percent)) +
  geom_tile() + theme_minimal() + 
  scale_fill_continuous(labels = scales::percent) +
  labs(x = "Pickup Hour", y = "Pickup Day of Week", fill = "Tip Percent",
      title = "Distribution of Tip Percents",
      subtitle = "Do Passengers Tip More in the AM?")

Crafting Spatial Features

While temporal features give us an interesting look at how taxi trips vary as a function of time, the trips data also contains a wealth of spatial information. Currently, the spatial features are all consolidated into the coordinate columns, which are numeric values. In order to reduce the variability of the numeric features and encode them into more sensible categories, let's transform the coordinate columns into categorical features encoding the neighborhoods of pickup and dropoff.

Our UDF will examine the rows of our dataset and map each trip's coordinates to the containing polygon in a shapefile of NYC neighborhoods. The shapefile we will use is from Zillow:

library(rgeos)
library(sp)
library(maptools)
library(stringr)

nyc_shapefile <- readShapePoly('ZillowNeighborhoods-NY/ZillowNeighborhoods-NY.shp')
mht_shapefile <- subset(nyc_shapefile, str_detect(CITY, 'New York City-Manhattan'))

mht_shapefile@data$id <- as.character(mht_shapefile@data$NAME)

We will use the shapefile to map the spatial coordinates of our observations to the neighborhoods that contain them:

find_nhoods <- function(data) {
  
  # 'shapefile' is supplied at run time through rxDataStep's transformObjects argument
  # extract pick-up lat and long and find their neighborhoods
  pickup_longitude <- ifelse(is.na(data$pickup_longitude), 0, data$pickup_longitude)
  pickup_latitude <- ifelse(is.na(data$pickup_latitude), 0, data$pickup_latitude)
  data_coords <- data.frame(long = pickup_longitude, lat = pickup_latitude)
  coordinates(data_coords) <- c('long', 'lat')
  nhoods <- over(data_coords, shapefile)
  
  ## add only the pick-up neighborhood and city columns to the data
  data$pickup_nhood <- nhoods$NAME
  data$pickup_borough <- nhoods$CITY
  
  # extract drop-off lat and long and find their neighborhoods
  dropoff_longitude <- ifelse(is.na(data$dropoff_longitude), 0, data$dropoff_longitude)
  dropoff_latitude <- ifelse(is.na(data$dropoff_latitude), 0, data$dropoff_latitude)
  data_coords <- data.frame(long = dropoff_longitude, lat = dropoff_latitude)
  coordinates(data_coords) <- c('long', 'lat')
  nhoods <- over(data_coords, shapefile)
  
  ## add only the drop-off neighborhood and city columns to the data  
  data$dropoff_nhood <- nhoods$NAME
  data$dropoff_borough <- nhoods$CITY
  
  ## return the data with the new columns added in
  data
}

Let’s again test this function locally first:

rxSetComputeContext("local")

head(rxDataStep(nyc_sample_df, transformFunc = find_nhoods, transformPackages = c("sp", "maptools"), 
                transformObjects = list(shapefile = mht_shapefile)))
##   VendorID tpep_pickup_datetime tpep_dropoff_datetime passenger_count
## 1        1  2016-05-01 00:00:00   2016-05-01 00:17:31               1
## 2        2  2016-05-01 00:00:00   2016-05-01 00:07:31               1
## 3        2  2016-05-01 00:00:00   2016-05-01 00:07:01               6
## 4        2  2016-05-01 00:00:00   2016-05-01 00:19:47               1
## 5        2  2016-05-01 00:00:00   2016-05-01 00:06:39               1
## 6        2  2016-05-01 00:00:00   2016-05-01 00:05:19               2
##   trip_distance pickup_longitude pickup_latitude RatecodeID
## 1          3.60        -73.98590        40.76804          1
## 2          1.68        -73.99158        40.74475          1
## 3          1.09        -73.99307        40.74157          1
## 4          4.21        -73.99194        40.68460          1
## 5          0.56        -74.00528        40.74019          1
## 6          0.63        -73.97929        40.75576          1
##   store_and_fwd_flag dropoff_longitude dropoff_latitude payment_type
## 1                  N         -73.98399         40.73010            1
## 2                  N         -73.97570         40.76547            1
## 3                  N         -73.98100         40.74463            1
## 4                  N         -74.00226         40.73300            1
## 5                  N         -73.99750         40.73756            1
## 6                  N         -73.98801         40.75847            1
##   fare_amount extra mta_tax tip_amount tolls_amount improvement_surcharge
## 1        15.0   0.5     0.5       1.50            0                   0.3
## 2         7.5   0.5     0.5       0.88            0                   0.3
## 3         6.5   0.5     0.5       1.56            0                   0.3
## 4        17.0   0.5     0.5       3.66            0                   0.3
## 5         6.0   0.5     0.5       1.46            0                   0.3
## 6         5.0   0.5     0.5       0.00            0                   0.3
##   total_amount pickup_nhood          pickup_borough     dropoff_nhood
## 1        17.80      Midtown New York City-Manhattan      East Village
## 2         9.68      Chelsea New York City-Manhattan      Central Park
## 3         9.36      Chelsea New York City-Manhattan          Gramercy
## 4        21.96         <NA>                    <NA> Greenwich Village
## 5         8.76 West Village New York City-Manhattan Greenwich Village
## 6         6.30      Midtown New York City-Manhattan           Midtown
##           dropoff_borough
## 1 New York City-Manhattan
## 2 New York City-Manhattan
## 3 New York City-Manhattan
## 4 New York City-Manhattan
## 5 New York City-Manhattan
## 6 New York City-Manhattan
rxSetComputeContext(spark_cc)

Looks like it worked locally; let's try it on the full dataset:

taxi_hoods <- RxXdfData("/user/RevoShare/alizaidi/TaxiHoodsXdf",
                       fileSystem = hdfsFS)
rxDataStep(taxi_date, taxi_hoods, 
           transformFunc = find_nhoods, 
           transformPackages = c("sp", "maptools", "rgeos"), 
           transformObjects = list(shapefile = mht_shapefile))
rxGetInfo(taxi_hoods, numRows = 5)
## File name: /user/RevoShare/alizaidi/TaxiHoodsXdf 
## Number of composite data files: 48 
## Number of observations: 138413407 
## Number of variables: 29 
## Number of blocks: 289 
## Compression type: zlib 
## Data (5 rows starting with row 1):
##   VendorID tpep_pickup_datetime tpep_dropoff_datetime passenger_count
## 1        1  2015-07-01 00:00:00   2015-07-01 00:15:26               1
## 2        1  2015-07-01 00:00:00   2015-07-01 00:22:22               1
## 3        1  2015-07-01 00:00:00   2015-07-01 00:07:42               1
## 4        1  2015-07-01 00:00:00   2015-07-01 00:39:37               1
## 5        1  2015-07-01 00:00:00   2015-07-01 00:05:34               1
##   trip_distance pickup_longitude pickup_latitude RatecodeID
## 1           3.5        -73.99416        40.75113          1
## 2           3.9        -73.98466        40.76849          1
## 3           2.3        -73.97889        40.76229          1
## 4           9.2        -73.99279        40.74276          1
## 5           1.1        -73.91243        40.76981          1
##   store_and_fwd_flag dropoff_longitude dropoff_latitude payment_type
## 1                  N         -73.97682         40.78857            2
## 2                  N         -74.00013         40.73490            2
## 3                  N         -74.00422         40.75253            2
## 4                  N         -73.97151         40.63715            1
## 5                  N         -73.92033         40.75744            1
##   fare_amount extra mta_tax tip_amount tolls_amount improvement_surcharge
## 1          14   0.5     0.5       0.00            0                   0.3
## 2          17   0.5     0.5       0.00            0                   0.3
## 3           9   0.5     0.5       0.00            0                   0.3
## 4          33   0.5     0.5       8.55            0                   0.3
## 5           6   0.5     0.5       2.00            0                   0.3
##   total_amount tip_percent pickup_hour pickup_dow dropoff_hour dropoff_dow
## 1        15.30   0.0000000    10PM-1AM        Wed     10PM-1AM         Wed
## 2        18.30   0.0000000    10PM-1AM        Wed     10PM-1AM         Wed
## 3        10.30   0.0000000    10PM-1AM        Wed     10PM-1AM         Wed
## 4        42.85   0.2590909    10PM-1AM        Wed     10PM-1AM         Wed
## 5         9.30   0.3333333    10PM-1AM        Wed     10PM-1AM         Wed
##   trip_duration     pickup_nhood          pickup_borough     dropoff_nhood
## 1           926 Garment District New York City-Manhattan   Upper West Side
## 2          1342          Midtown New York City-Manhattan Greenwich Village
## 3           462          Midtown New York City-Manhattan           Chelsea
## 4          2377          Chelsea New York City-Manhattan              <NA>
## 5           334             <NA>                    <NA>              <NA>
##           dropoff_borough
## 1 New York City-Manhattan
## 2 New York City-Manhattan
## 3 New York City-Manhattan
## 4                    <NA>
## 5                    <NA>

Filter Data to Manhattan Only

Since Manhattan is the busiest borough in NYC, we will narrow our focus to trips that started and ended there. We will also filter out likely outlier observations:

mht_xdf <- RxXdfData("/user/RevoShare/alizaidi/ManhattanXdf",
                     fileSystem = hdfsFS)
rxDataStep(taxi_hoods, mht_xdf, 
           # copy the neighborhood columns to pickup_nb/dropoff_nb, the names
           # used in the rest of the analysis; the originals are dropped below
           transforms = list(pickup_nb = pickup_nhood,
                             dropoff_nb = dropoff_nhood),
           rowSelection = (
             passenger_count > 0 &
               trip_distance >= 0 & trip_distance < 30 &
               trip_duration > 0 & trip_duration < 60*60*24 &
               str_detect(pickup_borough, 'Manhattan') &
               str_detect(dropoff_borough, 'Manhattan') &
               !is.na(pickup_nhood) &
               !is.na(dropoff_nhood) &
               fare_amount > 0), 
           transformPackages = "stringr",
           varsToDrop = c('extra', 'mta_tax', 'improvement_surcharge', 'total_amount', 
                          'pickup_borough', 'dropoff_borough', 'pickup_nhood', 'dropoff_nhood'))

Now that we have our spatial features, let’s see if we can make a visualization of the trips as a function time of trip as well as the trip route.

library(dplyr)

nbs_df <- rxCube(~ pickup_nb + dropoff_nb + pickup_hour, data = mht_xdf, returnDataFrame = TRUE)
nbs_df <- nbs_df %>% tbl_df %>%
  filter(Counts >= 100) %>% 
  mutate(width = ntile(Counts, 5))

We used the beloved dplyr package in conjunction with RevoScaleR's rxCube function to tabulate the counts of trips as a function of route and pickup hour, and then filtered to routes with at least 100 observations in our dataset. Finally, we used the handy ntile function in dplyr to bin routes into quintiles by trip count, giving a width variable (1 through 5) that measures the strength of the relationship between pickup and dropoff neighborhoods.

Now that we have the counts in a tidy data.frame object, we can make visualizations of the routes. We'll use the exceptional circlize package to make one visualization for each level of the pickup_hour factor. We first build a functional sequence containing the methods that make our visualization, and then use the purrr package to map each level of the factor column through that functional sequence. In order to make our plots a little less "hair-ballish", we'll narrow our focus to a few of the most popular neighborhoods, as specified through the nbs vector (please don't be offended if your favorite neighborhood was omitted):

library(purrr)
library(circlize)

nbs <- c("Lower East Side", "East Village",
         "Chelsea", "Midtown", "Upper East Side",
         "Upper West Side", "Greenwich Village")


chord_diag <- . %>% select(pickup_nb, dropoff_nb, width) %>% chordDiagram()

chord_plot <- nbs_df %>%
  filter(pickup_nb %in% nbs,
         dropoff_nb %in% nbs) %>% 
  split(.$pickup_hour) %>%
  map(chord_diag)

Here's the chart for the morning rush hour (5AM-9AM):
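
To reproduce that single panel, the same functional sequence can be applied to the morning subset directly; a minimal sketch, assuming the pickup_hour level is labelled "5AM-9AM" as defined in xforms:

# re-draw only the 5AM-9AM chord diagram
nbs_df %>%
  filter(pickup_hour == "5AM-9AM",
         pickup_nb %in% nbs, dropoff_nb %in% nbs) %>%
  chord_diag()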

Conclusion

Hopefully this post provided some insight into how we can use RevoScaleR to develop scalable data analysis pipelines that deploy easily onto Spark clusters. We developed some intuitive features from our dataset based on the temporal and spatial characteristics of the original data. Stay tuned for another post where we show how you can use these features for prediction and inference!
