Commit d0e6a65a authored by Betoule Marc
Browse files

Merge branch 'master' of gitorious.org:pipelet/pipelet

parents 82563527 2d1ba744
......@@ -465,7 +465,7 @@ def launch_ccali_ge(pipe, n, address=('127.0.0.1',5000), authkey='secret', job_d
def launch_ccage_server (pipe, address=('127.0.0.1',5000), authkey='secret', log_level=logging.WARNING, project="planck", cpu_time='24:00:00', scratch="512M", mem="512M", ressource=['sps']):
""" Launch a scheduler + web server through a SGE batch system @ccin2p3.
"""
server_file="""#! /usr/local/bin/bash -l
server_file="""#! /bin/zsh -l
echo $HOST
echo "export PIPELETD_HOST=$HOST\nexport PIPEWEB_HOST=$HOST\nexport PIPELETD_PORT=%d" > $HOME/host_info.sh
source $HOME/host_info.sh
......@@ -491,8 +491,9 @@ EOF
com=['qsub',
'-P', 'P_%s'%project,
'-q', 'demon',
'-l', 'demon=1',
#'-q', 'demon',
#'-l', 'demon=1',
'-l', 'sps=1',
'-l', 'fsize=%s'%scratch,
'-l', 'vmem=%s'%mem,
'-l', 'ct=%s'%cpu_time,
......@@ -501,10 +502,10 @@ EOF
'-N', name]+reduce(lambda x,y : x+y, [['-l', '%s=1'%res] for res in ressource], [])+[jobfile]
subprocess.Popen(com).communicate()[0]
def launch_ccage_worker (pipe, n, address=('127.0.0.1',5000), authkey='secret', log_level=logging.WARNING, project="planck", cpu_time='01:00:00', scratch="512M", mem="512M", ressource=['sps']):
def launch_ccage_worker (pipe, n, address=('127.0.0.1',5000), authkey='secret', log_level=logging.WARNING, project="planck", cpu_time='01:00:00', scratch="512M", mem="512M", ressource=['sps'], multicores=False):
""" Launch n worker using job array
"""
worker_file = """#! /usr/local/bin/bash -l
worker_file = """#! /bin/zsh -l
echo $HOST
sleep 1
source $HOME/host_info.sh
......@@ -518,6 +519,10 @@ echo "worker returned at $(date)"
logfile = jobfile.replace('job', 'log')
with closing(file(jobfile,'w')) as f:
f.write(worker_file%(authkey,logfile))
if multicores:
mc = ['-pe', 'multicores' , '%d'%multicores]
else:
mc = []
com=['qsub',
'-P', 'P_%s'%project,
'-l', 'fsize=%s'%scratch,
......@@ -526,7 +531,7 @@ echo "worker returned at $(date)"
'-t', '1-%d'%n,
'-o' ,logfile,
'-e', errfile,
'-N', name]+reduce(lambda x,y : x+y, [['-l', '%s=1'%res] for res in ressource], [])+[jobfile]
'-N', name]+mc+reduce(lambda x,y : x+y, [['-l', '%s=1'%res] for res in ressource], [])+[jobfile]
subprocess.Popen(com).communicate()[0]
......
......@@ -119,10 +119,13 @@ class Worker(object):
try:
for task in iter(self.scheduler.get_task, None):
self.work_dir = self.pipe.get_curr_dir(task.seg)
self.logger.info('Processing one task of segment %s'%task.seg)
task = self.execute_task(task)
if task.status == "done":
self.logger.info('task done')
self.scheduler.task_done(task)
else:
self.logger.info('task failed')
self.scheduler.task_failed(task)
n = n+1
self.task = None
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment