
Commit e65fb6e1 authored by Maude Le Jeune

Feature #731: use meta files for segments and tasks (pickle) to back up db info

parent 0f6e7b89
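
The change drops the db-only bookkeeping: each segment and each task now also gets a pickled meta file on disk, so the sqlite status db can be rebuilt from the directory tree alone (see rebuild_db_from_disk below). A minimal sketch of the round trip, assuming a plain pickled dict; the file name and keys are illustrative:

    import pickle
    from contextlib import closing

    meta_fn = 'seg_example.meta'   # illustrative; real names come from Pipeline.get_meta_file
    meta = {'parents': ['/pipe/seg_first_abc123']}

    # write, as store_meta_seg / store_meta_task do (the code base uses
    # Python 2 text-mode file(); binary mode shown here works in 2 and 3)
    with closing(open(meta_fn, 'wb')) as f:
        pickle.dump(meta, f)

    # read back, as rebuild_db_from_disk does
    with closing(open(meta_fn, 'rb')) as f:
        restored = pickle.load(f)
    print(restored['parents'])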
......@@ -434,7 +434,7 @@ class Pipeline:
"""
return self._curr_dirs[seg]
def get_output_fn(self, seg):
def get_output_fn(self, seg, prod=None):
""" Return the segment output file
Parameters
......@@ -445,7 +445,7 @@ class Pipeline:
-------
string, segment output file name.
"""
return path.join(self.get_curr_dir(seg),'seg_%s.output'%seg)
return path.join(self.get_data_dir(seg, prod),'seg_%s.output'%seg)
def get_param_file(self, seg):
""" Return the segment directory.
......@@ -518,8 +518,18 @@ class Pipeline:
""" Return the segment log filename.
"""
return path.join(self.get_curr_dir(seg),'seg_%s.log'%seg)
def get_meta_file (self, seg, prod=-1):
""" Return the meta data filename
"""
if (prod == -1):
dirname = self.get_curr_dir(seg)
else:
dirname = self.get_data_dir(seg, prod)
return path.join(dirname,'seg_%s.meta'%seg)
if __name__ == "__main__":
import doctest
doctest.testmod()
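
For reference, a standalone sketch of the directory logic get_meta_file introduces: with the default prod=-1 the meta file sits in the segment's current directory, otherwise in the per-product data directory (the directory names below are illustrative, not the real Pipeline layout):

    from os import path

    def get_meta_file(curr_dir, data_dir, seg, prod=-1):
        """ Illustrative mirror of Pipeline.get_meta_file. """
        dirname = curr_dir if prod == -1 else data_dir
        return path.join(dirname, 'seg_%s.meta' % seg)

    # segment-level meta file, written once by Scheduler.store_meta_seg
    print(get_meta_file('/pipe/seg_clean_1a2b3c', '/pipe/seg_clean_1a2b3c/data/42', 'clean'))
    # task-level meta file, one per product, written by Scheduler.store_meta_task
    print(get_meta_file('/pipe/seg_clean_1a2b3c', '/pipe/seg_clean_1a2b3c/data/42', 'clean', prod=42))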
......@@ -23,7 +23,8 @@ import pprint
import shutil
import threading
import logging
from contextlib import closing
import pickle
class NullHandler(logging.Handler):
""" Extension of the logging handler class.
......@@ -211,8 +212,8 @@ class Scheduler():
t = self.tracker.add_queued(t)
if not self.abort:
self.task_queue.put(t)
def get_task(self):
""" Return the next task of the queue.
......@@ -239,7 +240,9 @@ class Scheduler():
"""
with self.success_lock:
self.nb_success = self.nb_success + 1
self.tracker.update_status(t,'done')
self.store_meta_task(t)
if t.task_output:
for r in t.task_output:
child = task.Task(t.seg, task_output=r)
......@@ -252,6 +255,40 @@ class Scheduler():
self.products_list.push_void(t)
self.task_queue.task_done()
def store_meta_seg(self, seg, parents):
""" Store meta information for segment.
This is used to rebuild db.
Parameters
----------
seg: string, segment name
parents: list of string, parent segment names.
"""
fn = self.pipe.get_meta_file(seg)
lst_dir = []
for e in parents:
lst_dir.append(self.pipe.get_curr_dir(e))
with closing(file(fn, 'w')) as f:
r = pickle.dump(dict({'parents':lst_dir}),f)
def store_meta_task(self, t):
""" Store meta information for task.
This is used to rebuild db.
Parameters
----------
t: task object
"""
fn = self.pipe.get_meta_file(t.seg, prod=t.task_input)
with closing(file(fn, 'w')) as f:
r = pickle.dump(t.to_save,f)
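
Taken together, the segment meta file carries the parent directories (plus, later, the param and tag strings added by the tracker and the web front end), while the task meta file is the task's to_save dict filled in by SqliteTracker. A hedged sketch of what rebuild_db_from_disk can expect to unpickle; the keys follow the diff, the values are illustrative:

    # seg_<name>.meta in the segment directory (store_meta_seg, write_param, web tags)
    seg_meta = {
        'parents': ['/pipe/seg_first_abc123'],   # parent curr_dirs
        'param':   'nside=512',                  # added by SqliteTracker.write_param
        'tag':     'v1',                         # optional, added by the web interface
    }

    # seg_<name>.meta in a product data directory (store_meta_task)
    task_meta = {
        'parents':   ['/pipe/seg_first_abc123/data/7'],  # parent str_input dirs
        'queued_on': '2011-01-01 12:00:00',
        'begun_on':  '2011-01-01 12:00:05',              # present once the task started
        'ended_on':  '2011-01-01 12:01:00',              # present only for finished tasks
    }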
def push_next_seg(self, seg):
""" Push the segment task list to the queue.
......@@ -266,7 +303,7 @@ class Scheduler():
## some cleaning in the data base first
self.tracker.clean (seg)
logger.info('Pushing segment %s'%seg)
os.umask(18)
d = self.pipe.get_curr_dir(seg)
......@@ -290,6 +327,7 @@ class Scheduler():
fid.close()
parents = self.pipe.get_parents(seg) ## parents segments
self.store_meta_seg (seg, parents)
d = self.tracker.get_done(seg) ## done tasks
dprod = [t.task_input for t in d] ## done products
failed = self.tracker.get_failed(seg) # failed tasks
......@@ -297,6 +335,7 @@ class Scheduler():
logger.debug('Found %d done tasks segment %s'%(len(d),seg))
logger.debug('Found %d failed tasks segment %s'%(len(failed),seg))
if parents:
## product list to queue
l = multiplex(*[self.products_list.pop(s) for s in parents]) # cross prod
l = [([r[0] for r in e], [r[1] for r in e]) for e in l]
......@@ -308,6 +347,7 @@ class Scheduler():
if not (e in dprod): # not done
logger.debug("pushing 1 task for seg %s"%seg)
self.put_task(task.Task(seg, task_input=e, status='queued',parents=par))
else: # done
logger.debug("task already accomplished in segment %s"%seg)
# fetch the result of the task and store it in the task list
......@@ -358,6 +398,7 @@ class Scheduler():
"""
self.tracker.update_status(t,'failed')
self.store_meta_task(t)
self.task_queue.task_done()
def requeue(self, t):
......
......@@ -52,6 +52,7 @@ class Task:
self.ended_on = None
## List of the task id whose output become the input of this task
self.parents = parents
self.to_save = {}
def __str__(self):
""" Convert task object to string.
......
......@@ -22,6 +22,7 @@ import threading
import Queue
import logging
from utils import str_date
from contextlib import closing
logger = logging.getLogger("scheduler")
class Tracker:
......@@ -154,6 +155,10 @@ class SqliteTracker(Tracker,threading.Thread):
def __init__(self, pipe):
""" Initialize the Sqlite tracker.
Build the different tables of the sqlite db.
Register all segments.
Initialize the db lock.
Parameters
----------
pipe: a pipe instance.
......@@ -164,7 +169,7 @@ class SqliteTracker(Tracker,threading.Thread):
self.pipe = pipe
self.seg_id_cache = {}
## string, sql filename
self.sqlfile = path.join(pipe._prefix,'.sqlstatus')
if self.pipe.sqlfile is not None:
......@@ -192,7 +197,9 @@ class SqliteTracker(Tracker,threading.Thread):
self.conn_lock = threading.Lock() # We use this to avoid database locked exceptions
def segments_registration(self):
"""
""" Register all segments in the db.
The registration includes connectivity between segments.
"""
previous = None
for s in self.pipe.flatten():
......@@ -205,7 +212,7 @@ class SqliteTracker(Tracker,threading.Thread):
seg_id = self.conn.execute(
'select seg_id from segments where curr_dir = ? limit 1',
(curr_dir,)).fetchone()[0]
self.seg_id_cache[s] = seg_id
self.seg_id_cache[s] = seg_id
logger.info("Segment %s instance (%s) already registered in db."%(s, curr_dir))
except TypeError:
logger.info("Creating segment %s instance (%s)."%(s, curr_dir))
......@@ -213,13 +220,16 @@ class SqliteTracker(Tracker,threading.Thread):
'insert into segments (seg, curr_dir, comment) values (?, ?, ?)'
,(s, curr_dir, docline))
seg_id = c.lastrowid
self.seg_id_cache[s] = seg_id
self.seg_id_cache[s] = seg_id
logger.info("Storing connectivity for segment %s."%s)
for p in self.pipe._parents[s]:
self.conn.execute(
'insert into segment_relations (father_id, child_id) values (?, ?)'
,(self.seg_id_cache[p], seg_id))
self.conn.commit()
def _asynchronous_request(self, sqlrequest, args):
""" Add an sql request to the queue
......@@ -309,17 +319,28 @@ class SqliteTracker(Tracker,threading.Thread):
task_output = pickle.dumps(t.task_output)
task_input = pickle.dumps(t.task_input)
str_input = self.pipe.get_data_dir(t.seg, prod=t.task_input)
str_d = str_date()
with self.conn_lock:
with self.conn:
seg_id = self.seg_id_cache[t.seg]
c = self.conn.execute('insert into tasks(seg_id, status, input, output, str_input, queued_on) values(?, ?, ?, ?, ?, ?)',
(seg_id, 'queued', task_input, task_output, str_input, str_date()))
(seg_id, 'queued', task_input, task_output, str_input, str_d))
t.id = c.lastrowid
logger.debug('for task:\n%s\nparents:\n%s\n'%(str(t),str(t.parents)))
self.conn.executemany('insert into task_relations (father_id, child_id) values (?, ?)',((fid, t.id) for fid in t.parents))
t.status = 'queued'
lst_dir = []
for e in t.parents:
with self.conn_lock:
c = self.conn.execute('select str_input from tasks where task_id=?', (e,)).fetchone()
if c:
lst_dir.append(c[0])
t.to_save['parents'] = lst_dir
t.to_save['queued_on'] = str_d
return t
def get_status(self, t):
""" Return task status.
......@@ -335,6 +356,21 @@ class SqliteTracker(Tracker,threading.Thread):
c = self.conn.execute('select status from tasks where task_id=?', (t.id,))
return c.fetchone()['status']
def write_param (self, t):
""" Add param string to meta file
Parameters
----------
t: task object
"""
fn = self.pipe.get_meta_file(t.seg)
with closing(file(fn, 'r')) as f:
d = pickle.load(f)
d['param'] = t.tag
with closing(file(fn, 'w')) as f:
r = pickle.dump(d,f)
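
write_param and the web front end's _update_meta below both follow the same read-modify-write pattern on the pickled meta file. A compact sketch of that pattern; the helper name update_meta_key is illustrative:

    import pickle
    from contextlib import closing

    def update_meta_key(fn, key, value):
        """ Read a pickled meta dict, set one key, write it back. """
        with closing(open(fn, 'rb')) as f:
            d = pickle.load(f)
        d[key] = value
        with closing(open(fn, 'wb')) as f:
            pickle.dump(d, f)

    # roughly what write_param does when a task finishes:
    # update_meta_key('/pipe/seg_clean_1a2b3c/seg_clean.meta', 'param', t.tag)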
def update_status(self, t, status):
""" Update the task with new status.
......@@ -349,24 +385,29 @@ class SqliteTracker(Tracker,threading.Thread):
-------
task object
"""
str_d = str_date()
if status == 'done':
task_output = pickle.dumps(t.task_output)
self._asynchronous_request('update tasks set status=?, output=?, ended_on=? where task_id=?',
(status, task_output, str_date(), t.id))
(status, task_output, str_d, t.id))
self._asynchronous_request('update segments set param=? where seg=?',
(t.tag, t.seg))
t.to_save["ended_on"] = str_d
self.write_param(t)
else:
t.task_output = None
task_output = pickle.dumps(t.task_output)
if status == 'running':
self._asynchronous_request('update tasks set status=?, output=?, begun_on=? where task_id=?',(status, task_output, str_date(), t.id))
t.to_save["begun_on"] = str_d
else:
self._asynchronous_request('update tasks set status=?, output=? where task_id=?',(status, task_output, t.id))
#TODO tag problem
t.status = status
return t
def clean(self, seg):
""" Remove queued and running tasks from data base.
......@@ -387,3 +428,6 @@ class SqliteTracker(Tracker,threading.Thread):
""" Close database connexion.
"""
self.conn.close()
......@@ -24,6 +24,12 @@ import tempfile
import base64
import zlib
import datetime
import sqlite3
import glob
from pipelet.repository import LocalRepository
from contextlib import closing
import pickle
import os
def flatten(seq):
"""
......@@ -266,6 +272,158 @@ def get_hashkey(s, starthash):
return starthash
def parse_disk (pipedir):
""" Return list of pipe instance starting from pipedir.
"""
lstpipe = []
found = False
dircontent = glob.glob(path.join(pipedir,"seg_*"))
## visit each segment's repository
for c in dircontent:
## this is a node
if path.isdir(c):
lstpipe.append(c)
lstpipe += parse_disk(c)
return lstpipe
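
parse_disk simply recurses over the seg_* directories, so nested segment instances come back flattened in one list. A small usage sketch, assuming parse_disk is importable from pipelet.utils (the directory names are made up):

    import os
    import tempfile
    from pipelet.utils import parse_disk   # module path assumed

    pipedir = tempfile.mkdtemp()
    os.makedirs(os.path.join(pipedir, 'seg_first_abc123', 'seg_second_def456'))
    print(parse_disk(pipedir))
    # -> ['<pipedir>/seg_first_abc123', '<pipedir>/seg_first_abc123/seg_second_def456']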
def rebuild_db_from_disk(pipedir, sqlfile=None):
""" Rebuild db from disk.
Parameters
----------
pipedir: string, a pipeline prefix
sqlfile: string, sql file name (default is pipedir/.sqlstatus)
"""
## set sqlfile
if sqlfile is None:
sqlfile = os.path.join(pipedir, ".sqlstatus")
## 1---: create all tables
## taken from sqlite tracker constructor
conn = sqlite3.connect(sqlfile)
conn.text_factory=str
cur = conn.execute('select tbl_name from sqlite_master where tbl_name == "tasks"')
if not cur.fetchone():
with conn:
conn.execute('create table segments (seg_id INTEGER PRIMARY KEY, seg TEXT NOT NULL, curr_dir TEXT NOT NULL UNIQUE, param TEXT, tag TEXT, comment TEXT)')
conn.execute('create table tasks (task_id INTEGER PRIMARY KEY, seg_id INTEGER NOT NULL, status TEXT NOT NULL, input BLOB NOT NULL, output BLOB, str_input NOT NULL, queued_on TEXT, begun_on TEXT, ended_on TEXT)')
conn.execute('create table task_relations (father_id INTEGER NOT NULL, child_id INTEGER NOT NULL)')
conn.execute('create table segment_relations (father_id INTEGER NOT NULL, child_id INTEGER NOT NULL)')
conn.row_factory = sqlite3.Row
## 2---: segments table from curr_dir
lst_dir = parse_disk (pipedir) ## list curr_dir
seg_id_cache = {} ## store new seg_id for each curr_dir
seg_depend_cache = {} ## store parents for each curr_dir
for curr_dir in lst_dir:
curr_dir = path.abspath(curr_dir)
R = LocalRepository(curr_dir)
s = curr_dir.split("_")[-2] ## seg name
print "Creating segment %s instance (%s)."%(s, curr_dir)
## read curr_dir/seg_s_code.py to get docstring
try:
docline = R.get_docline(s)
except Exception:
docline = ""
## read curr_dir/seg_s.meta to get parents, param, and tag
fn = path.join(curr_dir, "seg_"+s+".meta") ##
with closing(file(fn, 'r')) as f:
meta = pickle.load(f)
seg_depend_cache[curr_dir] = meta['parents']
param = meta.get('param', "") ## param is only present once a task has completed
tag = meta.get('tag', "")
## insert values in db
c = conn.execute(
'insert into segments (seg, curr_dir, comment, param, tag) values (?, ?, ?, ?, ?)'
,(s, curr_dir, docline, param, tag))
## retrieve the seg_id
seg_id_cache[curr_dir] = c.lastrowid
## 3---: segment_relations table from meta
## need to restart the loop in order to use the right seg_id
for k,v in seg_id_cache.items():
print "Storing connectivity for segment %s."%k
for p in seg_depend_cache[k]:
id = conn.execute('select seg_id from segments where curr_dir = ?',(p,)).fetchone()
conn.execute('insert into segment_relations (father_id, child_id) values (?, ?)',(id[0], v))
## 4---: tasks table from str_input
task_id_cache = {} ## store new task_id for each str_input
task_depend_cache = {} ## store parents for each str_input
for k,v in seg_id_cache.items():
print "Creating tasks of segment %s."%k
## Get the list of sub directory (ie str_input)
lst_task = []
for root, dir, fn in os.walk(path.join(k, "data")):
for d in dir:
lst_task.append(path.abspath(path.join(root, d)))
break
if not lst_task:
lst_task.append(path.abspath(path.join(k, "data")))
for t in lst_task:
## read output from output file
fn = glob.glob(path.join(t, "seg_*.output"))
if fn:
with closing (file(fn[0],'r')) as f:
output = pickle.load(f)
else:
output = None
print "error: no output file found in %s"%t
## read dates and parents from meta file
fn = glob.glob(path.join(t, "seg_*.meta"))
if fn:
with closing (file(fn[0],'r')) as f:
meta = pickle.load(f)
else:
print "error: no meta file found in %s, skipping task"%t
continue
task_depend_cache[t] = meta['parents']
## deduce status from dates
if meta.has_key("ended_on"):
status = 'done'
elif meta.has_key("begun_on"):
status = 'failed'
meta["ended_on"]= ''
else:
status = 'queued'
meta['begun_on'] = ''
meta["ended_on"]= ''
## insert values in db
input = None
c = conn.execute('insert into tasks (seg_id, output, input, begun_on, ended_on, queued_on,status,str_input) values (?, ?, ?, ?, ?, ?, ?, ?)',(v, pickle.dumps(output), pickle.dumps(input), meta["begun_on"], meta["ended_on"], meta["queued_on"], status,t))
## retrieve the task_id
task_id_cache[t] = c.lastrowid
## 5---: task_relations from meta + task input
## need to restart the loop in order to use the right task_id
for k,v in task_id_cache.items():
print "Storing connectivity for task %s."%k
for p in task_depend_cache[k]:
id = conn.execute('select task_id,output from tasks where str_input = ?',(p,)).fetchone()
conn.execute('insert into task_relations (father_id, child_id) values (?, ?)',(id[0], v))
##conn.execute ('update tasks set input=? where task_id=?', (id[1], v))
## TO DO : how to retrieve input ?
conn.commit()
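
Usage then reduces to pointing the helper at an existing pipeline prefix; the task status is deduced from the timestamps found in each task meta file (ended_on means done, begun_on alone means failed, neither means queued). A hedged usage sketch, assuming the module path pipelet.utils and an existing pipeline directory:

    from pipelet.utils import rebuild_db_from_disk   # module path assumed

    # rebuild the default <pipedir>/.sqlstatus db of a pipeline
    rebuild_db_from_disk('/data/my_pipe')
    # or write the rebuilt db elsewhere, leaving the original untouched
    rebuild_db_from_disk('/data/my_pipe', sqlfile='/tmp/rebuilt.sqlstatus')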
if __name__ == "__main__":
import doctest
......
......@@ -26,6 +26,7 @@ from cherrypy.lib.static import serve_file
from auth import read_access, write_access
import re
import pylab
import pickle
html_tmp = """
<html>
......@@ -117,7 +118,7 @@ class Web:
for segid in seglst:
if segid:
with conn:
l = conn.execute('select tag from segments where seg_id=?',(segid,)).fetchone()
l = conn.execute('select tag,curr_dir from segments where seg_id=?',(segid,)).fetchone()
lst_tag = []
if l and l[0] is not None:
lst_tag = l[0].split(";")
......@@ -125,9 +126,24 @@ class Web:
lst_tag = pylab.unique(lst_tag)
str_tag = ";".join(lst_tag)
conn.execute('update segments set tag=? where seg_id=?',(str_tag,segid))
fn = glob(os.path.join(l[1], "seg_*.meta"))
self._update_meta(fn, str_tag)
raise cherrypy.HTTPRedirect('/'+self.name+'/',303)
def _update_meta(self, fn, str_tag):
""" Update meta file with current tag value.
fn: string, meta file name
tag: string, tag string
"""
if fn:
with closing(file(fn[0], 'r')) as f:
d = pickle.load(f)
d['tag'] = str_tag
with closing(file(fn[0], 'w')) as f:
r = pickle.dump(d,f)
@cherrypy.expose
@write_access
def deltag (self, tag):
......@@ -140,13 +156,14 @@ class Web:
conn = sqlite3.connect(self.db_file,check_same_thread=True)
conn.text_factory=str
with conn:
l = conn.execute('select seg_id, tag from segments where tag like ?',("%"+tag+"%",)).fetchall()
l = conn.execute('select seg_id, tag, curr_dir from segments where tag like ?',("%"+tag+"%",)).fetchall()
for s in l:
lst_tag = s[1].split(";")
lst_tag.remove(tag)
str_tag = ";".join(lst_tag)
conn.execute('update segments set tag=? where seg_id=?',(str_tag,s[0]))
fn = glob(os.path.join(s[2], "seg_*.meta"))
self._update_meta(fn, str_tag)
raise cherrypy.HTTPRedirect('/'+self.name+'/',303)
......@@ -249,7 +266,9 @@ class Web:
html += '<a class="icon delete" href="javascript:del_seg();"><small>Delete</small></a>'
html += '<a class="icon log" href="log?logdir=%s"><small>Browse log</small></a>'%(l[0][1].split("seg")[0]+"log")
html +='</p></fieldset>'
html += '<br><div class="list"><ul class="mktree" id="segtree">'
html += '<br>'
html += '<a href="/%s/">Back</a>'%(self.name)
html +='<div class="list"><ul class="mktree" id="segtree">'
indent = -1
# select a subset
......@@ -265,11 +284,6 @@ class Web:
with conn:
e = conn.execute('select status, count(status) from tasks where seg_id=? group by status',(s[2],)).fetchall()
ss = s[3]
#try:
# with closing(file(os.path.join(s[1],"stdout"))) as f:
# ss = f.read()
#except IOError:
# ss = ""
if ss is None:
ss = ""
print s
......@@ -394,6 +408,7 @@ class Web:
'select seg, curr_dir from segments where seg_id = ?'
,(segid,)).fetchone()
conn.close()
html = self.index(highlight=highlight) + '<p>%s</p><ul>' % (currdir)
l = glob(os.path.join(currdir,"*.*"))
for e in l:
......@@ -430,7 +445,10 @@ class Web:
l = conn.execute('delete from segments where curr_dir like ?',(currdir+'%',))
conn.close()
shutil.rmtree(currdir)
try:
shutil.rmtree(currdir)
except OSError: ## the directory may already be gone
pass
if lst_seg:
self._delseg(lst_seg)
......
......@@ -157,17 +157,16 @@ class Worker(object):
except KeyError:
logger.warning( 'Fail to load object %s from file %s'%(k,filename) )
def write_res(self, seg, task_output):
def write_res(self, t):
""" Pickle the result of the segment.
Parameters
----------
seg: string, segment name
task_output: task result.
t: task instance
"""
fn = self.pipe.get_output_fn(seg)
fn = self.pipe.get_output_fn(t.seg, prod=t.task_input)
with closing (file(fn,'w')) as f:
r = pickle.dump(task_output,f)
r = pickle.dump(t.task_output,f)
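
With the prod argument threaded through get_output_fn, the output pickle now lands next to the task meta file in the product directory instead of in the segment directory. An illustrative sketch of the resulting layout (all names made up; the real product directory names come from Pipeline.get_data_dir):

    from os import path

    curr_dir = '/pipe/seg_clean_1a2b3c'            # segment directory (illustrative)
    data_dir = path.join(curr_dir, 'data', '7')    # one product directory (illustrative)

    print(path.join(curr_dir, 'seg_clean.meta'))    # segment meta: parents, param, tag
    print(path.join(data_dir, 'seg_clean.meta'))    # task meta: the to_save dict
    print(path.join(data_dir, 'seg_clean.output'))  # pickled task_output written by write_res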
def make_tag(self, seg, glo):
""" Write a tag using parameters of a segment.
......@@ -198,6 +197,9 @@ class Worker(object):
f.close()
return var_tag
def make_dir(self, task):
""" Create a directory for a given task.
......@@ -480,7 +482,7 @@ class InteractiveWorker(Worker):
except:
res = None
task.task_output = res
self.write_res(seg, res)
self.write_res(task)
try: # Save params
var_key = glo['var_key']
self.save_param(seg, glo, param_name=var_key)
......