rajrakeshdr committed
Commit d3fd6a8 · verified · 1 Parent(s): de3b9e4

Update app.py

Files changed (1):
  1. app.py +100 -81
app.py CHANGED
@@ -1,13 +1,12 @@
 from fastapi import FastAPI, HTTPException
 from pydantic import BaseModel
-import requests
+import aiohttp
 import os
 from datetime import datetime, timedelta
 from groq import Groq
 from dotenv import load_dotenv
 import logging
 import asyncio
-import aiohttp
 from concurrent.futures import ThreadPoolExecutor
 from functools import partial
 
@@ -35,13 +34,17 @@ REPOSITORIES = [
     "SlimKQL/Hunting-Queries-Detection-Rules"
 ]
 DAYS_BACK = 1
-TIMEOUT = 10 # seconds for each request
+TIMEOUT = 10 # seconds
 
 # GitHub API base URL
 GITHUB_API_URL = "https://api.github.com"
 
 # Groq client setup
-groq_client = Groq(api_key=GROQ_API_KEY)
+try:
+    groq_client = Groq(api_key=GROQ_API_KEY)
+except Exception as e:
+    logger.error(f"Failed to initialize Groq client: {e}")
+    groq_client = None
 
 # FastAPI app
 app = FastAPI(docs_url=None, redoc_url=None)
@@ -53,65 +56,76 @@ class RepositoryDetails(BaseModel):
     description: str
     context: str
 
-async def fetch_with_aiohttp(url: str, headers: dict, params: dict, session: aiohttp.ClientSession) -> dict:
+async def fetch_with_aiohttp(url: str, headers: dict, params: dict, session: aiohttp.ClientSession) -> list:
     """Asynchronous HTTP GET request with timeout."""
     try:
         async with session.get(url, headers=headers, params=params, timeout=aiohttp.ClientTimeout(total=TIMEOUT)) as response:
             response.raise_for_status()
-            return await response.json()
-    except Exception as e:
+            data = await response.json()
+            return data if isinstance(data, list) else []
+    except aiohttp.ClientError as e:
         logger.error(f"Error fetching {url}: {e}")
         return []
+    except Exception as e:
+        logger.error(f"Unexpected error fetching {url}: {e}")
+        return []
 
 async def fetch_repository_changes(repo: str, days_back: int) -> list[str]:
     """Fetch recent commits and pull requests for a repository asynchronously."""
-    try:
-        logger.debug(f"Fetching changes for repository: {repo}")
-        since_date = (datetime.now() - timedelta(days=days_back)).isoformat()
-        headers = {
-            "Authorization": f"token {GITHUB_TOKEN}",
-            "Accept": "application/vnd.github.v3+json"
-        }
-
-        async with aiohttp.ClientSession() as session:
-            # Fetch commits
-            commits_url = f"{GITHUB_API_URL}/repos/{repo}/commits"
-            commits_params = {"since": since_date, "per_page": 100} # Limit to 100 for efficiency
-            logger.debug(f"Fetching commits from: {commits_url}")
-            commits = await fetch_with_aiohttp(commits_url, headers, commits_params, session)
-            logger.debug(f"Found {len(commits)} commits for {repo}")
-
-            # Fetch pull requests
-            prs_url = f"{GITHUB_API_URL}/repos/{repo}/pulls"
-            prs_params = {"state": "all", "sort": "updated", "direction": "desc", "per_page": 100}
-            logger.debug(f"Fetching pull requests from: {prs_url}")
-            prs = await fetch_with_aiohttp(prs_url, headers, prs_params, session)
-            logger.debug(f"Found {len(prs)} pull requests for {repo}")
-
-            # Extract changes
-            changes = []
-            for commit in commits:
-                changes.append(f"Commit: {commit['commit']['message']}")
-            cutoff_date = datetime.now() - timedelta(days=days_back)
-            for pr in prs:
-                updated_at = datetime.strptime(pr["updated_at"], "%Y-%m-%dT%H:%M:%SZ")
-                if updated_at >= cutoff_date:
-                    changes.append(f"PR: {pr['title']} - {pr['body'] or 'No description'}")
-
-            logger.debug(f"Total changes for {repo}: {len(changes)}")
-            return changes
-
-    except Exception as e:
-        logger.error(f"Error fetching changes for {repo}: {e}")
-        return []
+    logger.debug(f"Fetching changes for repository: {repo}")
+    since_date = (datetime.now() - timedelta(days=days_back)).isoformat()
+    headers = {
+        "Authorization": f"token {GITHUB_TOKEN}",
+        "Accept": "application/vnd.github.v3+json"
+    }
+
+    async with aiohttp.ClientSession() as session:
+        # Fetch commits
+        commits_url = f"{GITHUB_API_URL}/repos/{repo}/commits"
+        commits_params = {"since": since_date, "per_page": 100}
+        logger.debug(f"Fetching commits from: {commits_url}")
+        commits = await fetch_with_aiohttp(commits_url, headers, commits_params, session)
+        logger.debug(f"Found {len(commits)} commits for {repo}")
+
+        # Fetch pull requests
+        prs_url = f"{GITHUB_API_URL}/repos/{repo}/pulls"
+        prs_params = {"state": "all", "sort": "updated", "direction": "desc", "per_page": 100}
+        logger.debug(f"Fetching pull requests from: {prs_url}")
+        prs = await fetch_with_aiohttp(prs_url, headers, prs_params, session)
+        logger.debug(f"Found {len(prs)} pull requests for {repo}")
+
+        # Extract changes
+        changes = []
+        try:
+            for commit in commits:
+                if "commit" in commit and "message" in commit["commit"]:
+                    changes.append(f"Commit: {commit['commit']['message']}")
+        except (TypeError, KeyError) as e:
+            logger.error(f"Error processing commits for {repo}: {e}")
+
+        cutoff_date = datetime.now() - timedelta(days=days_back)
+        try:
+            for pr in prs:
+                if "updated_at" in pr and pr["updated_at"]:
+                    updated_at = datetime.strptime(pr["updated_at"], "%Y-%m-%dT%H:%M:%SZ")
+                    if updated_at >= cutoff_date:
+                        changes.append(f"PR: {pr.get('title', 'No title')} - {pr.get('body', 'No description')}")
+        except (TypeError, KeyError, ValueError) as e:
+            logger.error(f"Error processing PRs for {repo}: {e}")
+
+        logger.debug(f"Total changes for {repo}: {len(changes)}")
+        return changes
 
 def summarize_changes_with_deepseek(repo: str, changes: list[str]) -> dict:
-    """Use Groq's DeepSeek model to summarize changes (synchronous due to Groq SDK limitations)."""
+    """Use Groq's DeepSeek model to summarize changes."""
+    if not groq_client:
+        return {"description": "Groq client not initialized.", "context": "API unavailable."}
+
     try:
         logger.debug(f"Summarizing changes for repository: {repo}")
         prompt = f"""
         Analyze the following changes made to detection rules in the GitHub repository {repo}:
-        {', '.join(changes[:50])} # Limit to 50 changes to avoid token overflow
+        {', '.join(changes[:50])}
 
         Provide a detailed response with two sections:
         - Description: Summarize what changes were made.
@@ -140,53 +154,58 @@ def summarize_changes_with_deepseek(repo: str, changes: list[str]) -> dict:
 
     except Exception as e:
         logger.error(f"Error summarizing changes for {repo}: {e}")
-        return {"description": "Error occurred.", "context": "Error occurred."}
+        return {"description": "Error occurred during summarization.", "context": str(e)}
 
 async def process_repository(repo: str, days_back: int) -> RepositoryDetails:
     """Process a single repository and return its details."""
-    changes = await fetch_repository_changes(repo, days_back)
-    if changes:
-        summary = await asyncio.get_event_loop().run_in_executor(
-            None, partial(summarize_changes_with_deepseek, repo, changes)
-        )
-        return RepositoryDetails(
-            repo_name=f"{repo} (+{len(changes)})",
-            repo_url=f"https://github.com/{repo}",
-            changes="\n".join(changes),
-            description=summary["description"],
-            context=summary["context"]
-        )
-    else:
+    try:
+        changes = await fetch_repository_changes(repo, days_back)
+        if changes:
+            summary = await asyncio.get_event_loop().run_in_executor(
+                None, partial(summarize_changes_with_deepseek, repo, changes)
+            )
+            return RepositoryDetails(
+                repo_name=f"{repo} (+{len(changes)})",
+                repo_url=f"https://github.com/{repo}",
+                changes="\n".join(changes),
+                description=summary["description"],
+                context=summary["context"]
+            )
+        else:
+            return RepositoryDetails(
+                repo_name=f"{repo} (No changes)",
+                repo_url=f"https://github.com/{repo}",
+                changes=f"No changes detected in the last {days_back} day(s).",
+                description="No changes detected.",
+                context="No context available."
+            )
+    except Exception as e:
+        logger.error(f"Error processing repository {repo}: {e}")
         return RepositoryDetails(
-            repo_name=f"{repo} (No changes)",
+            repo_name=f"{repo} (Error)",
             repo_url=f"https://github.com/{repo}",
-            changes=f"No changes detected in the last {days_back} day(s).",
-            description="No changes detected.",
-            context="No context available."
+            changes="Error occurred.",
+            description=str(e),
+            context="Processing failed."
         )
 
 @app.get("/monitor", response_model=list[RepositoryDetails])
 async def monitor_repositories():
     """Single API endpoint to fetch and summarize changes for all repositories."""
-    try:
-        logger.debug("Starting to monitor repositories")
-        tasks = [process_repository(repo, DAYS_BACK) for repo in REPOSITORIES]
-        results = await asyncio.gather(*tasks, return_exceptions=True)
-
-        # Handle any exceptions in results
-        final_results = []
-        for result in results:
-            if isinstance(result, Exception):
-                logger.error(f"Error processing a repository: {result}")
-                continue
-            final_results.append(result)
-
-        logger.debug("Finished monitoring repositories")
-        return final_results
-
-    except Exception as e:
-        logger.error(f"Error in monitor_repositories: {e}")
-        raise HTTPException(status_code=500, detail=str(e))
+    logger.debug("Starting to monitor repositories")
+    tasks = [process_repository(repo, DAYS_BACK) for repo in REPOSITORIES]
+    results = await asyncio.gather(*tasks, return_exceptions=True)
+
+    # Filter out exceptions and log them
+    final_results = []
+    for result in results:
+        if isinstance(result, Exception):
+            logger.error(f"Task failed with exception: {result}")
+            continue
+        final_results.append(result)
+
+    logger.debug(f"Finished monitoring repositories. Processed {len(final_results)} results.")
+    return final_results
 
 if __name__ == "__main__":
     import uvicorn
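
One detail of the new process_repository worth calling out: the Groq SDK call in summarize_changes_with_deepseek is synchronous, so the code runs it through run_in_executor to keep the event loop free while asyncio.gather works through the other repositories. Below is a minimal, self-contained sketch of that pattern; the summarize stub is a hypothetical stand-in for the blocking Groq call, and get_running_loop() is used here as the modern equivalent of the get_event_loop() call seen in the diff.

import asyncio
from functools import partial

def summarize(repo: str, changes: list[str]) -> dict:
    # Hypothetical stand-in for the blocking Groq SDK call.
    return {"description": f"{len(changes)} change(s) in {repo}", "context": "demo"}

async def main() -> None:
    loop = asyncio.get_running_loop()
    # partial() binds the arguments because run_in_executor takes a bare callable;
    # None selects the default ThreadPoolExecutor.
    summary = await loop.run_in_executor(None, partial(summarize, "org/repo", ["Commit: tune rule"]))
    print(summary)

asyncio.run(main())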
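A quick way to exercise the updated endpoint once the service is running: the /monitor route and the response fields come from the code above, while the host, port, and the choice of httpx as client are assumptions (any HTTP client works). GITHUB_TOKEN and GROQ_API_KEY must be set in the environment before starting the app, e.g. with `uvicorn app:app --port 8000`.

import httpx

# Generous timeout: the endpoint fans out to the GitHub API and Groq for every repository.
resp = httpx.get("http://localhost:8000/monitor", timeout=120.0)
resp.raise_for_status()
for item in resp.json():
    # Fields defined by the RepositoryDetails response model.
    print(item["repo_name"], "->", item["repo_url"])
    print(item["description"])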