# GitBot / app.py
import os
import gradio as gr
import requests
import uuid
from huggingface_hub import InferenceClient, HfApi
from pypdf import PdfReader
from bs4 import BeautifulSoup
import datetime
import zipfile
import nltk.data
import nltk
import langchain
import dotenv
import yaml
from typing import Optional, Union, List, Dict, Any, Tuple
import subprocess
from pathlib import Path
import json
import tempfile
from datetime import datetime as dt, timezone
import re
import logging
import shutil
# -----------------------
# ENV / Logging Setup
# -----------------------
logging.basicConfig(level=logging.INFO, format="%(asctime)s - %(levelname)s - %(message)s")
logger = logging.getLogger(__name__)
# Ensure the sentence tokenizer data is available, downloading only if missing.
# Newer NLTK releases ship the models as 'punkt_tab', older ones as 'punkt'.
for resource in ("punkt", "punkt_tab"):
    try:
        nltk.data.find(f"tokenizers/{resource}")
    except LookupError:
        nltk.download(resource)
VERBOSE = True
def log(message):
if VERBOSE:
print(f"[LOG] {datetime.datetime.now()} - {message}")
# -----------------------
# 1) Scraper/Indexer/Dataset Generator - from your first script
# -----------------------
# == Hugging Face API Setup ==
HF_MODEL = "mistralai/Mixtral-8x7B-Instruct-v0.1"
HF_TOKEN = os.environ.get('HF_TOKEN')
if not HF_TOKEN:
raise EnvironmentError("HF_TOKEN is not set. Please export it as an environment variable.")
try:
    # Pass the token explicitly so gated models such as Mixtral can be accessed.
    client = InferenceClient(model=HF_MODEL, token=HF_TOKEN)
    api = HfApi(token=HF_TOKEN)
    log("Initialized Hugging Face client and API.")
except Exception as e:
    log(f"Error initializing Hugging Face client: {e}")
    raise SystemExit(1)
REPO_NAME = "acecalisto3/tmp"
DATASET_URL = f"https://huggingface.co/datasets/{REPO_NAME}/raw/main/"
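# NOTE: REPO_NAME / DATASET_URL identify the Hub dataset repo where generated datasets
# could be published; they are defined for that purpose and not used elsewhere in this script yet.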
MAX_TOKENS = 8192
def read_pdf(file_path):
"""Read PDF and return its text."""
try:
reader = PdfReader(file_path)
text = "\n".join(page.extract_text() for page in reader.pages)
return text
except Exception as e:
log(f"Error reading PDF {file_path}: {e}")
return ""
def fetch_url(url, max_depth):
"""Breadth-first search crawl to a given depth, collecting text."""
visited = set()
to_visit = [(url, 0)]
results = []
while to_visit:
current_url, depth = to_visit.pop(0)
if current_url in visited:
continue
if depth < max_depth:
try:
response = requests.get(current_url, timeout=10)
response.raise_for_status()
visited.add(current_url)
soup = BeautifulSoup(response.content, 'lxml')
results.append(soup.get_text())
for link in soup.find_all("a", href=True):
absolute_url = requests.compat.urljoin(current_url, link.get('href'))
if absolute_url.startswith("http") and absolute_url not in visited:
to_visit.append((absolute_url, depth + 1))
except Exception as e:
log(f"Error fetching {current_url}: {e}")
return "\n".join(results)
def read_txt(txt_path):
"""Read text file."""
try:
with open(txt_path, "r", encoding="utf-8") as f:
return f.read()
except Exception as e:
log(f"Error reading TXT file {txt_path}: {e}")
return ""
def read_zip(zip_path):
"""Read all .txt/.pdf files inside a ZIP."""
try:
extracted_data = []
with zipfile.ZipFile(zip_path, 'r') as zip_ref:
for file_info in zip_ref.infolist():
if file_info.filename.endswith((".txt", ".pdf")):
with zip_ref.open(file_info) as file:
content = file.read()
if file_info.filename.endswith(".txt"):
extracted_data.append(content.decode("utf-8"))
elif file_info.filename.endswith(".pdf"):
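                            # read_pdf expects a filesystem path, so dump the PDF bytes to a temp file first.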
temp_path = f"/tmp/{uuid.uuid4()}"
with open(temp_path, "wb") as temp_file:
temp_file.write(content)
extracted_data.append(read_pdf(temp_path))
os.remove(temp_path)
return "\n".join(extracted_data)
except Exception as e:
log(f"Error reading ZIP file {zip_path}: {e}")
return ""
def process_file(file):
    """Depending on file extension, process file to extract text."""
    try:
        if file.name.endswith(".pdf"):
            return read_pdf(file.name)
        elif file.name.endswith(".txt"):
            return read_txt(file.name)
        elif file.name.endswith(".zip"):
            return read_zip(file.name)
        log(f"Unsupported file type: {file.name}")
    except Exception as e:
        log(f"Error processing file {file.name}: {e}")
    return ""
def chunk_text(text, max_chunk_size):
"""Naive chunking based on sentence tokenizer to avoid huge tokens."""
    # sent_tokenize transparently uses whichever punkt resource the installed NLTK provides.
    sentences = nltk.sent_tokenize(text)
chunks = []
current_chunk = ""
for sentence in sentences:
if len(current_chunk) + len(sentence) + 1 > max_chunk_size:
chunks.append(current_chunk.strip())
current_chunk = ""
current_chunk += sentence + " "
if current_chunk:
chunks.append(current_chunk.strip())
return chunks
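# Example: with max_chunk_size=20000 characters (the value extract_dataset uses below),
# a ~100k-character document is split into roughly five prompt-sized chunks.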
def extract_dataset(data, instructions="Extract {history}", max_tokens=MAX_TOKENS):
"""Call text generation on each chunk with a certain instruction."""
extracted = []
chunks = chunk_text(data, 20000) # Adjust chunk size as needed
for i, chunk in enumerate(chunks):
try:
            # text_generation returns the generated string directly when details/stream are off.
            response = client.text_generation(
                prompt=instructions.format(history=chunk),
                max_new_tokens=max_tokens
            )
            extracted.append(response)
except Exception as e:
log(f"Error processing chunk {i+1}: {e}")
extracted.append(f"Error processing chunk {i+1}: {e}")
return "\n".join(extracted)
def combine_datasets(datasets):
"""Simply combine multiple dataset strings into one big string."""
return "\n".join(datasets)
# -----------------------
# 2) GitHub Issue Resolver - from your second script
# -----------------------
class TerminalCommand:
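    """Helper for running shell commands and capturing (stdout, stderr, return code)."""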
@staticmethod
def execute(command: Union[str, List[str]], cwd: Optional[str] = None) -> Tuple[str, str, int]:
"""
Execute a terminal command and return stdout, stderr, and return code
"""
if isinstance(command, str):
command = command.split()
try:
process = subprocess.Popen(
command,
stdout=subprocess.PIPE,
stderr=subprocess.PIPE,
cwd=cwd,
text=True
)
stdout, stderr = process.communicate()
return stdout.strip(), stderr.strip(), process.returncode
except Exception as e:
logger.error(f"Error executing command {command}: {e}")
return "", str(e), 1
class GitUtilities:
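    """Minimal git wrapper that shells out via TerminalCommand for clone, branch, commit, and push."""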
def __init__(self, repo_path: str):
self.repo_path = Path(repo_path)
    def clone(self, url: str, branch: str = "main") -> bool:
        """Clone a repository into repo_path."""
        # Pass the command as a list so URLs or paths containing spaces are not split apart.
        stdout, stderr, code = TerminalCommand.execute(
            ["git", "clone", "-b", branch, url, str(self.repo_path)]
        )
        if code != 0:
            logger.error(f"Git clone failed: {stderr}")
        return code == 0
    def commit(self, message: str) -> bool:
        """Stage all changes (including new files) and create a commit."""
        # `git commit -am` skips untracked files, so stage everything explicitly first.
        stdout, stderr, code = TerminalCommand.execute(["git", "add", "-A"], str(self.repo_path))
        if code != 0:
            logger.error(f"Git add failed: {stderr}")
            return False
        stdout, stderr, code = TerminalCommand.execute(
            ["git", "commit", "-m", message],
            str(self.repo_path)
        )
        if code != 0:
            logger.error(f"Git commit failed: {stderr}")
        return code == 0
def push(self, remote: str = "origin", branch: str = "main") -> bool:
"""Push changes to remote."""
stdout, stderr, code = TerminalCommand.execute(
["git", "push", remote, branch],
str(self.repo_path)
)
if code != 0:
logger.error(f"Git push failed: {stderr}")
return code == 0
def create_branch(self, branch_name: str) -> bool:
"""Create and checkout a new branch."""
stdout, stderr, code = TerminalCommand.execute(
["git", "checkout", "-b", branch_name],
str(self.repo_path)
)
if code != 0:
logger.error(f"Git branch creation failed: {stderr}")
return code == 0
class GitHubBot:
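    """Resolves GitHub issues by cloning a fork, committing a resolution note on a new branch, and opening a PR."""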
def __init__(self, logger: logging.Logger):
self.github_api = None
self.logger = logger
self.ai_provider = None
self.git = None
self.temp_dir = None
self.base_url = "https://api.github.com"
def initialize_api(self, token: str):
"""Initialize the GitHub API with a token."""
if not token:
raise ValueError("GitHub token is required.")
        self.github_api = {
            "Authorization": f"Bearer {token}",
            "Accept": "application/vnd.github+json",
        }
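        # Each call gets a fresh temporary working directory; resolve_issue removes it in its finally block.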
self.temp_dir = tempfile.mkdtemp()
self.git = GitUtilities(self.temp_dir)
def create_pull_request(self, owner: str, repo: str, title: str, body: str, head: str, base: str = "main") -> Dict:
"""Create a pull request."""
url = f"{self.base_url}/repos/{owner}/{repo}/pulls"
data = {
"title": title,
"body": body,
"head": head,
"base": base
}
try:
response = requests.post(url, headers=self.github_api, json=data)
response.raise_for_status()
return response.json()
except requests.RequestException as e:
logger.error(f"Error creating pull request: {e}")
raise
def resolve_issue(
self,
token: str,
owner: str,
repo: str,
issue_number: int,
resolution: str,
forked_repo: str
) -> str:
"""Resolve a GitHub issue by cloning, creating a fix branch, and opening a PR."""
try:
self.initialize_api(token)
branch_name = f"fix/issue-{issue_number}-{dt.now().strftime('%Y%m%d-%H%M%S')}"
# Clone repository (forked repo URL is expected)
if not self.git.clone(forked_repo):
raise Exception("Failed to clone repository")
# Create a new branch
if not self.git.create_branch(branch_name):
raise Exception("Failed to create branch")
# Generate resolution content
resolution_content = self._create_resolution_document(issue_number, resolution)
# Save resolution file (as an example, you can adjust)
resolution_path = Path(self.temp_dir) / f"resolution_{issue_number}.md"
with open(resolution_path, "w") as f:
f.write(resolution_content)
            # Commit and push changes (pushing assumes the forked repo URL embeds credentials
            # or a credential helper is configured for the token).
            if not self.git.commit(f"Fix for issue #{issue_number}"):
                raise Exception("Failed to commit changes")
            if not self.git.push("origin", branch_name):
                raise Exception("Failed to push changes")
            # Create a pull request. When the branch lives on a fork, GitHub expects the
            # head reference in the form "forkOwner:branch".
            fork_owner = forked_repo.rstrip("/").split("/")[-2]
            head_ref = branch_name if fork_owner == owner else f"{fork_owner}:{branch_name}"
            pr = self.create_pull_request(
                owner=owner,
                repo=repo,
                title=f"Fix for issue #{issue_number}",
                body=f"This PR resolves issue #{issue_number} with the proposed resolution.",
                head=head_ref
            )
return f"Pull request created: {pr['html_url']}"
except Exception as e:
logger.error(f"Error resolving issue #{issue_number}: {e}")
return f"Error: {e}"
finally:
if self.temp_dir and os.path.exists(self.temp_dir):
shutil.rmtree(self.temp_dir)
def _create_resolution_document(self, issue_number: int, resolution: str) -> str:
"""Create a resolution document for the fix."""
return f"""# Resolution for Issue #{issue_number}
## Resolution Details
{resolution}
## Metadata
- Date: {dt.now(timezone.utc).isoformat()}
- Resolved By: Automated System
"""
# -----------------------
# 3) Build the combined Gradio interface with two tabs
# -----------------------
def create_combined_gradio_app():
"""
Create one Gradio interface that has two tabs:
1) 'Scraper/Indexer/Dataset Generator'
2) 'GitHub Issue Resolver'
"""
bot = GitHubBot(logger)
# 3.1) Functions for the first tab (Scraper/Indexer/Dataset Generator)
def process_workflow(command, data, files, url, depth):
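        """Collect text from the textbox, uploaded files, and/or a crawled URL, then apply the selected command."""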
datasets = []
errors = []
try:
# If user enters text in the data_input box
if data:
datasets.append(data)
# If user uploads any files
if files:
for file in files:
datasets.append(process_file(file))
# If user supplies a URL
if url:
datasets.append(fetch_url(url, max_depth=depth))
# Depending on the command chosen, do the logic
if command == "Extract Dataset":
return {"datasets": extract_dataset("\n".join(datasets))}, ""
elif command == "Combine Datasets":
return {"datasets": combine_datasets(datasets)}, ""
            # Default path ("Scrape Data", the placeholder "Train Chatbot", or an unknown command): return the collected text as-is.
return {"datasets": datasets}, ""
except Exception as e:
errors.append(str(e))
return {"datasets": []}, "\n".join(errors)
# 3.2) Functions for the second tab (GitHub Issue Resolver)
def on_resolve(token, repo_url, issue_number, resolution, forked_repo):
"""
This callback is used when a user clicks 'Resolve Issue' in the second tab.
"""
try:
            # Typically, the repo URL looks like https://github.com/owner/repo (a trailing
            # slash or ".git" suffix is tolerated).
            parts = repo_url.strip("/").split("/")
            owner, repo = parts[-2], parts[-1].removesuffix(".git")
            result = bot.resolve_issue(token, owner, repo, int(issue_number), resolution, forked_repo)
return result
except Exception as e:
logger.error(f"Error in issue resolution: {e}")
return f"Error: {e}"
with gr.Blocks() as main_app:
# Title / Header
gr.Markdown("## Combined System: Scraper/Indexer/Dataset Generator & GitHub Issue Resolver")
with gr.Tab("Scraper / Indexer / Dataset Generator"):
gr.Markdown(
"**Use this tab to upload files, scrape data from URLs, or enter text to generate datasets.**"
)
# The UI from your first script
chatbot = gr.Chatbot(label="Flash Trained Chatbot (Placeholder)")
command_selector = gr.Dropdown(
label="Select Command",
choices=["Scrape Data", "Extract Dataset", "Combine Datasets", "Train Chatbot"],
value="Scrape Data"
)
data_input = gr.Textbox(label="Input Text", placeholder="Enter text here.")
file_upload = gr.Files(label="Upload Files", file_types=[".pdf", ".txt", ".zip"])
url_input = gr.Textbox(label="URL", placeholder="https://example.com")
depth_slider = gr.Slider(label="Crawl Depth", minimum=1, maximum=10, value=1)
output_json = gr.JSON(label="Output Dataset")
error_output = gr.Textbox(label="Error Log", interactive=False)
process_button = gr.Button("Process")
process_button.click(
process_workflow,
inputs=[command_selector, data_input, file_upload, url_input, depth_slider],
outputs=[output_json, error_output]
)
with gr.Tab("GitHub Issue Resolver"):
gr.Markdown("**Use this tab to resolve GitHub issues by cloning, fixing, and opening PRs.**")
            token_input = gr.Textbox(label="GitHub Token", placeholder="Enter your GitHub token", type="password")
repo_url_input = gr.Textbox(label="Repository URL", placeholder="e.g. https://github.com/owner/repo")
issue_number_input = gr.Number(label="Issue Number", precision=0, value=1)
resolution_input = gr.Textbox(
label="Proposed Resolution",
placeholder="Describe the resolution for the issue here..."
)
forked_repo_input = gr.Textbox(
label="Forked Repo URL",
placeholder="e.g. https://github.com/youraccount/repo (your fork)"
)
resolve_button = gr.Button("Resolve Issue")
result_output = gr.Textbox(label="Result", interactive=False)
resolve_button.click(
fn=on_resolve,
inputs=[
token_input,
repo_url_input,
issue_number_input,
resolution_input,
forked_repo_input
],
outputs=[result_output]
)
return main_app
if __name__ == "__main__":
app = create_combined_gradio_app()
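    # Bind to all interfaces on port 7860, the port Hugging Face Spaces expects.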
app.launch(server_name="0.0.0.0", server_port=7860)