tracker.py 12.1 KB
Newer Older
Maude Le Jeune's avatar
Maude Le Jeune committed
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16
## Copyright (C) 2008, 2009, 2010 APC LPNHE CNRS Universite Paris Diderot <lejeune@apc.univ-paris7.fr>  <betoule@apc.univ-paris7.fr>
## 
## This program is free software; you can redistribute it and/or modify
## it under the terms of the GNU General Public License as published by
## the Free Software Foundation; either version 3 of the License, or
## (at your option) any later version.
##
## This program is distributed in the hope that it will be useful,
## but WITHOUT ANY WARRANTY; without even the implied warranty of
## MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
## GNU General Public License for more details.
##
## You should have received a copy of the GNU General Public License
## along with this program; if not, see http://www.gnu.org/licenses/gpl.html

import os.path as path
Marc Betoule's avatar
Marc Betoule committed
17 18 19
import cPickle as pickle
import sqlite3
import task
20
import shutil
Marc Betoule's avatar
Marc Betoule committed
21 22
import threading
import Queue
23
import logging
24
from utils import str_date
Maude Le Jeune's avatar
Maude Le Jeune committed
25

26
# Module logger, published under the "scheduler" channel name rather than
# under this module's own name.
logger = logging.getLogger(name="scheduler")
Maude Le Jeune's avatar
Maude Le Jeune committed
27
class Tracker:
    """ Interface of a task tracker.

    A tracker records, for every segment of a pipeline, the tasks it
    holds and their status (queued, running, done or failed).

    This base class only fixes the interface. The primitive methods
    (get_list_status, add_queued, update_status, get_status) do nothing
    and return None; concrete trackers such as SqliteTracker override
    them. The get_queued/get_done/get_failed/get_running helpers are
    thin wrappers around get_list_status.

    NOTE(review): earlier documentation described tracking through files
    in the segment directory; no such logic lives in this class.
    """

    def get_list_status(self, seg, status):
        """ Return the tasks of a segment that have a given status.

        Primitive to be provided by subclasses; this base version does
        nothing and returns None.

        Parameters
        ----------
        seg: string, segment name
        status: string, one of 'done', 'queued', 'running', 'failed'

        Returns
        -------
        a task list (None in this base class).
        """
        return None

    def get_queued(self, seg):
        """ Return the tasks of a segment that are waiting in the queue.

        Parameters
        ----------
        seg: string, segment name

        Returns
        -------
        whatever get_list_status(seg, 'queued') returns (a Task list).
        """
        return self.get_list_status(seg, 'queued')

    def get_done(self, seg):
        """ Return the tasks of a segment that have completed.

        Parameters
        ----------
        seg: string, segment name

        Returns
        -------
        whatever get_list_status(seg, 'done') returns (a Task list).
        """
        return self.get_list_status(seg, 'done')

    def get_failed(self, seg):
        """ Return the tasks of a segment that have failed.

        Parameters
        ----------
        seg: string, segment name

        Returns
        -------
        whatever get_list_status(seg, 'failed') returns (a Task list).
        """
        return self.get_list_status(seg, 'failed')

    def get_running(self, seg):
        """ Return the tasks of a segment that are currently running.

        Parameters
        ----------
        seg: string, segment name

        Returns
        -------
        whatever get_list_status(seg, 'running') returns (a Task list).
        """
        return self.get_list_status(seg, 'running')

    def add_queued(self, t):
        """ Put a task in the queue.

        Subclasses store the task and hand it back with its new id and
        its 'queued' status. This base version does nothing and returns
        None.

        Parameters
        ----------
        t: task object.

        Returns
        -------
        task object (None in this base class).
        """
        return None

    def update_status(self, t, status):
        """ Give a task a new status.

        Subclasses record the new status; when the status is not 'done',
        task_output is expected to be reset to None. This base version
        does nothing and returns None.

        Parameters
        ----------
        t: task object
        status: string, one of 'done', 'failed', 'queued', 'running'

        Returns
        -------
        task object (None in this base class).
        """
        return None

    def get_status(self, t):
        """ Look up the current status of a task.

        Primitive to be provided by subclasses; this base version does
        nothing and returns None.

        Parameters
        ----------
        t: a task object

        Returns
        -------
        string, one of 'done', 'failed', 'queued', 'running'
        (None in this base class).
        """
        return None


Marc Betoule's avatar
Marc Betoule committed
146
class SqliteTracker(Tracker, threading.Thread):
    """ Fast sqlite-based tracker.

    Tasks are saved in a sqlite3 data base: the file '.sqlstatus' in the
    pipeline prefix, unless pipe.sqlfile is set.

    Format is:
    - segments (seg_id, seg, curr_dir, param, tag, comment): one row per
      segment instance; curr_dir is unique.
    - tasks (task_id, seg_id, status, input, output, str_input,
      queued_on, begun_on, ended_on): one row per task; input and output
      hold pickled objects.
    - task_relations (father_id, child_id): task dependency edges.
    - segment_relations (father_id, child_id): segment dependency edges.

    Reads and task insertions use the main connexion self.conn. Status
    updates are pushed onto a FIFO queue and executed by the tracker
    thread (see run) on its own connexion. Both sides take
    self.conn_lock around data base accesses.
    """

    def __init__(self, pipe):
        """ Initialize the Sqlite tracker.

        Open the data base (creating its tables when the 'tasks' table
        does not exist yet), register the pipeline segments, and set up
        the request queue and the connexion lock.

        Parameters
        ----------
        pipe: a pipe instance.
        """
        threading.Thread.__init__(self)
        ## pipeline object
        self.pipe = pipe

        ## dict, segment name -> seg_id in the data base
        self.seg_id_cache = {}

        ## string, sql filename
        self.sqlfile = path.join(pipe._prefix, '.sqlstatus')
        if self.pipe.sqlfile is not None:
            self.sqlfile = self.pipe.sqlfile
        logger.info("tracking job in %s" % self.sqlfile)

        ## sql connexion object
        self.conn = sqlite3.connect(self.sqlfile)
        self.conn.text_factory = str
        # Bind the table name instead of writing "tasks": SQLite reads a
        # double-quoted token as a string only as a legacy fallback, which
        # can be disabled at compile time.
        cur = self.conn.execute(
            'select tbl_name from sqlite_master where tbl_name = ?', ('tasks',))
        if not cur.fetchone():
            with self.conn:
                self.conn.execute('create table segments (seg_id INTEGER PRIMARY KEY, seg TEXT NOT NULL, curr_dir TEXT NOT NULL UNIQUE, param TEXT, tag TEXT, comment TEXT)')
                self.conn.execute('create table tasks (task_id INTEGER PRIMARY KEY, seg_id INTEGER NOT NULL, status TEXT NOT NULL, input BLOB NOT NULL, output BLOB, str_input NOT NULL, queued_on TEXT, begun_on TEXT, ended_on TEXT)')
                self.conn.execute('create table task_relations (father_id INTEGER NOT NULL, child_id INTEGER NOT NULL)')
                self.conn.execute('create table segment_relations (father_id INTEGER NOT NULL, child_id INTEGER NOT NULL)')
        self.conn.row_factory = sqlite3.Row

        self.segments_registration()

        ## requests queue
        self.requests = Queue.Queue(0)

        ## threading lock on connexion
        self.conn_lock = threading.Lock()  # We use this to avoid database locked exceptions

    def segments_registration(self):
        """ Register the pipeline segments in the data base.

        For each segment returned by pipe.flatten(), look its instance up
        by curr_dir. When it is missing, insert it (with the segment
        docline as comment) along with its segment_relations rows to its
        parents. In both cases, its seg_id is stored in
        self.seg_id_cache. Changes are committed at the end.

        A parent's seg_id is read from the cache, so parents must come
        before their children; presumably pipe.flatten() guarantees this
        order -- TODO confirm.
        """
        for s in self.pipe.flatten():
            curr_dir = self.pipe.get_curr_dir(s)
            try:
                docline = self.pipe.repository.get_docline(s)
            except Exception:
                # The docline is a mere comment: never fail on it.
                docline = ""
            row = self.conn.execute(
                'select seg_id from segments where curr_dir = ? limit 1',
                (curr_dir,)).fetchone()
            if row is not None:
                self.seg_id_cache[s] = row[0]
                logger.info("Segment %s instance (%s) already registered in db."%(s, curr_dir))
                continue
            logger.info("Creating segment %s instance (%s)."%(s, curr_dir))
            c = self.conn.execute(
                'insert into segments (seg, curr_dir, comment) values (?, ?, ?)',
                (s, curr_dir, docline))
            seg_id = c.lastrowid
            self.seg_id_cache[s] = seg_id
            logger.info("Storing connectivity for segment %s."%s)
            for p in self.pipe._parents[s]:
                self.conn.execute(
                    'insert into segment_relations (father_id, child_id) values (?, ?)',
                    (self.seg_id_cache[p], seg_id))
        self.conn.commit()

    def _asynchronous_request(self, sqlrequest, args):
        """ Add an sql request to the queue.

        The requests are executed FIFO by the tracker thread (see run).

        Parameters
        ----------
        sqlrequest: string, sql request.
        args: tuple, request arguments.
        """
        self.requests.put((sqlrequest, args))

    def run(self):
        """ Start the tracker.

        Open a dedicated connexion to the sqlite data base and execute
        the queued requests in FIFO order until the (None, None) sentinel
        posted by stop() is received. The connexion is closed on exit.

        A request failing with sqlite3.OperationalError is retried. After
        more than 10 consecutive failures, the error is re-raised and
        tracking stops.
        """
        threadconn = sqlite3.connect(self.sqlfile)
        try:
            sqlrequest, args = self.requests.get()
            n_tries = 0
            while sqlrequest is not None:
                try:
                    with self.conn_lock:
                        with threadconn:
                            threadconn.execute(sqlrequest, args)
                except sqlite3.OperationalError:
                    n_tries += 1
                    logger.error('Catch database is locked exception, retrying: %d...' % n_tries)
                    if n_tries > 10:
                        logger.error('The database seems down, abandon tracking')
                        raise
                else:
                    # Only consecutive failures count toward giving up.
                    n_tries = 0
                    self.requests.task_done()
                    sqlrequest, args = self.requests.get()
            self.requests.task_done()
        finally:
            threadconn.close()

    def stop(self):
        """ Stop the tracker.

        Post the (None, None) sentinel: the tracker thread finishes the
        requests queued before it, then exits.
        """
        logger.info("Stopping the tracker ...")
        self.requests.put((None, None))

    def get_list_status(self, seg, status):
        """ Return the task list for a given segment and status.

        Parameters
        ----------
        seg: string, segment name
        status: string, task status (done, queued, running, failed)

        Returns
        -------
        a task list.
        """
        s = self.pipe.get_curr_dir(seg)
        logger.debug("Looking for curr_dir=%s, status=%s in database", s, status)
        with self.conn_lock:
            # Fetch all rows while holding the lock. Iterating the cursor
            # after releasing it would keep the read statement active
            # concurrently with the tracker thread's writes.
            rows = self.conn.execute(
                'select * from tasks as t join segments as s on t.seg_id=s.seg_id and s.curr_dir=? and t.status=? ',
                (s, status)).fetchall()
        l = []
        for r in rows:
            # NOTE: input/output are unpickled straight from the data base
            # file; only trusted data base files should be opened.
            task_output = pickle.loads(r['output'])
            task_input = pickle.loads(r['input'])
            l.append(task.Task(seg, status=status, id=r['task_id'],
                               task_input=task_input, task_output=task_output))
        return l

    def add_queued(self, t):
        """ Add a task to the queue.

        Insert the task in the data base along with its task_relations
        rows to t.parents, and return it with its new id and status.

        Parameters
        ----------
        t: task object.

        Returns
        -------
        task object
        """
        task_output = pickle.dumps(t.task_output)
        task_input = pickle.dumps(t.task_input)
        str_input = self.pipe.get_data_dir(t.seg, prod=t.task_input)
        with self.conn_lock:
            with self.conn:
                seg_id = self.seg_id_cache[t.seg]
                c = self.conn.execute('insert into tasks(seg_id, status, input, output, str_input, queued_on) values(?, ?, ?, ?, ?, ?)',
                                      (seg_id, 'queued', task_input, task_output, str_input, str_date()))
                t.id = c.lastrowid
                logger.debug('for task:\n%s\nparents:\n%s\n', t, t.parents)
                self.conn.executemany('insert into task_relations (father_id, child_id) values (?, ?)',
                                      ((fid, t.id) for fid in t.parents))
        t.status = 'queued'
        return t

    def get_status(self, t):
        """ Return task status.

        Parameters
        ----------
        t: a task object

        Returns
        -------
        string (done, failed, queued, running)
        """
        with self.conn_lock:
            # Parameters must be given as a sequence, hence (t.id,).
            row = self.conn.execute('select status from tasks where task_id=?',
                                    (t.id,)).fetchone()
        return row['status']

    def update_status(self, t, status):
        """ Update the task with new status.

        The update is queued and written by the tracker thread, while t
        is modified immediately. If status is not 'done', t.task_output
        is set to None.

        Parameters
        ----------
        t: task object
        status: string (done, failed, queued, running)

        Returns
        -------
        task object
        """
        if status != 'done':
            t.task_output = None
        task_output = pickle.dumps(t.task_output)
        #TODO tag problem
        self._asynchronous_request('update tasks set status=?, output=? where task_id=?',
                                   (status, task_output, t.id))
        t.status = status
        return t

    def clean(self, seg):
        """ Remove queued and running tasks from data base.

        The task_relations rows that reference those tasks are removed
        as well. This is called only at the init of the scheduler.

        Parameters
        ----------
        seg: string, segment name.
        """
        d = self.pipe.get_curr_dir(seg)
        # Single-quoted literals: in SQL, double quotes denote identifiers.
        stale = ("select task_id from tasks "
                 "where seg_id=? and status in ('running', 'queued')")
        with self.conn_lock:
            with self.conn:
                seg_id = self.conn.execute('select seg_id from segments where curr_dir=?',
                                           (d,)).fetchone()[0]
                # task_id has no AUTOINCREMENT, so SQLite may hand the ids
                # of deleted tasks to new ones: drop their relations first,
                # or new tasks would inherit them.
                self.conn.execute(
                    'delete from task_relations where child_id in (' + stale +
                    ') or father_id in (' + stale + ')',
                    (seg_id, seg_id))
                self.conn.execute(
                    "delete from tasks where seg_id=? and status in ('running', 'queued')",
                    (seg_id,))

    def close(self):
        """ Close the main database connexion.

        The tracker thread's own connexion is closed by run() when it
        exits.
        """
        self.conn.close()