Source code for GraphRicciCurvature.util

import logging
import community as community_louvain
import networkx as nx
import numpy as np
from functools import partial, partialmethod

# Register a custom TRACE log level on the standard ``logging`` module.
# Its numeric value is DEBUG + 5, i.e. more verbose than INFO but less
# verbose than DEBUG.
logging.TRACE = logging.DEBUG + 5
logging.addLevelName(logging.TRACE, 'TRACE')
# Give every Logger a ``trace(msg, *args, **kwargs)`` method that forwards to
# ``Logger.log`` with the TRACE level pre-bound.
logging.Logger.trace = partialmethod(logging.Logger.log, logging.TRACE)
# Module-level counterpart, ``logging.trace(...)``, built on ``logging.log``.
logging.trace = partial(logging.log, logging.TRACE)

# Package-wide logger shared by the functions in this module.
logger = logging.getLogger("GraphRicciCurvature")


def set_verbose(verbose="ERROR"):
    """Set up the verbose level of the GraphRicciCurvature.

    Parameters
    ----------
    verbose : {"INFO", "TRACE", "DEBUG", "ERROR"}
        Verbose level. (Default value = "ERROR")
            - "INFO": show only iteration process log.
            - "TRACE": show detailed iteration process log.
            - "DEBUG": show all output logs.
            - "ERROR": only show log if error happened.

    Notes
    -----
    Any other value prints a notice to stdout and falls back to "ERROR".
    """
    levels = {
        "INFO": logging.INFO,
        "TRACE": logging.TRACE,
        "DEBUG": logging.DEBUG,
        "ERROR": logging.ERROR,
    }

    # Guard with isinstance so unhashable arguments take the fallback path
    # instead of raising TypeError from the dict lookup.
    level = levels.get(verbose) if isinstance(verbose, str) else None

    if level is None:
        # The message lists every supported option, including "TRACE".
        print('Incorrect verbose level, option:["INFO","TRACE","DEBUG","ERROR"], use "ERROR" instead.')
        level = logging.ERROR

    logger.setLevel(level)
def cut_graph_by_cutoff(G_origin, cutoff, weight="weight"):
    """Remove graph's edges with "weight" greater than "cutoff".

    Parameters
    ----------
    G_origin : NetworkX graph
        A graph with ``weight`` as Ricci flow metric to cut.
    cutoff : float
        A threshold to remove all edges with "weight" greater than it.
    weight : str
        The edge weight used as Ricci flow metric. (Default value = "weight")

    Returns
    -------
    G: NetworkX graph
        A copy of ``G_origin`` with edges cut by given cutoff value.
        ``G_origin`` itself is not modified.

    Raises
    ------
    AssertionError
        If no edge of ``G_origin`` carries the ``weight`` attribute.
    """
    # Explicit raise rather than an ``assert`` statement so the check still
    # runs under ``python -O``; the exception type is kept as AssertionError
    # so existing callers see the same error.
    if not nx.get_edge_attributes(G_origin, weight):
        raise AssertionError("No edge weight detected, abort.")

    G = G_origin.copy()
    # Collect first, remove afterwards: edges must not be removed while the
    # edge view is being iterated.
    edge_trim_list = [(n1, n2) for n1, n2 in G.edges() if G[n1][n2][weight] > cutoff]
    G.remove_edges_from(edge_trim_list)
    return G
def get_rf_metric_cutoff(G_origin, weight="weight", cutoff_step=0.025, drop_threshold=0.01):
    """Get good clustering cutoff points for Ricci flow metric by detect the change of modularity while removing edges.

    Parameters
    ----------
    G_origin : NetworkX graph
        A graph with "weight" as Ricci flow metric to cut.
    weight : str
        The edge weight used as Ricci flow metric. (Default value = "weight")
    cutoff_step : float
        The step size to find the good cutoff points.
    drop_threshold : float
        At least drop this much to considered as a drop for good_cut.

    Returns
    -------
    good_cuts : list of float
        A list of possible cutoff point, usually we use the first one as the best cut.
        Empty when the largest edge weight does not exceed 1, because there
        is then no cutoff value to scan.

    Notes
    -----
    NOTE(review): if some cutoff removes every edge, the next call to
    ``cut_graph_by_cutoff`` raises because no edge weight is left. That
    behaviour is unchanged here - confirm whether the scan should stop
    early instead.
    """
    G = G_origin.copy()
    modularity = []
    maxw = max(nx.get_edge_attributes(G, weight).values())
    # Candidate cutoffs, scanned from the largest weight down towards 1.
    cutoff_range = np.arange(maxw, 1, -cutoff_step)

    for cutoff in cutoff_range:
        # Cuts are cumulative: each step trims the already-trimmed graph.
        G = cut_graph_by_cutoff(G, cutoff, weight=weight)
        # Get connected component after cut as clustering
        clustering = {c: idx for idx, comp in enumerate(nx.connected_components(G)) for c in comp}
        # Compute modularity
        modularity.append(community_louvain.modularity(clustering, G, weight))

    # Nothing was scanned (maxw <= 1): report "no cut" instead of failing on
    # the modularity[-1] lookup below.
    if not modularity:
        return []

    good_cuts = []
    mod_last = modularity[-1]

    # check drop from 1 -> maxw
    # The first iteration compares the last entry with itself and never
    # fires, so cutoff_range[i + 1] is always a valid index when it is read.
    for i in range(len(modularity) - 1, 0, -1):
        mod_now = modularity[i]
        if mod_last > mod_now > 1e-4 and abs(mod_last - mod_now) / mod_last > drop_threshold:
            # Arguments are passed separately so the message is only
            # formatted when TRACE logging is enabled.
            logger.trace("Cut detected: cut:%f, diff:%f, mod_now:%f, mod_last:%f",
                         cutoff_range[i + 1], mod_last - mod_now, mod_now, mod_last)
            good_cuts.append(cutoff_range[i + 1])
        mod_last = mod_now

    return good_cuts