removed tools, moved functions into userconfig

This commit is contained in:
Marcus Stoegbauer 2024-03-31 23:28:11 +02:00
parent e43dd9013c
commit 1a32b4e096
5 changed files with 193 additions and 354 deletions

2
.idea/misc.xml generated
View File

@ -3,5 +3,5 @@
<component name="Black"> <component name="Black">
<option name="sdkName" value="Python 3.6 (pyuserconfig)" /> <option name="sdkName" value="Python 3.6 (pyuserconfig)" />
</component> </component>
<component name="ProjectRootManager" version="2" project-jdk-name="Python 3.12 (userconfig)" project-jdk-type="Python SDK" /> <component name="ProjectRootManager" version="2" project-jdk-name="Python 3.6 (pyuserconfig) (2)" project-jdk-type="Python SDK" />
</project> </project>

View File

@ -4,7 +4,7 @@
<content url="file://$MODULE_DIR$"> <content url="file://$MODULE_DIR$">
<excludeFolder url="file://$MODULE_DIR$/venv" /> <excludeFolder url="file://$MODULE_DIR$/venv" />
</content> </content>
<orderEntry type="inheritedJdk" /> <orderEntry type="jdk" jdkName="Python 3.6 (pyuserconfig) (2)" jdkType="Python SDK" />
<orderEntry type="sourceFolder" forTests="false" /> <orderEntry type="sourceFolder" forTests="false" />
</component> </component>
<component name="TestRunnerService"> <component name="TestRunnerService">

View File

@ -1,11 +1,65 @@
import os import os
import argparse import argparse
from userconfig.cfgfile import Conf from userconfig.cfgfile import Conf
from userconfig.tools import Debug
from userconfig import Userconfig from userconfig import Userconfig
import sys import sys
import subprocess import subprocess
class Debug:
_verbose = 0
_COLOR_RED = '\033[91m'
_COLOR_BLUE = '\33[34m'
_COLOR_GREEN = '\33[32m'
_COLOR_YELLOW = '\033[93m'
_COLOR_END = '\33[0m'
_FORMAT = {'STANDARD': '',
'ERROR': f'{_COLOR_RED}ERROR:{_COLOR_END} ',
'NOTICE': f'{_COLOR_BLUE}NOTICE:{_COLOR_END} ',
'WARNING': f'{_COLOR_YELLOW}WARNING:{_COLOR_END} ',
'SUCCESS': f'{_COLOR_GREEN}SUCCESS:{_COLOR_END} '
}
def __init__(self, verbose=0):
self.set_verbose(verbose)
def set_verbose(self, verbose):
self._verbose = verbose
def add_verbose(self):
self._verbose += 1
def get_verbose(self):
return self._verbose
def stdout(self, out, verbose_level=0, category='STANDARD'):
if self._verbose >= verbose_level:
if category in self._FORMAT:
print(f'{self._FORMAT[category]}{out}')
else:
print(f'[category {category} unknown]:{out}')
def stderr(self, out, verbose_level=0, category='STANDARD'):
if self._verbose >= verbose_level:
if category in self._FORMAT:
print(f'{self._FORMAT[category]}{out}', file=sys.stderr)
else:
print(f'[category {category} unknown]:{out}', file=sys.stderr)
def red(self, out):
return f'{self._COLOR_RED}{out}{self._COLOR_END}'
def green(self, out):
return f'{self._COLOR_GREEN}{out}{self._COLOR_END}'
def blue(self, out):
return f'{self._COLOR_BLUE}{out}{self._COLOR_END}'
def yellow(self, out):
return f'{self._COLOR_YELLOW}{out}{self._COLOR_END}'
def main(): def main():
parser = argparse.ArgumentParser(prog='userconfig', parser = argparse.ArgumentParser(prog='userconfig',
description='Manages configuration files, usually in the user home directory') description='Manages configuration files, usually in the user home directory')
@ -20,13 +74,12 @@ def main():
debug = Debug() debug = Debug()
# debug.set_verbose(cmdline.verbose) # debug.set_verbose(cmdline.verbose)
# cfg = Conf(filename=cmdline.file, debug=debug) # cfg = Conf(filename=cmdline.file, debug=debug)
debug.set_verbose(4) debug.set_verbose(1)
cfg = Conf(filename='/Users/lysis/userconfig2-test.conf', debug=debug) cfg = Conf(filename='/home/lysis/userconfig2-test.conf', debug=debug)
# cfg = Conf(filename='/Users/lysis/etc/userconfig2.conf', debug=debug)
uc = Userconfig(cfg) uc = Userconfig(cfg)
cfg.debug.stdout(f"Verbose level is {cfg.debug.get_verbose()}", 1, 'STANDARD') cfg.debug.stdout(f'Verbose level: {cfg.debug.get_verbose()}', 1, 'STANDARD')
configdir = cfg.get("configdir") configdir = cfg.get('configdir')
# configdir is the root of the userconfig files # configdir is the root of the userconfig files
# Directory structure: # Directory structure:
# configdir/ # configdir/
@ -49,7 +102,7 @@ def main():
# [Number]_[category]_[value] # [Number]_[category]_[value]
# Number: can have leading zeros for better sorting in directory structure, will be used for sorting and priority # Number: can have leading zeros for better sorting in directory structure, will be used for sorting and priority
# category: which kind of match we are looking for. currently: Arch for `uname -s`, Host for `hostname -s` # category: which kind of match we are looking for. currently: Arch for `uname -s`, Host for `hostname -s`
# value: optional, if category requires input, for example: Arch_Linux # value: optional, if category requires an input value, for example: Arch_Linux
# can be empty for example if category is all (no match needed, we want this always to be applied) # can be empty for example if category is all (no match needed, we want this always to be applied)
cfg.debug.stdout(f'configdir: {configdir}', 1) cfg.debug.stdout(f'configdir: {configdir}', 1)
@ -62,7 +115,8 @@ def main():
if os.path.isfile(f'{package.path}/.ignore'): if os.path.isfile(f'{package.path}/.ignore'):
cfg.debug.stdout(f'{package.path} contains .ignore, skipping', 2, 'WARNING') cfg.debug.stdout(f'{package.path} contains .ignore, skipping', 2, 'WARNING')
continue continue
cfg.debug.stdout(f'============ start {package.path} ============', 2) cfg.debug.stdout(f'Package: {package.path}', 1, 'NOTICE')
cfg.debug.stdout(f'============ start {cfg.debug.green(package.path)} ============', 2)
(category_dirs, dir_config) = uc.process_package_dir(package.path) (category_dirs, dir_config) = uc.process_package_dir(package.path)
cfg.debug.stdout(f'Got categories: {category_dirs}', 2) cfg.debug.stdout(f'Got categories: {category_dirs}', 2)
host_category_dirs = uc.filter_categories(category_dirs) host_category_dirs = uc.filter_categories(category_dirs)
@ -84,8 +138,9 @@ def main():
# Make sure all directories for destination file exist # Make sure all directories for destination file exist
uc.create_destination_directories(dir_config.get(section="Main", option="dest")) uc.create_destination_directories(dir_config.get(section="Main", option="dest"))
temp_filename = uc.build_file(file_list[file], dest, comment_string) temp_filename = uc.build_file(file_list[file], dest, comment_string)
uc.copy_file(temp_filename, dest, comment_string) uc.diff_and_copy_file(temp_filename, dest, comment_string)
cfg.debug.stdout(f'============ end {package.path} ============\n\n', 2) cfg.debug.stdout(f'============ end {cfg.debug.green(package.path)} ============\n\n', 2)
if __name__ == '__main__': if __name__ == '__main__':
main() main()

View File

@ -1,10 +1,12 @@
from userconfig.tools import get_config
from userconfig.checks import check_class from userconfig.checks import check_class
from userconfig.cfgfile import Conf
import os import os
from userconfig.tools import diff, user_config_generated, backup_file, copy_file
import time import time
import tempfile import tempfile
from pathlib import Path from pathlib import Path
import shutil
import re
import sys
class Userconfig: class Userconfig:
@ -13,7 +15,6 @@ class Userconfig:
def __init__(self, cfg): def __init__(self, cfg):
self._cfg = cfg self._cfg = cfg
""" This is NEW """
def process_package_dir(self, package_dir): def process_package_dir(self, package_dir):
""" """
Walk through package_dir, collect all directories inside, parse them according to our specs and return Walk through package_dir, collect all directories inside, parse them according to our specs and return
@ -28,7 +29,7 @@ class Userconfig:
self._cfg.debug.stdout(f'No config file {self._cfg.get("configfile")} in {package_dir}, skipping', self._cfg.debug.stdout(f'No config file {self._cfg.get("configfile")} in {package_dir}, skipping',
1, 'ERROR') 1, 'ERROR')
return None return None
dir_config = get_config(package_config_file, self._cfg) dir_config = self.get_config(package_config_file)
if not dir_config: if not dir_config:
self._cfg.debug.stdout(f'Cannot read config file {package_config_file}, skipping', 1, 'ERROR') self._cfg.debug.stdout(f'Cannot read config file {package_config_file}, skipping', 1, 'ERROR')
return None return None
@ -95,52 +96,52 @@ class Userconfig:
file_list[file.name].append(file.path) file_list[file.name].append(file.path)
return file_list return file_list
def build_file(self, files, destfile, commentstring): def build_file(self, files, dest_file, comment_string):
""" """
merge all files in files[] and write contents into a tempfile merge all files in files[] and write contents into a temporary file
:param files: :param files:
:param destfile: :param dest_file:
:param commentstring: :param comment_string:
:return: tempfile name :return: temporary filename
""" """
content = [] content = []
self._cfg.debug.stdout(f'*** building file for {destfile}', 3) self._cfg.debug.stdout(f'*** building file for {dest_file}', 3)
if commentstring: if comment_string:
content.append(f'{commentstring} {self._cfg.get("stamp")} {time.strftime("%+")}\n') content.append(f'{comment_string} {self._cfg.get("stamp")} {time.strftime("%+")}\n')
for file in files: for file in files:
self._cfg.debug.stdout(f'Merging {file}', 3) self._cfg.debug.stdout(f'Merging {file}', 3)
fp = open(file, "r") fp = open(file, "r")
filecontent = fp.read() file_content = fp.read()
fp.close() fp.close()
# if commentstring exists, add comment with category dir and filename # if comment_string exists, add comment with category dir and filename
if commentstring: if comment_string:
content.append(f'{commentstring} {"/".join(file.split("/")[-2:])}') content.append(f'{comment_string} {"/".join(file.split("/")[-2:])}')
content.append(filecontent) content.append(file_content)
(tempfd, tempfilename) = tempfile.mkstemp(prefix=os.path.basename(destfile), dir="/tmp") (temp_fd, temp_filename) = tempfile.mkstemp(prefix=os.path.basename(dest_file), dir="/tmp")
try: try:
fp = os.fdopen(tempfd, "w") fp = os.fdopen(temp_fd, "w")
except Exception as e: except Exception as e:
self._cfg.debug.stderr(f"Cannot write to temporary file {tempfilename}: {e}", 0, 'ERROR') self._cfg.debug.stderr(f"Cannot write to temporary file {temp_filename}: {e}", 0, 'ERROR')
os.remove(tempfilename) os.remove(temp_filename)
return False return False
self._cfg.debug.stdout(f'Writing merged files into tempfile {tempfilename}', 3) self._cfg.debug.stdout(f'Writing merged files into temporary file {temp_filename}', 3)
for block in content: for block in content:
fp.write(block) fp.write(block)
fp.write("\n") fp.write("\n")
fp.close() fp.close()
return tempfilename return temp_filename
def copy_file(self, temp_filename, dest_filename, commentstring): def diff_and_copy_file(self, temp_filename, dest_filename, comment_string):
if diff(dest_filename, temp_filename, commentstring, self._cfg): if self.diff(dest_filename, temp_filename, comment_string):
self._cfg.debug.stdout(f"File {dest_filename} has changed", 0, 'NOTICE') self._cfg.debug.stdout(f"File {dest_filename} has changed", 0, 'NOTICE')
if not user_config_generated(dest_filename, self._cfg): if not self.user_config_generated(dest_filename):
self._cfg.debug.stdout(f"{dest_filename} not generated by userconfig, backing up.", 3) self._cfg.debug.stdout(f"{dest_filename} not generated by userconfig, backing up.", 3)
backup_file(dest_filename, self._cfg) self.backup_file(dest_filename)
self._cfg.debug.stdout(f"Copy {temp_filename} to {dest_filename}", 0, 'NOTICE') self._cfg.debug.stdout(f"Copy {temp_filename} to {dest_filename}", 0, 'NOTICE')
copy_file(temp_filename, dest_filename, self._cfg) self.copy_file(temp_filename, dest_filename)
self._cfg.debug.stdout(f"Removing temporary file {temp_filename}", 3) self._cfg.debug.stdout(f"Removing temporary file {temp_filename}", 3)
os.remove(temp_filename) os.remove(temp_filename)
@ -150,160 +151,103 @@ class Userconfig:
if not path.exists(): if not path.exists():
path.mkdir(parents=True) path.mkdir(parents=True)
# def build_file_old(self, classfiles, destfile, commentstring): def get_config(self, filename):
# """open all classfiles, assemble them and write the contents into a tempfile """reads filename as config, checks for DEST parameter and returns cfgfile object"""
# returns the name of tempfile try:
# :param classfiles: ret = Conf(filename=filename, debug=self._cfg.debug, force_filename=True)
# :param destfile: except ValueError:
# :param commentstring: self._cfg.debug.stderr("Error reading config file %s" % filename)
# :return: str return False
# """ # check for DEST parameter
# content = [] if not ret.check(section="Main", option="dest"):
# self._cfg.debug.stdout(f" ================ build_file {destfile} ===============", 1) self._cfg.debug.stderr("No dest in config file %s" % filename)
# if commentstring != "": return False
# self._cfg.debug.stdout(" +++ commentstring found, adding header.", 3) # make sure DEST ends with /
# content.append(commentstring + " " + self._cfg.get("stamp") + " " + time.strftime("%+") + "\n") if not ret.get(section="Main", option="dest").endswith("/"):
# ret.set(section="Main", option="dest", value=ret.get(section="Main", option="dest")+"/")
# for f in classfiles: return ret
# self._cfg.debug.stdout(" +++ Merging %s." % f, 4)
# fp = open(f, "r")
# filecontent = fp.read()
# fp.close()
# if commentstring == "":
# # look for stamp in content, replace with real stamp
# if re.search(re.escape(self._cfg.get("stampreplace")), filecontent):
# self._cfg.debug.stdout(" +++ commentstring empty, replacing stamp in file", 3)
# filecontent = re.sub(re.escape(self._cfg.get("stampreplace")), self._cfg.get("stamp"),
# filecontent)
# content.append(filecontent)
#
# (tempfd, tempfilename) = tempfile.mkstemp(prefix=os.path.basename(destfile), dir="/tmp")
#
# try:
# fp = os.fdopen(tempfd, "w")
# except Exception as e:
# self._cfg.debug.stderr(f"Cannot write to temporary file {tempfilename}: {e}")
# os.remove(tempfilename)
# return False
# self._cfg.debug.stdout(" +++ Writing merged files into tempfile %s." % tempfilename, 3)
# for block in content:
# fp.write(block)
# fp.write("\n")
# fp.close()
# self._cfg.debug.stdout(f" ================ done: build_file {destfile} ===============", 1)
# return tempfilename
# def process_all_files(self, destfiles, dir_config): @staticmethod
# """processes all files in destfiles, generate files from classes, compare and copy if necessary""" def read_skip_comment(fp, comment_string):
# self._cfg.debug.stdout(" ================ process_all_files ===============", 1) """Read line from filehandle fp and skip all empty (whitespace) lines and lines starting with comment_string
# for df in destfiles.keys(): """
# self._cfg.debug.stdout(" ??? Processing source files for %s." % df, 2) for line in fp:
# if not os.path.exists(os.path.dirname(df)): line = line[:-1]
# self._cfg.debug.stdout(" +++ Directory %s does not exist, creating" % os.path.dirname(df), 1) if ((comment_string != "" and not re.match("^"+re.escape(comment_string), line)) and line != "" and
# os.mkdir(os.path.dirname(df)) not re.match(r"^\s+$", line)):
# if not os.path.isdir(os.path.dirname(df)): yield line
# self._cfg.debug.stdout(f" --- Destination directory {os.path.dirname(df)} does not exist, skipping.",
# 1)
# return False
# # assemble file to tmp
# commentstring = ""
# if dir_config.check(section="Main", option="commentstring"):
# commentstring = dir_config.get(section="Main", option="commentstring")
# self._cfg.debug.stdout(" +++ Found commentstring %s in %s" % (commentstring, df), 3)
#
# tempfilename = self.build_file(destfiles[df], df, commentstring)
# if not tempfilename:
# self._cfg.debug.stdout(" --- Error while creating temp file for %s, skipping." % df, 1)
# continue
# self._cfg.debug.stdout(" +++ Merged files %s for %s into %s" % (str(destfiles[df]), df, tempfilename), 2)
#
# # diff assembled file and config file
# if diff(df, tempfilename, commentstring, self._cfg):
# self._cfg.debug.stdout("File %s has changed" % df, 0)
# if not user_config_generated(df, self._cfg):
# self._cfg.debug.stdout(" +++ %s not generated by userconfig, backing up." % df, 2)
# # file not generated from userconfig -> back up
# backup_file(df, self._cfg)
# # copy tmp file to real location
# self._cfg.debug.stdout("Copy %s to %s." % (tempfilename, df), 0)
# copy_file(tempfilename, df, self._cfg)
# # remove tmp
# self._cfg.debug.stdout(" +++ Removing temporary file %s." % tempfilename, 2)
# os.remove(tempfilename)
# self._cfg.debug.stdout(" ================ process_all_files ===============", 1)
# def workconf(self, directory, depth=2): def diff(self, dest_file, temp_file, comment_string):
# """walks through directory, collecting all filenames, returns list of all filenames""" """diff dest_file and temp_file, returns True if files differ, False if they are the same"""
# dirs = os.listdir(directory) self._cfg.debug.stdout("Diffing %s and %s, comment: %s" % (dest_file, temp_file, comment_string), 3)
# ret = [] if not os.path.isfile(dest_file):
# self._cfg.debug.stdout(f" ================ workconf {directory} ===============", 1) self._cfg.debug.stdout("Destination file %s does not exist, returning True." % dest_file, 3)
# for d in dirs: # dest_file does not exist -> copy temp_file over
# name = directory + "/" + d return True
# if os.path.isdir(name): if not os.path.isfile(temp_file):
# self.workconf(name, depth + 1) # temp_file does not exist, this should never happen
# if name.endswith(".swp"): self._cfg.debug.stderr("Temporary file %s does not exist, this should not happen." % temp_file)
# continue sys.exit(1)
# if d == ".svn":
# continue
# ret.append(name)
# self._cfg.debug.stdout(" +++ Found file %s in directory %s" % (name, directory), 4)
# self._cfg.debug.stdout(f" ================ done: workconf {directory} ===============", 1)
# return ret
# def workdir(self, directory): fp1 = open(temp_file)
# """walks through all host classes, checking if the classes directory exists in directory fp2 = open(dest_file)
# then collect all filenames within this directory and return a dict of all files for this
# directory for line1, line2 in zip(self.read_skip_comment(fp1, comment_string), self.read_skip_comment(fp2, comment_string)):
# """ if line1 != line2:
# self._cfg.debug.stdout(f" ================ workdir {directory} ===============", 1) fp1.close()
# # skip directory if no CONFIGFILE present fp2.close()
# if not os.path.isfile(directory+"/"+self._cfg.get("configfile")): self._cfg.debug.stdout("%s differs, return true" % dest_file, 3)
# self._cfg.debug.stdout(f' --- No file {self._cfg.get("configfile")} in {directory}, skipping.', 1) return True
# return {}, None fp1.close()
# fp2.close()
# # get config file for directory self._cfg.debug.stdout("%s is the same, return false" % dest_file, 3)
# dir_config = get_config(directory + "/" + self._cfg.get("configfile"), self._cfg) return False
# if not dir_config:
# self._cfg.debug.stdout(f' --- Cannot read config file {self._cfg.get("configfile")} in {directory}, ' def user_config_generated(self, filename):
# f'skipping.', 1) """returns True if filename has been generated by userconfig, False else"""
# return {}, None
# if not os.path.isfile(filename):
# destdir = dir_config.get(section="Main", option="dest") # filename does not exist, so it was not generated by userconfig
# # destfiles is a dict of all files that will be created from the classes config return False
# # key is the destination filename, values are all classes filenames that are used to
# # build the file if not self._cfg.check("stamp"):
# destfiles = {} # no STAMP in userconfig.cfg, so no way to check if file was generated by userconfig
# try: return False
# reverse_sort = dir_config.get(section="Main", option="reverse", boolean=True)
# except ValueError: fp = open(filename, "r")
# reverse_sort = False
# self._cfg.debug.stdout(f' +++ reverse_sort is {reverse_sort}', 3) for line in fp:
# if os.access(directory + "/install.sh", os.X_OK): if re.search(re.escape(self._cfg.get("stamp")), line):
# subprocess.call([directory + "/install.sh"]) return True
# return False
# # walk through all know classes in directory and find filenames
# for h in classes_for_host(reverse_sort): def backup_file(self, filename):
# # build classes directory """make backup of filename, returns True if backup is successful, False else"""
# if h[0] != "": if os.path.isfile(filename):
# classdir = directory+"/"+h[0]+"_"+h[1] self._cfg.debug.stdout("%s exists, finding backup name." % filename, 3)
# else: backup_name = filename+".userconfig."+time.strftime("%F")
# classdir = directory+"/"+h[1] test_backup_name = backup_name
# self._cfg.debug.stdout(" ??? Looking for directory %s." % classdir, 4) counter = 0
# # if class directory exists while os.path.isfile(test_backup_name):
# if os.path.isdir(classdir): counter += 1
# self._cfg.debug.stdout(" +++ Found directory %s, getting files." % classdir, 4) test_backup_name = backup_name+"."+str(counter)
# # get list of files within this class directory self._cfg.debug.stdout("Renaming %s to %s" % (filename, test_backup_name), 1)
# tempfiles = self.workconf(classdir) os.rename(filename, test_backup_name)
# self._cfg.debug.stdout(" +++ Got %d files: %s." % (len(tempfiles), str(tempfiles)), 4) return True
# # put files into dict else:
# for f in tempfiles: self._cfg.debug.stdout("%s does not exist, do not need backup." % filename, 3)
# destname = destdir+os.path.basename(f) # destination filename return False
# if destname not in destfiles:
# destfiles[destname] = [] def copy_file(self, sourcefile, dest_file):
# destfiles[destname].append(f) # append each file to dict """copy sourcefile to dest_file, returns True if successful, False else"""
# self._cfg.debug.stdout(f" +++ Added file to {destname}, now {len(destfiles[destname])} files: "
# f"{destfiles[destname]}", 4) if os.path.isfile(sourcefile):
# # sourcefile exists
# self._cfg.debug.stdout(" === workdir: %s, Files: %s" % (directory, str(destfiles)), 3) self._cfg.debug.stdout("Source file %s exists, proceeding with copy." % sourcefile, 3)
# self._cfg.debug.stdout(f" ================ done: workdir {directory} ===============", 1) if not os.path.isfile(dest_file) or os.access(dest_file, os.W_OK):
# return destfiles, dir_config self._cfg.debug.stdout("Copying %s to %s" % (sourcefile, dest_file), 1)
shutil.copy(sourcefile, dest_file)
return True
else:
self._cfg.debug.stdout("Destination file %s does not exist or is not writable." % dest_file, 3)
return False

View File

@ -1,160 +0,0 @@
import sys
import os
from userconfig.cfgfile import Conf
import re
import time
import shutil
class Debug:
_verbose = 0
_COLOR_RED = '\033[91m'
_COLOR_BLUE = '\33[34m'
_COLOR_GREEN = '\33[32m'
_COLOR_YELLOW = '\033[93m'
_COLOR_END = '\33[0m'
_FORMAT = { 'STANDARD': '',
'ERROR': f'{_COLOR_RED}ERROR:{_COLOR_END} ',
'NOTICE': f'{_COLOR_BLUE}NOTICE:{_COLOR_END} ',
'WARNING': f'{_COLOR_YELLOW}WARNING:{_COLOR_END} ',
'SUCCESS': f'{_COLOR_GREEN}SUCCESS:{_COLOR_END} '
}
def __init__(self, verbose=0):
self.set_verbose(verbose)
def set_verbose(self, verbose):
self._verbose = verbose
def add_verbose(self):
self._verbose += 1
def get_verbose(self):
return self._verbose
def stdout(self, out, verbose_level=0, category='STANDARD'):
if self._verbose >= verbose_level:
if category in self._FORMAT:
print(f'{self._FORMAT[category]}{out}')
else:
print(f'[category {category} unknown]:{out}')
def stderr(self, out, verbose_level=0, category='STANDARD'):
if self._verbose >= verbose_level:
if category in self._FORMAT:
print(f'{self._FORMAT[category]}{out}', file=sys.stderr)
else:
print(f'[category {category} unknown]:{out}', file=sys.stderr)
def get_config(filename, cfg):
"""reads filename as config, checks for DEST parameter and returns cfgfile object"""
try:
ret = Conf(filename=filename, debug=cfg.debug, force_filename=True)
except ValueError:
cfg.debug.stderr("Error reading config file %s" % filename)
return False
# check for DEST parameter
if not ret.check(section="Main", option="dest"):
cfg.debug.stderr("No dest in config file %s" % filename)
return False
# make sure DEST ends with /
if not ret.get(section="Main", option="dest").endswith("/"):
ret.set(section="Main", option="dest", value=ret.get(section="Main", option="dest")+"/")
return ret
def read_skip_comment(fp, commentstring):
"""Read line from filehandle fp and skip all empty (whitespace) lines and lines starting with commentstring
"""
for line in fp:
line = line[:-1]
if ((commentstring != "" and not re.match("^"+re.escape(commentstring), line)) and line != "" and
not re.match(r"^\s+$", line)):
yield line
def diff(destfile, tempfile, commentstring, cfg):
"""diff destfile and tempfile, returns True if files differ, False if they are the same"""
cfg.debug.stdout("Diffing %s and %s, comment: %s" % (destfile, tempfile, commentstring), 3)
if not os.path.isfile(destfile):
cfg.debug.stdout("Destfile %s does not exist, returning True." % destfile, 3)
# destfile does not exist -> copy tempfile over
return True
# if not destfile
if not os.path.isfile(tempfile):
# tempfile does not exist, this should never happen
cfg.debug.stderr("Temporary file %s does not exist, this should not happen." % tempfile)
sys.exit(1)
# if not tempfile
fp1 = open(tempfile)
fp2 = open(destfile)
for line1, line2 in zip(read_skip_comment(fp1, commentstring), read_skip_comment(fp2, commentstring)):
if line1 != line2:
fp1.close()
fp2.close()
cfg.debug.stdout("%s differs, return true" % destfile, 3)
return True
# if differ
# for line
fp1.close()
fp2.close()
cfg.debug.stdout("%s is the same, return false" % destfile, 3)
return False
def user_config_generated(filename, cfg):
"""returns True if filename has been generated by userconfig, False else"""
if not os.path.isfile(filename):
# filename does not exist, so it was not generated by userconfig
return False
if not cfg.check("stamp"):
# no STAMP in userconfig.cfg, so no way to check if file was generated by userconfig
return False
fp = open(filename, "r")
for line in fp:
if re.search(re.escape(cfg.get("stamp")), line):
return True
return False
def backup_file(filename, cfg):
"""make backup of filename, returns True if backup is successful, False else"""
if os.path.isfile(filename):
cfg.debug.stdout("%s exists, finding backup name." % filename, 3)
backupname = filename+".userconfig."+time.strftime("%F")
testbackupname = backupname
counter = 0
while os.path.isfile(testbackupname):
counter += 1
testbackupname = backupname+"."+str(counter)
cfg.debug.stdout("Renaming %s to %s" % (filename, testbackupname), 1)
os.rename(filename, testbackupname)
return True
else:
cfg.debug.stdout("%s does not exist, do not need backup." % filename, 3)
return False
def copy_file(sourcefile, destfile, cfg):
"""copy sourcefile to destfile, returns True if successful, False else"""
if os.path.isfile(sourcefile):
# sourcefile exists
cfg.debug.stdout("Source file %s exists, proceeding with copy." % sourcefile, 3)
if not os.path.isfile(destfile) or os.access(destfile, os.W_OK):
cfg.debug.stdout("Copying %s to %s" % (sourcefile, destfile), 1)
shutil.copy(sourcefile, destfile)
return True
# destfile is writable
else:
cfg.debug.stdout("Destination file %s does not exist or is not writable." % destfile, 3)
return False