Add async buffered pubsub, websocket updates endpoint and initial fast hashing of objects

This commit is contained in:
Yeicor
2024-02-06 20:55:04 +01:00
parent e79b9adc61
commit 3fbf6ea497
5 changed files with 121 additions and 48 deletions

View File

@@ -13,11 +13,13 @@ python = "^3.9"
build123d = "^0.3.0" build123d = "^0.3.0"
partcad = "^0.3.84" partcad = "^0.3.84"
# Web
aiohttp = "^3.9.3"
aiohttp-devtools = "^1.1.2"
# Misc # Misc
pygltflib = "^1.16.1" pygltflib = "^1.16.1"
tqdm = "^4.66.1" tqdm = "^4.66.1"
aiohttp = "^3.9.3"
aiohttp-devtools = "^1.1.2"
[build-system] [build-system]
requires = ["poetry-core"] requires = ["poetry-core"]

View File

@@ -1,40 +0,0 @@
import asyncio
from typing import TypeVar, Generic, List, Callable, Tuple
T = TypeVar('T')
class EventPublisher(Generic[T]):
"""A buffered event publisher that broadcasts to all listeners, including all previously emitted data"""
_listeners: List[Callable[[T], None]]
_buffer: List[T]
_lock: asyncio.Lock
def __init__(self):
self._listeners = []
self._buffer = []
self._lock = asyncio.Lock()
async def subscribe(self, listener: Callable[[T], None]):
async with self._lock:
self._listeners.append(listener)
for data in self._buffer:
listener(data)
def unsubscribe(self, listener: Callable[[T], None]):
async with self._lock:
self._listeners.remove(listener)
def emit(self, data: T):
async with self._lock:
self._buffer.append(data)
for listener in self._listeners:
listener(data)
def buffer(self) -> Tuple[List[T], asyncio.Lock]:
return self._buffer, self._lock
def clear(self):
async with self._lock:
self._buffer.clear()

54
yacv_server/pubsub.py Normal file
View File

@@ -0,0 +1,54 @@
import asyncio
from typing import List, TypeVar, \
Generic, AsyncGenerator
T = TypeVar('T')
class BufferedPubSub(Generic[T]):
"""A simple implementation of publish-subscribe pattern using asyncio and buffering all previous events"""
_buffer: List[T]
_subscribers: List[asyncio.Queue[T]]
_lock = asyncio.Lock()
def __init__(self):
self._buffer = []
self._subscribers = []
async def publish(self, event: T):
"""Publishes an event"""
async with self._lock:
self._buffer.append(event)
for q in self._subscribers:
await q.put(event)
def publish_nowait(self, event: T):
"""Publishes an event without blocking"""
self._buffer.append(event)
for q in self._subscribers:
q.put_nowait(event)
async def _subscribe(self, include_buffered: bool = True) -> asyncio.Queue[T]:
"""Subscribes to events"""
q = asyncio.Queue()
async with self._lock:
self._subscribers.append(q)
if include_buffered:
for event in self._buffer:
await q.put(event)
return q
async def _unsubscribe(self, q: asyncio.Queue[T]):
"""Unsubscribes from events"""
async with self._lock:
self._subscribers.remove(q)
async def subscribe(self, include_buffered: bool = True) -> AsyncGenerator[T, None]:
"""Subscribes to events as an async generator that yields events and automatically unsubscribes"""
q = await self._subscribe(include_buffered)
try:
while True:
yield await q.get()
finally:
await self._unsubscribe(q)

View File

@@ -4,12 +4,17 @@ import os
import signal import signal
import sys import sys
from threading import Thread from threading import Thread
from typing import Optional from typing import Optional, Tuple
from OCP.TopoDS import TopoDS_Shape from OCP.TopoDS import TopoDS_Shape
from aiohttp import web from aiohttp import web
from pubsub import BufferedPubSub
from tessellate import _hashcode
FRONTEND_BASE_PATH = os.getenv('FRONTEND_BASE_PATH', '../dist') FRONTEND_BASE_PATH = os.getenv('FRONTEND_BASE_PATH', '../dist')
UPDATES_API_PATH = '/api/updates'
OBJECTS_API_PATH = '/api/objects' # /{name}
# noinspection PyUnusedLocal # noinspection PyUnusedLocal
@@ -22,11 +27,13 @@ class Server:
runner: web.AppRunner runner: web.AppRunner
thread: Optional[Thread] = None thread: Optional[Thread] = None
do_shutdown = asyncio.Event() do_shutdown = asyncio.Event()
show_events = BufferedPubSub[Tuple[TopoDS_Shape, str]]()
def __init__(self, *args, **kwargs): def __init__(self, *args, **kwargs):
# --- Routes --- # --- Routes ---
# - API # - APIs
# self.app.router.add_route({'POST','GET'}, '/api/{collection}', api_handler) self.app.router.add_route('GET', f'{UPDATES_API_PATH}', self.api_updates)
self.app.router.add_route('GET', f'{OBJECTS_API_PATH}/{{name}}', self.api_objects)
# - Static files from the frontend # - Static files from the frontend
self.app.router.add_get('/{path:(.*/|)}', _index_handler) # Any folder -> index.html self.app.router.add_get('/{path:(.*/|)}', _index_handler) # Any folder -> index.html
self.app.router.add_static('/', path=FRONTEND_BASE_PATH, name='static_frontend') self.app.router.add_static('/', path=FRONTEND_BASE_PATH, name='static_frontend')
@@ -74,10 +81,37 @@ class Server:
# print('Shutting down server...') # print('Shutting down server...')
await runner.cleanup() await runner.cleanup()
def show_object(self, obj: TopoDS_Shape): async def api_updates(self, request: web.Request) -> web.WebSocketResponse:
pass """Handles a publish-only websocket connection that send show_object events along with their hashes and URLs"""
ws = web.WebSocketResponse()
await ws.prepare(request)
print('New client connected')
async for (obj, name) in self.show_events.subscribe():
hash_code = _hashcode(obj)
url = f'{UPDATES_API_PATH}/{name}'
print('New object:', name, hash_code, url)
await ws.send_json({'name': name, 'hash': hash_code, 'url': url})
# TODO: Start previous loop in a separate task and detect connection close to stop it
return ws
obj_counter = 0
def show_object(self, obj: TopoDS_Shape, name: Optional[str] = None):
"""Publishes a CAD object to the server"""
name = name or f'object_{self.obj_counter}'
self.obj_counter += 1
self.show_events.publish_nowait((obj, name))
async def api_objects(self, request: web.Request) -> web.Response:
return web.Response(body='TODO: Serve the object file here')
def get_app() -> web.Application: def get_app() -> web.Application:
"""Required by aiohttp-devtools""" """Required by aiohttp-devtools"""
return Server().app from logo.logo import build_logo
server = Server()
server.show_object(build_logo())
return server.app

View File

@@ -1,5 +1,8 @@
import concurrent import concurrent
import copyreg import copyreg
import hashlib
import io
import re
from concurrent.futures import ProcessPoolExecutor, Executor from concurrent.futures import ProcessPoolExecutor, Executor
from dataclasses import dataclass from dataclasses import dataclass
from typing import Tuple, Callable, Generator from typing import Tuple, Callable, Generator
@@ -9,7 +12,9 @@ import numpy as np
from OCP.BRep import BRep_Tool from OCP.BRep import BRep_Tool
from OCP.BRepAdaptor import BRepAdaptor_Curve from OCP.BRepAdaptor import BRepAdaptor_Curve
from OCP.GCPnts import GCPnts_TangentialDeflection from OCP.GCPnts import GCPnts_TangentialDeflection
from OCP.TopExp import TopExp
from OCP.TopLoc import TopLoc_Location from OCP.TopLoc import TopLoc_Location
from OCP.TopTools import TopTools_IndexedMapOfShape
from OCP.TopoDS import TopoDS_Face, TopoDS_Edge, TopoDS_Shape, TopoDS_Vertex from OCP.TopoDS import TopoDS_Face, TopoDS_Edge, TopoDS_Shape, TopoDS_Vertex
from build123d import Face, Vector, Shape, Vertex from build123d import Face, Vector, Shape, Vertex
from partcad.wrappers import cq_serialize from partcad.wrappers import cq_serialize
@@ -188,3 +193,21 @@ def _tessellate_vertex(ocp_vertex: TopoDS_Vertex) -> GLTF2:
pbrMetallicRoughness=PbrMetallicRoughness(baseColorFactor=[1.0, 0.5, 0.5, 1.0]), pbrMetallicRoughness=PbrMetallicRoughness(baseColorFactor=[1.0, 0.5, 0.5, 1.0]),
alphaCutoff=None) alphaCutoff=None)
return create_gltf(vertices, indices, tex_coord, mode, material) return create_gltf(vertices, indices, tex_coord, mode, material)
def _hashcode(obj: TopoDS_Shape) -> str:
"""Utility to compute the hash code of a shape recursively without the need to tessellate it"""
# NOTE: obj.HashCode(MAX_HASH_CODE) is not stable across different runs of the same program
# This is best-effort and not guaranteed to be unique
data = io.BytesIO()
map_of_shapes = TopTools_IndexedMapOfShape()
TopExp.MapShapes_s(obj, map_of_shapes)
for i in range(1, map_of_shapes.Extent() + 1):
sub_shape = map_of_shapes.FindKey(i)
sub_data = io.BytesIO()
TopoDS_Shape.DumpJson(sub_shape, sub_data)
val = sub_data.getvalue()
val = re.sub(b'"this": "[^"]*"', b'', val) # Remove memory address
data.write(val)
to_hash = data.getvalue()
return hashlib.md5(to_hash, usedforsecurity=False).hexdigest()