Coverage for tests/test_generic_workflow.py: 16%

484 statements  

« prev     ^ index     » next       coverage.py v7.13.5, created at 2026-04-24 08:21 +0000

1# This file is part of ctrl_bps. 

2# 

3# Developed for the LSST Data Management System. 

4# This product includes software developed by the LSST Project 

5# (https://www.lsst.org). 

6# See the COPYRIGHT file at the top-level directory of this distribution 

7# for details of code ownership. 

8# 

9# This software is dual licensed under the GNU General Public License and also 

10# under a 3-clause BSD license. Recipients may choose which of these licenses 

11# to use; please see the files gpl-3.0.txt and/or bsd_license.txt, 

12# respectively. If you choose the GPL option then the following text applies 

13# (but note that there is still no warranty even if you opt for BSD instead): 

14# 

15# This program is free software: you can redistribute it and/or modify 

16# it under the terms of the GNU General Public License as published by 

17# the Free Software Foundation, either version 3 of the License, or 

18# (at your option) any later version. 

19# 

20# This program is distributed in the hope that it will be useful, 

21# but WITHOUT ANY WARRANTY; without even the implied warranty of 

22# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the 

23# GNU General Public License for more details. 

24# 

25# You should have received a copy of the GNU General Public License 

26# along with this program. If not, see <https://www.gnu.org/licenses/>. 

27import dataclasses 

28import io 

29import logging 

30import unittest 

31from collections import Counter 

32 

33import networkx 

34 

35import lsst.ctrl.bps.generic_workflow as gw 

36import lsst.ctrl.bps.tests.gw_test_utils as gtu 

37 

38 

class TestGenericWorkflowNode(unittest.TestCase):
    """Exercise the generic workflow node base class."""

    def testNoNodeType(self):
        """Reading node_type on a subclass that only adds a field raises."""

        @dataclasses.dataclass(slots=True)
        class GenericWorkflowNoNodeType(gw.GenericWorkflowNode):
            dummy_val: int

        node = GenericWorkflowNoNodeType("myname", "mylabel", 3)
        # Attribute access is what triggers the error, so go through getattr.
        self.assertRaises(NotImplementedError, getattr, node, "node_type")

    def testHash(self):
        """Hash changes with the name but not with the label."""
        base = gw.GenericWorkflowNode("myname", "mylabel")
        renamed = gw.GenericWorkflowNode("myname2", "mylabel")
        relabeled = gw.GenericWorkflowNode("myname", "mylabel2")

        self.assertNotEqual(hash(base), hash(renamed))
        self.assertEqual(hash(base), hash(relabeled))

57 

58 

class TestGenericWorkflowJob(unittest.TestCase):
    """Exercise generic workflow jobs."""

    def testEquality(self):
        """Two jobs built from the same name and label compare equal."""
        first = gw.GenericWorkflowJob("job1", "label1")
        second = gw.GenericWorkflowJob("job1", "label1")
        self.assertEqual(first, second)

66 

67 

class TestGenericWorkflow(unittest.TestCase):
    """Exercise GenericWorkflow."""

    # Failure message shared by every whole-workflow comparison below.
    _MISMATCH_MSG = "Results do not match expected GenericWorkflow. See debug messages."

    def setUp(self):
        """Create one shared executable and two jobs that use it."""
        self.exec1 = gw.GenericWorkflowExec(
            name="test1.py",
            src_uri="${CTRL_BPS_DIR}/bin/test1.py",
            transfer_executable=False,
        )
        self.job1 = self._new_job("job1", "label1")
        self.job2 = self._new_job("job2", "label2")

    # ------------------------------------------------------------------
    # Private helpers (names do not start with "test", so unittest does
    # not collect them).
    # ------------------------------------------------------------------

    def _new_job(self, name, label):
        """Return a job with fixed quanta counts and the shared executable."""
        job = gw.GenericWorkflowJob(name, label)
        job.quanta_counts = Counter({"pt1": 1, "pt2": 2})
        job.executable = self.exec1
        return job

    @staticmethod
    def _workflow_of(*jobs):
        """Return a workflow named "mytest" with ``jobs`` added in order."""
        workflow = gw.GenericWorkflow("mytest")
        for job in jobs:
            workflow.add_job(job)
        return workflow

    def _three_job_workflow(self, job3_label="label2"):
        """Return ``(workflow, job3)`` where job1 and job2 both feed a bare job3."""
        job3 = gw.GenericWorkflowJob("job3", job3_label)
        workflow = self._workflow_of(self.job1, self.job2, job3)
        workflow.add_job_relationships(["job1", "job2"], "job3")
        return workflow, job3

    @staticmethod
    def _two_file_workflow():
        """Return ``(workflow, file1)`` holding file1 (wms_transfer=True) and file2 (False)."""
        workflow = gw.GenericWorkflow("mytest")
        transferred = gw.GenericWorkflowFile("file1", wms_transfer=True)
        workflow.add_file(transferred)
        not_transferred = gw.GenericWorkflowFile("file2", wms_transfer=False)
        workflow.add_file(not_transferred)
        return workflow, transferred

    def _assert_same_workflow(self, actual, expected):
        """Assert that gtu.compare_generic_workflows accepts the pair."""
        self.assertTrue(gtu.compare_generic_workflows(actual, expected), self._MISMATCH_MSG)

    @staticmethod
    def _group_sort(labels, method=None, **extras):
        """Build a group/sort ordering config on the "visit" dimension.

        Keys are inserted in a fixed order: implementation, ordering_type,
        findDependencyMethod (only when ``method`` is given), labels,
        dimensions, then ``extras`` in the order supplied.
        """
        config = {"implementation": "group", "ordering_type": "sort"}
        if method is not None:
            config["findDependencyMethod"] = method
        config["labels"] = labels
        config["dimensions"] = "visit"
        config.update(extras)
        return config

    @staticmethod
    def _grouping_inputs(method):
        """Return ``(workflow, config, subgraph)`` for grouping label2/label3 by visit."""
        workflow = gtu.make_3_label_workflow("test1", final=True)
        config = {"labels": "label2, label3", "dimensions": "visit", "findDependencyMethod": method}
        subgraphs = workflow._check_job_ordering_config({"order1": config})
        return workflow, config, subgraphs["order1"]

    def _check_middle_groups(self, name, uneven, method=None, blocking=False):
        """Order labels T2, T2b, T3 as group "mid" and compare with the gtu truth.

        ``uneven`` is forwarded as the second positional flag of both gtu
        builders (it is True in the tests whose names end in "Uneven").
        When ``blocking`` is set, "blocking": True is added to the config
        and an extra True is passed to the truth builder.
        """
        workflow = gtu.make_5_label_workflow(name, True, uneven, True)
        extras = {"blocking": True} if blocking else {}
        workflow.add_special_job_ordering({"mid": self._group_sort("T2, T2b, T3", method, **extras)})

        truth_args = ["truth", True, uneven, True]
        if blocking:
            truth_args.append(True)
        expected = gtu.make_5_label_workflow_middle_groups(*truth_args)
        self._assert_same_workflow(workflow, expected)

    # ------------------------------------------------------------------
    # Adding jobs / nodes / edges
    # ------------------------------------------------------------------

    def testAddJobDuplicate(self):
        """Adding the same job a second time raises RuntimeError."""
        workflow = self._workflow_of(self.job1)
        self.assertRaises(RuntimeError, workflow.add_job, self.job1)

    def testAddJobValid(self):
        """A job added with add_job is counted, listed and retrievable."""
        workflow = self._workflow_of(self.job1)
        self.assertEqual(1, workflow.number_of_nodes())
        self.assertListEqual(["job1"], list(workflow))
        self.assertEqual(self.job1, workflow.get_job("job1"))

    def testAddNode(self):
        """A job added with add_node is counted, listed and retrievable."""
        workflow = gw.GenericWorkflow("mytest")
        workflow.add_node(self.job1)
        self.assertEqual(1, workflow.number_of_nodes())
        self.assertListEqual(["job1"], list(workflow))
        self.assertEqual(self.job1, workflow.get_job("job1"))

    def testAddJobRelationshipsSingle(self):
        """One parent and one child yield a single edge."""
        workflow = self._workflow_of(self.job1, self.job2)
        workflow.add_job_relationships("job1", "job2")
        self.assertListEqual([("job1", "job2")], list(workflow.edges()))

    def testAddJobRelationshipsMultiChild(self):
        """One parent with a list of children yields one edge per child."""
        job3 = self._new_job("job3", "label2")
        workflow = self._workflow_of(self.job1, self.job2, job3)
        workflow.add_job_relationships("job1", ["job2", "job3"])
        expected_edges = [("job1", "job2"), ("job1", "job3")]
        self.assertListEqual(expected_edges, list(workflow.edges()))

    def testAddJobRelationshipsMultiParents(self):
        """A list of parents with one child yields one edge per parent."""
        job3 = self._new_job("job3", "label2")
        workflow = self._workflow_of(self.job1, self.job2, job3)
        workflow.add_job_relationships(["job1", "job2"], "job3")
        expected_edges = [("job1", "job3"), ("job2", "job3")]
        self.assertListEqual(expected_edges, list(workflow.edges()))

    def testAddJobRelationshipsNone(self):
        """A None parent or a None child adds no edges."""
        workflow = self._workflow_of(self.job1)
        for parents, children in ((None, "job1"), ("job1", None)):
            workflow.add_job_relationships(parents, children)
            self.assertListEqual([], list(workflow.edges()))

    def testGetJobExists(self):
        """get_job hands back the very object that was added."""
        workflow = self._workflow_of(self.job1)
        self.assertIs(self.job1, workflow.get_job("job1"))

    def testGetJobError(self):
        """get_job raises KeyError for a name that was never added."""
        workflow = self._workflow_of(self.job1)
        self.assertRaises(KeyError, workflow.get_job, "job_not_there")

    def testAddEdgeBadParent(self):
        """add_edge rejects a parent name that is not in the workflow."""
        workflow = self._workflow_of(self.job1, self.job2)
        with self.assertRaisesRegex(RuntimeError, "notthere not in GenericWorkflow"):
            workflow.add_edge("notthere", "job2")

    def testAddEdgeBadChild(self):
        """add_edge rejects a child name that is not in the workflow."""
        workflow = self._workflow_of(self.job1, self.job2)
        with self.assertRaisesRegex(RuntimeError, "notthere2 not in GenericWorkflow"):
            workflow.add_edge("job1", "notthere2")

    def testQuantaCounts(self):
        """quanta_counts of the 3-label gtu workflow is six per label."""
        workflow = gtu.make_3_label_workflow("test1", final=True)
        expected = Counter({"label1": 6, "label2": 6, "label3": 6})
        self.assertEqual(workflow.quanta_counts, expected)

    # ------------------------------------------------------------------
    # Executables and files
    # ------------------------------------------------------------------

    def testGetExecutablesNames(self):
        """get_executables(data=False) returns executable names."""
        # The executable must be swapped in before the job is added.
        self.job1.executable = gw.GenericWorkflowExec("exec1")
        workflow = self._workflow_of(self.job1)
        self.assertEqual(workflow._executables["exec1"], self.job1.executable)
        names = workflow.get_executables(data=False, transfer_only=False)
        self.assertEqual(names, ["exec1"])

    def testGetExecutablesData(self):
        """get_executables(data=True) returns the executable objects."""
        self.job1.executable = gw.GenericWorkflowExec("exec1")
        workflow = self._workflow_of(self.job1)
        found = workflow.get_executables(data=True, transfer_only=False)
        self.assertEqual(found, [self.job1.executable])
        self.assertEqual(hash(self.job1.executable), hash(found[0]))

    def testAddFileTwice(self):
        """Re-adding a file logs a 'Skipped add_file' message."""
        workflow = gw.GenericWorkflow("mytest")
        gwfile = gw.GenericWorkflowFile("file1")
        workflow.add_file(gwfile)
        with self.assertLogs("lsst.ctrl.bps.generic_workflow", level=logging.DEBUG) as captured:
            workflow.add_file(gwfile)
        last_message = captured.records[-1].getMessage()
        self.assertRegex(last_message, "Skipped add_file for existing file file1")

    def testGetFilesNames(self):
        """get_files(data=False, transfer_only=False) lists every file name."""
        workflow, _ = self._two_file_workflow()
        names = workflow.get_files(data=False, transfer_only=False)
        self.assertEqual(names, ["file1", "file2"])

    def testGetFilesData(self):
        """get_files(data=True, transfer_only=True) returns only file1."""
        workflow, transferred = self._two_file_workflow()
        found = workflow.get_files(data=True, transfer_only=True)
        self.assertEqual(found, [transferred])
        self.assertEqual(hash(transferred), hash(found[0]))

    def testJobInputs(self):
        """Inputs added per job can be read back by name or as objects."""
        workflow = self._workflow_of(self.job1)
        gwfile1 = gw.GenericWorkflowFile("file1", wms_transfer=True)
        workflow.add_job_inputs("job1", gwfile1)
        # NOTE(review): only job1 was added as a job above; inputs are still
        # registered under the name "job2" - confirm this is intentional.
        gwfile2 = gw.GenericWorkflowFile("file2", wms_transfer=False)
        workflow.add_job_inputs("job2", gwfile2)

        self.assertIn("file1", workflow.get_files())
        self.assertEqual(["file1"], workflow.get_job_inputs("job1", data=False))
        self.assertEqual([gwfile1], workflow.get_job_inputs("job1"))
        self.assertEqual([], workflow.get_job_inputs("job2", data=False, transfer_only=True))

    # ------------------------------------------------------------------
    # Save / load / draw / validate
    # ------------------------------------------------------------------

    def testSaveInvalidFormat(self):
        """save rejects an unknown format name."""
        workflow = gw.GenericWorkflow("mytest")
        stream = io.BytesIO()
        with self.assertRaisesRegex(RuntimeError, r"Unknown format \(bad_format\)"):
            workflow.save(stream, "bad_format")

    def testLoadInvalidFormat(self):
        """load rejects an unknown format name."""
        stream = io.BytesIO()
        with self.assertRaisesRegex(RuntimeError, r"Unknown format \(bad_format\)"):
            gw.GenericWorkflow.load(stream, "bad_format")

    def testValidate(self):
        """validate accepts a simple two-parents/one-child workflow."""
        workflow, _ = self._three_job_workflow()
        # Passing means no exception escapes.
        workflow.validate()

    def testValidateGroups(self):
        """validate accepts the grouped 3-label gtu workflow."""
        workflow = gtu.make_3_label_workflow_groups_sort("test_validate", final=True)
        workflow.validate()

    def testSavePickle(self):
        """A workflow pickled to a stream loads back as a matching workflow."""
        original = self._workflow_of(self.job1, self.job2)
        original.add_job_relationships("job1", "job2")

        stream = io.BytesIO()
        original.save(stream, "pickle")
        stream.seek(0)  # rewind so load reads what save wrote
        restored = gw.GenericWorkflow.load(stream, "pickle")

        self._assert_same_workflow(original, restored)

    def testDrawBadFormat(self):
        """draw rejects an unknown format name."""
        workflow = self._workflow_of(self.job1)
        stream = io.BytesIO()
        with self.assertRaisesRegex(RuntimeError, r"Unknown draw format \(bad_format\)"):
            workflow.draw(stream, "bad_format")

    # ------------------------------------------------------------------
    # Labels and counts
    # ------------------------------------------------------------------

    def testLabels(self):
        """labels lists each distinct label once."""
        workflow, _ = self._three_job_workflow()
        self.assertListEqual(["label1", "label2"], workflow.labels)

    def testRegenerateLabels(self):
        """regenerate_labels picks up labels changed on the job objects."""
        workflow, job3 = self._three_job_workflow()
        relabeling = ((self.job1, "label1b"), (self.job2, "label1b"), (job3, "label2b"))
        for job, new_label in relabeling:
            job.label = new_label
        workflow.regenerate_labels()
        self.assertListEqual(["label1b", "label2b"], workflow.labels)

    def testJobCounts(self):
        """job_counts without a final job."""
        workflow = gtu.make_3_label_workflow("test1", final=False)
        expected = Counter({"pipetaskInit": 1, "label1": 6, "label2": 6, "label3": 6})
        self.assertEqual(workflow.job_counts, expected)

    def testJobCountsFinal(self):
        """job_counts with a final job includes finalJob."""
        workflow = gtu.make_3_label_workflow("test1", final=True)
        expected = Counter({"pipetaskInit": 1, "label1": 6, "label2": 6, "label3": 6, "finalJob": 1})
        self.assertEqual(workflow.job_counts, expected)

    def testDelJob(self):
        """Deleting job2 drops its edge and its contribution to job_counts."""
        workflow, _ = self._three_job_workflow()

        workflow.del_job("job2")

        self.assertListEqual([("job1", "job3")], list(workflow.edges()))
        self.assertEqual(Counter({"label1": 1, "label2": 1}), workflow.job_counts)

    def testAddWorkflowSource(self):
        """add_workflow_source wires the added workflow in front of the existing one."""
        workflow, _ = self._three_job_workflow()

        source = gw.GenericWorkflow("mytest2")
        source_jobs = (
            ("srcjob1", "srclabel1"),
            ("srcjob2", "srclabel1"),
            ("srcjob3", "srclabel2"),
            ("srcjob4", "srclabel2"),
        )
        for name, label in source_jobs:
            job = gw.GenericWorkflowJob(name, label)
            job.executable = self.exec1
            source.add_job(job)
        source.add_job_relationships("srcjob1", "srcjob3")
        source.add_job_relationships("srcjob2", "srcjob4")

        workflow.add_workflow_source(source)

        expected_counts = Counter({"srclabel1": 2, "srclabel2": 2, "label1": 1, "label2": 2})
        self.assertEqual(expected_counts, workflow.job_counts)
        self.assertListEqual(["srclabel1", "srclabel2", "label1", "label2"], workflow.labels)

        expected_edges = [
            ("srcjob1", "srcjob3"),
            ("srcjob2", "srcjob4"),
            ("srcjob3", "job1"),
            ("srcjob3", "job2"),
            ("srcjob4", "job1"),
            ("srcjob4", "job2"),
            ("job1", "job3"),
            ("job2", "job3"),
        ]
        self.assertListEqual(sorted(expected_edges), sorted(workflow.edges()))

    def testGetJobsByLabel(self):
        """get_jobs_by_label returns the jobs carrying that label."""
        workflow, job3 = self._three_job_workflow("label3")
        self.assertListEqual([job3], workflow.get_jobs_by_label("label3"))

    def testAddJobInvalidType(self):
        """add_job rejects an object that does not inherit from the node class."""

        @dataclasses.dataclass(slots=True)
        class GenericWorkflowNodeNoInherit:
            name: str
            label: str

            def __hash__(self):
                return hash(self.name)

            @property
            def node_type(self):
                return gw.GenericWorkflowNodeType.NOOP

        impostor = GenericWorkflowNodeNoInherit("myname", "mylabel")
        workflow = self._workflow_of(self.job1, self.job2)
        with self.assertRaisesRegex(RuntimeError, "Invalid type for job to be added to GenericWorkflowGraph"):
            workflow.add_job(impostor)

    # ------------------------------------------------------------------
    # Job ordering: grouping by dependencies
    # ------------------------------------------------------------------

    def testGroupJobsByDependencies(self):
        """Grouping with the sink method yields one group per visit value."""
        workflow, config, subgraph = self._grouping_inputs("sink")
        job_groups = workflow._group_jobs_by_dependencies("order1", config, subgraph)
        self.assertEqual(sorted(job_groups.keys()), [(301,), (10001,), (10002,)])

    def testGroupJobsByDependenciesSource(self):
        """Grouping with the source method yields one group per visit value."""
        workflow, config, subgraph = self._grouping_inputs("source")
        job_groups = workflow._group_jobs_by_dependencies("order1", config, subgraph)
        self.assertEqual(sorted(job_groups.keys()), [(301,), (10001,), (10002,)])

    def testGroupJobsByDependenciesBadMethod(self):
        """An unknown findDependencyMethod is rejected when grouping."""
        # Config checking happens outside the assertRaises block on purpose:
        # only the grouping call is expected to raise.
        workflow, config, subgraph = self._grouping_inputs("bad_method")

        with self.assertRaisesRegex(RuntimeError, r"Invalid findDependencyMethod \(bad_method\)"):
            workflow._group_jobs_by_dependencies("order1", config, subgraph)

    # ------------------------------------------------------------------
    # Job ordering: config checking
    # ------------------------------------------------------------------

    def testCheckJobOrderingConfigBadImplementation(self):
        """An unknown implementation value is rejected."""
        workflow = gtu.make_3_label_workflow("test1", final=True)
        ordering = {"order1": {"implementation": "bad", "labels": "label2", "dimensions": "visit"}}
        with self.assertRaisesRegex(RuntimeError, "Invalid implementation for"):
            workflow._check_job_ordering_config(ordering)

    def testCheckJobOrderingConfigBadType(self):
        """An unknown ordering_type value is rejected."""
        workflow = gtu.make_3_label_workflow("test1", final=True)
        ordering = {"order1": {"ordering_type": "badtype", "labels": "label2", "dimensions": "visit"}}
        with self.assertRaisesRegex(RuntimeError, "Invalid ordering_type for"):
            workflow._check_job_ordering_config(ordering)

    def testCheckJobOrderingConfigBadLabel(self):
        """A label listed in two ordering groups is rejected."""
        workflow = gtu.make_3_label_workflow("test_bad_label", final=True)
        ordering = {
            "order1": {"ordering_type": "sort", "labels": "label2", "dimensions": "visit"},
            "order2": {"ordering_type": "sort", "labels": "label2,label3", "dimensions": "visit"},
        }
        with self.assertRaisesRegex(
            RuntimeError, "Job label label2 appears in more than one job ordering group"
        ):
            workflow._check_job_ordering_config(ordering)

    def testCheckJobOrderingConfigUnusedLabel(self):
        """Labels absent from the workflow produce a warning naming them."""
        workflow = gtu.make_3_label_workflow("test_unused_label", final=True)
        ordering = {
            "order1": {
                "ordering_type": "sort",
                "labels": "label2,unused1,label3,unused2",
                "dimensions": "visit",
            },
        }
        with self.assertLogs("lsst.ctrl.bps.generic_workflow", level=logging.WARNING) as captured:
            workflow._check_job_ordering_config(ordering)

        expected = "Job label(s) (unused1,unused2) from job ordering group order1 does not exist in workflow."
        messages = [record.getMessage() for record in captured.records]
        self.assertTrue(
            any(expected in message for message in messages),
            "Expected warning about unused labels",
        )

    def testCheckJobOrderingConfigMissingDim(self):
        """A group without a dimensions entry is rejected with KeyError."""
        workflow = gtu.make_3_label_workflow("test_missing_dim", final=True)
        ordering = {"order1": {"ordering_type": "sort", "labels": "label2"}}
        with self.assertRaisesRegex(KeyError, "Missing dimensions entry in ordering group order1"):
            workflow._check_job_ordering_config(ordering)

    def testCheckJobOrderingConfigSort(self):
        """A valid sort group maps to a label1 -> label2 label subgraph."""
        workflow = gtu.make_3_label_workflow("test_bad_dim", final=True)
        ordering = {"order1": {"ordering_type": "sort", "labels": "label1,label2", "dimensions": "visit"}}
        subgraphs = workflow._check_job_ordering_config(ordering)
        self.assertEqual(len(subgraphs), 1)

        expected = networkx.DiGraph()
        expected.add_nodes_from(["label1", "label2"])
        expected.add_edges_from([("label1", "label2")])
        same_shape = networkx.is_isomorphic(
            subgraphs["order1"], expected, node_match=lambda left, right: left == right
        )
        self.assertTrue(same_shape)

    # ------------------------------------------------------------------
    # Job ordering: applying it to a workflow
    # ------------------------------------------------------------------

    def testAddSpecialJobOrderingNoopSort(self):
        """Noop ordering matches the gtu truth and leaves quanta_counts unchanged."""
        workflow = gtu.make_3_label_workflow("test", final=True)
        counts_before = workflow.quanta_counts

        workflow.add_special_job_ordering(
            {
                "order1": {
                    "implementation": "noop",
                    "ordering_type": "sort",
                    "labels": "label1,label2",
                    "dimensions": "visit",
                }
            }
        )

        # Noop jobs must not alter quanta_counts, before or after a regenerate.
        self.assertEqual(workflow.quanta_counts, counts_before)
        workflow.regenerate_labels()
        self.assertEqual(workflow.quanta_counts, counts_before)

        expected = gtu.make_3_label_workflow_noop_sort("truth", final=True)
        self._assert_same_workflow(workflow, expected)

    def testAddSpecialJobOrderingGroupSort(self):
        """Group ordering with no findDependencyMethod given matches the gtu truth."""
        workflow = gtu.make_3_label_workflow("test", final=True)
        workflow.add_special_job_ordering({"order1": self._group_sort("label1,label2")})

        expected = gtu.make_3_label_workflow_groups_sort("truth", final=True)
        self._assert_same_workflow(workflow, expected)

    def testAddSpecialJobOrderingGroupSortSink(self):
        """Group ordering with the sink method matches the gtu truth."""
        workflow = gtu.make_3_label_workflow("test_group_sort", final=True)
        workflow.add_special_job_ordering({"order1": self._group_sort("label1,label2", "sink")})

        expected = gtu.make_3_label_workflow_groups_sort("truth", final=True)
        self._assert_same_workflow(workflow, expected)

    def testMiddleGroupValues(self):
        """Middle group with no findDependencyMethod given."""
        self._check_middle_groups("mid_group_even_values", False)

    def testMiddleGroupSink(self):
        """Middle group, sink method."""
        self._check_middle_groups("mid_group_even_sink", False, "sink")

    def testMiddleGroupSource(self):
        """Middle group, source method."""
        self._check_middle_groups("mid_group_even_source", False, "source")

    def testMiddleGroupValuesUneven(self):
        """Middle group, uneven flag set, no findDependencyMethod given."""
        self._check_middle_groups("mid_group_uneven_values", True)

    def testMiddleGroupSinkUneven(self):
        """Middle group, uneven flag set, sink method, explicit blocking."""
        # Also covers setting the blocking value explicitly.
        self._check_middle_groups("mid_group_uneven_sink", True, "sink", blocking=True)

    def testMiddleGroupSourceUneven(self):
        """Middle group, uneven flag set, source method."""
        self._check_middle_groups("mid_group_uneven_source", True, "source")

    def test2GroupsEven(self):
        """Two blocking groups on the 5-label workflow."""
        workflow = gtu.make_5_label_workflow("two_groups_even", True, False, True)
        workflow.add_special_job_ordering(
            {
                "order1": self._group_sort(
                    "T1, T2", "sink", equalDimensions="visit:group", blocking=True
                ),
                "order2": self._group_sort("T3, T4", "source", blocking=True),
            }
        )

        expected = gtu.make_5_label_workflow_2_groups("truth", True, False, True, True)
        self._assert_same_workflow(workflow, expected)

    def test2GroupsUneven(self):
        """Two groups on the 5-label workflow with the uneven flag set."""
        workflow = gtu.make_5_label_workflow("two_groups_uneven", True, True, True)
        workflow.add_special_job_ordering(
            {
                "order1": self._group_sort("T1, T2", "sink", equalDimensions="visit:group"),
                "order2": self._group_sort("T3, T4", "source"),
            }
        )

        expected = gtu.make_5_label_workflow_2_groups("truth", True, True, True)
        self._assert_same_workflow(workflow, expected)

731 

732 

class TestGenericWorkflowLabels(unittest.TestCase):
    """Exercise GenericWorkflowLabels."""

    @staticmethod
    def _populate(spec):
        """Build a GenericWorkflowLabels from ``(job name, label, parent labels)`` rows.

        Jobs are created and added in row order; the third add_job argument
        is always an empty list.  Returns ``(gwlabels, jobs)``.
        """
        gwlabels = gw.GenericWorkflowLabels()
        jobs = []
        for name, label, parent_labels in spec:
            job = gw.GenericWorkflowJob(name, label)
            gwlabels.add_job(job, list(parent_labels), [])
            jobs.append(job)
        return gwlabels, jobs

    def testEmptyLabels(self):
        """A fresh instance has no labels."""
        self.assertEqual(0, len(gw.GenericWorkflowLabels().labels))

    def testEmptyJobCounts(self):
        """A fresh instance has no job counts."""
        self.assertEqual(0, len(gw.GenericWorkflowLabels().job_counts))

    def testEmptyGetJobsByLabel(self):
        """A fresh instance returns no jobs for any label."""
        self.assertEqual(0, len(gw.GenericWorkflowLabels().get_jobs_by_label("label1")))

    def testAddJobFirst(self):
        """The first job added is recorded under its label and in the label graph."""
        gwlabels, (job,) = self._populate([("job1", "label1", [])])
        self.assertEqual(gwlabels._label_to_jobs["label1"], [job])
        self.assertIn("label1", gwlabels._label_graph)

    def testAddJobMult(self):
        """Several jobs, two sharing a label, build a label1 -> label2 -> label3 chain."""
        gwlabels, (job1, job2, job3, job4) = self._populate(
            [
                ("job1", "label1", []),
                ("job2", "label2", ["label1"]),
                ("job3", "label2", ["label1"]),
                ("job4", "label3", ["label2"]),
            ]
        )

        expected_jobs = {"label1": [job1], "label2": [job2, job3], "label3": [job4]}
        for label, jobs in expected_jobs.items():
            self.assertListEqual(gwlabels._label_to_jobs[label], jobs)
        for label in expected_jobs:
            self.assertIn(label, gwlabels._label_graph)

        label_graph = gwlabels._label_graph
        self.assertEqual(list(label_graph.successors("label1")), ["label2"])
        self.assertEqual(list(label_graph.successors("label2")), ["label3"])

    def testDelJobRemain(self):
        """Deleting one of two jobs sharing a label keeps the label and its edges."""
        gwlabels, (_, job2, job3, _) = self._populate(
            [
                ("job1", "label1", []),
                ("job2", "label2", ["label1"]),
                ("job3", "label2", ["label1"]),
                ("job4", "label3", ["label2"]),
            ]
        )

        gwlabels.del_job(job2)

        self.assertListEqual(gwlabels._label_to_jobs["label2"], [job3])
        label_graph = gwlabels._label_graph
        self.assertIn("label2", label_graph)
        self.assertEqual(list(label_graph.successors("label1")), ["label2"])
        self.assertEqual(list(label_graph.successors("label2")), ["label3"])

    def testDelJobLast(self):
        """Deleting a label's only job removes the label and reconnects its neighbours."""
        gwlabels, jobs = self._populate(
            [
                ("job1", "label1", []),
                ("job2", "label2", []),
                ("job3", "label3", ["label1", "label2"]),
                ("job4", "label4", ["label3"]),
                ("job5", "label5", ["label3"]),
            ]
        )

        gwlabels.del_job(jobs[2])  # job3, the only job labelled label3

        self.assertNotIn("label3", gwlabels._label_to_jobs)
        self.assertNotIn("label3", gwlabels._label_graph)
        for parent in ("label1", "label2"):
            self.assertEqual(list(gwlabels._label_graph.successors(parent)), ["label4", "label5"])

    # NOTE(review): the remaining tests call add_special_job_ordering on the
    # object built by gtu.make_3_label_workflow rather than on a
    # GenericWorkflowLabels instance - confirm they belong in this class.

    def testAddSpecialJobOrderingBadType(self):
        """An unknown ordering_type value is rejected."""
        workflow = gtu.make_3_label_workflow("test_sort", final=True)
        ordering = {"order1": {"ordering_type": "badtype", "labels": "label2", "dimensions": "visit"}}
        with self.assertRaisesRegex(RuntimeError, "Invalid ordering_type for"):
            workflow.add_special_job_ordering(ordering)

    def testAddSpecialJobOrderingBadLabel(self):
        """A label listed in two ordering groups is rejected."""
        workflow = gtu.make_3_label_workflow("test_bad_label", final=True)
        ordering = {
            "order1": {"ordering_type": "sort", "labels": "label2", "dimensions": "visit"},
            "order2": {"ordering_type": "sort", "labels": "label2,label3", "dimensions": "visit"},
        }
        with self.assertRaisesRegex(
            RuntimeError, "Job label label2 appears in more than one job ordering group"
        ):
            workflow.add_special_job_ordering(ordering)

    def testAddSpecialJobOrderingBadDim(self):
        """A dimension the jobs do not carry is rejected with KeyError."""
        workflow = gtu.make_3_label_workflow("test_bad_dim", final=True)
        ordering = {"order1": {"ordering_type": "sort", "labels": "label2", "dimensions": "notthere"}}
        with self.assertRaisesRegex(
            KeyError, r"Job label2_10001_10 missing dimensions \(notthere\) required for order group order1"
        ):
            workflow.add_special_job_ordering(ordering)

    def testAddSpecialJobOrderingSort(self):
        """Noop sort ordering keeps quanta_counts and inserts the expected edges."""
        workflow = gtu.make_3_label_workflow("test_sort", final=True)
        counts_before = workflow.quanta_counts

        workflow.add_special_job_ordering(
            {
                "order1": {
                    "implementation": "noop",
                    "ordering_type": "sort",
                    "labels": "label2",
                    "dimensions": "visit",
                }
            }
        )

        # quanta_counts must be unchanged, before and after a regenerate.
        self.assertEqual(workflow.quanta_counts, counts_before)
        workflow.regenerate_labels()
        self.assertEqual(workflow.quanta_counts, counts_before)

        expected_edges = [
            ("label2_301_10", "noop_order1_301"),
            ("label2_301_11", "noop_order1_301"),
            ("noop_order1_301", "label2_10001_10"),
            ("noop_order1_301", "label2_10001_11"),
            ("label2_10001_10", "noop_order1_10001"),
            ("label2_10001_11", "noop_order1_10001"),
            ("noop_order1_10001", "label2_10002_10"),
            ("noop_order1_10001", "label2_10002_11"),
        ]
        actual_edges = workflow.edges
        for parent, child in expected_edges:
            self.assertIn((parent, child), actual_edges, f"Missing edge from {parent} to {child}")

876 

877 

# Let the module be executed directly as a script to run its tests.
if __name__ == "__main__":
    unittest.main()