
Commit 9ba53f9e authored by Maude Le Jeune

Merge branch 'master' of gitlab.in2p3.fr:pipelet/pipelet

parents 4028134c aa5fd7fa
@@ -88,7 +88,7 @@ Getting last pipelet version
^^^^^^^^^^^^^^^^^^^^^^^^^^^^
.. code:: bash
git clone git://gitorious.org/pipelet/pipelet.git
git clone https://gitlab.in2p3.fr/pipelet/pipelet.git
Installing Pipelet
++++++++++++++++++
......
@@ -12,24 +12,23 @@
##
## You should have received a copy of the GNU General Public License
## along with this program; if not, see http://www.gnu.org/licenses/gpl.html
import os.path as path
import subprocess
import threading
from glob import glob
from datetime import datetime
import logging
from utils import str_web
from contextlib import closing
import cPickle as pickle
from pipelet.utils import init_logger, close_logger
from pipelet.repository import get_git_revision
import traceback
import signal
from contextlib import closing
from datetime import datetime
import gc
import logging
from glob import glob
import inspect
import logging
import os.path as path
import pipelet
from pipelet.repository import get_git_revision
from pipelet.utils import init_logger, close_logger, str_web
import resource
import signal
import subprocess
import threading
from time import time as timer
import traceback
class NullHandler(logging.Handler):
""" Extension of the logging handler class.
@@ -39,6 +38,25 @@ class NullHandler(logging.Handler):
"""
pass
ressources = (
'ru_stime',
'ru_utime',
'ru_maxrss',
'ru_ixrss',
'ru_idrss',
'ru_isrss',
'ru_minflt',
'ru_majflt',
'ru_nswap',
'ru_inblock',
'ru_oublock',
'ru_msgsnd',
'ru_msgrcv',
'ru_nsignals',
'ru_nvcsw',
'ru_nivcsw',
)
used_ressource_id = [0, 1, 2, 6, 7, 9, 10, 14, 15]
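For reference, a minimal sketch (not part of the commit) of how these names relate to resource.getrusage; the fields printed are the ones selected by used_ressource_id above, read by attribute rather than by positional index:

.. code:: python

    import resource

    # Hedged illustration: struct_rusage supports attribute access, so the
    # selected counters can be read by name instead of positional index.
    before = resource.getrusage(resource.RUSAGE_CHILDREN)
    # ... run a child process here ...
    after = resource.getrusage(resource.RUSAGE_CHILDREN)
    for name in ('ru_stime', 'ru_utime', 'ru_maxrss', 'ru_minflt', 'ru_majflt',
                 'ru_inblock', 'ru_oublock', 'ru_nvcsw', 'ru_nivcsw'):
        print('%s=%f' % (name, getattr(after, name) - getattr(before, name)))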
class EnvironmentBase():
@@ -282,7 +300,7 @@ class Environment(EnvironmentBase):
+glob(path.join(self._worker.pipe.get_data_dir(segx),path.join('*/',y)))
def logged_subprocess(self, args, shell=False, except_on_failure=True, name=None, of=None):
def logged_subprocess(self, args, shell=False, except_on_failure=True, name=None, of=None, stat=False, cwd=None):
""" Execute a subprocess and log its output.
Create files process_name.log and process_name.err. If shell is set to True, the command is run via a shell. If except_on_failure is set to True, an exception is raised when the command returns a non-zero exit code. If stat is True, the wall time and resource usage of the child process are logged. If cwd is given, the command is run from that directory.
@@ -301,8 +319,16 @@ class Environment(EnvironmentBase):
e = file(ef,'w')
e.write('#'+' '.join([str(a) for a in args])+'\n')
e.flush()
p=subprocess.Popen(args, stdout=o, stderr=e, shell=shell)
if stat:
res = resource.getrusage(resource.RUSAGE_CHILDREN)
wall_time = timer()
p=subprocess.Popen(args, stdout=o, stderr=e, shell=shell,cwd=cwd)
p.communicate()[0]
if stat:
wall_time = timer() - wall_time
res2 = resource.getrusage(resource.RUSAGE_CHILDREN)
resstr = 'ressources for process %s: walltime=%f, ' % (proc, wall_time)
self.logger.info(resstr + ', '.join(['%s=%f' % (ressources[k], res2[k] - res[k]) for k in used_ressource_id]))
if except_on_failure:
if p.returncode != 0:
raise subprocess.CalledProcessError(p.returncode, args[0])
......
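A hedged usage sketch of the extended signature, assuming it is called from a segment script where the environment helpers are exposed directly; the command name and working directory are placeholders:

.. code:: python

    # Hypothetical segment-script call; 'my_tool' and the paths are made up.
    logged_subprocess(['my_tool', '--input', 'data.fits'],
                      name='my_tool',       # log files: my_tool.log and my_tool.err
                      stat=True,            # log wall time and rusage deltas of the child
                      cwd='/scratch/work')  # run the command from this directory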
@@ -184,10 +184,7 @@ def launch_ssh(pipe, host_list, address=None, authkey='secret',
w.wait()
def launch_pbs(pipe, n, address=None, authkey='secret', job_name="job_", log_level=logging.WARNING, cpu_time="2:00:00", server=False, job_header="""
#/bin/bash
echo $PYTHONPATH
""" ):
def launch_pbs(pipe, n, address=None, authkey='secret', job_name="job_", log_level=logging.WARNING, cpu_time="2:00:00", server=False, job_header="",ppn=1, mem=4 ):
""" Launch a bunch of distant workers through a PBS batch system.
"""
@@ -198,17 +195,19 @@ echo $PYTHONPATH
mgr = SchedulerManager(address=address, authkey=authkey)
mgr.start()
processlist = []
job_header0="#PBS -S /bin/bash"
for i in range(n):
jobfile = get_log_file (pipe, 'job%d'%i)
errfile = jobfile.replace('job', 'err')
logfile = jobfile.replace('job', 'log')
f = file(jobfile, 'w')
f.write (job_header+"\n")
f.write (job_header0+"\n")
f.write ("#PBS -o %s\n"%logfile)
f.write ("#PBS -e %s\n"%errfile)
f.write ("#PBS -N %s%d\n"%(job_name,i))
f.write ("#PBS -l select=1:ncpus=1,walltime=%s\n"%cpu_time)
f.write ("#PBS -l nodes=1:ppn=%d,walltime=%s,mem=%dgb\n"%(ppn, cpu_time,mem))
f.write (job_header+"\n")
f.write ("python -m pipelet.launchers -H %s -p %s -s %s -l %s"%(address[0],address[1],authkey,jobfile.replace('job','worker')))
f.close()
subprocess.Popen(['qsub','-o' ,logfile, '-e', errfile,jobfile]).communicate()[0]
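A hedged example of driving the modified launcher; the pipeline object, scheduler address, and resource figures are placeholders:

.. code:: python

    import logging
    from pipelet.launchers import launch_pbs

    # Hypothetical call: 4 PBS jobs, each requesting 8 processors per node and
    # 16 GB of memory in the generated "#PBS -l" line. `pipe` is assumed to be
    # an already-built pipelet Pipeline instance.
    launch_pbs(pipe, 4,
               address=('frontend.example.org', 50000),
               authkey='secret',
               cpu_time='12:00:00',
               log_level=logging.INFO,
               ppn=8,
               mem=16)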
@@ -314,7 +313,6 @@ mpirun -np %d %s $PIPELETPATH/launchers.py -p %d -s %s -l %s -H $HOSTNAME
subprocess.Popen(['qsub','-o' ,logfile, '-e', errfile,jobfile]).communicate()[0]
_job_file = """
#/bin/zsh
export PATH=/afs/in2p3.fr/home/b/betoule/software/photcalib/tools:$PATH
......
from pipelet.environment import *
import os
class PlanckEnvironment(Environment):
""" Planck environment.
Planck is an ESA satellite aimed at providing a definitive mapping of
the cosmological temperature and polarisation anisotropies. It was
launched on May 14th, 2009. Two instruments are on board, the LFI and
the HFI.
The scientific analysis of Planck data is performed within two
environments called DPCs (Data Processing Centers). The HFI DPC is
organised as an international consortium and coordinated from the
IAP (Paris, France) with the help of the IAS (Orsay, France).
The HFI DPC infrastructure covers many things, from document and
data management to development tools.
This pipelet environment adapts the data management
component (DMC) by extending the pipelet data filename utility.
It is planned to link pipelet instances to the DPC web
browsing tools: http://pipelines.planck.fr
"""
def my_function (self):
""" My function do nothing
"""
return
def get_data_fn (self, x, seg=None, dmc=False, grp=None):
""" Complete the filename with the path to the working
directory.
If dmc is True, a DMC object name is returned, using grp as the
group name, and an empty local file pointing to the DMC object is
created.
Parameters
----------
x: string, filename suffix
seg: string, segment name (optional)
dmc: bool, if True return a DMC object name
grp: string, DMC group name
Returns
-------
string, filename
"""
## dmc object
if dmc:
x = x.split(".")[0] ## remove extension
x = x+"_"+self._worker.pipe._hashes[self._worker.task.seg] ## add key
if grp is None:
self.logger.error ("groupname is None for DMC object %s"%x)
import piolib
x = path.join(grp, x)
x = path.join(piolib.GetDataDB(), x)
locname = self._get_data_fn(x.replace("/", ":"), seg=seg)
os.system("touch %s"%locname)
return x
## local file
else:
return self._get_data_fn(x, seg=seg)
def glob_default(self, x, y):
""" Return the list of filename matching y in the working
directory of segment x.
Parameters
----------
x: string, segment name
y: string, glob pattern of files to match.
Returns
-------
list of filenames.
"""
segx = self._worker.pipe.find_seg(self._worker.task.seg, x)
if segx is None:
self.logger.warning("No parent segment matching %s found"%x)
return glob(path.join(self._worker.pipe.get_data_dir(segx),y))\
+glob(path.join(self._worker.pipe.get_data_dir(segx),path.join('*/',y)))
def glob_seg(self, x, y):
""" Return the list of filename matching y in the working
directory of segment x.
Parameters
----------
x: string, segment name
y: string, glob pattern of files to match.
Returns
-------
list of filenames.
"""
f = self.glob_default(x, y)
if not f:
self.logger.info("No file matching the extension, looking for DMC object")
z = y.split(".")[0]
z = "*:"+z
self.logger.info("New regexp is %s"%z)
f = self.glob_default(x, z)
lst = []
for fi in f:
lst.append(os.path.basename(fi).replace(":", "/"))
f = lst
return f
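A hedged sketch of how a segment script might use this environment; the segment, group, and file names are illustrative:

.. code:: python

    # Hypothetical calls made from a segment script run under PlanckEnvironment.
    # A DMC object name is returned and an empty local placeholder file is touched.
    dmc_name = get_data_fn('map.fits', dmc=True, grp='MAPS')
    # An ordinary file in the current segment working directory.
    local_fn = get_data_fn('notes.txt')
    # Glob a parent segment; falls back to DMC objects when no plain file matches.
    inputs = glob_seg('calibration', '*.fits')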
@@ -147,4 +147,9 @@ h1{
font-weight: bold;
font-family:Helvetica,Arial,sans-serif;
font-variant: small-caps
}
.dutag{
font-weight: bold;
text-shadow:2px 2px #AFEEEE;
}
\ No newline at end of file
@@ -32,6 +32,8 @@ from contextlib import closing
import pickle
import os
import sys
import subprocess
import threading
try:
from pipelet.ctools import clean_string
except ImportError:
@@ -322,7 +324,39 @@ def parse_disk(pipedir):
return lstpipe
def show_disk_usage(pipedir, sqlfile=None, hide=False):
""" Show disk usage for each pipe instance
"""
lstpipe = parse_disk(pipedir)
if not hide:
ddu = dict()
for pipe in lstpipe:
p = subprocess.Popen(["du", "-sh", pipe], stdout=subprocess.PIPE)
out, err = p.communicate()
du = out.split('\t')[0]
ddu[pipe] = du
if sqlfile is None:
sqlfile = os.path.join(pipedir, ".sqlstatus")
conn_lock = threading.Lock()
conn = sqlite3.connect(sqlfile)
conn.text_factory=str
with conn_lock:
with conn:
for pipe in lstpipe:
seg_id,param = conn.execute('select seg_id,param from segments where curr_dir = ?',(pipe,)).fetchone()
if param:
already = len(param.split('<span class="dutag">'))>1
if already:
param = param.split('<span class="dutag">')[0]
if not hide:
param += '<span class="dutag"> %s </span>'%ddu[pipe]
elif not hide:
param = param + '<span class="dutag"> %s </span>'%ddu[pipe]
conn.execute('update segments set param=? where seg_id =?',(param, seg_id))
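A minimal sketch of calling the new helper directly; the pipeline root is a made-up path, and the .sqlstatus database under that root is used when sqlfile is not given:

.. code:: python

    from pipelet import utils

    # Annotate every pipe instance under this root with its `du -sh` size,
    # stored in the param column inside a <span class="dutag"> element.
    utils.show_disk_usage('/data/pipelines')

    # Remove the annotation again.
    utils.show_disk_usage('/data/pipelines', hide=True)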
def rebuild_db_from_disk(pipedir, sqlfile=None):
""" Rebuild db from disk.
@@ -351,8 +385,9 @@ def rebuild_db_from_disk(pipedir, sqlfile=None):
seg_depend_cache = {} ## store parents for each curr_dir
for curr_dir in lst_dir:
curr_dir = path.abspath(curr_dir)
#s = curr_dir.split("_")[-2].split("/")[-1] ## TODO replace with regexp -> done
#rec = re.search (r"/([a-zA-Z0-9]+)_([a-zA-Z0-9]_*)([a-zA-Z0-9]+)$", curr_dir)
#s = curr_dir.split("/")[-1]
#kk = s.split("_")[-1]
#s = s.replace ("_"+kk, "")
gr = re.search (r"/([a-zA-Z0-9]+)(_[a-zA-Z0-9])*_([a-zA-Z0-9]+)$", curr_dir).groups()[:-1] # remove segkey
s = gr[0]
for g in gr[1:]:
......
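For reference, a hedged example of what the new regular expression extracts from a segment directory name (the path is invented):

.. code:: python

    import re

    curr_dir = '/data/pipelines/prod/first_6eb92e4d'
    gr = re.search(r"/([a-zA-Z0-9]+)(_[a-zA-Z0-9])*_([a-zA-Z0-9]+)$",
                   curr_dir).groups()[:-1]
    # gr == ('first', None): the segment name, with the trailing segkey stripped.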
@@ -17,6 +17,10 @@ To tag the last computed segment:
%prog --tag-last <sql_file> tag
To remove a tag
%prog --del-tag <sql_file> tag
To show disk usage
%prog --show-disk-usage=<root_dir>
To hide disk usage
%prog --hide-disk-usage=<root_dir>
""")
parser.add_option('-c', '--create-pipeline',
help='Create a new pipeline',)
@@ -29,7 +33,9 @@ To remove a tag
parser.add_option('-f', '--force', action='store_true', default=False,
help='Actually perform dangerous actions (such as delete-task)',)
parser.add_option('-p', '--prefix',
help='Prefix of the pipeline',)
help='Prefix of the pipeline', default=None)
parser.add_option('--sql-file',
help='sql_file of the pipeline',)
parser.add_option('-a', '--add-user',
help='Setup acl, add a new user',)
parser.add_option('-d', '--del-user',
@@ -40,7 +46,10 @@ To remove a tag
help='When creating a new user give him this level', default=1, type='int')
parser.add_option('-u', '--upgrade-config',
help='Upgrade config file for v1.1', default=False, action='store_true')
parser.add_option('--show-disk-usage',
help='Show disk usage of each pipeline instance under root', default=None, type='str')
parser.add_option('--hide-disk-usage',
help='Hide disk usage of each pipeline instance under root', default=None, type='str')
(options, args) = parser.parse_args()
@@ -92,6 +101,13 @@ To remove a tag
seg_id = db_utils.del_tag(options.del_tag, args[0])
#print 'tag segment %d as %s'%(seg_id, args[0])
#db_utils.add_tag(options.tag_last, "%d"%seg_id, args[0])
elif options.show_disk_usage:
from pipelet import utils
utils.show_disk_usage(options.show_disk_usage, sqlfile=options.sql_file)
elif options.hide_disk_usage:
from pipelet import utils
utils.show_disk_usage(options.hide_disk_usage, sqlfile=options.sql_file, hide=True)
else:
if len(args) < 0:
......