Coverage for tests/test_generic_workflow.py: 16%
483 statements
« prev ^ index » next coverage.py v7.13.5, created at 2026-04-14 23:52 +0000
« prev ^ index » next coverage.py v7.13.5, created at 2026-04-14 23:52 +0000
1# This file is part of ctrl_bps.
2#
3# Developed for the LSST Data Management System.
4# This product includes software developed by the LSST Project
5# (https://www.lsst.org).
6# See the COPYRIGHT file at the top-level directory of this distribution
7# for details of code ownership.
8#
9# This software is dual licensed under the GNU General Public License and also
10# under a 3-clause BSD license. Recipients may choose which of these licenses
11# to use; please see the files gpl-3.0.txt and/or bsd_license.txt,
12# respectively. If you choose the GPL option then the following text applies
13# (but note that there is still no warranty even if you opt for BSD instead):
14#
15# This program is free software: you can redistribute it and/or modify
16# it under the terms of the GNU General Public License as published by
17# the Free Software Foundation, either version 3 of the License, or
18# (at your option) any later version.
19#
20# This program is distributed in the hope that it will be useful,
21# but WITHOUT ANY WARRANTY; without even the implied warranty of
22# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
23# GNU General Public License for more details.
24#
25# You should have received a copy of the GNU General Public License
26# along with this program. If not, see <https://www.gnu.org/licenses/>.
27import dataclasses
28import io
29import logging
30import unittest
31from collections import Counter
33import networkx
35import lsst.ctrl.bps.generic_workflow as gw
36import lsst.ctrl.bps.tests.gw_test_utils as gtu
class TestGenericWorkflowNode(unittest.TestCase):
    """Exercise the generic workflow node base class."""

    def testNoNodeType(self):
        """Reading node_type on a subclass that only adds a data field
        raises NotImplementedError.
        """

        @dataclasses.dataclass(slots=True)
        class GenericWorkflowNoNodeType(gw.GenericWorkflowNode):
            dummy_val: int

        node = GenericWorkflowNoNodeType("myname", "mylabel", 3)
        with self.assertRaises(NotImplementedError):
            _ = node.node_type

    def testHash(self):
        """Hashes differ when the names differ and agree when only the
        labels differ.
        """
        reference = gw.GenericWorkflowNode("myname", "mylabel")
        renamed = gw.GenericWorkflowNode("myname2", "mylabel")
        relabeled = gw.GenericWorkflowNode("myname", "mylabel2")

        self.assertNotEqual(hash(reference), hash(renamed))
        self.assertEqual(hash(reference), hash(relabeled))
class TestGenericWorkflowJob(unittest.TestCase):
    """Exercise generic workflow jobs."""

    def testEquality(self):
        """Two jobs built from the same name and label compare equal."""
        first, second = (gw.GenericWorkflowJob("job1", "label1") for _ in range(2))
        self.assertEqual(first, second)
class TestGenericWorkflow(unittest.TestCase):
    """Exercise generic workflow construction, queries, I/O and ordering."""

    # Failure text shared by every whole-workflow comparison in this class.
    _MISMATCH_MSG = "Results do not match expected GenericWorkflow. See debug messages."

    def setUp(self):
        """Create the executable and the two jobs shared by the tests."""
        self.exec1 = gw.GenericWorkflowExec(
            name="test1.py", src_uri="${CTRL_BPS_DIR}/bin/test1.py", transfer_executable=False
        )
        self.job1 = self._make_job("job1", "label1")
        self.job2 = self._make_job("job2", "label2")

    # ------------------------------------------------------------------
    # Private helpers (not collected as tests).
    # ------------------------------------------------------------------
    def _make_job(self, name, label):
        """Return a job with the standard quanta counts and executable."""
        job = gw.GenericWorkflowJob(name, label)
        job.quanta_counts = Counter({"pt1": 1, "pt2": 2})
        job.executable = self.exec1
        return job

    def _new_workflow(self, *jobs):
        """Return a "mytest" workflow with ``jobs`` added in order."""
        workflow = gw.GenericWorkflow("mytest")
        for job in jobs:
            workflow.add_job(job)
        return workflow

    def _fan_in_workflow(self, sink_label="label2"):
        """Return ``(workflow, job3)`` where job1 and job2 both feed job3.

        ``job3`` is a bare job (no quanta counts or executable set) whose
        label is ``sink_label``.
        """
        job3 = gw.GenericWorkflowJob("job3", sink_label)
        workflow = self._new_workflow(self.job1, self.job2, job3)
        workflow.add_job_relationships(["job1", "job2"], "job3")
        return workflow, job3

    def _assert_workflows_match(self, actual, expected):
        """Fail with the shared message unless the two workflows compare
        as matching.
        """
        matches = gtu.compare_generic_workflows(actual, expected)
        self.assertTrue(matches, self._MISMATCH_MSG)

    # ------------------------------------------------------------------
    # Adding jobs, nodes and edges.
    # ------------------------------------------------------------------
    def testAddJobDuplicate(self):
        """Adding the same job a second time raises RuntimeError."""
        workflow = self._new_workflow(self.job1)
        with self.assertRaises(RuntimeError):
            workflow.add_job(self.job1)

    def testAddJobValid(self):
        """A single added job is counted, listed and retrievable."""
        workflow = self._new_workflow(self.job1)
        self.assertEqual(1, workflow.number_of_nodes())
        self.assertListEqual(["job1"], list(workflow))
        fetched = workflow.get_job("job1")
        self.assertEqual(self.job1, fetched)

    def testAddNode(self):
        """add_node stores a job so that get_job returns it."""
        workflow = gw.GenericWorkflow("mytest")
        workflow.add_node(self.job1)
        self.assertEqual(1, workflow.number_of_nodes())
        self.assertListEqual(["job1"], list(workflow))
        self.assertEqual(self.job1, workflow.get_job("job1"))

    def testAddJobRelationshipsSingle(self):
        """One parent and one child give exactly one edge."""
        workflow = self._new_workflow(self.job1, self.job2)
        workflow.add_job_relationships("job1", "job2")
        self.assertListEqual([("job1", "job2")], list(workflow.edges()))

    def testAddJobRelationshipsMultiChild(self):
        """One parent with a list of children gives one edge per child."""
        job3 = self._make_job("job3", "label2")
        workflow = self._new_workflow(self.job1, self.job2, job3)
        workflow.add_job_relationships("job1", ["job2", "job3"])
        expected_edges = [("job1", "job2"), ("job1", "job3")]
        self.assertListEqual(expected_edges, list(workflow.edges()))

    def testAddJobRelationshipsMultiParents(self):
        """A list of parents with one child gives one edge per parent."""
        job3 = self._make_job("job3", "label2")
        workflow = self._new_workflow(self.job1, self.job2, job3)
        workflow.add_job_relationships(["job1", "job2"], "job3")
        expected_edges = [("job1", "job3"), ("job2", "job3")]
        self.assertListEqual(expected_edges, list(workflow.edges()))

    def testAddJobRelationshipsNone(self):
        """A None parent or a None child adds no edges."""
        workflow = self._new_workflow(self.job1)
        for parents, children in ((None, "job1"), ("job1", None)):
            workflow.add_job_relationships(parents, children)
            self.assertListEqual([], list(workflow.edges()))

    def testGetJobExists(self):
        """get_job returns the very object that was added."""
        workflow = self._new_workflow(self.job1)
        fetched = workflow.get_job("job1")
        self.assertIs(self.job1, fetched)

    def testGetJobError(self):
        """get_job raises KeyError for an unknown job name."""
        workflow = self._new_workflow(self.job1)
        with self.assertRaises(KeyError):
            _ = workflow.get_job("job_not_there")

    def testAddEdgeBadParent(self):
        """add_edge rejects a parent that is not in the workflow."""
        workflow = self._new_workflow(self.job1, self.job2)
        with self.assertRaisesRegex(RuntimeError, "notthere not in GenericWorkflow"):
            workflow.add_edge("notthere", "job2")

    def testAddEdgeBadChild(self):
        """add_edge rejects a child that is not in the workflow."""
        workflow = self._new_workflow(self.job1, self.job2)
        with self.assertRaisesRegex(RuntimeError, "notthere2 not in GenericWorkflow"):
            workflow.add_edge("job1", "notthere2")

    # ------------------------------------------------------------------
    # Counts, executables and files.
    # ------------------------------------------------------------------
    def testQuantaCounts(self):
        """quanta_counts of the 3-label workflow is 6 for each label."""
        workflow = gtu.make_3_label_workflow("test1", final=True)
        expected = Counter(dict.fromkeys(("label1", "label2", "label3"), 6))
        self.assertEqual(workflow.quanta_counts, expected)

    def testGetExecutablesNames(self):
        """get_executables(data=False) returns the executable names."""
        self.job1.executable = gw.GenericWorkflowExec("exec1")
        workflow = self._new_workflow(self.job1)
        self.assertEqual(workflow._executables["exec1"], self.job1.executable)
        names = workflow.get_executables(data=False, transfer_only=False)
        self.assertEqual(names, ["exec1"])

    def testGetExecutablesData(self):
        """get_executables(data=True) returns the executable objects."""
        self.job1.executable = gw.GenericWorkflowExec("exec1")
        workflow = self._new_workflow(self.job1)
        found = workflow.get_executables(data=True, transfer_only=False)
        self.assertEqual(found, [self.job1.executable])
        self.assertEqual(hash(self.job1.executable), hash(found[0]))

    def testAddFileTwice(self):
        """Re-adding a file emits the "Skipped add_file" log message."""
        workflow = gw.GenericWorkflow("mytest")
        gwfile = gw.GenericWorkflowFile("file1")
        workflow.add_file(gwfile)
        with self.assertLogs("lsst.ctrl.bps.generic_workflow", level=logging.DEBUG) as captured:
            workflow.add_file(gwfile)
        last_message = captured.records[-1].getMessage()
        self.assertRegex(last_message, "Skipped add_file for existing file file1")

    def testGetFilesNames(self):
        """get_files(data=False) lists every file name in insertion order."""
        workflow = gw.GenericWorkflow("mytest")
        for file_name, transfer in (("file1", True), ("file2", False)):
            workflow.add_file(gw.GenericWorkflowFile(file_name, wms_transfer=transfer))
        names = workflow.get_files(data=False, transfer_only=False)
        self.assertEqual(names, ["file1", "file2"])

    def testGetFilesData(self):
        """get_files(transfer_only=True) keeps only the wms_transfer file."""
        workflow = gw.GenericWorkflow("mytest")
        transferred = gw.GenericWorkflowFile("file1", wms_transfer=True)
        workflow.add_file(transferred)
        workflow.add_file(gw.GenericWorkflowFile("file2", wms_transfer=False))
        found = workflow.get_files(data=True, transfer_only=True)
        self.assertEqual(found, [transferred])
        self.assertEqual(hash(transferred), hash(found[0]))

    def testJobInputs(self):
        """Inputs added per job can be read back by name or as objects."""
        # test add + get
        workflow = self._new_workflow(self.job1)
        transferred = gw.GenericWorkflowFile("file1", wms_transfer=True)
        workflow.add_job_inputs("job1", transferred)
        not_transferred = gw.GenericWorkflowFile("file2", wms_transfer=False)
        workflow.add_job_inputs("job2", not_transferred)

        self.assertIn("file1", workflow.get_files())
        self.assertEqual(["file1"], workflow.get_job_inputs("job1", data=False))
        self.assertEqual([transferred], workflow.get_job_inputs("job1"))
        self.assertEqual([], workflow.get_job_inputs("job2", data=False, transfer_only=True))

    # ------------------------------------------------------------------
    # Save, load, draw and validate.
    # ------------------------------------------------------------------
    def testSaveInvalidFormat(self):
        """save raises RuntimeError for an unknown format name."""
        workflow = gw.GenericWorkflow("mytest")
        buffer = io.BytesIO()
        with self.assertRaisesRegex(RuntimeError, r"Unknown format \(bad_format\)"):
            workflow.save(buffer, "bad_format")

    def testLoadInvalidFormat(self):
        """load raises RuntimeError for an unknown format name."""
        buffer = io.BytesIO()
        with self.assertRaisesRegex(RuntimeError, r"Unknown format \(bad_format\)"):
            _ = gw.GenericWorkflow.load(buffer, "bad_format")

    def testValidate(self):
        """validate accepts a simple fan-in workflow."""
        workflow, _ = self._fan_in_workflow()
        # No exception should be raised
        workflow.validate()

    def testValidateGroups(self):
        """validate accepts the grouped, sorted 3-label workflow."""
        workflow = gtu.make_3_label_workflow_groups_sort("test_validate", final=True)
        workflow.validate()

    def testSavePickle(self):
        """A workflow survives a pickle save followed by a load."""
        # test save and load
        workflow = self._new_workflow(self.job1, self.job2)
        workflow.add_job_relationships("job1", "job2")
        buffer = io.BytesIO()
        workflow.save(buffer, "pickle")
        buffer.seek(0)
        restored = gw.GenericWorkflow.load(buffer, "pickle")
        self._assert_workflows_match(workflow, restored)

    def testDrawBadFormat(self):
        """draw raises RuntimeError for an unknown draw format."""
        workflow = self._new_workflow(self.job1)
        buffer = io.BytesIO()
        with self.assertRaisesRegex(RuntimeError, r"Unknown draw format \(bad_format\)"):
            workflow.draw(buffer, "bad_format")

    # ------------------------------------------------------------------
    # Labels and job counts.
    # ------------------------------------------------------------------
    def testLabels(self):
        """labels lists each distinct job label once."""
        workflow, _ = self._fan_in_workflow()
        self.assertListEqual(["label1", "label2"], workflow.labels)

    def testRegenerateLabels(self):
        """regenerate_labels picks up labels changed after the jobs were
        added.
        """
        workflow, job3 = self._fan_in_workflow()
        relabeling = ((self.job1, "label1b"), (self.job2, "label1b"), (job3, "label2b"))
        for job, new_label in relabeling:
            job.label = new_label
        workflow.regenerate_labels()
        self.assertListEqual(["label1b", "label2b"], workflow.labels)

    def testJobCounts(self):
        """job_counts without a final job."""
        workflow = gtu.make_3_label_workflow("test1", final=False)
        expected = Counter({"pipetaskInit": 1, "label1": 6, "label2": 6, "label3": 6})
        self.assertEqual(workflow.job_counts, expected)

    def testJobCountsFinal(self):
        """job_counts includes the final job when one is present."""
        workflow = gtu.make_3_label_workflow("test1", final=True)
        expected = Counter({"pipetaskInit": 1, "label1": 6, "label2": 6, "label3": 6, "finalJob": 1})
        self.assertEqual(workflow.job_counts, expected)

    def testDelJob(self):
        """Deleting job2 leaves only the job1 -> job3 edge and updates the
        job counts.
        """
        workflow, _ = self._fan_in_workflow()

        workflow.del_job("job2")

        self.assertListEqual([("job1", "job3")], list(workflow.edges()))
        self.assertEqual(Counter({"label1": 1, "label2": 1}), workflow.job_counts)

    def testAddWorkflowSource(self):
        """add_workflow_source places the source workflow ahead of the
        existing jobs.
        """
        workflow, _ = self._fan_in_workflow()

        source = gw.GenericWorkflow("mytest2")
        source_jobs = (
            ("srcjob1", "srclabel1"),
            ("srcjob2", "srclabel1"),
            ("srcjob3", "srclabel2"),
            ("srcjob4", "srclabel2"),
        )
        for job_name, job_label in source_jobs:
            source_job = gw.GenericWorkflowJob(job_name, job_label)
            source_job.executable = self.exec1
            source.add_job(source_job)
        source.add_job_relationships("srcjob1", "srcjob3")
        source.add_job_relationships("srcjob2", "srcjob4")

        workflow.add_workflow_source(source)

        expected_counts = Counter({"srclabel1": 2, "srclabel2": 2, "label1": 1, "label2": 2})
        self.assertEqual(expected_counts, workflow.job_counts)
        self.assertListEqual(["srclabel1", "srclabel2", "label1", "label2"], workflow.labels)
        expected_edges = [
            ("srcjob1", "srcjob3"),
            ("srcjob2", "srcjob4"),
            ("srcjob3", "job1"),
            ("srcjob3", "job2"),
            ("srcjob4", "job1"),
            ("srcjob4", "job2"),
            ("job1", "job3"),
            ("job2", "job3"),
        ]
        self.assertListEqual(sorted(expected_edges), sorted(workflow.edges()))

    def testGetJobsByLabel(self):
        """get_jobs_by_label returns the jobs that carry the label."""
        workflow, job3 = self._fan_in_workflow("label3")

        self.assertListEqual([job3], workflow.get_jobs_by_label("label3"))

    def testAddJobInvalidType(self):
        """add_job rejects an object that does not inherit from the node
        base class.
        """

        @dataclasses.dataclass(slots=True)
        class GenericWorkflowNodeNoInherit:
            name: str
            label: str

            def __hash__(self):
                return hash(self.name)

            @property
            def node_type(self):
                return gw.GenericWorkflowNodeType.NOOP

        impostor = GenericWorkflowNodeNoInherit("myname", "mylabel")
        workflow = self._new_workflow(self.job1, self.job2)
        pattern = "Invalid type for job to be added to GenericWorkflowGraph"
        with self.assertRaisesRegex(RuntimeError, pattern):
            workflow.add_job(impostor)

    # ------------------------------------------------------------------
    # Grouping by dependencies and ordering-config checks.
    # ------------------------------------------------------------------
    def testGroupJobsByDependencies(self):
        """Grouping with the sink method yields the three visit keys."""
        workflow = gtu.make_3_label_workflow("test1", final=True)
        group_config = {"labels": "label2, label3", "dimensions": "visit", "findDependencyMethod": "sink"}
        label_subgraphs = workflow._check_job_ordering_config({"order1": group_config})
        groups = workflow._group_jobs_by_dependencies("order1", group_config, label_subgraphs["order1"])
        self.assertEqual(sorted(groups.keys()), sorted([(10001,), (10002,), (301,)]))

    def testGroupJobsByDependenciesSource(self):
        """Grouping with the source method yields the three visit keys."""
        workflow = gtu.make_3_label_workflow("test1", final=True)
        group_config = {"labels": "label2, label3", "dimensions": "visit", "findDependencyMethod": "source"}
        label_subgraphs = workflow._check_job_ordering_config({"order1": group_config})
        groups = workflow._group_jobs_by_dependencies("order1", group_config, label_subgraphs["order1"])
        self.assertEqual(sorted(groups.keys()), sorted([(10001,), (10002,), (301,)]))

    def testGroupJobsByDependenciesBadMethod(self):
        """An unknown findDependencyMethod raises RuntimeError."""
        workflow = gtu.make_3_label_workflow("test1", final=True)

        group_config = {
            "labels": "label2, label3",
            "dimensions": "visit",
            "findDependencyMethod": "bad_method",
        }
        label_subgraphs = workflow._check_job_ordering_config({"order1": group_config})

        with self.assertRaisesRegex(RuntimeError, r"Invalid findDependencyMethod \(bad_method\)"):
            _ = workflow._group_jobs_by_dependencies("order1", group_config, label_subgraphs["order1"])

    def testCheckJobOrderingConfigBadImplementation(self):
        """An unknown implementation value raises RuntimeError."""
        workflow = gtu.make_3_label_workflow("test1", final=True)
        ordering = {"order1": {"implementation": "bad", "labels": "label2", "dimensions": "visit"}}
        with self.assertRaisesRegex(RuntimeError, "Invalid implementation for"):
            workflow._check_job_ordering_config(ordering)

    def testCheckJobOrderingConfigBadType(self):
        """An unknown ordering_type value raises RuntimeError."""
        workflow = gtu.make_3_label_workflow("test1", final=True)
        ordering = {"order1": {"ordering_type": "badtype", "labels": "label2", "dimensions": "visit"}}
        with self.assertRaisesRegex(RuntimeError, "Invalid ordering_type for"):
            workflow._check_job_ordering_config(ordering)

    def testCheckJobOrderingConfigBadLabel(self):
        """A label listed in two ordering groups raises RuntimeError."""
        workflow = gtu.make_3_label_workflow("test_bad_label", final=True)
        ordering = {
            "order1": {"ordering_type": "sort", "labels": "label2", "dimensions": "visit"},
            "order2": {"ordering_type": "sort", "labels": "label2,label3", "dimensions": "visit"},
        }
        pattern = "Job label label2 appears in more than one job ordering group"
        with self.assertRaisesRegex(RuntimeError, pattern):
            workflow._check_job_ordering_config(ordering)

    def testCheckJobOrderingConfigUnusedLabel(self):
        """Labels that are not in the workflow raise RuntimeError."""
        workflow = gtu.make_3_label_workflow("test_unused_label", final=True)
        ordering = {
            "order1": {
                "ordering_type": "sort",
                "labels": "label2,unused1,label3,unused2",
                "dimensions": "visit",
            },
        }
        pattern = (
            r"Job label\(s\) \(unused1,unused2\) from job ordering group "
            "order1 does not exist in workflow. Aborting."
        )
        with self.assertRaisesRegex(RuntimeError, pattern):
            workflow._check_job_ordering_config(ordering)

    def testCheckJobOrderingConfigMissingDim(self):
        """A group without a dimensions entry raises KeyError."""
        workflow = gtu.make_3_label_workflow("test_missing_dim", final=True)
        ordering = {"order1": {"ordering_type": "sort", "labels": "label2"}}
        with self.assertRaisesRegex(KeyError, "Missing dimensions entry in ordering group order1"):
            workflow._check_job_ordering_config(ordering)

    def testCheckJobOrderingConfigSort(self):
        """A valid sort group returns one label1 -> label2 subgraph."""
        workflow = gtu.make_3_label_workflow("test_bad_dim", final=True)
        ordering = {"order1": {"ordering_type": "sort", "labels": "label1,label2", "dimensions": "visit"}}
        results = workflow._check_job_ordering_config(ordering)
        self.assertEqual(len(results), 1)

        expected_graph = networkx.DiGraph()
        expected_graph.add_nodes_from(["label1", "label2"])
        expected_graph.add_edges_from([("label1", "label2")])
        same_shape = networkx.is_isomorphic(
            results["order1"], expected_graph, node_match=lambda left, right: left == right
        )
        self.assertTrue(same_shape)

    # ------------------------------------------------------------------
    # Special job ordering on the 3-label workflow.
    # ------------------------------------------------------------------
    def testAddSpecialJobOrderingNoopSort(self):
        """Noop sort ordering matches the reference workflow."""
        # Also making sure noop jobs don't alter quanta_counts
        workflow = gtu.make_3_label_workflow("test", final=True)
        baseline_counts = workflow.quanta_counts

        ordering = {
            "order1": {
                "implementation": "noop",
                "ordering_type": "sort",
                "labels": "label1,label2",
                "dimensions": "visit",
            }
        }
        workflow.add_special_job_ordering(ordering)

        self.assertEqual(workflow.quanta_counts, baseline_counts)

        workflow.regenerate_labels()
        self.assertEqual(workflow.quanta_counts, baseline_counts)

        expected = gtu.make_3_label_workflow_noop_sort("truth", final=True)
        self._assert_workflows_match(workflow, expected)

    def testAddSpecialJobOrderingGroupSort(self):
        """Group sort ordering matches the reference workflow."""
        workflow = gtu.make_3_label_workflow("test", final=True)
        ordering = {
            "order1": {
                "implementation": "group",
                "ordering_type": "sort",
                "labels": "label1,label2",
                "dimensions": "visit",
            }
        }
        workflow.add_special_job_ordering(ordering)

        expected = gtu.make_3_label_workflow_groups_sort("truth", final=True)
        self._assert_workflows_match(workflow, expected)

    def testAddSpecialJobOrderingGroupSortSink(self):
        """Group sort ordering with the sink method matches the reference."""
        workflow = gtu.make_3_label_workflow("test_group_sort", final=True)
        ordering = {
            "order1": {
                "implementation": "group",
                "ordering_type": "sort",
                "findDependencyMethod": "sink",
                "labels": "label1,label2",
                "dimensions": "visit",
            }
        }
        workflow.add_special_job_ordering(ordering)

        expected = gtu.make_3_label_workflow_groups_sort("truth", final=True)
        self._assert_workflows_match(workflow, expected)

    # ------------------------------------------------------------------
    # Special job ordering on the 5-label workflow: one middle group.
    # ------------------------------------------------------------------
    def testMiddleGroupValues(self):
        """Middle group with no findDependencyMethod given."""
        workflow = gtu.make_5_label_workflow("mid_group_even_values", True, False, True)
        ordering = {
            "mid": {
                "implementation": "group",
                "ordering_type": "sort",
                "labels": "T2, T2b, T3",
                "dimensions": "visit",
            }
        }
        workflow.add_special_job_ordering(ordering)
        expected = gtu.make_5_label_workflow_middle_groups("truth", True, False, True)
        self._assert_workflows_match(workflow, expected)

    def testMiddleGroupSink(self):
        """Middle group using the sink method."""
        workflow = gtu.make_5_label_workflow("mid_group_even_sink", True, False, True)
        ordering = {
            "mid": {
                "implementation": "group",
                "ordering_type": "sort",
                "findDependencyMethod": "sink",
                "labels": "T2, T2b, T3",
                "dimensions": "visit",
            }
        }
        workflow.add_special_job_ordering(ordering)
        expected = gtu.make_5_label_workflow_middle_groups("truth", True, False, True)
        self._assert_workflows_match(workflow, expected)

    def testMiddleGroupSource(self):
        """Middle group using the source method."""
        workflow = gtu.make_5_label_workflow("mid_group_even_source", True, False, True)
        ordering = {
            "mid": {
                "implementation": "group",
                "ordering_type": "sort",
                "findDependencyMethod": "source",
                "labels": "T2, T2b, T3",
                "dimensions": "visit",
            }
        }
        workflow.add_special_job_ordering(ordering)
        expected = gtu.make_5_label_workflow_middle_groups("truth", True, False, True)
        self._assert_workflows_match(workflow, expected)

    def testMiddleGroupValuesUneven(self):
        """Uneven variant: middle group with no findDependencyMethod."""
        workflow = gtu.make_5_label_workflow("mid_group_uneven_values", True, True, True)
        ordering = {
            "mid": {
                "implementation": "group",
                "ordering_type": "sort",
                "labels": "T2, T2b, T3",
                "dimensions": "visit",
            }
        }
        workflow.add_special_job_ordering(ordering)
        expected = gtu.make_5_label_workflow_middle_groups("truth", True, True, True)
        self._assert_workflows_match(workflow, expected)

    def testMiddleGroupSinkUneven(self):
        """Uneven variant: sink method with blocking set explicitly."""
        workflow = gtu.make_5_label_workflow("mid_group_uneven_sink", True, True, True)

        # Note: also checking changing blocking value from default
        ordering = {
            "mid": {
                "implementation": "group",
                "ordering_type": "sort",
                "findDependencyMethod": "sink",
                "labels": "T2, T2b, T3",
                "dimensions": "visit",
                "blocking": True,
            }
        }
        workflow.add_special_job_ordering(ordering)
        expected = gtu.make_5_label_workflow_middle_groups("truth", True, True, True, True)
        self._assert_workflows_match(workflow, expected)

    def testMiddleGroupSourceUneven(self):
        """Uneven variant: middle group using the source method."""
        workflow = gtu.make_5_label_workflow("mid_group_uneven_source", True, True, True)
        ordering = {
            "mid": {
                "implementation": "group",
                "ordering_type": "sort",
                "findDependencyMethod": "source",
                "labels": "T2, T2b, T3",
                "dimensions": "visit",
            }
        }
        workflow.add_special_job_ordering(ordering)
        expected = gtu.make_5_label_workflow_middle_groups("truth", True, True, True)
        self._assert_workflows_match(workflow, expected)

    # ------------------------------------------------------------------
    # Special job ordering on the 5-label workflow: two groups.
    # ------------------------------------------------------------------
    def test2GroupsEven(self):
        """Two blocking ordering groups on the even 5-label workflow."""
        workflow = gtu.make_5_label_workflow("two_groups_even", True, False, True)
        ordering = {
            "order1": {
                "implementation": "group",
                "ordering_type": "sort",
                "findDependencyMethod": "sink",
                "labels": "T1, T2",
                "dimensions": "visit",
                "equalDimensions": "visit:group",
                "blocking": True,
            },
            "order2": {
                "implementation": "group",
                "ordering_type": "sort",
                "findDependencyMethod": "source",
                "labels": "T3, T4",
                "dimensions": "visit",
                "blocking": True,
            },
        }
        workflow.add_special_job_ordering(ordering)

        expected = gtu.make_5_label_workflow_2_groups("truth", True, False, True, True)
        self._assert_workflows_match(workflow, expected)

    def test2GroupsUneven(self):
        """Two ordering groups on the uneven 5-label workflow."""
        workflow = gtu.make_5_label_workflow("two_groups_uneven", True, True, True)
        ordering = {
            "order1": {
                "implementation": "group",
                "ordering_type": "sort",
                "findDependencyMethod": "sink",
                "labels": "T1, T2",
                "dimensions": "visit",
                "equalDimensions": "visit:group",
            },
            "order2": {
                "implementation": "group",
                "ordering_type": "sort",
                "findDependencyMethod": "source",
                "labels": "T3, T4",
                "dimensions": "visit",
            },
        }
        workflow.add_special_job_ordering(ordering)

        expected = gtu.make_5_label_workflow_2_groups("truth", True, True, True)

        self._assert_workflows_match(workflow, expected)
class TestGenericWorkflowLabels(unittest.TestCase):
    """Tests for GenericWorkflowLabels, followed by special job ordering
    checks made through a full workflow.
    """

    # (job name, job label, parent labels) for a label1 -> label2 -> label3
    # chain in which label2 is carried by two jobs.
    _CHAIN_SPEC = (
        ("job1", "label1", ()),
        ("job2", "label2", ("label1",)),
        ("job3", "label2", ("label1",)),
        ("job4", "label3", ("label2",)),
    )

    @staticmethod
    def _build_labels(spec):
        """Return ``(gwlabels, jobs_by_name)`` filled from ``spec``.

        Each ``spec`` entry is ``(job name, job label, parent labels)``.
        Jobs are added in order, each with a fresh parent-label list and
        an empty child-label list.
        """
        gwlabels = gw.GenericWorkflowLabels()
        jobs_by_name = {}
        for job_name, job_label, parent_labels in spec:
            job = gw.GenericWorkflowJob(job_name, job_label)
            gwlabels.add_job(job, list(parent_labels), [])
            jobs_by_name[job_name] = job
        return gwlabels, jobs_by_name

    def testEmptyLabels(self):
        """A new instance has no labels."""
        empty = gw.GenericWorkflowLabels()
        self.assertFalse(len(empty.labels))

    def testEmptyJobCounts(self):
        """A new instance has no job counts."""
        empty = gw.GenericWorkflowLabels()
        self.assertFalse(len(empty.job_counts))

    def testEmptyGetJobsByLabel(self):
        """A new instance returns no jobs for a label."""
        empty = gw.GenericWorkflowLabels()
        self.assertFalse(len(empty.get_jobs_by_label("label1")))

    def testAddJobFirst(self):
        """The first added job is recorded under its label."""
        gwlabels, jobs = self._build_labels([("job1", "label1", ())])
        self.assertEqual(gwlabels._label_to_jobs["label1"], [jobs["job1"]])
        self.assertIn("label1", gwlabels._label_graph)

    def testAddJobMult(self):
        """Several jobs build the label-to-jobs map and the label graph."""
        gwlabels, jobs = self._build_labels(self._CHAIN_SPEC)

        expected_members = (
            ("label1", ["job1"]),
            ("label2", ["job2", "job3"]),
            ("label3", ["job4"]),
        )
        for label, job_names in expected_members:
            self.assertListEqual(gwlabels._label_to_jobs[label], [jobs[name] for name in job_names])
        for label in ("label1", "label2", "label3"):
            self.assertIn(label, gwlabels._label_graph)
        for parent, child in (("label1", "label2"), ("label2", "label3")):
            self.assertEqual(list(gwlabels._label_graph.successors(parent)), [child])

    def testDelJobRemain(self):
        """Deleting one of two jobs that share a label keeps the label."""
        # Test that can delete one of multiple jobs with same label
        gwlabels, jobs = self._build_labels(self._CHAIN_SPEC)

        gwlabels.del_job(jobs["job2"])
        self.assertListEqual(gwlabels._label_to_jobs["label2"], [jobs["job3"]])
        self.assertIn("label2", gwlabels._label_graph)
        for parent, child in (("label1", "label2"), ("label2", "label3")):
            self.assertEqual(list(gwlabels._label_graph.successors(parent)), [child])

    def testDelJobLast(self):
        """Deleting the only job of a label removes the label and links
        its parents to its children.
        """
        # Test when removing only job with a label
        spec = (
            ("job1", "label1", ()),
            ("job2", "label2", ()),
            ("job3", "label3", ("label1", "label2")),
            ("job4", "label4", ("label3",)),
            ("job5", "label5", ("label3",)),
        )
        gwlabels, jobs = self._build_labels(spec)

        gwlabels.del_job(jobs["job3"])
        self.assertNotIn("label3", gwlabels._label_to_jobs)
        self.assertNotIn("label3", gwlabels._label_graph)
        for parent in ("label1", "label2"):
            self.assertEqual(list(gwlabels._label_graph.successors(parent)), ["label4", "label5"])

    def testAddSpecialJobOrderingBadType(self):
        """An unknown ordering_type raises RuntimeError."""
        workflow = gtu.make_3_label_workflow("test_sort", final=True)
        ordering = {"order1": {"ordering_type": "badtype", "labels": "label2", "dimensions": "visit"}}
        with self.assertRaisesRegex(RuntimeError, "Invalid ordering_type for"):
            workflow.add_special_job_ordering(ordering)

    def testAddSpecialJobOrderingBadLabel(self):
        """A label listed in two ordering groups raises RuntimeError."""
        workflow = gtu.make_3_label_workflow("test_bad_label", final=True)

        ordering = {
            "order1": {"ordering_type": "sort", "labels": "label2", "dimensions": "visit"},
            "order2": {"ordering_type": "sort", "labels": "label2,label3", "dimensions": "visit"},
        }
        pattern = "Job label label2 appears in more than one job ordering group"
        with self.assertRaisesRegex(RuntimeError, pattern):
            workflow.add_special_job_ordering(ordering)

    def testAddSpecialJobOrderingBadDim(self):
        """A dimension that the jobs lack raises KeyError."""
        workflow = gtu.make_3_label_workflow("test_bad_dim", final=True)

        ordering = {"order1": {"ordering_type": "sort", "labels": "label2", "dimensions": "notthere"}}
        pattern = r"Job label2_10001_10 missing dimensions \(notthere\) required for order group order1"
        with self.assertRaisesRegex(KeyError, pattern):
            workflow.add_special_job_ordering(ordering)

    def testAddSpecialJobOrderingSort(self):
        """Noop sort ordering keeps quanta counts and adds the noop edges."""
        workflow = gtu.make_3_label_workflow("test_sort", final=True)
        baseline_counts = workflow.quanta_counts

        ordering = {
            "order1": {
                "implementation": "noop",
                "ordering_type": "sort",
                "labels": "label2",
                "dimensions": "visit",
            }
        }
        workflow.add_special_job_ordering(ordering)

        self.assertEqual(workflow.quanta_counts, baseline_counts)

        workflow.regenerate_labels()
        self.assertEqual(workflow.quanta_counts, baseline_counts)

        expected_edges = [
            ("label2_301_10", "noop_order1_301"),
            ("label2_301_11", "noop_order1_301"),
            ("noop_order1_301", "label2_10001_10"),
            ("noop_order1_301", "label2_10001_11"),
            ("label2_10001_10", "noop_order1_10001"),
            ("label2_10001_11", "noop_order1_10001"),
            ("noop_order1_10001", "label2_10002_10"),
            ("noop_order1_10001", "label2_10002_11"),
        ]
        actual_edges = workflow.edges
        for edge in expected_edges:
            self.assertIn(edge, actual_edges, f"Missing edge from {edge[0]} to {edge[1]}")
# Run every test case in this module when the file is executed as a script.
if __name__ == "__main__":
    unittest.main()