## Copyright (C) 2008, 2009, 2010 APC LPNHE CNRS Universite Paris Diderot <lejeune@apc.univ-paris7.fr>  <betoule@apc.univ-paris7.fr>
## 
## This program is free software; you can redistribute it and/or modify
## it under the terms of the GNU General Public License as published by
## the Free Software Foundation; either version 3 of the License, or
## (at your option) any later version.
##
## This program is distributed in the hope that it will be useful,
## but WITHOUT ANY WARRANTY; without even the implied warranty of
## MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
## GNU General Public License for more details.
##
## You should have received a copy of the GNU General Public License
## along with this program; if not, see http://www.gnu.org/licenses/gpl.html

""" Utilitary functions to launch Pipes

- launch_interactive : launch a local worker in the interactive session
  (debugger compliant)

- launch_thread : launch a bunch of local workers in separate threads
  (SMP machine compliant)

- launch_process : launch a bunch of local workers in separate
  processes (useful when the GIL becomes limiting, i.e. when most
  time is spent in pure Python processing)

- launch_ssh : launch a bunch of distant workers through ssh

- launch_pbs : submit a bunch of distant workers to any PBS compliant
  system (keep in mind that the pipe directory must be accessible to
  workers)

- launch_ccali : submit a bunch of distant workers to the BQS system
  at CCIN2P3

- launch_ccali_ge : submit a bunch of distant workers to the Grid Engine system
  at CCIN2P3

- launch_pbspro : submit a bunch of distant workers to the PBSpro
  manager (magique3, ...)

Run as a main, it can be used to launch a worker on a distant machine
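
A sketch of that command line (host name, port, key and log file are
placeholders; they must match the values used by the launcher):

  python -m pipelet.launchers -H mynode -p 50000 -s secret -l /tmp/worker.log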
"""

import pipelet.scheduler as scheduler
import pipelet.worker as worker
import pipelet.pipeline as pipeline
from contextlib import closing
import logging
import logging.handlers
import sys
from pipelet.utils import get_log_file, init_logger
import socket  # to get the hostname
from multiprocessing.managers import BaseManager
import subprocess


def launch_interactive(pipe, log_level=logging.INFO):
    """ Launch a local worker in the interactive session.

    This is debugger compliant, so that exceptions raised during the
    segment execution can be tracked.

    log_level takes the values defined by the logging module
    (logging.DEBUG=10, logging.WARNING=30, logging.CRITICAL=50, etc.).
    Set it to 0 to disable stream logging.

    >>> T = pipeline.Pipeline("", code_dir='./',
    ...                       prefix='./', permissive=True)
    >>> w,t = launch_interactive(T, log_level=logging.ERROR)
    >>> w.run()
    """
    init_logger('scheduler', get_log_file(pipe, 'scheduler'), level=log_level)
    s  = scheduler.Scheduler(pipe)
    wl = init_logger('worker', get_log_file(pipe, 'worker'), level=log_level)
    w  = worker.InteractiveWorker(s, wl)
    import threading
    t = threading.Thread(target=s.run)
    t.start()
    return (w, t)


def launch_thread(pipe, n, log_level=logging.WARNING):
    """ Launch a bunch of local workers in separate threads.

    This is SMP machine compliant. Exceptions arising in the
    execution of any segment are caught and the corresponding task is
    marked as failed.

    >>> T = pipeline.Pipeline("", permissive=True)
    >>> launch_thread(T, 2)
    """
    init_logger('scheduler', get_log_file(pipe, 'scheduler'), level=log_level)
    s = scheduler.Scheduler(pipe)
    for i in range(n):
        wl = init_logger('worker%d' % i, get_log_file(pipe, 'worker%d' % i),
                         level=log_level)
        w = worker.ThreadWorker(s, wl)
        w.start()
    s.run()


class SchedulerManager(BaseManager):
    """ Extension of BaseManager used to expose the scheduler to
    workers (a 'get_scheduler' proxy is registered on it).
    """
    pass


def launch_process(pipe, n, address=('', 50000), authkey='secret',
                   log_level=logging.WARNING, nice=0):
    """ Launch a bunch of local workers in separate processes.

    This is useful (compared to launch_thread) when the GIL becomes
    limiting, which is bound to be the case when most time is spent
    in pure Python processing.

    >>> T = pipeline.Pipeline("", permissive=True)
    >>> launch_process(T, 2)
    """
    init_logger('scheduler', get_log_file(pipe, 'scheduler'), level=log_level)
    s = scheduler.Scheduler(pipe)
    SchedulerManager.register('get_scheduler', callable=lambda: s)

    mgr = SchedulerManager(address=address, authkey=authkey)
    mgr.start()

    processlist = []
    for i in range(n):
        wl = init_logger('worker%d' % i, get_log_file(pipe, 'worker%d' % i),
                         level=log_level)
        w = worker.ProcessWorker(address=address, authkey=authkey,
                                 logger=wl, nice=nice)
        w.start()
        processlist.append(w)

    sched_proxy = mgr.get_scheduler()
    sched_proxy.run()

    # join the worker processes (avoid leaving zombies)
    for w in processlist:
        w.join()


def _scp(file, dest):
    """ Wrapper around the scp command."""
    subprocess.Popen(['scp', file, dest]).communicate()[0]


def launch_ssh(pipe, host_list, address=None, authkey='secret',
               log_level=logging.ERROR):
    """ Launch a bunch of distant workers through ssh.

    This is used mainly for testing purposes. It can be useful to
    distribute computations on a pool of machines that share access
    to a NAS.

    >>> T = pipeline.Pipeline("", permissive=True)
    >>> launch_ssh(T, ['127.0.0.1'], address=('127.0.0.1',50000))
    """

    init_logger('scheduler', get_log_file(pipe, 'scheduler'), level=log_level)
    s = scheduler.Scheduler(pipe)

    SchedulerManager.register('get_scheduler', callable=lambda: s)

    mgr = SchedulerManager(address=address, authkey=authkey)
    mgr.start()

    processlist = []
    for i, h in enumerate(host_list):
        w = subprocess.Popen(
            ['ssh', h,
             "python -m pipelet.launchers -H %s -p %s -s %s"
             " -l %s" % (address[0], address[1], authkey,
                         get_log_file(pipe, 'worker%d' % i))])
        processlist.append(w)

    # launch the scheduler
    sched_proxy = mgr.get_scheduler()
    sched_proxy.run()

    # wait for the ssh processes (avoid leaving zombies)
    for w in processlist:
        w.wait()


def launch_pbs(pipe, n, address=None, authkey='secret', job_name="job_",
               log_level=logging.WARNING, cpu_time="2:00:00", server=False,
               job_header="", ppn=1, mem=4):
    """ Launch a bunch of distant workers through a PBS batch system.
    """
    init_logger('scheduler', get_log_file(pipe, 'scheduler'), level=log_level)
    s = scheduler.Scheduler(pipe)
    SchedulerManager.register('get_scheduler', callable=lambda: s)
    if server:
        mgr = SchedulerManager(address=address, authkey=authkey)
        mgr.start()
    processlist = []
    job_header0 = "#PBS -S /bin/bash"

    for i in range(n):
        jobfile = get_log_file(pipe, 'job%d' % i)
        errfile = jobfile.replace('job', 'err')
        logfile = jobfile.replace('job', 'log')
        f = file(jobfile, 'w')
        f.write(job_header0 + "\n")
        f.write("#PBS -o %s\n" % logfile)
        f.write("#PBS -e %s\n" % errfile)
        f.write("#PBS -N %s%d\n" % (job_name, i))
        f.write("#PBS -l nodes=1:ppn=%d,walltime=%s,mem=%dgb\n"
                % (ppn, cpu_time, mem))
        f.write(job_header + "\n")
        f.write("python -m pipelet.launchers -H %s -p %s -s %s -l %s"
                % (address[0], address[1], authkey,
                   jobfile.replace('job', 'worker')))
        f.close()
        subprocess.Popen(['qsub', '-o', logfile, '-e', errfile,
                          jobfile]).communicate()[0]

    if server:
        print 'launching the scheduler'
        sched_proxy = mgr.get_scheduler()
        sched_proxy.run()
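
# A usage sketch for launch_pbs, assuming a PBS queue reachable through
# qsub and a pipe directory visible from the worker nodes (all values
# below are placeholders):
#
#     T = pipeline.Pipeline("", permissive=True)
#     launch_pbs(T, 4, address=('frontend', 50000), server=True,
#                cpu_time="4:00:00", ppn=2, mem=8)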

     

# def launch_pbs(pipe, n, address=None, authkey='secret', job_name="job_", log_level=logging.WARNING, cpu_time="2:00:00", job_header="""
# #/bin/bash
# echo $PYTHONPATH
# """ ):
#     """ Launch a bunch of distant workers through a PBS batch system using mpirun.
    
#     """

#     server_file=job_header+"""
# #PBS -l walltime=%s
# echo "export PIPELETD_HOST=$HOSTNAME" > ~/host_info.sh
# pipeletd -n -l %d -p %d -a $HOSTNAME << 'EOF' &
# %s
# EOF
# sleep 5
# mpirun -np %d python $PIPELETPATH/launchers.py -p %d -s %s -l %s -H $HOSTNAME
# """
#     import cPickle as pickle
#     pipedesc = pickle.dumps(pipe)
#     jobfile = get_log_file (pipe, '.job')
#     errfile = jobfile.replace('job', 'err')
#     logfile = jobfile.replace('job', 'log')

#     with closing(file(jobfile,'w')) as f:
#         f.write(server_file%(n, cpu_time, log_level, address[1], pipedesc, n, address[1], authkey, logfile))
#     subprocess.Popen(['qsub','-o' ,logfile, '-e',  errfile,jobfile]).communicate()[0]



def launch_nersc(pipe, n, ppn=1, port=50000, authkey='secret',
                 job_name="job_", log_level=logging.WARNING, job_header="""
#PBS -q regular
#PBS -l nodes=%d:ppn=%d
""", carver=False, python_mpi=False):
    """ Launch a bunch of distant workers through a PBS batch system
    using aprun (mpirun on carver).
    """
#     server_file="""
##PBS -q regular
##PBS -l mppwidth=%d
##PBS -l walltime=%s
#export CRAY_ROOTFS=DSL
#echo "export PIPELETD_HOST=$HOST" > ~/host_info.sh
#pipeletd -n -l %d -p %d -a $HOST << 'EOF' &
#%s
#EOF
#sleep 5
#aprun -np %d python $PIPELETPATH/launchers.py -p %d -s %s -l %s -H $HOST
#"""

    server_file = job_header + """
export CRAY_ROOTFS=DSL
echo "export PIPELETD_HOST=$HOST" > ~/host_info.sh
pipeletd -n -l %d -p %d -a $HOST << 'EOF' &
%s
EOF
sleep 5
aprun -n %d -N %d -S %d %s $PIPELETPATH/launchers.py -p %d -s %s -l %s -H $HOST
"""
    server_file_carver = job_header + """
export CRAY_ROOTFS=DSL
echo "export PIPELETD_HOST=$HOSTNAME" > ~/host_info.sh
pipeletd -n -l %d -p %d -a $HOSTNAME << 'EOF' &
%s
EOF
sleep 5
mpirun -np %d %s $PIPELETPATH/launchers.py -p %d -s %s -l %s -H $HOSTNAME
"""

    # number of nodes needed to host n workers, ppn workers per node
    nnode = n / ppn
    if (n % ppn > 0):
        nnode = nnode + 1

    pyrun = "python"
    if python_mpi:
        pyrun = "python-mpi"

    import cPickle as pickle
    pipedesc = pickle.dumps(pipe)
    jobfile = get_log_file(pipe, '.job')
    errfile = jobfile.replace('job', 'err')
    logfile = jobfile.replace('job', 'log')

    if carver:
        server_file = server_file_carver
        with closing(file(jobfile, 'w')) as f:
            f.write(server_file % (nnode, ppn, log_level, port, pipedesc,
                                   n, pyrun, port, authkey, logfile))
    else:
        with closing(file(jobfile, 'w')) as f:
            f.write(server_file % (n, log_level, port, pipedesc, nnode,
                                   24 / ppn, 24 / ppn / 2, pyrun, port,
                                   authkey, logfile))

    subprocess.Popen(['qsub', '-o', logfile, '-e', errfile,
                      jobfile]).communicate()[0]
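
# A usage sketch for launch_nersc, assuming qsub, pipeletd and
# $PIPELETPATH are available on the NERSC front-end (values are
# placeholders):
#
#     T = pipeline.Pipeline("", permissive=True)
#     launch_nersc(T, 24, ppn=8, port=50000, carver=True)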

_job_file = """
317
#/bin/zsh
318
export PATH=/afs/in2p3.fr/home/b/betoule/software/photcalib/tools:$PATH
319 320
echo $SHELL
echo $PYTHONPATH
321
python -m pipelet.launchers -H %s -p %s -s %s -l %s
Marc Betoule's avatar
Marc Betoule committed
322 323
"""

324 325 326 327
_server_file = """
#/bin/zsh
echo $SHELL
echo $PYTHONPATH
328 329 330
echo $USER
echo pouet > /sps/snls13/users/betoule/calib5_ref/log/11-06-06_15-13_scheduler_on_ccwl0674
cat /sps/snls13/users/betoule/calib5_ref/log/11-06-06_15-13_scheduler_on_ccwl0674
331 332
echo "export PIPELETD_HOST=$HOST\nexport PIPEWEB_HOST=$HOST\nexport PIPELETD_PORT=%d" > ~/host_info.sh
source ~/host_info.sh
333
pipeletd -n -l %s -p $PIPELETD_PORT -a $PIPELETD_HOST << 'EOF' &
334 335
%s
EOF
336 337 338
echo "starting web server at $(date)"
pipeweb start -n &
echo "web server started at $(date)"
339
sleep 2
340
echo "launching worker at $(date)"
341
python -m pipelet.launchers -H $PIPELETD_HOST -p $PIPELETD_PORT -s %s -l %s
342
echo "worker returned at $(date)
343 344 345 346 347 348 349 350 351 352 353
"""

_worker_file = """
#/bin/zsh
echo $SHELL
echo $PYTHONPATH
while [[ ! -e ~/host_info.sh ]]; do sleep 2; done;
source ~/host_info.sh
sleep 5
python -m pipelet.launchers -H $PIPELETD_HOST -p $PIPELETD_PORT -s %s -l %s
"""

def launch_ccali(pipe, n, address=('127.0.0.1', 5000), authkey='secret',
                 job_dir="/sps/snls13/users/betoule/bqs/scripts",
                 job_name="job_",
                 log_dir="/sps/snls13/users/betoule/bqs/logs",
                 cpu_time=1000, scratch="512MB", mem="512MB",
                 plateform='LINUX', server=False, log_level=logging.INFO,
                 buff=None):
    """ Submit a bunch of distant workers to the BQS system at CCIN2P3.
    """
    if server:
        # Submit a job that will run the scheduler, a temporary web
        # server and a single worker
        import cPickle as pickle
        if buff is not None:
            b = ['-b', buff]
        else:
            b = []
        name = '%s_sched' % (job_name,)
        pipedesc = pickle.dumps(pipe)
        jobfile = get_log_file(pipe, name + '.job')
        errfile = jobfile.replace('job', 'err')
        logfile = jobfile.replace('job', 'log')
        with closing(file(jobfile, 'w')) as f:
            f.write(_server_file % (address[1], log_level, pipedesc,
                                    authkey, logfile))
        com = ['qsub',
               '-l', 'platform=%s' % plateform,
               '-l', 'T=%d' % cpu_time,
               '-l', 'M=%s' % mem,
               '-l', 'scratch=%s' % scratch,
               '-o', logfile,
               '-e', errfile,
               '-N', name,
               jobfile]
        print " ".join(com)
        subprocess.Popen(com).communicate()[0]

    # number new jobs after those already submitted to avoid name clashes
    import re
    l = subprocess.Popen(['qjob', '-wide', '-nh'],
                         stdout=subprocess.PIPE).communicate()[0]
    existing_process = re.findall('%s([0-9]*)' % job_name, l)
    existing_process = filter(lambda x: x != '', existing_process)
    try:
        starting_num = max([int(p) for p in existing_process]) + 1
    except:
        starting_num = 0
    for i in range(starting_num, starting_num + n):
        name = '%s%d' % (job_name, i)
        jobfile = get_log_file(pipe, name + '.job%d' % i)
        errfile = jobfile.replace('job', 'err')
        logfile = jobfile.replace('job', 'log')
        with closing(file(jobfile, 'w')) as f:
            f.write(_worker_file % (authkey, logfile))
        com = ['qsub',
               '-l', 'platform=%s' % plateform,
               '-l', 'T=%d' % cpu_time,
               '-l', 'M=%s' % mem,
               '-l', 'scratch=%s' % scratch,
               '-o', logfile,
               '-e', errfile,
               '-N', name,
               jobfile]
        print " ".join(com)
        subprocess.Popen(com).communicate()[0]
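
# A usage sketch for launch_ccali, assuming the BQS commands (qsub, qjob)
# at CCIN2P3 (host name and values are placeholders):
#
#     T = pipeline.Pipeline("", permissive=True)
#     launch_ccali(T, 10, address=('ccali.in2p3.fr', 50000), server=True)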

def launch_ccali_ge(pipe, n, address=('127.0.0.1', 5000), authkey='secret',
                    job_dir="/sps/snls13/users/betoule/bqs/scripts",
                    job_name="job_",
                    log_dir="/sps/snls13/users/betoule/bqs/logs",
                    cpu_time=1000, scratch="512MB", mem="512MB",
                    project='snovae', server=False, log_level=logging.INFO,
                    buff=None, ressource=['sps']):
    """ Submit a bunch of distant workers to the Grid Engine system
    at CCIN2P3.
    """
    if server:
        # Submit a job that will run the scheduler, a temporary web
        # server and a single worker
        import cPickle as pickle
        if buff is not None:
            b = ['-b', buff]
        else:
            b = []
        name = '%s_sched' % (job_name,)
        pipedesc = pickle.dumps(pipe)
        jobfile = get_log_file(pipe, name + '.job')
        errfile = jobfile.replace('job', 'err')
        logfile = jobfile.replace('job', 'log')
        with closing(file(jobfile, 'w')) as f:
            f.write(_server_file % (address[1], log_level, pipedesc,
                                    authkey, logfile))
        com = ['qsub',
               '-P', 'P_%s' % project,
               '-l', 'ct=%d' % cpu_time,
               '-l', 'vmem=%s' % mem,
               '-l', 'fsize=%s' % scratch,
               '-o', logfile,
               '-e', errfile,
               '-N', name] + reduce(lambda x, y: x + y,
                                    [['-l', '%s=1' % res]
                                     for res in ressource], []) + [jobfile]
        print " ".join(com)
        subprocess.Popen(com).communicate()[0]

    # number new jobs after those already submitted to avoid name clashes
    import re
    l = subprocess.Popen(['qstat', '-r'],
                         stdout=subprocess.PIPE).communicate()[0]
    existing_process = re.findall('%s([0-9]*)' % job_name, l)
    existing_process = filter(lambda x: x != '', existing_process)
    try:
        starting_num = max([int(p) for p in existing_process]) + 1
    except:
        starting_num = 0
    for i in range(starting_num, starting_num + n):
        name = '%s%d' % (job_name, i)
        jobfile = get_log_file(pipe, name + '.job%d' % i)
        errfile = jobfile.replace('job', 'err')
        logfile = jobfile.replace('job', 'log')
        with closing(file(jobfile, 'w')) as f:
            f.write(_worker_file % (authkey, logfile))
        com = ['qsub',
               '-P', 'P_%s' % project,
               '-l', 'ct=%d' % cpu_time,
               '-l', 'vmem=%s' % mem,
               '-l', 'fsize=%s' % scratch,
               '-o', logfile,
               '-e', errfile,
               '-N', name] + reduce(lambda x, y: x + y,
                                    [['-l', '%s=1' % res]
                                     for res in ressource], []) + [jobfile]
        print " ".join(com)
        subprocess.Popen(com).communicate()[0]
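
# A usage sketch for launch_ccali_ge, assuming the Grid Engine commands
# (qsub, qstat) at CCIN2P3 (host name and values are placeholders):
#
#     T = pipeline.Pipeline("", permissive=True)
#     launch_ccali_ge(T, 10, address=('ccage.in2p3.fr', 50000),
#                     project='snovae', server=True)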

def launch_ccage_server(pipe, address=('127.0.0.1', 5000), authkey='secret',
                        log_level=logging.WARNING, project="planck",
                        cpu_time='24:00:00', scratch="512M", mem="512M",
                        ressource=['sps']):
    """ Launch a scheduler + web server through the SGE batch system
    at CCIN2P3.
    """
    server_file = """#! /bin/zsh -l
echo $HOST
echo "export PIPELETD_HOST=$HOST\nexport PIPEWEB_HOST=$HOST\nexport PIPELETD_PORT=%d" > $HOME/host_info.sh
source $HOME/host_info.sh
echo "starting web server at $(date)"
sleep 5
pipeweb start -n &
echo "web server returned at $(date)"
pipeletd -n -l %s -p $PIPELETD_PORT -a $HOST << 'EOF'
%s
EOF
"""

    port = address[1]
    import cPickle as pickle
    pipedesc = pickle.dumps(pipe)
    name = 'sched'
    jobfile = get_log_file(pipe, name + '.job')
    errfile = jobfile.replace('job', 'err')
    logfile = jobfile.replace('job', 'log')

    with closing(file(jobfile, 'w')) as f:
        f.write(server_file % (port, log_level, pipedesc))

    com = ['qsub',
           '-P', 'P_%s' % project,
           #'-q', 'demon',
           #'-l', 'demon=1',
           '-l', 'sps=1',
           '-l', 'fsize=%s' % scratch,
           '-l', 'vmem=%s' % mem,
           '-l', 'ct=%s' % cpu_time,
           '-o', logfile,
           '-e', errfile,
           '-N', name] + reduce(lambda x, y: x + y,
                                [['-l', '%s=1' % res]
                                 for res in ressource], []) + [jobfile]
    subprocess.Popen(com).communicate()[0]
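
# A usage sketch for launch_ccage_server, which submits the scheduler and
# web server job; workers are then added with launch_ccage_worker (values
# are placeholders):
#
#     T = pipeline.Pipeline("", permissive=True)
#     launch_ccage_server(T, address=('127.0.0.1', 50000), project='planck')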

def launch_ccage_worker(pipe, n, address=('127.0.0.1', 5000),
                        authkey='secret', log_level=logging.WARNING,
                        project="planck", cpu_time='01:00:00',
                        scratch="512M", mem="512M", ressource=['sps'],
                        multicores=False):
    """ Launch n workers through an SGE job array.
    """
    worker_file = """#! /bin/zsh -l
echo $HOST
sleep 1
source $HOME/host_info.sh
echo "launching worker at $(date)"
python -m pipelet.launchers -H $PIPELETD_HOST -p $PIPELETD_PORT -s %s -l %s$SGE_TASK_ID
echo "worker returned at $(date)"
"""
    name = 'worker'
    jobfile = get_log_file(pipe, name + '.job')
    errfile = jobfile.replace('job', 'err')
    logfile = jobfile.replace('job', 'log')
    with closing(file(jobfile, 'w')) as f:
        f.write(worker_file % (authkey, logfile))
    if multicores:
        mc = ['-pe', 'multicores', '%d' % multicores]
    else:
        mc = []
    com = ['qsub',
           '-P', 'P_%s' % project,
           '-l', 'fsize=%s' % scratch,
           '-l', 'vmem=%s' % mem,
           '-l', 'ct=%s' % cpu_time,
           '-t', '1-%d' % n,
           '-o', logfile,
           '-e', errfile,
           '-N', name] + mc + reduce(lambda x, y: x + y,
                                     [['-l', '%s=1' % res]
                                      for res in ressource], []) + [jobfile]
    subprocess.Popen(com).communicate()[0]
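
# A usage sketch for launch_ccage_worker, to be run after
# launch_ccage_server so that $HOME/host_info.sh exists (values are
# placeholders):
#
#     T = pipeline.Pipeline("", permissive=True)
#     launch_ccage_worker(T, 10, project='planck', cpu_time='02:00:00')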


if __name__ == "__main__":
    import optparse
    parser = optparse.OptionParser()
    parser.add_option('-H', '--host', metavar='hostname',
                      help='hostname or address of the scheduler server',
                      default=socket.gethostname())
    parser.add_option('-p', '--port', metavar='port',
                      help='port the scheduler is listening to',
                      default=50000, type='int')
    parser.add_option('-s', '--secret', metavar='key',
                      help='authentication key', default='secret')
    parser.add_option('-l', '--logfile', metavar='logfile',
                      help='worker log filename')
#    parser.add_option('-L', '--log-level', metavar='level',
#                      type=
#                      help='worker log filename')

    (options, args) = parser.parse_args()
    # print (options.host, options.port)
    # print options.secret
    name = options.logfile.split("_")[-1]
    wl = init_logger(name, options.logfile)
    w = worker.ProcessWorker(address=(options.host, options.port),
                             authkey=options.secret, logger=wl)
    sys.argv = [sys.argv[0]]  # do not propagate command line arguments to the script
    w.run()