Coverage for tests / test_butler.py: 13%
1898 statements
« prev ^ index » next coverage.py v7.13.5, created at 2026-04-17 08:49 +0000
« prev ^ index » next coverage.py v7.13.5, created at 2026-04-17 08:49 +0000
1# This file is part of daf_butler.
2#
3# Developed for the LSST Data Management System.
4# This product includes software developed by the LSST Project
5# (http://www.lsst.org).
6# See the COPYRIGHT file at the top-level directory of this distribution
7# for details of code ownership.
8#
9# This software is dual licensed under the GNU General Public License and also
10# under a 3-clause BSD license. Recipients may choose which of these licenses
11# to use; please see the files gpl-3.0.txt and/or bsd_license.txt,
12# respectively. If you choose the GPL option then the following text applies
13# (but note that there is still no warranty even if you opt for BSD instead):
14#
15# This program is free software: you can redistribute it and/or modify
16# it under the terms of the GNU General Public License as published by
17# the Free Software Foundation, either version 3 of the License, or
18# (at your option) any later version.
19#
20# This program is distributed in the hope that it will be useful,
21# but WITHOUT ANY WARRANTY; without even the implied warranty of
22# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
23# GNU General Public License for more details.
24#
25# You should have received a copy of the GNU General Public License
26# along with this program. If not, see <http://www.gnu.org/licenses/>.
28"""Tests for Butler."""
30from __future__ import annotations
32import json
33import logging
34import os
35import pathlib
36import pickle
37import posixpath
38import random
39import re
40import shutil
41import string
42import tempfile
43import unittest
44import unittest.mock
45import uuid
46import warnings
47import weakref
48from collections.abc import Callable, Mapping
49from typing import TYPE_CHECKING, Any, cast
try:
    import boto3
    import botocore

    from lsst.resources.s3utils import clean_test_environment_for_s3

    try:
        from moto import mock_aws  # v5
    except ImportError:
        from moto import mock_s3 as mock_aws
except ImportError:
    boto3 = None

    def mock_aws(*args: Any, **kwargs: Any) -> Any:
        """No-op stand-in used when ``moto.mock_aws`` cannot be imported.

        The real ``mock_aws`` is used both as a bare decorator
        (``@mock_aws``) and as a decorator factory (``@mock_aws()``).
        Returning ``None`` from the fallback (as the previous version did)
        would replace the decorated class/function with ``None``; instead,
        the decorated object is returned unchanged.  The S3 tests are
        skipped when ``boto3`` is ``None``, so the mock is never exercised.
        """
        # Bare-decorator form: called directly with the target object.
        if len(args) == 1 and not kwargs and callable(args[0]):
            return args[0]
        # Factory form: return an identity decorator.
        return lambda obj: obj
69import astropy.time
70from sqlalchemy.exc import IntegrityError
72from lsst.daf.butler import (
73 Butler,
74 ButlerConfig,
75 ButlerMetrics,
76 ButlerRepoIndex,
77 CollectionCycleError,
78 CollectionType,
79 Config,
80 DataCoordinate,
81 DatasetExistence,
82 DatasetNotFoundError,
83 DatasetProvenance,
84 DatasetRef,
85 DatasetType,
86 DimensionRecord,
87 FileDataset,
88 NoDefaultCollectionError,
89 StorageClassFactory,
90 ValidationError,
91 script,
92)
93from lsst.daf.butler._rubin.file_datasets import transfer_datasets_to_datastore
94from lsst.daf.butler._rubin.temporary_for_ingest import TemporaryForIngest
95from lsst.daf.butler._rubin.transfer_datasets_in_place import transfer_datasets_in_place
96from lsst.daf.butler.datastore import NullDatastore
97from lsst.daf.butler.datastore.file_templates import FileTemplate, FileTemplateValidationError
98from lsst.daf.butler.datastores.file_datastore.retrieve_artifacts import ZipIndex
99from lsst.daf.butler.datastores.fileDatastore import FileDatastore
100from lsst.daf.butler.direct_butler import DirectButler
101from lsst.daf.butler.registry import (
102 CollectionError,
103 CollectionTypeError,
104 ConflictingDefinitionError,
105 DataIdValueError,
106 DatasetTypeExpressionError,
107 MissingCollectionError,
108 OrphanedRecordError,
109)
110from lsst.daf.butler.registry.sql_registry import SqlRegistry
111from lsst.daf.butler.repo_relocation import BUTLER_ROOT_TAG
112from lsst.daf.butler.tests import MetricsExample, MetricsExampleModel, MultiDetectorFormatter
113from lsst.daf.butler.tests.postgresql import TemporaryPostgresInstance, setup_postgres_test_db
114from lsst.daf.butler.tests.server_available import butler_server_import_error, butler_server_is_available
115from lsst.daf.butler.tests.utils import (
116 MetricTestRepo,
117 TestCaseMixin,
118 create_populated_sqlite_registry,
119 makeTestTempDir,
120 removeTestTempDir,
121 safeTestTempDir,
122)
123from lsst.resources import ResourcePath
124from lsst.resources.http import HttpResourcePath
125from lsst.utils import doImportType
126from lsst.utils.introspection import get_full_type_name
128if butler_server_is_available:
129 from lsst.daf.butler.tests.server import create_test_server
132if TYPE_CHECKING:
133 import types
135 from lsst.daf.butler import DimensionGroup, Registry, StorageClass
137TESTDIR = os.path.abspath(os.path.dirname(__file__))
def clean_environment() -> None:
    """Remove external environment variables that affect the tests."""
    for variable in ("DAF_BUTLER_REPOSITORY_INDEX",):
        # Delete only when present; absence is not an error.
        if variable in os.environ:
            del os.environ[variable]
def makeExampleMetrics() -> MetricsExample:
    """Return example dataset suitable for tests."""
    summary = {"AM1": 5.2, "AM2": 30.6}
    output = {"a": [1, 2, 3], "b": {"blue": 5, "red": "green"}}
    data = [563, 234, 456.7, 752, 8, 9, 27]
    return MetricsExample(summary, output, data)
class TransactionTestError(Exception):
    """Specific error for testing transactions.

    A dedicated exception type prevents misdiagnosing failures that might
    otherwise occur when a standard exception is used.
    """
class ButlerConfigTests(unittest.TestCase):
    """Simple tests for ButlerConfig that are not tested in any other test
    cases.
    """

    def testSearchPath(self) -> None:
        """Check that ``searchPaths`` overrides are applied and visible in
        the debug log output.
        """
        configFile = os.path.join(TESTDIR, "config", "basic", "butler.yaml")
        # Without search paths the override directory must not be consulted,
        # so "testConfigs" should not appear anywhere in the debug log.
        with self.assertLogs("lsst.daf.butler", level="DEBUG") as cm:
            config1 = ButlerConfig(configFile)
        self.assertNotIn("testConfigs", "\n".join(cm.output))

        overrideDirectory = os.path.join(TESTDIR, "config", "testConfigs")
        # With a search path the override directory is read and logged.
        with self.assertLogs("lsst.daf.butler", level="DEBUG") as cm:
            config2 = ButlerConfig(configFile, searchPaths=[overrideDirectory])
        self.assertIn("testConfigs", "\n".join(cm.output))

        # The override config changes this specific key relative to the
        # default configuration.
        key = ("datastore", "records", "table")
        self.assertNotEqual(config1[key], config2[key])
        self.assertEqual(config2[key], "override_record")
class ButlerPutGetTests(TestCaseMixin):
    """Helper methods for running a suite of put/get tests from different
    butler configurations.
    """

    # Filesystem root of the temporary test repository (set by subclasses).
    root: str
    # Default RUN collection name; deliberately includes non-ASCII characters
    # to exercise Unicode handling in collection names.
    default_run = "ingésτ😺"
    # Shared storage-class factory, populated in setUpClass.
    storageClassFactory: StorageClassFactory
    # Butler configuration file used to seed the repository (set by subclass).
    configFile: str | None
    # Path to the butler config file inside the temporary repository root.
    tmpConfigFile: str

    @staticmethod
    def addDatasetType(
        datasetTypeName: str, dimensions: DimensionGroup, storageClass: StorageClass | str, registry: Registry
    ) -> DatasetType:
        """Create a DatasetType and register it"""
        datasetType = DatasetType(datasetTypeName, dimensions, storageClass)
        registry.registerDatasetType(datasetType)
        return datasetType

    @classmethod
    def setUpClass(cls) -> None:
        # Build the shared factory once per test class; subclasses that set
        # configFile get any extra storage classes defined by that config.
        cls.storageClassFactory = StorageClassFactory()
        if cls.configFile is not None:
            cls.storageClassFactory.addFromConfig(cls.configFile)

    def assertGetComponents(
        self,
        butler: Butler,
        datasetRef: DatasetRef,
        components: tuple[str, ...],
        reference: Any,
        collections: Any = None,
    ) -> None:
        """Assert that each named component of ``datasetRef`` can be
        retrieved, both directly and through a deferred handle, and matches
        the corresponding attribute of ``reference``.
        """
        datasetType = datasetRef.datasetType
        dataId = datasetRef.dataId
        deferred = butler.getDeferred(datasetRef)

        for component in components:
            compTypeName = datasetType.componentTypeName(component)
            result = butler.get(compTypeName, dataId, collections=collections)
            self.assertEqual(result, getattr(reference, component))
            # The deferred handle must yield the same component value.
            result_deferred = deferred.get(component=component)
            self.assertEqual(result_deferred, result)

    def tearDown(self) -> None:
        # Remove the temporary repository root created for this test, if any.
        if self.root is not None:
            removeTestTempDir(self.root)

    def create_empty_butler(
        self,
        run: str | None = None,
        writeable: bool | None = None,
        metrics: ButlerMetrics | None = None,
        cleanup: bool = True,
    ) -> DirectButler:
        """Create a Butler for the test repository, without inserting test
        data.
        """
        butler = Butler.from_config(self.tmpConfigFile, run=run, writeable=writeable, metrics=metrics)
        if cleanup:
            # Register the butler with the test's context stack so it is
            # closed automatically at test teardown.
            self.enterContext(butler)
        assert isinstance(butler, DirectButler), "Expect DirectButler in configuration"
        return butler

    def create_butler(
        self,
        run: str,
        storageClass: StorageClass | str,
        datasetTypeName: str,
        metrics: ButlerMetrics | None = None,
    ) -> tuple[Butler, DatasetType]:
        """Create a Butler for the test repository and insert some test data
        into it.
        """
        butler = self.create_empty_butler(run=run, metrics=metrics)

        # A freshly-created repo should contain only the requested run.
        collections = set(butler.collections.query("*"))
        self.assertEqual(collections, {run})
        # Create and register a DatasetType
        dimensions = butler.dimensions.conform(["instrument", "visit"])

        datasetType = self.addDatasetType(datasetTypeName, dimensions, storageClass, butler.registry)

        # Add needed Dimensions
        butler.registry.insertDimensionData("instrument", {"name": "DummyCamComp"})
        butler.registry.insertDimensionData(
            "physical_filter", {"instrument": "DummyCamComp", "name": "d-r", "band": "R"}
        )
        butler.registry.insertDimensionData(
            "visit_system", {"instrument": "DummyCamComp", "id": 1, "name": "default"}
        )
        butler.registry.insertDimensionData("day_obs", {"instrument": "DummyCamComp", "id": 20200101})
        visit_start = astropy.time.Time("2020-01-01 08:00:00.123456789", scale="tai")
        visit_end = astropy.time.Time("2020-01-01 08:00:36.66", scale="tai")
        butler.registry.insertDimensionData(
            "visit",
            {
                "instrument": "DummyCamComp",
                "id": 423,
                "name": "fourtwentythree",
                "physical_filter": "d-r",
                "datetime_begin": visit_start,
                "datetime_end": visit_end,
                "day_obs": 20200101,
            },
        )

        # Add more visits for some later tests
        for visit_id in (424, 425):
            butler.registry.insertDimensionData(
                "visit",
                {
                    "instrument": "DummyCamComp",
                    "id": visit_id,
                    "name": f"fourtwentyfour_{visit_id}",
                    "physical_filter": "d-r",
                    "day_obs": 20200101,
                },
            )
        return butler, datasetType

    def runPutGetTest(self, storageClass: StorageClass, datasetTypeName: str) -> Butler:
        """Run a full suite of put/get/deferred-get/prune checks for the
        given storage class and dataset type name, returning the butler with
        one dataset left in place for downstream tests.
        """
        # New datasets will be added to run and tag, but we will only look in
        # tag when looking up datasets.
        run = self.default_run
        butler, datasetType = self.create_butler(run, storageClass, datasetTypeName)
        assert butler.run is not None

        # Create and store a dataset
        metric = makeExampleMetrics()
        dataId = butler.registry.expandDataId({"instrument": "DummyCamComp", "visit": 423})

        # Dataset should not exist if we haven't added it
        with self.assertRaises(DatasetNotFoundError):
            butler.get(datasetTypeName, dataId)

        # Put and remove the dataset once as a DatasetRef, once as a dataId,
        # and once with a DatasetType

        # Keep track of any collections we add and do not clean up
        expected_collections = {run}

        counter = 0
        ref = DatasetRef(datasetType, dataId, id=uuid.UUID(int=1), run="put_run_1")
        # NOTE(review): this assignment looks like it was meant to be a type
        # annotation ("args: tuple[...]"); the assignment is harmless since
        # the for loop below rebinds args, but worth confirming.
        args = tuple[DatasetRef] | tuple[str | DatasetType, DataCoordinate]
        for args in ((ref,), (datasetTypeName, dataId), (datasetType, dataId)):
            # Since we are using subTest we can get cascading failures
            # here with the first attempt failing and the others failing
            # immediately because the dataset already exists. Work around
            # this by using a distinct run collection each time
            counter += 1
            this_run = f"put_run_{counter}"
            butler.collections.register(this_run)
            expected_collections.update({this_run})

            with self.subTest(args=repr(args)):
                kwargs: dict[str, Any] = {}
                # A resolved DatasetRef carries its own run; the other two
                # call forms need an explicit run keyword.
                if not isinstance(args[0], DatasetRef):  # type: ignore
                    kwargs["run"] = this_run
                ref = butler.put(metric, *args, **kwargs)
                self.assertIsInstance(ref, DatasetRef)

                # Test get of a ref.
                metricOut = butler.get(ref)
                self.assertEqual(metric, metricOut)
                # Test get
                metricOut = butler.get(ref.datasetType.name, dataId, collections=this_run)
                self.assertEqual(metric, metricOut)
                # Test get with a datasetRef
                metricOut = butler.get(ref)
                self.assertEqual(metric, metricOut)
                # Test getDeferred with dataId
                metricOut = butler.getDeferred(ref.datasetType.name, dataId, collections=this_run).get()
                self.assertEqual(metric, metricOut)
                # Test getDeferred with a ref
                metricOut = butler.getDeferred(ref).get()
                self.assertEqual(metric, metricOut)

                # Check we can get components
                if storageClass.isComposite():
                    self.assertGetComponents(
                        butler, ref, ("summary", "data", "output"), metric, collections=this_run
                    )

                primary_uri, secondary_uris = butler.getURIs(ref)
                n_uris = len(secondary_uris)
                if primary_uri:
                    n_uris += 1

                # Can the artifacts themselves be retrieved?
                if not butler._datastore.isEphemeral:
                    # Create a temporary directory to hold the retrieved
                    # artifacts.
                    with tempfile.TemporaryDirectory(
                        prefix="butler-artifacts-", ignore_cleanup_errors=True
                    ) as artifact_root:
                        root_uri = ResourcePath(artifact_root, forceDirectory=True)

                        for preserve_path in (True, False):
                            destination = root_uri.join(f"{preserve_path}_{counter}/")
                            log = logging.getLogger("lsst.x")
                            log.debug("Using destination %s for args %s", destination, args)
                            # Use copy so that we can test that overwrite
                            # protection works (using "auto" for File URIs
                            # would use hard links and subsequent transfer
                            # would work because it knows they are the same
                            # file).
                            transferred = butler.retrieveArtifacts(
                                [ref], destination, preserve_path=preserve_path, transfer="copy"
                            )
                            self.assertGreater(len(transferred), 0)
                            artifacts = list(ResourcePath.findFileResources([destination]))
                            # Filter out the index file.
                            artifacts = [a for a in artifacts if a.basename() != ZipIndex.index_name]
                            self.assertEqual(set(transferred), set(artifacts))

                            for artifact in transferred:
                                path_in_destination = artifact.relative_to(destination)
                                self.assertIsNotNone(path_in_destination)
                                assert path_in_destination is not None

                                # When path is not preserved there should not
                                # be any path separators.
                                num_seps = path_in_destination.count("/")
                                if preserve_path:
                                    self.assertGreater(num_seps, 0)
                                else:
                                    self.assertEqual(num_seps, 0)

                            self.assertEqual(
                                len(artifacts),
                                n_uris,
                                "Comparing expected artifacts vs actual:"
                                f" {artifacts} vs {primary_uri} and {secondary_uris}",
                            )

                            if preserve_path:
                                # No need to run these twice
                                with self.assertRaises(ValueError):
                                    butler.retrieveArtifacts([ref], destination, transfer="move")

                                with self.assertRaisesRegex(
                                    ValueError, "^Destination location must refer to a directory"
                                ):
                                    butler.retrieveArtifacts(
                                        [ref], ResourcePath("/some/file.txt", forceDirectory=False)
                                    )

                            # Retrieving into an existing destination without
                            # overwrite must be refused.
                            with self.assertRaises(FileExistsError):
                                butler.retrieveArtifacts([ref], destination)

                            transferred_again = butler.retrieveArtifacts(
                                [ref], destination, preserve_path=preserve_path, overwrite=True
                            )
                            self.assertEqual(set(transferred_again), set(transferred))

                # Now remove the dataset completely.
                butler.pruneDatasets([ref], purge=True, unstore=True)
                # Lookup with original args should still fail.
                kwargs = {"collections": this_run}
                if isinstance(args[0], DatasetRef):
                    kwargs = {}  # Prevent warning from being issued.
                self.assertFalse(butler.exists(*args, **kwargs))
                # get() should still fail.
                with self.assertRaises((FileNotFoundError, DatasetNotFoundError)):
                    butler.get(ref)
                # Registry shouldn't be able to find it by dataset_id anymore.
                self.assertIsNone(butler.get_dataset(ref.id))

                # Do explicit registry removal since we know they are
                # empty
                butler.collections.x_remove(this_run)
                expected_collections.remove(this_run)

        # Create DatasetRef for put using default run.
        refIn = DatasetRef(datasetType, dataId, id=uuid.UUID(int=1), run=butler.run)

        # Check that getDeferred fails with standalone ref.
        with self.assertRaises(LookupError):
            butler.getDeferred(refIn)

        # Put the dataset again, since the last thing we did was remove it
        # and we want to use the default collection.
        ref = butler.put(metric, refIn)

        # Get with parameters
        stop = 4
        sliced = butler.get(ref, parameters={"slice": slice(stop)})
        self.assertNotEqual(metric, sliced)
        self.assertEqual(metric.summary, sliced.summary)
        self.assertEqual(metric.output, sliced.output)
        assert metric.data is not None  # for mypy
        self.assertEqual(metric.data[:stop], sliced.data)
        # getDeferred with parameters
        sliced = butler.getDeferred(ref, parameters={"slice": slice(stop)}).get()
        self.assertNotEqual(metric, sliced)
        self.assertEqual(metric.summary, sliced.summary)
        self.assertEqual(metric.output, sliced.output)
        self.assertEqual(metric.data[:stop], sliced.data)
        # getDeferred with deferred parameters
        sliced = butler.getDeferred(ref).get(parameters={"slice": slice(stop)})
        self.assertNotEqual(metric, sliced)
        self.assertEqual(metric.summary, sliced.summary)
        self.assertEqual(metric.output, sliced.output)
        self.assertEqual(metric.data[:stop], sliced.data)

        if storageClass.isComposite():
            # Check that components can be retrieved
            metricOut = butler.get(ref.datasetType.name, dataId)
            compNameS = ref.datasetType.componentTypeName("summary")
            compNameD = ref.datasetType.componentTypeName("data")
            summary = butler.get(compNameS, dataId)
            self.assertEqual(summary, metric.summary)
            data = butler.get(compNameD, dataId)
            self.assertEqual(data, metric.data)

            if "counter" in storageClass.derivedComponents:
                # Derived component computed from the data, with and without
                # the slice parameter applied.
                count = butler.get(ref.datasetType.componentTypeName("counter"), dataId)
                self.assertEqual(count, len(data))

                count = butler.get(
                    ref.datasetType.componentTypeName("counter"), dataId, parameters={"slice": slice(stop)}
                )
                self.assertEqual(count, stop)

            compRef = butler.find_dataset(compNameS, dataId, collections=butler.collections.defaults)
            assert compRef is not None
            summary = butler.get(compRef)
            self.assertEqual(summary, metric.summary)

        # Create a Dataset type that has the same name but is inconsistent.
        inconsistentDatasetType = DatasetType(
            datasetTypeName, datasetType.dimensions, self.storageClassFactory.getStorageClass("Config")
        )

        # Getting with a dataset type that does not match registry fails
        with self.assertRaisesRegex(
            ValueError,
            "(Supplied dataset type .* inconsistent with registry)"
            "|(The new storage class .* is not compatible with the existing storage class)",
        ):
            butler.get(inconsistentDatasetType, dataId)

        # Combining a DatasetRef with a dataId should fail
        with self.assertRaisesRegex(ValueError, "DatasetRef given, cannot use dataId as well"):
            butler.get(ref, dataId)
        # Getting with an explicit ref should fail if the id doesn't match.
        with self.assertRaises((FileNotFoundError, DatasetNotFoundError)):
            butler.get(DatasetRef(ref.datasetType, ref.dataId, id=uuid.UUID(int=101), run=butler.run))

        # Getting a dataset with unknown parameters should fail
        with self.assertRaisesRegex(KeyError, "Parameter 'unsupported' not understood"):
            butler.get(ref, parameters={"unsupported": True})

        # Check we have a collection
        collections = set(butler.collections.query("*"))
        self.assertEqual(collections, expected_collections)

        # Clean up to check that we can remove something that may have
        # already had a component removed
        butler.pruneDatasets([ref], unstore=True, purge=True)

        # Add the same ref again, so we can check that duplicate put fails.
        ref = butler.put(metric, datasetType, dataId)

        # Repeat put will fail.
        with self.assertRaisesRegex(
            ConflictingDefinitionError, "A database constraint failure was triggered"
        ):
            butler.put(metric, datasetType, dataId)

        # Remove the datastore entry.
        butler.pruneDatasets([ref], unstore=True, purge=False, disassociate=False)

        # Put will still fail
        with self.assertRaisesRegex(
            ConflictingDefinitionError, "A database constraint failure was triggered"
        ):
            butler.put(metric, datasetType, dataId)

        # Repeat the same sequence with resolved ref.
        butler.pruneDatasets([ref], unstore=True, purge=True)
        ref = butler.put(metric, refIn)

        # Repeat put will fail.
        with self.assertRaisesRegex(ConflictingDefinitionError, "Datastore already contains dataset"):
            butler.put(metric, refIn)

        # Remove the datastore entry.
        butler.pruneDatasets([ref], unstore=True, purge=False, disassociate=False)

        # In case of resolved ref this write will succeed.
        ref = butler.put(metric, refIn)

        # Leave the dataset in place since some downstream tests require
        # something to be present
        return butler

    def testDeferredCollectionPassing(self) -> None:
        """Test passing collections explicitly to put/get/exists rather than
        relying on a butler-level default run.
        """
        # Construct a butler with no run or collection, but make it writeable.
        butler = self.create_empty_butler(writeable=True)
        # Create and register a DatasetType
        dimensions = butler.dimensions.conform(["instrument", "visit"])
        datasetType = self.addDatasetType(
            "example", dimensions, self.storageClassFactory.getStorageClass("StructuredData"), butler.registry
        )
        # Add needed Dimensions
        butler.registry.insertDimensionData("instrument", {"name": "DummyCamComp"})
        butler.registry.insertDimensionData(
            "physical_filter", {"instrument": "DummyCamComp", "name": "d-r", "band": "R"}
        )
        butler.registry.insertDimensionData("day_obs", {"instrument": "DummyCamComp", "id": 20250101})
        butler.registry.insertDimensionData(
            "visit",
            {
                "instrument": "DummyCamComp",
                "id": 423,
                "name": "fourtwentythree",
                "physical_filter": "d-r",
                "day_obs": 20250101,
            },
        )
        dataId = {"instrument": "DummyCamComp", "visit": 423}
        # Create dataset.
        metric = makeExampleMetrics()
        # Register a new run and put dataset.
        run = "deferred"
        self.assertTrue(butler.collections.register(run))
        # Second time it will be allowed but indicate no-op
        self.assertFalse(butler.collections.register(run))
        ref = butler.put(metric, datasetType, dataId, run=run)
        # Putting with no run should fail with TypeError.
        with self.assertRaises(CollectionError):
            butler.put(metric, datasetType, dataId)
        # Dataset should exist.
        self.assertTrue(butler.exists(datasetType, dataId, collections=[run]))
        # We should be able to get the dataset back, but with and without
        # a deferred dataset handle.
        self.assertEqual(metric, butler.get(datasetType, dataId, collections=[run]))
        self.assertEqual(metric, butler.getDeferred(datasetType, dataId, collections=[run]).get())
        # Trying to find the dataset without any collection is an error.
        with self.assertRaises(NoDefaultCollectionError):
            butler.exists(datasetType, dataId)
        with self.assertRaises(CollectionError):
            butler.get(datasetType, dataId)
        # Associate the dataset with a different collection.
        butler.collections.register("tagged", type=CollectionType.TAGGED)
        butler.registry.associate("tagged", [ref])
        # Deleting the dataset from the new collection should make it findable
        # in the original collection.
        butler.pruneDatasets([ref], tags=["tagged"])
        self.assertTrue(butler.exists(datasetType, dataId, collections=[run]))
640class ButlerTests(ButlerPutGetTests):
641 """Tests for Butler."""
643 useTempRoot = True
644 validationCanFail: bool
645 fullConfigKey: str | None
646 registryStr: str | None
647 datastoreName: list[str] | None
648 datastoreStr: list[str]
649 predictionSupported = True
650 """Does getURIs support 'prediction mode'?"""
    def setUp(self) -> None:
        """Create a new butler root for each test."""
        # A fresh temporary repository per test ensures test isolation.
        self.root = makeTestTempDir(TESTDIR)
        Butler.makeRepo(self.root, config=Config(self.configFile))
        self.tmpConfigFile = os.path.join(self.root, "butler.yaml")
658 def are_uris_equivalent(self, uri1: ResourcePath, uri2: ResourcePath) -> bool:
659 """Return True if two URIs refer to the same resource.
661 Subclasses may override to handle unique requirements.
662 """
663 return uri1 == uri2
    def testConstructor(self) -> None:
        """Independent test of constructor."""
        butler = Butler.from_config(self.tmpConfigFile, run=self.default_run)
        self.enterContext(butler)
        self.assertIsInstance(butler, Butler)

        # Check that butler.yaml is added automatically.
        if self.tmpConfigFile.endswith(end := "/butler.yaml"):
            config_dir = self.tmpConfigFile[: -len(end)]
            butler = Butler.from_config(config_dir, run=self.default_run)
            self.enterContext(butler)
            self.assertIsInstance(butler, Butler)

            # Even with a ResourcePath.
            butler = Butler.from_config(ResourcePath(config_dir, forceDirectory=True), run=self.default_run)
            self.enterContext(butler)
            self.assertIsInstance(butler, Butler)

        collections = set(butler.collections.query("*"))
        self.assertEqual(collections, {self.default_run})

        # Check that some special characters can be included in run name.
        special_run = "u@b.c-A"
        butler_special = Butler.from_config(butler=butler, run=special_run)
        self.enterContext(butler_special)
        collections = set(butler_special.registry.queryCollections("*@*"))
        self.assertEqual(collections, {special_run})

        # A butler built from an existing butler shares the datastore
        # configuration but gets its own default collections/run.
        butler2 = Butler.from_config(butler=butler, collections=["other"])
        self.enterContext(butler2)
        self.assertEqual(butler2.collections.defaults, ("other",))
        self.assertIsNone(butler2.run)
        self.assertEqual(type(butler._datastore), type(butler2._datastore))
        self.assertEqual(butler._datastore.config, butler2._datastore.config)

        # Test that we can use an environment variable to find this
        # repository.
        butler_index = Config()
        butler_index["label"] = self.tmpConfigFile
        for suffix in (".yaml", ".json"):
            # Ensure that the content differs so that we know that
            # we aren't reusing the cache.
            bad_label = f"file://bucket/not_real{suffix}"
            butler_index["bad_label"] = bad_label
            with ResourcePath.temporary_uri(suffix=suffix) as temp_file:
                butler_index.dumpToUri(temp_file)
                with unittest.mock.patch.dict(os.environ, {"DAF_BUTLER_REPOSITORY_INDEX": str(temp_file)}):
                    self.assertEqual(Butler.get_known_repos(), {"label", "bad_label"})
                    uri = Butler.get_repo_uri("bad_label")
                    self.assertEqual(uri, ResourcePath(bad_label))
                    uri = Butler.get_repo_uri("label")
                    butler = Butler.from_config(uri, writeable=False)
                    self.assertIsInstance(butler, Butler)
                    butler.close()
                    butler = Butler.from_config("label", writeable=False)
                    self.assertIsInstance(butler, Butler)
                    butler.close()
                    with self.assertRaisesRegex(FileNotFoundError, "aliases:.*bad_label"):
                        Butler.from_config("not_there", writeable=False)
                    with self.assertRaisesRegex(FileNotFoundError, "resolved from alias 'bad_label'"):
                        Butler.from_config("bad_label")
                    with self.assertRaises(FileNotFoundError):
                        # Should ignore aliases.
                        Butler.from_config(ResourcePath("label", forceAbsolute=False))
                    with self.assertRaises(KeyError) as cm:
                        Butler.get_repo_uri("missing")
                    self.assertEqual(
                        Butler.get_repo_uri("missing", True), ResourcePath("missing", forceAbsolute=False)
                    )
                    self.assertIn("not known to", str(cm.exception))
                    # Should report no failure.
                    self.assertEqual(ButlerRepoIndex.get_failure_reason(), "")
            with ResourcePath.temporary_uri(suffix=suffix) as temp_file:
                # Now with empty configuration.
                butler_index = Config()
                butler_index.dumpToUri(temp_file)
                with unittest.mock.patch.dict(os.environ, {"DAF_BUTLER_REPOSITORY_INDEX": str(temp_file)}):
                    with self.assertRaisesRegex(FileNotFoundError, "(no known aliases)"):
                        Butler.from_config("label")
            with ResourcePath.temporary_uri(suffix=suffix) as temp_file:
                # Now with bad contents.
                with open(temp_file.ospath, "w") as fh:
                    print("'", file=fh)
                with unittest.mock.patch.dict(os.environ, {"DAF_BUTLER_REPOSITORY_INDEX": str(temp_file)}):
                    with self.assertRaisesRegex(FileNotFoundError, "(no known aliases:.*could not be read)"):
                        Butler.from_config("label")
        # Index environment variable pointing at a missing file.
        with unittest.mock.patch.dict(os.environ, {"DAF_BUTLER_REPOSITORY_INDEX": "file://not_found/x.yaml"}):
            with self.assertRaises(FileNotFoundError):
                Butler.get_repo_uri("label")
            self.assertEqual(Butler.get_known_repos(), set())

            with self.assertRaisesRegex(FileNotFoundError, "index file not found"):
                Butler.from_config("label")

        # Check that we can create Butler when the alias file is not found.
        butler = Butler.from_config(self.tmpConfigFile, writeable=False)
        self.enterContext(butler)
        self.assertIsInstance(butler, Butler)
        with self.assertRaises(RuntimeError) as cm:
            # No environment variable set.
            Butler.get_repo_uri("label")
        self.assertEqual(Butler.get_repo_uri("label", True), ResourcePath("label", forceAbsolute=False))
        self.assertIn("No repository index defined", str(cm.exception))
        with self.assertRaisesRegex(FileNotFoundError, "no known aliases.*No repository index"):
            # No aliases registered.
            Butler.from_config("not_there")
        self.assertEqual(Butler.get_known_repos(), set())
    def testClose(self) -> None:
        """Test explicit and context-manager-driven closing of a Butler."""
        # cleanup=False: this test manages the butler's lifetime itself.
        butler = self.create_empty_butler(cleanup=False)
        is_direct_butler = isinstance(butler, DirectButler)
        if is_direct_butler:
            self.assertFalse(butler._closed)

        # The butler acts as its own context manager and closes on exit.
        with butler as butler_from_context_manager:
            self.assertIs(butler, butler_from_context_manager)

        if is_direct_butler:
            self.assertTrue(butler._closed)
            # Operations on a closed butler must fail clearly.
            with self.assertRaisesRegex(RuntimeError, "has been closed"):
                butler.get_dataset_type("raw")

        # Close may be called multiple times.
        butler.close()
        if is_direct_butler:
            self.assertTrue(butler._closed)
    def testGarbageCollection(self) -> None:
        """Test that Butler does not have any circular references that prevent
        it from being garbage collected immediately when it goes out of scope.
        """
        butler = self.create_empty_butler(cleanup=False)
        is_direct_butler = isinstance(butler, DirectButler)
        # Weak references let us observe collection without keeping the
        # objects alive ourselves.
        butler_ref = weakref.ref(butler)
        if is_direct_butler:
            registry_ref = weakref.ref(butler._registry)
            managers_ref = weakref.ref(butler._registry._managers)
            datastore_ref = weakref.ref(butler._datastore)
            db_ref = weakref.ref(butler._registry._db)
            engine_ref = weakref.ref(butler._registry._db._engine)

        with warnings.catch_warnings():
            # Hide warnings from unclosed database handles.
            warnings.simplefilter("ignore", ResourceWarning)
            del butler
        self.assertIsNone(butler_ref(), "Butler should have been garbage collected")
        if is_direct_butler:
            self.assertIsNone(registry_ref(), "SqlRegistry should have been garbage collected")
            self.assertIsNone(managers_ref(), "Registry managers should have been garbage collected")
            self.assertIsNone(datastore_ref(), "Datastore should have been garbage collected")
            self.assertIsNone(db_ref(), "Database should have been garbage collected")
        # SQLAlchemy has internal reference cycles, so the Engine instance
        # is not cleaned up promptly even if we release our reference to
        # it. Explicitly clean it up here to avoid file handles leaking.
        if is_direct_butler:
            engine = engine_ref()
            if engine is not None:
                engine.dispose()
    def testDafButlerRepositories(self) -> None:
        """Test repository-index resolution via the DAF_BUTLER_REPOSITORIES
        environment variable.
        """
        # Inline YAML mapping of labels to repository URIs.
        with unittest.mock.patch.dict(
            os.environ,
            {"DAF_BUTLER_REPOSITORIES": "label: 'https://someuri.com'\notherLabel: 'https://otheruri.com'\n"},
        ):
            self.assertEqual(str(Butler.get_repo_uri("label")), "https://someuri.com")

        # Setting both index environment variables is an error.
        with unittest.mock.patch.dict(
            os.environ,
            {
                "DAF_BUTLER_REPOSITORIES": "label: https://someuri.com",
                "DAF_BUTLER_REPOSITORY_INDEX": "https://someuri.com",
            },
        ):
            with self.assertRaisesRegex(RuntimeError, "Only one of the environment variables"):
                Butler.get_repo_uri("label")

        # Malformed index content must be rejected.
        with unittest.mock.patch.dict(
            os.environ,
            {"DAF_BUTLER_REPOSITORIES": "invalid"},
        ):
            with self.assertRaisesRegex(ValueError, "Repository index not in expected format"):
                Butler.get_repo_uri("label")
847 def testBasicPutGet(self) -> None:
848 storageClass = self.storageClassFactory.getStorageClass("StructuredDataNoComponents")
849 self.runPutGetTest(storageClass, "test_metric")
    def testCompositePutGetConcrete(self) -> None:
        """Test put/get of a composite storage class that is configured to
        *not* be disassembled by the datastore: a single artifact should be
        written and its URI should have no component fragment.
        """
        storageClass = self.storageClassFactory.getStorageClass("StructuredCompositeReadCompNoDisassembly")
        butler = self.runPutGetTest(storageClass, "test_metric")

        # Should *not* be disassembled
        datasets = list(butler.registry.queryDatasets(..., collections=self.default_run))
        self.assertEqual(len(datasets), 1)
        uri, components = butler.getURIs(datasets[0])
        self.assertIsInstance(uri, ResourcePath)
        self.assertFalse(components)
        self.assertEqual(uri.fragment, "", f"Checking absence of fragment in {uri}")
        # The file template includes the visit, so "423" should appear.
        self.assertIn("423", str(uri), f"Checking visit is in URI {uri}")

        # Predicted dataset
        if self.predictionSupported:
            # Visit 424 has no stored dataset; predict=True should still
            # produce a URI, flagged with a "predicted" fragment.
            dataId = {"instrument": "DummyCamComp", "visit": 424}
            uri, components = butler.getURIs(datasets[0].datasetType, dataId=dataId, predict=True)
            self.assertFalse(components)
            self.assertIsInstance(uri, ResourcePath)
            self.assertIn("424", str(uri), f"Checking visit is in URI {uri}")
            self.assertEqual(uri.fragment, "predicted", f"Checking for fragment in {uri}")
            # Repeat with a DatasetRef to test that code path.
            ref = DatasetRef(
                datasets[0].datasetType,
                dataId=DataCoordinate.standardize(dataId, universe=butler.dimensions),
                run=self.default_run,
            )
            uri2, components2 = butler.getURIs(ref, predict=True)
            self.assertFalse(components2)
            self.assertEqual(uri, uri2)
    def testCompositePutGetVirtual(self) -> None:
        """Test put/get of a composite storage class that the datastore may
        disassemble into one artifact per component; ephemeral (in-memory)
        datastores never disassemble.
        """
        storageClass = self.storageClassFactory.getStorageClass("StructuredCompositeReadComp")
        butler = self.runPutGetTest(storageClass, "test_metric_comp")

        # Should be disassembled
        datasets = list(butler.registry.queryDatasets(..., collections=self.default_run))
        self.assertEqual(len(datasets), 1)
        uri, components = butler.getURIs(datasets[0])

        if butler._datastore.isEphemeral:
            # Never disassemble in-memory datastore
            self.assertIsInstance(uri, ResourcePath)
            self.assertFalse(components)
            self.assertEqual(uri.fragment, "", f"Checking absence of fragment in {uri}")
            self.assertIn("423", str(uri), f"Checking visit is in URI {uri}")
        else:
            # Disassembled: no primary URI, one URI per component.
            self.assertIsNone(uri)
            self.assertEqual(set(components), set(storageClass.components))
            for compuri in components.values():
                self.assertIsInstance(compuri, ResourcePath)
                self.assertIn("423", str(compuri), f"Checking visit is in URI {compuri}")
                self.assertEqual(compuri.fragment, "", f"Checking absence of fragment in {compuri}")

        if self.predictionSupported:
            # Predicted dataset
            dataId = {"instrument": "DummyCamComp", "visit": 424}
            uri, components = butler.getURIs(datasets[0].datasetType, dataId=dataId, predict=True)

            if butler._datastore.isEphemeral:
                # Never disassembled
                self.assertIsInstance(uri, ResourcePath)
                self.assertFalse(components)
                self.assertIn("424", str(uri), f"Checking visit is in URI {uri}")
                self.assertEqual(uri.fragment, "predicted", f"Checking for fragment in {uri}")
            else:
                # Disassembled prediction: every component URI is flagged.
                self.assertIsNone(uri)
                self.assertEqual(set(components), set(storageClass.components))
                for compuri in components.values():
                    self.assertIsInstance(compuri, ResourcePath)
                    self.assertIn("424", str(compuri), f"Checking visit is in URI {compuri}")
                    self.assertEqual(compuri.fragment, "predicted", f"Checking for fragment in {compuri}")
    def testStorageClassOverrideGet(self) -> None:
        """Test storage class conversion on get with override.

        Covers overriding at ``get`` time, at ``getDeferred`` time (both
        up-front and at the later ``get`` call), component retrieval with an
        override, and parameter handling across write/read storage classes.
        """
        storageClass = self.storageClassFactory.getStorageClass("StructuredData")
        datasetTypeName = "anything"
        run = self.default_run

        butler, datasetType = self.create_butler(run, storageClass, datasetTypeName)

        # Create and store a dataset.
        metric = makeExampleMetrics()
        dataId = {"instrument": "DummyCamComp", "visit": 423}

        ref = butler.put(metric, datasetType, dataId)

        # Return native type.
        retrieved = butler.get(ref)
        self.assertEqual(retrieved, metric)

        # Specify an override.
        new_sc = self.storageClassFactory.getStorageClass("MetricsConversion")
        model = butler.get(ref, storageClass=new_sc)
        # Converted object is a different python type but compares equal.
        self.assertNotEqual(type(model), type(retrieved))
        self.assertIs(type(model), new_sc.pytype)
        self.assertEqual(retrieved, model)

        # Defer but override later.
        deferred = butler.getDeferred(ref)
        model = deferred.get(storageClass=new_sc)
        self.assertIs(type(model), new_sc.pytype)
        self.assertEqual(retrieved, model)

        # Defer but override up front.
        deferred = butler.getDeferred(ref, storageClass=new_sc)
        model = deferred.get()
        self.assertIs(type(model), new_sc.pytype)
        self.assertEqual(retrieved, model)

        # Retrieve a component. Should be a tuple.
        data = butler.get("anything.data", dataId, storageClass="StructuredDataDataTestTuple")
        self.assertIs(type(data), tuple)
        self.assertEqual(data, tuple(retrieved.data))

        # Parameter on the write storage class should work regardless
        # of read storage class.
        data = butler.get(
            "anything.data",
            dataId,
            storageClass="StructuredDataDataTestTuple",
            parameters={"slice": slice(2, 4)},
        )
        self.assertEqual(len(data), 2)

        # Try a parameter that is known to the read storage class but not
        # the write storage class.
        with self.assertRaises(KeyError):
            butler.get(
                "anything.data",
                dataId,
                storageClass="StructuredDataDataTestTuple",
                parameters={"xslice": slice(2, 4)},
            )
    def testPytypePutCoercion(self) -> None:
        """Test python type coercion on Butler.get and put."""
        # Store some data with the normal example storage class.
        storageClass = self.storageClassFactory.getStorageClass("StructuredDataNoComponents")
        datasetTypeName = "test_metric"
        butler, _ = self.create_butler(self.default_run, storageClass, datasetTypeName)

        dataId = {"instrument": "DummyCamComp", "visit": 423}

        # Put a dict and this should coerce to a MetricsExample
        test_dict = {"summary": {"a": 1}, "output": {"b": 2}}
        metric_ref = butler.put(test_dict, datasetTypeName, dataId=dataId, visit=424)
        test_metric = butler.get(metric_ref)
        self.assertEqual(get_full_type_name(test_metric), "lsst.daf.butler.tests.MetricsExample")
        self.assertEqual(test_metric.summary, test_dict["summary"])
        self.assertEqual(test_metric.output, test_dict["output"])

        # Check that the put still works if a DatasetType is given with
        # a definition matching this python type.
        registry_type = butler.get_dataset_type(datasetTypeName)
        this_type = DatasetType(datasetTypeName, registry_type.dimensions, "StructuredDataDictJson")
        metric2_ref = butler.put(test_dict, this_type, dataId=dataId, visit=425)
        # The returned ref carries the registry's definition, not the
        # compatible one supplied by the caller.
        self.assertEqual(metric2_ref.datasetType, registry_type)

        # The get will return the type expected by registry.
        test_metric2 = butler.get(metric2_ref)
        self.assertEqual(get_full_type_name(test_metric2), "lsst.daf.butler.tests.MetricsExample")

        # Make a new DatasetRef with the compatible but different DatasetType.
        # This should now return a dict.
        new_ref = DatasetRef(this_type, metric2_ref.dataId, id=metric2_ref.id, run=metric2_ref.run)
        test_dict2 = butler.get(new_ref)
        self.assertEqual(get_full_type_name(test_dict2), "dict")

        # Get it again with the compatible-but-different DatasetType passed
        # directly to get() (rather than via a DatasetRef). This should be
        # consistent with the DatasetRef behavior above and return the type
        # of the supplied DatasetType.
        test_dict3 = butler.get(this_type, dataId=dataId, visit=425)
        self.assertEqual(get_full_type_name(test_dict3), "dict")
1026 def test_ingest_zip(self) -> None:
1027 """Create butler, export data, delete data, import from Zip."""
1028 butler, dataset_type = self.create_butler(
1029 run=self.default_run, storageClass="StructuredData", datasetTypeName="metrics"
1030 )
1032 metric = makeExampleMetrics()
1033 refs = []
1034 for visit in (423, 424, 425):
1035 ref = butler.put(metric, dataset_type, instrument="DummyCamComp", visit=visit)
1036 refs.append(ref)
1038 # Retrieve a Zip file.
1039 with tempfile.TemporaryDirectory(ignore_cleanup_errors=True) as tmpdir:
1040 zip = butler.retrieve_artifacts_zip(refs, destination=tmpdir)
1042 # Ingest will fail.
1043 with self.assertRaises(ConflictingDefinitionError):
1044 butler.ingest_zip(zip)
1046 # Clear out the collection.
1047 butler.removeRuns([self.default_run])
1048 self.assertFalse(butler.exists(refs[0]))
1050 butler.ingest_zip(zip, transfer="copy")
1051 self.assertGreater(butler._metrics.time_in_ingest, 0.0)
1052 self.assertEqual(butler._metrics.n_ingest, len(refs))
1054 # Check that it fails if we try it again.
1055 with self.assertRaises(ConflictingDefinitionError):
1056 butler.ingest_zip(zip, transfer="copy")
1058 # This will be a no-op.
1059 butler.ingest_zip(zip, transfer="copy", skip_existing=True)
1061 # Create an entirely new local file butler in this temp directory.
1062 new_butler_cfg = Butler.makeRepo(tmpdir)
1063 new_butler = Butler.from_config(new_butler_cfg, writeable=True)
1064 self.enterContext(new_butler)
1066 # This will fail since dimensions records are missing.
1067 with self.assertRaises(ConflictingDefinitionError):
1068 new_butler.ingest_zip(zip, transfer="copy")
1070 # Dry run should work.
1071 new_butler.ingest_zip(zip, transfer="copy", dry_run=True)
1073 new_butler.ingest_zip(zip, transfer="copy", transfer_dimensions=True)
1074 self.assertTrue(butler.exists(refs[0]))
1076 # Check that the refs can be read again.
1077 _ = [butler.get(ref) for ref in refs]
1079 uri = butler.getURI(refs[2])
1080 self.assertTrue(uri.exists())
1082 # Delete one dataset. The Zip file should still exist and allow
1083 # remaining refs to be read.
1084 butler.pruneDatasets([refs[0]], purge=True, unstore=True)
1085 self.assertTrue(uri.exists())
1087 metric2 = butler.get(refs[1])
1088 self.assertEqual(metric2, metric, msg=f"{metric2} != {metric}")
1090 butler.removeRuns([self.default_run])
1091 self.assertFalse(uri.exists())
1092 self.assertFalse(butler.exists(refs[-1]))
1094 with self.assertRaises(ValueError):
1095 butler.retrieve_artifacts_zip([], destination=".")
    def testIngest(self) -> None:
        """Test ingest of externally-created files.

        Covers single-dataset-per-file ingest (including automatic creation
        of a missing run collection), conflict and ``skip_existing``
        behavior, multi-dataset single-file ingest with a shared formatter,
        "move" transfer semantics, re-ingest in "execution butler" mode, and
        ingest of an empty dataset list.
        """
        butler = self.create_empty_butler(run=self.default_run)

        # Create and register a DatasetType
        dimensions = butler.dimensions.conform(["instrument", "visit", "detector"])

        storageClass = self.storageClassFactory.getStorageClass("StructuredDataDictYaml")
        datasetTypeName = "metric"

        datasetType = self.addDatasetType(datasetTypeName, dimensions, storageClass, butler.registry)

        # Add needed Dimensions
        butler.registry.insertDimensionData("instrument", {"name": "DummyCamComp"})
        butler.registry.insertDimensionData(
            "physical_filter", {"instrument": "DummyCamComp", "name": "d-r", "band": "R"}
        )
        butler.registry.insertDimensionData("day_obs", {"instrument": "DummyCamComp", "id": 20250101})
        for detector in (1, 2):
            butler.registry.insertDimensionData(
                "detector", {"instrument": "DummyCamComp", "id": detector, "full_name": f"detector{detector}"}
            )

        butler.registry.insertDimensionData(
            "visit",
            {
                "instrument": "DummyCamComp",
                "id": 423,
                "name": "fourtwentythree",
                "physical_filter": "d-r",
                "day_obs": 20250101,
            },
            {
                "instrument": "DummyCamComp",
                "id": 424,
                "name": "fourtwentyfour",
                "physical_filter": "d-r",
                "day_obs": 20250101,
            },
        )

        formatter = doImportType("lsst.daf.butler.formatters.yaml.YamlFormatter")
        dataRoot = os.path.join(TESTDIR, "data", "basic")
        datasets = []
        # Test one DatasetRef with a run that exists, and the other with a run
        # that doesn't exist, to verify that run collections are created when
        # required.
        runs = {1: self.default_run, 2: "a/new/run"}
        for detector in (1, 2):
            detector_name = f"detector_{detector}"
            metricFile = os.path.join(dataRoot, f"{detector_name}.yaml")
            dataId = butler.registry.expandDataId(
                {"instrument": "DummyCamComp", "visit": 423, "detector": detector}
            )
            # Create a DatasetRef for ingest
            refIn = DatasetRef(datasetType, dataId, run=runs[detector])

            datasets.append(FileDataset(path=metricFile, refs=[refIn], formatter=formatter))

        butler.ingest(*datasets, transfer="copy")

        dataId1 = {"instrument": "DummyCamComp", "detector": 1, "visit": 423}
        dataId2 = {"instrument": "DummyCamComp", "detector": 2, "visit": 423}

        metrics1 = butler.get(datasetTypeName, dataId1)
        metrics2 = butler.get(datasetTypeName, dataId2, collections="a/new/run")
        self.assertNotEqual(metrics1, metrics2)

        # Compare URIs
        uri1 = butler.getURI(datasetTypeName, dataId1)
        uri2 = butler.getURI(datasetTypeName, dataId2, collections="a/new/run")
        self.assertFalse(self.are_uris_equivalent(uri1, uri2), f"Cf. {uri1} with {uri2}")

        # Re-ingesting the same datasets raises an error with
        # skip_existing=False.
        with self.assertRaises(ConflictingDefinitionError):
            butler.ingest(*datasets, transfer="copy")
        # skip_existing=True makes it a no-op to re-ingest the same datasets.
        butler.ingest(*datasets, transfer="copy", skip_existing=True)

        # Now do a multi-dataset but single file ingest
        metricFile = os.path.join(dataRoot, "detectors.yaml")
        refs = []
        for detector in (1, 2):
            detector_name = f"detector_{detector}"
            dataId = butler.registry.expandDataId(
                {"instrument": "DummyCamComp", "visit": 424, "detector": detector}
            )
            # Create a DatasetRef for ingest
            refs.append(DatasetRef(datasetType, dataId, run=self.default_run))

        # Test "move" transfer to ensure that the files themselves
        # have disappeared following ingest.
        with ResourcePath.temporary_uri(suffix=".yaml") as tempFile:
            tempFile.transfer_from(ResourcePath(metricFile), transfer="copy")

            datasets = []
            datasets.append(FileDataset(path=tempFile, refs=refs, formatter=MultiDetectorFormatter))

            # For first ingest use copy.
            butler.ingest(*datasets, transfer="copy", record_validation_info=False)

            # Now try to ingest again in "execution butler" mode where
            # the registry entries exist but the datastore does not have
            # the files. We also need to strip the dimension records to ensure
            # that they will be re-added by the ingest.
            ref = datasets[0].refs[0]
            datasets[0].refs = [
                cast(
                    DatasetRef,
                    butler.find_dataset(ref.datasetType, data_id=ref.dataId, collections=ref.run),
                )
                for ref in datasets[0].refs
            ]
            all_refs = []
            for dataset in datasets:
                refs = []
                for ref in dataset.refs:
                    # Create a dict from the dataId to drop the records.
                    new_data_id = dict(ref.dataId.required)
                    new_ref = butler.find_dataset(ref.datasetType, new_data_id, collections=ref.run)
                    assert new_ref is not None
                    self.assertFalse(new_ref.dataId.hasRecords())
                    refs.append(new_ref)
                dataset.refs = refs
                all_refs.extend(dataset.refs)
            # Remove the files from the datastore but keep registry entries.
            butler.pruneDatasets(all_refs, disassociate=False, unstore=True, purge=False)

            # Use move mode to test that the file is deleted. Also
            # disable recording of file size.
            butler.ingest(*datasets, transfer="move", record_validation_info=False)

            # Check that every ref now has records.
            for dataset in datasets:
                for ref in dataset.refs:
                    self.assertTrue(ref.dataId.hasRecords())

            # Ensure that the file has disappeared.
            self.assertFalse(tempFile.exists())

        # Check that the datastore recorded no file size.
        # Not all datastores can support this.
        try:
            infos = butler._datastore.getStoredItemsInfo(datasets[0].refs[0])  # type: ignore[attr-defined]
            self.assertEqual(infos[0].file_size, -1)
        except AttributeError:
            pass

        dataId1 = {"instrument": "DummyCamComp", "detector": 1, "visit": 424}
        dataId2 = {"instrument": "DummyCamComp", "detector": 2, "visit": 424}

        multi1 = butler.get(datasetTypeName, dataId1)
        multi2 = butler.get(datasetTypeName, dataId2)

        self.assertEqual(multi1, metrics1)
        self.assertEqual(multi2, metrics2)

        # Compare URIs
        uri1 = butler.getURI(datasetTypeName, dataId1)
        uri2 = butler.getURI(datasetTypeName, dataId2)
        self.assertTrue(self.are_uris_equivalent(uri1, uri2), f"Cf. {uri1} with {uri2}")

        # Test that removing one does not break the second
        # This line will issue a warning log message for a ChainedDatastore
        # that uses an InMemoryDatastore since in-memory can not ingest
        # files.
        butler.pruneDatasets([datasets[0].refs[0]], unstore=True, disassociate=False)
        self.assertFalse(butler.exists(datasetTypeName, dataId1))
        self.assertTrue(butler.exists(datasetTypeName, dataId2))
        multi2b = butler.get(datasetTypeName, dataId2)
        self.assertEqual(multi2, multi2b)

        # Ensure we can ingest 0 datasets
        datasets = []
        butler.ingest(*datasets)
1272 def testPickle(self) -> None:
1273 """Test pickle support."""
1274 butler = self.create_empty_butler(run=self.default_run)
1275 assert isinstance(butler, DirectButler), "Expect DirectButler in configuration"
1276 butlerOut = pickle.loads(pickle.dumps(butler))
1277 self.enterContext(butlerOut)
1278 self.assertIsInstance(butlerOut, Butler)
1279 self.assertEqual(butlerOut._config, butler._config)
1280 self.assertEqual(list(butlerOut.collections.defaults), list(butler.collections.defaults))
1281 self.assertEqual(butlerOut.run, butler.run)
    def testGetDatasetTypes(self) -> None:
        """Test dataset type registration and querying.

        Covers component dataset-type expansion, glob (but not regex)
        queries, and repository configuration validation including the
        ``ignore`` and ``datasetTypeNames`` options.
        """
        butler = self.create_empty_butler(run=self.default_run)
        dimensions = butler.dimensions.conform(["instrument", "visit", "physical_filter"])
        dimensionEntries: list[tuple[str, list[Mapping[str, Any]]]] = [
            (
                "instrument",
                [
                    {"instrument": "DummyCam"},
                    {"instrument": "DummyHSC"},
                    {"instrument": "DummyCamComp"},
                ],
            ),
            ("physical_filter", [{"instrument": "DummyCam", "name": "d-r", "band": "R"}]),
            ("day_obs", [{"instrument": "DummyCam", "id": 20250101}]),
            (
                "visit",
                [
                    {
                        "instrument": "DummyCam",
                        "id": 42,
                        "name": "fortytwo",
                        "physical_filter": "d-r",
                        "day_obs": 20250101,
                    }
                ],
            ),
        ]
        storageClass = self.storageClassFactory.getStorageClass("StructuredData")
        # Add needed Dimensions
        for element, data in dimensionEntries:
            butler.registry.insertDimensionData(element, *data)

        # When a DatasetType is added to the registry entries are not created
        # for components but querying them can return the components.
        datasetTypeNames = {"metric", "metric2", "metric4", "metric33", "pvi", "paramtest"}
        components = set()
        for datasetTypeName in datasetTypeNames:
            # Create and register a DatasetType
            self.addDatasetType(datasetTypeName, dimensions, storageClass, butler.registry)

            # Track the expected component names for later comparison.
            for componentName in storageClass.components:
                components.add(DatasetType.nameWithComponent(datasetTypeName, componentName))

        fromRegistry: set[DatasetType] = set()
        for parent_dataset_type in butler.registry.queryDatasetTypes():
            fromRegistry.add(parent_dataset_type)
            fromRegistry.update(parent_dataset_type.makeAllComponentDatasetTypes())
        self.assertEqual({d.name for d in fromRegistry}, datasetTypeNames | components)

        # Query with wildcard.
        dataset_types = butler.registry.queryDatasetTypes("metric*")
        # Expect metric, metric2, metric4 and metric33 to match.
        self.assertEqual(len(dataset_types), 4, f"Got: {dataset_types}")
        # but not regex.
        with self.assertRaises(DatasetTypeExpressionError):
            butler.registry.queryDatasetTypes(["pvi", re.compile("metric.*")])

        # Now that we have some dataset types registered, validate them
        butler.validateConfiguration(
            ignore=[
                "test_metric_comp",
                "metric3",
                "metric5",
                "calexp",
                "DummySC",
                "datasetType.component",
                "random_data",
                "random_data_2",
            ]
        )

        # Add a new datasetType that will fail template validation
        self.addDatasetType("test_metric_comp", dimensions, storageClass, butler.registry)
        if self.validationCanFail:
            with self.assertRaises(ValidationError):
                butler.validateConfiguration()

        # Rerun validation but with a subset of dataset type names
        butler.validateConfiguration(datasetTypeNames=["metric4"])

        # Rerun validation but ignore the bad datasetType
        butler.validateConfiguration(
            ignore=[
                "test_metric_comp",
                "metric3",
                "metric5",
                "calexp",
                "DummySC",
                "datasetType.component",
                "random_data",
                "random_data_2",
            ]
        )
    def testTransaction(self) -> None:
        """Test that a failing transaction rolls back registry and datastore
        changes together: dimension records, dataset entries and stored
        files inserted inside the transaction must all disappear.
        """
        butler = self.create_empty_butler(run=self.default_run)
        datasetTypeName = "test_metric"
        dimensions = butler.dimensions.conform(["instrument", "visit"])
        dimensionEntries: tuple[tuple[str, Mapping[str, Any]], ...] = (
            ("instrument", {"instrument": "DummyCam"}),
            ("physical_filter", {"instrument": "DummyCam", "name": "d-r", "band": "R"}),
            ("day_obs", {"instrument": "DummyCam", "id": 20250101}),
            (
                "visit",
                {
                    "instrument": "DummyCam",
                    "id": 42,
                    "name": "fortytwo",
                    "physical_filter": "d-r",
                    "day_obs": 20250101,
                },
            ),
        )
        storageClass = self.storageClassFactory.getStorageClass("StructuredData")
        metric = makeExampleMetrics()
        dataId = {"instrument": "DummyCam", "visit": 42}
        # Create and register a DatasetType
        datasetType = self.addDatasetType(datasetTypeName, dimensions, storageClass, butler.registry)
        with self.assertRaises(TransactionTestError):
            with butler.transaction():
                # Add needed Dimensions
                for args in dimensionEntries:
                    butler.registry.insertDimensionData(*args)
                # Store a dataset
                ref = butler.put(metric, datasetTypeName, dataId)
                self.assertIsInstance(ref, DatasetRef)
                # Test get of a ref.
                metricOut = butler.get(ref)
                self.assertEqual(metric, metricOut)
                # Test get
                metricOut = butler.get(datasetTypeName, dataId)
                self.assertEqual(metric, metricOut)
                # Check we can get components
                self.assertGetComponents(butler, ref, ("summary", "data", "output"), metric)
                raise TransactionTestError("This should roll back the entire transaction")
        # After rollback, the dimension records inserted above must be gone.
        with self.assertRaises(DataIdValueError, msg=f"Check can't expand DataId {dataId}"):
            butler.registry.expandDataId(dataId)
        # Should raise LookupError for missing data ID value
        with self.assertRaises(LookupError, msg=f"Check can't get by {datasetTypeName} and {dataId}"):
            butler.get(datasetTypeName, dataId)
        # Also check explicitly if Dataset entry is missing
        self.assertIsNone(butler.find_dataset(datasetType, dataId, collections=butler.collections.defaults))
        # Direct retrieval should not find the file in the Datastore
        with self.assertRaises(FileNotFoundError, msg=f"Check {ref} can't be retrieved directly"):
            butler.get(ref)
    def testMakeRepo(self) -> None:
        """Test that we can write butler configuration to a new repository via
        the Butler.makeRepo interface and then instantiate a butler from the
        repo root.

        Compares a repo created with default (minimal) config against one
        created with ``standalone=True`` (fully expanded config).
        """
        # Do not run the test if we know this datastore configuration does
        # not support a file system root
        if self.fullConfigKey is None:
            return

        # create two separate directories
        root1 = tempfile.mkdtemp(dir=self.root)
        root2 = tempfile.mkdtemp(dir=self.root)

        butlerConfig = Butler.makeRepo(root1, config=Config(self.configFile))
        limited = Config(self.configFile)
        butler1 = Butler.from_config(butlerConfig)
        self.enterContext(butler1)
        assert isinstance(butler1, DirectButler), "Expect DirectButler in configuration"
        # standalone=True writes a fully-expanded configuration file.
        butlerConfig = Butler.makeRepo(root2, standalone=True, config=Config(self.configFile))
        full = Config(self.tmpConfigFile)
        butler2 = Butler.from_config(butlerConfig)
        self.enterContext(butler2)
        assert isinstance(butler2, DirectButler), "Expect DirectButler in configuration"
        # Butlers should have the same configuration regardless of whether
        # defaults were expanded.
        self.assertEqual(butler1._config, butler2._config)
        # Config files loaded directly should not be the same.
        self.assertNotEqual(limited, full)
        # Make sure "limited" doesn't have a few keys we know it should be
        # inheriting from defaults.
        self.assertIn(self.fullConfigKey, full)
        self.assertNotIn(self.fullConfigKey, limited)

        # Collections don't appear until something is put in them
        collections1 = set(butler1.registry.queryCollections())
        self.assertEqual(collections1, set())
        self.assertEqual(set(butler2.registry.queryCollections()), collections1)

        # Check that a config with no associated file name will not
        # work properly with relocatable Butler repo
        butlerConfig.configFile = None
        with self.assertRaises(ValueError):
            Butler.from_config(butlerConfig)

        # Refusing to overwrite an existing repository must be honored.
        with self.assertRaises(FileExistsError):
            Butler.makeRepo(self.root, standalone=True, config=Config(self.configFile), overwrite=False)
1476 def testStringification(self) -> None:
1477 butler = Butler.from_config(self.tmpConfigFile, run=self.default_run)
1478 self.enterContext(butler)
1479 butlerStr = str(butler)
1481 if self.datastoreStr is not None:
1482 for testStr in self.datastoreStr:
1483 self.assertIn(testStr, butlerStr)
1484 if self.registryStr is not None:
1485 self.assertIn(self.registryStr, butlerStr)
1487 datastoreName = butler._datastore.name
1488 if self.datastoreName is not None:
1489 for testStr in self.datastoreName:
1490 self.assertIn(testStr, datastoreName)
    def testButlerRewriteDataId(self) -> None:
        """Test that dataIds can be rewritten based on dimension records.

        Uses exposure dimension records so that a dataId expressed with
        ``day_obs`` + ``seq_num`` is rewritten to the matching ``exposure``.
        """
        butler = self.create_empty_butler(run=self.default_run)

        storageClass = self.storageClassFactory.getStorageClass("StructuredDataDict")
        datasetTypeName = "random_data"

        # Create dimension records.
        butler.registry.insertDimensionData("instrument", {"name": "DummyCamComp"})
        butler.registry.insertDimensionData(
            "physical_filter", {"instrument": "DummyCamComp", "name": "d-r", "band": "R"}
        )
        butler.registry.insertDimensionData(
            "detector", {"instrument": "DummyCamComp", "id": 1, "full_name": "det1"}
        )

        dimensions = butler.dimensions.conform(["instrument", "exposure"])
        datasetType = DatasetType(datasetTypeName, dimensions, storageClass)
        butler.registry.registerDatasetType(datasetType)

        n_exposures = 5
        dayobs = 20210530

        # Create records for multiple day_obs but same seq_num to test that
        # we are constraining gets properly when day_obs/seq_num is used
        # for an exposure. Second day is year in future but is not used.
        for day_obs in (dayobs, dayobs + 1_00_00):
            butler.registry.insertDimensionData("day_obs", {"instrument": "DummyCamComp", "id": day_obs})

            for i in range(n_exposures):
                group_name = f"group_{day_obs}_{i}"
                butler.registry.insertDimensionData(
                    "group", {"instrument": "DummyCamComp", "name": group_name}
                )
                butler.registry.insertDimensionData(
                    "exposure",
                    {
                        "instrument": "DummyCamComp",
                        "id": day_obs + i,
                        "obs_id": f"exp_{day_obs}_{i}",
                        "seq_num": i,
                        "day_obs": day_obs,
                        "physical_filter": "d-r",
                        "group": group_name,
                    },
                )

        # Write some data.
        for i in range(n_exposures):
            metric = {"something": i, "other": "metric", "list": [2 * x for x in range(i)]}

            # Use the seq_num for the put to test rewriting.
            dataId = {"seq_num": i, "day_obs": dayobs, "instrument": "DummyCamComp", "physical_filter": "d-r"}
            ref = butler.put(metric, datasetTypeName, dataId=dataId)

            # Check that the exposure is correct in the dataId
            self.assertEqual(ref.dataId["exposure"], dayobs + i)

            # and check that we can get the dataset back with the same dataId
            new_metric = butler.get(datasetTypeName, dataId=dataId)
            self.assertEqual(new_metric, metric)

        # Check that we can find the datasets using the day_obs or the
        # exposure.day_obs.
        datasets_1 = list(
            butler.registry.queryDatasets(
                datasetType,
                collections=self.default_run,
                where="day_obs = :dayObs AND instrument = :instr",
                bind={"dayObs": dayobs, "instr": "DummyCamComp"},
            )
        )
        datasets_2 = list(
            butler.registry.queryDatasets(
                datasetType,
                collections=self.default_run,
                where="exposure.day_obs = :dayObs AND instrument = :instr",
                bind={"dayObs": dayobs, "instr": "DummyCamComp"},
            )
        )
        self.assertEqual(datasets_1, datasets_2)
1574 def testGetDatasetCollectionCaching(self):
1575 # Prior to DM-41117, there was a bug where get_dataset would throw
1576 # MissingCollectionError if you tried to fetch a dataset that was added
1577 # after the collection cache was last updated.
1578 reader_butler, datasetType = self.create_butler(self.default_run, "int", "datasettypename")
1579 writer_butler = self.create_empty_butler(writeable=True, run="new_run")
1580 dataId = {"instrument": "DummyCamComp", "visit": 423}
1581 put_ref = writer_butler.put(123, datasetType, dataId)
1582 get_ref = reader_butler.get_dataset(put_ref.id)
1583 self.assertEqual(get_ref.id, put_ref.id)
1584 # Also works when looking up via a hexadecimal string instead of a UUID
1585 # instance.
1586 hex_ref = reader_butler.get_dataset(put_ref.id.hex)
1587 self.assertEqual(hex_ref.id, put_ref.id)
1589 def testCollectionChainRedefine(self):
1590 butler = self._setup_to_test_collection_chain()
1592 butler.collections.redefine_chain("chain", "a")
1593 self._check_chain(butler, ["a"])
1595 # Duplicates are removed from the list of children
1596 butler.collections.redefine_chain("chain", ["c", "b", "c"])
1597 self._check_chain(butler, ["c", "b"])
1599 # Empty list clears the chain
1600 butler.collections.redefine_chain("chain", [])
1601 self._check_chain(butler, [])
1603 self._test_common_chain_functionality(butler, butler.collections.redefine_chain)
1605 def testCollectionChainPrepend(self):
1606 butler = self._setup_to_test_collection_chain()
1608 # Duplicates are removed from the list of children
1609 butler.collections.prepend_chain("chain", ["c", "b", "c"])
1610 self._check_chain(butler, ["c", "b"])
1612 # Prepend goes on the front of existing chain
1613 butler.collections.prepend_chain("chain", ["a"])
1614 self._check_chain(butler, ["a", "c", "b"])
1616 # Empty prepend does nothing
1617 butler.collections.prepend_chain("chain", [])
1618 self._check_chain(butler, ["a", "c", "b"])
1620 # Prepending children that already exist in the chain removes them from
1621 # their current position.
1622 butler.collections.prepend_chain("chain", ["d", "b", "c"])
1623 self._check_chain(butler, ["d", "b", "c", "a"])
1625 self._test_common_chain_functionality(butler, butler.collections.prepend_chain)
    def testCollectionChainExtend(self):
        """Test that extend_chain appends children at the end of a chain."""
        butler = self._setup_to_test_collection_chain()

        # Duplicates are removed from the list of children
        butler.collections.extend_chain("chain", ["c", "b", "c"])
        self._check_chain(butler, ["c", "b"])

        # Extend goes on the end of existing chain
        butler.collections.extend_chain("chain", ["a"])
        self._check_chain(butler, ["c", "b", "a"])

        # Empty extend does nothing
        butler.collections.extend_chain("chain", [])
        self._check_chain(butler, ["c", "b", "a"])

        # Extending children that already exist in the chain removes them from
        # their current position.
        butler.collections.extend_chain("chain", ["d", "b", "c"])
        self._check_chain(butler, ["a", "d", "b", "c"])

        # Shared error-handling checks (missing collections, cycles, etc.)
        self._test_common_chain_functionality(butler, butler.collections.extend_chain)
    def testCollectionChainRemove(self) -> None:
        """Test that remove_from_chain detaches children from a chain."""
        butler = self._setup_to_test_collection_chain()

        butler.collections.redefine_chain("chain", ["a", "b", "c", "d"])

        # A single collection name is accepted as well as a list.
        butler.collections.remove_from_chain("chain", "c")
        self._check_chain(butler, ["a", "b", "d"])

        # Duplicates are allowed in the list of children
        butler.collections.remove_from_chain("chain", ["b", "b", "a"])
        self._check_chain(butler, ["d"])

        # Empty remove does nothing
        butler.collections.remove_from_chain("chain", [])
        self._check_chain(butler, ["d"])

        # Removing children that aren't in the chain does nothing
        butler.collections.remove_from_chain("chain", ["a", "chain"])
        self._check_chain(butler, ["d"])

        # Removal can never create a cycle, so skip that part of the
        # shared checks.
        self._test_common_chain_functionality(
            butler, butler.collections.remove_from_chain, skip_cycle_check=True
        )
1673 def _setup_to_test_collection_chain(self) -> Butler:
1674 butler = self.create_empty_butler(writeable=True)
1676 butler.collections.register("chain", CollectionType.CHAINED)
1678 runs = ["a", "b", "c", "d"]
1679 for run in runs:
1680 butler.collections.register(run)
1682 butler.collections.register("staticchain", CollectionType.CHAINED)
1683 butler.collections.redefine_chain("staticchain", ["a", "b"])
1685 return butler
1687 def _check_chain(self, butler: Butler, expected: list[str]) -> None:
1688 children = butler.collections.get_info("chain").children
1689 self.assertEqual(expected, list(children))
    def _test_common_chain_functionality(
        self, butler: Butler, func: Callable[[str, str | list[str]], Any], *, skip_cycle_check: bool = False
    ) -> None:
        """Exercise error handling common to all chain-modification methods.

        Parameters
        ----------
        butler : `Butler`
            Butler created by `_setup_to_test_collection_chain`.
        func : `Callable`
            Chain-modification method under test; takes a parent chain name
            and a child name or list of child names.
        skip_cycle_check : `bool`, optional
            If `True`, skip the collection-cycle checks (used for removal,
            which cannot introduce cycles).
        """
        # Missing parent collection
        with self.assertRaises(MissingCollectionError):
            func("doesnotexist", [])
        # Missing child collection
        with self.assertRaises(MissingCollectionError):
            func("chain", ["doesnotexist"])
        # Forbid operations on non-chained collections
        with self.assertRaises(CollectionTypeError):
            func("d", ["a"])

        # Prevent collection cycles
        if not skip_cycle_check:
            butler.collections.register("chain2", CollectionType.CHAINED)
            func("chain2", "chain")
            with self.assertRaises(CollectionCycleError):
                func("chain", "chain2")

        # Make sure none of the earlier operations interfered with unrelated
        # chains.
        self.assertEqual(["a", "b"], list(butler.collections.get_info("staticchain").children))

        # Chain modification is rejected while registry caching is active.
        with butler._caching_context():
            with self.assertRaisesRegex(RuntimeError, "Chained collection modification not permitted"):
                func("chain", "a")
1719 def test_transfer_dimension_records_from(self) -> None:
1720 source_butler = self.create_empty_butler(writeable=True)
1721 source_butler.import_(filename=_get_test_data_path("lsstcam-subset.yaml"))
1723 visit_id = 2025120200439
1724 exposure_id = visit_id
1725 target_butler = self.enterContext(create_populated_sqlite_registry())
1726 target_butler.transfer_dimension_records_from(
1727 source_butler,
1728 [
1729 # Should trigger the lookup of visit and all its associated
1730 # "populated_by" records (visit_detector_region,
1731 # visit_definition, etc.)
1732 DataCoordinate.standardize(
1733 {"instrument": "LSSTCam", "visit": visit_id, "detector": 10},
1734 universe=source_butler.dimensions,
1735 ),
1736 # Shouldn't add any records to the lookup.
1737 DataCoordinate.make_empty(source_butler.dimensions),
1738 ],
1739 )
1741 def _fetch_record(dimension: str) -> DimensionRecord:
1742 records = target_butler.query_dimension_records(dimension)
1743 self.assertEqual(len(records), 1)
1744 return records[0]
1746 visit = _fetch_record("visit")
1747 self.assertEqual(visit.id, visit_id)
1748 self.assertEqual(visit.day_obs, 20251202)
1749 self.assertEqual(visit.target_name, "lowdust")
1750 self.assertEqual(visit.seq_num, 439)
1751 original_visit = source_butler.query_dimension_records("visit", instrument="LSSTCam", visit=visit_id)[
1752 0
1753 ]
1754 self.assertEqual(visit.region, original_visit.region)
1755 self.assertEqual(visit.timespan, original_visit.timespan)
1757 visit_detector_region = _fetch_record("visit_detector_region")
1758 self.assertEqual(visit_detector_region.instrument, "LSSTCam")
1759 self.assertEqual(visit_detector_region.detector, 10)
1760 self.assertEqual(visit_detector_region.visit, visit_id)
1761 original_visit_detector_region = source_butler.query_dimension_records(
1762 "visit_detector_region", instrument="LSSTCam", visit=visit_id, detector=10
1763 )[0]
1764 self.assertEqual(visit_detector_region.region, original_visit_detector_region.region)
1766 visit_definition = _fetch_record("visit_definition")
1767 self.assertEqual(visit_definition.instrument, "LSSTCam")
1768 self.assertEqual(visit_definition.exposure, 2025120200439)
1769 self.assertEqual(visit_definition.visit, visit_id)
1771 # The matching exposure record should have been pulled in via
1772 # visit -> visit_definition.
1773 exposure = _fetch_record("exposure")
1774 self.assertEqual(exposure.instrument, "LSSTCam")
1775 self.assertEqual(exposure.id, 2025120200439)
1776 self.assertEqual(exposure.obs_id, "MC_O_20251202_000439")
1777 original_exposure = source_butler.query_dimension_records(
1778 "exposure", instrument="LSSTCam", exposure=exposure_id
1779 )[0]
1780 self.assertEqual(exposure.timespan, original_exposure.timespan)
1782 group = _fetch_record("group")
1783 self.assertEqual(group.instrument, "LSSTCam")
1784 self.assertEqual(group.name, "2025-12-03T07:58:10.858")
1786 visit_system_memberships = target_butler.query_dimension_records("visit_system_membership")
1787 visit_system_memberships.sort(key=lambda record: record.visit_system)
1788 self.assertEqual(len(visit_system_memberships), 2)
1789 self.assertEqual(visit_system_memberships[0].visit_system, 0)
1790 self.assertEqual(visit_system_memberships[1].visit_system, 2)
1791 self.assertEqual(visit_system_memberships[0].visit, visit_id)
1792 self.assertEqual(visit_system_memberships[1].visit, visit_id)
1794 visit_systems = target_butler.query_dimension_records("visit_system")
1795 visit_systems.sort(key=lambda record: record.id)
1796 visit_system_memberships.sort(key=lambda record: record.visit_system)
1797 self.assertEqual(visit_systems[0].id, 0)
1798 self.assertEqual(visit_systems[1].id, 2)
1799 self.assertEqual(visit_systems[0].name, "one-to-one")
1800 self.assertEqual(visit_systems[1].name, "by-seq-start-end")
class FileDatastoreButlerTests(ButlerTests):
    """Common tests and specialization of ButlerTests for butlers backed
    by datastores that inherit from FileDatastore.
    """

    # Subclasses whose datastore cannot operate without datastore records
    # ("trust" mode) should override this to False.
    trustModeSupported = True

    def checkFileExists(self, root: str | ResourcePath, relpath: str | ResourcePath) -> bool:
        """Check if file exists at a given path (relative to root).

        Test testPutTemplates verifies actual physical existence of the files
        in the requested location.
        """
        uri = ResourcePath(root, forceDirectory=True)
        return uri.join(relpath).exists()
    def testPutTemplates(self) -> None:
        """Test that file templates control the paths written by put and
        that bad templates are caught by validation.
        """
        storageClass = self.storageClassFactory.getStorageClass("StructuredDataNoComponents")
        butler = self.create_empty_butler(run=self.default_run)

        # Add needed Dimensions
        butler.registry.insertDimensionData("instrument", {"name": "DummyCamComp"})
        butler.registry.insertDimensionData(
            "physical_filter", {"instrument": "DummyCamComp", "name": "d-r", "band": "R"}
        )
        butler.registry.insertDimensionData("day_obs", {"instrument": "DummyCamComp", "id": 20250101})
        butler.registry.insertDimensionData(
            "visit",
            {
                "instrument": "DummyCamComp",
                "id": 423,
                "name": "v423",
                "physical_filter": "d-r",
                "day_obs": 20250101,
            },
        )
        butler.registry.insertDimensionData(
            "visit",
            {
                "instrument": "DummyCamComp",
                "id": 425,
                "name": "v425",
                "physical_filter": "d-r",
                "day_obs": 20250101,
            },
        )

        # Create and store a dataset
        metric = makeExampleMetrics()

        # Create two almost-identical DatasetTypes (both will use default
        # template)
        dimensions = butler.dimensions.conform(["instrument", "visit"])
        butler.registry.registerDatasetType(DatasetType("metric1", dimensions, storageClass))
        butler.registry.registerDatasetType(DatasetType("metric2", dimensions, storageClass))
        butler.registry.registerDatasetType(DatasetType("metric3", dimensions, storageClass))

        dataId1 = {"instrument": "DummyCamComp", "visit": 423}
        dataId2 = {"instrument": "DummyCamComp", "visit": 423, "physical_filter": "d-r"}

        # Put with exactly the data ID keys needed
        ref = butler.put(metric, "metric1", dataId1)
        uri = butler.getURI(ref)
        self.assertTrue(uri.exists())
        # NOTE(review): the "??#?" path element presumably comes from the
        # configured file template for metric1 (checking that special
        # characters survive unquoted) -- confirm against the test config.
        self.assertTrue(
            uri.unquoted_path.endswith(f"{self.default_run}/metric1/??#?/d-r/DummyCamComp_423.pickle")
        )

        # Check the template based on dimensions
        if hasattr(butler._datastore, "templates"):
            butler._datastore.templates.validateTemplates([ref])

        # Put with extra data ID keys (physical_filter is an optional
        # dependency); should not change template (at least the way we're
        # defining them to behave now; the important thing is that they
        # must be consistent).
        ref = butler.put(metric, "metric2", dataId2)
        uri = butler.getURI(ref)
        self.assertTrue(uri.exists())
        self.assertTrue(
            uri.unquoted_path.endswith(f"{self.default_run}/metric2/d-r/DummyCamComp_v423.pickle")
        )

        # Check the template based on dimensions
        if hasattr(butler._datastore, "templates"):
            butler._datastore.templates.validateTemplates([ref])

        # Use a template that has a typo in dimension record metadata.
        # Easier to test with a butler that has a ref with records attached.
        template = FileTemplate("a/{visit.name}/{id}_{visit.namex:?}.fits")
        with self.assertLogs("lsst.daf.butler.datastore.file_templates", "INFO"):
            path = template.format(ref)
        self.assertEqual(path, f"a/v423/{ref.id}_fits")

        # Without the "?" marker the unknown metadata field is an error.
        template = FileTemplate("a/{visit.name}/{id}_{visit.namex}.fits")
        with self.assertRaises(KeyError):
            with self.assertLogs("lsst.daf.butler.datastore.file_templates", "INFO"):
                template.format(ref)

        # Now use a file template that will not result in unique filenames
        with self.assertRaises(FileTemplateValidationError):
            butler.put(metric, "metric3", dataId1)
1906 def testImportExport(self) -> None:
1907 # Run put/get tests just to create and populate a repo.
1908 storageClass = self.storageClassFactory.getStorageClass("StructuredDataNoComponents")
1909 self.runImportExportTest(storageClass)
1911 @unittest.expectedFailure
1912 def testImportExportVirtualComposite(self) -> None:
1913 # Run put/get tests just to create and populate a repo.
1914 storageClass = self.storageClassFactory.getStorageClass("StructuredComposite")
1915 self.runImportExportTest(storageClass)
    def runImportExportTest(self, storageClass: StorageClass) -> None:
        """Test exporting and importing.

        This test does an export to a temp directory and an import back
        into a new temp directory repo. It does not assume a posix datastore.

        Parameters
        ----------
        storageClass : `StorageClass`
            Storage class used for the datasets created by the put/get test
            that populates the exporting repository.
        """
        exportButler = self.runPutGetTest(storageClass, "test_metric")

        # Test that we must have a file extension.
        with self.assertRaises(ValueError):
            with exportButler.export(filename="dump", directory=".") as export:
                pass

        # Test that unknown format is not allowed.
        with self.assertRaises(ValueError):
            with exportButler.export(filename="dump.fits", directory=".") as export:
                pass

        # Test that the repo actually has at least one dataset.
        datasets = list(exportButler.registry.queryDatasets(..., collections=...))
        self.assertGreater(len(datasets), 0)
        # Add a DimensionRecord that's unused by those datasets.
        skymapRecord = {"name": "example_skymap", "hash": (50).to_bytes(8, byteorder="little")}
        exportButler.registry.insertDimensionData("skymap", skymapRecord)
        # Export and then import datasets.
        with safeTestTempDir(TESTDIR) as exportDir:
            exportFile = os.path.join(exportDir, "exports.yaml")
            with exportButler.export(filename=exportFile, directory=exportDir, transfer="auto") as export:
                export.saveDatasets(datasets)
                # Export the same datasets again. This should quietly do
                # nothing because of internal deduplication, and it shouldn't
                # complain about being asked to export the "htm7" elements even
                # though there aren't any in these datasets or in the database.
                export.saveDatasets(datasets, elements=["htm7"])
                # Save one of the data IDs again; this should be harmless
                # because of internal deduplication.
                export.saveDataIds([datasets[0].dataId])
                # Save some dimension records directly.
                export.saveDimensionData("skymap", [skymapRecord])
            self.assertTrue(os.path.exists(exportFile))
            with safeTestTempDir(TESTDIR) as importDir:
                # We always want this to be a local posix butler
                Butler.makeRepo(importDir, config=Config(os.path.join(TESTDIR, "config/basic/butler.yaml")))
                # Calling script.butlerImport tests the implementation of the
                # butler command line interface "import" subcommand. Functions
                # in the script folder are generally considered protected and
                # should not be used as public api.
                with open(exportFile) as f:
                    script.butlerImport(
                        importDir,
                        export_file=f,
                        directory=exportDir,
                        transfer="auto",
                        skip_dimensions=None,
                    )
                importButler = Butler.from_config(importDir, run=self.default_run)
                self.enterContext(importButler)
                for ref in datasets:
                    with self.subTest(ref=repr(ref)):
                        # Test for existence by passing in the DatasetType and
                        # data ID separately, to avoid lookup by dataset_id.
                        self.assertTrue(importButler.exists(ref.datasetType, ref.dataId))
                # The manually-saved dimension record must also round-trip.
                self.assertEqual(
                    list(importButler.registry.queryDimensionRecords("skymap")),
                    [importButler.dimensions["skymap"].RecordClass(**skymapRecord)],
                )
1984 def testRemoveRuns(self) -> None:
1985 storageClass = self.storageClassFactory.getStorageClass("StructuredDataNoComponents")
1986 butler = self.create_empty_butler(writeable=True)
1987 # Load registry data with dimensions to hang datasets off of.
1988 butler.import_(filename=ResourcePath("resource://lsst.daf.butler/tests/registry_data/base.yaml"))
1989 # Add some RUN-type collection.
1990 run1 = "run1"
1991 butler.collections.register(run1)
1992 run2 = "run2"
1993 butler.collections.register(run2)
1994 # put a dataset in each
1995 metric = makeExampleMetrics()
1996 dimensions = butler.dimensions.conform(["instrument", "physical_filter"])
1997 datasetType = self.addDatasetType(
1998 "prune_collections_test_dataset", dimensions, storageClass, butler.registry
1999 )
2000 ref1 = butler.put(metric, datasetType, {"instrument": "Cam1", "physical_filter": "Cam1-G"}, run=run1)
2001 ref2 = butler.put(metric, datasetType, {"instrument": "Cam1", "physical_filter": "Cam1-G"}, run=run2)
2002 uri1 = butler.getURI(ref1)
2003 uri2 = butler.getURI(ref2)
2005 # Put one of the runs in a chain.
2006 butler.collections.register("Chain", CollectionType.CHAINED)
2007 butler.collections.extend_chain("Chain", run1)
2009 with self.assertRaises(OrphanedRecordError):
2010 butler.registry.removeDatasetType(datasetType.name)
2012 # Remove a non-run.
2013 with self.assertRaises(TypeError):
2014 butler.removeRuns(["Chain"])
2016 # Remove without unlinking from chain should fail.
2017 with self.assertRaises(IntegrityError):
2018 butler.removeRuns([run1])
2020 # Remove from both runs. No longer use unstore parameter since it
2021 # always purges.
2022 butler.removeRuns([run1, run2], unlink_from_chains=True)
2024 # Should be nothing in registry for either one, and datastore should
2025 # not think either exists.
2026 with self.assertRaises(MissingCollectionError):
2027 butler.collections.get_info(run1)
2028 with self.assertRaises(MissingCollectionError):
2029 butler.collections.get_info(run1)
2030 self.assertFalse(butler.stored(ref1))
2031 self.assertFalse(butler.stored(ref2))
2032 # We always unstore so both URIs should be gone.
2033 self.assertFalse(uri1.exists())
2034 self.assertFalse(uri2.exists())
2036 # Now that the collections have been pruned we can remove the
2037 # dataset type
2038 butler.registry.removeDatasetType(datasetType.name)
2040 with self.assertLogs("lsst.daf.butler.registry", "INFO") as cm:
2041 butler.registry.removeDatasetType(("test*", "test*"))
2042 self.assertIn("not defined", "\n".join(cm.output))
2044 def remove_dataset_out_of_band(self, butler: Butler, ref: DatasetRef) -> None:
2045 """Simulate an external actor removing a file outside of Butler's
2046 knowledge.
2048 Subclasses may override to handle more complicated datastore
2049 configurations.
2050 """
2051 uri = butler.getURI(ref)
2052 uri.remove()
2053 datastore = cast(FileDatastore, butler._datastore)
2054 datastore.cacheManager.remove_from_cache(ref)
2056 def testPruneDatasets(self) -> None:
2057 storageClass = self.storageClassFactory.getStorageClass("StructuredDataNoComponents")
2058 butler = self.create_empty_butler(writeable=True)
2059 # Load registry data with dimensions to hang datasets off of.
2060 butler.import_(filename=_get_test_data_path("base.yaml"))
2061 # Add some RUN-type collections.
2062 run1 = "run1"
2063 butler.collections.register(run1)
2064 run2 = "run2"
2065 butler.collections.register(run2)
2066 # put some datasets. ref1 and ref2 have the same data ID, and are in
2067 # different runs. ref3 has a different data ID.
2068 metric = makeExampleMetrics()
2069 dimensions = butler.dimensions.conform(["instrument", "physical_filter"])
2070 datasetType = self.addDatasetType(
2071 "prune_collections_test_dataset", dimensions, storageClass, butler.registry
2072 )
2073 ref1 = butler.put(metric, datasetType, {"instrument": "Cam1", "physical_filter": "Cam1-G"}, run=run1)
2074 ref2 = butler.put(metric, datasetType, {"instrument": "Cam1", "physical_filter": "Cam1-G"}, run=run2)
2075 ref3 = butler.put(metric, datasetType, {"instrument": "Cam1", "physical_filter": "Cam1-R1"}, run=run1)
2077 many_stored = butler.stored_many([ref1, ref2, ref3])
2078 for ref, stored in many_stored.items():
2079 self.assertTrue(stored, f"Ref {ref} should be stored")
2081 many_exists = butler._exists_many([ref1, ref2, ref3])
2082 for ref, exists in many_exists.items():
2083 self.assertTrue(exists, f"Checking ref {ref} exists.")
2084 self.assertEqual(exists, DatasetExistence.VERIFIED, f"Ref {ref} should be stored")
2086 # Simple prune.
2087 butler.pruneDatasets([ref1, ref2, ref3], purge=True, unstore=True)
2088 self.assertFalse(butler.exists(ref1.datasetType, ref1.dataId, collections=run1))
2090 many_stored = butler.stored_many([ref1, ref2, ref3])
2091 for ref, stored in many_stored.items():
2092 self.assertFalse(stored, f"Ref {ref} should not be stored")
2094 many_exists = butler._exists_many([ref1, ref2, ref3])
2095 for ref, exists in many_exists.items():
2096 self.assertEqual(exists, DatasetExistence.UNRECOGNIZED, f"Ref {ref} should not be stored")
2098 # Put data back.
2099 ref1_new = butler.put(metric, ref1)
2100 self.assertEqual(ref1_new, ref1) # Reuses original ID.
2101 ref2 = butler.put(metric, ref2)
2103 many_stored = butler.stored_many([ref1, ref2, ref3])
2104 self.assertTrue(many_stored[ref1])
2105 self.assertTrue(many_stored[ref2])
2106 self.assertFalse(many_stored[ref3])
2108 ref3 = butler.put(metric, ref3)
2110 many_exists = butler._exists_many([ref1, ref2, ref3])
2111 for ref, exists in many_exists.items():
2112 self.assertTrue(exists, f"Ref {ref} should not be stored")
2114 # Clear out the datasets from registry and start again.
2115 refs = [ref1, ref2, ref3]
2116 butler.pruneDatasets(refs, purge=True, unstore=True)
2117 for ref in refs:
2118 butler.put(metric, ref)
2120 # Confirm we can retrieve deferred.
2121 dref1 = butler.getDeferred(ref1) # known and exists
2122 metric1 = dref1.get()
2123 self.assertEqual(metric1, metric)
2125 # Test different forms of file availability.
2126 # Need to be in a state where:
2127 # - one ref just has registry record.
2128 # - one ref has a missing file but a datastore record.
2129 # - one ref has a missing datastore record but file is there.
2130 # - one ref does not exist anywhere.
2131 # Do not need to test a ref that has everything since that is tested
2132 # above.
2133 ref0 = DatasetRef(
2134 datasetType,
2135 DataCoordinate.standardize(
2136 {"instrument": "Cam1", "physical_filter": "Cam1-G"}, universe=butler.dimensions
2137 ),
2138 run=run1,
2139 )
2141 # Delete from datastore and retain in Registry.
2142 butler.pruneDatasets([ref1], purge=False, unstore=True, disassociate=False)
2144 # File has been removed.
2145 self.remove_dataset_out_of_band(butler, ref2)
2147 # Datastore has lost track.
2148 butler._datastore.forget([ref3])
2150 # First test with a standard butler.
2151 exists_many = butler._exists_many([ref0, ref1, ref2, ref3], full_check=True)
2152 self.assertEqual(exists_many[ref0], DatasetExistence.UNRECOGNIZED)
2153 self.assertEqual(exists_many[ref1], DatasetExistence.RECORDED)
2154 self.assertEqual(exists_many[ref2], DatasetExistence.RECORDED | DatasetExistence.DATASTORE)
2155 self.assertEqual(exists_many[ref3], DatasetExistence.RECORDED)
2157 exists_many = butler._exists_many([ref0, ref1, ref2, ref3], full_check=False)
2158 self.assertEqual(exists_many[ref0], DatasetExistence.UNRECOGNIZED)
2159 self.assertEqual(exists_many[ref1], DatasetExistence.RECORDED | DatasetExistence._ASSUMED)
2160 self.assertEqual(exists_many[ref2], DatasetExistence.KNOWN)
2161 self.assertEqual(exists_many[ref3], DatasetExistence.RECORDED | DatasetExistence._ASSUMED)
2162 self.assertTrue(exists_many[ref2])
2164 # Check that per-ref query gives the same answer as many query.
2165 for ref, exists in exists_many.items():
2166 self.assertEqual(butler.exists(ref, full_check=False), exists)
2168 # Get deferred checks for existence before it allows it to be
2169 # retrieved.
2170 with self.assertRaises(LookupError):
2171 butler.getDeferred(ref3) # not known, file exists
2172 dref2 = butler.getDeferred(ref2) # known but file missing
2173 with self.assertRaises(FileNotFoundError):
2174 dref2.get()
2176 # Test again with a trusting butler.
2177 if self.trustModeSupported:
2178 butler._datastore.trustGetRequest = True
2179 exists_many = butler._exists_many([ref0, ref1, ref2, ref3], full_check=True)
2180 self.assertEqual(exists_many[ref0], DatasetExistence.UNRECOGNIZED)
2181 self.assertEqual(exists_many[ref1], DatasetExistence.RECORDED)
2182 self.assertEqual(exists_many[ref2], DatasetExistence.RECORDED | DatasetExistence.DATASTORE)
2183 self.assertEqual(exists_many[ref3], DatasetExistence.RECORDED | DatasetExistence._ARTIFACT)
2185 # When trusting we can get a deferred dataset handle that is not
2186 # known but does exist.
2187 dref3 = butler.getDeferred(ref3)
2188 metric3 = dref3.get()
2189 self.assertEqual(metric3, metric)
2191 # Check that per-ref query gives the same answer as many query.
2192 for ref, exists in exists_many.items():
2193 self.assertEqual(butler.exists(ref, full_check=True), exists)
2195 # Create a ref that surprisingly has the UUID of an existing ref
2196 # but is not the same.
2197 ref_bad = DatasetRef(datasetType, dataId=ref3.dataId, run=ref3.run, id=ref2.id)
2198 with self.assertRaises(ValueError):
2199 butler.exists(ref_bad)
2201 # Create a ref that has a compatible storage class.
2202 ref_compat = ref2.overrideStorageClass("StructuredDataDict")
2203 exists = butler.exists(ref_compat)
2204 self.assertEqual(exists, exists_many[ref2])
2206 # Remove everything and start from scratch.
2207 butler._datastore.trustGetRequest = False
2208 butler.pruneDatasets(refs, purge=True, unstore=True)
2209 for ref in refs:
2210 butler.put(metric, ref)
2212 # These tests mess directly with the trash table and can leave the
2213 # datastore in an odd state. Do them at the end.
2214 # Check that in normal mode, deleting the record will lead to
2215 # trash not touching the file.
2216 uri1 = butler.getURI(ref1)
2217 butler._datastore.bridge.moveToTrash(
2218 [ref1], transaction=None
2219 ) # Update the dataset_location table
2220 butler._datastore.forget([ref1])
2221 butler._datastore.trash(ref1)
2222 butler._datastore.emptyTrash()
2223 self.assertTrue(uri1.exists())
2224 uri1.remove() # Clean it up.
2226 # Simulate execution butler setup by deleting the datastore
2227 # record but keeping the file around and trusting.
2228 butler._datastore.trustGetRequest = True
2229 uris = butler.get_many_uris([ref2, ref3])
2230 uri2 = uris[ref2].primaryURI
2231 uri3 = uris[ref3].primaryURI
2232 self.assertTrue(uri2.exists())
2233 self.assertTrue(uri3.exists())
2235 # Remove the datastore record.
2236 butler._datastore.bridge.moveToTrash(
2237 [ref2], transaction=None
2238 ) # Update the dataset_location table
2239 butler._datastore.forget([ref2])
2240 self.assertTrue(uri2.exists())
2241 butler._datastore.trash([ref2, ref3])
2242 # Immediate removal for ref2 file
2243 self.assertFalse(uri2.exists())
2244 # But ref3 has to wait for the empty.
2245 self.assertTrue(uri3.exists())
2246 butler._datastore.emptyTrash()
2247 self.assertFalse(uri3.exists())
2249 # Clear out the datasets from registry.
2250 butler.pruneDatasets([ref1, ref2, ref3], purge=True, unstore=True)
    def test_butler_metrics(self):
        """Test that metrics are collected."""
        run = "test_run"
        metrics = ButlerMetrics()
        butler, datasetType = self.create_butler(
            run, "MetricsExampleModelProvenance", "prov_metric", metrics=metrics
        )
        data = MetricsExampleModel(
            summary={"AM1": 5.2, "AM2": 30.6},
            output={"a": [1, 2, 3], "b": {"blue": 5, "red": "green"}},
            data=[563, 234, 456.7, 752, 8, 9, 27],
        )

        # One put and two gets should be recorded, with non-zero timings.
        data_ref = butler.put(data, datasetType, visit=424, instrument="DummyCamComp")
        butler.get(data_ref)
        butler.get(data_ref)
        self.assertEqual(metrics.n_get, 2)
        self.assertGreater(metrics.time_in_get, 0.0)
        self.assertEqual(metrics.n_put, 1)
        self.assertGreater(metrics.time_in_put, 0.0)

        # Resolving a deferred handle counts as a get.
        deferred = butler.getDeferred(data_ref)
        deferred.get()
        self.assertEqual(metrics.n_get, 3)

        # record_metrics() scopes a fresh metrics object to the block.
        with butler.record_metrics() as new:
            data_ref_2 = butler.put(data, datasetType, visit=425, instrument="DummyCamComp")
            butler.get(data_ref)

            butler.pruneDatasets([data_ref, data_ref_2], purge=True, unstore=True)
            with ResourcePath.temporary_uri(suffix=".json") as tmpFile:
                tmpFile.write(data.model_dump_json().encode())
                refs = [
                    DatasetRef(datasetType, data_ref_2.dataId, run),
                    DatasetRef(datasetType, data_ref.dataId, run),
                ]
                datasets = [FileDataset(path=tmpFile, refs=refs)]
                butler.ingest(*datasets, transfer="copy")

        self.assertEqual(new.n_get, 1)
        self.assertEqual(new.n_put, 1)
        self.assertEqual(new.n_ingest, 2)
class PosixDatastoreButlerTestCase(FileDatastoreButlerTests, unittest.TestCase):
    """PosixDatastore specialization of a butler"""

    # Butler configuration file used to create test repositories.
    configFile = os.path.join(TESTDIR, "config/basic/butler.yaml")
    # Config key that must be present in a full (non-minimal) config.
    fullConfigKey: str | None = ".datastore.formatters"
    validationCanFail = True
    # Substrings expected in the datastore's string representation.
    datastoreStr = ["/tmp"]
    datastoreName = [f"FileDatastore@{BUTLER_ROOT_TAG}"]
    # Substring expected in the registry's string representation.
    registryStr = "/gen3.sqlite3"
    def testPathConstructor(self) -> None:
        """Independent test of constructor using PathLike."""
        # Plain string path first.
        butler = Butler.from_config(self.tmpConfigFile, run=self.default_run)
        self.enterContext(butler)
        self.assertIsInstance(butler, Butler)

        # And again with a Path object with the butler yaml
        path = pathlib.Path(self.tmpConfigFile)
        butler = Butler.from_config(path, writeable=False)
        self.enterContext(butler)
        self.assertIsInstance(butler, Butler)

        # And again with a Path object without the butler yaml
        # (making sure we skip it if the tmp config doesn't end
        # in butler.yaml -- which is the case for a subclass)
        if self.tmpConfigFile.endswith("butler.yaml"):
            path = pathlib.Path(os.path.dirname(self.tmpConfigFile))
            butler = Butler.from_config(path, writeable=False)
            self.enterContext(butler)
            self.assertIsInstance(butler, Butler)
    def testExportTransferCopy(self) -> None:
        """Test local export using all transfer modes"""
        storageClass = self.storageClassFactory.getStorageClass("StructuredDataNoComponents")
        exportButler = self.runPutGetTest(storageClass, "test_metric")
        # Test that the repo actually has at least one dataset.
        datasets = list(exportButler.registry.queryDatasets(..., collections=...))
        self.assertGreater(len(datasets), 0)
        uris = [exportButler.getURI(d) for d in datasets]
        assert isinstance(exportButler._datastore, FileDatastore)
        datastoreRoot = exportButler.get_datastore_roots()[exportButler.get_datastore_names()[0]]

        # Datastore-relative paths; the export should reproduce these under
        # the export directory for every transfer mode.
        pathsInStore = [uri.relative_to(datastoreRoot) for uri in uris]

        for path in pathsInStore:
            # Assume local file system
            assert path is not None
            self.assertTrue(self.checkFileExists(datastoreRoot, path), f"Checking path {path}")

        for transfer in ("copy", "link", "symlink", "relsymlink"):
            with safeTestTempDir(TESTDIR) as exportDir:
                with exportButler.export(directory=exportDir, format="yaml", transfer=transfer) as export:
                    export.saveDatasets(datasets)
                for path in pathsInStore:
                    assert path is not None
                    self.assertTrue(
                        self.checkFileExists(exportDir, path),
                        f"Check that mode {transfer} exported files",
                    )
    def testPytypeCoercion(self) -> None:
        """Test python type coercion on Butler.get and put."""
        # Store some data with the normal example storage class.
        storageClass = self.storageClassFactory.getStorageClass("StructuredDataNoComponents")
        datasetTypeName = "test_metric"
        butler = self.runPutGetTest(storageClass, datasetTypeName)

        dataId = {"instrument": "DummyCamComp", "visit": 423}
        metric = butler.get(datasetTypeName, dataId=dataId)
        self.assertEqual(get_full_type_name(metric), "lsst.daf.butler.tests.MetricsExample")

        datasetType_ori = butler.get_dataset_type(datasetTypeName)
        self.assertEqual(datasetType_ori.storageClass.name, "StructuredDataNoComponents")

        # Now need to hack the registry dataset type definition.
        # There is no API for this.
        assert isinstance(butler._registry, SqlRegistry)
        manager = butler._registry._managers.datasets
        assert hasattr(manager, "_db") and hasattr(manager, "_static")
        # NOTE(review): the second argument maps column names to row keys,
        # so the row dict deliberately keys the "name" match on the dataset
        # type name itself -- confirm against Database.update's contract.
        manager._db.update(
            manager._static.dataset_type,
            {"name": datasetTypeName},
            {datasetTypeName: datasetTypeName, "storage_class": "StructuredDataNoComponentsModel"},
        )

        # Force reset of dataset type cache
        butler.registry.refresh()

        datasetType_new = butler.get_dataset_type(datasetTypeName)
        self.assertEqual(datasetType_new.name, datasetType_ori.name)
        self.assertEqual(datasetType_new.storageClass.name, "StructuredDataNoComponentsModel")

        # The stored dataset should now come back coerced to the model type.
        metric_model = butler.get(datasetTypeName, dataId=dataId)
        self.assertNotEqual(type(metric_model), type(metric))
        self.assertEqual(get_full_type_name(metric_model), "lsst.daf.butler.tests.MetricsExampleModel")

        # Put the model and read it back to show that everything now
        # works as normal.
        metric_ref = butler.put(metric_model, datasetTypeName, dataId=dataId, visit=424)
        metric_model_new = butler.get(metric_ref)
        self.assertEqual(metric_model_new, metric_model)

        # Hack the storage class again to something that will fail on the
        # get with no conversion class.
        manager._db.update(
            manager._static.dataset_type,
            {"name": datasetTypeName},
            {datasetTypeName: datasetTypeName, "storage_class": "StructuredDataListYaml"},
        )
        butler.registry.refresh()

        with self.assertRaises(ValueError):
            butler.get(datasetTypeName, dataId=dataId)
2410 def test_provenance(self):
2411 """Test that provenance is attached on put."""
2412 run = "test_run"
2413 butler, datasetType = self.create_butler(run, "MetricsExampleModelProvenance", "prov_metric")
2414 metric = MetricsExampleModel(
2415 summary={"AM1": 5.2, "AM2": 30.6},
2416 output={"a": [1, 2, 3], "b": {"blue": 5, "red": "green"}},
2417 data=[563, 234, 456.7, 752, 8, 9, 27],
2418 )
2419 # Provenance can be attached to the object being put. Whether
2420 # it is or not is dependent on the formatter. For this test we
2421 # copy on adding provenance to ensure they differ.
2422 self.assertIsNone(metric.dataset_id)
2423 metric_ref = butler.put(metric, datasetType, visit=424, instrument="DummyCamComp")
2424 self.assertIsNone(metric.dataset_id)
2425 metric_2 = butler.get(metric_ref)
2426 self.assertEqual(metric_2.data, metric.data)
2427 self.assertEqual(metric_2.dataset_id, metric_ref.id)
2428 self.assertIsNone(metric_2.provenance)
2430 # Put with provenance.
2431 prov = DatasetProvenance(quantum_id=uuid.uuid4())
2432 prov.add_input(metric_ref)
2433 prov.add_extra_provenance(metric_ref.id, {"answer": 42})
2434 metric_ref2 = butler.put(metric, datasetType, visit=423, instrument="DummyCamComp", provenance=prov)
2435 metric_3 = butler.get(metric_ref2)
2436 self.assertEqual(metric_3.provenance, prov)
2438 # Check that we can extract provenance from dict form.
2439 prov_dict = prov.to_flat_dict(metric_ref2)
2440 prov_from_prov, ref_from_prov = DatasetProvenance.from_flat_dict(prov_dict, butler)
2441 self.assertEqual(ref_from_prov, metric_ref2)
2442 # Direct __eq__ of the provenance does not work because one side
2443 # includes dimension records.
2444 self.assertEqual({ref.id for ref in prov_from_prov.inputs}, {ref.id for ref in prov.inputs})
2445 self.assertEqual(prov_from_prov.quantum_id, prov.quantum_id)
2446 self.assertEqual(prov_from_prov.extras, prov.extras)
2448 # Force a bad ID into the dict.
2449 prov_dict["id"] = uuid.uuid4()
2450 with self.assertRaises(ValueError):
2451 DatasetProvenance.from_flat_dict(prov_dict, butler)
2452 del prov_dict["id"]
2453 prov_dict["input 0 id"] = uuid.uuid4()
2454 with self.assertRaises(ValueError):
2455 DatasetProvenance.from_flat_dict(prov_dict, butler)
2457 # Check that simple types can be reconstructed with non-standard
2458 # separators.
2459 prov_dict = prov.to_flat_dict(metric_ref2, prefix="XYZ", sep="😎", simple_types=True)
2460 prov_from_prov, ref_from_prov = DatasetProvenance.from_flat_dict(prov_dict, butler)
2461 self.assertEqual(ref_from_prov, metric_ref2)
2462 self.assertEqual({ref.id for ref in prov_from_prov.inputs}, {ref.id for ref in prov.inputs})
2464 with self.assertRaises(ValueError):
2465 DatasetProvenance.from_flat_dict({"unknown": 42}, butler)
2467 def test_specialized_file_datasets_functions(self):
2468 """Test a workflow used in Prompt Processing where we export datasets
2469 from one repository and write them in-place to the datastore of
2470 another, without immediately inserting registry entries for the
2471 datasets.
2472 """
2473 repo = MetricTestRepo.create_from_butler(
2474 self.create_empty_butler(writeable=True),
2475 self.tmpConfigFile,
2476 "StructuredCompositeReadCompNoDisassembly",
2477 )
2478 source_butler = repo.butler
2480 # Test writing outputs to a FileDatastore.
2481 with tempfile.TemporaryDirectory() as tempdir:
2482 target_repo_config = Butler.makeRepo(tempdir)
2483 refs = [repo.ref1, repo.ref2]
2484 datasets = transfer_datasets_to_datastore(source_butler, ButlerConfig(target_repo_config), refs)
2485 self.assertEqual(len(datasets), 2)
2486 self.assertEqual({ref.id for ref in refs}, {dataset.refs[0].id for dataset in datasets})
2487 for dataset in datasets:
2488 path = ResourcePath(dataset.path, forceAbsolute=False)
2489 # Paths should be relative paths to the target datastore.
2490 self.assertFalse(path.isabs())
2491 # Files should have been copied into the target datastore
2492 self.assertTrue(ResourcePath(tempdir).join(path).exists())
2494 # Make sure the target Butler can ingest the datasets.
2495 target_butler = Butler(target_repo_config, writeable=True)
2496 self.enterContext(target_butler)
2497 target_butler.transfer_dimension_records_from(source_butler, refs)
2498 target_butler.ingest(*datasets, transfer=None)
2499 self.assertIsNotNone(target_butler.get(repo.ref1))
2500 self.assertIsNotNone(target_butler.get(repo.ref2))
2502 # Giving an empty list of files is a no-op.
2503 no_datasets = transfer_datasets_to_datastore(source_butler, ButlerConfig(target_repo_config), [])
2504 self.assertEqual(len(no_datasets), 0)
2506 # Test writing outputs to a ChainedDatastore.
2507 with tempfile.TemporaryDirectory() as tempdir:
2508 # Set up a second dataset type, so we can split the files across
2509 # multiple datastore roots.
2510 dt1 = repo.datasetType
2511 dt2 = DatasetType("other", dt1.dimensions, dt1.storageClass)
2512 source_butler.registry.registerDatasetType(dt2)
2513 other_ref = repo.addDataset(repo.ref1.dataId, datasetType=dt2)
2514 config = Config.fromString(
2515 f"""
2516 datastore:
2517 cls: lsst.daf.butler.datastores.chainedDatastore.ChainedDatastore
2518 datastore_constraints:
2519 - constraints:
2520 accept:
2521 - {dt1.name}
2522 - constraints:
2523 accept:
2524 - {dt2.name}
2525 datastores:
2526 - datastore:
2527 cls: lsst.daf.butler.datastores.fileDatastore.FileDatastore
2528 root: <butlerRoot>/FileDatastore_0
2529 - datastore:
2530 cls: lsst.daf.butler.datastores.fileDatastore.FileDatastore
2531 root: <butlerRoot>/FileDatastore_1
2532 """
2533 )
2534 target_repo_config = Butler.makeRepo(tempdir, config)
2535 refs = [repo.ref1, repo.ref2, other_ref]
2536 datasets = transfer_datasets_to_datastore(source_butler, ButlerConfig(target_repo_config), refs)
2537 self.assertEqual(len(datasets), 3)
2538 self.assertEqual({ref.id for ref in refs}, {dataset.refs[0].id for dataset in datasets})
2539 for dataset in datasets:
2540 path = ResourcePath(dataset.path, forceAbsolute=False)
2541 # Paths should be relative paths to the target datastore.
2542 self.assertFalse(path.isabs())
2543 # Files should have been split up between the two datastores
2544 # in the chain.
2545 datastore_root = ResourcePath(tempdir)
2546 if dataset.refs[0].datasetType.name == dt1.name:
2547 datastore_root = datastore_root.join("FileDatastore_0")
2548 else:
2549 datastore_root = datastore_root.join("FileDatastore_1")
2550 self.assertTrue(datastore_root.join(path).exists())
2552 # Make sure the target Butler can ingest the datasets.
2553 target_butler = Butler(target_repo_config, writeable=True)
2554 self.enterContext(target_butler)
2555 target_butler.transfer_dimension_records_from(source_butler, refs)
2556 target_butler.ingest(*datasets, transfer=None)
2557 self.assertIsNotNone(target_butler.get(repo.ref1))
2558 self.assertIsNotNone(target_butler.get(repo.ref2))
2559 self.assertIsNotNone(target_butler.get(other_ref))
2561 def test_temporary_for_ingest(self) -> None:
2562 """Test the `lsst.daf.butler._rubin.ingest_from_temporary` module."""
2563 with self.create_empty_butler("example_run") as butler:
2564 dataset_type = DatasetType("example", butler.dimensions.empty, "StructuredDataDict")
2565 butler.registry.registerDatasetType(dataset_type)
2566 ref = DatasetRef(dataset_type, DataCoordinate.make_empty(butler.dimensions), "example_run")
2567 with TemporaryForIngest(butler, ref) as temporary:
2568 temporary.path.write(b"three: 3")
2569 found = TemporaryForIngest.find_orphaned_temporaries_by_ref(ref, butler)
2570 self.assertEqual(found, [temporary.path])
2571 self.assertIn(".tmp", temporary.ospath)
2572 temporary.ingest()
2573 loaded = butler.get(ref)
2574 self.assertEqual(loaded, {"three": 3})
2577class PostgresPosixDatastoreButlerTestCase(FileDatastoreButlerTests, unittest.TestCase):
2578 """PosixDatastore specialization of a butler using Postgres"""
2580 configFile = os.path.join(TESTDIR, "config/basic/butler.yaml")
2581 fullConfigKey = ".datastore.formatters"
2582 validationCanFail = True
2583 datastoreStr = ["/tmp"]
2584 datastoreName = [f"FileDatastore@{BUTLER_ROOT_TAG}"]
2585 registryStr = "PostgreSQL@test"
2587 @classmethod
2588 def setUpClass(cls) -> None:
2589 cls.postgresql = cls.enterClassContext(setup_postgres_test_db())
2590 super().setUpClass()
2592 def setUp(self) -> None:
2593 # Need to add a registry section to the config.
2594 self._temp_config = False
2595 config = Config(self.configFile)
2596 self.postgresql.patch_butler_config(config)
2597 with tempfile.NamedTemporaryFile("w", suffix=".yaml", delete=False) as fh:
2598 config.dump(fh)
2599 self.configFile = fh.name
2600 self._temp_config = True
2601 super().setUp()
2603 def tearDown(self) -> None:
2604 if self._temp_config and os.path.exists(self.configFile):
2605 os.remove(self.configFile)
2606 super().tearDown()
2608 def testMakeRepo(self) -> None:
2609 # The base class test assumes that it's using sqlite and assumes
2610 # the config file is acceptable to sqlite.
2611 raise unittest.SkipTest("Postgres config is not compatible with this test.")
2614class ClonedPostgresPosixDatastoreButlerTestCase(PostgresPosixDatastoreButlerTestCase, unittest.TestCase):
2615 """Test that Butler with a Postgres registry still works after cloning."""
2617 def create_butler(
2618 self,
2619 run: str,
2620 storageClass: StorageClass | str,
2621 datasetTypeName: str,
2622 metrics: ButlerMetrics | None = None,
2623 ) -> tuple[DirectButler, DatasetType]:
2624 butler, datasetType = super().create_butler(run, storageClass, datasetTypeName, metrics=metrics)
2625 return butler.clone(run=run, metrics=metrics), datasetType
2628class InMemoryDatastoreButlerTestCase(ButlerTests, unittest.TestCase):
2629 """InMemoryDatastore specialization of a butler"""
2631 configFile = os.path.join(TESTDIR, "config/basic/butler-inmemory.yaml")
2632 fullConfigKey = None
2633 useTempRoot = False
2634 validationCanFail = False
2635 datastoreStr = ["datastore='InMemory"]
2636 datastoreName = ["InMemoryDatastore@"]
2637 registryStr = "/gen3.sqlite3"
2639 def testIngest(self) -> None:
2640 pass
2642 def test_ingest_zip(self) -> None:
2643 pass
2646class ClonedSqliteButlerTestCase(InMemoryDatastoreButlerTestCase, unittest.TestCase):
2647 """Test that a Butler with a Sqlite registry still works after cloning."""
2649 def create_butler(
2650 self,
2651 run: str,
2652 storageClass: StorageClass | str,
2653 datasetTypeName: str,
2654 metrics: ButlerMetrics | None = None,
2655 ) -> tuple[DirectButler, DatasetType]:
2656 butler, datasetType = super().create_butler(run, storageClass, datasetTypeName, metrics=metrics)
2657 return butler.clone(run=run), datasetType
2660class ChainedDatastoreButlerTestCase(FileDatastoreButlerTests, unittest.TestCase):
2661 """PosixDatastore specialization"""
2663 configFile = os.path.join(TESTDIR, "config/basic/butler-chained.yaml")
2664 fullConfigKey = ".datastore.datastores.1.formatters"
2665 validationCanFail = True
2666 datastoreStr = ["datastore='InMemory", "/FileDatastore_1/,", "/FileDatastore_2/'"]
2667 datastoreName = [
2668 "InMemoryDatastore@",
2669 f"FileDatastore@{BUTLER_ROOT_TAG}/FileDatastore_1",
2670 "SecondDatastore",
2671 ]
2672 registryStr = "/gen3.sqlite3"
2674 def testPruneDatasets(self) -> None:
2675 # This test relies on manipulating files out-of-band, which is
2676 # impossible for this configuration because of the InMemoryDatastore in
2677 # the ChainedDatastore.
2678 pass
2681class ButlerExplicitRootTestCase(PosixDatastoreButlerTestCase):
2682 """Test that a yaml file in one location can refer to a root in another."""
2684 datastoreStr = ["dir1"]
2685 # Disable the makeRepo test since we are deliberately not using
2686 # butler.yaml as the config name.
2687 fullConfigKey = None
2689 def setUp(self) -> None:
2690 self.root = makeTestTempDir(TESTDIR)
2692 # Make a new repository in one place
2693 self.dir1 = os.path.join(self.root, "dir1")
2694 Butler.makeRepo(self.dir1, config=Config(self.configFile))
2696 # Move the yaml file to a different place and add a "root"
2697 self.dir2 = os.path.join(self.root, "dir2")
2698 os.makedirs(self.dir2, exist_ok=True)
2699 configFile1 = os.path.join(self.dir1, "butler.yaml")
2700 config = Config(configFile1)
2701 config["root"] = self.dir1
2702 configFile2 = os.path.join(self.dir2, "butler2.yaml")
2703 config.dumpToUri(configFile2)
2704 os.remove(configFile1)
2705 self.tmpConfigFile = configFile2
2707 def testFileLocations(self) -> None:
2708 self.assertNotEqual(self.dir1, self.dir2)
2709 self.assertTrue(os.path.exists(os.path.join(self.dir2, "butler2.yaml")))
2710 self.assertFalse(os.path.exists(os.path.join(self.dir1, "butler.yaml")))
2711 self.assertTrue(os.path.exists(os.path.join(self.dir1, "gen3.sqlite3")))
2714class ButlerMakeRepoOutfileTestCase(ButlerPutGetTests, unittest.TestCase):
2715 """Test that a config file created by makeRepo outside of repo works."""
2717 configFile = os.path.join(TESTDIR, "config/basic/butler.yaml")
2719 def setUp(self) -> None:
2720 self.root = makeTestTempDir(TESTDIR)
2721 self.root2 = makeTestTempDir(TESTDIR)
2723 self.tmpConfigFile = os.path.join(self.root2, "different.yaml")
2724 Butler.makeRepo(self.root, config=Config(self.configFile), outfile=self.tmpConfigFile)
2726 def tearDown(self) -> None:
2727 if os.path.exists(self.root2):
2728 shutil.rmtree(self.root2, ignore_errors=True)
2729 super().tearDown()
2731 def testConfigExistence(self) -> None:
2732 c = Config(self.tmpConfigFile)
2733 uri_config = ResourcePath(c["root"])
2734 uri_expected = ResourcePath(self.root, forceDirectory=True)
2735 self.assertEqual(uri_config.geturl(), uri_expected.geturl())
2736 self.assertNotIn(":", uri_config.path, "Check for URI concatenated with normal path")
2738 def testPutGet(self) -> None:
2739 storageClass = self.storageClassFactory.getStorageClass("StructuredDataNoComponents")
2740 self.runPutGetTest(storageClass, "test_metric")
2743class ButlerMakeRepoOutfileDirTestCase(ButlerMakeRepoOutfileTestCase):
2744 """Test that a config file created by makeRepo outside of repo works."""
2746 configFile = os.path.join(TESTDIR, "config/basic/butler.yaml")
2748 def setUp(self) -> None:
2749 self.root = makeTestTempDir(TESTDIR)
2750 self.root2 = makeTestTempDir(TESTDIR)
2752 self.tmpConfigFile = self.root2
2753 Butler.makeRepo(self.root, config=Config(self.configFile), outfile=self.tmpConfigFile)
2755 def testConfigExistence(self) -> None:
2756 # Append the yaml file else Config constructor does not know the file
2757 # type.
2758 self.tmpConfigFile = os.path.join(self.tmpConfigFile, "butler.yaml")
2759 super().testConfigExistence()
2762class ButlerMakeRepoOutfileUriTestCase(ButlerMakeRepoOutfileTestCase):
2763 """Test that a config file created by makeRepo outside of repo works."""
2765 configFile = os.path.join(TESTDIR, "config/basic/butler.yaml")
2767 def setUp(self) -> None:
2768 self.root = makeTestTempDir(TESTDIR)
2769 self.root2 = makeTestTempDir(TESTDIR)
2771 self.tmpConfigFile = ResourcePath(os.path.join(self.root2, "something.yaml")).geturl()
2772 Butler.makeRepo(self.root, config=Config(self.configFile), outfile=self.tmpConfigFile)
2775@unittest.skipIf(not boto3, "Warning: boto3 AWS SDK not found!")
2776class S3DatastoreButlerTestCase(FileDatastoreButlerTests, unittest.TestCase):
2777 """S3Datastore specialization of a butler; an S3 storage Datastore +
2778 a local in-memory SqlRegistry.
2779 """
2781 configFile = os.path.join(TESTDIR, "config/basic/butler-s3store.yaml")
2782 fullConfigKey = None
2783 validationCanFail = True
2785 bucketName = "anybucketname"
2786 """Name of the Bucket that will be used in the tests. The name is read from
2787 the config file used with the tests during set-up.
2788 """
2790 root = "butlerRoot/"
2791 """Root repository directory expected to be used in case useTempRoot=False.
2792 Otherwise the root is set to a 20 characters long randomly generated string
2793 during set-up.
2794 """
2796 datastoreStr = [f"datastore={root}"]
2797 """Contains all expected root locations in a format expected to be
2798 returned by Butler stringification.
2799 """
2801 datastoreName = ["FileDatastore@s3://{bucketName}/{root}"]
2802 """The expected format of the S3 Datastore string."""
2804 registryStr = "/gen3.sqlite3"
2805 """Expected format of the Registry string."""
2807 mock_aws = mock_aws()
2808 """The mocked s3 interface from moto."""
2810 def genRoot(self) -> str:
2811 """Return a random string of len 20 to serve as a root
2812 name for the temporary bucket repo.
2814 This is equivalent to tempfile.mkdtemp as this is what self.root
2815 becomes when useTempRoot is True.
2816 """
2817 rndstr = "".join(random.choice(string.ascii_uppercase + string.digits) for _ in range(20))
2818 return rndstr + "/"
2820 def setUp(self) -> None:
2821 config = Config(self.configFile)
2822 uri = ResourcePath(config[".datastore.datastore.root"])
2823 self.bucketName = uri.netloc
2825 # Enable S3 mocking of tests.
2826 self.enterContext(clean_test_environment_for_s3())
2827 self.mock_aws.start()
2829 if self.useTempRoot:
2830 self.root = self.genRoot()
2831 rooturi = f"s3://{self.bucketName}/{self.root}"
2832 config.update({"datastore": {"datastore": {"root": rooturi}}})
2834 # need local folder to store registry database
2835 self.reg_dir = makeTestTempDir(TESTDIR)
2836 config["registry", "db"] = f"sqlite:///{self.reg_dir}/gen3.sqlite3"
2838 # MOTO needs to know that we expect Bucket bucketname to exist
2839 # (this used to be the class attribute bucketName)
2840 s3 = boto3.resource("s3")
2841 s3.create_bucket(Bucket=self.bucketName)
2843 self.datastoreStr = [f"datastore='{rooturi}'"]
2844 self.datastoreName = [f"FileDatastore@{rooturi}"]
2845 Butler.makeRepo(rooturi, config=config, forceConfigRoot=False)
2846 self.tmpConfigFile = posixpath.join(rooturi, "butler.yaml")
2848 def tearDown(self) -> None:
2849 s3 = boto3.resource("s3")
2850 bucket = s3.Bucket(self.bucketName)
2851 try:
2852 bucket.objects.all().delete()
2853 except botocore.exceptions.ClientError as e:
2854 if e.response["Error"]["Code"] == "404":
2855 # the key was not reachable - pass
2856 pass
2857 else:
2858 raise
2860 bucket = s3.Bucket(self.bucketName)
2861 bucket.delete()
2863 # Stop the S3 mock.
2864 self.mock_aws.stop()
2866 if self.reg_dir is not None and os.path.exists(self.reg_dir):
2867 shutil.rmtree(self.reg_dir, ignore_errors=True)
2869 if self.useTempRoot and os.path.exists(self.root):
2870 shutil.rmtree(self.root, ignore_errors=True)
2872 super().tearDown()
2875class DatastoreTransfers(TestCaseMixin):
2876 """Base test setup for data transfers between butlers. The concrete tests
2877 for specific configurations are in other classes, below.
2878 """
2880 storageClassFactory: StorageClassFactory
2882 @classmethod
2883 def setUpClass(cls) -> None:
2884 cls.storageClassFactory = StorageClassFactory()
2886 def setUp(self) -> None:
2887 self.root = makeTestTempDir(TESTDIR)
2888 self.config = Config(self.configFile)
2890 # Some tests cause convertors to be replaced so ensure
2891 # the storage class factory is reset each time.
2892 self.storageClassFactory.reset()
2893 self.storageClassFactory.addFromConfig(self.configFile)
2895 def tearDown(self) -> None:
2896 removeTestTempDir(self.root)
2898 def create_butler(self, manager: str | None, label: str, config_file: str | None = None) -> Butler:
2899 if manager is None:
2900 manager = (
2901 "lsst.daf.butler.registry.datasets.byDimensions.ByDimensionsDatasetRecordStorageManagerUUID"
2902 )
2903 config = Config(config_file if config_file is not None else self.configFile)
2904 config["registry", "managers", "datasets"] = manager
2905 butler = Butler.from_config(
2906 Butler.makeRepo(f"{self.root}/butler{label}", config=config), writeable=True
2907 )
2908 self.enterContext(butler)
2909 return butler
2911 def assertButlerTransfers(
2912 self,
2913 purge: bool = False,
2914 storageClassName: str = "StructuredData",
2915 storageClassNameTarget: str | None = None,
2916 ) -> None:
2917 """Test that a run can be transferred to another butler."""
2918 storageClass = self.storageClassFactory.getStorageClass(storageClassName)
2919 if storageClassNameTarget is not None:
2920 storageClassTarget = self.storageClassFactory.getStorageClass(storageClassNameTarget)
2921 else:
2922 storageClassTarget = storageClass
2924 datasetTypeName = "random_data"
2926 # Test will create 3 collections and we will want to transfer
2927 # two of those three.
2928 runs = ["run1", "run2", "other"]
2930 # Also want to use two different dataset types to ensure that
2931 # grouping works.
2932 datasetTypeNames = ["random_data", "random_data_2"]
2934 # Create the run collections in the source butler.
2935 for run in runs:
2936 self.source_butler.collections.register(run)
2938 # Create dimensions in source butler.
2939 n_exposures = 30
2940 self.source_butler.registry.insertDimensionData("instrument", {"name": "DummyCamComp"})
2941 self.source_butler.registry.insertDimensionData(
2942 "physical_filter", {"instrument": "DummyCamComp", "name": "d-r", "band": "R"}
2943 )
2944 self.source_butler.registry.insertDimensionData(
2945 "detector", {"instrument": "DummyCamComp", "id": 1, "full_name": "det1"}
2946 )
2947 self.source_butler.registry.insertDimensionData(
2948 "day_obs",
2949 {
2950 "instrument": "DummyCamComp",
2951 "id": 20250101,
2952 },
2953 )
2955 for i in range(n_exposures):
2956 self.source_butler.registry.insertDimensionData(
2957 "group", {"instrument": "DummyCamComp", "name": f"group{i}"}
2958 )
2959 self.source_butler.registry.insertDimensionData(
2960 "exposure",
2961 {
2962 "instrument": "DummyCamComp",
2963 "id": i,
2964 "obs_id": f"exp{i}",
2965 "physical_filter": "d-r",
2966 "group": f"group{i}",
2967 "day_obs": 20250101,
2968 },
2969 )
2971 # Create dataset types in the source butler.
2972 dimensions = self.source_butler.dimensions.conform(["instrument", "exposure"])
2973 for datasetTypeName in datasetTypeNames:
2974 datasetType = DatasetType(datasetTypeName, dimensions, storageClass)
2975 self.source_butler.registry.registerDatasetType(datasetType)
2977 # Write a dataset to an unrelated run -- this will ensure that
2978 # we are rewriting integer dataset ids in the target if necessary.
2979 # Will not be relevant for UUID.
2980 run = "distraction"
2981 butler = Butler.from_config(butler=self.source_butler, run=run)
2982 self.enterContext(butler)
2983 butler.put(
2984 makeExampleMetrics(),
2985 datasetTypeName,
2986 exposure=1,
2987 instrument="DummyCamComp",
2988 physical_filter="d-r",
2989 )
2991 # Write some example metrics to the source
2992 butler = Butler.from_config(butler=self.source_butler)
2993 self.enterContext(butler)
2995 # Set of DatasetRefs that should be in the list of refs to transfer
2996 # but which will not be transferred.
2997 deleted: set[DatasetRef] = set()
2999 n_expected = 20 # Number of datasets expected to be transferred
3000 source_refs = []
3001 for i in range(n_exposures):
3002 # Put a third of datasets into each collection, only retain
3003 # two thirds.
3004 index = i % 3
3005 run = runs[index]
3006 datasetTypeName = datasetTypeNames[i % 2]
3008 metric = MetricsExample(
3009 summary={"counter": i}, output={"text": "metric"}, data=[2 * x for x in range(i)]
3010 )
3011 dataId = {"exposure": i, "instrument": "DummyCamComp", "physical_filter": "d-r"}
3012 ref = butler.put(metric, datasetTypeName, dataId=dataId, run=run)
3014 # Remove the datastore record using low-level API, but only
3015 # for a specific index.
3016 if purge and index == 1:
3017 # For one of these delete the file as well.
3018 # This allows the "missing" code to filter the
3019 # file out.
3020 # Access the individual datastores.
3021 datastores = []
3022 if hasattr(butler._datastore, "datastores"):
3023 datastores.extend(butler._datastore.datastores)
3024 else:
3025 datastores.append(butler._datastore)
3027 if not deleted:
3028 # For a chained datastore we need to remove
3029 # files in each chain.
3030 for datastore in datastores:
3031 # The file might not be known to the datastore
3032 # if constraints are used.
3033 try:
3034 primary, uris = datastore.getURIs(ref)
3035 except FileNotFoundError:
3036 continue
3037 if primary and primary.scheme != "mem":
3038 primary.remove()
3039 for uri in uris.values():
3040 if uri.scheme != "mem":
3041 uri.remove()
3042 n_expected -= 1
3043 deleted.add(ref)
3045 # Remove the datastore record.
3046 for datastore in datastores:
3047 if hasattr(datastore, "removeStoredItemInfo"):
3048 datastore.removeStoredItemInfo(ref)
3050 if index < 2:
3051 source_refs.append(ref)
3052 if ref not in deleted:
3053 new_metric = butler.get(ref)
3054 self.assertEqual(new_metric, metric)
3056 # Create some bad dataset types to ensure we check for inconsistent
3057 # definitions.
3058 badStorageClass = self.storageClassFactory.getStorageClass("StructuredDataList")
3059 for datasetTypeName in datasetTypeNames:
3060 datasetType = DatasetType(datasetTypeName, dimensions, badStorageClass)
3061 self.target_butler.registry.registerDatasetType(datasetType)
3062 with self.assertRaises(ConflictingDefinitionError) as cm:
3063 self.target_butler.transfer_from(self.source_butler, source_refs)
3064 self.assertIn("dataset type differs", str(cm.exception))
3066 # And remove the bad definitions.
3067 for datasetTypeName in datasetTypeNames:
3068 self.target_butler.registry.removeDatasetType(datasetTypeName)
3070 # Transfer without creating dataset types should fail.
3071 with self.assertRaises(KeyError):
3072 self.target_butler.transfer_from(self.source_butler, source_refs)
3074 # Transfer without creating dimensions should fail.
3075 with self.assertRaises(ConflictingDefinitionError) as cm:
3076 self.target_butler.transfer_from(self.source_butler, source_refs, register_dataset_types=True)
3077 self.assertIn("dimension", str(cm.exception))
3079 # The dry run test requires dataset types to exist. If we have
3080 # been given distinct storage classes for the target we have
3081 # to redefine at least one of the dataset types in the target butler.
3082 if storageClass != storageClassTarget:
3083 self.target_butler.registry.removeDatasetType(datasetTypeNames[0])
3084 datasetType = DatasetType(datasetTypeNames[0], dimensions, storageClassTarget)
3085 self.target_butler.registry.registerDatasetType(datasetType)
3087 # The failed transfer above leaves registry in an inconsistent
3088 # state because the run is created but then rolled back without
3089 # the collection cache being cleared. For now force a refresh.
3090 # Can remove with DM-35498.
3091 self.target_butler.registry.refresh()
3093 # Do a dry run -- this should not have any effect on the target butler.
3094 self.target_butler.transfer_from(self.source_butler, source_refs, dry_run=True)
3096 # Transfer the records for one ref to test the alternative API.
3097 with self.assertLogs(logger="lsst", level=logging.DEBUG) as log_cm:
3098 self.target_butler.transfer_dimension_records_from(self.source_butler, [source_refs[0]])
3099 self.assertIn("number of records transferred: 1", ";".join(log_cm.output))
3101 # Now transfer them to the second butler, including dimensions.
3102 with self.assertLogs(logger="lsst", level=logging.DEBUG) as log_cm:
3103 transferred = self.target_butler.transfer_from(
3104 self.source_butler,
3105 source_refs,
3106 register_dataset_types=True,
3107 transfer_dimensions=True,
3108 )
3109 self.assertEqual(len(transferred), n_expected)
3110 log_output = ";".join(log_cm.output)
3112 # A ChainedDatastore will use the in-memory datastore for mexists
3113 # so we can not rely on the mexists log message.
3114 self.assertIn("Number of datastore records found in source", log_output)
3115 self.assertIn("Creating output run", log_output)
3117 # Do the transfer twice to ensure that it will do nothing extra.
3118 # Only do this if purge=True because it does not work for int
3119 # dataset_id.
3120 if purge:
3121 # This should not need to register dataset types.
3122 transferred = self.target_butler.transfer_from(self.source_butler, source_refs)
3123 self.assertEqual(len(transferred), n_expected)
3125 with self.assertRaises((TypeError, AttributeError)):
3126 self.target_butler._datastore.transfer_from(self.source_butler, source_refs) # type: ignore
3128 with self.assertRaises(ValueError):
3129 self.target_butler._datastore.transfer_from(
3130 self.source_butler._datastore, source_refs, transfer="split"
3131 )
3133 # Now try to get the same refs from the new butler.
3134 for ref in source_refs:
3135 if ref not in deleted:
3136 new_metric = self.target_butler.get(ref)
3137 old_metric = self.source_butler.get(ref)
3138 self.assertEqual(new_metric, old_metric)
3140 # Try again without implicit storage class conversion
3141 # triggered by using the source ref. This will do conversion
3142 # since the formatter will be returning the source python type.
3143 target_ref = self.target_butler.get_dataset(ref.id)
3144 if target_ref.datasetType.storageClass != ref.datasetType.storageClass:
3145 new_metric = self.target_butler.get(target_ref)
3146 self.assertNotEqual(type(new_metric), type(old_metric))
3148 # Remove the dataset from the target and put it again
3149 # as if it was the right type all along for this butler.
3150 self.target_butler.pruneDatasets(
3151 [target_ref], unstore=True, purge=True, disassociate=True
3152 )
3153 self.target_butler.put(new_metric, target_ref)
3154 new_new_metric = self.target_butler.get(target_ref)
3155 new_old_metric = self.target_butler.get(
3156 target_ref, storageClass=ref.datasetType.storageClass
3157 )
3158 self.assertEqual(new_new_metric, new_metric)
3159 self.assertEqual(new_old_metric, old_metric)
3161 # Now prune run2 collection and create instead a CHAINED collection.
3162 # This should block the transfer.
3163 self.target_butler.removeRuns(["run2"])
3164 self.target_butler.collections.register("run2", CollectionType.CHAINED)
3165 with self.assertRaises(CollectionTypeError):
3166 # Re-importing the run1 datasets can be problematic if they
3167 # use integer IDs so filter those out.
3168 to_transfer = [ref for ref in source_refs if ref.run == "run2"]
3169 self.target_butler.transfer_from(self.source_butler, to_transfer)
class PosixDatastoreTransfers(DatastoreTransfers, unittest.TestCase):
    """Test data transfers between butlers backed by POSIX file datastores.

    Tests run for different dataset-ID managers. UUID to UUID and integer
    to integer are tested. UUID to integer is not supported since we do
    not currently want to allow that. Integer to UUID is supported with
    the caveat that UUID4 will be generated and this will be incorrect
    for raw dataset types. The test ignores that.
    """
    # Butler configuration used for both the source and target repositories.
    configFile = os.path.join(TESTDIR, "config/basic/butler.yaml")
    def create_butlers(
        self, manager1: str | None = None, manager2: str | None = None, source_config: str | None = None
    ) -> None:
        """Create the source and target butlers used by a test.

        ``manager1``/``manager2`` are forwarded to ``create_butler`` as the
        manager specification for the source and target repositories.
        ``source_config`` optionally selects a different butler config file
        for the source repository only.
        """
        self.source_butler = self.create_butler(manager1, "1", config_file=source_config)
        self.target_butler = self.create_butler(manager2, "2")
    def testTransferUuidToUuid(self) -> None:
        """Transfer with the default (UUID) configuration on both sides."""
        self.create_butlers()
        self.assertButlerTransfers()
    def testTransferFromChainedUuidToUuid(self) -> None:
        """Force the source butler to be a ChainedDatastore."""
        self.create_butlers(source_config=os.path.join(TESTDIR, "config/basic/butler-chained.yaml"))
        self.assertButlerTransfers()
    def testTransferFromIncompatibleUuidToUuid(self) -> None:
        """Force the source butler to be an incompatible datastore."""
        # An in-memory source datastore is expected to make the transfer
        # fail with NotImplementedError.
        self.create_butlers(source_config=os.path.join(TESTDIR, "config/basic/butler-inmemory.yaml"))
        with self.assertRaises(NotImplementedError):
            self.assertButlerTransfers()
    def testTransferFromIncompatibleChainUuidToUuid(self) -> None:
        """Force the source butler to be an incompatible datastore chain."""
        self.create_butlers(source_config=os.path.join(TESTDIR, "config/basic/butler-inmemory-chain.yaml"))
        with self.assertRaises(TypeError):
            self.assertButlerTransfers()
    def testTransferFromFileUuidToUuid(self) -> None:
        """Force the source butler to be a FileDatastore."""
        self.create_butlers(source_config=os.path.join(TESTDIR, "config/basic/butler.yaml"))
        self.assertButlerTransfers()
    def testTransferMissing(self) -> None:
        """Test transfers where datastore records are missing.

        This is how execution butler works.
        """
        self.create_butlers()
        # Configure the source butler to allow trust.
        self.source_butler._datastore._set_trust_mode(True)
        # purge=True removes the datastore records before transferring.
        self.assertButlerTransfers(purge=True)
    def testTransferMissingDisassembly(self) -> None:
        """Test transfers where datastore records are missing and the
        composite dataset has been disassembled.

        This is how execution butler works.
        """
        self.create_butlers()
        # Configure the source butler to allow trust.
        self.source_butler._datastore._set_trust_mode(True)
        # Test disassembly.
        self.assertButlerTransfers(purge=True, storageClassName="StructuredComposite")
    def testTransferDifferingStorageClasses(self) -> None:
        """Test transfers when the source butler dataset type has a different
        but compatible storage class.
        """
        self.create_butlers()
        self.assertButlerTransfers(storageClassNameTarget="MetricsConversion")
    def testTransferDifferingStorageClassesDisassembly(self) -> None:
        """Test transfers when the source butler dataset type has a different
        but compatible storage class and where the source butler has
        disassembled.
        """
        self.create_butlers()
        self.assertButlerTransfers(
            storageClassName="StructuredComposite", storageClassNameTarget="MetricsConversion"
        )
    def testUnsafeDirectTransfer(self) -> None:
        """Test that transfer='unsafe_direct' records the absolute URI of
        source files in the target datastore.
        """
        self.create_butlers()
        dataset_type = DatasetType("dt", [], "int", universe=self.source_butler.dimensions)
        self.source_butler.registry.registerDatasetType(dataset_type)
        self.source_butler.collections.register("run")
        ref = self.source_butler.put(123, "dt", [], run="run")
        self.target_butler.transfer_from(
            self.source_butler, [ref], transfer="unsafe_direct", register_dataset_types=True
        )
        # The target must read back the value and report the same URI as
        # the source, i.e. no file was copied.
        self.assertEqual(self.target_butler.get(ref), 123)
        self.assertEqual(self.source_butler.getURI(ref), self.target_butler.getURI(ref))
    def testAbsoluteURITransferDirect(self) -> None:
        """Test transfer using an absolute URI."""
        self._absolute_transfer("auto")
    def testAbsoluteURITransferUnsafeDirect(self) -> None:
        """Test transfer using an absolute URI."""
        self._absolute_transfer("unsafe_direct")
    def testAbsoluteURITransferCopy(self) -> None:
        """Test transfer using an absolute URI."""
        self._absolute_transfer("copy")
    def _absolute_transfer(self, transfer: str) -> None:
        """Ingest a dataset by absolute URI into the source butler, transfer
        it to the target with the given ``transfer`` mode, and check where
        the target's URI ends up pointing.
        """
        self.create_butlers()
        storageClassName = "StructuredData"
        storageClass = self.storageClassFactory.getStorageClass(storageClassName)
        datasetTypeName = "random_data"
        run = "run1"
        self.source_butler.collections.register(run)
        dimensions = self.source_butler.dimensions.conform(())
        datasetType = DatasetType(datasetTypeName, dimensions, storageClass)
        self.source_butler.registry.registerDatasetType(datasetType)
        metrics = makeExampleMetrics()
        with ResourcePath.temporary_uri(suffix=".json") as temp:
            dataId = DataCoordinate.make_empty(self.source_butler.dimensions)
            source_refs = [DatasetRef(datasetType, dataId, run=run)]
            temp.write(json.dumps(metrics.exportAsDict()).encode())
            dataset = FileDataset(path=temp, refs=source_refs)
            # transfer="direct" ingests the absolute URI without copying
            # the file into the datastore root.
            self.source_butler.ingest(dataset, transfer="direct")
            self.target_butler.transfer_from(
                self.source_butler, dataset.refs, register_dataset_types=True, transfer=transfer
            )
            uri = self.target_butler.getURI(dataset.refs[0])
            if transfer == "auto" or transfer == "unsafe_direct":
                # Direct-style transfers keep pointing at the original file.
                self.assertEqual(uri, temp)
            else:
                self.assertNotEqual(uri, temp)
    def test_shared_dimension_group(self):
        """Test internal logic that divides dataset types by dimension group
        when doing registry updates.
        """
        self.create_butlers()
        self.source_butler.import_(filename=_get_test_data_path("base.yaml"), without_datastore=True)
        self.source_butler.import_(filename=_get_test_data_path("datasets.yaml"), without_datastore=True)
        source_butler = self.source_butler
        target_butler = self.target_butler
        # Create a dataset type with the same dimensions as the 'bias' dataset
        # type from base.yaml
        dataset_type = DatasetType(
            "test_type", ["instrument", "detector"], "int", universe=source_butler.dimensions
        )
        source_butler.registry.registerDatasetType(dataset_type)
        # This has the same data ID as one of the bias datasets in
        # datasets.yaml.
        test_ref = source_butler.registry.insertDatasets(
            "test_type", [{"instrument": "Cam1", "detector": 2}], run="imported_g"
        )[0]
        biases = source_butler.query_datasets("bias", ["imported_g", "imported_r"])
        flats = source_butler.query_datasets("flat", ["imported_g", "imported_r"])
        refs = [test_ref, *biases, *flats]
        # Test setup will be even more convoluted if we want the datastore to
        # actually transfer files. For testing the dimension group behavior,
        # we really only care about the registry.
        with unittest.mock.patch.object(target_butler._datastore, "transfer_from") as mock:
            mock.return_value = (set(refs), set())
            target_butler.transfer_from(
                source_butler,
                refs,
                transfer=None,
                register_dataset_types=True,
                skip_missing=False,
                transfer_dimensions=True,
            )
        # All datasets, regardless of dimension group, should be findable in
        # the target with their dataset IDs preserved (the fixed UUIDs match
        # the imported test data).
        transferred_test_ref = target_butler.find_dataset(
            "test_type", {"instrument": "Cam1", "detector": 2}, collections="imported_g"
        )
        self.assertEqual(transferred_test_ref.id, test_ref.id)
        transferred_bias = target_butler.find_dataset(
            "bias", {"instrument": "Cam1", "detector": 2}, collections="imported_g"
        )
        self.assertEqual(transferred_bias.id, uuid.UUID("51352db4-a47a-447c-b12d-a50b206b17cd"))
        transferred_flat = target_butler.find_dataset(
            "flat",
            {"instrument": "Cam1", "detector": 2, "physical_filter": "Cam1-R1", "band": "r"},
            collections="imported_r",
        )
        self.assertEqual(transferred_flat.id, uuid.UUID("c1296796-56c5-4acf-9b49-40d920c6f840"))
class ChainedDatastoreTransfers(PosixDatastoreTransfers):
    """Repeat the full transfer test suite with a chained datastore
    configuration.
    """

    # Only the butler configuration differs from the parent class.
    configFile = os.path.join(TESTDIR, "config/basic/butler-chained.yaml")
@unittest.skipIf(not butler_server_is_available, butler_server_import_error)
class ButlerServerDatastoreTransfers(DatastoreTransfers, unittest.TestCase):
    """Test ``transfer_from`` involving Butler server."""
    configFile = os.path.join(TESTDIR, "config/basic/butler.yaml")
    def test_transfers_from_remote_to_direct(self) -> None:
        """Transfer datasets from a server-backed hybrid butler into a
        local direct butler.
        """
        # Local import: this module is only importable when the Butler
        # server dependencies are installed (see the skipIf guard above).
        from lsst.daf.butler.remote_butler._remote_file_transfer_source import (
            mock_file_transfer_uris_for_unit_test,
        )
        self.target_butler = self.create_butler(None, "2")
        with create_test_server(TESTDIR) as server:
            self.source_butler = server.hybrid_butler
            def _remap_transfer_url(path: HttpResourcePath) -> HttpResourcePath:
                # The Butler server returns HTTP URIs with a domain name that
                # is not resolvable because there is no actual HTTP server
                # involved in these tests. Strip this first layer of
                # indirection, and return the target of the redirect instead.
                response = server.client.get(str(path), follow_redirects=False, headers=path._extra_headers)
                return ResourcePath(str(response.next_request.url))
            # Substitute the remapping function for the real URI signing so
            # that the transfer machinery sees reachable file URIs.
            with mock_file_transfer_uris_for_unit_test(_remap_transfer_url):
                self.assertButlerTransfers()
class TransferDatasetsInPlace(unittest.TestCase):
    """Test behavior of transfer_datasets_in_place() specialty function used by
    Prompt Publication service.
    """
    def test_file_datastore(self) -> None:
        """Run the in-place transfer checks with two repositories backed by
        plain FileDatastores sharing one datastore root.
        """
        configFile = os.path.join(TESTDIR, "config/basic/butler.yaml")
        with (
            tempfile.TemporaryDirectory() as datastore_root,
            tempfile.TemporaryDirectory() as other_repo_root,
        ):
            config = Config(configFile)
            config["datastore", "datastore", "name"] = "file_datastore"
            Butler.makeRepo(datastore_root, config=config)
            # Point the second repository's datastore at the first one's
            # root so both butlers see the same files on disk.
            config["datastore", "datastore", "root"] = datastore_root
            Butler.makeRepo(other_repo_root, config, forceConfigRoot=False)
            with (
                Butler(datastore_root, writeable=True) as source_butler,
                Butler(other_repo_root, writeable=True) as target_butler,
            ):
                self._test_transfer_datasets_in_place(source_butler, target_butler)
    def test_chained_datastore(self) -> None:
        """Run the in-place transfer checks with chained datastores."""
        configFile = os.path.join(TESTDIR, "config/basic/butler-chained-posix.yaml")
        with (
            tempfile.TemporaryDirectory() as datastore_root,
            tempfile.TemporaryDirectory() as other_repo_root,
        ):
            config = Config(configFile)
            # Pin explicit roots for each member of the datastore chain;
            # both repos are then created from this same config so they
            # share the same underlying file stores.
            config["datastore", "datastore", "datastores", 0, "datastore", "root"] = (
                f"{datastore_root}/butler_test_repository"
            )
            config["datastore", "datastore", "datastores", 1, "datastore", "root"] = (
                f"{datastore_root}/butler_test_repository2"
            )
            Butler.makeRepo(datastore_root, config=config, forceConfigRoot=False)
            Butler.makeRepo(other_repo_root, config=config, forceConfigRoot=False)
            with (
                Butler(datastore_root, writeable=True) as source_butler,
                Butler(other_repo_root, writeable=True) as target_butler,
            ):
                self._test_transfer_datasets_in_place(source_butler, target_butler)
    def _test_transfer_datasets_in_place(
        self, source_butler: DirectButler, target_butler: DirectButler
    ) -> None:
        """Shared assertions for in-place dataset transfer between two
        butlers whose datastores point at the same files.
        """
        metric_repo = MetricTestRepo.create_from_butler(
            source_butler,
            source_butler._config,
        )
        target_butler.transfer_dimension_records_from(source_butler, [metric_repo.ref1, metric_repo.ref2])
        # Verify that the setup was correct and the two repos have
        # independent registries.
        self.assertIsNone(target_butler.get_dataset(metric_repo.ref1.id))
        # Copy one dataset, and make sure we can load it from the
        # target repo.
        self.assertEqual(
            transfer_datasets_in_place(source_butler, target_butler, [metric_repo.ref1]),
            [metric_repo.ref1],
        )
        self.assertEqual(target_butler.get(metric_repo.ref1), source_butler.get(metric_repo.ref1))
        self.assertIsNone(target_butler.get_dataset(metric_repo.ref2.id))
        # Both butlers should report the same artifact URIs since the
        # datastores share the same root.
        self.assertEqual(source_butler.getURIs(metric_repo.ref1), target_butler.getURIs(metric_repo.ref1))
        # Trying to copy the same dataset again is a no-op.
        self.assertEqual(
            transfer_datasets_in_place(source_butler, target_butler, [metric_repo.ref1]),
            [],
        )
        self.assertEqual(target_butler.get(metric_repo.ref1), source_butler.get(metric_repo.ref1))
        # A mix of existing and non-existing datasets: only the new one is
        # reported as transferred.
        self.assertEqual(
            transfer_datasets_in_place(source_butler, target_butler, [metric_repo.ref1, metric_repo.ref2]),
            [metric_repo.ref2],
        )
        self.assertEqual(target_butler.get(metric_repo.ref1), source_butler.get(metric_repo.ref1))
        self.assertEqual(target_butler.get(metric_repo.ref2), source_butler.get(metric_repo.ref2))
        # For testing datastore chaining, set up a dataset that is only
        # accepted by one of the datastores.
        source_butler.registry.registerDatasetType(
            DatasetType("rejected_by_first", source_butler.dimensions.conform([]), "int")
        )
        source_butler.registry.registerRun("run")
        ref = source_butler.put(1, "rejected_by_first", dataId={}, run="run")
        self.assertEqual(
            transfer_datasets_in_place(source_butler, target_butler, [ref]),
            [ref],
        )
        self.assertEqual(1, target_butler.get(ref))
class NullDatastoreTestCase(unittest.TestCase):
    """Verify that a butler can fall back to a null datastore when asked to
    run without one, and fails cleanly when a real datastore is required
    but unusable.
    """

    # A valid config is still needed just to create the test repository.
    configFile = os.path.join(TESTDIR, "config/basic/butler.yaml")
    storageClassFactory: StorageClassFactory

    @classmethod
    def setUpClass(cls) -> None:
        factory = StorageClassFactory()
        factory.addFromConfig(cls.configFile)
        cls.storageClassFactory = factory

    def setUp(self) -> None:
        """Make a fresh repository root for every test."""
        self.root = makeTestTempDir(TESTDIR)
        Butler.makeRepo(self.root, config=Config(self.configFile))

    def tearDown(self) -> None:
        # Discard the per-test repository.
        removeTestTempDir(self.root)

    def test_fallback(self) -> None:
        # Corrupt the datastore section of the repository config so that a
        # real datastore can no longer be constructed.
        config_file = os.path.join(self.root, "butler.yaml")
        broken = Config(config_file)
        broken["datastore", "cls"] = "lsst.not.a.datastore.Datastore"
        broken.dumpToUri(config_file)

        # Both construction paths must fail while a datastore is required.
        with self.assertRaises(RuntimeError):
            Butler(self.root, without_datastore=False)
        with self.assertRaises(RuntimeError):
            Butler.from_config(self.root, without_datastore=False)

        # Opting out of the datastore should succeed via the null fallback.
        butler = Butler.from_config(self.root, writeable=True, without_datastore=True)
        self.enterContext(butler)
        self.assertIsInstance(butler._datastore, NullDatastore)

        # Registry operations still work without a datastore.
        butler.collections.register("MYRUN")
        self.assertIn("MYRUN", set(butler.collections.query("*")))

        # Build a ref for a freshly registered dataset type.
        storage_class = self.storageClassFactory.getStorageClass("StructuredDataDict")
        dataset_type = DatasetType("metric", butler.dimensions.conform([]), storage_class)
        butler.registry.registerDatasetType(dataset_type)
        ref = DatasetRef(dataset_type, {}, run="MYRUN")

        # Any attempt to retrieve data through the null datastore fails.
        with self.assertRaises(FileNotFoundError):
            butler.get(ref)
        with self.assertRaises(FileNotFoundError):
            butler.getURI(ref)
@unittest.skipIf(not butler_server_is_available, butler_server_import_error)
class ButlerServerTests(FileDatastoreButlerTests):
    """Run the shared file-datastore butler test suite against a
    RemoteButler talking to a Butler server.
    """

    configFile = None
    predictionSupported = False
    trustModeSupported = False

    postgres: TemporaryPostgresInstance | None

    def setUp(self):
        self.server_instance = self.enterContext(create_test_server(TESTDIR))

    def tearDown(self):
        # Server teardown happens via the context entered in setUp.
        pass

    def are_uris_equivalent(self, uri1: ResourcePath, uri2: ResourcePath) -> bool:
        # S3 pre-signed URLs may end up with differing expiration times in
        # the query parameters, so compare everything except the query
        # string.
        return (uri1.scheme, uri1.netloc, uri1.path) == (uri2.scheme, uri2.netloc, uri2.path)

    def create_empty_butler(
        self,
        run: str | None = None,
        writeable: bool | None = None,
        metrics: ButlerMetrics | None = None,
        cleanup: bool = True,
    ) -> Butler:
        # ``writeable`` and ``cleanup`` are part of the base-class interface
        # but are not used for the server-backed butler.
        return self.server_instance.hybrid_butler.clone(run=run, metrics=metrics)

    def remove_dataset_out_of_band(self, butler: Butler, ref: DatasetRef) -> None:
        # A file can't be deleted via S3 signed URLs, so reach in through
        # DirectButler to delete the dataset.
        artifact_uri = self.server_instance.direct_butler.getURI(ref)
        artifact_uri.remove()

    def testConstructor(self):
        # RemoteButler construction is covered by test_server.py and
        # test_remote_butler.py.
        pass

    def testDafButlerRepositories(self):
        # Loading RemoteButler through the repository index is covered by
        # test_server.py.
        pass

    def testGetDatasetTypes(self) -> None:
        # Mostly a test of validateConfiguration, which validates Datastore
        # configuration and therefore does not apply to RemoteButler.
        pass

    def testMakeRepo(self) -> None:
        # Repository creation only applies to DirectButler.
        pass

    @unittest.expectedFailure
    def testPickle(self) -> None:
        # Pickling not yet implemented for RemoteButler/HybridButler.
        return super().testPickle()

    def testStringification(self) -> None:
        expected = "RemoteButler(https://test.example/api/butler/repo/testrepo/)"
        self.assertEqual(str(self.server_instance.remote_butler), expected)

    def testTransaction(self) -> None:
        # Transactions will never be supported for RemoteButler.
        pass

    def testPutTemplates(self) -> None:
        # The Butler server instance uses different file naming templates
        # than this test expects.
        pass
@unittest.skipIf(not butler_server_is_available, butler_server_import_error)
class ButlerServerSqliteTests(ButlerServerTests, unittest.TestCase):
    """Run the RemoteButler registry-shim tests against a server backed by
    a SQLite database.
    """

    # No external database instance is required for SQLite.
    postgres = None
@unittest.skipIf(not butler_server_is_available, butler_server_import_error)
class ButlerServerPostgresTests(ButlerServerTests, unittest.TestCase):
    """Run the RemoteButler registry-shim tests against a server backed by
    a Postgres database.
    """

    @classmethod
    def setUpClass(cls):
        # Bring up the temporary Postgres instance before the base class
        # performs its own setup.
        cls.postgres = cls.enterClassContext(setup_postgres_test_db())
        super().setUpClass()
def setup_module(module: types.ModuleType) -> None:
    """Hook invoked by pytest before this module's tests run."""
    # Reset environment state (see clean_environment) so tests start from
    # a known baseline.
    clean_environment()
def _get_test_data_path(filename: str) -> ResourcePath:
    """Return the URI of a file in daf_butler's bundled registry test data.

    Parameters
    ----------
    filename : `str`
        Base name of the test-data file, e.g. ``"base.yaml"``.

    Returns
    -------
    path : `ResourcePath`
        ``resource://`` URI pointing at the requested file.
    """
    # Fix: the f-string previously contained a hard-coded placeholder and
    # never interpolated ``filename``, so every caller received the same
    # invalid path.
    return ResourcePath(f"resource://lsst.daf.butler/tests/registry_data/{filename}")
if __name__ == "__main__":
    # When run as a script, reset the environment first (mirroring what
    # setup_module does under pytest) and hand off to unittest's CLI.
    clean_environment()
    unittest.main()