Docker-in-Docker (DinD) capabilities of public runners deactivated. More info

Commit 09704ad0 authored by Maude Le Jeune
Browse files

Multiplex directive almost done. The exception for hashkey computation is still missing.

parent 6a8248a1
......@@ -387,6 +387,6 @@ class Environment(EnvironmentBase):
try:
res = glo["seg_output"]
except:
res = None
res = []
self.logger.info("No segment output found, setting seg_output to None")
return res
## Copyright (C) 2008, 2009, 2010 APC LPNHE CNRS Universite Paris Diderot <lejeune@apc.univ-paris7.fr> <betoule@apc.univ-paris7.fr>
##
## This program is free software; you can redistribute it and/or modify
## it under the terms of the GNU General Public License as published by
## the Free Software Foundation; either version 3 of the License, or
## (at your option) any later version.
##
## This program is distributed in the hope that it will be useful,
## but WITHOUT ANY WARRANTY; without even the implied warranty of
## MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
## GNU General Public License for more details.
##
## You should have received a copy of the GNU General Public License
## along with this program; if not, see http://www.gnu.org/licenses/gpl.html
from __builtin__ import zip
def cross_prod(*args):
    """ Return the cartesian product of the input sets.

    Parameters
    ----------
    args : a sequence of lists.

    Returns
    -------
    a list of tuples.

    Examples
    --------
    >>> print(len(cross_prod(['a','b'])))
    2
    >>> print(len(cross_prod(['a','b'],[1,2,3])))
    6
    >>> print(len(cross_prod(['a','b'],[1,2,3],[0.1,0.2,0.3,0.4])))
    24
    """
    # No input sets: empty product (the original raised IndexError here).
    if not args:
        return []
    # Base case: wrap each element in a 1-tuple.  The original returned
    # zip(*args), which under Python 3 is a lazy iterator: it cannot be
    # len()-ed and is exhausted after a single pass, silently truncating
    # the recursive step.  A materialized list is correct on both 2 and 3.
    if len(args) == 1:
        return [(x,) for x in args[0]]
    rest = cross_prod(*args[1:])
    res = []
    for head in args[0]:
        res.extend([(head,) + tail for tail in rest])
    return res
def union(*args):
    """ Return the union of the input sets.

    Each element of each input list becomes a 1-tuple of the result, so
    the output length is the sum of the input lengths.

    Parameters
    ----------
    args : a sequence of lists.

    Returns
    -------
    a list of tuples.

    Examples
    --------
    >>> print(len(union(['a','b'])))
    2
    >>> print(len(union(['a','b'],[1,2,3])))
    5
    >>> print(len(union(['a','b'],[1,2,3],[0.1,0.2,0.3,0.4])))
    9
    """
    # The original also carried a stray duplicated empty docstring (a dead
    # expression statement) and built the result via l.extend(zip(l_));
    # a single comprehension states the intent directly and is also
    # correct under Python 3, where zip() is lazy.
    return [(x,) for l_ in args for x in l_]
def gather(*args):
    """ Return the gathering of the input sets.

    All elements of every input list are concatenated, in order, into a
    single list which is returned as the sole element of the result.

    Parameters
    ----------
    args : a sequence of lists.

    Returns
    -------
    a list of one element (the concatenation of all inputs).

    Examples
    --------
    >>> print(len(gather(['a','b'])))
    1
    >>> print(len(gather(['a','b'],[1,2,3])))
    1
    >>> print(len(gather(['a','b'],[1,2,3],[0.1,0.2,0.3,0.4])))
    1
    """
    return [[item for chunk in args for item in chunk]]
......@@ -22,6 +22,8 @@ from utils import get_hashkey, crc32
from utils import str_file, flatten
from contextlib import closing
from environment import *
from task import Task
import multiplex
class PipeException(Exception):
""" Extension of exception class.
......@@ -141,6 +143,9 @@ class Pipeline:
## Environment Base extension
self.env = env
## input dictionary (for orphan segments)
self._input = {}
def __str__(self):
""" Print the segment tree.
......@@ -268,7 +273,40 @@ class Pipeline:
self._children = dict(zip(seg_list,[[s] for s in seg_list[1:]]+[[]]))
self._parents = dict(zip(seg_list,[[]]+[[s] for s in seg_list[:-1]]))
def get_parents(self,seg):
def get_multiplex(self, seg):
    """ Return the multiplex function for a segment.

    The segment source code is scanned for '#multiplex(...)' directives
    whose quoted argument names one of the functions of the multiplex
    module ('zip', 'cross_prod', 'union', 'gather').  When no directive
    is present, 'cross_prod' is the default.

    Parameters
    ----------
    seg: string, segment name

    Returns
    -------
    function instance

    Raises
    ------
    PipeException when the directive names an unknown method.
    """
    c = self.repository.get_code_string(seg)
    directives = c.split("#multiplex(")[1:]
    if not directives:
        return multiplex.cross_prod
    m = multiplex.cross_prod
    for l in directives:
        # Keep the text up to the closing paren and strip the quotes:
        # "'gather')..." -> "gather".
        candidate = l.split(")")[0][1:-1]
        # getattr is the safe equivalent of the original
        # eval("multiplex.%s" % candidate): no code evaluation of text
        # extracted from user-supplied segment sources.
        if hasattr(multiplex, candidate):
            m = getattr(multiplex, candidate)
        else:
            raise PipeException('Multiplex argument %s not in multiplex namespace'%candidate)
    return m
def get_parents(self,seg, nophantom=False):
""" Return a list of parent segments.
Parameters
......@@ -286,7 +324,13 @@ class Pipeline:
['second', 'third']
"""
return self._parents[seg]
if not nophantom and not self._parents[seg]:
l = [seg+"phantom"]
else:
l = self._parents[seg]
return l
def get_childrens(self,seg):
""" Return a list of child segments.
......@@ -300,6 +344,17 @@ class Pipeline:
string list.
"""
return self._children[seg]
def push(self, **args):
    """ Register input product lists for orphan segments.

    Each keyword maps a segment name to the list of input products that
    will feed it.  Only segments without parents may receive input.

    Raises
    ------
    PipeException when the named segment has parents.
    TypeError when a supplied value is not a list.
    """
    for seg_name, products in args.iteritems():
        if self._parents[seg_name]:
            raise PipeException("Segment %s not an orphan segment"%seg_name)
        if not isinstance(products, list):
            raise TypeError("Segment input should be a list object")
        self._input[seg_name] = products
def flatten(self,seg=None,caller=None):
""" Return a generator delivering segments in execution order.
......
......@@ -17,6 +17,7 @@ import os
import os.path as path
import task
from task import TaskList
from task import Task
import Queue
import tracker
import pprint
......@@ -38,96 +39,10 @@ logger = logging.getLogger("scheduler")
h = NullHandler()
logger.addHandler(h)
def cross_prod(*args):
    """ Return a sequence of every possible tuple taking one element
    in each list.

    Parameters
    ----------
    args : a sequence of lists.

    Returns
    -------
    a list of tuples.

    Examples
    --------
    >>> print(len(cross_prod(['a','b'])))
    2
    >>> print(len(cross_prod(['a','b'],[1,2,3])))
    6
    >>> print(len(cross_prod(['a','b'],[1,2,3],[0.1,0.2,0.3,0.4])))
    24
    """
    # No input sets: empty product (the original raised IndexError here).
    if not args:
        return []
    # Base case: wrap each element in a 1-tuple.  The original returned
    # zip(*args), which under Python 3 is a lazy iterator: it cannot be
    # len()-ed and is exhausted after a single pass, silently truncating
    # the recursive step.  A materialized list is correct on both 2 and 3.
    if len(args) == 1:
        return [(x,) for x in args[0]]
    rest = cross_prod(*args[1:])
    res = []
    for head in args[0]:
        res.extend([(head,) + tail for tail in rest])
    return res
def sorted_zip(*args):
    """ Zip the argument sequences after sorting each of them.

    Every argument sequence is sorted independently, then the sorted
    sequences are matched element-wise.  The result is truncated to the
    length of the shortest argument sequence.

    Parameters
    ----------
    args : a sequence of lists.

    Returns
    -------
    a list of tuples

    Examples
    --------
    >>> sorted_zip(['a','c','b'],[3,1]) #doctest: +NORMALIZE_WHITESPACE
    [('a', 1), ('b', 3)]
    """
    return zip(*map(sorted, args))
def multiplex(*args, **options):
    """ Transform several products lists into a single list of tuples.

    Given products sets l1 and l2, they can be combined into a list of
    couples according to two schemes: 1) couples are the elements of
    l1 x l2 (the cross_prod method), or 2) some mapping between the two
    sets; currently the only supported mapping (sorted_zip) is the
    canonical bijective relation between the sorted lists of products.

    Keywords
    --------
    method: force the use of one multiplexing scheme, sorted_zip or
        cross_prod.  When not specified, sorted_zip is used each time
        the sizes of the products lists match, and cross_prod in all
        other cases.  It is probably a bad idea to rely on this default
        behavior when writing generic code.  Alternative mapping schemes
        can be passed through this keyword as well.

    Examples
    --------
    >>> multiplex( ['a','b'],[3,1])
    [('a', 1), ('b', 3)]
    >>> multiplex(['a'],[3,1])
    [('a', 3), ('a', 1)]
    """
    # 'in' replaces dict.has_key(), which is deprecated and removed in
    # Python 3.
    if 'method' in options:
        method = options['method']
    else:
        lengths = [len(l) for l in args]
        # The original folded the equality test with
        # reduce(lambda x, y: y == le[0] and x, le), whose accumulator is
        # falsy whenever the first list is empty (len 0), wrongly picking
        # cross_prod for matching all-empty inputs.  all() states the
        # intended condition -- every list has the same length -- directly.
        if lengths and all(n == lengths[0] for n in lengths):
            method = sorted_zip
        else:
            method = cross_prod
    return method(*args)
class Scheduler():
""" Feed tasks to an execution queue.
......@@ -240,23 +155,12 @@ class Scheduler():
"""
with self.success_lock:
self.nb_success = self.nb_success + 1
self.tracker.update_status(t,'done')
if t.task_output:
for r in t.task_output:
child = task.Task(t.seg, task_output=r)
child.id = t.id
self.products_list.push(child)
else:
# This segment does not return products, a void product is
# created once and forall and is added to the product
# list.
self.products_list.push_void(t)
self.tracker.update_status(t,'done')
self.products_list.push(t)
self.task_queue.task_done()
def store_meta_seg(self, seg, parents):
def store_meta_seg(self, seg):
""" Store meta information for segment.
This is used to rebuild db.
......@@ -266,6 +170,7 @@ class Scheduler():
seg: string, segment name
parents: list of string, parent segment names.
"""
parents = self.pipe.get_parents(seg, nophantom=True)
fn = self.pipe.get_meta_file(seg)
lst_dir = []
for e in parents:
......@@ -310,66 +215,41 @@ class Scheduler():
fid = open(dest, "w")
fid.write(r.get_hook_string(seg, h))
fid.close()
self.store_meta_seg(seg)
parents = self.pipe.get_parents(seg) ## parents segments
self.store_meta_seg(seg, parents)
d = self.tracker.get_done(seg) ## done tasks
dprod = [t.task_input for t in d] ## done products
failed = self.tracker.get_failed(seg) # failed tasks
failed_prod = [t.task_input for t in failed] # failed products
logger.debug('Found %d done tasks segment %s'%(len(d),seg))
logger.debug('Found %d failed tasks segment %s'%(len(failed),seg))
if parents:
## product list to queue
l = multiplex(*[self.products_list.pop(s) for s in parents]) # cross prod
l = [([r[0] for r in e], [r[1] for r in e]) for e in l]
logger.debug('Found %d tasks in seg %s to get done'%(len(l),seg))
for e, par in l: # foreach product of the task list
if (e in failed_prod): # done but failed
logger.debug("task already done and failed in seg %s"%seg)
continue
if not (e in dprod): # not done
logger.debug("pushing 1 task for seg %s"%seg)
self.put_task(task.Task(seg, task_input=e, status='queued',parents=par, seg_parents=parents))
else: # done
logger.debug("task already accomplished in segment %s"%seg)
## task list to queue
l = self.products_list.multiplex(seg, parents, self.pipe.get_multiplex(seg))
## task with no input
if not l:
l = [Task(seg)]
logger.debug('Found %d tasks in seg %s to get done'%(len(l),seg))
for t in l: # foreach task of the task list
if (t.task_input in failed_prod): # done but failed
logger.debug("task already done and failed in seg %s"%seg)
continue
if not (t.task_input in dprod): # not done
logger.debug("pushing 1 task for seg %s"%seg)
self.put_task(t)
else: # done
logger.debug("task already accomplished in segment %s"%seg)
# fetch the result of the task and store it in the task list
ind = dprod.index(e)
t = d[ind];
if t.task_output:
logger.debug("Loading %d results from previously done task in segment %s"%(len(t.task_output),seg))
for r in t.task_output:
child = task.Task(t.seg, task_output=r)
child.id = t.id
self.products_list.push(child)
else:
# add an empty product once.
logger.debug("Loading a void result from previously done task in segment %s"%seg)
self.products_list.push_void(t)
self.nb_success = self.nb_success + 1
logger.debug("nb_success starts at %d for segment %s"%(self.nb_success,seg))
else:
if failed: #failed
logger.warning("Orphan segment %s already computed but failed"%seg)
elif d: # done
logger.info("Orphan segment %s already computed"%seg)
if t.task_output:
logger.info("Loading %d results from already computed orphan segment %s"%(len(t.task_output),seg))
for r in t.task_output:
child = task.Task(t.seg, task_output=r)
child.id = t.id
self.products_list.push(child)
else:
# add an empty product once.
logger.debug("Loading a void result from already computed orphan segment %s"%seg)
self.products_list.push_void(t)
ind = dprod.index(t.task_input)
t = d[ind];
try:
logger.debug("Loading %d results from previously done task in segment %s"%(len(t.task_output),seg))
except TypeError:
logger.debug("No result to load from previously done task in segment %s"%(seg))
self.products_list.push(t)
self.nb_success = self.nb_success + 1
else: #not done
logger.info("Pushing orphan segment %s"%seg)
self.put_task(task.Task(seg, status='queued'))
logger.debug("nb_success starts at %d for segment %s"%(self.nb_success,seg))
def task_failed(self, t):
""" Tell the tracker when a task has failed.
......@@ -428,6 +308,10 @@ class Scheduler():
self.tracker = tracker.SqliteTracker(self.pipe)
self.tracker.start()
logger.info("Tracker launched")
for k,v in self.pipe._input.iteritems():
t = Task(self.pipe.get_parents(k)[0], task_output=v)
self.products_list.push(t)
logger.info("Pushing phantom task %s"%str(t))
try:
for s in self.pipe.flatten():
print s
......
......@@ -18,6 +18,7 @@ from utils import str_date
from contextlib import closing
import pickle
class Task:
""" A segment code associated with its input/output product(s).
......@@ -25,7 +26,7 @@ class Task:
execution status and its output product(s).
"""
def __init__(self, seg, task_input=None, status=None, task_output=None, id=None, queued_on=None, parents=[], seg_parents=[]):
def __init__(self, seg, task_input=[], status=None, task_output=[], id=None, queued_on=None, parents=[]):
""" Initialize a task object.
Parameters
......@@ -57,8 +58,7 @@ class Task:
self.parents = parents
## List of str_input of the parent tasks
self.str_parents = []
## List of curr_dir of the parent segments
self.seg_parents = seg_parents
def __str__(self):
""" Convert task object to string.
......@@ -98,12 +98,9 @@ class Task:
class TaskList:
""" List of task objects.
For now a tasklist is just a wrapper around dict, but we may
change our mind without changing the interface.
""" List of accomplished tasks per segment.
"""
def __init__(self):
""" Initialize a task list.
"""
......@@ -112,21 +109,6 @@ class TaskList:
## lock on task list
self._void_task_lock = threading.Lock()
def count(self, seg):
    """ Return the number of tasks stored for the segment.

    A segment with no stored task counts as zero.

    Parameters
    ----------
    seg : string, segment name.

    Returns
    -------
    int, number of tasks.
    """
    return len(self._list.get(seg, ()))
def pop(self, seg):
""" Remove and return stored products of a segment.
......@@ -157,24 +139,35 @@ class TaskList:
raise TypeError
if not self._list.has_key(t.seg):
self._list[t.seg] = []
self._list[t.seg].append((t.task_output,t.id))
self._list[t.seg].append(t)
def push_void(self, t):
""" Add a 'void' task to the task list, if not previously done.
This is used when a segment does not return any product. The
required behaviour is then to push only one void task in the
queue of the next segment.
This is thread safe to ensure the unicity of the void task.
def multiplex (self, seg, parent_list, method):
""" Append segment tasks deduced from multiplex method
Multiplex method is one of 'cross_prod', 'union', 'gather', 'zip'.
Parameters
----------
t : task object.
seg : string, segment name
parent_list: list of parent segment name
method: multiplex method
"""
with self._void_task_lock:
if not isinstance(t, Task):
raise TypeError
if not self._list.has_key(t.seg):
self._list[t.seg] = []
self._list[t.seg].append((None, t.id))
a = [] ## as many as parent_list
for p in parent_list:
b = []
try:
lst_p = self._list[p]
## no input for orphan segment
except KeyError:
lst_p = []
for t in lst_p:
b = b+ [(o, t.id, p) for o in t.task_output]
a.append(b)
output_set = method (*a)
## e for each task to push
lst_task = [Task(seg, [{r[2]:r[0]} for r in e], status='queued', parents=[r[1] for r in e if r[1] is not None]) for e in output_set]
for l in lst_task:
print str(l)+"parents: "
print l.parents
return lst_task
......@@ -15,6 +15,8 @@ third->fourth
T = pipeline.Pipeline(S, code_dir=op.abspath('./'), prefix=op.abspath('./'))
T.to_dot('pipeline.dot')
#T.push (first=["lancelot"])
print T
W,t = launch_interactive(T)
W.run()
......
p = glob_seg('third', 'Preambule.txt')
t = glob_seg('second','test.txt')
import subprocess
#multiplex('cross_prod')
bla = 9
f = file(get_data_fn('result.txt'),'w')
print (p,t)
res = subprocess.Popen(['cat']+p+t,stdout=f).communicate()[0]
......
......@@ -5,5 +5,5 @@ product = seg_input[0]
f = file(tf,'w')
f.write('This segment received %s as arg'%product)
f.close
seg_output = [seg_input[0]]
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment