utils.py 3.97 KB
Newer Older
1
import os
2
import sys
3
import configobj
4
import pkgutil
5
import zipfile
6
import tempfile
7

8
import wimcollect.common.logger as logger
9 10


11
class LogConfig:
    """Hold the configuration parser, the logger and the identifier of an object.

    Parameters
    ----------
    object_id: str
        Identifier stored as-is in `self.object_id`.
    config_parser: configobj.ConfigObj
        Configuration to use. If `None`, it is loaded with `get_config_parser()`.
    log: logger.Logger
        Logger to use. If `None`, a new `logger.Logger` is built from the configuration.
    """

    def __init__(self, object_id: str, config_parser: configobj.ConfigObj = None, log: logger.Logger = None):
        # Config: fall back to the package settings file when no parser is supplied.
        self.config = get_config_parser() if config_parser is None else config_parser

        # Logger: the default one is built from `self.config`, hence the config is resolved first.
        self.logger = logger.Logger(self.config) if log is None else log

        self.object_id = object_id


def get_config_parser() -> configobj.ConfigObj:
    """Load the `settings.ini` configuration file located in the `wimcollect` package.

    The file is expected at `<wimcollect package directory>/config/settings.ini`.

    Returns
    -------
    configobj.ConfigObj
        Configuration parser built from the settings file.

    Raises
    ------
    ModuleNotFoundError
        If the location of the `wimcollect` package cannot be resolved.
    FileNotFoundError
        If the settings file does not exist. The message is also written to `stderr`.
    """
    # Imported here so that this function does not depend on a file-level import.
    # `importlib.util.find_spec` is the documented replacement of `pkgutil.get_loader`, which is deprecated
    # since Python 3.12.
    import importlib.util

    spec = importlib.util.find_spec("wimcollect")
    if spec is None or spec.origin is None:
        raise ModuleNotFoundError("Could not locate the [wimcollect] package")

    # For a regular package, `spec.origin` is the path of its `__init__.py`: its directory is the package directory.
    pkgpath = os.path.dirname(spec.origin)
    conf_file_path = os.path.join(pkgpath, "config", "settings.ini")
    if not os.path.exists(conf_file_path):
        msg = "Configuration file not found in [" + conf_file_path + "]"
        # Trailing newline so that the message is not glued to the traceback that follows.
        sys.stderr.write(msg + "\n")
        raise FileNotFoundError(msg)
    return configobj.ConfigObj(conf_file_path)

38

39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56
def recompress_file(zip_filepath: str) -> str:
    """Recompress archive file in LZMA.

    LZMA compression algorithm produces smaller files than usual ZIP, and the decompression is faster.

    This function is useful for converting ZIP files from external sources into LZMA file.

    The source ZIP file is deleted only once the LZMA file has been successfully written, so a failure during the
    recompression does not lose the original data.

    Parameters
    ----------
    zip_filepath: str
        Full file path of the source ZIP file.

    Returns
    -------
    str
        Full file path of the LZMA-compressed output file. Basically the same as `zip_filepath`, with `.zip` replaced
        by `.lzma`.

    Raises
    ------
    Exception
        If the LZMA file does not exist once the recompression is done.
    """
    # The LZMA file is created in the same directory as the original ZIP.
    compressed_filepath = os.path.splitext(zip_filepath)[0] + ".lzma"  # Replace .zip by .lzma.

    # The ZIP content is extracted in a temporary directory, which is removed when leaving the `with` block.
    with tempfile.TemporaryDirectory() as tmp_dirpath:
        # Extract zip file, but keep it until the LZMA file is written.
        files_in_zip = extract_compressed_file(compressed_filepath=zip_filepath,
                                               dest_dir=tmp_dirpath,
                                               delete_compressed=False)

        # Compress the files. The context manager closes the archive even if a write fails.
        with zipfile.ZipFile(compressed_filepath, 'w', zipfile.ZIP_LZMA) as zipf:
            for filepath in files_in_zip:
                # Members are stored under their base name only: the source archive's directory layout is flattened.
                zipf.write(filepath, arcname=os.path.basename(filepath))

    if not os.path.exists(compressed_filepath):
        raise Exception("Failed to recompress " + zip_filepath)

    # Delete the source ZIP now that the LZMA file exists. Skip it when both paths are the same file (source path
    # already ending with `.lzma`): the output has then been written in place of the source.
    if os.path.abspath(compressed_filepath) != os.path.abspath(zip_filepath):
        os.remove(zip_filepath)

    return compressed_filepath
JOSSOUD Olivier's avatar
JOSSOUD Olivier committed
78 79 80 81 82 83 84 85 86 87


def extract_compressed_file(compressed_filepath: str, dest_dir: str, delete_compressed: bool = False) -> list:
    """Extract all the members of an archive file into a directory.

    Parameters
    ----------
    compressed_filepath: str
        Full file path of the archive to extract.
    dest_dir: str
        Directory where the archive's content is extracted.
    delete_compressed: bool
        If `True`, delete the archive once its content is extracted.

    Returns
    -------
    list
        One path per archive member, in the archive's order, each built by joining `dest_dir` with the member name.
    """
    # The context manager closes the archive even if listing or extracting its content fails.
    with zipfile.ZipFile(compressed_filepath, 'r') as source_zip:
        files_in_zip = [os.path.join(dest_dir, filename) for filename in source_zip.namelist()]
        source_zip.extractall(dest_dir)

    # Reached only when the extraction succeeded, so a failed extraction never deletes the archive.
    if delete_compressed:
        os.remove(compressed_filepath)
    return files_in_zip
JOSSOUD Olivier's avatar
JOSSOUD Olivier committed
88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111


def compress_file(uncompressed_filepath: str, delete_uncompressed_if_success: bool = True) -> None:
    """Compress a single file, using LZMA algorithm.

    - The output compressed file is created in the same directory as the uncompressed file.
    - The output compressed file name is the same as the uncompressed file, followed by `.lzma`.

    Parameters
    ----------
    uncompressed_filepath: str
        Full file path of the to-be-compressed file.
    delete_uncompressed_if_success: bool
        If `True` delete the source uncompressed file once the compression is successfully done.
        If `False` the source file is left untouched.

    Raises
    ------
    FileNotFoundError
        If the compressed file does not exist once the compression is done.
    """
    compressed_filepath = uncompressed_filepath + ".lzma"

    # The context manager closes the archive even if the write fails.
    with zipfile.ZipFile(compressed_filepath, 'w', zipfile.ZIP_LZMA) as zipf:
        zipf.write(uncompressed_filepath, arcname=os.path.basename(uncompressed_filepath))

    # Failure and deletion are checked separately: keeping the source file on request is not a failure.
    if not os.path.exists(compressed_filepath):
        raise FileNotFoundError("Failed to compress " + uncompressed_filepath)

    if delete_uncompressed_if_success:
        os.remove(uncompressed_filepath)