
Advent of 2020, Day 29 – Performance tuning for Apache Spark

Series of Azure Databricks posts:

Yesterday we looked into PowerShell automation for Azure Databricks and how one can create, update or remove the workspace, resource group and VNet using deployment templates and parameters.

Today we will address Spark performance issues. We have talked several times about the different languages available in Spark.

There are direct and indirect performance improvements that you can leverage to make your Spark jobs run faster.

1. Choice of Languages

Java versus Scala versus R versus Python (versus HiveSQL)? There is no right or wrong answer to this choice, but there are some important differences worth mentioning. If you are running single-node machine learning, R (used through SparkR) is a good option, since R has a massive machine-learning ecosystem with a lot of optimised algorithms for that kind of workload.

If you are running an ETL job, Spark in combination with any of these languages (R, Python, Scala) will yield great results. Spark’s Structured APIs are consistent in terms of speed and stability across all the languages, so there should be almost no difference. But things get much more interesting when you need UDFs (user-defined functions) that cannot be expressed directly with the Structured APIs. In this case, neither R nor Python is a great idea, simply because of the way the Structured API code is translated into RDD operations. In general, Python is a better choice than R for writing UDFs, but probably the best way is to write UDFs in Scala (or Java), making these language jumps easier for the interpreter.
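
To make the difference concrete, here is a minimal sketch (the DataFrame df and its numeric column value are made up for illustration): a Python UDF ships every row to a Python worker and back, while the equivalent built-in Structured API expression stays inside the JVM:

from pyspark.sql.functions import udf, col
from pyspark.sql.types import DoubleType

# df is a hypothetical DataFrame with a numeric column "value"
# Python UDF: every row is serialised to a Python worker and back
squared_udf = udf(lambda x: float(x) ** 2, DoubleType())
DF_udf = df.select(squared_udf(col("value")).alias("value_sq"))

# Built-in Structured API expression: stays inside the JVM, no Python round trip
DF_builtin = df.select((col("value") * col("value")).alias("value_sq"))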

2. Choice of data presentation

DataFrames versus Datasets versus SQL versus RDDs is another choice, yet it is fairly easy. DataFrames, Datasets and SQL objects are all equal in performance and stability (at least from Spark 2.3 onward), meaning that if you are using DataFrames in any language, performance will be the same. Again, when writing custom functions (UDFs), there will be some performance degradation with both R and Python, so switching to Scala or Java might be an optimisation.
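
For instance, here is a quick sketch (assuming a hypothetical DataFrame df with a state column) showing that the same aggregation written with the DataFrame API and with SQL goes through the same optimiser:

# DataFrame API
agg_df = df.groupBy("state").count()

# SQL over the same data, via a temporary view
df.createOrReplaceTempView("df_view")
agg_sql = spark.sql("SELECT state, COUNT(*) AS count FROM df_view GROUP BY state")

# Both run through the Catalyst optimiser; the physical plans should match
agg_df.explain()
agg_sql.explain()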

The rule of thumb is: stick to DataFrames. If you go a layer down to RDDs, Spark will make better use of them than you will. Spark’s optimisation engine will write better RDD code than you can, and with certainly less effort. And by doing it yourself, you might also lose additional Spark optimisations that come with new releases.

When using RDDs, try to use Scala or Java. If this is not possible and you will be using Python or R extensively, try to use them as little as possible with RDDs and convert to DataFrames as quickly as possible. Again, if your Spark code, application or data-engineering task is not compute-intensive, it should be fine; otherwise remember to use Scala or Java, or convert to DataFrames. Neither Python nor R handles the serialisation of RDDs optimally: a lot of data has to be shuttled to and from the Python or R process, causing extra data movement and traffic, and potentially making RDD jobs unstable and slow.
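
If you do end up with an RDD in Python, a minimal sketch of this advice (the toy data and column names below are made up) is to convert to a DataFrame straight away and keep the rest of the work in the Structured APIs:

# A toy RDD of (state, cases) tuples - purely illustrative
rdd = spark.sparkContext.parallelize([("WA", 10), ("NY", 25), ("WA", 5)])

# Convert to a DataFrame as early as possible and stay in the Structured APIs
df_from_rdd = rdd.toDF(["state", "cases"])
df_from_rdd.groupBy("state").sum("cases").show()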

3. Data Storage

Storing data effectively is relevant when the data will be read multiple times. If data will be accessed many times, whether by different users in the organisation or by a single user doing repeated analysis, make sure to store it for efficient reads. Choosing the storage, the data format and the data partitioning is important.

With numerous file formats available, there are some key differences. If you want to optimise your Spark job, data should be stored in the best possible format for it. In general, always favour structured, binary formats for storing your data, especially when it is accessed frequently. Although CSV files look well formatted, they are obnoxiously sparse, can have “edge” cases (missing line breaks or other delimiter issues) and are painfully slow to parse and hard to partition. The same logic applies to txt files and JSON. Avro is row-orientated (its schema is JSON-based), and I am not going to even talk about the XML format. Spark works best with data stored in Apache Parquet. Parquet stores data in binary files with column-orientated storage and also tracks statistics about the files, making it possible to skip files that are not needed for a query.
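
As a sketch (the output path /tmp/us_states_parquet is just an example), you can convert a CSV dataset to Parquet once and point all subsequent reads at the Parquet copy:

# One-off conversion: read the sparse CSV and persist it as column-orientated Parquet
csv_df = (spark.read.format("csv")
          .option("inferSchema", "true")
          .option("header", "true")
          .load("/databricks-datasets/COVID/covid-19-data/us-states.csv"))

csv_df.write.mode("overwrite").parquet("/tmp/us_states_parquet")

# All subsequent reads go against the Parquet copy
parquet_df = spark.read.parquet("/tmp/us_states_parquet")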

4. Table partitioning and bucketing

Table partitioning refers to storing files in separate directories based on a partition key (e.g. date of purchase, VAT number), such as a date field in the data. Partitioning helps Spark skip files that are not needed for the end result, so it reads only the data that falls in the range of the key. There are potential pitfalls to this technique; one for sure is the size of these subdirectories and how to choose the right granularity.
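
A minimal sketch of partitioned writes (the DataFrame sales_df, the column purchase_date and the paths are made up for illustration):

# Each distinct purchase_date value becomes its own subdirectory
(sales_df.write
    .mode("overwrite")
    .partitionBy("purchase_date")
    .parquet("/tmp/sales_partitioned"))

# A filter on the partition key lets Spark skip all non-matching subdirectories
spark.read.parquet("/tmp/sales_partitioned").where("purchase_date = '2020-12-29'").count()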

Bucketing is a process of “pre-partitioning” data to allow better joins and aggregations. It improves performance because data can be consistently distributed across buckets instead of all ending up in one partition. So if joins are frequently performed on a particular column immediately after a read, you can use bucketing to ensure that the data is well partitioned according to those values. This prevents a shuffle before the join and speeds up data access.
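
A hedged sketch of bucketing (the DataFrame orders_df, the column customer_id and the bucket count are made up): the data is written as a managed table, hashed into buckets on the join key:

# 50 buckets, hashed on the join key; bucketing requires saving as a table
(orders_df.write
    .format("parquet")
    .bucketBy(50, "customer_id")
    .sortBy("customer_id")
    .mode("overwrite")
    .saveAsTable("orders_bucketed"))

# Tables bucketed the same way on the same key can be joined without a shuffle
orders_bucketed = spark.table("orders_bucketed")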

5. Parallelism

Splittable data formats make a Spark job easier to run in parallel. A ZIP or TAR file cannot be split, which means that even if you have 10 files inside a ZIP archive and 10 cores, only one core can read in that data, because Spark cannot parallelise across the ZIP file. Files compressed with GZIP, BZIP2 or LZ4, on the other hand, are generally splittable only if they were written by a parallel processing framework like Spark or Hadoop.

In general, Spark will work best when there are two to three tasks per CPU core in your cluster, especially when working with large (big) data. You can also tune the spark.default.parallelism property.
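
As a small illustration (the partition counts below are arbitrary), you can inspect the current parallelism and repartition an individual DataFrame; spark.default.parallelism itself is normally set in the cluster's Spark configuration before startup:

# Default parallelism is derived from the total number of executor cores
print(spark.sparkContext.defaultParallelism)

# Illustrative cluster-config setting, roughly 2-3 tasks per core:
#   spark.default.parallelism 48

# For an individual DataFrame you can also adjust the partition count explicitly
df = spark.range(0, 1000000)    # toy DataFrame for illustration
df = df.repartition(24)         # arbitrary partition count
print(df.rdd.getNumPartitions())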

6. Number of files

With numerous small files you will for sure pay a price for listing and fetching all the data. There is no golden rule on the number of files and the size of the files per directory, but there are some guidelines. Many small files will make the scheduler work harder to locate the data and launch all the read tasks; this can increase not only disk I/O but also network traffic. At the other end of the spectrum, having fewer, larger files eases the workload on the scheduler, but individual tasks will run longer. Again, a rule of thumb would be to size the files in such a way that each contains at least a few tens of megabytes of data. From Spark 2.2 onward it is also possible to control the number of records written per output file (the maxRecordsPerFile write option).
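
A short sketch of the maxRecordsPerFile option (the DataFrame df, the record cap and the path are illustrative):

# Cap the number of records per output file so files land at a few tens of MB
(df.write
    .option("maxRecordsPerFile", 500000)   # tune the cap to hit the target file size
    .mode("overwrite")
    .parquet("/tmp/right_sized_output"))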

7. Temporary data storage

Data that will be reused constantly is a great candidate for caching. Caching will place a DataFrame, Dataset, SQL table or RDD into temporary storage (either memory or disk) across the executors in your cluster. You should cache only datasets that will be used several times later on, and not do it hastily, because caching itself costs resources: serialisation, deserialisation and storage. You can tell Spark to cache data by calling the cache command on DataFrames or RDDs.

Let’s put this to the test. In Azure Databricks create a new notebook (name: Day29_tuning, language: Python) and attach the notebook to your cluster. Load a sample CSV file:

%python
DF1 = (spark.read.format("csv")
       .option("inferSchema", "true")
       .option("header", "true")
       .load("/databricks-datasets/COVID/covid-19-data/us-states.csv"))

The bigger the files, the more evident the difference will be. Create some aggregations:

DF2 = DF1.groupby("state").count().collect()
DF3 = DF1.groupby("date").count().collect()
DF4 = DF1.groupby("cases").count().collect()

After you have noted the timings, let’s cache DF1:

DF1.cache()
DF1.count()   # count() materialises the cache, since cache() itself is lazy

And rerun the previous commands:

DF2 = DF1.groupby("state").count().collect()
DF3 = DF1.groupby("date").count().collect()
DF4 = DF1.groupby("cases").count().collect()

And you should see the difference in run times. As mentioned before, the bigger the dataset, the more time you gain back by caching the data.

Today we have touched on a couple of performance-tuning points and the approaches one should take to improve how Spark runs in Azure Databricks. These are probably the most frequent performance tunings and they are relatively easy to adjust.

Tomorrow we will look further into Apache Spark.

The complete set of code and the notebook are available at the GitHub repository.

Happy Coding and Stay Healthy!
