Skip to content
GitLab
Projects
Groups
Snippets
Help
Loading...
Help
Help
Support
Community forum
Keyboard shortcuts
?
Submit feedback
Contribute to GitLab
Sign in / Register
Toggle navigation
Open sidebar
pipelet
Pipelet
Commits
3e6d1058
Commit
3e6d1058
authored
Aug 24, 2010
by
Maude Le Jeune
Browse files
Bug #777 code strings are saved when hashkey is computed
parent
093b17ad
Changes
3
Hide whitespace changes
Inline
Side-by-side
Showing
3 changed files
with
70 additions
and
12 deletions
+70
-12
pipelet/pipeline.py
pipelet/pipeline.py
+54
-0
pipelet/scheduler.py
pipelet/scheduler.py
+7
-3
pipelet/worker.py
pipelet/worker.py
+9
-9
No files found.
pipelet/pipeline.py
View file @
3e6d1058
...
...
@@ -119,6 +119,10 @@ class Pipeline:
## dict, hash key corresponding to each segment
self
.
_hashes
=
{}
## dict, code string corresponding to each segment
self
.
_code
=
{}
self
.
_args
=
{}
self
.
compute_hash
()
## string, sql data base
...
...
@@ -353,6 +357,7 @@ class Pipeline:
self
.
_hashes
[
seg
]
=
h
currdir
=
path
.
join
(
currdir
,
'seg_%s_%s'
%
(
seg
,
h
))
self
.
_curr_dirs
[
seg
]
=
currdir
self
.
save_seg_code
(
seg
)
return
h
,
currdir
def
compute_hash
(
self
):
...
...
@@ -362,6 +367,55 @@ class Pipeline:
if
not
v
:
self
.
_compute_hash
(
k
)
def
save_seg_code
(
self
,
seg
):
""" Save code file into attribut
This is done to prevent file changes after hashkey computation.
Parameters
----------
seg: string, segment name
"""
for
k
,
v
in
dict
({
self
.
repository
.
get_code_file
(
seg
):
self
.
_code
,
self
.
repository
.
get_args_file
(
seg
):
self
.
_args
}).
items
():
file
=
k
fid
=
open
(
file
,
"r"
)
code
=
fid
.
read
()
fid
.
close
()
v
[
seg
]
=
code
def
get_seg_code
(
self
,
seg
):
""" Return segment code as a string.
This is done to prevent file changes after hashkey computation.
Parameters
----------
seg: string, segment name
Returns
-------
string
"""
return
self
.
_code
[
seg
]
def
get_seg_args
(
self
,
seg
):
""" Return segment code as a string.
This is done to prevent file changes after hashkey computation.
Parameters
----------
seg: string, segment name
Returns
-------
string
"""
return
self
.
_args
[
seg
]
def
get_full_seg_name
(
self
,
seg
):
""" Return segment full name (segment name + hashkey).
...
...
pipelet/scheduler.py
View file @
3e6d1058
...
...
@@ -273,9 +273,13 @@ class Scheduler():
if
not
path
.
exists
(
d
):
os
.
mkdir
(
d
)
r
=
self
.
pipe
.
repository
for
f
in
(
r
.
get_code_file
(
seg
),
r
.
get_args_file
(
seg
),
r
.
get_visu_file
(
seg
)):
if
f
and
path
.
exists
(
f
):
shutil
.
copy
(
f
,
d
)
p
=
self
.
pipe
for
k
,
v
in
dict
({
r
.
get_code_file
(
seg
):
p
.
get_seg_code
(
seg
),
r
.
get_args_file
(
seg
):
p
.
get_seg_args
(
seg
)}).
items
():
if
k
and
path
.
exists
(
k
):
dest
=
d
+
'/'
+
os
.
path
.
basename
(
k
)
fid
=
open
(
dest
,
"w"
)
fid
.
write
(
v
)
fid
.
close
()
parents
=
self
.
pipe
.
get_parents
(
seg
)
## parents segments
d
=
self
.
tracker
.
get_done
(
seg
)
## done tasks
...
...
pipelet/worker.py
View file @
3e6d1058
...
...
@@ -290,9 +290,9 @@ class Worker(object):
glo: dict, global dictionnary to update.
"""
code
_file
=
self
.
pipe
.
repository
.
get_args_file
(
self
.
task
.
seg
)
if
code
_file
:
exec
file
(
code_fil
e
,
glo
)
code
=
self
.
pipe
.
get_seg_args
(
self
.
task
.
seg
)
if
code
:
exec
(
cod
e
,
glo
)
else
:
print
"No arg file for seg %s"
%
self
.
task
.
seg
...
...
@@ -455,8 +455,8 @@ class InteractiveWorker(Worker):
self
.
make_dir
(
task
)
glo
=
self
.
prepare_env
(
task
)
self
.
task
=
task
code
_file
=
self
.
pipe
.
repository
.
get_code_fil
e
(
seg
)
exec
file
(
code
_file
,
glo
)
# Execute the segment
code
=
self
.
pipe
.
get_seg_cod
e
(
seg
)
exec
(
code
,
glo
)
try
:
# set product
res
=
glo
[
'res'
]
except
:
...
...
@@ -510,9 +510,9 @@ class ThreadWorker(Worker, threading.Thread):
prod
=
task
.
prod
self
.
make_dir
(
task
)
glo
=
self
.
prepare_env
(
task
)
code
_file
=
self
.
pipe
.
repository
.
get_code_fil
e
(
seg
)
code
=
self
.
pipe
.
get_seg_cod
e
(
seg
)
try
:
# Execute the segment
exec
file
(
code_fil
e
,
glo
)
exec
(
cod
e
,
glo
)
except
Exception
:
etype
,
value
,
tb
=
traceback
.
sys
.
exc_info
()
f
=
file
(
glo
[
'get_data_fn'
](
"segment_error.log"
),
"w"
)
...
...
@@ -587,9 +587,9 @@ class ProcessWorker(Worker, Process):
prod
=
task
.
prod
self
.
make_dir
(
task
)
glo
=
self
.
prepare_env
(
task
)
code
_file
=
self
.
pipe
.
repository
.
get_code_fil
e
(
seg
)
code
=
self
.
pipe
.
get_seg_cod
e
(
seg
)
try
:
# Execute the segment
exec
file
(
code_fil
e
,
glo
)
exec
(
cod
e
,
glo
)
except
AbortError
,
e
:
raise
e
except
Exception
:
...
...
Write
Preview
Markdown
is supported
0%
Try again
or
attach a new file
.
Attach a file
Cancel
You are about to add
0
people
to the discussion. Proceed with caution.
Finish editing this message first!
Cancel
Please
register
or
sign in
to comment