Coverage for tests / test_quantumBackedButler.py: 9%
294 statements
« prev ^ index » next coverage.py v7.13.5, created at 2026-04-30 08:41 +0000
1# This file is part of daf_butler.
2#
3# Developed for the LSST Data Management System.
4# This product includes software developed by the LSST Project
5# (http://www.lsst.org).
6# See the COPYRIGHT file at the top-level directory of this distribution
7# for details of code ownership.
8#
9# This software is dual licensed under the GNU General Public License and also
10# under a 3-clause BSD license. Recipients may choose which of these licenses
11# to use; please see the files gpl-3.0.txt and/or bsd_license.txt,
12# respectively. If you choose the GPL option then the following text applies
13# (but note that there is still no warranty even if you opt for BSD instead):
14#
15# This program is free software: you can redistribute it and/or modify
16# it under the terms of the GNU General Public License as published by
17# the Free Software Foundation, either version 3 of the License, or
18# (at your option) any later version.
19#
20# This program is distributed in the hope that it will be useful,
21# but WITHOUT ANY WARRANTY; without even the implied warranty of
22# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
23# GNU General Public License for more details.
24#
25# You should have received a copy of the GNU General Public License
26# along with this program. If not, see <http://www.gnu.org/licenses/>.
28import json
29import os
30import tempfile
31import unittest
32import unittest.mock
33import zipfile
34from typing import cast
36from lsst.daf.butler import (
37 Butler,
38 ButlerMetrics,
39 Config,
40 DatasetRef,
41 DatasetType,
42 DimensionUniverse,
43 Quantum,
44 QuantumBackedButler,
45 QuantumProvenanceData,
46 RegistryConfig,
47 StorageClass,
48)
49from lsst.daf.butler.datastore.record_data import DatastoreRecordData
50from lsst.daf.butler.datastores.file_datastore.retrieve_artifacts import ZipIndex
51from lsst.daf.butler.direct_butler import DirectButler
52from lsst.daf.butler.registry import _RegistryFactory
53from lsst.daf.butler.tests.utils import makeTestTempDir, removeTestTempDir
54from lsst.resources import ResourcePath
56TESTDIR = os.path.abspath(os.path.dirname(__file__))
class QuantumBackedButlerTestCase(unittest.TestCase):
    """Test case for QuantumBackedButler."""

    def setUp(self) -> None:
        """Build a temporary butler repository with dimension records,
        registered dataset types, stored input datasets, and predicted
        (not yet stored) output/missing refs shared by all tests.
        """
        self.root = makeTestTempDir(TESTDIR)
        self.config = Config()
        self.config["root"] = self.root
        self.universe = DimensionUniverse()
        self.metrics = ButlerMetrics()

        # Make a butler and import dimension definitions.  The registry is
        # created (and closed) first so the repository exists on disk before
        # Butler.from_config opens it.
        registryConfig = RegistryConfig(self.config.get("registry"))
        registry = _RegistryFactory(registryConfig).create_from_config(butlerRoot=self.root)
        registry.close()
        butler = Butler.from_config(self.config, writeable=True, run="RUN", metrics=self.metrics)
        # enterContext ties butler shutdown to test teardown.
        self.enterContext(butler)
        assert isinstance(butler, DirectButler)
        self.butler = butler
        self.butler.import_(filename="resource://lsst.daf.butler/tests/registry_data/base.yaml")

        # make all dataset types
        graph = self.universe.conform(("instrument", "detector"))
        storageClass = StorageClass("StructuredDataDict")
        self.datasetTypeInit = DatasetType("test_ds_init", graph, storageClass)
        self.datasetTypeInput = DatasetType("test_ds_input", graph, storageClass)
        self.datasetTypeOutput = DatasetType("test_ds_output", graph, storageClass)
        self.datasetTypeOutput2 = DatasetType("test_ds_output2", graph, storageClass)
        self.datasetTypeExtra = DatasetType("test_ds_extra", graph, storageClass)

        self.dataset_types: dict[str, DatasetType] = {}
        dataset_types = (
            self.datasetTypeInit,
            self.datasetTypeInput,
            self.datasetTypeOutput,
            self.datasetTypeOutput2,
            self.datasetTypeExtra,
        )
        for dataset_type in dataset_types:
            self.butler.registry.registerDatasetType(dataset_type)
            self.dataset_types[dataset_type.name] = dataset_type

        dataIds = [
            self.butler.registry.expandDataId(dict(instrument="Cam1", detector=detector_id))
            for detector_id in (1, 2, 3, 4)
        ]

        # make actual input datasets
        self.input_refs = [
            self.butler.put({"data": dataId["detector"]}, self.datasetTypeInput, dataId) for dataId in dataIds
        ]
        self.init_inputs_refs = [self.butler.put({"data": -1}, self.datasetTypeInit, dataIds[0])]
        self.all_input_refs = self.input_refs + self.init_inputs_refs

        # generate dataset refs for outputs; these are predicted only and are
        # never stored here — individual tests put data for them.
        self.output_refs = [DatasetRef(self.datasetTypeOutput, dataId, run="RUN") for dataId in dataIds]
        self.output_refs2 = [DatasetRef(self.datasetTypeOutput2, dataId, run="RUN") for dataId in dataIds]

        # Refs of a registered type that never get data: used to exercise
        # "missing dataset" code paths.
        self.missing_refs = [DatasetRef(self.datasetTypeExtra, dataId, run="RUN") for dataId in dataIds]
    def tearDown(self) -> None:
        # Remove the temporary repository created in setUp.
        removeTestTempDir(self.root)
121 def make_quantum(self, step: int = 1) -> Quantum:
122 """Make a Quantum which includes datastore records."""
123 if step == 1:
124 datastore_records = self.butler._datastore.export_records(self.all_input_refs)
125 predictedInputs = {self.datasetTypeInput: self.input_refs}
126 outputs = {self.datasetTypeOutput: self.output_refs}
127 initInputs = {self.datasetTypeInit: self.init_inputs_refs[0]}
128 elif step == 2:
129 # The result should be empty, this is just to test that it works.
130 datastore_records = self.butler._datastore.export_records(self.output_refs)
131 predictedInputs = {self.datasetTypeInput: self.output_refs}
132 outputs = {self.datasetTypeOutput2: self.output_refs2}
133 initInputs = {}
134 else:
135 raise ValueError(f"unexpected {step} value")
137 return Quantum(
138 taskName="some.task.name",
139 inputs=predictedInputs,
140 outputs=outputs,
141 initInputs=initInputs,
142 datastore_records=datastore_records,
143 )
145 def test_initialize(self) -> None:
146 """Test for initialize factory method"""
147 quantum = self.make_quantum()
148 qbb = QuantumBackedButler.initialize(
149 config=self.config,
150 quantum=quantum,
151 dimensions=self.universe,
152 dataset_types=self.dataset_types,
153 metrics=self.metrics,
154 )
155 self.addCleanup(qbb.close)
156 self._test_factory(qbb)
    def test_initialize_repo_index(self) -> None:
        """Test for initialize using config file and repo index."""
        # Store config to a file.
        self.config.dumpToUri(self.root)

        # Build a repository index mapping the label "label" to the repo root.
        butler_index = Config()
        butler_index["label"] = self.root
        with ResourcePath.temporary_uri(suffix=".yaml") as index_path:
            butler_index.dumpToUri(index_path)

            # Point the repository-index lookup at our temporary index so
            # config="label" below resolves to the test repo.
            with unittest.mock.patch.dict(os.environ, {"DAF_BUTLER_REPOSITORY_INDEX": str(index_path)}):
                quantum = self.make_quantum()
                qbb = QuantumBackedButler.initialize(
                    config="label",
                    quantum=quantum,
                    dimensions=self.universe,
                    dataset_types=self.dataset_types,
                    metrics=self.metrics,
                )
                self.addCleanup(qbb.close)
                self._test_factory(qbb)
180 def test_from_predicted(self) -> None:
181 """Test for from_predicted factory method"""
182 datastore_records = self.butler._datastore.export_records(self.all_input_refs)
183 qbb = QuantumBackedButler.from_predicted(
184 config=self.config,
185 predicted_inputs=[ref.id for ref in self.all_input_refs],
186 predicted_outputs=[ref.id for ref in self.output_refs],
187 dimensions=self.universe,
188 datastore_records=datastore_records,
189 dataset_types=self.dataset_types,
190 )
191 self.addCleanup(qbb.close)
192 self._test_factory(qbb)
194 def _test_factory(self, qbb: QuantumBackedButler) -> None:
195 """Test state immediately after construction."""
196 self.assertTrue(qbb.isWriteable())
197 self.assertEqual(qbb._predicted_inputs, {ref.id for ref in self.all_input_refs})
198 self.assertEqual(qbb._predicted_outputs, {ref.id for ref in self.output_refs})
199 self.assertEqual(qbb._available_inputs, set())
200 self.assertEqual(qbb._unavailable_inputs, set())
201 self.assertEqual(qbb._actual_inputs, set())
202 self.assertEqual(qbb._actual_output_refs, set())
    def test_getput(self) -> None:
        """Test for get/put methods"""
        quantum = self.make_quantum()
        qbb = QuantumBackedButler.initialize(
            config=self.config,
            quantum=quantum,
            dimensions=self.universe,
            dataset_types=self.dataset_types,
            metrics=self.metrics,
        )
        self.addCleanup(qbb.close)

        # Verify all input data are readable.
        for ref in self.input_refs:
            data = qbb.get(ref)
            self.assertEqual(data, {"data": ref.dataId["detector"]})
        for ref in self.init_inputs_refs:
            data = qbb.get(ref)
            self.assertEqual(data, {"data": -1})
        for ref in self.missing_refs:
            with self.assertRaises(FileNotFoundError):
                data = qbb.get(ref)

        # Successful gets mark inputs both available and actually used;
        # failed gets populate the unavailable set.
        self.assertEqual(qbb._available_inputs, qbb._predicted_inputs)
        self.assertEqual(qbb._actual_inputs, qbb._predicted_inputs)
        self.assertEqual(qbb._unavailable_inputs, {ref.id for ref in self.missing_refs})

        # Reset so the put/get counters below reflect only the writes/reads
        # that follow.
        self.metrics.reset()

        # Write all expected outputs.
        for ref in self.output_refs:
            qbb.put({"data": cast(int, ref.dataId["detector"]) ** 2}, ref)
        self.assertEqual(self.metrics.n_put, len(self.output_refs))

        # Must be able to read them back
        for ref in self.output_refs:
            data = qbb.get(ref)
            self.assertEqual(data, {"data": cast(int, ref.dataId["detector"]) ** 2})

        self.assertEqual(qbb._actual_output_refs, set(self.output_refs))
        self.assertEqual(self.metrics.n_get, len(self.output_refs))

        # Retrieve them as a Zip artifact.
        with tempfile.TemporaryDirectory() as tmpdir:
            zip = qbb.retrieve_artifacts_zip(self.output_refs, destination=tmpdir)
            index = ZipIndex.from_zip_file(zip)

            # Check that using the alternate API gives the same index.
            with zipfile.ZipFile(zip.ospath) as zf:
                index2 = ZipIndex.from_open_zip(zf)
            self.assertEqual(index, index2)

            zip_refs = index.refs.to_refs(universe=qbb.dimensions)
            self.assertEqual(len(zip_refs), 4)
            self.assertEqual(set(zip_refs), set(self.output_refs))
            self.assertEqual(len(index.artifact_map), 4)  # Count number of artifacts in Zip.

        # Retrieve them to a directory.
        with tempfile.TemporaryDirectory() as tmpdir:
            retrieved = qbb.retrieve_artifacts(self.output_refs, destination=tmpdir, preserve_path=False)
            self.assertEqual(len(retrieved), 4)
            self.assertTrue(retrieved[0].exists())
            # An index file is written alongside the retrieved artifacts.
            with open(os.path.join(tmpdir, ZipIndex.index_name)) as zf:
                index = ZipIndex.model_validate_json(zf.read())
            self.assertEqual(len(index.artifact_map), len(retrieved))
271 def test_getDeferred(self) -> None:
272 """Test for getDeferred method"""
273 quantum = self.make_quantum()
274 qbb = QuantumBackedButler.initialize(
275 config=self.config, quantum=quantum, dimensions=self.universe, dataset_types=self.dataset_types
276 )
277 self.addCleanup(qbb.close)
279 # get some input data
280 input_refs = self.input_refs[:2]
281 for ref in input_refs:
282 data = qbb.getDeferred(ref)
283 self.assertEqual(data.get(), {"data": ref.dataId["detector"]})
284 for ref in self.init_inputs_refs:
285 data = qbb.getDeferred(ref)
286 self.assertEqual(data.get(), {"data": -1})
287 for ref in self.missing_refs:
288 data = qbb.getDeferred(ref)
289 with self.assertRaises(FileNotFoundError):
290 data.get()
292 # _avalable_inputs is not
293 self.assertEqual(qbb._available_inputs, {ref.id for ref in input_refs + self.init_inputs_refs})
294 self.assertEqual(qbb._actual_inputs, {ref.id for ref in input_refs + self.init_inputs_refs})
295 self.assertEqual(qbb._unavailable_inputs, {ref.id for ref in self.missing_refs})
297 def test_stored(self) -> None:
298 """Test for dataset existence method"""
299 quantum = self.make_quantum()
300 qbb = QuantumBackedButler.initialize(
301 config=self.config, quantum=quantum, dimensions=self.universe, dataset_types=self.dataset_types
302 )
303 self.addCleanup(qbb.close)
305 # get some input data
306 input_refs = self.input_refs[:2]
307 for ref in input_refs:
308 exists = qbb.stored(ref)
309 self.assertTrue(exists)
310 for ref in self.init_inputs_refs:
311 exists = qbb.stored(ref)
312 self.assertTrue(exists)
313 for ref in self.missing_refs:
314 exists = qbb.stored(ref)
315 self.assertFalse(exists)
317 # Now do the same checks in bulk.
318 missing_set = set(self.missing_refs)
319 refs = input_refs + self.init_inputs_refs + self.missing_refs
320 stored_many = qbb.stored_many(refs)
321 for ref, stored in stored_many.items():
322 if ref in missing_set:
323 self.assertFalse(stored)
324 else:
325 self.assertTrue(stored)
327 # _available_inputs is not
328 self.assertEqual(qbb._available_inputs, {ref.id for ref in input_refs + self.init_inputs_refs})
329 self.assertEqual(qbb._actual_inputs, set())
330 self.assertEqual(qbb._unavailable_inputs, set()) # this is not consistent with get?
332 def test_markInputUnused(self) -> None:
333 """Test for markInputUnused method"""
334 quantum = self.make_quantum()
335 qbb = QuantumBackedButler.initialize(
336 config=self.config, quantum=quantum, dimensions=self.universe, dataset_types=self.dataset_types
337 )
338 self.addCleanup(qbb.close)
340 # get some input data
341 for ref in self.input_refs:
342 data = qbb.get(ref)
343 self.assertEqual(data, {"data": ref.dataId["detector"]})
344 for ref in self.init_inputs_refs:
345 data = qbb.get(ref)
346 self.assertEqual(data, {"data": -1})
348 self.assertEqual(qbb._available_inputs, qbb._predicted_inputs)
349 self.assertEqual(qbb._actual_inputs, qbb._predicted_inputs)
351 qbb.markInputUnused(self.input_refs[0])
352 self.assertEqual(qbb._actual_inputs, {ref.id for ref in self.input_refs[1:] + self.init_inputs_refs})
    def test_pruneDatasets(self) -> None:
        """Test for pruneDatasets methods"""
        quantum = self.make_quantum()
        qbb = QuantumBackedButler.initialize(
            config=self.config, quantum=quantum, dimensions=self.universe, dataset_types=self.dataset_types
        )
        self.addCleanup(qbb.close)

        # Write all expected outputs.
        for ref in self.output_refs:
            qbb.put({"data": cast(int, ref.dataId["detector"]) ** 2}, ref)

        # Must be able to read them back
        for ref in self.output_refs:
            data = qbb.get(ref)
            self.assertEqual(data, {"data": cast(int, ref.dataId["detector"]) ** 2})

        # Check for invalid arguments.
        with self.assertRaisesRegex(TypeError, "Cannot pass purge=True without disassociate=True"):
            qbb.pruneDatasets(self.output_refs, disassociate=False, unstore=True, purge=True)
        with self.assertRaisesRegex(TypeError, "Cannot pass purge=True without unstore=True"):
            qbb.pruneDatasets(self.output_refs, disassociate=True, unstore=False, purge=True)
        with self.assertRaisesRegex(TypeError, "Cannot pass disassociate=True without purge=True"):
            qbb.pruneDatasets(self.output_refs, disassociate=True, unstore=True, purge=False)

        # Unstore only (disassociate=False here — the original comment said
        # "Disassociate only", which did not match the call).
        ref = self.output_refs[0]
        qbb.pruneDatasets([ref], disassociate=False, unstore=True, purge=False)
        self.assertFalse(qbb.stored(ref))
        with self.assertRaises(FileNotFoundError):
            data = qbb.get(ref)

        # can store it again
        qbb.put({"data": cast(int, ref.dataId["detector"]) ** 2}, ref)
        self.assertTrue(qbb.stored(ref))

        # Purge completely.
        ref = self.output_refs[1]
        qbb.pruneDatasets([ref], disassociate=True, unstore=True, purge=True)
        self.assertFalse(qbb.stored(ref))
        with self.assertRaises(FileNotFoundError):
            data = qbb.get(ref)
        # A purged dataset can also be stored again.
        qbb.put({"data": cast(int, ref.dataId["detector"]) ** 2}, ref)
        self.assertTrue(qbb.stored(ref))
    def test_extract_provenance_data(self) -> None:
        """Test for extract_provenance_data method"""
        quantum = self.make_quantum()
        qbb = QuantumBackedButler.initialize(
            config=self.config, quantum=quantum, dimensions=self.universe, dataset_types=self.dataset_types
        )
        self.addCleanup(qbb.close)

        # read/store everything
        for ref in self.input_refs:
            qbb.get(ref)
        for ref in self.init_inputs_refs:
            qbb.get(ref)
        for ref in self.output_refs:
            qbb.put({"data": cast(int, ref.dataId["detector"]) ** 2}, ref)

        # Round-trip the provenance through JSON and check that the
        # direct() reconstruction behaves the same as the original object.
        provenance1 = qbb.extract_provenance_data()
        prov_json = provenance1.model_dump_json()
        provenance2 = QuantumProvenanceData.direct(**json.loads(prov_json))
        for provenance in (provenance1, provenance2):
            input_ids = {ref.id for ref in self.input_refs + self.init_inputs_refs}
            self.assertEqual(provenance.predicted_inputs, input_ids)
            self.assertEqual(provenance.available_inputs, input_ids)
            self.assertEqual(provenance.actual_inputs, input_ids)
            output_ids = {ref.id for ref in self.output_refs}
            self.assertEqual(provenance.predicted_outputs, output_ids)
            self.assertEqual(provenance.actual_outputs, output_ids)
            datastore_name = "FileDatastore@<butlerRoot>/datastore"
            self.assertEqual(set(provenance.datastore_records.keys()), {datastore_name})
            datastore_records = provenance.datastore_records[datastore_name]
            self.assertEqual(set(datastore_records.dataset_ids), output_ids)
            class_name = "lsst.daf.butler.datastore.stored_file_info.StoredFileInfo"
            self.assertEqual(set(datastore_records.records.keys()), {class_name})
            # Record keys are the hex form of the dataset UUIDs.
            self.assertEqual(set(datastore_records.records[class_name].keys()), {id.hex for id in output_ids})
            table_name = "file_datastore_records"
            for dataset_data in datastore_records.records[class_name].values():
                self.assertEqual(set(dataset_data), {table_name})
437 def test_export_predicted_datastore_records(self) -> None:
438 """Test for export_predicted_datastore_records method"""
439 quantum = self.make_quantum()
440 qbb = QuantumBackedButler.initialize(
441 config=self.config, quantum=quantum, dimensions=self.universe, dataset_types=self.dataset_types
442 )
443 self.addCleanup(qbb.close)
445 records = qbb.export_predicted_datastore_records(self.output_refs)
446 self.assertEqual(len(records["FileDatastore@<butlerRoot>/datastore"].records), len(self.output_refs))
448 with self.assertRaises(ValueError):
449 qbb.export_predicted_datastore_records(self.missing_refs)
451 def test_collect_and_transfer(self) -> None:
452 """Test for collect_and_transfer method"""
453 quantum1 = self.make_quantum(1)
454 qbb1 = QuantumBackedButler.initialize(
455 config=self.config, quantum=quantum1, dimensions=self.universe, dataset_types=self.dataset_types
456 )
457 self.addCleanup(qbb1.close)
459 quantum2 = self.make_quantum(2)
460 qbb2 = QuantumBackedButler.initialize(
461 config=self.config, quantum=quantum2, dimensions=self.universe, dataset_types=self.dataset_types
462 )
463 self.addCleanup(qbb2.close)
465 # read/store everything
466 for ref in self.input_refs:
467 qbb1.get(ref)
468 for ref in self.init_inputs_refs:
469 qbb1.get(ref)
470 for ref in self.output_refs:
471 qbb1.put({"data": cast(int, ref.dataId["detector"]) ** 2}, ref)
473 for ref in self.output_refs:
474 qbb2.get(ref)
475 for ref in self.output_refs2:
476 qbb2.put({"data": cast(int, ref.dataId["detector"]) ** 3}, ref)
478 QuantumProvenanceData.collect_and_transfer(
479 self.butler,
480 [quantum1, quantum2],
481 [qbb1.extract_provenance_data(), qbb2.extract_provenance_data()],
482 )
484 for ref in self.output_refs:
485 data = self.butler.get(ref)
486 self.assertEqual(data, {"data": cast(int, ref.dataId["detector"]) ** 2})
488 for ref in self.output_refs2:
489 data = self.butler.get(ref)
490 self.assertEqual(data, {"data": cast(int, ref.dataId["detector"]) ** 3})
492 def test_record_data_merge_mappings(self):
493 """Test DatastoreRecordData.merge_mappings."""
494 r1 = self.butler._datastore.export_records(self.input_refs)
495 r2 = self.butler._datastore.export_records(self.init_inputs_refs)
496 r3 = self.butler._datastore.export_records(self.input_refs) # intentional duplicate
497 merged = DatastoreRecordData.merge_mappings(r1, r2, r3)
498 self.assertEqual(merged.keys(), r1.keys() | r2.keys())
499 for datastore_name in merged.keys():
500 if datastore_name in r1:
501 self.assertGreaterEqual(
502 merged[datastore_name].records.keys(), r1[datastore_name].records.keys()
503 )
504 if datastore_name in r2:
505 self.assertGreaterEqual(
506 merged[datastore_name].records.keys(), r2[datastore_name].records.keys()
507 )
if __name__ == "__main__":
    # Allow running this test module directly.
    unittest.main()