Pipelet, commit 446dc62f
Authored Sep 03, 2010 by Maude Le Jeune
allow loading new environment
parent aa6582e4
Showing 4 changed files with 93 additions and 13 deletions
README.org (+69 -4)
pipelet/environment.py (+13 -4)
pipelet/pipeline.py (+7 -1)
pipelet/worker.py (+4 -4)
README.org
@@ -238,15 +238,12 @@ The segment code is executed in a specific environment that provides:
Loading another environment
** Running Pipes
*** The interactive mode
This mode has been designed to ease debugging. If P is an instance of
the pipeline object, the syntax reads :
@@ -361,6 +358,74 @@ All information will be retrieve, but with new identifiers.
** The hooking system
** Writing custom environments
The pipelet software provides a set of default utilities available
from the segment environment. It is possible to extend this default
environment or even to rewrite a completely new one.
*** Extending the default environment
The different environment utilities are actually methods of the class
Environment. It is possible to add new functionalities by using the
Python inheritance mechanism:
File: myenvironment.py
    from pipelet.environment import *

    class MyEnvironment(Environment):
        def my_function(self):
            """ My function does nothing
            """
            return
The pipelet engine objects (segments, tasks, pipeline) are available
from the worker attribute self._worker. See section "The pipelet
actors" for more details about the pipelet machinery.
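As a rough illustration of reaching engine objects through self._worker, the sketch below replaces the real pipelet classes with small stand-ins so that it runs without pipelet installed. The names FakeTask, FakeWorker and current_segment are hypothetical; only the _worker attribute and the task.seg path are taken from the code touched by this commit.

```python
class FakeTask:
    """Stand-in for a pipelet task; only the `seg` attribute is modelled."""
    def __init__(self, seg):
        self.seg = seg


class FakeWorker:
    """Stand-in for a pipelet worker holding the current task."""
    def __init__(self, task):
        self.task = task


class Environment:
    """Stand-in for pipelet.environment.Environment (assumption)."""
    def __init__(self, w):
        # As in EnvironmentBase, the worker is kept on the instance.
        self._worker = w


class MyEnvironment(Environment):
    def current_segment(self):
        """Hypothetical utility: name of the segment being executed."""
        return self._worker.task.seg


env = MyEnvironment(FakeWorker(FakeTask("first")))
print(env.current_segment())
```

With the real classes, the same method body would presumably work unchanged, since the worker is handed to the environment constructor.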
*** Writing new environment
In order to start with a completely new environment, extend the base
environment:
File: myenvironment.py
    from pipelet.environment import *

    class MyEnvironment(EnvironmentBase):
        def my_get_data_fn(self, x):
            """ New name for get_data_fn
            """
            return self._get_data_fn(x)

        def _close(self, glo):
            """ Post processing code
            """
            return glo['seg_output']
From the base environment, the basic functionalities for getting file
names and executing hook scripts are still available through:
- self._get_data_fn
- self._hook
The segment input argument is also stored in self._seg_input.
The segment output argument has to be returned by the _close(self, glo)
method.
The pipelet engine objects (segments, tasks, pipeline) are available
from the worker attribute self._worker. See section "The pipelet
actors" for more details about the pipelet machinery.
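The _close contract described in this section can be sketched as follows. This is a reduced stand-in, not the pipelet implementation: the constructor takes the segment input directly instead of a worker, the segment code string is hypothetical, and placing seg_input in the execution dictionary is an assumption made only to keep the example self-contained.

```python
class EnvironmentBase:
    """Stand-in for pipelet's EnvironmentBase (assumption, heavily reduced)."""
    def __init__(self, seg_input=None):
        self._seg_input = seg_input

    def _close(self, glo):
        # The base class added in this commit returns None by default.
        return None


class MyEnvironment(EnvironmentBase):
    def _close(self, glo):
        """Post processing: hand back the segment output."""
        return glo['seg_output']


# Hypothetical segment code; a real segment lives in its own file.
segment_code = "seg_output = [x * 2 for x in seg_input]"

env = MyEnvironment(seg_input=[1, 2, 3])
glo = {'seg_input': env._seg_input}   # execution dictionary of the segment
exec(segment_code, glo)               # run the segment in that dictionary
task_output = env._close(glo)         # the worker stores this return value
print(task_output)
```

Returning the output from _close, rather than writing it to the task inside the environment, keeps a custom environment free of any knowledge about how the worker records results.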
*** Loading another environment
To load another environment, pass the environment class (not an
instance) through the env argument of the pipeline constructor.
    Pipeline(pipedot, code_dir=, prefix=, env=MyEnvironment)
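A minimal sketch of how the env argument is used, assuming stand-in classes in place of the real pipelet ones: the pipeline stores the class, and each worker instantiates it with itself as argument. The method name make_env is hypothetical; the call self.pipe.env(self) mirrors the line changed in worker.py by this commit.

```python
class EnvironmentBase:
    """Stand-in base class: keeps a reference to the worker."""
    def __init__(self, w):
        self._worker = w


class Environment(EnvironmentBase):
    """Stand-in for the default environment."""


class MyEnvironment(EnvironmentBase):
    """Stand-in for a user-defined environment."""


class Pipeline:
    def __init__(self, seg_list, env=Environment):
        self.seg_list = seg_list
        self.env = env          # the class is stored, not an instance


class Worker:
    def __init__(self, pipe):
        self.pipe = pipe

    def make_env(self):
        # Mirrors `env = self.pipe.env(self)` from worker.py.
        return self.pipe.env(self)


default_env = Worker(Pipeline(["first"])).make_env()
custom_env = Worker(Pipeline(["first"], env=MyEnvironment)).make_env()
print(type(default_env).__name__, type(custom_env).__name__)
```

Because the class is stored and instantiated per worker, every worker gets its own environment object bound to its own task.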
** Using custom dependency schemes
** Launching pipeweb behind apache
pipelet/environment.py
@@ -52,7 +52,8 @@ class EnvironmentBase():
         ----------
         w: a worker instance
         """
-        self._worker = w
+        self._worker = w
+        self._seg_input = w.task.task_input

     def _get_data_fn(self, x):
         """ Complete the filename with the path to the working
@@ -109,7 +110,14 @@ class EnvironmentBase():
             print "No hook file named %s for seg %s" % (self._worker.task.seg, hook_name)

+    def _close(self, glo):
+        """ Return segment's output from dictionnary.
+        Parameters
+        ----------
+        glo: segment execution dictionnary
+        """
+        return None

 class Environment(EnvironmentBase):
     """ Default segment's facilities.
@@ -354,7 +362,7 @@ class Environment(EnvironmentBase):
         self.save_products(self._worker.pipe.get_param_file(seg), glo, param_name=param_name)

-    def _close(self, glo, seg):
+    def _close(self, glo):
         """ Close environment.
         This routine performs post processing like saving tag,
@@ -365,6 +373,7 @@ class Environment(EnvironmentBase):
         glo : dict, the global dictionnary
         seg : string, segment name
         """
+        seg = self._worker.task.seg
         self.clean_tmp()
         try: # Save params
             var_key = glo['lst_par']
@@ -380,4 +389,4 @@ class Environment(EnvironmentBase):
         except:
             res = None
             self.logger.info("No segment output found, setting seg_output to None")
-        self._worker.task.task_output = res
+        return res
pipelet/pipeline.py
@@ -21,6 +21,8 @@ from repository import *
 from utils import get_hashkey, crc32
 from utils import str_file, flatten
 from contextlib import closing
+from environment import *

 class PipeException(Exception):
     """ Extension of exception class.
     """
@@ -88,7 +90,7 @@ class Pipeline:
     in order to allow the execution in non-interactive environment.
     """
-    def __init__(self, seg_list, code_dir=None, prefix='./', sqlfile=None, matplotlib=False, matplotlib_interactive=False):
+    def __init__(self, seg_list, code_dir=None, prefix='./', sqlfile=None, matplotlib=False, matplotlib_interactive=False, env=Environment):
         """ Initialize a pipeline.
         Parameters
@@ -102,6 +104,7 @@ class Pipeline:
         matplotlib: boolean, if true, turn the matplotlib backend to Agg
           in order to allow the execution in non-interactive environment.
         matplotlib_interactive: same thing for interactive workers
+        env: extension of the EnvironmentBase class.
         """
         if isinstance(seg_list, str):
             self.from_dot(seg_list)
@@ -135,6 +138,9 @@ class Pipeline:
         ## string, pipe name.
         self.name = "pipelet"
+        ## Environment Base extension
+        self.env = env

     def __str__(self):
         """ Print the segment tree.
pipelet/worker.py
@@ -177,7 +177,7 @@ class Worker(object):
         gc.collect()
         self.task = task
-        env = Environment(self)
+        env = self.pipe.env(self)
         glo = {}
         for l in env._get_namespace():
             bool = eval("inspect.ismethod(env.%s)" % l)
@@ -253,7 +253,7 @@ class InteractiveWorker(Worker):
         self.task = task
         code = self.pipe.repository.get_code_string(seg)
         exec (code, glo)
-        env._close(glo, seg)
+        self.task.task_output = env._close(glo)
         task.status = "done" # set status
         return task
@@ -307,7 +307,7 @@ class ThreadWorker(Worker, threading.Thread):
         if task.status == "failed":
             return task
-        env._close(glo, seg)
+        self.task.task_output = env._close(glo)
         return task
@@ -371,7 +371,7 @@ class ProcessWorker(Worker, Process):
             task.status = "failed"
         else:
             task.status = "done" # set status
-        env._close(glo, seg)
+        self.task.task_output = env._close(glo)
         if task.status == "failed":
             return task
         return task