Commit db310f0f authored by Maude Le Jeune's avatar Maude Le Jeune
Browse files

bug #1269: change version system for task_id included to str_input

parent 29658a98
......@@ -3,12 +3,12 @@ I see at least before three projects to complete before making the first release
* The task_id project is not closed:
This is release critical.
- [ ] There is some dark zone (side effects):
- how segment without seg_output are treated (no task is stored, what happened when we delete these kind of segs ...)
- Any problem with Orphan tasks ?
- problem for parents giving same str_input outside the special case of group_by (is this really ok for group_by ?)
- [ ] how segments without seg_output are treated (no task is stored; what happens when we delete these kinds of segs ...)
- [ ] Any problem with Orphan tasks ?
- [ ] problem for parents giving same str_input outside the special case of group_by
- [ ] Does the tracking on disk allow to reconstruct the database
- [ ] I modified the task format to allow for easy glob_seg. It may have broken other things (at least the database reconstruction)
- [ ] Is the treatment of redundant tasks resulting from group_by OK
- [X] Is the treatment of redundant tasks resulting from group_by OK
- [ ] The code of multiplex is hard to read and lacks comments. Variable names and data structures may need rethinking.
* We should bring the default environment to its final state:
This is partly release critical: changes breaking compatibility are
......
......@@ -67,7 +67,7 @@ class EnvironmentBase():
-------
string, filename
"""
return path.join(self._worker.pipe.get_data_dir(self._worker.task.seg, prod=self._worker.task.task_input,version=self._worker.task.version), x)
return path.join(self._worker.pipe.get_data_dir(self._worker.task.seg, prod=(self._worker.task.task_input, self._worker.task.id)), x)
def _get_namespace(self):
""" Prepare the namespace for a segment.
......
......@@ -516,44 +516,29 @@ class Pipeline:
return path.join(self.get_curr_dir(seg),'stdout')
def get_data_dir(self, seg, prod=None, version=0):
def get_data_dir(self, seg, prod=None):
""" Return the data directory for the segment or a product
full name.
Parameters
----------
seg : string, segment name.
prod : string, product name.
prod : tuple, (product object, task_id).
Returns
-------
string, segment directory or product full name.
"""
if prod is not None:
if version>0:
return path.join(self.get_data_dir(seg), str_file(prod)+"_%d"%version)
if isinstance(prod, tuple):
task_id = prod[1]
prod = prod[0]
else:
return path.join(self.get_data_dir(seg), str_file(prod))
task_id = ""
return path.join(self.get_data_dir(seg), "%d_"%task_id+str_file(prod))
else:
return path.join(self.get_curr_dir(seg), 'data')
def find_new_version(self, seg, prod=None):
""" Return a new version number from existing directory.
Parameters
----------
seg : string, segment name.
prod : string, product name.
Returns
-------
integer : version number
"""
d = self.get_data_dir(seg, prod)
v = len(glob(d+"*"))
return v
def get_log_dir (self):
""" Return the pipe log directory.
"""
......@@ -565,7 +550,7 @@ class Pipeline:
"""
return path.join(self.get_curr_dir(seg),'%s.log'%seg)
def get_meta_file (self, seg, prod=-1, version=0):
def get_meta_file (self, seg, prod=-1):
""" Return the meta data filename
This routine is called for segment meta data storage and task meta data storage.
......@@ -575,7 +560,7 @@ class Pipeline:
if (prod == -1):
dirname = self.get_curr_dir(seg)
else:
dirname = self.get_data_dir(seg, prod, version)
dirname = self.get_data_dir(seg, prod)
return path.join(dirname,'%s.meta'%seg)
......
......@@ -271,7 +271,6 @@ class Scheduler():
self.products_list.push(t)
self.nb_success = self.nb_success + 1
except ValueError: ## parents do not match parents of the done list
t.version = self.pipe.find_new_version(t.seg, t.task_input) ## update version number.
self.put_task(t)
logger.debug("nb_success starts at %d for segment %s"%(self.nb_success,seg))
......
......@@ -60,9 +60,6 @@ class Task:
self.parents = parents
## List of str_input of the parent tasks
self.str_parents = {}
## integer, version number (different versions correspond to tasks which share the same inputs but different parents)
self.version = 0
def __str__(self):
......
......@@ -324,7 +324,7 @@ class SqliteTracker(Tracker,threading.Thread):
t.update_status ('queued')
task_output = pickle.dumps(t.task_output)
task_input = pickle.dumps(t.task_input)
str_input = self.pipe.get_data_dir(t.seg, prod=t.task_input, version=t.version)
str_input = self.pipe.get_data_dir(t.seg, prod=(t.task_input, 0))
with self.conn_lock:
with self.conn:
seg_id = self.seg_id_cache[t.seg]
......@@ -332,6 +332,8 @@ class SqliteTracker(Tracker,threading.Thread):
(seg_id, t.status, task_input, task_output, str_input, t.queued_on))
t.id = c.lastrowid
logger.debug('for task:\n%s\nparents:\n%s\n'%(str(t),str(t.parents)))
str_input = self.pipe.get_data_dir(t.seg, prod=(t.task_input, t.id))
self.conn.execute('update tasks set str_input=? where task_id=?',(str_input, t.id))
self.conn.executemany('insert into task_relations (father_id, child_id) values (?, ?)',((fid, t.id) for fid in t.parents))
for p in self.pipe.get_parents(t.seg):
t.str_parents[p] = []
......@@ -384,7 +386,7 @@ class SqliteTracker(Tracker,threading.Thread):
self._asynchronous_request('update segments set param=? where seg_id =?',
(t.param, self.seg_id_cache[t.seg]))
if status =='done' or status == 'failed':
t.store_meta(self.pipe.get_meta_file(t.seg, prod=t.task_input, version=t.version))
t.store_meta(self.pipe.get_meta_file(t.seg, prod=(t.task_input,t.id)))
return t
......
......@@ -220,7 +220,7 @@ class Worker(object):
if prod==None:
logger.critical("A new task is submitted without dedicated directory (%s)"%d)
if not prod == None:
d = self.pipe.get_data_dir(seg, prod, task.version)
d = self.pipe.get_data_dir(seg, prod=(prod, task.id))
success = self._make_dir(d)
if not success:
logger.critical("A new task is submitted without dedicated directory (%s)"%d)
......
......@@ -12,7 +12,7 @@ b-> c;
P = pipeline.Pipeline(pipedot, code_dir='./', prefix='./')
P.push(a=list(arange(4)))
P.push(b=[1,2,3])
P.push(b=[1,2,3,4])
W,t = launch_interactive(P, log_level=logging.DEBUG)
W.run()
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment