Installation to connect Spark and H2O in R
In the ThinkR Task force, we love playing with H2O in R. Their algorithms for machine learning are really powerful. Combined with Apache Spark through Sparkling Water, H2O provides even more powerful data processing workflows, which you can run on your own laptop.
Installing Spark and H2O so that they work together within R
The H2O documentation is great, but you may still run into version compatibility problems. In this blog post, we present how to set up the connection between Spark and H2O within R. This is a step-by-step procedure that shows where the process may stop on errors, so do not be surprised if some code leads to errors! Also, some steps are repeated because you will need to restart your R session.
Problem encountered
There are many reasons to get the following error message when using {sparklyr}:
#> Error : org.apache.spark.SparkException: Job 1 cancelled because SparkContext was shut down... [cut] (see below for the complete message)
A common reason is a memory allocation problem when using big datasets. However, you can also get this error from the very beginning, without any dataset. In that case, it is most probably a version compatibility problem.
You can find the version compatibility table on GitHub: https://github.com/h2oai/rsparkling/blob/master/README.md. We will discuss it in more detail below.
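If you are not sure which versions you currently have on your machine, a quick check from the R console can help before going further. Here is a minimal diagnostic sketch using standard functions; the versions printed will of course depend on your own installation.

# Versions of the R packages
packageVersion("h2o")
packageVersion("rsparkling")
packageVersion("sparklyr")

# Spark distributions already installed locally by {sparklyr}
sparklyr::spark_installed_versions()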
Install libraries
Well, let's go through the process. The first installation is quite easy from the CRAN repository. This will directly download and install H2O along with the R packages.
install.packages(c("rsparkling", "sparklyr", "h2o"))
Then load libraries.
# Libraries
library(rsparkling)
library(sparklyr)
library(h2o)
library(here)
library(glue)
Define memory allocation
As mentioned above, this is one of the reasons you may get the error message, so this memory allocation setup may save your analyses.
Depending on the size of the data and the analyses you want to perform, you may need to allocate a large amount of memory to your processes. Define the following depending on your machine. Here, I run models on my laptop, which has 16 GB of RAM.
I also define a folder (“spark_dir”) in which all fitted models and logs will be stored, so that I can easily know where to find them and see what is happening.
# Just in case you are already connected
spark_disconnect_all()

# Define a directory where to save model outputs and logs
spark_dir <- here("tmp/spark_dir/")

# Set driver and executor memory allocations **depending on your machine**
config <- spark_config()
config$`sparklyr.shell.driver-memory` <- "8G"
config$`sparklyr.shell.executor-memory` <- "8G"
config$spark.yarn.executor.memoryOverhead <- "4g"
config$`sparklyr.shell.driver-java-options` <- glue("-Djava.io.tmpdir={spark_dir}")
First connection to Spark
To install and connect to Apache Spark, you will use spark_connect() with the configuration we set up above.
# This may install one version of Spark the first time
sc <- spark_connect(master = "local", config = config)
You will probably get the following message:
#> * Using Spark: 2.2.1
#> Spark version 2.2 detected. Will call latest Sparkling Water version 2.2.12
#> Detected H2O version 3.16.0.2. Please install H2O version 3.18.0.2, which is compliant with the latest Sparkling Water version for Spark 2.2.* -> Sparkling Water version 2.2.12
#> To update your h2o R package, copy/paste the following commands and then restart your R session:
#> detach("package:rsparkling", unload = TRUE)
#> if ("package:h2o" %in% search()) { detach("package:h2o", unload = TRUE) }
#> if (isNamespaceLoaded("h2o")){ unloadNamespace("h2o") }
#> remove.packages("h2o")
#> install.packages("h2o", type = "source", repos = "https://h2o-release.s3.amazonaws.com/h2o/rel-wolpert/2/R")
Indeed, the version of H2O installed by default by the {h2o} package is not exactly the one required by Sparkling Water. Hence, you need to follow the recommendation in the message to install the correct version of H2O.
Here, you will need to restart your R session (without saving the workspace, of course; never do that…).
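If you work in RStudio, you can also restart the session from code instead of clicking in the menus. This is just a convenience sketch, assuming a recent {rstudioapi} version is available.

# Restart the R session programmatically (RStudio only)
if (requireNamespace("rstudioapi", quietly = TRUE)) {
  rstudioapi::restartSession()
}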
Start H2O cluster
You have restarted your R session. Let's load the libraries, define the configuration again, and start Spark.
# Libraries
library(rsparkling)
library(sparklyr)
library(h2o)
library(here)
library(glue)

# Just in case you are already connected
spark_disconnect_all()

# Define a directory where to save model outputs and logs
spark_dir <- here("tmp/spark_dir/")

# Set driver and executor memory allocations **depending on your machine**
config <- spark_config()
config$`sparklyr.shell.driver-memory` <- "8G"
config$`sparklyr.shell.executor-memory` <- "8G"
config$spark.yarn.executor.memoryOverhead <- "4g"
config$`sparklyr.shell.driver-java-options` <- glue("-Djava.io.tmpdir={spark_dir}")

# Connect to Spark
sc <- spark_connect(master = "local", config = config)
No error message!
#> * Using Spark: 2.2.1
#> Spark version 2.2 detected. Will call latest Sparkling Water version 2.2.12
If you work within RStudio, you have probably noticed that the Connections pane shows your Spark connection, with no tables if you are not connected to an existing Spark session.
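Besides the Connections pane, you can inspect the connection from the console. Here is a small sketch with two standard {sparklyr} helpers: one to read the last lines of the Spark log, the other to open the Spark web interface in your browser.

# Show the last lines of the Spark log for this connection
spark_log(sc, n = 20)

# Open the Spark web UI in the browser
spark_web(sc)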
Now that we have started the connection to Spark, you may want to initiate the H2O cluster. Let's test the connection between H2O and Spark.
# Create h2o cluster
h2o_context(sc)
Well, a new error message…
#> Version mismatch! H2O is running version 3.18.0.7 but h2o-R package is version 3.18.0.2.
#> Install the matching h2o-R version from – http://h2o-release.s3.amazonaws.com/h2o/rel-wolpert/7/index.html
What do we do with this one? If we install the recommended H2O version, it is not the same as the one recommended when we used spark_connect(). So which one should we choose?
In this case, and from now on, I recommend using the parameter strict_version_check = FALSE, because we do not really care if the running H2O is not exactly the version that was used to compile the R {h2o} package. This should work just as well.
Let’s create the context…
# Create context with Sparkling Water
h2o_context(sc, strict_version_check = FALSE)
… and finally start the H2O cluster! Choose the number of cores you allow it to use (4 for me) as well as the maximum memory allocation. Yes, memory again, but this is for H2O here, not Spark. I have 16 GB available, so let me allow 14 GB.
# Start H2O clusters
h2o.init(ip = "localhost", nthreads = 4, max_mem_size = "14G", strict_version_check = FALSE)
#> Connection successful!
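To make sure the cluster really got the resources you asked for, you can print its status. This is a quick check with the standard h2o.clusterInfo() function; the total memory reported will usually be slightly below the 14G requested because of JVM overhead.

# Print the H2O cluster status: nodes, cores and total memory
h2o.clusterInfo()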
Yeah! This worked! Well, this should work. Indeed, depending on the Spark version you define, this is not always the case, and at this point you may encounter the error message:
#> Error : org.apache.spark.SparkException: Job 1 cancelled because SparkContext was shut down...
Define Sparkling Water version
On my machine, this worked with the default settings. In my case, Spark 2.2.1 is installed and works with Sparkling Water 2.2.12.
If you installed all of this a few months ago, or if you work with a different version of Apache Spark like Spark 2.1, you will probably get the error message. To get rid of it, you only have to specify which version of Sparkling Water to run. And to know what works with what, let's have a look at the table on this page: https://github.com/h2oai/rsparkling/blob/master/README.md
It shows which Spark versions are compatible with which Sparkling Water and H2O versions. Hence, to get the versions right, here is what you can define:
- Find the Spark version you want:
spark_available_versions()
- Install the Spark version you want:
spark_install(version = "2.2.1")
- Set the correct Sparkling Water version (you can double-check these settings with the small sketch shown after this list):
options(rsparkling.sparklingwater.version = "2.2.11")
- Install the matching H2O version:
# Install adequate h2o
detach("package:rsparkling", unload = TRUE)
if ("package:h2o" %in% search()) { detach("package:h2o", unload = TRUE) }
if (isNamespaceLoaded("h2o")) { unloadNamespace("h2o") }
remove.packages("h2o", lib = .libPaths()[1])
install.packages("h2o",
                 type = "source",
                 repos = "https://h2o-release.s3.amazonaws.com/h2o/rel-wolpert/5/R")

# **Restart your session**
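After restarting your session and setting the option again, you can double-check that everything is in place before reconnecting. This is only a small verification sketch using standard functions; the versions printed should match the line you picked in the compatibility table.

# Sparkling Water version that {rsparkling} will use
getOption("rsparkling.sparklingwater.version")

# Spark distributions installed locally
sparklyr::spark_installed_versions()

# h2o R package now installed
packageVersion("h2o")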
Summary: complete configurations
Below, you will find the complete code that you can use in every Spark + H2O = Sparkling Water R script project you start.
Update the versions according to this page: https://github.com/h2oai/rsparkling/blob/master/README.md and adapt the memory parameters to your machine.
Spark 2.1
Restart your RStudio session first.
# Define Sparkling Water version for Spark 2.1
options(rsparkling.sparklingwater.version = "2.1.25")

# Libraries
library(rsparkling)
library(sparklyr)
library(h2o)
library(here)
library(glue)

# Install Spark (possibly with a specific Hadoop version)
spark_available_versions(show_hadoop = TRUE)
spark_install(version = "2.1.1", hadoop_version = "2.7")

# Set driver and executor memory allocations
spark_dir <- here("tmp/spark/")
config <- spark_config()
# config$spark.driver.memory <- "6G"
# config$spark.executor.memory <- "6G"
config$`sparklyr.shell.driver-memory` <- "6G"
config$`sparklyr.shell.executor-memory` <- "6G"
config$spark.yarn.executor.memoryOverhead <- "2g"
config$`sparklyr.shell.driver-java-options` <- glue("-Djava.io.tmpdir={spark_dir}")

needed <- FALSE
if (needed) {
  # Install adequate h2o
  detach("package:rsparkling", unload = TRUE)
  if ("package:h2o" %in% search()) { detach("package:h2o", unload = TRUE) }
  if (isNamespaceLoaded("h2o")) { unloadNamespace("h2o") }
  remove.packages("h2o", lib = .libPaths()[1])
  install.packages("h2o",
                   type = "source",
                   repos = "https://h2o-release.s3.amazonaws.com/h2o/rel-wolpert/5/R")
  # **Restart your R session**
}

# Connect to spark version 2.1.1
sc <- spark_connect(master = "local", config = config, version = "2.1.1")

# Create Sparkling Water context
h2o_context(sc, strict_version_check = FALSE)

# Stop earlier h2o cluster if exists
h2o.shutdown()

# Create h2o cluster
h2o.init(ip = "localhost", nthreads = 4, max_mem_size = "12G", strict_version_check = FALSE)

# clean slate - just in case the cluster was already running
h2o.removeAll()
Spark 2.2
Restart your RStudio session first.
# Define Sparkling Water version for Spark 2.2
options(rsparkling.sparklingwater.version = "2.2.11")

# Load libraries
library(rsparkling)
library(sparklyr)
library(h2o)
library(here)
library(glue)

# Install Spark (possibly with a specific Hadoop version)
spark_available_versions(show_hadoop = TRUE)
spark_install(version = "2.2.1", hadoop_version = "2.7")

# Set driver and executor memory allocations
spark_dir <- here("tmp/spark/")
config <- spark_config()
# config$spark.driver.memory <- "6G"
# config$spark.executor.memory <- "6G"
config$`sparklyr.shell.driver-memory` <- "6G"
config$`sparklyr.shell.executor-memory` <- "6G"
config$spark.yarn.executor.memoryOverhead <- "2g"
config$`sparklyr.shell.driver-java-options` <- glue("-Djava.io.tmpdir={spark_dir}")

needed <- FALSE
if (needed) {
  # Install adequate h2o
  detach("package:rsparkling", unload = TRUE)
  if ("package:h2o" %in% search()) { detach("package:h2o", unload = TRUE) }
  if (isNamespaceLoaded("h2o")) { unloadNamespace("h2o") }
  remove.packages("h2o", lib = .libPaths()[1])
  install.packages("h2o",
                   type = "source",
                   repos = "https://h2o-release.s3.amazonaws.com/h2o/rel-wolpert/5/R")
  # **Restart your R session**
}

# Connect to spark version 2.2.1
sc <- spark_connect(master = "local", config = config, version = "2.2.1")

# Create Sparkling Water context
h2o_context(sc, strict_version_check = FALSE)

# Stop earlier h2o cluster if exists
h2o.shutdown()

# Create h2o cluster
h2o.init(ip = "localhost", nthreads = 4, max_mem_size = "12G", strict_version_check = FALSE)

# clean slate - just in case the cluster was already running
h2o.removeAll()
# h2o.shutdown()
Spark 2.3
Restart your RStudio session first.
Compatibilities:
- Spark 2.3 + Sparkling Water 2.3.1 + H2O 3.18.0.7 (rel-wolpert/7)
# Define Sparkling Water version for Spark 2.3
options(rsparkling.sparklingwater.version = "2.3.1")

# Load libraries
library(rsparkling)
library(sparklyr)
library(h2o)
library(here)
library(glue)

# Install Spark (possibly with a specific Hadoop version)
spark_available_versions(show_hadoop = TRUE)
spark_install(version = "2.3.0", hadoop_version = "2.7")

# Set driver and executor memory allocations
spark_dir <- here("tmp/spark/")
config <- spark_config()
# config$spark.driver.memory <- "6G"
# config$spark.executor.memory <- "6G"
config$`sparklyr.shell.driver-memory` <- "6G"
config$`sparklyr.shell.executor-memory` <- "6G"
config$spark.yarn.executor.memoryOverhead <- "2g"
config$`sparklyr.shell.driver-java-options` <- glue("-Djava.io.tmpdir={spark_dir}")

needed <- FALSE
if (needed) {
  # Install adequate h2o
  detach("package:rsparkling", unload = TRUE)
  if ("package:h2o" %in% search()) { detach("package:h2o", unload = TRUE) }
  if (isNamespaceLoaded("h2o")) { unloadNamespace("h2o") }
  remove.packages("h2o", lib = .libPaths()[1])
  install.packages("h2o",
                   type = "source",
                   repos = "https://h2o-release.s3.amazonaws.com/h2o/rel-wolpert/7/R")
  # **Restart your R session**
}

# Connect to spark version 2.3.0
sc <- spark_connect(master = "local", config = config, version = "2.3.0")

# Create Sparkling Water context
h2o_context(sc, strict_version_check = FALSE)

# Stop earlier h2o cluster if exists
h2o.shutdown()

# Create h2o cluster
h2o.init(ip = "localhost", nthreads = 4, max_mem_size = "12G", strict_version_check = FALSE)

# clean slate - just in case the cluster was already running
h2o.removeAll()
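Whatever the Spark version, when you are done working, it is a good habit to shut everything down cleanly instead of leaving the JVMs running in the background. A minimal sketch with the standard {h2o} and {sparklyr} functions:

# Shut down the H2O cluster without asking for confirmation
h2o.shutdown(prompt = FALSE)

# Close all open Spark connections
spark_disconnect_all()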
Test if this works
If you want to test whether your connection between Spark and H2O works correctly, you can run the following machine learning example. This is the example from the {rsparkling} GitHub repository.
library(dplyr)

# Copy data to Spark (see it in the Connections pane in RStudio)
mtcars_tbl <- copy_to(sc, mtcars, overwrite = TRUE)

# Convert Spark data to H2O
mtcars_hf <- as_h2o_frame(sc, mtcars_tbl)

# Define variables
y <- "mpg"
x <- setdiff(names(mtcars_hf), y)

# Split the mtcars H2O Frame into train & test sets
splits <- h2o.splitFrame(mtcars_hf, ratios = 0.7, seed = 1)

# Fit a gradient boosting model
fit <- h2o.gbm(x = x,
               y = y,
               training_frame = splits[[1]],
               min_rows = 1,
               seed = 1)
print(fit)

#> Model Details:
#> ==============
#> H2ORegressionModel: gbm
#> number_of_trees number_of_internal_trees model_size_in_bytes min_depth max_depth
#> 1 50 50 14804 5 5
#> mean_depth min_leaves max_leaves mean_leaves
#> 1 5.00000 17 21 18.64000
#> H2ORegressionMetrics: gbm
#> ** Reported on training data. **
#> MSE: 0.001211724
#> RMSE: 0.03480983
#> MAE: 0.02761402
#> RMSLE: 0.001929304
#> Mean Residual Deviance : 0.001211724
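To go one step further, you can check that data also flows back from H2O to Spark. Here is a small sketch extending the example above; it uses h2o.predict() from {h2o} and as_spark_dataframe() from {rsparkling} to send the predictions back as a Spark DataFrame.

# Predict on the test split with the fitted model
pred <- h2o.predict(fit, newdata = splits[[2]])
head(pred)

# Send the predictions back to Spark as a Spark DataFrame
pred_tbl <- as_spark_dataframe(sc, pred)
pred_tbl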
Complete error message
This is the complete error message you may encounter if Sparkling Water has not been set up correctly or if your memory allocation was too low for the models you wanted to run.
#> Error : org.apache.spark.SparkException: Job 1 cancelled because SparkContext was shut down
#> at org.apache.spark.scheduler.DAGScheduler$$anonfun$cleanUpAfterSchedulerStop$1.apply(DAGScheduler.scala:808)
#> at org.apache.spark.scheduler.DAGScheduler$$anonfun$cleanUpAfterSchedulerStop$1.apply(DAGScheduler.scala:806)
#> at scala.collection.mutable.HashSet.foreach(HashSet.scala:78)
#> at org.apache.spark.scheduler.DAGScheduler.cleanUpAfterSchedulerStop(DAGScheduler.scala:806)
#> at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onStop(DAGScheduler.scala:1668)
#> at org.apache.spark.util.EventLoop.stop(EventLoop.scala:83)
#> at org.apache.spark.scheduler.DAGScheduler.stop(DAGScheduler.scala:1587)
#> at org.apache.spark.SparkContext$$anonfun$stop$8.apply$mcV$sp(SparkContext.scala:1833)
#> at org.apache.spark.util.Utils$.tryLogNonFatalError(Utils.scala:1283)
#> at org.apache.spark.SparkContext.stop(SparkContext.scala:1832)
#> at org.apache.spark.SparkContext$$anonfun$2.apply$mcV$sp(SparkContext.scala:581)
#> at org.apache.spark.util.SparkShutdownHook.run(ShutdownHookManager.scala:216)
#> at org.apache.spark.util.SparkShutdownHookManager$$anonfun$runAll$1$$anonfun$apply$mcV$sp$1.apply$mcV$sp(ShutdownHookManager.scala:188)
#> at org.apache.spark.util.SparkShutdownHookManager$$anonfun$runAll$1$$anonfun$apply$mcV$sp$1.apply(ShutdownHookManager.scala:188)
#> at org.apache.spark.util.SparkShutdownHookManager$$anonfun$runAll$1$$anonfun$apply$mcV$sp$1.apply(ShutdownHookManager.scala:188)
#> at org.apache.spark.util.Utils$.logUncaughtExceptions(Utils.scala:1951)
#> at org.apache.spark.util.SparkShutdownHookManager$$anonfun$runAll$1.apply$mcV$sp(ShutdownHookManager.scala:188)
#> at org.apache.spark.util.SparkShutdownHookManager$$anonfun$runAll$1.apply(ShutdownHookManager.scala:188)
#> at org.apache.spark.util.SparkShutdownHookManager$$anonfun$runAll$1.apply(ShutdownHookManager.scala:188)
#> at scala.util.Try$.apply(Try.scala:192)
#> at org.apache.spark.util.SparkShutdownHookManager.runAll(ShutdownHookManager.scala:188)
#> at org.apache.spark.util.SparkShutdownHookManager$$anon$2.run(ShutdownHookManager.scala:178)
#> at org.apache.hadoop.util.ShutdownHookManager$1.run(ShutdownHookManager.java:54)
#> at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:628)
#> at org.apache.spark.SparkContext.runJob(SparkContext.scala:1925)
#> at org.apache.spark.SparkContext.runJob(SparkContext.scala:1938)
#> at org.apache.spark.SparkContext.runJob(SparkContext.scala:1951)
#> at org.apache.spark.SparkContext.runJob(SparkContext.scala:1965)
#> at org.apache.spark.rdd.RDD$$anonfun$collect$1.apply(RDD.scala:936)
#> at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
#> at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
#> at org.apache.spark.rdd.RDD.withScope(RDD.scala:362)
#> at org.apache.spark.rdd.RDD.collect(RDD.scala:935)
#> at org.apache.spark.h2o.backends.internal.InternalBackendUtils$class.startH2O(InternalBackendUtils.scala:163)
#> at org.apache.spark.h2o.backends.internal.InternalBackendUtils$.startH2O(InternalBackendUtils.scala:262)
#> at org.apache.spark.h2o.backends.internal.InternalH2OBackend.init(InternalH2OBackend.scala:99)
#> at org.apache.spark.h2o.H2OContext.init(H2OContext.scala:129)
#> at org.apache.spark.h2o.H2OContext$.getOrCreate(H2OContext.scala:381)
#> at org.apache.spark.h2o.H2OContext$.getOrCreate(H2OContext.scala:416)
#> at org.apache.spark.h2o.H2OContext.getOrCreate(H2OContext.scala)
#> at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
#> at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
#> at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
#> at java.lang.reflect.Method.invoke(Method.java:498)
#> at sparklyr.Invoke$.invoke(invoke.scala:102)
#> at sparklyr.StreamHandler$.handleMethodCall(stream.scala:97)
#> at sparklyr.StreamHandler$.read(stream.scala:62)
#> at sparklyr.BackendHandler.channelRead0(handler.scala:52)
#> at sparklyr.BackendHandler.channelRead0(handler.scala:14)
#> at io.netty.channel.SimpleChannelInboundHandler.channelRead(SimpleChannelInboundHandler.java:105)
#> at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:367)
#> at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:353)
#> at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:346)
#> at io.netty.handler.codec.MessageToMessageDecoder.channelRead(MessageToMessageDecoder.java:102)
#> at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:367)
#> at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:353)
#> at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:346)
#> at io.netty.handler.codec.ByteToMessageDecoder.fireChannelRead(ByteToMessageDecoder.java:293)
#> at io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:267)
#> at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:367)
#> at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:353)
#> at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:346)
#> at io.netty.channel.DefaultChannelPipeline$HeadContext.channelRead(DefaultChannelPipeline.java:1294)
#> at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:367)
#> at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:353)
#> at io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:911)
#> at io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:131)
#> at io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:652)
#> at io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:575)
#> at io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:489)
#> at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:451)
#> at io.netty.util.concurrent.SingleThreadEventExecutor$2.run(SingleThreadEventExecutor.java:140)
#> at io.netty.util.concurrent.DefaultThreadFactory$DefaultRunnableDecorator.run(DefaultThreadFactory.java:144)
#> at java.lang.Thread.run(Thread.java:748)