Coverage for tests/test_quantumBackedButler.py: 9% of 294 statements (coverage.py v7.13.5, created at 2026-05-01 08:18 +0000)

# This file is part of daf_butler.
#
# Developed for the LSST Data Management System.
# This product includes software developed by the LSST Project
# (http://www.lsst.org).
# See the COPYRIGHT file at the top-level directory of this distribution
# for details of code ownership.
#
# This software is dual licensed under the GNU General Public License and also
# under a 3-clause BSD license. Recipients may choose which of these licenses
# to use; please see the files gpl-3.0.txt and/or bsd_license.txt,
# respectively. If you choose the GPL option then the following text applies
# (but note that there is still no warranty even if you opt for BSD instead):
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.

import json
import os
import tempfile
import unittest
import unittest.mock
import zipfile
from typing import cast

from lsst.daf.butler import (
    Butler,
    ButlerMetrics,
    Config,
    DatasetRef,
    DatasetType,
    DimensionUniverse,
    Quantum,
    QuantumBackedButler,
    QuantumProvenanceData,
    RegistryConfig,
    StorageClass,
)
from lsst.daf.butler.datastore.record_data import DatastoreRecordData
from lsst.daf.butler.datastores.file_datastore.retrieve_artifacts import ZipIndex
from lsst.daf.butler.direct_butler import DirectButler
from lsst.daf.butler.registry import _RegistryFactory
from lsst.daf.butler.tests.utils import makeTestTempDir, removeTestTempDir
from lsst.resources import ResourcePath

TESTDIR = os.path.abspath(os.path.dirname(__file__))


class QuantumBackedButlerTestCase(unittest.TestCase):
    """Test case for QuantumBackedButler."""

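    # QuantumBackedButler is the registry-free butler used to back execution
    # of a single quantum: dataset locations come from datastore records
    # carried by the Quantum itself rather than from a registry connection.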

    def setUp(self) -> None:
        self.root = makeTestTempDir(TESTDIR)
        self.config = Config()
        self.config["root"] = self.root
        self.universe = DimensionUniverse()
        self.metrics = ButlerMetrics()

        # Make a butler and import dimension definitions.
        registryConfig = RegistryConfig(self.config.get("registry"))
        registry = _RegistryFactory(registryConfig).create_from_config(butlerRoot=self.root)
        registry.close()
        butler = Butler.from_config(self.config, writeable=True, run="RUN", metrics=self.metrics)
        self.enterContext(butler)
        assert isinstance(butler, DirectButler)
        self.butler = butler
        self.butler.import_(filename="resource://lsst.daf.butler/tests/registry_data/base.yaml")

        # Make all dataset types.
        graph = self.universe.conform(("instrument", "detector"))
        storageClass = StorageClass("StructuredDataDict")
        self.datasetTypeInit = DatasetType("test_ds_init", graph, storageClass)
        self.datasetTypeInput = DatasetType("test_ds_input", graph, storageClass)
        self.datasetTypeOutput = DatasetType("test_ds_output", graph, storageClass)
        self.datasetTypeOutput2 = DatasetType("test_ds_output2", graph, storageClass)
        self.datasetTypeExtra = DatasetType("test_ds_extra", graph, storageClass)

        self.dataset_types: dict[str, DatasetType] = {}
        dataset_types = (
            self.datasetTypeInit,
            self.datasetTypeInput,
            self.datasetTypeOutput,
            self.datasetTypeOutput2,
            self.datasetTypeExtra,
        )
        for dataset_type in dataset_types:
            self.butler.registry.registerDatasetType(dataset_type)
            self.dataset_types[dataset_type.name] = dataset_type

        dataIds = [
            self.butler.registry.expandDataId(dict(instrument="Cam1", detector=detector_id))
            for detector_id in (1, 2, 3, 4)
        ]

        # Make actual input datasets.
        self.input_refs = [
            self.butler.put({"data": dataId["detector"]}, self.datasetTypeInput, dataId) for dataId in dataIds
        ]
        self.init_inputs_refs = [self.butler.put({"data": -1}, self.datasetTypeInit, dataIds[0])]
        self.all_input_refs = self.input_refs + self.init_inputs_refs

        # Generate dataset refs for outputs.
        self.output_refs = [DatasetRef(self.datasetTypeOutput, dataId, run="RUN") for dataId in dataIds]
        self.output_refs2 = [DatasetRef(self.datasetTypeOutput2, dataId, run="RUN") for dataId in dataIds]

        self.missing_refs = [DatasetRef(self.datasetTypeExtra, dataId, run="RUN") for dataId in dataIds]

    def tearDown(self) -> None:
        removeTestTempDir(self.root)

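    # make_quantum builds the Quantum handed to QuantumBackedButler.
    # Step 1 models a task whose inputs already exist in the parent butler;
    # step 2 chains the step-1 outputs as inputs to a follow-on task.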

    def make_quantum(self, step: int = 1) -> Quantum:
        """Make a Quantum which includes datastore records."""
        if step == 1:
            datastore_records = self.butler._datastore.export_records(self.all_input_refs)
            predictedInputs = {self.datasetTypeInput: self.input_refs}
            outputs = {self.datasetTypeOutput: self.output_refs}
            initInputs = {self.datasetTypeInit: self.init_inputs_refs[0]}
        elif step == 2:
            # The result should be empty; this just tests that it works.
            datastore_records = self.butler._datastore.export_records(self.output_refs)
            predictedInputs = {self.datasetTypeInput: self.output_refs}
            outputs = {self.datasetTypeOutput2: self.output_refs2}
            initInputs = {}
        else:
            raise ValueError(f"unexpected {step} value")

        return Quantum(
            taskName="some.task.name",
            inputs=predictedInputs,
            outputs=outputs,
            initInputs=initInputs,
            datastore_records=datastore_records,
        )

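    # initialize() consumes the Quantum directly, taking the predicted
    # inputs, outputs, and datastore records from it.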

    def test_initialize(self) -> None:
        """Test for initialize factory method."""
        quantum = self.make_quantum()
        qbb = QuantumBackedButler.initialize(
            config=self.config,
            quantum=quantum,
            dimensions=self.universe,
            dataset_types=self.dataset_types,
            metrics=self.metrics,
        )
        self.addCleanup(qbb.close)
        self._test_factory(qbb)

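    # A repository index file maps labels to repository URIs; with
    # DAF_BUTLER_REPOSITORY_INDEX pointing at it, initialize() accepts the
    # label "label" in place of an explicit config.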

    def test_initialize_repo_index(self) -> None:
        """Test for initialize using config file and repo index."""
        # Store config to a file.
        self.config.dumpToUri(self.root)

        butler_index = Config()
        butler_index["label"] = self.root
        with ResourcePath.temporary_uri(suffix=".yaml") as index_path:
            butler_index.dumpToUri(index_path)

            with unittest.mock.patch.dict(os.environ, {"DAF_BUTLER_REPOSITORY_INDEX": str(index_path)}):
                quantum = self.make_quantum()
                qbb = QuantumBackedButler.initialize(
                    config="label",
                    quantum=quantum,
                    dimensions=self.universe,
                    dataset_types=self.dataset_types,
                    metrics=self.metrics,
                )
                self.addCleanup(qbb.close)
                self._test_factory(qbb)

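    # from_predicted() builds the same kind of butler from explicit lists of
    # predicted input and output dataset IDs instead of a Quantum.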

    def test_from_predicted(self) -> None:
        """Test for from_predicted factory method."""
        datastore_records = self.butler._datastore.export_records(self.all_input_refs)
        qbb = QuantumBackedButler.from_predicted(
            config=self.config,
            predicted_inputs=[ref.id for ref in self.all_input_refs],
            predicted_outputs=[ref.id for ref in self.output_refs],
            dimensions=self.universe,
            datastore_records=datastore_records,
            dataset_types=self.dataset_types,
        )
        self.addCleanup(qbb.close)
        self._test_factory(qbb)

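    # The private _predicted/_available/_actual sets checked below are the
    # provenance bookkeeping later reported by extract_provenance_data().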

    def _test_factory(self, qbb: QuantumBackedButler) -> None:
        """Test state immediately after construction."""
        self.assertTrue(qbb.isWriteable())
        self.assertEqual(qbb._predicted_inputs, {ref.id for ref in self.all_input_refs})
        self.assertEqual(qbb._predicted_outputs, {ref.id for ref in self.output_refs})
        self.assertEqual(qbb._available_inputs, set())
        self.assertEqual(qbb._unavailable_inputs, set())
        self.assertEqual(qbb._actual_inputs, set())
        self.assertEqual(qbb._actual_output_refs, set())

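    # get() and put() must satisfy reads and writes while updating the
    # provenance sets; ButlerMetrics counts each operation.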

    def test_getput(self) -> None:
        """Test for get/put methods."""
        quantum = self.make_quantum()
        qbb = QuantumBackedButler.initialize(
            config=self.config,
            quantum=quantum,
            dimensions=self.universe,
            dataset_types=self.dataset_types,
            metrics=self.metrics,
        )
        self.addCleanup(qbb.close)

        # Verify all input data are readable.
        for ref in self.input_refs:
            data = qbb.get(ref)
            self.assertEqual(data, {"data": ref.dataId["detector"]})
        for ref in self.init_inputs_refs:
            data = qbb.get(ref)
            self.assertEqual(data, {"data": -1})
        for ref in self.missing_refs:
            with self.assertRaises(FileNotFoundError):
                data = qbb.get(ref)

        self.assertEqual(qbb._available_inputs, qbb._predicted_inputs)
        self.assertEqual(qbb._actual_inputs, qbb._predicted_inputs)
        self.assertEqual(qbb._unavailable_inputs, {ref.id for ref in self.missing_refs})

        self.metrics.reset()

        # Write all expected outputs.
        for ref in self.output_refs:
            qbb.put({"data": cast(int, ref.dataId["detector"]) ** 2}, ref)
        self.assertEqual(self.metrics.n_put, len(self.output_refs))

        # Must be able to read them back.
        for ref in self.output_refs:
            data = qbb.get(ref)
            self.assertEqual(data, {"data": cast(int, ref.dataId["detector"]) ** 2})

        self.assertEqual(qbb._actual_output_refs, set(self.output_refs))
        self.assertEqual(self.metrics.n_get, len(self.output_refs))

        # Retrieve them as a Zip artifact.
        with tempfile.TemporaryDirectory() as tmpdir:
            zip = qbb.retrieve_artifacts_zip(self.output_refs, destination=tmpdir)

            index = ZipIndex.from_zip_file(zip)

            # Check that using the alternate API gives the same index.
            with zipfile.ZipFile(zip.ospath) as zf:
                index2 = ZipIndex.from_open_zip(zf)
            self.assertEqual(index, index2)

            zip_refs = index.refs.to_refs(universe=qbb.dimensions)
            self.assertEqual(len(zip_refs), 4)
            self.assertEqual(set(zip_refs), set(self.output_refs))
            self.assertEqual(len(index.artifact_map), 4)  # Count number of artifacts in Zip.

        # Retrieve them to a directory.
        with tempfile.TemporaryDirectory() as tmpdir:
            retrieved = qbb.retrieve_artifacts(self.output_refs, destination=tmpdir, preserve_path=False)
            self.assertEqual(len(retrieved), 4)
            self.assertTrue(retrieved[0].exists())
            with open(os.path.join(tmpdir, ZipIndex.index_name)) as zf:
                index = ZipIndex.model_validate_json(zf.read())
            self.assertEqual(len(index.artifact_map), len(retrieved))

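    # getDeferred() returns a handle; the read itself, including any
    # FileNotFoundError for a missing dataset, happens at handle.get().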

    def test_getDeferred(self) -> None:
        """Test for getDeferred method."""
        quantum = self.make_quantum()
        qbb = QuantumBackedButler.initialize(
            config=self.config, quantum=quantum, dimensions=self.universe, dataset_types=self.dataset_types
        )
        self.addCleanup(qbb.close)

        # Get some input data.
        input_refs = self.input_refs[:2]
        for ref in input_refs:
            data = qbb.getDeferred(ref)
            self.assertEqual(data.get(), {"data": ref.dataId["detector"]})
        for ref in self.init_inputs_refs:
            data = qbb.getDeferred(ref)
            self.assertEqual(data.get(), {"data": -1})
        for ref in self.missing_refs:
            data = qbb.getDeferred(ref)
            with self.assertRaises(FileNotFoundError):
                data.get()

        # Only the refs that were actually read are tracked as available and
        # actual inputs.
        self.assertEqual(qbb._available_inputs, {ref.id for ref in input_refs + self.init_inputs_refs})
        self.assertEqual(qbb._actual_inputs, {ref.id for ref in input_refs + self.init_inputs_refs})
        self.assertEqual(qbb._unavailable_inputs, {ref.id for ref in self.missing_refs})


    def test_stored(self) -> None:
        """Test for dataset existence method."""
        quantum = self.make_quantum()
        qbb = QuantumBackedButler.initialize(
            config=self.config, quantum=quantum, dimensions=self.universe, dataset_types=self.dataset_types
        )
        self.addCleanup(qbb.close)

        # Check existence of some input data.
        input_refs = self.input_refs[:2]
        for ref in input_refs:
            exists = qbb.stored(ref)
            self.assertTrue(exists)
        for ref in self.init_inputs_refs:
            exists = qbb.stored(ref)
            self.assertTrue(exists)
        for ref in self.missing_refs:
            exists = qbb.stored(ref)
            self.assertFalse(exists)

        # Now do the same checks in bulk.
        missing_set = set(self.missing_refs)
        refs = input_refs + self.init_inputs_refs + self.missing_refs
        stored_many = qbb.stored_many(refs)
        for ref, stored in stored_many.items():
            if ref in missing_set:
                self.assertFalse(stored)
            else:
                self.assertTrue(stored)

        # stored() marks inputs available but, unlike get(), does not record
        # them as actually used or as unavailable.
        self.assertEqual(qbb._available_inputs, {ref.id for ref in input_refs + self.init_inputs_refs})
        self.assertEqual(qbb._actual_inputs, set())
        self.assertEqual(qbb._unavailable_inputs, set())  # This is not consistent with get?

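    # markInputUnused() lets a task retract an input it read but ended up not
    # using, removing it from the actual-input provenance.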

    def test_markInputUnused(self) -> None:
        """Test for markInputUnused method."""
        quantum = self.make_quantum()
        qbb = QuantumBackedButler.initialize(
            config=self.config, quantum=quantum, dimensions=self.universe, dataset_types=self.dataset_types
        )
        self.addCleanup(qbb.close)

        # Read all input data.
        for ref in self.input_refs:
            data = qbb.get(ref)
            self.assertEqual(data, {"data": ref.dataId["detector"]})
        for ref in self.init_inputs_refs:
            data = qbb.get(ref)
            self.assertEqual(data, {"data": -1})

        self.assertEqual(qbb._available_inputs, qbb._predicted_inputs)
        self.assertEqual(qbb._actual_inputs, qbb._predicted_inputs)

        qbb.markInputUnused(self.input_refs[0])
        self.assertEqual(qbb._actual_inputs, {ref.id for ref in self.input_refs[1:] + self.init_inputs_refs})

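    # pruneDatasets() is used here to unstore outputs; inconsistent flag
    # combinations must be rejected before anything is removed.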

    def test_pruneDatasets(self) -> None:
        """Test for pruneDatasets method."""
        quantum = self.make_quantum()
        qbb = QuantumBackedButler.initialize(
            config=self.config, quantum=quantum, dimensions=self.universe, dataset_types=self.dataset_types
        )
        self.addCleanup(qbb.close)

        # Write all expected outputs.
        for ref in self.output_refs:
            qbb.put({"data": cast(int, ref.dataId["detector"]) ** 2}, ref)

        # Must be able to read them back.
        for ref in self.output_refs:
            data = qbb.get(ref)
            self.assertEqual(data, {"data": cast(int, ref.dataId["detector"]) ** 2})

        # Check for invalid arguments.
        with self.assertRaisesRegex(TypeError, "Cannot pass purge=True without disassociate=True"):
            qbb.pruneDatasets(self.output_refs, disassociate=False, unstore=True, purge=True)
        with self.assertRaisesRegex(TypeError, "Cannot pass purge=True without unstore=True"):
            qbb.pruneDatasets(self.output_refs, disassociate=True, unstore=False, purge=True)
        with self.assertRaisesRegex(TypeError, "Cannot pass disassociate=True without purge=True"):
            qbb.pruneDatasets(self.output_refs, disassociate=True, unstore=True, purge=False)

        # Unstore only (disassociate=False, purge=False).
        ref = self.output_refs[0]
        qbb.pruneDatasets([ref], disassociate=False, unstore=True, purge=False)
        self.assertFalse(qbb.stored(ref))
        with self.assertRaises(FileNotFoundError):
            data = qbb.get(ref)

        # Can store it again.
        qbb.put({"data": cast(int, ref.dataId["detector"]) ** 2}, ref)
        self.assertTrue(qbb.stored(ref))

        # Purge completely.
        ref = self.output_refs[1]
        qbb.pruneDatasets([ref], disassociate=True, unstore=True, purge=True)
        self.assertFalse(qbb.stored(ref))
        with self.assertRaises(FileNotFoundError):
            data = qbb.get(ref)
        qbb.put({"data": cast(int, ref.dataId["detector"]) ** 2}, ref)
        self.assertTrue(qbb.stored(ref))

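    # extract_provenance_data() bundles the predicted/available/actual sets
    # together with the output datastore records, and must round-trip through
    # its JSON serialization.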

    def test_extract_provenance_data(self) -> None:
        """Test for extract_provenance_data method."""
        quantum = self.make_quantum()
        qbb = QuantumBackedButler.initialize(
            config=self.config, quantum=quantum, dimensions=self.universe, dataset_types=self.dataset_types
        )
        self.addCleanup(qbb.close)

        # Read/store everything.
        for ref in self.input_refs:
            qbb.get(ref)
        for ref in self.init_inputs_refs:
            qbb.get(ref)
        for ref in self.output_refs:
            qbb.put({"data": cast(int, ref.dataId["detector"]) ** 2}, ref)

        provenance1 = qbb.extract_provenance_data()
        prov_json = provenance1.model_dump_json()
        provenance2 = QuantumProvenanceData.direct(**json.loads(prov_json))
        for provenance in (provenance1, provenance2):
            input_ids = {ref.id for ref in self.input_refs + self.init_inputs_refs}
            self.assertEqual(provenance.predicted_inputs, input_ids)
            self.assertEqual(provenance.available_inputs, input_ids)
            self.assertEqual(provenance.actual_inputs, input_ids)
            output_ids = {ref.id for ref in self.output_refs}
            self.assertEqual(provenance.predicted_outputs, output_ids)
            self.assertEqual(provenance.actual_outputs, output_ids)
            datastore_name = "FileDatastore@<butlerRoot>/datastore"
            self.assertEqual(set(provenance.datastore_records.keys()), {datastore_name})
            datastore_records = provenance.datastore_records[datastore_name]
            self.assertEqual(set(datastore_records.dataset_ids), output_ids)
            class_name = "lsst.daf.butler.datastore.stored_file_info.StoredFileInfo"
            self.assertEqual(set(datastore_records.records.keys()), {class_name})
            self.assertEqual(set(datastore_records.records[class_name].keys()), {id.hex for id in output_ids})
            table_name = "file_datastore_records"
            for dataset_data in datastore_records.records[class_name].values():
                self.assertEqual(set(dataset_data), {table_name})

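    # Predicted outputs carry datastore records even before being written;
    # refs the quantum knows nothing about should raise.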

    def test_export_predicted_datastore_records(self) -> None:
        """Test for export_predicted_datastore_records method."""
        quantum = self.make_quantum()
        qbb = QuantumBackedButler.initialize(
            config=self.config, quantum=quantum, dimensions=self.universe, dataset_types=self.dataset_types
        )
        self.addCleanup(qbb.close)

        records = qbb.export_predicted_datastore_records(self.output_refs)
        self.assertEqual(len(records["FileDatastore@<butlerRoot>/datastore"].records), len(self.output_refs))

        with self.assertRaises(ValueError):
            qbb.export_predicted_datastore_records(self.missing_refs)

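    # collect_and_transfer() ingests the outputs of both quanta, together
    # with their provenance, back into the parent full butler.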

    def test_collect_and_transfer(self) -> None:
        """Test for collect_and_transfer method."""
        quantum1 = self.make_quantum(1)
        qbb1 = QuantumBackedButler.initialize(
            config=self.config, quantum=quantum1, dimensions=self.universe, dataset_types=self.dataset_types
        )
        self.addCleanup(qbb1.close)

        quantum2 = self.make_quantum(2)
        qbb2 = QuantumBackedButler.initialize(
            config=self.config, quantum=quantum2, dimensions=self.universe, dataset_types=self.dataset_types
        )
        self.addCleanup(qbb2.close)

        # Read/store everything.
        for ref in self.input_refs:
            qbb1.get(ref)
        for ref in self.init_inputs_refs:
            qbb1.get(ref)
        for ref in self.output_refs:
            qbb1.put({"data": cast(int, ref.dataId["detector"]) ** 2}, ref)

        for ref in self.output_refs:
            qbb2.get(ref)
        for ref in self.output_refs2:
            qbb2.put({"data": cast(int, ref.dataId["detector"]) ** 3}, ref)

        QuantumProvenanceData.collect_and_transfer(
            self.butler,
            [quantum1, quantum2],
            [qbb1.extract_provenance_data(), qbb2.extract_provenance_data()],
        )

        for ref in self.output_refs:
            data = self.butler.get(ref)
            self.assertEqual(data, {"data": cast(int, ref.dataId["detector"]) ** 2})

        for ref in self.output_refs2:
            data = self.butler.get(ref)
            self.assertEqual(data, {"data": cast(int, ref.dataId["detector"]) ** 3})

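    # merge_mappings() combines per-datastore record mappings; the duplicated
    # export must merge cleanly rather than conflict.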

    def test_record_data_merge_mappings(self) -> None:
        """Test DatastoreRecordData.merge_mappings."""
        r1 = self.butler._datastore.export_records(self.input_refs)
        r2 = self.butler._datastore.export_records(self.init_inputs_refs)
        r3 = self.butler._datastore.export_records(self.input_refs)  # Intentional duplicate.
        merged = DatastoreRecordData.merge_mappings(r1, r2, r3)
        self.assertEqual(merged.keys(), r1.keys() | r2.keys())
        for datastore_name in merged.keys():
            if datastore_name in r1:
                self.assertGreaterEqual(
                    merged[datastore_name].records.keys(), r1[datastore_name].records.keys()
                )
            if datastore_name in r2:
                self.assertGreaterEqual(
                    merged[datastore_name].records.keys(), r2[datastore_name].records.keys()
                )


if __name__ == "__main__":
    unittest.main()