Docker-in-Docker (DinD) capabilities of public runners deactivated. More info

Commit 33fdd1a8 authored by Maude Le Jeune's avatar Maude Le Jeune
Browse files

some bug remaining

parent 556aec52
......@@ -97,7 +97,7 @@ class Repository:
self._all_string[s] +=source
## deps
depend = self.get_directive(Depend, self._all_string[s])
depend = self.get_directive(Depend, s)
self._deps[s] = ""
for fn in depend.deps:
source = self.get_code_source(fn)
......@@ -118,8 +118,8 @@ class Repository:
"""
return self._all_string[seg]
def get_directive(self, Direct, code):
""" Initialize a directive object from segment code string.
def get_directive(self, Direct, seg):
""" Initialize a directive object from segment seg.
Parameters
----------
......@@ -130,7 +130,7 @@ class Repository:
-------
directive instance
"""
c = code
c = self._all_string[seg]
d = Direct()
for l in c.splitlines():
try:
......
......@@ -27,6 +27,7 @@ from contextlib import closing
from directive import Multiplex
import pickle
class NullHandler(logging.Handler):
""" Extension of the logging handler class.
"""
......@@ -191,7 +192,6 @@ class Scheduler():
with closing(file(fn, 'w')) as f:
r = pickle.dump(d,f)
def push_next_seg(self, seg):
""" Push the segment task list to the queue.
......@@ -216,20 +216,24 @@ class Scheduler():
f = r.get_code_string(seg).co_filename
if f:
dest = d+'/'+os.path.basename(f)
os.system("cp %s %s"%(dest, f))
os.system("cp %s %s"%(f, dest))
lst_hook = r.get_hook_list(seg)
for h in lst_hook:
f = r.get_hook_string(seg, h).co_filename
if f:
dest = d+'/'+os.path.basename(f)
os.system("cp %s %s"%(dest, f))
os.system("cp %s %s"%(f, dest))
parents = self.pipe.get_parents(seg) ## parents segments
d = self.tracker.get_done(seg) ## done tasks
dprod = [t.task_input for t in d] ## done products
failed = self.tracker.get_failed(seg) # failed tasks
failed_prod = [t.task_input for t in failed] # failed products
logger.info('Found %d done tasks segment %s'%(len(d),seg))
logger.info('Found %d failed tasks segment %s'%(len(failed),seg))
## task list to queue
......@@ -239,6 +243,11 @@ class Scheduler():
l = [Task(seg)]
logger.info('Found %d tasks in seg %s to get done'%(len(l),seg))
for t in l: # foreach task of the task list
# print seg
# print t.parents
# print t.task_input
if (t.task_input in failed_prod): # done but failed
#logger.debug("task already done and failed in seg %s"%seg)
continue
......
#multiplex cross_prod where 'b[0]==a'
#multiplex cross_prod group_by "0"
print seg_input
......@@ -4,14 +4,14 @@ import pipelet.pipeline as pipeline
from pipelet.launchers import launch_interactive
import os
import logging
from pylab import *
pipedot ="""a -> c;
b-> c;
"""
P = pipeline.Pipeline(pipedot, code_dir='./', prefix='./')
P.push(a=[1,2,3])
P.push(a=list(arange(3)))
P.push(b=[1,2,3])
W,t = launch_interactive(P, log_level=0)
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment