
R in big data pipeline

[This article was first published on Opiate for the masses, and kindly contributed to R-bloggers.]

R is my favorite tool for research. There are still quite a few things that only R can do, or that R does more quickly and easily than anything else.

But unfortunately, many people assume R loses its power at the production stage, where you really need to make sure every step runs as planned against incoming big data.

Personally, what makes R special in the data field is how well it plays with other tools. R can easily hand off to JavaScript for data visualization, to Node.js for interactive web apps, and to data pipeline tools and databases for a production-ready big data system.

In this post I show how to use R reliably alongside other tools in a big data pipeline, without losing its awesomeness.

tl;dr

You’ll learn how to include R in luigi, a lightweight Python workflow-management library. You can keep using R’s awesomeness in a complex big data pipeline while delegating heavy big data tasks to more appropriate tools.

I’m not covering luigi basics in this post. Please refer to the luigi website if necessary.

Simple pipeline

Here is a very simple example: an external Hive job prepares data, an R script does the analysis, and a second Hive task loads the result back into Hive. You want to run this job every day in an easily debuggable fashion, with a fancy workflow UI.

That’s super easy, just run

python awesome.py --HiveTask1-timestamp 2015-08-20

This runs the Python file awesome.py, and --HiveTask1-timestamp 2015-08-20 sets 2015-08-20 as the timestamp parameter of the HiveTask1 class.

Yay, all the above tasks are now connected in the luigi task UI!
Notice that the workflow runs from bottom to top.
You can see an error in the very first HiveTask2 run, but that is by design.

Code

Let’s take a look at awesome.py:

import luigi
from luigi.file import LocalTarget
from luigi.contrib.hdfs import HdfsTarget
import subprocess

# an external Hive job run outside this pipeline; luigi only checks that
# its output partition exists
class HiveTask1(luigi.ExternalTask):
    timestamp = luigi.DateParameter(is_global=True)
    def output(self):
        return HdfsTarget('/user/storage/externaljob/timestamp=%s' % self.timestamp.strftime('%Y%m%d'))

# runs awesome.R against the data produced by HiveTask1
class RTask(luigi.Task):
    timestamp = HiveTask1.timestamp
    def requires(self):
        return HiveTask1()
    def run(self):
        subprocess.call('Rscript awesome.R %s' % self.timestamp.strftime('%Y%m%d'),shell=True)
    def output(self):
        return LocalTarget('awesome_is_here_%s.txt' % self.timestamp.strftime('%Y%m%d'))

# loads the R output back into Hive via update2hive.R
class HiveTask2(luigi.Task):
    timestamp = HiveTask1.timestamp
    def requires(self):
        return RTask()
    def run(self):
        subprocess.call('Rscript update2hive.R %s' % self.timestamp.strftime('%Y%m%d'),shell=True)
    def output(self):
        return HdfsTarget('/user/hive/warehouse/awesome/timestamp=%s' % self.timestamp.strftime('%Y%m%d'))


if __name__ == '__main__':
    luigi.run()

Basically the file contains just three classes (HiveTask1, RTask and HiveTask2), whose dependencies are specified by

def requires(self):
        return TASKNAME

luigi checks the dependencies and outputs of each step, so it checks the existence of the HDFS partition written by HiveTask1, the local awesome_is_here_*.txt file written by RTask, and the Hive partition written by HiveTask2.

The most important thing here is Python’s subprocess module with shell=True, which lets you run your R file:

def run(self):
        subprocess.call('Rscript YOUR_R_FILE',shell=True)
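One caveat worth noting: subprocess.call only returns the exit status, and luigi judges success by whether the file from output() exists. A minimal defensive sketch on the R side (my own addition, not from the original scripts, assuming the output file is written as the very last step):

ok <- tryCatch({
  # ... DO AWESOME THINGS, writing the output file as the very last step ...
  TRUE
}, error = function(e) {
  message("R script failed: ", conditionMessage(e))
  FALSE
})
# exiting non-zero before the output file is written means luigi keeps
# treating the task as incomplete instead of moving on to the next step
if (!ok) quit(save = "no", status = 1)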

The timestamp argument you gave at the very beginning is stored as a global parameter (well, this is not necessarily the coolest option)

timestamp = luigi.DateParameter(is_global=True)

and can be used in other tasks by

timestamp = HiveTask1.timestamp

Moreover, you can pass timestamp to the R file with

'Rscript awesome.R %s' % self.timestamp.strftime('%Y%m%d')

Then let’s take a look at awesome.R:

args <- commandArgs(TRUE)
X <- as.character(args[1])  # timestamp passed in from luigi, e.g. "20150820"
# note: luigi passes the date as %Y%m%d, so parse it with that format
timestamp <- format(as.Date(X, "%Y%m%d"), "%Y%m%d")

# DO AWESOME THINGS

write.csv(YOUREAWESOME, file = paste0('awesome_is_here_', timestamp, '.txt'), row.names = FALSE)

On the R side, you receive the timestamp argument passed from Python with

args <- commandArgs(TRUE)
X <- as.character(args[1])
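If you want the script to fail fast on a malformed call, a small guard like this helps (my own addition, not part of the original scripts):

args <- commandArgs(TRUE)
# fail fast when the timestamp argument is missing or not a valid %Y%m%d date
if (length(args) < 1) stop("usage: Rscript awesome.R <timestamp, e.g. 20150820>")
X <- as.character(args[1])
stopifnot(!is.na(as.Date(X, "%Y%m%d")))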

Similarly, update2hive.R can look like this:

library(infuser)
args <- commandArgs(TRUE)
timestamp <- as.character(args[1])  # e.g. "20150820", already formatted by luigi
# locate the query template and fill in its {{timestamp}} placeholder
temp <- list.files(pattern = 'TEMPLATE.hql')
Q <- infuse(temp, timestamp = timestamp, verbose = TRUE)
fileConn <- file("FINALHQL.hql")
writeLines(Q, fileConn)
close(fileConn)
# this updates the Hive table, creating the partition HiveTask2 checks for
system('hive -f FINALHQL.hql')
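For reference, infuser replaces {{variable}} placeholders in a template file or string. The actual TEMPLATE.hql isn’t shown in the post, but a hypothetical one could look like the inline string below, so you can see what infuse() produces:

library(infuser)
# hypothetical template; the real TEMPLATE.hql is not shown in the post
template <- "
ALTER TABLE awesome ADD IF NOT EXISTS PARTITION (timestamp='{{timestamp}}');
LOAD DATA LOCAL INPATH 'awesome_is_here_{{timestamp}}.txt'
OVERWRITE INTO TABLE awesome PARTITION (timestamp='{{timestamp}}');
"
cat(infuse(template, timestamp = "20150820"))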

One last thing you might want to do is set up a cronjob:

0 1 * * * python awesome.py --HiveTask1-timestamp `date --date='+1 days' +\%Y-\%m-\%d`

This one, for example, runs the whole pipeline at 1 a.m. every day, passing the next day’s date as the timestamp.

Conclusion

In this post I’ve shown a simple example of how to quickly turn a research R project into a solid, deployable product. This is not limited to simple R-Hive integration: you can let R, Spark, databases, Stan/BUGS, H2O, Vowpal Wabbit and millions of other data tools dance together as you wish, and you’ll find that R still plays a central role.

Code

The full code is available here.

R in big data pipeline was originally published by Kirill Pomogajko at Opiate for the masses on August 16, 2015.

