Coverage for tests/test_predicted_qg.py: 13%

216 statements

« prev    ^ index    » next    coverage.py v7.13.5, created at 2026-05-01 08:20 +0000

# This file is part of pipe_base.
#
# Developed for the LSST Data Management System.
# This product includes software developed by the LSST Project
# (https://www.lsst.org).
# See the COPYRIGHT file at the top-level directory of this distribution
# for details of code ownership.
#
# This software is dual licensed under the GNU General Public License and also
# under a 3-clause BSD license. Recipients may choose which of these licenses
# to use; please see the files gpl-3.0.txt and/or bsd_license.txt,
# respectively. If you choose the GPL option then the following text applies
# (but note that there is still no warranty even if you opt for BSD instead):
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <https://www.gnu.org/licenses/>.

27 

import itertools
import logging
import os
import shutil
import subprocess
import tempfile
import unittest
import uuid
from io import StringIO

import lsst.utils.tests
from lsst.pipe.base import QuantumGraph
from lsst.pipe.base.dot_tools import graph2dot
from lsst.pipe.base.mermaid_tools import graph2mermaid
from lsst.pipe.base.quantum_graph import (
    FORMAT_VERSION,
    AddressReader,
    PredictedDatasetInfo,
    PredictedQuantumGraph,
    PredictedQuantumInfo,
)
from lsst.pipe.base.tests.mocks import DynamicConnectionConfig, InMemoryRepo

# Make test logs informative: INFO for timing and general LSST loggers, and
# DEBUG for the module under test.
for _logger_name, _level in (
    ("timer", logging.INFO),
    ("lsst", logging.INFO),
    ("lsst.pipe.base.quantum_graph", logging.DEBUG),
):
    logging.getLogger(_logger_name).setLevel(_level)

54 

55 

class PredictedQuantumGraphTestCase(unittest.TestCase):
    """Unit tests for the predicted quantum graph classes.

    These tests focus on the interface of the predicted quantum graph,
    especially I/O and conversion to and from the old quantum graph class.

    Additional test coverage is provided by the tests for quantum graph
    builders and executors, which all use this class as well.
    """

    def setUp(self):
        """Set up a mock in-memory repository with a five-task pipeline
        (calibrate -> {consolidate, resample -> coadd -> match}) and create a
        quantum graph builder for it.
        """
        self.helper = InMemoryRepo("base.yaml", "spatial.yaml")
        self.enterContext(self.helper)
        # Per-detector task with a prerequisite input (reference catalog
        # looked up by spatial htm7 region) and an init-output schema.
        self.helper.add_task(
            "calibrate",
            dimensions=["visit", "detector"],
            inputs={
                "input_image": DynamicConnectionConfig(
                    dataset_type_name="raw",
                    dimensions=["visit", "detector"],
                )
            },
            prerequisite_inputs={
                "refcat": DynamicConnectionConfig(
                    dataset_type_name="references",
                    dimensions=["htm7"],
                    multiple=True,
                )
            },
            init_outputs={
                "output_schema": DynamicConnectionConfig(
                    dataset_type_name="source_schema",
                )
            },
            outputs={
                "output_image": DynamicConnectionConfig(
                    dataset_type_name="image",
                    dimensions=["visit", "detector"],
                ),
                "output_table": DynamicConnectionConfig(
                    dataset_type_name="source_detector",
                    dimensions=["visit", "detector"],
                ),
            },
        )
        # Per-visit gather step that consumes calibrate's init-output schema
        # as an init-input.
        self.helper.add_task(
            "consolidate",
            dimensions=["visit"],
            init_inputs={
                "input_schema": DynamicConnectionConfig(
                    dataset_type_name="source_schema",
                )
            },
            inputs={
                "input_table": DynamicConnectionConfig(
                    dataset_type_name="source_detector",
                    dimensions=["visit", "detector"],
                    multiple=True,
                )
            },
            outputs={
                "output_table": DynamicConnectionConfig(
                    dataset_type_name="source",
                    dimensions=["visit"],
                )
            },
        )
        # Spatial regrouping of per-detector images onto patches.
        self.helper.add_task(
            "resample",
            dimensions=["patch", "visit"],
            inputs={
                "input_image": DynamicConnectionConfig(
                    dataset_type_name="image",
                    dimensions=["visit", "detector"],
                    multiple=True,
                )
            },
            outputs={
                "output_image": DynamicConnectionConfig(
                    dataset_type_name="warp",
                    dimensions=["patch", "visit"],
                )
            },
        )
        # Combine warps per patch/band.
        self.helper.add_task(
            "coadd",
            dimensions=["patch", "band"],
            inputs={
                "input_image": DynamicConnectionConfig(
                    dataset_type_name="warp",
                    dimensions=["patch", "visit"],
                    multiple=True,
                )
            },
            outputs={
                "output_image": DynamicConnectionConfig(
                    dataset_type_name="coadd",
                    dimensions=["patch", "band"],
                ),
                "output_table": DynamicConnectionConfig(
                    dataset_type_name="object",
                    dimensions=["patch", "band"],
                    # Like all other (defaulted) storage classes here,
                    # 'ArrowAstropy' below will be mocked; we pick it so we can
                    # try a component input, and because we know its pytype is
                    # safe to import here.
                    storage_class="ArrowAstropy",
                ),
            },
        )
        # Final spatial join, including a component ('object.schema') input.
        self.helper.add_task(
            "match",
            dimensions=["htm6"],
            inputs={
                "input_object": DynamicConnectionConfig(
                    dataset_type_name="object",
                    dimensions=["patch", "band"],
                    multiple=True,
                    storage_class="ArrowAstropy",
                ),
                "input_source": DynamicConnectionConfig(
                    dataset_type_name="source",
                    dimensions=["visit"],
                    multiple=True,
                ),
                "input_object_schema": DynamicConnectionConfig(
                    dataset_type_name="object.schema",
                    dimensions=["visit"],
                    multiple=True,
                    storage_class="ArrowAstropySchema",
                ),
            },
            outputs={
                "output_table": DynamicConnectionConfig(
                    dataset_type_name="matches",
                    dimensions=["htm6"],
                )
            },
        )
        self.builder = self.helper.make_quantum_graph_builder()

196 

    def check_quantum_graph(
        self,
        qg: PredictedQuantumGraph,
        *,
        all_tasks: bool = True,
        thin_graph: bool = True,
        all_quantum_datasets: bool = True,
        init_quanta: bool = True,
        dimension_data_loaded: bool = True,
        dimension_data_deserialized: bool = True,
        converting_partial: bool = False,
    ) -> None:
        """Test the attributes and methods of a quantum graph produced by this
        test case's builder.

        Parameters
        ----------
        qg : `lsst.pipe.base.quantum_graph.PredictedQuantumGraph`
            Graph to test.
        all_tasks : `bool`, optional
            Whether all tasks in the pipeline were loaded.
        thin_graph : `bool`, optional
            Whether the ``thin_graph`` component was loaded.
        all_quantum_datasets : `bool`, optional
            Whether all quantum datasets were loaded.
        init_quanta : `bool`, optional
            Whether the ``init_quanta`` component was loaded.
        dimension_data_loaded : `bool`, optional
            Whether the ``dimension_data`` component was loaded at all.
        dimension_data_deserialized : `bool`, optional
            Whether all dimension records are expected to have been
            deserialized.  Ignored if ``dimension_data_loaded`` is `False`.
        converting_partial : `bool`, optional
            Whether this was a partial load that only included some quanta.
        """
        # Header metadata is always present regardless of which components
        # were loaded.
        self.assertEqual(qg.header.inputs, ["input_chain"])
        self.assertEqual(qg.header.output, "output_chain")
        self.assertEqual(qg.header.output_run, "output_run")
        self.assertEqual(qg.header.metadata["stuff"], "whatever")
        self.assertEqual(qg.header.version, FORMAT_VERSION)
        if not converting_partial:
            # When partial-reading an old QG, counts reflect what was loaded.
            # In all other cases they reflect the full original graph.
            self.assertEqual(qg.header.n_task_quanta["calibrate"], 8)
            self.assertEqual(qg.header.n_task_quanta["consolidate"], 2)
            self.assertEqual(qg.header.n_task_quanta["resample"], 10)
            self.assertEqual(qg.header.n_task_quanta["coadd"], 10)
            self.assertEqual(qg.header.n_task_quanta["match"], 2)
            self.assertEqual(qg.header.n_quanta, 32)
            self.assertEqual(
                qg.header.n_datasets,
                63  # non-automatic datasets (see breakdown below)
                + 32 * 2  # one metadata and one log for each quantum
                + 5  # one config for each task
                + 1,  # packages
            )
        if all_tasks:
            self.assertFalse(self.helper.pipeline_graph.diff_tasks(qg.pipeline_graph))
        if thin_graph or all_quantum_datasets:
            # Either component is enough to know the full set of quanta.
            self.assertEqual(len(qg), qg.header.n_quanta)
            self.assertEqual(len(qg.quantum_only_xgraph), qg.header.n_quanta)
            self.assertEqual(len(qg.quanta_by_task["calibrate"]), 8)
            self.assertEqual(len(qg.quanta_by_task["consolidate"]), 2)
            self.assertEqual(len(qg.quanta_by_task["resample"]), 10)
            self.assertEqual(len(qg.quanta_by_task["coadd"]), 10)
            self.assertEqual(len(qg.quanta_by_task["match"]), 2)
        if init_quanta:
            if all_tasks:
                self.assertEqual(len(qg.datasets_by_type["calibrate_config"]), 1)
                self.assertEqual(len(qg.datasets_by_type["consolidate_config"]), 1)
                self.assertEqual(len(qg.datasets_by_type["resample_config"]), 1)
                self.assertEqual(len(qg.datasets_by_type["coadd_config"]), 1)
                self.assertEqual(len(qg.datasets_by_type["match_config"]), 1)
                self.assertEqual(len(qg.datasets_by_type["source_schema"]), 1)
                self.assertEqual(len(qg.datasets_by_type["packages"]), 1)
            # 5 configs + source_schema + packages.
            n_init_datasets = 7
        else:
            n_init_datasets = 0
        if all_quantum_datasets:
            assert all_tasks
            self.assertEqual(
                len(qg.bipartite_xgraph),
                qg.header.n_quanta + qg.header.n_datasets - n_init_datasets,
            )
            self.assertEqual(len(qg.datasets_by_type["raw"]), 8)
            self.assertEqual(len(qg.datasets_by_type["references"]), 4)
            self.assertEqual(len(qg.datasets_by_type["image"]), 8)
            self.assertEqual(len(qg.datasets_by_type["source_detector"]), 8)
            self.assertEqual(len(qg.datasets_by_type["source"]), 2)
            self.assertEqual(len(qg.datasets_by_type["warp"]), 10)
            self.assertEqual(len(qg.datasets_by_type["coadd"]), 10)
            self.assertEqual(len(qg.datasets_by_type["object"]), 10)
            self.assertEqual(len(qg.datasets_by_type["matches"]), 2)
            # Spot-check some edges and graph structure.
            for data_id, dataset_id in qg.datasets_by_type["source_detector"].items():
                for producer_id, _, edge_data in qg.bipartite_xgraph.in_edges(dataset_id, data=True):
                    self.assertIn(producer_id, qg.quanta_by_task["calibrate"].values())
                    self.assertFalse(edge_data["is_read"])
                    self.assertEqual(
                        edge_data["pipeline_edges"],
                        [qg.pipeline_graph.tasks["calibrate"].get_output_edge("output_table")],
                    )
                for _, consumer_id, edge_data in qg.bipartite_xgraph.out_edges(dataset_id, data=True):
                    self.assertIn(consumer_id, qg.quanta_by_task["consolidate"].values())
                    self.assertTrue(edge_data["is_read"])
                    self.assertEqual(
                        edge_data["pipeline_edges"],
                        [qg.pipeline_graph.tasks["consolidate"].get_input_edge("input_table")],
                    )
            # We use 'is' checks instead of just equality because we don't want
            # duplicates of these objects floating around wasting memory.
            for task_label, quanta_for_task in qg.quanta_by_task.items():
                for data_id, quantum_id in quanta_for_task.items():
                    d1: PredictedQuantumInfo = qg.quantum_only_xgraph.nodes[quantum_id]
                    self.assertEqual(d1["task_label"], task_label)
                    self.assertIs(d1["data_id"], data_id)
                    self.assertIs(d1["pipeline_node"], qg.pipeline_graph.tasks[task_label])
                    d2: PredictedQuantumInfo = qg.bipartite_xgraph.nodes[quantum_id]
                    self.assertEqual(d2["task_label"], task_label)
                    self.assertIs(d2["data_id"], data_id)
                    self.assertIs(d2["pipeline_node"], qg.pipeline_graph.tasks[task_label])
            for dataset_type_name, datasets_for_type in qg.datasets_by_type.items():
                # Init datasets are checked above; skip them here.
                if (
                    dataset_type_name == "source_schema"
                    or dataset_type_name == "packages"
                    or dataset_type_name.endswith("_config")
                ):
                    continue
                for data_id, dataset_id in datasets_for_type.items():
                    d3: PredictedDatasetInfo = qg.bipartite_xgraph.nodes[dataset_id]
                    self.assertIs(d3["data_id"], data_id)
                    self.assertIs(d3["pipeline_node"], qg.pipeline_graph.dataset_types[dataset_type_name])
        if dimension_data_loaded:
            self.assertIsNotNone(qg.dimension_data)
            if dimension_data_deserialized and not converting_partial:
                self.assertEqual(len(qg.dimension_data.records["instrument"]), 1)
                self.assertEqual(len(qg.dimension_data.records["visit"]), 2)
                self.assertEqual(len(qg.dimension_data.records["detector"]), 4)
                self.assertEqual(len(qg.dimension_data.records["visit_detector_region"]), 8)
                self.assertEqual(len(qg.dimension_data.records["tract"]), 2)
                self.assertEqual(len(qg.dimension_data.records["patch"]), 9)
                self.assertEqual(len(qg.dimension_data.records["band"]), 2)
                self.assertEqual(len(qg.dimension_data.records["physical_filter"]), 2)

340 

341 def define_partial_read(self, qg: PredictedQuantumGraph) -> list[uuid.UUID]: 

342 """Return a list of quantum IDs to test partial loads. 

343 

344 Parameters 

345 ---------- 

346 qg : `lsst.pipe.base.quantum_graph.PredictedQuantumGraph` 

347 Full graph to obtain quantum IDs from. Note that quantum IDs are 

348 random and change every test run, and hence test behavior should 

349 never depend on explicit values or their sort order. 

350 

351 Returns 

352 ------- 

353 quanta : `list` [ `uuid.UUID` ] 

354 Quanta to use for partial-read tests. These always have the same 

355 task labels and data IDs: all ``calibrate`` and ``consolidate`` 

356 quanta for ``visit==1``. 

357 """ 

358 return [ 

359 quantum_id 

360 for data_id, quantum_id in qg.quanta_by_task["calibrate"].items() 

361 if data_id["visit"] == 1 

362 ] + [ 

363 quantum_id 

364 for data_id, quantum_id in qg.quanta_by_task["consolidate"].items() 

365 if data_id["visit"] == 1 

366 ] 

367 

    def check_partial_read(
        self, qg: PredictedQuantumGraph, quanta: list[uuid.UUID], converting: bool = False
    ) -> None:
        """Test the attributes and methods of a partially-loaded quantum graph.

        Parameters
        ----------
        qg : `lsst.pipe.base.quantum_graph.PredictedQuantumGraph`
            Graph for which only ``quanta`` were loaded.
        quanta : `list` [ `uuid.UUID` ]
            IDs of the quanta that were loaded.  Must be the result of a call
            to `define_partial_read`.
        converting : `bool`, optional
            Whether this graph was read via the old
            `lsst.pipe.base.QuantumGraph` class and then converted.
        """
        self.check_quantum_graph(
            qg,
            all_tasks=False,
            thin_graph=False,
            all_quantum_datasets=False,
            converting_partial=converting,
            dimension_data_deserialized=converting,
        )
        # 5 quanta: presumably the 4 calibrate quanta plus the 1 consolidate
        # quantum for visit==1 selected by define_partial_read.
        self.assertEqual(len(qg), 5)
        self.assertEqual(len(qg.quantum_only_xgraph), 5)
        execution_quanta = qg.build_execution_quanta()
        self.assertEqual(execution_quanta.keys(), set(quanta))
        # Built Quantum objects should be shared between the two graph views,
        # not duplicated.
        for quantum_id, quantum in execution_quanta.items():
            self.assertIs(qg.bipartite_xgraph.nodes[quantum_id]["quantum"], quantum)
            self.assertIs(qg.quantum_only_xgraph.nodes[quantum_id]["quantum"], quantum)

399 

400 def test_build(self) -> None: 

401 """Test building a `PredictedQuantumGraph` by 

402 inspecting the result. 

403 """ 

404 qg = self.builder.finish( 

405 output="output_chain", 

406 metadata={"stuff": "whatever"}, 

407 attach_datastore_records=False, 

408 ).assemble() 

409 self.check_quantum_graph(qg) 

410 

411 def test_from_old_quantum_graph(self) -> None: 

412 """Test building a old `QuantumGraph` and then 

413 converting it to a `PredictedQuantumGraph`. 

414 """ 

415 old_qg = self.builder.build( 

416 metadata={ 

417 "input": ["input_chain"], 

418 "output": "output_chain", 

419 "output_run": "output_run", 

420 "stuff": "whatever", 

421 }, 

422 attach_datastore_records=False, 

423 ) 

424 new_qg = PredictedQuantumGraph.from_old_quantum_graph(old_qg) 

425 self.check_quantum_graph(new_qg) 

426 

    def test_read_execution_quanta_old_file(self) -> None:
        """Test building an old `QuantumGraph`, saving it, and then reading it
        via `PredictedQuantumGraph.read_execution_quanta`.
        """
        old_qg = self.builder.build(
            metadata={
                "input": ["input_chain"],
                "output": "output_chain",
                "output_run": "output_run",
                "stuff": "whatever",
            },
            attach_datastore_records=False,
        )
        with tempfile.TemporaryDirectory(ignore_cleanup_errors=True) as tmpdir:
            tmpfile = os.path.join(tmpdir, "old.qgraph")
            old_qg.saveUri(tmpfile)
            # Read everything.
            qg1 = PredictedQuantumGraph.read_execution_quanta(tmpfile)
            self.check_quantum_graph(qg1, thin_graph=False)
            # Read just a few quanta.
            quanta = self.define_partial_read(qg1)
            qg2 = PredictedQuantumGraph.read_execution_quanta(tmpfile, quantum_ids=quanta)
            self.check_partial_read(qg2, quanta, converting=True)

450 

451 def test_roundtrip_old_quantum_graph(self) -> None: 

452 """Test building a `PredictedQuantumGraph` and round-tripping it 

453 through the old `QuantumGraph` class. 

454 """ 

455 qg1 = self.builder.finish( 

456 output="output_chain", 

457 metadata={"stuff": "whatever"}, 

458 attach_datastore_records=False, 

459 ).assemble() 

460 old_qg = qg1.to_old_quantum_graph() 

461 qg2 = PredictedQuantumGraph.from_old_quantum_graph(old_qg) 

462 self.check_quantum_graph(qg2) 

463 

    def test_write_new_as_old(self) -> None:
        """Test building a `PredictedQuantumGraphComponents`, saving it with
        the old extension, and reading it back in both the old class and the
        new class.
        """
        components = self.builder.finish(
            output="output_chain",
            metadata={"stuff": "whatever"},
            attach_datastore_records=False,
        )
        with tempfile.TemporaryDirectory(ignore_cleanup_errors=True) as tmpdir:
            tmpfile = os.path.join(tmpdir, "old.qgraph")
            components.write(tmpfile)
            # Read back with the new class.
            qg1 = PredictedQuantumGraph.read_execution_quanta(tmpfile)
            self.check_quantum_graph(qg1)
            # Read back with the old class and convert for checking.
            old_qg = QuantumGraph.loadUri(tmpfile)
            qg2 = PredictedQuantumGraph.from_old_quantum_graph(old_qg)
            self.check_quantum_graph(qg2)

482 

483 def test_read_new_as_old(self) -> None: 

484 """Test building a `PredictedQuantumGraphComponents`, saving it with 

485 the new extension, and reading it back in via the old class. 

486 """ 

487 components = self.builder.finish( 

488 output="output_chain", 

489 metadata={"stuff": "whatever"}, 

490 attach_datastore_records=False, 

491 ) 

492 with tempfile.TemporaryDirectory(ignore_cleanup_errors=True) as tmpdir: 

493 tmpfile = os.path.join(tmpdir, "new.qg") 

494 components.write(tmpfile) 

495 old_qg = QuantumGraph.loadUri(tmpfile) 

496 new_qg = PredictedQuantumGraph.from_old_quantum_graph(old_qg) 

497 self.check_quantum_graph(new_qg) 

498 

    def test_io(self) -> None:
        """Test saving a `PredictedQuantumGraphComponents` and reading it back
        in as a `PredictedQuantumGraph`, both fully and partially, and as a
        `QuantumGraph`, both fully and partially.
        """
        components = self.builder.finish(
            output="output_chain",
            metadata={"stuff": "whatever"},
            attach_datastore_records=False,
        )
        # We use a small page size since this is a tiny graph and hence
        # otherwise all addresses would fit in a single page.
        four_row_page_size = AddressReader.compute_row_size(int_size=8, n_addresses=1) * 4
        with tempfile.TemporaryDirectory(ignore_cleanup_errors=True) as tmpdir:
            tmpfile = os.path.join(tmpdir, "new.qg")
            components.write(tmpfile, zstd_dict_n_inputs=24)  # enable dict compression code path
            # Test a full read with the new class.
            with PredictedQuantumGraph.open(tmpfile, page_size=four_row_page_size) as reader:
                reader.read_all()
                full_qg = reader.finish()
            self.check_quantum_graph(full_qg, dimension_data_deserialized=False)
            # Test a full read with the old class (uses new class and then
            # converts to old, and we convert back to new for the test).
            self.check_quantum_graph(
                PredictedQuantumGraph.from_old_quantum_graph(QuantumGraph.loadUri(tmpfile))
            )
            # Test a partial read with the old class.
            quanta = self.define_partial_read(full_qg)
            self.check_partial_read(
                PredictedQuantumGraph.from_old_quantum_graph(QuantumGraph.loadUri(tmpfile, nodes=quanta)),
                quanta,
                converting=True,
            )
            # Test a thin but shallow read with the new class.
            with PredictedQuantumGraph.open(tmpfile, page_size=four_row_page_size) as reader:
                reader.read_thin_graph()
                thin_qg = reader.finish()
            self.check_quantum_graph(
                thin_qg,
                dimension_data_deserialized=False,
                all_quantum_datasets=False,
                dimension_data_loaded=False,
                init_quanta=False,
            )
            # Test a deep read of just a few quanta with the new class.
            narrow_qg = PredictedQuantumGraph.read_execution_quanta(
                tmpfile,
                quantum_ids=quanta,
                page_size=four_row_page_size,
            )
            self.check_partial_read(narrow_qg, quanta)

550 

    def test_no_compression_dict(self) -> None:
        """Test saving a `PredictedQuantumGraphComponents` and reading it back
        in as a `PredictedQuantumGraph` with the compression dictionary
        disabled.
        """
        components = self.builder.finish(
            output="output_chain",
            metadata={"stuff": "whatever"},
            attach_datastore_records=False,
        )
        # We use a small page size since this is a tiny graph and hence
        # otherwise all addresses would fit in a single page.
        three_row_page_size = AddressReader.compute_row_size(int_size=8, n_addresses=1) * 3
        with tempfile.TemporaryDirectory(ignore_cleanup_errors=True) as tmpdir:
            tmpfile = os.path.join(tmpdir, "new.qg")
            # zstd_dict_size=0 disables dictionary training/compression.
            components.write(tmpfile, zstd_dict_size=0)
            with PredictedQuantumGraph.open(tmpfile, page_size=three_row_page_size) as reader:
                reader.read_all()
                full_qg = reader.finish()
            self.check_quantum_graph(full_qg, dimension_data_deserialized=False)

571 

572 def test_dot(self) -> None: 

573 """Test visualization via GraphViz dot.""" 

574 qg = self.builder.finish(attach_datastore_records=False).assemble() 

575 stream = StringIO() 

576 graph2dot(qg, stream) 

577 if (dot_cmd := shutil.which("dot")) is None: 

578 raise unittest.SkipTest("Aborting test early; 'dot' command is not available.") 

579 # Just check that the dot command can parse what we've given it. 

580 result = subprocess.run( 

581 [dot_cmd, "-Txdot"], input=stream.getvalue().encode(), stdout=subprocess.DEVNULL 

582 ) 

583 result.check_returncode() 

584 

585 def test_mermaid(self) -> None: 

586 """Test visualization via Mermaid.""" 

587 qg = self.builder.finish(attach_datastore_records=False).assemble() 

588 stream = StringIO() 

589 # Just check that it runs without error. 

590 graph2mermaid(qg, stream) 

591 

592 def test_update_output_run(self) -> None: 

593 """Test the PredictedQuantumGraphComponents.output_run method.""" 

594 components = self.builder.finish( 

595 output="output_chain", 

596 metadata={"stuff": "whatever"}, 

597 attach_datastore_records=False, 

598 ) 

599 components.update_output_run("new_output_run") 

600 qg = components.assemble() 

601 for ref in qg.get_init_outputs("calibrate").values(): 

602 self.assertEqual(ref.run, "new_output_run") 

603 for quantum in qg.build_execution_quanta().values(): 

604 for ref in itertools.chain.from_iterable(quantum.outputs.values()): 

605 self.assertEqual(ref.run, "new_output_run", msg=str(ref)) 

606 for ref in itertools.chain.from_iterable(quantum.inputs.values()): 

607 if ref.datasetType.name not in ("raw", "references"): 

608 self.assertEqual(ref.run, "new_output_run", msg=str(ref)) 

609 

610 

if __name__ == "__main__":
    # Standard LSST test entry point: initialize test utilities, then run.
    lsst.utils.tests.init()
    unittest.main()