Coverage for tests / test_separable_pipeline_executor.py: 11%

728 statements  

« prev     ^ index     » next       coverage.py v7.13.5, created at 2026-05-01 08:20 +0000

1# This file is part of pipe_base. 

2# 

3# Developed for the LSST Data Management System. 

4# This product includes software developed by the LSST Project 

5# (https://www.lsst.org). 

6# See the COPYRIGHT file at the top-level directory of this distribution 

7# for details of code ownership. 

8# 

9# This software is dual licensed under the GNU General Public License and also 

10# under a 3-clause BSD license. Recipients may choose which of these licenses 

11# to use; please see the files gpl-3.0.txt and/or bsd_license.txt, 

12# respectively. If you choose the GPL option then the following text applies 

13# (but note that there is still no warranty even if you opt for BSD instead): 

14# 

15# This program is free software: you can redistribute it and/or modify 

16# it under the terms of the GNU General Public License as published by 

17# the Free Software Foundation, either version 3 of the License, or 

18# (at your option) any later version. 

19# 

20# This program is distributed in the hope that it will be useful, 

21# but WITHOUT ANY WARRANTY; without even the implied warranty of 

22# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the 

23# GNU General Public License for more details. 

24# 

25# You should have received a copy of the GNU General Public License 

26# along with this program. If not, see <https://www.gnu.org/licenses/>. 

27 

28 

29import os 

30import tempfile 

31import unittest 

32 

33import lsst.daf.butler 

34import lsst.daf.butler.tests as butlerTests 

35import lsst.pex.config 

36import lsst.utils.tests 

37from lsst.daf.butler.registry import RegistryDefaults 

38from lsst.pipe.base import ( 

39 Instrument, 

40 Pipeline, 

41 PipelineGraph, 

42 QuantumAttemptStatus, 

43 QuantumGraph, 

44 QuantumSuccessCaveats, 

45 TaskMetadata, 

46) 

47from lsst.pipe.base.all_dimensions_quantum_graph_builder import AllDimensionsQuantumGraphBuilder 

48from lsst.pipe.base.automatic_connection_constants import ( 

49 PACKAGES_INIT_OUTPUT_NAME, 

50 PROVENANCE_DATASET_TYPE_NAME, 

51 PROVENANCE_STORAGE_CLASS, 

52) 

53from lsst.pipe.base.mp_graph_executor import MPGraphExecutorError 

54from lsst.pipe.base.quantum_graph import ProvenanceQuantumGraph 

55from lsst.pipe.base.quantum_graph_builder import OutputExistsError 

56from lsst.pipe.base.separable_pipeline_executor import SeparablePipelineExecutor 

57from lsst.pipe.base.tests.mocks import ( 

58 DirectButlerRepo, 

59 DynamicTestPipelineTaskConfig, 

60) 

61from lsst.resources import ResourcePath 

62from lsst.utils.packages import Packages 

63 

64TESTDIR = os.path.abspath(os.path.dirname(__file__)) 

65 

66 

67class SeparablePipelineExecutorTests(lsst.utils.tests.TestCase): 

68 """Test the SeparablePipelineExecutor API with a trivial task.""" 

69 

70 pipeline_file = os.path.join(TESTDIR, "pipelines", "pipeline_separable.yaml") 

71 

    def setUp(self):
        """Create a temporary writeable butler repo with an output CHAINED
        collection, the input-side dataset types, and a DatasetRef for the
        run-level provenance dataset.
        """
        repodir = tempfile.TemporaryDirectory()
        # TemporaryDirectory warns on leaks; addCleanup also keeps it from
        # getting garbage-collected.
        self.addCleanup(tempfile.TemporaryDirectory.cleanup, repodir)

        # standalone parameter forces the returned config to also include
        # the information from the search paths.
        config = lsst.daf.butler.Butler.makeRepo(
            repodir.name, standalone=True, searchPaths=[os.path.join(TESTDIR, "config")]
        )
        butler = lsst.daf.butler.Butler.from_config(config, writeable=True)
        # enterContext closes the butler automatically on test teardown.
        self.enterContext(butler)
        # A CHAINED "fake" collection pointing at a timestamped RUN, set as
        # the registry defaults so tests can put/get without collection args.
        output = "fake"
        output_run = f"{output}/{Instrument.makeCollectionTimestamp()}"
        butler.registry.registerCollection(output_run, lsst.daf.butler.CollectionType.RUN)
        butler.registry.registerCollection(output, lsst.daf.butler.CollectionType.CHAINED)
        butler.registry.setCollectionChain(output, [output_run])
        butler.registry.defaults = RegistryDefaults(collections=[output], run=output_run)
        self.butler = butler

        # Dataset types for task "a" and the pipeline input; "b"-side types
        # are registered per-test so registration behavior can be exercised.
        butlerTests.addDatasetType(self.butler, "input", set(), "StructuredDataDict")
        butlerTests.addDatasetType(self.butler, "intermediate", set(), "StructuredDataDict")
        butlerTests.addDatasetType(self.butler, "a_log", set(), "ButlerLogRecords")
        butlerTests.addDatasetType(self.butler, "a_metadata", set(), "TaskMetadata")
        butlerTests.addDatasetType(self.butler, "a_config", set(), "Config")
        provenance_dataset_type = butlerTests.addDatasetType(
            self.butler, PROVENANCE_DATASET_TYPE_NAME, set(), PROVENANCE_STORAGE_CLASS
        )
        # Ref used by the check_provenance_* helpers to fetch provenance.
        self.provenance_ref = lsst.daf.butler.DatasetRef(
            provenance_dataset_type,
            lsst.daf.butler.DataCoordinate.make_empty(self.butler.dimensions),
            run=butler.run,
        )

106 

107 def build_empty_quantum_graph(self) -> None: 

108 pipeline_graph = PipelineGraph(universe=self.butler.dimensions) 

109 pipeline_graph.resolve(self.butler.registry) 

110 builder = AllDimensionsQuantumGraphBuilder(pipeline_graph, self.butler) 

111 return builder.finish(attach_datastore_records=False).assemble() 

112 

    def check_provenance_fullgraph(self):
        """Verify the stored provenance graph after a full two-task run.

        Checks node membership, quantum statuses, edge structure, the
        "produced" flag on every dataset node, and that the log/metadata
        components round-trip against the standalone per-task datasets.
        """
        provenance_qg = self.butler.get(self.provenance_ref)
        empty_data_id = lsst.daf.butler.DataCoordinate.make_empty(self.butler.dimensions)
        # Both tasks and every dataset type (including the automatic config,
        # metadata, and log outputs) should be present.
        self.assertCountEqual(provenance_qg.quanta_by_task.keys(), {"a", "b"})
        self.assertCountEqual(
            provenance_qg.datasets_by_type.keys(),
            {
                "input",
                "intermediate",
                "output",
                "a_config",
                "b_config",
                "a_metadata",
                "b_metadata",
                "a_log",
                "b_log",
            },
        )
        # Node IDs for the single quantum of each task and for each dataset;
        # everything in this pipeline shares the empty data ID.
        a_id = provenance_qg.quanta_by_task["a"][empty_data_id]
        b_id = provenance_qg.quanta_by_task["b"][empty_data_id]
        input_id = provenance_qg.datasets_by_type["input"][empty_data_id]
        intermediate_id = provenance_qg.datasets_by_type["intermediate"][empty_data_id]
        output_id = provenance_qg.datasets_by_type["output"][empty_data_id]
        a_metadata_id = provenance_qg.datasets_by_type["a_metadata"][empty_data_id]
        a_log_id = provenance_qg.datasets_by_type["a_log"][empty_data_id]
        b_metadata_id = provenance_qg.datasets_by_type["b_metadata"][empty_data_id]
        b_log_id = provenance_qg.datasets_by_type["b_log"][empty_data_id]
        self.assertEqual(
            provenance_qg.bipartite_xgraph.nodes[a_id]["status"], QuantumAttemptStatus.SUCCESSFUL
        )
        self.assertEqual(
            provenance_qg.bipartite_xgraph.nodes[b_id]["status"], QuantumAttemptStatus.SUCCESSFUL
        )
        # Edge structure: input -> a -> (intermediate, a_metadata, a_log) and
        # intermediate -> b -> (output, b_metadata, b_log).
        self.assertEqual(list(provenance_qg.bipartite_xgraph.predecessors(a_id)), [input_id])
        self.assertEqual(
            list(provenance_qg.bipartite_xgraph.successors(a_id)), [intermediate_id, a_metadata_id, a_log_id]
        )
        self.assertEqual(list(provenance_qg.bipartite_xgraph.predecessors(b_id)), [intermediate_id])
        self.assertEqual(
            list(provenance_qg.bipartite_xgraph.successors(b_id)), [output_id, b_metadata_id, b_log_id]
        )
        # Every dataset node in the graph should be marked as produced.
        for datasets_by_data_id in provenance_qg.datasets_by_type.values():
            for dataset_id in datasets_by_data_id.values():
                self.assertTrue(provenance_qg.bipartite_xgraph.nodes[dataset_id]["produced"])
        # The "logs" component accepts both quantum IDs and dataset IDs as
        # parameters; compare against the standalone log datasets.
        logs_pqg = self.butler.get(
            self.provenance_ref.makeComponentRef("logs"),
            parameters={"quanta": (a_id,), "datasets": (b_log_id,)},
        )
        self.assertEqual(list(logs_pqg[a_id][-1]), list(self.butler.get("a_log", empty_data_id)))
        self.assertEqual(list(logs_pqg[b_log_id][-1]), list(self.butler.get("b_log", empty_data_id)))
        # Same for the "metadata" component.
        metadata_pqg = self.butler.get(
            self.provenance_ref.makeComponentRef("metadata"),
            parameters={"quanta": (b_id,), "datasets": (a_metadata_id,)},
        )
        self.assertEqual(metadata_pqg[a_metadata_id][-1], self.butler.get("a_metadata", empty_data_id))
        self.assertEqual(metadata_pqg[b_id][-1], self.butler.get("b_metadata", empty_data_id))
        # Package versions are stored alongside the graph.
        self.assertIsInstance(self.butler.get("run_provenance.packages"), Packages)

170 

171 def check_provenance_emptygraph(self): 

172 provenance_qg = self.butler.get(self.provenance_ref) 

173 self.assertFalse(provenance_qg.bipartite_xgraph) 

174 self.assertFalse(any(provenance_qg.quanta_by_task.values())) 

175 self.assertFalse(any(provenance_qg.datasets_by_type.values())) 

176 self.assertIsInstance(self.butler.get("run_provenance.packages"), Packages) 

177 

178 def test_pre_execute_qgraph_old(self): 

179 # Too hard to make a quantum graph from scratch. 

180 executor = SeparablePipelineExecutor(self.butler) 

181 pipeline = Pipeline.fromFile(self.pipeline_file) 

182 self.butler.put({"zero": 0}, "input") 

183 graph = executor.make_quantum_graph(pipeline) 

184 

185 butlerTests.addDatasetType(self.butler, "output", set(), "StructuredDataDict") 

186 butlerTests.addDatasetType(self.butler, "b_config", set(), "Config") 

187 butlerTests.addDatasetType(self.butler, "b_log", set(), "ButlerLogRecords") 

188 butlerTests.addDatasetType(self.butler, "b_metadata", set(), "TaskMetadata") 

189 butlerTests.addDatasetType(self.butler, PACKAGES_INIT_OUTPUT_NAME, set(), "Packages") 

190 

191 executor.pre_execute_qgraph( 

192 graph, 

193 register_dataset_types=False, 

194 save_init_outputs=False, 

195 save_versions=False, 

196 ) 

197 self.assertFalse(self.butler.exists("a_config", {}, collections=[self.butler.run])) 

198 self.assertFalse(self.butler.exists(PACKAGES_INIT_OUTPUT_NAME, {})) 

199 

200 def test_pre_execute_qgraph(self): 

201 # Too hard to make a quantum graph from scratch. 

202 executor = SeparablePipelineExecutor(self.butler) 

203 pipeline = Pipeline.fromFile(self.pipeline_file) 

204 self.butler.put({"zero": 0}, "input") 

205 graph = executor.build_quantum_graph(pipeline) 

206 

207 butlerTests.addDatasetType(self.butler, "output", set(), "StructuredDataDict") 

208 butlerTests.addDatasetType(self.butler, "b_config", set(), "Config") 

209 butlerTests.addDatasetType(self.butler, "b_log", set(), "ButlerLogRecords") 

210 butlerTests.addDatasetType(self.butler, "b_metadata", set(), "TaskMetadata") 

211 butlerTests.addDatasetType(self.butler, PACKAGES_INIT_OUTPUT_NAME, set(), "Packages") 

212 

213 executor.pre_execute_qgraph( 

214 graph, 

215 register_dataset_types=False, 

216 save_init_outputs=False, 

217 save_versions=False, 

218 ) 

219 self.assertFalse(self.butler.exists("a_config", {}, collections=[self.butler.run])) 

220 self.assertFalse(self.butler.exists(PACKAGES_INIT_OUTPUT_NAME, {})) 

221 

222 def test_pre_execute_qgraph_unconnected_old(self): 

223 # Unconnected graph; see 

224 # test_make_quantum_graph_nowhere_skippartial_clobber. 

225 executor = SeparablePipelineExecutor( 

226 self.butler, 

227 skip_existing_in=[self.butler.run], 

228 clobber_output=True, 

229 ) 

230 pipeline = Pipeline.fromFile(self.pipeline_file) 

231 self.butler.put({"zero": 0}, "input") 

232 self.butler.put({"zero": 0}, "intermediate") 

233 graph = executor.make_quantum_graph(pipeline) 

234 

235 butlerTests.addDatasetType(self.butler, "output", set(), "StructuredDataDict") 

236 butlerTests.addDatasetType(self.butler, "b_config", set(), "Config") 

237 butlerTests.addDatasetType(self.butler, "b_log", set(), "ButlerLogRecords") 

238 butlerTests.addDatasetType(self.butler, "b_metadata", set(), "TaskMetadata") 

239 butlerTests.addDatasetType(self.butler, PACKAGES_INIT_OUTPUT_NAME, set(), "Packages") 

240 

241 executor.pre_execute_qgraph( 

242 graph, 

243 register_dataset_types=False, 

244 save_init_outputs=False, 

245 save_versions=False, 

246 ) 

247 self.assertFalse(self.butler.exists("a_config", {}, collections=[self.butler.run])) 

248 self.assertFalse(self.butler.exists(PACKAGES_INIT_OUTPUT_NAME, {})) 

249 

250 def test_pre_execute_qgraph_unconnected(self): 

251 # Unconnected graph; see 

252 # test_make_quantum_graph_nowhere_skippartial_clobber. 

253 executor = SeparablePipelineExecutor( 

254 self.butler, 

255 skip_existing_in=[self.butler.run], 

256 clobber_output=True, 

257 ) 

258 pipeline = Pipeline.fromFile(self.pipeline_file) 

259 self.butler.put({"zero": 0}, "input") 

260 self.butler.put({"zero": 0}, "intermediate") 

261 graph = executor.build_quantum_graph(pipeline) 

262 

263 butlerTests.addDatasetType(self.butler, "output", set(), "StructuredDataDict") 

264 butlerTests.addDatasetType(self.butler, "b_config", set(), "Config") 

265 butlerTests.addDatasetType(self.butler, "b_log", set(), "ButlerLogRecords") 

266 butlerTests.addDatasetType(self.butler, "b_metadata", set(), "TaskMetadata") 

267 butlerTests.addDatasetType(self.butler, PACKAGES_INIT_OUTPUT_NAME, set(), "Packages") 

268 

269 executor.pre_execute_qgraph( 

270 graph, 

271 register_dataset_types=False, 

272 save_init_outputs=False, 

273 save_versions=False, 

274 ) 

275 self.assertFalse(self.butler.exists("a_config", {}, collections=[self.butler.run])) 

276 self.assertFalse(self.butler.exists(PACKAGES_INIT_OUTPUT_NAME, {})) 

277 

278 def test_pre_execute_qgraph_empty_old(self): 

279 executor = SeparablePipelineExecutor(self.butler) 

280 graph = QuantumGraph({}, universe=self.butler.dimensions) 

281 

282 butlerTests.addDatasetType(self.butler, "output", set(), "StructuredDataDict") 

283 butlerTests.addDatasetType(self.butler, "b_config", set(), "Config") 

284 butlerTests.addDatasetType(self.butler, "b_log", set(), "ButlerLogRecords") 

285 butlerTests.addDatasetType(self.butler, "b_metadata", set(), "TaskMetadata") 

286 butlerTests.addDatasetType(self.butler, PACKAGES_INIT_OUTPUT_NAME, set(), "Packages") 

287 

288 executor.pre_execute_qgraph( 

289 graph, 

290 register_dataset_types=False, 

291 save_init_outputs=False, 

292 save_versions=False, 

293 ) 

294 self.assertFalse(self.butler.exists("a_config", {}, collections=[self.butler.run])) 

295 self.assertFalse(self.butler.exists(PACKAGES_INIT_OUTPUT_NAME, {})) 

296 

297 def test_pre_execute_qgraph_empty(self): 

298 executor = SeparablePipelineExecutor(self.butler) 

299 graph = self.build_empty_quantum_graph() 

300 

301 butlerTests.addDatasetType(self.butler, "output", set(), "StructuredDataDict") 

302 butlerTests.addDatasetType(self.butler, "b_config", set(), "Config") 

303 butlerTests.addDatasetType(self.butler, "b_log", set(), "ButlerLogRecords") 

304 butlerTests.addDatasetType(self.butler, "b_metadata", set(), "TaskMetadata") 

305 butlerTests.addDatasetType(self.butler, PACKAGES_INIT_OUTPUT_NAME, set(), "Packages") 

306 

307 executor.pre_execute_qgraph( 

308 graph, 

309 register_dataset_types=False, 

310 save_init_outputs=False, 

311 save_versions=False, 

312 ) 

313 self.assertFalse(self.butler.exists("a_config", {}, collections=[self.butler.run])) 

314 self.assertFalse(self.butler.exists(PACKAGES_INIT_OUTPUT_NAME, {})) 

315 

316 def test_pre_execute_qgraph_register_old(self): 

317 executor = SeparablePipelineExecutor(self.butler) 

318 pipeline = Pipeline.fromFile(self.pipeline_file) 

319 self.butler.put({"zero": 0}, "input") 

320 graph = executor.make_quantum_graph(pipeline) 

321 

322 executor.pre_execute_qgraph( 

323 graph, 

324 register_dataset_types=True, 

325 save_init_outputs=False, 

326 save_versions=False, 

327 ) 

328 self.assertEqual({d.name for d in self.butler.registry.queryDatasetTypes("output")}, {"output"}) 

329 self.assertEqual( 

330 {d.name for d in self.butler.registry.queryDatasetTypes("b_*")}, 

331 {"b_config", "b_log", "b_metadata"}, 

332 ) 

333 self.assertFalse(self.butler.exists("a_config", {}, collections=[self.butler.run])) 

334 self.assertFalse(self.butler.exists(PACKAGES_INIT_OUTPUT_NAME, {})) 

335 

336 def test_pre_execute_qgraph_register(self): 

337 executor = SeparablePipelineExecutor(self.butler) 

338 pipeline = Pipeline.fromFile(self.pipeline_file) 

339 self.butler.put({"zero": 0}, "input") 

340 graph = executor.build_quantum_graph(pipeline) 

341 

342 executor.pre_execute_qgraph( 

343 graph, 

344 register_dataset_types=True, 

345 save_init_outputs=False, 

346 save_versions=False, 

347 ) 

348 self.assertEqual({d.name for d in self.butler.registry.queryDatasetTypes("output")}, {"output"}) 

349 self.assertEqual( 

350 {d.name for d in self.butler.registry.queryDatasetTypes("b_*")}, 

351 {"b_config", "b_log", "b_metadata"}, 

352 ) 

353 self.assertFalse(self.butler.exists("a_config", {}, collections=[self.butler.run])) 

354 self.assertFalse(self.butler.exists(PACKAGES_INIT_OUTPUT_NAME, {})) 

355 

356 def test_pre_execute_qgraph_init_outputs_old(self): 

357 # Too hard to make a quantum graph from scratch. 

358 executor = SeparablePipelineExecutor(self.butler) 

359 pipeline = Pipeline.fromFile(self.pipeline_file) 

360 self.butler.put({"zero": 0}, "input") 

361 graph = executor.make_quantum_graph(pipeline) 

362 

363 butlerTests.addDatasetType(self.butler, "output", set(), "StructuredDataDict") 

364 butlerTests.addDatasetType(self.butler, "b_config", set(), "Config") 

365 butlerTests.addDatasetType(self.butler, "b_log", set(), "ButlerLogRecords") 

366 butlerTests.addDatasetType(self.butler, "b_metadata", set(), "TaskMetadata") 

367 butlerTests.addDatasetType(self.butler, PACKAGES_INIT_OUTPUT_NAME, set(), "Packages") 

368 

369 executor.pre_execute_qgraph( 

370 graph, 

371 register_dataset_types=False, 

372 save_init_outputs=True, 

373 save_versions=False, 

374 ) 

375 self.assertTrue(self.butler.exists("a_config", {}, collections=[self.butler.run])) 

376 self.assertFalse(self.butler.exists(PACKAGES_INIT_OUTPUT_NAME, {})) 

377 

378 def test_pre_execute_qgraph_init_outputs(self): 

379 # Too hard to make a quantum graph from scratch. 

380 executor = SeparablePipelineExecutor(self.butler) 

381 pipeline = Pipeline.fromFile(self.pipeline_file) 

382 self.butler.put({"zero": 0}, "input") 

383 graph = executor.build_quantum_graph(pipeline) 

384 

385 butlerTests.addDatasetType(self.butler, "output", set(), "StructuredDataDict") 

386 butlerTests.addDatasetType(self.butler, "b_config", set(), "Config") 

387 butlerTests.addDatasetType(self.butler, "b_log", set(), "ButlerLogRecords") 

388 butlerTests.addDatasetType(self.butler, "b_metadata", set(), "TaskMetadata") 

389 butlerTests.addDatasetType(self.butler, PACKAGES_INIT_OUTPUT_NAME, set(), "Packages") 

390 

391 executor.pre_execute_qgraph( 

392 graph, 

393 register_dataset_types=False, 

394 save_init_outputs=True, 

395 save_versions=False, 

396 ) 

397 self.assertTrue(self.butler.exists("a_config", {}, collections=[self.butler.run])) 

398 self.assertFalse(self.butler.exists(PACKAGES_INIT_OUTPUT_NAME, {})) 

399 

400 def test_pre_execute_qgraph_versions_old(self): 

401 executor = SeparablePipelineExecutor(self.butler) 

402 pipeline = Pipeline.fromFile(self.pipeline_file) 

403 self.butler.put({"zero": 0}, "input") 

404 graph = executor.make_quantum_graph(pipeline) 

405 

406 butlerTests.addDatasetType(self.butler, "output", set(), "StructuredDataDict") 

407 butlerTests.addDatasetType(self.butler, "b_config", set(), "Config") 

408 butlerTests.addDatasetType(self.butler, "b_log", set(), "ButlerLogRecords") 

409 butlerTests.addDatasetType(self.butler, "b_metadata", set(), "TaskMetadata") 

410 butlerTests.addDatasetType(self.butler, PACKAGES_INIT_OUTPUT_NAME, set(), "Packages") 

411 

412 executor.pre_execute_qgraph( 

413 graph, 

414 register_dataset_types=False, 

415 save_init_outputs=True, 

416 save_versions=True, 

417 ) 

418 self.assertTrue(self.butler.exists("a_config", {}, collections=[self.butler.run])) 

419 self.assertTrue(self.butler.exists(PACKAGES_INIT_OUTPUT_NAME, {})) 

420 

421 def test_pre_execute_qgraph_versions(self): 

422 executor = SeparablePipelineExecutor(self.butler) 

423 pipeline = Pipeline.fromFile(self.pipeline_file) 

424 self.butler.put({"zero": 0}, "input") 

425 graph = executor.build_quantum_graph(pipeline) 

426 

427 butlerTests.addDatasetType(self.butler, "output", set(), "StructuredDataDict") 

428 butlerTests.addDatasetType(self.butler, "b_config", set(), "Config") 

429 butlerTests.addDatasetType(self.butler, "b_log", set(), "ButlerLogRecords") 

430 butlerTests.addDatasetType(self.butler, "b_metadata", set(), "TaskMetadata") 

431 butlerTests.addDatasetType(self.butler, PACKAGES_INIT_OUTPUT_NAME, set(), "Packages") 

432 

433 executor.pre_execute_qgraph( 

434 graph, 

435 register_dataset_types=False, 

436 save_init_outputs=True, 

437 save_versions=True, 

438 ) 

439 self.assertTrue(self.butler.exists("a_config", {}, collections=[self.butler.run])) 

440 self.assertTrue(self.butler.exists(PACKAGES_INIT_OUTPUT_NAME, {})) 

441 

442 def test_init_badinput(self): 

443 with lsst.daf.butler.Butler.from_config(butler=self.butler, collections=[], run="foo") as butler: 

444 with self.assertRaises(ValueError): 

445 SeparablePipelineExecutor(butler) 

446 

447 def test_init_badoutput(self): 

448 with lsst.daf.butler.Butler.from_config(butler=self.butler, collections=["foo"]) as butler: 

449 with self.assertRaises(ValueError): 

450 SeparablePipelineExecutor(butler) 

451 

452 def test_make_pipeline_full(self): 

453 executor = SeparablePipelineExecutor(self.butler) 

454 for uri in [ 

455 self.pipeline_file, 

456 ResourcePath(self.pipeline_file), 

457 ResourcePath(self.pipeline_file).geturl(), 

458 ]: 

459 pipeline_graph = executor.make_pipeline(uri).to_graph() 

460 self.assertEqual(set(pipeline_graph.tasks), {"a", "b"}) 

461 

462 def test_make_pipeline_subset(self): 

463 executor = SeparablePipelineExecutor(self.butler) 

464 path = self.pipeline_file + "#a" 

465 for uri in [ 

466 path, 

467 ResourcePath(path), 

468 ResourcePath(path).geturl(), 

469 ]: 

470 pipeline_graph = executor.make_pipeline(uri).to_graph() 

471 self.assertEqual(set(pipeline_graph.tasks), {"a"}) 

472 

473 def test_build_quantum_graph_nowhere_noskip_noclobber(self): 

474 executor = SeparablePipelineExecutor(self.butler, skip_existing_in=None, clobber_output=False) 

475 pipeline = Pipeline.fromFile(self.pipeline_file) 

476 

477 self.butler.put({"zero": 0}, "input") 

478 

479 graph = executor.make_quantum_graph(pipeline) 

480 self.assertTrue(graph.isConnected) 

481 self.assertEqual(len(graph), 2) 

482 self.assertEqual({q.taskDef.label for q in graph.inputQuanta}, {"a"}) 

483 self.assertEqual({q.taskDef.label for q in graph.outputQuanta}, {"b"}) 

484 

485 def test_make_quantum_graph_nowhere_noskip_noclobber(self): 

486 executor = SeparablePipelineExecutor(self.butler, skip_existing_in=None, clobber_output=False) 

487 pipeline = Pipeline.fromFile(self.pipeline_file) 

488 

489 self.butler.put({"zero": 0}, "input") 

490 

491 graph = executor.build_quantum_graph(pipeline) 

492 self.assertEqual(len(graph), 2) 

493 self.assertEqual(graph.quanta_by_task.keys(), {"a", "b"}) 

494 

495 def test_make_quantum_graph_nowhere_noskip_noclobber_conflict(self): 

496 executor = SeparablePipelineExecutor(self.butler, skip_existing_in=None, clobber_output=False) 

497 pipeline = Pipeline.fromFile(self.pipeline_file) 

498 

499 self.butler.put({"zero": 0}, "input") 

500 self.butler.put({"zero": 0}, "intermediate") 

501 self.butler.put(lsst.daf.butler.ButlerLogRecords.from_records([]), "a_log") 

502 self.butler.put(TaskMetadata(), "a_metadata") 

503 

504 with self.assertRaises(OutputExistsError): 

505 executor.build_quantum_graph(pipeline) 

506 

507 # TODO: need more complex task and Butler to test 

508 # make_quantum_graph(where=...) 

509 

510 def test_build_quantum_graph_nowhere_skipnone_noclobber(self): 

511 executor = SeparablePipelineExecutor( 

512 self.butler, 

513 skip_existing_in=[self.butler.run], 

514 clobber_output=False, 

515 ) 

516 pipeline = Pipeline.fromFile(self.pipeline_file) 

517 

518 self.butler.put({"zero": 0}, "input") 

519 

520 graph = executor.make_quantum_graph(pipeline) 

521 self.assertTrue(graph.isConnected) 

522 self.assertEqual(len(graph), 2) 

523 self.assertEqual({q.taskDef.label for q in graph.inputQuanta}, {"a"}) 

524 self.assertEqual({q.taskDef.label for q in graph.outputQuanta}, {"b"}) 

525 

526 def test_make_quantum_graph_nowhere_skipnone_noclobber(self): 

527 executor = SeparablePipelineExecutor( 

528 self.butler, 

529 skip_existing_in=[self.butler.run], 

530 clobber_output=False, 

531 ) 

532 pipeline = Pipeline.fromFile(self.pipeline_file) 

533 self.butler.put({"zero": 0}, "input") 

534 graph = executor.build_quantum_graph(pipeline) 

535 self.assertEqual(len(graph), 2) 

536 self.assertEqual(graph.quanta_by_task.keys(), {"a", "b"}) 

537 

538 def test_build_quantum_graph_nowhere_skiptotal_noclobber(self): 

539 executor = SeparablePipelineExecutor( 

540 self.butler, 

541 skip_existing_in=[self.butler.run], 

542 clobber_output=False, 

543 ) 

544 pipeline = Pipeline.fromFile(self.pipeline_file) 

545 

546 self.butler.put({"zero": 0}, "input") 

547 self.butler.put({"zero": 0}, "intermediate") 

548 self.butler.put(lsst.daf.butler.ButlerLogRecords.from_records([]), "a_log") 

549 self.butler.put(TaskMetadata(), "a_metadata") 

550 self.butler.put(lsst.pex.config.Config(), "a_config") 

551 

552 graph = executor.make_quantum_graph(pipeline) 

553 self.assertTrue(graph.isConnected) 

554 self.assertEqual(len(graph), 1) 

555 self.assertEqual({q.taskDef.label for q in graph.inputQuanta}, {"b"}) 

556 self.assertEqual({q.taskDef.label for q in graph.outputQuanta}, {"b"}) 

557 

558 def test_make_quantum_graph_nowhere_skiptotal_noclobber(self): 

559 executor = SeparablePipelineExecutor( 

560 self.butler, 

561 skip_existing_in=[self.butler.run], 

562 clobber_output=False, 

563 ) 

564 pipeline = Pipeline.fromFile(self.pipeline_file) 

565 

566 self.butler.put({"zero": 0}, "input") 

567 self.butler.put({"zero": 0}, "intermediate") 

568 self.butler.put(lsst.daf.butler.ButlerLogRecords.from_records([]), "a_log") 

569 self.butler.put(TaskMetadata(), "a_metadata") 

570 self.butler.put(lsst.pex.config.Config(), "a_config") 

571 

572 graph = executor.build_quantum_graph(pipeline) 

573 self.assertEqual(len(graph), 1) 

574 self.assertEqual(graph.header.n_task_quanta["a"], 0) 

575 self.assertEqual(graph.header.n_task_quanta["b"], 1) 

576 

577 def test_make_quantum_graph_nowhere_skippartial_noclobber(self): 

578 executor = SeparablePipelineExecutor( 

579 self.butler, 

580 skip_existing_in=[self.butler.run], 

581 clobber_output=False, 

582 ) 

583 pipeline = Pipeline.fromFile(self.pipeline_file) 

584 

585 self.butler.put({"zero": 0}, "input") 

586 self.butler.put({"zero": 0}, "intermediate") 

587 

588 with self.assertRaises(OutputExistsError): 

589 executor.build_quantum_graph(pipeline) 

590 

591 def test_build_quantum_graph_nowhere_noskip_clobber(self): 

592 executor = SeparablePipelineExecutor(self.butler, skip_existing_in=None, clobber_output=True) 

593 pipeline = Pipeline.fromFile(self.pipeline_file) 

594 

595 self.butler.put({"zero": 0}, "input") 

596 

597 graph = executor.make_quantum_graph(pipeline) 

598 self.assertTrue(graph.isConnected) 

599 self.assertEqual(len(graph), 2) 

600 self.assertEqual({q.taskDef.label for q in graph.inputQuanta}, {"a"}) 

601 self.assertEqual({q.taskDef.label for q in graph.outputQuanta}, {"b"}) 

602 

603 def test_make_quantum_graph_nowhere_noskip_clobber(self): 

604 executor = SeparablePipelineExecutor(self.butler, skip_existing_in=None, clobber_output=True) 

605 pipeline = Pipeline.fromFile(self.pipeline_file) 

606 self.butler.put({"zero": 0}, "input") 

607 graph = executor.build_quantum_graph(pipeline) 

608 self.assertEqual(len(graph), 2) 

609 self.assertEqual(graph.quanta_by_task.keys(), {"a", "b"}) 

610 

611 def test_build_quantum_graph_nowhere_noskip_clobber_conflict(self): 

612 executor = SeparablePipelineExecutor(self.butler, skip_existing_in=None, clobber_output=True) 

613 pipeline = Pipeline.fromFile(self.pipeline_file) 

614 

615 self.butler.put({"zero": 0}, "input") 

616 self.butler.put({"zero": 0}, "intermediate") 

617 self.butler.put(lsst.daf.butler.ButlerLogRecords.from_records([]), "a_log") 

618 self.butler.put(TaskMetadata(), "a_metadata") 

619 

620 graph = executor.make_quantum_graph(pipeline) 

621 self.assertTrue(graph.isConnected) 

622 self.assertEqual(len(graph), 2) 

623 self.assertEqual({q.taskDef.label for q in graph.inputQuanta}, {"a"}) 

624 self.assertEqual({q.taskDef.label for q in graph.outputQuanta}, {"b"}) 

625 

626 def test_make_quantum_graph_nowhere_noskip_clobber_conflict(self): 

627 executor = SeparablePipelineExecutor(self.butler, skip_existing_in=None, clobber_output=True) 

628 pipeline = Pipeline.fromFile(self.pipeline_file) 

629 self.butler.put({"zero": 0}, "input") 

630 self.butler.put({"zero": 0}, "intermediate") 

631 self.butler.put(lsst.daf.butler.ButlerLogRecords.from_records([]), "a_log") 

632 self.butler.put(TaskMetadata(), "a_metadata") 

633 graph = executor.build_quantum_graph(pipeline) 

634 self.assertEqual(len(graph), 2) 

635 self.assertEqual(graph.quanta_by_task.keys(), {"a", "b"}) 

636 

637 def test_build_quantum_graph_nowhere_skipnone_clobber(self): 

638 executor = SeparablePipelineExecutor( 

639 self.butler, 

640 skip_existing_in=[self.butler.run], 

641 clobber_output=True, 

642 ) 

643 pipeline = Pipeline.fromFile(self.pipeline_file) 

644 

645 self.butler.put({"zero": 0}, "input") 

646 

647 graph = executor.make_quantum_graph(pipeline) 

648 self.assertTrue(graph.isConnected) 

649 self.assertEqual(len(graph), 2) 

650 self.assertEqual({q.taskDef.label for q in graph.inputQuanta}, {"a"}) 

651 self.assertEqual({q.taskDef.label for q in graph.outputQuanta}, {"b"}) 

652 

653 def test_make_quantum_graph_nowhere_skipnone_clobber(self): 

654 executor = SeparablePipelineExecutor( 

655 self.butler, 

656 skip_existing_in=[self.butler.run], 

657 clobber_output=True, 

658 ) 

659 pipeline = Pipeline.fromFile(self.pipeline_file) 

660 self.butler.put({"zero": 0}, "input") 

661 graph = executor.build_quantum_graph(pipeline) 

662 self.assertEqual(len(graph), 2) 

663 self.assertEqual(graph.quanta_by_task.keys(), {"a", "b"}) 

664 

665 def test_make_quantum_graph_nowhere_skiptotal_clobber(self): 

666 executor = SeparablePipelineExecutor( 

667 self.butler, 

668 skip_existing_in=[self.butler.run], 

669 clobber_output=True, 

670 ) 

671 pipeline = Pipeline.fromFile(self.pipeline_file) 

672 

673 self.butler.put({"zero": 0}, "input") 

674 self.butler.put({"zero": 0}, "intermediate") 

675 self.butler.put(lsst.daf.butler.ButlerLogRecords.from_records([]), "a_log") 

676 self.butler.put(TaskMetadata(), "a_metadata") 

677 self.butler.put(lsst.pex.config.Config(), "a_config") 

678 

679 graph = executor.make_quantum_graph(pipeline) 

680 self.assertTrue(graph.isConnected) 

681 self.assertEqual(len(graph), 1) 

682 self.assertEqual({q.taskDef.label for q in graph.inputQuanta}, {"b"}) 

683 self.assertEqual({q.taskDef.label for q in graph.outputQuanta}, {"b"}) 

684 

685 def test_build_quantum_graph_nowhere_skiptotal_clobber(self): 

686 executor = SeparablePipelineExecutor( 

687 self.butler, 

688 skip_existing_in=[self.butler.run], 

689 clobber_output=True, 

690 ) 

691 pipeline = Pipeline.fromFile(self.pipeline_file) 

692 

693 self.butler.put({"zero": 0}, "input") 

694 self.butler.put({"zero": 0}, "intermediate") 

695 self.butler.put(lsst.daf.butler.ButlerLogRecords.from_records([]), "a_log") 

696 self.butler.put(TaskMetadata(), "a_metadata") 

697 self.butler.put(lsst.pex.config.Config(), "a_config") 

698 

699 graph = executor.make_quantum_graph(pipeline) 

700 self.assertTrue(graph.isConnected) 

701 self.assertEqual(len(graph), 1) 

702 self.assertEqual({q.taskDef.label for q in graph.inputQuanta}, {"b"}) 

703 self.assertEqual({q.taskDef.label for q in graph.outputQuanta}, {"b"}) 

704 

705 def test_make_quantum_graph_nowhere_skippartial_clobber(self): 

706 executor = SeparablePipelineExecutor( 

707 self.butler, 

708 skip_existing_in=[self.butler.run], 

709 clobber_output=True, 

710 ) 

711 pipeline = Pipeline.fromFile(self.pipeline_file) 

712 self.butler.put({"zero": 0}, "input") 

713 self.butler.put({"zero": 0}, "intermediate") 

714 graph = executor.build_quantum_graph(pipeline) 

715 self.assertEqual(len(graph), 2) 

716 self.assertEqual(graph.quanta_by_task.keys(), {"a", "b"}) 

717 

718 def test_make_quantum_graph_noinput(self): 

719 executor = SeparablePipelineExecutor(self.butler) 

720 pipeline = Pipeline.fromFile(self.pipeline_file) 

721 

722 graph = executor.make_quantum_graph(pipeline) 

723 self.assertEqual(len(graph), 0) 

724 

725 def test_build_quantum_graph_noinput(self): 

726 executor = SeparablePipelineExecutor(self.butler) 

727 pipeline = Pipeline.fromFile(self.pipeline_file) 

728 

729 graph = executor.build_quantum_graph(pipeline) 

730 self.assertEqual(len(graph), 0) 

731 

732 def test_make_quantum_graph_alloutput_skip(self): 

733 executor = SeparablePipelineExecutor(self.butler, skip_existing_in=[self.butler.run]) 

734 pipeline = Pipeline.fromFile(self.pipeline_file) 

735 

736 butlerTests.addDatasetType(self.butler, "output", set(), "StructuredDataDict") 

737 butlerTests.addDatasetType(self.butler, "b_log", set(), "ButlerLogRecords") 

738 butlerTests.addDatasetType(self.butler, "b_metadata", set(), "TaskMetadata") 

739 butlerTests.addDatasetType(self.butler, "b_config", set(), "Config") 

740 

741 self.butler.put({"zero": 0}, "input") 

742 self.butler.put({"zero": 0}, "intermediate") 

743 self.butler.put(lsst.daf.butler.ButlerLogRecords.from_records([]), "a_log") 

744 self.butler.put(TaskMetadata(), "a_metadata") 

745 self.butler.put({"zero": 0}, "output") 

746 self.butler.put(lsst.daf.butler.ButlerLogRecords.from_records([]), "b_log") 

747 self.butler.put(TaskMetadata(), "b_metadata") 

748 self.butler.put(lsst.pex.config.Config(), "a_config") 

749 self.butler.put(lsst.pex.config.Config(), "b_config") 

750 

751 graph = executor.make_quantum_graph(pipeline) 

752 self.assertEqual(len(graph), 0) 

753 

754 def test_build_quantum_graph_alloutput_skip(self): 

755 executor = SeparablePipelineExecutor(self.butler, skip_existing_in=[self.butler.run]) 

756 pipeline = Pipeline.fromFile(self.pipeline_file) 

757 

758 butlerTests.addDatasetType(self.butler, "output", set(), "StructuredDataDict") 

759 butlerTests.addDatasetType(self.butler, "b_log", set(), "ButlerLogRecords") 

760 butlerTests.addDatasetType(self.butler, "b_metadata", set(), "TaskMetadata") 

761 butlerTests.addDatasetType(self.butler, "b_config", set(), "Config") 

762 

763 self.butler.put({"zero": 0}, "input") 

764 self.butler.put({"zero": 0}, "intermediate") 

765 self.butler.put(lsst.daf.butler.ButlerLogRecords.from_records([]), "a_log") 

766 self.butler.put(TaskMetadata(), "a_metadata") 

767 self.butler.put({"zero": 0}, "output") 

768 self.butler.put(lsst.daf.butler.ButlerLogRecords.from_records([]), "b_log") 

769 self.butler.put(TaskMetadata(), "b_metadata") 

770 self.butler.put(lsst.pex.config.Config(), "a_config") 

771 self.butler.put(lsst.pex.config.Config(), "b_config") 

772 

773 graph = executor.build_quantum_graph(pipeline) 

774 self.assertEqual(len(graph), 0) 

775 

776 def test_run_pipeline_noskip_noclobber_fullgraph(self): 

777 executor = SeparablePipelineExecutor(self.butler, skip_existing_in=None, clobber_output=False) 

778 pipeline = Pipeline.fromFile(self.pipeline_file) 

779 self.butler.put({"zero": 0}, "input") 

780 graph = executor.make_quantum_graph(pipeline) 

781 executor.pre_execute_qgraph( 

782 graph, 

783 register_dataset_types=True, 

784 save_init_outputs=True, 

785 save_versions=False, 

786 ) 

787 

788 executor.run_pipeline(graph, provenance_dataset_ref=self.provenance_ref) 

789 self.butler.registry.refresh() 

790 self.assertEqual(self.butler.get("intermediate"), {"zero": 0, "one": 1}) 

791 self.assertEqual(self.butler.get("output"), {"zero": 0, "one": 1, "two": 2}) 

792 self.check_provenance_fullgraph() 

793 

794 def test_run_pipeline_noskip_noclobber_fullgraph_old(self): 

795 executor = SeparablePipelineExecutor(self.butler, skip_existing_in=None, clobber_output=False) 

796 pipeline = Pipeline.fromFile(self.pipeline_file) 

797 self.butler.put({"zero": 0}, "input") 

798 graph = executor.build_quantum_graph(pipeline) 

799 executor.pre_execute_qgraph( 

800 graph, 

801 register_dataset_types=True, 

802 save_init_outputs=True, 

803 save_versions=False, 

804 ) 

805 

806 executor.run_pipeline(graph, provenance_dataset_ref=self.provenance_ref) 

807 self.butler.registry.refresh() 

808 self.assertEqual(self.butler.get("intermediate"), {"zero": 0, "one": 1}) 

809 self.assertEqual(self.butler.get("output"), {"zero": 0, "one": 1, "two": 2}) 

810 self.check_provenance_fullgraph() 

811 

812 def test_run_pipeline_noskip_noclobber_emptygraph_old(self): 

813 old_repo_size = self.butler.registry.queryDatasets(...).count() 

814 

815 executor = SeparablePipelineExecutor(self.butler, skip_existing_in=None, clobber_output=False) 

816 graph = QuantumGraph({}, universe=self.butler.dimensions) 

817 executor.pre_execute_qgraph( 

818 graph, 

819 register_dataset_types=True, 

820 save_init_outputs=True, 

821 save_versions=False, 

822 ) 

823 

824 executor.run_pipeline(graph, provenance_dataset_ref=self.provenance_ref) 

825 self.butler.registry.refresh() 

826 # Empty graph execution should do nothing but write provenance. 

827 self.assertEqual(self.butler.registry.queryDatasets(...).count(), old_repo_size + 1) 

828 self.check_provenance_emptygraph() 

829 

830 def test_run_pipeline_noskip_noclobber_emptygraph(self): 

831 old_repo_size = self.butler.registry.queryDatasets(...).count() 

832 

833 executor = SeparablePipelineExecutor(self.butler, skip_existing_in=None, clobber_output=False) 

834 graph = self.build_empty_quantum_graph() 

835 executor.pre_execute_qgraph( 

836 graph, 

837 register_dataset_types=True, 

838 save_init_outputs=True, 

839 save_versions=False, 

840 ) 

841 

842 executor.run_pipeline(graph, provenance_dataset_ref=self.provenance_ref) 

843 self.butler.registry.refresh() 

844 # Empty graph execution should do nothing but write provenance. 

845 self.assertEqual(self.butler.registry.queryDatasets(...).count(), old_repo_size + 1) 

846 self.check_provenance_emptygraph() 

847 

848 def test_run_pipeline_skipnone_noclobber_old(self): 

849 executor = SeparablePipelineExecutor( 

850 self.butler, 

851 skip_existing_in=[self.butler.run], 

852 clobber_output=False, 

853 ) 

854 pipeline = Pipeline.fromFile(self.pipeline_file) 

855 self.butler.put({"zero": 0}, "input") 

856 graph = executor.make_quantum_graph(pipeline) 

857 executor.pre_execute_qgraph( 

858 graph, 

859 register_dataset_types=True, 

860 save_init_outputs=True, 

861 save_versions=False, 

862 ) 

863 

864 executor.run_pipeline(graph) 

865 self.butler.registry.refresh() 

866 self.assertEqual(self.butler.get("intermediate"), {"zero": 0, "one": 1}) 

867 self.assertEqual(self.butler.get("output"), {"zero": 0, "one": 1, "two": 2}) 

868 

869 def test_run_pipeline_skipnone_noclobber(self): 

870 executor = SeparablePipelineExecutor( 

871 self.butler, 

872 skip_existing_in=[self.butler.run], 

873 clobber_output=False, 

874 ) 

875 pipeline = Pipeline.fromFile(self.pipeline_file) 

876 self.butler.put({"zero": 0}, "input") 

877 graph = executor.build_quantum_graph(pipeline) 

878 executor.pre_execute_qgraph( 

879 graph, 

880 register_dataset_types=True, 

881 save_init_outputs=True, 

882 save_versions=False, 

883 ) 

884 

885 executor.run_pipeline(graph) 

886 self.butler.registry.refresh() 

887 self.assertEqual(self.butler.get("intermediate"), {"zero": 0, "one": 1}) 

888 self.assertEqual(self.butler.get("output"), {"zero": 0, "one": 1, "two": 2}) 

889 

890 def test_run_pipeline_skiptotal_noclobber_old(self): 

891 executor = SeparablePipelineExecutor( 

892 self.butler, 

893 skip_existing_in=[self.butler.run], 

894 clobber_output=False, 

895 ) 

896 pipeline = Pipeline.fromFile(self.pipeline_file) 

897 self.butler.put({"zero": 0}, "input") 

898 self.butler.put({"zero": 0}, "intermediate") 

899 self.butler.put(lsst.daf.butler.ButlerLogRecords.from_records([]), "a_log") 

900 self.butler.put(TaskMetadata(), "a_metadata") 

901 self.butler.put(lsst.pex.config.Config(), "a_config") 

902 graph = executor.make_quantum_graph(pipeline) 

903 executor.pre_execute_qgraph( 

904 graph, 

905 register_dataset_types=True, 

906 save_init_outputs=True, 

907 save_versions=False, 

908 ) 

909 

910 executor.run_pipeline(graph) 

911 self.butler.registry.refresh() 

912 self.assertEqual(self.butler.get("output"), {"zero": 0, "two": 2}) 

913 

914 def test_run_pipeline_skiptotal_noclobber(self): 

915 executor = SeparablePipelineExecutor( 

916 self.butler, 

917 skip_existing_in=[self.butler.run], 

918 clobber_output=False, 

919 ) 

920 pipeline = Pipeline.fromFile(self.pipeline_file) 

921 self.butler.put({"zero": 0}, "input") 

922 self.butler.put({"zero": 0}, "intermediate") 

923 self.butler.put(lsst.daf.butler.ButlerLogRecords.from_records([]), "a_log") 

924 self.butler.put(TaskMetadata(), "a_metadata") 

925 self.butler.put(lsst.pex.config.Config(), "a_config") 

926 graph = executor.build_quantum_graph(pipeline) 

927 executor.pre_execute_qgraph( 

928 graph, 

929 register_dataset_types=True, 

930 save_init_outputs=True, 

931 save_versions=False, 

932 ) 

933 

934 executor.run_pipeline(graph) 

935 self.butler.registry.refresh() 

936 self.assertEqual(self.butler.get("output"), {"zero": 0, "two": 2}) 

937 

938 def test_run_pipeline_noskip_clobber_connected_old(self): 

939 executor = SeparablePipelineExecutor(self.butler, skip_existing_in=None, clobber_output=True) 

940 pipeline = Pipeline.fromFile(self.pipeline_file) 

941 self.butler.put({"zero": 0}, "input") 

942 graph = executor.make_quantum_graph(pipeline) 

943 executor.pre_execute_qgraph( 

944 graph, 

945 register_dataset_types=True, 

946 save_init_outputs=True, 

947 save_versions=False, 

948 ) 

949 

950 executor.run_pipeline(graph, provenance_dataset_ref=self.provenance_ref) 

951 self.butler.registry.refresh() 

952 self.assertEqual(self.butler.get("intermediate"), {"zero": 0, "one": 1}) 

953 self.assertEqual(self.butler.get("output"), {"zero": 0, "one": 1, "two": 2}) 

954 self.check_provenance_fullgraph() 

955 

956 def test_run_pipeline_noskip_clobber_connected(self): 

957 executor = SeparablePipelineExecutor(self.butler, skip_existing_in=None, clobber_output=True) 

958 pipeline = Pipeline.fromFile(self.pipeline_file) 

959 self.butler.put({"zero": 0}, "input") 

960 graph = executor.build_quantum_graph(pipeline) 

961 executor.pre_execute_qgraph( 

962 graph, 

963 register_dataset_types=True, 

964 save_init_outputs=True, 

965 save_versions=False, 

966 ) 

967 

968 executor.run_pipeline(graph, provenance_dataset_ref=self.provenance_ref) 

969 self.butler.registry.refresh() 

970 self.assertEqual(self.butler.get("intermediate"), {"zero": 0, "one": 1}) 

971 self.assertEqual(self.butler.get("output"), {"zero": 0, "one": 1, "two": 2}) 

972 self.check_provenance_fullgraph() 

973 

974 def test_run_pipeline_noskip_clobber_unconnected_old(self): 

975 executor = SeparablePipelineExecutor(self.butler, skip_existing_in=None, clobber_output=True) 

976 pipeline = Pipeline.fromFile(self.pipeline_file) 

977 self.butler.put({"zero": 0}, "input") 

978 self.butler.put({"zero": 0}, "intermediate") 

979 self.butler.put(lsst.daf.butler.ButlerLogRecords.from_records([]), "a_log") 

980 self.butler.put(TaskMetadata(), "a_metadata") 

981 graph = executor.make_quantum_graph(pipeline) 

982 executor.pre_execute_qgraph( 

983 graph, 

984 register_dataset_types=True, 

985 save_init_outputs=True, 

986 save_versions=False, 

987 ) 

988 

989 executor.run_pipeline(graph, provenance_dataset_ref=self.provenance_ref) 

990 self.butler.registry.refresh() 

991 self.assertEqual(self.butler.get("intermediate"), {"zero": 0, "one": 1}) 

992 # The value of output is undefined; it depends on which task ran first. 

993 self.assertTrue(self.butler.exists("output", {})) 

994 

995 def test_run_pipeline_noskip_clobber_unconnected(self): 

996 executor = SeparablePipelineExecutor(self.butler, skip_existing_in=None, clobber_output=True) 

997 pipeline = Pipeline.fromFile(self.pipeline_file) 

998 self.butler.put({"zero": 0}, "input") 

999 self.butler.put({"zero": 0}, "intermediate") 

1000 self.butler.put(lsst.daf.butler.ButlerLogRecords.from_records([]), "a_log") 

1001 self.butler.put(TaskMetadata(), "a_metadata") 

1002 graph = executor.build_quantum_graph(pipeline) 

1003 executor.pre_execute_qgraph( 

1004 graph, 

1005 register_dataset_types=True, 

1006 save_init_outputs=True, 

1007 save_versions=False, 

1008 ) 

1009 

1010 executor.run_pipeline(graph, provenance_dataset_ref=self.provenance_ref) 

1011 self.butler.registry.refresh() 

1012 self.assertEqual(self.butler.get("intermediate"), {"zero": 0, "one": 1}) 

1013 # The value of output is undefined; it depends on which task ran first. 

1014 self.assertTrue(self.butler.exists("output", {})) 

1015 self.check_provenance_fullgraph() 

1016 

1017 def test_run_pipeline_skipnone_clobber_old(self): 

1018 executor = SeparablePipelineExecutor( 

1019 self.butler, 

1020 skip_existing_in=[self.butler.run], 

1021 clobber_output=True, 

1022 ) 

1023 pipeline = Pipeline.fromFile(self.pipeline_file) 

1024 self.butler.put({"zero": 0}, "input") 

1025 graph = executor.make_quantum_graph(pipeline) 

1026 executor.pre_execute_qgraph( 

1027 graph, 

1028 register_dataset_types=True, 

1029 save_init_outputs=True, 

1030 save_versions=False, 

1031 ) 

1032 

1033 executor.run_pipeline(graph) 

1034 self.butler.registry.refresh() 

1035 self.assertEqual(self.butler.get("intermediate"), {"zero": 0, "one": 1}) 

1036 self.assertEqual(self.butler.get("output"), {"zero": 0, "one": 1, "two": 2}) 

1037 

1038 def test_run_pipeline_skipnone_clobber(self): 

1039 executor = SeparablePipelineExecutor( 

1040 self.butler, 

1041 skip_existing_in=[self.butler.run], 

1042 clobber_output=True, 

1043 ) 

1044 pipeline = Pipeline.fromFile(self.pipeline_file) 

1045 self.butler.put({"zero": 0}, "input") 

1046 graph = executor.build_quantum_graph(pipeline) 

1047 executor.pre_execute_qgraph( 

1048 graph, 

1049 register_dataset_types=True, 

1050 save_init_outputs=True, 

1051 save_versions=False, 

1052 ) 

1053 

1054 executor.run_pipeline(graph) 

1055 self.butler.registry.refresh() 

1056 self.assertEqual(self.butler.get("intermediate"), {"zero": 0, "one": 1}) 

1057 self.assertEqual(self.butler.get("output"), {"zero": 0, "one": 1, "two": 2}) 

1058 

1059 def test_run_pipeline_skiptotal_clobber_connected_old(self): 

1060 executor = SeparablePipelineExecutor( 

1061 self.butler, 

1062 skip_existing_in=[self.butler.run], 

1063 clobber_output=True, 

1064 ) 

1065 pipeline = Pipeline.fromFile(self.pipeline_file) 

1066 self.butler.put({"zero": 0}, "input") 

1067 self.butler.put({"zero": 0}, "intermediate") 

1068 self.butler.put(lsst.daf.butler.ButlerLogRecords.from_records([]), "a_log") 

1069 self.butler.put(TaskMetadata(), "a_metadata") 

1070 self.butler.put(lsst.pex.config.Config(), "a_config") 

1071 graph = executor.make_quantum_graph(pipeline) 

1072 executor.pre_execute_qgraph( 

1073 graph, 

1074 register_dataset_types=True, 

1075 save_init_outputs=True, 

1076 save_versions=False, 

1077 ) 

1078 

1079 executor.run_pipeline(graph) 

1080 self.butler.registry.refresh() 

1081 self.assertEqual(self.butler.get("output"), {"zero": 0, "two": 2}) 

1082 

1083 def test_run_pipeline_skiptotal_clobber_connected(self): 

1084 executor = SeparablePipelineExecutor( 

1085 self.butler, 

1086 skip_existing_in=[self.butler.run], 

1087 clobber_output=True, 

1088 ) 

1089 pipeline = Pipeline.fromFile(self.pipeline_file) 

1090 self.butler.put({"zero": 0}, "input") 

1091 self.butler.put({"zero": 0}, "intermediate") 

1092 self.butler.put(lsst.daf.butler.ButlerLogRecords.from_records([]), "a_log") 

1093 self.butler.put(TaskMetadata(), "a_metadata") 

1094 self.butler.put(lsst.pex.config.Config(), "a_config") 

1095 graph = executor.build_quantum_graph(pipeline) 

1096 executor.pre_execute_qgraph( 

1097 graph, 

1098 register_dataset_types=True, 

1099 save_init_outputs=True, 

1100 save_versions=False, 

1101 ) 

1102 

1103 executor.run_pipeline(graph) 

1104 self.butler.registry.refresh() 

1105 self.assertEqual(self.butler.get("output"), {"zero": 0, "two": 2}) 

1106 

1107 def test_run_pipeline_skippartial_clobber_unconnected_old(self): 

1108 executor = SeparablePipelineExecutor( 

1109 self.butler, 

1110 skip_existing_in=[self.butler.run], 

1111 clobber_output=True, 

1112 ) 

1113 pipeline = Pipeline.fromFile(self.pipeline_file) 

1114 self.butler.put({"zero": 0}, "input") 

1115 self.butler.put({"zero": 0}, "intermediate") 

1116 graph = executor.make_quantum_graph(pipeline) 

1117 executor.pre_execute_qgraph( 

1118 graph, 

1119 register_dataset_types=True, 

1120 save_init_outputs=True, 

1121 save_versions=False, 

1122 ) 

1123 

1124 executor.run_pipeline(graph) 

1125 self.butler.registry.refresh() 

1126 self.assertEqual(self.butler.get("intermediate"), {"zero": 0, "one": 1}) 

1127 # The value of output is undefined; it depends on which task ran first. 

1128 self.assertTrue(self.butler.exists("output", {})) 

1129 

1130 def test_run_pipeline_skippartial_clobber_unconnected(self): 

1131 executor = SeparablePipelineExecutor( 

1132 self.butler, 

1133 skip_existing_in=[self.butler.run], 

1134 clobber_output=True, 

1135 ) 

1136 pipeline = Pipeline.fromFile(self.pipeline_file) 

1137 self.butler.put({"zero": 0}, "input") 

1138 self.butler.put({"zero": 0}, "intermediate") 

1139 graph = executor.build_quantum_graph(pipeline) 

1140 executor.pre_execute_qgraph( 

1141 graph, 

1142 register_dataset_types=True, 

1143 save_init_outputs=True, 

1144 save_versions=False, 

1145 ) 

1146 executor.run_pipeline(graph) 

1147 self.butler.registry.refresh() 

1148 self.assertEqual(self.butler.get("intermediate"), {"zero": 0, "one": 1}) 

1149 # The value of output is undefined; it depends on which task ran first. 

1150 self.assertTrue(self.butler.exists("output", {})) 

1151 

1152 

class SeparablePipelineExecutorMockTests(lsst.utils.tests.TestCase):
    """Additional tests for SeparablePipelineExecutor API that use
    the lsst.pipe.base.tests.mocks system to define complex pipelines.
    """

    def setUp(self):
        # 'base.yaml' adds an instrument, 'Cam1', with four detectors and
        # two physical filters.
        self.helper, _ = self.enterContext(DirectButlerRepo.make_temporary("base.yaml"))

    def run_base_test(
        self, b_config: DynamicTestPipelineTaskConfig, expected_error: type[Exception] | None
    ) -> ProvenanceQuantumGraph:
        """Build and run a quantum graph with three tasks and four data IDs,
        with customization of the middle task.

        Parameters
        ----------
        b_config : `DynamicTestPipelineTaskConfig`
            Configuration applied to the middle task ("b"); used by callers
            to inject failures on particular data IDs.
        expected_error : `type` [ `Exception` ] or `None`
            Exception type expected from execution, or `None` if the run
            should succeed.

        Returns
        -------
        provenance_graph : `ProvenanceQuantumGraph`
            Provenance graph read back from the butler after execution; it
            is checked to contain all 3 tasks x 4 detector quanta.
        """
        # Three single-dimension tasks chained a -> b -> c, one quantum per
        # detector (four detectors come from 'base.yaml' in setUp).
        self.helper.add_task("a", dimensions=["detector"])
        self.helper.add_task("b", dimensions=["detector"], config=b_config)
        self.helper.add_task("c", dimensions=["detector"])
        qg = self.helper.make_quantum_graph()
        self.helper.butler.collections.register(qg.header.output_run)
        qg.init_output_run(self.helper.butler, existing=False)
        # Clone so the executor's butler reads from the graph's inputs and
        # writes to its output run.
        executor = SeparablePipelineExecutor(
            self.helper.butler.clone(collections=qg.header.inputs, run=qg.header.output_run)
        )
        # Register and target a dataset with empty dimensions to hold the
        # provenance graph produced by run_pipeline.
        provenance_type = lsst.daf.butler.DatasetType(
            PROVENANCE_DATASET_TYPE_NAME,
            self.helper.butler.dimensions.empty,
            PROVENANCE_STORAGE_CLASS,
        )
        self.helper.butler.registry.registerDatasetType(provenance_type)
        provenance_ref = lsst.daf.butler.DatasetRef(
            provenance_type,
            lsst.daf.butler.DataCoordinate.make_empty(self.helper.butler.dimensions),
            run=qg.header.output_run,
        )
        if expected_error is None:
            executor.run_pipeline(qg, provenance_dataset_ref=provenance_ref)
        else:
            with self.assertRaises(expected_error):
                executor.run_pipeline(qg, provenance_dataset_ref=provenance_ref)
        # Provenance must be written even when execution fails.
        provenance_graph = self.helper.butler.get(provenance_ref)
        self.assertEqual(len(provenance_graph.quanta_by_task), 3)
        self.assertEqual(len(provenance_graph.quanta_by_task["a"]), 4)
        self.assertEqual(len(provenance_graph.quanta_by_task["b"]), 4)
        self.assertEqual(len(provenance_graph.quanta_by_task["c"]), 4)
        return provenance_graph

    def test_no_work_chain_provenance(self):
        """Test provenance recording when a NoWorkFound error chains to
        downstream tasks during execution.
        """
        b_config = DynamicTestPipelineTaskConfig()
        b_config.fail_exception = "lsst.pipe.base.NoWorkFound"
        b_config.fail_condition = "detector=2"
        provenance_graph = self.run_base_test(b_config, expected_error=None)
        xgraph = provenance_graph.quantum_only_xgraph
        # Task "a" is upstream of the failure and runs cleanly everywhere.
        for quantum_id in provenance_graph.quanta_by_task["a"].values():
            self.assertEqual(xgraph.nodes[quantum_id]["status"], QuantumAttemptStatus.SUCCESSFUL)
            self.assertEqual(xgraph.nodes[quantum_id]["caveats"], QuantumSuccessCaveats.NO_CAVEATS)
        # "b" succeeds everywhere, but the detector=2 quantum records the
        # NO_WORK caveat from its NoWorkFound.
        for data_id, quantum_id in provenance_graph.quanta_by_task["b"].items():
            self.assertEqual(xgraph.nodes[quantum_id]["status"], QuantumAttemptStatus.SUCCESSFUL)
            if data_id["detector"] == 2:
                self.assertTrue(xgraph.nodes[quantum_id]["caveats"] & QuantumSuccessCaveats.NO_WORK)
            else:
                self.assertEqual(xgraph.nodes[quantum_id]["caveats"], QuantumSuccessCaveats.NO_CAVEATS)
        # "c" at detector=2 has no inputs, so adjustQuantum raises and that
        # is recorded as a caveat rather than a failure.
        for data_id, quantum_id in provenance_graph.quanta_by_task["c"].items():
            self.assertEqual(xgraph.nodes[quantum_id]["status"], QuantumAttemptStatus.SUCCESSFUL)
            if data_id["detector"] == 2:
                self.assertTrue(
                    xgraph.nodes[quantum_id]["caveats"] & QuantumSuccessCaveats.ADJUST_QUANTUM_RAISED
                )
            else:
                self.assertEqual(xgraph.nodes[quantum_id]["caveats"], QuantumSuccessCaveats.NO_CAVEATS)

    def test_failure_block_provenance(self):
        """Test provenance recording when an exception blocks one branch of a
        QG but not another.
        """
        # 'base.yaml' adds an instrument, 'Cam1', with four detectors and
        # two physical filters.
        b_config = DynamicTestPipelineTaskConfig()
        b_config.fail_exception = "builtins.RuntimeError"
        b_config.fail_condition = "detector=2"
        provenance_graph = self.run_base_test(b_config, MPGraphExecutorError)
        self.assertEqual(len(provenance_graph.quanta_by_task), 3)
        self.assertEqual(len(provenance_graph.quanta_by_task["a"]), 4)
        self.assertEqual(len(provenance_graph.quanta_by_task["b"]), 4)
        self.assertEqual(len(provenance_graph.quanta_by_task["c"]), 4)
        xgraph = provenance_graph.quantum_only_xgraph
        # Task "a" is upstream of the failure and runs cleanly everywhere.
        for quantum_id in provenance_graph.quanta_by_task["a"].values():
            self.assertEqual(xgraph.nodes[quantum_id]["status"], QuantumAttemptStatus.SUCCESSFUL)
            self.assertEqual(xgraph.nodes[quantum_id]["caveats"], QuantumSuccessCaveats.NO_CAVEATS)
        # "b" fails hard at detector=2 and succeeds on the other detectors.
        for data_id, quantum_id in provenance_graph.quanta_by_task["b"].items():
            if data_id["detector"] == 2:
                self.assertEqual(xgraph.nodes[quantum_id]["status"], QuantumAttemptStatus.FAILED)
            else:
                self.assertEqual(xgraph.nodes[quantum_id]["status"], QuantumAttemptStatus.SUCCESSFUL)
                self.assertEqual(xgraph.nodes[quantum_id]["caveats"], QuantumSuccessCaveats.NO_CAVEATS)
        # "c" downstream of the failed quantum is BLOCKED, not FAILED.
        for data_id, quantum_id in provenance_graph.quanta_by_task["c"].items():
            if data_id["detector"] == 2:
                self.assertEqual(xgraph.nodes[quantum_id]["status"], QuantumAttemptStatus.BLOCKED)
            else:
                self.assertEqual(xgraph.nodes[quantum_id]["status"], QuantumAttemptStatus.SUCCESSFUL)
                self.assertEqual(xgraph.nodes[quantum_id]["caveats"], QuantumSuccessCaveats.NO_CAVEATS)

1258 

1259 

class MemoryTester(lsst.utils.tests.MemoryTestCase):
    """Generic test for file leaks."""

1262 

1263 

def setup_module(module):
    """Set up the module for pytest.

    Parameters
    ----------
    module : `~types.ModuleType`
        Module to set up.
    """
    # Initialize the LSST test harness (e.g. memory/file-leak tracking).
    lsst.utils.tests.init()

1273 

1274 

# Allow running this test file directly (outside pytest).
if __name__ == "__main__":
    lsst.utils.tests.init()
    unittest.main()