PageRank implementation in Dgraph

Hi team,

Have any of you tried or implemented the PageRank or Personalised PageRank algorithm for graphs in Dgraph?
If so, could you please share a few pointers to get started?

Thanks,
Eshwar

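One way to get started is to pull the graph down with pydgraph and compute PageRank client-side in Python. Below is a sketch along those lines; it assumes the schema defines an `out` uid predicate with the @reverse directive (so `~out` walks incoming edges), so adjust the predicate name to whatever your data actually uses.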
import json
import pydgraph
from collections import defaultdict

# Connect to Dgraph
client_stub = pydgraph.DgraphClientStub('localhost:9080')
client = pydgraph.DgraphClient(client_stub)

# Function to query all nodes with their outgoing and incoming edges.
# Assumes `out` is a uid predicate declared with @reverse in the schema,
# so `~out` can be used to walk incoming edges.
def get_graph():
    query = """
    {
        nodes(func: has(dgraph.type)) {
            uid
            out { uid }
            ~out { uid }
        }
    }
    """
    res = client.txn(read_only=True).query(query)
    return json.loads(res.json)

# Function to compute PageRank
def pagerank(graph, max_iterations=100, d=0.85, epsilon=1e-6):
    nodes = graph['nodes']
    num_nodes = len(nodes)
    
    # Initialize PageRank scores
    ranks = {node['uid']: 1.0 / num_nodes for node in nodes}
    outlinks = defaultdict(list)
    for node in nodes:
        for link in node.get('out', []):
            outlinks[node['uid']].append(link['uid'])

    for _ in range(max_iterations):
        new_ranks = {}
        for node in nodes:
            rank_sum = 0
            for inlink in node.get('~out', []):
                out_degree = len(outlinks[inlink['uid']])
                if out_degree > 0:
                    rank_sum += ranks[inlink['uid']] / out_degree
            new_ranks[node['uid']] = (1 - d) / num_nodes + d * rank_sum

        # Check for convergence
        if all(abs(new_ranks[k] - ranks[k]) < epsilon for k in ranks):
            break

        ranks = new_ranks

    return ranks

# Main execution
graph_data = get_graph()
page_ranks = pagerank(graph_data)

# Output or use ranks
for node, rank in page_ranks.items():
    print(f"Node {node} has PageRank: {rank}")

# Close the client stub
client_stub.close()
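Since the question also mentions Personalised PageRank, here is a minimal variant of the same loop. The only change is that the (1 - d) teleport mass goes to a chosen seed set instead of being spread uniformly over all nodes; the seed_uids parameter is illustrative, so pass whatever source nodes you want to personalise around.

# Personalised PageRank: same iteration as pagerank() above, but the
# teleport term is concentrated on a seed set rather than uniform.
def personalized_pagerank(graph, seed_uids, max_iterations=100, d=0.85, epsilon=1e-6):
    nodes = graph['nodes']
    seeds = set(seed_uids)
    # Teleport distribution: uniform over the seeds, zero elsewhere.
    teleport = {n['uid']: (1.0 / len(seeds) if n['uid'] in seeds else 0.0)
                for n in nodes}
    ranks = dict(teleport)

    outlinks = defaultdict(list)
    for node in nodes:
        for link in node.get('out', []):
            outlinks[node['uid']].append(link['uid'])

    for _ in range(max_iterations):
        new_ranks = {}
        for node in nodes:
            rank_sum = 0.0
            for inlink in node.get('~out', []):
                out_degree = len(outlinks[inlink['uid']])
                if out_degree > 0:
                    rank_sum += ranks[inlink['uid']] / out_degree
            # Teleport only back to the seed set.
            new_ranks[node['uid']] = (1 - d) * teleport[node['uid']] + d * rank_sum

        if all(abs(new_ranks[k] - ranks[k]) < epsilon for k in ranks):
            break
        ranks = new_ranks

    return ranks

For example, personalized_pagerank(graph_data, ['0x2a']) (with a hypothetical uid) ranks nodes by their proximity to that seed node.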

Hi,

I implemented a PageRank algorithm a while back using the Dgraph Spark connector and GraphX with Scala.
The code connects to a local Dgraph Alpha at localhost:9080, computes PageRank with GraphX, and exposes a simple Akka HTTP endpoint at localhost:8085 that serves the result.

Here’s the implementation:

PageRank.scala:

package example

import org.apache.spark.graphx._
import uk.co.gresearch.spark.dgraph.graphx._

object PageRank {
  def calculatePageRank(graph: Graph[VertexProperty, EdgeProperty], tol: Double): String = {
    val ranks = graph.pageRank(tol)
    val result = ranks.vertices.collect().map {
      case (vertexId, rank) => s"Node $vertexId: $rank"
    }.mkString("\n")
    result
  }
}
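GraphX also ships a personalised variant: swapping graph.pageRank(tol) for graph.personalizedPageRank(sourceId, tol) in the wrapper above should cover the Personalised PageRank half of the original question, where sourceId is the VertexId you want the teleport biased towards.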

Main.scala:

package example

import akka.actor.ActorSystem
import akka.http.scaladsl.Http
import akka.http.scaladsl.model._
import akka.stream.ActorMaterializer
import akka.http.scaladsl.server.Directives._
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.DataFrame

object SparkInitializer {
  private var spark: SparkSession = _

  def getSparkSession(): SparkSession = {
    if (spark == null) {
      spark = SparkSession.builder()
        .appName("dg")
        .config("spark.master", "local")
        .getOrCreate()
    }
    spark
  }
}

object Main {
  def main(args: Array[String]): Unit = {

    implicit val system = ActorSystem("dg-api")
    implicit val materializer = ActorMaterializer()
    implicit val executionContext = system.dispatcher

    val spark = SparkInitializer.getSparkSession

    val target = "localhost:9080"
    
    import uk.co.gresearch.spark.dgraph.graphx._
    import org.apache.spark.graphx._
    import org.apache.spark._
    import org.apache.spark.rdd.RDD

    val graph: Graph[VertexProperty, EdgeProperty] = spark.read.option("dgraph.chunksize", 10000).dgraph.graphx(target)
    val edges: RDD[Edge[EdgeProperty]] = spark.read.dgraph.edges(target)
    val vertices: RDD[(VertexId, VertexProperty)] = spark.read.option("dgraph.chunksize", 10000).dgraph.vertices(target)

    println("********************    Num edges: ", edges.count.toString)
    println("********************    Num vertices: ", vertices.count.toString)

    val tol = 0.0001
    val pageRankResult = PageRank.calculatePageRank(graph, tol)
    
    val route =
      path("pr") {
        get {
          complete(HttpEntity(ContentTypes.`text/plain(UTF-8)`, pageRankResult))
        }
      }

    val bindingFuture = Http().bindAndHandle(route, "localhost", 8085)
    println(s"HTTP service is running at http://localhost:8085/")
  }
}

build.sbt:

ThisBuild / scalaVersion     := "2.12.14"
ThisBuild / version          := "0.1.0-SNAPSHOT"
ThisBuild / organization     := "com.example"
ThisBuild / organizationName := "example"

lazy val root = (project in file("."))
  .settings(
    name := "dg",
    libraryDependencies += "org.apache.spark" %% "spark-core" % "3.1.1",
    libraryDependencies += "org.apache.spark" %% "spark-graphx" % "3.1.1",
    libraryDependencies += "org.apache.spark" %% "spark-sql" % "3.1.1",
    libraryDependencies += "com.typesafe.akka" %% "akka-http" % "10.0.0",
    libraryDependencies += "uk.co.gresearch.spark" %% "spark-dgraph-connector" % "0.10.0-3.1"
  )
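
Once the app is running, a GET to http://localhost:8085/pr (e.g. curl http://localhost:8085/pr) returns the computed ranks as plain text.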

HTH!