Commit dc69c64a authored by Betoule Marc's avatar Betoule Marc
Browse files

Merge branch 'master' of lpnp204.in2p3.fr:/home/betoule/soft/pipelet

Conflicts:
	pipelet/web.py
parents a975fe41 20d6b32f
......@@ -32,3 +32,53 @@ def change_root(sql_file,old_prefix, new_prefix):
with conn1:
conn1.execute('update segments set curr_dir = replace(curr_dir, ?,?)', (old_prefix, new_prefix))
conn1.execute('update tasks set str_input = replace(str_input, ?,?)', (old_prefix, new_prefix))
def deltask(db_file, lst_task, report_only=False):
""" Delete a tasks instances.
Delete all products directories of a tasks instance.
Parameters
----------
db_file: string name of the database
lst_task: list of task ids
"""
import shutil
# remove everyboby
conn = sqlite3.connect(db_file,check_same_thread=True)
taskid = lst_task[0]
with conn:
print "removing task %s from db"%taskid
str_input = conn.execute('select str_input from tasks where task_id = ?',(int(taskid),)).fetchone()[0]
lst_task.remove(taskid)
# delete from tasks_relations
if not report_only:
l = conn.execute('delete from task_relations where child_id = ? ',(int(taskid),))
# mark child tasks for deletion
children = conn.execute('select child_id from task_relations where father_id = ? ',(int(taskid),)).fetchall()
# delete from tasks
if not report_only:
l = conn.execute('delete from tasks where task_id = ?',(int(taskid),))
print 'Task %s removed from db'%taskid
else:
print 'Task %s would be removed from db'%taskid
conn.close()
try:
print "Removing directory %s"%str_input
if not report_only:
shutil.rmtree(str_input)
print "%s removed"%str_input
else:
print "%s would be removed"%str_input
except OSError:
print "Failed to remove %s"%str_input
if children:
print "Adding children of %s"%taskid
lst_task += [c[0] for c in children]
lst_task = list(set(lst_task))
lst_task.sort()
if lst_task:
deltask(db_file, lst_task, report_only=report_only)
......@@ -585,3 +585,36 @@ class Environment(EnvironmentBase):
close_logger (self.logger)
glo.clear() ## empty segment workspace
return res
class SandBoxEnv(Environment):
def _save_param(self, seg, glo,param_name='*'):
pass
def _make_tag(self, seg, glo):
pass
def get_tmp_fn(self):
""" Obtain a temporary filename
Note : has to be part of the segment execution environment
The temporary file is added to the intern list for future removal.
Returns
-------
string, temporary filename.
"""
tf = os.tmpnam()
self._tmpf.append(tf)
return tf
def __init__(self, w):
""" Initialize the base environment with task input.
Parameters
----------
w: a worker instance
"""
self._worker = w
self.seg_input = w.task.task_input
## list of temporary files
self._tmpf = []
self.logger = logging.getLogger('sandbox')
......@@ -125,7 +125,7 @@ class SchedulerManager(BaseManager):
"""
pass
def launch_process(pipe, n, address=('',50000), authkey='secret',log_level=logging.WARNING ):
def launch_process(pipe, n, address=('',50000), authkey='secret',log_level=logging.WARNING, nice=0):
""" Launch a bunch of local workers in separate processes .
This is usefull (compared to launch_thread) when the GIL becomes
......@@ -147,7 +147,7 @@ def launch_process(pipe, n, address=('',50000), authkey='secret',log_level=loggi
processlist = []
for i in range(n):
wl = init_logger ('worker%d'%i, get_log_file (pipe, 'worker%d'%i), level=log_level)
w = worker.ProcessWorker(address=address, authkey=authkey, logger=wl)
w = worker.ProcessWorker(address=address, authkey=authkey, logger=wl, nice=nice)
w.start()
processlist.append(w)
......
......@@ -354,10 +354,8 @@ class Web:
e = [tcounts[i] for i in index[s[2]]]
except IndexError:
e = []
ss = '%s'%("" if str(s[3])=='None' else s[3]) + '<span class="pipetag">%s</span>'%(" " if str(s[4])=='None' else s[4])
if ss is None:
ss = ""
ss = '<a class="icon image" href="index?thumbnail=%d"></a>'%(s[2])+ss
for stat in e:
ss = '<a href="product?segid=%s&status=%s" class=%s>%d</a>, '%(s[2],stat[1], stat[1], stat[2]) + ss
......
......@@ -89,9 +89,10 @@ class Worker(object):
self.logger.addHandler(h)
def matplotlib_hook(self):
""" Turn the matplotlib backend to Agg.
""" Turn the matplotlib backend to Agg.
"""
if self.pipe.matplotlib:
self.logger.info('Turning matplotlib backend to Agg')
import matplotlib
matplotlib.use('Agg')
......@@ -296,7 +297,7 @@ class ProcessWorker(Worker, Process):
It access the scheduler through the use of managers
"""
def __init__(self, address=('', 50000), authkey="secret", **keys):
def __init__(self, address=('', 50000), authkey="secret", nice=0, **keys):
""" Initialize a process worker
"""
mgr = SchedulerManager(address=address, authkey=authkey)
......@@ -324,8 +325,28 @@ class ProcessWorker(Worker, Process):
# GE send XCPU and XFSZ for soft limit exceed
signal.signal(signal.SIGXCPU, catch_sigterm)
signal.signal(signal.SIGXFSZ, catch_sigterm)
self.nice = nice
def run(self):
os.nice(self.nice)
Worker.run(self)
class SandBox(Worker):
""" Provide a mean to come back in the execution state of a segment
for latter debugging.
"""
def __init__(self, pipeline):
self.pipe=pipeline
def setup(self, seg, global_env):
""" Setup the working environment for segment seg.
"""
self.work_dir = self.pipe.get_curr_dir(seg)
task = Task(seg)
glo, env = self.prepare_env(task)
global_env.update(glo)
class _FakeWorker(ProcessWorker):
""" Used for performance tests.
......
......@@ -2,9 +2,13 @@
def main():
import optparse
parser = optparse.OptionParser(usage="\nTo create a new pipeline:\n %prog -c <pipename> [-p <prefix>]\nTo activate acl and setup a new user:\n %prog -a <username> [-l <access_level>] <sql_file>\nTo suppress an existing user:\n %prog -d <username> <sql_file>\nTo change the data root directory : %prod -r old_dir new_dir <sql_file>")
parser = optparse.OptionParser(usage="\nTo create a new pipeline:\n %prog -c <pipename> [-p <prefix>]\nTo activate acl and setup a new user:\n %prog -a <username> [-l <access_level>] <sql_file>\nTo suppress an existing user:\n %prog -d <username> <sql_file>\nTo change the data root directory :\n %prog -r old_dir new_dir <sql_file>\nTo delete a selection of task per id:\n %prog -t <sql_file> [-f] task_id1 [task_id2 ...]")
parser.add_option('-c', '--create-pipeline',
help='Create a new pipeline',)
parser.add_option('-t', '--delete-task',
help='Delete the given tasks (and their children if any)',)
parser.add_option('-f', '--force', action='store_true', default=False,
help='Actually perform dangerous actions (such as delete-task)',)
parser.add_option('-p', '--prefix',
help='Prefix of the pipeline',)
parser.add_option('-a', '--add-user',
......@@ -20,9 +24,6 @@ def main():
(options, args) = parser.parse_args()
if len(args) < 0:
parser.print_usage()
exit(-1)
from pipelet import auth
......@@ -48,6 +49,18 @@ def main():
elif options.upgrade_config:
from pipelet import utils
utils.upgrade_webconfig()
elif options.delete_task:
from pipelet import db_utils
if not args:
parser.print_usage()
exit(-1)
db_utils.deltask(options.delete_task, args, report_only=not options.force)
else:
if len(args) < 0:
parser.print_usage()
exit(-1)
if __name__ == "__main__":
main()
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment