import os
import asyncio
from prefect import flow, task, get_run_logger
from prefect.tasks import task_input_hash
from prefect.blocks.system import Secret, JSON
from prefect.task_runners import ConcurrentTaskRunner
from prefect.concurrency.sync import concurrency
from pathlib import Path
import datetime
from datetime import timedelta
import pandas as pd
from tqdm import tqdm
from huggingface_hub import HfApi, hf_hub_url, list_datasets
import requests
import zipfile
from typing import List, Dict, Optional

# --- Constants ---
# Hugging Face uploads are throttled via the "hf_uploads" global concurrency
# limit configured on the Prefect server (see upload_to_hf below).
REPO_ID = "dwb2023/gdelt-gkg-march2020-v2"
BASE_URL = "http://data.gdeltproject.org/gdeltv2"

# Complete Column List
GKG_COLUMNS = [
    'GKGRECORDID',                 # Unique identifier
    'DATE',                        # Publication date
    'SourceCollectionIdentifier',  # Source type
    'SourceCommonName',            # Source name
    'DocumentIdentifier',          # Document URL/ID
    'V1Counts',                    # Counts of various types
    'V2.1Counts',                  # Enhanced counts with positions
    'V1Themes',                    # Theme tags
    'V2EnhancedThemes',            # Themes with positions
    'V1Locations',                 # Location mentions
    'V2EnhancedLocations',         # Locations with positions
    'V1Persons',                   # Person names
    'V2EnhancedPersons',           # Persons with positions
    'V1Organizations',             # Organization names
    'V2EnhancedOrganizations',     # Organizations with positions
    'V1.5Tone',                    # Emotional dimensions
    'V2.1EnhancedDates',           # Date mentions
    'V2GCAM',                      # Global Content Analysis Measures
    'V2.1SharingImage',            # Publisher selected image
    'V2.1RelatedImages',           # Article images
    'V2.1SocialImageEmbeds',       # Social media images
    'V2.1SocialVideoEmbeds',       # Social media videos
    'V2.1Quotations',              # Quote extractions
    'V2.1AllNames',                # Named entities
    'V2.1Amounts',                 # Numeric amounts
    'V2.1TranslationInfo',         # Translation metadata
    'V2ExtrasXML'                  # Additional XML data
]

# Priority Columns
PRIORITY_COLUMNS = [
    'GKGRECORDID',                 # Unique identifier
    'DATE',                        # Publication date
    'SourceCollectionIdentifier',  # Source type
    'SourceCommonName',            # Source name
    'DocumentIdentifier',          # Document URL/ID
    'V1Counts',                    # Numeric mentions
    'V2.1Counts',                  # Enhanced counts
    'V1Themes',                    # Theme tags
    'V2EnhancedThemes',            # Enhanced themes
    'V1Locations',                 # Geographic data
    'V2EnhancedLocations',         # Enhanced locations
    'V1Persons',                   # Person mentions
    'V2EnhancedPersons',           # Enhanced persons
    'V1Organizations',             # Organization mentions
    'V2EnhancedOrganizations',     # Enhanced organizations
    'V1.5Tone',                    # Sentiment scores
    'V2.1EnhancedDates',           # Date mentions
    'V2GCAM',                      # Enhanced sentiment
    'V2.1Quotations',              # Direct quotes
    'V2.1AllNames',                # All named entities
    'V2.1Amounts'                  # Numeric data
]

# --- Tasks ---
@task(retries=3, retry_delay_seconds=30, log_prints=True)
def setup_directories(base_path: Path) -> dict:
    """Create processing directories."""
    logger = get_run_logger()
    try:
        raw_dir = base_path / "gdelt_raw"
        processed_dir = base_path / "gdelt_processed"
        raw_dir.mkdir(parents=True, exist_ok=True)
        processed_dir.mkdir(parents=True, exist_ok=True)
        logger.info("Directories created successfully")
        return {"raw": raw_dir, "processed": processed_dir}
    except Exception as e:
        logger.error(f"Directory creation failed: {str(e)}")
        raise

@task(retries=2, log_prints=True)
def generate_gdelt_urls(start_date: datetime.datetime, end_date: datetime.datetime) -> Dict[datetime.date, List[str]]:
    """
    Generate a dictionary keyed by date. Each value is a list of URLs
    (one per 15-minute interval).
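    For example, 2020-03-16 yields 96 URLs such as
    http://data.gdeltproject.org/gdeltv2/20200316001500.gkg.csv.zip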
""" logger = get_run_logger() url_groups = {} try: current_date = start_date.date() while current_date <= end_date.date(): urls = [ f"{BASE_URL}/{current_date.strftime('%Y%m%d')}{hour:02}{minute:02}00.gkg.csv.zip" for hour in range(24) for minute in [0, 15, 30, 45] ] url_groups[current_date] = urls current_date += timedelta(days=1) logger.info(f"Generated URL groups for dates: {list(url_groups.keys())}") return url_groups except Exception as e: logger.error(f"URL generation failed: {str(e)}") raise @task(retries=3, retry_delay_seconds=30, log_prints=True) def download_file(url: str, raw_dir: Path) -> Path: """Download a single CSV (zip) file from the given URL.""" logger = get_run_logger() try: response = requests.get(url, timeout=10) response.raise_for_status() filename = Path(url).name zip_path = raw_dir / filename with zip_path.open('wb') as f: f.write(response.content) logger.info(f"Downloaded {filename}") # Optionally, extract the CSV from the ZIP archive. with zipfile.ZipFile(zip_path, 'r') as z: # Assuming the zip contains one CSV file. csv_names = z.namelist() if csv_names: extracted_csv = raw_dir / csv_names[0] z.extractall(path=raw_dir) logger.info(f"Extracted {csv_names[0]}") return extracted_csv else: raise ValueError("Zip file is empty.") except Exception as e: logger.error(f"Error downloading {url}: {str(e)}") raise @task(retries=2, log_prints=True) def convert_and_filter_combined(csv_paths: List[Path], processed_dir: Path, date: datetime.date) -> Path: """ Combine multiple CSV files (for one day) into a single DataFrame, filter to only the required columns, optimize data types, and write out as a single Parquet file. """ logger = get_run_logger() try: dfs = [] for csv_path in csv_paths: df = pd.read_csv( csv_path, sep='\t', names=GKG_COLUMNS, dtype='string', quoting=3, na_values=[''], encoding='utf-8', encoding_errors='replace' ) dfs.append(df) combined_df = pd.concat(dfs, ignore_index=True) filtered_df = combined_df[PRIORITY_COLUMNS].copy() # Convert the date field to datetime; adjust the format if necessary. if 'V2.1DATE' in filtered_df.columns: filtered_df['V2.1DATE'] = pd.to_datetime( filtered_df['V2.1DATE'], format='%Y%m%d%H%M%S', errors='coerce' ) output_filename = f"gdelt_gkg_{date.strftime('%Y%m%d')}.parquet" output_path = processed_dir / output_filename filtered_df.to_parquet(output_path, engine='pyarrow', compression='snappy', index=False) logger.info(f"Converted and filtered data for {date} into {output_filename}") return output_path except Exception as e: logger.error(f"Error processing CSVs for {date}: {str(e)}") raise @task(retries=3, retry_delay_seconds=30, log_prints=True) def upload_to_hf(file_path: Path, token: str) -> bool: """Upload task with global concurrency limit.""" logger = get_run_logger() try: with concurrency("hf_uploads", occupy=1): # Enable the optimized HF Transfer backend. os.environ["HF_HUB_ENABLE_HF_TRANSFER"] = "1" api = HfApi() api.upload_file( path_or_fileobj=str(file_path), path_in_repo=file_path.name, repo_id=REPO_ID, repo_type="dataset", token=token, ) logger.info(f"Uploaded {file_path.name}") return True except Exception as e: logger.error(f"Upload failed for {file_path.name}: {str(e)}") raise @task(retries=3, retry_delay_seconds=120, log_prints=True) def create_hf_repo(token: str) -> bool: """ Validate that the Hugging Face dataset repository exists; create it if not. 
""" logger = get_run_logger() try: api = HfApi() datasets = [ds.id for ds in list_datasets(token=token)] if REPO_ID in datasets: logger.info(f"Dataset repository '{REPO_ID}' already exists.") return True # Create the repository if it doesn't exist. api.create_repo(repo_id=REPO_ID, repo_type="dataset", token=token, private=False) logger.info(f"Successfully created dataset repository: {REPO_ID}") return True except Exception as e: logger.error(f"Failed to create or validate dataset repo '{REPO_ID}': {str(e)}") raise RuntimeError(f"Repository validation/creation failed for '{REPO_ID}'") from e @flow(name="Process Single Day", log_prints=True) def process_single_day( date: datetime.date, urls: List[str], directories: dict, hf_token: str ) -> bool: """ Process one day's data by: 1. Downloading all CSV files concurrently. 2. Merging, filtering, and optimizing the CSVs. 3. Writing out a single daily Parquet file. 4. Uploading the file to the Hugging Face Hub. """ logger = get_run_logger() try: # Download and process data (unlimited concurrency) csv_paths = [download_file(url, directories["raw"]) for url in urls] daily_parquet = convert_and_filter_combined(csv_paths, directories["processed"], date) # Upload with global concurrency limit upload_to_hf(daily_parquet, hf_token) # <-- Throttled to 2 concurrent logger.info(f"Completed {date}") return True except Exception as e: logger.error(f"Day {date} failed: {str(e)}") raise @flow( name="Process Date Range", task_runner=ConcurrentTaskRunner(), # Parallel subflows log_prints=True ) def process_date_range(base_path: Path = Path("data")): """ Main ETL flow: 1. Load parameters and credentials. 2. Validate (or create) the Hugging Face repository. 3. Setup directories. 4. Generate URL groups by date. 5. Process each day concurrently. """ logger = get_run_logger() # Load parameters from a JSON block. json_block = JSON.load("gdelt-etl-parameters") params = json_block.value start_date = datetime.datetime.fromisoformat(params.get("start_date", "2020-03-16T00:00:00")) end_date = datetime.datetime.fromisoformat(params.get("end_date", "2020-03-22T00:00:00")) # Load the Hugging Face token from a Secret block. secret_block = Secret.load("huggingface-token") hf_token = secret_block.get() # Validate or create the repository. create_hf_repo(hf_token) directories = setup_directories(base_path) url_groups = generate_gdelt_urls(start_date, end_date) # Process days concurrently (subflows) futures = [process_single_day(date, urls, directories, hf_token) for date, urls in url_groups.items()] # Wait for completion (optional error handling) for future in futures: try: future.result() except Exception as e: logger.error(f"Failed day: {str(e)}") # --- Entry Point --- if __name__ == "__main__": process_date_range.serve( name="gdelt-etl-production-v2", tags=["gdelt", "etl", "production"], )