awacke1's picture
Update app.py
9e3fe96 verified
raw
history blame
39.3 kB
import streamlit as st
import os
import json
import time
import hashlib
import glob
import base64
import io
import streamlit.components.v1 as components
import edge_tts
import nest_asyncio
import re
import pytz
import random
import asyncio
from datetime import datetime
from PyPDF2 import PdfReader
import threading
import pandas as pd
from PIL import Image
from streamlit_javascript import st_javascript
# ==============================================================================
# Configuration & Constants
# ==============================================================================
# nest_asyncio patches asyncio so asyncio.run() can be used from code that may
# already be running inside an event loop.
nest_asyncio.apply()
# Streamlit page settings: wide layout with the sidebar expanded on first load.
st.set_page_config(
    page_title="๐Ÿค–๐Ÿ—๏ธ Shared World Builder ๐Ÿ†",
    page_icon="๐Ÿ—๏ธ",
    layout="wide",
    initial_sidebar_state="expanded"
)
icons = '๐Ÿค–๐Ÿ—๏ธ๐Ÿ—ฃ๏ธ๐Ÿ’พ'
# Shown in the main page title (see render_main_content).
Site_Name = '๐Ÿค–๐Ÿ—๏ธ Shared World Builder ๐Ÿ—ฃ๏ธ'
# Room name used as the chat tab header.
START_ROOM = "World Lobby ๐ŸŒ"
# Directory for generated files, pasted images and zip archives.
MEDIA_DIR = "."
# File that stores the last selected username.
STATE_FILE = "user_state.txt"
# Display name -> edge-tts voice identifier used when that user chats.
FUN_USERNAMES = {
    "BuilderBot ๐Ÿค–": "en-US-AriaNeural", "WorldWeaver ๐Ÿ•ธ๏ธ": "en-US-JennyNeural",
    "Terraformer ๐ŸŒฑ": "en-GB-SoniaNeural", "SkyArchitect โ˜๏ธ": "en-AU-NatashaNeural",
    "PixelPainter ๐ŸŽจ": "en-CA-ClaraNeural", "VoxelVortex ๐ŸŒช๏ธ": "en-US-GuyNeural",
    "CosmicCrafter โœจ": "en-GB-RyanNeural", "GeoGuru ๐Ÿ—บ๏ธ": "en-AU-WilliamNeural",
    "BlockBard ๐Ÿงฑ": "en-CA-LiamNeural", "SoundSculptor ๐Ÿ”Š": "en-US-AnaNeural",
}
# De-duplicated voice ids; built from a set, so the order is not defined.
EDGE_TTS_VOICES = list(set(FUN_USERNAMES.values()))
# Storage directories (all created at import time further below).
CHAT_DIR = "chat_logs"
AUDIO_CACHE_DIR = "audio_cache"
AUDIO_DIR = "audio_logs"
SAVED_WORLDS_DIR = "saved_worlds"
HISTORY_LOG_DIR = "history_logs"
# Plot dimensions: half of each is the default spawn position of a new player,
# and both values are injected into the 3D page as window.PLOT_WIDTH/DEPTH.
PLOT_WIDTH = 50.0
PLOT_DEPTH = 50.0
# Single JSON file holding the shared objects, players and action history.
WORLD_STATE_FILE = os.path.join(SAVED_WORLDS_DIR, "history.json")
# Players whose last action is older than this are dropped by
# prune_inactive_players.
PLAYER_TIMEOUT_SECONDS = 15 * 60 # 15 minutes
# File extension -> icon used as the link text of download links.
FILE_EMOJIS = {"md": "๐Ÿ“", "mp3": "๐ŸŽต", "png": "๐Ÿ–ผ๏ธ", "mp4": "๐ŸŽฅ", "zip": "๐Ÿ“ฆ", "json": "๐Ÿ“„"}
# Tool-button icon -> object type name. The values are the only object types
# persist_world_objects accepts for a "place" action.
PRIMITIVE_MAP = {
    "๐ŸŒณ": "Tree", "๐Ÿ—ฟ": "Rock", "๐Ÿ›๏ธ": "Simple House", "๐ŸŒฒ": "Pine Tree", "๐Ÿงฑ": "Brick Wall",
    "๐Ÿ”ต": "Sphere", "๐Ÿ“ฆ": "Cube", "๐Ÿงด": "Cylinder", "๐Ÿฆ": "Cone", "๐Ÿฉ": "Torus",
    "๐Ÿ„": "Mushroom", "๐ŸŒต": "Cactus", "๐Ÿ”ฅ": "Campfire", "โญ": "Star", "๐Ÿ’Ž": "Gem",
    "๐Ÿ—ผ": "Tower", "๐Ÿšง": "Barrier", "โ›ฒ": "Fountain", "๐Ÿฎ": "Lantern", "๐Ÿชง": "Sign Post"
}
# Create every storage directory up front so later file writes cannot fail on
# a missing parent directory.
for d in (CHAT_DIR, AUDIO_DIR, AUDIO_CACHE_DIR, SAVED_WORLDS_DIR, HISTORY_LOG_DIR):
    os.makedirs(d, exist_ok=True)
# Serialises writes to the shared world-state file (used by write_history_file).
state_lock = threading.Lock()
# ==============================================================================
# Utility Functions
# ==============================================================================
def get_current_time_str(tz='UTC'):
    """Return the current time formatted as ``YYYYMMDD_HHMMSS``.

    Args:
        tz: Name of a pytz timezone. An unknown name falls back to UTC.
    """
    try:
        zone = pytz.timezone(tz)
    except pytz.UnknownTimeZoneError:
        zone = pytz.utc
    return datetime.now(zone).strftime('%Y%m%d_%H%M%S')
def clean_filename_part(text, max_len=30):
    """Turn ``text`` into a fragment that is safe to embed in a file name.

    Whitespace runs become a single underscore, every character that is not a
    word character, ``-`` or ``.`` is dropped, and the result is cut to
    ``max_len`` characters. Non-string input is treated as ``"invalid_name"``.
    """
    source = text if isinstance(text, str) else "invalid_name"
    underscored = re.sub(r'\s+', '_', source)
    return re.sub(r'[^\w\-.]', '', underscored)[:max_len]
def ensure_dir(dir_path):
    """Create ``dir_path`` (including missing parents) if it does not exist.

    An empty or ``None`` path is treated as "the current directory" and is a
    no-op. ``os.path.dirname`` returns ``""`` for a bare file name, and
    ``os.makedirs("")`` would raise ``FileNotFoundError``.

    Raises:
        OSError: if the directory cannot be created (for example because a
            regular file with that name already exists).
    """
    if not dir_path:
        return
    os.makedirs(dir_path, exist_ok=True)
def generate_filename(content, username, extension):
    """Build a file name of the form ``<user>_<timestamp>_<hash>.<extension>``.

    The hash is the first six hex digits of the MD5 of the first 150
    characters of ``content``; the user part is sanitised with
    ``clean_filename_part``.
    """
    stamp = get_current_time_str()
    digest = hashlib.md5(content[:150].encode()).hexdigest()[:6]
    user_part = clean_filename_part(username)
    return "{}_{}_{}.{}".format(user_part, stamp, digest, extension)
# ==============================================================================
# History File Management
# ==============================================================================
def initialize_history_file():
    """Create the shared world-state file with an empty state if it is missing."""
    ensure_dir(SAVED_WORLDS_DIR)
    if os.path.exists(WORLD_STATE_FILE):
        return
    empty_state = {"objects": {}, "players": {}, "action_history": []}
    with open(WORLD_STATE_FILE, 'w', encoding='utf-8') as handle:
        json.dump(empty_state, handle, indent=2)
def read_history_file():
    """Load the shared world state from disk.

    The file is created first if it does not exist. Missing top-level keys
    are filled in with empty containers. Any failure is printed and an empty
    state is returned instead of raising.
    """
    initialize_history_file()
    try:
        with open(WORLD_STATE_FILE, 'r', encoding='utf-8') as handle:
            loaded = json.load(handle)
        # Ensure all expected keys exist
        for key, factory in (("objects", dict), ("players", dict), ("action_history", list)):
            loaded.setdefault(key, factory())
        return loaded
    except Exception as e:
        print(f"Error reading history file: {e}")
        return {"objects": {}, "players": {}, "action_history": []}
def write_history_file(state):
    """Persist ``state`` as JSON to the shared world-state file.

    The JSON is first written to a temporary sibling file and then moved over
    the real file with ``os.replace``. Readers do not take ``state_lock``, so
    writing in place would let them see a truncated file, and a failing
    ``json.dump`` would leave one behind. With the replace step the previous
    file stays intact until the new one is complete.

    Errors are printed, not raised.
    """
    tmp_path = f"{WORLD_STATE_FILE}.tmp"
    with state_lock:
        try:
            with open(tmp_path, 'w', encoding='utf-8') as f:
                json.dump(state, f, indent=2)
            os.replace(tmp_path, WORLD_STATE_FILE)
        except Exception as e:
            print(f"Error writing history file: {e}")
            # Best-effort cleanup of a partially written temporary file.
            if os.path.exists(tmp_path):
                try:
                    os.remove(tmp_path)
                except OSError:
                    pass
def prune_inactive_players(state):
    """Drop players whose last action is older than ``PLAYER_TIMEOUT_SECONDS``.

    A player without a ``last_action_timestamp`` is treated as last active at
    time 0. ``state`` is modified in place (its ``"players"`` entry is
    replaced by a new dict) and also returned.
    """
    now = time.time()
    state["players"] = {
        name: info
        for name, info in state.get("players", {}).items()
        if now - info.get("last_action_timestamp", 0) <= PLAYER_TIMEOUT_SECONDS
    }
    return state
def update_player_state(username, position=None):
    """Refresh a player's activity timestamp (and optionally position).

    Reads the shared state, prunes inactive players, creates or updates the
    entry for ``username`` and writes the state back. A new player without a
    position spawns at the centre of the plot. When ``position`` is given, a
    "move" action is additionally recorded.

    Returns:
        The updated state dict.
    """
    state = prune_inactive_players(read_history_file())
    players = state.get("players", {})
    if username in players:
        record = players[username]
        if position:
            record["position"] = position
        record["last_action_timestamp"] = time.time()
    else:
        spawn = position or {"x": PLOT_WIDTH / 2, "y": 0.5, "z": PLOT_DEPTH / 2}
        players[username] = {"position": spawn, "last_action_timestamp": time.time()}
    state["players"] = players
    write_history_file(state)
    if position:
        update_action_history(username, "move", {"position": position}, state)
    return state
# 📊 Update Action History: Records actions in session state and history file
def update_action_history(username, action_type, data, state):
    """Append an action record to the session history and to ``state``.

    The record holds a timestamp, the user name, the action type and the
    action payload. After appending, ``state`` is written to the shared
    world-state file.
    """
    record = dict(
        timestamp=get_current_time_str(),
        username=username,
        action=action_type,
        data=data,
    )
    if 'action_history' not in st.session_state:
        st.session_state.action_history = []
    st.session_state.action_history.append(record)
    state["action_history"].append(record)
    write_history_file(state)
# ๐Ÿ› ๏ธ Persist World Objects: Validates and saves object data to history.json
def persist_world_objects(obj_data, username, action_type):
    """Apply a "place" or "delete" object action to the shared world state.

    Args:
        obj_data: Dict describing the object; must contain ``obj_id``. A
            "place" action also needs a ``type`` listed in ``PRIMITIVE_MAP``
            and a ``position``.
        username: Player performing the action.
        action_type: ``"place"`` or ``"delete"``.

    Returns:
        The resulting state dict. Invalid input is reported on stdout and the
        state is returned without recording the action.
    """
    if not (obj_data and isinstance(obj_data, dict) and 'obj_id' in obj_data):
        print(f"Invalid object data for {action_type}: {obj_data}")
        return read_history_file()

    state = prune_inactive_players(read_history_file())
    obj_id = obj_data['obj_id']

    if action_type == "place":
        if obj_data.get('type') not in PRIMITIVE_MAP.values():
            print(f"Invalid object type: {obj_data.get('type', 'None')}")
            return state
        state["objects"][obj_id] = obj_data
        placed = {
            "obj_id": obj_id,
            "type": obj_data['type'],
            "position": obj_data['position']
        }
        update_action_history(username, "place", placed, state)
    elif action_type == "delete":
        if obj_id in state["objects"]:
            # Keep the removed object's details for the history entry.
            removed = state["objects"].pop(obj_id)
            deleted = {
                "obj_id": obj_id,
                "type": removed['type'],
                "position": removed['position']
            }
            update_action_history(username, "delete", deleted, state)

    write_history_file(state)
    log_action(username, action_type, obj_data)
    return state
def log_action(username, action_type, data):
    """Append one JSON line describing an action to the history logs.

    The line goes to the shared log and to a per-player log, both inside
    ``HISTORY_LOG_DIR``. A failure on either file is printed and does not
    stop the other write.
    """
    record = {
        "timestamp": get_current_time_str(),
        "username": username,
        "action": action_type,
        "data": data
    }

    def append_record(path):
        # Serialise inside the open file, so a serialisation error is
        # reported by the caller's handler just like an I/O error.
        with open(path, 'a', encoding='utf-8') as f:
            f.write(json.dumps(record) + '\n')

    shared_log_file = os.path.join(HISTORY_LOG_DIR, "shared_history.jsonl")
    try:
        append_record(shared_log_file)
    except Exception as e:
        print(f"Error writing to shared history log: {e}")

    player_log_file = os.path.join(
        HISTORY_LOG_DIR, f"{clean_filename_part(username)}_history.jsonl"
    )
    try:
        append_record(player_log_file)
    except Exception as e:
        print(f"Error writing to player history log {player_log_file}: {e}")
# 📩 Save and Log Chat: Saves chat message to chat_logs and logs to history_logs
async def save_and_log_chat(username, message, voice):
    """Store a chat message, optionally synthesise speech, and log the action.

    The message is written to its own markdown file in ``CHAT_DIR`` and added
    to the session chat history. When audio is enabled and the message
    contains speakable text, an MP3 is generated with edge-tts in
    ``AUDIO_DIR``. Finally a "chat" action is recorded.

    Returns:
        ``(markdown_path, audio_path)``. ``audio_path`` is ``None`` when no
        audio was produced. ``(None, None)`` when the message is blank or the
        markdown file could not be written.
    """
    if not message.strip():
        print("Empty chat message, skipping save.")
        return None, None

    timestamp_str = get_current_time_str()
    entry = f"[{timestamp_str}] {username} ({voice}): {message}"
    md_file_path = os.path.join(CHAT_DIR, generate_filename(message, username, "md"))
    try:
        ensure_dir(CHAT_DIR)
        with open(md_file_path, 'w', encoding='utf-8') as f:
            f.write(entry)
    except Exception as e:
        print(f"Error saving chat to {md_file_path}: {e}")
        return None, None

    if 'chat_history' not in st.session_state:
        st.session_state.chat_history = []
    st.session_state.chat_history.append(entry)

    audio_file = None
    speakable = clean_text_for_tts(message) if st.session_state.get('enable_audio', True) else None
    # "No text" is the placeholder clean_text_for_tts returns for empty input.
    if speakable and speakable != "No text":
        save_path = os.path.join(AUDIO_DIR, generate_filename(speakable, username, "mp3"))
        ensure_dir(AUDIO_DIR)
        try:
            await edge_tts.Communicate(speakable, voice).save(save_path)
            if os.path.exists(save_path) and os.path.getsize(save_path) > 0:
                audio_file = save_path
            else:
                print(f"Audio file {save_path} failed generation.")
        except Exception as e:
            print(f"Edge TTS Error: {e}")

    update_action_history(username, "chat", {"message": message}, read_history_file())
    return md_file_path, audio_file
# 💾 Save World State: Saves current state to a named file
def save_world_state(world_name):
    """Snapshot the shared world state into ``world_<name>_<timestamp>.json``.

    The outcome is reported in the Streamlit UI.

    Returns:
        ``True`` on success, ``False`` when the name is blank or the write
        fails.
    """
    if not world_name.strip():
        st.error("World name cannot be empty.")
        return False

    filename = "world_{}_{}.json".format(
        clean_filename_part(world_name), get_current_time_str()
    )
    save_path = os.path.join(SAVED_WORLDS_DIR, filename)
    snapshot = read_history_file()
    try:
        with open(save_path, 'w', encoding='utf-8') as handle:
            json.dump(snapshot, handle, indent=2)
        print(f"Saved world state to {save_path}")
        st.success(f"Saved world as {filename}")
        return True
    except Exception as e:
        print(f"Error saving world state to {save_path}: {e}")
        st.error(f"Failed to save world: {e}")
        return False
# 📂 Load World State: Loads a saved state from a file
def load_world_state(filename):
    """Replace the shared world state with a snapshot from ``SAVED_WORLDS_DIR``.

    On success the snapshot becomes the shared state file, the session copies
    are updated and a Streamlit rerun is requested.

    Returns:
        ``False`` when the file is missing or cannot be loaded; ``True``
        otherwise.
    """
    load_path = os.path.join(SAVED_WORLDS_DIR, filename)
    if not os.path.exists(load_path):
        st.error(f"World file not found: {filename}")
        return False
    try:
        with open(load_path, 'r', encoding='utf-8') as handle:
            loaded = json.load(handle)
        # Older or hand-edited snapshots may lack some top-level keys.
        for key, empty in (("objects", {}), ("players", {}), ("action_history", [])):
            loaded.setdefault(key, empty)
        write_history_file(loaded)
        st.session_state.world_state = loaded
        st.session_state.action_history = loaded["action_history"]
        print(f"Loaded world state from {load_path}")
        st.success(f"Loaded world {filename}")
        st.rerun()
        return True
    except Exception as e:
        print(f"Error loading world state from {load_path}: {e}")
        st.error(f"Failed to load world: {e}")
        return False
# ==============================================================================
# JavaScript Message Handling
# ==============================================================================
def handle_js_messages():
    """Poll the page for a posted message and apply it to the world state.

    A message is expected to be a JSON object with a ``type``
    (``place_object``, ``delete_object`` or ``move_player``) and a
    ``payload``. After a handled message the session's world state is
    replaced and a rerun is requested. Malformed messages and handler errors
    are printed and otherwise ignored.
    """
    message = st_javascript("""
window.addEventListener('message', (event) => {
return JSON.stringify(event.data);
}, {once: true});
return null;
""")
    if not message:
        return
    try:
        data = json.loads(message)
        action = data.get("type")
        payload = data.get("payload", {})
        # Fall back to the session user when the page did not send a name.
        username = payload.get("username", st.session_state.username)
        if action == "place_object":
            new_state = persist_world_objects(payload["object_data"], username, "place")
        elif action == "delete_object":
            new_state = persist_world_objects({"obj_id": payload["obj_id"]}, username, "delete")
        elif action == "move_player":
            new_state = update_player_state(username, payload["position"])
        else:
            return
        st.session_state.world_state = new_state
        st.rerun()
    except json.JSONDecodeError:
        print(f"Invalid JS message: {message}")
    except Exception as e:
        print(f"Error handling JS message: {e}")
# ==============================================================================
# User State & Session Init
# ==============================================================================
def save_username(username):
    """Write ``username`` to ``STATE_FILE``.

    The file is written as UTF-8, like every other text file this module
    writes. The available user names contain emoji, which a non-UTF-8 locale
    default encoding cannot encode. Errors are printed, not raised.
    """
    try:
        with open(STATE_FILE, 'w', encoding='utf-8') as f:
            f.write(username)
    except Exception as e:
        print(f"Failed save username: {e}")


def load_username():
    """Return the user name stored in ``STATE_FILE``.

    Returns:
        The stored name with surrounding whitespace stripped, or ``None``
        when the file does not exist or cannot be read (the error is
        printed).
    """
    if not os.path.exists(STATE_FILE):
        return None
    try:
        # Must match the encoding used by save_username.
        with open(STATE_FILE, 'r', encoding='utf-8') as f:
            return f.read().strip()
    except Exception as e:
        print(f"Failed load username: {e}")
    return None
def init_session_state():
    """Populate ``st.session_state`` and register the user as an active player.

    Missing keys receive defaults, cache/history entries of the wrong type
    are reset, a user name is restored from ``STATE_FILE`` (or picked at
    random from ``FUN_USERNAMES``) and saved, and the shared world state is
    loaded into the session.
    """
    defaults = {
        'message_counter': 0,
        'audio_cache': {},
        'tts_voice': "en-US-AriaNeural",
        'chat_history': [],
        'action_history': [],
        'enable_audio': True,
        'download_link_cache': {},
        'username': None,
        'autosend': False,
        'last_message': "",
        'selected_object': 'None',
        'paste_image_base64': "",
        'new_world_name': "MyWorld",
        'world_state': {"objects": {}, "players": {}, "action_history": []}
    }
    for key, value in defaults.items():
        if key not in st.session_state:
            st.session_state[key] = value

    # Repair entries that ended up with an unexpected type.
    for key, expected in (("audio_cache", dict), ("download_link_cache", dict), ("action_history", list)):
        if not isinstance(st.session_state[key], expected):
            st.session_state[key] = expected()

    if 'username' not in st.session_state or not st.session_state.username:
        st.session_state.username = load_username() or random.choice(list(FUN_USERNAMES.keys()))
        save_username(st.session_state.username)

    # Load history on startup
    shared_state = read_history_file()
    st.session_state.world_state = shared_state
    st.session_state.action_history = shared_state.get("action_history", [])
    update_player_state(st.session_state.username)
# ==============================================================================
# Audio / TTS / Chat / File Handling Helpers
# ==============================================================================
def clean_text_for_tts(text):
    """Reduce markdown-ish text to plain words suitable for speech synthesis.

    Markdown links are replaced by their link text, the characters
    ``# * _ ` !`` are removed, whitespace is collapsed and the result is cut
    to 250 characters. Non-string or empty input yields ``"No text"``.
    """
    if not isinstance(text, str):
        return "No text"
    without_links = re.sub(r'\[([^\]]+)\]\([^\)]+\)', r'\1', text)
    without_markup = re.sub(r'[#*_`!]', '', without_links)
    collapsed = ' '.join(without_markup.split())
    truncated = collapsed[:250]
    if truncated:
        return truncated
    return "No text"
def create_file(content, username, file_type="md", save_path=None):
    """Write ``content`` to a text file and return its path.

    Args:
        content: Text to write.
        username: Used to build a file name when ``save_path`` is not given.
        file_type: Extension for the generated file name.
        save_path: Explicit destination; when omitted, a name is generated
            inside ``MEDIA_DIR``.

    Returns:
        The path written, or ``None`` when the directory or file could not be
        created (the error is printed).
    """
    if not save_path:
        filename = generate_filename(content, username, file_type)
        save_path = os.path.join(MEDIA_DIR, filename)
    try:
        # A bare file name has an empty dirname, which os.makedirs rejects;
        # only create the parent when there is one. Doing this inside the try
        # keeps the "path or None" contract for directory errors as well.
        parent = os.path.dirname(save_path)
        if parent:
            os.makedirs(parent, exist_ok=True)
        with open(save_path, 'w', encoding='utf-8') as f:
            f.write(content)
        return save_path
    except Exception as e:
        print(f"Error creating file {save_path}: {e}")
        return None
def get_download_link(file_path, file_type="md"):
    """Return an HTML ``<a>`` tag that downloads ``file_path`` as a data URI.

    Links are cached in ``st.session_state.download_link_cache`` under a key
    built from the path and its modification time. A missing path yields a
    small "Not found" snippet; a read failure yields a small "Err" snippet.
    """
    if not file_path:
        return "<small>Not found: N/A</small>"
    if not os.path.exists(file_path):
        return f"<small>Not found: {os.path.basename(file_path)}</small>"

    try:
        mtime = os.path.getmtime(file_path)
    except OSError:
        mtime = 0
    cache_key = f"dl_{file_path}_{mtime}"

    if 'download_link_cache' not in st.session_state:
        st.session_state.download_link_cache = {}
    link_cache = st.session_state.download_link_cache

    if cache_key not in link_cache:
        try:
            with open(file_path, "rb") as f:
                b64 = base64.b64encode(f.read()).decode()
            mime_types = {"md": "text/markdown", "mp3": "audio/mpeg", "png": "image/png", "mp4": "video/mp4", "zip": "application/zip", "json": "application/json"}
            mime = mime_types.get(file_type, "application/octet-stream")
            icon = FILE_EMOJIS.get(file_type, "๐Ÿ“„")
            basename = os.path.basename(file_path)
            link_cache[cache_key] = (
                f'<a href="data:{mime};base64,{b64}" download="{basename}" '
                f'title="Download {basename}">{icon}</a>'
            )
        except Exception as e:
            print(f"Error generating DL link for {file_path}: {e}")
            return "<small>Err</small>"
    return link_cache.get(cache_key, "<small>CacheErr</small>")
def play_and_download_audio(file_path):
    """Render an audio player and a download link for ``file_path``.

    Does nothing when the path is empty or the file does not exist. Display
    errors are shown in the UI instead of being raised.
    """
    if not file_path or not os.path.exists(file_path):
        return
    try:
        st.audio(file_path)
        extension = file_path.split('.')[-1]
        st.markdown(get_download_link(file_path, extension), unsafe_allow_html=True)
    except Exception as e:
        st.error(f"Audio display error for {os.path.basename(file_path)}: {e}")
async def load_chat_history():
    """Return the session chat history, loading it from disk when empty.

    When the session has no chat entries, every ``*.md`` file in
    ``CHAT_DIR`` is read in order of modification time. Unreadable files are
    reported on stdout and skipped.
    """
    if 'chat_history' not in st.session_state:
        st.session_state.chat_history = []
    if st.session_state.chat_history:
        return st.session_state.chat_history

    ensure_dir(CHAT_DIR)
    print("Loading chat history from files...")
    entries = []
    for f_path in sorted(glob.glob(os.path.join(CHAT_DIR, "*.md")), key=os.path.getmtime):
        try:
            with open(f_path, 'r', encoding='utf-8') as file:
                entries.append(file.read().strip())
        except Exception as e:
            print(f"Err read chat {f_path}: {e}")
    st.session_state.chat_history = entries
    print(f"Loaded {len(entries)} chat entries from files.")
    return st.session_state.chat_history
def create_zip_of_files(files_to_zip, prefix="Archive"):
    """Bundle existing files into ``<prefix>_<timestamp>.zip``.

    The archive is created in the current working directory and each file is
    stored under its base name. Paths that do not exist are skipped with a
    message on stdout. The outcome is reported in the Streamlit UI.

    Returns:
        The archive file name, or ``None`` when no files were given or the
        archive could not be created.
    """
    # Imported here because the module's import block does not provide
    # zipfile; without it every call failed with a NameError.
    import zipfile

    if not files_to_zip:
        st.warning("No files provided to zip.")
        return None
    timestamp = get_current_time_str()
    zip_name = f"{prefix}_{timestamp}.zip"
    try:
        print(f"Creating zip: {zip_name}...")
        with zipfile.ZipFile(zip_name, 'w', zipfile.ZIP_DEFLATED) as z:
            for f in files_to_zip:
                if os.path.exists(f):
                    z.write(f, os.path.basename(f))
                else:
                    print(f"Skip zip missing: {f}")
        print("Zip success.")
        st.success(f"Created {zip_name}")
        return zip_name
    except Exception as e:
        print(f"Zip failed: {e}")
        st.error(f"Zip failed: {e}")
        return None
def delete_files(file_patterns, exclude_files=None):
    """Delete files matching the glob patterns, sparing protected files.

    A file is protected when its base name equals a protected entry or when
    its absolute path equals the absolute path of a protected entry. The path
    comparison is needed for ``WORLD_STATE_FILE``, which is stored with its
    directory and therefore never matched a bare base name.

    Args:
        file_patterns: Iterable of glob patterns.
        exclude_files: Optional extra file names or paths to protect.

    Directories are skipped. A summary is shown in the Streamlit UI and the
    session's download-link and audio caches are cleared.
    """
    protected = [STATE_FILE, "app.py", "index.html", "requirements.txt", "README.md", WORLD_STATE_FILE]
    if exclude_files:
        protected.extend(exclude_files)
    protected_names = set(protected)
    protected_paths = {os.path.abspath(p) for p in protected}

    deleted_count = 0
    errors = 0
    for pattern in file_patterns:
        print(f"Attempting to delete files matching: {pattern}")
        try:
            files_to_delete = glob.glob(pattern)
            if not files_to_delete:
                print(f"No files found for pattern: {pattern}")
                continue
            for f_path in files_to_delete:
                if os.path.isdir(f_path):
                    print(f"Skipping directory: {f_path}")
                    continue
                is_protected = (
                    os.path.basename(f_path) in protected_names
                    or os.path.abspath(f_path) in protected_paths
                )
                if not os.path.isfile(f_path) or is_protected:
                    continue
                try:
                    os.remove(f_path)
                    print(f"Deleted: {f_path}")
                    deleted_count += 1
                except Exception as e:
                    print(f"Failed delete {f_path}: {e}")
                    errors += 1
        except Exception as glob_e:
            print(f"Error matching pattern {pattern}: {glob_e}")
            errors += 1

    msg = f"Deleted {deleted_count} files."
    if errors > 0:
        msg += f" Encountered {errors} errors."
        st.warning(msg)
    elif deleted_count > 0:
        st.success(msg)
    else:
        st.info("No matching files found to delete.")
    # Cached links and audio may point at files that no longer exist.
    st.session_state['download_link_cache'] = {}
    st.session_state['audio_cache'] = {}
async def save_pasted_image(image, username):
    """Save ``image`` as a PNG in ``MEDIA_DIR`` and return the file path.

    The file name combines a timestamp with the first eight hex digits of the
    MD5 of the raw image bytes. Returns ``None`` when no image is given or
    saving fails (the error is printed). ``username`` is not used.
    """
    if not image:
        return None
    try:
        digest = hashlib.md5(image.tobytes()).hexdigest()[:8]
        filepath = os.path.join(MEDIA_DIR, f"{get_current_time_str()}_pasted_{digest}.png")
        image.save(filepath, "PNG")
    except Exception as e:
        print(f"Failed image save: {e}")
        return None
    print(f"Pasted image saved: {filepath}")
    return filepath
def paste_image_component():
    """Render a text area + button that decodes a pasted ``data:image`` URI.

    When the button is pressed and the text is a data URI, the image is
    decoded, previewed, and its base64 payload stored in
    ``st.session_state.paste_image_base64``. Invalid input clears that value
    and shows a message.

    Returns:
        The decoded PIL image, or ``None`` when nothing was processed.
    """
    pasted_img = None
    paste_input = st.text_area("Paste Image Data Here", key="paste_input_area", height=50, value="")
    if not st.button("Process Pasted Image ๐Ÿ“‹", key="paste_form_button"):
        return pasted_img

    if not (paste_input and paste_input.startswith('data:image')):
        st.warning("No valid image data pasted.")
        st.session_state.paste_image_base64 = ""
        return pasted_img

    try:
        # Data URI layout: data:<mime>;base64,<payload>
        header, base64_str = paste_input.split(';')[0], paste_input.split(',')[1]
        mime_type = header.split(':')[1]
        pasted_img = Image.open(io.BytesIO(base64.b64decode(base64_str)))
        img_type = mime_type.split('/')[1]
        st.image(pasted_img, caption=f"Pasted ({img_type.upper()})", width=150)
        st.session_state.paste_image_base64 = base64_str
    except ImportError:
        st.error("Pillow library needed for image pasting.")
    except Exception as e:
        st.error(f"Img decode err: {e}")
        st.session_state.paste_image_base64 = ""
    return pasted_img
class AudioProcessor:
    """Text-to-speech helper with an on-disk MP3 cache.

    Generated audio lives in ``AUDIO_CACHE_DIR`` next to a ``metadata.json``
    index keyed by cache key.
    """

    def __init__(self):
        self.cache_dir = AUDIO_CACHE_DIR
        ensure_dir(self.cache_dir)
        self.metadata = self._load_metadata()

    def _metadata_path(self):
        """Return the path of the metadata index inside the cache directory."""
        return os.path.join(self.cache_dir, "metadata.json")

    def _load_metadata(self):
        """Read the metadata index; return ``{}`` when missing or unreadable.

        The file is opened with a context manager so the handle is closed,
        and a corrupt index no longer prevents the processor from starting.
        """
        path = self._metadata_path()
        if not os.path.exists(path):
            return {}
        try:
            with open(path, 'r') as f:
                loaded = json.load(f)
        except (OSError, ValueError) as e:
            print(f"Failed metadata load: {e}")
            return {}
        return loaded if isinstance(loaded, dict) else {}

    def _save_metadata(self):
        """Write the metadata index; errors are printed, not raised."""
        try:
            with open(self._metadata_path(), 'w') as f:
                json.dump(self.metadata, f, indent=2)
        except Exception as e:
            print(f"Failed metadata save: {e}")

    async def create_audio(self, text, voice='en-US-AriaNeural'):
        """Return the path of an MP3 speaking ``text`` in ``voice``.

        A cached file is reused when present. The cache key is derived from
        the cleaned text that is actually spoken plus the voice, so two texts
        that merely share their opening characters get separate audio.

        Returns:
            The MP3 path, or ``None`` when there is nothing to speak or
            synthesis fails.
        """
        text_cleaned = clean_text_for_tts(text)
        # "No text" is the placeholder clean_text_for_tts returns for input
        # without speakable content.
        if not text_cleaned or text_cleaned == "No text":
            return None
        cache_key = hashlib.md5(f"{text_cleaned}:{voice}".encode()).hexdigest()
        cache_path = os.path.join(self.cache_dir, f"{cache_key}.mp3")
        if cache_key in self.metadata and os.path.exists(cache_path):
            return cache_path
        ensure_dir(os.path.dirname(cache_path))
        try:
            communicate = edge_tts.Communicate(text_cleaned, voice)
            await communicate.save(cache_path)
            if os.path.exists(cache_path) and os.path.getsize(cache_path) > 0:
                self.metadata[cache_key] = {'timestamp': datetime.now().isoformat(), 'text_length': len(text_cleaned), 'voice': voice}
                self._save_metadata()
                return cache_path
            return None
        except Exception as e:
            print(f"TTS Create Audio Error: {e}")
            return None
def process_pdf_tab(pdf_file, max_pages, voice):
    """Extract text from a PDF and generate speech for each page.

    Args:
        pdf_file: Uploaded PDF (``None`` shows a hint and returns).
        max_pages: Maximum number of pages to process.
        voice: edge-tts voice used for synthesis.

    Each page with text is synthesised on its own thread while a progress bar
    tracks completion (waiting at most 600 seconds). Results are shown in one
    expander per page.
    """
    st.subheader("PDF Processing Results")
    if pdf_file is None:
        st.info("Upload a PDF file and click 'Process PDF' to begin.")
        return
    audio_processor = AudioProcessor()
    try:
        reader = PdfReader(pdf_file)
        if reader.is_encrypted:
            st.warning("PDF is encrypted.")
            return
        total_pages = min(len(reader.pages), max_pages)
        st.write(f"Processing first {total_pages} pages of '{pdf_file.name}'...")
        texts, audios = {}, {}
        workers = []
        results_lock = threading.Lock()

        def synthesize_page(page_num, page_text):
            # Runs on a worker thread, which drives its own event loop.
            try:
                audio_path = asyncio.run(audio_processor.create_audio(page_text, voice))
                if audio_path:
                    with results_lock:
                        audios[page_num] = audio_path
            except Exception as page_e:
                print(f"Err process page {page_num+1}: {page_e}")

        for i in range(total_pages):
            try:
                text = reader.pages[i].extract_text()
                if not (text and text.strip()):
                    texts[i] = "[No text extracted]"
                    continue
                texts[i] = text
                worker = threading.Thread(target=synthesize_page, args=(i, text))
                workers.append(worker)
                worker.start()
            except Exception as extract_e:
                texts[i] = f"[Error extract: {extract_e}]"
                print(f"Error page {i+1} extract: {extract_e}")

        progress_bar = st.progress(0.0, text="Processing pages...")
        total_threads = len(workers)
        wait_started = time.time()
        while True:
            still_running = sum(w.is_alive() for w in workers)
            if not still_running:
                break
            done = total_threads - still_running
            fraction = done / total_threads if total_threads > 0 else 1.0
            progress_bar.progress(min(fraction, 1.0), text=f"Processed {done}/{total_threads} pages...")
            if time.time() - wait_started > 600:
                print("PDF processing timed out.")
                break
            time.sleep(0.5)
        progress_bar.progress(1.0, text="Processing complete.")

        for i in range(total_pages):
            with st.expander(f"Page {i+1}"):
                st.markdown(texts.get(i, "[Error getting text]"))
                audio_file = audios.get(i)
                if audio_file:
                    play_and_download_audio(audio_file)
                else:
                    st.caption("Audio generation failed or was skipped.")
    except Exception as pdf_e:
        st.error(f"Err read PDF: {pdf_e}")
        st.exception(pdf_e)
# ==============================================================================
# Streamlit UI Layout Functions
# ==============================================================================
def render_sidebar():
    """Draw the sidebar.

    Sections, top to bottom: active players, action history table, save/load
    world, build-tool buttons, and the user name / TTS settings.
    """

    def format_position(pos):
        # One decimal per axis; missing axes default to 0.
        return f"({pos.get('x', 0):.1f}, {pos.get('y', 0):.1f}, {pos.get('z', 0):.1f})"

    with st.sidebar:
        st.header("๐Ÿ’พ World State")
        st.caption("Manage the shared world state.")

        # Active Players
        shared_state = read_history_file()
        active_players = shared_state.get("players", {})
        st.subheader("Active Players")
        now = time.time()
        for player_name, player in active_players.items():
            idle_minutes = (now - player.get("last_action_timestamp", 0)) / 60
            st.write(f"{player_name}: Last active {idle_minutes:.1f} minutes ago")

        # Action History Dataset
        st.subheader("Action History")
        rows = []
        for entry in st.session_state.action_history:
            data = entry["data"]
            action = entry["action"]
            if action in ("place", "delete"):
                detail = format_position(data.get("position", {}))
                label, object_type = action.capitalize(), data.get("type", "N/A")
            elif action == "move":
                detail = format_position(data.get("position", {}))
                label, object_type = "Move", "Player"
            elif action == "chat":
                # Chat rows reuse the position column for a message preview.
                detail = data.get("message", "N/A")[:50]
                label, object_type = "Chat", "Message"
            else:
                continue
            rows.append({
                "Time": entry["timestamp"],
                "Player": entry["username"],
                "Action": label,
                "Object Type": object_type,
                "Position": detail
            })
        if rows:
            st.dataframe(pd.DataFrame(rows), height=200, use_container_width=True)
        else:
            st.caption("No actions recorded yet.")

        # Save World
        st.subheader("Save World")
        world_name = st.text_input("World Name", value="MyWorld", key="save_world_name")
        if st.button("๐Ÿ’พ Save World", key="save_world"):
            save_world_state(world_name)

        # Load World
        st.subheader("Load World")
        saved_worlds = [
            os.path.basename(path)
            for path in glob.glob(os.path.join(SAVED_WORLDS_DIR, "world_*.json"))
        ]
        selected_world = st.selectbox("Select Saved World", ["None"] + saved_worlds, key="load_world_select")
        if selected_world != "None" and st.button("๐Ÿ“‚ Load World", key="load_world"):
            load_world_state(selected_world)

        st.markdown("---")
        st.header("๐Ÿ—๏ธ Build Tools")
        st.caption("Select an object to place.")
        # CSS for tool buttons
        st.markdown("""
<style>
.tool-button {
width: 40px;
height: 40px;
margin: 2px;
font-size: 20px;
cursor: pointer;
border: 2px solid #ccc;
background-color: #f9f9f9;
}
.tool-button:hover {
background-color: #e0e0e0;
}
.tool-button.selected {
border-color: #007bff;
background-color: #e7f3ff;
}
.tool-grid {
display: grid;
grid-template-columns: repeat(5, 1fr);
gap: 2px;
}
</style>
""", unsafe_allow_html=True)

        # Tool buttons, laid out five per row; the active tool is highlighted.
        current_tool = st.session_state.get('selected_object', 'None')
        cols = st.columns(5)
        for position, (emoji, name) in enumerate(PRIMITIVE_MAP.items()):
            button_type = "primary" if current_tool == name else "secondary"
            clicked = cols[position % 5].button(
                emoji, key=f"primitive_{name}", help=name, type=button_type, use_container_width=True
            )
            if clicked and st.session_state.selected_object != name:
                st.session_state.selected_object = name
                update_player_state(st.session_state.username)
                update_action_history(st.session_state.username, "tool_change", {"tool": name}, read_history_file())
                st.rerun()

        st.markdown("---")
        if st.button("๐Ÿšซ Clear Tool", key="clear_tool", use_container_width=True):
            if st.session_state.selected_object != 'None':
                st.session_state.selected_object = 'None'
                update_player_state(st.session_state.username)
                update_action_history(st.session_state.username, "tool_change", {"tool": "None"}, read_history_file())
                st.rerun()

        st.markdown("---")
        st.header("๐Ÿ—ฃ๏ธ Voice & User")
        username_options = list(FUN_USERNAMES.keys())
        current_username = st.session_state.get('username', username_options[0])
        # Unknown names (not in the list) fall back to the first option.
        current_index = username_options.index(current_username) if current_username in username_options else 0
        new_username = st.selectbox("Change Name/Voice", options=username_options, index=current_index, key="username_select", format_func=lambda x: x.split(" ")[0])
        if new_username != st.session_state.username:
            old_username = st.session_state.username
            state = read_history_file()
            if old_username in state["players"]:
                # Carry the player's record over to the new name.
                state["players"][new_username] = state["players"].pop(old_username)
                write_history_file(state)
            st.session_state.username = new_username
            st.session_state.tts_voice = FUN_USERNAMES[new_username]
            save_username(st.session_state.username)
            update_player_state(st.session_state.username)
            update_action_history(st.session_state.username, "rename", {"old_username": old_username, "new_username": new_username}, state)
            st.rerun()
        st.session_state['enable_audio'] = st.toggle("Enable TTS Audio", value=st.session_state.get('enable_audio', True))
def render_main_content():
    """Draw the main area: world builder, chat, and files/settings tabs.

    The world tab injects the current session state into ``index.html`` and
    embeds it; the chat tab shows the last 50 messages (newest first) and
    sends new ones; the files tab clears the world state and manages zip
    archives.
    """
    st.title(f"{Site_Name} - User: {st.session_state.username}")
    # NOTE: tab_pdf is created but nothing is rendered into it in this
    # function.
    tab_world, tab_chat, tab_pdf, tab_files = st.tabs(["๐Ÿ—๏ธ World Builder", "๐Ÿ—ฃ๏ธ Chat", "๐Ÿ“š PDF Tools", "๐Ÿ“‚ Files & Settings"])

    with tab_world:
        st.header("Shared 3D World")
        st.caption("Click to place objects with the selected tool, or click to move player. Right-click to delete. State is saved in history.json.")
        state = st.session_state.world_state
        html_file_path = 'index.html'
        try:
            with open(html_file_path, 'r', encoding='utf-8') as f:
                html_template = f.read()
            # Values are JSON-encoded so they are valid JavaScript literals.
            js_injection_script = f"""<script>
window.USERNAME = {json.dumps(st.session_state.username)};
window.SELECTED_OBJECT_TYPE = {json.dumps(st.session_state.selected_object)};
window.PLOT_WIDTH = {json.dumps(PLOT_WIDTH)};
window.PLOT_DEPTH = {json.dumps(PLOT_DEPTH)};
window.WORLD_STATE = {json.dumps(state)};
console.log("Streamlit State Injected:", {{ username: window.USERNAME, selectedObject: window.SELECTED_OBJECT_TYPE, worldState: window.WORLD_STATE }});
</script>"""
            # Insert the state right before the first </head> of the template.
            html_content_with_state = html_template.replace('</head>', js_injection_script + '\n</head>', 1)
            components.html(html_content_with_state, height=700, scrolling=False)
            handle_js_messages()
        except FileNotFoundError:
            st.error(f"CRITICAL ERROR: Could not find '{html_file_path}'.")
        except Exception as e:
            st.error(f"Error loading 3D component: {e}")
            st.exception(e)

    with tab_chat:
        st.header(f"{START_ROOM} Chat")
        chat_history_task = asyncio.run(load_chat_history())
        chat_container = st.container(height=500)
        with chat_container:
            if chat_history_task:
                st.markdown("----\n".join(reversed(chat_history_task[-50:])))
            else:
                st.caption("No chat messages yet.")
        # The counter in the key gives a fresh, empty input after each send.
        message_value = st.text_input("Your Message:", key=f"message_input_{st.session_state.get('message_counter', 0)}", label_visibility="collapsed")
        send_button_clicked = st.button("Send Chat ๐Ÿ’ฌ", key="send_chat_button")
        should_autosend = st.session_state.get('autosend', False) and message_value
        if send_button_clicked or should_autosend:
            message_to_send = message_value
            if message_to_send.strip() and message_to_send != st.session_state.get('last_message', ''):
                st.session_state.last_message = message_to_send
                voice = FUN_USERNAMES.get(st.session_state.username, "en-US-AriaNeural")
                asyncio.run(save_and_log_chat(st.session_state.username, message_to_send, voice))
                update_player_state(st.session_state.username)
                st.session_state.message_counter = st.session_state.get('message_counter', 0) + 1
                st.rerun()
            elif send_button_clicked:
                st.toast("Message empty or same as last.")
        st.checkbox("Autosend Chat", key="autosend")

    with tab_files:
        st.header("๐Ÿ“‚ Files & Settings")
        st.subheader("๐Ÿ’พ World State Management")
        if st.button("Clear World State", key="clear_world_state"):
            state = {"objects": {}, "players": {}, "action_history": []}
            write_history_file(state)
            st.session_state.world_state = state
            st.session_state.action_history = []
            st.success("World state cleared!")
            st.rerun()

        st.subheader("๐Ÿ“ฆ Download Archives")
        # The zip buttons are always shown. Previously they were only drawn
        # when an archive already existed, so the first archive could never
        # be created from the UI.
        col_zip1, col_zip2, col_zip3 = st.columns(3)
        with col_zip1:
            if st.button("Zip Worlds"):
                create_zip_of_files([WORLD_STATE_FILE] + glob.glob(os.path.join(SAVED_WORLDS_DIR, "world_*.json")), "Worlds")
        with col_zip2:
            if st.button("Zip Chats"):
                create_zip_of_files(glob.glob(os.path.join(CHAT_DIR, "*.md")), "Chats")
        with col_zip3:
            if st.button("Zip Audio"):
                create_zip_of_files(glob.glob(os.path.join(AUDIO_DIR, "*.mp3")) + glob.glob(os.path.join(AUDIO_CACHE_DIR, "*.mp3")), "Audio")

        # Scan after the buttons so an archive created in this run is listed.
        zip_files = sorted(glob.glob(os.path.join(MEDIA_DIR, "*.zip")), key=os.path.getmtime, reverse=True)
        if zip_files:
            st.caption("Existing Zip Files:")
            for zip_file in zip_files:
                st.markdown(get_download_link(zip_file, "zip"), unsafe_allow_html=True)
        else:
            st.caption("No zip archives found.")
# ==============================================================================
# Main Execution Logic
# ==============================================================================
if __name__ == "__main__":
    # Session defaults must exist before any widget or renderer reads them.
    init_session_state()
    # Sidebar first, then the tabbed main area.
    render_sidebar()
    render_main_content()