from fastapi import FastAPI
from pydantic import BaseModel
import aiohttp
import os
from datetime import datetime, timedelta, timezone
from groq import Groq
from dotenv import load_dotenv
import logging
import asyncio
from functools import partial

# Configure logging
logging.basicConfig(level=logging.DEBUG)
logger = logging.getLogger(__name__)

# Load environment variables
load_dotenv()

# Configuration
# Credentials are read from the environment (e.g. a .env file); never commit
# real tokens to source control.
GITHUB_TOKEN = os.getenv("GITHUB_TOKEN")
GROQ_API_KEY = os.getenv("GROQ_API_KEY")
REPOSITORIES = [
    "SigmaHQ/sigma",
]
DAYS_BACK = 1
TIMEOUT = 10  # seconds

# GitHub API base URL
GITHUB_API_URL = "https://api.github.com"

# Groq client setup
try:
    groq_client = Groq(api_key=GROQ_API_KEY)
except Exception as e:
    logger.error(f"Failed to initialize Groq client: {e}")
    groq_client = None

# FastAPI app
app = FastAPI(docs_url=None, redoc_url=None)


class RepositoryDetails(BaseModel):
    repo_name: str
    repo_url: str
    changes: str
    description: str
    context: str


async def fetch_with_aiohttp(url: str, headers: dict, params: dict, session: aiohttp.ClientSession) -> list:
    """Asynchronous HTTP GET request with timeout."""
    try:
        async with session.get(
            url, headers=headers, params=params,
            timeout=aiohttp.ClientTimeout(total=TIMEOUT)
        ) as response:
            response.raise_for_status()
            data = await response.json()
            return data if isinstance(data, list) else []
    except aiohttp.ClientError as e:
        logger.error(f"Error fetching {url}: {e}")
        return []
    except Exception as e:
        logger.error(f"Unexpected error fetching {url}: {e}")
        return []


async def fetch_repository_changes(repo: str, days_back: int) -> list[str]:
    """Fetch recent commits and pull requests for a repository asynchronously."""
    logger.debug(f"Fetching changes for repository: {repo}")
    # GitHub timestamps are UTC, so compute the cutoff in UTC as well.
    since_date = (datetime.now(timezone.utc) - timedelta(days=days_back)).isoformat()
    headers = {
        "Authorization": f"token {GITHUB_TOKEN}",
        "Accept": "application/vnd.github.v3+json"
    }
    async with aiohttp.ClientSession() as session:
        # Fetch commits
        commits_url = f"{GITHUB_API_URL}/repos/{repo}/commits"
        commits_params = {"since": since_date, "per_page": 100}
        logger.debug(f"Fetching commits from: {commits_url}")
        commits = await fetch_with_aiohttp(commits_url, headers, commits_params, session)
        logger.debug(f"Found {len(commits)} commits for {repo}")

        # Fetch pull requests
        prs_url = f"{GITHUB_API_URL}/repos/{repo}/pulls"
        prs_params = {"state": "all", "sort": "updated", "direction": "desc", "per_page": 100}
        logger.debug(f"Fetching pull requests from: {prs_url}")
        prs = await fetch_with_aiohttp(prs_url, headers, prs_params, session)
        logger.debug(f"Found {len(prs)} pull requests for {repo}")

    # Extract changes
    changes = []
    try:
        for commit in commits:
            if "commit" in commit and "message" in commit["commit"]:
                changes.append(f"Commit: {commit['commit']['message']}")
    except (TypeError, KeyError) as e:
        logger.error(f"Error processing commits for {repo}: {e}")

    cutoff_date = datetime.now(timezone.utc) - timedelta(days=days_back)
    try:
        for pr in prs:
            if "updated_at" in pr and pr["updated_at"]:
                updated_at = datetime.strptime(pr["updated_at"], "%Y-%m-%dT%H:%M:%SZ").replace(tzinfo=timezone.utc)
                if updated_at >= cutoff_date:
                    changes.append(f"PR: {pr.get('title', 'No title')} - {pr.get('body', 'No description')}")
    except (TypeError, KeyError, ValueError) as e:
        logger.error(f"Error processing PRs for {repo}: {e}")

    logger.debug(f"Total changes for {repo}: {len(changes)}")
    return changes


def summarize_changes_with_deepseek(repo: str, changes: list[str]) -> dict:
    """Use Groq's hosted DeepSeek model to summarize changes."""
    if not groq_client:
        return {"description": "Groq client not initialized.", "context": "API unavailable."}
    try:
        logger.debug(f"Summarizing changes for repository: {repo}")
        prompt = f"""
Analyze the following changes made to detection rules in the GitHub repository {repo}:
{', '.join(changes[:50])}

Provide a detailed response with two sections:
- Description: Summarize what changes were made.
- Context: Explain why these changes might be required.
"""
        logger.debug(f"Sending prompt to DeepSeek: {prompt[:100]}...")
        response = groq_client.chat.completions.create(
            # Groq-hosted DeepSeek model ID ("deepseek-chat" is DeepSeek's own
            # API name; on Groq the deployment is the R1 Llama distill).
            model="deepseek-r1-distill-llama-70b",
            messages=[{"role": "user", "content": prompt}],
            max_tokens=500,
            temperature=0.7
        )
        summary = response.choices[0].message.content
        logger.debug(f"Received summary from DeepSeek: {summary[:100]}...")

        # Extract description and context, falling back to the raw summary
        # when the expected section headers are missing.
        description = "Description not found."
        context = "Context not found."
        if "Description:" in summary and "Context:" in summary:
            description = summary.split("Description:")[1].split("Context:")[0].strip()
            context = summary.split("Context:")[1].strip()
        else:
            description = summary
        return {"description": description, "context": context}
    except Exception as e:
        logger.error(f"Error summarizing changes for {repo}: {e}")
        return {"description": "Error occurred during summarization.", "context": str(e)}


async def process_repository(repo: str, days_back: int) -> RepositoryDetails:
    """Process a single repository and return its details."""
    try:
        changes = await fetch_repository_changes(repo, days_back)
        if changes:
            # The Groq SDK call is synchronous, so run the summarization in
            # the default thread pool to avoid blocking the event loop.
            summary = await asyncio.get_running_loop().run_in_executor(
                None, partial(summarize_changes_with_deepseek, repo, changes)
            )
            return RepositoryDetails(
                repo_name=f"{repo} (+{len(changes)})",
                repo_url=f"https://github.com/{repo}",
                changes="\n".join(changes),
                description=summary["description"],
                context=summary["context"]
            )
        else:
            return RepositoryDetails(
                repo_name=f"{repo} (No changes)",
                repo_url=f"https://github.com/{repo}",
                changes=f"No changes detected in the last {days_back} day(s).",
                description="No changes detected.",
                context="No context available."
            )
    except Exception as e:
        logger.error(f"Error processing repository {repo}: {e}")
        return RepositoryDetails(
            repo_name=f"{repo} (Error)",
            repo_url=f"https://github.com/{repo}",
            changes="Error occurred.",
            description=str(e),
            context="Processing failed."
        )


@app.get("/monitor", response_model=list[RepositoryDetails])
async def monitor_repositories():
    """Single API endpoint to fetch and summarize changes for all repositories."""
    logger.debug("Starting to monitor repositories")
    tasks = [process_repository(repo, DAYS_BACK) for repo in REPOSITORIES]
    results = await asyncio.gather(*tasks, return_exceptions=True)

    # Filter out exceptions and log them
    final_results = []
    for result in results:
        if isinstance(result, Exception):
            logger.error(f"Task failed with exception: {result}")
            continue
        final_results.append(result)

    logger.debug(f"Finished monitoring repositories. Processed {len(final_results)} results.")
    return final_results


if __name__ == "__main__":
    import uvicorn
    uvicorn.run(app, host="0.0.0.0", port=8000)
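
# ---------------------------------------------------------------------------
# Usage sketch (illustrative, not part of the service): a minimal async
# client for the /monitor endpoint, assuming the server above is running
# locally on port 8000. Save and run it as a separate script, e.g. a
# hypothetical monitor_client.py.
#
#   import asyncio
#   import aiohttp
#
#   async def main() -> None:
#       async with aiohttp.ClientSession() as session:
#           async with session.get("http://localhost:8000/monitor") as resp:
#               resp.raise_for_status()
#               for repo in await resp.json():
#                   print(repo["repo_name"], "->", repo["repo_url"])
#                   print(repo["description"])
#
#   asyncio.run(main())
# ---------------------------------------------------------------------------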