diff --git a/doc/changes/DM-53292.feature.rst b/doc/changes/DM-53292.feature.rst new file mode 100644 index 00000000..4502d610 --- /dev/null +++ b/doc/changes/DM-53292.feature.rst @@ -0,0 +1 @@ +Implemented `wmsAttemptNum` with generic wms variable placeholders. diff --git a/doc/lsst.ctrl.bps/quickstart.rst b/doc/lsst.ctrl.bps/quickstart.rst index 422f0de2..46c984cc 100644 --- a/doc/lsst.ctrl.bps/quickstart.rst +++ b/doc/lsst.ctrl.bps/quickstart.rst @@ -891,6 +891,11 @@ Supported settings The parent directory for the WMS-id link. Defaults to ``${PWD}/bps_links``. (see :ref:`bps-softlink`) +**wmsAttemptNum** + A placeholder for the WMS-specific job attempt number (0-based) that can be + used in ``runQuantumCommand`` lines or ``environment`` sections. Check + WMS-specific documentation to see if particular WMS plugin supports it. + .. _pickle: https://docs.python.org/3/library/pickle.html .. _ref: https://stackoverflow.com/a/38971446 diff --git a/python/lsst/ctrl/bps/bps_config.py b/python/lsst/ctrl/bps/bps_config.py index 9ada3504..98e42501 100644 --- a/python/lsst/ctrl/bps/bps_config.py +++ b/python/lsst/ctrl/bps/bps_config.py @@ -371,8 +371,10 @@ def search(self, key, opt=None): value = re.sub(r"]+)>", r"$\1", value) value = expandvars(value) elif opt.get("replaceEnvVars", False): - value = re.sub(r"\${([^}]+)}", r"", value) - value = re.sub(r"\$(\S+)", r"", value) + # Don't replace double dollar signs or $( to allow + # pass-through to WMS + value = re.sub(r"\${([^$(}]+)}", r"", value) + value = re.sub(r"\$([^$(}]+)", r"", value) if opt.get("replaceVars", True): value = self.replace_vars(value, opt) @@ -409,6 +411,9 @@ def replace_vars(self, value: str, opt: dict[str, Any]) -> str: for name in opt.get("skipNames", {}): value = value.replace(f"{{{name}}}", f"") + # Replace special keys for WMS to fill in. + value = re.sub(r"{wms([^}]+)}", lambda x: f"", value) + value = self.formatter.format(value, self, opt) # Replace any temporary place holders. @@ -426,6 +431,9 @@ def replace_vars(self, value: str, opt: dict[str, Any]) -> str: if "bpsEval" in value: raise ValueError(f"Unparsable bpsEval in '{value}'") + # Make yaml-specified paths easier to read, remove empty subdirs (//) + value = re.sub(r"//+", "/", value) + return value def generate_config(self) -> None: diff --git a/python/lsst/ctrl/bps/transform.py b/python/lsst/ctrl/bps/transform.py index a57a6ac0..8ad3e3bf 100644 --- a/python/lsst/ctrl/bps/transform.py +++ b/python/lsst/ctrl/bps/transform.py @@ -216,8 +216,17 @@ def _enhance_command(config, generic_workflow, gwjob, cached_job_values): """ _LOG.debug("gwjob given to _enhance_command: %s", gwjob) + curvals = { + "curr_pipetask": gwjob.label, + "curr_cluster": gwjob.label, + "jobName": gwjob.name, + "jobLabel": gwjob.label, + } + for key, value in gwjob.tags.items(): + curvals[key] = value + search_opt = { - "curvals": {"curr_pipetask": gwjob.label}, + "curvals": curvals, "replaceVars": False, "expandEnvVars": False, "replaceEnvVars": True, @@ -251,16 +260,25 @@ def _enhance_command(config, generic_workflow, gwjob, cached_job_values): for gwfile in generic_workflow.get_job_outputs(gwjob.name): gwjob.arguments = gwjob.arguments.replace(f"{{{gwfile.name}}}", f"") + # Replace wms variables with wms placeholders. + gwjob.arguments = re.sub( + r"{wms([^}]+)}", lambda x: f"", gwjob.arguments + ) + + # To make yaml-specified paths easier to read, remove empty subdirs (//) + gwjob.arguments = re.sub(r"//+", "/", gwjob.arguments) + # Save dict of other values needed to complete command line. # (Be careful to not replace env variables as they may # be different in compute job.) search_opt["replaceVars"] = True - for key in re.findall(r"{([^}]+)}", gwjob.arguments): - if key not in gwjob.cmdvals: - if key not in cached_job_values[gwjob.label]: - _, cached_job_values[gwjob.label][key] = config.search(key, opt=search_opt) + if key in gwjob.cmdvals: + continue + elif key in cached_job_values[gwjob.label]: gwjob.cmdvals[key] = cached_job_values[gwjob.label][key] + else: + _, gwjob.cmdvals[key] = config.search(key, opt=search_opt) # backwards compatibility if not cached_job_values[gwjob.label]["useLazyCommands"]: diff --git a/tests/test_transform.py b/tests/test_transform.py index e998618a..f62e5903 100644 --- a/tests/test_transform.py +++ b/tests/test_transform.py @@ -34,8 +34,15 @@ from cqg_test_utils import make_test_clustered_quantum_graph -from lsst.ctrl.bps import BPS_SEARCH_ORDER, BpsConfig, GenericWorkflowJob +from lsst.ctrl.bps import ( + BPS_SEARCH_ORDER, + BpsConfig, + GenericWorkflow, + GenericWorkflowExec, + GenericWorkflowJob, +) from lsst.ctrl.bps.transform import ( + _enhance_command, _get_job_values, create_final_command, create_generic_workflow, @@ -335,5 +342,59 @@ def testSkipCommandUsingWhiteSpace(self): ) +class TestEnhanceCommand(unittest.TestCase): + """Tests of _enhance_command function.""" + + def setUp(self): + self.gw_exec = GenericWorkflowExec("test_exec", "/dummy/dir/pipetask") + self.config = BpsConfig( + { + # "profile": {}, + "bpsUseShared": True, + "whenSaveJobQgraph": "NEVER", + "useLazyCommands": True, + # "memoryLimit": 32768, + "defOpts": "--long-log --log-file {submitPath}/{jobName}.{wmsAttemptNum}.json", + "submitPath": "/the/path", + } + ) + self.cached_vals = { + "label1": { + "profile": {}, + "bpsUseShared": True, + "whenSaveJobQgraph": "NEVER", + "useLazyCommands": True, + "memoryLimit": 32768, + "key1": "val1", + } + } + + def testAttemptNum(self): + # test both in arguments as well as in variables in arguments + gwjob = GenericWorkflowJob("job1", "label1", executable=self.gw_exec) + gw = GenericWorkflow("test1") + gw.add_job(gwjob) + + first_args = "{defOpts} run-qbb repo test.qg --summary {submitPath}/{jobName}-summary." + gwjob.arguments = first_args + "{wmsAttemptNum}.json" + + new_arguments = first_args + ".json" + new_opts = "--long-log --log-file /the/path/job1..json" + + _enhance_command(self.config, gw, gwjob, {}) + + self.assertEqual(gwjob.arguments, new_arguments) + self.assertEqual(gwjob.cmdvals["defOpts"], new_opts) + + def testKeyCachedCmdVal(self): + gwjob = GenericWorkflowJob("job1", "label1", executable=self.gw_exec) + gw = GenericWorkflow("test1") + gw.add_job(gwjob) + gwjob.arguments = "run-qbb repo test.qg -x {key1}" + self.assertNotIn("key1", gwjob.cmdvals) + _enhance_command(self.config, gw, gwjob, self.cached_vals) + self.assertEqual(gwjob.cmdvals["key1"], "val1") + + if __name__ == "__main__": unittest.main()