refactor(kiauh): reword print_info to print_status and implement new print_info method

Signed-off-by: Dominik Willner <th33xitus@gmail.com>
This commit is contained in:
dw-0
2023-12-03 23:06:30 +01:00
parent 458c89a78a
commit bfb10c742b
9 changed files with 58 additions and 49 deletions

View File

@@ -56,7 +56,7 @@ class BackupManager:
try: try:
log = f"Creating backup of {self.backup_name} in {self.backup_dir} ..." log = f"Creating backup of {self.backup_name} in {self.backup_dir} ..."
Logger.print_info(log) Logger.print_status(log)
date = get_current_date() date = get_current_date()
dest = Path( dest = Path(
f"{self.backup_dir}/{self.backup_name}/{date.get('date')}-{date.get('time')}" f"{self.backup_dir}/{self.backup_name}/{date.get('date')}-{date.get('time')}"

View File

@@ -29,6 +29,7 @@ class InstanceManager:
self._current_instance: Optional[I] = None self._current_instance: Optional[I] = None
self._instance_suffix: Optional[str] = None self._instance_suffix: Optional[str] = None
self._instance_service: Optional[str] = None self._instance_service: Optional[str] = None
self._instance_service_full: Optional[str] = None
self._instance_service_path: Optional[str] = None self._instance_service_path: Optional[str] = None
self._instances: List[I] = [] self._instances: List[I] = []
@@ -67,6 +68,10 @@ class InstanceManager:
def instance_service(self, value: str): def instance_service(self, value: str):
self._instance_service = value self._instance_service = value
@property
def instance_service_full(self) -> str:
return f"{self._instance_service}.service"
@property @property
def instance_service_path(self) -> str: def instance_service_path(self) -> str:
return self._instance_service_path return self._instance_service_path
@@ -107,35 +112,33 @@ class InstanceManager:
raise ValueError("current_instance cannot be None") raise ValueError("current_instance cannot be None")
def enable_instance(self) -> None: def enable_instance(self) -> None:
Logger.print_info(f"Enabling {self.instance_service} ...") Logger.print_status(f"Enabling {self.instance_service_full} ...")
try: try:
command = ["sudo", "systemctl", "enable", self.instance_service] command = ["sudo", "systemctl", "enable", self.instance_service_full]
if subprocess.run(command, check=True): if subprocess.run(command, check=True):
Logger.print_ok(f"{self.instance_service}.service enabled.") Logger.print_ok(f"{self.instance_service_full} enabled.")
except subprocess.CalledProcessError as e: except subprocess.CalledProcessError as e:
Logger.print_error( Logger.print_error(f"Error enabling service {self.instance_service_full}:")
f"Error enabling service {self.instance_service}.service:"
)
Logger.print_error(f"{e}") Logger.print_error(f"{e}")
def disable_instance(self) -> None: def disable_instance(self) -> None:
Logger.print_info(f"Disabling {self.instance_service} ...") Logger.print_status(f"Disabling {self.instance_service_full} ...")
try: try:
command = ["sudo", "systemctl", "disable", self.instance_service] command = ["sudo", "systemctl", "disable", self.instance_service_full]
if subprocess.run(command, check=True): if subprocess.run(command, check=True):
Logger.print_ok(f"{self.instance_service} disabled.") Logger.print_ok(f"{self.instance_service_full} disabled.")
except subprocess.CalledProcessError as e: except subprocess.CalledProcessError as e:
Logger.print_error(f"Error disabling service {self.instance_service}:") Logger.print_error(f"Error disabling {self.instance_service_full}:")
Logger.print_error(f"{e}") Logger.print_error(f"{e}")
def start_instance(self) -> None: def start_instance(self) -> None:
Logger.print_info(f"Starting {self.instance_service} ...") Logger.print_status(f"Starting {self.instance_service_full} ...")
try: try:
command = ["sudo", "systemctl", "start", self.instance_service] command = ["sudo", "systemctl", "start", self.instance_service_full]
if subprocess.run(command, check=True): if subprocess.run(command, check=True):
Logger.print_ok(f"{self.instance_service} started.") Logger.print_ok(f"{self.instance_service_full} started.")
except subprocess.CalledProcessError as e: except subprocess.CalledProcessError as e:
Logger.print_error(f"Error starting service {self.instance_service}:") Logger.print_error(f"Error starting {self.instance_service_full}:")
Logger.print_error(f"{e}") Logger.print_error(f"{e}")
def start_all_instance(self) -> None: def start_all_instance(self) -> None:
@@ -144,13 +147,13 @@ class InstanceManager:
self.start_instance() self.start_instance()
def stop_instance(self) -> None: def stop_instance(self) -> None:
Logger.print_info(f"Stopping {self.instance_service} ...") Logger.print_status(f"Stopping {self.instance_service_full} ...")
try: try:
command = ["sudo", "systemctl", "stop", self.instance_service] command = ["sudo", "systemctl", "stop", self.instance_service_full]
if subprocess.run(command, check=True): if subprocess.run(command, check=True):
Logger.print_ok(f"{self.instance_service} stopped.") Logger.print_ok(f"{self.instance_service_full} stopped.")
except subprocess.CalledProcessError as e: except subprocess.CalledProcessError as e:
Logger.print_error(f"Error stopping service {self.instance_service}:") Logger.print_error(f"Error stopping {self.instance_service_full}:")
Logger.print_error(f"{e}") Logger.print_error(f"{e}")
raise raise
@@ -160,7 +163,7 @@ class InstanceManager:
self.stop_instance() self.stop_instance()
def reload_daemon(self) -> None: def reload_daemon(self) -> None:
Logger.print_info("Reloading systemd manager configuration ...") Logger.print_status("Reloading systemd manager configuration ...")
try: try:
command = ["sudo", "systemctl", "daemon-reload"] command = ["sudo", "systemctl", "daemon-reload"]
if subprocess.run(command, check=True): if subprocess.run(command, check=True):

View File

@@ -59,11 +59,13 @@ class RepoManager:
def clone_repo(self): def clone_repo(self):
log = f"Cloning repository from '{self.repo}' with method '{self.method}'" log = f"Cloning repository from '{self.repo}' with method '{self.method}'"
Logger.print_info(log) Logger.print_status(log)
try: try:
if os.path.exists(self.target_dir): if os.path.exists(self.target_dir):
if not get_confirm("Target directory already exists. Overwrite?"): if not get_confirm(
Logger.print_info("Skipping re-clone of repository ...") "Target directory already exists. Overwrite?", default_choice=False
):
Logger.print_info("Skipping re-clone of repository.")
return return
shutil.rmtree(self.target_dir) shutil.rmtree(self.target_dir)
@@ -78,7 +80,7 @@ class RepoManager:
return return
def pull_repo(self) -> None: def pull_repo(self) -> None:
Logger.print_info(f"Updating repository '{self.repo}' ...") Logger.print_status(f"Updating repository '{self.repo}' ...")
try: try:
self._pull() self._pull()
except subprocess.CalledProcessError: except subprocess.CalledProcessError:

View File

@@ -38,7 +38,7 @@ class Klipper(BaseInstance):
self.uds = f"{self.comms_dir}/klippy.sock" self.uds = f"{self.comms_dir}/klippy.sock"
def create(self) -> None: def create(self) -> None:
Logger.print_info("Creating new Klipper Instance ...") Logger.print_status("Creating new Klipper Instance ...")
module_path = os.path.dirname(os.path.abspath(__file__)) module_path = os.path.dirname(os.path.abspath(__file__))
service_template_path = os.path.join(module_path, "res", "klipper.service") service_template_path = os.path.join(module_path, "res", "klipper.service")
env_template_file_path = os.path.join(module_path, "res", "klipper.env") env_template_file_path = os.path.join(module_path, "res", "klipper.env")
@@ -66,7 +66,7 @@ class Klipper(BaseInstance):
service_file = self.get_service_file_name(extension=True) service_file = self.get_service_file_name(extension=True)
service_file_path = self.get_service_file_path() service_file_path = self.get_service_file_path()
Logger.print_info(f"Deleting Klipper Instance: {service_file}") Logger.print_status(f"Deleting Klipper Instance: {service_file}")
try: try:
command = ["sudo", "rm", "-f", service_file_path] command = ["sudo", "rm", "-f", service_file_path]
@@ -113,12 +113,12 @@ class Klipper(BaseInstance):
def _delete_klipper_remnants(self) -> None: def _delete_klipper_remnants(self) -> None:
try: try:
Logger.print_info(f"Delete {self.klipper_dir} ...") Logger.print_status(f"Delete {self.klipper_dir} ...")
shutil.rmtree(Path(self.klipper_dir)) shutil.rmtree(Path(self.klipper_dir))
Logger.print_info(f"Delete {self.env_dir} ...") Logger.print_status(f"Delete {self.env_dir} ...")
shutil.rmtree(Path(self.env_dir)) shutil.rmtree(Path(self.env_dir))
except FileNotFoundError: except FileNotFoundError:
Logger.print_info("Cannot delete Klipper directories. Not found.") Logger.print_status("Cannot delete Klipper directories. Not found.")
except PermissionError as e: except PermissionError as e:
Logger.print_error(f"Error deleting Klipper directories: {e}") Logger.print_error(f"Error deleting Klipper directories: {e}")
raise raise

View File

@@ -67,7 +67,7 @@ def run_klipper_setup(install: bool) -> None:
if install: if install:
add_additional = handle_existing_instances(instance_list) add_additional = handle_existing_instances(instance_list)
if is_klipper_installed and not add_additional: if is_klipper_installed and not add_additional:
Logger.print_info(EXIT_KLIPPER_SETUP) Logger.print_status(EXIT_KLIPPER_SETUP)
return return
install_klipper(instance_manager, instance_list) install_klipper(instance_manager, instance_list)
@@ -97,12 +97,12 @@ def install_klipper(
question = f"Number of{' additional' if len(instance_list) > 0 else ''} Klipper instances to set up" question = f"Number of{' additional' if len(instance_list) > 0 else ''} Klipper instances to set up"
install_count = get_number_input(question, 1, default=1, allow_go_back=True) install_count = get_number_input(question, 1, default=1, allow_go_back=True)
if install_count is None: if install_count is None:
Logger.print_info(EXIT_KLIPPER_SETUP) Logger.print_status(EXIT_KLIPPER_SETUP)
return return
instance_names = set_instance_suffix(instance_list, install_count) instance_names = set_instance_suffix(instance_list, install_count)
if instance_names is None: if instance_names is None:
Logger.print_info(EXIT_KLIPPER_SETUP) Logger.print_status(EXIT_KLIPPER_SETUP)
return return
if len(instance_list) < 1: if len(instance_list) < 1:
@@ -199,7 +199,7 @@ def remove_single_instance(
question = f"Delete {KLIPPER_DIR} and {KLIPPER_ENV_DIR}?" question = f"Delete {KLIPPER_DIR} and {KLIPPER_ENV_DIR}?"
del_remnants = get_confirm(question, allow_go_back=True) del_remnants = get_confirm(question, allow_go_back=True)
if del_remnants is None: if del_remnants is None:
Logger.print_info("Exiting Klipper Uninstaller ...") Logger.print_status("Exiting Klipper Uninstaller ...")
return return
try: try:
@@ -229,10 +229,10 @@ def remove_multi_instance(
question = f"Delete {KLIPPER_DIR} and {KLIPPER_ENV_DIR}?" question = f"Delete {KLIPPER_DIR} and {KLIPPER_ENV_DIR}?"
del_remnants = get_confirm(question, allow_go_back=True) del_remnants = get_confirm(question, allow_go_back=True)
if del_remnants is None: if del_remnants is None:
Logger.print_info("Exiting Klipper Uninstaller ...") Logger.print_status("Exiting Klipper Uninstaller ...")
return return
Logger.print_info("Removing all Klipper instances ...") Logger.print_status("Removing all Klipper instances ...")
for instance in instance_list: for instance in instance_list:
instance_manager.current_instance = instance instance_manager.current_instance = instance
instance_manager.stop_instance() instance_manager.stop_instance()
@@ -241,7 +241,7 @@ def remove_multi_instance(
else: else:
instance = instance_list[int(selection)] instance = instance_list[int(selection)]
log = f"Removing Klipper instance: {instance.get_service_file_name()}" log = f"Removing Klipper instance: {instance.get_service_file_name()}"
Logger.print_info(log) Logger.print_status(log)
instance_manager.current_instance = instance instance_manager.current_instance = instance
instance_manager.stop_instance() instance_manager.stop_instance()
instance_manager.disable_instance() instance_manager.disable_instance()

View File

@@ -128,7 +128,7 @@ def check_user_groups():
try: try:
for group in missing_groups: for group in missing_groups:
Logger.print_info(f"Adding user '{CURRENT_USER}' to group {group} ...") Logger.print_status(f"Adding user '{CURRENT_USER}' to group {group} ...")
command = ["sudo", "usermod", "-a", "-G", group, CURRENT_USER] command = ["sudo", "usermod", "-a", "-G", group, CURRENT_USER]
subprocess.run(command, check=True) subprocess.run(command, check=True)
Logger.print_ok(f"Group {group} assigned to user '{CURRENT_USER}'.") Logger.print_ok(f"Group {group} assigned to user '{CURRENT_USER}'.")
@@ -162,7 +162,7 @@ def handle_disruptive_system_packages() -> None:
for service in services if services else []: for service in services if services else []:
try: try:
log = f"{service} service detected! Masking {service} service ..." log = f"{service} service detected! Masking {service} service ..."
Logger.print_info(log) Logger.print_status(log)
mask_system_service(service) mask_system_service(service)
Logger.print_ok(f"{service} service masked!") Logger.print_ok(f"{service} service masked!")
except subprocess.CalledProcessError: except subprocess.CalledProcessError:

View File

@@ -14,6 +14,7 @@ import pwd
from pathlib import Path from pathlib import Path
# text colors and formats # text colors and formats
COLOR_WHITE = "\033[37m" # white
COLOR_MAGENTA = "\033[35m" # magenta COLOR_MAGENTA = "\033[35m" # magenta
COLOR_GREEN = "\033[92m" # bright green COLOR_GREEN = "\033[92m" # bright green
COLOR_YELLOW = "\033[93m" # bright yellow COLOR_YELLOW = "\033[93m" # bright yellow

View File

@@ -10,6 +10,7 @@
# ======================================================================= # # ======================================================================= #
from kiauh.utils.constants import ( from kiauh.utils.constants import (
COLOR_WHITE,
COLOR_GREEN, COLOR_GREEN,
COLOR_YELLOW, COLOR_YELLOW,
COLOR_RED, COLOR_RED,
@@ -34,6 +35,11 @@ class Logger:
# log to kiauh.log # log to kiauh.log
pass pass
@staticmethod
def print_info(msg, prefix=True, end="\n") -> None:
message = f"[INFO] {msg}" if prefix else msg
print(f"{COLOR_WHITE}{message}{RESET_FORMAT}", end=end)
@staticmethod @staticmethod
def print_ok(msg, prefix=True, end="\n") -> None: def print_ok(msg, prefix=True, end="\n") -> None:
message = f"[OK] {msg}" if prefix else msg message = f"[OK] {msg}" if prefix else msg
@@ -50,6 +56,6 @@ class Logger:
print(f"{COLOR_RED}{message}{RESET_FORMAT}", end=end) print(f"{COLOR_RED}{message}{RESET_FORMAT}", end=end)
@staticmethod @staticmethod
def print_info(msg, prefix=True, end="\n") -> None: def print_status(msg, prefix=True, end="\n") -> None:
message = f"###### {msg}" if prefix else msg message = f"\n###### {msg}" if prefix else msg
print(f"{COLOR_MAGENTA}{message}{RESET_FORMAT}", end=end) print(f"{COLOR_MAGENTA}{message}{RESET_FORMAT}", end=end)

View File

@@ -62,7 +62,7 @@ def create_python_venv(target: Path) -> None:
:param target: Path where to create the virtualenv at :param target: Path where to create the virtualenv at
:return: None :return: None
""" """
Logger.print_info("Set up Python virtual environment ...") Logger.print_status("Set up Python virtual environment ...")
if not target.exists(): if not target.exists():
try: try:
command = ["python3", "-m", "venv", f"{target}"] command = ["python3", "-m", "venv", f"{target}"]
@@ -76,7 +76,7 @@ def create_python_venv(target: Path) -> None:
except subprocess.CalledProcessError as e: except subprocess.CalledProcessError as e:
Logger.print_error(f"Error setting up virtualenv:\n{e.output.decode()}") Logger.print_error(f"Error setting up virtualenv:\n{e.output.decode()}")
else: else:
if get_confirm("Virtualenv already exists. Re-create?"): if get_confirm("Virtualenv already exists. Re-create?", default_choice=False):
try: try:
shutil.rmtree(target) shutil.rmtree(target)
create_python_venv(target) create_python_venv(target)
@@ -93,7 +93,7 @@ def update_python_pip(target: Path) -> None:
:param target: Path of the virtualenv :param target: Path of the virtualenv
:return: None :return: None
""" """
Logger.print_info("Updating pip ...") Logger.print_status("Updating pip ...")
try: try:
command = [f"{target}/bin/pip", "install", "-U", "pip"] command = [f"{target}/bin/pip", "install", "-U", "pip"]
result = subprocess.run(command, stderr=subprocess.PIPE, text=True) result = subprocess.run(command, stderr=subprocess.PIPE, text=True)
@@ -115,7 +115,7 @@ def install_python_requirements(target: Path, requirements: Path) -> None:
:return: None :return: None
""" """
update_python_pip(target) update_python_pip(target)
Logger.print_info("Installing Python requirements ...") Logger.print_status("Installing Python requirements ...")
try: try:
command = [f"{target}/bin/pip", "install", "-r", f"{requirements}"] command = [f"{target}/bin/pip", "install", "-r", f"{requirements}"]
result = subprocess.run(command, stderr=subprocess.PIPE, text=True) result = subprocess.run(command, stderr=subprocess.PIPE, text=True)
@@ -150,7 +150,7 @@ def update_system_package_lists(silent: bool, rls_info_change=False) -> None:
return return
if not silent: if not silent:
Logger.print_info("Updating package list...") Logger.print_status("Updating package list...")
try: try:
command = ["sudo", "apt-get", "update"] command = ["sudo", "apt-get", "update"]
@@ -193,11 +193,8 @@ def create_directory(_dir: Path) -> None:
""" """
try: try:
if not os.path.isdir(_dir): if not os.path.isdir(_dir):
Logger.print_info(f"Create directory: {_dir}")
os.makedirs(_dir, exist_ok=True) os.makedirs(_dir, exist_ok=True)
Logger.print_ok("Directory created!") Logger.print_ok(f"Created directory: {_dir}")
else:
Logger.print_info(f"Directory already exists: {_dir}\nSkip creation ...")
except OSError as e: except OSError as e:
Logger.print_error(f"Error creating folder: {e}") Logger.print_error(f"Error creating folder: {e}")
raise raise