|
import aioboto3 |
|
import pandas as pd |
|
from io import StringIO |
|
from typing import Optional, Union |
|
import os |
|
|
|
AWS_ACCESS_KEY_ID = os.getenv('AWS_ACCESS_KEY_ID') |
|
AWS_SECRET_ACCESS_KEY = os.getenv('AWS_SECRET_ACCESS_KEY') |
|
S3_BUCKET = os.getenv('S3_BUCKET') |
|
S3_VIDEO_PATH = 'sample videos' |
|
S3_MODEL_PATH = 'models' |
|
S3_DATA_PATH = '3d_animation_arena/results' |
|
|
|
async def download_from_s3(file_key : str, target_dir: str, bucket : str = S3_BUCKET) -> Optional[str]:
    """
    Downloads a file from an S3 bucket.

    The S3 prefix is chosen from the file extension: ``.mp4`` files are fetched
    from S3_VIDEO_PATH, supported 3D-model extensions from S3_MODEL_PATH.

    Args:
        file_key (str): The key of the file in the S3 bucket, including extension.
        target_dir (str): The path to the directory to save the downloaded file.
        bucket (str, optional): The name of the S3 bucket.

    Returns:
        Optional[str]: The path to the file (also returned if it already
        exists locally, skipping the download).

    Raises:
        ValueError: If the file extension is not supported.
        Exception: Reraises any exception encountered during the download.
    """
    # Resolve the S3 prefix up front so an unsupported extension fails
    # before an empty local file is created.
    extension = file_key.rsplit('.', 1)[-1]
    if extension == 'mp4':
        s3_prefix = S3_VIDEO_PATH
    elif extension in ('glb', 'obj', 'stl', 'gltf', 'splat', 'ply'):
        s3_prefix = S3_MODEL_PATH
    else:
        print(f"Unsupported file type: {file_key}")
        raise ValueError(f"Unsupported file type: {file_key}")

    session = aioboto3.Session()
    target_path = os.path.join(target_dir, file_key)

    async with session.client(
        's3',
        aws_access_key_id=AWS_ACCESS_KEY_ID,
        aws_secret_access_key=AWS_SECRET_ACCESS_KEY
    ) as s3_client:
        try:
            os.makedirs(target_dir, exist_ok=True)
            if os.path.exists(target_path):
                print(f'{file_key} already exists in {target_dir}')
                return target_path
            try:
                with open(target_path, 'wb') as f:
                    # S3 object keys always use '/', never the local
                    # os.sep (os.path.join would break on Windows).
                    await s3_client.download_fileobj(bucket, f'{s3_prefix}/{file_key}', f)
            except Exception:
                # Don't leave a truncated partial file behind: the
                # os.path.exists() fast path above would otherwise treat
                # it as a completed download on the next call.
                if os.path.exists(target_path):
                    os.remove(target_path)
                raise
            return target_path
        except Exception as e:
            print(f'Error downloading {file_key} from bucket {bucket}: {e}')
            raise e
|
|
|
|
|
async def read_from_s3(file_key : str, bucket : str = S3_BUCKET) -> Optional[Union[pd.DataFrame, str]]: |
|
""" |
|
Reads a file from an S3 bucket based on its file extension and returns the appropriate data type. |
|
|
|
Args: |
|
file_key (str): The key of the file in the S3 bucket. |
|
bucket (str, optional): The name of the S3 bucket. |
|
|
|
Returns: |
|
Optional[Union[pd.DataFrame, str]]: |
|
- A pandas DataFrame if the file is a CSV. |
|
- A temporary file path (str) if the file is a GLB. |
|
- A presigned URL (str) if the file is an MP4. |
|
- None if the file type is unsupported. |
|
""" |
|
session = aioboto3.Session() |
|
async with session.client( |
|
's3', |
|
aws_access_key_id=AWS_ACCESS_KEY_ID, |
|
aws_secret_access_key=AWS_SECRET_ACCESS_KEY |
|
) as s3_client: |
|
try: |
|
match file_key.split('.')[-1]: |
|
case 'csv': |
|
response = await s3_client.get_object(Bucket=bucket, Key=os.path.join(S3_DATA_PATH, file_key)) |
|
content = await response['Body'].read() |
|
result = pd.read_csv(StringIO(content.decode("utf-8"))) |
|
return result |
|
case _: |
|
print(f"Unsupported file type for reading: {file_key}") |
|
raise ValueError(f"Unsupported file type for reading: {file_key}") |
|
except Exception as e: |
|
print(f'Error reading {file_key} from bucket {bucket}: {e}') |
|
raise e |
|
|
|
|
|
async def write_to_s3(file_key : str, dataframe: pd.DataFrame, bucket : str = S3_BUCKET) -> None: |
|
""" |
|
Writes a pandas DataFrame to an S3 bucket as a CSV file. |
|
|
|
Args: |
|
file_key (str): The key (file name) under which the file will be stored in the S3 bucket. |
|
dataframe (pd.DataFrame): The pandas DataFrame to write to the S3 bucket. |
|
bucket (str, optional): The name of the S3 bucket. |
|
|
|
Raises: |
|
Exception: Reraises any exception encountered during the write process. |
|
""" |
|
session = aioboto3.Session() |
|
async with session.client( |
|
's3', |
|
aws_access_key_id=AWS_ACCESS_KEY_ID, |
|
aws_secret_access_key=AWS_SECRET_ACCESS_KEY |
|
) as s3_client: |
|
try: |
|
match file_key.split('.')[-1]: |
|
case 'csv': |
|
csv_buffer = StringIO() |
|
dataframe.to_csv(csv_buffer, index=False) |
|
await s3_client.put_object( |
|
Bucket=bucket, |
|
Key=os.path.join(S3_DATA_PATH, file_key), |
|
Body=csv_buffer.getvalue() |
|
) |
|
case _: |
|
print(f"Unsupported file type for writing: {file_key}") |
|
raise ValueError(f"Unsupported file type for writing: {file_key}") |
|
except Exception as e: |
|
print(f'Error writing {file_key} to bucket {bucket}: {e}') |
|
raise e |