import gradio as gr
import os
import aiohttp
import asyncio
from git import Repo, GitCommandError
from pathlib import Path
from datetime import datetime
import shutil
import json
import logging
import re
from typing import Dict, List, Optional, Tuple
import subprocess
import plotly.express as px
import plotly.graph_objects as go
from transformers import pipeline, AutoModelForCausalLM, AutoTokenizer
import threading
from http.server import HTTPServer, BaseHTTPRequestHandler
import speech_recognition as sr
from functools import lru_cache
import hashlib
import markdown2
from concurrent.futures import ThreadPoolExecutor
from hdbscan import HDBSCAN
import websockets
from websockets.exceptions import ConnectionClosed
from code_editor import code_editor

# ========== Configuration ==========
WORKSPACE = Path("/tmp/issue_workspace")
WORKSPACE.mkdir(exist_ok=True)
GITHUB_API = "https://api.github.com/repos"
HF_INFERENCE_API = "https://api-inference.huggingface.co/models"
WEBHOOK_PORT = 8000
WS_PORT = 8001

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
executor = ThreadPoolExecutor(max_workers=4)

# Display name (shown in the UI) -> Hugging Face model id used with the Inference API.
HF_MODELS = {
    "Mistral-8x7B": "mistralai/Mixtral-8x7B-Instruct-v0.1",
    "Llama-3-8B": "meta-llama/Meta-Llama-3-8B",
    "CodeLlama-34B": "codellama/CodeLlama-34b-Instruct-hf",
    "StarCoder2": "bigcode/starcoder2-15b",
}

# Default Model
DEFAULT_MODEL = "mistralai/Mixtral-8x7B-Instruct-v0.1"

# Maximum number of AI suggestions memoised per IssueManager instance
# (same bound as the previous lru_cache(maxsize=100)).
SUGGESTION_CACHE_SIZE = 100

# ========== Modern Theme ==========
# Define the base theme, then apply custom settings using .set().
# NOTE: button_primary_border_radius caused a TypeError in .set() and was removed;
# panel_border_radius was dropped as a precaution (presumably unsupported as well).
theme = gr.themes.Soft(
    primary_hue="violet",
    secondary_hue="emerald",
    radius_size="lg",
    font=[gr.themes.GoogleFont("Inter"), "ui-sans-serif", "system-ui"],
).set(
    button_primary_background_fill="linear-gradient(90deg, #8B5CF6 0%, #EC4899 100%)",
    button_primary_text_color="white",
    block_label_text_size="lg",
    block_label_text_weight="600",
    block_title_text_size="lg",
    block_title_text_weight="800",
    panel_background_fill="white",
    block_shadow="*shadow_drop_lg",
)


# ========== Enhanced Webhook Handler ==========
class WebhookHandler(BaseHTTPRequestHandler):
    """Receive GitHub webhook POSTs and forward issue events to the shared IssueManager.

    NOTE(review): the X-Hub-Signature-256 header is not verified, so any client that can
    reach the webhook port can inject issue events. Consider validating it with a secret.
    """

    # Set externally; while None, issue events are acknowledged but not processed.
    manager_instance = None

    @staticmethod
    def _target_loop(manager) -> Optional[asyncio.AbstractEventLoop]:
        """Return a running event loop to schedule the manager's coroutines on, or None.

        The loop the manager captured at construction (``manager.main_loop``) is preferred.
        The fallback keeps the previous lookup, but guards against the RuntimeError that
        asyncio's default policy raises when ``get_event_loop()`` is called from a
        non-main thread that has no loop set.
        """
        main_loop = getattr(manager, 'main_loop', None)
        if main_loop is not None and main_loop.is_running():
            return main_loop
        try:
            loop = asyncio.get_event_loop_policy().get_event_loop()
        except RuntimeError:
            return None
        return loop if loop.is_running() else None

    def do_POST(self):
        """Parse the JSON body, schedule supported issue events, and reply 200 "OK".

        Replies 411 for a missing/invalid Content-Length, 400 for a body that is not a
        JSON object, and 500 if reading the body fails for any other reason.
        """
        try:
            content_length = int(self.headers.get('Content-Length', ''))
        except ValueError:
            content_length = -1
        if content_length < 0:
            # Without a length, rfile.read() could block waiting for EOF.
            self.send_response(411)
            self.end_headers()
            self.wfile.write(b"Content-Length header required")
            return

        try:
            payload = json.loads(self.rfile.read(content_length).decode('utf-8'))
        except (json.JSONDecodeError, UnicodeDecodeError):
            self.send_response(400)
            self.end_headers()
            self.wfile.write(b"Invalid JSON payload")
            return
        except Exception as e:
            logger.error(f"Error reading webhook payload: {e}")
            self.send_response(500)
            self.end_headers()
            return

        if not isinstance(payload, dict):
            # Every lookup below uses dict access; reject arrays/scalars up front.
            self.send_response(400)
            self.end_headers()
            self.wfile.write(b"Invalid JSON payload")
            return

        event = self.headers.get('X-GitHub-Event')
        logger.info(f"Received GitHub webhook event: {event}")
        manager = WebhookHandler.manager_instance

        if event == 'issues' and manager:
            action = payload.get('action')
            logger.info(f"Issue action: {action}")
            if action in ['opened', 'reopened', 'closed', 'assigned', 'edited']:  # Handle edited issues too
                loop = self._target_loop(manager)
                if loop is not None:
                    asyncio.run_coroutine_threadsafe(
                        manager.handle_webhook_event(event, action, payload),
                        loop,
                    )
                else:
                    logger.error("Asyncio event loop is not running in the target thread for webhook.")
        elif event == 'ping':
            logger.info("Received GitHub webhook ping.")
        else:
            logger.warning(f"Unhandled event type: {event} or manager not initialized.")

        self.send_response(200)
        self.end_headers()
        self.wfile.write(b"OK")


# ========== AI-Powered Issue Manager ==========
class IssueManager:
    """Hold issue state for one repository and talk to GitHub, the HF Inference API and WebSocket clients."""

    def __init__(self):
        self.issues: Dict[int, dict] = {}  # issue number -> dict built by _process_issue_data
        self.repo_url: Optional[str] = None
        self.repo: Optional[Repo] = None
        self.current_issue: Optional[int] = None
        self.github_token: Optional[str] = None
        self.hf_token: Optional[str] = None
        self.collaborators: Dict[str, dict] = {}  # Example: {"user1": {"status": "editing file.py"}}
        self.points: int = 0
        # Label keywords per severity; checked in this order, so the first matching severity wins.
        self.severity_rules: Dict[str, List[str]] = {
            "Critical": ["critical", "urgent", "security", "crash", "blocker"],
            "High": ["high", "important", "error", "regression", "major"],
            "Medium": ["medium", "bug", "performance", "minor"],
            "Low": ["low", "documentation", "enhancement", "trivial", "feature"],
        }
        self.issue_clusters: Dict[int, List[int]] = {}  # {cluster_id: [index into issue_list_for_clustering, ...]}
        self.issue_list_for_clustering: List[dict] = []  # Issues in the order used for clustering index mapping
        # self._init_local_models()  # Consider lazy loading or conditional loading
        self.ws_clients: List["websockets.WebSocketServerProtocol"] = []
        # One code editor per issue number. OTCodeEditor is not defined in this chunk (TODO confirm where).
        self.code_editors: Dict[int, "OTCodeEditor"] = {}
        # (issue_hash, model_key) -> suggestion text; dict insertion order doubles as LRU order.
        self._suggestion_cache: Dict[Tuple[str, str], str] = {}
        # Preferred loop for scheduling webhook coroutines (see WebhookHandler._target_loop).
        self.main_loop = asyncio.get_event_loop()

    # Placeholder for local model initialization - implement actual loading if needed
    def _init_local_models(self):
        """Placeholder: log that local models would be initialised here."""
        logger.info("Initializing local models (placeholder)...")
        # self.code_model = pipeline(...)
        # self.summarizer = pipeline(...)
        logger.info("Local models initialized (placeholder).")

    def _get_issue_hash(self, issue_data: dict) -> str:
        """Return an MD5 hex digest of title + body, used only as a cache key (not for security)."""
        content = f"{issue_data.get('title', '')}{issue_data.get('body', '')}"
        return hashlib.md5(content.encode()).hexdigest()

    async def cached_suggestion(self, issue_hash: str, model_key: str) -> str:
        """Return an AI resolution suggestion for the loaded issue whose content hash is ``issue_hash``.

        Successful answers are memoised per ``(issue_hash, model_key)`` in a bounded
        per-instance LRU dict. ``functools.lru_cache`` must not be used here: applied to an
        ``async def`` it caches the coroutine object, and awaiting that already-awaited
        coroutine on the next call raises ``RuntimeError``.

        Returns:
            The suggestion text, or a string starting with "Error" on failure.
        """
        cache_key = (issue_hash, model_key)
        cached = self._suggestion_cache.pop(cache_key, None)
        if cached is not None:
            # Re-insert so this entry becomes the most recently used one.
            self._suggestion_cache[cache_key] = cached
            return cached

        # Find the issue corresponding to the hash (linear scan over loaded issues).
        found_issue = next(
            (issue for issue in self.issues.values() if self._get_issue_hash(issue) == issue_hash),
            None,
        )
        if found_issue is None:
            return "Error: Issue not found for the given hash."
        if model_key not in HF_MODELS:
            return f"Error: Invalid model key: {model_key}"

        logger.info(f"Cache miss or first request for issue hash {issue_hash}. Requesting suggestion from {model_key}.")
        suggestion = await self.suggest_resolution(found_issue, model_key)

        # Only cache real answers so transient failures (timeouts, model loading) can be retried.
        if not suggestion.startswith(("Error", "An unexpected error")):
            self._suggestion_cache[cache_key] = suggestion
            while len(self._suggestion_cache) > SUGGESTION_CACHE_SIZE:
                # Evict the least recently used entry (first in insertion order).
                self._suggestion_cache.pop(next(iter(self._suggestion_cache)))
        return suggestion

    async def handle_webhook_event(self, event: str, action: str, payload: dict):
        """Apply an ``issues`` webhook action to ``self.issues`` and notify WebSocket clients on change."""
        logger.info(f"Processing webhook event: {event}, action: {action}")
        issue_data = payload.get('issue')
        if not issue_data:
            logger.warning("Webhook payload missing 'issue' data.")
            return
        issue_number = issue_data.get('number')
        if not issue_number:
            logger.warning("Webhook issue data missing 'number'.")
            return

        needs_update = False
        if action == 'closed':
            logger.info(f"Removing closed issue {issue_number} from active list.")
            if self.issues.pop(issue_number, None):
                needs_update = True
            # Optionally remove associated editor, etc.
            self.code_editors.pop(issue_number, None)
        elif action in ['opened', 'reopened', 'edited']:  # Handle edited issues too
            logger.info(f"Adding/Updating issue {issue_number} from webhook.")
            self.issues[issue_number] = self._process_issue_data(issue_data)
            needs_update = True
            # Potentially trigger re-clustering or update specific issue details
        elif action == 'assigned':
            # "assignee" may be absent or null; never call .get on None.
            assignee = payload.get('assignee') or {}
            logger.info(f"Issue {issue_number} assigned to {assignee.get('login', 'N/A')}")
            # Add or refresh: the issue might not be in our list if it wasn't open initially.
            self.issues[issue_number] = self._process_issue_data(issue_data)
            needs_update = True
        else:
            logger.info(f"Ignoring action '{action}' for issue {issue_number}.")

        # Trigger a UI update notification via WebSocket if data changed
        if needs_update:
            await self.broadcast_issue_update()

    def _process_issue_data(self, issue_data: dict) -> dict:
        """Normalise a GitHub issue JSON object into the dict shape stored in ``self.issues``.

        Nullable fields (body, assignee, labels) are coerced so downstream string and
        list operations never see ``None``.
        """
        assignee = issue_data.get('assignee') or {}
        return {
            "id": issue_data['number'],
            "title": issue_data.get('title') or 'No Title',
            # GitHub may send "body": null for issues without a description.
            "body": issue_data.get('body') or '',
            "state": issue_data.get('state', 'unknown'),
            "labels": [label['name'] for label in issue_data.get('labels') or []],
            "assignee": assignee.get('login'),
            "url": issue_data.get('html_url', '#'),
            # Add other relevant fields if needed
        }

    @staticmethod
    def _parse_repo_url(repo_url: str) -> Optional[Tuple[str, str]]:
        """Extract ``(owner, repo)`` from a github.com URL, or return None if it does not match.

        Clone URLs ending in ``.git`` and URLs with a query string or fragment are accepted.
        """
        match = re.match(r"https?://github\.com/([^/?#]+)/([^/?#]+)", repo_url)
        if not match:
            return None
        owner, repo_name = match.groups()
        if repo_name.endswith(".git"):
            repo_name = repo_name[:-len(".git")]
        if not repo_name:
            return None
        return owner, repo_name

    async def _fetch_issue_pages(self, api_url: str, headers: Dict[str, str]) -> List[dict]:
        """Fetch every page of ``api_url`` (which must already contain a query string).

        Follows pages until an empty page or a ``Link`` header without ``next``.
        Raises ``aiohttp.ClientResponseError`` on a non-2xx response.
        """
        all_issues_data: List[dict] = []
        page = 1
        logger.info(f"Fetching issues from {api_url}...")
        async with aiohttp.ClientSession(headers=headers) as session:
            while True:
                paginated_url = f"{api_url}&page={page}"
                async with session.get(paginated_url) as response:
                    response.raise_for_status()  # Raise exception for bad status codes
                    issues_page_data = await response.json()
                    link_header = response.headers.get('Link', '')
                if not issues_page_data:  # No more issues on this page
                    break
                logger.info(f"Fetched page {page} with {len(issues_page_data)} issues.")
                all_issues_data.extend(issues_page_data)
                # Check Link header for next page (more robust pagination)
                if 'next' not in link_header:
                    break
                page += 1
        return all_issues_data

    def _build_issue_table(self) -> Tuple[List[List], Dict[str, int]]:
        """Build Dataframe rows ``[id, title, severity, cluster]`` and per-severity counts.

        Rows follow ``self.issue_list_for_clustering`` order; issues that are not in any
        cluster in ``self.issue_clusters`` show "N/A" in the cluster column.
        """
        severity_counts = {"Critical": 0, "High": 0, "Medium": 0, "Low": 0, "Unknown": 0}
        # Map clustered indices back to cluster ids: {issue_index: cluster_id}
        cluster_map = {
            index: cluster_id
            for cluster_id, indices in self.issue_clusters.items()
            for index in indices
        }
        rows: List[List] = []
        for i, issue in enumerate(self.issue_list_for_clustering):
            severity = self._determine_severity(issue['labels'])
            # .get keeps this working if severity_rules gains a custom severity name.
            severity_counts[severity] = severity_counts.get(severity, 0) + 1
            cluster_id = cluster_map.get(i, -1)  # -1 for noise/unclustered
            rows.append([
                issue['id'],
                issue['title'],
                severity,
                cluster_id if cluster_id != -1 else "N/A",  # Display N/A for noise
            ])
        return rows, severity_counts

    async def crawl_issues(self, repo_url: str, github_token: str, hf_token: str) -> Tuple[List[List], go.Figure, str]:
        """Crawl open issues, update internal state, cluster them, and return data for the UI.

        Pull requests, which GitHub's issues endpoint also returns, are skipped.

        Returns:
            ``(dataframe_rows, severity_figure, status_message)``. On any failure the rows
            are empty, the figure is blank and the message describes the error.
        """
        if not repo_url or not hf_token:  # GitHub token is optional for public repos
            return [], go.Figure(), "Error: Repository URL and HF Token are required."

        repo_url = repo_url.strip()
        logger.info(f"Starting issue crawl for {repo_url}")
        self.repo_url = repo_url
        self.github_token = github_token
        self.hf_token = hf_token
        # Reset issues and all state derived from them, so a failed crawl shows nothing stale.
        self.issues = {}
        self.issue_clusters = {}
        self.issue_list_for_clustering = []

        parsed = self._parse_repo_url(repo_url)
        if parsed is None:
            logger.error(f"Invalid GitHub URL format: {repo_url}")
            return [], go.Figure(), "Error: Invalid GitHub URL format. Use https://github.com/owner/repo"
        owner, repo_name = parsed
        # Open issues only; 100 per page, and every further page is followed.
        api_url = f"{GITHUB_API}/{owner}/{repo_name}/issues?state=open&per_page=100"
        headers = {"Accept": "application/vnd.github.v3+json"}
        if github_token:
            headers["Authorization"] = f"token {github_token}"

        try:
            all_issues_data = await self._fetch_issue_pages(api_url, headers)
            logger.info(f"Total issues fetched: {len(all_issues_data)}")

            skipped_pull_requests = 0
            for issue_data in all_issues_data:
                # The issues endpoint also lists pull requests; they carry a "pull_request" key.
                if 'pull_request' in issue_data:
                    skipped_pull_requests += 1
                    continue
                self.issues[issue_data['number']] = self._process_issue_data(issue_data)
            if skipped_pull_requests:
                logger.info(f"Skipped {skipped_pull_requests} pull requests returned by the issues endpoint.")

            if not self.issues:
                logger.info("No open issues found.")
                return [], go.Figure(), "No open issues found in the repository."

            # Prepare data for clustering
            self.issue_list_for_clustering = list(self.issues.values())
            logger.info("Clustering issues...")
            await self._cluster_similar_issues()  # Update self.issue_clusters

            dataframe_data, severity_counts = self._build_issue_table()

            logger.info("Generating statistics plot...")
            stats_fig = self._generate_stats_plot(severity_counts)

            success_msg = f"Found {len(self.issues)} open issues. Clustered into {len(self.issue_clusters)} groups (excluding noise)."
            logger.info(success_msg)
            return dataframe_data, stats_fig, success_msg

        except aiohttp.ClientResponseError as e:
            logger.error(f"GitHub API request failed: {e.status} {e.message}")
            error_msg = f"Error fetching issues: {e.status} - {e.message}. Check token permissions and repo URL."
            if e.status == 404:
                error_msg = f"Error: Repository not found at {repo_url}. Check the URL."
            elif e.status == 401:
                error_msg = "Error: Invalid GitHub token or insufficient permissions."
            elif e.status == 403:
                error_msg = "Error: GitHub API rate limit exceeded or forbidden access. Try adding a GitHub token."
            return [], go.Figure(), error_msg
        except GitCommandError as e:
            logger.error(f"Git clone error: {e}")  # Should not happen if not cloning
            return [], go.Figure(), f"Error related to Git: {e}"
        except Exception as e:
            logger.exception(f"An unexpected error occurred during issue crawl: {e}")  # Log full traceback
            return [], go.Figure(), f"An unexpected error occurred: {e}"

    def _determine_severity(self, labels: List[str]) -> str:
        """Return the first severity in ``severity_rules`` with a keyword contained in any label (case-insensitive), else "Unknown"."""
        labels_lower = [label.lower() for label in labels]
        for severity, keywords in self.severity_rules.items():
            # Substring match: e.g. keyword "bug" also matches label "type: bug".
            if any(keyword in label for keyword in keywords for label in labels_lower):
                return severity
        return "Unknown"  # Default if no matching label found

    def _generate_stats_plot(self, severity_counts: Dict[str, int]) -> go.Figure:
        """Build a Plotly bar chart of issue counts per severity (zero counts are omitted)."""
        # Filter out severities with 0 counts
        filtered_counts = {k: v for k, v in severity_counts.items() if v > 0}

        if not filtered_counts:
            # Return an empty figure with a message if no issues found
            fig = go.Figure()
            fig.update_layout(
                title="Issue Severity Distribution",
                xaxis={"visible": False},
                yaxis={"visible": False},
                annotations=[{
                    "text": "No issues found to display statistics.",
                    "xref": "paper",
                    "yref": "paper",
                    "showarrow": False,
                    "font": {"size": 16},
                }],
                plot_bgcolor='rgba(0,0,0,0)',
                paper_bgcolor='rgba(0,0,0,0)',
            )
            return fig

        severities = list(filtered_counts.keys())
        counts = list(filtered_counts.values())

        fig = px.bar(
            x=severities,
            y=counts,
            title="Issue Severity Distribution",
            labels={'x': 'Severity', 'y': 'Number of Issues'},
            color=severities,  # Color bars by severity
            color_discrete_map={
                'Critical': '#DC2626',  # Red
                'High': '#F97316',  # Orange
                'Medium': '#FACC15',  # Yellow
                'Low': '#84CC16',  # Lime
                'Unknown': '#6B7280',  # Gray
            },
            text=counts,  # Display counts on bars
        )
        fig.update_layout(
            xaxis_title=None,  # Cleaner look
            yaxis_title="Number of Issues",
            plot_bgcolor='rgba(0,0,0,0)',  # Transparent background
            paper_bgcolor='rgba(0,0,0,0)',
            showlegend=False,
            xaxis={'categoryorder': 'array', 'categoryarray': ['Critical', 'High', 'Medium', 'Low', 'Unknown']},  # Order bars
        )
        fig.update_traces(textposition='outside')
        return fig

    async def _cluster_similar_issues(self):
        """Embed ``issue_list_for_clustering`` and group it with HDBSCAN into ``self.issue_clusters``.

        Noise points (HDBSCAN label -1) are left out. On any failure ``issue_clusters``
        is reset to ``{}`` instead of raising.
        """
        if not self.issue_list_for_clustering or not self.hf_token:
            logger.warning("Cannot cluster issues: No issues loaded or HF token missing.")
            self.issue_clusters = {}
            return
        if len(self.issue_list_for_clustering) < 2:
            # min_cluster_size=2 below: a single issue can never form a cluster.
            logger.info("Only one issue loaded; skipping clustering.")
            self.issue_clusters = {}
            return

        logger.info("Generating embeddings for clustering...")
        try:
            # Combine title and body for embedding generation; limit body length.
            texts_to_embed = [
                f"{i.get('title', '')} - {(i.get('body') or '')[:500]}"
                for i in self.issue_list_for_clustering
            ]
            embeddings = await self._generate_embeddings(texts_to_embed)

            if embeddings is None or not isinstance(embeddings, list) or len(embeddings) != len(self.issue_list_for_clustering):
                logger.error(f"Failed to generate valid embeddings. Expected {len(self.issue_list_for_clustering)}, got {len(embeddings) if embeddings else 'None'}.")
                self.issue_clusters = {}
                return

            logger.info(f"Generated {len(embeddings)} embeddings. Running HDBSCAN...")
            # min_cluster_size: minimum number of samples in a cluster
            # metric: distance metric used
            # allow_single_cluster: If True, allows forming a single large cluster
            clusterer = HDBSCAN(min_cluster_size=2, metric='cosine', allow_single_cluster=True, gen_min_span_tree=True)
            clusters = clusterer.fit_predict(embeddings)

            self.issue_clusters = {}
            noise_count = 0
            for i, cluster_id in enumerate(clusters):
                if cluster_id == -1:  # HDBSCAN uses -1 for noise points
                    noise_count += 1
                    continue
                self.issue_clusters.setdefault(cluster_id, []).append(i)  # Store original index

            logger.info(f"Clustering complete. Found {len(self.issue_clusters)} clusters with {noise_count} noise points.")

        except Exception as e:
            logger.exception(f"Error during issue clustering: {e}")
            self.issue_clusters = {}

    async def _generate_embeddings(self, texts: List[str]):
        """Request sentence embeddings for ``texts`` from the HF Inference API.

        Returns:
            A list with one embedding (list) per text, or None on any error.
        """
        if not self.hf_token:
            logger.error("Hugging Face token is not set. Cannot generate embeddings.")
            return None

        # Recommended embedding model (check HF for alternatives if needed)
        model_id = "sentence-transformers/all-mpnet-base-v2"
        api_url = f"{HF_INFERENCE_API}/{model_id}"
        headers = {"Authorization": f"Bearer {self.hf_token}"}

        logger.info(f"Requesting embeddings from {api_url} for {len(texts)} texts.")
        timeout = aiohttp.ClientTimeout(total=60)  # 60 seconds timeout
        # wait_for_model=True for serverless inference endpoints
        payload = {"inputs": texts, "options": {"wait_for_model": True}}

        try:
            async with aiohttp.ClientSession(headers=headers, timeout=timeout) as session:
                async with session.post(api_url, json=payload) as response:
                    if response.status >= 400:
                        # Read the body here: ClientResponseError does not carry it.
                        error_body = await response.text()
                        logger.error(f"HF Inference API request failed: {response.status} {response.reason}")
                        logger.error(f"Response body: {error_body[:500]}")  # Log first 500 chars
                        return None
                    result = await response.json()
        except aiohttp.ClientResponseError as e:
            # e.g. a non-JSON content type raised by response.json()
            logger.error(f"HF Inference API request failed: {e.status} {e.message}")
            return None
        except asyncio.TimeoutError:
            logger.error(f"HF Inference API request timed out after {timeout.total} seconds.")
            return None
        except Exception as e:
            logger.exception(f"An unexpected error occurred during embedding generation: {e}")
            return None

        # Check if the result is a list of embeddings (lists of floats)
        if isinstance(result, list) and all(isinstance(emb, list) for emb in result):
            logger.info(f"Successfully received {len(result)} embeddings.")
            return result
        if isinstance(result, dict) and 'error' in result:
            logger.error(f"HF Inference API returned an error: {result['error']}")
            return None
        logger.error(f"Unexpected embedding format received: {type(result)}. Full response: {result}")
        return None

    @staticmethod
    def _extract_patch(generated_text: str) -> Tuple[str, str]:
        """Split model output into ``(explanation, patch)`` around the first ```diff fenced block.

        If no diff block is found, or it contains no +/- lines, the patch string says so.
        """
        # Tolerate trailing spaces and CRLF after the opening fence.
        diff_match = re.search(r"```diff[ \t]*\r?\n(.*?)```", generated_text, re.DOTALL)
        explanation = generated_text.split("```diff")[0].strip()
        if not explanation:  # Handle cases where explanation might be missing
            explanation = "No explanation provided."

        if diff_match:
            patch = diff_match.group(1).strip()
            # Basic validation: check if patch contains diff markers like + or -
            if not re.search(r'^\s*[+-]', patch, re.MULTILINE):
                patch = f"AI generated response, but no standard diff markers (+/-) found:\n---\n{patch}\n---"
                logger.warning("Generated patch lacks standard diff markers.")
        else:
            patch = "No diff block found in the AI response."
            logger.warning("No diff block found in patch suggestion response.")
        return explanation, patch

    async def generate_code_patch(self, issue_number: int, model_key: str) -> dict:
        """Ask the selected model for a unified-diff patch that fixes an issue.

        Returns:
            ``{"explanation", "patch", "model_used"}`` on success, or ``{"error": message}``.
        """
        if issue_number not in self.issues:
            return {"error": f"Issue {issue_number} not found."}
        if not self.hf_token:
            return {"error": "Hugging Face token not set."}
        if model_key not in HF_MODELS:
            return {"error": f"Invalid model key: {model_key}"}

        issue = self.issues[issue_number]
        model_id = HF_MODELS[model_key]
        logger.info(f"Generating patch for issue {issue_number} using model {model_id}")

        # --- Context Gathering (Placeholder) ---
        context = "Context gathering not implemented. Provide relevant code snippets in the issue description if possible."
        # context = await self._get_code_context(issue_number)  # Uncomment if implemented

        # --- Prompt Engineering ---
        prompt = f"""You are an expert programmer analyzing a GitHub issue. Your task is to generate a code patch in standard `diff` format to fix the described problem.

## Issue Details:
### Title: {issue.get('title', 'N/A')}
### Body:
{issue.get('body', 'N/A')}
### Labels: {', '.join(issue.get('labels', []))}

## Relevant Code Context (if available):
{context}

## Instructions:
1. Carefully analyze the issue description and context.
2. Identify the specific code changes required to resolve the issue.
3. Generate a patch containing *only* the necessary code modifications.
4. Format the patch strictly according to the standard `diff` format, enclosed in a ```diff ... ``` block.
5. Provide a brief explanation *before* the diff block explaining the reasoning behind the changes.
6. If a patch cannot be reasonably determined from the provided information, state that clearly instead of generating an incorrect patch.

## Patch Suggestion:
"""

        # --- Call Inference API ---
        api_url = f"{HF_INFERENCE_API}/{model_id}"
        headers = {"Authorization": f"Bearer {self.hf_token}"}
        payload = {
            "inputs": prompt,
            "parameters": {  # Adjust parameters as needed
                "max_new_tokens": 1536,  # Increased max tokens for potentially larger patches
                "temperature": 0.2,  # Low temperature for deterministic code generation
                "return_full_text": False,  # Only get the generated part
                "do_sample": False,  # Turn off sampling for more deterministic output with low temp
            },
            "options": {"wait_for_model": True},  # For serverless endpoints
        }
        timeout = aiohttp.ClientTimeout(total=120)  # 2 minutes timeout for generation

        try:
            async with aiohttp.ClientSession(headers=headers, timeout=timeout) as session:
                async with session.post(api_url, json=payload) as response:
                    if response.status >= 400:
                        # Read the body here: ClientResponseError has no .response attribute.
                        error_body = await response.text()
                        logger.error(f"HF Inference API request failed for patch generation: {response.status} {response.reason}")
                        logger.error(f"Response body: {error_body[:500]}")
                        return {"error": f"AI model request failed ({response.status}). Check model availability and HF token."}
                    result = await response.json()

            if result and isinstance(result, list) and isinstance(result[0], dict) and 'generated_text' in result[0]:
                generated_text = result[0].get('generated_text', '')
                logger.info(f"Received patch suggestion from {model_id}")
                explanation, patch = self._extract_patch(generated_text)
                return {
                    "explanation": explanation,
                    "patch": patch,
                    "model_used": model_id,
                }
            if isinstance(result, dict) and 'error' in result:
                logger.error(f"HF Inference API returned an error for patch generation: {result['error']}")
                return {"error": f"AI model returned an error: {result['error']}"}
            logger.error(f"Unexpected response format from {model_id} for patch: {result}")
            return {"error": "Received unexpected response format from AI model."}

        except aiohttp.ClientResponseError as e:
            logger.error(f"HF Inference API request failed for patch generation: {e.status} {e.message}")
            return {"error": f"AI model request failed ({e.status}). Check model availability and HF token."}
        except asyncio.TimeoutError:
            logger.error(f"HF Inference API request for patch generation timed out after {timeout.total} seconds.")
            return {"error": "AI model request timed out."}
        except Exception as e:
            logger.exception(f"Error generating code patch: {e}")
            return {"error": f"An unexpected error occurred: {e}"}

    async def _get_code_context(self, issue_number: int) -> str:
        """Placeholder for retrieving relevant code context for an issue; currently returns a fixed message."""
        # This needs a proper implementation based on how the repo is managed:
        # - Clone/pull the repo if not present/up-to-date
        # - Identify relevant files (e.g., using file paths mentioned in the issue, heuristics)
        # - Read relevant parts of the files
        logger.warning(f"Code context retrieval for issue {issue_number} is not fully implemented.")
        # Example: Look for file paths in the issue body
        # issue_body = self.issues.get(issue_number, {}).get('body', '')
        # potential_files = re.findall(r'[\w/.-]+\.(?:py|js|java|cpp|c|ts|html|css)', issue_body)
        # Read content from these files if they exist in the workspace repo
        return "Code context retrieval is currently a placeholder."

    async def suggest_resolution(self, issue: dict, model_key: str) -> str:
        """Ask the selected model for step-by-step resolution advice for ``issue``.

        Returns:
            The generated text, or a string starting with "Error" / "An unexpected error" on failure.
        """
        if not self.hf_token:
            return "Error: Hugging Face token not set."
        if model_key not in HF_MODELS:
            return f"Error: Invalid model key: {model_key}"

        model_id = HF_MODELS[model_key]
        logger.info(f"Requesting resolution suggestion for issue {issue.get('id','N/A')} using {model_id}")

        prompt = f"""Analyze the following GitHub issue and provide a concise, step-by-step suggestion on how to resolve it. Focus on the technical steps required. Be clear and actionable.

## Issue Details:
### Title: {issue.get('title', 'N/A')}
### Body:
{issue.get('body', 'N/A')}
### Labels: {', '.join(issue.get('labels', []))}

## Suggested Resolution Steps:
1. **Understand the Root Cause:** [Briefly explain the likely cause based on the description]
2. **Identify Files:** [Suggest specific files or modules likely involved]
3. **Implement Changes:** [Describe the necessary code modifications or additions]
4. **Test Thoroughly:** [Mention specific testing approaches needed]
5. **Create Pull Request:** [Standard final step]

Provide details for steps 1-4 based on the issue.
"""
        api_url = f"{HF_INFERENCE_API}/{model_id}"
        headers = {"Authorization": f"Bearer {self.hf_token}"}
        payload = {
            "inputs": prompt,
            "parameters": {
                "max_new_tokens": 768,  # Increased token limit
                "temperature": 0.6,  # Slightly lower temp for more focused suggestions
                "return_full_text": False,
                "do_sample": True,
                "top_p": 0.95,
            },
            "options": {"wait_for_model": True},  # For serverless endpoints
        }
        timeout = aiohttp.ClientTimeout(total=90)  # 90 seconds timeout

        try:
            async with aiohttp.ClientSession(headers=headers, timeout=timeout) as session:
                async with session.post(api_url, json=payload) as response:
                    if response.status >= 400:
                        # Read the body here: ClientResponseError has no .response attribute.
                        error_body = await response.text()
                        logger.error(f"HF Inference API request failed for suggestion: {response.status} {response.reason}")
                        logger.error(f"Response body: {error_body[:500]}")
                        return f"Error: AI model request failed ({response.status}). Check model availability and HF token."
                    result = await response.json()

            if result and isinstance(result, list) and isinstance(result[0], dict) and 'generated_text' in result[0]:
                suggestion = result[0].get('generated_text', 'No suggestion generated.')
                logger.info(f"Received suggestion from {model_id}")
                return suggestion.strip()
            if isinstance(result, dict) and 'error' in result:
                logger.error(f"HF Inference API returned an error for suggestion: {result['error']}")
                return f"Error: AI model returned an error: {result['error']}"
            logger.error(f"Unexpected response format from {model_id} for suggestion: {result}")
            return "Error: Received unexpected response format from AI model."

        except aiohttp.ClientResponseError as e:
            logger.error(f"HF Inference API request failed for suggestion: {e.status} {e.message}")
            return f"Error: AI model request failed ({e.status}). Check model availability and HF token."
        except asyncio.TimeoutError:
            logger.error(f"HF Inference API request for suggestion timed out after {timeout.total} seconds.")
            return "Error: AI model request timed out."
        except Exception as e:
            logger.exception(f"Error suggesting resolution: {e}")
            return f"An unexpected error occurred: {e}"

    # --- WebSocket Methods ---
    async def broadcast_collaboration_status(self):
        """Every 5 seconds, send collaborator status to all clients; drop clients whose send fails.

        Runs forever; an error in one round is logged and the loop continues.
        """
        while True:
            await asyncio.sleep(5)  # Send updates every 5 seconds
            # Only broadcast if there are clients and someone to report.
            if not self.ws_clients or not self.collaborators:
                continue
            try:
                status_payload = json.dumps({
                    "type": "collaboration_status",
                    "collaborators": self.collaborators,
                })
                logger.debug(f"Broadcasting status: {status_payload}")
                # Snapshot: ws_clients may change while the sends are awaited.
                clients = list(self.ws_clients)
                results = await asyncio.gather(
                    *[client.send(status_payload) for client in clients],
                    return_exceptions=True,  # Don't let one failed send stop others
                )
                for client, result in zip(clients, results):
                    if isinstance(result, Exception):
                        logger.warning(f"Failed to send status to client {getattr(client, 'client_id', client.remote_address)}: {result}. Removing client.")
                        # Safe: everything here runs on the same event loop, and we iterate the snapshot.
                        self.remove_ws_client(client)
            except Exception as e:
                logger.exception(f"Error broadcasting collaboration status: {e}")

    async def handle_code_editor_update(self, issue_num: int, delta: str, client_id: str):
        """Apply a JSON delta from one client to the issue's editor and forward it to all other clients."""
        if issue_num not in self.code_editors:
            # Initializing here would require the initial content; a better approach might be needed.
            logger.warning(f"Received code update for non-existent editor instance for issue {issue_num}. Ignoring.")
            # Alternative: self.code_editors[issue_num] = OTCodeEditor(initial_content="...")
            return

        try:
            # Apply the delta to the server-side authoritative state
            parsed_delta = json.loads(delta)  # Parse delta once
            self.code_editors[issue_num].apply_delta(parsed_delta)
            logger.info(f"Applied delta for issue {issue_num} from client {client_id}")

            update_payload = json.dumps({
                "type": "code_update",
                "issue_num": issue_num,
                "delta": delta,  # Send the original delta JSON string
            })
            # Broadcast to all *other* clients; keep the recipient list aligned with the results.
            recipients = [
                client for client in self.ws_clients
                if getattr(client, 'client_id', None) != client_id
            ]
            if recipients:
                results = await asyncio.gather(
                    *[client.send(update_payload) for client in recipients],
                    return_exceptions=True,
                )
                for client, result in zip(recipients, results):
                    if isinstance(result, Exception):
                        logger.warning(f"Failed to broadcast code update to client {getattr(client, 'client_id', 'N/A')}: {result}")

        except json.JSONDecodeError:
            logger.error(f"Received invalid JSON delta for issue {issue_num}: {delta}")
        except Exception as e:
            logger.exception(f"Error handling code editor update for issue {issue_num}: {e}")

    async def broadcast_issue_update(self):
        """Send an ``issues_updated`` notification to every connected WebSocket client."""
        if not self.ws_clients:
            return

        logger.info("Broadcasting issue update notification to clients.")
        update_payload = json.dumps({"type": "issues_updated"})
        # Snapshot: ws_clients may change while the sends are awaited.
        clients = list(self.ws_clients)
        results = await asyncio.gather(
            *[client.send(update_payload) for client in clients],
            return_exceptions=True,
        )
        for client, result in zip(clients, results):
            if isinstance(result, Exception):
                logger.warning(f"Failed to send issue update notification to client {getattr(client, 'client_id', client.remote_address)}: {result}")

    def remove_ws_client(self, client_to_remove: "websockets.WebSocketServerProtocol"):
        """Remove a client from ``ws_clients`` and its entry (if any) from ``collaborators``."""
        client_id = getattr(client_to_remove, 'client_id', None)
        if client_to_remove in self.ws_clients:
            self.ws_clients.remove(client_to_remove)
            logger.info(f"Removed WebSocket client: {client_id or client_to_remove.remote_address}")
        if client_id and client_id in self.collaborators:
            del self.collaborators[client_id]
            logger.info(f"Removed collaborator {client_id}.")
        # No need to broadcast here, the periodic task will reflect the change


# NOTE(review): the next line is the collapsed start of create_ui(), kept byte-for-byte from the
# source; its remainder continues on the following lines and is not edited here.
# ========== Gradio UI Definition ========== def create_ui(manager: IssueManager): """Creates the Gradio interface.""" # --- Helper Functions for UI --- def generate_issue_preview(issue_num: Optional[int]) -> str: """Generates HTML preview for a selected issue.""" if issue_num is None or issue_num not in manager.issues: return "

Select an issue from the board to see details.

" issue = manager.issues[issue_num] # Convert markdown body to HTML using markdown2 with extras html_body = markdown2.markdown( issue.get('body', '*No description provided.*'), extras=["fenced-code-blocks", "tables", "strike", "task_list"] ) # Basic styling preview_html = """

#{issue['id']} - {issue.get('title', 'N/A')}


State: {issue.get('state', 'N/A')} | Assignee: {issue.get('assignee', 'None')}

Labels: {' | '.join(f'{l}' for l in issue.get('labels', [])) or 'None'}

{html_body}
""" return preview_html async def get_ai_suggestion_wrapper(issue_num: Optional[int], model_key: str) -> str: """Wrapper to get AI suggestion for the chat display.""" if issue_num is None or issue_num not in manager.issues: return "Please select a valid issue first." # Use cached_suggestion which handles the actual API call via lru_cache # Note: cached_suggestion needs the issue *hash* and model *ID* issue = manager.issues[issue_num] issue_hash = manager._get_issue_hash(issue) # Pass model_key directly, cached_suggestion will resolve it suggestion = await manager.cached_suggestion(issue_hash, model_key) # Format for display return f"**Suggestion based on {model_key}:**\n\n---\n{suggestion}" async def get_ai_patch_wrapper(issue_num: Optional[int], model_key: str) -> str: """Wrapper to get AI patch for the chat display.""" if issue_num is None or issue_num not in manager.issues: return "Please select a valid issue first." result = await manager.generate_code_patch(issue_num, model_key) if "error" in result: return f"**Error generating patch:** {result['error']}" else: # Format for chat display using Markdown code block return f"""**Patch Suggestion from {result.get('model_used', model_key)}:** **Explanation:** {result.get('explanation', 'N/A')} --- **Patch:** ```diff {result.get('patch', 'N/A')} ```""" # --- Gradio Blocks --- with gr.Blocks(theme=theme, title="๐Ÿค– AI Issue Resolver Pro", css=".gradio-container {max-width: 1400px !important;}") as app: gr.Markdown("""

๐Ÿš€ AI Issue Resolver Pro

Next-generation issue resolution powered by AI collaboration

""") # --- Configuration Row --- with gr.Row(variant="panel", elem_id="config-panel"): with gr.Column(scale=3): repo_url = gr.Textbox(label="GitHub Repository URL", placeholder="[https://github.com/owner/repo](https://github.com/owner/repo)", info="Enter the full URL of the public GitHub repository.", elem_id="repo_url") with gr.Row(): github_token = gr.Textbox(label="GitHub Token (Optional)", type="password", info="Required for private repos or higher rate limits.", elem_id="github_token") hf_token = gr.Textbox(label="Hugging Face Token", type="password", info="Required for AI model interactions.", elem_id="hf_token") with gr.Column(scale=1, min_width=250): model_select = gr.Dropdown(choices=list(HF_MODELS.keys()), value="Mistral-8x7B", label="๐Ÿค– Select AI Model", info="Choose the AI for suggestions and patches.", elem_id="model_select") crawl_btn = gr.Button("๐Ÿ›ฐ๏ธ Scan Repository Issues", variant="primary", icon="๐Ÿ”", elem_id="crawl_btn") status_output = gr.Textbox(label="Status", interactive=False, lines=1, max_lines=1, placeholder="Status updates will appear here...", elem_id="status_output") # --- Main Tabs --- with gr.Tabs(elem_id="main-tabs"): # --- Issue Board Tab --- with gr.Tab("๐Ÿ“‹ Issue Board", id="board", elem_id="tab-board"): with gr.Row(equal_height=False): with gr.Column(scale=3): gr.Markdown("### Open Issues") issue_list = gr.Dataframe( headers=["ID", "Title", "Severity", "Cluster"], datatype=["number", "str", "str", "str"], # Cluster ID shown as str interactive=True, wrap=True, # Wrap long titles elem_id="issue_list_df", ) with gr.Column(scale=2, min_width=350): gr.Markdown("### Issue Severity") stats_plot = gr.Plot(elem_id="stats_plot") # Placeholder for collaborators - updated via JS collab_status = gr.HTML("""

๐Ÿ‘ฅ Active Collaborators

Connecting...
""", elem_id="collab_status_html") # --- Resolution Studio Tab --- with gr.Tab("๐Ÿ’ป Resolution Studio", id="studio", elem_id="tab-studio"): with gr.Row(): # Left Column: Issue Details & AI Tools with gr.Column(scale=1, min_width=400): gr.Markdown("### Selected Issue Details") # Hidden number input to store selected issue ID selected_issue_id = gr.Number(label="Selected Issue ID", visible=False, precision=0, elem_id="selected_issue_id") issue_preview_html = gr.HTML( "

Select an issue from the 'Issue Board' tab.

", elem_id="issue_preview" ) with gr.Accordion("๐Ÿ› ๏ธ AI Assistance Tools", open=True, elem_id="ai_tools_accordion"): suggest_btn = gr.Button("๐Ÿง  Suggest Resolution Steps", icon="๐Ÿ’ก", elem_id="suggest_btn") patch_btn = gr.Button("๐Ÿ“ Generate Code Patch", icon="๐Ÿฉน", elem_id="patch_btn") # Add placeholders for other buttons if needed # test_btn = gr.Button("๐Ÿงช Create Tests (Future)", icon="๐Ÿ”ฌ", interactive=False) # impact_btn = gr.Button("๐Ÿ“Š Impact Analysis (Future)", icon="๐Ÿ“ˆ", interactive=False) # Use Markdown for better formatting of AI output ai_output_display = gr.Markdown(value="*AI suggestions and patches will appear here...*", elem_id="ai_output_display") # Right Column: Code Editor with gr.Column(scale=2, min_width=500): gr.Markdown("### Collaborative Code Editor") # Use the imported code_editor component # We'll update its value dynamically when an issue is selected # Ensure the code_editor component itself handles language setting based on input dict keys or a prop # Replace the existing code_edit_component definition with: code_edit_component = code_editor( value={"placeholder.txt": "# Select an issue to load code."}, key="code_editor" ) # --- Analytics Tab (Placeholder) --- with gr.Tab("๐Ÿ“ˆ Analytics", id="analytics", elem_id="tab-analytics"): gr.Markdown("### Analytics Dashboard (Placeholder)") gr.Markdown("Future home for resolution timelines, achievement badges, and more detailed metrics.") # with gr.Row(): # gr.Markdown("#### ๐Ÿ“… Resolution Timeline") # timeline = gr.Timeline() # Requires specific data format # with gr.Row(): # gr.Markdown("#### ๐Ÿ† Achievement System") # badges = gr.HTML("
Coming Soon!
") # --- Event Handlers --- # 1. Crawl Button Click crawl_btn.click( fn=manager.crawl_issues, inputs=[repo_url, github_token, hf_token], outputs=[issue_list, stats_plot, status_output], api_name="crawl_issues", # For API access if needed show_progress="full" ) # 2. Issue Selection in Dataframe async def handle_issue_select(evt: gr.SelectData): """Handles issue selection: updates preview, loads code (placeholder).""" default_response = { selected_issue_id: gr.update(value=None), issue_preview_html: gr.update("

Select an issue from the table.

"), code_edit_component: gr.update(value={"placeholder.txt": "# Select an issue to load code."}), ai_output_display: gr.update(value="*AI suggestions and patches will appear here...*") } if evt.index[0] is None or not hasattr(evt, 'value') or not evt.value: logger.info("Issue deselected or invalid selection event.") return default_response try: selected_id = int(evt.value[0]) logger.info(f"Issue selected: ID {selected_id}") # Update components with progress indication updates = { selected_issue_id: gr.update(value=selected_id), issue_preview_html: gr.update(generate_issue_preview(selected_id)), code_edit_component: gr.update(value={"loading.txt": "# Loading code...\n"}), ai_output_display: gr.update(value="*Refreshing suggestions...*") } # --- Simulated code loading --- files_content = { f"issue_{selected_id}_code.py": ( f"# Code related to issue {selected_id}\n" "# Replace with actual file content\n\n" f"def fix_issue_{selected_id}():\n" " print('Implement solution here')" ), "README.md": f"# Issue {selected_id}\n\nResolution work in progress..." } # Update code editor component updates[code_edit_component] = gr.update(value=files_content) updates[ai_output_display] = gr.update(value="*AI suggestions ready*") # Initialize server-side editor state manager.code_editors[selected_id] = OTCodeEditor(value=files_content) logger.info(f"Initialized OT editor for issue {selected_id}") return updates except (ValueError, TypeError, IndexError, KeyError) as e: logger.error(f"Selection error: {str(e)}") return { **default_response, issue_preview_html: gr.update("

Error loading issue details

"), code_edit_component: gr.update(value={"error.txt": "# Error loading code\nPlease try again"}), ai_output_display: gr.update(value="*Error processing selection*") } issue_list.select( fn=handle_issue_select, inputs=[], # Event data is passed automatically outputs=[selected_issue_id, issue_preview_html, code_edit_component, ai_output_display], show_progress="minimal" ) # 3. Suggest Resolution Button Click suggest_btn.click( fn=get_ai_suggestion_wrapper, inputs=[selected_issue_id, model_select], outputs=[ai_output_display], # Output to Markdown component api_name="suggest_resolution", show_progress="full" ) # 4. Generate Patch Button Click patch_btn.click( fn=get_ai_patch_wrapper, inputs=[selected_issue_id, model_select], outputs=[ai_output_display], # Output to Markdown component api_name="generate_patch", show_progress="full" ) # 5. Code Editor Change (User typing) -> Send update via WebSocket # This requires JavaScript to capture the 'change' event from the Ace editor # instance within the code_editor component and send it over WebSocket. # The Python backend then receives it via handle_ws_connection. # 6. WebSocket Message (Server -> Client) -> Trigger UI Update # This uses JavaScript to listen for WebSocket messages and update Gradio components. # Example: Update collaborator list, trigger code editor update. # --- JavaScript for WebSocket Communication --- def web_socket_js(ws_port): # Generate unique client ID for this session client_id = f"client_{hashlib.sha1(os.urandom(16)).hexdigest()[:8]}" logger.info(f"Generated Client ID for WebSocket: {client_id}") # FIX: Escape all literal curly braces `{` -> `{{` and `}` -> `}}` in the JS code # Variables like {ws_port} and {client_id} remain single-braced. 
return f""" """ # Inject the JavaScript into the Gradio app when it loads app.load(_js=web_socket_js(WS_PORT), fn=None, inputs=None, outputs=None) return app # ========== WebSocket Server Logic ========== async def handle_ws_connection(websocket: websockets.WebSocketServerProtocol, path: str, manager: IssueManager): """Handles incoming WebSocket connections and messages.""" client_id = None # Initialize client_id for this connection manager.ws_clients.append(websocket) logger.info(f"WebSocket client connected: {websocket.remote_address} (Total: {len(manager.ws_clients)})") try: async for message in websocket: try: data = json.loads(message) msg_type = data.get("type") # logger.debug(f"Received WS message: {data}") # Log received message content if msg_type == "join": client_id = data.get("clientId", f"anon_{websocket.id}") setattr(websocket, 'client_id', client_id) # Associate ID with socket object manager.collaborators[client_id] = {"name": client_id, "status": "Connected"} # Add to collaborators logger.info(f"Client {client_id} joined.") # Trigger immediate broadcast in case it's needed, but don't await asyncio.create_task(manager.broadcast_collaboration_status()) elif msg_type == "code_update": issue_num = data.get("issue_num") delta = data.get("delta") sender_id = data.get("clientId") # ID of the client who sent the update if issue_num is not None and delta and sender_id: # Pass client_id to handler to avoid broadcasting back to sender await manager.handle_code_editor_update(issue_num, delta, sender_id) else: logger.warning(f"Invalid code_update message received: {data}") elif msg_type == "status_update": # Client updates their status sender_id = data.get("clientId") status = data.get("status", "Idle") if sender_id and sender_id in manager.collaborators: manager.collaborators[sender_id]["status"] = status logger.info(f"Client {sender_id} status updated: {status}") # Trigger broadcast, don't await asyncio.create_task(manager.broadcast_collaboration_status()) 
else: logger.warning(f"Unknown WebSocket message type received: {msg_type} from {client_id or websocket.remote_address}") except json.JSONDecodeError: logger.error(f"Received invalid JSON over WebSocket from {client_id or websocket.remote_address}: {message}") except Exception as e: logger.exception(f"Error processing WebSocket message from {client_id or websocket.remote_address}: {e}") except ConnectionClosed as e: logger.info(f"WebSocket client {client_id or websocket.remote_address} disconnected: (Code: {e.code}, Reason: {e.reason})") except Exception as e: logger.exception(f"Unexpected error in WebSocket handler for {client_id or websocket.remote_address}: {e}") finally: logger.info(f"Cleaning up connection for client {client_id or websocket.remote_address}") # Use the safe removal method, ensuring it runs in the correct context if needed manager.remove_ws_client(websocket) # Trigger a final status broadcast asyncio.create_task(manager.broadcast_collaboration_status()) async def start_websocket_server(manager: IssueManager, port: int): """Starts the WebSocket server.""" # Pass manager instance to the connection handler factory handler_with_manager = lambda ws, path: handle_ws_connection(ws, path, manager) try: # Set ping interval and timeout to keep connections alive and detect broken ones server = await websockets.serve( handler_with_manager, "0.0.0.0", # Bind to all interfaces port, ping_interval=20, # Send pings every 20 seconds ping_timeout=20 # Wait 20 seconds for pong response ) logger.info(f"WebSocket server started on ws://0.0.0.0:{port}") await asyncio.Future() # Run forever until cancelled except OSError as e: logger.error(f"Failed to start WebSocket server on port {port}: {e}. 
Port might be in use.") # Exit or handle error appropriately raise except Exception as e: logger.exception(f"Unexpected error starting WebSocket server: {e}") raise def run_webhook_server(manager: IssueManager, port: int): """Starts the HTTP webhook server in a separate thread.""" WebhookHandler.manager_instance = manager # Pass manager instance to the class server_address = ("0.0.0.0", port) # Bind to all interfaces try: httpd = HTTPServer(server_address, WebhookHandler) logger.info(f"Webhook HTTP server started on [http://0.0.0.0](http://0.0.0.0):{port}") httpd.serve_forever() except OSError as e: logger.error(f"Failed to start Webhook server on port {port}: {e}. Port might be in use.") # Exit or handle error appropriately # Consider signaling the main thread to stop except Exception as e: logger.exception(f"Unexpected error in Webhook server: {e}") # ========== Main Execution ========== if __name__ == "__main__": # --- Setup --- manager = IssueManager() main_event_loop = asyncio.get_event_loop() manager.main_loop = main_event_loop # Ensure manager has loop reference if needed # --- Start Background Servers --- # 1. Webhook Server (HTTP) - Runs in its own thread webhook_thread = threading.Thread(target=run_webhook_server, args=(manager, WEBHOOK_PORT), daemon=True) webhook_thread.start() # 2. 
WebSocket Server & Broadcast Task (Asyncio) - Run in main thread's event loop async def main_async_tasks(): # Start the periodic broadcast task broadcast_task = asyncio.create_task(manager.broadcast_collaboration_status()) # Start the WebSocket server websocket_server_task = asyncio.create_task(start_websocket_server(manager, WS_PORT)) # Keep tasks running await asyncio.gather(broadcast_task, websocket_server_task, return_exceptions=True) # --- Create Gradio App --- # Must be created before starting the loop if it interacts with async functions directly app = create_ui(manager) # --- Launch Gradio App & Async Tasks --- # Gradio's launch method handles the event loop integration when run directly. # It will run the asyncio tasks alongside the FastAPI/Uvicorn server. # app.queue() # Enable queue for handling multiple requests/long-running tasks # app.launch( # share=True, # Enable for public access (use with caution) # server_name="0.0.0.0", # Bind to all interfaces for accessibility # server_port=7860, # Default Gradio port # favicon_path="[https://huggingface.co/front/assets/huggingface_logo-noborder.svg](https://huggingface.co/front/assets/huggingface_logo-noborder.svg)", # Let Gradio manage the asyncio loop # asyncio_task=main_async_tasks() # This might conflict with launch's loop management # If launch() blocks, the asyncio tasks need to be run differently, # potentially starting the loop manually before launch or using threading # for the asyncio part if launch() isn't compatible. # However, modern Gradio often handles this integration better. # Start the asyncio tasks *after* defining the app but *before* launch blocks, # or ensure launch itself runs them. Gradio's `launch` usually handles the loop. # Let's rely on Gradio's launch to manage the loop and potentially run our tasks. # If WS/broadcast doesn't work, we may need to manually manage the loop/threading. 
# A common pattern if launch() blocks and doesn't run background async tasks: asyncio_thread = threading.Thread(target=lambda: asyncio.run(main_async_tasks()), daemon=True) asyncio_thread.start() app.launch(share=True, server_name="0.0.0.0", server_port=7860, favicon_path="https://huggingface.co/front/assets/huggingface_logo-noborder.svg") logger.info("Gradio app launched. Webhook server running in background thread.") # The asyncio tasks (WebSocket, broadcast) should be running via Gradio's event loop.