Commit 8db50e34 authored by Maude Le Jeune's avatar Maude Le Jeune
Browse files

on the way towards plugin pipeline views

parent a975fe41
......@@ -2,7 +2,11 @@ try:
import sqlite3
except ImportError:
import pysqlite2 as sqlite3
from glob import glob
import os
from contextlib import closing
import pickle
import shutil
def old_to_new (old_file, new_file):
""" Convert an old database to the new database format."""
conn1 = sqlite3.connect(old_file)
......@@ -32,3 +36,248 @@ def change_root(sql_file,old_prefix, new_prefix):
with conn1:
conn1.execute('update segments set curr_dir = replace(curr_dir, ?,?)', (old_prefix, new_prefix))
conn1.execute('update tasks set str_input = replace(str_input, ?,?)', (old_prefix, new_prefix))
def get_lst_tag (db_file):
    """ Return the list of existing tags.

    Tags are stored ';' separated in the tag column of the segments
    table. Every non-empty tag of every segment is returned, so a tag
    carried by several segments appears several times in the result.

    Parameters
    ----------
    db_file: string, path of the sqlite database file.

    Returns
    -------
    list of string
    """
    # 'with conn:' only manages the transaction; closing() is what actually
    # releases the connection, including when the query raises.
    with closing(sqlite3.connect(db_file, check_same_thread=True)) as conn:
        conn.text_factory = str
        rows = conn.execute('select tag from segments').fetchall()
    return [tag
            for (tags,) in rows if tags is not None
            for tag in tags.split(";") if tag]
def get_lst_date (db_file):
    """ Return the list of existing dates.

    The distinct values of the queued_on field of the tasks table are
    returned (one entry per distinct value, over all tasks).

    Parameters
    ----------
    db_file: string, path of the sqlite database file.

    Returns
    -------
    list of 1-tuples, each holding one queued_on value, exactly as
    produced by fetchall().
    """
    # closing() releases the connection; 'with conn:' alone would not.
    with closing(sqlite3.connect(db_file, check_same_thread=True)) as conn:
        conn.text_factory = str
        return conn.execute('select queued_on from tasks group by queued_on').fetchall()
def add_tag (db_file, segid, tag):
    """ Add new tag to the database.

    The tag is appended to the tag list of each given segment unless it
    is already there. Existing tag order is preserved and empty entries
    are dropped. Segment ids that are not in the database are skipped.
    When the segment directory holds a *.meta file, it is updated with
    the new tag string.

    Parameters
    ----------
    db_file: string, path of the sqlite database file.
    segid: string. List of seg_id - separated. (- at the end of string also)
    tag: string. Tags are ; separated in the db.
    """
    seg_ids = [s for s in segid.split('-') if s]
    # closing() releases the connection; 'with conn:' only commits/rolls back.
    with closing(sqlite3.connect(db_file, check_same_thread=True)) as conn:
        conn.text_factory = str
        for seg in seg_ids:
            with conn:
                row = conn.execute('select tag,curr_dir from segments where seg_id=?',(seg,)).fetchone()
                if row is None:
                    # Unknown segment: nothing to tag.
                    continue
                old_tags, curr_dir = row
                # Order-preserving de-duplication keeps the stored string
                # stable from one call to the next.
                lst_tag = []
                for t in (old_tags or "").split(";") + [tag]:
                    if t and t not in lst_tag:
                        lst_tag.append(t)
                str_tag = ";".join(lst_tag)
                conn.execute('update segments set tag=? where seg_id=?',(str_tag,seg))
                # Kept inside the transaction so that a failing meta update
                # rolls the database change back.
                fn = glob(os.path.join(curr_dir, "*.meta")) if curr_dir else []
                if fn:
                    _update_meta(fn, str_tag)
def _update_meta(fn, str_tag):
    """ Update meta file with current tag value.

    The first file of the list is unpickled, its 'tag' entry is set to
    str_tag and the result is pickled back to the same file. Nothing is
    done when the list is empty.

    Parameters
    ----------
    fn: list of string, meta file names (as returned by glob). Only the
        first entry is used.
    str_tag: string, tag string
    """
    if not fn:
        return
    meta_file = fn[0]
    # NOTE: pickle.load executes arbitrary code from the file; meta files
    # must come from a trusted location.
    # Binary mode is required by pickle on Python 3 and is harmless on
    # Python 2; open() replaces the Python 2 only file() builtin.
    with open(meta_file, 'rb') as f:
        d = pickle.load(f)
    d['tag'] = str_tag
    with open(meta_file, 'wb') as f:
        pickle.dump(d, f)
def del_tag (db_file, tag):
    """ Delete tag from the database.

    The tag is removed from the ; separated tag list of every segment
    which carries it exactly. Segments whose tags merely contain the
    given string as a substring are left untouched. When the segment
    directory holds a *.meta file, it is updated with the new tag string.

    Parameters
    ----------
    db_file: string, path of the sqlite database file.
    tag: string.
    """
    # closing() releases the connection; 'with conn:' only commits/rolls back.
    with closing(sqlite3.connect(db_file, check_same_thread=True)) as conn:
        conn.text_factory = str
        with conn:
            rows = conn.execute('select seg_id, tag, curr_dir from segments where tag like ?',("%"+tag+"%",)).fetchall()
            for seg_id, old_tags, curr_dir in rows:
                lst_tag = old_tags.split(";")
                if tag not in lst_tag:
                    # LIKE also matches substrings ('a' matches 'abc'):
                    # such rows do not carry the tag itself.
                    continue
                # Drop every occurrence, not only the first one.
                str_tag = ";".join([t for t in lst_tag if t != tag])
                conn.execute('update segments set tag=? where seg_id=?',(str_tag,seg_id))
                fn = glob(os.path.join(curr_dir, "*.meta")) if curr_dir else []
                if fn:
                    _update_meta(fn, str_tag)
def _delseg(db_file, lst_seg):
    """ Delete a pipeline instance.

    For each segment id of the list, every segment whose curr_dir starts
    with the curr_dir of that segment is removed from the database
    (together with its tasks and relations) and the directory is removed
    from disk. The ids which have been processed are removed from
    lst_seg in place, so the list is empty on return.

    Ids which are not in the database (or have no curr_dir) are dropped
    from the list without any deletion.

    Parameters
    ----------
    db_file: string, path of the sqlite database file.
    lst_seg: integer list (values are converted with int()).
    """
    while lst_seg:
        segid = int(lst_seg[0])
        done = set([segid])
        currdir = None
        conn = sqlite3.connect(db_file,check_same_thread=True)
        try:
            with conn:
                row = conn.execute('select curr_dir from segments where seg_id = ?',(segid,)).fetchone()
                if row is not None and row[0] is not None:
                    currdir = row[0]
                    # NOTE(review): '%' and '_' occurring in currdir act as
                    # LIKE wildcards here, so the match can be wider than a
                    # strict path prefix.
                    pattern = currdir+'%'
                    for e in conn.execute('select seg_id from segments where curr_dir like ?',(pattern,)).fetchall():
                        done.add(e[0])
                    ## delete from tasks_relations (needs tasks and segments rows)
                    conn.execute('delete from task_relations where child_id in (select task_id from tasks where seg_id in (select seg_id from segments where segments.curr_dir like ?))',(pattern,))
                    ## delete from tasks (needs segments rows)
                    conn.execute('delete from tasks where seg_id in (select seg_id from segments where segments.curr_dir like ?)',(pattern,))
                    ## delete from segments_relations (needs segments rows)
                    conn.execute('delete from segment_relations where child_id in (select seg_id from segments where segments.curr_dir like ?)',(pattern,))
                    ## delete from segments
                    conn.execute('delete from segments where curr_dir like ?',(pattern,))
        finally:
            conn.close()
        # In-place update: the caller's list is consumed. Matched ids which
        # were not in the list are simply ignored.
        lst_seg[:] = [s for s in lst_seg if int(s) not in done]
        if currdir:
            # Best effort: a missing or partly removed directory is not an error.
            shutil.rmtree(currdir, ignore_errors=True)
def _deltask(db_file, lst_task):
    """ Delete tasks instances.

    Each task of the list is removed from the tasks and task_relations
    tables, and the directory named by its str_input field is removed
    from disk. The ids are removed from lst_task in place, so the list
    is empty on return. Ids which are not in the database are dropped
    from the list without touching the disk.

    Parameters
    ----------
    db_file: string, path of the sqlite database file.
    lst_task: list of task id (values are converted with int()).
    """
    while lst_task:
        taskid = lst_task.pop(0)
        conn = sqlite3.connect(db_file,check_same_thread=True)
        try:
            with conn:
                row = conn.execute('select str_input from tasks where task_id = ?',(int(taskid),)).fetchone()
                ## delete from tasks_relations
                conn.execute('delete from task_relations where child_id = ? ',(int(taskid),))
                ## delete from tasks
                conn.execute('delete from tasks where task_id = ?',(int(taskid),))
        finally:
            conn.close()
        if row is not None and row[0]:
            # Best effort: a missing or partly removed directory is not an error.
            shutil.rmtree(row[0], ignore_errors=True)
def _get_fathers(db_file, segid):
    """ Return the ids of a segment instance and of all its ancestors.

    Return a list which contains the given segment id followed by all
    upstream segment instances id, found by following the
    segment_relations table from child to father (depth first). An
    ancestor reachable through several paths is listed once per path.
    This is used to print a pipeline tree view with all dependencies.

    Parameters
    ----------
    db_file: string, path of the sqlite database file.
    segid: id of the leaf segment.

    Returns
    -------
    list of segid, for the upstream segment instances.
    """
    lstid = [int(segid)]
    # closing() releases the connection even if the query raises.
    with closing(sqlite3.connect(db_file,check_same_thread=True)) as conn:
        fids = conn.execute(
            'select father_id from segment_relations where child_id = ?'
            ,(segid,)).fetchall()
    for (father_id,) in fids:
        # db_file has to be forwarded to the recursive call.
        lstid += _get_fathers(db_file, father_id)
    return lstid
def _get_children(db_file, segid):
    """ Return the ids of a segment instance and of all its descendants.

    Return a list which contains the given segment id followed by all
    downstream segment instances id, found by following the
    segment_relations table from father to child (depth first). A
    descendant reachable through several paths is listed once per path.
    This is used to delete all dependencies.

    Parameters
    ----------
    db_file: string, path of the sqlite database file.
    segid: id of the segment.

    Returns
    -------
    list of segid, for the downstream segment instances.
    """
    lstid = [int(segid)]
    # closing() releases the connection even if the query raises.
    with closing(sqlite3.connect(db_file,check_same_thread=True)) as conn:
        cids = conn.execute(
            'select child_id from segment_relations where father_id = ?'
            ,(segid,)).fetchall()
    for (child_id,) in cids:
        # db_file has to be forwarded to the recursive call.
        lstid += _get_children(db_file, child_id)
    return lstid
def _get_children_task(db_file, taskid):
    """ Return the ids of a task and of all the tasks which depend on it.

    Return a list which contains the given task id followed by all
    downstream task instances id, found by following the task_relations
    table from father to child (depth first). A descendant reachable
    through several paths is listed once per path.
    This is used to delete all dependencies.

    Parameters
    ----------
    db_file: string, path of the sqlite database file.
    taskid: id of the task

    Returns
    -------
    list of taskid, for the downstream task instances.
    """
    lstid = [int(taskid)]
    # closing() releases the connection even if the query raises.
    with closing(sqlite3.connect(db_file,check_same_thread=True)) as conn:
        cids = conn.execute(
            'select child_id from task_relations where father_id = ?'
            ,(taskid,)).fetchall()
    for (child_id,) in cids:
        # db_file has to be forwarded to the recursive call.
        lstid += _get_children_task(db_file, child_id)
    return lstid
This diff is collapsed.
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment