# =============================================================================
# DON'T STEAL THE FREE CODE OF DEVS! Use it for free and do not touch credits!
# If you steal this code, in the future you will pay for apps like this!
# A bit of respect goes a long way – all rights reserved under German law.
# Copyright Volkan Kücükbudak https://github.com/volkansah
# Repo URL: https://github.com/AiCodeCraft
# =============================================================================
import streamlit as st
import os
import json
import datetime
from openai import OpenAI
from datetime import timedelta
import logging
from datasets import load_dataset, Dataset, concatenate_datasets
# ------------------ Configure logging ------------------
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s'
)
logger = logging.getLogger(__name__)
logger.info("Starting app with HF dataset memory...")
# ------------------ Load Hugging Face token ------------------
HF_TOKEN_MEMORY = os.getenv('HF_TOKEN_MEMORY', '').strip()
if HF_TOKEN_MEMORY:
    logger.info("Hugging Face token found.")
else:
    logger.warning("No Hugging Face token found. Please set one if it is required!")
# ------------------ Settings for the memory dataset ------------------
DATASET_REPO = "AiCodeCarft/customer_memory"
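# The memory dataset holds one row per remembered interaction with three
# string columns: "user_id", "query", and "response". The helpers below
# load this dataset from the Hub (creating it on first run) and append
# new rows to it.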
def load_memory_dataset():
    try:
        # Pass the token explicitly because the Space stores it under the
        # custom name HF_TOKEN_MEMORY rather than the default HF_TOKEN.
        ds = load_dataset(DATASET_REPO, split="train", token=HF_TOKEN_MEMORY or None)
        logger.info("Dataset successfully loaded from the HF Hub.")
    except Exception as e:
        logger.info(f"No dataset found ({e}). Creating a new dataset...")
        data = {"user_id": [], "query": [], "response": []}
        ds = Dataset.from_dict(data)
        ds.push_to_hub(DATASET_REPO, token=HF_TOKEN_MEMORY or None)
        logger.info("New dataset successfully created and pushed.")
    return ds
def add_to_memory(user_id, query, response):
    ds = load_memory_dataset()
    new_entry = Dataset.from_dict({
        "user_id": [user_id],
        "query": [query],
        "response": [response]
    })
    updated_ds = concatenate_datasets([ds, new_entry])
    updated_ds.push_to_hub(DATASET_REPO, token=HF_TOKEN_MEMORY or None)
    logger.info("Memory dataset successfully updated.")
def get_memory(user_id):
    ds = load_memory_dataset()
    filtered_ds = ds.filter(lambda x: x["user_id"] == user_id)
    logger.info(f"Memory for user {user_id} retrieved. {len(filtered_ds)} entries found.")
    return filtered_ds
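# Note: each helper above reloads the full dataset from the Hub, and
# add_to_memory pushes the complete dataset back for every new entry.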
# ------------------ Streamlit app UI ------------------
st.title("AI Customer Support Agent with Memory 🛒")
st.caption("Chat with a customer support assistant who remembers your past interactions.")
# OpenAI API key input at the top of the main UI
openai_api_key = st.text_input("Enter OpenAI API Key", type="password")
if not openai_api_key:
    st.warning("⚠️ Please enter your OpenAI API key to continue.")
    st.stop()
# ------------------ Class: CustomerSupportAIAgent ------------------
class CustomerSupportAIAgent:
    def __init__(self, api_key):
        # Uses the openai>=1.0 client interface; the API key is passed in directly.
        self.client = OpenAI(api_key=api_key)
        self.app_id = "customer-support"
    def handle_query(self, query, user_id=None):
        try:
            memories = get_memory(user_id)
            context = "Relevant past information:\n"
            if len(memories) > 0:
                for entry in memories:
                    context += f"- Query: {entry['query']}\n  Response: {entry['response']}\n"
            full_prompt = f"{context}\nCustomer: {query}\nSupport Agent:"
            answer = self.client.chat.completions.create(
                model="gpt-3.5-turbo",
                messages=[
                    {"role": "system", "content": "You are a customer support AI for TechGadgets.com."},
                    {"role": "user", "content": full_prompt}
                ]
            ).choices[0].message.content
            add_to_memory(user_id, query, answer)
            return answer
        except Exception as e:
            logger.error(f"Error in handle_query: {e}")
            return "Sorry, I encountered an error. Please try again later."
# ------------------ Initialization ------------------
support_agent = CustomerSupportAIAgent(openai_api_key)
# ------------------ Sidebar components ------------------
with st.sidebar:
    st.title("Customer ID")
    customer_id = st.text_input("Enter your Customer ID", key="customer_id")
    if 'customer_id' in st.session_state and st.session_state.customer_id:
        if st.button("Generate Synthetic Data"):
            # ... (your existing synthetic data logic)
            pass
# ------------------ Chat history management ------------------
if "messages" not in st.session_state:
    st.session_state.messages = []
# ------------------ Chat input ------------------
if prompt := st.chat_input("How can I assist you today?"):
    if not customer_id:
        st.error("❌ Please enter a customer ID first")
        st.stop()
    st.session_state.messages.append({"role": "user", "content": prompt})
    with st.spinner("Generating response..."):
        response = support_agent.handle_query(prompt, customer_id)
        st.session_state.messages.append({"role": "assistant", "content": response})
# ------------------ Display messages ------------------
for message in st.session_state.messages:
    with st.chat_message(message["role"]):
        st.markdown(message["content"])
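# ------------------ Dependency note (assumption) ------------------
# This script assumes roughly the following packages in requirements.txt:
#   streamlit, openai>=1.0, datasets, huggingface_hub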