Commit 478b769e authored by Maude Le Jeune

new test pipe, + remove print from engine + doc updated with multiplex directive

parent b60b79b9
......@@ -205,7 +205,16 @@ final output set of segment "melt" will be:
[('Lancelot the Brave'), ('Lancelot the Pure'), ('Galahad the Brave'), ('Galahad the Pure')].
TODO : describe input data type : dictionary, ... ?
As the segment execution order cannot be determined from the pipe
scheme, it is not possible to retrieve an ordered input tuple. To
avoid this issue, segment inputs are dictionaries, with keywords
matching the parent segment names. In the above example, one can read
the "melt" inputs using:
k = seg_input["knights"]
q = seg_input["quality"]
See section 'The segment environment' for more details.
*** Multiplex directive
......@@ -213,13 +222,37 @@ This default behavior can be altered by specifying a #multiplex
directive in a comment of the segment code. If several multiplex
directives are present in the segment code, the last one is retained.
- #multiplex : activate the default behaviour
The multiplex directive can be one of:
+ #multiplex cross_prod : default behavior, return the Cartesian
product.
+ #multiplex union : make the union of the inputs.
+ #multiplex zip : return a list of tuples, where each tuple contains
the i-th element from each of the parent output lists. The returned
list is truncated to the length of the shortest parent list.
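For illustration, here is a minimal plain-Python sketch (not the pipelet
API) of the input sets produced by the three modes for the knights/quality
example above; the union mode is modelled here as a simple concatenation of
the parent output lists:

from itertools import product
knights = ['Lancelot', 'Galahad']    # output list of segment "knights"
quality = ['the Brave', 'the Pure']  # output list of segment "quality"
# cross_prod: one "melt" task per element of the Cartesian product -> 4 tasks
cross_inputs = list(product(knights, quality))
# union: one "melt" task per element of either list -> 4 tasks, single input each
union_inputs = knights + quality
# zip: pair the i-th elements, truncated to the shortest list -> 2 tasks
zip_inputs = list(zip(knights, quality))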
Moreover, the #multiplex cross_prod directive accepts SQL-like where and
group_by clauses:
#multiplex cross_prod where "condition" group_by "instruction"
Following the previous example, one can use:
- #multiplex cross_prod group by 0 : The input set contains one tuple of all the outputs.
#multiplex cross_prod where "quality=='the Brave'"
to get 2 instances of the "melt" segment: ('Lancelot', 'the Brave'), ('Galahad', 'the Brave')
- #multiplex cross_prod group by ... : compute the cross product and group the
tasks that are identical. To make use of group, elements of the output set
have to be hashable.
#multiplex cross_prod group_by "knights"
to get 2 instances of the "melt" segment: ('Lancelot'), ('Galahad')
#multiplex cross_prod group_by "0"
to get 1 instance of the "melt" segment.
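As a rough model of the where and group_by examples above (plain Python
again, not the pipelet internals):

from itertools import product
knights = ['Lancelot', 'Galahad']
quality = ['the Brave', 'the Pure']
cross = [{'knights': k, 'quality': q} for k, q in product(knights, quality)]
# where "quality=='the Brave'": keep only the matching combinations -> 2 tasks
brave = [d for d in cross if d['quality'] == 'the Brave']
# group_by "knights": one task per distinct value of the expression -> 2 tasks
by_knight = sorted(set(d['knights'] for d in cross))
# group_by "0": the expression is constant, so all combinations collapse -> 1 task
collapsed = set(0 for d in cross)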
Note that to make use of grouping, the elements of the output set have to be hashable.
Another caution on the use of group_by: in those cases, the segment input is no
longer a dictionary.
See section 'The segment environment' for more details.
*** Orphan segments
......@@ -228,8 +261,16 @@ and therefore are executed once.
It is possible to push an input list to an orphan segment.
If P is an instance of the Pipeline object:
P.push (segname=seg_input)
P.push (segname=[1,2,3])
From the segment environment, inputs can be retrieved from the
usual dictionary, using the keyword 'segnamephantom'.
id = seg_input['segnamephantom']
or
id = seg_input.values()[0]
See section 'The segment environment' for more details.
*** Depend directive
......@@ -301,7 +342,7 @@ The storage is organized as follows:
The segment code is executed in a specific environment that provides:
1. access to the segment input and output
- seg_input: this variable is a dictionary containing the input of the segment
- seg_input: this variable is a dictionary containing the input of the segment^*
- seg_output: this variable has to be a list.
2. Functionalities to use the automated hierarchical data storage system.
......@@ -330,6 +371,13 @@ The segment code is executed in a specific environment that provides:
hook (hookname, globals()): execute Python script ‘segname_hookname.py’ and update the namespace.
^* In the general case, seg_input is a Python dictionary which
contains as many keywords as there are parent segments. In the case of an
orphan segment, the keyword used is suffixed with the word 'phantom'.
One exception comes from the use of the 'group_by' directive, which
alters the origin of the inputs. In that case, seg_input directly
contains the resulting class elements.
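For instance, a segment that only needs a single input value whatever the
multiplex mode could normalize the three cases with a small helper like the
following (a sketch only, not part of the pipelet API):

def first_input(seg_input):
    """ Return one input value from seg_input.
    In the regular and orphan cases, seg_input is a dictionary keyed by the
    parent name (or '<name>phantom'); with group_by it holds the grouping
    result itself.
    """
    if isinstance(seg_input, dict):
        return list(seg_input.values())[0]
    return seg_input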
*** The example pipelines
**** fft
......@@ -804,6 +852,10 @@ apache webserver which brings essentially two advantages:
* The pipelet actors
This section documents the code for developers.
The code documentation can be built using the doxygen configuration
file
pipelet/doc/pipelet.dox
** The Repository object
** The Pipeline object
......
......@@ -216,8 +216,6 @@ class Pipeline:
c = s.find(sp,a.start())
a = _edge.search(s,c)
segs = set(flatten(r))
print r
print segs
self._parents=dict([(seg,[]) for seg in segs])
self._children=dict([(seg,[]) for seg in segs])
for c in r:
......
......@@ -324,7 +324,7 @@ class Scheduler():
logger.info("Pushing phantom task %s"%str(t))
try:
for s in self.pipe.flatten():
print s
logger.info(s)
self.push_next_seg(s)
self.task_queue.join()
if self.abort:
......
......@@ -146,7 +146,7 @@ class TaskList:
def multiplex (self, seg, parent_list, directive):
""" Compute the result of a multiplex directive
Multiplex method is one of 'cross_prod', 'union', 'gather', 'zip'.
Multiplex method is one of 'cross_prod', 'union', 'zip'.
Parameters
----------
......@@ -176,9 +176,11 @@ class TaskList:
if directive.group_by:
output_set = multiplex._group(output_set, parent_list, directive.group_by)
lst_task = [Task(seg, e[0], status='queued', parents=list(set(e[1]))) for e in output_set]
## make the list of tasks to push; inputs are dictionaries
else:
lst_task = [Task(seg, make_dict([(r[2],r[0]) for r in e]), status='queued', parents=list(set([r[1] for r in e if r[1] is not None]))) for e in output_set]
return lst_task
......@@ -332,7 +332,7 @@ def hash_file(codefile):
3DDAAEC1
"""
if not path.exists(codefile):
print('Try to compute checksum of non existing file %s.'%codefile)
logger.warning('Try to compute checksum of non existing file %s.'%codefile)
return None
sum_binary = "cksum"
......@@ -341,7 +341,7 @@ def hash_file(codefile):
res = sp.communicate()[0]
key = "%X" % int(res.split(' ')[0])
if sp.returncode != 0:
print('Error while computing checksum of file %s.'%codefile)
logger.critical('Error while computing checksum of file %s.'%codefile)
return key
......
## segment 'a' (likely, given the push in the main script below): forward the pushed value
print seg_input
seg_output = [seg_input.values()[0]]
## segment 'b' (likely): produce two constant outputs
seg_output = ['aa', 'bb']
## segment 'c' (likely): group the Cartesian product on the condition below
#multiplex cross_prod group_by "b=='aa'"
print seg_input
""" Demonstration of """
import pipelet.pipeline as pipeline
from pipelet.launchers import launch_interactive
import os
import logging
pipedot ="""a -> c;
b-> c;
"""
P = pipeline.Pipeline(pipedot, code_dir='./', prefix='./')
P.push(a=[1,2,3])
W,t = launch_interactive(P, log_level=0)
W.run()