Skip to content
GitLab
Projects
Groups
Snippets
Help
Loading...
Help
Help
Support
Community forum
Keyboard shortcuts
?
Submit feedback
Contribute to GitLab
Sign in / Register
Toggle navigation
Docker-in-Docker (DinD) capabilities of public runners deactivated.
More info
Open sidebar
pipelet
Pipelet
Commits
339c4e0e
Commit
339c4e0e
authored
Sep 02, 2010
by
Marc Betoule
Browse files
begin the env job
parent
2b6d68c3
Changes
1
Hide whitespace changes
Inline
Side-by-side
Showing
1 changed file
with
193 additions
and
0 deletions
+193
-0
pipelet/environnement.py
pipelet/environnement.py
+193
-0
No files found.
pipelet/environnement.py
0 → 100644
View file @
339c4e0e
class
EnvironmentBase
():
def
_get_data_fn
(
self
,
x
):
""" Complete the filename with the path to the working
directory.
Parameters
----------
x: string, filename suffix
Returns
-------
string, filename
"""
return
path
.
join
(
self
.
pipe
.
get_data_dir
(
self
.
task
.
seg
,
prod
=
self
.
task
.
task_input
),
x
)
def
_make_environment
(
self
):
""" Prepare the namespace for a segment.
Returns
-------
Dictionnary, namespace containing all the
"""
class
Environment
(
EnvironmentBase
):
def
get_data_fn
(
self
,
x
):
""" Complete the filename with the path to the working
directory.
Parameters
----------
x: string, filename suffix
Returns
-------
string, filename
"""
return
self
.
_get_data_fn
(
x
)
def
logged_subprocess
(
worker
,
args
,
shell
=
False
):
""" Execute a subprocess and log its output.
Create files process_name.log and process_name.err
Parameters
----------
args: a Task argument
Returns
-------
tuple (outputfile, errorfile)
"""
proc
=
args
[
0
]
of
=
worker
.
_get_data_fn
(
proc
+
'.log'
)
ef
=
worker
.
_get_data_fn
(
proc
+
'.err'
)
o
=
file
(
of
,
'w'
)
e
=
file
(
ef
,
'w'
)
o
.
write
(
'#'
+
' '
.
join
([
str
(
a
)
for
a
in
args
])
+
'
\n
'
)
o
.
flush
()
subprocess
.
Popen
(
args
,
stdout
=
o
,
stderr
=
e
,
shell
=
shell
).
communicate
()[
0
]
o
.
close
()
e
.
close
()
return
(
of
,
ef
)
def
get_fig_fn
(
self
,
x
):
""" Complete the filename with the path to the working
directory.
Parameters
----------
x: string, filename suffix
Returns
-------
string, filename
"""
return
path
.
join
(
self
.
pipe
.
get_fig_dir
(
self
.
task
.
seg
,
prod
=
self
.
task
.
task_input
),
x
)
def
get_tmp_fn
(
self
):
""" Obtain a temporary filename
Note : has to be part of the segment execution environment
The temporary file is added to the intern list for future removal.
Returns
-------
string, temporary filename.
"""
tf
=
path
.
join
(
self
.
work_dir
,
str
(
datetime
.
now
().
toordinal
()))
self
.
_tmpf
.
append
(
tf
)
return
tf
def
glob_seg
(
self
,
x
,
y
):
""" Return the list of filename matching y in the working
directory of segment x.
Parameters
----------
x: string, segment name
y: string, regexp of file to glob.
Returns
-------
list of filenames.
"""
segx
=
self
.
pipe
.
find_seg
(
self
.
task
.
seg
,
x
)
return
glob
(
path
.
join
(
self
.
pipe
.
get_data_dir
(
segx
),
y
))
\
+
glob
(
path
.
join
(
self
.
pipe
.
get_data_dir
(
segx
),
path
.
join
(
'*/'
,
y
)))
def
load_param
(
self
,
seg
,
glo
,
param_name
=
'*'
):
""" Update the global dictionnary with parameters of a
segment.
Parameters
----------
seg : string, segment name
glo : dict, the global dictionnary
param_name : string list, parameters name.
"""
segx
=
self
.
pipe
.
find_seg
(
self
.
task
.
seg
,
seg
)
if
not
path
.
exists
(
self
.
pipe
.
get_param_file
(
segx
)):
return
self
.
load_products
(
self
.
pipe
.
get_param_file
(
segx
),
glo
,
param_name
=
param_name
)
def
save_products
(
self
,
filename
,
glo
,
param_name
=
'*'
):
""" Use pickle to save a part of a given namespace.
Parameters
----------
filename : string, save in this filename.
glo : dict, the namespace.
param_name : string list, name of objects to save. If '*' save
everything in the dictionnary.
"""
new_dict
=
{}
if
param_name
==
'*'
:
param_name
=
glo
.
keys
()
for
k
in
param_name
:
try
:
new_dict
[
k
]
=
glo
[
k
]
except
KeyError
:
logger
.
warning
(
'Fail to save object %s in file %s'
%
(
k
,
filename
))
f
=
file
(
filename
,
'w'
)
pickle
.
dump
(
new_dict
,
f
)
f
.
close
()
def
load_products
(
self
,
filename
,
glo
,
param_name
=
'*'
):
""" Update a namespace by unpickling requested object from the
file.
Parameters
----------
filename: string, the pickle filename.
glo : dict, the namespace to update.
param_name : string list, names of object to unpickle. If '*',
everything from the file is loaded.
"""
try
:
f
=
file
(
filename
)
new_dict
=
pickle
.
load
(
f
)
f
.
close
()
except
IOError
:
logger
.
warning
(
'No such file: %s'
%
filename
)
except
UnpicklingError
:
logger
.
warning
(
'Failed to unpickle from file: %s'
%
filename
)
f
.
close
()
if
param_name
==
'*'
:
param_name
=
new_dict
.
keys
()
for
k
in
param_name
:
try
:
glo
[
k
]
=
new_dict
[
k
]
except
KeyError
:
logger
.
warning
(
'Fail to load object %s from file %s'
%
(
k
,
filename
)
)
def
hook
(
self
,
hook_name
,
glo
):
""" Execute hook code.
Search for an extra segment code file, and update dictionnary
with the result of its execution.
Parameters
----------
hook_name: string, hook name
glo: dict, global dictionnary to update.
"""
code
=
self
.
pipe
.
repository
.
get_hook_string
(
self
.
task
.
seg
,
hook_name
)
if
code
:
exec
(
code
,
glo
)
else
:
print
"No hook file named %s for seg %s"
%
(
self
.
task
.
seg
,
hook_name
)
Write
Preview
Markdown
is supported
0%
Try again
or
attach a new file
.
Attach a file
Cancel
You are about to add
0
people
to the discussion. Proceed with caution.
Finish editing this message first!
Cancel
Please
register
or
sign in
to comment