# app.py — FastAPI service that monitors GitHub repositories for recent
# detection-rule changes and summarizes them with an LLM via Groq.
# Provenance: Hugging Face Space by rajrakeshdr, commit c84fb78 (~8 kB).
import asyncio
import logging
import os
from concurrent.futures import ThreadPoolExecutor
from datetime import datetime, timedelta, timezone
from functools import partial

import aiohttp
from dotenv import load_dotenv
from fastapi import FastAPI, HTTPException
from groq import Groq
from pydantic import BaseModel
# Configure logging
logging.basicConfig(level=logging.DEBUG)
logger = logging.getLogger(__name__)

# Load environment variables from a local .env file, if present.
load_dotenv()

# Configuration.
# SECURITY: credentials must come from the environment only. The previous
# in-code fallback defaults were real (leaked) secrets — they have been
# removed and must be revoked/rotated on GitHub and Groq.
GITHUB_TOKEN = os.getenv("GITHUB_TOKEN", "")
GROQ_API_KEY = os.getenv("GROQ_API_KEY", "")

# Repositories to monitor, as GitHub "owner/name" slugs.
REPOSITORIES = [
    "SigmaHQ/sigma",
]
DAYS_BACK = 1  # look-back window for commits/PRs, in days
TIMEOUT = 10  # per-request HTTP timeout, seconds

# GitHub REST API base URL.
GITHUB_API_URL = "https://api.github.com"
# Initialize the Groq SDK client; fall back to None so request handlers can
# degrade gracefully instead of the module crashing at import time.
groq_client = None
try:
    groq_client = Groq(api_key=GROQ_API_KEY)
except Exception as exc:
    logger.error(f"Failed to initialize Groq client: {exc}")

# FastAPI application; interactive docs endpoints are deliberately disabled.
app = FastAPI(docs_url=None, redoc_url=None)
class RepositoryDetails(BaseModel):
    """Response model describing one monitored repository's recent activity."""

    # "owner/name" slug, suffixed with a change count or status marker.
    repo_name: str
    # Canonical https://github.com/... URL for the repository.
    repo_url: str
    # Newline-joined raw change lines ("Commit: ..." / "PR: ...") or a status message.
    changes: str
    # LLM-generated summary of what changed.
    description: str
    # LLM-generated explanation of why the changes may have been needed.
    context: str
async def fetch_with_aiohttp(url: str, headers: dict, params: dict, session: aiohttp.ClientSession) -> list:
    """Perform an async HTTP GET and return the JSON payload as a list.

    Transport and unexpected errors are logged and swallowed; callers always
    receive a list (empty on failure or when the payload is not a JSON array).
    """
    timeout_cfg = aiohttp.ClientTimeout(total=TIMEOUT)
    try:
        async with session.get(url, headers=headers, params=params, timeout=timeout_cfg) as response:
            response.raise_for_status()
            payload = await response.json()
    except aiohttp.ClientError as exc:
        logger.error(f"Error fetching {url}: {exc}")
        return []
    except Exception as exc:
        logger.error(f"Unexpected error fetching {url}: {exc}")
        return []
    return payload if isinstance(payload, list) else []
async def fetch_repository_changes(repo: str, days_back: int) -> list[str]:
    """Fetch recent commits and pull requests for a repository asynchronously.

    Args:
        repo: GitHub "owner/name" slug.
        days_back: Look-back window in days.

    Returns:
        Human-readable change lines ("Commit: ..." / "PR: ...").  API or
        parsing errors are logged and yield a partial (possibly empty) list.
    """
    logger.debug(f"Fetching changes for repository: {repo}")
    # GitHub timestamps are UTC.  Use an aware UTC "now" so both the `since`
    # query parameter and the PR cutoff comparison are correct; the previous
    # naive datetime.now() was skewed by the server's local UTC offset.
    now_utc = datetime.now(timezone.utc)
    cutoff_date = now_utc - timedelta(days=days_back)
    since_date = cutoff_date.isoformat()
    headers = {
        "Authorization": f"token {GITHUB_TOKEN}",
        "Accept": "application/vnd.github.v3+json"
    }
    async with aiohttp.ClientSession() as session:
        # Fetch commits since the cutoff.
        commits_url = f"{GITHUB_API_URL}/repos/{repo}/commits"
        commits_params = {"since": since_date, "per_page": 100}
        logger.debug(f"Fetching commits from: {commits_url}")
        commits = await fetch_with_aiohttp(commits_url, headers, commits_params, session)
        logger.debug(f"Found {len(commits)} commits for {repo}")
        # Fetch recently-updated pull requests (no server-side `since` for PRs).
        prs_url = f"{GITHUB_API_URL}/repos/{repo}/pulls"
        prs_params = {"state": "all", "sort": "updated", "direction": "desc", "per_page": 100}
        logger.debug(f"Fetching pull requests from: {prs_url}")
        prs = await fetch_with_aiohttp(prs_url, headers, prs_params, session)
        logger.debug(f"Found {len(prs)} pull requests for {repo}")
    # Extract commit messages.
    changes = []
    try:
        for commit in commits:
            if "commit" in commit and "message" in commit["commit"]:
                changes.append(f"Commit: {commit['commit']['message']}")
    except (TypeError, KeyError) as e:
        logger.error(f"Error processing commits for {repo}: {e}")
    # Filter PRs to the look-back window client-side.
    try:
        for pr in prs:
            if "updated_at" in pr and pr["updated_at"]:
                # "%Y-%m-%dT%H:%M:%SZ" parses as naive; mark it as UTC so the
                # comparison against the aware cutoff is valid.
                updated_at = datetime.strptime(
                    pr["updated_at"], "%Y-%m-%dT%H:%M:%SZ"
                ).replace(tzinfo=timezone.utc)
                if updated_at >= cutoff_date:
                    changes.append(f"PR: {pr.get('title', 'No title')} - {pr.get('body', 'No description')}")
    except (TypeError, KeyError, ValueError) as e:
        logger.error(f"Error processing PRs for {repo}: {e}")
    logger.debug(f"Total changes for {repo}: {len(changes)}")
    return changes
def summarize_changes_with_deepseek(repo: str, changes: list[str]) -> dict:
    """Summarize repository changes via Groq's DeepSeek model.

    Returns a dict with "description" and "context" keys; never raises —
    client-unavailable and API errors are reported inside the dict.
    """
    if not groq_client:
        return {"description": "Groq client not initialized.", "context": "API unavailable."}
    try:
        logger.debug(f"Summarizing changes for repository: {repo}")
        # Cap the prompt at the first 50 change lines to bound token usage.
        prompt = f"""
Analyze the following changes made to detection rules in the GitHub repository {repo}:
{', '.join(changes[:50])}
Provide a detailed response with two sections:
- Description: Summarize what changes were made.
- Context: Explain why these changes might be required.
"""
        logger.debug(f"Sending prompt to DeepSeek: {prompt[:100]}...")
        completion = groq_client.chat.completions.create(
            model="deepseek-chat",
            messages=[{"role": "user", "content": prompt}],
            max_tokens=500,
            temperature=0.7
        )
        summary = completion.choices[0].message.content
        logger.debug(f"Received summary from DeepSeek: {summary[:100]}...")
        # Split out the two requested sections; if the model ignored the
        # format, fall back to the raw summary as the description.
        has_sections = "Description:" in summary and "Context:" in summary
        if has_sections:
            after_description = summary.split("Description:")[1]
            description = after_description.split("Context:")[0].strip()
            context = summary.split("Context:")[1].strip()
        else:
            description = summary
            context = "Context not found."
        return {"description": description, "context": context}
    except Exception as exc:
        logger.error(f"Error summarizing changes for {repo}: {exc}")
        return {"description": "Error occurred during summarization.", "context": str(exc)}
async def process_repository(repo: str, days_back: int) -> RepositoryDetails:
    """Process a single repository and return its details.

    Fetches recent changes, then runs the blocking Groq summarization in the
    default thread-pool executor so the event loop stays responsive.  Never
    raises: any failure is folded into an error-flavored RepositoryDetails.
    """
    try:
        changes = await fetch_repository_changes(repo, days_back)
        if not changes:
            return RepositoryDetails(
                repo_name=f"{repo} (No changes)",
                repo_url=f"https://github.com/{repo}",
                changes=f"No changes detected in the last {days_back} day(s).",
                description="No changes detected.",
                context="No context available."
            )
        # get_running_loop() is the correct call from inside a coroutine;
        # get_event_loop() here is deprecated since Python 3.10.
        loop = asyncio.get_running_loop()
        summary = await loop.run_in_executor(
            None, partial(summarize_changes_with_deepseek, repo, changes)
        )
        return RepositoryDetails(
            repo_name=f"{repo} (+{len(changes)})",
            repo_url=f"https://github.com/{repo}",
            changes="\n".join(changes),
            description=summary["description"],
            context=summary["context"]
        )
    except Exception as e:
        logger.error(f"Error processing repository {repo}: {e}")
        return RepositoryDetails(
            repo_name=f"{repo} (Error)",
            repo_url=f"https://github.com/{repo}",
            changes="Error occurred.",
            description=str(e),
            context="Processing failed."
        )
@app.get("/monitor", response_model=list[RepositoryDetails])
async def monitor_repositories():
    """Single API endpoint to fetch and summarize changes for all repositories."""
    logger.debug("Starting to monitor repositories")
    # Fan out one task per repository; collect per-task exceptions instead of
    # letting one failure cancel the rest.
    outcomes = await asyncio.gather(
        *(process_repository(repo, DAYS_BACK) for repo in REPOSITORIES),
        return_exceptions=True,
    )
    final_results = []
    for outcome in outcomes:
        if isinstance(outcome, Exception):
            logger.error(f"Task failed with exception: {outcome}")
        else:
            final_results.append(outcome)
    logger.debug(f"Finished monitoring repositories. Processed {len(final_results)} results.")
    return final_results
if __name__ == "__main__":
    # Development entry point: serve the app directly with uvicorn on all
    # interfaces.  Production deployments run an external ASGI server instead.
    import uvicorn
    uvicorn.run(app, host="0.0.0.0", port=8000)