Commit 0602dae0 authored by Betoule Marc's avatar Betoule Marc
Browse files

Merge branch 'master' into newbase

Conflicts:
	pipelet/repository.py
	pipelet/web.py
parents d0957430 a415da39
......@@ -161,36 +161,42 @@ def launch_ssh(pipe, host_list, address=None, authkey='secret'):
for w in processlist:
w.wait()
def launch_pbs(pipe, host_list, address=None, authkey='secret'):
def launch_pbs(pipe, n, address=None, authkey='secret', job_dir="/wrk/lejeune/pipelet/scripts", job_name="job_", log_dir="/wrk/lejeune/pipelet/logs", cpu_time="2:00:00", server=False, job_header="""
#/bin/bash
echo $PYTHONPATH
""" ):
""" Launch a bunch of distant workers through a PBS batch system.
"""
pass
# s = scheduler.Scheduler(pipe)
# SchedulerManager.register('get_scheduler', callable=lambda:s)
"""
s = scheduler.Scheduler(pipe)
# mgr = SchedulerManager(address=address, authkey=authkey)
# mgr.start()
# processlist = []
# for i, h in enumerate(host_list):
# jfilename="job_%d.py"%i
# with closing(file(jfilename,'w')) as f:
# f.write(_job_file%(str(address),str(authkey)))
# _scp(jfilename, h+':')
# w = subprocess.Popen(['ssh', h, "python %s"%jfilename])
# processlist.append(w)
SchedulerManager.register('get_scheduler', callable=lambda:s)
if server:
mgr = SchedulerManager(address=address, authkey=authkey)
mgr.start()
processlist = []
for i in range(n):
jobfile = path.join(job_dir,"%s%d.py"%(job_name,i))
errfile = path.join(log_dir,"e_%s%d"%(job_name,i))
logfile = path.join(log_dir,"o_%s%d"%(job_name,i))
f = file(jobfile, 'w')
f.write (job_header+"\n")
f.write ("#PBS -o %s\n"%logfile)
f.write ("#PBS -e %s\n"%errfile)
f.write ("#PBS -N %s%d\n"%(job_name,i))
f.write ("#PBS -l select=1:ncpus=1,walltime=%s\n"%cpu_time)
f.write ("python -m pipelet.launchers -H %s -p %s -s %s"%(address[0],address[1],authkey))
f.close()
subprocess.Popen(['qsub',jobfile]).communicate()[0]
if server:
print 'launching the scheduler'
sched_proxy = mgr.get_scheduler()
sched_proxy.run()
# sched_proxy = mgr.get_scheduler()
# sched_proxy.run()
# # joining Zombie process
# for w in processlist:
# w.wait()
# for i, h in enumerate(host_list):
# subprocess.Popen(['ssh', h, "rm -f %s"%jfilename])
_job_file = """
#/bin/zsh
......
......@@ -118,7 +118,6 @@ class Pipeline:
self._curr_dirs = {}
## dict, hash key corresponding to each segment
self._hashes = {}
self.compute_hash()
## string, sql data base
......@@ -345,14 +344,16 @@ class Pipeline:
"""
hash = crc32()
currdir = self._prefix
self.repository.save_all_string(seg)
for k in self._parents[seg]:
h, currdir = self._compute_hash(k)
hash.update(h)
files = self.repository.get_all_files(seg)
h = get_hashkey(files, hash).base32_digest()
s = self.repository.get_all_string(seg)
h = get_hashkey(s, hash).base32_digest()
self._hashes[seg] = h
currdir = path.join(currdir, 'seg_%s_%s'%(seg,h))
self._curr_dirs[seg] = currdir
return h, currdir
def compute_hash(self):
......@@ -362,6 +363,7 @@ class Pipeline:
if not v:
self._compute_hash(k)
def get_full_seg_name(self, seg):
""" Return segment full name (segment name + hashkey).
......@@ -432,6 +434,19 @@ class Pipeline:
"""
return self._curr_dirs[seg]
def get_output_file(self, seg):
""" Return the segment output file
Parameters
----------
seg : string, segment name.
Returns
-------
string, segment directory.
"""
return path.join(self.get_curr_dir(seg),'seg_%s.output'%seg)
def get_param_file(self, seg):
""" Return the segment directory.
......
......@@ -31,6 +31,15 @@ class Repository:
_ext_re = re.compile('.py$|.m$')
def __init__(self):
""" Initialize a repository.
"""
## dict, code string corresponding to each segment
self._code = {}
self._hook = {}
def _ext_filter(self, f):
""" Check if the extension file is ok.
......@@ -54,8 +63,8 @@ class Repository:
"""
return bool(self._ext_re.match(path.splitext(f)[1]))
def get_all_files(self, seg):
""" Return the list of files that should enter in the
def get_all_string(self, seg):
""" Return the code string that should enter in the
computation of the hash key.
Parameters
......@@ -64,7 +73,7 @@ class Repository:
Returns
-------
string list, name of files.
string
Exemples
--------
......@@ -72,28 +81,74 @@ class Repository:
>>> print(c.get_all_files('first'))
['../test/seg_first_code.py']
"""
if self.get_args_file(seg):
return [self.get_code_file(seg), self.get_args_file(seg)] + self.get_dependencies(seg)
else:
return [self.get_code_file(seg)] + self.get_dependencies(seg)
s = self._code[seg]
for k,v in self._hook[seg].items():
s = s+v
return s
def get_dependencies(self, seg):
""" Return the segment dependencies.
def save_all_string (self, seg):
""" Save code file and hooks into dictionnaries
Right now return an empty list. In the near future, it could
be used to search the segment code for some kind of depend
clause.
This is done to prevent file changes after hashkey computation.
Parameters
----------
seg: string, segment name
"""
file = self.get_code_file(seg)
if file:
fid = open(file, "r")
code = fid.read()
fid.close()
self._code[seg] = code
else:
self._code[seg] = ""
lst_hook = self.get_hook_list(seg)
self._hook[seg] = {}
for h in lst_hook:
file = self.get_hook_file(seg, h)
if file:
fid = open(file, "r")
code = fid.read()
fid.close()
self._hook[seg][h] = code
else:
self._hook[seg][h] = ""
def get_code_string(self, seg):
""" Return segment code as a string.
This is done to prevent file changes after hashkey computation.
Parameters
----------
seg : string, name of the segment.
seg: string, segment name
Returns
-------
string list, name of source files.
string
"""
return []
return self._code[seg]
def get_hook_list(self, seg):
""" Return the list of hooks
Parameters
----------
seg: string, segment name
Returns
-------
string list
"""
if not self._hook_list:
self._hook_list = []
c = self._code[seg]
for l in c.split("hook(")[1:]:
self._hook_list.append(l.split(",")[0][1:-1])
return self._hook_list
def get_docline(self, seg):
""" Return the segment synopsis doc line.
......@@ -108,6 +163,22 @@ class Repository:
import pydoc
return pydoc.source_synopsis(self.get_code_file(seg))
def get_hook_string(self, seg, hook):
""" Return hook code as a string.
This is done to prevent file changes after hashkey computation.
Parameters
----------
seg: string, segment name
hook: string, hook name
Returns
-------
string
"""
return self._hook[seg][hook]
class LocalRepository(Repository):
""" A local repository.
......@@ -123,20 +194,16 @@ class LocalRepository(Repository):
src_path : where to find segment's source code files
lib_path : where to find segment's library
"""
## dict, code string corresponding to each segment
self._code = {}
self._hook = {}
self._hook_list = []
## string, where to find segment's source code file
self.src_path = src_path
## string, where to find segment's library
self.lib_path = None
if lib_path is None:
try:
self.lib_path = path.realpath(path.join(pipelet.inst.__path__[0],'../lib'))
except:
self.lib_path = None
#TODO log
pass
else:
self.lib_path = lib_path
self.lib_path = lib_path
def get_code_file(self, seg):
""" Return the filename of the segment code.
......@@ -159,9 +226,13 @@ class LocalRepository(Repository):
f = [filter(self._ext_filter,
glob(path.join(self.src_path, 'seg_%s_code.*'%seg)))[0]]
except:
#f = filter(self._ext_filter,
# glob(path.join(self.lib_path, 'seg_%s_code.*'%seg)))
f = []
if len(f) == 0:
try:
f = [filter(self._ext_filter,
glob(path.join(self.lib_path, 'seg_%s_code.*'%seg)))[0]]
except:
f = []
if len(f) == 0:
try:
s = re.findall('^(.+?)\d+$',seg)[0]
......@@ -170,60 +241,45 @@ class LocalRepository(Repository):
raise Exception('No source file corresponding to segment')
return f[0]
def get_args_file(self, seg):
""" Check the existence of a local parameter file for the
def get_hook_file(self, seg, hook):
""" Check the existence of a local hook file for the
segment and return its full name.
Parameters
----------
seg : string, name of the segment.
hook : string, name of the hook.
Returns
-------
string, name of the argument file.
string, name of the hook file.
Exemples
--------
>>> c = LocalRepository('../test')
>>> print(c.get_args_file('dbm'))
['../test/seg_dbm_args.py']
>>> print(c.get_hook_file('dbm', 'preproc'))
['../test/seg_dbm_preproc.py']
"""
try:
f = filter(self._ext_filter,
glob(path.join(self.src_path, 'seg_%s_args.*'%seg)))[0]
glob(path.join(self.src_path, 'seg_%s_%s.*'%(seg,hook))))[0]
except:
f = None
return f
def get_visu_file(self, seg):
""" Check the existence of a visualization file for the
segment and return its full name.
Parameters
----------
seg : string, name of the segment.
Returns
-------
string, name of the argument file.
Exemples
--------
>>> c = LocalRepository('../test')
>>> print(path.samefile(c.get_visu_file('dbm'),'../test/seg_dbm_visu.py'))
True
"""
f = []
if len(f) == 0:
try:
f = [filter(self._ext_filter,
glob(path.join(self.lib_path, 'seg_%s_%s.*'%(seg,hook))))[0]]
except:
f = []
if len(f) == 0:
try:
s = re.findall('^(.+?)\d+$',seg)[0]
f = [self.get_hook_file(s, hook)]
except:
raise Exception('No source file corresponding to hook')
return f[0]
try:
f = filter(self._ext_filter,
glob(path.join(self.src_path, 'seg_%s_visu.*'%seg)))[0]
except:
#f = filter(self._ext_filter,
# glob(path.join(self.lib_path, 'seg_%s_visu.*'%seg)))
#if len(f) > 0:
# f = f[0]
f = None
return f
class GitRepository(Repository):
......@@ -236,9 +292,7 @@ class GitRepository(Repository):
pass
def get_code_file(self, seg):
pass
def get_args_file(self, seg):
pass
def get_visu_file(self, seg):
def get_hook_file(self, seg):
pass
class CVSRepository(Repository):
......@@ -251,9 +305,7 @@ class CVSRepository(Repository):
pass
def get_code_file(self, seg):
pass
def get_args_file(self, seg):
pass
def get_visu_file(self, seg):
def get_hook_file(self, seg):
pass
......
......@@ -273,9 +273,21 @@ class Scheduler():
if not path.exists(d):
os.mkdir(d)
r = self.pipe.repository
for f in (r.get_code_file(seg),r.get_args_file(seg),r.get_visu_file(seg)):
if f and path.exists(f):
shutil.copy(f,d)
f = r.get_code_file(seg)
if f:
dest = d+'/'+os.path.basename(f)
fid = open(dest, "w")
fid.write(r.get_code_string(seg))
fid.close()
lst_hook = r.get_hook_list(seg)
for h in lst_hook:
f = r.get_hook_file(seg, h)
if f:
dest = d+'/'+os.path.basename(f)
fid = open(dest, "w")
fid.write(r.get_hook_string(seg, h))
fid.close()
parents = self.pipe.get_parents(seg) ## parents segments
d = self.tracker.get_done(seg) ## done tasks
......@@ -411,8 +423,11 @@ class Scheduler():
self.stopping = True
for i in range(self.nb_worker):
self.task_queue.put(None)
# wait for workers to complete
logger.info("Waiting for workers to complete...")
if self.nb_worker == 0:
self.stop_event.set()
self.stop_event.wait()
logger.info("No more workers. Stopping the sqltracker ...")
self.tracker.stop()
......
......@@ -227,15 +227,15 @@ def hash_file(codefile):
return key
def get_hashkey(files, starthash):
"""Return a unique key which identify code files and dependencies
def get_hashkey(s, starthash):
"""Return a unique key which identify code string
using cheksum algorithm.
Comment lines and blanks are not taken into account in code files.
Comment lines and blanks are not taken into account in code string.
Parameters
----------
files: string or string list
s: string
starthash: an Hash instance
Returns
......@@ -244,22 +244,14 @@ def get_hashkey(files, starthash):
Examples
--------
>>> print(get_hashkey(['../test/codesample.py']))
>>> print(get_hashkey("a = 5"))
B6EB81FE
"""
if isinstance(files,str):
files = [files]
if not starthash:
starthash = crc32()
for f in files:
of = file(f)
if is_code_file(f):
starthash.update(reduced_code_formatting(of.read()))
else:
starthash.update(of.read())
of.close()
starthash.update(reduced_code_formatting(s))
return starthash
......
......@@ -222,7 +222,9 @@ class Web:
html = html_tmp + '<h1> Content of %s </h1> <div class="list"><ul>'%directory
for filename in glob(os.path.join(directory,'*')):
absPath = os.path.abspath(filename)
if os.path.isdir(absPath):
if os.path.islink(absPath):
html += '<li><a href="pipedir?segid=%d&directory='%int(segid) + absPath + '">' + os.path.basename(filename)+"("+os.path.realpath(filename)+")" + "</a></li>"
elif os.path.isdir(absPath):
html += '<li><a href="pipedir?segid=%d&directory='%int(segid) + absPath + '">' + os.path.basename(filename) + "</a></li>"
else:
html += '<li><a href="download?segid=%d&filepath='%int(segid) + absPath + '">' + os.path.basename(filename) + "</a> </li>"
......
......@@ -150,7 +150,7 @@ class Worker(object):
----------
task_output: task result.
"""
fn = self.get_data_fn('.pipe_res') # TODO move this to pipe
fn = self.pipe.get_output_fn()
with closing (file(fn,'w')) as f:
r = pickle.dump(task_output,f)
......@@ -279,22 +279,24 @@ class Worker(object):
return glob(path.join(self.pipe.get_data_dir(segx),y))\
+glob(path.join(self.pipe.get_data_dir(segx),path.join('*/',y)))
def overload_param(self, glo):
""" Update parameter values.
def hook(self, hook_name, glo):
""" Execute hook code.
Search for an extra segment code file, and update dictionnary
with the result of its execution.
Parameters
----------
hook_name: string, hook name
glo: dict, global dictionnary to update.
"""
code_file = self.pipe.repository.get_args_file(self.task.seg)
if code_file:
execfile(code_file, glo)
code = self.pipe.repository.get_hook_string(self.task.seg, hook_name)
if code:
exec(code, glo)
else:
print "No arg file for seg %s"%self.task.seg
print "No hook file named %s for seg %s"%(self.task.seg,hook_name)
def prepare_env(self, task):
""" Build the segment global execution environment for a given task.
......@@ -320,7 +322,8 @@ class Worker(object):
'load_products':self.load_products,
'save_products':self.save_products,
'logged_subprocess': self.logged_subprocess,
'overload_param': self.overload_param}
#'overload_param': self.overload_param}
'hook': self.hook}
return glo
def clean_tmp(self):
......@@ -455,8 +458,8 @@ class InteractiveWorker(Worker):
self.make_dir(task)
glo = self.prepare_env(task)
self.task = task
code_file = self.pipe.repository.get_code_file(seg)
execfile(code_file, glo) # Execute the segment
code = self.pipe.repository.get_code_string(seg)
exec (code, glo)
try: # set product
res = glo['res']
except:
......@@ -510,9 +513,9 @@ class ThreadWorker(Worker, threading.Thread):
prod = task.task_input
self.make_dir(task)
glo = self.prepare_env(task)
code_file = self.pipe.repository.get_code_file(seg)
code = self.pipe.repository.get_code_string(seg)
try: # Execute the segment
execfile(code_file, glo)
exec (code, glo)
except Exception:
etype, value, tb = traceback.sys.exc_info()
f = file(glo['get_data_fn']("segment_error.log"),"w")
......@@ -587,9 +590,9 @@ class ProcessWorker(Worker, Process):
prod = task.task_input
self.make_dir(task)
glo = self.prepare_env(task)
code_file = self.pipe.repository.get_code_file(seg)
code = self.pipe.repository.get_code_string(seg)
try: # Execute the segment
execfile(code_file, glo)
exec (code, glo)
except AbortError, e:
raise e
except Exception:
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment