Spaces:
Sleeping
Sleeping
# gamestate.py | |
import threading | |
import os | |
import json | |
import time | |
class GameState: | |
def __init__(self, state_file="global_state.json"): | |
self.state_file = state_file | |
self.lock = threading.Lock() | |
# The state dictionary holds all game objects. | |
# You could later add other keys (for example, player positions per session). | |
self.state = {"objects": []} | |
self.load_state() | |
def load_state(self): | |
"""Load the global state from a JSON file if it exists.""" | |
if os.path.exists(self.state_file): | |
try: | |
with open(self.state_file, "r", encoding="utf-8") as f: | |
self.state = json.load(f) | |
except Exception as e: | |
print(f"Error loading global state: {e}") | |
else: | |
self.state = {"objects": []} | |
def save_state(self): | |
"""Persist the global state to a JSON file.""" | |
with self.lock: | |
try: | |
with open(self.state_file, "w", encoding="utf-8") as f: | |
json.dump(self.state, f, indent=2) | |
except Exception as e: | |
print(f"Error saving global state: {e}") | |
def get_state(self): | |
"""Return a copy of the current state.""" | |
with self.lock: | |
return self.state.copy() | |
def update_state(self, new_objects): | |
""" | |
Merge a list of new object records into the state. | |
Uniqueness is determined by the 'obj_id' field. | |
""" | |
with self.lock: | |
# Create a dict of existing objects by obj_id. | |
existing = {obj["obj_id"]: obj for obj in self.state.get("objects", []) if "obj_id" in obj} | |
for obj in new_objects: | |
# If no obj_id, generate one. | |
if "obj_id" not in obj or not obj["obj_id"]: | |
obj["obj_id"] = str(time.time()) + "_" + str(len(existing)) | |
existing[obj["obj_id"]] = obj | |
self.state["objects"] = list(existing.values()) | |
self.state["last_updated"] = time.strftime("%Y-%m-%d %H:%M:%S") | |
self.save_state() | |