Spaces:
Runtime error
Runtime error
import sys | |
import shutil | |
import logging | |
import time | |
import os | |
from datetime import datetime | |
from typing import List, Dict, Any | |
import requests | |
import gradio as gr | |
import atexit | |
import subprocess | |
from urllib.parse import urlparse, quote | |
import webbrowser | |
# Constants | |
INPUT_DIRECTORY = 'input' | |
OUTPUT_DIRECTORY = 'output' | |
LOGS_DIRECTORY = 'logs' | |
RESOLUTIONS_DIRECTORY = 'resolutions' | |
REPOS_DIRECTORY = 'repos' | |
# Set up logging | |
def initialize_logger() -> logging.Logger: | |
log_file = f"{LOGS_DIRECTORY}/github_bot_{datetime.now().strftime('%Y%m%d_%H%M%S')}.log" | |
logging.basicConfig( | |
level=logging.INFO, | |
format='%(asctime)s - %(name)s - %(levelname)s - %(message)s', | |
handlers=[ | |
logging.FileHandler(log_file), | |
logging.StreamHandler() | |
] | |
) | |
return logging.getLogger(__name__) | |
# Initialize environment and logger | |
def initialize_environment(): | |
directories = [LOGS_DIRECTORY, RESOLUTIONS_DIRECTORY, REPOS_DIRECTORY, INPUT_DIRECTORY, OUTPUT_DIRECTORY] | |
for directory in directories: | |
os.makedirs(directory, exist_ok=True) | |
# GitHub API handler | |
class GitHubAPI: | |
def __init__(self, token: str, logger: logging.Logger): | |
self.token = token | |
self.logger = logger | |
self.headers = { | |
'Authorization': f'token {token}', | |
'Accept': 'application/vnd.github.v3+json' | |
} | |
self.base_url = "https://api.github.com" | |
def _check_rate_limit(self) -> bool: | |
try: | |
response = requests.get(f"{self.base_url}/rate_limit", headers=self.headers) | |
response.raise_for_status() | |
limits = response.json() | |
remaining = limits['resources']['core']['remaining'] | |
reset_time = limits['resources']['core']['reset'] | |
if remaining < 10: | |
wait_time = max(0, reset_time - int(time.time())) | |
if wait_time > 0: | |
self.logger.warning(f"Rate limit nearly exceeded. Waiting {wait_time} seconds before retrying...") | |
time.sleep(wait_time) | |
return False | |
return True | |
except requests.exceptions.RequestException as e: | |
self.logger.error(f"Error checking rate limit: {str(e)}. Retrying...") | |
return True | |
def get_repository(self, owner: str, repo: str) -> Dict: | |
try: | |
response = requests.get(f"{self.base_url}/repos/{owner}/{repo}", headers=self.headers) | |
response.raise_for_status() | |
return response.json() | |
except requests.HTTPError as e: | |
self.logger.error(f"HTTP error getting repository info for {owner}/{repo}: {str(e)}. Please check the repository details.") | |
raise | |
except Exception as e: | |
self.logger.error(f"Error getting repository info: {str(e)}") | |
raise | |
def get_issues(self, owner: str, repo: str, state: str = 'open') -> List[Dict]: | |
if not self._check_rate_limit(): | |
return [] | |
try: | |
response = requests.get(f"{self.base_url}/repos/{owner}/{repo}/issues", headers=self.headers, params={'state': state}) | |
response.raise_for_status() | |
issues = response.json() | |
return [issue for issue in issues if 'pull_request' not in issue] | |
except Exception as e: | |
self.logger.error(f"Error fetching issues for repository {owner}/{repo}: {str(e)}. Please verify the repository and token.") | |
return [] | |
# GitHub Bot | |
class GitHubBot: | |
def __init__(self, logger: logging.Logger): | |
self.github_api = None | |
self.logger = logger | |
def initialize_api(self, token: str): | |
self.github_api = GitHubAPI(token, self.logger) | |
def fetch_issues(self, token: str, owner: str, repo: str) -> List[Dict]: | |
try: | |
self.initialize_api(token) | |
return self.github_api.get_issues(owner, repo) | |
except Exception as e: | |
self.logger.error(f"Error fetching issues for repository {owner}/{repo}: {str(e)}") | |
return [] | |
def resolve_issue(self, token: str, owner: str, repo: str, issue_number: int, resolution: str, forked_repo: str) -> str: | |
try: | |
self.initialize_api(token) | |
self.github_api.get_repository(owner, repo) | |
# Create resolution file | |
timestamp = datetime.now().strftime("%Y%m%d_%H%M%S") | |
resolution_file = f"{RESOLUTIONS_DIRECTORY}/resolution_{issue_number}_{timestamp}.md" | |
with open(resolution_file, "w") as f: | |
f.write(f"# Resolution for Issue #{issue_number}\n\n{resolution}") | |
# Validate forked_repo before cloning | |
if not forked_repo: | |
raise ValueError("Forked repository URL cannot be empty.") | |
# Clone the forked repo | |
subprocess.run(['git', 'clone', forked_repo, '/tmp/' + forked_repo.split('/')[-1]], check=True) | |
# Change to the cloned directory | |
os.chdir('/tmp/' + forked_repo.split('/')[-1]) | |
# Assuming manual intervention now | |
input("Apply the fix manually and stage the changes (press ENTER)? ") | |
# Commit and push the modifications | |
subprocess.run(['git', 'add', '.'], check=True) | |
subprocess.run(['git', 'commit', '-m', f"Resolved issue #{issue_number} ({quote(resolution)})"], check=True) | |
subprocess.run(['git', 'push', 'origin', 'HEAD'], check=True) | |
# Open Pull Request page | |
webbrowser.open(f'https://github.com/{forked_repo.split("/")[-1]}/compare/master...{owner}:{forked_repo.split("/")[-1]}_resolved_issue_{issue_number}') | |
return f"Resolution saved: {resolution_file}" | |
except Exception as e: | |
error_msg = f"Error resolving issue #{issue_number} in repository {owner}/{repo}: {str(e)}" | |
self.logger.error(error_msg) | |
return error_msg | |
def suggest_automated_fixes(self, issue_title: str) -> str: | |
if "missing README" in issue_title.lower(): | |
return "Consider adding a README.md file to provide project documentation." | |
return "No automated fix available for this issue." | |
def handle_issue_selection(token, owner, repo, issue_number, resolution, forked_repo): | |
bot = GitHubBot(logger) | |
result = bot.resolve_issue(token, owner, repo, issue_number, resolution, forked_repo) | |
return result | |
def extract_info_from_url(url: str) -> Dict[str, Any]: | |
info = {} | |
try: | |
response = requests.get(url) | |
response.raise_for_status() | |
info['status_code'] = response.status_code | |
info['headers'] = dict(response.headers) | |
info['content'] = response.text[:500] # Limit content to first 500 characters for brevity | |
parsed_url = urlparse(url) | |
if 'github.com' in parsed_url.netloc: | |
parts = parsed_url.path.split('/') | |
if len(parts) > 2: | |
owner = parts[1] | |
repo = parts[2] | |
issues = bot.fetch_issues(github_token, owner, repo) | |
info['issues'] = issues | |
elif 'huggingface.co' in parsed_url.netloc: | |
# Add Hugging Face specific handling if needed | |
pass | |
except requests.HTTPError as e: | |
info['error'] = f"HTTP error: {str(e)}" | |
except Exception as e: | |
info['error'] = f"Error: {str(e)}" | |
return info | |
# Initialize GitHubBot globally | |
logger = initialize_logger() # Initialize logger before creating the bot | |
bot = GitHubBot(logger) | |
# Define missing functions with validation | |
def fetch_issues(token, repo_url): | |
try: | |
parts = repo_url.split('/') | |
if len(parts) < 2: | |
raise ValueError("Repository URL is not in the correct format. Expected format: 'owner/repo'.") | |
owner, repo = parts[-2], parts[-1] | |
issues = bot.fetch_issues(token, owner, repo) | |
return issues | |
except Exception as e: | |
return str(e) | |
def resolve_issue(token, repo_url, issue_number, resolution, forked_repo_url): | |
try: | |
parts = repo_url.split('/') | |
if len(parts) < 2: | |
raise ValueError("Repository URL is not in the correct format. Expected format: 'owner/repo'.") | |
owner, repo = parts[-2], parts[-1] | |
result = bot.resolve_issue(token, owner, repo, issue_number, resolution, forked_repo_url) | |
return result | |
except Exception as e: | |
return str(e) | |
def extract_info(url): | |
try: | |
info = extract_info_from_url(url) | |
return info | |
except Exception as e: | |
return str(e) | |
def create_gradio_interface(): | |
with gr.Blocks() as demo: | |
gr.Markdown("# GitHub Issue Resolver") | |
gr.Markdown("This application allows you to fetch and resolve GitHub issues efficiently.") | |
with gr.Row(): | |
token_input = gr.Textbox(label="GitHub Token", placeholder="Enter your GitHub token") | |
repo_url_input = gr.Textbox(label="Repository URL", placeholder="Enter the repository URL (owner/repo)") | |
with gr.Row(): | |
issue_number_input = gr.Number(label="Issue Number", info="Enter the issue number") | |
resolution_input = gr.Textbox(label="Resolution", placeholder="Describe the resolution for the issue", lines=4) | |
forked_repo_input = gr.Textbox(label="Forked Repository URL", placeholder="Enter the forked repository URL") | |
submit_button = gr.Button("Resolve Issue") | |
result_output = gr.Textbox(label="Result", interactive=False) | |
def on_submit(token, repo_url, issue_number, resolution, forked_repo): | |
issues = fetch_issues(token, repo_url) | |
if issues: | |
automated_fix = bot.suggest_automated_fixes(issues[0]['title']) | |
resolution += f"\n\n**Automated Suggestion:** {automated_fix}" | |
result = resolve_issue(token, repo_url, issue_number, resolution, forked_repo) | |
return result | |
return "No issues found or an error occurred." | |
submit_button.click(on_submit, inputs=[token_input, repo_url_input, issue_number_input, resolution_input, forked_repo_input], outputs=result_output) | |
return demo | |
# Cleanup function | |
def cleanup(): | |
try: | |
temp_dirs = [REPOS_DIRECTORY] | |
for dir_name in temp_dirs: | |
if os.path.exists(dir_name): | |
shutil.rmtree(dir_name) | |
logging.shutdown() | |
except Exception as e: | |
print(f"Error during cleanup: {str(e)}") | |
def main(): | |
# Initialize environment and logger | |
initialize_environment() | |
global logger | |
logger = initialize_logger() | |
# Register cleanup handlers | |
atexit.register(cleanup) | |
# Create Gradio interface | |
demo = create_gradio_interface() | |
demo.launch() | |
if __name__ == "__main__": | |
main() |