Coverage for tests / test_init_output_run.py: 14%

257 statements  

« prev     ^ index     » next       coverage.py v7.13.5, created at 2026-04-18 08:44 +0000

1# This file is part of pipe_base. 

2# 

3# Developed for the LSST Data Management System. 

4# This product includes software developed by the LSST Project 

5# (https://www.lsst.org). 

6# See the COPYRIGHT file at the top-level directory of this distribution 

7# for details of code ownership. 

8# 

9# This software is dual licensed under the GNU General Public License and also 

10# under a 3-clause BSD license. Recipients may choose which of these licenses 

11# to use; please see the files gpl-3.0.txt and/or bsd_license.txt, 

12# respectively. If you choose the GPL option then the following text applies 

13# (but note that there is still no warranty even if you opt for BSD instead): 

14# 

15# This program is free software: you can redistribute it and/or modify 

16# it under the terms of the GNU General Public License as published by 

17# the Free Software Foundation, either version 3 of the License, or 

18# (at your option) any later version. 

19# 

20# This program is distributed in the hope that it will be useful, 

21# but WITHOUT ANY WARRANTY; without even the implied warranty of 

22# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the 

23# GNU General Public License for more details. 

24# 

25# You should have received a copy of the GNU General Public License 

26# along with this program. If not, see <https://www.gnu.org/licenses/>. 

27 

28import itertools 

29import tempfile 

30import unittest 

31from collections.abc import Iterator 

32from contextlib import contextmanager 

33from typing import ClassVar 

34 

35import lsst.utils.tests 

36from lsst.daf.butler import ( 

37 Butler, 

38 DatasetRef, 

39 DatasetType, 

40 MissingDatasetTypeError, 

41 QuantumBackedButler, 

42 SerializedDatasetType, 

43 StorageClassFactory, 

44) 

45from lsst.daf.butler.registry import ConflictingDefinitionError 

46from lsst.pipe.base.all_dimensions_quantum_graph_builder import AllDimensionsQuantumGraphBuilder 

47from lsst.pipe.base.pipeline_graph import PipelineGraph 

48from lsst.pipe.base.quantum_graph import PredictedQuantumGraph 

49from lsst.pipe.base.tests.mocks import ( 

50 DynamicConnectionConfig, 

51 DynamicTestPipelineTask, 

52 DynamicTestPipelineTaskConfig, 

53 MockDataset, 

54) 

55 

56 

def _have_example_storage_classes() -> bool:
    """Report whether the example storage classes are mutually convertible.

    The storage classes involved have registered converters, so in principle
    importing their Python types should not be required to know that they are
    convertible.  The storage class machinery, however, treats a type that
    cannot be imported as one that cannot be converted.  That is inconvenient
    for this test module, but harmless outside of testing, where a storage
    class only matters if it can actually be used.

    Returns
    -------
    available : `bool`
        `True` only if every conversion checked below is reported as
        possible, in both directions.
    """
    lookup = StorageClassFactory().getStorageClass
    # (converting storage class, storage class being converted) pairs, kept
    # in the same order as the checks were originally written so that
    # short-circuiting stops at the same point.
    conversions = (
        ("ArrowTable", "ArrowAstropy"),
        ("ArrowAstropy", "ArrowTable"),
        ("ArrowTable", "DataFrame"),
        ("DataFrame", "ArrowTable"),
    )
    return all(lookup(target).can_convert(lookup(other)) for target, other in conversions)

74 

75 

class InitOutputRunTestCase(unittest.TestCase):
    """Tests for the init_output_run methods of PipelineGraph and
    QuantumGraph.
    """

    # Name of the RUN collection that holds the overall-input datasets.
    INPUT_COLLECTION: ClassVar[str] = "overall_inputs"

    @contextmanager
    def make_butler(self) -> Iterator[Butler]:
        """Wrap a temporary local butler repository in a context manager."""
        with tempfile.TemporaryDirectory(ignore_cleanup_errors=True) as root:
            config = Butler.makeRepo(root)
            with Butler.from_config(config, writeable=True) as butler:
                yield butler

    @contextmanager
    def prep_butler(self, pipeline_graph: PipelineGraph) -> Iterator[Butler]:
        """Create a temporary local butler repository with the dataset types
        and input datasets needed by a pipeline graph.

        This also resolves the pipeline graph and checks dataset types
        immediately after they are registered, providing test coverage for the
        methods that do that.
        """
        with self.make_butler() as butler:
            butler.collections.register(self.INPUT_COLLECTION)
            pipeline_graph.resolve(butler.registry)
            # Nothing has been registered yet, so the check has to fail.
            with self.assertRaises(MissingDatasetTypeError):
                pipeline_graph.check_dataset_type_registrations(butler)
            pipeline_graph.register_dataset_types(butler)
            pipeline_graph.check_dataset_type_registrations(butler)
            # Write one dimensionless mock dataset per overall input.
            for _, dataset_type_node in pipeline_graph.iter_overall_inputs():
                butler.put(
                    MockDataset(
                        dataset_id=None,
                        dataset_type=SerializedDatasetType(
                            name=dataset_type_node.name,
                            dimensions=[],
                            storageClass=dataset_type_node.storage_class_name,
                        ),
                        data_id={},
                        run=self.INPUT_COLLECTION,
                    ),
                    dataset_type_node.name,
                    run=self.INPUT_COLLECTION,
                )
            yield butler

    def find_init_output_refs(
        self, pipeline_graph: PipelineGraph, butler: Butler
    ) -> dict[str, list[DatasetRef]]:
        """Find the init-output datasets of a pipeline graph in a butler
        repository.

        The test fails immediately if any expected dataset cannot be found.

        Parameters
        ----------
        pipeline_graph : `PipelineGraph`
            Pipeline graph.
        butler : `lsst.daf.butler.Butler`
            Full butler client.

        Returns
        -------
        init_output_refs : `dict`
            Dataset references, keyed by task label.  The refs found in the
            repository are checked against the pipeline graph's dataset type
            definitions and then adapted to each task's write edge before
            being returned.  The special 'packages' dataset type is included
            under a '*' key.
        """
        init_output_refs: dict[str, list[DatasetRef]] = {}
        for task_node in pipeline_graph.tasks.values():
            init_output_refs_for_task: list[DatasetRef] = []
            for write_edge in task_node.init.iter_all_outputs():
                dataset_type_name = write_edge.dataset_type_name
                ref = butler.find_dataset(dataset_type_name)
                # Use self.fail rather than a bare assert so the check still
                # runs under ``python -O`` and reports what was missing.
                if ref is None:
                    self.fail(
                        f"Init-output {dataset_type_name!r} of task {task_node.label!r} was not found."
                    )
                # Check that the ref we got back uses the dataset type node's
                # definition of the dataset type (including storage class).
                self.assertEqual(ref.datasetType, pipeline_graph.dataset_types[dataset_type_name].dataset_type)
                # Remember the version of the ref that has the task's storage
                # class, in case they differ.
                init_output_refs_for_task.append(write_edge.adapt_dataset_ref(ref))
            init_output_refs[task_node.label] = init_output_refs_for_task
        packages_ref = butler.find_dataset(pipeline_graph.packages_dataset_type)
        # Never store None under the '*' key; fail with a clear message.
        if packages_ref is None:
            self.fail("The 'packages' init-output dataset was not found.")
        init_output_refs["*"] = [packages_ref]
        return init_output_refs

    def get_quantum_graph_init_output_refs(
        self, quantum_graph: PredictedQuantumGraph
    ) -> dict[str, list[DatasetRef]]:
        """Extract dataset references from a QuantumGraph into the same form
        as returned by `find_init_output_refs`.
        """
        init_output_refs: dict[str, list[DatasetRef]] = {
            task_label: list(quantum_graph.get_init_outputs(task_label).values())
            for task_label in quantum_graph.pipeline_graph.tasks
        }
        # The quantum graph is asked for the empty task label here; the
        # result is stored under the same '*' key that
        # `find_init_output_refs` uses for the 'packages' dataset.
        init_output_refs["*"] = list(quantum_graph.get_init_outputs("").values())
        return init_output_refs

    def assert_init_output_refs_equal(
        self, a: dict[str, list[DatasetRef]], b: dict[str, list[DatasetRef]]
    ) -> None:
        """Check that two dictionaries of the form returned by
        `find_init_output_refs` are equal.

        The lists for each task label are compared without regard to order.
        """
        self.assertEqual(a.keys(), b.keys())
        for task_label, init_output_refs_for_task in a.items():
            self.assertCountEqual(init_output_refs_for_task, b[task_label])

    def check_qbb_consistency(
        self, init_output_refs: dict[str, list[DatasetRef]], qbb: QuantumBackedButler
    ) -> None:
        """Check that a quantum-backed butler sees all of the given datasets.

        Parameters
        ----------
        init_output_refs : `dict`
            Dataset references, keyed by task label. Storage classes should
            match the data repository definitions of the dataset types. The
            special 'packages' dataset type should be included under a '*' key.
        qbb : `lsst.daf.butler.QuantumBackedButler`
            A quantum-backed butler.
        """
        for task_label, init_output_refs_for_task in init_output_refs.items():
            for ref, stored_in in qbb.stored_many(init_output_refs_for_task).items():
                self.assertTrue(
                    stored_in, msg=f"Init-output {ref} of task {task_label} not stored according to QBB."
                )

    def _assert_existing_outputs_conflict(
        self, quantum_graph: PredictedQuantumGraph, butler: Butler
    ) -> None:
        """Check that each write method raises `ConflictingDefinitionError`
        when told not to tolerate init-outputs that already exist.

        Parameters
        ----------
        quantum_graph : `PredictedQuantumGraph`
            Quantum graph whose init-outputs have already been written.
        butler : `lsst.daf.butler.Butler`
            Full butler client for the run those init-outputs were written to.
        """
        with self.assertRaises(ConflictingDefinitionError):
            quantum_graph.init_output_run(butler, existing=False)
        with self.assertRaises(ConflictingDefinitionError):
            quantum_graph.write_configs(butler, compare_existing=False)
        with self.assertRaises(ConflictingDefinitionError):
            quantum_graph.write_packages(butler, compare_existing=False)
        with self.assertRaises(ConflictingDefinitionError):
            quantum_graph.write_init_outputs(butler, skip_existing=False)

    def _check_all_init_orders(
        self,
        pipeline_graph: PipelineGraph,
        butler: Butler,
        configs: dict[str, DynamicTestPipelineTaskConfig],
    ) -> None:
        """Run all three initialization scenarios, each in its own run, and
        check the configs that were written.

        Parameters
        ----------
        pipeline_graph : `PipelineGraph`
            Pipeline graph to initialize output runs for.
        butler : `lsst.daf.butler.Butler`
            Full butler client, as yielded by `prep_butler`.
        configs : `dict`
            Expected task configs, keyed by task label.  For each label, the
            ``{label}_config`` dataset is read back from each run and
            compared to the corresponding value.
        """
        scenarios = (
            ("run1", self.init_with_pipeline_graph_first),
            ("run2", self.init_with_quantum_graph_first),
            ("run3", self.init_with_qbb_first),
        )
        for run, initialize in scenarios:
            initialize(pipeline_graph, butler, run)
            for task_label, config in configs.items():
                self.assertEqual(butler.get(f"{task_label}_config", collections=run), config)

    def init_with_pipeline_graph_first(
        self, pipeline_graph: PipelineGraph, butler: Butler, run: str
    ) -> PredictedQuantumGraph:
        """Test the init_output_run methods of PipelineGraph and QuantumGraph,
        using the former to actually write init-outputs (with later attempts
        correctly failing or doing nothing, depending on parameters).
        """
        butler = butler.clone(run=run, collections=[self.INPUT_COLLECTION, run])
        pipeline_graph.init_output_run(butler)
        init_output_refs = self.find_init_output_refs(pipeline_graph, butler)
        # Build a QG with the init outputs already in place.
        quantum_graph_builder = AllDimensionsQuantumGraphBuilder(
            pipeline_graph,
            butler,
            skip_existing_in=[run],
            output_run=run,
            input_collections=[self.INPUT_COLLECTION],
        )
        quantum_graph = quantum_graph_builder.finish(metadata={"output_run": run}).assemble()
        # Check that the QG refs are the same as the ones that were present
        # already.
        self.assert_init_output_refs_equal(
            self.get_quantum_graph_init_output_refs(quantum_graph),
            init_output_refs,
        )
        # Initialize with the pipeline graph, should be a no-op.
        pipeline_graph.init_output_run(butler)
        self.assert_init_output_refs_equal(
            self.find_init_output_refs(pipeline_graph, butler),
            init_output_refs,
        )
        # Initialize again with the QG; should be a no-op.
        quantum_graph.init_output_run(butler)
        self.assert_init_output_refs_equal(
            self.find_init_output_refs(pipeline_graph, butler),
            init_output_refs,
        )
        # Initialize again with the QG but tell it to expect an empty run.
        self._assert_existing_outputs_conflict(quantum_graph, butler)
        # Make a QBB, check that it can see the init outputs.
        qbb = quantum_graph.make_init_qbb(butler._config)
        self.enterContext(qbb)
        self.check_qbb_consistency(init_output_refs, qbb)
        # Use QBB to initialize again, should be a no-op.
        quantum_graph.init_output_run(qbb)
        self.check_qbb_consistency(init_output_refs, qbb)
        # Use QBB to initialize again but tell it to expect an empty run.
        with self.assertRaises(ConflictingDefinitionError):
            quantum_graph.init_output_run(qbb, existing=False)
        return quantum_graph

    def init_with_quantum_graph_first(
        self, pipeline_graph: PipelineGraph, butler: Butler, run: str
    ) -> PredictedQuantumGraph:
        """Test the init_output_run methods of PipelineGraph and QuantumGraph,
        using the latter to actually write init-outputs (with later attempts
        correctly failing or doing nothing, depending on parameters).
        """
        butler = butler.clone(run=run, collections=[self.INPUT_COLLECTION, run])
        # Build a QG.
        quantum_graph_builder = AllDimensionsQuantumGraphBuilder(
            pipeline_graph,
            butler,
            input_collections=[self.INPUT_COLLECTION],
        )
        quantum_graph = quantum_graph_builder.finish(metadata={"output_run": run}).assemble()
        # Initialize with the QG.
        quantum_graph.init_output_run(butler)
        # Check that the QG refs are the same as the ones we find in the repo.
        init_output_refs = self.find_init_output_refs(pipeline_graph, butler)
        self.assert_init_output_refs_equal(
            self.get_quantum_graph_init_output_refs(quantum_graph),
            init_output_refs,
        )
        # Initialize again with the QG; should be a no-op.
        quantum_graph.init_output_run(butler)
        self.assert_init_output_refs_equal(
            self.find_init_output_refs(pipeline_graph, butler),
            init_output_refs,
        )
        # Initialize again with the QG but tell it to expect an empty run.
        self._assert_existing_outputs_conflict(quantum_graph, butler)
        # Initialize with the pipeline graph, should be a no-op.
        pipeline_graph.init_output_run(butler)
        # Make a QBB, check that it can see the init outputs.
        qbb = quantum_graph.make_init_qbb(butler._config)
        self.enterContext(qbb)
        self.check_qbb_consistency(init_output_refs, qbb)
        # Use QBB to initialize again, should be a no-op.
        quantum_graph.init_output_run(qbb)
        self.check_qbb_consistency(init_output_refs, qbb)
        # Use QBB to initialize again but tell it to expect an empty run.
        with self.assertRaises(ConflictingDefinitionError):
            quantum_graph.init_output_run(qbb, existing=False)
        return quantum_graph

    def init_with_qbb_first(
        self, pipeline_graph: PipelineGraph, butler: Butler, run: str
    ) -> PredictedQuantumGraph:
        """Test the init_output_run methods of PipelineGraph and QuantumGraph,
        using the latter with a quantum-backed butler to actually write
        init-outputs (with later attempts correctly failing or doing nothing,
        depending on parameters).
        """
        butler = butler.clone(run=run, collections=[self.INPUT_COLLECTION, run])
        # Build a QG.
        quantum_graph_builder = AllDimensionsQuantumGraphBuilder(
            pipeline_graph,
            butler,
            input_collections=[self.INPUT_COLLECTION],
        )
        quantum_graph = quantum_graph_builder.finish(metadata={"output_run": run}).assemble()
        # Make a quantum-backed butler and use it to initialize the run.
        qbb = quantum_graph.make_init_qbb(butler._config)
        self.enterContext(qbb)
        quantum_graph.init_output_run(qbb)
        init_output_refs = self.get_quantum_graph_init_output_refs(quantum_graph)
        self.check_qbb_consistency(init_output_refs, qbb)
        # Use QBB to initialize again, should be a no-op.  (This call was
        # previously missing, so the no-op path was never exercised here.)
        quantum_graph.init_output_run(qbb)
        self.check_qbb_consistency(init_output_refs, qbb)
        # Use QBB to initialize again but tell it to expect an empty run.
        with self.assertRaises(ConflictingDefinitionError):
            quantum_graph.init_output_run(qbb, existing=False)
        # Transferring datasets back to the main butler (i.e. insert DB entries
        # for them).
        butler.transfer_from(qbb, itertools.chain.from_iterable(init_output_refs.values()))
        # Check that the QG refs are the same as the ones we find in the repo.
        self.assert_init_output_refs_equal(
            self.find_init_output_refs(pipeline_graph, butler),
            init_output_refs,
        )
        # Initialize again with the QG; should be a no-op.
        quantum_graph.init_output_run(butler)
        self.assert_init_output_refs_equal(
            self.find_init_output_refs(pipeline_graph, butler),
            init_output_refs,
        )
        # Initialize again with the QG but tell it to expect an empty run.
        with self.assertRaises(ConflictingDefinitionError):
            quantum_graph.init_output_run(butler, existing=False)
        # Initialize with the pipeline graph, should be a no-op.
        pipeline_graph.init_output_run(butler)
        return quantum_graph

    def test_two_tasks_no_conversions(self) -> None:
        """Test a two-task pipeline with an overall init-input, an overall
        init-output, and an init-intermediate.
        """
        a_config = DynamicTestPipelineTaskConfig()
        a_config.inputs["i"] = DynamicConnectionConfig(dataset_type_name="input_runtime")
        a_config.outputs["o"] = DynamicConnectionConfig(dataset_type_name="intermediate_runtime")
        a_config.init_inputs["ii"] = DynamicConnectionConfig(dataset_type_name="input_init")
        a_config.init_outputs["io"] = DynamicConnectionConfig(dataset_type_name="intermediate_init")
        b_config = DynamicTestPipelineTaskConfig()
        b_config.inputs["i"] = DynamicConnectionConfig(dataset_type_name="intermediate_runtime")
        b_config.outputs["o"] = DynamicConnectionConfig(dataset_type_name="output_runtime")
        b_config.init_inputs["ii"] = DynamicConnectionConfig(dataset_type_name="intermediate_init")
        b_config.init_outputs["io"] = DynamicConnectionConfig(dataset_type_name="output_init")
        pipeline_graph = PipelineGraph()
        pipeline_graph.add_task("a", DynamicTestPipelineTask, a_config)
        pipeline_graph.add_task("b", DynamicTestPipelineTask, b_config)
        with self.prep_butler(pipeline_graph) as butler:
            self._check_all_init_orders(pipeline_graph, butler, {"a": a_config, "b": b_config})

    def test_optional_input_unregistered(self) -> None:
        """Test that an optional input dataset type that is not registered is
        not considered an error.
        """
        a_config = DynamicTestPipelineTaskConfig()
        a_config.inputs["i"] = DynamicConnectionConfig(dataset_type_name="input_runtime", minimum=0)
        a_config.outputs["o"] = DynamicConnectionConfig(dataset_type_name="output_runtime")
        pipeline_graph = PipelineGraph()
        pipeline_graph.add_task("a", DynamicTestPipelineTask, a_config)
        with self.make_butler() as butler:
            pipeline_graph.resolve(butler.registry)
            # Register everything except the optional "input_runtime" input.
            for name in ("a_config", "a_log", "a_metadata", "output_runtime"):
                butler.registry.registerDatasetType(pipeline_graph.dataset_types[name].dataset_type)
            pipeline_graph.check_dataset_type_registrations(butler, include_packages=False)

    def test_registration_changed(self) -> None:
        """Test that we get an error when dataset type registrations in a data
        repository change between the time a pipeline graph is resolved (e.g.
        at QG generation) and when dataset types are checked later (e.g. during
        execution).
        """
        a_config = DynamicTestPipelineTaskConfig()
        a_config.inputs["i"] = DynamicConnectionConfig(dataset_type_name="input_runtime")
        a_config.outputs["o"] = DynamicConnectionConfig(dataset_type_name="output_runtime")
        pipeline_graph = PipelineGraph()
        pipeline_graph.add_task("a", DynamicTestPipelineTask, a_config)
        with self.make_butler() as butler:
            pipeline_graph.resolve(butler.registry)
            pipeline_graph.register_dataset_types(butler)
            # Replace the registration with a different definition under the
            # same name, after the pipeline graph was resolved.
            butler.registry.removeDatasetType("input_runtime")
            butler.registry.registerDatasetType(
                DatasetType("input_runtime", {"instrument"}, "StructuredDataList", universe=butler.dimensions)
            )
            with self.assertRaises(ConflictingDefinitionError):
                pipeline_graph.check_dataset_type_registrations(butler)

    @unittest.skipUnless(
        _have_example_storage_classes(), "Arrow/Astropy/Pandas storage classes are not available."
    )
    def test_init_intermediate_component(self) -> None:
        """Test init_output_run with an init-intermediate that is written as
        a composite and read as a component.
        """
        a_config = DynamicTestPipelineTaskConfig()
        a_config.inputs["i"] = DynamicConnectionConfig(dataset_type_name="input_runtime")
        a_config.outputs["o"] = DynamicConnectionConfig(dataset_type_name="intermediate_runtime")
        a_config.init_outputs["io"] = DynamicConnectionConfig(
            dataset_type_name="intermediate_init", storage_class="ArrowTable"
        )
        b_config = DynamicTestPipelineTaskConfig()
        b_config.inputs["i"] = DynamicConnectionConfig(dataset_type_name="intermediate_runtime")
        b_config.outputs["o"] = DynamicConnectionConfig(dataset_type_name="output_runtime")
        b_config.init_inputs["ii"] = DynamicConnectionConfig(
            dataset_type_name="intermediate_init.schema", storage_class="ArrowSchema"
        )
        pipeline_graph = PipelineGraph()
        pipeline_graph.add_task("a", DynamicTestPipelineTask, a_config)
        pipeline_graph.add_task("b", DynamicTestPipelineTask, b_config)
        with self.prep_butler(pipeline_graph) as butler:
            self._check_all_init_orders(pipeline_graph, butler, {"a": a_config, "b": b_config})

    def test_no_get_init_input_callback(self) -> None:
        """Test calling PipelineGraph.instantiate_tasks with no get_init_input
        callback when one is necessary.
        """
        a_config = DynamicTestPipelineTaskConfig()
        a_config.inputs["i"] = DynamicConnectionConfig(dataset_type_name="input_runtime")
        a_config.outputs["o"] = DynamicConnectionConfig(dataset_type_name="output_runtime")
        a_config.init_inputs["ii"] = DynamicConnectionConfig(dataset_type_name="input_init")
        pipeline_graph = PipelineGraph()
        pipeline_graph.add_task("a", DynamicTestPipelineTask, a_config)
        with self.make_butler() as butler:
            pipeline_graph.resolve(butler.registry)
            with self.assertRaises(ValueError):
                pipeline_graph.instantiate_tasks()

    def test_multiple_init_input_consumers(self) -> None:
        """Test init_output_run when there are two tasks consuming the same
        init-input.
        """
        a_config = DynamicTestPipelineTaskConfig()
        a_config.inputs["i"] = DynamicConnectionConfig(dataset_type_name="input_runtime")
        a_config.outputs["o"] = DynamicConnectionConfig(dataset_type_name="intermediate_runtime")
        a_config.init_inputs["ii"] = DynamicConnectionConfig(dataset_type_name="input_init")
        a_config.init_outputs["io"] = DynamicConnectionConfig(dataset_type_name="output_init")
        b_config = DynamicTestPipelineTaskConfig()
        b_config.inputs["i"] = DynamicConnectionConfig(dataset_type_name="intermediate_runtime")
        b_config.outputs["o"] = DynamicConnectionConfig(dataset_type_name="output_runtime")
        b_config.init_inputs["ii"] = DynamicConnectionConfig(dataset_type_name="input_init")
        pipeline_graph = PipelineGraph()
        pipeline_graph.add_task("a", DynamicTestPipelineTask, a_config)
        pipeline_graph.add_task("b", DynamicTestPipelineTask, b_config)
        with self.prep_butler(pipeline_graph) as butler:
            self._check_all_init_orders(pipeline_graph, butler, {"a": a_config, "b": b_config})

    def test_config_change(self) -> None:
        """Test init_output_run when there is an existing config that is
        inconsistent with the one in the pipeline graph.
        """
        a_config = DynamicTestPipelineTaskConfig()
        a_config.inputs["i"] = DynamicConnectionConfig(dataset_type_name="input_runtime")
        a_config.outputs["o"] = DynamicConnectionConfig(dataset_type_name="output_runtime")
        pipeline_graph = PipelineGraph()
        pipeline_graph.add_task("a", DynamicTestPipelineTask, a_config)
        with self.prep_butler(pipeline_graph) as butler:
            butler.collections.register("run1")
            # Store a default-constructed config, which differs from a_config.
            butler.put(DynamicTestPipelineTaskConfig(), "a_config", run="run1")
            with self.assertRaises(ConflictingDefinitionError):
                pipeline_graph.init_output_run(
                    butler.clone(run="run1", collections=[self.INPUT_COLLECTION, "run1"])
                )
            quantum_graph_builder = AllDimensionsQuantumGraphBuilder(
                pipeline_graph,
                butler,
                skip_existing_in=["run1"],
                output_run="run1",
                input_collections=[self.INPUT_COLLECTION],
            )
            quantum_graph = quantum_graph_builder.finish(metadata={"output_run": "run1"}).assemble()
            with self.assertRaises(ConflictingDefinitionError):
                quantum_graph.init_output_run(
                    butler.clone(run="run1", collections=[self.INPUT_COLLECTION, "run1"])
                )

531 

532 

if __name__ == "__main__":
    # Direct execution: call the lsst.utils.tests init hook first, then hand
    # control to the standard unittest runner.
    lsst.utils.tests.init()
    unittest.main()