Commit 5891e343 authored by Maude Le Jeune's avatar Maude Le Jeune
Browse files

Bug #1269 make_dir issue solved with attribute version. Bug in push_next_seg using parents solved.

parent eb9ebd38
......@@ -67,7 +67,7 @@ class EnvironmentBase():
string, filename
return path.join(self._worker.pipe.get_data_dir(self._worker.task.seg, prod=self._worker.task.task_input), x)
return path.join(self._worker.pipe.get_data_dir(self._worker.task.seg, prod=self._worker.task.task_input,version=self._worker.task.version), x)
def _get_namespace(self):
""" Prepare the namespace for a segment.
......@@ -24,6 +24,7 @@ from contextlib import closing
from environment import *
from task import Task
import multiplex
from glob import glob
class PipeException(Exception):
""" Extension of exception class.
......@@ -515,7 +516,7 @@ class Pipeline:
return path.join(self.get_curr_dir(seg),'stdout')
def get_data_dir(self, seg, prod=None):
def get_data_dir(self, seg, prod=None, version=0):
""" Return the data directory for the segment or a product
full name.
......@@ -529,10 +530,29 @@ class Pipeline:
string, segment directory or product full name.
if prod is not None:
return path.join(self.get_data_dir(seg), str_file(prod))
if version>0:
return path.join(self.get_data_dir(seg), str_file(prod)+"_%d"%version)
return path.join(self.get_data_dir(seg), str_file(prod))
return path.join(self.get_curr_dir(seg), 'data')
def find_new_version(self, seg, prod=None):
    """ Return a new version number from existing directories.

    Versioned data directories are named <data_dir> for version 0 and
    <data_dir>_<n> for version n (see get_data_dir), so the next free
    version is one past the highest numeric suffix found on disk.

    Parameters
    ----------
    seg : string, segment name.
    prod : string, product name.

    Returns
    -------
    integer, the next unused version number (0 if nothing exists yet).
    """
    d = self.get_data_dir(seg, prod)
    # NOTE(review): len(glob(d+"*")) would over-count when another
    # product name shares this one's prefix, and would collide with an
    # existing version when an intermediate version directory has been
    # deleted.  Parse the "_<n>" suffixes explicitly and take max+1.
    version = 0
    for existing in glob(d + "*"):
        if existing == d:
            # version 0 exists, so the next version is at least 1
            version = max(version, 1)
        elif existing.startswith(d + "_"):
            suffix = existing[len(d) + 1:]
            if suffix.isdigit():
                version = max(version, int(suffix) + 1)
    return version
def get_log_dir (self):
""" Return the pipe log directory.
......@@ -545,13 +565,17 @@ class Pipeline:
return path.join(self.get_curr_dir(seg),'%s.log'%seg)
def get_meta_file (self, seg, prod=-1):
def get_meta_file (self, seg, prod=-1, version=0):
""" Return the meta data filename
This routine is called for segment meta data storage and task meta data storage.
If the first case, meta data are stored in the segment curr_dir.
In the second case, meta data are stored in the task directory (prod may be None)
if (prod == -1):
dirname = self.get_curr_dir(seg)
dirname = self.get_data_dir(seg, prod)
dirname = self.get_data_dir(seg, prod, version)
return path.join(dirname,'%s.meta'%seg)
......@@ -217,7 +217,6 @@ class Scheduler():
if f:
dest = d+'/'+os.path.basename(f)
os.system("cp %s %s"%(f, dest))
lst_hook = r.get_hook_list(seg)
for h in lst_hook:
f = r.get_hook_string(seg, h).co_filename
......@@ -231,7 +230,7 @@ class Scheduler():
failed = self.tracker.get_failed(seg) # failed tasks
failed_prod = [t.task_input for t in failed] # failed products
dstrp = [str(sorted(t.parents).join) for t in d]
dstrp = [set(t.parents) for t in d] ## list of set'Found %d done tasks segment %s'%(len(d),seg))'Found %d failed tasks segment %s'%(len(failed),seg))
......@@ -253,11 +252,13 @@ class Scheduler():
else: # done (or not because parents changed)
#logger.debug("task already accomplished in segment %s"%seg)
strp = str(sorted(t.parents))
strp = set(t.parents)
# fetch the result of the task and store it in the task list
#ind = dprod.index(t.task_input)
ind = dstrp.index(strp)
if len(t.parents)==0:
ind = dprod.index(t.task_input) # look at task input if no parents
ind = dstrp.index(strp) ## find parents set in list of done parents sets
t = d[ind];
#logger.debug("Loading %d results from previously done task in segment %s"%(len(t.task_output),seg))
......@@ -266,6 +267,7 @@ class Scheduler():
self.nb_success = self.nb_success + 1
except ValueError: ## parents do not match parents of the done list
t.version = self.pipe.find_new_version(t.seg, t.task_input) ## update version number.
logger.debug("nb_success starts at %d for segment %s"%(self.nb_success,seg))
......@@ -60,6 +60,8 @@ class Task:
self.parents = parents
## List of str_input of the parent tasks
self.str_parents = []
## integer, version number (different versions correspond to tasks which share the same inputs but different parents)
self.version = 0
def __str__(self):
......@@ -323,7 +323,7 @@ class SqliteTracker(Tracker,threading.Thread):
t.update_status ('queued')
task_output = pickle.dumps(t.task_output)
task_input = pickle.dumps(t.task_input)
str_input = self.pipe.get_data_dir(t.seg, prod=t.task_input)
str_input = self.pipe.get_data_dir(t.seg, prod=t.task_input, version=t.version)
with self.conn_lock:
with self.conn:
seg_id = self.seg_id_cache[t.seg]
......@@ -379,7 +379,7 @@ class SqliteTracker(Tracker,threading.Thread):
self._asynchronous_request('update segments set param=? where seg_id =?',
(t.param, self.seg_id_cache[t.seg]))
if status =='done' or status == 'failed':
t.store_meta(self.pipe.get_meta_file(t.seg, prod=t.task_input))
t.store_meta(self.pipe.get_meta_file(t.seg, prod=t.task_input, version=t.version))
return t
......@@ -215,14 +215,15 @@ class Worker(object):
success = self._make_dir(d)
if not success:"cannot create directory %s - file exist"%d)
## retry with some suffix
## but : what happend with glob_seb and get_data_fn ?
## this happen if a new task (new input) is passed
## to an existing segment.
if prod==None:
logger.critical("A new task is submitted without dedicated directory (%s)"%d)
if not prod == None:
d = self.pipe.get_data_dir(seg, prod)
d = self.pipe.get_data_dir(seg, prod, task.version)
success = self._make_dir(d)
if not success:"cannot create directory %s - file exist"%d)
logger.critical("A new task is submitted without dedicated directory (%s)"%d)
def prepare_env(self, task):
""" Build the segment global execution environment for a given task.
......@@ -12,7 +12,7 @@ b-> c;
P = pipeline.Pipeline(pipedot, code_dir='./', prefix='./')
W,t = launch_interactive(P, log_level=logging.DEBUG)
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment.