pyuserconfig/userconfig/__init__.py

257 lines
10 KiB
Python

from userconfig.checks import check_class
from userconfig.cfgfile import Conf
import os
import time
import tempfile
from pathlib import Path
import shutil
import re
import sys
class Userconfig:
_cfg = None
def __init__(self, cfg):
    """
    Remember the global configuration object for use by the other methods.
    :param cfg: configuration object; the methods of this class call its get() and check() methods and log
        through its debug.stdout() / debug.stderr()
    """
    self._cfg = cfg
def process_package_dir(self, package_dir):
    """
    Walk through package_dir, collect all category directories inside and parse their names.

    Every sub directory is expected to be named <prio>_<category> or <prio>_<category>_<value> with an
    integer <prio>. Entries that are not directories, or whose name does not follow this format, are skipped.
    :param package_dir: root of package directory
    :return: tuple (classes, dir_config). classes is a list of tuples (prio, category, value, path) sorted
        by prio (entries with the same prio are ordered by path); dir_config is whatever get_config()
        returned for the package config file. (None, None) if the config file is missing or unreadable.
    """
    debug = self._cfg.debug
    debug.stdout(f'process package_dir {package_dir}', 3)
    config_name = self._cfg.get("configfile")
    package_config_file = f'{package_dir}/{config_name}'
    if not os.path.isfile(package_config_file):
        debug.stdout(f'No config file {config_name} in {package_dir}, skipping', 0, 'ERROR')
        return None, None
    dir_config = self.get_config(package_config_file)
    if not dir_config:
        debug.stdout(f'Cannot read config file {package_config_file}, skipping', 0, 'ERROR')
        return None, None

    classes = []
    for category in os.scandir(package_dir):
        # Compare by name: a path comparison breaks as soon as package_dir carries a trailing slash.
        if category.name == config_name:
            continue
        if not category.is_dir():
            debug.stdout(f'{category.path} is not a directory, skipping', 3)
            continue
        debug.stdout(f'process category {category.path}', 3)
        # Format: <number>_<category>[_<value>]
        content = category.name.split('_')
        if len(content) not in (2, 3):
            debug.stdout(f'Format of package directory {category.path} wrong, skipping', 0, 'ERROR')
            continue
        prio_string, category_name = content[0], content[1]
        value = content[2] if len(content) == 3 else ''
        try:
            prio = int(prio_string)
        except ValueError:
            debug.stdout(f'Cannot convert prio to integer ({category.path}), skipping', 0, 'ERROR')
            continue
        classes.append((prio, category_name, value, category.path))

    # os.scandir() yields entries in arbitrary order; break prio ties by path so the result
    # (and with it the merge order of the generated files) is reproducible.
    classes.sort(key=lambda k: (k[0], k[3]))
    return classes, dir_config
def filter_categories(self, categories):
    """
    Reduce the list produced by process_package_dir to the entries accepted by check_class().

    Entries with an empty category name are reported as an error and dropped without being checked.
    check_class() is imported from userconfig.checks; presumably it decides whether the entry applies to
    the host running the command (not visible from here).
    :param categories: list of tuples (prio, category, value, path) from process_package_dir
    :return: new list with the matching tuples, in their original order
    """
    matching = []
    for entry in categories:
        if entry[1]:
            if check_class(entry):
                matching.append(entry)
        else:
            self._cfg.debug.stdout(f'category not set for {entry[3]}, skipping', 0, 'ERROR')
    return matching
def process_category_dir(self, category_dir_tuple, file_list):
    """
    Register every entry of a category directory in file_list.

    Entries whose name ends in '.swp' are ignored. For all others the entry path is appended to the list
    stored under the entry name, creating that list when the name is seen for the first time.
    :param category_dir_tuple: tuple (prio, category, value, path); only path is used here
    :param file_list: dict mapping file name -> list of paths, updated in place
    :return: the same file_list object
    """
    _prio, _category, _value, category_dir = category_dir_tuple
    log = self._cfg.debug.stdout
    log(f'process category {category_dir}', 3)
    for entry in os.scandir(category_dir):
        if entry.name.endswith('.swp'):
            log(f'Ignoring swap file {entry.name}', 3)
        else:
            file_list.setdefault(entry.name, []).append(entry.path)
    return file_list
def build_file(self, files, dest_file, comment_string):
    """
    Merge all files in files and write the result into a temporary file below /tmp.

    If comment_string is set, the output starts with a header line made of comment_string, the configured
    stamp and the current date, and the content of every merged file is preceded by a comment line naming
    its last two path components (category directory and filename). Every block is followed by a newline.
    :param files: paths of the files to merge, in merge order
    :param dest_file: final destination; only its basename is used, as prefix of the temporary filename
    :param comment_string: string starting a comment line in the generated file; empty for no comments
    :return: temporary filename, or False if the temporary file could not be opened for writing
    """
    content = []
    self._cfg.debug.stdout(f'building file for {dest_file}', 3)
    if comment_string:
        # "%+" is a BSD-only strftime directive; spell out the date(1)-like format with portable ones.
        timestamp = time.strftime("%a %b %d %H:%M:%S %Z %Y")
        content.append(f'{comment_string} {self._cfg.get("stamp")} {timestamp}\n')
    for source in files:
        self._cfg.debug.stdout(f'Merging {source}', 3)
        with open(source, "r") as fp:
            file_content = fp.read()
        # if comment_string exists, add comment with category dir and filename
        if comment_string:
            content.append(f'{comment_string} {"/".join(source.split("/")[-2:])}')
        content.append(file_content)

    (temp_fd, temp_filename) = tempfile.mkstemp(prefix=os.path.basename(dest_file), dir="/tmp")
    try:
        fp = os.fdopen(temp_fd, "w")
    except Exception as e:
        self._cfg.debug.stderr(f'Cannot write to temporary file {temp_filename}: {e}', 0, 'ERROR')
        try:
            # do not leak the descriptor handed out by mkstemp
            os.close(temp_fd)
        except OSError:
            pass  # already closed while the failed open was cleaned up
        os.remove(temp_filename)
        return False
    self._cfg.debug.stdout(f'Writing merged files into temporary file {temp_filename}', 3)
    with fp:
        for block in content:
            fp.write(block)
            fp.write("\n")
    return temp_filename
def diff_and_copy_file(self, temp_filename, dest_filename, comment_string):
    """
    Install temp_filename as dest_filename if diff() reports a difference, then delete temp_filename.

    Before overwriting, a destination that user_config_generated() does not recognise is handed to
    backup_file(). The temporary file is removed in every case.
    :param temp_filename: freshly generated file
    :param dest_filename: file to be replaced
    :param comment_string: comment prefix passed through to diff()
    :return: True if diff() reported a difference (and the copy was attempted), False otherwise
    """
    differs = self.diff(dest_filename, temp_filename, comment_string)
    if differs:
        generated_by_us = self.user_config_generated(dest_filename)
        if not generated_by_us:
            self._cfg.debug.stdout(f'{dest_filename} not generated by userconfig, running back_up.', 3)
            self.backup_file(dest_filename)
        self.copy_file(temp_filename, dest_filename)
    os.remove(temp_filename)
    self._cfg.debug.stdout(f'Removed temporary file {temp_filename}', 3, 'SUCCESS')
    return True if differs else False
def create_destination_directories(self, dest_directory):
    """
    Create dest_directory including missing parent directories.
    :param dest_directory: directory path to create
    :return: True if the directory was created, False if the path already existed
    """
    target = Path(dest_directory)
    if target.exists():
        return False
    target.mkdir(parents=True)
    return True
def get_config(self, filename):
    """
    Read filename as a package config and make sure it carries a usable dest option.

    The dest option in section Main must exist; a trailing "/" is appended to it when missing.
    :param filename: path of the config file
    :return: the Conf object, or False if Conf raises ValueError or the dest option is missing
    """
    try:
        config = Conf(filename=filename, debug=self._cfg.debug, force_filename=True)
    except ValueError:
        self._cfg.debug.stderr(f'Error reading config file {filename}', 0, 'ERROR')
        return False

    # dest is mandatory
    if not config.check(section="Main", option="dest"):
        self._cfg.debug.stderr(f'No dest in config file {filename}', 0, 'ERROR')
        return False

    # normalise dest so that it always ends with /
    dest = config.get(section="Main", option="dest")
    if not dest.endswith("/"):
        config.set(section="Main", option="dest", value=dest + "/")
    return config
@staticmethod
def read_skip_comment(fp, comment_string):
"""Read line from filehandle fp and skip all empty (whitespace) lines and lines starting with comment_string
"""
for line in fp:
line = line[:-1]
if ((comment_string != "" and not re.match("^"+re.escape(comment_string), line)) and line != "" and
not re.match(r"^\s+$", line)):
yield line
def diff(self, dest_file, temp_file, comment_string):
    """
    Compare dest_file and temp_file, ignoring the lines read_skip_comment() filters out.

    The process is terminated with exit status 1 if temp_file does not exist.
    :param dest_file: currently installed file; may be missing
    :param temp_file: freshly generated file
    :param comment_string: comment prefix passed to read_skip_comment()
    :return: True if the files differ or dest_file does not exist, False if they are the same
    """
    self._cfg.debug.stdout(f'Diffing {dest_file} and {temp_file}, comment: {comment_string}', 3)
    if not os.path.isfile(dest_file):
        self._cfg.debug.stdout(f'dest_file {dest_file} does not exist.', 3)
        return True
    if not os.path.isfile(temp_file):
        self._cfg.debug.stderr(f'Temporary file {temp_file} does not exist, this should not happen.', 0, 'ERROR')
        sys.exit(1)

    with open(temp_file) as fp1, open(dest_file) as fp2:
        generated = list(self.read_skip_comment(fp1, comment_string))
        installed = list(self.read_skip_comment(fp2, comment_string))

    # Compare the complete sequences: a pairwise zip() would stop at the shorter file and miss
    # lines that were appended to or removed from the end.
    if generated != installed:
        self._cfg.debug.stdout(f'{dest_file} differs from generated config', 3)
        return True
    self._cfg.debug.stdout(f'{dest_file} is the same as generated config', 3)
    return False
def user_config_generated(self, filename):
    """
    Check whether filename carries the configured stamp, i.e. has been generated by userconfig.
    :param filename: file to inspect
    :return: True if any line of filename contains the stamp; False if the file does not exist, no stamp
        is configured, or no line contains it
    """
    if not os.path.isfile(filename):
        # filename does not exist, so it was not generated by userconfig
        return False
    if not self._cfg.check("stamp"):
        # no STAMP in userconfig.cfg, so no way to check if file was generated by userconfig
        return False
    stamp = self._cfg.get("stamp")
    # the with block closes the file on every path, including the early hit
    with open(filename, "r") as fp:
        return any(stamp in line for line in fp)
def backup_file(self, filename):
    """
    Move filename out of the way by renaming it to <filename>.userconfig.<YYYY-MM-DD>.

    If that name is already taken, .1, .2, ... is appended until an unused name is found.
    :param filename: file to back up
    :return: True if the file was renamed, False if filename does not exist
    """
    if not os.path.isfile(filename):
        self._cfg.debug.stdout(f'{filename} does not exist, do not need backup.', 3)
        return False

    self._cfg.debug.stdout(f'{filename} exists, finding backup name.', 3)
    # "%F" is not one of the portable strftime directives; "%Y-%m-%d" produces the same text everywhere.
    backup_name = filename + ".userconfig." + time.strftime("%Y-%m-%d")
    candidate = backup_name
    counter = 0
    while os.path.isfile(candidate):
        counter += 1
        candidate = f'{backup_name}.{counter}'
    os.rename(filename, candidate)
    self._cfg.debug.stdout(f'Renamed {filename} to {candidate}', 1, 'SUCCESS')
    return True
def copy_file(self, sourcefile, dest_file):
    """
    Copy sourcefile to dest_file.

    The copy is refused if sourcefile is not an existing file, or if dest_file is an existing file that
    is not writable.
    :param sourcefile: file to copy
    :param dest_file: destination path
    :return: True if the file was copied, False otherwise
    """
    if not os.path.isfile(sourcefile):
        self._cfg.debug.stdout(f'Source file {sourcefile} does not exist.', 0, 'ERROR')
        return False
    self._cfg.debug.stdout(f'Source file {sourcefile} exists, proceeding with copy.', 3)

    if os.path.isfile(dest_file) and not os.access(dest_file, os.W_OK):
        # f-string, so the message names the real destination instead of a literal "{dest_file}"
        self._cfg.debug.stdout(f'Destination {dest_file} is not a file or not writable.', 0, 'ERROR')
        return False
    shutil.copy(sourcefile, dest_file)
    return True