big rewrite focusing on faster performance and selection improvements

This commit is contained in:
Yeicor
2024-03-10 15:34:39 +01:00
parent a9ce189c45
commit 719395863d
24 changed files with 663 additions and 1266 deletions

View File

@@ -1,6 +1,8 @@
import asyncio
import threading
import queue
import threading
from typing import List, TypeVar, \
Generic, AsyncGenerator
Generic, Generator
from yacv_server.mylogger import logger
T = TypeVar('T')


class BufferedPubSub(Generic[T]):
    """A simple publish-subscribe implementation using threading that buffers all previous events.

    Publishers call :meth:`publish`; consumers iterate :meth:`subscribe`, which
    (optionally) replays the buffered history before yielding future events.
    The buffer is bounded by ``max_buffer_size``; the oldest event is dropped
    first once the bound is exceeded.

    Lock order is always buffer lock -> subscribers lock, so an event is
    delivered to each subscriber exactly once: either via the buffered replay
    or via the live queue, never both.
    """
    _buffer: List[T]                       # bounded history of published events
    _buffer_lock: threading.Lock           # guards _buffer
    _subscribers: List[queue.Queue[T]]     # one queue per active subscriber
    _subscribers_lock: threading.Lock      # guards _subscribers
    max_buffer_size: int                   # max events kept in _buffer

    def __init__(self, max_buffer_size: int = 100):
        self._buffer = []
        self._buffer_lock = threading.Lock()
        self._subscribers = []
        self._subscribers_lock = threading.Lock()
        self.max_buffer_size = max_buffer_size

    def publish(self, event: T):
        """Publishes an event, appending it to the buffer and delivering it to all current subscribers.

        Thread-safe. The subscriber list is snapshotted under its lock
        (previously it was iterated unlocked, racing with _subscribe /
        _unsubscribe mutating it).
        """
        with self._buffer_lock:
            self._buffer.append(event)
            if len(self._buffer) > self.max_buffer_size:
                self._buffer.pop(0)  # drop the oldest event to bound memory
            # Snapshot under the subscribers lock; delivering while still
            # holding the buffer lock keeps replay/live delivery mutually
            # exclusive with _subscribe (no duplicate or lost events).
            with self._subscribers_lock:
                subscribers = self._subscribers[:]
            for q in subscribers:
                q.put(event)  # queue.Queue() is unbounded, so put() never blocks

    def _subscribe(self, include_buffered: bool = True, include_future: bool = True) -> queue.Queue[T]:
        """Registers and returns a new subscriber queue, optionally pre-filled with buffered events."""
        q = queue.Queue()
        # Hold the buffer lock across registration + replay so a concurrent
        # publish() cannot deliver the same event both via the replay loop
        # and via the freshly registered live queue.
        with self._buffer_lock:
            with self._subscribers_lock:
                self._subscribers.append(q)
            if include_buffered:
                for event in self._buffer:
                    q.put(event)
        logger.debug("Subscribed to %s (%d subscribers)", self, len(self._subscribers))
        if not include_future:
            q.put(None)  # sentinel marking the end of the stream
        return q

    def _unsubscribe(self, q: queue.Queue[T]):
        """Unregisters a subscriber queue.

        :raises ValueError: if the queue was not subscribed.
        """
        with self._subscribers_lock:
            self._subscribers.remove(q)
        logger.debug("Unsubscribed from %s (%d subscribers)", self, len(self._subscribers))

    def subscribe(self, include_buffered: bool = True, include_future: bool = True) -> Generator[T, None, None]:
        """Subscribes to events as a generator that yields events and automatically unsubscribes.

        :param include_buffered: yield the buffered history first.
        :param include_future: keep blocking for and yielding future events;
            if False, the generator terminates after the buffered events.
        """
        q = self._subscribe(include_buffered, include_future)
        try:
            while True:
                v = q.get()
                # include_future is incompatible with None values as they are
                # used to signal the end of the stream
                if v is None and not include_future:
                    break
                yield v
        finally:  # runs when the consumer stops iterating or close() is called
            self._unsubscribe(q)

    def buffer(self) -> List[T]:
        """Returns a shallow copy of the list of buffered events."""
        with self._buffer_lock:
            return self._buffer[:]

    def delete(self, event: T):
        """Deletes an event from the buffer.

        :raises ValueError: if the event is not present in the buffer.
        """
        with self._buffer_lock:
            self._buffer.remove(event)

    def clear(self):
        """Clears the buffer."""
        with self._buffer_lock:
            self._buffer.clear()