## Copyright (C) 2008, 2009, 2010 APC LPNHE CNRS Universite Paris Diderot ## ## This program is free software; you can redistribute it and/or modify ## it under the terms of the GNU General Public License as published by ## the Free Software Foundation; either version 3 of the License, or ## (at your option) any later version. ## ## This program is distributed in the hope that it will be useful, ## but WITHOUT ANY WARRANTY; without even the implied warranty of ## MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the ## GNU General Public License for more details. ## ## You should have received a copy of the GNU General Public License ## along with this program; if not, see http://www.gnu.org/licenses/gpl.html import sqlite3 import cherrypy from contextlib import closing import os.path import os current_dir = os.path.dirname(os.path.abspath(__file__)) from glob import glob import shutil from cherrypy.lib.static import serve_file from auth import read_access, write_access import re import pylab html_tmp = """ """ class Web: """ A pipeline Web interface. A pipeline Web interface allows to browse all instances of a given pipeline. The informations about the pipeline are retrieve from its data base file. Pipelines are printed through a tree view with links to all segments and products. """ exposed = True def __init__(self, db_file, name): """ Initialize a web instance. Parameters ---------- db_file: string, file name of the pipe data base. """ self.name = name ## Pipeline data base file. self.db_file = db_file def get_lst_tag (self): """ Return the list of existing tags Tags are ; separated in the db. Returns ------- list of string """ conn = sqlite3.connect(self.db_file,check_same_thread=True) conn.text_factory=str lst = [] with conn: l = conn.execute('select tag from segments').fetchall() for s in l: if s[0] is not None: str_tag = s[0].split(";") for e in str_tag: if e: lst.append(e) return lst def get_lst_date (self): """ Return the list of existing dates Date strings are picked from queued_on field corresponding to the first task of each segment. Returns ------- list of string """ conn = sqlite3.connect(self.db_file,check_same_thread=True) conn.text_factory=str lst = [] with conn: l = conn.execute('select seg_id from segments').fetchall() for s in l: e = conn.execute('select queued_on from tasks where seg_id=?',(s[0],)).fetchone() if e is not None and e: lst.append(e[0]) return lst @cherrypy.expose @write_access def addtag (self, segid, tag): """ Add new tag to the database Check that the tag do not exist yet for this id Parameters ---------- segid: string. List of seg_id ; separated. (; at the end of string also) tag: string. Tags are ; separated in the db. """ seglst = segid.split(';') conn = sqlite3.connect(self.db_file,check_same_thread=True) conn.text_factory=str for segid in seglst: if segid: with conn: l = conn.execute('select tag from segments where seg_id=?',(segid,)).fetchone() lst_tag = [] if l[0] is not None and l: lst_tag = l[0].split(";") lst_tag.append(tag) lst_tag = pylab.unique(lst_tag) str_tag = ";".join(lst_tag) conn.execute('update segments set tag=? where seg_id=?',(str_tag,segid)) raise cherrypy.HTTPRedirect('/'+self.name+'/',303) @cherrypy.expose @write_access def deltag (self, tag): """ Delete tag from the database Parameters ---------- tag: string. """ conn = sqlite3.connect(self.db_file,check_same_thread=True) conn.text_factory=str with conn: l = conn.execute('select seg_id, tag from segments where tag like ?',("%"+tag+"%",)).fetchall() for s in l: lst_tag = s[1].split(";") lst_tag.remove(tag) str_tag = ";".join(lst_tag) conn.execute('update segments set tag=? where seg_id=?',(str_tag,s[0])) raise cherrypy.HTTPRedirect('/'+self.name+'/',303) @cherrypy.expose @read_access def filter (self, tag=None, date=None): """ Print the pipeline instances matching tag and date. If both parameters are set, the selection corresponds to the union. Tag are set for leafs only. Upstream parents are automatically added to the list. Parameters ---------- tag: string date: string """ lstseg = [] conn = sqlite3.connect(self.db_file,check_same_thread=True) conn.text_factory=str with conn: if tag is not None: l = conn.execute('select seg_id, tag from segments where tag like ?',("%"+tag+"%",)).fetchall() for s in l: lst_tag = s[1].split(";") if tag in lst_tag: lstseg+=self._get_highlight(s[0]) if date is not None: l = conn.execute('select seg_id from tasks where queued_on = ?',(date,)).fetchall() for s in l: lstseg+=self._get_highlight(s[0]) html = self.index(highlight=lstseg) return html @cherrypy.expose @read_access def index(self, highlight=None): """ Pipeline instances tree view Print the pipeline instances trough a tree view. Parameters ---------- highlight: list of segid (optional), filter the printed seg by segid. """ conn = sqlite3.connect(self.db_file,check_same_thread=True) conn.text_factory=str # get all instances with conn: l = conn.execute('select seg, curr_dir, seg_id, param from segments order by curr_dir').fetchall() html = html_tmp html += '

Pipelines in %s

'%self.name ## Filter fieldset html += '
Filters' html += '
' ## Tag checkbox html += '' ## Tag select lst_tag = self.get_lst_tag() html += '' ## Tag button html += '' ## Date checkbox html += '' ## Date select lst_date = self.get_lst_date() html += '' str_lst_tag = "\\n" for t in lst_tag: str_lst_tag += "- "+t+"\\n" ## Buttons html += '' html += '
Delete
' html += '
' html += '

' html += 'Apply' html += 'Clear' html += 'Tag'%str_lst_tag html += 'Delete' html += 'Browse log'%(l[0][1].split("seg")[0]+"log") html +='

' html += '
    ' indent = -1 # select a subset if highlight is not None: newl = [] for s in l: if s[2] in highlight: newl.append(s) else: cherrypy.log.error_log.warning(str(s[2])+str(highlight)) l = newl for s in l: with conn: e = conn.execute('select status, count(status) from tasks where seg_id=? group by status',(s[2],)).fetchall() ss = s[3] #try: # with closing(file(os.path.join(s[1],"stdout"))) as f: # ss = f.read() #except IOError: # ss = "" if ss is None: ss = "" print s for stat in e: ss = '%d, '%(s[2],stat[0], stat[0], stat[1]) + ss ss += ' 1: html += '
  • %s : %s\n'%(s[2],s[2],s[0],ss) else: html += '
\n'*abs(diff) + '
  • %s : %s'%(s[2],s[2],s[0],ss) indent += diff conn.close() html += '
  • '*(indent-l[0][1].count('/')+1) if not highlight: html += '
    ' return html @cherrypy.expose @read_access def product(self, segid=None, status=None): """ Products index. Print the content of the product directory. Parameters ---------- currdir: string, pipeline instance path. status: string, product status ('done', 'queued', 'running', 'failed') """ conn = sqlite3.connect(self.db_file,check_same_thread=True) with conn: seg, currdir = conn.execute( 'select seg, curr_dir from segments where seg_id = ?' ,(segid,)).fetchone() l = conn.execute('select str_input from tasks where seg_id=? and status=?',(segid, status)).fetchall() conn.close() html = html_tmp + '

    Data products for %s tasks in segment %s

    ' % (status, seg) html += '

    Directory : %s

    %d %s tasks
      '%( currdir, len(l), status, status) for e in l: html += '
    • %s
    • '%(segid,os.path.relpath(e[0],start=currdir),e[0]) html += '
    ' return html def _get_highlight(self, segid): """ Append upstream pipeline instance paths to the current path. Return a list which contains all upstream segment instances id for a given segment instance. This is used to print a pipeline tree view with all dependencies. Parameters ---------- segid: id of the leaf segment. Returns ------- list of segid, for the upstream segment instances. """ lstid = [segid] conn = sqlite3.connect(self.db_file,check_same_thread=True) with conn: fids = conn.execute( 'select father_id from segment_relations where child_id = ?' ,(segid,)).fetchall() conn.close() if fids: for l in fids: lstid += self._get_highlight(l[0]) return lstid @cherrypy.expose @read_access def code(self, segid=None): """ Segment's files index. Print the content of a segment directory (code files, parameters file, ...) Parameters ---------- currdir: string, pipeline instance path. """ highlight = self._get_highlight(segid) conn = sqlite3.connect(self.db_file,check_same_thread=True) with conn: seg, currdir = conn.execute( 'select seg, curr_dir from segments where seg_id = ?' ,(segid,)).fetchone() conn.close() html = self.index(highlight=highlight) + '

    %s

    ' return html def del_from_checkbox (): """ """ @cherrypy.expose @write_access def delseg(self, segid=None): """ Delete a pipeline instance. Delete all segments and products directories of a given pipeline instance. Parameters ---------- segid: string, list of pipe id ';' separated """ cherrypy.log.error_log.warning('called ONCE') if segid is not None: seglist = segid.split(";") for segid in seglist: if segid: conn = sqlite3.connect(self.db_file,check_same_thread=True) with conn: currdir = conn.execute('select curr_dir from segments where seg_id = ?',(int(segid),)).fetchone()[0] l = conn.execute('delete from tasks where seg_id in (select seg_id from segments where segments.curr_dir like ?)',(currdir+'%',)) conn.close() try: shutil.rmtree(currdir) except: pass raise cherrypy.HTTPRedirect('/'+self.name+'/',303) @cherrypy.expose @read_access def pipedir(self, segid=None, directory=None): """ Print the content of a directory. Parameters ---------- directory: string pipeline directory path. """ directory = self.check_path (segid, directory) html = html_tmp + '

    Content of %s

    """ return html forbidden_path = re.compile('.*\.\..*') @cherrypy.expose @read_access def log(self, logdir): """ Print the content of the log directory. """ directory = logdir html = html_tmp + '

    Content of %s

    Back
    Delete logs
      '%(directory,self.name,logdir) for filename in sorted(glob(os.path.join(directory,'*')), reverse=True): absPath = os.path.abspath(filename) html += '
    • ' + os.path.basename(filename) + "
    • " html += """
    """ return html @cherrypy.expose @read_access def serve_log(self, filename): """ Print the content of the log file. """ return serve_file(filename, content_type='text/plain', disposition="inline") @cherrypy.expose @write_access def delete_log(self, logdir): """ Delete the content of the log directory. """ for filename in glob(os.path.join(logdir,'*')): absPath = os.path.abspath(filename) os.remove(absPath) raise cherrypy.HTTPRedirect("log?logdir=%s"%logdir,303) def check_path(self, segid, path): """Chroot the path to the segid currdir. """ conn = sqlite3.connect(self.db_file,check_same_thread=True) with conn: seg, currdir = conn.execute( 'select seg, curr_dir from segments where seg_id = ?' ,(segid,)).fetchone() conn.close() filepath = os.path.realpath(os.path.join(currdir, path)) if self.forbidden_path.match(os.path.relpath(filepath, start=currdir)) is not None: raise cherrypy.HTTPError(403) else: return filepath @cherrypy.expose @read_access def download(self, segid=None, filepath=None): """ Download a file. Text files are printed directly. Parameters ---------- filepath: string, file name. """ filepath = self.check_path(segid, filepath) if os.path.splitext(filepath)[1] in ['.log','.txt','.list','.py']: return serve_file(filepath, content_type='text/plain', disposition="inline") else: return serve_file(filepath, disposition="inline") class PipeIndex(): """ A pipeline index Web interface. A pipeline index Web interface prints the lists of available pipelines found in a given path. """ def __init__(self): self.pipelist = [] for pipe, dbfile in cherrypy.config['Pipelines'].iteritems(): self.pipelist.append(pipe) setattr(self, pipe, Web(os.path.expanduser(dbfile),pipe)) def which_sqlfile(self, path_info): try: pipe = path_info.split('/')[1] if pipe in self.pipelist: return getattr(self, pipe).db_file except KeyError: return None return None @cherrypy.expose def index(self): """ Pipeline index. """ html = html_tmp + '

    Pipeline Index

    Select a pipeline to browse

      ' for k in self.pipelist: html += '
    • %s
    • '%(k,k) html += '
    ' return html class MyApp(cherrypy.Application): def __init__(self, domain, domaindir, script_name="", config=None): self.domain = domain self.domaindir = domaindir root = PipeIndex() for pipe, dbfile in cherrypy.config['Pipelines'].iteritems(): setattr(root, pipe, Web(os.path.expanduser(dbfile))) cherrypy.Application.__init__(self, root, script_name, config={'/':{}}) def start(config, config_file): """Start the web server in daemon mode. Store the PID in the file specified in config. Parameters ---------- config: dict where internal config is stored config_file: string, user config file """ from cherrypy.process.plugins import Daemonizer, PIDFile cherrypy.config.update(config) cherrypy.config.update(config_file) cherrypy.tree.mount(PipeIndex(),"",config) d = Daemonizer(cherrypy.engine) d.subscribe() p = PIDFile(cherrypy.engine, config['global']['server.pidfile']) p.subscribe() cherrypy.engine.start() def stop(config, config_file): """Stop the web server by sending the TERM signal to the daemon process. Look for the PID in the file specified in config. Parameters ---------- config: dict, where internal config is stored config_file: string, user config file """ import signal cherrypy.config.update(config) cherrypy.config.update(config_file) pidf = open(config['global']['server.pidfile']) pid = int(pidf.read()) pidf.close() os.kill(pid, signal.SIGTERM) def update_config(pipename, pipepath, config_file): """ Add a new Pipeline entry to the config file. If the config file does not exist, create a new one. Parameters ---------- pipename: string, pipe short name pipepath: string, pipe instance root path config_file: string, user config file """ import ConfigParser c = ConfigParser.ConfigParser() c.read(config_file) if not c.has_section('Pipelines'): c.add_section('Pipelines') if os.path.isfile(pipepath): pipepath = os.path.expanduser(pipepath) pipepath = os.path.abspath(pipepath) c.set('Pipelines', pipename, '"'+pipepath+'"') f=file(config_file,'w') c.write(f) f.close() else: print pipepath+ " not found" def main(): import optparse parser = optparse.OptionParser(usage="\nTo start/stop the web server:\n %prog start | stop\nTo track a new pipeline under the name with the web interface:\n %prog track ") parser.add_option('-H', '--host', help='serve this address',default='0.0.0.0') parser.add_option('-c', '--config-file', help='Read pipelet configuration from this file',default=os.path.expanduser('~/.pipelet')) parser.add_option('-p', '--port', help='port the server is listenning to', default=8080, type='int') parser.add_option('-n', '--no-daemon', help='Run the web server interactively instead of starting a daemon, log errors and access to the screen. Stop with C-c.', default=False, action='store_true') parser.add_option('-k', '--krenew', help='In deamon mode ask and renew afs token.', default=False, action='store_true') parser.add_option('-l', '--error-file', help='Logging file for errors when run in daemon mode', default=os.path.expanduser('~/pipelet.errors')) parser.add_option('-a', '--access-file', help='Logging file for access when run in daemon mode', default=os.path.expanduser('~/pipelet.access')) parser.add_option('-i', '--pid-file', help='Store the pid in daemon mode', default=os.path.expanduser('~/pipelet.pid')) (options, args) = parser.parse_args() config = {'/static': {'tools.staticdir.on': True, 'tools.staticdir.dir': os.path.join(current_dir,'static'), }, 'global':{'server.socket_port':options.port, 'server.socket_host':options.host, 'server.thread_pool':10, 'log.screen':False, 'log.error_file': options.error_file, 'log.access_file':options.access_file, 'server.pidfile':options.pid_file} } if options.no_daemon: config['global']['log.screen'] = True cherrypy.config.update(config) cherrypy.config.update(options.config_file) cherrypy.tree.mount(PipeIndex(),"",config) cherrypy.engine.start() cherrypy.engine.block() exit(0) if options.krenew: cherrypy.config.update(config) cherrypy.config.update(options.config_file) cherrypy.tree.mount(PipeIndex(),"",config) cherrypy.engine.start() cherrypy.engine.block() exit(0) if len(args) < 1: parser.print_usage() exit(-1) if args[0] == 'start': start(config, options.config_file) elif args[0] == 'stop': stop(config, options.config_file) elif args[0] == 'restart': restart() elif args[0] == 'track': if len(args) != 3: parser.print_usage() exit(-1) update_config(args[1], args[2], options.config_file) else: parser.print_usage() exit(-1) if __name__ == "__main__": main()