Coverage for tests / test_prepare_utils.py: 24%

308 statements  

« prev     ^ index     » next       coverage.py v7.13.5, created at 2026-04-14 23:53 +0000

1# This file is part of ctrl_bps_htcondor. 

2# 

3# Developed for the LSST Data Management System. 

4# This product includes software developed by the LSST Project 

5# (https://www.lsst.org). 

6# See the COPYRIGHT file at the top-level directory of this distribution 

7# for details of code ownership. 

8# 

9# This software is dual licensed under the GNU General Public License and also 

10# under a 3-clause BSD license. Recipients may choose which of these licenses 

11# to use; please see the files gpl-3.0.txt and/or bsd_license.txt, 

12# respectively. If you choose the GPL option then the following text applies 

13# (but note that there is still no warranty even if you opt for BSD instead): 

14# 

15# This program is free software: you can redistribute it and/or modify 

16# it under the terms of the GNU General Public License as published by 

17# the Free Software Foundation, either version 3 of the License, or 

18# (at your option) any later version. 

19# 

20# This program is distributed in the hope that it will be useful, 

21# but WITHOUT ANY WARRANTY; without even the implied warranty of 

22# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the 

23# GNU General Public License for more details. 

24# 

25# You should have received a copy of the GNU General Public License 

26# along with this program. If not, see <https://www.gnu.org/licenses/>. 

27 

28"""Unit tests for prepare utility functions.""" 

29 

import logging
import os
import unittest
import unittest.mock

from lsst.ctrl.bps import (
    BPS_DEFAULTS,
    BPS_SEARCH_ORDER,
    BpsConfig,
    GenericWorkflow,
    GenericWorkflowExec,
    GenericWorkflowFile,
    GenericWorkflowJob,
)
from lsst.ctrl.bps.htcondor import prepare_utils
from lsst.ctrl.bps.tests.gw_test_utils import make_3_label_workflow, make_3_label_workflow_groups_sort

45 

# Logger for the package under test.
# NOTE(review): neither `logger` nor `TESTDIR` is referenced elsewhere in this
# chunk — possibly kept for ad-hoc debugging or future tests; confirm before
# removing.
logger = logging.getLogger("lsst.ctrl.bps.htcondor")

# Absolute path of the directory containing this test file.
TESTDIR = os.path.abspath(os.path.dirname(__file__))

49 

50 

class TranslateJobCmdsTestCase(unittest.TestCase):
    """Test _translate_job_cmds method.

    Each test builds a single GenericWorkflowJob, translates it, and checks
    one entry of the resulting HTCondor submit-command mapping.
    """

    def setUp(self):
        self.gw_exec = GenericWorkflowExec("test_exec", "/dummy/dir/pipetask")
        self.cached_vals = {"profile": {}, "bpsUseShared": True, "memoryLimit": 32768}

    def _make_job(self, name, **attributes):
        """Create a "label1" job with the shared executable and set the
        given attributes on it (removes the repeated construct/assign
        boilerplate the tests previously carried)."""
        gwjob = GenericWorkflowJob(name, "label1", executable=self.gw_exec)
        for attr_name, attr_value in attributes.items():
            setattr(gwjob, attr_name, attr_value)
        return gwjob

    def testRetryUnlessNone(self):
        """No retry_until command is produced when retryUnlessExit is None."""
        gwjob = self._make_job("retryUnless", retry_unless_exit=None)
        htc_commands = prepare_utils._translate_job_cmds(self.cached_vals, None, gwjob)
        self.assertNotIn("retry_until", htc_commands)

    def testRetryUnlessInt(self):
        """A single exit code becomes an integer-valued retry_until."""
        gwjob = self._make_job("retryUnlessInt", retry_unless_exit=3)
        htc_commands = prepare_utils._translate_job_cmds(self.cached_vals, None, gwjob)
        self.assertEqual(int(htc_commands["retry_until"]), gwjob.retry_unless_exit)

    def testRetryUnlessList(self):
        """A list of exit codes becomes a ClassAd member() expression."""
        gwjob = self._make_job("retryUnlessList", retry_unless_exit=[1, 2])
        htc_commands = prepare_utils._translate_job_cmds(self.cached_vals, None, gwjob)
        self.assertEqual(htc_commands["retry_until"], "member(ExitCode, {1,2})")

    def testRetryUnlessBad(self):
        """An unsupported retryUnlessExit type raises a ValueError naming it."""
        gwjob = self._make_job("retryUnlessBad", retry_unless_exit="1,2,3")
        with self.assertRaises(ValueError) as cm:
            _ = prepare_utils._translate_job_cmds(self.cached_vals, None, gwjob)
        self.assertIn("retryUnlessExit", str(cm.exception))

    def testEnvironmentBasic(self):
        """Environment entries become space-separated key=value pairs;
        string values are single-quoted."""
        gwjob = self._make_job("jobEnvironment", environment={"TEST_INT": 1, "TEST_STR": "TWO"})
        htc_commands = prepare_utils._translate_job_cmds(self.cached_vals, None, gwjob)
        self.assertEqual(htc_commands["environment"], "TEST_INT=1 TEST_STR='TWO'")

    def testEnvironmentSpaces(self):
        """Values containing spaces stay intact inside single quotes."""
        gwjob = self._make_job("jobEnvironment", environment={"TEST_SPACES": "spacey value"})
        htc_commands = prepare_utils._translate_job_cmds(self.cached_vals, None, gwjob)
        self.assertEqual(htc_commands["environment"], "TEST_SPACES='spacey value'")

    def testEnvironmentSingleQuotes(self):
        """Embedded single quotes are doubled (HTCondor quoting rules)."""
        gwjob = self._make_job(
            "jobEnvironment", environment={"TEST_SINGLE_QUOTES": "spacey 'quoted' value"}
        )
        htc_commands = prepare_utils._translate_job_cmds(self.cached_vals, None, gwjob)
        self.assertEqual(htc_commands["environment"], "TEST_SINGLE_QUOTES='spacey ''quoted'' value'")

    def testEnvironmentDoubleQuotes(self):
        """Embedded double quotes are doubled (HTCondor quoting rules)."""
        gwjob = self._make_job(
            "jobEnvironment", environment={"TEST_DOUBLE_QUOTES": 'spacey "double" value'}
        )
        htc_commands = prepare_utils._translate_job_cmds(self.cached_vals, None, gwjob)
        self.assertEqual(htc_commands["environment"], """TEST_DOUBLE_QUOTES='spacey ""double"" value'""")

    def testEnvironmentWithEnvVars(self):
        """<ENV:X> placeholders are rewritten to $ENV(X) macros."""
        gwjob = self._make_job(
            "jobEnvironment", environment={"TEST_ENV_VAR": "<ENV:CTRL_BPS_DIR>/tests"}
        )
        htc_commands = prepare_utils._translate_job_cmds(self.cached_vals, None, gwjob)
        self.assertEqual(htc_commands["environment"], "TEST_ENV_VAR='$ENV(CTRL_BPS_DIR)/tests'")

    def testPeriodicRelease(self):
        """A memory multiplier > 1 produces the memory-scaling
        periodic_release expression."""
        gwjob = self._make_job(
            "periodicRelease", request_memory=2048, memory_multiplier=2, number_of_retries=3
        )
        htc_commands = prepare_utils._translate_job_cmds(self.cached_vals, None, gwjob)
        release = (
            "JobStatus == 5 && NumJobStarts <= JobMaxRetries && "
            "(HoldReasonCode =?= 34 && HoldReasonSubCode =?= 0 || "
            "HoldReasonCode =?= 3 && HoldReasonSubCode =?= 34) && "
            "min({int(2048 * pow(2, NumJobStarts - 1)), 32768}) < 32768"
        )
        self.assertEqual(htc_commands["periodic_release"], release)

    def testPeriodicRemoveNoRetries(self):
        """Zero retries yields a plain periodic_remove and max_retries=0."""
        gwjob = self._make_job(
            "periodicRelease", request_memory=2048, memory_multiplier=1, number_of_retries=0
        )
        htc_commands = prepare_utils._translate_job_cmds(self.cached_vals, None, gwjob)
        remove = "JobStatus == 5 && (NumJobStarts > JobMaxRetries)"
        self.assertEqual(htc_commands["periodic_remove"], remove)
        self.assertEqual(htc_commands["max_retries"], 0)

    def testProfileJobCommands(self):
        """Per-job profile entries are copied into the HTCondor commands."""
        requirement_str = 'Machine == "node01.cluster.local"'
        gwjob = self._make_job(
            "requirements",
            request_memory=2048,
            memory_multiplier=1,
            number_of_retries=0,
            profile={"requirements": requirement_str},
        )
        htc_commands = prepare_utils._translate_job_cmds(self.cached_vals, None, gwjob)
        self.assertEqual(htc_commands["requirements"], requirement_str)

    def testProfileCached(self):
        """Cached (site/label-level) profile entries are copied as well."""
        requirement_str = 'Machine == "node01.cluster.local"'
        gwjob = self._make_job(
            "requirements", request_memory=2048, memory_multiplier=1, number_of_retries=0
        )
        cached_vals = dict(self.cached_vals)
        cached_vals["profile"] = {"requirements": requirement_str}
        htc_commands = prepare_utils._translate_job_cmds(cached_vals, None, gwjob)
        self.assertEqual(htc_commands["requirements"], requirement_str)

    def testArgumentsReplaceWmsVars(self):
        """<WMS:attemptNum> inside arguments becomes $$([NumJobStarts])."""
        gwjob = self._make_job("job1", request_cpus=1, request_memory=2048)
        gw = GenericWorkflow("test1")
        gw.add_job(gwjob)
        gwjob.arguments = "run-qbb repo test.qg --summary /a/b/t/jobs/c/d/job-<WMS:attemptNum>-summary.json"
        new_arguments = "run-qbb repo test.qg --summary /a/b/t/jobs/c/d/job-$$([NumJobStarts])-summary.json"
        htc_commands = prepare_utils._translate_job_cmds(self.cached_vals, gw, gwjob)
        self.assertEqual(htc_commands["arguments"], new_arguments)

168 

169 

class TranslateDagCmdsTestCase(unittest.TestCase):
    """Test _translate_dag_cmds method."""

    def setUp(self):
        self.gw_exec = GenericWorkflowExec("test_exec", "/dummy/dir/pipetask")

    def testPriority(self):
        """A job priority is carried through into the DAG commands."""
        priority_job = GenericWorkflowJob("priority", "label1", executable=self.gw_exec)
        priority_job.priority = 100
        commands = prepare_utils._translate_dag_cmds(priority_job)
        self.assertEqual(commands["priority"], 100)

181 

182 

class GroupToSubdagTestCase(unittest.TestCase):
    """Test _group_to_subdag function."""

    def testBlocking(self):
        """The generated subdag has one entry per job in the group."""
        workflow = make_3_label_workflow_groups_sort("test1", True)
        group_job = workflow.get_job("group_order1_10001")
        config = BpsConfig({}, search_order=BPS_SEARCH_ORDER, defaults=BPS_DEFAULTS)

        htc_job = prepare_utils._group_to_subdag(config, group_job, "the_prefix")

        self.assertEqual(len(htc_job.subdag), len(group_job))

197 

198 

class GatherSiteValuesTestCase(unittest.TestCase):
    """Test _gather_site_values function."""

    @staticmethod
    def _make_config(settings, **extra):
        """Build a BpsConfig with the standard search order and defaults."""
        return BpsConfig(settings, search_order=BPS_SEARCH_ORDER, defaults=BPS_DEFAULTS, **extra)

    def testAllThere(self):
        """An unknown compute site still yields the default memory limit."""
        config = self._make_config({})
        site_values = prepare_utils._gather_site_values(config, "notThere")
        self.assertEqual(site_values["memoryLimit"], BPS_DEFAULTS["memoryLimit"])

    def testNotSpecified(self):
        """Same check with the HTCondor WMS service class configured."""
        config = self._make_config(
            {}, wms_service_class_fqn="lsst.ctrl.bps.htcondor.HTCondorService"
        )
        site_values = prepare_utils._gather_site_values(config, "notThere")
        self.assertEqual(site_values["memoryLimit"], BPS_DEFAULTS["memoryLimit"])

    def testGlobalNodeset(self):
        """A global nodeset value is expanded with config variables."""
        config = self._make_config(
            {"nodeset": "global_node_set_{campaign}", "campaign": "DRP"},
            wms_service_class_fqn="lsst.ctrl.bps.htcondor.HTCondorService",
        )
        site_values = prepare_utils._gather_site_values(config, "fr")
        self.assertEqual(site_values["nodeset"], "global_node_set_DRP")

    def testSiteNodeset(self):
        """A site-specific nodeset overrides the global one."""
        config = self._make_config(
            {
                "nodeset": "global_node_set_{campaign}",
                "campaign": "DRP",
                "site": {"fr": {"nodeset": "fr_node_set_{campaign}"}},
            },
            wms_service_class_fqn="lsst.ctrl.bps.htcondor.HTCondorService",
        )
        site_values = prepare_utils._gather_site_values(config, "fr")
        self.assertEqual(site_values["nodeset"], "fr_node_set_DRP")

    def testAttrsProfile(self):
        """Profile keys prefixed with '+' are split out as job attrs."""
        settings = {
            "bpsNodeset": "DEVSET",
            "site": {
                "mycomputer": {
                    "profile": {
                        "condor": {
                            "requirements": '( TARGET.Nodeset == "{bpsNodeset}" )',
                            "+JobNodeset": "{bpsNodeset}",
                        }
                    }
                }
            },
        }
        config = self._make_config(
            settings, wms_service_class_fqn="lsst.ctrl.bps.htcondor.HTCondorService"
        )
        site_values = prepare_utils._gather_site_values(config, "mycomputer")
        self.assertEqual(site_values["profile"], {"requirements": '( TARGET.Nodeset == "DEVSET" )'})
        self.assertEqual(site_values["attrs"], {"JobNodeset": "DEVSET"})

272 

273 

class GatherLabelValuesTestCase(unittest.TestCase):
    """Test _gather_label_values function."""

    @staticmethod
    def _make_config(settings, defaults=BPS_DEFAULTS):
        """Build a BpsConfig targeting the HTCondor WMS service."""
        return BpsConfig(
            settings,
            search_order=BPS_SEARCH_ORDER,
            defaults=defaults,
            wms_service_class_fqn="lsst.ctrl.bps.htcondor.HTCondorService",
        )

    def testClusterLabel(self):
        """Cluster-level settings take precedence over pipetask settings."""
        config = self._make_config(
            {
                "cluster": {
                    "label1": {
                        "releaseExpr": "cluster_val",
                        "overwriteJobFiles": False,
                        "profile": {"condor": {"prof_val1": 3}},
                    }
                },
                "pipetask": {"label1": {"releaseExpr": "pipetask_val"}},
            }
        )
        values = prepare_utils._gather_label_values(config, "label1")
        expected = {
            "attrs": {},
            "profile": {"prof_val1": 3},
            "releaseExpr": "cluster_val",
            "overwriteJobFiles": False,
        }
        self.assertEqual(values, expected)

    def testPipetaskLabel(self):
        """Pipetask settings are used when there is no cluster section."""
        config = self._make_config(
            {
                "pipetask": {
                    "label1": {
                        "releaseExpr": "pipetask_val",
                        "overwriteJobFiles": False,
                        "profile": {"condor": {"prof_val1": 3}},
                    }
                }
            }
        )
        values = prepare_utils._gather_label_values(config, "label1")
        expected = {
            "attrs": {},
            "profile": {"prof_val1": 3},
            "releaseExpr": "pipetask_val",
            "overwriteJobFiles": False,
        }
        self.assertEqual(values, expected)

    def testNoSection(self):
        """An unknown label yields only the generic values."""
        config = self._make_config({})
        values = prepare_utils._gather_label_values(config, "notThere")
        self.assertEqual(values, {"attrs": {}, "profile": {}, "overwriteJobFiles": True})

    def testNoOverwriteSpecified(self):
        """With empty defaults, overwriteJobFiles still defaults to True."""
        config = self._make_config({}, defaults={})
        values = prepare_utils._gather_label_values(config, "notthere")
        self.assertEqual(values, {"attrs": {}, "profile": {}, "overwriteJobFiles": True})

    def testFinalJob(self):
        """finalJob profile splits '+'-prefixed keys into attrs."""
        config = self._make_config(
            {"finalJob": {"profile": {"condor": {"prof_val2": 6, "+attr_val1": 5}}}}
        )
        values = prepare_utils._gather_label_values(config, "finalJob")
        self.assertEqual(
            values, {"attrs": {"attr_val1": 5}, "profile": {"prof_val2": 6}, "overwriteJobFiles": False}
        )

367 

368 

class CreateCheckJobTestCase(unittest.TestCase):
    """Test _create_check_job function."""

    def testSuccess(self):
        """The check job carries the group name, label, and submit file."""
        group_name = "group_order1_val1a"
        label = "order1"

        check_job = prepare_utils._create_check_job(group_name, label)

        self.assertEqual(check_job.label, label)
        self.assertIn(group_name, check_job.name)
        self.assertIn("check_group_status.sub", check_job.subfile)

379 

380 

class CreatePeriodicReleaseExprTestCase(unittest.TestCase):
    """Test _create_periodic_release_expr function."""

    def testNoReleaseExpr(self):
        """No expression when there is neither scaling nor a user expr."""
        expr = prepare_utils._create_periodic_release_expr(2048, 1, 32768, "")
        self.assertEqual(expr, "")

    def testMultiplierNone(self):
        """A multiplier of None behaves like no scaling."""
        expr = prepare_utils._create_periodic_release_expr(2048, None, 32768, "")
        self.assertEqual(expr, "")

    def testJustMemoryReleaseExpr(self):
        """Memory scaling alone produces the memory-based release expr."""
        self.maxDiff = None  # so test error shows entire strings
        expr = prepare_utils._create_periodic_release_expr(2048, 2, 32768, "")
        expected = (
            "JobStatus == 5 && NumJobStarts <= JobMaxRetries && "
            "(HoldReasonCode =?= 34 && HoldReasonSubCode =?= 0 || "
            "HoldReasonCode =?= 3 && HoldReasonSubCode =?= 34) && "
            "min({int(2048 * pow(2, NumJobStarts - 1)), 32768}) < 32768"
        )
        self.assertEqual(expr, expected)

    def testJustUserReleaseExpr(self):
        """A user expression alone is ANDed onto the base condition."""
        expr = prepare_utils._create_periodic_release_expr(2048, 1, 32768, "True")
        expected = "JobStatus == 5 && NumJobStarts <= JobMaxRetries && HoldReasonCode =!= 1 && True"
        self.assertEqual(expr, expected)

    def testJustUserReleaseExprMultiplierNone(self):
        """Multiplier None with a user expression matches multiplier 1."""
        expr = prepare_utils._create_periodic_release_expr(2048, None, 32768, "True")
        expected = "JobStatus == 5 && NumJobStarts <= JobMaxRetries && HoldReasonCode =!= 1 && True"
        self.assertEqual(expr, expected)

    def testMemoryAndUserReleaseExpr(self):
        """Memory scaling and a user expression are ORed together."""
        self.maxDiff = None  # so test error shows entire strings
        expr = prepare_utils._create_periodic_release_expr(2048, 2, 32768, "True")
        expected = (
            "JobStatus == 5 && NumJobStarts <= JobMaxRetries && "
            "((HoldReasonCode =?= 34 && HoldReasonSubCode =?= 0 || "
            "HoldReasonCode =?= 3 && HoldReasonSubCode =?= 34) && "
            "min({int(2048 * pow(2, NumJobStarts - 1)), 32768}) < 32768 || "
            "HoldReasonCode =!= 1 && True)"
        )
        self.assertEqual(expr, expected)

424 

425 

class CreatePeriodicRemoveExprTestCase(unittest.TestCase):
    """Test _create_periodic_remove_expr function."""
    # (docstring previously said "_create_periodic_release_expr" —
    # copy-paste error from the class above)

    def testBasicRemoveExpr(self):
        """Function assumes only called if max_retries >= 0."""
        results = prepare_utils._create_periodic_remove_expr(2048, 1, 32768)
        truth = "JobStatus == 5 && (NumJobStarts > JobMaxRetries)"
        self.assertEqual(results, truth)

    def testBasicRemoveExprMultiplierNone(self):
        """Function assumes only called if max_retries >= 0."""
        # multiplier None behaves like no memory scaling (same as above).
        results = prepare_utils._create_periodic_remove_expr(2048, None, 32768)
        truth = "JobStatus == 5 && (NumJobStarts > JobMaxRetries)"
        self.assertEqual(results, truth)

    def testMemoryRemoveExpr(self):
        """Memory scaling adds a remove clause for jobs held at the limit."""
        self.maxDiff = None  # so test error shows entire strings
        results = prepare_utils._create_periodic_remove_expr(2048, 2, 32768)
        truth = (
            "JobStatus == 5 && (NumJobStarts > JobMaxRetries || "
            "((HoldReasonCode =?= 34 && HoldReasonSubCode =?= 0 || "
            "HoldReasonCode =?= 3 && HoldReasonSubCode =?= 34) && "
            "min({int(2048 * pow(2, NumJobStarts - 1)), 32768}) == 32768))"
        )
        self.assertEqual(results, truth)

451 

452 

class HandleJobOutputsTestCase(unittest.TestCase):
    """Test _handle_job_outputs function.

    ``unittest.mock`` is imported explicitly where used: ``import unittest``
    alone does not guarantee the ``mock`` submodule is available as an
    attribute of ``unittest`` (the original code only worked because the
    test runner happened to import it first).
    """

    def setUp(self):
        self.job_name = "test_job"
        self.out_prefix = "/test/prefix"

    @staticmethod
    def _mock_workflow(outputs):
        """Return a mock workflow whose get_job_outputs yields *outputs*."""
        import unittest.mock

        workflow = unittest.mock.Mock()
        workflow.get_job_outputs.return_value = outputs
        return workflow

    def testNoOutputsSharedFilesystem(self):
        """Test with shared filesystem and no outputs."""
        mock_workflow = self._mock_workflow([])

        result = prepare_utils._handle_job_outputs(mock_workflow, self.job_name, True, self.out_prefix)

        self.assertEqual(result, {"transfer_output_files": '""'})

    def testWithOutputsSharedFilesystem(self):
        """Test with shared filesystem and outputs present (still empty)."""
        mock_workflow = self._mock_workflow(
            [GenericWorkflowFile(name="output.txt", src_uri="/path/to/output.txt")]
        )

        result = prepare_utils._handle_job_outputs(mock_workflow, self.job_name, True, self.out_prefix)

        self.assertEqual(result, {"transfer_output_files": '""'})

    def testNoOutputsNoSharedFilesystem(self):
        """Test without shared filesystem and no outputs."""
        mock_workflow = self._mock_workflow([])

        result = prepare_utils._handle_job_outputs(mock_workflow, self.job_name, False, self.out_prefix)

        self.assertEqual(result, {"transfer_output_files": '""'})

    def testWithAnOutputNoSharedFilesystem(self):
        """Test without shared filesystem and single output file."""
        mock_workflow = self._mock_workflow(
            [GenericWorkflowFile(name="output.txt", src_uri="/path/to/output.txt")]
        )

        result = prepare_utils._handle_job_outputs(mock_workflow, self.job_name, False, self.out_prefix)

        expected = {
            "transfer_output_files": "output.txt",
            "transfer_output_remaps": '"output.txt=/path/to/output.txt"',
        }
        self.assertEqual(result, expected)

    def testWithOutputsNoSharedFilesystem(self):
        """Test without shared filesystem and multiple output files."""
        mock_workflow = self._mock_workflow(
            [
                GenericWorkflowFile(name="output1.txt", src_uri="/path/output1.txt"),
                GenericWorkflowFile(name="output2.txt", src_uri="/another/path/output2.txt"),
            ]
        )

        result = prepare_utils._handle_job_outputs(mock_workflow, self.job_name, False, self.out_prefix)

        expected = {
            "transfer_output_files": "output1.txt,output2.txt",
            "transfer_output_remaps": '"output1.txt=/path/output1.txt;output2.txt=/another/path/output2.txt"',
        }
        self.assertEqual(result, expected)

    def testLogging(self):
        """Debug logging mentions the source URI and transfer commands."""
        import unittest.mock

        mock_workflow = self._mock_workflow(
            [GenericWorkflowFile(name="output.txt", src_uri="/path/to/output.txt")]
        )

        with unittest.mock.patch("lsst.ctrl.bps.htcondor.prepare_utils._LOG") as mock_log:
            prepare_utils._handle_job_outputs(mock_workflow, self.job_name, False, self.out_prefix)

        self.assertTrue(mock_log.debug.called)
        debug_calls = mock_log.debug.call_args_list
        self.assertTrue(any("src_uri=" in str(call) for call in debug_calls))
        self.assertTrue(any("transfer_output_files=" in str(call) for call in debug_calls))
        self.assertTrue(any("transfer_output_remaps=" in str(call) for call in debug_calls))

537 

538 

class CreateJobTestCase(unittest.TestCase):
    """Test _create_job function."""

    def setUp(self):
        self.generic_workflow = make_3_label_workflow("test1", True)
        self.template = "{label}/{tract}/{patch}/{band}/{subfilter}/{physical_filter}/{visit}/{exposure}"

    @staticmethod
    def _cached_values(**extra):
        """Return the baseline cached values, updated with any overrides."""
        values = {
            "bpsUseShared": True,
            "overwriteJobFiles": False,
            "memoryLimit": 491520,
            "profile": {},
            "attrs": {},
        }
        values.update(extra)
        return values

    def testNoOverwrite(self):
        """Without overwrite, stdout/stderr names carry the retry counter."""
        gwjob = self.generic_workflow.get_final()
        htc_job = prepare_utils._create_job(
            self.template, self._cached_values(), self.generic_workflow, gwjob, "submit"
        )
        self.assertEqual(htc_job.name, gwjob.name)
        self.assertEqual(htc_job.label, gwjob.label)
        # stdout/stderr filenames are per-attempt (.out); the log is not.
        for key in ("output", "error"):
            self.assertIn("NumJobStarts", htc_job.cmds[key])
            self.assertTrue(htc_job.cmds[key].endswith(".out"))
        self.assertNotIn("NumJobStarts", htc_job.cmds["log"])
        self.assertTrue(htc_job.cmds["log"].endswith(".log"))

    def testNodesetWithNoRequirements(self):
        """A nodeset alone yields the requirements expression and attr."""
        gwjob = self.generic_workflow.get_job("label1_10002_11")
        htc_job = prepare_utils._create_job(
            self.template, self._cached_values(nodeset="set1"), self.generic_workflow, gwjob, "temp"
        )
        self.assertEqual(htc_job.cmds["requirements"], '( Target.Nodeset == "set1" )')
        self.assertEqual(htc_job.attrs["JobNodeset"], "set1")

    def testNodesetWithRequirements(self):
        """A nodeset is ANDed onto existing profile requirements."""
        gwjob = self.generic_workflow.get_job("label1_10002_11")
        cached = self._cached_values(profile={"requirements": "dummy_val == 3"}, nodeset="set1")
        htc_job = prepare_utils._create_job(
            self.template, cached, self.generic_workflow, gwjob, "temp"
        )
        self.assertEqual(htc_job.cmds["requirements"], '(dummy_val == 3) && ( Target.Nodeset == "set1" )')
        self.assertEqual(htc_job.attrs["JobNodeset"], "set1")

601 

602 

class ReplaceWmsVarsTestCase(unittest.TestCase):
    """Test _replace_wms_vars function."""

    def testNoWmsVar(self):
        """Placeholders outside the WMS namespace are left untouched."""
        original = "whatever <Other:notThere> whatnot"
        self.assertEqual(prepare_utils._replace_wms_vars(original), original)

    def testAttemptNum(self):
        """<WMS:attemptNum> maps to the NumJobStarts submit macro."""
        replaced = prepare_utils._replace_wms_vars("whatever <WMS:attemptNum> whatnot")
        self.assertEqual(replaced, "whatever $$([NumJobStarts]) whatnot")

    def testUnrecognized(self):
        """Unknown WMS placeholders are logged and raise KeyError."""
        bad_string = "whatever <WMS:notThere> whatnot"
        with self.assertLogs(level="INFO") as cm_log:
            with self.assertRaises(KeyError):
                _ = prepare_utils._replace_wms_vars(bad_string)
        self.assertRegex(cm_log.output[0], "Unrecognized WMS placeholder: notThere")

622 

623 

# Allow running this test module directly (e.g. `python test_prepare_utils.py`).
if __name__ == "__main__":
    unittest.main()