Docker-in-Docker (DinD) capabilities of public runners deactivated. More info

Commit 5e733fab authored by Betoule Marc's avatar Betoule Marc
Browse files

Merge branch 'v1.0' of git://gitorious.org/pipelet/pipelet into v1.0

parents e8484089 33fdd1a8
......@@ -115,7 +115,8 @@ class Pipeline:
## string, the location of the segment source code
self.repository = None
if code_dir:
self.repository = LocalRepository(code_dir)
print self._seg_list
self.repository = LocalRepository(self._seg_list, code_dir)
## string, indicates where to save the pipeline products.
self._prefix = path.realpath(prefix)
if not os.path.exists(prefix):
......@@ -216,6 +217,7 @@ class Pipeline:
c = s.find(sp,a.start())
a = _edge.search(s,c)
segs = set(flatten(r))
self._seg_list = segs
self._parents=dict([(seg,[]) for seg in segs])
self._children=dict([(seg,[]) for seg in segs])
for c in r:
......@@ -279,6 +281,7 @@ class Pipeline:
>>> print(a._children)
{'second': [], 'first': ['second']}
"""
self._seg_list=seg_list
self._children = dict(zip(seg_list,[[s] for s in seg_list[1:]]+[[]]))
self._parents = dict(zip(seg_list,[[]]+[[s] for s in seg_list[:-1]]))
......@@ -395,7 +398,7 @@ class Pipeline:
"""
hash = crc32()
currdir = self._prefix
self.repository.save_all_string(seg)
#self.repository.save_all_string(seg)
for k in self._parents[seg]:
h, currdir = self._compute_hash(k)
hash.update(h)
......
......@@ -26,51 +26,100 @@ class Repository:
A repository is the location where to find the segment's source
code files. It returns the full path of all files that will be
used to execute the segment (code, visu, args, ...). By default,
used to execute the segment (code, hook, ...). By default,
local repositories are used, but it can also be a version control
system (git, CVS, ...).
"""
def __init__(self, lstseg):
    """ Initialize a repository.

    Reads, compiles and caches the source of every segment (and its
    hooks and dependencies) so that later file changes cannot alter
    the hashkey computation.

    Parameters
    ----------
    lstseg : string list, segment names whose sources must be collected.
    """
    ## dict, compiled code object per segment
    self._code = {}
    ## dict, compiled hook code objects, keyed by segment then hook name
    self._hook = {}
    ## dict, external dependency code string per segment
    self._deps = {}
    ## dict, full code string per segment (segment code plus hooks)
    self._all_string = {}
    self._fill_dict(lstseg)
def _ext_filter(self, f):
""" Check if the extension file is ok.
def _match_fn (self, fns, seg, hook=None):
""" Find filename matching pipelet rules for a given segment.
Parameters
----------
f : string, file name
fns : string list, list of candidates filenames
seg: string, segment name
hook: string, hook name
Returns
_______
boolean
-------
string, matching filename
"""
if hook is None:
for s in range(len(seg)):
def match(x): return path.basename(x)==seg[0:len(seg)-s]+'.py'
fn = filter(match, fns)
if len(fn)>0:
return fn[0]
else:
for s in range(len(seg)):
for h in range(len(hook)):
def match(x): return path.basename(x)==seg[0:len(seg)-s]+'_'+hook[0:len(hook)-h]+'.py'
fn = filter(match, fns)[0]
if len(fn)>0:
return fn[0]
raise Exception('No source file corresponding to segment %s and hook %s'%(seg,hook))
def _fill_dict(self, lstseg):
    """ Initialize all dictionaries.

    For each segment: find and compile its source file, resolve and
    compile the hooks referenced in the code, and concatenate the
    sources of the external dependencies declared through the Depend
    directive.

    Parameters
    ----------
    lstseg : string list, segment names.
    """
    fns = self.get_fns()
    for s in lstseg:
        ## code
        fn = self._match_fn(fns, s)
        source = self.get_code_source(fn)
        self._code[s] = compile(source, fn, "exec")
        self._all_string[s] = source
        ## hooks
        # the per-segment hook dict must exist before being indexed
        # (the original raised KeyError on the first hook)
        self._hook[s] = {}
        lsthook = self.get_hook_list(source)
        for h in lsthook:
            fn = self._match_fn(fns, s, h)
            source = self.get_code_source(fn)
            # original compiled the undefined name 'so' instead of 'source'
            self._hook[s][h] = compile(source, fn, "exec")
            self._all_string[s] += source
        ## deps
        depend = self.get_directive(Depend, s)
        self._deps[s] = ""
        for fn in depend.deps:
            source = self.get_code_source(fn)
            self._deps[s] += source
def get_all_string(self, seg):
    """ Return all code (segment source plus hooks) as a string.

    The string is captured at repository-construction time to prevent
    file changes after hashkey computation.

    Parameters
    ----------
    seg : string, segment name.

    Returns
    -------
    string, concatenated source of the segment and its hooks.
    """
    # the original block still contained a stray line of the removed
    # _ext_filter implementation; only the lookup below is intended.
    return self._all_string[seg]
def get_directive(self, Direct, seg):
""" Initialize a directive object from segment code string.
""" Initialize a directive object from segment seg.
Parameters
----------
......@@ -81,7 +130,7 @@ class Repository:
-------
directive instance
"""
c = self.get_code_string(seg)+'\n'.join(self._hook[seg].values())
c = self._all_string[seg]
d = Direct()
for l in c.splitlines():
try:
......@@ -94,102 +143,68 @@ class Repository:
except StopIteration:
pass
return d
def get_hook_list(self, code):
    """ Return the list of hook names referenced by a code string.

    Hooks are detected textually: every literal occurrence of "hook("
    is taken as a call whose first, quoted argument is the hook name.

    Parameters
    ----------
    code : string, segment source code.

    Returns
    -------
    string list, hook names.
    """
    names = []
    for chunk in code.split("hook(")[1:]:
        # first argument of the call, with its surrounding quotes stripped
        names.append(chunk.split(",")[0][1:-1])
    return names
def get_code_string(self, seg):
    """ Return the compiled segment code.

    The code is captured and compiled at repository-construction time
    (see _fill_dict) to prevent file changes after hashkey computation.

    Parameters
    ----------
    seg : string, segment name.

    Returns
    -------
    code object, as stored by _fill_dict.
    """
    # the removed save_all_string implementation was still interleaved
    # in this block; the method is a plain cache lookup.
    return self._code[seg]
def get_deps_string(self, dep):
    """ Return external dependency code as a string.

    This is done to prevent file changes after hashkey computation.

    Parameters
    ----------
    dep : string, key of the dependency entry. NOTE(review): _fill_dict
        stores these under the segment name — confirm callers pass the
        segment name here.

    Returns
    -------
    string, concatenated dependency sources.
    """
    # the original returned self._deps[seg] although the parameter is
    # named 'dep' -> NameError; use the actual parameter.
    return self._deps[dep]
def get_hook_string(self, seg, hook):
    """ Return the compiled hook code for a segment.

    The hook code is captured and compiled at repository-construction
    time to prevent file changes after hashkey computation.

    Parameters
    ----------
    seg : string, segment name.
    hook : string, hook name.

    Returns
    -------
    code object, as stored by _fill_dict.
    """
    return self._hook[seg][hook]
def get_docline(self, seg):
    """ Return the segment synopsis doc line.

    Parameters
    ----------
    seg : string, segment name.

    Returns
    -------
    string, first docstring line of the segment source file (None when
    the file has no docstring).
    """
    import pydoc
    # pydoc.source_synopsis expects an open file object, not a filename;
    # the original passed the path string directly, which cannot work.
    with open(self._code[seg].co_filename, "r") as fid:
        return pydoc.source_synopsis(fid)
class LocalRepository(Repository):
""" A local repository.
......@@ -228,7 +227,7 @@ class LocalRepository(Repository):
segment's code file, plus an optionnal segment's library directory.
"""
def __init__(self, lst_seg, src_path):
    """ Initialize a local repository.

    Parameters
    ----------
    lst_seg : string list, segment names.
    src_path : string, directory containing the segment source files
        (user and environment variables are expanded).
    """
    ## dict, compiled code object per segment
    self._code = {}
    ## dict, compiled hook code objects, keyed by segment then hook name
    self._hook = {}
    ## dict, external dependency code string per segment
    self._deps = {}
    ## dict, full code string per segment
    self._all_string = {}
    ## string, where to find segment's source code file
    self.src_path = path.expanduser(path.expandvars(src_path))
    self._fill_dict(lst_seg)
def get_code_source(self, filename):
    """ Read source code from file.

    Parameters
    ----------
    filename : string, file name relative to src_path, or a path as
        returned by get_fns().

    Returns
    -------
    string, content of the file.
    """
    full = path.join(self.src_path, filename)
    if not path.exists(full):
        # get_fns()/glob return paths already prefixed with src_path;
        # joining again breaks them when src_path is a relative path,
        # so fall back on the name as given.
        full = filename
    # use a context manager so the handle is closed even on read errors
    with open(full, "r") as fid:
        return fid.read()
def get_fns(self):
    """ Return filename candidates for segment source files.

    Returns
    -------
    string list, paths of all '*.py' files found in src_path (full
    glob paths, prefixed with src_path — not bare basenames).
    """
    return glob(path.join(self.src_path, '*.py'))
class GitRepository(Repository):
......
......@@ -27,6 +27,7 @@ from contextlib import closing
from directive import Multiplex
import pickle
class NullHandler(logging.Handler):
""" Extension of the logging handler class.
"""
......@@ -191,7 +192,6 @@ class Scheduler():
with closing(file(fn, 'w')) as f:
r = pickle.dump(d,f)
def push_next_seg(self, seg):
""" Push the segment task list to the queue.
......@@ -213,27 +213,27 @@ class Scheduler():
if not path.exists(d):
os.mkdir(d)
r = self.pipe.repository
f = r.get_code_file(seg)
f = r.get_code_string(seg).co_filename
if f:
dest = d+'/'+os.path.basename(f)
fid = open(dest, "w")
fid.write(r.get_code_string(seg))
fid.close()
os.system("cp %s %s"%(f, dest))
lst_hook = r.get_hook_list(seg)
for h in lst_hook:
f = r.get_hook_file(seg, h)
f = r.get_hook_string(seg, h).co_filename
if f:
dest = d+'/'+os.path.basename(f)
fid = open(dest, "w")
fid.write(r.get_hook_string(seg, h))
fid.close()
##self.store_meta_seg(seg)
os.system("cp %s %s"%(f, dest))
parents = self.pipe.get_parents(seg) ## parents segments
d = self.tracker.get_done(seg) ## done tasks
dprod = [t.task_input for t in d] ## done products
failed = self.tracker.get_failed(seg) # failed tasks
failed_prod = [t.task_input for t in failed] # failed products
logger.info('Found %d done tasks segment %s'%(len(d),seg))
logger.info('Found %d failed tasks segment %s'%(len(failed),seg))
## task list to queue
......@@ -243,6 +243,11 @@ class Scheduler():
l = [Task(seg)]
logger.info('Found %d tasks in seg %s to get done'%(len(l),seg))
for t in l: # foreach task of the task list
# print seg
# print t.parents
# print t.task_input
if (t.task_input in failed_prod): # done but failed
#logger.debug("task already done and failed in seg %s"%seg)
continue
......
......@@ -372,12 +372,6 @@ class SqliteTracker(Tracker,threading.Thread):
# May hurt performances very badly for short tasks
self._asynchronous_request('update segments set param=? where seg_id =?',
(t.param, self.seg_id_cache[t.seg]))
fn = self.pipe.get_meta_file(t.seg)
with closing(file(fn, 'r')) as f:
d = pickle.load(f)
d['param'] = t.param
with closing(file(fn, 'w')) as f:
r = pickle.dump(d,f)
if status =='done' or status == 'failed':
t.store_meta(self.pipe.get_meta_file(t.seg, prod=t.task_input))
return t
......
......@@ -44,7 +44,7 @@ class Web:
The informations about the pipeline are retrieve from its data base file.
Pipelines are printed through a tree view with links to all segments and products.
"""
p = profiler.Profiler('/home/betoule/pipewebprof/')
#p = profiler.Profiler('/home/betoule/pipewebprof/')
## boolean, Web object is exposed.
exposed = True
def __init__(self, db_file, name):
......
import pipelet.pipeline as pipeline
from pipelet.launchers import launch_process
from pipelet.launchers import launch_process, launch_interactive
import os
pipedot = """
......@@ -8,5 +8,5 @@ third -> fourth;
"""
P = pipeline.Pipeline(pipedot, code_dir='./', prefix='./')
launch_process(P,1)
w, t = launch_interactive(P)
w.run()
#multiplex cross_prod where 'b[0]==a'
#multiplex cross_prod group_by "0"
print seg_input
......@@ -4,14 +4,14 @@ import pipelet.pipeline as pipeline
from pipelet.launchers import launch_interactive
import os
import logging
from pylab import *
pipedot ="""a -> c;
b-> c;
"""
P = pipeline.Pipeline(pipedot, code_dir='./', prefix='./')
P.push(a=[1,2,3])
P.push(a=list(arange(3)))
P.push(b=[1,2,3])
W,t = launch_interactive(P, log_level=0)
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment