# ============================================================================= # DON'T STEAL THE FREE CODE OF DEVS! Use it for free an do not touch credits! # If you steal this code, in the future you will pay for apps like this! # A bit of respect goes a long way – all rights reserved under German law. # Copyright Volkan Kücükbudak https://github.com/volkansah # Repo URL: https://github.com/AiCodeCraft # ============================================================================= import streamlit as st import os import json import datetime import openai from datetime import timedelta import logging from datasets import load_dataset, Dataset, concatenate_datasets # ------------------ Logging konfigurieren ------------------ logging.basicConfig( level=logging.INFO, format='%(asctime)s - %(name)s - %(levelname)s - %(message)s' ) logger = logging.getLogger(__name__) logger.info("Starte App mit HF-Dataset Memory...") # ------------------ Hugging Face Token laden ------------------ HF_TOKEN_MEMORY = os.getenv('HF_TOKEN_MEMORY', '').strip() if HF_TOKEN_MEMORY: logger.info("Hugging Face Token gefunden.") else: logger.warning("Kein Hugging Face Token gefunden. Falls benötigt, bitte setzen!") # ------------------ Einstellungen für das Memory-Dataset ------------------ DATASET_REPO = "AiCodeCarft/customer_memory" def load_memory_dataset(): try: ds = load_dataset(DATASET_REPO, split="train") logger.info("Dataset erfolgreich vom HF Hub geladen.") except Exception as e: logger.info("Kein Dataset gefunden. Erstelle ein neues Dataset...") data = {"user_id": [], "query": [], "response": []} ds = Dataset.from_dict(data) ds.push_to_hub(DATASET_REPO) logger.info("Neues Dataset erfolgreich erstellt und gepusht.") return ds def add_to_memory(user_id, query, response): ds = load_memory_dataset() new_entry = Dataset.from_dict({ "user_id": [user_id], "query": [query], "response": [response] }) updated_ds = concatenate_datasets([ds, new_entry]) updated_ds.push_to_hub(DATASET_REPO) logger.info("Memory-Dataset erfolgreich aktualisiert.") def get_memory(user_id): ds = load_memory_dataset() filtered_ds = ds.filter(lambda x: x["user_id"] == user_id) logger.info(f"Memory für User {user_id} abgerufen. {len(filtered_ds)} Einträge gefunden.") return filtered_ds # ------------------ Streamlit App UI ------------------ st.title("AI Customer Support Agent with Memory 🛒") st.caption("Chat with a customer support assistant who remembers your past interactions.") # OpenAI API Key Eingabe oben in der Haupt-UI openai_api_key = st.text_input("Enter OpenAI API Key", type="password") if not openai_api_key: st.warning("⚠️ Please enter your OpenAI API key to continue.") st.stop() openai.api_key = openai_api_key # Direktes Setzen des API-Keys # ------------------ Klasse: CustomerSupportAIAgent ------------------ class CustomerSupportAIAgent: def __init__(self): self.client = openai self.app_id = "customer-support" def handle_query(self, query, user_id=None): try: memories = get_memory(user_id) context = "Relevant past information:\n" if len(memories) > 0: for entry in memories: context += f"- Query: {entry['query']}\n Response: {entry['response']}\n" full_prompt = f"{context}\nCustomer: {query}\nSupport Agent:" # API-Key wird direkt übergeben answer = self.client.ChatCompletion.create( model="gpt-3.5-turbo", messages=[ {"role": "system", "content": "You are a customer support AI for TechGadgets.com."}, {"role": "user", "content": full_prompt} ] ).choices[0].message.content add_to_memory(user_id, query, answer) return answer except Exception as e: logger.error(f"Fehler bei handle_query: {e}") return "Sorry, I encountered an error. Please try again later." # ------------------ Initialisierung ------------------ support_agent = CustomerSupportAIAgent() # ------------------ Sidebar-Komponenten ------------------ with st.sidebar: st.title("Customer ID") customer_id = st.text_input("Enter your Customer ID", key="customer_id") if 'customer_id' in st.session_state and st.session_state.customer_id: if st.button("Generate Synthetic Data"): # ... (deine bestehende Synthetic Data Logik) # ------------------ Chat-History Management ------------------ if "messages" not in st.session_state: st.session_state.messages = [] # ------------------ Chat-Eingabe ------------------ if prompt := st.chat_input("How can I assist you today?"): if not customer_id: st.error("❌ Please enter a customer ID first") st.stop() st.session_state.messages.append({"role": "user", "content": prompt}) with st.spinner("Generating response..."): response = support_agent.handle_query(prompt, customer_id) st.session_state.messages.append({"role": "assistant", "content": response}) # ------------------ Nachrichten anzeigen ------------------ for message in st.session_state.messages: with st.chat_message(message["role"]): st.markdown(message["content"])