Commit 4a730795 authored by Maude Le Jeune's avatar Maude Le Jeune
Browse files

Feature #872 la suite

parent 7b047d24
## Copyright (C) 2008, 2009, 2010 APC LPNHE CNRS Universite Paris Diderot <lejeune@apc.univ-paris7.fr> <betoule@apc.univ-paris7.fr>
##
## This program is free software; you can redistribute it and/or modify
## it under the terms of the GNU General Public License as published by
## the Free Software Foundation; either version 3 of the License, or
## (at your option) any later version.
##
## This program is distributed in the hope that it will be useful,
## but WITHOUT ANY WARRANTY; without even the implied warranty of
## MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
## GNU General Public License for more details.
##
## You should have received a copy of the GNU General Public License
## along with this program; if not, see http://www.gnu.org/licenses/gpl.html
import os.path as path
import subprocess
import threading
from glob import glob
from datetime import datetime
import logging
from utils import str_web
from contextlib import closing
import cPickle as pickle
from pipelet.utils import init_logger
class NullHandler(logging.Handler):
""" Extension of the logging handler class.
"""
def emit(self, record):
""" Avoid warnings.
"""
pass
class EnvironmentBase():
""" Basic segment's facilities.
An environment contains all segment's facilities provided to the
user. The base class is almost empty, and should be extended to
define a completely new environment. The default class can be
extended to complete the default environment with new utilities.
"""
def __init__(self, w):
""" Initialize the base environment with task input.
Parameters
----------
w: a worker instance
"""
self._worker = w
self._segment_args = w.task.task_input
def _get_data_fn(self, x):
""" Complete the filename with the path to the working
directory.
Parameters
----------
x: string, filename suffix
Returns
-------
string, filename
"""
return path.join(self._worker.pipe.get_data_dir(self._worker.task.seg, prod=self._worker.task.task_input), x)
def _get_namespace(self):
""" Prepare the namespace for a segment.
Returns
-------
Dictionnary, namespace containing all the
public methods of the class.
"""
L = dir(self)
l = []
for e in L:
if e[0] != "_":
l.append(e)
return l
def _get_log_file(self):
""" Return the log file name
"""
return self._get_data_fn("segment_error.log")
def _hook(self, hook_name, glo):
""" Execute hook code.
Search for an extra segment code file, and update dictionnary
with the result of its execution.
Parameters
----------
hook_name: string, hook name
glo: dict, global dictionnary to update.
"""
code = self._worker.pipe.repository.get_hook_string(self._worker.task.seg, hook_name)
if code:
exec(code, glo)
else:
print "No hook file named %s for seg %s"%(self._worker.task.seg,hook_name)
class Environment(EnvironmentBase):
""" Default segment's facilities.
The default environment contains some basic utilities related to :
- filename
- parameters
- sub process
The default environment can be extended to complete the
default environment with new utilities.
"""
def __init__(self, w):
""" Initialize the base environment with task input.
Parameters
----------
w: a worker instance
"""
self._worker = w
self.segment_input = w.task.task_input
## list of temporary files
self._tmpf = []
self.logger = init_logger (self._get_data_fn(""), self._get_log_file(), level=[])
def get_data_fn(self, x):
""" Complete the filename with the path to the working
directory.
Parameters
----------
x: string, filename suffix
Returns
-------
string, filename
"""
return self._get_data_fn(x)
def hook(self, hook_name, glo):
""" Execute hook code.
Search for an extra segment code file, and update dictionnary
with the result of its execution.
Parameters
----------
hook_name: string, hook name
glo: dict, global dictionnary to update.
"""
return self._hook(hook_name, glo)
def glob_seg(self, x, y):
""" Return the list of filename matching y in the working
directory of segment x.
Parameters
----------
x: string, segment name
y: string, regexp of file to glob.
Returns
-------
list of filenames.
"""
segx = self._worker.pipe.find_seg(self._worker.task.seg, x)
return glob(path.join(self._worker.pipe.get_data_dir(segx),y))\
+glob(path.join(self._worker.pipe.get_data_dir(segx),path.join('*/',y)))
def logged_subprocess(self, args, shell=False):
""" Execute a subprocess and log its output.
Create files process_name.log and process_name.err
Parameters
----------
args: a Task argument
Returns
-------
tuple (outputfile, errorfile)
"""
proc = args[0]
of = self.get_data_fn(w, proc+'.log')
ef = self.get_data_fn(w, proc+'.err')
o = file(of,'w')
e = file(ef,'w')
o.write('#'+' '.join([str(a) for a in args])+'\n')
o.flush()
subprocess.Popen(args, stdout=o, stderr=e, shell=shell).communicate()[0]
o.close()
e.close()
return (of, ef)
def get_tmp_fn(self):
""" Obtain a temporary filename
Note : has to be part of the segment execution environment
The temporary file is added to the intern list for future removal.
Returns
-------
string, temporary filename.
"""
tf = path.join(self._worker.work_dir, str(datetime.now().toordinal()))
self._tmpf.append(tf)
return tf
def load_param(self, seg,glo,param_name='*'):
""" Update the global dictionnary with parameters of a
segment.
Parameters
----------
seg : string, segment name
glo : dict, the global dictionnary
param_name : string list, parameters name.
"""
segx = self._worker.pipe.find_seg(self._worker.task.seg, seg)
if not path.exists(self._worker.pipe.get_param_file(segx)):
return
self.load_products(self._worker.pipe.get_param_file(segx), glo, param_name=param_name)
def save_products(self, filename, glo, param_name='*'):
""" Use pickle to save a part of a given namespace.
Parameters
----------
filename : string, save in this filename.
glo : dict, the namespace.
param_name : string list, name of objects to save. If '*' save
everything in the dictionnary.
"""
new_dict = {}
if param_name == '*':
param_name = glo.keys()
for k in param_name:
try:
new_dict[k] = glo[k]
except KeyError:
logger.warning( 'Fail to save object %s in file %s'%(k,filename))
f = file(filename,'w')
pickle.dump(new_dict,f)
f.close()
def load_products(self, filename, glo, param_name='*'):
""" Update a namespace by unpickling requested object from the
file.
Parameters
----------
filename: string, the pickle filename.
glo : dict, the namespace to update.
param_name : string list, names of object to unpickle. If '*',
everything from the file is loaded.
"""
try:
f = file(filename)
new_dict = pickle.load(f)
f.close()
except IOError:
logger.warning( 'No such file: %s'%filename)
except UnpicklingError:
logger.warning('Failed to unpickle from file: %s'%filename)
f.close()
if param_name == '*':
param_name = new_dict.keys()
for k in param_name:
try:
glo[k] = new_dict[k]
except KeyError:
logger.warning( 'Fail to load object %s from file %s'%(k,filename) )
def clean_tmp(self):
""" Delete the list of tmp file created by a segment
"""
for e in self._tmpf:
subprocess.Popen(['rm', e]).communicate()[0]
self._tmpf = []
def make_tag(self, seg, glo):
""" Write a tag using parameters of a segment.
Parameters
----------
seg : string, segment name
glo : dict, the global dictionnary
Returns
-------
string.
"""
try:
var_tag = glo['var_tag']
except KeyError:
var_tag = []
strtag = ''
for param in var_tag:
if param in glo:
strtag = strtag + param + '=' + str_web(glo[param])+' '
else:
logger.warning( 'parameter '+param+' not in dictionary')
var_tag = strtag + ' <small>(<b>'+ datetime.today().strftime("%e %m - %R")+'</b>)</small> '
fn = self._worker.pipe.get_meta_file(seg)
with closing(file(fn, 'r')) as f:
d = pickle.load(f)
d['param'] = var_tag
with closing(file(fn, 'w')) as f:
r = pickle.dump(d,f)
return var_tag
def save_param(self, seg, glo,param_name='*'):
""" Save parameters of a segment.
Parameters
----------
seg : string, segment name
glo : dict, the global dictionnary
param_name : string list, parameters name.
"""
if path.exists(self._worker.pipe.get_param_file(seg)):
return
self.save_products(self._worker.pipe.get_param_file(seg),glo, param_name=param_name)
def _close(self, glo, seg):
""" Close environment.
This routine performs post processing like saving tag,
parameters, etc.
Parameters
----------
glo :
"""
self.clean_tmp()
try: # Save params
var_key = glo['var_key']
self.save_param(seg, glo, param_name=var_key)
self.logger.info ('Save param file')
except KeyError:
self.logger.warning( 'Nothing to save in param file for seg %s' % seg)
except Exception:
self.logger.warning( 'Fail to save the param file for seg %s' % seg)
self._worker.task.param = self.make_tag(seg, glo)
try:
res = glo["res"]
except:
res = None
self._worker.task.task_output = res
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment