Commit 9345cb4f authored by Maude Le Jeune's avatar Maude Le Jeune
Browse files

add get_input set_output. Param are saved together with the version -> from...

add get_input set_output. Param are saved together with the version -> from scheduler once for all. save and expose replace lst_par and lst_tag. Doc updated accordingly.
parent 684ef0d9
......@@ -346,14 +346,20 @@ tagging...).
The segment code is executed in a specific environment that provides:
1. access to the segment input and output
- =seg_input=: this variable is a dictionary containing the input of the segment.
- =get_input(seg)=: return the input coming from segment seg. If no
segment specified, take the first. This utility replaces the
seg_input variable which type could vary as described below.
- =seg_input=: this variable is a dictionary containing the input of the segment.
In the general case, =seg_input= is a python dictionary which
contains as many keywords as parent segments. In the case of orphan
segment, the keyword used is suffixed by the =phantom= word.
One exception to this is coming from the use of the =group_by=
directive, which alters the origin of the inputs. In this case,
=seg_input= contains the resulting class elements.
- =set_output(o)=: set the segment output as a list. If o is not a
list, set a list of one element o.
- =seg_output=: this variable has to be a list.
......@@ -374,9 +380,9 @@ The segment code is executed in a specific environment that provides:
- =get_tmp_fn()=: return a temporary filename.
3. Functionalities to use the automated parameters handling
- =lst_par=: list of parameter names of the segment to save in the meta data.
- =lst_tag=: list of parameter names which will be made visible from the web interface
- =load_param(seg, globals(), lst_par)=: retrieve parameters from the meta data.
- =save(lst)=: save the listed parameters of the segment in the meta data.
- =expose(lst)=: expose the listed parameters from the web interface
- =load(seg, globals(), lst)=: retrieve parameters from the meta data.
4. Various convenient functionalities
- =save_products(filename, globals(), lst_par)=: use pickle to save a
......
......@@ -38,7 +38,7 @@ I see at least before three projects to complete before making the first release
- The name should be changed in glob_parents and glob_seg
- [X] Are we satisfied with seg_input, is this convenient, should we
provide extra function to ease the retrieval of inputs.
- [ ] Are we satisfied with the lst_par, lst_tag, load_param syntax (may become save(name1,name2 ...) and expose(name1,name2...))
- [X] Are we satisfied with the lst_par, lst_tag, load_param syntax (may become save(name1,name2 ...) and expose(name1,name2...))
- [ ] logged_subprocess is the function I use in every segment,
quick and simple to make it perfect.
* Profiling/optimisation of the code.
......
......@@ -144,15 +144,27 @@ class Environment(EnvironmentBase):
## segment input
self.seg_input = w.task.task_input
## segment output
self.seg_output = []
## list of temporary files
self._tmpf = []
## logger instance
self.logger = init_logger (self._get_data_fn(""), self._get_log_file(), level=[])
## list of param to save
self.lst_par = []
## list of param to expose
self.lst_tag = []
def get_input(self, seg=None):
""" Get segment input.
""" Return the input coming from segment seg.
If no segment specified, take the first. This utility replaces
the seg_input variable which type could vary as described
below.
Parameters
----------
......@@ -179,6 +191,20 @@ class Environment(EnvironmentBase):
return self.seg_input
return None
def set_output(self, o):
""" Set the segment output as a list.
If o is not a list, set a list of one element o.
Parameters
----------
o : list or arbitrary python object
"""
if not isinstance (o, list):
self.seg_output=[o]
else:
self.seg_output=o
def get_data_fn(self, x):
""" Complete the filename with the path to the working
directory.
......@@ -313,7 +339,7 @@ class Environment(EnvironmentBase):
return tf
def load_param(self, seg,glo,param_name='*'):
def load(self, seg,glo,param_name='*'):
""" Update the global dictionnary with parameters of a
segment.
......@@ -402,12 +428,8 @@ class Environment(EnvironmentBase):
-------
string.
"""
try:
var_tag = glo['lst_tag']
except KeyError:
var_tag = []
strtag = ''
for param in var_tag:
for param in self.lst_tag:
if param in glo:
strtag = strtag + param + '=' + str_web(glo[param])+' '
else:
......@@ -416,19 +438,28 @@ class Environment(EnvironmentBase):
return var_tag
def _save_param(self, seg, glo,param_name='*'):
def _save_param(self, glo,param_name='*'):
""" Save parameters of a segment.
Parameters
----------
seg : string, segment name
glo : dict, the global dictionnary
param_name : string list, parameters name.
"""
if path.exists(self._worker.pipe.get_param_file(seg)):
return
self.save_products(self._worker.pipe.get_param_file(seg),glo, param_name=param_name)
Returns
-------
dictionnary
"""
new_dict = {}
if param_name == '*':
param_name = glo.keys()
for k in param_name:
try:
new_dict[k] = glo[k]
except KeyError:
self.logger.warning('Fail to save object %s '%(k))
return new_dict
def _save_version(self, glo):
""" Save version information for all modules
......@@ -436,6 +467,10 @@ class Environment(EnvironmentBase):
----------
glo : dict, the global dictionnary
Returns
-------
dictionnary
"""
import types
version = {}
......@@ -445,6 +480,28 @@ class Environment(EnvironmentBase):
version[k] = v.__version__
return version
def save (self, lst_par):
""" Set list of parameter to save on disk
Parameters
----------
lst_par: list of string
"""
if not isinstance(lst_par, list):
lst_par = [lst_par]
self.lst_par = lst_par
def expose (self, lst_tag):
""" Set list of parameter to expose from the web interface.
Parameters
----------
lst_tag: list of string
"""
if not isinstance(lst_tag, list):
lst_tag = [lst_tag]
self.lst_tag = lst_tag
def _close(self, glo):
""" Close environment.
......@@ -458,16 +515,17 @@ class Environment(EnvironmentBase):
"""
seg = self._worker.task.seg
self.clean_tmp()
param = None
try: # Save params
var_key = glo['lst_par']
self._save_param(seg, glo, param_name=var_key)
param = self._save_param(glo, param_name=self.lst_par)
except KeyError:
self.logger.warning( 'Nothing to save in param file for seg %s' % seg)
except Exception:
self.logger.warning( 'Fail to save the param file for seg %s' % seg)
self._worker.task.tosave = [(self._save_version(glo),self._worker.pipe.get_version_file(seg) ),(param,self._worker.pipe.get_param_file(seg) )]
self._worker.task.param = self._make_tag(seg, glo)
self._worker.task.version = self._save_version(glo)
try:
res = glo["seg_output"]
except:
......
......@@ -502,6 +502,19 @@ class Pipeline:
"""
return path.join(self.get_curr_dir(seg),'%s.args'%seg)
def get_version_file(self, seg):
""" Return the segment directory.
Parameters
----------
seg : string, segment name.
Returns
-------
string, segment directory.
"""
return path.join(self.get_curr_dir(seg),'%s.version'%seg)
def get_tag_file(self, seg):
""" Return the segment directory.
......
......@@ -188,13 +188,18 @@ class Scheduler():
d['param'] = self.products_list._list[seg][-1].param
except:
logger.info("no tag saved for seg %s"%seg)
with closing(file(fn, 'w')) as f:
r = pickle.dump(d,f)
## other stuff
try:
d['version'] = self.products_list._list[seg][-1].version
for d,fn in self.products_list._list[seg][-1].tosave:
with closing(file(fn, 'w')) as f:
r = pickle.dump(d,f)
except:
logger.info("no version saved for seg %s"%seg)
logger.info("no version nor param saved for seg %s"%seg)
with closing(file(fn, 'w')) as f:
r = pickle.dump(d,f)
def push_next_seg(self, seg):
""" Push the segment task list to the queue.
......
......@@ -60,8 +60,8 @@ class Task:
self.parents = parents
## Dict of str_input of the parent tasks
self.str_parents = {}
## Dict of dependency module version
self.version = {}
## List of tuple (dict, filename) to save on disk at the segment level
self.tosave = []
def __str__(self):
......
......@@ -2,4 +2,4 @@ print seg_input
import os
inp = get_input()
os.system("touch %s"%get_data_fn("file%d.dat"%inp))
seg_output = [inp]
set_output(inp)
#multiplex cross_prod group_by "0"
import numpy
print seg_input
inp = get_input()
#depend /home/lejeune/rien.txt
import os
f = glob_seg("a", "file*.dat")
......@@ -11,4 +11,5 @@ g = glob_parent("b*.dat")
for file in g:
os.system("cp %s %s"%(file, get_data_fn("fromglobparent_%s"%os.path.basename(file))))
save("f")
expose("inp")
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment