Commit 0669a6b6 authored by Maude Le Jeune's avatar Maude Le Jeune
Browse files

Merge branch 'newbase' of git@gitorious.org:pipelet/pipelet into newbase

parents 7c68ce82 c46bf4a0
......@@ -163,6 +163,7 @@ directive can be found the last one is retained.
- #multiplex('gather') : The input set contains one tuple of all the ouputs.
- #multiplex('group') : compute the cross_product and group the task that are identical.
*** Depend directive
......@@ -234,8 +235,10 @@ The segment code is executed in a specific environment that provides:
part of a given namespace.
- load_products(filename, lst_par): update the namespace by
unpickling requested object from the file.
- logged_subprocess(lst_args): execute a subprocess and log its output.
- logger is a standard logging.Logger object that can be used to log the processing
- logged_subprocess(lst_args): execute a subprocess and log its
output in processname.log and processname.err.
- logger is a standard logging.Logger object that can be used to
log the processing
5. Hooking support
Pipelet enables you to write reusable generic
......
......@@ -74,7 +74,7 @@ def union(*args):
return l
def gather(*args):
""" Return the gathering of the input sets.
""" Return the gathering of the input sets.
Parameters
----------
......@@ -93,7 +93,24 @@ def gather(*args):
>>> print(len(gather(['a','b'],[1,2,3],[0.1,0.2,0.3,0.4])))
1
"""
l = []
for l_ in args:
l.extend(l_)
return [l]
l = []
for l_ in args:
l.extend(l_)
return [l]
def group(*args):
""" Compute the cross product and group the similar entries.
>>> group([('a',1),('b',2)],[(1,3),(1,4),(1,5)])
[[(('a', 1), (1, 3)), (('a', 1), (1, 4)), (('a', 1), (1, 5))],
[(('b', 2), (1, 3)), (('b', 2), (1, 4)), (('b', 2), (1, 5))]]
"""
l = cross_prod(*args)
classes = set([tuple([e[0] for e in t]) for t in l])
l2 = []
for e in classes:
a = ()
for t in l:
if tuple([b[0] for b in t]) == e:
a = a + t
l2.append(a)
return l2
......@@ -14,7 +14,7 @@
## along with this program; if not, see http://www.gnu.org/licenses/gpl.html
import threading
from utils import str_date
from utils import str_date, make_dict
from contextlib import closing
import pickle
......@@ -166,8 +166,9 @@ class TaskList:
output_set = method (*a)
## e for each task to push
lst_task = [Task(seg, dict([(r[2],r[0]) for r in e]), status='queued', parents=[r[1] for r in e if r[1] is not None]) for e in output_set]
lst_task = [Task(seg, make_dict([(r[2],r[0]) for r in e]), status='queued', parents=[r[1] for r in e if r[1] is not None]) for e in output_set]
for l in lst_task:
print str(l)+"parents: "
print l.parents
return lst_task
......@@ -502,6 +502,21 @@ def rebuild_db_from_disk(pipedir, sqlfile=None):
conn.commit()
def make_dict(l):
""" Convert a list into a dict keeping duplicated entries.
>>> d = make_dict(zip(['a']*10+['b'], range(10)+[1]))
{'a': [0, 1, 2, 3, 4, 5, 6, 7, 8, 9], 'b': 1}
"""
d = {}
for k, v in l:
try:
d[k].append(v)
except KeyError:
d[k] = v
except AttributeError:
d[k] = [d[k],v]
return d
if __name__ == "__main__":
......
tf = get_data_fn('test.txt')
# Dude makes some kind of heavy computation
i = 0
product = seg_input[0]
product = seg_input["first"]
f = file(tf,'w')
f.write('This segment received %s as arg'%product)
f.close
seg_output = [seg_input[0]]
seg_output = [seg_input["first"]]
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment