from langchain_google_genai import ChatGoogleGenerativeAI from langchain_core.messages import SystemMessage, HumanMessage, AIMessage from langchain_core.output_parsers import PydanticOutputParser, JsonOutputParser from langchain.output_parsers import RetryOutputParser, OutputFixingParser from langchain_core.prompts import PromptTemplate from pydantic import BaseModel, Field, field_validator from typing import Optional, Dict, Any, List, Union import json import logging from models.attack_schema import AttackLayer, Technique, DEFAULT_LAYER_SETTINGS # ロガーの設定 logger = logging.getLogger(__name__) logger.setLevel(logging.INFO) class EvaluationResult(BaseModel): is_valid: bool = Field(description="コンテキスト評価結果(TrueまたはFalse)") extracted_scenario: Optional[str] = Field(None, description="シナリオの抽出結果") extracted_layer_operation: Optional[str] = Field(None, description="レイヤー操作の抽出結果") class LLMService: """Service for handling LLM interactions""" def __init__(self, model_name: str = "gemini-2.0-flash", temperature: float = 0.7): self.llm = ChatGoogleGenerativeAI( model=model_name, temperature=temperature, ) self.attack_evaluation_parser = PydanticOutputParser(pydantic_object=EvaluationResult) self.attack_evaluation_fixing_parser = OutputFixingParser.from_llm( parser=self.attack_evaluation_parser, llm=self.llm ) self.layer_parser = PydanticOutputParser(pydantic_object=AttackLayer) self.layer_fixing_parser = OutputFixingParser.from_llm( parser=self.layer_parser, llm=self.llm ) self.json_parser = JsonOutputParser(pydantic_object=AttackLayer) self.json_retry_parser = RetryOutputParser.from_llm( parser=self.json_parser, llm=self.llm ) async def evaluate_context(self, user_input: str) -> EvaluationResult: """Evaluate if the user input is valid for ATT&CK context""" system_prompt = """ 以下のユーザ入力が、サイバー攻撃の分析やMITRE ATT&CKフレームワークのレイヤーに対する指示として適切かどうかを評価し、指示を分割してください。 評価基準: 1. サイバー攻撃のシナリオや分析に関する内容か 2. ATT&CKフレームワークのレイヤーに対する指示や更新要求か 3. 攻撃テクニックやタクティックに関する質問や説明か 回答形式: {format_instructions} 注意: - 評価結果(is_valid)は必ずTrueまたはFalseのいずれかのみを出力してください。 - True: コンテキストに合致 - False: コンテキストに合致しない - シナリオの抽出結果(extracted_scenario)は、ユーザ入力の文書からサイバー攻撃分析シナリオに関する部分を抽出してください。なければ省力可能。 - レイヤー操作の抽出結果(extracted_layer_operation)は、ユーザ入力の文書からMITRE ATT&CKフレームワークのレイヤー操作に関する部分を抽出してください。なければ省略可能。 """ logger.info(f"Evaluating context for: {user_input}") try: response = await self.llm.ainvoke([ SystemMessage(content=system_prompt.format( format_instructions=self.attack_evaluation_parser.get_format_instructions() )), HumanMessage(content=user_input) ]) result = self.attack_evaluation_fixing_parser.parse(response.content) if not result.is_valid: logger.info("Invalid context detected") return result except Exception as e: logger.error(f"Error in evaluate_context: {str(e)}") raise async def generate_scenario(self, user_input: str, current_scenario: Optional[str] = None) -> str: """Generate or update a scenario based on user input""" if not user_input: user_input = "シナリオ更新について指示はありません" if not current_scenario: system_prompt = """ ユーザ入力から想起されるサイバー攻撃分析シナリオを作成してください。 要件: 1. シナリオには以下の構造を含めてください: - タイトル - 概要 - 攻撃フェーズ 2. 各攻撃フェーズは、MITRE ATT&CKフレームワークのTacticsに沿ったものにしてください 3. シナリオは具体的で分析可能な内容にしてください 4. 実際の攻撃事例や既知の脅威アクターの戦術を参考にしてください 5. ユーザの具体的な要件や指示に従ってください """ human_prompt = f"以下の要件に従って、サイバー攻撃分析シナリオを作成してください:\n\n{user_input}" else: system_prompt = """ 既存のシナリオを更新してください。 要件: 1. ユーザの指示に従ってシナリオを更新 2. 既存の情報は保持しつつ、新しい情報を追加または修正 3. シナリオの一貫性を維持 4. シナリオの構造(タイトル、概要、攻撃フェーズなど)を維持 5. ユーザの具体的な要件や指示に従ってください """ human_prompt = f""" 現在のシナリオ: {current_scenario} 以下の要件に従って、シナリオを更新してください: {user_input} """ try: response = await self.llm.ainvoke([ SystemMessage(content=system_prompt), HumanMessage(content=human_prompt) ]) return response.content.strip() except Exception as e: logger.error(f"Error in generate_scenario: {str(e)}") raise async def generate_attack_json(self, layer_operation: str = None, scenario: str = None, existing_json: Optional[Dict] = None) -> str: """Generate or update ATT&CK Navigator JSON based on scenario""" if not scenario: raise ValueError("シナリオが指定されていません") if not layer_operation: layer_operation = "レイヤー操作に関して指定はありません" try: system_prompt = """ MITRE ATT&CK Navigatorのレイヤーを生成または更新してください。 要件: 1. シナリオの各攻撃フェーズに対応するテクニックを特定 2. テクニックにはコメントとして、シナリオ内での使用方法を説明 3. テクニックの選択は具体的な根拠に基づくこと 4. 既存のJSONがある場合は、その情報を考慮して更新 5. versionsオブジェクトには必ず以下の3つのキーを含めること: - attack: ATT&CKのバージョン - navigator: Navigatorのバージョン - layer: レイヤーのバージョン DEFAULT_LAYER_SETTINGS: {DEFAULT_LAYER_SETTINGS} バージョン指定のルール: 1. ユーザーが特定のバージョンを指定した場合(例:「ATT&CKバージョンは16で」)、その値を優先して使用 2. ユーザーが指定していない場合は、デフォルトのバージョン設定を使用 3. 既存のJSONがある場合は、そのバージョン情報を保持 """ human_prompt = f""" レイヤー操作: {layer_operation} シナリオ: {scenario} {'既存のJSON:' + json.dumps(existing_json, indent=2) if existing_json else '新規作成'} """ response = await self.llm.ainvoke([ SystemMessage(content=system_prompt), HumanMessage(content=human_prompt) ]) try: result = self.layer_fixing_parser.parse(response.content) # バージョン情報が正しくない場合は、デフォルトのバージョン情報を使用 if not all(k in result.versions for k in ['attack', 'navigator', 'layer']): result.versions = DEFAULT_LAYER_SETTINGS['versions'] return json.dumps(result.model_dump(), indent=2, ensure_ascii=False) except Exception as e: logger.error(f"JSON parsing error: {str(e)}") raise except Exception as e: logger.error(f"Error in generate_attack_json: {str(e)}") raise