## Copyright (C) 2008, 2009, 2010 APC LPNHE CNRS Universite Paris Diderot
##
## This program is free software; you can redistribute it and/or modify
## it under the terms of the GNU General Public License as published by
## the Free Software Foundation; either version 3 of the License, or
## (at your option) any later version.
##
## This program is distributed in the hope that it will be useful,
## but WITHOUT ANY WARRANTY; without even the implied warranty of
## MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
## GNU General Public License for more details.
##
## You should have received a copy of the GNU General Public License
## along with this program; if not, see http://www.gnu.org/licenses/gpl.html

import os.path as path
import cPickle as pickle
import sqlite3
import task
import shutil
import threading
import Queue
import logging
from utils import str_date

logger = logging.getLogger("scheduler")


class Tracker:
    """ Track the submission of objects through files in the segment directory.

    The numerous file access may result in a serious performance
    bottleneck. Nevertheless, it is the only work tracker that may be
    compatible with the ancestral version of pipelet.
    """

    def get_list_status(self, seg, status):
        """ Return the task list for a given segment and status.

        Parameters
        ----------
        seg: string, segment name
        status: string, task status (done, queued, running, failed)

        Returns
        -------
        a task list.
        """
        # Abstract: concrete trackers (e.g. SqliteTracker) implement this.
        pass

    def get_queued(self, seg):
        """ Return the list of queued tasks.

        Parameters
        ----------
        seg: string, segment name

        Returns
        -------
        a Task list
        """
        return self.get_list_status(seg, 'queued')

    def get_done(self, seg):
        """ Return the list of done tasks.

        Parameters
        ----------
        seg: string, segment name

        Returns
        -------
        a Task list
        """
        return self.get_list_status(seg, 'done')

    def get_failed(self, seg):
        """ Return the list of failed tasks.

        Parameters
        ----------
        seg: string, segment name

        Returns
        -------
        a Task list
        """
        return self.get_list_status(seg, 'failed')

    def get_running(self, seg):
        """ Return the list of running tasks.

        Parameters
        ----------
        seg: string, segment name

        Returns
        -------
        a Task list
        """
        return self.get_list_status(seg, 'running')

    def add_queued(self, t):
        """ Add a task to the queue.

        Add a task to the data base, and return it with its new id and
        status.

        Parameters
        ----------
        t: task object.

        Returns
        -------
        task object
        """
        pass

    def update_status(self, t, status):
        """ Update the task with new status.

        Update the task status. If not done task_output is set to None.

        Parameters
        ----------
        t: task object
        status: string (done, failed, queued, running)

        Returns
        -------
        task object
        """
        pass

    def get_status(self, t):
        """ Return task status.

        Parameters
        ----------
        t: a task object

        Returns
        -------
        string (done, failed, queued, running)
        """
        pass


class SqliteTracker(Tracker, threading.Thread):
    """ Fast sqlite-based tracker.

    Tasks are saved in a sqlite3 data base. Format is:
    TODO
    """

    def __init__(self, pipe):
        """ Initialize the Sqlite tracker.

        Parameters
        ----------
        pipe: a pipe instance.
        """
        threading.Thread.__init__(self)
        ## pipeline object
        self.pipe = pipe
        # segment name -> seg_id cache, filled by segments_registration()
        self.seg_id_cache = {}
        ## string, sql filename
        self.sqlfile = path.join(pipe._prefix, '.sqlstatus')
        if self.pipe.sqlfile is not None:
            self.sqlfile = self.pipe.sqlfile
        logger.info("tracking job in %s" % self.sqlfile)
        ## sql connection object
        self.conn = sqlite3.connect(self.sqlfile)
        self.conn.text_factory = str
        cur = self.conn.execute(
            'select tbl_name from sqlite_master where tbl_name == "tasks"')
        if not cur.fetchone():
            # First run on this database: create the schema.
            with self.conn:
                self.conn.execute('create table segments (seg_id INTEGER PRIMARY KEY, seg TEXT NOT NULL, curr_dir TEXT NOT NULL UNIQUE, param TEXT, tag TEXT, comment TEXT)')
                # str_input was declared without a type in older versions;
                # TEXT matches how it is written by add_queued().
                self.conn.execute('create table tasks (task_id INTEGER PRIMARY KEY, seg_id INTEGER NOT NULL, status TEXT NOT NULL, input BLOB NOT NULL, output BLOB, str_input TEXT NOT NULL, queued_on TEXT, begun_on TEXT, ended_on TEXT)')
                self.conn.execute('create table task_relations (father_id INTEGER NOT NULL, child_id INTEGER NOT NULL)')
                self.conn.execute('create table segment_relations (father_id INTEGER NOT NULL, child_id INTEGER NOT NULL)')
        self.conn.row_factory = sqlite3.Row
        self.segments_registration()
        ## requests queue (unbounded)
        self.requests = Queue.Queue(0)
        ## threading lock on connection
        self.conn_lock = threading.Lock()  # We use this to avoid database locked exceptions

    def segments_registration(self):
        """ Register all pipeline segments in the database.

        For each segment of the flattened pipeline, look up its current
        directory in the segments table; create the row (and the
        parent/child connectivity rows) if it is not yet registered.
        The seg -> seg_id mapping is cached in self.seg_id_cache.
        """
        previous = None
        for s in self.pipe.flatten():
            curr_dir = self.pipe.get_curr_dir(s)
            try:
                docline = self.pipe.repository.get_docline(s)
            except Exception:
                docline = ""
            try:
                # fetchone() returns None when no row matches; the [0]
                # subscript then raises TypeError (handled below, EAFP).
                seg_id = self.conn.execute(
                    'select seg_id from segments where curr_dir = ? limit 1',
                    (curr_dir,)).fetchone()[0]
                self.seg_id_cache[s] = seg_id
                logger.info("Segment %s instance (%s) already registered in db." % (s, curr_dir))
            except TypeError:
                logger.info("Creating segment %s instance (%s)." % (s, curr_dir))
                c = self.conn.execute(
                    'insert into segments (seg, curr_dir, comment) values (?, ?, ?)'
                    , (s, curr_dir, docline))
                seg_id = c.lastrowid
                self.seg_id_cache[s] = seg_id
                logger.info("Storing connectivity for segment %s." % s)
                for p in self.pipe._parents[s]:
                    self.conn.execute(
                        'insert into segment_relations (father_id, child_id) values (?, ?)'
                        , (self.seg_id_cache[p], seg_id))
        self.conn.commit()

    def _asynchronous_request(self, sqlrequest, args):
        """ Add an sql request to the queue.

        We want those requests to be done FIFO.

        Parameters
        ----------
        sqlrequest: string, sql request.
        args: tuple, request arguments.
        """
        self.requests.put((sqlrequest, args))

    def run(self):
        """ Start the tracker.

        Open a connection to the sqlite data base, and empty the
        requests queue. A (None, None) pair (pushed by stop()) ends the
        loop. On "database is locked" errors the same request is
        retried up to 10 times before giving up.
        """
        # sqlite3 connections are not shared across threads: open a
        # dedicated one for this worker thread.
        threadconn = sqlite3.connect(self.sqlfile)
        sqlrequest, args = self.requests.get()
        n_tries = 0
        while sqlrequest is not None:
            try:
                with self.conn_lock:
                    with threadconn:  # commits on success, rolls back on error
                        threadconn.execute(sqlrequest, args)
            except sqlite3.OperationalError:
                # Loop again without calling task_done(): the same
                # request is retried.
                n_tries = n_tries + 1
                logger.error('Catch database is locked exception, retrying: %d...' % n_tries)
                if n_tries > 10:
                    logger.error('The database seems down, abandon tracking')
                    raise
            else:
                self.requests.task_done()
                sqlrequest, args = self.requests.get()
        self.requests.task_done()

    def stop(self):
        """ Stop the tracker.

        Empty the request queue by adding None.
        """
        logger.info("Stopping the tracker ...")
        self.requests.put((None, None))

    def get_list_status(self, seg, status):
        """ Return the task list for a given segment and status.

        Parameters
        ----------
        seg: string, segment name
        status: string, task status (done, queued, running, failed)

        Returns
        -------
        a task list.
        """
        l = []
        s = self.pipe.get_curr_dir(seg)
        logger.debug("Looking for curr_dir=%s, status=%s in database" % (s, status))
        with self.conn_lock:
            ts = self.conn.execute('select * from tasks as t join segments as s on t.seg_id=s.seg_id and s.curr_dir=? and t.status=? ', (s, status))
            for t in ts:
                task_output = pickle.loads(t['output'])
                task_input = pickle.loads(t['input'])
                l.append(task.Task(seg, status=status, id=t['task_id'],
                                   task_input=task_input,
                                   task_output=task_output))
        return l

    def add_queued(self, t):
        """ Add a task to the queue.

        Add a task to the data base, and return it with its new id and
        status.

        Parameters
        ----------
        t: task object.

        Returns
        -------
        task object
        """
        task_output = pickle.dumps(t.task_output)
        task_input = pickle.dumps(t.task_input)
        str_input = self.pipe.get_data_dir(t.seg, prod=t.task_input)
        with self.conn_lock:
            with self.conn:
                seg_id = self.seg_id_cache[t.seg]
                c = self.conn.execute('insert into tasks(seg_id, status, input, output, str_input, queued_on) values(?, ?, ?, ?, ?, ?)',
                                      (seg_id, 'queued', task_input, task_output, str_input, str_date()))
                t.id = c.lastrowid
                logger.debug('for task:\n%s\nparents:\n%s\n' % (str(t), str(t.parents)))
                self.conn.executemany('insert into task_relations (father_id, child_id) values (?, ?)',
                                      ((fid, t.id) for fid in t.parents))
        t.status = 'queued'
        return t

    def get_status(self, t):
        """ Return task status.

        Parameters
        ----------
        t: a task object

        Returns
        -------
        string (done, failed, queued, running)
        """
        with self.conn_lock:
            # execute() requires a parameter sequence: wrap the id in a
            # one-tuple (a bare int raises an interface error).
            c = self.conn.execute('select status from tasks where task_id=?',
                                  (t.id,))
            return c.fetchone()['status']

    def update_status(self, t, status):
        """ Update the task with new status.

        Update the task status in data base. If not done task_output is
        set to None.

        Parameters
        ----------
        t: task object
        status: string (done, failed, queued, running)

        Returns
        -------
        task object
        """
        if status != 'done':
            # Any non-successful transition invalidates the output.
            t.task_output = None
        task_output = pickle.dumps(t.task_output)
        # TODO tag problem
        self._asynchronous_request('update tasks set status=?, output=? where task_id=?',
                                   (status, task_output, t.id))
        t.status = status
        return t

    def clean(self, seg):
        """ Remove queued and running tasks from data base.

        This is called only at the init of the scheduler.

        Parameters
        ----------
        seg: string, segment name.
        """
        d = self.pipe.get_curr_dir(seg)
        with self.conn:
            seg_id = self.conn.execute('select seg_id from segments where curr_dir=?', (d,)).fetchone()[0]
            self.conn.execute('delete from tasks where seg_id=? and status="running"', (seg_id,))
            self.conn.execute('delete from tasks where seg_id=? and status="queued"', (seg_id,))
            #self.conn.execute('delete from segments where seg_id=?',(seg_id,))

    def close(self):
        """ Close database connection. """
        self.conn.close()