|
""" |
|
Graph builder module for converting GDELT data to graph formats |
|
""" |
|
import json
import logging

import networkx as nx
import pandas as pd
|
|
|
class GraphBuilder:
    """Base class for building graphs from GDELT GKG rows.

    Subclasses call :meth:`process_entities` once per row and turn the returned
    node/relationship dictionaries into a concrete graph representation.
    """

    # GKG column -> (node label, relationship type). Every non-empty,
    # ';'-separated entry of a column becomes an entity node linked to the event.
    # Kept at class level so it is built once and can be overridden by subclasses.
    ENTITY_MAPPINGS = {
        "V2EnhancedPersons": ("Person", "MENTIONED_IN"),
        "V2EnhancedOrganizations": ("Organization", "MENTIONED_IN"),
        "V2EnhancedLocations": ("Location", "LOCATED_IN"),
        "V2EnhancedThemes": ("Theme", "CATEGORIZED_AS"),
        "V2.1AllNames": ("Name", "MENTIONED_IN"),
        "V2.1Counts": ("Count", "MENTIONED_IN"),
        "V2.1Amounts": ("Amount", "MENTIONED_IN"),
    }

    def process_entities(self, row):
        """Process entities from a row and return nodes and relationships.

        Args:
            row: A mapping (e.g. the ``pandas.Series`` yielded by
                ``DataFrame.iterrows``) holding ``GKGRECORDID``, ``DATE``,
                ``SourceCommonName``, ``DocumentIdentifier``,
                ``V2.1Quotations``, ``tone`` and every column listed in
                ``ENTITY_MAPPINGS``.

        Returns:
            A ``(nodes, relationships)`` tuple of lists of dicts.
            ``nodes[0]`` is the event node (``type == "event"``); missing
            quotations become ``""`` and a missing tone becomes ``0.0``.
            Each further node is an entity whose ``id`` is its text and whose
            ``type`` is the lowercased label. Every relationship points from an
            entity to the event and carries ``from``/``to``/``type``/
            ``properties`` plus ``from_type``/``to_type`` (the node types of
            its endpoints).
        """
        event_id = row["GKGRECORDID"]
        event_date = row["DATE"]
        event_source = row["SourceCommonName"]
        event_document_id = row["DocumentIdentifier"]
        quotations = row["V2.1Quotations"]
        tone = row["tone"]

        nodes = [{
            "id": event_id,
            "type": "event",
            "properties": {
                "date": event_date,
                "source": event_source,
                "document": event_document_id,
                "quotations": quotations if pd.notna(quotations) else "",
                "tone": float(tone) if pd.notna(tone) else 0.0,
            },
        }]
        relationships = []

        for field, (label, relationship) in self.ENTITY_MAPPINGS.items():
            node_type = label.lower()
            for entity in self._split_entities(row[field]):
                nodes.append({
                    "id": entity,
                    "type": node_type,
                    "properties": {"name": entity},
                })
                relationships.append({
                    "from": entity,
                    "to": event_id,
                    "type": relationship,
                    # Endpoint types let label-aware backends (e.g. Cypher)
                    # match the exact node instead of any node with this id.
                    "from_type": node_type,
                    "to_type": "event",
                    "properties": {"created_at": event_date},
                })

        return nodes, relationships

    @staticmethod
    def _split_entities(value):
        """Return the unique, non-empty ';'-separated entries of *value*, in order."""
        if pd.isna(value):
            return []
        # str() guards against non-string cells (e.g. numbers inferred by pandas);
        # dict.fromkeys drops repeats within one field but keeps first-seen order.
        return list(dict.fromkeys(
            part.strip() for part in str(value).split(";") if part.strip()
        ))
|
|
|
class NetworkXBuilder(GraphBuilder):
    """Builder that materialises GDELT rows as an undirected ``networkx.Graph``.

    Nodes are keyed by their ``id``: an id seen again (in the same or a later
    row) updates the attributes of the existing node rather than adding a new one.
    """

    def build_graph(self, df):
        """Return a ``networkx.Graph`` built from every row of *df*.

        Node attributes are ``type`` plus the node's properties; edge
        attributes are ``relationship`` (the relationship type) plus the
        relationship's properties. Each row's nodes are inserted before its edges.
        """
        graph = nx.Graph()
        for _index, record in df.iterrows():
            entity_nodes, links = self.process_entities(record)
            self._add_nodes(graph, entity_nodes)
            self._add_edges(graph, links)
        return graph

    @staticmethod
    def _add_nodes(graph, entity_nodes):
        """Insert (or update) one graph node per node dict in *entity_nodes*."""
        for entry in entity_nodes:
            attrs = entry["properties"]
            graph.add_node(entry["id"], type=entry["type"], **attrs)

    @staticmethod
    def _add_edges(graph, links):
        """Insert (or update) one graph edge per relationship dict in *links*."""
        for link in links:
            attrs = link["properties"]
            graph.add_edge(link["from"], link["to"], relationship=link["type"], **attrs)
|
|
|
class Neo4jBuilder(GraphBuilder):
    """Builder that writes GDELT rows into a Neo4j database.

    Every DataFrame row is written in its own write transaction. A row whose
    transaction fails is logged (with traceback) and skipped, so one bad record
    does not abort the whole load. Instances can be used as context managers
    so the driver is always closed.
    """

    def __init__(self, uri, user, password):
        # GraphDatabase was previously referenced without being imported
        # (NameError). Importing it here keeps the rest of this module usable
        # when the neo4j driver package is not installed.
        from neo4j import GraphDatabase

        self.driver = GraphDatabase.driver(uri, auth=(user, password))
        self.logger = logging.getLogger(__name__)

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc, tb):
        self.close()
        return False  # never suppress exceptions

    def close(self):
        """Close the underlying Neo4j driver."""
        self.driver.close()

    def build_graph(self, df):
        """Write every row of *df* to Neo4j, one write transaction per row."""
        with self.driver.session() as session:
            for _, row in df.iterrows():
                nodes, relationships = self.process_entities(row)
                try:
                    session.execute_write(self._create_graph_elements,
                                          nodes, relationships)
                except Exception as e:
                    # Deliberate best-effort load: record the failure with its
                    # traceback and continue with the next row.
                    self.logger.exception("Error processing row %s: %s",
                                          row["GKGRECORDID"], e)

    def _create_graph_elements(self, tx, nodes, relationships):
        """MERGE *nodes* and then *relationships* inside transaction *tx*.

        Nodes are merged on ``(label, id)`` where the label is the node's
        ``type``. Relationship endpoints are matched by ``id`` and, when the
        relationship dict carries ``from_type``/``to_type``, also by label;
        this lets the match use a label index and avoids attaching the
        relationship to unrelated nodes that merely share the same id text.
        Without those keys the match falls back to id only.
        """
        for node in nodes:
            query = f"""
            MERGE (n:{node['type']} {{id: $id}})
            SET n += $properties
            """
            tx.run(query, id=node["id"], properties=node["properties"])

        for rel in relationships:
            from_label = self._label_pattern(rel.get("from_type"))
            to_label = self._label_pattern(rel.get("to_type"))
            query = f"""
            MATCH (a{from_label} {{id: $from_id}})
            MATCH (b{to_label} {{id: $to_id}})
            MERGE (a)-[r:{rel['type']}]->(b)
            SET r += $properties
            """
            tx.run(query,
                   from_id=rel["from"],
                   to_id=rel["to"],
                   properties=rel["properties"])

    @staticmethod
    def _label_pattern(label):
        """Return ``":<label>"`` for a Cypher node pattern, or ``""`` if *label* is falsy."""
        return f":{label}" if label else ""
|
|
|
class StreamlitGraphBuilder:
    """Adapted graph builder for Streamlit visualization.

    Rows are accumulated incrementally into ``self.G`` (an undirected
    ``networkx.Graph``) by calling :meth:`process_row` once per row.
    """

    # GKG column -> (node type, relationship type). The lowercase node types
    # are the same ``type`` values GraphBuilder.process_entities emits.
    ENTITY_TYPES = {
        "V2EnhancedPersons": ("person", "MENTIONED_IN"),
        "V2EnhancedOrganizations": ("organization", "MENTIONED_IN"),
        "V2EnhancedLocations": ("location", "LOCATED_IN"),
        "V2EnhancedThemes": ("theme", "CATEGORIZED_AS"),
        "V2.1AllNames": ("name", "MENTIONED_IN"),
        "V2.1Counts": ("count", "MENTIONED_IN"),
        "V2.1Amounts": ("amount", "MENTIONED_IN"),
    }

    def __init__(self):
        self.G = nx.Graph()

    def process_row(self, row):
        """Add one GKG row's event node, entity nodes and edges to ``self.G``.

        The event node is keyed by ``GKGRECORDID`` and labelled with
        ``SourceCommonName`` as its ``name``. Each non-empty ';'-separated
        entry of an ``ENTITY_TYPES`` column becomes a node keyed by its text,
        joined to the event by an edge with ``relationship`` and ``date``
        attributes.
        """
        event_id = row["GKGRECORDID"]
        tone = row["tone"]
        event_props = {
            "type": "event",
            "date": row["DATE"],
            "source": row["SourceCommonName"],
            "document": row["DocumentIdentifier"],
            # Same normalisation as GraphBuilder.process_entities: float,
            # with 0.0 when the value is missing (avoids NaN in the viewer).
            "tone": float(tone) if pd.notna(tone) else 0.0,
            "name": row["SourceCommonName"],
        }
        self.G.add_node(event_id, **event_props)

        for col, (node_type, rel_type) in self.ENTITY_TYPES.items():
            value = row[col]
            if pd.isna(value):
                continue
            # str() guards against non-string cells (e.g. numbers inferred by pandas).
            entities = [e.strip() for e in str(value).split(";") if e.strip()]
            for entity in entities:
                self.G.add_node(entity, type=node_type, name=entity)
                self.G.add_edge(entity, event_id,
                                relationship=rel_type,
                                date=row["DATE"])
|
|
|
class StLinkBuilder(GraphBuilder):
    """Builder for st-link-analysis compatible graphs.

    Produces ``{"nodes": [...], "edges": [...]}`` where every element is
    ``{"data": {...}}``: nodes carry ``id``/``label`` plus their properties,
    edges carry ``id``/``source``/``target``/``label`` plus their properties.
    All ids are strings.
    """

    def build_graph(self, df):
        """Build graph in st-link-analysis format.

        A node is emitted once per distinct id; the first occurrence wins, so an
        entity seen under two columns keeps the label of the first one. An edge
        is emitted once per distinct ``(source, target, label)`` triple, and
        edges are numbered ``e1``, ``e2``, ... in emission order.
        """
        all_nodes = []
        all_edges = []
        seen_nodes = set()
        seen_edges = set()

        for _, row in df.iterrows():
            nodes, relationships = self.process_entities(row)

            for node in nodes:
                # Dedupe on the emitted (string) id: two elements with the same
                # string id would collide in the viewer anyway.
                node_id = str(node["id"])
                if node_id in seen_nodes:
                    continue
                seen_nodes.add(node_id)
                all_nodes.append({
                    "data": {
                        "id": node_id,
                        "label": node["type"].upper(),
                        **node["properties"],
                    }
                })

            for rel in relationships:
                source, target = str(rel["from"]), str(rel["to"])
                edge_key = (source, target, rel["type"])
                # The same entity text can be listed under several columns that
                # share a relationship type (e.g. V2EnhancedPersons and
                # V2.1AllNames are both MENTIONED_IN); emitting each would draw
                # stacked duplicate edges between the same two nodes.
                if edge_key in seen_edges:
                    continue
                seen_edges.add(edge_key)
                all_edges.append({
                    "data": {
                        "id": f"e{len(all_edges) + 1}",
                        "source": source,
                        "target": target,
                        "label": rel["type"],
                        **rel["properties"],
                    }
                })

        return {
            "nodes": all_nodes,
            "edges": all_edges,
        }

    def write_json(self, graph_data, filename):
        """Write *graph_data* to *filename* as JSON indented by two spaces."""
        # Explicit encoding so the file does not depend on the platform default.
        with open(filename, "w", encoding="utf-8") as f:
            json.dump(graph_data, f, indent=2)