Skip to content
GitLab
Projects
Groups
Snippets
Help
Loading...
Help
Help
Support
Community forum
Keyboard shortcuts
?
Submit feedback
Contribute to GitLab
Sign in / Register
Toggle navigation
Open sidebar
pipelet
Pipelet
Commits
f3165541
Commit
f3165541
authored
Aug 26, 2010
by
Maude Le Jeune
Browse files
bug from merge corrected
parent
0602dae0
Changes
3
Hide whitespace changes
Inline
Side-by-side
Showing
3 changed files
with
23 additions
and
8 deletions
+23
-8
pipelet/pipeline.py
pipelet/pipeline.py
+1
-1
pipelet/repository.py
pipelet/repository.py
+2
-2
pipelet/worker.py
pipelet/worker.py
+20
-5
No files found.
pipelet/pipeline.py
View file @
f3165541
...
...
@@ -434,7 +434,7 @@ class Pipeline:
"""
return
self
.
_curr_dirs
[
seg
]
def
get_output_f
ile
(
self
,
seg
):
def
get_output_f
n
(
self
,
seg
):
""" Return the segment output file
Parameters
...
...
pipelet/repository.py
View file @
f3165541
...
...
@@ -261,8 +261,8 @@ class LocalRepository(Repository):
['../test/seg_dbm_preproc.py']
"""
try
:
f
=
filter
(
self
.
_ext_filter
,
glob
(
path
.
join
(
self
.
src_path
,
'seg_%s_%s.*'
%
(
seg
,
hook
))))[
0
]
f
=
[
filter
(
self
.
_ext_filter
,
glob
(
path
.
join
(
self
.
src_path
,
'seg_%s_%s.*'
%
(
seg
,
hook
))))[
0
]
]
except
:
f
=
[]
if
len
(
f
)
==
0
:
...
...
pipelet/worker.py
View file @
f3165541
...
...
@@ -26,6 +26,20 @@ import traceback
import
signal
from
contextlib
import
closing
import
gc
import
logging
class
NullHandler
(
logging
.
Handler
):
""" Extension of the logging handler class.
"""
def
emit
(
self
,
record
):
""" Avoid warnings.
"""
pass
logger
=
logging
.
getLogger
(
"scheduler"
)
h
=
NullHandler
()
logger
.
addHandler
(
h
)
class
AbortError
(
Exception
):
""" Extension of the Exception class.
...
...
@@ -143,14 +157,15 @@ class Worker(object):
except
KeyError
:
print
'Fail to load object %s from file %s'
%
(
k
,
filename
)
def
write_res
(
self
,
task_output
):
def
write_res
(
self
,
seg
,
task_output
):
""" Pickle the result of the segment.
Parameters
----------
seg: string, segment name
task_output: task result.
"""
fn
=
self
.
pipe
.
get_output_fn
()
fn
=
self
.
pipe
.
get_output_fn
(
seg
)
with
closing
(
file
(
fn
,
'w'
))
as
f
:
r
=
pickle
.
dump
(
task_output
,
f
)
...
...
@@ -465,7 +480,7 @@ class InteractiveWorker(Worker):
except
:
res
=
None
task
.
task_output
=
res
self
.
write_res
(
res
)
self
.
write_res
(
seg
,
res
)
try
:
# Save params
var_key
=
glo
[
'var_key'
]
self
.
save_param
(
seg
,
glo
,
param_name
=
var_key
)
...
...
@@ -530,7 +545,7 @@ class ThreadWorker(Worker, threading.Thread):
except
:
res
=
None
task
.
task_output
=
res
self
.
write_res
(
res
)
self
.
write_res
(
seg
,
res
)
if
task
.
status
==
"failed"
:
return
task
try
:
# Save params
...
...
@@ -610,7 +625,7 @@ class ProcessWorker(Worker, Process):
except
:
res
=
None
task
.
task_output
=
res
self
.
write_res
(
res
)
self
.
write_res
(
seg
,
res
)
if
task
.
status
==
"failed"
:
return
task
try
:
# Save params
...
...
Write
Preview
Markdown
is supported
0%
Try again
or
attach a new file
.
Attach a file
Cancel
You are about to add
0
people
to the discussion. Proceed with caution.
Finish editing this message first!
Cancel
Please
register
or
sign in
to comment