I’ve blogged about how to use Amazon Athena with R before, and if you are a regular Athena user, you’ve likely run into a situation where you prepare a dplyr chain, fire off a collect() and then wait.
And, wait.
And, wait.
And, wait.
Queries that take significant processing time or have large result sets do not play nicely with the provided ODBC and JDBC drivers. This means “hung” R sessions and severe frustration, especially when you can log in to the AWS Athena console and see that the results are right there!!
I’ve been crafting SQL by hand or using sql_render() to avoid this (when I remember to), but I finally felt sufficient frustration to craft a better way, provided you can install and run rJava-based code (it’s 2018 and that still is not an easy given on many systems, unfortunately).
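For reference, that manual workaround is just rendering the SQL yourself and running it outside the driver. Here is a minimal sketch, assuming an existing Athena ODBC connection named con and the elb_logs sample table used later in this post:

library(dplyr)
library(dbplyr)

# Render the SQL that the dplyr chain would send, instead of calling collect().
# You can then paste it into the Athena console (or submit it another way)
# and fetch the results from S3 yourself.
q <- tbl(con, "elb_logs") %>%
  filter(requestip == "253.89.30.138") %>%
  dbplyr::sql_render()

cat(as.character(q))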
There are two functions below: collect_async() and gather_results().
The collect_async() function is designed to be used like collect() but uses Athena components from the AWS SDK for Java to execute the SQL query behind the dplyr chain asynchronously. The companion function gather_results() takes the object created by collect_async() and checks to see if the results are ready. If they are, it will use the aws.s3 package to download them. Personally, I’d just aws s3 sync ... from the command line rather than use the aws.s3 package, but that’s not everyone’s cup of tea.
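If you would rather block until the results land, a small polling wrapper around gather_results() does the trick. This is just a sketch; the wait_for_results() name and the retry interval are my own choices, not part of the functions below:

# Poll gather_results() until it returns a data frame (sketch; the interval
# and max_tries values are arbitrary).
wait_for_results <- function(async_result, interval = 10, max_tries = 60) {
  for (i in seq_len(max_tries)) {
    res <- gather_results(async_result)
    if (!is.null(res)) return(res)
    Sys.sleep(interval)
  }
  stop("Query results did not appear within the allotted time.", call. = FALSE)
}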
Once I figure out the best package API for this I’ll add it to the metis package. There are many AWS idiosyncrasies that need to be accounted for, and I’d rather ship this current set of functions via the blog so folks can use it (and tweak it to their needs) than wait for perfection.
Here’s the code:
library(rJava)
library(awsjavasdk)
library(aws.signature)
library(aws.s3)
library(odbc)
library(tidyverse)
library(dbplyr)

#' Collect Amazon Athena query results asynchronously
#'
#' Long running Athena queries and Athena queries with large result
#' sets can seriously stall a `dplyr` processing chain due to poorly
#' implemented ODBC and JDBC drivers. The AWS SDK for Athena has
#' methods that support submitting a query asynchronously for "batch"
#' processing. All Athena results are stored in CSV files in S3 and it's
#' easy to use the R `aws.s3` package to grab these or perform an
#' `aws s3 sync ...` operation on the command line.
#'
#' @md
#' @param obj the `dplyr` chain
#' @param schema Athena schema (usually matches the `Schema` parameter to the
#'        Simba ODBC connection)
#' @param region Your AWS region. All lower case with dashes (usually matches
#'        the `AwsRegion` parameter to the Simba ODBC connection)
#' @param results_bucket the S3 results bucket where query results are stored
#'        (usually matches the `S3OutputLocation` parameter to the Simba ODBC
#'        connection)
#' @return a `list` with the query execution ID and the S3 bucket. This object
#'         is designed to be passed to the companion `gather_results()` if you
#'         want to use the `aws.s3` package to retrieve the results. Otherwise,
#'         sync the file however you want using the query execution id.
#' @note You may need to change up the authentication provider depending on how
#'       you use credentials with Athena
collect_async <- function(obj, schema, region, results_bucket) {

  ugly_query <- as.character(sql_render(obj))

  region <- toupper(region)
  region <- gsub("-", "_", region, fixed=TRUE)

  regions <- J("com.amazonaws.regions.Regions")

  available_regions <- grep("^[[:upper:][:digit:]_]+$", names(regions), value=TRUE)
  if (!region %in% available_regions) stop("Invalid region.", call.=FALSE)

  switch(
    region,
    "GovCloud" = regions$GovCloud,
    "US_EAST_1" = regions$US_EAST_1,
    "US_EAST_2" = regions$US_EAST_2,
    "US_WEST_1" = regions$US_WEST_1,
    "US_WEST_2" = regions$US_WEST_2,
    "EU_WEST_1" = regions$EU_WEST_1,
    "EU_WEST_2" = regions$EU_WEST_2,
    "EU_WEST_3" = regions$EU_WEST_3,
    "EU_CENTRAL_1" = regions$EU_CENTRAL_1,
    "AP_SOUTH_1" = regions$AP_SOUTH_1,
    "AP_SOUTHEAST_1" = regions$AP_SOUTHEAST_1,
    "AP_SOUTHEAST_2" = regions$AP_SOUTHEAST_2,
    "AP_NORTHEAST_1" = regions$AP_NORTHEAST_1,
    "AP_NORTHEAST_2" = regions$AP_NORTHEAST_2,
    "SA_EAST_1" = regions$SA_EAST_1,
    "CN_NORTH_1" = regions$CN_NORTH_1,
    "CN_NORTHWEST_1" = regions$CN_NORTHWEST_1,
    "CA_CENTRAL_1" = regions$CA_CENTRAL_1,
    "DEFAULT_REGION" = regions$DEFAULT_REGION
  ) -> region

  provider <- J("com.amazonaws.auth.DefaultAWSCredentialsProviderChain")
  client <- J("com.amazonaws.services.athena.AmazonAthenaAsyncClientBuilder")

  my_client <- client$standard()
  my_client <- my_client$withRegion(region)
  my_client <- my_client$withCredentials(provider$getInstance())
  my_client <- my_client$build()

  queryExecutionContext <- .jnew("com.amazonaws.services.athena.model.QueryExecutionContext")
  context <- queryExecutionContext$withDatabase(schema)

  result <- .jnew("com.amazonaws.services.athena.model.ResultConfiguration")
  result$setOutputLocation(results_bucket)

  startQueryExecutionRequest <- .jnew("com.amazonaws.services.athena.model.StartQueryExecutionRequest")
  startQueryExecutionRequest$setQueryString(ugly_query)
  startQueryExecutionRequest$setQueryExecutionContext(context)
  startQueryExecutionRequest$setResultConfiguration(result)

  res <- my_client$startQueryExecutionAsync(startQueryExecutionRequest)

  r <- res$get()
  qex_id <- r$getQueryExecutionId()

  list(
    qex_id = qex_id,
    results_bucket = results_bucket
  )

}

#' Gather the results of an asynchronous query
#'
#' @md
#' @param async_result the result of a call to `collect_async()`
#' @return a data frame (tibble) or `NULL` if the query results are not ready yet
gather_results <- function(async_result) {
  if (bucket_exists(sprintf("%s/%s", async_result$results_bucket, async_result$qex_id))) {
    readr::read_csv(
      get_object(sprintf("%s/%s.csv", async_result$results_bucket, async_result$qex_id))
    )
  } else {
    message("Results are not in the designated bucket.")
    return(NULL)
  }
}
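Note that gather_results() only checks whether the result file has shown up in S3. If you would rather ask Athena directly whether the query has finished, the same Java SDK exposes GetQueryExecution. Here is a sketch of that idea; it assumes you keep (or rebuild) the Athena client object created inside collect_async(), and the check_query_state() name is mine, not part of the functions above:

# Sketch: poll the query state via the AWS SDK for Java. `athena_client` is
# assumed to be an Athena client built the same way as `my_client` above.
check_query_state <- function(athena_client, qex_id) {
  req <- .jnew("com.amazonaws.services.athena.model.GetQueryExecutionRequest")
  req$setQueryExecutionId(qex_id)
  res <- athena_client$getQueryExecution(req)
  # Returns one of QUEUED, RUNNING, SUCCEEDED, FAILED or CANCELLED
  res$getQueryExecution()$getStatus()$getState()
}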
Now, we give it a go:
# Setup the credentials you're using
use_credentials("personal")

# load the AWS Java SDK classes
awsjavasdk::load_sdk()

# necessary for Simba ODBC and the async query ops
aws_region <- "us-east-1"
athena_schema <- "sampledb"
athena_results_bucket <- "s3://aws-athena-query-results-redacted"

# connect to Athena and the sample database
DBI::dbConnect(
  odbc::odbc(),
  driver = "/Library/simba/athenaodbc/lib/libathenaodbc_sbu.dylib",
  Schema = athena_schema,
  AwsRegion = aws_region,
  AuthenticationType = "IAM Profile",
  AwsProfile = "personal",
  S3OutputLocation = athena_results_bucket
) -> con

# the sample table in the sample db/schema
elb_logs <- tbl(con, "elb_logs")

# create your dplyr chain. This one is small so I don't incur charges
# collect_async() MUST be the LAST item in the dplyr chain.
elb_logs %>%
  filter(requestip == "253.89.30.138") %>%
  collect_async(
    schema = athena_schema,
    region = aws_region,
    results_bucket = athena_results_bucket
  ) -> async_result

async_result
## $qex_id
## [1] "d5fe7754-919b-47c5-bd7d-3ccdb1a3a414"
##
## $results_bucket
## [1] "s3://aws-athena-query-results-redacted"

# For long queries we can wait a bit but the function will tell us if the results
# are there or not.
gather_results(async_result)
## Parsed with column specification:
## cols(
##   timestamp = col_datetime(format = ""),
##   elbname = col_character(),
##   requestip = col_character(),
##   requestport = col_integer(),
##   backendip = col_character(),
##   backendport = col_integer(),
##   requestprocessingtime = col_double(),
##   backendprocessingtime = col_double(),
##   clientresponsetime = col_double(),
##   elbresponsecode = col_integer(),
##   backendresponsecode = col_integer(),
##   receivedbytes = col_integer(),
##   sentbytes = col_integer(),
##   requestverb = col_character(),
##   url = col_character(),
##   protocol = col_character()
## )
## # A tibble: 1 x 16
##   timestamp           elbname requestip     requestport backendip     backendport
##   <dttm>              <chr>   <chr>               <int> <chr>               <int>
## 1 2014-09-29 03:24:38 lb-demo 253.89.30.138       20159 253.89.30.138        8888
## # ... with 10 more variables: requestprocessingtime <dbl>, backendprocessingtime <dbl>,
## #   clientresponsetime <dbl>, elbresponsecode <int>, backendresponsecode <int>,
## #   receivedbytes <int>, sentbytes <int>, requestverb <chr>, url <chr>, protocol <chr>
If you do try this out and end up needing to tweak it, feedback on what you had to do (via the comments) would be greatly appreciated.