File size: 1,735 Bytes
5b76d85
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
from typing import List, Optional, Union, Dict, Any
from langgraph.graph.message import MessagesState
from langchain_core.messages import AIMessage, HumanMessage
from pydantic import BaseModel

class AttackState(MessagesState, total=False):
    """State for the ATT&CK Navigator workflow.

    Extends ``MessagesState`` (which supplies the ``messages`` key) with the
    workflow-specific keys below. Because the class is declared with
    ``total=False``, the keys declared here may be absent from a state dict.

    Keys are declared as bare annotations: a TypedDict class body may only
    contain ``key: type`` items. Right-hand-side values are rejected by type
    checkers and are never applied to instances, so they must not be used to
    express defaults. Use ``get_initial_state()`` to obtain a state in which
    every key is explicitly set to ``None``.
    """
    # ATT&CK JSON document; written by set_attack_json().
    attack_json: Optional[Dict[str, Any]]
    # Scenario text; written by set_scenario().
    scenario: Optional[str]
    # Context validity flag; written by set_valid_context().
    is_valid_context: Optional[bool]
    # NOTE(review): presumably the scenario extracted from user input; not
    # written anywhere in this module - confirm against the workflow nodes.
    extracted_user_scenario: Optional[str]
    # NOTE(review): presumably the layer operation extracted from user input;
    # not written anywhere in this module - confirm against the workflow nodes.
    extracted_user_layer_operation: Optional[str]

def get_initial_state() -> AttackState:
    """Build a fresh state for the start of the workflow.

    Returns:
        A new state dict with an empty ``messages`` list and every other
        key explicitly set to ``None``.
    """
    blank_state: AttackState = {
        "messages": [],
        "attack_json": None,
        "scenario": None,
        "is_valid_context": None,
        "extracted_user_scenario": None,
        "extracted_user_layer_operation": None,
    }
    return blank_state

def add_user_message(state: AttackState, content: str) -> AttackState:
    """Append a ``HumanMessage`` carrying *content* to the state's messages.

    The ``messages`` list is modified in place and the same state object is
    returned.

    Args:
        state: Workflow state; must already contain a ``messages`` list.
        content: Text of the user message.

    Returns:
        The same ``state`` object that was passed in.
    """
    history = state["messages"]
    history.append(HumanMessage(content=content))
    return state

def add_ai_message(state: AttackState, content: str) -> AttackState:
    """Append an ``AIMessage`` carrying *content* to the state's messages.

    The ``messages`` list is modified in place and the same state object is
    returned.

    Args:
        state: Workflow state; must already contain a ``messages`` list.
        content: Text of the AI message.

    Returns:
        The same ``state`` object that was passed in.
    """
    history = state["messages"]
    history.append(AIMessage(content=content))
    return state

def set_attack_json(state: AttackState, attack_json: Dict[str, Any]) -> AttackState:
    """Store the ATT&CK JSON under the state's ``attack_json`` key.

    Args:
        state: Workflow state, updated in place.
        attack_json: The ATT&CK JSON mapping to store.

    Returns:
        The same ``state`` object that was passed in.
    """
    state["attack_json"] = attack_json
    return state

def set_scenario(state: AttackState, scenario: str) -> AttackState:
    """Store the scenario text under the state's ``scenario`` key.

    Args:
        state: Workflow state, updated in place.
        scenario: The scenario text to store.

    Returns:
        The same ``state`` object that was passed in.
    """
    state["scenario"] = scenario
    return state

def set_valid_context(state: AttackState, is_valid: bool) -> AttackState:
    """Record the context validity under the state's ``is_valid_context`` key.

    Args:
        state: Workflow state, updated in place.
        is_valid: The validity flag to store.

    Returns:
        The same ``state`` object that was passed in.
    """
    state["is_valid_context"] = is_valid
    return state