stepper: Implement active callbacks via motion_queuing.register_flush_callback()

Use the existing register_flush_callback() system to implement motor
activity checking.  This simplifies the generate_steps() code.

Signed-off-by: Kevin O'Connor <kevin@koconnor.net>
This commit is contained in:
Kevin O'Connor
2025-08-05 14:50:36 -04:00
parent b5e573957c
commit c454e88d9a
4 changed files with 29 additions and 14 deletions

View File

@@ -235,16 +235,26 @@ class MCU_stepper:
return old_tq
def add_active_callback(self, cb):
self._active_callbacks.append(cb)
if len(self._active_callbacks) == 1:
printer = self._mcu.get_printer()
motion_queuing = printer.lookup_object('motion_queuing')
motion_queuing.register_flush_callback(self._check_active)
def _check_active(self, must_flush_time, max_step_gen_time):
sk = self._stepper_kinematics
ret = self._itersolve_check_active(sk, max_step_gen_time)
if not ret:
# Stepper motor still not active
return
# Motor is active, disable future checking
printer = self._mcu.get_printer()
motion_queuing = printer.lookup_object('motion_queuing')
motion_queuing.unregister_flush_callback(self._check_active)
cbs = self._active_callbacks
self._active_callbacks = []
# Invoke callbacks
for cb in cbs:
cb(ret)
def generate_steps(self, flush_time):
# Check for activity if necessary
if self._active_callbacks:
sk = self._stepper_kinematics
ret = self._itersolve_check_active(sk, flush_time)
if ret:
cbs = self._active_callbacks
self._active_callbacks = []
for cb in cbs:
cb(ret)
# Generate steps
sk = self._stepper_kinematics
ret = self._itersolve_generate_steps(sk, flush_time)