# GitBot / app.py
import gradio as gr
import os
import aiohttp
import asyncio
from git import Repo, GitCommandError, InvalidGitRepositoryError, NoSuchPathError
from pathlib import Path
from datetime import datetime, timedelta, timezone
import shutil
import json
import logging
import re
from typing import Dict, List, Optional, Tuple, Any
import subprocess
import plotly.express as px
import plotly.graph_objects as go
import time
import random
import pandas as pd
from collections import Counter
import string
from concurrent.futures import ThreadPoolExecutor
from hdbscan import HDBSCAN
# --- Required Imports ---
import hashlib
from functools import lru_cache
import threading
from http.server import HTTPServer, BaseHTTPRequestHandler
import markdown2
import websockets
from websockets.server import WebSocketServerProtocol
from websockets.exceptions import ConnectionClosed, ConnectionClosedOK, WebSocketException  # ConnectionAbortedError/ConnectionResetError are Python builtins and are not exported by websockets.exceptions
import signal # For graceful shutdown
# ---------------------
# Assuming code_editor is available, e.g., installed via pip or included locally
try:
from code_editor import code_editor
except ImportError:
logging.error("The 'code_editor' Gradio component is not installed or available.")
logging.error("Please install it, e.g., 'pip install gradio_code_editor'")
def code_editor(*args, **kwargs):
logging.warning("Using dummy code_editor. Code editing and collaboration will not function.")
# Create a dummy component that looks like a Textbox but is non-interactive
return gr.Textbox(label=kwargs.get('label', 'Code Editor (Unavailable)'), interactive=False, value="Error: Code editor component not found. Install 'gradio_code_editor'.", lines=10)
# ========== Configuration ==========
WORKSPACE = Path("./issue_workspace")
WORKSPACE.mkdir(exist_ok=True)
GITHUB_API = "https://api.github.com/repos"
HF_INFERENCE_API = "https://api-inference.huggingface.co/models"
WEBHOOK_PORT = int(os.environ.get("WEBHOOK_PORT", 8000))
WS_PORT = int(os.environ.get("WS_PORT", 8001))
GRADIO_PORT = int(os.environ.get("GRADIO_PORT", 7860))
# Configure logging
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(name)s - %(levelname)s - %(message)s')
logger = logging.getLogger(__name__)
# Set a higher logging level for libraries that are too verbose, e.g.:
logging.getLogger("websockets").setLevel(logging.WARNING)
logging.getLogger("aiohttp").setLevel(logging.WARNING)
logging.getLogger("urllib3").setLevel(logging.WARNING) # Might be used by git or other libs
logging.getLogger("git").setLevel(logging.WARNING)
logging.getLogger("hdbscan").setLevel(logging.WARNING) # HDBSCAN can be chatty
# Use ThreadPoolExecutor for any synchronous blocking operations if needed,
# but most heavy lifting (API calls, git) is now async.
executor = ThreadPoolExecutor(max_workers=4)
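# Typical usage pattern (illustrative; some_blocking_fn is a placeholder):
#   result = await asyncio.get_running_loop().run_in_executor(executor, some_blocking_fn, arg)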
# Example HF models (replace with your actual models)
# Ensure these models are suitable for the tasks (text generation, embeddings)
HF_MODELS = {
"Mistral-8x7B": "mistralai/Mixtral-8x7B-Instruct-v0.1",
"Llama-2-7B-chat": "huggingface/llama-2-7b-chat-hf",
"CodeLlama-34B": "codellama/CodeLlama-34b-Instruct-hf",
"Gemma-7B-it": "google/gemma-7b-it", # Added another option
}
# Embedding model ID - fixed, not user selectable
HF_EMBEDDING_MODEL = "sentence-transformers/all-mpnet-base-v2"
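# all-mpnet-base-v2 returns one 768-dimensional vector per input text via the feature-extraction
# pipeline; these vectors are what the HDBSCAN clustering below consumes.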
# Select default models defensively
DEFAULT_MODEL_KEY = "Mistral-8x7B" if "Mistral-8x7B" in HF_MODELS else (list(HF_MODELS.keys())[0] if HF_MODELS else None)
DEFAULT_MODEL_ID = HF_MODELS.get(DEFAULT_MODEL_KEY, None)
DEFAULT_IDLE_MODEL_KEY = "Gemma-7B-it" if "Gemma-7B-it" in HF_MODELS else DEFAULT_MODEL_KEY # Prefer a smaller one if available
DEFAULT_IDLE_MODEL_ID = HF_MODELS.get(DEFAULT_IDLE_MODEL_KEY, DEFAULT_MODEL_ID)
if not HF_MODELS:
logger.critical("No HF models configured! AI features will be disabled.")
elif DEFAULT_MODEL_ID is None:
logger.critical(f"Default model key '{DEFAULT_MODEL_KEY}' not found in configured models. AI features may be limited.")
if DEFAULT_IDLE_MODEL_ID is None:
logger.warning(f"Idle model key '{DEFAULT_IDLE_MODEL_KEY}' not found or no models configured. Idle tasks may be disabled or use the default model if available.")
# --- Idle State Configuration ---
STALE_ISSUE_THRESHOLD_DAYS = 30
MAX_SUMMARY_COMPUTATIONS_PER_CYCLE = 2
MAX_CONTEXT_COMPUTATIONS_PER_CYCLE = 3
MAX_MISSING_INFO_COMPUTATIONS_PER_CYCLE = 1
MAX_ANALYSIS_COMPUTATIONS_PER_CYCLE = 1
RECLUSTER_THRESHOLD = 5 # Number of significant webhook changes before re-clustering is flagged
IDLE_PROCESSING_INTERVAL_SECONDS = 60.0 # How often the idle task loop runs
# ========== Placeholder OTCodeEditor Class ==========
# WARNING: This is a placeholder and DOES NOT implement Operational Transformation.
# Concurrent edits WILL lead to data loss or inconsistencies.
class OTCodeEditor:
"""
Placeholder for an Operational Transformation (OT) enabled code editor backend.
This implementation is NOT thread-safe and does NOT handle concurrent edits correctly.
It merely logs received deltas and maintains a basic revision counter.
The actual document state is held client-side by the Gradio code_editor component.
A real collaborative editor would require a robust OT backend to manage
the authoritative document state and transform operations.
"""
def __init__(self, initial_value: Dict[str, str]):
# In a real OT system, this would initialize the document state
# For this placeholder, we just store the initial files dict
self.files: Dict[str, str] = initial_value.copy()
self.revision = 0 # Basic revision counter, not used for OT logic
logger.debug(f"OTCodeEditor initialized with files: {list(self.files.keys())}")
def apply_delta(self, delta: Dict[str, Any]):
# VERY basic placeholder: This logs the delta but does NOT perform OT.
# It does NOT handle concurrent edits safely.
# In a real OT system, this would transform the delta against the current state
# and apply it, incrementing the revision based on successful application.
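# For reference, an incoming delta is assumed to look roughly like this
# (illustrative shape only; no delta protocol is actually defined here):
#   {"file": "app.py", "changes": [{"from": {"line": 10, "ch": 0}, "to": {"line": 10, "ch": 5}, "text": "fixed"}]}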
logger.warning(f"Placeholder apply_delta called. Delta: {str(delta)[:200]}. "
"WARNING: Full Operational Transformation is NOT implemented. Concurrent edits are UNSAFE.")
# The Gradio component holds the actual text state client-side.
# A real OT backend would use the delta to update its authoritative state.
# We only increment revision for basic tracking visibility if needed.
self.revision += 1
def get_content(self) -> Dict[str, str]:
# This is not used by the current Gradio code_editor integration,
# as the component holds the state client-side.
# In a real OT system, this would return the current authoritative document state.
return self.files.copy()
# ========== Modern Theme ==========
try:
theme = gr.themes.Soft(
primary_hue="violet",
secondary_hue="emerald",
radius_size="lg",
font=[gr.themes.GoogleFont("Inter"), "ui-sans-serif", "system-ui"]
).set(
button_primary_background_fill="linear-gradient(90deg, #8B5CF6 0%, #EC4899 100%)",
button_primary_text_color="white",
block_label_text_size="lg",
block_label_text_weight="600",
block_title_text_size="lg",
block_title_text_weight="800",
panel_background_fill="white",
block_shadow="*shadow_drop_lg",
# Add some more modern touches
input_background_fill="#f9fafb",
input_border_color="#e5e7eb",
input_border_radius="md",
button_secondary_background_fill="#f3f4f6",
button_secondary_text_color="#374151",
button_secondary_border_color="#d1d5db",
table_border_color="#e5e7eb",
table_row_background_even="#f9fafb",
table_row_background_odd="#ffffff",
# Use slightly softer colors for plots if default is too bright
# (This might need specific Plotly config instead of theme)
)
except AttributeError as e:
logger.warning(f"Could not apply all theme settings (might be Gradio version difference): {e}. Using default Soft theme.")
theme = gr.themes.Soft()
# Additional UI/UX Enhancements
custom_css = """
/* Smooth transitions for buttons and inputs */
button, input, select, textarea {
transition: background-color 0.3s ease, color 0.3s ease, border-color 0.3s ease;
}
/* Hover effects for buttons */
button:hover {
filter: brightness(1.1);
}
/* Focus styles for inputs */
input:focus, select:focus, textarea:focus {
outline: none;
border-color: #8B5CF6;
box-shadow: 0 0 5px rgba(139, 92, 246, 0.5);
}
/* Scrollbar styling for scrollable divs */
#issue_preview_div::-webkit-scrollbar,
#ai_output_md::-webkit-scrollbar {
width: 8px;
}
#issue_preview_div::-webkit-scrollbar-thumb,
#ai_output_md::-webkit-scrollbar-thumb {
background-color: #8B5CF6;
border-radius: 4px;
}
/* Responsive layout improvements */
@media (max-width: 768px) {
#config-panel {
flex-direction: column !important;
}
#issue_list_df {
font-size: 0.9em !important;
}
}
/* Enhanced table styles */
.gradio-dataframe-container table {
border-collapse: separate !important;
border-spacing: 0 8px !important;
}
.gradio-dataframe-container table tr {
background-color: #f9fafb !important;
border-radius: 8px !important;
box-shadow: 0 1px 3px rgba(0,0,0,0.1);
}
.gradio-dataframe-container table tr:hover {
background-color: #ede9fe !important;
cursor: pointer;
}
/* Button styles */
.gr-button {
border-radius: 8px !important;
font-weight: 600 !important;
}
/* Accordion header styles */
#ai_tools_accordion > .label {
font-weight: 700 !important;
font-size: 1.1em !important;
color: #6b21a8 !important;
}
/* Status bar improvements */
#status_output_txt textarea {
background-color: #f3f4f6 !important;
border-radius: 6px !important;
font-family: 'Fira Code', monospace !important;
font-size: 0.95em !important;
color: #4b5563 !important;
}
/* Code editor improvements */
#code_editor_component {
border: 1px solid #ddd !important;
border-radius: 8px !important;
box-shadow: 0 2px 6px rgba(139, 92, 246, 0.15);
}
/* Plot improvements */
#stats_plot_viz, #analytics_severity_plot {
border-radius: 8px;
box-shadow: 0 2px 8px rgba(0,0,0,0.1);
background-color: white;
}
"""
# ========== Enhanced Webhook Handler ==========
class WebhookHandler(BaseHTTPRequestHandler):
manager_instance: Optional['IssueManager'] = None
main_loop: Optional[asyncio.AbstractEventLoop] = None # Store reference to the main asyncio loop
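# Illustrative wiring sketch (the real startup presumably lives in the main execution block of this file):
#   WebhookHandler.manager_instance = manager
#   WebhookHandler.main_loop = asyncio.get_running_loop()
#   server = HTTPServer(("0.0.0.0", WEBHOOK_PORT), WebhookHandler)
#   threading.Thread(target=server.serve_forever, daemon=True).start()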
def do_POST(self):
content_length = int(self.headers.get('Content-Length', 0))
if content_length == 0:
self.send_response(400)
self.send_header("Content-type", "text/plain")
self.end_headers()
self.wfile.write(b"Empty payload")
logger.warning("Received empty webhook payload.")
return
try:
payload_bytes = self.rfile.read(content_length)
payload = json.loads(payload_bytes.decode('utf-8'))
except json.JSONDecodeError:
logger.error(f"Invalid JSON payload received: {payload_bytes[:500]}")
self.send_response(400)
self.send_header("Content-type", "text/plain")
self.end_headers()
self.wfile.write(b"Invalid JSON payload")
return
except Exception as e:
logger.error(f"Error reading webhook payload: {e}")
self.send_response(500)
self.end_headers()
return
event = self.headers.get('X-GitHub-Event')
delivery_id = self.headers.get('X-GitHub-Delivery')
logger.info(f"Received GitHub webhook event: {event} (Delivery ID: {delivery_id})")
if event == 'issues' and WebhookHandler.manager_instance and WebhookHandler.main_loop:
action = payload.get('action')
logger.info(f"Issue action: {action}")
# Handle common actions that affect issue state or content
if action in ['opened', 'reopened', 'closed', 'assigned', 'unassigned', 'edited', 'labeled', 'unlabeled', 'milestoned', 'demilestoned']:
# Check if the loop is running before scheduling
if WebhookHandler.main_loop.is_running():
# Schedule the async handler to run in the main event loop
# Use run_coroutine_threadsafe as this is called from a different thread
asyncio.run_coroutine_threadsafe(
WebhookHandler.manager_instance.handle_webhook_event(event, action, payload),
WebhookHandler.main_loop
)
logger.debug(f"Scheduled webhook processing for action '{action}' in main loop.")
else:
logger.error("Asyncio event loop is not running in the target thread for webhook. Cannot process event.")
# Respond with an error as processing couldn't be scheduled
self.send_response(500)
self.send_header("Content-type", "text/plain")
self.end_headers()
self.wfile.write(b"Async processing loop not available.")
return
else:
logger.info(f"Webhook action '{action}' received but not actively handled by current logic.")
elif event == 'ping':
logger.info("Received GitHub webhook ping.")
else:
logger.warning(f"Unhandled event type: {event} or manager/loop not initialized.")
# Always respond 200 OK for successful receipt, even if action is ignored
self.send_response(200)
self.send_header("Content-type", "text/plain")
self.end_headers()
self.wfile.write(b"OK")
# ========== AI-Powered Issue Manager ==========
class IssueManager:
def __init__(self):
self.issues: Dict[int, dict] = {}
self.repo_url: Optional[str] = None
self.repo_owner: Optional[str] = None
self.repo_name: Optional[str] = None
self.repo_local_path: Optional[Path] = None
self.repo: Optional[Repo] = None
self.github_token: Optional[str] = None
self.hf_token: Optional[str] = None
self.collaborators: Dict[str, dict] = {} # {client_id: {name: str, status: str}}
self.points: int = 0 # Placeholder for potential gamification
self.severity_rules: Dict[str, List[str]] = {
"Critical": ["critical", "urgent", "security", "crash", "blocker", "p0", "s0"],
"High": ["high", "important", "error", "regression", "major", "p1", "s1"],
"Medium": ["medium", "bug", "performance", "minor", "p2", "s2"],
"Low": ["low", "documentation", "enhancement", "trivial", "feature", "p3", "s3", "chore", "refactor", "question", "help wanted"]
}
self.issue_clusters: Dict[int, List[int]] = {} # {cluster_id: [issue_index_in_list, ...]}
self.issue_list_for_clustering: List[dict] = [] # List of issue dicts used for the last clustering run
self.ws_clients: List[WebSocketServerProtocol] = []
self.code_editors: Dict[int, OTCodeEditor] = {} # {issue_id: OTCodeEditor_instance}
# Get or create the loop in the thread where the manager is initialized (main thread)
try:
self.main_loop = asyncio.get_running_loop()
logger.debug(f"IssueManager found running asyncio loop: {id(self.main_loop)}")
except RuntimeError:
self.main_loop = asyncio.new_event_loop()
asyncio.set_event_loop(self.main_loop)
logger.debug(f"IssueManager created and set new asyncio loop: {id(self.main_loop)}")
self.broadcast_task: Optional[asyncio.Task] = None
self.idle_task: Optional[asyncio.Task] = None
# --- State for Idle Processing Results ---
self.precomputed_context: Dict[int, Dict[str, Any]] = {} # {issue_id: {content: str, files: list, error: str, timestamp: float}}
self.precomputed_summaries: Dict[int, Dict[str, Any]] = {} # {issue_id: {summary: str, error: str, timestamp: float}}
self.precomputed_missing_info: Dict[int, Dict[str, Any]] = {} # {issue_id: {info_needed: str, error: str, timestamp: float}}
self.precomputed_analysis: Dict[int, Dict[str, Any]] = {} # {issue_id: {hypothesis: str, error: str, timestamp: float}}
# self.code_embeddings: Dict[str, List[float]] = {} # Not currently used after clustering
self.potential_duplicates: Dict[int, List[int]] = {} # {issue_id: [duplicate_issue_id, ...]}
self.stale_issues: List[int] = [] # [issue_id, ...]
self.high_priority_candidates: List[int] = [] # [issue_id, ...]
self.last_webhook_time: float = time.time() # Track last webhook for potential future use
self.needs_recluster: bool = False
self._webhook_change_count = 0
# --- Configuration for Idle Tasks ---
self.idle_processing_interval = IDLE_PROCESSING_INTERVAL_SECONDS
self.max_context_computations_per_cycle = MAX_CONTEXT_COMPUTATIONS_PER_CYCLE
self.max_summary_computations_per_cycle = MAX_SUMMARY_COMPUTATIONS_PER_CYCLE
self.max_missing_info_computations_per_cycle = MAX_MISSING_INFO_COMPUTATIONS_PER_CYCLE
self.max_analysis_computations_per_cycle = MAX_ANALYSIS_COMPUTATIONS_PER_CYCLE
self.stale_issue_threshold_days = STALE_ISSUE_THRESHOLD_DAYS
self.recluster_threshold = RECLUSTER_THRESHOLD
# Shutdown signals (placeholders, set by main execution block)
self.stop_ws_server = None
self.stop_webhook_server = None
def start_broadcast_loop(self):
"""Starts the periodic broadcast task."""
# Ensure task is created in the correct loop
if not self.main_loop.is_running():
logger.error("Cannot start broadcast loop: Main event loop is not running.")
return
if not self.broadcast_task or self.broadcast_task.done():
self.broadcast_task = self.main_loop.create_task(self.broadcast_collaboration_status())
logger.info("Started collaboration status broadcast loop.")
else:
logger.debug("Broadcast loop already running.")
def stop_broadcast_loop(self):
"""Stops the periodic broadcast task."""
if self.broadcast_task and not self.broadcast_task.done():
logger.info("Stopping collaboration status broadcast loop...")
self.broadcast_task.cancel()
# Await the task to finish cancellation in an async context if needed,
# but cancelling is sufficient to signal it to stop.
self.broadcast_task = None # Clear the reference
def _get_issue_hash(self, issue_data: Optional[dict]) -> str:
"""Generates a hash based on key issue content for caching AI suggestions."""
if not issue_data:
return "empty_issue_hash" # Handle cases where issue_data is None
content = f"{issue_data.get('title', '')}{issue_data.get('body', '')}{','.join(issue_data.get('labels',[]))}"
return hashlib.md5(content.encode('utf-8', errors='ignore')).hexdigest() # Use utf-8 and ignore errors
@lru_cache(maxsize=100)
def cached_suggestion(self, issue_hash: str, model_key: str) -> "asyncio.Task":
"""Returns an awaitable AI suggestion, cached by issue content hash and model key.
Note: functools.lru_cache cannot wrap a coroutine function directly, because a cache hit
would hand back an already-awaited coroutine. Caching an asyncio.Task instead is safe
(a finished Task can be awaited repeatedly), so callers can keep using
`await self.cached_suggestion(...)` and `self.cached_suggestion.cache_clear()`.
"""
logger.debug(f"Cache lookup for suggestion: hash={issue_hash}, model={model_key}")
return asyncio.ensure_future(self._generate_suggestion_uncached(issue_hash, model_key))
async def _generate_suggestion_uncached(self, issue_hash: str, model_key: str) -> str:
"""Generates an AI suggestion for the issue matching the given content hash."""
# Find the issue data corresponding to the hash.
# This linear scan is acceptable because the issues dict is usually small; a dict mapping
# hashes to issue IDs could replace it if this ever becomes a bottleneck.
found_issue = None
for issue in self.issues.values():
if self._get_issue_hash(issue) == issue_hash:
found_issue = issue
break
if not found_issue:
logger.error(f"Could not find issue data for hash {issue_hash} in current state. Suggestion cannot be generated.")
return "Error: Issue data for this suggestion request (hash) not found in current state. The issue might have been updated or closed. Please re-select the issue."
if model_key not in HF_MODELS or HF_MODELS.get(model_key) is None:
logger.error(f"Invalid or unconfigured model key requested: {model_key}")
return f"Error: Invalid or unconfigured model key: {model_key}"
logger.info(f"Cache miss for issue hash {issue_hash}. Requesting suggestion from {model_key}.")
# Call the actual suggestion generation function
return await self.suggest_resolution(found_issue, model_key)
async def handle_webhook_event(self, event: str, action: str, payload: dict):
"""Processes incoming webhook events to update the issue state."""
logger.info(f"Processing webhook event: {event}, action: {action}")
issue_data = payload.get('issue')
repo_data = payload.get('repository')
if not issue_data or not repo_data:
logger.warning("Webhook payload missing 'issue' or 'repository' data.")
return
event_repo_url = repo_data.get('html_url')
# Only process events for the currently loaded repository
# Use .rstrip("/") on both sides for robust comparison
if self.repo_url is None or event_repo_url is None or event_repo_url.rstrip("/") != self.repo_url.rstrip("/"):
logger.info(f"Ignoring webhook event for different repository: {event_repo_url} (Current: {self.repo_url})")
return
issue_number = issue_data.get('number')
if issue_number is None: # Check explicitly for None
logger.warning("Webhook issue data missing 'number'.")
return
needs_ui_update = False
significant_change = False # Flag for changes affecting clustering/content/AI caches
if action == 'closed':
logger.info(f"Webhook: Removing closed issue {issue_number} from active list.")
if issue_number in self.issues:
self.issues.pop(issue_number)
needs_ui_update = True
significant_change = True # Closing is a significant change
# Clean up associated cached/computed data
self.precomputed_context.pop(issue_number, None)
self.precomputed_summaries.pop(issue_number, None)
self.precomputed_missing_info.pop(issue_number, None)
self.precomputed_analysis.pop(issue_number, None)
self.potential_duplicates.pop(issue_number, None)
# Remove from lists if present (use list comprehension for safe removal)
self.stale_issues = [i for i in self.stale_issues if i != issue_number]
self.high_priority_candidates = [i for i in self.high_priority_candidates if i != issue_number]
# Remove the code editor instance for the closed issue
self.code_editors.pop(issue_number, None)
logger.debug(f"Cleaned up state for closed issue {issue_number}.")
else:
logger.debug(f"Webhook: Issue {issue_number} closed, but not found in current active list. No state change needed.")
elif action in ['opened', 'reopened', 'edited', 'assigned', 'unassigned', 'labeled', 'unlabeled', 'milestoned', 'demilestoned']:
logger.info(f"Webhook: Adding/Updating issue {issue_number} (action: {action}).")
processed_data = self._process_issue_data(issue_data)
old_issue = self.issues.get(issue_number)
# Check for changes that impact AI suggestions or clustering
if not old_issue or \
old_issue.get('body') != processed_data.get('body') or \
old_issue.get('title') != processed_data.get('title') or \
set(old_issue.get('labels', [])) != set(processed_data.get('labels', [])):
significant_change = True
logger.info(f"Significant change detected for issue {issue_number} (content/labels).")
# Invalidate ALL precomputed AI state on significant edit
self.precomputed_context.pop(issue_number, None)
self.precomputed_summaries.pop(issue_number, None)
self.precomputed_missing_info.pop(issue_number, None)
self.precomputed_analysis.pop(issue_number, None)
# Clear the entire suggestion cache on significant change
self.cached_suggestion.cache_clear()
logger.debug("Cleared suggestion cache due to significant issue change.")
# Check if state-related fields changed (affecting idle processing lists)
# This check is for logging/debugging, the idle loop re-evaluates lists anyway
if not old_issue or \
old_issue.get('updated_at') != processed_data.get('updated_at') or \
old_issue.get('assignee') != processed_data.get('assignee') or \
set(old_issue.get('labels', [])) != set(processed_data.get('labels', [])) or \
old_issue.get('state') != processed_data.get('state'): # State change (open/reopened)
logger.debug(f"State-related change detected for issue {issue_number} (update time, assignee, labels, state). Idle loop will re-evaluate.")
self.issues[issue_number] = processed_data
needs_ui_update = True
else:
logger.info(f"Ignoring webhook action '{action}' for issue {issue_number} (already filtered).")
# --- Track changes for idle processing ---
if needs_ui_update:
self.last_webhook_time = time.time()
if significant_change:
self._increment_change_counter()
# Rebuild the list used for clustering immediately if a significant change occurred
# This list is a snapshot used by the async clustering task
self.issue_list_for_clustering = list(self.issues.values())
logger.info("Issue list for clustering updated due to significant webhook change.")
# Broadcast UI update notification
# Schedule this in the main loop using call_soon_threadsafe
if self.main_loop.is_running():
self.main_loop.call_soon_threadsafe(asyncio.create_task, self.broadcast_issue_update())
logger.debug("Scheduled issue update broadcast.")
else:
logger.warning("Main loop not running, cannot broadcast issue update.")
def _increment_change_counter(self):
"""Increments change counter and sets recluster flag if threshold reached."""
self._webhook_change_count += 1
logger.debug(f"Significant change detected. Change count: {self._webhook_change_count}/{self.recluster_threshold}")
if self._webhook_change_count >= self.recluster_threshold:
self.needs_recluster = True
logger.info(f"Change threshold ({self.recluster_threshold}) reached. Flagging for re-clustering.")
def _process_issue_data(self, issue_data: dict) -> dict:
"""Helper to structure issue data consistently."""
return {
"id": issue_data.get('number'), # Use .get for safety
"title": issue_data.get('title', 'No Title Provided'),
"body": issue_data.get('body', ''),
"state": issue_data.get('state', 'unknown'),
"labels": sorted([label.get('name', '') for label in issue_data.get('labels', []) if isinstance(label, dict) and label.get('name')]), # Ensure labels are dicts and have name
"assignee": issue_data.get('assignee', {}).get('login') if issue_data.get('assignee') and isinstance(issue_data.get('assignee'), dict) else None,
"url": issue_data.get('html_url', '#'),
"created_at": issue_data.get('created_at'),
"updated_at": issue_data.get('updated_at'),
}
async def crawl_issues(self, repo_url: str, github_token: Optional[str], hf_token: Optional[str]) -> Tuple[List[List], go.Figure, str, go.Figure]:
"""
Crawls issues, resets state, clones repo, clusters, starts background tasks.
Returns dataframe data, stats plot, status message, and analytics plot.
"""
# Strip whitespace from inputs
repo_url = repo_url.strip() if repo_url else None
github_token = github_token.strip() if github_token else None
hf_token = hf_token.strip() if hf_token else None
# Define a default empty plot for consistent return type
def get_empty_plot(title="Plot"):
fig = go.Figure()
fig.update_layout(title=title, xaxis={"visible": False}, yaxis={"visible": False},
annotations=[{"text": "Scan needed.", "xref": "paper", "yref": "paper", "showarrow": False, "font": {"size": 16}}],
plot_bgcolor='rgba(0,0,0,0)', paper_bgcolor='rgba(0,0,0,0)')
return fig
if not repo_url or not hf_token:
logger.error("Repository URL and Hugging Face Token are required.")
empty_fig = get_empty_plot("Issue Severity Distribution")
return [], empty_fig, "Error: Repository URL and Hugging Face Token are required.", empty_fig
logger.info(f"Starting new issue crawl and setup for {repo_url}")
# --- Reset Manager State ---
# Stop background tasks first
self.stop_idle_processing()
self.stop_broadcast_loop()
self.issues = {}
# Clear code_editors instances
self.code_editors = {}
self.issue_clusters = {}
self.issue_list_for_clustering = []
self.cached_suggestion.cache_clear() # Clear AI suggestion cache
self.precomputed_context = {}
self.precomputed_summaries = {}
self.precomputed_missing_info = {}
self.precomputed_analysis = {}
# self.code_embeddings = {} # Not used
self.potential_duplicates = {}
self.stale_issues = []
self.high_priority_candidates = []
self.needs_recluster = False
self._webhook_change_count = 0
self.last_webhook_time = time.time()
self.repo = None # Clear the repo object
self.repo_url = repo_url
self.github_token = github_token
self.hf_token = hf_token
logger.info("Internal state reset for new crawl.")
# --- Repository Cloning/Updating ---
match = re.match(r"https?://github\.com/([^/]+)/([^/]+)", self.repo_url)
if not match:
logger.error(f"Invalid GitHub URL format: {self.repo_url}")
empty_fig = get_empty_plot("Issue Severity Distribution")
return [], empty_fig, "Error: Invalid GitHub URL format. Use https://github.com/owner/repo", empty_fig
self.repo_owner, self.repo_name = match.groups()
# Strip a trailing ".git" so the GitHub API URL and local workspace path use the bare repository name
if self.repo_name.endswith(".git"):
self.repo_name = self.repo_name[:-4]
self.repo_local_path = WORKSPACE / f"{self.repo_owner}_{self.repo_name}"
try:
if self.repo_local_path.exists():
logger.info(f"Attempting to update existing repository clone at {self.repo_local_path}")
try:
self.repo = Repo(self.repo_local_path)
# Ensure the origin remote matches the requested URL
if not self.repo.remotes or 'origin' not in self.repo.remotes:
logger.warning(f"Existing repo at {self.repo_local_path} has no 'origin' remote. Re-cloning.")
if self.repo_local_path.exists(): shutil.rmtree(self.repo_local_path)
self.repo = Repo.clone_from(self.repo_url, self.repo_local_path, progress=lambda op, cur, tot, msg: logger.debug(f"Clone progress: {msg}"))
else:
origin = self.repo.remotes.origin
remote_url = next((u for u in origin.urls), None) # Get first URL
expected_urls = {self.repo_url, self.repo_url + ".git"}
if remote_url not in expected_urls:
logger.warning(f"Existing repo path {self.repo_local_path} has different remote URL ('{remote_url}' vs '{self.repo_url}'). Removing and re-cloning.")
# Remove the directory entirely before re-cloning
if self.repo_local_path.exists(): shutil.rmtree(self.repo_local_path)
self.repo = Repo.clone_from(self.repo_url, self.repo_local_path, progress=lambda op, cur, tot, msg: logger.debug(f"Clone progress: {msg}"))
else:
logger.info("Pulling latest changes...")
# Guard pull/fetch operations with GitPython's kill_after_timeout (a plain `timeout` kwarg would be forwarded to git as an unknown --timeout option)
try:
# Fetch first to get latest refs
origin.fetch(progress=lambda op, cur, tot, msg: logger.debug(f"Fetch progress: {msg}"), kill_after_timeout=120)
# Then pull
origin.pull(progress=lambda op, cur, tot, msg: logger.debug(f"Pull progress: {msg}"), kill_after_timeout=120)
# Unshallow if necessary
if self.repo.git.rev_parse('--is-shallow-repository').strip() == 'true':
logger.info("Repository is shallow, unshallowing...")
# Guard the unshallow fetch with kill_after_timeout as well
self.repo.git.fetch('--unshallow', kill_after_timeout=300)
except GitCommandError as pull_err:
logger.error(f"Git pull/fetch error: {pull_err}. Proceeding with potentially stale local copy.")
except Exception as pull_err:
logger.exception(f"Unexpected error during git pull/fetch: {pull_err}. Proceeding with potentially stale local copy.")
except (InvalidGitRepositoryError, NoSuchPathError):
logger.warning(f"Invalid or missing Git repository at {self.repo_local_path}. Re-cloning.")
# Ensure directory is clean before re-cloning
if self.repo_local_path.exists(): shutil.rmtree(self.repo_local_path)
self.repo = Repo.clone_from(self.repo_url, self.repo_local_path, progress=lambda op, cur, tot, msg: logger.debug(f"Clone progress: {msg}"))
except GitCommandError as git_err:
logger.error(f"Git operation error during update: {git_err}. Trying to proceed with existing copy, but it might be stale.")
if not self.repo: # If repo object wasn't successfully created before the error
try: self.repo = Repo(self.repo_local_path)
except Exception: logger.error("Failed to even load existing repo after git error.")
except Exception as e:
logger.exception(f"An unexpected error occurred during repository update check: {e}")
# If repo object wasn't successfully created before the error
if not self.repo:
try: self.repo = Repo(self.repo_local_path)
except Exception: logger.error("Failed to even load existing repo after update error.")
else:
logger.info(f"Cloning repository {self.repo_url} to {self.repo_local_path}")
# Note: Repo.clone_from forwards unknown kwargs to `git clone`, which has no --timeout option, so no timeout kwarg is passed for the initial clone
self.repo = Repo.clone_from(self.repo_url, self.repo_local_path, progress=lambda op, cur, tot, msg: logger.debug(f"Clone progress: {msg}"))
logger.info("Repository clone/update process finished.")
if not self.repo:
raise Exception("Repository object could not be initialized after cloning/update.")
except GitCommandError as e:
logger.error(f"Failed to clone/update repository: {e}")
empty_fig = get_empty_plot("Issue Severity Distribution")
empty_fig.update_layout(annotations=[{"text": "Repo Error.", "xref": "paper", "yref": "paper", "showarrow": False, "font": {"size": 16}}])
return [], empty_fig, f"Error cloning/updating repository: {e}. Check URL, permissions, and network.", empty_fig
except Exception as e:
logger.exception(f"An unexpected error occurred during repository handling: {e}")
empty_fig = get_empty_plot("Issue Severity Distribution")
empty_fig.update_layout(annotations=[{"text": "Repo Error.", "xref": "paper", "yref": "paper", "showarrow": False, "font": {"size": 16}}])
return [], empty_fig, f"An unexpected error occurred during repo setup: {e}", empty_fig
# --- Issue Fetching ---
api_url = f"{GITHUB_API}/{self.repo_owner}/{self.repo_name}/issues?state=open&per_page=100"
headers = {"Accept": "application/vnd.github.v3+json"}
if github_token:
headers["Authorization"] = f"token {github_token}"
try:
all_issues_data = []
page = 1
logger.info(f"Fetching open issues from GitHub API (repo: {self.repo_owner}/{self.repo_name})...")
async with aiohttp.ClientSession(headers=headers) as session:
while True:
paginated_url = api_url
logger.debug(f"Fetching URL: {paginated_url}")
# Use a timeout for API requests
try:
async with session.get(paginated_url, timeout=30) as response:
rate_limit_remaining = response.headers.get('X-RateLimit-Remaining')
rate_limit_reset = response.headers.get('X-RateLimit-Reset')
logger.debug(f"GitHub API Response Status: {response.status}, RateLimit Remaining: {rate_limit_remaining}, Reset: {rate_limit_reset}")
if response.status == 403 and rate_limit_remaining == '0':
reset_time = int(rate_limit_reset) if rate_limit_reset else time.time() + 60
wait_time = max(reset_time - time.time() + 5, 0) # Wait until reset time + a buffer
logger.warning(f"GitHub API rate limit exceeded. Waiting until {datetime.fromtimestamp(reset_time).strftime('%H:%M:%S')} ({wait_time:.0f}s)")
await asyncio.sleep(wait_time)
continue # Retry the same page
response.raise_for_status() # Raise for other 4xx/5xx errors
issues_page_data = await response.json()
if not issues_page_data: break # No more issues on this page
logger.info(f"Fetched page {page} with {len(issues_page_data)} items.")
all_issues_data.extend(issues_page_data)
link_header = response.headers.get('Link')
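# Example Link header returned by GitHub (illustrative):
#   <https://api.github.com/repositories/123/issues?page=2>; rel="next", <https://api.github.com/repositories/123/issues?page=5>; rel="last"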
if link_header and 'rel="next"' in link_header:
# Simple parsing for the next link, more robust parsing might be needed for complex headers
next_url_match = re.search(r'<([^>]+)>;\s*rel="next"', link_header)
if next_url_match:
# The next URL is provided directly, use it
api_url = next_url_match.group(1)
page += 1 # Increment page counter for logging, though not strictly needed for the loop logic now
logger.debug(f"Found next page link: {api_url}")
else:
logger.warning("Link header contains 'rel=\"next\"' but could not parse the URL. Stopping pagination.")
break
else:
logger.debug("No 'next' link found in Link header. Assuming last page.")
break
await asyncio.sleep(0.1) # Small delay between requests
except asyncio.TimeoutError:
logger.warning(f"GitHub API request timed out for page {page}. Stopping pagination early.")
break # Stop pagination on timeout
except aiohttp.ClientResponseError as e:
# Re-raise client response errors so the outer handler catches them
raise e
except Exception as e:
logger.exception(f"An unexpected error occurred during GitHub API pagination for page {page}. Stopping pagination.")
break # Stop pagination on unexpected error
logger.info(f"Total items fetched (including potential PRs): {len(all_issues_data)}")
# Filter out pull requests (issues with 'pull_request' key)
self.issues = {
issue_data['number']: self._process_issue_data(issue_data)
for issue_data in all_issues_data
if 'pull_request' not in issue_data and issue_data.get('number') is not None # Ensure number exists
}
logger.info(f"Filtered out pull requests, {len(self.issues)} actual open issues remaining.")
empty_fig = get_empty_plot("Issue Severity Distribution")
if not self.issues:
logger.warning("No open issues found for this repository.")
empty_fig.update_layout(annotations=[{"text": "No issues found.", "xref": "paper", "yref": "paper", "showarrow": False, "font": {"size": 16}}])
return [], empty_fig, "No open issues found in the repository.", empty_fig
# --- Clustering and UI Data Prep ---
self.issue_list_for_clustering = list(self.issues.values())
logger.info("Clustering issues...")
await self._cluster_similar_issues()
# --- Initial Idle Task Prep (Run synchronously after load) ---
logger.info("Identifying potential duplicates based on initial clusters...")
self._identify_potential_duplicates()
logger.info("Identifying potentially stale issues...")
self._identify_stale_issues()
logger.info("Identifying high priority candidates...")
self._identify_high_priority_candidates()
# --- Prepare Dataframe Output & Stats ---
dataframe_data = []
severity_counts = {"Critical": 0, "High": 0, "Medium": 0, "Low": 0, "Unknown": 0}
index_to_cluster_id: Dict[int, int] = {}
# Map issue index in the clustering list back to its cluster ID
for cluster_id, indices in self.issue_clusters.items():
for index in indices:
if 0 <= index < len(self.issue_list_for_clustering):
index_to_cluster_id[index] = cluster_id
else:
logger.warning(f"Clustering returned invalid index {index} for list of length {len(self.issue_list_for_clustering)}")
for i, issue in enumerate(self.issue_list_for_clustering):
severity = self._determine_severity(issue.get('labels', [])) # Use get for safety
severity_counts[severity] += 1
# Get cluster ID using the index from the clustering list
cluster_id = index_to_cluster_id.get(i, -1)
dataframe_data.append([
issue.get('id', 'N/A'), # Use get for safety
issue.get('title', 'No Title'), # Use get for safety
severity,
cluster_id if cluster_id != -1 else "N/A" # Display "N/A" for noise (-1)
])
logger.info("Generating statistics plot...")
stats_fig = self._generate_stats_plot(severity_counts)
# --- Start Background Tasks ---
# Ensure tasks are created in the manager's loop
self.start_broadcast_loop()
self.start_idle_processing()
success_msg = f"Found {len(self.issues)} open issues. Clustered into {len(self.issue_clusters)} groups. Repo ready. Background analysis started."
logger.info(success_msg)
# Return both plots (stats and analytics severity are the same initially)
return dataframe_data, stats_fig, success_msg, stats_fig
except aiohttp.ClientResponseError as e:
logger.error(f"GitHub API request failed: Status={e.status}, Message='{e.message}', URL='{e.request_info.url}'")
error_msg = f"Error fetching issues: {e.status} - {e.message}. Check token/URL."
if e.status == 404: error_msg = f"Error: Repository not found at {self.repo_url}."
elif e.status == 401: error_msg = "Error: Invalid GitHub token or insufficient permissions for this repository."
elif e.status == 403:
rate_limit_remaining = e.headers.get('X-RateLimit-Remaining')
rate_limit_reset = e.headers.get('X-RateLimit-Reset')
reset_time_str = "unknown"
if rate_limit_reset:
try: reset_time_str = datetime.fromtimestamp(int(rate_limit_reset), timezone.utc).strftime('%Y-%m-%d %H:%M:%S %Z')
except ValueError: pass
error_msg = f"Error: GitHub API rate limit likely exceeded or access forbidden (Remaining: {rate_limit_remaining}). Reset time: {reset_time_str}. Check token or wait."
self.stop_idle_processing()
self.stop_broadcast_loop()
empty_fig = get_empty_plot("Issue Severity Distribution")
empty_fig.update_layout(annotations=[{"text": "API Error.", "xref": "paper", "yref": "paper", "showarrow": False, "font": {"size": 16}}])
return [], empty_fig, error_msg, empty_fig
except asyncio.TimeoutError:
logger.error("GitHub API request timed out.")
self.stop_idle_processing()
self.stop_broadcast_loop()
empty_fig = get_empty_plot("Issue Severity Distribution")
empty_fig.update_layout(annotations=[{"text": "API Timeout.", "xref": "paper", "yref": "paper", "showarrow": False, "font": {"size": 16}}])
return [], empty_fig, "Error: GitHub API request timed out.", empty_fig
except Exception as e:
self.stop_idle_processing()
self.stop_broadcast_loop()
logger.exception(f"An unexpected error occurred during issue crawl: {e}")
empty_fig = get_empty_plot("Issue Severity Distribution")
empty_fig.update_layout(annotations=[{"text": "Unexpected Error.", "xref": "paper", "yref": "paper", "showarrow": False, "font": {"size": 16}}])
return [], empty_fig, f"An unexpected error occurred: {e}", empty_fig
def _determine_severity(self, labels: List[str]) -> str:
"""Determines issue severity based on labels using predefined rules."""
labels_lower = {label.lower().strip() for label in labels}
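# Matching is substring-based: a label like "p1-regression" matches the "p1" keyword, but short
# keywords can also match unrelated labels (e.g. "low" is contained in "workflow").
# Example: _determine_severity(["bug", "p1"]) -> "High", since the High rules are checked before Medium's "bug".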
for severity, keywords in self.severity_rules.items():
if any(keyword in label for label in labels_lower for keyword in keywords):
return severity
return "Unknown"
def _generate_stats_plot(self, severity_counts: Dict[str, int]) -> go.Figure:
"""Generates a Plotly bar chart for issue severity distribution."""
filtered_counts = {k: v for k, v in severity_counts.items() if v > 0}
if not filtered_counts:
fig = go.Figure()
fig.update_layout(title="Issue Severity Distribution", xaxis={"visible": False}, yaxis={"visible": False},
annotations=[{"text": "No issues to display.", "xref": "paper", "yref": "paper", "showarrow": False, "font": {"size": 16}}],
plot_bgcolor='rgba(0,0,0,0)', paper_bgcolor='rgba(0,0,0,0)')
return fig # Return the empty figure here
severities = list(filtered_counts.keys())
counts = list(filtered_counts.values())
order = ['Critical', 'High', 'Medium', 'Low', 'Unknown']
# Sort severities based on the predefined order
severities_sorted = sorted(severities, key=lambda x: order.index(x) if x in order else len(order))
counts_sorted = [filtered_counts[s] for s in severities_sorted]
fig = px.bar(x=severities_sorted, y=counts_sorted, title="Issue Severity Distribution",
labels={'x': 'Severity', 'y': 'Number of Issues'}, color=severities_sorted,
color_discrete_map={'Critical': '#DC2626', 'High': '#F97316', 'Medium': '#FACC15', 'Low': '#84CC16', 'Unknown': '#6B7280'},
text=counts_sorted)
fig.update_layout(xaxis_title=None, yaxis_title="Number of Issues", plot_bgcolor='rgba(0,0,0,0)',
paper_bgcolor='rgba(0,0,0,0)', showlegend=False,
xaxis={'categoryorder':'array', 'categoryarray': order},
yaxis={'rangemode': 'tozero'}) # Ensure y-axis starts at 0
fig.update_traces(textposition='outside')
return fig
async def _cluster_similar_issues(self):
"""Generates embeddings and clusters issues using HDBSCAN. Uses self.issue_list_for_clustering."""
if not self.issue_list_for_clustering:
logger.warning("Cannot cluster issues: No issues loaded or list is empty.")
self.issue_clusters = {}
self._webhook_change_count = 0 # Reset on empty list
self.needs_recluster = False
return
if not self.hf_token or HF_EMBEDDING_MODEL is None:
logger.error("Cannot cluster issues: Hugging Face token or embedding model missing.")
self.issue_clusters = {}
self._webhook_change_count = 0 # Reset on missing token/model
self.needs_recluster = False
return
num_issues = len(self.issue_list_for_clustering)
logger.info(f"Generating embeddings for {num_issues} issues for clustering using {HF_EMBEDDING_MODEL}...")
try:
# Use title + a snippet of the body for embedding
texts_to_embed = [
f"Title: {i.get('title','')} Body: {i.get('body','')[:500]}" # Limit body length
for i in self.issue_list_for_clustering
]
embeddings = await self._generate_embeddings(texts_to_embed)
if embeddings is None or not isinstance(embeddings, list) or len(embeddings) != num_issues:
logger.error(f"Failed to generate valid embeddings for clustering. Expected {num_issues}, got {type(embeddings)} len {len(embeddings) if embeddings else 'N/A'}.")
self.issue_clusters = {}
self._webhook_change_count = 0 # Reset on embedding failure
self.needs_recluster = False
return
logger.info(f"Generated {len(embeddings)} embeddings. Running HDBSCAN clustering...")
# Adjust min_cluster_size dynamically based on issue count?
min_cluster_size = max(2, min(5, num_issues // 10)) # Example: min 2, max 5, or 10% of issues
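# e.g. 15 issues -> floor of 2; 30 issues -> 3; 200 issues -> capped at 5.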
clusterer = HDBSCAN(min_cluster_size=min_cluster_size, metric='cosine', allow_single_cluster=True, gen_min_span_tree=True)
clusters = clusterer.fit_predict(embeddings)
new_issue_clusters: Dict[int, List[int]] = {}
noise_count = 0
for i, cluster_id in enumerate(clusters):
cluster_id_int = int(cluster_id)
if cluster_id_int == -1:
noise_count += 1
continue
if cluster_id_int not in new_issue_clusters:
new_issue_clusters[cluster_id_int] = []
new_issue_clusters[cluster_id_int].append(i)
self.issue_clusters = new_issue_clusters
logger.info(f"Clustering complete. Found {len(self.issue_clusters)} clusters (min size {min_cluster_size}) with {noise_count} noise points.")
# Reset the change counter and flag after successful clustering
self._webhook_change_count = 0
self.needs_recluster = False
logger.debug("Reset webhook change counter and recluster flag after clustering.")
except Exception as e:
logger.exception(f"Error during issue clustering: {e}")
self.issue_clusters = {}
self._webhook_change_count = 0 # Reset on clustering failure
self.needs_recluster = False
def _identify_potential_duplicates(self):
"""Populates self.potential_duplicates based on self.issue_clusters and self.issue_list_for_clustering."""
self.potential_duplicates = {}
if not self.issue_clusters or not self.issue_list_for_clustering:
logger.debug("Skipping duplicate identification: No clusters or issue list.")
return
index_to_id = {}
try:
for i, issue in enumerate(self.issue_list_for_clustering):
issue_id = issue.get('id')
if issue_id is None:
logger.warning(f"Issue at index {i} in clustering list is missing an ID.")
continue
index_to_id[i] = issue_id
except Exception as e:
logger.error(f"Error creating index-to-ID map for duplicate check: {e}. Issue list might be inconsistent.")
return
for cluster_id, indices in self.issue_clusters.items():
if len(indices) > 1:
# Get issue IDs for indices in this cluster, skipping any invalid indices
cluster_issue_ids = [index_to_id[i] for i in indices if i in index_to_id]
if len(cluster_issue_ids) > 1: # Ensure there's more than one valid issue ID in the cluster
for issue_id in cluster_issue_ids:
# For each issue in the cluster, list all *other* issues in the same cluster as potential duplicates
self.potential_duplicates[issue_id] = [other_id for other_id in cluster_issue_ids if other_id != issue_id]
logger.info(f"Identified potential duplicates for {len(self.potential_duplicates)} issues based on clustering.")
async def _generate_embeddings(self, texts: List[str]):
"""Generates sentence embeddings using Hugging Face Inference API."""
if not self.hf_token:
logger.error("Hugging Face token is not set. Cannot generate embeddings.")
return None
if not texts:
logger.warning("Embedding generation requested with empty text list.")
return []
if HF_EMBEDDING_MODEL is None:
logger.error("HF Embedding model is not configured.")
return None
model_id = HF_EMBEDDING_MODEL # Use the fixed embedding model
api_url = f"{HF_INFERENCE_API}/{model_id}"
headers = {"Authorization": f"Bearer {self.hf_token}"}
timeout = aiohttp.ClientTimeout(total=180) # Increased timeout for embedding large batches
logger.info(f"Requesting embeddings from {api_url} for {len(texts)} texts.")
# HF Inference API has a limit on the number of inputs per request (often 512 or 1024)
# Batching is recommended for large lists of texts.
batch_size = 500 # Example batch size, adjust based on model limits if known
all_embeddings = []
for i in range(0, len(texts), batch_size):
batch_texts = texts[i:i + batch_size]
payload = {"inputs": batch_texts, "options": {"wait_for_model": True}}
logger.debug(f"Processing embedding batch {i//batch_size + 1}/{(len(texts)-1)//batch_size + 1} ({len(batch_texts)} texts)")
# Implement retry logic for batches
retries = 3
for attempt in range(retries):
try:
async with aiohttp.ClientSession(headers=headers, timeout=timeout) as session:
async with session.post(api_url, json=payload) as response:
rate_limit_remaining = response.headers.get('X-Ratelimit-Remaining')
logger.debug(f"HF Embedding API Response Status: {response.status}, RateLimit Remaining: {rate_limit_remaining}")
if response.status == 429: # Too Many Requests
retry_after = int(response.headers.get('Retry-After', 10))
logger.warning(f"HF Embedding API rate limited. Waiting for {retry_after} seconds before retry {attempt + 1}/{retries}.")
await asyncio.sleep(retry_after)
continue # Retry the same batch
response.raise_for_status() # Raise for other 4xx/5xx errors
result = await response.json()
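# Expected response for a sentence-transformers model on the feature-extraction pipeline:
# a list with one embedding (list of floats) per input text, e.g. [[0.012, -0.231, ...], [0.054, 0.118, ...]],
# which is exactly what the isinstance checks below validate.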
if isinstance(result, list) and all(isinstance(emb, list) and all(isinstance(f, float) for f in emb) for emb in result):
if len(result) == len(batch_texts):
all_embeddings.extend(result)
logger.debug(f"Successfully received {len(result)} embeddings for batch.")
break # Batch successful, move to next batch
else:
logger.error(f"HF Embedding API returned wrong number of embeddings for batch: Got {len(result)}, expected {len(batch_texts)}.")
return None # Indicate failure
elif isinstance(result, dict) and 'error' in result:
error_msg = result['error']
estimated_time = result.get('estimated_time')
logger.error(f"HF Inference API embedding error on batch: {error_msg}" + (f" (Estimated time: {estimated_time}s)" if estimated_time else ""))
return None # Indicate failure
else:
logger.error(f"Unexpected embedding format received on batch: Type={type(result)}. Response: {str(result)[:500]}")
return None # Indicate failure
except asyncio.TimeoutError:
logger.warning(f"HF Inference API embedding request timed out after {timeout.total} seconds for batch. Retry {attempt + 1}/{retries}.")
if attempt < retries - 1:
await asyncio.sleep(5) # Wait a bit before retrying timeout
continue
else:
logger.error("Max retries reached for embedding batch timeout.")
return None # Indicate failure
except aiohttp.ClientResponseError as e:
error_body = str(e.message) if e.message else ""  # aiohttp.ClientResponseError has no .text(); only the status-line message is available here
logger.error(f"HF Inference API embedding request failed on batch: Status={e.status}, Message='{e.message}'. Body: {error_body[:500]}. Retry {attempt + 1}/{retries}.")
if attempt < retries - 1 and e.status in [500, 502, 503, 504]: # Retry on server errors
await asyncio.sleep(5)
continue
else:
logger.error("Max retries reached or non-retryable error for embedding batch.")
return None # Indicate failure
except Exception as e:
logger.exception(f"Unexpected error during embedding generation on batch: {e}. Retry {attempt + 1}/{retries}.")
if attempt < retries - 1:
await asyncio.sleep(5)
continue
else:
logger.error("Max retries reached for unexpected error during embedding batch.")
return None # Indicate failure
else: # This else block executes if the inner loop completes without a 'break' (i.e., all retries failed)
logger.error(f"Failed to process embedding batch after {retries} retries.")
return None # Indicate failure
await asyncio.sleep(0.1) # Small delay between batches
if len(all_embeddings) == len(texts):
logger.info(f"Successfully generated embeddings for all {len(all_embeddings)} texts.")
return all_embeddings
else:
logger.error(f"Embedding generation failed partway through. Expected {len(texts)}, got {len(all_embeddings)}.")
return None # Indicate overall failure
async def generate_code_patch(self, issue_number: int, model_key: str) -> dict:
"""Generates a code patch suggestion using a selected AI model."""
if issue_number not in self.issues:
return {"error": f"Issue {issue_number} not found."}
if not self.hf_token:
return {"error": "Hugging Face token not set."}
if model_key not in HF_MODELS or HF_MODELS.get(model_key) is None:
return {"error": f"Invalid or unconfigured model key: {model_key}"}
if not self.repo_local_path or not self.repo:
return {"error": "Repository not cloned/available locally. Please scan the repository first."}
issue = self.issues[issue_number]
model_id = HF_MODELS[model_key]
logger.info(f"Generating patch for issue {issue_number} ('{issue.get('title', 'N/A')[:50]}...') using model {model_id}")
# --- Context Gathering ---
context_str = "Context gathering failed or not available."
context_source = "Error"
start_time_context = time.time()
context_data = self.precomputed_context.get(issue_number) # Use .get for safety
if context_data:
timestamp = context_data.get('timestamp', 0)
timestamp_str = datetime.fromtimestamp(timestamp).strftime('%Y-%m-%d %H:%M:%S')
if context_data.get("error"):
context_str = f"Pre-computed context retrieval failed: {context_data['error']}"
context_source = f"Pre-computed (Failed @ {timestamp_str})"
elif context_data.get("content"):
context_str = context_data["content"]
num_files = len(context_data.get('files',[]))
context_source = f"Pre-computed ({num_files} files @ {timestamp_str})"
else:
context_str = "Pre-computed context was empty or unavailable."
context_source = f"Pre-computed (Empty @ {timestamp_str})"
logger.info(f"Using pre-computed context for issue {issue_number} (Source: {context_source})")
else:
logger.info(f"No pre-computed context found for issue {issue_number}, computing now.")
context_source = "Computed On-Demand"
# Compute context on demand and store it
context_result = await self._get_code_context(issue)
self.precomputed_context[issue_number] = {
"content": context_result.get("content"),
"files": context_result.get("files", []),
"error": context_result.get("error"),
"timestamp": time.time()
}
if "error" in context_result and context_result["error"]:
context_str = f"Error retrieving context: {context_result['error']}"
context_source += " (Error)"
else:
context_str = context_result.get("content", "No specific context found.")
context_source += f" ({len(context_result.get('files',[]))} files)"
context_load_duration = time.time() - start_time_context
logger.info(f"Computed context on-demand in {context_load_duration:.2f}s. Source: {context_source}")
# --- Get Pre-computed Info ---
summary_text = self._get_precomputed_text(issue_number, self.precomputed_summaries, "summary", "Summary")
missing_info_text = self._get_precomputed_text(issue_number, self.precomputed_missing_info, "info_needed", "Missing Info Analysis")
analysis_text = self._get_precomputed_text(issue_number, self.precomputed_analysis, "hypothesis", "Preliminary Analysis")
duplicate_info = self._get_duplicate_info_text(issue_number)
# --- Enhanced Prompt ---
# Added clear delimiters and instructions for the AI
prompt = f"""You are an expert software engineer AI assistant generating a minimal `diff` code patch to fix a GitHub issue.
<issue_details>
Issue ID: {issue.get('id', 'N/A')}
Title: {issue.get('title', 'N/A')}
Labels: {', '.join(issue.get('labels', []))}
Body:
{issue.get('body', 'N/A')}
</issue_details>
<ai_precomputation_results>
Summary: {summary_text}
Missing Information Analysis: {missing_info_text}
Preliminary Hypothesis: {analysis_text}
{duplicate_info}
</ai_precomputation_results>
<relevant_code_context source="{context_source}">
{context_str}
</relevant_code_context>
<instructions>
1. **Analyze:** Carefully review all provided information (issue details, AI analysis, code context).
2. **Identify Changes:** Determine the minimal code modifications *strictly within the provided <relevant_code_context>* to address the issue.
3. **Format:** Generate a standard `diff` patch (--- a/..., +++ b/..., @@ ..., +/-, space for context). Use exact relative paths from the `<relevant_code_context>` section. If the context is insufficient to generate a meaningful patch, state this clearly in the explanation.
4. **Output Structure:** Provide a concise explanation *before* the ```diff ... ``` block. The explanation should summarize the proposed fix.
5. **Constraints:**
* If the provided `<relevant_code_context>` is insufficient or irrelevant, state "Insufficient context to generate patch." in your explanation and **do not generate a ```diff``` block.** Explain *why* the context is insufficient.
* Do not invent code, file paths, or line numbers not present or implied by the context.
* Generate *only* the explanation and the diff block (if applicable). Do not include any other conversational text or markdown sections.
</instructions>
Patch Suggestion:
"""
# --- Call Inference API ---
api_url = f"{HF_INFERENCE_API}/{model_id}"
headers = {"Authorization": f"Bearer {self.hf_token}"}
# Increased max_new_tokens for potentially larger patches/explanations
payload = {
"inputs": prompt,
"parameters": {"max_new_tokens": 3072, "temperature": 0.1, "return_full_text": False, "do_sample": False},
"options": {"wait_for_model": True}
}
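# do_sample=False requests greedy decoding, so the output is essentially deterministic for a given
# prompt and the low temperature setting has little additional effect.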
timeout = aiohttp.ClientTimeout(total=240) # Increased timeout
retries = 3
for attempt in range(retries):
try:
start_time_api = time.time()
async with aiohttp.ClientSession(headers=headers, timeout=timeout) as session:
async with session.post(api_url, json=payload) as response:
api_duration = time.time() - start_time_api
rate_limit_remaining = response.headers.get('X-Ratelimit-Remaining')
logger.debug(f"HF Patch API Response Status: {response.status}, Duration: {api_duration:.2f}s, RateLimit Remaining: {rate_limit_remaining}")
if response.status == 429: # Too Many Requests
retry_after = int(response.headers.get('Retry-After', 10))
logger.warning(f"HF Patch API rate limited. Waiting for {retry_after} seconds before retry {attempt + 1}/{retries}.")
await asyncio.sleep(retry_after)
continue # Retry the same request
response.raise_for_status() # Raise for other 4xx/5xx errors
result = await response.json()
if result and isinstance(result, list) and 'generated_text' in result[0]:
generated_text = result[0].get('generated_text', '').strip()
logger.info(f"Received patch suggestion from {model_id} ({len(generated_text)} chars).")
# Robustly find the diff block
diff_match = re.search(r"```diff\n(.*?)```", generated_text, re.DOTALL | re.IGNORECASE)
explanation = generated_text # Default explanation is the whole text
patch_content = None
if diff_match:
patch_content = diff_match.group(1).strip()
# Split text before and after the first diff block
parts = generated_text.split("```diff", 1)
explanation = parts[0].strip()
# Check if the patch content looks like a diff (contains ---, +++, @@)
if not re.search(r'^(--- |\+\+\+ |@@ )', patch_content, re.MULTILINE):
logger.warning(f"Generated text contains ```diff``` block but content doesn't look like a standard diff for issue {issue_number}. Treating as no valid patch.")
# Treat as no valid patch generated
patch_content = None
explanation = generated_text # Fallback to entire text as explanation
if not explanation: explanation = "(No explanation provided by AI.)"
if patch_content:
logger.info(f"Successfully parsed diff block for issue {issue_number}.")
return {"explanation": explanation, "patch": patch_content, "model_used": model_id, "status": "Patch Generated"}
else:
# Check if the explanation indicates insufficient context or inability to generate
if re.search(r"(insufficient context|cannot generate|unable to create patch|context required)", explanation, re.IGNORECASE):
logger.warning(f"AI indicated insufficient context for issue {issue_number}.")
return {"explanation": explanation, "patch": None, "model_used": model_id, "status": "Insufficient Context"}
else:
logger.warning(f"No valid diff block found in patch suggestion response for issue {issue_number}.")
return {"explanation": f"(AI response did not contain a valid ```diff block. Full response below)\n---\n{generated_text}", "patch": None, "model_used": model_id, "status": "No Diff Generated"}
elif isinstance(result, dict) and 'error' in result:
error_msg = result['error']
estimated_time = result.get('estimated_time')
logger.error(f"HF Inference API patch error for issue {issue_number}: {error_msg}" + (f" (Est: {estimated_time}s)" if estimated_time else "") + f". Retry {attempt + 1}/{retries}.")
if attempt < retries - 1:
await asyncio.sleep(5) # Wait before retrying API error
continue
else:
logger.error("Max retries reached for API error during patch generation.")
return {"error": f"AI model error after retries: {error_msg}"}
else:
logger.error(f"Unexpected patch response format from {model_id} for issue {issue_number}: {str(result)[:500]}. Retry {attempt + 1}/{retries}.")
if attempt < retries - 1:
await asyncio.sleep(5)
continue
else:
logger.error("Max retries reached for unexpected API response during patch generation.")
return {"error": "Unexpected response format from AI model after retries."}
except asyncio.TimeoutError:
logger.warning(f"HF Inference API patch request timed out ({timeout.total}s) for issue {issue_number}. Retry {attempt + 1}/{retries}.")
if attempt < retries - 1:
await asyncio.sleep(5) # Wait before retrying timeout
continue
else:
logger.error("Max retries reached for patch generation timeout.")
return {"error": "AI model request timed out after retries."}
except aiohttp.ClientResponseError as e:
# aiohttp.ClientResponseError exposes no response body object; use the status and message it carries.
error_detail = e.message or "(no message)"
logger.error(f"HF Inference API patch request failed for issue {issue_number}: Status={e.status}, Message='{error_detail}'. Retry {attempt + 1}/{retries}.")
if attempt < retries - 1 and e.status in [500, 502, 503, 504]: # Retry on server errors
await asyncio.sleep(5)
continue
else:
logger.error("Max retries reached or non-retryable error for patch generation.")
return {"error": f"AI model request failed ({e.status}) after retries. Details: {error_body[:200]}..."}
except Exception as e:
logger.exception(f"Unexpected error during code patch generation for issue {issue_number}: {e}. Retry {attempt + 1}/{retries}.")
if attempt < retries - 1:
await asyncio.sleep(5)
continue
else:
logger.error("Max retries reached for unexpected error during patch generation.")
return {"error": f"An unexpected error occurred during patch generation after retries: {e}"}
# This part is reached if all retries fail
logger.error(f"Patch generation failed for issue {issue_number} after {retries} retries.")
return {"error": f"Patch generation failed after {retries} retries."}
async def _get_code_context(self, issue: dict) -> dict:
"""Retrieves relevant code context based on file paths mentioned in the issue."""
if not self.repo or not self.repo_local_path:
return {"content": "Local repository not available.", "files": [], "error": "Local repository not available."}
issue_id = issue.get('id', 'N/A')
issue_body = issue.get('body', '') or ""
issue_title = issue.get('title', '') or ""
text_to_search = f"{issue_title}\n{issue_body}"
if not text_to_search.strip():
return {"content": "No issue title or body provided to search for file paths.", "files": [], "error": None}
# Enhanced path pattern:
# - Looks for paths starting with /, ./, or just a filename/path segment
# - Allows common path characters (letters, numbers, ., /, _, -)
# - Requires a file extension from a common list
# - Uses word boundaries or punctuation/whitespace lookarounds to avoid partial matches within words
# - Allows paths enclosed in quotes or backticks
path_pattern = r"""
(?<![\w/.-]) # Ensure it's not part of a larger word or path segment immediately before
(?:
['"`]? # Optional opening quote/backtick
( # Capture the path
(?: \./ | / )? # Optional ./ or / at the start
(?:[\w./_-]+ /)* # Zero or more directory segments
[\w._-]+ # Filename (must have at least one word char, dot, underscore, or hyphen)
\. # The dot before the extension
(?:py|js|jsx|ts|tsx|java|c|cpp|h|hpp|cs|go|rs|php|rb|html|css|scss|less|md|yaml|yml|json|toml|sh|bash|zsh|ipynb|config|cfg|ini|xml|sql|gradle|tf|tfvars|dockerfile|Makefile|Dockerfile|txt|log) # Common extensions
)
['"`]? # Optional closing quote/backtick
)
(?=[\s\n\.,;:"')\]!>]|$) # Ensure it's followed by whitespace, punctuation, or end of string
"""
potential_files = set(re.findall(path_pattern, text_to_search, re.VERBOSE | re.IGNORECASE))
if not potential_files:
return {"content": "No file paths matching common patterns found in the issue title or body.", "files": [], "error": None}
logger.info(f"Found {len(potential_files)} potential file references in issue {issue_id}: {potential_files}")
context_content = ""
max_context_length = 6000 # Character limit for total context fed to the model
files_included = []
files_not_found = []
files_read_error = []
files_skipped_length = []
file_snippet_length = 1000 # Characters per file snippet
for file_path_str in potential_files:
# Clean up quotes/backticks and normalize separators
normalized_path_str = file_path_str.strip('\'"`').replace('\\', '/')
# Resolve relative paths if the repo root is the current directory context
relative_path = Path(normalized_path_str)
try:
# Construct the full path relative to the repo root
full_path = self.repo_local_path / relative_path
# Resolve the path to handle '..' etc. and get the absolute path
resolved_full_path = full_path.resolve()
# Ensure the resolved path is still within the repository directory
if not resolved_full_path.is_relative_to(self.repo_local_path.resolve()):
logger.warning(f"Skipping potentially malicious path outside repo: {file_path_str} resolved to {resolved_full_path}")
files_read_error.append(file_path_str + " (outside repo)")
continue
# Get the clean relative path string from the resolved path
relative_path_str = str(resolved_full_path.relative_to(self.repo_local_path.resolve()))
except Exception as e:
logger.warning(f"Could not resolve or validate path {file_path_str} for issue {issue_id}: {e}")
files_read_error.append(file_path_str + " (path error)")
continue
if resolved_full_path.is_file():
try:
file_content = resolved_full_path.read_text(encoding='utf-8', errors='ignore')
content_snippet = f"---\nFile: {relative_path_str}\n---\n{file_content[:file_snippet_length]}{' [...]' if len(file_content) > file_snippet_length else ''}\n\n"
if len(context_content) + len(content_snippet) <= max_context_length:
context_content += content_snippet
files_included.append(relative_path_str)
else:
logger.warning(f"Skipping file {relative_path_str} for context in issue {issue_id} due to total length limit ({max_context_length} chars).")
files_skipped_length.append(relative_path_str)
except OSError as e:
logger.warning(f"Could not read file {resolved_full_path} for issue {issue_id}: {e}")
files_read_error.append(relative_path_str)
except Exception as e:
logger.warning(f"Unexpected error reading file {resolved_full_path} for issue {issue_id}: {e}")
files_read_error.append(relative_path_str)
else:
logger.info(f"Potential path '{relative_path_str}' (from '{file_path_str}') not found or not a file in local repo for issue {issue_id}.")
files_not_found.append(relative_path_str)
final_content = ""
error_status = None
if files_included:
final_content = context_content.strip()
logger.info(f"Included context from {len(files_included)} files for issue {issue_id}: {files_included}")
else:
final_content = "No content could be retrieved from the potential file paths found."
logger.warning(f"Context generation for issue {issue_id} resulted in no included files.")
if potential_files: # If paths were found but none included
error_status = "No readable or found files among potential paths."
status_notes = []
if files_not_found:
status_notes.append(f"Files mentioned but not found: {files_not_found}")
logger.info(f"Files mentioned but not found for issue {issue_id}: {files_not_found}")
if files_read_error:
status_notes.append(f"Files failed to read: {files_read_error}")
logger.warning(f"Files mentioned but failed to read for issue {issue_id}: {files_read_error}")
if files_skipped_length:
status_notes.append(f"File content skipped due to length limit: {files_skipped_length}")
logger.warning(f"File content skipped due to length limit for issue {issue_id}: {files_skipped_length}")
if status_notes:
final_content += "\n\n--- Context Notes ---\n" + "\n".join(status_notes)
if error_status is None and (files_not_found or files_read_error):
error_status = "Some mentioned files were not found or could not be read."
return {"content": final_content.strip(), "files": files_included, "error": error_status}
async def suggest_resolution(self, issue: dict, model_key: str) -> str:
"""Suggests a resolution description using a selected AI model."""
if not self.hf_token:
return "Error: Hugging Face token not set."
if model_key not in HF_MODELS or HF_MODELS.get(model_key) is None:
return f"Error: Invalid or unconfigured model key: {model_key}"
model_id = HF_MODELS[model_key]
issue_id = issue.get('id','N/A')
logger.info(f"Requesting resolution suggestion for issue {issue_id} ('{issue.get('title', 'N/A')[:50]}...') using {model_id}")
# --- Get Pre-computed Info ---
summary_text = self._get_precomputed_text(issue_id, self.precomputed_summaries, "summary", "Summary")
missing_info_text = self._get_precomputed_text(issue_id, self.precomputed_missing_info, "info_needed", "Missing Info Analysis")
analysis_text = self._get_precomputed_text(issue_id, self.precomputed_analysis, "hypothesis", "Preliminary Analysis")
duplicate_info = self._get_duplicate_info_text(issue_id)
# Enhanced Prompt - More structured and clear instructions
prompt = f"""You are a helpful AI assistant acting as a senior software engineer. Analyze the following GitHub issue and provide concise, actionable, step-by-step suggestions on how to approach its resolution.
<issue_details>
Issue ID: {issue.get('id', 'N/A')}
Title: {issue.get('title', 'N/A')}
Labels: {', '.join(issue.get('labels', []))}
Body:
{issue.get('body', 'N/A')[:2000]}
</issue_details>
<ai_precomputation_results>
Summary: {summary_text}
Missing Information Analysis: {missing_info_text}
Preliminary Hypothesis: {analysis_text}
{duplicate_info}
</ai_precomputation_results>
<instructions>
Based on *all* the information provided above, outline a potential plan. Use clear headings for each step.
1. **Address Missing Information (If Any):** If the 'Missing Information Analysis' identified gaps, state concrete steps to get that information (e.g., "Request logs from the user," "Ask for steps to reproduce," "Check environment details"). If none needed, state "Issue description seems reasonably complete."
2. **Refine Understanding / Verify Hypothesis:** Based on the 'Preliminary Hypothesis' and issue details, state the most likely root cause or the main goal of the issue. Mention any ambiguities or areas needing confirmation.
3. **Identify Relevant Code Areas (Hypothesize):** Based on keywords, error messages, paths mentioned, or common conventions, list specific files, functions, or classes likely needing investigation. Explicitly state if this is speculative.
4. **Propose Implementation / Investigation Steps:** Describe the core logic changes, debugging steps, configuration updates, or API interactions needed. Break it down into logical, actionable steps.
5. **Testing Recommendations:** Suggest specific unit, integration, or manual tests crucial for verifying the fix or implementation. Mention key scenarios to cover.
6. **Next Steps (Standard Workflow):** Briefly mention the standard development workflow (e.g., branching, coding, testing, committing, PR).
</instructions>
Suggested Resolution Approach:
"""
api_url = f"{HF_INFERENCE_API}/{model_id}"
headers = {"Authorization": f"Bearer {self.hf_token}"}
# Adjusted parameters for more creative but still focused output
payload = {
"inputs": prompt,
"parameters": {"max_new_tokens": 1500, "temperature": 0.7, "return_full_text": False, "do_sample": True, "top_p": 0.95},
"options": {"wait_for_model": True}
}
timeout = aiohttp.ClientTimeout(total=120) # Increased timeout
retries = 3
for attempt in range(retries):
try:
start_time_api = time.time()
async with aiohttp.ClientSession(headers=headers, timeout=timeout) as session:
async with session.post(api_url, json=payload) as response:
api_duration = time.time() - start_time_api
rate_limit_remaining = response.headers.get('X-Ratelimit-Remaining')
logger.debug(f"HF Suggestion API Response Status: {response.status}, Duration: {api_duration:.2f}s, RateLimit Remaining: {rate_limit_remaining}")
if response.status == 429: # Too Many Requests
retry_after = int(response.headers.get('Retry-After', 10))
logger.warning(f"HF Suggestion API rate limited. Waiting for {retry_after} seconds before retry {attempt + 1}/{retries}.")
await asyncio.sleep(retry_after)
continue # Retry the same request
response.raise_for_status() # Raise for other 4xx/5xx errors
result = await response.json()
if result and isinstance(result, list) and 'generated_text' in result[0]:
suggestion = result[0].get('generated_text', 'AI Error: No suggestion text generated.').strip()
logger.info(f"Received suggestion from {model_id} for issue {issue_id} ({len(suggestion)} chars).")
return suggestion
elif isinstance(result, dict) and 'error' in result:
error_msg = result['error']
estimated_time = result.get('estimated_time')
logger.error(f"HF Inference API suggestion error for issue {issue_id}: {error_msg}" + (f" (Est: {estimated_time}s)" if estimated_time else "") + f". Retry {attempt + 1}/{retries}.")
if attempt < retries - 1:
await asyncio.sleep(5) # Wait before retrying API error
continue
else:
logger.error("Max retries reached for API error during suggestion generation.")
return f"Error: AI model returned an error after retries: {error_msg}"
else:
logger.error(f"Unexpected suggestion response format from {model_id} for issue {issue_id}: {str(result)[:500]}. Retry {attempt + 1}/{retries}.")
if attempt < retries - 1:
await asyncio.sleep(5)
continue
else:
logger.error("Max retries reached for unexpected API response during suggestion generation.")
return "Error: Received unexpected response format from AI model after retries."
except asyncio.TimeoutError:
logger.warning(f"HF Inference API suggestion request timed out ({timeout.total}s) for issue {issue_id}. Retry {attempt + 1}/{retries}.")
if attempt < retries - 1:
await asyncio.sleep(5) # Wait before retrying timeout
continue
else:
logger.error("Max retries reached for suggestion generation timeout.")
return "Error: AI model request timed out after retries. The model might be busy."
except aiohttp.ClientResponseError as e:
# aiohttp.ClientResponseError exposes no response body object; use the status and message it carries.
error_detail = e.message or "(no message)"
logger.error(f"HF Inference API suggestion request failed for issue {issue_id}: Status={e.status}, Message='{error_detail}'. Retry {attempt + 1}/{retries}.")
if attempt < retries - 1 and e.status in [500, 502, 503, 504]: # Retry on server errors
await asyncio.sleep(5)
continue
else:
logger.error("Max retries reached or non-retryable error for suggestion generation.")
return f"Error: AI model request failed ({e.status}) after retries. Check model/token/API status. Details: {error_body[:200]}..."
except Exception as e:
logger.exception(f"Error suggesting resolution for issue {issue_id}: {e}. Retry {attempt + 1}/{retries}.")
if attempt < retries - 1:
await asyncio.sleep(5)
continue
else:
logger.error("Max retries reached for unexpected error during suggestion generation.")
return f"An unexpected error occurred during suggestion generation after retries: {e}"
# This part is reached if all retries fail
logger.error(f"Suggestion generation failed for issue {issue_id} after {retries} retries.")
return f"Suggestion generation failed after {retries} retries."
def _get_precomputed_text(self, issue_id: int, data_dict: dict, key: str, name: str) -> str:
"""Safely retrieves precomputed text, handling errors and pending states."""
if issue_id in data_dict:
entry = data_dict[issue_id]
timestamp = entry.get("timestamp", 0)
# Consider it "recent" if computed within the last 2 idle cycles
is_recent = time.time() - timestamp < self.idle_processing_interval * 2
if entry.get("error"):
# Sanitize error message to prevent HTML injection in preview
sanitized_error = gr.Textbox.sanitize_html(str(entry['error']))
return f"❌ {name} Error (at {datetime.fromtimestamp(timestamp).strftime('%H:%M:%S')}): {sanitized_error}"
elif key in entry and entry[key] is not None: # Check key exists and is not None
# Sanitize the text content
sanitized_text = gr.Textbox.sanitize_html(str(entry[key]))
return sanitized_text
else: # No error, but key might be missing or None
if is_recent:
return f"⏳ {name} computation pending..."
else:
return f"(ℹ️ {name} not computed or result was empty)"
else:
return f"(⏳ {name} not computed yet)"
def _get_duplicate_info_text(self, issue_id: int) -> str:
"""Formats duplicate info text."""
if issue_id in self.potential_duplicates:
dup_ids = self.potential_duplicates[issue_id]
if dup_ids:
# Sort duplicate IDs for consistent display
sorted_dup_ids = sorted(dup_ids)
# Use the JS selectIssue function to make links clickable in the preview HTML
dup_links = ", ".join([f"<a href='#' onclick='window.selectIssue({dup_id}); return false;' title='Issue #{dup_id} has similar content'>#{dup_id}</a>" for dup_id in sorted_dup_ids])
return f"\n### ⚠️ Potential Duplicate Issues:\nIssue IDs: {dup_links}"
return ""
async def broadcast_collaboration_status(self):
"""Periodically sends collaborator status to all connected clients."""
while True:
try:
await asyncio.sleep(5)
if not self.ws_clients: continue
# Clean up collaborators dict - remove entries for clients no longer in ws_clients
active_client_ids = {getattr(client, 'client_id', None) for client in self.ws_clients}
collaborators_to_remove = [cid for cid in self.collaborators if cid not in active_client_ids]
for cid in collaborators_to_remove:
logger.debug(f"Removing stale collaborator entry for client ID {cid}")
del self.collaborators[cid]
status_payload = json.dumps({"type": "collaboration_status", "collaborators": self.collaborators})
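# Example serialized payload (illustrative; the collaborator value structure depends on what clients report):
#   {"type": "collaboration_status", "collaborators": {"client-abc": {"name": "alice", "status": "viewing #42"}}}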
active_clients_snapshot = list(self.ws_clients) # Take a snapshot
tasks = []
# No need for disconnected_clients list here, remove_ws_client handles it
for client in active_clients_snapshot:
# Check if client is closed using the standard websockets property
if client.closed:
# remove_ws_client will be called by the exception handler or finally block
continue
try:
tasks.append(client.send(status_payload))
except (ConnectionClosed, ConnectionAbortedError, ConnectionResetError, WebSocketException) as e:
# These exceptions indicate the connection is likely already closed or broken
logger.warning(f"Client {getattr(client, 'client_id', client.remote_address)} seems disconnected before send: {type(e).__name__}. Attempting removal.")
# Schedule removal in the main loop if not already in it
if self.main_loop.is_running():
self.main_loop.call_soon_threadsafe(self.remove_ws_client, client)
else:
logger.error("Main loop not running, cannot schedule client removal.")
continue # Skip to next client
except Exception as e:
logger.error(f"Unexpected error preparing send to client {getattr(client, 'client_id', client.remote_address)}: {e}. Attempting removal.")
if self.main_loop.is_running():
self.main_loop.call_soon_threadsafe(self.remove_ws_client, client)
else:
logger.warning("Main loop not running, cannot schedule client removal.")
continue # Skip to next client
if tasks:
# client.send() only returns a coroutine; the sends must be awaited or nothing is actually transmitted.
await asyncio.gather(*tasks, return_exceptions=True)
except asyncio.CancelledError:
logger.info("Broadcast loop cancelled.")
break
except Exception as e:
logger.exception(f"Error in broadcast loop: {e}")
await asyncio.sleep(10) # Wait before next attempt after unexpected error
async def handle_code_editor_update(self, issue_num: int, delta_str: str, sender_client_id: str):
"""
Applies a delta from one client and broadcasts it to others.
WARNING: Lacks Operational Transformation - concurrent edits are UNSAFE.
"""
# Ensure issue exists and is currently loaded/tracked
if issue_num is None or issue_num not in self.issues:
logger.warning(f"Received code update for non-existent issue {issue_num} in manager. Ignoring.")
return
# Ensure an editor instance exists for this issue
if issue_num not in self.code_editors:
logger.warning(f"Received code update for issue {issue_num} but no editor instance found. Ignoring.")
# This state indicates the UI might be out of sync with the backend state.
# Ideally, the client only sends updates after handle_issue_select has run.
return
logger.debug(f"Handling code editor update for issue {issue_num} from {sender_client_id}. "
"WARNING: NO OT IMPLEMENTED - Last write wins / potential conflicts.")
try:
delta_obj = json.loads(delta_str)
# In a real OT system, apply_delta would update the authoritative state.
# Here, we just log and increment revision for the placeholder.
# The actual text state is managed client-side by the Gradio component.
self.code_editors[issue_num].apply_delta(delta_obj)
logger.debug(f"Applied delta for issue {issue_num} from client {sender_client_id} (Placeholder OT Logic - Revision {self.code_editors[issue_num].revision})")
update_payload = json.dumps({
"type": "code_update",
"issue_num": issue_num,
"delta": delta_str, # Send the original delta string
"senderId": sender_client_id
})
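# Example serialized message (illustrative values): a client editing issue 42 produces
#   {"type": "code_update", "issue_num": 42, "delta": "<raw delta JSON string>", "senderId": "client-abc"}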
active_clients_snapshot = list(self.ws_clients) # Take a snapshot
tasks = []
# No need for disconnected_clients list here, remove_ws_client handles it
for client in active_clients_snapshot:
client_ws_id = getattr(client, 'client_id', None)
# Only broadcast to other clients, and only if the client is open
if client_ws_id != sender_client_id and not client.closed:
try:
tasks.append(client.send(update_payload))
except (ConnectionClosed, ConnectionAbortedError, ConnectionResetError, WebSocketException) as e:
logger.warning(f"Client {getattr(client, 'client_id', client.remote_address)} seems disconnected during broadcast: {type(e).__name__}. Attempting removal.")
# Schedule removal in the main loop if not already in it
if self.main_loop.is_running():
self.main_loop.call_soon_threadsafe(self.remove_ws_client, client)
else:
logger.warning("Main loop not running, cannot schedule client removal.")
continue # Skip to next client
except Exception as e:
logger.error(f"Unexpected error preparing broadcast to client {getattr(client, 'client_id', client.remote_address)}: {e}. Attempting removal.")
if self.main_loop.is_running():
self.main_loop.call_soon_threadsafe(self.remove_ws_client, client)
else:
logger.warning("Main loop not running, cannot schedule client removal.")
continue # Skip to next client
if tasks:
logger.debug(f"Broadcasting code update for issue {issue_num} to {len(tasks)} other clients.")
# client.send() only returns a coroutine; await the batch so the updates are actually transmitted.
await asyncio.gather(*tasks, return_exceptions=True)
except json.JSONDecodeError:
logger.error(f"Received invalid JSON delta for issue {issue_num} from {sender_client_id}: {delta_str[:200]}")
except Exception as e:
logger.exception(f"Error handling code editor update for issue {issue_num} from {sender_client_id}: {e}")
async def broadcast_issue_update(self):
"""Notifies clients that the issue list/data has changed (e.g., due to webhook)."""
if not self.ws_clients: return
logger.info("Broadcasting 'issues_updated' notification to all clients.")
update_payload = json.dumps({"type": "issues_updated"})
active_clients_snapshot = list(self.ws_clients)
tasks = []
# No need for disconnected_clients list here, remove_ws_client handles it
for client in active_clients_snapshot:
if client.closed:
continue
try:
tasks.append(client.send(update_payload))
except (ConnectionClosed, ConnectionAbortedError, ConnectionResetError, WebSocketException) as e:
logger.warning(f"Client {getattr(client, 'client_id', client.remote_address)} seems disconnected during issue update broadcast: {type(e).__name__}. Attempting removal.")
if self.main_loop.is_running():
self.main_loop.call_soon_threadsafe(self.remove_ws_client, client)
else:
logger.warning("Main loop not running, cannot schedule client removal.")
continue
except Exception as e:
logger.error(f"Unexpected error preparing issue update broadcast to client {getattr(client, 'client_id', client.remote_address)}: {e}. Attempting removal.")
if self.main_loop.is_running():
self.main_loop.call_soon_threadsafe(self.remove_ws_client, client)
else:
logger.warning("Main loop not running, cannot schedule client removal.")
continue
if tasks:
logger.debug(f"Broadcasting issue update to {len(tasks)} clients.")
# client.send() only returns a coroutine; await the batch so the notification is actually transmitted.
await asyncio.gather(*tasks, return_exceptions=True)
def remove_ws_client(self, client_to_remove: WebSocketServerProtocol):
"""Safely removes a client from the list and collaborator dict."""
# Use the stored client_id or remote_address for logging
client_id = getattr(client_to_remove, 'client_id', None)
client_addr = client_to_remove.remote_address if hasattr(client_to_remove, 'remote_address') else 'N/A'
client_desc = f"{client_id or 'Unknown ID'} ({client_addr})"
removed_from_list = False
removed_from_collab = False
# This method should ideally be called from the main async loop thread
# if it modifies shared state like self.ws_clients and self.collaborators.
# If called from elsewhere (e.g., a thread's error handler), use call_soon_threadsafe.
# Assuming it's called from the WS async handler or scheduled into the main loop.
try:
# Check if the client is actually in the list before attempting removal
if client_to_remove in self.ws_clients:
self.ws_clients.remove(client_to_remove)
removed_from_list = True
logger.info(f"Removed WebSocket client from list: {client_desc} (Remaining: {len(self.ws_clients)})")
else:
logger.debug(f"Client {client_desc} not found in list during removal attempt.")
except ValueError:
logger.debug(f"Client {client_desc} already removed from list or not found (ValueError).")
pass # Already removed
if client_id and client_id in self.collaborators:
del self.collaborators[client_id]
removed_from_collab = True
logger.info(f"Removed collaborator entry for {client_id}.")
# Schedule an immediate status broadcast if anything was removed
if (removed_from_list or removed_from_collab):
if self.main_loop.is_running():
# Use call_soon_threadsafe to schedule the async coroutine in the main loop
self.main_loop.call_soon_threadsafe(asyncio.create_task, self.broadcast_collaboration_status_once())
logger.debug(f"Scheduled immediate status broadcast after removing client {client_desc}.")
else:
logger.warning("Main loop not running, cannot schedule immediate status broadcast after client removal.")
async def broadcast_collaboration_status_once(self):
"""Sends a single collaboration status update immediately."""
if not self.ws_clients: return
status_payload = json.dumps({"type": "collaboration_status", "collaborators": self.collaborators})
active_clients_snapshot = list(self.ws_clients)
tasks = []
# No need for disconnected_clients list here, remove_ws_client handles it
for client in active_clients_snapshot:
# Check if client is closed using the standard websockets property
if client.closed:
continue
try:
tasks.append(client.send(status_payload))
except (ConnectionClosed, ConnectionAbortedError, ConnectionResetError, WebSocketException) as e:
logger.warning(f"Client {getattr(client, 'client_id', client.remote_address)} seems disconnected during single broadcast: {type(e).__name__}. Attempting removal.")
if self.main_loop.is_running():
self.main_loop.call_soon_threadsafe(self.remove_ws_client, client)
else:
logger.warning("Main loop not running, cannot schedule client removal.")
continue
except Exception as e:
logger.error(f"Unexpected error preparing single broadcast to client {getattr(client, 'client_id', client.remote_address)}: {e}. Attempting removal.")
if self.main_loop.is_running():
self.main_loop.call_soon_threadsafe(self.remove_ws_client, client)
else:
logger.warning("Main loop not running, cannot schedule client removal.")
continue
if tasks:
logger.debug(f"Broadcasting single status update to {len(tasks)} clients.")
# client.send() only returns a coroutine; await the batch so the status update is actually transmitted.
await asyncio.gather(*tasks, return_exceptions=True)
def _identify_stale_issues(self):
"""Identifies issues not updated recently based on 'updated_at'."""
self.stale_issues = []
threshold = timedelta(days=self.stale_issue_threshold_days)
now_aware = datetime.now(timezone.utc)
for issue_id, issue_data in self.issues.items():
updated_at_str = issue_data.get("updated_at")
if updated_at_str:
try:
# Handle potential different ISO 8601 formats (with/without Z, different offsets)
# datetime.fromisoformat handles Z and +/-HH:MM offsets
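# e.g. "2024-05-01T12:34:56Z" -> datetime(2024, 5, 1, 12, 34, 56, tzinfo=timezone.utc) after the Z -> +00:00 replacement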
updated_at = datetime.fromisoformat(updated_at_str.replace("Z", "+00:00"))
# Ensure both datetimes are timezone-aware for comparison
if updated_at.tzinfo is None:
# Assume UTC if no timezone info (common for older systems)
updated_at = updated_at.replace(tzinfo=timezone.utc)
if now_aware - updated_at > threshold:
self.stale_issues.append(issue_id)
except (ValueError, TypeError) as e:
logger.warning(f"Could not parse 'updated_at' ('{updated_at_str}') for issue {issue_id}: {e}")
else:
logger.debug(f"Issue {issue_id} missing 'updated_at' field.")
logger.info(f"Identified {len(self.stale_issues)} potentially stale issues (updated > {self.stale_issue_threshold_days} days ago).")
def _identify_high_priority_candidates(self):
"""Identifies high-priority issues (e.g., Critical/High severity)."""
self.high_priority_candidates = []
for issue_id, issue_data in self.issues.items():
severity = self._determine_severity(issue_data.get('labels', []))
if severity in ["Critical", "High"]:
self.high_priority_candidates.append(issue_id)
logger.info(f"Identified {len(self.high_priority_candidates)} high-priority candidates (Critical/High severity).")
async def _compute_and_store_summary(self, issue_id: int):
"""Generates and stores a summary for a given issue using an LLM (Idle Task)."""
# Re-check if issue exists just before starting computation
if issue_id not in self.issues:
logger.warning(f"Skipping summary generation for issue {issue_id}: Issue no longer exists.")
return
if not self.hf_token:
self.precomputed_summaries[issue_id] = {"error": "HF token not set", "timestamp": time.time()}
return
if DEFAULT_IDLE_MODEL_ID is None:
self.precomputed_summaries[issue_id] = {"error": "Idle model not configured", "timestamp": time.time()}
return
try:
issue = self.issues[issue_id]
model_id = DEFAULT_IDLE_MODEL_ID # Use designated idle model
logger.info(f"Idle Task: Generating summary for issue {issue_id} using {model_id}")
start_time = time.time()
prompt = f"""Concisely summarize the following GitHub issue in 1-2 sentences. Focus on the core problem or request reported by the user.
Issue Title: {issue.get('title', 'N/A')}
Issue Body (first 1000 chars):
{issue.get('body', 'N/A')[:1000]}
Summary:"""
api_url = f"{HF_INFERENCE_API}/{model_id}"
headers = {"Authorization": f"Bearer {self.hf_token}"}
payload = {
"inputs": prompt,
"parameters": {"max_new_tokens": 128, "temperature": 0.2, "return_full_text": False, "do_sample": False},
"options": {"wait_for_model": True}
}
timeout = aiohttp.ClientTimeout(total=60)
retries = 2 # Fewer retries for idle tasks
for attempt in range(retries):
try:
async with aiohttp.ClientSession(headers=headers, timeout=timeout) as session:
async with session.post(api_url, json=payload) as response:
if response.status == 429:
retry_after = int(response.headers.get('Retry-After', 5))
logger.warning(f"Idle Summary API rate limited. Waiting {retry_after}s before retry {attempt + 1}/{retries}.")
await asyncio.sleep(retry_after)
continue
response.raise_for_status()
result = await response.json()
duration = time.time() - start_time
if result and isinstance(result, list) and 'generated_text' in result[0]:
summary = result[0].get('generated_text', '').strip() or "(AI generated empty summary)"
self.precomputed_summaries[issue_id] = {"summary": summary, "error": None, "timestamp": time.time()}
logger.info(f"Stored summary for issue {issue_id} (took {duration:.2f}s).")
break # Success
elif isinstance(result, dict) and 'error' in result:
raise ValueError(f"API Error: {result['error']}")
else:
raise ValueError(f"Unexpected API response format: {str(result)[:200]}")
except (asyncio.TimeoutError, aiohttp.ClientResponseError, ValueError) as e:
logger.warning(f"Idle Summary API error for issue {issue_id}: {e}. Retry {attempt + 1}/{retries}.", exc_info=False)
if attempt < retries - 1:
await asyncio.sleep(5)
continue
else:
raise e # Re-raise after max retries
except Exception as e:
logger.exception(f"Unexpected error in Idle Summary task for issue {issue_id}. Retry {attempt + 1}/{retries}.")
if attempt < retries - 1:
await asyncio.sleep(5)
continue
else:
raise e # Re-raise after max retries
else: # Executed if loop completes without break
raise Exception(f"Max retries ({retries}) failed for issue {issue_id}")
except Exception as e:
err_msg = f"Failed summary: {e}"
logger.error(f"Failed to generate summary for issue {issue_id}: {e}", exc_info=False) # Keep log cleaner
self.precomputed_summaries[issue_id] = {"error": err_msg, "summary": None, "timestamp": time.time()}
async def _compute_and_store_missing_info(self, issue_id: int):
"""Idle Task: Use LLM to identify missing information needed for an issue."""
# Re-check if issue exists just before starting computation
if issue_id not in self.issues: return
if not self.hf_token:
self.precomputed_missing_info[issue_id] = {"error": "HF token not set", "timestamp": time.time()}
return
if DEFAULT_IDLE_MODEL_ID is None:
self.precomputed_missing_info[issue_id] = {"error": "Idle model not configured", "timestamp": time.time()}
return
try:
issue = self.issues[issue_id]
model_id = DEFAULT_IDLE_MODEL_ID # Use cheap model
logger.info(f"Idle Task: Identifying missing info for issue {issue_id} using {model_id}")
start_time = time.time()
prompt = f"""Analyze the following GitHub issue description. Identify critical information potentially missing for effective debugging or resolution. List the missing items concisely (e.g., "Steps to reproduce", "Error logs", "Expected vs. Actual behavior", "Environment details"). If the description seems reasonably complete, respond with ONLY the word "None".
Issue Title: {issue.get('title', 'N/A')}
Issue Body:
{issue.get('body', 'N/A')[:1500]}
Missing Information:"""
api_url = f"{HF_INFERENCE_API}/{model_id}"
headers = {"Authorization": f"Bearer {self.hf_token}"}
payload = {
"inputs": prompt,
"parameters": {"max_new_tokens": 64, "temperature": 0.1, "return_full_text": False, "do_sample": False},
"options": {"wait_for_model": True}
}
timeout = aiohttp.ClientTimeout(total=45)
retries = 2
for attempt in range(retries):
try:
async with aiohttp.ClientSession(headers=headers, timeout=timeout) as session:
async with session.post(api_url, json=payload) as response:
if response.status == 429:
retry_after = int(response.headers.get('Retry-After', 5))
logger.warning(f"Idle Missing Info API rate limited. Waiting {retry_after}s before retry {attempt + 1}/{retries}.")
await asyncio.sleep(retry_after)
continue
response.raise_for_status()
result = await response.json()
duration = time.time() - start_time
if result and isinstance(result, list) and 'generated_text' in result[0]:
info_needed = result[0].get('generated_text', '').strip()
if info_needed.lower() == "none" or not info_needed:
info_needed = "None needed."
self.precomputed_missing_info[issue_id] = {"info_needed": info_needed, "error": None, "timestamp": time.time()}
logger.info(f"Stored missing info analysis for issue {issue_id} (took {duration:.2f}s): '{info_needed[:50]}...'")
break # Success
elif isinstance(result, dict) and 'error' in result:
raise ValueError(f"API Error: {result['error']}")
else:
raise ValueError(f"Unexpected API response format: {str(result)[:200]}")
except (asyncio.TimeoutError, aiohttp.ClientResponseError, ValueError) as e:
logger.warning(f"Idle Missing Info API error for issue {issue_id}: {e}. Retry {attempt + 1}/{retries}.", exc_info=False)
if attempt < retries - 1:
await asyncio.sleep(5)
continue
else:
raise e # Re-raise after max retries
except Exception as e:
logger.exception(f"Unexpected error in Idle Missing Info task for issue {issue_id}. Retry {attempt + 1}/{retries}.")
if attempt < retries - 1:
await asyncio.sleep(5)
continue
else:
raise e # Re-raise after max retries
else: # Executed if loop completes without break
raise Exception(f"Max retries ({retries}) failed for issue {issue_id}")
except Exception as e:
err_msg = f"Failed missing info analysis: {e}"
logger.error(f"Failed missing info analysis for issue {issue_id}: {e}", exc_info=False)
self.precomputed_missing_info[issue_id] = {"error": err_msg, "info_needed": None, "timestamp": time.time()}
async def _compute_and_store_preliminary_analysis(self, issue_id: int):
"""Idle Task: Use LLM for a very concise preliminary hypothesis."""
# Re-check if issue exists just before starting computation
if issue_id not in self.issues: return
if not self.hf_token:
self.precomputed_analysis[issue_id] = {"error": "HF token not set", "timestamp": time.time()}
return
if DEFAULT_IDLE_MODEL_ID is None:
self.precomputed_analysis[issue_id] = {"error": "Idle model not configured", "timestamp": time.time()}
return
try:
issue = self.issues[issue_id]
model_id = DEFAULT_IDLE_MODEL_ID # Use cheap model
logger.info(f"Idle Task: Generating preliminary analysis for issue {issue_id} using {model_id}")
start_time = time.time()
prompt = f"""Analyze the GitHub issue below. Provide a single, concise sentence hypothesizing the root cause OR the main goal. Start with "Hypothesis:". If unsure, respond ONLY with "Hypothesis: Further investigation needed.".
Issue Title: {issue.get('title', 'N/A')}
Issue Body:
{issue.get('body', 'N/A')[:1500]}
Response:"""
api_url = f"{HF_INFERENCE_API}/{model_id}"
headers = {"Authorization": f"Bearer {self.hf_token}"}
payload = {
"inputs": prompt,
"parameters": {"max_new_tokens": 80, "temperature": 0.3, "return_full_text": False, "do_sample": False},
"options": {"wait_for_model": True}
}
timeout = aiohttp.ClientTimeout(total=45)
retries = 2
for attempt in range(retries):
try:
async with aiohttp.ClientSession(headers=headers, timeout=timeout) as session:
async with session.post(api_url, json=payload) as response:
if response.status == 429:
retry_after = int(response.headers.get('Retry-After', 5))
logger.warning(f"Idle Analysis API rate limited. Waiting {retry_after}s before retry {attempt + 1}/{retries}.")
await asyncio.sleep(retry_after)
continue
response.raise_for_status()
result = await response.json()
duration = time.time() - start_time
if result and isinstance(result, list) and 'generated_text' in result[0]:
hypothesis = result[0].get('generated_text', '').strip()
if not hypothesis.lower().startswith("hypothesis:"):
hypothesis = "Hypothesis: (Analysis unclear)"
elif len(hypothesis) < 15:
hypothesis = "Hypothesis: (Analysis failed or too short)"
self.precomputed_analysis[issue_id] = {"hypothesis": hypothesis, "error": None, "timestamp": time.time()}
logger.info(f"Stored preliminary analysis for issue {issue_id} (took {duration:.2f}s): '{hypothesis[:60]}...'")
break # Success
elif isinstance(result, dict) and 'error' in result:
raise ValueError(f"API Error: {result['error']}")
else:
raise ValueError(f"Unexpected API response format: {str(result)[:200]}")
except (asyncio.TimeoutError, aiohttp.ClientResponseError, ValueError) as e:
logger.warning(f"Idle Analysis API error for issue {issue_id}: {e}. Retry {attempt + 1}/{retries}.", exc_info=False)
if attempt < retries - 1:
await asyncio.sleep(5)
continue
else:
raise e # Re-raise after max retries
except Exception as e:
logger.exception(f"Unexpected error in Idle Analysis task for issue {issue_id}. Retry {attempt + 1}/{retries}.")
if attempt < retries - 1:
await asyncio.sleep(5)
continue
else:
raise e # Re-raise after max retries
else: # Executed if loop completes without break
raise Exception(f"Max retries ({retries}) failed for issue {issue_id}")
except Exception as e:
err_msg = f"Failed preliminary analysis: {e}"
logger.error(f"Failed preliminary analysis for issue {issue_id}: {e}", exc_info=False)
self.precomputed_analysis[issue_id] = {"error": err_msg, "hypothesis": None, "timestamp": time.time()}
def start_idle_processing(self):
"""Starts the background idle processing task if conditions are met."""
# Ensure task is created in the correct loop
if not self.main_loop.is_running():
logger.error("Cannot start idle processing: Main event loop is not running.")
return
if not self.idle_task or self.idle_task.done():
if self.hf_token and self.repo: # HF token and repo are minimum requirements
self.idle_task = self.main_loop.create_task(self.run_idle_processing())
logger.info(f"Started background idle processing task (interval: {self.idle_processing_interval}s).")
else:
missing = []
if not self.hf_token: missing.append("HF Token")
if not self.repo: missing.append("Repository")
logger.warning(f"Cannot start idle processing: Missing {', '.join(missing)}.")
else:
logger.debug("Idle processing task already running.")
def stop_idle_processing(self):
"""Stops the background idle processing task."""
if self.idle_task and not self.idle_task.done():
logger.info("Stopping background idle processing task...")
self.idle_task.cancel()
# Await the task to finish cancellation in an async context if needed
self.idle_task = None # Clear the reference
async def run_idle_processing(self):
"""Main loop for background analysis, including new idle thinking tasks."""
logger.info("Idle processing loop starting.")
try:
while True:
await asyncio.sleep(self.idle_processing_interval)
# Re-check conditions inside the loop
if not self.issues or not self.repo or not self.hf_token:
logger.debug("Idle processing skipped: No issues, repo, or HF token.")
continue # Continue the loop, just skip the processing cycle
logger.info(f"--- Starting idle processing cycle ---")
cycle_tasks = []
start_time_cycle = time.time()
# 1. Re-clustering (if needed)
if self.needs_recluster:
logger.info("Idle Task: Scheduling re-clustering and duplicate identification...")
# Capture current issues list *before* the async task potentially runs
# This snapshot ensures consistency for the clustering process
self.issue_list_for_clustering = list(self.issues.values())
cycle_tasks.append(self._run_clustering_and_duplicates_async())
else:
logger.debug("No re-clustering needed in this cycle.")
# 2. Identify Stale & High Priority Issues (Sync)
# These are quick operations, run synchronously
try:
self._identify_stale_issues()
self._identify_high_priority_candidates()
except Exception as e:
logger.error(f"Error during synchronous stale/priority identification: {e}")
# --- Identify Issues Needing Work ---
# Get current list of issue IDs
current_issue_ids = list(self.issues.keys())
# Shuffle to process issues more evenly over time, not just oldest/newest
random.shuffle(current_issue_ids)
# Filter issues that need specific computations
# Check if the issue still exists in self.issues before adding to needing list
issues_needing_context = [i for i in current_issue_ids if i in self.issues and i not in self.precomputed_context]
issues_needing_summary = [i for i in current_issue_ids if i in self.issues and i not in self.precomputed_summaries]
issues_needing_missing_info = [i for i in current_issue_ids if i in self.issues and i not in self.precomputed_missing_info]
issues_needing_analysis = [i for i in current_issue_ids if i in self.issues and i not in self.precomputed_analysis]
# Prioritize high-priority and stale issues for analysis tasks
# Ensure candidates are still in the 'needing' list before adding to ordered lists
priority_summary_candidates = [i for i in self.high_priority_candidates if i in issues_needing_summary]
stale_summary_candidates = [i for i in self.stale_issues if i in issues_needing_summary and i not in priority_summary_candidates]
other_summary_candidates = [i for i in issues_needing_summary if i not in priority_summary_candidates and i not in stale_summary_candidates]
ordered_summary_candidates = priority_summary_candidates + stale_summary_candidates + other_summary_candidates
priority_analysis_candidates = [i for i in self.high_priority_candidates if i in issues_needing_analysis]
stale_analysis_candidates = [i for i in self.stale_issues if i in issues_needing_analysis and i not in priority_analysis_candidates]
other_analysis_candidates = [i for i in issues_needing_analysis if i not in priority_analysis_candidates and i not in stale_analysis_candidates]
ordered_analysis_candidates = priority_analysis_candidates + stale_analysis_candidates + other_analysis_candidates
priority_info_candidates = [i for i in self.high_priority_candidates if i in issues_needing_missing_info]
stale_info_candidates = [i for i in self.stale_issues if i in issues_needing_missing_info and i not in priority_info_candidates]
other_info_candidates = [i for i in issues_needing_missing_info if i not in priority_info_candidates and i not in stale_info_candidates]
ordered_info_candidates = priority_info_candidates + stale_info_candidates + other_info_candidates
# Context is needed for patch generation, prioritize high-priority/stale issues
priority_context_candidates = [i for i in self.high_priority_candidates if i in issues_needing_context]
stale_context_candidates = [i for i in self.stale_issues if i in issues_needing_context and i not in priority_context_candidates]
other_context_candidates = [i for i in issues_needing_context if i not in priority_context_candidates and i not in stale_context_candidates]
ordered_context_candidates = priority_context_candidates + stale_context_candidates + other_context_candidates
logger.debug(f"Idle candidates: Ctx:{len(issues_needing_context)} ({len(ordered_context_candidates)} ordered), Sum:{len(issues_needing_summary)} ({len(ordered_summary_candidates)} ordered), "
f"Info:{len(issues_needing_missing_info)} ({len(ordered_info_candidates)} ordered), Anl:{len(issues_needing_analysis)} ({len(ordered_analysis_candidates)} ordered)")
# 3. Schedule Context Pre-computation (I/O bound)
context_computed_count = 0
for issue_id in ordered_context_candidates:
if context_computed_count < self.max_context_computations_per_cycle:
# Check if issue still exists before scheduling
if issue_id in self.issues:
cycle_tasks.append(self._compute_and_store_context(issue_id))
context_computed_count += 1
else: break
if context_computed_count > 0: logger.info(f"Scheduled {context_computed_count} context computations.")
# 4. Schedule Summary Generation (LLM - Medium Cost)
summary_computed_count = 0
for issue_id in ordered_summary_candidates:
if summary_computed_count < self.max_summary_computations_per_cycle:
if issue_id in self.issues:
cycle_tasks.append(self._compute_and_store_summary(issue_id))
summary_computed_count += 1
else: break
if summary_computed_count > 0: logger.info(f"Scheduled {summary_computed_count} summary computations.")
# 5. Schedule Missing Info Analysis (LLM - Low Cost)
missing_info_count = 0
for issue_id in ordered_info_candidates:
if missing_info_count < self.max_missing_info_computations_per_cycle:
if issue_id in self.issues:
cycle_tasks.append(self._compute_and_store_missing_info(issue_id))
missing_info_count += 1
else: break
if missing_info_count > 0: logger.info(f"Scheduled {missing_info_count} missing info analyses.")
# 6. Schedule Preliminary Analysis (LLM - Low Cost)
analysis_count = 0
for issue_id in ordered_analysis_candidates:
if analysis_count < self.max_analysis_computations_per_cycle:
if issue_id in self.issues:
cycle_tasks.append(self._compute_and_store_preliminary_analysis(issue_id))
analysis_count += 1
else: break
if analysis_count > 0: logger.info(f"Scheduled {analysis_count} preliminary analyses.")
# --- Execute Scheduled Async Tasks ---
if cycle_tasks:
logger.info(f"Executing {len(cycle_tasks)} async idle tasks for this cycle...")
# Run tasks concurrently
# Use return_exceptions=True to ensure all tasks are attempted even if one fails
results = await asyncio.gather(*cycle_tasks, return_exceptions=True)
num_errors = 0
for i, result in enumerate(results):
if isinstance(result, Exception):
num_errors += 1
# Log the error, but gather ensures other tasks complete
logger.error(f"Error encountered in background idle task {i+1}/{len(cycle_tasks)}: {result}", exc_info=False) # Keep log cleaner
cycle_duration = time.time() - start_time_cycle
logger.info(f"Idle processing cycle finished in {cycle_duration:.2f} seconds. {len(results)} tasks processed ({num_errors} errors).")
else:
logger.info("No async idle tasks to perform in this cycle.")
logger.info(f"--- Finished idle processing cycle ---")
except asyncio.CancelledError:
logger.info("Idle processing loop cancelled.")
except Exception as e:
logger.exception(f"Critical error in idle processing loop: {e}")
# Wait longer before potentially restarting after a critical error
await asyncio.sleep(self.idle_processing_interval * 5)
finally:
logger.info("Idle processing loop finished.")
async def _compute_and_store_context(self, issue_id: int):
"""Helper async task to compute and store context for one issue during idle time."""
# Re-check if issue exists just before starting computation
if issue_id not in self.issues:
logger.warning(f"Skipping context computation for issue {issue_id}: Issue no longer exists.")
return
try:
logger.debug(f"Starting background context computation for issue {issue_id}...")
start_time = time.time()
issue_data = self.issues[issue_id] # Get latest issue data
context_result = await self._get_code_context(issue_data)
duration = time.time() - start_time
computed_data = {
"content": context_result.get("content"),
"files": context_result.get("files", []),
"error": context_result.get("error"),
"timestamp": time.time()
}
# Store the result even if there was an error during computation
self.precomputed_context[issue_id] = computed_data
log_msg = f"Stored context result for issue {issue_id} (found {len(computed_data['files'])} files, took {duration:.2f}s)."
if computed_data['error']:
log_msg += f" Error: {computed_data['error']}"
logger.info(log_msg)
except Exception as e:
logger.exception(f"Failed to compute context for issue {issue_id} in background task: {e}")
# Store an error state in precomputed_context
self.precomputed_context[issue_id] = {"error": f"Unexpected computation error: {e}", "timestamp": time.time(), "content": None, "files": []}
async def _run_clustering_and_duplicates_async(self):
"""Runs clustering and then identifies duplicates as a single background task unit."""
try:
logger.info("Background Task: Starting re-clustering issues...")
start_time = time.time()
# _cluster_similar_issues uses self.issue_list_for_clustering (snapshot taken before scheduling)
await self._cluster_similar_issues() # This resets flags on success internally
cluster_duration = time.time() - start_time
if self.issue_clusters:
logger.info(f"Background Task: Clustering finished in {cluster_duration:.2f}s. Identifying duplicates...")
start_time_dup = time.time()
# _identify_potential_duplicates uses the results of _cluster_similar_issues
self._identify_potential_duplicates()
dup_duration = time.time() - start_time_dup
logger.info(f"Background Task: Duplicate identification finished in {dup_duration:.2f}s.")
else:
logger.warning("Background Task: Clustering did not produce results. Skipping duplicate identification.")
# Flags are already reset by _cluster_similar_issues on failure/empty
except Exception as e:
logger.error(f"Error during background clustering/duplicate identification task: {e}", exc_info=True)
# ========== Gradio UI Definition ==========
def create_ui(manager: IssueManager) -> gr.Blocks:
"""Creates the Gradio interface."""
# --- Helper Functions for UI ---
def generate_issue_preview(issue_num: Optional[int]) -> str:
"""Generates HTML preview including new idle thinking results."""
if issue_num is None or issue_num not in manager.issues:
return "<p style='color: #6b7280;'>Select an issue from the 'Issue Board' tab.</p>"
try:
issue = manager.issues[issue_num]
# Use markdown2 for rendering issue body
html_body = markdown2.markdown(
issue.get('body', '*No description provided.*') or '*No description provided.*',
extras=["fenced-code-blocks", "tables", "strike", "task_list", "code-friendly", "html-classes", "nofollow", "spoiler"]
)
# Generate HTML for labels (escaped with the stdlib html module; gr.Textbox exposes no sanitize_html API)
import html
labels_html = ' '.join(f'<span style=\'background-color: #f3f4f6; color: #374151; border: 1px solid #d1d5db; padding: 2px 6px; border-radius: 4px; font-size: 0.85em; display: inline-block; margin-right: 4px; margin-bottom: 4px;\'>{html.escape(l)}</span>' for l in issue.get('labels', [])) or '<span style="color: #6b7280;">None</span>'
# Generate HTML for status indicators
status_indicators = []
if issue_num in manager.stale_issues:
status_indicators.append(f"<span title='No updates in >{manager.stale_issue_threshold_days} days' style='color: #b91c1c; font-weight: bold; font-size: 0.9em; background-color: #fee2e2; padding: 1px 4px; border-radius: 3px;'>[Stale]</span>")
if issue_num in manager.high_priority_candidates:
severity = manager._determine_severity(issue.get('labels', []))
if severity == "Critical": color, bgcolor = "#ef4444", "#fee2e2"
elif severity == "High": color, bgcolor = "#f97316", "#ffedd5"
else: color, bgcolor = "#c2410c", "#fffbeb" # Should ideally not happen if logic works
status_indicators.append(f"<span title='Marked as {severity}' style='color: {color}; font-weight: bold; font-size: 0.9em; background-color: {bgcolor}; padding: 1px 4px; border-radius: 3px;'>[{severity}]</span>")
status_html = " ".join(status_indicators)
# --- Get Precomputed Data with Helper ---
summary_text = manager._get_precomputed_text(issue_num, manager.precomputed_summaries, "summary", "Summary")
missing_info_text = manager._get_precomputed_text(issue_num, manager.precomputed_missing_info, "info_needed", "Missing Info")
analysis_text = manager._get_precomputed_text(issue_num, manager.precomputed_analysis, "hypothesis", "Analysis")
duplicate_text = manager._get_duplicate_info_text(issue_num) # This already includes formatting and links
# --- Format AI Sections ---
ai_sections = []
# Only show sections if they have content or an error, not just pending state
# Use the presence of icons (❌, ⏳, ℹ️) to determine if it's just a status message
if not summary_text.startswith("⏳") and not summary_text.startswith("(ℹ️"):
color = "#f0e6ff" if not summary_text.startswith("❌") else "#fee2e2"
border_color = "#ddd6fe" if not summary_text.startswith("❌") else "#fecaca"
ai_sections.append(f"""
<div style="font-size: 0.9em; margin-top: 10px; border-top: 1px dashed #eee; padding-top: 8px; background-color: {color}; padding: 6px 10px; border-radius: 4px; border: 1px solid {border_color};">
<strong>🤖 AI Summary:</strong> {summary_text}
</div>""")
elif summary_text.startswith("❌"):
ai_sections.append(f"""
<div style="font-size: 0.9em; margin-top: 10px; border-top: 1px dashed #eee; padding-top: 8px; background-color: #fee2e2; padding: 6px 10px; border-radius: 4px; border: 1px solid #fecaca;">
<strong>🤖 {summary_text}</strong>
</div>""")
if not missing_info_text.startswith("⏳") and not missing_info_text.startswith("(ℹ️"):
color = "#fffbeb" if "None needed." not in missing_info_text and not missing_info_text.startswith("❌") else "#f0fdf4" if "None needed." in missing_info_text else "#fee2e2"
border_color = "#fef3c7" if "None needed." not in missing_info_text and not missing_info_text.startswith("❌") else "#bbf7d0" if "None needed." in missing_info_text else "#fecaca"
ai_sections.append(f"""
<div style="font-size: 0.9em; margin-top: 8px; background-color: {color}; padding: 6px 10px; border-radius: 4px; border: 1px solid {border_color};">
<strong>🤔 AI Missing Info Analysis:</strong> {missing_info_text}
</div>""")
elif missing_info_text.startswith("❌"):
ai_sections.append(f"""
<div style="font-size: 0.9em; margin-top: 8px; background-color: #fee2e2; padding: 6px 10px; border-radius: 4px; border: 1px solid #fecaca;">
<strong>🤔 {missing_info_text}</strong>
</div>""")
if not analysis_text.startswith("⏳") and not analysis_text.startswith("(ℹ️"):
color = "#e0f2fe" if not analysis_text.startswith("❌") else "#fee2e2"
border_color = "#bae6fd" if not analysis_text.startswith("❌") else "#fecaca"
ai_sections.append(f"""
<div style="font-size: 0.9em; margin-top: 8px; background-color: {color}; padding: 6px 10px; border-radius: 4px; border: 1px solid {border_color};">
<strong>🔬 AI Preliminary Analysis:</strong> {analysis_text}
</div>""")
elif analysis_text.startswith("❌"):
ai_sections.append(f"""
<div style="font-size: 0.9em; margin-top: 8px; background-color: #fee2e2; padding: 6px 10px; border-radius: 4px; border: 1px solid #fecaca;">
<strong>🔬 {analysis_text}</strong>
</div>""")
if duplicate_text: # duplicate_text already contains HTML links if duplicates exist
ai_sections.append(f"""
<div style="font-size: 0.9em; margin-top: 8px; background-color: #fffbeb; padding: 6px 10px; border-radius: 4px; border: 1px solid #fef3c7;">
{duplicate_text}
</div>""")
ai_sections_html = "\n".join(ai_sections)
# Construct the full preview HTML
preview_html = f"""
<div style="border: 1px solid #e5e7eb; padding: 15px; border-radius: 8px; background-color: #ffffff; font-family: 'Inter', sans-serif; display: flex; flex-direction: column; max-height: 80vh;">
<h4 style="margin-top: 0; margin-bottom: 10px; font-size: 1.1em; display: flex; justify-content: space-between; align-items: center;">
<a href='{issue.get('url', '#')}' target='_blank' style='color: #6d28d9; text-decoration: none; font-weight: 600;' title="Open issue #{issue.get('id', 'N/A')} on GitHub">
#{issue.get('id', 'N/A')} - {html.escape(str(issue.get('title', 'N/A')))}
</a>
<span style="margin-left: 10px; flex-shrink: 0;">{status_html}</span>
</h4>
<hr style='margin: 10px 0; border-top: 1px solid #e5e7eb;'>
<div style="font-size: 0.9em; color: #4b5563; margin-bottom: 10px; display: flex; justify-content: space-between; flex-wrap: wrap; gap: 15px;">
<span><strong>State:</strong> <span style="text-transform: capitalize; background-color: {'#dcfce7' if issue.get('state') == 'open' else '#fee2e2'}; color: {'#166534' if issue.get('state') == 'open' else '#991b1b'}; padding: 1px 6px; border-radius: 3px; font-size: 0.9em; border: 1px solid {'#86efac' if issue.get('state') == 'open' else '#fecaca'};">{issue.get('state', 'N/A')}</span></span>
<span><strong>Assignee:</strong> {issue.get('assignee') or '<span style="color: #6b7280;">None</span>'}</span>
<span><strong>Severity:</strong> {manager._determine_severity(issue.get('labels', []))}</span>
</div>
<div style="font-size: 0.9em; color: #4b5563; margin-bottom: 12px;">
<strong>Labels:</strong> {labels_html}
</div>
<!-- AI Analysis Sections -->
{ai_sections_html}
<!-- Issue Body -->
<div style="margin-top: 15px; border-top: 1px solid #eee; padding-top: 10px; font-size: 0.95em; line-height: 1.6; overflow-y: auto; flex-grow: 1; max-height: 35vh; word-wrap: break-word;">
<h5 style="margin-bottom: 5px; margin-top: 0; color: #374151;">Description:</h5>
{html_body}
</div>
</div>
"""
return preview_html
except Exception as e:
logger.exception(f"Error generating issue preview for {issue_num}: {e}")
return f"<p style='color: red;'>Error generating preview for issue {issue_num}. Check logs.</p>"
async def get_ai_suggestion_wrapper(issue_num: Optional[str], model_key: str, progress=gr.Progress()) -> str:
"""UI wrapper for getting AI suggestions; handles input validation, state and progress."""
progress(0, desc="Preparing request...")
# The hidden textbox supplies the issue ID as a string; normalize it to an int first.
try:
issue_num = int(issue_num) if issue_num else None
except (TypeError, ValueError):
issue_num = None
if issue_num is None or issue_num not in manager.issues:
return "⚠️ Error: Please select a valid issue first."
if not manager.hf_token:
return "🔒 Error: Hugging Face Token is not configured."
if model_key not in HF_MODELS or HF_MODELS.get(model_key) is None:
return f"⚠️ Error: Invalid or unconfigured model key selected: {model_key}"
issue = manager.issues[issue_num]
issue_hash = manager._get_issue_hash(issue)
logger.info(f"Requesting suggestion for issue {issue_num} (hash: {issue_hash}) using model {model_key}.")
try:
progress(0.3, desc=f"Querying {model_key}...")
# The cached_suggestion method handles the cache lookup and actual generation
suggestion = await manager.cached_suggestion(issue_hash, model_key)
progress(1, desc="Suggestion received.")
if suggestion.lower().startswith("error:"):
return f"⚠️ {suggestion}"
else:
# Format the output clearly
return f"**💡 Suggestion based on {model_key}:**\n\n---\n{suggestion}"
except Exception as e:
logger.exception(f"Error in get_ai_suggestion_wrapper for issue {issue_num}: {e}")
return f"❌ An unexpected error occurred while getting the suggestion: {e}"
async def get_ai_patch_wrapper(issue_num: Optional[str], model_key: str, progress=gr.Progress()) -> str:
"""UI wrapper for getting AI patches; handles input validation, state and progress."""
progress(0, desc="Preparing request...")
# The hidden textbox supplies the issue ID as a string; normalize it to an int first.
try:
issue_num = int(issue_num) if issue_num else None
except (TypeError, ValueError):
issue_num = None
if issue_num is None or issue_num not in manager.issues:
return "⚠️ Error: Please select a valid issue first."
if not manager.hf_token:
return "🔒 Error: Hugging Face Token is not configured."
if not manager.repo:
return "❌ Error: Repository not loaded. Please scan the repository first."
if model_key not in HF_MODELS or HF_MODELS.get(model_key) is None:
return f"⚠️ Error: Invalid or unconfigured model key selected: {model_key}"
logger.info(f"Requesting patch for issue {issue_num} using model {model_key}.")
progress(0.1, desc="Gathering code context (using cache if available)...")
try:
# Context is gathered inside generate_code_patch, which uses the precomputed_context cache
progress(0.4, desc=f"Querying {model_key} for patch...")
result = await manager.generate_code_patch(issue_num, model_key)
progress(1, desc="Patch result received.")
if "error" in result:
logger.error(f"Patch generation failed for issue {issue_num}: {result['error']}")
return f"**❌ Error generating patch:**\n\n{result['error']}"
else:
model_used = result.get('model_used', model_key)
explanation = result.get('explanation', '(No explanation provided)')
patch_content = result.get('patch')
status_msg = result.get('status') # Get specific status if available
header = f"**🩹 Patch Suggestion from {model_used}:**"
if status_msg:
header = f"**🩹 Patch Generation Result from {model_used}:** ({status_msg})"
if patch_content:
# Escape backticks in patch content for markdown code block
patch_content_sanitized = patch_content.replace('`', '\\`')
logger.info(f"Successfully generated patch for issue {issue_num} using {model_used}.")
return f"""{header}
**Explanation:**
{explanation}
---
**Patch:**
```diff
{patch_content_sanitized}
```"""
else:
logger.warning(f"AI provided explanation but no patch for issue {issue_num}. Explanation: {explanation}")
return f"""{header}
**Explanation:**
{explanation}
---
**(No valid diff block generated)**"""
except Exception as e:
logger.exception(f"Error in get_ai_patch_wrapper for issue {issue_num}: {e}")
return f"❌ An unexpected error occurred while generating the patch: {e}"
async def handle_issue_select(evt: gr.SelectData):
"""
Handles issue selection in the Dataframe: updates preview, loads code context into editor.
Reads the selected issue ID from the event data.
Returns updates for multiple components.
"""
# Default state for deselection or error
default_updates = {
# Update the hidden state element first
"selected_issue_id_hidden": gr.update(value=""),
"issue_preview_html": gr.update(value="<p style='color: #6b7280;'>Select an issue from the 'Issue Board' tab.</p>"),
# Clear the code editor
"code_edit_component": gr.update(value={"placeholder.txt": "# Select an issue to load relevant code context."}, interactive=True, language="text"),
"ai_output_display": gr.update(value="*AI suggestions and patches will appear here after selecting an issue and action.*"),
"copy_patch_btn": gr.update(visible=False), # Hide copy button on selection change
}
# Check if a row was actually selected and has a value
if evt.index is None or not hasattr(evt, 'value') or not evt.value or evt.value[0] is None:
logger.info("Issue deselected or invalid selection event.")
# Return default updates to clear the UI
return default_updates
try:
# The first column (index 0) is the Issue ID
selected_id = int(evt.value[0])
logger.info(f"Issue selected via Dataframe: ID {selected_id}")
if selected_id not in manager.issues:
logger.error(f"Selected issue ID {selected_id} not found in manager's issue list.")
return {
**default_updates,
"issue_preview_html": gr.update(value=f"<p style='color: red; font-weight: bold;'>Error: Issue {selected_id} not found in the current list. Try re-scanning.</p>"),
"selected_issue_id_hidden": gr.update(value=""), # Ensure hidden state is cleared
}
issue_data = manager.issues[selected_id]
files_content: Dict[str, str] = {}
context_source_msg = "Loading context..."
context_load_start = time.time()
# --- Load or Compute Code Context ---
# Prioritize pre-computed context if available
context_data = manager.precomputed_context.get(selected_id) # Use .get for safety
if context_data:
timestamp = context_data.get('timestamp', 0)
timestamp_str = datetime.fromtimestamp(timestamp).strftime('%Y-%m-%d %H:%M:%S')
if context_data.get("error"):
context_source_msg = f"Pre-computed (Failed @ {timestamp_str})"
files_content["error_context.txt"] = f"# Error loading pre-computed context:\n# {context_data['error']}"
elif context_data.get("files"):
context_source_msg = f"Pre-computed ({len(context_data['files'])} files @ {timestamp_str})"
logger.info(f"Loading {len(context_data['files'])} files from pre-computed context for issue {selected_id}: {context_data['files']}")
loaded_count = 0
for file_path_str in context_data["files"]:
full_path = manager.repo_local_path / file_path_str
try:
if full_path.is_file():
# Use resolved path for robust reading
resolved_full_path = full_path.resolve()
# Double-check it's still in the repo path after resolve
if resolved_full_path.is_relative_to(manager.repo_local_path.resolve()):
files_content[file_path_str] = resolved_full_path.read_text(encoding='utf-8', errors='ignore')
loaded_count += 1
else:
logger.warning(f"Pre-computed file path {file_path_str} resolved outside repo during load for issue {selected_id}.")
files_content[file_path_str] = f"# File path resolved outside repository: {file_path_str}"
else:
logger.warning(f"Pre-computed file path {file_path_str} not found or not a file during load for issue {selected_id}.")
files_content[file_path_str] = f"# File not found or not a file: {file_path_str}"
except Exception as e:
logger.warning(f"Error reading pre-computed file {full_path} for issue {selected_id}: {e}")
files_content[file_path_str] = f"# Error reading file: {e}"
# After the loop: flag the case where file references existed but none could be read.
if loaded_count == 0 and context_data.get("files"): # Use .get for safety
files_content["error_reading_files.txt"] = "# Precomputed context found file references, but failed to read any file content."
else:
# Pre-computed context exists but lists no files; fall back to its stored text content.
context_source_msg = f"Pre-computed (No files found @ {timestamp_str})"
files_content[f"issue_{selected_id}_context.md"] = context_data.get("content", "# No specific code context found (pre-computed).")
else: # Handle on-demand context computation
logger.info(f"Context not pre-computed for issue {selected_id}, computing on demand for editor.")
context_source_msg = "Computed On-Demand"
context_result = await manager._get_code_context(issue_data)
# Store the newly computed context for future use (e.g., patch generation)
manager.precomputed_context[selected_id] = {
"content": context_result.get("content"),
"files": context_result.get("files", []),
"error": context_result.get("error"),
"timestamp": time.time()
}
if "error" in context_result and context_result["error"]:
context_source_msg += f" (Error: {context_result['error']})"
files_content["error_context.txt"] = f"# Error loading context on demand:\n# {context_result['error']}"
elif context_result.get("files"):
context_source_msg += f" ({len(context_result['files'])} files)"
logger.info(f"Loading {len(context_result['files'])} files computed on-demand for issue {selected_id}: {context_result['files']}")
loaded_count = 0
for file_path_str in context_result["files"]:
full_path = manager.repo_local_path / file_path_str
try:
if full_path.is_file():
resolved_full_path = full_path.resolve()
if resolved_full_path.is_relative_to(manager.repo_local_path.resolve()):
files_content[file_path_str] = resolved_full_path.read_text(encoding='utf-8', errors='ignore')
loaded_count +=1
else:
logger.warning(f"On-demand file path {file_path_str} resolved outside repo during load for issue {selected_id}.")
files_content[file_path_str] = f"# File path resolved outside repository: {file_path_str}"
else:
logger.warning(f"On-demand file path {file_path_str} not found or not a file during load for issue {selected_id}.")
files_content[file_path_str] = f"# File not found or not a file: {file_path_str}"
except Exception as e:
logger.warning(f"Error reading on-demand file {full_path} for issue {selected_id}: {e}")
files_content[file_path_str] = f"# Error reading file: {e}"
# After the loop: flag the case where file references existed but none could be read.
if loaded_count == 0 and context_result.get("files"): # Use .get for safety
files_content["error_reading_files.txt"] = "# Context computation found file references, but failed to read any file content."
else:
# On-demand context produced no file references; fall back to its text content.
context_source_msg += " (No files found)"
files_content[f"issue_{selected_id}_context.md"] = context_result.get("content", "# No specific code context found.")
# Timing below covers both the pre-computed and on-demand paths.
context_load_duration = time.time() - context_load_start
logger.info(f"Context loading for editor took {context_load_duration:.2f}s. Source: {context_source_msg}")
if not files_content:
files_content["placeholder.txt"] = f"# No relevant files found or context failed to load for issue {selected_id}."
# Initialize or update the OT editor instance for this issue
manager.code_editors[selected_id] = OTCodeEditor(initial_value=files_content)
logger.info(f"Initialized/Updated OT editor state for issue {selected_id} with files: {list(files_content.keys())}")
# Determine the initial language hint for the editor based on the first file extension
initial_language = "text" # Default to plain text
if files_content:
first_file_name = list(files_content.keys())[0]
# Extract extension (part after the last dot)
_, ext = os.path.splitext(first_file_name)
if ext:
# Remove leading dot and convert to lowercase
ext = ext[1:].lower()
# Map common extensions to ACE editor modes (approximate)
# Check available modes in ACE editor documentation if needed
lang_map = {
'py': 'python', 'js': 'javascript', 'jsx': 'javascript', 'ts': 'typescript', 'tsx': 'typescript',
'java': 'java', 'c': 'c_cpp', 'cpp': 'c_cpp', 'h': 'c_cpp', 'hpp': 'c_cpp', 'cs': 'csharp',
'go': 'golang', 'rs': 'rust', 'php': 'php', 'rb': 'ruby', 'html': 'html', 'css': 'css',
'scss': 'scss', 'less': 'less', 'md': 'markdown', 'yaml': 'yaml', 'yml': 'yaml', 'json': 'json',
'toml': 'toml', 'sh': 'sh', 'bash': 'sh', 'zsh': 'sh', 'sql': 'sql', 'xml': 'xml',
'dockerfile': 'dockerfile', 'makefile': 'makefile', 'txt': 'text', 'log': 'text',
'conf': 'text', 'cfg': 'text', 'ini': 'ini'
}
initial_language = lang_map.get(ext, 'text')
# Prepare updates for all relevant UI components
updates = {
# Update the hidden state element with the selected ID (as string)
"selected_issue_id_hidden": gr.update(value=str(selected_id)),
"issue_preview_html": gr.update(value=generate_issue_preview(selected_id)),
"code_edit_component": gr.update(value=files_content, interactive=True, language=initial_language),
"ai_output_display": gr.update(value=f"*Context loaded ({context_source_msg}). Ready for AI actions or editing.*"),
"copy_patch_btn": gr.update(visible=False), # Hide copy button on new selection
}
return updates
except (ValueError, TypeError, IndexError, KeyError, AttributeError) as e:
logger.exception(f"Error processing issue selection event (evt.value: {getattr(evt, 'value', 'N/A')}): {e}")
# Return default state plus an error message
return {
**default_updates,
"issue_preview_html": gr.update(value="<p style='color: red; font-weight: bold;'>Error loading issue details. Please check logs and try again.</p>"),
"code_edit_component": gr.update(value={"error.txt": f"# Error loading code context for selection.\n# Error: {e}"}, interactive=False, language="text"),
"ai_output_display": gr.update(value="*Error processing selection. See logs.*"),
"selected_issue_id_hidden": gr.update(value=""), # Ensure hidden state is cleared
"copy_patch_btn": gr.update(visible=False), # Hide copy button on error
}
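# Hedged refactoring sketch (illustrative only, not called above): the two context-loading
# branches repeat a "resolve the path, confirm it is still inside the repo, then read" guard.
# Factored out it would look roughly like this; the argument names are placeholders.
def _safe_read_repo_file(repo_root: Path, rel_path: str) -> str:
    """Read a file under repo_root, refusing paths that resolve outside it (sketch)."""
    candidate = (repo_root / rel_path).resolve()
    if not candidate.is_relative_to(repo_root.resolve()):
        return f"# File path resolved outside repository: {rel_path}"
    if not candidate.is_file():
        return f"# File not found or not a file: {rel_path}"
    return candidate.read_text(encoding="utf-8", errors="ignore")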
# --- Analytics Helper ---
def update_cluster_analytics(mgr: IssueManager):
"""Generates data for the cluster analytics dataframe."""
if not mgr.issue_clusters or not mgr.issue_list_for_clustering:
return [["N/A", 0, "No clusters found"]]
cluster_data = []
# Expanded stop words list (lowercase)
stop_words = set("a an the is are was were be been being has have had do does did will would shall should can could may might must it its this that these those there their then than and or but if because as at by for with without about against between into through during before after above below to from up down in out on off over under again further then once here there when where why how all any both each few more most other some such no nor not only own same so than too very s t can will just don should now d ll m o re ve yo ain aren couldn didn doesn isn it's mustn ouren shan shouldn wasn weren won wouldn".split())
# Add common code/issue terms (lowercase)
stop_words.update({'issue', 'bug', 'error', 'fix', 'feat', 'chore', 'refactor', 'docs', 'test', 'file', 'line', 'code', 'when', 'user', 'report', 'problem', 'need', 'want', 'get', 'use', 'try', 'make', 'add', 'remove', 'change', 'update', 'create', 'build', 'run', 'start', 'stop', 'click', 'button', 'page', 'app', 'data', 'system', 'service', 'component', 'module', 'function', 'class', 'method', 'variable', 'value', 'string', 'number', 'list', 'dict', 'object', 'array', 'true', 'false', 'null', 'none', 'type', 'size', 'color', 'style', 'width', 'height', 'margin', 'padding', 'border', 'display', 'position', 'float', 'clear', 'overflow', 'index', 'key', 'value', 'id', 'name', 'text', 'html', 'css', 'js', 'py', 'java', 'c', 'cpp', 'h', 'hpp', 'go', 'rs', 'php', 'rb', 'json', 'yaml', 'xml', 'sql', 'log', 'config', 'setup', 'install', 'version', 'release', 'branch', 'commit', 'pull', 'request', 'merge', 'diff', 'patch', 'context', 'suggestion', 'analysis', 'missing', 'information', 'duplicate', 'potential', 'cluster', 'severity', 'label', 'assignee', 'state', 'title', 'body', 'url', 'created', 'updated', 'time', 'day', 'week', 'month', 'year', 'ago', 'now', 'new', 'old', 'different', 'same', 'similar', 'group', 'count', 'distribution', 'analytics', 'overview', 'studio', 'board', 'resolution', 'collaborative', 'editor', 'context', 'aware', 'assistance', 'tools', 'output', 'patch', 'steps', 'approach', 'plan', 'identify', 'propose', 'recommendation', 'workflow', 'standard', 'important', 'critical', 'high', 'medium', 'low', 'unknown', 'none', 'needed', 'analysis', 'hypothesis', 'preliminary', 'relevant', 'area', 'investigation', 'implementation', 'testing', 'next'})
def get_top_keywords(indices):
text = ""
for idx in indices:
# Ensure index is valid for the current issue list snapshot
if 0 <= idx < len(mgr.issue_list_for_clustering):
issue = mgr.issue_list_for_clustering[idx]
text += (issue.get('title') or '') + " " + (issue.get('body') or '')[:500] + " " # Body may be None; use up to the first 500 chars
text = text.lower()
# Remove punctuation
text = text.translate(str.maketrans('', '', string.punctuation))
# Split into words and filter
words = [w for w in text.split() if len(w) > 2 and w not in stop_words] # Min word length 3
if not words: return "N/A"
counts = Counter(words)
# Get top 5 most common words
top_5 = [word for word, count in counts.most_common(5)]
return ", ".join(top_5)
# Sort clusters by size (descending)
sorted_clusters = sorted(mgr.issue_clusters.items(), key=lambda item: len(item[1]), reverse=True)
# Limit to top N clusters for display
top_n_clusters = 10 # Display top 10 clusters
cluster_data = []
for cluster_id, indices in sorted_clusters[:top_n_clusters]:
keywords = get_top_keywords(indices)
cluster_data.append([cluster_id, len(indices), keywords])
if not cluster_data:
return [["N/A", 0, "No clusters found"]]
return cluster_data
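# Hedged usage note for update_cluster_analytics (illustrative shape, not real data):
# the returned rows feed the "Issue Clusters" Dataframe directly, e.g.
#   update_cluster_analytics(manager)
#   # -> [[3, 12, "timeout, login, session, redirect, oauth"],
#   #     [0,  7, "upload, crash, file, size, limit"]]
# Keywords are plain frequency counts over title + first 500 body characters after
# stop-word filtering, so they are a rough label for each cluster, not a model output.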
# --- Gradio Blocks ---
with gr.Blocks(theme=theme, title="AI Issue Resolver Pro", css="""
#collab-list .collab-item { margin-bottom: 4px; font-size: 0.9em; }
.gradio-container { max-width: 1600px !important; }
#issue_preview_div { max-height: 80vh; overflow-y: auto; } /* Ensure preview is scrollable */
#ai_output_md { max-height: 60vh; overflow-y: auto; } /* Ensure AI output is scrollable */
#code_editor_component { min-height: 500px; height: 70vh; } /* Give editor more space */
#ai_tools_accordion > .label { background-color: #f0f9ff; } /* Light blue background for tools header */
#ai_tools_accordion.closed > .label { background-color: #f0f9ff; } /* Keep background when closed */
#ai_tools_accordion > .label:hover { background-color: #e0f2fe; } /* Hover effect */
#ai_tools_accordion > .label > .icon { color: #0ea5e9; } /* Tool icon color */
.panel { border: 1px solid #e5e7eb; border-radius: 8px; padding: 15px; background-color: #f9fafb; } /* Styled panel */
#config-panel { margin-bottom: 20px; }
#status_output_txt textarea { font-family: monospace; font-size: 0.9em; background-color: #eef; } /* Style status bar */
.gradio-dataframe-container { overflow-x: auto; } /* Ensure dataframe is scrollable horizontally */
#issue_list_df table { width: 100%; } /* Make dataframe table use full width */
#issue_list_df th, #issue_list_df td { white-space: nowrap; overflow: hidden; text-overflow: ellipsis; } /* Prevent text wrapping in table headers/cells */
#issue_list_df td:nth-child(1) { width: 80px; min-width: 80px; } /* Fix ID column width */
#issue_list_df td:nth-child(2) { width: 60%; max-width: 400px; } /* Give Title column more space */
#issue_list_df td:nth-child(3) { width: 15%; min-width: 100px; } /* Severity */
#issue_list_df td:nth-child(4) { width: 15%; min-width: 80px; } /* Cluster */
.gradio-dataframe-container table td { cursor: pointer; } /* Indicate rows are clickable */
""") as demo_app:
gr.Markdown("""
<div style="text-align: center; margin-bottom: 20px;">
<h1 style="color: #6d28d9; font-weight: 800;">🚀 AI Issue Resolver Pro</h1>
<p style="color: #4b5563; font-size: 1.1em;">Collaborative Issue Resolution Powered by AI</p>
</div>
""")
# Use a hidden textbox to reliably pass the selected issue ID to JS
selected_issue_id_hidden = gr.Textbox(visible=False, value="", elem_id="selected_issue_id_hidden")
with gr.Row(variant="panel", elem_id="config-panel") as config_row:
with gr.Column(scale=3):
repo_url = gr.Textbox(label="GitHub Repository URL", placeholder="https://github.com/owner/repo", info="Enter the full URL.", elem_id="repo_url_inp")
with gr.Row():
github_token = gr.Textbox(label="GitHub Token (Optional)", type="password", placeholder="ghp_...", info="For private repos or higher rate limits.", elem_id="github_token_inp")
hf_token = gr.Textbox(label="Hugging Face Token", type="password", placeholder="hf_...", info="Required for AI features.", elem_id="hf_token_inp")
with gr.Column(scale=1, min_width=250):
model_select = gr.Dropdown(choices=list(HF_MODELS.keys()), value=DEFAULT_MODEL_KEY,
label="🤖 Select AI Model", info="Used for suggestions & patches.", elem_id="model_select_dd",
interactive=bool(HF_MODELS)) # Disable if no models configured
crawl_btn = gr.Button("🛰️ Scan Repository Issues", variant="primary", icon="🔍", elem_id="crawl_btn")
status_output = gr.Textbox(label="Status Log", interactive=False, lines=1, max_lines=1,
placeholder="Status updates appear here... Idle tasks may run in background.",
elem_id="status_output_txt")
with gr.Tabs(elem_id="main-tabs"):
with gr.Tab("📋 Issue Board", id="board", elem_id="tab-board"):
with gr.Row(equal_height=False):
with gr.Column(scale=3):
gr.Markdown("### Open Issues (Select Row to View/Edit)")
issue_list = gr.Dataframe(
headers=["ID", "Title", "Severity", "Cluster"],
datatype=["number", "str", "str", "str"],
interactive=True,
row_count=(10, "dynamic"), # Show fewer rows initially
col_count=(4, "fixed"),
wrap=True,
elem_id="issue_list_df",
label="Issues",
value=[] # Start with empty list
)
with gr.Column(scale=2, min_width=350):
gr.Markdown("### Issue Severity Distribution")
stats_plot = gr.Plot(label="Severity Plot", elem_id="stats_plot_viz", value=manager._generate_stats_plot({})) # Initialize with empty plot
collab_status = gr.HTML("""
<div style="margin-top: 20px; border: 1px solid #e5e7eb; padding: 10px; border-radius: 8px; background-color: #f9fafb;">
<h4 style="margin-bottom: 5px; color: #374151; font-size: 1em;">👥 Active Collaborators</h4>
<div id="collab-list" style="min-height: 30px; max-height: 100px; overflow-y: auto; padding-top: 5px;">
<span style="color: #888;">Connecting...</span>
</div>
</div>
""", elem_id="collab_status_html")
with gr.Tab("💻 Resolution Studio", id="studio", elem_id="tab-studio"):
with gr.Row(equal_height=False):
with gr.Column(scale=1, min_width=450):
gr.Markdown("### Selected Issue Details")
issue_preview_html = gr.HTML("<p style='color: #6b7280;'>Select an issue from the 'Issue Board' tab.</p>", elem_id="issue_preview_div")
with gr.Accordion("🛠️ AI Assistance Tools", open=True, elem_id="ai_tools_accordion"):
suggest_btn = gr.Button("🧠 Suggest Resolution Steps", icon="💡", elem_id="suggest_btn", interactive=bool(HF_MODELS))
patch_btn = gr.Button("📝 Generate Code Patch", icon="🩹", elem_id="patch_btn", interactive=bool(HF_MODELS))
gr.Markdown("### AI Output")
# Use a Markdown component with scrollability
ai_output_display = gr.Markdown(value="*AI suggestions and patches will appear here...*", elem_id="ai_output_md")
with gr.Row():
copy_patch_btn = gr.Button("📋 Copy Patch", elem_id="copy_patch_btn", visible=False) # Initially hidden
clear_ai_output_btn = gr.Button("🧹 Clear Output", elem_id="clear_ai_output_btn")
with gr.Column(scale=2, min_width=600):
gr.Markdown("### Collaborative Code Editor (Context-Aware)")
gr.Markdown("<p style='color: orange; font-weight: bold;'>⚠️ Warning: Real-time collaborative editing is experimental and may lose data with simultaneous edits. Use with caution and save work frequently!</p>")
# Initialize with placeholder content
code_edit_component = code_editor(
label="Code Context / Editor",
language="python", # Default language, can be changed based on file extension in JS
interactive=True,
elem_id="code_editor_component",
value={"placeholder.txt": "# Select an issue to load relevant code context."}
)
with gr.Tab("📈 Analytics", id="analytics", elem_id="tab-analytics"):
gr.Markdown("### Repository Analytics")
with gr.Row(equal_height=False):
with gr.Column(scale=1):
gr.Markdown("#### Issue Severity Distribution")
# Initialize with empty plot, will be updated after crawl
analytics_severity_plot = gr.Plot(label="Severity Distribution (Analytics)", elem_id="analytics_severity_plot", value=manager._generate_stats_plot({}))
with gr.Column(scale=1):
gr.Markdown("#### Issue Cluster Analysis (Top Clusters)")
cluster_info_df = gr.Dataframe(
headers=["Cluster ID", "Issue Count", "Top Keywords (Example)"],
datatype=["number", "number", "str"],
value=[["Scan a repository to see cluster data.", 0, ""]], # Initial placeholder data
label="Issue Clusters", elem_id="cluster_info_df",
interactive=False, # Make this dataframe non-interactive
row_count=(5, "dynamic"),
col_count=(3, "fixed"),
wrap=True
)
gr.Markdown("*(Analytics update after scanning the repository. More detailed analytics could be added.)*")
# --- Event Handlers ---
# The crawl button updates the issue list, stats plot, status, and analytics plot
crawl_btn.click(
fn=manager.crawl_issues,
inputs=[repo_url, github_token, hf_token],
outputs=[issue_list, stats_plot, status_output, analytics_severity_plot],
api_name="crawl_issues",
show_progress="full"
).then(
# After crawl_issues completes, update the cluster analytics dataframe
fn=lambda: update_cluster_analytics(manager),
inputs=[],
outputs=[cluster_info_df]
)
# The issue list selection updates the preview, code editor, AI output area, and hidden state.
# gr.SelectData is injected automatically by Gradio for .select() events, so no explicit
# inputs are passed. handle_issue_select returns a dict keyed by component *names*; Gradio
# expects either a dict keyed by component objects or values in `outputs` order, so adapt here.
_issue_select_output_keys = ["selected_issue_id_hidden", "issue_preview_html", "code_edit_component", "ai_output_display", "copy_patch_btn"]
async def _handle_issue_select_adapter(evt: gr.SelectData):
result = await handle_issue_select(evt)
return [result[key] for key in _issue_select_output_keys]
issue_list.select(
fn=_handle_issue_select_adapter,
outputs=[selected_issue_id_hidden, issue_preview_html, code_edit_component, ai_output_display, copy_patch_btn],
show_progress="minimal",
# The MutationObserver in the injected JS reads the hidden input value to update
# the tracked issue ID and (re)attach editor listeners after selection changes.
)
# AI Suggestion button
suggest_btn.click(
fn=get_ai_suggestion_wrapper,
inputs=[selected_issue_id_hidden, model_select], # Read selected issue ID from hidden state
outputs=[ai_output_display],
api_name="suggest_resolution",
show_progress="full"
).then(
# After getting suggestion, hide the copy patch button
fn=lambda: gr.update(visible=False),
inputs=[],
outputs=[copy_patch_btn]
)
# AI Patch button
patch_btn.click(
fn=get_ai_patch_wrapper,
inputs=[selected_issue_id_hidden, model_select], # Read selected issue ID from hidden state
outputs=[ai_output_display],
api_name="generate_patch",
show_progress="full"
).then(
# After getting patch, check if it contains a diff block and show the copy button if so
# Use a lambda to check the output text
fn=lambda output_text: gr.update(visible="```diff" in output_text),
inputs=[ai_output_display],
outputs=[copy_patch_btn]
)
# Clear AI Output button
clear_ai_output_btn.click(
fn=lambda: ["*AI suggestions and patches will appear here...*", gr.update(visible=False)],
inputs=[],
outputs=[ai_output_display, copy_patch_btn]
)
# --- JavaScript for WebSocket Communication and UI Interaction ---
def web_socket_js(ws_port, gradio_port):
# Generate a unique client ID on page load
# Note: This JS is generated *once* when the Gradio app is created.
# The actual client ID is generated and logged when the JS runs in the browser.
# The Python-side logging here is just for context during app startup.
temp_client_id_placeholder = f"client_{hashlib.sha1(os.urandom(16)).hexdigest()[:8]}"
logger.info(f"Generating JS with placeholder Client ID: {temp_client_id_placeholder}")
return f"""
<script>
// IIFE to keep variables local and prevent global pollution
(function() {{
// Prevent multiple initializations if script is loaded more than once
if (window.collabWsInitialized) {{
console.log('WebSocket script already initialized. Skipping setup.');
// If the existing WS is closed, try to reconnect
if (!window.collabWs || window.collabWs.readyState === WebSocket.CLOSED) {{
console.log('Existing WebSocket is closed, attempting reconnect...');
connectWebSocket();
}}
return;
}}
window.collabWsInitialized = true;
console.log('Initializing WebSocket connection script...'); // Client ID logged after generation
// Generate a unique client ID in the browser
const clientId = 'client_' + Math.random().toString(36).substring(2, 10);
window.clientId = clientId; // Make client ID accessible globally if needed
console.log('Generated Client ID for WebSocket:', clientId);
let wsUrl;
const wsPort = {ws_port};
const gradioPort = {gradio_port}; // Not strictly needed for WS URL if path proxy used
const hostname = window.location.hostname;
const protocol = window.location.protocol === 'https:' ? 'wss:' : 'ws:';
// Determine WS URL: Use direct port if localhost/127.0.0.1 or gradio.live
// Otherwise, assume a reverse proxy and use the same host/port with a path
if (hostname === 'localhost' || hostname === '127.0.0.1' || hostname.endsWith('.gradio.live')) {{
// For local or gradio.live, use the specific WS_PORT
wsUrl = `${{protocol}}//${{hostname}}:${{wsPort}}`;
console.log('Detected local/gradio.live environment, using direct WebSocket URL:', wsUrl);
}} else {{
// For other environments (e.g., Hugging Face Spaces), assume /ws path on the Gradio port
// This requires the Space config to proxy /ws to the WS server port
const wsPath = '/ws';
wsUrl = `${{protocol}}//${{window.location.host}}${{wsPath}}`; // Use window.location.host which includes port if non-standard
console.log('Detected non-local environment, assuming proxied WebSocket URL:', wsUrl);
}}
let collabWs = null;
window.collabWs = collabWs; // Make WS object accessible globally
let aceEditorInstance = null;
let currentIssueId = null; // Track the issue ID currently loaded in the editor
let reconnectAttempts = 0;
const maxReconnectAttempts = 15; // Increased attempts
let reconnectTimeout = null; // Store timeout handle
let editorSetupAttempts = 0;
const maxEditorSetupAttempts = 30; // Increased attempts
let editorSetupTimeout = null; // Store timeout handle
let editorChangeListenerAttached = false;
let lastSentDeltaTimestamp = 0;
const deltaSendDebounceMs = 100; // Reduced debounce for responsiveness
// --- UI Update Functions ---
function updateCollabList(collaborators) {{
const collabListDiv = document.getElementById('collab-list');
if (!collabListDiv) return;
// Filter out the current client from the list displayed to the user
const activeCollaborators = Object.entries(collaborators || {{}})
.filter(([id, info]) => id !== clientId);
if (activeCollaborators.length > 0) {{
collabListDiv.innerHTML = activeCollaborators
.map(([id, info]) => `
<div class="collab-item" title="Client ID: ${{id}}">
<span style="font-weight: 500;">${{info.name || id.substring(0, 8)}}:</span>
<span style="color: #555;">${{info.status || 'Idle'}}</span>
</div>`)
.join('');
}} else {{
collabListDiv.innerHTML = '<span style="color: #6b7280;">You are the only active user.</span>';
}}
}}
function updateStatusBar(message, isError = false) {{
// Find the Gradio Textbox element by its elem_id
const statusBar = document.querySelector('#status_output_txt textarea');
if (statusBar) {{
const timestamp = new Date().toLocaleTimeString();
statusBar.value = `[${{timestamp}}] ${{message}}`;
// Update styling based on error status
statusBar.style.color = isError ? '#D32F2F' : '#333';
statusBar.style.fontWeight = isError ? 'bold' : 'normal';
}}
}}
function getSelectedIssueId() {{
// Find the hidden textbox element by its elem_id and get its value
const hiddenInput = document.querySelector('#selected_issue_id_hidden input[type="hidden"]');
if (hiddenInput && hiddenInput.value && hiddenInput.value !== 'null') {{
try {{
return parseInt(hiddenInput.value, 10);
}} catch (e) {{
console.error("Failed to parse selected issue ID from hidden input:", hiddenInput.value, e);
}}
}}
return null;
}}
// Function to trigger issue selection from JS (e.g., clicking duplicate link)
// This is a hacky workaround as Gradio doesn't expose dataframe selection directly to JS easily.
// It relies on simulating a click on the correct row.
window.selectIssue = function(issueId) {{
console.log("Attempting to select issue ID:", issueId);
const dataframe = document.getElementById('issue_list_df');
if (!dataframe) {{
console.error("Dataframe element not found (#issue_list_df).");
updateStatusBar("UI Error: Dataframe not found.", true);
return;
}}
// Find the row containing the issue ID in the first column
const rows = dataframe.querySelectorAll('tbody tr');
let foundRow = null;
for (const row of rows) {{
const firstCell = row.querySelector('td:first-child');
if (firstCell && parseInt(firstCell.textContent.trim(), 10) === issueId) {{
foundRow = row;
break; // Found the row
}}
}}
if (foundRow) {{
console.log("Found row for issue ID", issueId, "Simulating click.");
// Simulate a click event on the row
foundRow.click();
// Switch to the Studio tab
const studioTabButton = document.querySelector('#tab-studio button');
if(studioTabButton) {{
studioTabButton.click();
console.log("Switched to Resolution Studio tab.");
}} else {{
console.warn("Resolution Studio tab button not found.");
}}
}} else {{
console.warn("Row for issue ID", issueId, "not found in the dataframe.");
updateStatusBar(`Issue #${{issueId}} not found in the current list. Try re-scanning.`, true);
}}
}};
// --- WebSocket Handling ---
function connectWebSocket() {{
if (collabWs && (collabWs.readyState === WebSocket.OPEN || collabWs.readyState === WebSocket.CONNECTING)) {{
console.log(`WebSocket already ${{(collabWs.readyState === WebSocket.OPEN) ? 'open' : 'connecting'}}. State: ${{collabWs.readyState}}`);
return;
}}
if (reconnectAttempts >= maxReconnectAttempts) {{
console.error('Max WebSocket reconnection attempts reached.');
updateStatusBar('Collaboration failed - Max reconnect attempts reached.', true);
return;
}}
console.log(`Attempting WebSocket connection to ${{wsUrl}} (Attempt ${{reconnectAttempts + 1}}/${{maxReconnectAttempts}})...`);
updateStatusBar(`Connecting collaboration service (Attempt ${{reconnectAttempts + 1}})...`);
try {{
collabWs = new WebSocket(wsUrl);
window.collabWs = collabWs; // Update global reference
}} catch (e) {{
console.error("WebSocket constructor failed:", e);
updateStatusBar("Collaboration connection failed (init error).", true);
// Handle reconnection attempt here as well
handleWsClose({{ code: 1006, reason: "Constructor failed", wasClean: false }});
return; // Prevent setting up listeners on failed constructor
}}
collabWs.onopen = function(event) {{
console.log('WebSocket connection established.');
updateStatusBar('Collaboration connected.');
reconnectAttempts = 0;
if(reconnectTimeout) {{ clearTimeout(reconnectTimeout); reconnectTimeout = null; }}
// Send join message with client ID and a default name
sendWsMessage({{ type: 'join', clientId: clientId, name: `User_${{clientId.substring(0,4)}}` }});
// Set initial status based on whether an issue is selected
const initialStatus = getSelectedIssueId() !== null ? `Viewing Issue #${{getSelectedIssueId()}}` : 'Idle';
sendWsMessage({{ type: 'status_update', clientId: clientId, status: initialStatus }});
// Attempt to set up editor listeners immediately after connection
setupCodeEditorListener();
}};
collabWs.onmessage = function(event) {{
try {{
const data = JSON.parse(event.data);
if (data.type === 'collaboration_status') {{
updateCollabList(data.collaborators);
}} else if (data.type === 'code_update') {{
const receivedIssueNum = parseInt(data.issue_num, 10);
// Only apply update if it's for the current issue and not from this client
if (aceEditorInstance && receivedIssueNum === currentIssueId && data.senderId !== clientId) {{
console.debug(`Applying remote delta for issue ${{receivedIssueNum}} from ${{data.senderId}}.`);
try {{
const delta = JSON.parse(data.delta);
// Add a custom property to the delta to signal our change listener to ignore it
const deltaToApply = {{...delta, ignore: true}};
// Apply the delta to the ACE editor document
aceEditorInstance.getSession().getDocument().applyDeltas([deltaToApply]);
}} catch (e) {{ console.error('Failed to parse or apply remote delta:', e, data.delta); }}
}}
}} else if (data.type === 'issues_updated') {{
console.log('Received notification: Issue list updated on server.');
updateStatusBar('Issue list updated on server. Refreshing the page or re-scanning is recommended.');
// Optional: Visually highlight the crawl button
const crawlButton = document.getElementById('crawl_btn');
if (crawlButton) {{
crawlButton.style.transition = 'background-color 0.5s ease';
crawlButton.style.backgroundColor = '#fef08a'; // Yellowish highlight
setTimeout(() => {{ crawlButton.style.backgroundColor = ''; }}, 3000); // Revert after 3 seconds
}}
}} else {{
console.warn("Received unknown WebSocket message type:", data.type, data);
}}
}} catch (e) {{
console.error('Failed to parse WebSocket message or update UI:', e, event.data);
}}
}};
collabWs.onclose = function(event) {{
console.warn(`WebSocket connection closed: Code=${{event.code}}, Reason='${{event.reason || 'N/A'}}', Clean=${{event.wasClean}}`);
handleWsClose(event); // Centralize close handling
}};
collabWs.onerror = function(error) {{
console.error('WebSocket error event:', error);
updateStatusBar('Collaboration connection error.', true);
// onclose will likely fire after onerror.
}};
}}
function handleWsClose(event) {{
const collabListDiv = document.getElementById('collab-list');
if (collabListDiv) collabListDiv.innerHTML = '<span style="color: #D32F2F; font-weight: bold;">Collaboration Disconnected</span>';
updateStatusBar('Collaboration disconnected.', true);
collabWs = null; // Clear the reference
window.collabWs = null;
aceEditorInstance = null; // Clear editor instance reference on disconnect
editorChangeListenerAttached = false; // Allow re-attaching on reconnect
// Only attempt reconnect if not a clean close (e.g., server restart)
if (!event.wasClean && event.code !== 1000 && event.code !== 1001) {{ // 1000: normal, 1001: going away
if (reconnectAttempts < maxReconnectAttempts) {{
const delay = Math.pow(2, reconnectAttempts) * 1000 + Math.random() * 1000; // Exponential backoff
console.log(`Attempting to reconnect WebSocket in approx. ${{Math.round(delay / 1000)}} seconds...`);
reconnectTimeout = setTimeout(connectWebSocket, delay);
reconnectAttempts++;
}} else {{
console.error('Max WebSocket reconnection attempts reached.');
updateStatusBar('Collaboration failed - Max reconnect attempts reached.', true);
}}
}} else {{
console.log('WebSocket closed cleanly or going away. No reconnect attempt.');
}}
}}
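// Note on the reconnect policy above: the delay grows exponentially per attempt
// (~1s, 2s, 4s, ... plus up to 1s of random jitter), and clean closes (codes 1000/1001)
// never trigger a reconnect, so deliberate server shutdowns are not retried.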
function sendWsMessage(message) {{
if (collabWs && collabWs.readyState === WebSocket.OPEN) {{
try {{
collabWs.send(JSON.stringify(message));
}} catch (e) {{
console.error("Failed to stringify or send WebSocket message:", e, message);
}}
}} else {{
console.warn('WebSocket not connected. Cannot send message:', message);
// Maybe queue messages to send on reconnect? Not implemented here.
}}
}}
window.sendWsMessage = sendWsMessage; // Expose send function globally
// --- Code Editor Integration ---
function setupCodeEditorListener() {{
// Find the ACE editor instance managed by the Gradio component
// This relies on the internal structure of gradio_code_editor
const editorElement = document.querySelector('#code_editor_component .ace_editor');
// Check if Ace library is available and the editor element exists
if (typeof ace === 'undefined' || !editorElement) {{
if (editorSetupAttempts < maxEditorSetupAttempts) {{
console.warn(`Ace library or editor element not found yet. Retrying editor setup (${{editorSetupAttempts + 1}}/${{maxEditorSetupAttempts}})...`);
editorSetupAttempts++;
editorSetupTimeout = setTimeout(setupCodeEditorListener, 500 + Math.random() * 500); // Retry faster
}} else {{
console.error("Ace library or editor element not found after multiple attempts. Code editor collaboration disabled.");
updateStatusBar("Code editor library or UI element failed to load.", true);
}}
return;
}}
// Prevent double initialization on the same editor instance
// Check if aceEditorInstance is already set AND corresponds to the current editorElement
if (aceEditorInstance && aceEditorInstance.container === editorElement) {{
console.debug("Ace Editor instance already initialized for this element.");
// Editor is already set up, just ensure the correct issue ID is tracked
updateTrackedIssueId(); // Re-check issue ID in case it changed
return;
}}
try {{
// Get the ACE editor instance associated with the element
aceEditorInstance = ace.edit(editorElement);
if (!aceEditorInstance) {{ throw new Error("ace.edit(element) returned null or undefined."); }}
console.log('Ace Editor instance acquired successfully:', aceEditorInstance);
// Clear previous listeners if any (important if the editor component is re-rendered)
if (aceEditorInstance._changeListener) {{
aceEditorInstance.getSession().off('change', aceEditorInstance._changeListener);
delete aceEditorInstance._changeListener;
}}
if (aceEditorInstance._focusListener) {{
aceEditorInstance.off('focus', aceEditorInstance._focusListener);
delete aceEditorInstance._focusListener;
}}
if (aceEditorInstance._blurListener) {{
aceEditorInstance.off('blur', aceEditorInstance._blurListener);
delete aceEditorInstance._blurListener;
}}
// Attach change listener
console.log("Attaching Ace editor 'change' listener...");
const changeListener = function(delta) {{
// Check for the custom 'ignore' flag added when applying remote deltas
if (delta.ignore) {{
console.debug("Ignoring change due to 'ignore' flag.");
return; // Ignore remote changes
}}
// Only send delta if an issue is currently selected
if (currentIssueId !== null) {{
const now = Date.now();
// Simple debounce to avoid flooding server with rapid keystrokes
if (now - lastSentDeltaTimestamp > deltaSendDebounceMs) {{
console.debug(`User change detected on issue ${{currentIssueId}}. Sending delta:`, delta);
sendWsMessage({{
type: 'code_update',
issue_num: currentIssueId,
delta: JSON.stringify(delta), // Send delta as string
clientId: clientId
}});
lastSentDeltaTimestamp = now;
}} else {{
console.debug("Debouncing change.");
}}
}}
}};
aceEditorInstance.getSession().on('change', changeListener);
aceEditorInstance._changeListener = changeListener; // Store reference to remove later
// Add focus/blur listeners for status updates
const focusListener = () => {{
if(currentIssueId !== null) {{
sendWsMessage({{ type: 'status_update', clientId: clientId, status: `Editing Issue #${{currentIssueId}}`}});
}}
}};
const blurListener = () => {{
// Update status based on whether an issue is still selected
const status = getSelectedIssueId() !== null ? `Viewing Issue #${{getSelectedIssueId()}}` : 'Idle';
sendWsMessage({{ type: 'status_update', clientId: clientId, status: status}});
}};
aceEditorInstance.on('focus', focusListener);
aceEditorInstance.on('blur', blurListener);
aceEditorInstance._focusListener = focusListener; // Store reference
aceEditorInstance._blurListener = blurListener; // Store reference
editorChangeListenerAttached = true; // This flag is now less critical with listener removal/re-attachment
console.log('Ace editor listeners attached successfully.');
editorSetupAttempts = 0; // Reset attempts on success
if(editorSetupTimeout) {{ clearTimeout(editorSetupTimeout); editorSetupTimeout = null; }}
// Update the tracked issue ID after the editor is set up
updateTrackedIssueId();
}} catch (e) {{
console.error('Failed to initialize Ace editor instance or attach listeners:', e);
// Clean up potential partial setup
aceEditorInstance = null; // Ensure reference is nullified on failure
editorChangeListenerAttached = false; // Allow retry
// Retry setup if max attempts not reached
if (editorSetupAttempts < maxEditorSetupAttempts) {{
console.warn(`Retrying editor setup after error (${{editorSetupAttempts + 1}}/${{maxEditorSetupAttempts}})...`);
editorSetupAttempts++;
editorSetupTimeout = setTimeout(setupCodeEditorListener, 2000 + Math.random() * 500);
}} else {{
console.error("Max editor setup attempts failed after error. Code editor collaboration disabled.");
updateStatusBar("Code editor setup failed repeatedly.", true);
}}
}}
}}
// Function to update the internally tracked issue ID
function updateTrackedIssueId() {{
const newIssueId = getSelectedIssueId(); // Use the reliable hidden input method
if (newIssueId !== currentIssueId) {{
console.log(`Updating tracked issue ID: from ${{currentIssueId}} to ${{newIssueId}}`);
currentIssueId = newIssueId;
// Update collaborator status on issue change
const status = currentIssueId !== null ? `Viewing Issue #${{currentIssueId}}` : 'Idle';
sendWsMessage({{ type: 'status_update', clientId: clientId, status: status}});
// If a new issue is selected and editor wasn't ready, try setting it up
if (currentIssueId !== null && !aceEditorInstance) {{
console.log("New issue selected, triggering editor setup check.");
// Clear any pending setup timeout and try again soon
if(editorSetupTimeout) clearTimeout(editorSetupTimeout);
editorSetupTimeout = setTimeout(setupCodeEditorListener, 250);
}} else if (currentIssueId === null) {{
// If deselected, ensure status is updated.
console.log("Issue deselected.");
}}
}}
}}
// --- MutationObserver to detect UI changes ---
// Observe the main Gradio container for changes that might indicate
// a new issue selection or editor component updates.
const observerTargetNode = document.querySelector('.gradio-container'); // Observe the main container
// Watch for value changes on inputs and childList/subtree changes
const observerConfig = {{ childList: true, subtree: true, attributes: true, attributeFilter: ['value'] }};
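// attributeFilter limits reported attribute mutations to 'value', which is how the
// callback below notices updates to the hidden selected-issue input without reacting
// to unrelated attribute churn elsewhere in the UI.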
const observerCallback = (mutationsList, observer) => {{
let issueIdCheckNeeded = false;
let editorSetupCheckNeeded = false;
for (const mutation of mutationsList) {{
// Check if the hidden input for selected issue ID changed value
if (mutation.type === 'attributes' && mutation.attributeName === 'value' &&
mutation.target.matches('#selected_issue_id_hidden input[type="hidden"]')) {{
// console.debug("Detected change in hidden issue ID input value.");
issueIdCheckNeeded = true;
}}
// Check if the code editor component or its contents changed
// Look for changes *within* the editor container or the container itself
if (mutation.target.id === 'code_editor_component' || mutation.target.closest('#code_editor_component')) {{
// console.debug("Detected change within code editor component subtree.");
editorSetupCheckNeeded = true;
}}
// Also check childList changes on the main Gradio container, as this
// might indicate components being added/removed or updated.
if (mutation.type === 'childList' && mutation.target === observerTargetNode) {{
// console.debug("Detected child list change in main gradio container.");
issueIdCheckNeeded = true; // Issue list/selection might have changed
editorSetupCheckNeeded = true; // Editor component might have been re-rendered
}}
}}
if (issueIdCheckNeeded) {{
// Debounce issue ID check
clearTimeout(window.issueIdCheckTimeout);
window.issueIdCheckTimeout = setTimeout(updateTrackedIssueId, 50); // Short debounce
}}
if (editorSetupCheckNeeded) {{
// Debounce editor setup check
clearTimeout(editorSetupTimeout); // Clear the internal editor setup timeout
editorSetupTimeout = setTimeout(setupCodeEditorListener, 100); // Short debounce
}}
}};
// Start observing
if (observerTargetNode) {{
console.log("Starting MutationObserver to detect Gradio UI changes.");
const observer = new MutationObserver(observerCallback);
observer.observe(observerTargetNode, observerConfig);
// Store observer globally if needed for disconnect later
window.gradioUIObserver = observer;
}} else {{
console.error("Could not find observer target node (.gradio-container).");
updateStatusBar("UI monitoring failed. Collaboration features may be unstable.", true);
}}
// --- Initialization ---
connectWebSocket(); // Start connecting WebSocket on page load
// Initial checks after a short delay to allow Gradio UI to render
// Use requestAnimationFrame or a slightly longer timeout for initial setup
// to ensure the DOM is ready.
requestAnimationFrame(() => {{
setTimeout(updateTrackedIssueId, 500);
setTimeout(setupCodeEditorListener, 700);
}});
// Add event listener for the copy patch button
const copyPatchButton = document.getElementById('copy_patch_btn');
if (copyPatchButton) {{
copyPatchButton.addEventListener('click', function() {{
const aiOutput = document.getElementById('ai_output_md');
if (aiOutput) {{
// Get the text content of the AI output Markdown block
const text = aiOutput.textContent || aiOutput.innerText;
// Use regex to find the content within the ```diff ``` block
// FIX: Escape the backslash in the regex for newline
const diffMatch = text.match(/```diff\\n(.*?)```/s);
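// Illustrative example of the AI output this regex is expected to match
// (the exact surrounding text is generated elsewhere; only the fenced block matters here):
// ```diff
// --- a/some_file.py
// +++ b/some_file.py
// @@ -1,3 +1,3 @@
// -old line
// +new line
// ```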
if (diffMatch && diffMatch[1]) {{
navigator.clipboard.writeText(diffMatch[1].trim()).then(() => {{
console.log("Patch copied to clipboard.");
// Optional: Provide visual feedback
copyPatchButton.textContent = "✅ Copied!";
setTimeout(() => {{ copyPatchButton.textContent = "📋 Copy Patch"; }}, 2000);
}}).catch(err => {{
console.error("Failed to copy patch:", err);
copyPatchButton.textContent = "❌ Failed";
setTimeout(() => {{ copyPatchButton.textContent = "📋 Copy Patch"; }}, 2000);
}});
}} else {{
console.warn("No diff block found in AI output to copy.");
copyPatchButton.textContent = "❓ No Patch";
setTimeout(() => {{ copyPatchButton.textContent = "📋 Copy Patch"; }}, 2000);
}}
}}
}});
}} else {{
console.warn("Copy patch button element not found.");
}}
}})(); // End IIFE
</script>
"""
# The _js parameter injects the JavaScript code into the Gradio page
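# NOTE: the `_js` keyword matches older Gradio releases; Gradio 4.x renamed this
# event parameter to `js`, so this call may need updating when upgrading Gradio.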
demo_app.load(_js=web_socket_js(WS_PORT, GRADIO_PORT), fn=None, inputs=None, outputs=None)
return demo_app
# ========== WebSocket Server Logic ==========
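# The collaboration protocol handled below is a small set of JSON messages (as used by
# handle_ws_connection and the injected client-side JS):
#   {"type": "join",          "name": "<display name>"}
#   {"type": "code_update",   "issue_num": <int>, "delta": "<Ace delta as a JSON string>", "clientId": "<id>"}
#   {"type": "status_update", "status": "<short status text>", "clientId": "<id>"}
# For security the server ignores any client-supplied clientId and uses its own
# server-assigned id when attributing messages.
#
# Hypothetical manual test client (sketch only; assumes the default WS_PORT of 8001):
#   import asyncio, json, websockets
#   async def _probe():
#       async with websockets.connect("ws://localhost:8001") as ws:
#           await ws.send(json.dumps({"type": "join", "name": "TestUser"}))
#           await ws.send(json.dumps({"type": "status_update", "status": "Idle"}))
#   asyncio.run(_probe())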
async def handle_ws_connection(websocket: WebSocketServerProtocol, path: str, manager: IssueManager):
"""Handles incoming WebSocket connections and messages for collaboration."""
# Generate a client ID and attach it to the websocket object
client_id = f"client_{hashlib.sha1(os.urandom(16)).hexdigest()[:8]}"
setattr(websocket, 'client_id', client_id)
remote_addr = websocket.remote_address
logger.info(f"WebSocket client connected: {remote_addr} assigned ID {client_id}")
# Add the new client to the list of active clients
manager.ws_clients.append(websocket)
logger.info(f"Client list size: {len(manager.ws_clients)}")
try:
# Wait for the first message (expected to be 'join') or other messages
async for message in websocket:
try:
# Ensure message is bytes or string before decoding/parsing
if isinstance(message, bytes):
message = message.decode('utf-8')
if not isinstance(message, str):
logger.warning(f"Received non-string/bytes message from {client_id}: {message!r}")
continue
data = json.loads(message)
msg_type = data.get("type")
# Use the client_id assigned by the server, not one sent by the client, for security
sender_id = client_id
logger.debug(f"Received WS message type '{msg_type}' from {sender_id} ({remote_addr})")
if msg_type == "join":
# Store collaborator info when they explicitly join
# Use the name provided by the client, default if not provided
client_name = data.get("name", f"User_{sender_id[:4]}")
# Ensure name is a string and not too long
client_name = str(client_name)[:50] if client_name else f"User_{sender_id[:4]}"
if sender_id in manager.collaborators:
# Update existing entry if client reconnects or sends join again
manager.collaborators[sender_id].update({"name": client_name, "status": "Connected"})
logger.info(f"Client {sender_id} ({client_name}) updated status to Connected.")
else:
# Add new entry
manager.collaborators[sender_id] = {"name": client_name, "status": "Connected"}
logger.info(f"Client {sender_id} ({client_name}) joined collaboration. Current collaborators: {list(manager.collaborators.keys())}")
# Broadcast updated status list to all clients
await manager.broadcast_collaboration_status_once()
elif msg_type == "code_update":
issue_num = data.get("issue_num")
delta_str = data.get("delta")
# Ensure data is valid and sender ID matches
if issue_num is not None and delta_str is not None and sender_id == client_id:
# FIX: Corrected call - handle_ws_connection is already async, just await the async manager method
await manager.handle_code_editor_update(int(issue_num), delta_str, sender_id)
else:
logger.warning(f"Invalid or unauthorized 'code_update' message from {sender_id}: Missing issue_num/delta or sender mismatch. Data: {str(data)[:200]}")
elif msg_type == "status_update":
status = data.get("status", "Idle")
# Only update status for the client ID that sent the message
if sender_id == client_id:
# Ensure status is a string and not too long
status = str(status)[:100] if status else "Idle"
if sender_id in manager.collaborators:
manager.collaborators[sender_id]["status"] = status
# Broadcast updated status list
await manager.broadcast_collaboration_status_once()
else:
# This might happen if 'join' wasn't received first, or state is out of sync
logger.warning(f"Received status update from client {sender_id} not in collaborator list. Adding/Updating with default name.")
manager.collaborators[sender_id] = {"name": f"User_{sender_id[:4]} (Re-added)", "status": status}
await manager.broadcast_collaboration_status_once()
else:
logger.warning(f"Unauthorized status update from {sender_id} attempting to update status for another client. Ignoring.")
else:
logger.warning(f"Unknown WebSocket message type '{msg_type}' received from {sender_id} ({remote_addr}). Message: {str(message)[:200]}")
except json.JSONDecodeError:
logger.error(f"Received invalid JSON over WebSocket from {client_id} ({remote_addr}): {str(message)[:200]}...")
except Exception as e:
logger.exception(f"Error processing WebSocket message from {client_id} ({remote_addr}): {e}")
# Catch standard socket exceptions for disconnects
except (ConnectionClosed, ConnectionClosedOK, ConnectionAbortedError, ConnectionResetError, WebSocketException) as e:
logger.info(f"WebSocket client {client_id} ({remote_addr}) disconnected: Type={type(e).__name__}, Code={getattr(e, 'code', 'N/A')}, Reason='{getattr(e, 'reason', 'N/A')}'")
except Exception as e:
logger.exception(f"Unexpected error in WebSocket handler for {client_id} ({remote_addr}): {e}")
finally:
# Ensure cleanup happens regardless of how the loop exits
logger.info(f"Cleaning up connection for client {client_id} ({remote_addr})")
# Pass the websocket object itself for removal
# Schedule this in the main loop if not already in it
if manager.main_loop.is_running():
manager.main_loop.call_soon_threadsafe(manager.remove_ws_client, websocket)
else:
logger.warning("Main loop not running, cannot schedule final client removal.")
async def start_websocket_server(manager: IssueManager, port: int):
"""Starts the WebSocket server."""
# The handler needs the manager instance
handler_with_manager = lambda ws, path: handle_ws_connection(ws, path, manager)
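# NOTE: the two-argument (websocket, path) handler matches the legacy server API of
# older `websockets` releases; newer versions pass only the connection object to the
# handler, so this adapter may need adjusting when the library is upgraded.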
server = None
# Use a Future to signal when the server should stop
stop_event = asyncio.Future()
# Add a method to signal the server to stop from outside the async function
# This is needed for graceful shutdown from the main thread
manager.stop_ws_server = lambda: stop_event.set_result(True) if not stop_event.done() else None
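# NOTE: asyncio Futures are not thread-safe. stop_ws_server is expected to run on the
# loop's own thread (as the signal-based shutdown_handler does); if it were ever
# triggered from another thread it should be scheduled via loop.call_soon_threadsafe.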
try:
# Start the websockets server
server = await websockets.serve(
handler_with_manager,
"0.0.0.0", # Listen on all interfaces
port,
ping_interval=20, # Send ping every 20 seconds
ping_timeout=20 # Close connection if no pong received within 20 seconds
)
logger.info(f"WebSocket server started successfully on ws://0.0.0.0:{port}")
# Wait until the stop_event is set (signaled from main thread or shutdown)
await stop_event
except OSError as e:
logger.error(f"Failed to start WebSocket server on port {port}: {e}. Is the port already in use?")
# Signal the main loop to stop gracefully if possible, or re-raise
# If this happens during startup, main thread will catch it and trigger shutdown.
# No need to explicitly call shutdown_handler here, the main thread's except/finally will handle it.
raise SystemExit(f"WebSocket Port {port} unavailable. Application cannot start.") from e
except asyncio.CancelledError:
logger.info("WebSocket server task cancelled.")
except Exception as e:
logger.exception(f"An unexpected error occurred starting or running the WebSocket server: {e}")
# Ensure the stop event is set so the await completes
if not stop_event.done(): stop_event.set_result(True)
raise # Re-raise to potentially stop the main loop
finally:
if server:
logger.info("Attempting to stop WebSocket server...")
server.close() # Signal server to close
await server.wait_closed() # Wait for server to finish closing connections
logger.info("WebSocket server stopped.")
# Ensure the stop event is completed even if there was an error before await
if not stop_event.done():
stop_event.set_result(True)
def run_webhook_server(manager: IssueManager, port: int, main_loop: asyncio.AbstractEventLoop):
"""Starts the HTTP webhook server in a separate thread."""
# Pass the manager instance and the main asyncio loop reference to the handler class
WebhookHandler.manager_instance = manager
WebhookHandler.main_loop = main_loop
httpd = None
# Add a method to signal the webhook server thread to stop
# This is needed for graceful shutdown from the main thread
# Use a simple flag and check it periodically, or rely on httpd.shutdown()
# httpd.shutdown() is the standard way for BaseHTTPRequestHandler servers
manager.stop_webhook_server = lambda: httpd.shutdown() if httpd else None
try:
server_address = ("0.0.0.0", port)
# Create the HTTP server instance
httpd = HTTPServer(server_address, WebhookHandler)
logger.info(f"Webhook HTTP server starting on http://0.0.0.0:{port}")
# Start serving requests (this call blocks the thread)
httpd.serve_forever()
except OSError as e:
logger.error(f"Failed to start Webhook server on port {port}: {e}. Is the port already in use?")
# If the server fails to start, signal the main loop to stop
if main_loop.is_running():
# Use call_soon_threadsafe as this is in a different thread
main_loop.call_soon_threadsafe(main_loop.stop)
except Exception as e:
logger.exception(f"Unexpected error in Webhook server thread: {e}")
# If an unexpected error occurs, signal the main loop to stop
if main_loop.is_running():
main_loop.call_soon_threadsafe(main_loop.stop)
finally:
if httpd:
logger.info("Shutting down Webhook HTTP server...")
# This method stops the serve_forever() loop
# It needs to be called from a different thread than the one running serve_forever()
# The manager.stop_webhook_server lambda is designed for this.
# If this finally block is reached due to an exception *within* serve_forever(),
# httpd.shutdown() might not be needed or might fail, but calling it is safer.
try:
httpd.shutdown() # Signal the server to stop accepting new connections and finish current ones
except Exception as e:
logger.warning(f"Error during httpd.shutdown(): {e}")
try:
httpd.server_close() # Close the server socket
except Exception as e:
logger.warning(f"Error during httpd.server_close(): {e}")
logger.info("Webhook server thread finished.")
# ========== Main Execution ==========
if __name__ == "__main__":
print("--- Acknowledging potential TensorFlow/CUDA warnings ---")
print("If you see warnings like 'Could not load dynamic library...' or related to CUDA/GPU,")
print("they are often harmless if you are not using a local GPU-accelerated model.")
print("Hugging Face Inference API calls run remotely and do not require local GPU setup.")
print("--- Starting Application ---")
# Get or create the event loop for the main thread
# This loop will run the async tasks (WS server, idle tasks, broadcast)
try:
loop = asyncio.get_event_loop()
logger.info(f"Using existing asyncio loop in main thread: {id(loop)}")
except RuntimeError:
loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop)
logger.info(f"Created and set new asyncio loop in main thread: {id(loop)}")
manager = IssueManager()
# Start the webhook server in a separate thread
webhook_thread = threading.Thread(
target=run_webhook_server,
args=(manager, WEBHOOK_PORT, loop), # Pass the main loop to the webhook thread
name="WebhookServerThread",
daemon=True # Allow the main program to exit even if this thread is running
)
webhook_thread.start()
# Create the Gradio UI
app = create_ui(manager)
# Start the WebSocket server as an asyncio task in the main loop
ws_task = loop.create_task(start_websocket_server(manager, WS_PORT))
# Start the manager's background tasks (idle processing, broadcast)
# These tasks are created in the manager's loop (which is the main loop here)
manager.start_idle_processing()
manager.start_broadcast_loop()
def shutdown_handler(signum, frame):
"""Graceful shutdown logic."""
logger.info(f"Signal {signum} received. Initiating shutdown...")
# Signal the WebSocket server to stop
if hasattr(manager, 'stop_ws_server') and manager.stop_ws_server:
manager.stop_ws_server()
# Signal the webhook server to stop
if hasattr(manager, 'stop_webhook_server') and manager.stop_webhook_server:
manager.stop_webhook_server()
# Stop manager's internal async tasks
manager.stop_idle_processing()
manager.stop_broadcast_loop()
# Stop the main asyncio loop
if loop.is_running():
logger.info("Stopping main asyncio loop...")
# Use call_soon_threadsafe because signal handlers run in a different context
loop.call_soon_threadsafe(loop.stop)
else:
logger.warning("Main asyncio loop not running during shutdown handler.")
# Add signal handlers for graceful shutdown (e.g., Ctrl+C)
try:
# For Unix-like systems
loop.add_signal_handler(signal.SIGINT, shutdown_handler, signal.SIGINT, None)
loop.add_signal_handler(signal.SIGTERM, shutdown_handler, signal.SIGTERM, None)
logger.info("Added signal handlers for SIGINT and SIGTERM.")
except NotImplementedError:
# Signal handlers are not available on Windows
logger.warning("Signal handlers not available on this platform (likely Windows). Ctrl+C may not be graceful.")
# On Windows, KeyboardInterrupt is usually caught directly in the main thread
# Launch Gradio app
# Use prevent_thread_lock=True to run Gradio server in a separate thread,
# freeing the main thread to run the asyncio loop.
try:
logger.info(f"Launching Gradio app on port {GRADIO_PORT}...")
app.launch(
server_name="0.0.0.0",
server_port=GRADIO_PORT,
share=False, # Set to True to create a temporary public gradio.live share link
debug=True, # Keep debug=True for development logs
inbrowser=True,
prevent_thread_lock=True # Run Gradio server in a separate thread
)
logger.info("Gradio app launched in separate thread.")
# Run the main asyncio loop indefinitely to keep async tasks running
logger.info("Running main asyncio loop...")
loop.run_forever() # This blocks the main thread until loop.stop() is called
except KeyboardInterrupt:
logger.info("KeyboardInterrupt received in main thread.")
# If loop signal handlers could not be registered (e.g., on Windows), the
# KeyboardInterrupt is caught here instead, so trigger shutdown manually.
# Note: signal.SIGINT exists on Windows as well, so its presence says nothing about
# whether the handlers were registered; use the flag set above instead.
if not signal_handlers_registered:
shutdown_handler(None, None) # Call shutdown logic
except SystemExit as e:
logger.error(f"SystemExit received: {e}")
# SystemExit might be raised by port binding errors etc.
# The source of SystemExit might have already triggered loop.stop() or shutdown.
# Ensure cleanup happens if not already started.
# Check if the loop is still running; if so, stop it.
if loop.is_running():
logger.info("SystemExit caught before loop stopped, triggering shutdown.")
shutdown_handler(None, None)
else:
logger.info("SystemExit caught, but loop already stopped. Proceeding with final cleanup.")
except Exception as e:
logger.exception(f"An unexpected error occurred in the main thread: {e}")
# Trigger graceful shutdown on unexpected error
shutdown_handler(None, None)
finally:
logger.info("Main thread exiting finally block.")
# Wait for background tasks/threads to complete shutdown...
logger.info("Running loop briefly to complete pending tasks (e.g., cancellations)...")
try:
# Gather remaining tasks (like ws_task cancellation, idle_task cancellation)
# Exclude the current task if inside a task (though we are in the main thread here)
# asyncio.all_tasks() returns tasks for the *current* thread's loop
pending_tasks = [task for task in asyncio.all_tasks(loop=loop) if not task.done()]
if pending_tasks:
# Use a timeout when waiting for tasks to finish.
# Note: asyncio.wait() does not raise TimeoutError; it returns (done, pending)
# sets, so tasks that missed the deadline are detected via the pending set.
try:
done, still_pending = loop.run_until_complete(asyncio.wait(pending_tasks, timeout=5))
if still_pending:
logger.warning(f"Timed out waiting for {len(still_pending)} pending tasks to complete.")
else:
logger.info("Completed pending tasks.")
except Exception as e:
logger.error(f"Error during final loop run_until_complete: {e}")
else:
logger.info("No pending tasks to complete.")
except Exception as e:
logger.error(f"Error checking pending tasks: {e}")
# Wait for the webhook thread to join (if it's not daemon, or if shutdown() needs time)
# Since it's daemon, it will be killed, but explicit shutdown is better.
# Add a small timeout for join in case shutdown() hangs.
# Check if the thread is still alive before joining
if webhook_thread.is_alive():
logger.info("Waiting for webhook thread to join...")
webhook_thread.join(timeout=2) # Wait up to 2 seconds
# Close the loop
if not loop.is_closed():
logger.info("Closing asyncio loop.")
loop.close()
logger.info("Application shutdown complete.")