mitre-attack / services /llm_service.py
nyasukun's picture
feat: initial project setup and core functionality
5b76d85
from langchain_google_genai import ChatGoogleGenerativeAI
from langchain_core.messages import SystemMessage, HumanMessage, AIMessage
from langchain_core.output_parsers import PydanticOutputParser, JsonOutputParser
from langchain.output_parsers import RetryOutputParser, OutputFixingParser
from langchain_core.prompts import PromptTemplate
from pydantic import BaseModel, Field, field_validator
from typing import Optional, Dict, Any, List, Union
import json
import logging
from models.attack_schema import AttackLayer, Technique, DEFAULT_LAYER_SETTINGS
# ロガーの設定
logger = logging.getLogger(__name__)
logger.setLevel(logging.INFO)
class EvaluationResult(BaseModel):
is_valid: bool = Field(description="コンテキスト評価結果(TrueまたはFalse)")
extracted_scenario: Optional[str] = Field(None, description="シナリオの抽出結果")
extracted_layer_operation: Optional[str] = Field(None, description="レイヤー操作の抽出結果")
class LLMService:
"""Service for handling LLM interactions"""
def __init__(self, model_name: str = "gemini-2.0-flash", temperature: float = 0.7):
self.llm = ChatGoogleGenerativeAI(
model=model_name,
temperature=temperature,
)
self.attack_evaluation_parser = PydanticOutputParser(pydantic_object=EvaluationResult)
self.attack_evaluation_fixing_parser = OutputFixingParser.from_llm(
parser=self.attack_evaluation_parser,
llm=self.llm
)
self.layer_parser = PydanticOutputParser(pydantic_object=AttackLayer)
self.layer_fixing_parser = OutputFixingParser.from_llm(
parser=self.layer_parser,
llm=self.llm
)
self.json_parser = JsonOutputParser(pydantic_object=AttackLayer)
self.json_retry_parser = RetryOutputParser.from_llm(
parser=self.json_parser,
llm=self.llm
)
async def evaluate_context(self, user_input: str) -> EvaluationResult:
"""Evaluate if the user input is valid for ATT&CK context"""
system_prompt = """
以下のユーザ入力が、サイバー攻撃の分析やMITRE ATT&CKフレームワークのレイヤーに対する指示として適切かどうかを評価し、指示を分割してください。
評価基準:
1. サイバー攻撃のシナリオや分析に関する内容か
2. ATT&CKフレームワークのレイヤーに対する指示や更新要求か
3. 攻撃テクニックやタクティックに関する質問や説明か
回答形式:
{format_instructions}
注意:
- 評価結果(is_valid)は必ずTrueまたはFalseのいずれかのみを出力してください。
- True: コンテキストに合致
- False: コンテキストに合致しない
- シナリオの抽出結果(extracted_scenario)は、ユーザ入力の文書からサイバー攻撃分析シナリオに関する部分を抽出してください。なければ省力可能。
- レイヤー操作の抽出結果(extracted_layer_operation)は、ユーザ入力の文書からMITRE ATT&CKフレームワークのレイヤー操作に関する部分を抽出してください。なければ省略可能。
"""
logger.info(f"Evaluating context for: {user_input}")
try:
response = await self.llm.ainvoke([
SystemMessage(content=system_prompt.format(
format_instructions=self.attack_evaluation_parser.get_format_instructions()
)),
HumanMessage(content=user_input)
])
result = self.attack_evaluation_fixing_parser.parse(response.content)
if not result.is_valid:
logger.info("Invalid context detected")
return result
except Exception as e:
logger.error(f"Error in evaluate_context: {str(e)}")
raise
async def generate_scenario(self, user_input: str, current_scenario: Optional[str] = None) -> str:
"""Generate or update a scenario based on user input"""
if not user_input:
user_input = "シナリオ更新について指示はありません"
if not current_scenario:
system_prompt = """
ユーザ入力から想起されるサイバー攻撃分析シナリオを作成してください。
要件:
1. シナリオには以下の構造を含めてください:
- タイトル
- 概要
- 攻撃フェーズ
2. 各攻撃フェーズは、MITRE ATT&CKフレームワークのTacticsに沿ったものにしてください
3. シナリオは具体的で分析可能な内容にしてください
4. 実際の攻撃事例や既知の脅威アクターの戦術を参考にしてください
5. ユーザの具体的な要件や指示に従ってください
"""
human_prompt = f"以下の要件に従って、サイバー攻撃分析シナリオを作成してください:\n\n{user_input}"
else:
system_prompt = """
既存のシナリオを更新してください。
要件:
1. ユーザの指示に従ってシナリオを更新
2. 既存の情報は保持しつつ、新しい情報を追加または修正
3. シナリオの一貫性を維持
4. シナリオの構造(タイトル、概要、攻撃フェーズなど)を維持
5. ユーザの具体的な要件や指示に従ってください
"""
human_prompt = f"""
現在のシナリオ:
{current_scenario}
以下の要件に従って、シナリオを更新してください:
{user_input}
"""
try:
response = await self.llm.ainvoke([
SystemMessage(content=system_prompt),
HumanMessage(content=human_prompt)
])
return response.content.strip()
except Exception as e:
logger.error(f"Error in generate_scenario: {str(e)}")
raise
async def generate_attack_json(self, layer_operation: str = None, scenario: str = None, existing_json: Optional[Dict] = None) -> str:
"""Generate or update ATT&CK Navigator JSON based on scenario"""
if not scenario:
raise ValueError("シナリオが指定されていません")
if not layer_operation:
layer_operation = "レイヤー操作に関して指定はありません"
try:
system_prompt = """
MITRE ATT&CK Navigatorのレイヤーを生成または更新してください。
要件:
1. シナリオの各攻撃フェーズに対応するテクニックを特定
2. テクニックにはコメントとして、シナリオ内での使用方法を説明
3. テクニックの選択は具体的な根拠に基づくこと
4. 既存のJSONがある場合は、その情報を考慮して更新
5. versionsオブジェクトには必ず以下の3つのキーを含めること:
- attack: ATT&CKのバージョン
- navigator: Navigatorのバージョン
- layer: レイヤーのバージョン
DEFAULT_LAYER_SETTINGS:
{DEFAULT_LAYER_SETTINGS}
バージョン指定のルール:
1. ユーザーが特定のバージョンを指定した場合(例:「ATT&CKバージョンは16で」)、その値を優先して使用
2. ユーザーが指定していない場合は、デフォルトのバージョン設定を使用
3. 既存のJSONがある場合は、そのバージョン情報を保持
"""
human_prompt = f"""
レイヤー操作:
{layer_operation}
シナリオ:
{scenario}
{'既存のJSON:' + json.dumps(existing_json, indent=2) if existing_json else '新規作成'}
"""
response = await self.llm.ainvoke([
SystemMessage(content=system_prompt),
HumanMessage(content=human_prompt)
])
try:
result = self.layer_fixing_parser.parse(response.content)
# バージョン情報が正しくない場合は、デフォルトのバージョン情報を使用
if not all(k in result.versions for k in ['attack', 'navigator', 'layer']):
result.versions = DEFAULT_LAYER_SETTINGS['versions']
return json.dumps(result.model_dump(), indent=2, ensure_ascii=False)
except Exception as e:
logger.error(f"JSON parsing error: {str(e)}")
raise
except Exception as e:
logger.error(f"Error in generate_attack_json: {str(e)}")
raise