GitBot / app.py
acecalisto3's picture
Update app.py
dd262f4 verified
raw
history blame
15.4 kB
import sys
import signal
import shutil
import logging
import time
import os
from datetime import datetime
from typing import List, Dict, Any
import requests
import gradio as gr
import atexit
import subprocess
from urllib.parse import urlparse, quote
import warnings
import webbrowser
import spaces
# NOTE(review): nothing in the visible part of this file reads `device`;
# stream_chat() moves its inputs to `model.device` instead. Confirm whether
# this constant is still needed.
device = "cuda"
@spaces.GPU()
def stream_chat(
    message: str,
    history: list,
    system_prompt: str,
    temperature: float = 0.8,
    max_new_tokens: int = 1024,
    top_p: float = 1.0,
    top_k: int = 20,
    penalty: float = 1.2,
):
    """Stream a chat completion for ``message`` given the prior ``history``.

    The conversation is assembled as a system message, then one
    user/assistant pair per ``history`` entry, then the new user message.
    Generation runs on a worker thread so the streamer can be consumed here.

    Args:
        message: The new user message.
        history: Iterable of ``(prompt, answer)`` pairs from earlier turns.
        system_prompt: Content of the leading system message.
        temperature: Sampling temperature; ``0`` disables sampling.
        max_new_tokens: Upper bound on generated tokens.
        top_p: Nucleus-sampling threshold.
        top_k: Top-k sampling cutoff.
        penalty: Repetition penalty forwarded to ``generate``.

    Yields:
        The accumulated response text after each newly decoded chunk.

    NOTE(review): ``tokenizer``, ``model`` and ``TextIteratorStreamer`` are
    not defined or imported in the visible part of this file; they must be
    provided at module level before this function is called.
    """
    # Imported locally so the block does not depend on the file's import list.
    from threading import Thread

    print(f'message: {message}')
    print(f'history: {history}')

    conversation = [{"role": "system", "content": system_prompt}]
    for prompt, answer in history:
        conversation.extend([
            {"role": "user", "content": prompt},
            {"role": "assistant", "content": answer},
        ])
    conversation.append({"role": "user", "content": message})

    input_ids = tokenizer.apply_chat_template(
        conversation, add_generation_prompt=True, return_tensors="pt"
    ).to(model.device)
    streamer = TextIteratorStreamer(
        tokenizer, timeout=60.0, skip_prompt=True, skip_special_tokens=True
    )
    generate_kwargs = dict(
        input_ids=input_ids,
        max_new_tokens=max_new_tokens,
        do_sample=temperature != 0,
        top_p=top_p,
        top_k=top_k,
        temperature=temperature,
        # Previously accepted but silently ignored.
        repetition_penalty=penalty,
        eos_token_id=[128001, 128008, 128009],
        streamer=streamer,
    )

    # generate() blocks until completion, so it runs on a worker thread while
    # this generator drains the streamer and forwards partial output.
    Thread(target=model.generate, kwargs=generate_kwargs, daemon=True).start()

    buffer = ""
    for new_text in streamer:
        buffer += new_text
        yield buffer
# Constants
# Working-directory names; all are relative paths, so they resolve against the
# process's current working directory.
INPUT_DIRECTORY = 'input'  # created at import time via initialize_environment()
OUTPUT_DIRECTORY = 'output'  # created at import time via initialize_environment()
LOGS_DIRECTORY = 'logs'  # initialize_logger() writes its timestamped log file here
RESOLUTIONS_DIRECTORY = 'resolutions'  # GitHubBot.resolve_issue() writes resolution .md files here
REPOS_DIRECTORY = 'repos'  # removed by cleanup(); nothing in the visible file writes into it
# Set up environment
def initialize_environment(input_file, output_directory):
    """Create every working directory the application relies on.

    The module-level log, resolution and repo directories are created along
    with the two caller-supplied paths. Despite its name, ``input_file`` is
    created as a directory. Directories that already exist are left as-is.
    """
    required_paths = (
        LOGS_DIRECTORY,
        RESOLUTIONS_DIRECTORY,
        REPOS_DIRECTORY,
        input_file,
        output_directory,
    )
    for path in required_paths:
        os.makedirs(path, exist_ok=True)
# Set up logging
def initialize_logger():
    """Configure root logging (file + console) and return this module's logger.

    The log file lives in ``LOGS_DIRECTORY`` and carries a second-resolution
    timestamp in its name, so each start of the process gets its own file.
    """
    stamp = datetime.now().strftime('%Y%m%d_%H%M%S')
    log_file = f"{LOGS_DIRECTORY}/github_bot_{stamp}.log"

    file_handler = logging.FileHandler(log_file)
    console_handler = logging.StreamHandler()

    logging.basicConfig(
        level=logging.INFO,
        format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
        handlers=[file_handler, console_handler],
    )
    return logging.getLogger(__name__)
# Initialize environment and logger
# Both calls run at import time. Order matters: the directories (including
# LOGS_DIRECTORY) must exist before initialize_logger() opens its log file.
initialize_environment(INPUT_DIRECTORY, OUTPUT_DIRECTORY)
logger = initialize_logger() # Initialize logger globally
# GitHub API handler
class GitHubAPI:
    """Small client for the parts of the GitHub REST v3 API this app uses."""

    def __init__(self, token: str, timeout: float = 30.0):
        """Store credentials and request settings.

        Args:
            token: GitHub personal access token sent in the Authorization header.
            timeout: Seconds to wait for each HTTP request before giving up.
        """
        self.token = token
        self.timeout = timeout
        self.headers = {
            'Authorization': f'token {token}',
            'Accept': 'application/vnd.github.v3+json'
        }
        self.base_url = "https://api.github.com"

    def _check_rate_limit(self) -> bool:
        """Wait out the core rate limit when fewer than 10 calls remain.

        Returns:
            True when it is fine to proceed. If the limit is nearly exhausted
            the call blocks until the reset time and then returns True, since
            the quota is available again at that point. A failure to query
            the limit is logged and treated as "proceed" (best effort).
        """
        try:
            response = requests.get(
                f"{self.base_url}/rate_limit",
                headers=self.headers,
                timeout=self.timeout,
            )
            response.raise_for_status()
            core = response.json()['resources']['core']
            remaining = core['remaining']
            reset_time = core['reset']
            if remaining < 10:
                wait_time = max(0, reset_time - int(time.time()))
                if wait_time > 0:
                    logger.warning(f"Rate limit nearly exceeded. Waiting {wait_time} seconds...")
                    time.sleep(wait_time)
            # Either quota was available, or we have just waited for the reset.
            return True
        except Exception as e:
            # Deliberately best-effort: an unreadable rate limit must not block
            # the actual request.
            logger.error(f"Error checking rate limit: {str(e)}")
            return True

    def get_repository(self, owner: str, repo: str) -> Dict:
        """Return the repository metadata for ``owner/repo``.

        Raises:
            requests.HTTPError: For non-2xx responses (logged, then re-raised).
            Exception: Any other request failure (logged, then re-raised).
        """
        try:
            response = requests.get(
                f"{self.base_url}/repos/{owner}/{repo}",
                headers=self.headers,
                timeout=self.timeout,
            )
            response.raise_for_status()
            return response.json()
        except requests.HTTPError as e:
            logger.error(f"HTTP error getting repository info: {str(e)}")
            raise
        except Exception as e:
            logger.error(f"Error getting repository info: {str(e)}")
            raise

    def get_issues(self, owner: str, repo: str, state: str = 'open') -> List[Dict]:
        """Return all issues of ``owner/repo`` in ``state``, excluding pull requests.

        Follows the ``Link: rel="next"`` header so repositories with more
        than one page of issues are returned completely. Any error is logged
        and results in an empty list.
        """
        if not self._check_rate_limit():
            return []
        try:
            collected = []
            url = f"{self.base_url}/repos/{owner}/{repo}/issues"
            params = {'state': state, 'per_page': 100}
            while url:
                response = requests.get(
                    url, headers=self.headers, params=params, timeout=self.timeout
                )
                response.raise_for_status()
                # The issues endpoint also lists pull requests; they carry a
                # 'pull_request' key and are filtered out.
                collected.extend(
                    issue for issue in response.json() if 'pull_request' not in issue
                )
                url = response.links.get('next', {}).get('url')
                # The "next" URL already contains the full query string.
                params = None
            return collected
        except Exception as e:
            logger.error(f"Error fetching issues: {str(e)}")
            return []
# GitHub Bot
class GitHubBot:
    """High-level operations (list issues, record and push a resolution)."""

    def __init__(self):
        # Created lazily by initialize_api() because the token arrives per call.
        self.github_api = None

    def initialize_api(self, token: str):
        """(Re)create the API client for ``token``."""
        self.github_api = GitHubAPI(token)

    def fetch_issues(self, token: str, owner: str, repo: str) -> List[Dict]:
        """Return the open issues of ``owner/repo``; ``[]`` on any error."""
        try:
            self.initialize_api(token)
            return self.github_api.get_issues(owner, repo)
        except Exception as e:
            logger.error(f"Error fetching issues: {str(e)}")
            return []

    @staticmethod
    def _split_repo_url(repo_url: str, default_owner: str) -> tuple:
        """Return ``(owner, name)`` parsed from an HTTPS or SSH-style repo URL."""
        cleaned = repo_url.strip().rstrip('/')
        if cleaned.endswith('.git'):
            cleaned = cleaned[:-4]
        # Treat ':' like '/' so "git@host:owner/name" splits the same way.
        parts = [part for part in cleaned.replace(':', '/').split('/') if part]
        name = parts[-1] if parts else ''
        fork_owner = parts[-2] if len(parts) >= 2 else default_owner
        return fork_owner, name

    def resolve_issue(self, token: str, owner: str, repo: str, issue_number: int, resolution: str, forked_repo: str) -> str:
        """Record a resolution and push the manual fix from a fork.

        Steps: verify the upstream repository exists, write the resolution to
        a Markdown file, clone the fork into a fresh temporary directory,
        wait for the operator to apply the fix, commit and push, then open
        the compare page for a pull request.

        Args:
            token: GitHub personal access token.
            owner: Upstream repository owner.
            repo: Upstream repository name.
            issue_number: Number of the issue being resolved.
            resolution: Free-text description of the resolution.
            forked_repo: Clone URL of the fork. When empty, only the
                resolution file is written (the UI marks this field optional).

        Returns:
            ``"Resolution saved: <path>"`` on success, otherwise an
            ``"Error resolving issue: ..."`` message.
        """
        # Imported locally so the block does not depend on the file's import list.
        import tempfile

        try:
            self.initialize_api(token)
            repo_info = self.github_api.get_repository(owner, repo)
            base_branch = 'master'
            if isinstance(repo_info, dict) and repo_info.get('default_branch'):
                base_branch = repo_info['default_branch']

            # Create resolution file
            timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
            os.makedirs(RESOLUTIONS_DIRECTORY, exist_ok=True)
            resolution_file = f"{RESOLUTIONS_DIRECTORY}/resolution_{issue_number}_{timestamp}.md"
            with open(resolution_file, "w", encoding="utf-8") as f:
                f.write(f"# Resolution for Issue #{issue_number}\n\n{resolution}")

            if not forked_repo or not forked_repo.strip():
                return f"Resolution saved: {resolution_file}"

            fork_owner, fork_name = self._split_repo_url(forked_repo, owner)

            # A fresh directory per call avoids "destination already exists"
            # failures on repeated runs.
            clone_dir = tempfile.mkdtemp(prefix=f"{fork_name}_")
            # "--" stops git from treating a user-supplied URL that starts
            # with "-" as an option.
            subprocess.run(['git', 'clone', '--', forked_repo, clone_dir], check=True)
            logger.info(f"Cloned {forked_repo} into {clone_dir}")

            # NOTE(review): input() blocks on stdin and raises EOFError when the
            # process has no terminal (e.g. a hosted web app); the error is then
            # reported through the except branch below.
            input(" Apply the fix manually and stage the changes (press ENTER)? ")

            # Run git with cwd= instead of os.chdir(): changing the process-wide
            # working directory would break the relative log/resolution paths
            # for every later call.
            subprocess.run(['git', 'add', '.'], check=True, cwd=clone_dir)
            summary_lines = [line.strip() for line in str(resolution).splitlines() if line.strip()]
            commit_message = f"Resolved issue #{issue_number}"
            if summary_lines:
                # Plain text: arguments are passed as a list, so no quoting or
                # percent-encoding is needed.
                commit_message += f" ({summary_lines[0]})"
            subprocess.run(['git', 'commit', '-m', commit_message], check=True, cwd=clone_dir)
            subprocess.run(['git', 'push', 'origin', 'HEAD'], check=True, cwd=clone_dir)

            # Compare against the branch that was actually pushed.
            pushed_branch = subprocess.run(
                ['git', 'rev-parse', '--abbrev-ref', 'HEAD'],
                check=True, cwd=clone_dir, capture_output=True, text=True,
            ).stdout.strip() or base_branch

            # Open Pull Request page
            webbrowser.open(
                f'https://github.com/{owner}/{repo}/compare/'
                f'{base_branch}...{fork_owner}:{pushed_branch}'
            )
            return f"Resolution saved: {resolution_file}"
        except Exception as e:
            error_msg = f"Error resolving issue: {str(e)}"
            logger.error(error_msg)
            return error_msg
def handle_issue_selection(token, owner, repo, issue_number, resolution, forked_repo):
    """Resolve one issue with a freshly created ``GitHubBot`` and return its status message."""
    return GitHubBot().resolve_issue(
        token, owner, repo, issue_number, resolution, forked_repo
    )
def extract_info_from_url(url: str, github_token: str = None) -> Dict[str, Any]:
    """Fetch ``url`` and return basic response details.

    For GitHub repository URLs the repository's open issues are added too.

    Args:
        url: The URL to fetch.
        github_token: Token used to list issues for GitHub URLs. When omitted,
            the ``GITHUB_TOKEN`` environment variable is used; when neither
            is set, the issue lookup is skipped and ``issues`` is ``[]``.

    Returns:
        A dict with ``status_code``, ``headers`` and ``content`` (first 500
        characters), plus ``issues`` for GitHub repository URLs. On failure
        the dict contains an ``error`` message instead of raising.
    """
    info = {}
    try:
        # A timeout keeps an unresponsive host from hanging the caller forever.
        response = requests.get(url, timeout=30)
        response.raise_for_status()
        info['status_code'] = response.status_code
        info['headers'] = dict(response.headers)
        info['content'] = response.text[:500]  # Limit content to first 500 characters for brevity

        parsed_url = urlparse(url)
        host = (parsed_url.hostname or '').lower()
        # Match the real host (or a subdomain), not any netloc that merely
        # contains the text "github.com".
        if host == 'github.com' or host.endswith('.github.com'):
            segments = [segment for segment in parsed_url.path.split('/') if segment]
            if len(segments) >= 2:
                owner, repo = segments[0], segments[1]
                if repo.endswith('.git'):
                    repo = repo[:-4]
                token = github_token or os.environ.get('GITHUB_TOKEN')
                if token:
                    info['issues'] = bot.fetch_issues(token, owner, repo)
                else:
                    logger.warning("No GitHub token available; skipping issue lookup")
                    info['issues'] = []
        elif host == 'huggingface.co' or host.endswith('.huggingface.co'):
            # Add Hugging Face specific handling if needed
            pass
    except requests.HTTPError as e:
        info['error'] = f"HTTP error: {str(e)}"
    except Exception as e:
        info['error'] = f"Error: {str(e)}"
    return info
# Initialize GitHubBot globally
# Shared module-level instance; extract_info_from_url() looks this name up at
# call time, so it only has to exist before that function is first called.
bot = GitHubBot()
# HTML and CSS integration
# Static single-page front end handed verbatim to gr.HTML() in
# create_gradio_interface(). It loads a stylesheet and vue.js from a CDN and
# mounts a `new Vue({...})` app whose buttons POST JSON to '/fetch-issues',
# '/resolve-issue' and '/extract-info'.
# NOTE(review): no handler for those three routes is defined in the visible
# part of this file — confirm they are served elsewhere, otherwise the buttons
# cannot work.
# NOTE(review): both CDN URLs contain the literal text "[email protected]", which
# looks like an e-mail-obfuscation artifact that replaced a "package@version"
# segment — restore the intended package names/versions before relying on them.
custom_html = """
<!DOCTYPE html>
<html lang="en">
<head>
<meta charset="UTF-8">
<meta name="viewport" content="width=device-width, initial-scale=1.0">
<title>GitHub Issue Manager</title>
<link rel="stylesheet" href="https://cdn.jsdelivr.net/npm/[email protected]/dist/full.css">
<script src="https://cdn.jsdelivr.net/npm/[email protected]/dist/vue.js"></script>
<style>
.btn-primary {
background-color: #7928CA;
color: white;
border: 2px solid #7928CA;
}
.btn-success {
background-color: #22c55e;
color: white;
border: 2px solid #22c55e;
}
</style>
</head>
<body>
<div id="app" class="container mx-auto p-4">
<h1 class="text-4xl md:text-5xl font-extrabold text-center mb-8">GitHub Issue Manager</h1>
<div class="card bg-gray-800 p-8">
<div class="form-control">
<label class="label">
<span class="label-text">GitHub Token</span>
</label>
<input type="password" placeholder="Enter your GitHub Personal Access Token" class="input input-bordered input-primary" v-model="githubToken">
</div>
<div class="form-control">
<label class="label">
<span class="label-text">Repository URL</span>
</label>
<input type="text" placeholder="Enter the full GitHub repository URL" class="input input-bordered input-primary" v-model="repoUrl">
</div>
</div>
<div class="card bg-gray-800 p-8">
<div class="flex justify-between gap-4">
<button class="btn btn-primary" @click="fetchIssues">Fetch Issues</button>
<select class="select select-primary w-full max-w-xs" v-model="selectedIssue">
<option disabled="" selected="">Select Issue</option>
<option v-for="issue in issues" :key="issue.id" :value="issue">{{ issue.title }}</option>
</select>
</div>
<div class="form-control mt-4">
<label class="label">
<span class="label-text">Resolution</span>
</label>
<textarea class="textarea textarea-primary" placeholder="Enter the resolution details" v-model="resolution"></textarea>
</div>
<div class="form-control mt-4">
<label class="label">
<span class="label-text">Forked Repository URL</span>
</label>
<input type="text" placeholder="URL to your forked repository (Optional)" class="input input-bordered input-primary" v-model="forkedRepoUrl">
</div>
<div class="form-control mt-4">
<button class="btn btn-success" @click="resolveIssue">Resolve Issue</button>
</div>
</div>
<div class="card bg-gray-800 p-8 mt-4">
<label class="label">
<span class="label-text">Output</span>
</label>
<textarea class="textarea textarea-primary h-48" v-model="output" readonly></textarea>
</div>
<div class="card bg-gray-800 p-8 mt-4">
<div class="form-control">
<label class="label">
<span class="label-text">URL</span>
</label>
<input type="text" placeholder="Enter a URL to extract information" class="input input-bordered input-primary" v-model="url">
</div>
<div class="form-control">
<button class="btn btn-primary" @click="extractInfo">Extract Info</button>
</div>
</div>
</div>
<script>
new Vue({
el: '#app',
data: {
githubToken: '',
repoUrl: '',
issues: [],
selectedIssue: '',
resolution: '',
forkedRepoUrl: '',
output: '',
url: ''
},
methods: {
fetchIssues() {
fetch('/fetch-issues', {
method: 'POST',
headers: {
'Content-Type': 'application/json'
},
body: JSON.stringify({
githubToken: this.githubToken,
repoUrl: this.repoUrl
})
})
.then(response => response.json())
.then(data => {
this.issues = data.issues;
})
.catch(error => {
console.error(error);
});
},
resolveIssue() {
fetch('/resolve-issue', {
method: 'POST',
headers: {
'Content-Type': 'application/json'
},
body: JSON.stringify({
githubToken: this.githubToken,
repoUrl: this.repoUrl,
issue: this.selectedIssue,
resolution: this.resolution,
forkedRepoUrl: this.forkedRepoUrl
})
})
.then(response => response.json())
.then(data => {
this.output = data.output;
})
.catch(error => {
console.error(error);
});
},
extractInfo() {
fetch('/extract-info', {
method: 'POST',
headers: {
'Content-Type': 'application/json'
},
body: JSON.stringify({
url: this.url
})
})
.then(response => response.json())
.then(data => {
this.output = data.output;
})
.catch(error => {
console.error(error);
});
}
}
});
</script>
</body>
</html>
"""
def create_gradio_interface():
    """Build and return the Gradio Blocks app that hosts the custom HTML page.

    No custom CSS or theme is applied; the page is a single ``gr.HTML``
    component rendering ``custom_html``.
    """
    with gr.Blocks(css=None, theme=None) as interface:
        gr.HTML(custom_html)
    return interface
# Cleanup function
def cleanup():
    """Remove temporary directories and shut logging down.

    Any failure is reported on stdout rather than raised, so this is safe to
    use as an exit hook.
    """
    try:
        for path in (REPOS_DIRECTORY,):
            if not os.path.exists(path):
                continue
            shutil.rmtree(path)
        logging.shutdown()
    except Exception as exc:
        print(f"Error during cleanup: {str(exc)}")
# Signal handler
def signal_handler(signum, frame):
    """Log the received signal, run cleanup, and exit with status 0."""
    logger.info(f"Received signal {signum}")
    cleanup()
    # Same effect as sys.exit(0), which raises SystemExit(0).
    raise SystemExit(0)
if __name__ == "__main__":
    # Make sure cleanup runs on normal exit as well as on Ctrl-C / termination.
    atexit.register(cleanup)
    for handled_signal in (signal.SIGINT, signal.SIGTERM):
        signal.signal(handled_signal, signal_handler)

    # Build the UI and start serving it.
    demo = create_gradio_interface()
    demo.launch()