File size: 5,032 Bytes
b12e5fe 0729c66 b12e5fe 52ec688 7598edf 6219b13 52ec688 0729c66 b12e5fe 6219b13 0729c66 dd5aa4f 7598edf dd5aa4f 6219b13 0729c66 7b770ed 52ec688 7b770ed b12e5fe 52ec688 dd5aa4f 0729c66 52ec688 0729c66 dd5aa4f 0729c66 dd5aa4f b12e5fe 7598edf 52ec688 7598edf 52ec688 7598edf b12e5fe 7598edf 52ec688 7598edf 0729c66 52ec688 6219b13 6e84209 e8bcf17 |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 |
import asyncio
import csv
import logging
import os
import re
import sqlite3

import gradio as gr
import openai
import requests
from gtts import gTTS
from langgraph.graph import StateGraph, START, END
from typing_extensions import TypedDict
# Configure the OpenAI client from the OPENAI_API_KEY environment variable
# (os.getenv yields None when the variable is not set).
openai.api_key = os.getenv("OPENAI_API_KEY")
def init_db_from_csv(csv_path: str = "transactions.csv", db_path: str = "shop.db") -> None:
    """Rebuild the ``transactions`` table of a SQLite database from a CSV file.

    The table is created when it does not exist, then emptied and refilled
    with one row per CSV record, so repeated calls never duplicate data.

    Args:
        csv_path: CSV file whose header row contains the ``date``,
            ``product`` and ``amount`` columns.
        db_path: SQLite database file to write to.

    Raises:
        FileNotFoundError: If ``csv_path`` does not exist.
        KeyError: If a required column is missing from the CSV header.
        ValueError: If an ``amount`` value cannot be parsed as a number.
    """
    conn = sqlite3.connect(db_path)
    try:
        conn.execute(
            "CREATE TABLE IF NOT EXISTS transactions (date TEXT, product TEXT, amount REAL)"
        )
        # utf-8-sig also accepts plain UTF-8 and strips a leading BOM, which
        # would otherwise become part of the first header name.
        with open(csv_path, newline="", encoding="utf-8-sig") as f:
            rows = [
                (row["date"], row["product"], float(row["amount"]))
                for row in csv.DictReader(f)
            ]
        # DELETE and INSERT share one transaction: if the insert fails, the
        # connection is closed without a commit and the old rows survive.
        conn.execute("DELETE FROM transactions")
        conn.executemany(
            "INSERT INTO transactions (date, product, amount) VALUES (?, ?, ?)", rows
        )
        conn.commit()
    finally:
        # Release the connection even when reading the CSV or inserting fails.
        conn.close()
# Executed at import time: reloads the transactions table from the default CSV
# path before any query is served.
init_db_from_csv()
def db_agent(query: str, db_path: str = "shop.db") -> str:
    """Report the product with the highest summed revenue for today.

    Args:
        query: The user's question. It is accepted so every agent shares the
            same call shape, but the SQL below does not depend on it.
        db_path: SQLite database file holding the ``transactions`` table.

    Returns:
        A sentence naming the top product and its revenue, a notice that no
        rows are dated today, or a "Database error" message when SQLite
        reports an operational error (for example a missing table).
    """
    try:
        conn = sqlite3.connect(db_path)
        try:
            # NOTE(review): SQLite's date('now') is evaluated in UTC, so
            # "today" follows the UTC calendar day — confirm this is intended.
            row = conn.execute(
                """
                SELECT product, SUM(amount) AS revenue
                FROM transactions
                WHERE date = date('now')
                GROUP BY product
                ORDER BY revenue DESC
                LIMIT 1
                """
            ).fetchone()
        finally:
            # Always release the connection, on success and on error alike.
            conn.close()
    except sqlite3.OperationalError as e:
        return f"Database error: {e}. Please check 'transactions' table in {db_path}."
    if row:
        return f"Top product today: {row[0]} with ₹{row[1]:,.2f}"
    return "No transactions found for today."
def web_search_agent(query: str) -> str:
    """Answer a question using a SerpAPI search snippet when one is available.

    The first organic result's snippet is summarised by ``llm_agent``. When
    the search fails or yields no snippet, the raw question is sent to
    ``llm_agent`` instead, so the search step stays best-effort.

    Args:
        query: The user's question, forwarded as the search term.

    Returns:
        The LLM's summary of the snippet, or its direct answer to ``query``.
    """
    snippet = ""
    try:
        resp = requests.get(
            "https://serpapi.com/search",
            params={"q": query, "api_key": os.getenv("SERPAPI_KEY")},
            # Without a timeout, requests can wait on a stalled server forever.
            timeout=10,
        )
        resp.raise_for_status()
        payload = resp.json()
        results = payload.get("organic_results") if isinstance(payload, dict) else None
        # Guard against a missing, empty, or oddly shaped result list instead
        # of relying on a swallowed IndexError/AttributeError.
        if isinstance(results, list) and results and isinstance(results[0], dict):
            snippet = (results[0].get("snippet") or "").strip()
    except (requests.RequestException, ValueError) as exc:
        # Network, HTTP and JSON-decoding problems are logged and then handled
        # by the fallback below rather than silently discarded.
        logging.getLogger(__name__).warning(
            "Web search failed, falling back to the LLM: %s", exc
        )
    # The LLM calls sit outside the try block so their errors are not masked.
    if snippet:
        return llm_agent(f"Summarize: {snippet}")
    return llm_agent(query)
def llm_agent(query: str) -> str:
    """Send ``query`` to the ``gpt-4o-mini`` chat model and return its reply.

    Args:
        query: Text sent as the single user message.

    Returns:
        The first choice's message text with surrounding whitespace removed,
        or an empty string when the reply carries no text content.
    """
    response = openai.chat.completions.create(
        model="gpt-4o-mini",
        messages=[
            {"role": "system", "content": "You are a helpful assistant."},
            {"role": "user", "content": query},
        ],
        temperature=0.2,
    )
    content = response.choices[0].message.content
    # The message content is optional in the API response; calling .strip()
    # on None would raise AttributeError.
    return (content or "").strip()
def stt_agent(audio_path: str) -> str:
    """Transcribe the audio file at ``audio_path`` with the ``whisper-1`` model.

    Returns the transcript text with leading and trailing whitespace removed.
    """
    with open(audio_path, "rb") as audio_file:
        result = openai.audio.transcriptions.create(
            file=audio_file,
            model="whisper-1",
        )
    spoken_text = result.text
    return spoken_text.strip()
def tts_agent(text: str, lang: str = 'en') -> str:
    """Synthesize ``text`` to speech with gTTS and return the MP3 file path.

    The audio is always written to ``response_audio.mp3`` in the current
    working directory, replacing any previous file of that name.
    """
    destination = "response_audio.mp3"
    gTTS(text=text, lang=lang).save(destination)
    return destination
class State(TypedDict):
    """Dictionary shape passed between the nodes of the routing graph."""

    # The user's question as text.
    query: str
    # The answer text written by whichever agent node ran.
    result: str
def route_fn(state: State) -> str:
    """Choose which agent should answer the question in ``state["query"]``.

    Returns:
        ``"db"`` when the question mentions revenue, ``"web"`` when it
        contains one of the question words who/what/when/where, and
        ``"llm"`` otherwise. Matching is case-insensitive.
    """
    q = state["query"].lower()
    # "revenue" also covers phrases such as "max revenue".
    if "revenue" in q:
        return "db"
    # Match whole words only, so "whole", "whatever" or "somewhere" do not
    # count as question words.
    if re.search(r"\b(?:who|what|when|where)\b", q):
        return "web"
    return "llm"
def router_node(state: State) -> dict:
    """Pass-through node: emit an update containing only the current query."""
    current_query = state["query"]
    return {"query": current_query}
def db_node(state: State) -> dict:
    """Graph node that stores ``db_agent``'s answer to the query under ``result``."""
    answer = db_agent(state["query"])
    return {"result": answer}
def web_node(state: State) -> dict:
    """Graph node that stores ``web_search_agent``'s answer under ``result``."""
    answer = web_search_agent(state["query"])
    return {"result": answer}
def llm_node(state: State) -> dict:
    """Graph node that stores ``llm_agent``'s answer to the query under ``result``."""
    answer = llm_agent(state["query"])
    return {"result": answer}
# Assemble the routing graph: START -> router -> (db | web | llm) -> END.
builder = StateGraph(State)

# Register every node before wiring edges that refer to them.
builder.add_node("router", router_node)
builder.add_node("db", db_node)
builder.add_node("web", web_node)
builder.add_node("llm", llm_node)

# Single entry edge into the router.
builder.add_edge(START, "router")

# Branch out of the router: route_fn returns one of the keys below and the
# mapping names the node that handles it.
builder.add_conditional_edges(
    "router",
    route_fn,
    {"db": "db", "web": "web", "llm": "llm"},
)

# Each agent node finishes the run.
builder.add_edge("db", END)
builder.add_edge("web", END)
builder.add_edge("llm", END)

graph = builder.compile()
def handle_query(audio_or_text: str) -> tuple:
    """Answer a question given either as typed text or as an audio file path.

    An input ending in ``.wav`` or ``.mp3`` is treated as a recording: it is
    transcribed with ``stt_agent`` and the answer is also rendered to speech
    with ``tts_agent``. Any other input is used directly as the question.

    Args:
        audio_or_text: Question text, or the path of a recorded question.
            ``None`` or blank input is tolerated.

    Returns:
        A ``(answer_text, answer_audio_path)`` pair. The audio path is
        ``None`` for typed questions and for empty input, so the result
        always fills both output components bound to this handler.
    """
    # Nothing recorded or typed: skip the graph instead of failing on None.
    if not audio_or_text or not audio_or_text.strip():
        return "Please record or type a question.", None

    is_audio = audio_or_text.endswith((".wav", ".mp3"))
    query = stt_agent(audio_or_text) if is_audio else audio_or_text

    response = graph.invoke({"query": query})["result"]

    # Only spoken questions get a spoken answer.
    audio_path = tts_agent(response) if is_audio else None
    return response, audio_path
def _respond(audio_path, typed_text):
    """Adapt the two UI inputs to ``handle_query`` and always yield two outputs.

    A recording takes precedence over typed text when both are present.
    """
    question = audio_path or (typed_text or "").strip()
    if not question:
        return "Please record or type a question.", None
    outcome = handle_query(question)
    # Accept both a bare answer string and a (text, audio) pair so the text
    # and audio outputs are always filled.
    if isinstance(outcome, tuple):
        return outcome
    return outcome, None


with gr.Blocks() as demo:
    gr.Markdown("## Shop Voice-Box Assistant (Speech In/Out)")
    inp = gr.Audio(
        sources=["microphone"],
        type="filepath",
        label="Speak your question",
    )
    # Typed questions need their own component: an Audio input cannot hold
    # text, and the examples below are plain strings.
    text_inp = gr.Textbox(
        label="Or type your question (upload transactions.csv separately in root)"
    )
    out_text = gr.Textbox(label="Answer (text)")
    out_audio = gr.Audio(label="Answer (speech)")
    submit = gr.Button("Submit")
    gr.Examples(
        examples=[
            ["What is the max revenue product today?"],
            ["Who invented the light bulb?"],
            ["Tell me a joke about cats."],
        ],
        inputs=text_inp,
        # Examples only pre-fill the textbox; nothing is run or cached at startup.
        cache_examples=False,
    )
    submit.click(fn=_respond, inputs=[inp, text_inp], outputs=[out_text, out_audio])
if __name__ == "__main__":
    # Serve the UI on every network interface at port 7860, with no public
    # share link.
    demo.launch(
        server_name="0.0.0.0",
        server_port=7860,
        share=False,
    )
|