"""Prefect ETL pipeline: download GDELT Global Knowledge Graph (GKG) 15-minute
snapshots, combine and filter them into one Parquet file per day, and upload the
results to a Hugging Face dataset repository."""

import datetime
import os
import zipfile
from datetime import timedelta
from pathlib import Path
from typing import Dict, List

import pandas as pd
import requests
from huggingface_hub import HfApi
from prefect import flow, get_run_logger, task
from prefect.blocks.system import JSON, Secret
from prefect.concurrency.sync import concurrency
from prefect.task_runners import ConcurrentTaskRunner

# Hugging Face dataset repository that receives the daily Parquet files.
REPO_ID = "dwb2023/gdelt-gkg-march2020-v2"

# GDELT 2.0 publishes a GKG snapshot every 15 minutes under this base URL.
BASE_URL = "http://data.gdeltproject.org/gdeltv2"

GKG_COLUMNS = [
    'GKGRECORDID',
    'DATE',
    'SourceCollectionIdentifier',
    'SourceCommonName',
    'DocumentIdentifier',
    'V1Counts',
    'V2.1Counts',
    'V1Themes',
    'V2EnhancedThemes',
    'V1Locations',
    'V2EnhancedLocations',
    'V1Persons',
    'V2EnhancedPersons',
    'V1Organizations',
    'V2EnhancedOrganizations',
    'V1.5Tone',
    'V2.1EnhancedDates',
    'V2GCAM',
    'V2.1SharingImage',
    'V2.1RelatedImages',
    'V2.1SocialImageEmbeds',
    'V2.1SocialVideoEmbeds',
    'V2.1Quotations',
    'V2.1AllNames',
    'V2.1Amounts',
    'V2.1TranslationInfo',
    'V2ExtrasXML'
]
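
# Note: the raw GKG files are tab-delimited and have no header row, so the names
# above are applied positionally when the CSVs are read in
# convert_and_filter_combined below.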

# Subset of GKG_COLUMNS kept in the daily Parquet output (drops the image,
# social-embed, translation, and extras columns).
PRIORITY_COLUMNS = [
    'GKGRECORDID',
    'DATE',
    'SourceCollectionIdentifier',
    'SourceCommonName',
    'DocumentIdentifier',
    'V1Counts',
    'V2.1Counts',
    'V1Themes',
    'V2EnhancedThemes',
    'V1Locations',
    'V2EnhancedLocations',
    'V1Persons',
    'V2EnhancedPersons',
    'V1Organizations',
    'V2EnhancedOrganizations',
    'V1.5Tone',
    'V2.1EnhancedDates',
    'V2GCAM',
    'V2.1Quotations',
    'V2.1AllNames',
    'V2.1Amounts'
]


@task(retries=3, retry_delay_seconds=30, log_prints=True)
def setup_directories(base_path: Path) -> dict:
    """Create the raw and processed working directories."""
    logger = get_run_logger()
    try:
        raw_dir = base_path / "gdelt_raw"
        processed_dir = base_path / "gdelt_processed"
        raw_dir.mkdir(parents=True, exist_ok=True)
        processed_dir.mkdir(parents=True, exist_ok=True)
        logger.info("Directories created successfully")
        return {"raw": raw_dir, "processed": processed_dir}
    except Exception as e:
        logger.error(f"Directory creation failed: {str(e)}")
        raise


@task(retries=2, log_prints=True)
def generate_gdelt_urls(start_date: datetime.datetime, end_date: datetime.datetime) -> Dict[datetime.date, List[str]]:
    """
    Generate a dictionary keyed by date. Each value is a list of URLs (one per 15-minute interval).
    """
    logger = get_run_logger()
    url_groups = {}
    try:
        current_date = start_date.date()
        while current_date <= end_date.date():
            urls = [
                f"{BASE_URL}/{current_date.strftime('%Y%m%d')}{hour:02}{minute:02}00.gkg.csv.zip"
                for hour in range(24)
                for minute in [0, 15, 30, 45]
            ]
            url_groups[current_date] = urls
            current_date += timedelta(days=1)
        logger.info(f"Generated URL groups for dates: {list(url_groups.keys())}")
        return url_groups
    except Exception as e:
        logger.error(f"URL generation failed: {str(e)}")
        raise
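
# For reference, each generated URL points at a single 15-minute GKG snapshot
# (96 per day); for example, the 2020-03-16 00:15 UTC interval becomes
# http://data.gdeltproject.org/gdeltv2/20200316001500.gkg.csv.zip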


@task(retries=3, retry_delay_seconds=30, log_prints=True)
def download_file(url: str, raw_dir: Path) -> Path:
    """Download a single zipped CSV from the given URL and extract it."""
    logger = get_run_logger()
    try:
        response = requests.get(url, timeout=10)
        response.raise_for_status()
        filename = Path(url).name
        zip_path = raw_dir / filename
        with zip_path.open('wb') as f:
            f.write(response.content)
        logger.info(f"Downloaded {filename}")

        # Each 15-minute archive is expected to contain a single CSV file.
        with zipfile.ZipFile(zip_path, 'r') as z:
            csv_names = z.namelist()
            if csv_names:
                extracted_csv = raw_dir / csv_names[0]
                z.extractall(path=raw_dir)
                logger.info(f"Extracted {csv_names[0]}")
                return extracted_csv
            else:
                raise ValueError("Zip file is empty.")
    except Exception as e:
        logger.error(f"Error downloading {url}: {str(e)}")
        raise


@task(retries=2, log_prints=True)
def convert_and_filter_combined(csv_paths: List[Path], processed_dir: Path, date: datetime.date) -> Path:
    """
    Combine one day's CSV files into a single DataFrame, filter to the priority
    columns, parse the timestamp column, and write the result out as a single
    Parquet file.
    """
    logger = get_run_logger()
    try:
        dfs = []
        for csv_path in csv_paths:
            df = pd.read_csv(
                csv_path,
                sep='\t',
                names=GKG_COLUMNS,
                dtype='string',
                quoting=3,
                na_values=[''],
                encoding='utf-8',
                encoding_errors='replace'
            )
            dfs.append(df)
        combined_df = pd.concat(dfs, ignore_index=True)
        filtered_df = combined_df[PRIORITY_COLUMNS].copy()

        # The GKG DATE field is a YYYYMMDDHHMMSS string; parse it so the Parquet
        # file stores a proper datetime column.
        if 'DATE' in filtered_df.columns:
            filtered_df['DATE'] = pd.to_datetime(
                filtered_df['DATE'], format='%Y%m%d%H%M%S', errors='coerce'
            )

        output_filename = f"gdelt_gkg_{date.strftime('%Y%m%d')}.parquet"
        output_path = processed_dir / output_filename
        filtered_df.to_parquet(output_path, engine='pyarrow', compression='snappy', index=False)
        logger.info(f"Converted and filtered data for {date} into {output_filename}")
        return output_path
    except Exception as e:
        logger.error(f"Error processing CSVs for {date}: {str(e)}")
        raise


@task(retries=3, retry_delay_seconds=30, log_prints=True)
def upload_to_hf(file_path: Path, token: str) -> bool:
    """Upload task with global concurrency limit."""
    logger = get_run_logger()
    try:
        with concurrency("hf_uploads", occupy=1):
            os.environ["HF_HUB_ENABLE_HF_TRANSFER"] = "1"
            api = HfApi()
            api.upload_file(
                path_or_fileobj=str(file_path),
                path_in_repo=file_path.name,
                repo_id=REPO_ID,
                repo_type="dataset",
                token=token,
            )
            logger.info(f"Uploaded {file_path.name}")
            return True
    except Exception as e:
        logger.error(f"Upload failed for {file_path.name}: {str(e)}")
        raise
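
# Two operational notes on upload_to_hf (assumptions about the runtime environment):
# - "hf_uploads" refers to a Prefect *global concurrency limit*, which must already
#   exist in the workspace (it can be created from the Prefect UI, or with
#   `prefect gcl create hf_uploads --limit 1` on recent Prefect releases).
# - Setting HF_HUB_ENABLE_HF_TRANSFER=1 assumes the optional `hf_transfer` package
#   is installed in the execution environment.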


@task(retries=3, retry_delay_seconds=120, log_prints=True)
def create_hf_repo(token: str) -> bool:
    """
    Validate that the Hugging Face dataset repository exists; create it if not.
    """
    logger = get_run_logger()
    try:
        api = HfApi()
        if api.repo_exists(repo_id=REPO_ID, repo_type="dataset", token=token):
            logger.info(f"Dataset repository '{REPO_ID}' already exists.")
            return True

        api.create_repo(repo_id=REPO_ID, repo_type="dataset", token=token, private=False)
        logger.info(f"Successfully created dataset repository: {REPO_ID}")
        return True
    except Exception as e:
        logger.error(f"Failed to create or validate dataset repo '{REPO_ID}': {str(e)}")
        raise RuntimeError(f"Repository validation/creation failed for '{REPO_ID}'") from e


@flow(name="Process Single Day", log_prints=True)
def process_single_day(
    date: datetime.date, urls: List[str], directories: dict, hf_token: str
) -> bool:
    """
    Process one day's data by:
      1. Downloading all CSV files concurrently.
      2. Merging and filtering the CSVs.
      3. Writing out a single daily Parquet file.
      4. Uploading the file to the Hugging Face Hub.
    """
    logger = get_run_logger()
    try:
        # Submit the downloads to the task runner so they run concurrently, then
        # block until every CSV for the day is available.
        download_futures = [download_file.submit(url, directories["raw"]) for url in urls]
        csv_paths = [future.result() for future in download_futures]

        daily_parquet = convert_and_filter_combined(csv_paths, directories["processed"], date)
        upload_to_hf(daily_parquet, hf_token)

        logger.info(f"Completed {date}")
        return True
    except Exception as e:
        logger.error(f"Day {date} failed: {str(e)}")
        raise
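
# Design note: each day runs as its own subflow, so retries and failures are scoped
# per day; the parent flow below logs a failed day and carries on with the rest of
# the date range.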


@flow(
    name="Process Date Range",
    task_runner=ConcurrentTaskRunner(),
    log_prints=True
)
def process_date_range(base_path: Path = Path("data")):
    """
    Main ETL flow:
      1. Load parameters and credentials from Prefect blocks.
      2. Validate (or create) the Hugging Face repository.
      3. Set up the working directories.
      4. Generate URL groups by date.
      5. Process each day, continuing past individual failures.
    """
    logger = get_run_logger()

    # Runtime parameters come from a JSON block; the dates below are only fallbacks.
    json_block = JSON.load("gdelt-etl-parameters")
    params = json_block.value
    start_date = datetime.datetime.fromisoformat(params.get("start_date", "2020-03-16T00:00:00"))
    end_date = datetime.datetime.fromisoformat(params.get("end_date", "2020-03-22T00:00:00"))

    # The Hugging Face token is stored in a Secret block.
    secret_block = Secret.load("huggingface-token")
    hf_token = secret_block.get()

    create_hf_repo(hf_token)

    directories = setup_directories(base_path)
    url_groups = generate_gdelt_urls(start_date, end_date)

    # Subflow calls run sequentially and return their results directly; a failed
    # day is logged and the remaining days are still processed.
    for date, urls in url_groups.items():
        try:
            process_single_day(date, urls, directories, hf_token)
        except Exception as e:
            logger.error(f"Failed to process {date}: {str(e)}")
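
# One-time setup sketch (hypothetical values; the block names must match the
# Secret.load / JSON.load calls above). Run something like this once, e.g. in a
# REPL, before serving the flow:
#
#   from prefect.blocks.system import JSON, Secret
#   Secret(value="hf_xxx").save("huggingface-token")
#   JSON(value={"start_date": "2020-03-16T00:00:00",
#               "end_date": "2020-03-22T00:00:00"}).save("gdelt-etl-parameters")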


if __name__ == "__main__":
    # Serve the flow as a long-running deployment; runs can then be triggered on a
    # schedule or manually from the Prefect UI/CLI.
    process_date_range.serve(
        name="gdelt-etl-production-v2",
        tags=["gdelt", "etl", "production"],
    )