insight / graph_builder.py
dwb2023's picture
Initial commit for Hugging Face Spaces
3bb5fb5
raw
history blame
8.13 kB
"""
Graph builder module for converting GDELT data to graph formats
"""
import json
import logging

import networkx as nx
import pandas as pd
class GraphBuilder:
    """Base class for building graphs from GDELT data.

    Subclasses call :meth:`process_entities` once per DataFrame row and turn
    the returned plain-dict nodes/relationships into a concrete graph format.
    """

    # Maps each ';'-delimited entity column to (node label, relationship type).
    # Defined once at class level instead of being rebuilt for every row, and
    # overridable by subclasses that want a different set of columns.
    ENTITY_MAPPINGS = {
        "V2EnhancedPersons": ("Person", "MENTIONED_IN"),
        "V2EnhancedOrganizations": ("Organization", "MENTIONED_IN"),
        "V2EnhancedLocations": ("Location", "LOCATED_IN"),
        "V2EnhancedThemes": ("Theme", "CATEGORIZED_AS"),
        "V2.1AllNames": ("Name", "MENTIONED_IN"),
        "V2.1Counts": ("Count", "MENTIONED_IN"),
        "V2.1Amounts": ("Amount", "MENTIONED_IN"),
    }

    def process_entities(self, row):
        """Process entities from a row and return nodes and relationships.

        Args:
            row: A mapping (e.g. a pandas ``Series`` yielded by
                ``DataFrame.iterrows``) containing ``GKGRECORDID``, ``DATE``,
                ``SourceCommonName``, ``DocumentIdentifier``,
                ``V2.1Quotations``, ``tone`` and every column listed in
                ``ENTITY_MAPPINGS``.

        Returns:
            A ``(nodes, relationships)`` tuple of lists of dicts.

            * The first node is always the event node (id ``GKGRECORDID``,
              type ``"event"``). Its properties are date, source, document,
              quotations and tone. An NA quotation becomes ``""`` and an NA
              tone becomes ``0.0``.
            * Each non-empty, whitespace-stripped ``;``-separated entity adds
              one node and one relationship. The node id is the entity text
              and its type is the lowercased label. The relationship runs
              from the entity to the event, with ``created_at`` set to the
              event date.

        Raises:
            KeyError: if a required column is missing from *row*.
            ValueError: if a non-NA ``tone`` string cannot be parsed as float.
        """
        event_id = row["GKGRECORDID"]
        event_date = row["DATE"]
        quotations = row["V2.1Quotations"]
        tone = row["tone"]

        # Event node first; entity nodes are appended after it.
        nodes = [{
            "id": event_id,
            "type": "event",
            "properties": {
                "date": event_date,
                "source": row["SourceCommonName"],
                "document": row["DocumentIdentifier"],
                "quotations": quotations if pd.notna(quotations) else "",
                "tone": float(tone) if pd.notna(tone) else 0.0,
            },
        }]
        relationships = []

        for field, (label, relationship) in self.ENTITY_MAPPINGS.items():
            value = row[field]
            if not pd.notna(value):
                continue
            # str() guards against non-string cells (e.g. if pandas inferred a
            # numeric dtype for the column), which have no .split().
            for raw in str(value).split(";"):
                entity = raw.strip()
                if not entity:
                    continue
                nodes.append({
                    "id": entity,
                    "type": label.lower(),
                    "properties": {"name": entity},
                })
                relationships.append({
                    "from": entity,
                    "to": event_id,
                    "type": relationship,
                    "properties": {"created_at": event_date},
                })
        return nodes, relationships
class NetworkXBuilder(GraphBuilder):
    """Build an undirected NetworkX graph from a GDELT DataFrame."""

    def build_graph(self, df):
        """Return an ``nx.Graph`` built from every row of *df*.

        Each node's ``properties`` are flattened into node attributes next to
        its ``type``. Each relationship becomes an edge whose ``properties``
        are flattened next to a ``relationship`` attribute holding its type.
        """
        graph = nx.Graph()
        for _, record in df.iterrows():
            node_list, edge_list = self.process_entities(record)

            for item in node_list:
                graph.add_node(item["id"], type=item["type"], **item["properties"])

            for edge in edge_list:
                graph.add_edge(
                    edge["from"],
                    edge["to"],
                    relationship=edge["type"],
                    **edge["properties"],
                )
        return graph
class Neo4jBuilder(GraphBuilder):
    """Builder that writes GDELT-derived graph elements into Neo4j.

    Each DataFrame row is written in its own write transaction. A row that
    fails is logged (with traceback) and skipped, so one bad record does not
    abort the whole load. Can be used as a context manager so the driver is
    always closed.
    """

    def __init__(self, uri, user, password, driver=None):
        """Open (or adopt) a Neo4j driver.

        Args:
            uri: Connection URI passed to ``GraphDatabase.driver``.
            user: Username for basic auth.
            password: Password for basic auth.
            driver: Optional already-constructed driver. When given, it is
                used as-is and ``uri``/``user``/``password`` are not used to
                open a connection.
        """
        if driver is None:
            # NOTE(review): ``GraphDatabase`` is not imported in the visible
            # module header; presumably ``from neo4j import GraphDatabase`` is
            # required for this path -- confirm.
            driver = GraphDatabase.driver(uri, auth=(user, password))
        self.driver = driver
        self.logger = logging.getLogger(__name__)

    def close(self):
        """Close the underlying driver."""
        self.driver.close()

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc, tb):
        # Always release the driver; never suppress the caller's exception.
        self.close()
        return False

    def build_graph(self, df):
        """Write every row of *df* to Neo4j, one write transaction per row."""
        with self.driver.session() as session:
            for _, row in df.iterrows():
                try:
                    # Row extraction is inside the try so a malformed row
                    # (e.g. an unparsable tone) is logged and skipped like a
                    # failed write, rather than aborting the whole load.
                    nodes, relationships = self.process_entities(row)
                    session.execute_write(
                        self._create_graph_elements, nodes, relationships
                    )
                except Exception as e:
                    # .get(): the failure may itself be a missing GKGRECORDID.
                    self.logger.exception(
                        "Error processing row %s: %s", row.get("GKGRECORDID"), e
                    )

    @staticmethod
    def _quote_identifier(name):
        """Backtick-quote a label or relationship type for Cypher.

        Labels and relationship types cannot be query parameters, so they are
        interpolated into the query text; embedded backticks are doubled.
        """
        return "`" + str(name).replace("`", "``") + "`"

    def _create_graph_elements(self, tx, nodes, relationships):
        """Transaction function: MERGE *nodes*, then MERGE *relationships*.

        Args:
            tx: The transaction handed in by ``session.execute_write``.
            nodes: Node dicts with ``id``, ``type`` and ``properties`` keys.
            relationships: Relationship dicts with ``from``, ``to``, ``type``
                and ``properties`` keys.
        """
        for node in nodes:
            query = f"""
            MERGE (n:{self._quote_identifier(node['type'])} {{id: $id}})
            SET n += $properties
            """
            tx.run(query, id=node["id"], properties=node["properties"])

        for rel in relationships:
            # NOTE(review): these MATCH clauses carry no label, so they match
            # any node with this id regardless of label (an entity sharing an
            # id across labels would get multiple relationships) and cannot
            # use a label-scoped index.
            query = f"""
            MATCH (a {{id: $from_id}})
            MATCH (b {{id: $to_id}})
            MERGE (a)-[r:{self._quote_identifier(rel['type'])}]->(b)
            SET r += $properties
            """
            tx.run(
                query,
                from_id=rel["from"],
                to_id=rel["to"],
                properties=rel["properties"],
            )
class StreamlitGraphBuilder:
    """Incrementally build a NetworkX graph for the Streamlit visualization.

    Rows are fed one at a time through :meth:`process_row`. The accumulated
    graph is exposed as ``self.G``.
    """

    def __init__(self):
        # Undirected graph shared by every process_row() call.
        self.G = nx.Graph()

    def process_row(self, row):
        """Add one GDELT row to ``self.G``.

        Adds an event node keyed by ``GKGRECORDID``. Each non-empty,
        whitespace-stripped ``;``-separated entity in the mapped columns then
        gets a node of the corresponding lowercase type, plus an edge to the
        event node carrying the relationship type and the row's ``DATE``.
        """
        graph = self.G
        event_id = row["GKGRECORDID"]
        event_date = row["DATE"]
        source_name = row["SourceCommonName"]

        graph.add_node(
            event_id,
            type="event",  # node types are kept lowercase for lookups
            date=event_date,
            source=source_name,
            document=row["DocumentIdentifier"],
            tone=row["tone"],
            # The source name doubles as the display label for event nodes.
            name=source_name,
        )

        column_specs = (
            ("V2EnhancedPersons", "person", "MENTIONED_IN"),
            ("V2EnhancedOrganizations", "organization", "MENTIONED_IN"),
            ("V2EnhancedLocations", "location", "LOCATED_IN"),
            ("V2EnhancedThemes", "theme", "CATEGORIZED_AS"),
            ("V2.1AllNames", "name", "MENTIONED_IN"),
            ("V2.1Counts", "count", "MENTIONED_IN"),
            ("V2.1Amounts", "amount", "MENTIONED_IN"),
        )
        for column, node_type, relationship in column_specs:
            cell = row[column]
            if not pd.notna(cell):
                continue
            for piece in cell.split(";"):
                entity = piece.strip()
                if not entity:
                    continue
                # The entity text (original casing) is both id and display name.
                graph.add_node(entity, type=node_type, name=entity)
                graph.add_edge(
                    entity,
                    event_id,
                    relationship=relationship,
                    date=event_date,
                )
class StLinkBuilder(GraphBuilder):
    """Builder for st-link-analysis compatible graphs.

    Produces ``{"nodes": [...], "edges": [...]}``, where every item wraps its
    fields in a ``"data"`` mapping.
    """

    def build_graph(self, df):
        """Build graph in st-link-analysis format.

        Nodes are de-duplicated across all rows by their emitted (string) id;
        the first occurrence wins. Edges are not de-duplicated. Each edge gets
        a sequential id ``e1``, ``e2``, ... in emission order.

        Returns:
            ``{"nodes": [...], "edges": [...]}``.
        """
        all_nodes = []
        all_edges = []
        # Track the *emitted* string ids, so two raw ids that stringify the
        # same cannot yield duplicate node entries in the output.
        added_nodes = set()

        for _, row in df.iterrows():
            nodes, relationships = self.process_entities(row)

            for node in nodes:
                node_id = str(node["id"])
                if node_id in added_nodes:
                    continue
                added_nodes.add(node_id)
                all_nodes.append({
                    "data": {
                        "id": node_id,
                        "label": node["type"].upper(),
                        **node["properties"],
                    }
                })

            for rel in relationships:
                all_edges.append({
                    "data": {
                        "id": f"e{len(all_edges) + 1}",
                        "source": str(rel["from"]),
                        "target": str(rel["to"]),
                        "label": rel["type"],
                        **rel["properties"],
                    }
                })

        return {
            "nodes": all_nodes,
            "edges": all_edges,
        }

    def write_json(self, graph_data, filename):
        """Write *graph_data* to *filename* as indented JSON.

        The encoding is explicit so the result does not depend on the
        platform's locale default.
        """
        with open(filename, "w", encoding="utf-8") as f:
            json.dump(graph_data, f, indent=2)