Coverage for tests / test_pipelineIR.py: 14%

213 statements  

« prev     ^ index     » next       coverage.py v7.13.5, created at 2026-04-17 08:59 +0000

1# This file is part of pipe_base. 

2# 

3# Developed for the LSST Data Management System. 

4# This product includes software developed by the LSST Project 

5# (http://www.lsst.org). 

6# See the COPYRIGHT file at the top-level directory of this distribution 

7# for details of code ownership. 

8# 

9# This software is dual licensed under the GNU General Public License and also 

10# under a 3-clause BSD license. Recipients may choose which of these licenses 

11# to use; please see the files gpl-3.0.txt and/or bsd_license.txt, 

12# respectively. If you choose the GPL option then the following text applies 

13# (but note that there is still no warranty even if you opt for BSD instead): 

14# 

15# This program is free software: you can redistribute it and/or modify 

16# it under the terms of the GNU General Public License as published by 

17# the Free Software Foundation, either version 3 of the License, or 

18# (at your option) any later version. 

19# 

20# This program is distributed in the hope that it will be useful, 

21# but WITHOUT ANY WARRANTY; without even the implied warranty of 

22# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the 

23# GNU General Public License for more details. 

24# 

25# You should have received a copy of the GNU General Public License 

26# along with this program. If not, see <http://www.gnu.org/licenses/>. 

27 

28import os 

29import tempfile 

30import textwrap 

31import unittest 

32 

33import lsst.utils.tests 

34from lsst.pipe.base.pipelineIR import ConfigIR, PipelineIR, PipelineSubsetCtrl 

35 

36# Find where the test pipelines exist and store it in an environment variable. 

37os.environ["TESTDIR"] = os.path.dirname(__file__) 

38 

39 

40class ConfigIRTestCase(unittest.TestCase): 

41 """A test case for ConfigIR Objects. 

42 

43 ConfigIR contains a method that is not exercised by the PipelineIR task, 

44 so it should be tested here. 

45 """ 

46 

47 def testMergeConfig(self): 

48 # Create some configs to merge 

49 config1 = ConfigIR( 

50 python="config.foo=6", dataId={"visit": 7}, file=["test1.py"], rest={"a": 1, "b": 2} 

51 ) 

52 config2 = ConfigIR(python=None, dataId=None, file=["test2.py"], rest={"c": 1, "d": 2}) 

53 config3 = ConfigIR(python="config.bar=7", dataId=None, file=["test3.py"], rest={"c": 1, "d": 2}) 

54 config4 = ConfigIR(python=None, dataId=None, file=["test4.py"], rest={"c": 3, "e": 4}) 

55 config5 = ConfigIR(rest={"f": 5, "g": 6}) 

56 config6 = ConfigIR(rest={"h": 7, "i": 8}) 

57 config7 = ConfigIR(rest={"h": 9}) 

58 

59 # Merge configs with different dataIds, this should yield two elements 

60 self.assertEqual(list(config1.maybe_merge(config2)), [config1, config2]) 

61 

62 # Merge configs with python blocks defined, this should yield two 

63 # elements 

64 self.assertEqual(list(config1.maybe_merge(config3)), [config1, config3]) 

65 

66 # Merge configs with file defined, this should yield two elements 

67 self.assertEqual(list(config2.maybe_merge(config4)), [config2, config4]) 

68 

69 # merge config6 into config5 

70 merge_result = list(config5.maybe_merge(config6)) 

71 self.assertEqual(len(merge_result), 1) 

72 self.assertEqual(config5.rest, {"f": 5, "g": 6, "h": 7, "i": 8}) 

73 

74 # Can't merge configs with shared keys 

75 self.assertEqual(list(config6.maybe_merge(config7)), [config6, config7]) 

76 

77 

78class PipelineIRTestCase(unittest.TestCase): 

79 """A test case for PipelineIR objects.""" 

80 

81 def testPipelineIRInitChecks(self): 

82 # Missing description 

83 pipeline_str = """ 

84 tasks: 

85 a: module.A 

86 """ 

87 with self.assertRaises(ValueError): 

88 PipelineIR.from_string(pipeline_str) 

89 

90 # Missing tasks 

91 pipeline_str = """ 

92 description: Test Pipeline 

93 """ 

94 with self.assertRaises(ValueError): 

95 PipelineIR.from_string(pipeline_str) 

96 

97 # This should raise a FileNotFoundError, as there are imports defined 

98 # so the __init__ method should pass but the imported file does not 

99 # exist 

100 pipeline_str = textwrap.dedent( 

101 """ 

102 description: Test Pipeline 

103 imports: /dummy_pipeline.yaml 

104 """ 

105 ) 

106 

107 with self.assertRaises(FileNotFoundError): 

108 PipelineIR.from_string(pipeline_str) 

109 

110 def testTaskParsing(self): 

111 # Should be able to parse a task defined both ways 

112 pipeline_str = textwrap.dedent( 

113 """ 

114 description: Test Pipeline 

115 tasks: 

116 modA: test.modA 

117 modB: 

118 class: test.modB 

119 """ 

120 ) 

121 

122 pipeline = PipelineIR.from_string(pipeline_str) 

123 self.assertEqual(list(pipeline.tasks.keys()), ["modA", "modB"]) 

124 self.assertEqual([t.klass for t in pipeline.tasks.values()], ["test.modA", "test.modB"]) 

125 

126 def testImportParsing(self): 

127 # This should raise, as the two pipelines, both define the same label 

128 pipeline_str = textwrap.dedent( 

129 """ 

130 description: Test Pipeline 

131 imports: 

132 - $TESTDIR/pipelines/testPipeline1.yaml 

133 - $TESTDIR/pipelines/testPipeline2.yaml 

134 """ 

135 ) 

136 # "modA" is the duplicated label, and it should appear in the error. 

137 with self.assertRaisesRegex(ValueError, "modA"): 

138 PipelineIR.from_string(pipeline_str) 

139 

140 # This should pass, as the conflicting task is excluded 

141 pipeline_str = textwrap.dedent( 

142 """ 

143 description: Test Pipeline 

144 imports: 

145 - location: $TESTDIR/pipelines/testPipeline1.yaml 

146 exclude: modA 

147 - $TESTDIR/pipelines/testPipeline2.yaml 

148 """ 

149 ) 

150 pipeline = PipelineIR.from_string(pipeline_str) 

151 self.assertEqual(set(pipeline.tasks.keys()), {"modA", "modB"}) 

152 

153 # This should pass, as the conflicting task is not in includes 

154 pipeline_str = textwrap.dedent( 

155 """ 

156 description: Test Pipeline 

157 imports: 

158 - location: $TESTDIR/pipelines/testPipeline1.yaml 

159 include: modB 

160 labeledSubsetModifyMode: DROP 

161 - $TESTDIR/pipelines/testPipeline2.yaml 

162 """ 

163 ) 

164 

165 pipeline = PipelineIR.from_string(pipeline_str) 

166 self.assertEqual(set(pipeline.tasks.keys()), {"modA", "modB"}) 

167 

168 # Test that you can't include and exclude a task 

169 pipeline_str = textwrap.dedent( 

170 """ 

171 description: Test Pipeline 

172 imports: 

173 - location: $TESTDIR/pipelines/testPipeline1.yaml 

174 exclude: modA 

175 include: modB 

176 labeledSubsetModifyMode: EDIT 

177 - $TESTDIR/pipelines/testPipeline2.yaml 

178 """ 

179 ) 

180 

181 with self.assertRaises(ValueError): 

182 PipelineIR.from_string(pipeline_str) 

183 

184 # Test that you can rename a task 

185 pipeline_str = textwrap.dedent( 

186 """ 

187 description: Test Pipeline 

188 imports: 

189 - location: $TESTDIR/pipelines/testPipeline1.yaml 

190 rename: 

191 modB: modZ 

192 labeledSubsetModifyMode: EDIT 

193 """ 

194 ) 

195 

196 pipeline = PipelineIR.from_string(pipeline_str) 

197 self.assertEqual(set(pipeline.tasks.keys()), {"modA", "modZ"}) 

198 

199 # Test that renaming and including work together 

200 pipeline_str = textwrap.dedent( 

201 """ 

202 description: Test Pipeline 

203 imports: 

204 - location: $TESTDIR/pipelines/testPipeline1.yaml 

205 include: modB 

206 rename: 

207 modB: modZ 

208 labeledSubsetModifyMode: EDIT 

209 """ 

210 ) 

211 

212 pipeline = PipelineIR.from_string(pipeline_str) 

213 self.assertEqual( 

214 set(pipeline.tasks.keys()), 

215 { 

216 "modZ", 

217 }, 

218 ) 

219 

220 # Test that renaming an excluded label works, although perhaps it 

221 # should emit a warning 

222 pipeline_str = textwrap.dedent( 

223 """ 

224 description: Test Pipeline 

225 imports: 

226 - location: $TESTDIR/pipelines/testPipeline1.yaml 

227 exclude: modB 

228 rename: 

229 modB: modZ 

230 labeledSubsetModifyMode: EDIT 

231 """ 

232 ) 

233 

234 pipeline = PipelineIR.from_string(pipeline_str) 

235 self.assertEqual( 

236 set(pipeline.tasks.keys()), 

237 { 

238 "modA", 

239 }, 

240 ) 

241 

242 # Test that you can't rename a task to an existing task 

243 pipeline_str = textwrap.dedent( 

244 """ 

245 description: Test Pipeline 

246 imports: 

247 - location: $TESTDIR/pipelines/testPipeline1.yaml 

248 rename: 

249 modB: modA 

250 labeledSubsetModifyMode: EDIT 

251 """ 

252 ) 

253 

254 with self.assertRaises(ValueError): 

255 PipelineIR.from_string(pipeline_str) 

256 

257 # Test that you can't rename two tasks to the same new name 

258 pipeline_str = textwrap.dedent( 

259 """ 

260 description: Test Pipeline 

261 imports: 

262 - location: $TESTDIR/pipelines/testPipeline1.yaml 

263 rename: 

264 modA: modC 

265 modB: modC 

266 labeledSubsetModifyMode: EDIT 

267 """ 

268 ) 

269 

270 with self.assertRaises(ValueError): 

271 PipelineIR.from_string(pipeline_str) 

272 

273 # Test that you can't rename a task back to its old name 

274 # This could work but is confusing and pointless 

275 pipeline_str = textwrap.dedent( 

276 """ 

277 description: Test Pipeline 

278 imports: 

279 - location: $TESTDIR/pipelines/testPipeline1.yaml 

280 rename: 

281 modB: modC 

282 modC: modB 

283 labeledSubsetModifyMode: EDIT 

284 """ 

285 ) 

286 

287 with self.assertRaises(ValueError): 

288 PipelineIR.from_string(pipeline_str) 

289 

290 # Test unknown labeledSubsetModifyModes raise 

291 pipeline_str = textwrap.dedent( 

292 """ 

293 description: Test Pipeline 

294 imports: 

295 - location: $TESTDIR/pipelines/testPipeline1.yaml 

296 exclude: modA 

297 include: modB 

298 labeledSubsetModifyMode: WRONG 

299 - $TESTDIR/pipelines/testPipeline2.yaml 

300 """ 

301 ) 

302 with self.assertRaises(ValueError): 

303 PipelineIR.from_string(pipeline_str) 

304 

305 # Test that contracts are imported 

306 pipeline_str = textwrap.dedent( 

307 """ 

308 description: Test Pipeline 

309 imports: 

310 - $TESTDIR/pipelines/testPipeline1.yaml 

311 """ 

312 ) 

313 

314 pipeline = PipelineIR.from_string(pipeline_str) 

315 self.assertEqual(pipeline.contracts[0].contract, "modA.b == modA.c") 

316 

317 # Test that contracts are not imported 

318 pipeline_str = textwrap.dedent( 

319 """ 

320 description: Test Pipeline 

321 imports: 

322 - location: $TESTDIR/pipelines/testPipeline1.yaml 

323 importContracts: False 

324 """ 

325 ) 

326 

327 pipeline = PipelineIR.from_string(pipeline_str) 

328 self.assertEqual(pipeline.contracts, []) 

329 

330 # Test that configs are imported when defining the same task again 

331 # with the same label 

332 pipeline_str = textwrap.dedent( 

333 """ 

334 description: Test Pipeline 

335 imports: 

336 - $TESTDIR/pipelines/testPipeline2.yaml 

337 tasks: 

338 modA: 

339 class: "test.moduleA" 

340 config: 

341 value2: 2 

342 """ 

343 ) 

344 pipeline = PipelineIR.from_string(pipeline_str) 

345 self.assertEqual(pipeline.tasks["modA"].config[0].rest, {"value1": 1, "value2": 2}) 

346 

347 # Test that configs are not imported when redefining the task 

348 # associated with a label 

349 pipeline_str = textwrap.dedent( 

350 """ 

351 description: Test Pipeline 

352 imports: 

353 - $TESTDIR/pipelines/testPipeline2.yaml 

354 tasks: 

355 modA: 

356 class: "test.moduleAReplace" 

357 config: 

358 value2: 2 

359 """ 

360 ) 

361 pipeline = PipelineIR.from_string(pipeline_str) 

362 self.assertEqual(pipeline.tasks["modA"].config[0].rest, {"value2": 2}) 

363 

364 # Test that named subsets are imported 

365 pipeline_str = textwrap.dedent( 

366 """ 

367 description: Test Pipeline 

368 imports: 

369 - $TESTDIR/pipelines/testPipeline2.yaml 

370 """ 

371 ) 

372 pipeline = PipelineIR.from_string(pipeline_str) 

373 self.assertEqual(pipeline.labeled_subsets.keys(), {"modSubset"}) 

374 self.assertEqual(pipeline.labeled_subsets["modSubset"].subset, {"modA"}) 

375 

376 # Test that importing and redeclaring a named subset works 

377 pipeline_str = textwrap.dedent( 

378 """ 

379 description: Test Pipeline 

380 imports: 

381 - $TESTDIR/pipelines/testPipeline2.yaml 

382 tasks: 

383 modE: "test.moduleE" 

384 subsets: 

385 modSubset: 

386 - modE 

387 """ 

388 ) 

389 pipeline = PipelineIR.from_string(pipeline_str) 

390 self.assertEqual(pipeline.labeled_subsets.keys(), {"modSubset"}) 

391 self.assertEqual(pipeline.labeled_subsets["modSubset"].subset, {"modE"}) 

392 

393 # Test that importing from two pipelines that both declare a named 

394 # subset with the same name fails 

395 pipeline_str = textwrap.dedent( 

396 """ 

397 description: Test Pipeline 

398 imports: 

399 - $TESTDIR/pipelines/testPipeline2.yaml 

400 - $TESTDIR/pipelines/testPipeline3.yaml 

401 """ 

402 ) 

403 with self.assertRaises(ValueError): 

404 PipelineIR.from_string(pipeline_str) 

405 

406 # Test that importing a named subset that duplicates a label declared 

407 # in this pipeline fails 

408 pipeline_str = textwrap.dedent( 

409 """ 

410 description: Test Pipeline 

411 imports: 

412 - $TESTDIR/pipelines/testPipeline2.yaml 

413 tasks: 

414 modSubset: "test.moduleE" 

415 """ 

416 ) 

417 with self.assertRaises(ValueError): 

418 PipelineIR.from_string(pipeline_str) 

419 

420 # Test that importing fails if a named subset and task label conflict 

421 pipeline_str = textwrap.dedent( 

422 """ 

423 description: Test Pipeline 

424 imports: 

425 - $TESTDIR/pipelines/testPipeline2.yaml 

426 - $TESTDIR/pipelines/testPipeline4.yaml 

427 """ 

428 ) 

429 with self.assertRaises(ValueError): 

430 PipelineIR.from_string(pipeline_str) 

431 

432 # Test that importing Pipelines with different step definitions fails 

433 pipeline_str = textwrap.dedent( 

434 """ 

435 description: Test Pipeline 

436 imports: 

437 - $TESTDIR/pipelines/testPipeline5.yaml 

438 steps: 

439 - label: sub1 

440 dimensions: ['a', 'e'] 

441 """ 

442 ) 

443 with self.assertRaises(ValueError): 

444 PipelineIR.from_string(pipeline_str) 

445 

446 # Test that it does not fail if steps are excluded 

447 pipeline_str = textwrap.dedent( 

448 """ 

449 description: Test Pipeline 

450 imports: 

451 - location: $TESTDIR/pipelines/testPipeline5.yaml 

452 importSteps: false 

453 steps: 

454 - label: sub1 

455 dimensions: ['a', 'e'] 

456 """ 

457 ) 

458 PipelineIR.from_string(pipeline_str) 

459 

460 # Test that importing does work 

461 pipeline_str = textwrap.dedent( 

462 """ 

463 description: Test Pipeline 

464 imports: 

465 - location: $TESTDIR/pipelines/testPipeline5.yaml 

466 """ 

467 ) 

468 pipeline = PipelineIR.from_string(pipeline_str) 

469 self.assertEqual(set(step.label for step in pipeline.steps), {"sub1", "sub2"}) 

470 

471 def testSteps(self): 

472 # Test that steps definitions are created 

473 pipeline_str = textwrap.dedent( 

474 """ 

475 description: Test Pipeline 

476 tasks: 

477 modA: "test.moduleA" 

478 modB: "test.moduleB" 

479 subsets: 

480 sub1: 

481 subset: 

482 - modA 

483 - modB 

484 sub2: 

485 subset: 

486 - modA 

487 steps: 

488 - label: sub1 

489 dimensions: ['a', 'b'] 

490 - label: sub2 

491 dimensions: ['a', 'b'] 

492 """ 

493 ) 

494 pipeline = PipelineIR.from_string(pipeline_str) 

495 self.assertEqual(set(step.label for step in pipeline.steps), {"sub1", "sub2"}) 

496 

497 # Test that steps definitions must be unique 

498 pipeline_str = textwrap.dedent( 

499 """ 

500 description: Test Pipeline 

501 tasks: 

502 modA: "test.moduleA" 

503 modB: "test.moduleB" 

504 subsets: 

505 sub1: 

506 subset: 

507 - modA 

508 - modB 

509 sub2: 

510 subset: 

511 - modA 

512 steps: 

513 - label: sub1 

514 dimensions: ['a', 'b'] 

515 - label: sub1 

516 dimensions: ['a', 'b'] 

517 """ 

518 ) 

519 with self.assertRaises(ValueError): 

520 pipeline = PipelineIR.from_string(pipeline_str) 

521 

522 def testReadParameters(self): 

523 # verify that the parameters section is read in from a pipeline 

524 pipeline_str = textwrap.dedent( 

525 """ 

526 description: Test Pipeline 

527 parameters: 

528 value1: A 

529 value2: B 

530 tasks: 

531 modA: ModuleA 

532 """ 

533 ) 

534 pipeline = PipelineIR.from_string(pipeline_str) 

535 self.assertEqual(pipeline.parameters.mapping, {"value1": "A", "value2": "B"}) 

536 

537 def testTaskParameterLabel(self): 

538 # verify that "parameters" cannot be used as a task label 

539 pipeline_str = textwrap.dedent( 

540 """ 

541 description: Test Pipeline 

542 tasks: 

543 parameters: modA 

544 """ 

545 ) 

546 with self.assertRaises(ValueError): 

547 PipelineIR.from_string(pipeline_str) 

548 

549 def testParameterImporting(self): 

550 # verify that importing parameters happens correctly 

551 pipeline_str = textwrap.dedent( 

552 """ 

553 description: Test Pipeline 

554 imports: 

555 - $TESTDIR/pipelines/testPipeline1.yaml 

556 - location: $TESTDIR/pipelines/testPipeline2.yaml 

557 exclude: 

558 - modA 

559 

560 parameters: 

561 value4: valued 

562 """ 

563 ) 

564 pipeline = PipelineIR.from_string(pipeline_str) 

565 self.assertEqual( 

566 pipeline.parameters.mapping, 

567 {"value4": "valued", "value1": "valueNew", "value2": "valueB", "value3": "valueC"}, 

568 ) 

569 

570 def testImportingInstrument(self): 

571 # verify an instrument is imported, or ignored, (Or otherwise modified 

572 # for potential future use) 

573 pipeline_str = textwrap.dedent( 

574 """ 

575 description: Test Pipeline 

576 imports: 

577 - $TESTDIR/pipelines/testPipeline1.yaml 

578 """ 

579 ) 

580 pipeline = PipelineIR.from_string(pipeline_str) 

581 self.assertEqual(pipeline.instrument, "test.instrument") 

582 

583 # verify that an imported pipeline can have its instrument set to None 

584 pipeline_str = textwrap.dedent( 

585 """ 

586 description: Test Pipeline 

587 imports: 

588 - location: $TESTDIR/pipelines/testPipeline1.yaml 

589 instrument: None 

590 """ 

591 ) 

592 pipeline = PipelineIR.from_string(pipeline_str) 

593 self.assertEqual(pipeline.instrument, None) 

594 

595 # verify that an imported pipeline can have its instrument modified 

596 pipeline_str = textwrap.dedent( 

597 """ 

598 description: Test Pipeline 

599 imports: 

600 - location: $TESTDIR/pipelines/testPipeline1.yaml 

601 instrument: new.instrument 

602 """ 

603 ) 

604 pipeline = PipelineIR.from_string(pipeline_str) 

605 self.assertEqual(pipeline.instrument, "new.instrument") 

606 

607 # Test that multiple instruments can't be defined, 

608 # and that the error message tells you what instruments were found. 

609 pipeline_str = textwrap.dedent( 

610 """ 

611 description: Test Pipeline 

612 instrument: new.instrument 

613 imports: 

614 - location: $TESTDIR/pipelines/testPipeline1.yaml 

615 """ 

616 ) 

617 with self.assertRaisesRegex(ValueError, "new.instrument .* test.instrument."): 

618 PipelineIR.from_string(pipeline_str) 

619 

620 def testParameterConfigFormatting(self): 

621 # verify that a config is properly formatted with parameters 

622 pipeline_str = textwrap.dedent( 

623 """ 

624 description: Test Pipeline 

625 parameters: 

626 value1: A 

627 tasks: 

628 modA: 

629 class: ModuleA 

630 config: 

631 testKey: parameters.value1 

632 """ 

633 ) 

634 pipeline = PipelineIR.from_string(pipeline_str) 

635 newConfig = pipeline.tasks["modA"].config[0].formatted(pipeline.parameters) 

636 self.assertEqual(newConfig.rest["testKey"], "A") 

637 

638 def testReadContracts(self): 

639 # Verify that contracts are read in from a pipeline 

640 location = "$TESTDIR/pipelines/testPipeline1.yaml" 

641 pipeline = PipelineIR.from_uri(location) 

642 self.assertEqual(pipeline.contracts[0].contract, "modA.b == modA.c") 

643 

644 # Verify that a contract message is loaded 

645 pipeline_str = textwrap.dedent( 

646 """ 

647 description: Test Pipeline 

648 tasks: 

649 modA: test.modA 

650 modB: 

651 class: test.modB 

652 contracts: 

653 - contract: modA.foo == modB.Bar 

654 msg: "Test message" 

655 """ 

656 ) 

657 

658 pipeline = PipelineIR.from_string(pipeline_str) 

659 self.assertEqual(pipeline.contracts[0].msg, "Test message") 

660 

661 def testReadNamedSubsets(self): 

662 pipeline_str = textwrap.dedent( 

663 """ 

664 description: Test Pipeline 

665 tasks: 

666 modA: test.modA 

667 modB: 

668 class: test.modB 

669 modC: test.modC 

670 modD: test.modD 

671 subsets: 

672 subset1: 

673 - modA 

674 - modB 

675 subset2: 

676 subset: 

677 - modC 

678 - modD 

679 description: "A test named subset" 

680 """ 

681 ) 

682 pipeline = PipelineIR.from_string(pipeline_str) 

683 self.assertEqual(pipeline.labeled_subsets.keys(), {"subset1", "subset2"}) 

684 

685 self.assertEqual(pipeline.labeled_subsets["subset1"].subset, {"modA", "modB"}) 

686 self.assertEqual(pipeline.labeled_subsets["subset1"].description, None) 

687 

688 self.assertEqual(pipeline.labeled_subsets["subset2"].subset, {"modC", "modD"}) 

689 self.assertEqual(pipeline.labeled_subsets["subset2"].description, "A test named subset") 

690 

691 # verify that forgetting a subset key is an error 

692 pipeline_str = textwrap.dedent( 

693 """ 

694 description: Test Pipeline 

695 tasks: 

696 modA: test.modA 

697 modB: 

698 class: test.modB 

699 modC: test.modC 

700 modD: test.modD 

701 subsets: 

702 subset2: 

703 sub: 

704 - modC 

705 - modD 

706 description: "A test named subset" 

707 """ 

708 ) 

709 with self.assertRaises(ValueError): 

710 PipelineIR.from_string(pipeline_str) 

711 

712 # verify putting a label in a named subset that is not in the task is 

713 # an error 

714 pipeline_str = textwrap.dedent( 

715 """ 

716 description: Test Pipeline 

717 tasks: 

718 modA: test.modA 

719 modB: 

720 class: test.modB 

721 modC: test.modC 

722 modD: test.modD 

723 subsets: 

724 subset2: 

725 - modC 

726 - modD 

727 - modE 

728 """ 

729 ) 

730 with self.assertRaises(ValueError): 

731 PipelineIR.from_string(pipeline_str) 

732 

733 def testSubsettingPipeline(self): 

734 pipeline_str = textwrap.dedent( 

735 """ 

736 description: Test Pipeline 

737 tasks: 

738 modA: test.modA 

739 modB: 

740 class: test.modB 

741 modC: test.modC 

742 modD: test.modD 

743 subsets: 

744 subset1: 

745 - modA 

746 - modB 

747 subset2: 

748 subset: 

749 - modC 

750 - modD 

751 description: "A test named subset" 

752 """ 

753 ) 

754 pipeline = PipelineIR.from_string(pipeline_str) 

755 # verify that creating a pipeline subset with the default drop behavior 

756 # removes any labeled subset that contains a label not in the set of 

757 # all task labels. 

758 pipelineSubset1 = pipeline.subset_from_labels({"modA", "modB", "modC"}) 

759 self.assertEqual(pipelineSubset1.labeled_subsets.keys(), {"subset1"}) 

760 # verify that creating a pipeline subset with the edit behavior 

761 # edits any labeled subset that contains a label not in the set of 

762 # all task labels. 

763 pipelineSubset2 = pipeline.subset_from_labels({"modA", "modB", "modC"}, PipelineSubsetCtrl.EDIT) 

764 self.assertEqual(pipelineSubset2.labeled_subsets.keys(), {"subset1", "subset2"}) 

765 self.assertEqual(pipelineSubset2.labeled_subsets["subset2"].subset, {"modC"}) 

766 

767 def testInstrument(self): 

768 # Verify that if instrument is defined it is parsed out 

769 pipeline_str = textwrap.dedent( 

770 """ 

771 description: Test Pipeline 

772 instrument: dummyCam 

773 tasks: 

774 modA: test.moduleA 

775 """ 

776 ) 

777 

778 pipeline = PipelineIR.from_string(pipeline_str) 

779 self.assertEqual(pipeline.instrument, "dummyCam") 

780 

781 def testReadTaskConfig(self): 

782 # Verify that a task with a config is read in correctly 

783 pipeline_str = textwrap.dedent( 

784 """ 

785 description: Test Pipeline 

786 tasks: 

787 modA: 

788 class: test.moduleA 

789 config: 

790 propertyA: 6 

791 propertyB: 7 

792 file: testfile.py 

793 python: "config.testDict['a'] = 9" 

794 """ 

795 ) 

796 

797 pipeline = PipelineIR.from_string(pipeline_str) 

798 self.assertEqual(pipeline.tasks["modA"].config[0].file, ["testfile.py"]) 

799 self.assertEqual(pipeline.tasks["modA"].config[0].python, "config.testDict['a'] = 9") 

800 self.assertEqual(pipeline.tasks["modA"].config[0].rest, {"propertyA": 6, "propertyB": 7}) 

801 

802 # Verify that multiple files are read fine 

803 pipeline_str = textwrap.dedent( 

804 """ 

805 description: Test Pipeline 

806 tasks: 

807 modA: 

808 class: test.moduleA 

809 config: 

810 file: 

811 - testfile.py 

812 - otherFile.py 

813 """ 

814 ) 

815 

816 pipeline = PipelineIR.from_string(pipeline_str) 

817 self.assertEqual(pipeline.tasks["modA"].config[0].file, ["testfile.py", "otherFile.py"]) 

818 

819 # Test reading multiple Config entries 

820 pipeline_str = textwrap.dedent( 

821 """ 

822 description: Test Pipeline 

823 tasks: 

824 modA: 

825 class: test.moduleA 

826 config: 

827 - propertyA: 6 

828 propertyB: 7 

829 dataId: {"visit": 6} 

830 - propertyA: 8 

831 propertyB: 9 

832 """ 

833 ) 

834 

835 pipeline = PipelineIR.from_string(pipeline_str) 

836 self.assertEqual(pipeline.tasks["modA"].config[0].rest, {"propertyA": 6, "propertyB": 7}) 

837 self.assertEqual(pipeline.tasks["modA"].config[0].dataId, {"visit": 6}) 

838 self.assertEqual(pipeline.tasks["modA"].config[1].rest, {"propertyA": 8, "propertyB": 9}) 

839 self.assertEqual(pipeline.tasks["modA"].config[1].dataId, None) 

840 

841 def testSerialization(self): 

842 # Test creating a pipeline, writing it to a file, reading the file 

843 pipeline_str = textwrap.dedent( 

844 """ 

845 description: Test Pipeline 

846 instrument: dummyCam 

847 imports: 

848 - location: $TESTDIR/pipelines/testPipeline1.yaml 

849 instrument: None 

850 tasks: 

851 modC: 

852 class: test.moduleC 

853 config: 

854 - propertyA: 6 

855 propertyB: 7 

856 dataId: {"visit": 6} 

857 - propertyA: 8 

858 propertyB: 9 

859 modD: test.moduleD 

860 contracts: 

861 - modA.foo == modB.bar 

862 subsets: 

863 subA: 

864 - modA 

865 - modC 

866 """ 

867 ) 

868 

869 pipeline = PipelineIR.from_string(pipeline_str) 

870 

871 # Create the temp file, write and read 

872 with tempfile.NamedTemporaryFile() as tf: 

873 pipeline.write_to_uri(tf.name) 

874 loaded_pipeline = PipelineIR.from_uri(tf.name) 

875 self.assertEqual(pipeline, loaded_pipeline) 

876 

877 def testPipelineYamlLoader(self): 

878 # Tests that an exception is thrown in the case a key is used multiple 

879 # times in a given scope within a pipeline file 

880 pipeline_str = textwrap.dedent( 

881 """ 

882 description: Test Pipeline 

883 tasks: 

884 modA: test1 

885 modB: test2 

886 modA: test3 

887 """ 

888 ) 

889 self.assertRaises(KeyError, PipelineIR.from_string, pipeline_str) 

890 

891 def testMultiLineStrings(self): 

892 """Test that multi-line strings in pipelines are written with 

893 '|' continuation-syntax instead of explicit newlines. 

894 """ 

895 pipeline_ir = PipelineIR({"description": "Line 1\nLine2\n", "tasks": {"modA": "task1"}}) 

896 string = str(pipeline_ir) 

897 self.assertIn("|", string) 

898 self.assertNotIn(r"\n", string) 

899 

900 

901class MyMemoryTestCase(lsst.utils.tests.MemoryTestCase): 

902 """Run file leak tests.""" 

903 

904 

905def setup_module(module): 

906 """Configure pytest.""" 

907 lsst.utils.tests.init() 

908 

909 

910if __name__ == "__main__": 

911 lsst.utils.tests.init() 

912 unittest.main()