Commit 81b5af8e authored by Maude Le Jeune's avatar Maude Le Jeune
Browse files

Merge branch 'master' of git@gitorious.org:pipelet/pipelet

parents 39262af4 85c42ff1
......@@ -5,12 +5,34 @@
import cherrypy
import sqlite3
def read_access(func):
""" Decorator to set read only access to a web page (cherrypy web function)
Parameters
----------
func: web function
"""
return cherrypy.tools.digest_auth(realm="pipeweb",users=lambda: get_credentials(1))(func)
def write_access(func):
""" Decorator to set read only accessto a web page (cherrypy web function)
Parameters
----------
func: web function
"""
return cherrypy.tools.digest_auth(realm="pipeweb",users=lambda: get_credentials(2))(func)
def get_credentials(access_level=1):
""" Get users/passwd from pipe data base.
Parameters
----------
access_level: int, 1 for read-only or 2 for read-write access
Returns
-------
dictionnary of user and passwd
"""
db_file = cherrypy.request.app.root.which_sqlfile(cherrypy.request.path_info)
try:
conn = sqlite3.connect(db_file,check_same_thread=True)
......@@ -28,6 +50,12 @@ def get_credentials(access_level=1):
return dic
def acl_setup(db_file):
""" Set user/passwd table in pipe database.
Parameters
----------
db_file: string, pipe data base file
"""
conn = sqlite3.connect(db_file,check_same_thread=True)
with conn:
if not conn.execute('select tbl_name from sqlite_master where tbl_name == "users"').fetchone():
......@@ -35,6 +63,15 @@ def acl_setup(db_file):
conn.close()
def add_user(db_file, user, passwd, access_level=1):
""" Add user entry to data base.
Parameters
----------
db_file: string, pipe data base file
user: string, user name
passwd: string, user password
access_level: int, 1 for read-only or 2 for read-write access
"""
conn = sqlite3.connect(db_file,check_same_thread=True)
acl_setup(db_file)
with conn:
......@@ -42,6 +79,13 @@ def add_user(db_file, user, passwd, access_level=1):
conn.close()
def del_user(db_file, user):
""" Delete user entry from data base.
Parameters
----------
db_file: string, pipe data base file
user: string, user name
"""
conn = sqlite3.connect(db_file,check_same_thread=True)
with conn:
conn.execute('delete from users where user = ?' % user)
......
......@@ -118,7 +118,6 @@ class Pipeline:
self._curr_dirs = {}
## dict, hash key corresponding to each segment
self._hashes = {}
self.compute_hash()
## string, sql data base
......@@ -345,14 +344,16 @@ class Pipeline:
"""
hash = crc32()
currdir = self._prefix
self.repository.save_all_string(seg)
for k in self._parents[seg]:
h, currdir = self._compute_hash(k)
hash.update(h)
files = self.repository.get_all_files(seg)
h = get_hashkey(files, hash).base32_digest()
s = self.repository.get_all_string(seg)
h = get_hashkey(s, hash).base32_digest()
self._hashes[seg] = h
currdir = path.join(currdir, 'seg_%s_%s'%(seg,h))
self._curr_dirs[seg] = currdir
return h, currdir
def compute_hash(self):
......@@ -362,6 +363,7 @@ class Pipeline:
if not v:
self._compute_hash(k)
def get_full_seg_name(self, seg):
""" Return segment full name (segment name + hashkey).
......
......@@ -31,6 +31,15 @@ class Repository:
_ext_re = re.compile('.py$|.m$')
def __init__(self):
""" Initialize a repository.
"""
## dict, code string corresponding to each segment
self._code = {}
self._hook = {}
def _ext_filter(self, f):
""" Check if the extension file is ok.
......@@ -54,8 +63,8 @@ class Repository:
"""
return bool(self._ext_re.match(path.splitext(f)[1]))
def get_all_files(self, seg):
""" Return the list of files that should enter in the
def get_all_string(self, seg):
""" Return the code string that should enter in the
computation of the hash key.
Parameters
......@@ -64,7 +73,7 @@ class Repository:
Returns
-------
string list, name of files.
string
Exemples
--------
......@@ -72,28 +81,92 @@ class Repository:
>>> print(c.get_all_files('first'))
['../test/seg_first_code.py']
"""
if self.get_args_file(seg):
return [self.get_code_file(seg), self.get_args_file(seg)] + self.get_dependencies(seg)
s = self._code[seg]
for k,v in self._hook[seg].items():
s = s+v
return s
def save_all_string (self, seg):
""" Save code file and hooks into dictionnaries
This is done to prevent file changes after hashkey computation.
Parameters
----------
seg: string, segment name
"""
file = self.get_code_file(seg)
if file:
fid = open(file, "r")
code = fid.read()
fid.close()
self._code[seg] = code
else:
return [self.get_code_file(seg)] + self.get_dependencies(seg)
self._code[seg] = ""
lst_hook = self.get_hook_list(seg)
self._hook[seg] = {}
for h in lst_hook:
file = self.get_hook_file(seg, h)
if file:
fid = open(file, "r")
code = fid.read()
fid.close()
self._hook[seg][h] = code
else:
self._hook[seg][h] = ""
def get_code_string(self, seg):
""" Return segment code as a string.
This is done to prevent file changes after hashkey computation.
def get_dependencies(self, seg):
""" Return the segment dependencies.
Parameters
----------
seg: string, segment name
Right now return an empty list. In the near future, it could
be used to search the segment code for some kind of depend
clause.
Returns
-------
string
"""
return self._code[seg]
def get_hook_list(self, seg):
""" Return the list of hooks
Parameters
----------
seg : string, name of the segment.
seg: string, segment name
Returns
-------
string list, name of source files.
string list
"""
return []
if not self._hook_list:
self._hook_list = []
c = self._code[seg]
for l in c.split("hook(")[1:]:
self._hook_list.append(l.split(",")[0][1:-1])
return self._hook_list
def get_hook_string(self, seg, hook):
""" Return hook code as a string.
This is done to prevent file changes after hashkey computation.
Parameters
----------
seg: string, segment name
hook: string, hook name
Returns
-------
string
"""
return self._hook[seg][hook]
class LocalRepository(Repository):
""" A local repository.
......@@ -110,7 +183,12 @@ class LocalRepository(Repository):
src_path : where to find segment's source code files
lib_path : where to find segment's library
"""
## dict, code string corresponding to each segment
self._code = {}
self._hook = {}
self._hook_list = []
## string, where to find segment's source code file
self.src_path = src_path
## string, where to find segment's library
......@@ -157,60 +235,33 @@ class LocalRepository(Repository):
raise Exception('No source file corresponding to segment')
return f[0]
def get_args_file(self, seg):
""" Check the existence of a local parameter file for the
def get_hook_file(self, seg, hook):
""" Check the existence of a local hook file for the
segment and return its full name.
Parameters
----------
seg : string, name of the segment.
hook : string, name of the hook.
Returns
-------
string, name of the argument file.
string, name of the hook file.
Exemples
--------
>>> c = LocalRepository('../test')
>>> print(c.get_args_file('dbm'))
['../test/seg_dbm_args.py']
>>> print(c.get_hook_file('dbm', 'preproc'))
['../test/seg_dbm_preproc.py']
"""
try:
f = filter(self._ext_filter,
glob(path.join(self.src_path, 'seg_%s_args.*'%seg)))[0]
glob(path.join(self.src_path, 'seg_%s_%s.*'%(seg,hook))))[0]
except:
f = None
return f
def get_visu_file(self, seg):
""" Check the existence of a visualization file for the
segment and return its full name.
Parameters
----------
seg : string, name of the segment.
Returns
-------
string, name of the argument file.
Exemples
--------
>>> c = LocalRepository('../test')
>>> print(path.samefile(c.get_visu_file('dbm'),'../test/seg_dbm_visu.py'))
True
"""
try:
f = filter(self._ext_filter,
glob(path.join(self.src_path, 'seg_%s_visu.*'%seg)))[0]
except:
#f = filter(self._ext_filter,
# glob(path.join(self.lib_path, 'seg_%s_visu.*'%seg)))
#if len(f) > 0:
# f = f[0]
f = None
return f
class GitRepository(Repository):
......@@ -223,9 +274,7 @@ class GitRepository(Repository):
pass
def get_code_file(self, seg):
pass
def get_args_file(self, seg):
pass
def get_visu_file(self, seg):
def get_hook_file(self, seg):
pass
class CVSRepository(Repository):
......@@ -238,9 +287,7 @@ class CVSRepository(Repository):
pass
def get_code_file(self, seg):
pass
def get_args_file(self, seg):
pass
def get_visu_file(self, seg):
def get_hook_file(self, seg):
pass
......
......@@ -273,9 +273,21 @@ class Scheduler():
if not path.exists(d):
os.mkdir(d)
r = self.pipe.repository
for f in (r.get_code_file(seg),r.get_args_file(seg),r.get_visu_file(seg)):
if f and path.exists(f):
shutil.copy(f,d)
f = r.get_code_file(seg)
if f:
dest = d+'/'+os.path.basename(f)
fid = open(dest, "w")
fid.write(r.get_code_string(seg))
fid.close()
lst_hook = r.get_hook_list(seg)
for h in lst_hook:
f = r.get_hook_file(seg, h)
if f:
dest = d+'/'+os.path.basename(f)
fid = open(dest, "w")
fid.write(r.get_hook_string(seg, h))
fid.close()
parents = self.pipe.get_parents(seg) ## parents segments
d = self.tracker.get_done(seg) ## done tasks
......
......@@ -227,15 +227,15 @@ def hash_file(codefile):
return key
def get_hashkey(files, starthash):
"""Return a unique key which identify code files and dependencies
def get_hashkey(s, starthash):
"""Return a unique key which identify code string
using cheksum algorithm.
Comment lines and blanks are not taken into account in code files.
Comment lines and blanks are not taken into account in code string.
Parameters
----------
files: string or string list
s: string
starthash: an Hash instance
Returns
......@@ -244,22 +244,14 @@ def get_hashkey(files, starthash):
Examples
--------
>>> print(get_hashkey(['../test/codesample.py']))
>>> print(get_hashkey("a = 5"))
B6EB81FE
"""
if isinstance(files,str):
files = [files]
if not starthash:
starthash = crc32()
for f in files:
of = file(f)
if is_code_file(f):
starthash.update(reduced_code_formatting(of.read()))
else:
starthash.update(of.read())
of.close()
starthash.update(reduced_code_formatting(s))
return starthash
......
......@@ -277,6 +277,11 @@ def start(config, config_file):
"""Start the web server in daemon mode.
Store the PID in the file specified in config.
Parameters
----------
config: dict where internal config is stored
config_file: string, user config file
"""
from cherrypy.process.plugins import Daemonizer, PIDFile
cherrypy.config.update(config)
......@@ -292,9 +297,14 @@ def stop(config, config_file):
"""Stop the web server by sending the TERM signal to the daemon process.
Look for the PID in the file specified in config.
Parameters
----------
config: dict, where internal config is stored
config_file: string, user config file
"""
import signal
cherrypy.config.update(config_file)
cherrypy.config.update(config)
cherrypy.config.update(config_file)
pidf = open(config['global']['server.pidfile'])
pid = int(pidf.read())
......@@ -305,6 +315,12 @@ def update_config(pipename, pipepath, config_file):
""" Add a new Pipeline entry to the config file.
If the config file does not exist, create a new one.
Parameters
----------
pipename: string, pipe short name
pipepath: string, pipe instance root path
config_file: string, user config file
"""
import ConfigParser
c = ConfigParser.ConfigParser()
......
......@@ -279,22 +279,24 @@ class Worker(object):
return glob(path.join(self.pipe.get_data_dir(segx),y))\
+glob(path.join(self.pipe.get_data_dir(segx),path.join('*/',y)))
def overload_param(self, glo):
""" Update parameter values.
def hook(self, hook_name, glo):
""" Execute hook code.
Search for an extra segment code file, and update dictionnary
with the result of its execution.
Parameters
----------
hook_name: string, hook name
glo: dict, global dictionnary to update.
"""
code_file = self.pipe.repository.get_args_file(self.task.seg)
if code_file:
execfile(code_file, glo)
code = self.pipe.repository.get_hook_string(self.task.seg, hook_name)
if code:
exec(code, glo)
else:
print "No arg file for seg %s"%self.task.seg
print "No hook file named %s for seg %s"%(self.task.seg,hook_name)
def prepare_env(self, task):
""" Build the segment global execution environment for a given task.
......@@ -320,7 +322,8 @@ class Worker(object):
'load_products':self.load_products,
'save_products':self.save_products,
'logged_subprocess': self.logged_subprocess,
'overload_param': self.overload_param}
#'overload_param': self.overload_param}
'hook': self.hook}
return glo
def clean_tmp(self):
......@@ -455,8 +458,8 @@ class InteractiveWorker(Worker):
self.make_dir(task)
glo = self.prepare_env(task)
self.task = task
code_file = self.pipe.repository.get_code_file(seg)
execfile(code_file, glo) # Execute the segment
code = self.pipe.repository.get_code_string(seg)
exec (code, glo)
try: # set product
res = glo['res']
except:
......@@ -510,9 +513,9 @@ class ThreadWorker(Worker, threading.Thread):
prod = task.prod
self.make_dir(task)
glo = self.prepare_env(task)
code_file = self.pipe.repository.get_code_file(seg)
code = self.pipe.repository.get_code_string(seg)
try: # Execute the segment
execfile(code_file, glo)
exec (code, glo)
except Exception:
etype, value, tb = traceback.sys.exc_info()
f = file(glo['get_data_fn']("segment_error.log"),"w")
......@@ -587,9 +590,9 @@ class ProcessWorker(Worker, Process):
prod = task.prod
self.make_dir(task)
glo = self.prepare_env(task)
code_file = self.pipe.repository.get_code_file(seg)
code = self.pipe.repository.get_code_string(seg)
try: # Execute the segment
execfile(code_file, glo)
exec (code, glo)
except AbortError, e:
raise e
except Exception:
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment