# NOTE: page-capture residue ("Spaces: Sleeping") -- not part of the program.
import os
from urllib.parse import quote

import requests
from crewai import Agent, Task, Crew
from fastapi import FastAPI, HTTPException
from pydantic import BaseModel
# Module-level ASGI application object; served by uvicorn when this file is
# executed as a script.
app = FastAPI()
# Define agents using CrewAI | |
class ThreatIntelligenceCrew:
    """Five-agent CrewAI pipeline for threat intelligence.

    The agents cover, in order: collecting IOCs from AlienVault OTX,
    flagging suspicious ones, correlating them with known threat actors,
    building a report, and recommending mitigation actions.

    The ``fetch_iocs`` / ``analyze_iocs`` / ``correlate_threats`` /
    ``generate_report`` / ``recommend_actions`` methods are plain Python and
    can be called directly, independent of CrewAI.
    """

    # Seconds to wait on the OTX HTTP request before giving up; without a
    # timeout ``requests`` may block indefinitely on a stalled connection.
    REQUEST_TIMEOUT = 10

    # An IOC is flagged as suspicious when its pulse count is strictly
    # greater than this value (example threshold).
    SUSPICIOUS_PULSE_COUNT = 5

    # Sample mapping of threat actor -> indicators attributed to it.
    KNOWN_THREAT_ACTORS = {
        "APT28": ["1.2.3.4", "5.6.7.8"],
        "Lazarus Group": ["9.10.11.12"],
    }

    def __init__(self, api_key):
        """Store the OTX API key and build the five agents.

        Args:
            api_key: Value sent in the ``X-OTX-API-KEY`` request header.
        """
        self.api_key = api_key

        # NOTE(review): bound methods are passed directly as tools. CrewAI's
        # documented tool types are tool objects / decorated functions --
        # confirm the installed CrewAI version accepts plain callables here.
        self.data_collector = Agent(
            role="Data Collector",
            goal="Fetch threat data from external sources",
            backstory="Specializes in collecting IOCs from threat intelligence feeds.",
            tools=[self.fetch_iocs],
        )
        self.analyst = Agent(
            role="Threat Analyst",
            goal="Analyze collected data for suspicious activity",
            backstory="Expert in identifying patterns and anomalies in threat data.",
            tools=[self.analyze_iocs],
        )
        self.correlator = Agent(
            role="Threat Correlator",
            goal="Correlate IOCs with known threat actors",
            backstory="Specializes in linking IOCs to advanced threat groups.",
            tools=[self.correlate_threats],
        )
        self.reporter = Agent(
            role="Threat Reporter",
            goal="Generate actionable threat intelligence reports",
            backstory="Expert in creating clear and concise reports for security teams.",
            tools=[self.generate_report],
        )
        self.responder = Agent(
            role="Response Advisor",
            goal="Recommend mitigation actions based on threats",
            backstory="Specializes in providing actionable recommendations to mitigate risks.",
            tools=[self.recommend_actions],
        )

    def fetch_iocs(self, indicator_type="ipv4"):
        """Fetch recent IOCs of ``indicator_type`` from AlienVault OTX.

        Args:
            indicator_type: Indicator category placed in the URL path. It is
                percent-encoded so a caller-supplied value cannot alter the
                request path.

        Returns:
            The decoded JSON body on HTTP 200; otherwise (non-200 status,
            transport failure/timeout, or a body that is not valid JSON)
            ``{"error": "Failed to fetch IOCs"}``.
        """
        failure = {"error": "Failed to fetch IOCs"}

        # NOTE(review): confirm this "/recent" endpoint against the OTX API
        # reference; it is kept exactly as the original code had it.
        safe_type = quote(str(indicator_type), safe="")
        url = f"https://otx.alienvault.com/api/v1/indicators/{safe_type}/recent"
        headers = {"X-OTX-API-KEY": self.api_key}

        try:
            response = requests.get(
                url, headers=headers, timeout=self.REQUEST_TIMEOUT
            )
        except requests.RequestException:
            # Report network problems the same way as a bad status code.
            return failure

        if response.status_code != 200:
            return failure

        try:
            return response.json()
        except ValueError:
            # A 200 response whose body is not JSON is still a failed fetch.
            return failure

    def analyze_iocs(self, iocs):
        """Return the IOCs whose pulse count exceeds the suspicion threshold.

        Args:
            iocs: Mapping with a ``"results"`` list of IOC dicts (the shape
                returned by ``fetch_iocs``). ``None``, an error payload, or
                a missing/null ``"results"`` yields an empty list.

        Returns:
            List of IOC dicts with ``pulse_info.count`` greater than
            ``SUSPICIOUS_PULSE_COUNT``, in input order.
        """
        results = (iocs or {}).get("results") or []
        return [
            ioc
            for ioc in results
            # ``or`` guards against explicit nulls in the JSON payload.
            if ((ioc.get("pulse_info") or {}).get("count") or 0)
            > self.SUSPICIOUS_PULSE_COUNT
        ]

    def correlate_threats(self, iocs):
        """Map each IOC's indicator to the known threat actor that uses it.

        Args:
            iocs: Iterable of IOC dicts with an ``"indicator"`` key;
                ``None`` is treated as empty.

        Returns:
            Dict of ``indicator -> actor name`` for indicators found in
            ``KNOWN_THREAT_ACTORS``.
        """
        # Invert the actor table once so each IOC is a single dict lookup.
        actor_by_indicator = {}
        for actor, indicators in self.KNOWN_THREAT_ACTORS.items():
            for indicator in indicators:
                actor_by_indicator[indicator] = actor

        correlated_threats = {}
        for ioc in iocs or []:
            indicator = ioc.get("indicator")
            # Known indicators are all strings; skipping anything else also
            # avoids hashing errors on unexpected JSON values.
            if isinstance(indicator, str) and indicator in actor_by_indicator:
                correlated_threats[indicator] = actor_by_indicator[indicator]
        return correlated_threats

    def generate_report(self, suspicious_iocs, correlated_threats):
        """Bundle the analysis results with a one-line summary.

        Args:
            suspicious_iocs: List of flagged IOC dicts.
            correlated_threats: Dict of ``indicator -> actor name``.

        Returns:
            Dict with keys ``"suspicious_iocs"``, ``"correlated_threats"``
            and ``"summary"``.
        """
        summary = (
            f"Found {len(suspicious_iocs)} suspicious IOCs, "
            f"with {len(correlated_threats)} linked to known threat actors."
        )
        return {
            "suspicious_iocs": suspicious_iocs,
            "correlated_threats": correlated_threats,
            "summary": summary,
        }

    def recommend_actions(self, correlated_threats):
        """Return one block-IP recommendation per correlated indicator.

        Args:
            correlated_threats: Dict of ``indicator -> actor name``.

        Returns:
            List of human-readable action strings, in dict order.
        """
        return [
            f"Block IP {ip} (linked to {actor})"
            for ip, actor in correlated_threats.items()
        ]

    def create_tasks(self, indicator_type):
        """Build the five tasks, one per agent, in pipeline order.

        Args:
            indicator_type: Indicator category named in the fetch task's
                description.

        Returns:
            ``[fetch, analyze, correlate, report, respond]`` task list.
        """
        fetch_task = Task(
            description=f"Fetch IOCs of type {indicator_type} from AlienVault OTX",
            agent=self.data_collector,
            expected_output="A list of IOCs in JSON format.",
        )
        analyze_task = Task(
            description="Analyze the fetched IOCs for suspicious activity",
            agent=self.analyst,
            expected_output="A list of suspicious IOCs.",
        )
        correlate_task = Task(
            description="Correlate suspicious IOCs with known threat actors",
            agent=self.correlator,
            expected_output="A dictionary mapping IOCs to threat actors.",
        )
        report_task = Task(
            description="Generate a threat intelligence report",
            agent=self.reporter,
            expected_output="A JSON report with suspicious IOCs, correlated threats, and a summary.",
        )
        respond_task = Task(
            description="Recommend mitigation actions based on the report",
            agent=self.responder,
            expected_output="A list of recommended actions.",
        )
        return [fetch_task, analyze_task, correlate_task, report_task, respond_task]

    def run_crew(self, indicator_type):
        """Assemble the crew for ``indicator_type`` and run it.

        Args:
            indicator_type: Passed through to ``create_tasks``.

        Returns:
            Whatever ``Crew.kickoff()`` returns.
        """
        tasks = self.create_tasks(indicator_type)
        crew = Crew(
            agents=[
                self.data_collector,
                self.analyst,
                self.correlator,
                self.reporter,
                self.responder,
            ],
            tasks=tasks,
        )
        return crew.kickoff()
# FastAPI endpoint | |
class ThreatIntelRequest(BaseModel):
    """Request body for the threat-intelligence handler."""

    # Indicator category forwarded to the crew; defaults to "ipv4".
    indicator_type: str = "ipv4"
# The original never attached this handler to ``app``, so the server exposed
# no route. The path below is the reviewer's choice -- adjust if clients
# expect a different one.
@app.post("/threat-intel")
def threat_intel(request: ThreatIntelRequest):
    """Run the threat-intelligence crew for the requested indicator type.

    The OTX API key is read from the ``OTX_API_KEY`` environment variable,
    falling back to the original placeholder string when it is unset.

    Args:
        request: Parsed request body carrying ``indicator_type``.

    Returns:
        The result of ``ThreatIntelligenceCrew.run_crew``.

    Raises:
        HTTPException: Status 500 with the error text as ``detail`` when the
            crew fails; an ``HTTPException`` raised further down is passed
            through unchanged.
    """
    api_key = os.environ.get("OTX_API_KEY", "your_alienvault_api_key")
    try:
        crew = ThreatIntelligenceCrew(api_key=api_key)
        return crew.run_crew(request.indicator_type)
    except HTTPException:
        # Already a well-formed HTTP error; do not re-wrap it as a 500.
        raise
    except Exception as e:
        raise HTTPException(status_code=500, detail=str(e)) from e
# Script entry point: serve ``app`` with uvicorn on every interface, port 8000.
if __name__ == "__main__":
    import uvicorn

    uvicorn.run(app, host="0.0.0.0", port=8000)