Source code for GraphRicciCurvature.FormanRicci
"""
A class to compute the Forman-Ricci curvature of a given NetworkX graph.
"""
# Author:
# Chien-Chun Ni
# http://www3.cs.stonybrook.edu/~chni/
# Reference:
# Forman. 2003. “Bochner’s Method for Cell Complexes and Combinatorial Ricci Curvature.”
# Discrete & Computational Geometry 29 (3). Springer-Verlag: 323–74.
# Sreejith, R. P., Karthikeyan Mohanraj, Jürgen Jost, Emil Saucan, and Areejit Samal. 2016.
# “Forman Curvature for Complex Networks.” Journal of Statistical Mechanics: Theory and Experiment 2016 (6).
# IOP Publishing: 063206.
# Samal, A., Sreejith, R.P., Gu, J. et al.
# "Comparative analysis of two discretizations of Ricci curvature for complex networks."
# Scientific Report 8, 8650 (2018).
import networkx as nx
import math
from .util import logger, set_verbose
[docs]class FormanRicci:
[docs] def __init__(self, G: nx.Graph, weight="weight", method="augmented", verbose="ERROR"):
"""A class to compute Forman-Ricci curvature for all nodes and edges in G.
Parameters
----------
G : NetworkX graph
A given NetworkX graph, unweighted graph only for now, edge weight will be ignored.
weight : str
The edge weight used to compute Ricci curvature. (Default value = "weight")
method : {"1d", "augmented"}
The method used to compute Forman-Ricci curvature. (Default value = "augmented")
- "1d": Computed with 1-dimensional simplicial complex (vertex, edge).
- "augmented": Computed with 2-dimensional simplicial complex, length <=3 (vertex, edge, face).
verbose: {"INFO","DEBUG","ERROR"}
Verbose level. (Default value = "ERROR")
- "INFO": show only iteration process log.
- "DEBUG": show all output logs.
- "ERROR": only show log if error happened.
"""
self.G = G.copy()
self.weight = weight
self.method = method
if not nx.get_edge_attributes(self.G, self.weight):
logger.info('Edge weight not detected in graph, use "weight" as default edge weight.')
for (v1, v2) in self.G.edges():
self.G[v1][v2][self.weight] = 1.0
if not nx.get_node_attributes(self.G, self.weight):
logger.info('Node weight not detected in graph, use "weight" as default node weight.')
for v in self.G.nodes():
self.G.nodes[v][self.weight] = 1.0
if self.G.is_directed():
logger.info("Forman-Ricci curvature is not supported for directed graph yet, "
"covert input graph to undirected.")
self.G = self.G.to_undirected()
set_verbose(verbose)
[docs] def compute_ricci_curvature(self):
"""Compute Forman-ricci curvature for all nodes and edges in G.
Node curvature is defined as the average of all it's adjacency edge.
Returns
-------
G: NetworkX graph
A NetworkX graph with "formanCurvature" on nodes and edges.
Examples
--------
To compute the Forman-Ricci curvature for karate club graph:
>>> G = nx.karate_club_graph()
>>> frc = FormanRicci(G)
>>> frc.compute_ricci_curvature()
>>> frc.G[0][2]
{'weight': 1.0, 'formanCurvature': -7.0}
"""
if self.method == "1d":
# Edge Forman curvature
for (v1, v2) in self.G.edges():
v1_nbr = set(self.G.neighbors(v1))
v1_nbr.remove(v2)
v2_nbr = set(self.G.neighbors(v2))
v2_nbr.remove(v1)
w_e = self.G[v1][v2][self.weight]
w_v1 = self.G.nodes[v1][self.weight]
w_v2 = self.G.nodes[v2][self.weight]
ev1_sum = sum([w_v1 / math.sqrt(w_e * self.G[v1][v][self.weight]) for v in v1_nbr])
ev2_sum = sum([w_v2 / math.sqrt(w_e * self.G[v2][v][self.weight]) for v in v2_nbr])
self.G[v1][v2]["formanCurvature"] = w_e * (w_v1 / w_e + w_v2 / w_e - (ev1_sum + ev2_sum))
logger.debug("Source: %s, target: %d, Forman-Ricci curvature = %f " % (
v1, v2, self.G[v1][v2]["formanCurvature"]))
elif self.method == "augmented":
# Edge Forman curvature
for (v1, v2) in self.G.edges():
v1_nbr = set(self.G.neighbors(v1))
v1_nbr.remove(v2)
v2_nbr = set(self.G.neighbors(v2))
v2_nbr.remove(v1)
face = v1_nbr & v2_nbr
# prl_nbr = (v1_nbr | v2_nbr) - face
w_e = self.G[v1][v2][self.weight]
w_f = 1 # Assume all face have weight 1
w_v1 = self.G.nodes[v1][self.weight]
w_v2 = self.G.nodes[v2][self.weight]
sum_ef = sum([w_e / w_f for _ in face])
sum_ve = sum([w_v1 / w_e + w_v2 / w_e])
# sum_ehef = sum([math.sqrt(w_e*self.G[v1][v][self.weight])/w_f +
# math.sqrt(w_e*self.G[v2][v][self.weight])/w_f
# for v in face])
sum_ehef = 0 # Always 0 for cycle = 3 case.
sum_veeh = sum([w_v1 / math.sqrt(w_e * self.G[v1][v][self.weight]) for v in (v1_nbr - face)] +
[w_v2 / math.sqrt(w_e * self.G[v2][v][self.weight]) for v in (v2_nbr - face)])
self.G[v1][v2]["formanCurvature"] = w_e * (sum_ef + sum_ve - math.fabs(sum_ehef - sum_veeh))
logger.debug("Source: %s, target: %d, Forman-Ricci curvature = %f " % (
v1, v2, self.G[v1][v2]["formanCurvature"]))
else:
assert True, 'Method %s not available. Support methods: {"1d","augmented"}' % self.method
# Node Forman curvature
for n in self.G.nodes():
fcsum = 0 # sum of the neighbor Forman curvature
if self.G.degree(n) != 0:
for nbr in self.G.neighbors(n):
if 'formanCurvature' in self.G[n][nbr]:
fcsum += self.G[n][nbr]['formanCurvature']
# assign the node Forman curvature to be the average of node's adjacency edges
self.G.nodes[n]['formanCurvature'] = fcsum / self.G.degree(n)
else:
self.G.nodes[n]['formanCurvature'] = fcsum
logger.debug("node %d, Forman Curvature = %f" % (n, self.G.nodes[n]['formanCurvature']))
logger.debug("Forman curvature (%s) computation done." % self.method)