File size: 9,212 Bytes
5b76d85
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
from langchain_google_genai import ChatGoogleGenerativeAI
from langchain_core.messages import SystemMessage, HumanMessage, AIMessage
from langchain_core.output_parsers import PydanticOutputParser, JsonOutputParser
from langchain.output_parsers import RetryOutputParser, OutputFixingParser
from langchain_core.prompts import PromptTemplate
from pydantic import BaseModel, Field, field_validator
from typing import Optional, Dict, Any, List, Union
import json
import logging
from models.attack_schema import AttackLayer, Technique, DEFAULT_LAYER_SETTINGS

# ロガーの設定
logger = logging.getLogger(__name__)
logger.setLevel(logging.INFO)

class EvaluationResult(BaseModel):
    is_valid: bool = Field(description="コンテキスト評価結果(TrueまたはFalse)")
    extracted_scenario: Optional[str] = Field(None, description="シナリオの抽出結果")
    extracted_layer_operation: Optional[str] = Field(None, description="レイヤー操作の抽出結果")

class LLMService:
    """Service for handling LLM interactions"""
    
    def __init__(self, model_name: str = "gemini-2.0-flash", temperature: float = 0.7):
        self.llm = ChatGoogleGenerativeAI(
            model=model_name,
            temperature=temperature,
        )
        self.attack_evaluation_parser = PydanticOutputParser(pydantic_object=EvaluationResult)
        self.attack_evaluation_fixing_parser = OutputFixingParser.from_llm(
            parser=self.attack_evaluation_parser,
            llm=self.llm
        )
        self.layer_parser = PydanticOutputParser(pydantic_object=AttackLayer)
        self.layer_fixing_parser = OutputFixingParser.from_llm(
            parser=self.layer_parser,
            llm=self.llm
        )
        self.json_parser = JsonOutputParser(pydantic_object=AttackLayer)
        self.json_retry_parser = RetryOutputParser.from_llm(
            parser=self.json_parser,
            llm=self.llm
        )

    async def evaluate_context(self, user_input: str) -> EvaluationResult:
        """Evaluate if the user input is valid for ATT&CK context"""
        system_prompt = """
        以下のユーザ入力が、サイバー攻撃の分析やMITRE ATT&CKフレームワークのレイヤーに対する指示として適切かどうかを評価し、指示を分割してください。

        評価基準:
        1. サイバー攻撃のシナリオや分析に関する内容か
        2. ATT&CKフレームワークのレイヤーに対する指示や更新要求か
        3. 攻撃テクニックやタクティックに関する質問や説明か

        回答形式:
        {format_instructions}

        注意:
        - 評価結果(is_valid)は必ずTrueまたはFalseのいずれかのみを出力してください。
        - True: コンテキストに合致
        - False: コンテキストに合致しない
        - シナリオの抽出結果(extracted_scenario)は、ユーザ入力の文書からサイバー攻撃分析シナリオに関する部分を抽出してください。なければ省力可能。
        - レイヤー操作の抽出結果(extracted_layer_operation)は、ユーザ入力の文書からMITRE ATT&CKフレームワークのレイヤー操作に関する部分を抽出してください。なければ省略可能。
        """

        logger.info(f"Evaluating context for: {user_input}")
        try:
            response = await self.llm.ainvoke([
                SystemMessage(content=system_prompt.format(
                    format_instructions=self.attack_evaluation_parser.get_format_instructions()
                )),
                HumanMessage(content=user_input)
            ])
            
            result = self.attack_evaluation_fixing_parser.parse(response.content)
            if not result.is_valid:
                logger.info("Invalid context detected")
            return result
            
        except Exception as e:
            logger.error(f"Error in evaluate_context: {str(e)}")
            raise

    async def generate_scenario(self, user_input: str, current_scenario: Optional[str] = None) -> str:
        """Generate or update a scenario based on user input"""
        if not user_input:
            user_input = "シナリオ更新について指示はありません"
        if not current_scenario:
            system_prompt = """
            ユーザ入力から想起されるサイバー攻撃分析シナリオを作成してください。

            要件:
            1. シナリオには以下の構造を含めてください:
               - タイトル
               - 概要
               - 攻撃フェーズ
            2. 各攻撃フェーズは、MITRE ATT&CKフレームワークのTacticsに沿ったものにしてください
            3. シナリオは具体的で分析可能な内容にしてください
            4. 実際の攻撃事例や既知の脅威アクターの戦術を参考にしてください
            5. ユーザの具体的な要件や指示に従ってください
            """
            human_prompt = f"以下の要件に従って、サイバー攻撃分析シナリオを作成してください:\n\n{user_input}"
        else:
            system_prompt = """
            既存のシナリオを更新してください。

            要件:
            1. ユーザの指示に従ってシナリオを更新
            2. 既存の情報は保持しつつ、新しい情報を追加または修正
            3. シナリオの一貫性を維持
            4. シナリオの構造(タイトル、概要、攻撃フェーズなど)を維持
            5. ユーザの具体的な要件や指示に従ってください
            """
            human_prompt = f"""
            現在のシナリオ:
            {current_scenario}

            以下の要件に従って、シナリオを更新してください:

            {user_input}
            """

        try:
            response = await self.llm.ainvoke([
                SystemMessage(content=system_prompt),
                HumanMessage(content=human_prompt)
            ])
            return response.content.strip()
        except Exception as e:
            logger.error(f"Error in generate_scenario: {str(e)}")
            raise

    async def generate_attack_json(self, layer_operation: str = None, scenario: str = None, existing_json: Optional[Dict] = None) -> str:
        """Generate or update ATT&CK Navigator JSON based on scenario"""

        if not scenario:
            raise ValueError("シナリオが指定されていません")
        if not layer_operation:
            layer_operation = "レイヤー操作に関して指定はありません"
        try:
            system_prompt = """
            MITRE ATT&CK Navigatorのレイヤーを生成または更新してください。

            要件:
            1. シナリオの各攻撃フェーズに対応するテクニックを特定
            2. テクニックにはコメントとして、シナリオ内での使用方法を説明
            3. テクニックの選択は具体的な根拠に基づくこと
            4. 既存のJSONがある場合は、その情報を考慮して更新
            5. versionsオブジェクトには必ず以下の3つのキーを含めること:
               - attack: ATT&CKのバージョン
               - navigator: Navigatorのバージョン
               - layer: レイヤーのバージョン

            DEFAULT_LAYER_SETTINGS:
            {DEFAULT_LAYER_SETTINGS}

            バージョン指定のルール:
            1. ユーザーが特定のバージョンを指定した場合(例:「ATT&CKバージョンは16で」)、その値を優先して使用
            2. ユーザーが指定していない場合は、デフォルトのバージョン設定を使用
            3. 既存のJSONがある場合は、そのバージョン情報を保持
            """
            
            human_prompt = f"""
            レイヤー操作:
            {layer_operation}

            シナリオ:
            {scenario}

            {'既存のJSON:' + json.dumps(existing_json, indent=2) if existing_json else '新規作成'}
            """

            response = await self.llm.ainvoke([
                SystemMessage(content=system_prompt),
                HumanMessage(content=human_prompt)
            ])
            
            try:
                result = self.layer_fixing_parser.parse(response.content)
                # バージョン情報が正しくない場合は、デフォルトのバージョン情報を使用
                if not all(k in result.versions for k in ['attack', 'navigator', 'layer']):
                    result.versions = DEFAULT_LAYER_SETTINGS['versions']
                return json.dumps(result.model_dump(), indent=2, ensure_ascii=False)
            except Exception as e:
                logger.error(f"JSON parsing error: {str(e)}")
                raise
                
        except Exception as e:
            logger.error(f"Error in generate_attack_json: {str(e)}")
            raise