{
 "cells": [
  {
   "cell_type": "markdown",
   "metadata": { "id": "N20l3SsqSVUM" },
   "source": [
    "## Leveraging DuckDB with HF Datasets - GDELT Global KG\n",
    "\n",
    "This notebook demonstrates how to seamlessly transform **GDELT** knowledge graph data into a coherent format that can be pushed to both **NetworkX** and **Neo4j**. It provides a **referenceable pipeline** for data professionals, researchers, and solution architects who need to:\n",
    "\n",
    "1. **Ingest and Query Data Efficiently**\n",
    "   - Utilize **DuckDB** to load just the required portions of large Parquet datasets, enabling targeted data exploration and analysis.\n",
    "   - It also allows for iteratively homing in on a specific segment of data using splits - helping to maximize performance / cost / efficiency.\n",
    "\n",
    "2. **Maintain Consistent Graph Modeling**\n",
    "   - Leverage a shared parsing and entity extraction layer to build consistent node and relationship structures in both an **in-memory** graph (NetworkX) and a **Neo4j** database. (not a requirement per se - but an approach I wanted to start with)\n",
    "\n",
    "3. **Run Advanced Queries and Analytics**\n",
    "   - Illustrate critical tasks like **centrality** and **community detection** to pinpoint influential nodes and groupings, and execute **Cypher** queries for real-time insights.\n",
    "\n",
    "4. **Visualize and Export**\n",
    "   - Produce simple web-based **PyVis** visualizations or **matplotlib** plots.\n",
    "   - More importantly, the data can also be exported in **JSON** and GraphML for integration with other graph tooling (D3.js, Cytoscape, etc.)."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": { "id": "DCPEB5tpfW44" },
   "outputs": [],
   "source": [
    "%pip install -q duckdb networkx pandas neo4j pyvis"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": { "id": "A1vEyOkm7LPV" },
   "outputs": [],
   "source": [
    "# Neo4j connection settings come from Colab secrets - never hardcode credentials.\n",
    "from google.colab import userdata\n",
    "\n",
    "URI = userdata.get('NEO4J_URI')\n",
    "USER = 'neo4j'\n",
    "PASSWORD = userdata.get('NEO4J_PASSWORD')"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": { "id": "cm8t66uPy_C7" },
   "outputs": [],
   "source": [
    "import duckdb\n",
    "import networkx as nx\n",
    "from neo4j import GraphDatabase\n",
    "import logging\n",
    "from datetime import datetime\n",
    "import pandas as pd\n",
    "from pyvis.network import Network\n",
    "\n",
    "def get_gdelt_data(limit=100):\n",
    "    \"\"\"Fetch up to `limit` GDELT GKG rows from the HF-hosted Parquet dataset.\n",
    "\n",
    "    Returns a pandas DataFrame with the selected GKG columns plus a parsed\n",
    "    float `tone` column (first component of V1.5Tone).\n",
    "    \"\"\"\n",
    "    con = duckdb.connect(database=':memory:')\n",
    "    try:\n",
    "        # Lazy view over the remote Parquet files; DuckDB reads only the\n",
    "        # columns/row groups the query actually needs.\n",
    "        con.execute(\"\"\"\n",
    "            CREATE VIEW train AS (\n",
    "                SELECT *\n",
    "                FROM read_parquet('hf://datasets/dwb2023/gdelt-gkg-march2020-v2/*.parquet')\n",
    "            );\n",
    "        \"\"\")\n",
    "\n",
    "        # Parameter binding (?) instead of f-string interpolation keeps the\n",
    "        # query safe should `limit` ever come from user input.\n",
    "        query = \"\"\"\n",
    "            SELECT\n",
    "                GKGRECORDID,\n",
    "                DATE,\n",
    "                SourceCommonName,\n",
    "                DocumentIdentifier,\n",
    "                V2EnhancedPersons,\n",
    "                V2EnhancedOrganizations,\n",
    "                V2EnhancedLocations,\n",
    "                V2EnhancedThemes,\n",
    "                CAST(SPLIT_PART(\"V1.5Tone\", ',', 1) AS FLOAT) as tone\n",
    "            FROM train\n",
    "            LIMIT ?\n",
    "        \"\"\"\n",
    "        return con.execute(query, [limit]).fetchdf()\n",
    "    finally:\n",
    "        # Always release the connection, even if a query fails.\n",
    "        con.close()\n",
    "\n",
    "class GraphBuilder:\n",
    "    \"\"\"Base class: turns one GDELT GKG row into node/relationship dicts.\n",
    "\n",
    "    Subclasses reuse `process_entities` so NetworkX and Neo4j get an\n",
    "    identical graph model.\n",
    "    \"\"\"\n",
    "\n",
    "    def process_entities(self, row):\n",
    "        \"\"\"Return (nodes, relationships) extracted from a single DataFrame row.\"\"\"\n",
    "        nodes = []\n",
    "        relationships = []\n",
    "        event_id = row[\"GKGRECORDID\"]\n",
    "        event_date = row[\"DATE\"]\n",
    "        event_source = row[\"SourceCommonName\"]\n",
    "        event_document_id = row[\"DocumentIdentifier\"]\n",
    "        event_tone = float(row[\"tone\"]) if pd.notna(row[\"tone\"]) else 0.0\n",
    "\n",
    "        # Add event node\n",
    "        nodes.append({\n",
    "            \"id\": event_id,\n",
    "            \"type\": \"event\",\n",
    "            \"properties\": {\n",
    "                \"date\": event_date,\n",
    "                \"source\": event_source,\n",
    "                \"document\": event_document_id,\n",
    "                \"tone\": event_tone\n",
    "            }\n",
    "        })\n",
    "\n",
    "        # Each GKG column maps to a (node label, relationship type) pair.\n",
    "        entity_mappings = {\n",
    "            \"V2EnhancedPersons\": (\"Person\", \"MENTIONED_IN\"),\n",
    "            \"V2EnhancedOrganizations\": (\"Organization\", \"MENTIONED_IN\"),\n",
    "            \"V2EnhancedLocations\": (\"Location\", \"LOCATED_IN\"),\n",
    "            \"V2EnhancedThemes\": (\"Theme\", \"CATEGORIZED_AS\")\n",
    "        }\n",
    "\n",
    "        for field, (label, relationship) in entity_mappings.items():\n",
    "            if pd.notna(row[field]):\n",
    "                # GKG packs entities into a single ';'-delimited string.\n",
    "                entities = [e.strip() for e in row[field].split(';') if e.strip()]\n",
    "                for entity in entities:\n",
    "                    nodes.append({\n",
    "                        \"id\": entity,\n",
    "                        \"type\": label.lower(),\n",
    "                        \"properties\": {\"name\": entity}\n",
    "                    })\n",
    "                    relationships.append({\n",
    "                        \"from\": entity,\n",
    "                        \"to\": event_id,\n",
    "                        \"type\": relationship,\n",
    "                        \"properties\": {\"created_at\": event_date}\n",
    "                    })\n",
    "\n",
    "        return nodes, relationships\n",
    "\n",
    "class NetworkXBuilder(GraphBuilder):\n",
    "    \"\"\"Materializes the shared graph model as an in-memory NetworkX graph.\"\"\"\n",
    "\n",
    "    def build_graph(self, df):\n",
    "        G = nx.Graph()\n",
    "\n",
    "        for _, row in df.iterrows():\n",
    "            nodes, relationships = self.process_entities(row)\n",
    "\n",
    "            # Add nodes (re-adding an existing id just updates attributes)\n",
    "            for node in nodes:\n",
    "                G.add_node(node[\"id\"],\n",
    "                           type=node[\"type\"],\n",
    "                           **node[\"properties\"])\n",
    "\n",
    "            # Add relationships\n",
    "            for rel in relationships:\n",
    "                G.add_edge(rel[\"from\"],\n",
    "                           rel[\"to\"],\n",
    "                           relationship=rel[\"type\"],\n",
    "                           **rel[\"properties\"])\n",
    "\n",
    "        return G\n",
    "\n",
    "class Neo4jBuilder(GraphBuilder):\n",
    "    \"\"\"Materializes the shared graph model into a Neo4j database.\"\"\"\n",
    "\n",
    "    def __init__(self, uri, user, password):\n",
    "        self.driver = GraphDatabase.driver(uri, auth=(user, password))\n",
    "        self.logger = logging.getLogger(__name__)\n",
    "\n",
    "    def close(self):\n",
    "        self.driver.close()\n",
    "\n",
    "    def build_graph(self, df):\n",
    "        with self.driver.session() as session:\n",
    "            for _, row in df.iterrows():\n",
    "                nodes, relationships = self.process_entities(row)\n",
    "\n",
    "                try:\n",
    "                    session.execute_write(self._create_graph_elements,\n",
    "                                          nodes, relationships)\n",
    "                except Exception as e:\n",
    "                    # Log and continue so one bad row doesn't abort the load.\n",
    "                    self.logger.error(f\"Error processing row {row['GKGRECORDID']}: {str(e)}\")\n",
    "\n",
    "    def _create_graph_elements(self, tx, nodes, relationships):\n",
    "        # NOTE: labels/relationship types are interpolated into the query\n",
    "        # text because Cypher does not parameterize them; this is safe here\n",
    "        # only because they come from the fixed entity_mappings above.\n",
    "        # All values go in as query parameters.\n",
    "        for node in nodes:\n",
    "            query = f\"\"\"\n",
    "            MERGE (n:{node['type']} {{id: $id}})\n",
    "            SET n += $properties\n",
    "            \"\"\"\n",
    "            tx.run(query, id=node[\"id\"], properties=node[\"properties\"])\n",
    "\n",
    "        for rel in relationships:\n",
    "            query = f\"\"\"\n",
    "            MATCH (a {{id: $from_id}})\n",
    "            MATCH (b {{id: $to_id}})\n",
    "            MERGE (a)-[r:{rel['type']}]->(b)\n",
    "            SET r += $properties\n",
    "            \"\"\"\n",
    "            tx.run(query,\n",
    "                   from_id=rel[\"from\"],\n",
    "                   to_id=rel[\"to\"],\n",
    "                   properties=rel[\"properties\"])"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": { "id": "ghbLZNLe23x1" },
   "outputs": [],
   "source": [
    "if __name__ == \"__main__\":\n",
    "    # Fetch the data once and reuse it for both builders.\n",
    "    df = get_gdelt_data(limit=25)  # Get 25 records\n",
    "\n",
    "    # Build NetworkX graph\n",
    "    nx_builder = NetworkXBuilder()\n",
    "    G = nx_builder.build_graph(df)\n",
    "\n",
    "    # Print graph information\n",
    "    print(\"NetworkX Graph Summary:\")\n",
    "    print(f\"Nodes: {G.number_of_nodes()}\")\n",
    "    print(f\"Edges: {G.number_of_edges()}\")\n",
    "\n",
    "    # Print node types distribution\n",
    "    node_types = {}\n",
    "    for _, attr in G.nodes(data=True):\n",
    "        node_type = attr.get('type', 'unknown')\n",
    "        node_types[node_type] = node_types.get(node_type, 0) + 1\n",
    "\n",
    "    print(\"\\nNode types distribution:\")\n",
    "    for ntype, count in node_types.items():\n",
    "        print(f\"{ntype}: {count}\")\n",
    "\n",
    "    # Build Neo4j graph; always close the driver, even on failure.\n",
    "    neo4j_builder = Neo4jBuilder(URI, USER, PASSWORD)\n",
    "    try:\n",
    "        neo4j_builder.build_graph(df)\n",
    "    finally:\n",
    "        neo4j_builder.close()"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": { "id": "mkJKz_soTsAY" },
   "outputs": [],
   "source": [
    "# run cypher query for validation\n",
    "\n",
    "from neo4j import GraphDatabase\n",
    "\n",
    "class Neo4jQuery:\n",
    "    \"\"\"Thin helper for running ad-hoc Cypher queries and returning records.\"\"\"\n",
    "\n",
    "    def __init__(self, uri, user, password):\n",
    "        self.driver = GraphDatabase.driver(uri, auth=(user, password))\n",
    "        self.logger = logging.getLogger(__name__)\n",
    "\n",
    "    def close(self):\n",
    "        self.driver.close()\n",
    "\n",
    "    def run_query(self, query):\n",
    "        with self.driver.session() as session:\n",
    "            result = session.run(query)\n",
    "            return result.data()\n",
    "\n",
    "query_1 = \"\"\"\n",
    "// Count nodes by type\n",
    "MATCH (n)\n",
    "RETURN labels(n) as type, count(*) as count\n",
    "ORDER BY count DESC;\n",
    "\"\"\"\n",
    "\n",
    "# Execute the validation query (it was previously only defined, never run).\n",
    "query_runner = Neo4jQuery(URI, USER, PASSWORD)\n",
    "try:\n",
    "    for record in query_runner.run_query(query_1):\n",
    "        print(record)\n",
    "finally:\n",
    "    query_runner.close()"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": { "id": "mrlWADO93ize" },
   "outputs": [],
   "source": [
    "def visualize_graph(G, output_file='gdelt_network.html'):\n",
    "    \"\"\"Visualize a NetworkX graph using Pyvis and save it to `output_file`.\"\"\"\n",
    "    # Create Pyvis network\n",
    "    net = Network(notebook=True,\n",
    "                  height='750px',\n",
    "                  width='100%',\n",
    "                  bgcolor='#ffffff',\n",
    "                  font_color='#000000')\n",
    "\n",
    "    # Configure physics\n",
    "    net.force_atlas_2based(gravity=-50,\n",
    "                           central_gravity=0.01,\n",
    "                           spring_length=100,\n",
    "                           spring_strength=0.08,\n",
    "                           damping=0.4,\n",
    "                           overlap=0)\n",
    "\n",
    "    # Color mapping for node types\n",
    "    color_map = {\n",
    "        'event': '#1f77b4',         # Blue\n",
    "        'person': '#00ff00',        # Green\n",
    "        'organization': '#ffa500',  # Orange\n",
    "        'location': '#ff0000',      # Red\n",
    "        'theme': '#800080'          # Purple\n",
    "    }\n",
    "\n",
    "    # Add nodes (hover title lists all non-type attributes)\n",
    "    for node, attr in G.nodes(data=True):\n",
    "        node_type = attr.get('type', 'unknown')\n",
    "        title = f\"Type: {node_type}\\n\"\n",
    "        for k, v in attr.items():\n",
    "            if k != 'type':\n",
    "                title += f\"{k}: {v}\\n\"\n",
    "\n",
    "        net.add_node(node,\n",
    "                     title=title,\n",
    "                     label=str(node)[:20] + '...' if len(str(node)) > 20 else str(node),\n",
    "                     color=color_map.get(node_type, '#808080'),  # fallback grey ('#gray' is not a valid color)\n",
    "                     size=20 if node_type == 'event' else 15)\n",
    "\n",
    "    # Add edges\n",
    "    for source, target, attr in G.edges(data=True):\n",
    "        net.add_edge(source,\n",
    "                     target,\n",
    "                     title=f\"{attr.get('relationship', '')}\\nDate: {attr.get('created_at', '')}\",\n",
    "                     color='#666666')\n",
    "\n",
    "    # Save visualization\n",
    "    net.show(output_file)\n",
    "    return f\"Graph visualization saved to {output_file}\"\n",
    "\n",
    "# Usage example:\n",
    "if __name__ == \"__main__\":\n",
    "    visualize_graph(G)"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": { "id": "RqFRO1atnIIT" },
   "outputs": [],
   "source": [
    "!pip show duckdb"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": { "id": "95ML8u0LnKif" },
   "outputs": [],
   "source": []
  }
 ],
 "metadata": {
  "colab": { "provenance": [] },
  "kernelspec": { "display_name": "Python 3", "name": "python3" },
  "language_info": { "name": "python" }
 },
 "nbformat": 4,
 "nbformat_minor": 0
}