## Leveraging DuckDB with HF Datasets - GDELT Global KG

This notebook demonstrates how to seamlessly transform **GDELT** knowledge graph data into a coherent format that can be loaded into both **NetworkX** and **Neo4j**. It provides a **reference pipeline** for data professionals, researchers, and solution architects who need to:

1. **Ingest and Query Data Efficiently** 
 - Use **DuckDB** to load only the required portions of large Parquet datasets, enabling targeted data exploration and analysis.
 - It also allows iteratively homing in on a specific segment of the data (e.g., via dataset splits or a `LIMIT`), which helps balance performance, cost, and efficiency.

2. **Maintain Consistent Graph Modeling** 
 - Leverage a shared parsing and entity-extraction layer to build consistent node and relationship structures in both an **in-memory** graph (NetworkX) and a **Neo4j** database. (This is not strictly required, but it is the approach I chose to start with.)

3. **Run Advanced Queries and Analytics** 
 - Illustrate critical tasks like **centrality** and **community detection** to pinpoint influential nodes and groupings, and execute **Cypher** queries for real-time insights.

4. **Visualize and Export** 
 - Produce simple web-based **PyVis** visualizations or **matplotlib** plots.
 - More importantly, the data can also be exported as **JSON** and **GraphML** for integration with other graph tooling (D3.js, Cytoscape, etc.).

In [None]:
%pip install -q duckdb networkx pandas neo4j pyvis

In [None]:
from google.colab import userdata

# Neo4j connection settings: the URI and password are read from Colab
# secrets via `userdata`; the user name is fixed.
URI, USER, PASSWORD = (
    userdata.get('NEO4J_URI'),
    'neo4j',
    userdata.get('NEO4J_PASSWORD'),
)

In [None]:
import duckdb
import networkx as nx
from neo4j import GraphDatabase
import logging
from datetime import datetime
import pandas as pd
from pyvis.network import Network

def get_gdelt_data(limit=100,
                   dataset_path='hf://datasets/dwb2023/gdelt-gkg-march2020-v2/*.parquet'):
    """Load GDELT GKG records from Parquet into a pandas DataFrame via DuckDB.

    Only the columns needed for graph building are selected. The ``tone``
    column is the first comma-separated field of ``V1.5Tone`` cast to FLOAT.

    Args:
        limit: Maximum number of rows to return (a non-negative integer), or
            ``None`` to return every row.
        dataset_path: Parquet path or glob passed to DuckDB's ``read_parquet``.
            Defaults to the March 2020 GKG dataset on the Hugging Face Hub.

    Returns:
        pandas.DataFrame with columns GKGRECORDID, DATE, SourceCommonName,
        DocumentIdentifier, V2EnhancedPersons, V2EnhancedOrganizations,
        V2EnhancedLocations, V2EnhancedThemes and tone.

    Raises:
        ValueError: If *limit* is negative or not convertible to an integer.
    """
    # Validate before building SQL: the LIMIT value is interpolated into the
    # query text, so it must be a plain non-negative integer.
    if limit is None:
        limit_clause = ""
    else:
        limit = int(limit)
        if limit < 0:
            raise ValueError(f"limit must be non-negative, got {limit}")
        limit_clause = f"LIMIT {limit}"

    # Double embedded single quotes so the path is a valid SQL string literal.
    source = dataset_path.replace("'", "''")

    query = f"""
        SELECT
            GKGRECORDID,
            DATE,
            SourceCommonName,
            DocumentIdentifier,
            V2EnhancedPersons,
            V2EnhancedOrganizations,
            V2EnhancedLocations,
            V2EnhancedThemes,
            CAST(SPLIT_PART("V1.5Tone", ',', 1) AS FLOAT) AS tone
        FROM read_parquet('{source}')
        {limit_clause}
    """

    con = duckdb.connect(database=':memory:')
    try:
        return con.execute(query).fetchdf()
    finally:
        # Release the connection even if the query fails (e.g. network errors).
        con.close()

class GraphBuilder:
    """Base class that turns one GDELT GKG row into graph nodes and relationships.

    Subclasses (e.g. ``NetworkXBuilder`` and ``Neo4jBuilder``) call
    :meth:`process_entities` once per row and write its output to their
    own graph backend.
    """

    # Entity column -> (node label, relationship type). Every entity points at
    # the row's event: (entity)-[relationship]->(event).
    ENTITY_MAPPINGS = {
        "V2EnhancedPersons": ("Person", "MENTIONED_IN"),
        "V2EnhancedOrganizations": ("Organization", "MENTIONED_IN"),
        "V2EnhancedLocations": ("Location", "LOCATED_IN"),
        "V2EnhancedThemes": ("Theme", "CATEGORIZED_AS"),
    }

    def process_entities(self, row):
        """Extract the event node, entity nodes and entity->event relationships of *row*.

        Args:
            row: A row indexable by column name (e.g. a pandas Series from
                ``DataFrame.iterrows``). It must contain GKGRECORDID, DATE,
                SourceCommonName, DocumentIdentifier, tone, and every column in
                ``ENTITY_MAPPINGS``.

        Returns:
            ``(nodes, relationships)``:

            - Each node is ``{"id", "type", "properties"}``. The first node is
              the event. Entity node types are the lower-cased labels.
            - Each relationship is ``{"from": entity, "to": event_id, "type",
              "properties": {"created_at": DATE}}``.

            Entities are split on ';', stripped, and empty entries dropped.
            Repeats within one column are emitted once, in first-seen order.
        """
        event_id = row["GKGRECORDID"]
        event_date = row["DATE"]
        tone = row["tone"]

        nodes = [{
            "id": event_id,
            "type": "event",
            "properties": {
                "date": event_date,
                "source": row["SourceCommonName"],
                "document": row["DocumentIdentifier"],
                # Missing tone (NaN/None) is stored as neutral 0.0.
                "tone": float(tone) if pd.notna(tone) else 0.0,
            },
        }]
        relationships = []

        for field, (label, relationship) in self.ENTITY_MAPPINGS.items():
            raw = row[field]
            if not pd.notna(raw):
                continue
            # NOTE(review): in the raw GDELT GKG 2.1 format, V2Enhanced* entries
            # carry a trailing ",<char offset>" (locations are '#'-delimited
            # records). If this dataset keeps that format, the offset becomes
            # part of the entity id -- confirm against the dataset.
            stripped = (part.strip() for part in raw.split(";"))
            # dict.fromkeys de-duplicates while preserving first-seen order.
            entities = list(dict.fromkeys(part for part in stripped if part))

            node_type = label.lower()
            for entity in entities:
                nodes.append({
                    "id": entity,
                    "type": node_type,
                    "properties": {"name": entity},
                })
                relationships.append({
                    "from": entity,
                    "to": event_id,
                    "type": relationship,
                    "properties": {"created_at": event_date},
                })

        return nodes, relationships

class NetworkXBuilder(GraphBuilder):
    """Build an in-memory, undirected NetworkX graph from GDELT rows.

    Nodes and edges are keyed by the ids produced by ``process_entities``.
    An entity seen in several rows therefore collapses into one node.
    Re-adding an existing node or edge updates its attributes.
    """

    def build_graph(self, df):
        """Return a new ``nx.Graph`` containing the elements of every row of *df*."""
        graph = nx.Graph()
        for _, record in df.iterrows():
            node_specs, edge_specs = self.process_entities(record)
            # Node attributes: 'type' plus the node's properties.
            graph.add_nodes_from(
                (spec["id"], dict(type=spec["type"], **spec["properties"]))
                for spec in node_specs
            )
            # Edge attributes: 'relationship' plus the relationship's properties.
            graph.add_edges_from(
                (spec["from"], spec["to"],
                 dict(relationship=spec["type"], **spec["properties"]))
                for spec in edge_specs
            )
        return graph

class Neo4jBuilder(GraphBuilder):
    """Write GDELT graph elements into a Neo4j database.

    Each DataFrame row is written in its own write transaction. Nodes are
    MERGEd on ``(label, id)``. Relationships are MERGEd between the endpoints
    created for that row.
    """

    def __init__(self, uri, user, password):
        self.driver = GraphDatabase.driver(uri, auth=(user, password))
        self.logger = logging.getLogger(__name__)

    def close(self):
        """Close the underlying Neo4j driver."""
        self.driver.close()

    def build_graph(self, df):
        """Write every row of *df* to Neo4j.

        A row whose write fails is logged with its traceback and skipped, so
        one bad record does not abort the rest of the load.
        """
        with self.driver.session() as session:
            for _, row in df.iterrows():
                nodes, relationships = self.process_entities(row)
                try:
                    session.execute_write(
                        self._create_graph_elements, nodes, relationships
                    )
                except Exception as e:
                    self.logger.error(
                        f"Error processing row {row['GKGRECORDID']}: {str(e)}",
                        exc_info=True,
                    )

    @staticmethod
    def _quote(identifier):
        """Backtick-quote a label or relationship type for use in Cypher text.

        Labels and relationship types cannot be passed as query parameters,
        so they are interpolated. Doubling embedded backticks keeps the
        query well-formed.
        """
        return "`" + str(identifier).replace("`", "``") + "`"

    @classmethod
    def _label_pattern(cls, labels_by_id, node_id):
        """Return ``":`label`"`` if *node_id* has exactly one label in this row, else ``""``."""
        labels = labels_by_id.get(node_id, ())
        if len(labels) == 1:
            return ":" + cls._quote(next(iter(labels)))
        # Unknown or ambiguous label for this row: fall back to an id-only match.
        return ""

    def _create_graph_elements(self, tx, nodes, relationships):
        """MERGE this row's nodes, then its relationships, inside transaction *tx*.

        When an endpoint id was merged with exactly one label in ``nodes``,
        the relationship MATCH also uses that label. This keeps it from
        attaching to a different-typed node that shares the id (e.g. a
        person and a location with the same name). It also lets Neo4j use a
        label/id index instead of scanning every node.
        """
        labels_by_id = {}
        for node in nodes:
            tx.run(
                f"MERGE (n:{self._quote(node['type'])} {{id: $id}}) "
                "SET n += $properties",
                id=node["id"],
                properties=node["properties"],
            )
            labels_by_id.setdefault(node["id"], set()).add(node["type"])

        for rel in relationships:
            from_label = self._label_pattern(labels_by_id, rel["from"])
            to_label = self._label_pattern(labels_by_id, rel["to"])
            tx.run(
                f"MATCH (a{from_label} {{id: $from_id}}) "
                f"MATCH (b{to_label} {{id: $to_id}}) "
                f"MERGE (a)-[r:{self._quote(rel['type'])}]->(b) "
                "SET r += $properties",
                from_id=rel["from"],
                to_id=rel["to"],
                properties=rel["properties"],
            )

In [None]:
if __name__ == "__main__":
    from collections import Counter

    # Fetch a small sample once and reuse it for both graph backends.
    df = get_gdelt_data(limit=25)  # Get 25 records

    # Build the in-memory NetworkX graph.
    nx_builder = NetworkXBuilder()
    G = nx_builder.build_graph(df)

    # Print graph information
    print("NetworkX Graph Summary:")
    print(f"Nodes: {G.number_of_nodes()}")
    print(f"Edges: {G.number_of_edges()}")

    # Count nodes per 'type' attribute; nodes without one count as 'unknown'.
    node_types = Counter(
        attr.get('type', 'unknown') for _, attr in G.nodes(data=True)
    )

    print("\nNode types distribution:")
    for ntype, count in node_types.items():
        print(f"{ntype}: {count}")

    # Push the same rows into Neo4j; always release the driver afterwards.
    neo4j_builder = Neo4jBuilder(URI, USER, PASSWORD)
    try:
        neo4j_builder.build_graph(df)
    finally:
        neo4j_builder.close()

In [None]:
# run cypher query for validation

from neo4j import GraphDatabase

class Neo4jQuery:
    """Small helper for running ad-hoc Cypher queries against Neo4j.

    Can be used as a context manager so the driver is always closed::

        with Neo4jQuery(URI, USER, PASSWORD) as q:
            rows = q.run_query(query_1)
    """

    def __init__(self, uri, user, password):
        self.driver = GraphDatabase.driver(uri, auth=(user, password))
        self.logger = logging.getLogger(__name__)

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc, tb):
        self.close()
        # Do not suppress exceptions raised inside the with-block.
        return False

    def close(self):
        """Close the underlying Neo4j driver."""
        self.driver.close()

    def run_query(self, query, parameters=None):
        """Run *query* in a fresh session and return all records as a list of dicts.

        Args:
            query: Cypher query text.
            parameters: Optional mapping of query parameters (``$name`` in the
                query). Prefer this over formatting values into *query*.
        """
        with self.driver.session() as session:
            # Consume the result while the session is still open.
            return session.run(query, parameters).data()

# Validation query for the Neo4j load: number of nodes per label set, largest
# first. It is only defined in this cell, not executed; run it with e.g.
# Neo4jQuery(URI, USER, PASSWORD).run_query(query_1) and close() the helper after.
query_1 = """
// Count nodes by type
MATCH (n)
RETURN labels(n) as type, count(*) as count
ORDER BY count DESC;
"""


In [None]:
def visualize_graph(G, output_file='gdelt_network.html'):
    """Render a NetworkX graph as an interactive PyVis HTML page.

    Node colour and size depend on the node's ``type`` attribute. A node's
    tooltip lists all of its attributes. An edge's tooltip shows its
    ``relationship`` and ``created_at`` attributes.

    Args:
        G: NetworkX graph whose nodes may carry a ``type`` attribute
            (event/person/organization/location/theme).
        output_file: Path of the HTML file written by PyVis.

    Returns:
        A status message naming the output file.
    """
    net = Network(notebook=True,
                  height='750px',
                  width='100%',
                  bgcolor='#ffffff',
                  font_color='#000000')

    # Configure physics
    net.force_atlas_2based(gravity=-50,
                           central_gravity=0.01,
                           spring_length=100,
                           spring_strength=0.08,
                           damping=0.4,
                           overlap=0)

    # Color mapping for node types
    color_map = {
        'event': '#1f77b4',         # Blue
        'person': '#00ff00',        # Green
        'organization': '#ffa500',  # Orange
        'location': '#ff0000',      # Red
        'theme': '#800080',         # Purple
    }
    # Valid CSS hex for gray, used for node types missing from color_map.
    default_color = '#808080'
    max_label_len = 20

    for node, attr in G.nodes(data=True):
        node_type = attr.get('type', 'unknown')
        title = f"Type: {node_type}\n" + "".join(
            f"{k}: {v}\n" for k, v in attr.items() if k != 'type'
        )
        text = str(node)
        # Truncate long ids so labels stay readable; the full id is in the tooltip.
        label = text[:max_label_len] + '...' if len(text) > max_label_len else text
        net.add_node(node,
                     title=title,
                     label=label,
                     color=color_map.get(node_type, default_color),
                     size=20 if node_type == 'event' else 15)

    for source, target, attr in G.edges(data=True):
        net.add_edge(source,
                     target,
                     title=f"{attr.get('relationship', '')}\nDate: {attr.get('created_at', '')}",
                     color='#666666')

    # NOTE(review): with notebook=True, Network.show presumably returns an
    # IFrame for inline display; it is discarded here, so the graph may not
    # render inline -- confirm against the installed pyvis version.
    net.show(output_file)
    return f"Graph visualization saved to {output_file}"

# Usage example:
if __name__ == "__main__":
    # visualize_graph returns a status message. An expression inside an `if`
    # body is not echoed by the notebook, so print it explicitly.
    print(visualize_graph(G))

In [None]:
!pip show duckdb