Spaces:
Runtime error
Runtime error
import os
import re
import tempfile

import faiss
import gradio as gr
import nltk
import numpy as np
import pdfplumber
import pytesseract
import spacy
import speech_recognition as sr
from gtts import gTTS
from nltk.corpus import stopwords
from PIL import Image
from sentence_transformers import SentenceTransformer, util
from transformers import pipeline
# --- Shared NLP resources ---------------------------------------------------
# The stopword corpus must be downloaded before it can be read below.
nltk.download("stopwords")
nlp = spacy.load("en_core_web_sm")
stop_words = set(stopwords.words("english"))

# --- Hugging Face models ----------------------------------------------------
# Extractive QA, abstractive summarization, and sentence embeddings.
qa_pipeline = pipeline(
    "question-answering",
    model="deepset/roberta-base-squad2",
)
summarizer = pipeline(
    "summarization",
    model="t5-small",
)
embedder = SentenceTransformer("sentence-transformers/all-MiniLM-L6-v2")

# --- FAISS index ------------------------------------------------------------
# Flat L2 index sized to the embedding width.
# NOTE(review): `dimension` presumably has to match the output size of
# `embedder`; confirm if the embedding model is ever swapped.
dimension = 384  # Embedding size
index = faiss.IndexFlatL2(dimension)

# --- Recommendation catalogue -----------------------------------------------
# Dummy database: document title -> comma-separated topic keywords.
document_database = {
    "Machine Learning Basics": "Introduction to ML, Supervised vs Unsupervised, Algorithms",
    "Deep Learning Advanced": "Neural Networks, CNN, RNN, Transformers",
    "Data Science Fundamentals": "Data Preprocessing, Feature Engineering, Statistics",
    "AI in Healthcare": "Medical Image Analysis, AI in Diagnosis, Predictive Analytics",
    "Blockchain Technology": "Decentralized Networks, Smart Contracts, Cryptography",
}
# Function to recommend relevant documents
def recommend_documents(query, top_k=3):
    """Return the titles of the catalogue entries most similar to *query*.

    The query and every description in ``document_database`` are embedded
    with ``embedder`` and ranked by cosine similarity.

    Args:
        query: Free text to match against the catalogue descriptions.
        top_k: Maximum number of titles to return (defaults to 3).

    Returns:
        A list of at most ``top_k`` titles, best match first. Empty when
        ``top_k`` is not positive or the catalogue is empty.
    """
    # Guard first: a slice such as ``[:0]`` is fine, but a negative value
    # would silently select the wrong end of the ranking.
    if top_k <= 0 or not document_database:
        return []

    # Materialize titles and descriptions once so the index positions line up.
    titles = list(document_database)
    descriptions = list(document_database.values())

    query_embedding = embedder.encode(query, convert_to_tensor=True)
    doc_embeddings = embedder.encode(descriptions, convert_to_tensor=True)

    # Result has one row (the single query); take it as a flat score vector.
    scores = util.pytorch_cos_sim(query_embedding, doc_embeddings).cpu().numpy()[0]

    # argsort is ascending, so reverse it and keep the leading ``top_k``.
    ranked = np.argsort(scores)[::-1][:top_k]
    return [titles[i] for i in ranked]
# Function to preprocess text
def preprocess_text(text):
    """Normalize *text* for downstream matching.

    Lowercases the input, drops every character that is not an ASCII letter,
    digit or whitespace, removes English stopwords, and joins the remaining
    words with single spaces.
    """
    lowered = text.lower()
    # Remove special characters (punctuation and any non-ASCII symbols).
    cleaned = re.sub(r"[^a-zA-Z0-9\s]", "", lowered)
    # Remove stopwords; split() also collapses runs of whitespace.
    kept_words = (token for token in cleaned.split() if token not in stop_words)
    return " ".join(kept_words)
# Extract text from PDF
def extract_text_from_pdf(pdf_file):
    """Return the preprocessed text of every page in a PDF.

    Args:
        pdf_file: Path (or file object) accepted by ``pdfplumber.open``.

    Returns:
        The text of all pages, joined and passed through ``preprocess_text``.
        Pages that yield no text contribute nothing.
    """
    with pdfplumber.open(pdf_file) as pdf:
        # extract_text() may give None for a page without a text layer
        # (e.g. a scanned image); treat that as empty instead of crashing
        # on ``None + "\n"``.
        page_texts = [page.extract_text() or "" for page in pdf.pages]
    return preprocess_text("\n".join(page_texts))
# Extract text from image using OCR
def extract_text_from_image(image_file):
    """Run OCR on an image and return the preprocessed text.

    Args:
        image_file: Path (or file object) accepted by ``PIL.Image.open``.

    Returns:
        The OCR output passed through ``preprocess_text``.
    """
    # Use the image as a context manager so its file handle is released
    # as soon as OCR has finished.
    with Image.open(image_file) as image:
        raw_text = pytesseract.image_to_string(image)
    return preprocess_text(raw_text)
# Convert speech to text
def voice_to_text(audio_file):
    """Transcribe a recorded audio file with Google speech recognition.

    Args:
        audio_file: Path to the recording, or ``None``/empty when nothing
            was recorded.

    Returns:
        The transcription, or a human-readable message when there is no
        audio, the speech could not be understood, or the recognition
        service could not be reached.
    """
    # Nothing recorded/uploaded: report it rather than letting AudioFile fail.
    if not audio_file:
        return "No audio provided."

    recognizer = sr.Recognizer()
    with sr.AudioFile(audio_file) as source:
        audio = recognizer.record(source)

    try:
        return recognizer.recognize_google(audio)
    except sr.UnknownValueError:
        return "Could not understand the audio."
    except sr.RequestError:
        return "Speech recognition service unavailable."
# Convert text to speech
def text_to_speech(answer_text):
    """Synthesize *answer_text* to an MP3 file and return its path.

    Every call writes to its own temporary file, so simultaneous requests
    cannot overwrite each other's audio.

    Args:
        answer_text: Text to speak.

    Returns:
        Path of the generated MP3, or ``None`` when there is no text to
        speak (``None``, empty, or whitespace only).
    """
    # Skip synthesis entirely when there is nothing to say.
    if not answer_text or not answer_text.strip():
        return None

    tts = gTTS(text=answer_text, lang="en")

    # Reserve a unique file name; delete=False keeps the file after the
    # handle is closed so the caller can serve it.
    with tempfile.NamedTemporaryFile(
        prefix="response_", suffix=".mp3", delete=False
    ) as handle:
        output_path = handle.name

    try:
        tts.save(output_path)
    except Exception:
        # Do not leave an empty placeholder behind if synthesis fails.
        os.remove(output_path)
        raise
    return output_path
# Process document and answer questions
def document_processor(uploaded_file, query):
    """Extract text from an uploaded file, then summarize it or answer questions.

    Args:
        uploaded_file: The uploaded document: either an object exposing its
            path as ``.name`` or a plain path string. ``None`` when nothing
            was uploaded. ``.pdf`` files go through PDF extraction,
            ``.png``/``.jpg``/``.jpeg`` through OCR, anything else is read
            as UTF-8 text.
        query: ``"summarize"`` (case-insensitive) to request a summary,
            otherwise one or more questions separated by ``;``.

    Returns:
        A ``(response, audio_path, recommendations)`` tuple.
        ``response`` is always a dict: ``{"summary": ...}``,
        ``{question: answer, ...}``, or ``{"error": ...}``.
        ``audio_path`` is the spoken response file or ``None``.
        ``recommendations`` is a list of recommended titles.
    """
    if uploaded_file is None:
        return {"error": "Please upload a document."}, None, []

    # Accept both upload objects (path in ``.name``) and bare path strings.
    path = getattr(uploaded_file, "name", uploaded_file)
    lowered_path = str(path).lower()  # match extensions case-insensitively

    # File type handling
    if lowered_path.endswith(".pdf"):
        text = extract_text_from_pdf(path)
    elif lowered_path.endswith((".png", ".jpg", ".jpeg")):
        text = extract_text_from_image(path)
    else:
        # Read from the path rather than the upload object, which may not
        # provide read(); undecodable bytes are replaced instead of raising.
        with open(path, encoding="utf-8", errors="replace") as handle:
            text = preprocess_text(handle.read())

    if not text.strip():
        return {"error": "No readable text was found in the document."}, None, []

    query = (query or "").strip()

    # If user asks for a summary
    if query.lower() == "summarize":
        # truncation=True keeps over-long documents within the model's
        # input limit instead of failing.
        result = summarizer(
            text, max_length=200, min_length=50, do_sample=False, truncation=True
        )
        summary_text = result[0]["summary_text"]
        # Wrap in a dict so the value is valid for a JSON output, matching
        # the dict returned for questions.
        return (
            {"summary": summary_text},
            text_to_speech(summary_text),
            recommend_documents(summary_text),
        )

    # Multi-question processing; ignore empty segments such as a trailing ';'.
    questions = [part.strip() for part in query.split(";") if part.strip()]
    if not questions:
        return {"error": "Please enter a question or type 'summarize'."}, None, []

    # Split and embed the document once; it is the same for every question.
    # NOTE(review): preprocess_text removes punctuation, so ". " will
    # normally not occur and the whole text ends up as a single chunk.
    sentences = [chunk for chunk in text.split(". ") if chunk.strip()]
    sentence_embeddings = embedder.encode(sentences, convert_to_tensor=True)

    responses = {}
    for question in questions:
        question_embedding = embedder.encode(question, convert_to_tensor=True)
        # Find most relevant sentence
        scores = util.pytorch_cos_sim(question_embedding, sentence_embeddings)
        best_sentence = sentences[int(np.argmax(scores.cpu().numpy()))]
        # Generate answer
        answer = qa_pipeline(question=question, context=best_sentence)
        responses[question] = answer["answer"]

    # Convert answer to speech (only when at least one answer has content).
    combined_answers = " ".join(responses.values())
    speech_output = text_to_speech(combined_answers) if combined_answers.strip() else None
    return responses, speech_output, recommend_documents(query)
# Gradio UI
# NOTE(review): the leading symbols in the labels below look like
# mis-decoded emoji; they are kept exactly as found - confirm the intended
# characters against the original file.
with gr.Blocks() as app:
    gr.Markdown("# π Smart Document Explorer π")

    # Inputs: document, typed query, and optional spoken query.
    with gr.Row():
        uploaded_file = gr.File(label="π Upload Document (PDF, Image, or Text)")

    with gr.Row():
        query = gr.Textbox(
            label="π¬ Ask Questions (Separate with ';') or Type 'summarize'",
            placeholder="e.g. What is the topic?; Who wrote it?",
        )

    with gr.Row():
        voice_input = gr.Audio(label="π€ Speak Your Query", type="filepath")
        voice_btn = gr.Button("ποΈ Convert Speech to Text")

    # Outputs: structured answer, spoken answer, and recommendations.
    with gr.Row():
        output_text = gr.JSON(label="π§ AI Response")
        output_audio = gr.Audio(label="π AI Voice Answer", type="filepath")

    with gr.Row():
        recommendations = gr.JSON(label="π Recommended Topics")

    submit_btn = gr.Button("π Process Document")

    # Button Actions
    # The transcription is written into the query box so it can be submitted.
    voice_btn.click(fn=voice_to_text, inputs=voice_input, outputs=query)
    submit_btn.click(
        fn=document_processor,
        inputs=[uploaded_file, query],
        outputs=[output_text, output_audio, recommendations],
    )

app.launch()