From 3fbf6ea4978589ec85fa26a469681621fccffa72 Mon Sep 17 00:00:00 2001 From: Yeicor <4929005+Yeicor@users.noreply.github.com> Date: Tue, 6 Feb 2024 20:55:04 +0100 Subject: [PATCH] Add async buffered pubsub, websocket updates endpoint and initial fast hashing of objects --- pyproject.toml | 6 +++-- yacv_server/events.py | 40 ----------------------------- yacv_server/pubsub.py | 54 +++++++++++++++++++++++++++++++++++++++ yacv_server/server.py | 46 ++++++++++++++++++++++++++++----- yacv_server/tessellate.py | 23 +++++++++++++++++ 5 files changed, 121 insertions(+), 48 deletions(-) delete mode 100644 yacv_server/events.py create mode 100644 yacv_server/pubsub.py diff --git a/pyproject.toml b/pyproject.toml index b5e8494..8540414 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -13,11 +13,13 @@ python = "^3.9" build123d = "^0.3.0" partcad = "^0.3.84" +# Web +aiohttp = "^3.9.3" +aiohttp-devtools = "^1.1.2" + # Misc pygltflib = "^1.16.1" tqdm = "^4.66.1" -aiohttp = "^3.9.3" -aiohttp-devtools = "^1.1.2" [build-system] requires = ["poetry-core"] diff --git a/yacv_server/events.py b/yacv_server/events.py deleted file mode 100644 index eeb6c06..0000000 --- a/yacv_server/events.py +++ /dev/null @@ -1,40 +0,0 @@ -import asyncio -from typing import TypeVar, Generic, List, Callable, Tuple - -T = TypeVar('T') - - -class EventPublisher(Generic[T]): - """A buffered event publisher that broadcasts to all listeners, including all previously emitted data""" - - _listeners: List[Callable[[T], None]] - _buffer: List[T] - _lock: asyncio.Lock - - def __init__(self): - self._listeners = [] - self._buffer = [] - self._lock = asyncio.Lock() - - async def subscribe(self, listener: Callable[[T], None]): - async with self._lock: - self._listeners.append(listener) - for data in self._buffer: - listener(data) - - def unsubscribe(self, listener: Callable[[T], None]): - async with self._lock: - self._listeners.remove(listener) - - def emit(self, data: T): - async with self._lock: - self._buffer.append(data) - for listener in self._listeners: - listener(data) - - def buffer(self) -> Tuple[List[T], asyncio.Lock]: - return self._buffer, self._lock - - def clear(self): - async with self._lock: - self._buffer.clear() diff --git a/yacv_server/pubsub.py b/yacv_server/pubsub.py new file mode 100644 index 0000000..482bd7d --- /dev/null +++ b/yacv_server/pubsub.py @@ -0,0 +1,54 @@ +import asyncio +from typing import List, TypeVar, \ + Generic, AsyncGenerator + +T = TypeVar('T') + + +class BufferedPubSub(Generic[T]): + """A simple implementation of publish-subscribe pattern using asyncio and buffering all previous events""" + + _buffer: List[T] + _subscribers: List[asyncio.Queue[T]] + _lock = asyncio.Lock() + + def __init__(self): + self._buffer = [] + self._subscribers = [] + + async def publish(self, event: T): + """Publishes an event""" + async with self._lock: + self._buffer.append(event) + for q in self._subscribers: + await q.put(event) + + def publish_nowait(self, event: T): + """Publishes an event without blocking""" + self._buffer.append(event) + for q in self._subscribers: + q.put_nowait(event) + + async def _subscribe(self, include_buffered: bool = True) -> asyncio.Queue[T]: + """Subscribes to events""" + q = asyncio.Queue() + async with self._lock: + self._subscribers.append(q) + if include_buffered: + for event in self._buffer: + await q.put(event) + return q + + async def _unsubscribe(self, q: asyncio.Queue[T]): + """Unsubscribes from events""" + async with self._lock: + self._subscribers.remove(q) + + async def subscribe(self, include_buffered: bool = True) -> AsyncGenerator[T, None]: + """Subscribes to events as an async generator that yields events and automatically unsubscribes""" + q = await self._subscribe(include_buffered) + try: + while True: + yield await q.get() + finally: + await self._unsubscribe(q) diff --git a/yacv_server/server.py b/yacv_server/server.py index 078a306..e7c5727 100644 --- a/yacv_server/server.py +++ b/yacv_server/server.py @@ -4,12 +4,17 @@ import os import signal import sys from threading import Thread -from typing import Optional +from typing import Optional, Tuple from OCP.TopoDS import TopoDS_Shape from aiohttp import web +from pubsub import BufferedPubSub +from tessellate import _hashcode + FRONTEND_BASE_PATH = os.getenv('FRONTEND_BASE_PATH', '../dist') +UPDATES_API_PATH = '/api/updates' +OBJECTS_API_PATH = '/api/objects' # /{name} # noinspection PyUnusedLocal @@ -22,11 +27,13 @@ class Server: runner: web.AppRunner thread: Optional[Thread] = None do_shutdown = asyncio.Event() + show_events = BufferedPubSub[Tuple[TopoDS_Shape, str]]() def __init__(self, *args, **kwargs): # --- Routes --- - # - API - # self.app.router.add_route({'POST','GET'}, '/api/{collection}', api_handler) + # - APIs + self.app.router.add_route('GET', f'{UPDATES_API_PATH}', self.api_updates) + self.app.router.add_route('GET', f'{OBJECTS_API_PATH}/{{name}}', self.api_objects) # - Static files from the frontend self.app.router.add_get('/{path:(.*/|)}', _index_handler) # Any folder -> index.html self.app.router.add_static('/', path=FRONTEND_BASE_PATH, name='static_frontend') @@ -74,10 +81,37 @@ class Server: # print('Shutting down server...') await runner.cleanup() - def show_object(self, obj: TopoDS_Shape): - pass + async def api_updates(self, request: web.Request) -> web.WebSocketResponse: + """Handles a publish-only websocket connection that send show_object events along with their hashes and URLs""" + ws = web.WebSocketResponse() + await ws.prepare(request) + + print('New client connected') + async for (obj, name) in self.show_events.subscribe(): + hash_code = _hashcode(obj) + url = f'{UPDATES_API_PATH}/{name}' + print('New object:', name, hash_code, url) + await ws.send_json({'name': name, 'hash': hash_code, 'url': url}) + + # TODO: Start previous loop in a separate task and detect connection close to stop it + + return ws + + obj_counter = 0 + + def show_object(self, obj: TopoDS_Shape, name: Optional[str] = None): + """Publishes a CAD object to the server""" + name = name or f'object_{self.obj_counter}' + self.obj_counter += 1 + self.show_events.publish_nowait((obj, name)) + + async def api_objects(self, request: web.Request) -> web.Response: + return web.Response(body='TODO: Serve the object file here') def get_app() -> web.Application: """Required by aiohttp-devtools""" - return Server().app + from logo.logo import build_logo + server = Server() + server.show_object(build_logo()) + return server.app diff --git a/yacv_server/tessellate.py b/yacv_server/tessellate.py index 0281b1e..eed291c 100644 --- a/yacv_server/tessellate.py +++ b/yacv_server/tessellate.py @@ -1,5 +1,8 @@ import concurrent import copyreg +import hashlib +import io +import re from concurrent.futures import ProcessPoolExecutor, Executor from dataclasses import dataclass from typing import Tuple, Callable, Generator @@ -9,7 +12,9 @@ import numpy as np from OCP.BRep import BRep_Tool from OCP.BRepAdaptor import BRepAdaptor_Curve from OCP.GCPnts import GCPnts_TangentialDeflection +from OCP.TopExp import TopExp from OCP.TopLoc import TopLoc_Location +from OCP.TopTools import TopTools_IndexedMapOfShape from OCP.TopoDS import TopoDS_Face, TopoDS_Edge, TopoDS_Shape, TopoDS_Vertex from build123d import Face, Vector, Shape, Vertex from partcad.wrappers import cq_serialize @@ -188,3 +193,21 @@ def _tessellate_vertex(ocp_vertex: TopoDS_Vertex) -> GLTF2: pbrMetallicRoughness=PbrMetallicRoughness(baseColorFactor=[1.0, 0.5, 0.5, 1.0]), alphaCutoff=None) return create_gltf(vertices, indices, tex_coord, mode, material) + + +def _hashcode(obj: TopoDS_Shape) -> str: + """Utility to compute the hash code of a shape recursively without the need to tessellate it""" + # NOTE: obj.HashCode(MAX_HASH_CODE) is not stable across different runs of the same program + # This is best-effort and not guaranteed to be unique + data = io.BytesIO() + map_of_shapes = TopTools_IndexedMapOfShape() + TopExp.MapShapes_s(obj, map_of_shapes) + for i in range(1, map_of_shapes.Extent() + 1): + sub_shape = map_of_shapes.FindKey(i) + sub_data = io.BytesIO() + TopoDS_Shape.DumpJson(sub_shape, sub_data) + val = sub_data.getvalue() + val = re.sub(b'"this": "[^"]*"', b'', val) # Remove memory address + data.write(val) + to_hash = data.getvalue() + return hashlib.md5(to_hash, usedforsecurity=False).hexdigest()