acecalisto3 commited on
Commit
b8d31e7
·
verified ·
1 Parent(s): c44af8d

Update app.py

Browse files
Files changed (1) hide show
  1. app.py +164 -206
app.py CHANGED
@@ -1,215 +1,173 @@
1
  import gradio as gr
2
- import langchain
3
- import huggingface_hub
4
- import dotenv
5
- import PyYAML
6
- from typing import Optional, Union, List, Dict, Any, Tuple
7
- import subprocess
8
- from pathlib import Path
9
- import json
10
- import tempfile
11
- from datetime import datetime, timezone
12
- import re
13
  import requests
 
 
 
 
 
 
 
14
  import logging
 
15
  import shutil
16
 
 
 
17
  # Configure logging
18
  logging.basicConfig(level=logging.INFO, format="%(asctime)s - %(levelname)s - %(message)s")
19
  logger = logging.getLogger(__name__)
20
 
21
- class TerminalCommand:
22
- @staticmethod
23
- def execute(command: Union[str, List[str]], cwd: Optional[str] = None) -> Tuple[str, str, int]:
24
- """
25
- Execute a terminal command and return stdout, stderr, and return code
26
- """
27
- if isinstance(command, str):
28
- command = command.split()
29
-
30
- try:
31
- process = subprocess.Popen(
32
- command,
33
- stdout=subprocess.PIPE,
34
- stderr=subprocess.PIPE,
35
- cwd=cwd,
36
- text=True
37
- )
38
- stdout, stderr = process.communicate()
39
- return stdout.strip(), stderr.strip(), process.returncode
40
- except Exception as e:
41
- logger.error(f"Error executing command {command}: {e}")
42
- return "", str(e), 1
43
-
44
- class GitUtilities:
45
- def __init__(self, repo_path: str):
46
- self.repo_path = Path(repo_path)
47
-
48
- def clone(self, url: str, branch: str = "main") -> bool:
49
- """Clone a repository"""
50
- stdout, stderr, code = TerminalCommand.execute(
51
- f"git clone -b {branch} {url} {self.repo_path}"
52
- )
53
- if code != 0:
54
- logger.error(f"Git clone failed: {stderr}")
55
- return code == 0
56
-
57
- def commit(self, message: str) -> bool:
58
- """Create a commit with the given message"""
59
- stdout, stderr, code = TerminalCommand.execute(
60
- ["git", "commit", "-m", message],
61
- str(self.repo_path)
62
- )
63
- if code != 0:
64
- logger.error(f"Git commit failed: {stderr}")
65
- return code == 0
66
-
67
- def push(self, remote: str = "origin", branch: str = "main") -> bool:
68
- """Push changes to remote"""
69
- stdout, stderr, code = TerminalCommand.execute(
70
- ["git", "push", remote, branch],
71
- str(self.repo_path)
72
- )
73
- if code != 0:
74
- logger.error(f"Git push failed: {stderr}")
75
- return code == 0
76
-
77
- def create_branch(self, branch_name: str) -> bool:
78
- """Create and checkout a new branch"""
79
- stdout, stderr, code = TerminalCommand.execute(
80
- ["git", "checkout", "-b", branch_name],
81
- str(self.repo_path)
82
- )
83
- if code != 0:
84
- logger.error(f"Git branch creation failed: {stderr}")
85
- return code == 0
86
-
87
- class GitHubBot:
88
- def __init__(self, logger: logging.Logger):
89
- self.github_api = None
90
- self.logger = logger
91
- self.ai_provider = None
92
- self.git = None
93
- self.temp_dir = None
94
- self.base_url = "https://api.github.com"
95
-
96
- def initialize_api(self, token: str):
97
- """Initialize the GitHub API with a token."""
98
- if not token:
99
- raise ValueError("GitHub token is required.")
100
- self.github_api = {"Authorization": f"Bearer {token}"}
101
- self.temp_dir = tempfile.mkdtemp()
102
- self.git = GitUtilities(self.temp_dir)
103
-
104
- def create_pull_request(self, owner: str, repo: str, title: str, body: str, head: str, base: str = "main") -> Dict:
105
- """Create a pull request."""
106
- url = f"{self.base_url}/repos/{owner}/{repo}/pulls"
107
- data = {
108
- "title": title,
109
- "body": body,
110
- "head": head,
111
- "base": base
112
- }
113
- try:
114
- response = requests.post(url, headers=self.github_api, json=data)
115
- response.raise_for_status()
116
- return response.json()
117
- except requests.RequestException as e:
118
- logger.error(f"Error creating pull request: {e}")
119
- raise
120
-
121
- def resolve_issue(self, token: str, owner: str, repo: str, issue_number: int, resolution: str, forked_repo: str) -> str:
122
- """Resolve a GitHub issue."""
123
- try:
124
- self.initialize_api(token)
125
- branch_name = f"fix/issue-{issue_number}-{datetime.now().strftime('%Y%m%d-%H%M%S')}"
126
-
127
- # Clone repository
128
- if not self.git.clone(forked_repo):
129
- raise Exception("Failed to clone repository")
130
-
131
- # Create a new branch
132
- if not self.git.create_branch(branch_name):
133
- raise Exception("Failed to create branch")
134
-
135
- # Generate resolution content
136
- resolution_content = self._create_resolution_document(issue_number, resolution)
137
-
138
- # Save resolution file
139
- resolution_path = Path(self.temp_dir) / f"resolution_{issue_number}.md"
140
- with open(resolution_path, "w") as f:
141
- f.write(resolution_content)
142
-
143
- # Commit and push changes
144
- if not self.git.commit(f"Fix for issue #{issue_number}"):
145
- raise Exception("Failed to commit changes")
146
- if not self.git.push("origin", branch_name):
147
- raise Exception("Failed to push changes")
148
-
149
- # Create a pull request
150
- pr = self.create_pull_request(
151
- owner=owner,
152
- repo=repo,
153
- title=f"Fix for issue #{issue_number}",
154
- body="This PR resolves the reported issue with the following resolution.",
155
- head=branch_name
156
- )
157
-
158
- return f"Pull request created: {pr['html_url']}"
159
- except Exception as e:
160
- logger.error(f"Error resolving issue #{issue_number}: {e}")
161
- return f"Error: {e}"
162
- finally:
163
- if self.temp_dir and os.path.exists(self.temp_dir):
164
- shutil.rmtree(self.temp_dir)
165
-
166
- def _create_resolution_document(self, issue_number: int, resolution: str) -> str:
167
- """Create a resolution document."""
168
- return f"""# Resolution for Issue #{issue_number}
169
- ## Resolution Details
170
- {resolution}
171
- ## Metadata
172
- - Date: {datetime.now(timezone.utc).isoformat()}
173
- - Resolved By: Automated System
174
- """
175
-
176
- def create_gradio_interface():
177
- """Create the Gradio interface."""
178
- bot = GitHubBot(logger)
179
-
180
- def on_resolve(token, repo_url, issue_number, resolution, forked_repo):
181
- try:
182
- parts = repo_url.strip("/").split("/")
183
- owner, repo = parts[-2], parts[-1]
184
- result = bot.resolve_issue(token, owner, repo, int(issue_number), resolution, forked_repo)
185
- return result
186
- except Exception as e:
187
- logger.error(f"Error in issue resolution: {e}")
188
- return f"Error: {e}"
189
-
190
- with gr.Blocks() as demo:
191
- gr.Markdown("# GitHub Issue Resolver")
192
- gr.Markdown("Resolve GitHub issues with AI assistance and Git integration.")
193
-
194
- with gr.Tab("Issue Resolution"):
195
- with gr.Row():
196
- token_input = gr.Textbox(label="GitHub Token", placeholder="Enter your GitHub token")
197
- repo_url_input = gr.Textbox(label="Repository URL", placeholder="Enter the repository URL")
198
- issue_number_input = gr.Number(label="Issue Number", precision=0)
199
- resolution_input = gr.Textbox(label="Resolution", placeholder="Describe the resolution for the issue")
200
- forked_repo_input = gr.Textbox(label="Forked Repo URL", placeholder="Enter the forked repository URL")
201
-
202
- resolve_button = gr.Button("Resolve Issue")
203
- result_output = gr.Textbox(label="Result", interactive=False)
204
-
205
- resolve_button.click(
206
- fn=on_resolve,
207
- inputs=[token_input, repo_url_input, issue_number_input, resolution_input, forked_repo_input],
208
- outputs=[result_output]
209
- )
210
-
211
- return demo
212
-
213
- if __name__ == "__main__":
214
- demo = create_gradio_interface()
215
- demo.launch(server_name="0.0.0.0", server_port=7860)
 
1
  import gradio as gr
2
+ import os
 
 
 
 
 
 
 
 
 
 
3
  import requests
4
+ import uuid
5
+ from pathlib import Path
6
+ from typing import Optional, Union, List, Tuple, Dict, Any
7
+ from pypdf import PdfReader
8
+ from bs4 import BeautifulSoup
9
+ import zipfile
10
+ import nltk
11
  import logging
12
+ import tempfile
13
  import shutil
14
 
15
+ nltk.download('punkt')
16
+
17
  # Configure logging
18
  logging.basicConfig(level=logging.INFO, format="%(asctime)s - %(levelname)s - %(message)s")
19
  logger = logging.getLogger(__name__)
20
 
21
+ # Utility to log messages
22
+ def log(message: str):
23
+ logger.info(message)
24
+
25
+ # File and Web Processing Utilities
26
+ def chunk_text(text: str, max_chunk_size: int) -> List[str]:
27
+ """Breaks large text into manageable chunks."""
28
+ tokenizer = nltk.data.load('tokenizers/punkt/english.pickle')
29
+ sentences = tokenizer.tokenize(text)
30
+ chunks = []
31
+ current_chunk = ""
32
+ for sentence in sentences:
33
+ if len(current_chunk) + len(sentence) + 1 > max_chunk_size:
34
+ chunks.append(current_chunk.strip())
35
+ current_chunk = ""
36
+ current_chunk += sentence + " "
37
+ if current_chunk:
38
+ chunks.append(current_chunk.strip())
39
+ return chunks
40
+
41
+ def read_pdf(file_path: str) -> str:
42
+ """Reads text content from a PDF file."""
43
+ try:
44
+ reader = PdfReader(file_path)
45
+ return "\n".join(page.extract_text() for page in reader.pages)
46
+ except Exception as e:
47
+ logger.error(f"Error reading PDF: {e}")
48
+ return ""
49
+
50
+ def read_txt(file_path: str) -> str:
51
+ """Reads content from a TXT file."""
52
+ try:
53
+ with open(file_path, "r", encoding="utf-8") as f:
54
+ return f.read()
55
+ except Exception as e:
56
+ logger.error(f"Error reading TXT file: {e}")
57
+ return ""
58
+
59
+ def read_zip(zip_path: str) -> str:
60
+ """Extracts and processes text and PDF files within a ZIP archive."""
61
+ extracted_data = []
62
+ try:
63
+ with zipfile.ZipFile(zip_path, 'r') as zip_ref:
64
+ for file_info in zip_ref.infolist():
65
+ if file_info.filename.endswith((".txt", ".pdf")):
66
+ with zip_ref.open(file_info) as file:
67
+ try:
68
+ if file_info.filename.endswith(".txt"):
69
+ extracted_data.append(file.read().decode("utf-8"))
70
+ elif file_info.filename.endswith(".pdf"):
71
+ temp_path = f"/tmp/{uuid.uuid4()}"
72
+ with open(temp_path, "wb") as temp_file:
73
+ temp_file.write(file.read())
74
+ extracted_data.append(read_pdf(temp_path))
75
+ os.remove(temp_path)
76
+ except Exception as e:
77
+ logger.error(f"Error processing file in ZIP: {e}")
78
+ return "\n".join(extracted_data)
79
+ except Exception as e:
80
+ logger.error(f"Error extracting ZIP: {e}")
81
+ return ""
82
+
83
+ def fetch_url(url: str, max_depth: int) -> str:
84
+ """Fetches and scrapes text content from a webpage."""
85
+ visited = set()
86
+ to_visit = [(url, 0)]
87
+ results = []
88
+ while to_visit:
89
+ current_url, depth = to_visit.pop(0)
90
+ if current_url in visited:
91
+ continue
92
+ visited.add(current_url)
93
+ if depth < max_depth:
94
+ try:
95
+ response = requests.get(current_url, timeout=10)
96
+ response.raise_for_status()
97
+ soup = BeautifulSoup(response.content, 'lxml')
98
+ results.append(soup.text)
99
+ for link in soup.find_all("a", href=True):
100
+ absolute_url = requests.compat.urljoin(current_url, link.get('href'))
101
+ if absolute_url.startswith("http") and absolute_url not in visited:
102
+ to_visit.append((absolute_url, depth + 1))
103
+ except Exception as e:
104
+ logger.error(f"Error fetching URL {current_url}: {e}")
105
+ return "\n".join(results)
106
+
107
+ # Main Workflow Processing
108
+ def process_workflow(command: str, issue_details: str, files: List[Path], url: str, token: str, max_depth: int) -> Dict[str, Any]:
109
+ """Processes user input and performs selected command."""
110
+ datasets = []
111
+ errors = []
112
+ try:
113
+ # Add issue details to dataset
114
+ if issue_details:
115
+ datasets.append(issue_details)
116
+
117
+ # Process uploaded files
118
+ if files:
119
+ for file in files:
120
+ if file.name.endswith(".pdf"):
121
+ datasets.append(read_pdf(file.name))
122
+ elif file.name.endswith(".txt"):
123
+ datasets.append(read_txt(file.name))
124
+ elif file.name.endswith(".zip"):
125
+ datasets.append(read_zip(file.name))
126
+
127
+ # Fetch URL content
128
+ if url:
129
+ datasets.append(fetch_url(url, max_depth=max_depth))
130
+
131
+ # Execute commands
132
+ if command == "Analyze Issue":
133
+ analysis = chunk_text("\n".join(datasets), 8192)
134
+ return {"analysis": analysis}
135
+ elif command == "Propose Resolution":
136
+ resolution = f"Proposed resolution based on:\n\n{'\n'.join(datasets)}"
137
+ return {"resolution": resolution}
138
+ elif command == "Generate PR":
139
+ combined_data = "\n".join(datasets)
140
+ return {"pr_content": combined_data, "message": "Pull request content generated."}
141
+ else:
142
+ return {"error": "Invalid command"}
143
+ except Exception as e:
144
+ errors.append(str(e))
145
+ return {"error": "\n".join(errors)}
146
+
147
+ # Gradio Interface
148
+ with gr.Blocks() as demo:
149
+ gr.Markdown("# GitHub Issue Resolver - Advanced Edition")
150
+ gr.Markdown("Analyze issues, propose resolutions, and generate PRs.")
151
+
152
+ # Input Fields
153
+ with gr.Row():
154
+ command = gr.Dropdown(["Analyze Issue", "Propose Resolution", "Generate PR"], label="Command")
155
+ issue_details = gr.Textbox(label="Issue Details", lines=4, placeholder="Describe the issue or paste details.")
156
+ files = gr.Files(label="Upload Files", file_types=[".pdf", ".txt", ".zip"])
157
+ url = gr.Textbox(label="Documentation URL", placeholder="Enter related documentation URL.")
158
+ token = gr.Textbox(label="GitHub Token", type="password", placeholder="Enter your GitHub token securely.")
159
+ max_depth = gr.Slider(label="Web Crawl Depth", minimum=1, maximum=10, value=3, step=1)
160
+
161
+ # Outputs
162
+ result_output = gr.JSON(label="Results")
163
+ process_button = gr.Button("Process")
164
+
165
+ # Button Logic
166
+ process_button.click(
167
+ process_workflow,
168
+ inputs=[command, issue_details, files, url, token, max_depth],
169
+ outputs=[result_output]
170
+ )
171
+
172
+ # Launch Application
173
+ demo.launch()