## Copyright (C) 2008, 2009, 2010 APC LPNHE CNRS Universite Paris Diderot
##
## This program is free software; you can redistribute it and/or modify
## it under the terms of the GNU General Public License as published by
## the Free Software Foundation; either version 3 of the License, or
## (at your option) any later version.
##
## This program is distributed in the hope that it will be useful,
## but WITHOUT ANY WARRANTY; without even the implied warranty of
## MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
## GNU General Public License for more details.
##
## You should have received a copy of the GNU General Public License
## along with this program; if not, see http://www.gnu.org/licenses/gpl.html

import os.path as path
import subprocess
import threading
from glob import glob
from datetime import datetime
import logging
from utils import str_web
from contextlib import closing
import cPickle as pickle
from pipelet.utils import init_logger


class NullHandler(logging.Handler):
    """ Logging handler that discards every record.
    """
    def emit(self, record):
        """ Ignore the record (avoids 'no handler' warnings).
        """
        pass


class EnvironmentBase():
    """ Basic segment's facilities.

    An environment contains all segment's facilities provided to the
    user.  The base class is almost empty, and should be extended to
    define a completely new environment.  The default class can be
    extended to complete the default environment with new utilities.
    """
    def __init__(self, w):
        """ Initialize the base environment with task input.

        Parameters
        ----------
        w: a worker instance
        """
        self._worker = w
        self._seg_input = w.task.task_input

    def _get_data_fn(self, x):
        """ Complete the filename with the path to the working directory.

        The directory is obtained from the pipe for the current task's
        segment and task input.

        Parameters
        ----------
        x: string, filename suffix

        Returns
        -------
        string, filename
        """
        task = self._worker.task
        data_dir = self._worker.pipe.get_data_dir(task.seg,
                                                  prod=task.task_input)
        return path.join(data_dir, x)

    def _get_namespace(self):
        """ Prepare the namespace for a segment.

        Returns
        -------
        Dictionary, namespace containing all the public attributes of
        the instance (every name from dir(self) that does not start
        with an underscore).
        """
        return dict((k, getattr(self, k))
                    for k in dir(self) if not k.startswith("_"))

    def _get_log_file(self):
        """ Return the log file name of the current segment.
        """
        return self._get_data_fn("seg_" + self._worker.task.seg + ".log")

    def _hook(self, hook_name, glo):
        """ Execute hook code.

        Ask the repository for the hook code string of the current
        segment, and execute it in the given dictionary.

        Parameters
        ----------
        hook_name: string, hook name
        glo: dict, global dictionary to update.
        """
        seg = self._worker.task.seg
        code = self._worker.pipe.repository.get_hook_string(seg, hook_name)
        if code:
            exec(code, glo)
        else:
            # Arguments are given in the order announced by the message
            # (hook name first, then segment name).
            print("No hook file named %s for seg %s" % (hook_name, seg))

    def _close(self, glo):
        """ Return segment's output from dictionary.

        The base implementation returns None.

        Parameters
        ----------
        glo: segment execution dictionary
        """
        return None


class Environment(EnvironmentBase):
    """ Default segment's facilities.

    The default environment contains some basic utilities related to :
    - filename
    - parameters
    - sub process
    The default environment can be extended to complete the default
    environment with new utilities.
    """
    def __init__(self, w):
        """ Initialize the environment with task input.

        Parameters
        ----------
        w: a worker instance
        """
        self._worker = w
        # Public name (no leading underscore), so that it is exported by
        # _get_namespace.
        self.seg_input = w.task.task_input
        ## list of temporary files
        self._tmpf = []
        self.logger = init_logger(self._get_data_fn(""),
                                  self._get_log_file(), level=[])

    def get_data_fn(self, x):
        """ Complete the filename with the path to the working directory.

        Parameters
        ----------
        x: string, filename suffix

        Returns
        -------
        string, filename
        """
        return self._get_data_fn(x)

    def hook(self, hook_name, glo):
        """ Execute hook code.

        Log the hook name, then execute the hook code of the current
        segment in the given dictionary.

        Parameters
        ----------
        hook_name: string, hook name
        glo: dict, global dictionary to update.
        """
        self.logger.info("hooking %s" % hook_name)
        return self._hook(hook_name, glo)

    def glob_seg(self, x, y):
        """ Return the list of filenames matching y in the working
        directory of segment x.

        The pattern is searched both directly in the data directory of
        the segment and in its immediate sub-directories.

        Parameters
        ----------
        x: string, segment name
        y: string, glob pattern of the files to look for.

        Returns
        -------
        list of filenames.
        """
        segx = self._worker.pipe.find_seg(self._worker.task.seg, x)
        if segx is None:
            self.logger.warning("No parent segment matching %s found" % x)
        data_dir = self._worker.pipe.get_data_dir(segx)
        return (glob(path.join(data_dir, y))
                + glob(path.join(data_dir, path.join('*/', y))))

    def logged_subprocess(self, args, shell=False, except_on_failure=True):
        """ Execute a subprocess and log its output.

        Create files process_name.log and process_name.err in the
        working directory, where process_name is args[0].  The .log
        file starts with a '#' line recalling the command.

        Parameters
        ----------
        args: sequence of program arguments, passed to subprocess.Popen
        shell: If set to True, the command is run via a shell
        except_on_failure: If set to True, raise an exception when
        command execution return non zero

        Returns
        -------
        tuple (outputfile, errorfile)

        Raises
        ------
        subprocess.CalledProcessError if except_on_failure is set and
        the return code of the command is not zero.
        """
        proc = args[0]
        of = self.get_data_fn(proc + '.log')
        ef = self.get_data_fn(proc + '.err')
        # The with statements guarantee that both files are closed, even
        # when the failure exception is raised.
        with open(of, 'w') as o:
            with open(ef, 'w') as e:
                o.write('#' + ' '.join([str(a) for a in args]) + '\n')
                o.flush()
                p = subprocess.Popen(args, stdout=o, stderr=e, shell=shell)
                p.communicate()
                if except_on_failure and p.returncode != 0:
                    raise subprocess.CalledProcessError(p.returncode,
                                                        args[0])
        return (of, ef)

    def get_tmp_fn(self):
        """ Obtain a temporary filename

        Note : has to be part of the segment execution environment
        The temporary file is added to the intern list for future removal.

        Returns
        -------
        string, temporary filename.
        """
        # NOTE(review): toordinal() has a one-day resolution, so every
        # call made on the same day returns the same filename -- confirm
        # whether unique names are expected here.
        tf = path.join(self._worker.work_dir,
                       str(datetime.now().toordinal()))
        self._tmpf.append(tf)
        return tf

    def load_param(self, seg, glo, param_name='*'):
        """ Update the global dictionary with parameters of a segment.

        Nothing is loaded when the parameter file of the segment does
        not exist.

        Parameters
        ----------
        seg : string, segment name
        glo : dict, the global dictionary
        param_name : string list, parameters name.
        """
        segx = self._worker.pipe.find_seg(self._worker.task.seg, seg)
        if segx is None:
            self.logger.warning("No parent segment matching %s found" % seg)
        param_file = self._worker.pipe.get_param_file(segx)
        if not path.exists(param_file):
            return
        self.load_products(param_file, glo, param_name=param_name)

    def save_products(self, filename, glo, param_name='*'):
        """ Use pickle to save a part of a given namespace.

        Requested names that are missing from the namespace are logged
        and skipped.

        Parameters
        ----------
        filename : string, save in this filename.
        glo : dict, the namespace.
        param_name : string list, name of objects to save. If '*' save
        everything in the dictionary.
        """
        new_dict = {}
        if param_name == '*':
            param_name = glo.keys()
        for k in param_name:
            try:
                new_dict[k] = glo[k]
            except KeyError:
                self.logger.warning(
                    'Fail to save object %s in file %s' % (k, filename))
        with open(filename, 'w') as f:
            pickle.dump(new_dict, f)

    def load_products(self, filename, glo, param_name='*'):
        """ Update a namespace by unpickling requested object from the
        file.

        When the file cannot be read or unpickled, a warning is logged
        and the namespace is left untouched.  Requested names that are
        missing from the file are logged and skipped.

        Parameters
        ----------
        filename: string, the pickle filename.
        glo : dict, the namespace to update.
        param_name : string list, names of object to unpickle. If '*',
        everything from the file is loaded.
        """
        # NOTE: unpickling executes arbitrary code; only load files that
        # come from a trusted source.
        try:
            with open(filename) as f:
                new_dict = pickle.load(f)
        except IOError:
            self.logger.warning('No such file: %s' % filename)
            return
        except pickle.UnpicklingError:
            self.logger.warning(
                'Failed to unpickle from file: %s' % filename)
            return
        if param_name == '*':
            param_name = new_dict.keys()
        for k in param_name:
            try:
                glo[k] = new_dict[k]
            except KeyError:
                self.logger.warning(
                    'Fail to load object %s from file %s' % (k, filename))

    def clean_tmp(self):
        """ Delete the list of tmp file created by a segment
        """
        for e in self._tmpf:
            subprocess.Popen(['rm', e]).communicate()
        self._tmpf = []

    def _make_tag(self, seg, glo):
        """ Write a tag using parameters of a segment.

        The tag lists 'name=value' for each parameter named in
        glo['lst_tag'] (if any), followed by the current date.  It is
        stored under the 'param' key of the pickled meta file of the
        segment.

        Parameters
        ----------
        seg : string, segment name
        glo : dict, the global dictionary

        Returns
        -------
        string.
        """
        try:
            var_tag = glo['lst_tag']
        except KeyError:
            var_tag = []
        pieces = []
        for param in var_tag:
            if param in glo:
                pieces.append(param + '=' + str_web(glo[param]) + ' ')
            else:
                self.logger.warning(
                    'parameter ' + param + ' not in dictionary')
        var_tag = (''.join(pieces)
                   + ' (' + datetime.today().strftime("%e %m - %R") + ') ')
        fn = self._worker.pipe.get_meta_file(seg)
        with open(fn, 'r') as f:
            d = pickle.load(f)
        d['param'] = var_tag
        with open(fn, 'w') as f:
            pickle.dump(d, f)
        return var_tag

    def _save_param(self, seg, glo, param_name='*'):
        """ Save parameters of a segment.

        Nothing is written when the parameter file already exists.

        Parameters
        ----------
        seg : string, segment name
        glo : dict, the global dictionary
        param_name : string list, parameters name.
        """
        param_file = self._worker.pipe.get_param_file(seg)
        if path.exists(param_file):
            return
        self.save_products(param_file, glo, param_name=param_name)

    def _close(self, glo):
        """ Close environment.

        This routine performs post processing like removing temporary
        files, saving parameters and tag.

        Parameters
        ----------
        glo : dict, the global dictionary

        Returns
        -------
        glo['seg_output'] if present, an empty list otherwise.
        """
        seg = self._worker.task.seg
        self.clean_tmp()
        try:
            # Save params
            var_key = glo['lst_par']
            self._save_param(seg, glo, param_name=var_key)
        except KeyError:
            self.logger.warning(
                'Nothing to save in param file for seg %s' % seg)
        except Exception:
            self.logger.warning(
                'Fail to save the param file for seg %s' % seg)
        self._worker.task.param = self._make_tag(seg, glo)
        try:
            res = glo["seg_output"]
        except KeyError:
            res = []
            self.logger.info(
                "No segment output found, setting seg_output to []")
        return res