Commit d312afd4 authored by Betoule Marc's avatar Betoule Marc
Browse files

Merge remote branch 'origin/worker_reshaping'

parents df618d0b ec263dac
...@@ -73,7 +73,7 @@ class Scheduler(): ...@@ -73,7 +73,7 @@ class Scheduler():
## lock on success ## lock on success
self.success_lock = threading.Lock() self.success_lock = threading.Lock()
## lock on worker ## lock on worker
self.nb_worker_lock = threading.Lock() self.nb_worker_lock = threading.RLock()
## number of worker ## number of worker
self.nb_worker = 0 self.nb_worker = 0
## stoppping event ## stoppping event
...@@ -114,6 +114,7 @@ class Scheduler(): ...@@ -114,6 +114,7 @@ class Scheduler():
logger.info('%d workers registered'%self.nb_worker) logger.info('%d workers registered'%self.nb_worker)
if self.nb_worker == 0: if self.nb_worker == 0:
self.stop_event.set() self.stop_event.set()
self.abort_scheduling(None)
def put_task(self, t): def put_task(self, t):
""" Add tasks to the queue. """ Add tasks to the queue.
...@@ -275,8 +276,9 @@ class Scheduler(): ...@@ -275,8 +276,9 @@ class Scheduler():
""" """
self.tracker.update_status(t,'queued') self.tracker.update_status(t,'queued')
self.task_queue.put_task(t) if not self.abort:
self.task_done() self.task_queue.put(t)
self.task_queue.task_done()
def abort_scheduling(self, t): def abort_scheduling(self, t):
""" Abort the processing. """ Abort the processing.
......
...@@ -121,7 +121,7 @@ class Worker(object): ...@@ -121,7 +121,7 @@ class Worker(object):
logger.info("%d jobs completed" % n) logger.info("%d jobs completed" % n)
except AbortError, e: except AbortError, e:
logger.warning( "Abort after catching signal %d" % e.signal) logger.warning( "Abort after catching signal %d" % e.signal)
scheduler.requeue(task) self.scheduler.requeue(task)
finally: finally:
self.terminate() self.terminate()
...@@ -272,10 +272,25 @@ class ProcessWorker(Worker, Process): ...@@ -272,10 +272,25 @@ class ProcessWorker(Worker, Process):
logger.info( "connected to %s"%str(address)) logger.info( "connected to %s"%str(address))
Worker.__init__(self, mgr.get_scheduler(), **keys) Worker.__init__(self, mgr.get_scheduler(), **keys)
Process.__init__(self) Process.__init__(self)
signal.signal(signal.SIGUSR1, catch_sigterm) # Catch all TERM signals
signal.signal(signal.SIGTERM, catch_sigterm) signal.signal(signal.SIGHUP, catch_sigterm)
signal.signal(signal.SIGABRT, catch_sigterm)
signal.signal(signal.SIGINT, catch_sigterm) signal.signal(signal.SIGINT, catch_sigterm)
signal.signal(signal.SIGQUIT, catch_sigterm)
signal.signal(signal.SIGILL, catch_sigterm)
signal.signal(signal.SIGABRT, catch_sigterm)
signal.signal(signal.SIGFPE, catch_sigterm)
signal.signal(signal.SIGSEGV, catch_sigterm)
# What to do for SIGPIPE
signal.signal(signal.SIGALRM, catch_sigterm)
signal.signal(signal.SIGTERM, catch_sigterm)
signal.signal(signal.SIGUSR1, catch_sigterm)
signal.signal(signal.SIGUSR2, catch_sigterm)
class _FakeWorker(ProcessWorker): class _FakeWorker(ProcessWorker):
......
import pipelet.scheduler as scheduler
import pipelet.worker as worker
import pipelet.pipeline as pipeline
from pipelet.launchers import launch_process, launch_interactive
import os.path as op
import logging
import sys

# Driver script: build a linear pipeline of ten chained segments
# ("lock0->lock1->...->lock9") and run it with two worker processes.
segment_names = ["lock%d" % i for i in range(10)]
pipe_string = "->".join(segment_names)
here = op.abspath('./')
P = pipeline.Pipeline(pipe_string, code_dir=here, prefix=here)
# Seed the first segment with five input values.
P.push(lock0=range(5))
# Uncomment to run a single interactive worker instead:
#W,t = launch_interactive(P)
#W.run()
launch_process(P, 2, log_level=logging.DEBUG)
import time

# Segment body executed by the pipelet framework: seg_input and
# seg_output are injected globals.  Sleep for the duration given by
# the first upstream value, then emit it (truncated to int).
duration = float(seg_input.values()[0])
time.sleep(duration)
seg_output = [int(duration)]
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment