Server implementation more or less complete

Yeicor
2024-02-08 19:52:51 +01:00
parent 792ab687f6
commit 56c097aea3
10 changed files with 230 additions and 725 deletions

yacv_server/__init__.py

@@ -1,12 +1,13 @@
import logging
import os
import time
from OCP.TopoDS import TopoDS_Shape
from aiohttp import web
from server import Server
server = Server()
"""The server instance. This is the main entry point to serve CAD objects and other data to the frontend."""
if 'YACV_DISABLE_SERVER' not in os.environ:
# Start a new server ASAP to let the polling client connect while still building CAD objects
@@ -15,20 +16,16 @@ if 'YACV_DISABLE_SERVER' not in os.environ:
server.start()
def get_app() -> web.Application:
def _get_app() -> web.Application:
"""Required by aiohttp-devtools"""
logging.basicConfig(level=logging.DEBUG)
from logo.logo import build_logo
server.show_object(build_logo())
server.show_object(build_logo(), 'logo')
return server.app
def show_object(obj: TopoDS_Shape):
"""Show a CAD object in the default server"""
server.show_object(obj)
if __name__ == '__main__':
# Publish the logo to the server (reusing code from the aiohttp-devtools)
get_app()
_get_app()
# Keep the server running for testing
time.sleep(60)
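
For orientation, a minimal usage sketch of the module-level API above (not part of the commit; assumes the package is importable as yacv_server and that build123d is installed):

    # Hypothetical caller script: show a CAD object on the auto-started server.
    from build123d import Box  # any CAD kernel producing a TopoDS_Shape works

    from yacv_server import show_object  # this package's __init__.py

    # The server starts on import unless YACV_DISABLE_SERVER is set, so a
    # single call is enough to publish an object to connected frontends:
    show_object(Box(10, 10, 10).wrapped)  # pass the raw TopoDS_Shape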

yacv_server/glbs.py (new file, +32)

@@ -0,0 +1,32 @@
from typing import AsyncGenerator
async def glb_sequence_to_glbs(glb_sequence: AsyncGenerator[bytes, None]) -> AsyncGenerator[bytes, None]:
"""Converts a sequence of GLB files into a single GLBS file.
This is a streaming response in the custom GLBS format which consists of the "GLBS" magic text followed by
a sequence of GLB files, each with a 4-byte little-endian length prefix."""
# Write the magic text
yield b'GLBS'
# Write the GLB files
async for glb in glb_sequence:
# Write the length prefix
yield len(glb).to_bytes(4, 'little')
# Write the GLB file
yield glb
if __name__ == '__main__':
import asyncio
async def test_glb_sequence_to_glbs():
async def glb_sequence():
yield b'glb00001'
yield b'glb2'
async for chunk in glb_sequence_to_glbs(glb_sequence()):
print(chunk)
asyncio.run(test_glb_sequence_to_glbs())
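
The commit only ships the encoder; below is a minimal decoder sketch for the same GLBS format, assuming a fully buffered payload (the function name is illustrative, not part of the commit):

    def glbs_to_glb_list(data: bytes) -> list[bytes]:
        """Split a complete GLBS payload back into its individual GLB files."""
        assert data[:4] == b'GLBS', 'missing GLBS magic text'
        glbs, offset = [], 4
        while offset < len(data):
            # 4-byte little-endian length prefix, then the GLB payload itself
            length = int.from_bytes(data[offset:offset + 4], 'little')
            offset += 4
            glbs.append(data[offset:offset + length])
            offset += length
        return glbs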

yacv_server/gltf.py

@@ -30,7 +30,8 @@ def create_gltf(vertices: np.ndarray, indices: np.ndarray, tex_coord: np.ndarray
images = []
image_blob = b''
image_blob_pointers = []
for img in images:
for i, img in enumerate(images):
img = copy.deepcopy(img) # Avoid modifying the original image
assert img.bufferView is None
assert img.uri is not None
assert img.uri.startswith('data:')
@@ -39,6 +40,7 @@ def create_gltf(vertices: np.ndarray, indices: np.ndarray, tex_coord: np.ndarray
img.mimeType = img.uri.split(';', maxsplit=1)[0].split(':', maxsplit=1)[1]
img.uri = None
img.bufferView = 3 + len(image_blob_pointers) - 1
images[i] = img # Replace the original image with the new copied and modified one
gltf = GLTF2(
scene=0,


@@ -3,4 +3,4 @@ from os import system
if __name__ == '__main__':
# Just a reminder that a hot-reloading server can be started with the following command:
# Need to disable auto-start to avoid conflicts with the hot-reloading server
system('YACV_DISABLE_SERVER=true aiohttp-devtools runserver __init__.py --port 32323')
system('YACV_DISABLE_SERVER=true aiohttp-devtools runserver __init__.py --port 32323 --app-factory _get_app')

yacv_server/mylogger.py (new file, +7)

@@ -0,0 +1,7 @@
""" A simple log facility for yacv_server """
import logging
logger = logging.getLogger('yacv_server')

yacv_server/pubsub.py

@@ -2,6 +2,8 @@ import asyncio
from typing import List, TypeVar, \
Generic, AsyncGenerator
from mylogger import logger
T = TypeVar('T')
@@ -11,44 +13,48 @@ class BufferedPubSub(Generic[T]):
_buffer: List[T]
_subscribers: List[asyncio.Queue[T]]
_lock = asyncio.Lock()
max_buffer_size = 1000
def __init__(self):
self._buffer = []
self._subscribers = []
async def publish(self, event: T):
"""Publishes an event"""
async with self._lock:
self._buffer.append(event)
for q in self._subscribers:
await q.put(event)
def publish_nowait(self, event: T):
"""Publishes an event without blocking"""
"""Publishes an event without blocking (synchronous API does not require locking)"""
self._buffer.append(event)
if len(self._buffer) > self.max_buffer_size:
self._buffer.pop(0)
for q in self._subscribers:
q.put_nowait(event)
async def _subscribe(self, include_buffered: bool = True) -> asyncio.Queue[T]:
async def _subscribe(self, include_buffered: bool = True, include_future: bool = True) -> asyncio.Queue[T]:
"""Subscribes to events"""
q = asyncio.Queue()
async with self._lock:
self._subscribers.append(q)
if include_buffered:
for event in self._buffer:
await q.put(event)
logger.debug(f"Subscribed to %s (%d subscribers)", self, len(self._subscribers))
if include_buffered:
for event in self._buffer:
await q.put(event)
if not include_future:
await q.put(None)
return q
async def _unsubscribe(self, q: asyncio.Queue[T]):
"""Unsubscribes from events"""
async with self._lock:
self._subscribers.remove(q)
logger.debug(f"Unsubscribed from %s (%d subscribers)", self, len(self._subscribers))
async def subscribe(self, include_buffered: bool = True) -> AsyncGenerator[T, None]:
async def subscribe(self, include_buffered: bool = True, include_future: bool = True) -> AsyncGenerator[T, None]:
"""Subscribes to events as an async generator that yields events and automatically unsubscribes"""
q = await self._subscribe(include_buffered)
q = await self._subscribe(include_buffered, include_future)
try:
while True:
yield await q.get()
finally:
v = await q.get()
# When include_future is False, None is used as the end-of-stream sentinel, so None cannot be published as a regular event
if v is None and not include_future:
break
yield v
finally: # When aclose() is called
await self._unsubscribe(q)
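
A usage sketch of the new include_future flag (illustrative; the demo names are not part of the commit). With include_future=False the subscription replays the buffer and then terminates via the internal None sentinel instead of waiting for future events:

    import asyncio

    async def _demo():
        bus = BufferedPubSub[str]()
        bus.publish_nowait('a')
        bus.publish_nowait('b')
        # Replay only the buffered events; the stream ends on its own
        replay = [event async for event in bus.subscribe(include_future=False)]
        assert replay == ['a', 'b']

    asyncio.run(_demo())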

yacv_server/server.py

@@ -1,19 +1,24 @@
import asyncio
import atexit
import hashlib
import os
import signal
import sys
import time
from dataclasses import dataclass
from dataclasses import dataclass, field
from threading import Thread
from typing import Optional
from typing import Any, Optional, Dict, Union, AsyncGenerator
import tqdm.asyncio
from OCP.TopoDS import TopoDS_Shape
from aiohttp import web
from dataclasses_json import dataclass_json
from dataclasses_json import dataclass_json, config
from tqdm.contrib.logging import logging_redirect_tqdm
from glbs import glb_sequence_to_glbs
from mylogger import logger
from pubsub import BufferedPubSub
from tessellate import _hashcode
from tessellate import _hashcode, tessellate_count, tessellate
FRONTEND_BASE_PATH = os.getenv('FRONTEND_BASE_PATH', '../dist')
UPDATES_API_PATH = '/api/updates'
@@ -28,6 +33,8 @@ class UpdatesApiData:
"""Name of the object. Should be unique unless you want to overwrite the previous object"""
hash: str
"""Hash of the object, to detect changes without rebuilding the object"""
obj: Optional[TopoDS_Shape] = field(default=None, metadata=config(exclude=lambda obj: True))
"""The OCCT object, if any (not serialized)"""
# noinspection PyUnusedLocal
@@ -41,12 +48,14 @@ class Server:
thread: Optional[Thread] = None
do_shutdown = asyncio.Event()
show_events = BufferedPubSub[UpdatesApiData]()
object_events: Dict[str, BufferedPubSub[bytes]] = {}
object_events_lock = asyncio.Lock()
def __init__(self, *args, **kwargs):
# --- Routes ---
# - APIs
self.app.router.add_route('GET', f'{UPDATES_API_PATH}', self.api_updates)
self.app.router.add_route('GET', f'{OBJECTS_API_PATH}/{{name}}', self.api_objects)
self.app.router.add_route('GET', f'{UPDATES_API_PATH}', self._api_updates)
self.app.router.add_route('GET', f'{OBJECTS_API_PATH}/{{name}}', self._api_object)
# - Static files from the frontend
self.app.router.add_get('/{path:(.*/|)}', _index_handler) # Any folder -> index.html
self.app.router.add_static('/', path=FRONTEND_BASE_PATH, name='static_frontend')
@@ -57,7 +66,7 @@ class Server:
"""Starts the web server in the background"""
assert self.thread is None, "Server already started"
# Start the server in a separate daemon thread
self.thread = Thread(target=self.run_server, name='yacv_server', daemon=True)
self.thread = Thread(target=self._run_server, name='yacv_server', daemon=True)
signal.signal(signal.SIGINT, self.stop)
signal.signal(signal.SIGTERM, self.stop)  # signal() takes one signal at a time; ORing the numbers would register only one handler
atexit.register(self.stop)
self.thread.start()
@@ -75,14 +84,14 @@ class Server:
if len(args) >= 1 and args[0] in (signal.SIGINT, signal.SIGTERM):
sys.exit(0) # Exit with success
def run_server(self):
def _run_server(self):
"""Runs the web server"""
asyncio.set_event_loop(self.loop)
self.loop.run_until_complete(self.run_server_async())
self.loop.run_until_complete(self._run_server_async())
self.loop.stop()
self.loop.close()
async def run_server_async(self):
async def _run_server_async(self):
"""Runs the web server (async)"""
runner = web.AppRunner(self.app)
await runner.setup()
@@ -94,7 +103,7 @@ class Server:
# print('Shutting down server...')
await runner.cleanup()
async def api_updates(self, request: web.Request) -> web.WebSocketResponse:
async def _api_updates(self, request: web.Request) -> web.WebSocketResponse:
"""Handles a publish-only websocket connection that send show_object events along with their hashes and URLs"""
ws = web.WebSocketResponse()
await ws.prepare(request)
@@ -102,41 +111,145 @@ class Server:
async def _send_api_updates():
subscription = self.show_events.subscribe()
try:
first = True
async for data in subscription:
if first:
print('Started sending updates to client (%d subscribers)' % len(self.show_events._subscribers))
first = False
# noinspection PyUnresolvedReferences
await ws.send_str(data.to_json())
finally:
print('Stopped sending updates to client (%d subscribers)' % len(self.show_events._subscribers))
await subscription.aclose()
# Start sending updates to the client automatically
send_task = asyncio.create_task(_send_api_updates())
try:
print('Client connected: %s' % request.remote)
logger.debug('Client connected: %s', request.remote)
# Wait for the client to close the connection (or send a message)
await ws.receive()
finally:
# Make sure to stop sending updates to the client and close the connection
send_task.cancel()
await ws.close()
print('Client disconnected: %s' % request.remote)
logger.debug('Client disconnected: %s', request.remote)
return ws
obj_counter = 0
def show_object(self, obj: TopoDS_Shape, name: Optional[str] = None):
"""Publishes a CAD object to the server"""
start = time.time()
def _show_common(self, name: Optional[str], hash: str, start: float, obj: Optional[TopoDS_Shape] = None):
name = name or f'object_{self.obj_counter}'
self.obj_counter += 1
precomputed_info = UpdatesApiData(name=name, hash=_hashcode(obj))
print(f'show_object {precomputed_info} took {time.time() - start:.3f} seconds')
precomputed_info = UpdatesApiData(name=name, hash=hash, obj=obj)
self.show_events.publish_nowait(precomputed_info)
logger.info('show_object(%s, %s) took %.3f seconds', name, hash, time.time() - start)
return precomputed_info
async def api_objects(self, request: web.Request) -> web.Response:
return web.Response(body='TODO: Serve the object file here')
def show_gltf(self, gltf: bytes, name: Optional[str] = None, **kwargs):
"""Publishes any single-file GLTF object to the server (GLB format recommended)."""
start = time.time()
# Precompute the info and send it to the client as if it was a CAD object
precomputed_info = self._show_common(name, hashlib.md5(gltf).hexdigest(), start)  # argument order: name, hash, start
# Also pre-populate the GLTF data for the object API
publish_to = BufferedPubSub[bytes]()
publish_to.publish_nowait(gltf)
self.object_events[precomputed_info.name] = publish_to
def show_object(self, obj: Union[TopoDS_Shape, Any], name: Optional[str] = None, **kwargs):
"""Publishes a CAD object to the server"""
start = time.time()
# Try to grab a shape if a different type of object was passed
if not isinstance(obj, TopoDS_Shape):
# Build123D
if 'part' in dir(obj):
obj = obj.part
if 'sketch' in dir(obj):
obj = obj.sketch
if 'line' in dir(obj):
obj = obj.line
# Build123D & CadQuery
while 'wrapped' in dir(obj) and not isinstance(obj, TopoDS_Shape):
obj = obj.wrapped
if not isinstance(obj, TopoDS_Shape):
raise ValueError(f'Cannot show object of type {type(obj)} (submit issue?)')
self._show_common(name, _hashcode(obj), start, obj)
async def _api_object(self, request: web.Request) -> web.StreamResponse:
"""Returns the object file with the matching name, building it if necessary."""
# Start exporting the object (or fail if not found)
export_data = self.export(request.match_info['name'])
response = web.StreamResponse()
try:
# First exported element is the object itself, grab it to collect data
export_obj = await anext(export_data)
# Create a new stream response with custom content type and headers
response.content_type = 'model/gltf-binary-sequence'
response.headers['Content-Disposition'] = f'attachment; filename="{request.match_info["name"]}.glbs"'
total_parts = 1 if export_obj is None else tessellate_count(export_obj)
response.headers['X-Object-Parts'] = str(total_parts)
await response.prepare(request)
# Convert the GLB sequence to a GLBS sequence and write it to the response
with logging_redirect_tqdm(tqdm_class=tqdm.asyncio.tqdm):
# noinspection PyTypeChecker
glb_parts = tqdm.asyncio.tqdm(export_data, total=total_parts)
async for chunk in glb_sequence_to_glbs(glb_parts):
await response.write(chunk)
finally:
# Close the export data subscription
await export_data.aclose()
# Close the response (if not an error)
if response.prepared:
await response.write_eof()
return response
async def export(self, name: str) -> AsyncGenerator[Union[TopoDS_Shape, bytes], None]:
"""Export the given previously-shown object to a sequence of GLB files, building it if necessary."""
start = time.time()
# Check that the object to build exists and grab it if it does
subscription = self.show_events.subscribe(include_future=False)
obj: Optional[TopoDS_Shape] = None
found = False
async for data in subscription:
if data.name == name:
obj = data.obj
found = True # Required because obj could be None
break
await subscription.aclose()
if not found:
raise web.HTTPNotFound(text=f'No object named {name} was previously shown')
# First published element is the TopoDS_Shape, which is None for glTF objects
yield obj
# Use the lock to ensure that we don't build the object twice
async with self.object_events_lock:
# If there are no object events for this name, we need to build the object
if name not in self.object_events:
# Prepare the pubsub for the object
publish_to = BufferedPubSub[bytes]()
self.object_events[name] = publish_to
def _build_object():
# Build the object
part_count = 0
for tessellation_update in tessellate(obj):
# We publish the object parts as soon as we have a new tessellation
list_of_bytes = tessellation_update.gltf.save_to_bytes()
publish_to.publish_nowait(b''.join(list_of_bytes))
part_count += 1
publish_to.publish_nowait(b'') # Signal the end of the stream
logger.info('export(%s) took %.3f seconds, %d parts', name, time.time() - start, part_count)
# Build to completion even if this request is cancelled, so schedule the work independently of the handler
# Building is blocking and CPU-bound, so run it on the default executor (a thread pool)
asyncio.get_running_loop().run_in_executor(None, _build_object)
# In either case return the elements of a subscription to the async generator
subscription = self.object_events[name].subscribe()
try:
async for chunk in subscription:
if chunk == b'':
break
yield chunk
finally:
await subscription.aclose()
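
A sketch of how a caller might drive the two-phase protocol of export above (the helper name is illustrative, not part of the commit): the first yielded element is the TopoDS_Shape (None for raw glTF objects), and every element after that is one GLB part.

    async def collect_glb_parts(server: Server, name: str) -> list[bytes]:
        export = server.export(name)
        shape = await anext(export)  # phase 1: the shape itself (or None)
        # phase 2: GLB parts until the generator ends (the b'' sentinel is consumed internally)
        return [part async for part in export]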

yacv_server/tessellate.py

@@ -1,13 +1,11 @@
import concurrent
import copyreg
import hashlib
import io
import re
from concurrent.futures import ProcessPoolExecutor, Executor
from concurrent.futures import ProcessPoolExecutor
from dataclasses import dataclass
from typing import Tuple, Callable, Generator
from typing import Tuple, Generator
import OCP
import numpy as np
from OCP.BRep import BRep_Tool
from OCP.BRepAdaptor import BRepAdaptor_Curve
@@ -17,7 +15,6 @@ from OCP.TopLoc import TopLoc_Location
from OCP.TopTools import TopTools_IndexedMapOfShape
from OCP.TopoDS import TopoDS_Face, TopoDS_Edge, TopoDS_Shape, TopoDS_Vertex
from build123d import Face, Vector, Shape, Vertex
from partcad.wrappers import cq_serialize
from pygltflib import LINE_STRIP, GLTF2, Material, PbrMetallicRoughness, TRIANGLES, POINTS, TextureInfo
from gltf import create_gltf, _checkerboard_image
@@ -48,18 +45,6 @@ class TessellationUpdate:
raise ValueError(f"Unknown shape type: {self.shape}")
progress_callback_t = Callable[[TessellationUpdate], None]
def _inflate_vec(*values: float):
pnt = OCP.gp.gp_Vec(values[0], values[1], values[2])
return pnt
def _reduce_vec(pnt: OCP.gp.gp_Vec):
return _inflate_vec, (pnt.X(), pnt.Y(), pnt.Z())
def tessellate_count(ocp_shape: TopoDS_Shape) -> int:
"""Count the number of elements that will be tessellated"""
shape = Shape(ocp_shape)
@@ -70,44 +55,31 @@ def tessellate(
ocp_shape: TopoDS_Shape,
tolerance: float = 0.1,
angular_tolerance: float = 0.1,
executor: Executor = ProcessPoolExecutor(), # Set to ThreadPoolExecutor if pickling fails...
) -> Generator[TessellationUpdate, None, None]:
"""Tessellate a whole shape into a list of triangle vertices and a list of triangle indices.
It uses multiprocessing to speed up the process, and publishes progress updates to the callback.
NOTE: The logic of the method is weird because multiprocessing was tested but it seems too inefficient
with slow native packages.
"""
shape = Shape(ocp_shape)
_register_pickle_if_needed()
with executor:
futures = []
futures = []
# Submit tessellation tasks
for face in shape.faces():
futures.append(executor.submit(_tessellate_element, face.wrapped, tolerance, angular_tolerance))
for edge in shape.edges():
futures.append(executor.submit(_tessellate_element, edge.wrapped, tolerance, angular_tolerance))
for vertex in shape.vertices():
futures.append(executor.submit(_tessellate_element, vertex.wrapped, tolerance, angular_tolerance))
# Submit tessellation tasks (bind each loop variable as a default argument to avoid Python's late-binding closure pitfall)
for face in shape.faces():
futures.append(lambda face=face: _tessellate_element(face.wrapped, tolerance, angular_tolerance))
for edge in shape.edges():
futures.append(lambda edge=edge: _tessellate_element(edge.wrapped, tolerance, angular_tolerance))
for vertex in shape.vertices():
futures.append(lambda vertex=vertex: _tessellate_element(vertex.wrapped, tolerance, angular_tolerance))
# Collect results as they come in
for i, future in enumerate(concurrent.futures.as_completed(futures)):
sub_shape, gltf = future.result()
yield TessellationUpdate(
progress=(i + 1) / len(futures),
shape=sub_shape,
gltf=gltf,
)
_pickle_registered = False
def _register_pickle_if_needed():
global _pickle_registered
if _pickle_registered:
return
cq_serialize.register()
copyreg.pickle(OCP.gp.gp_Vec, _reduce_vec)
# Collect results as they come in
for i, future in enumerate(futures):
sub_shape, gltf = future()
yield TessellationUpdate(
progress=(i + 1) / len(futures),
shape=sub_shape,
gltf=gltf,
)
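
For reference, a small synchronous consumer of this generator (illustrative, not part of the commit), mirroring what the server's _build_object does with each update:

    def tessellate_to_glb_parts(ocp_shape: TopoDS_Shape) -> list[bytes]:
        parts = []
        for update in tessellate(ocp_shape):
            # Each update carries a progress fraction, the sub-shape and its GLTF2 document
            parts.append(b''.join(update.gltf.save_to_bytes()))
        return parts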
# Define the function that will tessellate each element (no longer run in parallel)