Besides the challenges met while configuring the PySpark & GraphFrames environment, running my first example in the Spyder IDE also proved to be a bit more challenging than expected. Starting from an example provided in the Databricks documentation on GraphFrames, I had to add three more lines to establish the connection to the Spark cluster, respectively to deactivate the context at the end (only one SparkContext can be active per JVM).
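As a side note, the connection can likely also be established via the SparkSession builder, which creates the underlying context behind the scenes; below is a minimal sketch (the application name is my own choice). In the examples that follow I kept, however, the explicit SparkContext as in the Databricks example.
from pyspark.sql import SparkSession

#alternative setup via the builder (sketch; the app name is arbitrary)
spark = SparkSession.builder \
    .master("local") \
    .appName("graphframes-test") \
    .getOrCreate()
sc = spark.sparkContext

#... graph-related code would go here ...

#stopping the session also stops the underlying context
spark.stop()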
The following code displays the vertices and edges, respectively the in- and out-degrees, for a basic graph.
from graphframes import *
from pyspark.context import SparkContext
from pyspark.sql.session import SparkSession

#establishing a connection to the Spark cluster (code added)
sc = SparkContext('local').getOrCreate()
spark = SparkSession(sc)

# Create a Vertex DataFrame with unique ID column "id"
v = spark.createDataFrame([
  ("a", "Alice", 34),
  ("b", "Bob", 36),
  ("c", "Charlie", 30),
  ("d", "David", 29),
  ("e", "Esther", 32),
  ("f", "Fanny", 36),
  ("g", "Gabby", 60)
], ["id", "name", "age"])

# Create an Edge DataFrame with "src" and "dst" columns
e = spark.createDataFrame([
  ("a", "b", "friend"),
  ("b", "c", "follow"),
  ("c", "b", "follow"),
  ("f", "c", "follow"),
  ("e", "f", "follow"),
  ("e", "d", "friend"),
  ("d", "a", "friend"),
  ("a", "e", "friend")
], ["src", "dst", "relationship"])

# Create a GraphFrame
g = GraphFrame(v, e)

g.vertices.show()
g.edges.show()
g.inDegrees.show()
g.outDegrees.show()

#stopping the active context (code added)
sc.stop()
Output:
[the vertices, edges, in-degrees and out-degrees tables, as displayed by the four show() calls]
Notes:
Without the last line, running the code a second time will halt with the following error:
ValueError: Cannot run multiple SparkContexts at once; existing SparkContext(app=pyspark-shell, master=local) created by __init__ at D:\Work\Python\untitled0.py:4
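One way to make sure the context is stopped even when the script errors out halfway (and thus avoid the above error on the next run) is to wrap the graph-related code in a try/finally block; a minimal sketch:
from pyspark.context import SparkContext
from pyspark.sql.session import SparkSession

sc = SparkContext('local').getOrCreate()
spark = SparkSession(sc)
try:
    #graph-related code would go here
    pass
finally:
    #stop the context even if the code above raises an error
    sc.stop()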
Loading the same data from a CSV file involves a small overhead, as the schema needs to be defined explicitly. The following code should provide the same output as above:
from graphframes import *
from pyspark.context import SparkContext
from pyspark.sql.session import SparkSession
from pyspark.sql.types import *

#establishing a connection to the Spark cluster (code added)
sc = SparkContext('local').getOrCreate()
spark = SparkSession(sc)

nodes = [
  StructField("id", StringType(), True),
  StructField("name", StringType(), True),
  StructField("age", IntegerType(), True)
]

edges = [
  StructField("src", StringType(), True),
  StructField("dst", StringType(), True),
  StructField("relationship", StringType(), True)
]

v = spark.read.csv(r"D:\data\nodes.csv", header=True, schema=StructType(nodes))
e = spark.read.csv(r"D:\data\edges.csv", header=True, schema=StructType(edges))

# Create a GraphFrame
g = GraphFrame(v, e)

g.vertices.show()
g.edges.show()
g.inDegrees.show()
g.outDegrees.show()

#stopping the active context (code added)
sc.stop()
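Alternatively, if one doesn't want to define the schema upfront, the reader can be asked to infer it from the data via the inferSchema option (at the cost of an extra pass over the files); a sketch based on the same files:
#letting Spark infer the schema instead of defining it explicitly
v = spark.read.csv(r"D:\data\nodes.csv", header=True, inferSchema=True)
e = spark.read.csv(r"D:\data\edges.csv", header=True, inferSchema=True)
v.printSchema()
e.printSchema()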
The 'nodes.csv' file has the following content:
id,name,age
"a","Alice",34
"b","Bob",36
"c","Charlie",30
"d","David",29
"e","Esther",32
"f","Fanny",36
"g","Gabby",60
The 'edges.csv' file has the following content:
src,dst,relationship
"a","b","friend"
"b","c","follow"
"c","b","follow"
"f","c","follow"
"e","f","follow"
"e","d","friend"
"d","a","friend"
"a","e","friend"
Note:
There should be no spaces between the values (e.g. "a","b" and not "a", "b"), otherwise the results might deviate from expectations, as the spaces become part of the values.
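If the files already contain such spaces, the reader's ignoreLeadingWhiteSpace and ignoreTrailingWhiteSpace options should trim them while loading; a sketch based on the schemas defined above:
#trimming the whitespace around the values while reading
v = spark.read.csv(r"D:\data\nodes.csv", header=True, schema=StructType(nodes),
                   ignoreLeadingWhiteSpace=True, ignoreTrailingWhiteSpace=True)
e = spark.read.csv(r"D:\data\edges.csv", header=True, schema=StructType(edges),
                   ignoreLeadingWhiteSpace=True, ignoreTrailingWhiteSpace=True)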
Now, one can go and test further operations on the graph thus created:
#filtering edges
gl = g.edges.filter("relationship = 'follow'").sort("src")
gl.show()
print("number edges: ", gl.count())

#filtering vertices
#gl = g.vertices.filter("age >= 30 and age<40").sort("id")
#gl.show()
#print("number vertices: ", gl.count())

# relationships involving edges and vertices
#motifs = g.find("(a)-[e]->(b); (b)-[e2]->(a)")
#motifs.show()
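From here one can also try the built-in graph algorithms; the following sketch, based on my understanding of the GraphFrames API, runs PageRank and a breadth-first search on the same graph:
#PageRank on the graph (sketch)
results = g.pageRank(resetProbability=0.15, maxIter=10)
results.vertices.sort("pagerank", ascending=False).show()

#breadth-first search from Esther to people younger than 32 (sketch)
paths = g.bfs("name = 'Esther'", "age < 32")
paths.show()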
Happy coding!