Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions doc/changes/DM-53292.feature.rst
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Implemented `wmsAttemptNum` with generic wms variable placeholders.
5 changes: 5 additions & 0 deletions doc/lsst.ctrl.bps/quickstart.rst
Original file line number Diff line number Diff line change
Expand Up @@ -891,6 +891,11 @@ Supported settings
The parent directory for the WMS-id link. Defaults to ``${PWD}/bps_links``.
(see :ref:`bps-softlink`)

**wmsAttemptNum**
A placeholder for the WMS-specific job attempt number (0-based) that can be
used in ``runQuantumCommand`` lines or ``environment`` sections. Check
WMS-specific documentation to see if particular WMS plugin supports it.

.. _pickle: https://docs.python.org/3/library/pickle.html
.. _ref: https://stackoverflow.com/a/38971446

Expand Down
12 changes: 10 additions & 2 deletions python/lsst/ctrl/bps/bps_config.py
Original file line number Diff line number Diff line change
Expand Up @@ -371,8 +371,10 @@ def search(self, key, opt=None):
value = re.sub(r"<ENV:([^>]+)>", r"$\1", value)
value = expandvars(value)
elif opt.get("replaceEnvVars", False):
value = re.sub(r"\${([^}]+)}", r"<ENV:\1>", value)
value = re.sub(r"\$(\S+)", r"<ENV:\1>", value)
# Don't replace double dollar signs or $( to allow
# pass-through to WMS
value = re.sub(r"\${([^$(}]+)}", r"<ENV:\1>", value)
value = re.sub(r"\$([^$(}]+)", r"<ENV:\1>", value)

if opt.get("replaceVars", True):
value = self.replace_vars(value, opt)
Expand Down Expand Up @@ -409,6 +411,9 @@ def replace_vars(self, value: str, opt: dict[str, Any]) -> str:
for name in opt.get("skipNames", {}):
value = value.replace(f"{{{name}}}", f"<BPSTMP2:{name}>")

# Replace special keys for WMS to fill in.
value = re.sub(r"{wms([^}]+)}", lambda x: f"<WMS:{x[1][0].lower() + x[1][1:]}>", value)

value = self.formatter.format(value, self, opt)

# Replace any temporary place holders.
Expand All @@ -426,6 +431,9 @@ def replace_vars(self, value: str, opt: dict[str, Any]) -> str:
if "bpsEval" in value:
raise ValueError(f"Unparsable bpsEval in '{value}'")

# Make yaml-specified paths easier to read, remove empty subdirs (//)
value = re.sub(r"//+", "/", value)

return value

def generate_config(self) -> None:
Expand Down
28 changes: 23 additions & 5 deletions python/lsst/ctrl/bps/transform.py
Original file line number Diff line number Diff line change
Expand Up @@ -216,8 +216,17 @@ def _enhance_command(config, generic_workflow, gwjob, cached_job_values):
"""
_LOG.debug("gwjob given to _enhance_command: %s", gwjob)

curvals = {
"curr_pipetask": gwjob.label,
"curr_cluster": gwjob.label,
"jobName": gwjob.name,
"jobLabel": gwjob.label,
}
for key, value in gwjob.tags.items():
curvals[key] = value

search_opt = {
"curvals": {"curr_pipetask": gwjob.label},
"curvals": curvals,
"replaceVars": False,
"expandEnvVars": False,
"replaceEnvVars": True,
Expand Down Expand Up @@ -251,16 +260,25 @@ def _enhance_command(config, generic_workflow, gwjob, cached_job_values):
for gwfile in generic_workflow.get_job_outputs(gwjob.name):
gwjob.arguments = gwjob.arguments.replace(f"{{{gwfile.name}}}", f"<FILE:{gwfile.name}>")

# Replace wms variables with wms placeholders.
gwjob.arguments = re.sub(
r"{wms([^}]+)}", lambda x: f"<WMS:{x[1][0].lower() + x[1][1:]}>", gwjob.arguments
)

# To make yaml-specified paths easier to read, remove empty subdirs (//)
gwjob.arguments = re.sub(r"//+", "/", gwjob.arguments)

# Save dict of other values needed to complete command line.
# (Be careful to not replace env variables as they may
# be different in compute job.)
search_opt["replaceVars"] = True

for key in re.findall(r"{([^}]+)}", gwjob.arguments):
if key not in gwjob.cmdvals:
if key not in cached_job_values[gwjob.label]:
_, cached_job_values[gwjob.label][key] = config.search(key, opt=search_opt)
if key in gwjob.cmdvals:
continue
elif key in cached_job_values[gwjob.label]:
gwjob.cmdvals[key] = cached_job_values[gwjob.label][key]
else:
_, gwjob.cmdvals[key] = config.search(key, opt=search_opt)

# backwards compatibility
if not cached_job_values[gwjob.label]["useLazyCommands"]:
Expand Down
63 changes: 62 additions & 1 deletion tests/test_transform.py
Original file line number Diff line number Diff line change
Expand Up @@ -34,8 +34,15 @@

from cqg_test_utils import make_test_clustered_quantum_graph

from lsst.ctrl.bps import BPS_SEARCH_ORDER, BpsConfig, GenericWorkflowJob
from lsst.ctrl.bps import (
BPS_SEARCH_ORDER,
BpsConfig,
GenericWorkflow,
GenericWorkflowExec,
GenericWorkflowJob,
)
from lsst.ctrl.bps.transform import (
_enhance_command,
_get_job_values,
create_final_command,
create_generic_workflow,
Expand Down Expand Up @@ -335,5 +342,59 @@ def testSkipCommandUsingWhiteSpace(self):
)


class TestEnhanceCommand(unittest.TestCase):
"""Tests of _enhance_command function."""

def setUp(self):
self.gw_exec = GenericWorkflowExec("test_exec", "/dummy/dir/pipetask")
self.config = BpsConfig(
{
# "profile": {},
"bpsUseShared": True,
"whenSaveJobQgraph": "NEVER",
"useLazyCommands": True,
# "memoryLimit": 32768,
"defOpts": "--long-log --log-file {submitPath}/{jobName}.{wmsAttemptNum}.json",
"submitPath": "/the/path",
}
)
self.cached_vals = {
"label1": {
"profile": {},
"bpsUseShared": True,
"whenSaveJobQgraph": "NEVER",
"useLazyCommands": True,
"memoryLimit": 32768,
"key1": "val1",
}
}

def testAttemptNum(self):
# test both in arguments as well as in variables in arguments
gwjob = GenericWorkflowJob("job1", "label1", executable=self.gw_exec)
gw = GenericWorkflow("test1")
gw.add_job(gwjob)

first_args = "{defOpts} run-qbb repo test.qg --summary {submitPath}/{jobName}-summary."
gwjob.arguments = first_args + "{wmsAttemptNum}.json"

new_arguments = first_args + "<WMS:attemptNum>.json"
new_opts = "--long-log --log-file /the/path/job1.<WMS:attemptNum>.json"

_enhance_command(self.config, gw, gwjob, {})

self.assertEqual(gwjob.arguments, new_arguments)
self.assertEqual(gwjob.cmdvals["defOpts"], new_opts)

def testKeyCachedCmdVal(self):
gwjob = GenericWorkflowJob("job1", "label1", executable=self.gw_exec)
gw = GenericWorkflow("test1")
gw.add_job(gwjob)
gwjob.arguments = "run-qbb repo test.qg -x {key1}"
self.assertNotIn("key1", gwjob.cmdvals)
_enhance_command(self.config, gw, gwjob, self.cached_vals)
self.assertEqual(gwjob.cmdvals["key1"], "val1")


if __name__ == "__main__":
unittest.main()
Loading