nyasukun commited on
Commit
5b76d85
·
1 Parent(s): 8773015

feat: initial project setup and core functionality

Browse files

- Add basic MITRE ATT&CK layer generation
- Configure logging and error handling
- Setup project structure and dependencies
- Add .gitignore for Python/Pipenv
- Implement core LLM service

.gitignore ADDED
@@ -0,0 +1,55 @@
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
+ # Python
2
+ __pycache__/
3
+ *.py[cod]
4
+ *$py.class
5
+ *.so
6
+ .Python
7
+ build/
8
+ develop-eggs/
9
+ dist/
10
+ downloads/
11
+ eggs/
12
+ .eggs/
13
+ lib/
14
+ lib64/
15
+ parts/
16
+ sdist/
17
+ var/
18
+ wheels/
19
+ *.egg-info/
20
+ .installed.cfg
21
+ *.egg
22
+
23
+ # Virtual Environment
24
+ venv/
25
+ env/
26
+ ENV/
27
+
28
+ # Pipenv
29
+ Pipfile
30
+ Pipfile.lock
31
+ .pipenv/
32
+
33
+ # IDE
34
+ .idea/
35
+ .vscode/
36
+ *.swp
37
+ *.swo
38
+
39
+ # Logs
40
+ *.log
41
+ logs/
42
+
43
+ # Local development settings
44
+ .env
45
+ .env.local
46
+ .env.*.local
47
+
48
+ # Chainlit
49
+ .chainlit/
50
+ chainlit.md
51
+
52
+ # Project specific
53
+ responses/
54
+ public/responses/
55
+ test_output_*.json
Dockerfile CHANGED
@@ -4,6 +4,8 @@ RUN useradd -m -u 1000 user
4
  USER user
5
  ENV PATH="/home/user/.local/bin:$PATH"
6
 
 
 
7
  WORKDIR /app
8
 
9
  COPY --chown=user ./requirements.txt requirements.txt
 
4
  USER user
5
  ENV PATH="/home/user/.local/bin:$PATH"
6
 
7
+ RUN --mount=type=secret,id=GEMINI_API_KEY,mode=0444,required=true
8
+
9
  WORKDIR /app
10
 
11
  COPY --chown=user ./requirements.txt requirements.txt
app.py CHANGED
@@ -1,28 +1,53 @@
1
  import chainlit as cl
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
2
 
3
  @cl.on_chat_start
4
  async def start():
5
  """
6
  This function is called when a new chat starts.
7
- You can use it to initialize any variables or models.
8
- """
9
- # Initialize any variables or models here
10
- await cl.Message(
11
- content="Welcome to the chatbot! How can I help you today?"
12
- ).send()
13
-
14
- @cl.on_message
15
- async def main(message: str):
16
- """
17
- This function is called every time a user inputs a message.
18
- Args:
19
- message: The user's message
20
- """
21
- # Process the user's message here
22
- response = f"You said: {message.content}"
23
 
24
- # Send a response back to the user
25
- await cl.Message(
26
- content=response
27
- ).send()
28
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
  import chainlit as cl
2
+ from models.chat_state import get_initial_state, add_user_message
3
+ from workflows.chat_workflow import chainlit_app
4
+
5
+ # Chainlit message handler
6
+ @cl.on_message
7
+ async def process_user_message(message: cl.Message):
8
+ """Process user message and generate response"""
9
+ # Get state from session
10
+ state = cl.user_session.get("state")
11
+
12
+ # Add user message to state
13
+ state = add_user_message(state, message.content)
14
+
15
+ # Generate response
16
+ state = await chainlit_app.ainvoke(state)
17
+
18
+ # Update state in session
19
+ cl.user_session.set("state", state)
20
 
21
  @cl.on_chat_start
22
  async def start():
23
  """
24
  This function is called when a new chat starts.
25
+ Initialize the chat state and store it in the session
26
+ """
27
+ # Initialize state
28
+ state = get_initial_state()
 
 
 
 
 
 
 
 
 
 
 
 
29
 
30
+ # Store state in session
31
+ cl.user_session.set("state", state)
32
+
33
+
34
 
35
+ @cl.set_chat_profiles
36
+ async def chat_profile():
37
+ return [
38
+ cl.ChatProfile(
39
+ name="mitre_attck_navigator_layer_writer",
40
+ markdown_description="MITRE ATT&CK Navigatorのlayerを生成するチャットボットです。",
41
+ icon="public/icon.jpg",
42
+ starters=[
43
+ cl.Starter(
44
+ label="Lockbitの攻撃シナリオ",
45
+ message="Lockbitの攻撃シナリオを生成してください"
46
+ ),
47
+ cl.Starter(
48
+ label="ランサムウェアギャングが使っているテクニックの頻度",
49
+ message="最近のランサムウェアギャングが使っているテクニックと頻度を色の濃さで表現してください"
50
+ ),
51
+ ]
52
+ )
53
+ ]
models/attack_schema.py ADDED
@@ -0,0 +1,150 @@
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
+ """
2
+ MITRE ATT&CK Navigator JSON Schema
3
+ """
4
+
5
+ from pydantic import BaseModel, Field, validator, StringConstraints
6
+ from typing import Optional, Dict, Any, List, Annotated
7
+ import re
8
+ from datetime import datetime
9
+
10
+ # カスタム型の定義
11
+ TechniqueID = Annotated[str, StringConstraints(pattern=r'^T\d{4}(\.\d{3})?$')]
12
+ TacticName = Annotated[str, StringConstraints(pattern=r'^[a-z-]+$')]
13
+ HexColor = Annotated[str, StringConstraints(pattern=r'^#[0-9A-Fa-f]{6}$')]
14
+ Domain = Annotated[str, StringConstraints(pattern=r'^(enterprise-attack|mobile-attack|ics-attack)$')]
15
+
16
+ class Technique(BaseModel):
17
+ """Technique model for ATT&CK Navigator"""
18
+ techniqueID: TechniqueID = Field(description="The MITRE ATT&CK technique ID (e.g., T1234 or T1234.001)")
19
+ tactic: Optional[TacticName] = Field(description="The tactic this technique belongs to (lowercase with hyphens)")
20
+ score: Optional[int] = Field(description="Score between 0 and 100", ge=0, le=100)
21
+ color: Optional[HexColor] = Field(description="Hex color code (e.g., #FF0000)")
22
+ comment: Optional[str] = Field(description="Comment about the technique", max_length=1000)
23
+ enabled: Optional[bool] = Field(description="Whether the technique is enabled", default=True)
24
+ metadata: Optional[List[Dict[str, str]]] = Field(description="Additional metadata")
25
+ links: Optional[List[Dict[str, str]]] = Field(description="Related links")
26
+ showSubtechniques: Optional[bool] = Field(description="Whether to show subtechniques", default=True)
27
+
28
+ @validator('links')
29
+ def validate_links(cls, v):
30
+ if v is not None:
31
+ for link in v:
32
+ if not all(k in link for k in ['label', 'url']):
33
+ raise ValueError("Each link must have 'label' and 'url' fields")
34
+ return v
35
+
36
+ class VersionInfo(BaseModel):
37
+ """バージョン情報のサブスキーマ"""
38
+ attack: str = Field(
39
+ ...,
40
+ description="""ATT&CKのバージョン。ユーザーが指定したバージョンを使用すること。
41
+ 例: ユーザーが「ATT&CKバージョンは16で」と指定した場合、"16"と設定する。
42
+ 指定がない場合は、DEFAULT_LAYER_SETTINGSの値を使用する。"""
43
+ )
44
+ navigator: str = Field(
45
+ ...,
46
+ description="""Navigatorのバージョン。ユーザーが指定したバージョンを使用すること。
47
+ 指定がない場合は、DEFAULT_LAYER_SETTINGSの値を使用する。"""
48
+ )
49
+ layer: str = Field(
50
+ ...,
51
+ description="""レイヤーのバージョン。ユーザーが指定したバージョンを使用すること。
52
+ 指定がない場合は、DEFAULT_LAYER_SETTINGSの値を使用する。"""
53
+ )
54
+
55
+ class AttackLayer(BaseModel):
56
+ """MITRE ATT&CK Navigator Layer"""
57
+ name: str = Field(..., description="レイヤーの名前")
58
+ description: str = Field(..., description="レイヤーの説明")
59
+ domain: Domain = Field(..., description="ATT&CKドメイン")
60
+ versions: VersionInfo = Field(..., description="バージョン情報")
61
+ filters: Dict[str, Any] = Field(
62
+ default_factory=lambda: {
63
+ "platforms": [
64
+ "Windows",
65
+ "Linux",
66
+ "macOS",
67
+ "Containers",
68
+ "IaaS",
69
+ "Network"
70
+ ]
71
+ },
72
+ description="フィルター設定"
73
+ )
74
+ sorting: int = Field(default=0, description="ソート順")
75
+ layout: Dict[str, Any] = Field(
76
+ default_factory=lambda: {
77
+ "layout": "side",
78
+ "showName": True,
79
+ "showID": False,
80
+ "showAggregateScores": True,
81
+ "countUnscored": True,
82
+ "aggregateFunction": "average"
83
+ },
84
+ description="レイアウト設定"
85
+ )
86
+ hideDisabled: bool = Field(default=False, description="無効なテクニックを非表示")
87
+ techniques: List[Technique] = Field(..., description="テクニックのリスト")
88
+ gradient: Dict[str, Any] = Field(
89
+ default_factory=lambda: {
90
+ "colors": ["#ff6666", "#ffe766", "#8ec843"],
91
+ "minValue": 0,
92
+ "maxValue": 100
93
+ },
94
+ description="スコアのグラデーション設定"
95
+ )
96
+ legendItems: List[Dict[str, Any]] = Field(
97
+ default_factory=list,
98
+ description="凡例アイテム"
99
+ )
100
+ showTacticRowBackground: bool = Field(default=False, description="タクティック行の背景を表示")
101
+ tacticRowBackground: str = Field(default="#dddddd", description="タクティック行の背景色")
102
+ selectTechniquesAcrossTactics: bool = Field(default=True, description="タクティック間でテクニックを選択")
103
+ selectSubtechniquesWithParent: bool = Field(default=False, description="サブテクニックを親と共に選択")
104
+ selectVisibleTechniques: bool = Field(default=False, description="表示されているテクニックのみを選択")
105
+ metadata: List[Dict[str, str]] = Field(
106
+ default_factory=lambda: [
107
+ {"name": "作成者", "value": "AIアシスタント"},
108
+ {"name": "作成日", "value": datetime.now().strftime("%Y-%m-%d")}
109
+ ],
110
+ description="メタデータ"
111
+ )
112
+
113
+ # デフォルトのレイヤー設定
114
+ DEFAULT_LAYER_SETTINGS = {
115
+ "versions": {
116
+ "attack": "16.1",
117
+ "navigator": "5.1.0",
118
+ "layer": "4.5"
119
+ },
120
+ "layout": {
121
+ "layout": "side",
122
+ "showName": True,
123
+ "showID": False,
124
+ "showAggregateScores": True,
125
+ "countUnscored": True,
126
+ "aggregateFunction": "average"
127
+ },
128
+ "gradient": {
129
+ "colors": ["#ff6666", "#ffe766", "#8ec843"],
130
+ "minValue": 0,
131
+ "maxValue": 100
132
+ }
133
+ }
134
+
135
+ # プロンプトテンプレート
136
+ ATTACK_PROMPT = """
137
+ あなたはMITRE ATT&CKフレームワークの専門家です。
138
+ 以下のシナリオに基づいて、MITRE ATT&CK NavigatorのレイヤーJSONを生成してください。
139
+
140
+ 回答形式:
141
+ {format_instructions}
142
+
143
+ 注意:
144
+ - 有効なMITRE ATT&CKテクニックIDを使用すること
145
+ - 適切なタクティクスを含めること
146
+ - スコアは0-100の範囲で設定すること
147
+ - 有効な16進数カラーコードを使用すること
148
+ - 意味のあるコメントを含めること
149
+ - 適切なメタデータを設定すること
150
+ """
models/chat_state.py ADDED
@@ -0,0 +1,48 @@
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
+ from typing import List, Optional, Union, Dict, Any
2
+ from langgraph.graph.message import MessagesState
3
+ from langchain_core.messages import AIMessage, HumanMessage
4
+ from pydantic import BaseModel
5
+
6
+ class AttackState(MessagesState, total=False):
7
+ """State for the ATT&CK Navigator workflow"""
8
+ attack_json: Optional[Dict[str, Any]] = None
9
+ scenario: Optional[str] = None
10
+ is_valid_context: Optional[bool] = None
11
+ extracted_user_scenario: Optional[str] = None
12
+ extracted_user_layer_operation: Optional[str] = None
13
+
14
+ def get_initial_state() -> AttackState:
15
+ """Get the initial state for the workflow"""
16
+ return AttackState(
17
+ messages=[],
18
+ attack_json=None,
19
+ scenario=None,
20
+ is_valid_context=None,
21
+ extracted_user_scenario=None,
22
+ extracted_user_layer_operation=None
23
+ )
24
+
25
+ def add_user_message(state: AttackState, content: str) -> AttackState:
26
+ """Add a user message to the state"""
27
+ state['messages'].append(HumanMessage(content=content))
28
+ return state
29
+
30
+ def add_ai_message(state: AttackState, content: str) -> AttackState:
31
+ """Add an AI message to the state"""
32
+ state['messages'].append(AIMessage(content=content))
33
+ return state
34
+
35
+ def set_attack_json(state: AttackState, attack_json: Dict[str, Any]) -> AttackState:
36
+ """Set the ATT&CK JSON in the state"""
37
+ state['attack_json'] = attack_json
38
+ return state
39
+
40
+ def set_scenario(state: AttackState, scenario: str) -> AttackState:
41
+ """Set the scenario text in the state"""
42
+ state['scenario'] = scenario
43
+ return state
44
+
45
+ def set_valid_context(state: AttackState, is_valid: bool) -> AttackState:
46
+ """Set the context validity in the state"""
47
+ state['is_valid_context'] = is_valid
48
+ return state
public/icon.jpg ADDED
requirements.txt CHANGED
@@ -1,2 +1,7 @@
1
  chainlit
2
  websockets
 
 
 
 
 
 
1
  chainlit
2
  websockets
3
+ langgraph
4
+ google-generativeai
5
+ langchain
6
+ langchain-google-genai
7
+ uvicorn
services/file_service.py ADDED
@@ -0,0 +1,30 @@
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
+ import os
2
+ import chainlit as cl
3
+ import uuid
4
+ from datetime import datetime
5
+ # Ensure public/jsons directory exists
6
+ os.makedirs("public/jsons", exist_ok=True)
7
+
8
+ def save_response(content: str) -> tuple[str, str]:
9
+ """
10
+ Save response content to a file and return filename and filepath
11
+ """
12
+ timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
13
+ file_uuid = str(uuid.uuid4())
14
+ filename = f"layer_{file_uuid}_{timestamp}.json"
15
+ filepath = os.path.join("public/jsons", filename)
16
+
17
+ with open(filepath, "w", encoding="utf-8") as f:
18
+ f.write(content)
19
+
20
+ return filename, filepath
21
+
22
+ def create_file_element(filename: str, filepath: str) -> cl.File:
23
+ """
24
+ Create a Chainlit File element for the saved response
25
+ """
26
+ return cl.File(
27
+ name=filename,
28
+ path=filepath,
29
+ display="inline",
30
+ )
services/llm_service.py ADDED
@@ -0,0 +1,190 @@
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
+ from langchain_google_genai import ChatGoogleGenerativeAI
2
+ from langchain_core.messages import SystemMessage, HumanMessage, AIMessage
3
+ from langchain_core.output_parsers import PydanticOutputParser, JsonOutputParser
4
+ from langchain.output_parsers import RetryOutputParser, OutputFixingParser
5
+ from langchain_core.prompts import PromptTemplate
6
+ from pydantic import BaseModel, Field, field_validator
7
+ from typing import Optional, Dict, Any, List, Union
8
+ import json
9
+ import logging
10
+ from models.attack_schema import AttackLayer, Technique, DEFAULT_LAYER_SETTINGS
11
+
12
+ # ロガーの設定
13
+ logger = logging.getLogger(__name__)
14
+ logger.setLevel(logging.INFO)
15
+
16
+ class EvaluationResult(BaseModel):
17
+ is_valid: bool = Field(description="コンテキスト評価結果(TrueまたはFalse)")
18
+ extracted_scenario: Optional[str] = Field(None, description="シナリオの抽出結果")
19
+ extracted_layer_operation: Optional[str] = Field(None, description="レイヤー操作の抽出結果")
20
+
21
+ class LLMService:
22
+ """Service for handling LLM interactions"""
23
+
24
+ def __init__(self, model_name: str = "gemini-2.0-flash", temperature: float = 0.7):
25
+ self.llm = ChatGoogleGenerativeAI(
26
+ model=model_name,
27
+ temperature=temperature,
28
+ )
29
+ self.attack_evaluation_parser = PydanticOutputParser(pydantic_object=EvaluationResult)
30
+ self.attack_evaluation_fixing_parser = OutputFixingParser.from_llm(
31
+ parser=self.attack_evaluation_parser,
32
+ llm=self.llm
33
+ )
34
+ self.layer_parser = PydanticOutputParser(pydantic_object=AttackLayer)
35
+ self.layer_fixing_parser = OutputFixingParser.from_llm(
36
+ parser=self.layer_parser,
37
+ llm=self.llm
38
+ )
39
+ self.json_parser = JsonOutputParser(pydantic_object=AttackLayer)
40
+ self.json_retry_parser = RetryOutputParser.from_llm(
41
+ parser=self.json_parser,
42
+ llm=self.llm
43
+ )
44
+
45
+ async def evaluate_context(self, user_input: str) -> EvaluationResult:
46
+ """Evaluate if the user input is valid for ATT&CK context"""
47
+ system_prompt = """
48
+ 以下のユーザ入力が、サイバー攻撃の分析やMITRE ATT&CKフレームワークのレイヤーに対する指示として適切かどうかを評価し、指示を分割してください。
49
+
50
+ 評価基準:
51
+ 1. サイバー攻撃のシナリオや分析に関する内容か
52
+ 2. ATT&CKフレームワークのレイヤーに対する指示や更新要求か
53
+ 3. 攻撃テクニックやタクティックに関する質問や説明か
54
+
55
+ 回答形式:
56
+ {format_instructions}
57
+
58
+ 注意:
59
+ - 評価結果(is_valid)は必ずTrueまたはFalseのいずれかのみを出力してください。
60
+ - True: コンテキストに合致
61
+ - False: コンテキストに合致しない
62
+ - シナリオの抽出結果(extracted_scenario)は、ユーザ入力の文書からサイバー攻撃分析シナリオに関する部分を抽出してください。なければ省力可能。
63
+ - レイヤー操作の抽出結果(extracted_layer_operation)は、ユーザ入力の文書からMITRE ATT&CKフレームワークのレイヤー操作に関する部分を抽出してください。なければ省略可能。
64
+ """
65
+
66
+ logger.info(f"Evaluating context for: {user_input}")
67
+ try:
68
+ response = await self.llm.ainvoke([
69
+ SystemMessage(content=system_prompt.format(
70
+ format_instructions=self.attack_evaluation_parser.get_format_instructions()
71
+ )),
72
+ HumanMessage(content=user_input)
73
+ ])
74
+
75
+ result = self.attack_evaluation_fixing_parser.parse(response.content)
76
+ if not result.is_valid:
77
+ logger.info("Invalid context detected")
78
+ return result
79
+
80
+ except Exception as e:
81
+ logger.error(f"Error in evaluate_context: {str(e)}")
82
+ raise
83
+
84
+ async def generate_scenario(self, user_input: str, current_scenario: Optional[str] = None) -> str:
85
+ """Generate or update a scenario based on user input"""
86
+ if not user_input:
87
+ user_input = "シナリオ更新について指示はありません"
88
+ if not current_scenario:
89
+ system_prompt = """
90
+ ユーザ入力から想起されるサイバー攻撃分析シナリオを作成してください。
91
+
92
+ 要件:
93
+ 1. シナリオには以下の構造を含めてください:
94
+ - タイトル
95
+ - 概要
96
+ - 攻撃フェーズ
97
+ 2. 各攻撃フェーズは、MITRE ATT&CKフレームワークのTacticsに沿ったものにしてください
98
+ 3. シナリオは具体的で分析可能な内容にしてください
99
+ 4. 実際の攻撃事例や既知の脅威アクターの戦術を参考にしてください
100
+ 5. ユーザの具体的な要件や指示に従ってください
101
+ """
102
+ human_prompt = f"以下の要件に従って、サイバー攻撃分析シナリオを作成してください:\n\n{user_input}"
103
+ else:
104
+ system_prompt = """
105
+ 既存のシナリオを更新してください。
106
+
107
+ 要件:
108
+ 1. ユーザの指示に従ってシナリオを更新
109
+ 2. 既存の情報は保持しつつ、新しい情報を追加または修正
110
+ 3. シナリオの一貫性を維持
111
+ 4. シナリオの構造(タイトル、概要、攻撃フェーズなど)を維持
112
+ 5. ユーザの具体的な要件や指示に従ってください
113
+ """
114
+ human_prompt = f"""
115
+ 現在のシナリオ:
116
+ {current_scenario}
117
+
118
+ 以下の要件に従って、シナリオを更新してください:
119
+
120
+ {user_input}
121
+ """
122
+
123
+ try:
124
+ response = await self.llm.ainvoke([
125
+ SystemMessage(content=system_prompt),
126
+ HumanMessage(content=human_prompt)
127
+ ])
128
+ return response.content.strip()
129
+ except Exception as e:
130
+ logger.error(f"Error in generate_scenario: {str(e)}")
131
+ raise
132
+
133
+ async def generate_attack_json(self, layer_operation: str = None, scenario: str = None, existing_json: Optional[Dict] = None) -> str:
134
+ """Generate or update ATT&CK Navigator JSON based on scenario"""
135
+
136
+ if not scenario:
137
+ raise ValueError("シナリオが指定されていません")
138
+ if not layer_operation:
139
+ layer_operation = "レイヤー操作に関して指定はありません"
140
+ try:
141
+ system_prompt = """
142
+ MITRE ATT&CK Navigatorのレイヤーを生成または更新してください。
143
+
144
+ 要件:
145
+ 1. シナリオの各攻撃フェーズに対応するテクニックを特定
146
+ 2. テクニックにはコメントとして、シナリオ内での使用方法を説明
147
+ 3. テクニックの選択は具体的な根拠に基づくこと
148
+ 4. 既存のJSONがある場合は、その情報を考慮して更新
149
+ 5. versionsオブジェクトには必ず以下の3つのキーを含めること:
150
+ - attack: ATT&CKのバージョン
151
+ - navigator: Navigatorのバージョン
152
+ - layer: レイヤーのバージョン
153
+
154
+ DEFAULT_LAYER_SETTINGS:
155
+ {DEFAULT_LAYER_SETTINGS}
156
+
157
+ バージョン指定のルール:
158
+ 1. ユーザーが特定のバージョンを指定した場合(例:「ATT&CKバージョンは16で」)、その値を優先して使用
159
+ 2. ユーザーが指定していない場合は、デフォルトのバージョン設定を使用
160
+ 3. 既存のJSONがある場合は、そのバージョン情報を保持
161
+ """
162
+
163
+ human_prompt = f"""
164
+ レイヤー操作:
165
+ {layer_operation}
166
+
167
+ シナリオ:
168
+ {scenario}
169
+
170
+ {'既存のJSON:' + json.dumps(existing_json, indent=2) if existing_json else '新規作成'}
171
+ """
172
+
173
+ response = await self.llm.ainvoke([
174
+ SystemMessage(content=system_prompt),
175
+ HumanMessage(content=human_prompt)
176
+ ])
177
+
178
+ try:
179
+ result = self.layer_fixing_parser.parse(response.content)
180
+ # バージョン情報が正しくない場合は、デフォルトのバージョン情報を使用
181
+ if not all(k in result.versions for k in ['attack', 'navigator', 'layer']):
182
+ result.versions = DEFAULT_LAYER_SETTINGS['versions']
183
+ return json.dumps(result.model_dump(), indent=2, ensure_ascii=False)
184
+ except Exception as e:
185
+ logger.error(f"JSON parsing error: {str(e)}")
186
+ raise
187
+
188
+ except Exception as e:
189
+ logger.error(f"Error in generate_attack_json: {str(e)}")
190
+ raise
workflows/chat_workflow.py ADDED
@@ -0,0 +1,376 @@
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
+ from langgraph.graph import StateGraph, START, END
2
+ from langgraph.graph.message import MessagesState
3
+ from langchain_core.messages import AIMessage, HumanMessage, SystemMessage
4
+ from services.llm_service import LLMService
5
+ from services.file_service import save_response, create_file_element
6
+ from models.chat_state import AttackState, get_initial_state
7
+ import chainlit as cl
8
+ import json
9
+ import logging
10
+ import os
11
+ from urllib.parse import quote
12
+ # Initialize services
13
+ llm_service = LLMService()
14
+ logger = logging.getLogger(__name__)
15
+
16
+ CHAINLIT_URL = os.environ.get("SPACE_HOST")
17
+ if not CHAINLIT_URL:
18
+ CHAINLIT_URL = "http://localhost:8080"
19
+
20
+ @cl.step(name="コンテキスト評価", type="evaluation")
21
+ async def evaluate_context_node(state: AttackState) -> AttackState:
22
+ """Node for evaluating if the user input is valid for ATT&CK context"""
23
+ msg = cl.Message(content="")
24
+
25
+ # Get the last user message
26
+ user_messages = [msg for msg in state['messages'] if isinstance(msg, HumanMessage)]
27
+ user_message = user_messages[-1].content if user_messages else ""
28
+
29
+ try:
30
+ evaluation_result = await llm_service.evaluate_context(user_message)
31
+ state['is_valid_context'] = evaluation_result.is_valid
32
+ state['extracted_user_scenario'] = evaluation_result.extracted_scenario
33
+ state['extracted_user_layer_operation'] = evaluation_result.extracted_layer_operation
34
+
35
+ if state['is_valid_context']:
36
+ response_text = "入力はATT&CKフレームワークのコンテキストに合致します。シナリオの評価を続けます。"
37
+ else:
38
+ response_text = "申し訳ありませんが、この入力はサイバー攻撃の分析やATT&CKフレームワークのレイヤーに関する指示として認識できませんでした。適切な指示を入力してください。"
39
+
40
+ await msg.stream_token(response_text)
41
+ await msg.send()
42
+ state['messages'].append(AIMessage(content=response_text))
43
+
44
+ except Exception as e:
45
+ error_msg = f"コンテキスト評価中にエラーが発生しました: {str(e)}"
46
+ await msg.stream_token(error_msg)
47
+ await msg.send()
48
+ state['messages'].append(AIMessage(content=error_msg))
49
+ state['is_valid_context'] = False
50
+
51
+ return state
52
+
53
+ @cl.step(name="シナリオ更新", type="update")
54
+ async def update_scenario_node(state: AttackState) -> AttackState:
55
+ """Node for updating the scenario based on user input"""
56
+ msg = cl.Message(content="")
57
+
58
+ # Get the last user message
59
+ user_message = state.get('extracted_user_scenario', None)
60
+ current_scenario = state.get('scenario', None)
61
+ if not user_message and not current_scenario:
62
+ raise ValueError("シナリオの更新に必要な情報がありません。")
63
+
64
+ try:
65
+ updated_scenario = await llm_service.generate_scenario(user_message, current_scenario)
66
+ state['scenario'] = updated_scenario
67
+
68
+ message = "新しいシナリオを作成しました。" if not current_scenario else "シナリオを更新しました。"
69
+ await msg.stream_token(message)
70
+ await msg.send()
71
+ state['messages'].append(AIMessage(content=message))
72
+
73
+ except Exception as e:
74
+ error_msg = f"シナリオの{'作成' if not current_scenario else '更新'}中にエラーが発生しました: {str(e)}"
75
+ await msg.stream_token(error_msg)
76
+ await msg.send()
77
+ state['messages'].append(AIMessage(content=error_msg))
78
+
79
+ return state
80
+
81
+ @cl.step(name="JSON生成/更新", type="generation", language="json")
82
+ async def generate_json_node(state: AttackState) -> AttackState:
83
+ """Node for generating or updating ATT&CK Navigator JSON"""
84
+ user_message = state.get('extracted_user_layer_operation')
85
+ current_scenario = state.get('scenario')
86
+ existing_json = state.get('attack_json')
87
+
88
+ try:
89
+ json_content = await llm_service.generate_attack_json(user_message, current_scenario, existing_json)
90
+
91
+ # Save JSON to file
92
+ filename, filepath = save_response(json_content)
93
+ file_element = create_file_element(filename, filepath)
94
+
95
+ json_url = CHAINLIT_URL + "/" + filepath
96
+ json_url = quote(json_url)
97
+
98
+ # Prepare and send the response message
99
+ response = "MITRE ATT&CK Navigatorレイヤーを更新しました。" if existing_json else "MITRE ATT&CK Navigatorレイヤーを生成しました。"
100
+ response += " ファイルをダウンロードしてインポートできます。"
101
+ response += f"ATT&CK Navigator : https://mitre-attack.github.io/attack-navigator//#layerURL={json_url}"
102
+
103
+ msg = cl.Message(content=response, elements=[file_element])
104
+ await msg.send()
105
+
106
+ # Update state
107
+ state['messages'].append(AIMessage(content=response))
108
+ state['attack_json'] = json.loads(json_content)
109
+
110
+ except Exception as e:
111
+ error_msg = f"ATT&CK Navigatorレイヤーの生成中にエラーが発生しました: {str(e)}"
112
+ msg = cl.Message(content=error_msg)
113
+ await msg.send()
114
+ state['messages'].append(AIMessage(content=error_msg))
115
+
116
+ return state
117
+
118
+ async def display_state_node(state: AttackState) -> AttackState:
119
+ """Node for displaying the current state before ending"""
120
+ async with cl.Step(name="状態表示", type="display") as step:
121
+ # コンテキストの評価結果
122
+ if state.get('is_valid_context') is not None:
123
+ status = "有効" if state['is_valid_context'] else "無効"
124
+ step.input = "コンテキスト評価"
125
+ step.output = f"コンテキストの評価結果: {status}"
126
+
127
+ # 現在のシナリオ
128
+ if state.get('scenario'):
129
+ async with cl.Step(name="シナリオ情報", type="display") as scenario_step:
130
+ scenario_step.input = "現在のシナリオ"
131
+ scenario_step.output = state['scenario']
132
+
133
+ # JSONの状態
134
+ if state.get('attack_json'):
135
+ async with cl.Step(name="ATT&CK情報", type="display") as attack_step:
136
+ techniques = state['attack_json'].get('techniques', [])
137
+ technique_count = len(techniques)
138
+ attack_step.input = "登録済みテクニック"
139
+
140
+ if technique_count > 0:
141
+ technique_list = "\n".join([
142
+ f"- {t.get('techniqueID', 'Unknown')}: {t.get('comment', '説明なし')}"
143
+ for t in techniques[:5]
144
+ ])
145
+ if technique_count > 5:
146
+ technique_list += f"\n... 他 {technique_count - 5} 件"
147
+ attack_step.output = f"登録済みテクニック数: {technique_count}\n\n{technique_list}"
148
+ else:
149
+ attack_step.output = "登録済みテクニックはありません"
150
+ msg = cl.Message(content="処理が完了しました。")
151
+ await msg.send()
152
+ return state
153
+
154
+ # Create the graph
155
+ workflow = StateGraph(AttackState)
156
+
157
+ # Add nodes
158
+ workflow.add_node("evaluate_context", evaluate_context_node)
159
+ workflow.add_node("update_scenario", update_scenario_node)
160
+ workflow.add_node("generate_json", generate_json_node)
161
+ workflow.add_node("display_state", display_state_node)
162
+
163
+ # Add edges
164
+ workflow.add_edge(START, "evaluate_context")
165
+ workflow.add_conditional_edges(
166
+ "evaluate_context",
167
+ lambda state: state.get('is_valid_context', False),
168
+ {
169
+ True: "update_scenario",
170
+ False: "display_state"
171
+ }
172
+ )
173
+ workflow.add_edge("update_scenario", "generate_json")
174
+ workflow.add_edge("generate_json", "display_state")
175
+ workflow.add_edge("display_state", END)
176
+
177
+ # Compile the graph
178
+ chainlit_app = workflow.compile()
179
+
180
+ async def test_evaluate_context_node(state: AttackState) -> AttackState:
181
+ """テスト用のコンテキスト評価ノード"""
182
+ user_messages = [msg for msg in state['messages'] if isinstance(msg, HumanMessage)]
183
+ user_message = user_messages[-1].content if user_messages else ""
184
+
185
+ try:
186
+ evaluation_result = await llm_service.evaluate_context(user_message)
187
+ state['is_valid_context'] = evaluation_result.is_valid
188
+ state['extracted_user_scenario'] = evaluation_result.extracted_scenario
189
+ state['extracted_user_layer_operation'] = evaluation_result.extracted_layer_operation
190
+ response_text = "入力はATT&CKフレームワークのコンテキストに合致します。シナリオの評価を続けます。" if state['is_valid_context'] else "申し訳ありませんが、この入力はサイバー攻撃の分析やATT&CKフレームワークのレイヤーに関する指示として認識できませんでした。適切な指示を入力してください。"
191
+ state['messages'].append(AIMessage(content=response_text))
192
+ except Exception as e:
193
+ error_msg = f"コンテキスト評価中にエラーが発生しました: {str(e)}"
194
+ state['messages'].append(AIMessage(content=error_msg))
195
+ state['is_valid_context'] = False
196
+
197
+ return state
198
+
199
+ async def test_update_scenario_node(state: AttackState) -> AttackState:
200
+ """テスト用のシナリオ更新ノード"""
201
+ # Get the last user message
202
+ user_message = state.get('extracted_user_scenario')
203
+ current_scenario = state.get('scenario')
204
+
205
+ try:
206
+ updated_scenario = await llm_service.generate_scenario(user_message, current_scenario)
207
+ state['scenario'] = updated_scenario
208
+ message = "新しいシナリオを作成しました。" if not current_scenario else "シナリオを更新しました。"
209
+ state['messages'].append(AIMessage(content=message))
210
+ except Exception as e:
211
+ error_msg = f"シナリオの{'作成' if not current_scenario else '更新'}中にエラーが発生しました: {str(e)}"
212
+ state['messages'].append(AIMessage(content=error_msg))
213
+
214
+ return state
215
+
216
+ async def test_generate_json_node(state: AttackState) -> AttackState:
217
+ """テスト用のJSON生成ノード"""
218
+ user_message = state.get('extracted_user_layer_operation')
219
+ current_scenario = state.get('scenario')
220
+ existing_json = state.get('attack_json')
221
+
222
+ try:
223
+ json_content = await llm_service.generate_attack_json(user_message, current_scenario, existing_json)
224
+ response = "MITRE ATT&CK Navigatorレイヤーを更新しました。" if existing_json else "MITRE ATT&CK Navigatorレイヤーを生成しました。"
225
+ response += " ファイルをダウンロードしてインポートできます。"
226
+ state['messages'].append(AIMessage(content=response))
227
+ state['attack_json'] = json.loads(json_content)
228
+ except Exception as e:
229
+ error_msg = f"ATT&CK Navigatorレイヤーの生成中にエラーが発生しました: {str(e)}"
230
+ state['messages'].append(AIMessage(content=error_msg))
231
+
232
+ return state
233
+
234
+ async def test_display_state_node(state: AttackState) -> AttackState:
235
+ """テスト用の状態表示ノード"""
236
+ summary = []
237
+
238
+ if state.get('is_valid_context') is not None:
239
+ status = "有効" if state['is_valid_context'] else "無効"
240
+ summary.append(f"コンテキストの評価結果: {status}")
241
+
242
+ if state.get('scenario'):
243
+ summary.append(f"現在のシナリオ:\n{state['scenario']}")
244
+
245
+ if state.get('attack_json'):
246
+ techniques = state['attack_json'].get('techniques', [])
247
+ technique_count = len(techniques)
248
+ summary.append(f"登録済みテクニック数: {technique_count}")
249
+
250
+ if technique_count > 0:
251
+ technique_list = "\n".join([
252
+ f"- {t.get('techniqueID', 'Unknown')}: {t.get('comment', '説明なし')}"
253
+ for t in techniques[:5]
254
+ ])
255
+ if technique_count > 5:
256
+ technique_list += f"\n... 他 {technique_count - 5} 件"
257
+ summary.append(f"\n登録済みテクニック:\n{technique_list}")
258
+
259
+ if summary:
260
+ state_summary = "\n\n".join(summary)
261
+ state['messages'].append(AIMessage(content=f"現在の状態:\n{state_summary}"))
262
+
263
+ return state
264
+
265
+ async def main():
266
+ """テスト用のメイン関数"""
267
+ try:
268
+ # 初期状態の作成
269
+ initial_state = get_initial_state()
270
+
271
+ # テスト用の既存シナリオ
272
+ existing_scenario = """
273
+ 標的システムへの不正アクセスシナリオ
274
+
275
+ 概要:
276
+ 攻撃者は、標的のシステムに不正アクセスを試み、機密情報を窃取します。
277
+
278
+ 攻撃フェーズ:
279
+ 1. 初期アクセス
280
+ - パスワードスプレー攻撃による認証情報の取得
281
+ - 有効なアカウントの特定
282
+
283
+ 2. 実行
284
+ - 取得した認証情報を使用してシステムにログイン
285
+ - 不正なコマンドの実行
286
+
287
+ 3. 権限昇格
288
+ - 管理者権限の取得
289
+ - システム設定の変更
290
+
291
+ 4. 防御回避
292
+ - ログの削除
293
+ - 攻撃痕跡の隠蔽
294
+ """
295
+
296
+ # テスト用の既存JSON
297
+ existing_json = {
298
+ "name": "Test Layer",
299
+ "versions": {
300
+ "attack": "16.0",
301
+ "navigator": "4.9.0",
302
+ "layer": "4.5"
303
+ },
304
+ "domain": "enterprise-attack",
305
+ "description": "Test layer for development",
306
+ "filters": {
307
+ "platforms": ["Windows", "Linux", "macOS"]
308
+ },
309
+ "gradient": {
310
+ "colors": ["#ffffff", "#ff6666"],
311
+ "minValue": 0,
312
+ "maxValue": 100
313
+ },
314
+ "techniques": [
315
+ {
316
+ "techniqueID": "T1110",
317
+ "score": 50,
318
+ "color": "#ff6666",
319
+ "comment": "パスワードスプレー攻撃による認証情報の取得",
320
+ "enabled": True
321
+ },
322
+ {
323
+ "techniqueID": "T1078",
324
+ "score": 50,
325
+ "color": "#ff6666",
326
+ "comment": "有効なアカウントを使用した不正アクセス",
327
+ "enabled": True
328
+ }
329
+ ]
330
+ }
331
+
332
+ # 初期状態に既存データを設定
333
+ initial_state['scenario'] = existing_scenario
334
+ initial_state['attack_json'] = existing_json
335
+
336
+ # テスト用のユーザーメッセージ
337
+ test_message = """
338
+ 以下の攻撃シナリオを分析してください:
339
+
340
+ 攻撃者は、標的のシステムに不正アクセスを試みます。
341
+ まず、パスワードスプレー攻撃を実行して、有効なアカウントの認証情報を取得���ます。
342
+ 取得した認証情報を使用して、システムにログインし、機密情報を窃取します。
343
+ 最後に、攻撃の痕跡を隠蔽するために、ログを削除します。
344
+ """
345
+
346
+ test_message = "ATTACKのバージョンを16にして、テクニックは青にして。"
347
+
348
+ # ユーザーメッセージを状態に追加
349
+ initial_state['messages'].append(HumanMessage(content=test_message))
350
+
351
+ # テスト用ワークフローの実行
352
+ state = await test_evaluate_context_node(initial_state)
353
+
354
+ if state.get('is_valid_context', False):
355
+ # シナリオ更新
356
+ state = await test_update_scenario_node(state)
357
+
358
+ # JSON生成
359
+ state = await test_generate_json_node(state)
360
+
361
+ # 状態表示
362
+ state = await test_display_state_node(state)
363
+
364
+ # 結果の表示
365
+ for msg in state['messages']:
366
+ role = "User" if isinstance(msg, HumanMessage) else "Assistant"
367
+ print(f"\n{role}:")
368
+ print(msg.content)
369
+
370
+ except Exception as e:
371
+ print(f"エラーが発生しました: {str(e)}")
372
+ raise
373
+
374
+ if __name__ == "__main__":
375
+ import asyncio
376
+ asyncio.run(main())