Want to share your content on R-bloggers? click here if you have a blog, or here if you don't.
This note describes a useful replyr tool we call a "join controller" (it is part of our "R and Big Data" series; please see here for the introduction, and here for one of our big data courses).
When working on real world predictive modeling tasks in production, the ability to join data and document how you join data is paramount. There are very strong reasons to organize production data in something resembling one of the Codd normal forms. However, for machine learning we need a fully denormalized form (all columns populated into a single ready-to-go row, no matter what their provenance, keying, or stride).
This is not an essential difficulty as in relational data systems moving between these forms can be done by joining, and data stores such as PostgreSQL or Apache Spark are designed to provide powerful join capabilities.
However there are some inessential (in that they can be avoided) but substantial difficulties in managing and documenting long join plans. It is not uncommon to have to join 7 or more tables to get an analysis ready. This at first seems trivial, but once you add in the important tasks of narrowing tables (eliminating columns not used later) and disambiguating column names (ensuring unique names after the join) the documentation and work can become substantial. Specifying the join process directly in R code leads to hard to manage, hard to inspect, and hard to share spaghetti code (even when using a high-level data abstraction such as dplyr).
If you have done non-trivial work with production data you have seen this pain point.
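To make the pain point concrete, here is a minimal sketch of a hand-coded version of even a small three-table join. It uses base R merge() on local data frames (an assumption made to keep the sketch dependency-free; the table and column names are borrowed from the example later in this note, and names_tbl is just an illustrative variable name). Notice every rename, column restriction, and key choice ends up buried in code:

```r
# Notional local tables (in production these would be remote handles).
meas1 <- data.frame(id = c(1, 1, 2, 2), date = c(1, 2, 1, 2),
                    weight = c(200, 180, 98, 120),
                    height = c(60, 54, 12, 14))
names_tbl <- data.frame(id = seq_along(letters), name = letters,
                        stringsAsFactors = FALSE)
meas2 <- data.frame(pid = c(2, 3), date = c(2, 2),
                    weight = c(105, 110), width = 1)

# Step 1: rename keys and disambiguate 'weight' by hand.
res <- data.frame(PatientID = meas1$id,
                  MeasurementDate = meas1$date,
                  meas1_weight = meas1$weight,
                  height = meas1$height)

# Step 2: left join the names table (all.x = TRUE keeps every row of res).
n2 <- data.frame(PatientID = names_tbl$id, name = names_tbl$name,
                 stringsAsFactors = FALSE)
res <- merge(res, n2, by = "PatientID", all.x = TRUE)

# Step 3: narrow and rename meas2, then left join on both keys.
m2 <- data.frame(PatientID = meas2$pid,
                 MeasurementDate = meas2$date,
                 meas2_weight = meas2$weight)
res <- merge(res, m2, by = c("PatientID", "MeasurementDate"), all.x = TRUE)
print(res)
```

With three tables this is tolerable; with seven or more wide tables the same pattern becomes very hard to review or share.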
The fix is to apply the following principles:
- Anything long, repetitive, and tedious should not be done directly.
- Moving specification out of code and into data is of huge benefit.
- A common special case can be treated separately, as that documents intent.
To supply such a solution the development version of replyr now supplies an item called a "join controller" under the method replyr::executeLeftJoinPlan().
This is easiest to explain through a concrete example, which is what we will do here.
First let’s load the needed packages.
# load packages
suppressPackageStartupMessages(library("dplyr"))
packageVersion("dplyr")
## [1] '0.7.0'
library("replyr")
packageVersion("replyr")
## [1] '0.4.1'
Now let’s load some notional example data. For our example we have:
- One primary table of measurements (called "meas1") keyed by id and date.
- A fact table that maps ids to patient names (called "names", and keyed by id).
- A second table of additional measurements (called "meas2") that we consider "nice to have." That is: rows missing from this table should not censor-out meas1 rows, and additional rows found here should not be included in the analysis.
The data is given below:
# load notional example data
my_db <- dplyr::src_sqlite(":memory:", create = TRUE)
# example data
replyr_copy_to(my_db,
               data.frame(id= c(1,1,2,2),
                          date= c(1,2,1,2),
                          weight= c(200, 180, 98, 120),
                          height= c(60, 54, 12, 14)),
               'meas1_train')
replyr_copy_to(my_db,
               data.frame(id= seq_len(length(letters)),
                          name= letters,
                          stringsAsFactors=FALSE),
               'names_facts')
replyr_copy_to(my_db,
               data.frame(pid= c(2,3),
                          date= c(2,2),
                          weight= c(105, 110),
                          width= 1),
               'meas2_train')
An important (and very neglected) step in data science tasks is documenting roles of tables, especially their key-structure (which we also call "stride" in the sense it describes how you move from row to row). replyr::tableDescription() is a function that builds an initial description of the tables. (Note: replyr::tableDescription() is misspelled in the current release version of replyr; we have fixed this in dev.)
# map from abstract names to realized names
tables <- list('meas1' = 'meas1_train',
               'names' = 'names_facts',
               'meas2' = 'meas2_train')
# get the initial description of table defs
got <- lapply(names(tables),
              function(ni) {
                # get table reference from source by concrete name
                ti <- tbl(my_db, tables[[ni]])
                # map abstract name to reference
                tableDescription(ni, ti)
              })
tDesc <- bind_rows(got)
tDesc is essentially a slightly enriched version of the data handle concordance described in "Managing Spark data handles in R." We can take a quick look at the stored simplified summaries:
print(tDesc %>% select(tableName, sourceClass, isEmpty))
## # A tibble: 3 x 3
##   tableName           sourceClass isEmpty
##       <chr>                 <chr>   <lgl>
## 1     meas1 src_dbi, src_sql, src   FALSE
## 2     names src_dbi, src_sql, src   FALSE
## 3     meas2 src_dbi, src_sql, src   FALSE
print(tDesc$columns)
## [[1]]
## [1] "id"     "date"   "weight" "height"
##
## [[2]]
## [1] "id"   "name"
##
## [[3]]
## [1] "pid"    "date"   "weight" "width"
print(tDesc$colClass)
## [[1]]
##        id      date    weight    height
## "numeric" "numeric" "numeric" "numeric"
##
## [[2]]
##          id        name
##   "integer" "character"
##
## [[3]]
##       pid      date    weight     width
## "numeric" "numeric" "numeric" "numeric"
# add names for printing
names(tDesc$keys) <- tDesc$tableName
print(tDesc$keys)
## $meas1
##       id     date   weight   height
##     "id"   "date" "weight" "height"
##
## $names
##     id   name
##   "id" "name"
##
## $meas2
##      pid     date   weight    width
##    "pid"   "date" "weight"  "width"
tableDescription() returns a table that holds the following columns:
- tableName: the abstract name we wish to use for this table.
- handle: the actual data handle (either a data.frame or a handle to a remote data source such as PostgreSQL or Spark). Notice in the example it is of class "tbl_sqlite" or "tbl_dbi" (depending on the version of dplyr).
- columns: the list of columns in the table.
- keys: a named list mapping abstract key names to table column names. The set of keys together is supposed to uniquely identify rows.
- colClass: a vector of column classes of the underlying table.
- sourceClass: the declared class of the data source.
- isEmpty: an advisory column indicating if any rows were present when we looked.
The tableName is "abstract" in that it is only used to discuss tables (i.e., it is only ever used as a row identifier in this table). The data is actually found through the handle. This is critical in processes where we may need to run the same set of joins twice on different sets of tables (such as building a machine learning model, and then later applying the model to new data).
The intent is to build a detailed join plan (describing order, column selection, and column re-naming) from the tDesc table. We can try this with the supplied function buildJoinPlan(), which in this case tells us our table descriptions are not quite ready to specify a join plan:
tryCatch(
  buildJoinPlan(tDesc),
  error = function(e) {e}
)
## <simpleError in buildJoinPlan(tDesc): replyr::buildJoinPlan produced plan issue: key col(s) ( name ) not contained in result cols of previous table(s) for table: names>
In the above the keys column is wrong in that it claims every column of each table is a table key. The join plan builder noticed this is unsupportable in that when it comes time to join the "names" table not all of the columns that are claimed to be "names" keys are already known from previous tables. That is: the "names$name" column is not present in the earlier tables, and so can not be joined on. We can’t check everything, but the join controller tries to "front load" the work, encountering as many configuration inconsistencies as possible early, before any expensive steps have been started.
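The kind of early check described here can be sketched in a few lines of base R. This is only an illustration under stated assumptions, not replyr's implementation: the helper names resultCols() and checkKeysAvailable() are hypothetical, and the inputs are plain named lists of column names and key mappings rather than a real tDesc.

```r
# Hypothetical helper: key columns take their abstract (result) names,
# all other columns keep their own names.
resultCols <- function(cols, keys) {
  idx <- match(cols, keys)
  ifelse(is.na(idx), cols, names(keys)[idx])
}

# Hypothetical helper (not part of replyr): walk the tables in join order and
# confirm each table's claimed keys are already known from earlier tables.
# Returns NULL if no issue is found, else a message describing the first issue.
checkKeysAvailable <- function(columns, keys) {
  known <- resultCols(columns[[1]], keys[[1]])  # first table starts the join
  for (i in seq_along(columns)[-1]) {
    missingKeys <- setdiff(names(keys[[i]]), known)
    if (length(missingKeys) > 0) {
      return(paste0("key col(s) (", paste(missingKeys, collapse = ", "),
                    ") not available for table: ", names(columns)[[i]]))
    }
    known <- union(known, resultCols(columns[[i]], keys[[i]]))
  }
  NULL
}

columns <- list(meas1 = c("id", "date", "weight", "height"),
                names = c("id", "name"),
                meas2 = c("pid", "date", "weight", "width"))

# Initial (wrong) description: every column is claimed as a key.
allKeys <- lapply(columns, function(ci) setNames(ci, ci))
bad <- checkKeysAvailable(columns, allKeys)
print(bad)

# Keys declared with consistent abstract names pass the check.
declaredKeys <- list(meas1 = c(PatientID = "id", MeasurementDate = "date"),
                     names = c(PatientID = "id"),
                     meas2 = c(PatientID = "pid", MeasurementDate = "date"))
good <- checkKeysAvailable(columns, declaredKeys)
print(is.null(good))
```

The check only looks at names, so it is cheap and can run before any data is touched.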
The intent is: the user should edit the "tDesc" keys column and share it with partners for criticism. In our case we declare the primary key of the measurement tables to be PatientID and MeasurementDate, and the primary key of the names table to be PatientID. Notice we do this by specifying named lists or vectors mapping desired key names to names actually used in the tables.
# declare keys (and give them consistent names)
tDesc$keys[[1]] <- c(PatientID= 'id', MeasurementDate= 'date')
tDesc$keys[[2]] <- c(PatientID= 'id')
tDesc$keys[[3]] <- c(PatientID= 'pid', MeasurementDate= 'date')
print(tDesc$keys)
## $meas1
##       PatientID MeasurementDate
##            "id"          "date"
##
## $names
## PatientID
##      "id"
##
## $meas2
##       PatientID MeasurementDate
##           "pid"          "date"
The above key mapping could then be circulated to partners for comments and help. Notice since this is not R code we can easily share it with non-R users for comment and corrections.
It is worth confirming the keying is as expected (else some rows can be duplicated in bad ways during joining). This is a potentially expensive operation, but it can be done as follows:
keysAreUnique(tDesc)
## meas1 names meas2
##  TRUE  TRUE  TRUE
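For intuition, the uniqueness condition itself is simple to state on a local data frame. The following is a minimal sketch in base R with a hypothetical helper name (keysUniqueLocal()); it is not how keysAreUnique() is implemented, and on a remote source one would more likely count rows grouped by the key columns.

```r
# Hypothetical helper: TRUE if the key columns uniquely identify rows of d.
keysUniqueLocal <- function(d, keyCols) {
  !any(duplicated(d[, keyCols, drop = FALSE]))
}

meas1 <- data.frame(id = c(1, 1, 2, 2), date = c(1, 2, 1, 2),
                    weight = c(200, 180, 98, 120),
                    height = c(60, 54, 12, 14))

# (id, date) together identify rows ...
okBoth <- keysUniqueLocal(meas1, c("id", "date"))
# ... but id alone does not, so joining on id alone would duplicate rows.
okIdOnly <- keysUniqueLocal(meas1, "id")
print(c(both = okBoth, idOnly = okIdOnly))
```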
Once we are satisfied with our description of tables we can build a join plan. The join plan is an ordered sequence of left-joins.
In practice, when preparing data for predictive analytics or machine learning there is often a primary table that has exactly the set of rows you want to work over (especially when encountering production star-schemas). By starting joins from this table we can perform most of our transformations using only left-joins. To keep things simple we have only supplied a join controller for this case. This is obviously not the only join pattern needed; but it is the common one.
A join plan can now be built from our table descriptions:
# build the column join plan
columnJoinPlan <- buildJoinPlan(tDesc)
print(columnJoinPlan %>%
        select(tableName, sourceColumn, resultColumn, isKey, want))
## # A tibble: 10 x 5
##    tableName sourceColumn    resultColumn isKey  want
##        <chr>        <chr>           <chr> <lgl> <lgl>
##  1     meas1           id       PatientID  TRUE  TRUE
##  2     meas1         date MeasurementDate  TRUE  TRUE
##  3     meas1       weight    meas1_weight FALSE  TRUE
##  4     meas1       height          height FALSE  TRUE
##  5     names           id       PatientID  TRUE  TRUE
##  6     names         name            name FALSE  TRUE
##  7     meas2          pid       PatientID  TRUE  TRUE
##  8     meas2         date MeasurementDate  TRUE  TRUE
##  9     meas2       weight    meas2_weight FALSE  TRUE
## 10     meas2        width           width FALSE  TRUE
Essentially the join plan is an unnest of the columns from the table descriptions. This was anticipated in our article "Managing Spark Data Handles".
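To illustrate the "unnest" idea, here is a minimal base R sketch that turns per-table column lists and key mappings into one row per column. The function name sketchJoinPlan() is hypothetical, and the disambiguation rule (prefix a non-key column with its table name when the same name appears in more than one table) is an assumption chosen because it reproduces the plan printed above; replyr's actual rule may differ.

```r
# Hypothetical sketch of a plan builder: one output row per (table, column).
sketchJoinPlan <- function(columns, keys) {
  # find non-key column names that occur in more than one table
  nonKey <- lapply(names(columns),
                   function(tn) setdiff(columns[[tn]], keys[[tn]]))
  counts <- table(unlist(nonKey))
  dupNames <- names(counts)[counts > 1]
  rows <- lapply(names(columns), function(tn) {
    cols <- columns[[tn]]
    ki <- match(cols, keys[[tn]])
    isKey <- !is.na(ki)
    clash <- !isKey & (cols %in% dupNames)
    resultColumn <- ifelse(isKey, names(keys[[tn]])[ki],
                           ifelse(clash, paste(tn, cols, sep = "_"), cols))
    data.frame(tableName = tn, sourceColumn = cols,
               resultColumn = resultColumn, isKey = isKey, want = TRUE,
               stringsAsFactors = FALSE)
  })
  do.call(rbind, rows)
}

columns <- list(meas1 = c("id", "date", "weight", "height"),
                names = c("id", "name"),
                meas2 = c("pid", "date", "weight", "width"))
keys <- list(meas1 = c(PatientID = "id", MeasurementDate = "date"),
             names = c(PatientID = "id"),
             meas2 = c(PatientID = "pid", MeasurementDate = "date"))
plan <- sketchJoinPlan(columns, keys)
print(plan)
```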
We then alter the join plan to meet our needs (either through R commands or by exporting the plan to a spreadsheet and editing it there).
Only columns named in the join plan with a value of TRUE in the want column are kept in the join (columns marked isKey must also have want set to TRUE). This is very useful as systems of record often have very wide tables (with hundreds of columns) of which we only want a few columns for analysis.
For example we could decide to exclude the width column by either dropping the row or setting the row’s want column to FALSE.
Since we have edited the join plan it is a good idea to both look at it and also run it through inspectDescrAndJoinPlan() to look for potential inconsistencies.
# decide we don't want the width column
columnJoinPlan$want[columnJoinPlan$resultColumn=='width'] <- FALSE
# double check our plan
if(!is.null(inspectDescrAndJoinPlan(tDesc, columnJoinPlan))) {
  stop("bad join plan")
}
print(columnJoinPlan %>%
        select(tableName, sourceColumn, resultColumn, isKey, want))
## # A tibble: 10 x 5
##    tableName sourceColumn    resultColumn isKey  want
##        <chr>        <chr>           <chr> <lgl> <lgl>
##  1     meas1           id       PatientID  TRUE  TRUE
##  2     meas1         date MeasurementDate  TRUE  TRUE
##  3     meas1       weight    meas1_weight FALSE  TRUE
##  4     meas1       height          height FALSE  TRUE
##  5     names           id       PatientID  TRUE  TRUE
##  6     names         name            name FALSE  TRUE
##  7     meas2          pid       PatientID  TRUE  TRUE
##  8     meas2         date MeasurementDate  TRUE  TRUE
##  9     meas2       weight    meas2_weight FALSE  TRUE
## 10     meas2        width           width FALSE FALSE
The join plan is the neglected (and often missing) piece of documentation key to non-trivial data science projects. We strongly suggest putting it under source control, and circulating it to project partners for comment.
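Because the plan is plain data, one simple way to put it under source control (or hand it to a spreadsheet user) is as a CSV file. A minimal sketch using base R, with a hypothetical file name and a shortened three-row plan standing in for the full one:

```r
# A shortened stand-in for columnJoinPlan (same column layout as above).
plan <- data.frame(
  tableName    = c("meas2", "meas2", "meas2"),
  sourceColumn = c("pid", "weight", "width"),
  resultColumn = c("PatientID", "meas2_weight", "width"),
  isKey        = c(TRUE, FALSE, FALSE),
  want         = c(TRUE, TRUE, TRUE),
  stringsAsFactors = FALSE)

# Save the plan; the file name here is only an example.
planFile <- file.path(tempdir(), "columnJoinPlan.csv")
write.csv(plan, planFile, row.names = FALSE)

# Later (or after a partner has edited the file): re-load, do not re-build.
reloaded <- read.csv(planFile, stringsAsFactors = FALSE)
roundTripOk <- isTRUE(all.equal(plan, reloaded))
print(roundTripOk)

# Edits are ordinary data edits, for example dropping the width column.
reloaded$want[reloaded$resultColumn == "width"] <- FALSE
print(reloaded)
```

A text format such as CSV also means changes to the plan show up as readable diffs in review.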
As a diagram the key structure of the join plan looks like the following (produced by DiagrammeR::mermaid(makeJoinDiagramSpec(columnJoinPlan))):
Note the diagramming ability is currently only in the dev version of replyr. These diagrams are kind of fun. For instance, here is a more complicated one from the help(makeJoinDiagramSpec) examples:
Once you have a good join plan, executing it is a one-line command with executeLeftJoinPlan() (once you have set up a temp name manager as described in "Managing intermediate results when using R/sparklyr"):
# manage the temp names as in:
#  http://www.win-vector.com/blog/2017/06/managing-intermediate-results-when-using-rsparklyr/
tempNameGenerator <- makeTempNameGenerator("extmps")
# execute the left joins
results <- executeLeftJoinPlan(tDesc, columnJoinPlan,
                               verbose= TRUE,
                               tempNameGenerator= tempNameGenerator)
## [1] "start meas1 Tue Jun 13 15:30:25 2017"
## [1] " rename/restrict meas1"
## [1] " 'table_meas1_present' = 'table_meas1_present'"
## [1] " 'PatientID' = 'id'"
## [1] " 'MeasurementDate' = 'date'"
## [1] " 'meas1_weight' = 'weight'"
## [1] " 'height' = 'height'"
## [1] " res <- meas1"
## [1] "done meas1 Tue Jun 13 15:30:25 2017"
## [1] "start names Tue Jun 13 15:30:25 2017"
## [1] " rename/restrict names"
## [1] " 'table_names_present' = 'table_names_present'"
## [1] " 'PatientID' = 'id'"
## [1] " 'name' = 'name'"
## [1] " res <- left_join(res, names, by = c( 'PatientID' ))"
## [1] "done names Tue Jun 13 15:30:25 2017"
## [1] "start meas2 Tue Jun 13 15:30:25 2017"
## [1] " rename/restrict meas2"
## [1] " 'table_meas2_present' = 'table_meas2_present'"
## [1] " 'PatientID' = 'pid'"
## [1] " 'MeasurementDate' = 'date'"
## [1] " 'meas2_weight' = 'weight'"
## [1] " res <- left_join(res, meas2, by = c( 'PatientID', 'MeasurementDate' ))"
## [1] "done meas2 Tue Jun 13 15:30:26 2017"
executeLeftJoinPlan() takes both a table description table (tDesc, keyed by tableName) and the join plan (columnJoinPlan, keyed by tableName and sourceColumn).
The separation of concerns is strong: all details about the intended left-join sequence are taken from the columnJoinPlan, and only the mapping from abstract table names to tables (or table references/handles) is taken from tDesc. This is a deliberate design and makes running the same join plan on two different sets of tables (say once for model construction, and later for model application) very easy. tDesc is a runtime entity: it binds names to live handles, so it can’t be serialized, and you must save the code steps that produce it. Only the columns tableName and handle are used, so there is no point re-editing the keys column after running tableDescription() on new tables. columnJoinPlan is a durable entity (it holds only information, not handles).
Basically you:
- Build simple procedures to build up tDesc.
- Work hard to get a good columnJoinPlan.
- Save columnJoinPlan in source control and re-load it (not re-build it) when you need it.
- Re-build new tDesc compatible with the saved columnJoinPlan later when you need to work with tables (note only the columns tableName and handle are used during join execution, so you only need to create those).
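The workflow above can be mimicked on local data frames to show how little the executor needs to know. The following is a minimal sketch only, not replyr's implementation: executePlanLocal() is a hypothetical name, a plain named list of data frames stands in for the tableName/handle columns of tDesc, and base R merge() stands in for left_join() (note merge() re-orders rows by the join keys).

```r
# Hypothetical local executor: left-join tables in the order their names
# first appear in the plan, using only plan rows with want == TRUE.
executePlanLocal <- function(tables, plan) {
  res <- NULL
  for (tn in unique(plan$tableName)) {
    p <- plan[plan$tableName == tn & plan$want, , drop = FALSE]
    # rename/restrict: keep only wanted columns, under their result names
    d <- tables[[tn]][, p$sourceColumn, drop = FALSE]
    names(d) <- p$resultColumn
    # indicator column recording that a row came from this table
    presentCol <- paste0("table_", tn, "_present")
    d[[presentCol]] <- 1
    if (is.null(res)) {
      res <- d
    } else {
      res <- merge(res, d, by = p$resultColumn[p$isKey], all.x = TRUE)
      res[[presentCol]][is.na(res[[presentCol]])] <- 0
    }
  }
  res
}

# The durable plan (would normally be re-loaded from source control).
plan <- data.frame(
  tableName = c(rep("meas1", 4), rep("names", 2), rep("meas2", 4)),
  sourceColumn = c("id", "date", "weight", "height",
                   "id", "name", "pid", "date", "weight", "width"),
  resultColumn = c("PatientID", "MeasurementDate", "meas1_weight", "height",
                   "PatientID", "name",
                   "PatientID", "MeasurementDate", "meas2_weight", "width"),
  isKey = c(TRUE, TRUE, FALSE, FALSE, TRUE, FALSE, TRUE, TRUE, FALSE, FALSE),
  want = c(rep(TRUE, 9), FALSE),
  stringsAsFactors = FALSE)

# The runtime binding of abstract names to data; swap in a different list
# (same names, new data) to re-run the same plan at model application time.
tables <- list(
  meas1 = data.frame(id = c(1, 1, 2, 2), date = c(1, 2, 1, 2),
                     weight = c(200, 180, 98, 120),
                     height = c(60, 54, 12, 14)),
  names = data.frame(id = seq_along(letters), name = letters,
                     stringsAsFactors = FALSE),
  meas2 = data.frame(pid = c(2, 3), date = c(2, 2),
                     weight = c(105, 110), width = 1))

res <- executePlanLocal(tables, plan)
print(res)
```

All of the decisions (order, renames, which columns to keep) live in plan; the function body never mentions a specific table or column.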
As always: the proof is in the pudding. We should look at results:
print(results %>%
        select(PatientID, MeasurementDate, meas1_weight, height,
               name, table_meas2_present, meas2_weight),
      width= Inf)
## # Source:   lazy query [?? x 7]
## # Database: sqlite 3.11.1 [:memory:]
##   PatientID MeasurementDate meas1_weight height  name table_meas2_present meas2_weight
##       <dbl>           <dbl>        <dbl>  <dbl> <chr>               <dbl>        <dbl>
## 1         1               1          200     60     a                   0           NA
## 2         1               2          180     54     a                   0           NA
## 3         2               1           98     12     b                   0           NA
## 4         2               2          120     14     b                   1          105
Notice the joiner added extra columns of the form table_*_present to show which tables had needed rows. This lets us tell different sorts of missingness apart (value NA as there was no row to join, versus value NA as an NA came from a row) and appropriately coalesce results easily. These columns are also very good for collecting statistics on data coverage, and in business settings they are often very useful data quality and data provenance features which can be directly included in machine learning models.
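As a small illustration of using these indicator columns, here is a base R sketch on a local copy of the result rows shown above. The coalescing rule (prefer meas2_weight when it is available, otherwise fall back to meas1_weight) and the new column names weight and meas2_weight_status are assumptions made for the example, not something the join controller does for you.

```r
# Local copy of the relevant result columns from the printed output above.
res <- data.frame(PatientID = c(1, 1, 2, 2),
                  MeasurementDate = c(1, 2, 1, 2),
                  meas1_weight = c(200, 180, 98, 120),
                  table_meas2_present = c(0, 0, 0, 1),
                  meas2_weight = c(NA, NA, NA, 105))

# Coalesce (assumed rule): prefer the second measurement when available.
res$weight <- ifelse(is.na(res$meas2_weight),
                     res$meas1_weight, res$meas2_weight)

# Tell the two kinds of NA apart using the indicator column.
res$meas2_weight_status <-
  ifelse(res$table_meas2_present == 0, "no row",
         ifelse(is.na(res$meas2_weight), "row present, value NA", "observed"))

# Coverage statistic: fraction of primary rows that found a meas2 row.
coverage <- mean(res$table_meas2_present)
print(res)
print(coverage)
```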
Also notice the join plan is very specific: every decision (such as what order to operate and how to disambiguate column names) is already explicitly set in the plan. The executor is then allowed to simply move through the tables left-joining in the order the table names first appear in the plan.
Having to "join a bunch of tables to get the data into simple rows" is a common step in data science. Therefore you do not want this to be a difficult and undocumented task. By using a join controller you essentially make the documentation the executable specification for the task.
# cleanup
temps <- tempNameGenerator(dumpList= TRUE)
for(ti in temps) {
  replyr_drop_table_name(my_db, ti)
}
rm(list=ls())
gc(verbose= FALSE)