import gradio as gr
import os
import aiohttp
import asyncio
from git import Repo, GitCommandError
from pathlib import Path
from datetime import datetime
import shutil
import json
import logging
import re
from typing import Dict, List, Optional, Tuple
import subprocess
import plotly.express as px
import plotly.graph_objects as go
from transformers import pipeline, AutoModelForCausalLM, AutoTokenizer
import threading
from http.server import HTTPServer, BaseHTTPRequestHandler
import speech_recognition as sr
from functools import lru_cache
import hashlib
import markdown2
from concurrent.futures import ThreadPoolExecutor
from hdbscan import HDBSCAN
import websockets
from websockets.exceptions import ConnectionClosed
from code_editor import code_editor
# ========== Configuration ==========
WORKSPACE = Path("/tmp/issue_workspace")
WORKSPACE.mkdir(exist_ok=True)
GITHUB_API = "https://api.github.com/repos"
HF_INFERENCE_API = "https://api-inference.huggingface.co/models"
WEBHOOK_PORT = 8000
WS_PORT = 8001
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
executor = ThreadPoolExecutor(max_workers=4)
HF_MODELS = {
    "Mixtral-8x7B": "mistralai/Mixtral-8x7B-Instruct-v0.1",
    "Llama-3-8B": "meta-llama/Meta-Llama-3-8B",
    "CodeLlama-34B": "codellama/CodeLlama-34b-Instruct-hf",
    "StarCoder2": "bigcode/starcoder2-15b"
}
# Default model
DEFAULT_MODEL = "mistralai/Mixtral-8x7B-Instruct-v0.1"
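# ---------------------------------------------------------------------------
# OTCodeEditor is referenced below (IssueManager.code_editors and the issue
# selection handler) but is not defined or imported in this file. The class
# below is a minimal placeholder sketch so those references resolve: it assumes
# the real implementation holds the authoritative {filename: content} state and
# applies operational-transform style deltas from the collaborative editor.
# It is NOT a real OT engine; it only records incoming deltas.
# ---------------------------------------------------------------------------
class OTCodeEditor:
    """Minimal placeholder for the collaborative (OT) editor state."""

    def __init__(self, value: Optional[Dict[str, str]] = None, initial_content: Optional[str] = None):
        # `value` mirrors the {filename: content} dict used by the Gradio code editor;
        # `initial_content` covers call sites that pass a single string instead.
        self.value: Dict[str, str] = dict(value) if value else {}
        if initial_content is not None:
            self.value.setdefault("main.txt", initial_content)
        self.revision: int = 0

    def apply_delta(self, delta: dict) -> None:
        # Placeholder: a real implementation would transform and apply the Ace editor
        # delta to the authoritative document. Here we only count revisions.
        self.revision += 1
        logger.debug(f"OTCodeEditor (placeholder) recorded delta revision {self.revision}: {delta}")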
# ========== Modern Theme ==========
# Define the base theme
theme = gr.themes.Soft(
    primary_hue="violet",
    secondary_hue="emerald",
    radius_size="lg",
    font=[gr.themes.GoogleFont("Inter"), "ui-sans-serif", "system-ui"]
).set(
    # Apply custom settings using .set()
    button_primary_background_fill="linear-gradient(90deg, #8B5CF6 0%, #EC4899 100%)",
    button_primary_text_color="white",
    # button_primary_border_radius is not a supported .set() argument and raises a TypeError, so it is omitted.
    block_label_text_size="lg",
    block_label_text_weight="600",
    block_title_text_size="lg",
    block_title_text_weight="800",
    panel_background_fill="white",
    # panel_border_radius is likewise omitted; uncomment only if the installed Gradio version supports it.
    block_shadow="*shadow_drop_lg",
)
# ========== Enhanced Webhook Handler ========== | |
class WebhookHandler(BaseHTTPRequestHandler): | |
# Keep a reference to the manager instance | |
manager_instance = None | |
def do_POST(self): | |
content_length = int(self.headers['Content-Length']) | |
try: | |
payload = json.loads(self.rfile.read(content_length).decode('utf-8')) | |
except json.JSONDecodeError: | |
self.send_response(400) | |
self.end_headers() | |
self.wfile.write(b"Invalid JSON payload") | |
return | |
except Exception as e: | |
logger.error(f"Error reading webhook payload: {e}") | |
self.send_response(500) | |
self.end_headers() | |
return | |
event = self.headers.get('X-GitHub-Event') | |
logger.info(f"Received GitHub webhook event: {event}") | |
if event == 'issues' and WebhookHandler.manager_instance: | |
action = payload.get('action') | |
logger.info(f"Issue action: {action}") | |
            if action in ['opened', 'reopened', 'closed', 'assigned', 'edited']:  # Handle edited issues too
                # This handler runs in the HTTP server thread, which has no event loop of its
                # own, so schedule the coroutine on the manager's main asyncio loop.
                loop = WebhookHandler.manager_instance.main_loop
                if loop and loop.is_running():
                    asyncio.run_coroutine_threadsafe(
                        WebhookHandler.manager_instance.handle_webhook_event(event, action, payload),
                        loop
                    )
                else:
                    logger.error("Main asyncio event loop is not running; cannot process webhook event.")
elif event == 'ping': | |
logger.info("Received GitHub webhook ping.") | |
else: | |
logger.warning(f"Unhandled event type: {event} or manager not initialized.") | |
self.send_response(200) | |
self.end_headers() | |
self.wfile.write(b"OK") | |
# ========== AI-Powered Issue Manager ========== | |
class IssueManager: | |
def __init__(self): | |
self.issues: Dict[int, dict] = {} | |
self.repo_url: Optional[str] = None | |
self.repo: Optional[Repo] = None | |
self.current_issue: Optional[int] = None | |
self.github_token: Optional[str] = None | |
self.hf_token: Optional[str] = None | |
self.collaborators: Dict[str, dict] = {} # Example: {"user1": {"status": "editing file.py"}} | |
self.points: int = 0 | |
self.severity_rules: Dict[str, List[str]] = { | |
"Critical": ["critical", "urgent", "security", "crash", "blocker"], | |
"High": ["high", "important", "error", "regression", "major"], | |
"Medium": ["medium", "bug", "performance", "minor"], | |
"Low": ["low", "documentation", "enhancement", "trivial", "feature"] | |
} | |
self.issue_clusters: Dict[int, List[int]] = {} # Store clusters: {cluster_id: [issue_index1, issue_index2]} | |
self.issue_list_for_clustering: List[dict] = [] # Store issues in list order for clustering index mapping | |
# self._init_local_models() # Consider lazy loading or conditional loading | |
self.ws_clients: List[websockets.WebSocketServerProtocol] = [] # Use WebSocketServerProtocol | |
self.code_editors: Dict[int, OTCodeEditor] = {} # Store code editors for each issue | |
self.main_loop = asyncio.get_event_loop() # Store ref to main loop if needed elsewhere | |
# Placeholder for local model initialization - implement actual loading if needed | |
def _init_local_models(self): | |
logger.info("Initializing local models (placeholder)...") | |
# self.code_model = pipeline(...) | |
# self.summarizer = pipeline(...) | |
logger.info("Local models initialized (placeholder).") | |
# Simple hash for caching based on issue content | |
def _get_issue_hash(self, issue_data: dict) -> str: | |
content = f"{issue_data.get('title', '')}{issue_data.get('body', '')}" | |
return hashlib.md5(content.encode()).hexdigest() | |
async def cached_suggestion(self, issue_hash: str, model_key: str) -> str: | |
# Find the issue corresponding to the hash (inefficient, improve if needed) | |
found_issue = None | |
for issue in self.issues.values(): | |
if self._get_issue_hash(issue) == issue_hash: | |
found_issue = issue | |
break | |
if not found_issue: | |
return "Error: Issue not found for the given hash." | |
if model_key not in HF_MODELS: | |
return f"Error: Invalid model key: {model_key}" | |
logger.info(f"Cache miss or first request for issue hash {issue_hash}. Requesting suggestion from {model_key}.") | |
# Pass the actual issue dict and model_key to suggest_resolution | |
return await self.suggest_resolution(found_issue, model_key) | |
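    # cached_suggestion (above) is named "cached" but does not memoise results yet.
    # The two helpers below are a minimal sketch of how results could be cached per
    # (issue hash, model) pair; the attribute name `_suggestion_cache` is an
    # assumption for illustration and is not used elsewhere in this file.
    def _cache_suggestion(self, issue_hash: str, model_key: str, suggestion: str) -> str:
        if not hasattr(self, "_suggestion_cache"):
            self._suggestion_cache: Dict[Tuple[str, str], str] = {}
        self._suggestion_cache[(issue_hash, model_key)] = suggestion
        return suggestion

    def _get_cached_suggestion(self, issue_hash: str, model_key: str) -> Optional[str]:
        # Returns a previously stored suggestion, or None on a cache miss.
        return getattr(self, "_suggestion_cache", {}).get((issue_hash, model_key))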
async def handle_webhook_event(self, event: str, action: str, payload: dict): | |
logger.info(f"Processing webhook event: {event}, action: {action}") | |
issue_data = payload.get('issue') | |
if not issue_data: | |
logger.warning("Webhook payload missing 'issue' data.") | |
return | |
issue_number = issue_data.get('number') | |
if not issue_number: | |
logger.warning("Webhook issue data missing 'number'.") | |
return | |
needs_update = False | |
if action == 'closed': | |
logger.info(f"Removing closed issue {issue_number} from active list.") | |
if self.issues.pop(issue_number, None): | |
needs_update = True | |
# Optionally remove associated editor, etc. | |
self.code_editors.pop(issue_number, None) | |
elif action in ['opened', 'reopened', 'edited']: # Handle edited issues too | |
logger.info(f"Adding/Updating issue {issue_number} from webhook.") | |
self.issues[issue_number] = self._process_issue_data(issue_data) # Use helper | |
needs_update = True | |
# Potentially trigger re-clustering or update specific issue details | |
elif action == 'assigned': | |
logger.info(f"Issue {issue_number} assigned to {payload.get('assignee', {}).get('login', 'N/A')}") | |
if issue_number in self.issues: | |
self.issues[issue_number] = self._process_issue_data(issue_data) # Update issue data | |
needs_update = True | |
else: # Issue might not be in our list if it wasn't open initially | |
self.issues[issue_number] = self._process_issue_data(issue_data) | |
needs_update = True | |
else: | |
logger.info(f"Ignoring action '{action}' for issue {issue_number}.") | |
# Trigger a UI update notification via WebSocket if data changed | |
if needs_update: | |
await self.broadcast_issue_update() | |
def _process_issue_data(self, issue_data: dict) -> dict: | |
"""Helper to structure issue data consistently.""" | |
return { | |
"id": issue_data['number'], | |
"title": issue_data.get('title', 'No Title'), | |
"body": issue_data.get('body', ''), | |
"state": issue_data.get('state', 'unknown'), | |
"labels": [label['name'] for label in issue_data.get('labels', [])], | |
"assignee": issue_data.get('assignee', {}).get('login') if issue_data.get('assignee') else None, | |
"url": issue_data.get('html_url', '#') | |
# Add other relevant fields if needed | |
} | |
async def crawl_issues(self, repo_url: str, github_token: str, hf_token: str) -> Tuple[List[List], go.Figure, str]: | |
""" | |
Crawls issues, updates internal state, performs clustering, and returns data for UI update. | |
""" | |
if not repo_url or not hf_token: # GitHub token is optional for public repos | |
return [], go.Figure(), "Error: Repository URL and HF Token are required." | |
logger.info(f"Starting issue crawl for {repo_url}") | |
self.repo_url = repo_url | |
self.github_token = github_token | |
self.hf_token = hf_token | |
self.issues = {} # Reset issues before crawl | |
# Extract owner/repo from URL | |
match = re.match(r"https?://github\.com/([^/]+)/([^/]+)", repo_url) | |
if not match: | |
logger.error(f"Invalid GitHub URL format: {repo_url}") | |
return [], go.Figure(), "Error: Invalid GitHub URL format. Use https://github.com/owner/repo" | |
owner, repo_name = match.groups() | |
api_url = f"{GITHUB_API}/{owner}/{repo_name}/issues?state=open&per_page=100" # Fetch only open issues, max 100 | |
headers = { | |
"Accept": "application/vnd.github.v3+json" | |
} | |
if github_token: | |
headers["Authorization"] = f"token {github_token}" | |
try: | |
all_issues_data = [] | |
page = 1 | |
logger.info(f"Fetching issues from {api_url}...") | |
async with aiohttp.ClientSession(headers=headers) as session: | |
while True: | |
paginated_url = f"{api_url}&page={page}" | |
async with session.get(paginated_url) as response: | |
response.raise_for_status() # Raise exception for bad status codes | |
issues_page_data = await response.json() | |
if not issues_page_data: # No more issues on this page | |
break | |
logger.info(f"Fetched page {page} with {len(issues_page_data)} issues.") | |
all_issues_data.extend(issues_page_data) | |
# Check Link header for next page (more robust pagination) | |
if 'next' not in response.headers.get('Link', ''): | |
break | |
page += 1 | |
logger.info(f"Total issues fetched: {len(all_issues_data)}") | |
for issue_data in all_issues_data: | |
issue_number = issue_data['number'] | |
self.issues[issue_number] = self._process_issue_data(issue_data) # Use helper | |
if not self.issues: | |
logger.info("No open issues found.") | |
return [], go.Figure(), "No open issues found in the repository." | |
# Prepare data for clustering | |
self.issue_list_for_clustering = list(self.issues.values()) | |
logger.info("Clustering issues...") | |
await self._cluster_similar_issues() # Update self.issue_clusters | |
# Prepare data for Gradio Dataframe | |
dataframe_data = [] | |
severity_counts = {"Critical": 0, "High": 0, "Medium": 0, "Low": 0, "Unknown": 0} | |
# Map clustered indices back to issue numbers and determine severity | |
cluster_map = {} # {issue_index: cluster_id} | |
for cluster_id, indices in self.issue_clusters.items(): | |
for index in indices: | |
cluster_map[index] = cluster_id | |
for i, issue in enumerate(self.issue_list_for_clustering): | |
severity = self._determine_severity(issue['labels']) | |
severity_counts[severity] += 1 | |
cluster_id = cluster_map.get(i, -1) # -1 for noise/unclustered | |
dataframe_data.append([ | |
issue['id'], | |
issue['title'], | |
severity, | |
cluster_id if cluster_id != -1 else "N/A" # Display N/A for noise | |
]) | |
logger.info("Generating statistics plot...") | |
stats_fig = self._generate_stats_plot(severity_counts) | |
success_msg = f"Found {len(self.issues)} open issues. Clustered into {len(self.issue_clusters)} groups (excluding noise)." | |
logger.info(success_msg) | |
return dataframe_data, stats_fig, success_msg | |
except aiohttp.ClientResponseError as e: | |
logger.error(f"GitHub API request failed: {e.status} {e.message}") | |
error_msg = f"Error fetching issues: {e.status} - {e.message}. Check token permissions and repo URL." | |
if e.status == 404: | |
error_msg = f"Error: Repository not found at {repo_url}. Check the URL." | |
elif e.status == 401: | |
error_msg = "Error: Invalid GitHub token or insufficient permissions." | |
elif e.status == 403: | |
error_msg = "Error: GitHub API rate limit exceeded or forbidden access. Try adding a GitHub token." | |
return [], go.Figure(), error_msg | |
except GitCommandError as e: | |
logger.error(f"Git clone error: {e}") # Should not happen if not cloning | |
return [], go.Figure(), f"Error related to Git: {e}" | |
except Exception as e: | |
logger.exception(f"An unexpected error occurred during issue crawl: {e}") # Log full traceback | |
return [], go.Figure(), f"An unexpected error occurred: {e}" | |
def _determine_severity(self, labels: List[str]) -> str: | |
"""Determines issue severity based on labels.""" | |
labels_lower = [label.lower() for label in labels] | |
for severity, keywords in self.severity_rules.items(): | |
if any(keyword in label for keyword in keywords for label in labels_lower): | |
return severity | |
return "Unknown" # Default if no matching label found | |
def _generate_stats_plot(self, severity_counts: Dict[str, int]) -> go.Figure: | |
"""Generates a Plotly bar chart for issue severity distribution.""" | |
# Filter out severities with 0 counts | |
filtered_counts = {k: v for k, v in severity_counts.items() if v > 0} | |
if not filtered_counts: | |
# Return an empty figure with a message if no issues found | |
fig = go.Figure() | |
fig.update_layout( | |
title="Issue Severity Distribution", | |
xaxis = {"visible": False}, | |
yaxis = {"visible": False}, | |
annotations = [{ | |
"text": "No issues found to display statistics.", | |
"xref": "paper", | |
"yref": "paper", | |
"showarrow": False, | |
"font": {"size": 16} | |
}], | |
plot_bgcolor='rgba(0,0,0,0)', | |
paper_bgcolor='rgba(0,0,0,0)' | |
) | |
return fig | |
severities = list(filtered_counts.keys()) | |
counts = list(filtered_counts.values()) | |
fig = px.bar( | |
x=severities, | |
y=counts, | |
title="Issue Severity Distribution", | |
labels={'x': 'Severity', 'y': 'Number of Issues'}, | |
color=severities, # Color bars by severity | |
color_discrete_map={ # Define colors | |
'Critical': '#DC2626', # Red | |
'High': '#F97316', # Orange | |
'Medium': '#FACC15', # Yellow | |
'Low': '#84CC16', # Lime | |
'Unknown': '#6B7280' # Gray | |
}, | |
text=counts # Display counts on bars | |
) | |
fig.update_layout( | |
xaxis_title=None, # Cleaner look | |
yaxis_title="Number of Issues", | |
plot_bgcolor='rgba(0,0,0,0)', # Transparent background | |
paper_bgcolor='rgba(0,0,0,0)', | |
showlegend=False, | |
xaxis={'categoryorder':'array', 'categoryarray':['Critical', 'High', 'Medium', 'Low', 'Unknown']} # Order bars | |
) | |
fig.update_traces(textposition='outside') | |
return fig | |
async def _cluster_similar_issues(self): | |
"""Generates embeddings and clusters issues using HDBSCAN.""" | |
if not self.issue_list_for_clustering or not self.hf_token: | |
logger.warning("Cannot cluster issues: No issues loaded or HF token missing.") | |
self.issue_clusters = {} | |
return | |
logger.info("Generating embeddings for clustering...") | |
try: | |
# Combine title and body for embedding generation | |
texts_to_embed = [f"{i.get('title','')} - {i.get('body','')[:500]}" for i in self.issue_list_for_clustering] # Limit body length | |
embeddings = await self._generate_embeddings(texts_to_embed) | |
if embeddings is None or not isinstance(embeddings, list) or len(embeddings) != len(self.issue_list_for_clustering): | |
logger.error(f"Failed to generate valid embeddings. Expected {len(self.issue_list_for_clustering)}, got {len(embeddings) if embeddings else 'None'}.") | |
self.issue_clusters = {} | |
return | |
logger.info(f"Generated {len(embeddings)} embeddings. Running HDBSCAN...") | |
# Use HDBSCAN for density-based clustering | |
# min_cluster_size: minimum number of samples in a cluster | |
# metric: distance metric used | |
# allow_single_cluster: If True, allows forming a single large cluster | |
clusterer = HDBSCAN(min_cluster_size=2, metric='cosine', allow_single_cluster=True, gen_min_span_tree=True) | |
clusters = clusterer.fit_predict(embeddings) | |
self.issue_clusters = {} | |
noise_count = 0 | |
for i, cluster_id in enumerate(clusters): | |
if cluster_id == -1: # HDBSCAN uses -1 for noise points | |
noise_count += 1 | |
continue # Skip noise points | |
if cluster_id not in self.issue_clusters: | |
self.issue_clusters[cluster_id] = [] | |
self.issue_clusters[cluster_id].append(i) # Store original index | |
logger.info(f"Clustering complete. Found {len(self.issue_clusters)} clusters with {noise_count} noise points.") | |
except Exception as e: | |
logger.exception(f"Error during issue clustering: {e}") | |
self.issue_clusters = {} | |
async def _generate_embeddings(self, texts: List[str]): | |
"""Generates sentence embeddings using Hugging Face Inference API.""" | |
if not self.hf_token: | |
logger.error("Hugging Face token is not set. Cannot generate embeddings.") | |
return None | |
# Recommended embedding model (check HF for alternatives if needed) | |
model_id = "sentence-transformers/all-mpnet-base-v2" | |
api_url = f"{HF_INFERENCE_API}/{model_id}" | |
headers = {"Authorization": f"Bearer {self.hf_token}"} | |
logger.info(f"Requesting embeddings from {api_url} for {len(texts)} texts.") | |
# Add timeout to the request | |
timeout = aiohttp.ClientTimeout(total=60) # 60 seconds timeout | |
async with aiohttp.ClientSession(headers=headers, timeout=timeout) as session: | |
try: | |
# Add wait_for_model=True if using serverless inference endpoints | |
payload = {"inputs": texts, "options": {"wait_for_model": True}} | |
response = await session.post(api_url, json=payload) | |
response.raise_for_status() | |
result = await response.json() | |
# Check if the result is a list of embeddings (floats) | |
if isinstance(result, list) and all(isinstance(emb, list) for emb in result): | |
logger.info(f"Successfully received {len(result)} embeddings.") | |
return result | |
elif isinstance(result, dict) and 'error' in result: | |
logger.error(f"HF Inference API returned an error: {result['error']}") | |
return None | |
else: | |
logger.error(f"Unexpected embedding format received: {type(result)}. Full response: {result}") | |
return None | |
            except aiohttp.ClientResponseError as e:
                # aiohttp.ClientResponseError does not expose the response body,
                # so only the status and message can be logged here.
                logger.error(f"HF Inference API request failed: {e.status} {e.message}")
                return None
except asyncio.TimeoutError: | |
logger.error(f"HF Inference API request timed out after {timeout.total} seconds.") | |
return None | |
except Exception as e: | |
logger.exception(f"An unexpected error occurred during embedding generation: {e}") | |
return None | |
async def generate_code_patch(self, issue_number: int, model_key: str) -> dict: | |
"""Generates a code patch suggestion using a selected AI model.""" | |
if issue_number not in self.issues: | |
return {"error": f"Issue {issue_number} not found."} | |
if not self.hf_token: | |
return {"error": "Hugging Face token not set."} | |
if model_key not in HF_MODELS: | |
return {"error": f"Invalid model key: {model_key}"} | |
issue = self.issues[issue_number] | |
model_id = HF_MODELS[model_key] | |
logger.info(f"Generating patch for issue {issue_number} using model {model_id}") | |
# --- Context Gathering (Placeholder) --- | |
context = "Context gathering not implemented. Provide relevant code snippets in the issue description if possible." | |
# context = await self._get_code_context(issue_number) # Uncomment if implemented | |
# --- Prompt Engineering --- | |
prompt = f"""You are an expert programmer analyzing a GitHub issue. Your task is to generate a code patch in standard `diff` format to fix the described problem. | |
## Issue Details: | |
### Title: {issue.get('title', 'N/A')} | |
### Body: | |
{issue.get('body', 'N/A')} | |
### Labels: {', '.join(issue.get('labels', []))} | |
## Relevant Code Context (if available): | |
{context} | |
## Instructions: | |
1. Carefully analyze the issue description and context. | |
2. Identify the specific code changes required to resolve the issue. | |
3. Generate a patch containing *only* the necessary code modifications. | |
4. Format the patch strictly according to the standard `diff` format, enclosed in a ```diff ... ``` block. | |
5. Provide a brief explanation *before* the diff block explaining the reasoning behind the changes. | |
6. If a patch cannot be reasonably determined from the provided information, state that clearly instead of generating an incorrect patch. | |
## Patch Suggestion: | |
""" | |
# --- Call Inference API --- | |
api_url = f"{HF_INFERENCE_API}/{model_id}" | |
headers = {"Authorization": f"Bearer {self.hf_token}"} | |
payload = { | |
"inputs": prompt, | |
"parameters": { # Adjust parameters as needed | |
"max_new_tokens": 1536, # Increased max tokens for potentially larger patches | |
"temperature": 0.2, # Low temperature for deterministic code generation | |
"return_full_text": False, # Only get the generated part | |
"do_sample": False, # Turn off sampling for more deterministic output with low temp | |
# "top_p": 0.9, # Less relevant when do_sample=False | |
}, | |
"options": {"wait_for_model": True} # For serverless endpoints | |
} | |
timeout = aiohttp.ClientTimeout(total=120) # 2 minutes timeout for generation | |
try: | |
async with aiohttp.ClientSession(headers=headers, timeout=timeout) as session: | |
async with session.post(api_url, json=payload) as response: | |
response.raise_for_status() | |
result = await response.json() | |
if result and isinstance(result, list) and 'generated_text' in result[0]: | |
generated_text = result[0].get('generated_text', '') | |
logger.info(f"Received patch suggestion from {model_id}") | |
# Improved extraction of diff block | |
diff_match = re.search(r"```diff\n(.*?)```", generated_text, re.DOTALL) | |
explanation = generated_text.split("```diff")[0].strip() | |
if not explanation: # Handle cases where explanation might be missing | |
explanation = "No explanation provided." | |
if diff_match: | |
patch = diff_match.group(1).strip() | |
# Basic validation: check if patch contains diff markers like + or - | |
if not re.search(r'^\s*[+-]', patch, re.MULTILINE): | |
patch = f"AI generated response, but no standard diff markers (+/-) found:\n---\n{patch}\n---" | |
logger.warning("Generated patch lacks standard diff markers.") | |
else: | |
patch = "No diff block found in the AI response." | |
logger.warning("No diff block found in patch suggestion response.") | |
return { | |
"explanation": explanation, | |
"patch": patch, | |
"model_used": model_id | |
} | |
elif isinstance(result, dict) and 'error' in result: | |
logger.error(f"HF Inference API returned an error for patch generation: {result['error']}") | |
return {"error": f"AI model returned an error: {result['error']}"} | |
else: | |
logger.error(f"Unexpected response format from {model_id} for patch: {result}") | |
return {"error": "Received unexpected response format from AI model."} | |
        except aiohttp.ClientResponseError as e:
            # The exception does not carry the response body; log status and message only.
            logger.error(f"HF Inference API request failed for patch generation: {e.status} {e.message}")
            return {"error": f"AI model request failed ({e.status}). Check model availability and HF token."}
except asyncio.TimeoutError: | |
logger.error(f"HF Inference API request for patch generation timed out after {timeout.total} seconds.") | |
return {"error": "AI model request timed out."} | |
except Exception as e: | |
logger.exception(f"Error generating code patch: {e}") | |
return {"error": f"An unexpected error occurred: {e}"} | |
async def _get_code_context(self, issue_number: int) -> str: | |
"""Placeholder for retrieving relevant code context for an issue.""" | |
# This needs a proper implementation based on how the repo is managed | |
# - Clone/pull the repo if not present/up-to-date | |
# - Identify relevant files (e.g., using file paths mentioned in the issue, heuristics) | |
# - Read relevant parts of the files | |
logger.warning(f"Code context retrieval for issue {issue_number} is not fully implemented.") | |
# Example: Look for file paths in the issue body | |
# issue_body = self.issues.get(issue_number, {}).get('body', '') | |
# Find potential file paths (very basic example) | |
# potential_files = re.findall(r'[\w/.-]+\.(?:py|js|java|cpp|c|ts|html|css)', issue_body) | |
# Read content from these files if they exist in the workspace repo | |
return "Code context retrieval is currently a placeholder." | |
async def suggest_resolution(self, issue: dict, model_key: str) -> str: | |
"""Suggests a resolution description using a selected AI model.""" | |
if not self.hf_token: | |
return "Error: Hugging Face token not set." | |
if model_key not in HF_MODELS: | |
return f"Error: Invalid model key: {model_key}" | |
model_id = HF_MODELS[model_key] | |
logger.info(f"Requesting resolution suggestion for issue {issue.get('id','N/A')} using {model_id}") | |
prompt = f"""Analyze the following GitHub issue and provide a concise, step-by-step suggestion on how to resolve it. Focus on the technical steps required. Be clear and actionable. | |
## Issue Details: | |
### Title: {issue.get('title', 'N/A')} | |
### Body: | |
{issue.get('body', 'N/A')} | |
### Labels: {', '.join(issue.get('labels', []))} | |
## Suggested Resolution Steps: | |
1. **Understand the Root Cause:** [Briefly explain the likely cause based on the description] | |
2. **Identify Files:** [Suggest specific files or modules likely involved] | |
3. **Implement Changes:** [Describe the necessary code modifications or additions] | |
4. **Test Thoroughly:** [Mention specific testing approaches needed] | |
5. **Create Pull Request:** [Standard final step] | |
Provide details for steps 1-4 based on the issue. | |
""" | |
api_url = f"{HF_INFERENCE_API}/{model_id}" | |
headers = {"Authorization": f"Bearer {self.hf_token}"} | |
payload = { | |
"inputs": prompt, | |
"parameters": { | |
"max_new_tokens": 768, # Increased token limit | |
"temperature": 0.6, # Slightly lower temp for more focused suggestions | |
"return_full_text": False, | |
"do_sample": True, | |
"top_p": 0.95, | |
}, | |
"options": {"wait_for_model": True} # For serverless endpoints | |
} | |
timeout = aiohttp.ClientTimeout(total=90) # 90 seconds timeout | |
try: | |
async with aiohttp.ClientSession(headers=headers, timeout=timeout) as session: | |
async with session.post(api_url, json=payload) as response: | |
response.raise_for_status() | |
result = await response.json() | |
if result and isinstance(result, list) and 'generated_text' in result[0]: | |
suggestion = result[0].get('generated_text', 'No suggestion generated.') | |
logger.info(f"Received suggestion from {model_id}") | |
return suggestion.strip() | |
elif isinstance(result, dict) and 'error' in result: | |
logger.error(f"HF Inference API returned an error for suggestion: {result['error']}") | |
return f"Error: AI model returned an error: {result['error']}" | |
else: | |
logger.error(f"Unexpected response format from {model_id} for suggestion: {result}") | |
return "Error: Received unexpected response format from AI model." | |
        except aiohttp.ClientResponseError as e:
            # The exception does not carry the response body; log status and message only.
            logger.error(f"HF Inference API request failed for suggestion: {e.status} {e.message}")
            return f"Error: AI model request failed ({e.status}). Check model availability and HF token."
except asyncio.TimeoutError: | |
logger.error(f"HF Inference API request for suggestion timed out after {timeout.total} seconds.") | |
return "Error: AI model request timed out." | |
except Exception as e: | |
logger.exception(f"Error suggesting resolution: {e}") | |
return f"An unexpected error occurred: {e}" | |
# --- WebSocket Methods --- | |
async def broadcast_collaboration_status(self): | |
"""Periodically sends collaborator status to all connected clients.""" | |
while True: | |
await asyncio.sleep(5) # Send updates every 5 seconds | |
if not self.ws_clients: | |
continue | |
# Create payload within the loop to get current status | |
status_payload = json.dumps({ | |
"type": "collaboration_status", | |
"collaborators": self.collaborators | |
}) | |
if self.collaborators: # Only broadcast if there's someone to report | |
logger.debug(f"Broadcasting status: {status_payload}") | |
# Use asyncio.gather to send concurrently, handling potential errors | |
results = await asyncio.gather( | |
*[client.send(status_payload) for client in self.ws_clients], | |
return_exceptions=True # Don't let one failed send stop others | |
) | |
# Log any errors that occurred during broadcast | |
active_clients = [] | |
for i, result in enumerate(results): | |
client = self.ws_clients[i] | |
if isinstance(result, Exception): | |
logger.warning(f"Failed to send status to client {getattr(client, 'client_id', client.remote_address)}: {result}. Removing client.") | |
# Schedule removal in the main loop to avoid modifying list while iterating | |
# self.main_loop.call_soon_threadsafe(self.remove_ws_client, client) # Requires main_loop ref | |
else: | |
active_clients.append(client) | |
# self.ws_clients = active_clients # Update list - Careful with concurrency here | |
async def handle_code_editor_update(self, issue_num: int, delta: str, client_id: str): | |
"""Applies a delta from one client and broadcasts it to others.""" | |
if issue_num not in self.code_editors: | |
# Initialize editor if it doesn't exist (e.g., first edit) | |
# This requires knowing the initial content, which might be tricky. | |
# For now, log a warning. A better approach might be needed. | |
logger.warning(f"Received code update for non-existent editor instance for issue {issue_num}. Ignoring.") | |
# Alternative: self.code_editors[issue_num] = OTCodeEditor(initial_content="...") | |
return | |
try: | |
# Apply the delta to the server-side authoritative state | |
# Assuming apply_delta modifies the internal state correctly | |
parsed_delta = json.loads(delta) # Parse delta once | |
self.code_editors[issue_num].apply_delta(parsed_delta) | |
logger.info(f"Applied delta for issue {issue_num} from client {client_id}") | |
# Broadcast the delta to all *other* connected clients | |
update_payload = json.dumps({ | |
"type": "code_update", | |
"issue_num": issue_num, | |
"delta": delta # Send the original delta JSON string | |
}) | |
            tasks = []
            recipients = []
            for client in self.ws_clients:
                # Check if the client has an associated ID and avoid sending back to the originator
                client_ws_id = getattr(client, 'client_id', None)
                if client_ws_id != client_id:
                    tasks.append(client.send(update_payload))
                    recipients.append(client)
            if tasks:
                results = await asyncio.gather(*tasks, return_exceptions=True)
                # Log errors during broadcast
                for recipient, result in zip(recipients, results):
                    if isinstance(result, Exception):
                        logger.warning(f"Failed to broadcast code update to client {getattr(recipient, 'client_id', 'N/A')}: {result}")
except json.JSONDecodeError: | |
logger.error(f"Received invalid JSON delta for issue {issue_num}: {delta}") | |
except Exception as e: | |
logger.exception(f"Error handling code editor update for issue {issue_num}: {e}") | |
async def broadcast_issue_update(self): | |
"""Notifies clients that the issue list/data has changed.""" | |
if not self.ws_clients: | |
return | |
logger.info("Broadcasting issue update notification to clients.") | |
update_payload = json.dumps({"type": "issues_updated"}) | |
results = await asyncio.gather( | |
*[client.send(update_payload) for client in self.ws_clients], | |
return_exceptions=True | |
) | |
for i, result in enumerate(results): | |
client = self.ws_clients[i] | |
if isinstance(result, Exception): | |
logger.warning(f"Failed to send issue update notification to client {getattr(client, 'client_id', client.remote_address)}: {result}") | |
def remove_ws_client(self, client_to_remove: websockets.WebSocketServerProtocol): | |
"""Safely removes a client from the list and collaborator dict.""" | |
client_id = getattr(client_to_remove, 'client_id', None) | |
if client_to_remove in self.ws_clients: | |
self.ws_clients.remove(client_to_remove) | |
logger.info(f"Removed WebSocket client: {client_id or client_to_remove.remote_address}") | |
if client_id and client_id in self.collaborators: | |
del self.collaborators[client_id] | |
logger.info(f"Removed collaborator {client_id}.") | |
# No need to broadcast here, the periodic task will reflect the change | |
# ========== Gradio UI Definition ========== | |
def create_ui(manager: IssueManager): | |
"""Creates the Gradio interface.""" | |
# --- Helper Functions for UI --- | |
def generate_issue_preview(issue_num: Optional[int]) -> str: | |
"""Generates HTML preview for a selected issue.""" | |
if issue_num is None or issue_num not in manager.issues: | |
return "<p>Select an issue from the board to see details.</p>" | |
issue = manager.issues[issue_num] | |
# Convert markdown body to HTML using markdown2 with extras | |
html_body = markdown2.markdown( | |
issue.get('body', '*No description provided.*'), | |
extras=["fenced-code-blocks", "tables", "strike", "task_list"] | |
) | |
        # Build the labels markup outside the template to keep the f-string readable.
        labels_html = ' | '.join(
            f"<span style='background-color: #eee; padding: 2px 5px; border-radius: 4px; font-size: 0.9em; display: inline-block; margin-right: 3px;'>{label}</span>"
            for label in issue.get('labels', [])
        ) or 'None'
        # Basic styling. This must be an f-string so the issue fields are actually interpolated.
        preview_html = f"""
        <div style="border: 1px solid #e5e7eb; padding: 15px; border-radius: 8px; background-color: #f9fafb; font-family: 'Inter', sans-serif;">
            <h4 style="margin-top: 0; margin-bottom: 10px;">
                <a href='{issue.get('url', '#')}' target='_blank' style='color: #6d28d9; text-decoration: none; font-weight: 600;'>
                    #{issue['id']} - {issue.get('title', 'N/A')}
                </a>
            </h4>
            <hr style='margin: 10px 0; border-top: 1px solid #e5e7eb;'>
            <p style="font-size: 0.9em; color: #4b5563; margin-bottom: 5px;">
                <strong>State:</strong> <span style="background-color: #ddd; padding: 1px 4px; border-radius: 3px;">{issue.get('state', 'N/A')}</span> |
                <strong>Assignee:</strong> {issue.get('assignee', 'None')}
            </p>
            <p style="font-size: 0.9em; color: #4b5563; margin-bottom: 10px;">
                <strong>Labels:</strong> {labels_html}
            </p>
            <div style="margin-top: 10px; max-height: 350px; overflow-y: auto; border-top: 1px dashed #ccc; padding-top: 10px; font-size: 0.95em; line-height: 1.5;">
                {html_body}
            </div>
        </div>
        """
        return preview_html
    async def get_ai_suggestion_wrapper(issue_num: Optional[int], model_key: str) -> str:
        """Wrapper to get AI suggestion for the chat display."""
        if issue_num is None or issue_num not in manager.issues:
            return "Please select a valid issue first."
        # cached_suggestion looks the issue up by its content hash and resolves
        # model_key itself, so this wrapper only needs to compute the hash.
        issue = manager.issues[issue_num]
        issue_hash = manager._get_issue_hash(issue)
        suggestion = await manager.cached_suggestion(issue_hash, model_key)
        # Format for display
        return f"**Suggestion based on {model_key}:**\n\n---\n{suggestion}"
async def get_ai_patch_wrapper(issue_num: Optional[int], model_key: str) -> str: | |
"""Wrapper to get AI patch for the chat display.""" | |
if issue_num is None or issue_num not in manager.issues: | |
return "Please select a valid issue first." | |
result = await manager.generate_code_patch(issue_num, model_key) | |
if "error" in result: | |
return f"**Error generating patch:** {result['error']}" | |
else: | |
# Format for chat display using Markdown code block | |
return f"""**Patch Suggestion from {result.get('model_used', model_key)}:** | |
**Explanation:** | |
{result.get('explanation', 'N/A')} | |
--- | |
**Patch:** | |
```diff | |
{result.get('patch', 'N/A')} | |
```""" | |
# --- Gradio Blocks --- | |
with gr.Blocks(theme=theme, title="🤖 AI Issue Resolver Pro", css=".gradio-container {max-width: 1400px !important;}") as app: | |
gr.Markdown(""" | |
<div style="text-align: center; margin-bottom: 20px;"> | |
<h1 style="color: #6d28d9; font-weight: 800;">🚀 AI Issue Resolver Pro</h1> | |
<p style="color: #4b5563; font-size: 1.1em;">Next-generation issue resolution powered by AI collaboration</p> | |
</div> | |
""") | |
# --- Configuration Row --- | |
with gr.Row(variant="panel", elem_id="config-panel"): | |
with gr.Column(scale=3): | |
                repo_url = gr.Textbox(label="GitHub Repository URL", placeholder="https://github.com/owner/repo", info="Enter the full URL of the public GitHub repository.", elem_id="repo_url")
with gr.Row(): | |
github_token = gr.Textbox(label="GitHub Token (Optional)", type="password", info="Required for private repos or higher rate limits.", elem_id="github_token") | |
hf_token = gr.Textbox(label="Hugging Face Token", type="password", info="Required for AI model interactions.", elem_id="hf_token") | |
with gr.Column(scale=1, min_width=250): | |
                model_select = gr.Dropdown(choices=list(HF_MODELS.keys()), value="Mixtral-8x7B",
                                           label="🤖 Select AI Model", info="Choose the AI for suggestions and patches.", elem_id="model_select")
crawl_btn = gr.Button("🛰️ Scan Repository Issues", variant="primary", icon="🔍", elem_id="crawl_btn") | |
status_output = gr.Textbox(label="Status", interactive=False, lines=1, max_lines=1, placeholder="Status updates will appear here...", elem_id="status_output") | |
# --- Main Tabs --- | |
with gr.Tabs(elem_id="main-tabs"): | |
# --- Issue Board Tab --- | |
with gr.Tab("📋 Issue Board", id="board", elem_id="tab-board"): | |
with gr.Row(equal_height=False): | |
with gr.Column(scale=3): | |
gr.Markdown("### Open Issues") | |
issue_list = gr.Dataframe( | |
headers=["ID", "Title", "Severity", "Cluster"], | |
datatype=["number", "str", "str", "str"], # Cluster ID shown as str | |
interactive=True, | |
wrap=True, # Wrap long titles | |
elem_id="issue_list_df", | |
) | |
with gr.Column(scale=2, min_width=350): | |
gr.Markdown("### Issue Severity") | |
stats_plot = gr.Plot(elem_id="stats_plot") | |
# Placeholder for collaborators - updated via JS | |
collab_status = gr.HTML(""" | |
<div style="margin-top: 20px; border: 1px solid #e5e7eb; padding: 10px; border-radius: 8px; background-color: #f9fafb;"> | |
<h4 style="margin-bottom: 5px; color: #374151; font-size: 1em;">👥 Active Collaborators</h4> | |
<div id="collab-list" style="font-size: 0.9em; max-height: 100px; overflow-y: auto;"> | |
Connecting... | |
</div> | |
</div> | |
""", elem_id="collab_status_html") | |
# --- Resolution Studio Tab --- | |
with gr.Tab("💻 Resolution Studio", id="studio", elem_id="tab-studio"): | |
with gr.Row(): | |
# Left Column: Issue Details & AI Tools | |
with gr.Column(scale=1, min_width=400): | |
gr.Markdown("### Selected Issue Details") | |
# Hidden number input to store selected issue ID | |
selected_issue_id = gr.Number(label="Selected Issue ID", visible=False, precision=0, elem_id="selected_issue_id") | |
issue_preview_html = gr.HTML( | |
"<p style='color: #6b7280;'>Select an issue from the 'Issue Board' tab.</p>", | |
elem_id="issue_preview" | |
) | |
with gr.Accordion("🛠️ AI Assistance Tools", open=True, elem_id="ai_tools_accordion"): | |
suggest_btn = gr.Button("🧠 Suggest Resolution Steps", icon="💡", elem_id="suggest_btn") | |
patch_btn = gr.Button("📝 Generate Code Patch", icon="🩹", elem_id="patch_btn") | |
# Add placeholders for other buttons if needed | |
# test_btn = gr.Button("🧪 Create Tests (Future)", icon="🔬", interactive=False) | |
# impact_btn = gr.Button("📊 Impact Analysis (Future)", icon="📈", interactive=False) | |
# Use Markdown for better formatting of AI output | |
ai_output_display = gr.Markdown(value="*AI suggestions and patches will appear here...*", elem_id="ai_output_display") | |
# Right Column: Code Editor | |
with gr.Column(scale=2, min_width=500): | |
gr.Markdown("### Collaborative Code Editor") | |
                        # Use the imported code_editor component; its value is updated dynamically
                        # when an issue is selected, and it is expected to infer language from the
                        # file keys in the value dict.
                        code_edit_component = code_editor(
                            value={"placeholder.txt": "# Select an issue to load code."},
                            # elem_id is assumed to be forwarded by the custom component; the
                            # collaboration JavaScript below queries '#code_editor_component .ace_editor'.
                            elem_id="code_editor_component",
                            key="code_editor"
                        )
# --- Analytics Tab (Placeholder) --- | |
with gr.Tab("📈 Analytics", id="analytics", elem_id="tab-analytics"): | |
gr.Markdown("### Analytics Dashboard (Placeholder)") | |
gr.Markdown("Future home for resolution timelines, achievement badges, and more detailed metrics.") | |
# with gr.Row(): | |
# gr.Markdown("#### 📅 Resolution Timeline") | |
# timeline = gr.Timeline() # Requires specific data format | |
# with gr.Row(): | |
# gr.Markdown("#### 🏆 Achievement System") | |
# badges = gr.HTML("<div class='badges'>Coming Soon!</div>") | |
# --- Event Handlers --- | |
# 1. Crawl Button Click | |
crawl_btn.click( | |
fn=manager.crawl_issues, | |
inputs=[repo_url, github_token, hf_token], | |
outputs=[issue_list, stats_plot, status_output], | |
api_name="crawl_issues", # For API access if needed | |
show_progress="full" | |
) | |
# 2. Issue Selection in Dataframe | |
        async def handle_issue_select(evt: gr.SelectData):
            """Handles issue selection: updates preview, loads code (placeholder)."""
            default_response = {
                selected_issue_id: gr.update(value=None),
                issue_preview_html: gr.update(value="<p style='color: #6b7280;'>Select an issue from the table.</p>"),
                code_edit_component: gr.update(value={"placeholder.txt": "# Select an issue to load code."}),
                ai_output_display: gr.update(value="*AI suggestions and patches will appear here...*")
            }
            if evt.index is None or not hasattr(evt, 'value') or evt.value is None:
                logger.info("Issue deselected or invalid selection event.")
                return default_response
            try:
                # The dataframe rows were built from issue_list_for_clustering in the same
                # order, so the selected row index maps back to the issue ID.
                row_index = evt.index[0]
                selected_id = int(manager.issue_list_for_clustering[row_index]['id'])
                logger.info(f"Issue selected: ID {selected_id}")
                # --- Simulated code loading (replace with real repository content) ---
                files_content = {
                    f"issue_{selected_id}_code.py": (
                        f"# Code related to issue {selected_id}\n"
                        "# Replace with actual file content\n\n"
                        f"def fix_issue_{selected_id}():\n"
                        "    print('Implement solution here')"
                    ),
                    "README.md": f"# Issue {selected_id}\n\nResolution work in progress..."
                }
                # Initialize server-side editor state for this issue
                manager.code_editors[selected_id] = OTCodeEditor(value=files_content)
                logger.info(f"Initialized OT editor for issue {selected_id}")
                return {
                    selected_issue_id: gr.update(value=selected_id),
                    issue_preview_html: gr.update(value=generate_issue_preview(selected_id)),
                    code_edit_component: gr.update(value=files_content),
                    ai_output_display: gr.update(value="*AI suggestions ready*")
                }
            except (ValueError, TypeError, IndexError, KeyError) as e:
                logger.error(f"Selection error: {str(e)}")
                return {
                    **default_response,
                    issue_preview_html: gr.update(value="<p style='color: red;'>Error loading issue details</p>"),
                    code_edit_component: gr.update(value={"error.txt": "# Error loading code\nPlease try again"}),
                    ai_output_display: gr.update(value="*Error processing selection*")
                }
issue_list.select( | |
fn=handle_issue_select, | |
inputs=[], # Event data is passed automatically | |
outputs=[selected_issue_id, issue_preview_html, code_edit_component, ai_output_display], | |
show_progress="minimal" | |
) | |
# 3. Suggest Resolution Button Click | |
suggest_btn.click( | |
fn=get_ai_suggestion_wrapper, | |
inputs=[selected_issue_id, model_select], | |
outputs=[ai_output_display], # Output to Markdown component | |
api_name="suggest_resolution", | |
show_progress="full" | |
) | |
# 4. Generate Patch Button Click | |
patch_btn.click( | |
fn=get_ai_patch_wrapper, | |
inputs=[selected_issue_id, model_select], | |
outputs=[ai_output_display], # Output to Markdown component | |
api_name="generate_patch", | |
show_progress="full" | |
) | |
# 5. Code Editor Change (User typing) -> Send update via WebSocket | |
# This requires JavaScript to capture the 'change' event from the Ace editor | |
# instance within the code_editor component and send it over WebSocket. | |
# The Python backend then receives it via handle_ws_connection. | |
# 6. WebSocket Message (Server -> Client) -> Trigger UI Update | |
# This uses JavaScript to listen for WebSocket messages and update Gradio components. | |
# Example: Update collaborator list, trigger code editor update. | |
# --- JavaScript for WebSocket Communication --- | |
def web_socket_js(ws_port): | |
# Generate unique client ID for this session | |
client_id = f"client_{hashlib.sha1(os.urandom(16)).hexdigest()[:8]}" | |
logger.info(f"Generated Client ID for WebSocket: {client_id}") | |
# FIX: Escape all literal curly braces `{` -> `{{` and `}` -> `}}` in the JS code | |
# Variables like {ws_port} and {client_id} remain single-braced. | |
return f""" | |
<script> | |
// Ensure this runs only once per page load | |
if (!window.collabWs) {{ | |
console.log('Initializing WebSocket connection...'); | |
// Determine WebSocket URL based on environment (local vs HF Space) | |
let wsUrl; | |
if (window.location.hostname === 'localhost' || window.location.hostname === '127.0.0.1') {{ | |
wsUrl = `ws://localhost:{ws_port}`; | |
}} else {{ | |
// Assume running on HF Space or similar, use wss:// | |
// Construct the WS URL based on the current window location (more robust) | |
const spaceHost = window.location.host; | |
wsUrl = `wss://${{spaceHost}}/ws`; // Standard path for Gradio WS proxy | |
console.log('Detected non-local environment, using secure WebSocket URL:', wsUrl); | |
}} | |
window.collabWs = new WebSocket(wsUrl); | |
window.clientId = '{client_id}'; // Store client ID globally for this session | |
window.aceEditorInstance = null; // Store reference to the Ace editor | |
window.currentIssueId = null; // Track the currently selected issue ID in the editor | |
window.collabWs.onopen = function(event) {{ | |
console.log('WebSocket connection established.'); | |
// Send join message with client ID | |
window.sendWsMessage({{ type: 'join', clientId: window.clientId }}); | |
// Update collaborator list display | |
const collabListDiv = document.getElementById('collab-list'); | |
if (collabListDiv) collabListDiv.innerHTML = 'Connected. Waiting for status...'; | |
}}; | |
window.collabWs.onmessage = function(event) {{ | |
// console.log('WebSocket message received:', event.data); | |
try {{ | |
const data = JSON.parse(event.data); | |
// --- Collaboration Status Update --- | |
if (data.type === 'collaboration_status') {{ | |
const collabListDiv = document.getElementById('collab-list'); | |
if (collabListDiv) {{ | |
const collaborators = data.collaborators || {{}}; | |
const activeCollaborators = Object.entries(collaborators) | |
.filter(([id, info]) => id !== window.clientId); // Exclude self | |
if (activeCollaborators.length > 0) {{ | |
collabListDiv.innerHTML = activeCollaborators | |
.map(([id, info]) => `<div class="collab-item" style="margin-bottom: 3px;">${{info.name || id}}: ${{info.status || 'Idle'}}</div>`) | |
.join(''); | |
}} else {{ | |
collabListDiv.innerHTML = 'You are the only active user.'; | |
}} | |
}} | |
// --- Code Update from another client --- | |
}} else if (data.type === 'code_update') {{ | |
console.log('Received code update delta for issue:', data.issue_num, 'from client:', data.clientId); | |
// Apply delta only if it's for the currently viewed issue and not from self | |
if (window.aceEditorInstance && data.issue_num === window.currentIssueId && data.clientId !== window.clientId) {{ | |
try {{ | |
const delta = JSON.parse(data.delta); // Parse the delta string | |
// Prevent triggering the 'change' listener we set up | |
window.aceEditorInstance.getSession().getDocument().applyDeltas([delta]); | |
console.log('Applied remote delta to Ace editor for issue:', data.issue_num); | |
}} catch (e) {{ | |
console.error('Failed to parse or apply remote delta:', e, data.delta); | |
}} | |
}} else {{ | |
console.log('Ignoring remote delta (wrong issue, self, or editor not ready). Current:', window.currentIssueId, 'Received:', data.issue_num); | |
}} | |
// --- Issue List Update Notification --- | |
}} else if (data.type === 'issues_updated') {{ | |
console.log('Received issues updated notification.'); | |
// Update status bar to prompt user action | |
const statusBar = document.getElementById('status_output')?.querySelector('textarea'); | |
if (statusBar) {{ | |
const timestamp = new Date().toLocaleTimeString(); | |
statusBar.value = `[${{timestamp}}] Issue list updated on server. Click "Scan Repository Issues" to refresh.`; | |
// Manually dispatch input event for Gradio | |
statusBar.dispatchEvent(new Event('input', {{ bubbles: true }})); | |
}} | |
// Optionally add a more visible notification element | |
}} | |
}} catch (e) {{ | |
console.error('Failed to parse WebSocket message or update UI:', e, event.data); | |
}} | |
}}; | |
window.collabWs.onclose = function(event) {{ | |
console.warn('WebSocket connection closed:', event.code, event.reason); | |
const collabListDiv = document.getElementById('collab-list'); | |
if (collabListDiv) collabListDiv.innerHTML = '<span style="color: red;">Disconnected</span>'; | |
// Implement basic reconnection logic (optional) | |
// setTimeout(initWebSocket, 5000); // Attempt to reconnect after 5 seconds | |
}}; | |
window.collabWs.onerror = function(error) {{ | |
console.error('WebSocket error:', error); | |
const collabListDiv = document.getElementById('collab-list'); | |
if (collabListDiv) collabListDiv.innerHTML = '<span style="color: red;">Connection Error</span>'; | |
}}; | |
// Function to send messages | |
window.sendWsMessage = function(message) {{ | |
if (window.collabWs && window.collabWs.readyState === WebSocket.OPEN) {{ | |
window.collabWs.send(JSON.stringify(message)); | |
}} else {{ | |
console.error('WebSocket not connected. Cannot send message:', message); | |
}} | |
}}; | |
// --- JS Integration with Code Editor Component --- | |
// Tries to find the Ace editor instance and attach listeners. | |
// Needs to run after the component is rendered. | |
function setupCodeEditorListener() {{ | |
// Check if Ace is loaded | |
if (typeof ace === 'undefined') {{ | |
console.warn('Ace editor library not found. Retrying...'); | |
setTimeout(setupCodeEditorListener, 1000); | |
return; | |
}} | |
const editorElement = document.querySelector('#code_editor_component .ace_editor'); | |
if (!editorElement) {{ | |
console.warn('Ace editor element not found yet. Retrying...'); | |
setTimeout(setupCodeEditorListener, 1000); // Retry if element not ready | |
return; | |
}} | |
// Get Ace editor instance (may vary based on component implementation) | |
try {{ | |
window.aceEditorInstance = ace.edit(editorElement); | |
console.log('Ace Editor instance found:', window.aceEditorInstance); | |
}} catch (e) {{ | |
console.error('Failed to initialize Ace editor instance:', e); | |
// Maybe the instance is already created and stored differently? | |
// Look for common patterns if direct init fails. | |
if (editorElement.env && editorElement.env.editor) {{ | |
window.aceEditorInstance = editorElement.env.editor; | |
console.log('Found Ace Editor instance via element.env.editor'); | |
}} else {{ | |
console.error('Could not find Ace editor instance. Collaboration may not work.'); | |
return; // Stop if editor cannot be found | |
}} | |
}} | |
if (window.aceEditorInstance) {{ | |
console.log('Attaching change listener to Ace editor.'); | |
window.aceEditorInstance.getSession().on('change', function(delta) {{ | |
// Check if the change was initiated by the current user | |
// `aceEditor.curOp` is one way, but might not always be reliable. | |
// A simpler check: ignore deltas if they originate from applyDeltas (often used for remote changes) | |
const isUserChange = !delta.ignore; // Add an 'ignore' flag when applying remote deltas | |
// More robust check: Use internal flags if available | |
const isUserOriginated = window.aceEditorInstance.curOp && window.aceEditorInstance.curOp.command.name; | |
if (isUserOriginated) {{ | |
// console.log('Code changed by user:', delta); | |
// Get the current issue ID from the hidden Gradio input | |
const issueIdInput = document.querySelector('#selected_issue_id input'); | |
const currentIssueIdStr = issueIdInput ? issueIdInput.value : null; | |
window.currentIssueId = currentIssueIdStr ? parseInt(currentIssueIdStr, 10) : null; | |
if (window.currentIssueId !== null && !isNaN(window.currentIssueId)) {{ | |
// Send the delta via WebSocket | |
window.sendWsMessage({{ | |
type: 'code_update', | |
issue_num: window.currentIssueId, | |
delta: JSON.stringify(delta), // Send delta as JSON string | |
clientId: window.clientId | |
}}); | |
}} else {{ | |
// console.warn('No valid issue selected, cannot send code update.'); | |
}} | |
}} else {{ | |
// console.log('Ignoring programmatic change delta:', delta); | |
}} | |
}}); | |
console.log('Ace editor listener attached.'); | |
}} else {{ | |
console.error('Ace editor instance is null after setup attempt.'); | |
}} | |
}} // end setupCodeEditorListener | |
// --- Initialization and Observation --- | |
// Use MutationObserver to detect when the editor component is added/changed in the DOM. | |
const observerTarget = document.body; // Observe the whole body or a closer parent container | |
const observer = new MutationObserver((mutationsList, observer) => {{ | |
for(const mutation of mutationsList) {{ | |
if (mutation.type === 'childList') {{ | |
// Check if the specific editor element we need is now present | |
if (document.querySelector('#code_editor_component .ace_editor')) {{ | |
console.log("Ace editor element detected in DOM, attempting setup..."); | |
// Debounce setup call slightly to ensure editor is fully ready | |
clearTimeout(window.setupEditorTimeout); | |
window.setupEditorTimeout = setTimeout(setupCodeEditorListener, 200); | |
// Optionally disconnect observer if setup only needs to happen once, | |
// but re-observing might be needed if component re-renders completely. | |
// observer.disconnect(); | |
// break; // Found it, no need to check other mutations in this batch | |
}} | |
}} | |
}} | |
}}); | |
// Start observing the target node for configured mutations | |
if(observerTarget) {{ | |
console.log("Starting MutationObserver to detect code editor element."); | |
observer.observe(observerTarget, {{ childList: true, subtree: true }}); | |
}} | |
// Initial attempt to set up listener after a short delay, in case element exists on load | |
setTimeout(setupCodeEditorListener, 500); | |
}} else {{ | |
console.log('WebSocket connection appears to be already initialized.'); | |
}} | |
</script> | |
""" | |
# Inject the JavaScript into the Gradio app when it loads | |
app.load(_js=web_socket_js(WS_PORT), fn=None, inputs=None, outputs=None) | |
return app | |
# ========== WebSocket Server Logic ========== | |
async def handle_ws_connection(websocket: websockets.WebSocketServerProtocol, path: str, manager: IssueManager): | |
"""Handles incoming WebSocket connections and messages.""" | |
client_id = None # Initialize client_id for this connection | |
manager.ws_clients.append(websocket) | |
logger.info(f"WebSocket client connected: {websocket.remote_address} (Total: {len(manager.ws_clients)})") | |
try: | |
async for message in websocket: | |
try: | |
data = json.loads(message) | |
msg_type = data.get("type") | |
# logger.debug(f"Received WS message: {data}") # Log received message content | |
if msg_type == "join": | |
client_id = data.get("clientId", f"anon_{websocket.id}") | |
setattr(websocket, 'client_id', client_id) # Associate ID with socket object | |
manager.collaborators[client_id] = {"name": client_id, "status": "Connected"} # Add to collaborators | |
logger.info(f"Client {client_id} joined.") | |
# Trigger immediate broadcast in case it's needed, but don't await | |
asyncio.create_task(manager.broadcast_collaboration_status()) | |
elif msg_type == "code_update": | |
issue_num = data.get("issue_num") | |
delta = data.get("delta") | |
sender_id = data.get("clientId") # ID of the client who sent the update | |
if issue_num is not None and delta and sender_id: | |
# Pass client_id to handler to avoid broadcasting back to sender | |
await manager.handle_code_editor_update(issue_num, delta, sender_id) | |
else: | |
logger.warning(f"Invalid code_update message received: {data}") | |
elif msg_type == "status_update": # Client updates their status | |
sender_id = data.get("clientId") | |
status = data.get("status", "Idle") | |
if sender_id and sender_id in manager.collaborators: | |
manager.collaborators[sender_id]["status"] = status | |
logger.info(f"Client {sender_id} status updated: {status}") | |
# Trigger broadcast, don't await | |
asyncio.create_task(manager.broadcast_collaboration_status()) | |
else: | |
logger.warning(f"Unknown WebSocket message type received: {msg_type} from {client_id or websocket.remote_address}") | |
except json.JSONDecodeError: | |
logger.error(f"Received invalid JSON over WebSocket from {client_id or websocket.remote_address}: {message}") | |
except Exception as e: | |
logger.exception(f"Error processing WebSocket message from {client_id or websocket.remote_address}: {e}") | |
except ConnectionClosed as e: | |
logger.info(f"WebSocket client {client_id or websocket.remote_address} disconnected: (Code: {e.code}, Reason: {e.reason})") | |
except Exception as e: | |
logger.exception(f"Unexpected error in WebSocket handler for {client_id or websocket.remote_address}: {e}") | |
finally: | |
logger.info(f"Cleaning up connection for client {client_id or websocket.remote_address}") | |
# Use the safe removal method, ensuring it runs in the correct context if needed | |
manager.remove_ws_client(websocket) | |
# Trigger a final status broadcast | |
asyncio.create_task(manager.broadcast_collaboration_status()) | |
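# Illustrative sketch (not used by the app): a minimal client exercising the message | |
# protocol handled above ("join", "status_update"), using the same websockets library | |
# imported at the top of this file. The client id and status values are arbitrary. | |
async def _example_ws_client(port: int = WS_PORT): | |
    uri = f"ws://localhost:{port}" | |
    async with websockets.connect(uri) as ws: | |
        # Announce ourselves, then report a status; the server broadcasts collaborator state. | |
        await ws.send(json.dumps({"type": "join", "clientId": "example_client"})) | |
        await ws.send(json.dumps({"type": "status_update", "clientId": "example_client", "status": "Reviewing"})) | |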
async def start_websocket_server(manager: IssueManager, port: int): | |
"""Starts the WebSocket server.""" | |
# Bind the manager instance into the connection handler. Note: the (ws, path) signature | |
# matches older websockets releases; newer versions may pass only the connection object. | |
handler_with_manager = lambda ws, path: handle_ws_connection(ws, path, manager) | |
try: | |
# Set ping interval and timeout to keep connections alive and detect broken ones | |
server = await websockets.serve( | |
handler_with_manager, | |
"0.0.0.0", # Bind to all interfaces | |
port, | |
ping_interval=20, # Send pings every 20 seconds | |
ping_timeout=20 # Wait 20 seconds for pong response | |
) | |
logger.info(f"WebSocket server started on ws://0.0.0.0:{port}") | |
await asyncio.Future() # Run forever until cancelled | |
except OSError as e: | |
logger.error(f"Failed to start WebSocket server on port {port}: {e}. Port might be in use.") | |
# Exit or handle error appropriately | |
raise | |
except Exception as e: | |
logger.exception(f"Unexpected error starting WebSocket server: {e}") | |
raise | |
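# Illustrative variant (an assumption, not called anywhere in this file): the same server | |
# driven by an explicit stop event instead of awaiting a bare Future, so it can be shut | |
# down cleanly from within its own event loop. | |
async def start_websocket_server_with_stop(manager: IssueManager, port: int, stop_event: asyncio.Event): | |
    handler_with_manager = lambda ws, path: handle_ws_connection(ws, path, manager) | |
    async with websockets.serve(handler_with_manager, "0.0.0.0", port, ping_interval=20, ping_timeout=20): | |
        await stop_event.wait()  # the serve() context manager closes the server on exit | |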
def run_webhook_server(manager: IssueManager, port: int): | |
"""Starts the HTTP webhook server in a separate thread.""" | |
WebhookHandler.manager_instance = manager # Pass manager instance to the class | |
server_address = ("0.0.0.0", port) # Bind to all interfaces | |
try: | |
httpd = HTTPServer(server_address, WebhookHandler) | |
logger.info(f"Webhook HTTP server started on [http://0.0.0.0](http://0.0.0.0):{port}") | |
httpd.serve_forever() | |
except OSError as e: | |
logger.error(f"Failed to start Webhook server on port {port}: {e}. Port might be in use.") | |
# Exit or handle error appropriately | |
# Consider signaling the main thread to stop | |
except Exception as e: | |
logger.exception(f"Unexpected error in Webhook server: {e}") | |
# ========== Main Execution ========== | |
if __name__ == "__main__": | |
# --- Setup --- | |
manager = IssueManager() | |
main_event_loop = asyncio.get_event_loop() | |
manager.main_loop = main_event_loop # Initial reference; re-pointed in main_async_tasks() to the loop that actually runs the async tasks | |
# --- Start Background Servers --- | |
# 1. Webhook Server (HTTP) - Runs in its own thread | |
webhook_thread = threading.Thread(target=run_webhook_server, args=(manager, WEBHOOK_PORT), daemon=True) | |
webhook_thread.start() | |
# 2. WebSocket Server & Broadcast Task (Asyncio) - Run in main thread's event loop | |
async def main_async_tasks(): | |
# Re-point manager.main_loop at the loop created by asyncio.run() in this thread, | |
# so any code scheduling work on manager.main_loop targets a live loop. | |
manager.main_loop = asyncio.get_running_loop() | |
# Start the periodic broadcast task | |
broadcast_task = asyncio.create_task(manager.broadcast_collaboration_status()) | |
# Start the WebSocket server | |
websocket_server_task = asyncio.create_task(start_websocket_server(manager, WS_PORT)) | |
# Keep tasks running | |
await asyncio.gather(broadcast_task, websocket_server_task, return_exceptions=True) | |
# --- Create Gradio App --- | |
# Must be created before starting the loop if it interacts with async functions directly | |
app = create_ui(manager) | |
# --- Launch Gradio App & Async Tasks --- | |
# Gradio's launch method handles the event loop integration when run directly. | |
# It will run the asyncio tasks alongside the FastAPI/Uvicorn server. | |
# app.queue() # Enable queue for handling multiple requests/long-running tasks | |
# app.launch( | |
# share=True, # Enable for public access (use with caution) | |
# server_name="0.0.0.0", # Bind to all interfaces for accessibility | |
# server_port=7860, # Default Gradio port | |
# favicon_path="[https://huggingface.co/front/assets/huggingface_logo-noborder.svg](https://huggingface.co/front/assets/huggingface_logo-noborder.svg)", | |
# Let Gradio manage the asyncio loop | |
# asyncio_task=main_async_tasks() # This might conflict with launch's loop management | |
# If launch() blocks, the asyncio tasks need to be run differently, | |
# potentially starting the loop manually before launch or using threading | |
# for the asyncio part if launch() isn't compatible. | |
# However, modern Gradio often handles this integration better. | |
# Start the asyncio tasks *after* defining the app but *before* launch blocks, | |
# or ensure launch itself runs them. Gradio's `launch` usually handles the loop. | |
# Let's rely on Gradio's launch to manage the loop and potentially run our tasks. | |
# If WS/broadcast doesn't work, we may need to manually manage the loop/threading. | |
# A common pattern if launch() blocks and doesn't run background async tasks: | |
asyncio_thread = threading.Thread(target=lambda: asyncio.run(main_async_tasks()), daemon=True) | |
asyncio_thread.start() | |
app.launch(share=True, server_name="0.0.0.0", server_port=7860, favicon_path="https://huggingface.co/front/assets/huggingface_logo-noborder.svg") | |
logger.info("Gradio app launched. Webhook server running in background thread.") | |
# The asyncio tasks (WebSocket, broadcast) should be running via Gradio's event loop. |