Skip to content
GitLab
Projects
Groups
Snippets
Help
Loading...
Help
Help
Support
Community forum
Keyboard shortcuts
?
Submit feedback
Contribute to GitLab
Sign in / Register
Toggle navigation
Docker-in-Docker (DinD) capabilities of public runners deactivated.
More info
Open sidebar
pipelet
Pipelet
Commits
a3c6e084
Commit
a3c6e084
authored
Sep 20, 2018
by
Betoule Marc
Browse files
Add the possibility to register jobs for a limited number of tasksx
parent
18617c40
Changes
3
Hide whitespace changes
Inline
Side-by-side
Showing
3 changed files
with
71 additions
and
13 deletions
+71
-13
pipelet/launchers.py
pipelet/launchers.py
+3
-2
pipelet/worker.py
pipelet/worker.py
+17
-11
test/limitted_number_of_jobs/main.py
test/limitted_number_of_jobs/main.py
+51
-0
No files found.
pipelet/launchers.py
View file @
a3c6e084
...
...
@@ -556,6 +556,8 @@ if __name__ == "__main__":
help
=
'authentication key'
,
default
=
'secret'
)
parser
.
add_option
(
'-l'
,
'--logfile'
,
metavar
=
'logfile'
,
help
=
'worker log filename'
)
parser
.
add_option
(
'-N'
,
'--maximum-number-of-tasks'
,
dest
=
'max_number_tasks'
,
default
=
None
,
type
=
int
,
help
=
'Make worker exit gracefully after a fix number of jobs'
)
# parser.add_option('-L', '--log-level', metavar='level',
# type=
# help='worker log filename')
...
...
@@ -568,5 +570,4 @@ if __name__ == "__main__":
w
=
worker
.
ProcessWorker
(
address
=
(
options
.
host
,
options
.
port
),
authkey
=
options
.
secret
,
logger
=
wl
)
sys
.
argv
=
[
sys
.
argv
[
0
]]
# do not propage command line argument to script
w
.
run
()
w
.
run
(
maximum_task_number
=
options
.
max_number_tasks
)
pipelet/worker.py
View file @
a3c6e084
...
...
@@ -96,21 +96,26 @@ class Worker(object):
# import cProfile
# cProfile.runctx('self._run()', globals(), locals(),'workprof')
def
run
(
self
):
"""
Start the worker.
def
run
(
self
,
maximum_task_number
=
None
):
"""Start the worker.
The worker executes the tasks given by its scheduler until the
queue is empty. The task status are updated after completion
or failure.
queue is empty or the maximum number of task (if specified) is
reached. The task status are updated after completion or
failure.
"""
self
.
logger
.
info
(
'checking in ...'
)
if
not
self
.
scheduler
.
check_in
():
self
.
logger
.
error
(
"check_in failed : stopping"
)
return
self
.
logger
.
info
(
'checked in.'
)
if
maximum_task_number
is
not
None
:
self
.
logger
.
info
(
'checked in for a maximum of %d tasks'
%
maximum_task_number
)
else
:
self
.
logger
.
info
(
'checked in.'
)
self
.
matplotlib_hook
()
n
=
0
try
:
for
task
in
iter
(
self
.
scheduler
.
get_task
,
None
):
...
...
@@ -124,6 +129,9 @@ class Worker(object):
self
.
logger
.
info
(
'task failed'
)
self
.
scheduler
.
task_failed
(
task
)
n
=
n
+
1
if
(
maximum_task_number
is
not
None
)
and
(
n
==
maximum_task_number
):
self
.
logger
.
info
(
'Maximum number of jobs reached (%d). Stopping.'
%
n
)
break
self
.
task
=
None
self
.
logger
.
info
(
"%d jobs completed"
%
n
)
except
AbortError
,
e
:
...
...
@@ -139,8 +147,6 @@ class Worker(object):
environment feeded with the task input. Task output is read
from the environment after execution and updated to the task
object.
"""
seg
=
task
.
seg
prod
=
task
.
task_input
...
...
@@ -307,9 +313,9 @@ class ProcessWorker(Worker, Process):
self
.
nice
=
nice
def
run
(
self
):
def
run
(
self
,
**
keys
):
os
.
nice
(
self
.
nice
)
Worker
.
run
(
self
)
Worker
.
run
(
self
,
**
keys
)
class
SandBox
(
Worker
):
""" Provide a mean to come back in the execution state of a segment
...
...
test/limitted_number_of_jobs/main.py
0 → 100644
View file @
a3c6e084
""" Demonstration of """
import
pipelet.pipeline
as
pipeline
from
pipelet.launchers
import
launch_interactive
,
SchedulerManager
from
pipelet
import
scheduler
import
os
import
logging
import
subprocess
from
pylab
import
*
pipedot
=
"""
a->b;
"""
for
i
in
[
'a'
,
'b'
]:
segfn
=
"%s.py"
%
i
with
open
(
segfn
,
"w"
)
as
fid
:
fid
.
write
(
"""
import time
inp = get_input()
par = get_parent()
out = inp
time.sleep(5)
set_output(out)
"""
)
P
=
pipeline
.
Pipeline
(
pipedot
,
code_dir
=
'./'
,
prefix
=
'./'
)
P
.
push
(
a
=
list
(
arange
(
12
)))
address
=
(
''
,
50000
)
authkey
=
'secret'
# Launching the scheduler
s
=
scheduler
.
Scheduler
(
P
)
SchedulerManager
.
register
(
'get_scheduler'
,
callable
=
lambda
:
s
)
mgr
=
SchedulerManager
(
address
=
address
,
authkey
=
authkey
)
mgr
.
start
()
processlist
=
[]
job
=
[
"python"
,
"-m"
,
"pipelet.launchers"
,
"-H"
,
str
(
address
[
0
]),
"-p"
,
str
(
address
[
1
]),
"-s"
,
str
(
authkey
),
"-l"
,
'toto.log'
,
"-N"
,
"1"
]
print
' '
.
join
(
job
)
w
=
subprocess
.
Popen
(
job
)
processlist
.
append
(
w
)
sched_proxy
=
mgr
.
get_scheduler
()
sched_proxy
.
run
()
for
w
in
processlist
:
w
.
wait
()
Write
Preview
Markdown
is supported
0%
Try again
or
attach a new file
.
Attach a file
Cancel
You are about to add
0
people
to the discussion. Proceed with caution.
Finish editing this message first!
Cancel
Please
register
or
sign in
to comment