Commit d6618f0a authored by Maude Le Jeune's avatar Maude Le Jeune
Browse files

Merge branch 'master' of git@gitorious.org:pipelet/pipelet

parents 693b8926 6d72c1d1
......@@ -22,3 +22,10 @@ def old_to_new (old_file, new_file):
conn2.execute('insert into tasks (seg_id, status, input, output, str_prod) values (?, ?, ?, ?, ?)',(seg_id,)+t)
def change_root(sql_file,old_prefix, new_prefix):
    """ Update the data root path in the database.

    Rewrites every occurrence of *old_prefix* with *new_prefix* in the
    ``segments.curr_dir`` and ``tasks.str_input`` columns of the pipelet
    database stored in *sql_file*.
    """
    db = sqlite3.connect(sql_file)
    statements = (
        'update segments set curr_dir = replace(curr_dir, ?,?)',
        'update tasks set str_input = replace(str_input, ?,?)',
    )
    # connection-as-context-manager: commit on success, roll back on error
    with db:
        for stmt in statements:
            db.execute(stmt, (old_prefix, new_prefix))
......@@ -280,7 +280,7 @@ class Environment(EnvironmentBase):
try:
new_dict[k] = glo[k]
except KeyError:
logger.warning('Fail to save object %s in file %s'%(k,filename))
self.logger.warning('Fail to save object %s in file %s'%(k,filename))
f = file(filename,'w')
pickle.dump(new_dict,f)
f.close()
......@@ -301,9 +301,9 @@ class Environment(EnvironmentBase):
new_dict = pickle.load(f)
f.close()
except IOError:
logger.warning('No such file: %s'%filename)
self.logger.warning('No such file: %s'%filename)
except UnpicklingError:
logger.warning('Failed to unpickle from file: %s'%filename)
self.logger.warning('Failed to unpickle from file: %s'%filename)
f.close()
if param_name == '*':
param_name = new_dict.keys()
......@@ -311,7 +311,7 @@ class Environment(EnvironmentBase):
try:
glo[k] = new_dict[k]
except KeyError:
logger.warning( 'Fail to load object %s from file %s'%(k,filename) )
self.logger.warning( 'Fail to load object %s from file %s'%(k,filename) )
def clean_tmp(self):
......@@ -343,7 +343,7 @@ class Environment(EnvironmentBase):
if param in glo:
strtag = strtag + param + '=' + str_web(glo[param])+' '
else:
logger.warning( 'parameter '+param+' not in dictionary')
self.logger.warning( 'parameter '+param+' not in dictionary')
var_tag = strtag + ' <small>(<b>'+ datetime.today().strftime("%e %m - %R")+'</b>)</small> '
fn = self._worker.pipe.get_meta_file(seg)
with closing(file(fn, 'r')) as f:
......
......@@ -285,7 +285,10 @@ def launch_ccali(pipe, n, address=('127.0.0.1',5000), authkey='secret', job_dir=
import re
l = subprocess.Popen(['qjob', '-wide','-nh'], stdout=subprocess.PIPE).communicate()[0]
existing_process = re.findall( '%s([0-9]*)'%job_name,l)
starting_num = max([int(p) for p in existing_process])+1
try:
starting_num = max([int(p) for p in existing_process])+1
except:
starting_num = 0
for i in range(starting_num,starting_num+n):
name = '%s%d'%(job_name,i)
jobfile = get_log_file (pipe, name+'.job')
......
......@@ -40,8 +40,13 @@ def cross_prod(*args):
return zip(*args)
l1 = args[0]
l2 = cross_prod(*(args[1:]))
for a in l1:
res.extend([(a,)+b for b in l2])
if l1 and l2:
for a in l1:
res.extend([(a,)+b for b in l2])
elif l1:
res.extend(zip(l1))
elif l2:
res.extend(l2)
return res
def union(*args):
......
......@@ -131,7 +131,7 @@ class Pipeline:
self.compute_hash()
## string, sql data base
self.sqlfile=sqlfile
self.sqlfile=path.expanduser(path.expandvars(sqlfile))
## boolean, if true, turn the matplotlib backend to Agg
self.matplotlib = matplotlib
......
from pipelet.environment import *
import os
class PlanckEnvironment(Environment):
    """ Pipelet environment extended with Planck DMC object handling.

    Filenames can either live in the segment working directory (default)
    or name DMC database objects (``dmc=True`` in get_data_fn); globbing
    transparently falls back to DMC object names in glob_seg.
    """

    def my_function (self):
        """ My function do nothing
        """
        return

    def get_data_fn (self, x, dmc=False, grp=None):
        """ Complete the filename with the path to the working
        directory.

        If dmc is True a DMC object name is returned
        using grp as group name and an empty file pointing to the DMC
        object is made.

        Parameters
        ----------
        x: string, filename suffix
        dmc: boolean, if True build a DMC object name instead of a
             local filename
        grp: string, DMC group name

        Returns
        -------
        string, filename
        """
        ## dmc object
        if dmc:
            x = x.split(".")[0] ## remove extension
            x = x+"_"+self._worker.pipe._hashes[self._worker.task.seg] ## add key
            if grp is None:
                # fixed: use self.logger like the other methods of this
                # class (a bare `logger` is not defined in this scope)
                self.logger.error ("groupname is None for DMC object %s"%x)
            import piolib
            x = path.join(grp, x)
            x = path.join(piolib.GetDataDB(), x)
            ## drop an empty local placeholder whose name is the DMC
            ## object name with "/" flattened to ":"
            locname = self._get_data_fn(x.replace("/", ":"))
            os.system("touch %s"%locname)
            return x
        ## local file
        else:
            return self._get_data_fn(x)

    def glob_default(self, x, y):
        """ Return the list of filename matching y in the working
        directory of segment x.

        Parameters
        ----------
        x: string, segment name
        y: string, regexp of file to glob.

        Returns
        -------
        list of filenames.
        """
        segx = self._worker.pipe.find_seg(self._worker.task.seg, x)
        if segx is None:
            self.logger.warning("No parent segment matching %s found"%x)
        ## search both the segment data dir and its first-level subdirs
        return glob(path.join(self._worker.pipe.get_data_dir(segx),y))\
            +glob(path.join(self._worker.pipe.get_data_dir(segx),path.join('*/',y)))

    def glob_seg(self, x, y):
        """ Return the list of filename matching y in the working
        directory of segment x.

        Falls back to DMC object names (":"-separated placeholders)
        when no plain file matches.

        Parameters
        ----------
        x: string, segment name
        y: string, regexp of file to glob.

        Returns
        -------
        list of filenames.
        """
        f = self.glob_default(x, y)
        import piolib
        if not f:
            self.logger.info("No file matching the extension, looking for DMC object")
            z = y.split(".")[0]
            z = "*:"+z
            self.logger.info("New regexp is %s"%z)
            f = self.glob_default(x, z)
            ## turn placeholder names back into DMC object names
            lst = []
            for fi in f:
                lst.append(os.path.basename(fi).replace(":", "/"))
            f = lst
        return f
......@@ -65,7 +65,7 @@ class Repository:
return bool(self._ext_re.match(path.splitext(f)[1]))
def get_directive(self, Direct, seg):
c = self.get_code_string(seg)
c = self.get_code_string(seg)+'\n'.join(self._hook[seg].values())
d = Direct()
for l in c.splitlines():
try:
......@@ -252,7 +252,7 @@ class LocalRepository(Repository):
s = re.findall('^(.+?)\d+$',seg)[0]
f = [self.get_code_file(s)]
except:
raise Exception('No source file corresponding to segment')
raise Exception('No source file corresponding to segment %s'%seg)
return f[0]
def get_hook_file(self, seg, hook):
......
......@@ -619,11 +619,14 @@ class Web:
datlist = []
loglist = []
for filename in lstfile:
if (filename.split(".")[1] in ["png", "jpg", "eps", "pdf"]):
imglist.append(filename)
elif (filename.split(".")[1] in ["meta", "log", "err"]):
loglist.append(filename)
else:
try:
if (filename.split(".")[1] in ["png", "jpg", "eps", "pdf"]):
imglist.append(filename)
elif (filename.split(".")[1] in ["meta", "log", "err"]):
loglist.append(filename)
else:
datlist.append(filename)
except IndexError:
datlist.append(filename)
biglist = [sorted(loglist), sorted(datlist), sorted(imglist)]
name = ['LOGS', 'DATA', 'FIGURES']
......
......@@ -108,16 +108,14 @@ class Worker(object):
n = 0
try:
for task in iter(self.scheduler.get_task, None):
if task is not None:
self.work_dir = self.pipe.get_curr_dir(task.seg)
task = self.execute_task(task)
if task.status == "done":
self.scheduler.task_done(task)
else:
self.scheduler.task_failed(task)
n = n+1
self.work_dir = self.pipe.get_curr_dir(task.seg)
task = self.execute_task(task)
if task.status == "done":
self.scheduler.task_done(task)
else:
break
self.scheduler.task_failed(task)
n = n+1
self.task = None
logger.info("%d jobs completed" % n)
except AbortError, e:
logger.warning( "Abort after catching signal %d" % e.signal)
......
......@@ -62,6 +62,12 @@ def main(options, args):
signal.signal(signal.SIGABRT, catch_sigterm)
signal.signal(signal.SIGINT, catch_sigterm)
if options.buffer:
old = pipe.sqlfile
pipe.sqlfile = options.buffer
shutil.copy(old, options.buffer)
try:
# Launching the scheduler
class SchedulerManager(BaseManager):
......@@ -69,11 +75,11 @@ def main(options, args):
"""
pass
if options.buffer:
old = pipe.sqlfile
pipe.sqlfile = options.buffer
shutil.copy(old, options.buffer)
# Now that logging is set up decouple from parent environnement
redirect_stream(sys.stdin, None)
redirect_stream(sys.stdout, None)
redirect_stream(sys.stderr, None)
s = scheduler.Scheduler(pipe)
SchedulerManager.register('get_scheduler', callable=lambda:s)
......@@ -81,12 +87,6 @@ def main(options, args):
mgr.start()
print "Started"
# Now that logging is set up decouple from parent environnement
redirect_stream(sys.stdin, None)
redirect_stream(sys.stdout, None)
redirect_stream(sys.stderr, None)
os.chdir(WORKDIR)
sched_proxy = mgr.get_scheduler()
sched_proxy.run()
......
......@@ -2,7 +2,7 @@
def main():
import optparse
parser = optparse.OptionParser(usage="\nTo create a new pipeline:\n %prog -c <pipename> [-p <prefix>]\nTo activate acl and setup a new user:\n %prog -a <username> [-l <access_level>] <sql_file>\nTo suppress an existing user:\n %prog -d <username> ")
parser = optparse.OptionParser(usage="\nTo create a new pipeline:\n %prog -c <pipename> [-p <prefix>]\nTo activate acl and setup a new user:\n %prog -a <username> [-l <access_level>] <sql_file>\nTo suppress an existing user:\n %prog -d <username>\nTo change the data root directory : %prod -r old_dir new_dir <sql_file>")
parser.add_option('-c', '--create-pipeline',
help='Create a new pipeline',)
parser.add_option('-p', '--prefix',
......@@ -11,6 +11,8 @@ def main():
help='Setup acl, add a new user',)
parser.add_option('-d', '--del-user',
help='Delete an existing user')
parser.add_option('-r', '--change-prefix',
help='Modify the prefix directory',nargs=2)
parser.add_option('-l', '--access-level',
help='When creating a new user give him this level', default=1, type='int')
......@@ -37,7 +39,9 @@ def main():
elif options.create_pipeline:
from pipelet import utils
utils.create_pipe(options.create_pipeline, prefix=options.prefix)
elif options.change_prefix:
from pipelet import db_utils
db_utils.change_root (args[0], options.change_prefix[0], options.change_prefix[1])
if __name__ == "__main__":
main()
......@@ -6,8 +6,8 @@ import os.path as op
import logging
import sys
S = """
first->second->fourth
third->fourth
mkgauss->convol;
fftimg->convol;
"""
#T.connect('second', ['third', 'fourth'], 'fourth')
#T.compute_hash()
......@@ -15,6 +15,8 @@ third->fourth
T = pipeline.Pipeline(S, code_dir=op.abspath('./'), prefix=op.abspath('./'))
T.to_dot('pipeline.dot')
T.push(fftimg=[1,2,3,4])
#T.push (first=["lancelot"])
print T
......
"""
"""
## pipelet segment script: convolve an image with the Gaussian filter
## (runs inside the pipelet environment: load_param, seg_input, glob_seg,
## load_products and get_data_fn are injected globals)
import pylab
## recover image dimensions saved by the "fftimg" segment
load_param("fftimg", globals(), ["x_size", "y_size"])
## image number forwarded by the parent "fftimg" segment
img = seg_input['fftimg']
## locate the parent segments' products on disk
fft_img = glob_seg("fftimg", "fft_%d.dat"%img)[0]
fft_filter = glob_seg("mkgauss", "fft.dat")[0]
load_products(fft_filter, globals(), ["filter"])
load_products(fft_img, globals(), ["im_fft"])
## apply the Fourier-space filter pixel by pixel
for i in range(x_size):
    for j in range(y_size):
        im_fft [i,j] = im_fft[i,j] * filter[i,j]
## back to image space; keep the real part and save a snapshot
im = pylab.ifft2(im_fft).real
pylab.imshow(im)
pylab.savefig(get_data_fn("filtered_%s.png"%img))
""" Compute 2d fft.
"""
import glob
import pylab
lst_par = ["x_size", "y_size"]
lst_tag = ["x_size", "y_size"]
## set image number
if seg_input is not None:
img = seg_input.values()[0]
else:
img = 1
## get image file name
img_file = glob.glob("%d.dat"%img)[0]
## load image and make a plot
im = pylab.loadtxt(img_file)
pylab.imshow(im)
pylab.savefig(get_data_fn("%d.png"%img))
x_size = pylab.shape(im)[0]
y_size = pylab.shape(im)[1]
## compute fft
im_fft = pylab.fft2(im)
save_products(get_data_fn("fft_%d.dat"%img), globals(), ["im_fft"])
seg_output = [img]
# from PIL import Image
# for l in range(4):
# im = Image.open("%d.png"%(l+1))
# im = im.convert ("L", colors=2048)
# im = im.convert("P", palette=Image.ADAPTIVE)
# #imshow(im)
# pix = im.load()
# new = zeros((256,256))
# for y in range(256):
# for x in range(256):
# new[x,y] = pix[x,y]
# #imshow(new)
# savetxt("%d.dat"%(l+1), new)
""" Make a Gaussian Fourier filter
The Gaussian shape is set by the fwhm parameter (full width half height).
The amplitude is set to unity by default.
"""
lst_par = ["fwhm" , "x_size", "y_size", "a"]
lst_tag = ["fwhm"]
import pylab
## Image dimension
x_size = 256
y_size = 256
## Gaussian full width half height
fwhm = 10
## make a gaussian pattern
im = pylab.zeros((x_size,y_size))
center_x = x_size/2
center_y = y_size/2
a = 1
fact = - pylab.log(2) / (0.25 * fwhm * fwhm)
for i in range(x_size):
for j in range(y_size):
dist = ((i-center_x)**2) + ((j-center_y)**2)
im[i,j] = a*pylab.exp(dist*fact)
for i in range(center_x, center_x+x_size):
for j in range(center_y, center_y+y_size/2):
c = im[i%x_size,j%x_size]
im[i%x_size,j%x_size] = im[i-center_x, j-center_y]
im[i-center_x, j-center_y] = c
## plot it
pylab.imshow(im)
pylab.savefig(get_data_fn("gaussian_pattern.png"))
## compute its fft
filter = pylab.fft2(im)
## save it to disk
save_products(get_data_fn("fft.dat"), globals(), ["filter"])
seg_output = [fwhm]
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment