What is a Graph?
A graph is a representation of the directed and undirected graph concepts from mathematical graph theory. It is an abstract data type (and the data structure that implements it) consisting of a finite set of vertices together with a set of vertex pairs called edges: unordered pairs for an undirected graph and ordered pairs for a directed graph.
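As a minimal illustration (plain Scala, e.g. in the Scala REPL; the vertex IDs and edges here are made up for the sketch), a directed graph can be modeled as a finite set of vertices plus a set of ordered vertex pairs:

// Vertices of the graph.
val vertices: Set[Long] = Set(1L, 2L, 3L)
// Ordered pairs => directed edges; an undirected graph would use
// unordered pairs (e.g. Set(1L, 2L)) instead.
val edges: Set[(Long, Long)] = Set((1L, 2L), (2L, 3L), (3L, 1L))
println(s"|V| = ${vertices.size}, |E| = ${edges.size}")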
What is Apache Spark GraphX?
Apache Spark GraphX is a distributed graph processing framework that processes graphs in parallel. It provides a collection of graph algorithms and builders that make graph analytics tasks easy. GraphX extends the Spark RDD with a new Graph abstraction: a directed multigraph, called a property graph, with user-defined objects attached to each vertex and edge.
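A minimal sketch of the property graph abstraction is shown below; the usernames, the relationship labels, and the local master setting are made-up values for illustration:

import org.apache.spark.graphx.{Edge, Graph, VertexId}
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.SparkSession

object PropertyGraphSketch {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder
      .appName("PropertyGraphSketch")
      .master("local[*]") // assumption: run locally for the sketch
      .getOrCreate()
    val sc = spark.sparkContext

    // Vertices carry a user-defined object (here, a username string).
    val users: RDD[(VertexId, String)] =
      sc.parallelize(Seq((1L, "alice"), (2L, "bob"), (3L, "carol")))
    // Edges carry a user-defined object too (here, a relationship label).
    val follows: RDD[Edge[String]] =
      sc.parallelize(Seq(Edge(1L, 2L, "follows"), Edge(2L, 3L, "follows")))

    // The property graph built from the two RDDs.
    val graph: Graph[String, String] = Graph(users, follows)
    println(s"vertices = ${graph.vertices.count()}, edges = ${graph.edges.count()}")

    spark.stop()
  }
}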
Apache Spark GraphX Features
Apache Spark GraphX provides the following features.
1. Flexibility
Apache Spark GraphX is capable of working with graphs and performing computations on them. Spark GraphX can be used for ETL processing, iterative graph computation, exploratory analysis, and so on. The same data can be viewed both as a collection and as a graph, and the transformation and joining of that data can be performed efficiently with Spark RDDs, as the sketch below shows.
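Continuing the hypothetical property-graph sketch above (it assumes the `graph` value built there), the same data is reachable both as collections (RDDs) and through graph operators:

// Collection view: vertices and edges are ordinary RDDs.
val vertexView = graph.vertices   // RDD[(VertexId, String)]
val edgeView   = graph.edges      // RDD[Edge[String]]
// Graph view: structural operators work on the graph as a whole.
val reversed   = graph.reverse    // flips the direction of every edge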
2. Speed
Apache Spark GraphX delivers performance comparable to the fastest specialized graph processing systems, and because it runs on Spark it inherits Spark's features such as fault tolerance, flexibility, and ease of use by default.
3. Algorithms
Apache Spark GraphX provides the following graph algorithms.
- PageRank
- Connected Components
- Label Propagation
- SVD++
- Strongly Connected Components
- Triangle Count
Background on Graph-Parallel Computation
Graph data keeps growing day after day, and given its importance, new graph processing systems are being developed. These systems use new techniques to partition and process graphs in a distributed fashion. Graph algorithms running on these specialized systems respond much faster than they do on common data processing systems.
Apache Spark GraphX Project Goal
Existing graph processing systems rely on complicated programming models that lead to heavy data movement and duplication. To overcome this issue, the Apache Spark GraphX project aims to unify data-parallel and graph-parallel processing in a single system with one composable API. The Apache Spark GraphX API makes it possible to view data as collections as well as graphs without any duplication or data movement.
Graph Algorithms
Apache Spark GraphX provides a set of graph algorithms to simplify analytics tasks. These algorithms are contained in the org.apache.spark.graphx.lib package and can be accessed directly as methods on Graph via GraphOps.
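To illustrate the two call styles (a minimal sketch that assumes a `graph` value like the one built earlier), an algorithm such as PageRank can be invoked either through its GraphOps method or by calling the implementation in org.apache.spark.graphx.lib directly:

import org.apache.spark.graphx.lib.PageRank

// Convenient form: GraphOps exposes the algorithm as a method on Graph.
val ranksViaOps = graph.pageRank(tol = 0.0001).vertices
// Equivalent direct form: call the implementation in graphx.lib.
val ranksViaLib = PageRank.runUntilConvergence(graph, tol = 0.0001).vertices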
1. PageRank Algorithm
The PageRank algorithm measures the importance of each vertex in a graph, on the assumption that an edge from u to v represents an endorsement of v's importance by u. The classic example is Twitter: if a Twitter user is followed by many others, that user will be ranked highly.
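Concretely, GraphX's implementation (with its default reset probability of 0.15) updates each vertex's rank per iteration roughly as: PR(v) = 0.15 + 0.85 × Σ PR(u) / outDegree(u), where the sum runs over every vertex u with an edge u → v.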
Let us understand the PageRank algorithm with the help of a social networking example.
PageRank Program Execution
We have two datasets: one for users and another for each user's followers. We will compute the PageRank of each user as follows.
Datasets & Program Details
Datasets for this program are present under the SPARK_HOME directory.
data/graphx/followers.txt
data/graphx/users.txt
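As the code below implies, followers.txt is an edge list file with one edge per line, given as a pair of numeric user IDs separated by whitespace, while each line of users.txt is comma-separated, with the numeric user ID as its first field and the username as its second.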
The location of the PageRankExample.scala program is shown below.
/home/cloudduggu/spark/examples/src/main/scala/org/apache/spark/examples/graphx/PageRankExample.scala
Run the PageRankExample.scala program from SPARK_HOME.
./bin/run-example org.apache.spark.examples.graphx.PageRankExample
Let us understand the code step by step and then execute it in the terminal.
- Import Spark's GraphX and SQL packages.
- Create a SparkSession.
- Load the edges as a graph.
- Run PageRank.
- Join the ranks with the usernames.
- Print the result.
import org.apache.spark.graphx.GraphLoader
import org.apache.spark.sql.SparkSession

object PageRankExample {
  def main(args: Array[String]): Unit = {
    // Create a SparkSession.
    val spark = SparkSession.builder
      .appName(s"${this.getClass.getSimpleName}")
      .getOrCreate()
    val sc = spark.sparkContext

    // Load the edges as a graph.
    val graph = GraphLoader.edgeListFile(sc, "data/graphx/followers.txt")
    // Run PageRank until convergence, with a tolerance of 0.0001.
    val ranks = graph.pageRank(0.0001).vertices
    // Parse the users file into (userId, username) pairs.
    val users = sc.textFile("data/graphx/users.txt").map { line =>
      val fields = line.split(",")
      (fields(0).toLong, fields(1))
    }
    // Join the ranks with the usernames.
    val ranksByUsername = users.join(ranks).map {
      case (id, (username, rank)) => (username, rank)
    }
    // Print the result.
    println(ranksByUsername.collect().mkString("\n"))

    spark.stop()
  }
}
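As a side note, graph.pageRank(0.0001) runs the dynamic version of the algorithm until the ranks converge within the given tolerance. GraphOps also offers a fixed-iteration variant, sketched below (it assumes the `graph` value from the program above; the iteration count 10 is an arbitrary choice for illustration):

// Alternative: run exactly 10 iterations instead of running to convergence.
val staticRanks = graph.staticPageRank(10).vertices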
2. Connected Components Algorithm
A connected component is a subgraph in which every vertex can reach every other vertex by following edges. The connected components algorithm labels each connected component of the graph with the ID of its lowest-numbered vertex. For example, in a graph whose only edges are 1–2 and 3–4, vertices 1 and 2 are labeled 1 while vertices 3 and 4 are labeled 3.
Connected Components Program Execution
We have two datasets: one for users and another for each user's followers. We will compute the connected component of each user as follows.
Datasets & Program Details
Datasets for this program are present under the SPARK_HOME directory.
data/graphx/followers.txt
data/graphx/users.txt
The location of the ConnectedComponentsExample.scala program is shown below.
/home/cloudduggu/spark/examples/src/main/scala/org/apache/spark/examples/graphx/ConnectedComponentsExample.scala
Run the ConnectedComponentsExample.scala program from SPARK_HOME.
./bin/run-example org.apache.spark.examples.graphx.ConnectedComponentsExample
Let us understand the code step by step and then execute it in the terminal.
- Import Spark's GraphX and SQL packages.
- Create a SparkSession.
- Load the graph as in the PageRank example.
- Find the connected components.
- Join the connected components with the usernames.
- Print the result.
import org.apache.spark.graphx.GraphLoader
import org.apache.spark.sql.SparkSession

object ConnectedComponentsExample {
  def main(args: Array[String]): Unit = {
    // Create a SparkSession.
    val spark = SparkSession.builder
      .appName(s"${this.getClass.getSimpleName}")
      .getOrCreate()
    val sc = spark.sparkContext

    // Load the graph as in the PageRank example.
    val graph = GraphLoader.edgeListFile(sc, "data/graphx/followers.txt")
    // Find the connected components; each vertex is labeled with its component ID.
    val cc = graph.connectedComponents().vertices
    // Parse the users file into (userId, username) pairs.
    val users = sc.textFile("data/graphx/users.txt").map { line =>
      val fields = line.split(",")
      (fields(0).toLong, fields(1))
    }
    // Join the connected components with the usernames.
    val ccByUsername = users.join(cc).map {
      case (id, (username, cc)) => (username, cc)
    }
    // Print the result.
    println(ccByUsername.collect().mkString("\n"))

    spark.stop()
  }
}
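As a small hypothetical follow-up (it reuses the `cc` vertices computed in the program above), the size of each component can be obtained with a plain RDD aggregation, which again shows the collection view of graph results:

// Count how many vertices fall into each connected component.
val componentSizes = cc
  .map { case (_, componentId) => (componentId, 1L) }
  .reduceByKey(_ + _)
println(componentSizes.collect().mkString("\n"))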
3. Triangle Counting Algorithm
A vertex is part of a triangle when it has two adjacent vertices with an edge between them. The Triangle Counting algorithm determines the number of triangles passing through each vertex, which gives a measure of how tightly clustered that vertex's neighborhood is.
Triangle Counting Program Execution
We have two datasets: one for users and another for each user's followers. We will compute the triangle count of each user as follows.
Datasets & Program Details
Datasets for this program are present under the SPARK_HOME directory.
data/graphx/followers.txt
data/graphx/users.txt
The location of the TriangleCountingExample.scala program is shown below.
/home/cloudduggu/spark/examples/src/main/scala/org/apache/spark/examples/graphx/TriangleCountingExample.scala
Run the TriangleCountingExample.scala program from SPARK_HOME.
./bin/run-example org.apache.spark.examples.graphx.TriangleCountingExample
Let us understand the code step by step and then execute it in the terminal.
- Import Spark's GraphX and SQL packages.
- Create a SparkSession.
- Load the edges in canonical order and partition the graph as required for triangle counting.
- Find the triangle count for each vertex.
- Join the triangle counts with the usernames.
- Print the result.
import org.apache.spark.graphx.{GraphLoader, PartitionStrategy}
import org.apache.spark.sql.SparkSession

object TriangleCountingExample {
  def main(args: Array[String]): Unit = {
    // Create a SparkSession.
    val spark = SparkSession.builder
      .appName(s"${this.getClass.getSimpleName}")
      .getOrCreate()
    val sc = spark.sparkContext

    // Load the edges in canonical order (srcId < dstId) and partition
    // the graph, as required by the triangle count algorithm.
    val graph = GraphLoader.edgeListFile(sc, "data/graphx/followers.txt", true)
      .partitionBy(PartitionStrategy.RandomVertexCut)
    // Find the triangle count for each vertex.
    val triCounts = graph.triangleCount().vertices
    // Parse the users file into (userId, username) pairs.
    val users = sc.textFile("data/graphx/users.txt").map { line =>
      val fields = line.split(",")
      (fields(0).toLong, fields(1))
    }
    // Join the triangle counts with the usernames.
    val triCountByUsername = users.join(triCounts).map { case (id, (username, tc)) =>
      (username, tc)
    }
    // Print the result.
    println(triCountByUsername.collect().mkString("\n"))

    spark.stop()
  }
}
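A note on the two extra steps in this example: triangleCount() requires the edges to be in canonical orientation (srcId < dstId), which is why edgeListFile is called with its canonicalOrientation flag set to true, and it requires the graph to have been partitioned with Graph.partitionBy, which is why the RandomVertexCut strategy is applied before counting.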