Commit ec263dac authored by Marc Betoule's avatar Marc Betoule
Browse files

Improve the handling of locks

parent ecb0c504
......@@ -73,7 +73,7 @@ class Scheduler():
## lock on success
self.success_lock = threading.Lock()
## lock on worker
self.nb_worker_lock = threading.Lock()
self.nb_worker_lock = threading.RLock()
## number of worker
self.nb_worker = 0
## stoppping event
......@@ -114,6 +114,7 @@ class Scheduler():
logger.info('%d workers registered'%self.nb_worker)
if self.nb_worker == 0:
self.stop_event.set()
self.abort_scheduling(None)
def put_task(self, t):
""" Add tasks to the queue.
......@@ -275,8 +276,9 @@ class Scheduler():
"""
self.tracker.update_status(t,'queued')
self.task_queue.put_task(t)
self.task_done()
if not self.abort:
self.task_queue.put(t)
self.task_queue.task_done()
def abort_scheduling(self, t):
""" Abort the processing.
......
......@@ -121,7 +121,7 @@ class Worker(object):
logger.info("%d jobs completed" % n)
except AbortError, e:
logger.warning( "Abort after catching signal %d" % e.signal)
scheduler.requeue(task)
self.scheduler.requeue(task)
finally:
self.terminate()
......@@ -272,10 +272,25 @@ class ProcessWorker(Worker, Process):
logger.info( "connected to %s"%str(address))
Worker.__init__(self, mgr.get_scheduler(), **keys)
Process.__init__(self)
signal.signal(signal.SIGUSR1, catch_sigterm)
signal.signal(signal.SIGTERM, catch_sigterm)
signal.signal(signal.SIGABRT, catch_sigterm)
# Catch all TERM signals
signal.signal(signal.SIGHUP, catch_sigterm)
signal.signal(signal.SIGINT, catch_sigterm)
signal.signal(signal.SIGQUIT, catch_sigterm)
signal.signal(signal.SIGILL, catch_sigterm)
signal.signal(signal.SIGABRT, catch_sigterm)
signal.signal(signal.SIGFPE, catch_sigterm)
signal.signal(signal.SIGSEGV, catch_sigterm)
# What to do for SIGPIPE
signal.signal(signal.SIGALRM, catch_sigterm)
signal.signal(signal.SIGTERM, catch_sigterm)
signal.signal(signal.SIGUSR1, catch_sigterm)
signal.signal(signal.SIGUSR2, catch_sigterm)
class _FakeWorker(ProcessWorker):
......
import pipelet.scheduler as scheduler
import pipelet.worker as worker
import pipelet.pipeline as pipeline
from pipelet.launchers import launch_process, launch_interactive
import os.path as op
import logging
import sys
S = "->".join(["lock%d"% a for a in range(10)])
T = pipeline.Pipeline(S, code_dir=op.abspath('./'), prefix=op.abspath('./'))
T.push (lock0=range(5))
#W,t = launch_interactive(T)
#W.run()
launch_process(T,2, log_level=logging.DEBUG)
import time
timeout = float(seg_input.values()[0])
time.sleep(timeout)
seg_output = [int(timeout)]
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment