Commit f6a94392 authored by Marc Betoule's avatar Marc Betoule
Browse files

Get a working implementation with the new db format. Link between tasks missing.

parent 6f809eff
......@@ -93,8 +93,21 @@ class Repository:
string list, name of source files.
"""
return []
def get_docline(self, seg):
    """ Return the segment synopsis doc line.

    Parameters
    ----------
    seg : string, name of the segment.

    Returns
    -------
    string, the one-line synopsis extracted from the segment's code
    file, or None when no synopsis can be found.
    """
    # NOTE(review): pydoc.source_synopsis expects an open file object,
    # not a filename -- confirm what get_code_file() actually returns.
    import pydoc
    return pydoc.source_synopsis(self.get_code_file(seg))
class LocalRepository(Repository):
""" A local repository.
......
......@@ -283,7 +283,7 @@ class Scheduler():
failed = self.tracker.get_failed(seg) # failed tasks
failed_prod = [t.task_input for t in failed] # failed products
logger.debug('Found %d done tasks segment %s'%(len(d),seg))
logger.debug('Found %d done tasks segment %s'%(len(failed),seg))
logger.debug('Found %d failed tasks segment %s'%(len(failed),seg))
if parents:
## product list to queued
l = multiplex(*[self.products_list.pop(s) for s in parents]) # cross prod
......
......@@ -180,12 +180,35 @@ class SqliteTracker(Tracker,threading.Thread):
self.conn.row_factory = sqlite3.Row
self.segments_registration()
## requests queue
self.requests = Queue.Queue(0)
## threading lock on connexion
self.conn_lock = threading.Lock() # We use this to avoid database locked exceptions
def segments_registration(self):
    """ Register every segment instance of the pipeline in the database.

    For each segment of the pipeline, look up its current directory in
    the ``segments`` table. If an instance is already registered, log it;
    otherwise insert a new row, using the segment's synopsis doc line as
    the comment, and commit.
    """
    for s in self.pipe.flatten():
        curr_dir = self.pipe.get_curr_dir(s)
        try:
            # Best effort: a missing or unreadable code file must not
            # prevent the segment from being registered.
            docline = self.pipe.repository.get_docline(s)
        except Exception:
            docline = ""
        # Explicitly test for "no row" instead of subscripting the
        # result of fetchone() and catching the resulting TypeError.
        row = self.conn.execute(
            'select seg_id from segments where curr_dir = ? limit 1',
            (curr_dir,)).fetchone()
        if row is not None:
            logger.info("Segment %s instance (%s) already registered in db."%(s, curr_dir))
        else:
            logger.info("Creating segment %s instance (%s)."%(s, curr_dir))
            self.conn.execute(
                'insert into segments (seg, curr_dir, tag, comment) values (?, ?, ?, ?)'
                ,(s, curr_dir, "", docline))
            self.conn.commit()
def _asynchronous_request(self, sqlrequest, args):
""" Add an sql request to the queue
......@@ -207,13 +230,18 @@ class SqliteTracker(Tracker,threading.Thread):
"""
threadconn = sqlite3.connect(self.sqlfile)
sqlrequest, args = self.requests.get()
n_tries = 0
while sqlrequest is not None:
try:
with self.conn_lock:
with threadconn:
c = threadconn.execute(sqlrequest, args)
except sqlite3.OperationalError:
logger.error('Catch database is locked exception, retrying...')
n_tries = n_tries + 1
logger.error('Catch database is locked exception, retrying: %d...' % n_tries)
if n_tries > 10:
logger.error('The database seems down, abandon tracking')
raise
else:
self.requests.task_done()
sqlrequest, args = self.requests.get()
......@@ -251,7 +279,7 @@ class SqliteTracker(Tracker,threading.Thread):
for t in ts:
task_output = pickle.loads(t['output'])
task_input = pickle.loads(t['input'])
l.append(task.Task(seg,status=status,id=t['id'], task_input=task_input, task_output=task_output))
l.append(task.Task(seg,status=status,id=t['task_id'], task_input=task_input, task_output=task_output))
return l
def add_queued(self, t):
......@@ -274,7 +302,7 @@ class SqliteTracker(Tracker,threading.Thread):
str_prod = self.pipe.get_data_dir(t.seg, prod=t.task_input)
with self.conn_lock:
with self.conn:
seg_id = self.conn.execute('select seg_id from segments where curr_dir=?',(curr_dir,)).fetchone()
seg_id = self.conn.execute('select seg_id from segments where curr_dir=?',(curr_dir,)).fetchone()[0]
c = self.conn.execute('insert into tasks(seg_id, status, input, output, str_prod) values(?, ?, ?, ?, ?)',
(seg_id, 'queued', task_input, task_output, str_prod))
t.id = c.lastrowid
......@@ -293,7 +321,7 @@ class SqliteTracker(Tracker,threading.Thread):
string (done, failed, queued, running)
"""
with self.conn_lock:
c = self.conn.execute('select status from tasks where id=?', t.id)
c = self.conn.execute('select status from tasks where task_id=?', t.id)
return c.fetchone()['status']
def update_status(self, t, status):
......@@ -316,7 +344,7 @@ class SqliteTracker(Tracker,threading.Thread):
t.task_output = None
task_output = pickle.dumps(t.task_output)
#TODO tag problem
self._asynchronous_request('update tasks set status=?, output=? where id=?',
self._asynchronous_request('update tasks set status=?, output=? where task_id=?',
(status, task_output, t.id))
t.status = status
return t
......@@ -330,11 +358,11 @@ class SqliteTracker(Tracker,threading.Thread):
"""
d = self.pipe.get_curr_dir(seg)
with self.conn:
seg_id = self.conn.execute('select seg_id from segments where curr_dir=?',(d,)).fetchone()
seg_id = self.conn.execute('select seg_id from segments where curr_dir=?',(d,)).fetchone()[0]
self.conn.execute('delete from tasks where seg_id=? and status="running"',(seg_id,))
self.conn.execute('delete from tasks where seg_id=? and status="queued"',(seg_id,))
self.conn.execute('delete from segments where seg_id=?',(seg_id,))
#self.conn.execute('delete from segments where seg_id=?',(seg_id,))
def close(self):
""" Close data base connexion.
......
......@@ -3,6 +3,8 @@ import pipelet.worker as worker
import pipelet.pipeline as pipeline
from pipelet.launchers import launch_interactive
import os.path as op
import logging
import sys
S = """
first->second->fourth
third->fourth
......@@ -10,6 +12,15 @@ third->fourth
#T.connect('second', ['third', 'fourth'], 'fourth')
#T.compute_hash()
# Route the scheduler's DEBUG-level log records to stdout with a
# timestamped "name - level - message" format.
logger = logging.getLogger('scheduler')
logger.setLevel(logging.DEBUG)
stream_handler = logging.StreamHandler(sys.stdout)
stream_handler.setLevel(logging.DEBUG)
stream_handler.setFormatter(
    logging.Formatter("%(asctime)s - %(name)s - %(levelname)s - %(message)s"))
logger.addHandler(stream_handler)
# Build the pipeline from the dependency description string S, using the
# current directory for both code and data, then dump its task graph in
# Graphviz dot format for visualization.
T = pipeline.Pipeline(S, code_dir=op.abspath('./'), prefix=op.abspath('./'))
T.to_dot('pipeline.dot')
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment