Commit 33d05a40 authored by Maude Le Jeune's avatar Maude Le Jeune
Browse files

logging ok for all launchers except ccali

parent 33b55584
......@@ -50,6 +50,14 @@ import logging.handlers
import sys
import datetime
def get_log_file (pipe, name):
"""
"""
d = datetime.datetime.now()
strdate = d.strftime("%y%d%j_%H%M%s%f")
return pipe._prefix+"log/"+strdate+"_"+name
def set_logger (pipe, log_level):
""" Set worker and scheduler loggers.
......@@ -62,20 +70,14 @@ def set_logger (pipe, log_level):
logging object: logging.DEBUG=10 logging.WARNING=30,
logging.CRITICAL=50,etc). Set it to 0 to disable stream logging.
"""
d = datetime.datetime.now()
strdate = d.strftime("%y%d%j_%H%M%s")
sched_log_file = pipe._prefix+"/log/"+strdate+"_scheduler"
work_log_file = pipe._prefix+"/log/"+strdate+"_worker"
init_logger ('scheduler', level=logging.DEBUG, filename=sched_log_file)
init_logger ('worker', level=logging.DEBUG, filename=work_log_file)
if log_level:
init_logger ('scheduler', level=log_level)
init_logger ('worker', level=log_level)
sched_log_file = get_log_file (pipe, 'scheduler')
work_log_file = get_log_file (pipe, 'worker')
init_logger ('scheduler', sched_log_file, level=log_level)
init_logger ('worker', work_log_file, level=log_level)
def init_logger (name, level=logging.DEBUG, filename=None):
def init_logger (name, filename, level=logging.DEBUG):
""" Initialize a logger.
Parameters
......@@ -90,16 +92,15 @@ def init_logger (name, level=logging.DEBUG, filename=None):
"""
logger = logging.getLogger(name)
logger.setLevel(level)
logger.setLevel(logging.DEBUG)
if (len(logger.handlers) < 3):
formatter = logging.Formatter("%(asctime)s - %(name)s - %(levelname)s - %(message)s")
if filename is not None:
rfhandler = logging.handlers.RotatingFileHandler(
filename, maxBytes=200000, backupCount=5)
logger.addHandler(rfhandler)
rfhandler.setFormatter(formatter)
else:
rfhandler = logging.handlers.RotatingFileHandler(
filename, maxBytes=200000, backupCount=5)
logger.addHandler(rfhandler)
rfhandler.setFormatter(formatter)
if level:
shandler = logging.StreamHandler(sys.stdout)
shandler.setLevel(level)
shandler.setFormatter(formatter)
......@@ -177,7 +178,7 @@ class SchedulerManager(BaseManager):
"""
pass
def launch_process(pipe, n, address=('',50000), authkey='secret'):
def launch_process(pipe, n, address=('',50000), authkey='secret',log_level=logging.CRITICAL ):
""" Launch a bunch of local workers in separate processes .
This is usefull (compared to launch_thread) when the GIL becomes
......@@ -190,11 +191,10 @@ def launch_process(pipe, n, address=('',50000), authkey='secret'):
>>> launch_process(T, 2)
"""
set_logger (pipe, log_level)
s = scheduler.Scheduler(pipe)
SchedulerManager.register('get_scheduler', callable=lambda:s)
set_logger (pipe, log_level)
mgr = SchedulerManager(address=address, authkey=authkey)
mgr.start()
......@@ -218,7 +218,7 @@ def _scp(file, dest):
""" Wrapper around the scp command."""
subprocess.Popen(['scp', file, dest]).communicate()[0]
def launch_ssh(pipe, host_list, address=None, authkey='secret'):
def launch_ssh(pipe, host_list, address=None, authkey='secret',log_level=logging.CRITICAL ):
""" Launch a bunch of distant workers through ssh.
This is used mainly for testing purposes. It can be usefull to
......@@ -230,6 +230,9 @@ def launch_ssh(pipe, host_list, address=None, authkey='secret'):
>>> T = pipeline.Pipeline(['first','second'], code_dir='/home/betoule/soft/pipelet/test/', prefix='/home/betoule/homeafs/')
>>> launch_ssh(T, ['betoule@lpnp204'], address=('lpnp321.in2p3.fr',50000))
"""
set_logger (pipe, log_level)
s = scheduler.Scheduler(pipe)
SchedulerManager.register('get_scheduler', callable=lambda:s)
......@@ -239,7 +242,7 @@ def launch_ssh(pipe, host_list, address=None, authkey='secret'):
processlist = []
for i, h in enumerate(host_list):
w = subprocess.Popen(['ssh', h, "python -m pipelet.launchers -H %s -p %s -s %s"%(address[0], address[1], authkey)])
w = subprocess.Popen(['ssh', h, "python -m pipelet.launchers -H %s -p %s -s %s -l %s"%(address[0], address[1], authkey, get_log_file (pipe, 'worker'))])
processlist.append(w)
print 'launching the scheduler'
......@@ -250,13 +253,14 @@ def launch_ssh(pipe, host_list, address=None, authkey='secret'):
for w in processlist:
w.wait()
def launch_pbs(pipe, n, address=None, authkey='secret', job_dir="/wrk/lejeune/pipelet/scripts", job_name="job_", log_dir="/wrk/lejeune/pipelet/logs", cpu_time="2:00:00", server=False, job_header="""
def launch_pbs(pipe, n, address=None, authkey='secret', job_name="job_", log_level=logging.CRITICAL, cpu_time="2:00:00", server=False, job_header="""
#/bin/bash
echo $PYTHONPATH
""" ):
""" Launch a bunch of distant workers through a PBS batch system.
"""
set_logger (pipe, log_level)
s = scheduler.Scheduler(pipe)
SchedulerManager.register('get_scheduler', callable=lambda:s)
......@@ -266,16 +270,16 @@ echo $PYTHONPATH
processlist = []
for i in range(n):
jobfile = path.join(job_dir,"%s%d.py"%(job_name,i))
errfile = path.join(log_dir,"e_%s%d"%(job_name,i))
logfile = path.join(log_dir,"o_%s%d"%(job_name,i))
jobfile = get_log_file (pipe, 'job')
errfile = jobfile.replace('job', 'err')
logfile = jobfile.replace('job', 'log')
f = file(jobfile, 'w')
f.write (job_header+"\n")
f.write ("#PBS -o %s\n"%logfile)
f.write ("#PBS -e %s\n"%errfile)
f.write ("#PBS -N %s%d\n"%(job_name,i))
f.write ("#PBS -l select=1:ncpus=1,walltime=%s\n"%cpu_time)
f.write ("python -m pipelet.launchers -H %s -p %s -s %s"%(address[0],address[1],authkey))
f.write ("python -m pipelet.launchers -H %s -p %s -s %s -l %s"%(address[0],address[1],authkey,work_log_file))
f.close()
subprocess.Popen(['qsub',jobfile]).communicate()[0]
......@@ -339,9 +343,12 @@ if __name__ == "__main__":
help='port the scheduler is listenning to', default=50000, type='int')
parser.add_option('-s', '--secret', metavar='key',
help='authentication key', default='secret')
parser.add_option('-l', '--logfile', metavar='logfile',
help='worker log filename')
(options, args) = parser.parse_args()
print (options.host,options.port)
print options.secret
init_logger ('worker', options.logfile)
w = worker.ProcessWorker(address=(options.host,options.port), authkey=options.secret)
w.run()
......
......@@ -244,8 +244,8 @@ class Web:
""" Print the content of the log directory.
"""
directory = logdir
html = html_tmp + '<h1> Content of %s </h1><a href="delete_log?logdir=%s">Delete logs</a> <div class="list"><ul>'%(directory,logdir)
for filename in glob(os.path.join(directory,'*')):
html = html_tmp + '<h1> Content of %s </h1> <a href="/%s/">Back</a><div class="list"><a href="delete_log?logdir=%s">Delete logs</a><ul>'%(directory,self.name,logdir)
for filename in sorted(glob(os.path.join(directory,'*')), reverse=True):
absPath = os.path.abspath(filename)
html += '<li><a href="serve_log?filename='+absPath+ '">' + os.path.basename(filename) + "</a> </li>"
html += """</ul></div></body></html>"""
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment