
Commit 7c26bbe9 authored by Betoule Marc

Merge branch 'v1.0' of git://gitorious.org/pipelet/pipelet into v1.0

parents 4db85e44 b908e75d
@@ -436,6 +436,10 @@ mode that enable to exploitation of data parallelism (in this case
running the four independent instances of the melt segment in
parallel), and how to provide web access to the results.
*** The exemple pipelines
**** fft
**** cmb
** Running Pipes
*** The sample main file
......
@@ -71,7 +71,7 @@ def set_logger (pipe, log_level):
init_logger ('worker', work_log_file, level=log_level)
-def launch_interactive(pipe, log_level=logging.DEBUG):
+def launch_interactive(pipe, log_level=logging.INFO):
""" Launch a local worker in the interactive session.
This is debugger compliant, so that exception in the segment
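For context, the launchers touched by these hunks are normally called from a pipeline's main script. A minimal usage sketch, assuming a `Pipeline` object built roughly as below (the import path and constructor arguments are illustrative, not taken from this diff):

```python
import logging
from pipelet.pipeline import Pipeline          # assumed import path
from pipelet.launchers import launch_interactive

# Hypothetical pipeline description and directories; the real ones live in
# the sample main file mentioned in the documentation above.
pipe = Pipeline("melt -> plot;", code_dir='./', prefix='./data')

# One worker in the current session; exceptions raised inside a segment
# can then be inspected with the debugger.
launch_interactive(pipe, log_level=logging.DEBUG)
```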
@@ -103,7 +103,7 @@ def launch_interactive(pipe, log_level=logging.DEBUG):
-def launch_thread(pipe, n, log_level=logging.CRITICAL ):
+def launch_thread(pipe, n, log_level=logging.WARNING ):
""" Launch a bunch of local workers in separate threads.
This is SMP machine compliant. Exceptions araising in the
@@ -141,7 +141,7 @@ class SchedulerManager(BaseManager):
"""
pass
-def launch_process(pipe, n, address=('',50000), authkey='secret',log_level=logging.CRITICAL ):
+def launch_process(pipe, n, address=('',50000), authkey='secret',log_level=logging.WARNING ):
""" Launch a bunch of local workers in separate processes .
This is usefull (compared to launch_thread) when the GIL becomes
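The SMP launchers follow the same pattern; a brief sketch reusing the hypothetical `pipe` object from the previous example, with the address and authkey taken from the defaults visible in the signature above:

```python
from pipelet.launchers import launch_thread, launch_process

# Four workers as threads of the current process.
launch_thread(pipe, 4)

# Four workers as separate processes, useful when the GIL becomes a
# bottleneck; address and authkey mirror the signature defaults.
launch_process(pipe, 4, address=('', 50000), authkey='secret')
```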
@@ -181,7 +181,7 @@ def _scp(file, dest):
""" Wrapper around the scp command."""
subprocess.Popen(['scp', file, dest]).communicate()[0]
-def launch_ssh(pipe, host_list, address=None, authkey='secret',log_level=logging.CRITICAL ):
+def launch_ssh(pipe, host_list, address=None, authkey='secret',log_level=logging.WARNING ):
""" Launch a bunch of distant workers through ssh.
This is used mainly for testing purposes. It can be usefull to
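For distant workers over ssh, a corresponding sketch; the host names and scheduler address below are placeholders:

```python
from pipelet.launchers import launch_ssh

# One worker per host; each distant worker connects back to the scheduler
# at the given address with the shared authkey.
launch_ssh(pipe, ['node1', 'node2', 'node3'],
           address=('front.example.org', 50000), authkey='secret')
```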
@@ -216,8 +216,10 @@ def launch_ssh(pipe, host_list, address=None, authkey='secret',log_level=logging
for w in processlist:
w.wait()
-def launch_pbs(pipe, n, address=None, authkey='secret', job_name="job_", log_level=logging.CRITICAL, cpu_time="2:00:00", server=False, job_header="""
-#!/bin/bash
+def launch_pbs(pipe, n, address=None, authkey='secret', job_name="job_", log_level=logging.WARNING, cpu_time="2:00:00", server=False, job_header="""
+#/bin/bash
echo $PYTHONPATH
""" ):
""" Launch a bunch of distant workers through a PBS batch system.
@@ -265,7 +267,7 @@ python -m pipelet.launchers -H %s -p %s -s %s
def launch_ccali(pipe, n, address=('127.0.0.1',5000), authkey='secret', job_dir="/sps/snls13/users/betoule/bqs/scripts",
job_name="job_", log_dir="/sps/snls13/users/betoule/bqs/logs",
-cpu_time=1000, scratch="512MB", mem="512MB",plateform='LINUX',server=False, log_level=logging.DEBUG,buff=None):
+cpu_time=1000, scratch="512MB", mem="512MB",plateform='LINUX',server=False, log_level=logging.INFO,buff=None):
"""submit a bunch of distant workers to the BQS system
at CCIN2P3.
"""
......
@@ -20,7 +20,6 @@ from task import TaskList
from task import Task
import Queue
import tracker
-import pprint
import shutil
import threading
import logging
@@ -184,6 +183,11 @@ class Scheduler():
except:
d = {}
d['parents'] = lst_dir
+try:
+d['param'] = self.products_list._list[seg][-1].param
+except:
+logger.info("no tag saved for seg %s"%seg)
with closing(file(fn, 'w')) as f:
r = pickle.dump(d,f)
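The added lines also pickle the last task's `param` attribute (the "tag") next to the segment's parent directories. A small sketch of reading that metadata back, with a placeholder file name (the real one is built inside `store_meta_seg` and not shown here):

```python
import cPickle as pickle

# Placeholder path; store_meta_seg writes the actual metadata file.
fn = 'segment_dir/seg.meta'

with open(fn) as f:
    d = pickle.load(f)

print d['parents']                                   # parent directories (lst_dir)
print d.get('param', 'no tag saved for this segment')
```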
@@ -224,20 +228,20 @@ class Scheduler():
fid = open(dest, "w")
fid.write(r.get_hook_string(seg, h))
fid.close()
-self.store_meta_seg(seg)
+##self.store_meta_seg(seg)
parents = self.pipe.get_parents(seg) ## parents segments
d = self.tracker.get_done(seg) ## done tasks
dprod = [t.task_input for t in d] ## done products
failed = self.tracker.get_failed(seg) # failed tasks
failed_prod = [t.task_input for t in failed] # failed products
-logger.debug('Found %d done tasks segment %s'%(len(d),seg))
-logger.debug('Found %d failed tasks segment %s'%(len(failed),seg))
+logger.info('Found %d done tasks segment %s'%(len(d),seg))
+logger.info('Found %d failed tasks segment %s'%(len(failed),seg))
## task list to queue
l = self.products_list.multiplex(seg, parents, self.pipe.repository.get_directive(Multiplex,seg))
## task with no input
if not l:
l = [Task(seg)]
-logger.debug('Found %d tasks in seg %s to get done'%(len(l),seg))
+logger.info('Found %d tasks in seg %s to get done'%(len(l),seg))
for t in l: # foreach task of the task list
if (t.task_input in failed_prod): # done but failed
logger.debug("task already done and failed in seg %s"%seg)
@@ -306,7 +310,7 @@ class Scheduler():
while(self.task_queue.get(block=False)):
self.task_queue.task_done()
except Queue.Empty:
-logger.info("Empty Queue")
+logger.debug("Empty Queue")
def run(self):
""" Start the scheduler.
@@ -321,16 +325,16 @@
for k,v in self.pipe._input.iteritems():
t = Task(self.pipe.get_parents(k)[0], task_output=v)
self.products_list.push(t)
-logger.info("Pushing phantom task %s"%str(t))
+logger.debug("Pushing phantom task %s"%str(t))
try:
for s in self.pipe.flatten():
logger.info(s)
self.push_next_seg(s)
self.task_queue.join()
if self.abort:
logger.warning("Interrupting the main loop")
break
if self.nb_success:
self.store_meta_seg(s)
self.nb_success = 0
else:
logger.warning("No successfull job in the previous step %s, stopping here."%s)
......
@@ -21,7 +21,6 @@ import shutil
import threading
import Queue
import logging
-from utils import str_date
from contextlib import closing
logger = logging.getLogger("scheduler")
@@ -214,15 +213,15 @@ class SqliteTracker(Tracker,threading.Thread):
'select seg_id from segments where curr_dir = ? limit 1',
(curr_dir,)).fetchone()[0]
self.seg_id_cache[s] = seg_id
-logger.info("Segment %s instance (%s) already registered in db."%(s, curr_dir))
+logger.debug("Segment %s instance (%s) already registered in db."%(s, curr_dir))
except TypeError:
-logger.info("Creating segment %s instance (%s)."%(s, curr_dir))
+logger.debug("Creating segment %s instance (%s)."%(s, curr_dir))
c = self.conn.execute(
'insert into segments (seg, curr_dir, comment) values (?, ?, ?)'
,(s, curr_dir, docline))
seg_id = c.lastrowid
self.seg_id_cache[s] = seg_id
logger.info("Storing connectivity for segment %s."%s)
logger.debug("Storing connectivity for segment %s."%s)
for p in self.pipe._parents[s]:
self.conn.execute(
......
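The tracker hunks only lower log verbosity, but they show the columns of the `segments` table (`seg_id`, `seg`, `curr_dir`, `comment`), which can be inspected directly. A sketch with `sqlite3`, using a placeholder database path (the tracker creates the actual file elsewhere):

```python
import sqlite3

# Placeholder path to the tracker's database.
conn = sqlite3.connect('prefix/.sqlstatus')
for seg_id, seg, curr_dir, comment in conn.execute(
        'select seg_id, seg, curr_dir, comment from segments'):
    print seg_id, seg, curr_dir
conn.close()
```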
@@ -313,37 +313,6 @@ def create_pipe(pipename, prefix=[]):
print "You should be able to browse the result on the web page http://localhost:8080\n"
print "Retype the current command line to get those instructions back."
-def hash_file(codefile):
-"""Use cheksum algorithm to return a unique key for file.
-DEPRECATED: use crc32
-Parameters
-----------
-codefile: string.
-Returns
--------
-string
-Examples
---------
->>> print(hash_file('../test/codesample.py'))
-3DDAAEC1
-"""
-if not path.exists(codefile):
-logger.warning('Try to compute checksum of non existing file %s.'%codefile)
-return None
-sum_binary = "cksum"
-# Computes checksum
-sp = subprocess.Popen([sum_binary, codefile], stdout=subprocess.PIPE)
-res = sp.communicate()[0]
-key = "%X" % int(res.split(' ')[0])
-if sp.returncode != 0:
-logger.critical('Error while computing checksum of file %s.'%codefile)
-return key
def get_hashkey(s, starthash):
"""Return a unique key which identify code string
@@ -523,3 +492,4 @@ def make_dict(l):
if __name__ == "__main__":
import doctest
doctest.testmod()
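The removed `hash_file` helper shelled out to `cksum` and was already marked "DEPRECATED: use crc32". A minimal sketch of the in-process replacement that note points to, using `zlib.crc32` (the name `crc32_file` is ours, not taken from the code):

```python
import zlib

def crc32_file(codefile):
    """Hex key for a file computed with zlib.crc32 instead of cksum."""
    with open(codefile, 'rb') as f:
        # Mask to 32 bits so the key is the same unsigned value everywhere.
        return "%X" % (zlib.crc32(f.read()) & 0xffffffff)
```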
@@ -20,7 +20,6 @@ from environment import *
from glob import glob
import cPickle as pickle
from datetime import datetime
-from utils import str_web
import subprocess
import threading
import traceback
@@ -102,7 +101,7 @@ class Worker(object):
"""
logger.info( 'checking in ...')
if not self.scheduler.check_in():
-logger.warning( "check_in failed : stopping")
+logger.error( "check_in failed : stopping")
return
logger.info( 'checked in.')
@@ -121,7 +120,7 @@
self.task = None
logger.info("%d jobs completed" % n)
except AbortError, e:
-logger.warning( "Abort after catching signal %d" % e.signal)
+logger.error( "Abort after catching signal %d" % e.signal)
self.scheduler.requeue(task)
finally:
self.terminate()
......