Commit 3e6b6d77 authored by Maude Le Jeune

Merge branch 'master' of gitorious.org:pipelet/pipelet

parents 899d659d 2c9e893e
@@ -1249,6 +1249,13 @@ using git:
- Format the patch for email submission, and send it to us
=git send-email --to HEAD^=
** Tests
Pipelet is not extensively tested because of limited development
time. Still, we have a handful of useful test cases that should be run
to prevent regressions. They can be found in the test repository and
run using the command:
=test=
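For instance, assuming the test scripts are standalone Python files under a =test/= directory (the layout and file name here are only illustrative), one of them could be run as =python test/test_pipeline.py=.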
@@ -20,8 +20,3 @@ from pipeline import *
from task import *
from environment import *
__version__='4206ee5d2fb7a638977cef0b4bae42029f0248ae'
@@ -52,41 +52,43 @@ def deltask(db_file, lst_task, report_only=False):
    import shutil
    # remove everyboby
    conn = sqlite3.connect(db_file,check_same_thread=True)
-   taskid = lst_task[0]
-   with conn:
-       print "removing task %s from db"%taskid
-       str_input = conn.execute('select str_input from tasks where task_id = ?',(int(taskid),)).fetchone()[0]
-       lst_task.remove(taskid)
-       # delete from tasks_relations
-       if not report_only:
-           l = conn.execute('delete from task_relations where child_id = ? ',(int(taskid),))
-       # mark child tasks for deletion
-       children = conn.execute('select child_id from task_relations where father_id = ? ',(int(taskid),)).fetchall()
-       # delete from tasks
-       if not report_only:
-           l = conn.execute('delete from tasks where task_id = ?',(int(taskid),))
-           print 'Task %s removed from db'%taskid
-       else:
-           print 'Task %s would be removed from db'%taskid
-   #conn.close()
-   try:
-       print "Removing directory %s"%str_input
-       if not report_only:
-           shutil.rmtree(str_input)
-           print "%s removed"%str_input
-       else:
-           print "%s would be removed"%str_input
-   except OSError:
-       print "Failed to remove %s"%str_input
-   if children:
-       print "Adding children of %s"%taskid
-       lst_task += [c[0] for c in children]
-       lst_task = list(set(lst_task))
-       lst_task.sort()
-   if lst_task:
-       deltask(db_file, lst_task, report_only=report_only)
+   while lst_task:
+       taskid = lst_task[0]
+       with conn:
+           print "removing task %s from db"%taskid
+           str_input = conn.execute('select str_input from tasks where task_id = ?',(int(taskid),)).fetchone()[0]
+           lst_task.remove(taskid)
+           # delete from tasks_relations
+           if not report_only:
+               l = conn.execute('delete from task_relations where child_id = ? ',(int(taskid),))
+           # mark child tasks for deletion
+           children = conn.execute('select child_id from task_relations where father_id = ? ',(int(taskid),)).fetchall()
+           # delete from tasks
+           if not report_only:
+               l = conn.execute('delete from tasks where task_id = ?',(int(taskid),))
+               print 'Task %s removed from db'%taskid
+           else:
+               print 'Task %s would be removed from db'%taskid
+       try:
+           print "Removing directory %s"%str_input
+           if not report_only:
+               shutil.rmtree(str_input)
+               print "%s removed"%str_input
+           else:
+               print "%s would be removed"%str_input
+       except OSError:
+           print "Failed to remove %s"%str_input
+       if children:
+           print "Adding children of %s"%taskid
+           lst_task += [c[0] for c in children]
+           lst_task = list(set(lst_task))
+           lst_task.sort()
+   conn.close()
+   #if lst_task:
+   #    deltask(db_file, lst_task, report_only=report_only)
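# Usage sketch (illustrative values; the ".sqlstatus" database file name and
# the task ids are assumed here):
# >>> deltask(".sqlstatus", [12, 13], report_only=True)  # dry run, only report
# >>> deltask(".sqlstatus", [12, 13])  # delete the tasks and sweep up their children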
def get_lst_tag (db_file):
""" Return the list of existing tags
@@ -169,7 +171,19 @@ def add_tag (db_file, segid, tag):
fn = glob(os.path.join(l[1], "*.meta"))
_update_meta(fn, str_tag)
def get_last(db_file):
    """ Return the last computed segment in a given database.

    Return a seg-id.

    Parameters
    ----------
    db_file: string. Database file name.
    """
    conn = sqlite3.connect(db_file, check_same_thread=True)
    conn.text_factory=str
    l = conn.execute('select seg_id,max(ended_on) from tasks').fetchone()
    return l[0]
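# Usage sketch (illustrative database file name):
# >>> last_seg = get_last(".sqlstatus")  # seg_id of the most recently ended task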
def _update_meta(fn, str_tag):
""" Update meta file with current tag value.
@@ -192,10 +206,12 @@ def del_tag (db_file, tag):
"""
conn = sqlite3.connect(db_file,check_same_thread=True)
conn.text_factory=str
print tag
with conn:
l = conn.execute('select seg_id, tag, curr_dir from segments where tag like ?',("%"+tag+"%",)).fetchall()
l = conn.execute('select seg_id, tag, curr_dir from segments where tag == ?',(tag,)).fetchall()
for s in l:
lst_tag = s[1].split(";")
print lst_tag
lst_tag.remove(tag)
str_tag = ";".join(lst_tag)
conn.execute('update segments set tag=? where seg_id=?',(str_tag,s[0]))
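# Usage sketch (illustrative database file name and tag):
# >>> del_tag(".sqlstatus", "v1")  # drop the exact tag "v1" from every segment carrying it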
@@ -46,25 +46,25 @@ Run as a main, it can be used to launch a worker on a distant machine
import pipelet.scheduler as scheduler
import pipelet.worker as worker
import pipelet.pipeline as pipeline
from os import path
import os
from contextlib import closing
import logging
import logging.handlers
import sys
import datetime
from pipelet.utils import get_log_file, init_logger
import socket # to get the hostname
import socket # to get the hostname
from multiprocessing.managers import BaseManager
import subprocess
def launch_interactive(pipe, log_level=logging.INFO):
""" Launch a local worker in the interactive session.
This is debugger compliant, so that exceptions in the segment
execution can be tracked.
Parameters
----------
pipe: a pipeline instance
Parameters
----------
pipe: a pipeline instance
log_level:
integer, print log on stdout with log_level (values given by
logging object: logging.DEBUG=10 logging.WARNING=30,
@@ -72,31 +72,31 @@ def launch_interactive(pipe, log_level=logging.INFO):
Examples
--------
>>> T = pipeline.Pipeline(['first','second'], code_dir='./', prefix='./')
>>> w,t = launch_interactive(T)
>>> T = pipeline.Pipeline("", code_dir='./',
... prefix='./', permissive=True)
>>> w,t = launch_interactive(T, log_level=logging.ERROR)
>>> w.run()
"""
init_logger ('scheduler', get_log_file (pipe, 'scheduler'), level=log_level)
init_logger('scheduler', get_log_file(pipe, 'scheduler'), level=log_level)
s = scheduler.Scheduler(pipe)
wl = init_logger ('worker', get_log_file (pipe, 'worker'), level=log_level)
wl = init_logger('worker', get_log_file(pipe, 'worker'), level=log_level)
w = worker.InteractiveWorker(s, wl)
import threading
t = threading.Thread(target=s.run)
t.start()
return (w,t)
return (w, t)
def launch_thread(pipe, n, log_level=logging.WARNING ):
""" Launch a bunch of local workers in separate threads.
This is SMP machine compliant. Exceptions arising in the
execution of any segment are caught and the corresponding task is
marked as failed.
Parameters
----------
pipe: a pipeline instance
Parameters
----------
pipe: a pipeline instance
n : integer, the number of threads
log_level:
integer, print log on stdout with log_level (values given by
@@ -104,100 +104,108 @@ def launch_thread(pipe, n, log_level=logging.WARNING ):
logging.CRITICAL=50,etc). Set it to 0 to disable stream logging.
Examples
--------
>>> T = pipeline.Pipeline(['first','second'], code_dir='./', prefix='./')
>>> T = pipeline.Pipeline("", permissive=True)
>>> launch_thread(T, 2)
"""
init_logger ('scheduler', get_log_file (pipe, 'scheduler'), level=log_level)
init_logger('scheduler', get_log_file(pipe, 'scheduler'), level=log_level)
s = scheduler.Scheduler(pipe)
for i in range(n):
wl = init_logger ('worker%d'%i, get_log_file (pipe, 'worker%d'%i), level=log_level)
wl = init_logger('worker%d' % i, get_log_file(pipe, 'worker%d' % i),
level=log_level)
w = worker.ThreadWorker(s, wl)
w.start()
s.run()
from multiprocessing.managers import BaseManager
class SchedulerManager(BaseManager):
""" Extension of the BaseManager class.
""" Extension of the BaseManager class.
"""
pass
def launch_process(pipe, n, address=('',50000), authkey='secret',log_level=logging.WARNING, nice=0):
def launch_process(pipe, n, address=('', 50000), authkey='secret',
log_level=logging.WARNING, nice=0):
""" Launch a bunch of local workers in separate processes .
This is useful (compared to launch_thread) when the GIL becomes
limiting, which is bound to be the case when most of the time is spent
in pure Python processing.
Examples
--------
>>> T = pipeline.Pipeline(['first','second'], code_dir='./', prefix='./')
>>> T = pipeline.Pipeline("", permissive=True)
>>> launch_process(T, 2)
"""
init_logger ('scheduler', get_log_file (pipe, 'scheduler'), level=log_level)
init_logger('scheduler', get_log_file(pipe, 'scheduler'), level=log_level)
s = scheduler.Scheduler(pipe)
SchedulerManager.register('get_scheduler', callable=lambda:s)
SchedulerManager.register('get_scheduler', callable=lambda: s)
mgr = SchedulerManager(address=address, authkey=authkey)
mgr.start()
processlist = []
for i in range(n):
wl = init_logger ('worker%d'%i, get_log_file (pipe, 'worker%d'%i), level=log_level)
w = worker.ProcessWorker(address=address, authkey=authkey, logger=wl, nice=nice)
wl = init_logger('worker%d' % i, get_log_file(pipe, 'worker%d' % i),
level=log_level)
w = worker.ProcessWorker(address=address, authkey=authkey,
logger=wl, nice=nice)
w.start()
processlist.append(w)
sched_proxy = mgr.get_scheduler()
sched_proxy.run()
# joining Zombie process
for w in processlist:
w.join()
import subprocess
def _scp(file, dest):
""" Wrapper around the scp command."""
subprocess.Popen(['scp', file, dest]).communicate()[0]
def launch_ssh(pipe, host_list, address=None, authkey='secret',log_level=logging.WARNING ):
def launch_ssh(pipe, host_list, address=None, authkey='secret',
log_level=logging.ERROR):
""" Launch a bunch of distant workers through ssh.
This is used mainly for testing purposes. It can be useful to
distribute computations over a pool of machines that share access
to a NAS.
Examples
--------
>>> T = pipeline.Pipeline(['first','second'], code_dir='/home/betoule/soft/pipelet/test/', prefix='/home/betoule/homeafs/')
>>> launch_ssh(T, ['betoule@lpnp204'], address=('lpnp321.in2p3.fr',50000))
>>> T = pipeline.Pipeline("", permissive=True)
>>> launch_ssh(T, ['127.0.0.1'], address=('127.0.0.1',50000))
"""
init_logger ('scheduler', get_log_file (pipe, 'scheduler'), level=log_level)
init_logger('scheduler', get_log_file(pipe, 'scheduler'), level=log_level)
s = scheduler.Scheduler(pipe)
SchedulerManager.register('get_scheduler', callable=lambda:s)
SchedulerManager.register('get_scheduler', callable=lambda: s)
mgr = SchedulerManager(address=address, authkey=authkey)
mgr.start()
processlist = []
for i, h in enumerate(host_list):
w = subprocess.Popen(['ssh', h, "python -m pipelet.launchers -H %s -p %s -s %s -l %s"%(address[0], address[1], authkey, get_log_file (pipe, 'worker%d'%i))])
w = subprocess.Popen(
['ssh', h,
"python -m pipelet.launchers -H %s -p %s -s %s"
" -l %s" % (address[0], address[1], authkey,
get_log_file(pipe, 'worker%d' % i))])
processlist.append(w)
print 'launching the scheduler'
# 'launching the scheduler'
sched_proxy = mgr.get_scheduler()
sched_proxy.run()
# joining Zombie process
for w in processlist:
w.wait()
def launch_pbs(pipe, n, address=None, authkey='secret', job_name="job_", log_level=logging.WARNING, cpu_time="2:00:00", server=False, job_header="""
#!/bin/bash
@@ -538,27 +546,31 @@ echo "worker returned at $(date)"
'-N', name]+mc+reduce(lambda x,y : x+y, [['-l', '%s=1'%res] for res in ressource], [])+[jobfile]
subprocess.Popen(com).communicate()[0]
if __name__ == "__main__":
import optparse
import sys
parser = optparse.OptionParser()
parser.add_option('-H', '--host', metavar='hostname',
help='hostname or address of the scheduler server', default=socket.gethostname())
help='hostname or address of the scheduler server',
default=socket.gethostname())
parser.add_option('-p', '--port', metavar='port',
help='port the scheduler is listening to', default=50000, type='int')
help='port the scheduler is listening to',
default=50000, type='int')
parser.add_option('-s', '--secret', metavar='key',
help='authentication key', default='secret')
parser.add_option('-l', '--logfile', metavar='logfile',
help='worker log filename')
# parser.add_option('-L', '--log-level', metavar='level',
# type=
# help='worker log filename')
(options, args) = parser.parse_args()
print (options.host,options.port)
print options.secret
# print (options.host, options.port)
# print options.secret
name = options.logfile.split("_")[-1]
wl = init_logger (name, options.logfile)
w = worker.ProcessWorker(address=(options.host,options.port), authkey=options.secret,logger=wl)
sys.argv=[sys.argv[0]] ## do not propagate command line arguments to segment script
wl = init_logger(name, options.logfile)
w = worker.ProcessWorker(address=(options.host, options.port),
authkey=options.secret, logger=wl)
sys.argv = [sys.argv[0]]  # do not propagate command line arguments to script
w.run()
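# Usage sketch: launching a standalone worker from the shell, matching the
# options parsed above (host, port, key and log file are illustrative values):
#   python -m pipelet.launchers -H sched.example.org -p 50000 -s secret -l /tmp/worker0.log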
## Copyright (C) 2008, 2009, 2010 APC LPNHE CNRS Universite Paris Diderot <lejeune@apc.univ-paris7.fr> <betoule@apc.univ-paris7.fr>
##
##
## This program is free software; you can redistribute it and/or modify
## it under the terms of the GNU General Public License as published by
## the Free Software Foundation; either version 3 of the License, or
@@ -15,16 +15,17 @@
from __builtin__ import zip
def cross_prod(*args):
""" Return the cartesian product of the input sets.
Parameters
----------
args : a sequence of lists.
Returns
-------
a list of tuples.
a list of tuples.
Examples
--------
@@ -42,23 +43,24 @@ def cross_prod(*args):
l2 = cross_prod(*(args[1:]))
if l1 and l2:
for a in l1:
res.extend([(a,)+b for b in l2])
res.extend([(a,) + b for b in l2])
elif l1:
res.extend(zip(l1))
elif l2:
res.extend(l2)
return res
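# Worked example (illustrative values; assumes the elided start of the
# function takes l1 from its first argument, as the recursion suggests):
# >>> cross_prod([1, 2], ['a', 'b'])
# [(1, 'a'), (1, 'b'), (2, 'a'), (2, 'b')]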
def union(*args):
""" Return the union of the input sets.
Parameters
----------
args : a sequence of lists.
Returns
-------
a list of tuples.
a list of tuples.
Examples
--------
@@ -77,6 +79,7 @@ def union(*args):
l.extend(zip(l_))
return l
def _group(l, code):
""" Group entries of l matching expression code.
@@ -84,31 +87,30 @@ def _group(l, code):
----------
l: list of tuple (prod, id, parent name)
code: string, group_by directive content
Returns
-------
list of tuple (prod, list of parent id)
Examples
--------
>>> group([('a',1),('b',2)],[(1,3),(1,4),(1,5)])
[[(('a', 1), (1, 3)), (('a', 1), (1, 4)), (('a', 1), (1, 5))],
[(('b', 2), (1, 3)), (('b', 2), (1, 4)), (('b', 2), (1, 5))]]
"""
c = compile(code,'<string>','eval')
c = compile(code, '<string>', 'eval')
## l is a list of tuple (prod, id , parent name)
## apply group by directive using a dictionnary of name:prod as local
values = [eval(c, {}, dict([(e[2],e[0]) for e in t])) for t in l]
classes = set(values) # reduce to unique list of product
values = [eval(c, {}, dict([(e[2], e[0]) for e in t])) for t in l]
classes = set(values) # reduce to unique list of product
## build a dictionnary prod:list of id
d = {}
for e in classes: # for each class init dict
for e in classes: # for each class init dict
d[e] = []
for t, v in zip(l,values): ## l and values have same size. zip match input with eval result
d[v].extend([e[1] for e in t if e[1] is not None]) # add new contrib to class
## how id can be None ?
return d.items() ## return dict as a list of tuple (prod, list of id)
# l and values have same size. zip match input with eval result
for t, v in zip(l, values):
# add new contrib to class
d[v].extend([e[1] for e in t if e[1] is not None])
## how id can be None ?
return d.items() # return dict as a list of tuple (prod, list of id)
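# Worked example (illustrative data): each entry of l is a list of
# (prod, id, parent_name) tuples, and the group_by code is evaluated with the
# parent names bound to their products. Class ordering is not guaranteed.
# >>> _group([[('a', 1, 'seg')], [('a', 2, 'seg')], [('b', 3, 'seg')]], "seg")
# [('a', [1, 2]), ('b', [3])]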
def _where(code):
@@ -117,16 +119,16 @@ def _where(code):
Parameters
----------
code: string, condition expression
Returns
-------
python selection function.
python selection function.
"""
c = compile(code, '<string>','eval')
c = compile(code, '<string>', 'eval')
def condition(x):
## x is a list of tuple (prod, id, parent name)
for p in x:
vars()[p[2]]= p[0] # build a local dict {parent name:prod}
return eval(c) ## apply selection condition return True or False
vars()[p[2]] = p[0] # build a local dict {parent name:prod}
return eval(c) # apply selection condition return True or False
return condition
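# Worked example (illustrative data): keep only the combinations whose parent
# "seg" product is greater than 1.
# >>> cond = _where("seg > 1")
# >>> cond([(2, 10, 'seg')])
# True
# >>> cond([(0, 11, 'seg')])
# False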
@@ -3,29 +3,33 @@
import re
import os.path as path
import subprocess
import contextlib
## string, escape character
_escape = re.compile('_')
def escape(s):
""" Escape character.
""" Escape character.
Parameters
----------
s: string.
s: string.
"""
return _escape.subn(r'\_',s)[0]
return _escape.subn(r'\_', s)[0]
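# Example: underscores are escaped so the string can be embedded in LaTeX.
# >>> escape("my_file_name")
# 'my\\_file\\_name'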
## string, default latex preambule
# string, default latex preambule
default_preambule = r"""\documentclass[a4paper]{article}
\usepackage{geometry}
\geometry{ hmargin=2.5cm, vmargin=3cm }
\begin{document}
"""
def array2table(tab, tab_format=None, headings=[], num_format="%.2g", texname='table.tex',caption=""):
""" Convert numpy array to latex table.
def array2table(tab, tab_format=None, headings=[],
num_format="%.2g", texname='table.tex', caption=""):
""" Convert numpy array to latex table.
Parameters
----------
tab: numpy array
@@ -58,39 +62,39 @@ for l in tab:
#!\end{table}
"""
if isinstance(num_format, str):
num_format = [num_format]*tab.shape[-1]
num_format = [num_format] * tab.shape[-1]
if not tab_format:
tab_format = "*{%d}{c}"%tab.shape[-1]
tab_format = "*{%d}{c}" % tab.shape[-1]
rf = ReportFactory(namespace=locals())
rf.generate_tex(script=script, texname=texname)
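# Usage sketch (numpy assumed available; data, headings and file name are
# illustrative):
# >>> import numpy as np
# >>> array2table(np.random.rand(3, 2), headings=['x', 'y'],
# ...             num_format='%.3f', texname='table.tex', caption='demo table')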
class ReportFactory:
""" Provide latex format reports from python code.
""" Provide latex format reports from python code.
"""
_template = re.compile('@\{([^@]*?)\}')
def __init__(self,latex_preambule=default_preambule,namespace={}):
""" Initialize a report factory.
def __init__(self, latex_preambule=default_preambule, namespace={}):
""" Initialize a report factory.
Parameters
----------
latex_preambule: string, latex preambule
namespace: dict, python namespace
"""
## dict, python namespace
self.namespace= namespace
self.namespace = namespace
self.namespace['escape'] = escape
## string, latex preambule
self.latex_preambule=latex_preambule
self.latex_preambule = latex_preambule
def execute_code_block(self, block):
""" Execute python code and report standard output and error.
""" Execute python code and report standard output and error.
Parameters
----------
block: string, python code
Returns
-------
string
@@ -103,14 +107,13 @@ class ReportFactory:
2
3