"""Shop assistant demo.

Routes a customer's free-text question either to a small in-memory pandas
"database" (product info, inventory, best-selling product by revenue) or to
an OpenAI chat model (general knowledge), and exposes it through a Gradio UI.
"""

import json
import os

import gradio as gr
import openai
import pandas as pd
from dotenv import load_dotenv

# Load environment variables
load_dotenv()

# Set up OpenAI API key
openai.api_key = os.getenv("OPENAI_API_KEY")

# The sample transactions are fixed, so "today" is pinned to the most recent
# date present in the sample data rather than read from the system clock.
DEFAULT_DATE = "2025-04-29"

# Chat model used for classification, parameter extraction and general answers.
CHAT_MODEL = "gpt-3.5-turbo"

# Only these keys are forwarded from the model-produced JSON to the database.
# Anything else is dropped; in particular a stray "query_type" key would
# collide with query_database's positional argument and raise TypeError.
_QUERY_PARAMETERS = ("product_name", "product_id", "date")

_CLASSIFIER_PROMPT = """
You are a query classifier for a shop assistant system.
Classify customer queries into one of these categories:
- max_revenue_product: Questions about which product generated the most revenue (today or on a specific date)
- inventory_check: Questions about product availability or stock levels
- product_info: Questions about product details, pricing, etc.
- general_knowledge: Questions that require general knowledge not related to specific shop data

Return ONLY the category as a single word without any explanation.
"""

_GENERAL_KNOWLEDGE_PROMPT = """
You are a helpful assistant for a shop. Answer the customer's question using your general knowledge.
Keep answers brief and focused.
"""


# Simple database using pandas DataFrames
class SimpleDatabase:
    """In-memory product and transaction tables with a few canned queries."""

    def __init__(self):
        # Sample product data
        self.products = pd.DataFrame({
            'product_id': [1, 2, 3, 4, 5],
            'name': ['Laptop', 'Smartphone', 'Headphones', 'Monitor', 'Keyboard'],
            'category': ['Electronics', 'Electronics', 'Audio', 'Electronics', 'Accessories'],
            'price': [1200, 800, 150, 300, 80],
            'stock': [10, 25, 50, 15, 30],
        })

        # Sample transactions data
        self.transactions = pd.DataFrame({
            'transaction_id': [101, 102, 103, 104, 105, 106, 107],
            'product_id': [1, 2, 3, 1, 5, 2, 4],
            'quantity': [1, 2, 3, 1, 2, 1, 2],
            'date': ['2025-04-29', '2025-04-29', '2025-04-28', '2025-04-28',
                     '2025-04-27', '2025-04-29', '2025-04-29'],
            'revenue': [1200, 1600, 450, 1200, 160, 800, 600],
        })

    def _find_by_name(self, product_name):
        """Return the product rows whose name matches case-insensitively."""
        # str() guards against a non-string value (e.g. a number) coming
        # from the model-extracted parameters.
        wanted = str(product_name).lower()
        return self.products[self.products['name'].str.lower() == wanted]

    def query_database(self, query_type, **kwargs):
        """Execute queries on the database based on query type.

        Args:
            query_type: One of "product_info", "max_revenue_product" or
                "inventory_check".
            **kwargs: Optional filters: ``product_name``, ``product_id``
                (product_info), ``date`` as YYYY-MM-DD (max_revenue_product).
                A value of ``None`` is treated as if the key were absent.

        Returns:
            A DataFrame, a dict, or a plain message string depending on the
            query type; "Query type not supported" for unknown types.
        """
        if query_type == "product_info":
            product_name = kwargs.get('product_name')
            if product_name is not None:
                return self._find_by_name(product_name)
            product_id = kwargs.get('product_id')
            if product_id is not None:
                return self.products[self.products['product_id'] == product_id]
            return self.products

        if query_type == "max_revenue_product":
            # Fall back to "today" when no (or an empty/null) date is given.
            date_filter = kwargs.get('date') or DEFAULT_DATE

            # Group by product_id and calculate total revenue for the date
            on_date = self.transactions[self.transactions['date'] == date_filter]
            daily_revenue = on_date.groupby('product_id')['revenue'].sum().reset_index()
            if daily_revenue.empty:
                return "No sales data found for that date."

            # Find the product with max revenue (look the row up once)
            top_row = daily_revenue.loc[daily_revenue['revenue'].idxmax()]

            # Get product details
            product_details = self.products[
                self.products['product_id'] == top_row['product_id']
            ].iloc[0]
            return {
                'product_name': product_details['name'],
                # Plain int instead of a numpy scalar.
                'revenue': int(top_row['revenue']),
                'date': date_filter,
            }

        if query_type == "inventory_check":
            product_name = kwargs.get('product_name')
            if product_name:
                product = self._find_by_name(product_name)
                if not product.empty:
                    return {'product': product_name, 'stock': int(product.iloc[0]['stock'])}
                return f"Product '{product_name}' not found."
            return self.products[['name', 'stock']]

        return "Query type not supported"


class QueryRouter:
    """Classifies a query, extracts its parameters and formats the answer."""

    def __init__(self):
        """Initialize the query router"""
        pass

    def _classify_query(self, query):
        """Classify the query to determine which agent should handle it.

        Returns the lower-cased category name produced by the model.
        """
        # Use OpenAI to classify the query
        response = openai.chat.completions.create(
            model=CHAT_MODEL,
            messages=[
                {"role": "system", "content": _CLASSIFIER_PROMPT},
                {"role": "user", "content": query},
            ],
            temperature=0,
        )

        # Extract the query type from the response. The content may be None,
        # and the model occasionally wraps the word in quotes or adds a period.
        content = response.choices[0].message.content or ""
        return content.strip().strip(".\"'`").lower()

    def _extract_parameters(self, query, query_type):
        """Extract relevant parameters from the query based on query type.

        Returns a dict (possibly empty); never raises on malformed model
        output.
        """
        # Use OpenAI to extract parameters
        prompt_content = f"""
        Extract parameters from this customer query: "{query}"

        Query type: {query_type}

        For max_revenue_product:
        - date (in YYYY-MM-DD format, extract "today" as today's date which is {DEFAULT_DATE})

        For inventory_check or product_info:
        - product_name (the name of the product being asked about)

        Return ONLY a valid JSON object with the extracted parameters, nothing else.
        Example: {{"product_name": "laptop"}} or {{"date": "{DEFAULT_DATE}"}}
        """

        response = openai.chat.completions.create(
            model=CHAT_MODEL,
            messages=[
                {"role": "system",
                 "content": "You extract parameters from customer queries for a shop assistant."},
                {"role": "user", "content": prompt_content},
            ],
            temperature=0,
            response_format={"type": "json_object"},
        )

        # Parse the JSON response; an empty/None content fails to decode and
        # falls through to the empty-dict result.
        try:
            parameters = json.loads(response.choices[0].message.content or "")
        except json.JSONDecodeError:
            return {}
        # Valid JSON that is not an object (list, string, ...) cannot be
        # splatted as keyword arguments.
        return parameters if isinstance(parameters, dict) else {}

    def _handle_general_knowledge(self, query):
        """Handle general knowledge queries using OpenAI"""
        response = openai.chat.completions.create(
            model=CHAT_MODEL,
            messages=[
                {"role": "system", "content": _GENERAL_KNOWLEDGE_PROMPT},
                {"role": "user", "content": query},
            ],
            temperature=0.7,
            max_tokens=150,
        )
        return response.choices[0].message.content

    def _format_response(self, query_type, data):
        """Format the response based on query type and data.

        Plain strings coming back from the database are passed through
        unchanged; anything unrecognized is rendered with ``str``.
        """
        if isinstance(data, str):
            return data

        if query_type == "max_revenue_product":
            return (
                f"The product with the highest revenue on {data['date']} is "
                f"{data['product_name']} with ${data['revenue']} in sales."
            )

        if query_type == "inventory_check":
            if isinstance(data, dict) and 'product' in data:
                return f"We currently have {data['stock']} units of {data['product']} in stock."
            return "Here's our current inventory: " + ", ".join(
                f"{row['name']}: {row['stock']} units" for _, row in data.iterrows()
            )

        if query_type == "product_info":
            if data.empty:
                return "Product not found."
            if len(data) == 1:
                product = data.iloc[0]
                return (
                    f"{product['name']} ({product['category']}): ${product['price']}. "
                    f"We have {product['stock']} units in stock."
                )
            return "Here are our products: " + ", ".join(
                f"{row['name']}: ${row['price']}" for _, row in data.iterrows()
            )

        return str(data)

    def process(self, query, db):
        """Process the query and return a response.

        Args:
            query: The customer's free-text question.
            db: Object exposing ``query_database(query_type, **kwargs)``.
        """
        # Classify the query
        query_type = self._classify_query(query)

        # If it's a general knowledge query, handle it differently
        if query_type == "general_knowledge":
            return self._handle_general_knowledge(query)

        # Extract parameters from the query, keeping only known, non-null
        # keys so model output cannot break the keyword-argument call below.
        extracted = self._extract_parameters(query, query_type)
        parameters = {
            key: value
            for key, value in extracted.items()
            if key in _QUERY_PARAMETERS and value is not None
        }

        # Query the database
        result = db.query_database(query_type, **parameters)

        # Format the response
        return self._format_response(query_type, result)


# Initialize database and router
db = SimpleDatabase()
router = QueryRouter()


def process_query(query):
    """Process the user query and return a response"""
    # Guard against None as well as blank input.
    if not query or not query.strip():
        return "Please ask a question about our shop products or services."

    return router.process(query, db)


# Create Gradio interface
demo = gr.Interface(
    fn=process_query,
    inputs=gr.Textbox(
        placeholder="Ask about product pricing, inventory, sales, or any other question...",
        label="Customer Query",
    ),
    outputs=gr.Textbox(label="Shop Assistant Response"),
    title="Shop Voice Box Assistant",
    description="Ask questions about products, inventory, sales, or general questions.",
    examples=[
        ["What's the maximum revenue product today?"],
        ["How many laptops do we have in stock?"],
        ["Tell me about the smartphone."],
        ["What's the weather like today?"],
    ],
)

# Launch the app
if __name__ == "__main__":
    demo.launch()