Spaces: Runtime error
import os
import gradio as gr
import requests
import uuid
import json
from huggingface_hub import InferenceClient
from pypdf import PdfReader
from bs4 import BeautifulSoup
import zipfile
import nltk
from typing import List, Dict
import lxml  # imported so the "lxml" parser used by BeautifulSoup is guaranteed to be installed
# Ensure NLTK resources
try:
    nltk.data.find('tokenizers/punkt')
except LookupError:
    nltk.download('punkt')
# Initialize Hugging Face API
HF_MODEL = "mistralai/Mixtral-8x7B-Instruct-v0.1"
HF_TOKEN = os.environ.get("HF_TOKEN")
client = InferenceClient(model=HF_MODEL, token=HF_TOKEN)

# State to manage datasets
datasets_queue = []
# Helper Functions
def extract_text_from_pdf(file_path):
    try:
        reader = PdfReader(file_path)
        # extract_text() can return None for image-only pages, so fall back to ""
        return "\n".join((page.extract_text() or "") for page in reader.pages)
    except Exception as e:
        return f"Error reading PDF: {e}"

def extract_text_from_url(url):
    try:
        response = requests.get(url, timeout=10)
        response.raise_for_status()
        soup = BeautifulSoup(response.content, "lxml")
        return soup.get_text()
    except Exception as e:
        return f"Error scraping URL: {e}"
def process_uploaded_file(file):
    try:
        if file.name.endswith(".pdf"):
            return extract_text_from_pdf(file.name)
        elif file.name.endswith(".txt"):
            with open(file.name, "r", encoding="utf-8") as f:
                return f.read()
        elif file.name.endswith(".zip"):
            extracted_data = []
            with zipfile.ZipFile(file.name, "r") as zip_ref:
                for file_info in zip_ref.infolist():
                    if file_info.filename.endswith((".pdf", ".txt")):
                        with zip_ref.open(file_info) as f:
                            content = f.read()
                            if file_info.filename.endswith(".txt"):
                                extracted_data.append(content.decode("utf-8"))
                            elif file_info.filename.endswith(".pdf"):
                                # Write the PDF to a temp file so PdfReader can open it
                                temp_path = f"/tmp/{uuid.uuid4()}.pdf"
                                with open(temp_path, "wb") as temp_file:
                                    temp_file.write(content)
                                extracted_data.append(extract_text_from_pdf(temp_path))
            return "\n".join(extracted_data)
        else:
            return f"Unsupported file type: {file.name}"
    except Exception as e:
        return f"Error processing file: {e}"
def chunk_text(text, max_chunk_size=2000):
    sentences = nltk.sent_tokenize(text)
    chunks, current_chunk = [], ""
    for sentence in sentences:
        # Flush the current chunk before it would exceed the size limit
        if current_chunk and len(current_chunk) + len(sentence) + 1 > max_chunk_size:
            chunks.append(current_chunk.strip())
            current_chunk = ""
        current_chunk += sentence + " "
    if current_chunk:
        chunks.append(current_chunk.strip())
    return chunks
def infer_dataset(data, instructions):
    extracted = []
    chunks = chunk_text(data)
    for i, chunk in enumerate(chunks):
        try:
            # instructions should contain a "{history}" placeholder for the chunk text
            response = client.text_generation(
                prompt=instructions.format(history=chunk),
                max_new_tokens=1024
            )
            # text_generation returns the generated string directly
            extracted.append(response)
        except Exception as e:
            extracted.append(f"Error in chunk {i}: {e}")
    return "\n".join(extracted)
# Gradio Interface
def scrape_data(instructions, files, urls):
    combined_data = []
    # Process uploaded files
    if files:
        for file in files:
            combined_data.append(process_uploaded_file(file))
    # Process URLs (comma-separated or one per line)
    if urls:
        url_list = [url.strip() for url in urls.replace("\n", ",").split(",") if url.strip()]
        for url in url_list:
            combined_data.append(extract_text_from_url(url))
    # Combine and infer with instructions
    full_text = "\n".join(combined_data)
    if instructions:
        dataset = infer_dataset(full_text, instructions)
    else:
        dataset = full_text
    return dataset
def add_to_queue(dataset):
    datasets_queue.append(dataset)
    return json.dumps(datasets_queue, indent=2)

def combine_datasets():
    combined_data = "\n".join(datasets_queue)
    combined_json = {"combined_dataset": combined_data}
    combined_file = "/tmp/combined_dataset.json"
    with open(combined_file, "w") as f:
        json.dump(combined_json, f, indent=2)
    return json.dumps(combined_json, indent=2), combined_file
# Gradio Interface
with gr.Blocks() as app:
    gr.Markdown("# Intelligent Scraper, Dataset Handler, and Chatbot")

    with gr.Tab("Scrape / Extract Data"):
        gr.Markdown("Upload files or enter URLs to scrape data and generate JSON datasets.")
        instruction_input = gr.Textbox(label="Optional Instructions", placeholder="Enter instructions for scraping.")
        upload_files = gr.Files(label="Upload Files (PDF, TXT, ZIP)", file_types=[".pdf", ".txt", ".zip"])
        url_input = gr.Textbox(label="Enter URLs (comma-separated or multiline)")
        scrape_button = gr.Button("Scrape / Extract Data")
        extracted_output = gr.Textbox(label="Extracted Output")
        dataset_button = gr.Button("Add to Dataset Queue")
        scraped_dataset = gr.Textbox(label="Current Dataset")
        scrape_button.click(scrape_data, inputs=[instruction_input, upload_files, url_input], outputs=extracted_output)
        dataset_button.click(add_to_queue, inputs=[extracted_output], outputs=scraped_dataset)

    with gr.Tab("Combine Datasets"):
        gr.Markdown("Combine queued datasets into a single JSON dataset.")
        combine_button = gr.Button("Combine Datasets")
        combined_output = gr.Textbox(label="Combined Dataset")
        download_button = gr.Button("Download Combined Dataset")
        download_output = gr.File(label="Download")
        combine_button.click(combine_datasets, outputs=[combined_output, download_output])
    with gr.Tab("Train and Chat"):
        gr.Markdown("**Train a chatbot with a selected dataset and interact with it.**")
        chat_dataset = gr.Textbox(
            label="Dataset for Training",
            placeholder="Paste or load a dataset for training.",
            lines=5,
        )
        train_button = gr.Button("Train Chatbot")
        train_status = gr.Textbox(label="Training Status")
        chatbot = gr.Chatbot(label="Chat with Trained Bot", type="messages")
        user_input = gr.Textbox(
            label="Your Message",
            placeholder="Type a message and press Enter...",
            lines=1,
        )

        # Persistent system message with dataset knowledge
        system_message = {"system": "You are a bot trained on the following dataset:"}
        bot_knowledge = {"dataset": None}
        # Train the chatbot by setting the dataset
        def train_chatbot(dataset):
            bot_knowledge["dataset"] = dataset
            return "Chatbot trained successfully!"

        # Chat function for handling user messages
        def chat_with_bot(history, user_message):
            if not bot_knowledge["dataset"]:
                return history + [{"role": "assistant", "content": "No dataset loaded. Please train the bot first."}]
            # Append user input to history (type="messages" expects role/content dicts)
            history.append({"role": "user", "content": user_message})
            # Generate a response grounded in the dataset; text_generation returns a string
            prompt = f"{system_message['system']}\n{bot_knowledge['dataset']}\n\nUser: {user_message}"
            response = client.text_generation(prompt=prompt, max_new_tokens=128)
            # Append bot response to history
            history.append({"role": "assistant", "content": response})
            return history
        # Train button event
        train_button.click(train_chatbot, inputs=[chat_dataset], outputs=[train_status])

        # User input submission event
        user_input.submit(
            chat_with_bot, inputs=[chatbot, user_input], outputs=chatbot
        )

app.launch()
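If the Space's runtime error comes from missing dependencies rather than the code itself, the Space's requirements.txt would need to list every third-party package imported above. A minimal sketch (assuming no version pins are needed):

gradio
requests
huggingface_hub
pypdf
beautifulsoup4
lxml
nltk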