Commit 57d6569d authored by Betoule Marc's avatar Betoule Marc
Browse files

modifications to the way ccali jobs are launched

parent c4c3bea2
......@@ -263,24 +263,27 @@ python -m pipelet.launchers -H %s -p %s -s %s
"""
def launch_ccali(pipe, n, address=None, authkey='secret', job_dir="/sps/snls13/users/betoule/bqs/scripts",
def launch_ccali(pipe, n, address=('127.0.0.1',5000), authkey='secret', job_dir="/sps/snls13/users/betoule/bqs/scripts",
job_name="job_", log_dir="/sps/snls13/users/betoule/bqs/logs",
cpu_time=1000, scratch="512MB", mem="512MB",plateform='LINUX',server=False, log_level=logging.DEBUG):
cpu_time=1000, scratch="512MB", mem="512MB",plateform='LINUX',server=False, log_level=logging.DEBUG,buff=None):
"""submit a bunch of distant workers to the BQS system
at CCIN2P3.
"""
set_logger (pipe, log_level)
s = scheduler.Scheduler(pipe)
SchedulerManager.register('get_scheduler', callable=lambda:s)
if server:
mgr = SchedulerManager(address=address, authkey=authkey)
mgr.start()
import cPickle as pickle
if buff is not None:
b = ['-b', buff]
else:
b = []
s = subprocess.Popen(['pipeletd',
'-l', '%d'%log_level,
'-p', '%d'%address[1],
'-a', address[0]]+b, stdin=subprocess.PIPE)
s.communicate(input=pickle.dumps(pipe))
processlist = []
for i in range(n):
jobfile = get_log_file (pipe, 'job')
name = '%s%d'%(job_name,i)
jobfile = get_log_file (pipe, name+'.job')
errfile = jobfile.replace('job', 'err')
logfile = jobfile.replace('job', 'log')
with closing(file(jobfile,'w')) as f:
......@@ -292,12 +295,8 @@ def launch_ccali(pipe, n, address=None, authkey='secret', job_dir="/sps/snls13/u
'-l', 'scratch=%s'%scratch,
'-o', logfile,
'-e', errfile,
'-N', '%s%d'%(job_name,i),
'-N', name,
jobfile]).communicate()[0]
if server:
print 'launching the scheduler'
sched_proxy = mgr.get_scheduler()
sched_proxy.run()
if __name__ == "__main__":
import optparse
......
......@@ -3,38 +3,48 @@
"""
import sys, os
import logging
import signal
import shutil
def main():
class AbortError(Exception):
    """ Abort condition raised when the daemon receives a termination signal.

    Carries the number of the signal that triggered the abort so the
    caller can report or act on it.
    """
    def __init__(self, signal=0):
        """ Record the signal number behind this abort.

        Parameters
        ----------
        signal: integer, signal value (default 0).
        """
        # Number of the signal that triggered this error.
        self.signal = signal
def catch_sigterm(signal, frame):
    """ Signal handler: convert a received signal into an AbortError.

    Installed (elsewhere in this file) for SIGUSR1/SIGTERM/SIGABRT/SIGINT
    so that termination unwinds through the daemon's try/finally cleanup
    instead of killing the process outright.

    Parameters
    ----------
    signal: integer, number of the signal received.
    frame: current stack frame at interruption time (unused).
    """
    raise AbortError(signal)
def main(options, args):
"""
"""
import pipelet.scheduler as scheduler
from multiprocessing.managers import BaseManager
import multiprocessing
from pipelet.utils import get_log_file
import logging
import logging.handlers
import optparse
import pickle
parser = optparse.OptionParser()
parser.add_option('-l', '--log-level', metavar='N',
help='Set the verbosity of the loggin',type='int',default=logging.INFO)
parser.add_option('-p', '--port',
help='Listen on port',type='int',default=5000)
parser.add_option('-a', '--address',
help='Listen on address',type='str',default=os.environ['HOST'])
(options, args) = parser.parse_args()
# Reading the pipe
pipefile = args[0]
try:
f = open(pipefile)
pipe = pickle.load(f)
f.close()
except IOError:
sys.exit(1)
pipefile = open(args[0])
pipe = pickle.load(pipefile)
pipefile.close()
except IndexError:
pipe = pickle.load(sys.stdin)
# Setting up logging
# Set up logging
log_file = get_log_file (pipe, 'scheduler_on_%s'%os.environ['HOST'])
logger = logging.getLogger("scheduler")
logger.setLevel(options.log_level)
......@@ -46,30 +56,49 @@ def main():
ml = multiprocessing.get_logger()
ml.setLevel(options.log_level)
ml.addHandler(rfhandler)
signal.signal(signal.SIGUSR1, catch_sigterm)
signal.signal(signal.SIGTERM, catch_sigterm)
signal.signal(signal.SIGABRT, catch_sigterm)
signal.signal(signal.SIGINT, catch_sigterm)
try:
# Now that logging is set up decouple from parent environnement
redirect_stream(sys.stdin, None)
redirect_stream(sys.stdout, None)
redirect_stream(sys.stderr, None)
# Launching the scheduler
class SchedulerManager(BaseManager):
""" Extension of the BaseManager class.
"""
pass
if options.buffer:
old = pipe.sqlfile
pipe.sqlfile = options.buffer
shutil.copy(old, options.buffer)
s = scheduler.Scheduler(pipe)
SchedulerManager.register('get_scheduler', callable=lambda:s)
mgr = SchedulerManager(address=(options.address, options.port), authkey='secret')
server = mgr.get_server()
print "Started"
# Now that logging is set up decouple from parent environnement
redirect_stream(sys.stdin, None)
redirect_stream(sys.stdout, None)
redirect_stream(sys.stderr, None)
os.chdir(WORKDIR)
server.serve_forever()
except BaseException, e:
logger.critical(e.message)
sys.exit(1)
except BaseException, e:
logger.critical(str(e))
finally:
if options.buffer:
try:
shutil.copy(options.buffer, old)
except:
pass
# Default working directory for the daemon.
WORKDIR = "/"
......@@ -92,6 +121,20 @@ def redirect_stream(system_stream, target_stream):
if __name__ == "__main__":
import optparse
parser = optparse.OptionParser(usage="pipeletd [options] pipe.pickle")
parser.add_option('-l', '--log-level', metavar='N',
help='Set the verbosity of the loggin',type='int',default=logging.INFO)
parser.add_option('-p', '--port',
help='Listen on port',type='int',default=5000)
parser.add_option('-a', '--address',
help='Listen on address',type='str',default=os.environ['HOST'])
parser.add_option('-b', '--buffer',
help='Make a local copy of the database file. Usefull to overcome the slowness of network filesystems',
default="", type='str')
(options, args) = parser.parse_args()
# do the UNIX double-fork magic, see Stevens' "Advanced
# Programming in the UNIX Environment" for details (ISBN 0201563177)
try:
......@@ -113,11 +156,11 @@ if __name__ == "__main__":
pid = os.fork()
if pid > 0:
# exit from second parent, print eventual PID before
print "Daemon PID %d" % pid
print "Starting Scheduler as daemon with PID %d ..." % pid
sys.exit(0)
except OSError, e:
print >>sys.stderr, "fork #2 failed: %d (%s)" % (e.errno, e.strerror)
sys.exit(1)
# start the daemon main loop
main()
# start the daemon main
main(options, args)
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment