"""Helpers for downloading media from the WhatsApp Cloud API to local files."""

import asyncio
import logging
import os
from typing import Optional

import aiofiles
import aiohttp

logger = logging.getLogger(__name__)

_GRAPH_API_URL = "https://graph.facebook.com/v21.0"
_MEDIA_DIR = "user_media"
_REQUEST_TIMEOUT_SECONDS = 30
_CHUNK_SIZE = 1024


async def _fetch_media_url(
    session: aiohttp.ClientSession, media_id: str, headers: dict
) -> Optional[str]:
    """Step 1: ask the Graph API for the download URL of *media_id*.

    Returns the URL, or None (after logging) on any failure.
    """
    try:
        async with session.get(
            f"{_GRAPH_API_URL}/{media_id}", headers=headers
        ) as response:
            # Check the status before parsing: an error body is not guaranteed
            # to be JSON, and response.json() would raise on it and hide the
            # real status code.
            if response.status != 200:
                body = await response.text()
                logger.error(
                    "Failed to get media URL. Status: %s, Response: %s",
                    response.status,
                    body,
                )
                return None

            data = await response.json()
            media_url = data.get("url")
            if not media_url:
                logger.error("Failed to get media URL from response: %s", data)
                return None
            return media_url
    except asyncio.TimeoutError:
        logger.error("Timed out getting media URL for media ID %s", media_id)
    except aiohttp.ClientError as e:
        logger.error("Error getting media URL: %s", e)
    except Exception as e:  # boundary: callers expect None, never an exception
        logger.error("Unexpected error: %s", e)
    return None


async def _stream_media_to_file(
    session: aiohttp.ClientSession, media_url: str, headers: dict, destination: str
) -> bool:
    """Step 2: stream the body of *media_url* into the file *destination*.

    Returns True on success. On failure the error is logged, any partially
    written file is removed, and False is returned.
    """
    file_opened = False
    try:
        async with session.get(media_url, headers=headers) as media_response:
            if media_response.status != 200:
                media_data = await media_response.text()
                logger.error(
                    "Failed to download media. Status: %s, Response: %s",
                    media_response.status,
                    media_data,
                )
                return False

            # Create the destination's own parent so nested paths work too.
            parent = os.path.dirname(destination)
            if parent:
                os.makedirs(parent, exist_ok=True)

            async with aiofiles.open(destination, "wb") as f:
                file_opened = True
                async for chunk in media_response.content.iter_chunked(_CHUNK_SIZE):
                    await f.write(chunk)
            return True
    except asyncio.TimeoutError:
        logger.error("Timed out downloading media to %s", destination)
    except aiohttp.ClientError as e:
        logger.error("Error downloading media: %s", e)
    except Exception as e:  # boundary: callers expect a bool, never an exception
        logger.error("Error saving media to file: %s", e)

    # Only delete what this call created/truncated; a failure before the file
    # was opened must not remove a pre-existing file at the same path.
    if file_opened:
        try:
            os.remove(destination)
        except OSError as e:
            logger.warning("Could not remove partial file %s: %s", destination, e)
    return False


async def _download_media_to(media_id: str, access_token: str, destination: str) -> bool:
    """Resolve *media_id* to its URL and stream it to *destination*.

    Returns True when the file was written completely, False otherwise.
    """
    headers = {"Authorization": f"Bearer {access_token}"}
    timeout = aiohttp.ClientTimeout(total=_REQUEST_TIMEOUT_SECONDS)
    async with aiohttp.ClientSession(timeout=timeout) as session:
        media_url = await _fetch_media_url(session, media_id, headers)
        if not media_url:
            return False
        # The same bearer token is sent with the download request as well.
        return await _stream_media_to_file(session, media_url, headers, destination)


async def download_whatsapp_media(
    media_id: str, access_token: str, file_path: Optional[str] = None
) -> Optional[str]:
    """
    Asynchronously download media from WhatsApp Cloud API using media ID
    and save it under the ``user_media`` directory.

    Args:
        media_id (str): The ID of the media to download.
        access_token (str): Your WhatsApp access token.
        file_path (Optional[str]): File name (or relative path) inside
            ``user_media`` where the media should be saved. Note that an
            absolute path is used as-is, because ``os.path.join`` discards
            the ``user_media`` prefix in that case.

    Returns:
        str: The path of the saved file (``user_media/<file_path>``).
        None: If ``file_path`` is missing or the download fails; the reason
            is logged.
    """
    # Guard first: without a destination there is nothing useful to download.
    if not file_path:
        logger.error("File path not provided for saving the media.")
        return None

    destination = os.path.join(_MEDIA_DIR, file_path)
    if await _download_media_to(media_id, access_token, destination):
        return destination
    return None


async def save_whatsapp_image(media_id: str, access_token: str, output_path: str) -> None:
    """
    Asynchronously download a WhatsApp image and save it to a file.

    Args:
        media_id (str): The ID of the media to download.
        access_token (str): Your WhatsApp access token.
        output_path (str): Path where to save the image (used as given, not
            placed under ``user_media``).
    """
    if await _download_media_to(media_id, access_token, output_path):
        print(f"Image saved to {output_path}")
    else:
        print("Failed to download image")


def save_file(path: str, content: bytes) -> None:
    """Write *content* to *path* in binary mode, replacing any existing file."""
    with open(path, "wb") as f:
        f.write(content)