Skip to content
GitLab
Projects
Groups
Snippets
Help
Loading...
Help
Help
Support
Community forum
Keyboard shortcuts
?
Submit feedback
Contribute to GitLab
Sign in / Register
Toggle navigation
Open sidebar
pipelet
Pipelet
Commits
b60b79b9
Commit
b60b79b9
authored
Nov 23, 2010
by
Maude Le Jeune
Browse files
doc updated
parent
a4ba521a
Changes
7
Hide whitespace changes
Inline
Side-by-side
Showing
7 changed files
with
100 additions
and
4 deletions
+100
-4
pipelet/environment.py
pipelet/environment.py
+7
-0
pipelet/pipeline.py
pipelet/pipeline.py
+2
-0
pipelet/planckenv.py
pipelet/planckenv.py
+23
-0
pipelet/task.py
pipelet/task.py
+3
-1
pipelet/tracker.py
pipelet/tracker.py
+2
-1
pipelet/web.py
pipelet/web.py
+34
-2
pipelet/worker.py
pipelet/worker.py
+29
-0
No files found.
pipelet/environment.py
View file @
b60b79b9
...
...
@@ -137,10 +137,17 @@ class Environment(EnvironmentBase):
----------
w: a worker instance
"""
## worker instance
self
.
_worker
=
w
## segment input
self
.
seg_input
=
w
.
task
.
task_input
## list of temporary files
self
.
_tmpf
=
[]
## logger instance
self
.
logger
=
init_logger
(
self
.
_get_data_fn
(
""
),
self
.
_get_log_file
(),
level
=
[])
...
...
pipelet/pipeline.py
View file @
b60b79b9
...
...
@@ -137,6 +137,8 @@ class Pipeline:
## boolean, if true, turn the matplotlib backend to Agg
self
.
matplotlib
=
matplotlib
## boolean, if true, turn the matplotlib backend to Agg
self
.
matplotlib_interactive
=
matplotlib_interactive
## string, pipe name.
...
...
pipelet/planckenv.py
View file @
b60b79b9
...
...
@@ -2,6 +2,29 @@ from pipelet.environment import *
import
os
class
PlanckEnvironment
(
Environment
):
""" Planck environment.
Planck is an ESA satellite aimed at providing a final mapping of
the cosmological anisotropies of temperature and
polarisation. It has been launch on May 14th 2009.
Two instruments are onboard, the LFI and HFI.
The scientific analysis of Planck data is performed within two
environments called DPC (Data Processing Center). The HFI DPC is
organised as an international consortium and coordinated from the
IAP (Paris, France) with the help of the IAS (Orsay, France).
The HFI DPC infrastructure includes many things from documents to
data management including also development tools.
This pipelet environment adapts the data management
component (DMC) by extending the pipelet data filename utility.
It is planned to link the pipelet instances to the DPC web
browsing tools : http://pipelines.planck.fr
"""
def
my_function
(
self
):
""" My function do nothing
"""
...
...
pipelet/task.py
View file @
b60b79b9
...
...
@@ -50,9 +50,11 @@ class Task:
self
.
id
=
id
## String, store the result of the computation
self
.
param
=
None
## date_time object
s
## date_time object
self
.
queued_on
=
queued_on
## date_time object
self
.
begun_on
=
None
## date_time object
self
.
ended_on
=
None
## List of the task id whose output become the input of this task
self
.
parents
=
parents
...
...
pipelet/tracker.py
View file @
b60b79b9
...
...
@@ -167,7 +167,8 @@ class SqliteTracker(Tracker,threading.Thread):
threading
.
Thread
.
__init__
(
self
)
## pipeline object
self
.
pipe
=
pipe
## dictionnary of segment ids
self
.
seg_id_cache
=
{}
## string, sql filename
...
...
pipelet/web.py
View file @
b60b79b9
...
...
@@ -43,6 +43,8 @@ class Web:
The informations about the pipeline are retrieve from its data base file.
Pipelines are printed through a tree view with links to all segments and products.
"""
## boolean, Web object is exposed.
exposed
=
True
def
__init__
(
self
,
db_file
,
name
):
""" Initialize a web instance.
...
...
@@ -51,8 +53,9 @@ class Web:
----------
db_file: string, file name of the pipe data base.
"""
## string, pipeline short name
self
.
name
=
name
##
P
ipeline data base file.
##
string, p
ipeline data base file.
self
.
db_file
=
db_file
def
get_lst_tag
(
self
):
...
...
@@ -652,6 +655,7 @@ class Web:
html
+=
"""</ul></div></body></html>"""
return
html
## regular expression which forbids path
forbidden_path
=
re
.
compile
(
'.*\.\..*'
)
...
...
@@ -725,12 +729,25 @@ class PipeIndex():
found in a given path.
"""
def
__init__
(
self
):
""" Initialize the Web interface.
"""
## list of pipelines
self
.
pipelist
=
[]
for
pipe
,
dbfile
in
cherrypy
.
config
[
'Pipelines'
].
iteritems
():
self
.
pipelist
.
append
(
pipe
)
setattr
(
self
,
pipe
,
Web
(
os
.
path
.
expanduser
(
dbfile
),
pipe
))
def
which_sqlfile
(
self
,
path_info
):
""" Return data base filename
Parameters
----------
path_info: string, path read from configuration file .pipelet.
Returns
-------
string, data base file name.
"""
try
:
pipe
=
path_info
.
split
(
'/'
)[
1
]
if
pipe
in
self
.
pipelist
:
...
...
@@ -751,10 +768,25 @@ class PipeIndex():
return
html
class
MyApp
(
cherrypy
.
Application
):
""" Pipelet web interface application.
"""
def
__init__
(
self
,
domain
,
domaindir
,
script_name
=
""
,
config
=
None
):
""" Initialize the web application.
Parameters
----------
domain: string
domaindir: string
script_name: the URL "mount point" for the application
config: a dict of {path: pathconf} pairs, where 'pathconf' is itself a dict of {key: value} pairs
"""
## string
self
.
domain
=
domain
## string
self
.
domaindir
=
domaindir
## top most object of the nested set which define the URL space
root
=
PipeIndex
()
for
pipe
,
dbfile
in
cherrypy
.
config
[
'Pipelines'
].
iteritems
():
setattr
(
root
,
pipe
,
Web
(
os
.
path
.
expanduser
(
dbfile
)))
...
...
pipelet/worker.py
View file @
b60b79b9
...
...
@@ -87,9 +87,12 @@ class Worker(object):
self
.
_tmpf
=
[]
def
matplotlib_hook
(
self
):
""" Turn the matplotlib backend to Agg.
"""
if
self
.
pipe
.
matplotlib
:
import
matplotlib
matplotlib
.
use
(
'Agg'
)
def
run
(
self
):
""" Start the worker.
...
...
@@ -124,6 +127,21 @@ class Worker(object):
self
.
terminate
()
def
execute_task
(
self
,
task
):
""" Execute a task.
Make task directory and execute segment code string with the
environment feeded with the task input. Task output is read
from the environment after execution and updated to the task
object.
Parameters
----------
task: a task object
Returns
-------
a task objet (output updated)
"""
seg
=
task
.
seg
prod
=
task
.
task_input
self
.
make_dir
(
task
)
...
...
@@ -134,6 +152,17 @@ class Worker(object):
return
self
.
task
def
exec_code
(
self
,
code
,
glo
,
env
):
""" Execute code string with namespace.
Task status is updated after execution.
Exceptions are read from the environment log file.
Parameters
----------
code: string.
glo: namespace dictionnary
env: environment object.
"""
try
:
exec
(
code
,
glo
)
# We do not want to prevent Abortion
...
...
Write
Preview
Markdown
is supported
0%
Try again
or
attach a new file
.
Attach a file
Cancel
You are about to add
0
people
to the discussion. Proceed with caution.
Finish editing this message first!
Cancel
Please
register
or
sign in
to comment