Docker-in-Docker (DinD) capabilities of public runners deactivated. More info

Commit 2eda15e0 authored by Maude Le Jeune
Browse files

Merge branch 'master' of gitorious.org:pipelet/pipelet

Conflicts:
	pipelet/__init__.py
parents 354b21c6 6d2e61aa
......@@ -10,7 +10,7 @@
__all__=['environment', 'scheduler','worker', 'repository', 'tracker', 'pipeline', 'task']
__all__=['environment', 'scheduler','worker', 'repository', 'tracker', 'pipeline', 'task', 'launchers']
from scheduler import *
from worker import *
......@@ -19,3 +19,5 @@ from tracker import *
from pipeline import *
from task import *
from environment import *
from launchers import *
......@@ -282,7 +282,7 @@ class Environment(EnvironmentBase):
+glob(path.join(self._worker.pipe.get_data_dir(segx),path.join('*/',y)))
def logged_subprocess(self, args, shell=False, except_on_failure=True, name=None):
def logged_subprocess(self, args, shell=False, except_on_failure=True, name=None, of=None):
""" Execute a subprocess and log its output.
Create files process_name.log and process_name.err. If shell is set to True, the command is run via a shell. If except_on_failure set to True, raise an exception when command execution return non zero
......@@ -294,7 +294,8 @@ class Environment(EnvironmentBase):
proc = args[0]
if name is None:
name = path.basename(proc)
of = self.get_data_fn(name+'.log')
if of is None:
of = self.get_data_fn(name+'.log')
ef = self.get_data_fn(name+'.err')
o = file(of,'w')
e = file(ef,'w')
......
......@@ -250,10 +250,10 @@ echo $PYTHONPATH
def launch_nersc (pipe, n, ppn=1, port=50000, authkey='secret', job_name="job_", log_level=logging.WARNING, cpu_time="00:30:00", job_header="""
def launch_nersc (pipe, n, ppn=1, port=50000, authkey='secret', job_name="job_", log_level=logging.WARNING, job_header="""
#PBS -q regular
#PBS -l nodes=%d:ppn=%d
"""):
""", carver=False, python_mpi=False):
""" Launch a bunch of distant workers through a PBS batch system using aprun.
"""
......@@ -262,8 +262,8 @@ def launch_nersc (pipe, n, ppn=1, port=50000, authkey='secret', job_name="job_",
##PBS -l mppwidth=%d
##PBS -l walltime=%s
#export CRAY_ROOTFS=DSL
#echo "export PIPELETD_HOST=$HOSTNAME" > ~/host_info.sh
#pipeletd -n -l %d -p %d -a $HOSTNAME << 'EOF' &
#echo "export PIPELETD_HOST=$HOST" > ~/host_info.sh
#pipeletd -n -l %d -p %d -a $HOST << 'EOF' &
#%s
#EOF
#sleep 5
......@@ -271,28 +271,47 @@ def launch_nersc (pipe, n, ppn=1, port=50000, authkey='secret', job_name="job_",
#"""
server_file=job_header+"""
#PBS -l walltime=%s
export CRAY_ROOTFS=DSL
echo "export PIPELETD_HOST=$HOST" > ~/host_info.sh
pipeletd -n -l %d -p %d -a $HOST << 'EOF' &
%s
EOF
sleep 5
aprun -n %d -N %d -S %d %s $PIPELETPATH/launchers.py -p %d -s %s -l %s -H $HOST
"""
server_file_carver=job_header+"""
export CRAY_ROOTFS=DSL
echo "export PIPELETD_HOST=$HOSTNAME" > ~/host_info.sh
pipeletd -n -l %d -p %d -a $HOSTNAME << 'EOF' &
%s
EOF
sleep 5
mpirun -np %d python $PIPELETPATH/launchers.py -p %d -s %s -l %s -H $HOSTNAME
mpirun -np %d %s $PIPELETPATH/launchers.py -p %d -s %s -l %s -H $HOSTNAME
"""
nnode=n/ppn
if (n%ppn>0):
nnode=nnode+1
pyrun = "python"
if python_mpi:
pyrun = "python-mpi"
import cPickle as pickle
pipedesc = pickle.dumps(pipe)
jobfile = get_log_file (pipe, '.job')
errfile = jobfile.replace('job', 'err')
logfile = jobfile.replace('job', 'log')
with closing(file(jobfile,'w')) as f:
f.write(server_file%(nnode, ppn, cpu_time, log_level, port, pipedesc, n, port, authkey, logfile))
if carver:
server_file = server_file_carver
with closing(file(jobfile,'w')) as f:
f.write(server_file%(nnode, ppn, log_level, port, pipedesc, n, pyrun, port, authkey, logfile))
else:
with closing(file(jobfile,'w')) as f:
f.write(server_file%(n, log_level, port, pipedesc, nnode, 24/ppn, 24/ppn/2, pyrun, port, authkey, logfile))
subprocess.Popen(['qsub','-o' ,logfile, '-e', errfile,jobfile]).communicate()[0]
......
......@@ -75,7 +75,7 @@ class Pipeline:
def __init__(self, seg_list, code_dir='./',
prefix='./', sqlfile=None,
matplotlib=False, matplotlib_interactive=False,
env=Environment, permissive=False):
env=Environment, permissive=False, abspath=False):
""" Initialize a pipeline.
seg_list : either a list of string (the list of segment for a
......@@ -110,7 +110,10 @@ class Pipeline:
## string, indicates where to save the pipeline products.
os.umask(002)
self._prefix = path.realpath(prefix)
if abspath:
self._prefix = path.abspath(prefix)
else:
self._prefix = path.realpath(prefix)
if not os.path.exists(prefix):
os.mkdir(prefix)
logdir = os.path.join(prefix, "log")
......
......@@ -531,6 +531,8 @@ Delete products directories of a given task id. The list of task id is '-' separ
else:
if os.path.splitext(filepath)[1] in ['.log','.txt','.list','.py']:
return serve_file(filepath, content_type='text/plain', disposition="inline")
elif os.path.splitext(filepath)[1] in ['.fits','.fz']:
return serve_file(filepath, content_type='application/fits', disposition="inline")
elif os.path.splitext(filepath)[1] in ['.pkl', '.meta', '.args', '.version']:
return self.serve_pickle(filepath)
else:
......
##multiplex cross_prod group_by "0"
p = glob_seg('third', 'Preambule.txt')
p = glob_seg('phird', 'Preambule.txt')
t = glob_seg('second','test.txt')
import subprocess
bla = 9
......
......@@ -3,8 +3,8 @@ from pipelet.launchers import launch_process, launch_interactive
import os
pipedot = """
phird -> fourth;
first -> second -> fourth;
third -> fourth;
"""
P = pipeline.Pipeline(pipedot, code_dir='./', prefix='./')
......
Markdown is supported
Attach a file by dragging & dropping or selecting it.
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment