Coverage for tests/test_datastore.py: 13%
1214 statements
« prev ^ index » next coverage.py v7.13.5, created at 2026-04-28 08:36 +0000
1# This file is part of daf_butler.
2#
3# Developed for the LSST Data Management System.
4# This product includes software developed by the LSST Project
5# (http://www.lsst.org).
6# See the COPYRIGHT file at the top-level directory of this distribution
7# for details of code ownership.
8#
9# This software is dual licensed under the GNU General Public License and also
10# under a 3-clause BSD license. Recipients may choose which of these licenses
11# to use; please see the files gpl-3.0.txt and/or bsd_license.txt,
12# respectively. If you choose the GPL option then the following text applies
13# (but note that there is still no warranty even if you opt for BSD instead):
14#
15# This program is free software: you can redistribute it and/or modify
16# it under the terms of the GNU General Public License as published by
17# the Free Software Foundation, either version 3 of the License, or
18# (at your option) any later version.
19#
20# This program is distributed in the hope that it will be useful,
21# but WITHOUT ANY WARRANTY; without even the implied warranty of
22# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
23# GNU General Public License for more details.
24#
25# You should have received a copy of the GNU General Public License
26# along with this program. If not, see <http://www.gnu.org/licenses/>.
28from __future__ import annotations
30import contextlib
31import os
32import pickle
33import shutil
34import tempfile
35import time
36import unittest
37import unittest.mock
38import uuid
39from collections.abc import Callable, Iterator
40from typing import Any, cast
42import yaml
44import lsst.utils.tests
45from lsst.daf.butler import (
46 Config,
47 DataCoordinate,
48 DatasetIdGenEnum,
49 DatasetRef,
50 DatasetType,
51 DatasetTypeNotSupportedError,
52 Datastore,
53 DimensionUniverse,
54 FileDataset,
55 StorageClass,
56 StorageClassFactory,
57)
58from lsst.daf.butler.datastore import DatasetRefURIs, DatastoreConfig, DatastoreValidationError, NullDatastore
59from lsst.daf.butler.datastore.cache_manager import (
60 DatastoreCacheManager,
61 DatastoreCacheManagerConfig,
62 DatastoreDisabledCacheManager,
63)
64from lsst.daf.butler.datastore.record_data import DatastoreRecordData, SerializedDatastoreRecordData
65from lsst.daf.butler.datastore.stored_file_info import StoredFileInfo, make_datastore_path_relative
66from lsst.daf.butler.formatters.yaml import YamlFormatter
67from lsst.daf.butler.tests import (
68 BadNoWriteFormatter,
69 BadWriteFormatter,
70 DatasetTestHelper,
71 DatastoreTestHelper,
72 DummyRegistry,
73 MetricsExample,
74 MetricsExampleDataclass,
75 MetricsExampleModel,
76)
77from lsst.daf.butler.tests.dict_convertible_model import DictConvertibleModel
78from lsst.daf.butler.tests.utils import TestCaseMixin
79from lsst.resources import ResourcePath
80from lsst.utils import doImport
81from lsst.utils.introspection import get_full_type_name
# Directory containing this test module; used to locate test config files.
TESTDIR = os.path.dirname(__file__)
def makeExampleMetrics(use_none: bool = False) -> MetricsExample:
    """Make example dataset that can be stored in butler.

    Parameters
    ----------
    use_none : `bool`, optional
        If `True` the array component is replaced by `None`.
    """
    data_array = None if use_none else [563, 234, 456.7, 105, 2054, -1045]
    return MetricsExample(
        {"AM1": 5.2, "AM2": 30.6},
        {"a": [1, 2, 3], "b": {"blue": 5, "red": "green"}},
        data_array,
    )
class TransactionTestError(Exception):
    """Specific error for transactions, to prevent misdiagnosing
    that might otherwise occur when a standard exception is used.
    """
class DatastoreTestsBase(DatasetTestHelper, DatastoreTestHelper, TestCaseMixin):
    """Support routines for datastore testing"""

    # Filesystem root for the datastore under test; removed in tearDown.
    root: str | None = None
    universe: DimensionUniverse
    storageClassFactory: StorageClassFactory

    @classmethod
    def setUpClass(cls) -> None:
        # Storage Classes are fixed for all datastores in these tests
        cls.storageClassFactory = StorageClassFactory()
        cls.storageClassFactory.addFromConfig(os.path.join(TESTDIR, "config/basic/storageClasses.yaml"))

        # Read the Datastore config so we can get the class
        # information (since we should not assume the constructor
        # name here, but rely on the configuration file itself)
        cls.datastoreType = cast(type[Datastore], doImport(DatastoreConfig(cls.configFile)["cls"]))
        cls.universe = DimensionUniverse()

    def setUp(self) -> None:
        self.setUpDatastoreTests(DummyRegistry, DatastoreConfig)

    def tearDown(self) -> None:
        # Best-effort cleanup of any on-disk datastore root.
        if self.root is not None and os.path.exists(self.root):
            shutil.rmtree(self.root, ignore_errors=True)
class DatastoreTests(DatastoreTestsBase):
    """Some basic tests of a simple datastore."""

    # Whether put() of a dataset type the datastore does not support is
    # expected to raise DatasetTypeNotSupportedError (see testBasicPutGet).
    hasUnsupportedPut = True
    # Config keys whose values should be rewritten by setConfigRoot;
    # None/empty skips that check in testConfigRoot.
    rootKeys: tuple[str, ...] | None = None
    # True for in-memory (non-persistent) datastores.
    isEphemeral: bool = False
    # Whether validateConfiguration is expected to be able to fail.
    validationCanFail: bool = False
144 def testConfigRoot(self) -> None:
145 full = DatastoreConfig(self.configFile)
146 config = DatastoreConfig(self.configFile, mergeDefaults=False)
147 newroot = "/random/location"
148 self.datastoreType.setConfigRoot(newroot, config, full)
149 if self.rootKeys:
150 for k in self.rootKeys:
151 self.assertIn(newroot, config[k])
153 def testConstructor(self) -> None:
154 datastore = self.makeDatastore()
155 self.assertIsNotNone(datastore)
156 self.assertIs(datastore.isEphemeral, self.isEphemeral)
158 def testConfigurationValidation(self) -> None:
159 datastore = self.makeDatastore()
160 sc = self.storageClassFactory.getStorageClass("ThingOne")
161 datastore.validateConfiguration([sc])
163 sc2 = self.storageClassFactory.getStorageClass("ThingTwo")
164 if self.validationCanFail:
165 with self.assertRaises(DatastoreValidationError):
166 datastore.validateConfiguration([sc2], logFailures=True)
168 dimensions = self.universe.conform(("visit", "physical_filter"))
169 dataId = {
170 "instrument": "dummy",
171 "visit": 52,
172 "physical_filter": "V",
173 "band": "v",
174 "day_obs": 20250101,
175 }
176 ref = self.makeDatasetRef("metric", dimensions, sc, dataId)
177 datastore.validateConfiguration([ref])
179 def testParameterValidation(self) -> None:
180 """Check that parameters are validated"""
181 sc = self.storageClassFactory.getStorageClass("ThingOne")
182 dimensions = self.universe.conform(("visit", "physical_filter"))
183 dataId = {
184 "instrument": "dummy",
185 "visit": 52,
186 "physical_filter": "V",
187 "band": "v",
188 "day_obs": 20250101,
189 }
190 ref = self.makeDatasetRef("metric", dimensions, sc, dataId)
191 datastore = self.makeDatastore()
192 data = {1: 2, 3: 4}
193 datastore.put(data, ref)
194 newdata = datastore.get(ref)
195 self.assertEqual(data, newdata)
196 with self.assertRaises(KeyError):
197 newdata = datastore.get(ref, parameters={"missing": 5})
199 def testBasicPutGet(self) -> None:
200 metrics = makeExampleMetrics()
201 datastore = self.makeDatastore()
203 # Create multiple storage classes for testing different formulations
204 storageClasses = [
205 self.storageClassFactory.getStorageClass(sc)
206 for sc in ("StructuredData", "StructuredDataJson", "StructuredDataPickle")
207 ]
209 dimensions = self.universe.conform(("visit", "physical_filter"))
210 dataId = {
211 "instrument": "dummy",
212 "visit": 52,
213 "physical_filter": "V",
214 "band": "v",
215 "day_obs": 20250101,
216 }
217 dataId2 = {
218 "instrument": "dummy",
219 "visit": 53,
220 "physical_filter": "V",
221 "band": "v",
222 "day_obs": 20250101,
223 }
225 for sc in storageClasses:
226 ref = self.makeDatasetRef("metric", dimensions, sc, dataId)
227 ref2 = self.makeDatasetRef("metric", dimensions, sc, dataId2)
229 # Make sure that using getManyURIs without predicting before the
230 # dataset has been put raises.
231 with self.assertRaises(FileNotFoundError):
232 datastore.getManyURIs([ref], predict=False)
234 # Make sure that using getManyURIs with predicting before the
235 # dataset has been put predicts the URI.
236 uris = datastore.getManyURIs([ref, ref2], predict=True)
237 self.assertIn("52", uris[ref].primaryURI.geturl())
238 self.assertIn("#predicted", uris[ref].primaryURI.geturl())
239 self.assertIn("53", uris[ref2].primaryURI.geturl())
240 self.assertIn("#predicted", uris[ref2].primaryURI.geturl())
242 datastore.put(metrics, ref)
244 # Does it exist?
245 self.assertTrue(datastore.exists(ref))
246 self.assertTrue(datastore.knows(ref))
247 multi = datastore.knows_these([ref])
248 self.assertTrue(multi[ref])
249 multi = datastore.mexists([ref, ref2])
250 self.assertTrue(multi[ref])
251 self.assertFalse(multi[ref2])
253 # Get
254 metricsOut = datastore.get(ref, parameters=None)
255 self.assertEqual(metrics, metricsOut)
257 uri = datastore.getURI(ref)
258 self.assertEqual(uri.scheme, self.uriScheme)
260 uris = datastore.getManyURIs([ref])
261 self.assertEqual(len(uris), 1)
262 ref, uri = uris.popitem()
263 self.assertTrue(uri.primaryURI.exists())
264 self.assertFalse(uri.componentURIs)
266 # Get a component -- we need to construct new refs for them
267 # with derived storage classes but with parent ID
268 for comp in ("data", "output"):
269 compRef = ref.makeComponentRef(comp)
270 output = datastore.get(compRef)
271 self.assertEqual(output, getattr(metricsOut, comp))
273 uri = datastore.getURI(compRef)
274 self.assertEqual(uri.scheme, self.uriScheme)
276 uris = datastore.getManyURIs([compRef])
277 self.assertEqual(len(uris), 1)
279 storageClass = sc
281 # Check that we can put a metric with None in a component and
282 # get it back as None
283 metricsNone = makeExampleMetrics(use_none=True)
284 dataIdNone = {
285 "instrument": "dummy",
286 "visit": 54,
287 "physical_filter": "V",
288 "band": "v",
289 "day_obs": 20250101,
290 }
291 refNone = self.makeDatasetRef("metric", dimensions, sc, dataIdNone)
292 datastore.put(metricsNone, refNone)
294 comp = "data"
295 for comp in ("data", "output"):
296 compRef = refNone.makeComponentRef(comp)
297 output = datastore.get(compRef)
298 self.assertEqual(output, getattr(metricsNone, comp))
300 # Check that a put fails if the dataset type is not supported
301 if self.hasUnsupportedPut:
302 sc = StorageClass("UnsupportedSC", pytype=type(metrics))
303 ref = self.makeDatasetRef("unsupportedType", dimensions, sc, dataId)
304 with self.assertRaises(DatasetTypeNotSupportedError):
305 datastore.put(metrics, ref)
307 # These should raise
308 ref = self.makeDatasetRef("metrics", dimensions, storageClass, dataId)
309 with self.assertRaises(FileNotFoundError):
310 # non-existing file
311 datastore.get(ref)
313 # Get a URI from it
314 uri = datastore.getURI(ref, predict=True)
315 self.assertEqual(uri.scheme, self.uriScheme)
317 with self.assertRaises(FileNotFoundError):
318 datastore.getURI(ref)
320 def testTrustGetRequest(self) -> None:
321 """Check that we can get datasets that registry knows nothing about."""
322 datastore = self.makeDatastore()
324 # Skip test if the attribute is not defined
325 if not hasattr(datastore, "trustGetRequest"):
326 return
328 metrics = makeExampleMetrics()
330 i = 0
331 for sc_name in ("StructuredDataNoComponents", "StructuredData", "StructuredComposite"):
332 i += 1
333 datasetTypeName = f"test_metric{i}" # Different dataset type name each time.
335 if sc_name == "StructuredComposite":
336 disassembled = True
337 else:
338 disassembled = False
340 # Start datastore in default configuration of using registry
341 datastore.trustGetRequest = False
343 # Create multiple storage classes for testing with or without
344 # disassembly
345 sc = self.storageClassFactory.getStorageClass(sc_name)
346 dimensions = self.universe.conform(("visit", "physical_filter"))
348 dataId = {
349 "instrument": "dummy",
350 "visit": 52 + i,
351 "physical_filter": "V",
352 "band": "v",
353 "day_obs": 20250101,
354 }
356 ref = self.makeDatasetRef(datasetTypeName, dimensions, sc, dataId)
357 datastore.put(metrics, ref)
359 # Does it exist?
360 self.assertTrue(datastore.exists(ref))
361 self.assertTrue(datastore.knows(ref))
362 multi = datastore.knows_these([ref])
363 self.assertTrue(multi[ref])
364 multi = datastore.mexists([ref])
365 self.assertTrue(multi[ref])
367 # Get
368 metricsOut = datastore.get(ref)
369 self.assertEqual(metrics, metricsOut)
371 # Get the URI(s)
372 allURIs = datastore.getURIs(ref)
373 primaryURI, componentURIs = allURIs
374 if disassembled:
375 self.assertIsNone(primaryURI)
376 self.assertEqual(len(componentURIs), 3)
377 self.assertEqual(list(allURIs.iter_all()), list(componentURIs.values()))
378 else:
379 self.assertIn(datasetTypeName, primaryURI.path)
380 self.assertFalse(componentURIs)
381 self.assertEqual(list(allURIs.iter_all()), [primaryURI])
383 # Delete registry entry so now we are trusting
384 datastore.removeStoredItemInfo(ref)
386 # Now stop trusting and check that things break
387 datastore.trustGetRequest = False
389 # Does it exist?
390 self.assertFalse(datastore.exists(ref))
391 self.assertFalse(datastore.knows(ref))
392 multi = datastore.knows_these([ref])
393 self.assertFalse(multi[ref])
394 multi = datastore.mexists([ref])
395 self.assertFalse(multi[ref])
397 with self.assertRaises(FileNotFoundError):
398 datastore.get(ref)
400 if sc_name != "StructuredDataNoComponents":
401 with self.assertRaises(FileNotFoundError):
402 datastore.get(ref.makeComponentRef("data"))
404 # URI should fail unless we ask for prediction
405 with self.assertRaises(FileNotFoundError):
406 datastore.getURIs(ref)
408 predicted_primary, predicted_disassembled = datastore.getURIs(ref, predict=True)
409 if disassembled:
410 self.assertIsNone(predicted_primary)
411 self.assertEqual(len(predicted_disassembled), 3)
412 for uri in predicted_disassembled.values():
413 self.assertEqual(uri.fragment, "predicted")
414 self.assertIn(datasetTypeName, uri.path)
415 else:
416 self.assertIn(datasetTypeName, predicted_primary.path)
417 self.assertFalse(predicted_disassembled)
418 self.assertEqual(predicted_primary.fragment, "predicted")
420 # Now enable registry-free trusting mode
421 datastore.trustGetRequest = True
423 # Try again to get it
424 metricsOut = datastore.get(ref)
425 self.assertEqual(metricsOut, metrics)
427 # Does it exist?
428 self.assertTrue(datastore.exists(ref))
430 # Get a component
431 if sc_name != "StructuredDataNoComponents":
432 comp = "data"
433 compRef = ref.makeComponentRef(comp)
434 output = datastore.get(compRef)
435 self.assertEqual(output, getattr(metrics, comp))
437 # Get the URI -- if we trust this should work even without
438 # enabling prediction.
439 primaryURI2, componentURIs2 = datastore.getURIs(ref)
440 self.assertEqual(primaryURI2, primaryURI)
441 self.assertEqual(componentURIs2, componentURIs)
443 # Check for compatible storage class.
444 if sc_name in ("StructuredDataNoComponents", "StructuredData"):
445 # Make new dataset ref with compatible storage class.
446 ref_comp = ref.overrideStorageClass("StructuredDataDictJson")
448 # Without `set_retrieve_dataset_type_method` it will fail to
449 # find correct file.
450 self.assertFalse(datastore.exists(ref_comp))
451 with self.assertRaises(FileNotFoundError):
452 datastore.get(ref_comp)
453 with self.assertRaises(FileNotFoundError):
454 datastore.get(ref, storageClass="StructuredDataDictJson")
456 # Need a special method to generate stored dataset type.
457 def _stored_dataset_type(name: str, ref: DatasetRef = ref) -> DatasetType:
458 if name == ref.datasetType.name:
459 return ref.datasetType
460 raise ValueError(f"Unexpected dataset type name {ref.datasetType.name}")
462 datastore.set_retrieve_dataset_type_method(_stored_dataset_type)
464 # Storage class override with original dataset ref.
465 metrics_as_dict = datastore.get(ref, storageClass="StructuredDataDictJson")
466 self.assertIsInstance(metrics_as_dict, dict)
468 # get() should return a dict now.
469 metrics_as_dict = datastore.get(ref_comp)
470 self.assertIsInstance(metrics_as_dict, dict)
472 # exists() should work as well.
473 self.assertTrue(datastore.exists(ref_comp))
475 datastore.set_retrieve_dataset_type_method(None)
    def testDisassembly(self) -> None:
        """Test disassembly within datastore."""
        metrics = makeExampleMetrics()
        if self.isEphemeral:
            # in-memory datastore does not disassemble
            return

        # Create multiple storage classes for testing different formulations
        # of composites. One of these will not disassemble to provide
        # a reference.
        storageClasses = [
            self.storageClassFactory.getStorageClass(sc)
            for sc in (
                "StructuredComposite",
                "StructuredCompositeTestA",
                "StructuredCompositeTestB",
                "StructuredCompositeReadComp",
                "StructuredData",  # No disassembly
                "StructuredCompositeReadCompNoDisassembly",
            )
        ]

        # Create the test datastore
        datastore = self.makeDatastore()

        # Dummy dataId
        dimensions = self.universe.conform(("visit", "physical_filter"))
        dataId = {"instrument": "dummy", "visit": 428, "physical_filter": "R"}

        for i, sc in enumerate(storageClasses):
            with self.subTest(storageClass=sc.name):
                # Create a different dataset type each time round
                # so that a test failure in this subtest does not trigger
                # a cascade of tests because of file clashes
                ref = self.makeDatasetRef(f"metric_comp_{i}", dimensions, sc, dataId)

                # Only these two storage classes are stored as a single file.
                disassembled = sc.name not in {"StructuredData", "StructuredCompositeReadCompNoDisassembly"}

                datastore.put(metrics, ref)

                baseURI, compURIs = datastore.getURIs(ref)
                if disassembled:
                    # Disassembled: no primary file, one URI per component.
                    self.assertIsNone(baseURI)
                    self.assertEqual(set(compURIs), {"data", "output", "summary"})
                else:
                    self.assertIsNotNone(baseURI)
                    self.assertEqual(compURIs, {})

                metrics_get = datastore.get(ref)
                self.assertEqual(metrics_get, metrics)

                # Retrieve the composite with read parameter
                stop = 4
                metrics_get = datastore.get(ref, parameters={"slice": slice(stop)})
                self.assertEqual(metrics_get.summary, metrics.summary)
                self.assertEqual(metrics_get.output, metrics.output)
                self.assertEqual(metrics_get.data, metrics.data[:stop])

                # Retrieve a component
                data = datastore.get(ref.makeComponentRef("data"))
                self.assertEqual(data, metrics.data)

                # On supported storage classes attempt to access a read
                # only component
                if "ReadComp" in sc.name:
                    cRef = ref.makeComponentRef("counter")
                    counter = datastore.get(cRef)
                    self.assertEqual(counter, len(metrics.data))

                    counter = datastore.get(cRef, parameters={"slice": slice(stop)})
                    self.assertEqual(counter, stop)

                # Remove so the next storage class does not clash on disk.
                datastore.remove(ref)
551 def prepDeleteTest(self, n_refs: int = 1) -> tuple[Datastore, tuple[DatasetRef, ...]]:
552 metrics = makeExampleMetrics()
553 datastore = self.makeDatastore()
554 # Put
555 dimensions = self.universe.conform(("visit", "physical_filter"))
556 sc = self.storageClassFactory.getStorageClass("StructuredData")
557 refs = []
558 for i in range(n_refs):
559 dataId = {
560 "instrument": "dummy",
561 "visit": 638 + i,
562 "physical_filter": "U",
563 "band": "u",
564 "day_obs": 20250101,
565 }
566 ref = self.makeDatasetRef("metric", dimensions, sc, dataId)
567 datastore.put(metrics, ref)
569 # Does it exist?
570 self.assertTrue(datastore.exists(ref))
572 # Get
573 metricsOut = datastore.get(ref)
574 self.assertEqual(metrics, metricsOut)
575 refs.append(ref)
577 return datastore, *refs
579 def testRemove(self) -> None:
580 datastore, ref = self.prepDeleteTest()
582 # Remove
583 datastore.remove(ref)
585 # Does it exist?
586 self.assertFalse(datastore.exists(ref))
588 # Do we now get a predicted URI?
589 uri = datastore.getURI(ref, predict=True)
590 self.assertEqual(uri.fragment, "predicted")
592 # Get should now fail
593 with self.assertRaises(FileNotFoundError):
594 datastore.get(ref)
595 # Can only delete once
596 with self.assertRaises(FileNotFoundError):
597 datastore.remove(ref)
599 def testForget(self) -> None:
600 datastore, ref = self.prepDeleteTest()
602 # Remove
603 datastore.forget([ref])
605 # Does it exist (as far as we know)?
606 self.assertFalse(datastore.exists(ref))
608 # Do we now get a predicted URI?
609 uri = datastore.getURI(ref, predict=True)
610 self.assertEqual(uri.fragment, "predicted")
612 # Get should now fail
613 with self.assertRaises(FileNotFoundError):
614 datastore.get(ref)
616 # Forgetting again is a silent no-op
617 datastore.forget([ref])
619 # Predicted URI should still point to the file.
620 self.assertTrue(uri.exists())
622 def testTransfer(self) -> None:
623 metrics = makeExampleMetrics()
625 dimensions = self.universe.conform(("visit", "physical_filter"))
626 dataId = {
627 "instrument": "dummy",
628 "visit": 2048,
629 "physical_filter": "Uprime",
630 "band": "u",
631 "day_obs": 20250101,
632 }
634 sc = self.storageClassFactory.getStorageClass("StructuredData")
635 ref = self.makeDatasetRef("metric", dimensions, sc, dataId)
637 inputDatastore = self.makeDatastore("test_input_datastore")
638 outputDatastore = self.makeDatastore("test_output_datastore")
640 inputDatastore.put(metrics, ref)
641 outputDatastore.transfer(inputDatastore, ref)
643 metricsOut = outputDatastore.get(ref)
644 self.assertEqual(metrics, metricsOut)
    def testBasicTransaction(self) -> None:
        """Datasets put inside a committed transaction persist, while
        datasets put inside a rolled-back transaction do not.
        """
        datastore = self.makeDatastore()
        storageClass = self.storageClassFactory.getStorageClass("StructuredData")
        dimensions = self.universe.conform(("visit", "physical_filter"))
        nDatasets = 6
        dataIds = [
            {"instrument": "dummy", "visit": i, "physical_filter": "V", "band": "v", "day_obs": 20250101}
            for i in range(nDatasets)
        ]
        data = [
            (
                self.makeDatasetRef("metric", dimensions, storageClass, dataId),
                makeExampleMetrics(),
            )
            for dataId in dataIds
        ]
        # First half committed, second half rolled back.
        succeed = data[: nDatasets // 2]
        fail = data[nDatasets // 2 :]
        # All datasets added in this transaction should continue to exist
        with datastore.transaction():
            for ref, metrics in succeed:
                datastore.put(metrics, ref)
        # Whereas datasets added in this transaction should not
        with self.assertRaises(TransactionTestError):
            with datastore.transaction():
                for ref, metrics in fail:
                    datastore.put(metrics, ref)
                raise TransactionTestError("This should propagate out of the context manager")
        # Check for datasets that should exist
        for ref, metrics in succeed:
            # Does it exist?
            self.assertTrue(datastore.exists(ref))
            # Get
            metricsOut = datastore.get(ref, parameters=None)
            self.assertEqual(metrics, metricsOut)
            # URI
            uri = datastore.getURI(ref)
            self.assertEqual(uri.scheme, self.uriScheme)
        # Check for datasets that should not exist
        for ref, _ in fail:
            # These should raise
            with self.assertRaises(FileNotFoundError):
                # non-existing file
                datastore.get(ref)
            with self.assertRaises(FileNotFoundError):
                datastore.getURI(ref)
    def testNestedTransaction(self) -> None:
        """Rolling back an outer transaction removes datasets put in both
        the outer and the nested inner transaction, while datasets put
        before the transaction survive.
        """
        datastore = self.makeDatastore()
        storageClass = self.storageClassFactory.getStorageClass("StructuredData")
        dimensions = self.universe.conform(("visit", "physical_filter"))
        metrics = makeExampleMetrics()

        dataId = {"instrument": "dummy", "visit": 0, "physical_filter": "V", "band": "v", "day_obs": 20250101}
        refBefore = self.makeDatasetRef("metric", dimensions, storageClass, dataId)
        datastore.put(metrics, refBefore)
        with self.assertRaises(TransactionTestError):
            with datastore.transaction():
                dataId = {
                    "instrument": "dummy",
                    "visit": 1,
                    "physical_filter": "V",
                    "band": "v",
                    "day_obs": 20250101,
                }
                refOuter = self.makeDatasetRef("metric", dimensions, storageClass, dataId)
                datastore.put(metrics, refOuter)
                with datastore.transaction():
                    dataId = {
                        "instrument": "dummy",
                        "visit": 2,
                        "physical_filter": "V",
                        "band": "v",
                        "day_obs": 20250101,
                    }
                    refInner = self.makeDatasetRef("metric", dimensions, storageClass, dataId)
                    datastore.put(metrics, refInner)
                # All datasets should exist
                for ref in (refBefore, refOuter, refInner):
                    metricsOut = datastore.get(ref, parameters=None)
                    self.assertEqual(metrics, metricsOut)
                raise TransactionTestError("This should roll back the transaction")
        # Dataset(s) inserted before the transaction should still exist
        metricsOut = datastore.get(refBefore, parameters=None)
        self.assertEqual(metrics, metricsOut)
        # But all datasets inserted during the (rolled back) transaction
        # should be gone
        with self.assertRaises(FileNotFoundError):
            datastore.get(refOuter)
        with self.assertRaises(FileNotFoundError):
            datastore.get(refInner)
738 def _prepareIngestTest(self) -> tuple[MetricsExample, DatasetRef]:
739 storageClass = self.storageClassFactory.getStorageClass("StructuredData")
740 dimensions = self.universe.conform(("visit", "physical_filter"))
741 metrics = makeExampleMetrics()
742 dataId = {"instrument": "dummy", "visit": 0, "physical_filter": "V", "band": "v", "day_obs": 20250101}
743 ref = self.makeDatasetRef("metric", dimensions, storageClass, dataId)
744 return metrics, ref
746 def runIngestTest(self, func: Callable[[MetricsExample, str, DatasetRef], None]) -> None:
747 metrics, ref = self._prepareIngestTest()
748 # The file will be deleted after the test.
749 # For symlink tests this leads to a situation where the datastore
750 # points to a file that does not exist. This will make os.path.exist
751 # return False but then the new symlink will fail with
752 # FileExistsError later in the code so the test still passes.
753 with _temp_yaml_file(metrics._asdict()) as path:
754 func(metrics, path, ref)
    def testIngestNoTransfer(self) -> None:
        """Test ingesting existing files with no transfer."""
        for mode in (None, "auto"):
            # Some datastores have auto but can't do in place transfer
            if mode == "auto" and "auto" in self.ingestTransferModes and not self.canIngestNoTransferAuto:
                continue

            with self.subTest(mode=mode):
                datastore = self.makeDatastore()

                # The inner functions bind ``mode`` and ``datastore`` as
                # default arguments so each iteration captures its own
                # values rather than the loop variables.
                def succeed(
                    obj: MetricsExample,
                    path: str,
                    ref: DatasetRef,
                    mode: str | None = mode,
                    datastore: Datastore = datastore,
                ) -> None:
                    """Ingest a file already in the datastore root."""
                    # first move it into the root, and adjust the path
                    # accordingly.
                    # In the case of a ChainedDatastore, we have multiple
                    # roots, all of which will accept the file, so we
                    # have to copy it into all the roots.
                    relative_path = None
                    for root in datastore.roots.values():
                        if root is not None:
                            copied_path = shutil.copy(path, root.ospath)
                            relative_path = os.path.relpath(copied_path, start=root.ospath)
                    assert relative_path is not None, (
                        "Running a FileDatastore test on a Datastore instance without any roots"
                    )
                    datastore.ingest(FileDataset(path=relative_path, refs=ref), transfer=mode)
                    self.assertEqual(obj, datastore.get(ref))

                def failInputDoesNotExist(
                    obj: MetricsExample,
                    path: str,
                    ref: DatasetRef,
                    mode: str | None = mode,
                    datastore: Datastore = datastore,
                ) -> None:
                    """Can't ingest files if we're given a bad path."""
                    with self.assertRaises(FileNotFoundError):
                        datastore.ingest(
                            FileDataset(path="this-file-does-not-exist.yaml", refs=ref), transfer=mode
                        )
                    self.assertFalse(datastore.exists(ref))

                def failOutsideRoot(
                    obj: MetricsExample,
                    path: str,
                    ref: DatasetRef,
                    mode: str | None = mode,
                    datastore: Datastore = datastore,
                ) -> None:
                    """Can't ingest files outside of datastore root unless
                    auto.
                    """
                    if mode == "auto":
                        datastore.ingest(FileDataset(path=os.path.abspath(path), refs=ref), transfer=mode)
                        self.assertTrue(datastore.exists(ref))
                    else:
                        with self.assertRaises(RuntimeError):
                            datastore.ingest(FileDataset(path=os.path.abspath(path), refs=ref), transfer=mode)
                        self.assertFalse(datastore.exists(ref))

                def failNotImplemented(
                    obj: MetricsExample,
                    path: str,
                    ref: DatasetRef,
                    mode: str | None = mode,
                    datastore: Datastore = datastore,
                ) -> None:
                    # Datastores that do not support this transfer mode
                    # are expected to raise NotImplementedError.
                    with self.assertRaises(NotImplementedError):
                        datastore.ingest(FileDataset(path=path, refs=ref), transfer=mode)

                if mode in self.ingestTransferModes:
                    self.runIngestTest(failOutsideRoot)
                    self.runIngestTest(failInputDoesNotExist)
                    self.runIngestTest(succeed)
                else:
                    self.runIngestTest(failNotImplemented)
    def testIngestTransfer(self) -> None:
        """Test ingesting existing files after transferring them."""
        for mode in ("copy", "move", "link", "hardlink", "symlink", "relsymlink", "auto"):
            with self.subTest(mode=mode):
                datastore = self.makeDatastore(mode)

                # The inner functions bind ``mode`` and ``datastore`` as
                # default arguments so each subtest uses its own values.
                def succeed(
                    obj: MetricsExample,
                    path: str,
                    ref: DatasetRef,
                    mode: str | None = mode,
                    datastore: Datastore = datastore,
                ) -> None:
                    """Ingest a file by transferring it to the template
                    location.
                    """
                    datastore.ingest(FileDataset(path=os.path.abspath(path), refs=ref), transfer=mode)
                    self.assertEqual(obj, datastore.get(ref))
                    # "move" is the only mode that removes the source file.
                    file_exists = os.path.exists(path)
                    if mode == "move":
                        self.assertFalse(file_exists)
                    else:
                        self.assertTrue(file_exists)

                def failInputDoesNotExist(
                    obj: MetricsExample,
                    path: str,
                    ref: DatasetRef,
                    mode: str | None = mode,
                    datastore: Datastore = datastore,
                ) -> None:
                    """Can't ingest files if we're given a bad path."""
                    with self.assertRaises(FileNotFoundError):
                        # Ensure the file does not look like it is in
                        # datastore for auto mode
                        datastore.ingest(
                            FileDataset(path="../this-file-does-not-exist.yaml", refs=ref), transfer=mode
                        )
                    self.assertFalse(datastore.exists(ref), f"Checking not in datastore using mode {mode}")

                def failNotImplemented(
                    obj: MetricsExample,
                    path: str,
                    ref: DatasetRef,
                    mode: str | None = mode,
                    datastore: Datastore = datastore,
                ) -> None:
                    # Unsupported transfer modes must raise.
                    with self.assertRaises(NotImplementedError):
                        datastore.ingest(FileDataset(path=os.path.abspath(path), refs=ref), transfer=mode)

                if mode in self.ingestTransferModes:
                    self.runIngestTest(failInputDoesNotExist)
                    self.runIngestTest(succeed)
                else:
                    self.runIngestTest(failNotImplemented)
895 def testIngestSymlinkOfSymlink(self) -> None:
896 """Special test for symlink to a symlink ingest"""
897 metrics, ref = self._prepareIngestTest()
898 # The aim of this test is to create a dataset on disk, then
899 # create a symlink to it and finally ingest the symlink such that
900 # the symlink in the datastore points to the original dataset.
901 for mode in ("symlink", "relsymlink"):
902 if mode not in self.ingestTransferModes:
903 continue
905 print(f"Trying mode {mode}")
906 with _temp_yaml_file(metrics._asdict()) as realpath:
907 with tempfile.TemporaryDirectory() as tmpdir:
908 sympath = os.path.join(tmpdir, "symlink.yaml")
909 os.symlink(os.path.realpath(realpath), sympath)
911 datastore = self.makeDatastore()
912 datastore.ingest(FileDataset(path=os.path.abspath(sympath), refs=ref), transfer=mode)
914 uri = datastore.getURI(ref)
915 self.assertTrue(uri.isLocal, f"Check {uri.scheme}")
916 self.assertTrue(os.path.islink(uri.ospath), f"Check {uri} is a symlink")
918 linkTarget = os.readlink(uri.ospath)
919 if mode == "relsymlink":
920 self.assertFalse(os.path.isabs(linkTarget))
921 else:
922 self.assertTrue(os.path.samefile(linkTarget, realpath))
924 # Check that we can get the dataset back regardless of mode
925 metric2 = datastore.get(ref)
926 self.assertEqual(metric2, metrics)
928 # Cleanup the file for next time round loop
929 # since it will get the same file name in store
930 datastore.remove(ref)
932 def _populate_export_datastore(self, name: str) -> tuple[Datastore, list[DatasetRef]]:
933 datastore = self.makeDatastore(name)
935 # For now only the FileDatastore can be used for this test.
936 # ChainedDatastore that only includes InMemoryDatastores have to be
937 # skipped as well.
938 for name in datastore.names:
939 if not name.startswith("InMemoryDatastore"):
940 break
941 else:
942 raise unittest.SkipTest("in-memory datastore does not support record export/import")
944 metrics = makeExampleMetrics()
945 dimensions = self.universe.conform(("visit", "physical_filter"))
946 sc = self.storageClassFactory.getStorageClass("StructuredData")
948 refs = []
949 for visit in (2048, 2049, 2050):
950 dataId = {
951 "instrument": "dummy",
952 "visit": visit,
953 "physical_filter": "Uprime",
954 "band": "u",
955 "day_obs": 20250101,
956 }
957 ref = self.makeDatasetRef("metric", dimensions, sc, dataId)
958 datastore.put(metrics, ref)
959 refs.append(ref)
960 return datastore, refs
    def testExportImportRecords(self) -> None:
        """Test for export_records and import_records methods."""
        datastore, refs = self._populate_export_datastore("test_datastore")
        # Export the full set of refs and then a subset.
        for exported_refs in (refs, refs[1:]):
            n_refs = len(exported_refs)
            records = datastore.export_records(exported_refs)
            self.assertGreater(len(records), 0)
            self.assertTrue(set(records.keys()) <= set(datastore.names))
            # In a ChainedDatastore each FileDatastore will have a complete set
            for datastore_name in records:
                record_data = records[datastore_name]
                self.assertEqual(len(record_data.records), n_refs)

                # Check that subsetting works, include non-existing dataset ID.
                dataset_ids = {exported_refs[0].id, uuid.uuid4()}
                subset = record_data.subset(dataset_ids)
                assert subset is not None
                self.assertEqual(len(subset.records), 1)
                # Subsetting to only unknown IDs yields None.
                subset = record_data.subset({uuid.uuid4()})
                self.assertIsNone(subset)

        # Use the same datastore name to import relative path.
        datastore2 = self.makeDatastore("test_datastore")

        # Import records for all but the first ref into the new datastore.
        records = datastore.export_records(refs[1:])
        datastore2.import_records(records)

        # refs[0] was deliberately excluded from the import so it must not
        # be retrievable; the other two must be.
        with self.assertRaises(FileNotFoundError):
            data = datastore2.get(refs[0])
        data = datastore2.get(refs[1])
        self.assertIsNotNone(data)
        data = datastore2.get(refs[2])
        self.assertIsNotNone(data)
    def testExportPredictedRecords(self) -> None:
        """Test export of predicted records for a ref that was never stored."""
        if self.isEphemeral:
            raise unittest.SkipTest("in-memory datastore does not support record export/import")
        sc = self.storageClassFactory.getStorageClass("ThingOne")
        dimensions = self.universe.conform(("visit", "physical_filter"))
        dataId = {
            "instrument": "dummy",
            "visit": 52,
            "physical_filter": "V",
            "band": "v",
            "day_obs": 20250101,
        }
        # Note: this ref is never put() -- records are predicted.
        ref = self.makeDatasetRef("metric", dimensions, sc, dataId)

        datastore = self.makeDatastore("test_datastore")
        # In-memory child datastores cannot produce predicted records.
        names = {n for n in datastore.names if not n.startswith("InMemory")}
        records = datastore.export_predicted_records([ref])

        # Expect predicted records from all datastores.
        self.assertEqual(set(records.keys()), names)

        for record_data in records.values():
            self.assertEqual(len(record_data.records), 1)
    def testExport(self) -> None:
        """Test exporting refs, including transfer-mode errors and a ref
        unknown to the datastore.
        """
        datastore, refs = self._populate_export_datastore("test_datastore")
        datasets = list(datastore.export(refs))
        self.assertEqual(len(datasets), 3)

        for transfer in (None, "auto"):
            # Both will default to None
            datasets = list(datastore.export(refs, transfer=transfer))
            self.assertEqual(len(datasets), 3)

        # A "copy" transfer without a directory is rejected.
        with self.assertRaises(TypeError):
            list(datastore.export(refs, transfer="copy"))

        # "move" is rejected for export even when a directory is given.
        with self.assertRaises(TypeError):
            list(datastore.export(refs, directory="exportDir", transfer="move"))

        # Create a new ref that is not known to the datastore and try to
        # export it.
        sc = self.storageClassFactory.getStorageClass("ThingOne")
        dimensions = self.universe.conform(("visit", "physical_filter"))
        dataId = {
            "instrument": "dummy",
            "visit": 52,
            "physical_filter": "V",
            "band": "v",
            "day_obs": 20250101,
        }
        ref = self.makeDatasetRef("metric", dimensions, sc, dataId)
        with self.assertRaises(FileNotFoundError):
            list(datastore.export(refs + [ref], transfer=None))
1052 def test_pydantic_dict_storage_class_conversions(self) -> None:
1053 """Test converting a dataset stored as a pydantic model into a dict on
1054 read.
1055 """
1056 datastore = self.makeDatastore()
1057 store_as_model = self.makeDatasetRef(
1058 "store_as_model",
1059 dimensions=self.universe.empty,
1060 storageClass="DictConvertibleModel",
1061 dataId=DataCoordinate.make_empty(self.universe),
1062 )
1063 content = {"a": "one", "b": "two"}
1064 model = DictConvertibleModel.from_dict(content, extra="original content")
1065 datastore.put(model, store_as_model)
1066 retrieved_model = datastore.get(store_as_model)
1067 self.assertEqual(retrieved_model, model)
1068 loaded = datastore.get(store_as_model.overrideStorageClass("NativeDictForConvertibleModel"))
1069 self.assertEqual(type(loaded), dict)
1070 self.assertEqual(loaded, content)
1072 def test_simple_class_put_get(self) -> None:
1073 """Test that we can put and get a simple class with dict()
1074 constructor.
1075 """
1076 datastore = self.makeDatastore()
1077 data = MetricsExample(summary={"a": 1}, data=[1, 2, 3], output={"b": 2})
1078 self._assert_different_puts(datastore, "MetricsExample", data)
1080 def test_dataclass_put_get(self) -> None:
1081 """Test that we can put and get a simple dataclass."""
1082 datastore = self.makeDatastore()
1083 data = MetricsExampleDataclass(summary={"a": 1}, data=[1, 2, 3], output={"b": 2})
1084 self._assert_different_puts(datastore, "MetricsExampleDataclass", data)
1086 def test_pydantic_put_get(self) -> None:
1087 """Test that we can put and get a simple Pydantic model."""
1088 datastore = self.makeDatastore()
1089 data = MetricsExampleModel(summary={"a": 1}, data=[1, 2, 3], output={"b": 2})
1090 self._assert_different_puts(datastore, "MetricsExampleModel", data)
1092 def test_tuple_put_get(self) -> None:
1093 """Test that we can put and get a tuple."""
1094 datastore = self.makeDatastore()
1095 data = ("a", "b", 1)
1096 self._assert_different_puts(datastore, "TupleExample", data)
1098 def _assert_different_puts(self, datastore: Datastore, storageClass_root: str, data: Any) -> None:
1099 refs = {
1100 x: self.makeDatasetRef(
1101 f"stora_as_{x}",
1102 dimensions=self.universe.empty,
1103 storageClass=f"{storageClass_root}{x}",
1104 dataId=DataCoordinate.make_empty(self.universe),
1105 )
1106 for x in ["A", "B"]
1107 }
1109 for ref in refs.values():
1110 datastore.put(data, ref)
1112 self.assertEqual(datastore.get(refs["A"]), datastore.get(refs["B"]))
class PosixDatastoreTestCase(DatastoreTests, unittest.TestCase):
    """PosixDatastore specialization"""

    # Configuration and capability flags consumed by the shared
    # DatastoreTests base class.
    configFile = os.path.join(TESTDIR, "config/basic/butler.yaml")
    uriScheme = "file"
    canIngestNoTransferAuto = True
    ingestTransferModes = (None, "copy", "move", "link", "hardlink", "symlink", "relsymlink", "auto")
    isEphemeral = False
    rootKeys = ("root",)
    validationCanFail = True

    def setUp(self) -> None:
        # The call to os.path.realpath is necessary because Mac temporary files
        # can end up in either /private/var/folders or /var/folders, which
        # refer to the same location but don't appear to.
        # This matters for "relsymlink" transfer mode, because it needs to be
        # able to read the file through a relative symlink, but some of the
        # intermediate directories are not traversable if you try to get from a
        # tempfile in /var/folders to one in /private/var/folders via a
        # relative path.
        self.root = os.path.realpath(self.enterContext(tempfile.TemporaryDirectory()))
        super().setUp()

    def testAtomicWrite(self) -> None:
        """Test that we write to a temporary and then rename"""
        datastore = self.makeDatastore()
        storageClass = self.storageClassFactory.getStorageClass("StructuredData")
        dimensions = self.universe.conform(("visit", "physical_filter"))
        metrics = makeExampleMetrics()

        dataId = {"instrument": "dummy", "visit": 0, "physical_filter": "V", "band": "v", "day_obs": 20250101}
        ref = self.makeDatasetRef("metric", dimensions, storageClass, dataId)

        # Capture lsst.resources debug logs to inspect the transfer mode
        # used for the final placement of the file.
        with self.assertLogs("lsst.resources", "DEBUG") as cm:
            datastore.put(metrics, ref)
        move_logs = [ll for ll in cm.output if "transfer=" in ll]
        self.assertIn("transfer=move", move_logs[0])

        # And the transfer should be file to file.
        self.assertEqual(move_logs[0].count("file://"), 2)

    def testCanNotDeterminePutFormatterLocation(self) -> None:
        """Verify that the expected exception is raised if the FileDatastore
        can not determine the put formatter location.
        """
        _ = makeExampleMetrics()
        datastore = self.makeDatastore()

        # Create multiple storage classes for testing different formulations
        storageClass = self.storageClassFactory.getStorageClass("StructuredData")

        sccomp = StorageClass("Dummy")
        compositeStorageClass = StorageClass(
            "StructuredComposite", components={"dummy": sccomp, "dummy2": sccomp}
        )

        dimensions = self.universe.conform(("visit", "physical_filter"))
        dataId = {
            "instrument": "dummy",
            "visit": 52,
            "physical_filter": "V",
            "band": "v",
            "day_obs": 20250101,
        }

        ref = self.makeDatasetRef("metric", dimensions, storageClass, dataId)
        compRef = self.makeDatasetRef("metric", dimensions, compositeStorageClass, dataId)

        def raiser(ref: DatasetRef) -> None:
            raise DatasetTypeNotSupportedError()

        # Patch the formatter-location lookup so it always raises.
        with unittest.mock.patch.object(
            lsst.daf.butler.datastores.fileDatastore.FileDatastore,
            "_determine_put_formatter_location",
            side_effect=raiser,
        ):
            # verify the non-composite ref execution path:
            with self.assertRaises(DatasetTypeNotSupportedError):
                datastore.getURIs(ref, predict=True)

            # verify the composite-ref execution path:
            with self.assertRaises(DatasetTypeNotSupportedError):
                datastore.getURIs(compRef, predict=True)

    def test_roots(self) -> None:
        # Every named datastore must report a root, and existing roots
        # must point at real locations.
        datastore = self.makeDatastore()

        self.assertEqual(set(datastore.names), set(datastore.roots.keys()))
        for root in datastore.roots.values():
            if root is not None:
                self.assertTrue(root.exists())

    def test_prepare_get_for_external_client(self) -> None:
        datastore = self.makeDatastore()
        storageClass = self.storageClassFactory.getStorageClass("StructuredData")
        dimensions = self.universe.conform(("visit", "physical_filter"))
        dataId = {"instrument": "dummy", "visit": 52, "physical_filter": "V", "band": "v"}
        ref = self.makeDatasetRef("metric", dimensions, storageClass, dataId)
        # Most of the coverage for this function is in test_server.py,
        # because it requires a file backend that supports URL signing.
        self.assertIsNone(datastore.prepare_get_for_external_client(ref))
class PosixDatastoreNoChecksumsTestCase(PosixDatastoreTestCase):
    """Posix datastore tests but with checksums disabled."""

    configFile = os.path.join(TESTDIR, "config/basic/posixDatastoreNoChecksums.yaml")

    def testChecksum(self) -> None:
        """Ensure that checksums have not been calculated."""
        datastore = self.makeDatastore()
        storageClass = self.storageClassFactory.getStorageClass("StructuredData")
        dimensions = self.universe.conform(("visit", "physical_filter"))
        metrics = makeExampleMetrics()

        dataId = {"instrument": "dummy", "visit": 0, "physical_filter": "V", "band": "v", "day_obs": 20250101}
        ref = self.makeDatasetRef("metric", dimensions, storageClass, dataId)

        # Configuration should have disabled checksum calculation
        datastore.put(metrics, ref)
        infos = datastore.getStoredItemsInfo(ref)
        self.assertIsNone(infos[0].checksum)

        # Remove put back but with checksums enabled explicitly
        datastore.remove(ref)
        datastore.useChecksum = True
        datastore.put(metrics, ref)

        infos = datastore.getStoredItemsInfo(ref)
        self.assertIsNotNone(infos[0].checksum)

    def test_repeat_ingest(self) -> None:
        """Test that repeatedly ingesting the same file in direct mode
        is allowed.

        Test can only run with FileDatastore since that is the only one
        supporting "direct" ingest.
        """
        metrics, v4ref = self._prepareIngestTest()
        datastore = self.makeDatastore()
        # Build a deterministic (UUIDv5) ref for the same dataset so that
        # repeated direct ingest produces the same dataset ID.
        v5ref = DatasetRef(
            v4ref.datasetType, v4ref.dataId, v4ref.run, id_generation_mode=DatasetIdGenEnum.DATAID_TYPE_RUN
        )

        with _temp_yaml_file(metrics._asdict()) as path:
            datastore.ingest(FileDataset(path=path, refs=v4ref), transfer="direct")

            # This will fail because the ref is using UUIDv4.
            with self.assertRaises(RuntimeError):
                datastore.ingest(FileDataset(path=path, refs=v4ref), transfer="direct")

            # UUIDv5 can be repeatedly ingested in direct mode.
            datastore.ingest(FileDataset(path=path, refs=v5ref), transfer="direct")
            datastore.ingest(FileDataset(path=path, refs=v5ref), transfer="direct")

            # Re-ingesting with a non-direct transfer mode is still an error.
            with self.assertRaises(RuntimeError):
                datastore.ingest(FileDataset(path=path, refs=v5ref), transfer="copy")
class TrashDatastoreTestCase(PosixDatastoreTestCase):
    """Restrict trash test to FileDatastore."""

    configFile = os.path.join(TESTDIR, "config/basic/butler.yaml")

    def testTrash(self) -> None:
        # prepDeleteTest returns the datastore followed by stored refs.
        datastore, *refs = self.prepDeleteTest(n_refs=10)

        # Trash one of them.
        ref = refs.pop()
        uri = datastore.getURI(ref)
        datastore.trash(ref)
        self.assertTrue(uri.exists(), uri)  # Not deleted yet
        datastore.emptyTrash()
        self.assertFalse(uri.exists(), uri)

        # Trash it again should be fine.
        datastore.trash(ref)

        # Trash multiple items at once.
        subset = [refs.pop(), refs.pop()]
        datastore.trash(subset)
        datastore.emptyTrash()

        # Remove a record and trash should do nothing.
        # This is execution butler scenario.
        ref = refs.pop()
        uri = datastore.getURI(ref)
        datastore._table.delete(["dataset_id"], {"dataset_id": ref.id})
        self.assertTrue(uri.exists())
        datastore.trash(ref)
        datastore.emptyTrash()
        self.assertTrue(uri.exists())

        # Switch on trust and it should delete the file.
        datastore.trustGetRequest = True
        datastore.trash([ref])
        self.assertFalse(uri.exists())

        # Remove multiples at once in trust mode.
        subset = [refs.pop() for i in range(3)]
        datastore.trash(subset)
        datastore.trash(refs.pop())  # Check that a single ref can trash

    def test_empty_trash(self) -> None:
        """Test parameters and return value for empty trash."""
        datastore, *refs = self.prepDeleteTest(n_refs=10)

        # Trash one of them.
        ref = refs.pop()
        uri = datastore.getURI(ref)
        datastore.trash(ref)
        self.assertTrue(uri.exists(), uri)  # Not deleted yet

        # Empty trash but with a list of refs that does not include the
        # one in the trash table.
        removed = datastore.emptyTrash(refs=refs)
        self.assertEqual(len(removed), 0)
        self.assertTrue(uri.exists(), uri)

        # Empty the entire trash but in dry_run mode.
        # The URI is reported but nothing is deleted.
        removed = datastore.emptyTrash(dry_run=True)
        self.assertEqual(len(removed), 1)
        self.assertEqual(removed.pop(), uri)
        self.assertTrue(uri.exists(), uri)

        # Empty the trash specifying the actual ref.
        removed = datastore.emptyTrash(refs=[ref])
        self.assertEqual(len(removed), 1)
        self.assertEqual(removed.pop(), uri)
        self.assertFalse(uri.exists(), uri)

        # Trash everything and empty.
        datastore.trash(refs)
        removed = datastore.emptyTrash(dry_run=True)
        for u in removed:
            self.assertTrue(u.exists())
        removed = datastore.emptyTrash()
        for u in removed:
            self.assertFalse(u.exists())
class CleanupPosixDatastoreTestCase(DatastoreTestsBase, unittest.TestCase):
    """Test datastore cleans up on failure."""

    configFile = os.path.join(TESTDIR, "config/basic/butler.yaml")

    def setUp(self) -> None:
        # Override the working directory before calling the base class
        self.root = tempfile.mkdtemp()
        super().setUp()

    def testCleanup(self) -> None:
        """Test that a failed formatter write does cleanup a partial file."""
        metrics = makeExampleMetrics()
        datastore = self.makeDatastore()

        storageClass = self.storageClassFactory.getStorageClass("StructuredData")

        dimensions = self.universe.conform(("visit", "physical_filter"))
        dataId = {
            "instrument": "dummy",
            "visit": 52,
            "physical_filter": "V",
            "band": "v",
            "day_obs": 20250101,
        }

        ref = self.makeDatasetRef("metric", dimensions, storageClass, dataId)

        # Determine where the file will end up (we assume Formatters use
        # the same file extension)
        expectedUri = datastore.getURI(ref, predict=True)
        self.assertEqual(expectedUri.fragment, "predicted")

        self.assertEqual(expectedUri.getExtension(), ".yaml", f"Is there a file extension in {expectedUri}")

        # Try formatter that fails and formatter that fails and leaves
        # a file behind
        for formatter in (BadWriteFormatter, BadNoWriteFormatter):
            with self.subTest(formatter=str(formatter)):
                # Monkey patch the formatter
                datastore.formatterFactory.registerFormatter(ref.datasetType, formatter, overwrite=True)

                # Try to put the dataset, it should fail
                with self.assertRaises(RuntimeError):
                    datastore.put(metrics, ref)

                # Check that there is no file on disk
                self.assertFalse(expectedUri.exists(), f"Check for existence of {expectedUri}")

                # Check that there is a directory
                dir = expectedUri.dirname()
                self.assertTrue(dir.exists(), f"Check for existence of directory {dir}")

        # Force YamlFormatter and check that this time a file is written
        datastore.formatterFactory.registerFormatter(ref.datasetType, YamlFormatter, overwrite=True)
        datastore.put(metrics, ref)
        self.assertTrue(expectedUri.exists(), f"Check for existence of {expectedUri}")
        datastore.remove(ref)
        self.assertFalse(expectedUri.exists(), f"Check for existence of now removed {expectedUri}")
class InMemoryDatastoreTestCase(DatastoreTests, unittest.TestCase):
    """InMemoryDatastore specialization of the shared datastore tests."""

    # In-memory stores use the "mem" scheme, cannot ingest files, and are
    # ephemeral with no on-disk root.
    configFile = os.path.join(TESTDIR, "config/basic/inMemoryDatastore.yaml")
    uriScheme = "mem"
    hasUnsupportedPut = False
    ingestTransferModes = ()
    isEphemeral = True
    rootKeys = None
    validationCanFail = False
class ChainedDatastoreTestCase(PosixDatastoreTestCase):
    """ChainedDatastore specialization using a POSIXDatastore"""

    # Two file-backed children, hence two root keys in the configuration.
    configFile = os.path.join(TESTDIR, "config/basic/chainedDatastore.yaml")
    hasUnsupportedPut = False
    canIngestNoTransferAuto = False
    ingestTransferModes = (None, "copy", "move", "hardlink", "symlink", "relsymlink", "link", "auto")
    isEphemeral = False
    rootKeys = (".datastores.1.root", ".datastores.2.root")
    validationCanFail = True
class ChainedDatastoreMemoryTestCase(InMemoryDatastoreTestCase):
    """ChainedDatastore specialization using all InMemoryDatastore"""

    # All children are in-memory, so validation cannot fail and the
    # chain is ephemeral.
    configFile = os.path.join(TESTDIR, "config/basic/chainedDatastore2.yaml")
    validationCanFail = False
    isEphemeral = True
class DatastoreConstraintsTests(DatastoreTestsBase):
    """Basic tests of constraints model of Datastores."""

    def testConstraints(self) -> None:
        """Test constraints model. Assumes that each test class has the
        same constraints.
        """
        metrics = makeExampleMetrics()
        datastore = self.makeDatastore()

        sc1 = self.storageClassFactory.getStorageClass("StructuredData")
        sc2 = self.storageClassFactory.getStorageClass("StructuredDataJson")
        dimensions = self.universe.conform(("visit", "physical_filter", "instrument"))
        dataId = {
            "visit": 52,
            "physical_filter": "V",
            "band": "v",
            "instrument": "DummyCamComp",
            "day_obs": 20250101,
        }

        # Write empty file suitable for ingest check (JSON and YAML variants).
        # Register cleanups so the temporary files are closed (and removed)
        # deterministically at test end rather than at garbage collection.
        testfile_y = tempfile.NamedTemporaryFile(suffix=".yaml")
        self.addCleanup(testfile_y.close)
        testfile_j = tempfile.NamedTemporaryFile(suffix=".json")
        self.addCleanup(testfile_j.close)
        for datasetTypeName, sc, accepted in (
            ("metric", sc1, True),
            ("metric5", sc1, False),
            ("metric33", sc1, True),
            ("metric5", sc2, True),
        ):
            # Choose different temp file depending on StorageClass
            testfile = testfile_j if sc.name.endswith("Json") else testfile_y

            with self.subTest(datasetTypeName=datasetTypeName, storageClass=sc.name, file=testfile.name):
                ref = self.makeDatasetRef(datasetTypeName, dimensions, sc, dataId)
                if accepted:
                    datastore.put(metrics, ref)
                    self.assertTrue(datastore.exists(ref))
                    datastore.remove(ref)

                    # Try ingest
                    if self.canIngest:
                        datastore.ingest(FileDataset(testfile.name, [ref]), transfer="link")
                        self.assertTrue(datastore.exists(ref))
                        datastore.remove(ref)
                else:
                    # Constrained-out dataset types must be rejected by put.
                    with self.assertRaises(DatasetTypeNotSupportedError):
                        datastore.put(metrics, ref)
                    self.assertFalse(datastore.exists(ref))

                    # Again with ingest
                    if self.canIngest:
                        with self.assertRaises(DatasetTypeNotSupportedError):
                            datastore.ingest(FileDataset(testfile.name, [ref]), transfer="link")
                        self.assertFalse(datastore.exists(ref))
class PosixDatastoreConstraintsTestCase(DatastoreConstraintsTests, unittest.TestCase):
    """PosixDatastore specialization"""

    configFile = os.path.join(TESTDIR, "config/basic/posixDatastoreP.yaml")
    # File-backed datastore supports the ingest path of the constraints test.
    canIngest = True

    def setUp(self) -> None:
        # Override the working directory before calling the base class
        self.root = tempfile.mkdtemp()
        super().setUp()
class InMemoryDatastoreConstraintsTestCase(DatastoreConstraintsTests, unittest.TestCase):
    """InMemoryDatastore specialization."""

    configFile = os.path.join(TESTDIR, "config/basic/inMemoryDatastoreP.yaml")
    # In-memory datastores cannot ingest files.
    canIngest = False
class ChainedDatastoreConstraintsNativeTestCase(PosixDatastoreConstraintsTestCase):
    """ChainedDatastore specialization using a POSIXDatastore and constraints
    at the ChainedDatastore.
    """

    configFile = os.path.join(TESTDIR, "config/basic/chainedDatastorePa.yaml")
class ChainedDatastoreConstraintsTestCase(PosixDatastoreConstraintsTestCase):
    """ChainedDatastore specialization using a POSIXDatastore."""

    # Constraints are defined on the child datastores in this configuration.
    configFile = os.path.join(TESTDIR, "config/basic/chainedDatastoreP.yaml")
class ChainedDatastoreMemoryConstraintsTestCase(InMemoryDatastoreConstraintsTestCase):
    """ChainedDatastore specialization using all InMemoryDatastore."""

    configFile = os.path.join(TESTDIR, "config/basic/chainedDatastore2P.yaml")
    # All children are in-memory, so no file ingest is possible.
    canIngest = False
class ChainedDatastorePerStoreConstraintsTests(DatastoreTestsBase, unittest.TestCase):
    """Test that a chained datastore can control constraints per-datastore
    even if child datastore would accept.
    """

    configFile = os.path.join(TESTDIR, "config/basic/chainedDatastorePb.yaml")

    def setUp(self) -> None:
        # Override the working directory before calling the base class
        self.root = tempfile.mkdtemp()
        super().setUp()

    def testConstraints(self) -> None:
        """Test chained datastore constraints model."""
        metrics = makeExampleMetrics()
        datastore = self.makeDatastore()

        sc1 = self.storageClassFactory.getStorageClass("StructuredData")
        sc2 = self.storageClassFactory.getStorageClass("StructuredDataJson")
        dimensions = self.universe.conform(("visit", "physical_filter", "instrument"))
        dataId1 = {
            "visit": 52,
            "physical_filter": "V",
            "band": "v",
            "instrument": "DummyCamComp",
            "day_obs": 20250101,
        }
        dataId2 = {"visit": 52, "physical_filter": "V", "band": "v", "instrument": "HSC", "day_obs": 20250101}

        # Write empty file suitable for ingest check (JSON and YAML variants).
        # Register cleanups so the temporary files are closed (and removed)
        # deterministically at test end rather than at garbage collection.
        testfile_y = tempfile.NamedTemporaryFile(suffix=".yaml")
        self.addCleanup(testfile_y.close)
        testfile_j = tempfile.NamedTemporaryFile(suffix=".json")
        self.addCleanup(testfile_j.close)

        # ``accept`` gives the expected per-child presence of the dataset;
        # ``ingest`` says whether the chain accepts file ingest for it.
        for typeName, dataId, sc, accept, ingest in (
            ("metric", dataId1, sc1, (False, True, False), True),
            ("metric5", dataId1, sc1, (False, False, False), False),
            ("metric5", dataId2, sc1, (True, False, False), False),
            ("metric33", dataId2, sc2, (True, True, False), True),
            ("metric5", dataId1, sc2, (False, True, False), True),
        ):
            # Choose different temp file depending on StorageClass
            testfile = testfile_j if sc.name.endswith("Json") else testfile_y

            with self.subTest(datasetTypeName=typeName, dataId=dataId, sc=sc.name):
                ref = self.makeDatasetRef(typeName, dimensions, sc, dataId)
                if any(accept):
                    datastore.put(metrics, ref)
                    self.assertTrue(datastore.exists(ref))

                    # Check each datastore inside the chained datastore
                    for childDatastore, expected in zip(datastore.datastores, accept, strict=True):
                        self.assertEqual(
                            childDatastore.exists(ref),
                            expected,
                            f"Testing presence of {ref} in datastore {childDatastore.name}",
                        )

                    datastore.remove(ref)

                    # Check that ingest works
                    if ingest:
                        datastore.ingest(FileDataset(testfile.name, [ref]), transfer="link")
                        self.assertTrue(datastore.exists(ref))

                        # Check each datastore inside the chained datastore
                        for childDatastore, expected in zip(datastore.datastores, accept, strict=True):
                            # Ephemeral datastores means InMemory at the moment
                            # and that does not accept ingest of files.
                            if childDatastore.isEphemeral:
                                expected = False
                            self.assertEqual(
                                childDatastore.exists(ref),
                                expected,
                                f"Testing presence of ingested {ref} in datastore {childDatastore.name}",
                            )

                        datastore.remove(ref)
                    else:
                        with self.assertRaises(DatasetTypeNotSupportedError):
                            datastore.ingest(FileDataset(testfile.name, [ref]), transfer="link")

                else:
                    with self.assertRaises(DatasetTypeNotSupportedError):
                        datastore.put(metrics, ref)
                    self.assertFalse(datastore.exists(ref))

                    # Again with ingest
                    with self.assertRaises(DatasetTypeNotSupportedError):
                        datastore.ingest(FileDataset(testfile.name, [ref]), transfer="link")
                    self.assertFalse(datastore.exists(ref))
1638@unittest.mock.patch.dict(os.environ, {}, clear=True)
1639class DatastoreCacheTestCase(DatasetTestHelper, unittest.TestCase):
1640 """Tests for datastore caching infrastructure."""
1642 @classmethod
1643 def setUpClass(cls) -> None:
1644 cls.storageClassFactory = StorageClassFactory()
1645 cls.universe = DimensionUniverse()
1647 # Ensure that we load the test storage class definitions.
1648 scConfigFile = os.path.join(TESTDIR, "config/basic/storageClasses.yaml")
1649 cls.storageClassFactory.addFromConfig(scConfigFile)
    def setUp(self) -> None:
        """Create per-test fixtures: a cache root directory, simple refs
        with backing files, and composite refs with per-component files.
        """
        self.id = 0

        # Create a root that we can use for caching tests.
        self.root = tempfile.mkdtemp()

        # Create some test dataset refs and associated test files
        sc = self.storageClassFactory.getStorageClass("StructuredDataDict")
        dimensions = self.universe.conform(("visit", "physical_filter"))
        dataId = {
            "instrument": "dummy",
            "visit": 52,
            "physical_filter": "V",
            "band": "v",
            "day_obs": 20250101,
        }

        # Create list of refs and list of temporary files
        n_datasets = 10
        self.refs = [self.makeDatasetRef(f"metric{n}", dimensions, sc, dataId) for n in range(n_datasets)]

        root_uri = ResourcePath(self.root, forceDirectory=True)
        self.files = [root_uri.join(f"file{n}.txt") for n in range(n_datasets)]

        # Create test files.
        for uri in self.files:
            uri.write(b"0123456789")

        # Create some composite refs with component files.
        sc = self.storageClassFactory.getStorageClass("StructuredData")
        self.composite_refs = [self.makeDatasetRef(f"composite{n}", dimensions, sc, dataId) for n in range(3)]
        # comp_files[i] / comp_refs[i] are parallel per-component lists for
        # composite_refs[i].
        self.comp_files = []
        self.comp_refs = []
        for n, ref in enumerate(self.composite_refs):
            component_refs = []
            component_files = []
            for component in sc.components:
                component_ref = ref.makeComponentRef(component)
                file = root_uri.join(f"composite_file-{n}-{component}.txt")
                component_refs.append(component_ref)
                component_files.append(file)
                file.write(b"9876543210")

            self.comp_files.append(component_files)
            self.comp_refs.append(component_refs)
1697 def tearDown(self) -> None:
1698 if self.root is not None and os.path.exists(self.root):
1699 shutil.rmtree(self.root, ignore_errors=True)
1701 def _make_cache_manager(self, config_str: str) -> DatastoreCacheManager:
1702 config = Config.fromYaml(config_str)
1703 return DatastoreCacheManager(DatastoreCacheManagerConfig(config), universe=self.universe)
    def testNoCacheDir(self) -> None:
        """Test cache behavior when no cache root is configured."""
        config_str = """
cached:
  root: null
  cacheable:
    metric0: true
  """
        cache_manager = self._make_cache_manager(config_str)

        # Look inside to check we don't have a cache directory
        self.assertIsNone(cache_manager._cache_directory)

        self.assertCache(cache_manager)

        # Test that the cache directory is marked temporary
        self.assertTrue(cache_manager.cache_directory.isTemporary)
    def testNoCacheDirReversed(self) -> None:
        """Use default caching status of true and set metric1 to false."""
        config_str = """
cached:
  root: null
  default: true
  cacheable:
    metric1: false
  """
        cache_manager = self._make_cache_manager(config_str)

        # Same expectations as testNoCacheDir: metric0 cached, metric1 not.
        self.assertCache(cache_manager)
    def testEnvvarCacheDir(self) -> None:
        """Test the precedence of the cache-directory environment variables
        over configured values.
        """
        config_str = f"""
cached:
  root: '{self.root}'
  cacheable:
    metric0: true
  """

        root = ResourcePath(self.root, forceDirectory=True)
        env_dir = root.join("somewhere", forceDirectory=True)
        elsewhere = root.join("elsewhere", forceDirectory=True)

        # Environment variable should override the config value.
        with unittest.mock.patch.dict(os.environ, {"DAF_BUTLER_CACHE_DIRECTORY": env_dir.ospath}):
            cache_manager = self._make_cache_manager(config_str)
            self.assertEqual(cache_manager.cache_directory, env_dir)

        # This environment variable should not override the config value.
        with unittest.mock.patch.dict(os.environ, {"DAF_BUTLER_CACHE_DIRECTORY_IF_UNSET": env_dir.ospath}):
            cache_manager = self._make_cache_manager(config_str)
            self.assertEqual(cache_manager.cache_directory, root)

        # No default setting.
        config_str = """
cached:
  root: null
  default: true
  cacheable:
    metric1: false
  """
        cache_manager = self._make_cache_manager(config_str)

        # This environment variable should override the config value.
        with unittest.mock.patch.dict(os.environ, {"DAF_BUTLER_CACHE_DIRECTORY_IF_UNSET": env_dir.ospath}):
            cache_manager = self._make_cache_manager(config_str)
            self.assertEqual(cache_manager.cache_directory, env_dir)

        # If both environment variables are set the main (not IF_UNSET)
        # variable should win.
        with unittest.mock.patch.dict(
            os.environ,
            {
                "DAF_BUTLER_CACHE_DIRECTORY": env_dir.ospath,
                "DAF_BUTLER_CACHE_DIRECTORY_IF_UNSET": elsewhere.ospath,
            },
        ):
            cache_manager = self._make_cache_manager(config_str)
            self.assertEqual(cache_manager.cache_directory, env_dir)

        # Use the API to set the environment variable, making sure that the
        # variable is reset on exit.
        with unittest.mock.patch.dict(
            os.environ,
            {"DAF_BUTLER_CACHE_DIRECTORY_IF_UNSET": ""},
        ):
            defined, cache_dir = DatastoreCacheManager.set_fallback_cache_directory_if_unset()
            self.assertTrue(defined)
            cache_manager = self._make_cache_manager(config_str)
            self.assertEqual(cache_manager.cache_directory, ResourcePath(cache_dir, forceDirectory=True))

        # Now create the cache manager ahead of time and set the fallback
        # later.
        cache_manager = self._make_cache_manager(config_str)
        self.assertIsNone(cache_manager._cache_directory)
        with unittest.mock.patch.dict(
            os.environ,
            {"DAF_BUTLER_CACHE_DIRECTORY_IF_UNSET": ""},
        ):
            defined, cache_dir = DatastoreCacheManager.set_fallback_cache_directory_if_unset()
            self.assertTrue(defined)
            self.assertEqual(cache_manager.cache_directory, ResourcePath(cache_dir, forceDirectory=True))
1807 def testExplicitCacheDir(self) -> None:
1808 config_str = f"""
1809cached:
1810 root: '{self.root}'
1811 cacheable:
1812 metric0: true
1813 """
1814 cache_manager = self._make_cache_manager(config_str)
1816 # Look inside to check we do have a cache directory.
1817 self.assertEqual(cache_manager.cache_directory, ResourcePath(self.root, forceDirectory=True))
1819 self.assertCache(cache_manager)
1821 # Test that the cache directory is not marked temporary
1822 self.assertFalse(cache_manager.cache_directory.isTemporary)
1824 def testUnexpectedFilesInCacheDir(self) -> None:
1825 """Test for regression of a bug where extraneous files in a cache
1826 directory would cause all cache lookups to raise an exception.
1827 """
1828 config_str = f"""
1829cached:
1830 root: '{self.root}'
1831 cacheable:
1832 metric0: true
1833 """
1835 for filename in ["unexpected.txt", "unexpected", "un_expected", "un_expected.txt"]:
1836 unexpected_file = os.path.join(self.root, filename)
1837 with open(unexpected_file, "w") as fh:
1838 fh.write("test")
1840 cache_manager = self._make_cache_manager(config_str)
1841 cache_manager.scan_cache()
1842 self.assertCache(cache_manager)
    def assertCache(self, cache_manager: DatastoreCacheManager) -> None:
        """Exercise the standard caching expectations for this manager.

        The test fixtures are set up so that ``refs[0]`` should be
        cacheable and ``refs[1]`` should not; this helper moves both
        into the cache and checks the resulting state.
        """
        self.assertTrue(cache_manager.should_be_cached(self.refs[0]))
        self.assertFalse(cache_manager.should_be_cached(self.refs[1]))

        # Only the cacheable dataset is accepted by move_to_cache; the
        # other returns None.
        uri = cache_manager.move_to_cache(self.files[0], self.refs[0])
        self.assertIsInstance(uri, ResourcePath)
        self.assertIsNone(cache_manager.move_to_cache(self.files[1], self.refs[1]))

        # Check presence in cache using ref and then using file extension.
        self.assertFalse(cache_manager.known_to_cache(self.refs[1]))
        self.assertTrue(cache_manager.known_to_cache(self.refs[0]))
        self.assertFalse(cache_manager.known_to_cache(self.refs[1], self.files[1].getExtension()))
        self.assertTrue(cache_manager.known_to_cache(self.refs[0], self.files[0].getExtension()))

        # Cached file should no longer exist but uncached file should be
        # unaffected (move_to_cache moves rather than copies).
        self.assertFalse(self.files[0].exists())
        self.assertTrue(self.files[1].exists())

        # Should find this file and it should be within the cache directory.
        with cache_manager.find_in_cache(self.refs[0], ".txt") as found:
            self.assertTrue(found.exists())
            self.assertIsNotNone(found.relative_to(cache_manager.cache_directory))

        # Should not be able to find these in cache (wrong extension, or
        # never cached at all).
        with cache_manager.find_in_cache(self.refs[0], ".fits") as found:
            self.assertIsNone(found)
        with cache_manager.find_in_cache(self.refs[1], ".fits") as found:
            self.assertIsNone(found)
1874 def testNoCache(self) -> None:
1875 cache_manager = DatastoreDisabledCacheManager("", universe=self.universe)
1876 for uri, ref in zip(self.files, self.refs, strict=True):
1877 self.assertFalse(cache_manager.should_be_cached(ref))
1878 self.assertIsNone(cache_manager.move_to_cache(uri, ref))
1879 self.assertFalse(cache_manager.known_to_cache(ref))
1880 with cache_manager.find_in_cache(ref, ".txt") as found:
1881 self.assertIsNone(found, msg=f"{cache_manager}")
1883 def _expiration_config(self, mode: str, threshold: int) -> str:
1884 return f"""
1885cached:
1886 default: true
1887 expiry:
1888 mode: {mode}
1889 threshold: {threshold}
1890 cacheable:
1891 unused: true
1892 """
    def testCacheExpiryFiles(self) -> None:
        """Test cache expiration when counting individual files."""
        threshold = 2  # Keep at least 2 files.
        mode = "files"
        config_str = self._expiration_config(mode, threshold)

        cache_manager = self._make_cache_manager(config_str)

        # Check that an empty cache returns unknown for arbitrary ref
        self.assertFalse(cache_manager.known_to_cache(self.refs[0]))

        # Insert 5 datasets, retaining threshold + 1.
        # Should end with datasets: 2, 3, 4
        self.assertExpiration(cache_manager, 5, threshold + 1)
        self.assertIn(f"{mode}={threshold}", str(cache_manager))

        # Check that we will not expire a file that is actively in use
        # (held open via the find_in_cache context manager).
        with cache_manager.find_in_cache(self.refs[2], ".txt") as found:
            self.assertIsNotNone(found)

            # Trigger cache expiration that should remove the file
            # we just retrieved. Should now have: 3, 4, 5
            cached = cache_manager.move_to_cache(self.files[5], self.refs[5])
            self.assertIsNotNone(cached)

            # Cache should still report the standard file count.
            self.assertEqual(cache_manager.file_count, threshold + 1)

            # Add additional entry to cache.
            # Should now have 4, 5, 6
            cached = cache_manager.move_to_cache(self.files[6], self.refs[6])
            self.assertIsNotNone(cached)

            # Is the file still there?
            self.assertTrue(found.exists())

            # Can we read it?
            data = found.read()
            self.assertGreater(len(data), 0)

        # Outside context the file should no longer exist.
        self.assertFalse(found.exists())

        # File count should not have changed.
        self.assertEqual(cache_manager.file_count, threshold + 1)

        # Dataset 2 was in the exempt directory but because hardlinks
        # are used it was deleted from the main cache during cache expiry
        # above and so should no longer be found.
        with cache_manager.find_in_cache(self.refs[2], ".txt") as found:
            self.assertIsNone(found)

        # And the one stored after it is also gone.
        with cache_manager.find_in_cache(self.refs[3], ".txt") as found:
            self.assertIsNone(found)

        # But dataset 4 is present.
        with cache_manager.find_in_cache(self.refs[4], ".txt") as found:
            self.assertIsNotNone(found)

        # Adding a new dataset to the cache should now delete it.
        cache_manager.move_to_cache(self.files[7], self.refs[7])

        with cache_manager.find_in_cache(self.refs[2], ".txt") as found:
            self.assertIsNone(found)
1958 def testCacheExpiryDatasets(self) -> None:
1959 threshold = 2 # Keep 2 datasets.
1960 mode = "datasets"
1961 config_str = self._expiration_config(mode, threshold)
1963 cache_manager = self._make_cache_manager(config_str)
1964 self.assertExpiration(cache_manager, 5, threshold + 1)
1965 self.assertIn(f"{mode}={threshold}", str(cache_manager))
1967 def testCacheExpiryDatasetsFromDisabled(self) -> None:
1968 threshold = 2
1969 mode = "datasets"
1970 with unittest.mock.patch.dict(
1971 os.environ,
1972 {"DAF_BUTLER_CACHE_EXPIRATION_MODE": f"{mode}={threshold}"},
1973 ):
1974 cache_manager = DatastoreCacheManager.create_disabled(universe=DimensionUniverse())
1975 self.assertExpiration(cache_manager, 5, threshold + 1)
1976 self.assertIn(f"{mode}={threshold}", str(cache_manager))
    def testExpirationModeOverride(self) -> None:
        """Test overriding the configured expiration mode via environment."""
        threshold = 2  # Keep 2 datasets.
        mode = "datasets"
        config_str = self._expiration_config(mode, threshold)

        # The environment variable should take precedence over the
        # mode/threshold embedded in the configuration above.
        mode = "size"
        threshold = 55
        with unittest.mock.patch.dict(
            os.environ,
            {"DAF_BUTLER_CACHE_EXPIRATION_MODE": f"{mode}={threshold}"},
        ):
            cache_manager = self._make_cache_manager(config_str)
            self.assertExpiration(cache_manager, 10, 6)
            self.assertIn(f"{mode}={threshold}", str(cache_manager))

        # Check we get a warning with unrecognized form.
        with unittest.mock.patch.dict(
            os.environ,
            {"DAF_BUTLER_CACHE_EXPIRATION_MODE": "something"},
        ):
            with self.assertLogs(level="WARNING") as cm:
                self._make_cache_manager(config_str)
            self.assertIn("Unrecognized form (something)", cm.output[0])

        # A well-formed "mode=threshold" string with an unknown mode is
        # an error rather than a warning.
        with unittest.mock.patch.dict(
            os.environ,
            {"DAF_BUTLER_CACHE_EXPIRATION_MODE": "something=5"},
        ):
            with self.assertRaises(ValueError) as cm:
                self._make_cache_manager(config_str)
            self.assertIn("Unrecognized value", str(cm.exception))
2010 def testMissingThreshold(self) -> None:
2011 threshold = ""
2012 mode = "datasets"
2013 config_str = self._expiration_config(mode, threshold)
2015 with self.assertRaises(ValueError) as cm:
2016 self._make_cache_manager(config_str)
2017 self.assertIn("Cache expiration threshold", str(cm.exception))
2019 def testCacheExpiryDatasetsComposite(self) -> None:
2020 threshold = 2 # Keep 2 datasets.
2021 mode = "datasets"
2022 config_str = self._expiration_config(mode, threshold)
2024 cache_manager = self._make_cache_manager(config_str)
2026 n_datasets = 3
2027 for i in range(n_datasets):
2028 for component_file, component_ref in zip(self.comp_files[i], self.comp_refs[i], strict=True):
2029 cached = cache_manager.move_to_cache(component_file, component_ref)
2030 self.assertIsNotNone(cached)
2031 self.assertTrue(cache_manager.known_to_cache(component_ref))
2032 self.assertTrue(cache_manager.known_to_cache(component_ref.makeCompositeRef()))
2033 self.assertTrue(cache_manager.known_to_cache(component_ref, component_file.getExtension()))
2035 self.assertEqual(cache_manager.file_count, 6) # 2 datasets each of 3 files
2037 # Write two new non-composite and the number of files should drop.
2038 self.assertExpiration(cache_manager, 2, 5)
2040 def testCacheExpirySize(self) -> None:
2041 threshold = 55 # Each file is 10 bytes
2042 mode = "size"
2043 config_str = self._expiration_config(mode, threshold)
2045 cache_manager = self._make_cache_manager(config_str)
2046 self.assertExpiration(cache_manager, 10, 6)
2047 self.assertIn(f"{mode}={threshold}", str(cache_manager))
    def testDisabledCache(self) -> None:
        """Test the three ways of obtaining a disabled cache manager."""
        # Configure an active cache but disable via environment.
        threshold = 2
        mode = "datasets"
        config_str = self._expiration_config(mode, threshold)

        with unittest.mock.patch.dict(
            os.environ,
            {"DAF_BUTLER_CACHE_EXPIRATION_MODE": "disabled"},
        ):
            env_cache_manager = self._make_cache_manager(config_str)

        # Configure to be disabled
        threshold = 0
        mode = "disabled"
        config_str = self._expiration_config(mode, threshold)
        cfg_cache_manager = self._make_cache_manager(config_str)

        # All three disabled variants (config, environment, explicit
        # factory) must behave identically: nothing is cacheable,
        # cached, or findable.
        for cache_manager in (
            cfg_cache_manager,
            env_cache_manager,
            DatastoreCacheManager.create_disabled(universe=DimensionUniverse()),
        ):
            for uri, ref in zip(self.files, self.refs, strict=True):
                self.assertFalse(cache_manager.should_be_cached(ref))
                self.assertIsNone(cache_manager.move_to_cache(uri, ref))
                self.assertFalse(cache_manager.known_to_cache(ref))
                with cache_manager.find_in_cache(ref, ".txt") as found:
                    self.assertIsNone(found, msg=f"{cache_manager}")
            self.assertIn("disabled", str(cache_manager))
2080 def assertExpiration(
2081 self, cache_manager: DatastoreCacheManager, n_datasets: int, n_retained: int
2082 ) -> None:
2083 """Insert the datasets and then check the number retained."""
2084 for i in range(n_datasets):
2085 cached = cache_manager.move_to_cache(self.files[i], self.refs[i])
2086 self.assertIsNotNone(cached)
2088 self.assertEqual(cache_manager.file_count, n_retained)
2090 # The oldest file should not be in the cache any more.
2091 for i in range(n_datasets):
2092 with cache_manager.find_in_cache(self.refs[i], ".txt") as found:
2093 if i >= n_datasets - n_retained:
2094 self.assertIsInstance(found, ResourcePath)
2095 else:
2096 self.assertIsNone(found)
    def testCacheExpiryAge(self) -> None:
        """Test cache expiration based on file age."""
        threshold = 1  # Expire files older than 1 second.
        mode = "age"
        config_str = self._expiration_config(mode, threshold)

        cache_manager = self._make_cache_manager(config_str)
        self.assertIn(f"{mode}={threshold}", str(cache_manager))

        # Insert 2 files, then sleep past the age threshold, then
        # insert 4 more.
        for i in range(2):
            cached = cache_manager.move_to_cache(self.files[i], self.refs[i])
            self.assertIsNotNone(cached)
        time.sleep(2.0)
        for j in range(4):
            i = 2 + j  # Continue the counting
            cached = cache_manager.move_to_cache(self.files[i], self.refs[i])
            self.assertIsNotNone(cached)

        # Only the files written after the sleep should exist.
        self.assertEqual(cache_manager.file_count, 4)
        with cache_manager.find_in_cache(self.refs[1], ".txt") as found:
            self.assertIsNone(found)
        with cache_manager.find_in_cache(self.refs[2], ".txt") as found:
            self.assertIsInstance(found, ResourcePath)
class NullDatastoreTestCase(DatasetTestHelper, unittest.TestCase):
    """Test the null datastore."""

    storageClassFactory = StorageClassFactory()

    def test_basics(self) -> None:
        storage_class = self.storageClassFactory.getStorageClass("StructuredDataDict")
        ref = self.makeDatasetRef("metric", DimensionUniverse().empty, storage_class, {})

        null = NullDatastore(None, None)

        # Existence and knowledge queries all report "not known".
        self.assertFalse(null.exists(ref))
        self.assertFalse(null.knows(ref))
        self.assertFalse(null.knows_these([ref])[ref])
        null.validateConfiguration(ref)

        # Read-style APIs behave as if nothing is stored.
        for read_call in (
            lambda: null.get(ref),
            lambda: null.getURI(ref),
            lambda: null.getURIs(ref),
            lambda: null.getManyURIs([ref]),
        ):
            with self.assertRaises(FileNotFoundError):
                read_call()

        # Writing is not supported at all.
        with self.assertRaises(NotImplementedError):
            null.put("", ref)

        # Every remaining mutating or administrative API is unimplemented.
        for unimplemented in (
            lambda: null.getLookupKeys(),
            lambda: null.import_records({}),
            lambda: null.export_records([]),
            lambda: null.export_predicted_records([]),
            lambda: null.export([ref]),
            lambda: null.transfer(null, ref),
            lambda: null.emptyTrash(),
            lambda: null.trash(ref),
            lambda: null.forget([ref]),
            lambda: null.remove(ref),
            lambda: null.retrieveArtifacts([ref], ResourcePath(".")),
            lambda: null.transfer_from(null, [ref]),
            lambda: null.ingest(),
        ):
            with self.assertRaises(NotImplementedError):
                unimplemented()
class DatasetRefURIsTestCase(unittest.TestCase):
    """Tests for DatasetRefURIs."""

    def testSequenceAccess(self) -> None:
        """Verify that DatasetRefURIs can be treated like a two-item tuple."""
        uris = DatasetRefURIs()

        # A fresh instance has no primary URI and an empty component map.
        self.assertEqual(len(uris), 2)
        self.assertEqual(uris[0], None)
        self.assertEqual(uris[1], {})

        primary = ResourcePath("1/2/3")
        component = ResourcePath("a/b/c")

        # affirm that DatasetRefURIs does not support MutableSequence functions
        with self.assertRaises(TypeError):
            uris[0] = primary
        with self.assertRaises(TypeError):
            uris[1] = {"foo": component}

        # but DatasetRefURIs can be set by property name:
        uris.primaryURI = primary
        uris.componentURIs = {"foo": component}
        self.assertEqual(uris.primaryURI, primary)
        self.assertEqual(uris[0], primary)

        # Tuple-style unpacking works too.
        unpacked_primary, unpacked_components = uris
        self.assertEqual(unpacked_primary, primary)
        self.assertEqual(unpacked_components, {"foo": component})

    def testRepr(self) -> None:
        """Verify __repr__ output."""
        uris = DatasetRefURIs(ResourcePath("/1/2/3"), {"comp": ResourcePath("/a/b/c")})
        self.assertEqual(
            repr(uris),
            'DatasetRefURIs(ResourcePath("file:///1/2/3"), {\'comp\': ResourcePath("file:///a/b/c")})',
        )
class StoredFileInfoTestCase(DatasetTestHelper, unittest.TestCase):
    """Test the StoredFileInfo class."""

    storageClassFactory = StorageClassFactory()

    def test_StoredFileInfo(self) -> None:
        storage_class = self.storageClassFactory.getStorageClass("StructuredDataDict")
        ref = self.makeDatasetRef("metric", DimensionUniverse().empty, storage_class, {})

        record = {
            "storage_class": "StructuredDataDict",
            "formatter": "lsst.daf.butler.Formatter",
            "path": "a/b/c.txt",
            "component": "component",
            "checksum": None,
            "file_size": 5,
        }
        info = StoredFileInfo.from_record(record)

        # Round-trip back to a record.
        self.assertEqual(info.to_record(), record)

        # Rebasing to a second ref and back again should reproduce the
        # original info.
        ref2 = self.makeDatasetRef("metric", DimensionUniverse().empty, storage_class, {})
        rebased = info.rebase(ref2)
        self.assertEqual(rebased.rebase(ref), info)

        # update() validates both value types and permitted keys.
        with self.assertRaises(TypeError):
            rebased.update(formatter=42)
        with self.assertRaises(ValueError):
            rebased.update(something=42, new="42")

        # Check that pickle works on StoredFileInfo.
        self.assertEqual(pickle.loads(pickle.dumps(info)), info)

    def test_make_datastore_path_relative(self):
        # Relative paths pass through untouched; absolute URLs are
        # stripped down to their path component (fragments preserved).
        expected = {
            "a/relative/path": "a/relative/path",
            "path/with#fragment": "path/with#fragment",
            "http://server.com/some/path": "some/path",
            "http://server.com/some/path#frag": "some/path#frag",
        }
        for given, want in expected.items():
            self.assertEqual(make_datastore_path_relative(given), want)

    def test_datastore_record_data_json_types(self):
        """Test that we don't round-trip checksums to UUIDs when deserializing
        datastore record data.
        """
        test_json = """
        {
            "dataset_ids": [
                "74478304-abf1-4a9c-9eb2-926090a84446"
            ],
            "records": {
                "lsst.daf.butler.datastore.stored_file_info.StoredFileInfo": {
                    "74478304abf14a9c9eb2926090a84446": {
                        "file_datastore_records": [
                            {
                                "formatter": "lsst.daf.butler.formatters.yaml.YamlFormatter",
                                "path": "gain_factors/base-2025-158/gain_factors_spx_base-2025-158.yaml",
                                "storage_class": "GainFactors",
                                "component": "__NULL_STRING__",
                                "checksum": "cab515f6-ab67-0484-393f-aaa525dd526f",
                                "file_size": 5412
                            }
                        ]
                    }
                }
            }
        }
        """
        id_str = "74478304abf14a9c9eb2926090a84446"
        serialized = SerializedDatastoreRecordData.model_validate_json(test_json)
        # The serialized form must keep the checksum as a plain string.
        record = serialized.records[get_full_type_name(StoredFileInfo)][id_str]
        self.assertIsInstance(record["file_datastore_records"][0]["checksum"], str)

        # After full deserialization the checksum must still be a string.
        dataset_id = uuid.UUID(id_str)
        data = DatastoreRecordData.from_simple(serialized)
        file_record = data.records[dataset_id]["file_datastore_records"][0]
        self.assertIsInstance(file_record, StoredFileInfo)
        self.assertIsInstance(file_record.checksum, str)
@contextlib.contextmanager
def _temp_yaml_file(data: Any) -> Iterator[str]:
    """Dump ``data`` to a named temporary YAML file and yield its path.

    The file is removed on exit unless the caller deleted it already.
    """
    tmp = tempfile.NamedTemporaryFile(mode="w", suffix=".yaml")
    try:
        yaml.dump(data, stream=tmp)
        tmp.flush()
        yield tmp.name
    finally:
        # Some tests delete the file themselves; closing would then fail
        # trying to unlink it, so ignore that case.
        with contextlib.suppress(FileNotFoundError):
            tmp.close()
# Allow the test module to be run directly, e.g. ``python test_datastore.py``.
if __name__ == "__main__":
    unittest.main()