# NOTE(review): removed stray paste artifacts ("Spaces:" / "Runtime error")
# from a hosting UI — they were not valid Python and broke module import.
# Standard library
import asyncio
import logging
import os
from concurrent.futures import ThreadPoolExecutor
from datetime import datetime, timedelta, timezone
from functools import partial

# Third-party
import aiohttp
from dotenv import load_dotenv
from fastapi import FastAPI, HTTPException
from groq import Groq
from pydantic import BaseModel
# Configure logging
logging.basicConfig(level=logging.DEBUG)
logger = logging.getLogger(__name__)

# Load environment variables from a local .env file, if present.
load_dotenv()

# Configuration.
# SECURITY: credentials must come ONLY from the environment. The previous
# revision embedded live tokens as hard-coded fallback defaults; any secrets
# that were committed here are compromised and must be revoked/rotated.
GITHUB_TOKEN = os.getenv("GITHUB_TOKEN", "")
GROQ_API_KEY = os.getenv("GROQ_API_KEY", "")

# Repositories to monitor, as "owner/name" strings.
REPOSITORIES = [
    "SigmaHQ/sigma",
]
DAYS_BACK = 1  # look-back window (days) for commits/PRs
TIMEOUT = 10   # per-request HTTP timeout, seconds

# GitHub REST API base URL.
GITHUB_API_URL = "https://api.github.com"

# Groq client setup; degrade gracefully so the service still starts when the
# key is missing or invalid (summaries become unavailable, nothing crashes).
try:
    groq_client = Groq(api_key=GROQ_API_KEY) if GROQ_API_KEY else None
    if groq_client is None:
        logger.warning("GROQ_API_KEY not set; summarization is disabled.")
except Exception as e:
    logger.error(f"Failed to initialize Groq client: {e}")
    groq_client = None

# FastAPI app (interactive docs disabled).
app = FastAPI(docs_url=None, redoc_url=None)
class RepositoryDetails(BaseModel):
    """Response model: summarized recent activity for one repository."""
    repo_name: str     # display name, e.g. "owner/repo (+3)" / "owner/repo (No changes)"
    repo_url: str      # https://github.com/<owner>/<repo>
    changes: str       # newline-joined raw "Commit: ..." / "PR: ..." lines
    description: str   # LLM summary of what changed (or a fallback message)
    context: str       # LLM rationale for why the changes were made
async def fetch_with_aiohttp(url: str, headers: dict, params: dict, session: aiohttp.ClientSession) -> list:
    """Asynchronous HTTP GET request with timeout."""
    timeout = aiohttp.ClientTimeout(total=TIMEOUT)
    try:
        async with session.get(url, headers=headers, params=params, timeout=timeout) as response:
            response.raise_for_status()
            payload = await response.json()
    except aiohttp.ClientError as e:
        logger.error(f"Error fetching {url}: {e}")
        return []
    except Exception as e:
        logger.error(f"Unexpected error fetching {url}: {e}")
        return []
    # Callers iterate the result, so coerce non-list payloads to an empty list.
    return payload if isinstance(payload, list) else []
async def fetch_repository_changes(repo: str, days_back: int) -> list[str]:
    """Fetch recent commits and pull requests for a repository asynchronously.

    Args:
        repo: Repository in "owner/name" form.
        days_back: How many days of history to include.

    Returns:
        Human-readable change lines ("Commit: ..." / "PR: ..."); empty on error.
    """
    logger.debug(f"Fetching changes for repository: {repo}")
    # BUGFIX: GitHub timestamps are UTC. The previous naive local-time
    # datetime skewed both the `since` query param and the PR cutoff by the
    # local UTC offset; use timezone-aware UTC throughout.
    cutoff = datetime.now(timezone.utc) - timedelta(days=days_back)
    headers = {
        "Authorization": f"token {GITHUB_TOKEN}",
        "Accept": "application/vnd.github.v3+json"
    }
    async with aiohttp.ClientSession() as session:
        # Fetch commits since the cutoff (server-side filter).
        commits_url = f"{GITHUB_API_URL}/repos/{repo}/commits"
        commits_params = {"since": cutoff.isoformat(), "per_page": 100}
        logger.debug(f"Fetching commits from: {commits_url}")
        commits = await fetch_with_aiohttp(commits_url, headers, commits_params, session)
        logger.debug(f"Found {len(commits)} commits for {repo}")
        # Fetch recently-updated PRs; the PR list API has no `since`, so we
        # sort by update time and filter client-side below.
        prs_url = f"{GITHUB_API_URL}/repos/{repo}/pulls"
        prs_params = {"state": "all", "sort": "updated", "direction": "desc", "per_page": 100}
        logger.debug(f"Fetching pull requests from: {prs_url}")
        prs = await fetch_with_aiohttp(prs_url, headers, prs_params, session)
        logger.debug(f"Found {len(prs)} pull requests for {repo}")
    # Extract commit messages.
    changes = []
    try:
        for commit in commits:
            if "commit" in commit and "message" in commit["commit"]:
                changes.append(f"Commit: {commit['commit']['message']}")
    except (TypeError, KeyError) as e:
        logger.error(f"Error processing commits for {repo}: {e}")
    # Keep only PRs updated within the look-back window.
    try:
        for pr in prs:
            if pr.get("updated_at"):
                # "%Y-%m-%dT%H:%M:%SZ" is UTC; attach tzinfo so the
                # comparison against the aware cutoff is valid.
                updated_at = datetime.strptime(
                    pr["updated_at"], "%Y-%m-%dT%H:%M:%SZ"
                ).replace(tzinfo=timezone.utc)
                if updated_at >= cutoff:
                    changes.append(f"PR: {pr.get('title', 'No title')} - {pr.get('body', 'No description')}")
    except (TypeError, KeyError, ValueError) as e:
        logger.error(f"Error processing PRs for {repo}: {e}")
    logger.debug(f"Total changes for {repo}: {len(changes)}")
    return changes
def summarize_changes_with_deepseek(repo: str, changes: list[str]) -> dict:
    """Use Groq's DeepSeek model to summarize changes."""
    # Without a configured client there is nothing we can call.
    if groq_client is None:
        return {"description": "Groq client not initialized.", "context": "API unavailable."}
    try:
        logger.debug(f"Summarizing changes for repository: {repo}")
        prompt = f"""
        Analyze the following changes made to detection rules in the GitHub repository {repo}:
        {', '.join(changes[:50])}
        Provide a detailed response with two sections:
        - Description: Summarize what changes were made.
        - Context: Explain why these changes might be required.
        """
        logger.debug(f"Sending prompt to DeepSeek: {prompt[:100]}...")
        completion = groq_client.chat.completions.create(
            model="deepseek-chat",
            messages=[{"role": "user", "content": prompt}],
            max_tokens=500,
            temperature=0.7
        )
        summary = completion.choices[0].message.content
        logger.debug(f"Received summary from DeepSeek: {summary[:100]}...")
        # Parse the two requested sections out of the reply; if the model
        # ignored the format, return the raw text as the description.
        description = "Description not found."
        context = "Context not found."
        if "Description:" in summary and "Context:" in summary:
            description = summary.split("Description:")[1].split("Context:")[0].strip()
            context = summary.split("Context:")[1].strip()
        else:
            description = summary
        return {"description": description, "context": context}
    except Exception as e:
        logger.error(f"Error summarizing changes for {repo}: {e}")
        return {"description": "Error occurred during summarization.", "context": str(e)}
async def process_repository(repo: str, days_back: int) -> RepositoryDetails:
    """Process a single repository and return its details.

    Fetches recent changes, then (when any exist) offloads the blocking Groq
    summarization call to a worker thread so the event loop stays responsive.
    Never raises: failures are folded into an error-shaped RepositoryDetails.
    """
    try:
        changes = await fetch_repository_changes(repo, days_back)
        # Guard clause: nothing to summarize.
        if not changes:
            return RepositoryDetails(
                repo_name=f"{repo} (No changes)",
                repo_url=f"https://github.com/{repo}",
                changes=f"No changes detected in the last {days_back} day(s).",
                description="No changes detected.",
                context="No context available."
            )
        # FIX: asyncio.get_event_loop() inside a coroutine is deprecated
        # (3.10+); asyncio.to_thread is the supported way to run sync code
        # off the loop and is equivalent to run_in_executor(None, ...).
        summary = await asyncio.to_thread(summarize_changes_with_deepseek, repo, changes)
        return RepositoryDetails(
            repo_name=f"{repo} (+{len(changes)})",
            repo_url=f"https://github.com/{repo}",
            changes="\n".join(changes),
            description=summary["description"],
            context=summary["context"]
        )
    except Exception as e:
        logger.error(f"Error processing repository {repo}: {e}")
        return RepositoryDetails(
            repo_name=f"{repo} (Error)",
            repo_url=f"https://github.com/{repo}",
            changes="Error occurred.",
            description=str(e),
            context="Processing failed."
        )
async def monitor_repositories():
    """Single API endpoint to fetch and summarize changes for all repositories."""
    # NOTE(review): despite the docstring, no route decorator is attached to
    # this coroutine anywhere in view — confirm it is registered with `app`.
    logger.debug("Starting to monitor repositories")
    outcomes = await asyncio.gather(
        *(process_repository(repo, DAYS_BACK) for repo in REPOSITORIES),
        return_exceptions=True,
    )
    # Keep successful results; log (and drop) any task-level exceptions.
    final_results = []
    for outcome in outcomes:
        if isinstance(outcome, Exception):
            logger.error(f"Task failed with exception: {outcome}")
        else:
            final_results.append(outcome)
    logger.debug(f"Finished monitoring repositories. Processed {len(final_results)} results.")
    return final_results
if __name__ == "__main__":
    # Development entry point: serve the ASGI app directly on all interfaces.
    import uvicorn
    uvicorn.run(app, host="0.0.0.0", port=8000)