{
  "cells": [
    {
      "cell_type": "markdown",
      "metadata": {
        "id": "N20l3SsqSVUM"
      },
      "source": [
        "## Leveraging DuckDB with HF Datasets - GDELT Global KG\n",
        "\n",
        "This notebook demonstrates how to transform **GDELT** Global Knowledge Graph (GKG) data into a common node and relationship format that can be loaded into both **NetworkX** and **Neo4j**. It provides a **reusable reference pipeline** for data professionals, researchers, and solution architects who need to:\n",
        "\n",
        "1. **Ingest and Query Data Efficiently**  \n",
        "   - Use **DuckDB** to load only the required portions of large Parquet datasets, enabling targeted data exploration and analysis.\n",
        "   - Iteratively home in on a specific segment of the data using splits, helping to balance performance, cost, and efficiency.\n",
        "\n",
        "2. **Maintain Consistent Graph Modeling**  \n",
        "   - Use a shared parsing and entity-extraction layer to build consistent node and relationship structures in both an **in-memory** graph (NetworkX) and a **Neo4j** database. (Not a requirement per se, but the approach I wanted to start with.)\n",
        "\n",
        "3. **Run Advanced Queries and Analytics**  \n",
        "   - Illustrate key tasks such as **centrality** and **community detection** to pinpoint influential nodes and groupings, and execute **Cypher** queries to validate and explore the loaded graph.\n",
        "\n",
        "4. **Visualize and Export**  \n",
        "   - Produce simple web-based **PyVis** visualizations or **matplotlib** plots.\n",
        "   - More importantly, export the data as **JSON** and **GraphML** for integration with other graph tooling (D3.js, Cytoscape, etc.)."
      ]
    },
    {
      "cell_type": "code",
      "execution_count": null,
      "metadata": {
        "id": "DCPEB5tpfW44"
      },
      "outputs": [],
      "source": [
        "%pip install -q duckdb networkx pandas neo4j pyvis"
      ]
    },
    {
      "cell_type": "code",
      "execution_count": null,
      "metadata": {
        "id": "A1vEyOkm7LPV"
      },
      "outputs": [],
      "source": [
        "from google.colab import userdata\n",
        "\n",
        "URI = userdata.get('NEO4J_URI')\n",
        "USER = 'neo4j'\n",
        "PASSWORD = userdata.get('NEO4J_PASSWORD')"
      ]
    },
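    {
      "cell_type": "markdown",
      "metadata": {},
      "source": [
        "`google.colab.userdata` can only be imported inside Google Colab, so the cell above raises `ImportError` elsewhere. The cell below is a minimal sketch of a fallback, assuming you export the same secret names (`NEO4J_URI`, `NEO4J_PASSWORD`) as environment variables when running locally; `NEO4J_USER` is a hypothetical variable name that defaults to `neo4j`, matching the original cell."
      ]
    },
    {
      "cell_type": "code",
      "execution_count": null,
      "metadata": {},
      "outputs": [],
      "source": [
        "import os\n",
        "\n",
        "try:\n",
        "    # Inside Colab: read secrets from the Colab secrets panel, as above\n",
        "    from google.colab import userdata\n",
        "    URI = userdata.get('NEO4J_URI')\n",
        "    PASSWORD = userdata.get('NEO4J_PASSWORD')\n",
        "except ImportError:\n",
        "    # Outside Colab: fall back to environment variables with the same names\n",
        "    URI = os.environ.get('NEO4J_URI')\n",
        "    PASSWORD = os.environ.get('NEO4J_PASSWORD')\n",
        "\n",
        "# NEO4J_USER is an assumed variable name; 'neo4j' matches the original default\n",
        "USER = os.environ.get('NEO4J_USER', 'neo4j')"
      ]
    },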
    {
      "cell_type": "code",
      "execution_count": null,
      "metadata": {
        "id": "cm8t66uPy_C7"
      },
      "outputs": [],
      "source": [
        "import duckdb\n",
        "import networkx as nx\n",
        "from neo4j import GraphDatabase\n",
        "import logging\n",
        "from datetime import datetime\n",
        "import pandas as pd\n",
        "from pyvis.network import Network\n",
        "\n",
        "def get_gdelt_data(limit=100):\n",
        "    \"\"\"Get data from DuckDB with specified limit\"\"\"\n",
        "    con = duckdb.connect(database=':memory:')\n",
        "\n",
        "    # Create view of the dataset\n",
        "    con.execute(\"\"\"\n",
        "        CREATE VIEW train AS (\n",
        "            SELECT *\n",
        "            FROM read_parquet('hf://datasets/dwb2023/gdelt-gkg-march2020-v2/*.parquet')\n",
        "        );\n",
        "    \"\"\")\n",
        "\n",
        "    # Single query with limit\n",
        "    query = f\"\"\"\n",
        "    SELECT\n",
        "        GKGRECORDID,\n",
        "        DATE,\n",
        "        SourceCommonName,\n",
        "        DocumentIdentifier,\n",
        "        V2EnhancedPersons,\n",
        "        V2EnhancedOrganizations,\n",
        "        V2EnhancedLocations,\n",
        "        V2EnhancedThemes,\n",
        "        CAST(SPLIT_PART(\"V1.5Tone\", ',', 1) AS FLOAT) as tone\n",
        "    FROM train\n",
        "    LIMIT {limit}\n",
        "    \"\"\"\n",
        "\n",
        "    results_df = con.execute(query).fetchdf()\n",
        "    con.close()\n",
        "    return results_df\n",
        "\n",
        "class GraphBuilder:\n",
        "    \"\"\"Base class for building graph from GDELT data\"\"\"\n",
        "    def process_entities(self, row):\n",
        "        \"\"\"Process entities from a row and return nodes and relationships\"\"\"\n",
        "        nodes = []\n",
        "        relationships = []\n",
        "        event_id = row[\"GKGRECORDID\"]\n",
        "        event_date = row[\"DATE\"]\n",
        "        event_source = row[\"SourceCommonName\"]\n",
        "        event_document_id = row[\"DocumentIdentifier\"]\n",
        "        event_tone = float(row[\"tone\"]) if pd.notna(row[\"tone\"]) else 0.0\n",
        "\n",
        "        # Add event node\n",
        "        nodes.append({\n",
        "            \"id\": event_id,\n",
        "            \"type\": \"event\",\n",
        "            \"properties\": {\n",
        "                \"date\": event_date,\n",
        "                \"source\": event_source,\n",
        "                \"document\": event_document_id,\n",
        "                \"tone\": event_tone\n",
        "            }\n",
        "        })\n",
        "\n",
        "        # Process each entity type\n",
        "        entity_mappings = {\n",
        "            \"V2EnhancedPersons\": (\"Person\", \"MENTIONED_IN\"),\n",
        "            \"V2EnhancedOrganizations\": (\"Organization\", \"MENTIONED_IN\"),\n",
        "            \"V2EnhancedLocations\": (\"Location\", \"LOCATED_IN\"),\n",
        "            \"V2EnhancedThemes\": (\"Theme\", \"CATEGORIZED_AS\")\n",
        "        }\n",
        "\n",
        "        for field, (label, relationship) in entity_mappings.items():\n",
        "            if pd.notna(row[field]):\n",
        "                entities = [e.strip() for e in row[field].split(';') if e.strip()]\n",
        "                for entity in entities:\n",
        "                    nodes.append({\n",
        "                        \"id\": entity,\n",
        "                        \"type\": label.lower(),\n",
        "                        \"properties\": {\"name\": entity}\n",
        "                    })\n",
        "                    relationships.append({\n",
        "                        \"from\": entity,\n",
        "                        \"to\": event_id,\n",
        "                        \"type\": relationship,\n",
        "                        \"properties\": {\"created_at\": event_date}\n",
        "                    })\n",
        "\n",
        "        return nodes, relationships\n",
        "\n",
        "class NetworkXBuilder(GraphBuilder):\n",
        "    def build_graph(self, df):\n",
        "        G = nx.Graph()\n",
        "\n",
        "        for _, row in df.iterrows():\n",
        "            nodes, relationships = self.process_entities(row)\n",
        "\n",
        "            # Add nodes\n",
        "            for node in nodes:\n",
        "                G.add_node(node[\"id\"],\n",
        "                          type=node[\"type\"],\n",
        "                          **node[\"properties\"])\n",
        "\n",
        "            # Add relationships\n",
        "            for rel in relationships:\n",
        "                G.add_edge(rel[\"from\"],\n",
        "                          rel[\"to\"],\n",
        "                          relationship=rel[\"type\"],\n",
        "                          **rel[\"properties\"])\n",
        "\n",
        "        return G\n",
        "\n",
        "class Neo4jBuilder(GraphBuilder):\n",
        "    def __init__(self, uri, user, password):\n",
        "        self.driver = GraphDatabase.driver(uri, auth=(user, password))\n",
        "        self.logger = logging.getLogger(__name__)\n",
        "\n",
        "    def close(self):\n",
        "        self.driver.close()\n",
        "\n",
        "    def build_graph(self, df):\n",
        "        with self.driver.session() as session:\n",
        "            for _, row in df.iterrows():\n",
        "                nodes, relationships = self.process_entities(row)\n",
        "\n",
        "                # Create nodes and relationships in Neo4j\n",
        "                try:\n",
        "                    session.execute_write(self._create_graph_elements,\n",
        "                                        nodes, relationships)\n",
        "                except Exception as e:\n",
        "                    self.logger.error(f\"Error processing row {row['GKGRECORDID']}: {str(e)}\")\n",
        "\n",
        "    def _create_graph_elements(self, tx, nodes, relationships):\n",
        "        # Create nodes\n",
        "        for node in nodes:\n",
        "            query = f\"\"\"\n",
        "            MERGE (n:{node['type']} {{id: $id}})\n",
        "            SET n += $properties\n",
        "            \"\"\"\n",
        "            tx.run(query, id=node[\"id\"], properties=node[\"properties\"])\n",
        "\n",
        "        # Create relationships\n",
        "        for rel in relationships:\n",
        "            query = f\"\"\"\n",
        "            MATCH (a {{id: $from_id}})\n",
        "            MATCH (b {{id: $to_id}})\n",
        "            MERGE (a)-[r:{rel['type']}]->(b)\n",
        "            SET r += $properties\n",
        "            \"\"\"\n",
        "            tx.run(query,\n",
        "                  from_id=rel[\"from\"],\n",
        "                  to_id=rel[\"to\"],\n",
        "                  properties=rel[\"properties\"])"
      ]
    },
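    {
      "cell_type": "markdown",
      "metadata": {},
      "source": [
        "`get_gdelt_data` already limits the read to the columns named in its `SELECT`. Because Parquet is columnar, DuckDB only fetches those columns, and a `WHERE` clause can additionally skip row groups whose min/max statistics rule them out. The cell below is a small offline sketch of narrowing the data to one segment this way: it writes an invented local file (`gkg_sample.parquet` and its rows are hypothetical) and filters on a `DATE` window with bound parameters instead of string formatting. The same query shape should work against the `hf://` path above, although how much data is actually skipped remotely depends on the DuckDB version and the file layout."
      ]
    },
    {
      "cell_type": "code",
      "execution_count": null,
      "metadata": {},
      "outputs": [],
      "source": [
        "import duckdb\n",
        "\n",
        "# Build a tiny local Parquet file so this cell runs offline.\n",
        "# Column names mirror the GKG schema; the rows are invented.\n",
        "con = duckdb.connect(database=':memory:')\n",
        "con.execute(\"\"\"\n",
        "    COPY (\n",
        "        SELECT * FROM (VALUES\n",
        "            ('rec-1', 20200301000000, 'example.com', 'Person A,10;Person B,42'),\n",
        "            ('rec-2', 20200315000000, 'example.org', 'Person C,7'),\n",
        "            ('rec-3', 20200329000000, 'example.net', NULL)\n",
        "        ) AS t(GKGRECORDID, \"DATE\", SourceCommonName, V2EnhancedPersons)\n",
        "    ) TO 'gkg_sample.parquet' (FORMAT PARQUET)\n",
        "\"\"\")\n",
        "\n",
        "# Projection: only the three listed columns are read from the file.\n",
        "# Filter: rows outside the DATE window are dropped during the scan, and Parquet\n",
        "# min/max statistics let DuckDB skip whole row groups that cannot match.\n",
        "segment = con.execute(\"\"\"\n",
        "    SELECT GKGRECORDID, \"DATE\", V2EnhancedPersons\n",
        "    FROM read_parquet('gkg_sample.parquet')\n",
        "    WHERE \"DATE\" BETWEEN ? AND ?\n",
        "    ORDER BY \"DATE\"\n",
        "\"\"\", [20200301000000, 20200315235959]).fetchdf()\n",
        "con.close()\n",
        "\n",
        "print(segment)  # expect rec-1 and rec-2 only"
      ]
    },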
    {
      "cell_type": "code",
      "execution_count": null,
      "metadata": {
        "id": "ghbLZNLe23x1"
      },
      "outputs": [],
      "source": [
        "if __name__ == \"__main__\":\n",
        "    # Get data once\n",
        "    df = get_gdelt_data(limit=25)  # Get 25 records\n",
        "\n",
        "    # Build NetworkX graph\n",
        "    nx_builder = NetworkXBuilder()\n",
        "    G = nx_builder.build_graph(df)\n",
        "\n",
        "    # Print graph information\n",
        "    print(\"NetworkX Graph Summary:\")\n",
        "    print(f\"Nodes: {G.number_of_nodes()}\")\n",
        "    print(f\"Edges: {G.number_of_edges()}\")\n",
        "\n",
        "    # Print node types distribution\n",
        "    node_types = {}\n",
        "    for _, attr in G.nodes(data=True):\n",
        "        node_type = attr.get('type', 'unknown')\n",
        "        node_types[node_type] = node_types.get(node_type, 0) + 1\n",
        "\n",
        "    print(\"\\nNode types distribution:\")\n",
        "    for ntype, count in node_types.items():\n",
        "        print(f\"{ntype}: {count}\")\n",
        "\n",
        "    # Build Neo4j graph\n",
        "    neo4j_builder = Neo4jBuilder(URI, USER, PASSWORD)\n",
        "    try:\n",
        "        neo4j_builder.build_graph(df)\n",
        "    finally:\n",
        "        neo4j_builder.close()"
      ]
    },
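    {
      "cell_type": "markdown",
      "metadata": {},
      "source": [
        "`process_entities` splits each `V2Enhanced*` field on `;` only. If this dataset keeps the GDELT 2.1 GKG codebook layout (an assumption worth checking against a few raw rows), each person, organization, and theme entry ends in `,<character offset>` and each location is a `#`-delimited block, so the same name mentioned twice in one article becomes two different nodes. The hypothetical helper below strips the trailing offsets, takes the full name from location blocks, and de-duplicates; the sample strings are invented. Using it inside `process_entities` would also merge distinct places that share a name, so a location's feature ID may be a better node key."
      ]
    },
    {
      "cell_type": "code",
      "execution_count": null,
      "metadata": {},
      "outputs": [],
      "source": [
        "def parse_enhanced_field(value, is_location=False):\n",
        "    \"\"\"Return unique entity names from one V2Enhanced* field (hypothetical helper).\n",
        "\n",
        "    Assumes the GDELT 2.1 GKG layout: ';'-separated entries ending in\n",
        "    ',<char offset>' (persons, organizations, themes), or '#'-delimited\n",
        "    location blocks whose second field is the full location name.\n",
        "    \"\"\"\n",
        "    if not isinstance(value, str):  # None, NaN, or other missing value\n",
        "        return []\n",
        "    names = []\n",
        "    for entry in value.split(';'):\n",
        "        entry = entry.strip()\n",
        "        if not entry:\n",
        "            continue\n",
        "        if is_location:\n",
        "            parts = entry.split('#')\n",
        "            name = parts[1] if len(parts) > 1 else entry\n",
        "        else:\n",
        "            # Drop only a trailing numeric offset so names containing commas survive\n",
        "            head, sep, tail = entry.rpartition(',')\n",
        "            name = head if sep and tail.strip().isdigit() else entry\n",
        "        if name.strip():\n",
        "            names.append(name.strip())\n",
        "    return list(dict.fromkeys(names))  # de-duplicate, keeping first-seen order\n",
        "\n",
        "# Invented sample values in the assumed format\n",
        "print(parse_enhanced_field('Jane Doe,120;Jane Doe,480;Acme Corp, Inc.,900'))\n",
        "# ['Jane Doe', 'Acme Corp, Inc.']\n",
        "print(parse_enhanced_field('1#France#FR#FR##46#2#FR#310', is_location=True))\n",
        "# ['France']"
      ]
    },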
    {
      "cell_type": "code",
      "execution_count": null,
      "metadata": {
        "id": "mkJKz_soTsAY"
      },
      "outputs": [],
      "source": [
        "# run cypher query for validation\n",
        "\n",
        "from neo4j import GraphDatabase\n",
        "\n",
        "class Neo4jQuery:\n",
        "    def __init__(self, uri, user, password):\n",
        "        self.driver = GraphDatabase.driver(uri, auth=(user, password))\n",
        "        self.logger = logging.getLogger(__name__)\n",
        "\n",
        "    def close(self):\n",
        "        self.driver.close()\n",
        "\n",
        "    def run_query(self, query):\n",
        "        with self.driver.session() as session:\n",
        "            result = session.run(query)\n",
        "            return result.data()\n",
        "\n",
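        "query_1 = \"\"\"\n",
        "// Count nodes by type\n",
        "MATCH (n)\n",
        "RETURN labels(n) AS type, count(*) AS count\n",
        "ORDER BY count DESC\n",
        "\"\"\"\n",
        "\n",
        "# Execute the validation query and print one row per label set\n",
        "query_runner = Neo4jQuery(URI, USER, PASSWORD)\n",
        "try:\n",
        "    for record in query_runner.run_query(query_1):\n",
        "        print(record)\n",
        "finally:\n",
        "    query_runner.close()"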
      ]
    },
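    {
      "cell_type": "markdown",
      "metadata": {},
      "source": [
        "The introduction mentions **centrality** and **community detection**. The cell below is a minimal NetworkX sketch of both on an invented toy graph that uses the same `type` node attribute as `NetworkXBuilder`; set `graph = G` to run it on the GDELT graph instead. Louvain (`nx.community.louvain_communities`, available in NetworkX 2.8 and later) is one of several community algorithms NetworkX offers, chosen here because it is fast on sparse graphs and accepts a seed for repeatable results."
      ]
    },
    {
      "cell_type": "code",
      "execution_count": null,
      "metadata": {},
      "outputs": [],
      "source": [
        "import networkx as nx\n",
        "\n",
        "# Toy event/entity graph using the notebook's 'type' attribute; names are invented.\n",
        "# Replace with graph = G to analyze the graph built from GDELT above.\n",
        "graph = nx.Graph()\n",
        "for node_id, node_type in [('evt-1', 'event'), ('evt-2', 'event'), ('evt-3', 'event'),\n",
        "                           ('Person A', 'person'), ('Person D', 'person'),\n",
        "                           ('Org B', 'organization'), ('THEME_C', 'theme')]:\n",
        "    graph.add_node(node_id, type=node_type)\n",
        "graph.add_edges_from([('Person A', 'evt-1'), ('Org B', 'evt-1'), ('THEME_C', 'evt-1'),\n",
        "                      ('Person A', 'evt-2'), ('THEME_C', 'evt-2'),\n",
        "                      ('Person D', 'evt-3'), ('Org B', 'evt-3')])\n",
        "\n",
        "# Degree centrality: fraction of the other nodes each node links to directly\n",
        "degree = nx.degree_centrality(graph)\n",
        "# Betweenness centrality: how often a node lies on shortest paths between other nodes\n",
        "betweenness = nx.betweenness_centrality(graph)\n",
        "\n",
        "for node in sorted(degree, key=degree.get, reverse=True)[:5]:\n",
        "    node_type = graph.nodes[node].get('type', 'unknown')\n",
        "    print(f'{node:<10} {node_type:<13} degree={degree[node]:.2f} betweenness={betweenness[node]:.2f}')\n",
        "\n",
        "# Louvain community detection (NetworkX >= 2.8); a fixed seed makes the partition repeatable\n",
        "communities = nx.community.louvain_communities(graph, seed=42)\n",
        "for i, members in enumerate(communities):\n",
        "    print(f'community {i}: {sorted(members)}')"
      ]
    },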
    {
      "cell_type": "code",
      "execution_count": null,
      "metadata": {
        "id": "mrlWADO93ize"
      },
      "outputs": [],
      "source": [
        "def visualize_graph(G, output_file='gdelt_network.html'):\n",
        "    \"\"\"Visualize NetworkX graph using Pyvis\"\"\"\n",
        "    # Create Pyvis network\n",
        "    net = Network(notebook=True,\n",
        "                  cdn_resources='in_line',  # inline the JS so the HTML renders inside notebooks\n",
        "                  height='750px',\n",
        "                  width='100%',\n",
        "                  bgcolor='#ffffff',\n",
        "                  font_color='#000000')\n",
        "\n",
        "    # Configure physics\n",
        "    net.force_atlas_2based(gravity=-50,\n",
        "                          central_gravity=0.01,\n",
        "                          spring_length=100,\n",
        "                          spring_strength=0.08,\n",
        "                          damping=0.4,\n",
        "                          overlap=0)\n",
        "\n",
        "    # Color mapping for node types\n",
        "    color_map = {\n",
        "        'event': '#1f77b4',     # Blue\n",
        "        'person': '#00ff00',    # Green\n",
        "        'organization': '#ffa500', # Orange\n",
        "        'location': '#ff0000',  # Red\n",
        "        'theme': '#800080'      # Purple\n",
        "    }\n",
        "\n",
        "    # Add nodes\n",
        "    for node, attr in G.nodes(data=True):\n",
        "        node_type = attr.get('type', 'unknown')\n",
        "        title = f\"Type: {node_type}\\n\"\n",
        "        for k, v in attr.items():\n",
        "            if k != 'type':\n",
        "                title += f\"{k}: {v}\\n\"\n",
        "\n",
        "        net.add_node(node,\n",
        "                    title=title,\n",
        "                    label=str(node)[:20] + '...' if len(str(node)) > 20 else str(node),\n",
        "                    color=color_map.get(node_type, 'gray'),  # '#gray' is not a valid color\n",
        "                    size=20 if node_type == 'event' else 15)\n",
        "\n",
        "    # Add edges\n",
        "    for source, target, attr in G.edges(data=True):\n",
        "        net.add_edge(source,\n",
        "                    target,\n",
        "                    title=f\"{attr.get('relationship', '')}\\nDate: {attr.get('created_at', '')}\",\n",
        "                    color='#666666')\n",
        "\n",
        "    # Save visualization\n",
        "    net.show(output_file)\n",
        "    return f\"Graph visualization saved to {output_file}\"\n",
        "\n",
        "# Usage example:\n",
        "if __name__ == \"__main__\":\n",
        "    print(visualize_graph(G))"
      ]
    },
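    {
      "cell_type": "markdown",
      "metadata": {},
      "source": [
        "The introduction also mentions exporting to **JSON** and **GraphML**. The cell below is a minimal sketch using NetworkX's built-in `node_link_data` and `write_graphml`, shown on an invented one-edge graph (set `graph = G` to export the GDELT graph); the file names are arbitrary. GraphML and `json.dump` only accept plain scalar values, so if either rejects an attribute from the real data (for example a NumPy integer), cast it to `str`, `int`, or `float` first."
      ]
    },
    {
      "cell_type": "code",
      "execution_count": null,
      "metadata": {},
      "outputs": [],
      "source": [
        "import json\n",
        "import networkx as nx\n",
        "\n",
        "# Invented one-edge graph in the notebook's schema; use graph = G for the real data.\n",
        "graph = nx.Graph()\n",
        "graph.add_node('evt-1', type='event', tone=-1.5)\n",
        "graph.add_node('Person A', type='person', name='Person A')\n",
        "graph.add_edge('Person A', 'evt-1', relationship='MENTIONED_IN', created_at=20200301000000)\n",
        "\n",
        "# Node-link JSON: separate node and link lists, the structure D3 force-directed examples use\n",
        "with open('gdelt_graph.json', 'w') as f:\n",
        "    json.dump(nx.node_link_data(graph), f, indent=2)\n",
        "\n",
        "# GraphML: typed node/edge attributes, readable by Cytoscape, Gephi, and yEd\n",
        "nx.write_graphml(graph, 'gdelt_graph.graphml')\n",
        "\n",
        "# Round-trip check on the GraphML file\n",
        "restored = nx.read_graphml('gdelt_graph.graphml')\n",
        "print(restored.number_of_nodes(), restored.number_of_edges())  # 2 1"
      ]
    },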
    {
      "cell_type": "code",
      "execution_count": null,
      "metadata": {
        "id": "RqFRO1atnIIT"
      },
      "outputs": [],
      "source": [
        "!pip show duckdb"
      ]
    }
  ],
  "metadata": {
    "colab": {
      "provenance": []
    },
    "kernelspec": {
      "display_name": "Python 3",
      "name": "python3"
    },
    "language_info": {
      "name": "python"
    }
  },
  "nbformat": 4,
  "nbformat_minor": 0
}