Commit b908e75d authored by Betoule Marc's avatar Betoule Marc
Browse files

Merge branch 'master' into v1.0

parents f6e5282a b866095f
......@@ -436,6 +436,10 @@ mode that enable to exploitation of data parallelism (in this case
running the four independent instances of the melt segment in
parallel), and how to provide web access to the results.
*** The example pipelines
**** fft
**** cmb
** Running Pipes
*** The sample main file
......@@ -225,8 +225,8 @@ class Environment(EnvironmentBase):
ef = self.get_data_fn(proc+'.err')
o = file(of,'w')
e = file(ef,'w')
o.write('#'+' '.join([str(a) for a in args])+'\n')
e.write('#'+' '.join([str(a) for a in args])+'\n')
p=subprocess.Popen(args, stdout=o, stderr=e, shell=shell)
if except_on_failure:
......@@ -71,7 +71,7 @@ def set_logger (pipe, log_level):
init_logger ('worker', work_log_file, level=log_level)
def launch_interactive(pipe, log_level=logging.DEBUG):
def launch_interactive(pipe, log_level=logging.INFO):
""" Launch a local worker in the interactive session.
This is debugger compliant, so that exception in the segment
......@@ -103,7 +103,7 @@ def launch_interactive(pipe, log_level=logging.DEBUG):
def launch_thread(pipe, n, log_level=logging.CRITICAL ):
def launch_thread(pipe, n, log_level=logging.WARNING ):
""" Launch a bunch of local workers in separate threads.
This is SMP machine compliant. Exceptions arising in the
......@@ -141,7 +141,7 @@ class SchedulerManager(BaseManager):
def launch_process(pipe, n, address=('',50000), authkey='secret',log_level=logging.CRITICAL ):
def launch_process(pipe, n, address=('',50000), authkey='secret',log_level=logging.WARNING ):
""" Launch a bunch of local workers in separate processes .
This is useful (compared to launch_thread) when the GIL becomes
......@@ -181,7 +181,7 @@ def _scp(file, dest):
""" Wrapper around the scp command."""
subprocess.Popen(['scp', file, dest]).communicate()[0]
def launch_ssh(pipe, host_list, address=None, authkey='secret',log_level=logging.CRITICAL ):
def launch_ssh(pipe, host_list, address=None, authkey='secret',log_level=logging.WARNING ):
""" Launch a bunch of distant workers through ssh.
This is used mainly for testing purposes. It can be useful to
......@@ -216,8 +216,10 @@ def launch_ssh(pipe, host_list, address=None, authkey='secret',log_level=logging
for w in processlist:
def launch_pbs(pipe, n, address=None, authkey='secret', job_name="job_", log_level=logging.CRITICAL, cpu_time="2:00:00", server=False, job_header="""
def launch_pbs(pipe, n, address=None, authkey='secret', job_name="job_", log_level=logging.WARNING, cpu_time="2:00:00", server=False, job_header="""
""" ):
""" Launch a bunch of distant workers through a PBS batch system.
......@@ -265,7 +267,7 @@ python -m pipelet.launchers -H %s -p %s -s %s
def launch_ccali(pipe, n, address=('',5000), authkey='secret', job_dir="/sps/snls13/users/betoule/bqs/scripts",
job_name="job_", log_dir="/sps/snls13/users/betoule/bqs/logs",
cpu_time=1000, scratch="512MB", mem="512MB",plateform='LINUX',server=False, log_level=logging.DEBUG,buff=None):
cpu_time=1000, scratch="512MB", mem="512MB",plateform='LINUX',server=False, log_level=logging.INFO,buff=None):
"""submit a bunch of distant workers to the BQS system
at CCIN2P3.
......@@ -20,7 +20,6 @@ from task import TaskList
from task import Task
import Queue
import tracker
import pprint
import shutil
import threading
import logging
......@@ -235,14 +234,14 @@ class Scheduler():
dprod = [t.task_input for t in d] ## done products
failed = self.tracker.get_failed(seg) # failed tasks
failed_prod = [t.task_input for t in failed] # failed products
logger.debug('Found %d done tasks segment %s'%(len(d),seg))
logger.debug('Found %d failed tasks segment %s'%(len(failed),seg))
## task list to queue
l = self.products_list.multiplex(seg, parents, self.pipe.repository.get_directive(Multiplex,seg))
## task with no input
if not l:
l = [Task(seg)]
logger.debug('Found %d tasks in seg %s to get done'%(len(l),seg))
for t in l: # foreach task of the task list
if (t.task_input in failed_prod): # done but failed
logger.debug("task already done and failed in seg %s"%seg)
......@@ -311,7 +310,7 @@ class Scheduler():
except Queue.Empty:
logger.debug("Empty Queue")
def run(self):
""" Start the scheduler.
......@@ -326,10 +325,9 @@ class Scheduler():
for k,v in self.pipe._input.iteritems():
t = Task(self.pipe.get_parents(k)[0], task_output=v)
self.products_list.push(t)
logger.debug("Pushing phantom task %s"%str(t))
for s in self.pipe.flatten():
if self.abort:
......@@ -21,7 +21,6 @@ import shutil
import threading
import Queue
import logging
from utils import str_date
from contextlib import closing
logger = logging.getLogger("scheduler")
......@@ -214,15 +213,15 @@ class SqliteTracker(Tracker,threading.Thread):
'select seg_id from segments where curr_dir = ? limit 1',
self.seg_id_cache[s] = seg_id
logger.debug("Segment %s instance (%s) already registered in db."%(s, curr_dir))
except TypeError:
logger.debug("Creating segment %s instance (%s)."%(s, curr_dir))
c = self.conn.execute(
'insert into segments (seg, curr_dir, comment) values (?, ?, ?)'
,(s, curr_dir, docline))
seg_id = c.lastrowid
self.seg_id_cache[s] = seg_id
logger.debug("Storing connectivity for segment %s."%s)
for p in self.pipe._parents[s]:
......@@ -313,37 +313,6 @@ def create_pipe(pipename, prefix=[]):
print "You should be able to browse the result on the web page http://localhost:8080\n"
print "Retype the current command line to get those instructions back."
def hash_file(codefile):
"""Use checksum algorithm to return a unique key for file.
DEPRECATED: use crc32
codefile: string.
>>> print(hash_file('../test/'))
if not path.exists(codefile):
logger.warning('Try to compute checksum of non existing file %s.'%codefile)
return None
sum_binary = "cksum"
# Computes checksum
sp = subprocess.Popen([sum_binary, codefile], stdout=subprocess.PIPE)
res = sp.communicate()[0]
key = "%X" % int(res.split(' ')[0])
if sp.returncode != 0:
logger.critical('Error while computing checksum of file %s.'%codefile)
return key
def get_hashkey(s, starthash):
"""Return a unique key which identify code string
......@@ -523,3 +492,4 @@ def make_dict(l):
if __name__ == "__main__":
import doctest
......@@ -20,7 +20,6 @@ from environment import *
from glob import glob
import cPickle as pickle
from datetime import datetime
from utils import str_web
import subprocess
import threading
import traceback
......@@ -102,7 +101,7 @@ class Worker(object):
"""
'checking in ...')
if not self.scheduler.check_in():
logger.warning( "check_in failed : stopping")
logger.error( "check_in failed : stopping")
return
'checked in.')
......@@ -121,7 +120,7 @@ class Worker(object):
self.task = None
"%d jobs completed" % n)
except AbortError, e:
logger.warning( "Abort after catching signal %d" % e.signal)
logger.error( "Abort after catching signal %d" % e.signal)
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment