
Commit 2b6d68c3 authored by Marc Betoule

Merge branch 'newbase' of git@gitorious.org:pipelet/pipelet into newbase

parents c1da684c 7ae60fa6
@@ -54,7 +54,7 @@ def get_log_file (pipe, name):
"""
"""
d = datetime.datetime.now()
strdate = d.strftime("%y%d%j_%H%M%s%f")
strdate = d.strftime("%y%m%d_%H%M%s%f")
return path.join(pipe.get_log_dir(),strdate+"_"+name)
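The new format string drops the redundant day-of-year field (%j) in favour of a month field, so log file names now sort chronologically. A minimal sketch of the difference (only the changed prefix is shown; the lowercase %s epoch-seconds directive in the surrounding code is a glibc extension and is left as-is):

import datetime

d = datetime.datetime(2010, 11, 30, 15, 42, 7)
d.strftime("%y%d%j")   # '1030334' -- old prefix: year, day of month, day of year
d.strftime("%y%m%d")   # '101130'  -- new prefix: year, month, day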
......
@@ -434,7 +434,7 @@ class Pipeline:
"""
return self._curr_dirs[seg]
def get_output_fn(self, seg):
def get_output_fn(self, seg, prod=None):
""" Return the segment output file
Parameters
@@ -445,7 +445,7 @@ class Pipeline:
-------
string, output file name.
"""
return path.join(self.get_curr_dir(seg),'seg_%s.output'%seg)
return path.join(self.get_data_dir(seg, prod),'seg_%s.output'%seg)
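With the new prod argument the output file moves from the segment directory into the per-product data directory, e.g. (paths illustrative) .../seg_clean_a1b2c3/data/0/seg_clean.output instead of .../seg_clean_a1b2c3/seg_clean.output; callers that omit prod presumably fall back to the segment-level location through get_data_dir(seg, None).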
def get_param_file(self, seg):
""" Return the segment directory.
@@ -518,8 +518,18 @@ class Pipeline:
""" Return the segment log filename.
"""
return path.join(self.get_curr_dir(seg),'seg_%s.log'%seg)
def get_meta_file (self, seg, prod=-1):
""" Return the meta data filename
"""
if (prod == -1):
dirname = self.get_curr_dir(seg)
else:
dirname = self.get_data_dir(seg, prod)
return path.join(dirname,'seg_%s.meta'%seg)
if __name__ == "__main__":
import doctest
doctest.testmod()
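get_meta_file follows the same convention: with the default prod=-1 it names the segment-level meta file, otherwise the per-product one. A usage sketch, assuming a Pipeline instance p and a segment named 'clean' (both made up):

p.get_meta_file('clean')          # <curr_dir of clean>/seg_clean.meta      (segment level)
p.get_meta_file('clean', prod=0)  # <data dir of product 0>/seg_clean.meta  (task level)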
@@ -23,7 +23,8 @@ import pprint
import shutil
import threading
import logging
from contextlib import closing
import pickle
class NullHandler(logging.Handler):
""" Extension of the logging handler class.
@@ -211,8 +212,8 @@ class Scheduler():
t = self.tracker.add_queued(t)
if not self.abort:
self.task_queue.put(t)
def get_task(self):
""" Return the next task of the queue.
@@ -239,7 +240,9 @@ class Scheduler():
"""
with self.success_lock:
self.nb_success = self.nb_success + 1
self.tracker.update_status(t,'done')
self.store_meta_task(t)
if t.task_output:
for r in t.task_output:
child = task.Task(t.seg, task_output=r)
@@ -252,6 +255,40 @@ class Scheduler():
self.products_list.push_void(t)
self.task_queue.task_done()
def store_meta_seg(self, seg, parents):
""" Store meta information for segment.
This is used to rebuild the db.
Parameters
----------
seg: string, segment name
parents: list of string, parent segment names.
"""
fn = self.pipe.get_meta_file(seg)
lst_dir = []
for e in parents:
lst_dir.append(self.pipe.get_curr_dir(e))
with closing(file(fn, 'w')) as f:
r = pickle.dump(dict({'parents':lst_dir}),f)
def store_meta_task(self, t):
""" Store meta information for task.
This is used to rebuild the db.
Parameters
----------
t: task object
"""
fn = self.pipe.get_meta_file(t.seg, prod=t.task_input)
with closing(file(fn, 'w')) as f:
r = pickle.dump(t.to_save,f)
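Both helpers pickle a plain dict next to the data it describes, so the on-disk tree stays self-describing even if the sqlite status file is lost. A sketch of reading a task-level meta file back (the filename is made up; the diff itself uses Python 2's file(), open() below is the portable spelling):

import pickle

with open('seg_clean.meta', 'rb') as f:
    meta = pickle.load(f)
meta['parents']       # e.g. ['/pipe/seg_first_a1b2c3/data/0'] -- parent product directories
meta.get('ended_on')  # present once the task has finished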
def push_next_seg(self, seg):
""" Push the segment task list to the queue.
@@ -266,7 +303,7 @@ class Scheduler():
## some cleaning in the data base first
self.tracker.clean (seg)
logger.info('Pushing segment %s'%seg)
os.umask(18)
d = self.pipe.get_curr_dir(seg)
@@ -290,6 +327,7 @@ class Scheduler():
fid.close()
parents = self.pipe.get_parents(seg) ## parents segments
self.store_meta_seg (seg, parents)
d = self.tracker.get_done(seg) ## done tasks
dprod = [t.task_input for t in d] ## done products
failed = self.tracker.get_failed(seg) # failed tasks
@@ -297,6 +335,7 @@ class Scheduler():
logger.debug('Found %d done tasks segment %s'%(len(d),seg))
logger.debug('Found %d failed tasks segment %s'%(len(failed),seg))
if parents:
## product list to queue
l = multiplex(*[self.products_list.pop(s) for s in parents]) # cross prod
l = [([r[0] for r in e], [r[1] for r in e]) for e in l]
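multiplex takes the cross product of the parents' finished products, and the line above regroups each combination into (list of products, list of parent task ids). A toy illustration in comments (values made up; the second member of each pair is read here as the parent task id):

# parent A products: [(pa, 1)]        parent B products: [(pb1, 7), (pb2, 8)]
# after multiplex + regrouping:
#   [([pa, pb1], [1, 7]),
#    ([pa, pb2], [1, 8])]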
@@ -308,6 +347,7 @@ class Scheduler():
if not (e in dprod): # not done
logger.debug("pushing 1 task for seg %s"%seg)
self.put_task(task.Task(seg, task_input=e, status='queued',parents=par))
else: # done
logger.debug("task already accomplished in segment %s"%seg)
# fetch the result of the task and store it in the task list
@@ -358,6 +398,7 @@ class Scheduler():
"""
self.tracker.update_status(t,'failed')
self.store_meta_task(t)
self.task_queue.task_done()
def requeue(self, t):
......
@@ -30,23 +30,23 @@ function navigation(url) {
// remove checked segments
function del_seg (){
lst = document.getElementsByName('checkbox');
var url="delseg?segid=";
var url="del_lst_seg?seglist=";
for (var i=0;i<lst.length;i++){
if(lst[i].checked){
url = url+lst[i].id+"--";
url = url+lst[i].id+"-";
}
}
self.location.href=url;
}
// set new tag for checked segments
function edit_tag (){
var check = prompt("Select an existing tag among: \n - tag1\n - tag2\n or set a new tag for this pipe","tag1");
function edit_tag (str_lst_tag){
var check = prompt("Select an existing tag among:"+str_lst_tag+" or set a new tag for this pipe","tag1");
var url="addtag?segid=";
lst = document.getElementsByName('checkbox');
for (var i=0;i<lst.length;i++){
if(lst[i].checked){
url = url+lst[i].id+"--";
url = url+lst[i].id+"-";
}
}
url = url+"&tag="+check;
@@ -60,3 +60,49 @@ function uncheck (){
lst[i].checked=0;
}
}
// delete tag from sql base
function del_tag() {
var seltag = document.getElementById('se_tag_id') ;
var tag = seltag.options[seltag.selectedIndex].text;
var check = confirm("Delete tag "+tag+" ?");
if (check == true) {
self.location.href="deltag?tag="+tag;
}
}
// delete products
function del_prod(segid) {
var lst = document.getElementsByName('checkbox_p') ;
var url="del_lst_prod?segid="+segid+"&taskid=";
for (var i=0;i<lst.length;i++){
if(lst[i].checked){
url = url+lst[i].id+"-";
}
}
self.location.href=url;
}
// filter pipelines
function filter(){
var cbtag = document.getElementById('cb_tag') ;
var istag = cbtag.checked;
var tag = "None"
if (istag)
{
var seltag = document.getElementById('se_tag_id') ;
tag = seltag.options[seltag.selectedIndex].text;
}
var cbdate = document.getElementById('cb_date') ;
var isdate = cbdate.checked;
var date = "None"
if (isdate)
{
var seldate = document.getElementById('se_date_id') ;
date = seldate.options[seldate.selectedIndex].text;
}
var url = "filter?tag="+tag+"&date="+date;
self.location.href=url;
}
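For reference, the handlers above build plain GET URLs that the CherryPy side has to parse; with made-up ids they look like del_lst_seg?seglist=12-15-, addtag?segid=12-15-&tag=v1, del_lst_prod?segid=12&taskid=3-7- and filter?tag=v1&date=None (note that the list separator changed from "--" to "-").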
@@ -52,6 +52,7 @@ class Task:
self.ended_on = None
## List of the task ids whose outputs become the input of this task
self.parents = parents
self.to_save = {}
def __str__(self):
""" Convert task object to string.
......
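to_save is the scratch dict that the tracker fills in as the task progresses and that the scheduler's store_meta_task finally pickles to disk; by the end of a successful run it typically looks like this (keys taken from the tracker code below, values made up):

{'parents':   ['/pipe/seg_first_a1b2c3/data/0'],
 'queued_on': '2010-11-30 15:42:07',
 'begun_on':  '2010-11-30 15:42:09',
 'ended_on':  '2010-11-30 15:43:01'}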
@@ -21,6 +21,8 @@ import shutil
import threading
import Queue
import logging
from utils import str_date
from contextlib import closing
logger = logging.getLogger("scheduler")
class Tracker:
@@ -153,6 +155,10 @@ class SqliteTracker(Tracker,threading.Thread):
def __init__(self, pipe):
""" Initialize the Sqlite tracker.
Build the different tables of the sqlite db.
Register all segments.
Initialize the db lock.
Parameters
----------
pipe: a pipe instance.
@@ -163,7 +169,7 @@ class SqliteTracker(Tracker,threading.Thread):
self.pipe = pipe
self.seg_id_cache = {}
## string, sql filename
self.sqlfile = path.join(pipe._prefix,'.sqlstatus')
if self.pipe.sqlfile is not None:
@@ -191,7 +197,9 @@ class SqliteTracker(Tracker,threading.Thread):
self.conn_lock = threading.Lock() # We use this to avoid database locked exceptions
def segments_registration(self):
"""
""" Register all segments in the db.
The registration includes connectivity between segments.
"""
previous = None
for s in self.pipe.flatten():
@@ -204,7 +212,7 @@ class SqliteTracker(Tracker,threading.Thread):
seg_id = self.conn.execute(
'select seg_id from segments where curr_dir = ? limit 1',
(curr_dir,)).fetchone()[0]
self.seg_id_cache[s] = seg_id
self.seg_id_cache[s] = seg_id
logger.info("Segment %s instance (%s) already registered in db."%(s, curr_dir))
except TypeError:
logger.info("Creating segment %s instance (%s)."%(s, curr_dir))
@@ -212,13 +220,16 @@ class SqliteTracker(Tracker,threading.Thread):
'insert into segments (seg, curr_dir, comment) values (?, ?, ?)'
,(s, curr_dir, docline))
seg_id = c.lastrowid
self.seg_id_cache[s] = seg_id
self.seg_id_cache[s] = seg_id
logger.info("Storing connectivity for segment %s."%s)
for p in self.pipe._parents[s]:
self.conn.execute(
'insert into segment_relations (father_id, child_id) values (?, ?)'
,(self.seg_id_cache[p], seg_id))
self.conn.commit()
def _asynchronous_request(self, sqlrequest, args):
""" Add an sql request to the queue
@@ -306,19 +317,30 @@ class SqliteTracker(Tracker,threading.Thread):
"""
task_output = pickle.dumps(t.task_output)
task_input = pickle.dumps(t.task_input)
str_input = self.pipe.get_data_dir(t.seg, prod=t.task_input)
task_input = pickle.dumps(t.task_input)
str_input = self.pipe.get_data_dir(t.seg, prod=t.task_input)
str_d = str_date()
with self.conn_lock:
with self.conn:
seg_id = self.seg_id_cache[t.seg]
c = self.conn.execute('insert into tasks(seg_id, status, input, output, str_input) values(?, ?, ?, ?, ?)',
(seg_id, 'queued', task_input, task_output, str_input))
c = self.conn.execute('insert into tasks(seg_id, status, input, output, str_input, queued_on) values(?, ?, ?, ?, ?, ?)',
(seg_id, 'queued', task_input, task_output, str_input, str_d))
t.id = c.lastrowid
logger.debug('for task:\n%s\nparents:\n%s\n'%(str(t),str(t.parents)))
self.conn.executemany('insert into task_relations (father_id, child_id) values (?, ?)',((fid, t.id) for fid in t.parents))
t.status = 'queued'
lst_dir = []
for e in t.parents:
with self.conn_lock:
c = self.conn.execute('select str_input from tasks where task_id=?', (e,)).fetchone()
if c:
lst_dir.append(c[0])
t.to_save['parents'] = lst_dir
t.to_save['queued_on'] = str_d
return t
def get_status(self, t):
""" Return task status.
@@ -334,6 +356,21 @@ class SqliteTracker(Tracker,threading.Thread):
c = self.conn.execute('select status from tasks where task_id=?', t.id)
return c.fetchone()['status']
def write_param (self, t):
""" Add param string to meta file
Parameters
----------
t: task object
"""
fn = self.pipe.get_meta_file(t.seg)
with closing(file(fn, 'r')) as f:
d = pickle.load(f)
d['param'] = t.tag
with closing(file(fn, 'w')) as f:
r = pickle.dump(d,f)
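write_param rewrites the segment-level meta file created by store_meta_seg, adding the parameter string under 'param'; after it runs the pickled dict looks roughly like this (values made up):

{'parents': ['/pipe/seg_first_a1b2c3'],
 'param':   'nside=32;lmax=512'}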
def update_status(self, t, status):
""" Update the task with new status.
@@ -348,17 +385,29 @@ class SqliteTracker(Tracker,threading.Thread):
-------
task object
"""
str_d = str_date()
if status == 'done':
task_output = pickle.dumps(t.task_output)
self._asynchronous_request('update tasks set status=?, output=?, ended_on=? where task_id=?',
(status, task_output, str_d, t.id))
self._asynchronous_request('update segments set param=? where seg=?',
(t.tag, t.seg))
t.to_save["ended_on"] = str_d
self.write_param(t)
else:
t.task_output = None
task_output = pickle.dumps(t.task_output)
#TODO tag problem
self._asynchronous_request('update tasks set status=?, output=? where task_id=?',
(status, task_output, t.id))
if status == 'running':
self._asynchronous_request('update tasks set status=?, output=?, begun_on=? where task_id=?',(status, task_output, str_date(), t.id))
t.to_save["begun_on"] = str_d
else:
self._asynchronous_request('update tasks set status=?, output=? where task_id=?',(status, task_output, t.id))
t.status = status
return t
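The three branches leave a timestamp trail that doubles as the status encoding rebuild_db_from_disk inverts later on; illustratively:

# queued : queued_on set at insertion time (add_queued), begun_on/ended_on absent
# running: begun_on set here
# done   : ended_on set here, plus output and segment param
# a meta file with begun_on but no ended_on is read back as 'failed'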
def clean(self, seg):
""" Remove queued and running tasks from data base.
@@ -379,3 +428,6 @@ class SqliteTracker(Tracker,threading.Thread):
""" Close database connexion.
"""
self.conn.close()
@@ -23,7 +23,13 @@ import subprocess
import tempfile
import base64
import zlib
import datetime
import sqlite3
import glob
from pipelet.repository import LocalRepository
from contextlib import closing
import pickle
import os
def flatten(seq):
"""
@@ -103,6 +109,18 @@ def str_web(a):
return s
def str_date():
""" Return a string representation of the date time now.
Returns:
--------
string %y/%m/%d %H:%M:%s
"""
d = datetime.datetime.now()
return (d.strftime("%Y-%m-%d %H:%M:%S"))
def is_code_file(s):
"""Check wether the file is a python code file based an the file
extension.
@@ -254,6 +272,158 @@ def get_hashkey(s, starthash):
return starthash
def parse_disk (pipedir):
""" Return list of pipe instance starting from pipedir.
"""
lstpipe = []
found = False
dircontent = glob.glob(path.join(pipedir,"seg_*"))
## visit each segment's repository
for c in dircontent:
## this is a node
if path.isdir(c):
lstpipe.append(c)
lstpipe += parse_disk(c)
return lstpipe
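parse_disk simply recurses over seg_* directories, so the on-disk tree it expects looks roughly like this (names illustrative, child segment instances nested inside their parent's directory):

/data/my_pipe/
    seg_first_a1b2c3/
        seg_first.meta
        data/
        seg_second_d4e5f6/
            seg_second.meta
            data/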
def rebuild_db_from_disk(pipedir, sqlfile=None):
""" Rebuild db from disk.
Parameters
----------
pipedir: string, a pipeline prefix
sqlfile: string, sql file name (default is pipedir/.sqlstatus)
"""
## set sqlfile
if sqlfile is None:
sqlfile = os.path.join(pipedir, ".sqlstatus")
## 1---: create all tables
## taken from sqlite tracker constructor
conn = sqlite3.connect(sqlfile)
conn.text_factory=str
cur = conn.execute('select tbl_name from sqlite_master where tbl_name == "tasks"')
if not cur.fetchone():
with conn:
conn.execute('create table segments (seg_id INTEGER PRIMARY KEY, seg TEXT NOT NULL, curr_dir TEXT NOT NULL UNIQUE, param TEXT, tag TEXT, comment TEXT)')
conn.execute('create table tasks (task_id INTEGER PRIMARY KEY, seg_id INTEGER NOT NULL, status TEXT NOT NULL, input BLOB NOT NULL, output BLOB, str_input NOT NULL, queued_on TEXT, begun_on TEXT, ended_on TEXT)')
conn.execute('create table task_relations (father_id INTEGER NOT NULL, child_id INTEGER NOT NULL)')
conn.execute('create table segment_relations (father_id INTEGER NOT NULL, child_id INTEGER NOT NULL)')
conn.row_factory = sqlite3.Row
## 2---: segments table from curr_dir
lst_dir = parse_disk (pipedir) ## list curr_dir
seg_id_cache = {} ## store new seg_id for each curr_dir
seg_depend_cache = {} ## store parents for each curr_dir
for curr_dir in lst_dir:
curr_dir = path.abspath(curr_dir)
R = LocalRepository(curr_dir)
s = curr_dir.split("_")[-2] ## seg name
print "Creating segment %s instance (%s)."%(s, curr_dir)
## read curr_dir/seg_s_code.py to get docstring
try:
docline = R.get_docline(s)
except Exception:
docline = ""
## read curr_dir/seg_s.meta to get parents, param, and tag
fn = path.join(curr_dir, "seg_"+s+".meta") ##
with closing(file(fn, 'r')) as f:
meta = pickle.load(f)
seg_depend_cache[curr_dir] = meta['parents']
param = meta['param']
if meta.has_key('tag'):
tag = meta['tag']
else:
tag = ""
## insert values in db
c = conn.execute(
'insert into segments (seg, curr_dir, comment, param, tag) values (?, ?, ?, ?, ?)'
,(s, curr_dir, docline, param, tag))
## retrieve the seg_id
seg_id_cache[curr_dir] = c.lastrowid
## 3---: segment_relations table from meta
## need to restart the loop in order to use the right seg_id
for k,v in seg_id_cache.items():
print "Storing connectivity for segment %s."%k
for p in seg_depend_cache[k]:
id = conn.execute('select seg_id from segments where curr_dir = ?',(p,)).fetchone()
conn.execute('insert into segment_relations (father_id, child_id) values (?, ?)',(id[0], v))
## 4---: tasks table from str_input
task_id_cache = {} ## store new task_id for each str_input
task_depend_cache = {} ## store parents for each str_input
for k,v in seg_id_cache.items():
print "Creating tasks of segment %s."%k
## Get the list of sub directory (ie str_input)
lst_task = []
for root, dir, fn in os.walk(path.join(k, "data")):
for d in dir:
lst_task.append(path.abspath(path.join(root, d)))
break
if not lst_task:
lst_task.append(path.abspath(path.join(k, "data")))
for t in lst_task:
## read output from output file
fn = glob.glob(path.join(t, "seg_*.output"))
if fn:
with closing (file(fn[0],'r')) as f:
output = pickle.load(f)
else:
print "error no output file found"
## read dates and parents from meta file
fn = glob.glob(path.join(t, "seg_*.meta"))
if fn:
with closing (file(fn[0],'r')) as f:
meta = pickle.load(f)
else:
print "error no meta file found"
task_depend_cache[t] = meta['parents']
## deduce status from dates
if meta.has_key("ended_on"):
status = 'done'
elif meta.has_key("begun_on"):
status = 'failed'
meta["ended_on"]= ''
else:
status = 'queued'
meta['begun_on'] = ''
meta["ended_on"]= ''
## insert values in db
input = None
c = conn.execute('insert into tasks (seg_id, output, input, begun_on, ended_on, queued_on,status,str_input) values (?, ?, ?, ?, ?, ?, ?, ?)',(v, pickle.dumps(output), pickle.dumps(input), meta["begun_on"], meta["ended_on"], meta["queued_on"], status,t))
## retrieve the task_id
task_id_cache[t] = c.lastrowid
## 5---: task_relations from meta + task input
## need to restart the loop in order to use the right task_id
for k,v in task_id_cache.items():
print "Storing connectivity for task %s."%k
for p in task_depend_cache[k]:
id = conn.execute('select task_id,output from tasks where str_input = ?',(p,)).fetchone()
conn.execute('insert into task_relations (father_id, child_id) values (?, ?)',(id[0], v))
##conn.execute ('update tasks set input=? where task_id=?', (id[1], v))
## TO DO : how to retrieve input ?
conn.commit()
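A minimal invocation sketch (the prefix path is made up); by default the rebuilt database lands in <pipedir>/.sqlstatus, matching the tracker's default location:

from pipelet.utils import rebuild_db_from_disk

rebuild_db_from_disk('/data/my_pipe')                          # writes /data/my_pipe/.sqlstatus
rebuild_db_from_disk('/data/my_pipe', sqlfile='/tmp/alt.db')   # or an explicit target file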
if __name__ == "__main__":
import doctest
......
@@ -25,6 +25,8 @@ import shutil
from cherrypy.lib.static import serve_file
from auth import read_access, write_access
import re
import pylab
import pickle
html_tmp = """
<html>
@@ -56,29 +58,147 @@ class Web:
def get_lst_tag (self):
""" Return the list of existing tags
Tags are ; separated in the db.
Returns
-------
list of string
"""
## sql blabla
return []
conn = sqlite3.connect(self.db_file,check_same_thread=True)
conn.text_factory=str
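The diff is truncated here. A plausible completion, not the author's actual code, given the segments.tag column created above and the ';'-separated convention stated in the docstring, might read:

rows = conn.execute('select distinct tag from segments where tag is not null').fetchall()
conn.close()
tags = set()
for (t,) in rows:
    tags.update(s for s in t.split(';') if s)
return list(tags)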