Skip to content
GitLab
Projects
Groups
Snippets
Help
Loading...
Help
Help
Support
Community forum
Keyboard shortcuts
?
Submit feedback
Contribute to GitLab
Sign in / Register
Toggle navigation
Open sidebar
pipelet
Pipelet
Commits
45a619bc
Commit
45a619bc
authored
Sep 07, 2010
by
Betoule Marc
Browse files
Correct a bug in gather
parent
329cf21f
Changes
5
Hide whitespace changes
Inline
Side-by-side
Showing
5 changed files
with
29 additions
and
11 deletions
+29
-11
README.org
README.org
+4
-2
pipelet/multiplex.py
pipelet/multiplex.py
+5
-5
pipelet/task.py
pipelet/task.py
+3
-2
pipelet/utils.py
pipelet/utils.py
+15
-0
test/seg_second_code.py
test/seg_second_code.py
+2
-2
No files found.
README.org
View file @
45a619bc
...
...
@@ -234,8 +234,10 @@ The segment code is executed in a specific environment that provides:
part of a given namespace.
- load_products(filename, lst_par): update the namespace by
unpickling requested object from the file.
- logged_subprocess(lst_args): execute a subprocess and log its output.
- logger is a standard logging.Logger object that can be used to log the processing
- logged_subprocess(lst_args): execute a subprocess and log its
output in processname.log and processname.err.
- logger is a standard logging.Logger object that can be used to
log the processing
5. Hooking support
Pipelet enables you to write reusable generic
...
...
pipelet/multiplex.py
View file @
45a619bc
...
...
@@ -74,7 +74,7 @@ def union(*args):
return
l
def
gather
(
*
args
):
""" Return the gathering of the input sets.
""" Return the gathering of the input sets.
Parameters
----------
...
...
@@ -93,7 +93,7 @@ def gather(*args):
>>> print(len(gather(['a','b'],[1,2,3],[0.1,0.2,0.3,0.4])))
1
"""
l
=
[]
for
l_
in
args
:
l
.
extend
(
l_
)
return
[
l
]
l
=
[]
for
l_
in
args
:
l
.
extend
(
l_
)
return
[
l
]
pipelet/task.py
View file @
45a619bc
...
...
@@ -14,7 +14,7 @@
## along with this program; if not, see http://www.gnu.org/licenses/gpl.html
import
threading
from
utils
import
str_date
from
utils
import
str_date
,
make_dict
from
contextlib
import
closing
import
pickle
...
...
@@ -166,8 +166,9 @@ class TaskList:
output_set
=
method
(
*
a
)
## e for each task to push
lst_task
=
[
Task
(
seg
,
dict
([(
r
[
2
],
r
[
0
])
for
r
in
e
]),
status
=
'queued'
,
parents
=
[
r
[
1
]
for
r
in
e
if
r
[
1
]
is
not
None
])
for
e
in
output_set
]
lst_task
=
[
Task
(
seg
,
make_
dict
([(
r
[
2
],
r
[
0
])
for
r
in
e
]),
status
=
'queued'
,
parents
=
[
r
[
1
]
for
r
in
e
if
r
[
1
]
is
not
None
])
for
e
in
output_set
]
for
l
in
lst_task
:
print
str
(
l
)
+
"parents: "
print
l
.
parents
return
lst_task
pipelet/utils.py
View file @
45a619bc
...
...
@@ -499,6 +499,21 @@ def rebuild_db_from_disk(pipedir, sqlfile=None):
conn
.
commit
()
def
make_dict
(
l
):
""" Convert a list into a dict keeping duplicated entries.
>>> d = make_dict(zip(['a']*10+['b'], range(10)+[1]))
{'a': [0, 1, 2, 3, 4, 5, 6, 7, 8, 9], 'b': 1}
"""
d
=
{}
for
k
,
v
in
l
:
try
:
d
[
k
].
append
(
v
)
except
KeyError
:
d
[
k
]
=
v
except
AttributeError
:
d
[
k
]
=
[
d
[
k
],
v
]
return
d
if
__name__
==
"__main__"
:
...
...
test/seg_second_code.py
View file @
45a619bc
tf
=
get_data_fn
(
'test.txt'
)
# Dude makes some kind of heavy computation
i
=
0
product
=
seg_input
[
0
]
product
=
seg_input
[
"first"
]
f
=
file
(
tf
,
'w'
)
f
.
write
(
'This segment received %s as arg'
%
product
)
f
.
close
seg_output
=
[
seg_input
[
0
]]
seg_output
=
[
seg_input
[
"first"
]]
Write
Preview
Markdown
is supported
0%
Try again
or
attach a new file
.
Attach a file
Cancel
You are about to add
0
people
to the discussion. Proceed with caution.
Finish editing this message first!
Cancel
Please
register
or
sign in
to comment