Want to share your content on R-bloggers? click here if you have a blog, or here if you don't.
Introduction
In the previous parts of this series, we have shown how to write functions as both combinations of dplyr verbs and SQL query generators that can be executed by Spark, how to execute them with DBI and how to achieve lazy SQL statements that only get executed when needed.
In this fourth part, we will look at how to write R functions that interface with Spark via a lower-level invocation API that lets us use all the functionality that is exposed by the Scala Spark APIs. We will also show how such R calls relate to Scala code.
Contents
- Introduction
- Contents
- Preparation
- The invoke() API of sparklyr
- Getting started with the invoke API
- Grouping and aggregation with invoke chains
- Wrapping the invocations into R functions
- Reconstructing variable normalization
- Where invoke can be better than dplyr translation or SQL
- Conclusion
- References
Preparation
The full setup of Spark and sparklyr is not in the scope of this post; please check the first part of the series for some setup instructions and a ready-made Docker image.
If you have Docker available, running
docker run -d -p 8787:8787 -e PASSWORD=pass --name rstudio jozefhajnala/sparkly:add-rstudio
should make RStudio available by navigating to http://localhost:8787 in your browser. You can then log in with the user name rstudio and the password pass and continue experimenting with the code in this post.
# Load packages
suppressPackageStartupMessages({
  library(sparklyr)
  library(dplyr)
  library(nycflights13)
})

# Prepare the data
weather <- nycflights13::weather %>%
  mutate(id = 1L:nrow(nycflights13::weather)) %>%
  select(id, everything())

# Connect
sc <- sparklyr::spark_connect(master = "local")

# Copy the weather dataset to the instance
tbl_weather <- dplyr::copy_to(
  dest = sc,
  df = weather,
  name = "weather",
  overwrite = TRUE
)

# Copy the flights dataset to the instance
tbl_flights <- dplyr::copy_to(
  dest = sc,
  df = nycflights13::flights,
  name = "flights",
  overwrite = TRUE
)
The invoke() API of sparklyr
So far when interfacing with Spark from R, we have used the sparklyr package in three ways:
- Writing combinations of dplyr verbs that would be translated to Spark SQL via the dbplyr package and the SQL executed by Spark when requested
- Generating Spark SQL code directly and sending it for execution in multiple ways
- Combinations of the above two methods
What these methods have in common is that they translate operations written in R to Spark SQL and that SQL code is then sent for execution by our Spark instance.
There is, however, another approach that we can use with sparklyr, which will be more familiar to users or developers who have worked with packages like rJava or rscala before. Even though it is arguably less convenient than the APIs provided by those two packages, sparklyr provides an invocation API that exposes three functions:
- invoke(jobj, method, ...) to execute a method on a Java object reference
- invoke_static(sc, class, method, ...) to execute a static method associated with a Java class
- invoke_new(sc, class, ...) to invoke a constructor associated with a Java class
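To see all three functions in isolation before involving any Spark DataFrames, here is a small sketch that only touches standard JDK classes. The choice of java.lang.Math.hypot and java.math.BigInteger is purely illustrative, and a local connection as in the setup section is assumed.

```r
library(sparklyr)

# Connect to a local Spark instance, as in the setup section
sc <- spark_connect(master = "local")

# invoke_static(): call the static method java.lang.Math.hypot(double, double)
hyp <- invoke_static(sc, "java.lang.Math", "hypot", 3, 4)

# invoke_new(): call the java.math.BigInteger(String) constructor twice;
# the objects live in the JVM and R only holds references to them
big <- invoke_new(sc, "java.math.BigInteger", "1000000000000")
one <- invoke_new(sc, "java.math.BigInteger", "1")

# invoke(): call instance methods on a reference; add() returns another
# BigInteger reference, toString() returns a String that arrives in R as character
total <- invoke(invoke(big, "add", one), "toString")

hyp    # 5
total  # "1000000000001"
```

Note that the large number never exists as an R numeric: it is constructed, added and formatted entirely on the JVM side, and only the final string crosses over to R.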
Let us have a look at how we can use those functions in practice to efficiently work with Spark from R.
Getting started with the invoke API
We can start with a few very simple examples of invoke() usage, for instance getting the number of rows of tbl_flights:
# Get the count of rows
tbl_flights %>%
  spark_dataframe() %>%
  invoke("count")
## [1] 336776
We see one extra operation before invoking the count: spark_dataframe(). This is because the invoke() interface works with Java object references and not with tbl objects in remote sources such as tbl_flights. We therefore need to convert tbl_flights to a Java object reference, for which we use the spark_dataframe() function.
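The difference between the two kinds of objects can be checked directly in R. The following sketch recreates tbl_flights on a local connection so that it runs on its own; the unlist() call is a defensive choice, since an Array[String] returned by invoke() may arrive in R as a list.

```r
library(sparklyr)
library(dplyr)

sc <- spark_connect(master = "local")
tbl_flights <- copy_to(sc, nycflights13::flights, "flights", overwrite = TRUE)

# The remote tibble: dplyr verbs work on it, but it is not a Java reference
class(tbl_flights)

# spark_dataframe() returns the underlying Java object reference (a spark_jobj)
sdf <- spark_dataframe(tbl_flights)
class(sdf)

# That reference accepts Dataset methods, e.g. columns (an Array[String]);
# unlist() gives a plain character vector whether it arrives as a list or not
cols <- unlist(invoke(sdf, "columns"))
head(cols)
```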
Now, for something more exciting, let us compute a summary of the variables in tbl_flights using the describe method:
tbl_flights_summary <- tbl_flights %>%
  spark_dataframe() %>%
  invoke("describe", as.list(colnames(tbl_flights))) %>%
  sdf_register()
tbl_flights_summary
## # Source: spark<?> [?? x 19]
##   summary year  month day   dep_time sched_dep_time dep_delay arr_time
##   <chr>   <chr> <chr> <chr> <chr>    <chr>          <chr>     <chr>
## 1 count   3367… 3367… 3367… 328521   336776         328521    328063
## 2 mean    2013… 6.54… 15.7… 1349.10… 1344.25484001… 12.63907… 1502.05…
## 3 stddev  0.0   3.41… 8.76… 488.281… 467.335755734… 40.21006… 533.264…
## 4 min     2013  1     1     1        106            -43.0     1
## 5 max     2013  12    31    2400     2359           1301.0    2400
## # … with 11 more variables: sched_arr_time <chr>, arr_delay <chr>,
## #   carrier <chr>, flight <chr>, tailnum <chr>, origin <chr>, dest <chr>,
## #   air_time <chr>, distance <chr>, hour <chr>, minute <chr>
We also see one extra operation after invoking the describe method: sdf_register(). This is because the invoke() interface also returns Java object references, and we may prefer to see a more user-friendly tbl object instead. This is where sdf_register() comes in: it registers a Spark DataFrame and returns a tbl_spark object back to us.
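A minimal sketch of that round trip, using the Dataset limit method as an illustrative example: invoke() yields a bare reference, and only after sdf_register() can dplyr verbs be used on the result again. The setup lines simply recreate tbl_flights on a local connection.

```r
library(sparklyr)
library(dplyr)

sc <- spark_connect(master = "local")
tbl_flights <- copy_to(sc, nycflights13::flights, "flights", overwrite = TRUE)

# invoke() hands back a bare Java reference to a new DataFrame
# (Dataset.limit takes an Int, hence the integer literal 5L)
limited_ref <- tbl_flights %>%
  spark_dataframe() %>%
  invoke("limit", 5L)

# sdf_register() wraps that reference into a tbl_spark again,
# so dplyr verbs can be chained on the result
limited_tbl <- sdf_register(limited_ref)
limited_tbl %>% select(origin, dest)
```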
And indeed, we can see that the wrapper sdf_describe() provided by the sparklyr package itself works in a very similar fashion:
sparklyr::sdf_describe
## function(x, cols = colnames(x)) {
##   in_df <- cols %in% colnames(x)
##   if (any(!in_df)) {
##     msg <- paste0("The following columns are not in the data frame: ",
##                   paste0(cols[which(!in_df)], collapse = ", "))
##     stop(msg)
##   }
##   cols <- cast_character_list(cols)
##
##   x %>%
##     spark_dataframe() %>%
##     invoke("describe", cols) %>%
##     sdf_register()
## }
## <environment: namespace:sparklyr>
If we so wish, for DataFrame-related object references we can also call collect() to retrieve the results directly, without using sdf_register() first, for instance retrieving the full content of the origin column:
tbl_flights %>%
  spark_dataframe() %>%
  invoke("select", "origin", list()) %>%
  collect()
## # A tibble: 336,776 x 1
##    origin
##    <chr>
##  1 EWR
##  2 LGA
##  3 JFK
##  4 JFK
##  5 LGA
##  6 EWR
##  7 EWR
##  8 LGA
##  9 JFK
## 10 LGA
## # … with 336,766 more rows
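The empty list() above fills the varargs part of the select(col: String, cols: String*) signature. The sketch below assumes that a non-empty R list is mapped onto those varargs in the same way, just as the describe example passed a list of several column names. Under that assumption, further columns can be selected by putting their names in the list.

```r
library(sparklyr)
library(dplyr)

sc <- spark_connect(master = "local")
tbl_flights <- copy_to(sc, nycflights13::flights, "flights", overwrite = TRUE)

# First column name as the fixed String argument,
# the remaining names in a list standing in for the String* varargs
origin_dest <- tbl_flights %>%
  spark_dataframe() %>%
  invoke("select", "origin", list("dest", "carrier")) %>%
  collect()

origin_dest
```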
It can also be helpful to investigate the schema of our DataFrame:
tbl_flights %>%
  spark_dataframe() %>%
  invoke("schema")
## <jobj[142]>
##   org.apache.spark.sql.types.StructType
##   StructType(StructField(year,IntegerType,true), StructField(month,IntegerType,true), StructField(day,IntegerType,true), StructField(dep_time,IntegerType,true), StructField(sched_dep_time,IntegerType,true), StructField(dep_delay,DoubleType,true), StructField(arr_time,IntegerType,true), StructField(sched_arr_time,IntegerType,true), StructField(arr_delay,DoubleType,true), StructField(carrier,StringType,true), StructField(flight,IntegerType,true), StructField(tailnum,StringType,true), StructField(origin,StringType,true), StructField(dest,StringType,true), StructField(air_time,DoubleType,true), StructField(distance,DoubleType,true), StructField(hour,DoubleType,true), StructField(minute,DoubleType,true), StructField(time_hour,TimestampType,true))
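The returned StructType is itself a Java object, so we can keep invoking methods on it. A small sketch using two StructType methods, treeString and fieldNames, which give a more readable view than the default printout. For everyday use, sparklyr also ships the sdf_schema() wrapper, which returns the schema as an R list.

```r
library(sparklyr)
library(dplyr)

sc <- spark_connect(master = "local")
tbl_flights <- copy_to(sc, nycflights13::flights, "flights", overwrite = TRUE)

# Keep the StructType reference so that several methods can be called on it
schema <- tbl_flights %>%
  spark_dataframe() %>%
  invoke("schema")

# StructType.treeString: the schema as one indented, human-readable string
tree <- invoke(schema, "treeString")
cat(tree)

# StructType.fieldNames: just the column names (an Array[String])
field_names <- unlist(invoke(schema, "fieldNames"))
```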
We can also use the invoke interface on other objects, such as the SparkContext. Let’s, for instance, retrieve the uiWebUrl of our context:
sc %>%
  spark_context() %>%
  invoke("uiWebUrl") %>%
  invoke("toString")
## [1] "Some(http://localhost:4040)"
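Other SparkContext methods can be called the same way. The selection below (version, appName and defaultParallelism) is just illustrative; each of these returns a simple value that sparklyr converts into an R vector, so no toString step is needed. The exact values depend on the local installation.

```r
library(sparklyr)

sc <- spark_connect(master = "local")
ctx <- spark_context(sc)

# A few more SparkContext methods, each returning a plain value to R
spark_ver   <- invoke(ctx, "version")             # Spark version as a string
app_name    <- invoke(ctx, "appName")             # name of the Spark application
parallelism <- invoke(ctx, "defaultParallelism")  # default number of partitions

spark_ver
app_name
parallelism
```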
Grouping and aggregation with invoke chains
Imagine we would like to do simple aggregations of a Spark DataFrame, such as an average of a column grouped by another column. For reference, we can do this very simply using the dplyr approach. Let’s compute the average departure delay by origin of the flight:
tbl_flights %>%
  group_by(origin) %>%
  summarise(avg(dep_delay))
## # Source: spark<?> [?? x 2]
##   origin `avg(dep_delay)`
##   <chr>             <dbl>
## 1 EWR                15.1
## 2 JFK                12.1
## 3 LGA                10.3
Now we will show how to do the same aggregation via the lower level API. Using the Spark shell we would simply do:
flights.
  groupBy("origin").
  agg(avg("dep_delay"))
Translating that into the lower-level invoke() API provided by sparklyr looks something like this:
tbl_flights %>%
  spark_dataframe() %>%
  invoke("groupBy", "origin", list()) %>%
  invoke(
    "agg",
    invoke_static(sc, "org.apache.spark.sql.functions", "expr", "avg(dep_delay)"),
    list()
  ) %>%
  sdf_register()
What is all that extra code?
Compared to the two simple operations in the Scala version, the R code has some gotchas worth examining:
- One of the invoke() calls is quite long. Instead of just avg("dep_delay") as in the Scala example, we use invoke_static(sc, "org.apache.spark.sql.functions", "expr", "avg(dep_delay)"). In the Spark shell, avg("dep_delay") is a call to the avg function of org.apache.spark.sql.functions (which the shell imports automatically) and returns a Column object. Scala lets us write it like a plain function call, but when calling from R we need to obtain that Column object reference explicitly, here by parsing the SQL expression avg(dep_delay) with the expr function of the same object.
- The empty list() at the end of the "groupBy" and "agg" invokes. This is needed as a workaround, because some Scala methods take String, String* (or, in the case of agg, Column, Column*) as arguments and sparklyr currently does not support variable parameters. We can pass list() to represent an empty String[] in Scala as the needed second argument.
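Because avg in the Scala snippet is a method of org.apache.spark.sql.functions, an alternative sketch is to call that avg method directly with invoke_static instead of going through expr. The same idea extends to several aggregations at once: the first Column is the fixed argument of agg and further Columns go into the list. This assumes, as with describe, that a non-empty list is mapped onto the Column* varargs; max is used only as an illustrative second aggregation.

```r
library(sparklyr)
library(dplyr)

sc <- spark_connect(master = "local")
tbl_flights <- copy_to(sc, nycflights13::flights, "flights", overwrite = TRUE)

# functions.avg(columnName: String) and functions.max(columnName: String)
# both return Column objects, no SQL string parsing involved
avg_delay <- invoke_static(sc, "org.apache.spark.sql.functions", "avg", "dep_delay")
max_delay <- invoke_static(sc, "org.apache.spark.sql.functions", "max", "dep_delay")

delays_by_origin <- tbl_flights %>%
  spark_dataframe() %>%
  invoke("groupBy", "origin", list()) %>%
  # first Column as the fixed argument, further Columns in the list (Column*)
  invoke("agg", avg_delay, list(max_delay)) %>%
  sdf_register()

delays_by_origin
```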
Wrapping the invocations into R functions
Seeing the above example, we can quickly write a useful wrapper to ease the pain a little. First, we can create a small function that will generate the aggregation expression we can use with invoke("agg", ...):
agg_expr <- function(tbl, exprs) {
  sparklyr::invoke_static(
    tbl[["src"]][["con"]],
    "org.apache.spark.sql.functions",
    "expr",
    exprs
  )
}
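Since agg also exists directly on a DataFrame, an expression built this way can be used without any grouping. The sketch below defines agg_expr_conn, a hypothetical variant of agg_expr() that looks up the connection with sparklyr's exported spark_connection() instead of reaching into the tbl internals, and uses it to compute a single global maximum.

```r
library(sparklyr)
library(dplyr)

sc <- spark_connect(master = "local")
tbl_flights <- copy_to(sc, nycflights13::flights, "flights", overwrite = TRUE)

# Hypothetical variant of agg_expr(): same functions.expr() call, but the
# connection comes from spark_connection() rather than tbl[["src"]][["con"]]
agg_expr_conn <- function(tbl, exprs) {
  invoke_static(
    spark_connection(tbl),
    "org.apache.spark.sql.functions",
    "expr",
    exprs
  )
}

# Dataset.agg(Column, Column*) called without a preceding groupBy
max_distance <- tbl_flights %>%
  spark_dataframe() %>%
  invoke("agg", agg_expr_conn(tbl_flights, "max(distance)"), list()) %>%
  collect()

max_distance
```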
Next, we can wrap the entire process in a more generic aggregation function, using the fact that a remote tibble has the details on sc within its tbl[["src"]][["con"]] element:
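grpagg_invoke <- function(tbl, colName, groupColName, aggOperation) {
  # Build the aggregation Column from a Spark SQL string, e.g. "avg(arr_delay)"
  aggColumn <- tbl %>% agg_expr(paste0(aggOperation, "(", colName, ")"))
  tbl %>%
    spark_dataframe() %>%
    # list() fills the String* varargs of groupBy(col1, cols*)
    invoke("groupBy", groupColName, list()) %>%
    # list() fills the Column* varargs of agg(expr, exprs*)
    invoke("agg", aggColumn, list()) %>%
    sdf_register()
}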
And finally use our wrapper to get the same results in a more user-friendly way:
tbl_flights %>%
  grpagg_invoke("arr_delay", groupColName = "origin", aggOperation = "avg")
## # Source: spark<?> [?? x 2]
##   origin `avg(arr_delay)`
##   <chr>             <dbl>
## 1 EWR                9.11
## 2 JFK                5.55
## 3 LGA                5.78
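The same pattern can be stretched to several grouping columns and several aggregations. Below is a sketch of a hypothetical grpagg_multi_invoke() that takes the aggregations as Spark SQL strings; it assumes that non-empty lists (of column names for groupBy and of Column references for agg) are mapped onto the varargs parameters just like the empty list() is.

```r
library(sparklyr)
library(dplyr)

sc <- spark_connect(master = "local")
tbl_flights <- copy_to(sc, nycflights13::flights, "flights", overwrite = TRUE)

# Hypothetical generalisation of grpagg_invoke(): several grouping columns
# and several aggregation expressions, given as Spark SQL strings
grpagg_multi_invoke <- function(tbl, groupCols, aggExprs) {
  sc <- spark_connection(tbl)
  # Build one Column per SQL expression via functions.expr()
  aggCols <- lapply(aggExprs, function(e) {
    invoke_static(sc, "org.apache.spark.sql.functions", "expr", e)
  })
  tbl %>%
    spark_dataframe() %>%
    # first name fixed, remaining names fill the String* varargs
    invoke("groupBy", groupCols[[1]], as.list(groupCols[-1])) %>%
    # first Column fixed, remaining Columns fill the Column* varargs
    invoke("agg", aggCols[[1]], aggCols[-1]) %>%
    sdf_register()
}

delays_by_origin_carrier <- tbl_flights %>%
  grpagg_multi_invoke(
    groupCols = c("origin", "carrier"),
    aggExprs = c("avg(arr_delay)", "count(*)")
  )

delays_by_origin_carrier
```

Passing the remainder as lists keeps the single-column case working too: for a length-one groupCols, as.list(groupCols[-1]) is simply the empty list() used earlier.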
Reconstructing variable normalization
Now we will attempt to construct the variable normalization that we have shown in the previous parts with dplyr verbs and SQL generation – we will normalize the values of a column by first subtracting the mean value and then dividing the values by the standard deviation:
normalize_invoke <- function(tbl, colName) {
  sdf <- tbl %>% spark_dataframe()
  stdCol <- agg_expr(tbl, paste0("stddev_samp(", colName, ")"))
  avgCol <- agg_expr(tbl, paste0("avg(", colName, ")"))
  avgTemp <- sdf %>%
    invoke("agg", avgCol, list()) %>%
    invoke("first")
  stdTemp <- sdf %>%
    invoke("agg", stdCol, list()) %>%
    invoke("first")
  newCol <- sdf %>%
    invoke("col", colName) %>%
    invoke("minus", as.numeric(avgTemp)) %>%
    invoke("divide", as.numeric(stdTemp))
  sdf %>%
    invoke("withColumn", colName, newCol) %>%
    sdf_register()
}

tbl_weather %>% normalize_invoke("temp")
## # Source: spark<?> [?? x 16]
##       id origin  year month   day  hour   temp  dewp humid wind_dir
##    <int> <chr>  <dbl> <dbl> <int> <int>  <dbl> <dbl> <dbl>    <dbl>
##  1     1 EWR     2013     1     1     1 -0.913  26.1  59.4      270
##  2     2 EWR     2013     1     1     2 -0.913  27.0  61.6      250
##  3     3 EWR     2013     1     1     3 -0.913  28.0  64.4      240
##  4     4 EWR     2013     1     1     4 -0.862  28.0  62.2      250
##  5     5 EWR     2013     1     1     5 -0.913  28.0  64.4      260
##  6     6 EWR     2013     1     1     6 -0.974  28.0  67.2      240
##  7     7 EWR     2013     1     1     7 -0.913  28.0  64.4      240
##  8     8 EWR     2013     1     1     8 -0.862  28.0  62.2      250
##  9     9 EWR     2013     1     1     9 -0.862  28.0  62.2      260
## 10    10 EWR     2013     1     1    10 -0.802  28.0  59.6      260
## # … with more rows, and 6 more variables: wind_speed <dbl>,
## #   wind_gust <dbl>, precip <dbl>, pressure <dbl>, visib <dbl>,
## #   time_hour <dttm>
The above implementation is just an example and far from optimal, but it also has a few interesting points about it:
- Using invoke("first") will actually compute and collect the value into the R session. Since the mean and the standard deviation are retrieved by two separate agg and first chains, Spark also runs two separate jobs over the data.
- Those collected values are then sent back to Spark during the invoke("minus", as.numeric(avgTemp)) and invoke("divide", as.numeric(stdTemp)) calls.
This means that there is unnecessary overhead when sending those values from the Spark instance into R and back, which will have slight performance penalties.
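One way to avoid the round trip is to let Spark compute the statistics as window aggregates over all rows, so nothing is collected into R. The sketch below uses a hypothetical normalize_invoke_window() built on a single functions.expr() call with Spark SQL's OVER () clause; the table name "weather_window" is also an assumption, chosen so that the weather table from the setup is not overwritten.

```r
library(sparklyr)
library(dplyr)

sc <- spark_connect(master = "local")
tbl_weather_window <- copy_to(sc, nycflights13::weather, "weather_window", overwrite = TRUE)

# Hypothetical alternative to normalize_invoke(): avg() and stddev_samp()
# are evaluated as window functions over an empty window, i.e. over all rows,
# so the statistics never leave the Spark instance
normalize_invoke_window <- function(tbl, colName) {
  sc <- spark_connection(tbl)
  newCol <- invoke_static(
    sc, "org.apache.spark.sql.functions", "expr",
    sprintf("(%1$s - avg(%1$s) OVER ()) / stddev_samp(%1$s) OVER ()", colName)
  )
  tbl %>%
    spark_dataframe() %>%
    invoke("withColumn", colName, newCol) %>%
    sdf_register()
}

temp_normalized <- tbl_weather_window %>% normalize_invoke_window("temp")
temp_normalized
```

The trade-off is that a window without PARTITION BY moves all rows into a single partition (Spark logs a warning about this), which is fine for a dataset of this size but can become the bottleneck on large data.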
Where invoke can be better than dplyr translation or SQL
As we have seen in the above examples, working with the invoke() API can prove more difficult than using the intuitive syntax of dplyr or SQL queries. In some use cases, the trade-off may still be worth it. In our practice, these are some examples of such situations:
- When Scala’s Spark API is more flexible, powerful or suitable for a particular task and the translation is not as good
- When performance is crucial and we can produce more optimal solutions using the invocations
- When we know the Scala API well and do not want to invest time in learning the dplyr syntax, because it is easier to translate the Scala calls into a series of invoke() calls
- When we need to interact with and manipulate Java objects other than the standard Spark DataFrames
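As a small illustration of that last point, the sketch below reaches two Java objects that are not DataFrames: the Hadoop Configuration held by the SparkContext and the session Catalog. The specific keys and methods (fs.defaultFS, currentDatabase) are just examples, and the returned values depend on the installation.

```r
library(sparklyr)

sc <- spark_connect(master = "local")

# org.apache.hadoop.conf.Configuration held by the SparkContext
hadoop_conf <- invoke(spark_context(sc), "hadoopConfiguration")
default_fs <- invoke(hadoop_conf, "get", "fs.defaultFS")

# org.apache.spark.sql.catalog.Catalog of the current SparkSession
catalog <- invoke(spark_session(sc), "catalog")
current_db <- invoke(catalog, "currentDatabase")

default_fs
current_db
```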
Conclusion
In this part of the series, we have looked at how to use the lower-level invoke interface provided by sparklyr to manipulate Spark objects and other Java object references. In the following part, we will dig a bit deeper and look into using Java’s reflection API to make the invoke interface more accessible from R, getting detailed invocation logs and more.
References
- The first part of this series
- The second part of this series
- The third part of this series
- A Docker image with R, Spark, sparklyr and Arrow available and its Dockerfile.
- Wikipedia’s article on Method Chaining