# Captured from a Hugging Face Space whose status page reported "Runtime error";
# the "Spaces:" / "Runtime error" header lines were page residue, not source code.
import os | |
import gradio as gr | |
import requests | |
import uuid | |
from huggingface_hub import InferenceClient, HfApi | |
from pypdf import PdfReader | |
from bs4 import BeautifulSoup | |
import datetime | |
import zipfile | |
import nltk.data | |
import nltk | |
import langchain | |
import dotenv | |
import yaml | |
from typing import Optional, Union, List, Dict, Any, Tuple | |
import subprocess | |
from pathlib import Path | |
import json | |
import tempfile | |
from datetime import datetime as dt, timezone | |
import re | |
import logging | |
import shutil | |
# -----------------------
# ENV / Logging Setup
# -----------------------
# Module-wide logger; the GitHub-resolver classes below log through it.
logging.basicConfig(level=logging.INFO, format="%(asctime)s - %(levelname)s - %(message)s")
logger = logging.getLogger(__name__)
# Ensure the 'punkt' tokenizer is downloaded only if missing
# (chunk_text() loads it; nltk.download is a no-op once the data is cached).
try:
    nltk.data.find('tokenizers/punkt')
except LookupError:
    nltk.download('punkt')
# Gates the print-based log() helper below (separate from the logging module).
VERBOSE = True
def log(message):
    """Print *message* to stdout with a timestamp when VERBOSE is enabled."""
    if not VERBOSE:
        return
    print(f"[LOG] {datetime.datetime.now()} - {message}")
# -----------------------
# 1) Scraper/Indexer/Dataset Generator - from your first script
# -----------------------
# == Hugging Face API Setup ==
HF_MODEL = "mistralai/Mixtral-8x7B-Instruct-v0.1"  # model id handed to InferenceClient
HF_TOKEN = os.environ.get('HF_TOKEN')
if not HF_TOKEN:
    # Fail fast at import time: everything below needs an authenticated client.
    raise EnvironmentError("HF_TOKEN is not set. Please export it as an environment variable.")
try:
    client = InferenceClient(HF_MODEL)
    api = HfApi(token=HF_TOKEN)
    log("Initialized Hugging Face client and API.")
except Exception as e:
    log(f"Error initializing Hugging Face client: {e}")
    # NOTE(review): exit(1) at module import kills any process that imports this
    # file (including test runners); consider re-raising instead — confirm.
    exit(1)
REPO_NAME = "acecalisto3/tmp"
# Raw-file base URL for the dataset repo; not referenced elsewhere in this view.
DATASET_URL = f"https://huggingface.co/datasets/{REPO_NAME}/raw/main/"
MAX_TOKENS = 8192  # default max_new_tokens for extract_dataset()
def read_pdf(file_path):
    """Extract and return the text of every page in a PDF.

    Returns "" if the file cannot be opened or parsed. Pages with no
    extractable text are treated as empty instead of aborting the document.
    """
    try:
        reader = PdfReader(file_path)
        # extract_text() may return None (e.g. image-only pages); "or ''"
        # prevents a TypeError inside str.join.
        text = "\n".join((page.extract_text() or "") for page in reader.pages)
        return text
    except Exception as e:
        log(f"Error reading PDF {file_path}: {e}")
        return ""
def fetch_url(url, max_depth):
    """Breadth-first crawl starting at *url*, collecting page text.

    Pages at depth >= max_depth are dequeued but not fetched, so
    max_depth=1 fetches only the starting page. Returns all collected
    page text joined by newlines; fetch errors are logged and skipped.
    """
    visited = set()
    to_visit = [(url, 0)]
    results = []
    while to_visit:
        current_url, depth = to_visit.pop(0)
        if current_url in visited:
            continue
        # Mark as visited up front so a URL that fails to fetch is not
        # retried every time another page links to it (the original only
        # marked successful fetches, allowing repeated failing requests).
        visited.add(current_url)
        if depth < max_depth:
            try:
                response = requests.get(current_url, timeout=10)
                response.raise_for_status()
                soup = BeautifulSoup(response.content, 'lxml')
                results.append(soup.get_text())
                for link in soup.find_all("a", href=True):
                    absolute_url = requests.compat.urljoin(current_url, link.get('href'))
                    # Only follow absolute http(s) links not already crawled.
                    if absolute_url.startswith("http") and absolute_url not in visited:
                        to_visit.append((absolute_url, depth + 1))
            except Exception as e:
                log(f"Error fetching {current_url}: {e}")
    return "\n".join(results)
def read_txt(txt_path):
    """Return the contents of a UTF-8 text file, or "" if it cannot be read."""
    try:
        with open(txt_path, "r", encoding="utf-8") as handle:
            return handle.read()
    except Exception as exc:
        log(f"Error reading TXT file {txt_path}: {exc}")
        return ""
def read_zip(zip_path):
    """Extract text from every .txt/.pdf member of a ZIP archive.

    Returns the concatenated text of all supported members, or "" if the
    archive cannot be read. Other file types inside the archive are ignored.
    """
    try:
        extracted_data = []
        with zipfile.ZipFile(zip_path, 'r') as zip_ref:
            for file_info in zip_ref.infolist():
                if not file_info.filename.endswith((".txt", ".pdf")):
                    continue
                with zip_ref.open(file_info) as member:
                    content = member.read()
                if file_info.filename.endswith(".txt"):
                    extracted_data.append(content.decode("utf-8"))
                else:
                    # PdfReader needs a real file path, so spill the bytes to a
                    # portable temp file (the original hard-coded /tmp, which
                    # does not exist on Windows) and always clean it up.
                    with tempfile.NamedTemporaryFile(suffix=".pdf", delete=False) as temp_file:
                        temp_file.write(content)
                        temp_path = temp_file.name
                    try:
                        extracted_data.append(read_pdf(temp_path))
                    finally:
                        os.remove(temp_path)
        return "\n".join(extracted_data)
    except Exception as e:
        log(f"Error reading ZIP file {zip_path}: {e}")
        return ""
def process_file(file):
    """Dispatch on file extension to extract text from an uploaded file.

    *file* may be a gradio upload object exposing ``.name`` or a plain path
    string (the original crashed on strings). Matching is case-insensitive
    (".PDF" now works); unsupported extensions and errors yield "".
    """
    # Accept both gradio file objects (with .name) and raw path strings.
    name = getattr(file, "name", file)
    try:
        lowered = str(name).lower()
        if lowered.endswith(".pdf"):
            return read_pdf(name)
        elif lowered.endswith(".txt"):
            return read_txt(name)
        elif lowered.endswith(".zip"):
            return read_zip(name)
    except Exception as e:
        log(f"Error processing file {name}: {e}")
    return ""
def chunk_text(text, max_chunk_size):
    """Split *text* into sentence-aligned chunks of at most ~max_chunk_size chars.

    A single sentence longer than max_chunk_size still becomes its own
    (oversized) chunk rather than being split mid-sentence. Never emits
    empty chunks (the original appended "" when a huge sentence arrived
    while the current chunk was empty).
    """
    # NOTE: loaded on every call; nltk caches the pickle internally, so the
    # repeated load is cheap after the first call.
    tokenizer = nltk.data.load("tokenizers/punkt/english.pickle")
    sentences = tokenizer.tokenize(text)
    chunks = []
    current_chunk = ""
    for sentence in sentences:
        # Flush only when there is something to flush, avoiding "" chunks.
        if current_chunk and len(current_chunk) + len(sentence) + 1 > max_chunk_size:
            chunks.append(current_chunk.strip())
            current_chunk = ""
        current_chunk += sentence + " "
    if current_chunk.strip():
        chunks.append(current_chunk.strip())
    return chunks
def extract_dataset(data, instructions="Extract {history}", max_tokens=MAX_TOKENS):
    """Run the instruction template over chunks of *data* via text generation.

    Each chunk is formatted into *instructions* (which must contain the
    ``{history}`` placeholder) and sent to the inference client. Failed
    chunks contribute an error line instead of aborting the whole run.
    Returns all generations joined by newlines.
    """
    extracted = []
    chunks = chunk_text(data, 20000)  # Adjust chunk size as needed
    for i, chunk in enumerate(chunks):
        try:
            response = client.text_generation(
                prompt=instructions.format(history=chunk),
                max_new_tokens=max_tokens
            )
            # InferenceClient.text_generation returns the generated string by
            # default (details=False); the original indexed it like a dict
            # (response["generated_text"]), which raised TypeError on every
            # chunk. Handle both the plain-string and detailed-object forms.
            if isinstance(response, str):
                extracted.append(response)
            else:
                extracted.append(response.generated_text)
        except Exception as e:
            log(f"Error processing chunk {i+1}: {e}")
            extracted.append(f"Error processing chunk {i+1}: {e}")
    return "\n".join(extracted)
def combine_datasets(datasets):
    """Merge a list of dataset strings into a single newline-separated string."""
    merged = "\n".join(datasets)
    return merged
# ----------------------- | |
# 2) GitHub Issue Resolver - from your second script | |
# ----------------------- | |
class TerminalCommand:
    """Namespace for running external commands without a shell."""

    @staticmethod  # the original omitted this; calls went through the class, so it worked by accident
    def execute(command: Union[str, List[str]], cwd: Optional[str] = None) -> Tuple[str, str, int]:
        """Execute a terminal command and return (stdout, stderr, returncode).

        A string command is split on whitespace — arguments containing
        spaces must be passed as a list. On any OS-level failure the
        error is logged and ("", error-message, 1) is returned.
        """
        if isinstance(command, str):
            command = command.split()
        try:
            # subprocess.run replaces the manual Popen/communicate dance.
            result = subprocess.run(command, capture_output=True, text=True, cwd=cwd)
            return result.stdout.strip(), result.stderr.strip(), result.returncode
        except Exception as e:
            logger.error(f"Error executing command {command}: {e}")
            return "", str(e), 1
class GitUtilities:
    """Thin convenience wrapper for common git operations on one working copy."""

    def __init__(self, repo_path: str):
        self.repo_path = Path(repo_path)

    def _run(self, command, failure_label: str, in_repo: bool = True) -> bool:
        """Run a git command, log stderr on failure, return True on success."""
        cwd = str(self.repo_path) if in_repo else None
        stdout, stderr, code = TerminalCommand.execute(command, cwd)
        if code != 0:
            logger.error(f"{failure_label}: {stderr}")
        return code == 0

    def clone(self, url: str, branch: str = "main") -> bool:
        """Clone a repository."""
        # Runs outside the (not-yet-existing) repo directory.
        return self._run(
            f"git clone -b {branch} {url} {self.repo_path}",
            "Git clone failed",
            in_repo=False,
        )

    def commit(self, message: str) -> bool:
        """Create a commit with the given message."""
        return self._run(["git", "commit", "-am", message], "Git commit failed")

    def push(self, remote: str = "origin", branch: str = "main") -> bool:
        """Push changes to remote."""
        return self._run(["git", "push", remote, branch], "Git push failed")

    def create_branch(self, branch_name: str) -> bool:
        """Create and checkout a new branch."""
        return self._run(["git", "checkout", "-b", branch_name], "Git branch creation failed")
class GitHubBot:
    """Automate GitHub issue resolution: clone a fork, create a fix branch,
    commit a resolution document, push it, and open a pull request."""

    def __init__(self, logger: logging.Logger):
        self.github_api = None    # set to an auth-header dict by initialize_api()
        self.logger = logger
        self.ai_provider = None   # reserved; never assigned elsewhere in this file
        self.git = None           # GitUtilities bound to the temp clone
        self.temp_dir = None      # scratch checkout directory, removed after each run
        self.base_url = "https://api.github.com"

    def initialize_api(self, token: str):
        """Store auth headers and prepare a fresh temp working directory.

        Raises ValueError if *token* is falsy.
        """
        if not token:
            raise ValueError("GitHub token is required.")
        self.github_api = {"Authorization": f"Bearer {token}"}
        self.temp_dir = tempfile.mkdtemp()
        self.git = GitUtilities(self.temp_dir)

    def create_pull_request(self, owner: str, repo: str, title: str, body: str, head: str, base: str = "main") -> Dict:
        """Open a pull request on owner/repo and return the API response JSON.

        Raises requests.RequestException on HTTP failure.
        """
        url = f"{self.base_url}/repos/{owner}/{repo}/pulls"
        data = {
            "title": title,
            "body": body,
            "head": head,
            "base": base
        }
        try:
            response = requests.post(url, headers=self.github_api, json=data)
            response.raise_for_status()
            return response.json()
        except requests.RequestException as e:
            # Use the injected logger consistently (the original mixed the
            # module-level logger into this class).
            self.logger.error(f"Error creating pull request: {e}")
            raise

    def resolve_issue(
        self,
        token: str,
        owner: str,
        repo: str,
        issue_number: int,
        resolution: str,
        forked_repo: str
    ) -> str:
        """Resolve a GitHub issue by cloning, creating a fix branch, and opening a PR.

        Returns a human-readable status string; the temp clone is always
        removed, success or failure.
        """
        try:
            self.initialize_api(token)
            branch_name = f"fix/issue-{issue_number}-{dt.now().strftime('%Y%m%d-%H%M%S')}"
            # Clone repository (forked repo URL is expected)
            if not self.git.clone(forked_repo):
                raise Exception("Failed to clone repository")
            # Create a new branch
            if not self.git.create_branch(branch_name):
                raise Exception("Failed to create branch")
            # Generate resolution content and write it into the working copy.
            resolution_content = self._create_resolution_document(issue_number, resolution)
            resolution_path = Path(self.temp_dir) / f"resolution_{issue_number}.md"
            with open(resolution_path, "w") as f:
                f.write(resolution_content)
            # Stage the new file explicitly: "git commit -am" only stages
            # already-tracked files, so without this the freshly created
            # resolution document was never committed and the commit failed.
            TerminalCommand.execute(["git", "add", "-A"], str(self.temp_dir))
            if not self.git.commit(f"Fix for issue #{issue_number}"):
                raise Exception("Failed to commit changes")
            if not self.git.push("origin", branch_name):
                raise Exception("Failed to push changes")
            # NOTE(review): for a cross-fork PR, GitHub expects head as
            # "forkowner:branch"; a bare branch name only works when the
            # branch lives in the target repository itself — confirm.
            pr = self.create_pull_request(
                owner=owner,
                repo=repo,
                title=f"Fix for issue #{issue_number}",
                body="This PR resolves the reported issue with the following resolution.",
                head=branch_name
            )
            return f"Pull request created: {pr['html_url']}"
        except Exception as e:
            self.logger.error(f"Error resolving issue #{issue_number}: {e}")
            return f"Error: {e}"
        finally:
            # Best-effort cleanup of the scratch clone.
            if self.temp_dir and os.path.exists(self.temp_dir):
                shutil.rmtree(self.temp_dir)

    def _create_resolution_document(self, issue_number: int, resolution: str) -> str:
        """Create a resolution document for the fix."""
        return f"""# Resolution for Issue #{issue_number}
## Resolution Details
{resolution}
## Metadata
- Date: {dt.now(timezone.utc).isoformat()}
- Resolved By: Automated System
"""
# ----------------------- | |
# 3) Build the combined Gradio interface with two tabs | |
# ----------------------- | |
def create_combined_gradio_app():
    """
    Create one Gradio interface that has two tabs:
    1) 'Scraper/Indexer/Dataset Generator'
    2) 'GitHub Issue Resolver'
    """
    # One bot instance shared by every callback of this app.
    bot = GitHubBot(logger)
    # 3.1) Functions for the first tab (Scraper/Indexer/Dataset Generator)
    def process_workflow(command, data, files, url, depth):
        # Collects text from the three optional sources (textbox, uploads,
        # URL), then applies the selected command. Returns a
        # (dict-for-JSON-output, error-log-string) pair matching the two
        # outputs wired to the Process button below.
        datasets = []
        errors = []
        try:
            # If user enters text in the data_input box
            if data:
                datasets.append(data)
            # If user uploads any files
            if files:
                for file in files:
                    datasets.append(process_file(file))
            # If user supplies a URL
            if url:
                # NOTE(review): depth comes from a gr.Slider and may be a
                # float; fetch_url only compares it with "<", so this
                # appears to work — confirm.
                datasets.append(fetch_url(url, max_depth=depth))
            # Depending on the command chosen, do the logic
            if command == "Extract Dataset":
                return {"datasets": extract_dataset("\n".join(datasets))}, ""
            elif command == "Combine Datasets":
                return {"datasets": combine_datasets(datasets)}, ""
            # Default: if "Scrape Data" or "Train Chatbot" or unknown
            return {"datasets": datasets}, ""
        except Exception as e:
            # errors only ever holds this one message; success paths return "".
            errors.append(str(e))
            return {"datasets": []}, "\n".join(errors)
    # 3.2) Functions for the second tab (GitHub Issue Resolver)
    def on_resolve(token, repo_url, issue_number, resolution, forked_repo):
        """
        This callback is used when a user clicks 'Resolve Issue' in the second tab.
        """
        try:
            parts = repo_url.strip("/").split("/")
            # Typically, the repo URL is something like https://github.com/owner/repo
            # NOTE(review): a trailing ".git" in the URL is not stripped and
            # would end up in the repo name — confirm inputs.
            owner, repo = parts[-2], parts[-1]
            result = bot.resolve_issue(token, owner, repo, int(issue_number), resolution, forked_repo)
            return result
        except Exception as e:
            logger.error(f"Error in issue resolution: {e}")
            return f"Error: {e}"
    with gr.Blocks() as main_app:
        # Title / Header
        gr.Markdown("## Combined System: Scraper/Indexer/Dataset Generator & GitHub Issue Resolver")
        with gr.Tab("Scraper / Indexer / Dataset Generator"):
            gr.Markdown(
                "**Use this tab to upload files, scrape data from URLs, or enter text to generate datasets.**"
            )
            # The UI from your first script.
            # NOTE(review): this chatbot widget is never wired to any
            # callback — it is display-only, as its label says.
            chatbot = gr.Chatbot(label="Flash Trained Chatbot (Placeholder)")
            command_selector = gr.Dropdown(
                label="Select Command",
                choices=["Scrape Data", "Extract Dataset", "Combine Datasets", "Train Chatbot"],
                value="Scrape Data"
            )
            data_input = gr.Textbox(label="Input Text", placeholder="Enter text here.")
            file_upload = gr.Files(label="Upload Files", file_types=[".pdf", ".txt", ".zip"])
            url_input = gr.Textbox(label="URL", placeholder="https://example.com")
            depth_slider = gr.Slider(label="Crawl Depth", minimum=1, maximum=10, value=1)
            output_json = gr.JSON(label="Output Dataset")
            error_output = gr.Textbox(label="Error Log", interactive=False)
            process_button = gr.Button("Process")
            # Inputs/outputs must stay in this order: process_workflow's
            # parameters and its two return values are positional.
            process_button.click(
                process_workflow,
                inputs=[command_selector, data_input, file_upload, url_input, depth_slider],
                outputs=[output_json, error_output]
            )
        with gr.Tab("GitHub Issue Resolver"):
            gr.Markdown("**Use this tab to resolve GitHub issues by cloning, fixing, and opening PRs.**")
            token_input = gr.Textbox(label="GitHub Token", placeholder="Enter your GitHub token")
            repo_url_input = gr.Textbox(label="Repository URL", placeholder="e.g. https://github.com/owner/repo")
            issue_number_input = gr.Number(label="Issue Number", precision=0, value=1)
            resolution_input = gr.Textbox(
                label="Proposed Resolution",
                placeholder="Describe the resolution for the issue here..."
            )
            forked_repo_input = gr.Textbox(
                label="Forked Repo URL",
                placeholder="e.g. https://github.com/youraccount/repo (your fork)"
            )
            resolve_button = gr.Button("Resolve Issue")
            result_output = gr.Textbox(label="Result", interactive=False)
            # Same positional contract: five inputs map onto on_resolve's
            # five parameters; the single return feeds result_output.
            resolve_button.click(
                fn=on_resolve,
                inputs=[
                    token_input,
                    repo_url_input,
                    issue_number_input,
                    resolution_input,
                    forked_repo_input
                ],
                outputs=[result_output]
            )
    return main_app
if __name__ == "__main__":
    app = create_combined_gradio_app()
    # Bind to all interfaces on port 7860, the standard Hugging Face Spaces port.
    app.launch(server_name="0.0.0.0", server_port=7860)