|
""" |
|
Graph builder module for converting GDELT data to graph formats. |
|
|
|
This module provides classes for building graphs from GDELT data in various formats, |
|
including NetworkX, Neo4j, and st-link-analysis. It supports batch processing, |
|
input validation, custom properties, and logging for robust and efficient graph construction. |
|
""" |
|
import pandas as pd |
|
import networkx as nx |
|
import logging |
|
import json |
|
from neo4j import GraphDatabase |
|
from typing import Dict, List, Optional, Set, Tuple |
|
|
|
|
|
# Apply basic root-logger configuration at INFO level when this module is
# imported. NOTE(review): this is an import-time side effect on the host
# application's logging setup — confirm it is intended for library use.
logging.basicConfig(level=logging.INFO)

# Module-level logger; GraphBuilder.__init__ stores it on each builder instance.
logger = logging.getLogger(__name__)
|
|
|
class GraphBuilder:
    """
    Base class for building graphs from GDELT data.

    Subclasses consume the node and relationship dictionaries produced by
    ``process_entities`` and turn them into a concrete graph representation.

    Attributes:
        ENTITY_MAPPINGS (dict): Maps a GDELT column name to a
            ``(node label, relationship type)`` pair.
        logger (Logger): Logger instance for tracking progress and errors.
    """

    ENTITY_MAPPINGS = {
        "V2EnhancedPersons": ("Person", "MENTIONED_IN"),
        "V2EnhancedOrganizations": ("Organization", "MENTIONED_IN"),
        "V2EnhancedLocations": ("Location", "LOCATED_IN"),
        "V2EnhancedThemes": ("Theme", "CATEGORIZED_AS"),
        "V2.1AllNames": ("Name", "MENTIONED_IN"),
        "V2.1Counts": ("Count", "MENTIONED_IN"),
        "V2.1Amounts": ("Amount", "MENTIONED_IN"),
    }

    def __init__(self):
        self.logger = logger

    def validate_input(self, df: pd.DataFrame) -> None:
        """
        Validate input DataFrame for required columns.

        Args:
            df (pd.DataFrame): Input DataFrame containing GDELT data.

        Raises:
            ValueError: If any required column is missing. The message names
                the first missing column.
        """
        required_columns = ["GKGRECORDID", "DATE", "SourceCommonName", "DocumentIdentifier"]
        for col in required_columns:
            if col not in df.columns:
                raise ValueError(f"Missing required column: {col}")

    def process_entities(
        self,
        row,
        custom_node_props: Optional[Dict] = None,
        custom_edge_props: Optional[Dict] = None,
    ) -> Tuple[List[Dict], List[Dict]]:
        """
        Process entities from a row and return nodes and relationships.

        The first node returned is always the event node for the row. Each
        entity found in an ``ENTITY_MAPPINGS`` column then contributes one
        node and one relationship pointing from the entity to the event.

        Only the four columns checked by ``validate_input`` are mandatory.
        ``V2.1Quotations``, ``tone`` and every entity column are optional:
        a missing or null value is treated as "no data" instead of raising
        ``KeyError``.

        Args:
            row (pd.Series): A row of GDELT data.
            custom_node_props (dict, optional): Extra properties merged into
                the event node (entity nodes are not affected).
            custom_edge_props (dict, optional): Extra properties merged into
                every relationship.

        Returns:
            Tuple[List[Dict], List[Dict]]: Lists of nodes and relationships.
        """
        nodes = []
        relationships = []

        event_id = row["GKGRECORDID"]
        event_date = row["DATE"]

        # Optional columns: .get() returns None when the column is absent,
        # and pd.notna(None) is False, so absent and null behave the same.
        quotations = row.get("V2.1Quotations")
        tone = row.get("tone")

        event_props = {
            "date": event_date,
            "source": row["SourceCommonName"],
            "document": row["DocumentIdentifier"],
            "quotations": quotations if pd.notna(quotations) else "",
            "tone": float(tone) if pd.notna(tone) else 0.0,
        }
        if custom_node_props:
            event_props.update(custom_node_props)

        nodes.append({
            "id": event_id,
            "type": "event",
            "properties": event_props,
        })

        for field, (label, relationship) in self.ENTITY_MAPPINGS.items():
            raw_value = row.get(field)
            if not pd.notna(raw_value):
                continue

            node_type = label.lower()
            # str() guards against non-string cells (e.g. a numeric count),
            # which would otherwise fail on .split().
            for chunk in str(raw_value).split(";"):
                entity = chunk.strip()
                if not entity:
                    continue

                nodes.append({
                    "id": entity,
                    "type": node_type,
                    "properties": {"name": entity},
                })

                # A fresh dict per edge so callers may mutate one edge's
                # properties without affecting the others.
                edge_props = {"created_at": event_date}
                if custom_edge_props:
                    edge_props.update(custom_edge_props)

                relationships.append({
                    "from": entity,
                    "to": event_id,
                    "type": relationship,
                    "properties": edge_props,
                })

        return nodes, relationships

    def validate_graph(self, G: "nx.Graph") -> None:
        """
        Validate the graph for consistency.

        Args:
            G (nx.Graph): The graph to validate.

        Raises:
            ValueError: If ``G`` is not a NetworkX graph, or has no nodes,
                or has no edges.
        """
        if not isinstance(G, nx.Graph):
            raise ValueError("Input is not a valid NetworkX graph.")
        if G.number_of_nodes() == 0:
            raise ValueError("Graph has no nodes.")
        if G.number_of_edges() == 0:
            raise ValueError("Graph has no edges.")
        self.logger.info("Graph validation passed.")
|
|
|
class NetworkXBuilder(GraphBuilder):
    """
    Builder for NetworkX graphs.

    Attributes:
        directed (bool): Whether to create a directed graph.
    """

    def __init__(self, directed=False):
        super().__init__()
        self.directed = directed

    def build_graph(self, df, custom_node_props=None, custom_edge_props=None):
        """
        Build a NetworkX graph from the DataFrame.

        Every row contributes its event node, one node per entity and one
        edge from each entity to the event. Node attributes carry ``type``
        plus the node properties; edge attributes carry ``relationship``
        plus the edge properties.

        Args:
            df (pd.DataFrame): Input DataFrame containing GDELT data.
            custom_node_props (dict, optional): Custom properties for nodes.
            custom_edge_props (dict, optional): Custom properties for edges.

        Returns:
            nx.Graph: The constructed graph (``nx.DiGraph`` when
            ``directed`` is true).

        Raises:
            ValueError: If required columns are missing, or the resulting
                graph has no nodes or no edges.
        """
        self.validate_input(df)
        G = nx.DiGraph() if self.directed else nx.Graph()

        try:
            for _, row in df.iterrows():
                nodes, relationships = self.process_entities(
                    row, custom_node_props, custom_edge_props
                )

                for node in nodes:
                    # Merge into one mapping before unpacking: passing
                    # type=... next to **properties raises TypeError when a
                    # custom property is itself called "type". The
                    # builder-assigned type is written last so it wins.
                    node_attrs = {**node["properties"], "type": node["type"]}
                    G.add_node(node["id"], **node_attrs)

                for rel in relationships:
                    # Same reasoning as for nodes, for the "relationship" key.
                    edge_attrs = {**rel["properties"], "relationship": rel["type"]}
                    G.add_edge(rel["from"], rel["to"], **edge_attrs)

            self.validate_graph(G)
            self.logger.info("NetworkX graph built successfully.")
            return G
        except Exception as e:
            # Log for diagnostics, then let the caller decide how to react.
            self.logger.error("Error building NetworkX graph: %s", e)
            raise
|
|
|
class Neo4jBuilder(GraphBuilder):
    """
    Builder for Neo4j graphs.

    The builder owns a Neo4j driver and must be closed when no longer
    needed, either by calling ``close()`` or by using it as a context
    manager::

        with Neo4jBuilder(uri, user, password) as builder:
            builder.build_graph(df)

    Attributes:
        driver (neo4j.Driver): Neo4j driver instance.
        logger (Logger): Logger instance for tracking progress and errors.
    """

    def __init__(self, uri, user, password):
        super().__init__()
        self.driver = GraphDatabase.driver(uri, auth=(user, password))

    def __enter__(self):
        """Return the builder itself so it can be used in a ``with`` block."""
        return self

    def __exit__(self, exc_type, exc_value, traceback):
        """Close the driver on exit; exceptions from the block propagate."""
        self.close()

    def close(self):
        """Close the Neo4j driver."""
        self.driver.close()

    def build_graph(self, df, batch_size=1000, custom_node_props=None, custom_edge_props=None):
        """
        Build a Neo4j graph from the DataFrame with batch processing.

        Rows are converted with ``process_entities`` and written in groups
        of ``batch_size`` rows, one write transaction per group; a final
        partial group is written after the loop.

        Args:
            df (pd.DataFrame): Input DataFrame containing GDELT data.
            batch_size (int): Number of rows to process in each batch.
            custom_node_props (dict, optional): Custom properties for nodes.
            custom_edge_props (dict, optional): Custom properties for edges.

        Raises:
            ValueError: If required columns are missing.
        """
        self.validate_input(df)

        try:
            with self.driver.session() as session:
                batch = []
                for _, row in df.iterrows():
                    # process_entities returns the (nodes, relationships) pair
                    batch.append(
                        self.process_entities(row, custom_node_props, custom_edge_props)
                    )
                    if len(batch) >= batch_size:
                        session.execute_write(self._create_graph_elements_batch, batch)
                        # Rebind rather than clear(): the list just handed to
                        # the transaction function is left untouched.
                        batch = []
                if batch:
                    session.execute_write(self._create_graph_elements_batch, batch)
        except Exception as e:
            # Log-and-re-raise, matching the other builders in this module.
            self.logger.error("Error building Neo4j graph: %s", e)
            raise

        self.logger.info("Neo4j graph built successfully.")

    def _create_graph_elements_batch(self, tx, batch):
        """
        Create nodes and relationships in Neo4j in batches.

        Args:
            tx (neo4j.Transaction): Neo4j transaction.
            batch (List[Tuple[List[Dict], List[Dict]]]): Batch of nodes and relationships.
        """
        for nodes, relationships in batch:
            for node in nodes:
                # The label is interpolated into the query text while the
                # values go through query parameters. The label must therefore
                # only ever come from trusted code, never from row data.
                query = f"""
                MERGE (n:{node['type']} {{id: $id}})
                SET n += $properties
                """
                tx.run(query, id=node["id"], properties=node["properties"])

            for rel in relationships:
                # NOTE(review): both MATCH clauses are label-less, so they
                # match every node with this id regardless of label (e.g. a
                # person and a location sharing a name) — confirm whether
                # relationships should be restricted by label.
                query = f"""
                MATCH (a {{id: $from_id}})
                MATCH (b {{id: $to_id}})
                MERGE (a)-[r:{rel['type']}]->(b)
                SET r += $properties
                """
                tx.run(query, from_id=rel["from"], to_id=rel["to"], properties=rel["properties"])
|
|
|
class StLinkBuilder(GraphBuilder):
    """
    Builder for st-link-analysis compatible graphs.

    Attributes:
        logger (Logger): Logger instance for tracking progress and errors.
    """

    def __init__(self):
        super().__init__()

    def build_graph(self, df, custom_node_props=None, custom_edge_props=None):
        """
        Build graph in st-link-analysis format.

        Each element is a ``{"data": {...}}`` dictionary. Node data holds
        ``id``, ``label`` (upper-cased node type) and the node properties;
        edge data holds a generated ``id`` (``e1``, ``e2``, ...), ``source``,
        ``target``, ``label`` and the edge properties. Nodes are emitted once
        per distinct stringified id.

        The identity keys (``id`` on nodes; ``id``, ``source`` and ``target``
        on edges) always keep the builder-assigned values, so a custom
        property with one of those names cannot rewire the graph. ``label``
        may still be overridden by a property of that name.

        Args:
            df (pd.DataFrame): Input DataFrame containing GDELT data.
            custom_node_props (dict, optional): Custom properties for nodes.
            custom_edge_props (dict, optional): Custom properties for edges.

        Returns:
            Dict: Graph data in st-link-analysis format, with ``"nodes"``
            and ``"edges"`` lists.

        Raises:
            ValueError: If required columns are missing.
        """
        self.validate_input(df)

        # Warn once up front instead of once per element.
        node_clash = {"id"} & set(custom_node_props or ())
        if node_clash:
            self.logger.warning(
                "Ignoring custom node properties that clash with reserved keys: %s",
                sorted(node_clash),
            )
        edge_clash = {"id", "source", "target"} & set(custom_edge_props or ())
        if edge_clash:
            self.logger.warning(
                "Ignoring custom edge properties that clash with reserved keys: %s",
                sorted(edge_clash),
            )

        all_nodes = []
        all_edges = []
        edge_counter = 0
        added_nodes = set()

        try:
            for _, row in df.iterrows():
                nodes, relationships = self.process_entities(
                    row, custom_node_props, custom_edge_props
                )

                for node in nodes:
                    # De-duplicate on the stringified id, because that is the
                    # id actually emitted (1 and "1" are the same element).
                    node_id = str(node["id"])
                    if node_id in added_nodes:
                        continue

                    node_data = {
                        "id": node_id,
                        "label": node["type"].upper(),
                        **node["properties"],
                    }
                    # Re-assert the id in case a property overwrote it.
                    node_data["id"] = node_id

                    all_nodes.append({"data": node_data})
                    added_nodes.add(node_id)

                for rel in relationships:
                    edge_counter += 1
                    identity = {
                        "id": f"e{edge_counter}",
                        "source": str(rel["from"]),
                        "target": str(rel["to"]),
                    }
                    # identity is spread first to fix key order and again
                    # last so its values win over same-named properties.
                    edge_data = {
                        **identity,
                        "label": rel["type"],
                        **rel["properties"],
                        **identity,
                    }
                    all_edges.append({"data": edge_data})

            self.logger.info("st-link-analysis graph built successfully.")
            return {
                "nodes": all_nodes,
                "edges": all_edges,
            }
        except Exception as e:
            # Log for diagnostics, then let the caller decide how to react.
            self.logger.error("Error building st-link-analysis graph: %s", e)
            raise

    def write_json(self, graph_data, filename):
        """
        Write graph data to a JSON file.

        Args:
            graph_data (Dict): Graph data in st-link-analysis format.
            filename (str): Output file name.

        Raises:
            OSError: If the file cannot be opened or written.
            TypeError: If ``graph_data`` contains values that are not JSON
                serializable.
        """
        try:
            # Explicit encoding so the output does not depend on the
            # platform's default locale.
            with open(filename, "w", encoding="utf-8") as f:
                json.dump(graph_data, f, indent=2)
            self.logger.info(f"Graph data written to {filename} successfully.")
        except Exception as e:
            self.logger.error("Error writing JSON file: %s", e)
            raise
|
|