Spaces:
Running
Running
File size: 9,212 Bytes
5b76d85 |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 |
from langchain_google_genai import ChatGoogleGenerativeAI
from langchain_core.messages import SystemMessage, HumanMessage, AIMessage
from langchain_core.output_parsers import PydanticOutputParser, JsonOutputParser
from langchain.output_parsers import RetryOutputParser, OutputFixingParser
from langchain_core.prompts import PromptTemplate
from pydantic import BaseModel, Field, field_validator
from typing import Optional, Dict, Any, List, Union
import json
import logging
from models.attack_schema import AttackLayer, Technique, DEFAULT_LAYER_SETTINGS
# ロガーの設定
logger = logging.getLogger(__name__)
logger.setLevel(logging.INFO)
class EvaluationResult(BaseModel):
is_valid: bool = Field(description="コンテキスト評価結果(TrueまたはFalse)")
extracted_scenario: Optional[str] = Field(None, description="シナリオの抽出結果")
extracted_layer_operation: Optional[str] = Field(None, description="レイヤー操作の抽出結果")
class LLMService:
"""Service for handling LLM interactions"""
def __init__(self, model_name: str = "gemini-2.0-flash", temperature: float = 0.7):
self.llm = ChatGoogleGenerativeAI(
model=model_name,
temperature=temperature,
)
self.attack_evaluation_parser = PydanticOutputParser(pydantic_object=EvaluationResult)
self.attack_evaluation_fixing_parser = OutputFixingParser.from_llm(
parser=self.attack_evaluation_parser,
llm=self.llm
)
self.layer_parser = PydanticOutputParser(pydantic_object=AttackLayer)
self.layer_fixing_parser = OutputFixingParser.from_llm(
parser=self.layer_parser,
llm=self.llm
)
self.json_parser = JsonOutputParser(pydantic_object=AttackLayer)
self.json_retry_parser = RetryOutputParser.from_llm(
parser=self.json_parser,
llm=self.llm
)
async def evaluate_context(self, user_input: str) -> EvaluationResult:
"""Evaluate if the user input is valid for ATT&CK context"""
system_prompt = """
以下のユーザ入力が、サイバー攻撃の分析やMITRE ATT&CKフレームワークのレイヤーに対する指示として適切かどうかを評価し、指示を分割してください。
評価基準:
1. サイバー攻撃のシナリオや分析に関する内容か
2. ATT&CKフレームワークのレイヤーに対する指示や更新要求か
3. 攻撃テクニックやタクティックに関する質問や説明か
回答形式:
{format_instructions}
注意:
- 評価結果(is_valid)は必ずTrueまたはFalseのいずれかのみを出力してください。
- True: コンテキストに合致
- False: コンテキストに合致しない
- シナリオの抽出結果(extracted_scenario)は、ユーザ入力の文書からサイバー攻撃分析シナリオに関する部分を抽出してください。なければ省力可能。
- レイヤー操作の抽出結果(extracted_layer_operation)は、ユーザ入力の文書からMITRE ATT&CKフレームワークのレイヤー操作に関する部分を抽出してください。なければ省略可能。
"""
logger.info(f"Evaluating context for: {user_input}")
try:
response = await self.llm.ainvoke([
SystemMessage(content=system_prompt.format(
format_instructions=self.attack_evaluation_parser.get_format_instructions()
)),
HumanMessage(content=user_input)
])
result = self.attack_evaluation_fixing_parser.parse(response.content)
if not result.is_valid:
logger.info("Invalid context detected")
return result
except Exception as e:
logger.error(f"Error in evaluate_context: {str(e)}")
raise
async def generate_scenario(self, user_input: str, current_scenario: Optional[str] = None) -> str:
"""Generate or update a scenario based on user input"""
if not user_input:
user_input = "シナリオ更新について指示はありません"
if not current_scenario:
system_prompt = """
ユーザ入力から想起されるサイバー攻撃分析シナリオを作成してください。
要件:
1. シナリオには以下の構造を含めてください:
- タイトル
- 概要
- 攻撃フェーズ
2. 各攻撃フェーズは、MITRE ATT&CKフレームワークのTacticsに沿ったものにしてください
3. シナリオは具体的で分析可能な内容にしてください
4. 実際の攻撃事例や既知の脅威アクターの戦術を参考にしてください
5. ユーザの具体的な要件や指示に従ってください
"""
human_prompt = f"以下の要件に従って、サイバー攻撃分析シナリオを作成してください:\n\n{user_input}"
else:
system_prompt = """
既存のシナリオを更新してください。
要件:
1. ユーザの指示に従ってシナリオを更新
2. 既存の情報は保持しつつ、新しい情報を追加または修正
3. シナリオの一貫性を維持
4. シナリオの構造(タイトル、概要、攻撃フェーズなど)を維持
5. ユーザの具体的な要件や指示に従ってください
"""
human_prompt = f"""
現在のシナリオ:
{current_scenario}
以下の要件に従って、シナリオを更新してください:
{user_input}
"""
try:
response = await self.llm.ainvoke([
SystemMessage(content=system_prompt),
HumanMessage(content=human_prompt)
])
return response.content.strip()
except Exception as e:
logger.error(f"Error in generate_scenario: {str(e)}")
raise
async def generate_attack_json(self, layer_operation: str = None, scenario: str = None, existing_json: Optional[Dict] = None) -> str:
"""Generate or update ATT&CK Navigator JSON based on scenario"""
if not scenario:
raise ValueError("シナリオが指定されていません")
if not layer_operation:
layer_operation = "レイヤー操作に関して指定はありません"
try:
system_prompt = """
MITRE ATT&CK Navigatorのレイヤーを生成または更新してください。
要件:
1. シナリオの各攻撃フェーズに対応するテクニックを特定
2. テクニックにはコメントとして、シナリオ内での使用方法を説明
3. テクニックの選択は具体的な根拠に基づくこと
4. 既存のJSONがある場合は、その情報を考慮して更新
5. versionsオブジェクトには必ず以下の3つのキーを含めること:
- attack: ATT&CKのバージョン
- navigator: Navigatorのバージョン
- layer: レイヤーのバージョン
DEFAULT_LAYER_SETTINGS:
{DEFAULT_LAYER_SETTINGS}
バージョン指定のルール:
1. ユーザーが特定のバージョンを指定した場合(例:「ATT&CKバージョンは16で」)、その値を優先して使用
2. ユーザーが指定していない場合は、デフォルトのバージョン設定を使用
3. 既存のJSONがある場合は、そのバージョン情報を保持
"""
human_prompt = f"""
レイヤー操作:
{layer_operation}
シナリオ:
{scenario}
{'既存のJSON:' + json.dumps(existing_json, indent=2) if existing_json else '新規作成'}
"""
response = await self.llm.ainvoke([
SystemMessage(content=system_prompt),
HumanMessage(content=human_prompt)
])
try:
result = self.layer_fixing_parser.parse(response.content)
# バージョン情報が正しくない場合は、デフォルトのバージョン情報を使用
if not all(k in result.versions for k in ['attack', 'navigator', 'layer']):
result.versions = DEFAULT_LAYER_SETTINGS['versions']
return json.dumps(result.model_dump(), indent=2, ensure_ascii=False)
except Exception as e:
logger.error(f"JSON parsing error: {str(e)}")
raise
except Exception as e:
logger.error(f"Error in generate_attack_json: {str(e)}")
raise |