# insight/pages/2_🌐_COVID_Network_Analysis.py
"""
Network Analysis Page - GDELT Graph Analysis
This module provides interactive network analysis of GDELT event data.
"""
import streamlit as st
import networkx as nx
from pyvis.network import Network
import pandas as pd
from datetime import datetime
import tempfile
import json
from typing import Dict, List, Set, Optional
from pathlib import Path
from data_access import get_gdelt_data, filter_dataframe, GDELT_CATEGORIES
from graph_builder import NetworkXBuilder
from graph_config import NODE_TYPES
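
# data_access, graph_builder, and graph_config are project-local modules.
# NODE_TYPES is assumed to map each node type name to a dict with at least
# 'color' and 'description' keys (used by the legend and node styling below).
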
# Type aliases for clarity
NodeID = str
CommunityID = int
Community = Set[NodeID]
Communities = List[Community]
def create_legend_html() -> str:
"""Create HTML for the visualization legend."""
legend_html = """
<div style="
position: absolute;
top: 10px;
right: 10px;
background-color: rgba(255, 255, 255, 0.9);
padding: 10px;
border-radius: 5px;
border: 1px solid #ddd;
z-index: 1000;
">
<h3 style="margin: 0 0 10px 0;">Legend</h3>
"""
for node_type, info in NODE_TYPES.items():
legend_html += f"""
<div style="margin: 5px 0;">
<span style="
display: inline-block;
width: 12px;
height: 12px;
background-color: {info['color']};
border-radius: 50%;
margin-right: 5px;
"></span>
<span>{info['description']}</span>
</div>
"""
legend_html += "</div>"
return legend_html
class CommunityAnalyzer:
"""Handles community detection and analysis for GDELT network graphs."""
def __init__(self, G: nx.Graph):
self.G = G
self._communities: Optional[Communities] = None
self._analysis: Optional[List[Dict]] = None
@property
def communities(self) -> Communities:
"""Cached access to detected communities."""
if self._communities is None:
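            # Louvain community detection (louvain_communities was added in
            # NetworkX 2.8). Results can vary between runs; pass seed=<int>
            # here if reproducible partitions are needed.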
self._communities = nx.community.louvain_communities(self.G)
return self._communities
def analyze_composition(self) -> List[Dict]:
"""Perform detailed analysis of each community's composition."""
if self._analysis is not None:
return self._analysis
analysis_results = []
for idx, community in enumerate(self.communities):
try:
# Initialize analysis containers
node_types = {ntype: 0 for ntype in NODE_TYPES.keys()}
themes: Set[str] = set()
entities: Dict[str, int] = {}
# Analyze community nodes
for node in community:
attrs = self.G.nodes[node]
node_type = attrs.get('type', 'unknown')
# Update type counts
if node_type in node_types:
node_types[node_type] += 1
# Collect themes
if node_type == 'theme':
theme_name = attrs.get('name', '')
if theme_name:
themes.add(theme_name)
# Track entity connections
if node_type in {'person', 'organization', 'location'}:
name = attrs.get('name', node)
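                        # Degree is measured on the full graph, so it counts
                        # connections both inside and outside this community.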
entities[name] = self.G.degree(node)
# Calculate community metrics
subgraph = self.G.subgraph(community)
n = len(community)
possible_edges = (n * (n - 1)) / 2 if n > 1 else 0
density = (subgraph.number_of_edges() / possible_edges) if possible_edges > 0 else 0
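                # Equivalent to nx.density(subgraph) for an undirected graph.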
# Get top entities by degree
top_entities = dict(sorted(entities.items(), key=lambda x: x[1], reverse=True)[:5])
analysis_results.append({
'id': idx,
'size': len(community),
'node_types': node_types,
'themes': sorted(themes),
'top_entities': top_entities,
'density': density,
'internal_edges': subgraph.number_of_edges(),
'external_edges': sum(1 for u in community
for v in self.G[u]
if v not in community)
})
except Exception as e:
st.error(f"Error analyzing community {idx}: {str(e)}")
continue
self._analysis = analysis_results
return analysis_results
def display_community_analysis(analysis: List[Dict]) -> None:
"""Display detailed community analysis in Streamlit."""
# Display summary metrics
total_nodes = sum(comm['size'] for comm in analysis)
col1, col2, col3 = st.columns(3)
with col1:
st.metric("Total Communities", len(analysis))
with col2:
st.metric("Total Nodes", total_nodes)
with col3:
largest_comm = max(comm['size'] for comm in analysis)
st.metric("Largest Community", largest_comm)
# Display each community in tabs
st.subheader("Community Details")
tabs = st.tabs([f"Community {comm['id']}" for comm in analysis])
for tab, comm in zip(tabs, analysis):
with tab:
cols = st.columns(2)
# Left column: Composition
with cols[0]:
st.subheader("Composition")
node_types_df = pd.DataFrame([comm['node_types']]).T
node_types_df.columns = ['Count']
st.bar_chart(node_types_df)
st.markdown("**Metrics:**")
st.write(f"- Size: {comm['size']} nodes")
st.write(f"- Density: {comm['density']:.3f}")
st.write(f"- Internal edges: {comm['internal_edges']}")
st.write(f"- External edges: {comm['external_edges']}")
st.write(f"- % of network: {(comm['size']/total_nodes)*100:.1f}%")
# Right column: Entities and Themes
with cols[1]:
if comm['top_entities']:
st.subheader("Key Entities")
for entity, degree in comm['top_entities'].items():
st.write(f"- {entity} ({degree} connections)")
if comm['themes']:
st.subheader("Themes")
for theme in sorted(comm['themes']):
st.write(f"- {theme}")
def visualize_with_pyvis(G: nx.Graph, physics: bool = True) -> str:
"""Create interactive PyVis visualization with legend."""
net = Network(height="600px", width="100%", notebook=False, directed=False)
net.from_nx(G)
# Configure nodes
for node in net.nodes:
node_type = node.get("type", "unknown")
node["color"] = NODE_TYPES.get(node_type, {}).get('color', "#cccccc")
node["size"] = 20 if node_type == "event" else 15
title_attrs = {k: v for k, v in node.items() if k != "id"}
node["title"] = "\n".join(f"{k}: {v}" for k, v in title_attrs.items())
# Configure edges
for edge in net.edges:
edge["title"] = edge.get("relationship", "")
edge["color"] = {"color": "#666666", "opacity": 0.5}
# Physics settings
if physics:
net.show_buttons(filter_=['physics'])
else:
net.toggle_physics(False)
# Generate HTML
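    # PyVis renders to an HTML file on disk, so write to a temporary file and
    # read the markup back for embedding in Streamlit (delete=False keeps the
    # file accessible by name after the handle is closed).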
with tempfile.NamedTemporaryFile(delete=False, suffix=".html") as f:
net.save_graph(f.name)
html_content = Path(f.name).read_text(encoding='utf-8')
# Add legend
legend = create_legend_html()
html_content = html_content.replace('</body>', f'{legend}</body>')
return html_content
def main():
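    """Render the network analysis page: sidebar filters, graph build, metrics,
    community analysis, export options, and interactive visualization."""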
    st.title("🌐 Global Network Analysis")
    st.markdown("""
    **Explore Global Event Networks**

    Explore the network of negative-sentiment events captured by GDELT. Use the
    interactive visualization and community analysis tools below to examine key
    metrics, structures, and relationships.
    """)
# Initialize session state
if 'vis_html' not in st.session_state:
st.session_state.vis_html = None
# Sidebar controls
with st.sidebar:
st.header("Graph Controls")
limit = st.slider("Max records to load", 1, 25, 5)
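        # GDELT tone is negative for negative coverage; a lower "Max tone score"
        # keeps only the most strongly negative events.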
tone_threshold = st.slider("Max tone score", -10.0, -5.0, -7.0)
show_physics = st.checkbox("Enable physics", value=True)
st.header("Advanced Filters")
source_filter = st.text_input("Filter by source name")
themes_filter = st.text_input("Filter by theme/keyword")
start_date = st.text_input("Start date (YYYYMMDD)")
end_date = st.text_input("End date (YYYYMMDD)")
try:
# Load and process data
df = get_gdelt_data(
limit=limit,
tone_threshold=tone_threshold,
start_date=start_date if start_date else None,
end_date=end_date if end_date else None,
source_filter=source_filter,
themes_filter=themes_filter
)
# Build graph using NetworkXBuilder
with st.spinner("Building knowledge graph..."):
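            # build_graph is expected to return an undirected nx.Graph whose
            # nodes carry 'type' and 'name' attributes (event, theme, person,
            # organization, location, ...) matching NODE_TYPES.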
            builder = NetworkXBuilder()
            G = builder.build_graph(df)
if G.number_of_nodes() == 0:
st.warning("No data found matching the specified criteria.")
return
# Display basic metrics
col1, col2, col3 = st.columns(3)
with col1:
st.metric("Total Nodes", G.number_of_nodes())
with col2:
st.metric("Total Edges", G.number_of_edges())
with col3:
event_count = sum(1 for _, attr in G.nodes(data=True)
if attr.get("type") == "event")
st.metric("Negative Events", event_count)
# Analysis section
st.header("NetworkX Graph Analysis")
# Centrality analysis
with st.expander("Centrality Analysis"):
degree_centrality = nx.degree_centrality(G)
top_nodes = sorted(degree_centrality.items(),
key=lambda x: x[1], reverse=True)[:5]
st.write("Most Connected Nodes:")
for node, centrality in top_nodes:
node_type = G.nodes[node].get("type", "unknown")
st.write(f"- `{node[:30]}` ({node_type}): {centrality:.3f}")
# Community analysis
with st.expander("Community Analysis"):
try:
analyzer = CommunityAnalyzer(G)
analysis = analyzer.analyze_composition()
display_community_analysis(analysis)
except Exception as e:
st.error(f"Community analysis failed: {str(e)}")
st.error("Please check the graph structure and try again.")
# Export options
st.header("Export Options")
with st.expander("Export Data"):
col1, col2, col3 = st.columns(3)
with col1:
# GraphML export
graphml_string = "".join(nx.generate_graphml(G))
st.download_button(
label="Download GraphML",
data=graphml_string.encode('utf-8'),
file_name=f"gdelt_graph_{datetime.now().isoformat()}.graphml",
mime="application/xml"
)
with col2:
# JSON network export
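                # edges="edges" names the edge-list key in the exported dict;
                # older NetworkX releases used the link= keyword instead.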
json_string = json.dumps(nx.node_link_data(G, edges="edges"))
st.download_button(
label="Download JSON",
data=json_string.encode('utf-8'),
file_name=f"gdelt_graph_{datetime.now().isoformat()}.json",
mime="application/json"
)
with col3:
# Community analysis export
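                # Only offered when the community analysis above completed and
                # bound the local variable `analysis`.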
if 'analysis' in locals():
analysis_json = json.dumps(analysis, indent=2)
st.download_button(
label="Download Analysis",
data=analysis_json.encode('utf-8'),
file_name=f"community_analysis_{datetime.now().isoformat()}.json",
mime="application/json"
)
# Interactive visualization
st.header("Network Visualization")
with st.expander("Interactive Network", expanded=False):
if st.session_state.vis_html is None:
with st.spinner("Generating visualization..."):
st.session_state.vis_html = visualize_with_pyvis(G, physics=show_physics)
st.components.v1.html(st.session_state.vis_html, height=600, scrolling=True)
except Exception as e:
st.error(f"An error occurred: {str(e)}")
st.error("Please adjust your filters and try again.")
if __name__ == "__main__":
    main()