Spaces:
Running
Running
from langchain_google_genai import ChatGoogleGenerativeAI | |
from langchain_core.messages import SystemMessage, HumanMessage, AIMessage | |
from langchain_core.output_parsers import PydanticOutputParser, JsonOutputParser | |
from langchain.output_parsers import RetryOutputParser, OutputFixingParser | |
from langchain_core.prompts import PromptTemplate | |
from pydantic import BaseModel, Field, field_validator | |
from typing import Optional, Dict, Any, List, Union | |
import json | |
import logging | |
from models.attack_schema import AttackLayer, Technique, DEFAULT_LAYER_SETTINGS | |
# ロガーの設定 | |
logger = logging.getLogger(__name__) | |
logger.setLevel(logging.INFO) | |
class EvaluationResult(BaseModel): | |
is_valid: bool = Field(description="コンテキスト評価結果(TrueまたはFalse)") | |
extracted_scenario: Optional[str] = Field(None, description="シナリオの抽出結果") | |
extracted_layer_operation: Optional[str] = Field(None, description="レイヤー操作の抽出結果") | |
class LLMService: | |
"""Service for handling LLM interactions""" | |
def __init__(self, model_name: str = "gemini-2.0-flash", temperature: float = 0.7): | |
self.llm = ChatGoogleGenerativeAI( | |
model=model_name, | |
temperature=temperature, | |
) | |
self.attack_evaluation_parser = PydanticOutputParser(pydantic_object=EvaluationResult) | |
self.attack_evaluation_fixing_parser = OutputFixingParser.from_llm( | |
parser=self.attack_evaluation_parser, | |
llm=self.llm | |
) | |
self.layer_parser = PydanticOutputParser(pydantic_object=AttackLayer) | |
self.layer_fixing_parser = OutputFixingParser.from_llm( | |
parser=self.layer_parser, | |
llm=self.llm | |
) | |
self.json_parser = JsonOutputParser(pydantic_object=AttackLayer) | |
self.json_retry_parser = RetryOutputParser.from_llm( | |
parser=self.json_parser, | |
llm=self.llm | |
) | |
async def evaluate_context(self, user_input: str) -> EvaluationResult: | |
"""Evaluate if the user input is valid for ATT&CK context""" | |
system_prompt = """ | |
以下のユーザ入力が、サイバー攻撃の分析やMITRE ATT&CKフレームワークのレイヤーに対する指示として適切かどうかを評価し、指示を分割してください。 | |
評価基準: | |
1. サイバー攻撃のシナリオや分析に関する内容か | |
2. ATT&CKフレームワークのレイヤーに対する指示や更新要求か | |
3. 攻撃テクニックやタクティックに関する質問や説明か | |
回答形式: | |
{format_instructions} | |
注意: | |
- 評価結果(is_valid)は必ずTrueまたはFalseのいずれかのみを出力してください。 | |
- True: コンテキストに合致 | |
- False: コンテキストに合致しない | |
- シナリオの抽出結果(extracted_scenario)は、ユーザ入力の文書からサイバー攻撃分析シナリオに関する部分を抽出してください。なければ省力可能。 | |
- レイヤー操作の抽出結果(extracted_layer_operation)は、ユーザ入力の文書からMITRE ATT&CKフレームワークのレイヤー操作に関する部分を抽出してください。なければ省略可能。 | |
""" | |
logger.info(f"Evaluating context for: {user_input}") | |
try: | |
response = await self.llm.ainvoke([ | |
SystemMessage(content=system_prompt.format( | |
format_instructions=self.attack_evaluation_parser.get_format_instructions() | |
)), | |
HumanMessage(content=user_input) | |
]) | |
result = self.attack_evaluation_fixing_parser.parse(response.content) | |
if not result.is_valid: | |
logger.info("Invalid context detected") | |
return result | |
except Exception as e: | |
logger.error(f"Error in evaluate_context: {str(e)}") | |
raise | |
async def generate_scenario(self, user_input: str, current_scenario: Optional[str] = None) -> str: | |
"""Generate or update a scenario based on user input""" | |
if not user_input: | |
user_input = "シナリオ更新について指示はありません" | |
if not current_scenario: | |
system_prompt = """ | |
ユーザ入力から想起されるサイバー攻撃分析シナリオを作成してください。 | |
要件: | |
1. シナリオには以下の構造を含めてください: | |
- タイトル | |
- 概要 | |
- 攻撃フェーズ | |
2. 各攻撃フェーズは、MITRE ATT&CKフレームワークのTacticsに沿ったものにしてください | |
3. シナリオは具体的で分析可能な内容にしてください | |
4. 実際の攻撃事例や既知の脅威アクターの戦術を参考にしてください | |
5. ユーザの具体的な要件や指示に従ってください | |
""" | |
human_prompt = f"以下の要件に従って、サイバー攻撃分析シナリオを作成してください:\n\n{user_input}" | |
else: | |
system_prompt = """ | |
既存のシナリオを更新してください。 | |
要件: | |
1. ユーザの指示に従ってシナリオを更新 | |
2. 既存の情報は保持しつつ、新しい情報を追加または修正 | |
3. シナリオの一貫性を維持 | |
4. シナリオの構造(タイトル、概要、攻撃フェーズなど)を維持 | |
5. ユーザの具体的な要件や指示に従ってください | |
""" | |
human_prompt = f""" | |
現在のシナリオ: | |
{current_scenario} | |
以下の要件に従って、シナリオを更新してください: | |
{user_input} | |
""" | |
try: | |
response = await self.llm.ainvoke([ | |
SystemMessage(content=system_prompt), | |
HumanMessage(content=human_prompt) | |
]) | |
return response.content.strip() | |
except Exception as e: | |
logger.error(f"Error in generate_scenario: {str(e)}") | |
raise | |
async def generate_attack_json(self, layer_operation: str = None, scenario: str = None, existing_json: Optional[Dict] = None) -> str: | |
"""Generate or update ATT&CK Navigator JSON based on scenario""" | |
if not scenario: | |
raise ValueError("シナリオが指定されていません") | |
if not layer_operation: | |
layer_operation = "レイヤー操作に関して指定はありません" | |
try: | |
system_prompt = """ | |
MITRE ATT&CK Navigatorのレイヤーを生成または更新してください。 | |
要件: | |
1. シナリオの各攻撃フェーズに対応するテクニックを特定 | |
2. テクニックにはコメントとして、シナリオ内での使用方法を説明 | |
3. テクニックの選択は具体的な根拠に基づくこと | |
4. 既存のJSONがある場合は、その情報を考慮して更新 | |
5. versionsオブジェクトには必ず以下の3つのキーを含めること: | |
- attack: ATT&CKのバージョン | |
- navigator: Navigatorのバージョン | |
- layer: レイヤーのバージョン | |
DEFAULT_LAYER_SETTINGS: | |
{DEFAULT_LAYER_SETTINGS} | |
バージョン指定のルール: | |
1. ユーザーが特定のバージョンを指定した場合(例:「ATT&CKバージョンは16で」)、その値を優先して使用 | |
2. ユーザーが指定していない場合は、デフォルトのバージョン設定を使用 | |
3. 既存のJSONがある場合は、そのバージョン情報を保持 | |
""" | |
human_prompt = f""" | |
レイヤー操作: | |
{layer_operation} | |
シナリオ: | |
{scenario} | |
{'既存のJSON:' + json.dumps(existing_json, indent=2) if existing_json else '新規作成'} | |
""" | |
response = await self.llm.ainvoke([ | |
SystemMessage(content=system_prompt), | |
HumanMessage(content=human_prompt) | |
]) | |
try: | |
result = self.layer_fixing_parser.parse(response.content) | |
# バージョン情報が正しくない場合は、デフォルトのバージョン情報を使用 | |
if not all(k in result.versions for k in ['attack', 'navigator', 'layer']): | |
result.versions = DEFAULT_LAYER_SETTINGS['versions'] | |
return json.dumps(result.model_dump(), indent=2, ensure_ascii=False) | |
except Exception as e: | |
logger.error(f"JSON parsing error: {str(e)}") | |
raise | |
except Exception as e: | |
logger.error(f"Error in generate_attack_json: {str(e)}") | |
raise |