Coverage for tests / test_transform.py: 18%
188 statements
« prev ^ index » next coverage.py v7.13.5, created at 2026-04-14 23:52 +0000
1# This file is part of ctrl_bps.
2#
3# Developed for the LSST Data Management System.
4# This product includes software developed by the LSST Project
5# (https://www.lsst.org).
6# See the COPYRIGHT file at the top-level directory of this distribution
7# for details of code ownership.
8#
9# This software is dual licensed under the GNU General Public License and also
10# under a 3-clause BSD license. Recipients may choose which of these licenses
11# to use; please see the files gpl-3.0.txt and/or bsd_license.txt,
12# respectively. If you choose the GPL option then the following text applies
13# (but note that there is still no warranty even if you opt for BSD instead):
14#
15# This program is free software: you can redistribute it and/or modify
16# it under the terms of the GNU General Public License as published by
17# the Free Software Foundation, either version 3 of the License, or
18# (at your option) any later version.
19#
20# This program is distributed in the hope that it will be useful,
21# but WITHOUT ANY WARRANTY; without even the implied warranty of
22# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
23# GNU General Public License for more details.
24#
25# You should have received a copy of the GNU General Public License
26# along with this program. If not, see <https://www.gnu.org/licenses/>.
27"""Unit tests of transform.py."""
29import dataclasses
30import os
31import shutil
32import tempfile
33import unittest
35from cqg_test_utils import make_test_clustered_quantum_graph
37from lsst.ctrl.bps import (
38 BPS_SEARCH_ORDER,
39 BpsConfig,
40 GenericWorkflow,
41 GenericWorkflowExec,
42 GenericWorkflowJob,
43)
44from lsst.ctrl.bps.transform import (
45 _enhance_command,
46 _get_job_values,
47 create_final_command,
48 create_generic_workflow,
49 create_generic_workflow_config,
50)
# Absolute path of the directory containing this test module; used as the
# parent for per-test temporary directories.
TESTDIR = os.path.abspath(os.path.dirname(__file__))
class TestCreateGenericWorkflowConfig(unittest.TestCase):
    """Tests of create_generic_workflow_config."""

    def testCreate(self):
        """Check that the generated workflow config keeps all input values
        and gains the workflow name and path entries.
        """
        original = BpsConfig({"a": 1, "b": 2, "uniqProcName": "testCreate"})
        result = create_generic_workflow_config(original, "/test/create/prefix")
        self.assertIsInstance(result, BpsConfig)
        # Every key from the input config must be carried over unchanged.
        for key in original:
            self.assertEqual(result[key], original[key])
        # workflowName comes from uniqProcName; workflowPath from the prefix.
        self.assertEqual(result["workflowName"], "testCreate")
        self.assertEqual(result["workflowPath"], "/test/create/prefix")
class TestCreateGenericWorkflow(unittest.TestCase):
    """Tests of create_generic_workflow."""

    def setUp(self):
        # Each test gets its own scratch directory under the test directory.
        self.tmpdir = tempfile.mkdtemp(dir=TESTDIR)
        # Config providing distinct runQuantumCommand values per cloud/site so
        # the tests can tell which level of override was applied to each job.
        self.config = BpsConfig(
            {
                "runInit": True,
                "computeSite": "global",
                "runQuantumCommand": "gexe -q {qgraphFile} --qgraph-node-id {qgraphNodeId}",
                "clusterTemplate": "{D1}_{D2}",
                "cluster": {
                    "cl1": {"pipetasks": "T1, T2", "dimensions": "D1, D2"},
                    "cl2": {"pipetasks": "T3, T4", "dimensions": "D1, D2"},
                },
                "cloud": {
                    "cloud1": {"runQuantumCommand": "c1exe -q {qgraphFile} --qgraph-node-id {qgraphNodeId}"},
                    "cloud2": {"runQuantumCommand": "c2exe -q {qgraphFile} --qgraph-node-id {qgraphNodeId}"},
                },
                "site": {
                    "site1": {"runQuantumCommand": "s1exe -q {qgraphFile} --qgraph-node-id {qgraphNodeId}"},
                    "site2": {"runQuantumCommand": "s2exe -q {qgraphFile} --qgraph-node-id {qgraphNodeId}"},
                    "global": {"runQuantumCommand": "s3exe -q {qgraphFile} --qgraph-node-id {qgraphNodeId}"},
                },
                # Needed because transform assumes they exist
                "whenSaveJobQgraph": "NEVER",
                "finalJob": {"whenRun": "ALWAYS", "command1": "/usr/bin/env"},
            },
            BPS_SEARCH_ORDER,
        )
        _, self.cqg = make_test_clustered_quantum_graph(self.tmpdir)

    def tearDown(self):
        shutil.rmtree(self.tmpdir, ignore_errors=True)

    def testCreatingGenericWorkflowGlobal(self):
        """Test creating a GenericWorkflow with global settings."""
        config = BpsConfig(self.config)
        config["computeCloud"] = "cloud1"
        config["computeSite"] = "site2"
        config["queue"] = "global_queue"
        workflow = create_generic_workflow(config, self.cqg, "test_gw", self.tmpdir)
        # Global values should propagate to every job in the workflow.
        for jname in workflow:
            gwjob = workflow.get_job(jname)
            self.assertEqual(gwjob.compute_site, "site2")
            self.assertEqual(gwjob.compute_cloud, "cloud1")
            self.assertEqual(gwjob.executable.src_uri, "s2exe")
            self.assertEqual(gwjob.queue, "global_queue")
        # The final job should pick up the same global settings.
        final = workflow.get_final()
        self.assertEqual(final.compute_site, "site2")
        self.assertEqual(final.compute_cloud, "cloud1")
        self.assertEqual(final.queue, "global_queue")

    def testCreatingQuantumGraphMixed(self):
        """Test creating a GenericWorkflow with setting overrides."""
        config = BpsConfig(self.config)
        config[".cluster.cl1.computeCloud"] = "cloud2"
        config[".cluster.cl1.computeSite"] = "notthere"
        config[".cluster.cl2.computeSite"] = "site1"
        config[".finalJob.queue"] = "special_final_queue"
        config[".finalJob.computeSite"] = "special_site"
        config[".finalJob.computeCloud"] = "special_cloud"
        workflow = create_generic_workflow(config, self.cqg, "test_gw", self.tmpdir)
        self.assertEqual(len(workflow) - 1, len(self.cqg))  # Don't count pipetaskInit
        for jname in workflow:
            gwjob = workflow.get_job(jname)
            if jname.startswith("cl1"):
                # Cluster-level cloud override wins over the site command.
                self.assertEqual(gwjob.compute_site, "notthere")
                self.assertEqual(gwjob.compute_cloud, "cloud2")
                self.assertEqual(gwjob.executable.src_uri, "c2exe")
            elif jname.startswith("cl2"):
                # Cluster-level site override selects the site command.
                self.assertEqual(gwjob.compute_site, "site1")
                self.assertIsNone(gwjob.compute_cloud)
                self.assertEqual(gwjob.executable.src_uri, "s1exe")
            elif jname.startswith("pipetask"):
                # No overrides; falls back to the global compute site.
                self.assertEqual(gwjob.compute_site, "global")
                self.assertIsNone(gwjob.compute_cloud)
                self.assertEqual(gwjob.executable.src_uri, "s3exe")
        final = workflow.get_final()
        self.assertEqual(final.compute_site, "special_site")
        self.assertEqual(final.compute_cloud, "special_cloud")
        self.assertEqual(final.queue, "special_final_queue")
class TestGetJobValues(unittest.TestCase):
    """Tests of _get_job_values."""

    def setUp(self):
        # Reference job carrying the GenericWorkflowJob dataclass defaults.
        self.default_job = GenericWorkflowJob("default_job", "default_label")

    def testGettingDefaults(self):
        """Test retrieving default values."""
        config = BpsConfig({})
        job_values = _get_job_values(config, {}, None)
        # Compare field by field so a failure reports which field differs
        # instead of an uninformative "False is not true".
        for field in dataclasses.fields(self.default_job):
            self.assertEqual(
                getattr(self.default_job, field.name),
                job_values[field.name],
                f"mismatch for field {field.name}",
            )

    def testEnablingMemoryScaling(self):
        """Test enabling the memory scaling mechanism."""
        config = BpsConfig({"memoryMultiplier": 2.0})
        job_values = _get_job_values(config, {}, None)
        self.assertAlmostEqual(job_values["memory_multiplier"], 2.0)
        self.assertEqual(job_values["number_of_retries"], 5)

    def testDisablingMemoryScaling(self):
        """Test disabling the memory scaling mechanism."""
        # A multiplier <= 1 disables scaling (reported as None).
        config = BpsConfig({"memoryMultiplier": 0.5})
        job_values = _get_job_values(config, {}, None)
        self.assertIsNone(job_values["memory_multiplier"])

    def testRetrievingCmdLine(self):
        """Test retrieving the command line."""
        cmd_line_key = "runQuantum"
        config = BpsConfig({cmd_line_key: "/path/to/foo bar.txt"})
        job_values = _get_job_values(config, {}, cmd_line_key)
        # First word becomes the executable; the rest becomes the arguments.
        self.assertEqual(job_values["executable"].name, "foo")
        self.assertEqual(job_values["executable"].src_uri, "/path/to/foo")
        self.assertEqual(job_values["arguments"], "bar.txt")

    def testEnvironment(self):
        """Test that environment values are stringified and expanded."""
        config = BpsConfig(
            {
                "var1": "two",
                "environment": {"TEST_INT": 1, "TEST_BOOL": False, "TEST_SPACES": "one {var1} three"},
            }
        )
        job_values = _get_job_values(config, {}, None)
        truth = BpsConfig({"TEST_INT": "1", "TEST_BOOL": "False", "TEST_SPACES": "one two three"}, {}, None)
        self.assertEqual(truth, job_values["environment"])

    def testEnvironmentOptions(self):
        """Test that caller-supplied search options are honored and
        left unmodified.
        """
        config = BpsConfig(
            {
                "var1": "two",
                "environment": {"TEST_INT": 1, "TEST_BOOL": False, "TEST_SPACES": "one {var1} three"},
                "finalJob": {"requestMemory": 8096, "command1": "/usr/bin/env"},
            }
        )
        search_obj = config["finalJob"]
        search_opts = {"replaceVars": False, "searchobj": search_obj}
        job_values = _get_job_values(config, search_opts, None)
        truth = {"TEST_INT": "1", "TEST_BOOL": "False", "TEST_SPACES": "one two three"}
        self.assertEqual(truth, job_values["environment"])
        # _get_job_values must not clobber the caller's search options.
        self.assertEqual(search_opts["replaceVars"], False)
        self.assertEqual(search_opts["searchobj"]["requestMemory"], 8096)
        self.assertEqual(job_values["request_memory"], 8096)
class TestCreateFinalCommand(unittest.TestCase):
    """Tests for the create_final_command function."""

    def setUp(self):
        self.tmpdir = tempfile.TemporaryDirectory()
        # Lines create_final_command always writes at the top of the
        # generated bash script.
        self.script_beginning = [
            "#!/bin/bash\n",
            "\n",
            "set -e\n",
            "set -x\n",
            "qgraphFile=$1\n",
            "butlerConfig=$2\n",
        ]

    def tearDown(self):
        self.tmpdir.cleanup()

    def _assert_final_command(self, config, config_butler, expected_body):
        """Run create_final_command and verify its outputs.

        Parameters
        ----------
        config : `lsst.ctrl.bps.BpsConfig`
            Configuration passed to create_final_command.
        config_butler : `str`
            Butler config path expected in the returned arguments.
        expected_body : `list` [`str`]
            Script lines expected after the standard script beginning.
        """
        gwf_exec, args = create_final_command(config, self.tmpdir.name)
        self.assertEqual(args, f"<FILE:runQgraphFile> {config_butler}")
        final_script = f"{self.tmpdir.name}/final_job.bash"
        self.assertEqual(gwf_exec.src_uri, final_script)
        with open(final_script) as infh:
            lines = infh.readlines()
        self.assertEqual(lines, self.script_beginning + expected_body)

    def testSingleCommand(self):
        """Test with single final job command."""
        config_butler = f"{self.tmpdir.name}/test_repo"
        config = BpsConfig(
            {
                "var1": "42a",
                "var2": "42b",
                "var3": "42c",
                "butlerConfig": config_butler,
                "finalJob": {"command1": "/usr/bin/echo {var1} {qgraphFile} {var2} {butlerConfig} {var3}"},
            }
        )
        self._assert_final_command(
            config, config_butler, ["/usr/bin/echo 42a ${qgraphFile} 42b ${butlerConfig} 42c\n"]
        )

    def testMultipleCommands(self):
        """Test that multiple commands appear in the script in order."""
        config_butler = f"{self.tmpdir.name}/test_repo"
        config = BpsConfig(
            {
                "var1": "42a",
                "var2": "42b",
                "var3": "42c",
                "butlerConfig": config_butler,
                "finalJob": {
                    "command1": "/usr/bin/echo {var1} {qgraphFile} {var2} {butlerConfig} {var3}",
                    "command2": "/usr/bin/uptime",
                },
            }
        )
        self._assert_final_command(
            config,
            config_butler,
            ["/usr/bin/echo 42a ${qgraphFile} 42b ${butlerConfig} 42c\n", "/usr/bin/uptime\n"],
        )

    def testZeroCommands(self):
        """Test that missing commandN keys raise (cmd1/cmd2 don't count)."""
        config_butler = f"{self.tmpdir.name}/test_repo"
        config = BpsConfig(
            {
                "var1": "42a",
                "var2": "42b",
                "var3": "42c",
                "butlerConfig": config_butler,
                "finalJob": {
                    "cmd1": "/usr/bin/echo {var1} {qgraphFile} {var2} {butlerConfig} {var3}",
                    "cmd2": "/usr/bin/uptime",
                },
            }
        )
        with self.assertRaisesRegex(RuntimeError, "finalJob.whenRun"):
            _, _ = create_final_command(config, self.tmpdir.name)

    def testWhiteSpaceOnlyCommand(self):
        """Test that whitespace-only commands count as no commands."""
        config_butler = f"{self.tmpdir.name}/test_repo"
        config = BpsConfig(
            {
                "butlerConfig": config_butler,
                "finalJob": {"command1": "", "command2": "\t \n"},
            }
        )
        with self.assertRaisesRegex(RuntimeError, "finalJob.whenRun"):
            _, _ = create_final_command(config, self.tmpdir.name)

    def testSkipCommandUsingWhiteSpace(self):
        """Test skipping a middle command by setting it to an empty string."""
        config_butler = f"{self.tmpdir.name}/test_repo"
        config = BpsConfig(
            {
                "var1": "42a",
                "var2": "42b",
                "var3": "42c",
                "butlerConfig": config_butler,
                "finalJob": {
                    "command1": "/usr/bin/echo {var1} {qgraphFile} {var2} {butlerConfig} {var3}",
                    "command2": "",  # test skipping a command (i.e., overriding a default)
                    "command3": "/usr/bin/uptime",
                },
            }
        )
        self._assert_final_command(
            config,
            config_butler,
            ["/usr/bin/echo 42a ${qgraphFile} 42b ${butlerConfig} 42c\n", "\n", "/usr/bin/uptime\n"],
        )
class TestEnhanceCommand(unittest.TestCase):
    """Tests of _enhance_command function."""

    def setUp(self):
        self.gw_exec = GenericWorkflowExec("test_exec", "/dummy/dir/pipetask")
        self.config = BpsConfig(
            {
                "bpsUseShared": True,
                "whenSaveJobQgraph": "NEVER",
                "useLazyCommands": True,
                "defOpts": "--long-log --log-file {submitPath}/{jobName}.{wmsAttemptNum}.json",
                "submitPath": "/the/path",
            }
        )
        # Per-label values previously looked up and cached by the caller.
        self.cached_vals = {
            "label1": {
                "profile": {},
                "bpsUseShared": True,
                "whenSaveJobQgraph": "NEVER",
                "useLazyCommands": True,
                "memoryLimit": 32768,
                "key1": "val1",
            }
        }

    def _make_job_in_workflow(self):
        """Build a one-job workflow and return (workflow, job)."""
        job = GenericWorkflowJob("job1", "label1", executable=self.gw_exec)
        workflow = GenericWorkflow("test1")
        workflow.add_job(job)
        return workflow, job

    def testAttemptNum(self):
        """Check {wmsAttemptNum} replacement both directly in the arguments
        and inside variables expanded into the arguments.
        """
        workflow, job = self._make_job_in_workflow()
        prefix = "{defOpts} run-qbb repo test.qg --summary {submitPath}/{jobName}-summary."
        job.arguments = prefix + "{wmsAttemptNum}.json"

        _enhance_command(self.config, workflow, job, {})

        self.assertEqual(job.arguments, prefix + "<WMS:attemptNum>.json")
        self.assertEqual(
            job.cmdvals["defOpts"],
            "--long-log --log-file /the/path/job1.<WMS:attemptNum>.json",
        )

    def testKeyCachedCmdVal(self):
        """Check that per-label cached values end up in the job's cmdvals."""
        workflow, job = self._make_job_in_workflow()
        job.arguments = "run-qbb repo test.qg -x {key1}"
        self.assertNotIn("key1", job.cmdvals)
        _enhance_command(self.config, workflow, job, self.cached_vals)
        self.assertEqual(job.cmdvals["key1"], "val1")

    def testS3Argument(self):
        """Make sure s3 double slashes are not getting removed."""
        workflow, job = self._make_job_in_workflow()
        uri = "s3://user1@rubin-place-users/butler-pipeline1-processing.yaml"
        job.arguments = uri
        _enhance_command(self.config, workflow, job, {})
        self.assertEqual(job.arguments, uri)
if __name__ == "__main__":
    # Allow running this test module directly with python.
    unittest.main()