Intro
The pipeline concept is definitely not new to the software world: the Unix pipe operator (|) links two tasks by feeding the output of one as the input of the other. In machine learning solutions it is quite common to apply several transformations and manipulations to datasets, or to different portions or samples of the same dataset (from classic train/test slices to the samples taken for a cross-validation procedure). In these cases, pipelines are definitely useful to define a sequence of tasks chained together into a complete process, which in turn simplifies the operation of the ML solution. In addition, in a big data environment, it is possible to apply "lazy" execution to the entire process in order to make it more scalable and robust; it is therefore no surprise to see pipelines implemented in the Spark machine learning library, with an R API made available by the SparklyR package to leverage the construct. Pipeline components in Spark are basically of two types:
- transformer: since dataframes usually need to undergo several kinds of changes column-wise, row-wise or even value-wise, transformers are the components meant to deliver these transformations. Typically a transformer takes a table or dataframe as input and delivers a table/dataframe as output. Spark, through MLlib, provides a set of feature transformers meant to address the most common transformations needed;
- estimator: an estimator is the component which delivers a model by fitting an algorithm to training data. Indeed fit() is the key method of an estimator and produces, as said, a model, which is itself a transformer. Leveraging the parallel processing provided by Spark, it is possible to run several estimators in parallel on different training datasets in order to find the best solution (or even to mitigate overfitting). ML algorithms are basically a set of estimators: they build the rich set of common machine learning algorithms available from MLlib, a library designed to be scalable and to run in a parallel environment. Starting from the 2.0 release of Spark, the RDD-based library is in maintenance mode (the RDD-based APIs are expected to be removed in the 3.0 release) whereas mainstream development is focused on supporting dataframes. In MLlib, features are expressed as numeric vectors (in the RDD-based API, as LabeledPoint objects combining a numeric label with a feature vector). Pipelines of transformers are, also for this reason, extremely useful to operationalize a Spark-based ML solution. For additional details on MLlib refer to the Apache Spark documentation; a minimal pipeline sketch follows below.
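To make the transformer/estimator/pipeline relationship concrete, here is a minimal, self-contained sketch (not taken from the example developed below; it assumes an existing SparkSession named spark) chaining two transformers and an estimator into a single pipeline:

import org.apache.spark.ml.Pipeline
import org.apache.spark.ml.classification.LogisticRegression
import org.apache.spark.ml.feature.{HashingTF, Tokenizer}

// toy dataframe with a text column and a binary label
val training = spark.createDataFrame(Seq(
  (0L, "spark is great", 1.0),
  (1L, "hadoop mapreduce", 0.0),
  (2L, "spark ml pipelines", 1.0)
)).toDF("id", "text", "label")

// transformers: turn text into tokens, then tokens into a feature vector
val tokenizer = new Tokenizer().setInputCol("text").setOutputCol("words")
val hashingTF = new HashingTF().setInputCol("words").setOutputCol("features")

// estimator: fit() produces a model, which is itself a transformer
val lr = new LogisticRegression().setMaxIter(10)

// the pipeline chains the stages into a single fit/transform unit
val pipeline = new Pipeline().setStages(Array(tokenizer, hashingTF, lr))
val model = pipeline.fit(training)   // returns a PipelineModel
// model.transform(newData) would apply the whole chain to unseen data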
The dataset
For this example, the dataset comes from the UCI Machine Learning Repository (Irvine, CA: University of California, School of Information and Computer Science). The "Adult Income" dataset relates an individual's annual income to a number of demographic and work-related factors. The annual income will be our label (it is divided into two classes: <=50K and >50K) and there are 14 features per individual that we can leverage to explore the possibility of predicting the income level. For additional detail on the "adult" dataset refer to the UCI machine learning repository: http://www.cs.toronto.edu/~delve/data/adult/desc.html.
Scala code
As said, we’ll show how we can use scala API to access pipeline in MLlib, therefore we need to include references to classes we’re planning to use in our example and to start a spark session :import org.apache.spark.sql.types._ import org.apache.spark.sql.functions._ import org.apache.spark.sql._ import org.apache.spark.ml.Pipeline import org.apache.spark.ml.feature.{VectorAssembler, StringIndexer, VectorIndexer} import org.apache.spark.ml.classification.LogisticRegression import org.apache.spark.sql.SparkSession
then we’ll read dataset and will start to manipulate data in order to prepare for the pipeline. In our example, we’ll get data out of local repository (instead of referring to an eg. HDFS or Datalake repository, there are API – for both scala and R – which allows the access to these repositories as well). We’ll leverage this upload activity also to rename some columns, in particular, we’ll rename the “income” column as “label” since we’ll use this a label column in our logistic regression algorithm.
We’ll do some data clean up basically recoding the “working class” and “marital” columns, in order to reduce the number of codes and we’ll get rid of rows for which “occupation”, “working class”” (even recoded) and “capital gain” are not available. For first two column the dataset has the “?” value instead of “NA”, for capital gain there’s the 99999 value which will be filtered out. To recode “working class” and “marital” columns we’ll use UDF functions which in turn are wrappers of the actual recoding functions. To add a new column to the (new) dataframe we’ll use the “withColumn” method which will add “new_marital” and “new_workclass” to the startingdata dataframe. Afterwards, we’ll filter out all missing values and we’ll be ready to build the pipeline.
In our example, we’re going to use 12 features, 7 are categorical variables and 5 are numeric variables. The feature’s array we’ll use to fit the model will be the results of merging two arrays, one for categorical variables and the second one for numeric variables. Before building the categorical variables array, we need to transform categories to indexes using transformers provided by spark.ml, even the label must be transformed to an index. Our pipeline then will include 7 pipeline stages to transform categorical variables, 1 stage to transform the label, 2 stages to build categorical and numeric arrays, and the final stage to fit the logistic model. Finally, we’ll build an 11-stages pipeline.
To transform categorical variables into indexes we'll use the StringIndexer transformer. StringIndexer encodes a column of strings into a column of non-negative indices, ranging from 0 to the number of distinct values minus one. The indices are ordered by label frequency, so the most frequent value gets index 0. For each variable, we need to define the input column and the output column, which we'll then use as input for other transformers or evaluators. Finally, it is possible to define the strategy to handle unseen labels (possible when you use the StringIndexer to fit a model and then run the fitted model against a test dataframe) through the setHandleInvalid method; in our case we simply pass "skip" to tell the StringIndexer we want to skip unseen labels (additional details are available in the MLlib documentation).
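To see what StringIndexer actually produces, here is a short sketch on a toy column (not part of the original example; it assumes a SparkSession named spark). "b" is the most frequent value, so it receives index 0.0:

import org.apache.spark.ml.feature.StringIndexer

val toyDf = spark.createDataFrame(Seq(
  (0, "a"), (1, "b"), (2, "b"), (3, "c"), (4, "b"), (5, "a")
)).toDF("id", "category")

val toyIndexer = new StringIndexer()
  .setInputCol("category")
  .setOutputCol("categoryIdx")
  .setHandleInvalid("skip")   // rows with values unseen at fit time are dropped at transform time

toyIndexer.fit(toyDf).transform(toyDf).show()
// "b" -> 0.0, "a" -> 1.0, "c" -> 2.0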
In addition to the transformers and estimators provided by the spark.ml package, it is possible to define custom estimators and transformers. As an example, we'll see how to define a custom transformer aimed at recoding "marital status" in our dataset (basically the same task we have already seen, implemented with a custom transformer; for additional details on implementing custom estimators and transformers see the nice article by H. Karau). To define a custom transformer, we'll define a new Scala class, columnRecoder, which extends the Transformer class; we'll override the transformSchema method to map the correct type of the new column we're going to add with the transformation, and we'll implement the transform method which actually does the recoding in our dataset. A possible implementation is:
Once defined as a transformer, we can use it in our pipeline as the first stage.
A second step in building our pipeline is to assemble the categorical indexes into a single vector: the categorical indexes are put together through the VectorAssembler transformer. This VectorAssembler delivers a single "cat01Features" column which is, in turn, processed by the VectorIndexer transformer to deliver indexes within the "catFeatures" column:
For numeric variables we just need to assemble the columns with a VectorAssembler; then we're ready to put these two vectors (one for categorical variables, the other for numeric variables) together into a single "features" vector.
We now have the label and the features ready to build the logistic regression model, which is the final component of our pipeline. We can also set some parameters for the model: in particular, we can define the threshold (0.5 by default) used to decide between the two label values, as well as the maximum number of iterations for the algorithm and a parameter to tune regularization.
When all stages of the pipeline are ready, we just need to define the pipeline component itself, passing as an input parameter an array with all defined stages:
Now the pipeline component, which encompasses a number of transformations as well as the classification algorithm, is ready; to actually use it we supply a train dataset to fit the model and then a test dataset to evaluate the fitted model. Since we have defined a pipeline, we can be sure that both train and test datasets undergo the same transformations; therefore, we don't have to replicate the process twice.
We now need to define the train and test datasets. In our dataset, label values are unbalanced: the "more than 50K USD per year" value covers around 25% of the total. In order to preserve the same proportion between label values, we'll subset the original dataset based on the label value, obtaining a low-income dataset and a high-income dataset. We'll split both datasets into train (70%) and test (30%), then we'll merge back the two "train" and the two "test" datasets and we'll use the resulting "train" dataset as input for our pipeline:
Once the pipeline is trained, we can use data_model for testing against the test dataset, calculate the confusion matrix and evaluate the classifier metrics:
// load data source from local repository
val csv = spark.read.option("inferSchema","true")
  .option("header", "true")
  .csv("...\yyyy\xxxx\adult.csv")
val data_start = csv.select($"workclass", $"gender", $"education", $"age",
  $"marital-status".alias("marital"), $"occupation", $"relationship",
  $"race", $"hours-per-week".alias("hr_per_week"),
  $"native-country".alias("country"),
  $"income".alias("label"), $"capital-gain".alias("capitalgain"),
  $"capital-loss".alias("capitalloss"), $"educational-num".alias("eduyears")).toDF
// recoding marital status and working class, adding a new column
def newcol_marital(str1: String): String = {
  var nw_str = "noVal"
  if ((str1 == "Married-spouse-absent") | (str1 == "Married-AF-spouse") | (str1 == "Married-civ-spouse")) {
    nw_str = "Married"
  } else if ((str1 == "Divorced") | (str1 == "Never-married") | (str1 == "Separated") | (str1 == "Widowed")) {
    nw_str = "Nonmarried"
  } else { nw_str = str1 }
  return nw_str
}
val udfnewcol = udf(newcol_marital _)
val recodeddata = data_start.withColumn("new_marital", udfnewcol($"marital"))

def newcol_wkclass(str1: String): String = {
  var nw_str = "noVal"
  if ((str1 == "Local-gov") | (str1 == "Federal-gov") | (str1 == "State-gov")) {
    nw_str = "Gov"
  } else if ((str1 == "Self-emp-not-inc") | (str1 == "Self-emp-inc")) {
    nw_str = "Selfemployed"
  } else if ((str1 == "Never-worked") | (str1 == "Without-pay")) {
    nw_str = "Notemployed"
  } else { nw_str = str1 }
  return nw_str
}
val udfnewcol2 = udf(newcol_wkclass _)
val startingdata = recodeddata.withColumn("new_workclass", udfnewcol2($"workclass"))

// remove missing data
val df_work01 = startingdata.na.drop("any")
val df_work = startingdata.filter("occupation <> '?' and capitalgain < 99999 and new_workclass <> '?' and country <> '?'")
// define stages
val new_workclassIdx = new StringIndexer().setInputCol("new_workclass")
  .setOutputCol("new_workclassIdx").setHandleInvalid("skip")
val genderIdx = new StringIndexer().setInputCol("gender")
  .setOutputCol("genderIdx").setHandleInvalid("skip")
val maritalIdx = new StringIndexer().setInputCol("new_marital")
  .setOutputCol("maritalIdx").setHandleInvalid("skip")
val occupationIdx = new StringIndexer().setInputCol("occupation")
  .setOutputCol("occupationIdx").setHandleInvalid("skip")
val relationshipIdx = new StringIndexer().setInputCol("relationship")
  .setOutputCol("relationshipIdx").setHandleInvalid("skip")
val raceIdx = new StringIndexer().setInputCol("race")
  .setOutputCol("raceIdx").setHandleInvalid("skip")
val countryIdx = new StringIndexer().setInputCol("country")
  .setOutputCol("countryIdx").setHandleInvalid("skip")
val labelIdx = new StringIndexer().setInputCol("label")
  .setOutputCol("labelIdx").setHandleInvalid("skip")
import org.apache.spark.ml.Transformer
import org.apache.spark.ml.param.{Param, ParamMap}
import org.apache.spark.ml.util.Identifiable
import org.apache.spark.sql.{DataFrame, Dataset}
import org.apache.spark.sql.functions.udf
import org.apache.spark.sql.types.{StringType, StructField, StructType}

class columnRecoder(override val uid: String) extends Transformer {
  final val inputCol = new Param[String](this, "inputCol", "input column")
  final val outputCol = new Param[String](this, "outputCol", "output column")

  def setInputCol(value: String): this.type = set(inputCol, value)
  def setOutputCol(value: String): this.type = set(outputCol, value)

  def this() = this(Identifiable.randomUID("columnRecoder"))

  def copy(existingParam: ParamMap): columnRecoder = defaultCopy(existingParam)

  override def transformSchema(schema: StructType): StructType = {
    // check that the input column is a string column
    val idx = schema.fieldIndex($(inputCol))
    val field = schema.fields(idx)
    if (field.dataType != StringType) {
      throw new Exception(s"Input type ${field.dataType} type mismatch: String expected")
    }
    // the returned schema includes the new output column
    schema.add(StructField($(outputCol), StringType, false))
  }

  // recoding logic for the marital-status column
  private def newcol_recode(str1: String): String = {
    if ((str1 == "Married-spouse-absent") | (str1 == "Married-AF-spouse") | (str1 == "Married-civ-spouse")) "Married"
    else if ((str1 == "Divorced") | (str1 == "Never-married") | (str1 == "Separated") | (str1 == "Widowed")) "Nonmarried"
    else str1
  }

  private def udfnewcol = udf(newcol_recode _)

  def transform(df: Dataset[_]): DataFrame = {
    df.withColumn($(outputCol), udfnewcol(df($(inputCol))))
  }
}
// define stages
val new_marital = new columnRecoder().setInputCol("marital")
  .setOutputCol("new_marital")
val new_workclassIdx = new StringIndexer().setInputCol("new_workclass")
  .setOutputCol("new_workclassIdx").setHandleInvalid("skip")
val genderIdx = new StringIndexer().setInputCol("gender")
  .setOutputCol("genderIdx").setHandleInvalid("skip")
val maritalIdx = new StringIndexer().setInputCol("new_marital")
  .setOutputCol("maritalIdx").setHandleInvalid("skip")
.......
// categorical vector for categorical variables
val catVect = new VectorAssembler()
  .setInputCols(Array("new_workclassIdx", "genderIdx", "maritalIdx",
    "occupationIdx", "relationshipIdx", "raceIdx", "countryIdx"))
  .setOutputCol("cat01Features")
val catIdx = new VectorIndexer()
  .setInputCol(catVect.getOutputCol)
  .setOutputCol("catFeatures")
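One detail the example leaves implicit: VectorIndexer decides which features of the assembled vector are categorical by counting their distinct values against the setMaxCategories threshold (20 by default); features above the threshold are treated as continuous. If a categorical column has many levels (country, for instance), you may want to raise it. A hedged variant of the stage above, where 50 is just an illustrative value:

// hypothetical alternative to the catIdx stage above (not used in the pipeline below)
val catIdxAlt = new VectorIndexer()
  .setInputCol(catVect.getOutputCol)
  .setOutputCol("catFeatures")
  .setMaxCategories(50)   // illustrative threshold, tune to the actual cardinality of your columns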
// numeric vector for numeric variables
val numVect = new VectorAssembler()
  .setInputCols(Array("age", "hr_per_week", "capitalgain", "capitalloss", "eduyears"))
  .setOutputCol("numFeatures")
val featVect = new VectorAssembler()
  .setInputCols(Array("catFeatures", "numFeatures"))
  .setOutputCol("features")
// logistic regression estimator and pipeline assembly
val lr = new LogisticRegression().setLabelCol("labelIdx").setFeaturesCol("features")
  .setThreshold(0.33).setMaxIter(10).setRegParam(0.2)
val pipeline = new Pipeline().setStages(Array(new_marital, new_workclassIdx, labelIdx,
  maritalIdx, occupationIdx, relationshipIdx, raceIdx, genderIdx, countryIdx,
  catVect, catIdx, numVect, featVect, lr))
// split between train and test
val df_low_income = df_work.filter("label == '<=50K'")
val df_high_income = df_work.filter("label == '>50K'")
val splits_LI = df_low_income.randomSplit(Array(0.7, 0.3), seed = 123)
val splits_HI = df_high_income.randomSplit(Array(0.7, 0.3), seed = 123)
val df_work_train = splits_LI(0).unionAll(splits_HI(0))
val df_work_test = splits_LI(1).unionAll(splits_HI(1))
// fitting the pipeline
val data_model = pipeline.fit(df_work_train)
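Since a fitted pipeline bundles every transformation together with the model, it can also be persisted and reloaded later to score new data with exactly the same preprocessing. A brief sketch, where the path is just a placeholder:

import org.apache.spark.ml.PipelineModel

// save the fitted PipelineModel (all transformers plus the logistic model)
data_model.write.overwrite().save("...\models\income_pipeline")
// reload it later, e.g. in a separate scoring job
val reloadedModel = PipelineModel.load("...\models\income_pipeline")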
// generate predictions
val data_prediction = data_model.transform(df_work_test)
val data_predicted = data_prediction.select("features", "prediction", "label", "labelIdx")

// confusion matrix
val tp = data_predicted.filter("prediction == 1 AND labelIdx == 1").count().toFloat
val fp = data_predicted.filter("prediction == 1 AND labelIdx == 0").count().toFloat
val tn = data_predicted.filter("prediction == 0 AND labelIdx == 0").count().toFloat
val fn = data_predicted.filter("prediction == 0 AND labelIdx == 1").count().toFloat

val metrics = spark.createDataFrame(Seq(
  ("TP", tp), ("FP", fp), ("TN", tn), ("FN", fn),
  ("Precision", tp / (tp + fp)),
  ("Recall", tp / (tp + fn)))).toDF("metric", "value")
metrics.show()
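As an alternative to the hand-rolled confusion matrix, Spark also ships evaluators that compute common metrics directly from the prediction dataframe. A hedged sketch using the columns produced by the fitted model above:

import org.apache.spark.ml.evaluation.{BinaryClassificationEvaluator, MulticlassClassificationEvaluator}

// area under the ROC curve, from the rawPrediction column added by the logistic model
val auc = new BinaryClassificationEvaluator()
  .setLabelCol("labelIdx")
  .setRawPredictionCol("rawPrediction")
  .setMetricName("areaUnderROC")
  .evaluate(data_prediction)

// overall accuracy, from the prediction column
val accuracy = new MulticlassClassificationEvaluator()
  .setLabelCol("labelIdx")
  .setPredictionCol("prediction")
  .setMetricName("accuracy")
  .evaluate(data_prediction)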
R code and SparklyR
Now we’ll try to replicate the same example we just saw in R, more precisely, working with the SparklyR package. We’ll use the developer version of SparklyR (as you possibly know, there’s an interesting debate on the best API to access Apache Spark resources from R. For those that wants to know more about https://github.com/rstudio/sparklyr/issues/502, http://blog.revolutionanalytics.com/2016/10/tutorial-scalable-r-on-spark.html). We need to install it from github before connecting with Spark environment. In our case Spark is a standalone instance running version 2.2.0, as reported in the official documentation for SparklyR, configuration parameters can be set through spark_config() function, in particular, spark_config() provides the basic configuration used by default for spark connection. To change parameters it’s possible to get default configuration via spark_connection() then change parameters as needed ( here’s the link for additional details to run sparklyR on Yarn cluster).devtools::install_github("rstudio/sparklyr") library(sparklyr) library(dplyr) sc <- spark_connect(master = "local",spark_home = "...\Local\spark",version="2.2.0")
income_table <- spark_read_csv(sc, "income_table", "...\adultincome\adult.csv")
income_table <- select(income_table, "workclass", "gender", "eduyears" = "educationalnum",
  "age", "marital" = "maritalstatus", "occupation", "relationship",
  "race", "hr_per_week" = "hoursperweek", "country" = "nativecountry",
  "label" = "income", "capitalgain", "capitalloss")

# recoding marital status and working class
income_table <- income_table %>%
  mutate(marital = ifelse(marital == "Married-spouse-absent" | marital == "Married-AF-spouse" |
                          marital == "Married-civ-spouse", "married", "nonMarried"))
income_table <- income_table %>%
  mutate(workclass = ifelse(workclass == "Local-gov" | workclass == "Federal-gov" | workclass == "State-gov", "Gov",
                     ifelse(workclass == "Self-emp-inc" | workclass == "Self-emp-not-inc", "Selfemployed",
                     ifelse(workclass == "Never-worked" | workclass == "Without-pay", "Notemployed", workclass))))
SparklyR groups its functions into a few families according to their prefix:
- functions prefixed with sdf_ generally access the Scala Spark DataFrame API directly (as opposed to the dplyr interface, which uses Spark SQL) to manipulate dataframes;
- functions prefixed with ft_ are functions to manipulate and transform features. Pipeline transformers and estimators belong to this group of functions;
- functions prefixed with ml_ implement algorithms to build a machine learning workflow; even the pipeline instance is provided by ml_pipeline(), which belongs to this group of functions.
income_pipe <- ml_pipeline(sc, uid = "income_pipe")
income_pipe <- ft_string_indexer(income_pipe, input_col = "workclass", output_col = "workclassIdx")
income_pipe <- ft_string_indexer(income_pipe, input_col = "gender", output_col = "genderIdx")
income_pipe <- ft_string_indexer(income_pipe, input_col = "marital", output_col = "maritalIdx")
income_pipe <- ft_string_indexer(income_pipe, input_col = "occupation", output_col = "occupationIdx")
income_pipe <- ft_string_indexer(income_pipe, input_col = "race", output_col = "raceIdx")
income_pipe <- ft_string_indexer(income_pipe, input_col = "country", output_col = "countryIdx")
income_pipe <- ft_string_indexer(income_pipe, input_col = "label", output_col = "labelIdx")

array_features <- c("workclassIdx","genderIdx","maritalIdx",
                    "occupationIdx","raceIdx","countryIdx","eduyears",
                    "age","hr_per_week","capitalgain","capitalloss")

income_pipe <- ft_vector_assembler(income_pipe, input_col = array_features, output_col = "features")
# adding the logistic regression estimator to the pipeline
income_pipe <- ml_logistic_regression(income_pipe,
  features_col = "features", label_col = "labelIdx",
  family = "binomial", threshold = 0.33, reg_param = 0.2, max_iter = 10L)
# data split, dealing with label imbalance
df_low_income = filter(income_table, income_table$label == "<=50K")
df_high_income = filter(income_table, income_table$label == ">50K")

splits_LI <- sdf_partition(df_low_income, test = 0.3, train = 0.7, seed = 7711)
splits_HI <- sdf_partition(df_high_income, test = 0.3, train = 0.7, seed = 7711)

df_test <- sdf_bind_rows(splits_LI[[1]], splits_HI[[1]])
df_train <- sdf_bind_rows(splits_LI[[2]], splits_HI[[2]])

# fitting the pipeline
df_model <- ml_fit(income_pipe, df_train)
# inspect the pipeline stages; stage 9 is the fitted logistic regression model
df_model$stages
df_model$stages[[9]]$coefficients
df_prediction <- ml_predict(df_model, df_test)
df_results <- select(df_prediction, "prediction", "labelIdx", "probability")
# calculating confusion matrix
df_tp <- filter(df_results, (prediction == 1 && labelIdx == 1))
df_fp <- filter(df_results, (prediction == 1 && labelIdx == 0))
df_tn <- filter(df_results, (prediction == 0 && labelIdx == 0))
df_fn <- filter(df_results, (prediction == 0 && labelIdx == 1))

tp <- df_tp %>% tally() %>% collect() %>% as.integer()
fp <- df_fp %>% tally() %>% collect() %>% as.integer()
tn <- df_tn %>% tally() %>% collect() %>% as.integer()
fn <- df_fn %>% tally() %>% collect() %>% as.integer()

df_precision <- (tp / (tp + fp))
df_recall <- (tp / (tp + fn))
df_accuracy <- (tp + tn) / (tp + tn + fp + fn)

# area under the ROC curve
c_AUC <- ml_binary_classification_eval(df_prediction, label_col = "labelIdx",
  prediction_col = "prediction", metric_name = "areaUnderROC")
Conclusion
As we have seen, pipelines are a useful mechanism to assemble and serialize transformations in order to make them repeatable for different sets of data. They are thus a simple way to fit and evaluate models through train/test datasets, but are also suitable for running the same sequence of transformers/estimators in parallel over different nodes of the Spark cluster (e.g. to find the best parameter set). Moreover, a powerful API to deal with pipelines in R is available through the SparklyR package, which in addition provides a comprehensive set of functions to leverage the Spark ML package (here's a link for a guide to deploying SparklyR in different cluster environments). Finally, support for running R code distributed across a Spark cluster has been added to SparklyR with the spark_apply() function (https://spark.rstudio.com/guides/distributed-r/), which makes the possibility of leveraging pipelines in R in a distributed environment for analytical solutions even more interesting.