Coverage for tests / test_transform.py: 18%

188 statements  

« prev     ^ index     » next       coverage.py v7.13.5, created at 2026-04-30 08:53 +0000

1# This file is part of ctrl_bps. 

2# 

3# Developed for the LSST Data Management System. 

4# This product includes software developed by the LSST Project 

5# (https://www.lsst.org). 

6# See the COPYRIGHT file at the top-level directory of this distribution 

7# for details of code ownership. 

8# 

9# This software is dual licensed under the GNU General Public License and also 

10# under a 3-clause BSD license. Recipients may choose which of these licenses 

11# to use; please see the files gpl-3.0.txt and/or bsd_license.txt, 

12# respectively. If you choose the GPL option then the following text applies 

13# (but note that there is still no warranty even if you opt for BSD instead): 

14# 

15# This program is free software: you can redistribute it and/or modify 

16# it under the terms of the GNU General Public License as published by 

17# the Free Software Foundation, either version 3 of the License, or 

18# (at your option) any later version. 

19# 

20# This program is distributed in the hope that it will be useful, 

21# but WITHOUT ANY WARRANTY; without even the implied warranty of 

22# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the 

23# GNU General Public License for more details. 

24# 

25# You should have received a copy of the GNU General Public License 

26# along with this program. If not, see <https://www.gnu.org/licenses/>. 

27"""Unit tests of transform.py.""" 

28 

29import dataclasses 

30import os 

31import shutil 

32import tempfile 

33import unittest 

34 

35from cqg_test_utils import make_test_clustered_quantum_graph 

36 

37from lsst.ctrl.bps import ( 

38 BPS_SEARCH_ORDER, 

39 BpsConfig, 

40 GenericWorkflow, 

41 GenericWorkflowExec, 

42 GenericWorkflowJob, 

43) 

44from lsst.ctrl.bps.transform import ( 

45 _enhance_command, 

46 _get_job_values, 

47 create_final_command, 

48 create_generic_workflow, 

49 create_generic_workflow_config, 

50) 

51 

# Absolute path of the directory containing this test module; used below as
# the parent directory for temporary run directories created by the tests.
TESTDIR = os.path.abspath(os.path.dirname(__file__))

53 

54 

class TestCreateGenericWorkflowConfig(unittest.TestCase):
    """Tests of create_generic_workflow_config."""

    def testCreate(self):
        """Test successful creation of the config."""
        prefix = "/test/create/prefix"
        original = BpsConfig({"a": 1, "b": 2, "uniqProcName": "testCreate"})
        result = create_generic_workflow_config(original, prefix)
        self.assertIsInstance(result, BpsConfig)
        # Every entry from the input config must carry over unchanged.
        for key in original:
            self.assertEqual(result[key], original[key])
        # Workflow name comes from uniqProcName, workflow path from the prefix.
        self.assertEqual(result["workflowName"], "testCreate")
        self.assertEqual(result["workflowPath"], prefix)

67 

68 

class TestCreateGenericWorkflow(unittest.TestCase):
    """Tests of create_generic_workflow."""

    def setUp(self):
        # Temporary run directory; removed in tearDown.
        self.tmpdir = tempfile.mkdtemp(dir=TESTDIR)
        # Config with cluster/cloud/site override sections so the tests can
        # verify which level of setting wins for each job.
        self.config = BpsConfig(
            {
                "runInit": True,
                "computeSite": "global",
                "runQuantumCommand": "gexe -q {qgraphFile} --qgraph-node-id {qgraphNodeId}",
                "clusterTemplate": "{D1}_{D2}",
                "cluster": {
                    "cl1": {"pipetasks": "T1, T2", "dimensions": "D1, D2"},
                    "cl2": {"pipetasks": "T3, T4", "dimensions": "D1, D2"},
                },
                "cloud": {
                    "cloud1": {"runQuantumCommand": "c1exe -q {qgraphFile} --qgraph-node-id {qgraphNodeId}"},
                    "cloud2": {"runQuantumCommand": "c2exe -q {qgraphFile} --qgraph-node-id {qgraphNodeId}"},
                },
                "site": {
                    "site1": {"runQuantumCommand": "s1exe -q {qgraphFile} --qgraph-node-id {qgraphNodeId}"},
                    "site2": {"runQuantumCommand": "s2exe -q {qgraphFile} --qgraph-node-id {qgraphNodeId}"},
                    "global": {"runQuantumCommand": "s3exe -q {qgraphFile} --qgraph-node-id {qgraphNodeId}"},
                },
                # Needed because transform assumes they exist
                "whenSaveJobQgraph": "NEVER",
                "finalJob": {"whenRun": "ALWAYS", "command1": "/usr/bin/env"},
            },
            BPS_SEARCH_ORDER,
        )
        _, self.cqg = make_test_clustered_quantum_graph(self.tmpdir)

    def tearDown(self):
        shutil.rmtree(self.tmpdir, ignore_errors=True)

    def testCreatingGenericWorkflowGlobal(self):
        """Test creating a GenericWorkflow with global settings."""
        config = BpsConfig(self.config)
        config["computeCloud"] = "cloud1"
        config["computeSite"] = "site2"
        config["queue"] = "global_queue"
        workflow = create_generic_workflow(config, self.cqg, "test_gw", self.tmpdir)
        # Global values should propagate to every job in the workflow.
        for jname in workflow:
            gwjob = workflow.get_job(jname)
            self.assertEqual(gwjob.compute_site, "site2")
            self.assertEqual(gwjob.compute_cloud, "cloud1")
            self.assertEqual(gwjob.executable.src_uri, "s2exe")
            self.assertEqual(gwjob.queue, "global_queue")
        # The final job should pick up the same global values.
        final = workflow.get_final()
        self.assertEqual(final.compute_site, "site2")
        self.assertEqual(final.compute_cloud, "cloud1")
        self.assertEqual(final.queue, "global_queue")

    def testCreatingQuantumGraphMixed(self):
        """Test creating a GenericWorkflow with setting overrides."""
        config = BpsConfig(self.config)
        config[".cluster.cl1.computeCloud"] = "cloud2"
        config[".cluster.cl1.computeSite"] = "notthere"
        config[".cluster.cl2.computeSite"] = "site1"
        config[".finalJob.queue"] = "special_final_queue"
        config[".finalJob.computeSite"] = "special_site"
        config[".finalJob.computeCloud"] = "special_cloud"
        workflow = create_generic_workflow(config, self.cqg, "test_gw", self.tmpdir)
        self.assertEqual(len(workflow) - 1, len(self.cqg))  # Don't count pipetaskInit
        # Jobs from each cluster should reflect that cluster's overrides;
        # pipetaskInit falls back to the top-level computeSite.
        for jname in workflow:
            gwjob = workflow.get_job(jname)
            if jname.startswith("cl1"):
                self.assertEqual(gwjob.compute_site, "notthere")
                self.assertEqual(gwjob.compute_cloud, "cloud2")
                self.assertEqual(gwjob.executable.src_uri, "c2exe")
            elif jname.startswith("cl2"):
                self.assertEqual(gwjob.compute_site, "site1")
                self.assertIsNone(gwjob.compute_cloud)
                self.assertEqual(gwjob.executable.src_uri, "s1exe")
            elif jname.startswith("pipetask"):
                self.assertEqual(gwjob.compute_site, "global")
                self.assertIsNone(gwjob.compute_cloud)
                self.assertEqual(gwjob.executable.src_uri, "s3exe")
        final = workflow.get_final()
        self.assertEqual(final.compute_site, "special_site")
        self.assertEqual(final.compute_cloud, "special_cloud")
        self.assertEqual(final.queue, "special_final_queue")

154 

155 

class TestGetJobValues(unittest.TestCase):
    """Tests of _get_job_values."""

    def setUp(self):
        self.default_job = GenericWorkflowJob("default_job", "default_label")

    def testGettingDefaults(self):
        """Test retrieving default values."""
        job_values = _get_job_values(BpsConfig({}), {}, None)
        # With an empty config every value must match the dataclass defaults.
        for field in dataclasses.fields(self.default_job):
            self.assertEqual(getattr(self.default_job, field.name), job_values[field.name])

    def testEnablingMemoryScaling(self):
        """Test enabling the memory scaling mechanism."""
        job_values = _get_job_values(BpsConfig({"memoryMultiplier": 2.0}), {}, None)
        self.assertAlmostEqual(job_values["memory_multiplier"], 2.0)
        self.assertEqual(job_values["number_of_retries"], 5)

    def testDisablingMemoryScaling(self):
        """Test disabling the memory scaling mechanism."""
        job_values = _get_job_values(BpsConfig({"memoryMultiplier": 0.5}), {}, None)
        self.assertIsNone(job_values["memory_multiplier"])

    def testRetrievingCmdLine(self):
        """Test retrieving the command line."""
        key = "runQuantum"
        job_values = _get_job_values(BpsConfig({key: "/path/to/foo bar.txt"}), {}, key)
        # First word of the command becomes the executable, the rest arguments.
        executable = job_values["executable"]
        self.assertEqual(executable.name, "foo")
        self.assertEqual(executable.src_uri, "/path/to/foo")
        self.assertEqual(job_values["arguments"], "bar.txt")

    def testEnvironment(self):
        settings = {
            "var1": "two",
            "environment": {"TEST_INT": 1, "TEST_BOOL": False, "TEST_SPACES": "one {var1} three"},
        }
        job_values = _get_job_values(BpsConfig(settings), {}, None)
        # Values are stringified and {var1} is expanded.
        expected = BpsConfig({"TEST_INT": "1", "TEST_BOOL": "False", "TEST_SPACES": "one two three"}, {}, None)
        self.assertEqual(expected, job_values["environment"])

    def testEnvironmentOptions(self):
        settings = {
            "var1": "two",
            "environment": {"TEST_INT": 1, "TEST_BOOL": False, "TEST_SPACES": "one {var1} three"},
            "finalJob": {"requestMemory": 8096, "command1": "/usr/bin/env"},
        }
        config = BpsConfig(settings)
        search_opts = {"replaceVars": False, "searchobj": config["finalJob"]}
        job_values = _get_job_values(config, search_opts, None)
        expected = {"TEST_INT": "1", "TEST_BOOL": "False", "TEST_SPACES": "one two three"}
        self.assertEqual(expected, job_values["environment"])
        # The caller-supplied search options should come back unmodified.
        self.assertEqual(search_opts["replaceVars"], False)
        self.assertEqual(search_opts["searchobj"]["requestMemory"], 8096)
        self.assertEqual(job_values["request_memory"], 8096)

222 

223 

class TestCreateFinalCommand(unittest.TestCase):
    """Tests for the create_final_command function."""

    def setUp(self):
        self.tmpdir = tempfile.TemporaryDirectory()
        # Fixed preamble that create_final_command writes before any commands.
        self.script_beginning = [
            "#!/bin/bash\n",
            "\n",
            "set -e\n",
            "set -x\n",
            "qgraphFile=$1\n",
            "butlerConfig=$2\n",
        ]

    def tearDown(self):
        self.tmpdir.cleanup()

    def _read_script(self, path):
        """Return the lines of the generated final-job script."""
        with open(path) as infh:
            return infh.readlines()

    def testSingleCommand(self):
        """Test with single final job command."""
        butler_cfg = f"{self.tmpdir.name}/test_repo"
        config = BpsConfig(
            {
                "var1": "42a",
                "var2": "42b",
                "var3": "42c",
                "butlerConfig": butler_cfg,
                "finalJob": {"command1": "/usr/bin/echo {var1} {qgraphFile} {var2} {butlerConfig} {var3}"},
            }
        )
        gwf_exec, args = create_final_command(config, self.tmpdir.name)
        self.assertEqual(args, f"<FILE:runQgraphFile> {butler_cfg}")
        script_path = f"{self.tmpdir.name}/final_job.bash"
        self.assertEqual(gwf_exec.src_uri, script_path)
        # Config variables are expanded; runtime ones become shell variables.
        self.assertEqual(
            self._read_script(script_path),
            self.script_beginning + ["/usr/bin/echo 42a ${qgraphFile} 42b ${butlerConfig} 42c\n"],
        )

    def testMultipleCommands(self):
        butler_cfg = f"{self.tmpdir.name}/test_repo"
        config = BpsConfig(
            {
                "var1": "42a",
                "var2": "42b",
                "var3": "42c",
                "butlerConfig": butler_cfg,
                "finalJob": {
                    "command1": "/usr/bin/echo {var1} {qgraphFile} {var2} {butlerConfig} {var3}",
                    "command2": "/usr/bin/uptime",
                },
            }
        )
        gwf_exec, args = create_final_command(config, self.tmpdir.name)
        self.assertEqual(args, f"<FILE:runQgraphFile> {butler_cfg}")
        script_path = f"{self.tmpdir.name}/final_job.bash"
        self.assertEqual(gwf_exec.src_uri, script_path)
        # Commands appear in numeric order after the preamble.
        self.assertEqual(
            self._read_script(script_path),
            self.script_beginning
            + ["/usr/bin/echo 42a ${qgraphFile} 42b ${butlerConfig} 42c\n", "/usr/bin/uptime\n"],
        )

    def testZeroCommands(self):
        butler_cfg = f"{self.tmpdir.name}/test_repo"
        # Keys not named commandN should be ignored, leaving zero commands.
        config = BpsConfig(
            {
                "var1": "42a",
                "var2": "42b",
                "var3": "42c",
                "butlerConfig": butler_cfg,
                "finalJob": {
                    "cmd1": "/usr/bin/echo {var1} {qgraphFile} {var2} {butlerConfig} {var3}",
                    "cmd2": "/usr/bin/uptime",
                },
            }
        )
        with self.assertRaisesRegex(RuntimeError, "finalJob.whenRun"):
            _, _ = create_final_command(config, self.tmpdir.name)

    def testWhiteSpaceOnlyCommand(self):
        butler_cfg = f"{self.tmpdir.name}/test_repo"
        # Commands that are empty or whitespace-only count as no commands.
        config = BpsConfig(
            {
                "butlerConfig": butler_cfg,
                "finalJob": {"command1": "", "command2": "\t \n"},
            }
        )
        with self.assertRaisesRegex(RuntimeError, "finalJob.whenRun"):
            _, _ = create_final_command(config, self.tmpdir.name)

    def testSkipCommandUsingWhiteSpace(self):
        butler_cfg = f"{self.tmpdir.name}/test_repo"
        config = BpsConfig(
            {
                "var1": "42a",
                "var2": "42b",
                "var3": "42c",
                "butlerConfig": butler_cfg,
                "finalJob": {
                    "command1": "/usr/bin/echo {var1} {qgraphFile} {var2} {butlerConfig} {var3}",
                    "command2": "",  # test skipping a command (i.e., overriding a default)
                    "command3": "/usr/bin/uptime",
                },
            }
        )
        gwf_exec, args = create_final_command(config, self.tmpdir.name)
        self.assertEqual(args, f"<FILE:runQgraphFile> {butler_cfg}")
        script_path = f"{self.tmpdir.name}/final_job.bash"
        self.assertEqual(gwf_exec.src_uri, script_path)
        # The skipped command leaves a blank line between the others.
        self.assertEqual(
            self._read_script(script_path),
            self.script_beginning
            + ["/usr/bin/echo 42a ${qgraphFile} 42b ${butlerConfig} 42c\n", "\n", "/usr/bin/uptime\n"],
        )

343 

344 

class TestEnhanceCommand(unittest.TestCase):
    """Tests of _enhance_command function."""

    def setUp(self):
        self.gw_exec = GenericWorkflowExec("test_exec", "/dummy/dir/pipetask")
        self.config = BpsConfig(
            {
                # "profile": {},
                "bpsUseShared": True,
                "whenSaveJobQgraph": "NEVER",
                "useLazyCommands": True,
                # "memoryLimit": 32768,
                "defOpts": "--long-log --log-file {submitPath}/{jobName}.{wmsAttemptNum}.json",
                "submitPath": "/the/path",
            }
        )
        # Pre-computed per-label values, as built by the transform step.
        self.cached_vals = {
            "label1": {
                "profile": {},
                "bpsUseShared": True,
                "whenSaveJobQgraph": "NEVER",
                "useLazyCommands": True,
                "memoryLimit": 32768,
                "key1": "val1",
            }
        }

    def _make_job(self):
        """Create a one-job workflow; return the workflow and its job."""
        gwjob = GenericWorkflowJob("job1", "label1", executable=self.gw_exec)
        workflow = GenericWorkflow("test1")
        workflow.add_job(gwjob)
        return workflow, gwjob

    def testAttemptNum(self):
        # Exercise {wmsAttemptNum} both directly in arguments and inside a
        # variable ({defOpts}) referenced by the arguments.
        workflow, gwjob = self._make_job()
        prefix = "{defOpts} run-qbb repo test.qg --summary {submitPath}/{jobName}-summary."
        gwjob.arguments = prefix + "{wmsAttemptNum}.json"
        _enhance_command(self.config, workflow, gwjob, {})
        self.assertEqual(gwjob.arguments, prefix + "<WMS:attemptNum>.json")
        self.assertEqual(
            gwjob.cmdvals["defOpts"],
            "--long-log --log-file /the/path/job1.<WMS:attemptNum>.json",
        )

    def testKeyCachedCmdVal(self):
        workflow, gwjob = self._make_job()
        gwjob.arguments = "run-qbb repo test.qg -x {key1}"
        self.assertNotIn("key1", gwjob.cmdvals)
        # Cached per-label values should be copied into the job's cmdvals.
        _enhance_command(self.config, workflow, gwjob, self.cached_vals)
        self.assertEqual(gwjob.cmdvals["key1"], "val1")

    def testS3Argument(self):
        """Make sure s3 double slashes are not getting removed."""
        workflow, gwjob = self._make_job()
        uri = "s3://user1@rubin-place-users/butler-pipeline1-processing.yaml"
        gwjob.arguments = uri
        _enhance_command(self.config, workflow, gwjob, {})
        self.assertEqual(gwjob.arguments, uri)

407 

408 

# Allow running this test module directly with the unittest runner.
if __name__ == "__main__":
    unittest.main()