"""Automate the Pi.ai web chat with Playwright and download its TTS audio.

``PiAIClient`` drives a persistent Chromium context: a background task watches
the page for onboarding prompts, ``/api/chat`` responses are intercepted to
collect message ``sid`` values, and each ``sid`` is fetched from
``/api/chat/voice`` and saved as an MP3 in ``download_dir``.
"""

import asyncio
import contextlib
import enum
import logging
import os
import re
from datetime import datetime, timedelta, timezone
from email.utils import parsedate_to_datetime
from typing import Optional, Union
from urllib.parse import urlencode, urlparse

from playwright.async_api import Download, Page, Request, Response, async_playwright

# Configure logging
logging.basicConfig(
    level=logging.INFO,
    format="%(asctime)s [%(levelname)s] %(message)s",
    handlers=[logging.StreamHandler()],
)
logger = logging.getLogger(__name__)


class VoiceType(enum.Enum):
    """Voice identifiers sent as the ``voice`` query parameter of ``/api/chat/voice``."""

    NEUTRAL = "voice1"
    HAPPY = "voice2"
    SAD = "voice3"
    ANGRY = "voice4"
    EXCITED = "voice5"
    CALM = "voice6"
    # Add more voices as supported by Pi.ai


def _utcnow() -> datetime:
    """Return the current UTC time as a naive datetime.

    Equivalent to the deprecated ``datetime.utcnow()``; kept naive so that
    ``rate_limit_until`` keeps the same type and log format as before.
    """
    return datetime.now(timezone.utc).replace(tzinfo=None)


class PiAIClient:
    """Drive the Pi.ai web UI to send chat messages and fetch their TTS audio.

    Typical use: ``await setup()``, then ``await say(...)`` sequentially, then
    ``await close()``. Concurrent ``say()`` calls share ``self.user_input`` and
    the sid queue, so run them one at a time.
    """

    _CHAT_INPUT_SELECTOR = 'textarea[placeholder="Talk with Pi"]'
    _SUBMIT_SELECTOR = 'button[aria-label="Submit text"]'
    # Used when a 429 carries no usable Retry-After header.
    _DEFAULT_RATE_LIMIT_SECONDS = 60
    # Creates a temporary <a download> link and clicks it. The URL and file
    # name are passed as an evaluate() argument instead of being spliced into
    # the source, so quotes in them cannot break (or inject into) the script.
    _CLICK_DOWNLOAD_LINK_JS = """([href, fileName]) => {
    const link = document.createElement('a');
    link.href = href;
    link.download = fileName;
    document.body.appendChild(link);
    link.click();
    document.body.removeChild(link);
}"""

    def __init__(
        self,
        headless: bool = False,
        download_dir: str = "/tmp/Audio",
        user_name: str = "Cassandra",
    ):
        """
        :param headless: Launch Chromium without a visible window.
        :param download_dir: Directory where audio files are saved; created
            if it does not exist.
        :param user_name: Name typed into the onboarding "Your first name" field.
        """
        self.headless = headless
        self.download_dir = download_dir
        self.user_name = user_name
        self.playwright = None
        self.browser = None
        self.context = None
        self.page = None
        self.initialized = False
        # Message waiting to be submitted; None when nothing is pending.
        # Cleared as soon as it is submitted so it is never sent twice.
        self.user_input = None

        # Page states the monitor reacts to, checked in order each cycle.
        self.actions = [
            {
                "selector": self._CHAT_INPUT_SELECTOR,
                "handler": self.send_chat_message,
                "description": "Chat input detected.",
                # The chat input means the page is ready: stop scanning and
                # stay on this page (no fallback navigation).
                "break_after": True,
            },
            {
                "selector": 'button:has-text("I’ll do it later")',
                "handler": self.click_element,
                "description": "'I’ll do it later' button found, clicking it.",
            },
            {
                "selector": 'button:has-text("Next")',
                "handler": self.click_element,
                "description": "'Next' button found, clicking it.",
            },
            {
                "selector": 'textarea[placeholder="Your first name"]',
                "handler": self.fill_name,
                "description": "Name input detected, filling it.",
            },
        ]

        # Regular expression to extract 'sid' values from the response
        self.sid_regex = re.compile(r'"sid":"([\w\-]+)"')
        # sids already handed to process_sid, to avoid duplicate downloads
        self.processed_sids = set()
        self.ensure_download_directory()
        # Limits how many voice downloads run at once.
        self.semaphore = asyncio.Semaphore(5)

        # Rate limiting: naive-UTC deadline before which nothing is sent.
        self.rate_limit_until = None
        self.rate_limit_lock = asyncio.Lock()

        # FIFO of (Future, voice) pairs queued by say(); each new sid resolves
        # the oldest entry whose Future is still pending.
        self.sid_futures = asyncio.Queue()
        # Serialises submissions so the monitor loop and send_message() can
        # never submit the same pending message twice.
        self._send_lock = asyncio.Lock()
        # Strong references to background tasks: asyncio keeps only weak
        # references, so unreferenced tasks may be garbage-collected mid-run.
        self._monitor_task = None
        self._background_tasks = set()

    def ensure_download_directory(self):
        """Create ``self.download_dir`` if it does not exist yet."""
        if os.path.isdir(self.download_dir):
            logger.info(f"Directory '{self.download_dir}' already exists.")
            return
        os.makedirs(self.download_dir, exist_ok=True)
        logger.info(f"Created directory '{self.download_dir}' for storing downloads.")

    async def setup(self):
        """Start Playwright, open a persistent context and page, go to /talk and
        start the background monitor task."""
        self.playwright = await async_playwright().start()

        # Persistent profile directory so login/onboarding state survives runs.
        user_data_dir = os.path.join(os.getcwd(), "user_data")
        if os.path.exists(user_data_dir):
            logger.info(f"Using existing user data directory at '{user_data_dir}'.")
        else:
            os.makedirs(user_data_dir)
            logger.info(f"Created user data directory at '{user_data_dir}'.")

        self.context = await self.playwright.chromium.launch_persistent_context(
            user_agent=(
                "Mozilla/5.0 (Windows NT 10.0; Win64; x64) "
                "AppleWebKit/537.36 (KHTML, like Gecko) "
                "Chrome/114.0.0.0 Safari/537.36"
            ),
            user_data_dir=user_data_dir,
            headless=self.headless,
            args=["--no-sandbox"],
        )
        self.page = await self.context.new_page()
        self.page.on("request", self.handle_request)
        self.page.on("response", self.handle_response)

        await self.navigate("https://pi.ai/talk")

        self._monitor_task = asyncio.create_task(self.monitor_page_and_act())
        self.initialized = True

    async def navigate(self, url: str):
        """Navigate to ``url`` and wait for the network to go idle."""
        await self.page.goto(url)
        await self.page.wait_for_load_state("networkidle")
        logger.info(f"Navigated to {url}")

    async def monitor_page_and_act(self):
        """Poll the page every ~2 s and react to the first recognised state.

        Runs until cancelled (``close()`` cancels it). When no known element
        is visible, navigates to ``/discover`` on every fifth fallback and to
        ``/talk`` otherwise.
        """
        fallback_count = 0
        while True:
            try:
                if self.is_rate_limited():
                    wait_seconds = max(
                        (self.rate_limit_until - _utcnow()).total_seconds(), 0
                    )
                    logger.warning(
                        f"Rate limited. Waiting for {wait_seconds:.2f} seconds before retrying."
                    )
                    await asyncio.sleep(wait_seconds)
                    continue

                handled = False
                for action in self.actions:
                    if not await self.page.is_visible(action["selector"]):
                        continue
                    logger.info(action["description"])
                    await action["handler"](action["selector"])
                    handled = True
                    if action.get("break_after"):
                        break

                if not handled:
                    logger.info(
                        "No matching state detected. Navigating to /talk or /discover route."
                    )
                    route = "/discover" if fallback_count % 5 == 0 else "/talk"
                    # Advance on every fallback (the old code reset it to 0 on the
                    # /discover branch, so it never left that branch).
                    fallback_count = (fallback_count + 1) % 5
                    if await self.navigate_to_route(route):
                        logger.info(f"Navigated to {route} route.")

                await asyncio.sleep(2)
            except Exception as e:
                logger.error(f"Error during monitoring: {e}")
                await asyncio.sleep(2)  # Prevent tight loop on repeated errors

    def is_rate_limited(self):
        """Return True while the rate-limit deadline lies in the future."""
        return self.rate_limit_until is not None and _utcnow() < self.rate_limit_until

    async def navigate_to_route(self, route):
        """Navigate to ``route`` on the current host unless already there.

        :return: True if the page is on ``route`` afterwards, False if
            navigation raised (the error is logged, not propagated).
        """
        try:
            current_url = self.page.url
            if current_url.endswith(route):
                logger.info(f"Already on the {route} route.")
            else:
                await self.navigate(self.construct_route_url(current_url, route))
            return True
        except Exception as e:
            logger.error(f"Error navigating to {route} route: {e}")
            return False

    def construct_route_url(self, current_url, route):
        """Return ``current_url`` with its path replaced by ``route``."""
        new_url = urlparse(current_url)._replace(path=route).geturl()
        logger.info(f"Constructed new URL: {new_url}")
        return new_url

    async def click_element(self, selector: str):
        """Wait up to 3 s for ``selector`` and click it; errors are logged."""
        try:
            await self.page.wait_for_selector(selector, timeout=3000)
            await self.page.click(selector)
            logger.info(f"Clicked element: {selector}")
        except Exception as e:
            logger.error(f"Error clicking element {selector}: {e}")

    async def fill_name(self, selector: str):
        """Type ``self.user_name`` into the onboarding name field and submit."""
        try:
            name = self.user_name
            await self.page.fill(selector, name)
            await self.page.click(self._SUBMIT_SELECTOR)
            logger.info(f"Name '{name}' submitted")
        except Exception as e:
            logger.error(f"Error submitting name: {e}")
            await self.handle_send_failure()

    async def _submit_message(self, selector: str, message: str):
        """Fill ``selector`` with ``message``, click submit, clear the pending message.

        Caller must hold ``self._send_lock``.
        """
        await self.page.fill(selector, message)
        await self.page.click(self._SUBMIT_SELECTOR)
        self.user_input = None
        logger.info("Chat message submitted")

    async def send_chat_message(self, selector: str):
        """Submit the pending ``self.user_input`` through ``selector``, if any.

        Called by the monitor whenever the chat input is visible; does nothing
        when no message is pending, so a message is never re-sent.
        """
        async with self._send_lock:
            # Re-checked under the lock: send_message() may have just sent it.
            if self.user_input is None:
                return
            try:
                await self._submit_message(selector, self.user_input)
                return
            except Exception as e:
                logger.error(f"Could not send chat message: {e}")
        # Navigate outside the lock so other senders are not blocked meanwhile.
        await self.handle_send_failure()

    async def handle_send_failure(self):
        """Recover from a failed send by navigating to /talk, else /discover."""
        if await self.navigate_to_route("/talk"):
            logger.info("Navigated to /talk route after failing to send message.")
            return
        if await self.navigate_to_route("/discover"):
            logger.info("Navigated to /discover route after failing to send message.")
            return
        logger.error("Failed to navigate after send_message failure.")

    async def handle_request(self, request: Request):
        """Log every request at DEBUG level."""
        logger.debug(f"Request: {request.method} {request.url}")

    async def handle_response(self, response: Response):
        """Inspect POST ``/api/chat`` responses for rate limits and new sids.

        The first sid not seen before is paired with the oldest pending
        ``say()`` request (or the NEUTRAL voice if none) and downloaded in a
        background task.
        """
        url = response.url
        if "/api/chat" not in url or response.request.method != "POST":
            # /api/chat/voice audio is fetched by process_sid on its own page.
            return

        logger.info(f"Handling response for: {url}")
        try:
            response_text = await asyncio.wait_for(response.text(), timeout=5)
            logger.info(f"Response received from {url}: {response_text}")

            if response.status == 429:
                logger.warning("Received 429 Too Many Requests.")
                # Playwright lower-cases header names in Response.headers.
                wait_seconds = self._parse_retry_after(
                    response.headers.get("retry-after")
                )
                await self.trigger_rate_limit(wait_seconds)
                return

            if "error" in response_text and "Too Many Requests" in response_text:
                logger.warning("Received error response: Too Many Requests.")
                await self.trigger_rate_limit(self._DEFAULT_RATE_LIMIT_SECONDS)
                return

            sids = self.sid_regex.findall(response_text)
            if not sids:
                logger.info("No 'sid's found in the response.")
                return
            logger.info(f"Extracted 'sid's: {sids}")

            new_sid = next((sid for sid in sids if sid not in self.processed_sids), None)
            if new_sid is None:
                return
            self.processed_sids.add(new_sid)
            logger.info(f"Processing sid: {new_sid}")
            future, voice = self._next_pending_request()
            self._spawn(self.process_sid(new_sid, voice, future))
        except asyncio.TimeoutError:
            logger.warning("Timed out waiting for the response body (possibly streaming).")
        except Exception as e:
            logger.error(f"Error processing response: {e}")

    def _next_pending_request(self):
        """Pop the oldest say() request still awaiting audio.

        Futures already done (timed out or cancelled by say()) are discarded so
        they cannot consume a sid meant for a later call.
        :return: ``(future, voice)``, or ``(None, NEUTRAL voice)`` if none pending.
        """
        while not self.sid_futures.empty():
            future, voice = self.sid_futures.get_nowait()
            if not future.done():
                return future, voice
        return None, VoiceType.NEUTRAL.value

    def _spawn(self, coro):
        """Schedule ``coro`` as a task and keep a reference until it finishes."""
        task = asyncio.create_task(coro)
        self._background_tasks.add(task)
        task.add_done_callback(self._background_tasks.discard)
        return task

    @classmethod
    def _parse_retry_after(cls, value: Optional[str]) -> int:
        """Convert a Retry-After header (delta-seconds or HTTP-date) to seconds.

        Falls back to ``_DEFAULT_RATE_LIMIT_SECONDS`` when missing or unparseable.
        """
        default = cls._DEFAULT_RATE_LIMIT_SECONDS
        if not value:
            return default
        value = value.strip()
        if value.isdigit():
            return int(value)
        try:
            retry_at = parsedate_to_datetime(value)
        except (TypeError, ValueError, IndexError):
            return default
        if retry_at.tzinfo is None:
            retry_at = retry_at.replace(tzinfo=timezone.utc)
        delta = (retry_at - datetime.now(timezone.utc)).total_seconds()
        return max(int(delta), 0)

    async def trigger_rate_limit(self, wait_seconds: int):
        """Set or extend the rate-limit deadline by ``wait_seconds``."""
        async with self.rate_limit_lock:
            deadline = _utcnow() + timedelta(seconds=wait_seconds)
            if self.rate_limit_until is None or deadline > self.rate_limit_until:
                self.rate_limit_until = deadline
                logger.warning(
                    f"Rate limited. Will resume after {self.rate_limit_until} UTC."
                )
            else:
                self.rate_limit_until += timedelta(seconds=wait_seconds)
                logger.warning("Already rate limited. Extending the wait time.")

    @staticmethod
    def _voice_value(voice):
        """Return the API voice id for a ``VoiceType`` member or a raw string."""
        return voice.value if isinstance(voice, VoiceType) else voice

    async def process_sid(
        self, sid: str, voice: Union[str, VoiceType], future: Optional[asyncio.Future]
    ):
        """Download the TTS audio for ``sid`` and save it in ``download_dir``.

        The file is saved as ``{sid}_{voice}.mp3``. ``future`` (if given and
        still pending) receives that path once the download has been saved,
        or the exception on failure.
        """
        voice = self._voice_value(voice)
        async with self.semaphore:
            new_page = None
            try:
                logger.info(f"Processing sid: {sid} with voice: {voice}")
                query = urlencode({"mode": "eager", "voice": voice, "messageSid": sid})
                url = f"https://pi.ai/api/chat/voice?{query}"
                logger.info(f"Initiating download from URL: {url}")

                # Keep the file name to one path component even for odd voices.
                safe_voice = re.sub(r"[^\w\-]", "_", voice.lower())
                file_path = os.path.join(self.download_dir, f"{sid}_{safe_voice}.mp3")

                new_page = await self.context.new_page()
                await new_page.goto(url)
                logger.info(f"Opened URL: {url}")

                # Wait for the actual Download object instead of sleeping, and
                # save it to file_path so the returned path really exists.
                async with new_page.expect_download() as download_info:
                    await new_page.evaluate(
                        self._CLICK_DOWNLOAD_LINK_JS, [url, f"{sid}.mp3"]
                    )
                logger.info(f"Triggered download for sid: {sid}")
                download = await download_info.value
                await download.save_as(file_path)
                logger.info(f"Downloaded audio to {file_path}")

                if future is not None and not future.done():
                    future.set_result(file_path)
            except Exception as e:
                logger.error(f"Error processing sid {sid}: {e}")
                if future is not None and not future.done():
                    future.set_exception(e)
            finally:
                # Close the tab even when the download failed.
                if new_page is not None:
                    with contextlib.suppress(Exception):
                        await new_page.close()

    async def handle_download(self, download: Download):
        """Save ``download`` into ``download_dir`` under its suggested name.

        Not registered by this class; ``process_sid`` saves its own downloads.
        """
        try:
            filename = download.suggested_filename or "audio.mp3"
            download_path = os.path.join(self.download_dir, filename)
            await download.save_as(download_path)
            logger.info(f"Downloaded audio to {download_path}")
        except Exception as e:
            logger.error(f"Error downloading audio: {e}")

    async def close(self):
        """Cancel background tasks, then close the browser context and Playwright."""
        tasks = [t for t in (self._monitor_task, *self._background_tasks) if t is not None]
        for task in tasks:
            task.cancel()
        if tasks:
            await asyncio.gather(*tasks, return_exceptions=True)
        self._monitor_task = None

        if self.context:
            await self.context.close()
            self.context = None
        if self.playwright:
            await self.playwright.stop()
            self.playwright = None
        self.initialized = False
        logger.info("Browser closed")

    async def say(self, message: str, voice: Union[str, VoiceType]) -> str:
        """
        Send a message and retrieve the path to the downloaded TTS audio.

        :param message: The message to send.
        :param voice: Voice id string or ``VoiceType`` member.
        :return: The file path of the downloaded audio, or "" if the message
            could not be sent or no audio arrived within 60 seconds.
        """
        voice = self._voice_value(voice)
        future = asyncio.get_running_loop().create_future()
        # Queue before sending so the response's sid always finds this entry.
        await self.sid_futures.put((future, voice))

        if not await self.send_message(message):
            # A done future is skipped by _next_pending_request.
            future.cancel()
            logger.error("Message was not sent; no audio will be produced.")
            return ""

        try:
            # On timeout wait_for cancels the future, so it is skipped later.
            return await asyncio.wait_for(future, timeout=60)
        except asyncio.TimeoutError:
            logger.error("Timeout while waiting for audio download.")
            return ""

    async def send_message(
        self, message: str, retry_count: int = 3, retry_delay: int = 60
    ) -> bool:
        """
        Send a message through the chat interface with retry logic.

        :param message: The message to send.
        :param retry_count: Number of attempts before giving up.
        :param retry_delay: Seconds to wait between attempts.
        :return: True once the message was submitted, False if all attempts failed.
        """
        self.user_input = message
        for attempt in range(1, retry_count + 1):
            try:
                if self.is_rate_limited():
                    wait_seconds = max(
                        (self.rate_limit_until - _utcnow()).total_seconds(), 0
                    )
                    logger.warning(
                        f"Currently rate limited. Waiting for {wait_seconds:.2f} seconds before retrying."
                    )
                    await asyncio.sleep(wait_seconds)
                async with self._send_lock:
                    if self.user_input is None:
                        # The monitor loop already submitted it.
                        return True
                    await self._submit_message(self._CHAT_INPUT_SELECTOR, self.user_input)
                return True
            except Exception as e:
                logger.error(f"Could not send chat message: {e}")
            if attempt < retry_count:
                logger.info(
                    f"Retrying to send message in {retry_delay} seconds... (Attempt {attempt}/{retry_count})"
                )
                await asyncio.sleep(retry_delay)

        logger.error("Max retry attempts reached. Failed to send the message.")
        # Abandon the message so the monitor does not send it later.
        self.user_input = None
        await self.handle_send_failure()
        return False


# Example usage:
#
# async def main():
#     bot = PiAIClient(headless=True)
#     try:
#         await bot.setup()
#         audio_path_neutral = await bot.say("Hello baby.", voice=VoiceType.NEUTRAL.value)
#         print(f"Neutral Audio Path: {audio_path_neutral}")
#         await asyncio.sleep(5)
#         audio_path_happy = await bot.say(
#             "I'm so happy to see you!", voice=VoiceType.HAPPY.value
#         )
#         print(f"Happy Audio Path: {audio_path_happy}")
#         await asyncio.sleep(5)
#         audio_path_sad = await bot.say(
#             "I'm feeling a bit down today.", voice=VoiceType.SAD.value
#         )
#         print(f"Sad Audio Path: {audio_path_sad}")
#     finally:
#         await bot.close()
#
# if __name__ == "__main__":
#     asyncio.run(main())