Commit eb9ebd38
authored Jan 21, 2011 by Maude Le Jeune
working on bug 1269
parent 33fdd1a8
Showing 4 changed files with 53 additions and 33 deletions
pipelet/scheduler.py     +19  -18
pipelet/tracker.py        +5   -1
pipelet/worker.py        +26  -11
test/multiplex/main.py    +3   -3
pipelet/scheduler.py
@@ -231,9 +231,8 @@ class Scheduler():
         failed = self.tracker.get_failed(seg)             # failed tasks
         failed_prod = [t.task_input for t in failed]      # failed products
         dstrp = [str(sorted(t.parents)) for t in d]
         logger.info('Found %d done tasks segment %s'%(len(d),seg))
         logger.info('Found %d failed tasks segment %s'%(len(failed),seg))
         ## task list to queue
@@ -243,29 +242,31 @@ class Scheduler():
         l = [Task(seg)]
         logger.info('Found %d tasks in seg %s to get done'%(len(l),seg))
         for t in l:                                   # foreach task of the task list
             # print seg
             # print t.parents
             # print t.task_input
             if (t.task_input in failed_prod):         # done but failed
                 #logger.debug("task already done and failed in seg %s"%seg)
                 continue
-            if not (t.task_input in dprod):           # not done
+            if (not (t.task_input in dprod)):         # not done wrt task_input
                 #logger.debug("pushing 1 task for seg %s"%seg)
                 self.put_task(t)
-            else:                                     # done
+            else:                                     # done (or not because parents changed)
                 #logger.debug("task already accomplished in segment %s"%seg)
-                # fetch the result of the task and store it in the task list
-                ind = dprod.index(t.task_input)
-                t = d[ind];
-                #try:
-                #logger.debug("Loading %d results from previously done task in segment %s"%(len(t.task_output),seg))
-                #except TypeError:
-                #logger.debug("No result to load from previously done task in segment %s"%(seg))
-                self.products_list.push(t)
-                self.nb_success = self.nb_success + 1
+                strp = str(sorted(t.parents))
+                try:
+                    # fetch the result of the task and store it in the task list
+                    #ind = dprod.index(t.task_input)
+                    ind = dstrp.index(strp)
+                    t = d[ind];
+                    #try:
+                    #logger.debug("Loading %d results from previously done task in segment %s"%(len(t.task_output),seg))
+                    #except TypeError:
+                    #logger.debug("No result to load from previously done task in segment %s"%(seg))
+                    self.products_list.push(t)
+                    self.nb_success = self.nb_success + 1
+                except ValueError:
+                    ## parents do not match parents of the done list
+                    self.put_task(t)
         logger.debug("nb_success starts at %d for segment %s"%(self.nb_success,seg))
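The new else branch keys the lookup of already-done tasks on the stringified, sorted parent list rather than on task_input alone; when the recorded parents no longer match, the ValueError raised by index() sends the task back to the queue. A minimal standalone sketch of that matching idea, where the Task class, match_done helper, and done list are hypothetical stand-ins rather than pipelet's own objects:

    class Task(object):
        """Hypothetical stand-in: a task is just an input and a parent list."""
        def __init__(self, task_input, parents):
            self.task_input = task_input
            self.parents = parents

    def match_done(task, done):
        """Return the already-done task whose sorted parent list matches, or None."""
        dstrp = [str(sorted(t.parents)) for t in done]   # keys of the done list
        strp = str(sorted(task.parents))                 # key of the candidate task
        try:
            return done[dstrp.index(strp)]               # same parents: reuse the result
        except ValueError:
            return None                                  # parents changed: task must be re-run

    done = [Task('x', parents=[1, 2])]
    print(match_done(Task('x', parents=[2, 1]), done) is done[0])   # True: parent order is irrelevant
    print(match_done(Task('x', parents=[3]), done))                 # None: parents differ
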
pipelet/tracker.py
@@ -299,7 +299,11 @@ class SqliteTracker(Tracker,threading.Thread):
         for t in ts:
             task_output = pickle.loads(t['output'])
             task_input = pickle.loads(t['input'])
-            l.append(task.Task(seg, status=status, id=t['task_id'], task_input=task_input, task_output=task_output))
+            tid = t['task_id']
+            with self.conn_lock:
+                p = self.conn.execute('select father_id from task_relations where child_id=?',((tid,)))
+                parents = [e[0] for e in p]
+            l.append(task.Task(seg, status=status, id=tid, task_input=task_input, task_output=task_output, parents=parents))
         return l

     def add_queued(self, t):
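The tracker change attaches each recovered task's parents by querying the task_relations table. The following self-contained sqlite3 sketch illustrates the same parent lookup; the schema and rows are invented for the example and only mirror the father_id/child_id columns used in the query above:

    import sqlite3

    conn = sqlite3.connect(':memory:')
    conn.execute('create table task_relations (father_id int, child_id int)')
    conn.executemany('insert into task_relations values (?, ?)',
                     [(1, 3), (2, 3), (3, 4)])

    tid = 3
    p = conn.execute('select father_id from task_relations where child_id=?', (tid,))
    parents = [e[0] for e in p]        # collect the father ids of task 3
    print(sorted(parents))             # [1, 2]
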
pipelet/worker.py
@@ -183,6 +183,23 @@ class Worker(object):
         self.scheduler.check_out()

+    def _make_dir(self, d):
+        """ Create a directory named d
+
+        Parameters
+        ----------
+        d: string, dir name
+        """
+        os.umask(18)
+        try:
+            os.mkdir(d)
+            return 1
+        except OSError, e:
+            if e.errno == 17:   # file exist
+                return 0
+            else:
+                raise e
+
     def make_dir(self, task):
         """ Create a directory for a given task.
@@ -194,20 +211,18 @@ class Worker(object):
         prod = task.task_input
         # make directories
-        os.umask(18)
         d = self.pipe.get_data_dir(seg)
-        try:
-            os.mkdir(d)
-        except OSError, e:
-            if e.errno != 17:   # file exist
-                raise e
+        success = self._make_dir(d)
+        if not success:
+            logger.info("cannot create directory %s - file exist"%d)
+            ## retry with some suffix
+            ## but : what happend with glob_seb and get_data_fn ?
         if not prod == None:
             d = self.pipe.get_data_dir(seg, prod)
-            try:
-                os.mkdir(d)
-            except OSError, e:
-                if e.errno != 17:   # file exist
-                    raise e
+            success = self._make_dir(d)
+            if not success:
+                logger.info("cannot create directory %s - file exist"%d)

     def prepare_env(self, task):
         """ Build the segment global execution environment for a given task.
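_make_dir folds the repeated try/os.mkdir/errno-17 pattern into one helper that returns 1 when the directory is created and 0 when it already exists (the "except OSError, e" form is Python 2 syntax, as expected for 2011 code). A rough standalone equivalent of that idempotent-mkdir idea, written for Python 3 purely as an illustration; make_dir_once and the demo path are hypothetical names, not pipelet's API:

    import errno
    import os

    def make_dir_once(d):
        """Create directory d; return True if created, False if it already existed."""
        os.umask(0o022)                    # 18 decimal == 0o022, the umask used in the commit
        try:
            os.mkdir(d)
            return True
        except OSError as e:
            if e.errno == errno.EEXIST:    # errno 17: file exists
                return False
            raise

    print(make_dir_once('pipelet_demo_dir'))   # True on first call
    print(make_dir_once('pipelet_demo_dir'))   # False afterwards
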
test/multiplex/main.py
@@ -11,8 +11,8 @@ b-> c;
"""
P
=
pipeline
.
Pipeline
(
pipedot
,
code_dir
=
'./'
,
prefix
=
'./'
)
P
.
push
(
a
=
list
(
arange
(
3
)))
P
.
push
(
b
=
[
1
,
2
,
3
])
P
.
push
(
a
=
list
(
arange
(
4
)))
P
.
push
(
b
=
[
1
,
2
,
3
,
4
,
5
,
6
,
7
,
8
])
W
,
t
=
launch_interactive
(
P
,
log_level
=
0
)
W
,
t
=
launch_interactive
(
P
,
log_level
=
logging
.
DEBUG
)
W
.
run
()
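The test now passes logging.DEBUG instead of the bare integer 0. In Python's standard logging module, 0 is NOTSET while DEBUG is 10, so the named constant makes the intended verbosity explicit; a quick check of those values (standard library only, independent of launch_interactive's actual signature):

    import logging

    print(logging.NOTSET)   # 0  -> no explicit level set
    print(logging.DEBUG)    # 10 -> emit everything from debug upwards
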