Commit c35555f6 authored by Betoule Marc's avatar Betoule Marc
Browse files

Add the fundamental command line tool "lspipe"

parent 2294ec2a
import cPickle as pickle
import os.path
from glob import glob
import sqlite3
import sys
import subprocess
import datetime
import optparse
def recurs_path(p):
n = os.path.abspath(p)
yield n
while n!='/':
n = os.path.dirname(n)
yield n
class Cache():
def __init__(self):
pipeletdir = os.path.expanduser('~/.pipelet')
self.dbs = {}
self.last = ""
if not os.path.exists(pipeletdir):
self.dbcachefile = os.path.join(pipeletdir,'dbcache.pkl')
if os.path.exists(self.dbcachefile):
self.dbs = pickle.load(_f)
self.last= pickle.load(_f)
def getdb(self, pipepath, dbfile):
if dbfile:
if dbfile.endswith('.sql'):
_dbfile = dbfile
import ConfigParser
conf = ConfigParser.ConfigParser()'~/.pipelet/pipelet.conf'))
_dbfile = conf.get('Pipelines',dbfile).translate(None,'''"'"''')
# elif pipepath:
# try:
# _dbfile=self.dbs[pipepath][0]
# except KeyError:
# for p in recurs_path(pipepath):
# try:
# _dbfile = glob(os.path.join(p,'.sqlstatus*'))[0]
# break
# except IndexError:
# pass
#self.dbs[pipepath]=[dbfile, datetime.datetime(1, 1, 1, 0, 0)]
#if - self.dbs[pipepath][1] > datetime.timedelta(5,0,0):
# subprocess.Popen(['rsync', dbfile, '.pipecache/'+str(hash(dbfile))]).communicate()
# self.dbs[pipepath][1] =
c = sqlite3.connect(_dbfile,check_same_thread=True)
except sqlite3.OperationalError, e:
print 'Unable to connect to %s'%_dbfile
raise e
return c
def __del__(self):
pickle.dump(self.dbs, _f)
pickle.dump(self.last, _f)
class Pipe():
def __init__(self, pipepath="", tag="", dbfile=""):
self.path = pipepath
self.tag = tag
self.cache = Cache()
self.conn = self.cache.getdb(pipepath, dbfile)
if tag:
s = self.conn.execute('select seg_id from segments where tag like ?',("%"+tag+"%",)).fetchone()
s = self.conn.execute('select seg_id from segments where curr_dir like ?',("%"+self.path+"%",)).fetchone()
lstseg = [s[0]]
self.lstseg = lstseg
del self.cache
def _get_fathers(self, segid):
""" Append upstream pipeline instance paths to the current path.
Return a list which contains all upstream segment instances
id for a given segment instance. This is used to print a pipeline
tree view with all dependencies.
segid: id of the leaf segment.
list of segid, for the upstream segment instances.
lstid = [int(segid)]
fids = self.conn.execute(
'select father_id from segment_relations where child_id = ?'
if fids:
for l in fids:
lstid += self._get_fathers(l[0])
return lstid
def get_seg_par(self, seg):
l = self.conn.execute('select curr_dir from segments where (seg glob ?) and seg_id IN ('+','.join('?'*len(self.lstseg))+')',(seg,)+tuple(self.lstseg)).fetchall()
return l
class PermissiveOptionParser(optparse.OptionParser):
def _process_args(self, largs, rargs, values):
"""_process_args(largs : [string],
rargs : [string],
values : Values)
Process command-line arguments and populate 'values', consuming
options and arguments from 'rargs'. If 'allow_interspersed_args' is
false, stop at the first non-option argument. If true, accumulate any
interspersed non-option arguments in 'largs'.
self.unknow_options = []
while rargs:
arg = rargs[0]
# We handle bare "--" explicitly, and bare "-" is handled by the
# standard arg handler since the short arg case ensures that the
# len of the opt string is greater than 1.
if arg == "--":
del rargs[0]
elif arg[0:2] == "--":
# process a single long option (possibly with value(s))
self._process_long_opt(rargs, values)
except (optparse.BadOptionError, optparse.OptionValueError), err:
del rargs[0]
elif arg[:1] == "-" and len(arg) > 1:
# process a cluster of short options (possibly with
# value(s) for the last one only)
self._process_short_opts(rargs, values)
except (optparse.BadOptionError, optparse.OptionValueError), err:
return # stop now, leave this arg in rargs
if __name__=="__main__":
parser = PermissiveOptionParser(usage="""
%prog [-d database.sql] -s seg [-s seg ...] -t tag [pattern]
%prog [-d database.sql] -s seg [-s seg ...] -p path [pattern]
%prog tag[@database]:seg/pattern
%prog #pathfragment[@database]:seg/pattern
Return the files matching pattern in the segment directory seg of
the pipeline identified by tag or path fragment. If the database is not
specified the last accessed database is used (if there is one).
Elements of the pattern syntax:
database: Can be either a sqlite3 file or a name. In the last case
the name is search for in the .pipelet/pipelet.conf file.
tag: one or more existing pipeline tags in the database. To specify
several tags use comma to separate tags as in tag1,tag2. Note that
tag names containing '@', '/', ',', '#' or ':' cannot be handled
correctly at this stage so consider such a naming as a bad practice.
#pathfragment: a fragment of the leaf segment path preceeded by #.
seg: a segment name. Unix style globbing is accepted to specify
segment names.
pattern: Bash-style file pattern. Everything behaves as if the
sequence tag[@database]:seg/ had replaced by the matching segments
root directory.
Do not forget to enclose the url with "" to prevent the shell to
perform the expansion.
lspipe "snls3r,snlssdss@jla1:cosmo/data/*/*.dat"
parser.add_option('-U', '--unsorted', action='store_true', default=False,
help='Use echo instead of ls to prevent sorting',)
(options, args) = parser.parse_args()
ldir = []
for arg in args:
pipeid, fileid = arg.split(':')
if '@' in pipeid:
pipeid, db=pipeid.split('@')
db = ""
seg = fileid.split('/')[0]
pattern = "/".join(fileid.split('/')[1:])
for tag in pipeid.split(','):
if tag.startswith('#'):
pipe = Pipe(tag[1:], "", db)
pipe = Pipe('', tag, db)
ldir.extend([os.path.join(p[0], pattern) for p in pipe.get_seg_par(seg)])
if options.unsorted:
stdout, stderr = subprocess.Popen("echo "+" ".join(ldir)+" "+" ".join(parser.unknow_options),
#stdout=subprocess.PIPE, stderr=subprocess.PIPE,
shell=True, executable="/bin/bash").communicate()
stdout, stderr = subprocess.Popen("ls "+" ".join(ldir)+" "+" ".join(parser.unknow_options),
#stdout=subprocess.PIPE, stderr=subprocess.PIPE,
shell=True, executable="/bin/bash").communicate()
# if stderr:
# print >> sys.stderr, stderr
# res += stdout.split()
# else:
# res += [p[0]]
# print " ".join(res)
...@@ -30,5 +30,5 @@ setup(name='pipelet', ...@@ -30,5 +30,5 @@ setup(name='pipelet',
'pipelet.task', 'pipelet.utils', 'pipelet.web'], 'pipelet.task', 'pipelet.utils', 'pipelet.web'],
ext_modules=[Extension('pipelet.ctools', ['pipelet/ctools.c'])], ext_modules=[Extension('pipelet.ctools', ['pipelet/ctools.c'])],
package_data={'pipelet': ['static/*']}, package_data={'pipelet': ['static/*']},
scripts=['scripts/pipeweb', 'scripts/pipeutils', 'scripts/pipeletd', 'scripts/pipeweb_autostart.cgi', 'scripts/pipeweb_autostart.fcgi' ] scripts=['scripts/pipeweb', 'scripts/pipeutils', 'scripts/lspipe', 'scripts/pipeletd', 'scripts/pipeweb_autostart.cgi', 'scripts/pipeweb_autostart.fcgi' ]
) )
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment