configfile: Split configfile code into three separate classes

Separate out the low-level parsing code to a new ConfigFileReader()
class.

Separate out the auto-save handling code to a new ConfigAutoSave()
class.

This simplifies the main PrinterConfig() class.

Signed-off-by: Kevin O'Connor <kevin@koconnor.net>
This commit is contained in:
Kevin O'Connor
2024-09-25 23:34:59 -04:00
parent faa89be816
commit 9adb313ee8

View File

@@ -1,12 +1,17 @@
# Code for reading and writing the Klipper config file # Code for reading and writing the Klipper config file
# #
# Copyright (C) 2016-2021 Kevin O'Connor <kevin@koconnor.net> # Copyright (C) 2016-2024 Kevin O'Connor <kevin@koconnor.net>
# #
# This file may be distributed under the terms of the GNU GPLv3 license. # This file may be distributed under the terms of the GNU GPLv3 license.
import sys, os, glob, re, time, logging, configparser, io import sys, os, glob, re, time, logging, configparser, io
error = configparser.Error error = configparser.Error
######################################################################
# Config section parsing helper
######################################################################
class sentinel: class sentinel:
pass pass
@@ -134,30 +139,13 @@ class ConfigWrapper:
pconfig = self.printer.lookup_object("configfile") pconfig = self.printer.lookup_object("configfile")
pconfig.deprecate(self.section, option, value, msg) pconfig.deprecate(self.section, option, value, msg)
AUTOSAVE_HEADER = """
#*# <---------------------- SAVE_CONFIG ---------------------->
#*# DO NOT EDIT THIS BLOCK OR BELOW. The contents are auto-generated.
#*#
"""
class PrinterConfig: ######################################################################
def __init__(self, printer): # Config file parsing (with include file support)
self.printer = printer ######################################################################
self.autosave = None
self.deprecated = {} class ConfigFileReader:
self.runtime_warnings = [] def read_config_file(self, filename):
self.deprecate_warnings = []
self.status_raw_config = {}
self.status_save_pending = {}
self.status_settings = {}
self.status_warnings = []
self.save_config_pending = False
gcode = self.printer.lookup_object('gcode')
gcode.register_command("SAVE_CONFIG", self.cmd_SAVE_CONFIG,
desc=self.cmd_SAVE_CONFIG_help)
def get_printer(self):
return self.printer
def _read_config_file(self, filename):
try: try:
f = open(filename, 'r') f = open(filename, 'r')
data = f.read() data = f.read()
@@ -167,53 +155,6 @@ class PrinterConfig:
logging.exception(msg) logging.exception(msg)
raise error(msg) raise error(msg)
return data.replace('\r\n', '\n') return data.replace('\r\n', '\n')
def _find_autosave_data(self, data):
regular_data = data
autosave_data = ""
pos = data.find(AUTOSAVE_HEADER)
if pos >= 0:
regular_data = data[:pos]
autosave_data = data[pos + len(AUTOSAVE_HEADER):].strip()
# Check for errors and strip line prefixes
if "\n#*# " in regular_data:
logging.warning("Can't read autosave from config file"
" - autosave state corrupted")
return data, ""
out = [""]
for line in autosave_data.split('\n'):
if ((not line.startswith("#*#")
or (len(line) >= 4 and not line.startswith("#*# ")))
and autosave_data):
logging.warning("Can't read autosave from config file"
" - modifications after header")
return data, ""
out.append(line[4:])
out.append("")
return regular_data, "\n".join(out)
comment_r = re.compile('[#;].*$')
value_r = re.compile('[^A-Za-z0-9_].*$')
def _strip_duplicates(self, data, config):
# Comment out fields in 'data' that are defined in 'config'
lines = data.split('\n')
section = None
is_dup_field = False
for lineno, line in enumerate(lines):
pruned_line = self.comment_r.sub('', line).rstrip()
if not pruned_line:
continue
if pruned_line[0].isspace():
if is_dup_field:
lines[lineno] = '#' + lines[lineno]
continue
is_dup_field = False
if pruned_line[0] == '[':
section = pruned_line[1:-1].strip()
continue
field = self.value_r.sub('', pruned_line)
if config.fileconfig.has_option(section, field):
is_dup_field = True
lines[lineno] = '#' + lines[lineno]
return "\n".join(lines)
def _parse_config_buffer(self, buffer, filename, fileconfig): def _parse_config_buffer(self, buffer, filename, fileconfig):
if not buffer: if not buffer:
return return
@@ -235,7 +176,7 @@ class PrinterConfig:
raise error("Include file '%s' does not exist" % (include_glob,)) raise error("Include file '%s' does not exist" % (include_glob,))
include_filenames.sort() include_filenames.sort()
for include_filename in include_filenames: for include_filename in include_filenames:
include_data = self._read_config_file(include_filename) include_data = self.read_config_file(include_filename)
self._parse_config(include_data, include_filename, fileconfig, self._parse_config(include_data, include_filename, fileconfig,
visited) visited)
return include_filenames return include_filenames
@@ -265,30 +206,209 @@ class PrinterConfig:
buffer.append(line) buffer.append(line)
self._parse_config_buffer(buffer, filename, fileconfig) self._parse_config_buffer(buffer, filename, fileconfig)
visited.remove(path) visited.remove(path)
def _build_config_wrapper(self, data, filename): def build_fileconfig(self, data, filename):
if sys.version_info.major >= 3: if sys.version_info.major >= 3:
fileconfig = configparser.RawConfigParser( fileconfig = configparser.RawConfigParser(
strict=False, inline_comment_prefixes=(';', '#')) strict=False, inline_comment_prefixes=(';', '#'))
else: else:
fileconfig = configparser.RawConfigParser() fileconfig = configparser.RawConfigParser()
self._parse_config(data, filename, fileconfig, set()) self._parse_config(data, filename, fileconfig, set())
return ConfigWrapper(self.printer, fileconfig, {}, 'printer') return fileconfig
def _build_config_string(self, config): def build_config_string(self, fileconfig):
sfile = io.StringIO() sfile = io.StringIO()
config.fileconfig.write(sfile) fileconfig.write(sfile)
return sfile.getvalue().strip() return sfile.getvalue().strip()
def read_config(self, filename):
return self._build_config_wrapper(self._read_config_file(filename),
filename) ######################################################################
def read_main_config(self): # Config auto save helper
######################################################################
AUTOSAVE_HEADER = """
#*# <---------------------- SAVE_CONFIG ---------------------->
#*# DO NOT EDIT THIS BLOCK OR BELOW. The contents are auto-generated.
#*#
"""
class ConfigAutoSave:
def __init__(self, printer):
self.printer = printer
self.fileconfig = None
self.status_save_pending = {}
self.save_config_pending = False
gcode = self.printer.lookup_object('gcode')
gcode.register_command("SAVE_CONFIG", self.cmd_SAVE_CONFIG,
desc=self.cmd_SAVE_CONFIG_help)
def _find_autosave_data(self, data):
regular_data = data
autosave_data = ""
pos = data.find(AUTOSAVE_HEADER)
if pos >= 0:
regular_data = data[:pos]
autosave_data = data[pos + len(AUTOSAVE_HEADER):].strip()
# Check for errors and strip line prefixes
if "\n#*# " in regular_data:
logging.warning("Can't read autosave from config file"
" - autosave state corrupted")
return data, ""
out = [""]
for line in autosave_data.split('\n'):
if ((not line.startswith("#*#")
or (len(line) >= 4 and not line.startswith("#*# ")))
and autosave_data):
logging.warning("Can't read autosave from config file"
" - modifications after header")
return data, ""
out.append(line[4:])
out.append("")
return regular_data, "\n".join(out)
comment_r = re.compile('[#;].*$')
value_r = re.compile('[^A-Za-z0-9_].*$')
def _strip_duplicates(self, data, fileconfig):
# Comment out fields in 'data' that are defined in 'config'
lines = data.split('\n')
section = None
is_dup_field = False
for lineno, line in enumerate(lines):
pruned_line = self.comment_r.sub('', line).rstrip()
if not pruned_line:
continue
if pruned_line[0].isspace():
if is_dup_field:
lines[lineno] = '#' + lines[lineno]
continue
is_dup_field = False
if pruned_line[0] == '[':
section = pruned_line[1:-1].strip()
continue
field = self.value_r.sub('', pruned_line)
if fileconfig.has_option(section, field):
is_dup_field = True
lines[lineno] = '#' + lines[lineno]
return "\n".join(lines)
def load_main_config(self):
filename = self.printer.get_start_args()['config_file'] filename = self.printer.get_start_args()['config_file']
data = self._read_config_file(filename) cfgrdr = ConfigFileReader()
data = cfgrdr.read_config_file(filename)
regular_data, autosave_data = self._find_autosave_data(data) regular_data, autosave_data = self._find_autosave_data(data)
regular_config = self._build_config_wrapper(regular_data, filename) regular_fileconfig = cfgrdr.build_fileconfig(regular_data, filename)
autosave_data = self._strip_duplicates(autosave_data, regular_config) autosave_data = self._strip_duplicates(autosave_data,
self.autosave = self._build_config_wrapper(autosave_data, filename) regular_fileconfig)
cfg = self._build_config_wrapper(regular_data + autosave_data, filename) self.fileconfig = cfgrdr.build_fileconfig(autosave_data, filename)
return cfg return cfgrdr.build_fileconfig(regular_data + autosave_data, filename)
def get_status(self, eventtime):
return {'save_config_pending': self.save_config_pending,
'save_config_pending_items': self.status_save_pending}
def set(self, section, option, value):
if not self.fileconfig.has_section(section):
self.fileconfig.add_section(section)
svalue = str(value)
self.fileconfig.set(section, option, svalue)
pending = dict(self.status_save_pending)
if not section in pending or pending[section] is None:
pending[section] = {}
else:
pending[section] = dict(pending[section])
pending[section][option] = svalue
self.status_save_pending = pending
self.save_config_pending = True
logging.info("save_config: set [%s] %s = %s", section, option, svalue)
def remove_section(self, section):
if self.fileconfig.has_section(section):
self.fileconfig.remove_section(section)
pending = dict(self.status_save_pending)
pending[section] = None
self.status_save_pending = pending
self.save_config_pending = True
elif (section in self.status_save_pending and
self.status_save_pending[section] is not None):
pending = dict(self.status_save_pending)
del pending[section]
self.status_save_pending = pending
self.save_config_pending = True
def _disallow_include_conflicts(self, regular_data, cfgname, gcode):
cfgrdr = ConfigFileReader()
regular_fileconfig = cfgrdr.build_fileconfig(regular_data, cfgname)
for section in self.fileconfig.sections():
for option in self.fileconfig.options(section):
if regular_fileconfig.has_option(section, option):
msg = ("SAVE_CONFIG section '%s' option '%s' conflicts "
"with included value" % (section, option))
raise gcode.error(msg)
cmd_SAVE_CONFIG_help = "Overwrite config file and restart"
def cmd_SAVE_CONFIG(self, gcmd):
if not self.fileconfig.sections():
return
gcode = self.printer.lookup_object('gcode')
# Create string containing autosave data
cfgrdr = ConfigFileReader()
autosave_data = cfgrdr.build_config_string(self.fileconfig)
lines = [('#*# ' + l).strip()
for l in autosave_data.split('\n')]
lines.insert(0, "\n" + AUTOSAVE_HEADER.rstrip())
lines.append("")
autosave_data = '\n'.join(lines)
# Read in and validate current config file
cfgname = self.printer.get_start_args()['config_file']
try:
data = cfgrdr.read_config_file(cfgname)
regular_data, old_autosave_data = self._find_autosave_data(data)
regular_fileconfig = cfgrdr.build_fileconfig(regular_data, cfgname)
except error as e:
msg = "Unable to parse existing config on SAVE_CONFIG"
logging.exception(msg)
raise gcode.error(msg)
regular_data = self._strip_duplicates(regular_data, self.fileconfig)
self._disallow_include_conflicts(regular_data, cfgname, gcode)
data = regular_data.rstrip() + autosave_data
# Determine filenames
datestr = time.strftime("-%Y%m%d_%H%M%S")
backup_name = cfgname + datestr
temp_name = cfgname + "_autosave"
if cfgname.endswith(".cfg"):
backup_name = cfgname[:-4] + datestr + ".cfg"
temp_name = cfgname[:-4] + "_autosave.cfg"
# Create new config file with temporary name and swap with main config
logging.info("SAVE_CONFIG to '%s' (backup in '%s')",
cfgname, backup_name)
try:
f = open(temp_name, 'w')
f.write(data)
f.close()
os.rename(cfgname, backup_name)
os.rename(temp_name, cfgname)
except:
msg = "Unable to write config file during SAVE_CONFIG"
logging.exception(msg)
raise gcode.error(msg)
# Request a restart
gcode.request_restart('restart')
######################################################################
# Main printer config tracking
######################################################################
class PrinterConfig:
def __init__(self, printer):
self.printer = printer
self.autosave = ConfigAutoSave(printer)
self.deprecated = {}
self.runtime_warnings = []
self.deprecate_warnings = []
self.status_raw_config = {}
self.status_settings = {}
self.status_warnings = []
def get_printer(self):
return self.printer
def read_config(self, filename):
cfgrdr = ConfigFileReader()
data = cfgrdr.read_config_file(filename)
fileconfig = cfgrdr.build_fileconfig(data, filename)
return ConfigWrapper(self.printer, fileconfig, {}, 'printer')
def read_main_config(self):
fileconfig = self.autosave.load_main_config()
return ConfigWrapper(self.printer, fileconfig, {}, 'printer')
def check_unused_options(self, config): def check_unused_options(self, config):
fileconfig = config.fileconfig fileconfig = config.fileconfig
objects = dict(self.printer.lookup_objects()) objects = dict(self.printer.lookup_objects())
@@ -312,8 +432,9 @@ class PrinterConfig:
# Setup get_status() # Setup get_status()
self._build_status(config) self._build_status(config)
def log_config(self, config): def log_config(self, config):
cfgrdr = ConfigFileReader()
lines = ["===== Config file =====", lines = ["===== Config file =====",
self._build_config_string(config), cfgrdr.build_config_string(config.fileconfig),
"======================="] "======================="]
self.printer.set_rollover_info("config", "\n".join(lines)) self.printer.set_rollover_info("config", "\n".join(lines))
# Status reporting # Status reporting
@@ -345,91 +466,13 @@ class PrinterConfig:
self.deprecate_warnings.append(res) self.deprecate_warnings.append(res)
self.status_warnings = self.runtime_warnings + self.deprecate_warnings self.status_warnings = self.runtime_warnings + self.deprecate_warnings
def get_status(self, eventtime): def get_status(self, eventtime):
return {'config': self.status_raw_config, status = {'config': self.status_raw_config,
'settings': self.status_settings, 'settings': self.status_settings,
'warnings': self.status_warnings, 'warnings': self.status_warnings}
'save_config_pending': self.save_config_pending, status.update(self.autosave.get_status(eventtime))
'save_config_pending_items': self.status_save_pending} return status
# Autosave functions # Autosave functions
def set(self, section, option, value): def set(self, section, option, value):
if not self.autosave.fileconfig.has_section(section): self.autosave.set(section, option, value)
self.autosave.fileconfig.add_section(section)
svalue = str(value)
self.autosave.fileconfig.set(section, option, svalue)
pending = dict(self.status_save_pending)
if not section in pending or pending[section] is None:
pending[section] = {}
else:
pending[section] = dict(pending[section])
pending[section][option] = svalue
self.status_save_pending = pending
self.save_config_pending = True
logging.info("save_config: set [%s] %s = %s", section, option, svalue)
def remove_section(self, section): def remove_section(self, section):
if self.autosave.fileconfig.has_section(section): self.autosave.remove_section(section)
self.autosave.fileconfig.remove_section(section)
pending = dict(self.status_save_pending)
pending[section] = None
self.status_save_pending = pending
self.save_config_pending = True
elif (section in self.status_save_pending and
self.status_save_pending[section] is not None):
pending = dict(self.status_save_pending)
del pending[section]
self.status_save_pending = pending
self.save_config_pending = True
def _disallow_include_conflicts(self, regular_data, cfgname, gcode):
config = self._build_config_wrapper(regular_data, cfgname)
for section in self.autosave.fileconfig.sections():
for option in self.autosave.fileconfig.options(section):
if config.fileconfig.has_option(section, option):
msg = ("SAVE_CONFIG section '%s' option '%s' conflicts "
"with included value" % (section, option))
raise gcode.error(msg)
cmd_SAVE_CONFIG_help = "Overwrite config file and restart"
def cmd_SAVE_CONFIG(self, gcmd):
if not self.autosave.fileconfig.sections():
return
gcode = self.printer.lookup_object('gcode')
# Create string containing autosave data
autosave_data = self._build_config_string(self.autosave)
lines = [('#*# ' + l).strip()
for l in autosave_data.split('\n')]
lines.insert(0, "\n" + AUTOSAVE_HEADER.rstrip())
lines.append("")
autosave_data = '\n'.join(lines)
# Read in and validate current config file
cfgname = self.printer.get_start_args()['config_file']
try:
data = self._read_config_file(cfgname)
regular_data, old_autosave_data = self._find_autosave_data(data)
config = self._build_config_wrapper(regular_data, cfgname)
except error as e:
msg = "Unable to parse existing config on SAVE_CONFIG"
logging.exception(msg)
raise gcode.error(msg)
regular_data = self._strip_duplicates(regular_data, self.autosave)
self._disallow_include_conflicts(regular_data, cfgname, gcode)
data = regular_data.rstrip() + autosave_data
# Determine filenames
datestr = time.strftime("-%Y%m%d_%H%M%S")
backup_name = cfgname + datestr
temp_name = cfgname + "_autosave"
if cfgname.endswith(".cfg"):
backup_name = cfgname[:-4] + datestr + ".cfg"
temp_name = cfgname[:-4] + "_autosave.cfg"
# Create new config file with temporary name and swap with main config
logging.info("SAVE_CONFIG to '%s' (backup in '%s')",
cfgname, backup_name)
try:
f = open(temp_name, 'w')
f.write(data)
f.close()
os.rename(cfgname, backup_name)
os.rename(temp_name, cfgname)
except:
msg = "Unable to write config file during SAVE_CONFIG"
logging.exception(msg)
raise gcode.error(msg)
# Request a restart
gcode.request_restart('restart')