rajrakeshdr committed on
Commit
09ab86d
·
verified ·
1 Parent(s): c7eac83

Update app.py

Browse files
Files changed (1) hide show
  1. app.py +78 -62
app.py CHANGED
@@ -6,6 +6,10 @@ from datetime import datetime, timedelta
6
  from groq import Groq
7
  from dotenv import load_dotenv
8
  import logging
 
 
 
 
9
 
10
  # Configure logging
11
  logging.basicConfig(level=logging.DEBUG)
@@ -14,9 +18,9 @@ logger = logging.getLogger(__name__)
14
  # Load environment variables
15
  load_dotenv()
16
 
17
- # Configuration (Use environment variables instead of hardcoding)
18
- GITHUB_TOKEN = "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"
19
- GROQ_API_KEY = "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"
20
  REPOSITORIES = [
21
  "falcosecurity/rules",
22
  "SigmaHQ/sigma",
@@ -31,6 +35,7 @@ REPOSITORIES = [
31
  "SlimKQL/Hunting-Queries-Detection-Rules"
32
  ]
33
  DAYS_BACK = 1
 
34
 
35
  # GitHub API base URL
36
  GITHUB_API_URL = "https://api.github.com"
@@ -48,10 +53,18 @@ class RepositoryDetails(BaseModel):
48
  description: str
49
  context: str
50
 
51
- def fetch_repository_changes(repo: str, days_back: int) -> list[str]:
52
- """
53
- Fetch recent commits and pull requests for a repository.
54
- """
 
 
 
 
 
 
 
 
55
  try:
56
  logger.debug(f"Fetching changes for repository: {repo}")
57
  since_date = (datetime.now() - timedelta(days=days_back)).isoformat()
@@ -60,55 +73,51 @@ def fetch_repository_changes(repo: str, days_back: int) -> list[str]:
60
  "Accept": "application/vnd.github.v3+json"
61
  }
62
 
63
- # Fetch commits
64
- commits_url = f"{GITHUB_API_URL}/repos/{repo}/commits"
65
- commits_params = {"since": since_date}
66
- logger.debug(f"Fetching commits from: {commits_url} with params: {commits_params}")
67
- commits_response = requests.get(commits_url, headers=headers, params=commits_params)
68
- commits_response.raise_for_status()
69
- commits = commits_response.json()
70
- logger.debug(f"Found {len(commits)} commits for {repo}")
71
-
72
- # Fetch pull requests
73
- prs_url = f"{GITHUB_API_URL}/repos/{repo}/pulls"
74
- prs_params = {"state": "all", "sort": "updated", "direction": "desc"}
75
- logger.debug(f"Fetching pull requests from: {prs_url} with params: {prs_params}")
76
- prs_response = requests.get(prs_url, headers=headers, params=prs_params)
77
- prs_response.raise_for_status()
78
- prs = prs_response.json()
79
- logger.debug(f"Found {len(prs)} pull requests for {repo}")
80
 
81
  # Extract changes
82
  changes = []
83
  for commit in commits:
84
  changes.append(f"Commit: {commit['commit']['message']}")
 
85
  for pr in prs:
86
  updated_at = datetime.strptime(pr["updated_at"], "%Y-%m-%dT%H:%M:%SZ")
87
- if updated_at >= datetime.now() - timedelta(days=days_back):
88
  changes.append(f"PR: {pr['title']} - {pr['body'] or 'No description'}")
89
 
90
  logger.debug(f"Total changes for {repo}: {len(changes)}")
91
  return changes
92
 
93
- except requests.exceptions.RequestException as e:
94
  logger.error(f"Error fetching changes for {repo}: {e}")
95
- raise HTTPException(status_code=500, detail=f"Error fetching changes for {repo}: {e}")
96
 
97
  def summarize_changes_with_deepseek(repo: str, changes: list[str]) -> dict:
98
- """
99
- Use Groq's DeepSeek model to summarize changes and provide insights.
100
- """
101
  try:
102
  logger.debug(f"Summarizing changes for repository: {repo}")
103
  prompt = f"""
104
  Analyze the following changes made to detection rules in the GitHub repository {repo}:
105
- {', '.join(changes)}
106
 
107
  Provide a detailed response with two sections:
108
  - Description: Summarize what changes were made.
109
  - Context: Explain why these changes might be required.
110
  """
111
- logger.debug(f"Sending prompt to DeepSeek: {prompt[:100]}...") # Truncate for brevity in logs
112
  response = groq_client.chat.completions.create(
113
  model="deepseek-chat",
114
  messages=[{"role": "user", "content": prompt}],
@@ -125,48 +134,55 @@ def summarize_changes_with_deepseek(repo: str, changes: list[str]) -> dict:
125
  description = summary.split("Description:")[1].split("Context:")[0].strip()
126
  context = summary.split("Context:")[1].strip()
127
  else:
128
- description = summary # Fallback to full summary if sections aren't clear
129
 
130
  return {"description": description, "context": context}
131
 
132
  except Exception as e:
133
  logger.error(f"Error summarizing changes for {repo}: {e}")
134
- raise HTTPException(status_code=500, detail=f"Error summarizing changes for {repo}: {e}")
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
135
 
136
  @app.get("/monitor", response_model=list[RepositoryDetails])
137
  async def monitor_repositories():
138
- """
139
- Single API endpoint to fetch and summarize changes for all repositories.
140
- """
141
  try:
142
  logger.debug("Starting to monitor repositories")
143
- results = []
144
- for repo in REPOSITORIES:
145
- logger.debug(f"Processing repository: {repo}")
146
- changes = fetch_repository_changes(repo, DAYS_BACK)
147
-
148
- if changes:
149
- logger.debug(f"Summarizing changes for {repo}")
150
- summary = summarize_changes_with_deepseek(repo, changes)
151
- results.append(RepositoryDetails(
152
- repo_name=f"{repo} (+{len(changes)})",
153
- repo_url=f"https://github.com/{repo}",
154
- changes="\n".join(changes),
155
- description=summary["description"],
156
- context=summary["context"]
157
- ))
158
- else:
159
- logger.debug(f"No changes detected for {repo}")
160
- results.append(RepositoryDetails(
161
- repo_name=f"{repo} (No changes)",
162
- repo_url=f"https://github.com/{repo}",
163
- changes="No changes detected in the last 7 days.",
164
- description="No changes detected.",
165
- context="No context available."
166
- ))
167
 
168
  logger.debug("Finished monitoring repositories")
169
- return results
170
 
171
  except Exception as e:
172
  logger.error(f"Error in monitor_repositories: {e}")
 
6
  from groq import Groq
7
  from dotenv import load_dotenv
8
  import logging
9
+ import asyncio
10
+ import aiohttp
11
+ from concurrent.futures import ThreadPoolExecutor
12
+ from functools import partial
13
 
14
  # Configure logging
15
  logging.basicConfig(level=logging.DEBUG)
 
18
  # Load environment variables
19
  load_dotenv()
20
 
21
# Configuration.
# Secrets are read from the environment only (loaded via load_dotenv above).
# SECURITY: the previous hardcoded fallback credentials were committed to
# source control and must be treated as compromised — revoke and rotate them.
GITHUB_TOKEN = os.getenv("GITHUB_TOKEN")
GROQ_API_KEY = os.getenv("GROQ_API_KEY")
if not GITHUB_TOKEN or not GROQ_API_KEY:
    # Fail loudly at startup rather than with opaque 401s later.
    logging.getLogger(__name__).warning(
        "GITHUB_TOKEN and/or GROQ_API_KEY is not set; GitHub/Groq API calls will fail."
    )
24
  REPOSITORIES = [
25
  "falcosecurity/rules",
26
  "SigmaHQ/sigma",
 
35
  "SlimKQL/Hunting-Queries-Detection-Rules"
36
  ]
37
  DAYS_BACK = 1
38
+ TIMEOUT = 10 # seconds for each request
39
 
40
  # GitHub API base URL
41
  GITHUB_API_URL = "https://api.github.com"
 
53
  description: str
54
  context: str
55
 
56
async def fetch_with_aiohttp(url: str, headers: dict, params: dict, session: aiohttp.ClientSession) -> "list | dict":
    """Perform an asynchronous HTTP GET and decode the JSON response.

    Args:
        url: Absolute URL to fetch (a GitHub REST API endpoint here).
        headers: HTTP headers to send (auth / accept headers).
        params: Query-string parameters.
        session: Open aiohttp client session to issue the request on.

    Returns:
        The decoded JSON payload — GitHub list endpoints (commits, pulls)
        return a list, others a dict — or an empty list when the request
        fails for any reason (timeout, HTTP error status, bad JSON).
        Callers treat the empty list as "no items", so one failed endpoint
        does not abort the whole scan.
    """
    try:
        timeout = aiohttp.ClientTimeout(total=TIMEOUT)
        async with session.get(url, headers=headers, params=params, timeout=timeout) as response:
            response.raise_for_status()
            return await response.json()
    except Exception as e:
        # Best-effort: log and degrade to an empty result instead of raising.
        logger.error(f"Error fetching {url}: {e}")
        return []
65
+
66
+ async def fetch_repository_changes(repo: str, days_back: int) -> list[str]:
67
+ """Fetch recent commits and pull requests for a repository asynchronously."""
68
  try:
69
  logger.debug(f"Fetching changes for repository: {repo}")
70
  since_date = (datetime.now() - timedelta(days=days_back)).isoformat()
 
73
  "Accept": "application/vnd.github.v3+json"
74
  }
75
 
76
+ async with aiohttp.ClientSession() as session:
77
+ # Fetch commits
78
+ commits_url = f"{GITHUB_API_URL}/repos/{repo}/commits"
79
+ commits_params = {"since": since_date, "per_page": 100} # Limit to 100 for efficiency
80
+ logger.debug(f"Fetching commits from: {commits_url}")
81
+ commits = await fetch_with_aiohttp(commits_url, headers, commits_params, session)
82
+ logger.debug(f"Found {len(commits)} commits for {repo}")
83
+
84
+ # Fetch pull requests
85
+ prs_url = f"{GITHUB_API_URL}/repos/{repo}/pulls"
86
+ prs_params = {"state": "all", "sort": "updated", "direction": "desc", "per_page": 100}
87
+ logger.debug(f"Fetching pull requests from: {prs_url}")
88
+ prs = await fetch_with_aiohttp(prs_url, headers, prs_params, session)
89
+ logger.debug(f"Found {len(prs)} pull requests for {repo}")
 
 
 
90
 
91
  # Extract changes
92
  changes = []
93
  for commit in commits:
94
  changes.append(f"Commit: {commit['commit']['message']}")
95
+ cutoff_date = datetime.now() - timedelta(days=days_back)
96
  for pr in prs:
97
  updated_at = datetime.strptime(pr["updated_at"], "%Y-%m-%dT%H:%M:%SZ")
98
+ if updated_at >= cutoff_date:
99
  changes.append(f"PR: {pr['title']} - {pr['body'] or 'No description'}")
100
 
101
  logger.debug(f"Total changes for {repo}: {len(changes)}")
102
  return changes
103
 
104
+ except Exception as e:
105
  logger.error(f"Error fetching changes for {repo}: {e}")
106
+ return []
107
 
108
  def summarize_changes_with_deepseek(repo: str, changes: list[str]) -> dict:
109
+ """Use Groq's DeepSeek model to summarize changes (synchronous due to Groq SDK limitations)."""
 
 
110
  try:
111
  logger.debug(f"Summarizing changes for repository: {repo}")
112
  prompt = f"""
113
  Analyze the following changes made to detection rules in the GitHub repository {repo}:
114
+ {', '.join(changes[:50])} # Limit to 50 changes to avoid token overflow
115
 
116
  Provide a detailed response with two sections:
117
  - Description: Summarize what changes were made.
118
  - Context: Explain why these changes might be required.
119
  """
120
+ logger.debug(f"Sending prompt to DeepSeek: {prompt[:100]}...")
121
  response = groq_client.chat.completions.create(
122
  model="deepseek-chat",
123
  messages=[{"role": "user", "content": prompt}],
 
134
  description = summary.split("Description:")[1].split("Context:")[0].strip()
135
  context = summary.split("Context:")[1].strip()
136
  else:
137
+ description = summary
138
 
139
  return {"description": description, "context": context}
140
 
141
  except Exception as e:
142
  logger.error(f"Error summarizing changes for {repo}: {e}")
143
+ return {"description": "Error occurred.", "context": "Error occurred."}
144
+
145
async def process_repository(repo: str, days_back: int) -> RepositoryDetails:
    """Fetch, summarize, and package the recent activity of one repository.

    Args:
        repo: GitHub repository slug in "owner/name" form.
        days_back: Look-back window in days for commits and pull requests.

    Returns:
        A RepositoryDetails record for the repository; when no changes are
        found, a placeholder record stating that no changes were detected.
    """
    changes = await fetch_repository_changes(repo, days_back)

    # Guard clause: nothing to summarize.
    if not changes:
        return RepositoryDetails(
            repo_name=f"{repo} (No changes)",
            repo_url=f"https://github.com/{repo}",
            changes=f"No changes detected in the last {days_back} day(s).",
            description="No changes detected.",
            context="No context available."
        )

    # The Groq SDK call is synchronous; run it on a worker thread so the
    # event loop stays free to process other repositories concurrently.
    # asyncio.to_thread replaces the deprecated asyncio.get_event_loop()
    # pattern, which is unreliable inside a running coroutine on 3.10+.
    summary = await asyncio.to_thread(summarize_changes_with_deepseek, repo, changes)

    return RepositoryDetails(
        repo_name=f"{repo} (+{len(changes)})",
        repo_url=f"https://github.com/{repo}",
        changes="\n".join(changes),
        description=summary["description"],
        context=summary["context"]
    )
167
 
168
  @app.get("/monitor", response_model=list[RepositoryDetails])
169
  async def monitor_repositories():
170
+ """Single API endpoint to fetch and summarize changes for all repositories."""
 
 
171
  try:
172
  logger.debug("Starting to monitor repositories")
173
+ tasks = [process_repository(repo, DAYS_BACK) for repo in REPOSITORIES]
174
+ results = await asyncio.gather(*tasks, return_exceptions=True)
175
+
176
+ # Handle any exceptions in results
177
+ final_results = []
178
+ for result in results:
179
+ if isinstance(result, Exception):
180
+ logger.error(f"Error processing a repository: {result}")
181
+ continue
182
+ final_results.append(result)
 
 
 
 
 
 
 
 
 
 
 
 
 
 
183
 
184
  logger.debug("Finished monitoring repositories")
185
+ return final_results
186
 
187
  except Exception as e:
188
  logger.error(f"Error in monitor_repositories: {e}")