Commit 02481b0c authored by Betoule Marc's avatar Betoule Marc
Browse files

Merge branch 'worker_reshaping'

parents b5f2457f ecb0c504
......@@ -86,6 +86,10 @@ class Worker(object):
## list of temporary files
self._tmpf = []
def matplotlib_hook(self):
if self.pipe.matplotlib:
import matplotlib
matplotlib.use('Agg')
def run(self):
""" Start the worker.
......@@ -98,22 +102,15 @@ class Worker(object):
logger.warning( "check_in failed : stopping")
return
logger.info( 'checked in.')
if self.pipe.matplotlib:
import matplotlib
matplotlib.use('Agg')
self.matplotlib_hook()
n = 0
try:
n = 0
while(True):
task = self.scheduler.get_task()
for task in iter(self.scheduler.get_task, None):
if task is not None:
self.work_dir = self.pipe.get_curr_dir(task.seg)
try:
task = self.execute_task(task)
except AbortError, e:
logger.warning( "Abort after catching signal %d" % e.signal)
scheduler.requeue(task)
break
task = self.execute_task(task)
if task.status == "done":
self.scheduler.task_done(task)
else:
......@@ -124,9 +121,37 @@ class Worker(object):
logger.info("%d jobs completed" % n)
except AbortError, e:
logger.warning( "Abort after catching signal %d" % e.signal)
scheduler.requeue(task)
finally:
self.terminate()
def execute_task(self, task):
seg = task.seg
prod = task.task_input
self.make_dir(task)
glo, env = self.prepare_env(task)
code = self.pipe.repository.get_code_string(seg)
self.exec_code(code, glo, env)
self.task.task_output = env._close(glo)
return self.task
def exec_code(self, code, glo, env):
try:
exec (code, glo)
# We do not want to prevent Abortion
except AbortError, e:
raise e
# Other exception occuring in the segment are catched by default
except Exception:
etype, value, tb = traceback.sys.exc_info()
f = file(env._get_log_file(),"a")
traceback.print_exception(etype, value, tb, file=f)
f.close()
# TODO dump the glo for latter debugging
self.task.status = "failed"
else:
self.task.status = "done"
def terminate(self):
""" Disconnect from scheduler. """
self.scheduler.check_out()
......@@ -180,8 +205,6 @@ class Worker(object):
glo = env._get_namespace()
return (glo,env)
class InteractiveWorker(Worker):
""" Run segments in interactive mode.
......@@ -189,67 +212,22 @@ class InteractiveWorker(Worker):
session you may want to call self.terminate() to properly stop the
scheduler and update the database.
"""
def run(self):
""" Start the worker.
The worker execute the tasks given by its scheduler until the
queue is empty. The task status are updated after completion
or failure.
"""
logger.info( 'checking in ...')
if not self.scheduler.check_in():
logger.warning( "check_in failed : stopping")
return
logger.info( 'checked in.')
def matplotlib_hook(self):
if self.pipe.matplotlib_interactive:
import matplotlib
matplotlib.use('Agg')
try:
n = 0
while(True):
task = self.scheduler.get_task()
if task is not None:
self.work_dir = self.pipe.get_curr_dir(task.seg)
task = self.execute_task(task)
if task.status == "done":
self.scheduler.task_done(task)
else:
self.scheduler.task_failed(task)
self.task = None
n = n+1
else:
break
logger.info("%d jobs completed" % n)
except KeyboardInterrupt:
logger.warning( "Abort after catching signal" )
finally:
self.terminate()
def execute_task(self, task):
""" Execute code of segment task.
Update the task status.
The routine handles the creation of needed directories and
files.
Parameters
----------
task : task object
"""
def exec_code(self, code, glo, env):
""" Reimplement exec_code from Worker.
seg = task.seg
prod = task.task_input
self.make_dir(task)
(glo,env) = self.prepare_env(task)
self.task = task
code = self.pipe.repository.get_code_string(seg)
exec (code, glo)
self.task.task_output = env._close(glo)
task.status = "done" # set status
return task
Main difference is that exception occuring in the segment code
are not caught.
"""
exec(code, glo)
self.task.status = "done"
def terminate(self):
""" Try to properly stop the processing of the ongoing task after a
""" Try to stop the processing of the ongoing task after a
bug.
"""
self.scheduler.abort_scheduling(self.task)
......@@ -268,39 +246,6 @@ class ThreadWorker(Worker, threading.Thread):
threading.Thread.__init__(self)
Worker.__init__(self, *args, **keys)
def execute_task(self, task):
""" Execute code of segment task.
Update the task status.
The routine handles the creation of needed directories and
files.
Parameters
----------
task : task object
"""
seg = task.seg
prod = task.task_input
self.make_dir(task)
(glo,env) = self.prepare_env(task)
code = self.pipe.repository.get_code_string(seg)
try: # Execute the segment
exec (code, glo)
except Exception:
etype, value, tb = traceback.sys.exc_info()
f = file(env._get_log_file(),"a")
traceback.print_exception(etype, value, tb, file=f)
f.close()
task.status = "failed" # TODO dump the glo for latter debugging
else:
task.status = "done" # set status
if task.status == "failed":
return task
self.task.task_output = env._close(glo)
return task
from multiprocessing.managers import BaseManager
from multiprocessing import Process
......@@ -333,40 +278,6 @@ class ProcessWorker(Worker, Process):
signal.signal(signal.SIGINT, catch_sigterm)
def execute_task(self, task):
""" Execute code of segment task.
Update the task status.
The routine handles the creation of needed directories and
files.
Parameters
----------
task : task object
"""
seg = task.seg
prod = task.task_input
self.make_dir(task)
(glo,env) = self.prepare_env(task)
code = self.pipe.repository.get_code_string(seg)
try: # Execute the segment
exec (code, glo)
except AbortError, e:
raise e
except Exception:
etype, value, tb = traceback.sys.exc_info()
f = file(env._get_log_file(),"a")
traceback.print_exception(etype, value, tb, file=f)
f.close()
# TODO dump the glo for latter debugging
task.status = "failed"
else:
task.status = "done" # set status
self.task.task_output = env._close(glo)
if task.status == "failed":
return task
return task
class _FakeWorker(ProcessWorker):
""" Used for performance tests.
"""
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment