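"""AGI Telecom POC demo app.

A FastAPI backend with mock speech-to-text, agent, and text-to-speech
endpoints, plus a Gradio UI mounted at /gradio. Intended to run as a
Hugging Face Space or locally with uvicorn.
"""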
import gradio as gr
from fastapi import FastAPI, Form, UploadFile, File, Response
from fastapi.staticfiles import StaticFiles
from fastapi.responses import FileResponse
import os
import time
import logging
import shutil
import uuid
import uvicorn
from pathlib import Path
# Setup logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

# Create the FastAPI app
app = FastAPI(title="AGI Telecom POC")

# Create static directory if it doesn't exist
static_dir = Path("static")
static_dir.mkdir(exist_ok=True)

# Copy index.html from templates to static if it doesn't exist
html_template = Path("templates/index.html")
static_html = static_dir / "index.html"
if html_template.exists() and not static_html.exists():
    shutil.copy(html_template, static_html)

# Mount static files
app.mount("/static", StaticFiles(directory="static"), name="static")
# Mock data and functions to simulate the real implementation
SESSIONS = {}

def generate_session_id():
    """Generate a unique session ID."""
    return str(uuid.uuid4())
def mock_transcribe(audio_bytes):
    """Mock function to simulate speech-to-text."""
    # In production, this would use Whisper
    logger.info("Transcribing audio...")
    time.sleep(1)  # Simulate processing time
    return "This is a mock transcription of the audio."
def mock_agent_response(text, session_id="default"):
    """Mock function to simulate agent reasoning."""
    # In production, this would use a real LLM
    logger.info(f"Processing query: {text}")
    time.sleep(1.5)  # Simulate processing time

    # Simple keyword-based responses
    if "5g" in text.lower():
        return "5G is the fifth generation of cellular networks, offering higher speeds, lower latency, and more capacity than previous generations."
    elif "telecom" in text.lower():
        return "Telecommunications (telecom) refers to the exchange of information over significant distances by electronic means."
    elif "webrtc" in text.lower():
        return "WebRTC (Web Real-Time Communication) is a free, open-source project that enables web browsers and mobile applications to have real-time communication via simple APIs."
    else:
        return "I'm an AI assistant specialized in telecom topics. Feel free to ask me about 5G, network technologies, or telecommunications in general."
def mock_synthesize_speech(text):
    """Mock function to simulate text-to-speech."""
    # In production, this would use a real TTS engine
    logger.info("Synthesizing speech...")
    time.sleep(0.5)  # Simulate processing time

    # Create a dummy audio file: a 2-second 440 Hz sine tone
    import numpy as np
    from scipy.io.wavfile import write
    sample_rate = 22050
    duration = 2  # seconds
    t = np.linspace(0, duration, int(sample_rate * duration), endpoint=False)
    audio = np.sin(2 * np.pi * 440 * t) * 0.3
    output_file = "temp_audio.wav"
    write(output_file, sample_rate, audio.astype(np.float32))
    with open(output_file, "rb") as f:
        audio_bytes = f.read()
    # Clean up
    os.remove(output_file)
    return audio_bytes
# Routes for the API
# NOTE: the route decorators were missing from the original source;
# the paths below are assumptions reconstructed from the handler names.
@app.get("/")
async def root():
    """Serve the main UI."""
    return FileResponse("static/index.html")
@app.post("/transcribe")  # assumed path
async def transcribe(file: UploadFile = File(...)):
    """Transcribe audio to text."""
    try:
        audio_bytes = await file.read()
        text = mock_transcribe(audio_bytes)
        return {"transcription": text}
    except Exception as e:
        logger.error(f"Transcription error: {str(e)}")
        return {"error": f"Failed to transcribe audio: {str(e)}"}
@app.post("/query")  # assumed path
async def query_agent(input_text: str = Form(...), session_id: str = Form("default")):
    """Process a text query with the agent."""
    try:
        response = mock_agent_response(input_text, session_id)
        return {"response": response}
    except Exception as e:
        logger.error(f"Query error: {str(e)}")
        return {"error": f"Failed to process query: {str(e)}"}
@app.post("/speak")  # assumed path
async def speak(text: str = Form(...)):
    """Convert text to speech."""
    try:
        audio_bytes = mock_synthesize_speech(text)
        # mock_synthesize_speech deletes its temp file before returning, so
        # serving that path with FileResponse would fail; return the bytes directly.
        return Response(
            content=audio_bytes,
            media_type="audio/wav",
            headers={"Content-Disposition": 'attachment; filename="response.wav"'},
        )
    except Exception as e:
        logger.error(f"Speech synthesis error: {str(e)}")
        return {"error": f"Failed to synthesize speech: {str(e)}"}
@app.post("/session")  # assumed path
async def create_session():
    """Create a new session."""
    session_id = generate_session_id()
    SESSIONS[session_id] = {"created_at": time.time()}
    return {"session_id": session_id}
# Gradio interface
with gr.Blocks(title="AGI Telecom POC", css="footer {visibility: hidden}") as interface:
    gr.Markdown("# AGI Telecom POC Demo")
    gr.Markdown("This is a demonstration of the AGI Telecom Proof of Concept. The full interface is available via the direct API.")

    with gr.Row():
        with gr.Column():
            # Input components
            audio_input = gr.Audio(label="Voice Input", type="filepath")
            text_input = gr.Textbox(label="Text Input", placeholder="Type your message here...", lines=2)

            # Session management
            session_id = gr.Textbox(label="Session ID", value="default")
            new_session_btn = gr.Button("New Session")

            # Action buttons
            with gr.Row():
                transcribe_btn = gr.Button("Transcribe Audio")
                query_btn = gr.Button("Send Query")
                speak_btn = gr.Button("Speak Response")

        with gr.Column():
            # Output components
            transcription_output = gr.Textbox(label="Transcription", lines=2)
            response_output = gr.Textbox(label="Agent Response", lines=5)
            audio_output = gr.Audio(label="Voice Response", autoplay=True)

            # Status and info
            status_output = gr.Textbox(label="Status", value="Ready")
    # Link components with functions
    def update_session():
        new_id = generate_session_id()
        status = f"Created new session: {new_id}"
        return new_id, status

    new_session_btn.click(
        update_session,
        outputs=[session_id, status_output]
    )

    def process_audio(audio_path, session):
        if not audio_path:
            return "No audio provided", "", None, "Error: No audio input"
        try:
            with open(audio_path, "rb") as f:
                audio_bytes = f.read()
            # Transcribe
            text = mock_transcribe(audio_bytes)
            # Get response
            response = mock_agent_response(text, session)
            # Synthesize
            audio_bytes = mock_synthesize_speech(response)
            temp_file = "temp_response.wav"
            with open(temp_file, "wb") as f:
                f.write(audio_bytes)
            return text, response, temp_file, "Processed successfully"
        except Exception as e:
            logger.error(f"Error: {str(e)}")
            return "", "", None, f"Error: {str(e)}"
    def transcribe_only(audio_path):
        # Read via a context manager (the original lambda leaked the file handle)
        if not audio_path:
            return "No audio provided"
        with open(audio_path, "rb") as f:
            return mock_transcribe(f.read())

    transcribe_btn.click(
        transcribe_only,
        inputs=[audio_input],
        outputs=[transcription_output]
    )

    query_btn.click(
        lambda text, session: mock_agent_response(text, session),
        inputs=[text_input, session_id],
        outputs=[response_output]
    )

    def speak_response(text):
        # Write the bytes to a real file; the original lambda returned a path
        # that only exists after process_audio has run.
        audio_bytes = mock_synthesize_speech(text)
        temp_file = "temp_response.wav"
        with open(temp_file, "wb") as f:
            f.write(audio_bytes)
        return temp_file

    speak_btn.click(
        speak_response,
        inputs=[response_output],
        outputs=[audio_output]
    )
    # Full process
    audio_input.change(
        process_audio,
        inputs=[audio_input, session_id],
        outputs=[transcription_output, response_output, audio_output, status_output]
    )

# Mount Gradio app
app = gr.mount_gradio_app(app, interface, path="/gradio")
# Run the app
if __name__ == "__main__":
    # Check if running on HF Spaces
    if os.environ.get("SPACE_ID"):
        # Running on HF Spaces - use their port
        port = int(os.environ.get("PORT", 7860))
        uvicorn.run(app, host="0.0.0.0", port=port)
    else:
        # Running locally
        uvicorn.run(app, host="0.0.0.0", port=8000)
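
# Example requests against the routes above (the paths are this file's
# assumptions, since the original route decorators were missing):
#   curl -X POST -F "input_text=What is 5G?" -F "session_id=default" http://localhost:8000/query
#   curl -X POST -F "text=Hello" -o response.wav http://localhost:8000/speak
#   curl -X POST -F "file=@clip.wav" http://localhost:8000/transcribe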