Commit 2970e3a2 authored by Betoule Marc's avatar Betoule Marc
Browse files

Merge branch 'master' of lpnp204.in2p3.fr:/home/betoule/soft/pipelet

parents 36af46c4 6d62b4f3
......@@ -20,3 +20,10 @@ from pipeline import *
from task import *
from environment import *
__version__='2dc35f51f3dd80b94e009cf75ffc0c584a7a75bd'
__version__='2dc35f51f3dd80b94e009cf75ffc0c584a7a75bd'
__version__='daf472835097e279b933bace5a8406076ace01a0'
__version__='daf472835097e279b933bace5a8406076ace01a0'
......@@ -582,3 +582,36 @@ class Environment(EnvironmentBase):
res = self.seg_output
close_logger (self.logger)
return res
class SandBoxEnv(Environment):
def _save_param(self, seg, glo,param_name='*'):
pass
def _make_tag(self, seg, glo):
pass
def get_tmp_fn(self):
""" Obtain a temporary filename
Note : has to be part of the segment execution environment
The temporary file is added to the intern list for future removal.
Returns
-------
string, temporary filename.
"""
tf = os.tmpnam()
self._tmpf.append(tf)
return tf
def __init__(self, w):
""" Initialize the base environment with task input.
Parameters
----------
w: a worker instance
"""
self._worker = w
self.seg_input = w.task.task_input
## list of temporary files
self._tmpf = []
self.logger = logging.getLogger('sandbox')
......@@ -125,7 +125,7 @@ class SchedulerManager(BaseManager):
"""
pass
def launch_process(pipe, n, address=('',50000), authkey='secret',log_level=logging.WARNING ):
def launch_process(pipe, n, address=('',50000), authkey='secret',log_level=logging.WARNING, nice=0):
""" Launch a bunch of local workers in separate processes .
This is usefull (compared to launch_thread) when the GIL becomes
......@@ -147,7 +147,7 @@ def launch_process(pipe, n, address=('',50000), authkey='secret',log_level=loggi
processlist = []
for i in range(n):
wl = init_logger ('worker%d'%i, get_log_file (pipe, 'worker%d'%i), level=log_level)
w = worker.ProcessWorker(address=address, authkey=authkey, logger=wl)
w = worker.ProcessWorker(address=address, authkey=authkey, logger=wl, nice=nice)
w.start()
processlist.append(w)
......
......@@ -89,9 +89,10 @@ class Worker(object):
self.logger.addHandler(h)
def matplotlib_hook(self):
""" Turn the matplotlib backend to Agg.
""" Turn the matplotlib backend to Agg.
"""
if self.pipe.matplotlib:
self.logger.info('Turning matplotlib backend to Agg')
import matplotlib
matplotlib.use('Agg')
......@@ -299,7 +300,7 @@ class ProcessWorker(Worker, Process):
It access the scheduler through the use of managers
"""
def __init__(self, address=('', 50000), authkey="secret", **keys):
def __init__(self, address=('', 50000), authkey="secret", nice=0, **keys):
""" Initialize a process worker
"""
mgr = SchedulerManager(address=address, authkey=authkey)
......@@ -327,8 +328,28 @@ class ProcessWorker(Worker, Process):
# GE send XCPU and XFSZ for soft limit exceed
signal.signal(signal.SIGXCPU, catch_sigterm)
signal.signal(signal.SIGXFSZ, catch_sigterm)
self.nice = nice
def run(self):
os.nice(self.nice)
Worker.run(self)
class SandBox(Worker):
""" Provide a mean to come back in the execution state of a segment
for latter debugging.
"""
def __init__(self, pipeline):
self.pipe=pipeline
def setup(self, seg, global_env):
""" Setup the working environment for segment seg.
"""
self.work_dir = self.pipe.get_curr_dir(seg)
task = Task(seg)
glo, env = self.prepare_env(task)
global_env.update(glo)
class _FakeWorker(ProcessWorker):
""" Used for performance tests.
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment