import os import sys import configobj import pkgutil import zipfile import tempfile import wimcollect.common.logger as logger class LogConfig: def __init__(self, object_id: str, config_parser: configobj.ConfigObj = None, log: logger = None): # Config if config_parser is None: self.config = get_config_parser() else: self.config = config_parser # Logger self.object_id = object_id if log is None: self.logger = logger.Logger(self.config, self.object_id) else: self.logger = log def get_config_parser() -> configobj.ConfigObj: # Configuration file pkgpath = os.path.dirname(pkgutil.get_loader("wimcollect").path) conf_file_path = os.path.join(pkgpath, "config", "settings.ini") if not os.path.exists(conf_file_path): msg = "Configuration file not found in [" + conf_file_path + "]" sys.stderr.write(msg) raise FileNotFoundError(msg) config_parser = configobj.ConfigObj(conf_file_path) return config_parser def recompress_file(zip_filepath: str) -> str: """Recompress archive file in LZMA. LZMA compression algorithm produces smaller files than usual ZIP, and the decompression is faster. This function is useful for converting ZIP files from external sources into LZMA file. Parameters ---------- zip_filepath: str Full file path of the source ZIP file. Returns ------- str Full file path of the LZMA-compressed output file. Basically the same as `zip_filepath`, with `.zip` replaced by `.lzma`. """ # Directory where the original ZIP is, where the ZIP content will temporarily be extracted and where the final # LZMA file will be created. current_directory = os.path.dirname(zip_filepath) with tempfile.TemporaryDirectory() as tmp_dirpath: # Extract zip file and delete it files_in_zip = extract_compressed_file(compressed_filepath=zip_filepath, dest_dir=tmp_dirpath, delete_compressed=True) # Compress the files compressed_filepath = os.path.splitext(zip_filepath)[0] + ".lzma" # Replace .zip by .lzma. zipf = zipfile.ZipFile(compressed_filepath, 'w', zipfile.ZIP_LZMA) for filepath in files_in_zip: zipf.write(filepath, arcname=os.path.basename(filepath)) zipf.close() if not os.path.exists(compressed_filepath): raise Exception("Failed to recompress " + zip_filepath) return compressed_filepath def extract_compressed_file(compressed_filepath: str, dest_dir: str, delete_compressed: bool = False) -> list: source_zip = zipfile.ZipFile(compressed_filepath, 'r') files_in_zip = [os.path.join(dest_dir, filename) for filename in source_zip.namelist()] source_zip.extractall(dest_dir) source_zip.close() if delete_compressed: os.remove(compressed_filepath) return files_in_zip def compress_file(uncompressed_filepath: str, delete_uncompressed_if_success: bool = True) -> str: """Compress a single file, using LZMA algorithm. - The output compressed file is created in the same directory as the uncompressed file. - The output compressed file name is the same as the uncompressed file, followed by `.lzma`. Parameters ---------- uncompressed_filepath: str Full file path of the to-be-compressed file. delete_uncompressed_if_success: bool If `True` delete the source uncompressed file once the compression is successfully done. Returns ------- str Full file path of the LZMA-compressed output file. Basically the same as `uncompressed_filepath`, with its extension replaced by `.lzma`. """ compressed_filepath = uncompressed_filepath + ".lzma" zipf = zipfile.ZipFile(compressed_filepath, 'w', zipfile.ZIP_LZMA) zipf.write(uncompressed_filepath, arcname=os.path.basename(uncompressed_filepath)) zipf.close() if os.path.exists(compressed_filepath) and delete_uncompressed_if_success: os.remove(uncompressed_filepath) else: raise FileNotFoundError("Failed to compress " + uncompressed_filepath) return compressed_filepath