import os
import re
import shutil
import sys
import tempfile
import time
from itertools import zip_longest
from pathlib import Path

from userconfig.cfgfile import Conf
from userconfig.checks import check_class


class Userconfig:
    """Merge per-category config fragments of a package into destination files.

    The instance is driven by a configuration object ``cfg`` that must offer
    ``get(name)``, ``check(name)`` and a ``debug`` attribute with
    ``stdout(msg, level[, tag])`` / ``stderr(msg, level[, tag])`` methods;
    those are the only members this class uses.
    """

    _cfg = None

    def __init__(self, cfg):
        """Store the configuration object used by all other methods."""
        self._cfg = cfg

    def process_package_dir(self, package_dir):
        """
        Scan package_dir, parse the names of all directories inside according
        to our naming scheme and return them sorted by priority, together with
        the parsed package config file.

        :param package_dir: root of package directory
        :return: tuple (classes, dir_config) where classes is a list of tuples
                 (prio, category, value, path) sorted by prio and dir_config is
                 the object returned by get_config(); (None, None) if the
                 package config file is missing or cannot be read
        """
        self._cfg.debug.stdout(f'process package_dir {package_dir}', 3)
        package_config_file = f'{package_dir}/{self._cfg.get("configfile")}'
        if not os.path.isfile(package_config_file):
            self._cfg.debug.stdout(f'No config file {self._cfg.get("configfile")} in {package_dir}, skipping', 0, 'ERROR')
            return None, None
        dir_config = self.get_config(package_config_file)
        if not dir_config:
            self._cfg.debug.stdout(f'Cannot read config file {package_config_file}, skipping', 0, 'ERROR')
            return None, None

        classes = []
        # The context manager releases the directory handle even if parsing raises.
        with os.scandir(package_dir) as entries:
            for category in entries:
                if category.path == package_config_file:
                    continue
                if not category.is_dir():
                    self._cfg.debug.stdout(f'{category.path} is not a directory, skipping', 3)
                    continue
                self._cfg.debug.stdout(f'process category {category.path}', 3)
                content = category.name.split('_')
                # Format: <prio>_<category>_<value>, the value part is optional
                if len(content) == 3:
                    prio_string, category_name, value = content
                elif len(content) == 2:
                    prio_string, category_name = content
                    value = ''
                else:
                    self._cfg.debug.stdout(f'Format of package directory {category.path} wrong, skipping', 0, 'ERROR')
                    continue
                try:
                    prio = int(prio_string)
                except ValueError:
                    self._cfg.debug.stdout(f'Cannot convert prio to integer ({category.path}, skipping', 0, 'ERROR')
                    continue
                classes.append((prio, category_name, value, category.path))
        classes.sort(key=lambda k: k[0])
        return classes, dir_config

    def filter_categories(self, categories):
        """
        Keep only those category tuples that check_class() accepts.

        Entries with an empty category name are reported as errors and dropped.
        check_class is imported from userconfig.checks; presumably it tests
        whether the category applies to the host running the command
        (NOTE(review): confirm against userconfig.checks).

        :param categories: list of tuples (prio, category, value, path) as
                           returned by process_package_dir
        :return: list of the tuples accepted by check_class, in input order
        """
        ret = []
        for c in categories:
            if not c[1]:
                self._cfg.debug.stdout(f'category not set for {c[3]}, skipping', 0, 'ERROR')
                continue
            if check_class(c):
                ret.append(c)
        return ret

    def process_category_dir(self, category_dir_tuple, file_list):
        """
        Register every entry of a category directory in file_list.

        :param category_dir_tuple: tuple (prio, category, value, path); only
                                   the path is used here
        :param file_list: dict mapping entry name -> list of paths; it is
                          updated in place
        :return: the same file_list object
        """
        _prio, _category, _value, category_dir = category_dir_tuple
        self._cfg.debug.stdout(f'process category {category_dir}', 3)
        with os.scandir(category_dir) as entries:
            for entry in entries:
                file_list.setdefault(entry.name, []).append(entry.path)
        return file_list

    def build_file(self, files, dest_file, comment_string):
        """
        Merge all files in files[] and write the result into a temporary file.

        If comment_string is set, a header line containing the configured
        stamp and a timestamp is written first, and every fragment is preceded
        by a comment line naming its last two path components.

        :param files: list of paths of the fragments to merge, in output order
        :param dest_file: final destination; only its basename is used, as
                          prefix of the temporary file name
        :param comment_string: comment prefix of the target format, or a falsy
                               value to write no comment lines at all
        :return: name of the temporary file, or False if it could not be opened
        """
        content = []
        self._cfg.debug.stdout(f'building file for {dest_file}', 3)
        if comment_string:
            # NOTE(review): "%+" is not among the strftime directives Python
            # documents as portable; its output depends on the platform's C
            # library - confirm it yields a usable timestamp on all targets.
            content.append(f'{comment_string} {self._cfg.get("stamp")} {time.strftime("%+")}\n')
        for file in files:
            self._cfg.debug.stdout(f'Merging {file}', 3)
            with open(file, "r") as fp:
                file_content = fp.read()
            # if comment_string exists, add comment with category dir and filename
            if comment_string:
                content.append(f'{comment_string} {"/".join(file.split("/")[-2:])}')
            content.append(file_content)

        (temp_fd, temp_filename) = tempfile.mkstemp(prefix=os.path.basename(dest_file), dir="/tmp")
        try:
            fp = os.fdopen(temp_fd, "w")
        except Exception as e:
            self._cfg.debug.stderr(f'Cannot write to temporary file {temp_filename}: {e}', 0, 'ERROR')
            os.remove(temp_filename)
            return False
        self._cfg.debug.stdout(f'Writing merged files into temporary file {temp_filename}', 3)
        # Closing via the context manager guarantees the data is flushed and
        # the descriptor released even when a write fails.
        with fp:
            for block in content:
                fp.write(block)
                fp.write("\n")
        return temp_filename

    def diff_and_copy_file(self, temp_filename, dest_filename, comment_string):
        """
        Install temp_filename as dest_filename if their contents differ.

        A differing destination that does not carry the userconfig stamp is
        backed up first. The temporary file is always removed afterwards.

        :param temp_filename: freshly generated file (see build_file)
        :param dest_filename: file to be replaced
        :param comment_string: comment prefix, passed on to diff()
        :return: True if the files differed and a copy was attempted,
                 False if the destination was already up to date
        """
        ret = False
        if self.diff(dest_filename, temp_filename, comment_string):
            if not self.user_config_generated(dest_filename):
                self._cfg.debug.stdout(f'{dest_filename} not generated by userconfig, running back_up.', 3)
                self.backup_file(dest_filename)
            self.copy_file(temp_filename, dest_filename)
            ret = True
        os.remove(temp_filename)
        self._cfg.debug.stdout(f'Removed temporary file {temp_filename}', 3, 'SUCCESS')
        return ret

    def create_destination_directories(self, dest_directory):
        """
        Create dest_directory including missing parents.

        :param dest_directory: directory path to create
        :return: True if the directory was created, False if the path already
                 existed
        """
        path = Path(dest_directory)
        if path.exists():
            return False
        path.mkdir(parents=True)
        return True

    def get_config(self, filename):
        """
        Read filename as config, check for the dest option and return the Conf object.

        The dest value in section Main is normalised to end with "/".

        :param filename: path of the package config file
        :return: Conf object, or False if the file cannot be parsed
                 (Conf raised ValueError) or has no dest option
        """
        try:
            ret = Conf(filename=filename, debug=self._cfg.debug, force_filename=True)
        except ValueError:
            self._cfg.debug.stderr(f'Error reading config file {filename}', 0, 'ERROR')
            return False
        # check for DEST parameter
        if not ret.check(section="Main", option="dest"):
            self._cfg.debug.stderr(f'No dest in config file {filename}', 0, 'ERROR')
            return False
        # make sure DEST ends with /
        dest = ret.get(section="Main", option="dest")
        if not dest.endswith("/"):
            ret.set(section="Main", option="dest", value=dest + "/")
        return ret

    @staticmethod
    def read_skip_comment(fp, comment_string):
        """
        Yield the lines of fp without their line terminator, skipping empty
        and whitespace-only lines and lines starting with comment_string.

        :param fp: iterable of text lines, e.g. an open text file
        :param comment_string: comment prefix; if falsy (None or ""), no line
                               is treated as a comment
        :return: generator of the remaining lines
        """
        for line in fp:
            # Strip only the newline: the last line of a file may lack one, so
            # blindly dropping the final character would corrupt it.
            line = line.rstrip("\n")
            if comment_string and line.startswith(comment_string):
                continue
            if not line.strip():
                continue
            yield line

    def diff(self, dest_file, temp_file, comment_string):
        """
        Compare dest_file and temp_file, ignoring blank and comment lines.

        Exits the process with status 1 if temp_file does not exist.

        :param dest_file: currently installed file
        :param temp_file: freshly generated file
        :param comment_string: comment prefix, see read_skip_comment()
        :return: True if the files differ or dest_file does not exist,
                 False if they are the same
        """
        self._cfg.debug.stdout(f'Diffing {dest_file} and {temp_file}, comment: {comment_string}', 3)
        if not os.path.isfile(dest_file):
            self._cfg.debug.stdout(f'dest_file {dest_file} does not exist.', 3)
            return True
        if not os.path.isfile(temp_file):
            self._cfg.debug.stderr(f'Temporary file {temp_file} does not exist, this should not happen.', 0, 'ERROR')
            sys.exit(1)
        with open(temp_file) as fp1, open(dest_file) as fp2:
            # zip_longest pads the shorter file with None, which never equals
            # a yielded line, so extra trailing lines count as a difference.
            line_pairs = zip_longest(self.read_skip_comment(fp1, comment_string),
                                     self.read_skip_comment(fp2, comment_string))
            for line1, line2 in line_pairs:
                if line1 != line2:
                    self._cfg.debug.stdout(f'{dest_file} differs from generated config', 3)
                    return True
        self._cfg.debug.stdout(f'{dest_file} is the same as generated config', 3)
        return False

    def user_config_generated(self, filename):
        """
        Tell whether filename has been generated by userconfig.

        :param filename: file to inspect
        :return: True if any line of the file contains the configured stamp,
                 False if the file does not exist, no stamp is configured or
                 the stamp is not found
        """
        if not os.path.isfile(filename):
            # filename does not exist, so it was not generated by userconfig
            return False
        if not self._cfg.check("stamp"):
            # no STAMP in userconfig.cfg, so no way to check if file was generated by userconfig
            return False
        stamp = self._cfg.get("stamp")
        with open(filename, "r") as fp:
            return any(stamp in line for line in fp)

    def backup_file(self, filename):
        """
        Rename filename to a backup name that is not in use yet.

        The backup name is filename + ".userconfig." + date, followed by
        ".<counter>" if that name is already taken.

        :param filename: file to back up
        :return: True if the file was renamed, False if it does not exist
        """
        if not os.path.isfile(filename):
            self._cfg.debug.stdout(f'{filename} does not exist, do not need backup.', 3)
            return False
        self._cfg.debug.stdout(f'{filename} exists, finding backup name.', 3)
        # NOTE(review): "%F" is a C-library extension of strftime, not a
        # directive Python documents as portable - confirm on all targets.
        backup_name = filename + ".userconfig." + time.strftime("%F")
        test_backup_name = backup_name
        counter = 0
        while os.path.isfile(test_backup_name):
            counter += 1
            test_backup_name = backup_name + "." + str(counter)
        os.rename(filename, test_backup_name)
        self._cfg.debug.stdout(f'Renamed {filename} to {test_backup_name}', 1, 'SUCCESS')
        return True

    def copy_file(self, sourcefile, dest_file):
        """
        Copy sourcefile to dest_file.

        :param sourcefile: file to copy
        :param dest_file: target path; must either not be an existing file or
                          be writable
        :return: True if the copy was made, False if sourcefile does not exist
                 or dest_file is an existing, non-writable file
        """
        if not os.path.isfile(sourcefile):
            self._cfg.debug.stdout(f'Source file {sourcefile} does not exist.', 0, 'ERROR')
            return False
        self._cfg.debug.stdout(f'Source file {sourcefile} exists, proceeding with copy.', 3)
        if not os.path.isfile(dest_file) or os.access(dest_file, os.W_OK):
            shutil.copy(sourcefile, dest_file)
            return True
        self._cfg.debug.stdout(f'Destination {dest_file} is not a file or not writable.', 0, 'ERROR')
        return False