from datetime import datetime from tqdm import tqdm import json import os import warnings import random from typing import Any, Dict, List, Optional, Literal from collections import defaultdict import uuid from bw_utils import * from modules.main_role_agent import RPAgent from modules.world_agent import WorldAgent from modules.history_manager import HistoryManager import argparse warnings.filterwarnings('ignore') class Server(): def __init__(self, preset_path: str, world_llm_name: str, role_llm_name: str, embedding_name:str = "bge-m3") : """ The initialization function of the system. Args: preset_path (str): The path to config file of this experiment. world_llm_name (str, optional): The base model of the world agent. Defaults to 'gpt-4o'. role_llm_name (str, optional): The base model of all the role agents. Defaults to 'gpt-4o'. mode (str, optional): If set to be 'script', the role agents will act according to the given script. If set to be 'free', the role agents will act freely based on their backround. Defaults to 'free'. """ self.role_llm_name: str = role_llm_name self.world_llm_name: str = world_llm_name self.embedding_name:str = embedding_name config = load_json_file(preset_path) self.preset_path = preset_path self.config: Dict = config self.experiment_name: str = os.path.basename(preset_path).replace(".json","") + "/" + config["experiment_subname"] + "_" + role_llm_name role_agent_codes: List[str] = config['role_agent_codes'] world_file_path: str = config["world_file_path"] map_file_path: str = config["map_file_path"] if "map_file_path" in config else "" role_file_dir: str = config["role_file_dir"] if "role_file_dir" in config else "./data/roles/" loc_file_path: str = config["loc_file_path"] self.intervention: str = "" if "intervention" in config else "" self.event = self.intervention self.script: str = config["script"] if "script" in config else "" self.language: str = config["language"] if "language" in config else "zh" self.source:str = config["source"] if "source" in config else "" self.idx: int = 0 self.cur_round: int = 0 self.progress: str = "剧本刚刚开始,还什么都没有发生" if self.language == 'zh' else "The story has just begun, nothing happens yet." self.moving_roles_info: Dict[str, Any] = {} self.history_manager = HistoryManager() self.start_time = datetime.datetime.now().strftime("%Y-%m-%d_%H-%M-%S") self.current_status = { "location_code":"", "group":role_agent_codes, } self.scene_characters = {} self.event_history = [] self.role_llm = get_models(role_llm_name) self.logger = get_logger(self.experiment_name) self.init_role_agents(role_agent_codes = role_agent_codes, role_file_dir = role_file_dir, world_file_path=world_file_path, llm = self.role_llm) if world_llm_name == role_llm_name: self.world_llm = self.role_llm else: self.world_llm = get_models(world_llm_name) self.init_world_agent_from_file(world_file_path = world_file_path, map_file_path = map_file_path, loc_file_path = loc_file_path, llm = self.world_llm) # Init def init_role_agents(self, role_agent_codes: List[str], role_file_dir:str, world_file_path:str, llm) -> None: self.role_codes: List[str] = role_agent_codes self.role_agents: Dict[str, RPAgent] = {} for role_code in role_agent_codes: if check_role_code_availability(role_code,role_file_dir): self.role_agents[role_code] = RPAgent(role_code=role_code, role_file_dir=role_file_dir, world_file_path=world_file_path, source = self.source, language=self.language, llm_name = self.role_llm_name, llm = llm, embedding_name=self.embedding_name, ) # print(f"{role_code} Initialized.") else: print(f"Warning: The specified role `{role_code}` does not exist.") def init_world_agent_from_file(self, world_file_path: str, map_file_path: str, loc_file_path: str, llm) -> None: self.world_agent: WorldAgent = WorldAgent(world_file_path = world_file_path, location_file_path = loc_file_path, map_file_path = map_file_path, llm_name=self.world_llm_name, llm = llm, language=self.language) for role_code in self.role_agents: self.role_agents[role_code].world_db = self.world_agent.db self.role_agents[role_code].world_db_name = self.world_agent.db_name def init_role_locations(self, random_allocate: bool = True): """ Set initial positions of the roles. Args: random_allocate (bool, optional): if set to be True, the initial positions of the roles are randomly assigned. Defaults to True. """ init_locations_code = random.choices(self.world_agent.locations, k = len(self.role_codes)) for i,role_code in enumerate(self.role_codes): self.role_agents[role_code].set_location(init_locations_code[i], self.world_agent.find_location_name(init_locations_code[i])) info_text = f"{self.role_agents[role_code].nickname} 现在位于 {self.world_agent.find_location_name(init_locations_code[i])}" \ if self.language == "zh" else f"{self.role_agents[role_code].nickname} is now located at {self.world_agent.find_location_name(init_locations_code[i])}" self.log(info_text) # Simulation def simulate_generator(self, rounds: int = 10, save_dir: str = "", if_save: Literal[0,1] = 0, mode: Literal["free", "script"] = "free", scene_mode: Literal[0,1] = 1,): """ The main function of the simulation. Args: rounds (int, optional): The max rounds of simulation. Defaults to 10. save_dir (str, optional): _description_. Defaults to "". if_save (Literal[0,1], optional): _description_. Defaults to 0. """ self.mode = mode meta_info: Dict[str, Any] = self.continue_simulation_from_file(save_dir) self.if_save: int = if_save start_round: int = meta_info["round"] sub_start_round:int = meta_info["sub_round"] if "sub_round" in meta_info else 0 if start_round == rounds: return # Setting Locations if not meta_info["location_setted"]: self.log("========== Start Location Setting ==========") self.init_role_locations() self._save_current_simulation("location") # Setting Goals if not meta_info["goal_setted"]: yield ("system","","-- Setting Goals --",None) self.log("========== Start Goal Setting ==========") if self.mode == "free": self.get_event() self.log(f"--------- Free Mode: Current Event ---------\n{self.event}\n") self.event_history.append(self.event) elif self.mode == "script": self.get_script() self.log(f"--------- Script Mode: Setted Script ---------\n{self.script}\n") self.event_history.append(self.event) if self.mode == "free": for role_code in self.role_codes: motivation = self.role_agents[role_code].set_motivation( world_description = self.world_agent.description, other_roles_info = self._get_group_members_info_dict(self.role_agents), intervention = self.event, script = self.script ) info_text = f"{self.role_agents[role_code].nickname} 设立了动机: {motivation}" \ if self.language == "zh" else f"{self.role_agents[role_code].nickname} has set the motivation: {motivation}" record_id = str(uuid.uuid4()) self.log(info_text) self.record(role_code=role_code, detail=info_text, actor = role_code, group = [role_code], actor_type = 'role', act_type="goal setting", record_id = record_id) yield ("role",role_code,info_text,record_id) self._save_current_simulation("goal") yield ("system","","-- Simulation Started --",None) selected_role_codes = [] # Simulating for current_round in range(start_round, rounds): self.cur_round = current_round self.log(f"========== Round {current_round+1} Started ==========") if self.event and current_round >= 1: self.log(f"--------- Current Event ---------\n{self.event}\n") yield ("world","","-- Current Event --\n"+self.event, None) self.event_history.append(self.event) if len(self.moving_roles_info) == len(self.role_codes): self.settle_movement() continue # Characters in next scene if scene_mode: group = self._name2code( self.world_agent.decide_screen_actors( self._get_locations_info(False), self.history_manager.get_recent_history(5), self.event, list(set(selected_role_codes + list(self.moving_roles_info.keys()))))) selected_role_codes += group if len(selected_role_codes) > len(self.role_codes): selected_role_codes = [] else: group = self.role_codes self.current_status['group'] = group self.current_status['location_code'] = self.role_agents[group[0]].location_code self.scene_characters[str(current_round)] = group # Prologue # if current_round == 0 and len(group) > 0 # prologue = self.world_agent.generate_location_prologue(location_code=self.role_agents[group[0]].location_code, history_text=self._get_history_text(group),event=self.event,location_info_text=self._find_roles_at_location(self.role_agents[group[0]].location_code,name=True)) # self.log("--Prologue--: "+prologue) # self.record(role_code="None",detail=prologue,act_type="prologue") start_idx = len(self.history_manager) sub_round = sub_start_round for sub_round in range(sub_start_round,3): if self.mode == "script": self.script_instruct(self.progress) else: for role_code in group: self.role_agents[role_code].update_goal(other_roles_status=self._get_status_text(self.role_codes)) for role_code in group: if scene_mode: role_code = self._name2code(self.world_agent.decide_next_actor("\n".join(self.history_manager.get_recent_history(3)),self._get_group_members_info_text(group,status=True),self.script)) if scene_mode else role_code yield from self.implement_next_plan(role_code = role_code, group = group) self._save_current_simulation("action", current_round, sub_round) if_end,epilogue = self.world_agent.judge_if_ended("\n".join(self.history_manager.get_recent_history(len(self.history_manager)-start_idx))) if if_end: record_id = str(uuid.uuid4()) self.log("--Epilogue--: "+epilogue) self.record(role_code = "None", detail = epilogue, actor_type="world", act_type="epilogue", actor = "world", group = [], record_id = record_id) yield ("world","","--Epilogue--: "+epilogue, record_id) break for role_code in group: yield from self.decide_whether_to_move(role_code = role_code, group = self._find_group(role_code)) self.role_agents[role_code].update_status() self.settle_movement() self.update_event(group) sub_start_round = 0 self._save_current_simulation("action", current_round + 1,sub_round + 1) # Main functions using llm def implement_next_plan(self,role_code: str, group: List[str]): other_roles_info = self._get_group_members_info_dict(group) plan = self.role_agents[role_code].plan( other_roles_info = other_roles_info, available_locations = self.world_agent.locations, world_description = self.world_agent.description, intervention = self.event, ) info_text = plan["detail"] if plan["target_role_codes"]: plan["target_role_codes"] = self._name2code(plan["target_role_codes"]) record_id = str(uuid.uuid4()) self.log(f"-Action-\n{self.role_agents[role_code].role_name}: "+ info_text) self.record(role_code = role_code, detail = plan["detail"], actor_type = 'role', act_type = "plan", actor = role_code, group = plan["target_role_codes"] + [role_code], plan = plan, record_id = record_id ) yield ("role", role_code, info_text, record_id) if plan["interact_type"] == "single" and len(plan["target_role_codes"]) == 1 and plan["target_role_codes"][0] in group: yield from self.start_single_role_interaction(plan, record_id) elif plan["interact_type"] == "multi" and len(plan["target_role_codes"]) > 1 and set(plan["target_role_codes"]).issubset(set(group)) : yield from self.start_multi_role_interaction(plan, record_id) elif plan["interact_type"] == "enviroment": yield from self.start_enviroment_interaction(plan,role_code, record_id) elif plan["interact_type"] == "npc" and plan["target_npc_name"]: yield from self.start_npc_interaction(plan,role_code,target_name=plan["target_npc_name"], record_id = record_id) return info_text def decide_whether_to_move(self, role_code: str, group: List[str]): if len(self.world_agent.locations) <= 1: return False if_move, move_detail, destination_code = self.role_agents[role_code].move(locations_info_text = self._get_locations_info(), locations_info = self.world_agent.locations_info) if if_move: self.log(move_detail) print(f"角色选择移动。{self.role_agents[role_code].role_name}正在前往{self.world_agent.find_location_name(destination_code)}" if self.language == "zh" else f"The role decides to move. {self.role_agents[role_code].role_name} is heading to {self.world_agent.find_location_name(destination_code)}.") self.record(role_code = role_code, detail = move_detail, actor_type = 'role', act_type = "move", actor = role_code, group = [role_code], destinatiion_code = destination_code ) yield ("role",role_code,move_detail,None) distance = self.world_agent.get_distance(self.role_agents[role_code].location_code, destination_code) self.role_agents[role_code].set_location(location_code=None, location_name=None) self.moving_roles_info[role_code] = { "location_code":destination_code, "distance":distance } return if_move def start_enviroment_interaction(self, plan: Dict[str, Any], role_code: str, record_id: str): """ Handles the role's interaction with the environment. It gets interaction results from agents in the world, record the result and update the status of the role. Args: plan (Dict[str, Any]): The details of the action. role_code (str): The action maker. Returns: (str): The enviroment response. """ if "action" not in plan: plan["action"] = "" self.current_status['group'] = [role_code] location_code = self.role_agents[role_code].location_code result = self.world_agent.enviroment_interact(action_maker_name = self.role_agents[role_code].role_name, action = plan["action"], action_detail = conceal_thoughts(self.history_manager.search_record_detail(record_id)), location_code = location_code) env_record_id = str(uuid.uuid4()) self.log(f"(Enviroment):{result}") self.record(role_code = role_code, detail = result, actor_type = 'world', act_type = "enviroment", initiator = role_code, actor = "world", group = [role_code], record_id = env_record_id) yield ("world","","(Enviroment):" + result, env_record_id) return conceal_thoughts(self.history_manager.search_record_detail(record_id)) + self.history_manager.search_record_detail(env_record_id) def start_npc_interaction(self, plan: Dict[str, Any], role_code: str, target_name: str, record_id: str, max_rounds: int = 3): """ Handles the role's interaction with the environment. It gets interaction results from agents in the world, record the result and update the status of the role. Args: plan (Dict[str, Any]): The details of the action. role_code (str): The action maker. target_name (str): The target npc. Returns: (str): The enviroment response. """ interaction = plan start_idx = len(self.history_manager) self.log(f"----------NPC Interaction----------\n") self.current_status['group'] = [role_code,target_name] for round in range(max_rounds): npc_interaction = self.world_agent.npc_interact(action_maker_name=self.role_agents[role_code].role_name, action_detail=self.history_manager.search_record_detail(record_id), location_name=self.role_agents[role_code].location_name, target_name=target_name) npc_detail = npc_interaction["detail"] npc_record_id = str(uuid.uuid4()) self.log(f"{target_name}: " + npc_detail) self.record(role_code = role_code, detail = npc_detail, actor_type = 'world', act_type = "npc", actor = "world", group = [role_code], npc_name = target_name, record_id = npc_record_id ) yield ("world","",f"(NPC-{target_name}):" + npc_detail, npc_record_id) if npc_interaction["if_end_interaction"]: break interaction = self.role_agents[role_code].npc_interact( npc_name = target_name, npc_response = self.history_manager.search_record_detail(npc_record_id), history = self.history_manager.get_subsequent_history(start_idx = start_idx), intervention = self.event ) detail = interaction["detail"] record_id = str(uuid.uuid4()) self.log(f"{self.role_agents[role_code].role_name}: " + detail) self.record(role_code = role_code, detail = detail, actor_type = 'role', act_type = "npc", actor = role_code, group = [role_code], npc_name = target_name, record_id = record_id) yield ("role",role_code,detail,record_id) if interaction["if_end_interaction"]: break if_end,epilogue = self.world_agent.judge_if_ended("\n".join(self.history_manager.get_subsequent_history(start_idx))) if if_end: break return "\n".join(self.history_manager.get_subsequent_history(start_idx = start_idx)) def start_single_role_interaction(self, plan: Dict[str, Any], record_id: str, max_rounds: int = 8): interaction = plan acted_role_code = interaction["role_code"] acting_role_code = interaction["target_role_codes"][0] if acting_role_code not in self.role_codes: print(f"Warning: Role {acting_role_code} does not exist.") return self.current_status['group'] = [acted_role_code,acting_role_code] start_idx = len(self.history_manager) for round in range(max_rounds): interaction = self.role_agents[acting_role_code].single_role_interact( action_maker_code = acted_role_code, action_maker_name = self.role_agents[acted_role_code].role_name, action_detail = conceal_thoughts(self.history_manager.search_record_detail(record_id)), action_maker_profile = self.role_agents[acted_role_code].role_profile, intervention = self.event ) detail = interaction["detail"] record_id = str(uuid.uuid4()) self.log(f"{self.role_agents[acting_role_code].role_name}: " + detail) self.record(role_code = acting_role_code, detail = detail, actor_type = 'role', act_type = "single", group = [acted_role_code,acting_role_code], target_role_code = acting_role_code, planning_role_code = plan["role_code"], round = round, record_id = record_id ) yield ("role",acting_role_code,detail,record_id) if interaction["if_end_interaction"]: return if interaction["extra_interact_type"] == "npc": print("---Extra NPC Interact---") result = yield from self.start_npc_interaction(plan=interaction, role_code=acted_role_code, target_name=interaction["target_npc_name"], record_id=record_id) interaction["detail"] = result elif interaction["extra_interact_type"] == "enviroment": print("---Extra Env Interact---") result = yield from self.start_enviroment_interaction(plan=interaction,role_code=acted_role_code,record_id=record_id) interaction["detail"] = result if_end,epilogue = self.world_agent.judge_if_ended("\n".join(self.history_manager.get_subsequent_history(start_idx))) if if_end: break acted_role_code,acting_role_code = acting_role_code,acted_role_code return def start_multi_role_interaction(self, plan: Dict[str, Any], record_id: str, max_rounds: int = 8): interaction = plan acted_role_code = interaction["role_code"] group = interaction["target_role_codes"] group.append(acted_role_code) for code in group: if code not in self.role_codes: print(f"Warning: Role {code} does not exist.") return self.current_status['group'] = group start_idx = len(self.history_manager) other_roles_info = self._get_group_members_info_dict(group) for round in range(max_rounds): acting_role_code = self._name2code(self.world_agent.decide_next_actor(history_text = "\n".join(self.history_manager.get_recent_history(3)), roles_info_text = self._get_group_members_info_text(remove_list_elements(group,acted_role_code),status=True))) interaction = self.role_agents[acting_role_code].multi_role_interact( action_maker_code = acted_role_code, action_maker_name = self.role_agents[acted_role_code].role_name, action_detail = conceal_thoughts(self.history_manager.search_record_detail(record_id)), action_maker_profile = self.role_agents[acted_role_code].role_profile, other_roles_info = other_roles_info, intervention = self.event ) detail = interaction["detail"] record_id = str(uuid.uuid4()) self.log(f"{self.role_agents[acting_role_code].role_name}: "+ detail) self.record(role_code = acting_role_code, detail = detail, actor_type = 'role', act_type = "multi", group = group, actor = acting_role_code, planning_role_code = plan["role_code"], round = round, record_id = record_id ) yield ("role",acting_role_code,detail,record_id) if interaction["if_end_interaction"]: break result = "" if interaction["extra_interact_type"] == "npc": print("---Extra NPC Interact---") result = yield from self.start_npc_interaction(plan=interaction,role_code=acting_role_code,target_name=interaction["target_npc_name"],record_id = record_id) elif interaction["extra_interact_type"] == "enviroment": print("---Extra Env Interact---") result = yield from self.start_enviroment_interaction(plan=interaction,role_code=acting_role_code,record_id = record_id) interaction["detail"] = self.history_manager.search_record_detail(record_id) + result acted_role_code = acting_role_code if_end,epilogue = self.world_agent.judge_if_ended("\n".join(self.history_manager.get_subsequent_history(start_idx))) if if_end: break return # Sub functions using llm def script_instruct(self, last_progress: str, top_k: int = 5): """ Under the script mode, generate instruction for the roles at the beginning of each round. Args: last_progress (str): Where the script went in the last round. top_k (int, optional): The number of action history of each role to refer. Defaults to 1. Returns: Dict[str, Any]: Instruction for each role. """ roles_info_text = self._get_group_members_info_text(self.role_codes,status=True) history_text = "\n".join([self.role_agents[role_code].history_manager.get_recent_history(1)[0] for role_code in self.role_codes]) instruction = self.world_agent.get_script_instruction( roles_info_text=roles_info_text, event = self.event, history_text=history_text, script=self.script, last_progress = last_progress) for code in instruction: if code == "progress": self.log("剧本进度:"+ instruction["progress"]) if self.language == "zh" else self.log("Current Stage:"+ instruction["progress"]) elif code in self.role_codes: # self.role_agents[code].update_goal(instruction = instruction[code]) self.role_agents[code].goal = instruction[code] else: print("Instruction failed, role code:",code) return instruction def get_event(self,): if self.intervention == "" and not self.script: roles_info_text = self._get_group_members_info_text(self.role_codes,profile=True) status_text = self._get_status_text(self.role_codes) event = self.world_agent.generate_event(roles_info_text=roles_info_text,event=self.intervention,history_text=status_text) self.intervention = event elif self.intervention == "" and self.script: self.intervention = self.script self.event = self.intervention return self.intervention def get_script(self,): if self.script == "": roles_info_text = self._get_group_members_info_text(self.role_codes,profile=True) status = "\n".join([self.role_agents[role_code].status for role_code in self.role_codes]) script = self.world_agent.generate_script(roles_info_text=roles_info_text,event=self.intervention,history_text=status) self.script = script # self.event = self.script return self.script def update_event(self, group: List[str], top_k: int = 1): if self.intervention == "": self.event = "" else: status_text = self._get_status_text(group) self.event = self.world_agent.update_event(self.event, self.intervention, status_text, script = self.script) # other def record(self, role_code: str, detail: str, actor_type: str, act_type: str, group: List[str] = [], actor: str = "", record_id = None, **kwargs): if act_type == "plan" and "plan" in kwargs: detail = f"{self.role_agents[role_code].nickname}: {detail}" interact_type = kwargs["plan"]["interact_type"] target = ", ".join(kwargs["plan"]["target_role_codes"]) other_info = f"Interact type: {interact_type}, Target: {target}" elif act_type == "move" and "destination_code" in kwargs: destination = kwargs["destination_code"] other_info = f"Desitination:{destination}" elif act_type == "single": detail = f"{self.role_agents[role_code].nickname}: {detail}" target, planning_role, round = kwargs["target_role_code"],kwargs["planning_role_code"],kwargs["round"] other_info = f"Target: {target}, Planning Role: {planning_role}, Round: {round}" elif act_type == "multi": detail = f"{self.role_agents[role_code].nickname}: {detail}" planning_role, round = kwargs["planning_role_code"],kwargs["round"] other_info = f"Group member:{group}, Planning Role: {planning_role}, Round:{round}," elif act_type == "npc": name = kwargs["npc_name"] other_info = f"Target: {name}" elif act_type == "enviroment": other_info = "" else: other_info = "" record = { "cur_round":self.cur_round, "role_code":role_code, "detail":detail, "actor":actor, "group":group, # visible group "actor_type":actor_type, "act_type":act_type, "other_info":other_info, "record_id":record_id } self.history_manager.add_record(record) for code in group: self.role_agents[code].record(record) def settle_movement(self,): for role_code in self.moving_roles_info.copy(): if not self.moving_roles_info[role_code]["distance"]: location_code = self.moving_roles_info[role_code]["location_code"] self.role_agents[role_code].set_location(location_code, self.world_agent.find_location_name(location_code)) self.log(f"{self.role_agents[role_code].role_name} 已到达 【{self.world_agent.find_location_name(location_code)}】" if self.language == "zh" else f"{self.role_agents[role_code].role_name} has reached [{self.world_agent.find_location_name(location_code)}]") del self.moving_roles_info[role_code] else: self.moving_roles_info[role_code]["distance"] -= 1 def _find_group(self,role_code): return [code for code in self.role_codes if self.role_agents[code].location_code==self.role_agents[role_code].location_code] def _find_roles_at_location(self,location_code,name = False): if name: return [self.role_agents[code].nickname for code in self.role_codes if self.role_agents[code].location_code==location_code] else: return [code for code in self.role_codes if self.role_agents[code].location_code==location_code] def _get_status_text(self,group): return "\n".join([self.role_agents[role_code].status for role_code in group]) def _get_group_members_info_text(self,group, profile = False,status = False): roles_info_text = "" for i, role_code in enumerate(group): name = self.role_agents[role_code].role_name roles_info_text += f"{i+1}. {name}\n(role_code:{role_code})\n" if profile: profile = self.role_agents[role_code].role_profile roles_info_text += f"{profile}\n" if status: status = self.role_agents[role_code].status roles_info_text += f"{status}\n" return roles_info_text def _get_group_members_info_dict(self,group: List[str]): info = { role_code: { "nickname": self.role_agents[role_code].nickname, "profile": self.role_agents[role_code].role_profile } for role_code in group } return info def _get_locations_info(self,detailed = True): location_info_text = "---当前各角色位置---\n" if self.language == "zh" else "---Current Location of Roles---\n" if detailed: for i,location_code in enumerate(self.world_agent.locations_info): location_name = self.world_agent.find_location_name(location_code) description = self.world_agent.locations_info[location_code]["description"] location_info_text += f"\n{i+1}. {location_name}\nlocation_code:{location_code}\n{description}\n\n" role_names = [f"{self.role_agents[code].role_name}({code})" for code in self.role_codes if self.role_agents[code].location_code == location_code] role_names = ", ".join(role_names) location_info_text += "目前在这里的角色有:" + role_names if self.language == "zh" else "Roles located here: " + role_names else: for i,location_code in enumerate(self.world_agent.locations_info): location_name = self.world_agent.find_location_name(location_code) role_names = [f"{self.role_agents[code].role_name}({code})" for code in self.role_codes if self.role_agents[code].location_code == location_code] if len(role_names) == 0:continue role_names = ", ".join(role_names) location_info_text += f"【{location_name}】:" + role_names +";" return location_info_text def _name2code(self,roles): name_dic = {self.role_agents[code].role_name:code for code in self.role_codes} name_dic.update({self.role_agents[code].nickname:code for code in self.role_codes}) if isinstance(roles, list): processed_roles = [] for role in roles: if role in self.role_codes: processed_roles.append(role) elif role in name_dic: processed_roles.append(name_dic[role]) elif "-" in role and role.split("-")[0] in name_dic: processed_roles.append(name_dic[role.split("-")[0]]) elif role.replace("_","·") in self.role_codes: processed_roles.append(role.replace("_","·")) else: processed_roles.append(role) return processed_roles elif isinstance(roles, str) : roles = roles.replace("\n","") if roles in self.role_codes: return roles elif roles in name_dic: return name_dic[roles] elif f"{roles}-{self.language}" in self.role_codes: return f"{roles}-{self.language}" elif "-" in roles and roles.split("-")[0] in name_dic: return name_dic[roles.split("-")[0]] elif roles.replace("_","·") in self.role_codes: return roles.replace("_","·") return roles def log(self,text): self.logger.info(text) print(text) def _save_current_simulation(self, stage: Literal["location", "goal", "action"], current_round: int = 0, sub_round:int = 0): """ Save the current simulation progress. Args: stage (Literal["location", "goal", "action"]): The stage in which the simulation has been carried out current_round (int, optional): If the stage is "action", specify the number of rounds that have been completed. Defaults to 0. """ if not self.if_save: return save_dir = f"./experiment_saves/{self.experiment_name}/{self.role_llm_name}/{self.start_time}" create_dir(save_dir) location_setted, goal_setted = False,False if stage in ["location","goal","action"]: location_setted = True if stage in ["goal","action"]: goal_setted = True meta_info = { "location_setted":location_setted, "goal_setted": goal_setted, "round": current_round, "sub_round": sub_round, } save_json_file(os.path.join(save_dir, "meta_info.json"), meta_info) name = self.experiment_name.split("/")[0] save_json_file(os.path.join(save_dir, f"{name}.json"), self.config) filename = os.path.join(save_dir, f"./server_info.json") save_json_file(filename, self.__getstate__() ) self.history_manager.save_to_file(save_dir) if hasattr(self, 'role_agents'): for role_code in self.role_codes: self.role_agents[role_code].save_to_file(save_dir) self.world_agent.save_to_file(save_dir) def continue_simulation_from_file(self, save_dir: str): """ Restore the record of the last simulation. Args: save_dir (str): The path where the last simulation record was saved. Returns: Dict[str, Any]: The meta information recording the progress of the simulation """ if os.path.exists(save_dir): meta_info = load_json_file(os.path.join(save_dir, "./meta_info.json")) filename = os.path.join(save_dir, f"./server_info.json") states = load_json_file(filename) self.__setstate__(states) for role_code in self.role_codes: self.role_agents[role_code].load_from_file(save_dir) self.world_agent.load_from_file(save_dir) self.history_manager.load_from_file(save_dir) for record in self.history_manager.detailed_history: for code in record["group"]: if code in self.role_codes: self.role_agents[code].record(record) else: meta_info = { "location_setted":False, "goal_setted": False, "round": 0, "sub_round": 0, } return meta_info def __getstate__(self): states = {key: value for key, value in self.__dict__.items() \ if isinstance(value, (str, int, list, dict, bool, type(None))) \ and key not in ['role_agents','world_agent','logger']} return states def __setstate__(self, states): self.__dict__.update(states) class BookWorld(): def __init__(self, preset_path: str, world_llm_name: str, role_llm_name: str, embedding_name:str = "bge-m3") : self.server = Server(preset_path, world_llm_name=world_llm_name, role_llm_name=role_llm_name, embedding_name=embedding_name) self.selected_scene = None def set_generator(self, rounds:int = 10, save_dir:str = "", if_save: Literal[0,1] = 0, mode: Literal["free", "script"] = "free", scene_mode: Literal[0,1] = 0,): self.server.continue_simulation_from_file(save_dir) self.generator = self.server.simulate_generator(rounds = rounds, save_dir = save_dir, if_save = if_save, mode = mode, scene_mode = scene_mode) def get_map_info(self): location_codes = self.server.world_agent.locations location_names = [self.server.world_agent.find_location_name(location_code) for location_code in location_codes] n = len(location_codes) distances = [] for i in range(n): for j in range(i+1,n): if self.server.world_agent.get_distance(location_codes[i], location_codes[j]): distances.append({ "source": location_names[i], "target": location_names[j], "distance": self.server.world_agent.get_distance(location_codes[i], location_codes[j]) }) return { "places": location_names, "distances": distances } def select_scene(self,scene_number): if scene_number == None: self.selected_scene = scene_number else: self.selected_scene = str(scene_number) def get_characters_info(self): characters_info = [] if self.selected_scene == None: codes = self.server.role_codes else: codes = self.server.scene_characters[str(self.selected_scene)] for (i, code) in enumerate(codes): agent = self.server.role_agents[code] location = agent.location_name if code in self.server.moving_roles_info: location_name = self.server.world_agent.find_location_name(self.server.moving_roles_info[code]["location_code"]) distance = self.server.moving_roles_info[code]['distance'] location = f"Reaching {location_name}... ({distance})" chara_info = { "id": i, "name": agent.nickname, "icon": agent.icon_path, "description": agent.role_profile, "goal": agent.goal if agent.goal else agent.motivation, "state": agent.status, "location": location } characters_info.append(chara_info) return characters_info def generate_next_message(self): message_type, code, text,message_id = next(self.generator) if message_type == "role": username = self.server.role_agents[code].role_name icon_path = self.server.role_agents[code].icon_path else: username = message_type icon_path = "" message = { 'username': username, 'type': message_type, # role, world, system 'timestamp': datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S"), 'text': text, 'icon': icon_path, "uuid": message_id, "scene": self.server.cur_round } return message def get_settings_info(self): return self.server.world_agent.world_settings def get_current_status(self): status = self.server.current_status status['event'] = self.server.event group = [] for code in status['group']: if code in self.server.role_codes: group.append(self.server.role_agents[code].nickname) else: group.append(code) status['group'] = group location_code = self.server.current_status['location_code'] if location_code not in self.server.world_agent.locations_info: location_name,location_description = "Undefined","Undefined" else: location_name,location_description = self.server.world_agent.find_location_name(location_code),self.server.world_agent.locations_info[location_code]["description"] status['location'] = {'name': location_name, 'description': location_description} return status def handle_message_edit(self,record_id,new_text): group = self.server.history_manager.modify_record(record_id,new_text) for code in group: self.server.role_agents[code].history_manager.modify_record(record_id,new_text) return def get_history_messages(self,save_dir): messages = [] for record in self.server.history_manager.detailed_history: message_type = record["actor_type"] code = record["role_code"] if message_type == "role": username = self.server.role_agents[code].role_name icon_path = self.server.role_agents[code].icon_path else: username = message_type icon_path = "./frontend/assets/images/default-icon.jpg" messages.append({ 'username': username, 'type': message_type, # role, world, system 'timestamp': datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S"), 'text': record["detail"], 'icon': icon_path, "uuid": record["record_id"], "scene": record["cur_round"] }) return messages def generate_story(self,): logs = self.server.history_manager.get_complete_history() story = self.server.world_agent.log2story(logs) return story if __name__ == "__main__": parser = argparse.ArgumentParser() parser.add_argument('--world_llm', type=str, default='gpt-4o-mini') parser.add_argument('--role_llm', type=str, default='gpt-4o-mini') parser.add_argument('--genre', type=str, default='mgdv2') parser.add_argument('--preset_path', type=str, default='') parser.add_argument('--if_save', type=int, default=1, choices=[0,1]) parser.add_argument('--scene_mode', type=int, default=0, choices=[0,1]) parser.add_argument('--rounds', type=int, default=10) parser.add_argument('--save_dir', type=str, default='') parser.add_argument('--mode', type=str, default='free', choices=['free','script']) args = parser.parse_args() world_llm_name = args.world_llm role_llm_name = args.role_llm rounds = args.rounds genre = args.genre preset_path = args.preset_path save_dir = args.save_dir if_save = args.if_save scene_mode = args.scene_mode mode = args.mode if not preset_path: preset_path = os.path.join(os.path.dirname(os.path.abspath(__file__)),f"./config/experiment_{genre}.json") bw = BookWorld(preset_path, world_llm_name=world_llm_name, role_llm_name=role_llm_name) bw.set_generator(rounds = rounds, save_dir = save_dir, if_save = if_save, scene_mode = scene_mode,mode = mode) for i in range(100): try: bw.generate_next_message() except StopIteration: break