Hi team,
Have any of you tried or implemented the PageRank or Personalised PageRank algorithm for graphs in Dgraph?
If so, could you please share a few pointers to get started?
Thanks,
Eshwar
import json
from collections import defaultdict

import pydgraph

# Connect to Dgraph
client_stub = pydgraph.DgraphClientStub('localhost:9080')
client = pydgraph.DgraphClient(client_stub)

# Query all nodes with their outgoing and incoming edges.
# This assumes an example predicate `out` declared with the @reverse
# directive in the schema, so that ~out can be traversed.
def get_graph():
    query = """
    {
      nodes(func: has(dgraph.type)) {
        uid
        out {
          uid
        }
        ~out {
          uid
        }
      }
    }
    """
    res = client.txn(read_only=True).query(query)
    return json.loads(res.json)

# Compute PageRank iteratively with damping factor d
def pagerank(graph, max_iterations=100, d=0.85, epsilon=1e-6):
    nodes = graph['nodes']
    num_nodes = len(nodes)

    # Initialize PageRank scores uniformly
    ranks = {node['uid']: 1.0 / num_nodes for node in nodes}

    # Build the adjacency list of outgoing links
    outlinks = defaultdict(list)
    for node in nodes:
        for link in node.get('out', []):
            outlinks[node['uid']].append(link['uid'])

    for _ in range(max_iterations):
        new_ranks = {}
        for node in nodes:
            rank_sum = 0
            for inlink in node.get('~out', []):
                out_degree = len(outlinks[inlink['uid']])
                if out_degree > 0:
                    rank_sum += ranks[inlink['uid']] / out_degree
            new_ranks[node['uid']] = (1 - d) / num_nodes + d * rank_sum
        # Check for convergence before starting another iteration
        converged = all(abs(new_ranks[k] - ranks[k]) < epsilon for k in ranks)
        ranks = new_ranks
        if converged:
            break
    return ranks

# Main execution
graph_data = get_graph()
page_ranks = pagerank(graph_data)

# Output or use ranks
for node, rank in page_ranks.items():
    print(f"Node {node} has PageRank: {rank}")

# Close the client stub
client_stub.close()
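Since the question also mentions Personalised PageRank, here is a minimal sketch of how the same loop could be biased toward a set of seed nodes, assuming the same graph_data structure returned by get_graph() above. The source_uids parameter is hypothetical: you would pass in whatever uids you want to personalise on.

# Personalised PageRank sketch: random jumps land only on the given
# seed nodes instead of being spread uniformly over the whole graph.
# `source_uids` is a hypothetical input: a set of uids to personalise on.
def personalized_pagerank(graph, source_uids, max_iterations=100, d=0.85, epsilon=1e-6):
    nodes = graph['nodes']
    # Teleport distribution: all jump probability mass sits on the seeds
    teleport = {node['uid']: (1.0 / len(source_uids) if node['uid'] in source_uids else 0.0)
                for node in nodes}
    ranks = dict(teleport)  # start the walk at the seeds

    outlinks = defaultdict(list)
    for node in nodes:
        for link in node.get('out', []):
            outlinks[node['uid']].append(link['uid'])

    for _ in range(max_iterations):
        new_ranks = {}
        for node in nodes:
            rank_sum = 0
            for inlink in node.get('~out', []):
                out_degree = len(outlinks[inlink['uid']])
                if out_degree > 0:
                    rank_sum += ranks[inlink['uid']] / out_degree
            # Jump back to the seeds with probability (1 - d)
            new_ranks[node['uid']] = (1 - d) * teleport[node['uid']] + d * rank_sum
        converged = all(abs(new_ranks[k] - ranks[k]) < epsilon for k in ranks)
        ranks = new_ranks
        if converged:
            break
    return ranks

For example, personalized_pagerank(graph_data, {'0x1'}) ranks every node by its relevance to the node with uid 0x1.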
Hi,
I implemented a PageRank algorithm in the past using the Dgraph Spark connector and GraphX with Scala. The code connects to a local Dgraph Alpha instance at localhost:9080 and exposes a simple HTTP API (using Akka HTTP) at localhost:8085, which can be called to run the PageRank computation.
Here’s the implementation:
PageRank.scala:
package example

import org.apache.spark.graphx._
import uk.co.gresearch.spark.dgraph.graphx._

object PageRank {
  // Run GraphX's built-in PageRank until scores move by less than `tol`
  def calculatePageRank(graph: Graph[VertexProperty, EdgeProperty], tol: Double): String = {
    val ranks = graph.pageRank(tol)
    ranks.vertices.collect().map {
      case (vertexId, rank) => s"Node $vertexId: $rank"
    }.mkString("\n")
  }
}
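For the Personalised variant from the original question, GraphX also ships personalizedPageRank(src, tol) on the same Graph; it biases the random jumps toward the given source vertex, so it should drop in for pageRank in the method above.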
Main.scala:
package example

import akka.actor.ActorSystem
import akka.http.scaladsl.Http
import akka.http.scaladsl.model._
import akka.http.scaladsl.server.Directives._
import akka.stream.ActorMaterializer
import org.apache.spark.graphx._
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.SparkSession
import uk.co.gresearch.spark.dgraph.graphx._

object SparkInitializer {
  private var spark: SparkSession = _

  // Lazily create a single local SparkSession and reuse it
  def getSparkSession(): SparkSession = {
    if (spark == null) {
      spark = SparkSession.builder()
        .appName("dg")
        .config("spark.master", "local")
        .getOrCreate()
    }
    spark
  }
}

object Main {
  def main(args: Array[String]): Unit = {
    implicit val system = ActorSystem("dg-api")
    implicit val materializer = ActorMaterializer()
    implicit val executionContext = system.dispatcher

    val spark = SparkInitializer.getSparkSession()
    val target = "localhost:9080"

    // Load the Dgraph cluster's data as a GraphX graph
    val graph: Graph[VertexProperty, EdgeProperty] =
      spark.read.option("dgraph.chunksize", 10000).dgraph.graphx(target)
    val edges: RDD[Edge[EdgeProperty]] = spark.read.dgraph.edges(target)
    val vertices: RDD[(VertexId, VertexProperty)] =
      spark.read.option("dgraph.chunksize", 10000).dgraph.vertices(target)
    println("******************** Num edges: " + edges.count)
    println("******************** Num vertices: " + vertices.count)

    // Compute PageRank once up front and serve the cached result
    val tol = 0.0001
    val pageRankResult = PageRank.calculatePageRank(graph, tol)

    val route =
      path("pr") {
        get {
          complete(HttpEntity(ContentTypes.`text/plain(UTF-8)`, pageRankResult))
        }
      }

    val bindingFuture = Http().bindAndHandle(route, "localhost", 8085)
    println("HTTP service is running at http://localhost:8085/")
  }
}
build.sbt:
ThisBuild / scalaVersion := "2.12.14"
ThisBuild / version := "0.1.0-SNAPSHOT"
ThisBuild / organization := "com.example"
ThisBuild / organizationName := "example"

lazy val root = (project in file("."))
  .settings(
    name := "dg",
    libraryDependencies += "org.apache.spark" %% "spark-core" % "3.1.1",
    libraryDependencies += "org.apache.spark" %% "spark-graphx" % "3.1.1",
    libraryDependencies += "org.apache.spark" %% "spark-sql" % "3.1.1",
    libraryDependencies += "com.typesafe.akka" %% "akka-http" % "10.0.0",
    libraryDependencies += "uk.co.gresearch.spark" %% "spark-dgraph-connector" % "0.10.0-3.1"
  )
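Once the service is up, something like curl http://localhost:8085/pr should return one "Node <id>: <rank>" line per vertex.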
HTH!