dwb2023 committed on
Commit
78d0252
·
1 Parent(s): aafdc15

update graph builder and covid analysis.py

Browse files
graph_builder.py CHANGED
@@ -1,50 +1,97 @@
1
  """
2
- Graph builder module for converting GDELT data to graph formats
 
 
 
 
3
  """
4
  import pandas as pd
5
  import networkx as nx
 
6
  import json
 
 
 
 
 
 
7
 
8
  class GraphBuilder:
9
- """Base class for building graph from GDELT data"""
10
- def process_entities(self, row):
11
- """Process entities from a row and return nodes and relationships"""
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
12
  nodes = []
13
  relationships = []
14
  event_id = row["GKGRECORDID"]
15
  event_date = row["DATE"]
16
  event_source = row["SourceCommonName"]
17
  event_document_id = row["DocumentIdentifier"]
18
- # event_image = row["V2.1SharingImage"] if pd.notna(row["V2.1SharingImage"]) else ""
19
  event_quotations = row["V2.1Quotations"] if pd.notna(row["V2.1Quotations"]) else ""
20
  event_tone = float(row["tone"]) if pd.notna(row["tone"]) else 0.0
21
 
22
- # Add event node
 
 
 
 
 
 
 
 
 
 
23
  nodes.append({
24
  "id": event_id,
25
  "type": "event",
26
- "properties": {
27
- "date": event_date,
28
- "source": event_source,
29
- "document": event_document_id,
30
- # "image": event_image,
31
- "quotations": event_quotations,
32
- "tone": event_tone
33
- }
34
  })
35
 
36
  # Process each entity type
37
- entity_mappings = {
38
- "V2EnhancedPersons": ("Person", "MENTIONED_IN"),
39
- "V2EnhancedOrganizations": ("Organization", "MENTIONED_IN"),
40
- "V2EnhancedLocations": ("Location", "LOCATED_IN"),
41
- "V2EnhancedThemes": ("Theme", "CATEGORIZED_AS"),
42
- "V2.1AllNames": ("Name", "MENTIONED_IN"),
43
- "V2.1Counts": ("Count", "MENTIONED_IN"),
44
- "V2.1Amounts": ("Amount", "MENTIONED_IN"),
45
- }
46
-
47
- for field, (label, relationship) in entity_mappings.items():
48
  if pd.notna(row[field]):
49
  entities = [e.strip() for e in row[field].split(';') if e.strip()]
50
  for entity in entities:
@@ -53,168 +100,230 @@ class GraphBuilder:
53
  "type": label.lower(),
54
  "properties": {"name": entity}
55
  })
 
 
 
56
  relationships.append({
57
  "from": entity,
58
  "to": event_id,
59
  "type": relationship,
60
- "properties": {"created_at": event_date}
61
  })
62
 
63
  return nodes, relationships
64
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
65
  class NetworkXBuilder(GraphBuilder):
66
- """Builder for NetworkX graphs"""
67
- def build_graph(self, df):
68
- G = nx.Graph()
69
 
70
- for _, row in df.iterrows():
71
- nodes, relationships = self.process_entities(row)
 
 
 
 
72
 
73
- # Add nodes
74
- for node in nodes:
75
- G.add_node(node["id"],
76
- type=node["type"],
77
- **node["properties"])
78
 
79
- # Add relationships
80
- for rel in relationships:
81
- G.add_edge(rel["from"],
82
- rel["to"],
83
- relationship=rel["type"],
84
- **rel["properties"])
85
 
86
- return G
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
87
 
88
  class Neo4jBuilder(GraphBuilder):
 
 
 
 
 
 
 
89
  def __init__(self, uri, user, password):
 
90
  self.driver = GraphDatabase.driver(uri, auth=(user, password))
91
- self.logger = logging.getLogger(__name__)
92
 
93
  def close(self):
 
94
  self.driver.close()
95
 
96
- def build_graph(self, df):
 
 
 
 
 
 
 
 
 
 
 
97
  with self.driver.session() as session:
 
98
  for _, row in df.iterrows():
99
- nodes, relationships = self.process_entities(row)
100
-
101
- # Create nodes and relationships in Neo4j
102
- try:
103
- session.execute_write(self._create_graph_elements,
104
- nodes, relationships)
105
- except Exception as e:
106
- self.logger.error(f"Error processing row {row['GKGRECORDID']}: {str(e)}")
107
-
108
- def _create_graph_elements(self, tx, nodes, relationships):
109
- # Create nodes
110
- for node in nodes:
111
- query = f"""
112
- MERGE (n:{node['type']} {{id: $id}})
113
- SET n += $properties
114
- """
115
- tx.run(query, id=node["id"], properties=node["properties"])
116
-
117
- # Create relationships
118
- for rel in relationships:
119
- query = f"""
120
- MATCH (a {{id: $from_id}})
121
- MATCH (b {{id: $to_id}})
122
- MERGE (a)-[r:{rel['type']}]->(b)
123
- SET r += $properties
124
- """
125
- tx.run(query,
126
- from_id=rel["from"],
127
- to_id=rel["to"],
128
- properties=rel["properties"])
129
-
130
- class StreamlitGraphBuilder:
131
- """Adapted graph builder for Streamlit visualization"""
132
- def __init__(self):
133
- self.G = nx.Graph()
134
-
135
- def process_row(self, row):
136
- """Process a single row of data"""
137
- event_id = row["GKGRECORDID"]
138
- event_props = {
139
- "type": "event", # already in lowercase
140
- "date": row["DATE"],
141
- "source": row["SourceCommonName"],
142
- "document": row["DocumentIdentifier"],
143
- "tone": row["tone"],
144
- # Store display name in its original format if needed.
145
- "name": row["SourceCommonName"]
146
- }
147
-
148
- self.G.add_node(event_id, **event_props)
149
-
150
- # Use lowercase node types for consistency in lookups.
151
- entity_types = {
152
- "V2EnhancedPersons": ("person", "MENTIONED_IN"),
153
- "V2EnhancedOrganizations": ("organization", "MENTIONED_IN"),
154
- "V2EnhancedLocations": ("location", "LOCATED_IN"),
155
- "V2EnhancedThemes": ("theme", "CATEGORIZED_AS"),
156
- "V2.1AllNames": ("name", "MENTIONED_IN"),
157
- "V2.1Counts": ("count", "MENTIONED_IN"),
158
- "V2.1Amounts": ("amount", "MENTIONED_IN"),
159
- }
160
-
161
- for col, (node_type, rel_type) in entity_types.items():
162
- if pd.notna(row[col]):
163
- # The actual display value (which may be in Parent Case) is preserved in the "name" attribute.
164
- entities = [e.strip() for e in row[col].split(';') if e.strip()]
165
- for entity in entities:
166
- self.G.add_node(entity, type=node_type, name=entity)
167
- self.G.add_edge(entity, event_id,
168
- relationship=rel_type,
169
- date=row["DATE"])
170
 
171
  class StLinkBuilder(GraphBuilder):
172
- """Builder for st-link-analysis compatible graphs"""
173
- def build_graph(self, df):
174
- """Build graph in st-link-analysis format"""
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
175
  all_nodes = []
176
  all_edges = []
177
  edge_counter = 0
178
-
179
- # Track nodes we've already added to avoid duplicates
180
  added_nodes = set()
181
-
182
- for _, row in df.iterrows():
183
- nodes, relationships = self.process_entities(row)
184
-
185
- # Process nodes
186
- for node in nodes:
187
- if node["id"] not in added_nodes:
188
- stlink_node = {
 
 
 
 
 
 
 
 
 
 
 
 
 
 
189
  "data": {
190
- "id": str(node["id"]),
191
- "label": node["type"].upper(),
192
- **node["properties"]
 
 
193
  }
194
  }
195
- all_nodes.append(stlink_node)
196
- added_nodes.add(node["id"])
197
-
198
- # Process relationships/edges
199
- for rel in relationships:
200
- edge_counter += 1
201
- stlink_edge = {
202
- "data": {
203
- "id": f"e{edge_counter}",
204
- "source": str(rel["from"]),
205
- "target": str(rel["to"]),
206
- "label": rel["type"],
207
- **rel["properties"]
208
- }
209
- }
210
- all_edges.append(stlink_edge)
211
-
212
- return {
213
- "nodes": all_nodes,
214
- "edges": all_edges
215
- }
216
 
217
  def write_json(self, graph_data, filename):
218
- """Write graph to JSON file"""
219
- with open(filename, 'w') as f:
220
- json.dump(graph_data, f, indent=2)
 
 
 
 
 
 
 
 
 
 
 
 
 
1
  """
2
+ Graph builder module for converting GDELT data to graph formats.
3
+
4
+ This module provides classes for building graphs from GDELT data in various formats,
5
+ including NetworkX, Neo4j, and st-link-analysis. It supports batch processing,
6
+ input validation, custom properties, and logging for robust and efficient graph construction.
7
  """
8
  import pandas as pd
9
  import networkx as nx
10
+ import logging
11
  import json
12
+ from neo4j import GraphDatabase
13
+ from typing import Dict, List, Optional, Set, Tuple
14
+
15
+ # Configure logging
16
+ logging.basicConfig(level=logging.INFO)
17
+ logger = logging.getLogger(__name__)
18
 
class GraphBuilder:
    """
    Base class for building graphs from GDELT data.

    Attributes:
        ENTITY_MAPPINGS (dict): Maps a GDELT column name to a
            ``(node label, relationship type)`` pair.
        REQUIRED_COLUMNS (tuple): Columns every input DataFrame must contain.
        logger (Logger): Logger instance for tracking progress and errors.
    """
    ENTITY_MAPPINGS = {
        "V2EnhancedPersons": ("Person", "MENTIONED_IN"),
        "V2EnhancedOrganizations": ("Organization", "MENTIONED_IN"),
        "V2EnhancedLocations": ("Location", "LOCATED_IN"),
        "V2EnhancedThemes": ("Theme", "CATEGORIZED_AS"),
        "V2.1AllNames": ("Name", "MENTIONED_IN"),
        "V2.1Counts": ("Count", "MENTIONED_IN"),
        "V2.1Amounts": ("Amount", "MENTIONED_IN"),
    }

    REQUIRED_COLUMNS = (
        "GKGRECORDID",
        "DATE",
        "SourceCommonName",
        "DocumentIdentifier",
    )

    def __init__(self):
        self.logger = logger

    def validate_input(self, df):
        """
        Validate input DataFrame for required columns.

        Args:
            df (pd.DataFrame): Input DataFrame containing GDELT data.

        Raises:
            ValueError: If any required column is missing. The message names
                the first missing column in ``REQUIRED_COLUMNS`` order.
        """
        for col in self.REQUIRED_COLUMNS:
            if col not in df.columns:
                raise ValueError(f"Missing required column: {col}")

    def process_entities(self, row, custom_node_props=None, custom_edge_props=None):
        """
        Process entities from a row and return nodes and relationships.

        Only the columns in ``REQUIRED_COLUMNS`` must be present. The
        quotations, tone and entity columns are optional: an absent column is
        treated exactly like a missing (NaN/None) value, so a frame that
        passes ``validate_input`` can never fail here with ``KeyError``.

        Args:
            row (pd.Series): A row of GDELT data.
            custom_node_props (dict, optional): Extra properties merged into
                the event node (entity nodes only carry their ``name``).
            custom_edge_props (dict, optional): Extra properties merged into
                every relationship.

        Returns:
            Tuple[List[Dict], List[Dict]]: Lists of nodes and relationships.
            The first node is always the event node.
        """
        nodes = []
        relationships = []
        event_id = row["GKGRECORDID"]
        event_date = row["DATE"]

        # Optional columns: .get() yields None when the column is absent and
        # pd.notna(None) is False, so absence falls back to the defaults.
        quotations = row.get("V2.1Quotations")
        tone = row.get("tone")

        # Add event node with custom properties
        event_props = {
            "date": event_date,
            "source": row["SourceCommonName"],
            "document": row["DocumentIdentifier"],
            "quotations": quotations if pd.notna(quotations) else "",
            "tone": float(tone) if pd.notna(tone) else 0.0,
        }
        if custom_node_props:
            event_props.update(custom_node_props)

        nodes.append({
            "id": event_id,
            "type": "event",
            "properties": event_props,
        })

        # Process each entity type
        for field, (label, relationship) in self.ENTITY_MAPPINGS.items():
            raw_value = row.get(field)
            if not pd.notna(raw_value):
                continue

            node_type = label.lower()
            # str() guards against numeric cells (e.g. a bare count), which
            # have no .split() method.
            for chunk in str(raw_value).split(';'):
                entity = chunk.strip()
                if not entity:
                    continue

                nodes.append({
                    "id": entity,
                    "type": node_type,
                    "properties": {"name": entity},
                })

                # A fresh dict per edge so callers may mutate one edge's
                # properties without affecting the others.
                edge_props = {"created_at": event_date}
                if custom_edge_props:
                    edge_props.update(custom_edge_props)
                relationships.append({
                    "from": entity,
                    "to": event_id,
                    "type": relationship,
                    "properties": edge_props,
                })

        return nodes, relationships

    def validate_graph(self, G):
        """
        Validate the graph for consistency.

        Args:
            G (nx.Graph): The graph to validate.

        Raises:
            ValueError: If ``G`` is not a NetworkX graph, or has no nodes,
                or has no edges.
        """
        if not isinstance(G, nx.Graph):
            raise ValueError("Input is not a valid NetworkX graph.")
        if G.number_of_nodes() == 0:
            raise ValueError("Graph has no nodes.")
        if G.number_of_edges() == 0:
            raise ValueError("Graph has no edges.")
        self.logger.info("Graph validation passed.")
132
+
class NetworkXBuilder(GraphBuilder):
    """
    Builder for NetworkX graphs.

    Attributes:
        directed (bool): Whether to create a directed graph.
    """
    def __init__(self, directed=False):
        super().__init__()
        self.directed = directed

    def build_graph(self, df, custom_node_props=None, custom_edge_props=None):
        """
        Build a NetworkX graph from the DataFrame.

        An input that produces no nodes or no edges is not an error: the
        (possibly empty) graph is returned and a warning is logged, so that
        callers can inspect ``G.number_of_nodes()`` themselves. Full
        validation via ``validate_graph`` only runs on non-trivial graphs.

        Args:
            df (pd.DataFrame): Input DataFrame containing GDELT data.
            custom_node_props (dict, optional): Custom properties for nodes.
            custom_edge_props (dict, optional): Custom properties for edges.

        Returns:
            nx.Graph: The constructed graph (``nx.DiGraph`` when
            ``directed`` is true).

        Raises:
            ValueError: If a required column is missing from ``df``.
        """
        self.validate_input(df)
        G = nx.DiGraph() if self.directed else nx.Graph()

        try:
            for _, row in df.iterrows():
                nodes, relationships = self.process_entities(
                    row, custom_node_props, custom_edge_props
                )

                # Add nodes. The structural "type" attribute is merged last
                # so a custom property with the same name cannot trigger a
                # duplicate-keyword TypeError or overwrite the node type.
                for node in nodes:
                    node_attrs = {**node["properties"], "type": node["type"]}
                    G.add_node(node["id"], **node_attrs)

                # Add relationships (same precedence rule for "relationship")
                for rel in relationships:
                    edge_attrs = {**rel["properties"], "relationship": rel["type"]}
                    G.add_edge(rel["from"], rel["to"], **edge_attrs)

            if G.number_of_nodes() == 0 or G.number_of_edges() == 0:
                # validate_graph() raises on empty graphs; the caller is
                # expected to handle "no data" itself, so only warn here.
                self.logger.warning(
                    "NetworkX graph has %d nodes and %d edges; skipping validation.",
                    G.number_of_nodes(),
                    G.number_of_edges(),
                )
            else:
                self.validate_graph(G)

            self.logger.info("NetworkX graph built successfully.")
            return G
        except Exception as e:
            self.logger.error(f"Error building NetworkX graph: {str(e)}")
            raise
177
 
class Neo4jBuilder(GraphBuilder):
    """
    Builder for Neo4j graphs.

    Can be used as a context manager so the driver is always closed::

        with Neo4jBuilder(uri, user, password) as builder:
            builder.build_graph(df)

    Attributes:
        driver (neo4j.Driver): Neo4j driver instance.
        logger (Logger): Logger instance for tracking progress and errors.
    """
    def __init__(self, uri, user, password):
        super().__init__()
        self.driver = GraphDatabase.driver(uri, auth=(user, password))

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc_value, traceback):
        self.close()
        # Returning False lets any exception from the with-body propagate.
        return False

    def close(self):
        """Close the Neo4j driver."""
        self.driver.close()

    def build_graph(self, df, batch_size=1000, custom_node_props=None, custom_edge_props=None):
        """
        Build a Neo4j graph from the DataFrame with batch processing.

        Rows are converted with ``process_entities`` and written in groups of
        ``batch_size`` rows, one write transaction per group.

        Args:
            df (pd.DataFrame): Input DataFrame containing GDELT data.
            batch_size (int): Number of rows to process in each batch.
            custom_node_props (dict, optional): Custom properties for nodes.
            custom_edge_props (dict, optional): Custom properties for edges.

        Raises:
            ValueError: If a required column is missing from ``df``.
            Exception: Any error raised while processing rows or writing to
                Neo4j is logged and re-raised unchanged.
        """
        self.validate_input(df)

        try:
            with self.driver.session() as session:
                batch = []
                for _, row in df.iterrows():
                    batch.append(
                        self.process_entities(row, custom_node_props, custom_edge_props)
                    )
                    if len(batch) >= batch_size:
                        session.execute_write(self._create_graph_elements_batch, batch)
                        # Rebind instead of clearing: the list handed to the
                        # transaction function must stay intact.
                        batch = []
                if batch:
                    session.execute_write(self._create_graph_elements_batch, batch)
        except Exception as e:
            self.logger.error(f"Error building Neo4j graph: {str(e)}")
            raise

        self.logger.info("Neo4j graph built successfully.")

    def _create_graph_elements_batch(self, tx, batch):
        """
        Create nodes and relationships in Neo4j in batches.

        Labels and relationship types cannot be passed as Cypher parameters,
        so they are interpolated into the query text. They come from
        ``process_entities`` (the literal "event" or ``ENTITY_MAPPINGS``
        constants), not from row data; all data values are passed as
        parameters.

        Args:
            tx (neo4j.Transaction): Neo4j transaction.
            batch (List[Tuple[List[Dict], List[Dict]]]): Batch of nodes and relationships.
        """
        for nodes, relationships in batch:
            # Create nodes
            for node in nodes:
                query = f"""
                MERGE (n:{node['type']} {{id: $id}})
                SET n += $properties
                """
                tx.run(query, id=node["id"], properties=node["properties"])

            # Create relationships
            for rel in relationships:
                query = f"""
                MATCH (a {{id: $from_id}})
                MATCH (b {{id: $to_id}})
                MERGE (a)-[r:{rel['type']}]->(b)
                SET r += $properties
                """
                tx.run(
                    query,
                    from_id=rel["from"],
                    to_id=rel["to"],
                    properties=rel["properties"],
                )
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
245
 
class StLinkBuilder(GraphBuilder):
    """
    Builder for st-link-analysis compatible graphs.

    Attributes:
        logger (Logger): Logger instance for tracking progress and errors.
    """
    def __init__(self):
        super().__init__()

    @staticmethod
    def _element_data(structural, properties):
        """Return ``structural`` extended with ``properties``; structural keys win."""
        data = dict(structural)
        for key, value in properties.items():
            if key not in structural:
                data[key] = value
        return data

    def build_graph(self, df, custom_node_props=None, custom_edge_props=None):
        """
        Build graph in st-link-analysis format.

        Each node becomes ``{"data": {"id", "label", ...properties}}`` and
        each relationship ``{"data": {"id", "source", "target", "label",
        ...properties}}``. Nodes are emitted once per distinct stringified
        id; edges get sequential ids ``e1``, ``e2``, ...

        Args:
            df (pd.DataFrame): Input DataFrame containing GDELT data.
            custom_node_props (dict, optional): Custom properties for nodes.
            custom_edge_props (dict, optional): Custom properties for edges.
                Keys that collide with the structural keys above are ignored
                so they cannot corrupt element identity.

        Returns:
            Dict: Graph data in st-link-analysis format, with ``"nodes"`` and
            ``"edges"`` lists.

        Raises:
            ValueError: If a required column is missing from ``df``.
        """
        self.validate_input(df)
        all_nodes = []
        all_edges = []
        edge_counter = 0
        # Keyed by the stringified id, because that is the id actually
        # emitted; keying by the raw value would let 1 and "1" both through.
        added_nodes = set()

        try:
            for _, row in df.iterrows():
                nodes, relationships = self.process_entities(
                    row, custom_node_props, custom_edge_props
                )

                # Process nodes
                for node in nodes:
                    node_id = str(node["id"])
                    if node_id in added_nodes:
                        continue
                    added_nodes.add(node_id)
                    all_nodes.append({
                        "data": self._element_data(
                            {"id": node_id, "label": node["type"].upper()},
                            node["properties"],
                        )
                    })

                # Process relationships/edges
                for rel in relationships:
                    edge_counter += 1
                    all_edges.append({
                        "data": self._element_data(
                            {
                                "id": f"e{edge_counter}",
                                "source": str(rel["from"]),
                                "target": str(rel["to"]),
                                "label": rel["type"],
                            },
                            rel["properties"],
                        )
                    })

            self.logger.info("st-link-analysis graph built successfully.")
            return {
                "nodes": all_nodes,
                "edges": all_edges,
            }
        except Exception as e:
            self.logger.error(f"Error building st-link-analysis graph: {str(e)}")
            raise

    def write_json(self, graph_data, filename):
        """
        Write graph to a JSON file (indented by two spaces).

        Args:
            graph_data (Dict): Graph data in st-link-analysis format.
            filename (str): Output file name.

        Raises:
            Exception: Any I/O or serialization error is logged and re-raised.
        """
        try:
            # Explicit encoding keeps the output independent of the platform
            # default.
            with open(filename, 'w', encoding='utf-8') as f:
                json.dump(graph_data, f, indent=2)
            self.logger.info(f"Graph data written to {filename} successfully.")
        except Exception as e:
            self.logger.error(f"Error writing JSON file: {str(e)}")
            raise
329
+
pages/2_🌐_COVID_Network_Analysis.py CHANGED
@@ -13,7 +13,7 @@ from typing import Dict, List, Set, Tuple, Optional
13
  from pathlib import Path
14
 
15
  from data_access import get_gdelt_data, filter_dataframe, GDELT_CATEGORIES
16
- from graph_builder import StreamlitGraphBuilder
17
  from graph_config import NODE_TYPES
18
 
19
  # Type aliases for clarity
@@ -252,12 +252,10 @@ def main():
252
  themes_filter=themes_filter
253
  )
254
 
255
- # Build graph
256
  with st.spinner("Building knowledge graph..."):
257
- builder = StreamlitGraphBuilder()
258
- for _, row in df.iterrows():
259
- builder.process_row(row)
260
- G = builder.G
261
 
262
  if G.number_of_nodes() == 0:
263
  st.warning("No data found matching the specified criteria.")
 
13
  from pathlib import Path
14
 
15
  from data_access import get_gdelt_data, filter_dataframe, GDELT_CATEGORIES
16
+ from graph_builder import NetworkXBuilder # Updated to use NetworkXBuilder
17
  from graph_config import NODE_TYPES
18
 
19
  # Type aliases for clarity
 
252
  themes_filter=themes_filter
253
  )
254
 
255
+ # Build graph using NetworkXBuilder
256
  with st.spinner("Building knowledge graph..."):
257
+ builder = NetworkXBuilder() # Use NetworkXBuilder
258
+ G = builder.build_graph(df) # Build graph from DataFrame
 
 
259
 
260
  if G.number_of_nodes() == 0:
261
  st.warning("No data found matching the specified criteria.")