diff --git a/frontend/App.vue b/frontend/App.vue index d7e348a..9d7fc0a 100644 --- a/frontend/App.vue +++ b/frontend/App.vue @@ -35,16 +35,30 @@ provide('disableTap', {disableTap, setDisableTap}); async function onModelUpdateRequest(event: NetworkUpdateEvent) { // Load/unload a new batch of models to optimize rendering time console.log("Received model update request", event.models); + let shutdownRequestIndex = event.models.findIndex((model) => model.isRemove == null); + let shutdownRequest = null; + if (shutdownRequestIndex !== -1) { + console.log("Will shut down the connection after this load, as requested by the server"); + shutdownRequest = event.models.splice(shutdownRequestIndex, 1)[0]; + } let doc = sceneDocument.value; for (let modelIndex in event.models) { let isLast = parseInt(modelIndex) === event.models.length - 1; let model = event.models[modelIndex]; - if (!model.isRemove) { - doc = await SceneMgr.loadModel(sceneUrl, doc, model.name, model.url, isLast, isLast); - } else { - doc = await SceneMgr.removeModel(sceneUrl, doc, model.name, isLast); + try { + if (!model.isRemove) { + doc = await SceneMgr.loadModel(sceneUrl, doc, model.name, model.url, isLast, isLast); + } else { + doc = await SceneMgr.removeModel(sceneUrl, doc, model.name, isLast); + } + } catch (e) { + console.error("Error loading model", model, e); } } + if (shutdownRequest !== null) { + console.log("Shutting down the connection as requested by the server"); + event.disconnectForALittleBit(); + } sceneDocument.value = doc triggerRef(sceneDocument); // Why not triggered automatically? } diff --git a/frontend/misc/gltf.ts b/frontend/misc/gltf.ts index 82c943b..b261b48 100644 --- a/frontend/misc/gltf.ts +++ b/frontend/misc/gltf.ts @@ -12,9 +12,10 @@ export let extrasNameValueHelpers = "__helpers"; * * Remember to call mergeFinalize after all models have been merged (slower required operations). */ -export async function mergePartial(url: string, name: string, document: Document): Promise { +export async function mergePartial(url: string, name: string, document: Document, networkFinished: () => void = () => {}): Promise { // Load the new document let newDoc = await io.read(url); + networkFinished() // Remove any previous model with the same name await document.transform(dropByName(name)); diff --git a/frontend/misc/network.ts b/frontend/misc/network.ts index f599f38..fa14aa1 100644 --- a/frontend/misc/network.ts +++ b/frontend/misc/network.ts @@ -7,22 +7,24 @@ class NetworkUpdateEventModel { url: string; // TODO: Detect and manage instances of the same object (same hash, different name) hash: string | null; - isRemove: boolean; + isRemove: boolean | null; // This is null for a shutdown event - constructor(name: string, url: string, hash: string | null, isDelete: boolean) { + constructor(name: string, url: string, hash: string | null, isRemove: boolean | null) { this.name = name; this.url = url; this.hash = hash; - this.isRemove = isDelete; + this.isRemove = isRemove; } } export class NetworkUpdateEvent extends Event { models: NetworkUpdateEventModel[]; + disconnectForALittleBit: () => void; - constructor(models: NetworkUpdateEventModel[]) { + constructor(models: NetworkUpdateEventModel[], disconnectForALittleBit: () => void) { super("update"); this.models = models; + this.disconnectForALittleBit = disconnectForALittleBit; } } @@ -57,37 +59,57 @@ export class NetworkManager extends EventTarget { } } - private async monitorDevServer(url: URL) { + private async monitorDevServer(url: URL, pendingTimeout: { id: number } = {id: -1}) { try { // WARNING: This will spam the console logs with failed requests when the server is down - let response = await fetch(url.toString()); + const controller = new AbortController(); + let response = await fetch(url.toString(), {signal: controller.signal}); // console.log("Monitoring", url.toString(), response); if (response.status === 200) { let lines = readLinesStreamings(response.body!.getReader()); for await (let line of lines) { if (!line || !line.startsWith("data:")) continue; - let data = JSON.parse(line.slice(5)); + let data: { name: string, hash: string, is_remove: boolean | null } = JSON.parse(line.slice(5)); // console.debug("WebSocket message", data); let urlObj = new URL(url); urlObj.searchParams.delete("api_updates"); urlObj.searchParams.set("api_object", data.name); - this.foundModel(data.name, data.hash, urlObj.toString(), data.is_remove); + this.foundModel(data.name, data.hash, urlObj.toString(), data.is_remove, async () => { + console.log("Disconnecting for a little bit"); + controller.abort(); + clearTimeout(pendingTimeout.id!); + pendingTimeout.id = -2; + setTimeout(() => { + console.log("Reconnecting after a little bit"); + this.monitorDevServer(url, pendingTimeout) + }, settings.monitorEveryMs * 50); + }); } } } catch (e) { // Ignore errors (retry very soon) } - setTimeout(() => this.monitorDevServer(url), settings.monitorEveryMs); + if (pendingTimeout.id >= -1) { + pendingTimeout.id = setTimeout(() => { + console.log("Reconnecting fast"); + this.monitorDevServer(url, pendingTimeout) + }, settings.monitorEveryMs); + } return; } - private foundModel(name: string, hash: string | null, url: string, isRemove: boolean) { + private foundModel(name: string, hash: string | null, url: string, isRemove: boolean | null, disconnectForALittleBit: () => void = () => { + }) { let prevHash = this.knownObjectHashes[name]; // console.debug("Found model", name, "with hash", hash, "and previous hash", prevHash); if (!hash || hash !== prevHash || isRemove) { - if (!isRemove) { + // Update known hashes + if (isRemove == false) { this.knownObjectHashes[name] = hash; - } else { + } else if (isRemove == true) { + if (!(name in this.knownObjectHashes)) return; // Nothing to remove... delete this.knownObjectHashes[name]; + // Also update buffered updates if the model is removed + //this.bufferedUpdates = this.bufferedUpdates.filter(m => m.name !== name); } let newModel = new NetworkUpdateEventModel(name, url, hash, isRemove); this.bufferedUpdates.push(newModel); @@ -95,7 +117,7 @@ export class NetworkManager extends EventTarget { // Optimization: try to batch updates automatically for faster rendering if (this.batchTimeout !== null) clearTimeout(this.batchTimeout); this.batchTimeout = setTimeout(() => { - this.dispatchEvent(new NetworkUpdateEvent(this.bufferedUpdates)); + this.dispatchEvent(new NetworkUpdateEvent(this.bufferedUpdates, disconnectForALittleBit)); this.bufferedUpdates = []; }, batchTimeout); } diff --git a/frontend/misc/scene.ts b/frontend/misc/scene.ts index 99feffe..64e6722 100644 --- a/frontend/misc/scene.ts +++ b/frontend/misc/scene.ts @@ -9,11 +9,11 @@ import {Matrix4} from "three/src/math/Matrix4.js" /** This class helps manage SceneManagerData. All methods are static to support reactivity... */ export class SceneMgr { /** Loads a GLB model from a URL and adds it to the viewer or replaces it if the names match */ - static async loadModel(sceneUrl: Ref, document: Document, name: string, url: string, updateHelpers: boolean = true, reloadScene: boolean = true): Promise { + static async loadModel(sceneUrl: Ref, document: Document, name: string, url: string, updateHelpers: boolean = true, reloadScene: boolean = true, networkFinished: () => void = () => {}): Promise { let loadStart = performance.now(); // Start merging into the current document, replacing or adding as needed - document = await mergePartial(url, name, document); + document = await mergePartial(url, name, document, networkFinished); console.log("Model", name, "loaded in", performance.now() - loadStart, "ms"); diff --git a/poetry.lock b/poetry.lock index 68fda51..e6013b7 100644 --- a/poetry.lock +++ b/poetry.lock @@ -318,17 +318,6 @@ qtconsole = ["qtconsole"] test = ["pickleshare", "pytest (<7.1)", "pytest-asyncio (<0.22)", "testpath"] test-extra = ["curio", "matplotlib (!=3.2.0)", "nbformat", "numpy (>=1.22)", "pandas", "pickleshare", "pytest (<7.1)", "pytest-asyncio (<0.22)", "testpath", "trio"] -[[package]] -name = "iterators" -version = "0.2.0" -description = "Iterator utility classes and functions" -optional = false -python-versions = ">=3.6" -files = [ - {file = "iterators-0.2.0-py3-none-any.whl", hash = "sha256:1d7ff03f576c9de0e01bac66209556c066d6b1fc45583a99cfc9f4645be7900e"}, - {file = "iterators-0.2.0.tar.gz", hash = "sha256:e9927a1ea1ef081830fd1512f3916857c36bd4b37272819a6cd29d0f44431b97"}, -] - [[package]] name = "jedi" version = "0.19.1" @@ -962,4 +951,4 @@ files = [ [metadata] lock-version = "2.0" python-versions = "^3.9" -content-hash = "d9746e99dd8861758730e68d12dc72d9ec5fb0101b3c070a7d7a373439c658a0" +content-hash = "567ef9c980c250ace7e380098b810250a36b92dd2e824b5b4f4851898a675e09" diff --git a/pyproject.toml b/pyproject.toml index e846853..1358539 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -19,7 +19,6 @@ build123d = "^0.4.0" # Misc pygltflib = "^1.16.2" pillow = "^10.2.0" -iterators = "^0.2.0" [tool.poetry.build] generate-setup-file = false diff --git a/yacv_server/myhttp.py b/yacv_server/myhttp.py index 6b2377a..34ce981 100644 --- a/yacv_server/myhttp.py +++ b/yacv_server/myhttp.py @@ -1,12 +1,9 @@ import io import os -import threading import urllib.parse from http import HTTPStatus from http.server import SimpleHTTPRequestHandler -from iterators import TimeoutIterator - from yacv_server.mylogger import logger # Find the frontend folder (optional, but recommended) @@ -26,13 +23,9 @@ OBJECTS_API_PATH = '/api/object' # /{name} class HTTPHandler(SimpleHTTPRequestHandler): yacv: 'yacv.YACV' - frontend_lock: threading.Lock # To avoid exiting too early while frontend makes requests - at_least_one_client: threading.Event def __init__(self, *args, yacv: 'yacv.YACV', **kwargs): self.yacv = yacv - self.frontend_lock = threading.Lock() - self.at_least_one_client = threading.Event() super().__init__(*args, **kwargs, directory=FRONTEND_BASE_PATH) def log_message(self, fmt, *args): @@ -77,69 +70,65 @@ class HTTPHandler(SimpleHTTPRequestHandler): def _api_updates(self): """Handles a publish-only websocket connection that send show_object events along with their hashes and URLs""" - self.send_response(HTTPStatus.OK) - self.send_header("Content-Type", "text/event-stream") - self.send_header("Cache-Control", "no-cache") - # Chunked transfer encoding! - self.send_header("Transfer-Encoding", "chunked") - self.end_headers() - self.at_least_one_client.set() - logger.debug('Updates client connected') - def write_chunk(_chunk_data: str): - self.wfile.write(hex(len(_chunk_data))[2:].encode('utf-8')) - self.wfile.write(b'\r\n') - self.wfile.write(_chunk_data.encode('utf-8')) - self.wfile.write(b'\r\n') - self.wfile.flush() + # Keep a shared read lock to know if any frontend is still working before shutting down + with self.yacv.frontend_lock.r_locked(): - write_chunk('retry: 100\n\n') + # Avoid accepting new connections while shutting down + if self.yacv.shutting_down.is_set() and not self.yacv.at_least_one_client.is_set(): + self.send_error(HTTPStatus.SERVICE_UNAVAILABLE, 'Server is shutting down') + return + self.yacv.at_least_one_client.set() + logger.debug('Updates client connected') - # Send buffered events first, while keeping a lock - with self.frontend_lock: - for data in self.yacv.show_events.buffer(): - logger.debug('Sending info about %s: %s', data.name, data) - # noinspection PyUnresolvedReferences - to_send = data.to_json() - write_chunk(f'data: {to_send}\n\n') + self.send_response(HTTPStatus.OK) + self.send_header("Content-Type", "text/event-stream") + self.send_header("Cache-Control", "no-cache") + # Chunked transfer encoding! + self.send_header("Transfer-Encoding", "chunked") + self.end_headers() - # Send future events over the same connection - # Also send keep-alive to know if the client is still connected - subscription = self.yacv.show_events.subscribe(include_buffered=False) - it = TimeoutIterator(subscription, sentinel=None, reset_on_next=True, timeout=5.0) # Keep-alive interval - try: - for data in it: - if data is None: - write_chunk(':keep-alive\n\n') - else: - logger.debug('Sending info about %s: %s', data.name, data) - # noinspection PyUnresolvedReferences - to_send = data.to_json() - write_chunk(f'data: {to_send}\n\n') - except BrokenPipeError: # Client disconnected normally - pass - finally: - logger.debug('Updates client disconnected') + def write_chunk(_chunk_data: str): + self.wfile.write(hex(len(_chunk_data))[2:].encode('utf-8')) + self.wfile.write(b'\r\n') + self.wfile.write(_chunk_data.encode('utf-8')) + self.wfile.write(b'\r\n') + self.wfile.flush() + + write_chunk('retry: 100\n\n') + + subscription = self.yacv.show_events.subscribe(yield_timeout=1.0) # Keep-alive interval try: - it.interrupt() - next(it) # Make sure the iterator is interrupted before trying to close the subscription + for data in subscription: + if data is None: + write_chunk(':keep-alive\n\n') + else: + logger.debug('Sending info about %s: %s', data.name, data) + # noinspection PyUnresolvedReferences + to_send = data.to_json() + write_chunk(f'data: {to_send}\n\n') + except BrokenPipeError: # Client disconnected normally + pass + finally: subscription.close() - except BaseException as e: - logger.debug('Ignoring error while closing subscription: %s', e) + + logger.debug('Updates client disconnected') def _api_object(self, obj_name: str): """Returns the object file with the matching name, building it if necessary.""" - with self.frontend_lock: - # Export the object (or fail if not found) - exported_glb = self.yacv.export(obj_name) - if exported_glb is None: - self.send_error(HTTPStatus.NOT_FOUND, f'Object {obj_name} not found') - return io.BytesIO() + # Export the object (or fail if not found) + _export = self.yacv.export(obj_name) + if _export is None: + self.send_error(HTTPStatus.NOT_FOUND, f'Object {obj_name} not found') + return io.BytesIO() - # Wrap the GLB in a response and return it - self.send_response(HTTPStatus.OK) - self.send_header('Content-Type', 'model/gltf-binary') - self.send_header('Content-Length', str(len(exported_glb))) - self.send_header('Content-Disposition', f'attachment; filename="{obj_name}.glb"') - self.end_headers() - self.wfile.write(exported_glb) + exported_glb, _hash = _export + + # Wrap the GLB in a response and return it + self.send_response(HTTPStatus.OK) + self.send_header('Content-Type', 'model/gltf-binary') + self.send_header('Content-Length', str(len(exported_glb))) + self.send_header('Content-Disposition', f'attachment; filename="{obj_name}.glb"') + self.send_header('E-Tag', f'"{_hash}"') + self.end_headers() + self.wfile.write(exported_glb) diff --git a/yacv_server/pubsub.py b/yacv_server/pubsub.py index 42b1891..5a0e627 100644 --- a/yacv_server/pubsub.py +++ b/yacv_server/pubsub.py @@ -1,4 +1,4 @@ -import threading +import queue import queue import threading from typing import List, TypeVar, \ @@ -8,6 +8,8 @@ from yacv_server.mylogger import logger T = TypeVar('T') +_end_of_queue = object() + class BufferedPubSub(Generic[T]): """A simple implementation of publish-subscribe pattern using threading and buffering all previous events""" @@ -45,7 +47,7 @@ class BufferedPubSub(Generic[T]): for event in self._buffer: q.put(event) if not include_future: - q.put(None) + q.put(_end_of_queue) return q def _unsubscribe(self, q: queue.Queue[T]): @@ -54,14 +56,18 @@ class BufferedPubSub(Generic[T]): self._subscribers.remove(q) logger.debug(f"Unsubscribed from %s (%d subscribers)", self, len(self._subscribers)) - def subscribe(self, include_buffered: bool = True, include_future: bool = True) -> Generator[T, None, None]: + def subscribe(self, include_buffered: bool = True, include_future: bool = True, yield_timeout: float = 0.0) -> \ + Generator[T, None, None]: """Subscribes to events as an generator that yields events and automatically unsubscribes""" q = self._subscribe(include_buffered, include_future) try: while True: - v = q.get() + try: + v = q.get(timeout=yield_timeout) + except queue.Empty: + v = None # include_future is incompatible with None values as they are used to signal the end of the stream - if v is None and not include_future: + if v is _end_of_queue: break yield v finally: # When aclose() is called @@ -80,4 +86,4 @@ class BufferedPubSub(Generic[T]): def clear(self): """Clears the buffer""" with self._buffer_lock: - self._buffer.clear() \ No newline at end of file + self._buffer.clear() diff --git a/yacv_server/rwlock.py b/yacv_server/rwlock.py new file mode 100644 index 0000000..898aaa8 --- /dev/null +++ b/yacv_server/rwlock.py @@ -0,0 +1,96 @@ +# -*- coding: utf-8 -*- +""" rwlock.py + + A class to implement read-write locks on top of the standard threading + library. + + This is implemented with two mutexes (threading.Lock instances) as per this + wikipedia pseudocode: + + https://en.wikipedia.org/wiki/Readers%E2%80%93writer_lock#Using_two_mutexes + + Code written by Tyler Neylon at Unbox Research. + + This file is public domain. +""" + +# _______________________________________________________________________ +# Imports + +from contextlib import contextmanager +from threading import Lock + + +# _______________________________________________________________________ +# Class + +class RWLock(object): + """ RWLock class; this is meant to allow an object to be read from by + multiple threads, but only written to by a single thread at a time. See: + https://en.wikipedia.org/wiki/Readers%E2%80%93writer_lock + + Usage: + + from rwlock import RWLock + + my_obj_rwlock = RWLock() + + # When reading from my_obj: + with my_obj_rwlock.r_locked(): + do_read_only_things_with(my_obj) + + # When writing to my_obj: + with my_obj_rwlock.w_locked(): + mutate(my_obj) + """ + + def __init__(self): + + self.w_lock = Lock() + self.num_r_lock = Lock() + self.num_r = 0 + + # ___________________________________________________________________ + # Reading methods. + + def r_acquire(self, *args, **kwargs): + self.num_r_lock.acquire(*args, **kwargs) + self.num_r += 1 + if self.num_r == 1: + self.w_lock.acquire(*args, **kwargs) + self.num_r_lock.release() + + def r_release(self, *args, **kwargs): + assert self.num_r > 0 + self.num_r_lock.acquire(*args, **kwargs) + self.num_r -= 1 + if self.num_r == 0: + self.w_lock.release() + self.num_r_lock.release() + + @contextmanager + def r_locked(self, *args, **kwargs): + """ This method is designed to be used via the `with` statement. """ + try: + self.r_acquire(*args, **kwargs) + yield + finally: + self.r_release() + + # ___________________________________________________________________ + # Writing methods. + + def w_acquire(self, *args, **kwargs): + self.w_lock.acquire(*args, **kwargs) + + def w_release(self): + self.w_lock.release() + + @contextmanager + def w_locked(self, *args, **kwargs): + """ This method is designed to be used via the `with` statement. """ + try: + self.w_acquire(*args, **kwargs) + yield + finally: + self.w_release() diff --git a/yacv_server/yacv.py b/yacv_server/yacv.py index bb13932..af6c3aa 100644 --- a/yacv_server/yacv.py +++ b/yacv_server/yacv.py @@ -10,7 +10,7 @@ from dataclasses import dataclass from http.server import ThreadingHTTPServer from importlib.metadata import version from threading import Thread -from typing import Optional, Dict, Union, Callable, List +from typing import Optional, Dict, Union, Callable, List, Tuple from OCP.TopLoc import TopLoc_Location from OCP.TopoDS import TopoDS_Shape @@ -18,6 +18,7 @@ from OCP.TopoDS import TopoDS_Shape from build123d import Shape, Axis, Location, Vector from dataclasses_json import dataclass_json +from rwlock import RWLock from yacv_server.cad import get_shape, grab_all_cad, CADCoreLike, CADLike from yacv_server.myhttp import HTTPHandler from yacv_server.mylogger import logger @@ -33,8 +34,8 @@ class UpdatesApiData: """Name of the object. Should be unique unless you want to overwrite the previous object""" hash: str """Hash of the object, to detect changes without rebuilding the object""" - is_remove: bool - """Whether to remove the object from the scene""" + is_remove: Optional[bool] + """Whether to remove the object from the scene. If None, this is a shutdown request""" YACVSupported = Union[bytes, CADCoreLike] @@ -46,7 +47,7 @@ class UpdatesApiFullData(UpdatesApiData): kwargs: Optional[Dict[str, any]] """The show_object options, if any (not serialized)""" - def __init__(self, obj: YACVSupported, name: str, _hash: str, is_remove: bool = False, + def __init__(self, obj: YACVSupported, name: str, _hash: str, is_remove: Optional[bool] = False, kwargs: Optional[Dict[str, any]] = None): self.name = name self.hash = _hash @@ -60,22 +61,42 @@ class UpdatesApiFullData(UpdatesApiData): class YACV: + """The main yacv_server class, which manages the web server and the CAD objects.""" + + # Startup server_thread: Optional[Thread] + """The main thread running the server (will spawn other threads for each request)""" server: Optional[ThreadingHTTPServer] + """The server object""" startup_complete: threading.Event + """Event to signal when the server has started""" + + # Running show_events: BufferedPubSub[UpdatesApiFullData] + """PubSub for show events (objects to be shown in/removed from the scene)""" build_events: Dict[str, BufferedPubSub[bytes]] - object_events_lock: threading.Lock + """PubSub for build events (objects that were built)""" + build_events_lock: threading.Lock + """Lock to ensure that objects are only built once""" + + # Shutdown + at_least_one_client: threading.Event + """Event to signal when at least one client has connected""" + shutting_down: threading.Event + """Event to signal when the server is shutting down""" + frontend_lock: RWLock + """Lock to ensure that the frontend has finished working before we shut down""" def __init__(self): self.server_thread = None self.server = None self.startup_complete = threading.Event() - self.at_least_one_client = threading.Event() self.show_events = BufferedPubSub() self.build_events = {} - self.object_events_lock = threading.Lock() - self.frontend_lock = threading.Lock() + self.build_events_lock = threading.Lock() + self.at_least_one_client = threading.Event() + self.shutting_down = threading.Event() + self.frontend_lock = RWLock() logger.info('Using yacv-server v%s', version('yacv-server')) def start(self): @@ -100,38 +121,36 @@ class YACV: logger.error('Cannot stop server because it is not running') return + # Inform the server that we are shutting down + self.shutting_down.set() + # noinspection PyTypeChecker + self.show_events.publish(UpdatesApiFullData(name='__shutdown', _hash='', is_remove=None, obj=None)) + + # If we were too fast, ensure that at least one client has connected graceful_secs_connect = float(os.getenv('YACV_GRACEFUL_SECS_CONNECT', 12.0)) - graceful_secs_request = float(os.getenv('YACV_GRACEFUL_SECS_REQUEST', 5.0)) - # Make sure we can hold the lock for more than 100ms (to avoid exiting too early) - logger.info('Stopping server (waiting for at least one frontend request first, cancel with CTRL+C)...') - start = time.time() - try: - while not self.at_least_one_client.wait( - graceful_secs_connect / 10) and time.time() - start < graceful_secs_connect: - time.sleep(0.01) - except KeyboardInterrupt: - pass + if graceful_secs_connect > 0: + start = time.time() + try: + if not self.at_least_one_client.is_set(): + logger.warning( + 'Waiting for at least one frontend request before stopping server, cancel with CTRL+C...') + while (not self.at_least_one_client.wait(graceful_secs_connect / 10) and + time.time() - start < graceful_secs_connect): + time.sleep(0.01) + except KeyboardInterrupt: + pass - logger.info('Stopping server (waiting for no more frontend requests)...') - start = time.time() - try: - while time.time() - start < graceful_secs_request: - if self.frontend_lock.locked(): - start = time.time() - time.sleep(0.01) - except KeyboardInterrupt: - pass + # Wait for the server to stop gracefully (all frontends to stop working) + graceful_secs_request = float(os.getenv('YACV_GRACEFUL_SECS_WORK', 1000000)) + with self.frontend_lock.w_locked(timeout=graceful_secs_request): + # Stop the server + self.server.shutdown() - # Stop the server in the background - self.server.shutdown() - logger.info('Stopping server (sent)...') - - # Wait for the server to stop gracefully - self.server_thread.join(timeout=30) - self.server_thread = None - logger.info('Stopping server (confirmed)...') - if len(args) >= 1 and args[0] in (signal.SIGINT, signal.SIGTERM): - sys.exit(0) # Exit with success + # Wait for the server thread to stop + self.server_thread.join(timeout=30) + self.server_thread = None + if len(args) >= 1 and args[0] in (signal.SIGINT, signal.SIGTERM): + sys.exit(0) # Exit with success def _run_server(self): """Runs the web server""" @@ -187,7 +206,7 @@ class YACV: self.show_events.delete(old_show_event) # Delete any cached object builds - with self.object_events_lock: + with self.build_events_lock: if name in self.build_events: del self.build_events[name] @@ -228,8 +247,8 @@ class YACV: res.remove(old_event) return res - def export(self, name: str) -> Optional[bytes]: - """Export the given previously-shown object to a single GLB file, building it if necessary.""" + def export(self, name: str) -> Optional[Tuple[bytes, str]]: + """Export the given previously-shown object to a single GLB blob, building it if necessary.""" start = time.time() # Check that the object to build exists and grab it if it does @@ -240,7 +259,7 @@ class YACV: event = events[-1] # Use the lock to ensure that we don't build the object twice - with self.object_events_lock: + with self.build_events_lock: # If there are no object events for this name, we need to build the object if name not in self.build_events: logger.debug('Building object %s with hash %s', name, event.hash) @@ -266,7 +285,7 @@ class YACV: # In either case return the elements of a subscription to the async generator subscription = self.build_events[name].subscribe() try: - return next(subscription) + return next(subscription), event.hash finally: subscription.close() @@ -277,7 +296,7 @@ class YACV: for name in self.shown_object_names(): if export_filter(name, self._show_events(name)[-1].obj): with open(os.path.join(folder, f'{name}.glb'), 'wb') as f: - f.write(self.export(name)) + f.write(self.export(name)[0]) # noinspection PyUnusedLocal