Docker-in-Docker (DinD) capabilities of public runners deactivated. More info

Commit d8e8886e authored by Marc Betoule's avatar Marc Betoule
Browse files

engine for new DB seems complete

parent d7b5c83a
No preview for this file type
......@@ -242,13 +242,13 @@ class Scheduler():
self.tracker.update_status(t,'done')
if t.task_output:
for r in t.task_output:
t = task.Task(t.seg, task_output=r)
self.products_list.push(t)
child = task.Task(t.seg, task_output=r)
child.id = t.id
self.products_list.push(child)
else:
# This segment does not return products, a void product is
# created once and forall and is added to the product
# list.
t = task.Task(t.seg)
self.products_list.push_void(t)
self.task_queue.task_done()
......@@ -297,16 +297,17 @@ class Scheduler():
logger.debug('Found %d done tasks segment %s'%(len(d),seg))
logger.debug('Found %d failed tasks segment %s'%(len(failed),seg))
if parents:
## product list to queued
## product list to queue
l = multiplex(*[self.products_list.pop(s) for s in parents]) # cross prod
l = [([r[0] for r in e], [r[1] for r in e]) for e in l]
logger.debug('Found %d tasks in seg %s to get done'%(len(l),seg))
for e in l: # foreach product of the task list
for e, par in l: # foreach product of the task list
if (e in failed_prod): # done but failed
logger.debug("task already done and failed in seg %s"%seg)
continue
if not (e in dprod): # not done
logger.debug("pushing 1 task for seg %s"%seg)
self.put_task(task.Task(seg, task_input=e, status='queued'))
self.put_task(task.Task(seg, task_input=e, status='queued',parents=par))
else: # done
logger.debug("task already accomplished in segment %s"%seg)
# fetch the result of the task and store it in the task list
......@@ -315,8 +316,9 @@ class Scheduler():
if t.task_output:
logger.debug("Loading %d results from previously done task in segment %s"%(len(t.task_output),seg))
for r in t.task_output:
t = task.Task(t.seg, task_output=r)
self.products_list.push(t)
child = task.Task(t.seg, task_output=r)
child.id = t.id
self.products_list.push(child)
else:
# add an empty product once.
logger.debug("Loading a void result from previously done task in segment %s"%seg)
......@@ -331,8 +333,9 @@ class Scheduler():
if t.task_output:
logger.info("Loading %d results from already computed orphan segment %s"%(len(t.task_output),seg))
for r in t.task_output:
t = task.Task(t.seg, task_output=r)
self.products_list.push(t)
child = task.Task(t.seg, task_output=r)
child.id = t.id
self.products_list.push(child)
else:
# add an empty product once.
logger.debug("Loading a void result from already computed orphan segment %s"%seg)
......
......@@ -22,7 +22,7 @@ class Task:
execution status and its output product(s).
"""
def __init__(self, seg, task_input=None, status=None, task_output=None, id=None):
def __init__(self, seg, task_input=None, status=None, task_output=None, id=None, queued_on=None, parents=[]):
""" Initialize a task object.
Parameters
......@@ -44,8 +44,14 @@ class Task:
self.task_output = task_output
## integer, a uniq id for the task attributed by the tracker
self.id = id
## string, a tag name
self.tag = ""
## String, store the result of the computation
self.param=None
## date_time objects
self.queued_on = queued_on
self.begun_on = None
self.ended_on = None
## List of the task id whose output become the input of this task
self.parents = parents
def __str__(self):
""" Convert task object to string.
......@@ -53,7 +59,6 @@ class Task:
return 'seg: %s, status: %s, id: %s, input: %s, output: %s' \
% (self.seg, self.status, self.id, str(self.task_input), str(self.task_output))
class TaskList:
""" List of task objects.
......@@ -114,7 +119,7 @@ class TaskList:
raise TypeError
if not self._list.has_key(t.seg):
self._list[t.seg] = []
self._list[t.seg].append(t.task_output)
self._list[t.seg].append((t.task_output,t.id))
def push_void(self, t):
""" Add a 'void' task to the task list, if not previously done.
......@@ -134,4 +139,4 @@ class TaskList:
raise TypeError
if not self._list.has_key(t.seg):
self._list[t.seg] = []
self._list[t.seg].append(None)
self._list[t.seg].append((None, t.id))
......@@ -176,10 +176,10 @@ class SqliteTracker(Tracker,threading.Thread):
cur = self.conn.execute('select tbl_name from sqlite_master where tbl_name == "tasks"')
if not cur.fetchone():
with self.conn:
self.conn.execute('create table segments (seg_id INTEGER PRIMARY KEY, seg TEXT, curr_dir TEXT, tag TEXT, comment TEXT)')
self.conn.execute('create table tasks (task_id INTEGER PRIMARY KEY, seg_id INTEGER, status TEXT, input BLOB, output BLOB, str_prod TEXT)')
self.conn.execute('create table task_relations (father_id INTEGER, child_id INTEGER)')
self.conn.execute('create table segment_relations (father_id INTEGER, child_id INTEGER)')
self.conn.execute('create table segments (seg_id INTEGER PRIMARY KEY, seg TEXT NOT NULL, curr_dir TEXT NOT NULL UNIQUE, param TEXT, tag TEXT, comment TEXT)')
self.conn.execute('create table tasks (task_id INTEGER PRIMARY KEY, seg_id INTEGER NOT NULL, status TEXT NOT NULL, input BLOB NOT NULL, output BLOB, str_input NOT NULL, queued_on TEXT, begun_on TEXT, ended_on TEXT)')
self.conn.execute('create table task_relations (father_id INTEGER NOT NULL, child_id INTEGER NOT NULL)')
self.conn.execute('create table segment_relations (father_id INTEGER NOT NULL, child_id INTEGER NOT NULL)')
self.conn.row_factory = sqlite3.Row
self.segments_registration()
......@@ -209,8 +209,8 @@ class SqliteTracker(Tracker,threading.Thread):
except TypeError:
logger.info("Creating segment %s instance (%s)."%(s, curr_dir))
c = self.conn.execute(
'insert into segments (seg, curr_dir, tag, comment) values (?, ?, ?, ?)'
,(s, curr_dir, "", docline))
'insert into segments (seg, curr_dir, comment) values (?, ?, ?)'
,(s, curr_dir, docline))
seg_id = c.lastrowid
self.seg_id_cache[s] = seg_id
logger.info("Storing connectivity for segment %s."%s)
......@@ -268,8 +268,6 @@ class SqliteTracker(Tracker,threading.Thread):
logger.info("Stopping the tracker ...")
self.requests.put((None,None))
def get_list_status(self, seg, status):
""" Return the task list for a given segment and status.
......@@ -309,14 +307,15 @@ class SqliteTracker(Tracker,threading.Thread):
"""
task_output = pickle.dumps(t.task_output)
task_input = pickle.dumps(t.task_input)
curr_dir = self.pipe.get_curr_dir(t.seg)
str_prod = self.pipe.get_data_dir(t.seg, prod=t.task_input)
str_input = self.pipe.get_data_dir(t.seg, prod=t.task_input)
with self.conn_lock:
with self.conn:
seg_id = self.conn.execute('select seg_id from segments where curr_dir=?',(curr_dir,)).fetchone()[0]
c = self.conn.execute('insert into tasks(seg_id, status, input, output, str_prod) values(?, ?, ?, ?, ?)',
(seg_id, 'queued', task_input, task_output, str_prod))
t.id = c.lastrowid
seg_id = self.seg_id_cache[t.seg]
c = self.conn.execute('insert into tasks(seg_id, status, input, output, str_input) values(?, ?, ?, ?, ?)',
(seg_id, 'queued', task_input, task_output, str_input))
t.id = c.lastrowid
logger.debug('for task:\n%s\nparents:\n%s\n'%(str(t),str(t.parents)))
self.conn.executemany('insert into task_relations (father_id, child_id) values (?, ?)',((fid, t.id) for fid in t.parents))
t.status = 'queued'
return t
......@@ -363,6 +362,7 @@ class SqliteTracker(Tracker,threading.Thread):
def clean(self, seg):
""" Remove queued and running tasks from data base.
This is called only at the init of the scheduler.
Parameters
----------
seg: string, segment name.
......@@ -376,6 +376,6 @@ class SqliteTracker(Tracker,threading.Thread):
#self.conn.execute('delete from segments where seg_id=?',(seg_id,))
def close(self):
""" Close data base connexion.
""" Close database connexion.
"""
self.conn.close()
......@@ -129,7 +129,7 @@ class Web:
seg, currdir = conn.execute(
'select seg, curr_dir from segments where seg_id = ?'
,(segid,)).fetchone()
l = conn.execute('select str_prod from tasks where seg_id=? and status=?',(segid, status)).fetchall()
l = conn.execute('select str_input from tasks where seg_id=? and status=?',(segid, status)).fetchall()
conn.close()
html = html_tmp + '<h1> Data products for %s tasks in segment %s </h1>' % (status, seg)
html += '<div class="list"><p>Directory : %s</p> %d <span class="%s">%s</span> tasks <ul> '%( currdir, len(l), status, status)
......@@ -155,12 +155,11 @@ class Web:
## TO DO : get subpipes somewhere ???
conn = sqlite3.connect(self.db_file,check_same_thread=True)
with conn:
seg, currdir = conn.execute(
'select father_id from segments where child_id = ?'
fids = conn.execute(
'select father_id from segment_relations where child_id = ?'
,(segid,)).fetchall()
l = conn.execute('select str_prod from tasks where seg_id=? and status=?',(segid, status)).fetchall()
conn.close()
return [int(segid)]
return [int(segid)]+[i for i in fids]
@cherrypy.expose
@read_access
......
......@@ -150,7 +150,7 @@ class Worker(object):
----------
task_output: task result.
"""
fn = self.pipe.get_output_fn()
fn = self.pipe.get_output_file(self.task.seg)
with closing (file(fn,'w')) as f:
r = pickle.dump(task_output,f)
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment