acecalisto3 commited on
Commit
1265bd3
·
verified ·
1 Parent(s): b8d31e7

Update app.py

Browse files
Files changed (1) hide show
  1. app.py +206 -164
app.py CHANGED
@@ -1,173 +1,215 @@
1
  import gradio as gr
2
- import os
3
- import requests
4
- import uuid
 
 
 
5
  from pathlib import Path
6
- from typing import Optional, Union, List, Tuple, Dict, Any
7
- from pypdf import PdfReader
8
- from bs4 import BeautifulSoup
9
- import zipfile
10
- import nltk
11
- import logging
12
  import tempfile
 
 
 
 
13
  import shutil
14
 
15
- nltk.download('punkt')
16
-
17
  # Configure logging
18
  logging.basicConfig(level=logging.INFO, format="%(asctime)s - %(levelname)s - %(message)s")
19
  logger = logging.getLogger(__name__)
20
 
21
- # Utility to log messages
22
- def log(message: str):
23
- logger.info(message)
24
-
25
- # File and Web Processing Utilities
26
- def chunk_text(text: str, max_chunk_size: int) -> List[str]:
27
- """Breaks large text into manageable chunks."""
28
- tokenizer = nltk.data.load('tokenizers/punkt/english.pickle')
29
- sentences = tokenizer.tokenize(text)
30
- chunks = []
31
- current_chunk = ""
32
- for sentence in sentences:
33
- if len(current_chunk) + len(sentence) + 1 > max_chunk_size:
34
- chunks.append(current_chunk.strip())
35
- current_chunk = ""
36
- current_chunk += sentence + " "
37
- if current_chunk:
38
- chunks.append(current_chunk.strip())
39
- return chunks
40
-
41
- def read_pdf(file_path: str) -> str:
42
- """Reads text content from a PDF file."""
43
- try:
44
- reader = PdfReader(file_path)
45
- return "\n".join(page.extract_text() for page in reader.pages)
46
- except Exception as e:
47
- logger.error(f"Error reading PDF: {e}")
48
- return ""
49
-
50
- def read_txt(file_path: str) -> str:
51
- """Reads content from a TXT file."""
52
- try:
53
- with open(file_path, "r", encoding="utf-8") as f:
54
- return f.read()
55
- except Exception as e:
56
- logger.error(f"Error reading TXT file: {e}")
57
- return ""
58
-
59
- def read_zip(zip_path: str) -> str:
60
- """Extracts and processes text and PDF files within a ZIP archive."""
61
- extracted_data = []
62
- try:
63
- with zipfile.ZipFile(zip_path, 'r') as zip_ref:
64
- for file_info in zip_ref.infolist():
65
- if file_info.filename.endswith((".txt", ".pdf")):
66
- with zip_ref.open(file_info) as file:
67
- try:
68
- if file_info.filename.endswith(".txt"):
69
- extracted_data.append(file.read().decode("utf-8"))
70
- elif file_info.filename.endswith(".pdf"):
71
- temp_path = f"/tmp/{uuid.uuid4()}"
72
- with open(temp_path, "wb") as temp_file:
73
- temp_file.write(file.read())
74
- extracted_data.append(read_pdf(temp_path))
75
- os.remove(temp_path)
76
- except Exception as e:
77
- logger.error(f"Error processing file in ZIP: {e}")
78
- return "\n".join(extracted_data)
79
- except Exception as e:
80
- logger.error(f"Error extracting ZIP: {e}")
81
- return ""
82
-
83
- def fetch_url(url: str, max_depth: int) -> str:
84
- """Fetches and scrapes text content from a webpage."""
85
- visited = set()
86
- to_visit = [(url, 0)]
87
- results = []
88
- while to_visit:
89
- current_url, depth = to_visit.pop(0)
90
- if current_url in visited:
91
- continue
92
- visited.add(current_url)
93
- if depth < max_depth:
94
- try:
95
- response = requests.get(current_url, timeout=10)
96
- response.raise_for_status()
97
- soup = BeautifulSoup(response.content, 'lxml')
98
- results.append(soup.text)
99
- for link in soup.find_all("a", href=True):
100
- absolute_url = requests.compat.urljoin(current_url, link.get('href'))
101
- if absolute_url.startswith("http") and absolute_url not in visited:
102
- to_visit.append((absolute_url, depth + 1))
103
- except Exception as e:
104
- logger.error(f"Error fetching URL {current_url}: {e}")
105
- return "\n".join(results)
106
-
107
- # Main Workflow Processing
108
- def process_workflow(command: str, issue_details: str, files: List[Path], url: str, token: str, max_depth: int) -> Dict[str, Any]:
109
- """Processes user input and performs selected command."""
110
- datasets = []
111
- errors = []
112
- try:
113
- # Add issue details to dataset
114
- if issue_details:
115
- datasets.append(issue_details)
116
-
117
- # Process uploaded files
118
- if files:
119
- for file in files:
120
- if file.name.endswith(".pdf"):
121
- datasets.append(read_pdf(file.name))
122
- elif file.name.endswith(".txt"):
123
- datasets.append(read_txt(file.name))
124
- elif file.name.endswith(".zip"):
125
- datasets.append(read_zip(file.name))
126
-
127
- # Fetch URL content
128
- if url:
129
- datasets.append(fetch_url(url, max_depth=max_depth))
130
-
131
- # Execute commands
132
- if command == "Analyze Issue":
133
- analysis = chunk_text("\n".join(datasets), 8192)
134
- return {"analysis": analysis}
135
- elif command == "Propose Resolution":
136
- resolution = f"Proposed resolution based on:\n\n{'\n'.join(datasets)}"
137
- return {"resolution": resolution}
138
- elif command == "Generate PR":
139
- combined_data = "\n".join(datasets)
140
- return {"pr_content": combined_data, "message": "Pull request content generated."}
141
- else:
142
- return {"error": "Invalid command"}
143
- except Exception as e:
144
- errors.append(str(e))
145
- return {"error": "\n".join(errors)}
146
-
147
- # Gradio Interface
148
- with gr.Blocks() as demo:
149
- gr.Markdown("# GitHub Issue Resolver - Advanced Edition")
150
- gr.Markdown("Analyze issues, propose resolutions, and generate PRs.")
151
-
152
- # Input Fields
153
- with gr.Row():
154
- command = gr.Dropdown(["Analyze Issue", "Propose Resolution", "Generate PR"], label="Command")
155
- issue_details = gr.Textbox(label="Issue Details", lines=4, placeholder="Describe the issue or paste details.")
156
- files = gr.Files(label="Upload Files", file_types=[".pdf", ".txt", ".zip"])
157
- url = gr.Textbox(label="Documentation URL", placeholder="Enter related documentation URL.")
158
- token = gr.Textbox(label="GitHub Token", type="password", placeholder="Enter your GitHub token securely.")
159
- max_depth = gr.Slider(label="Web Crawl Depth", minimum=1, maximum=10, value=3, step=1)
160
-
161
- # Outputs
162
- result_output = gr.JSON(label="Results")
163
- process_button = gr.Button("Process")
164
-
165
- # Button Logic
166
- process_button.click(
167
- process_workflow,
168
- inputs=[command, issue_details, files, url, token, max_depth],
169
- outputs=[result_output]
170
- )
171
-
172
- # Launch Application
173
- demo.launch()
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
  import gradio as gr
2
+ import langchain
3
+ import huggingface_hub
4
+ import dotenv
5
+ import yaml
6
+ from typing import Optional, Union, List, Dict, Any, Tuple
7
+ import subprocess
8
  from pathlib import Path
9
+ import json
 
 
 
 
 
10
  import tempfile
11
+ from datetime import datetime, timezone
12
+ import re
13
+ import requests
14
+ import logging
15
  import shutil
16
 
 
 
17
  # Configure logging
18
  logging.basicConfig(level=logging.INFO, format="%(asctime)s - %(levelname)s - %(message)s")
19
  logger = logging.getLogger(__name__)
20
 
21
+ class TerminalCommand:
22
+ @staticmethod
23
+ def execute(command: Union[str, List[str]], cwd: Optional[str] = None) -> Tuple[str, str, int]:
24
+ """
25
+ Execute a terminal command and return stdout, stderr, and return code
26
+ """
27
+ if isinstance(command, str):
28
+ command = command.split()
29
+
30
+ try:
31
+ process = subprocess.Popen(
32
+ command,
33
+ stdout=subprocess.PIPE,
34
+ stderr=subprocess.PIPE,
35
+ cwd=cwd,
36
+ text=True
37
+ )
38
+ stdout, stderr = process.communicate()
39
+ return stdout.strip(), stderr.strip(), process.returncode
40
+ except Exception as e:
41
+ logger.error(f"Error executing command {command}: {e}")
42
+ return "", str(e), 1
43
+
44
+ class GitUtilities:
45
+ def __init__(self, repo_path: str):
46
+ self.repo_path = Path(repo_path)
47
+
48
+ def clone(self, url: str, branch: str = "main") -> bool:
49
+ """Clone a repository"""
50
+ stdout, stderr, code = TerminalCommand.execute(
51
+ f"git clone -b {branch} {url} {self.repo_path}"
52
+ )
53
+ if code != 0:
54
+ logger.error(f"Git clone failed: {stderr}")
55
+ return code == 0
56
+
57
+ def commit(self, message: str) -> bool:
58
+ """Create a commit with the given message"""
59
+ stdout, stderr, code = TerminalCommand.execute(
60
+ ["git", "commit", "-m", message],
61
+ str(self.repo_path)
62
+ )
63
+ if code != 0:
64
+ logger.error(f"Git commit failed: {stderr}")
65
+ return code == 0
66
+
67
+ def push(self, remote: str = "origin", branch: str = "main") -> bool:
68
+ """Push changes to remote"""
69
+ stdout, stderr, code = TerminalCommand.execute(
70
+ ["git", "push", remote, branch],
71
+ str(self.repo_path)
72
+ )
73
+ if code != 0:
74
+ logger.error(f"Git push failed: {stderr}")
75
+ return code == 0
76
+
77
+ def create_branch(self, branch_name: str) -> bool:
78
+ """Create and checkout a new branch"""
79
+ stdout, stderr, code = TerminalCommand.execute(
80
+ ["git", "checkout", "-b", branch_name],
81
+ str(self.repo_path)
82
+ )
83
+ if code != 0:
84
+ logger.error(f"Git branch creation failed: {stderr}")
85
+ return code == 0
86
+
87
+ class GitHubBot:
88
+ def __init__(self, logger: logging.Logger):
89
+ self.github_api = None
90
+ self.logger = logger
91
+ self.ai_provider = None
92
+ self.git = None
93
+ self.temp_dir = None
94
+ self.base_url = "https://api.github.com"
95
+
96
+ def initialize_api(self, token: str):
97
+ """Initialize the GitHub API with a token."""
98
+ if not token:
99
+ raise ValueError("GitHub token is required.")
100
+ self.github_api = {"Authorization": f"Bearer {token}"}
101
+ self.temp_dir = tempfile.mkdtemp()
102
+ self.git = GitUtilities(self.temp_dir)
103
+
104
+ def create_pull_request(self, owner: str, repo: str, title: str, body: str, head: str, base: str = "main") -> Dict:
105
+ """Create a pull request."""
106
+ url = f"{self.base_url}/repos/{owner}/{repo}/pulls"
107
+ data = {
108
+ "title": title,
109
+ "body": body,
110
+ "head": head,
111
+ "base": base
112
+ }
113
+ try:
114
+ response = requests.post(url, headers=self.github_api, json=data)
115
+ response.raise_for_status()
116
+ return response.json()
117
+ except requests.RequestException as e:
118
+ logger.error(f"Error creating pull request: {e}")
119
+ raise
120
+
121
+ def resolve_issue(self, token: str, owner: str, repo: str, issue_number: int, resolution: str, forked_repo: str) -> str:
122
+ """Resolve a GitHub issue."""
123
+ try:
124
+ self.initialize_api(token)
125
+ branch_name = f"fix/issue-{issue_number}-{datetime.now().strftime('%Y%m%d-%H%M%S')}"
126
+
127
+ # Clone repository
128
+ if not self.git.clone(forked_repo):
129
+ raise Exception("Failed to clone repository")
130
+
131
+ # Create a new branch
132
+ if not self.git.create_branch(branch_name):
133
+ raise Exception("Failed to create branch")
134
+
135
+ # Generate resolution content
136
+ resolution_content = self._create_resolution_document(issue_number, resolution)
137
+
138
+ # Save resolution file
139
+ resolution_path = Path(self.temp_dir) / f"resolution_{issue_number}.md"
140
+ with open(resolution_path, "w") as f:
141
+ f.write(resolution_content)
142
+
143
+ # Commit and push changes
144
+ if not self.git.commit(f"Fix for issue #{issue_number}"):
145
+ raise Exception("Failed to commit changes")
146
+ if not self.git.push("origin", branch_name):
147
+ raise Exception("Failed to push changes")
148
+
149
+ # Create a pull request
150
+ pr = self.create_pull_request(
151
+ owner=owner,
152
+ repo=repo,
153
+ title=f"Fix for issue #{issue_number}",
154
+ body="This PR resolves the reported issue with the following resolution.",
155
+ head=branch_name
156
+ )
157
+
158
+ return f"Pull request created: {pr['html_url']}"
159
+ except Exception as e:
160
+ logger.error(f"Error resolving issue #{issue_number}: {e}")
161
+ return f"Error: {e}"
162
+ finally:
163
+ if self.temp_dir and os.path.exists(self.temp_dir):
164
+ shutil.rmtree(self.temp_dir)
165
+
166
+ def _create_resolution_document(self, issue_number: int, resolution: str) -> str:
167
+ """Create a resolution document."""
168
+ return f"""# Resolution for Issue #{issue_number}
169
+ ## Resolution Details
170
+ {resolution}
171
+ ## Metadata
172
+ - Date: {datetime.now(timezone.utc).isoformat()}
173
+ - Resolved By: Automated System
174
+ """
175
+
176
+ def create_gradio_interface():
177
+ """Create the Gradio interface."""
178
+ bot = GitHubBot(logger)
179
+
180
+ def on_resolve(token, repo_url, issue_number, resolution, forked_repo):
181
+ try:
182
+ parts = repo_url.strip("/").split("/")
183
+ owner, repo = parts[-2], parts[-1]
184
+ result = bot.resolve_issue(token, owner, repo, int(issue_number), resolution, forked_repo)
185
+ return result
186
+ except Exception as e:
187
+ logger.error(f"Error in issue resolution: {e}")
188
+ return f"Error: {e}"
189
+
190
+ with gr.Blocks() as demo:
191
+ gr.Markdown("# GitHub Issue Resolver")
192
+ gr.Markdown("Resolve GitHub issues with AI assistance and Git integration.")
193
+
194
+ with gr.Tab("Issue Resolution"):
195
+ with gr.Row():
196
+ token_input = gr.Textbox(label="GitHub Token", placeholder="Enter your GitHub token")
197
+ repo_url_input = gr.Textbox(label="Repository URL", placeholder="Enter the repository URL")
198
+ issue_number_input = gr.Number(label="Issue Number", precision=0)
199
+ resolution_input = gr.Textbox(label="Resolution", placeholder="Describe the resolution for the issue")
200
+ forked_repo_input = gr.Textbox(label="Forked Repo URL", placeholder="Enter the forked repository URL")
201
+
202
+ resolve_button = gr.Button("Resolve Issue")
203
+ result_output = gr.Textbox(label="Result", interactive=False)
204
+
205
+ resolve_button.click(
206
+ fn=on_resolve,
207
+ inputs=[token_input, repo_url_input, issue_number_input, resolution_input, forked_repo_input],
208
+ outputs=[result_output]
209
+ )
210
+
211
+ return demo
212
+
213
+ if __name__ == "__main__":
214
+ demo = create_gradio_interface()
215
+ demo.launch(server_name="0.0.0.0", server_port=7860)