Docker-in-Docker (DinD) capabilities of public runners deactivated. More info

Commit 39a329fd authored by Maude Le Jeune
Browse files

Merge branch 'newbase' of git@gitorious.org:pipelet/pipelet into newbase

parents f027763d 507576e3
No preview for this file type
......@@ -56,7 +56,7 @@ def get_log_file (pipe, name):
d = datetime.datetime.now()
strdate = d.strftime("%y%d%j_%H%M%s%f")
return pipe.get_log_dir()+strdate+"_"+name
return path.join(pipe.get_log_dir(),strdate+"_"+name)
def set_logger (pipe, log_level):
......
......@@ -512,7 +512,7 @@ class Pipeline:
""" Return the pipe log directory.
"""
return self._prefix+"log/"
return path.join(self._prefix,"log")
def get_log_file (self, seg):
""" Return the segment log filename.
......
......@@ -242,13 +242,13 @@ class Scheduler():
self.tracker.update_status(t,'done')
if t.task_output:
for r in t.task_output:
t = task.Task(t.seg, task_output=r)
self.products_list.push(t)
child = task.Task(t.seg, task_output=r)
child.id = t.id
self.products_list.push(child)
else:
# This segment does not return products, a void product is
# created once and forall and is added to the product
# list.
t = task.Task(t.seg)
self.products_list.push_void(t)
self.task_queue.task_done()
......@@ -297,16 +297,17 @@ class Scheduler():
logger.debug('Found %d done tasks segment %s'%(len(d),seg))
logger.debug('Found %d failed tasks segment %s'%(len(failed),seg))
if parents:
## product list to queued
## product list to queue
l = multiplex(*[self.products_list.pop(s) for s in parents]) # cross prod
l = [([r[0] for r in e], [r[1] for r in e]) for e in l]
logger.debug('Found %d tasks in seg %s to get done'%(len(l),seg))
for e in l: # foreach product of the task list
for e, par in l: # foreach product of the task list
if (e in failed_prod): # done but failed
logger.debug("task already done and failed in seg %s"%seg)
continue
if not (e in dprod): # not done
logger.debug("pushing 1 task for seg %s"%seg)
self.put_task(task.Task(seg, task_input=e, status='queued'))
self.put_task(task.Task(seg, task_input=e, status='queued',parents=par))
else: # done
logger.debug("task already accomplished in segment %s"%seg)
# fetch the result of the task and store it in the task list
......@@ -315,8 +316,9 @@ class Scheduler():
if t.task_output:
logger.debug("Loading %d results from previously done task in segment %s"%(len(t.task_output),seg))
for r in t.task_output:
t = task.Task(t.seg, task_output=r)
self.products_list.push(t)
child = task.Task(t.seg, task_output=r)
child.id = t.id
self.products_list.push(child)
else:
# add an empty product once.
logger.debug("Loading a void result from previously done task in segment %s"%seg)
......@@ -331,8 +333,9 @@ class Scheduler():
if t.task_output:
logger.info("Loading %d results from already computed orphan segment %s"%(len(t.task_output),seg))
for r in t.task_output:
t = task.Task(t.seg, task_output=r)
self.products_list.push(t)
child = task.Task(t.seg, task_output=r)
child.id = t.id
self.products_list.push(child)
else:
# add an empty product once.
logger.debug("Loading a void result from already computed orphan segment %s"%seg)
......
......@@ -22,7 +22,7 @@ class Task:
execution status and its output product(s).
"""
def __init__(self, seg, task_input=None, status=None, task_output=None, id=None):
def __init__(self, seg, task_input=None, status=None, task_output=None, id=None, queued_on=None, parents=[]):
""" Initialize a task object.
Parameters
......@@ -44,8 +44,14 @@ class Task:
self.task_output = task_output
## integer, a uniq id for the task attributed by the tracker
self.id = id
## string, a tag name
self.tag = ""
## String, store the result of the computation
self.param=None
## date_time objects
self.queued_on = queued_on
self.begun_on = None
self.ended_on = None
## List of the task id whose output become the input of this task
self.parents = parents
def __str__(self):
""" Convert task object to string.
......@@ -53,7 +59,6 @@ class Task:
return 'seg: %s, status: %s, id: %s, input: %s, output: %s' \
% (self.seg, self.status, self.id, str(self.task_input), str(self.task_output))
class TaskList:
""" List of task objects.
......@@ -114,7 +119,7 @@ class TaskList:
raise TypeError
if not self._list.has_key(t.seg):
self._list[t.seg] = []
self._list[t.seg].append(t.task_output)
self._list[t.seg].append((t.task_output,t.id))
def push_void(self, t):
""" Add a 'void' task to the task list, if not previously done.
......@@ -134,4 +139,4 @@ class TaskList:
raise TypeError
if not self._list.has_key(t.seg):
self._list[t.seg] = []
self._list[t.seg].append(None)
self._list[t.seg].append((None, t.id))
......@@ -176,10 +176,10 @@ class SqliteTracker(Tracker,threading.Thread):
cur = self.conn.execute('select tbl_name from sqlite_master where tbl_name == "tasks"')
if not cur.fetchone():
with self.conn:
self.conn.execute('create table segments (seg_id INTEGER PRIMARY KEY, seg TEXT, curr_dir TEXT, tag TEXT, comment TEXT)')
self.conn.execute('create table tasks (task_id INTEGER PRIMARY KEY, seg_id INTEGER, status TEXT, input BLOB, output BLOB, str_prod TEXT)')
self.conn.execute('create table task_relations (father_id INTEGER, child_id INTEGER)')
self.conn.execute('create table segment_relations (father_id INTEGER, child_id INTEGER)')
self.conn.execute('create table segments (seg_id INTEGER PRIMARY KEY, seg TEXT NOT NULL, curr_dir TEXT NOT NULL UNIQUE, param TEXT, tag TEXT, comment TEXT)')
self.conn.execute('create table tasks (task_id INTEGER PRIMARY KEY, seg_id INTEGER NOT NULL, status TEXT NOT NULL, input BLOB NOT NULL, output BLOB, str_input NOT NULL, queued_on TEXT, begun_on TEXT, ended_on TEXT)')
self.conn.execute('create table task_relations (father_id INTEGER NOT NULL, child_id INTEGER NOT NULL)')
self.conn.execute('create table segment_relations (father_id INTEGER NOT NULL, child_id INTEGER NOT NULL)')
self.conn.row_factory = sqlite3.Row
self.segments_registration()
......@@ -209,8 +209,8 @@ class SqliteTracker(Tracker,threading.Thread):
except TypeError:
logger.info("Creating segment %s instance (%s)."%(s, curr_dir))
c = self.conn.execute(
'insert into segments (seg, curr_dir, tag, comment) values (?, ?, ?, ?)'
,(s, curr_dir, "", docline))
'insert into segments (seg, curr_dir, comment) values (?, ?, ?)'
,(s, curr_dir, docline))
seg_id = c.lastrowid
self.seg_id_cache[s] = seg_id
logger.info("Storing connectivity for segment %s."%s)
......@@ -268,8 +268,6 @@ class SqliteTracker(Tracker,threading.Thread):
logger.info("Stopping the tracker ...")
self.requests.put((None,None))
def get_list_status(self, seg, status):
""" Return the task list for a given segment and status.
......@@ -309,14 +307,15 @@ class SqliteTracker(Tracker,threading.Thread):
"""
task_output = pickle.dumps(t.task_output)
task_input = pickle.dumps(t.task_input)
curr_dir = self.pipe.get_curr_dir(t.seg)
str_prod = self.pipe.get_data_dir(t.seg, prod=t.task_input)
str_input = self.pipe.get_data_dir(t.seg, prod=t.task_input)
with self.conn_lock:
with self.conn:
seg_id = self.conn.execute('select seg_id from segments where curr_dir=?',(curr_dir,)).fetchone()[0]
c = self.conn.execute('insert into tasks(seg_id, status, input, output, str_prod) values(?, ?, ?, ?, ?)',
(seg_id, 'queued', task_input, task_output, str_prod))
t.id = c.lastrowid
seg_id = self.seg_id_cache[t.seg]
c = self.conn.execute('insert into tasks(seg_id, status, input, output, str_input) values(?, ?, ?, ?, ?)',
(seg_id, 'queued', task_input, task_output, str_input))
t.id = c.lastrowid
logger.debug('for task:\n%s\nparents:\n%s\n'%(str(t),str(t.parents)))
self.conn.executemany('insert into task_relations (father_id, child_id) values (?, ?)',((fid, t.id) for fid in t.parents))
t.status = 'queued'
return t
......@@ -363,6 +362,7 @@ class SqliteTracker(Tracker,threading.Thread):
def clean(self, seg):
""" Remove queued and running tasks from data base.
This is called only at the init of the scheduler.
Parameters
----------
seg: string, segment name.
......@@ -376,6 +376,6 @@ class SqliteTracker(Tracker,threading.Thread):
#self.conn.execute('delete from segments where seg_id=?',(seg_id,))
def close(self):
""" Close data base connexion.
""" Close database connexion.
"""
self.conn.close()
......@@ -186,7 +186,7 @@ class Web:
seg, currdir = conn.execute(
'select seg, curr_dir from segments where seg_id = ?'
,(segid,)).fetchone()
l = conn.execute('select str_prod from tasks where seg_id=? and status=?',(segid, status)).fetchall()
l = conn.execute('select str_input from tasks where seg_id=? and status=?',(segid, status)).fetchall()
conn.close()
html = html_tmp + '<h1> Data products for %s tasks in segment %s </h1>' % (status, seg)
html += '<div class="list"><p>Directory : %s</p> %d <span class="%s">%s</span> tasks <ul> '%( currdir, len(l), status, status)
......@@ -212,12 +212,11 @@ class Web:
## TO DO : get subpipes somewhere ???
conn = sqlite3.connect(self.db_file,check_same_thread=True)
with conn:
seg, currdir = conn.execute(
'select father_id from segments where child_id = ?'
fids = conn.execute(
'select father_id from segment_relations where child_id = ?'
,(segid,)).fetchall()
l = conn.execute('select str_prod from tasks where seg_id=? and status=?',(segid, status)).fetchall()
conn.close()
return [int(segid)]
return [int(segid)]+[i for i in fids]
@cherrypy.expose
@read_access
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment