# Residue from the web file viewer this source was copied from (not program code):
# Spaces: Sleeping | File size: 5,838 Bytes | 5ee28ef
# (the viewer's line-number gutter, 1-159, has been dropped)
import os
from urllib.parse import quote

import requests
from crewai import Agent, Task, Crew
from fastapi import FastAPI, HTTPException
from pydantic import BaseModel
# Initialize the FastAPI application object; route decorators in this module
# (e.g. @app.post) register their handlers on it.
app = FastAPI()
# Define agents using CrewAI
class ThreatIntelligenceCrew:
    """Five-agent CrewAI pipeline: collect, analyze, correlate, report, respond.

    The instance holds the AlienVault OTX API key and one ``Agent`` per stage.
    The "custom tool" methods are plain Python helpers and can also be called
    directly, without going through CrewAI.
    """

    # Seconds to wait on the OTX HTTP call. requests has no default timeout,
    # so without this a stalled connection would block the caller forever.
    REQUEST_TIMEOUT = 10

    # An IOC is treated as suspicious when its pulse_info count exceeds this
    # value (example threshold carried over from the original code).
    SUSPICIOUS_PULSE_COUNT = 5

    # Example actor -> indicator table used by correlate_threats.
    KNOWN_THREAT_ACTORS = {
        "APT28": ["1.2.3.4", "5.6.7.8"],
        "Lazarus Group": ["9.10.11.12"],
    }

    def __init__(self, api_key):
        """Store the OTX API key and build the five agents.

        Args:
            api_key: Value sent in the ``X-OTX-API-KEY`` header by fetch_iocs.
        """
        self.api_key = api_key

        # NOTE(review): the agents below receive bound methods as tools.
        # CrewAI's documented tool types are BaseTool subclasses or functions
        # wrapped with its `tool` decorator; plain callables may be rejected
        # when the Agent is validated. Confirm against the installed CrewAI
        # version before relying on this wiring.
        self.data_collector = Agent(
            role="Data Collector",
            goal="Fetch threat data from external sources",
            backstory="Specializes in collecting IOCs from threat intelligence feeds.",
            tools=[self.fetch_iocs],  # Custom tool for fetching IOCs
        )
        self.analyst = Agent(
            role="Threat Analyst",
            goal="Analyze collected data for suspicious activity",
            backstory="Expert in identifying patterns and anomalies in threat data.",
            tools=[self.analyze_iocs],  # Custom tool for analyzing IOCs
        )
        self.correlator = Agent(
            role="Threat Correlator",
            goal="Correlate IOCs with known threat actors",
            backstory="Specializes in linking IOCs to advanced threat groups.",
            tools=[self.correlate_threats],  # Custom tool for correlation
        )
        self.reporter = Agent(
            role="Threat Reporter",
            goal="Generate actionable threat intelligence reports",
            backstory="Expert in creating clear and concise reports for security teams.",
            tools=[self.generate_report],  # Custom tool for reporting
        )
        self.responder = Agent(
            role="Response Advisor",
            goal="Recommend mitigation actions based on threats",
            backstory="Specializes in providing actionable recommendations to mitigate risks.",
            tools=[self.recommend_actions],  # Custom tool for response
        )

    # Custom tool: Fetch IOCs from AlienVault OTX
    def fetch_iocs(self, indicator_type: str = "ipv4") -> dict:
        """Request recent indicators of ``indicator_type`` from AlienVault OTX.

        Args:
            indicator_type: Indicator category placed in the URL path.

        Returns:
            The decoded JSON body on HTTP 200, otherwise
            ``{"error": "Failed to fetch IOCs"}``.

        Raises:
            requests.RequestException: On connection failure or timeout.
        """
        # indicator_type originates from the HTTP request body, so
        # percent-encode it: "/", "?" and "#" must not be able to change the
        # path or query of the outgoing request that carries the API key.
        safe_type = quote(str(indicator_type), safe="")
        # NOTE(review): confirm this path against the OTX API reference.
        url = f"https://otx.alienvault.com/api/v1/indicators/{safe_type}/recent"
        headers = {"X-OTX-API-KEY": self.api_key}
        response = requests.get(url, headers=headers, timeout=self.REQUEST_TIMEOUT)
        if response.status_code != 200:
            return {"error": "Failed to fetch IOCs"}
        return response.json()

    # Custom tool: Analyze IOCs
    def analyze_iocs(self, iocs: dict) -> list:
        """Return the entries of ``iocs["results"]`` that look suspicious.

        An entry qualifies when ``entry["pulse_info"]["count"]`` is greater
        than ``SUSPICIOUS_PULSE_COUNT``. A missing or null ``results``,
        ``pulse_info`` or ``count`` is treated as zero hits, so the error
        dict returned by fetch_iocs (or ``None``) yields an empty list.
        """
        if not iocs:
            return []
        threshold = self.SUSPICIOUS_PULSE_COUNT
        return [
            ioc
            for ioc in iocs.get("results") or []
            if ((ioc.get("pulse_info") or {}).get("count") or 0) > threshold
        ]

    # Custom tool: Correlate threats
    def correlate_threats(self, iocs: list) -> dict:
        """Map each IOC's ``indicator`` value to a known threat actor.

        Args:
            iocs: Iterable of dicts carrying an ``indicator`` key; ``None``
                is treated as empty.

        Returns:
            ``{indicator: actor}`` for every indicator listed in
            ``KNOWN_THREAT_ACTORS``.
        """
        # Invert the actor table once per call so each IOC is a single dict
        # lookup instead of a scan over every actor's list. If an indicator
        # were listed under several actors the last one wins, as before.
        actor_by_indicator = {}
        for actor, indicators in self.KNOWN_THREAT_ACTORS.items():
            for indicator in indicators:
                actor_by_indicator[indicator] = actor

        correlated_threats = {}
        for ioc in iocs or []:
            indicator = ioc.get("indicator")
            # Only strings can match the table; this also keeps unhashable
            # values from raising during the dict lookup.
            if isinstance(indicator, str) and indicator in actor_by_indicator:
                correlated_threats[indicator] = actor_by_indicator[indicator]
        return correlated_threats

    # Custom tool: Generate report
    def generate_report(self, suspicious_iocs, correlated_threats) -> dict:
        """Bundle both inputs with a one-sentence summary of their sizes.

        Returns:
            Dict with keys ``suspicious_iocs``, ``correlated_threats`` and
            ``summary``.
        """
        return {
            "suspicious_iocs": suspicious_iocs,
            "correlated_threats": correlated_threats,
            "summary": f"Found {len(suspicious_iocs)} suspicious IOCs, with {len(correlated_threats)} linked to known threat actors.",
        }

    # Custom tool: Recommend actions
    def recommend_actions(self, correlated_threats: dict) -> list:
        """Return one "Block IP ..." recommendation per correlated indicator."""
        return [
            f"Block IP {ip} (linked to {actor})"
            for ip, actor in correlated_threats.items()
        ]

    # Define tasks for the crew
    def create_tasks(self, indicator_type):
        """Build the five tasks, one per agent, in pipeline order.

        Args:
            indicator_type: Interpolated into the fetch task's description.

        Returns:
            ``[fetch, analyze, correlate, report, respond]`` Task objects.
        """
        fetch_task = Task(
            description=f"Fetch IOCs of type {indicator_type} from AlienVault OTX",
            agent=self.data_collector,
            expected_output="A list of IOCs in JSON format.",
        )
        analyze_task = Task(
            description="Analyze the fetched IOCs for suspicious activity",
            agent=self.analyst,
            expected_output="A list of suspicious IOCs.",
        )
        correlate_task = Task(
            description="Correlate suspicious IOCs with known threat actors",
            agent=self.correlator,
            expected_output="A dictionary mapping IOCs to threat actors.",
        )
        report_task = Task(
            description="Generate a threat intelligence report",
            agent=self.reporter,
            expected_output="A JSON report with suspicious IOCs, correlated threats, and a summary.",
        )
        respond_task = Task(
            description="Recommend mitigation actions based on the report",
            agent=self.responder,
            expected_output="A list of recommended actions.",
        )
        return [fetch_task, analyze_task, correlate_task, report_task, respond_task]

    # Execute the crew
    def run_crew(self, indicator_type):
        """Assemble a Crew from the agents and tasks and return its kickoff() result."""
        tasks = self.create_tasks(indicator_type)
        crew = Crew(
            agents=[
                self.data_collector,
                self.analyst,
                self.correlator,
                self.reporter,
                self.responder,
            ],
            tasks=tasks,
        )
        return crew.kickoff()
# FastAPI endpoint
class ThreatIntelRequest(BaseModel):
    # Request body schema for POST /threat-intel.
    # (Kept as comments rather than a docstring so the generated schema
    # description is not altered.)

    # Indicator category handed to the crew; "ipv4" when the client omits it.
    indicator_type: str = "ipv4"
@app.post("/threat-intel")
def threat_intel(request: ThreatIntelRequest):
    """Run the threat-intelligence crew for the requested indicator type.

    Args:
        request: Parsed request body; only ``indicator_type`` is read.

    Returns:
        Whatever ``ThreatIntelligenceCrew.run_crew`` returns.

    Raises:
        HTTPException: Status 500 with the original error text as detail
            when building or running the crew fails.
    """
    # Take the OTX key from the environment so a real secret never has to be
    # committed. The original placeholder stays as the fallback, so behavior
    # is unchanged when OTX_API_KEY is not set.
    api_key = os.environ.get("OTX_API_KEY", "your_alienvault_api_key")
    try:
        # Initialize the crew
        crew = ThreatIntelligenceCrew(api_key=api_key)
        # Run the crew and get results
        return crew.run_crew(request.indicator_type)
    except Exception as e:
        # Chain the cause so the original traceback is kept in server logs.
        raise HTTPException(status_code=500, detail=str(e)) from e
# Run the FastAPI app
if __name__ == "__main__":
    # Imported here so uvicorn is only required when the module is executed
    # as a script, not when it is imported by another ASGI server.
    import uvicorn

    # Serve the app on all interfaces, port 8000.
    uvicorn.run(app, host="0.0.0.0", port=8000)