Advent of 2021, Day 20 – Spark GraphX processing
This article is originally published at https://tomaztsql.wordpress.com
Series of Apache Spark posts:
- Dec 01: What is Apache Spark
- Dec 02: Installing Apache Spark
- Dec 03: Getting around CLI and WEB UI in Apache Spark
- Dec 04: Spark Architecture – Local and cluster mode
- Dec 05: Setting up Spark Cluster
- Dec 06: Setting up IDE
- Dec 07: Starting Spark with R and Python
- Dec 08: Creating RDD files
- Dec 09: RDD Operations
- Dec 10: Working with data frames
- Dec 11: Working with packages and spark DataFrames
- Dec 12: Spark SQL
- Dec 13: Spark SQL Bucketing and partitioning
- Dec 14: Spark SQL query hints and executions
- Dec 15: Introduction to Spark Streaming
- Dec 16: Dataframe operations for Spark streaming
- Dec 17: Watermarking and joins for Spark streaming
- Dec 18: Time windows for Spark streaming
- Dec 19: Data engineering for Spark streaming
GraphX is Spark’s API component for graphs and graph-parallel computation. GraphX builds on Spark RDDs and adds a graph abstraction on top of them: a directed multigraph with properties attached to each vertex and edge.
GraphX supports graph computation and exposes a set of fundamental operators (subgraph, joinVertices, aggregateMessages), and it also includes a growing collection of graph algorithms that simplify ETL and analytical tasks.
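As a quick taste of one of these operators, here is a minimal sketch using subgraph; it assumes a graph value of type Graph[(String, String), String] with (name, role) vertex attributes, like the one constructed later in this post:
import org.apache.spark.graphx._
// Assumption: `graph` is a Graph[(String, String), String] whose vertex
// attribute is a (name, role) pair, as constructed later in this post.
// Keep only vertices whose role is not "Missing"; edges are kept only
// when both of their endpoints survive the vertex predicate.
val validGraph = graph.subgraph(vpred = (id, attr) => attr._2 != "Missing")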
Spark GraphX provides the following features:
– Flexibility: view the same data as both graphs and collections, transform and join graphs with RDDs efficiently, and write custom iterative graph algorithms using Google’s Pregel API,
– Computational speed: performance comparable to the fastest specialised graph-processing systems, while retaining flexibility and fault tolerance,
– Graph algorithms: a set of popular algorithms for common business cases, including PageRank, connected components, label propagation, SVD++, strongly connected components, and triangle count (a minimal PageRank sketch follows this list).
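As an illustration of the last point, here is a self-contained sketch that runs PageRank on a randomly generated graph; it assumes a SparkContext named sc is already available (e.g., in spark-shell), and the tolerance 0.0001 is an arbitrary choice for the example:
import org.apache.spark.graphx._
import org.apache.spark.graphx.util.GraphGenerators
// Generate a small random graph with a log-normal out-degree distribution.
val randomGraph: Graph[Long, Int] = GraphGenerators.logNormalGraph(sc, numVertices = 100)
// Run PageRank until convergence within the given tolerance and
// print the ten highest-ranked vertices as (VertexId, rank) pairs.
val ranks = randomGraph.pageRank(0.0001).vertices
ranks.sortBy(_._2, ascending = false).take(10).foreach(println)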
To get started, use the following Scala commands:
import org.apache.spark._
import org.apache.spark.graphx._
// To make some of the examples work we will also need RDD
import org.apache.spark.rdd.RDD
import org.apache.spark.util.IntParam
import org.apache.spark.graphx.util.GraphGenerators
Property graph
A property graph is a directed multigraph with user-defined objects attached to each vertex and edge. A directed multigraph is a directed graph with potentially multiple parallel edges sharing the same source and destination vertex. Supporting parallel edges simplifies modelling scenarios where there can be multiple relationships (e.g., co-worker and friend) between the same pair of vertices. Each vertex is keyed by a unique 64-bit long identifier (VertexId). GraphX does not impose any ordering constraints on the vertex identifiers.
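To make the parallel-edge point concrete, here is a short sketch (the names and IDs are invented for illustration; a SparkContext named sc is assumed) in which two different relationships connect the same pair of vertices:
import org.apache.spark.graphx._
import org.apache.spark.rdd.RDD
// Two parallel edges share the same source (1L) and destination (2L)
// but carry different relationship labels.
val people: RDD[(VertexId, String)] =
  sc.parallelize(Seq((1L, "alice"), (2L, "bob")))
val rels: RDD[Edge[String]] =
  sc.parallelize(Seq(Edge(1L, 2L, "co-worker"), Edge(1L, 2L, "friend")))
val multigraph = Graph(people, rels)
multigraph.edges.count  // returns 2: both parallel edges are kept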
The property graph is parameterized over the vertex (VD) and edge (ED) types. Defining a property graph:
class VertexProperty()
case class UserProperty(val name: String) extends VertexProperty
case class ProductProperty(val name: String, val price: Double) extends VertexProperty
// The graph might then have the type:
var graph: Graph[VertexProperty, String] = null
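A hedged sketch of how such a graph could be populated (the user, product, and "purchased" relationship are invented for illustration; a SparkContext named sc is assumed):
import org.apache.spark.graphx._
import org.apache.spark.rdd.RDD
// Mix the two vertex property types under their common VertexProperty base.
val props: RDD[(VertexId, VertexProperty)] = sc.parallelize(Seq(
  (1L, UserProperty("alice")),
  (2L, ProductProperty("book", 19.99))
))
// The edge attribute is a plain String, matching Graph[VertexProperty, String].
val links: RDD[Edge[String]] = sc.parallelize(Seq(Edge(1L, 2L, "purchased")))
graph = Graph(props, links)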
Since a graph is based on RDDs, which are immutable and fault-tolerant, the graph behaves the same way as an RDD. For example, a property graph with (String, String) vertex attributes would be declared as:
val userGraph: Graph[(String, String), String]
With the SparkContext in place, the graph can be constructed from a vertex RDD and an edge RDD:
// Assume the SparkContext has already been constructed
val sc: SparkContext
// Create an RDD for the vertices
val users: RDD[(VertexId, (String, String))] =
sc.parallelize(Seq((3L, ("rxin", "student")), (7L, ("jgonzal", "postdoc")),
(5L, ("franklin", "prof")), (2L, ("istoica", "prof"))))
// Create an RDD for edges
val relationships: RDD[Edge[String]] =
sc.parallelize(Seq(Edge(3L, 7L, "collab"), Edge(5L, 3L, "advisor"),
Edge(2L, 5L, "colleague"), Edge(5L, 7L, "pi")))
// Define a default user in case there are relationships with a missing user
val defaultUser = ("John Doe", "Missing")
// Build the initial Graph
val graph = Graph(users, relationships, defaultUser)
// graph now has the type Graph[(String, String), String]
// Count all users which are postdocs
graph.vertices.filter { case (id, (name, pos)) => pos == "postdoc" }.count
// Count all the edges where src > dst
graph.edges.filter(e => e.srcId > e.dstId).count
// Alternatively, use the Edge case class constructor to count edges
graph.edges.filter { case Edge(src, dst, prop) => src > dst }.count
GraphX also provides a triplet view, which logically joins the vertex and edge properties into an RDD[EdgeTriplet[VD, ED]]. This join can be expressed as a SQL query:
SELECT src.id, dst.id, src.attr, e.attr, dst.attr
FROM edges AS e LEFT JOIN vertices AS src, vertices AS dst
ON e.srcId = src.Id AND e.dstId = dst.Id
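GraphX exposes this view directly as graph.triplets. Using the graph constructed above, each triplet can be mapped to a readable fact (the sentence template is just for illustration):
import org.apache.spark.rdd.RDD
// Each EdgeTriplet carries srcAttr, attr, and dstAttr;
// srcAttr._1 is the user name from the (name, role) vertex attribute.
val facts: RDD[String] =
  graph.triplets.map(triplet =>
    triplet.srcAttr._1 + " is the " + triplet.attr + " of " + triplet.dstAttr._1)
facts.collect.foreach(println)  // e.g. "rxin is the collab of jgonzal"
This triplets-based map is the Scala analogue of the SQL join above.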
Tomorrow we will look into a couple of GraphX operators.
The complete set of code, documents, notebooks, and all of the materials will be available at the GitHub repository: https://github.com/tomaztk/Spark-for-data-engineers
Happy Spark Advent of 2021!