Commit 6ba207fb authored by Maude Le Jeune

doc API

parent 9a228d49

:mod:`environment` Module
-------------------------

.. py:module:: environment
.. automodule:: pipelet.environment
    :members: Environment
    :undoc-members:
    :show-inheritance:

:mod:`launchers` Module
-----------------------

.. automodule:: pipelet.launchers
    :members:
    :undoc-members:
    :show-inheritance:

pipelet
=======

.. toctree::
    :maxdepth: 4

    pipelet

:mod:`directive` Module
-----------------------

.. automodule:: pipelet.directive
    :members:
    :undoc-members:
    :show-inheritance:

.. automodule:: pipelet.multiplex
    :members:
    :undoc-members:
    :show-inheritance:

pipelet Package
===============

:mod:`pipelet` Package
----------------------

.. automodule:: pipelet
    :members:
    :undoc-members:
    :show-inheritance:

.. toctree::
    :maxdepth: 2

    environment module <environment>
    repository module <repository>
    pipeline module <pipeline>
    directive module <multiplex>
    launchers module <launchers>
    scheduler module <scheduler>
    worker module <worker>
    task module <task>
    tracker module <tracker>
    utils module <utils>

:mod:`pipeline` Module
----------------------

.. automodule:: pipelet.pipeline
    :members:
    :undoc-members:
    :show-inheritance:

:mod:`repository` Module
------------------------

.. automodule:: pipelet.repository
    :members: Repository, LocalRepository
    :undoc-members:
    :show-inheritance:

:mod:`scheduler` Module
-----------------------

.. automodule:: pipelet.scheduler
    :members:
    :undoc-members:
    :show-inheritance:

:mod:`task` Module
-------------------

.. automodule:: pipelet.task
    :members:
    :undoc-members:
    :show-inheritance:

:mod:`tracker` Module
---------------------

.. automodule:: pipelet.tracker
    :members:
    :undoc-members:
    :show-inheritance:

Advanced usage
--------------

This section describes more advanced (and useful) features and
requires good familiarity with the concepts introduced in the previous
sections.

Multiplex directive
~~~~~~~~~~~~~~~~~~~

The default behavior can be altered by specifying a ``#multiplex``
directive in the comments of the segment code. If several multiplex
directives are present in the segment code, the last one is retained.
The multiplex directive can be one of:

+ ``#multiplex cross_prod`` : default behavior, return the Cartesian product.
+ ``#multiplex union`` : make the union of the inputs.

Moreover, the ``#multiplex cross_prod`` directive admits filtering and
grouping by class, similarly to SQL queries::

    #multiplex cross_prod where "condition" group_by "class_function"

``condition`` and ``class_function`` are Python code evaluated for each element
of the product set.

The argument of ``where`` is a condition. An element will be part of the
input set only if the condition evaluates to ``True``.

The ``group_by`` directive groups elements into classes according to the
result of the evaluation of the given class function. The input set then
contains all the resulting classes. For example, if the function is a
constant, the input set will contain only one element: the class
containing all elements.

During the evaluation, the values of the tuple elements are accessible
as variables bearing the names of the corresponding parent segments.

Given the Cartesian product set::

    [('Lancelot','the Brave'), ('Lancelot','the Pure'), ('Galahad','the Brave'), ('Galahad','the Pure')]

one can use::

    #multiplex cross_prod where "quality=='the Brave'"

to get 2 instances of the following segment (``melt``) running on
``('Lancelot','the Brave')`` and ``('Galahad','the Brave')``,

or::

    #multiplex cross_prod group_by "knights"

to get 2 instances of the ``melt`` segment running on ``'Lancelot'`` and ``'Galahad'``,

or::

    #multiplex cross_prod group_by "0"

to get 1 instance of the ``melt`` segment running on ``0``.

Note that to make use of ``group_by``, elements of the output set have to be
hashable.

Another caution on the use of ``group_by``: the segment input data type is no
longer a dictionary in those cases, as the original tuple is
lost and replaced by the result of the class function.
See the :ref:`The segment environment <my-reference-label>` section for more details.
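
To make these semantics concrete, here is a minimal Python sketch of what
``cross_prod`` combined with ``where`` and ``group_by`` boils down to. This is a
conceptual illustration only, not the code pipelet actually runs; the parent
names ``knights`` and ``quality`` are the ones used in the examples above::

    from itertools import product

    # Hypothetical parent segment outputs, named after the parents used above.
    knights = ['Lancelot', 'Galahad']
    quality = ['the Brave', 'the Pure']

    # #multiplex cross_prod : plain Cartesian product of the parent outputs
    inputs = list(product(knights, quality))

    # #multiplex cross_prod where "quality=='the Brave'" : keep matching tuples only
    kept = [t for t in inputs
            if eval("quality=='the Brave'", {'knights': t[0], 'quality': t[1]})]

    # #multiplex cross_prod group_by "knights" : one input per class value
    classes = set(eval("knights", {'knights': t[0], 'quality': t[1]}) for t in inputs)

    print(kept)     # [('Lancelot', 'the Brave'), ('Galahad', 'the Brave')]
    print(classes)  # {'Lancelot', 'Galahad'}
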

Depend directive
~~~~~~~~~~~~~~~~

As explained in the introduction section, Pipelet offers the
possibility to spare CPU time by saving intermediate products to disk.
We call intermediate products the input/output data files of the
different segments.

Each segment repository is identified by a unique key which depends
on:

- the segment processing code and parameters (segment and hook
  scripts)
- the input data (identified from the keys of the parent segments)

Every change made to a segment (new parameter or new parent) will then
give a different key, and tell the Pipelet engine to compute a new
segment instance.
It is possible to add external dependencies to the key
computation using the depend directive::

    #depend file1 file2

At the very beginning of the pipeline execution, all dependencies will
be stored, to prevent any change (code edits) between the key
computation and the actual processing.

Note that this storing mechanism works only for segment and hook
scripts. External dependencies are also read at the beginning of the
pipeline execution, but are only used for the key computation.
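
For illustration only, here is a rough sketch of how such a key could be
derived. This is not pipelet's actual hashing code; the function name
``segment_key`` is hypothetical::

    import hashlib

    def segment_key(script_text, parent_keys, depend_files):
        """Hypothetical key combining code, parent keys and #depend files."""
        h = hashlib.sha1()
        h.update(script_text.encode())      # segment and hook script contents
        for key in sorted(parent_keys):     # keys of the parent segments (input data)
            h.update(key.encode())
        for fn in depend_files:             # external files listed after #depend
            with open(fn, 'rb') as f:
                h.update(f.read())
        return h.hexdigest()
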

Database reconstruction
~~~~~~~~~~~~~~~~~~~~~~~

In case of an unfortunate loss of the pipeline SQL database, it is
possible to reconstruct it from the data stored on disk::

    import pipelet.utils
    pipelet.utils.rebuild_db_from_disk(prefix, sqlfile)

All information will be retrieved, but with new identifiers.

The hooking system
~~~~~~~~~~~~~~~~~~

As described in the :ref:`The segment environment <my-reference-label>` section, Pipelet supports
a hooking system which allows the use of generic processing code and
code sectioning.

Let's consider a set of instructions that have to be systematically
applied at the end of a segment (post processing). One can put those
instructions in a separate script file named, for example,
``segname_postproc.py`` and call the hook function::

    hook('postproc', globals())

A specific dictionary can be passed to the hook script to avoid
confusion.
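
For instance, instead of handing the full ``globals()`` to the hook, one could
pass only what the hook needs (``result_files`` is a hypothetical variable of
the calling segment)::

    # pass a restricted dictionary instead of the full globals()
    # 'result_files' is assumed to be defined earlier in the segment script
    hook('postproc', {'result_files': result_files})
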
The hook scripts are included in the hash key computation.

Segment script repository
~~~~~~~~~~~~~~~~~~~~~~~~~

Local repository
****************

By default, segment scripts are read from a local directory, specified
at pipeline initialization with the parameter named ``code_dir``::

    from pipelet.pipeline import Pipeline
    P = Pipeline(pipedot, code_dir="./", prefix="./")

The segment script contents are immediately stored, to prevent any
modification between the pipeline start time and the actual execution
of each segment.

It is generally a good idea to place this directory under revision
control (RCS), to ease the reproducibility of the pipeline (even if the
pipelet engine makes a copy of the segment script in the segment output
directory).

If using Git, the revision number will be stored at the beginning of
the copy of the segment script.

Writing custom environments
~~~~~~~~~~~~~~~~~~~~~~~~~~~

The Pipelet software provides a set of default utilities available
from the segment environment. It is possible to extend this default
environment or even to write a completely new one.

Extending the default environment
*********************************

The different environment utilities are actually methods of the class
``Environment``. It is possible to add new functionalities by using the
Python inheritance mechanism.

File ``myenvironment.py``::

    from pipelet.environment import *

    class MyEnvironment(Environment):
        def my_function(self):
            """ My function does nothing. """
            return

The Pipelet engine objects (segments, tasks, pipeline) are available
from the worker attribute ``self._worker``. See the :ref:`The pipelet actors <act-sec>` section for more details about the Pipelet machinery.
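
Assuming the engine exposes the public methods of the environment in the
segment namespace, as it does for the default utilities, a segment script could
then call the new utility directly (a hedged sketch; ``result.txt`` is an
arbitrary file name)::

    # hypothetical segment script using the extended environment
    output_file = get_data_fn('result.txt')  # default utility, assumed available in the segment
    my_function()                            # new utility provided by MyEnvironment
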

Writing new environment
***********************

In order to start with a completely new environment, extend the base
environment.

File ``myenvironment.py``::

    from pipelet.environment import *

    class MyEnvironment(EnvironmentBase):
        def my_get_data_fn(self, x):
            """ New name for get_data_fn. """
            return self._get_data_fn(x)

        def _close(self, glo):
            """ Post processing code. """
            return glo['seg_output']

From the base environment, the basic functionalities for getting file
names and executing hook scripts are still available through:

- ``self._get_data_fn``
- ``self._hook``

The segment input argument is also stored in ``self._seg_input``.
The segment output argument has to be returned by the ``_close(self, glo)``
method.

The Pipelet engine objects (segments, tasks, pipeline) are available
from the worker attribute ``self._worker``. See the Doxygen documentation for
more details about the Pipelet machinery.

Loading another environment
***************************

To load another environment, set the pipeline environment attribute
accordingly::

    Pipeline(pipedot, code_dir="./", prefix="./", env=MyEnvironment)
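
For instance, assuming ``myenvironment.py`` sits next to the pipeline main
file, this could read::

    from pipelet.pipeline import Pipeline
    from myenvironment import MyEnvironment

    P = Pipeline(pipedot, code_dir="./", prefix="./", env=MyEnvironment)
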
Writing custom main files
~~~~~~~~~~~~~~~~~~~~~~~~~

Launching pipeweb behind apache
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~

Pipeweb uses the CherryPy web framework and can be run behind an
Apache web server, which brings essentially two advantages:

- access to Apache ``mod_*`` facilities (https, gzip, authentication ...);
- faster serving of static files (the pipelet application actually uses
  quite few of them, so the gain is marginal; having the actual data
  served by Apache may be feasible but is not planned yet).

There are actually several ways of doing so, the `cherrypy <http://tools.cherrypy.org/wiki/BehindApache>`_ documentation
giving hints about each. We describe here an example case using
mod_rewrite and virtual hosting.

1. The first thing we need is a working installation of Apache with
   ``mod_rewrite`` activated. On a Debian-like distribution this is usually
   obtained by::

      sudo a2enmod rewrite
      sudo a2enmod proxy
      sudo a2enmod proxy_http

2. We then configure Apache to rewrite requests to the CherryPy
   application, except for the static files of the application, which
   will be served directly. Here is a sample configuration file for a
   dedicated virtual host named pipeweb, with pipelet installed under
   ``/usr/local/lib/python2.6/dist-packages/``::

      <VirtualHost pipeweb:80>
          ServerAdmin pipeweb_admin@localhost
          DocumentRoot /usr/local/lib/python2.6/dist-packages/pipelet

          # ErrorLog /some/custom/error_file.log
          # CustomLog /some/custom/access_file.log common

          RewriteEngine on
          RewriteRule ^/static/(.*) /usr/local/lib/python2.6/dist-packages/pipelet/static/$1 [L]
          RewriteRule ^(.*) http://127.0.0.1:8080$1 [proxy]
      </VirtualHost>

3. Restart Apache and start the pipeweb application so that it serves on the
   specified address and port: ``pipeweb start -H 127.0.0.1``

It is also possible to start the application on demand, using a CGI
script like::

    #!/usr/local/bin/python
    print "Content-type: text/html\r\n"
    print """<html><head><META HTTP-EQUIV="Refresh" CONTENT="1; URL=/"></head><body>Restarting site ...<a href="/">click here</a></body></html>"""
    import os
    os.system('pipeweb start -H 127.0.0.1')

To have it executed when the proxy detects the absence of the application::

    <VirtualHost pipeweb:80>
        #...
        ScriptAliasMatch ^/pipeweb_autostart\.cgi$ /usr/local/bin/pipeweb_autostart.cgi

        RewriteEngine on
        RewriteCond %{SCRIPT_FILENAME} !pipeweb_autostart\.cgi$
        RewriteRule ^/static/(.*) /usr/local/lib/python2.6/dist-packages/pipelet/static/$1 [L]
        RewriteCond %{SCRIPT_FILENAME} !pipeweb_autostart\.cgi$
        RewriteRule ^(.*) http://127.0.0.1:8080$1 [proxy]

        ErrorDocument 503 /pipeweb_autostart.cgi
        #...
    </VirtualHost>

You may want to adjust the ownership and suid bit of the
``pipeweb_autostart.cgi`` script so that it executes with the correct
rights.

Pipeweb handles access rights using per-pipeline ACLs registered in the
database file. It supports Basic and Digest HTTP authentication. When
deploying the pipeweb interface in a production environment, one may
want to defer part of the authorization process to external, and
potentially more secure, systems. The pipeweb behavior in terms of
authorization is controlled by the ``-A`` option, which accepts the
following arguments:

- ``Digest`` (default): authenticate users via HTTP Digest authentication
  according to the user:passwd list stored in the database.
- ``Basic``: authenticate users via HTTP Basic (clear text) authentication
  according to the user:passwd list stored in the database.
- ``ACL``: check the access rights of otherwise authenticated users according
  to the user list stored in the database.
- ``None``: do no check at all. (Defer the whole authentication/authorization
  process to the proxy.)

Here is a complete configuration sample making use of https, Basic
authentication, and per-pipeline ACLs to secure data browsing::

    <VirtualHost _default_:443>
        ServerAdmin pipeweb_admin@localhost
        DocumentRoot /usr/local/lib/python2.6/dist-packages/pipelet

        # ErrorLog /some/custom/error_file.log
        # CustomLog /some/custom/access_file.log common

        # Adjust the ssl configuration to fit your needs
        SSLEngine on
        SSLCertificateFile /etc/ssl/certs/ssl-cert-snakeoil.pem
        SSLCertificateKeyFile /etc/ssl/private/ssl-cert-snakeoil.key

        # This handles authentication and access to the index page
        # Access right checking to the various registered pipelines
        # is left to pipeweb
        <Location />
            # Replace with any suitable authentication system
            AuthName "pipeweb"
            AuthType Basic
            AuthUserFile /etc/apache2/pipelet.pwd
            require valid-user
        </Location>

        ScriptAliasMatch ^/pipeweb_autostart\.cgi$ /usr/local/bin/pipeweb_autostart.cgi

        RewriteEngine on
        RewriteCond %{SCRIPT_FILENAME} !pipeweb_autostart\.cgi$
        RewriteRule ^/static/(.*) /usr/local/lib/python2.6/dist-packages/pipelet/static/$1 [L]
        RewriteCond %{SCRIPT_FILENAME} !pipeweb_autostart\.cgi$
        RewriteRule ^(.*) http://127.0.0.1:8080$1 [proxy]

        ErrorDocument 503 /pipeweb_autostart.cgi
    </VirtualHost>

And the corresponding CGI script::

    #!/usr/local/bin/python
    print "Content-type: text/html\r\n"
    print """<html><head><META HTTP-EQUIV="Refresh" CONTENT="1; URL=/"></head><body>Restarting site ...<a href="/">click here</a></body></html>"""
    import os
    os.system('pipeweb start -H 127.0.0.1 -A ACL')

:mod:`utils` Module
-------------------

.. automodule:: pipelet.utils
    :members:
    :undoc-members:
    :show-inheritance:

.. automodule:: pipelet.db_utils
    :members:
    :undoc-members:
    :show-inheritance:

:mod:`worker` Module
--------------------

.. automodule:: pipelet.worker
    :members:
    :undoc-members:
    :show-inheritance:
@@ -21,3 +21,15 @@ from task import *
from environment import *
__version__='9a228d498a98e442020758e6976a12c692c2c774'
@@ -43,10 +43,7 @@ def deltask(db_file, lst_task, report_only=False):
Delete all products directories of a tasks instance.
Parameters
----------
db_file: string name of the database
lst_task: list of task ids
"""
import shutil
@@ -95,9 +92,7 @@ def get_lst_tag (db_file):
Tags are ; separated in the db.
Returns
-------
list of string
"""
conn = sqlite3.connect(db_file,check_same_thread=True)
conn.text_factory=str
@@ -115,9 +110,7 @@ def get_lst_tag (db_file):
def get_lst_seg (db_file):
""" Return the list of segment name
Returns
-------
list of string
"""
conn = sqlite3.connect(db_file,check_same_thread=True)
conn.text_factory=str
@@ -133,9 +126,7 @@ def get_lst_date (db_file):
Date strings are picked from queued_on field corresponding to the
first task of each segment.
Returns
-------
list of string
"""
conn = sqlite3.connect(db_file,check_same_thread=True)
conn.text_factory=str
@@ -149,10 +140,7 @@ def add_tag (db_file, segid, tag):
Check that the tag do not exist yet for this id
Parameters
----------
segid: string. List of seg_id - separated. (- at the end of string also)
tag: string. Tags are ; separated in the db.
"""
seglst = segid.split('-')
conn = sqlite3.connect(db_file,check_same_thread=True)
@@ -176,9 +164,7 @@ def get_last(db_file):
Return a seg-id
Parameters
----------
db_file: string. Database file name.
"""
conn = sqlite3.connect(db_file, check_same_thread=True)
conn.text_factory=str
@@ -200,9 +186,7 @@ def _update_meta(fn, str_tag):
def del_tag (db_file, tag):
""" Delete tag from the database
Parameters
----------
tag: string.
"""
conn = sqlite3.connect(db_file,check_same_thread=True)
conn.text_factory=str
@@ -225,9 +209,7 @@ def _delseg(db_file, lst_seg):
Delete all segments and products directories of a pipeline instance.
Parameters
----------
lst_seg: integer list
"""
## remove everyboby
conn = sqlite3.connect(db_file,check_same_thread=True)
@@ -261,9 +243,7 @@ def _deltask(db_file, lst_task):
Delete all products directories of a tasks instance.
Parameters
----------
lst_tag: string, list of pipe id '-' separated
"""
## remove everyboby
conn = sqlite3.connect(db_file,check_same_thread=True)
@@ -291,13 +271,7 @@ def _get_fathers(db_file, segid):
id for a given segment instance. This is used to print a pipeline
tree view with all dependencies.
Parameters
----------
segid: id of the leaf segment.
Returns
-------
list of segid, for the upstream segment instances.
"""
lstid = [int(segid)]
conn = sqlite3.connect(db_file,check_same_thread=True)
@@ -317,13 +291,7 @@ def _get_children(db_file, segid):
Return a list which contains all downstream segment instances
id for a given segment instance. This is used to delete all dependencies.
Parameters
----------
segid: id of the segment.
Returns
-------
list of segid, for the downstream segment instances.
"""
lstid = [int(segid)]
conn = sqlite3.connect(db_file,check_same_thread=True)
@@ -344,13 +312,7 @@ def _get_children_task(db_file, taskid):
Return a list which contains all downstream task instances
id for a given task instance. This is used to delete all dependencies.
Parameters
----------
taskid: id of the task
Returns
-------
list of taskid, for the downstream task instances.
"""
lstid = [int(taskid)]
conn = sqlite3.connect(db_file,check_same_thread=True)
@@ -24,9 +24,6 @@ class Directive:
Directive names and value are class attributs.