Commit e97ec821 authored by Betoule Marc

update doctests

parent 0ee7c10a
......@@ -1249,6 +1249,13 @@ using git:
- Format the patch for email submission, and send it to us
=git send-email --to HEAD^=
** Tests
Pipelet is not extensively tested because of limited development
time. Still, we have a handful of useful test cases that should be
run to prevent regressions. They can be found in the test repository
and run using the command:
=test=
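Individual modules also carry doctests. A minimal sketch of exercising
them with the standard library (the project's actual test entry point
may differ):

#+begin_src python
import doctest
import pipelet.launchers
doctest.testmod(pipelet.launchers)
#+end_src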
......@@ -25,3 +25,211 @@ from environment import *
__version__='4206ee5d2fb7a638977cef0b4bae42029f0248ae'
__version__='2a761817b51d950aab5ce993735cad68eb2ec245'
__version__='3055d7cfe19886fffd26d2507d175e08f9d037ba'
__version__='d36778793e30ddec71bb22a9d3935b34ae9fffaf'
__version__='0ee7c10afe6fdbca66624c03ca58a6172c7a47c5'
......@@ -46,25 +46,25 @@ Run as a main, it can be used to launch a worker on a distant machine
import pipelet.scheduler as scheduler
import pipelet.worker as worker
import pipelet.pipeline as pipeline
from os import path
import os
from contextlib import closing
import logging
import logging.handlers
import sys
import datetime
from pipelet.utils import get_log_file, init_logger
import socket  # to get the hostname
from multiprocessing.managers import BaseManager
import subprocess
def launch_interactive(pipe, log_level=logging.INFO):
""" Launch a local worker in the interactive session.
This is debugger compliant, so that exceptions in the segment
execution can be tracked.
Parameters
----------
pipe: a pipeline instance
log_level:
integer, print logs on stdout at this level (values given by
the logging module: logging.DEBUG=10, logging.WARNING=30,
......@@ -72,31 +72,31 @@ def launch_interactive(pipe, log_level=logging.INFO):
Examples
--------
>>> T = pipeline.Pipeline("", code_dir='./',
... prefix='./', permissive=True)
>>> w,t = launch_interactive(T, log_level=logging.ERROR)
>>> w.run()
"""
init_logger('scheduler', get_log_file(pipe, 'scheduler'), level=log_level)
s = scheduler.Scheduler(pipe)
wl = init_logger('worker', get_log_file(pipe, 'worker'), level=log_level)
w = worker.InteractiveWorker(s, wl)
import threading
t = threading.Thread(target=s.run)
t.start()
return (w, t)
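# A minimal usage sketch (assumes a hypothetical pipeline instance P):
# because the worker runs inside the calling session, a segment failure
# can be inspected post-mortem with the debugger:
#   >>> w, t = launch_interactive(P)
#   >>> w.run()                # an exception in a segment surfaces here
#   >>> import pdb; pdb.pm()   # inspect the failing segment frame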
def launch_thread(pipe, n, log_level=logging.WARNING):
""" Launch a bunch of local workers in separate threads.
This is SMP machine compliant. Exceptions arising during the
execution of any segment are caught and the corresponding task is
marked as failed.
Parameters
----------
pipe: a pipeline instance
n : integer, the number of threads
log_level:
integer, print logs on stdout at this level (values given by
......@@ -104,100 +104,108 @@ def launch_thread(pipe, n, log_level=logging.WARNING ):
logging.CRITICAL=50, etc.). Set it to 0 to disable stream logging.
Examples
--------
>>> T = pipeline.Pipeline("", permissive=True)
>>> launch_thread(T, 2)
"""
init_logger('scheduler', get_log_file(pipe, 'scheduler'), level=log_level)
s = scheduler.Scheduler(pipe)
for i in range(n):
wl = init_logger('worker%d' % i, get_log_file(pipe, 'worker%d' % i),
level=log_level)
w = worker.ThreadWorker(s, wl)
w.start()
s.run()
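# Note: thread workers share one interpreter, so launch_thread mostly
# pays off when segments spend their time outside the GIL (I/O, C
# extensions); for CPU-bound pure-Python segments see launch_process.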
class SchedulerManager(BaseManager):
""" Extension of the BaseManager class.
""" Extension of the BaseManager class.
"""
pass
def launch_process(pipe, n, address=('', 50000), authkey='secret',
log_level=logging.WARNING, nice=0):
""" Launch a bunch of local workers in separate processes .
This is usefull (compared to launch_thread) when the GIL becomes
limitant, which is bound to be the case when most time is spend in
pure python processings.
Examples
--------
>>> T = pipeline.Pipeline("", permissive=True)
>>> launch_process(T, 2)
"""
init_logger('scheduler', get_log_file(pipe, 'scheduler'), level=log_level)
s = scheduler.Scheduler(pipe)
SchedulerManager.register('get_scheduler', callable=lambda: s)
mgr = SchedulerManager(address=address, authkey=authkey)
mgr.start()
processlist = []
for i in range(n):
wl = init_logger('worker%d' % i, get_log_file(pipe, 'worker%d' % i),
level=log_level)
w = worker.ProcessWorker(address=address, authkey=authkey,
logger=wl, nice=nice)
w.start()
processlist.append(w)
sched_proxy = mgr.get_scheduler()
sched_proxy.run()
# join worker processes to avoid zombies
for w in processlist:
w.join()
def _scp(file, dest):
""" Wrapper around the scp command."""
subprocess.Popen(['scp', file, dest]).communicate()[0]
def launch_ssh(pipe, host_list, address=None, authkey='secret',
log_level=logging.ERROR):
""" Launch a bunch of distant workers through ssh.
This is used mainly for testing purposes. It can be useful to
distribute computations on a pool of machines that share access
to a NAS.
Examples
--------
>>> T = pipeline.Pipeline("", permissive=True)
>>> launch_ssh(T, ['127.0.0.1'], address=('127.0.0.1',50000))
"""
init_logger('scheduler', get_log_file(pipe, 'scheduler'), level=log_level)
s = scheduler.Scheduler(pipe)
SchedulerManager.register('get_scheduler', callable=lambda: s)
mgr = SchedulerManager(address=address, authkey=authkey)
mgr.start()
processlist = []
for i, h in enumerate(host_list):
w = subprocess.Popen(
['ssh', h,
"python -m pipelet.launchers -H %s -p %s -s %s"
" -l %s" % (address[0], address[1], authkey,
get_log_file(pipe, 'worker%d' % i))])
processlist.append(w)
# 'launching the scheduler'
sched_proxy = mgr.get_scheduler()
sched_proxy.run()
# join worker processes to avoid zombies
for w in processlist:
w.wait()
def launch_pbs(pipe, n, address=None, authkey='secret', job_name="job_", log_level=logging.WARNING, cpu_time="2:00:00", server=False, job_header="""
#!/bin/bash
......@@ -538,27 +546,31 @@ echo "worker returned at $(date)"
'-N', name] + mc + reduce(lambda x, y: x + y, [['-l', '%s=1' % res] for res in ressource], []) + [jobfile]
subprocess.Popen(com).communicate()[0]
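# For illustration (hypothetical values): with ressource = ['nodes',
# 'cput'] the reduce above flattens [['-l', 'nodes=1'], ['-l', 'cput=1']]
# into ['-l', 'nodes=1', '-l', 'cput=1'], i.e. one PBS resource flag
# per entry.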
if __name__ == "__main__":
import optparse
import sys
parser = optparse.OptionParser()
parser.add_option('-H', '--host', metavar='hostname',
help='hostname or address of the scheduler server',
default=socket.gethostname())
parser.add_option('-p', '--port', metavar='port',
help='port the scheduler is listening to',
default=50000, type='int')
parser.add_option('-s', '--secret', metavar='key',
help='authentication key', default='secret')
parser.add_option('-l', '--logfile', metavar='logfile',
help='worker log filename')
# parser.add_option('-L', '--log-level', metavar='level',
# type=
# help='worker log filename')
(options, args) = parser.parse_args()
# print (options.host, options.port)
# print options.secret
name = options.logfile.split("_")[-1]
wl = init_logger(name, options.logfile)
w = worker.ProcessWorker(address=(options.host, options.port),
authkey=options.secret, logger=wl)
sys.argv = [sys.argv[0]]  # do not propagate command line arguments to the script
w.run()
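# A worker can also be started by hand on a remote host; a sketch with
# placeholder host/port/key/logfile values:
#   python -m pipelet.launchers -H scheduler.example.org -p 50000 \
#          -s secret -l /tmp/worker0.log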
## Copyright (C) 2008, 2009, 2010 APC LPNHE CNRS Universite Paris Diderot <lejeune@apc.univ-paris7.fr> <betoule@apc.univ-paris7.fr>
##
## This program is free software; you can redistribute it and/or modify
## it under the terms of the GNU General Public License as published by
## the Free Software Foundation; either version 3 of the License, or
......@@ -15,16 +15,17 @@
from __builtin__ import zip
def cross_prod(*args):
""" Return the cartesian product of the input sets.
Parameters
----------
args : a sequence of lists.
Returns
-------
a list of tuples.
Examples
--------
......@@ -42,23 +43,24 @@ def cross_prod(*args):
l2 = cross_prod(*(args[1:]))
if l1 and l2:
for a in l1:
res.extend([(a,) + b for b in l2])
elif l1:
res.extend(zip(l1))
elif l2:
res.extend(l2)
return res
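# Hedged illustration of the recursion above (values invented):
#   cross_prod([1, 2], [3, 4])  ->  [(1, 3), (1, 4), (2, 3), (2, 4)]
#   cross_prod([1, 2])          ->  [(1,), (2,)]  # via the zip(l1) branch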
def union(*args):
""" Return the union of the input sets.
Parameters
----------
args : a sequence of lists.
Returns
-------
a list of tuples.
Examples
--------
......@@ -77,6 +79,7 @@ def union(*args):
l.extend(zip(l_))
return l
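# Hedged illustration (each element is wrapped in a 1-tuple by zip):
#   union([1, 2], [3])  ->  [(1,), (2,), (3,)]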
def _group(l, code):
""" Group entries of l matching expression code.
......@@ -84,31 +87,30 @@ def _group(l, code):
----------
l: list of tuples (prod, id, parent name)
code: string, group_by directive content
Returns
-------
list of tuples (prod, list of parent ids)
Examples
--------
>>> group([('a',1),('b',2)],[(1,3),(1,4),(1,5)])
[[(('a', 1), (1, 3)), (('a', 1), (1, 4)), (('a', 1), (1, 5))],
[(('b', 2), (1, 3)), (('b', 2), (1, 4)), (('b', 2), (1, 5))]]
"""
c = compile(code, '<string>', 'eval')
## l is a list of tuples (prod, id, parent name)
## apply the group_by directive using a dictionary of name:prod as locals
values = [eval(c, {}, dict([(e[2], e[0]) for e in t])) for t in l]
classes = set(values) # reduce to unique list of product
## build a dictionary prod: list of ids
d = {}
for e in classes: # for each class init dict
d[e] = []
# l and values have the same size; zip matches each input with its eval result
for t, v in zip(l, values):
# add the new contribution to its class
d[v].extend([e[1] for e in t if e[1] is not None])
## how can id be None?
return d.items() # return dict as a list of tuple (prod, list of id)
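# Hedged illustration of the group_by evaluation (names invented; the
# result order follows dict iteration):
#   l = [[('a', 1, 'seg')], [('a', 2, 'seg')], [('b', 3, 'seg')]]
#   _group(l, 'seg')  ->  [('a', [1, 2]), ('b', [3])]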
def _where(code):
......@@ -117,16 +119,16 @@ def _where(code):
Parameters
----------
code: string, condition expression
Returns
-------
python selection function.
"""
c = compile(code, '<string>', 'eval')
def condition(x):
## x is a list of tuples (prod, id, parent name)
for p in x:
vars()[p[2]] = p[0] # build a local dict {parent name:prod}
return eval(c) # apply selection condition return True or False
return condition
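# Hedged illustration (the vars() trick above relies on a CPython
# frame-locals detail; names invented):
#   cond = _where("seg == 'a'")
#   cond([('a', 1, 'seg')])  # -> True
#   cond([('b', 2, 'seg')])  # -> False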
......@@ -3,29 +3,33 @@
import re
import os.path as path
import subprocess
import contextlib
## string, escape character
_escape = re.compile('_')
def escape(s):
""" Escape character.
""" Escape character.
Parameters
----------
s: string.