Commit b9bf3f98 authored by JOSSOUD Olivier's avatar JOSSOUD Olivier
Browse files

SSH DMC. Export `recompress_file` function to utils module.

parent 3e82b125
Pipeline #56903 passed with stages
in 2 minutes and 21 seconds
......@@ -2,9 +2,9 @@ import os
import re
import configobj
import datetime
import zipfile
import paramiko
import wimcollect.utils as utils
import wimcollect.logger as logger
......@@ -41,7 +41,12 @@ class Collector:
dest_filepath = self.__get_dest_filepath__(day, source_filepath)
success = self.__sftp_download__(sftp_client, source_filepath, dest_filepath)
if success:
self.__recompress_file__(day, dest_filepath)
self.logger.write(self.object_id, day.strftime("%Y-%m-%d") + ": Re-compressing from ZIP to LZMA...")
lzma_filepath = utils.recompress_file(dest_filepath)
if lzma_filepath is None:
self.logger.write(self.object_id, day.strftime("%Y-%m-%d") + ": FAILED to create archive file")
else:
self.logger.write(self.object_id, day.strftime("%Y-%m-%d") + ": Done. Archive file: " + lzma_filepath)
sftp_client.close()
......@@ -60,32 +65,6 @@ class Collector:
return dest_filepath
def __recompress_file__(self, day: datetime.date, zip_filepath: str):
self.logger.write(self.object_id, day.strftime("%Y-%m-%d") + ": Re-compressing from ZIP to LZMA...")
# Directory where the original ZIP is, where the ZIP content will temporarily be extracted and where the final
# LZMA file will be created.
current_directory = os.path.dirname(zip_filepath)
# Extract zip file and delete it
source_zip = zipfile.ZipFile(zip_filepath, 'r')
files_in_zip = [os.path.join(current_directory, filename) for filename in source_zip.namelist()]
source_zip.extractall(current_directory)
source_zip.close()
os.remove(zip_filepath)
# Compress the files
compressed_filepath = os.path.splitext(zip_filepath)[0] + ".lzma"
zipf = zipfile.ZipFile(compressed_filepath, 'w', zipfile.ZIP_LZMA)
for filepath in files_in_zip:
zipf.write(filepath, arcname=os.path.basename(filepath))
zipf.close()
if os.path.exists(compressed_filepath):
self.logger.write(self.object_id, day.strftime("%Y-%m-%d") + ": Done. Archive file: " + compressed_filepath)
for file in files_in_zip:
os.remove(file)
def __sftp_connect__(self) -> paramiko.SFTPClient:
client = paramiko.SSHClient()
client.set_missing_host_key_policy(paramiko.AutoAddPolicy())
......@@ -102,4 +81,4 @@ class Collector:
def __sftp_download__(self, sftp_client: paramiko.SFTPClient, source_filepath: str, dest_filepath: str) -> bool:
sftp_client.get(source_filepath, dest_filepath)
success = os.path.exists(dest_filepath)
return success
\ No newline at end of file
return success
import os
import re
import configobj
import datetime
import zipfile
import paramiko
import wimcollect.logger as logger
def recompress_file(zip_filepath: str):
# Directory where the original ZIP is, where the ZIP content will temporarily be extracted and where the final
# LZMA file will be created.
current_directory = os.path.dirname(zip_filepath)
# Extract zip file and delete it
source_zip = zipfile.ZipFile(zip_filepath, 'r')
files_in_zip = [os.path.join(current_directory, filename) for filename in source_zip.namelist()]
source_zip.extractall(current_directory)
source_zip.close()
os.remove(zip_filepath)
# Compress the files
compressed_filepath = os.path.splitext(zip_filepath)[0] + ".lzma"
zipf = zipfile.ZipFile(compressed_filepath, 'w', zipfile.ZIP_LZMA)
for filepath in files_in_zip:
zipf.write(filepath, arcname=os.path.basename(filepath))
zipf.close()
if os.path.exists(compressed_filepath):
for file in files_in_zip:
os.remove(file)
return compressed_filepath
else:
return None
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment