""" Example main illustrating various ways of launching jobs. """ from pipelet.pipeline import Pipeline from pipelet.launchers import launch_interactive, launch_process, launch_pbs import os import sys import logging import os.path as path ### Pipeline properties ### pipe dot scheme ### The dependencies between segments must form a directed acyclic ### graph. This graph is described by a char string using a subset of the ### graphviz dot language (http://www.graphviz.org). For exemple the string: ### """ ### a -> b -> d; ### c -> d; ### c -> e; ### """ ### defines a pipeline with 5 segments {"a", "b", "c", "d", "e"}. The ### relation "a->b" ensures that the processing of the segment "a" will be ### done before the processing of its child segment "b". Also the output ### of "a" will be feeded as input for "b". In the given example, the node ### "d" has two parents "b" and "c". Both will be executed before "d". As ### their is no relation between "b" and "c" which of the two will be ### executed first is not defined. ### See doc for more details. ### nside = 512 sim_ids = [2,4,5] pipe_dot = """ noise->clcmb->clplot; cmb->clcmb; """ ### code_dir ### This path corresponds to the directory where the segment python scripts are found. ### One may choose a location which is regularly backed up code_dir = './' ### prefix ### This path corresponds to the directory where the pipeline products will be stored. ### One may choose a location with large disk space and no backup. prefix = './cmb' ### log_level ### Set the log level which will be displayed on stdout. ### Logs are also written in pipelet log files with level set to DEBUG. ### Log files are available from the web interface. log_level = logging.DEBUG ### job_name ### This string will be used in the PBS mode to tag all pipeline jobs. job_name = 'pipelet' ### job_header ### This string will be put at the beginning of each PBS jobs. 
### Error and log files will be available from the interface with
### the other log files.
# NOTE(review): the original header read "#/bin/bash" (missing "!"), which is
# not a valid shebang; corrected here so the job script runs under bash.
job_header = """
#!/bin/bash
echo $PYTHONPATH
"""

### cpu_time
### Maximum cpu time used in PBS mode.
cpu_time = "00:30:00"

### print_info
### Print additional information after running to access the web interface.
print_info = True


def main(print_info=print_info):
    """ Run the pipeline.

    Command-line options select the execution mode:
      -d / --debug        : interactive mode (single in-process worker);
      -p / --process N    : N local parallel worker processes;
      -a / --add-workers N: submit N extra PBS jobs to an already-running server;
      (no option)         : start a PBS server plus one worker job.

    When *print_info* is true, instructions for browsing the results through
    the pipelet web interface are printed after launch.
    """
    import optparse
    parser = optparse.OptionParser()
    parser.add_option('-a', '--add-workers', metavar='N',
                      help='Submit N supplementary jobs without launching a new server.',
                      type='int')
    parser.add_option('-d', '--debug',
                      help='Start jobs in interactive mode',
                      action="store_true", default=False)
    parser.add_option('-p', '--process', metavar='N',
                      help='Launch jobs as local parallel processes',
                      type='int')
    (options, args) = parser.parse_args()

    ## Build pipeline instance from the module-level configuration.
    P = Pipeline(pipe_dot, code_dir=code_dir, prefix=prefix,
                 matplotlib=True, matplotlib_interactive=True)

    ## Feed the two root segments: one (nside, sim_id) task per simulation.
    cmbin = []
    noisein = []
    for sim_id in sim_ids:
        cmbin.append((nside, sim_id))
        noisein.append((nside, sim_id))
    P.push(cmb=cmbin)
    P.push(noise=noisein)

    ## Interactive mode
    if options.debug:
        w, t = launch_interactive(P, log_level=log_level)
        w.run()
    ## Process mode
    elif options.process:
        launch_process(P, options.process, log_level=log_level)
    ## PBS mode: add workers to an existing server.
    elif options.add_workers:
        launch_pbs(P, options.add_workers,
                   address=(os.environ['HOST'], 50000),
                   job_name=job_name, cpu_time=cpu_time,
                   job_header=job_header)
    ## PBS mode: start a server plus one worker job.
    else:
        launch_pbs(P, 1, address=(os.environ['HOST'], 50000),
                   job_name=job_name, cpu_time=cpu_time, server=True,
                   job_header=job_header, log_level=log_level)

    if print_info:
        # Single-argument print(...) calls behave identically under
        # Python 2 and Python 3 (unlike the original print statements).
        print("1- Run 'pipeweb track %s' \n to add the pipe to the web interface.\n" % P.sqlfile)
        print("2- Set acl with 'pipeutils -a -l 2 %s'\n" % P.sqlfile)
        print("3- Launch the web server with 'pipeweb start'")
        print("You should be able to browse the result on the web page http://localhost:8080\n")


if __name__ == "__main__":
    main()