import os
from io import StringIO

import aioboto3
import pandas as pd

AWS_ACCESS_KEY_ID = os.getenv('AWS_ACCESS_KEY_ID')
AWS_SECRET_ACCESS_KEY = os.getenv('AWS_SECRET_ACCESS_KEY')
S3_BUCKET = os.getenv('S3_BUCKET')

# Key prefixes inside the bucket. S3 keys always use forward slashes, so they
# are joined with '/' below rather than with os.path.join, which would produce
# backslashes on Windows.
S3_VIDEO_PATH = 'sample videos'
S3_MODEL_PATH = 'models'
S3_DATA_PATH = '3d_animation_arena/results'


async def download_from_s3(file_key: str, target_dir: str, bucket: str = S3_BUCKET) -> str:
    """
    Downloads a file from an S3 bucket.

    Args:
        file_key (str): The key of the file in the S3 bucket, including extension.
        target_dir (str): The path to the directory to save the downloaded file.
        bucket (str, optional): The name of the S3 bucket.

    Returns:
        str: The local path to the downloaded (or already cached) file.

    Raises:
        ValueError: If the file extension is unsupported.
        Exception: Reraises any exception encountered during the download.
    """
    session = aioboto3.Session()
    target_path = os.path.join(target_dir, file_key)
    async with session.client(
        's3',
        aws_access_key_id=AWS_ACCESS_KEY_ID,
        aws_secret_access_key=AWS_SECRET_ACCESS_KEY
    ) as s3_client:
        try:
            os.makedirs(target_dir, exist_ok=True)
            # Skip the download if the file is already cached locally.
            if os.path.exists(target_path):
                print(f'{file_key} already exists in {target_dir}')
                return target_path
            # Route the key to the right prefix based on its extension, and
            # validate it before opening the target file so an unsupported
            # key never leaves an empty file behind.
            match file_key.split('.')[-1]:
                case 'mp4':
                    s3_key = f'{S3_VIDEO_PATH}/{file_key}'
                case 'glb' | 'obj' | 'stl' | 'gltf' | 'splat' | 'ply':
                    s3_key = f'{S3_MODEL_PATH}/{file_key}'
                case _:
                    raise ValueError(f'Unsupported file type: {file_key}')
            with open(target_path, 'wb') as f:
                await s3_client.download_fileobj(bucket, s3_key, f)
            return target_path
        except Exception as e:
            print(f'Error downloading {file_key} from bucket {bucket}: {e}')
            # Drop any partial file so a failed download is not mistaken for
            # a cached one on the next call.
            if os.path.exists(target_path):
                os.remove(target_path)
            raise


async def read_from_s3(file_key: str, bucket: str = S3_BUCKET) -> pd.DataFrame:
    """
    Reads a file from an S3 bucket based on its file extension and returns the
    appropriate data type. Only CSV files are currently supported.

    Args:
        file_key (str): The key of the file in the S3 bucket.
        bucket (str, optional): The name of the S3 bucket.

    Returns:
        pd.DataFrame: The parsed contents of the CSV file.

    Raises:
        ValueError: If the file extension is unsupported.
        Exception: Reraises any exception encountered during the read.
    """
    session = aioboto3.Session()
    async with session.client(
        's3',
        aws_access_key_id=AWS_ACCESS_KEY_ID,
        aws_secret_access_key=AWS_SECRET_ACCESS_KEY
    ) as s3_client:
        try:
            match file_key.split('.')[-1]:
                case 'csv':
                    response = await s3_client.get_object(Bucket=bucket, Key=f'{S3_DATA_PATH}/{file_key}')
                    content = await response['Body'].read()
                    return pd.read_csv(StringIO(content.decode('utf-8')))
                case _:
                    raise ValueError(f'Unsupported file type for reading: {file_key}')
        except Exception as e:
            print(f'Error reading {file_key} from bucket {bucket}: {e}')
            raise


async def write_to_s3(file_key: str, dataframe: pd.DataFrame, bucket: str = S3_BUCKET) -> None:
    """
    Writes a pandas DataFrame to an S3 bucket as a CSV file.

    Args:
        file_key (str): The key (file name) under which the file will be stored in the S3 bucket.
        dataframe (pd.DataFrame): The pandas DataFrame to write to the S3 bucket.
        bucket (str, optional): The name of the S3 bucket.

    Raises:
        ValueError: If the file extension is unsupported.
        Exception: Reraises any exception encountered during the write process.
""" session = aioboto3.Session() async with session.client( 's3', aws_access_key_id=AWS_ACCESS_KEY_ID, aws_secret_access_key=AWS_SECRET_ACCESS_KEY ) as s3_client: try: match file_key.split('.')[-1]: case 'csv': csv_buffer = StringIO() dataframe.to_csv(csv_buffer, index=False) await s3_client.put_object( Bucket=bucket, Key=os.path.join(S3_DATA_PATH, file_key), Body=csv_buffer.getvalue() ) case _: print(f"Unsupported file type for writing: {file_key}") raise ValueError(f"Unsupported file type for writing: {file_key}") except Exception as e: print(f'Error writing {file_key} to bucket {bucket}: {e}') raise e