Commit 29658a98 authored by Maude Le Jeune's avatar Maude Le Jeune
Browse files

Merge branch 'bug1269' into v1.1

Conflicts:
	pipelet/scheduler.py
	pipelet/task.py
parents f66d2320 3bed6974
......@@ -41,6 +41,9 @@ I see at least before three projects to complete before making the first release
- [ ] Those files should enter in the hash
- [ ] Those files should be stored
* In my branch the only remaining working mode is debug
It seems that manager is not able to serialize the scheduler object anymore
When trying to pickle the s object I am answered that thread.lock object are not pickleable. Strange because lock are here for a while.
- [X] It seems that manager is not able to serialize the scheduler object anymore
When trying to pickle the s object I am answered that
thread.lock object are not pickleable. Strange because lock
are here for a while. -> this was due to compile, which has now
been moved to the worker.
#!/usr/bin/env python
def main():
    """Convert every .dia diagram in the current directory to PDF.

    For each ``*.dia`` file, export it to EPS with ``dia -e`` and then
    convert the EPS to PDF with ``epstopdf``. Requires both external
    tools to be on the PATH.
    """
    import glob
    import subprocess
    for dia_file in glob.glob("./*.dia"):
        eps_file = dia_file.replace(".dia", ".eps")
        # Pass argument lists (no shell) so filenames containing spaces or
        # shell metacharacters are handled safely, unlike the previous
        # os.system("dia -e %s %s" % ...) string interpolation.
        subprocess.call(["dia", "-e", eps_file, dia_file])
        subprocess.call(["epstopdf", eps_file])

if __name__ == "__main__":
    main()
\documentclass[hyperref={colorlinks=true}]{beamer}
\usepackage{graphicx}
\usepackage{amsmath}
\usepackage[utf8]{inputenc}
\usepackage{multicol}
\usepackage{ulem}
\usepackage{color}
\usepackage{xspace}
\usepackage{listings}
\usepackage{wasysym}
\useoutertheme{infolines}
\usepackage{hangcaption}
\newcommand{\pipelet}{\textbf{\small{PIPELET}}\xspace}
\title[Pipelet]{The \pipelet software}
\author[Betoule, Le Jeune]{Marc \textsc{Betoule}, Maude \textsc{Le Jeune}}
\institute[CNRS]{}
\date[2010/09/04]{september, 4th, 2010}
\newcommand{\unnumberedcaption}%
% {\@dblarg{\@unnumberedcaption\@captype}}
\begin{document}
\begin{frame}{\pipelet}
\tableofcontents
\end{frame}
\section{Context}
\begin{frame}
\tableofcontents[currentsection]
\end{frame}
\begin{frame}{Context and needs}
Usually in scientific data processing:
\begin{itemize}
\item Big data sets
\item Complex processing (multiple interdependent steps)
\item Optimal parameters unknown
\end{itemize}
\begin{centering} $\rightarrow$ Computational \textbf{and development} cost a lot.\\
\end{centering}
\begin{figure}
\includegraphics[width=0.50\textwidth]{pipelet_scheme_small2.pdf}
\end{figure}
The \pipelet software answers the 3 above items:
\begin{itemize}
\item Computational cost limited to its lower limit
\item Guarantee traceability
\item Offer comparison facilities
\end{itemize}
\end{frame}
\begin{frame}{The \pipelet software}
The main idea behind \pipelet is to:
\begin{itemize}
\item Cut the whole processing into \textbf{segments} (script files)
\item Save intermediate products on disk
\item Use a unique identifier wrt code, parameters and I/Os.
\end{itemize}
\begin{figure}
\includegraphics[width=0.50\textwidth]{pipelet_scheme_small3.pdf}
\end{figure}
\pipelet is written in Python:
\begin{itemize}
\item High level language offering lots of functionalities
\item Known as a glue language ideal for interfacing heterogeneous codes
\item Ease debugging and interactivity
\end{itemize}
\end{frame}
\section{How it works}
\begin{frame}
\tableofcontents[currentsection]
\end{frame}
\begin{frame}{The \pipelet big scheme}
\begin{figure}
\includegraphics[width=0.90\textwidth]{pipelet_scheme.pdf}
\end{figure}
\end{frame}
\subsection{Building a pipeline}
\begin{frame}[fragile]{Building a pipeline}
\begin{verbatim}P = Pipeline(pipedot, codedir='./', prefix='/data/...')
\end{verbatim}
\begin{figure}
\includegraphics[width=0.5\textwidth]{pipelet_scheme_small.pdf}
\end{figure}
\begin{itemize}
\item \verb pipedot is the string description of the pipeline
\begin{verbatim}pipedot = """
1->2->4
3->4
"""
\end{verbatim}
\item \verb codedir is the path of the processing code files (.py)
\item \verb prefix is the path of the processed data repository
\end{itemize}
\end{frame}
\subsection{Writing segment scripts}
\begin{frame}[fragile]{Writing segment scripts}
\begin{itemize}
\item A segment is a python script (\verb .py file)
\item It benefits from an improved namespace to:
\begin{itemize}
\item control the pipe parallelization scheme;
\begin{figure}
\includegraphics[width=0.98\textwidth]{seg_scheme.pdf}
\end{figure}
\item save and load I/O's and provide filenames;
\item save and load parameters;
\item execute or include subprocess
\end{itemize}
\end{itemize}
\end{frame}
\subsection{Running a pipeline}
\begin{frame}[fragile]{Running a pipeline}
The pipe engine converts each pair of (processing code, data to
process) into a \textcolor{blue}{task list}.
\begin{figure}
\includegraphics[width=0.80\textwidth]{task_scheme.pdf}
\end{figure}
One can empty the \textcolor{blue}{task list} in different modes:
\begin{itemize}
\item the interactive mode (or debugging mode)
\item the process/thread mode (for smp machine)
\item the batch mode (for cluster)
\end{itemize}
\end{frame}
\subsection{Browsing a pipeline}
\begin{frame}[fragile]{Browsing a pipeline : \href{http://localhost:8080}{http://localhost:8080}}
\begin{figure}
\includegraphics[width=0.70\textwidth]{snapshot.png}
\end{figure}
From the web interface one can: \\
\vspace{0.5cm}
\begin{tabular}{ll}
$\bullet$ Filter/delete pipe instances & from the pipeline page\\
$\bullet$ Highlight dependencies & from the segment page\\
$\bullet$ Read code & from the segment page\\
$\bullet$ Read log files & from the log page\\
$\bullet$ Download/visualize/delete product files & from the product page\\
\end{tabular}
\end{frame}
\section{Getting started}
\begin{frame}
\tableofcontents[currentsection]
\end{frame}
\begin{frame}[fragile]{Getting \pipelet}
$\rhd$ Download from \url{http://gitorious.org/pipelet}
\begin{itemize}
\item Git repository\\
\begin{centering}\verb!git clone git@gitorious.org:pipelet/pipelet.git!
\end{centering}
\item Open wiki including documentation
\end{itemize}
\vspace{0.5cm}
$\rhd$ Features and bugs are tracked from the IN2P3 forge.
\end{frame}
\section{Going further}
\begin{frame}
\tableofcontents[currentsection]
\end{frame}
\begin{frame}{The \pipelet actors}
\begin{figure}
\includegraphics[width=1\textwidth]{pipelet_actors.pdf}
\end{figure}
\end{frame}
\begin{frame}[fragile]{The pipeline object}
The \pipelet scheme is resumed by its segment's relations:
\begin{itemize}
\item a tree view (dot scheme)
\item a flat view
\begin{figure}
\includegraphics[width=0.5\textwidth]{pipeline.pdf}
\end{figure}
\end{itemize}
Segment's code files are found from:
\begin{itemize}
\item a local repository
\item \textcolor{red}{a git, CVS repository}
\end{itemize}
For each segment, a unique hash key is computed from:
\begin{itemize}
\item the segment code script (\verb seg_name_code.py )
\item the hooks scripts (\verb seg_name_hookname.py )
\end{itemize}
\textsl{removing blank lines and comments.}
\end{frame}
\begin{frame}[fragile]{The task object}
A task is the association of a \textcolor{blue}{segment} with its \textcolor{blue}{input} product, its
execution \textcolor{blue}{status} and its \textcolor{blue}{output} product(s). \\
\vspace{0.5cm}
The task attributes:
\begin{columns}
\begin{column}[l]{0.65\textwidth}
\begin{itemize}
\item segment name (string)
\item task input (list)
\item task output (list)
\item task identifier (integer)
\item status string \verb! queued, running, done, failed!
\item date string \verb! queued_on, begun_on, ended_on!
\item task parents (list of identifiers)
\end{itemize}
\end{column}
\begin{column}[r]{0.35\textwidth}
\begin{figure}
\includegraphics[width=1\textwidth]{task.pdf}
\end{figure}
\end{column}
\end{columns}
\end{frame}
\begin{frame}[fragile]{The scheduler object}
\begin{columns}
\begin{column}[l]{0.65\textwidth}
The \verb scheduler.push_next_seg() function:
\begin{itemize}
\item add tasks to the task list
\item all segment's tasks are pushed at the same time
\item using the flat view
\item only if no task has ever been done
\end{itemize}
\end{column}
\begin{column}[r]{0.35\textwidth}
\begin{figure}
\includegraphics[width=1\textwidth]{scheduler.pdf}
\end{figure}
\end{column}
\end{columns}
\vspace{0.5cm}
The task inputs are built from:
\begin{itemize}
\item the segment \verb seg_input and \verb seg_output variables
\item the segment \verb #multiplex directives
\verb!#multiplex cross_prod group_by '0' !\\
\verb!#multiplex cross_prod group_by 'p1' !\\
\verb!#multiplex cross_prod group_by 'p1[0]' !\\
\verb!#multiplex union !
\end{itemize}
\end{frame}
\begin{frame}[fragile]{The worker object}
\begin{figure}
\includegraphics[width=0.5\textwidth]{worker.pdf}
\end{figure}
The \verb worker.execute_task(task) function:
\begin{enumerate}
\item load environment namespace
\item put task inputs to namespace
\item \verb try to execute segment script using namespace
\item set new task status (\verb!done or failed!)
\item if \verb done get task output from namespace
\end{enumerate}
\end{frame}
\begin{frame}[fragile]{The tracker object}
\begin{figure}
\includegraphics[width=0.75\textwidth]{tracker.pdf}
\end{figure}
The \verb tracker.segment_registration() function:
\begin{itemize}
\item insert new entry in sql segments and segment relations tables
\end{itemize}
The \verb tracker.add_queued(task) function:
\begin{itemize}
\item insert new entry in sql tasks and task relations tables
\end{itemize}
The \verb tracker.update_status(task) function:
\begin{itemize}
\item update task entry in sql tasks table using \textcolor{blue}{asynchronous} request
\end{itemize}
\end{frame}
\begin{frame}{Adapt to DPC environment}
\begin{itemize}
\item put intermediate products into DMC
\begin{itemize}
\item need to provide filenames
\item including group name
\item and product identifier
\end{itemize}
\item link to the DPC modules and pipelines facilities ?
\begin{itemize}
\item build a pipelet module + parameter file from an existing pipeline ?
\item build as many modules + parameter files as segments ?
\item convert a pipeline into a DMC pipeline object ?
\end{itemize}
\item any more ambitious ideas ?
\end{itemize}
\end{frame}
\end{document}
\ No newline at end of file
......@@ -67,7 +67,7 @@ class EnvironmentBase():
-------
string, filename
"""
return path.join(self._worker.pipe.get_data_dir(self._worker.task.seg, prod=self._worker.task.task_input), x)
return path.join(self._worker.pipe.get_data_dir(self._worker.task.seg, prod=self._worker.task.task_input,version=self._worker.task.version), x)
def _get_namespace(self):
""" Prepare the namespace for a segment.
......
......@@ -24,6 +24,7 @@ from contextlib import closing
from environment import *
from task import Task
import multiplex
from glob import glob
class PipeException(Exception):
""" Extension of exception class.
......@@ -515,7 +516,7 @@ class Pipeline:
return path.join(self.get_curr_dir(seg),'stdout')
def get_data_dir(self, seg, prod=None):
def get_data_dir(self, seg, prod=None, version=0):
""" Return the data directory for the segment or a product
full name.
......@@ -529,10 +530,29 @@ class Pipeline:
string, segment directory or product full name.
"""
if prod is not None:
return path.join(self.get_data_dir(seg), str_file(prod))
if version>0:
return path.join(self.get_data_dir(seg), str_file(prod)+"_%d"%version)
else:
return path.join(self.get_data_dir(seg), str_file(prod))
else:
return path.join(self.get_curr_dir(seg), 'data')
def find_new_version(self, seg, prod=None):
    """ Return a new version number from existing directory.
    Parameters
    ----------
    seg : string, segment name.
    prod : string, product name.
    Returns
    -------
    integer : version number
    """
    # Count all filesystem entries whose name starts with the product's
    # data directory path; the count is used as the next version suffix
    # (get_data_dir appends "_%d" for version > 0).
    # NOTE(review): a bare prefix glob also matches unrelated products
    # sharing the same prefix (e.g. 'a' vs 'ab') -- confirm this cannot
    # happen with hashed directory names.
    d = self.get_data_dir(seg, prod)
    v = len(glob(d+"*"))
    return v
def get_log_dir (self):
""" Return the pipe log directory.
......@@ -545,13 +565,17 @@ class Pipeline:
"""
return path.join(self.get_curr_dir(seg),'%s.log'%seg)
def get_meta_file (self, seg, prod=-1):
def get_meta_file (self, seg, prod=-1, version=0):
""" Return the meta data filename
This routine is called for segment meta data storage and task meta data storage.
If the first case, meta data are stored in the segment curr_dir.
In the second case, meta data are stored in the task directory (prod may be None)
"""
if (prod == -1):
dirname = self.get_curr_dir(seg)
else:
dirname = self.get_data_dir(seg, prod)
dirname = self.get_data_dir(seg, prod, version)
return path.join(dirname,'%s.meta'%seg)
......
......@@ -234,9 +234,8 @@ class Scheduler():
failed = self.tracker.get_failed(seg) # failed tasks
failed_prod = [t.task_input for t in failed] # failed products
dstrp = [set(t.parents) for t in d] ## list of set
logger.info('Found %d done tasks segment %s'%(len(d),seg))
logger.info('Found %d failed tasks segment %s'%(len(failed),seg))
## task list to queue
......@@ -246,29 +245,34 @@ class Scheduler():
l = [Task(seg)]
logger.info('Found %d tasks in seg %s to get done'%(len(l),seg))
for t in l: # foreach task of the task list
# print seg
# print t.parents
# print t.task_input
if (t.task_input in failed_prod): # done but failed
#logger.debug("task already done and failed in seg %s"%seg)
continue
if not (t.task_input in dprod): # not done
if (not (t.task_input in dprod)): #not done wrt task_input
#logger.debug("pushing 1 task for seg %s"%seg)
self.put_task(t)
else: # done
else: # done (or not because parents changed)
#logger.debug("task already accomplished in segment %s"%seg)
# fetch the result of the task and store it in the task list
ind = dprod.index(t.task_input)
t = d[ind];
#try:
#logger.debug("Loading %d results from previously done task in segment %s"%(len(t.task_output),seg))
#except TypeError:
#logger.debug("No result to load from previously done task in segment %s"%(seg))
self.products_list.push(t)
self.nb_success = self.nb_success + 1
strp = set(t.parents)
try:
# fetch the result of the task and store it in the task list
if len(t.parents)==0:
ind = dprod.index(t.task_input) # look at task input if no parents
else:
ind = dstrp.index(strp) ## find parents set in list of done parents sets
t = d[ind];
#try:
#logger.debug("Loading %d results from previously done task in segment %s"%(len(t.task_output),seg))
#except TypeError:
#logger.debug("No result to load from previously done task in segment %s"%(seg))
self.products_list.push(t)
self.nb_success = self.nb_success + 1
except ValueError: ## parents do not match parents of the done list
t.version = self.pipe.find_new_version(t.seg, t.task_input) ## update version number.
self.put_task(t)
logger.debug("nb_success starts at %d for segment %s"%(self.nb_success,seg))
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment