start to pre-merge CAD models on the server to improve frontend performance

Yeicor
2024-02-18 21:06:45 +01:00
parent 9d429de804
commit 0ff39e045f
6 changed files with 288 additions and 330 deletions
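Note: before this change the server streamed each tessellated face, edge and vertex as a separate GLB wrapped in a custom "GLBS" container; after it, every object is served as a single pre-merged GLB that any standard glTF loader understands. A minimal sketch of what the simplified client side looks like (the endpoint path and port here are illustrative assumptions, not taken from this diff):

import urllib.request


def fetch_object_glb(name: str, base_url: str = 'http://localhost:32323') -> bytes:
    """Download one pre-merged GLB; no custom GLBS container to unpack."""
    # Assumed URL layout; the route registration is not part of this diff.
    with urllib.request.urlopen(f'{base_url}/api/object/{name}') as response:
        return response.read()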

.gitignore (vendored)

@@ -11,6 +11,8 @@
 # TODO: Figure out which assets to keep in the repo
 /assets/fox.glb
 /assets/logo.glbs
+/assets/logo.glb
+/assets/logo.stl
 *.iml
 venv/

glbs.py (deleted)

@@ -1,36 +0,0 @@
-from typing import AsyncGenerator
-
-
-async def glb_sequence_to_glbs(glb_sequence: AsyncGenerator[bytes, None], count: int = -1) -> AsyncGenerator[bytes, None]:
-    """Converts a sequence of GLB files into a single GLBS file.
-
-    This is a streaming response in the custom GLBS format, which consists of the "GLBS" magic text followed by
-    a count of GLB files (0xffffffff if unknown) and a sequence of GLB files, each with a length prefix. All numbers
-    are 4-byte little-endian unsigned integers."""
-    # Write the magic text
-    yield b'GLBS'
-    # Write the count
-    yield count.to_bytes(4, 'little')
-    # Write the GLB files
-    async for glb in glb_sequence:
-        # Write the length prefix
-        yield len(glb).to_bytes(4, 'little')
-        # Write the GLB file
-        yield glb
-
-
-if __name__ == '__main__':
-    import asyncio
-
-    async def test_glb_sequence_to_glbs():
-        async def glb_sequence():
-            yield b'glb00001'
-            yield b'glb2'
-
-        async for chunk in glb_sequence_to_glbs(glb_sequence(), 2):
-            print(chunk)
-
-    asyncio.run(test_glb_sequence_to_glbs())
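For reference, a decoder for the retired GLBS container, sketched purely from the docstring above (this helper never existed in the repository):

import struct


def parse_glbs(data: bytes) -> list[bytes]:
    """Split a GLBS blob into its GLB payloads: b'GLBS' magic, a 4-byte
    little-endian count (0xffffffff if unknown), then length-prefixed GLBs."""
    assert data[:4] == b'GLBS', 'bad magic'
    (count,) = struct.unpack_from('<I', data, 4)
    glbs, offset = [], 8
    while offset < len(data):
        (length,) = struct.unpack_from('<I', data, offset)
        offset += 4
        glbs.append(data[offset:offset + length])
        offset += length
    assert count == 0xffffffff or count == len(glbs), 'count mismatch'
    return glbs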

gltf.py

@@ -1,133 +1,244 @@
 import numpy as np
+from build123d import Vector
 from pygltflib import *
 
-_checkerboard_image = Image(uri='data:image/png;base64,'
-                                'iVBORw0KGgoAAAANSUhEUgAAAAIAAAACCAIAAAD91JpzAAAAF0lEQVQI12N49OjR////Gf'
-                                '/////48WMATwULS8tcyj8AAAAASUVORK5CYII=')
+_checkerboard_image_bytes = base64.decodebytes(
+    b'iVBORw0KGgoAAAANSUhEUgAAAAIAAAACCAIAAAD91JpzAAAAF0lEQVQI12N49OjR////Gf'
+    b'/////48WMATwULS8tcyj8AAAAASUVORK5CYII=')
 
 
-def create_gltf(vertices: np.ndarray, indices: np.ndarray, tex_coord: np.ndarray, mode: int = TRIANGLES,
-                material: Optional[Material] = None, images: Optional[List[Image]] = None) -> GLTF2:
-    """Create a glTF object from vertices and optionally indices.
-
-    If indices are not set, vertices are interpreted as line_strip."""
-    assert vertices.ndim == 2
-    assert vertices.shape[1] == 3
-    vertices = vertices.astype(np.float32)
-    vertices_blob = vertices.tobytes()
-    # print(vertices)
-    indices = indices.astype(np.uint8)
-    indices_blob = indices.flatten().tobytes()
-    # print(indices)
-    tex_coord = tex_coord.astype(np.float32)
-    tex_coord_blob = tex_coord.tobytes()
-    # print(tex_coord)
-    if images is None:
-        images = []
-    image_blob = b''
-    image_blob_pointers = []
-    for i, img in enumerate(images):
-        img = copy.deepcopy(img)  # Avoid modifying the original image
-        assert img.bufferView is None
-        assert img.uri is not None
-        assert img.uri.startswith('data:')
-        image_blob_pointers.append(len(image_blob))
-        image_blob += base64.decodebytes(img.uri.split('base64,', maxsplit=1)[1].encode('ascii'))
-        img.mimeType = img.uri.split(';', maxsplit=1)[0].split(':', maxsplit=1)[1]
-        img.uri = None
-        img.bufferView = 3 + len(image_blob_pointers) - 1
-        images[i] = img  # Replace the original image with the new copied and modified one
-    gltf = GLTF2(
-        scene=0,
+class GLTFMgr:
+    """A utility class to build our GLTF2 objects easily and incrementally"""
+
+    gltf: GLTF2 = GLTF2(
         scenes=[Scene(nodes=[0])],
         nodes=[Node(mesh=0)],
-        meshes=[
-            Mesh(
-                primitives=[
-                    Primitive(
-                        attributes=Attributes(POSITION=1, TEXCOORD_0=2) if len(tex_coord) > 0 else Attributes(
-                            POSITION=1),
-                        indices=0,
-                        mode=mode,
-                        material=0 if material is not None else None,
-                    )
-                ]
-            )
-        ],
-        materials=[material] if material is not None else [],
-        accessors=[
-            Accessor(
-                bufferView=0,
-                componentType=UNSIGNED_BYTE,
-                count=indices.size,
-                type=SCALAR,
-                max=[int(indices.max())],
-                min=[int(indices.min())],
-            ),
-            Accessor(
-                bufferView=1,
-                componentType=FLOAT,
-                count=len(vertices),
-                type=VEC3,
-                max=vertices.max(axis=0).tolist(),
-                min=vertices.min(axis=0).tolist(),
-            ),
-        ] + ([
-            Accessor(
-                bufferView=2,
-                componentType=FLOAT,
-                count=len(tex_coord),
-                type=VEC2,
-                max=tex_coord.max(axis=0).tolist(),
-                min=tex_coord.min(axis=0).tolist(),
-            )] if len(tex_coord) > 0 else [])
-        ,
+        meshes=[Mesh(primitives=[])],
+        accessors=[],
         bufferViews=[
-            BufferView(
-                buffer=0,
-                byteLength=len(indices_blob),
-                target=ELEMENT_ARRAY_BUFFER,
-            ),
-            BufferView(
-                buffer=0,
-                byteOffset=len(indices_blob),
-                byteLength=len(vertices_blob),
-                target=ARRAY_BUFFER,
-            ),
-        ] + (
-            [
-                BufferView(
-                    buffer=0,
-                    byteOffset=len(indices_blob) + len(vertices_blob),
-                    byteLength=len(tex_coord_blob),
-                    target=ARRAY_BUFFER,
-                ),
-            ] if len(tex_coord) > 0 else []) + (
-            [
-                BufferView(
-                    buffer=0,
-                    byteOffset=len(indices_blob) + len(
-                        vertices_blob) + len(tex_coord_blob) + image_blob_pointers[i],
-                    byteLength=image_blob_pointers[i + 1] - image_blob_pointers[i] if i + 1 < len(
-                        image_blob_pointers) else len(image_blob) - image_blob_pointers[i],
-                )
-                for i, img in enumerate(images)
-            ] if len(images) > 0 else []),
-        buffers=[
-            Buffer(
-                byteLength=len(indices_blob) + len(vertices_blob) + len(tex_coord_blob) + len(image_blob),
-            )
-        ],
-        samplers=[Sampler(magFilter=NEAREST)] if len(images) > 0 else [],
-        textures=[Texture(source=i, sampler=0) for i, _ in enumerate(images)],
-        images=images,
+            BufferView(buffer=0, byteLength=len(_checkerboard_image_bytes), byteOffset=0, target=ELEMENT_ARRAY_BUFFER)],
+        buffers=[Buffer(byteLength=len(_checkerboard_image_bytes))],
+        samplers=[Sampler(magFilter=NEAREST)],
+        textures=[Texture(source=0, sampler=0)],
+        images=[Image(bufferView=0, mimeType='image/png')],
+        materials=[Material(pbrMetallicRoughness=PbrMetallicRoughness(baseColorTexture=TextureInfo(index=0)))],
     )
-    gltf.set_binary_blob(indices_blob + vertices_blob + tex_coord_blob + image_blob)
-    return gltf
+
+    def __init__(self):
+        self.gltf.set_binary_blob(_checkerboard_image_bytes)
+
+    def add_face(self, vertices: np.ndarray, indices: np.ndarray, tex_coord: np.ndarray):
+        """Add a face to the GLTF as a new primitive of the unique mesh"""
+        self._add_any(vertices, indices, tex_coord, mode=TRIANGLES)
+
+    def add_edge(self, vertices: np.ndarray):
+        """Add an edge to the GLTF as a new primitive of the unique mesh"""
+        indices = np.array(list(map(lambda i: [i, i + 1], range(len(vertices) - 1))), dtype=np.uint8)
+        tex_coord = np.array([[i / (len(vertices) - 1), 0] for i in range(len(vertices))], dtype=np.float32)
+        self._add_any(vertices, indices, tex_coord, mode=LINE_STRIP)
+
+    def add_vertex(self, vertex: Vector):
+        """Add a vertex to the GLTF as a new primitive of the unique mesh"""
+        vertices = np.array([[vertex.X, vertex.Y, vertex.Z]])
+        indices = np.array([0], dtype=np.uint8)
+        tex_coord = np.array([[0, 0]], dtype=np.float32)
+        self._add_any(vertices, indices, tex_coord, mode=POINTS)
+
+    def _add_any(self, vertices: np.ndarray, indices: np.ndarray, tex_coord: np.ndarray, mode: int = TRIANGLES):
+        assert vertices.ndim == 2
+        assert vertices.shape[1] == 3
+        vertices = vertices.astype(np.float32)
+        vertices_blob = vertices.tobytes()
+        indices = indices.astype(np.uint8)
+        indices_blob = indices.flatten().tobytes()
+        tex_coord = tex_coord.astype(np.float32)
+        tex_coord_blob = tex_coord.tobytes()
+
+        accessor_base = len(self.gltf.accessors)
+        self.gltf.meshes[0].primitives.append(
+            Primitive(
+                attributes=Attributes(POSITION=accessor_base + 1, TEXCOORD_0=accessor_base + 2),
+                indices=accessor_base,
+                mode=mode,
+                material=0,  # TODO special selected material and face/edge/vertex default materials
+            )
+        )
+
+        buffer_view_base = len(self.gltf.bufferViews)
+        self.gltf.accessors.extend([
+            Accessor(
+                bufferView=buffer_view_base,
+                componentType=UNSIGNED_BYTE,
+                count=indices.size,
+                type=SCALAR,
+                max=[int(indices.max())],
+                min=[int(indices.min())],
+            ),
+            Accessor(
+                bufferView=buffer_view_base + 1,
+                componentType=FLOAT,
+                count=len(vertices),
+                type=VEC3,
+                max=vertices.max(axis=0).tolist(),
+                min=vertices.min(axis=0).tolist(),
+            ),
+            Accessor(
+                bufferView=buffer_view_base + 2,
+                componentType=FLOAT,
+                count=len(tex_coord),
+                type=VEC2,
+                max=tex_coord.max(axis=0).tolist(),
+                min=tex_coord.min(axis=0).tolist(),
+            )
+        ])
+
+        binary_blob = self.gltf.binary_blob()
+        binary_blob_base = len(binary_blob)
+        self.gltf.bufferViews.extend([
+            BufferView(
+                buffer=0,
+                byteOffset=binary_blob_base,
+                byteLength=len(indices_blob),
+                target=ELEMENT_ARRAY_BUFFER,
+            ),
+            BufferView(
+                buffer=0,
+                byteOffset=binary_blob_base + len(indices_blob),
+                byteLength=len(vertices_blob),
+                target=ARRAY_BUFFER,
+            ),
+            BufferView(
+                buffer=0,
+                byteOffset=binary_blob_base + len(indices_blob) + len(vertices_blob),
+                byteLength=len(tex_coord_blob),
+                target=ARRAY_BUFFER,
+            )
+        ])
+        self.gltf.set_binary_blob(binary_blob + indices_blob + vertices_blob + tex_coord_blob)
+
+
+#
+#
+# def create_gltf(vertices: np.ndarray, indices: np.ndarray, tex_coord: np.ndarray, mode: int = TRIANGLES,
+#                 material: Optional[Material] = None, images: Optional[List[Image]] = None) -> GLTF2:
+#     """Create a glTF object from vertices and optionally indices.
+#
+#     If indices are not set, vertices are interpreted as line_strip."""
+#
+#     assert vertices.ndim == 2
+#     assert vertices.shape[1] == 3
+#     vertices = vertices.astype(np.float32)
+#     vertices_blob = vertices.tobytes()
+#     # print(vertices)
+#
+#     indices = indices.astype(np.uint8)
+#     indices_blob = indices.flatten().tobytes()
+#     # print(indices)
+#
+#     tex_coord = tex_coord.astype(np.float32)
+#     tex_coord_blob = tex_coord.tobytes()
+#     # print(tex_coord)
+#
+#     if images is None:
+#         images = []
+#     image_blob = b''
+#     image_blob_pointers = []
+#     for i, img in enumerate(images):
+#         image_blob = img_to_blob(i, image_blob, image_blob_pointers, images, img)
+#
+#     gltf = GLTF2(
+#         scene=0,
+#         scenes=[Scene(nodes=[0])],
+#         nodes=[Node(mesh=0)],
+#         meshes=[
+#             Mesh(
+#                 primitives=[
+#                     Primitive(
+#                         attributes=Attributes(POSITION=1, TEXCOORD_0=2) if len(tex_coord) > 0 else Attributes(
+#                             POSITION=1),
+#                         indices=0,
+#                         mode=mode,
+#                         material=0 if material is not None else None,
+#                     )
+#                 ]
+#             )
+#         ],
+#         materials=[material] if material is not None else [],
+#         accessors=[
+#             Accessor(
+#                 bufferView=0,
+#                 componentType=UNSIGNED_BYTE,
+#                 count=indices.size,
+#                 type=SCALAR,
+#                 max=[int(indices.max())],
+#                 min=[int(indices.min())],
+#             ),
+#             Accessor(
+#                 bufferView=1,
+#                 componentType=FLOAT,
+#                 count=len(vertices),
+#                 type=VEC3,
+#                 max=vertices.max(axis=0).tolist(),
+#                 min=vertices.min(axis=0).tolist(),
+#             ),
+#         ] + ([
+#             Accessor(
+#                 bufferView=2,
+#                 componentType=FLOAT,
+#                 count=len(tex_coord),
+#                 type=VEC2,
+#                 max=tex_coord.max(axis=0).tolist(),
+#                 min=tex_coord.min(axis=0).tolist(),
+#             )] if len(tex_coord) > 0 else [])
+#         ,
+#         bufferViews=[
+#             BufferView(
+#                 buffer=0,
+#                 byteLength=len(indices_blob),
+#                 target=ELEMENT_ARRAY_BUFFER,
+#             ),
+#             BufferView(
+#                 buffer=0,
+#                 byteOffset=len(indices_blob),
+#                 byteLength=len(vertices_blob),
+#                 target=ARRAY_BUFFER,
+#             ),
+#         ] + (
+#             [
+#                 BufferView(
+#                     buffer=0,
+#                     byteOffset=len(indices_blob) + len(vertices_blob),
+#                     byteLength=len(tex_coord_blob),
+#                     target=ARRAY_BUFFER,
+#                 ),
+#             ] if len(tex_coord) > 0 else []) + (
+#             [
+#                 BufferView(
+#                     buffer=0,
+#                     byteOffset=len(indices_blob) + len(
+#                         vertices_blob) + len(tex_coord_blob) + image_blob_pointers[i],
+#                     byteLength=image_blob_pointers[i + 1] - image_blob_pointers[i] if i + 1 < len(
+#                         image_blob_pointers) else len(image_blob) - image_blob_pointers[i],
+#                 )
+#                 for i, img in enumerate(images)
+#             ] if len(images) > 0 else []),
+#         buffers=[
+#             Buffer(
+#                 byteLength=len(indices_blob) + len(vertices_blob) + len(tex_coord_blob) + len(image_blob),
+#             )
+#         ],
+#         samplers=[Sampler(magFilter=NEAREST)] if len(images) > 0 else [],
+#         textures=[Texture(source=i, sampler=0) for i, _ in enumerate(images)],
+#         images=images,
+#     )
+#
+#     gltf.set_binary_blob(indices_blob + vertices_blob + tex_coord_blob + image_blob)
+#
+#     return gltf
+
+
+def img_blob(img: Image) -> bytes:
+    return base64.decodebytes(img.uri.split('base64,', maxsplit=1)[1].encode('ascii'))
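A minimal usage sketch for the new GLTFMgr: the method signatures come from the class above, while the geometry is made up for illustration.

import numpy as np
from build123d import Vector

from gltf import GLTFMgr

mgr = GLTFMgr()
mgr.add_face(  # One triangle, with per-vertex UVs for the checkerboard texture
    vertices=np.array([[0, 0, 0], [1, 0, 0], [0, 1, 0]]),
    indices=np.array([[0, 1, 2]]),
    tex_coord=np.array([[0, 0], [1, 0], [0, 1]]),
)
mgr.add_edge(np.array([[0, 0, 0], [1, 0, 0], [1, 1, 0]]))  # A two-segment polyline
mgr.add_vertex(Vector(0, 0, 1))  # A single point
glb = b''.join(mgr.gltf.save_to_bytes())  # Same call the server uses to export

Each add_* call appends one primitive to the single shared mesh, so the frontend receives one merged GLB instead of one file per face, edge and vertex.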

logo.py

@@ -30,21 +30,16 @@ if __name__ == "__main__":
     from __init__ import show_object, server
 
     ASSETS_DIR = os.getenv('ASSETS_DIR', os.path.join(os.path.dirname(__file__), '..', 'assets'))
 
-    # 1. Add the CAD part of the logo to the server
+    # Add the CAD part of the logo to the server
     obj = build_logo()
+    Shape(obj).export_stl(os.path.join(ASSETS_DIR, 'logo.stl'))
     show_object(obj, 'logo')
 
-    # 2. Load the GLTF part of the logo
-    with open(os.path.join(ASSETS_DIR, 'fox.glb'), 'rb') as f:
-        gltf = f.read()
-    show_object(gltf, 'fox')
-
-    # 3. Save the complete logo to a GLBS file
-    with open(os.path.join(ASSETS_DIR, 'logo.glbs'), 'wb') as f:
+    # Save the complete logo to a single GLB file
+    with open(os.path.join(ASSETS_DIR, 'logo.glb'), 'wb') as f:
         async def writer():
-            async for chunk in server.export_all():
-                f.write(chunk)
+            f.write(await server.export('logo'))
 
         asyncio.run(writer())
-    print('Logo saved to', os.path.join(ASSETS_DIR, 'logo.glbs'))
+    print('Logo saved to', os.path.join(ASSETS_DIR, 'logo.glb'))

server.py

@@ -1,26 +1,22 @@
 import asyncio
 import atexit
 import hashlib
-import logging
 import os
 import signal
 import sys
 import time
 from dataclasses import dataclass, field
 from threading import Thread
-from typing import Optional, Dict, Union, AsyncGenerator, List
-
-import tqdm.asyncio
+from typing import Optional, Dict, Union
 
 from OCP.TopoDS import TopoDS_Shape
 from aiohttp import web
 from build123d import Shape, Axis
 from dataclasses_json import dataclass_json, config
-from tqdm.contrib.logging import logging_redirect_tqdm
 
-from glbs import glb_sequence_to_glbs
 from mylogger import logger
 from pubsub import BufferedPubSub
-from tessellate import _hashcode, tessellate_count, tessellate
+from tessellate import _hashcode, tessellate
 
 FRONTEND_BASE_PATH = os.getenv('FRONTEND_BASE_PATH', '../dist')
 UPDATES_API_PATH = '/api/updates'

@@ -197,42 +193,28 @@ class Server:
         self._show_common(name, _hashcode(obj), start, obj)
 
-    async def _api_object(self, request: web.Request) -> web.StreamResponse:
+    async def _api_object(self, request: web.Request) -> web.Response:
         """Returns the object file with the matching name, building it if necessary."""
-        # Start exporting the object (or fail if not found)
-        export_data = self._export(request.match_info['name'])
-        response = web.StreamResponse()
+        # Export the object (or fail if not found)
+        exported_glb = self.export(request.match_info['name'])
+        response = web.Response()
         try:
-            # First exported element is the object itself, grab it to collect data
-            export_obj = await anext(export_data)
-
             # Create a new stream response with custom content type and headers
-            response.content_type = 'model/gltf-binary-sequence'
-            response.headers['Content-Disposition'] = f'attachment; filename="{request.match_info["name"]}.glbs"'
-            total_parts = 1 if export_obj is None else tessellate_count(export_obj)
-            response.headers['X-Object-Parts'] = str(total_parts)
+            response.content_type = 'model/gltf-binary'
+            response.headers['Content-Disposition'] = f'attachment; filename="{request.match_info["name"]}.glb"'
            await response.prepare(request)
 
-            # Convert the GLB sequence to a GLBS sequence and write it to the response
-            with logging_redirect_tqdm(tqdm_class=tqdm.asyncio.tqdm):
-                if logger.isEnabledFor(logging.INFO):
-                    # noinspection PyTypeChecker
-                    export_data_iter = tqdm.asyncio.tqdm(export_data, total=total_parts)
-                else:
-                    export_data_iter = export_data
-                async for chunk in glb_sequence_to_glbs(export_data_iter, total_parts):
-                    await response.write(chunk)
+            # Stream the export data to the response
+            response.body = exported_glb
         finally:
-            # Close the export data subscription
-            await export_data.aclose()
             # Close the response (if not an error)
             if response.prepared:
                 await response.write_eof()
         return response
 
-    async def _export(self, name: str) -> AsyncGenerator[Union[TopoDS_Shape, bytes], None]:
-        """Export the given previously-shown object to a sequence of GLB files, building it if necessary."""
+    async def export(self, name: str) -> bytes:
+        """Export the given previously-shown object to a single GLB file, building it if necessary."""
         start = time.time()
         # Check that the object to build exists and grab it if it does
         found = False

@@ -249,9 +231,6 @@ class Server:
         if not found:
             raise web.HTTPNotFound(text=f'No object named {name} was previously shown')
 
-        # First published element is the TopoDS_Shape, which is None for glTF objects
-        yield obj
-
         # Use the lock to ensure that we don't build the object twice
         async with self.object_events_lock:
             # If there are no object events for this name, we need to build the object

@@ -261,15 +240,12 @@ class Server:
                 self.object_events[name] = publish_to
 
                 def _build_object():
-                    # Build the object
-                    part_count = 0
-                    for tessellation_update in tessellate(obj):
-                        # We publish the object parts as soon as we have a new tessellation
-                        list_of_bytes = tessellation_update.gltf.save_to_bytes()
-                        publish_to.publish_nowait(b''.join(list_of_bytes))
-                        part_count += 1
-                    publish_to.publish_nowait(b'')  # Signal the end of the stream
-                    logger.info('export(%s) took %.3f seconds, %d parts', name, time.time() - start, part_count)
+                    # Build and publish the object (once)
+                    gltf = tessellate(obj)  # TODO: Publish tessellate options
+                    glb_list_of_bytes = gltf.save_to_bytes()
+                    publish_to.publish_nowait(b''.join(glb_list_of_bytes))
+                    logger.info('export(%s) took %.3f seconds, %d parts', name, time.time() - start,
+                                len(gltf.meshes[0].primitives))
 
                 # We should build it fully even if we are cancelled, so we use a separate task
                 # Furthermore, building is CPU-bound, so we use the default executor

@@ -278,57 +254,6 @@ class Server:
         # In either case return the elements of a subscription to the async generator
         subscription = self.object_events[name].subscribe()
         try:
-            async for chunk in subscription:
-                if chunk == b'':
-                    break
-                yield chunk
-        finally:
-            await subscription.aclose()
-
-    async def export_all(self) -> AsyncGenerator[bytes, None]:
-        """Export all previously shown objects to a single GLBS file, returned as an async generator.
-
-        This is useful for fully-static deployments where the frontend handles everything."""
-        # Check that the object to build exists and grab it if it does
-        all_object_names: List[str] = []
-        total_export_size = 0
-        subscription = self.show_events.subscribe(include_future=False)
-        try:
-            async for data in subscription:
-                all_object_names.append(data.name)
-                if data.obj is not None:
-                    total_export_size += tessellate_count(data.obj)
-                else:
-                    total_export_size += 1
-        finally:
-            await subscription.aclose()
-
-        # Create a generator that merges the export of all objects
-        async def _merge_exports() -> AsyncGenerator[bytes, None]:
-            for i, name in enumerate(all_object_names):
-                obj_subscription = self._export(name)
-                try:
-                    obj = await anext(obj_subscription)
-                    glb_parts = obj_subscription
-                    if logger.isEnabledFor(logging.INFO):
-                        total = tessellate_count(obj) if obj is not None else 1
-                        # noinspection PyTypeChecker
-                        glb_parts = tqdm.asyncio.tqdm(obj_subscription, total=total)
-                    async for glb_part in glb_parts:
-                        yield glb_part
-                finally:
-                    await obj_subscription.aclose()
-
-        # Need to have a single subscription to all objects to write a valid GLBS file
-        subscription = _merge_exports()
-        try:
-            with logging_redirect_tqdm(tqdm_class=tqdm.asyncio.tqdm):
-                glbs_parts = subscription
-                if logger.isEnabledFor(logging.INFO):
-                    # noinspection PyTypeChecker
-                    glbs_parts = tqdm.asyncio.tqdm(glbs_parts, total=total_export_size, position=0)
-                glbs_parts = glb_sequence_to_glbs(glbs_parts, total_export_size)
-                async for glbs_part in glbs_parts:
-                    yield glbs_part
+            return await anext(subscription)
         finally:
             await subscription.aclose()
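The export path now relies on exactly one published event per object: _build_object() publishes the merged GLB once, and export() awaits the first element of a subscription. A standalone sketch of that contract (assuming BufferedPubSub takes no constructor arguments and that subscribe() replays buffered events, as the code above implies):

import asyncio

from pubsub import BufferedPubSub


async def demo() -> bytes:
    events = BufferedPubSub()
    events.publish_nowait(b'glb-bytes...')  # what _build_object() does, once
    subscription = events.subscribe()
    try:
        return await anext(subscription)  # what export() returns
    finally:
        await subscription.aclose()


print(asyncio.run(demo()))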

tessellate.py

@@ -19,7 +19,7 @@ from build123d import Face, Vector, Shape, Vertex
 from pygltflib import LINE_STRIP, GLTF2, Material, PbrMetallicRoughness, TRIANGLES, POINTS, TextureInfo
 
 import mylogger
-from gltf import create_gltf, _checkerboard_image
+from gltf import GLTFMgr
 
 @dataclass

@@ -55,57 +55,36 @@ def tessellate_count(ocp_shape: TopoDS_Shape) -> int:
 def tessellate(
         ocp_shape: TopoDS_Shape,
-        tolerance: float = 0.1,
+        tolerance: float = 1e-3,
         angular_tolerance: float = 0.1,
-) -> Generator[TessellationUpdate, None, None]:
-    """Tessellate a whole shape into a list of triangle vertices and a list of triangle indices.
-
-    NOTE: The logic of the method is weird because multiprocessing was tested, but it seems too inefficient
-    with slow native packages.
-    """
+        faces: bool = True,
+        edges: bool = True,
+        vertices: bool = True,
+) -> GLTF2:
+    """Tessellate a whole shape into a list of triangle vertices and a list of triangle indices."""
+    mgr = GLTFMgr()
     shape = Shape(ocp_shape)
-    features = []
 
-    # Submit tessellation tasks
-    for face in shape.faces():
-        features.append(_tessellate_element(face.wrapped, tolerance, angular_tolerance))
-    for edge in shape.edges():
-        features.append(_tessellate_element(edge.wrapped, tolerance, angular_tolerance))
-    for vertex in shape.vertices():
-        features.append(_tessellate_element(vertex.wrapped, tolerance, angular_tolerance))
+    # Perform tessellation tasks
+    if faces:
+        for face in shape.faces():
+            _tessellate_face(mgr, face.wrapped, tolerance, angular_tolerance)
+    if edges:
+        for edge in shape.edges():
+            _tessellate_edge(mgr, edge.wrapped, tolerance, angular_tolerance)
+    if vertices:
+        for vertex in shape.vertices():
+            _tessellate_vertex(mgr, vertex.wrapped)
 
-    # Collect results as they come in
-    for i, future in enumerate(features):
-        sub_shape, gltf = future
-        yield TessellationUpdate(
-            progress=(i + 1) / len(features),
-            shape=sub_shape,
-            gltf=gltf,
-        )
+    return mgr.gltf
 
-
-# Define the function that will tessellate each element in parallel
-def _tessellate_element(
-        element: TopoDS_Shape, tolerance: float, angular_tolerance: float) -> Tuple[TopoDS_Shape, GLTF2]:
-    if isinstance(element, TopoDS_Face):
-        return element, _tessellate_face(element, tolerance, angular_tolerance)
-    elif isinstance(element, TopoDS_Edge):
-        return element, _tessellate_edge(element, angular_tolerance, angular_tolerance)
-    elif isinstance(element, TopoDS_Vertex):
-        return element, _tessellate_vertex(element)
-    else:
-        raise ValueError(f"Unknown element type: {element}")
-
-TriMesh = Tuple[list[Vector], list[Tuple[int, int, int]]]
 
 def _tessellate_face(
+        mgr: GLTFMgr,
         ocp_face: TopoDS_Face,
-        tolerance: float = 0.1,
+        tolerance: float = 1e-3,
         angular_tolerance: float = 0.1
-) -> GLTF2:
-    """Tessellate a face into a list of triangle vertices and a list of triangle indices"""
+):
     face = Face(ocp_face)
     face.mesh(tolerance, angular_tolerance)
     loc = TopLoc_Location()

@@ -124,19 +103,15 @@ def _tessellate_face(
     vertices = np.array(list(map(lambda v: [v.X, v.Y, v.Z], tri_mesh[0])))
     indices = np.array(tri_mesh[1])
     tex_coord = np.array(uv)
-    mode = TRIANGLES
-    material = Material(pbrMetallicRoughness=PbrMetallicRoughness(
-        baseColorFactor=[0.3, 1.0, 0.2, 1.0], metallicFactor=0.1, baseColorTexture=TextureInfo(index=0)),
-        alphaCutoff=None)
-    return create_gltf(vertices, indices, tex_coord, mode, material, images=[_checkerboard_image])
+    mgr.add_face(vertices, indices, tex_coord)
 
 
 def _tessellate_edge(
+        mgr: GLTFMgr,
         ocp_edge: TopoDS_Edge,
-        angular_deflection: float = 0.1,
+        angular_deflection: float = 1e-3,
         curvature_deflection: float = 0.1,
-) -> GLTF2:
-    """Tessellate a wire or edge into a list of ordered vertices"""
+):
     curve = BRepAdaptor_Curve(ocp_edge)
     discretizer = GCPnts_TangentialDeflection(curve, angular_deflection, curvature_deflection)
     assert discretizer.NbPoints() > 1, "Edge is too small??"

@@ -151,26 +126,12 @@ def _tessellate_edge(
             for i in range(1, discretizer.NbPoints() + 1)
         )
     ]
-    indices = np.array(list(map(lambda i: [i, i + 1], range(len(vertices) - 1))), dtype=np.uint8)
-    tex_coord = np.array([], dtype=np.float32)
-    mode = LINE_STRIP
-    material = Material(
-        pbrMetallicRoughness=PbrMetallicRoughness(baseColorFactor=[0.0, 0.0, 0.3, 1.0]),
-        alphaCutoff=None)
-    return create_gltf(np.array(vertices), indices, tex_coord, mode, material)
+    mgr.add_edge(np.array(vertices))
 
 
-def _tessellate_vertex(ocp_vertex: TopoDS_Vertex) -> GLTF2:
-    """Tessellate a vertex into a list of triangle vertices and a list of triangle indices"""
+def _tessellate_vertex(mgr: GLTFMgr, ocp_vertex: TopoDS_Vertex):
     c = Vertex(ocp_vertex).center()
-    vertices = np.array([[c.X, c.Y, c.Z]])
-    indices = np.array([0])
-    tex_coord = np.array([], dtype=np.float32)
-    mode = POINTS
-    material = Material(
-        pbrMetallicRoughness=PbrMetallicRoughness(baseColorFactor=[1.0, 0.5, 0.5, 1.0]),
-        alphaCutoff=None)
-    return create_gltf(vertices, indices, tex_coord, mode, material)
+    mgr.add_vertex(c)
 
 
 def _hashcode(obj: TopoDS_Shape) -> str:
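End to end, tessellation now reduces to a single call per object that returns a complete GLTF2. A sketch of the new pipeline (Box is just a stand-in shape):

from build123d import Box

from tessellate import tessellate

# One GLTF2 with a primitive per face/edge/vertex, instead of a stream of
# incremental TessellationUpdate objects.
gltf = tessellate(Box(1, 1, 1).wrapped, tolerance=1e-3, angular_tolerance=0.1)
print(len(gltf.meshes[0].primitives), 'primitives in a single mesh')
with open('box.glb', 'wb') as f:
    f.write(b''.join(gltf.save_to_bytes()))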