Advent of 2020, Day 22 – Using Spark SQL and DataFrames in Azure Databricks
Series of Azure Databricks posts:
- Dec 01: What is Azure Databricks
- Dec 02: How to get started with Azure Databricks
- Dec 03: Getting to know the workspace and Azure Databricks platform
- Dec 04: Creating your first Azure Databricks cluster
- Dec 05: Understanding Azure Databricks cluster architecture, workers, drivers and jobs
- Dec 06: Importing and storing data to Azure Databricks
- Dec 07: Starting with Databricks notebooks and loading data to DBFS
- Dec 08: Using Databricks CLI and DBFS CLI for file upload
- Dec 09: Connect to Azure Blob storage using Notebooks in Azure Databricks
- Dec 10: Using Azure Databricks Notebooks with SQL for Data engineering tasks
- Dec 11: Using Azure Databricks Notebooks with R Language for data analytics
- Dec 12: Using Azure Databricks Notebooks with Python Language for data analytics
- Dec 13: Using Python Databricks Koalas with Azure Databricks
- Dec 14: From configuration to execution of Databricks jobs
- Dec 15: Databricks Spark UI, Event Logs, Driver logs and Metrics
- Dec 16: Databricks experiments, models and MLFlow
- Dec 17: End-to-End Machine learning project in Azure Databricks
- Dec 18: Using Azure Data Factory with Azure Databricks
- Dec 19: Using Azure Data Factory with Azure Databricks for merging CSV files
- Dec 20: Orchestrating multiple notebooks with Azure Databricks
- Dec 21: Using Scala with Spark Core API in Azure Databricks
Yesterday we took a closer look at Spark Scala with notebooks in Azure Databricks and how to handle data engineering tasks. Today we will look into Spark SQL and DataFrames, which are built on top of the Spark Core API.
“Spark SQL is a Spark module for structured data processing and data querying. It provides a programming abstraction called DataFrames and can also serve as a distributed SQL query engine. It enables unmodified Hadoop Hive queries to run up to 100x faster on existing deployments and data. It also provides powerful integration with the rest of the Spark ecosystem (e.g.: integrating SQL query processing with machine learning).” (Apache Spark Tutorial).
Start your Azure Databricks workspace and create a new notebook. I named mine Day22_SparkSQL and set the default language to SQL. Now let's explore the functionalities of Spark SQL.
1. Loading Data
We will load data from /databricks-datasets using the SQL, Python and R languages. The CSV dataset is data_geo.csv, located in the following folder:
%scala
display(dbutils.fs.ls("/databricks-datasets/samples/population-vs-price"))
1.1. Loading using Python
%python
data = spark.read.csv("/databricks-datasets/samples/population-vs-price/data_geo.csv", header="true", inferSchema="true")
And register the data as a temporary view named data_geo_py:
%python
data.createOrReplaceTempView("data_geo_py")
And run the following SQL Statement:
SELECT * FROM data_geo_py LIMIT 10
1.2. Loading using SQL
DROP TABLE IF EXISTS data_geo;

CREATE TABLE data_geo
USING com.databricks.spark.csv
OPTIONS (
  path "/databricks-datasets/samples/population-vs-price/data_geo.csv",
  header "true",
  inferSchema "true"
)
And run the following SQL Statement:
SELECT * FROM data_geo LIMIT 10
1.3. Loading using R
%r
library(SparkR)
data_geo_r <- read.df("/databricks-datasets/samples/population-vs-price/data_geo.csv", source = "csv", header = "true", inferSchema = "true")
registerTempTable(data_geo_r, "data_geo_r")
Cache the results:
CACHE TABLE data_geo_r
And run the following SQL Statement:
SELECT * FROM data_geo_r LIMIT 10
All three DataFrames hold the same data (unless additional modifications are applied, such as dropping rows with null values).
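As a quick sanity check (a minimal sketch, assuming all three views were registered as shown above and not part of the original notebook), you can compare the row counts of the three views with a single SQL statement:

-- Row counts should match across the three views
SELECT 'data_geo'    AS source, COUNT(*) AS Nof_rows FROM data_geo
UNION ALL
SELECT 'data_geo_py' AS source, COUNT(*) AS Nof_rows FROM data_geo_py
UNION ALL
SELECT 'data_geo_r'  AS source, COUNT(*) AS Nof_rows FROM data_geo_r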
2. Viewing DataFrames
Viewing a DataFrame is done with a simple SELECT statement, following the ANSI SQL standard. E.g.:
SELECT
   City
  ,`2014 Population estimate`
  ,`2015 median sales price`
  ,`State Code` AS State_Code
FROM data_geo
WHERE `State Code` = 'AZ';
You can also combine all three DataFrames that were imported using three different languages (SQL, R, Python).
SELECT *, 'data_geo_SQL' AS dataset FROM data_geo
UNION
SELECT *, 'data_geo_Python' AS dataset FROM data_geo_py
UNION
SELECT *, 'data_geo_R' AS dataset FROM data_geo_r
ORDER BY `2014 rank`, dataset
LIMIT 12
3. Running SQL
3.1. Date and Time functions
SELECT
   CURRENT_TIMESTAMP() AS now
  ,date_format(CURRENT_TIMESTAMP(), "L")  AS Month_
  ,date_format(CURRENT_TIMESTAMP(), "LL") AS Month_LeadingZero
  ,date_format(CURRENT_TIMESTAMP(), "y")  AS Year_
  ,date_format(CURRENT_TIMESTAMP(), "d")  AS Day_
  ,date_format(CURRENT_TIMESTAMP(), "E")  AS DayOfTheWeek
  ,date_format(CURRENT_TIMESTAMP(), "H")  AS Hour
  ,date_format(CURRENT_TIMESTAMP(), "m")  AS Minute
  ,date_format(CURRENT_TIMESTAMP(), "s")  AS Second
3.2. Built-in functions
SELECT
   COUNT(*)          AS Nof_rows
  ,SUM(`2014 rank`)  AS Sum_Rank
  ,AVG(`2014 rank`)  AS Avg_Rank
  ,SUM(CASE WHEN `2014 rank` > 150 THEN 1 ELSE -1 END) AS Sum_case
  ,STD(`2014 rank`)  AS stdev
  ,MAX(`2014 rank`)  AS Max_Val
  ,MIN(`2014 rank`)  AS Min_Val
  ,KURTOSIS(`2014 rank`) AS Kurt
  ,SKEWNESS(`2014 rank`) AS Skew
  ,CAST(SKEWNESS(`2014 rank`) AS INT) AS Skew_cast
FROM data_geo
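The same aggregate functions can also be grouped. As a minimal sketch (not in the original notebook, using the same data_geo columns as above), the statistics per state would look like this:

-- Grouped variant of the aggregates above
SELECT
   State
  ,COUNT(*)         AS Nof_cities
  ,AVG(`2014 rank`) AS Avg_Rank
  ,MIN(`2014 rank`) AS Min_Val
  ,MAX(`2014 rank`) AS Max_Val
FROM data_geo
GROUP BY State
ORDER BY Nof_cities DESC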
3.3. SELECT INTO
You can also store query results in a predefined table; in Spark SQL this is done with an INSERT INTO ... SELECT statement:
DROP TABLE IF EXISTS tmp_data_geo;

CREATE TABLE tmp_data_geo (
   `2014 rank` INT
  ,State VARCHAR(64)
  ,`State Code` VARCHAR(2)
);

INSERT INTO tmp_data_geo
SELECT
   `2014 rank`
  ,State
  ,`State Code`
FROM data_geo
WHERE `2014 rank` >= 50
  AND `2014 rank` < 60
  AND `State Code` = "CA";

SELECT * FROM tmp_data_geo;
3.4. JOIN
SELECT
   dg1.`State Code`
  ,dg1.`2014 rank`
  ,dg2.`State Code`
  ,dg2.`2014 rank`
FROM data_geo AS dg1
JOIN data_geo AS dg2
  ON dg1.`2014 rank` = dg2.`2014 rank` + 1
 AND dg1.`State Code` = dg2.`State Code`
WHERE dg1.`State Code` = "CA"
3.5. Common Table Expressions
WITH cte AS (
  SELECT *
  FROM data_geo
  WHERE `2014 rank` >= 50
    AND `2014 rank` < 60
)
SELECT * FROM cte;
3.6. Inline tables
SELECT * FROM VALUES
  ("WA", "Seattle"),
  ("WA", "Tacoma"),
  ("WA", "Spokane")
AS data(StateName, CityName)
3.7. EXISTS
WITH cte AS (
  SELECT *
  FROM data_geo
  WHERE `2014 rank` >= 50
    AND `2014 rank` < 60
)
SELECT *
FROM data_geo AS dg
WHERE EXISTS (SELECT * FROM cte WHERE cte.City = dg.City)
  AND NOT EXISTS (SELECT * FROM cte WHERE cte.City = dg.City AND `2015 median sales price` IS NULL)
3.8. Window functions
SELECT
   City
  ,State
  ,RANK() OVER (PARTITION BY State ORDER BY `2015 median sales price`) AS rank
  ,`2015 median sales price` AS MedianPrice
FROM data_geo
WHERE `2015 median sales price` IS NOT NULL;
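Besides RANK, other analytic window functions such as LAG and LEAD are available. As a minimal sketch (not part of the original notebook, assuming the same columns), this compares each city's median price with the previous, cheaper one in the same state:

-- Previous (cheaper) median price within the same state
SELECT
   City
  ,State
  ,`2015 median sales price` AS MedianPrice
  ,LAG(`2015 median sales price`) OVER (PARTITION BY State ORDER BY `2015 median sales price`) AS PrevMedianPrice
FROM data_geo
WHERE `2015 median sales price` IS NOT NULL;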
4. Exploring the visuals
Results of a SQL SELECT statement that are returned as a table can also be visualised. Given the following SQL statement:
SELECT `State Code`, `2015 median sales price` FROM data_geo
in the result cell you can select the plot icon and pick Map.
Furthermore, using “Plot Options…” you can change the settings of the variables on the graph, aggregations and data series.
With an additional query:
SELECT
   `State Code`
  ,`2015 median sales price`
FROM data_geo
ORDER BY `2015 median sales price` DESC;
you can also create a box-plot; again selecting the desired plot type.
There are many other visuals available and many more SQL statements to explore, so feel free to go a step further and beyond this blog post.
Tomorrow we will explore Streaming with Spark Core API in Azure Databricks.
The complete set of code and SQL notebooks (including the HTML version) will be available in the GitHub repository.
Happy Coding and Stay Healthy!