
Commit 7b047d24 authored by Maude Le Jeune

Feature #872 almost done. Need to set final syntax

parent f5e65823
......@@ -9,7 +9,7 @@
# automated distribution of computational tasks.
__all__=['scheduler','worker', 'repository', 'tracker', 'pipeline', 'task']
__all__=['environment', 'scheduler','worker', 'repository', 'tracker', 'pipeline', 'task']
from scheduler import *
from worker import *
......@@ -17,4 +17,5 @@ from repository import *
from tracker import *
from pipeline import *
from task import *
#import cherrypy
from environment import *
class EnvironmentBase():
def _get_data_fn(self, x):
""" Complete the filename with the path to the working
directory.
Parameters
----------
x: string, filename suffix
Returns
-------
string, filename
"""
return path.join(self.pipe.get_data_dir(self.task.seg, prod=self.task.task_input), x)
def _make_environment(self):
""" Prepare the namespace for a segment.
Returns
-------
dict, the namespace containing all the segment utilities
(get_data_fn, get_fig_fn, get_tmp_fn, glob_seg, load_param,
load_products, save_products, logged_subprocess, hook, ...).
"""
class Environment(EnvironmentBase):
def get_data_fn(self, x):
""" Complete the filename with the path to the working
directory.
Parameters
----------
x: string, filename suffix
Returns
-------
string, filename
"""
return self._get_data_fn(x)
def logged_subprocess(worker, args, shell=False):
""" Execute a subprocess and log its output.
Create files process_name.log and process_name.err
Parameters
----------
worker: Environment instance whose _get_data_fn resolves the log filenames
args: list of strings, the command to execute; args[0] names the log files
shell: boolean, passed through to subprocess.Popen
Returns
-------
tuple (outputfile, errorfile)
"""
proc = args[0]
of = worker._get_data_fn(proc+'.log')
ef = worker._get_data_fn(proc+'.err')
o = file(of,'w')
e = file(ef,'w')
o.write('#'+' '.join([str(a) for a in args])+'\n')
o.flush()
subprocess.Popen(args, stdout=o, stderr=e, shell=shell).communicate()[0]
o.close()
e.close()
return (of, ef)
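For illustration, a hypothetical call from segment code (the command and filenames are made up); note that the first line of the log file echoes the command itself:

## Example (not part of this commit): run `ls -l` and capture its
## output; args[0] ('ls') names the two log files.
logfile, errfile = logged_subprocess(worker, ['ls', '-l'])
print open(logfile).read()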
def get_fig_fn(self, x):
""" Complete the filename with the path to the working
directory.
Parameters
----------
x: string, filename suffix
Returns
-------
string, filename
"""
return path.join(self.pipe.get_fig_dir(self.task.seg, prod=self.task.task_input), x)
def get_tmp_fn(self):
""" Obtain a temporary filename
Note : has to be part of the segment execution environment
The temporary file is added to the internal list for future removal.
Returns
-------
string, temporary filename.
"""
tf = path.join(self.work_dir, str(datetime.now().toordinal()))
self._tmpf.append(tf)
return tf
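Note that datetime.now().toordinal() changes only once per day, so two calls within the same day return the same path. A collision-free variant could use the standard tempfile module; a sketch under that assumption (not part of this commit):

import os
import tempfile

def get_tmp_fn(self):
    """ Obtain a unique temporary filename inside work_dir. """
    ## mkstemp atomically creates the file and guarantees a unique name
    fd, tf = tempfile.mkstemp(dir=self.work_dir)
    os.close(fd)            ## keep only the name; segment code reopens it
    self._tmpf.append(tf)   ## register for later removal by clean_tmp()
    return tf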
def glob_seg(self, x, y):
""" Return the list of filename matching y in the working
directory of segment x.
Parameters
----------
x: string, segment name
y: string, glob pattern of the files to match.
Returns
-------
list of filenames.
"""
segx = self.pipe.find_seg(self.task.seg, x)
return glob(path.join(self.pipe.get_data_dir(segx),y))\
+glob(path.join(self.pipe.get_data_dir(segx),path.join('*/',y)))
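For illustration, hypothetical segment code (the segment name and pattern are made up):

## Example (not part of this commit): list every .fits file produced by
## the upstream segment 'simulate', one subdirectory level included.
for fn in glob_seg('simulate', '*.fits'):
    print fn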
def load_param(self,seg,glo,param_name='*'):
""" Update the global dictionnary with parameters of a
segment.
Parameters
----------
seg : string, segment name
glo : dict, the global dictionary
param_name : string list, parameter names.
"""
segx = self.pipe.find_seg(self.task.seg, seg)
if not path.exists(self.pipe.get_param_file(segx)):
return
self.load_products(self.pipe.get_param_file(segx), glo, param_name=param_name)
def save_products(self, filename, glo, param_name='*'):
""" Use pickle to save a part of a given namespace.
Parameters
----------
filename : string, save in this filename.
glo : dict, the namespace.
param_name : string list, names of objects to save. If '*', save
everything in the dictionary.
"""
new_dict = {}
if param_name == '*':
param_name = glo.keys()
for k in param_name:
try:
new_dict[k] = glo[k]
except KeyError:
logger.warning('Failed to save object %s in file %s' % (k, filename))
f = file(filename,'w')
pickle.dump(new_dict,f)
f.close()
def load_products(self, filename, glo, param_name='*'):
""" Update a namespace by unpickling requested object from the
file.
Parameters
----------
filename: string, the pickle filename.
glo : dict, the namespace to update.
param_name : string list, names of objects to unpickle. If '*',
everything from the file is loaded.
"""
try:
f = file(filename)
new_dict = pickle.load(f)
f.close()
except IOError:
logger.warning('No such file: %s' % filename)
return # new_dict was never assigned; nothing to load
except UnpicklingError:
logger.warning('Failed to unpickle from file: %s' % filename)
f.close()
return
if param_name == '*':
param_name = new_dict.keys()
for k in param_name:
try:
glo[k] = new_dict[k]
except KeyError:
logger.warning('Failed to load object %s from file %s' % (k, filename))
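A round-trip sketch of the two helpers (the filename and variables are hypothetical; segment code sees both functions as bare names in its namespace):

## Example (not part of this commit): save two parameters, then restore
## only one of them into a fresh namespace.
glo = {'nside': 512, 'seed': 42}
save_products('/tmp/params.pkl', glo, param_name=['nside', 'seed'])
restored = {}
load_products('/tmp/params.pkl', restored, param_name=['nside'])
print restored   ## {'nside': 512}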
def hook(self, hook_name, glo):
""" Execute hook code.
Search for an extra segment code file, and update the dictionary
with the result of its execution.
Parameters
----------
hook_name: string, hook name
glo: dict, global dictionary to update.
"""
code = self.pipe.repository.get_hook_string(self.task.seg, hook_name)
if code:
exec(code, glo)
else:
print "No hook file named %s for seg %s"%(self.task.seg,hook_name)
......@@ -49,21 +49,8 @@ import logging
import logging.handlers
import sys
import datetime
from pipelet.utils import get_log_file, init_logger
def get_log_file (pipe, name):
""" Return log file name
The current date and time are prepended to the log file name.
Parameters
----------
pipe: pipe instance
name: log file short name
"""
d = datetime.datetime.now()
strdate = d.strftime("%y%m%d_%H%M%S%f") # yymmdd_HHMMSS + microseconds
return path.join(pipe.get_log_dir(),strdate+"_"+name)
def set_logger (pipe, log_level):
......@@ -84,37 +71,6 @@ def set_logger (pipe, log_level):
init_logger ('worker', work_log_file, level=log_level)
def init_logger (name, filename, level=logging.DEBUG):
""" Initialize a logger.
Parameters
----------
name: string, logger name
level:
integer, print log on stdout at this level (values defined by the
logging module: logging.DEBUG=10, logging.WARNING=30,
logging.CRITICAL=50, etc.). Set it to 0 to disable stream logging.
filename: if not None, a rotating file handler is used.
Default is stream handler.
"""
logger = logging.getLogger(name)
logger.setLevel(logging.DEBUG)
if (len(logger.handlers) < 3):
formatter = logging.Formatter("%(asctime)s - %(name)s - %(levelname)s - %(message)s")
rfhandler = logging.handlers.RotatingFileHandler(
filename, maxBytes=200000, backupCount=5)
logger.addHandler(rfhandler)
rfhandler.setFormatter(formatter)
if level:
shandler = logging.StreamHandler(sys.stdout)
shandler.setLevel(level)
shandler.setFormatter(formatter)
logger.addHandler(shandler)
def launch_interactive(pipe, log_level=logging.DEBUG):
""" Launch a local worker in the interactive session.
......
......@@ -19,6 +19,7 @@
import os.path as path
import re
import subprocess
import logging
import tempfile
import base64
......@@ -30,6 +31,8 @@ from pipelet.repository import LocalRepository
from contextlib import closing
import pickle
import os
import sys
current_dir = os.path.dirname(os.path.abspath(__file__))
def flatten(seq):
......@@ -215,6 +218,51 @@ def reduced_code_formatting(s, unsafe=False):
return min_code
def get_log_file (pipe, name):
""" Return log file name
The current date and time are prepended to the log file name.
Parameters
----------
pipe: pipe instance
name: log file short name
"""
d = datetime.datetime.now()
strdate = d.strftime("%y%m%d_%H%M%S%f") # yymmdd_HHMMSS + microseconds
return path.join(pipe.get_log_dir(),strdate+"_"+name)
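For example, given the %y%m%d_%H%M%S%f stamp above (hypothetical pipe and date):

## Example (not part of this commit): with pipe.get_log_dir() returning
## '/tmp/log', a call on 2011-06-01 at 14:30:05 yields
fn = get_log_file(pipe, 'worker.log')
## -> '/tmp/log/110601_143005123456_worker.log'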
def init_logger (name, filename, level=logging.DEBUG):
""" Initialize a logger.
Parameters
----------
name: string, logger name
level:
integer, print log on stdout at this level (values defined by the
logging module: logging.DEBUG=10, logging.WARNING=30,
logging.CRITICAL=50, etc.). Set it to 0 to disable stream logging.
filename: if not None, a rotating file handler is used.
Default is stream handler.
"""
logger = logging.getLogger(name)
logger.setLevel(logging.DEBUG)
if (len(logger.handlers) < 4):
formatter = logging.Formatter("%(asctime)s - %(name)s - %(levelname)s - %(message)s")
rfhandler = logging.handlers.RotatingFileHandler(
filename, maxBytes=200000, backupCount=5)
logger.addHandler(rfhandler)
rfhandler.setFormatter(formatter)
if level:
shandler = logging.StreamHandler(sys.stdout)
shandler.setLevel(level)
shandler.setFormatter(formatter)
logger.addHandler(shandler)
return logger
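A minimal usage sketch (hypothetical file path):

## Example (not part of this commit): everything goes to the rotating
## file; only INFO and above is echoed to stdout.
log = init_logger('worker', '/tmp/worker.log', level=logging.INFO)
log.info('worker started')   ## file + stdout
log.debug('details')         ## file only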
def create_pipe(pipename, prefix=[]):
""" Create an empty pipe environment including main, defaulf
segment, prefix and log directories.
......
......@@ -16,6 +16,7 @@
import os.path as path
import os
from task import *
from environment import *
from glob import glob
import cPickle as pickle
from datetime import datetime
......@@ -27,6 +28,7 @@ import signal
from contextlib import closing
import gc
import logging
import inspect
class NullHandler(logging.Handler):
......@@ -129,67 +131,6 @@ class Worker(object):
""" Disconnect from scheduler. """
self.scheduler.check_out()
def load_products(self, filename, glo, param_name='*'):
""" Update a namespace by unpickling requested object from the
file.
Parameters
----------
filename: string, the pickle filename.
glo : dict, the namespace to update.
param_name : string list, names of objects to unpickle. If '*',
everything from the file is loaded.
"""
try:
f = file(filename)
new_dict = pickle.load(f)
f.close()
except IOError:
logger.warning( 'No such file: %s'%filename)
except UnpicklingError:
logger.warning('Failed to unpickle from file: %s'%filename)
f.close()
if param_name == '*':
param_name = new_dict.keys()
for k in param_name:
try:
glo[k] = new_dict[k]
except KeyError:
logger.warning('Failed to load object %s from file %s' % (k, filename))
def make_tag(self, seg, glo):
""" Write a tag using parameters of a segment.
Parameters
----------
seg : string, segment name
glo : dict, the global dictionary
Returns
-------
string.
"""
try:
var_tag = glo['var_tag']
except KeyError:
var_tag = []
strtag = ''
for param in var_tag:
if param in glo:
strtag = strtag + param + '=' + str_web(glo[param])+' '
else:
logger.warning( 'parameter '+param+' not in dictionary')
var_tag = strtag + ' <small>(<b>'+ datetime.today().strftime("%e %m - %R")+'</b>)</small> '
fn = self.pipe.get_meta_file(seg)
with closing(file(fn, 'r')) as f:
d = pickle.load(f)
d['param'] = var_tag
with closing(file(fn, 'w')) as f:
r = pickle.dump(d,f)
return var_tag
def make_dir(self, task):
""" Create a directory for a given task.
......@@ -217,94 +158,6 @@ class Worker(object):
if e.errno != 17: # file exist
raise e
def logged_subprocess(self, args, shell=False):
""" Execute a subprocess and log its output.
Create files process_name.log and process_name.err
Parameters
----------
args: list of strings, the command to execute; args[0] names the log files
shell: boolean, passed through to subprocess.Popen
Returns
-------
tuple (outputfile, errorfile)
"""
proc = args[0]
of = self.get_data_fn(proc+'.log')
ef = self.get_data_fn(proc+'.err')
o = file(of,'w')
e = file(ef,'w')
o.write('#'+' '.join([str(a) for a in args])+'\n')
o.flush()
subprocess.Popen(args, stdout=o, stderr=e, shell=shell).communicate()[0]
o.close()
e.close()
return (of, ef)
def get_data_fn(self, x):
""" Complete the filename with the path to the working
directory.
Parameters
----------
x: string, filename suffix
Returns
-------
string, filename
"""
return path.join(self.pipe.get_data_dir(self.task.seg, prod=self.task.task_input), x)
def get_fig_fn(self, x):
""" Complete the filename with the path to the working
directory.
Parameters
----------
x: string, filename suffix
Returns
-------
string, filename
"""
return path.join(self.pipe.get_fig_dir(self.task.seg, prod=self.task.task_input), x)
def glob_seg(self, x, y):
""" Return the list of filename matching y in the working
directory of segment x.
Parameters
----------
x: string, segment name
y: string, glob pattern of the files to match.
Returns
-------
list of filenames.
"""
segx = self.pipe.find_seg(self.task.seg, x)
return glob(path.join(self.pipe.get_data_dir(segx),y))\
+glob(path.join(self.pipe.get_data_dir(segx),path.join('*/',y)))
def hook(self, hook_name, glo):
""" Execute hook code.
Search for an extra segment code file, and update the dictionary
with the result of its execution.
Parameters
----------
hook_name: string, hook name
glo: dict, global dictionary to update.
"""
code = self.pipe.repository.get_hook_string(self.task.seg, hook_name)
if code:
exec(code, glo)
else:
print "No hook file named %s for seg %s"%(self.task.seg,hook_name)
def prepare_env(self, task):
""" Build the segment global execution environment for a given task.
......@@ -324,86 +177,18 @@ class Worker(object):
gc.collect()
self.task = task
glo = {'get_data_fn':self.get_data_fn, 'get_fig_fn':self.get_fig_fn,
'segment_args':task.task_input, 'get_tmp_fn':self.get_tmp_fn,
'glob_seg':self.glob_seg,'load_param':self.load_param,
'load_products':self.load_products,
'save_products':self.save_products,
'logged_subprocess': self.logged_subprocess,
#'overload_param': self.overload_param}
'hook': self.hook}
return glo
def clean_tmp(self):
""" Delete the list of tmp file created by a segment
"""
for e in self._tmpf:
subprocess.Popen(['rm', e]).communicate()[0]
self._tmpf = []
def get_tmp_fn(self):
""" Obtain a temporary filename
Note : has to be part of the segment execution environment
The temporary file is added to the internal list for future removal.
Returns
-------
string, temporary filename.
"""
tf = path.join(self.work_dir, str(datetime.now().toordinal()))
self._tmpf.append(tf)
return tf
env = Environment(self)
glo = {}
for l in env._get_namespace():
## getattr returns bound methods (still callable) and plain
## attribute values alike, so no eval/exec string building is needed
glo[l] = getattr(env, l)
return (glo, env)
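The export pattern in isolation; a toy sketch (the class and names are made up, not the pipelet API):

## Example (not part of this commit): expose an object's methods and
## attributes as bare names in a dict, as prepare_env does above.
class Env(object):
    work_dir = '/tmp'
    def get_data_fn(self, x):
        return self.work_dir + '/' + x

env = Env()
glo = {}
for name in ('work_dir', 'get_data_fn'):
    glo[name] = getattr(env, name)      ## bound methods stay callable

exec("print get_data_fn('x.dat')", glo) ## segment code sees bare names -> /tmp/x.dat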
def load_param(self,seg,glo,param_name='*'):
""" Update the global dictionnary with parameters of a
segment.
Parameters
----------
seg : string, segment name
glo : dict, the global dictionary
param_name : string list, parameter names.
"""
segx = self.pipe.find_seg(self.task.seg, seg)
if not path.exists(self.pipe.get_param_file(segx)):
return
self.load_products(self.pipe.get_param_file(segx), glo, param_name=param_name)
def save_products(self, filename, glo, param_name='*'):
""" Use pickle to save a part of a given namespace.
Parameters
----------
filename : string, save in this filename.
glo : dict, the namespace.
param_name : string list, names of objects to save. If '*', save
everything in the dictionary.
"""
new_dict = {}
if param_name == '*':
param_name = glo.keys()
for k in param_name:
try:
new_dict[k] = glo[k]
except KeyError:
logger.warning('Failed to save object %s in file %s' % (k, filename))
f = file(filename,'w')
pickle.dump(new_dict,f)
f.close()
def save_param(self, seg, glo,param_name='*'):
""" Save parameters of a segment.
Parameters
----------
seg : string, segment name
glo : dict, the global dictionary
param_name : string list, parameter names.
"""
if path.exists(self.pipe.get_param_file(seg)):
return
self.save_products(self.pipe.get_param_file(seg),glo, param_name=param_name)
class InteractiveWorker(Worker):
......@@ -464,23 +249,11 @@ class InteractiveWorker(Worker):
seg = task.seg
prod = task.task_input
self.make_dir(task)
glo = self.prepare_env(task)
(glo,env) = self.prepare_env(task)
self.task = task
code = self.pipe.repository.get_code_string(seg)
exec (code, glo)
try: # set product
res = glo['res']