Pipelet is a free framework for the creation, execution and browsing of scientific data processing pipelines. It provides:
+ easy chaining of interdependent elementary tasks,
+ web access to data products,
+ branch handling,
+ automated distribution of computational tasks.
Both the engine and the web interface are written in Python.

* Tutorial
** Introduction
*** Why use pipelines

The pipeline mechanism allows you to apply a sequence of processing steps to your data, in such a way that the input of each process is the output of the previous one. Making these different processing steps visible, in the right order, is essential in data analysis to keep track of what you did and to make sure that the whole processing remains consistent.

*** How it works

Pipelet is based on the possibility to save on disk every intermediate input or output of your pipeline, which is usually not a strong constraint but offers a lot of benefits. It means that you can stop the processing whenever you want and start it again without recomputing the whole thing: you just take the last products you have on disk and continue the processing where it stopped. This logic is interesting when the computation cost is higher than the cost of the disk space required by intermediate products.

*** The Pipelet functionalities

Pipelet is a free framework which helps you:
+ to write and manipulate pipelines with any dependency scheme,
+ to dispatch the computational tasks on parallel architectures,
+ to keep track of what processing has been applied to your data and to perform comparisons.

** Getting started
*** Pipelet installation
**** Dependencies

+ Running the Pipelet engine requires Python >= 2.6.
+ The web interface of Pipelet requires the cherrypy3 Python module (on Debian: aptitude install python-cherrypy3).

You may also find it useful to install some generic scientific tools that nicely interact with Pipelet:
+ numpy
+ matplotlib
+ latex

**** Getting Pipelet

There is no published stable release of Pipelet at the moment.

git clone git://gitorious.org/pipelet/pipelet.git

**** Installing Pipelet

sudo python setup.py install

*** Running a simple test pipeline

1. Run the test pipeline

   cd test
   python main.py

2. Add this pipeline to the web interface

   pipeweb track test ./.sqlstatus

3. Set the access control and launch the web server

   pipeutils -a username -l 2 .sqlstatus
   pipeweb start

4. You should now be able to browse the results at http://localhost:8080

*** Getting a new pipe framework

To get a new pipeline framework, with example main and segment scripts:

pipeutils -c pipename

This command creates a directory named pipename which contains:
+ a main script (main.py) providing the functionality to execute your pipeline in various modes (debug, parallel, batch mode, ...),
+ an example segment script (seg_default_code.py) which illustrates the pipelet utilities with comments.

The next section describes those two files in more detail.

** Writing Pipes
*** Pipeline architecture

The definition of a data processing pipeline consists of:
+ a succession of python scripts, called segments, each coding one step of the actual processing,
+ a main script that defines the dependency scheme between segments and launches the processing.

The dependencies between segments must form a directed acyclic graph. This graph is described by a character string using a subset of the graphviz dot language (http://www.graphviz.org).
For example the string:

"""
a -> b -> d;
c -> d;
c -> e;
"""

defines a pipeline with 5 segments {"a", "b", "c", "d", "e"}. The relation "a->b" ensures that the processing of segment "a" will be done before the processing of its child segment "b". The output of "a" will also be fed as input to "b". In this example, the node "d" has two parents, "b" and "c". Both will be executed before "d". As there is no relation between "b" and "c", which of the two will be executed first is not defined.

When executing the segment "seg", the engine looks for a python script named seg.py. If it is not found, it looks iteratively for script files named "se.py" and "s.py". This way, different segments of the pipeline can share the same code, provided they are given names with a common root (this mechanism is useful to write generic segments and is complemented by the hooking system described in the advanced usage section). The code is then executed in a specific namespace (see "The segment environment" below).

*** The Pipeline object

In practice, the creation of a Pipeline object takes 3 arguments:

P = Pipeline(pipedot, codedir=..., prefix=...)

- pipedot is the string description of the pipeline
- codedir is the path where the segment scripts can be found
- prefix is the path to the data repository (see "Hierarchical data storage" below)

It is possible to output the graphviz representation of the pipeline. First, save the graph string into a .dot file with the pipelet function:

P.to_dot('pipeline.dot')

Then convert it to an image file with the dot command:

dot -Tpng -o pipeline.png pipeline.dot

*** Dependencies between segments

The modification of the code of one segment will trigger its recalculation and the recalculation of all the segments which depend on it.

The output of a segment is a list of python objects. If a segment has no particular output, this list can be empty and does not need to be specified. Elements of the list are allowed to be any kind of picklable python objects. However, a good practice is to fill the list with the minimal set of characteristics needed to describe the output of the segment, and to defer the storage of the actual data to appropriate structures and file formats. For example, a segment which performs computations on large images could in principle pass the resulting images to the following segment through the output list. It is better practice to store each resulting image in a dedicated file and to pass in the list only the information allowing an unambiguous identification of this file (such as its name or part of it) to the following segments.

The input of a child segment is taken from a set built from the output lists of its parents. The content of the input set can be tuned using the multiplex directive (see below). However, the simplest and default behaviour of the engine is to form the Cartesian product of the output lists of its parents.

To illustrate this behaviour, let us consider the following pipeline, built from three segments:

knights -> melt;
quality -> melt;

and assume that the respective output lists of segments knights and quality are:

["Lancelot", "Galahad"]

and:

['the Brave', 'the Pure']

The Cartesian product of the two lists is:

[('Lancelot', 'the Brave'), ('Lancelot', 'the Pure'), ('Galahad', 'the Brave'), ('Galahad', 'the Pure')]

Four instances of segment "melt" will thus be run, each one receiving as input one of the four 2-tuples. At the end of the execution of all the instances of a segment, their output lists are concatenated.
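As a sketch, the melt segment could be written as follows. The seg_input and seg_output variables are described in "The segment environment" section below; the assumption made here, based on the convol example later in this tutorial, is that the input dictionary is keyed by the parent segment names.

# melt.py -- illustrative sketch, not part of the distribution
knight = seg_input['knights']    # e.g. 'Lancelot'
title = seg_input['quality']     # e.g. 'the Brave'
seg_output = [knight + ' ' + title]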
Since the action of segment "melt" is to concatenate the two strings it receives, separated by a space, the final output list of segment "melt" will be:

['Lancelot the Brave', 'Lancelot the Pure', 'Galahad the Brave', 'Galahad the Pure']

TODO: describe the input data type (dictionary, ...)?

*** Multiplex directive

This default behaviour can be altered by specifying a #multiplex directive in a comment of the segment code. If several multiplex directives are present in the segment code, the last one is retained.

- #multiplex : activates the default behaviour (Cartesian product).
- #multiplex cross_prod group by 0 : the input set contains a single tuple gathering all the outputs.
- #multiplex cross_prod group by ... : computes the cross product and groups the tasks that are identical.

To make use of group, the elements of the output set have to be hashable.

*** Orphan segments

By default, orphan segments (segments without parents) have no input argument (an empty list) and are therefore executed once. It is nevertheless possible to push an input list to an orphan segment. If P is an instance of the Pipeline object:

P.push(segname=seg_input)

*** Depend directive

As explained in the introduction section, Pipelet offers the possibility to save CPU time by storing intermediate products on disk. We call intermediate products the input/output data files of the different segments.

Each segment repository is identified by a unique key which depends on:
- the segment processing code and parameters (segment and hook scripts),
- the input data (identified from the keys of the parent segments).

Every change made to a segment (new parameter or new parent) will thus give a different key and tell the pipelet engine to compute a new segment instance.

It is possible to add external dependencies to the key computation using the depend directive:

#depend file1 file2

At the very beginning of the pipeline execution, all dependencies are stored, to prevent any change (code editing) between the key computation and the actual processing. Note that this mechanism works only for segment and hook scripts. External dependencies are also read at the beginning of the pipeline execution, but are only used for the key computation.

*** Hierarchical data storage

This system provides versioning of your data and easy access through the web interface. It is also used to keep track of the code, of the execution logs, and of various meta data of the processing. Of course, you can still bypass the hierarchical storage and store your actual data elsewhere, but you will lose the benefit of the automated versioning, which proves quite convenient.

The storage is organized as follows:

- All pipeline instances are stored below a root which corresponds to the prefix parameter of the Pipeline object:
  /prefix/
- All segment meta data are stored below a root whose name contains a unique key derived from the segment code:
  /prefix/segname_YFLJ65/
- A segment's meta data are:
  - a copy of the segment python script,
  - a copy of all segment hook scripts,
  - a parameter file (.args) which contains the segment parameter values,
  - a meta data file (.meta) which contains some extra meta data.
- All segment instance data and meta data are stored in a specific subdirectory whose name corresponds to a string representation of its input:
  /prefix/segname_YFLJ65/data/1/
- If there is a single segment instance, the data are stored directly in:
  /prefix/segname_YFLJ65/data/
- If a segment has at least one parent, its root will be located below that of one of its parents:
  /prefix/segname_YFLJ65/segname2_PLMBH9/
- etc.
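As an illustration, the knights/quality/melt pipeline used earlier in this tutorial could produce a tree with the following shape (the 6-character keys and the instance directory names are made up for the example; the real ones are generated by the engine):

prefix/
  knights_A1B2C3/
    data/
  quality_D4E5F6/
    data/
    melt_G7H8I9/
      data/
        Lancelot_the_Brave/
        Lancelot_the_Pure/
        Galahad_the_Brave/
        Galahad_the_Pure/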
*** The segment environment

The segment code is executed in a specific environment that provides:

1. access to the segment input and output:
   - seg_input: this variable is a dictionary containing the input of the segment,
   - seg_output: this variable has to be a list.

2. functionalities to use the automated hierarchical data storage system:
   - get_data_fn(basename): complete the filename with the path to the working directory,
   - glob_seg(seg, regexp): return the list of filenames matching regexp from segment seg,
   - get_tmp_fn(): return a temporary filename.

3. functionalities to use the automated parameter handling:
   - lst_par: list of parameter names of the segment,
   - lst_tag: list of parameter names which will be made visible from the web interface,
   - load_param(seg, globals(), lst_par).

4. various convenient functionalities:
   - save_products(filename, lst_par='*'): use pickle to save a part of a given namespace,
   - load_products(filename, lst_par): update the namespace by unpickling the requested objects from the file,
   - logged_subprocess(lst_args): execute a subprocess and log its output in processname.log and processname.err,
   - logger: a standard logging.Logger object that can be used to log the processing.

5. hooking support. Pipelet enables you to write reusable generic segments by providing a hooking system via the hook function:
   - hook(hookname, globals()): execute the python script 'segname_hookname.py' and update the namespace.

*** The example pipelines
**** fft
***** Highlights

This example illustrates a very simple image processing use case: one wants to apply a Gaussian filter in the Fourier domain to several 2D images.

The pipe scheme is:

pipedot = """
mkgauss->convol;
fftimg->convol;
"""

where segment 'mkgauss' computes the Gaussian filter, 'fftimg' computes the Fourier transforms of the input images, and 'convol' performs the filtering in the Fourier domain and the inverse transform of the filtered images.

P = pipeline.Pipeline(pipedot, code_dir=op.abspath('./'), prefix=op.abspath('./'))
P.to_dot('pipeline.dot')

The pipe scheme is output as a .dot file, which can be converted to an image file with the command line:

dot -Tpng -o pipeline.png pipeline.dot

To apply this filter to several images (in our case 4 input images), the pipe data parallelism is used. From the main script, a 4-element list is pushed to the 'fftimg' segment:

P.push(fftimg=[1,2,3,4])

At execution, 4 instances of the 'fftimg' segment will be created, and each of them outputs one element of this list:

img = seg_input.values()[0]   # (fftimg.py - line 16)
seg_output = [img]            # (fftimg.py - line 41)

On the other hand, a single instance of the 'mkgauss' segment will be executed, as there is only one filter to apply.

The last segment, 'convol', which depends on the two others, will be executed with a number of instances given by the Cartesian product of its inputs (4 x 1, i.e. 4 instances). The instance identifier, which is set by the 'fftimg' output, can be retrieved with the following instruction:

img = seg_input['fftimg']   # (convol.py - line 12)

***** Running the pipe

Follow the same procedure as for the first example pipeline to run this pipe and browse the results.

**** cmb
***** Highlights
***** Running the pipe

** Running Pipes
*** The sample main file

A sample main file is made available when creating a new pipelet framework. It is copied from the reference file:

pipelet/pipelet/static/main.py

This script illustrates various ways of running pipes.
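Putting the pieces of this tutorial together, a minimal main script for the fft example might look like the sketch below. It is illustrative only: the import path of the pipeline module is an assumption, and the reference main.py is more complete, adding option parsing and the other launch modes.

import os.path as op
from pipelet import pipeline                      # assumed import path of the Pipeline class
from pipelet.launchers import launch_interactive

# dependency scheme (see "Pipeline architecture")
pipedot = """
mkgauss->convol;
fftimg->convol;
"""

# segment scripts and data products live in the current directory
P = pipeline.Pipeline(pipedot, code_dir=op.abspath('./'), prefix=op.abspath('./'))

# push the list of input images to the orphan segment 'fftimg'
P.push(fftimg=[1, 2, 3, 4])

# run every task sequentially in the interactive (debugging) mode
w, t = launch_interactive(P)
w.run()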
The reference script also describes the different parameters and shows how to write a main python script that can be used like any binary from the command line (including option parsing).

*** Common options

Some options are common to all running modes.

**** log level

The logging system is handled by the python logging facility module. This module defines the following log levels:
+ DEBUG
+ INFO
+ WARNING
+ ERROR
+ CRITICAL

All logging messages are saved in the different pipelet log files, available from the web interface (rotating file logging). It is also possible to print those messages on the standard output (stream logging), by setting the desired log level in the launcher options. For example:

import logging
launch_process(P, N, log_level=logging.DEBUG)

If set to 0, stream logging will be disabled.

**** matplotlib

The matplotlib documentation says:

"Many users report initial problems trying to use matplotlib in web application servers, because by default matplotlib ships configured to work with a graphical user interface which may require an X11 connection. Since many barebones application servers do not have X11 enabled, you may get errors if you don't configure matplotlib for use in these environments. Most importantly, you need to decide what kinds of images you want to generate (PNG, PDF, SVG) and configure the appropriate default backend. For 99% of users, this will be the Agg backend, which uses the C++ antigrain rendering engine to make nice PNGs. The Agg backend is also configured to recognize requests to generate other output formats (PDF, PS, EPS, SVG). The easiest way to configure matplotlib to use Agg is to call: matplotlib.use('Agg')"

The matplotlib and matplotlib_interactive options switch the matplotlib backend to Agg in order to allow execution in non-interactive environments. Those two options are set to True by default in the sample main script. TODO: explain why.

*** The interactive mode

This mode has been designed to ease debugging. If P is an instance of the Pipeline object, the syntax reads:

from pipelet.launchers import launch_interactive
w, t = launch_interactive(P)
w.run()

In this mode, each task will be computed sequentially. Do not hesitate to invoke the Python debugger from IPython: %pdb

*** The process mode

In this mode, one can run simultaneous tasks (if the pipe scheme allows it). The number of subprocesses is set by the N parameter:

from pipelet.launchers import launch_process
launch_process(P, N)

*** The batch mode

In this mode, one can submit batch jobs to execute the tasks. The number of jobs is set by the N parameter:

from pipelet.launchers import launch_pbs
launch_pbs(P, N, address=(os.environ['HOST'], 50000))

It is possible to specify some job submission options such as:
+ job name,
+ job header: this string is prepended to the PBS job scripts. You may want to add some environment specific paths there. Log and error files are automatically handled by the pipelet engine and made available from the web interface,
+ cpu time: the syntax is "hh:mm:ss".

The 'server' option can be disabled to add some workers to an existing scheduler (see the sketch below).
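The following is a sketch only: the keyword names (job_name, job_header, cpu_time, server) are assumptions derived from the option list above, so check the actual signature of launch_pbs in pipelet.launchers before relying on them.

import os
import logging
from pipelet.launchers import launch_pbs

launch_pbs(P, 4,
           address=(os.environ['HOST'], 50000),
           job_name='fft_filtering',        # assumed keyword: name given to the PBS jobs
           job_header='#PBS -l nodes=1\n'   # assumed keyword: prepended to the job scripts
                      'source $HOME/.profile\n',
           cpu_time='02:00:00',             # assumed keyword: "hh:mm:ss" cpu time limit
           server=True,                     # assumed keyword: False attaches workers to an
                                            #   already running scheduler
           log_level=logging.INFO)          # stream log level, as in the other launchers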
** Browsing Pipes
*** The pipelet webserver and ACL

The pipelet webserver allows the browsing of multiple pipelines. Each pipeline has to be registered using:

pipeweb track pipename sqlfile

To remove a pipeline from the tracked list, use:

pipeweb untrack pipename

As pipeline browsing implies disk parsing, some basic access control has to be set up as well. All users have to be registered with a specific access level (1 for read-only access, 2 for write access):

pipeutils -a username -l 2 sqlfile

To remove a user from the user list:

pipeutils -d username sqlfile

Start the web server using:

pipeweb start

The web application will then be available at http://localhost:8080. To stop the web server:

pipeweb stop

*** The web application

In order to ease the comparison of different processings, the web interface displays various views of the pipeline data:

**** The index page

The index page displays a tree view of all pipeline instances. Each segment may be expanded or collapsed via the +/- buttons. The parameters used in each segment are summarized and displayed together with the date of execution and the number of related tasks ordered by status.

A checkbox allows operations to be performed on multiple segments:
- deletion: to clean unwanted data,
- tag: to tag remarkable data.

The filter panel allows the segment instances to be filtered with respect to two criteria:
- tag,
- date of execution.

**** The code page

Each segment name is a link to its code page. From this page the user can view the code of all python scripts which have been applied to the data. The tree view is reduced to the current segment and its parents. The root path corresponding to the data storage is also displayed.

**** The product page

The number of related tasks, ordered by status, is a link to the product pages, where the data can be directly displayed (for images or text files) or downloaded. From this page it is also possible to delete a specific product and its dependencies.

**** The log page

The log page can be accessed via the log button of the filter panel. Logs are ordered by date.

* Advanced usage
** Database reconstruction

In case of an unfortunate loss of the pipeline SQL database, it is possible to reconstruct it from the disk:

import pipelet
pipelet.utils.rebuild_db_from_disk(prefix, sqlfile)

All information will be retrieved, but with new identifiers.

** The hooking system

As described in the 'segment environment' section, pipelet supports a hooking system which allows the use of generic processing code and code sectioning.

Consider a set of instructions that has to be systematically applied at the end of a segment (post processing): one can put those instructions in a separate script file, named for example 'segname_postproc.py', and call the hook function:

hook('postproc', globals())

A specific dictionary can be passed to the hook script instead of globals() to avoid confusion. The hook scripts are included in the hash key computation (see the depend directive section).

** Writing custom environments

The pipelet software provides a set of default utilities available from the segment environment. It is possible to extend this default environment or even to write a completely new one.

*** Extending the default environment

The different environment utilities are actually methods of the class Environment. It is possible to add new functionalities by using the python inheritance mechanism:

File: myenvironment.py

from pipelet.environment import *

class MyEnvironment(Environment):
    def my_function(self):
        """ My function does nothing """
        return

The pipelet engine objects (segments, tasks, pipeline) are available through the worker attribute self._worker. See the section "The pipelet actors" for more details about the pipelet machinery.
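Assuming that new public methods are exposed to segment scripts in the same way as the default utilities (an assumption, not something this document guarantees), a segment run with MyEnvironment could then call the new utility directly:

# inside a segment script executed with MyEnvironment (illustrative)
my_function()                     # hypothetical utility added by MyEnvironment
fn = get_data_fn('result.txt')    # the default utilities remain available
seg_output = []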
*** Writing a new environment

In order to start with a completely new environment, extend the base environment:

File: myenvironment.py

from pipelet.environment import *

class MyEnvironment(EnvironmentBase):
    def my_get_data_fn(self, x):
        """ New name for get_data_fn """
        return self._get_data_fn(x)

    def _close(self, glo):
        """ Post processing code """
        return glo['seg_output']

From the base environment, the basic functionalities for getting file names and executing hook scripts are still available through:
- self._get_data_fn
- self._hook

The segment input argument is stored in self._seg_input. The segment output argument has to be returned by the _close(self, glo) method.

The pipelet engine objects (segments, tasks, pipeline) are available through the worker attribute self._worker. See the section "The pipelet actors" for more details about the pipelet machinery.

*** Loading another environment

To load another environment, set the pipeline environment attribute accordingly:

Pipeline(pipedot, codedir=..., prefix=..., env=MyEnvironment)

** Writing custom main files
** Launching pipeweb behind apache

Pipeweb uses the cherrypy web framework server and can be run behind an apache webserver, which brings essentially two advantages:
- https support,
- faster serving of static files.

* The pipelet actors

This section documents the code for developers.

** The Repository object
** The Pipeline object
** The Task object
** The Scheduler object
** The Tracker object
** The Worker object
** The Environment object