rajrakeshdr commited on
Commit
ca272d0
·
verified ·
1 Parent(s): dfdce9b

Update app.py

Browse files
Files changed (1) hide show
  1. app.py +41 -159
app.py CHANGED
@@ -1,159 +1,41 @@
1
- from crewai import Agent, Task, Crew
2
- from fastapi import FastAPI, HTTPException
3
- from pydantic import BaseModel
4
- import requests
5
-
6
- # Initialize FastAPI app
7
- app = FastAPI()
8
-
9
- # Define agents using CrewAI
10
- class ThreatIntelligenceCrew:
11
- def __init__(self, api_key):
12
- self.api_key = api_key
13
-
14
- # Define agents
15
- self.data_collector = Agent(
16
- role="Data Collector",
17
- goal="Fetch threat data from external sources",
18
- backstory="Specializes in collecting IOCs from threat intelligence feeds.",
19
- tools=[self.fetch_iocs] # Custom tool for fetching IOCs
20
- )
21
-
22
- self.analyst = Agent(
23
- role="Threat Analyst",
24
- goal="Analyze collected data for suspicious activity",
25
- backstory="Expert in identifying patterns and anomalies in threat data.",
26
- tools=[self.analyze_iocs] # Custom tool for analyzing IOCs
27
- )
28
-
29
- self.correlator = Agent(
30
- role="Threat Correlator",
31
- goal="Correlate IOCs with known threat actors",
32
- backstory="Specializes in linking IOCs to advanced threat groups.",
33
- tools=[self.correlate_threats] # Custom tool for correlation
34
- )
35
-
36
- self.reporter = Agent(
37
- role="Threat Reporter",
38
- goal="Generate actionable threat intelligence reports",
39
- backstory="Expert in creating clear and concise reports for security teams.",
40
- tools=[self.generate_report] # Custom tool for reporting
41
- )
42
-
43
- self.responder = Agent(
44
- role="Response Advisor",
45
- goal="Recommend mitigation actions based on threats",
46
- backstory="Specializes in providing actionable recommendations to mitigate risks.",
47
- tools=[self.recommend_actions] # Custom tool for response
48
- )
49
-
50
- # Custom tool: Fetch IOCs from AlienVault OTX
51
- def fetch_iocs(self, indicator_type="ipv4"):
52
- url = f"https://otx.alienvault.com/api/v1/indicators/{indicator_type}/recent"
53
- headers = {"X-OTX-API-KEY": self.api_key}
54
- response = requests.get(url, headers=headers)
55
- if response.status_code == 200:
56
- return response.json()
57
- else:
58
- return {"error": "Failed to fetch IOCs"}
59
-
60
- # Custom tool: Analyze IOCs
61
- def analyze_iocs(self, iocs):
62
- suspicious_iocs = []
63
- for ioc in iocs.get("results", []):
64
- if ioc.get("pulse_info", {}).get("count", 0) > 5: # Example threshold
65
- suspicious_iocs.append(ioc)
66
- return suspicious_iocs
67
-
68
- # Custom tool: Correlate threats
69
- def correlate_threats(self, iocs):
70
- threat_actors = {
71
- "APT28": ["1.2.3.4", "5.6.7.8"],
72
- "Lazarus Group": ["9.10.11.12"]
73
- }
74
- correlated_threats = {}
75
- for ioc in iocs:
76
- ip = ioc.get("indicator")
77
- for actor, ips in threat_actors.items():
78
- if ip in ips:
79
- correlated_threats[ip] = actor
80
- return correlated_threats
81
-
82
- # Custom tool: Generate report
83
- def generate_report(self, suspicious_iocs, correlated_threats):
84
- report = {
85
- "suspicious_iocs": suspicious_iocs,
86
- "correlated_threats": correlated_threats,
87
- "summary": f"Found {len(suspicious_iocs)} suspicious IOCs, with {len(correlated_threats)} linked to known threat actors."
88
- }
89
- return report
90
-
91
- # Custom tool: Recommend actions
92
- def recommend_actions(self, correlated_threats):
93
- actions = []
94
- for ip, actor in correlated_threats.items():
95
- actions.append(f"Block IP {ip} (linked to {actor})")
96
- return actions
97
-
98
- # Define tasks for the crew
99
- def create_tasks(self, indicator_type):
100
- fetch_task = Task(
101
- description=f"Fetch IOCs of type {indicator_type} from AlienVault OTX",
102
- agent=self.data_collector,
103
- expected_output="A list of IOCs in JSON format."
104
- )
105
-
106
- analyze_task = Task(
107
- description="Analyze the fetched IOCs for suspicious activity",
108
- agent=self.analyst,
109
- expected_output="A list of suspicious IOCs."
110
- )
111
-
112
- correlate_task = Task(
113
- description="Correlate suspicious IOCs with known threat actors",
114
- agent=self.correlator,
115
- expected_output="A dictionary mapping IOCs to threat actors."
116
- )
117
-
118
- report_task = Task(
119
- description="Generate a threat intelligence report",
120
- agent=self.reporter,
121
- expected_output="A JSON report with suspicious IOCs, correlated threats, and a summary."
122
- )
123
-
124
- respond_task = Task(
125
- description="Recommend mitigation actions based on the report",
126
- agent=self.responder,
127
- expected_output="A list of recommended actions."
128
- )
129
-
130
- return [fetch_task, analyze_task, correlate_task, report_task, respond_task]
131
-
132
- # Execute the crew
133
- def run_crew(self, indicator_type):
134
- tasks = self.create_tasks(indicator_type)
135
- crew = Crew(
136
- agents=[self.data_collector, self.analyst, self.correlator, self.reporter, self.responder],
137
- tasks=tasks
138
- )
139
- return crew.kickoff()
140
-
141
- # FastAPI endpoint
142
- class ThreatIntelRequest(BaseModel):
143
- indicator_type: str = "ipv4"
144
-
145
- @app.post("/threat-intel")
146
- def threat_intel(request: ThreatIntelRequest):
147
- try:
148
- # Initialize the crew
149
- crew = ThreatIntelligenceCrew(api_key="your_alienvault_api_key")
150
- # Run the crew and get results
151
- result = crew.run_crew(request.indicator_type)
152
- return result
153
- except Exception as e:
154
- raise HTTPException(status_code=500, detail=str(e))
155
-
156
- # Run the FastAPI app
157
- if __name__ == "__main__":
158
- import uvicorn
159
- uvicorn.run(app, host="0.0.0.0", port=8000)
 
1
+ import streamlit as st
2
+ from transformers import AutoModelForCausalLM, AutoTokenizer
3
+ import torch
4
+
5
+ # Load the model and tokenizer
6
+ model_name = "rajrakeshdr/IntelliSoc"
7
+ tokenizer = AutoTokenizer.from_pretrained(model_name)
8
+ model = AutoModelForCausalLM.from_pretrained(model_name)
9
+
10
+ # Streamlit app title
11
+ st.title("IntelliSoc Text Generation")
12
+
13
+ # Input prompt
14
+ prompt = st.text_area("Enter your prompt:", "Once upon a time")
15
+
16
+ # Slider for max length
17
+ max_length = st.slider("Max length of generated text", 50, 200, 100)
18
+
19
+ # Generate text on button click
20
+ if st.button("Generate Text"):
21
+ # Tokenize input
22
+ inputs = tokenizer(prompt, return_tensors="pt", truncation=True, padding=True)
23
+
24
+ # Generate text
25
+ with torch.no_grad():
26
+ outputs = model.generate(
27
+ inputs.input_ids,
28
+ max_length=max_length,
29
+ num_return_sequences=1,
30
+ no_repeat_ngram_size=2,
31
+ top_k=50,
32
+ top_p=0.95,
33
+ temperature=0.7
34
+ )
35
+
36
+ # Decode the generated text
37
+ generated_text = tokenizer.decode(outputs[0], skip_special_tokens=True)
38
+
39
+ # Display the generated text
40
+ st.write("Generated Text:")
41
+ st.write(generated_text)