# GitBot / app.py
# Copied from a hosted repository file view: last commit "Update app.py" (cc9992d, verified)
# by acecalisto3; file size 77.5 kB.
import gradio as gr
import os
import aiohttp
import asyncio
from git import Repo, GitCommandError
from pathlib import Path
from datetime import datetime
import shutil
import json
import logging
import re
from typing import Dict, List, Optional, Tuple
import subprocess
import plotly.express as px
import plotly.graph_objects as go
from transformers import pipeline, AutoModelForCausalLM, AutoTokenizer
import threading
from http.server import HTTPServer, BaseHTTPRequestHandler
import speech_recognition as sr
# Removed duplicate import: from code_editor import code_editor
from functools import lru_cache
import hashlib
import markdown2
from concurrent.futures import ThreadPoolExecutor
from hdbscan import HDBSCAN
import websockets
from websockets.exceptions import ConnectionClosed
from code_editor import code_editor
# ========== Configuration ==========
# Scratch directory for repository work; created at import time.
WORKSPACE = Path("/tmp") / "issue_workspace"
WORKSPACE.mkdir(exist_ok=True)

# Remote API base URLs.
GITHUB_API = "https://api.github.com/repos"
HF_INFERENCE_API = "https://api-inference.huggingface.co/models"

# Local listener ports: the webhook HTTP server, and the WebSocket server on the next port.
WEBHOOK_PORT = 8000
WS_PORT = WEBHOOK_PORT + 1

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

# Thread pool shared by the app (at most 4 worker threads).
executor = ThreadPoolExecutor(max_workers=4)

# UI display name -> Hugging Face model id.
HF_MODELS = {
    "Mistral-8x7B": "mistralai/Mixtral-8x7B-Instruct-v0.1",
    "Llama-3-8B": "meta-llama/Meta-Llama-3-8B",
    "CodeLlama-34B": "codellama/CodeLlama-34b-Instruct-hf",
    "StarCoder2": "bigcode/starcoder2-15b",
}
# Default Model: the Mixtral entry's model *id* (a value of HF_MODELS, not one of its keys).
DEFAULT_MODEL = HF_MODELS["Mistral-8x7B"]
# ========== Modern Theme ==========
# Base palette: gradio's Soft preset with violet/emerald hues, large radii and the Inter font stack.
theme = gr.themes.Soft(
    primary_hue="violet",
    secondary_hue="emerald",
    radius_size="lg",
    font=[gr.themes.GoogleFont("Inter"), "ui-sans-serif", "system-ui"],
)
# Per-variable overrides. `.set()` assigns them onto the theme instance itself (and
# returns that same instance), so the return value is not needed here.
# `button_primary_border_radius` is deliberately not passed: it raised a TypeError.
# `panel_border_radius` is also omitted as a precaution.
theme.set(
    button_primary_background_fill="linear-gradient(90deg, #8B5CF6 0%, #EC4899 100%)",
    button_primary_text_color="white",
    block_label_text_size="lg",
    block_label_text_weight="600",
    block_title_text_size="lg",
    block_title_text_weight="800",
    panel_background_fill="white",
    block_shadow="*shadow_drop_lg",
)
# ========== Enhanced Webhook Handler ==========
class WebhookHandler(BaseHTTPRequestHandler):
    """Receive GitHub webhook POSTs and forward issue events to the IssueManager.

    ``http.server`` calls this handler synchronously, not from the asyncio loop.
    Issue events with a handled action are therefore scheduled with
    ``asyncio.run_coroutine_threadsafe`` onto a running event loop. Every request
    whose body parses as a JSON object is answered ``200 OK``.

    NOTE(review): the ``X-Hub-Signature-256`` header is not verified. Any client
    that can reach this port can inject issue events.
    """

    # Set externally to the IssueManager instance that should receive issue events.
    manager_instance = None

    # Issue actions forwarded to IssueManager.handle_webhook_event.
    _HANDLED_ISSUE_ACTIONS = ('opened', 'reopened', 'closed', 'assigned', 'edited')

    def _reply(self, status: int, body: bytes = b"") -> None:
        """Send ``status``, finish the headers, and write ``body`` if one is given."""
        self.send_response(status)
        self.end_headers()
        if body:
            self.wfile.write(body)

    @staticmethod
    def _find_running_loop(manager):
        """Return a *running* event loop to schedule webhook work on, or None.

        Prefers the manager's stored ``main_loop``. Falls back to the loop policy's
        loop for the current thread. That lookup raises RuntimeError in a non-main
        thread with no loop set, which previously crashed the request without a
        response.
        """
        loop = getattr(manager, 'main_loop', None)
        if loop is not None and loop.is_running():
            return loop
        try:
            loop = asyncio.get_event_loop_policy().get_event_loop()
        except RuntimeError:
            return None
        return loop if loop.is_running() else None

    @staticmethod
    def _log_failure(future) -> None:
        """Done-callback: log an exception raised by a scheduled handle_webhook_event call."""
        if future.cancelled():
            return
        exc = future.exception()
        if exc is not None:
            logger.error(f"Webhook event handling failed: {exc!r}")

    def do_POST(self):
        """Parse the JSON body and dispatch GitHub ``issues`` events to the manager."""
        # A missing, malformed or negative Content-Length is a client error. It must
        # not raise before a response is sent (a negative value would make read() block).
        try:
            content_length = int(self.headers.get('Content-Length', 0))
            if content_length < 0:
                raise ValueError(content_length)
        except (TypeError, ValueError):
            self._reply(400, b"Invalid Content-Length header")
            return
        try:
            payload = json.loads(self.rfile.read(content_length).decode('utf-8'))
        except (json.JSONDecodeError, UnicodeDecodeError):
            self._reply(400, b"Invalid JSON payload")
            return
        except Exception as e:
            logger.error(f"Error reading webhook payload: {e}")
            self._reply(500)
            return
        if not isinstance(payload, dict):
            # e.g. a JSON array: the .get() calls below require an object.
            self._reply(400, b"Invalid JSON payload")
            return

        event = self.headers.get('X-GitHub-Event')
        logger.info(f"Received GitHub webhook event: {event}")
        manager = WebhookHandler.manager_instance
        if event == 'issues' and manager:
            action = payload.get('action')
            logger.info(f"Issue action: {action}")
            if action in self._HANDLED_ISSUE_ACTIONS:
                loop = self._find_running_loop(manager)
                if loop is not None:
                    future = asyncio.run_coroutine_threadsafe(
                        manager.handle_webhook_event(event, action, payload),
                        loop,
                    )
                    # Without this, exceptions in the scheduled coroutine are silently dropped.
                    future.add_done_callback(self._log_failure)
                else:
                    logger.error("Asyncio event loop is not running in the target thread for webhook.")
        elif event == 'ping':
            logger.info("Received GitHub webhook ping.")
        else:
            logger.warning(f"Unhandled event type: {event} or manager not initialized.")
        self._reply(200, b"OK")
# ========== AI-Powered Issue Manager ==========
class IssueManager:
    """In-memory state and async operations behind the GitBot issue board.

    Holds one repository's open issues (keyed by issue number) and the GitHub and
    Hugging Face credentials used to fetch and analyse them. It also holds the
    clustering results and the WebSocket clients/collaborators used for live updates.
    """

    # Upper bound on memoized AI suggestions kept per instance (same size as the former lru_cache).
    _SUGGESTION_CACHE_SIZE = 100

    def __init__(self):
        self.issues: Dict[int, dict] = {}  # issue number -> dict built by _process_issue_data
        self.repo_url: Optional[str] = None
        self.repo: Optional[Repo] = None
        self.current_issue: Optional[int] = None
        self.github_token: Optional[str] = None
        self.hf_token: Optional[str] = None
        self.collaborators: Dict[str, dict] = {}  # Example: {"user1": {"status": "editing file.py"}}
        self.points: int = 0
        # Severity -> label substrings. _determine_severity checks these in insertion
        # order, so the first severity with a matching keyword wins.
        self.severity_rules: Dict[str, List[str]] = {
            "Critical": ["critical", "urgent", "security", "crash", "blocker"],
            "High": ["high", "important", "error", "regression", "major"],
            "Medium": ["medium", "bug", "performance", "minor"],
            "Low": ["low", "documentation", "enhancement", "trivial", "feature"],
        }
        # {cluster_id: [index into issue_list_for_clustering, ...]}; noise points are omitted.
        self.issue_clusters: Dict[int, List[int]] = {}
        # Snapshot of self.issues in list order, so clustering indices can be mapped back to issues.
        self.issue_list_for_clustering: List[dict] = []
        # self._init_local_models()  # Consider lazy loading or conditional loading
        self.ws_clients: List[websockets.WebSocketServerProtocol] = []
        # Per-issue collaborative editors. NOTE(review): OTCodeEditor is not imported in the
        # visible part of this file; annotations inside a function body are not evaluated.
        self.code_editors: Dict[int, OTCodeEditor] = {}
        # Memoized AI suggestions: (issue_hash, model_key) -> suggestion text (see cached_suggestion).
        self._suggestion_cache: Dict[Tuple[str, str], str] = {}
        # Event loop current at construction time, kept for scheduling work onto it from elsewhere.
        self.main_loop = asyncio.get_event_loop()

    # Placeholder for local model initialization - implement actual loading if needed
    def _init_local_models(self):
        logger.info("Initializing local models (placeholder)...")
        # self.code_model = pipeline(...)
        # self.summarizer = pipeline(...)
        logger.info("Local models initialized (placeholder).")

    def _get_issue_hash(self, issue_data: dict) -> str:
        """Return an MD5 hex digest of the issue's title + body, used as a cache key."""
        content = f"{issue_data.get('title', '')}{issue_data.get('body', '')}"
        return hashlib.md5(content.encode()).hexdigest()

    async def cached_suggestion(self, issue_hash: str, model_key: str) -> str:
        """Return a resolution suggestion for the issue whose content hash is ``issue_hash``.

        Successful suggestions are memoized per ``(issue_hash, model_key)`` in a bounded
        per-instance LRU dict. ``functools.lru_cache`` cannot wrap an ``async def``: it
        caches the coroutine *object*, and awaiting it a second time raises RuntimeError.
        It would also keep ``self`` alive for the cache's lifetime.

        Args:
            issue_hash: Value produced by ``_get_issue_hash`` for the target issue.
            model_key: Key into ``HF_MODELS``.

        Returns:
            The suggestion text, or an error message string.
        """
        cache_key = (issue_hash, model_key)
        cached = self._suggestion_cache.pop(cache_key, None)
        if cached is not None:
            # Re-insert so this entry becomes the most recently used one.
            self._suggestion_cache[cache_key] = cached
            return cached

        # Find the issue corresponding to the hash (linear scan over self.issues).
        found_issue = next(
            (issue for issue in self.issues.values() if self._get_issue_hash(issue) == issue_hash),
            None,
        )
        if found_issue is None:
            return "Error: Issue not found for the given hash."
        if model_key not in HF_MODELS:
            return f"Error: Invalid model key: {model_key}"
        logger.info(f"Cache miss or first request for issue hash {issue_hash}. Requesting suggestion from {model_key}.")
        suggestion = await self.suggest_resolution(found_issue, model_key)
        # Only memoize real suggestions, so transient failures (timeouts, model loading) can be retried.
        if not suggestion.startswith(("Error:", "An unexpected error occurred")):
            self._suggestion_cache[cache_key] = suggestion
            if len(self._suggestion_cache) > self._SUGGESTION_CACHE_SIZE:
                # Evict the least recently used entry (dicts preserve insertion order).
                self._suggestion_cache.pop(next(iter(self._suggestion_cache)))
        return suggestion

    async def handle_webhook_event(self, event: str, action: str, payload: dict):
        """Apply a GitHub ``issues`` webhook delivery to ``self.issues``.

        ``closed`` removes the issue. ``opened``/``reopened``/``edited``/``assigned``
        store the refreshed issue, unless the payload says it is already closed. In
        that case the issue is removed, because ``self.issues`` is the active list.
        Connected WebSocket clients are notified when the stored issues change.
        """
        logger.info(f"Processing webhook event: {event}, action: {action}")
        issue_data = payload.get('issue')
        if not issue_data:
            logger.warning("Webhook payload missing 'issue' data.")
            return
        issue_number = issue_data.get('number')
        if not issue_number:
            logger.warning("Webhook issue data missing 'number'.")
            return

        needs_update = False
        if action == 'closed':
            logger.info(f"Removing closed issue {issue_number} from active list.")
            if self.issues.pop(issue_number, None):
                needs_update = True
            # Optionally remove associated editor, etc.
            self.code_editors.pop(issue_number, None)
        elif action in ('opened', 'reopened', 'edited', 'assigned'):
            if action == 'assigned':
                # 'assignee' can be present but null, so guard before calling .get on it.
                assignee_login = (payload.get('assignee') or {}).get('login', 'N/A')
                logger.info(f"Issue {issue_number} assigned to {assignee_login}")
            else:
                logger.info(f"Adding/Updating issue {issue_number} from webhook.")
            processed = self._process_issue_data(issue_data)
            if processed['state'] == 'closed':
                # Edits/assignments on an already-closed issue must not re-add it to the active list.
                if self.issues.pop(issue_number, None):
                    needs_update = True
            else:
                self.issues[issue_number] = processed
                needs_update = True
        else:
            logger.info(f"Ignoring action '{action}' for issue {issue_number}.")

        # Trigger a UI update notification via WebSocket if data changed
        if needs_update:
            await self.broadcast_issue_update()

    def _process_issue_data(self, issue_data: dict) -> dict:
        """Normalize a GitHub issue object (REST or webhook) into the dict stored in ``self.issues``.

        GitHub sends ``null`` for an empty body and for an unassigned issue. Null
        string fields are coerced to their defaults, so downstream code (slicing,
        markdown rendering) always sees a ``str``.
        """
        assignee = issue_data.get('assignee') or {}
        return {
            "id": issue_data['number'],
            "title": issue_data.get('title') or 'No Title',
            "body": issue_data.get('body') or '',
            "state": issue_data.get('state') or 'unknown',
            "labels": [label['name'] for label in issue_data.get('labels') or []],
            "assignee": assignee.get('login'),
            "url": issue_data.get('html_url') or '#',
            # Add other relevant fields if needed
        }

    async def crawl_issues(self, repo_url: str, github_token: str, hf_token: str) -> Tuple[List[List], go.Figure, str]:
        """Fetch all open issues of ``repo_url``, cluster them, and build the UI data.

        Pull requests, which GitHub's issues endpoint also returns, are skipped.

        Args:
            repo_url: ``https://github.com/owner/repo``. A trailing ``.git``, extra path,
                query or fragment is tolerated.
            github_token: Optional GitHub token (raises rate limits, allows private repos).
            hf_token: Hugging Face token used for the clustering embeddings.

        Returns:
            ``(rows, figure, message)``. Each row is ``[number, title, severity, cluster]``.
            On failure ``rows`` is empty and ``figure`` is blank.
        """
        if not repo_url or not hf_token:  # GitHub token is optional for public repos
            return [], go.Figure(), "Error: Repository URL and HF Token are required."
        logger.info(f"Starting issue crawl for {repo_url}")
        self.repo_url = repo_url
        self.github_token = github_token
        self.hf_token = hf_token
        # Reset all per-crawl state so nothing from a previous repository lingers.
        self.issues = {}
        self.issue_list_for_clustering = []
        self.issue_clusters = {}

        match = re.match(r"https?://(?:www\.)?github\.com/([^/?#]+)/([^/?#]+)", repo_url.strip())
        if not match:
            logger.error(f"Invalid GitHub URL format: {repo_url}")
            return [], go.Figure(), "Error: Invalid GitHub URL format. Use https://github.com/owner/repo"
        owner, repo_name = match.groups()
        if repo_name.endswith(".git"):
            repo_name = repo_name[:-4]  # clone URLs end in .git; the REST API path does not
        # Open issues only; per_page=100 is the page size, and the loop below follows further pages.
        api_url = f"{GITHUB_API}/{owner}/{repo_name}/issues?state=open&per_page=100"
        headers = {"Accept": "application/vnd.github.v3+json"}
        if github_token:
            headers["Authorization"] = f"token {github_token}"

        try:
            all_issues_data = []
            page = 1
            logger.info(f"Fetching issues from {api_url}...")
            async with aiohttp.ClientSession(headers=headers) as session:
                while True:
                    paginated_url = f"{api_url}&page={page}"
                    async with session.get(paginated_url) as response:
                        response.raise_for_status()  # Raise exception for bad status codes
                        issues_page_data = await response.json()
                        if not issues_page_data:  # No more issues on this page
                            break
                        logger.info(f"Fetched page {page} with {len(issues_page_data)} issues.")
                        all_issues_data.extend(issues_page_data)
                        # Follow the Link header: stop when it advertises no rel="next" page.
                        if 'rel="next"' not in response.headers.get('Link', ''):
                            break
                    page += 1
            logger.info(f"Total issues fetched: {len(all_issues_data)}")

            pull_request_count = 0
            for issue_data in all_issues_data:
                # The issues endpoint also lists pull requests; they carry a 'pull_request' key.
                if 'pull_request' in issue_data:
                    pull_request_count += 1
                    continue
                self.issues[issue_data['number']] = self._process_issue_data(issue_data)
            if pull_request_count:
                logger.info(f"Skipped {pull_request_count} pull requests returned by the issues endpoint.")

            if not self.issues:
                logger.info("No open issues found.")
                return [], go.Figure(), "No open issues found in the repository."

            # Prepare data for clustering
            self.issue_list_for_clustering = list(self.issues.values())
            logger.info("Clustering issues...")
            await self._cluster_similar_issues()  # Update self.issue_clusters

            # Invert {cluster_id: [indices]} into {issue_index: cluster_id}.
            cluster_map = {
                index: cluster_id
                for cluster_id, indices in self.issue_clusters.items()
                for index in indices
            }
            severity_counts = {"Critical": 0, "High": 0, "Medium": 0, "Low": 0, "Unknown": 0}
            dataframe_data = []
            for i, issue in enumerate(self.issue_list_for_clustering):
                severity = self._determine_severity(issue['labels'])
                severity_counts[severity] = severity_counts.get(severity, 0) + 1
                cluster_id = cluster_map.get(i, -1)  # -1 for noise/unclustered
                dataframe_data.append([
                    issue['id'],
                    issue['title'],
                    severity,
                    cluster_id if cluster_id != -1 else "N/A",  # Display N/A for noise
                ])

            logger.info("Generating statistics plot...")
            stats_fig = self._generate_stats_plot(severity_counts)
            success_msg = f"Found {len(self.issues)} open issues. Clustered into {len(self.issue_clusters)} groups (excluding noise)."
            logger.info(success_msg)
            return dataframe_data, stats_fig, success_msg
        except aiohttp.ClientResponseError as e:
            logger.error(f"GitHub API request failed: {e.status} {e.message}")
            error_msg = f"Error fetching issues: {e.status} - {e.message}. Check token permissions and repo URL."
            if e.status == 404:
                error_msg = f"Error: Repository not found at {repo_url}. Check the URL."
            elif e.status == 401:
                error_msg = "Error: Invalid GitHub token or insufficient permissions."
            elif e.status == 403:
                error_msg = "Error: GitHub API rate limit exceeded or forbidden access. Try adding a GitHub token."
            return [], go.Figure(), error_msg
        except GitCommandError as e:
            logger.error(f"Git clone error: {e}")  # Should not happen if not cloning
            return [], go.Figure(), f"Error related to Git: {e}"
        except Exception as e:
            logger.exception(f"An unexpected error occurred during issue crawl: {e}")  # Log full traceback
            return [], go.Figure(), f"An unexpected error occurred: {e}"

    def _determine_severity(self, labels: List[str]) -> str:
        """Return the first severity whose keyword is a substring of any label (case-insensitive)."""
        labels_lower = [label.lower() for label in labels]
        for severity, keywords in self.severity_rules.items():
            if any(keyword in label for keyword in keywords for label in labels_lower):
                return severity
        return "Unknown"  # Default if no matching label found

    def _generate_stats_plot(self, severity_counts: Dict[str, int]) -> go.Figure:
        """Build a Plotly bar chart of issue counts per severity (zero counts omitted)."""
        filtered_counts = {k: v for k, v in severity_counts.items() if v > 0}
        if not filtered_counts:
            # Return an empty figure with a message if no issues found
            fig = go.Figure()
            fig.update_layout(
                title="Issue Severity Distribution",
                xaxis={"visible": False},
                yaxis={"visible": False},
                annotations=[{
                    "text": "No issues found to display statistics.",
                    "xref": "paper",
                    "yref": "paper",
                    "showarrow": False,
                    "font": {"size": 16},
                }],
                plot_bgcolor='rgba(0,0,0,0)',
                paper_bgcolor='rgba(0,0,0,0)',
            )
            return fig
        severities = list(filtered_counts.keys())
        counts = list(filtered_counts.values())
        fig = px.bar(
            x=severities,
            y=counts,
            title="Issue Severity Distribution",
            labels={'x': 'Severity', 'y': 'Number of Issues'},
            color=severities,  # Color bars by severity
            color_discrete_map={
                'Critical': '#DC2626',  # Red
                'High': '#F97316',  # Orange
                'Medium': '#FACC15',  # Yellow
                'Low': '#84CC16',  # Lime
                'Unknown': '#6B7280',  # Gray
            },
            text=counts,  # Display counts on bars
        )
        fig.update_layout(
            xaxis_title=None,  # Cleaner look
            yaxis_title="Number of Issues",
            plot_bgcolor='rgba(0,0,0,0)',  # Transparent background
            paper_bgcolor='rgba(0,0,0,0)',
            showlegend=False,
            xaxis={'categoryorder': 'array', 'categoryarray': ['Critical', 'High', 'Medium', 'Low', 'Unknown']},  # Order bars
        )
        fig.update_traces(textposition='outside')
        return fig

    async def _cluster_similar_issues(self):
        """Embed ``issue_list_for_clustering`` and group the issues with HDBSCAN.

        Sets ``self.issue_clusters`` to ``{cluster_id: [issue indices]}`` with plain-int
        ids, leaving out noise points (label -1). Any failure leaves it empty.
        """
        if not self.issue_list_for_clustering or not self.hf_token:
            logger.warning("Cannot cluster issues: No issues loaded or HF token missing.")
            self.issue_clusters = {}
            return
        if len(self.issue_list_for_clustering) < 2:
            # With min_cluster_size=2 a single issue can never form a cluster; skip the API call.
            logger.info("Only one issue loaded; skipping clustering.")
            self.issue_clusters = {}
            return
        logger.info("Generating embeddings for clustering...")
        try:
            # Title plus the first 500 characters of the body. Guard against None, which
            # raw GitHub data uses for an empty body.
            texts_to_embed = [
                f"{i.get('title') or ''} - {(i.get('body') or '')[:500]}"
                for i in self.issue_list_for_clustering
            ]
            embeddings = await self._generate_embeddings(texts_to_embed)
            if not isinstance(embeddings, list) or len(embeddings) != len(self.issue_list_for_clustering):
                logger.error(f"Failed to generate valid embeddings. Expected {len(self.issue_list_for_clustering)}, got {len(embeddings) if isinstance(embeddings, list) else 'None'}.")
                self.issue_clusters = {}
                return
            logger.info(f"Generated {len(embeddings)} embeddings. Running HDBSCAN...")
            # min_cluster_size=2: two similar issues are enough to form a group.
            # allow_single_cluster=True: all issues may end up in one cluster.
            clusterer = HDBSCAN(min_cluster_size=2, metric='cosine', allow_single_cluster=True, gen_min_span_tree=True)
            labels = clusterer.fit_predict(embeddings)
            clusters: Dict[int, List[int]] = {}
            noise_count = 0
            for index, label in enumerate(labels):
                label = int(label)  # labels may be numpy integers; store plain ints for UI/JSON use
                if label == -1:  # HDBSCAN uses -1 for noise points
                    noise_count += 1
                    continue
                clusters.setdefault(label, []).append(index)  # Store original index
            self.issue_clusters = clusters
            logger.info(f"Clustering complete. Found {len(self.issue_clusters)} clusters with {noise_count} noise points.")
        except Exception as e:
            logger.exception(f"Error during issue clustering: {e}")
            self.issue_clusters = {}

    async def _generate_embeddings(self, texts: List[str]):
        """Return one embedding (list of floats) per text from the HF Inference API, or None on failure."""
        if not self.hf_token:
            logger.error("Hugging Face token is not set. Cannot generate embeddings.")
            return None
        # NOTE(review): if this model's default hosted task is sentence-similarity, the API may reject
        # a plain list of inputs and a feature-extraction endpoint would be needed. Verify against HF.
        model_id = "sentence-transformers/all-mpnet-base-v2"
        api_url = f"{HF_INFERENCE_API}/{model_id}"
        headers = {"Authorization": f"Bearer {self.hf_token}"}
        logger.info(f"Requesting embeddings from {api_url} for {len(texts)} texts.")
        timeout = aiohttp.ClientTimeout(total=60)  # 60 seconds timeout
        async with aiohttp.ClientSession(headers=headers, timeout=timeout) as session:
            try:
                # wait_for_model=True makes serverless endpoints block while the model loads
                payload = {"inputs": texts, "options": {"wait_for_model": True}}
                # Context-managed so the response is always released.
                async with session.post(api_url, json=payload) as response:
                    if response.status >= 400:
                        # Read the body here: ClientResponseError does not carry the response object.
                        error_body = await response.text()
                        logger.error(f"HF Inference API request failed: {response.status} {response.reason}")
                        logger.error(f"Response body: {error_body[:500]}")  # Log first 500 chars
                        return None
                    result = await response.json()
                if isinstance(result, list) and all(isinstance(emb, list) for emb in result):
                    logger.info(f"Successfully received {len(result)} embeddings.")
                    return result
                if isinstance(result, dict) and 'error' in result:
                    logger.error(f"HF Inference API returned an error: {result['error']}")
                    return None
                logger.error(f"Unexpected embedding format received: {type(result)}. Full response: {result}")
                return None
            except aiohttp.ClientResponseError as e:
                # e.g. ContentTypeError from response.json() on a non-JSON reply.
                logger.error(f"HF Inference API request failed: {e.status} {e.message}")
                return None
            except asyncio.TimeoutError:
                logger.error(f"HF Inference API request timed out after {timeout.total} seconds.")
                return None
            except Exception as e:
                logger.exception(f"An unexpected error occurred during embedding generation: {e}")
                return None

    async def generate_code_patch(self, issue_number: int, model_key: str) -> dict:
        """Ask a Hugging Face model for a ``diff``-format patch that fixes an issue.

        Args:
            issue_number: Number of an issue present in ``self.issues``.
            model_key: Key into ``HF_MODELS``.

        Returns:
            ``{"explanation", "patch", "model_used"}`` on success, otherwise ``{"error": message}``.
        """
        if issue_number not in self.issues:
            return {"error": f"Issue {issue_number} not found."}
        if not self.hf_token:
            return {"error": "Hugging Face token not set."}
        if model_key not in HF_MODELS:
            return {"error": f"Invalid model key: {model_key}"}
        issue = self.issues[issue_number]
        model_id = HF_MODELS[model_key]
        logger.info(f"Generating patch for issue {issue_number} using model {model_id}")
        # --- Context Gathering (Placeholder) ---
        context = "Context gathering not implemented. Provide relevant code snippets in the issue description if possible."
        # context = await self._get_code_context(issue_number)  # Uncomment if implemented
        # --- Prompt Engineering ---
        prompt = f"""You are an expert programmer analyzing a GitHub issue. Your task is to generate a code patch in standard `diff` format to fix the described problem.
## Issue Details:
### Title: {issue.get('title', 'N/A')}
### Body:
{issue.get('body', 'N/A')}
### Labels: {', '.join(issue.get('labels', []))}
## Relevant Code Context (if available):
{context}
## Instructions:
1. Carefully analyze the issue description and context.
2. Identify the specific code changes required to resolve the issue.
3. Generate a patch containing *only* the necessary code modifications.
4. Format the patch strictly according to the standard `diff` format, enclosed in a ```diff ... ``` block.
5. Provide a brief explanation *before* the diff block explaining the reasoning behind the changes.
6. If a patch cannot be reasonably determined from the provided information, state that clearly instead of generating an incorrect patch.
## Patch Suggestion:
"""
        # --- Call Inference API ---
        api_url = f"{HF_INFERENCE_API}/{model_id}"
        headers = {"Authorization": f"Bearer {self.hf_token}"}
        payload = {
            "inputs": prompt,
            "parameters": {
                "max_new_tokens": 1536,  # Room for larger patches
                "temperature": 0.2,  # Low temperature for deterministic code generation
                "return_full_text": False,  # Only get the generated part
                "do_sample": False,  # Greedy decoding for more deterministic output
            },
            "options": {"wait_for_model": True},  # For serverless endpoints
        }
        timeout = aiohttp.ClientTimeout(total=120)  # 2 minutes timeout for generation
        try:
            async with aiohttp.ClientSession(headers=headers, timeout=timeout) as session:
                async with session.post(api_url, json=payload) as response:
                    if response.status >= 400:
                        # Read the error body while the response is open: aiohttp's
                        # ClientResponseError has no .response attribute to read it from later.
                        error_body = await response.text()
                        logger.error(f"HF Inference API request failed for patch generation: {response.status} {response.reason}")
                        logger.error(f"Response body: {error_body[:500]}")
                        return {"error": f"AI model request failed ({response.status}). Check model availability and HF token."}
                    result = await response.json()
            if result and isinstance(result, list) and isinstance(result[0], dict) and 'generated_text' in result[0]:
                generated_text = result[0].get('generated_text') or ''
                logger.info(f"Received patch suggestion from {model_id}")
                diff_match = re.search(r"```diff[ \t]*\r?\n(.*?)```", generated_text, re.DOTALL)
                explanation = generated_text.split("```diff")[0].strip()
                if not explanation:  # Handle cases where explanation might be missing
                    explanation = "No explanation provided."
                if diff_match:
                    patch = diff_match.group(1).strip()
                    # Basic validation: a diff should contain lines starting with + or -
                    if not re.search(r'^\s*[+-]', patch, re.MULTILINE):
                        patch = f"AI generated response, but no standard diff markers (+/-) found:\n---\n{patch}\n---"
                        logger.warning("Generated patch lacks standard diff markers.")
                else:
                    patch = "No diff block found in the AI response."
                    logger.warning("No diff block found in patch suggestion response.")
                return {
                    "explanation": explanation,
                    "patch": patch,
                    "model_used": model_id,
                }
            if isinstance(result, dict) and 'error' in result:
                logger.error(f"HF Inference API returned an error for patch generation: {result['error']}")
                return {"error": f"AI model returned an error: {result['error']}"}
            logger.error(f"Unexpected response format from {model_id} for patch: {result}")
            return {"error": "Received unexpected response format from AI model."}
        except aiohttp.ClientResponseError as e:
            # e.g. ContentTypeError from response.json() on a non-JSON reply.
            logger.error(f"HF Inference API request failed for patch generation: {e.status} {e.message}")
            return {"error": f"AI model request failed ({e.status}). Check model availability and HF token."}
        except asyncio.TimeoutError:
            logger.error(f"HF Inference API request for patch generation timed out after {timeout.total} seconds.")
            return {"error": "AI model request timed out."}
        except Exception as e:
            logger.exception(f"Error generating code patch: {e}")
            return {"error": f"An unexpected error occurred: {e}"}

    async def _get_code_context(self, issue_number: int) -> str:
        """Placeholder for retrieving relevant code context for an issue."""
        # A real implementation would:
        # - Clone/pull the repo if not present/up-to-date
        # - Identify relevant files (e.g., using file paths mentioned in the issue, heuristics)
        # - Read relevant parts of the files
        logger.warning(f"Code context retrieval for issue {issue_number} is not fully implemented.")
        # Example: potential_files = re.findall(r'[\w/.-]+\.(?:py|js|java|cpp|c|ts|html|css)', issue_body)
        return "Code context retrieval is currently a placeholder."

    async def suggest_resolution(self, issue: dict, model_key: str) -> str:
        """Ask a Hugging Face model for step-by-step resolution advice for ``issue``.

        Returns:
            The generated suggestion, or an error message string.
        """
        if not self.hf_token:
            return "Error: Hugging Face token not set."
        if model_key not in HF_MODELS:
            return f"Error: Invalid model key: {model_key}"
        model_id = HF_MODELS[model_key]
        logger.info(f"Requesting resolution suggestion for issue {issue.get('id','N/A')} using {model_id}")
        prompt = f"""Analyze the following GitHub issue and provide a concise, step-by-step suggestion on how to resolve it. Focus on the technical steps required. Be clear and actionable.
## Issue Details:
### Title: {issue.get('title', 'N/A')}
### Body:
{issue.get('body', 'N/A')}
### Labels: {', '.join(issue.get('labels', []))}
## Suggested Resolution Steps:
1. **Understand the Root Cause:** [Briefly explain the likely cause based on the description]
2. **Identify Files:** [Suggest specific files or modules likely involved]
3. **Implement Changes:** [Describe the necessary code modifications or additions]
4. **Test Thoroughly:** [Mention specific testing approaches needed]
5. **Create Pull Request:** [Standard final step]
Provide details for steps 1-4 based on the issue.
"""
        api_url = f"{HF_INFERENCE_API}/{model_id}"
        headers = {"Authorization": f"Bearer {self.hf_token}"}
        payload = {
            "inputs": prompt,
            "parameters": {
                "max_new_tokens": 768,
                "temperature": 0.6,  # Moderate temperature for focused suggestions
                "return_full_text": False,
                "do_sample": True,
                "top_p": 0.95,
            },
            "options": {"wait_for_model": True},  # For serverless endpoints
        }
        timeout = aiohttp.ClientTimeout(total=90)  # 90 seconds timeout
        try:
            async with aiohttp.ClientSession(headers=headers, timeout=timeout) as session:
                async with session.post(api_url, json=payload) as response:
                    if response.status >= 400:
                        # Read the body here: ClientResponseError does not carry the response object.
                        error_body = await response.text()
                        logger.error(f"HF Inference API request failed for suggestion: {response.status} {response.reason}")
                        logger.error(f"Response body: {error_body[:500]}")
                        return f"Error: AI model request failed ({response.status}). Check model availability and HF token."
                    result = await response.json()
            if result and isinstance(result, list) and isinstance(result[0], dict) and 'generated_text' in result[0]:
                suggestion = result[0].get('generated_text') or 'No suggestion generated.'
                logger.info(f"Received suggestion from {model_id}")
                return suggestion.strip()
            if isinstance(result, dict) and 'error' in result:
                logger.error(f"HF Inference API returned an error for suggestion: {result['error']}")
                return f"Error: AI model returned an error: {result['error']}"
            logger.error(f"Unexpected response format from {model_id} for suggestion: {result}")
            return "Error: Received unexpected response format from AI model."
        except aiohttp.ClientResponseError as e:
            # e.g. ContentTypeError from response.json() on a non-JSON reply.
            logger.error(f"HF Inference API request failed for suggestion: {e.status} {e.message}")
            return f"Error: AI model request failed ({e.status}). Check model availability and HF token."
        except asyncio.TimeoutError:
            logger.error(f"HF Inference API request for suggestion timed out after {timeout.total} seconds.")
            return "Error: AI model request timed out."
        except Exception as e:
            logger.exception(f"Error suggesting resolution: {e}")
            return f"An unexpected error occurred: {e}"

    # --- WebSocket Methods ---
    async def broadcast_collaboration_status(self):
        """Every 5 seconds, push ``self.collaborators`` to all WebSocket clients.

        Loops forever (presumably run as a background task). Clients whose send
        fails are dropped via ``remove_ws_client``.
        """
        while True:
            await asyncio.sleep(5)  # Send updates every 5 seconds
            if not self.ws_clients or not self.collaborators:
                continue  # Nobody to send to, or nothing to report
            try:
                status_payload = json.dumps({
                    "type": "collaboration_status",
                    "collaborators": self.collaborators,
                })
            except (TypeError, ValueError) as e:
                # A non-serializable status must not end this long-running loop.
                logger.error(f"Could not serialize collaborator status: {e}")
                continue
            logger.debug(f"Broadcasting status: {status_payload}")
            # Snapshot the clients: the list can change while the sends are awaited,
            # which previously misaligned results with self.ws_clients[i].
            clients = list(self.ws_clients)
            results = await asyncio.gather(
                *[client.send(status_payload) for client in clients],
                return_exceptions=True,  # Don't let one failed send stop others
            )
            for client, result in zip(clients, results):
                if isinstance(result, Exception):
                    logger.warning(f"Failed to send status to client {getattr(client, 'client_id', client.remote_address)}: {result}. Removing client.")
                    # remove_ws_client is synchronous, so no other coroutine interleaves with it.
                    self.remove_ws_client(client)

    async def handle_code_editor_update(self, issue_num: int, delta: str, client_id: str):
        """Apply a JSON ``delta`` from ``client_id`` to the issue's editor and relay it to the other clients."""
        editor = self.code_editors.get(issue_num)
        if editor is None:
            # Initializing requires the initial content, which is not available here.
            logger.warning(f"Received code update for non-existent editor instance for issue {issue_num}. Ignoring.")
            return
        try:
            parsed_delta = json.loads(delta)
            editor.apply_delta(parsed_delta)  # server-side authoritative state
            logger.info(f"Applied delta for issue {issue_num} from client {client_id}")
            update_payload = json.dumps({
                "type": "code_update",
                "issue_num": issue_num,
                "delta": delta,  # Send the original delta JSON string
            })
            # Every client except the originator. The list is kept so failures can be
            # attributed: coroutine objects have no __self__ to recover the client from.
            recipients = [
                client for client in self.ws_clients
                if getattr(client, 'client_id', None) != client_id
            ]
            if recipients:
                results = await asyncio.gather(
                    *[client.send(update_payload) for client in recipients],
                    return_exceptions=True,
                )
                for client, result in zip(recipients, results):
                    if isinstance(result, Exception):
                        logger.warning(f"Failed to broadcast code update to client {getattr(client, 'client_id', 'N/A')}: {result}")
        except json.JSONDecodeError:
            logger.error(f"Received invalid JSON delta for issue {issue_num}: {delta}")
        except Exception as e:
            logger.exception(f"Error handling code editor update for issue {issue_num}: {e}")

    async def broadcast_issue_update(self):
        """Send an ``issues_updated`` notification to every connected WebSocket client."""
        if not self.ws_clients:
            return
        logger.info("Broadcasting issue update notification to clients.")
        update_payload = json.dumps({"type": "issues_updated"})
        clients = list(self.ws_clients)  # snapshot: the list may change during the awaits
        results = await asyncio.gather(
            *[client.send(update_payload) for client in clients],
            return_exceptions=True,
        )
        for client, result in zip(clients, results):
            if isinstance(result, Exception):
                logger.warning(f"Failed to send issue update notification to client {getattr(client, 'client_id', client.remote_address)}: {result}")

    def remove_ws_client(self, client_to_remove: websockets.WebSocketServerProtocol):
        """Remove a client from ``ws_clients`` and its entry from ``collaborators`` (no-op if absent)."""
        client_id = getattr(client_to_remove, 'client_id', None)
        if client_to_remove in self.ws_clients:
            self.ws_clients.remove(client_to_remove)
            logger.info(f"Removed WebSocket client: {client_id or client_to_remove.remote_address}")
        if client_id and client_id in self.collaborators:
            del self.collaborators[client_id]
            logger.info(f"Removed collaborator {client_id}.")
        # No need to broadcast here, the periodic task will reflect the change
# ========== Gradio UI Definition ==========
# Browser-side collaboration client, injected through ``app.load(_js=...)`` in
# create_ui. Gradio evaluates ``_js`` as a JavaScript *function expression*, so
# the code is a single arrow function rather than an HTML <script> element (a
# <script> tag is not valid JavaScript in that position and never runs).
# ``__WS_PORT__`` is substituted by _collab_client_js(); using str.replace instead
# of an f-string means none of the JavaScript braces need escaping.
# NOTE(review): EDITOR_SELECTOR assumes the code_editor component renders an Ace
# editor inside an element with id "code_editor_component"; create_ui does not set
# that id itself, so confirm it against the component's markup.
_COLLAB_JS_TEMPLATE = """
() => {
    // Run only once per page load, even if the load hook fires again.
    if (window.collabWs) {
        console.log('WebSocket connection appears to be already initialized.');
        return;
    }
    console.log('Initializing WebSocket connection...');

    // The client ID is generated per browser session. Generating it on the server
    // while building the UI gave every visitor the same ID, so peers treated each
    // other as "self" and ignored each other's edits.
    window.clientId = 'client_' + Math.random().toString(16).slice(2, 10).padEnd(8, '0');
    window.aceEditorInstance = null;   // Ace editor instance, once located
    window.currentIssueId = null;      // Issue ID currently loaded in the editor
    window.collabWiredSession = null;  // Ace session that already has our change listener

    // Local development connects to the WebSocket port directly; anywhere else
    // (e.g. an HF Space) a TLS proxy is assumed to expose it under /ws.
    let wsUrl;
    if (window.location.hostname === 'localhost' || window.location.hostname === '127.0.0.1') {
        wsUrl = 'ws://localhost:__WS_PORT__';
    } else {
        wsUrl = `wss://${window.location.host}/ws`;
        console.log('Detected non-local environment, using secure WebSocket URL:', wsUrl);
    }

    const getCollabList = () => document.getElementById('collab-list');

    // Read the selected issue ID from the hidden Gradio number input (null if unset).
    const readSelectedIssueId = () => {
        const input = document.querySelector('#selected_issue_id input');
        const parsed = input && input.value ? parseInt(input.value, 10) : NaN;
        return Number.isNaN(parsed) ? null : parsed;
    };

    window.sendWsMessage = function(message) {
        if (window.collabWs && window.collabWs.readyState === WebSocket.OPEN) {
            window.collabWs.send(JSON.stringify(message));
        } else {
            console.error('WebSocket not connected. Cannot send message:', message);
        }
    };

    // Render the other collaborators. textContent (not innerHTML) because the
    // IDs and statuses are supplied by other clients.
    const renderCollaborators = (collaborators) => {
        const listDiv = getCollabList();
        if (!listDiv) return;
        const others = Object.entries(collaborators || {}).filter(([id]) => id !== window.clientId);
        if (others.length === 0) {
            listDiv.textContent = 'You are the only active user.';
            return;
        }
        listDiv.textContent = '';
        for (const [id, info] of others) {
            const item = document.createElement('div');
            item.className = 'collab-item';
            item.style.marginBottom = '3px';
            item.textContent = `${(info && info.name) || id}: ${(info && info.status) || 'Idle'}`;
            listDiv.appendChild(item);
        }
    };

    // Apply another client's Ace delta when it targets the issue shown here.
    const applyRemoteDelta = (data) => {
        console.log('Received code update delta for issue:', data.issue_num, 'from client:', data.clientId);
        // Refresh from the hidden input so remote edits apply even before the local user types.
        const selectedId = readSelectedIssueId();
        if (selectedId !== null) window.currentIssueId = selectedId;
        if (window.aceEditorInstance && data.issue_num === window.currentIssueId && data.clientId !== window.clientId) {
            try {
                const delta = JSON.parse(data.delta);
                // applyDeltas runs outside an Ace command, so the change listener
                // below treats it as programmatic and does not echo it back.
                window.aceEditorInstance.getSession().getDocument().applyDeltas([delta]);
                console.log('Applied remote delta to Ace editor for issue:', data.issue_num);
            } catch (e) {
                console.error('Failed to parse or apply remote delta:', e, data.delta);
            }
        } else {
            console.log('Ignoring remote delta (wrong issue, self, or editor not ready). Current:', window.currentIssueId, 'Received:', data.issue_num);
        }
    };

    // Prompt the user to rescan when the server reports new issue data.
    const notifyIssuesUpdated = () => {
        console.log('Received issues updated notification.');
        const statusBar = document.getElementById('status_output')?.querySelector('textarea');
        if (statusBar) {
            const timestamp = new Date().toLocaleTimeString();
            statusBar.value = `[${timestamp}] Issue list updated on server. Click "Scan Repository Issues" to refresh.`;
            // Dispatch an input event so Gradio notices the programmatic change.
            statusBar.dispatchEvent(new Event('input', { bubbles: true }));
        }
    };

    window.collabWs = new WebSocket(wsUrl);

    window.collabWs.onopen = function() {
        console.log('WebSocket connection established.');
        window.sendWsMessage({ type: 'join', clientId: window.clientId });
        const listDiv = getCollabList();
        if (listDiv) listDiv.innerHTML = 'Connected. Waiting for status...';
    };

    window.collabWs.onmessage = function(event) {
        try {
            const data = JSON.parse(event.data);
            if (data.type === 'collaboration_status') {
                renderCollaborators(data.collaborators);
            } else if (data.type === 'code_update') {
                applyRemoteDelta(data);
            } else if (data.type === 'issues_updated') {
                notifyIssuesUpdated();
            }
        } catch (e) {
            console.error('Failed to parse WebSocket message or update UI:', e, event.data);
        }
    };

    window.collabWs.onclose = function(event) {
        console.warn('WebSocket connection closed:', event.code, event.reason);
        const listDiv = getCollabList();
        if (listDiv) listDiv.innerHTML = '<span style="color: red;">Disconnected</span>';
    };

    window.collabWs.onerror = function(error) {
        console.error('WebSocket error:', error);
        const listDiv = getCollabList();
        if (listDiv) listDiv.innerHTML = '<span style="color: red;">Connection Error</span>';
    };

    // --- Ace editor integration ---
    const EDITOR_SELECTOR = '#code_editor_component .ace_editor';

    function setupCodeEditorListener() {
        if (typeof ace === 'undefined') {
            console.warn('Ace editor library not found. Retrying...');
            setTimeout(setupCodeEditorListener, 1000);
            return;
        }
        const editorElement = document.querySelector(EDITOR_SELECTOR);
        if (!editorElement) {
            // Not rendered yet; the MutationObserver below calls us again when it appears.
            return;
        }
        let editor = null;
        try {
            editor = ace.edit(editorElement);
        } catch (e) {
            console.error('Failed to initialize Ace editor instance:', e);
            if (editorElement.env && editorElement.env.editor) {
                editor = editorElement.env.editor;
                console.log('Found Ace Editor instance via element.env.editor');
            }
        }
        if (!editor) {
            console.error('Could not find Ace editor instance. Collaboration may not work.');
            return;
        }
        window.aceEditorInstance = editor;
        const session = editor.getSession();
        // The observer fires on every DOM change; wire each session only once so a
        // single keystroke is not broadcast by several duplicate listeners.
        if (window.collabWiredSession === session) return;
        window.collabWiredSession = session;
        console.log('Attaching change listener to Ace editor.');
        session.on('change', function(delta) {
            // Forward only edits made through an Ace command (typing, paste, ...);
            // remote deltas applied with applyDeltas have no active command.
            const op = editor.curOp;
            if (!(op && op.command && op.command.name)) return;
            const issueId = readSelectedIssueId();
            window.currentIssueId = issueId;
            if (issueId === null) return;
            window.sendWsMessage({
                type: 'code_update',
                issue_num: issueId,
                delta: JSON.stringify(delta),
                clientId: window.clientId
            });
        });
        console.log('Ace editor listener attached.');
    }

    // Re-run setup (debounced by 200 ms) whenever the DOM changes while the
    // editor is present, since Gradio may re-render the component.
    const observer = new MutationObserver(() => {
        if (document.querySelector(EDITOR_SELECTOR)) {
            clearTimeout(window.setupEditorTimeout);
            window.setupEditorTimeout = setTimeout(setupCodeEditorListener, 200);
        }
    });
    console.log('Starting MutationObserver to detect code editor element.');
    observer.observe(document.body, { childList: true, subtree: true });

    // Initial attempt in case the editor is already rendered.
    setTimeout(setupCodeEditorListener, 500);
}
"""


def _collab_client_js(ws_port: int) -> str:
    """Return the collaboration client script with *ws_port* filled in.

    Args:
        ws_port: Port of the local WebSocket server (used for localhost URLs).

    Returns:
        A JavaScript arrow-function expression for Gradio's ``_js`` hook.
    """
    return _COLLAB_JS_TEMPLATE.replace("__WS_PORT__", str(int(ws_port))).strip()


def create_ui(manager: IssueManager):
    """Build the Gradio interface bound to *manager*.

    Args:
        manager: Shared ``IssueManager``. Its ``issues``, ``code_editors``,
            ``crawl_issues``, ``cached_suggestion``, ``_get_issue_hash`` and
            ``generate_code_patch`` back the UI callbacks.

    Returns:
        The (not yet launched) ``gr.Blocks`` application.
    """
    import html  # only the issue preview needs HTML escaping

    # --- Helper Functions for UI ---
    def _as_issue_id(value) -> Optional[int]:
        """Coerce a Gradio number/cell value to an ``int`` issue ID, or None."""
        if value is None:
            return None
        try:
            return int(value)
        except (TypeError, ValueError, OverflowError):
            return None

    def generate_issue_preview(issue_num: Optional[int]) -> str:
        """Render the selected issue as an HTML card, or a prompt if none is selected.

        Title, state, assignee, labels and URL come from the GitHub API and are
        HTML-escaped. The body is rendered from Markdown with markdown2.
        """
        issue_num = _as_issue_id(issue_num)
        if issue_num is None or issue_num not in manager.issues:
            return "<p>Select an issue from the board to see details.</p>"
        issue = manager.issues[issue_num]
        esc = html.escape

        # GitHub sends ``"body": null`` for empty descriptions, so fall back on any falsy body.
        # NOTE(review): markdown2 passes raw HTML in the body through; consider
        # safe_mode="escape" if issue-supplied HTML should not be rendered.
        html_body = markdown2.markdown(
            issue.get("body") or "*No description provided.*",
            extras=["fenced-code-blocks", "tables", "strike", "task_list"],
        )
        label_style = (
            "background-color: #eee; padding: 2px 5px; border-radius: 4px; "
            "font-size: 0.9em; display: inline-block; margin-right: 3px;"
        )
        # Built outside the f-string below: backslashes are not allowed inside
        # f-string expressions before Python 3.12.
        labels_html = " | ".join(
            f"<span style='{label_style}'>{esc(str(label))}</span>"
            for label in issue.get("labels") or []
        ) or "None"
        url = esc(str(issue.get("url") or "#"))
        title = esc(str(issue.get("title", "N/A")))
        state = esc(str(issue.get("state", "N/A")))
        assignee = esc(str(issue.get("assignee") or "None"))
        issue_id = esc(str(issue.get("id", issue_num)))

        # Must be an f-string so the issue fields are interpolated into the card.
        return f"""
<div style="border: 1px solid #e5e7eb; padding: 15px; border-radius: 8px; background-color: #f9fafb; font-family: 'Inter', sans-serif;">
    <h4 style="margin-top: 0; margin-bottom: 10px;">
        <a href='{url}' target='_blank' style='color: #6d28d9; text-decoration: none; font-weight: 600;'>
            #{issue_id} - {title}
        </a>
    </h4>
    <hr style='margin: 10px 0; border-top: 1px solid #e5e7eb;'>
    <p style="font-size: 0.9em; color: #4b5563; margin-bottom: 5px;">
        <strong>State:</strong> <span style="background-color: #ddd; padding: 1px 4px; border-radius: 3px;">{state}</span> |
        <strong>Assignee:</strong> {assignee}
    </p>
    <p style="font-size: 0.9em; color: #4b5563; margin-bottom: 10px;">
        <strong>Labels:</strong> {labels_html}
    </p>
    <div style="margin-top: 10px; max-height: 350px; overflow-y: auto; border-top: 1px dashed #ccc; padding-top: 10px; font-size: 0.95em; line-height: 1.5;">
        {html_body}
    </div>
</div>
"""

    async def get_ai_suggestion_wrapper(issue_num: Optional[int], model_key: str) -> str:
        """Return the AI resolution suggestion for *issue_num* as Markdown."""
        issue_num = _as_issue_id(issue_num)
        if issue_num is None or issue_num not in manager.issues:
            return "Please select a valid issue first."
        issue = manager.issues[issue_num]
        issue_hash = manager._get_issue_hash(issue)
        # NOTE(review): this passes the dropdown key (e.g. "Mistral-8x7B"), not the
        # HF model ID; confirm cached_suggestion maps it through HF_MODELS.
        suggestion = await manager.cached_suggestion(issue_hash, model_key)
        return f"**Suggestion based on {model_key}:**\n\n---\n{suggestion}"

    async def get_ai_patch_wrapper(issue_num: Optional[int], model_key: str) -> str:
        """Return the AI-generated patch (or the error) for *issue_num* as Markdown."""
        issue_num = _as_issue_id(issue_num)
        if issue_num is None or issue_num not in manager.issues:
            return "Please select a valid issue first."
        result = await manager.generate_code_patch(issue_num, model_key)
        if "error" in result:
            return f"**Error generating patch:** {result['error']}"
        # Blank lines keep each part its own Markdown block; a ``---`` directly
        # under text would otherwise turn that text into a heading.
        return "\n".join([
            f"**Patch Suggestion from {result.get('model_used', model_key)}:**",
            "",
            "**Explanation:**",
            "",
            f"{result.get('explanation', 'N/A')}",
            "",
            "---",
            "",
            "**Patch:**",
            "",
            "```diff",
            f"{result.get('patch', 'N/A')}",
            "```",
        ])

    # --- Gradio Blocks ---
    with gr.Blocks(theme=theme, title="🤖 AI Issue Resolver Pro", css=".gradio-container {max-width: 1400px !important;}") as app:
        # No leading indentation: indented lines would render as a Markdown code block.
        gr.Markdown(
            '<div style="text-align: center; margin-bottom: 20px;">\n'
            '<h1 style="color: #6d28d9; font-weight: 800;">🚀 AI Issue Resolver Pro</h1>\n'
            '<p style="color: #4b5563; font-size: 1.1em;">Next-generation issue resolution powered by AI collaboration</p>\n'
            '</div>'
        )

        # --- Configuration Row ---
        with gr.Row(variant="panel", elem_id="config-panel"):
            with gr.Column(scale=3):
                repo_url = gr.Textbox(label="GitHub Repository URL", placeholder="https://github.com/owner/repo", info="Enter the full URL of the public GitHub repository.", elem_id="repo_url")
                with gr.Row():
                    github_token = gr.Textbox(label="GitHub Token (Optional)", type="password", info="Required for private repos or higher rate limits.", elem_id="github_token")
                    hf_token = gr.Textbox(label="Hugging Face Token", type="password", info="Required for AI model interactions.", elem_id="hf_token")
            with gr.Column(scale=1, min_width=250):
                model_select = gr.Dropdown(choices=list(HF_MODELS.keys()), value="Mistral-8x7B",
                                           label="🤖 Select AI Model", info="Choose the AI for suggestions and patches.", elem_id="model_select")
                crawl_btn = gr.Button("🛰️ Scan Repository Issues", variant="primary", icon="🔍", elem_id="crawl_btn")
        status_output = gr.Textbox(label="Status", interactive=False, lines=1, max_lines=1, placeholder="Status updates will appear here...", elem_id="status_output")

        # --- Main Tabs ---
        with gr.Tabs(elem_id="main-tabs"):
            # --- Issue Board Tab ---
            with gr.Tab("📋 Issue Board", id="board", elem_id="tab-board"):
                with gr.Row(equal_height=False):
                    with gr.Column(scale=3):
                        gr.Markdown("### Open Issues")
                        issue_list = gr.Dataframe(
                            headers=["ID", "Title", "Severity", "Cluster"],
                            datatype=["number", "str", "str", "str"],  # Cluster ID shown as str
                            interactive=True,
                            wrap=True,  # Wrap long titles
                            elem_id="issue_list_df",
                        )
                    with gr.Column(scale=2, min_width=350):
                        gr.Markdown("### Issue Severity")
                        stats_plot = gr.Plot(elem_id="stats_plot")
                        # Collaborator list; the #collab-list div is filled in by the JS client.
                        collab_status = gr.HTML("""
<div style="margin-top: 20px; border: 1px solid #e5e7eb; padding: 10px; border-radius: 8px; background-color: #f9fafb;">
    <h4 style="margin-bottom: 5px; color: #374151; font-size: 1em;">👥 Active Collaborators</h4>
    <div id="collab-list" style="font-size: 0.9em; max-height: 100px; overflow-y: auto;">
        Connecting...
    </div>
</div>
""", elem_id="collab_status_html")

            # --- Resolution Studio Tab ---
            with gr.Tab("💻 Resolution Studio", id="studio", elem_id="tab-studio"):
                with gr.Row():
                    # Left column: issue details & AI tools
                    with gr.Column(scale=1, min_width=400):
                        gr.Markdown("### Selected Issue Details")
                        # Hidden number input holding the selected issue ID (also read by the JS client).
                        selected_issue_id = gr.Number(label="Selected Issue ID", visible=False, precision=0, elem_id="selected_issue_id")
                        issue_preview_html = gr.HTML(
                            "<p style='color: #6b7280;'>Select an issue from the 'Issue Board' tab.</p>",
                            elem_id="issue_preview"
                        )
                        with gr.Accordion("🛠️ AI Assistance Tools", open=True, elem_id="ai_tools_accordion"):
                            suggest_btn = gr.Button("🧠 Suggest Resolution Steps", icon="💡", elem_id="suggest_btn")
                            patch_btn = gr.Button("📝 Generate Code Patch", icon="🩹", elem_id="patch_btn")
                        ai_output_display = gr.Markdown(value="*AI suggestions and patches will appear here...*", elem_id="ai_output_display")
                    # Right column: collaborative code editor (value updated on issue selection)
                    with gr.Column(scale=2, min_width=500):
                        gr.Markdown("### Collaborative Code Editor")
                        code_edit_component = code_editor(
                            value={"placeholder.txt": "# Select an issue to load code."},
                            key="code_editor"
                        )

            # --- Analytics Tab (Placeholder) ---
            with gr.Tab("📈 Analytics", id="analytics", elem_id="tab-analytics"):
                gr.Markdown("### Analytics Dashboard (Placeholder)")
                gr.Markdown("Future home for resolution timelines, achievement badges, and more detailed metrics.")

        # --- Event Handlers ---
        # 1. Crawl button
        crawl_btn.click(
            fn=manager.crawl_issues,
            inputs=[repo_url, github_token, hf_token],
            outputs=[issue_list, stats_plot, status_output],
            api_name="crawl_issues",
            show_progress="full"
        )

        # 2. Issue selection in the Dataframe
        async def handle_issue_select(table, evt: gr.SelectData):
            """Load the clicked issue into the Resolution Studio.

            ``evt.value`` is only the clicked *cell*, so the issue ID is read from
            column 0 ("ID") of the selected row of *table* (the Dataframe value).
            """
            default_response = {
                selected_issue_id: gr.update(value=None),
                issue_preview_html: gr.update(value="<p style='color: #6b7280;'>Select an issue from the table.</p>"),
                code_edit_component: gr.update(value={"placeholder.txt": "# Select an issue to load code."}),
                ai_output_display: gr.update(value="*AI suggestions and patches will appear here...*"),
            }
            index = evt.index
            row = index[0] if isinstance(index, (list, tuple)) else index
            if row is None:
                logger.info("Issue deselected or invalid selection event.")
                return default_response
            try:
                # A pandas DataFrame (Gradio's default Dataframe type) or a list of rows.
                raw_id = table.iloc[row, 0] if hasattr(table, "iloc") else table[row][0]
                selected_id = int(raw_id)
                logger.info(f"Issue selected: ID {selected_id}")
                # --- Simulated code loading ---
                files_content = {
                    f"issue_{selected_id}_code.py": (
                        f"# Code related to issue {selected_id}\n"
                        "# Replace with actual file content\n\n"
                        f"def fix_issue_{selected_id}():\n"
                        "    print('Implement solution here')"
                    ),
                    "README.md": f"# Issue {selected_id}\n\nResolution work in progress..."
                }
                # Initialize server-side editor state
                manager.code_editors[selected_id] = OTCodeEditor(value=files_content)
                logger.info(f"Initialized OT editor for issue {selected_id}")
                return {
                    selected_issue_id: gr.update(value=selected_id),
                    issue_preview_html: gr.update(value=generate_issue_preview(selected_id)),
                    code_edit_component: gr.update(value=files_content),
                    ai_output_display: gr.update(value="*AI suggestions ready*"),
                }
            except (ValueError, TypeError, IndexError, KeyError) as e:
                logger.error(f"Selection error: {str(e)}")
                return {
                    **default_response,
                    issue_preview_html: gr.update(value="<p style='color: red;'>Error loading issue details</p>"),
                    code_edit_component: gr.update(value={"error.txt": "# Error loading code\nPlease try again"}),
                    ai_output_display: gr.update(value="*Error processing selection*"),
                }

        issue_list.select(
            fn=handle_issue_select,
            inputs=[issue_list],  # table value; SelectData is injected via the annotation
            outputs=[selected_issue_id, issue_preview_html, code_edit_component, ai_output_display],
            show_progress="minimal"
        )

        # 3. Suggest resolution
        suggest_btn.click(
            fn=get_ai_suggestion_wrapper,
            inputs=[selected_issue_id, model_select],
            outputs=[ai_output_display],
            api_name="suggest_resolution",
            show_progress="full"
        )

        # 4. Generate patch
        patch_btn.click(
            fn=get_ai_patch_wrapper,
            inputs=[selected_issue_id, model_select],
            outputs=[ai_output_display],
            api_name="generate_patch",
            show_progress="full"
        )

        # 5./6. Editor edits and server pushes travel over the WebSocket handled by
        # the JS client below and handle_ws_connection on the server.
        # NOTE(review): `_js` is the Gradio 3.x keyword; Gradio 4 renamed it to `js`.
        app.load(_js=_collab_client_js(WS_PORT), fn=None, inputs=None, outputs=None)

    return app
# ========== WebSocket Server Logic ==========
# Strong references to fire-and-forget tasks: the event loop keeps only weak
# references to tasks, so an unreferenced task can be garbage-collected mid-run.
_background_tasks = set()


def _spawn_background(coro):
    """Schedule *coro* on the running loop and keep it referenced until it finishes."""
    task = asyncio.create_task(coro)
    _background_tasks.add(task)
    task.add_done_callback(_background_tasks.discard)
    return task


async def handle_ws_connection(websocket: websockets.WebSocketServerProtocol, path: str, manager: IssueManager):
    """Serve one collaboration WebSocket connection until it closes.

    Registers the socket in ``manager.ws_clients`` and dispatches each JSON
    object by its ``type``:

    * ``join``: binds the client's ID to this socket and ``manager.collaborators``.
    * ``code_update``: forwards ``issue_num``/``delta`` to
      ``manager.handle_code_editor_update``.
    * ``status_update``: updates this connection's own collaborator status.

    Malformed messages are logged and skipped. On exit the socket is removed with
    ``manager.remove_ws_client`` and a collaboration-status broadcast is scheduled.

    Args:
        websocket: Server-side connection object.
        path: Request path from the legacy two-argument handler API (unused).
        manager: Shared ``IssueManager``.
    """
    client_id = None  # set once this connection sends a "join" message
    manager.ws_clients.append(websocket)
    logger.info(f"WebSocket client connected: {websocket.remote_address} (Total: {len(manager.ws_clients)})")
    try:
        async for message in websocket:
            try:
                data = json.loads(message)
                if not isinstance(data, dict):
                    logger.warning(f"Ignoring non-object WebSocket message from {client_id or websocket.remote_address}: {message}")
                    continue
                msg_type = data.get("type")
                if msg_type == "join":
                    # `or` (not a .get default) so an explicit null/empty clientId still gets an ID.
                    new_id = data.get("clientId") or f"anon_{websocket.id}"
                    # A re-join under a different ID must not leave a stale collaborator entry.
                    if client_id and client_id != new_id:
                        manager.collaborators.pop(client_id, None)
                    client_id = new_id
                    websocket.client_id = client_id  # read back by broadcast/removal helpers
                    manager.collaborators[client_id] = {"name": client_id, "status": "Connected"}
                    logger.info(f"Client {client_id} joined.")
                    # NOTE(review): main_async_tasks also starts broadcast_collaboration_status
                    # as a "periodic" task; if it loops forever, each spawn here adds another
                    # loop. Confirm it performs a single broadcast per call.
                    _spawn_background(manager.broadcast_collaboration_status())
                elif msg_type == "code_update":
                    issue_num = data.get("issue_num")
                    delta = data.get("delta")
                    # Prefer the ID bound at join time so a client cannot impersonate another.
                    sender_id = client_id or data.get("clientId")
                    if issue_num is not None and delta and sender_id:
                        # The sender ID lets the handler skip echoing the update back.
                        await manager.handle_code_editor_update(issue_num, delta, sender_id)
                    else:
                        logger.warning(f"Invalid code_update message received: {data}")
                elif msg_type == "status_update":
                    status = data.get("status", "Idle")
                    # Only this connection's own (joined) collaborator entry may be changed.
                    if client_id and client_id in manager.collaborators:
                        manager.collaborators[client_id]["status"] = status
                        logger.info(f"Client {client_id} status updated: {status}")
                        _spawn_background(manager.broadcast_collaboration_status())
                else:
                    logger.warning(f"Unknown WebSocket message type received: {msg_type} from {client_id or websocket.remote_address}")
            except json.JSONDecodeError:
                logger.error(f"Received invalid JSON over WebSocket from {client_id or websocket.remote_address}: {message}")
            except Exception as e:
                logger.exception(f"Error processing WebSocket message from {client_id or websocket.remote_address}: {e}")
    except ConnectionClosed as e:
        logger.info(f"WebSocket client {client_id or websocket.remote_address} disconnected: (Code: {e.code}, Reason: {e.reason})")
    except Exception as e:
        logger.exception(f"Unexpected error in WebSocket handler for {client_id or websocket.remote_address}: {e}")
    finally:
        logger.info(f"Cleaning up connection for client {client_id or websocket.remote_address}")
        manager.remove_ws_client(websocket)
        # Let the remaining clients see the departure.
        _spawn_background(manager.broadcast_collaboration_status())
async def start_websocket_server(manager: IssueManager, port: int):
    """Serve collaboration WebSockets on ``0.0.0.0:port`` until cancelled.

    Args:
        manager: Shared ``IssueManager`` handed to every connection handler.
        port: TCP port to bind.

    Raises:
        OSError: If the port cannot be bound (e.g. it is already in use).
        Exception: Any other startup failure, logged and re-raised.
    """
    # ``path`` is optional so the handler fits both websockets handler APIs:
    # legacy (websocket, path) and the newer single-argument (connection) form.
    def handler_with_manager(ws, path=None):
        return handle_ws_connection(ws, path, manager)

    try:
        # ``async with`` closes the server and its listening socket when this
        # coroutine is cancelled. Pings every 20 s (20 s timeout) detect dead peers.
        async with websockets.serve(
            handler_with_manager,
            "0.0.0.0",  # bind to all interfaces
            port,
            ping_interval=20,
            ping_timeout=20,
        ):
            logger.info(f"WebSocket server started on ws://0.0.0.0:{port}")
            await asyncio.Future()  # run forever until cancelled
    except OSError as e:
        logger.error(f"Failed to start WebSocket server on port {port}: {e}. Port might be in use.")
        raise
    except Exception as e:
        logger.exception(f"Unexpected error starting WebSocket server: {e}")
        raise
def run_webhook_server(manager: IssueManager, port: int):
    """Run the webhook HTTP server on ``0.0.0.0:port``, blocking until it stops.

    Intended as a thread target. Startup and runtime errors are logged rather
    than raised.

    Args:
        manager: Shared ``IssueManager``, stored on ``WebhookHandler`` as a class attribute.
        port: TCP port to bind.
    """
    # Presumably read by WebhookHandler instances to reach the manager; confirm in WebhookHandler.
    WebhookHandler.manager_instance = manager
    server_address = ("0.0.0.0", port)  # bind to all interfaces
    try:
        # The context manager closes the listening socket when serve_forever() returns or raises.
        with HTTPServer(server_address, WebhookHandler) as httpd:
            logger.info(f"Webhook HTTP server started on http://0.0.0.0:{port}")
            httpd.serve_forever()
    except OSError as e:
        logger.error(f"Failed to start Webhook server on port {port}: {e}. Port might be in use.")
    except Exception as e:
        logger.exception(f"Unexpected error in Webhook server: {e}")
# ========== Main Execution ==========
if __name__ == "__main__":
    # --- Setup ---
    manager = IssueManager()

    # app.launch() blocks the main thread, so the WebSocket server and the
    # collaboration broadcast run on their own event loop in a daemon thread.
    # Create that loop explicitly and give it to the manager. A main-thread loop
    # from asyncio.get_event_loop() is never run by this script (and that call is
    # deprecated without a running loop), so work scheduled on it would never execute.
    async_loop = asyncio.new_event_loop()
    manager.main_loop = async_loop

    # --- Background Servers ---
    # 1. Webhook server (HTTP) in its own daemon thread.
    webhook_thread = threading.Thread(target=run_webhook_server, args=(manager, WEBHOOK_PORT), daemon=True)
    webhook_thread.start()

    # 2. WebSocket server and broadcast task, driven on async_loop.
    async def main_async_tasks():
        """Run the broadcast task and the WebSocket server until both finish."""
        broadcast_task = asyncio.create_task(manager.broadcast_collaboration_status())
        websocket_server_task = asyncio.create_task(start_websocket_server(manager, WS_PORT))
        await asyncio.gather(broadcast_task, websocket_server_task, return_exceptions=True)

    def run_async_tasks():
        """Thread target: run main_async_tasks() on async_loop, then close the loop."""
        asyncio.set_event_loop(async_loop)
        try:
            async_loop.run_until_complete(main_async_tasks())
        finally:
            async_loop.close()

    # --- Gradio App ---
    app = create_ui(manager)

    asyncio_thread = threading.Thread(target=run_async_tasks, daemon=True)
    asyncio_thread.start()

    # launch() blocks until the server stops, so log before calling it.
    logger.info("Launching Gradio app. Webhook and WebSocket servers are running in background threads.")
    app.launch(
        share=True,
        server_name="0.0.0.0",
        server_port=7860,
        favicon_path="https://huggingface.co/front/assets/huggingface_logo-noborder.svg",
    )