Commit ac79ba61 authored by Betoule Marc

Merge branch 'master' of gitorious.org:pipelet/pipelet

parents 8b900c58 7c86d660
......@@ -20,10 +20,8 @@ from pipeline import *
from task import *
from environment import *
__version__='2dc35f51f3dd80b94e009cf75ffc0c584a7a75bd'
__version__='daf472835097e279b933bace5a8406076ace01a0'
__version__='4206ee5d2fb7a638977cef0b4bae42029f0248ae'
......@@ -2,7 +2,11 @@ try:
import sqlite3
except ImportError:
import pysqlite2 as sqlite3
from glob import glob
import os
from contextlib import closing
import pickle
import shutil
def old_to_new (old_file, new_file):
""" Convert an old database to the new database format."""
conn1 = sqlite3.connect(old_file)
......@@ -32,3 +36,315 @@ def change_root(sql_file,old_prefix, new_prefix):
with conn1:
conn1.execute('update segments set curr_dir = replace(curr_dir, ?,?)', (old_prefix, new_prefix))
conn1.execute('update tasks set str_input = replace(str_input, ?,?)', (old_prefix, new_prefix))
def deltask(db_file, lst_task, report_only=False):
""" Delete a tasks instances.
Delete all products directories of a tasks instance.
Parameters
----------
db_file: string name of the database
lst_task: list of task ids
"""
import shutil
# remove everybody
conn = sqlite3.connect(db_file,check_same_thread=True)
taskid = lst_task[0]
with conn:
print "removing task %s from db"%taskid
str_input = conn.execute('select str_input from tasks where task_id = ?',(int(taskid),)).fetchone()[0]
lst_task.remove(taskid)
# delete from tasks_relations
if not report_only:
l = conn.execute('delete from task_relations where child_id = ? ',(int(taskid),))
# mark child tasks for deletion
children = conn.execute('select child_id from task_relations where father_id = ? ',(int(taskid),)).fetchall()
# delete from tasks
if not report_only:
l = conn.execute('delete from tasks where task_id = ?',(int(taskid),))
print 'Task %s removed from db'%taskid
else:
print 'Task %s would be removed from db'%taskid
conn.close()
try:
print "Removing directory %s"%str_input
if not report_only:
shutil.rmtree(str_input)
print "%s removed"%str_input
else:
print "%s would be removed"%str_input
except OSError:
print "Failed to remove %s"%str_input
if children:
print "Adding children of %s"%taskid
lst_task += [c[0] for c in children]
lst_task = list(set(lst_task))
lst_task.sort()
if lst_task:
deltask(db_file, lst_task, report_only=report_only)
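# Illustration only, not part of the original commit: a minimal sketch of how
# deltask could be driven.  '.sqlstatus' and the task ids are placeholder
# assumptions.
def _example_deltask(db_file='.sqlstatus'):
    # first pass in report mode to review what would be removed ...
    deltask(db_file, [12, 13], report_only=True)
    # ... then remove for real (children are queued recursively)
    # deltask(db_file, [12, 13])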
def get_lst_tag (db_file):
""" Return the list of existing tags
Tags are ';'-separated in the db.
Returns
-------
list of string
"""
conn = sqlite3.connect(db_file,check_same_thread=True)
conn.text_factory=str
lst = []
with conn:
l = conn.execute('select tag from segments').fetchall()
for s in l:
if s[0] is not None:
str_tag = s[0].split(";")
for e in str_tag:
if e:
lst.append(e)
return lst
def get_lst_seg (db_file):
""" Return the list of segment name
Returns
-------
list of string
"""
conn = sqlite3.connect(db_file,check_same_thread=True)
conn.text_factory=str
lst = []
with conn:
l = conn.execute('select distinct seg from segments').fetchall()
lst = [s[0] for s in l]
return lst
def get_lst_date (db_file):
""" Return the list of existing dates
Date strings are picked from the queued_on field corresponding to the
first task of each segment.
Returns
-------
list of string
"""
conn = sqlite3.connect(db_file,check_same_thread=True)
conn.text_factory=str
with conn:
l = conn.execute('select queued_on from tasks group by queued_on').fetchall()
return [e[0] for e in l]
def add_tag (db_file, segid, tag):
""" Add new tag to the database
Check that the tag do not exist yet for this id
Parameters
----------
segid: string. List of seg_id - separated. (- at the end of string also)
tag: string. Tags are ; separated in the db.
"""
seglst = segid.split('-')
conn = sqlite3.connect(db_file,check_same_thread=True)
conn.text_factory=str
for segid in seglst:
if segid:
with conn:
l = conn.execute('select tag,curr_dir from segments where seg_id=?',(segid,)).fetchone()
lst_tag = []
if l and l[0] is not None:
lst_tag = l[0].split(";")
lst_tag.append(tag)
lst_tag = list(set(lst_tag))
str_tag = ";".join(lst_tag)
conn.execute('update segments set tag=? where seg_id=?',(str_tag,segid))
fn = glob(os.path.join(l[1], "*.meta"))
_update_meta(fn, str_tag)
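# Illustration only, not part of the original commit: add_tag expects the
# segment ids packed in a single '-'-separated string (a trailing '-' is
# tolerated).  '.sqlstatus' and the ids below are placeholder assumptions.
def _example_add_tag(db_file='.sqlstatus'):
    add_tag(db_file, "3-7-", "v1")   # tag segments 3 and 7 with "v1"
    print(get_lst_tag(db_file))      # "v1" now appears in the tag list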
def _update_meta(fn, str_tag):
""" Update meta file with current tag value.
fn: string, meta file name
tag: string, tag string
"""
if fn:
with closing(file(fn[0], 'r')) as f:
d = pickle.load(f)
d['tag'] = str_tag
with closing(file(fn[0], 'w')) as f:
r = pickle.dump(d,f)
def del_tag (db_file, tag):
""" Delete tag from the database
Parameters
----------
tag: string.
"""
conn = sqlite3.connect(db_file,check_same_thread=True)
conn.text_factory=str
with conn:
l = conn.execute('select seg_id, tag, curr_dir from segments where tag like ?',("%"+tag+"%",)).fetchall()
for s in l:
lst_tag = s[1].split(";")
lst_tag.remove(tag)
str_tag = ";".join(lst_tag)
conn.execute('update segments set tag=? where seg_id=?',(str_tag,s[0]))
fn = glob(os.path.join(s[2], "*.meta"))
_update_meta(fn, str_tag)
def _delseg(db_file, lst_seg):
""" Delete a pipeline instance.
Delete all segments and product directories of a pipeline instance.
Parameters
----------
lst_seg: integer list
"""
## remove everybody
conn = sqlite3.connect(db_file,check_same_thread=True)
segid = lst_seg[0]
with conn:
currdir = conn.execute('select curr_dir from segments where seg_id = ?',(int(segid),)).fetchone()[0]
l = conn.execute('select seg_id from segments where curr_dir like ?',(currdir+'%',))
for e in l:
lst_seg.remove(e[0])
## delete from tasks_relations
l = conn.execute('delete from task_relations where child_id in (select task_id from tasks where seg_id in (select seg_id from segments where segments.curr_dir like ?))',(currdir+'%',))
## delete from tasks
l = conn.execute('delete from tasks where seg_id in (select seg_id from segments where segments.curr_dir like ?)',(currdir+'%',))
## delete from segments_relations
l = conn.execute('delete from segment_relations where child_id in (select seg_id from segments where segments.curr_dir like ?)',(currdir+'%',))
## delete from segments
l = conn.execute('delete from segments where curr_dir like ?',(currdir+'%',))
conn.close()
try:
shutil.rmtree(currdir)
except OSError:
pass
if lst_seg:
_delseg(db_file, lst_seg)
def _deltask(db_file, lst_task):
""" Delete a tasks instances.
Delete all products directories of a tasks instance.
Parameters
----------
lst_tag: string, list of pipe id '-' separated
"""
## remove everybody
conn = sqlite3.connect(db_file,check_same_thread=True)
taskid = lst_task[0]
with conn:
str_input = conn.execute('select str_input from tasks where task_id = ?',(int(taskid),)).fetchone()[0]
lst_task.remove(taskid)
## delete from tasks_relations
l = conn.execute('delete from task_relations where child_id = ? ',(int(taskid),))
## delete from tasks
l = conn.execute('delete from tasks where task_id = ?',(int(taskid),))
conn.close()
try:
shutil.rmtree(str_input)
except OSError:
pass
if lst_task:
_deltask(db_file, lst_task)
def _get_fathers(db_file, segid):
""" Append upstream pipeline instance paths to the current path.
Return a list which contains all upstream segment instances
id for a given segment instance. This is used to print a pipeline
tree view with all dependencies.
Parameters
----------
segid: id of the leaf segment.
Returns
-------
list of segid, for the upstream segment instances.
"""
lstid = [int(segid)]
conn = sqlite3.connect(db_file,check_same_thread=True)
with conn:
fids = conn.execute(
'select father_id from segment_relations where child_id = ?'
,(segid,)).fetchall()
conn.close()
if fids:
for l in fids:
lstid += _get_fathers(db_file,l[0])
return lstid
def _get_children(db_file, segid):
""" Return the list of pipeline instance paths which depend on seg
Return a list which contains all downstream segment instances
id for a given segment instance. This is used to delete all dependencies.
Parameters
----------
segid: id of the segment.
Returns
-------
list of segid, for the downstream segment instances.
"""
lstid = [int(segid)]
conn = sqlite3.connect(db_file,check_same_thread=True)
with conn:
fids = conn.execute(
'select child_id from segment_relations where father_id = ?'
,(segid,)).fetchall()
conn.close()
if fids:
for l in fids:
lstid += _get_children(db_file, l[0])
return lstid
def _get_children_task(db_file, taskid):
""" Return the list of tasks which depend on task.
Return a list which contains all downstream task instances
id for a given task instance. This is used to delete all dependencies.
Parameters
----------
taskid: id of the task
Returns
-------
list of taskid, for the downstream task instances.
"""
lstid = [int(taskid)]
conn = sqlite3.connect(db_file,check_same_thread=True)
with conn:
fids = conn.execute(
'select child_id from task_relations where father_id = ?'
,(taskid,)).fetchall()
conn.close()
if fids:
for l in fids:
lstid += _get_children_task(db_file, l[0])
return lstid
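# Illustration only, not part of the original commit: the recursive helpers
# above are typically combined so that a segment is removed together with
# everything that depends on it.  The database path and seg_id are placeholder
# assumptions.
def _example_delete_with_dependencies(db_file='.sqlstatus', seg_id=5):
    lst = _get_children(db_file, seg_id)  # seg_id plus all downstream segments
    lst = sorted(set(lst))
    _delseg(db_file, lst)                 # drop db rows and product directories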
......@@ -285,6 +285,8 @@ class Environment(EnvironmentBase):
elif isinstance(segs,str):
segs = [segs]
res=[]
if not isinstance (segs, list):
self.logger.critical ("can't convert string to list")
for x in segs:
segx = self._worker.pipe.find_seg(self._worker.task.seg, x)
if segx is None:
......@@ -338,7 +340,7 @@ class Environment(EnvironmentBase):
"""
proc = args[0]
if name is None:
name = proc
name = path.basename(proc)
of = self.get_data_fn(name+'.log')
ef = self.get_data_fn(name+'.err')
o = file(of,'w')
......@@ -581,6 +583,7 @@ class Environment(EnvironmentBase):
else:
res = self.seg_output
close_logger (self.logger)
glo.clear() ## empty segment workspace
return res
class SandBoxEnv(Environment):
......
......@@ -204,17 +204,16 @@ def launch_pbs(pipe, n, address=None, authkey='secret', job_name="job_", log_lev
echo $PYTHONPATH
""" ):
""" Launch a bunch of distant workers through a PBS batch system.
"""
init_logger ('scheduler', get_log_file (pipe, 'scheduler'), level=log_level)
s = scheduler.Scheduler(pipe)
SchedulerManager.register('get_scheduler', callable=lambda:s)
if server:
mgr = SchedulerManager(address=address, authkey=authkey)
mgr.start()
processlist = []
for i in range(n):
jobfile = get_log_file (pipe, 'job%d'%i)
errfile = jobfile.replace('job', 'err')
......@@ -227,29 +226,74 @@ echo $PYTHONPATH
f.write ("#PBS -l select=1:ncpus=1,walltime=%s\n"%cpu_time)
f.write ("python -m pipelet.launchers -H %s -p %s -s %s -l %s"%(address[0],address[1],authkey,jobfile.replace('job','worker')))
f.close()
subprocess.Popen(['qsub','-o' ,logfile, '-e', errfile,jobfile]).communicate()[0]
if server:
print 'launching the scheduler'
sched_proxy = mgr.get_scheduler()
sched_proxy.run()
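# Illustration only, not part of the original commit: a hedged sketch of how
# launch_pbs might be driven from a pipeline main script; the address, authkey
# and job parameters are placeholder assumptions.
def _example_launch_pbs(pipe):
    # submit 4 PBS jobs, each starting one worker that connects back to the
    # scheduler served on the given address
    launch_pbs(pipe, 4, address=('frontend.example.org', 50000),
               authkey='secret', job_name='demo_', cpu_time='1:00:00')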
def launch_nersc (pipe, n, port=50000, authkey='secret', job_name="job_", log_level=logging.WARNING, cpu_time="00:30:00"):
""" Launch a bunch of distant workers through a PBS batch system using aprun.
# def launch_pbs(pipe, n, address=None, authkey='secret', job_name="job_", log_level=logging.WARNING, cpu_time="2:00:00", job_header="""
# #/bin/bash
# echo $PYTHONPATH
# """ ):
# """ Launch a bunch of distant workers through a PBS batch system using mpirun.
# """
# server_file=job_header+"""
# #PBS -l walltime=%s
# echo "export PIPELETD_HOST=$HOSTNAME" > ~/host_info.sh
# pipeletd -n -l %d -p %d -a $HOSTNAME << 'EOF' &
# %s
# EOF
# sleep 5
# mpirun -np %d python $PIPELETPATH/launchers.py -p %d -s %s -l %s -H $HOSTNAME
# """
# import cPickle as pickle
# pipedesc = pickle.dumps(pipe)
# jobfile = get_log_file (pipe, '.job')
# errfile = jobfile.replace('job', 'err')
# logfile = jobfile.replace('job', 'log')
# with closing(file(jobfile,'w')) as f:
# f.write(server_file%(n, cpu_time, log_level, address[1], pipedesc, n, address[1], authkey, logfile))
# subprocess.Popen(['qsub','-o' ,logfile, '-e', errfile,jobfile]).communicate()[0]
def launch_nersc (pipe, n, port=50000, authkey='secret', job_name="job_", log_level=logging.WARNING, cpu_time="00:30:00", job_header="""
#PBS -q regular
#PBS -l nodes=%d:ppn=1
"""):
""" Launch a bunch of distant workers through a PBS batch system using aprun.
"""
server_file="""
#PBS -l mppwidth=%d
# server_file="""
##PBS -q regular
##PBS -l mppwidth=%d
##PBS -l walltime=%s
#export CRAY_ROOTFS=DSL
#echo "export PIPELETD_HOST=$HOSTNAME" > ~/host_info.sh
#pipeletd -n -l %d -p %d -a $HOSTNAME << 'EOF' &
#%s
#EOF
#sleep 5
#aprun -np %d python $PIPELETPATH/launchers.py -p %d -s %s -l %s -H $HOST
#"""
server_file=job_header+"""
#PBS -l walltime=%s
export CRAY_ROOTFS=DSL
echo "export PIPELETD_HOST=$HOST" > ~/host_info.sh
pipeletd -n -l %d -p %d -a $HOST << 'EOF' &
export CRAY_ROOTFS=DSL
echo "export PIPELETD_HOST=$HOSTNAME" > ~/host_info.sh
pipeletd -n -l %d -p %d -a $HOSTNAME << 'EOF' &
%s
EOF
sleep 5
aprun -n %d python $PIPELETPATH/launchers.py -p %d -s %s -l %s -H $HOST
mpirun -np %d python $PIPELETPATH/launchers.py -p %d -s %s -l %s -H $HOSTNAME
"""
import cPickle as pickle
......@@ -263,8 +307,6 @@ aprun -n %d python $PIPELETPATH/launchers.py -p %d -s %s -l %s -H $HOST
subprocess.Popen(['qsub','-o' ,logfile, '-e', errfile,jobfile]).communicate()[0]
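# Illustration only, not part of the original commit: a hedged sketch of how
# launch_nersc might be invoked; the worker count, port and walltime are
# placeholder assumptions.
def _example_launch_nersc(pipe):
    # one PBS job running n workers under mpirun/aprun, all connecting to the
    # pipeletd scheduler started at the top of the job script
    launch_nersc(pipe, 8, port=50000, authkey='secret',
                 job_name='demo_', cpu_time='00:30:00')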
_job_file = """
#!/bin/zsh
export PATH=/afs/in2p3.fr/home/b/betoule/software/photcalib/tools:$PATH
......@@ -496,6 +538,7 @@ echo "worker returned at $(date)"
if __name__ == "__main__":
import optparse
import sys
parser = optparse.OptionParser()
parser.add_option('-H', '--host', metavar='hostname',
help='hostname or address of the scheduler server', default=socket.gethostname())
......@@ -511,6 +554,7 @@ if __name__ == "__main__":
name = options.logfile.split("_")[-1]
wl = init_logger (name, options.logfile)
w = worker.ProcessWorker(address=(options.host,options.port), authkey=options.secret,logger=wl)
sys.argv=[sys.argv[0]] ## do not propagate command line arguments to segment scripts
w.run()
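# Illustration only, not part of the original commit: the job files written by
# launch_pbs/launch_nersc end up invoking this __main__ block, roughly as
#   python -m pipelet.launchers -H <scheduler-host> -p 50000 -s secret -l /tmp/worker.log
# the host name, port and log path above are placeholder assumptions.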
## Copyright (C) 2008, 2009, 2010 APC LPNHE CNRS Universite Paris Diderot <lejeune@apc.univ-paris7.fr> <betoule@apc.univ-paris7.fr>
##
## This program is free software; you can redistribute it and/or modify
## it under the terms of the GNU General Public License as published by
## the Free Software Foundation; either version 3 of the License, or
## (at your option) any later version.
##
## This program is distributed in the hope that it will be useful,
## but WITHOUT ANY WARRANTY; without even the implied warranty of
## MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
## GNU General Public License for more details.
##
## You should have received a copy of the GNU General Public License
## along with this program; if not, see http://www.gnu.org/licenses/gpl.html
try:
import sqlite3
except ImportError:
import pysqlite2 as sqlite3
import cPickle as pickle
import os
import glob
import cherrypy
import auth
from pipelet.web import TreeView
from copy import copy
from operator import itemgetter, attrgetter
class PBView(TreeView):
""" A pipeline Web interface dedicated to Polarbear experiment
It extends the minimalist tree view with a search object panel.
"""
def __init__ (self,db_file, name):
TreeView.__init__(self, db_file, name)
self.js_file += ["../static/pb.js"]
self.lst_ids = self.get_ids()
self.lst_mjd,self.lst_patch,self.lst_obs = self.get_mjd()
@cherrypy.expose
@auth.pipeauth.read_access()
def index(self, highlight=None, thumbnail=None):
html = TreeView.index(self, highlight=highlight, thumbnail=thumbnail)
self.lst_ids = self.get_ids()
self.lst_mjd,self.lst_patch,self.lst_obs = self.get_mjd()
return html
def get_ids (self):
conn = sqlite3.connect(self.db_file,check_same_thread=True)
conn.text_factory=str
with conn:
l1 = conn.execute('select seg_id,seg,param from segments where seg="stimtau"').fetchall()
l2 = conn.execute('select seg_id,seg,param from segments where seg="gatherstim"').fetchall()
conn.close()
ll = []
for e in l1+l2:
if e[2] is not None:  # len(e[2])>41
ll.append (e[1]+'@'+e[2][-41:])
else:
ll.append(e[1]+'@'+str(e[0]))
if not hasattr (self, "selected_id") or self.selected_id not in ll:
self.selected_id = l1[0][0]
self.selected_seg= l1[0][1]
self.selected_idtxt= ll[0]
return ll
def get_mjd (self):
conn = sqlite3.connect(self.db_file,check_same_thread=True)
conn.text_factory=str
ll = []
with conn:
l = conn.execute('select seg_id from segments where seg like "unpack%"').fetchall()
for s in l:
ll += conn.execute('select input,str_input from tasks where seg_id=? and status="done"',(s[0],)).fetchall()
conn.close()
lst_mjd = []
lst_patch= []
lst_dir = []
for e in ll:
i = pickle.loads(e[0])
lst_mjd += [k[0] for k in i.values()]
lst_patch+= [k[1] for k in i.values()]
tmplst = os.listdir(e[1])
lst_dir += [f for f in tmplst
if os.path.isdir(os.path.join(e[1],f)) ]
lst_mjd = list(set(lst_mjd))
lst_patch= list(set(lst_patch))
lst_dir = list(set(lst_dir))
lst_mjd = sorted(lst_mjd)
lst_dir = sorted(lst_dir)
return (lst_mjd,lst_patch, lst_dir)
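# Illustration only, inferred from the unpacking above and not from the
# original code: each pickled 'input' blob is assumed to be a dict whose
# values are (mjd, patch, ...) tuples, e.g.
#   pickle.dumps({'obs_1': (55562, 'patchA'), 'obs_2': (55563, 'patchB')})
# so k[0] collects the mjd values and k[1] the patch names.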
@cherrypy.expose
@auth.pipeauth.read_access()
def get_folder (self, mjd):
conn = sqlite3.connect(self.db_file,check_same_thread=True)
conn.text_factory=str
segid = self.selected_id
with conn:
#l = conn.execute('select seg_id from segments where seg="stimtau"').fetchall()
#segid = l[0][0]
# build the LIKE pattern through 'XXX' placeholders so that the literal '%'
# wildcards do not clash with the %-formatting of seg_id and mjd
sqlr = 'select input,str_input from tasks where seg_id=%s and status="done" and input like "XXX%sXXX"'%(segid,mjd)
sqlr = sqlr.replace("XXX", "%")
l = conn.execute(sqlr).fetchall()
conn.close()
ll = ["stim"+str(pickle.loads(e[0]).values()[0][0]) for e in l]
self.lst_obs = ll
self.selected_mjd = mjd
raise cherrypy.HTTPRedirect('/'+self.name+'/',303)
@cherrypy.expose
@auth.pipeauth.read_access()
def set_id (self, idtxt):
conn = sqlite3.connect(self.db_file,check_same_thread=True)
conn.text_factory=str