Alleviating AWS Athena Aggravation with Asynchronous Assistance

[This article was first published on R – rud.is, and kindly contributed to R-bloggers.]

I’ve blogged about how to use Amazon Athena with R before. If you are a regular Athena user, you’ve likely run into a situation where you prepare a dplyr chain, fire off a collect() and then wait.

And, wait.

And, wait.

And, wait.

Queries that take significant processing time or have large result sets do not play nicely with the provided ODBC and JDBC drivers. This means “hung” R sessions and severe frustration, especially when you can log in to the AWS Athena console and see that the results are right there!

I’ve been crafting SQL by hand or rendering it with sql_render() to avoid this (when I remember to), but I finally felt sufficient frustration to craft a better way, provided you can install and run rJava-based code (it’s 2018 and that is still not a given on many systems, unfortunately).
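For readers who have not used sql_render() this way, here is a minimal sketch of pulling the SQL out of a dplyr chain without touching Athena at all. It assumes dbplyr's lazy_frame() helper with its default simulated backend; the two column names are borrowed from the sample table used later in this post, and the exact SQL text will vary with your dbplyr version and with the real connection you use.

```r
library(dplyr)
library(dbplyr)

# A lazy table that is never executed; it exists only so dbplyr can
# translate the chain. lazy_frame() falls back to a simulated backend.
fake_elb_logs <- lazy_frame(requestip = character(), elbname = character())

# Build the chain as usual, but render it to SQL instead of collecting it
query <- fake_elb_logs %>%
  filter(requestip == "253.89.30.138") %>%
  sql_render()

# sql_render() returns an SQL object; as.character() gives a plain string
# that can be handed to any other client
sql_text <- as.character(query)
cat(sql_text, "\n")
```

This is the same rendering step collect_async() performs on the chain it is given.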

There are two functions below:

  • collect_async(), and
  • gather_results()

The collect_async() function is designed to be used like collect() but uses Athena components from the AWS SDK for Java to execute the SQL query behind the dplyr chain asynchronously. The companion function gather_results() takes the object created by collect_async() and checks to see if the results are ready. If they are, it will use the aws.s3 package to download them. Personally, I’d just aws s3 sync ... from the command line rather than use the aws.s3 package, but that’s not everyone’s cup of tea.
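If you would rather go the command-line route, the only thing you really need is the location of the result file, which the code in this post treats as the results location plus the query execution ID plus ".csv". The sketch below is one way that could look; result_uri() and fetch_with_cli() are hypothetical helper names (not part of any package), and it uses a plain aws s3 cp of the single file rather than a sync, assuming the AWS CLI is installed and configured.

```r
# Hypothetical helper: build the S3 URI of the CSV for one query.
# `async_result` is a list with `results_bucket` and `qex_id` elements,
# i.e. the shape returned by collect_async().
result_uri <- function(async_result) {
  bucket <- sub("/+$", "", async_result$results_bucket) # drop trailing slashes
  sprintf("%s/%s.csv", bucket, async_result$qex_id)
}

# Hypothetical helper: copy that one file locally with the AWS CLI.
# Assumes `aws` is on the PATH and has working credentials.
fetch_with_cli <- function(async_result, dest = tempfile(fileext = ".csv")) {
  status <- system2("aws", c("s3", "cp", result_uri(async_result), dest))
  if (status != 0) stop("aws s3 cp failed; the results may not be ready yet.", call. = FALSE)
  dest
}

# Building the URI needs no AWS access at all
example_result <- list(
  qex_id = "d5fe7754-919b-47c5-bd7d-3ccdb1a3a414",
  results_bucket = "s3://aws-athena-query-results-redacted"
)
print(result_uri(example_result))
```

The path returned by fetch_with_cli() can then be read with readr::read_csv() like any other local CSV.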

Once I figure out the best package API for this I’ll add it to the metis package. There are many AWS idiosyncrasies that need to be accounted for and I’d rather ship this current set of functions via the blog so folks can use it (and tweak it to their needs) than wait for perfection.

Here’s the code:

library(rJava)
library(awsjavasdk)
library(aws.signature)
library(aws.s3)
library(odbc)
library(tidyverse)
library(dbplyr)

#' Collect Amazon Athena query results asynchronously
#' 
#' Long running Athena queries and Athena queries with large result
#' sets can seriously stall a `dplyr` processing chain due to poorly
#' implemented ODBC and JDBC drivers. The AWS SDK for Athena has 
#' methods that support submitting a query asynchronously for "batch"
#' processing. All Athena results are stored in CSV files in S3 and it's
#' easy to use the R `aws.s3` package to grab these or perform an
#' `aws s3 sync ...` operation on the command line.
#' 
#' @md
#' @param obj the `dplyr` chain
#' @param schema Athena schema (usually matches the `Schema` parameter to the 
#'        Simba ODBC connection)
#' @param region Your AWS region. All lower case with dashes (usually matches
#'        the `AwsRegion` parameter to the Simba ODBC connection)
#' @param results_bucket the S3 results bucket where query results are stored 
#'        (usually matches the `S3OutputLocation` parameter to the Simba ODBC
#'        connection)
#' @return a `list` with the query execution ID and the S3 bucket. This object
#'         is designed to be passed to the companion `gather_results()` if you
#'         want to use the `aws.s3` package to retrieve the results. Otherwise,
#'         sync the file however you want using the query execution id.
#' @note You may need to change up the authentication provider depending on how 
#'       you use credentials with Athena
collect_async <- function(obj, schema, region, results_bucket) {

  ugly_query <- as.character(sql_render(obj))

  region <- toupper(region)
  region <- gsub("-", "_", region, fixed=TRUE)

  regions <- J("com.amazonaws.regions.Regions")

  available_regions <- grep("^[[:upper:][:digit:]_]+$", names(regions), value=TRUE)
  if (!region %in% available_regions) stop("Invalid region.", call.=FALSE)

  switch(
    region,
    "GovCloud" = regions$GovCloud,
    "US_EAST_1" = regions$US_EAST_1,
    "US_EAST_2" = regions$US_EAST_2,
    "US_WEST_1" = regions$US_WEST_1,
    "US_WEST_2" = regions$US_WEST_2,
    "EU_WEST_1" = regions$EU_WEST_1,
    "EU_WEST_2" = regions$EU_WEST_2,
    "EU_WEST_3" = regions$EU_WEST_3,
    "EU_CENTRAL_1" = regions$EU_CENTRAL_1,
    "AP_SOUTH_1" = regions$AP_SOUTH_1,
    "AP_SOUTHEAST_1" = regions$AP_SOUTHEAST_1,
    "AP_SOUTHEAST_2" = regions$AP_SOUTHEAST_2,
    "AP_NORTHEAST_1" = regions$AP_NORTHEAST_1,
    "AP_NORTHEAST_2" = regions$AP_NORTHEAST_2,
    "SA_EAST_1" = regions$SA_EAST_1,
    "CN_NORTH_1" = regions$CN_NORTH_1,
    "CN_NORTHWEST_1" = regions$CN_NORTHWEST_1,
    "CA_CENTRAL_1" = regions$CA_CENTRAL_1,
    "DEFAULT_REGION" = regions$DEFAULT_REGION
  ) -> region

  provider <- J("com.amazonaws.auth.DefaultAWSCredentialsProviderChain")
  client <- J("com.amazonaws.services.athena.AmazonAthenaAsyncClientBuilder")

  my_client <- client$standard()
  my_client <- my_client$withRegion(region)
  my_client <- my_client$withCredentials(provider$getInstance())
  my_client <- my_client$build()

  queryExecutionContext <- .jnew("com.amazonaws.services.athena.model.QueryExecutionContext")
  context <- queryExecutionContext$withDatabase(schema)
  result <- .jnew("com.amazonaws.services.athena.model.ResultConfiguration")
  result$setOutputLocation(results_bucket)

  startQueryExecutionRequest <- .jnew("com.amazonaws.services.athena.model.StartQueryExecutionRequest")
  startQueryExecutionRequest$setQueryString(ugly_query)
  startQueryExecutionRequest$setQueryExecutionContext(context)
  startQueryExecutionRequest$setResultConfiguration(result)

  res <- my_client$startQueryExecutionAsync(startQueryExecutionRequest)

  r <- res$get()
  qex_id <- r$getQueryExecutionId()

  list(
    qex_id = qex_id,
    results_bucket = results_bucket
  )

}

#' Gather the results of an asynchronous query
#'
#' @md
#' @param async_result the result of a call to `collect_async()`
#' @return a data frame (tibble) or `NULL` if the query results are not ready yet
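gather_results <- function(async_result) {
  # Results land at <results_bucket>/<query execution id>.csv, so check for
  # that object rather than just the bucket (which exists long before the
  # query finishes). Needs an aws.s3 version that provides object_exists().
  csv_uri <- sprintf("%s/%s.csv", async_result$results_bucket, async_result$qex_id)
  if (object_exists(csv_uri)) {
    readr::read_csv(get_object(csv_uri))
  } else {
    message("Results are not in the designated bucket.")
    return(NULL)
  }
}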
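
The region handling at the top of collect_async() is easy to check on its own, without loading the Java SDK. This small sketch repeats the two transformation steps from the function; normalize_region() is a hypothetical name used only for illustration.

```r
# Hypothetical helper mirroring the first two steps of collect_async():
# upper-case the region, then swap dashes for underscores so the string
# matches the style of the Java SDK constants (e.g. US_EAST_1)
normalize_region <- function(region) {
  region <- toupper(region)
  gsub("-", "_", region, fixed = TRUE)
}

normalized <- normalize_region("us-east-1")
print(normalized)
```

Anything that does not normalize to one of the names exposed by the SDK's Regions class makes collect_async() stop with "Invalid region.".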

Now, we give it a go:

# Set up the credentials you're using
use_credentials("personal")

# load the AWS Java SDK classes
awsjavasdk::load_sdk()

# necessary for Simba ODBC and the async query ops
aws_region <- "us-east-1"
athena_schema <- "sampledb"
athena_results_bucket <- "s3://aws-athena-query-results-redacted"

# connect to Athena and the sample database
DBI::dbConnect(
  odbc::odbc(),
  driver = "/Library/simba/athenaodbc/lib/libathenaodbc_sbu.dylib",
  Schema = athena_schema,
  AwsRegion = aws_region,
  AuthenticationType = "IAM Profile",
  AwsProfile = "personal",
  S3OutputLocation = athena_results_bucket
) -> con

# the sample table in the sample db/schema
elb_logs <- tbl(con, "elb_logs")

# create your dplyr chain. This one is small so I don't incur charges
# collect_async() MUST be the LAST item in the dplyr chain.
elb_logs %>%
  filter(requestip == "253.89.30.138") %>%
  collect_async(
    schema = athena_schema,
    region = aws_region,
    results_bucket = athena_results_bucket
  ) -> async_result

async_result
## $qex_id
## [1] "d5fe7754-919b-47c5-bd7d-3ccdb1a3a414"
## 
## $results_bucket
## [1] "s3://aws-athena-query-results-redacted"

# For long queries we can wait a bit but the function will tell us if the results
# are there or not.

gather_results(async_result)
## Parsed with column specification:
## cols(
##   timestamp = col_datetime(format = ""),
##   elbname = col_character(),
##   requestip = col_character(),
##   requestport = col_integer(),
##   backendip = col_character(),
##   backendport = col_integer(),
##   requestprocessingtime = col_double(),
##   backendprocessingtime = col_double(),
##   clientresponsetime = col_double(),
##   elbresponsecode = col_integer(),
##   backendresponsecode = col_integer(),
##   receivedbytes = col_integer(),
##   sentbytes = col_integer(),
##   requestverb = col_character(),
##   url = col_character(),
##   protocol = col_character()
## )
## # A tibble: 1 x 16
##   timestamp           elbname requestip     requestport backendip     backendport
##   <dttm>              <chr>   <chr>               <int> <chr>               <int>
## 1 2014-09-29 03:24:38 lb-demo 253.89.30.138       20159 253.89.30.138        8888
## # ... with 10 more variables: requestprocessingtime <dbl>, backendprocessingtime <dbl>,
## #   clientresponsetime <dbl>, elbresponsecode <int>, backendresponsecode <int>,
## #   receivedbytes <int>, sentbytes <int>, requestverb <chr>, url <chr>, protocol <chr>
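
For long-running queries you may not want to call gather_results() by hand over and over. Here is a minimal polling sketch, assuming gather_results() behaves as documented above (a data frame when the results are ready, NULL when they are not). poll_results() and its arguments are hypothetical names, and the interval and retry counts are arbitrary defaults you should tune.

```r
# Hypothetical helper: call a fetch function repeatedly until it returns
# something other than NULL, or give up after `max_tries` attempts.
# `fetch` defaults to gather_results() from this post; it is only looked
# up when poll_results() is actually called.
poll_results <- function(async_result, fetch = gather_results,
                         interval = 5, max_tries = 12) {
  for (i in seq_len(max_tries)) {
    res <- suppressMessages(fetch(async_result)) # silence the "not ready" message
    if (!is.null(res)) return(res)
    if (i < max_tries) Sys.sleep(interval)       # wait before the next attempt
  }
  NULL # still not ready; the caller can decide whether to try again later
}

# Usage (needs the real functions and AWS access, so it is left commented out):
# results <- poll_results(async_result, interval = 10, max_tries = 30)
```

Returning NULL instead of raising an error keeps the behavior in line with gather_results(), so you can check back later without wrapping the call in tryCatch().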

If you do try this out and end up needing to tweak it, feedback on what you had to do (via the comments) would be greatly appreciated.
