Working With R and Big Data: Use Replyr
In our latest R and Big Data article we discuss replyr.
Why replyr
replyr stands for REmote PLYing of big data for R.
Why should R users try replyr? Because it lets you take a number of common working patterns and apply them to remote data (such as databases or Spark).
replyr allows users to work with Spark or database data much as they work with local data.frames. Some key capability gaps remedied by replyr include:
- Summarizing data: replyr_summary().
- Combining tables: replyr_union_all().
- Binding tables by row: replyr_bind_rows().
- Using the split/apply/combine pattern (dplyr::do()): replyr_split(), replyr::gapply().
- Pivot/anti-pivot (gather/spread): replyr_moveValuesToRows() / replyr_moveValuesToColumns().
- Handle tracking.
- A join controller.
You may have already learned to decompose your local data processing into steps including the above, so retaining such capabilities makes working with Spark and sparklyr much easier. Some of the above capabilities will likely come to the tidyverse, but the above implementations are built purely on top of dplyr and are the ones already being vetted and debugged at production scale (I think these will be ironed out and reliable sooner).
replyr is the product of collecting experience applying R at production scale with many clients: collecting issues and filling in gaps.
Below are some examples.
Examples
Things are changing fast right now, so let’s use the development versions of the packages for our examples.
base::date()
## [1] "Thu Jul 6 15:56:28 2017"
# devtools::install_github('rstudio/sparklyr')
# devtools::install_github('tidyverse/dplyr')
# devtools::install_github('tidyverse/dbplyr')
# install.packages("replyr")
suppressPackageStartupMessages(library("dplyr"))
packageVersion("dplyr")
## [1] '0.7.1.9000'
packageVersion("dbplyr")
## [1] '1.1.0.9000'
library("tidyr") packageVersion("tidyr")
## [1] '0.6.3'
library("replyr") packageVersion("replyr")
## [1] '0.4.2'
suppressPackageStartupMessages(library("sparklyr"))
packageVersion("sparklyr")
## [1] '0.5.6.9012'
# more memory as suggested in https://github.com/rstudio/sparklyr/issues/783
config <- spark_config()
config[["sparklyr.shell.driver-memory"]] <- "8G"
sc <- sparklyr::spark_connect(version = '2.1.0',
                              hadoop_version = '2.7',
                              master = "local",
                              config = config)
summary
Standard summary() and glance() both fail on Spark.
mtcars_spark <- copy_to(sc, mtcars)

# gives summary of handle, not data
summary(mtcars_spark)
##     Length Class          Mode
## src 1      src_spark      list
## ops 2      op_base_remote list
packageVersion("broom")
## [1] '0.4.2'
broom::glance(mtcars_spark)
## Error: glance doesn't know how to deal with data of class tbl_sparktbl_sqltbl_lazytbl
replyr_summary works.
replyr_summary(mtcars_spark) %>% select(-lexmin, -lexmax, -nunique, -index)
##    column   class nrows nna     min     max       mean          sd
## 1     mpg numeric    32   0  10.400  33.900  20.090625   6.0269481
## 2     cyl numeric    32   0   4.000   8.000   6.187500   1.7859216
## 3    disp numeric    32   0  71.100 472.000 230.721875 123.9386938
## 4      hp numeric    32   0  52.000 335.000 146.687500  68.5628685
## 5    drat numeric    32   0   2.760   4.930   3.596563   0.5346787
## 6      wt numeric    32   0   1.513   5.424   3.217250   0.9784574
## 7    qsec numeric    32   0  14.500  22.900  17.848750   1.7869432
## 8      vs numeric    32   0   0.000   1.000   0.437500   0.5040161
## 9      am numeric    32   0   0.000   1.000   0.406250   0.4989909
## 10   gear numeric    32   0   3.000   5.000   3.687500   0.7378041
## 11   carb numeric    32   0   1.000   8.000   2.812500   1.6152000
gather/spread
tidyr pretty much only works on local data.
mtcars2 <- mtcars %>%
  mutate(car = row.names(mtcars)) %>%
  copy_to(sc, ., 'mtcars2')

# errors out
mtcars2 %>% tidyr::gather('fact', 'value')
## Error in UseMethod("gather_"): no applicable method for 'gather_' applied to an object of class "c('tbl_spark', 'tbl_sql', 'tbl_lazy', 'tbl')"
mtcars2 %>%
  replyr_moveValuesToRows(nameForNewKeyColumn = 'fact',
                          nameForNewValueColumn = 'value',
                          columnsToTakeFrom = colnames(mtcars),
                          nameForNewClassColumn = 'class') %>%
  arrange(car, fact)
## # Source:     lazy query [?? x 4]
## # Database:   spark_connection
## # Ordered by: car, fact
##            car  fact  value   class
##          <chr> <chr>  <dbl>   <chr>
##  1 AMC Javelin    am   0.00 numeric
##  2 AMC Javelin  carb   2.00 numeric
##  3 AMC Javelin   cyl   8.00 numeric
##  4 AMC Javelin  disp 304.00 numeric
##  5 AMC Javelin  drat   3.15 numeric
##  6 AMC Javelin  gear   3.00 numeric
##  7 AMC Javelin    hp 150.00 numeric
##  8 AMC Javelin   mpg  15.20 numeric
##  9 AMC Javelin  qsec  17.30 numeric
## 10 AMC Javelin    vs   0.00 numeric
## # ... with 342 more rows
Binding rows
dplyr bind_rows, union, and union_all are all currently unsuitable for use on Spark. replyr::replyr_union_all() and replyr::replyr_bind_rows() supply working alternatives.
bind_rows()
db1 <- copy_to(sc,
               data.frame(x = 1:2, y = c('a','b'), stringsAsFactors = FALSE),
               name = 'db1')
db2 <- copy_to(sc,
               data.frame(y = c('c','d'), x = 3:4, stringsAsFactors = FALSE),
               name = 'db2')

# Errors out as it tries to operate on the handles instead of the data.
bind_rows(list(db1, db2))
## Error in bind_rows_(x, .id): Argument 1 must be a data frame or a named atomic vector, not a tbl_spark/tbl_sql/tbl_lazy/tbl
union_all
# ignores column names and converts all data to char
union_all(db1, db2)
## # Source:   lazy query [?? x 2]
## # Database: spark_connection
##       x     y
##   <int> <chr>
## 1     1     a
## 2     2     b
## 3     3     c
## 4     4     d
union
# ignores column names and converts all data to char
# also will probably lose duplicate rows
union(db1, db2)
## # Source:   lazy query [?? x 2]
## # Database: spark_connection
##       x     y
##   <int> <chr>
## 1     4     d
## 2     1     a
## 3     3     c
## 4     2     b
replyr_bind_rows
replyr::replyr_bind_rows can bind multiple data.frames together.
replyr_bind_rows(list(db1, db2))
## # Source:   table<sparklyr_tmp_3db057f187f1> [?? x 2]
## # Database: spark_connection
##       x     y
##   <int> <chr>
## 1     1     a
## 2     2     b
## 3     3     c
## 4     4     d
dplyr::do
Our example is just taking a few rows from each group of a grouped data set. Note: since we are not enforcing row order (say, with arrange()) we can't expect the results to always match on database or Spark data sources.
dplyr::do on local data
From help('do', package='dplyr'):
by_cyl <- group_by(mtcars, cyl)
do(by_cyl, head(., 2))
## # A tibble: 6 x 11
## # Groups:   cyl [3]
##     mpg   cyl  disp    hp  drat    wt  qsec    vs    am  gear  carb
##   <dbl> <dbl> <dbl> <dbl> <dbl> <dbl> <dbl> <dbl> <dbl> <dbl> <dbl>
## 1  22.8     4 108.0    93  3.85 2.320 18.61     1     1     4     1
## 2  24.4     4 146.7    62  3.69 3.190 20.00     1     0     4     2
## 3  21.0     6 160.0   110  3.90 2.620 16.46     0     1     4     4
## 4  21.0     6 160.0   110  3.90 2.875 17.02     0     1     4     4
## 5  18.7     8 360.0   175  3.15 3.440 17.02     0     0     3     2
## 6  14.3     8 360.0   245  3.21 3.570 15.84     0     0     3     4
dplyr::do on Spark
by_cyl <- group_by(mtcars_spark, cyl)
do(by_cyl, head(., 2))
## # A tibble: 3 x 2
##     cyl     V2
##   <dbl> <list>
## 1     6 <NULL>
## 2     4 <NULL>
## 3     8 <NULL>
Notice we did not get back usable results.
replyr split/apply
mtcars_spark %>%
  replyr_split('cyl', partitionMethod = 'extract') %>%
  lapply(function(di) head(di, 2)) %>%
  replyr_bind_rows()
## # Source:   table<sparklyr_tmp_3db01fba6f09> [?? x 11]
## # Database: spark_connection
##     mpg   cyl  disp    hp  drat    wt  qsec    vs    am  gear  carb
##   <dbl> <dbl> <dbl> <dbl> <dbl> <dbl> <dbl> <dbl> <dbl> <dbl> <dbl>
## 1  21.0     6 160.0   110  3.90 2.620 16.46     0     1     4     4
## 2  21.0     6 160.0   110  3.90 2.875 17.02     0     1     4     4
## 3  22.8     4 108.0    93  3.85 2.320 18.61     1     1     4     1
## 4  24.4     4 146.7    62  3.69 3.190 20.00     1     0     4     2
## 5  18.7     8 360.0   175  3.15 3.440 17.02     0     0     3     2
## 6  14.3     8 360.0   245  3.21 3.570 15.84     0     0     3     4
replyr gapply
mtcars_spark %>%
  gapply('cyl',
         partitionMethod = 'extract',
         function(di) head(di, 2))
## # Source:   table<sparklyr_tmp_3db015cf9a8> [?? x 11]
## # Database: spark_connection
##     mpg   cyl  disp    hp  drat    wt  qsec    vs    am  gear  carb
##   <dbl> <dbl> <dbl> <dbl> <dbl> <dbl> <dbl> <dbl> <dbl> <dbl> <dbl>
## 1  21.0     6 160.0   110  3.90 2.620 16.46     0     1     4     4
## 2  21.0     6 160.0   110  3.90 2.875 17.02     0     1     4     4
## 3  22.8     4 108.0    93  3.85 2.320 18.61     1     1     4     1
## 4  24.4     4 146.7    62  3.69 3.190 20.00     1     0     4     2
## 5  18.7     8 360.0   175  3.15 3.440 17.02     0     0     3     2
## 6  14.3     8 360.0   245  3.21 3.570 15.84     0     0     3     4
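As noted above, remote sources do not guarantee row order, so the exact rows returned per group may vary. If you want the per-group selection to be reproducible, one option (a sketch not run in the original post, using qsec purely as an illustrative sort key) is to impose an ordering inside the applied function:

# sketch: order each group before taking rows; the sort key is illustrative
mtcars_spark %>%
  gapply('cyl',
         partitionMethod = 'extract',
         function(di) di %>% arrange(qsec) %>% head(2))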
replyr::replyr_apply_f_mapped
wrapr::let was only the secondary proposal in the original 2016 "Parametric variable names" article. What we really wanted was a stack of views so the data pretended to have names that matched the code (i.e., re-mapping the data, not the code).
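For contrast, here is a minimal sketch (not from the original article; the tiny data frame is purely illustrative) of the code re-mapping approach with wrapr::let(), which substitutes names in the code block before evaluating it:

library("wrapr")

# illustrative data whose rank column is literally named 'rank'
d0 <- data.frame(Sepal_Length = c(5.8, 5.7), rank = c(1, 2))

# let() substitutes RankColumn -> rank in the code block, then evaluates it
d1 <- let(
  c(RankColumn = 'rank'),
  {
    d0$RankColumn <- d0$RankColumn - 1
    d0
  }
)
print(d1)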
With a bit of thought we can achieve this if we associate the data re-mapping with a function environment instead of with the data. So a re-mapping is active as long as a given controlling function is in control. In our case that function is replyr::replyr_apply_f_mapped(), which works as follows.
Suppose the operation we wish to use is a rank-reducing function that has been supplied as a function from somewhere else that we do not have control of (such as a package). The function could be as simple as the following, but we are going to assume we want to use it without alteration (including without even the small alteration of introducing wrapr::let()).
# an external function with hard-coded column names
DecreaseRankColumnByOne <- function(d) {
  d$RankColumn <- d$RankColumn - 1
  d
}
To apply this function to d (which doesn't have the expected column names!) we use replyr::replyr_apply_f_mapped() to create a new parameterized adapter as follows:
# our data
d <- data.frame(Sepal_Length = c(5.8, 5.7),
                Sepal_Width = c(4.0, 4.4),
                Species = 'setosa',
                rank = c(1, 2))

# a wrapper to introduce parameters
DecreaseRankColumnByOneNamed <- function(d, ColName) {
  replyr::replyr_apply_f_mapped(d,
                                f = DecreaseRankColumnByOne,
                                nmap = c(RankColumn = ColName),
                                restrictMapIn = FALSE,
                                restrictMapOut = FALSE)
}

# use
dF <- DecreaseRankColumnByOneNamed(d, 'rank')
print(dF)
##   Sepal_Length Sepal_Width Species rank
## 1          5.8         4.0  setosa    0
## 2          5.7         4.4  setosa    1
replyr::replyr_apply_f_mapped() renames the columns to the names expected by DecreaseRankColumnByOne (the mapping specified in nmap), applies DecreaseRankColumnByOne, and then inverts the mapping before returning the value.
Handle management
Many sparklyr tasks involve the creation of intermediate or temporary tables. These can be created through dplyr::copy_to() and dplyr::compute(). The resulting handles can represent a reference leak and eat up resources.
To help control handle lifetime, replyr supplies handle trackers: record-retaining temporary name generators (and uses the same internally).
The actual function is pretty simple:
print(replyr::makeTempNameGenerator)
## function (prefix, suffix = NULL)
## {
##     force(prefix)
##     if ((length(prefix) != 1) || (!is.character(prefix))) {
##         stop("repyr::makeTempNameGenerator prefix must be a string")
##     }
##     if (is.null(suffix)) {
##         alphabet <- c(letters, toupper(letters), as.character(0:9))
##         suffix <- paste(base::sample(alphabet, size = 20, replace = TRUE),
##             collapse = "")
##     }
##     count <- 0
##     nameList <- list()
##     function(..., peek = FALSE, dumpList = FALSE, remove = NULL) {
##         if (length(list(...)) > 0) {
##             stop("replyr::makeTempNameGenerator tempname generate unexpected argument")
##         }
##         if (peek) {
##             return(names(nameList))
##         }
##         if (dumpList) {
##             v <- names(nameList)
##             nameList <<- list()
##             return(v)
##         }
##         if (!is.null(remove)) {
##             victims <- intersect(remove, names(nameList))
##             nameList[victims] <<- NULL
##             return(victims)
##         }
##         nm <- paste(prefix, suffix, sprintf("%010d", count),
##             sep = "_")
##         nameList[[nm]] <<- 1
##         count <<- count + 1
##         nm
##     }
## }
## <bytecode: 0x7f809a30d320>
## <environment: namespace:replyr>
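As a quick sketch of the generator's interface (based on the arguments visible in the printed source above; the randomly chosen suffix means your generated names will differ):

exGen <- replyr::makeTempNameGenerator('EXAMPLE')

nm1 <- exGen()          # generate (and record) a new temporary table name
nm2 <- exGen()          # generate another
exGen(peek = TRUE)      # list the names handed out so far
exGen(remove = nm1)     # drop a specific name from the record
exGen(dumpList = TRUE)  # return all remaining names and clear the record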
For instance, when joining a few tables it can be a good idea to call compute() after each join for some data sources (else the generated SQL can become large and unmanageable). This sort of code looks like the following:
# create example data
names <- paste('table', 1:5, sep = '_')
tables <- lapply(names,
                 function(ni) {
                   di <- data.frame(key = 1:3)
                   di[[paste('val', ni, sep = '_')]] <- runif(nrow(di))
                   copy_to(sc, di, ni)
                 })

# build our temp name generator
tmpNamGen <- replyr::makeTempNameGenerator('JOINTMP')

# left join the tables in sequence
joined <- tables[[1]]
for(i in seq(2, length(tables))) {
  ti <- tables[[i]]
  if(i < length(tables)) {
    joined <- compute(left_join(joined, ti, by = 'key'),
                      name = tmpNamGen())
  } else {
    # use non-temp name.
    joined <- compute(left_join(joined, ti, by = 'key'),
                      name = 'joinres')
  }
}

# clean up temps
temps <- tmpNamGen(dumpList = TRUE)
print(temps)
## [1] "JOINTMP_1dr7xHI9CkSZJwXfKA1B_0000000000" ## [2] "JOINTMP_1dr7xHI9CkSZJwXfKA1B_0000000001" ## [3] "JOINTMP_1dr7xHI9CkSZJwXfKA1B_0000000002"
for(ti in temps) {
  db_drop_table(sc, ti)
}

# show result
print(joined)
## # Source:   table<joinres> [?? x 6]
## # Database: spark_connection
##     key val_table_1 val_table_2 val_table_3 val_table_4 val_table_5
##   <int>       <dbl>       <dbl>       <dbl>       <dbl>       <dbl>
## 1     1   0.8045418   0.5006293   0.8656174   0.5248073   0.8611796
## 2     2   0.1593121   0.5802938   0.9722113   0.4532369   0.7429018
## 3     3   0.4853835   0.5313043   0.6224256   0.1843134   0.1125551
Careful introduction and management of materialized intermediates can conserve resources (both time and space) and greatly improve outcomes. We feel it is a good practice to set up an explicit temp name manager, pass it through all your sparklyr transforms, and then clear temps in batches after the results no longer depend on the intermediates.
Conclusion
If you are serious about R-controlled data processing in Spark or databases you should seriously consider using replyr in addition to dplyr and sparklyr.
sparklyr::spark_disconnect(sc)
rm(list = ls())
gc()
##           used (Mb) gc trigger (Mb) max used (Mb)
## Ncells  821292 43.9    1442291 77.1  1168576 62.5
## Vcells 1364897 10.5    2552219 19.5  1694265 13.0