Coverage for tests / test_pipelineIR.py: 14%
213 statements
« prev ^ index » next coverage.py v7.13.5, created at 2026-04-22 08:57 +0000
« prev ^ index » next coverage.py v7.13.5, created at 2026-04-22 08:57 +0000
1# This file is part of pipe_base.
2#
3# Developed for the LSST Data Management System.
4# This product includes software developed by the LSST Project
5# (http://www.lsst.org).
6# See the COPYRIGHT file at the top-level directory of this distribution
7# for details of code ownership.
8#
9# This software is dual licensed under the GNU General Public License and also
10# under a 3-clause BSD license. Recipients may choose which of these licenses
11# to use; please see the files gpl-3.0.txt and/or bsd_license.txt,
12# respectively. If you choose the GPL option then the following text applies
13# (but note that there is still no warranty even if you opt for BSD instead):
14#
15# This program is free software: you can redistribute it and/or modify
16# it under the terms of the GNU General Public License as published by
17# the Free Software Foundation, either version 3 of the License, or
18# (at your option) any later version.
19#
20# This program is distributed in the hope that it will be useful,
21# but WITHOUT ANY WARRANTY; without even the implied warranty of
22# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
23# GNU General Public License for more details.
24#
25# You should have received a copy of the GNU General Public License
26# along with this program. If not, see <http://www.gnu.org/licenses/>.
28import os
29import tempfile
30import textwrap
31import unittest
33import lsst.utils.tests
34from lsst.pipe.base.pipelineIR import ConfigIR, PipelineIR, PipelineSubsetCtrl
36# Find where the test pipelines exist and store it in an environment variable.
37os.environ["TESTDIR"] = os.path.dirname(__file__)
40class ConfigIRTestCase(unittest.TestCase):
41 """A test case for ConfigIR Objects.
43 ConfigIR contains a method that is not exercised by the PipelineIR task,
44 so it should be tested here.
45 """
47 def testMergeConfig(self):
48 # Create some configs to merge
49 config1 = ConfigIR(
50 python="config.foo=6", dataId={"visit": 7}, file=["test1.py"], rest={"a": 1, "b": 2}
51 )
52 config2 = ConfigIR(python=None, dataId=None, file=["test2.py"], rest={"c": 1, "d": 2})
53 config3 = ConfigIR(python="config.bar=7", dataId=None, file=["test3.py"], rest={"c": 1, "d": 2})
54 config4 = ConfigIR(python=None, dataId=None, file=["test4.py"], rest={"c": 3, "e": 4})
55 config5 = ConfigIR(rest={"f": 5, "g": 6})
56 config6 = ConfigIR(rest={"h": 7, "i": 8})
57 config7 = ConfigIR(rest={"h": 9})
59 # Merge configs with different dataIds, this should yield two elements
60 self.assertEqual(list(config1.maybe_merge(config2)), [config1, config2])
62 # Merge configs with python blocks defined, this should yield two
63 # elements
64 self.assertEqual(list(config1.maybe_merge(config3)), [config1, config3])
66 # Merge configs with file defined, this should yield two elements
67 self.assertEqual(list(config2.maybe_merge(config4)), [config2, config4])
69 # merge config2 into config1
70 merge_result = list(config5.maybe_merge(config6))
71 self.assertEqual(len(merge_result), 1)
72 self.assertEqual(config5.rest, {"f": 5, "g": 6, "h": 7, "i": 8})
74 # Cant merge configs with shared keys
75 self.assertEqual(list(config6.maybe_merge(config7)), [config6, config7])
78class PipelineIRTestCase(unittest.TestCase):
79 """A test case for PipelineIR objects."""
81 def testPipelineIRInitChecks(self):
82 # Missing description
83 pipeline_str = """
84 tasks:
85 a: module.A
86 """
87 with self.assertRaises(ValueError):
88 PipelineIR.from_string(pipeline_str)
90 # Missing tasks
91 pipeline_str = """
92 description: Test Pipeline
93 """
94 with self.assertRaises(ValueError):
95 PipelineIR.from_string(pipeline_str)
97 # This should raise a FileNotFoundError, as there are imported defined
98 # so the __init__ method should pass but the imported file does not
99 # exist
100 pipeline_str = textwrap.dedent(
101 """
102 description: Test Pipeline
103 imports: /dummy_pipeline.yaml
104 """
105 )
107 with self.assertRaises(FileNotFoundError):
108 PipelineIR.from_string(pipeline_str)
110 def testTaskParsing(self):
111 # Should be able to parse a task defined both ways
112 pipeline_str = textwrap.dedent(
113 """
114 description: Test Pipeline
115 tasks:
116 modA: test.modA
117 modB:
118 class: test.modB
119 """
120 )
122 pipeline = PipelineIR.from_string(pipeline_str)
123 self.assertEqual(list(pipeline.tasks.keys()), ["modA", "modB"])
124 self.assertEqual([t.klass for t in pipeline.tasks.values()], ["test.modA", "test.modB"])
126 def testImportParsing(self):
127 # This should raise, as the two pipelines, both define the same label
128 pipeline_str = textwrap.dedent(
129 """
130 description: Test Pipeline
131 imports:
132 - $TESTDIR/pipelines/testPipeline1.yaml
133 - $TESTDIR/pipelines/testPipeline2.yaml
134 """
135 )
136 # "modA" is the duplicated label, and it should appear in the error.
137 with self.assertRaisesRegex(ValueError, "modA"):
138 PipelineIR.from_string(pipeline_str)
140 # This should pass, as the conflicting task is excluded
141 pipeline_str = textwrap.dedent(
142 """
143 description: Test Pipeline
144 imports:
145 - location: $TESTDIR/pipelines/testPipeline1.yaml
146 exclude: modA
147 - $TESTDIR/pipelines/testPipeline2.yaml
148 """
149 )
150 pipeline = PipelineIR.from_string(pipeline_str)
151 self.assertEqual(set(pipeline.tasks.keys()), {"modA", "modB"})
153 # This should pass, as the conflicting task is no in includes
154 pipeline_str = textwrap.dedent(
155 """
156 description: Test Pipeline
157 imports:
158 - location: $TESTDIR/pipelines/testPipeline1.yaml
159 include: modB
160 labeledSubsetModifyMode: DROP
161 - $TESTDIR/pipelines/testPipeline2.yaml
162 """
163 )
165 pipeline = PipelineIR.from_string(pipeline_str)
166 self.assertEqual(set(pipeline.tasks.keys()), {"modA", "modB"})
168 # Test that you can't include and exclude a task
169 pipeline_str = textwrap.dedent(
170 """
171 description: Test Pipeline
172 imports:
173 - location: $TESTDIR/pipelines/testPipeline1.yaml
174 exclude: modA
175 include: modB
176 labeledSubsetModifyMode: EDIT
177 - $TESTDIR/pipelines/testPipeline2.yaml
178 """
179 )
181 with self.assertRaises(ValueError):
182 PipelineIR.from_string(pipeline_str)
184 # Test that you can rename a task
185 pipeline_str = textwrap.dedent(
186 """
187 description: Test Pipeline
188 imports:
189 - location: $TESTDIR/pipelines/testPipeline1.yaml
190 rename:
191 modB: modZ
192 labeledSubsetModifyMode: EDIT
193 """
194 )
196 pipeline = PipelineIR.from_string(pipeline_str)
197 self.assertEqual(set(pipeline.tasks.keys()), {"modA", "modZ"})
199 # Test that renaming and including work together
200 pipeline_str = textwrap.dedent(
201 """
202 description: Test Pipeline
203 imports:
204 - location: $TESTDIR/pipelines/testPipeline1.yaml
205 include: modB
206 rename:
207 modB: modZ
208 labeledSubsetModifyMode: EDIT
209 """
210 )
212 pipeline = PipelineIR.from_string(pipeline_str)
213 self.assertEqual(
214 set(pipeline.tasks.keys()),
215 {
216 "modZ",
217 },
218 )
220 # Test that renaming an excluded label works, although perhaps it
221 # should emit a warning
222 pipeline_str = textwrap.dedent(
223 """
224 description: Test Pipeline
225 imports:
226 - location: $TESTDIR/pipelines/testPipeline1.yaml
227 exclude: modB
228 rename:
229 modB: modZ
230 labeledSubsetModifyMode: EDIT
231 """
232 )
234 pipeline = PipelineIR.from_string(pipeline_str)
235 self.assertEqual(
236 set(pipeline.tasks.keys()),
237 {
238 "modA",
239 },
240 )
242 # Test that you can't rename a task to an existing task
243 pipeline_str = textwrap.dedent(
244 """
245 description: Test Pipeline
246 imports:
247 - location: $TESTDIR/pipelines/testPipeline1.yaml
248 rename:
249 modB: modA
250 labeledSubsetModifyMode: EDIT
251 """
252 )
254 with self.assertRaises(ValueError):
255 PipelineIR.from_string(pipeline_str)
257 # Test that you can't rename two tasks to the same new name
258 pipeline_str = textwrap.dedent(
259 """
260 description: Test Pipeline
261 imports:
262 - location: $TESTDIR/pipelines/testPipeline1.yaml
263 rename:
264 modA: modC
265 modB: modC
266 labeledSubsetModifyMode: EDIT
267 """
268 )
270 with self.assertRaises(ValueError):
271 PipelineIR.from_string(pipeline_str)
273 # Test that you can't rename a task back to its old name
274 # This could work but is confusing and pointless
275 pipeline_str = textwrap.dedent(
276 """
277 description: Test Pipeline
278 imports:
279 - location: $TESTDIR/pipelines/testPipeline1.yaml
280 rename:
281 modB: modC
282 modC: modB
283 labeledSubsetModifyMode: EDIT
284 """
285 )
287 with self.assertRaises(ValueError):
288 PipelineIR.from_string(pipeline_str)
290 # Test unknown labeledSubsetModifyModes raise
291 pipeline_str = textwrap.dedent(
292 """
293 description: Test Pipeline
294 imports:
295 - location: $TESTDIR/pipelines/testPipeline1.yaml
296 exclude: modA
297 include: modB
298 labeledSubsetModifyMode: WRONG
299 - $TESTDIR/pipelines/testPipeline2.yaml
300 """
301 )
302 with self.assertRaises(ValueError):
303 PipelineIR.from_string(pipeline_str)
305 # Test that contracts are imported
306 pipeline_str = textwrap.dedent(
307 """
308 description: Test Pipeline
309 imports:
310 - $TESTDIR/pipelines/testPipeline1.yaml
311 """
312 )
314 pipeline = PipelineIR.from_string(pipeline_str)
315 self.assertEqual(pipeline.contracts[0].contract, "modA.b == modA.c")
317 # Test that contracts are not imported
318 pipeline_str = textwrap.dedent(
319 """
320 description: Test Pipeline
321 imports:
322 - location: $TESTDIR/pipelines/testPipeline1.yaml
323 importContracts: False
324 """
325 )
327 pipeline = PipelineIR.from_string(pipeline_str)
328 self.assertEqual(pipeline.contracts, [])
330 # Test that configs are imported when defining the same task again
331 # with the same label
332 pipeline_str = textwrap.dedent(
333 """
334 description: Test Pipeline
335 imports:
336 - $TESTDIR/pipelines/testPipeline2.yaml
337 tasks:
338 modA:
339 class: "test.moduleA"
340 config:
341 value2: 2
342 """
343 )
344 pipeline = PipelineIR.from_string(pipeline_str)
345 self.assertEqual(pipeline.tasks["modA"].config[0].rest, {"value1": 1, "value2": 2})
347 # Test that configs are not imported when redefining the task
348 # associated with a label
349 pipeline_str = textwrap.dedent(
350 """
351 description: Test Pipeline
352 imports:
353 - $TESTDIR/pipelines/testPipeline2.yaml
354 tasks:
355 modA:
356 class: "test.moduleAReplace"
357 config:
358 value2: 2
359 """
360 )
361 pipeline = PipelineIR.from_string(pipeline_str)
362 self.assertEqual(pipeline.tasks["modA"].config[0].rest, {"value2": 2})
364 # Test that named subsets are imported
365 pipeline_str = textwrap.dedent(
366 """
367 description: Test Pipeline
368 imports:
369 - $TESTDIR/pipelines/testPipeline2.yaml
370 """
371 )
372 pipeline = PipelineIR.from_string(pipeline_str)
373 self.assertEqual(pipeline.labeled_subsets.keys(), {"modSubset"})
374 self.assertEqual(pipeline.labeled_subsets["modSubset"].subset, {"modA"})
376 # Test that imported and redeclaring a named subset works
377 pipeline_str = textwrap.dedent(
378 """
379 description: Test Pipeline
380 imports:
381 - $TESTDIR/pipelines/testPipeline2.yaml
382 tasks:
383 modE: "test.moduleE"
384 subsets:
385 modSubset:
386 - modE
387 """
388 )
389 pipeline = PipelineIR.from_string(pipeline_str)
390 self.assertEqual(pipeline.labeled_subsets.keys(), {"modSubset"})
391 self.assertEqual(pipeline.labeled_subsets["modSubset"].subset, {"modE"})
393 # Test that imported from two pipelines that both declare a named
394 # subset with the same name fails
395 pipeline_str = textwrap.dedent(
396 """
397 description: Test Pipeline
398 imports:
399 - $TESTDIR/pipelines/testPipeline2.yaml
400 - $TESTDIR/pipelines/testPipeline3.yaml
401 """
402 )
403 with self.assertRaises(ValueError):
404 PipelineIR.from_string(pipeline_str)
406 # Test that imported a named subset that duplicates a label declared
407 # in this pipeline fails
408 pipeline_str = textwrap.dedent(
409 """
410 description: Test Pipeline
411 imports:
412 - $TESTDIR/pipelines/testPipeline2.yaml
413 tasks:
414 modSubset: "test.moduleE"
415 """
416 )
417 with self.assertRaises(ValueError):
418 PipelineIR.from_string(pipeline_str)
420 # Test that imported fails if a named subset and task label conflict
421 pipeline_str = textwrap.dedent(
422 """
423 description: Test Pipeline
424 imports:
425 - $TESTDIR/pipelines/testPipeline2.yaml
426 - $TESTDIR/pipelines/testPipeline4.yaml
427 """
428 )
429 with self.assertRaises(ValueError):
430 PipelineIR.from_string(pipeline_str)
432 # Test that importing Pipelines with different step definitions fails
433 pipeline_str = textwrap.dedent(
434 """
435 description: Test Pipeline
436 imports:
437 - $TESTDIR/pipelines/testPipeline5.yaml
438 steps:
439 - label: sub1
440 dimensions: ['a', 'e']
441 """
442 )
443 with self.assertRaises(ValueError):
444 PipelineIR.from_string(pipeline_str)
446 # Test that it does not fail if steps are excluded
447 pipeline_str = textwrap.dedent(
448 """
449 description: Test Pipeline
450 imports:
451 - location: $TESTDIR/pipelines/testPipeline5.yaml
452 importSteps: false
453 steps:
454 - label: sub1
455 dimensions: ['a', 'e']
456 """
457 )
458 PipelineIR.from_string(pipeline_str)
460 # Test that importing does work
461 pipeline_str = textwrap.dedent(
462 """
463 description: Test Pipeline
464 imports:
465 - location: $TESTDIR/pipelines/testPipeline5.yaml
466 """
467 )
468 pipeline = PipelineIR.from_string(pipeline_str)
469 self.assertEqual(set(step.label for step in pipeline.steps), {"sub1", "sub2"})
471 def testSteps(self):
472 # Test that steps definitions are created
473 pipeline_str = textwrap.dedent(
474 """
475 description: Test Pipeline
476 tasks:
477 modA: "test.moduleA"
478 modB: "test.moduleB"
479 subsets:
480 sub1:
481 subset:
482 - modA
483 - modB
484 sub2:
485 subset:
486 - modA
487 steps:
488 - label: sub1
489 dimensions: ['a', 'b']
490 - label: sub2
491 dimensions: ['a', 'b']
492 """
493 )
494 pipeline = PipelineIR.from_string(pipeline_str)
495 self.assertEqual(set(step.label for step in pipeline.steps), {"sub1", "sub2"})
497 # Test that steps definitions must be unique
498 pipeline_str = textwrap.dedent(
499 """
500 description: Test Pipeline
501 tasks:
502 modA: "test.moduleA"
503 modB: "test.moduleB"
504 subsets:
505 sub1:
506 subset:
507 - modA
508 - modB
509 sub2:
510 subset:
511 - modA
512 steps:
513 - label: sub1
514 dimensions: ['a', 'b']
515 - label: sub1
516 dimensions: ['a', 'b']
517 """
518 )
519 with self.assertRaises(ValueError):
520 pipeline = PipelineIR.from_string(pipeline_str)
522 def testReadParameters(self):
523 # verify that parameters section are read in from a pipeline
524 pipeline_str = textwrap.dedent(
525 """
526 description: Test Pipeline
527 parameters:
528 value1: A
529 value2: B
530 tasks:
531 modA: ModuleA
532 """
533 )
534 pipeline = PipelineIR.from_string(pipeline_str)
535 self.assertEqual(pipeline.parameters.mapping, {"value1": "A", "value2": "B"})
537 def testTaskParameterLabel(self):
538 # verify that "parameters" cannot be used as a task label
539 pipeline_str = textwrap.dedent(
540 """
541 description: Test Pipeline
542 tasks:
543 parameters: modA
544 """
545 )
546 with self.assertRaises(ValueError):
547 PipelineIR.from_string(pipeline_str)
549 def testParameterImporting(self):
550 # verify that importing parameters happens correctly
551 pipeline_str = textwrap.dedent(
552 """
553 description: Test Pipeline
554 imports:
555 - $TESTDIR/pipelines/testPipeline1.yaml
556 - location: $TESTDIR/pipelines/testPipeline2.yaml
557 exclude:
558 - modA
560 parameters:
561 value4: valued
562 """
563 )
564 pipeline = PipelineIR.from_string(pipeline_str)
565 self.assertEqual(
566 pipeline.parameters.mapping,
567 {"value4": "valued", "value1": "valueNew", "value2": "valueB", "value3": "valueC"},
568 )
570 def testImportingInstrument(self):
571 # verify an instrument is imported, or ignored, (Or otherwise modified
572 # for potential future use)
573 pipeline_str = textwrap.dedent(
574 """
575 description: Test Pipeline
576 imports:
577 - $TESTDIR/pipelines/testPipeline1.yaml
578 """
579 )
580 pipeline = PipelineIR.from_string(pipeline_str)
581 self.assertEqual(pipeline.instrument, "test.instrument")
583 # verify that an imported pipeline can have its instrument set to None
584 pipeline_str = textwrap.dedent(
585 """
586 description: Test Pipeline
587 imports:
588 - location: $TESTDIR/pipelines/testPipeline1.yaml
589 instrument: None
590 """
591 )
592 pipeline = PipelineIR.from_string(pipeline_str)
593 self.assertEqual(pipeline.instrument, None)
595 # verify that an imported pipeline can have its instrument modified
596 pipeline_str = textwrap.dedent(
597 """
598 description: Test Pipeline
599 imports:
600 - location: $TESTDIR/pipelines/testPipeline1.yaml
601 instrument: new.instrument
602 """
603 )
604 pipeline = PipelineIR.from_string(pipeline_str)
605 self.assertEqual(pipeline.instrument, "new.instrument")
607 # Test that multiple instruments can't be defined,
608 # and that the error message tells you what instruments were found.
609 pipeline_str = textwrap.dedent(
610 """
611 description: Test Pipeline
612 instrument: new.instrument
613 imports:
614 - location: $TESTDIR/pipelines/testPipeline1.yaml
615 """
616 )
617 with self.assertRaisesRegex(ValueError, "new.instrument .* test.instrument."):
618 PipelineIR.from_string(pipeline_str)
620 def testParameterConfigFormatting(self):
621 # verify that a config properly is formatted with parameters
622 pipeline_str = textwrap.dedent(
623 """
624 description: Test Pipeline
625 parameters:
626 value1: A
627 tasks:
628 modA:
629 class: ModuleA
630 config:
631 testKey: parameters.value1
632 """
633 )
634 pipeline = PipelineIR.from_string(pipeline_str)
635 newConfig = pipeline.tasks["modA"].config[0].formatted(pipeline.parameters)
636 self.assertEqual(newConfig.rest["testKey"], "A")
638 def testReadContracts(self):
639 # Verify that contracts are read in from a pipeline
640 location = "$TESTDIR/pipelines/testPipeline1.yaml"
641 pipeline = PipelineIR.from_uri(location)
642 self.assertEqual(pipeline.contracts[0].contract, "modA.b == modA.c")
644 # Verify that a contract message is loaded
645 pipeline_str = textwrap.dedent(
646 """
647 description: Test Pipeline
648 tasks:
649 modA: test.modA
650 modB:
651 class: test.modB
652 contracts:
653 - contract: modA.foo == modB.Bar
654 msg: "Test message"
655 """
656 )
658 pipeline = PipelineIR.from_string(pipeline_str)
659 self.assertEqual(pipeline.contracts[0].msg, "Test message")
661 def testReadNamedSubsets(self):
662 pipeline_str = textwrap.dedent(
663 """
664 description: Test Pipeline
665 tasks:
666 modA: test.modA
667 modB:
668 class: test.modB
669 modC: test.modC
670 modD: test.modD
671 subsets:
672 subset1:
673 - modA
674 - modB
675 subset2:
676 subset:
677 - modC
678 - modD
679 description: "A test named subset"
680 """
681 )
682 pipeline = PipelineIR.from_string(pipeline_str)
683 self.assertEqual(pipeline.labeled_subsets.keys(), {"subset1", "subset2"})
685 self.assertEqual(pipeline.labeled_subsets["subset1"].subset, {"modA", "modB"})
686 self.assertEqual(pipeline.labeled_subsets["subset1"].description, None)
688 self.assertEqual(pipeline.labeled_subsets["subset2"].subset, {"modC", "modD"})
689 self.assertEqual(pipeline.labeled_subsets["subset2"].description, "A test named subset")
691 # verify that forgetting a subset key is an error
692 pipeline_str = textwrap.dedent(
693 """
694 description: Test Pipeline
695 tasks:
696 modA: test.modA
697 modB:
698 class: test.modB
699 modC: test.modC
700 modD: test.modD
701 subsets:
702 subset2:
703 sub:
704 - modC
705 - modD
706 description: "A test named subset"
707 """
708 )
709 with self.assertRaises(ValueError):
710 PipelineIR.from_string(pipeline_str)
712 # verify putting a label in a named subset that is not in the task is
713 # an error
714 pipeline_str = textwrap.dedent(
715 """
716 description: Test Pipeline
717 tasks:
718 modA: test.modA
719 modB:
720 class: test.modB
721 modC: test.modC
722 modD: test.modD
723 subsets:
724 subset2:
725 - modC
726 - modD
727 - modE
728 """
729 )
730 with self.assertRaises(ValueError):
731 PipelineIR.from_string(pipeline_str)
733 def testSubsettingPipeline(self):
734 pipeline_str = textwrap.dedent(
735 """
736 description: Test Pipeline
737 tasks:
738 modA: test.modA
739 modB:
740 class: test.modB
741 modC: test.modC
742 modD: test.modD
743 subsets:
744 subset1:
745 - modA
746 - modB
747 subset2:
748 subset:
749 - modC
750 - modD
751 description: "A test named subset"
752 """
753 )
754 pipeline = PipelineIR.from_string(pipeline_str)
755 # verify that creating a pipeline subset with the default drop behavior
756 # removes any labeled subset that contains a label not in the set of
757 # all task labels.
758 pipelineSubset1 = pipeline.subset_from_labels({"modA", "modB", "modC"})
759 self.assertEqual(pipelineSubset1.labeled_subsets.keys(), {"subset1"})
760 # verify that creating a pipeline subset with the edit behavior
761 # edits any labeled subset that contains a label not in the set of
762 # all task labels.
763 pipelineSubset2 = pipeline.subset_from_labels({"modA", "modB", "modC"}, PipelineSubsetCtrl.EDIT)
764 self.assertEqual(pipelineSubset2.labeled_subsets.keys(), {"subset1", "subset2"})
765 self.assertEqual(pipelineSubset2.labeled_subsets["subset2"].subset, {"modC"})
767 def testInstrument(self):
768 # Verify that if instrument is defined it is parsed out
769 pipeline_str = textwrap.dedent(
770 """
771 description: Test Pipeline
772 instrument: dummyCam
773 tasks:
774 modA: test.moduleA
775 """
776 )
778 pipeline = PipelineIR.from_string(pipeline_str)
779 self.assertEqual(pipeline.instrument, "dummyCam")
781 def testReadTaskConfig(self):
782 # Verify that a task with a config is read in correctly
783 pipeline_str = textwrap.dedent(
784 """
785 description: Test Pipeline
786 tasks:
787 modA:
788 class: test.moduleA
789 config:
790 propertyA: 6
791 propertyB: 7
792 file: testfile.py
793 python: "config.testDict['a'] = 9"
794 """
795 )
797 pipeline = PipelineIR.from_string(pipeline_str)
798 self.assertEqual(pipeline.tasks["modA"].config[0].file, ["testfile.py"])
799 self.assertEqual(pipeline.tasks["modA"].config[0].python, "config.testDict['a'] = 9")
800 self.assertEqual(pipeline.tasks["modA"].config[0].rest, {"propertyA": 6, "propertyB": 7})
802 # Verify that multiple files are read fine
803 pipeline_str = textwrap.dedent(
804 """
805 description: Test Pipeline
806 tasks:
807 modA:
808 class: test.moduleA
809 config:
810 file:
811 - testfile.py
812 - otherFile.py
813 """
814 )
816 pipeline = PipelineIR.from_string(pipeline_str)
817 self.assertEqual(pipeline.tasks["modA"].config[0].file, ["testfile.py", "otherFile.py"])
819 # Test reading multiple Config entries
820 pipeline_str = textwrap.dedent(
821 """
822 description: Test Pipeline
823 tasks:
824 modA:
825 class: test.moduleA
826 config:
827 - propertyA: 6
828 propertyB: 7
829 dataId: {"visit": 6}
830 - propertyA: 8
831 propertyB: 9
832 """
833 )
835 pipeline = PipelineIR.from_string(pipeline_str)
836 self.assertEqual(pipeline.tasks["modA"].config[0].rest, {"propertyA": 6, "propertyB": 7})
837 self.assertEqual(pipeline.tasks["modA"].config[0].dataId, {"visit": 6})
838 self.assertEqual(pipeline.tasks["modA"].config[1].rest, {"propertyA": 8, "propertyB": 9})
839 self.assertEqual(pipeline.tasks["modA"].config[1].dataId, None)
841 def testSerialization(self):
842 # Test creating a pipeline, writing it to a file, reading the file
843 pipeline_str = textwrap.dedent(
844 """
845 description: Test Pipeline
846 instrument: dummyCam
847 imports:
848 - location: $TESTDIR/pipelines/testPipeline1.yaml
849 instrument: None
850 tasks:
851 modC:
852 class: test.moduleC
853 config:
854 - propertyA: 6
855 propertyB: 7
856 dataId: {"visit": 6}
857 - propertyA: 8
858 propertyB: 9
859 modD: test.moduleD
860 contracts:
861 - modA.foo == modB.bar
862 subsets:
863 subA:
864 - modA
865 - modC
866 """
867 )
869 pipeline = PipelineIR.from_string(pipeline_str)
871 # Create the temp file, write and read
872 with tempfile.NamedTemporaryFile() as tf:
873 pipeline.write_to_uri(tf.name)
874 loaded_pipeline = PipelineIR.from_uri(tf.name)
875 self.assertEqual(pipeline, loaded_pipeline)
877 def testPipelineYamlLoader(self):
878 # Tests that an exception is thrown in the case a key is used multiple
879 # times in a given scope within a pipeline file
880 pipeline_str = textwrap.dedent(
881 """
882 description: Test Pipeline
883 tasks:
884 modA: test1
885 modB: test2
886 modA: test3
887 """
888 )
889 self.assertRaises(KeyError, PipelineIR.from_string, pipeline_str)
891 def testMultiLineStrings(self):
892 """Test that multi-line strings in pipelines are written with
893 '|' continuation-syntax instead of explicit newlines.
894 """
895 pipeline_ir = PipelineIR({"description": "Line 1\nLine2\n", "tasks": {"modA": "task1"}})
896 string = str(pipeline_ir)
897 self.assertIn("|", string)
898 self.assertNotIn(r"\n", string)
901class MyMemoryTestCase(lsst.utils.tests.MemoryTestCase):
902 """Run file leak tests."""
905def setup_module(module):
906 """Configure pytest."""
907 lsst.utils.tests.init()
910if __name__ == "__main__":
911 lsst.utils.tests.init()
912 unittest.main()