Site icon R-bloggers

Advent of 2020, Day 25 – Using Spark GraphFrames in Azure Databricks

[This article was first published on R – TomazTsql, and kindly contributed to R-bloggers]. (You can report issue about the content on this page here)
Want to share your content on R-bloggers? click here if you have a blog, or here if you don't.

Series of Azure Databricks posts:

Yesterday we looked into MLlib package for Machine Learning. And oh, boy, there are so many topics to cover. But moving forward. Today we will look into the GraphFrames in Spark for Azure Databricks.

This is the last part of high-level API on Spark engine is the GraphX (legacy) and GraphFrames. GraphFrames is a computation engine built on top of Spark Core API that enables end-users and taking advantages of Spark DataFrames in Python and Scala. It gives you the possibility to transform and build structured data at a massive scale.

In your workspace, create a new notebook, called: Day25_Graph and select language: Python. We will need a ML Databricks cluster or install additional Python packages. I installed additional Python package graphframes using PyPI installer.:

Before we begin, couple of word that I would like to explain:

Edge (edges)- is a link or a line between two nodes or a points in the network.

Vertex (vertices) – is a node or a point that has a relation to another node through a link.

Motif – you can build more complex relationships involving edges and vertices. The following cell finds the pairs of vertices with edges in both directions between them. The result is a DataFrame, in which the column names are given by the motif keys.

Stateful – with combining GraphFrame motif finding with filters on the result where the filters use sequence operations to operate over DataFrame columns. Therefore it is called stateful (vis-a-vis stateless), because it remembers previous state.

Now you can start using the notebook. Import the packages that we will need.

from functools import reduce
from pyspark.sql.functions import col, lit, when
from graphframes import *

1.Create a sample dataset

We will create a sample dataset (taken from Databricks website) and will be inserted as a DataFrame.

Vertices:

vertices = sqlContext.createDataFrame([
  ("a", "Alice", 34, "F"),
  ("b", "Bob", 36, "M"),
  ("c", "Charlie", 30, "M"),
  ("d", "David", 29, "M"),
  ("e", "Esther", 32, "F"),
  ("f", "Fanny", 36, "F"),
  ("g", "Gabby", 60, "F"),
  ("h", "Mark", 45, "M"),
  ("i", "Eddie", 60, "M"),
  ("j", "Mandy", 21, "F")
], ["id", "name", "age", "gender"])

Edges:

edges = sqlContext.createDataFrame([
  ("a", "b", "friend"),
  ("b", "c", "follow"),
  ("c", "b", "follow"),
  ("f", "c", "follow"),
  ("e", "f", "follow"),
  ("e", "d", "friend"),
  ("d", "a", "friend"),
  ("a", "e", "friend"),
  ("a", "h", "follow"),
  ("a", "i", "follow"),
  ("a", "j", "follow"),
  ("j", "h", "friend"),
  ("i", "c", "follow"),
  ("i", "c", "friend"),
  ("b", "j", "follow"),
  ("d", "h", "friend"),
  ("e", "j", "friend"),
  ("h", "a", "friend")
], ["src", "dst", "relationship"])

Let’s create a graph using vertices and edges:

graph_sample = GraphFrame(vertices, edges)
print(graph_sample)

Or you can achieve same with:

# This example graph also comes with the GraphFrames package.
from graphframes.examples import Graphs
same_graph = Graphs(sqlContext).friends()
print(same_graph)

2.Querying graph

We can display Edges, vertices, incoming or outgoing degrees:

display(graph_sample.vertices)
#
display(graph_sample.edges)
#
display(graph_sample.inDegrees)
#
display(graph_sample.degrees)

And you can even combine some filtering and using aggregation funtions:

youngest = graph_sample.vertices.groupBy().min("age")
display(youngest)

3.Using motif

Using motifs you can build more complex relationships involving edges and vertices. The following cell finds the pairs of vertices with edges in both directions between them. The result is a DataFrame, in which the column names are given by the motif keys.

# Search for pairs of vertices with edges in both directions between them.
motifs = graph_sample.find("(a)-[e]->(h); (h)-[e2]->(a)")
display(motifs)

4.Using Filter

You can filter out the relationship between nodes and adding multiple predicates.

filtered = motifs.filter("(b.age > 30 or a.age > 30) and (a.gender = 'M' and b.gender ='F')")
display(filtered)
# I guess Mark has a crush on Alice, but she just wants to be a follower 

5. Stateful Queries

Stateful queries are set of filters with given sequences, hence the name. You can combine GraphFrame motif finding with filters on the result where the filters use sequence operations to operate over DataFrame columns. Following an example:

# Find chains of 4 vertices.
chain4 = graph_sample.find("(a)-[ab]->(b); (b)-[bc]->(c); (c)-[cd]->(d)")

# Query on sequence, with state (cnt)
#  (a) Define method for updating state given the next element of the motif.
def cumFriends(cnt, edge):
  relationship = col(edge)["relationship"]
  return when(relationship == "friend", cnt + 1).otherwise(cnt)

#  (b) Use sequence operation to apply method to sequence of elements in motif.
#   In this case, the elements are the 3 edges.
edges = ["ab", "bc", "cd"]
numFriends = reduce(cumFriends, edges, lit(0))
    
chainWith2Friends2 = chain4.withColumn("num_friends", numFriends).where(numFriends >= 2)
display(chainWith2Friends2)

6.Standard graph algorithms

GraphFrames comes with a number of standard graph algorithms built in:

6.1.BFS – Breadth-first search; applying expression through edges

This is searching from expression through the Graph to expression. This will look from A: person named Esther to B: everyone who is 30 or younger.

paths = graph_sample.bfs("name = 'Esther'", "age < 31")
display(paths)

Same result can be achieved with refined query:

filteredPaths = graph_sample.bfs(
  fromExpr = "name = 'Esther'",
  toExpr = "age < 31",
  edgeFilter = "relationship != 'friend'",
  maxPathLength = 3)
display(filteredPaths)

6.2. Shortest Path

Computes shortest paths to the given set of “landmark” vertices, where landmarks are specified by vertex ID.

results = graph_sample.shortestPaths(landmarks=["a", "d"])
display(results)
#or
results = graph_sample.shortestPaths(landmarks=["a", "d", "h"])
display(results)

Tomorrow we will explore how to connect Azure Machine Learning Services Workspace and Azure Databricks

Complete set of code and the Notebook is available at the Github repository.

Happy Coding and Stay Healthy!

To leave a comment for the author, please follow the link and comment on their blog: R – TomazTsql.

R-bloggers.com offers daily e-mail updates about R news and tutorials about learning R and many other topics. Click here if you're looking to post or find an R/data-science job.
Want to share your content on R-bloggers? click here if you have a blog, or here if you don't.