More flexible and add deps for http+websockets server

This commit is contained in:
Yeicor
2024-02-05 19:29:48 +01:00
parent f98a95791d
commit 69106abbbd
5 changed files with 654 additions and 14 deletions

27
yacv_server/events.py Normal file
View File

@@ -0,0 +1,27 @@
from typing import TypeVar, Generic, List, Callable
T = TypeVar('T')
class EventPublisher(Generic[T]):
"""A buffered event publisher that broadcasts to all listeners, including all previously emitted data"""
_listeners: List[Callable[[T], None]]
_buffer: List[T]
def __init__(self):
self._listeners = []
self._buffer = []
def subscribe(self, listener: Callable[[T], None]):
self._listeners.append(listener)
for data in self._buffer:
listener(data)
def unsubscribe(self, listener: Callable[[T], None]):
self._listeners.remove(listener)
def emit(self, data: T):
self._buffer.append(data)
for listener in self._listeners:
listener(data)