Commit c4c3bea2 authored by Betoule Marc's avatar Betoule Marc
Browse files

writing a script to launch a daemon scheduler

parent 02481b0c
......@@ -232,7 +232,7 @@ def get_log_file (pipe, name):
name: log file short name
"""
d = datetime.datetime.now()
strdate = d.strftime("%y%m%d_%H%M%s%f")
strdate = d.strftime("%y-%m-%d_%H-%M-%s")
return path.join(pipe.get_log_dir(),strdate+"_"+name)
......
......@@ -25,7 +25,6 @@ import shutil
from cherrypy.lib.static import serve_file
from auth import read_access, write_access
import re
import pylab
import pickle
html_tmp = """
......@@ -98,7 +97,7 @@ class Web:
e = conn.execute('select queued_on from tasks where seg_id=?',(s[0],)).fetchone()
if e is not None and e:
lst.append(e[0])
return pylab.unique(lst)
return list(set(lst))
@cherrypy.expose
@write_access
......@@ -123,7 +122,7 @@ class Web:
if l[0] is not None and l:
lst_tag = l[0].split(";")
lst_tag.append(tag)
lst_tag = pylab.unique(lst_tag)
lst_tag = list(set(lst_tag))
str_tag = ";".join(lst_tag)
conn.execute('update segments set tag=? where seg_id=?',(str_tag,segid))
fn = glob(os.path.join(l[1], "seg_*.meta"))
......@@ -571,7 +570,7 @@ class Web:
for segid in seglist:
if segid and segid is not None:
final_list= final_list+self._get_children(int(segid))
final_list = list(pylab.unique(final_list))
final_list = list(set(final_list))
final_list.sort()
## empty the list
......
#!/usr/bin/env python
"""Run the pipelet scheduler in daemon mode.
"""
import sys, os
def main():
"""
"""
import pipelet.scheduler as scheduler
from multiprocessing.managers import BaseManager
import multiprocessing
from pipelet.utils import get_log_file
import logging
import logging.handlers
import optparse
import pickle
parser = optparse.OptionParser()
parser.add_option('-l', '--log-level', metavar='N',
help='Set the verbosity of the loggin',type='int',default=logging.INFO)
parser.add_option('-p', '--port',
help='Listen on port',type='int',default=5000)
parser.add_option('-a', '--address',
help='Listen on address',type='str',default=os.environ['HOST'])
(options, args) = parser.parse_args()
# Reading the pipe
pipefile = args[0]
try:
f = open(pipefile)
pipe = pickle.load(f)
f.close()
except IOError:
sys.exit(1)
# Setting up logging
log_file = get_log_file (pipe, 'scheduler_on_%s'%os.environ['HOST'])
logger = logging.getLogger("scheduler")
logger.setLevel(options.log_level)
formatter = logging.Formatter("%(asctime)s - %(name)s - %(levelname)s - %(message)s")
rfhandler = logging.handlers.RotatingFileHandler(
log_file, maxBytes=200000, backupCount=5)
logger.addHandler(rfhandler)
rfhandler.setFormatter(formatter)
ml = multiprocessing.get_logger()
ml.setLevel(options.log_level)
ml.addHandler(rfhandler)
try:
# Now that logging is set up decouple from parent environnement
redirect_stream(sys.stdin, None)
redirect_stream(sys.stdout, None)
redirect_stream(sys.stderr, None)
# Launching the scheduler
class SchedulerManager(BaseManager):
""" Extension of the BaseManager class.
"""
pass
s = scheduler.Scheduler(pipe)
SchedulerManager.register('get_scheduler', callable=lambda:s)
mgr = SchedulerManager(address=(options.address, options.port), authkey='secret')
server = mgr.get_server()
os.chdir(WORKDIR)
server.serve_forever()
except BaseException, e:
logger.critical(e.message)
sys.exit(1)
# Default working directory for the daemon.
WORKDIR = "/"
def redirect_stream(system_stream, target_stream):
""" Redirect a system stream to a specified file.
`system_stream` is a standard system stream such as
``sys.stdout``. `target_stream` is an open file object that
should replace the corresponding system stream object.
If `target_stream` is ``None``, defaults to opening the
operating system's null device and using its file descriptor.
"""
if target_stream is None:
target_fd = os.open(os.devnull, os.O_RDWR)
else:
target_fd = target_stream.fileno()
os.dup2(target_fd, system_stream.fileno())
if __name__ == "__main__":
# do the UNIX double-fork magic, see Stevens' "Advanced
# Programming in the UNIX Environment" for details (ISBN 0201563177)
try:
pid = os.fork()
if pid > 0:
# exit first parent
sys.exit(0)
except OSError, e:
print >>sys.stderr, "fork #1 failed: %d (%s)" % (e.errno, e.strerror)
sys.exit(1)
# decouple from parent environment
os.setsid()
# We stay with the same umask
# os.umask(0)
# do second fork
try:
pid = os.fork()
if pid > 0:
# exit from second parent, print eventual PID before
print "Daemon PID %d" % pid
sys.exit(0)
except OSError, e:
print >>sys.stderr, "fork #2 failed: %d (%s)" % (e.errno, e.strerror)
sys.exit(1)
# start the daemon main loop
main()
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment