Spaces:
Runtime error
Runtime error
import sys | |
import shutil | |
import logging | |
import time | |
import os | |
from datetime import datetime | |
from typing import List, Dict, Any | |
import requests | |
import gradio as gr | |
import atexit | |
import subprocess | |
from urllib.parse import urlparse, quote | |
import webbrowser | |
# Constants
# Working directories, given as paths relative to the current working directory.
# All five are created by initialize_environment().
INPUT_DIRECTORY = "input"
OUTPUT_DIRECTORY = "output"
# Destination of the timestamped log files opened by initialize_logger().
LOGS_DIRECTORY = "logs"
# Destination of the Markdown resolution notes written by GitHubBot.resolve_issue().
RESOLUTIONS_DIRECTORY = "resolutions"
REPOS_DIRECTORY = "repos"
# Logger setup
def initialize_logger() -> logging.Logger:
    """Configure logging to a timestamped file and the console.

    Creates LOGS_DIRECTORY if it is missing, then calls
    ``logging.basicConfig`` with an INFO threshold and two handlers: a
    ``FileHandler`` on ``<LOGS_DIRECTORY>/github_bot_<YYYYmmdd_HHMMSS>.log``
    and a default ``StreamHandler``.

    Returns:
        The logger registered under the name "GitHubBot".
    """
    os.makedirs(LOGS_DIRECTORY, exist_ok=True)

    # One log file per process start, named after the start time.
    stamp = datetime.now().strftime('%Y%m%d_%H%M%S')
    file_sink = logging.FileHandler(f"{LOGS_DIRECTORY}/github_bot_{stamp}.log")
    console_sink = logging.StreamHandler()

    logging.basicConfig(
        level=logging.INFO,
        format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
        handlers=[file_sink, console_sink],
    )
    return logging.getLogger("GitHubBot")


# Global logger instance, created at import time and shared by the whole module.
logger = initialize_logger()
# Ensure required directories exist
def initialize_environment():
    """Create each working directory used by the script, if it is missing.

    Existing directories are left as they are (``exist_ok=True``).
    """
    for path in (
        LOGS_DIRECTORY,
        RESOLUTIONS_DIRECTORY,
        REPOS_DIRECTORY,
        INPUT_DIRECTORY,
        OUTPUT_DIRECTORY,
    ):
        os.makedirs(path, exist_ok=True)
# GitHub API interaction
class GitHubAPI:
    """Small client for the GitHub REST API endpoints the bot needs.

    Every request is sent with a token ``Authorization`` header and the
    ``application/vnd.github.v3+json`` media type.
    """

    # Seconds allowed for any single HTTP call. Without a timeout, `requests`
    # waits indefinitely on a stalled connection.
    REQUEST_TIMEOUT = 30
    # When fewer core requests than this remain, pause until the quota resets.
    RATE_LIMIT_FLOOR = 10
    # Page size requested from the issues endpoint (GitHub caps it at 100).
    ISSUES_PER_PAGE = 100

    def __init__(self, token: str):
        """Store the token and prepare the headers shared by all requests.

        Args:
            token: GitHub personal access token.
        """
        self.token = token
        self.headers = {
            'Authorization': f'token {token}',
            'Accept': 'application/vnd.github.v3+json',
        }
        self.base_url = "https://api.github.com"

    @staticmethod
    def _seconds_until_reset(remaining: int, reset_time: int, now: float) -> int:
        """Return how long to pause before the next call, in whole seconds.

        Args:
            remaining: Requests left in the current rate-limit window.
            reset_time: Epoch second at which the window resets.
            now: Current epoch time.

        Returns:
            0 when enough requests remain or the reset moment has already
            passed; otherwise the number of seconds until the reset.
        """
        if remaining >= GitHubAPI.RATE_LIMIT_FLOOR:
            return 0
        return max(0, int(reset_time) - int(now))

    def _check_rate_limit(self) -> bool:
        """Check the core rate limit and sleep until it resets if nearly used up.

        Returns:
            True once it is fine to issue the next request. The check is
            best-effort: if the rate-limit endpoint cannot be queried or its
            payload has an unexpected shape, the problem is logged and True
            is returned so the real request is still attempted.
        """
        try:
            response = requests.get(
                f"{self.base_url}/rate_limit",
                headers=self.headers,
                timeout=self.REQUEST_TIMEOUT,
            )
            response.raise_for_status()
            core = response.json()['resources']['core']
            wait_time = self._seconds_until_reset(
                core['remaining'], core['reset'], time.time()
            )
        except requests.RequestException as e:
            logger.error(f"Error checking rate limit: {str(e)}")
            return True
        except (KeyError, TypeError, ValueError) as e:
            logger.error(f"Error checking rate limit: {str(e)}")
            return True

        if wait_time > 0:
            logger.warning(f"Rate limit nearly exceeded. Waiting {wait_time} seconds...")
            time.sleep(wait_time)
        # The quota has reset by now (or was never low), so the caller may
        # proceed. Previously this path returned False after sleeping, which
        # made callers give up right after waiting out the whole window.
        return True

    def get_repository(self, owner: str, repo: str) -> Dict:
        """Fetch repository details.

        Args:
            owner: Account or organisation that owns the repository.
            repo: Repository name.

        Returns:
            The decoded JSON document describing the repository.

        Raises:
            requests.HTTPError: On a non-2xx response (logged, then re-raised).
            Exception: Any other failure is logged and re-raised unchanged.
        """
        try:
            response = requests.get(
                f"{self.base_url}/repos/{owner}/{repo}",
                headers=self.headers,
                timeout=self.REQUEST_TIMEOUT,
            )
            response.raise_for_status()
            return response.json()
        except requests.HTTPError as e:
            logger.error(f"HTTP error fetching repo {owner}/{repo}: {str(e)}")
            raise
        except Exception as e:
            logger.error(f"Error fetching repo: {str(e)}")
            raise

    def get_issues(self, owner: str, repo: str, state: str = 'open') -> List[Dict]:
        """Fetch all issues of a repository, excluding pull requests.

        Follows the ``Link: rel="next"`` pagination header, so repositories
        with more issues than fit on one page are returned in full.

        Args:
            owner: Account or organisation that owns the repository.
            repo: Repository name.
            state: Issue state filter passed to the API (default ``'open'``).

        Returns:
            A list of issue dicts. Entries carrying a ``'pull_request'`` key
            are dropped. An empty list is returned if any request fails; the
            failure is logged.
        """
        if not self._check_rate_limit():
            return []

        url = f"{self.base_url}/repos/{owner}/{repo}/issues"
        params = {'state': state, 'per_page': self.ISSUES_PER_PAGE}
        issues = []
        try:
            while url:
                response = requests.get(
                    url,
                    headers=self.headers,
                    params=params,
                    timeout=self.REQUEST_TIMEOUT,
                )
                response.raise_for_status()
                issues.extend(
                    issue for issue in response.json() if 'pull_request' not in issue
                )
                # The "next" link already contains the full query string, so
                # the params must not be sent again with it.
                url = response.links.get('next', {}).get('url')
                params = None
        except requests.RequestException as e:
            logger.error(f"Error fetching issues for {owner}/{repo}: {str(e)}")
            return []
        return issues
# GitHub Bot main functionality
class GitHubBot:
    """High-level operations of the bot, built on top of ``GitHubAPI``."""

    def __init__(self):
        # Created lazily by initialize_api() once a token is known.
        self.github_api = None

    def initialize_api(self, token: str):
        """Initializes the GitHub API client with the given token."""
        self.github_api = GitHubAPI(token)

    def fetch_issues(self, token: str, owner: str, repo: str) -> List[Dict]:
        """Fetches issues from a repository.

        Args:
            token: GitHub personal access token.
            owner: Repository owner.
            repo: Repository name.

        Returns:
            Whatever ``GitHubAPI.get_issues`` returns for ``owner/repo``.
        """
        self.initialize_api(token)
        return self.github_api.get_issues(owner, repo)

    @staticmethod
    def _split_fork_url(forked_repo: str) -> tuple:
        """Split a clone URL into ``(owner, repository_name)``.

        Handles HTTPS URLs, scp-style SSH URLs (``git@host:owner/name``), a
        trailing slash and a trailing ``.git``. A missing part is returned as
        an empty string.
        """
        trimmed = forked_repo.strip().rstrip('/')
        if trimmed.endswith('.git'):
            trimmed = trimmed[:-len('.git')]
        # Treat ':' like '/' so "git@github.com:owner/name" splits the same
        # way as "https://github.com/owner/name".
        segments = [part for part in trimmed.replace(':', '/').split('/') if part]
        name = segments[-1] if segments else ''
        owner = segments[-2] if len(segments) >= 2 else ''
        return owner, name

    def resolve_issue(self, token: str, owner: str, repo: str, issue_number: int, resolution: str, forked_repo: str) -> str:
        """Record a resolution for an issue and push the fix to a fork.

        Steps: verify the upstream repository exists, write the resolution
        text to a Markdown file under RESOLUTIONS_DIRECTORY, clone the fork
        into a fresh temporary directory, wait for the operator to apply the
        fix by hand, commit and push, then open the compare page for a pull
        request in the web browser.

        Args:
            token: GitHub personal access token.
            owner: Owner of the upstream repository.
            repo: Name of the upstream repository.
            issue_number: Number of the issue being resolved.
            resolution: Free-text description of the resolution; also used
                in the commit message.
            forked_repo: Clone URL of the fork that receives the commit.

        Returns:
            A success message naming the resolution file, or a message
            starting with "Error resolving issue:" if any step fails. No
            exception escapes this method.
        """
        # Local import: tempfile is not in the module's import block.
        import tempfile

        clone_dir = None
        try:
            self.initialize_api(token)
            repo_info = self.github_api.get_repository(owner, repo)

            timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
            os.makedirs(RESOLUTIONS_DIRECTORY, exist_ok=True)
            resolution_file = f"{RESOLUTIONS_DIRECTORY}/resolution_{issue_number}_{timestamp}.md"
            # Explicit UTF-8: the text is user-supplied and may hold
            # characters the platform default encoding cannot represent.
            with open(resolution_file, "w", encoding="utf-8") as f:
                f.write(f"# Resolution for Issue #{issue_number}\n\n{resolution}")

            fork_owner, fork_name = self._split_fork_url(forked_repo)
            # A unique directory per run: cloning into a fixed /tmp/<name>
            # fails as soon as a previous run has left that directory behind.
            workspace = tempfile.mkdtemp(prefix="github_bot_")
            clone_dir = os.path.join(workspace, fork_name or "repo")
            subprocess.run(['git', 'clone', forked_repo, clone_dir], check=True)

            # Git runs with cwd=clone_dir instead of os.chdir(): changing the
            # process-wide working directory would silently redirect every
            # later relative path (logs/, resolutions/, ...).
            input(f"Apply the fix manually in {clone_dir}, then press ENTER to continue...")
            subprocess.run(['git', 'add', '.'], check=True, cwd=clone_dir)
            subprocess.run(
                ['git', 'commit', '-m', f"Resolved issue #{issue_number}: {resolution}"],
                check=True,
                cwd=clone_dir,
            )
            subprocess.run(['git', 'push', 'origin', 'HEAD'], check=True, cwd=clone_dir)

            # Build the compare URL from the branch that was actually pushed
            # and the upstream default branch, in the form
            # <owner>/<repo>/compare/<base>...<fork_owner>:<head>.
            head_branch = subprocess.run(
                ['git', 'rev-parse', '--abbrev-ref', 'HEAD'],
                check=True,
                cwd=clone_dir,
                capture_output=True,
                text=True,
            ).stdout.strip()
            base_branch = repo_info.get('default_branch') or 'master'
            head = f"{fork_owner}:{head_branch}" if fork_owner else head_branch
            pull_request_url = f"https://github.com/{owner}/{repo}/compare/{base_branch}...{head}"
            webbrowser.open(pull_request_url)

            # Everything is pushed, so the working copy can go. On failure it
            # is deliberately kept so manual edits are not lost.
            shutil.rmtree(workspace, ignore_errors=True)
            return f"Resolution completed and saved: {resolution_file}"
        except Exception as e:
            logger.error(f"Error resolving issue #{issue_number} for {owner}/{repo}: {str(e)}")
            if clone_dir is not None:
                logger.error(f"Working copy left in place at {clone_dir}")
            return f"Error resolving issue: {str(e)}"
# Utility Functions
def extract_info_from_url(url: str, github_token: str = "") -> Dict[str, Any]:
    """Fetch a URL and summarise the response; add issues for GitHub repos.

    Args:
        url: Address to fetch.
        github_token: Personal access token used when ``url`` points at a
            GitHub repository. When empty, a module-level ``github_token``
            (the name this function used to reference) is tried, then the
            ``GITHUB_TOKEN`` environment variable.

    Returns:
        A dict with ``'status_code'``, ``'headers'`` and ``'content'`` (the
        first 500 characters of the body) on success. For
        ``github.com/<owner>/<repo>...`` URLs an ``'issues'`` list is added;
        it is empty when no token is available. If the request fails, the
        dict holds a single ``'error'`` message instead.
    """
    info = {}
    try:
        # Bounded wait: without a timeout a stalled server blocks forever.
        response = requests.get(url, timeout=30)
        response.raise_for_status()
    except requests.RequestException as e:
        info['error'] = f"Error extracting info: {str(e)}"
        return info

    info['status_code'] = response.status_code
    info['headers'] = dict(response.headers)
    info['content'] = response.text[:500]

    parsed_url = urlparse(url)
    # Compare the exact host name: a substring test on netloc would also
    # accept hosts such as "notgithub.com" or "github.com.example.org".
    host = (parsed_url.hostname or '').lower()
    if host in ('github.com', 'www.github.com'):
        parts = [part for part in parsed_url.path.strip('/').split('/') if part]
        if len(parts) >= 2:
            owner, repo = parts[:2]
            token = (
                github_token
                or globals().get('github_token')
                or os.environ.get('GITHUB_TOKEN', '')
            )
            if token:
                info['issues'] = GitHubBot().fetch_issues(token, owner, repo)
            else:
                logger.warning(
                    f"No GitHub token available; skipping issue lookup for {owner}/{repo}"
                )
                info['issues'] = []
    return info
# Initialize environment: create the working directories at import time.
initialize_environment()

# Example usage (placeholder for main logic)
if __name__ == "__main__":
    # Local import: getpass is only needed for this interactive entry point.
    import getpass

    print("GitHub Bot initialized. Add main logic as needed.")

    # Sample interaction for testing purposes.
    # The token is a secret, so it is read without echo rather than with
    # input(), which would leave it visible in the terminal scrollback.
    token = getpass.getpass("Enter your GitHub Personal Access Token: ").strip()
    owner = input("Enter the repository owner: ").strip()
    repo = input("Enter the repository name: ").strip()

    bot = GitHubBot()
    issues = bot.fetch_issues(token, owner, repo)
    if issues:
        print(f"Found {len(issues)} issue(s):")
        for issue in issues:
            print(f"- #{issue['number']}: {issue['title']}")
    else:
        print("No issues found or unable to fetch issues.")