Coverage for tests / test_generic_workflow.py: 16%
484 statements
« prev ^ index » next coverage.py v7.13.5, created at 2026-04-30 08:53 +0000
« prev ^ index » next coverage.py v7.13.5, created at 2026-04-30 08:53 +0000
# This file is part of ctrl_bps.
#
# Developed for the LSST Data Management System.
# This product includes software developed by the LSST Project
# (https://www.lsst.org).
# See the COPYRIGHT file at the top-level directory of this distribution
# for details of code ownership.
#
# This software is dual licensed under the GNU General Public License and also
# under a 3-clause BSD license. Recipients may choose which of these licenses
# to use; please see the files gpl-3.0.txt and/or bsd_license.txt,
# respectively. If you choose the GPL option then the following text applies
# (but note that there is still no warranty even if you opt for BSD instead):
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <https://www.gnu.org/licenses/>.
27import dataclasses
28import io
29import logging
30import unittest
31from collections import Counter
33import networkx
35import lsst.ctrl.bps.generic_workflow as gw
36import lsst.ctrl.bps.tests.gw_test_utils as gtu
class TestGenericWorkflowNode(unittest.TestCase):
    """Exercise the GenericWorkflowNode base class."""

    def testNoNodeType(self):
        """Reading node_type on a subclass that never defines it raises."""

        @dataclasses.dataclass(slots=True)
        class GenericWorkflowNoNodeType(gw.GenericWorkflowNode):
            dummy_val: int

        untyped_node = GenericWorkflowNoNodeType("myname", "mylabel", 3)
        with self.assertRaises(NotImplementedError):
            _ = untyped_node.node_type

    def testHash(self):
        """Nodes with different names hash differently; label does not matter."""
        reference = gw.GenericWorkflowNode("myname", "mylabel")
        renamed = gw.GenericWorkflowNode("myname2", "mylabel")
        relabeled = gw.GenericWorkflowNode("myname", "mylabel2")
        self.assertNotEqual(hash(reference), hash(renamed))
        self.assertEqual(hash(reference), hash(relabeled))
class TestGenericWorkflowJob(unittest.TestCase):
    """Exercise GenericWorkflowJob."""

    def testEquality(self):
        """Jobs constructed from identical arguments compare equal."""
        left = gw.GenericWorkflowJob("job1", "label1")
        right = gw.GenericWorkflowJob("job1", "label1")
        self.assertEqual(left, right)
class TestGenericWorkflow(unittest.TestCase):
    """Test generic workflow."""

    def setUp(self):
        # unittest calls setUp before every test, so tests that mutate these
        # jobs (e.g., replacing the executable or label) cannot affect others.
        self.exec1 = gw.GenericWorkflowExec(
            name="test1.py", src_uri="${CTRL_BPS_DIR}/bin/test1.py", transfer_executable=False
        )
        self.job1 = gw.GenericWorkflowJob("job1", "label1")
        self.job1.quanta_counts = Counter({"pt1": 1, "pt2": 2})
        self.job1.executable = self.exec1

        self.job2 = gw.GenericWorkflowJob("job2", "label2")
        self.job2.quanta_counts = Counter({"pt1": 1, "pt2": 2})
        self.job2.executable = self.exec1

    def testAddJobDuplicate(self):
        """Adding the same job twice raises RuntimeError."""
        gwf = gw.GenericWorkflow("mytest")
        gwf.add_job(self.job1)
        with self.assertRaises(RuntimeError):
            gwf.add_job(self.job1)

    def testAddJobValid(self):
        """A single added job is stored and retrievable by name."""
        gwf = gw.GenericWorkflow("mytest")
        gwf.add_job(self.job1)
        self.assertEqual(1, gwf.number_of_nodes())
        self.assertListEqual(["job1"], list(gwf))
        getjob = gwf.get_job("job1")
        self.assertEqual(self.job1, getjob)

    def testAddNode(self):
        """add_node stores a job so that get_job can retrieve it."""
        gwf = gw.GenericWorkflow("mytest")
        gwf.add_node(self.job1)
        self.assertEqual(1, gwf.number_of_nodes())
        self.assertListEqual(["job1"], list(gwf))
        self.assertEqual(self.job1, gwf.get_job("job1"))

    def testAddJobRelationshipsSingle(self):
        """A single parent/child pair produces a single edge."""
        gwf = gw.GenericWorkflow("mytest")
        gwf.add_job(self.job1)
        gwf.add_job(self.job2)
        gwf.add_job_relationships("job1", "job2")
        self.assertListEqual([("job1", "job2")], list(gwf.edges()))

    def testAddJobRelationshipsMultiChild(self):
        """One parent with a list of children produces one edge per child."""
        job3 = gw.GenericWorkflowJob("job3", "label2")
        job3.quanta_counts = Counter({"pt1": 1, "pt2": 2})
        job3.executable = self.exec1

        gwf = gw.GenericWorkflow("mytest")
        gwf.add_job(self.job1)
        gwf.add_job(self.job2)
        gwf.add_job(job3)
        gwf.add_job_relationships("job1", ["job2", "job3"])
        self.assertListEqual([("job1", "job2"), ("job1", "job3")], list(gwf.edges()))

    def testAddJobRelationshipsMultiParents(self):
        """A list of parents with one child produces one edge per parent."""
        job3 = gw.GenericWorkflowJob("job3", "label2")
        job3.quanta_counts = Counter({"pt1": 1, "pt2": 2})
        job3.executable = self.exec1
        gwf = gw.GenericWorkflow("mytest")
        gwf.add_job(self.job1)
        gwf.add_job(self.job2)
        gwf.add_job(job3)
        gwf.add_job_relationships(["job1", "job2"], "job3")
        self.assertListEqual([("job1", "job3"), ("job2", "job3")], list(gwf.edges()))

    def testAddJobRelationshipsNone(self):
        """None on either side of a relationship adds no edges."""
        gwf = gw.GenericWorkflow("mytest")
        gwf.add_job(self.job1)
        gwf.add_job_relationships(None, "job1")
        self.assertListEqual([], list(gwf.edges()))
        gwf.add_job_relationships("job1", None)
        self.assertListEqual([], list(gwf.edges()))

    def testGetJobExists(self):
        """get_job returns the very object that was added."""
        gwf = gw.GenericWorkflow("mytest")
        gwf.add_job(self.job1)
        get_job = gwf.get_job("job1")
        self.assertIs(self.job1, get_job)

    def testGetJobError(self):
        """get_job raises KeyError for an unknown job name."""
        gwf = gw.GenericWorkflow("mytest")
        gwf.add_job(self.job1)
        with self.assertRaises(KeyError):
            _ = gwf.get_job("job_not_there")

    def testAddEdgeBadParent(self):
        """add_edge raises RuntimeError when the parent is not in the workflow."""
        gwf = gw.GenericWorkflow("mytest")
        gwf.add_job(self.job1)
        gwf.add_job(self.job2)
        with self.assertRaisesRegex(RuntimeError, "notthere not in GenericWorkflow"):
            gwf.add_edge("notthere", "job2")

    def testAddEdgeBadChild(self):
        """add_edge raises RuntimeError when the child is not in the workflow."""
        gwf = gw.GenericWorkflow("mytest")
        gwf.add_job(self.job1)
        gwf.add_job(self.job2)
        with self.assertRaisesRegex(RuntimeError, "notthere2 not in GenericWorkflow"):
            gwf.add_edge("job1", "notthere2")

    def testQuantaCounts(self):
        """quanta_counts of the 3-label test workflow matches expected totals."""
        gwf = gtu.make_3_label_workflow("test1", final=True)
        truth = Counter({"label1": 6, "label2": 6, "label3": 6})
        self.assertEqual(gwf.quanta_counts, truth)

    def testGetExecutablesNames(self):
        """get_executables(data=False) returns executable names."""
        gwf = gw.GenericWorkflow("mytest")
        self.job1.executable = gw.GenericWorkflowExec("exec1")
        gwf.add_job(self.job1)
        self.assertEqual(gwf._executables["exec1"], self.job1.executable)
        results = gwf.get_executables(data=False, transfer_only=False)
        self.assertEqual(results, ["exec1"])

    def testGetExecutablesData(self):
        """get_executables(data=True) returns executable objects."""
        gwf = gw.GenericWorkflow("mytest")
        self.job1.executable = gw.GenericWorkflowExec("exec1")
        gwf.add_job(self.job1)
        results = gwf.get_executables(data=True, transfer_only=False)
        self.assertEqual(results, [self.job1.executable])
        self.assertEqual(hash(self.job1.executable), hash(results[0]))

    def testAddFileTwice(self):
        """Re-adding an existing file is skipped and logged at DEBUG level."""
        gwf = gw.GenericWorkflow("mytest")
        gwfile = gw.GenericWorkflowFile("file1")
        gwf.add_file(gwfile)
        with self.assertLogs("lsst.ctrl.bps.generic_workflow", level=logging.DEBUG) as cm:
            gwf.add_file(gwfile)
        self.assertRegex(cm.records[-1].getMessage(), "Skipped add_file for existing file file1")

    def testGetFilesNames(self):
        """get_files without the transfer filter returns every file name."""
        gwf = gw.GenericWorkflow("mytest")
        gwfile1 = gw.GenericWorkflowFile("file1", wms_transfer=True)
        gwf.add_file(gwfile1)
        gwfile2 = gw.GenericWorkflowFile("file2", wms_transfer=False)
        gwf.add_file(gwfile2)
        results = gwf.get_files(data=False, transfer_only=False)
        self.assertEqual(results, ["file1", "file2"])

    def testGetFilesData(self):
        """get_files with transfer_only=True returns only WMS-transferred files."""
        gwf = gw.GenericWorkflow("mytest")
        gwfile1 = gw.GenericWorkflowFile("file1", wms_transfer=True)
        gwf.add_file(gwfile1)
        gwfile2 = gw.GenericWorkflowFile("file2", wms_transfer=False)
        gwf.add_file(gwfile2)
        results = gwf.get_files(data=True, transfer_only=True)
        self.assertEqual(results, [gwfile1])
        self.assertEqual(hash(gwfile1), hash(results[0]))

    def testJobInputs(self):
        """Job inputs can be added and retrieved, honoring transfer_only."""
        gwf = gw.GenericWorkflow("mytest")
        gwf.add_job(self.job1)
        # job2 must actually be in the workflow for its inputs to be meaningful.
        gwf.add_job(self.job2)
        gwfile1 = gw.GenericWorkflowFile("file1", wms_transfer=True)
        gwf.add_job_inputs("job1", gwfile1)
        gwfile2 = gw.GenericWorkflowFile("file2", wms_transfer=False)
        gwf.add_job_inputs("job2", gwfile2)
        self.assertIn("file1", gwf.get_files())
        self.assertIn("file2", gwf.get_files(data=False, transfer_only=False))
        self.assertEqual(["file1"], gwf.get_job_inputs("job1", data=False))
        self.assertEqual([gwfile1], gwf.get_job_inputs("job1"))
        # file2 has wms_transfer=False: it is visible without the transfer
        # filter and hidden with it.
        self.assertEqual(["file2"], gwf.get_job_inputs("job2", data=False, transfer_only=False))
        self.assertEqual([], gwf.get_job_inputs("job2", data=False, transfer_only=True))

    def testSaveInvalidFormat(self):
        """save raises RuntimeError for an unknown format."""
        gwf = gw.GenericWorkflow("mytest")
        stream = io.BytesIO()
        with self.assertRaisesRegex(RuntimeError, r"Unknown format \(bad_format\)"):
            gwf.save(stream, "bad_format")

    def testLoadInvalidFormat(self):
        """load raises RuntimeError for an unknown format."""
        stream = io.BytesIO()
        with self.assertRaisesRegex(RuntimeError, r"Unknown format \(bad_format\)"):
            _ = gw.GenericWorkflow.load(stream, "bad_format")

    def testValidate(self):
        """validate accepts a small, well-formed workflow."""
        gwf = gw.GenericWorkflow("mytest")
        gwf.add_job(self.job1)
        gwf.add_job(self.job2)
        job3 = gw.GenericWorkflowJob("job3", "label2")
        gwf.add_job(job3)
        gwf.add_job_relationships(["job1", "job2"], "job3")
        # No exception should be raised
        gwf.validate()

    def testValidateGroups(self):
        """validate accepts the grouped-sort test workflow."""
        gwf = gtu.make_3_label_workflow_groups_sort("test_validate", final=True)
        gwf.validate()

    def testSavePickle(self):
        """A workflow round-trips through pickle save/load."""
        gwf = gw.GenericWorkflow("mytest")
        gwf.add_job(self.job1)
        gwf.add_job(self.job2)
        gwf.add_job_relationships("job1", "job2")
        stream = io.BytesIO()
        gwf.save(stream, "pickle")
        stream.seek(0)
        gwf2 = gw.GenericWorkflow.load(stream, "pickle")
        self.assertTrue(
            gtu.compare_generic_workflows(gwf, gwf2),
            "Results do not match expected GenericWorkflow. See debug messages.",
        )

    def testDrawBadFormat(self):
        """draw raises RuntimeError for an unknown draw format."""
        gwf = gw.GenericWorkflow("mytest")
        gwf.add_job(self.job1)
        stream = io.BytesIO()
        with self.assertRaisesRegex(RuntimeError, r"Unknown draw format \(bad_format\)"):
            gwf.draw(stream, "bad_format")

    def testLabels(self):
        """labels lists each job label once, even when shared by jobs."""
        job3 = gw.GenericWorkflowJob("job3", "label2")
        gwf = gw.GenericWorkflow("mytest")
        gwf.add_job(self.job1)
        gwf.add_job(self.job2)
        gwf.add_job(job3)
        gwf.add_job_relationships(["job1", "job2"], "job3")
        self.assertListEqual(["label1", "label2"], gwf.labels)

    def testRegenerateLabels(self):
        """regenerate_labels picks up labels changed after jobs were added."""
        job3 = gw.GenericWorkflowJob("job3", "label2")
        gwf = gw.GenericWorkflow("mytest")
        gwf.add_job(self.job1)
        gwf.add_job(self.job2)
        gwf.add_job(job3)
        gwf.add_job_relationships(["job1", "job2"], "job3")
        self.job1.label = "label1b"
        self.job2.label = "label1b"
        job3.label = "label2b"
        gwf.regenerate_labels()
        self.assertListEqual(["label1b", "label2b"], gwf.labels)

    def testJobCounts(self):
        """job_counts of the 3-label workflow without a final job."""
        gwf = gtu.make_3_label_workflow("test1", final=False)
        truth = Counter({"pipetaskInit": 1, "label1": 6, "label2": 6, "label3": 6})
        self.assertEqual(gwf.job_counts, truth)

    def testJobCountsFinal(self):
        """job_counts of the 3-label workflow including the final job."""
        gwf = gtu.make_3_label_workflow("test1", final=True)
        truth = Counter({"pipetaskInit": 1, "label1": 6, "label2": 6, "label3": 6, "finalJob": 1})
        self.assertEqual(gwf.job_counts, truth)

    def testDelJob(self):
        """del_job removes the job, its edges, and its contribution to counts."""
        job3 = gw.GenericWorkflowJob("job3", "label2")
        gwf = gw.GenericWorkflow("mytest")
        gwf.add_job(self.job1)
        gwf.add_job(self.job2)
        gwf.add_job(job3)
        gwf.add_job_relationships(["job1", "job2"], "job3")

        gwf.del_job("job2")

        self.assertListEqual([("job1", "job3")], list(gwf.edges()))
        self.assertEqual(Counter({"label1": 1, "label2": 1}), gwf.job_counts)

    def testAddWorkflowSource(self):
        """add_workflow_source links the source's sinks to this workflow's sources."""
        job3 = gw.GenericWorkflowJob("job3", "label2")
        gwf = gw.GenericWorkflow("mytest")
        gwf.add_job(self.job1)
        gwf.add_job(self.job2)
        gwf.add_job(job3)
        gwf.add_job_relationships(["job1", "job2"], "job3")

        srcjob1 = gw.GenericWorkflowJob("srcjob1", "srclabel1")
        srcjob1.executable = self.exec1
        srcjob2 = gw.GenericWorkflowJob("srcjob2", "srclabel1")
        srcjob2.executable = self.exec1
        srcjob3 = gw.GenericWorkflowJob("srcjob3", "srclabel2")
        srcjob3.executable = self.exec1
        srcjob4 = gw.GenericWorkflowJob("srcjob4", "srclabel2")
        srcjob4.executable = self.exec1
        gwf2 = gw.GenericWorkflow("mytest2")
        gwf2.add_job(srcjob1)
        gwf2.add_job(srcjob2)
        gwf2.add_job(srcjob3)
        gwf2.add_job(srcjob4)
        gwf2.add_job_relationships("srcjob1", "srcjob3")
        gwf2.add_job_relationships("srcjob2", "srcjob4")

        gwf.add_workflow_source(gwf2)

        self.assertEqual(Counter({"srclabel1": 2, "srclabel2": 2, "label1": 1, "label2": 2}), gwf.job_counts)
        self.assertListEqual(["srclabel1", "srclabel2", "label1", "label2"], gwf.labels)
        self.assertListEqual(
            sorted(
                [
                    ("srcjob1", "srcjob3"),
                    ("srcjob2", "srcjob4"),
                    ("srcjob3", "job1"),
                    ("srcjob3", "job2"),
                    ("srcjob4", "job1"),
                    ("srcjob4", "job2"),
                    ("job1", "job3"),
                    ("job2", "job3"),
                ]
            ),
            sorted(gwf.edges()),
        )

    def testGetJobsByLabel(self):
        """get_jobs_by_label returns only the jobs carrying that label."""
        job3 = gw.GenericWorkflowJob("job3", "label3")
        gwf = gw.GenericWorkflow("mytest")
        gwf.add_job(self.job1)
        gwf.add_job(self.job2)
        gwf.add_job(job3)
        gwf.add_job_relationships(["job1", "job2"], "job3")

        self.assertListEqual([job3], gwf.get_jobs_by_label("label3"))

    def testAddJobInvalidType(self):
        """add_job rejects objects that do not inherit from GenericWorkflowNode."""

        # Duck-types a node (name, label, hash, node_type) without inheriting.
        @dataclasses.dataclass(slots=True)
        class GenericWorkflowNodeNoInherit:
            name: str
            label: str

            def __hash__(self):
                return hash(self.name)

            @property
            def node_type(self):
                return gw.GenericWorkflowNodeType.NOOP

        job3 = GenericWorkflowNodeNoInherit("myname", "mylabel")
        gwf = gw.GenericWorkflow("mytest")
        gwf.add_job(self.job1)
        gwf.add_job(self.job2)
        with self.assertRaisesRegex(RuntimeError, "Invalid type for job to be added to GenericWorkflowGraph"):
            gwf.add_job(job3)

    def testGroupJobsByDependencies(self):
        """Grouping by visit with the sink method yields one group per visit."""
        gwf = gtu.make_3_label_workflow("test1", final=True)
        group_config = {"labels": "label2, label3", "dimensions": "visit", "findDependencyMethod": "sink"}
        group_to_label_subgraph = gwf._check_job_ordering_config({"order1": group_config})
        job_groups = gwf._group_jobs_by_dependencies(
            "order1", group_config, group_to_label_subgraph["order1"]
        )
        self.assertEqual(sorted(job_groups.keys()), sorted([(10001,), (10002,), (301,)]))

    def testGroupJobsByDependenciesSource(self):
        """Grouping by visit with the source method yields one group per visit."""
        gwf = gtu.make_3_label_workflow("test1", final=True)
        group_config = {"labels": "label2, label3", "dimensions": "visit", "findDependencyMethod": "source"}
        group_to_label_subgraph = gwf._check_job_ordering_config({"order1": group_config})
        job_groups = gwf._group_jobs_by_dependencies(
            "order1", group_config, group_to_label_subgraph["order1"]
        )
        self.assertEqual(sorted(job_groups.keys()), sorted([(10001,), (10002,), (301,)]))

    def testGroupJobsByDependenciesBadMethod(self):
        """An unknown findDependencyMethod raises RuntimeError."""
        gwf = gtu.make_3_label_workflow("test1", final=True)

        group_config = {
            "labels": "label2, label3",
            "dimensions": "visit",
            "findDependencyMethod": "bad_method",
        }
        group_to_label_subgraph = gwf._check_job_ordering_config({"order1": group_config})

        with self.assertRaisesRegex(RuntimeError, r"Invalid findDependencyMethod \(bad_method\)"):
            _ = gwf._group_jobs_by_dependencies("order1", group_config, group_to_label_subgraph["order1"])

    def testCheckJobOrderingConfigBadImplementation(self):
        """An unknown implementation value is rejected."""
        gwf = gtu.make_3_label_workflow("test1", final=True)
        with self.assertRaisesRegex(RuntimeError, "Invalid implementation for"):
            gwf._check_job_ordering_config(
                {"order1": {"implementation": "bad", "labels": "label2", "dimensions": "visit"}}
            )

    def testCheckJobOrderingConfigBadType(self):
        """An unknown ordering_type value is rejected."""
        gwf = gtu.make_3_label_workflow("test1", final=True)
        with self.assertRaisesRegex(RuntimeError, "Invalid ordering_type for"):
            gwf._check_job_ordering_config(
                {"order1": {"ordering_type": "badtype", "labels": "label2", "dimensions": "visit"}}
            )

    def testCheckJobOrderingConfigBadLabel(self):
        """A label listed in two ordering groups is rejected."""
        gwf = gtu.make_3_label_workflow("test_bad_label", final=True)
        with self.assertRaisesRegex(
            RuntimeError, "Job label label2 appears in more than one job ordering group"
        ):
            gwf._check_job_ordering_config(
                {
                    "order1": {"ordering_type": "sort", "labels": "label2", "dimensions": "visit"},
                    "order2": {"ordering_type": "sort", "labels": "label2,label3", "dimensions": "visit"},
                }
            )

    def testCheckJobOrderingConfigUnusedLabel(self):
        """Labels absent from the workflow produce a warning, not an error."""
        gwf = gtu.make_3_label_workflow("test_unused_label", final=True)
        with self.assertLogs("lsst.ctrl.bps.generic_workflow", level=logging.WARNING) as cm:
            gwf._check_job_ordering_config(
                {
                    "order1": {
                        "ordering_type": "sort",
                        "labels": "label2,unused1,label3,unused2",
                        "dimensions": "visit",
                    },
                }
            )
        self.assertTrue(
            any(
                "Job label(s) (unused1,unused2) from job ordering group order1 does not exist in workflow."
                in record.getMessage()
                for record in cm.records
            ),
            "Expected warning about unused labels",
        )

    def testCheckJobOrderingConfigMissingDim(self):
        """An ordering group without a dimensions entry raises KeyError."""
        gwf = gtu.make_3_label_workflow("test_missing_dim", final=True)
        with self.assertRaisesRegex(KeyError, "Missing dimensions entry in ordering group order1"):
            gwf._check_job_ordering_config({"order1": {"ordering_type": "sort", "labels": "label2"}})

    def testCheckJobOrderingConfigSort(self):
        """A valid sort group maps to the label subgraph label1 -> label2."""
        gwf = gtu.make_3_label_workflow("test_bad_dim", final=True)
        results = gwf._check_job_ordering_config(
            {"order1": {"ordering_type": "sort", "labels": "label1,label2", "dimensions": "visit"}}
        )
        self.assertEqual(len(results), 1)

        graph = networkx.DiGraph()
        graph.add_nodes_from(["label1", "label2"])
        graph.add_edges_from([("label1", "label2")])
        # is_isomorphic only checks structure: its node_match callback is
        # given node *attribute dicts*, not node names, so it cannot confirm
        # which labels are present. Check the exact nodes and edges as well.
        self.assertTrue(networkx.is_isomorphic(results["order1"], graph))
        self.assertEqual(set(results["order1"].nodes), {"label1", "label2"})
        self.assertEqual(set(results["order1"].edges), {("label1", "label2")})

    def testAddSpecialJobOrderingNoopSort(self):
        """Noop sort ordering matches truth and leaves quanta_counts unchanged."""
        gwf = gtu.make_3_label_workflow("test", final=True)
        quanta_counts_before = gwf.quanta_counts

        gwf.add_special_job_ordering(
            {
                "order1": {
                    "implementation": "noop",
                    "ordering_type": "sort",
                    "labels": "label1,label2",
                    "dimensions": "visit",
                }
            }
        )

        quanta_counts_after = gwf.quanta_counts
        self.assertEqual(quanta_counts_after, quanta_counts_before)

        gwf.regenerate_labels()
        quanta_counts_after = gwf.quanta_counts
        self.assertEqual(quanta_counts_after, quanta_counts_before)

        gwf_noop = gtu.make_3_label_workflow_noop_sort("truth", final=True)
        self.assertTrue(
            gtu.compare_generic_workflows(gwf, gwf_noop),
            "Results do not match expected GenericWorkflow. See debug messages.",
        )

    def testAddSpecialJobOrderingGroupSort(self):
        """Group sort ordering with default dependency method matches truth."""
        gwf = gtu.make_3_label_workflow("test", final=True)
        gwf.add_special_job_ordering(
            {
                "order1": {
                    "implementation": "group",
                    "ordering_type": "sort",
                    "labels": "label1,label2",
                    "dimensions": "visit",
                }
            }
        )

        truth = gtu.make_3_label_workflow_groups_sort("truth", final=True)
        self.assertTrue(
            gtu.compare_generic_workflows(gwf, truth),
            "Results do not match expected GenericWorkflow. See debug messages.",
        )

    def testAddSpecialJobOrderingGroupSortSink(self):
        """Group sort ordering with the sink method matches the same truth."""
        gwf = gtu.make_3_label_workflow("test_group_sort", final=True)
        gwf.add_special_job_ordering(
            {
                "order1": {
                    "implementation": "group",
                    "ordering_type": "sort",
                    "findDependencyMethod": "sink",
                    "labels": "label1,label2",
                    "dimensions": "visit",
                }
            }
        )

        truth = gtu.make_3_label_workflow_groups_sort("truth", final=True)
        self.assertTrue(
            gtu.compare_generic_workflows(gwf, truth),
            "Results do not match expected GenericWorkflow. See debug messages.",
        )

    def testMiddleGroupValues(self):
        """Group ordering of middle labels (default method) matches truth."""
        gwf = gtu.make_5_label_workflow("mid_group_even_values", True, False, True)
        gwf.add_special_job_ordering(
            {
                "mid": {
                    "implementation": "group",
                    "ordering_type": "sort",
                    "labels": "T2, T2b, T3",
                    "dimensions": "visit",
                }
            }
        )
        truth = gtu.make_5_label_workflow_middle_groups("truth", True, False, True)
        self.assertTrue(
            gtu.compare_generic_workflows(gwf, truth),
            "Results do not match expected GenericWorkflow. See debug messages.",
        )

    def testMiddleGroupSink(self):
        """Group ordering of middle labels (sink method) matches truth."""
        gwf = gtu.make_5_label_workflow("mid_group_even_sink", True, False, True)
        gwf.add_special_job_ordering(
            {
                "mid": {
                    "implementation": "group",
                    "ordering_type": "sort",
                    "findDependencyMethod": "sink",
                    "labels": "T2, T2b, T3",
                    "dimensions": "visit",
                }
            }
        )
        truth = gtu.make_5_label_workflow_middle_groups("truth", True, False, True)
        self.assertTrue(
            gtu.compare_generic_workflows(gwf, truth),
            "Results do not match expected GenericWorkflow. See debug messages.",
        )

    def testMiddleGroupSource(self):
        """Group ordering of middle labels (source method) matches truth."""
        gwf = gtu.make_5_label_workflow("mid_group_even_source", True, False, True)
        gwf.add_special_job_ordering(
            {
                "mid": {
                    "implementation": "group",
                    "ordering_type": "sort",
                    "findDependencyMethod": "source",
                    "labels": "T2, T2b, T3",
                    "dimensions": "visit",
                }
            }
        )
        truth = gtu.make_5_label_workflow_middle_groups("truth", True, False, True)
        self.assertTrue(
            gtu.compare_generic_workflows(gwf, truth),
            "Results do not match expected GenericWorkflow. See debug messages.",
        )

    def testMiddleGroupValuesUneven(self):
        """Middle-label grouping (default method) on the uneven workflow."""
        gwf = gtu.make_5_label_workflow("mid_group_uneven_values", True, True, True)
        gwf.add_special_job_ordering(
            {
                "mid": {
                    "implementation": "group",
                    "ordering_type": "sort",
                    "labels": "T2, T2b, T3",
                    "dimensions": "visit",
                }
            }
        )
        truth = gtu.make_5_label_workflow_middle_groups("truth", True, True, True)
        self.assertTrue(
            gtu.compare_generic_workflows(gwf, truth),
            "Results do not match expected GenericWorkflow. See debug messages.",
        )

    def testMiddleGroupSinkUneven(self):
        """Middle-label grouping (sink method, blocking) on the uneven workflow."""
        gwf = gtu.make_5_label_workflow("mid_group_uneven_sink", True, True, True)

        # Note: also checking changing blocking value from default
        gwf.add_special_job_ordering(
            {
                "mid": {
                    "implementation": "group",
                    "ordering_type": "sort",
                    "findDependencyMethod": "sink",
                    "labels": "T2, T2b, T3",
                    "dimensions": "visit",
                    "blocking": True,
                }
            }
        )
        truth = gtu.make_5_label_workflow_middle_groups("truth", True, True, True, True)
        self.assertTrue(
            gtu.compare_generic_workflows(gwf, truth),
            "Results do not match expected GenericWorkflow. See debug messages.",
        )

    def testMiddleGroupSourceUneven(self):
        """Middle-label grouping (source method) on the uneven workflow."""
        gwf = gtu.make_5_label_workflow("mid_group_uneven_source", True, True, True)
        gwf.add_special_job_ordering(
            {
                "mid": {
                    "implementation": "group",
                    "ordering_type": "sort",
                    "findDependencyMethod": "source",
                    "labels": "T2, T2b, T3",
                    "dimensions": "visit",
                }
            }
        )
        truth = gtu.make_5_label_workflow_middle_groups("truth", True, True, True)
        self.assertTrue(
            gtu.compare_generic_workflows(gwf, truth),
            "Results do not match expected GenericWorkflow. See debug messages.",
        )

    def test2GroupsEven(self):
        """Two blocking ordering groups on the even 5-label workflow."""
        gwf = gtu.make_5_label_workflow("two_groups_even", True, False, True)
        gwf.add_special_job_ordering(
            {
                "order1": {
                    "implementation": "group",
                    "ordering_type": "sort",
                    "findDependencyMethod": "sink",
                    "labels": "T1, T2",
                    "dimensions": "visit",
                    "equalDimensions": "visit:group",
                    "blocking": True,
                },
                "order2": {
                    "implementation": "group",
                    "ordering_type": "sort",
                    "findDependencyMethod": "source",
                    "labels": "T3, T4",
                    "dimensions": "visit",
                    "blocking": True,
                },
            }
        )

        truth = gtu.make_5_label_workflow_2_groups("truth", True, False, True, True)
        self.assertTrue(
            gtu.compare_generic_workflows(gwf, truth),
            "Results do not match expected GenericWorkflow. See debug messages.",
        )

    def test2GroupsUneven(self):
        """Two ordering groups (default blocking) on the uneven 5-label workflow."""
        gwf = gtu.make_5_label_workflow("two_groups_uneven", True, True, True)
        gwf.add_special_job_ordering(
            {
                "order1": {
                    "implementation": "group",
                    "ordering_type": "sort",
                    "findDependencyMethod": "sink",
                    "labels": "T1, T2",
                    "dimensions": "visit",
                    "equalDimensions": "visit:group",
                },
                "order2": {
                    "implementation": "group",
                    "ordering_type": "sort",
                    "findDependencyMethod": "source",
                    "labels": "T3, T4",
                    "dimensions": "visit",
                },
            }
        )

        truth = gtu.make_5_label_workflow_2_groups("truth", True, True, True)

        self.assertTrue(
            gtu.compare_generic_workflows(gwf, truth),
            "Results do not match expected GenericWorkflow. See debug messages.",
        )
class TestGenericWorkflowLabels(unittest.TestCase):
    """Tests for GenericWorkflowLabels"""

    def testEmptyLabels(self):
        """A fresh GenericWorkflowLabels has no labels."""
        gwlabels = gw.GenericWorkflowLabels()
        self.assertFalse(len(gwlabels.labels))

    def testEmptyJobCounts(self):
        """A fresh GenericWorkflowLabels has no job counts."""
        gwlabels = gw.GenericWorkflowLabels()
        self.assertFalse(len(gwlabels.job_counts))

    def testEmptyGetJobsByLabel(self):
        """Looking up jobs for an unknown label returns an empty collection."""
        gwlabels = gw.GenericWorkflowLabels()
        self.assertFalse(len(gwlabels.get_jobs_by_label("label1")))

    def testAddJobFirst(self):
        """The first job registers its label in both mapping and label graph."""
        gwlabels = gw.GenericWorkflowLabels()
        job = gw.GenericWorkflowJob("job1", "label1")
        gwlabels.add_job(job, [], [])
        self.assertEqual(gwlabels._label_to_jobs["label1"], [job])
        self.assertIn("label1", gwlabels._label_graph)

    def testAddJobMult(self):
        """Several jobs build the label mapping and parent->child label edges."""
        gwlabels = gw.GenericWorkflowLabels()
        job1 = gw.GenericWorkflowJob("job1", "label1")
        job2 = gw.GenericWorkflowJob("job2", "label2")
        job3 = gw.GenericWorkflowJob("job3", "label2")
        job4 = gw.GenericWorkflowJob("job4", "label3")
        gwlabels.add_job(job1, [], [])
        gwlabels.add_job(job2, ["label1"], [])
        gwlabels.add_job(job3, ["label1"], [])
        gwlabels.add_job(job4, ["label2"], [])
        self.assertListEqual(gwlabels._label_to_jobs["label1"], [job1])
        self.assertListEqual(gwlabels._label_to_jobs["label2"], [job2, job3])
        self.assertListEqual(gwlabels._label_to_jobs["label3"], [job4])
        self.assertIn("label1", gwlabels._label_graph)
        self.assertIn("label2", gwlabels._label_graph)
        self.assertIn("label3", gwlabels._label_graph)
        self.assertEqual(list(gwlabels._label_graph.successors("label1")), ["label2"])
        self.assertEqual(list(gwlabels._label_graph.successors("label2")), ["label3"])

    def testDelJobRemain(self):
        """Deleting one of several jobs with a label keeps the label."""
        gwlabels = gw.GenericWorkflowLabels()
        job1 = gw.GenericWorkflowJob("job1", "label1")
        gwlabels.add_job(job1, [], [])
        job2 = gw.GenericWorkflowJob("job2", "label2")
        gwlabels.add_job(job2, ["label1"], [])
        job3 = gw.GenericWorkflowJob("job3", "label2")
        gwlabels.add_job(job3, ["label1"], [])
        job4 = gw.GenericWorkflowJob("job4", "label3")
        gwlabels.add_job(job4, ["label2"], [])

        gwlabels.del_job(job2)
        self.assertListEqual(gwlabels._label_to_jobs["label2"], [job3])
        self.assertIn("label2", gwlabels._label_graph)
        self.assertEqual(list(gwlabels._label_graph.successors("label1")), ["label2"])
        self.assertEqual(list(gwlabels._label_graph.successors("label2")), ["label3"])

    def testDelJobLast(self):
        """Deleting the only job with a label removes the label and rewires edges."""
        gwlabels = gw.GenericWorkflowLabels()
        job1 = gw.GenericWorkflowJob("job1", "label1")
        gwlabels.add_job(job1, [], [])
        job2 = gw.GenericWorkflowJob("job2", "label2")
        gwlabels.add_job(job2, [], [])
        job3 = gw.GenericWorkflowJob("job3", "label3")
        gwlabels.add_job(job3, ["label1", "label2"], [])
        job4 = gw.GenericWorkflowJob("job4", "label4")
        gwlabels.add_job(job4, ["label3"], [])
        job5 = gw.GenericWorkflowJob("job5", "label5")
        gwlabels.add_job(job5, ["label3"], [])

        gwlabels.del_job(job3)
        self.assertNotIn("label3", gwlabels._label_to_jobs)
        self.assertNotIn("label3", gwlabels._label_graph)
        # label3's parents now point directly at label3's former children.
        self.assertEqual(list(gwlabels._label_graph.successors("label1")), ["label4", "label5"])
        self.assertEqual(list(gwlabels._label_graph.successors("label2")), ["label4", "label5"])


class TestGenericWorkflowSpecialJobOrdering(unittest.TestCase):
    """Tests for GenericWorkflow.add_special_job_ordering.

    These tests call GenericWorkflow methods, not GenericWorkflowLabels ones,
    so they are kept out of TestGenericWorkflowLabels.
    """

    def testAddSpecialJobOrderingBadType(self):
        """An unknown ordering_type is rejected."""
        gwf = gtu.make_3_label_workflow("test_sort", final=True)
        with self.assertRaisesRegex(RuntimeError, "Invalid ordering_type for"):
            gwf.add_special_job_ordering(
                {"order1": {"ordering_type": "badtype", "labels": "label2", "dimensions": "visit"}}
            )

    def testAddSpecialJobOrderingBadLabel(self):
        """A label listed in two ordering groups is rejected."""
        gwf = gtu.make_3_label_workflow("test_bad_label", final=True)

        with self.assertRaisesRegex(
            RuntimeError, "Job label label2 appears in more than one job ordering group"
        ):
            gwf.add_special_job_ordering(
                {
                    "order1": {"ordering_type": "sort", "labels": "label2", "dimensions": "visit"},
                    "order2": {"ordering_type": "sort", "labels": "label2,label3", "dimensions": "visit"},
                }
            )

    def testAddSpecialJobOrderingBadDim(self):
        """A dimension missing from the jobs raises KeyError naming the job."""
        gwf = gtu.make_3_label_workflow("test_bad_dim", final=True)

        with self.assertRaisesRegex(
            KeyError, r"Job label2_10001_10 missing dimensions \(notthere\) required for order group order1"
        ):
            gwf.add_special_job_ordering(
                {"order1": {"ordering_type": "sort", "labels": "label2", "dimensions": "notthere"}}
            )

    def testAddSpecialJobOrderingSort(self):
        """Noop sort on one label chains visits via noop jobs, keeping quanta."""
        gwf = gtu.make_3_label_workflow("test_sort", final=True)
        quanta_counts_before = gwf.quanta_counts

        gwf.add_special_job_ordering(
            {
                "order1": {
                    "implementation": "noop",
                    "ordering_type": "sort",
                    "labels": "label2",
                    "dimensions": "visit",
                }
            }
        )

        quanta_counts_after = gwf.quanta_counts
        self.assertEqual(quanta_counts_after, quanta_counts_before)

        gwf.regenerate_labels()
        quanta_counts_after = gwf.quanta_counts
        self.assertEqual(quanta_counts_after, quanta_counts_before)

        # Materialize the edges once so each membership check is O(1).
        gwf_edges = set(gwf.edges())
        for edge in [
            ("label2_301_10", "noop_order1_301"),
            ("label2_301_11", "noop_order1_301"),
            ("noop_order1_301", "label2_10001_10"),
            ("noop_order1_301", "label2_10001_11"),
            ("label2_10001_10", "noop_order1_10001"),
            ("label2_10001_11", "noop_order1_10001"),
            ("noop_order1_10001", "label2_10002_10"),
            ("noop_order1_10001", "label2_10002_11"),
        ]:
            self.assertIn(edge, gwf_edges, f"Missing edge from {edge[0]} to {edge[1]}")
if __name__ == "__main__":
    # Running this file directly hands control to the unittest runner.
    unittest.main()