# Mbonea's picture
# capcha
# fc39edf
import asyncio
import os
from playwright.async_api import async_playwright, Page, Request, Response, Download
import re
import logging
from urllib.parse import urlparse
from datetime import datetime, timedelta
import enum
# Send INFO-and-above records to a stream handler using a timestamped format,
# then create the module-level logger used throughout this file.
logging.basicConfig(
    format="%(asctime)s [%(levelname)s] %(message)s",
    handlers=[logging.StreamHandler()],
    level=logging.INFO,
)
logger = logging.getLogger(__name__)
class VoiceType(enum.Enum):
    """Named aliases for the voice identifiers sent to Pi.ai.

    Each member's value is the raw identifier string (``"voice1"`` ..
    ``"voice6"``).

    NOTE(review): the emotion names are this project's labels for the
    numbered voices; confirm they match how each voice actually sounds.
    """

    NEUTRAL = "voice1"
    HAPPY = "voice2"
    SAD = "voice3"
    ANGRY = "voice4"
    EXCITED = "voice5"
    CALM = "voice6"
    # Add more voices as supported by Pi.ai
class PiAIClient:
    """Drive the Pi.ai web chat with Playwright and download its TTS audio.

    Typical use: ``await client.setup()``, then ``await client.say(text, voice)``
    for each message, and finally ``await client.close()``.

    ``setup`` opens a persistent Chromium context, loads the chat page and
    starts a background loop that reacts to whatever the page is showing.
    Chat responses are watched for message ids ("sid"s); each new sid is
    turned into an audio download saved in ``download_dir``.
    """

    def __init__(self, headless: bool = False, download_dir: str = "/tmp/Audio"):
        """Create the client; no browser is started until ``setup`` is awaited.

        :param headless: Whether Chromium is launched without a visible window.
        :param download_dir: Directory where audio files are stored (created
            if it does not exist).
        """
        self.headless = headless
        self.playwright = None
        self.browser = None
        self.context = None
        self.page = None
        self.initialized = False
        # Text the monitor loop submits when it sees the chat box. Defined
        # here so the loop does not hit AttributeError before the first say().
        self.user_input = ""
        # Define actions with their selectors and corresponding handler methods
        self.actions = [
            {
                "selector": 'textarea[placeholder="Talk with Pi"]',
                "handler": self.send_chat_message,
                "description": "Chat input detected, sending message.",
                "break_after": True,  # Indicates to break the loop after sending the message
            },
            {
                "selector": 'button:has-text("I’ll do it later")',
                "handler": self.click_element,
                "description": "'I’ll do it later' button found, clicking it.",
            },
            {
                "selector": 'button:has-text("Next")',
                "handler": self.click_element,
                "description": "'Next' button found, clicking it.",
            },
            {
                "selector": 'textarea[placeholder="Your first name"]',
                "handler": self.fill_name,
                "description": "Name input detected, filling it.",
            },
        ]
        # Regular expression to extract 'sid' values from the response
        self.sid_regex = re.compile(r'"sid":"([\w\-]+)"')
        # Set to keep track of processed sids to avoid duplicates
        self.processed_sids = set()
        # Directory to store downloaded audios
        self.download_dir = download_dir
        self.ensure_download_directory()
        # Semaphore to limit concurrent downloads
        self.semaphore = asyncio.Semaphore(5)  # Adjust the number as needed
        # Rate limiting attributes
        self.rate_limit_until = None  # Naive UTC timestamp until which the bot should wait
        self.rate_limit_lock = asyncio.Lock()  # To prevent race conditions
        # FIFO of (Future, voice) pairs, one per pending say() call
        self.sid_futures = asyncio.Queue()
        # Strong references to background tasks: the event loop only keeps weak
        # references, so an unreferenced task can be collected mid-execution.
        self._monitor_task = None
        self._background_tasks = set()

    def ensure_download_directory(self):
        """Ensure that the downloads directory exists."""
        if not os.path.exists(self.download_dir):
            # exist_ok guards against another process creating it in between.
            os.makedirs(self.download_dir, exist_ok=True)
            logger.info(
                f"Created directory '{self.download_dir}' for storing downloads."
            )
        else:
            logger.info(f"Directory '{self.download_dir}' already exists.")

    def _spawn(self, coro):
        """Schedule ``coro`` as a task and keep a reference until it finishes."""
        task = asyncio.create_task(coro)
        self._background_tasks.add(task)
        task.add_done_callback(self._background_tasks.discard)
        return task

    async def setup(self):
        """Initialize Playwright, launch the browser with a persistent context, and create a new page."""
        self.playwright = await async_playwright().start()
        # Specify the path for the user data directory
        user_data_dir = os.path.join(os.getcwd(), "user_data")
        if not os.path.exists(user_data_dir):
            os.makedirs(user_data_dir, exist_ok=True)
            logger.info(f"Created user data directory at '{user_data_dir}'.")
        else:
            logger.info(f"Using existing user data directory at '{user_data_dir}'.")
        # Launch a persistent context
        self.context = await self.playwright.chromium.launch_persistent_context(
            user_agent=(
                "Mozilla/5.0 (Windows NT 10.0; Win64; x64) "
                "AppleWebKit/537.36 (KHTML, like Gecko) "
                "Chrome/114.0.0.0 Safari/537.36"
            ),
            user_data_dir=user_data_dir,  # Persistent storage directory
            headless=self.headless,
            args=["--no-sandbox"],  # Optional: Add any Chromium args if needed
        )
        # Create a new page within the persistent context
        self.page = await self.context.new_page()
        # Set up request and response interception
        self.page.on("request", self.handle_request)
        self.page.on("response", self.handle_response)
        await self.navigate("https://pi.ai/talk")
        # Start the monitoring task; the handle is kept so close() can cancel it
        self._monitor_task = asyncio.create_task(self.monitor_page_and_act())
        self.initialized = True

    async def navigate(self, url: str):
        """Navigate to the specified URL and wait for the page to load."""
        await self.page.goto(url)
        await self.page.wait_for_load_state("networkidle")
        logger.info(f"Navigated to {url}")

    async def monitor_page_and_act(self):
        """Continuously monitor the page and perform actions based on the detected elements.

        Runs until cancelled. Each cycle waits out any active rate limit, runs
        the handler of every visible entry in ``self.actions`` (stopping at one
        flagged ``break_after``), and navigates to /discover or /talk when
        nothing counted as handled. Errors are logged and the loop continues.

        NOTE(review): while the chat box is visible, every cycle re-submits
        ``self.user_input``, so the last message passed to ``say`` is sent
        again repeatedly. Kept as-is; confirm whether that is intended.
        """
        counter = 0
        while True:
            try:
                # Check for rate limiting before performing any actions
                if self.is_rate_limited():
                    wait_seconds = self._rate_limit_remaining()
                    logger.warning(
                        f"Rate limited. Waiting for {wait_seconds:.2f} seconds before retrying."
                    )
                    await asyncio.sleep(wait_seconds)
                    continue  # After waiting, re-enter the loop
                action_performed = False
                for action in self.actions:
                    if await self.page.is_visible(action["selector"]):
                        logger.info(action["description"])
                        await action["handler"](action["selector"])
                        action_performed = True
                        if action.get("break_after"):
                            # Sending a message does not count as "handled",
                            # so the navigation fallback below still runs.
                            action_performed = False
                            break
                if not action_performed:
                    logger.info(
                        "No matching state detected. Navigating to /talk or /discover route."
                    )
                    if counter % 5 == 0:
                        if await self.navigate_to_route("/discover"):
                            logger.info("Navigated to /discover route.")
                        counter = 0
                    else:
                        if await self.navigate_to_route("/talk"):
                            logger.info("Navigated to /talk route.")
                    counter += 1
                # Wait for a short period before the next check
                await asyncio.sleep(2)
            except Exception as e:
                logger.error(f"Error during monitoring: {e}")
                # Prevent tight loop in case of continuous errors
                await asyncio.sleep(2)

    def is_rate_limited(self):
        """Check if the bot is currently rate-limited."""
        if self.rate_limit_until and datetime.utcnow() < self.rate_limit_until:
            return True
        return False

    def _rate_limit_remaining(self) -> float:
        """Return the seconds left until ``rate_limit_until``, never negative."""
        remaining = (self.rate_limit_until - datetime.utcnow()).total_seconds()
        return max(remaining, 0)

    async def navigate_to_route(self, route):
        """Navigate to the specified route on the current site.

        :param route: Path such as ``"/talk"``.
        :return: True if the page is on ``route`` afterwards (including when it
            already was), False if navigation raised. Errors are logged, not
            propagated.
        """
        try:
            current_url = self.page.url
            # Compare parsed paths so a query string or trailing slash does not
            # force a reload on every call.
            current_path = urlparse(current_url).path.rstrip("/")
            if current_path == route.rstrip("/"):
                logger.info(f"Already on the {route} route.")
                return True
            new_url = self.construct_route_url(current_url, route)
            await self.navigate(new_url)
            return True
        except Exception as e:
            logger.error(f"Error navigating to {route} route: {e}")
            return False

    def construct_route_url(self, current_url, route):
        """Construct the new URL for the specified route.

        Only the path component of ``current_url`` is replaced; scheme, host,
        query and fragment are carried over unchanged.
        """
        parsed_url = urlparse(current_url)
        # Replace the path with the desired route
        new_url = parsed_url._replace(path=route).geturl()
        logger.info(f"Constructed new URL: {new_url}")
        return new_url

    async def click_element(self, selector: str):
        """Wait up to 3 seconds for an element and click it; errors are logged."""
        try:
            await self.page.wait_for_selector(selector, timeout=3000)
            await self.page.click(selector)
            logger.info(f"Clicked element: {selector}")
        except Exception as e:
            logger.error(f"Error clicking element {selector}: {e}")

    async def fill_name(self, selector: str):
        """Fill in the name input field and submit."""
        try:
            name = "Cassandra"
            await self.page.fill(selector, name)
            await self.page.click('button[aria-label="Submit text"]')
            logger.info(f"Name '{name}' submitted")
        except Exception as e:
            logger.error(f"Error submitting name: {e}")
            await self.handle_send_failure()

    async def send_chat_message(self, selector: str):
        """Submit ``self.user_input`` through the chat input field.

        Does nothing when no message has been set yet, which is the case for
        monitor cycles that run before the first ``say`` call.
        """
        if not self.user_input:
            logger.debug("Chat input visible but no message is pending; skipping.")
            return
        try:
            await self.page.fill(selector, self.user_input)
            await self.page.click('button[aria-label="Submit text"]')
            logger.info("Chat message submitted")
        except Exception as e:
            logger.error(f"Could not send chat message: {e}")
            await self.handle_send_failure()

    async def handle_send_failure(self):
        """Recover from a failed send by navigating to /talk, else /discover."""
        # navigate_to_route never raises; it reports failure via its return
        # value, which is what drives the fallback here.
        if await self.navigate_to_route("/talk"):
            logger.info("Navigated to /talk route after failing to send message.")
            return
        if await self.navigate_to_route("/discover"):
            logger.info("Navigated to /discover route after failing to send message.")
            return
        logger.error("Failed to navigate after send_message failure.")

    async def handle_request(self, request: "Request"):
        """Handle and log network requests."""
        # Log all requests at DEBUG level
        logger.debug(f"Request: {request.method} {request.url}")

    @staticmethod
    def _parse_retry_after(header_value, default: int = 60) -> int:
        """Convert a Retry-After header value to whole seconds.

        Returns ``default`` when the header is missing, is not an integer
        (for example an HTTP date), or is negative.
        """
        if header_value is None:
            return default
        try:
            seconds = int(str(header_value).strip())
        except ValueError:
            return default
        return seconds if seconds >= 0 else default

    def _next_pending_request(self):
        """Pop the oldest say() request that is still waiting.

        Entries whose future is already done (timed out or cancelled) are
        dropped so they cannot steal a sid from a later request.

        :return: ``(future, voice)``, or ``(None, default voice)`` when no
            request is waiting.
        """
        while True:
            try:
                future, voice = self.sid_futures.get_nowait()
            except asyncio.QueueEmpty:
                return None, VoiceType.NEUTRAL.value
            if not future.done():
                return future, voice

    async def handle_response(self, response: "Response"):
        """Inspect chat responses for rate limiting and new 'sid's.

        Only POST responses whose URL contains ``/api/chat`` are examined. A
        429 status or a "Too Many Requests" error body triggers rate limiting.
        Otherwise the first sid not seen before is handed to ``process_sid``,
        paired with the oldest pending ``say`` request if there is one.
        """
        url = response.url
        if "/api/chat" in url and response.request.method == "POST":
            logger.info(f"Handling response for: {url}")
            try:
                # Check the status before reading the body, so a slow or
                # streaming body cannot prevent the rate limit from being set.
                if response.status == 429:
                    logger.warning("Received 429 Too Many Requests.")
                    # Playwright exposes header names lower-cased.
                    wait_seconds = self._parse_retry_after(
                        response.headers.get("retry-after")
                    )
                    await self.trigger_rate_limit(wait_seconds)
                    return
                response_text = await asyncio.wait_for(response.text(), timeout=5)
                logger.info(f"Response received from {url}: {response_text}")
                if "error" in response_text and "Too Many Requests" in response_text:
                    logger.warning("Received error response: Too Many Requests.")
                    await self.trigger_rate_limit(60)  # Default wait time
                    return
                # Extract 'sid's using regex
                sids = self.sid_regex.findall(response_text)
                if not sids:
                    logger.info("No 'sid's found in the response.")
                    return
                logger.info(f"Extracted 'sid's: {sids}")
                for sid in sids:
                    if sid in self.processed_sids:
                        continue
                    self.processed_sids.add(sid)
                    logger.info(f"Processing sid: {sid}")
                    # Pair with the oldest live say() request, or fall back to
                    # the default voice with no future to resolve.
                    future, voice = self._next_pending_request()
                    self._spawn(self.process_sid(sid, voice, future))
                    # Only the first new sid of a response is processed.
                    break
            except asyncio.TimeoutError:
                logger.warning(
                    "Timed out waiting for the response body (possibly streaming)."
                )
            except Exception as e:
                logger.error(f"Error processing response: {e}")
        elif "/api/chat/voice" in url:
            # Handle audio responses directly if needed
            pass  # Currently handled in process_sid

    async def trigger_rate_limit(self, wait_seconds: int):
        """Set or extend the rate_limit_until timestamp.

        If no deadline is set, or ``now + wait_seconds`` is later than the
        current one, the deadline becomes ``now + wait_seconds``. Otherwise
        the existing deadline is pushed back by ``wait_seconds``.
        """
        async with self.rate_limit_lock:
            candidate = datetime.utcnow() + timedelta(seconds=wait_seconds)
            if not self.rate_limit_until or candidate > self.rate_limit_until:
                self.rate_limit_until = candidate
                logger.warning(
                    f"Rate limited. Will resume after {self.rate_limit_until} UTC."
                )
            else:
                self.rate_limit_until += timedelta(seconds=wait_seconds)
                logger.warning("Already rate limited. Extending the wait time.")

    async def process_sid(self, sid: str, voice: str, future: asyncio.Future):
        """Download the TTS audio for ``sid`` in the given voice.

        The audio URL is opened in a temporary tab and a download is triggered
        from there. The file is saved as ``<sid>_<voice>.mp3`` in
        ``download_dir``.

        :param sid: Message id extracted from a chat response.
        :param voice: Voice identifier placed in the request URL.
        :param future: Optional future to resolve with the saved file path, or
            to fail with the exception that occurred. Ignored if already done.
        """
        async with self.semaphore:
            new_page = None
            try:
                logger.info(f"Processing sid: {sid} with voice: {voice}")
                url = f"https://pi.ai/api/chat/voice?mode=eager&voice={voice}&messageSid={sid}"
                logger.info(f"Initiating download from URL: {url}")
                filename = f"{sid}_{voice.lower()}.mp3"
                file_path = os.path.join(self.download_dir, filename)
                # Open a new page (tab)
                new_page = await self.context.new_page()
                # Navigate to the URL
                await new_page.goto(url)
                logger.info(f"Opened URL: {url}")
                # Wait for the download event instead of sleeping, so the file
                # exists at file_path by the time the future is resolved.
                async with new_page.expect_download() as download_info:
                    # Values are passed as arguments rather than interpolated
                    # into the script text.
                    await new_page.evaluate(
                        """([href, name]) => {
                            const link = document.createElement('a');
                            link.href = href;
                            link.download = name;
                            document.body.appendChild(link);
                            link.click();
                            document.body.removeChild(link);
                        }""",
                        [url, filename],
                    )
                logger.info(f"Triggered download for sid: {sid}")
                download = await download_info.value
                await download.save_as(file_path)
                logger.info(f"Downloaded audio to {file_path}")
                # The waiting say() may have timed out and cancelled the future.
                if future and not future.done():
                    future.set_result(file_path)
            except Exception as e:
                logger.error(f"Error processing sid {sid}: {e}")
                if future and not future.done():
                    future.set_exception(e)
            finally:
                # Close the temporary tab on success and on failure alike.
                if new_page is not None:
                    try:
                        await new_page.close()
                    except Exception as close_error:
                        logger.warning(
                            f"Could not close download page for sid {sid}: {close_error}"
                        )

    async def handle_download(self, download: "Download"):
        """Save a download into ``download_dir`` under its suggested filename.

        Usable as a Playwright "download" event handler. Errors are logged.
        """
        try:
            # Define the path to save the download
            filename = download.suggested_filename or "audio.mp3"
            download_path = os.path.join(self.download_dir, filename)
            # Save the downloaded file
            await download.save_as(download_path)
            logger.info(f"Downloaded audio to {download_path}")
        except Exception as e:
            logger.error(f"Error downloading audio: {e}")

    async def close(self):
        """Stop background tasks, then close the browser and Playwright."""
        # Cancel the monitor loop and in-flight downloads first; otherwise they
        # keep running against a closed browser context.
        tasks = list(self._background_tasks)
        if self._monitor_task is not None:
            tasks.append(self._monitor_task)
        for task in tasks:
            task.cancel()
        if tasks:
            await asyncio.gather(*tasks, return_exceptions=True)
        self._monitor_task = None
        if self.context:
            await self.context.close()
        if self.playwright:
            await self.playwright.stop()
        self.initialized = False
        logger.info("Browser closed")

    async def say(self, message: str, voice: str) -> str:
        """
        Send a message and retrieve the path to the downloaded TTS audio.

        :param message: The message to send.
        :param voice: The voice identifier to use (a ``VoiceType`` member is
            also accepted and converted to its value).
        :return: The file path of the downloaded audio, or ``""`` if no audio
            arrived within 60 seconds.
        :raises Exception: Whatever error ``process_sid`` recorded for this
            request, if the download failed.
        """
        if isinstance(voice, VoiceType):
            voice = voice.value
        # Create a Future to wait for the audio download
        future = asyncio.get_running_loop().create_future()
        # Put the future and voice into the queue
        await self.sid_futures.put((future, voice))
        # Send the message
        self.user_input = message
        await self.send_message(message)
        # Wait for the Future to be set with the audio path
        try:
            return await asyncio.wait_for(future, timeout=60)  # Adjust timeout as needed
        except asyncio.TimeoutError:
            logger.error("Timeout while waiting for audio download.")
            return ""

    async def send_message(
        self, message: str, retry_count: int = 3, retry_delay: int = 60
    ):
        """
        Send a message through the chat interface with retry logic.

        If every attempt fails, ``handle_send_failure`` is run once. Failures
        are logged and never raised.

        :param message: The message to send.
        :param retry_count: Maximum number of attempts.
        :param retry_delay: Seconds to wait before retrying.
        """
        attempt = 0
        while attempt < retry_count:
            try:
                # Check if currently rate limited
                if self.is_rate_limited():
                    wait_seconds = self._rate_limit_remaining()
                    logger.warning(
                        f"Currently rate limited. Waiting for {wait_seconds:.2f} seconds before retrying."
                    )
                    await asyncio.sleep(wait_seconds)
                self.user_input = message  # Update the user_input attribute
                await self.page.fill(
                    'textarea[placeholder="Talk with Pi"]', self.user_input
                )
                await self.page.click('button[aria-label="Submit text"]')
                logger.info("Chat message submitted")
                return  # Success, exit the method
            except Exception as e:
                logger.error(f"Could not send chat message: {e}")
                attempt += 1
                if attempt < retry_count:
                    logger.info(
                        f"Retrying to send message in {retry_delay} seconds... (Attempt {attempt}/{retry_count})"
                    )
                    await asyncio.sleep(retry_delay)
                else:
                    logger.error(
                        "Max retry attempts reached. Failed to send the message."
                    )
        # All attempts failed (or none were allowed): recover once.
        await self.handle_send_failure()
import asyncio
# async def main():
# # Initialize the PiBot
# bot = PiAIClient(headless=True)
# try:
# await bot.setup()
# # await bot.navigate("https://pi.ai/talk")
# # Example usage of the say method
# audio_path_neutral = await bot.say("Hello baby.", voice=VoiceType.NEUTRAL.value)
# print(f"Neutral Audio Path: {audio_path_neutral}")
# await asyncio.sleep(5)
# audio_path_happy = await bot.say(
# "I'm so happy to see you!", voice=VoiceType.HAPPY.value
# )
# print(f"Happy Audio Path: {audio_path_happy}")
# await asyncio.sleep(5)
# audio_path_sad = await bot.say(
# "I'm feeling a bit down today.", voice=VoiceType.SAD.value
# )
# print(f"Sad Audio Path: {audio_path_sad}")
# # You can add more messages with different emotions as needed
# finally:
# await bot.close()
# if __name__ == "__main__":
# asyncio.run(main())