Commit 0db2a772 authored by Guillaume

resolve conflict

parents 9d2083fe 225e0f75
@@ -73,3 +73,5 @@ Pour faire le 2/ ...
 -----------------------------------------------------------------
+=======
+Test
 #!bin/python
 # -*- coding: utf-8 -*-
 import signal
 import subprocess
 from argparse import ArgumentParser
-from multiprocessing.dummy import Pool
+from multiprocessing import Pool
 from pathlib import Path
 import lockfile
 from tqdm import tqdm
-from DRSParser import DRSParser
 from constants import *
 from utils import *
+from DRSParser import DRSParser
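The key import change above swaps the thread-backed multiprocessing.dummy.Pool for a real process pool: same Pool API, but the workers become separate processes that bypass the GIL for CPU-bound path parsing. A self-contained sketch of the difference (illustration only, not part of this commit):

    from multiprocessing.dummy import Pool as ThreadPool  # threads behind the Pool API
    from multiprocessing import Pool as ProcessPool       # real worker processes

    def square(x):
        return x * x

    if __name__ == '__main__':
        with ThreadPool(4) as tp:
            print(tp.map(square, range(8)))  # concurrent, but GIL-bound
        with ProcessPool(4) as pp:
            print(pp.map(square, range(8)))  # parallel across CPU cores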
-class Process(object):
-    """
-    Child process.
-    """
-    def __init__(self, ctx):
-        """
-        Processing context passed to each process.
-        """
-        self.drs = ctx.drs
-        self.outcat = ctx.outcat
-        self.depth = ctx.depth
-        self.project = ctx.project
-    def __call__(self, entry):
+def decode_path(p):
     """
     Any error switches to the next child process.
     It does not stop the main process at all.
-    Child process.
     """
     # Escape in case of error.
     try:
         # Split entry into full file path and latest boolean.
-        path, latest = map(str.strip, entry.split())
+        path, latest = map(str.strip, p.split())
         # Convert path into pathlib.Path object.
         path = Path(path)
         # Get facets from path.
-        facets = self.drs.get_facets_from_path(path)
+        facets = drs.get_facets_from_path(path)
         # Update facets from filename.
-        facets.update(self.drs.get_facets_from_filename(path.stem))
+        facets.update(drs.get_facets_from_filename(path.stem))
         # Build CSV entry.
-        entry_facets = [facets[i] for i in self.drs.dir_keys]
-        entry = [IPSL_DATA_ROOT + path.as_posix(), self.project]
+        entry_facets = [facets[i] for i in drs.dir_keys]
+        entry = [IPSL_DATA_ROOT + path.as_posix(), args.project]
         entry.extend(entry_facets)
         entry.extend([facets['period_start'], facets['period_end'], latest])
         # Build catalog filename.
-        if self.depth == 'project':
-            catpath = os.path.join(self.outcat,
-                                   self.project,
-                                   self.project)
+        if args.depth == 'project':
+            catpath = os.path.join(args.outcat,
+                                   args.project,
+                                   args.project)
         else:
-            catdepth = entry_facets[:self.drs.dir_keys.index(self.depth) + 1]
-            catpath = os.path.join(self.outcat,
-                                   self.project,
+            catdepth = entry_facets[:drs.dir_keys.index(args.depth) + 1]
+            catpath = os.path.join(args.outcat,
+                                   args.project,
                                    '_'.join(catdepth))
         # Lock catalog file to avoid multiprocessing concurrent access.
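The locking code itself sits in the collapsed lines just below. Given the lockfile import at the top of the file, such a guard usually follows this pattern; this is a hedged sketch only, since the commit's actual lines are folded away:

    with lockfile.LockFile(catpath):  # blocks until the lock is acquired
        with open(catpath + CSV_EXTENSION, 'a+') as f:
            f.write(','.join(entry) + '\n')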
@@ -77,7 +57,7 @@ class Process(object):
         if not os.path.isfile(catpath + CSV_EXTENSION):
             # Build CSV header.
             header = ['path', 'project']
-            header.extend(self.drs.dir_keys)
+            header.extend(drs.dir_keys)
             header.extend(['period_start', 'period_end', 'latest'])
             # Ensure header and entry have same length.
@@ -87,76 +67,12 @@ class Process(object):
             make_csv(catpath + CSV_EXTENSION, header)
             # Write JSON catalog at the same time.
-            make_json(catpath + JSON_EXTENSION, self.project, header)
+            make_json(catpath + JSON_EXTENSION, args.project, header)
         # Write catalog entry.
         with open(catpath + CSV_EXTENSION, 'a+') as f:
             f.write(','.join(entry) + '\n')
     except Exception:
         raise
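Note that except Exception: raise simply re-raises, so one malformed entry still aborts pool.imap despite the docstring's promise to move on. A wrapper that actually skips failing entries might look like this (hypothetical helper, not in this commit):

    def safe_decode_path(p):
        # Catch per-entry failures so the pool keeps running.
        try:
            decode_path(p)
        except Exception as e:
            print('Skipped entry {!r}: {}'.format(p.strip(), e))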
-class Runner(object):
-    def __init__(self, threads):
-        # Initialize the pool.
-        self.pool = None
-        if threads != 1:
-            self.pool = Pool(processes=threads)
-    def _handle_sigterm(self):
-        # Properly kill the pool in case of SIGTERM.
-        if self.pool:
-            self.pool.terminate()
-        exit(1)
-    def run(self, sources, ctx):
-        # Instantiate signal handler.
-        sig_handler = signal.signal(signal.SIGTERM, self._handle_sigterm)
-        # Read sources.
-        for source in sources:
-            # Get total entires in a fast way for beautiful progress bar.
-            total = int(subprocess.check_output(["wc", "-l", source]).split()[0])
-            # Instantiate pool of processes.
-            if self.pool:
-                # Instantiate pool iterator with progress bar.
-                processes = tqdm(self.pool.imap(Process(ctx), self.get_entries(source)), desc='Catalog generation',
-                                 total=total)
-            # Sequential processing use basic map function.
-            else:
-                # Instantiate processes iterator with progress bar.
-                processes = tqdm(map(Process(ctx), self.get_entries(source)), desc='Catalog generation', total=total)
-            # Run processes in a dummy variable.
-            _ = [x for x in processes]
-        # Terminate pool in case of SIGTERM signal.
-        signal.signal(signal.SIGTERM, sig_handler)
-        # Close the pool.
-        if self.pool:
-            self.pool.close()
-            self.pool.join()
-    @staticmethod
-    def get_entries(source):
-        # Iterate over each line of the source file.
-        with open(source, 'r+') as f:
-            for entry in f:
-                yield entry
 def get_args():
     """
@@ -193,14 +109,14 @@ def get_args():
     )
     parser.add_argument(
-        '-t', '--threads',
+        '-c', '--processes',
         metavar='1',
         type=int,
         default=1,
         help="""
-        Number of threads.
+        Number of processes.
         Set to "1" means pure sequential processing (default).
-        Set to "-1" means all available threads as returned by "multiprocessing.cpu_count()".
+        Set to "-1" means all available processes as returned by "multiprocessing.cpu_count()".
         """
     )
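The code that turns the -1 sentinel into a concrete worker count lives in collapsed lines, but the help text names multiprocessing.cpu_count() as its source, so it presumably resembles this sketch (hypothetical helper name):

    from multiprocessing import cpu_count

    def resolve_processes(n):
        # -1 means one worker per available core.
        return cpu_count() if n == -1 else n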
@@ -215,57 +131,48 @@ def get_args():
     return parser.prog, parser.parse_args()
-class Context(object):
-    """
-    Base class for processing context manager.
-    """
-    def __init__(self, args):
-        # Set project.
-        self.project = args.project
-        # Set DRS parser.
-        self.drs = DRSParser(args.project)
-        # Set output catalog directory.
-        self.outcat = args.outcat
-        # Set max pool processes.
-        self.threads = args.threads
-        # Set catalog depth.
-        self.depth = args.depth
-        # Set sources
-        self.sources = INPUT_SOURCES[args.project]
-    def __enter__(self):
-        return self
-    def __exit__(self, exc_type, exc_val, exc_tb):
-        pass
-def main():
-    """
-    Run main program
-    """
-    # Get command-line arguments.
-    prog, args = get_args()
-    # Add program name as argument.
-    setattr(args, 'prog', prog)
-    # Instantiate processing context
-    with Context(args) as ctx:
-        # Instantiate the runner.
-        r = Runner(ctx.threads)
-        # Run the pool.
-        r.run(ctx.sources, ctx)
-if __name__ == "__main__":
-    main()
+"""
+Run main program
+"""
+# Get command-line arguments.
+prog, args = get_args()
+# Add program name as argument.
+setattr(args, 'prog', prog)
+# Set DRS parser.
+drs = DRSParser(args.project)
+# Read sources.
+for source in INPUT_SOURCES[args.project]:
+    # Get total entries in a fast way for beautiful progress bar.
+    total = int(subprocess.check_output(["wc", "-l", source]).split()[0])
+    if args.processes != 1:
+        # Instantiate pool of workers.
+        pool = Pool(processes=args.processes)
+        # Instantiate pool iterator with progress bar.
+        processes = tqdm(pool.imap(decode_path,
+                                   get_entries(source)),
+                         desc='Catalog generation',
+                         total=total)
+    # Sequential processing uses the basic map function.
+    else:
+        # Instantiate processes iterator with progress bar.
+        processes = tqdm(map(decode_path,
+                             get_entries(source)),
+                         desc='Catalog generation',
+                         total=total)
+    # Run processes in a dummy variable.
+    _ = [x for x in processes]
+    if args.processes != 1:
+        pool.close()
+        pool.join()
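The _ = [x for x in processes] line exists only to drive the lazy iterator: pool.imap and map yield results on demand, and the tqdm bar advances as items are pulled through. An equivalent idiom that retains nothing in memory (a sketch, not in the commit):

    from collections import deque

    deque(processes, maxlen=0)  # exhaust the iterator, discard all results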
@@ -13,6 +13,13 @@ from jinja2 import Template
 from constants import CATALOG_DESCRIPTION_TEMPLATE
+def get_entries(source):
+    # Iterate over each line of the source file.
+    with open(source, 'r') as f:
+        for entry in f:
+            yield entry
 def make_csv(catpath, header):
     """
     Write header into CSV catalog file.
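Because get_entries() yields one line at a time, pool.imap can stream an arbitrarily large source file without loading it whole. A minimal usage sketch (the input file name here is made up):

    for entry in get_entries('input_paths.txt'):  # hypothetical source file
        print(entry.strip())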