Introduction
The {sparklyr} package lets us connect to and use Apache Spark for high-performance, highly parallelized, and distributed computations. We can also use Spark’s capabilities to improve and streamline our data processing pipelines, as Spark supports reading and writing from many popular sources such as Parquet, ORC, etc., and most database systems via JDBC drivers.
In this post, we will explore using R to perform data loads into Spark (and optionally into R) from relational database management systems such as MySQL, Oracle, and MS SQL Server, and show how such processes can be simplified. We will also provide reproducible code via a Docker image, so that interested readers can experiment with it easily.
Getting test data into a MySQL database
If you are interested only in the Spark loading part, feel free to skip this section.
For a fully reproducible example, we will use a local MySQL server instance, as its open-source nature makes it very accessible. We will use the {DBI} and {RMySQL} packages to connect to the server directly from R and populate a database with data provided by the {nycflights13} package, which we will later use for our Spark loads.
Let us write the flights data frame into the MySQL database using {DBI} and call the newly created table test_table:
test_df <- nycflights13::flights

# Create a connection to database `testdb`
con <- DBI::dbConnect(
  drv = RMySQL::MySQL(),
  host = "localhost",
  dbname = "testdb",
  user = "rstudio",
  password = "pass"
)

# Write our `test_df` into a table called `test_table`
DBI::dbWriteTable(con, "test_table", test_df, overwrite = TRUE)

# Close the connection
DBI::dbDisconnect(con)
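As an optional sanity check (not part of the original workflow above), we could reconnect and list the tables with {DBI} to confirm that test_table was indeed created:

# Reconnect and list tables to confirm that `test_table` exists
con <- DBI::dbConnect(
  drv = RMySQL::MySQL(),
  host = "localhost",
  dbname = "testdb",
  user = "rstudio",
  password = "pass"
)
DBI::dbListTables(con)
DBI::dbDisconnect(con)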
Now we have our table available and we can focus on the main part of the article.
Using JDBC to connect to database systems from Spark
Getting a JDBC driver and using it with Spark and sparklyr
Since Spark runs via a JVM, the natural way to establish connections to database systems is using Java Database Connectivity (JDBC). To do that, we will need a JDBC driver which will enable us to interact with the database system of our choice. For this example, we are using MySQL, but we provide details on other RDBMS later in the article.
Downloading and extracting the connector jar
With a bit of online search, we can download the driver and extract the contents of the zip file:
mkdir -p $HOME/jars
wget -q -t 3 \
  -O $HOME/jars/mysql-connector.zip \
  https://cdn.mysql.com/Downloads/Connector-J/mysql-connector-java-8.0.21.zip
unzip -q -o \
  -d $HOME/jars \
  $HOME/jars/mysql-connector.zip
Now, the file we are most interested in for our use case is the .jar file that contains the classes necessary to establish the connection. Using R, we can locate the extracted jar file(s), for example using the dir() function:
jars <- dir("~/jars", pattern = "jar$", recursive = TRUE, full.names = TRUE)
basename(jars)
## [1] "mysql-connector-java-8.0.21.jar"
Connecting using the jar
Next, we need to tell {sparklyr} to use that resource when establishing a Spark connection, for example by adding a sparklyr.jars.default element with the paths to the necessary jar files to the config list, and finally establish the Spark connection using our config:
config <- list(sparklyr.jars.default = jars)
sc <- sparklyr::spark_connect("local", config = config)
Retrieving data from a database with sparklyr
With the Spark connection established, we can connect to our MySQL database from Spark and retrieve the data. {sparklyr} provides a handy spark_read_jdbc() function for this exact purpose. The API maps closely to the Scala API, but it is not very explicit about how to set up the connection. The key here is the options argument to spark_read_jdbc(), which specifies all the connection details we need.
Setting the options argument of spark_read_jdbc()
First, let us create a jdbcConnectionOpts list with the basic connection properties. These are the connection URL and the driver. Below we also explicitly specify the user and password, but these can usually also be provided as part of the URL:
# Connection options
jdbcConnectionOpts <- list(
  url = "jdbc:mysql://localhost:3306/testdb",
  driver = "com.mysql.cj.jdbc.Driver",
  user = "rstudio",
  password = "pass"
)
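As an aside, here is a minimal sketch of the alternative mentioned above, with the credentials embedded directly in the connection URL (MySQL Connector/J accepts user and password as URL parameters); we use a separate list name here so the rest of the article is unaffected:

# Credentials provided as URL parameters instead of separate list elements
jdbcConnectionOptsUrl <- list(
  url = "jdbc:mysql://localhost:3306/testdb?user=rstudio&password=pass",
  driver = "com.mysql.cj.jdbc.Driver"
)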
The last bit of information we need to provide is the identification of the data we want to extract once the connection is established. For this, we can use one of two options:
- dbtable – in case we want to create a Spark DataFrame by extracting contents of a specific table
- query – in case we want to create a Spark DataFrame by executing a SQL query
Loading a specific database table
First, let us go with the option to load the database table that we populated with the flights earlier and named test_table, putting it all together and loading the data using spark_read_jdbc():
# Other options specific to the action
jdbcDataOpts <- list(dbtable = "test_table")

# Use spark_read_jdbc() to load the data
test_tbl <- sparklyr::spark_read_jdbc(
  sc = sc,
  name = "test_table",
  options = append(jdbcConnectionOpts, jdbcDataOpts),
  memory = FALSE
)

# Print some records
test_tbl
## # Source: spark<test_table> [?? x 20]
##    row_names  year month   day dep_time sched_dep_time dep_delay arr_time
##    <chr>     <dbl> <dbl> <dbl>    <dbl>          <dbl>     <dbl>    <dbl>
##  1 1          2013     1     1      517            515         2      830
##  2 2          2013     1     1      533            529         4      850
##  3 3          2013     1     1      542            540         2      923
##  4 4          2013     1     1      544            545        -1     1004
##  5 5          2013     1     1      554            600        -6      812
##  6 6          2013     1     1      554            558        -4      740
##  7 7          2013     1     1      555            600        -5      913
##  8 8          2013     1     1      557            600        -3      709
##  9 9          2013     1     1      557            600        -3      838
## 10 10         2013     1     1      558            600        -2      753
## # … with more rows, and 12 more variables: sched_arr_time <dbl>,
## #   arr_delay <dbl>, carrier <chr>, flight <dbl>, tailnum <chr>,
## #   origin <chr>, dest <chr>, air_time <dbl>, distance <dbl>, hour <dbl>,
## #   minute <dbl>, time_hour <chr>
We provided the following arguments:
- sc is the Spark connection that we established using the config that includes the necessary jars
- name is a character string with the name to be assigned to the newly generated table within Spark SQL, not the name of the source table we want to read from our database
- options is a list with both the connection options and the data-related options, so we use append() to combine the jdbcConnectionOpts and jdbcDataOpts lists into one
- memory is a logical that tells Spark whether we want to cache the table into memory. A bit more on that and some performance implications below
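Since we loaded the table with memory = FALSE, nothing is cached yet. A minimal sketch of working with the uncached table via {dplyr}, bringing only a small aggregated result into R (the aggregation itself is just an illustrative choice):

library(dplyr)

# Aggregate on the Spark side, collect only the small result into R
test_tbl %>%
  group_by(origin) %>%
  summarise(flight_count = n()) %>%
  collect()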
Executing a query instead
We mentioned above that apart from just loading a table, we can also choose to execute a SQL query and use its result as the source for our Spark DataFrame. Here is a simple example of that.
# Use `query` instead of `dbtable`
jdbcDataOpts <- list(
  query = "SELECT * FROM test_table WHERE tailnum = 'N14228'"
)

# Use spark_read_jdbc() to load the data
test_qry <- sparklyr::spark_read_jdbc(
  sc = sc,
  name = "test_table",
  options = append(jdbcConnectionOpts, jdbcDataOpts),
  memory = FALSE
)

# Print some records
test_qry
## # Source: spark<test_table> [?? x 20]
##    row_names  year month   day dep_time sched_dep_time dep_delay arr_time
##    <chr>     <dbl> <dbl> <dbl>    <dbl>          <dbl>     <dbl>    <dbl>
##  1 1          2013     1     1      517            515         2      830
##  2 6570       2013     1     8     1435           1440        -5     1717
##  3 7111       2013     1     9      717            700        17      812
##  4 7349       2013     1     9     1143           1144        -1     1425
##  5 10593      2013     1    13      835            824        11     1030
##  6 13775      2013     1    16     1829           1730        59     2117
##  7 18967      2013     1    22     1902           1808        54     2214
##  8 19417      2013     1    23     1050           1056        -6     1143
##  9 19648      2013     1    23     1533           1529         4     1641
## 10 21046      2013     1    25      724            720         4     1000
## # … with more rows, and 12 more variables: sched_arr_time <dbl>,
## #   arr_delay <dbl>, carrier <chr>, flight <dbl>, tailnum <chr>,
## #   origin <chr>, dest <chr>, air_time <dbl>, distance <dbl>, hour <dbl>,
## #   minute <dbl>, time_hour <chr>
Note that the only element that changed is the jdbcDataOpts list, which now contains a query element instead of a dbtable element.
Other RDBM Systems
Our toy example with MySQL worked fine, but in practice, we might need to access data in other popular RDBM systems, such as Oracle, MS SQL Server, and others. The pattern shown above remains the same, however, as the API design does not depend on the system in question.
In general, we will need 3 elements to successfully connect:
- A JDBC driver specified and the resources provided to {sparklyr} in the config argument of spark_connect(), usually in the form of paths to .jar files containing the necessary resources
- A connection URL that will depend on the system and other setup specifics
- Last but not least, all the technical and infrastructural prerequisites such as credentials with the proper access rights, the host being accessible from the Spark cluster, etc.
Now for some examples that we have worked with in the past and had success with.
Oracle
Oracle JDBC Driver
The drivers can be downloaded (after login) from Oracle’s website and the driver name usually is "oracle.jdbc.driver.OracleDriver". Make sure you use the appropriate version.
Using fully qualified host identification
hostName <- "0.0.0.0"
portNumber <- "1521"
serviceName <- "service_name"

jdbcConnectionOpts <- list(
  user = "username",
  password = "password",
  driver = "oracle.jdbc.driver.OracleDriver",
  fetchsize = "100000",
  url = paste0(
    "jdbc:oracle:thin:@//",
    hostName, ":", portNumber, "/", serviceName
  )
)
Using tnsnames.ora
The tnsnames.ora file is a configuration file that contains network service names mapped to connect descriptors for the local naming method, or net service names mapped to listener protocol addresses. With this in place, we can use just the service name instead of the fully qualified host, port, and service identification, for example:
serviceName <- "service_name"

jdbcConnectionOpts <- list(
  user = "username",
  password = "password",
  driver = "oracle.jdbc.driver.OracleDriver",
  fetchsize = "100000",
  url = paste0("jdbc:oracle:thin:@", serviceName)
)
Parsing special data types
Note that the JDBC driver on its own may not be enough to parse all data types in an Oracle database. For instance, parsing the XMLType will very likely require xmlparserv2.jar and xdb.jar along with the proper ojdbc*.jar.
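In such a case, all of the required jars can be passed to {sparklyr} together. A sketch, assuming the files were downloaded into ~/jars (the exact file names, such as ojdbc8.jar, depend on the driver version you obtained):

# Hypothetical paths; adjust to the jar versions you actually downloaded
oracleJars <- file.path(
  "~/jars",
  c("ojdbc8.jar", "xmlparserv2.jar", "xdb.jar")
)
config <- list(sparklyr.jars.default = oracleJars)
sc <- sparklyr::spark_connect("local", config = config)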
MS SQL Server
MS SQL Server JDBC Driver
The drivers for different JRE versions can be downloaded from the Download Microsoft JDBC Driver for SQL Server website. Again, make sure that the JRE version matches the one you use in your environments.
MS SQL Server connection options
serverName <- "0.0.0.0"
portNumber <- "1433"
databaseName <- "db_name"

jdbcConnectionOpts <- list(
  user = "username",
  password = "password",
  driver = "com.microsoft.sqlserver.jdbc.SQLServerDriver",
  fetchsize = "100000",
  url = paste0(
    "jdbc:sqlserver://", serverName, ":", portNumber,
    ";databaseName=", databaseName
  )
)
Even more RDBM Systems
Vlad Mihalcea wrote a very useful article on JDBC Driver Connection URL strings which has the connection URL details for several other common database systems.
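For instance, a PostgreSQL connection (not covered above, so treat the details as an illustration to verify against the PostgreSQL JDBC documentation) follows the very same pattern, provided the PostgreSQL JDBC jar was supplied via the config as before:

# PostgreSQL connection options following the same pattern
jdbcConnectionOpts <- list(
  user = "username",
  password = "password",
  driver = "org.postgresql.Driver",
  url = "jdbc:postgresql://0.0.0.0:5432/db_name"
)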
Some notes on performance
The memory argument
The memory argument to spark_read_jdbc() can prove very important when performance is of interest. What happens when using the default memory = TRUE is that the table in the Spark SQL context is cached using CACHE TABLE and a SELECT count(*) FROM query is executed on the cached table. This forces Spark to perform the action of loading the entire table into memory.
Depending on our use case, it might be much more beneficial to use memory = FALSE and only cache into Spark memory the parts of the table (or processed results) that we need, as the most time-costly operations usually are data transfers over the network. Transferring as little data as possible from the database into Spark memory may bring significant performance benefits.
This is a bit difficult to show with our toy example, as everything is physically happening inside the same container (and therefore the same file system), but differences can be observed even with this setup and our small dataset:
microbenchmark::microbenchmark(
  times = 10,
  setup = {
    library(dplyr)
    library(sparklyr)
    sparklyr::spark_disconnect_all()
    sc <- sparklyr::spark_connect("local", config = config)
  },
  # with memory = TRUE (the default)
  eager = {
    one <- sparklyr::spark_read_jdbc(
      sc = sc,
      name = "test",
      options = append(jdbcConnectionOpts, list(dbtable = "test_table"))
    ) %>%
      filter(tailnum == "N14228") %>%
      select(tailnum, distance) %>%
      compute("test")
  },
  # with memory = FALSE
  lazy = {
    two <- sparklyr::spark_read_jdbc(
      sc = sc,
      name = "test",
      options = append(jdbcConnectionOpts, list(dbtable = "test_table")),
      memory = FALSE
    ) %>%
      filter(tailnum == "N14228") %>%
      select(tailnum, distance) %>%
      compute("test")
  }
)
# Unit: seconds
#   expr       min       lq     mean   median       uq      max neval
#  eager 15.460844 16.24838 17.07560 17.03592 17.88299 18.73005    10
#   lazy  9.821039 10.12435 10.40718 10.42766 10.70024 10.97283    10
We see that the “lazy” approach that does not cache the entire table into memory has yielded the result around 41% faster. This is of course by no means a relevant benchmark for real-life data loads but can provide some insight into optimizing the loads.
Partitioning
Partitioning the data can bring a very significant performance boost and we will look into setting it up and optimizing it in detail in a separate article.
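As a quick preview, and purely as a sketch (partitionColumn, lowerBound, upperBound, and numPartitions are standard Spark JDBC options, but sensible values depend entirely on the data), partitioned reads are configured via additional entries in the options list:

# Partitioning options passed alongside the connection and data options
jdbcPartitionOpts <- list(
  partitionColumn = "month",  # a numeric column to split the reads on
  lowerBound = "1",
  upperBound = "12",
  numPartitions = "4"
)

test_part <- sparklyr::spark_read_jdbc(
  sc = sc,
  name = "test_table_partitioned",
  options = append(
    jdbcConnectionOpts,
    append(list(dbtable = "test_table"), jdbcPartitionOpts)
  ),
  memory = FALSE
)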
Running the code in this article
If you have Docker available, running the following should yield a Docker container with RStudio Server exposed on port 8787, so you can open your web browser at http://localhost:8787 to access it and experiment with the code. The user name is rstudio and the password is as you choose below:
docker run -d -p 8787:8787 -e PASSWORD=pass jozefhajnala/jozefio
References
- JDBC Driver Connection URL strings
- MS SQL Server: Programming Guide for JDBC – Building the Connection URL
- Oracle: Database JDBC Developer’s Guide and Reference – Data Sources and URLs