Commit f5e65823 authored by Marc Betoule

Merge branch 'newbase' of git@gitorious.org:pipelet/pipelet into newbase

parents 339c4e0e 2bf70409
WARNING: Pipelet is currently under active development and highly
unstable. There is a good chance that it will become incompatible from
one commit to the next.
Pipelet is a free framework allowing for creation, manipulation,
execution and browsing of scientific data processing pipelines. It
provides:
+ easy chaining of interdependent elementary tasks,
+ web access to data products,
+ branch handling,
......@@ -76,6 +76,14 @@ pipeweb start
4. You should be able to browse the result on the web page
http://localhost:8080
*** Getting a new pipe framework
To get a new pipe framework, with sample main and segment scripts:
pipeutils -c pipename
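For illustration (the pipe name mypipe is just a placeholder), this
creates a directory mypipe/ containing a sample main.py and a
seg_default_code.py, as done by the create_pipe helper shown later in
this commit. A typical first session could then read:
pipeutils -c mypipe
cd mypipe
python main.py -d
python main.py --help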
** Writing Pipes
*** Pipeline architecture
......@@ -156,6 +164,10 @@ directive can be found the last one is retained.
- @multiplex gather: the input set contains one tuple of all the outputs.
*** Depend directive
*** Orphan segments
TODO TBD
......@@ -170,7 +182,27 @@ actual data elsewhere, but you will lose the benefit of automated
versioning which proves to be quite convenient.
The storage is organized as follows:
- all pipeline instances are stored below a root which corresponds to
the prefix parameter of the Pipeline object.
/prefix/
- all segment meta data are stored below a root whose name corresponds
to a unique match of the segment code.
/prefix/seg_segname_YFLJ65/
- Segment's meta data are:
- a copy of the segment python script
- a copy of all segment hook scripts
- a parameter file (.args) which contains the segment parameter values
- a meta data file (.meta) which contains some extra meta data
- the data and meta data of each segment instance are stored in a
specific subdirectory whose name corresponds to a string representation
of its input
/prefix/seg_segname_YFLJ65/data/1/
- if there is a single segment instance, then data are stored in
/prefix/seg_segname_YFLJ65/data/
- If a segment has at least one parent, its root will be located below
one of its parents':
/prefix/seg_segname_YFLJ65/seg_segname2_PLMBH9/
- etc...
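To make the path composition concrete, here is a rough Python sketch of
the layout above (the prefix, the segment names and the 6-character
keys are made up; the real keys are derived from the segment code):
import os

prefix = '/data/mypipe'                          # the Pipeline prefix parameter
seg1 = os.path.join(prefix, 'seg_clean_YFLJ65')  # root of a first segment named 'clean'
seg2 = os.path.join(seg1, 'seg_plot_PLMBH9')     # a child segment 'plot' sits below its parent
task_dir = os.path.join(seg2, 'data', '1')       # data directory of one instance of 'plot'
print task_dir   # /data/mypipe/seg_clean_YFLJ65/seg_plot_PLMBH9/data/1
# the segment meta data (script copy, .args and .meta files) live directly in seg1 and seg2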
*** The segment environment
......@@ -178,16 +210,20 @@ The segment code is executed in a specific environment that provides:
1. access to the segment input and output
- seg_input: this variable is a dictionary containing the input of the segment
- get_input(): return the input of the segment
- seg_output: this variable has to be set to a list; its content becomes
the input of the downstream segment(s)
2. Functionalities to use the automated hierarchical data storage system.
- get_data_fn(basename): complete the filename with the path to the working directory.
- glob_seg(regexp, seg): return the list of filenames matching regexp from segment seg
- get_tmp_fn(): return a temporary filename.
3. Functionalities to use the automated parameter handling
- var_key: list of parameter names of the segment
- var_tag: list of parameter names which will be made visible from the web interface
- load_param(seg, var_names): update the namespace with the parameters of segment seg
4. Various convenient functionalities
- save_products(filename, var_names='*'): use pickle to save a
part of a given namespace.
- load_products(filename, var_names): update the namespace by
unpickling the requested objects from the file.
......@@ -195,33 +231,26 @@ The segment code is executed in a specific environment that provides:
- logged_subprocess(lst_args): execute a subprocess and log its output.
- log is a standard logging.Logger object that can be used to log the processing
5. Hooking support
Pipelet enables you to write reusable generic
segments by providing a hooking system via the hook function.
hook (hookname, globals()): execute Python script ‘seg_segname_hookname.py’ and update the namespace.
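To make this environment concrete, here is a minimal segment script
sketch relying only on the facilities listed above (the parent segment
name 'simulate', the file names and the hook name are invented for the
example):
# Hypothetical segment: concatenate the text files produced by a parent
# segment named 'simulate'.
lst = glob_seg(r'.*\.txt', 'simulate')   # filenames matching the regexp in segment 'simulate'
out = get_data_fn('concatenated.txt')    # filename rooted in this segment's data directory
with open(out, 'w') as g:
    for fn in lst:
        with open(fn) as f:
            g.write(f.read())
log.info('concatenated %d files into %s' % (len(lst), out))
# hook('concat_post', globals())         # would run seg_<segname>_concat_post.py if provided
seg_output = [out]                       # becomes the input of the downstream segment(s)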
** Running Pipes
*** The interactive mode
This mode has been designed to ease debugging. If P is an instance of
the pipeline object, the syntax reads:
from pipelet.launchers import launch_interactive
w, t = launch_interactive(P)
......@@ -231,24 +260,106 @@ In this mode, each task will be computed sequentially.
Do not hesitate to invoke the Python debugger from IPython: %pdb
*** The process mode
In this mode, one can run simultaneous tasks (if the pipe scheme
allows it).
The number of subprocesses is set by the N parameter:
from pipelet.launchers import launch_process
launch_process(P, N)
*** The batch mode
In this mode, one can submit some batch jobs to execute the tasks.
The number of jobs is set by the N parameter:
from pipelet.launchers import launch_pbs
launch_pbs(P, N, address=(os.environ['HOST'], 50000))
** Browsing Pipes
*** The pipelet webserver and ACL
The pipelet webserver allows the browsing of multiple pipelines.
Each pipeline has to be registered using:
pipeweb track <shortname> sqlfile
As pipeline browsing implies parsing the disk, some basic security has
to be set up as well. All users have to be registered with a specific
access level (1 for read-only access, and 2 for write access).
pipeutils -a <username> -l 2 sqlfile
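For instance, for a pipe created with pipeutils -c as above (the user
names are placeholders; the .sqlstatus file name follows the
create_pipe helper shown later in this commit):
pipeweb track mypipe ./mypipe/.sqlstatus
pipeutils -a guest -l 1 ./mypipe/.sqlstatus
pipeutils -a admin -l 2 ./mypipe/.sqlstatus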
*** The pipelet webserver
Start the web server using:
pipeweb start
Then the web application will be available on the web page http://localhost:8080
*** The web application
In order to ease the comparison of different processing runs, the web
interface displays various views (index, pipeline, segment, tasks) of
the pipeline data:
**** The index page
The index page displays a tree view of all pipeline instances. Each
segment may be expanded or collapsed via the +/- buttons.
The parameters used in each segment are summarized and displayed along
with the date of execution and the number of related tasks, ordered by
status.
Checkboxes allow operations to be performed on multiple segments:
- deletion: to clean unwanted data
- tag: to tag remarkable data
The filter panel allows the segment instances to be filtered on two
criteria:
- tag
- date of execution
**** The code page
Each segment name is a link to its code page. From this page the user
can view the code of all the Python scripts that have been applied to
the data.
The tree view is reduced to the current segment and its related
parents.
The root path corresponding to the data storage is also displayed.
**** The product page
The number of related tasks, ordered by status, is a link to the product
pages, where the data can be directly displayed (for images or text
files) or downloaded.
From this page it is also possible to delete a specific product and
its dependencies.
**** The log page
The log page can be accessed via the log button of the filter panel.
Logs are ordered by date.
* Advanced usage
** Database reconstruction
In case of an unfortunate loss of the pipeline SQL database, it is
possible to reconstruct it from the disk:
import pipelet
pipelet.utils.rebuild_db_from_disk (prefix, sqlfile)
All information will be retrieved, but with new identifiers.
** The hooking system
** Writing custom environments
** Using custom dependency schemes
......@@ -259,3 +370,13 @@ Pipeweb uses the cherrypy web framework server and can be run behind an
apache webserver, which brings essentially two advantages:
- https support.
- faster static files serving.
* The pipelet actors
** The Repository object
** The Pipeline object
** The Task object
** The Scheduler object
** The Tracker object
** The Worker object
......@@ -51,7 +51,14 @@ import sys
import datetime
def get_log_file (pipe, name):
"""
""" Return log file name
The current date and time are prepended to the log file name.
Parameters
----------
pipe: pipe instance
name: log file short name
"""
d = datetime.datetime.now()
strdate = d.strftime("%y%m%d_%H%M%s%f")
......
......@@ -113,7 +113,12 @@ class Pipeline:
self.repository = LocalRepository(code_dir)
## string, indicates where to save the pipeline products.
self._prefix = prefix
if not os.path.exists(prefix):
os.mkdir(prefix)
logdir = os.path.join(prefix, "log")
if not os.path.exists(logdir):
os.mkdir(logdir)
## dict, directory corresponding to each segment
self._curr_dirs = {}
## dict, hash key corresponding to each segment
......@@ -434,19 +439,6 @@ class Pipeline:
"""
return self._curr_dirs[seg]
def get_output_fn(self, seg, prod=None):
""" Return the segment output file
Parameters
----------
seg : string, segment name.
Returns
-------
string, segment output file name.
"""
return path.join(self.get_data_dir(seg, prod),'seg_%s.output'%seg)
def get_param_file(self, seg):
""" Return the segment directory.
......@@ -473,21 +465,6 @@ class Pipeline:
"""
return path.join(self.get_curr_dir(seg),'stdout')
# def write_subpipes_file(self, seg):
# """ Return the segment directory.
# Parameters
# ----------
# seg : string, segment name.
# Returns
# -------
# string, segment directory.
# """
# f = file(path.join(self.get_curr_dir(seg),'subpipes.txt'),'w')
# for d in self._sub_dirs[seg]:
# f.write(d)
# f.close()
def get_data_dir(self, seg, prod=None):
""" Return the data directory for the segment or a product
......
......@@ -242,7 +242,7 @@ class Scheduler():
self.nb_success = self.nb_success + 1
self.tracker.update_status(t,'done')
self.store_meta_task(t)
if t.task_output:
for r in t.task_output:
child = task.Task(t.seg, task_output=r)
......@@ -274,21 +274,6 @@ class Scheduler():
r = pickle.dump(dict({'parents':lst_dir}),f)
def store_meta_task(self, t):
""" Store meta information for task.
This is used to rebuild db.
Parameters
----------
t: task object
"""
fn = self.pipe.get_meta_file(t.seg, prod=t.task_input)
with closing(file(fn, 'w')) as f:
r = pickle.dump(t.to_save,f)
def push_next_seg(self, seg):
""" Push the segment task list to the queue.
......@@ -327,7 +312,7 @@ class Scheduler():
fid.close()
parents = self.pipe.get_parents(seg) ## parents segments
self.store_meta_seg(seg, parents)
d = self.tracker.get_done(seg) ## done tasks
dprod = [t.task_input for t in d] ## done products
failed = self.tracker.get_failed(seg) # failed tasks
......@@ -346,7 +331,7 @@ class Scheduler():
continue
if not (e in dprod): # not done
logger.debug("pushing 1 task for seg %s"%seg)
self.put_task(task.Task(seg, task_input=e, status='queued',parents=par, seg_parents=parents))
else: # done
logger.debug("task already accomplished in segment %s"%seg)
......@@ -398,7 +383,6 @@ class Scheduler():
"""
self.tracker.update_status(t,'failed')
self.store_meta_task(t)
self.task_queue.task_done()
def requeue(self, t):
......
......@@ -14,6 +14,9 @@
## along with this program; if not, see http://www.gnu.org/licenses/gpl.html
import threading
from utils import str_date
from contextlib import closing
import pickle
class Task:
""" A segment code associated with its input/output product(s).
......@@ -22,7 +25,7 @@ class Task:
execution status and its output product(s).
"""
def __init__(self, seg, task_input=None, status=None, task_output=None, id=None, queued_on=None, parents=[], seg_parents=[]):
""" Initialize a task object.
Parameters
......@@ -52,7 +55,10 @@ class Task:
self.ended_on = None
## List of the task id whose output become the input of this task
self.parents = parents
## List of str_input of the parent tasks
self.str_parents = []
## List of curr_dir of the parent segments
self.seg_parents = seg_parents
def __str__(self):
""" Convert task object to string.
......@@ -60,6 +66,37 @@ class Task:
return 'seg: %s, status: %s, id: %s, input: %s, output: %s' \
% (self.seg, self.status, self.id, str(self.task_input), str(self.task_output))
def update_status(self, status):
""" Update task status.
"""
str_d = str_date()
self.status = status
if status == 'done':
self.ended_on = str_d
elif status == 'queued':
self.queued_on = str_d
else:
self.task_output = None
if status == 'running':
self.begun_on = str_d
def store_meta(self, fn):
""" Store meta information for task.
This is used to rebuild db.
Parameters
----------
fn: string, file name
"""
with closing(file(fn, 'w')) as f:
r = pickle.dump(dict({'parents':self.str_parents, 'queued_on':self.queued_on, 'ended_on':self.ended_on, 'begun_on':self.begun_on, 'input':pickle.dumps(self.task_input), 'output':pickle.dumps(self.task_output), 'status':self.status}),f)
class TaskList:
""" List of task objects.
......
......@@ -297,8 +297,8 @@ class SqliteTracker(Tracker,threading.Thread):
with self.conn_lock:
ts = self.conn.execute('select * from tasks as t join segments as s on t.seg_id=s.seg_id and s.curr_dir=? and t.status=? ',(s, status))
for t in ts:
task_output = pickle.loads(t['output'])
task_input = pickle.loads(t['input'])
l.append(task.Task(seg,status=status,id=t['task_id'], task_input=task_input, task_output=task_output))
return l
......@@ -316,28 +316,22 @@ class SqliteTracker(Tracker,threading.Thread):
task object
"""
t.update_status ('queued')
task_output = pickle.dumps(t.task_output)
task_input = pickle.dumps(t.task_input)
str_input = self.pipe.get_data_dir(t.seg, prod=t.task_input)
with self.conn_lock:
with self.conn:
seg_id = self.seg_id_cache[t.seg]
c = self.conn.execute('insert into tasks(seg_id, status, input, output, str_input, queued_on) values(?, ?, ?, ?, ?, ?)',
(seg_id, t.status, task_input, task_output, str_input, t.queued_on))
t.id = c.lastrowid
logger.debug('for task:\n%s\nparents:\n%s\n'%(str(t),str(t.parents)))
self.conn.executemany('insert into task_relations (father_id, child_id) values (?, ?)',((fid, t.id) for fid in t.parents))
for fid in t.parents:
c = self.conn.execute('select str_input from tasks where task_id=?', ((fid,)))
for e in c:
t.str_parents.append(e[0])
return t
......@@ -356,21 +350,6 @@ class SqliteTracker(Tracker,threading.Thread):
c = self.conn.execute('select status from tasks where task_id=?', (t.id,))
return c.fetchone()['status']
def write_param (self, t):
""" Add param string to meta file
Parameters
----------
t: task object
"""
fn = self.pipe.get_meta_file(t.seg)
with closing(file(fn, 'r')) as f:
d = pickle.load(f)
d['param'] = t.tag
with closing(file(fn, 'w')) as f:
r = pickle.dump(d,f)
def update_status(self, t, status):
""" Update the task with new status.
......@@ -385,26 +364,17 @@ class SqliteTracker(Tracker,threading.Thread):
-------
task object
"""
t.update_status (status)
task_output = pickle.dumps(t.task_output)
self._asynchronous_request('update tasks set status=?, output=?, ended_on=?, begun_on=?, queued_on=? where task_id=?',
(status, task_output, t.ended_on, t.begun_on, t.queued_on, t.id))
if status == 'done':
self._asynchronous_request('update segments set param=? where seg=?',
(t.param, t.seg))
if status =='done' or status == 'failed':
t.store_meta(self.pipe.get_meta_file(t.seg, prod=t.task_input))
return t
......
......@@ -30,6 +30,7 @@ from pipelet.repository import LocalRepository
from contextlib import closing
import pickle
import os
current_dir = os.path.dirname(os.path.abspath(__file__))
def flatten(seq):
"""
......@@ -213,6 +214,54 @@ def reduced_code_formatting(s, unsafe=False):
min_code += l+'\n'
return min_code
def create_pipe(pipename, prefix=[]):
""" Create an empty pipe environment including main, defaulf
segment, prefix and log directories.
This function is called from the script pipeutils.
Parameters
----------
pipename: string, pipe name
prefix: string, pipe prefix.
"""
import os
import shutil
## make code dir
code_dir = os.path.join("./"+pipename)
if not os.path.exists(code_dir):
os.mkdir(code_dir)
## make prefix and log dir
if not prefix:
prefix= code_dir
if not os.path.exists(os.path.join(code_dir, "main.py")):
fn = current_dir+"/static/main.py"
with closing(file(fn, 'r')) as f:
str = f.read()
str = str.replace("prefix = './'", "prefix = '%s'"%prefix)
fn = os.path.join(code_dir, "main.py")
with closing(file(fn, 'w')) as f:
f.write(str)
if not os.path.exists(os.path.join(code_dir, "seg_default_code.py")):
shutil.copy(current_dir+"/static/seg_default_code.py", code_dir) ## make a segment file with some doc
print "\n\n-----------------------------------------------------------"
print " Pipeline '%s' has been successfully created "%pipename
print "-----------------------------------------------------------\n\n"
print "1- Change directory to '%s' to set your pipeline scheme\n in the file named 'main.py'\n"%code_dir
print "2- Rename and edit the 'seg_default_code.py' to set the segments content.\n"
print "3- Run 'python main.py -d' to enter the debugging execution mode."
print "Type 'python main.py --help' to get the full list of options.\n"
print "4- Run 'pipeweb track %s %s/.sqlstatus' \n to add the pipe to the web interface.\n"%(pipename, prefix)
print "5- Set acl with 'pipeutils -a username -l 2 %s/.sqlstatus'\n"%prefix
print "6- Launch the web server with 'pipeweb start'"
print "You should be able to browse the result on the web page http://localhost:8080\n"
print "Retype the current command line to get those instructions back."
def hash_file(codefile):
"""Use cheksum algorithm to return a unique key for file.
......@@ -376,15 +425,7 @@ def rebuild_db_from_disk(pipedir, sqlfile=None):
for t in lst_task:
## read output from output file
fn = glob.glob(path.join(t, "seg_*.output"))
if fn:
with closing (file(fn[0],'r')) as f:
output = pickle.load(f)
else:
print "error no output file found"
## read task properties from meta file
fn = glob.glob(path.join(t, "seg_*.meta"))
if fn:
with closing (file(fn[0],'r')) as f:
......@@ -393,20 +434,9 @@ def rebuild_db_from_disk(pipedir, sqlfile=None):
print "error no meta file found"
task_depend_cache[t] = meta['parents']