refactor(Klipper): refactor klipper_setup to reduce cognitive complexity

Signed-off-by: Dominik Willner <th33xitus@gmail.com>
This commit is contained in:
dw-0
2023-12-27 23:49:34 +01:00
parent 83e5d9c0d5
commit f3b0e45e39
2 changed files with 101 additions and 59 deletions

View File

@@ -10,13 +10,11 @@
# ======================================================================= #
from pathlib import Path
from typing import List
from kiauh import KIAUH_CFG
from kiauh.core.backup_manager.backup_manager import BackupManager
from kiauh.core.config_manager.config_manager import ConfigManager
from kiauh.core.instance_manager.instance_manager import InstanceManager
from kiauh.core.instance_manager.name_scheme import NameScheme
from kiauh.modules.klipper import (
EXIT_KLIPPER_SETUP,
DEFAULT_KLIPPER_REPO_URL,
@@ -25,23 +23,22 @@ from kiauh.modules.klipper import (
KLIPPER_REQUIREMENTS_TXT,
)
from kiauh.modules.klipper.klipper import Klipper
from kiauh.modules.klipper.klipper_dialogs import (
print_update_warn_dialog,
print_select_custom_name_dialog,
)
from kiauh.modules.klipper.klipper_dialogs import print_update_warn_dialog
from kiauh.modules.klipper.klipper_utils import (
handle_disruptive_system_packages,
check_user_groups,
handle_to_multi_instance_conversion,
create_example_printer_cfg,
detect_name_scheme,
add_to_existing,
get_install_count,
assign_custom_name,
init_name_scheme,
check_is_single_to_multi_conversion,
update_name_scheme,
handle_instance_naming,
)
from kiauh.core.repo_manager.repo_manager import RepoManager
from kiauh.modules.moonraker.moonraker import Moonraker
from kiauh.utils.input_utils import get_confirm, get_number_input
from kiauh.utils.input_utils import get_confirm
from kiauh.utils.logger import Logger
from kiauh.utils.system_utils import (
parse_packages_from_file,
@@ -52,83 +49,50 @@ from kiauh.utils.system_utils import (
)
# TODO: this method needs refactoring! (but it works for now)
def install_klipper() -> None:
im = InstanceManager(Klipper)
kl_instances: List[Klipper] = im.instances
kl_im = InstanceManager(Klipper)
# ask to add new instances, if there are existing ones
if kl_instances and not add_to_existing():
if kl_im.instances and not add_to_existing():
Logger.print_status(EXIT_KLIPPER_SETUP)
return
install_count = get_install_count()
# install_count = None -> user entered "b" to go back
if install_count is None:
Logger.print_status(EXIT_KLIPPER_SETUP)
return
# create a dict of the size of the existing instances + install count
name_scheme = NameScheme.SINGLE
single_to_multi = len(kl_instances) == 1 and kl_instances[0].suffix == ""
name_dict = {c: "" for c in range(len(kl_instances) + install_count)}
name_dict = {c: "" for c in range(len(kl_im.instances) + install_count)}
name_scheme = init_name_scheme(kl_im.instances, install_count)
mr_im = InstanceManager(Moonraker)
name_scheme = update_name_scheme(
name_scheme, name_dict, kl_im.instances, mr_im.instances
)
if (not kl_instances and install_count > 1) or single_to_multi:
print_select_custom_name_dialog()
if get_confirm("Assign custom names?", False, allow_go_back=True):
name_scheme = NameScheme.CUSTOM
else:
name_scheme = NameScheme.INDEX
# if there are more moonraker instances installed than klipper, we
# load their names into the name_dict, as we will detect and enforce that naming scheme
mr_instances: List[Moonraker] = InstanceManager(Moonraker).instances
if len(mr_instances) > len(kl_instances):
for k, v in enumerate(mr_instances):
name_dict[k] = v.suffix
name_scheme = detect_name_scheme(mr_instances)
elif len(kl_instances) > 1:
for k, v in enumerate(kl_instances):
name_dict[k] = v.suffix
name_scheme = detect_name_scheme(kl_instances)
# set instance names if multiple instances will be created
if name_scheme != NameScheme.SINGLE:
for k in name_dict:
if name_dict[k] == "" and name_scheme == NameScheme.INDEX:
name_dict[k] = str(k + 1)
elif name_dict[k] == "" and name_scheme == NameScheme.CUSTOM:
assign_custom_name(k, name_dict)
handle_instance_naming(name_dict, name_scheme)
create_example_cfg = get_confirm("Create example printer.cfg?")
if not kl_instances:
if not kl_im.instances:
setup_klipper_prerequesites()
count = 0
for name in name_dict:
if name_dict[name] in [n.suffix for n in kl_instances]:
if name_dict[name] in [n.suffix for n in kl_im.instances]:
continue
else:
count += 1
if single_to_multi:
if check_is_single_to_multi_conversion(kl_im.instances):
handle_to_multi_instance_conversion(name_dict[name])
single_to_multi = False
count -= 1
else:
new_instance = Klipper(suffix=name_dict[name])
im.current_instance = new_instance
im.create_instance()
im.enable_instance()
if create_example_cfg:
create_example_printer_cfg(new_instance)
im.start_instance()
continue
count += 1
create_klipper_instance(name_dict[name], create_example_cfg)
if count == install_count:
break
im.reload_daemon()
kl_im.reload_daemon()
# step 4: check/handle conflicting packages/services
handle_disruptive_system_packages()
@@ -194,3 +158,14 @@ def update_klipper() -> None:
)
repo_manager.pull_repo()
instance_manager.start_all_instance()
def create_klipper_instance(name: str, create_example_cfg: bool) -> None:
kl_im = InstanceManager(Klipper)
new_instance = Klipper(suffix=name)
kl_im.current_instance = new_instance
kl_im.create_instance()
kl_im.enable_instance()
if create_example_cfg:
create_example_printer_cfg(new_instance)
kl_im.start_instance()

View File

@@ -29,7 +29,9 @@ from kiauh.modules.klipper.klipper_dialogs import (
print_missing_usergroup_dialog,
print_instance_overview,
print_select_instance_count_dialog,
print_select_custom_name_dialog,
)
from kiauh.modules.moonraker.moonraker import Moonraker
from kiauh.modules.moonraker.moonraker_utils import moonraker_to_multi_conversion
from kiauh.utils.common import get_install_status_common, get_repo_name
from kiauh.utils.constants import CURRENT_USER
@@ -45,6 +47,65 @@ def get_klipper_status() -> Dict[Literal["status", "repo"], str]:
}
def check_is_multi_install(
existing_instances: List[Klipper], install_count: int
) -> bool:
return not existing_instances and install_count > 1
def check_is_single_to_multi_conversion(existing_instances: List[Klipper]) -> bool:
return len(existing_instances) == 1 and existing_instances[0].suffix == ""
def init_name_scheme(
existing_instances: List[Klipper], install_count: int
) -> NameScheme:
if check_is_multi_install(
existing_instances, install_count
) or check_is_single_to_multi_conversion(existing_instances):
print_select_custom_name_dialog()
if get_confirm("Assign custom names?", False, allow_go_back=True):
return NameScheme.CUSTOM
else:
return NameScheme.INDEX
else:
return NameScheme.SINGLE
def update_name_scheme(
name_scheme: NameScheme,
name_dict: Dict[int, str],
klipper_instances: List[Klipper],
moonraker_instances: List[Moonraker],
) -> NameScheme:
# if there are more moonraker instances installed than klipper, we
# load their names into the name_dict, as we will detect and enforce that naming scheme
if len(moonraker_instances) > len(klipper_instances):
update_name_dict(name_dict, moonraker_instances)
return detect_name_scheme(moonraker_instances)
elif len(klipper_instances) > 1:
update_name_dict(name_dict, klipper_instances)
return detect_name_scheme(klipper_instances)
else:
return name_scheme
def update_name_dict(name_dict: Dict[int, str], instances: List[BaseInstance]) -> None:
for k, v in enumerate(instances):
name_dict[k] = v.suffix
def handle_instance_naming(name_dict: Dict[int, str], name_scheme: NameScheme) -> None:
if name_scheme == NameScheme.SINGLE:
return
for k in name_dict:
if name_dict[k] == "" and name_scheme == NameScheme.INDEX:
name_dict[k] = str(k + 1)
elif name_dict[k] == "" and name_scheme == NameScheme.CUSTOM:
assign_custom_name(k, name_dict)
def add_to_existing() -> bool:
kl_instances = InstanceManager(Klipper).instances
print_instance_overview(kl_instances)
@@ -52,6 +113,12 @@ def add_to_existing() -> bool:
def get_install_count() -> Union[int, None]:
"""
Print a dialog for selecting the amount of Klipper instances
to set up with an option to navigate back. Returns None if the
user selected to go back, otherwise an integer greater or equal than 1 |
:return: Integer >= 1 or None
"""
kl_instances = InstanceManager(Klipper).instances
print_select_instance_count_dialog()
question = f"Number of{' additional' if len(kl_instances) > 0 else ''} Klipper instances to set up"