"""Graph builder module for converting GDELT data to graph formats."""

import json
import logging

import networkx as nx
import pandas as pd

# Row column -> (node label, relationship type linking the entity to its event).
# The node "type" stored in every graph is the lower-cased label.
_ENTITY_MAPPINGS = {
    "V2EnhancedPersons": ("Person", "MENTIONED_IN"),
    "V2EnhancedOrganizations": ("Organization", "MENTIONED_IN"),
    "V2EnhancedLocations": ("Location", "LOCATED_IN"),
    "V2EnhancedThemes": ("Theme", "CATEGORIZED_AS"),
    "V2.1AllNames": ("Name", "MENTIONED_IN"),
    "V2.1Counts": ("Count", "MENTIONED_IN"),
    "V2.1Amounts": ("Amount", "MENTIONED_IN"),
}


def _iter_row_entities(row):
    """Yield ``(entity, node_type, relationship_type)`` for one row.

    Every column listed in ``_ENTITY_MAPPINGS`` is read from ``row``. Cells
    for which ``pd.notna`` is false are skipped; the remaining cells are split
    on ``';'`` and each non-empty, stripped piece is yielded once per
    occurrence, in column order.
    """
    for field, (label, relationship) in _ENTITY_MAPPINGS.items():
        cell = row[field]
        if pd.notna(cell):
            for piece in cell.split(";"):
                entity = piece.strip()
                if entity:
                    yield entity, label.lower(), relationship


class GraphBuilder:
    """Base class for building graph from GDELT data"""

    def process_entities(self, row):
        """Process entities from a row and return nodes and relationships.

        Args:
            row: Mapping-like record (e.g. a pandas Series or a dict) holding
                the keys ``GKGRECORDID``, ``DATE``, ``SourceCommonName``,
                ``DocumentIdentifier``, ``V2.1Quotations``, ``tone`` and every
                column named in ``_ENTITY_MAPPINGS``.

        Returns:
            A ``(nodes, relationships)`` tuple of lists of dicts. ``nodes``
            starts with the event node, followed by one node per entity
            occurrence (duplicates are not removed here). Each relationship
            points from an entity id to the event id.
        """
        event_id = row["GKGRECORDID"]
        event_date = row["DATE"]
        quotations = row["V2.1Quotations"]
        tone = row["tone"]
        # NOTE: the "V2.1SharingImage" column is currently not exported:
        # event_image = row["V2.1SharingImage"] if pd.notna(row["V2.1SharingImage"]) else ""

        # The event node comes first; missing quotations/tone fall back to
        # "" and 0.0 respectively.
        nodes = [{
            "id": event_id,
            "type": "event",
            "properties": {
                "date": event_date,
                "source": row["SourceCommonName"],
                "document": row["DocumentIdentifier"],
                # "image": event_image,
                "quotations": quotations if pd.notna(quotations) else "",
                "tone": float(tone) if pd.notna(tone) else 0.0,
            },
        }]
        relationships = []

        for entity, node_type, relationship in _iter_row_entities(row):
            # The raw entity string doubles as node id and display name.
            nodes.append({
                "id": entity,
                "type": node_type,
                "properties": {"name": entity},
            })
            relationships.append({
                "from": entity,
                "to": event_id,
                "type": relationship,
                "properties": {"created_at": event_date},
            })

        return nodes, relationships


class NetworkXBuilder(GraphBuilder):
    """Builder for NetworkX graphs"""

    def build_graph(self, df):
        """Build an undirected ``nx.Graph`` from every row of ``df``.

        Node attributes are the node ``type`` plus its properties; edge
        attributes are ``relationship`` plus the relationship properties.
        Re-adding an existing node or edge updates its attributes in place.
        """
        G = nx.Graph()

        for _, row in df.iterrows():
            nodes, relationships = self.process_entities(row)

            # Add nodes
            for node in nodes:
                G.add_node(node["id"], type=node["type"], **node["properties"])

            # Add relationships
            for rel in relationships:
                G.add_edge(
                    rel["from"],
                    rel["to"],
                    relationship=rel["type"],
                    **rel["properties"],
                )

        return G


class Neo4jBuilder(GraphBuilder):
    """Builder that writes the graph into a Neo4j database."""

    def __init__(self, uri, user, password):
        """Open a Neo4j driver for ``uri`` authenticated as ``user``.

        Raises:
            ImportError: if the ``neo4j`` driver package is not installed.
        """
        # Imported here rather than at module level so that the other
        # builders remain usable when the neo4j driver is not installed.
        from neo4j import GraphDatabase

        self.driver = GraphDatabase.driver(uri, auth=(user, password))
        self.logger = logging.getLogger(__name__)

    def __enter__(self):
        """Allow ``with Neo4jBuilder(...) as builder:`` usage."""
        return self

    def __exit__(self, exc_type, exc_value, traceback):
        """Close the driver when leaving the ``with`` block."""
        self.close()

    def close(self):
        """Close the underlying Neo4j driver."""
        self.driver.close()

    def build_graph(self, df):
        """Write every row of ``df`` to Neo4j, one write transaction per row.

        A failing row is logged and skipped so that the remaining rows are
        still processed (best effort).
        """
        with self.driver.session() as session:
            for _, row in df.iterrows():
                nodes, relationships = self.process_entities(row)

                # Create nodes and relationships in Neo4j
                try:
                    session.execute_write(
                        self._create_graph_elements, nodes, relationships
                    )
                except Exception as e:
                    self.logger.error(
                        "Error processing row %s: %s", row["GKGRECORDID"], e
                    )

    def _create_graph_elements(self, tx, nodes, relationships):
        """Merge the given nodes and relationships inside transaction ``tx``."""
        # Labels and relationship types are interpolated into the query text
        # because Cypher does not accept them as query parameters; the values
        # produced by process_entities come from the fixed mapping in this
        # module. Ids and properties are passed as real parameters.

        # Create nodes
        for node in nodes:
            query = f"""
                MERGE (n:{node['type']} {{id: $id}})
                SET n += $properties
            """
            tx.run(query, id=node["id"], properties=node["properties"])

        # Create relationships
        # NOTE(review): the MATCH clauses carry no label, so an id shared by
        # nodes of different types (e.g. a person and a location with the same
        # name) matches all of them - confirm whether that is intended.
        for rel in relationships:
            query = f"""
                MATCH (a {{id: $from_id}})
                MATCH (b {{id: $to_id}})
                MERGE (a)-[r:{rel['type']}]->(b)
                SET r += $properties
            """
            tx.run(
                query,
                from_id=rel["from"],
                to_id=rel["to"],
                properties=rel["properties"],
            )


class StreamlitGraphBuilder:
    """Adapted graph builder for Streamlit visualization"""

    def __init__(self):
        """Start with an empty undirected graph in ``self.G``."""
        self.G = nx.Graph()

    def process_row(self, row):
        """Process a single row of data and add it to ``self.G``.

        Adds one ``event`` node keyed by ``GKGRECORDID`` and, for every entity
        found in the row, an entity node plus an edge to the event. Unlike
        ``GraphBuilder.process_entities`` the ``tone`` value is stored as-is.
        """
        event_id = row["GKGRECORDID"]
        event_date = row["DATE"]
        source = row["SourceCommonName"]

        self.G.add_node(
            event_id,
            type="event",  # node types are always lower case
            date=event_date,
            source=source,
            document=row["DocumentIdentifier"],
            tone=row["tone"],
            # The source name also serves as the event's display name.
            name=source,
        )

        # Node types are lower case for consistent lookups; the entity string
        # itself is kept verbatim as both the node id and its "name".
        for entity, node_type, rel_type in _iter_row_entities(row):
            self.G.add_node(entity, type=node_type, name=entity)
            self.G.add_edge(
                entity, event_id, relationship=rel_type, date=event_date
            )


class StLinkBuilder(GraphBuilder):
    """Builder for st-link-analysis compatible graphs"""

    def build_graph(self, df):
        """Build graph in st-link-analysis format.

        Returns:
            ``{"nodes": [...], "edges": [...]}`` where every element is a dict
            with a single ``"data"`` key. Nodes are emitted once per id (first
            occurrence wins); edges are emitted for every relationship and
            numbered ``e1``, ``e2``, ... in encounter order.
        """
        all_nodes = []
        all_edges = []
        # Track nodes we've already added to avoid duplicates
        added_nodes = set()

        for _, row in df.iterrows():
            nodes, relationships = self.process_entities(row)

            # Process nodes
            for node in nodes:
                if node["id"] in added_nodes:
                    continue
                added_nodes.add(node["id"])
                all_nodes.append({
                    "data": {
                        "id": str(node["id"]),
                        "label": node["type"].upper(),
                        **node["properties"],
                    }
                })

            # Process relationships/edges
            for rel in relationships:
                all_edges.append({
                    "data": {
                        "id": f"e{len(all_edges) + 1}",
                        "source": str(rel["from"]),
                        "target": str(rel["to"]),
                        "label": rel["type"],
                        **rel["properties"],
                    }
                })

        return {"nodes": all_nodes, "edges": all_edges}

    def write_json(self, graph_data, filename):
        """Write graph to JSON file (UTF-8, 2-space indent)."""
        with open(filename, "w", encoding="utf-8") as f:
            json.dump(graph_data, f, indent=2)