|
import time |
|
|
|
from threading import Thread, Lock |
|
|
|
|
|
class Listener: |
|
task_queue = [] |
|
lock = Lock() |
|
thread = None |
|
|
|
@classmethod |
|
def _process_tasks(cls): |
|
while True: |
|
task = None |
|
with cls.lock: |
|
if cls.task_queue: |
|
task = cls.task_queue.pop(0) |
|
|
|
if task is None: |
|
time.sleep(0.001) |
|
continue |
|
|
|
func, args, kwargs = task |
|
try: |
|
func(*args, **kwargs) |
|
except Exception as e: |
|
print(f"Error in listener thread: {e}") |
|
|
|
@classmethod |
|
def add_task(cls, func, *args, **kwargs): |
|
with cls.lock: |
|
cls.task_queue.append((func, args, kwargs)) |
|
|
|
if cls.thread is None: |
|
cls.thread = Thread(target=cls._process_tasks, daemon=True) |
|
cls.thread.start() |
|
|
|
|
|
def async_run(func, *args, **kwargs): |
|
Listener.add_task(func, *args, **kwargs) |
|
|
|
|
|
class FIFOQueue: |
|
def __init__(self): |
|
self.queue = [] |
|
self.lock = Lock() |
|
|
|
def push(self, item): |
|
with self.lock: |
|
self.queue.append(item) |
|
|
|
def pop(self): |
|
with self.lock: |
|
if self.queue: |
|
return self.queue.pop(0) |
|
return None |
|
|
|
def top(self): |
|
with self.lock: |
|
if self.queue: |
|
return self.queue[0] |
|
return None |
|
|
|
def next(self): |
|
while True: |
|
with self.lock: |
|
if self.queue: |
|
return self.queue.pop(0) |
|
|
|
time.sleep(0.001) |
|
|
|
|
|
class AsyncStream: |
|
def __init__(self): |
|
self.input_queue = FIFOQueue() |
|
self.output_queue = FIFOQueue() |
|
|