Coverage for tests/test_generic_workflow.py: 16%

483 statements  

« prev     ^ index     » next       coverage.py v7.13.5, created at 2026-04-18 08:47 +0000

1# This file is part of ctrl_bps. 

2# 

3# Developed for the LSST Data Management System. 

4# This product includes software developed by the LSST Project 

5# (https://www.lsst.org). 

6# See the COPYRIGHT file at the top-level directory of this distribution 

7# for details of code ownership. 

8# 

9# This software is dual licensed under the GNU General Public License and also 

10# under a 3-clause BSD license. Recipients may choose which of these licenses 

11# to use; please see the files gpl-3.0.txt and/or bsd_license.txt, 

12# respectively. If you choose the GPL option then the following text applies 

13# (but note that there is still no warranty even if you opt for BSD instead): 

14# 

15# This program is free software: you can redistribute it and/or modify 

16# it under the terms of the GNU General Public License as published by 

17# the Free Software Foundation, either version 3 of the License, or 

18# (at your option) any later version. 

19# 

20# This program is distributed in the hope that it will be useful, 

21# but WITHOUT ANY WARRANTY; without even the implied warranty of 

22# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the 

23# GNU General Public License for more details. 

24# 

25# You should have received a copy of the GNU General Public License 

26# along with this program. If not, see <https://www.gnu.org/licenses/>. 

27import dataclasses 

28import io 

29import logging 

30import unittest 

31from collections import Counter 

32 

33import networkx 

34 

35import lsst.ctrl.bps.generic_workflow as gw 

36import lsst.ctrl.bps.tests.gw_test_utils as gtu 

37 

38 

class TestGenericWorkflowNode(unittest.TestCase):
    """Checks on the generic workflow node base class."""

    def testNoNodeType(self):
        """Reading ``node_type`` on a subclass that does not define it
        is expected to raise `NotImplementedError`.
        """

        @dataclasses.dataclass(slots=True)
        class GenericWorkflowNoNodeType(gw.GenericWorkflowNode):
            dummy_val: int

        node = GenericWorkflowNoNodeType("myname", "mylabel", 3)
        with self.assertRaises(NotImplementedError):
            _ = node.node_type

    def testHash(self):
        """Nodes with different names must hash differently, while nodes
        that differ only by label must hash the same.
        """
        reference = gw.GenericWorkflowNode("myname", "mylabel")
        renamed = gw.GenericWorkflowNode("myname2", "mylabel")
        relabeled = gw.GenericWorkflowNode("myname", "mylabel2")

        reference_hash = hash(reference)
        self.assertNotEqual(reference_hash, hash(renamed))
        self.assertEqual(reference_hash, hash(relabeled))

57 

58 

class TestGenericWorkflowJob(unittest.TestCase):
    """Checks on generic workflow jobs."""

    def testEquality(self):
        """Two jobs built from the same name and label compare equal."""
        first, second = (gw.GenericWorkflowJob("job1", "label1") for _ in range(2))
        self.assertEqual(first, second)

66 

67 

class TestGenericWorkflow(unittest.TestCase):
    """Test generic workflow."""

    # Failure message shared by every whole-workflow comparison below.
    _MISMATCH_MSG = "Results do not match expected GenericWorkflow. See debug messages."

    # ------------------------------------------------------------------
    # Fixtures and private helpers
    # ------------------------------------------------------------------
    def setUp(self):
        """Create one shared executable and two jobs that use it."""
        self.exec1 = gw.GenericWorkflowExec(
            name="test1.py",
            src_uri="${CTRL_BPS_DIR}/bin/test1.py",
            transfer_executable=False,
        )
        self.job1 = self._make_job("job1", "label1")
        self.job2 = self._make_job("job2", "label2")

    def _make_job(self, name, label):
        """Return a job with the standard quanta counts and executable."""
        job = gw.GenericWorkflowJob(name, label)
        job.quanta_counts = Counter({"pt1": 1, "pt2": 2})
        job.executable = self.exec1
        return job

    def _fan_in_workflow(self, job3):
        """Return workflow "mytest" in which job1 and job2 both feed job3."""
        workflow = gw.GenericWorkflow("mytest")
        for job in (self.job1, self.job2, job3):
            workflow.add_job(job)
        workflow.add_job_relationships(["job1", "job2"], "job3")
        return workflow

    def _prepare_grouping(self, method):
        """Build the 3-label workflow plus the grouping config for
        ``method`` and return ``(workflow, config, label_subgraph)``.
        """
        workflow = gtu.make_3_label_workflow("test1", final=True)
        config = {
            "labels": "label2, label3",
            "dimensions": "visit",
            "findDependencyMethod": method,
        }
        subgraphs = workflow._check_job_ordering_config({"order1": config})
        return workflow, config, subgraphs["order1"]

    @staticmethod
    def _mid_group_ordering(find_method=None, blocking=None):
        """Return the "mid" group-sort ordering config.

        Optional entries are only inserted when a value is given, and
        they are inserted at the same position the literal dicts used.
        """
        settings = {"implementation": "group", "ordering_type": "sort"}
        if find_method is not None:
            settings["findDependencyMethod"] = find_method
        settings["labels"] = "T2, T2b, T3"
        settings["dimensions"] = "visit"
        if blocking is not None:
            settings["blocking"] = blocking
        return {"mid": settings}

    def _assert_same_workflow(self, actual, expected):
        """Assert that the test utility reports both workflows as matching."""
        self.assertTrue(gtu.compare_generic_workflows(actual, expected), self._MISMATCH_MSG)

    # ------------------------------------------------------------------
    # Adding jobs / nodes / edges
    # ------------------------------------------------------------------
    def testAddJobDuplicate(self):
        """Adding the same job a second time raises RuntimeError."""
        workflow = gw.GenericWorkflow("mytest")
        workflow.add_job(self.job1)
        with self.assertRaises(RuntimeError):
            workflow.add_job(self.job1)

    def testAddJobValid(self):
        """A job added with add_job is counted, listed and retrievable."""
        workflow = gw.GenericWorkflow("mytest")
        workflow.add_job(self.job1)
        self.assertEqual(1, workflow.number_of_nodes())
        self.assertListEqual(["job1"], list(workflow))
        self.assertEqual(self.job1, workflow.get_job("job1"))

    def testAddNode(self):
        """A job added with add_node is counted, listed and retrievable."""
        workflow = gw.GenericWorkflow("mytest")
        workflow.add_node(self.job1)
        self.assertEqual(1, workflow.number_of_nodes())
        self.assertListEqual(["job1"], list(workflow))
        fetched = workflow.get_job("job1")
        self.assertEqual(self.job1, fetched)

    def testAddJobRelationshipsSingle(self):
        """One parent and one child produce exactly one edge."""
        workflow = gw.GenericWorkflow("mytest")
        for job in (self.job1, self.job2):
            workflow.add_job(job)
        workflow.add_job_relationships("job1", "job2")
        self.assertListEqual([("job1", "job2")], list(workflow.edges()))

    def testAddJobRelationshipsMultiChild(self):
        """One parent with a list of children yields one edge per child."""
        third = self._make_job("job3", "label2")

        workflow = gw.GenericWorkflow("mytest")
        for job in (self.job1, self.job2, third):
            workflow.add_job(job)
        workflow.add_job_relationships("job1", ["job2", "job3"])
        expected_edges = [("job1", "job2"), ("job1", "job3")]
        self.assertListEqual(expected_edges, list(workflow.edges()))

    def testAddJobRelationshipsMultiParents(self):
        """A list of parents with one child yields one edge per parent."""
        workflow = self._fan_in_workflow(self._make_job("job3", "label2"))
        expected_edges = [("job1", "job3"), ("job2", "job3")]
        self.assertListEqual(expected_edges, list(workflow.edges()))

    def testAddJobRelationshipsNone(self):
        """Passing None as parent or as child adds no edges."""
        workflow = gw.GenericWorkflow("mytest")
        workflow.add_job(self.job1)
        workflow.add_job_relationships(None, "job1")
        self.assertListEqual([], list(workflow.edges()))
        workflow.add_job_relationships("job1", None)
        self.assertListEqual([], list(workflow.edges()))

    def testGetJobExists(self):
        """get_job returns the very object that was added."""
        workflow = gw.GenericWorkflow("mytest")
        workflow.add_job(self.job1)
        self.assertIs(self.job1, workflow.get_job("job1"))

    def testGetJobError(self):
        """get_job on an unknown name raises KeyError."""
        workflow = gw.GenericWorkflow("mytest")
        workflow.add_job(self.job1)
        with self.assertRaises(KeyError):
            _ = workflow.get_job("job_not_there")

    def testAddEdgeBadParent(self):
        """add_edge with an unknown parent raises RuntimeError."""
        workflow = gw.GenericWorkflow("mytest")
        for job in (self.job1, self.job2):
            workflow.add_job(job)
        with self.assertRaisesRegex(RuntimeError, "notthere not in GenericWorkflow"):
            workflow.add_edge("notthere", "job2")

    def testAddEdgeBadChild(self):
        """add_edge with an unknown child raises RuntimeError."""
        workflow = gw.GenericWorkflow("mytest")
        for job in (self.job1, self.job2):
            workflow.add_job(job)
        with self.assertRaisesRegex(RuntimeError, "notthere2 not in GenericWorkflow"):
            workflow.add_edge("job1", "notthere2")

    def testQuantaCounts(self):
        """quanta_counts of the 3-label workflow is 6 per label."""
        workflow = gtu.make_3_label_workflow("test1", final=True)
        expected = Counter({"label1": 6, "label2": 6, "label3": 6})
        self.assertEqual(workflow.quanta_counts, expected)

    # ------------------------------------------------------------------
    # Executables and files
    # ------------------------------------------------------------------
    def testGetExecutablesNames(self):
        """get_executables(data=False) returns executable names."""
        workflow = gw.GenericWorkflow("mytest")
        self.job1.executable = gw.GenericWorkflowExec("exec1")
        workflow.add_job(self.job1)
        self.assertEqual(workflow._executables["exec1"], self.job1.executable)
        names = workflow.get_executables(data=False, transfer_only=False)
        self.assertEqual(names, ["exec1"])

    def testGetExecutablesData(self):
        """get_executables(data=True) returns the executable objects."""
        workflow = gw.GenericWorkflow("mytest")
        self.job1.executable = gw.GenericWorkflowExec("exec1")
        workflow.add_job(self.job1)
        executables = workflow.get_executables(data=True, transfer_only=False)
        self.assertEqual(executables, [self.job1.executable])
        self.assertEqual(hash(self.job1.executable), hash(executables[0]))

    def testAddFileTwice(self):
        """Re-adding an existing file logs a "Skipped add_file" message."""
        workflow = gw.GenericWorkflow("mytest")
        same_file = gw.GenericWorkflowFile("file1")
        workflow.add_file(same_file)
        with self.assertLogs("lsst.ctrl.bps.generic_workflow", level=logging.DEBUG) as captured:
            workflow.add_file(same_file)
        last_message = captured.records[-1].getMessage()
        self.assertRegex(last_message, "Skipped add_file for existing file file1")

    def testGetFilesNames(self):
        """get_files(data=False, transfer_only=False) lists all names."""
        workflow = gw.GenericWorkflow("mytest")
        transferred = gw.GenericWorkflowFile("file1", wms_transfer=True)
        workflow.add_file(transferred)
        untransferred = gw.GenericWorkflowFile("file2", wms_transfer=False)
        workflow.add_file(untransferred)
        names = workflow.get_files(data=False, transfer_only=False)
        self.assertEqual(names, ["file1", "file2"])

    def testGetFilesData(self):
        """get_files(data=True, transfer_only=True) returns only the
        file created with wms_transfer=True.
        """
        workflow = gw.GenericWorkflow("mytest")
        transferred = gw.GenericWorkflowFile("file1", wms_transfer=True)
        workflow.add_file(transferred)
        untransferred = gw.GenericWorkflowFile("file2", wms_transfer=False)
        workflow.add_file(untransferred)
        files = workflow.get_files(data=True, transfer_only=True)
        self.assertEqual(files, [transferred])
        self.assertEqual(hash(transferred), hash(files[0]))

    def testJobInputs(self):
        """Exercise add_job_inputs followed by get_job_inputs."""
        workflow = gw.GenericWorkflow("mytest")
        workflow.add_job(self.job1)
        transferred = gw.GenericWorkflowFile("file1", wms_transfer=True)
        workflow.add_job_inputs("job1", transferred)
        untransferred = gw.GenericWorkflowFile("file2", wms_transfer=False)
        workflow.add_job_inputs("job2", untransferred)

        self.assertIn("file1", workflow.get_files())
        self.assertEqual(["file1"], workflow.get_job_inputs("job1", data=False))
        self.assertEqual([transferred], workflow.get_job_inputs("job1"))
        job2_inputs = workflow.get_job_inputs("job2", data=False, transfer_only=True)
        self.assertEqual([], job2_inputs)

    # ------------------------------------------------------------------
    # Save / load / validate / draw
    # ------------------------------------------------------------------
    def testSaveInvalidFormat(self):
        """save with an unknown format raises RuntimeError."""
        workflow = gw.GenericWorkflow("mytest")
        buffer = io.BytesIO()
        with self.assertRaisesRegex(RuntimeError, r"Unknown format \(bad_format\)"):
            workflow.save(buffer, "bad_format")

    def testLoadInvalidFormat(self):
        """load with an unknown format raises RuntimeError."""
        buffer = io.BytesIO()
        with self.assertRaisesRegex(RuntimeError, r"Unknown format \(bad_format\)"):
            _ = gw.GenericWorkflow.load(buffer, "bad_format")

    def testValidate(self):
        """validate raises nothing for a simple fan-in workflow."""
        workflow = self._fan_in_workflow(gw.GenericWorkflowJob("job3", "label2"))
        # No exception should be raised
        workflow.validate()

    def testValidateGroups(self):
        """validate raises nothing for the grouped 3-label workflow."""
        workflow = gtu.make_3_label_workflow_groups_sort("test_validate", final=True)
        workflow.validate()

    def testSavePickle(self):
        """A workflow saved as pickle and loaded again matches itself."""
        original = gw.GenericWorkflow("mytest")
        for job in (self.job1, self.job2):
            original.add_job(job)
        original.add_job_relationships("job1", "job2")

        buffer = io.BytesIO()
        original.save(buffer, "pickle")
        buffer.seek(0)
        restored = gw.GenericWorkflow.load(buffer, "pickle")
        self._assert_same_workflow(original, restored)

    def testDrawBadFormat(self):
        """draw with an unknown format raises RuntimeError."""
        workflow = gw.GenericWorkflow("mytest")
        workflow.add_job(self.job1)
        buffer = io.BytesIO()
        with self.assertRaisesRegex(RuntimeError, r"Unknown draw format \(bad_format\)"):
            workflow.draw(buffer, "bad_format")

    # ------------------------------------------------------------------
    # Labels and counts
    # ------------------------------------------------------------------
    def testLabels(self):
        """labels lists each distinct label once."""
        workflow = self._fan_in_workflow(gw.GenericWorkflowJob("job3", "label2"))
        self.assertListEqual(["label1", "label2"], workflow.labels)

    def testRegenerateLabels(self):
        """regenerate_labels picks up labels changed after adding jobs."""
        third = gw.GenericWorkflowJob("job3", "label2")
        workflow = self._fan_in_workflow(third)
        self.job1.label = "label1b"
        self.job2.label = "label1b"
        third.label = "label2b"
        workflow.regenerate_labels()
        self.assertListEqual(["label1b", "label2b"], workflow.labels)

    def testJobCounts(self):
        """job_counts without a final job."""
        workflow = gtu.make_3_label_workflow("test1", final=False)
        expected = Counter({"pipetaskInit": 1, "label1": 6, "label2": 6, "label3": 6})
        self.assertEqual(workflow.job_counts, expected)

    def testJobCountsFinal(self):
        """job_counts with a final job includes "finalJob"."""
        workflow = gtu.make_3_label_workflow("test1", final=True)
        expected = Counter({"pipetaskInit": 1, "label1": 6, "label2": 6, "label3": 6, "finalJob": 1})
        self.assertEqual(workflow.job_counts, expected)

    def testDelJob(self):
        """Deleting job2 leaves only the job1 -> job3 edge and updates
        the per-label job counts.
        """
        workflow = self._fan_in_workflow(gw.GenericWorkflowJob("job3", "label2"))

        workflow.del_job("job2")

        self.assertListEqual([("job1", "job3")], list(workflow.edges()))
        self.assertEqual(Counter({"label1": 1, "label2": 1}), workflow.job_counts)

    def testAddWorkflowSource(self):
        """add_workflow_source is expected to connect the added
        workflow's last jobs to this workflow's first jobs.
        """
        workflow = self._fan_in_workflow(gw.GenericWorkflowJob("job3", "label2"))

        source = gw.GenericWorkflow("mytest2")
        source_specs = (
            ("srcjob1", "srclabel1"),
            ("srcjob2", "srclabel1"),
            ("srcjob3", "srclabel2"),
            ("srcjob4", "srclabel2"),
        )
        for name, label in source_specs:
            source_job = gw.GenericWorkflowJob(name, label)
            source_job.executable = self.exec1
            source.add_job(source_job)
        source.add_job_relationships("srcjob1", "srcjob3")
        source.add_job_relationships("srcjob2", "srcjob4")

        workflow.add_workflow_source(source)

        expected_counts = Counter({"srclabel1": 2, "srclabel2": 2, "label1": 1, "label2": 2})
        self.assertEqual(expected_counts, workflow.job_counts)
        self.assertListEqual(["srclabel1", "srclabel2", "label1", "label2"], workflow.labels)
        expected_edges = [
            ("srcjob1", "srcjob3"),
            ("srcjob2", "srcjob4"),
            ("srcjob3", "job1"),
            ("srcjob3", "job2"),
            ("srcjob4", "job1"),
            ("srcjob4", "job2"),
            ("job1", "job3"),
            ("job2", "job3"),
        ]
        self.assertListEqual(sorted(expected_edges), sorted(workflow.edges()))

    def testGetJobsByLabel(self):
        """get_jobs_by_label returns the jobs carrying that label."""
        third = gw.GenericWorkflowJob("job3", "label3")
        workflow = self._fan_in_workflow(third)

        self.assertListEqual([third], workflow.get_jobs_by_label("label3"))

    def testAddJobInvalidType(self):
        """add_job rejects an object that does not inherit from the
        workflow node class, even if it looks like one.
        """

        @dataclasses.dataclass(slots=True)
        class GenericWorkflowNodeNoInherit:
            name: str
            label: str

            def __hash__(self):
                return hash(self.name)

            @property
            def node_type(self):
                return gw.GenericWorkflowNodeType.NOOP

        impostor = GenericWorkflowNodeNoInherit("myname", "mylabel")
        workflow = gw.GenericWorkflow("mytest")
        for job in (self.job1, self.job2):
            workflow.add_job(job)
        with self.assertRaisesRegex(RuntimeError, "Invalid type for job to be added to GenericWorkflowGraph"):
            workflow.add_job(impostor)

    # ------------------------------------------------------------------
    # Grouping jobs by dependencies
    # ------------------------------------------------------------------
    def testGroupJobsByDependencies(self):
        """Grouping with the "sink" method yields one group per visit."""
        workflow, config, label_subgraph = self._prepare_grouping("sink")
        groups = workflow._group_jobs_by_dependencies("order1", config, label_subgraph)
        self.assertEqual(sorted(groups.keys()), sorted([(10001,), (10002,), (301,)]))

    def testGroupJobsByDependenciesSource(self):
        """Grouping with the "source" method yields one group per visit."""
        workflow, config, label_subgraph = self._prepare_grouping("source")
        groups = workflow._group_jobs_by_dependencies("order1", config, label_subgraph)
        self.assertEqual(sorted(groups.keys()), sorted([(10001,), (10002,), (301,)]))

    def testGroupJobsByDependenciesBadMethod(self):
        """An unknown findDependencyMethod raises RuntimeError."""
        workflow, config, label_subgraph = self._prepare_grouping("bad_method")

        with self.assertRaisesRegex(RuntimeError, r"Invalid findDependencyMethod \(bad_method\)"):
            _ = workflow._group_jobs_by_dependencies("order1", config, label_subgraph)

    # ------------------------------------------------------------------
    # Validation of the job ordering config
    # ------------------------------------------------------------------
    def testCheckJobOrderingConfigBadImplementation(self):
        """An unknown implementation raises RuntimeError."""
        workflow = gtu.make_3_label_workflow("test1", final=True)
        ordering = {"order1": {"implementation": "bad", "labels": "label2", "dimensions": "visit"}}
        with self.assertRaisesRegex(RuntimeError, "Invalid implementation for"):
            workflow._check_job_ordering_config(ordering)

    def testCheckJobOrderingConfigBadType(self):
        """An unknown ordering_type raises RuntimeError."""
        workflow = gtu.make_3_label_workflow("test1", final=True)
        ordering = {"order1": {"ordering_type": "badtype", "labels": "label2", "dimensions": "visit"}}
        with self.assertRaisesRegex(RuntimeError, "Invalid ordering_type for"):
            workflow._check_job_ordering_config(ordering)

    def testCheckJobOrderingConfigBadLabel(self):
        """A label listed in two ordering groups raises RuntimeError."""
        workflow = gtu.make_3_label_workflow("test_bad_label", final=True)
        ordering = {
            "order1": {"ordering_type": "sort", "labels": "label2", "dimensions": "visit"},
            "order2": {"ordering_type": "sort", "labels": "label2,label3", "dimensions": "visit"},
        }
        with self.assertRaisesRegex(
            RuntimeError, "Job label label2 appears in more than one job ordering group"
        ):
            workflow._check_job_ordering_config(ordering)

    def testCheckJobOrderingConfigUnusedLabel(self):
        """Labels that are not in the workflow raise RuntimeError."""
        workflow = gtu.make_3_label_workflow("test_unused_label", final=True)
        ordering = {
            "order1": {
                "ordering_type": "sort",
                "labels": "label2,unused1,label3,unused2",
                "dimensions": "visit",
            },
        }
        with self.assertRaisesRegex(
            RuntimeError,
            r"Job label\(s\) \(unused1,unused2\) from job ordering group "
            "order1 does not exist in workflow. Aborting.",
        ):
            workflow._check_job_ordering_config(ordering)

    def testCheckJobOrderingConfigMissingDim(self):
        """A group without a dimensions entry raises KeyError."""
        workflow = gtu.make_3_label_workflow("test_missing_dim", final=True)
        ordering = {"order1": {"ordering_type": "sort", "labels": "label2"}}
        with self.assertRaisesRegex(KeyError, "Missing dimensions entry in ordering group order1"):
            workflow._check_job_ordering_config(ordering)

    def testCheckJobOrderingConfigSort(self):
        """A valid sort config returns one label subgraph for the group."""
        workflow = gtu.make_3_label_workflow("test_bad_dim", final=True)
        ordering = {"order1": {"ordering_type": "sort", "labels": "label1,label2", "dimensions": "visit"}}
        results = workflow._check_job_ordering_config(ordering)
        self.assertEqual(len(results), 1)

        expected_graph = networkx.DiGraph()
        expected_graph.add_nodes_from(["label1", "label2"])
        expected_graph.add_edges_from([("label1", "label2")])
        same_shape = networkx.is_isomorphic(
            results["order1"], expected_graph, node_match=lambda x, y: x == y
        )
        self.assertTrue(same_shape)

    # ------------------------------------------------------------------
    # add_special_job_ordering on the 3-label workflow
    # ------------------------------------------------------------------
    def testAddSpecialJobOrderingNoopSort(self):
        """Noop sort ordering; also checks that the noop jobs do not
        alter quanta_counts.
        """
        workflow = gtu.make_3_label_workflow("test", final=True)
        counts_before = workflow.quanta_counts

        ordering = {
            "order1": {
                "implementation": "noop",
                "ordering_type": "sort",
                "labels": "label1,label2",
                "dimensions": "visit",
            }
        }
        workflow.add_special_job_ordering(ordering)

        self.assertEqual(workflow.quanta_counts, counts_before)

        workflow.regenerate_labels()
        self.assertEqual(workflow.quanta_counts, counts_before)

        expected = gtu.make_3_label_workflow_noop_sort("truth", final=True)
        self._assert_same_workflow(workflow, expected)

    def testAddSpecialJobOrderingGroupSort(self):
        """Group sort ordering without an explicit findDependencyMethod."""
        workflow = gtu.make_3_label_workflow("test", final=True)
        ordering = {
            "order1": {
                "implementation": "group",
                "ordering_type": "sort",
                "labels": "label1,label2",
                "dimensions": "visit",
            }
        }
        workflow.add_special_job_ordering(ordering)

        expected = gtu.make_3_label_workflow_groups_sort("truth", final=True)
        self._assert_same_workflow(workflow, expected)

    def testAddSpecialJobOrderingGroupSortSink(self):
        """Group sort ordering with findDependencyMethod "sink"."""
        workflow = gtu.make_3_label_workflow("test_group_sort", final=True)
        ordering = {
            "order1": {
                "implementation": "group",
                "ordering_type": "sort",
                "findDependencyMethod": "sink",
                "labels": "label1,label2",
                "dimensions": "visit",
            }
        }
        workflow.add_special_job_ordering(ordering)

        expected = gtu.make_3_label_workflow_groups_sort("truth", final=True)
        self._assert_same_workflow(workflow, expected)

    # ------------------------------------------------------------------
    # add_special_job_ordering on the 5-label workflow, one middle group
    # ------------------------------------------------------------------
    def testMiddleGroupValues(self):
        """Middle group with no findDependencyMethod given."""
        workflow = gtu.make_5_label_workflow("mid_group_even_values", True, False, True)
        workflow.add_special_job_ordering(self._mid_group_ordering())
        expected = gtu.make_5_label_workflow_middle_groups("truth", True, False, True)
        self._assert_same_workflow(workflow, expected)

    def testMiddleGroupSink(self):
        """Middle group with findDependencyMethod "sink"."""
        workflow = gtu.make_5_label_workflow("mid_group_even_sink", True, False, True)
        workflow.add_special_job_ordering(self._mid_group_ordering("sink"))
        expected = gtu.make_5_label_workflow_middle_groups("truth", True, False, True)
        self._assert_same_workflow(workflow, expected)

    def testMiddleGroupSource(self):
        """Middle group with findDependencyMethod "source"."""
        workflow = gtu.make_5_label_workflow("mid_group_even_source", True, False, True)
        workflow.add_special_job_ordering(self._mid_group_ordering("source"))
        expected = gtu.make_5_label_workflow_middle_groups("truth", True, False, True)
        self._assert_same_workflow(workflow, expected)

    def testMiddleGroupValuesUneven(self):
        """Middle group, uneven variant, no findDependencyMethod given."""
        workflow = gtu.make_5_label_workflow("mid_group_uneven_values", True, True, True)
        workflow.add_special_job_ordering(self._mid_group_ordering())
        expected = gtu.make_5_label_workflow_middle_groups("truth", True, True, True)
        self._assert_same_workflow(workflow, expected)

    def testMiddleGroupSinkUneven(self):
        """Middle group, uneven variant, "sink" method.

        Also checks changing the blocking value from its default.
        """
        workflow = gtu.make_5_label_workflow("mid_group_uneven_sink", True, True, True)

        workflow.add_special_job_ordering(self._mid_group_ordering("sink", blocking=True))
        expected = gtu.make_5_label_workflow_middle_groups("truth", True, True, True, True)
        self._assert_same_workflow(workflow, expected)

    def testMiddleGroupSourceUneven(self):
        """Middle group, uneven variant, "source" method."""
        workflow = gtu.make_5_label_workflow("mid_group_uneven_source", True, True, True)
        workflow.add_special_job_ordering(self._mid_group_ordering("source"))
        expected = gtu.make_5_label_workflow_middle_groups("truth", True, True, True)
        self._assert_same_workflow(workflow, expected)

    # ------------------------------------------------------------------
    # add_special_job_ordering on the 5-label workflow, two groups
    # ------------------------------------------------------------------
    def test2GroupsEven(self):
        """Two blocking ordering groups on the even 5-label workflow."""
        workflow = gtu.make_5_label_workflow("two_groups_even", True, False, True)
        first_group = {
            "implementation": "group",
            "ordering_type": "sort",
            "findDependencyMethod": "sink",
            "labels": "T1, T2",
            "dimensions": "visit",
            "equalDimensions": "visit:group",
            "blocking": True,
        }
        second_group = {
            "implementation": "group",
            "ordering_type": "sort",
            "findDependencyMethod": "source",
            "labels": "T3, T4",
            "dimensions": "visit",
            "blocking": True,
        }
        workflow.add_special_job_ordering({"order1": first_group, "order2": second_group})

        expected = gtu.make_5_label_workflow_2_groups("truth", True, False, True, True)
        self._assert_same_workflow(workflow, expected)

    def test2GroupsUneven(self):
        """Two ordering groups on the uneven 5-label workflow."""
        workflow = gtu.make_5_label_workflow("two_groups_uneven", True, True, True)
        first_group = {
            "implementation": "group",
            "ordering_type": "sort",
            "findDependencyMethod": "sink",
            "labels": "T1, T2",
            "dimensions": "visit",
            "equalDimensions": "visit:group",
        }
        second_group = {
            "implementation": "group",
            "ordering_type": "sort",
            "findDependencyMethod": "source",
            "labels": "T3, T4",
            "dimensions": "visit",
        }
        workflow.add_special_job_ordering({"order1": first_group, "order2": second_group})

        expected = gtu.make_5_label_workflow_2_groups("truth", True, True, True)

        self._assert_same_workflow(workflow, expected)

727 

728 

class TestGenericWorkflowLabels(unittest.TestCase):
    """Tests for GenericWorkflowLabels"""

    def testEmptyLabels(self):
        """A fresh labels object has no labels."""
        tracker = gw.GenericWorkflowLabels()
        self.assertFalse(len(tracker.labels))

    def testEmptyJobCounts(self):
        """A fresh labels object has no job counts."""
        tracker = gw.GenericWorkflowLabels()
        self.assertFalse(len(tracker.job_counts))

    def testEmptyGetJobsByLabel(self):
        """Looking up a label on a fresh labels object finds no jobs."""
        tracker = gw.GenericWorkflowLabels()
        self.assertFalse(len(tracker.get_jobs_by_label("label1")))

    def testAddJobFirst(self):
        """The first job added is recorded under its label."""
        tracker = gw.GenericWorkflowLabels()
        only_job = gw.GenericWorkflowJob("job1", "label1")
        tracker.add_job(only_job, [], [])
        self.assertEqual(tracker._label_to_jobs["label1"], [only_job])
        self.assertIn("label1", tracker._label_graph)

    def testAddJobMult(self):
        """Several jobs, two sharing a label, build a label chain."""
        tracker = gw.GenericWorkflowLabels()
        first = gw.GenericWorkflowJob("job1", "label1")
        second = gw.GenericWorkflowJob("job2", "label2")
        third = gw.GenericWorkflowJob("job3", "label2")
        fourth = gw.GenericWorkflowJob("job4", "label3")
        tracker.add_job(first, [], [])
        tracker.add_job(second, ["label1"], [])
        tracker.add_job(third, ["label1"], [])
        tracker.add_job(fourth, ["label2"], [])

        jobs_by_label = tracker._label_to_jobs
        self.assertListEqual(jobs_by_label["label1"], [first])
        self.assertListEqual(jobs_by_label["label2"], [second, third])
        self.assertListEqual(jobs_by_label["label3"], [fourth])

        label_graph = tracker._label_graph
        self.assertIn("label1", label_graph)
        self.assertIn("label2", label_graph)
        self.assertIn("label3", label_graph)
        self.assertEqual(list(label_graph.successors("label1")), ["label2"])
        self.assertEqual(list(label_graph.successors("label2")), ["label3"])

    def testDelJobRemain(self):
        """Deleting one of several jobs that share a label keeps the label."""
        tracker = gw.GenericWorkflowLabels()
        first = gw.GenericWorkflowJob("job1", "label1")
        tracker.add_job(first, [], [])
        second = gw.GenericWorkflowJob("job2", "label2")
        tracker.add_job(second, ["label1"], [])
        third = gw.GenericWorkflowJob("job3", "label2")
        tracker.add_job(third, ["label1"], [])
        fourth = gw.GenericWorkflowJob("job4", "label3")
        tracker.add_job(fourth, ["label2"], [])

        tracker.del_job(second)

        self.assertListEqual(tracker._label_to_jobs["label2"], [third])
        label_graph = tracker._label_graph
        self.assertIn("label2", label_graph)
        self.assertEqual(list(label_graph.successors("label1")), ["label2"])
        self.assertEqual(list(label_graph.successors("label2")), ["label3"])

    def testDelJobLast(self):
        """Deleting the only job of a label removes the label and links
        its predecessors to its successors.
        """
        tracker = gw.GenericWorkflowLabels()
        first = gw.GenericWorkflowJob("job1", "label1")
        tracker.add_job(first, [], [])
        second = gw.GenericWorkflowJob("job2", "label2")
        tracker.add_job(second, [], [])
        third = gw.GenericWorkflowJob("job3", "label3")
        tracker.add_job(third, ["label1", "label2"], [])
        fourth = gw.GenericWorkflowJob("job4", "label4")
        tracker.add_job(fourth, ["label3"], [])
        fifth = gw.GenericWorkflowJob("job5", "label5")
        tracker.add_job(fifth, ["label3"], [])

        tracker.del_job(third)

        self.assertNotIn("label3", tracker._label_to_jobs)
        label_graph = tracker._label_graph
        self.assertNotIn("label3", label_graph)
        self.assertEqual(list(label_graph.successors("label1")), ["label4", "label5"])
        self.assertEqual(list(label_graph.successors("label2")), ["label4", "label5"])

    def testAddSpecialJobOrderingBadType(self):
        """An unknown ordering_type raises RuntimeError."""
        workflow = gtu.make_3_label_workflow("test_sort", final=True)
        ordering = {"order1": {"ordering_type": "badtype", "labels": "label2", "dimensions": "visit"}}
        with self.assertRaisesRegex(RuntimeError, "Invalid ordering_type for"):
            workflow.add_special_job_ordering(ordering)

    def testAddSpecialJobOrderingBadLabel(self):
        """A label listed in two ordering groups raises RuntimeError."""
        workflow = gtu.make_3_label_workflow("test_bad_label", final=True)
        ordering = {
            "order1": {"ordering_type": "sort", "labels": "label2", "dimensions": "visit"},
            "order2": {"ordering_type": "sort", "labels": "label2,label3", "dimensions": "visit"},
        }

        with self.assertRaisesRegex(
            RuntimeError, "Job label label2 appears in more than one job ordering group"
        ):
            workflow.add_special_job_ordering(ordering)

    def testAddSpecialJobOrderingBadDim(self):
        """A dimension that the jobs lack raises KeyError."""
        workflow = gtu.make_3_label_workflow("test_bad_dim", final=True)
        ordering = {"order1": {"ordering_type": "sort", "labels": "label2", "dimensions": "notthere"}}

        with self.assertRaisesRegex(
            KeyError, r"Job label2_10001_10 missing dimensions \(notthere\) required for order group order1"
        ):
            workflow.add_special_job_ordering(ordering)

    def testAddSpecialJobOrderingSort(self):
        """Noop sort ordering on label2 keeps quanta_counts unchanged and
        adds the expected edges through the noop jobs.
        """
        workflow = gtu.make_3_label_workflow("test_sort", final=True)
        counts_before = workflow.quanta_counts

        ordering = {
            "order1": {
                "implementation": "noop",
                "ordering_type": "sort",
                "labels": "label2",
                "dimensions": "visit",
            }
        }
        workflow.add_special_job_ordering(ordering)

        self.assertEqual(workflow.quanta_counts, counts_before)

        workflow.regenerate_labels()
        self.assertEqual(workflow.quanta_counts, counts_before)

        actual_edges = workflow.edges
        expected_edges = [
            ("label2_301_10", "noop_order1_301"),
            ("label2_301_11", "noop_order1_301"),
            ("noop_order1_301", "label2_10001_10"),
            ("noop_order1_301", "label2_10001_11"),
            ("label2_10001_10", "noop_order1_10001"),
            ("label2_10001_11", "noop_order1_10001"),
            ("noop_order1_10001", "label2_10002_10"),
            ("noop_order1_10001", "label2_10002_11"),
        ]
        for parent, child in expected_edges:
            self.assertIn((parent, child), actual_edges, f"Missing edge from {parent} to {child}")

872 

873 

# Run the tests in this module when the file is executed as a script.
if __name__ == "__main__":
    unittest.main()