# TI / app.py
# rajrakeshdr's picture
# Create app.py
# 5ee28ef verified
# raw
# history blame
# 5.84 kB
from crewai import Agent, Task, Crew
from fastapi import FastAPI, HTTPException
from pydantic import BaseModel
import requests
# Create the FastAPI application object; the POST /threat-intel route defined
# further down registers itself on this instance via @app.post.
app = FastAPI()
# Define agents using CrewAI
class ThreatIntelligenceCrew:
    """CrewAI pipeline that turns AlienVault OTX indicators into a threat report.

    The crew is made of five agents (collector, analyst, correlator, reporter,
    responder). Each agent is handed one of the helper methods below as its
    tool. The helpers are plain Python and can also be called directly.

    NOTE(review): the agents receive bound methods in ``tools``. Some CrewAI
    releases only accept tool objects there -- confirm against the installed
    CrewAI version.
    """

    # Sample attribution table: threat actor name -> IP indicators tied to it.
    THREAT_ACTORS = {
        "APT28": ["1.2.3.4", "5.6.7.8"],
        "Lazarus Group": ["9.10.11.12"],
    }

    def __init__(self, api_key):
        """Store the OTX API key and build the five agents.

        Args:
            api_key: Value sent in the ``X-OTX-API-KEY`` header by
                :meth:`fetch_iocs`.
        """
        self.api_key = api_key

        self.data_collector = Agent(
            role="Data Collector",
            goal="Fetch threat data from external sources",
            backstory="Specializes in collecting IOCs from threat intelligence feeds.",
            tools=[self.fetch_iocs],
        )
        self.analyst = Agent(
            role="Threat Analyst",
            goal="Analyze collected data for suspicious activity",
            backstory="Expert in identifying patterns and anomalies in threat data.",
            tools=[self.analyze_iocs],
        )
        self.correlator = Agent(
            role="Threat Correlator",
            goal="Correlate IOCs with known threat actors",
            backstory="Specializes in linking IOCs to advanced threat groups.",
            tools=[self.correlate_threats],
        )
        self.reporter = Agent(
            role="Threat Reporter",
            goal="Generate actionable threat intelligence reports",
            backstory="Expert in creating clear and concise reports for security teams.",
            tools=[self.generate_report],
        )
        self.responder = Agent(
            role="Response Advisor",
            goal="Recommend mitigation actions based on threats",
            backstory="Specializes in providing actionable recommendations to mitigate risks.",
            tools=[self.recommend_actions],
        )

    def fetch_iocs(self, indicator_type="ipv4", timeout=10):
        """Fetch recent indicators of the given type from AlienVault OTX.

        Args:
            indicator_type: Indicator category placed in the request path.
            timeout: Seconds to wait for the HTTP request before giving up.

        Returns:
            The decoded JSON body on HTTP 200, otherwise
            ``{"error": "Failed to fetch IOCs"}``. Transport errors,
            timeouts and undecodable bodies yield the same error dict.
        """
        # Local import keeps this class self-contained without touching the
        # file-level import block.
        from urllib.parse import quote

        # indicator_type originates from the API caller; escape it so it can
        # only ever occupy a single path segment.
        segment = quote(str(indicator_type), safe="")
        url = f"https://otx.alienvault.com/api/v1/indicators/{segment}/recent"
        headers = {"X-OTX-API-KEY": self.api_key}

        try:
            # Without a timeout a stalled connection would block forever.
            response = requests.get(url, headers=headers, timeout=timeout)
        except requests.RequestException:
            return {"error": "Failed to fetch IOCs"}

        if response.status_code != 200:
            return {"error": "Failed to fetch IOCs"}

        try:
            return response.json()
        except ValueError:
            # A 200 response whose body is not valid JSON.
            return {"error": "Failed to fetch IOCs"}

    def analyze_iocs(self, iocs, threshold=5):
        """Select the IOCs whose pulse count exceeds ``threshold``.

        Args:
            iocs: Mapping with a ``"results"`` list of IOC dicts (the shape
                returned by :meth:`fetch_iocs`). Anything that is not a
                mapping, including ``None``, is treated as having no results.
            threshold: An IOC is suspicious when its
                ``pulse_info["count"]`` is strictly greater than this.

        Returns:
            List of the IOC dicts considered suspicious, in input order.
        """
        if not isinstance(iocs, dict):
            return []

        suspicious_iocs = []
        for ioc in iocs.get("results") or []:
            if not isinstance(ioc, dict):
                continue
            # ``pulse_info`` or ``count`` may be present but null.
            pulse_info = ioc.get("pulse_info") or {}
            count = pulse_info.get("count") or 0
            if count > threshold:
                suspicious_iocs.append(ioc)
        return suspicious_iocs

    def correlate_threats(self, iocs):
        """Map each IOC's indicator to the threat actor it is attributed to.

        Args:
            iocs: Iterable of IOC dicts carrying an ``"indicator"`` key.
                ``None`` and non-dict entries are ignored.

        Returns:
            Dict of ``indicator -> actor name`` for indicators listed in
            :attr:`THREAT_ACTORS`.
        """
        # Invert the table once for O(1) lookups. When an IP is listed under
        # several actors the last one wins, matching the nested-loop scan.
        actor_by_ip = {}
        for actor, ips in self.THREAT_ACTORS.items():
            for ip in ips:
                actor_by_ip[ip] = actor

        correlated_threats = {}
        for ioc in iocs or []:
            if not isinstance(ioc, dict):
                continue
            ip = ioc.get("indicator")
            if ip in actor_by_ip:
                correlated_threats[ip] = actor_by_ip[ip]
        return correlated_threats

    def generate_report(self, suspicious_iocs, correlated_threats):
        """Bundle the analysis results and a one-line summary into a dict.

        Args:
            suspicious_iocs: Output of :meth:`analyze_iocs`.
            correlated_threats: Output of :meth:`correlate_threats`.

        Returns:
            Dict with keys ``suspicious_iocs``, ``correlated_threats`` and
            ``summary``.
        """
        return {
            "suspicious_iocs": suspicious_iocs,
            "correlated_threats": correlated_threats,
            "summary": f"Found {len(suspicious_iocs)} suspicious IOCs, with {len(correlated_threats)} linked to known threat actors.",
        }

    def recommend_actions(self, correlated_threats):
        """Return one "Block IP ..." recommendation per correlated indicator.

        Args:
            correlated_threats: Dict of ``indicator -> actor name``.

        Returns:
            List of human-readable action strings, in dict order.
        """
        return [
            f"Block IP {ip} (linked to {actor})"
            for ip, actor in correlated_threats.items()
        ]

    def create_tasks(self, indicator_type):
        """Build the five tasks, one per agent.

        Args:
            indicator_type: Indicator category named in the fetch task's
                description.

        Returns:
            ``[fetch, analyze, correlate, report, respond]`` task list.
        """
        fetch_task = Task(
            description=f"Fetch IOCs of type {indicator_type} from AlienVault OTX",
            agent=self.data_collector,
            expected_output="A list of IOCs in JSON format.",
        )
        analyze_task = Task(
            description="Analyze the fetched IOCs for suspicious activity",
            agent=self.analyst,
            expected_output="A list of suspicious IOCs.",
        )
        correlate_task = Task(
            description="Correlate suspicious IOCs with known threat actors",
            agent=self.correlator,
            expected_output="A dictionary mapping IOCs to threat actors.",
        )
        report_task = Task(
            description="Generate a threat intelligence report",
            agent=self.reporter,
            expected_output="A JSON report with suspicious IOCs, correlated threats, and a summary.",
        )
        respond_task = Task(
            description="Recommend mitigation actions based on the report",
            agent=self.responder,
            expected_output="A list of recommended actions.",
        )
        return [fetch_task, analyze_task, correlate_task, report_task, respond_task]

    def run_crew(self, indicator_type):
        """Assemble the crew for ``indicator_type`` and run it.

        Args:
            indicator_type: Passed through to :meth:`create_tasks`.

        Returns:
            Whatever ``Crew.kickoff()`` returns.
        """
        tasks = self.create_tasks(indicator_type)
        crew = Crew(
            agents=[
                self.data_collector,
                self.analyst,
                self.correlator,
                self.reporter,
                self.responder,
            ],
            tasks=tasks,
        )
        return crew.kickoff()
# FastAPI endpoint
class ThreatIntelRequest(BaseModel):
    # Request body accepted by the POST /threat-intel endpoint.
    #
    # indicator_type: indicator category handed to
    # ThreatIntelligenceCrew.run_crew; "ipv4" is used when the client
    # omits the field.
    indicator_type: str = "ipv4"
@app.post("/threat-intel")
def threat_intel(request: ThreatIntelRequest):
    """Run the threat-intelligence crew for the requested indicator type.

    The AlienVault OTX key is read from the ``OTX_API_KEY`` environment
    variable. When the variable is unset the original placeholder string is
    used, so existing deployments behave as before.

    Args:
        request: Parsed request body; only ``indicator_type`` is used.

    Returns:
        The value returned by ``ThreatIntelligenceCrew.run_crew``.

    Raises:
        HTTPException: Status 500 carrying the error text if building or
            running the crew raises.
    """
    # Local import keeps this handler self-contained without touching the
    # file-level import block.
    import os

    # Prefer a key from the environment over a literal baked into the source.
    api_key = os.environ.get("OTX_API_KEY", "your_alienvault_api_key")

    try:
        crew = ThreatIntelligenceCrew(api_key=api_key)
        return crew.run_crew(request.indicator_type)
    except Exception as e:
        # Keep the original exception chained so server logs show the cause.
        raise HTTPException(status_code=500, detail=str(e)) from e
# Run the FastAPI app
if __name__ == "__main__":
    # uvicorn is only needed when this file is executed as a script, so it is
    # imported inside the guard.
    import uvicorn

    # Serve the app on all interfaces, port 8000.
    server_options = {"host": "0.0.0.0", "port": 8000}
    uvicorn.run(app, **server_options)