Commit 6f809eff authored by Marc Betoule's avatar Marc Betoule
Browse files

a first go toward the new db format

parent 093b17ad
import sqlite3
def old_to_new(old_file, new_file):
    """ Convert an old-format task database to the new database format.

    The old schema stored everything in a single ``tasks`` table
    (seg, curr_dir, status, prod, res, str_prod).  The new schema
    normalises segments into their own ``segments`` table and links
    each task to its segment through ``seg_id``.

    Parameters
    ----------
    old_file : string, path to the old-format sqlite3 database.
    new_file : string, path to the new-format sqlite3 database
        (its tables are created by this function).
    """
    conn_old = sqlite3.connect(old_file)
    conn_new = sqlite3.connect(new_file)
    try:
        # Create the tables of the new format.
        with conn_new:
            conn_new.execute('create table segments (seg_id INTEGER PRIMARY KEY, seg TEXT, curr_dir TEXT, tag TEXT, comment TEXT)')
            conn_new.execute('create table tasks (task_id INTEGER PRIMARY KEY, seg_id INTEGER, status TEXT, input BLOB, output BLOB, str_prod TEXT)')
            conn_new.execute('create table task_relations (father_id INTEGER, child_id INTEGER)')
        # One segments row per distinct (seg, curr_dir) pair of the old base.
        with conn_old:
            segments = conn_old.execute('select distinct seg, curr_dir from tasks').fetchall()
        with conn_new:
            conn_new.executemany('insert into segments (seg, curr_dir) values (?, ?)', segments)
        # Copy every task, rewiring it to its segment through seg_id.
        seg_dirs = conn_new.execute('select seg_id, curr_dir from segments').fetchall()
        with conn_new:
            for seg_id, curr_dir in seg_dirs:
                rows = conn_old.execute('select status, prod, res, str_prod from tasks where curr_dir==?', (curr_dir,))
                for row in rows:
                    conn_new.execute('insert into tasks (seg_id, status, input, output, str_prod) values (?, ?, ?, ?, ?)',
                                     (seg_id,) + tuple(row))
    finally:
        # The original leaked both connections; always close them.
        conn_old.close()
        conn_new.close()
......@@ -240,9 +240,9 @@ class Scheduler():
with self.success_lock:
self.nb_success = self.nb_success + 1
self.tracker.update_status(t,'done')
if t.res:
for r in t.res:
t = task.Task(t.seg, res=r)
if t.task_output:
for r in t.task_output:
t = task.Task(t.seg, task_output=r)
self.products_list.push(t)
else:
# This segment does not return products, a void product is
......@@ -279,9 +279,9 @@ class Scheduler():
parents = self.pipe.get_parents(seg) ## parents segments
d = self.tracker.get_done(seg) ## done tasks
dprod = [t.prod for t in d] ## done products
dprod = [t.task_input for t in d] ## done products
failed = self.tracker.get_failed(seg) # failed tasks
failed_prod = [t.prod for t in failed] # failed products
failed_prod = [t.task_input for t in failed] # failed products
logger.debug('Found %d done tasks segment %s'%(len(d),seg))
logger.debug('Found %d done tasks segment %s'%(len(failed),seg))
if parents:
......@@ -294,16 +294,16 @@ class Scheduler():
continue
if not (e in dprod): # not done
logger.debug("pushing 1 task for seg %s"%seg)
self.put_task(task.Task(seg, prod=e, status='queued'))
self.put_task(task.Task(seg, task_input=e, status='queued'))
else: # done
logger.debug("task already accomplished in segment %s"%seg)
# fetch the result of the task and store it in the task list
ind = dprod.index(e)
t = d[ind];
if t.res:
logger.debug("Loading %d results from previously done task in segment %s"%(len(t.res),seg))
for r in t.res:
t = task.Task(t.seg, res=r)
if t.task_output:
logger.debug("Loading %d results from previously done task in segment %s"%(len(t.task_output),seg))
for r in t.task_output:
t = task.Task(t.seg, task_output=r)
self.products_list.push(t)
else:
# add an empty product once.
......@@ -316,10 +316,10 @@ class Scheduler():
logger.warning("Orphan segment %s already computed but failed"%seg)
elif d: # done
logger.info("Orphan segment %s already computed"%seg)
if t.res:
logger.info("Loading %d results from already computed orphan segment %s"%(len(t.res),seg))
for r in t.res:
t = task.Task(t.seg, res=r)
if t.task_output:
logger.info("Loading %d results from already computed orphan segment %s"%(len(t.task_output),seg))
for r in t.task_output:
t = task.Task(t.seg, task_output=r)
self.products_list.push(t)
else:
# add an empty product once.
......
......@@ -22,26 +22,26 @@ class Task:
execution status and its output product(s).
"""
def __init__(self, seg, prod=None, status=None, res=None, id=None):
def __init__(self, seg, task_input=None, status=None, task_output=None, id=None):
""" Initialize a task object.
Parameters
----------
seg : string, a segment name
prod : an input product
task_input : the input product
status : string, task status (done, queued, failed, running)
res : list, task output product(s)
task_output : list, task output product(s)
id : a unique id for the task attributed by the tracker
"""
## string, a segment name
self.seg = seg
## python object, the input product
self.prod = prod
self.task_input = task_input
## string, task status (done, queued, failed, running)
self.status = status
## list, task output product(s)
self.res = res
self.task_output = task_output
## integer, a uniq id for the task attributed by the tracker
self.id = id
## string, a tag name
......@@ -50,8 +50,8 @@ class Task:
def __str__(self):
    """ Convert task object to string.

    Returns a one-line human-readable summary of the task: its
    segment, status, id, and string forms of its input and output
    products.  (Reconstructed post-commit version: the scraped diff
    contained both the old ``prod``/``res`` and the new
    ``task_input``/``task_output`` return statements.)
    """
    return 'seg: %s, status: %s, id: %s, input: %s, output: %s' \
        % (self.seg, self.status, self.id, str(self.task_input), str(self.task_output))
class TaskList:
......@@ -114,7 +114,7 @@ class TaskList:
raise TypeError
if not self._list.has_key(t.seg):
self._list[t.seg] = []
self._list[t.seg].append(t.res)
self._list[t.seg].append(t.task_output)
def push_void(self, t):
""" Add a 'void' task to the task list, if not previously done.
......
......@@ -115,7 +115,7 @@ class Tracker:
def update_status(self, t, status):
""" Update the task with new status.
Update the task status. If not done res is set to None.
Update the task status. If not done task_output is set to None.
Parameters
----------
......@@ -147,7 +147,7 @@ class SqliteTracker(Tracker,threading.Thread):
Tasks are saved in a sqlite3 data base.
Format is:
id INTEGER PRIMARY KEY, seg TEXT, curr_dir TEXT, status TEXT, prod BLOB, res BLOB, str_prod TEXT, tag TEXT
TODO
"""
def __init__(self, pipe):
......@@ -173,8 +173,11 @@ class SqliteTracker(Tracker,threading.Thread):
self.conn.text_factory=str
cur = self.conn.execute('select tbl_name from sqlite_master where tbl_name == "tasks"')
if not cur.fetchone():
self.conn.execute('create table tasks (id INTEGER PRIMARY KEY, seg TEXT, curr_dir TEXT, status TEXT, prod BLOB, res BLOB, str_prod TEXT, tag TEXT)')
self.conn.commit()
with self.conn:
self.conn.execute('create table segments (seg_id INTEGER PRIMARY KEY, seg TEXT, curr_dir TEXT, tag TEXT, comment TEXT)')
self.conn.execute('create table tasks (task_id INTEGER PRIMARY KEY, seg_id INTEGER, status TEXT, input BLOB, output BLOB, str_prod TEXT)')
self.conn.execute('create table task_relations (father_id INTEGER, child_id INTEGER)')
self.conn.row_factory = sqlite3.Row
## requests queue
......@@ -244,11 +247,11 @@ class SqliteTracker(Tracker,threading.Thread):
s = self.pipe.get_curr_dir(seg)
logger.debug("Looking for curr_dir=%s, status=%s in database"%(s, status))
with self.conn_lock:
ts = self.conn.execute('select * from tasks where curr_dir=? and status=?',(s, status))
ts = self.conn.execute('select * from tasks as t join segments as s on t.seg_id=s.seg_id and s.curr_dir=? and t.status=? ',(s, status))
for t in ts:
res = pickle.loads(t['res'])
prod = pickle.loads(t['prod'])
l.append(task.Task(seg,status=status,id=t['id'], res=res,prod=prod))
task_output = pickle.loads(t['output'])
task_input = pickle.loads(t['input'])
l.append(task.Task(seg,status=status,id=t['id'], task_input=task_input, task_output=task_output))
return l
def add_queued(self, t):
......@@ -265,14 +268,15 @@ class SqliteTracker(Tracker,threading.Thread):
task object
"""
res = pickle.dumps(t.res)
prod = pickle.dumps(t.prod)
task_output = pickle.dumps(t.task_output)
task_input = pickle.dumps(t.task_input)
curr_dir = self.pipe.get_curr_dir(t.seg)
str_prod = self.pipe.get_data_dir(t.seg, prod=t.prod)
str_prod = self.pipe.get_data_dir(t.seg, prod=t.task_input)
with self.conn_lock:
with self.conn:
c = self.conn.execute('insert into tasks(seg, curr_dir, status, prod, res, str_prod) values(?, ?, ?, ?, ?, ?)',
(t.seg, curr_dir, 'queued', prod, res, str_prod))
seg_id = self.conn.execute('select seg_id from segments where curr_dir=?',(curr_dir,)).fetchone()
c = self.conn.execute('insert into tasks(seg_id, status, input, output, str_prod) values(?, ?, ?, ?, ?)',
(seg_id, 'queued', task_input, task_output, str_prod))
t.id = c.lastrowid
t.status = 'queued'
return t
......@@ -295,7 +299,7 @@ class SqliteTracker(Tracker,threading.Thread):
def update_status(self, t, status):
""" Update the task with new status.
Update the task status in data base. If not done res is set to None.
Update the task status in data base. If not done task_output is set to None.
Parameters
----------
......@@ -307,12 +311,13 @@ class SqliteTracker(Tracker,threading.Thread):
task object
"""
if status == 'done':
res = pickle.dumps(t.res)
task_output = pickle.dumps(t.task_output)
else:
t.res = None
res = pickle.dumps(t.res)
self._asynchronous_request('update tasks set status=?, res=?, tag=? where id=?',
(status, res, t.tag, t.id))
t.task_output = None
task_output = pickle.dumps(t.task_output)
#TODO tag problem
self._asynchronous_request('update tasks set status=?, output=? where id=?',
(status, task_output, t.id))
t.status = status
return t
......@@ -325,8 +330,11 @@ class SqliteTracker(Tracker,threading.Thread):
"""
d = self.pipe.get_curr_dir(seg)
with self.conn:
self.conn.execute('delete from tasks where curr_dir=? and status="running"',(d,))
self.conn.execute('delete from tasks where curr_dir=? and status="queued"',(d,))
seg_id = self.conn.execute('select seg_id from segments where curr_dir=?',(d,)).fetchone()
self.conn.execute('delete from tasks where seg_id=? and status="running"',(seg_id,))
self.conn.execute('delete from tasks where seg_id=? and status="queued"',(seg_id,))
self.conn.execute('delete from segments where seg_id=?',(seg_id,))
def close(self):
""" Close data base connexion.
......
......@@ -143,16 +143,16 @@ class Worker(object):
except KeyError:
print 'Fail to load object %s from file %s'%(k,filename)
def write_res(self, task_output):
    """ Pickle the result of the segment to the '.pipe_res' data file.

    Parameters
    ----------
    task_output: task result (any picklable object).

    (Reconstructed post-commit version: the scraped diff contained both
    the old ``res`` and the new ``task_output`` parameter names.)
    """
    fn = self.get_data_fn('.pipe_res')  # TODO move this to pipe
    # 'closing' guarantees the file is closed even if pickling fails.
    with closing(file(fn, 'w')) as f:
        r = pickle.dump(task_output, f)
def make_tag(self, seg, glo):
""" Write a tag using parameters of a segment.
......@@ -191,7 +191,7 @@ class Worker(object):
task: task object
"""
seg = task.seg
prod = task.prod
prod = task.task_input
# make directories
os.umask(18)
......@@ -246,7 +246,7 @@ class Worker(object):
-------
string, filename
"""
return path.join(self.pipe.get_data_dir(self.task.seg, prod=self.task.prod), x)
return path.join(self.pipe.get_data_dir(self.task.seg, prod=self.task.task_input), x)
def get_fig_fn(self, x):
""" Complete the filename with the path to the working
......@@ -260,7 +260,7 @@ class Worker(object):
-------
string, filename
"""
return path.join(self.pipe.get_fig_dir(self.task.seg, prod=self.task.prod), x)
return path.join(self.pipe.get_fig_dir(self.task.seg, prod=self.task.task_input), x)
def glob_seg(self, x, y):
""" Return the list of filename matching y in the working
......@@ -315,7 +315,7 @@ class Worker(object):
self.task = task
glo = {'get_data_fn':self.get_data_fn, 'get_fig_fn':self.get_fig_fn,
'segment_args':task.prod, 'get_tmp_fn':self.get_tmp_fn,
'segment_args':task.task_input, 'get_tmp_fn':self.get_tmp_fn,
'glob_seg':self.glob_seg,'load_param':self.load_param,
'load_products':self.load_products,
'save_products':self.save_products,
......@@ -451,7 +451,7 @@ class InteractiveWorker(Worker):
"""
seg = task.seg
prod = task.prod
prod = task.task_input
self.make_dir(task)
glo = self.prepare_env(task)
self.task = task
......@@ -461,7 +461,7 @@ class InteractiveWorker(Worker):
res = glo['res']
except:
res = None
task.res = res
task.task_output = res
self.write_res(res)
try: # Save params
var_key = glo['var_key']
......@@ -507,7 +507,7 @@ class ThreadWorker(Worker, threading.Thread):
"""
seg = task.seg
prod = task.prod
prod = task.task_input
self.make_dir(task)
glo = self.prepare_env(task)
code_file = self.pipe.repository.get_code_file(seg)
......@@ -526,7 +526,7 @@ class ThreadWorker(Worker, threading.Thread):
res = glo['res']
except:
res = None
task.res = res
task.task_output = res
self.write_res(res)
if task.status == "failed":
return task
......@@ -584,7 +584,7 @@ class ProcessWorker(Worker, Process):
task : task object
"""
seg = task.seg
prod = task.prod
prod = task.task_input
self.make_dir(task)
glo = self.prepare_env(task)
code_file = self.pipe.repository.get_code_file(seg)
......@@ -606,7 +606,7 @@ class ProcessWorker(Worker, Process):
res = glo['res']
except:
res = None
task.res = res
task.task_output = res
self.write_res(res)
if task.status == "failed":
return task
......@@ -624,5 +624,5 @@ class _FakeWorker(ProcessWorker):
""" Used for performance tests.
"""
def execute_task(self, task):
    """ Fake task execution used for performance tests.

    Sets a dummy output product list on the task and returns it
    without running any segment code.  (Reconstructed post-commit
    version: the scraped diff contained both the old ``task.res``
    and the new ``task.task_output`` assignments.)
    """
    task.task_output = range(10)
    return task
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment