refactor(KIAUH): use pathlib instead of os where possible. consistent use of pathlib.

Signed-off-by: Dominik Willner <th33xitus@gmail.com>
This commit is contained in:
dw-0
2023-12-25 00:59:00 +01:00
parent da533fdd67
commit 8aeb01aca0
23 changed files with 207 additions and 209 deletions

View File

@@ -9,9 +9,8 @@
# This file may be distributed under the terms of the GNU GPLv3 license # # This file may be distributed under the terms of the GNU GPLv3 license #
# ======================================================================= # # ======================================================================= #
from os.path import join, dirname, abspath
from pathlib import Path from pathlib import Path
APPLICATION_ROOT = dirname(dirname(abspath(__file__))) APPLICATION_ROOT = Path(__file__).resolve().parent.parent
KIAUH_CFG = join(APPLICATION_ROOT, "kiauh.cfg") KIAUH_CFG = APPLICATION_ROOT.joinpath("kiauh.cfg")
KIAUH_BACKUP_DIR = f"{Path.home()}/kiauh-backups" KIAUH_BACKUP_DIR = Path.home().joinpath("kiauh-backups")

View File

@@ -47,7 +47,7 @@ class BackupManager:
filename = custom_filename if custom_filename is not None else filename filename = custom_filename if custom_filename is not None else filename
try: try:
Path(target).mkdir(exist_ok=True) Path(target).mkdir(exist_ok=True)
shutil.copyfile(file, Path(target, filename)) shutil.copyfile(file, target.joinpath(filename))
except OSError as e: except OSError as e:
Logger.print_error(f"Unable to backup '{file}':\n{e}") Logger.print_error(f"Unable to backup '{file}':\n{e}")
continue continue
@@ -64,7 +64,7 @@ class BackupManager:
Logger.print_status(log) Logger.print_status(log)
date = get_current_date().get("date") date = get_current_date().get("date")
time = get_current_date().get("time") time = get_current_date().get("time")
shutil.copytree(source, Path(target, f"{name}-{date}-{time}")) shutil.copytree(source, target.joinpath(f"{name}-{date}-{time}"))
except OSError as e: except OSError as e:
Logger.print_error(f"Unable to backup directory '{source}':\n{e}") Logger.print_error(f"Unable to backup directory '{source}':\n{e}")
return return

View File

@@ -18,11 +18,11 @@ from kiauh.utils.logger import Logger
# noinspection PyMethodMayBeStatic # noinspection PyMethodMayBeStatic
class ConfigManager: class ConfigManager:
def __init__(self, cfg_file: str): def __init__(self, cfg_file: Path):
self.config_file = cfg_file self.config_file = cfg_file
self.config = CustomConfigParser() self.config = CustomConfigParser()
if Path(cfg_file).is_file(): if cfg_file.is_file():
self.read_config() self.read_config()
def read_config(self) -> None: def read_config(self) -> None:

View File

@@ -14,7 +14,6 @@ from pathlib import Path
from typing import List, Union, Optional, Type, TypeVar from typing import List, Union, Optional, Type, TypeVar
from kiauh.utils.constants import SYSTEMD, CURRENT_USER from kiauh.utils.constants import SYSTEMD, CURRENT_USER
from kiauh.utils.filesystem_utils import create_directory
B = TypeVar(name="B", bound="BaseInstance", covariant=True) B = TypeVar(name="B", bound="BaseInstance", covariant=True)
@@ -32,13 +31,13 @@ class BaseInstance(ABC):
self._instance_type = instance_type self._instance_type = instance_type
self._suffix = suffix self._suffix = suffix
self._user = CURRENT_USER self._user = CURRENT_USER
self._data_dir_name = self.get_data_dir_from_suffix() self._data_dir_name = self.get_data_dir_name_from_suffix()
self._data_dir = f"{Path.home()}/{self._data_dir_name}_data" self._data_dir = Path.home().joinpath(f"{self._data_dir_name}_data")
self._cfg_dir = f"{self.data_dir}/config" self._cfg_dir = self.data_dir.joinpath("config")
self._log_dir = f"{self.data_dir}/logs" self._log_dir = self.data_dir.joinpath("logs")
self._comms_dir = f"{self.data_dir}/comms" self._comms_dir = self.data_dir.joinpath("comms")
self._sysd_dir = f"{self.data_dir}/systemd" self._sysd_dir = self.data_dir.joinpath("systemd")
self._gcodes_dir = f"{self.data_dir}/gcodes" self._gcodes_dir = self.data_dir.joinpath("gcodes")
@property @property
def instance_type(self) -> Type["BaseInstance"]: def instance_type(self) -> Type["BaseInstance"]:
@@ -73,51 +72,51 @@ class BaseInstance(ABC):
self._data_dir_name = value self._data_dir_name = value
@property @property
def data_dir(self): def data_dir(self) -> Path:
return self._data_dir return self._data_dir
@data_dir.setter @data_dir.setter
def data_dir(self, value: str): def data_dir(self, value: str) -> None:
self._data_dir = value self._data_dir = value
@property @property
def cfg_dir(self): def cfg_dir(self) -> Path:
return self._cfg_dir return self._cfg_dir
@cfg_dir.setter @cfg_dir.setter
def cfg_dir(self, value: str): def cfg_dir(self, value: str) -> None:
self._cfg_dir = value self._cfg_dir = value
@property @property
def log_dir(self): def log_dir(self) -> Path:
return self._log_dir return self._log_dir
@log_dir.setter @log_dir.setter
def log_dir(self, value: str): def log_dir(self, value: str) -> None:
self._log_dir = value self._log_dir = value
@property @property
def comms_dir(self): def comms_dir(self) -> Path:
return self._comms_dir return self._comms_dir
@comms_dir.setter @comms_dir.setter
def comms_dir(self, value: str): def comms_dir(self, value: str) -> None:
self._comms_dir = value self._comms_dir = value
@property @property
def sysd_dir(self): def sysd_dir(self) -> Path:
return self._sysd_dir return self._sysd_dir
@sysd_dir.setter @sysd_dir.setter
def sysd_dir(self, value: str): def sysd_dir(self, value: str) -> None:
self._sysd_dir = value self._sysd_dir = value
@property @property
def gcodes_dir(self): def gcodes_dir(self) -> Path:
return self._gcodes_dir return self._gcodes_dir
@gcodes_dir.setter @gcodes_dir.setter
def gcodes_dir(self, value: str): def gcodes_dir(self, value: str) -> None:
self._gcodes_dir = value self._gcodes_dir = value
@abstractmethod @abstractmethod
@@ -128,7 +127,7 @@ class BaseInstance(ABC):
def delete(self, del_remnants: bool) -> None: def delete(self, del_remnants: bool) -> None:
raise NotImplementedError("Subclasses must implement the delete method") raise NotImplementedError("Subclasses must implement the delete method")
def create_folders(self, add_dirs: List[str] = None) -> None: def create_folders(self, add_dirs: List[Path] = None) -> None:
dirs = [ dirs = [
self.data_dir, self.data_dir,
self.cfg_dir, self.cfg_dir,
@@ -141,7 +140,7 @@ class BaseInstance(ABC):
dirs.extend(add_dirs) dirs.extend(add_dirs)
for _dir in dirs: for _dir in dirs:
create_directory(Path(_dir)) _dir.mkdir(exist_ok=True)
def get_service_file_name(self, extension: bool = False) -> str: def get_service_file_name(self, extension: bool = False) -> str:
name = f"{self.__class__.__name__.lower()}" name = f"{self.__class__.__name__.lower()}"
@@ -150,10 +149,10 @@ class BaseInstance(ABC):
return name if not extension else f"{name}.service" return name if not extension else f"{name}.service"
def get_service_file_path(self) -> str: def get_service_file_path(self) -> Path:
return f"{SYSTEMD}/{self.get_service_file_name(extension=True)}" return SYSTEMD.joinpath(self.get_service_file_name(extension=True))
def get_data_dir_from_suffix(self) -> str: def get_data_dir_name_from_suffix(self) -> str:
if self._suffix is None: if self._suffix is None:
return "printer" return "printer"
elif self._suffix.isdigit(): elif self._suffix.isdigit():

View File

@@ -9,9 +9,9 @@
# This file may be distributed under the terms of the GNU GPLv3 license # # This file may be distributed under the terms of the GNU GPLv3 license #
# ======================================================================= # # ======================================================================= #
import os
import re import re
import subprocess import subprocess
from pathlib import Path
from typing import List, Optional, Union, TypeVar from typing import List, Optional, Union, TypeVar
from kiauh.core.instance_manager.base_instance import BaseInstance from kiauh.core.instance_manager.base_instance import BaseInstance
@@ -194,9 +194,10 @@ class InstanceManager:
excluded = self.instance_type.blacklist() excluded = self.instance_type.blacklist()
service_list = [ service_list = [
os.path.join(SYSTEMD, service) Path(SYSTEMD, service)
for service in os.listdir(SYSTEMD) for service in SYSTEMD.iterdir()
if pattern.search(service) and not any(s in service for s in excluded) if pattern.search(service.name)
and not any(s in service.name for s in excluded)
] ]
instance_list = [ instance_list = [
@@ -206,8 +207,8 @@ class InstanceManager:
return instance_list return instance_list
def _get_instance_suffix(self, file_path: str) -> Union[str, None]: def _get_instance_suffix(self, file_path: Path) -> Union[str, None]:
full_name = file_path.split("/")[-1].split(".")[0] full_name = file_path.name.split(".")[0]
return full_name.split("-")[-1] if "-" in full_name else None return full_name.split("-")[-1] if "-" in full_name else None

View File

@@ -9,9 +9,9 @@
# This file may be distributed under the terms of the GNU GPLv3 license # # This file may be distributed under the terms of the GNU GPLv3 license #
# ======================================================================= # # ======================================================================= #
import os
import shutil import shutil
import subprocess import subprocess
from pathlib import Path
from kiauh.utils.input_utils import get_confirm from kiauh.utils.input_utils import get_confirm
from kiauh.utils.logger import Logger from kiauh.utils.logger import Logger
@@ -66,7 +66,7 @@ class RepoManager:
log = f"Cloning repository from '{self.repo}' with method '{self.method}'" log = f"Cloning repository from '{self.repo}' with method '{self.method}'"
Logger.print_status(log) Logger.print_status(log)
try: try:
if os.path.exists(self.target_dir): if Path(self.target_dir).exists():
question = f"'{self.target_dir}' already exists. Overwrite?" question = f"'{self.target_dir}' already exists. Overwrite?"
if not get_confirm(question, default_choice=False): if not get_confirm(question, default_choice=False):
Logger.print_info("Skipping re-clone of repository.") Logger.print_info("Skipping re-clone of repository.")

View File

@@ -9,14 +9,13 @@
# This file may be distributed under the terms of the GNU GPLv3 license # # This file may be distributed under the terms of the GNU GPLv3 license #
# ======================================================================= # # ======================================================================= #
import os
from pathlib import Path from pathlib import Path
MODULE_PATH = os.path.dirname(os.path.abspath(__file__)) MODULE_PATH = Path(__file__).resolve().parent
KLIPPER_DIR = f"{Path.home()}/klipper" KLIPPER_DIR = Path.home().joinpath("klipper")
KLIPPER_ENV_DIR = f"{Path.home()}/klippy-env" KLIPPER_ENV_DIR = Path.home().joinpath("klippy-env")
KLIPPER_REQUIREMENTS_TXT = f"{KLIPPER_DIR}/scripts/klippy-requirements.txt" KLIPPER_REQUIREMENTS_TXT = KLIPPER_DIR.joinpath("scripts/klippy-requirements.txt")
DEFAULT_KLIPPER_REPO_URL = "https://github.com/Klipper3D/klipper" DEFAULT_KLIPPER_REPO_URL = "https://github.com/Klipper3D/klipper"
EXIT_KLIPPER_SETUP = "Exiting Klipper setup ..." EXIT_KLIPPER_SETUP = "Exiting Klipper setup ..."

View File

@@ -9,14 +9,13 @@
# This file may be distributed under the terms of the GNU GPLv3 license # # This file may be distributed under the terms of the GNU GPLv3 license #
# ======================================================================= # # ======================================================================= #
import os
import shutil import shutil
import subprocess import subprocess
from pathlib import Path from pathlib import Path
from typing import List from typing import List, Union
from kiauh.core.instance_manager.base_instance import BaseInstance from kiauh.core.instance_manager.base_instance import BaseInstance
from kiauh.modules.klipper import KLIPPER_DIR, KLIPPER_ENV_DIR from kiauh.modules.klipper import KLIPPER_DIR, KLIPPER_ENV_DIR, MODULE_PATH
from kiauh.utils.constants import SYSTEMD from kiauh.utils.constants import SYSTEMD
from kiauh.utils.logger import Logger from kiauh.utils.logger import Logger
@@ -29,37 +28,36 @@ class Klipper(BaseInstance):
def __init__(self, suffix: str = None): def __init__(self, suffix: str = None):
super().__init__(instance_type=self, suffix=suffix) super().__init__(instance_type=self, suffix=suffix)
self.klipper_dir = KLIPPER_DIR self.klipper_dir: Path = KLIPPER_DIR
self.env_dir = KLIPPER_ENV_DIR self.env_dir: Path = KLIPPER_ENV_DIR
self._cfg_file = self._get_cfg() self._cfg_file = self._get_cfg()
self._log = f"{self.log_dir}/klippy.log" self._log = self.log_dir.joinpath("klippy.log")
self._serial = f"{self.comms_dir}/klippy.serial" self._serial = self.comms_dir.joinpath("klippy.serial")
self._uds = f"{self.comms_dir}/klippy.sock" self._uds = self.comms_dir.joinpath("klippy.sock")
@property @property
def cfg_file(self) -> str: def cfg_file(self) -> Path:
return self._cfg_file return self._cfg_file
@property @property
def log(self) -> str: def log(self) -> Path:
return self._log return self._log
@property @property
def serial(self) -> str: def serial(self) -> Path:
return self._serial return self._serial
@property @property
def uds(self) -> str: def uds(self) -> Path:
return self._uds return self._uds
def create(self) -> None: def create(self) -> None:
Logger.print_status("Creating new Klipper Instance ...") Logger.print_status("Creating new Klipper Instance ...")
module_path = os.path.dirname(os.path.abspath(__file__)) service_template_path = MODULE_PATH.joinpath("res/klipper.service")
service_template_path = os.path.join(module_path, "res", "klipper.service")
env_template_file_path = os.path.join(module_path, "res", "klipper.env")
service_file_name = self.get_service_file_name(extension=True) service_file_name = self.get_service_file_name(extension=True)
service_file_target = f"{SYSTEMD}/{service_file_name}" service_file_target = SYSTEMD.joinpath(service_file_name)
env_file_target = os.path.abspath(f"{self.sysd_dir}/klipper.env") env_template_file_path = MODULE_PATH.joinpath("res/klipper.env")
env_file_target = self.sysd_dir.joinpath("klipper.env")
try: try:
self.create_folders() self.create_folders()
@@ -95,8 +93,11 @@ class Klipper(BaseInstance):
self._delete_klipper_remnants() self._delete_klipper_remnants()
def write_service_file( def write_service_file(
self, service_template_path: str, service_file_target: str, env_file_target: str self,
): service_template_path: Path,
service_file_target: Path,
env_file_target: Path,
) -> None:
service_content = self._prep_service_file( service_content = self._prep_service_file(
service_template_path, env_file_target service_template_path, env_file_target
) )
@@ -109,7 +110,9 @@ class Klipper(BaseInstance):
) )
Logger.print_ok(f"Service file created: {service_file_target}") Logger.print_ok(f"Service file created: {service_file_target}")
def write_env_file(self, env_template_file_path: str, env_file_target: str): def write_env_file(
self, env_template_file_path: Path, env_file_target: Path
) -> None:
env_file_content = self._prep_env_file(env_template_file_path) env_file_content = self._prep_env_file(env_template_file_path)
with open(env_file_target, "w") as env_file: with open(env_file_target, "w") as env_file:
env_file.write(env_file_content) env_file.write(env_file_content)
@@ -129,7 +132,9 @@ class Klipper(BaseInstance):
Logger.print_ok("Directories successfully deleted.") Logger.print_ok("Directories successfully deleted.")
def _prep_service_file(self, service_template_path, env_file_path): def _prep_service_file(
self, service_template_path: Path, env_file_path: Path
) -> str:
try: try:
with open(service_template_path, "r") as template_file: with open(service_template_path, "r") as template_file:
template_content = template_file.read() template_content = template_file.read()
@@ -139,12 +144,14 @@ class Klipper(BaseInstance):
) )
raise raise
service_content = template_content.replace("%USER%", self.user) service_content = template_content.replace("%USER%", self.user)
service_content = service_content.replace("%KLIPPER_DIR%", self.klipper_dir) service_content = service_content.replace(
service_content = service_content.replace("%ENV%", self.env_dir) "%KLIPPER_DIR%", str(self.klipper_dir)
service_content = service_content.replace("%ENV_FILE%", env_file_path) )
service_content = service_content.replace("%ENV%", str(self.env_dir))
service_content = service_content.replace("%ENV_FILE%", str(env_file_path))
return service_content return service_content
def _prep_env_file(self, env_template_file_path): def _prep_env_file(self, env_template_file_path: Path) -> str:
try: try:
with open(env_template_file_path, "r") as env_file: with open(env_template_file_path, "r") as env_file:
env_template_file_content = env_file.read() env_template_file_content = env_file.read()
@@ -154,18 +161,18 @@ class Klipper(BaseInstance):
) )
raise raise
env_file_content = env_template_file_content.replace( env_file_content = env_template_file_content.replace(
"%KLIPPER_DIR%", self.klipper_dir "%KLIPPER_DIR%", str(self.klipper_dir)
) )
env_file_content = env_file_content.replace( env_file_content = env_file_content.replace(
"%CFG%", f"{self.cfg_dir}/printer.cfg" "%CFG%", f"{self.cfg_dir}/printer.cfg"
) )
env_file_content = env_file_content.replace("%SERIAL%", self._serial) env_file_content = env_file_content.replace("%SERIAL%", str(self.serial))
env_file_content = env_file_content.replace("%LOG%", self._log) env_file_content = env_file_content.replace("%LOG%", str(self.log))
env_file_content = env_file_content.replace("%UDS%", self._uds) env_file_content = env_file_content.replace("%UDS%", str(self.uds))
return env_file_content return env_file_content
def _get_cfg(self): def _get_cfg(self) -> Union[Path, None]:
cfg_file_loc = f"{self.cfg_dir}/printer.cfg" cfg_file_loc = self.cfg_dir.joinpath("printer.cfg")
if Path(cfg_file_loc).is_file(): if cfg_file_loc.is_file():
return cfg_file_loc return cfg_file_loc
return None return None

View File

@@ -9,7 +9,6 @@
# This file may be distributed under the terms of the GNU GPLv3 license # # This file may be distributed under the terms of the GNU GPLv3 license #
# ======================================================================= # # ======================================================================= #
import os
import subprocess import subprocess
from pathlib import Path from pathlib import Path
from typing import List, Union from typing import List, Union
@@ -163,13 +162,13 @@ def setup_klipper_prerequesites() -> None:
def install_klipper_packages(klipper_dir: Path) -> None: def install_klipper_packages(klipper_dir: Path) -> None:
script = Path(f"{klipper_dir}/scripts/install-debian.sh") script = klipper_dir.joinpath("scripts/install-debian.sh")
packages = parse_packages_from_file(script) packages = parse_packages_from_file(script)
packages = [pkg.replace("python-dev", "python3-dev") for pkg in packages] packages = [pkg.replace("python-dev", "python3-dev") for pkg in packages]
# Add dfu-util for octopi-images # Add dfu-util for octopi-images
packages.append("dfu-util") packages.append("dfu-util")
# Add dbus requirement for DietPi distro # Add dbus requirement for DietPi distro
if os.path.exists("/boot/dietpi/.version"): if Path("/boot/dietpi/.version").exists():
packages.append("dbus") packages.append("dbus")
update_system_package_lists(silent=False) update_system_package_lists(silent=False)

View File

@@ -15,6 +15,7 @@ import grp
import shutil import shutil
import subprocess import subprocess
import textwrap import textwrap
from pathlib import Path
from typing import List, Union, Literal, Dict from typing import List, Union, Literal, Dict
@@ -113,7 +114,7 @@ def handle_single_to_multi_conversion(
instance_manager.current_instance = Klipper(suffix=name) instance_manager.current_instance = Klipper(suffix=name)
new_data_dir_name = instance_manager.current_instance.data_dir new_data_dir_name = instance_manager.current_instance.data_dir
try: try:
os.rename(old_data_dir_name, new_data_dir_name) Path(old_data_dir_name).rename(new_data_dir_name)
return instance_manager.current_instance return instance_manager.current_instance
except OSError as e: except OSError as e:
log = f"Cannot rename {old_data_dir_name} to {new_data_dir_name}:\n{e}" log = f"Cannot rename {old_data_dir_name} to {new_data_dir_name}:\n{e}"
@@ -208,8 +209,8 @@ def create_example_printer_cfg(instance: Klipper) -> None:
Logger.print_info(f"printer.cfg in '{instance.cfg_dir}' already exists.") Logger.print_info(f"printer.cfg in '{instance.cfg_dir}' already exists.")
return return
source = os.path.join(MODULE_PATH, "res", "printer.cfg") source = MODULE_PATH.joinpath("res/printer.cfg")
target = os.path.join(instance.cfg_dir, "printer.cfg") target = instance.cfg_dir.joinpath("printer.cfg")
try: try:
shutil.copy(source, target) shutil.copy(source, target)
except OSError as e: except OSError as e:
@@ -217,6 +218,6 @@ def create_example_printer_cfg(instance: Klipper) -> None:
return return
cm = ConfigManager(target) cm = ConfigManager(target)
cm.set_value("virtual_sdcard", "path", instance.gcodes_dir) cm.set_value("virtual_sdcard", "path", str(instance.gcodes_dir))
cm.write_config() cm.write_config()
Logger.print_ok(f"Example printer.cfg created in '{instance.cfg_dir}'") Logger.print_ok(f"Example printer.cfg created in '{instance.cfg_dir}'")

View File

@@ -11,12 +11,10 @@
from pathlib import Path from pathlib import Path
import os MODULE_PATH = Path(__file__).resolve().parent
MAINSAIL_DIR = Path(Path.home(), "mainsail")
MODULE_PATH = os.path.dirname(os.path.abspath(__file__)) MAINSAIL_CONFIG_DIR = Path(Path.home(), "mainsail-config")
MAINSAIL_DIR = os.path.join(Path.home(), "mainsail") MAINSAIL_CONFIG_JSON = Path(MAINSAIL_DIR, "config.json")
MAINSAIL_CONFIG_DIR = os.path.join(Path.home(), "mainsail-config")
MAINSAIL_CONFIG_JSON = os.path.join(MAINSAIL_DIR, "config.json")
MAINSAIL_URL = ( MAINSAIL_URL = (
"https://github.com/mainsail-crew/mainsail/releases/latest/download/mainsail.zip" "https://github.com/mainsail-crew/mainsail/releases/latest/download/mainsail.zip"
) )

View File

@@ -13,6 +13,7 @@
import shutil import shutil
import subprocess import subprocess
from pathlib import Path from pathlib import Path
from typing import List
from kiauh.core.config_manager.config_manager import ConfigManager from kiauh.core.config_manager.config_manager import ConfigManager
from kiauh.core.instance_manager.instance_manager import InstanceManager from kiauh.core.instance_manager.instance_manager import InstanceManager
@@ -64,8 +65,8 @@ def remove_mainsail_dir() -> None:
def remove_nginx_config() -> None: def remove_nginx_config() -> None:
Logger.print_status("Removing Mainsails NGINX config ...") Logger.print_status("Removing Mainsails NGINX config ...")
try: try:
remove_file(Path(NGINX_SITES_AVAILABLE, "mainsail"), True) remove_file(NGINX_SITES_AVAILABLE.joinpath("mainsail"), True)
remove_file(Path(NGINX_SITES_ENABLED, "mainsail"), True) remove_file(NGINX_SITES_ENABLED.joinpath("mainsail"), True)
except subprocess.CalledProcessError as e: except subprocess.CalledProcessError as e:
log = f"Unable to remove Mainsail NGINX config:\n{e.stderr.decode()}" log = f"Unable to remove Mainsail NGINX config:\n{e.stderr.decode()}"

View File

@@ -9,7 +9,6 @@
# This file may be distributed under the terms of the GNU GPLv3 license # # This file may be distributed under the terms of the GNU GPLv3 license #
# ======================================================================= # # ======================================================================= #
import os.path
import subprocess import subprocess
from pathlib import Path from pathlib import Path
from typing import List from typing import List
@@ -72,7 +71,7 @@ def run_mainsail_installation() -> None:
else: else:
return return
is_mainsail_installed = Path(f"{Path.home()}/mainsail").exists() is_mainsail_installed = Path.home().joinpath("mainsail").exists()
do_reinstall = False do_reinstall = False
if is_mainsail_installed: if is_mainsail_installed:
print_mainsail_already_installed_dialog() print_mainsail_already_installed_dialog()
@@ -92,9 +91,9 @@ def run_mainsail_installation() -> None:
cm = ConfigManager(cfg_file=KIAUH_CFG) cm = ConfigManager(cfg_file=KIAUH_CFG)
default_port = cm.get_value("mainsail", "default_port") default_port = cm.get_value("mainsail", "default_port")
mainsail_port = default_port if default_port else 80 mainsail_port = default_port if default_port else "80"
if not default_port: if not default_port:
print_mainsail_port_select_dialog(f"{mainsail_port}") print_mainsail_port_select_dialog(mainsail_port)
mainsail_port = get_number_input( mainsail_port = get_number_input(
"Configure Mainsail for port", "Configure Mainsail for port",
min_count=mainsail_port, min_count=mainsail_port,
@@ -148,11 +147,11 @@ def run_mainsail_installation() -> None:
def download_mainsail() -> None: def download_mainsail() -> None:
try: try:
Logger.print_status("Downloading Mainsail ...") Logger.print_status("Downloading Mainsail ...")
download_file(MAINSAIL_URL, f"{Path.home()}", "mainsail.zip") download_file(MAINSAIL_URL, Path.home(), "mainsail.zip")
Logger.print_ok("Download complete!") Logger.print_ok("Download complete!")
Logger.print_status("Extracting mainsail.zip ...") Logger.print_status("Extracting mainsail.zip ...")
unzip(f"{Path.home()}/mainsail.zip", MAINSAIL_DIR) unzip(Path.home().joinpath("mainsail.zip"), MAINSAIL_DIR)
Logger.print_ok("OK!") Logger.print_ok("OK!")
except Exception: except Exception:
@@ -184,8 +183,8 @@ def create_mainsail_cfg_symlink(klipper_instances: List[Klipper]) -> None:
def create_mainsail_nginx_cfg(port: int) -> None: def create_mainsail_nginx_cfg(port: int) -> None:
root_dir = MAINSAIL_DIR root_dir = MAINSAIL_DIR
source = Path(NGINX_SITES_AVAILABLE, "mainsail") source = NGINX_SITES_AVAILABLE.joinpath("mainsail")
target = Path(NGINX_SITES_ENABLED, "mainsail") target = NGINX_SITES_ENABLED.joinpath("mainsail")
try: try:
Logger.print_status("Creating NGINX config for Mainsail ...") Logger.print_status("Creating NGINX config for Mainsail ...")
remove_file(Path("/etc/nginx/sites-enabled/default"), True) remove_file(Path("/etc/nginx/sites-enabled/default"), True)
@@ -217,7 +216,7 @@ def patch_moonraker_conf(
Logger.print_info("Section already exist. Skipped ...") Logger.print_info("Section already exist. Skipped ...")
return return
template = os.path.join(MODULE_PATH, "res", template_file) template = MODULE_PATH.joinpath("res", template_file)
with open(template, "r") as t: with open(template, "r") as t:
template_content = "\n" template_content = "\n"
template_content += t.read() template_content += t.read()

View File

@@ -10,7 +10,6 @@
# ======================================================================= # # ======================================================================= #
import json import json
import os
import shutil import shutil
from pathlib import Path from pathlib import Path
from typing import List from typing import List
@@ -26,9 +25,9 @@ from kiauh.utils.logger import Logger
def get_mainsail_status() -> str: def get_mainsail_status() -> str:
return get_install_status_webui( return get_install_status_webui(
MAINSAIL_DIR, MAINSAIL_DIR,
Path(NGINX_SITES_AVAILABLE, "mainsail"), NGINX_SITES_AVAILABLE.joinpath("mainsail"),
Path(NGINX_CONFD, "upstreams.conf"), NGINX_CONFD.joinpath("upstreams.conf"),
Path(NGINX_CONFD, "common_vars.conf"), NGINX_CONFD.joinpath("common_vars.conf"),
) )
@@ -36,7 +35,7 @@ def backup_config_json(is_temp=False) -> None:
Logger.print_status(f"Backup '{MAINSAIL_CONFIG_JSON}' ...") Logger.print_status(f"Backup '{MAINSAIL_CONFIG_JSON}' ...")
bm = BackupManager() bm = BackupManager()
if is_temp: if is_temp:
fn = Path(Path.home(), "config.json.kiauh.bak") fn = Path.home().joinpath("config.json.kiauh.bak")
bm.backup_file([MAINSAIL_CONFIG_JSON], custom_filename=fn) bm.backup_file([MAINSAIL_CONFIG_JSON], custom_filename=fn)
else: else:
bm.backup_file([MAINSAIL_CONFIG_JSON]) bm.backup_file([MAINSAIL_CONFIG_JSON])
@@ -45,7 +44,7 @@ def backup_config_json(is_temp=False) -> None:
def restore_config_json() -> None: def restore_config_json() -> None:
try: try:
Logger.print_status(f"Restore '{MAINSAIL_CONFIG_JSON}' ...") Logger.print_status(f"Restore '{MAINSAIL_CONFIG_JSON}' ...")
source = os.path.join(Path.home(), "config.json.kiauh.bak") source = Path.home().joinpath("config.json.kiauh.bak")
shutil.copy(source, MAINSAIL_CONFIG_JSON) shutil.copy(source, MAINSAIL_CONFIG_JSON)
except OSError: except OSError:
Logger.print_info("Unable to restore config.json. Skipped ...") Logger.print_info("Unable to restore config.json. Skipped ...")
@@ -71,10 +70,10 @@ def symlink_webui_nginx_log(klipper_instances: List[Klipper]) -> None:
error_log = Path("/var/log/nginx/mainsail-error.log") error_log = Path("/var/log/nginx/mainsail-error.log")
for instance in klipper_instances: for instance in klipper_instances:
desti_access = Path(instance.log_dir).joinpath("mainsail-access.log") desti_access = instance.log_dir.joinpath("mainsail-access.log")
if not desti_access.exists(): if not desti_access.exists():
desti_access.symlink_to(access_log) desti_access.symlink_to(access_log)
desti_error = Path(instance.log_dir).joinpath("mainsail-error.log") desti_error = instance.log_dir.joinpath("mainsail-error.log")
if not desti_error.exists(): if not desti_error.exists():
desti_error.symlink_to(error_log) desti_error.symlink_to(error_log)

View File

@@ -9,23 +9,24 @@
# This file may be distributed under the terms of the GNU GPLv3 license # # This file may be distributed under the terms of the GNU GPLv3 license #
# ======================================================================= # # ======================================================================= #
import os
from pathlib import Path from pathlib import Path
MODULE_PATH = os.path.dirname(os.path.abspath(__file__)) MODULE_PATH = Path(__file__).resolve().parent
MOONRAKER_DIR = f"{Path.home()}/moonraker" MOONRAKER_DIR = Path.home().joinpath("moonraker")
MOONRAKER_ENV_DIR = f"{Path.home()}/moonraker-env" MOONRAKER_ENV_DIR = Path.home().joinpath("moonraker-env")
MOONRAKER_REQUIREMENTS_TXT = f"{MOONRAKER_DIR}/scripts/moonraker-requirements.txt" MOONRAKER_REQUIREMENTS_TXT = MOONRAKER_DIR.joinpath(
"scripts/moonraker-requirements.txt"
)
DEFAULT_MOONRAKER_REPO_URL = "https://github.com/Arksine/moonraker" DEFAULT_MOONRAKER_REPO_URL = "https://github.com/Arksine/moonraker"
DEFAULT_MOONRAKER_PORT = 7125 DEFAULT_MOONRAKER_PORT = 7125
# introduced due to # introduced due to
# https://github.com/Arksine/moonraker/issues/349 # https://github.com/Arksine/moonraker/issues/349
# https://github.com/Arksine/moonraker/pull/346 # https://github.com/Arksine/moonraker/pull/346
POLKIT_LEGACY_FILE = "/etc/polkit-1/localauthority/50-local.d/10-moonraker.pkla" POLKIT_LEGACY_FILE = Path("/etc/polkit-1/localauthority/50-local.d/10-moonraker.pkla")
POLKIT_FILE = "/etc/polkit-1/rules.d/moonraker.rules" POLKIT_FILE = Path("/etc/polkit-1/rules.d/moonraker.rules")
POLKIT_USR_FILE = "/usr/share/polkit-1/rules.d/moonraker.rules" POLKIT_USR_FILE = Path("/usr/share/polkit-1/rules.d/moonraker.rules")
POLKIT_SCRIPT = f"{Path.home()}/moonraker/scripts/set-policykit-rules.sh" POLKIT_SCRIPT = Path.home().joinpath("moonraker/scripts/set-policykit-rules.sh")
EXIT_MOONRAKER_SETUP = "Exiting Moonraker setup ..." EXIT_MOONRAKER_SETUP = "Exiting Moonraker setup ..."

View File

@@ -9,7 +9,6 @@
# This file may be distributed under the terms of the GNU GPLv3 license # # This file may be distributed under the terms of the GNU GPLv3 license #
# ======================================================================= # # ======================================================================= #
import os
import shutil import shutil
import subprocess import subprocess
from pathlib import Path from pathlib import Path
@@ -30,22 +29,22 @@ class Moonraker(BaseInstance):
def __init__(self, suffix: str = None): def __init__(self, suffix: str = None):
super().__init__(instance_type=self, suffix=suffix) super().__init__(instance_type=self, suffix=suffix)
self.moonraker_dir = MOONRAKER_DIR self.moonraker_dir: Path = MOONRAKER_DIR
self.env_dir = MOONRAKER_ENV_DIR self.env_dir: Path = MOONRAKER_ENV_DIR
self.cfg_file = self._get_cfg() self.cfg_file = self._get_cfg()
self.port = self._get_port() self.port = self._get_port()
self.backup_dir = f"{self.data_dir}/backup" self.backup_dir = self.data_dir.joinpath("backup")
self.certs_dir = f"{self.data_dir}/certs" self.certs_dir = self.data_dir.joinpath("certs")
self.db_dir = f"{self.data_dir}/database" self.db_dir = self.data_dir.joinpath("database")
self.log = f"{self.log_dir}/moonraker.log" self.log = self.log_dir.joinpath("moonraker.log")
def create(self, create_example_cfg: bool = False) -> None: def create(self, create_example_cfg: bool = False) -> None:
Logger.print_status("Creating new Moonraker Instance ...") Logger.print_status("Creating new Moonraker Instance ...")
service_template_path = os.path.join(MODULE_PATH, "res", "moonraker.service") service_template_path = MODULE_PATH.joinpath("res/moonraker.service")
env_template_file_path = os.path.join(MODULE_PATH, "res", "moonraker.env") env_template_file_path = MODULE_PATH.joinpath("res/moonraker.env")
service_file_name = self.get_service_file_name(extension=True) service_file_name = self.get_service_file_name(extension=True)
service_file_target = f"{SYSTEMD}/{service_file_name}" service_file_target = SYSTEMD.joinpath(service_file_name)
env_file_target = os.path.abspath(f"{self.sysd_dir}/moonraker.env") env_file_target = self.sysd_dir.joinpath("moonraker.env")
try: try:
self.create_folders([self.backup_dir, self.certs_dir, self.db_dir]) self.create_folders([self.backup_dir, self.certs_dir, self.db_dir])
@@ -81,8 +80,11 @@ class Moonraker(BaseInstance):
self._delete_moonraker_remnants() self._delete_moonraker_remnants()
def write_service_file( def write_service_file(
self, service_template_path: str, service_file_target: str, env_file_target: str self,
): service_template_path: Path,
service_file_target: Path,
env_file_target: Path,
) -> None:
service_content = self._prep_service_file( service_content = self._prep_service_file(
service_template_path, env_file_target service_template_path, env_file_target
) )
@@ -95,7 +97,9 @@ class Moonraker(BaseInstance):
) )
Logger.print_ok(f"Service file created: {service_file_target}") Logger.print_ok(f"Service file created: {service_file_target}")
def write_env_file(self, env_template_file_path: str, env_file_target: str): def write_env_file(
self, env_template_file_path: Path, env_file_target: Path
) -> None:
env_file_content = self._prep_env_file(env_template_file_path) env_file_content = self._prep_env_file(env_template_file_path)
with open(env_file_target, "w") as env_file: with open(env_file_target, "w") as env_file:
env_file.write(env_file_content) env_file.write(env_file_content)
@@ -104,9 +108,9 @@ class Moonraker(BaseInstance):
def _delete_moonraker_remnants(self) -> None: def _delete_moonraker_remnants(self) -> None:
try: try:
Logger.print_status(f"Delete {self.moonraker_dir} ...") Logger.print_status(f"Delete {self.moonraker_dir} ...")
shutil.rmtree(Path(self.moonraker_dir)) shutil.rmtree(self.moonraker_dir)
Logger.print_status(f"Delete {self.env_dir} ...") Logger.print_status(f"Delete {self.env_dir} ...")
shutil.rmtree(Path(self.env_dir)) shutil.rmtree(self.env_dir)
except FileNotFoundError: except FileNotFoundError:
Logger.print_status("Cannot delete Moonraker directories. Not found.") Logger.print_status("Cannot delete Moonraker directories. Not found.")
except PermissionError as e: except PermissionError as e:
@@ -115,7 +119,9 @@ class Moonraker(BaseInstance):
Logger.print_ok("Directories successfully deleted.") Logger.print_ok("Directories successfully deleted.")
def _prep_service_file(self, service_template_path, env_file_path): def _prep_service_file(
self, service_template_path: Path, env_file_path: Path
) -> str:
try: try:
with open(service_template_path, "r") as template_file: with open(service_template_path, "r") as template_file:
template_content = template_file.read() template_content = template_file.read()
@@ -125,12 +131,14 @@ class Moonraker(BaseInstance):
) )
raise raise
service_content = template_content.replace("%USER%", self.user) service_content = template_content.replace("%USER%", self.user)
service_content = service_content.replace("%MOONRAKER_DIR%", self.moonraker_dir) service_content = service_content.replace(
service_content = service_content.replace("%ENV%", self.env_dir) "%MOONRAKER_DIR%", str(self.moonraker_dir)
service_content = service_content.replace("%ENV_FILE%", env_file_path) )
service_content = service_content.replace("%ENV%", str(self.env_dir))
service_content = service_content.replace("%ENV_FILE%", str(env_file_path))
return service_content return service_content
def _prep_env_file(self, env_template_file_path): def _prep_env_file(self, env_template_file_path: Path) -> str:
try: try:
with open(env_template_file_path, "r") as env_file: with open(env_template_file_path, "r") as env_file:
env_template_file_content = env_file.read() env_template_file_content = env_file.read()
@@ -140,14 +148,16 @@ class Moonraker(BaseInstance):
) )
raise raise
env_file_content = env_template_file_content.replace( env_file_content = env_template_file_content.replace(
"%MOONRAKER_DIR%", self.moonraker_dir "%MOONRAKER_DIR%", str(self.moonraker_dir)
)
env_file_content = env_file_content.replace(
"%PRINTER_DATA%", str(self.data_dir)
) )
env_file_content = env_file_content.replace("%PRINTER_DATA%", self.data_dir)
return env_file_content return env_file_content
def _get_cfg(self): def _get_cfg(self) -> Union[Path, None]:
cfg_file_loc = f"{self.cfg_dir}/moonraker.conf" cfg_file_loc = self.cfg_dir.joinpath("moonraker.conf")
if Path(cfg_file_loc).is_file(): if cfg_file_loc.is_file():
return cfg_file_loc return cfg_file_loc
return None return None

View File

@@ -163,14 +163,13 @@ def setup_moonraker_prerequesites() -> None:
repo_manager.clone_repo() repo_manager.clone_repo()
# install moonraker dependencies and create python virtualenv # install moonraker dependencies and create python virtualenv
install_moonraker_packages(Path(MOONRAKER_DIR)) install_moonraker_packages(MOONRAKER_DIR)
create_python_venv(Path(MOONRAKER_ENV_DIR)) create_python_venv(MOONRAKER_ENV_DIR)
moonraker_py_req = Path(MOONRAKER_REQUIREMENTS_TXT) install_python_requirements(MOONRAKER_ENV_DIR, MOONRAKER_REQUIREMENTS_TXT)
install_python_requirements(Path(MOONRAKER_ENV_DIR), moonraker_py_req)
def install_moonraker_packages(moonraker_dir: Path) -> None: def install_moonraker_packages(moonraker_dir: Path) -> None:
script = Path(f"{moonraker_dir}/scripts/install-moonraker.sh") script = moonraker_dir.joinpath("scripts/install-moonraker.sh")
packages = parse_packages_from_file(script) packages = parse_packages_from_file(script)
update_system_package_lists(silent=False) update_system_package_lists(silent=False)
install_system_packages(packages) install_system_packages(packages)
@@ -179,9 +178,9 @@ def install_moonraker_packages(moonraker_dir: Path) -> None:
def install_moonraker_polkit() -> None: def install_moonraker_polkit() -> None:
Logger.print_status("Installing Moonraker policykit rules ...") Logger.print_status("Installing Moonraker policykit rules ...")
legacy_file_exists = check_file_exist(Path(POLKIT_LEGACY_FILE), True) legacy_file_exists = check_file_exist(POLKIT_LEGACY_FILE, True)
polkit_file_exists = check_file_exist(Path(POLKIT_FILE), True) polkit_file_exists = check_file_exist(POLKIT_FILE, True)
usr_file_exists = check_file_exist(Path(POLKIT_USR_FILE), True) usr_file_exists = check_file_exist(POLKIT_USR_FILE, True)
if legacy_file_exists or (polkit_file_exists and usr_file_exists): if legacy_file_exists or (polkit_file_exists and usr_file_exists):
Logger.print_info("Moonraker policykit rules are already installed.") Logger.print_info("Moonraker policykit rules are already installed.")
@@ -267,7 +266,7 @@ def remove_instances(
def remove_polkit_rules() -> None: def remove_polkit_rules() -> None:
Logger.print_status("Removing all Moonraker policykit rules ...") Logger.print_status("Removing all Moonraker policykit rules ...")
if not Path(MOONRAKER_DIR).exists(): if not MOONRAKER_DIR.exists():
log = "Cannot remove policykit rules. Moonraker directory not found." log = "Cannot remove policykit rules. Moonraker directory not found."
Logger.print_warn(log) Logger.print_warn(log)
return return

View File

@@ -9,7 +9,6 @@
# This file may be distributed under the terms of the GNU GPLv3 license # # This file may be distributed under the terms of the GNU GPLv3 license #
# ======================================================================= # # ======================================================================= #
import os
import shutil import shutil
from typing import Dict, Literal from typing import Dict, Literal
@@ -45,8 +44,8 @@ def create_example_moonraker_conf(
Logger.print_info(f"moonraker.conf in '{instance.cfg_dir}' already exists.") Logger.print_info(f"moonraker.conf in '{instance.cfg_dir}' already exists.")
return return
source = os.path.join(MODULE_PATH, "res", "moonraker.conf") source = MODULE_PATH.joinpath("res/moonraker.conf")
target = os.path.join(instance.cfg_dir, "moonraker.conf") target = instance.cfg_dir.joinpath("moonraker.conf")
try: try:
shutil.copy(source, target) shutil.copy(source, target)
except OSError as e: except OSError as e:
@@ -72,14 +71,14 @@ def create_example_moonraker_conf(
ip = get_ipv4_addr().split(".")[:2] ip = get_ipv4_addr().split(".")[:2]
ip.extend(["0", "0/16"]) ip.extend(["0", "0/16"])
uds = f"{instance.comms_dir}/klippy.sock" uds = instance.comms_dir.joinpath("klippy.sock")
cm = ConfigManager(target) cm = ConfigManager(target)
trusted_clients = f"\n{'.'.join(ip)}" trusted_clients = f"\n{'.'.join(ip)}"
trusted_clients += cm.get_value("authorization", "trusted_clients") trusted_clients += cm.get_value("authorization", "trusted_clients")
cm.set_value("server", "port", str(port)) cm.set_value("server", "port", str(port))
cm.set_value("server", "klippy_uds_address", uds) cm.set_value("server", "klippy_uds_address", str(uds))
cm.set_value("authorization", "trusted_clients", trusted_clients) cm.set_value("authorization", "trusted_clients", trusted_clients)
cm.write_config() cm.write_config()

View File

@@ -8,12 +8,13 @@
# # # #
# This file may be distributed under the terms of the GNU GPLv3 license # # This file may be distributed under the terms of the GNU GPLv3 license #
# ======================================================================= # # ======================================================================= #
import os
MODULE_PATH = os.path.dirname(os.path.abspath(__file__)) from pathlib import Path
MODULE_PATH = Path(__file__).resolve().parent
INVALID_CHOICE = "Invalid choice. Please select a valid value." INVALID_CHOICE = "Invalid choice. Please select a valid value."
# ================== NGINX =====================# # ================== NGINX =====================#
NGINX_SITES_AVAILABLE = "/etc/nginx/sites-available" NGINX_SITES_AVAILABLE = Path("/etc/nginx/sites-available")
NGINX_SITES_ENABLED = "/etc/nginx/sites-enabled" NGINX_SITES_ENABLED = Path("/etc/nginx/sites-enabled")
NGINX_CONFD = "/etc/nginx/conf.d" NGINX_CONFD = Path("/etc/nginx/conf.d")

View File

@@ -56,7 +56,7 @@ def check_install_dependencies(deps: List[str]) -> None:
install_system_packages(requirements) install_system_packages(requirements)
def get_repo_name(repo_dir: str) -> str: def get_repo_name(repo_dir: Path) -> str:
""" """
Helper method to extract the organisation and name of a repository | Helper method to extract the organisation and name of a repository |
:param repo_dir: repository to extract the values from :param repo_dir: repository to extract the values from
@@ -72,7 +72,7 @@ def get_repo_name(repo_dir: str) -> str:
def get_install_status_common( def get_install_status_common(
instance_type: Type[BaseInstance], repo_dir: str, env_dir: str instance_type: Type[BaseInstance], repo_dir: Path, env_dir: Path
) -> str: ) -> str:
""" """
Helper method to get the installation status of software components, Helper method to get the installation status of software components,
@@ -85,10 +85,8 @@ def get_install_status_common(
:return: formatted string, containing the status :return: formatted string, containing the status
""" """
im = InstanceManager(instance_type) im = InstanceManager(instance_type)
dir_exist = Path(repo_dir).exists()
env_dir_exist = Path(env_dir).exists()
instances_exist = len(im.instances) > 0 instances_exist = len(im.instances) > 0
status = [dir_exist, env_dir_exist, instances_exist] status = [repo_dir.exists(), env_dir.exists(), instances_exist]
if all(status): if all(status):
return f"{COLOR_GREEN}Installed: {len(im.instances)}{RESET_FORMAT}" return f"{COLOR_GREEN}Installed: {len(im.instances)}{RESET_FORMAT}"
elif not any(status): elif not any(status):
@@ -109,7 +107,7 @@ def get_install_status_webui(
:param common_cfg: the required common_vars.conf :param common_cfg: the required common_vars.conf
:return: formatted string, containing the status :return: formatted string, containing the status
""" """
dir_exist = Path(install_dir).exists() dir_exist = install_dir.exists()
nginx_cfg_exist = check_file_exist(nginx_cfg) nginx_cfg_exist = check_file_exist(nginx_cfg)
upstreams_cfg_exist = check_file_exist(upstreams_cfg) upstreams_cfg_exist = check_file_exist(upstreams_cfg)
common_cfg_exist = check_file_exist(common_cfg) common_cfg_exist = check_file_exist(common_cfg)

View File

@@ -11,6 +11,7 @@
import os import os
import pwd import pwd
from pathlib import Path
# text colors and formats # text colors and formats
COLOR_WHITE = "\033[37m" # white COLOR_WHITE = "\033[37m" # white
@@ -22,4 +23,4 @@ COLOR_CYAN = "\033[96m" # bright cyan
RESET_FORMAT = "\033[0m" # reset format RESET_FORMAT = "\033[0m" # reset format
# current user # current user
CURRENT_USER = pwd.getpwuid(os.getuid())[0] CURRENT_USER = pwd.getpwuid(os.getuid())[0]
SYSTEMD = "/etc/systemd/system" SYSTEMD = Path("/etc/systemd/system")

View File

@@ -8,7 +8,6 @@
# This file may be distributed under the terms of the GNU GPLv3 license # # This file may be distributed under the terms of the GNU GPLv3 license #
# ======================================================================= # # ======================================================================= #
import os
import shutil import shutil
import subprocess import subprocess
from pathlib import Path from pathlib import Path
@@ -37,27 +36,12 @@ def check_file_exist(file_path: Path, sudo=False) -> bool:
except subprocess.CalledProcessError: except subprocess.CalledProcessError:
return False return False
else: else:
if Path(file_path).exists(): if file_path.exists():
return True return True
else: else:
return False return False
def create_directory(_dir: Path) -> None:
"""
Helper function for creating a directory or skipping if it already exists |
:param _dir: the directory to create
:return: None
"""
try:
if not os.path.isdir(_dir):
os.makedirs(_dir, exist_ok=True)
Logger.print_ok(f"Created directory: {_dir}")
except OSError as e:
Logger.print_error(f"Error creating folder: {e}")
raise
def create_symlink(source: Path, target: Path, sudo=False) -> None: def create_symlink(source: Path, target: Path, sudo=False) -> None:
try: try:
cmd = ["ln", "-sf", source, target] cmd = ["ln", "-sf", source, target]
@@ -79,14 +63,14 @@ def remove_file(file_path: Path, sudo=False) -> None:
raise raise
def unzip(file: str, target_dir: str) -> None: def unzip(filepath: Path, target_dir: Path) -> None:
""" """
Helper function to unzip a zip-archive into a target directory | Helper function to unzip a zip-archive into a target directory |
:param file: the zip-file to unzip :param filepath: the path to the zip-file to unzip
:param target_dir: the target directory to extract the files into :param target_dir: the target directory to extract the files into
:return: None :return: None
""" """
with ZipFile(file, "r") as _zip: with ZipFile(filepath, "r") as _zip:
_zip.extractall(target_dir) _zip.extractall(target_dir)
@@ -95,8 +79,8 @@ def copy_upstream_nginx_cfg() -> None:
Creates an upstream.conf in /etc/nginx/conf.d Creates an upstream.conf in /etc/nginx/conf.d
:return: None :return: None
""" """
source = os.path.join(MODULE_PATH, "res", "upstreams.conf") source = MODULE_PATH.joinpath("res/upstreams.conf")
target = os.path.join(NGINX_CONFD, "upstreams.conf") target = NGINX_CONFD.joinpath("upstreams.conf")
try: try:
command = ["sudo", "cp", source, target] command = ["sudo", "cp", source, target]
subprocess.run(command, stderr=subprocess.PIPE, check=True) subprocess.run(command, stderr=subprocess.PIPE, check=True)
@@ -111,8 +95,8 @@ def copy_common_vars_nginx_cfg() -> None:
Creates a common_vars.conf in /etc/nginx/conf.d Creates a common_vars.conf in /etc/nginx/conf.d
:return: None :return: None
""" """
source = os.path.join(MODULE_PATH, "res", "common_vars.conf") source = MODULE_PATH.joinpath("res/common_vars.conf")
target = os.path.join(NGINX_CONFD, "common_vars.conf") target = NGINX_CONFD.joinpath("common_vars.conf")
try: try:
command = ["sudo", "cp", source, target] command = ["sudo", "cp", source, target]
subprocess.run(command, stderr=subprocess.PIPE, check=True) subprocess.run(command, stderr=subprocess.PIPE, check=True)
@@ -122,7 +106,7 @@ def copy_common_vars_nginx_cfg() -> None:
raise raise
def create_nginx_cfg(name: str, port: int, root_dir: str) -> None: def create_nginx_cfg(name: str, port: int, root_dir: Path) -> None:
""" """
Creates an NGINX config from a template file and replaces all placeholders Creates an NGINX config from a template file and replaces all placeholders
:param name: name of the config to create :param name: name of the config to create
@@ -130,18 +114,18 @@ def create_nginx_cfg(name: str, port: int, root_dir: str) -> None:
:param root_dir: directory of the static files :param root_dir: directory of the static files
:return: None :return: None
""" """
tmp = f"{Path.home()}/{name}.tmp" tmp = Path.home().joinpath(f"{name}.tmp")
shutil.copy(os.path.join(MODULE_PATH, "res", "nginx_cfg"), tmp) shutil.copy(MODULE_PATH.joinpath("res/nginx_cfg"), tmp)
with open(tmp, "r+") as f: with open(tmp, "r+") as f:
content = f.read() content = f.read()
content = content.replace("%NAME%", name) content = content.replace("%NAME%", name)
content = content.replace("%PORT%", str(port)) content = content.replace("%PORT%", str(port))
content = content.replace("%ROOT_DIR%", root_dir) content = content.replace("%ROOT_DIR%", str(root_dir))
f.seek(0) f.seek(0)
f.write(content) f.write(content)
f.truncate() f.truncate()
target = os.path.join(NGINX_SITES_AVAILABLE, name) target = NGINX_SITES_AVAILABLE.joinpath(name)
try: try:
command = ["sudo", "mv", tmp, target] command = ["sudo", "mv", tmp, target]
subprocess.run(command, stderr=subprocess.PIPE, check=True) subprocess.run(command, stderr=subprocess.PIPE, check=True)

View File

@@ -98,7 +98,7 @@ def update_python_pip(target: Path) -> None:
""" """
Logger.print_status("Updating pip ...") Logger.print_status("Updating pip ...")
try: try:
command = [f"{target}/bin/pip", "install", "-U", "pip"] command = [target.joinpath("bin/pip"), "install", "-U", "pip"]
result = subprocess.run(command, stderr=subprocess.PIPE, text=True) result = subprocess.run(command, stderr=subprocess.PIPE, text=True)
if result.returncode != 0 or result.stderr: if result.returncode != 0 or result.stderr:
Logger.print_error(f"{result.stderr}", False) Logger.print_error(f"{result.stderr}", False)
@@ -120,7 +120,7 @@ def install_python_requirements(target: Path, requirements: Path) -> None:
update_python_pip(target) update_python_pip(target)
Logger.print_status("Installing Python requirements ...") Logger.print_status("Installing Python requirements ...")
try: try:
command = [f"{target}/bin/pip", "install", "-r", f"{requirements}"] command = [target.joinpath("bin/pip"), "install", "-r", f"{requirements}"]
result = subprocess.run(command, stderr=subprocess.PIPE, text=True) result = subprocess.run(command, stderr=subprocess.PIPE, text=True)
if result.returncode != 0 or result.stderr: if result.returncode != 0 or result.stderr:
Logger.print_error(f"{result.stderr}", False) Logger.print_error(f"{result.stderr}", False)
@@ -141,9 +141,12 @@ def update_system_package_lists(silent: bool, rls_info_change=False) -> None:
:return: None :return: None
""" """
cache_mtime = 0 cache_mtime = 0
cache_files = ["/var/lib/apt/periodic/update-success-stamp", "/var/lib/apt/lists"] cache_files = [
Path("/var/lib/apt/periodic/update-success-stamp"),
Path("/var/lib/apt/lists"),
]
for cache_file in cache_files: for cache_file in cache_files:
if Path(cache_file).exists(): if cache_file.exists():
cache_mtime = max(cache_mtime, os.path.getmtime(cache_file)) cache_mtime = max(cache_mtime, os.path.getmtime(cache_file))
update_age = int(time.time() - cache_mtime) update_age = int(time.time() - cache_mtime)
@@ -244,7 +247,7 @@ def get_ipv4_addr() -> str:
def download_file( def download_file(
url: str, target_folder: str, target_name: str, show_progress=True url: str, target_folder: Path, target_name: str, show_progress=True
) -> None: ) -> None:
""" """
Helper method for downloading files from a provided URL | Helper method for downloading files from a provided URL |
@@ -254,7 +257,7 @@ def download_file(
:param show_progress: show download progress or not :param show_progress: show download progress or not
:return: None :return: None
""" """
target_path = os.path.join(target_folder, target_name) target_path = target_folder.joinpath(target_name)
try: try:
if show_progress: if show_progress:
urllib.request.urlretrieve(url, target_path, download_progress) urllib.request.urlretrieve(url, target_path, download_progress)
@@ -298,8 +301,8 @@ def set_nginx_permissions() -> None:
This seems to have become necessary with Ubuntu 21+. | This seems to have become necessary with Ubuntu 21+. |
:return: None :return: None
""" """
cmd1 = f"ls -ld {Path.home()} | cut -d' ' -f1" cmd = f"ls -ld {Path.home()} | cut -d' ' -f1"
homedir_perm = subprocess.run(cmd1, shell=True, stdout=subprocess.PIPE, text=True) homedir_perm = subprocess.run(cmd, shell=True, stdout=subprocess.PIPE, text=True)
homedir_perm = homedir_perm.stdout homedir_perm = homedir_perm.stdout
if homedir_perm.count("x") < 3: if homedir_perm.count("x") < 3:
@@ -321,7 +324,7 @@ def control_systemd_service(
Logger.print_status(f"{action.capitalize()} {name}.service ...") Logger.print_status(f"{action.capitalize()} {name}.service ...")
command = ["sudo", "systemctl", action, f"{name}.service"] command = ["sudo", "systemctl", action, f"{name}.service"]
subprocess.run(command, stderr=subprocess.PIPE, check=True) subprocess.run(command, stderr=subprocess.PIPE, check=True)
Logger.print_ok(f"OK!") Logger.print_ok("OK!")
except subprocess.CalledProcessError as e: except subprocess.CalledProcessError as e:
log = f"Failed to {action} {name}.service: {e.stderr.decode()}" log = f"Failed to {action} {name}.service: {e.stderr.decode()}"
Logger.print_error(log) Logger.print_error(log)