Commit 77f4cb7a authored by Maude Le Jeune
Browse files

use mpi in pbs launcher

parent bc9aec35
......@@ -199,41 +199,34 @@ def launch_ssh(pipe, host_list, address=None, authkey='secret',log_level=logging
w.wait()
def launch_pbs(pipe, n, address=None, authkey='secret', job_name="job_", log_level=logging.WARNING, cpu_time="2:00:00", server=False, job_header="""
def launch_pbs(pipe, n, address=None, authkey='secret', job_name="job_", log_level=logging.WARNING, cpu_time="2:00:00", job_header="""
#/bin/bash
echo $PYTHONPATH
""" ):
""" Launch a bunch of distant workers through a PBS batch system.
""" Launch a bunch of distant workers through a PBS batch system using mpirun.
"""
init_logger ('scheduler', get_log_file (pipe, 'scheduler'), level=log_level)
s = scheduler.Scheduler(pipe)
SchedulerManager.register('get_scheduler', callable=lambda:s)
if server:
mgr = SchedulerManager(address=address, authkey=authkey)
mgr.start()
processlist = []
for i in range(n):
jobfile = get_log_file (pipe, 'job%d'%i)
errfile = jobfile.replace('job', 'err')
logfile = jobfile.replace('job', 'log')
f = file(jobfile, 'w')
f.write (job_header+"\n")
f.write ("#PBS -o %s\n"%logfile)
f.write ("#PBS -e %s\n"%errfile)
f.write ("#PBS -N %s%d\n"%(job_name,i))
f.write ("#PBS -l select=1:ncpus=1,walltime=%s\n"%cpu_time)
f.write ("python -m pipelet.launchers -H %s -p %s -s %s -l %s"%(address[0],address[1],authkey,jobfile.replace('job','worker')))
f.close()
subprocess.Popen(['qsub','-o' ,logfile, '-e', errfile,jobfile]).communicate()[0]
server_file=job_header+"""
#PBS -l walltime=%s
echo "export PIPELETD_HOST=$HOSTNAME" > ~/host_info.sh
pipeletd -n -l %d -p %d -a $HOSTNAME << 'EOF' &
%s
EOF
sleep 5
mpirun -np %d python $PIPELETPATH/launchers.py -p %d -s %s -l %s -H $HOSTNAME
"""
import cPickle as pickle
pipedesc = pickle.dumps(pipe)
jobfile = get_log_file (pipe, '.job')
errfile = jobfile.replace('job', 'err')
logfile = jobfile.replace('job', 'log')
with closing(file(jobfile,'w')) as f:
f.write(server_file%(n, cpu_time, log_level, address[1], pipedesc, n, address[1], authkey, logfile))
subprocess.Popen(['qsub','-o' ,logfile, '-e', errfile,jobfile]).communicate()[0]
if server:
print 'launching the scheduler'
sched_proxy = mgr.get_scheduler()
sched_proxy.run()
def launch_nersc (pipe, n, port=50000, authkey='secret', job_name="job_", log_level=logging.WARNING, cpu_time="00:30:00", job_header="""
......
Markdown is supported
0% Try again or attach a new file.
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment