import os
import json
from openai import OpenAI
from markdown import markdown
from bs4 import BeautifulSoup

# Define available models
AVAILABLE_MODELS = {
    "DeepSeek V3 (Hyperbolic.xyz)": {
        "model_name": "deepseek-ai/DeepSeek-V3",
        "type": "hyperbolic"
    },
    "Llama3.3-70b-Instruct (Hyperbolic.xyz)": {
        "model_name": "meta-llama/Llama-3.3-70B-Instruct",
        "type": "hyperbolic"
    },
    # "DeepSeek V3 (HuggingFace.co)": {
    #     "model_name": "deepseek-ai/DeepSeek-V3",
    #     "type": "huggingface"
    # },
}
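
# Note: each entry's "type" must match a key in the clients dict built by
# setup_clients() below. A new entry would look like this (hypothetical
# model, not part of the original app):
#   "My Model (provider.example)": {
#       "model_name": "org/my-model",
#       "type": "hyperbolic",
#   },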

# Load email examples from JSON file
def load_examples():
    """
    Load example emails from a JSON file.
    Returns a list of example dictionaries or a default empty example if loading fails.
    """
    try:
        with open("email_examples.json", "r") as f:
            return json.load(f)
    except Exception as e:
        print(f"Error loading examples: {e}")
        # Return default empty examples if file can't be loaded
        return [
            {"title": "No examples found", "preceding_conversation": "", "drafted_user_reply": ""}
        ]
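
# email_examples.json is assumed to hold a list of objects shaped like the
# default above, e.g. (illustrative content, not shipped with this file):
# [
#   {
#     "title": "Meeting follow-up",
#     "preceding_conversation": "From: bob@example.com ...",
#     "drafted_user_reply": "thanks, will do"
#   }
# ]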

# Set up the clients for API access
def setup_clients():
    """
    Initialize and configure API clients for different model providers.
    Returns a dictionary of configured clients.
    """
    clients = {
        "hyperbolic": {"key": os.getenv('HYPERBOLIC_XYZ_KEY'), "endpoint": "https://api.hyperbolic.xyz/v1"},
        # "huggingface": {"key": os.getenv('HF_KEY'), "endpoint": "https://huggingface.co/api/inference-proxy/together"},
    }
    for client_type in clients:
        clients[client_type]["client"] = OpenAI(
            base_url=clients[client_type]["endpoint"],
            api_key=clients[client_type]["key"]
        )
    return clients
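
# Example usage (a minimal sketch; assumes HYPERBOLIC_XYZ_KEY is set in the
# environment, otherwise the OpenAI constructor rejects the missing key):
#   clients = setup_clients()
#   client = clients["hyperbolic"]["client"]  # OpenAI-compatible client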

def markdown_to_text(markdown_string):
    """
    Convert a Markdown string to plain text by rendering it to HTML
    and extracting the text content.
    """
    # Convert Markdown to HTML
    html = markdown(markdown_string)
    # Parse HTML and extract text
    soup = BeautifulSoup(html, features="html.parser")
    return soup.get_text()
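
# For example, markdown_to_text("**Hello** _world_") renders the Markdown to
# "<p><strong>Hello</strong> <em>world</em></p>" and returns "Hello world".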

def load_email_guidelines():
    """
    Load email formatting guidelines from a text file.
    Returns the content as a string or a default message if loading fails.
    """
    try:
        with open("email_guidelines.txt", "r") as f:
            return f.read().strip()
    except Exception as e:
        print(f"Error loading email guidelines: {e}")
        return "No guidelines available."

def create_email_format_prompt(email_guidelines, identity=None):
    """
    Create the prompt template that will be sent to the language model.
    Incorporates the provided email guidelines and placeholders for user input.

    Args:
        email_guidelines: The guidelines text to include in the prompt
        identity: Optional identity perspective to write from
    """
    # Escape literal braces so they survive the later str.format() call in
    # process_email(); otherwise guidelines or identities containing "{" or
    # "}" would raise a KeyError there.
    email_guidelines = email_guidelines.replace("{", "{{").replace("}", "}}")

    identity_section = ""
    if identity and identity.strip():
        identity = identity.replace("{", "{{").replace("}", "}}")
        identity_section = f"""
<IDENTITY>
The email is being written from the perspective of: {identity}
</IDENTITY>
"""
    return f"""<EMAIL_GUIDELINES>
{email_guidelines}
</EMAIL_GUIDELINES>
{identity_section}
=========================
Your Task:
1. Review the user's draft email
2. Apply the style, formatting, and content guidelines above to improve the email
3. Provide a polished, ready-to-send version
Previous email thread:
<PREVIOUS_EMAILS>
{{preceding_conversation}}
</PREVIOUS_EMAILS>
User's draft reply:
<USER_EMAIL_DRAFT>
{{drafted_user_reply}}
</USER_EMAIL_DRAFT>
Rewrite the draft to follow the guidelines while preserving the user's intent.
Begin with EMAIL_STARTS_HERE followed by the revised email.
If you have suggestions, add them after SUGGESTIONS_START_HERE.
Don't add any information not found in the original draft or thread. Don't create new URLs, phone numbers, attachments, or subject lines. Start directly with the email body after EMAIL_STARTS_HERE. Include the user's signature if present in their draft. Don't use markdown formatting."""
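
# Example (hypothetical inputs): the returned string still contains literal
# {preceding_conversation} and {drafted_user_reply} placeholders, which
# process_email() fills in via str.format():
#   template = create_email_format_prompt("Keep it brief.", identity="Jane Doe")
#   prompt = template.format(
#       preceding_conversation="From: alice@example.com ...",
#       drafted_user_reply="sounds good, see you then",
#   )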

def process_email(
    preceding_conversation,
    drafted_user_reply,
    session_password,
    system_message,
    model_choice,
    max_tokens,
    temperature,
    top_p,
    password,
    clients,
    custom_guidelines=None,
    identity=None
):
    """
    Process the email and return a formatted response and suggestions.

    Args:
        preceding_conversation: The email thread context
        drafted_user_reply: The user's draft email reply
        session_password: The current session password
        system_message: System instructions for the AI model
        model_choice: The selected AI model
        max_tokens: Maximum tokens for the response
        temperature: Temperature setting for response generation
        top_p: Top-p setting for response generation
        password: The actual password to verify against
        clients: Dictionary of configured API clients
        custom_guidelines: Optional custom guidelines to use instead of default
        identity: Optional identity perspective to write from

    Returns:
        Tuple of (formatted_email, suggestions)
    """
    # Re-check the session password
    if session_password != password:
        return "Error: Invalid session password. Please refresh the page and enter the correct password.", ""
    if model_choice not in AVAILABLE_MODELS:
        return "Error: Invalid model selection.", ""
    # Use custom guidelines if provided, otherwise load from file
    if custom_guidelines is None or custom_guidelines.strip() == "":
        email_guidelines = load_email_guidelines()
    else:
        email_guidelines = custom_guidelines
    # Get the email format prompt template
    email_format_prompt = create_email_format_prompt(email_guidelines, identity)
    # Format the prompt using the template
    formatted_prompt = email_format_prompt.format(
        preceding_conversation=preceding_conversation,
        drafted_user_reply=drafted_user_reply,
    )
    messages = [
        {"role": "system", "content": system_message},
        {"role": "user", "content": formatted_prompt},
    ]
    selected_client = clients[AVAILABLE_MODELS[model_choice]["type"]]["client"]
    try:
        response = selected_client.chat.completions.create(
            model=AVAILABLE_MODELS[model_choice]["model_name"],
            messages=messages,
            max_tokens=max_tokens,
            temperature=temperature,
            top_p=top_p,
        )
        # Split the response based on EMAIL_STARTS_HERE and SUGGESTIONS_START_HERE
        full_response = response.choices[0].message.content
        # Remove markdown formatting
        full_response = markdown_to_text(full_response)
        # First check if EMAIL_STARTS_HERE is in the response
        if "EMAIL_STARTS_HERE" in full_response:
            email_parts = full_response.split("EMAIL_STARTS_HERE", 1)
            email_content = email_parts[1].strip()
            # Then check if there are suggestions
            if "SUGGESTIONS_START_HERE" in email_content:
                parts = email_content.split("SUGGESTIONS_START_HERE", 1)
                return parts[0].strip(), parts[1].strip()
            else:
                return email_content, ""
        else:
            # Fall back to splitting the whole response if the marker isn't found
            if "SUGGESTIONS_START_HERE" in full_response:
                parts = full_response.split("SUGGESTIONS_START_HERE", 1)
                return parts[0].strip(), parts[1].strip()
            else:
                return full_response.strip(), ""
    except Exception as e:
        return f"Error: {str(e)}", ""