Tai Truong
fix readme
d202ada
raw
history blame contribute delete
1.62 kB
import re
import threading
from contextlib import contextmanager
from pathlib import Path
from filelock import FileLock
from platformdirs import user_cache_dir
class KeyedMemoryLockManager:
"""A manager for acquiring and releasing memory locks based on a key."""
def __init__(self) -> None:
self.locks: dict[str, threading.Lock] = {}
self.global_lock = threading.Lock()
def _get_lock(self, key: str):
with self.global_lock:
if key not in self.locks:
self.locks[key] = threading.Lock()
return self.locks[key]
@contextmanager
def lock(self, key: str):
lock = self._get_lock(key)
lock.acquire()
try:
yield
finally:
lock.release()
class KeyedWorkerLockManager:
"""A manager for acquiring locks between workers based on a key."""
def __init__(self) -> None:
self.locks_dir = Path(user_cache_dir("langflow"), ensure_exists=True) / "worker_locks"
def _validate_key(self, key: str) -> bool:
"""Validate that the string only contains alphanumeric characters and underscores.
Parameters:
s (str): The string to validate.
Returns:
bool: True if the string is valid, False otherwise.
"""
pattern = re.compile(r"^\w+$")
return bool(pattern.match(key))
@contextmanager
def lock(self, key: str):
if not self._validate_key(key):
msg = f"Invalid key: {key}"
raise ValueError(msg)
lock = FileLock(self.locks_dir / key)
with lock:
yield