Spaces:
Sleeping
Sleeping
import networkx as nx | |
from typing import List, Dict, Any, Tuple | |
import json | |
class GraphBuilder: | |
def __init__(self): | |
self.graph = nx.DiGraph() # Directed graph for relationships | |
def build_graph(self, entities: List[Dict[str, Any]], relationships: List[Dict[str, Any]]) -> nx.DiGraph: | |
"""Build NetworkX graph from entities and relationships.""" | |
self.graph.clear() | |
# Add entities as nodes | |
for entity in entities: | |
node_id = entity.get("name", "").strip() | |
if node_id: | |
self.graph.add_node( | |
node_id, | |
type=entity.get("type", "UNKNOWN"), | |
importance=entity.get("importance", 0.0), | |
description=entity.get("description", ""), | |
size=self._calculate_node_size(entity.get("importance", 0.0)) | |
) | |
# Add relationships as edges | |
for relationship in relationships: | |
source = relationship.get("source", "").strip() | |
target = relationship.get("target", "").strip() | |
rel_type = relationship.get("relationship", "related_to") | |
description = relationship.get("description", "") | |
if source and target and source in self.graph.nodes and target in self.graph.nodes: | |
self.graph.add_edge( | |
source, | |
target, | |
relationship=rel_type, | |
description=description, | |
weight=1.0 | |
) | |
return self.graph | |
def _calculate_node_size(self, importance: float) -> int: | |
"""Calculate node size based on importance score.""" | |
# Map importance (0.0-1.0) to node size (10-50) | |
min_size, max_size = 10, 50 | |
return int(min_size + (max_size - min_size) * importance) | |
def get_graph_statistics(self) -> Dict[str, Any]: | |
"""Get basic statistics about the graph.""" | |
if not self.graph.nodes(): | |
return { | |
"num_nodes": 0, | |
"num_edges": 0, | |
"density": 0.0, | |
"is_connected": False, | |
"num_components": 0 | |
} | |
# Convert to undirected for connectivity analysis | |
undirected = self.graph.to_undirected() | |
return { | |
"num_nodes": self.graph.number_of_nodes(), | |
"num_edges": self.graph.number_of_edges(), | |
"density": nx.density(self.graph), | |
"is_connected": nx.is_connected(undirected), | |
"num_components": nx.number_connected_components(undirected), | |
"avg_degree": sum(dict(self.graph.degree()).values()) / self.graph.number_of_nodes() if self.graph.number_of_nodes() > 0 else 0 | |
} | |
def get_central_nodes(self, top_k: int = 5) -> List[Tuple[str, float]]: | |
"""Get most central nodes using various centrality measures.""" | |
if not self.graph.nodes(): | |
return [] | |
centralities = {} | |
# Degree centrality | |
degree_centrality = nx.degree_centrality(self.graph) | |
# Betweenness centrality (if graph has enough nodes) | |
if self.graph.number_of_nodes() > 2: | |
betweenness_centrality = nx.betweenness_centrality(self.graph) | |
else: | |
betweenness_centrality = {node: 0.0 for node in self.graph.nodes()} | |
# PageRank | |
try: | |
pagerank = nx.pagerank(self.graph) | |
except: | |
pagerank = {node: 1.0/self.graph.number_of_nodes() for node in self.graph.nodes()} | |
# Combine centrality measures | |
for node in self.graph.nodes(): | |
importance = self.graph.nodes[node].get('importance', 0.0) | |
combined_score = ( | |
0.3 * degree_centrality.get(node, 0.0) + | |
0.3 * betweenness_centrality.get(node, 0.0) + | |
0.2 * pagerank.get(node, 0.0) + | |
0.2 * importance | |
) | |
centralities[node] = combined_score | |
# Sort by centrality score | |
sorted_nodes = sorted(centralities.items(), key=lambda x: x[1], reverse=True) | |
return sorted_nodes[:top_k] | |
def filter_graph(self, | |
entity_types: List[str] = None, | |
min_importance: float = None, | |
relationship_types: List[str] = None) -> nx.DiGraph: | |
"""Filter graph by entity types, importance, or relationship types.""" | |
filtered_graph = self.graph.copy() | |
# Filter nodes by type and importance | |
nodes_to_remove = [] | |
for node, data in filtered_graph.nodes(data=True): | |
if entity_types and data.get('type') not in entity_types: | |
nodes_to_remove.append(node) | |
elif min_importance and data.get('importance', 0.0) < min_importance: | |
nodes_to_remove.append(node) | |
filtered_graph.remove_nodes_from(nodes_to_remove) | |
# Filter edges by relationship type | |
if relationship_types: | |
edges_to_remove = [] | |
for u, v, data in filtered_graph.edges(data=True): | |
if data.get('relationship') not in relationship_types: | |
edges_to_remove.append((u, v)) | |
filtered_graph.remove_edges_from(edges_to_remove) | |
return filtered_graph | |
def export_graph(self, format_type: str = "json") -> str: | |
"""Export graph in various formats.""" | |
if format_type.lower() == "json": | |
return self._export_json() | |
elif format_type.lower() == "graphml": | |
return self._export_graphml() | |
elif format_type.lower() == "gexf": | |
return self._export_gexf() | |
else: | |
raise ValueError(f"Unsupported export format: {format_type}") | |
def _export_json(self) -> str: | |
"""Export graph as JSON.""" | |
data = { | |
"nodes": [], | |
"edges": [] | |
} | |
# Export nodes | |
for node, attrs in self.graph.nodes(data=True): | |
node_data = {"id": node} | |
node_data.update(attrs) | |
data["nodes"].append(node_data) | |
# Export edges | |
for u, v, attrs in self.graph.edges(data=True): | |
edge_data = {"source": u, "target": v} | |
edge_data.update(attrs) | |
data["edges"].append(edge_data) | |
return json.dumps(data, indent=2) | |
def _export_graphml(self) -> str: | |
"""Export graph as GraphML.""" | |
import io | |
output = io.StringIO() | |
nx.write_graphml(self.graph, output) | |
return output.getvalue() | |
def _export_gexf(self) -> str: | |
"""Export graph as GEXF.""" | |
import io | |
output = io.StringIO() | |
nx.write_gexf(self.graph, output) | |
return output.getvalue() | |
def get_subgraph_around_node(self, node: str, radius: int = 1) -> nx.DiGraph: | |
"""Get subgraph within specified radius of a node.""" | |
if node not in self.graph: | |
return nx.DiGraph() | |
# Get nodes within radius | |
nodes_in_radius = set([node]) | |
current_nodes = set([node]) | |
for _ in range(radius): | |
next_nodes = set() | |
for n in current_nodes: | |
# Add neighbors (both incoming and outgoing) | |
next_nodes.update(self.graph.successors(n)) | |
next_nodes.update(self.graph.predecessors(n)) | |
nodes_in_radius.update(next_nodes) | |
current_nodes = next_nodes - nodes_in_radius | |
if not current_nodes: | |
break | |
return self.graph.subgraph(nodes_in_radius).copy() | |
def get_shortest_path(self, source: str, target: str) -> List[str]: | |
"""Get shortest path between two nodes.""" | |
try: | |
# Convert to undirected for path finding | |
undirected = self.graph.to_undirected() | |
return nx.shortest_path(undirected, source, target) | |
except (nx.NetworkXNoPath, nx.NodeNotFound): | |
return [] | |
def get_node_info(self, node: str) -> Dict[str, Any]: | |
"""Get detailed information about a specific node.""" | |
if node not in self.graph: | |
return {} | |
node_data = dict(self.graph.nodes[node]) | |
# Add connectivity information | |
predecessors = list(self.graph.predecessors(node)) | |
successors = list(self.graph.successors(node)) | |
node_data.update({ | |
"in_degree": self.graph.in_degree(node), | |
"out_degree": self.graph.out_degree(node), | |
"predecessors": predecessors, | |
"successors": successors, | |
"total_connections": len(predecessors) + len(successors) | |
}) | |
return node_data | |