Commit 507576e3 authored by Marc Betoule

Merge branch 'newbase' of git@gitorious.org:pipelet/pipelet into newbase

Conflicts:
	pipelet/worker.py
parents d8e8886e e69c2282
@@ -45,19 +45,92 @@ import pipelet.worker as worker
import pipelet.pipeline as pipeline
from os import path
from contextlib import closing
import logging
import logging.handlers
import sys
import datetime
def launch_interactive(pipe):
def get_log_file (pipe, name):
    """ Return a timestamped log file name in the pipe log directory.
    """
    d = datetime.datetime.now()
    strdate = d.strftime("%y%d%j_%H%M%S%f")
    return path.join(pipe.get_log_dir(), strdate+"_"+name)
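As a rough illustration (not part of the patch), the helper above yields timestamped names like the following, assuming a hypothetical pipe whose log directory is /prefix/log:

# Illustrative sketch only: a fixed date and a made-up log directory.
import datetime
from os import path

d = datetime.datetime(2010, 6, 21, 14, 3, 7, 123456)
strdate = d.strftime("%y%d%j_%H%M%S%f")
print path.join('/prefix/log', strdate + "_worker")
# prints: /prefix/log/1021172_140307123456_worker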
def set_logger (pipe, log_level):
    """ Set up the worker and scheduler loggers.

    Parameters
    ----------
    pipe: a pipe instance
    log_level:
        integer, logging level of the stdout stream handler (values
        from the logging module: logging.DEBUG=10, logging.WARNING=30,
        logging.CRITICAL=50, etc.). Set it to 0 to disable stream logging.
    """
sched_log_file = get_log_file (pipe, 'scheduler')
work_log_file = get_log_file (pipe, 'worker')
init_logger ('scheduler', sched_log_file, level=log_level)
init_logger ('worker', work_log_file, level=log_level)
def init_logger (name, filename, level=logging.DEBUG):
    """ Initialize a logger.

    Parameters
    ----------
    name: string, logger name
    level:
        integer, logging level of the stdout stream handler (values
        from the logging module: logging.DEBUG=10, logging.WARNING=30,
        logging.CRITICAL=50, etc.). Set it to 0 to disable stream logging.
    filename: string or None, if not None a rotating file handler is
        used in addition to the stream handler.
    """
    logger = logging.getLogger(name)
    logger.setLevel(logging.DEBUG)
    if len(logger.handlers) < 3:
        formatter = logging.Formatter("%(asctime)s - %(name)s - %(levelname)s - %(message)s")
        # Only attach a file handler when a filename was actually given.
        if filename is not None:
            rfhandler = logging.handlers.RotatingFileHandler(
                filename, maxBytes=200000, backupCount=5)
            rfhandler.setFormatter(formatter)
            logger.addHandler(rfhandler)
        if level:
            shandler = logging.StreamHandler(sys.stdout)
            shandler.setLevel(level)
            shandler.setFormatter(formatter)
            logger.addHandler(shandler)
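A minimal usage sketch (the file path and level are illustrative, not from the patch):

# Illustrative only: route the 'worker' logger to stdout and a rotating file.
init_logger('worker', '/prefix/log/worker.log', level=logging.INFO)
logging.getLogger('worker').info('worker logger ready')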
def launch_interactive(pipe, log_level=logging.DEBUG):
""" Launch a local worker in the interactive session.
This is debugger compliant, so that exception in the segment
execution can be tracked.
Parameters
----------
pipe: a pipeline instance
log_level:
integer, print log on stdout with log_level (values given by
logging object: logging.DEBUG=10 logging.WARNING=30,
logging.CRITICAL=50,etc). Set it to 0 to disable stream logging.
Examples
--------
>>> T = pipeline.Pipeline(['first','second'], code_dir='./', prefix='./')
>>> w,t = launch_interactive(T)
>>> w.run()
"""
set_logger (pipe, log_level)
s = scheduler.Scheduler(pipe)
w = worker.InteractiveWorker(s)
import threading
@@ -67,18 +140,32 @@ def launch_interactive(pipe):
def launch_thread(pipe, n):
def launch_thread(pipe, n, log_level=logging.CRITICAL ):
""" Launch a bunch of local workers in separate threads.
This is SMP machine compliant. Exceptions araising in the
execution of any segment are caught and the corresponding task is
marked as failed.
Parameters
----------
pipe: a pipeline instance
n : integer, the number of threads
    log_level:
        integer, logging level of the stdout stream handler (values
        from the logging module: logging.DEBUG=10, logging.WARNING=30,
        logging.CRITICAL=50, etc.). Set it to 0 to disable stream logging.
Examples
--------
>>> T = pipeline.Pipeline(['first','second'], code_dir='./', prefix='./')
>>> launch_thread(T, 2)
"""
set_logger (pipe, log_level)
s = scheduler.Scheduler(pipe)
for i in range(n):
w = worker.ThreadWorker(s)
@@ -91,7 +178,7 @@ class SchedulerManager(BaseManager):
"""
pass
def launch_process(pipe, n, address=('',50000), authkey='secret'):
def launch_process(pipe, n, address=('',50000), authkey='secret',log_level=logging.CRITICAL ):
""" Launch a bunch of local workers in separate processes .
This is usefull (compared to launch_thread) when the GIL becomes
@@ -103,9 +190,11 @@ def launch_process(pipe, n, address=('',50000), authkey='secret'):
>>> T = pipeline.Pipeline(['first','second'], code_dir='./', prefix='./')
>>> launch_process(T, 2)
"""
s = scheduler.Scheduler(pipe)
SchedulerManager.register('get_scheduler', callable=lambda:s)
set_logger (pipe, log_level)
mgr = SchedulerManager(address=address, authkey=authkey)
mgr.start()
@@ -129,7 +218,7 @@ def _scp(file, dest):
""" Wrapper around the scp command."""
subprocess.Popen(['scp', file, dest]).communicate()[0]
def launch_ssh(pipe, host_list, address=None, authkey='secret'):
def launch_ssh(pipe, host_list, address=None, authkey='secret',log_level=logging.CRITICAL ):
""" Launch a bunch of distant workers through ssh.
This is used mainly for testing purposes. It can be usefull to
@@ -141,6 +230,9 @@ def launch_ssh(pipe, host_list, address=None, authkey='secret'):
>>> T = pipeline.Pipeline(['first','second'], code_dir='/home/betoule/soft/pipelet/test/', prefix='/home/betoule/homeafs/')
>>> launch_ssh(T, ['betoule@lpnp204'], address=('lpnp321.in2p3.fr',50000))
"""
set_logger (pipe, log_level)
s = scheduler.Scheduler(pipe)
SchedulerManager.register('get_scheduler', callable=lambda:s)
@@ -150,7 +242,7 @@ def launch_ssh(pipe, host_list, address=None, authkey='secret'):
processlist = []
for i, h in enumerate(host_list):
w = subprocess.Popen(['ssh', h, "python -m pipelet.launchers -H %s -p %s -s %s"%(address[0], address[1], authkey)])
w = subprocess.Popen(['ssh', h, "python -m pipelet.launchers -H %s -p %s -s %s -l %s"%(address[0], address[1], authkey, get_log_file (pipe, 'worker'))])
processlist.append(w)
print 'launching the scheduler'
@@ -161,13 +253,14 @@ def launch_ssh(pipe, host_list, address=None, authkey='secret'):
for w in processlist:
w.wait()
def launch_pbs(pipe, n, address=None, authkey='secret', job_dir="/wrk/lejeune/pipelet/scripts", job_name="job_", log_dir="/wrk/lejeune/pipelet/logs", cpu_time="2:00:00", server=False, job_header="""
def launch_pbs(pipe, n, address=None, authkey='secret', job_name="job_", log_level=logging.CRITICAL, cpu_time="2:00:00", server=False, job_header="""
#!/bin/bash
echo $PYTHONPATH
""" ):
""" Launch a bunch of distant workers through a PBS batch system.
"""
set_logger (pipe, log_level)
s = scheduler.Scheduler(pipe)
SchedulerManager.register('get_scheduler', callable=lambda:s)
@@ -177,16 +270,16 @@ echo $PYTHONPATH
processlist = []
for i in range(n):
jobfile = path.join(job_dir,"%s%d.py"%(job_name,i))
errfile = path.join(log_dir,"e_%s%d"%(job_name,i))
logfile = path.join(log_dir,"o_%s%d"%(job_name,i))
jobfile = get_log_file (pipe, 'job')
errfile = jobfile.replace('job', 'err')
logfile = jobfile.replace('job', 'log')
f = file(jobfile, 'w')
f.write (job_header+"\n")
f.write ("#PBS -o %s\n"%logfile)
f.write ("#PBS -e %s\n"%errfile)
f.write ("#PBS -N %s%d\n"%(job_name,i))
f.write ("#PBS -l select=1:ncpus=1,walltime=%s\n"%cpu_time)
f.write ("python -m pipelet.launchers -H %s -p %s -s %s"%(address[0],address[1],authkey))
f.write ("python -m pipelet.launchers -H %s -p %s -s %s -l %s"%(address[0],address[1],authkey,jobfile.replace('job','worker')))
f.close()
subprocess.Popen(['qsub',jobfile]).communicate()[0]
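For reference, the job file written by the loop above then reads roughly as follows (the host, port, key and the timestamped log paths are illustrative, not from the patch):

#!/bin/bash
echo $PYTHONPATH
#PBS -o /prefix/log/1021172_140307123456_log
#PBS -e /prefix/log/1021172_140307123456_err
#PBS -N job_0
#PBS -l select=1:ncpus=1,walltime=2:00:00
python -m pipelet.launchers -H lpnp321.in2p3.fr -p 50000 -s secret -l /prefix/log/1021172_140307123456_worker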
@@ -250,9 +343,12 @@ if __name__ == "__main__":
help='port the scheduler is listening to', default=50000, type='int')
parser.add_option('-s', '--secret', metavar='key',
help='authentication key', default='secret')
parser.add_option('-l', '--logfile', metavar='logfile',
help='worker log filename')
(options, args) = parser.parse_args()
print (options.host,options.port)
print options.secret
init_logger ('worker', options.logfile)
w = worker.ProcessWorker(address=(options.host,options.port), authkey=options.secret)
w.run()
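Started by hand, a distant worker therefore amounts to the following (a sketch; host, port, key and log path are placeholders):

# Illustrative only: what launch_ssh/launch_pbs execute on each remote host.
import subprocess
subprocess.Popen(['python', '-m', 'pipelet.launchers',
                  '-H', 'lpnp321.in2p3.fr', '-p', '50000',
                  '-s', 'secret', '-l', '/prefix/log/worker.log']).wait()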
@@ -434,7 +434,7 @@ class Pipeline:
"""
return self._curr_dirs[seg]
def get_output_file(self, seg):
def get_output_fn(self, seg):
""" Return the segment output file
Parameters
@@ -508,6 +508,17 @@ class Pipeline:
return path.join(self.get_curr_dir(seg), 'data')
def get_log_dir (self):
""" Return the pipe log directory.
"""
return path.join(self._prefix,"log")
def get_log_file (self, seg):
""" Return the segment log filename.
"""
return path.join(self.get_curr_dir(seg),'seg_%s.log'%seg)
if __name__ == "__main__":
import doctest
@@ -261,8 +261,8 @@ class LocalRepository(Repository):
['../test/seg_dbm_preproc.py']
"""
try:
f = filter(self._ext_filter,
glob(path.join(self.src_path, 'seg_%s_%s.*'%(seg,hook))))[0]
f = [filter(self._ext_filter,
glob(path.join(self.src_path, 'seg_%s_%s.*'%(seg,hook))))[0]]
except:
f = []
if len(f) == 0:
// Copyright (C) 2008, 2009, 2010 APC LPNHE CNRS Universite Paris Diderot <lejeune@apc.univ-paris7.fr> <betoule@apc.univ-paris7.fr>
//
// This program is free software; you can redistribute it and/or modify
// it under the terms of the GNU General Public License as published by
// the Free Software Foundation; either version 3 of the License, or
// (at your option) any later version.
//
// This program is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU General Public License for more details.
//
// You should have received a copy of the GNU General Public License
// along with this program; if not, see http://www.gnu.org/licenses/gpl.html
function confirmation(url) {
    var check = confirm("Are you sure?");
    if (check) {
        self.location.href = url;
    }
}
function edit_tag(url) {
    var check = prompt("Select an existing tag among: \n - tag1\n - tag2\n or set a new tag for this pipe", "tag1");
    if (check != null) {
        self.location.href = url + check;
    }
}
@@ -18,6 +18,7 @@ import sqlite3
import cherrypy
from contextlib import closing
import os.path
import os
current_dir = os.path.dirname(os.path.abspath(__file__))
from glob import glob
import shutil
@@ -30,6 +31,7 @@ html_tmp = """
<head>
<SCRIPT SRC="/static/mktree.js" LANGUAGE="JavaScript"></SCRIPT>
<SCRIPT SRC="/static/cook.js" LANGUAGE="JavaScript"></SCRIPT>
<SCRIPT SRC="/static/tag.js" LANGUAGE="JavaScript"></SCRIPT>
<LINK REL="stylesheet" HREF="/static/mktree.css">
</head><body>
"""
@@ -63,15 +65,17 @@ class Web:
----------
    highlight: list of segid (optional), restrict the printed segments to these segids.
"""
html = html_tmp
html += '<h1>Pipelines in %s </h1> <div class="list"><ul class="mktree" id="segtree"> '%self.name
conn = sqlite3.connect(self.db_file,check_same_thread=True)
conn.text_factory=str
# get all instances
with conn:
l = conn.execute('select seg, curr_dir, seg_id, tag from segments order by curr_dir').fetchall()
html = html_tmp
html += '<h1>Pipelines in %s </h1> <a href="log?logdir=%s">logs</a> <div class="list"><ul class="mktree" id="segtree"> '%(self.name,l[0][1].split("seg")[0]+"log")
indent = -1
# select a subset
if highlight is not None:
newl = []
@@ -95,7 +99,7 @@ class Web:
print s
for stat in e:
ss = '<a href="product?segid=%s&status=%s" class=%s>%d</a>, '%(s[2],stat[0], stat[0], stat[1]) + ss
ss += '<a href="delseg?segid=%d"> (delete) </a>'%s[2]
ss += '<a href="javascript:confirmation(\'delseg?segid=%d\')"> (delete)</a><spacer> <a href="javascript:edit_tag(\'tag?segid=%d\')"> (tag)</a>'%(s[2],s[2])
diff = s[1].count('/') - indent
if diff == 1:
html += '<ul> <li id=%d ><a href="code?segid=%d">%s</a> : %s\n'%(s[2],s[2],s[0],ss)
@@ -233,6 +237,37 @@ class Web:
forbidden_path = re.compile('.*\.\..*')
@cherrypy.expose
@read_access
def log(self, logdir):
""" Print the content of the log directory.
"""
directory = logdir
html = html_tmp + '<h1> Content of %s </h1> <a href="/%s/">Back</a><div class="list"><a href="delete_log?logdir=%s">Delete logs</a><ul>'%(directory,self.name,logdir)
for filename in sorted(glob(os.path.join(directory,'*')), reverse=True):
absPath = os.path.abspath(filename)
html += '<li><a href="serve_log?filename='+absPath+ '">' + os.path.basename(filename) + "</a> </li>"
html += """</ul></div></body></html>"""
return html
@cherrypy.expose
@read_access
def serve_log(self, filename):
""" Print the content of the log file.
"""
return serve_file(filename, content_type='text/plain', disposition="inline")
@cherrypy.expose
@write_access
def delete_log(self, logdir):
""" Delete the content of the log directory.
"""
for filename in glob(os.path.join(logdir,'*')):
absPath = os.path.abspath(filename)
os.remove(absPath)
raise cherrypy.HTTPRedirect("log?logdir=%s"%logdir,303)
def check_path(self, segid, path):
"""Chroot the path to the segid currdir.
"""
@@ -26,6 +26,20 @@ import traceback
import signal
from contextlib import closing
import gc
import logging
class NullHandler(logging.Handler):
    """ A logging handler that does nothing.

    Attaching it to the module logger keeps the library silent when the
    host application has not configured logging.
    """
    def emit(self, record):
        """ Drop the record silently.
        """
        pass
logger = logging.getLogger("worker")
h = NullHandler()
logger.addHandler(h)
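Without that handler, any use of the module logger before the host application configures logging would print a one-line warning on stderr (Python < 2.7 has no built-in logging.NullHandler); a quick check:

# Illustrative only: with the NullHandler attached, this is silently dropped
# instead of triggering "No handlers could be found for logger 'worker'".
import logging
logging.getLogger("worker").info("dropped until a real handler is added")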
class AbortError(Exception):
""" Extension of the Exception class.
@@ -77,11 +91,11 @@ class Worker(object):
    queue is empty. The task statuses are updated after completion
    or failure.
"""
print 'checking in ...'
logger.info( 'checking in ...')
if not self.scheduler.check_in():
print "check_in failed : stopping"
logger.warning( "check_in failed : stopping")
return
print 'checked in.'
logger.info( 'checked in.')
if self.pipe.matplotlib:
import matplotlib
@@ -95,7 +109,7 @@ class Worker(object):
try:
task = self.execute_task(task)
except AbortError, e:
print "Abort after catching signal %d" % e.signal
logger.warning( "Abort after catching signal %d" % e.signal)
scheduler.requeue(task)
break
if task.status == "done":
@@ -105,9 +119,9 @@ class Worker(object):
n = n+1
else:
break
print("%d jobs completed" % n)
logger.info("%d jobs completed" % n)
except AbortError, e:
print "Abort after catching signal %d" % e.signal
logger.warning( "Abort after catching signal %d" % e.signal)
finally:
self.terminate()
@@ -131,9 +145,9 @@ class Worker(object):
new_dict = pickle.load(f)
f.close()
except IOError:
print 'No such file: %s'%filename
logger.warning( 'No such file: %s'%filename)
except UnpicklingError:
print 'Failed to unpickle from file: %s'%filename
logger.warning('Failed to unpickle from file: %s'%filename)
f.close()
if param_name == '*':
param_name = new_dict.keys()
@@ -141,16 +155,17 @@ class Worker(object):
try:
glo[k] = new_dict[k]
except KeyError:
print 'Fail to load object %s from file %s'%(k,filename)
logger.warning('Failed to load object %s from file %s'%(k,filename))
def write_res(self, task_output):
def write_res(self, seg, task_output):
""" Pickle the result of the segment.
Parameters
----------
seg: string, segment name
task_output: task result.
"""
fn = self.pipe.get_output_file(self.task.seg)
fn = self.pipe.get_output_fn(seg)
with closing (file(fn,'w')) as f:
r = pickle.dump(task_output,f)
@@ -175,7 +190,7 @@ class Worker(object):
if param in glo:
strtag = strtag + param + '=' + str_web(glo[param])+' '
else:
print 'parameter '+param+' not in dictionary'
logger.warning( 'parameter '+param+' not in dictionary')
var_tag = strtag + ' <small>(<b>'+ datetime.today().strftime("%e %m - %R")+'</b>)</small> '
if not path.exists(self.pipe.get_tag_file(seg)):
f = file(self.pipe.get_tag_file(seg), "w")
@@ -379,7 +394,7 @@ class Worker(object):
try:
new_dict[k] = glo[k]
except KeyError:
print 'Fail to save object %s in file %s'%(k,filename)
logger.warning('Failed to save object %s in file %s'%(k,filename))
f = file(filename,'w')
pickle.dump(new_dict,f)
f.close()
@@ -412,11 +427,11 @@ class InteractiveWorker(Worker):
    queue is empty. The task statuses are updated after completion
    or failure.
"""
print 'checking in ...'
logger.info( 'checking in ...')
if not self.scheduler.check_in():
print "check_in failed : stopping"
logger.warning( "check_in failed : stopping")
return
print 'checked in.'
logger.info( 'checked in.')
if self.pipe.matplotlib_interactive:
import matplotlib
matplotlib.use('Agg')
@@ -435,9 +450,9 @@ class InteractiveWorker(Worker):
n = n+1
else:
break
print("%d jobs completed" % n)
logger.info("%d jobs completed" % n)
except KeyboardInterrupt:
print "Abort after catching signal"
logger.warning( "Abort after catching signal" )
finally:
self.terminate()
@@ -465,14 +480,14 @@ class InteractiveWorker(Worker):
except:
res = None
task.task_output = res
self.write_res(res)
self.write_res(seg, res)
try: # Save params
var_key = glo['var_key']
self.save_param(seg, glo, param_name=var_key)
except KeyError:
print 'Nothing to save in param file for seg %s' % seg
logger.warning( 'Nothing to save in param file for seg %s' % seg)
except Exception:
print 'Fail to save the param file for seg %s' % seg
logger.warning('Failed to save the param file for seg %s' % seg)
task.tag = self.make_tag(seg, glo) # Dump var_tag
task.status = "done" # set status
return task
@@ -530,16 +545,16 @@ class ThreadWorker(Worker, threading.Thread):
except:
res = None
task.task_output = res
self.write_res(res)
self.write_res(seg, res)
if task.status == "failed":
return task
try: # Save params
var_key = glo['var_key']
self.save_param(seg, glo, param_name=var_key)
except KeyError:
print 'Nothing to save in param file for seg %s' % seg
logger.warning( 'Nothing to save in param file for seg %s' % seg)
except Exception:
print 'Fail to save the param file for seg %s' % seg
logger.warning('Failed to save the param file for seg %s' % seg)
task.tag = self.make_tag(seg, glo) # Dump var_tag
return task
@@ -566,7 +581,7 @@ class ProcessWorker(Worker, Process):
"""
mgr = SchedulerManager(address=address, authkey=authkey)
mgr.connect()
print "connected to %s"%str(address)
logger.info( "connected to %s"%str(address))
Worker.__init__(self, mgr.get_scheduler(), **keys)
Process.__init__(self)
signal.signal(signal.SIGUSR1, catch_sigterm)
@@ -610,16 +625,16 @@ class ProcessWorker(Worker, Process):
except:
res = None
task.task_output = res
self.write_res(res)
self.write_res(seg, res)
if task.status == "failed":
return task
try: # Save params
var_key = glo['var_key']
self.save_param(seg, glo, param_name=var_key)
except KeyError:
print 'Nothing to save in param file for seg %s' % seg
logger.warning( 'Nothing to save in param file for seg %s' % seg)
except Exception:
print 'Fail to save the param file for seg %s' % seg
logger.warning('Failed to save the param file for seg %s' % seg)
task.tag = self.make_tag(seg, glo) # Dump var_tag
return task