Skip to content
GitLab
Projects
Groups
Snippets
Help
Loading...
Help
Help
Support
Community forum
Keyboard shortcuts
?
Submit feedback
Contribute to GitLab
Sign in / Register
Toggle navigation
Docker-in-Docker (DinD) capabilities of public runners deactivated.
More info
Open sidebar
pipelet
Pipelet
Commits
556aec52
Commit
556aec52
authored
Jan 20, 2011
by
Maude Le Jeune
Browse files
Task #1263 refactoring of repository
parent
bf455f57
Changes
6
Hide whitespace changes
Inline
Side-by-side
Showing
6 changed files
with
138 additions
and
211 deletions
+138
-211
pipelet/pipeline.py
pipelet/pipeline.py
+5
-2
pipelet/repository.py
pipelet/repository.py
+124
-190
pipelet/scheduler.py
pipelet/scheduler.py
+5
-9
pipelet/tracker.py
pipelet/tracker.py
+0
-6
pipelet/web.py
pipelet/web.py
+1
-1
test/first_test/main.py
test/first_test/main.py
+3
-3
No files found.
pipelet/pipeline.py
View file @
556aec52
...
...
@@ -115,7 +115,8 @@ class Pipeline:
## string, the location of the segment source code
self
.
repository
=
None
if
code_dir
:
self
.
repository
=
LocalRepository
(
code_dir
)
print
self
.
_seg_list
self
.
repository
=
LocalRepository
(
self
.
_seg_list
,
code_dir
)
## string, indicates where to save the pipeline products.
self
.
_prefix
=
path
.
realpath
(
prefix
)
if
not
os
.
path
.
exists
(
prefix
):
...
...
@@ -216,6 +217,7 @@ class Pipeline:
c
=
s
.
find
(
sp
,
a
.
start
())
a
=
_edge
.
search
(
s
,
c
)
segs
=
set
(
flatten
(
r
))
self
.
_seg_list
=
segs
self
.
_parents
=
dict
([(
seg
,[])
for
seg
in
segs
])
self
.
_children
=
dict
([(
seg
,[])
for
seg
in
segs
])
for
c
in
r
:
...
...
@@ -279,6 +281,7 @@ class Pipeline:
>>> print(a._children)
{'second': [], 'first': ['second']}
"""
self
.
_seg_list
=
seg_list
self
.
_children
=
dict
(
zip
(
seg_list
,[[
s
]
for
s
in
seg_list
[
1
:]]
+
[[]]))
self
.
_parents
=
dict
(
zip
(
seg_list
,[[]]
+
[[
s
]
for
s
in
seg_list
[:
-
1
]]))
...
...
@@ -395,7 +398,7 @@ class Pipeline:
"""
hash
=
crc32
()
currdir
=
self
.
_prefix
self
.
repository
.
save_all_string
(
seg
)
#
self.repository.save_all_string(seg)
for
k
in
self
.
_parents
[
seg
]:
h
,
currdir
=
self
.
_compute_hash
(
k
)
hash
.
update
(
h
)
...
...
pipelet/repository.py
View file @
556aec52
...
...
@@ -26,50 +26,99 @@ class Repository:
A repository is the location where to find the segment's source
code files. It returns the full path of all files that will be
used to execute the segment (code,
visu, args
, ...). By default,
used to execute the segment (code,
hook
, ...). By default,
local repositories are used, but it can also be a version control
system (git, CVS, ...).
"""
_ext_re
=
re
.
compile
(
'.py$|.m$'
)
def
__init__
(
self
):
def
__init__
(
self
,
lstseg
):
""" Initialize a repository.
"""
## dict, code string corresponding to each segment
self
.
_code
=
{}
## dict, code string corresponding to each hook
self
.
_hook
=
{}
## dict, code string corresponding to each external dependency
self
.
_deps
=
{}
## dict, code string
self
.
_all_string
=
{}
self
.
_fill_dict
(
lstseg
)
def
_
ext_filter
(
self
,
f
):
"""
Check if the extension file is ok
.
def
_
match_fn
(
self
,
fns
,
seg
,
hook
=
None
):
"""
Find filename matching pipelet rules for a given segment
.
Parameters
----------
f : string, file name
fns : string list, list of candidates filenames
seg: string, segment name
hook: string, hook name
Returns
_______
boolean
-------
string, matching filename
"""
if
hook
is
None
:
for
s
in
range
(
len
(
seg
)):
def
match
(
x
):
return
path
.
basename
(
x
)
==
seg
[
0
:
len
(
seg
)
-
s
]
+
'.py'
fn
=
filter
(
match
,
fns
)
if
len
(
fn
)
>
0
:
return
fn
[
0
]
else
:
for
s
in
range
(
len
(
seg
)):
for
h
in
range
(
len
(
hook
)):
def
match
(
x
):
return
path
.
basename
(
x
)
==
seg
[
0
:
len
(
seg
)
-
s
]
+
'_'
+
hook
[
0
:
len
(
hook
)
-
h
]
+
'.py'
fn
=
filter
(
match
,
fns
)[
0
]
if
len
(
fn
)
>
0
:
return
fn
[
0
]
raise
Exception
(
'No source file corresponding to segment %s and hook %s'
%
(
seg
,
hook
))
def
_fill_dict
(
self
,
lstseg
):
""" Initialize all dictionnaries.
Examples
--------
>>> c = LocalRepository('../test')
>>> print(c._ext_filter('pipe.py'))
True
>>> print(c._ext_filter('pipe.py~'))
False
>>> print(c._ext_filter('pipe.m'))
True
Parameters
----------
lstseg : string list: segment names
"""
fns
=
self
.
get_fns
()
for
s
in
lstseg
:
## code
fn
=
self
.
_match_fn
(
fns
,
s
)
source
=
self
.
get_code_source
(
fn
)
self
.
_code
[
s
]
=
compile
(
source
,
fn
,
"exec"
)
self
.
_all_string
[
s
]
=
source
## hooks
lsthook
=
self
.
get_hook_list
(
source
)
for
h
in
lsthook
:
fn
=
self
.
_match_fn
(
fns
,
s
,
h
)
source
=
self
.
get_code_source
(
fn
)
self
.
_hook
[
s
][
h
]
=
compile
(
so
,
fn
,
"exec"
)
self
.
_all_string
[
s
]
+=
source
## deps
depend
=
self
.
get_directive
(
Depend
,
self
.
_all_string
[
s
])
self
.
_deps
[
s
]
=
""
for
fn
in
depend
.
deps
:
source
=
self
.
get_code_source
(
fn
)
self
.
_deps
[
s
]
+=
source
def
get_all_string
(
self
,
seg
):
""" Return all code as a string.
This is done to prevent file changes after hashkey computation.
Parameters
----------
seg: string, segment name
Returns
-------
code string
"""
return
bool
(
self
.
_
ext_re
.
match
(
path
.
splitext
(
f
)[
1
]))
return
self
.
_
all_string
[
seg
]
def
get_directive
(
self
,
Direct
,
seg
):
def
get_directive
(
self
,
Direct
,
code
):
""" Initialize a directive object from segment code string.
Parameters
...
...
@@ -81,7 +130,7 @@ class Repository:
-------
directive instance
"""
c
=
self
.
get_code_string
(
seg
)
+
'
\n
'
.
join
(
self
.
_hook
[
seg
].
values
())
c
=
code
d
=
Direct
()
for
l
in
c
.
splitlines
():
try
:
...
...
@@ -94,102 +143,68 @@ class Repository:
except
StopIteration
:
pass
return
d
def
get_all_string
(
self
,
seg
):
""" Return the code string that should enter in the
computation of the hash key.
def
get_hook_list
(
self
,
code
):
""" Return the list of hooks
Parameters
----------
seg
: string
, name of the segment.
code
: string
Returns
-------
string
Examples
--------
>>> c = LocalRepository('../test')
>>> print(c.get_all_files('first'))
['../test/seg_first_code.py']
string list
"""
s
=
self
.
_code
[
seg
]
for
k
,
v
in
self
.
_hook
[
seg
].
items
():
s
=
s
+
v
s
=
s
+
self
.
_deps
[
seg
]
return
s
def
save_all_string
(
self
,
seg
):
""" Save code file and hooks into dictionnaries
hook_list
=
[]
for
l
in
code
.
split
(
"hook("
)[
1
:]:
hook_list
.
append
(
l
.
split
(
","
)[
0
][
1
:
-
1
])
return
hook_list
def
get_code_string
(
self
,
seg
):
""" Return segment code as a string.
This is done to prevent file changes after hashkey computation.
Parameters
----------
seg: string, segment name
Returns
-------
code object
"""
file
=
self
.
get_code_file
(
seg
)
if
file
:
fid
=
open
(
file
,
"r"
)
code
=
fid
.
read
()
fid
.
close
()
self
.
_code
[
seg
]
=
code
else
:
self
.
_code
[
seg
]
=
""
lst_hook
=
self
.
get_hook_list
(
seg
)
self
.
_hook
[
seg
]
=
{}
for
h
in
lst_hook
:
file
=
self
.
get_hook_file
(
seg
,
h
)
if
file
:
fid
=
open
(
file
,
"r"
)
code
=
fid
.
read
()
fid
.
close
()
self
.
_hook
[
seg
][
h
]
=
code
else
:
self
.
_hook
[
seg
][
h
]
=
""
depend
=
self
.
get_directive
(
Depend
,
seg
)
self
.
_deps
[
seg
]
=
""
for
file
in
depend
.
deps
:
fid
=
open
(
file
,
"r"
)
code
=
fid
.
read
()
fid
.
close
()
self
.
_deps
[
seg
]
+=
code
return
self
.
_code
[
seg
]
def
get_
co
de_string
(
self
,
seg
):
""" Return
segment
code as a string.
def
get_de
ps
_string
(
self
,
dep
):
""" Return
external dependency
code as a string.
This is done to prevent file changes after hashkey computation.
Parameters
----------
seg
: string,
segment
name
dep
: string,
dependency
name
Returns
-------
string
"""
return
self
.
_
co
de
[
seg
]
return
self
.
_de
ps
[
seg
]
def
get_hook_
li
st
(
self
,
seg
):
""" Return
the list of hooks
def
get_hook_st
ring
(
self
,
seg
,
hook
):
""" Return
hook code as a string.
This is done to prevent file changes after hashkey computation.
Parameters
----------
seg: string, segment name
hook: string, hook name
Returns
-------
string lis
t
code objec
t
"""
hook_list
=
[]
c
=
self
.
_code
[
seg
]
for
l
in
c
.
split
(
"hook("
)[
1
:]:
hook_list
.
append
(
l
.
split
(
","
)[
0
][
1
:
-
1
])
return
hook_list
return
self
.
_hook
[
seg
][
hook
]
def
get_docline
(
self
,
seg
):
""" Return the segment synopsis doc line.
...
...
@@ -203,23 +218,7 @@ class Repository:
string list, name of source files.
"""
import
pydoc
return
pydoc
.
source_synopsis
(
self
.
get_code_file
(
seg
))
def
get_hook_string
(
self
,
seg
,
hook
):
""" Return hook code as a string.
This is done to prevent file changes after hashkey computation.
Parameters
----------
seg: string, segment name
hook: string, hook name
Returns
-------
string
"""
return
self
.
_hook
[
seg
][
hook
]
return
pydoc
.
source_synopsis
(
self
.
_code
[
seg
].
co_filename
)
class
LocalRepository
(
Repository
):
""" A local repository.
...
...
@@ -228,7 +227,7 @@ class LocalRepository(Repository):
segment's code file, plus an optional segment's library directory.
"""
def
__init__
(
self
,
src_path
,
lib_path
=
None
):
def
__init__
(
self
,
lst_seg
,
src_path
):
""" Initialize a local repository.
Parameters
...
...
@@ -241,100 +240,35 @@ class LocalRepository(Repository):
self
.
_code
=
{}
self
.
_hook
=
{}
self
.
_deps
=
{}
self
.
_all_string
=
{}
## string, where to find segment's source code file
self
.
src_path
=
path
.
expanduser
(
path
.
expandvars
(
src_path
))
## string, where to find segment's library
self
.
lib_path
=
lib_path
def
get_code_file
(
self
,
seg
):
""" Return the filename of the segment code.
self
.
_fill_dict
(
lst_seg
)
def
get_code_source
(
self
,
filename
):
""" Read source code from file
Parameters
----------
seg : string, name of the segment.
filename: string
Returns
-------
string, name of the segment's code file.
>>> c = LocalRepository('../test')
>>> print(path.samefile(c.get_code_file('gal'),'../lib/gal.m'))
True
>>> print(path.samefile(c.get_code_file('first'),'../test/first.py'))
True
string, content of file
"""
for
s
in
range
(
len
(
seg
)):
search
=
seg
[
0
:
len
(
seg
)
-
s
]
try
:
f
=
[
filter
(
self
.
_ext_filter
,
glob
(
path
.
join
(
self
.
src_path
,
'%s.*'
%
search
)))[
0
]]
except
:
f
=
[]
if
len
(
f
)
==
0
:
try
:
f
=
[
filter
(
self
.
_ext_filter
,
glob
(
path
.
join
(
self
.
lib_path
,
'%s.*'
%
search
)))[
0
]]
except
:
f
=
[]
if
len
(
f
)
==
0
:
try
:
s
=
re
.
findall
(
'^(.+?)\d+$'
,
search
)[
0
]
f
=
[
self
.
get_code_file
(
s
)]
except
:
if
len
(
search
)
==
1
:
raise
Exception
(
'No source file corresponding to segment %s'
%
seg
)
if
len
(
f
)
>
0
:
logger
.
info
(
"segment script %s found"
%
f
[
0
])
return
f
[
0
]
def
get_hook_file
(
self
,
seg
,
hook
):
""" Check the existence of a local hook file for the
segment and return its full name.
Parameters
----------
seg : string, name of the segment.
hook : string, name of the hook.
fid
=
open
(
path
.
join
(
self
.
src_path
,
filename
),
"r"
)
code
=
fid
.
read
()
fid
.
close
()
return
code
def
get_fns
(
self
):
""" Return filename candidates.
Returns
-------
string, name of the hook file.
Examples
--------
>>> c = LocalRepository('../test')
>>> print(c.get_hook_file('dbm', 'preproc'))
['../test/dbm_preproc.py']
list of string basename.
"""
for
s
in
range
(
len
(
seg
)):
for
h
in
range
(
len
(
hook
)):
search_s
=
seg
[
0
:
len
(
seg
)
-
s
]
search_h
=
hook
[
0
:
len
(
hook
)
-
h
]
try
:
f
=
[
filter
(
self
.
_ext_filter
,
glob
(
path
.
join
(
self
.
src_path
,
'%s_%s.*'
%
(
search_s
,
search_h
))))[
0
]]
except
:
f
=
[]
if
len
(
f
)
==
0
:
try
:
f
=
[
filter
(
self
.
_ext_filter
,
glob
(
path
.
join
(
self
.
lib_path
,
'%s_%s.*'
%
(
search_s
,
search_h
))))[
0
]]
except
:
f
=
[]
if
len
(
f
)
==
0
:
try
:
s
=
re
.
findall
(
'^(.+?)\d+$'
,
search_s
)[
0
]
f
=
[
self
.
get_hook_file
(
s
,
search_h
)]
except
:
if
(
len
(
search_s
)
==
1
)
and
(
len
(
search_h
)
==
1
):
logger
.
warning
(
'No source file corresponding to hook'
)
return
None
if
len
(
f
)
>
0
:
logger
.
info
(
"hook script %s found"
%
f
[
0
])
return
f
[
0
]
return
glob
(
path
.
join
(
self
.
src_path
,
'*.py'
))
class
GitRepository
(
Repository
):
...
...
pipelet/scheduler.py
View file @
556aec52
...
...
@@ -213,22 +213,18 @@ class Scheduler():
if
not
path
.
exists
(
d
):
os
.
mkdir
(
d
)
r
=
self
.
pipe
.
repository
f
=
r
.
get_code_
file
(
seg
)
f
=
r
.
get_code_
string
(
seg
).
co_filename
if
f
:
dest
=
d
+
'/'
+
os
.
path
.
basename
(
f
)
fid
=
open
(
dest
,
"w"
)
fid
.
write
(
r
.
get_code_string
(
seg
))
fid
.
close
()
os
.
system
(
"cp %s %s"
%
(
dest
,
f
))
lst_hook
=
r
.
get_hook_list
(
seg
)
for
h
in
lst_hook
:
f
=
r
.
get_hook_
file
(
seg
,
h
)
f
=
r
.
get_hook_
string
(
seg
,
h
)
.
co_filename
if
f
:
dest
=
d
+
'/'
+
os
.
path
.
basename
(
f
)
fid
=
open
(
dest
,
"w"
)
fid
.
write
(
r
.
get_hook_string
(
seg
,
h
))
fid
.
close
()
##self.store_meta_seg(seg)
os
.
system
(
"cp %s %s"
%
(
dest
,
f
))
parents
=
self
.
pipe
.
get_parents
(
seg
)
## parents segments
d
=
self
.
tracker
.
get_done
(
seg
)
## done tasks
dprod
=
[
t
.
task_input
for
t
in
d
]
## done products
...
...
pipelet/tracker.py
View file @
556aec52
...
...
@@ -374,12 +374,6 @@ class SqliteTracker(Tracker,threading.Thread):
# May hurt performances very badly for short tasks
self
.
_asynchronous_request
(
'update segments set param=? where seg_id =?'
,
(
t
.
param
,
self
.
seg_id_cache
[
t
.
seg
]))
fn
=
self
.
pipe
.
get_meta_file
(
t
.
seg
)
with
closing
(
file
(
fn
,
'r'
))
as
f
:
d
=
pickle
.
load
(
f
)
d
[
'param'
]
=
t
.
param
with
closing
(
file
(
fn
,
'w'
))
as
f
:
r
=
pickle
.
dump
(
d
,
f
)
if
status
==
'done'
or
status
==
'failed'
:
t
.
store_meta
(
self
.
pipe
.
get_meta_file
(
t
.
seg
,
prod
=
t
.
task_input
))
return
t
...
...
pipelet/web.py
View file @
556aec52
...
...
@@ -44,7 +44,7 @@ class Web:
The information about the pipeline is retrieved from its database file.
Pipelines are printed through a tree view with links to all segments and products.
"""
p
=
profiler
.
Profiler
(
'/home/betoule/pipewebprof/'
)
#
p = profiler.Profiler('/home/betoule/pipewebprof/')
## boolean, Web object is exposed.
exposed
=
True
def
__init__
(
self
,
db_file
,
name
):
...
...
test/first_test/main.py
View file @
556aec52
import
pipelet.pipeline
as
pipeline
from
pipelet.launchers
import
launch_process
from
pipelet.launchers
import
launch_process
,
launch_interactive
import
os
pipedot
=
"""
...
...
@@ -8,5 +8,5 @@ third -> fourth;
"""
P
=
pipeline
.
Pipeline
(
pipedot
,
code_dir
=
'./'
,
prefix
=
'./'
)
launch_process
(
P
,
1
)
w
,
t
=
launch_interactive
(
P
)
w
.
run
()
Write
Preview
Markdown
is supported
0%
Try again
or
attach a new file
.
Attach a file
Cancel
You are about to add
0
people
to the discussion. Proceed with caution.
Finish editing this message first!
Cancel
Please
register
or
sign in
to comment