Commit 33b55584 authored by Maude Le Jeune

logging ok for worker and scheduler in Interactive/Threads/Process mode

parent f3165541
@@ -45,19 +45,91 @@ import pipelet.worker as worker
 import pipelet.pipeline as pipeline
 from os import path
 from contextlib import closing
+import logging
+import logging.handlers
+import sys
+import datetime
-def launch_interactive(pipe):
+def set_logger(pipe, log_level):
+    """ Set worker and scheduler loggers.
+
+    Parameters
+    ----------
+    pipe: a pipe instance
+    log_level:
+        integer, print log on stdout with log_level (values given by
+        the logging module: logging.DEBUG=10, logging.WARNING=30,
+        logging.CRITICAL=50, etc.). Set it to 0 to disable stream logging.
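+
+    Examples
+    --------
+    >>> T = pipeline.Pipeline(['first','second'], code_dir='./', prefix='./')
+    >>> set_logger(T, logging.INFO)  # illustrative: file logging plus stdout at INFO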
"""
+    d = datetime.datetime.now()
+    strdate = d.strftime("%y%d%j_%H%M%S")
+    sched_log_file = pipe._prefix+"/log/"+strdate+"_scheduler"
+    work_log_file = pipe._prefix+"/log/"+strdate+"_worker"
+    init_logger('scheduler', level=logging.DEBUG, filename=sched_log_file)
+    init_logger('worker', level=logging.DEBUG, filename=work_log_file)
+    if log_level:
+        init_logger('scheduler', level=log_level)
+        init_logger('worker', level=log_level)
+
+def init_logger(name, level=logging.DEBUG, filename=None):
+    """ Initialize a logger.
+
+    Parameters
+    ----------
+    name: string, logger name
+    level:
+        integer, print log on stdout with this level (values given by
+        the logging module: logging.DEBUG=10, logging.WARNING=30,
+        logging.CRITICAL=50, etc.). Set it to 0 to disable stream logging.
+    filename: if not None, a rotating file handler is used.
+        Default is a stream handler.
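+
+    Examples
+    --------
+    >>> init_logger('worker', level=logging.INFO)       # stream handler on stdout
+    >>> init_logger('worker', filename='./log/worker')  # illustrative path: rotating file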
"""
+    logger = logging.getLogger(name)
+    logger.setLevel(level)
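+    # Guard: avoid stacking duplicate handlers when init_logger is called
+    # more than once for the same named logger.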
+    if len(logger.handlers) < 3:
+        formatter = logging.Formatter("%(asctime)s - %(name)s - %(levelname)s - %(message)s")
+        if filename is not None:
+            rfhandler = logging.handlers.RotatingFileHandler(
+                filename, maxBytes=200000, backupCount=5)
+            rfhandler.setFormatter(formatter)
+            logger.addHandler(rfhandler)
+        else:
+            shandler = logging.StreamHandler(sys.stdout)
+            shandler.setLevel(level)
+            shandler.setFormatter(formatter)
+            logger.addHandler(shandler)
+
+def launch_interactive(pipe, log_level=logging.DEBUG):
     """ Launch a local worker in the interactive session.
     This is debugger compliant, so that exceptions in the segment
     execution can be tracked.
     Parameters
     ----------
     pipe: a pipeline instance
+    log_level:
+        integer, print log on stdout with log_level (values given by
+        the logging module: logging.DEBUG=10, logging.WARNING=30,
+        logging.CRITICAL=50, etc.). Set it to 0 to disable stream logging.
     Examples
     --------
     >>> T = pipeline.Pipeline(['first','second'], code_dir='./', prefix='./')
     >>> w,t = launch_interactive(T)
     >>> w.run()
     """
+    set_logger(pipe, log_level)
     s = scheduler.Scheduler(pipe)
     w = worker.InteractiveWorker(s)
     import threading
@@ -67,18 +139,32 @@ def launch_interactive(pipe):
-def launch_thread(pipe, n):
+def launch_thread(pipe, n, log_level=logging.CRITICAL):
     """ Launch a bunch of local workers in separate threads.
     This is SMP machine compliant. Exceptions arising in the
     execution of any segment are caught and the corresponding task is
     marked as failed.
     Parameters
     ----------
     pipe: a pipeline instance
     n : integer, the number of threads
+    log_level:
+        integer, print log on stdout with log_level (values given by
+        the logging module: logging.DEBUG=10, logging.WARNING=30,
+        logging.CRITICAL=50, etc.). Set it to 0 to disable stream logging.
 
     Examples
     --------
     >>> T = pipeline.Pipeline(['first','second'], code_dir='./', prefix='./')
     >>> launch_thread(T, 2)
     """
+    set_logger(pipe, log_level)
     s = scheduler.Scheduler(pipe)
     for i in range(n):
         w = worker.ThreadWorker(s)
@@ -103,6 +189,9 @@ def launch_process(pipe, n, address=('',50000), authkey='secret'):
     >>> T = pipeline.Pipeline(['first','second'], code_dir='./', prefix='./')
     >>> launch_process(T, 2)
     """
+    set_logger(pipe, log_level)
     s = scheduler.Scheduler(pipe)
     SchedulerManager.register('get_scheduler', callable=lambda:s)
@@ -18,6 +18,7 @@ import sqlite3
 import cherrypy
 from contextlib import closing
 import os.path
+import os
 current_dir = os.path.dirname(os.path.abspath(__file__))
 from glob import glob
 import shutil
@@ -63,15 +64,17 @@ class Web:
         ----------
         highlight: list of segid (optional), filter the printed seg by segid.
         """
-        html = html_tmp
-        html += '<h1>Pipelines in %s </h1> <div class="list"><ul class="mktree" id="segtree"> '%self.name
         conn = sqlite3.connect(self.db_file,check_same_thread=True)
         conn.text_factory=str
         # get all instances
         with conn:
             l = conn.execute('select seg, curr_dir, seg_id, tag from segments order by curr_dir').fetchall()
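+        # Derive the log directory from the first segment's curr_dir
+        # (everything before "seg", plus "log") and link it in the header.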
+        html = html_tmp
+        html += '<h1>Pipelines in %s </h1> <a href="log?logdir=%s">logs</a> <div class="list"><ul class="mktree" id="segtree"> '%(self.name,l[0][1].split("seg")[0]+"log")
         indent = -1
         # select a subset
         if highlight is not None:
             newl = []
@@ -234,6 +237,37 @@ class Web:
     forbidden_path = re.compile('.*\.\..*')
 
+    @cherrypy.expose
+    @read_access
+    def log(self, logdir):
+        """ Print the content of the log directory.
+        """
+        directory = logdir
+        html = html_tmp + '<h1> Content of %s </h1><a href="delete_log?logdir=%s">Delete logs</a> <div class="list"><ul>'%(directory,logdir)
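+        # Link every file in the log directory to the serve_log handler.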
+        for filename in glob(os.path.join(directory,'*')):
+            absPath = os.path.abspath(filename)
+            html += '<li><a href="serve_log?filename=' + absPath + '">' + os.path.basename(filename) + '</a></li>'
+        html += """</ul></div></body></html>"""
+        return html
+
+    @cherrypy.expose
+    @read_access
+    def serve_log(self, filename):
+        """ Serve the content of a log file as plain text.
+        """
+        return serve_file(filename, content_type='text/plain', disposition="inline")
+
+    @cherrypy.expose
+    @write_access
+    def delete_log(self, logdir):
+        """ Delete the content of the log directory.
+        """
+        for filename in glob(os.path.join(logdir,'*')):
+            absPath = os.path.abspath(filename)
+            os.remove(absPath)
+        raise cherrypy.HTTPRedirect("log?logdir=%s"%logdir, 303)
 
     def check_path(self, segid, path):
         """Chroot the path to the segid currdir.
         """
@@ -37,7 +37,7 @@ class NullHandler(logging.Handler):
"""
pass
logger = logging.getLogger("scheduler")
logger = logging.getLogger("worker")
h = NullHandler()
logger.addHandler(h)
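+# The NullHandler keeps the library quiet (no "no handlers could be found"
+# warning) until a launcher configures the "worker" logger via init_logger.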
@@ -91,11 +91,11 @@ class Worker(object):
         queue is empty. The task statuses are updated after completion
         or failure.
         """
-        print 'checking in ...'
+        logger.info('checking in ...')
         if not self.scheduler.check_in():
-            print "check_in failed : stopping"
+            logger.warning("check_in failed: stopping")
             return
-        print 'checked in.'
+        logger.info('checked in.')
         if self.pipe.matplotlib:
             import matplotlib
@@ -109,7 +109,7 @@ class Worker(object):
                 try:
                     task = self.execute_task(task)
                 except AbortError, e:
-                    print "Abort after catching signal %d" % e.signal
+                    logger.warning("Abort after catching signal %d" % e.signal)
                     scheduler.requeue(task)
                     break
                 if task.status == "done":
@@ -119,9 +119,9 @@ class Worker(object):
                     n = n+1
                 else:
                     break
-            print("%d jobs completed" % n)
+            logger.info("%d jobs completed" % n)
         except AbortError, e:
-            print "Abort after catching signal %d" % e.signal
+            logger.warning("Abort after catching signal %d" % e.signal)
         finally:
             self.terminate()
@@ -145,9 +145,9 @@ class Worker(object):
             new_dict = pickle.load(f)
             f.close()
         except IOError:
-            print 'No such file: %s'%filename
+            logger.warning('No such file: %s'%filename)
         except UnpicklingError:
-            print 'Failed to unpickle from file: %s'%filename
+            logger.warning('Failed to unpickle from file: %s'%filename)
             f.close()
         if param_name == '*':
             param_name = new_dict.keys()
@@ -155,7 +155,7 @@ class Worker(object):
             try:
                 glo[k] = new_dict[k]
             except KeyError:
-                print 'Fail to load object %s from file %s'%(k,filename)
+                logger.warning('Failed to load object %s from file %s'%(k,filename))
 
     def write_res(self, seg, task_output):
         """ Pickle the result of the segment.
@@ -190,7 +190,7 @@ class Worker(object):
             if param in glo:
                 strtag = strtag + param + '=' + str_web(glo[param])+' '
             else:
-                print 'parameter '+param+' not in dictionary'
+                logger.warning('parameter '+param+' not in dictionary')
         var_tag = strtag + ' <small>(<b>'+ datetime.today().strftime("%e %m - %R")+'</b>)</small> '
         if not path.exists(self.pipe.get_tag_file(seg)):
             f = file(self.pipe.get_tag_file(seg), "w")
@@ -394,7 +394,7 @@ class Worker(object):
             try:
                 new_dict[k] = glo[k]
             except KeyError:
-                print 'Fail to save object %s in file %s'%(k,filename)
+                logger.warning('Failed to save object %s in file %s'%(k,filename))
         f = file(filename,'w')
         pickle.dump(new_dict,f)
         f.close()
@@ -427,11 +427,11 @@ class InteractiveWorker(Worker):
         queue is empty. The task statuses are updated after completion
         or failure.
         """
-        print 'checking in ...'
+        logger.info('checking in ...')
         if not self.scheduler.check_in():
-            print "check_in failed : stopping"
+            logger.warning("check_in failed: stopping")
             return
-        print 'checked in.'
+        logger.info('checked in.')
         if self.pipe.matplotlib_interactive:
             import matplotlib
             matplotlib.use('Agg')
@@ -450,9 +450,9 @@ class InteractiveWorker(Worker):
                     n = n+1
                 else:
                     break
-            print("%d jobs completed" % n)
+            logger.info("%d jobs completed" % n)
         except KeyboardInterrupt:
-            print "Abort after catching signal"
+            logger.warning("Abort after catching signal")
         finally:
             self.terminate()
@@ -485,9 +485,9 @@ class InteractiveWorker(Worker):
             var_key = glo['var_key']
             self.save_param(seg, glo, param_name=var_key)
         except KeyError:
-            print 'Nothing to save in param file for seg %s' % seg
+            logger.warning('Nothing to save in param file for seg %s' % seg)
         except Exception:
-            print 'Fail to save the param file for seg %s' % seg
+            logger.warning('Failed to save the param file for seg %s' % seg)
         task.tag = self.make_tag(seg, glo) # Dump var_tag
         task.status = "done" # set status
         return task
@@ -552,9 +552,9 @@ class ThreadWorker(Worker, threading.Thread):
             var_key = glo['var_key']
             self.save_param(seg, glo, param_name=var_key)
         except KeyError:
-            print 'Nothing to save in param file for seg %s' % seg
+            logger.warning('Nothing to save in param file for seg %s' % seg)
         except Exception:
-            print 'Fail to save the param file for seg %s' % seg
+            logger.warning('Failed to save the param file for seg %s' % seg)
         task.tag = self.make_tag(seg, glo) # Dump var_tag
         return task
@@ -581,7 +581,7 @@ class ProcessWorker(Worker, Process):
"""
mgr = SchedulerManager(address=address, authkey=authkey)
mgr.connect()
print "connected to %s"%str(address)
logger.info( "connected to %s"%str(address))
Worker.__init__(self, mgr.get_scheduler(), **keys)
Process.__init__(self)
signal.signal(signal.SIGUSR1, catch_sigterm)
@@ -632,9 +632,9 @@ class ProcessWorker(Worker, Process):
             var_key = glo['var_key']
             self.save_param(seg, glo, param_name=var_key)
         except KeyError:
-            print 'Nothing to save in param file for seg %s' % seg
+            logger.warning('Nothing to save in param file for seg %s' % seg)
         except Exception:
-            print 'Fail to save the param file for seg %s' % seg
+            logger.warning('Failed to save the param file for seg %s' % seg)
         task.tag = self.make_tag(seg, glo) # Dump var_tag
         return task