Site icon R-bloggers

Advent of 2021, Day 16 – Dataframe operations for Spark streaming

[This article was first published on R – TomazTsql, and kindly contributed to R-bloggers]. (You can report issue about the content on this page here)
Want to share your content on R-bloggers? click here if you have a blog, or here if you don't.

Series of Apache Spark posts:

When working with Spark Streaming from file based ingestion, user must predefine the schema. This will require not only better performance but consistent data ingest for streaming data. There is always possibility to set the spark.sql.streaming.schemaInference to true to enable Spark to infer schema on read or automatically.

Folder structure

Data partitioning is can be optimized for data discovery, especially when storing binary files are done in a key=value manner. This listing will recurse and will also help Spark to read the path of directories. These directories make the partitioning schema and must be static (or available) for the time of query execution

Folder structure usually follow the index, that can also be optimized based on key=value pair. For example:

/Sales/masterdata/customer/year=2021/month=12/Day=15

or any relevant schema structure that follows organization of data. In example, we have shown date partitioning, but it can also be e.g. geography:

/Sales/masterdata/customer/region=Osrednja/ZIP=1000/municipality=Bežigrad

And because the dataframes are not checked at compile time (but checked at runtime), we can improve the directory structure significantly. And normally, we would be aiming to convert the untyped (or semi-structured) streaming DataFrames to typed (structured) streaming Datasets using method to define DataFrame as static.

Working with Dataframes

Input Sources

Dataframes for structural streaming can be created using DataStreamReader (with Scala, R, Python and Java).The method SparkSession.readstream() or read.stream() you can create a static Dataframe with additional details on the source. For the source, Spark Streaming can read files formats as TXT, CSV, JSON, ORC, Parquet. Also it can read data from Kafka and data from socket connections.

Basic operations in R

Let’s set the connection and start working with streaming dataset:

sparkR.session()
schemaFile <- structType(structField("name", "string"),
                     structField("age", "integer"),
                     structField("device", "integer"),
                     structField("timestamp","Timestamp"))
csvDF <- read.stream("csv", path = "/sample/customer/customer.csv", schema = schemaFile, sep = ";")

After loading, we can proceed with operations:

#Selecting a subset
select(where(csvDF, "age > 10"), "device")

# Running count for each group in column device
count(groupBy(csvDF, "device"))

With Streaming data, what we normally want is to get a aggregated statistics over a period or window time. To do so in R, using both  groupBy() and window() function, to expresses aggregated windowed results:

# Group the data by window and word and compute the count of each group
windowedCounts <- count(
                    groupBy(
                      csvDF,
                      window(csvDF$timestamp, "10 minutes", "5 minutes"),
                      csvDF$age))

Basic operations in Python

And let’s connect with Python to streaming dataset:

spark = SparkSession()

# Read all the csv files written atomically in a directory
userSchema = StructType().add("name", "string").add("age", "integer").add("device","integer").add("Timestamp","Timestamp")
csvDF = spark \
    .readStream \
    .option("sep", ";") \
    .schema(userSchema) \
    .csv("/sample/customer/") 

And repeat same simple functions with Python:

#Selecting a subset
csvDF.select("device").where("age > 10")

# Running count of the number of updates for each device type
csvDF.groupBy("device").count()

And create analysis over aggregated window functions:

windowedCounts = csvDF.groupBy(
    window(csvDF.timestamp, "10 minutes", "5 minutes"),
    csvDF.age
).count()

In both languages – Python and R, datasets should be transformed into streamed structured dataset and then can be plugged to different aggregated and window functions.

Tomorrow we will look into Watermarking and joins.

Compete set of code, documents, notebooks, and all of the materials will be available at the Github repository: https://github.com/tomaztk/Spark-for-data-engineers

Happy Spark Advent of 2021!

To leave a comment for the author, please follow the link and comment on their blog: R – TomazTsql.

R-bloggers.com offers daily e-mail updates about R news and tutorials about learning R and many other topics. Click here if you're looking to post or find an R/data-science job.
Want to share your content on R-bloggers? click here if you have a blog, or here if you don't.