Want to share your content on R-bloggers? click here if you have a blog, or here if you don't.
Series of Apache Spark posts:
- Dec 01: What is Apache Spark
- Dec 02: Installing Apache Spark
- Dec 03: Getting around CLI and WEB UI in Apache Spark
- Dec 04: Spark Architecture – Local and cluster mode
- Dec 05: Setting up Spark Cluster
- Dec 06: Setting up IDE
- Dec 07: Starting Spark with R and Python
- Dec 08: Creating RDD files
- Dec 09: RDD Operations
- Dec 10: Working with data frames
- Dec 11: Working with packages and spark DataFrames
- Dec 12: Spark SQL
- Dec 13: Spark SQL Bucketing and partitioning
- Dec 14: Spark SQL query hints and executions
- Dec 15: Introduction to Spark Streaming
When working with Spark Streaming from file based ingestion, user must predefine the schema. This will require not only better performance but consistent data ingest for streaming data. There is always possibility to set the spark.sql.streaming.schemaInference
to true to enable Spark to infer schema on read or automatically.
Folder structure
Data partitioning is can be optimized for data discovery, especially when storing binary files are done in a key=value manner. This listing will recurse and will also help Spark to read the path of directories. These directories make the partitioning schema and must be static (or available) for the time of query execution
Folder structure usually follow the index, that can also be optimized based on key=value pair. For example:
/Sales/masterdata/customer/year=2021/month=12/Day=15
or any relevant schema structure that follows organization of data. In example, we have shown date partitioning, but it can also be e.g. geography:
/Sales/masterdata/customer/region=Osrednja/ZIP=1000/municipality=Bežigrad
And because the dataframes are not checked at compile time (but checked at runtime), we can improve the directory structure significantly. And normally, we would be aiming to convert the untyped (or semi-structured) streaming DataFrames to typed (structured) streaming Datasets using method to define DataFrame as static.
Working with Dataframes
Input Sources
Dataframes for structural streaming can be created using DataStreamReader
(with Scala, R, Python and Java).The method SparkSession.readstream() or read.stream() you can create a static Dataframe with additional details on the source. For the source, Spark Streaming can read files formats as TXT, CSV, JSON, ORC, Parquet. Also it can read data from Kafka and data from socket connections.
Basic operations in R
Let’s set the connection and start working with streaming dataset:
sparkR.session() schemaFile <- structType(structField("name", "string"), structField("age", "integer"), structField("device", "integer"), structField("timestamp","Timestamp")) csvDF <- read.stream("csv", path = "/sample/customer/customer.csv", schema = schemaFile, sep = ";")
After loading, we can proceed with operations:
#Selecting a subset select(where(csvDF, "age > 10"), "device") # Running count for each group in column device count(groupBy(csvDF, "device"))
With Streaming data, what we normally want is to get a aggregated statistics over a period or window time. To do so in R, using both groupBy()
and window()
function, to expresses aggregated windowed results:
# Group the data by window and word and compute the count of each group windowedCounts <- count( groupBy( csvDF, window(csvDF$timestamp, "10 minutes", "5 minutes"), csvDF$age))
Basic operations in Python
And let’s connect with Python to streaming dataset:
spark = SparkSession() # Read all the csv files written atomically in a directory userSchema = StructType().add("name", "string").add("age", "integer").add("device","integer").add("Timestamp","Timestamp") csvDF = spark \ .readStream \ .option("sep", ";") \ .schema(userSchema) \ .csv("/sample/customer/")
And repeat same simple functions with Python:
#Selecting a subset csvDF.select("device").where("age > 10") # Running count of the number of updates for each device type csvDF.groupBy("device").count()
And create analysis over aggregated window functions:
windowedCounts = csvDF.groupBy( window(csvDF.timestamp, "10 minutes", "5 minutes"), csvDF.age ).count()
In both languages – Python and R, datasets should be transformed into streamed structured dataset and then can be plugged to different aggregated and window functions.
Tomorrow we will look into Watermarking and joins.
Compete set of code, documents, notebooks, and all of the materials will be available at the Github repository: https://github.com/tomaztk/Spark-for-data-engineers
Happy Spark Advent of 2021!
R-bloggers.com offers daily e-mail updates about R news and tutorials about learning R and many other topics. Click here if you're looking to post or find an R/data-science job.
Want to share your content on R-bloggers? click here if you have a blog, or here if you don't.