
Commit ae65b320 authored by Maude Le Jeune

Feature #731 + uniform use of task object

parent 7ae60fa6
@@ -434,19 +434,6 @@ class Pipeline:
         """
         return self._curr_dirs[seg]
-    def get_output_fn(self, seg, prod=None):
-        """ Return the segment output file
-        Parameters
-        ----------
-        seg : string, segment name.
-        Returns
-        -------
-        string, segment directory.
-        """
-        return path.join(self.get_data_dir(seg, prod),'seg_%s.output'%seg)
     def get_param_file(self, seg):
         """ Return the segment directory.
@@ -473,21 +460,6 @@ class Pipeline:
         """
         return path.join(self.get_curr_dir(seg),'stdout')
-    # def write_subpipes_file(self, seg):
-    #     """ Return the segment directory.
-    #     Parameters
-    #     ----------
-    #     seg : string, segment name.
-    #     Returns
-    #     -------
-    #     string, segment directory.
-    #     """
-    #     f = file(path.join(self.get_curr_dir(seg),'subpipes.txt'),'w')
-    #     for d in self._sub_dirs[seg]:
-    #         f.write(d)
-    #     f.close()
     def get_data_dir(self, seg, prod=None):
         """ Return the data directory for the segment or a product
@@ -242,7 +242,7 @@ class Scheduler():
             self.nb_success = self.nb_success + 1
             self.tracker.update_status(t,'done')
-            self.store_meta_task(t)
             if t.task_output:
                 for r in t.task_output:
                     child = task.Task(t.seg, task_output=r)
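The 'done' branch above fans a finished task's output out into one child task per product. A minimal sketch of that pattern (Python 3; this reduced Task carries only the fields used here, the real class takes more arguments):

    class Task:
        def __init__(self, seg, task_output=None):
            self.seg = seg                  # segment name, copied from the parent
            self.task_output = task_output  # one product per child

    def fan_out(t):
        # one child task per output product, mirroring the loop in the diff
        children = []
        if t.task_output:
            for r in t.task_output:
                children.append(Task(t.seg, task_output=r))
        return children

    done = Task('noise', task_output=['map_a', 'map_b'])
    print(len(fan_out(done)))               # 2: one child per product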
@@ -274,21 +274,6 @@ class Scheduler():
             r = pickle.dump(dict({'parents':lst_dir}),f)
-    def store_meta_task(self, t):
-        """ Store meta information for task.
-        This is used to rebuild db.
-        Parameters
-        ----------
-        t: task object
-        """
-        fn = self.pipe.get_meta_file(t.seg, prod=t.task_input)
-        with closing(file(fn, 'w')) as f:
-            r = pickle.dump(t.to_save,f)
     def push_next_seg(self, seg):
         """ Push the segment task list to the queue.
@@ -327,7 +312,7 @@ class Scheduler():
         fid.close()
         parents = self.pipe.get_parents(seg) ## parents segments
-        self.store_meta_seg (seg, parents)
+        self.store_meta_seg(seg, parents)
         d = self.tracker.get_done(seg) ## done tasks
         dprod = [t.task_input for t in d] ## done products
         failed = self.tracker.get_failed(seg) # failed tasks
@@ -346,7 +331,7 @@ class Scheduler():
                 continue
             if not (e in dprod): # not done
                 logger.debug("pushing 1 task for seg %s"%seg)
-                self.put_task(task.Task(seg, task_input=e, status='queued',parents=par))
+                self.put_task(task.Task(seg, task_input=e, status='queued',parents=par, seg_parents=parents))
             else: # done
                 logger.debug("task already accomplished in segment %s"%seg)
@@ -398,7 +383,6 @@ class Scheduler():
         """
         self.tracker.update_status(t,'failed')
-        self.store_meta_task(t)
         self.task_queue.task_done()
     def requeue(self, t):
@@ -14,6 +14,9 @@
 ## along with this program; if not, see http://www.gnu.org/licenses/gpl.html
 import threading
 from utils import str_date
+from contextlib import closing
+import pickle
 class Task:
     """ A segment code associated with its input/output product(s).
@@ -22,7 +25,7 @@ class Task:
     execution status and its output product(s).
     """
-    def __init__(self, seg, task_input=None, status=None, task_output=None, id=None, queued_on=None, parents=[]):
+    def __init__(self, seg, task_input=None, status=None, task_output=None, id=None, queued_on=None, parents=[], seg_parents=[]):
        """ Initialize a task object.
        Parameters
@@ -52,7 +55,10 @@ class Task:
         self.ended_on = None
         ## List of the task id whose output become the input of this task
         self.parents = parents
-        self.to_save = {}
+        ## List of str_input of the parent tasks
+        self.str_parents = []
+        ## List of curr_dir of the parent segments
+        self.seg_parents = seg_parents
     def __str__(self):
         """ Convert task object to string.
@@ -60,6 +66,37 @@
         return 'seg: %s, status: %s, id: %s, input: %s, output: %s' \
             % (self.seg, self.status, self.id, str(self.task_input), str(self.task_output))
+    def update_status(self, status):
+        """ Update task status.
+        """
+        str_d = str_date()
+        self.status = status
+        if status == 'done':
+            self.ended_on = str_d
+        elif status == 'queued':
+            self.queued_on = str_d
+        else:
+            self.task_output = None
+            if status == 'running':
+                self.begun_on = str_d
+    def store_meta(self, fn):
+        """ Store meta information for task.
+        This is used to rebuild db.
+        Parameters
+        ----------
+        fn, string file name
+        """
+        with closing(file(fn, 'w')) as f:
+            r = pickle.dump(dict({'parents':self.str_parents, 'queued_on':self.queued_on, 'ended_on':self.ended_on, 'begun_on':self.begun_on, 'input':pickle.dumps(self.task_input), 'output':pickle.dumps(self.task_output), 'status':self.status}),f)
 class TaskList:
     """ List of task objects.
@@ -297,8 +297,8 @@ class SqliteTracker(Tracker,threading.Thread):
         with self.conn_lock:
             ts = self.conn.execute('select * from tasks as t join segments as s on t.seg_id=s.seg_id and s.curr_dir=? and t.status=? ',(s, status))
             for t in ts:
-            task_output = pickle.loads(t['output'])
-            task_input = pickle.loads(t['input'])
+                task_output = pickle.loads(t['output'])
+                task_input = pickle.loads(t['input'])
                 l.append(task.Task(seg,status=status,id=t['task_id'], task_input=task_input, task_output=task_output))
         return l
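The fix above moves the per-row unpickling inside the for loop. Indexing rows by column name, as t['output'] does, requires sqlite3.Row (or an equivalent row factory); a minimal sketch with a reduced tasks table:

    import pickle
    import sqlite3

    conn = sqlite3.connect(':memory:')
    conn.row_factory = sqlite3.Row      # lets rows be indexed by column name
    conn.execute('create table tasks (task_id integer primary key, input blob, output blob)')
    conn.execute('insert into tasks(input, output) values(?, ?)',
                 (pickle.dumps('data_1'), pickle.dumps(['prod_a'])))

    for t in conn.execute('select * from tasks'):
        task_output = pickle.loads(t['output'])   # unpickle per row, inside the loop
        task_input = pickle.loads(t['input'])
        print(t['task_id'], task_input, task_output)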
@@ -316,28 +316,22 @@ class SqliteTracker(Tracker,threading.Thread):
         task object
         """
+        t.update_status ('queued')
         task_output = pickle.dumps(t.task_output)
         task_input = pickle.dumps(t.task_input)
         str_input = self.pipe.get_data_dir(t.seg, prod=t.task_input)
-        str_d = str_date()
         with self.conn_lock:
             with self.conn:
                 seg_id = self.seg_id_cache[t.seg]
                 c = self.conn.execute('insert into tasks(seg_id, status, input, output, str_input, queued_on) values(?, ?, ?, ?, ?, ?)',
-                                      (seg_id, 'queued', task_input, task_output, str_input, str_d))
+                                      (seg_id, t.status, task_input, task_output, str_input, t.queued_on))
                 t.id = c.lastrowid
                 logger.debug('for task:\n%s\nparents:\n%s\n'%(str(t),str(t.parents)))
                 self.conn.executemany('insert into task_relations (father_id, child_id) values (?, ?)',((fid, t.id) for fid in t.parents))
-        t.status = 'queued'
-        lst_dir = []
-        for e in t.parents:
-            with self.conn_lock:
-                c = self.conn.execute('select str_input from tasks where task_id=?', (e,)).fetchone()
-                if c:
-                    lst_dir.append(c[0])
-        t.to_save['parents'] = lst_dir
-        t.to_save['queued_on'] = str_d
+                for fid in t.parents:
+                    c = self.conn.execute('select str_input from tasks where task_id=?', ((fid,)))
+                    for e in c:
+                        t.str_parents.append(e[0])
         return t
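put_queued above leans on two sqlite idioms inside one transaction: cursor.lastrowid supplies the new task id, and executemany records one task_relations row per parent; the final loop collects the parents' str_input into t.str_parents. A self-contained sketch with a reduced schema (all names and values illustrative):

    import sqlite3

    conn = sqlite3.connect(':memory:')
    conn.executescript('''
        create table tasks (task_id integer primary key, seg_id int,
                            status text, str_input text, queued_on text);
        create table task_relations (father_id int, child_id int);''')
    # two pre-existing parent tasks (illustrative)
    conn.execute("insert into tasks(seg_id, status, str_input) values(1, 'done', '/pipe/first/a')")
    conn.execute("insert into tasks(seg_id, status, str_input) values(1, 'done', '/pipe/first/b')")

    parents = [1, 2]
    str_parents = []
    with conn:                           # one transaction, as in the diff
        c = conn.execute('insert into tasks(seg_id, status, str_input, queued_on)'
                         " values(?, 'queued', ?, ?)", (2, '/pipe/second/x', 't0'))
        task_id = c.lastrowid            # becomes t.id in the diff
        conn.executemany('insert into task_relations (father_id, child_id) values (?, ?)',
                         ((fid, task_id) for fid in parents))
        for fid in parents:
            for row in conn.execute('select str_input from tasks where task_id=?', (fid,)):
                str_parents.append(row[0])
    print(task_id, str_parents)          # 3 ['/pipe/first/a', '/pipe/first/b']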
@@ -356,21 +350,6 @@ class SqliteTracker(Tracker,threading.Thread):
             c = self.conn.execute('select status from tasks where task_id=?', t.id)
         return c.fetchone()['status']
-    def write_param (self, t):
-        """ Add param string to meta file
-        Parameters
-        ----------
-        t: task object
-        """
-        fn = self.pipe.get_meta_file(t.seg)
-        with closing(file(fn, 'r')) as f:
-            d = pickle.load(f)
-        d['param'] = t.tag
-        with closing(file(fn, 'w')) as f:
-            r = pickle.dump(d,f)
     def update_status(self, t, status):
         """ Update the task with new status.
@@ -385,26 +364,17 @@
         -------
         task object
         """
-        str_d = str_date()
+        t.update_status (status)
+        task_output = pickle.dumps(t.task_output)
+        self._asynchronous_request('update tasks set status=?, output=?, ended_on=?, begun_on=?, queued_on=? where task_id=?',
+                                   (status, task_output, t.ended_on, t.begun_on, t.queued_on, t.id))
         if status == 'done':
-            task_output = pickle.dumps(t.task_output)
-            self._asynchronous_request('update tasks set status=?, output=?, ended_on=? where task_id=?',
-                                       (status, task_output, str_d, t.id))
             self._asynchronous_request('update segments set param=? where seg=?',
-                                       (t.tag, t.seg))
-            t.to_save["ended_on"] = str_d
-            self.write_param(t)
-        else:
-            t.task_output = None
-            task_output = pickle.dumps(t.task_output)
-            if status == 'running':
-                self._asynchronous_request('update tasks set status=?, output=?, begun_on=? where task_id=?',(status, task_output, str_date(), t.id))
-                t.to_save["begun_on"] = str_d
-            else:
-                self._asynchronous_request('update tasks set status=?, output=? where task_id=?',(status, task_output, t.id))
-        t.status = status
+                                       (t.param, t.seg))
+        if status =='done' or status == 'failed':
+            t.store_meta(self.pipe.get_meta_file(t.seg, prod=t.task_input))
         return t
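After this change the tracker issues a single UPDATE carrying every timestamp column and lets the task object decide which ones are set; terminal states additionally trigger t.store_meta. A sketch of the consolidated statement (reduced table; a synchronous execute stands in for _asynchronous_request):

    import pickle
    import sqlite3
    from types import SimpleNamespace

    conn = sqlite3.connect(':memory:')
    conn.execute('create table tasks (task_id integer primary key, status text,'
                 ' output blob, ended_on text, begun_on text, queued_on text)')
    conn.execute("insert into tasks(status) values('running')")

    t = SimpleNamespace(id=1, status='done', task_output=['prod'],
                        queued_on='t0', begun_on='t1', ended_on='t2')
    conn.execute('update tasks set status=?, output=?, ended_on=?, begun_on=?,'
                 ' queued_on=? where task_id=?',
                 (t.status, pickle.dumps(t.task_output),
                  t.ended_on, t.begun_on, t.queued_on, t.id))
    print(conn.execute('select status, ended_on from tasks').fetchone())
    # ('done', 't2'): every timestamp column set in one statement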
@@ -376,15 +376,7 @@ def rebuild_db_from_disk(pipedir, sqlfile=None):
     for t in lst_task:
-        ## read output from output file
-        fn = glob.glob(path.join(t, "seg_*.output"))
-        if fn:
-            with closing (file(fn[0],'r')) as f:
-                output = pickle.load(f)
-        else:
-            print "error no output file found"
-        ## read dates and parents from meta file
+        ## read task properties from meta file
         fn = glob.glob(path.join(t, "seg_*.meta"))
         if fn:
             with closing (file(fn[0],'r')) as f:
@@ -393,20 +385,9 @@ def rebuild_db_from_disk(pipedir, sqlfile=None):
             print "error no meta file found"
         task_depend_cache[t] = meta['parents']
-        ## deduce status from dates
-        if meta.has_key("ended_on"):
-            status = 'done'
-        elif meta.has_key("begun_on"):
-            status = 'failed'
-            meta["ended_on"] = ''
-        else:
-            status = 'queued'
-            meta['begun_on'] = ''
-            meta["ended_on"] = ''
         ## insert values in db
-        input = None
-        c = conn.execute('insert into tasks (seg_id, output, input, begun_on, ended_on, queued_on,status,str_input) values (?, ?, ?, ?, ?, ?, ?, ?)',(v, pickle.dumps(output), pickle.dumps(input), meta["begun_on"], meta["ended_on"], meta["queued_on"], status,t))
+        c = conn.execute('insert into tasks (seg_id, output, input, begun_on, ended_on, queued_on,status,str_input) values (?, ?, ?, ?, ?, ?, ?, ?)',(v, pickle.dumps(meta["output"]), pickle.dumps(meta["input"]), meta["begun_on"], meta["ended_on"], meta["queued_on"], meta["status"],t))
         ## retrieve the task_id
         task_id_cache[t] = c.lastrowid
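rebuild_db_from_disk now takes status, dates and the pickled input/output straight from each seg_*.meta file instead of deducing status from which dates exist. A sketch of the per-directory read (Python 3; the directory layout is assumed from the glob patterns in the diff):

    import glob
    import pickle
    from os import path

    def load_task_meta(task_dir):
        """Load the meta dict written by Task.store_meta, or None if absent."""
        fn = glob.glob(path.join(task_dir, 'seg_*.meta'))
        if not fn:
            print('error no meta file found')     # mirrors the diff's handling
            return None
        with open(fn[0], 'rb') as f:              # binary mode for pickle
            return pickle.load(f)

    # meta['status'], meta['begun_on'], ... then go straight into the insert;
    # meta['input'] and meta['output'] are already pickled strings.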
@@ -418,8 +399,6 @@ def rebuild_db_from_disk(pipedir, sqlfile=None):
         for p in task_depend_cache[k]:
             id = conn.execute('select task_id,output from tasks where str_input = ?',(p,)).fetchone()
             conn.execute('insert into task_relations (father_id, child_id) values (?, ?)',(id[0], v))
-            ##conn.execute ('update tasks set input=? where task_id=?', (id[1], v))
-            ## TO DO : how to retrieve input ?
     conn.commit()
@@ -157,17 +157,6 @@ class Worker(object):
         except KeyError:
             logger.warning( 'Fail to load object %s from file %s'%(k,filename) )
-    def write_res(self, t):
-        """ Pickle the result of the segment.
-        Parameters
-        ----------
-        t: task instance
-        """
-        fn = self.pipe.get_output_fn(t.seg, prod=t.task_input)
-        with closing (file(fn,'w')) as f:
-            r = pickle.dump(t.task_output,f)
     def make_tag(self, seg, glo):
         """ Write a tag using parameters of a segment.
@@ -191,10 +180,12 @@ class Worker(object):
             else:
                 logger.warning( 'parameter '+param+' not in dictionary')
         var_tag = strtag + ' <small>(<b>'+ datetime.today().strftime("%e %m - %R")+'</b>)</small> '
-        if not path.exists(self.pipe.get_tag_file(seg)):
-            f = file(self.pipe.get_tag_file(seg), "w")
-            f.write(var_tag)
-            f.close()
+        fn = self.pipe.get_meta_file(seg)
+        with closing(file(fn, 'r')) as f:
+            d = pickle.load(f)
+        d['param'] = var_tag
+        with closing(file(fn, 'w')) as f:
+            r = pickle.dump(d,f)
         return var_tag
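make_tag no longer writes a standalone tag file: it folds var_tag into the segment's meta file with a read-modify-write cycle on the pickled dict. A sketch of that cycle (Python 3 binary mode; the function name is illustrative):

    import pickle

    def set_param(meta_fn, var_tag):
        # read-modify-write: load the meta dict, update 'param', dump it back
        with open(meta_fn, 'rb') as f:
            d = pickle.load(f)
        d['param'] = var_tag
        with open(meta_fn, 'wb') as f:
            pickle.dump(d, f)

Unlike the old tag file, this keeps all per-segment bookkeeping in one file, at the cost of rewriting it whenever the tag changes.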
@@ -482,7 +473,6 @@ class InteractiveWorker(Worker):
         except:
             res = None
         task.task_output = res
-        self.write_res(task)
         try: # Save params
             var_key = glo['var_key']
             self.save_param(seg, glo, param_name=var_key)
@@ -490,7 +480,7 @@ class InteractiveWorker(Worker):
             logger.warning( 'Nothing to save in param file for seg %s' % seg)
         except Exception:
             logger.warning( 'Fail to save the param file for seg %s' % seg)
-        task.tag = self.make_tag(seg, glo) # Dump var_tag
+        task.param = self.make_tag(seg, glo) # Dump var_tag
         task.status = "done" # set status
         return task
@@ -547,7 +537,6 @@ class ThreadWorker(Worker, threading.Thread):
         except:
             res = None
         task.task_output = res
-        self.write_res(seg, res)
         if task.status == "failed":
             return task
         try: # Save params
@@ -557,7 +546,7 @@ class ThreadWorker(Worker, threading.Thread):
             logger.warning( 'Nothing to save in param file for seg %s' % seg)
         except Exception:
             logger.warning( 'Fail to save the param file for seg %s' % seg)
-        task.tag = self.make_tag(seg, glo) # Dump var_tag
+        task.param = self.make_tag(seg, glo) # Dump var_tag
         return task
@@ -627,7 +616,6 @@ class ProcessWorker(Worker, Process):
         except:
             res = None
         task.task_output = res
-        self.write_res(seg, res)
         if task.status == "failed":
             return task
         try: # Save params
@@ -637,7 +625,7 @@ class ProcessWorker(Worker, Process):
             logger.warning( 'Nothing to save in param file for seg %s' % seg)
         except Exception:
             logger.warning( 'Fail to save the param file for seg %s' % seg)
-        task.tag = self.make_tag(seg, glo) # Dump var_tag
+        task.param = self.make_tag(seg, glo) # Dump var_tag
         return task
 class _FakeWorker(ProcessWorker):