Commit 0775ce1d authored by Marc Betoule's avatar Marc Betoule
Browse files

group et where written

parent 3830743f
......@@ -98,19 +98,26 @@ def gather(*args):
l.extend(l_)
return [l]
def group(*args):
def _group(l, parent_list, code):
""" Compute the cross product and group the similar entries.
>>> group([('a',1),('b',2)],[(1,3),(1,4),(1,5)])
[[(('a', 1), (1, 3)), (('a', 1), (1, 4)), (('a', 1), (1, 5))],
[(('b', 2), (1, 3)), (('b', 2), (1, 4)), (('b', 2), (1, 5))]]
"""
l = cross_prod(*args)
classes = set([tuple([e[0] for e in t]) for t in l])
l2 = []
c = compile(code,'<string>','eval')
values = [eval(c, {}, dict(zip(parent_list, [e[0] for e in t]))) for t in l]
classes = set(values)
d = {}
for e in classes:
a = ()
for t in l:
if tuple([b[0] for b in t]) == e:
a = a + t
l2.append(a)
return l2
d[e] = ()
for t, v in zip(l,values):
d[v] = d[v] + t
return d.values()
def _where(parent_list, code):
c = compile(code, '<string>','eval')
def condition(x):
for i, p in enumerate(parent_list):
vars()[p]= x[i][0]
return eval(c)
return condition
......@@ -63,13 +63,13 @@ class Repository:
"""
return bool(self._ext_re.match(path.splitext(f)[1]))
def get_directive(Direct, seg):
def get_directive(self, Direct, seg):
c = self.get_code_string(seg)
d = Direct()
for l in c.splitlines():
try:
d.parse(l)
except StopIteration():
except StopIteration:
pass
return d
......
......@@ -25,6 +25,7 @@ import shutil
import threading
import logging
from contextlib import closing
from directive import Multiplex
import pickle
class NullHandler(logging.Handler):
......@@ -224,7 +225,7 @@ class Scheduler():
logger.debug('Found %d done tasks segment %s'%(len(d),seg))
logger.debug('Found %d failed tasks segment %s'%(len(failed),seg))
## task list to queue
l = self.products_list.multiplex(seg, parents, self.pipe.get_multiplex(seg))
l = self.products_list.multiplex(seg, parents, self.pipe.repository.get_directive(Multiplex,seg))
## task with no input
if not l:
l = [Task(seg)]
......
......@@ -17,7 +17,7 @@ import threading
from utils import str_date, make_dict
from contextlib import closing
import pickle
import multiplex
class Task:
""" A segment code associated with its input/output product(s).
......@@ -141,8 +141,8 @@ class TaskList:
self._list[t.seg] = []
self._list[t.seg].append(t)
def multiplex (self, seg, parent_list, method):
""" Append segment tasks deduced from multiplex method
def multiplex (self, seg, parent_list, directive):
""" Compute the result of a multiplex directive
Multiplex method is one of 'cross_prod', 'union', 'gather', 'zip'.
......@@ -150,8 +150,10 @@ class TaskList:
----------
seg : string, segment name
parent_list: list of parent segment name
method: multiplex method
directive: an instance of directive.Multiplex
"""
method = eval("multiplex.%s"%directive.method)
a = [] ## as many as parent_list
for p in parent_list:
b = []
......@@ -165,6 +167,7 @@ class TaskList:
a.append(b)
output_set = method (*a)
output_set = filter multiplex
## e for each task to push
lst_task = [Task(seg, make_dict([(r[2],r[0]) for r in e]), status='queued', parents=[r[1] for r in e if r[1] is not None]) for e in output_set]
for l in lst_task:
......
p = glob_seg('third', 'Preambule.txt')
t = glob_seg('second','test.txt')
import subprocess
#multiplex('cross_prod')
bla = 9
f = file(get_data_fn('result.txt'),'w')
print (p,t)
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment