Coverage for tests/test_butler.py: 13%
1900 statements
« prev ^ index » next — coverage.py v7.13.5, created at 2026-05-01 08:18 +0000
1# This file is part of daf_butler.
2#
3# Developed for the LSST Data Management System.
4# This product includes software developed by the LSST Project
5# (http://www.lsst.org).
6# See the COPYRIGHT file at the top-level directory of this distribution
7# for details of code ownership.
8#
9# This software is dual licensed under the GNU General Public License and also
10# under a 3-clause BSD license. Recipients may choose which of these licenses
11# to use; please see the files gpl-3.0.txt and/or bsd_license.txt,
12# respectively. If you choose the GPL option then the following text applies
13# (but note that there is still no warranty even if you opt for BSD instead):
14#
15# This program is free software: you can redistribute it and/or modify
16# it under the terms of the GNU General Public License as published by
17# the Free Software Foundation, either version 3 of the License, or
18# (at your option) any later version.
19#
20# This program is distributed in the hope that it will be useful,
21# but WITHOUT ANY WARRANTY; without even the implied warranty of
22# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
23# GNU General Public License for more details.
24#
25# You should have received a copy of the GNU General Public License
26# along with this program. If not, see <http://www.gnu.org/licenses/>.
28"""Tests for Butler."""
30from __future__ import annotations
32import json
33import logging
34import os
35import pathlib
36import pickle
37import posixpath
38import random
39import re
40import shutil
41import string
42import tempfile
43import unittest
44import unittest.mock
45import uuid
46import warnings
47import weakref
48from collections.abc import Callable, Mapping
49from typing import TYPE_CHECKING, Any, cast
51try:
52 import boto3
53 import botocore
55 from lsst.resources.s3utils import clean_test_environment_for_s3
57 try:
58 from moto import mock_aws # v5
59 except ImportError:
60 from moto import mock_s3 as mock_aws
61except ImportError:
62 boto3 = None
64 def mock_aws(*args: Any, **kwargs: Any) -> Any: # type: ignore[no-untyped-def]
65 """No-op decorator in case moto mock_aws can not be imported."""
66 return None
69import astropy.time
70from sqlalchemy.exc import IntegrityError
72from lsst.daf.butler import (
73 Butler,
74 ButlerConfig,
75 ButlerMetrics,
76 ButlerRepoIndex,
77 CollectionCycleError,
78 CollectionType,
79 Config,
80 DataCoordinate,
81 DatasetExistence,
82 DatasetNotFoundError,
83 DatasetProvenance,
84 DatasetRef,
85 DatasetType,
86 DimensionRecord,
87 FileDataset,
88 NoDefaultCollectionError,
89 StorageClassFactory,
90 ValidationError,
91 script,
92)
93from lsst.daf.butler._rubin.file_datasets import transfer_datasets_to_datastore
94from lsst.daf.butler._rubin.temporary_for_ingest import TemporaryForIngest
95from lsst.daf.butler._rubin.transfer_datasets_in_place import transfer_datasets_in_place
96from lsst.daf.butler.datastore import NullDatastore
97from lsst.daf.butler.datastore.file_templates import FileTemplate, FileTemplateValidationError
98from lsst.daf.butler.datastores.file_datastore.retrieve_artifacts import ZipIndex
99from lsst.daf.butler.datastores.fileDatastore import FileDatastore
100from lsst.daf.butler.direct_butler import DirectButler
101from lsst.daf.butler.registry import (
102 CollectionError,
103 CollectionTypeError,
104 ConflictingDefinitionError,
105 DataIdValueError,
106 DatasetTypeExpressionError,
107 MissingCollectionError,
108 OrphanedRecordError,
109)
110from lsst.daf.butler.registry.sql_registry import SqlRegistry
111from lsst.daf.butler.repo_relocation import BUTLER_ROOT_TAG
112from lsst.daf.butler.tests import MetricsExample, MetricsExampleModel, MultiDetectorFormatter
113from lsst.daf.butler.tests.postgresql import TemporaryPostgresInstance, setup_postgres_test_db
114from lsst.daf.butler.tests.server_available import butler_server_import_error, butler_server_is_available
115from lsst.daf.butler.tests.utils import (
116 MetricTestRepo,
117 TestCaseMixin,
118 create_populated_sqlite_registry,
119 makeTestTempDir,
120 removeTestTempDir,
121 safeTestTempDir,
122)
123from lsst.resources import ResourcePath
124from lsst.resources.http import HttpResourcePath
125from lsst.utils import doImportType
126from lsst.utils.introspection import get_full_type_name
128if butler_server_is_available:
129 from lsst.daf.butler.tests.server import create_test_server
132if TYPE_CHECKING:
133 import types
135 from lsst.daf.butler import DimensionGroup, Registry, StorageClass
# Absolute path to the directory containing this test file; used to locate
# the test configuration files referenced throughout this module.
TESTDIR = os.path.abspath(os.path.dirname(__file__))
def clean_environment() -> None:
    """Strip environment variables that could influence test behavior.

    Each listed variable is removed if present; missing variables are
    ignored.
    """
    for variable in ("DAF_BUTLER_REPOSITORY_INDEX",):
        os.environ.pop(variable, None)
def makeExampleMetrics() -> MetricsExample:
    """Return example dataset suitable for tests."""
    summary = {"AM1": 5.2, "AM2": 30.6}
    output = {"a": [1, 2, 3], "b": {"blue": 5, "red": "green"}}
    data = [563, 234, 456.7, 752, 8, 9, 27]
    return MetricsExample(summary, output, data)
class TransactionTestError(Exception):
    """Specific error for testing transactions, to prevent misdiagnosing
    that might otherwise occur when a standard exception is used.
    """
163class ButlerConfigTests(unittest.TestCase):
164 """Simple tests for ButlerConfig that are not tested in any other test
165 cases.
166 """
168 def testSearchPath(self) -> None:
169 configFile = os.path.join(TESTDIR, "config", "basic", "butler.yaml")
170 with self.assertLogs("lsst.daf.butler", level="DEBUG") as cm:
171 config1 = ButlerConfig(configFile)
172 self.assertNotIn("testConfigs", "\n".join(cm.output))
174 overrideDirectory = os.path.join(TESTDIR, "config", "testConfigs")
175 with self.assertLogs("lsst.daf.butler", level="DEBUG") as cm:
176 config2 = ButlerConfig(configFile, searchPaths=[overrideDirectory])
177 self.assertIn("testConfigs", "\n".join(cm.output))
179 key = ("datastore", "records", "table")
180 self.assertNotEqual(config1[key], config2[key])
181 self.assertEqual(config2[key], "override_record")
class ButlerPutGetTests(TestCaseMixin):
    """Helper method for running a suite of put/get tests from different
    butler configurations.
    """

    # Root directory of the temporary test repository; removed in tearDown.
    root: str
    # Default run collection name; deliberately includes non-ASCII characters.
    default_run = "ingésτ😺"
    # Shared factory, populated in setUpClass.
    storageClassFactory: StorageClassFactory
    # Seed butler configuration file; setUpClass tolerates None.
    configFile: str | None
    # Path to the butler config inside the temporary repository.
    tmpConfigFile: str
195 @staticmethod
196 def addDatasetType(
197 datasetTypeName: str, dimensions: DimensionGroup, storageClass: StorageClass | str, registry: Registry
198 ) -> DatasetType:
199 """Create a DatasetType and register it"""
200 datasetType = DatasetType(datasetTypeName, dimensions, storageClass)
201 registry.registerDatasetType(datasetType)
202 return datasetType
204 @classmethod
205 def setUpClass(cls) -> None:
206 cls.storageClassFactory = StorageClassFactory()
207 if cls.configFile is not None:
208 cls.storageClassFactory.addFromConfig(cls.configFile)
210 def assertGetComponents(
211 self,
212 butler: Butler,
213 datasetRef: DatasetRef,
214 components: tuple[str, ...],
215 reference: Any,
216 collections: Any = None,
217 ) -> None:
218 datasetType = datasetRef.datasetType
219 dataId = datasetRef.dataId
220 deferred = butler.getDeferred(datasetRef)
222 for component in components:
223 compTypeName = datasetType.componentTypeName(component)
224 result = butler.get(compTypeName, dataId, collections=collections)
225 self.assertEqual(result, getattr(reference, component))
226 result_deferred = deferred.get(component=component)
227 self.assertEqual(result_deferred, result)
229 def tearDown(self) -> None:
230 if self.root is not None:
231 removeTestTempDir(self.root)
    def create_empty_butler(
        self,
        run: str | None = None,
        writeable: bool | None = None,
        metrics: ButlerMetrics | None = None,
        cleanup: bool = True,
    ) -> Butler:
        """Create a Butler for the test repository, without inserting test
        data.

        Parameters
        ----------
        run : `str`, optional
            Passed through to `Butler.from_config` as the default run.
        writeable : `bool`, optional
            Passed through to `Butler.from_config`.
        metrics : `ButlerMetrics`, optional
            Passed through to `Butler.from_config`.
        cleanup : `bool`, optional
            If `True` (default) register the butler with the test's context
            stack so it is closed automatically at test teardown.

        Returns
        -------
        butler : `Butler`
            The new butler; asserted to be a `DirectButler` because the
            tests in this suite rely on DirectButler internals.
        """
        butler = Butler.from_config(self.tmpConfigFile, run=run, writeable=writeable, metrics=metrics)
        if cleanup:
            self.enterContext(butler)
        assert isinstance(butler, DirectButler), "Expect DirectButler in configuration"
        return butler
    def create_butler(
        self,
        run: str,
        storageClass: StorageClass | str,
        datasetTypeName: str,
        metrics: ButlerMetrics | None = None,
    ) -> tuple[Butler, DatasetType]:
        """Create a Butler for the test repository and insert some test data
        into it.

        Parameters
        ----------
        run : `str`
            Run collection for the new butler.
        storageClass : `StorageClass` or `str`
            Storage class for the dataset type being registered.
        datasetTypeName : `str`
            Name of the dataset type to register.
        metrics : `ButlerMetrics`, optional
            Passed through to `create_empty_butler`.

        Returns
        -------
        butler : `Butler`
            The new butler with dimension records inserted.
        datasetType : `DatasetType`
            The registered dataset type.
        """
        butler = self.create_empty_butler(run=run, metrics=metrics)

        # A fresh repository should contain only the requested run.
        collections = set(butler.collections.query("*"))
        self.assertEqual(collections, {run})

        # Create and register a DatasetType
        dimensions = butler.dimensions.conform(["instrument", "visit"])

        datasetType = self.addDatasetType(datasetTypeName, dimensions, storageClass, butler.registry)

        # Add needed Dimensions
        butler.registry.insertDimensionData("instrument", {"name": "DummyCamComp"})
        butler.registry.insertDimensionData(
            "physical_filter", {"instrument": "DummyCamComp", "name": "d-r", "band": "R"}
        )
        butler.registry.insertDimensionData(
            "visit_system", {"instrument": "DummyCamComp", "id": 1, "name": "default"}
        )
        butler.registry.insertDimensionData("day_obs", {"instrument": "DummyCamComp", "id": 20200101})
        # Visit 423 carries explicit begin/end timestamps; the extra visits
        # below do not.
        visit_start = astropy.time.Time("2020-01-01 08:00:00.123456789", scale="tai")
        visit_end = astropy.time.Time("2020-01-01 08:00:36.66", scale="tai")
        butler.registry.insertDimensionData(
            "visit",
            {
                "instrument": "DummyCamComp",
                "id": 423,
                "name": "fourtwentythree",
                "physical_filter": "d-r",
                "datetime_begin": visit_start,
                "datetime_end": visit_end,
                "day_obs": 20200101,
            },
        )

        # Add more visits for some later tests
        for visit_id in (424, 425):
            butler.registry.insertDimensionData(
                "visit",
                {
                    "instrument": "DummyCamComp",
                    "id": visit_id,
                    "name": f"fourtwentyfour_{visit_id}",
                    "physical_filter": "d-r",
                    "day_obs": 20200101,
                },
            )
        return butler, datasetType
    def runPutGetTest(self, storageClass: StorageClass, datasetTypeName: str) -> Butler:
        """Exercise put/get/remove cycles for a dataset of the given storage
        class, returning the butler with one dataset left in the default run
        for downstream tests.
        """
        # New datasets will be added to run and tag, but we will only look in
        # tag when looking up datasets.
        run = self.default_run
        butler, datasetType = self.create_butler(run, storageClass, datasetTypeName)
        assert butler.run is not None

        # Create and store a dataset
        metric = makeExampleMetrics()
        dataId = butler.registry.expandDataId({"instrument": "DummyCamComp", "visit": 423})

        # Dataset should not exist if we haven't added it
        with self.assertRaises(DatasetNotFoundError):
            butler.get(datasetTypeName, dataId)

        # Put and remove the dataset once as a DatasetRef, once as a dataId,
        # and once with a DatasetType

        # Keep track of any collections we add and do not clean up
        expected_collections = {run}

        counter = 0
        ref = DatasetRef(datasetType, dataId, id=uuid.UUID(int=1), run="put_run_1")
        # NOTE(review): this assigns a typing union object to ``args``; it
        # looks like it was intended as a bare annotation (``args: ...``).
        # Harmless either way — the loop below immediately rebinds ``args``.
        args = tuple[DatasetRef] | tuple[str | DatasetType, DataCoordinate]
        for args in ((ref,), (datasetTypeName, dataId), (datasetType, dataId)):
            # Since we are using subTest we can get cascading failures
            # here with the first attempt failing and the others failing
            # immediately because the dataset already exists. Work around
            # this by using a distinct run collection each time
            counter += 1
            this_run = f"put_run_{counter}"
            butler.collections.register(this_run)
            expected_collections.update({this_run})

            with self.subTest(args=repr(args)):
                kwargs: dict[str, Any] = {}
                # A resolved DatasetRef already carries its run; the other
                # argument forms need the run passed explicitly.
                if not isinstance(args[0], DatasetRef):  # type: ignore
                    kwargs["run"] = this_run
                ref = butler.put(metric, *args, **kwargs)
                self.assertIsInstance(ref, DatasetRef)

                # Test get of a ref.
                metricOut = butler.get(ref)
                self.assertEqual(metric, metricOut)
                # Test get
                metricOut = butler.get(ref.datasetType.name, dataId, collections=this_run)
                self.assertEqual(metric, metricOut)
                # Test get with a datasetRef
                metricOut = butler.get(ref)
                self.assertEqual(metric, metricOut)
                # Test getDeferred with dataId
                metricOut = butler.getDeferred(ref.datasetType.name, dataId, collections=this_run).get()
                self.assertEqual(metric, metricOut)
                # Test getDeferred with a ref
                metricOut = butler.getDeferred(ref).get()
                self.assertEqual(metric, metricOut)

                # Check we can get components
                if storageClass.isComposite():
                    self.assertGetComponents(
                        butler, ref, ("summary", "data", "output"), metric, collections=this_run
                    )

                primary_uri, secondary_uris = butler.getURIs(ref)
                n_uris = len(secondary_uris)
                if primary_uri:
                    n_uris += 1

                # Can the artifacts themselves be retrieved?
                if not butler._datastore.isEphemeral:
                    # Create a temporary directory to hold the retrieved
                    # artifacts.
                    with tempfile.TemporaryDirectory(
                        prefix="butler-artifacts-", ignore_cleanup_errors=True
                    ) as artifact_root:
                        root_uri = ResourcePath(artifact_root, forceDirectory=True)

                        for preserve_path in (True, False):
                            destination = root_uri.join(f"{preserve_path}_{counter}/")
                            log = logging.getLogger("lsst.x")
                            log.debug("Using destination %s for args %s", destination, args)
                            # Use copy so that we can test that overwrite
                            # protection works (using "auto" for File URIs
                            # would use hard links and subsequent transfer
                            # would work because it knows they are the same
                            # file).
                            transferred = butler.retrieveArtifacts(
                                [ref], destination, preserve_path=preserve_path, transfer="copy"
                            )
                            self.assertGreater(len(transferred), 0)
                            artifacts = list(ResourcePath.findFileResources([destination]))
                            # Filter out the index file.
                            artifacts = [a for a in artifacts if a.basename() != ZipIndex.index_name]
                            self.assertEqual(set(transferred), set(artifacts))

                            for artifact in transferred:
                                path_in_destination = artifact.relative_to(destination)
                                self.assertIsNotNone(path_in_destination)
                                assert path_in_destination is not None

                                # When path is not preserved there should not
                                # be any path separators.
                                num_seps = path_in_destination.count("/")
                                if preserve_path:
                                    self.assertGreater(num_seps, 0)
                                else:
                                    self.assertEqual(num_seps, 0)

                            self.assertEqual(
                                len(artifacts),
                                n_uris,
                                "Comparing expected artifacts vs actual:"
                                f" {artifacts} vs {primary_uri} and {secondary_uris}",
                            )

                            if preserve_path:
                                # No need to run these twice
                                with self.assertRaises(ValueError):
                                    butler.retrieveArtifacts([ref], destination, transfer="move")

                                with self.assertRaisesRegex(
                                    ValueError, "^Destination location must refer to a directory"
                                ):
                                    butler.retrieveArtifacts(
                                        [ref], ResourcePath("/some/file.txt", forceDirectory=False)
                                    )

                                with self.assertRaises(FileExistsError):
                                    butler.retrieveArtifacts([ref], destination)

                                transferred_again = butler.retrieveArtifacts(
                                    [ref], destination, preserve_path=preserve_path, overwrite=True
                                )
                                self.assertEqual(set(transferred_again), set(transferred))

                # Now remove the dataset completely.
                butler.pruneDatasets([ref], purge=True, unstore=True)
                # Lookup with original args should still fail.
                kwargs = {"collections": this_run}
                if isinstance(args[0], DatasetRef):
                    kwargs = {}  # Prevent warning from being issued.
                self.assertFalse(butler.exists(*args, **kwargs))
                # get() should still fail.
                with self.assertRaises((FileNotFoundError, DatasetNotFoundError)):
                    butler.get(ref)
                # Registry shouldn't be able to find it by dataset_id anymore.
                self.assertIsNone(butler.get_dataset(ref.id))

                # Do explicit registry removal since we know they are
                # empty
                butler.collections.x_remove(this_run)
                expected_collections.remove(this_run)

        # Create DatasetRef for put using default run.
        refIn = DatasetRef(datasetType, dataId, id=uuid.UUID(int=1), run=butler.run)

        # Check that getDeferred fails with standalone ref.
        with self.assertRaises(LookupError):
            butler.getDeferred(refIn)

        # Put the dataset again, since the last thing we did was remove it
        # and we want to use the default collection.
        ref = butler.put(metric, refIn)

        # Get with parameters
        stop = 4
        sliced = butler.get(ref, parameters={"slice": slice(stop)})
        self.assertNotEqual(metric, sliced)
        self.assertEqual(metric.summary, sliced.summary)
        self.assertEqual(metric.output, sliced.output)
        assert metric.data is not None  # for mypy
        self.assertEqual(metric.data[:stop], sliced.data)
        # getDeferred with parameters
        sliced = butler.getDeferred(ref, parameters={"slice": slice(stop)}).get()
        self.assertNotEqual(metric, sliced)
        self.assertEqual(metric.summary, sliced.summary)
        self.assertEqual(metric.output, sliced.output)
        self.assertEqual(metric.data[:stop], sliced.data)
        # getDeferred with deferred parameters
        sliced = butler.getDeferred(ref).get(parameters={"slice": slice(stop)})
        self.assertNotEqual(metric, sliced)
        self.assertEqual(metric.summary, sliced.summary)
        self.assertEqual(metric.output, sliced.output)
        self.assertEqual(metric.data[:stop], sliced.data)

        if storageClass.isComposite():
            # Check that components can be retrieved
            metricOut = butler.get(ref.datasetType.name, dataId)
            compNameS = ref.datasetType.componentTypeName("summary")
            compNameD = ref.datasetType.componentTypeName("data")
            summary = butler.get(compNameS, dataId)
            self.assertEqual(summary, metric.summary)
            data = butler.get(compNameD, dataId)
            self.assertEqual(data, metric.data)

            if "counter" in storageClass.derivedComponents:
                count = butler.get(ref.datasetType.componentTypeName("counter"), dataId)
                self.assertEqual(count, len(data))

                count = butler.get(
                    ref.datasetType.componentTypeName("counter"), dataId, parameters={"slice": slice(stop)}
                )
                self.assertEqual(count, stop)

            compRef = butler.find_dataset(compNameS, dataId, collections=butler.collections.defaults)
            assert compRef is not None
            summary = butler.get(compRef)
            self.assertEqual(summary, metric.summary)

        # Create a Dataset type that has the same name but is inconsistent.
        inconsistentDatasetType = DatasetType(
            datasetTypeName, datasetType.dimensions, self.storageClassFactory.getStorageClass("Config")
        )

        # Getting with a dataset type that does not match registry fails
        with self.assertRaisesRegex(
            ValueError,
            "(Supplied dataset type .* inconsistent with registry)"
            "|(The new storage class .* is not compatible with the existing storage class)",
        ):
            butler.get(inconsistentDatasetType, dataId)

        # Combining a DatasetRef with a dataId should fail
        with self.assertRaisesRegex(ValueError, "DatasetRef given, cannot use dataId as well"):
            butler.get(ref, dataId)
        # Getting with an explicit ref should fail if the id doesn't match.
        with self.assertRaises((FileNotFoundError, DatasetNotFoundError)):
            butler.get(DatasetRef(ref.datasetType, ref.dataId, id=uuid.UUID(int=101), run=butler.run))

        # Getting a dataset with unknown parameters should fail
        with self.assertRaisesRegex(KeyError, "Parameter 'unsupported' not understood"):
            butler.get(ref, parameters={"unsupported": True})

        # Check we have a collection
        collections = set(butler.collections.query("*"))
        self.assertEqual(collections, expected_collections)

        # Clean up to check that we can remove something that may have
        # already had a component removed
        butler.pruneDatasets([ref], unstore=True, purge=True)

        # Add the same ref again, so we can check that duplicate put fails.
        ref = butler.put(metric, datasetType, dataId)

        # Repeat put will fail.
        with self.assertRaisesRegex(
            ConflictingDefinitionError, "A database constraint failure was triggered"
        ):
            butler.put(metric, datasetType, dataId)

        # Remove the datastore entry.
        butler.pruneDatasets([ref], unstore=True, purge=False, disassociate=False)

        # Put will still fail
        with self.assertRaisesRegex(
            ConflictingDefinitionError, "A database constraint failure was triggered"
        ):
            butler.put(metric, datasetType, dataId)

        # Repeat the same sequence with resolved ref.
        butler.pruneDatasets([ref], unstore=True, purge=True)
        ref = butler.put(metric, refIn)

        # Repeat put will fail.
        with self.assertRaisesRegex(ConflictingDefinitionError, "Datastore already contains dataset"):
            butler.put(metric, refIn)

        # Remove the datastore entry.
        butler.pruneDatasets([ref], unstore=True, purge=False, disassociate=False)

        # In case of resolved ref this write will succeed.
        ref = butler.put(metric, refIn)

        # Leave the dataset in place since some downstream tests require
        # something to be present
        return butler
    def testDeferredCollectionPassing(self) -> None:
        """Test put/get/exists when collections are supplied per-call rather
        than at butler construction time.
        """
        # Construct a butler with no run or collection, but make it writeable.
        butler = self.create_empty_butler(writeable=True)
        # Create and register a DatasetType
        dimensions = butler.dimensions.conform(["instrument", "visit"])
        datasetType = self.addDatasetType(
            "example", dimensions, self.storageClassFactory.getStorageClass("StructuredData"), butler.registry
        )
        # Add needed Dimensions
        butler.registry.insertDimensionData("instrument", {"name": "DummyCamComp"})
        butler.registry.insertDimensionData(
            "physical_filter", {"instrument": "DummyCamComp", "name": "d-r", "band": "R"}
        )
        butler.registry.insertDimensionData("day_obs", {"instrument": "DummyCamComp", "id": 20250101})
        butler.registry.insertDimensionData(
            "visit",
            {
                "instrument": "DummyCamComp",
                "id": 423,
                "name": "fourtwentythree",
                "physical_filter": "d-r",
                "day_obs": 20250101,
            },
        )
        dataId = {"instrument": "DummyCamComp", "visit": 423}
        # Create dataset.
        metric = makeExampleMetrics()
        # Register a new run and put dataset.
        run = "deferred"
        self.assertTrue(butler.collections.register(run))
        # Second time it will be allowed but indicate no-op
        self.assertFalse(butler.collections.register(run))
        ref = butler.put(metric, datasetType, dataId, run=run)
        # Putting with no run should fail with TypeError.
        with self.assertRaises(CollectionError):
            butler.put(metric, datasetType, dataId)
        # Dataset should exist.
        self.assertTrue(butler.exists(datasetType, dataId, collections=[run]))
        # We should be able to get the dataset back, but with and without
        # a deferred dataset handle.
        self.assertEqual(metric, butler.get(datasetType, dataId, collections=[run]))
        self.assertEqual(metric, butler.getDeferred(datasetType, dataId, collections=[run]).get())
        # Trying to find the dataset without any collection is an error.
        with self.assertRaises(NoDefaultCollectionError):
            butler.exists(datasetType, dataId)
        with self.assertRaises(CollectionError):
            butler.get(datasetType, dataId)
        # Associate the dataset with a different collection.
        butler.collections.register("tagged", type=CollectionType.TAGGED)
        butler.registry.associate("tagged", [ref])
        # Deleting the dataset from the new collection should make it findable
        # in the original collection.
        butler.pruneDatasets([ref], tags=["tagged"])
        self.assertTrue(butler.exists(datasetType, dataId, collections=[run]))
class ButlerTests(ButlerPutGetTests):
    """Tests for Butler."""

    # Whether the repository is created under a temporary root in setUp.
    useTempRoot = True
    # The following bare annotations are defined by concrete subclasses;
    # their values are not visible in this chunk.
    validationCanFail: bool
    fullConfigKey: str | None
    registryStr: str | None
    datastoreName: list[str] | None
    datastoreStr: list[str]
    predictionSupported = True
    """Does getURIs support 'prediction mode'?"""
652 def setUp(self) -> None:
653 """Create a new butler root for each test."""
654 self.root = makeTestTempDir(TESTDIR)
655 Butler.makeRepo(self.root, config=Config(self.configFile))
656 self.tmpConfigFile = os.path.join(self.root, "butler.yaml")
658 def are_uris_equivalent(self, uri1: ResourcePath, uri2: ResourcePath) -> bool:
659 """Return True if two URIs refer to the same resource.
661 Subclasses may override to handle unique requirements.
662 """
663 return uri1 == uri2
    def testConstructor(self) -> None:
        """Independent test of constructor."""
        butler = Butler.from_config(self.tmpConfigFile, run=self.default_run)
        self.enterContext(butler)
        self.assertIsInstance(butler, Butler)

        # Check that butler.yaml is added automatically.
        if self.tmpConfigFile.endswith(end := "/butler.yaml"):
            config_dir = self.tmpConfigFile[: -len(end)]
            butler = Butler.from_config(config_dir, run=self.default_run)
            self.enterContext(butler)
            self.assertIsInstance(butler, Butler)

            # Even with a ResourcePath.
            butler = Butler.from_config(ResourcePath(config_dir, forceDirectory=True), run=self.default_run)
            self.enterContext(butler)
            self.assertIsInstance(butler, Butler)

        collections = set(butler.collections.query("*"))
        self.assertEqual(collections, {self.default_run})

        # Check that some special characters can be included in run name.
        special_run = "u@b.c-A"
        butler_special = Butler.from_config(butler=butler, run=special_run)
        self.enterContext(butler_special)
        collections = set(butler_special.registry.queryCollections("*@*"))
        self.assertEqual(collections, {special_run})

        # A butler derived from another shares datastore type and config.
        butler2 = Butler.from_config(butler=butler, collections=["other"])
        self.enterContext(butler2)
        self.assertEqual(butler2.collections.defaults, ("other",))
        self.assertIsNone(butler2.run)
        self.assertEqual(type(butler._datastore), type(butler2._datastore))
        self.assertEqual(butler._datastore.config, butler2._datastore.config)

        # Test that we can use an environment variable to find this
        # repository.
        butler_index = Config()
        butler_index["label"] = self.tmpConfigFile
        for suffix in (".yaml", ".json"):
            # Ensure that the content differs so that we know that
            # we aren't reusing the cache.
            bad_label = f"file://bucket/not_real{suffix}"
            butler_index["bad_label"] = bad_label
            with ResourcePath.temporary_uri(suffix=suffix) as temp_file:
                butler_index.dumpToUri(temp_file)
                with unittest.mock.patch.dict(os.environ, {"DAF_BUTLER_REPOSITORY_INDEX": str(temp_file)}):
                    self.assertEqual(Butler.get_known_repos(), {"label", "bad_label"})
                    uri = Butler.get_repo_uri("bad_label")
                    self.assertEqual(uri, ResourcePath(bad_label))
                    uri = Butler.get_repo_uri("label")
                    butler = Butler.from_config(uri, writeable=False)
                    self.assertIsInstance(butler, Butler)
                    butler.close()
                    butler = Butler.from_config("label", writeable=False)
                    self.assertIsInstance(butler, Butler)
                    butler.close()
                    with self.assertRaisesRegex(FileNotFoundError, "aliases:.*bad_label"):
                        Butler.from_config("not_there", writeable=False)
                    with self.assertRaisesRegex(FileNotFoundError, "resolved from alias 'bad_label'"):
                        Butler.from_config("bad_label")
                    with self.assertRaises(FileNotFoundError):
                        # Should ignore aliases.
                        Butler.from_config(ResourcePath("label", forceAbsolute=False))
                    with self.assertRaises(KeyError) as cm:
                        Butler.get_repo_uri("missing")
                    self.assertEqual(
                        Butler.get_repo_uri("missing", True), ResourcePath("missing", forceAbsolute=False)
                    )
                    self.assertIn("not known to", str(cm.exception))
                    # Should report no failure.
                    self.assertEqual(ButlerRepoIndex.get_failure_reason(), "")
        with ResourcePath.temporary_uri(suffix=suffix) as temp_file:
            # Now with empty configuration.
            butler_index = Config()
            butler_index.dumpToUri(temp_file)
            with unittest.mock.patch.dict(os.environ, {"DAF_BUTLER_REPOSITORY_INDEX": str(temp_file)}):
                with self.assertRaisesRegex(FileNotFoundError, "(no known aliases)"):
                    Butler.from_config("label")
        with ResourcePath.temporary_uri(suffix=suffix) as temp_file:
            # Now with bad contents.
            with open(temp_file.ospath, "w") as fh:
                print("'", file=fh)
            with unittest.mock.patch.dict(os.environ, {"DAF_BUTLER_REPOSITORY_INDEX": str(temp_file)}):
                with self.assertRaisesRegex(FileNotFoundError, "(no known aliases:.*could not be read)"):
                    Butler.from_config("label")
        with unittest.mock.patch.dict(os.environ, {"DAF_BUTLER_REPOSITORY_INDEX": "file://not_found/x.yaml"}):
            with self.assertRaises(FileNotFoundError):
                Butler.get_repo_uri("label")
            self.assertEqual(Butler.get_known_repos(), set())

            with self.assertRaisesRegex(FileNotFoundError, "index file not found"):
                Butler.from_config("label")

        # Check that we can create Butler when the alias file is not found.
        butler = Butler.from_config(self.tmpConfigFile, writeable=False)
        self.enterContext(butler)
        self.assertIsInstance(butler, Butler)
        with self.assertRaises(RuntimeError) as cm:
            # No environment variable set.
            Butler.get_repo_uri("label")
        self.assertEqual(Butler.get_repo_uri("label", True), ResourcePath("label", forceAbsolute=False))
        self.assertIn("No repository index defined", str(cm.exception))
        with self.assertRaisesRegex(FileNotFoundError, "no known aliases.*No repository index"):
            # No aliases registered.
            Butler.from_config("not_there")
        self.assertEqual(Butler.get_known_repos(), set())
773 def testClose(self):
774 butler = self.create_empty_butler(cleanup=False)
775 is_direct_butler = isinstance(butler, DirectButler)
776 if is_direct_butler:
777 self.assertFalse(butler._closed)
779 with butler as butler_from_context_manager:
780 self.assertIs(butler, butler_from_context_manager)
781 if is_direct_butler:
782 self.assertTrue(butler._closed)
783 with self.assertRaisesRegex(RuntimeError, "has been closed"):
784 butler.get_dataset_type("raw")
786 # Close may be called multiple times.
787 butler.close()
788 if is_direct_butler:
789 self.assertTrue(butler._closed)
    def testGarbageCollection(self):
        """Test that Butler does not have any circular references that prevent
        it from being garbage collected immediately when it goes out of scope.
        """
        butler = self.create_empty_butler(cleanup=False)
        is_direct_butler = isinstance(butler, DirectButler)
        # Take weak references to the butler and (for DirectButler) its
        # internals so we can observe whether they are collected promptly.
        butler_ref = weakref.ref(butler)
        if is_direct_butler:
            registry_ref = weakref.ref(butler._registry)
            managers_ref = weakref.ref(butler._registry._managers)
            datastore_ref = weakref.ref(butler._datastore)
            db_ref = weakref.ref(butler._registry._db)
            engine_ref = weakref.ref(butler._registry._db._engine)

        with warnings.catch_warnings():
            # Hide warnings from unclosed database handles.
            warnings.simplefilter("ignore", ResourceWarning)
            del butler
        # If any cycle existed, these weakrefs would still resolve here.
        self.assertIsNone(butler_ref(), "Butler should have been garbage collected")
        if is_direct_butler:
            self.assertIsNone(registry_ref(), "SqlRegistry should have been garbage collected")
            self.assertIsNone(managers_ref(), "Registry managers should have been garbage collected")
            self.assertIsNone(datastore_ref(), "Datastore should have been garbage collected")
            self.assertIsNone(db_ref(), "Database should have been garbage collected")
        # SQLAlchemy has internal reference cycles, so the Engine instance
        # is not cleaned up promptly even if we release our reference to
        # it. Explicitly clean it up here to avoid file handles leaking.
        if is_direct_butler:
            engine = engine_ref()
            if engine is not None:
                engine.dispose()
823 def testDafButlerRepositories(self):
824 with unittest.mock.patch.dict(
825 os.environ,
826 {"DAF_BUTLER_REPOSITORIES": "label: 'https://someuri.com'\notherLabel: 'https://otheruri.com'\n"},
827 ):
828 self.assertEqual(str(Butler.get_repo_uri("label")), "https://someuri.com")
830 with unittest.mock.patch.dict(
831 os.environ,
832 {
833 "DAF_BUTLER_REPOSITORIES": "label: https://someuri.com",
834 "DAF_BUTLER_REPOSITORY_INDEX": "https://someuri.com",
835 },
836 ):
837 with self.assertRaisesRegex(RuntimeError, "Only one of the environment variables"):
838 Butler.get_repo_uri("label")
840 with unittest.mock.patch.dict(
841 os.environ,
842 {"DAF_BUTLER_REPOSITORIES": "invalid"},
843 ):
844 with self.assertRaisesRegex(ValueError, "Repository index not in expected format"):
845 Butler.get_repo_uri("label")
847 def testBasicPutGet(self) -> None:
848 storageClass = self.storageClassFactory.getStorageClass("StructuredDataNoComponents")
849 self.runPutGetTest(storageClass, "test_metric")
    def testCompositePutGetConcrete(self) -> None:
        """Test put/get of a composite stored as a single concrete file,
        i.e. without disassembly into per-component artifacts.
        """
        storageClass = self.storageClassFactory.getStorageClass("StructuredCompositeReadCompNoDisassembly")
        butler = self.runPutGetTest(storageClass, "test_metric")

        # Should *not* be disassembled: exactly one dataset with a single
        # primary URI and no component URIs.
        datasets = list(butler.registry.queryDatasets(..., collections=self.default_run))
        self.assertEqual(len(datasets), 1)
        uri, components = butler.getURIs(datasets[0])
        self.assertIsInstance(uri, ResourcePath)
        self.assertFalse(components)
        self.assertEqual(uri.fragment, "", f"Checking absence of fragment in {uri}")
        self.assertIn("423", str(uri), f"Checking visit is in URI {uri}")

        # Predicted dataset: ask for a URI for a data ID that has not been
        # stored; the returned URI carries a "predicted" fragment.
        if self.predictionSupported:
            dataId = {"instrument": "DummyCamComp", "visit": 424}
            uri, components = butler.getURIs(datasets[0].datasetType, dataId=dataId, predict=True)
            self.assertFalse(components)
            self.assertIsInstance(uri, ResourcePath)
            self.assertIn("424", str(uri), f"Checking visit is in URI {uri}")
            self.assertEqual(uri.fragment, "predicted", f"Checking for fragment in {uri}")
            # Repeat with a DatasetRef to test that code path.
            ref = DatasetRef(
                datasets[0].datasetType,
                dataId=DataCoordinate.standardize(dataId, universe=butler.dimensions),
                run=self.default_run,
            )
            uri2, components2 = butler.getURIs(ref, predict=True)
            self.assertFalse(components2)
            self.assertEqual(uri, uri2)
    def testCompositePutGetVirtual(self) -> None:
        """Test put/get of a composite that is disassembled into component
        files on disk (except for ephemeral datastores, which never
        disassemble).
        """
        storageClass = self.storageClassFactory.getStorageClass("StructuredCompositeReadComp")
        butler = self.runPutGetTest(storageClass, "test_metric_comp")

        # Should be disassembled
        datasets = list(butler.registry.queryDatasets(..., collections=self.default_run))
        self.assertEqual(len(datasets), 1)
        uri, components = butler.getURIs(datasets[0])

        if butler._datastore.isEphemeral:
            # Never disassemble in-memory datastore
            self.assertIsInstance(uri, ResourcePath)
            self.assertFalse(components)
            self.assertEqual(uri.fragment, "", f"Checking absence of fragment in {uri}")
            self.assertIn("423", str(uri), f"Checking visit is in URI {uri}")
        else:
            # Disassembled: no primary URI; one URI per storage-class
            # component instead.
            self.assertIsNone(uri)
            self.assertEqual(set(components), set(storageClass.components))
            for compuri in components.values():
                self.assertIsInstance(compuri, ResourcePath)
                self.assertIn("423", str(compuri), f"Checking visit is in URI {compuri}")
                self.assertEqual(compuri.fragment, "", f"Checking absence of fragment in {compuri}")

        if self.predictionSupported:
            # Predicted dataset: URIs for an unstored data ID must carry the
            # "predicted" fragment (on each component when disassembled).
            dataId = {"instrument": "DummyCamComp", "visit": 424}
            uri, components = butler.getURIs(datasets[0].datasetType, dataId=dataId, predict=True)

            if butler._datastore.isEphemeral:
                # Never disassembled
                self.assertIsInstance(uri, ResourcePath)
                self.assertFalse(components)
                self.assertIn("424", str(uri), f"Checking visit is in URI {uri}")
                self.assertEqual(uri.fragment, "predicted", f"Checking for fragment in {uri}")
            else:
                self.assertIsNone(uri)
                self.assertEqual(set(components), set(storageClass.components))
                for compuri in components.values():
                    self.assertIsInstance(compuri, ResourcePath)
                    self.assertIn("424", str(compuri), f"Checking visit is in URI {compuri}")
                    self.assertEqual(compuri.fragment, "predicted", f"Checking for fragment in {compuri}")
    def testStorageClassOverrideGet(self) -> None:
        """Test storage class conversion on get with override."""
        storageClass = self.storageClassFactory.getStorageClass("StructuredData")
        datasetTypeName = "anything"
        run = self.default_run

        butler, datasetType = self.create_butler(run, storageClass, datasetTypeName)

        # Create and store a dataset.
        metric = makeExampleMetrics()
        dataId = {"instrument": "DummyCamComp", "visit": 423}

        ref = butler.put(metric, datasetType, dataId)

        # Return native type.
        retrieved = butler.get(ref)
        self.assertEqual(retrieved, metric)

        # Specify an override: the same dataset should come back converted
        # to the override storage class's python type, comparing equal.
        new_sc = self.storageClassFactory.getStorageClass("MetricsConversion")
        model = butler.get(ref, storageClass=new_sc)
        self.assertNotEqual(type(model), type(retrieved))
        self.assertIs(type(model), new_sc.pytype)
        self.assertEqual(retrieved, model)

        # Defer but override later.
        deferred = butler.getDeferred(ref)
        model = deferred.get(storageClass=new_sc)
        self.assertIs(type(model), new_sc.pytype)
        self.assertEqual(retrieved, model)

        # Defer but override up front.
        deferred = butler.getDeferred(ref, storageClass=new_sc)
        model = deferred.get()
        self.assertIs(type(model), new_sc.pytype)
        self.assertEqual(retrieved, model)

        # Retrieve a component. Should be a tuple.
        data = butler.get("anything.data", dataId, storageClass="StructuredDataDataTestTuple")
        self.assertIs(type(data), tuple)
        self.assertEqual(data, tuple(retrieved.data))

        # Parameter on the write storage class should work regardless
        # of read storage class.
        data = butler.get(
            "anything.data",
            dataId,
            storageClass="StructuredDataDataTestTuple",
            parameters={"slice": slice(2, 4)},
        )
        self.assertEqual(len(data), 2)

        # Try a parameter that is known to the read storage class but not
        # the write storage class.
        with self.assertRaises(KeyError):
            butler.get(
                "anything.data",
                dataId,
                storageClass="StructuredDataDataTestTuple",
                parameters={"xslice": slice(2, 4)},
            )
    def testPytypePutCoercion(self) -> None:
        """Test python type coercion on Butler.get and put."""
        # Store some data with the normal example storage class.
        storageClass = self.storageClassFactory.getStorageClass("StructuredDataNoComponents")
        datasetTypeName = "test_metric"
        butler, _ = self.create_butler(self.default_run, storageClass, datasetTypeName)

        dataId = {"instrument": "DummyCamComp", "visit": 423}

        # Put a dict and this should coerce to a MetricsExample
        test_dict = {"summary": {"a": 1}, "output": {"b": 2}}
        metric_ref = butler.put(test_dict, datasetTypeName, dataId=dataId, visit=424)
        test_metric = butler.get(metric_ref)
        self.assertEqual(get_full_type_name(test_metric), "lsst.daf.butler.tests.MetricsExample")
        self.assertEqual(test_metric.summary, test_dict["summary"])
        self.assertEqual(test_metric.output, test_dict["output"])

        # Check that the put still works if a DatasetType is given with
        # a definition matching this python type.
        registry_type = butler.get_dataset_type(datasetTypeName)
        this_type = DatasetType(datasetTypeName, registry_type.dimensions, "StructuredDataDictJson")
        metric2_ref = butler.put(test_dict, this_type, dataId=dataId, visit=425)
        self.assertEqual(metric2_ref.datasetType, registry_type)

        # The get will return the type expected by registry.
        test_metric2 = butler.get(metric2_ref)
        self.assertEqual(get_full_type_name(test_metric2), "lsst.daf.butler.tests.MetricsExample")

        # Make a new DatasetRef with the compatible but different DatasetType.
        # This should now return a dict.
        new_ref = DatasetRef(this_type, metric2_ref.dataId, id=metric2_ref.id, run=metric2_ref.run)
        test_dict2 = butler.get(new_ref)
        self.assertEqual(get_full_type_name(test_dict2), "dict")

        # Get it again with the compatible-but-different dataset type
        # definition, this time resolving via data ID rather than a
        # DatasetRef. Behavior should be consistent with the DatasetRef
        # path above and return the python type of the given DatasetType.
        test_dict3 = butler.get(this_type, dataId=dataId, visit=425)
        self.assertEqual(get_full_type_name(test_dict3), "dict")
    def test_ingest_zip(self) -> None:
        """Create butler, export data, delete data, import from Zip."""
        butler, dataset_type = self.create_butler(
            run=self.default_run, storageClass="StructuredData", datasetTypeName="metrics"
        )

        metric = makeExampleMetrics()
        refs = []
        for visit in (423, 424, 425):
            ref = butler.put(metric, dataset_type, instrument="DummyCamComp", visit=visit)
            refs.append(ref)

        # Retrieve a Zip file.
        with tempfile.TemporaryDirectory(ignore_cleanup_errors=True) as tmpdir:
            zip = butler.retrieve_artifacts_zip(refs, destination=tmpdir)

            # Ingest will fail: the datasets are still registered.
            with self.assertRaises(ConflictingDefinitionError):
                butler.ingest_zip(zip)

            # Clear out the collection.
            butler.removeRuns([self.default_run])
            self.assertFalse(butler.exists(refs[0]))

            butler.ingest_zip(zip, transfer="copy")
            # Ingest metrics should record nonzero time and one entry per ref.
            self.assertGreater(butler._metrics.time_in_ingest, 0.0)
            self.assertEqual(butler._metrics.n_ingest, len(refs))

            # Check that it fails if we try it again.
            with self.assertRaises(ConflictingDefinitionError):
                butler.ingest_zip(zip, transfer="copy")

            # This will be a no-op.
            butler.ingest_zip(zip, transfer="copy", skip_existing=True)

            # Create an entirely new local file butler in this temp directory.
            new_butler_cfg = Butler.makeRepo(tmpdir)
            new_butler = Butler.from_config(new_butler_cfg, writeable=True)
            self.enterContext(new_butler)

            # This will fail since dimensions records are missing.
            with self.assertRaises(ConflictingDefinitionError):
                new_butler.ingest_zip(zip, transfer="copy")

            # Dry run should work.
            new_butler.ingest_zip(zip, transfer="copy", dry_run=True)

            new_butler.ingest_zip(zip, transfer="copy", transfer_dimensions=True)
            # NOTE(review): this checks the *original* butler, which still has
            # the datasets from the earlier re-ingest — confirm whether
            # new_butler.exists was intended here.
            self.assertTrue(butler.exists(refs[0]))

            # Check that the refs can be read again.
            _ = [butler.get(ref) for ref in refs]

            uri = butler.getURI(refs[2])
            self.assertTrue(uri.exists())

            # Delete one dataset. The Zip file should still exist and allow
            # remaining refs to be read.
            butler.pruneDatasets([refs[0]], purge=True, unstore=True)
            self.assertTrue(uri.exists())

            metric2 = butler.get(refs[1])
            self.assertEqual(metric2, metric, msg=f"{metric2} != {metric}")

            # Removing the run removes the shared Zip artifact as well.
            butler.removeRuns([self.default_run])
            self.assertFalse(uri.exists())
            self.assertFalse(butler.exists(refs[-1]))

            # An empty ref list is rejected outright.
            with self.assertRaises(ValueError):
                butler.retrieve_artifacts_zip([], destination=".")
    def testIngest(self) -> None:
        """Test ingesting external files, including single-file, multi-file,
        multi-ref-per-file, "execution butler" re-ingest, and move transfers.
        """
        butler = self.create_empty_butler(run=self.default_run)

        # Create and register a DatasetType
        dimensions = butler.dimensions.conform(["instrument", "visit", "detector"])

        storageClass = self.storageClassFactory.getStorageClass("StructuredDataDictYaml")
        datasetTypeName = "metric"

        datasetType = self.addDatasetType(datasetTypeName, dimensions, storageClass, butler.registry)

        # Add needed Dimensions
        butler.registry.insertDimensionData("instrument", {"name": "DummyCamComp"})
        butler.registry.insertDimensionData(
            "physical_filter", {"instrument": "DummyCamComp", "name": "d-r", "band": "R"}
        )
        butler.registry.insertDimensionData("day_obs", {"instrument": "DummyCamComp", "id": 20250101})
        for detector in (1, 2):
            butler.registry.insertDimensionData(
                "detector", {"instrument": "DummyCamComp", "id": detector, "full_name": f"detector{detector}"}
            )

        butler.registry.insertDimensionData(
            "visit",
            {
                "instrument": "DummyCamComp",
                "id": 423,
                "name": "fourtwentythree",
                "physical_filter": "d-r",
                "day_obs": 20250101,
            },
            {
                "instrument": "DummyCamComp",
                "id": 424,
                "name": "fourtwentyfour",
                "physical_filter": "d-r",
                "day_obs": 20250101,
            },
        )

        formatter = doImportType("lsst.daf.butler.formatters.yaml.YamlFormatter")
        dataRoot = os.path.join(TESTDIR, "data", "basic")
        datasets = []
        # Test one DatasetRef with a run that exists, and the other with a run
        # that doesn't exist, to verify that run collections are created when
        # required.
        runs = {1: self.default_run, 2: "a/new/run"}
        for detector in (1, 2):
            detector_name = f"detector_{detector}"
            metricFile = os.path.join(dataRoot, f"{detector_name}.yaml")
            dataId = butler.registry.expandDataId(
                {"instrument": "DummyCamComp", "visit": 423, "detector": detector}
            )
            # Create a DatasetRef for ingest
            refIn = DatasetRef(datasetType, dataId, run=runs[detector])

            datasets.append(FileDataset(path=metricFile, refs=[refIn], formatter=formatter))

        butler.ingest(*datasets, transfer="copy")

        dataId1 = {"instrument": "DummyCamComp", "detector": 1, "visit": 423}
        dataId2 = {"instrument": "DummyCamComp", "detector": 2, "visit": 423}

        # Each detector's file held different content.
        metrics1 = butler.get(datasetTypeName, dataId1)
        metrics2 = butler.get(datasetTypeName, dataId2, collections="a/new/run")
        self.assertNotEqual(metrics1, metrics2)

        # Compare URIs: separate files, so the URIs must differ.
        uri1 = butler.getURI(datasetTypeName, dataId1)
        uri2 = butler.getURI(datasetTypeName, dataId2, collections="a/new/run")
        self.assertFalse(self.are_uris_equivalent(uri1, uri2), f"Cf. {uri1} with {uri2}")

        # Re-ingesting the same datasets raises an error with
        # skip_existing=False.
        with self.assertRaises(ConflictingDefinitionError):
            butler.ingest(*datasets, transfer="copy")
        # skip_existing=True makes it a no-op to re-ingest the same datasets.
        butler.ingest(*datasets, transfer="copy", skip_existing=True)

        # Now do a multi-dataset but single file ingest
        metricFile = os.path.join(dataRoot, "detectors.yaml")
        refs = []
        for detector in (1, 2):
            detector_name = f"detector_{detector}"
            dataId = butler.registry.expandDataId(
                {"instrument": "DummyCamComp", "visit": 424, "detector": detector}
            )
            # Create a DatasetRef for ingest
            refs.append(DatasetRef(datasetType, dataId, run=self.default_run))

        # Test "move" transfer to ensure that the files themselves
        # have disappeared following ingest.
        with ResourcePath.temporary_uri(suffix=".yaml") as tempFile:
            tempFile.transfer_from(ResourcePath(metricFile), transfer="copy")

            datasets = []
            datasets.append(FileDataset(path=tempFile, refs=refs, formatter=MultiDetectorFormatter))

            # For first ingest use copy.
            butler.ingest(*datasets, transfer="copy", record_validation_info=False)

            # Now try to ingest again in "execution butler" mode where
            # the registry entries exist but the datastore does not have
            # the files. We also need to strip the dimension records to ensure
            # that they will be re-added by the ingest.
            ref = datasets[0].refs[0]
            datasets[0].refs = [
                cast(
                    DatasetRef,
                    butler.find_dataset(ref.datasetType, data_id=ref.dataId, collections=ref.run),
                )
                for ref in datasets[0].refs
            ]
            all_refs = []
            for dataset in datasets:
                refs = []
                for ref in dataset.refs:
                    # Create a dict from the dataId to drop the records.
                    new_data_id = dict(ref.dataId.required)
                    new_ref = butler.find_dataset(ref.datasetType, new_data_id, collections=ref.run)
                    assert new_ref is not None
                    self.assertFalse(new_ref.dataId.hasRecords())
                    refs.append(new_ref)
                dataset.refs = refs
                all_refs.extend(dataset.refs)
            # Unstore from the datastore but keep the registry entries.
            butler.pruneDatasets(all_refs, disassociate=False, unstore=True, purge=False)

            # Use move mode to test that the file is deleted. Also
            # disable recording of file size.
            butler.ingest(*datasets, transfer="move", record_validation_info=False)

            # Check that every ref now has records.
            for dataset in datasets:
                for ref in dataset.refs:
                    self.assertTrue(ref.dataId.hasRecords())

            # Ensure that the file has disappeared.
            self.assertFalse(tempFile.exists())

            # Check that the datastore recorded no file size.
            # Not all datastores can support this.
            try:
                infos = butler._datastore.getStoredItemsInfo(datasets[0].refs[0])  # type: ignore[attr-defined]
                self.assertEqual(infos[0].file_size, -1)
            except AttributeError:
                pass

        dataId1 = {"instrument": "DummyCamComp", "detector": 1, "visit": 424}
        dataId2 = {"instrument": "DummyCamComp", "detector": 2, "visit": 424}

        # Both refs came from the same multi-detector file; the formatter
        # extracts per-detector content matching the single-file ingests.
        multi1 = butler.get(datasetTypeName, dataId1)
        multi2 = butler.get(datasetTypeName, dataId2)

        self.assertEqual(multi1, metrics1)
        self.assertEqual(multi2, metrics2)

        # Compare URIs: one shared file, so the URIs must be equivalent.
        uri1 = butler.getURI(datasetTypeName, dataId1)
        uri2 = butler.getURI(datasetTypeName, dataId2)
        self.assertTrue(self.are_uris_equivalent(uri1, uri2), f"Cf. {uri1} with {uri2}")

        # Test that removing one does not break the second
        # This line will issue a warning log message for a ChainedDatastore
        # that uses an InMemoryDatastore since in-memory can not ingest
        # files.
        butler.pruneDatasets([datasets[0].refs[0]], unstore=True, disassociate=False)
        self.assertFalse(butler.exists(datasetTypeName, dataId1))
        self.assertTrue(butler.exists(datasetTypeName, dataId2))
        multi2b = butler.get(datasetTypeName, dataId2)
        self.assertEqual(multi2, multi2b)

        # Ensure we can ingest 0 datasets
        datasets = []
        butler.ingest(*datasets)
1272 def testPickle(self) -> None:
1273 """Test pickle support."""
1274 butler = self.create_empty_butler(run=self.default_run)
1275 assert isinstance(butler, DirectButler), "Expect DirectButler in configuration"
1276 butlerOut = pickle.loads(pickle.dumps(butler))
1277 self.enterContext(butlerOut)
1278 self.assertIsInstance(butlerOut, Butler)
1279 self.assertEqual(butlerOut._config, butler._config)
1280 self.assertEqual(list(butlerOut.collections.defaults), list(butler.collections.defaults))
1281 self.assertEqual(butlerOut.run, butler.run)
    def testGetDatasetTypes(self) -> None:
        """Test dataset type registration, component expansion, wildcard
        queries, and configuration validation against registered types.
        """
        butler = self.create_empty_butler(run=self.default_run)
        dimensions = butler.dimensions.conform(["instrument", "visit", "physical_filter"])
        dimensionEntries: list[tuple[str, list[Mapping[str, Any]]]] = [
            (
                "instrument",
                [
                    {"instrument": "DummyCam"},
                    {"instrument": "DummyHSC"},
                    {"instrument": "DummyCamComp"},
                ],
            ),
            ("physical_filter", [{"instrument": "DummyCam", "name": "d-r", "band": "R"}]),
            ("day_obs", [{"instrument": "DummyCam", "id": 20250101}]),
            (
                "visit",
                [
                    {
                        "instrument": "DummyCam",
                        "id": 42,
                        "name": "fortytwo",
                        "physical_filter": "d-r",
                        "day_obs": 20250101,
                    }
                ],
            ),
        ]
        storageClass = self.storageClassFactory.getStorageClass("StructuredData")
        # Add needed Dimensions
        for element, data in dimensionEntries:
            butler.registry.insertDimensionData(element, *data)

        # When a DatasetType is added to the registry entries are not created
        # for components but querying them can return the components.
        datasetTypeNames = {"metric", "metric2", "metric4", "metric33", "pvi", "paramtest"}
        components = set()
        for datasetTypeName in datasetTypeNames:
            # Create and register a DatasetType
            self.addDatasetType(datasetTypeName, dimensions, storageClass, butler.registry)

            for componentName in storageClass.components:
                components.add(DatasetType.nameWithComponent(datasetTypeName, componentName))

        # Expanding each parent into its component dataset types should give
        # exactly the registered names plus the derived component names.
        fromRegistry: set[DatasetType] = set()
        for parent_dataset_type in butler.registry.queryDatasetTypes():
            fromRegistry.add(parent_dataset_type)
            fromRegistry.update(parent_dataset_type.makeAllComponentDatasetTypes())
        self.assertEqual({d.name for d in fromRegistry}, datasetTypeNames | components)

        # Query with wildcard.
        dataset_types = butler.registry.queryDatasetTypes("metric*")
        self.assertEqual(len(dataset_types), 4, f"Got: {dataset_types}")
        # but not regex.
        with self.assertRaises(DatasetTypeExpressionError):
            butler.registry.queryDatasetTypes(["pvi", re.compile("metric.*")])

        # Now that we have some dataset types registered, validate them
        butler.validateConfiguration(
            ignore=[
                "test_metric_comp",
                "metric3",
                "metric5",
                "calexp",
                "DummySC",
                "datasetType.component",
                "random_data",
                "random_data_2",
            ]
        )

        # Add a new datasetType that will fail template validation
        self.addDatasetType("test_metric_comp", dimensions, storageClass, butler.registry)
        if self.validationCanFail:
            with self.assertRaises(ValidationError):
                butler.validateConfiguration()

        # Rerun validation but with a subset of dataset type names
        butler.validateConfiguration(datasetTypeNames=["metric4"])

        # Rerun validation but ignore the bad datasetType
        butler.validateConfiguration(
            ignore=[
                "test_metric_comp",
                "metric3",
                "metric5",
                "calexp",
                "DummySC",
                "datasetType.component",
                "random_data",
                "random_data_2",
            ]
        )
    def testTransaction(self) -> None:
        """Test that a failed transaction rolls back both registry inserts
        and datastore writes.
        """
        butler = self.create_empty_butler(run=self.default_run)
        datasetTypeName = "test_metric"
        dimensions = butler.dimensions.conform(["instrument", "visit"])
        dimensionEntries: tuple[tuple[str, Mapping[str, Any]], ...] = (
            ("instrument", {"instrument": "DummyCam"}),
            ("physical_filter", {"instrument": "DummyCam", "name": "d-r", "band": "R"}),
            ("day_obs", {"instrument": "DummyCam", "id": 20250101}),
            (
                "visit",
                {
                    "instrument": "DummyCam",
                    "id": 42,
                    "name": "fortytwo",
                    "physical_filter": "d-r",
                    "day_obs": 20250101,
                },
            ),
        )
        storageClass = self.storageClassFactory.getStorageClass("StructuredData")
        metric = makeExampleMetrics()
        dataId = {"instrument": "DummyCam", "visit": 42}
        # Create and register a DatasetType
        datasetType = self.addDatasetType(datasetTypeName, dimensions, storageClass, butler.registry)
        with self.assertRaises(TransactionTestError):
            with butler.transaction():
                # Add needed Dimensions
                for args in dimensionEntries:
                    butler.registry.insertDimensionData(*args)
                # Store a dataset
                ref = butler.put(metric, datasetTypeName, dataId)
                self.assertIsInstance(ref, DatasetRef)
                # Test get of a ref.
                metricOut = butler.get(ref)
                self.assertEqual(metric, metricOut)
                # Test get
                metricOut = butler.get(datasetTypeName, dataId)
                self.assertEqual(metric, metricOut)
                # Check we can get components
                self.assertGetComponents(butler, ref, ("summary", "data", "output"), metric)
                # Deliberately abort: everything above must be undone.
                raise TransactionTestError("This should roll back the entire transaction")
        # After rollback the dimension records must be gone too.
        with self.assertRaises(DataIdValueError, msg=f"Check can't expand DataId {dataId}"):
            butler.registry.expandDataId(dataId)
        # Should raise LookupError for missing data ID value
        with self.assertRaises(LookupError, msg=f"Check can't get by {datasetTypeName} and {dataId}"):
            butler.get(datasetTypeName, dataId)
        # Also check explicitly if Dataset entry is missing
        self.assertIsNone(butler.find_dataset(datasetType, dataId, collections=butler.collections.defaults))
        # Direct retrieval should not find the file in the Datastore
        with self.assertRaises(FileNotFoundError, msg=f"Check {ref} can't be retrieved directly"):
            butler.get(ref)
1428 def testMakeRepo(self) -> None:
1429 """Test that we can write butler configuration to a new repository via
1430 the Butler.makeRepo interface and then instantiate a butler from the
1431 repo root.
1432 """
1433 # Do not run the test if we know this datastore configuration does
1434 # not support a file system root
1435 if self.fullConfigKey is None:
1436 return
1438 # create two separate directories
1439 root1 = tempfile.mkdtemp(dir=self.root)
1440 root2 = tempfile.mkdtemp(dir=self.root)
1442 self.assertFalse(Butler.has_repo_config(root1))
1443 butlerConfig = Butler.makeRepo(root1, config=Config(self.configFile))
1444 self.assertTrue(Butler.has_repo_config(root1))
1445 limited = Config(self.configFile)
1446 butler1 = Butler.from_config(butlerConfig)
1447 self.enterContext(butler1)
1448 assert isinstance(butler1, DirectButler), "Expect DirectButler in configuration"
1449 butlerConfig = Butler.makeRepo(root2, standalone=True, config=Config(self.configFile))
1450 full = Config(self.tmpConfigFile)
1451 butler2 = Butler.from_config(butlerConfig)
1452 self.enterContext(butler2)
1453 assert isinstance(butler2, DirectButler), "Expect DirectButler in configuration"
1454 # Butlers should have the same configuration regardless of whether
1455 # defaults were expanded.
1456 self.assertEqual(butler1._config, butler2._config)
1457 # Config files loaded directly should not be the same.
1458 self.assertNotEqual(limited, full)
1459 # Make sure "limited" doesn't have a few keys we know it should be
1460 # inheriting from defaults.
1461 self.assertIn(self.fullConfigKey, full)
1462 self.assertNotIn(self.fullConfigKey, limited)
1464 # Collections don't appear until something is put in them
1465 collections1 = set(butler1.registry.queryCollections())
1466 self.assertEqual(collections1, set())
1467 self.assertEqual(set(butler2.registry.queryCollections()), collections1)
1469 # Check that a config with no associated file name will not
1470 # work properly with relocatable Butler repo
1471 butlerConfig.configFile = None
1472 with self.assertRaises(ValueError):
1473 Butler.from_config(butlerConfig)
1475 with self.assertRaises(FileExistsError):
1476 Butler.makeRepo(self.root, standalone=True, config=Config(self.configFile), overwrite=False)
1478 def testStringification(self) -> None:
1479 butler = Butler.from_config(self.tmpConfigFile, run=self.default_run)
1480 self.enterContext(butler)
1481 butlerStr = str(butler)
1483 if self.datastoreStr is not None:
1484 for testStr in self.datastoreStr:
1485 self.assertIn(testStr, butlerStr)
1486 if self.registryStr is not None:
1487 self.assertIn(self.registryStr, butlerStr)
1489 datastoreName = butler._datastore.name
1490 if self.datastoreName is not None:
1491 for testStr in self.datastoreName:
1492 self.assertIn(testStr, datastoreName)
    def testButlerRewriteDataId(self) -> None:
        """Test that dataIds can be rewritten based on dimension records."""
        butler = self.create_empty_butler(run=self.default_run)

        storageClass = self.storageClassFactory.getStorageClass("StructuredDataDict")
        datasetTypeName = "random_data"

        # Create dimension records.
        butler.registry.insertDimensionData("instrument", {"name": "DummyCamComp"})
        butler.registry.insertDimensionData(
            "physical_filter", {"instrument": "DummyCamComp", "name": "d-r", "band": "R"}
        )
        butler.registry.insertDimensionData(
            "detector", {"instrument": "DummyCamComp", "id": 1, "full_name": "det1"}
        )

        dimensions = butler.dimensions.conform(["instrument", "exposure"])
        datasetType = DatasetType(datasetTypeName, dimensions, storageClass)
        butler.registry.registerDatasetType(datasetType)

        n_exposures = 5
        dayobs = 20210530

        # Create records for multiple day_obs but same seq_num to test that
        # we are constraining gets properly when day_obs/seq_num is used
        # for an exposure. Second day is year in future but is not used.
        for day_obs in (dayobs, dayobs + 1_00_00):
            butler.registry.insertDimensionData("day_obs", {"instrument": "DummyCamComp", "id": day_obs})

            for i in range(n_exposures):
                group_name = f"group_{day_obs}_{i}"
                butler.registry.insertDimensionData(
                    "group", {"instrument": "DummyCamComp", "name": group_name}
                )
                butler.registry.insertDimensionData(
                    "exposure",
                    {
                        "instrument": "DummyCamComp",
                        "id": day_obs + i,
                        "obs_id": f"exp_{day_obs}_{i}",
                        "seq_num": i,
                        "day_obs": day_obs,
                        "physical_filter": "d-r",
                        "group": group_name,
                    },
                )

        # Write some data.
        for i in range(n_exposures):
            metric = {"something": i, "other": "metric", "list": [2 * x for x in range(i)]}

            # Use the seq_num for the put to test rewriting.
            dataId = {"seq_num": i, "day_obs": dayobs, "instrument": "DummyCamComp", "physical_filter": "d-r"}
            ref = butler.put(metric, datasetTypeName, dataId=dataId)

            # Check that the exposure is correct in the dataId
            self.assertEqual(ref.dataId["exposure"], dayobs + i)

            # and check that we can get the dataset back with the same dataId
            new_metric = butler.get(datasetTypeName, dataId=dataId)
            self.assertEqual(new_metric, metric)

        # Check that we can find the datasets using the day_obs or the
        # exposure.day_obs.
        datasets_1 = list(
            butler.registry.queryDatasets(
                datasetType,
                collections=self.default_run,
                where="day_obs = :dayObs AND instrument = :instr",
                bind={"dayObs": dayobs, "instr": "DummyCamComp"},
            )
        )
        datasets_2 = list(
            butler.registry.queryDatasets(
                datasetType,
                collections=self.default_run,
                where="exposure.day_obs = :dayObs AND instrument = :instr",
                bind={"dayObs": dayobs, "instr": "DummyCamComp"},
            )
        )
        self.assertEqual(datasets_1, datasets_2)
1576 def testGetDatasetCollectionCaching(self):
1577 # Prior to DM-41117, there was a bug where get_dataset would throw
1578 # MissingCollectionError if you tried to fetch a dataset that was added
1579 # after the collection cache was last updated.
1580 reader_butler, datasetType = self.create_butler(self.default_run, "int", "datasettypename")
1581 writer_butler = self.create_empty_butler(writeable=True, run="new_run")
1582 dataId = {"instrument": "DummyCamComp", "visit": 423}
1583 put_ref = writer_butler.put(123, datasetType, dataId)
1584 get_ref = reader_butler.get_dataset(put_ref.id)
1585 self.assertEqual(get_ref.id, put_ref.id)
1586 # Also works when looking up via a hexadecimal string instead of a UUID
1587 # instance.
1588 hex_ref = reader_butler.get_dataset(put_ref.id.hex)
1589 self.assertEqual(hex_ref.id, put_ref.id)
1591 def testCollectionChainRedefine(self):
1592 butler = self._setup_to_test_collection_chain()
1594 butler.collections.redefine_chain("chain", "a")
1595 self._check_chain(butler, ["a"])
1597 # Duplicates are removed from the list of children
1598 butler.collections.redefine_chain("chain", ["c", "b", "c"])
1599 self._check_chain(butler, ["c", "b"])
1601 # Empty list clears the chain
1602 butler.collections.redefine_chain("chain", [])
1603 self._check_chain(butler, [])
1605 self._test_common_chain_functionality(butler, butler.collections.redefine_chain)
1607 def testCollectionChainPrepend(self):
1608 butler = self._setup_to_test_collection_chain()
1610 # Duplicates are removed from the list of children
1611 butler.collections.prepend_chain("chain", ["c", "b", "c"])
1612 self._check_chain(butler, ["c", "b"])
1614 # Prepend goes on the front of existing chain
1615 butler.collections.prepend_chain("chain", ["a"])
1616 self._check_chain(butler, ["a", "c", "b"])
1618 # Empty prepend does nothing
1619 butler.collections.prepend_chain("chain", [])
1620 self._check_chain(butler, ["a", "c", "b"])
1622 # Prepending children that already exist in the chain removes them from
1623 # their current position.
1624 butler.collections.prepend_chain("chain", ["d", "b", "c"])
1625 self._check_chain(butler, ["d", "b", "c", "a"])
1627 self._test_common_chain_functionality(butler, butler.collections.prepend_chain)
1629 def testCollectionChainExtend(self):
1630 butler = self._setup_to_test_collection_chain()
1632 # Duplicates are removed from the list of children
1633 butler.collections.extend_chain("chain", ["c", "b", "c"])
1634 self._check_chain(butler, ["c", "b"])
1636 # Extend goes on the end of existing chain
1637 butler.collections.extend_chain("chain", ["a"])
1638 self._check_chain(butler, ["c", "b", "a"])
1640 # Empty extend does nothing
1641 butler.collections.extend_chain("chain", [])
1642 self._check_chain(butler, ["c", "b", "a"])
1644 # Extending children that already exist in the chain removes them from
1645 # their current position.
1646 butler.collections.extend_chain("chain", ["d", "b", "c"])
1647 self._check_chain(butler, ["a", "d", "b", "c"])
1649 self._test_common_chain_functionality(butler, butler.collections.extend_chain)
1651 def testCollectionChainRemove(self) -> None:
1652 butler = self._setup_to_test_collection_chain()
1654 butler.collections.redefine_chain("chain", ["a", "b", "c", "d"])
1656 butler.collections.remove_from_chain("chain", "c")
1657 self._check_chain(butler, ["a", "b", "d"])
1659 # Duplicates are allowed in the list of children
1660 butler.collections.remove_from_chain("chain", ["b", "b", "a"])
1661 self._check_chain(butler, ["d"])
1663 # Empty remove does nothing
1664 butler.collections.remove_from_chain("chain", [])
1665 self._check_chain(butler, ["d"])
1667 # Removing children that aren't in the chain does nothing
1668 butler.collections.remove_from_chain("chain", ["a", "chain"])
1669 self._check_chain(butler, ["d"])
1671 self._test_common_chain_functionality(
1672 butler, butler.collections.remove_from_chain, skip_cycle_check=True
1673 )
1675 def _setup_to_test_collection_chain(self) -> Butler:
1676 butler = self.create_empty_butler(writeable=True)
1678 butler.collections.register("chain", CollectionType.CHAINED)
1680 runs = ["a", "b", "c", "d"]
1681 for run in runs:
1682 butler.collections.register(run)
1684 butler.collections.register("staticchain", CollectionType.CHAINED)
1685 butler.collections.redefine_chain("staticchain", ["a", "b"])
1687 return butler
1689 def _check_chain(self, butler: Butler, expected: list[str]) -> None:
1690 children = butler.collections.get_info("chain").children
1691 self.assertEqual(expected, list(children))
    def _test_common_chain_functionality(
        self, butler: Butler, func: Callable[[str, str | list[str]], Any], *, skip_cycle_check: bool = False
    ) -> None:
        """Exercise error handling common to all chain-modification methods.

        Parameters
        ----------
        butler : `Butler`
            Butler created by `_setup_to_test_collection_chain`.
        func : `~collections.abc.Callable`
            Chain-modification method under test, called as
            ``func(parent, children)``.
        skip_cycle_check : `bool`, optional
            If `True`, skip the collection-cycle test (removal operations
            cannot introduce cycles).
        """
        # Missing parent collection
        with self.assertRaises(MissingCollectionError):
            func("doesnotexist", [])
        # Missing child collection
        with self.assertRaises(MissingCollectionError):
            func("chain", ["doesnotexist"])
        # Forbid operations on non-chained collections ("d" was registered
        # without CollectionType.CHAINED).
        with self.assertRaises(CollectionTypeError):
            func("d", ["a"])

        # Prevent collection cycles: make "chain" a child of "chain2", then
        # attempt to close the loop the other way.
        if not skip_cycle_check:
            butler.collections.register("chain2", CollectionType.CHAINED)
            func("chain2", "chain")
            with self.assertRaises(CollectionCycleError):
                func("chain", "chain2")

        # Make sure none of the earlier operations interfered with unrelated
        # chains.
        self.assertEqual(["a", "b"], list(butler.collections.get_info("staticchain").children))

        # Chain modification must be rejected while caching is active.
        with butler._caching_context():
            with self.assertRaisesRegex(RuntimeError, "Chained collection modification not permitted"):
                func("chain", "a")
1721 def test_transfer_dimension_records_from(self) -> None:
1722 source_butler = self.create_empty_butler(writeable=True)
1723 source_butler.import_(filename=_get_test_data_path("lsstcam-subset.yaml"))
1725 visit_id = 2025120200439
1726 exposure_id = visit_id
1727 target_butler = self.enterContext(create_populated_sqlite_registry())
1728 target_butler.transfer_dimension_records_from(
1729 source_butler,
1730 [
1731 # Should trigger the lookup of visit and all its associated
1732 # "populated_by" records (visit_detector_region,
1733 # visit_definition, etc.)
1734 DataCoordinate.standardize(
1735 {"instrument": "LSSTCam", "visit": visit_id, "detector": 10},
1736 universe=source_butler.dimensions,
1737 ),
1738 # Shouldn't add any records to the lookup.
1739 DataCoordinate.make_empty(source_butler.dimensions),
1740 ],
1741 )
1743 def _fetch_record(dimension: str) -> DimensionRecord:
1744 records = target_butler.query_dimension_records(dimension)
1745 self.assertEqual(len(records), 1)
1746 return records[0]
1748 visit = _fetch_record("visit")
1749 self.assertEqual(visit.id, visit_id)
1750 self.assertEqual(visit.day_obs, 20251202)
1751 self.assertEqual(visit.target_name, "lowdust")
1752 self.assertEqual(visit.seq_num, 439)
1753 original_visit = source_butler.query_dimension_records("visit", instrument="LSSTCam", visit=visit_id)[
1754 0
1755 ]
1756 self.assertEqual(visit.region, original_visit.region)
1757 self.assertEqual(visit.timespan, original_visit.timespan)
1759 visit_detector_region = _fetch_record("visit_detector_region")
1760 self.assertEqual(visit_detector_region.instrument, "LSSTCam")
1761 self.assertEqual(visit_detector_region.detector, 10)
1762 self.assertEqual(visit_detector_region.visit, visit_id)
1763 original_visit_detector_region = source_butler.query_dimension_records(
1764 "visit_detector_region", instrument="LSSTCam", visit=visit_id, detector=10
1765 )[0]
1766 self.assertEqual(visit_detector_region.region, original_visit_detector_region.region)
1768 visit_definition = _fetch_record("visit_definition")
1769 self.assertEqual(visit_definition.instrument, "LSSTCam")
1770 self.assertEqual(visit_definition.exposure, 2025120200439)
1771 self.assertEqual(visit_definition.visit, visit_id)
1773 # The matching exposure record should have been pulled in via
1774 # visit -> visit_definition.
1775 exposure = _fetch_record("exposure")
1776 self.assertEqual(exposure.instrument, "LSSTCam")
1777 self.assertEqual(exposure.id, 2025120200439)
1778 self.assertEqual(exposure.obs_id, "MC_O_20251202_000439")
1779 original_exposure = source_butler.query_dimension_records(
1780 "exposure", instrument="LSSTCam", exposure=exposure_id
1781 )[0]
1782 self.assertEqual(exposure.timespan, original_exposure.timespan)
1784 group = _fetch_record("group")
1785 self.assertEqual(group.instrument, "LSSTCam")
1786 self.assertEqual(group.name, "2025-12-03T07:58:10.858")
1788 visit_system_memberships = target_butler.query_dimension_records("visit_system_membership")
1789 visit_system_memberships.sort(key=lambda record: record.visit_system)
1790 self.assertEqual(len(visit_system_memberships), 2)
1791 self.assertEqual(visit_system_memberships[0].visit_system, 0)
1792 self.assertEqual(visit_system_memberships[1].visit_system, 2)
1793 self.assertEqual(visit_system_memberships[0].visit, visit_id)
1794 self.assertEqual(visit_system_memberships[1].visit, visit_id)
1796 visit_systems = target_butler.query_dimension_records("visit_system")
1797 visit_systems.sort(key=lambda record: record.id)
1798 visit_system_memberships.sort(key=lambda record: record.visit_system)
1799 self.assertEqual(visit_systems[0].id, 0)
1800 self.assertEqual(visit_systems[1].id, 2)
1801 self.assertEqual(visit_systems[0].name, "one-to-one")
1802 self.assertEqual(visit_systems[1].name, "by-seq-start-end")
class FileDatastoreButlerTests(ButlerTests):
    """Common tests and specialization of ButlerTests for butlers backed
    by datastores that inherit from FileDatastore.
    """

    # Subclasses set this to False when the datastore cannot operate in
    # trust mode (serving files it has no records for).
    trustModeSupported = True

    def checkFileExists(self, root: str | ResourcePath, relpath: str | ResourcePath) -> bool:
        """Check if file exists at a given path (relative to root).

        Test testPutTemplates verifies actual physical existance of the files
        in the requested location.
        """
        uri = ResourcePath(root, forceDirectory=True)
        return uri.join(relpath).exists()

    def testPutTemplates(self) -> None:
        """Test that file templates control the paths produced by put."""
        storageClass = self.storageClassFactory.getStorageClass("StructuredDataNoComponents")
        butler = self.create_empty_butler(run=self.default_run)

        # Add needed Dimensions
        butler.registry.insertDimensionData("instrument", {"name": "DummyCamComp"})
        butler.registry.insertDimensionData(
            "physical_filter", {"instrument": "DummyCamComp", "name": "d-r", "band": "R"}
        )
        butler.registry.insertDimensionData("day_obs", {"instrument": "DummyCamComp", "id": 20250101})
        butler.registry.insertDimensionData(
            "visit",
            {
                "instrument": "DummyCamComp",
                "id": 423,
                "name": "v423",
                "physical_filter": "d-r",
                "day_obs": 20250101,
            },
        )
        butler.registry.insertDimensionData(
            "visit",
            {
                "instrument": "DummyCamComp",
                "id": 425,
                "name": "v425",
                "physical_filter": "d-r",
                "day_obs": 20250101,
            },
        )

        # Create and store a dataset
        metric = makeExampleMetrics()

        # Create two almost-identical DatasetTypes (both will use default
        # template)
        dimensions = butler.dimensions.conform(["instrument", "visit"])
        butler.registry.registerDatasetType(DatasetType("metric1", dimensions, storageClass))
        butler.registry.registerDatasetType(DatasetType("metric2", dimensions, storageClass))
        butler.registry.registerDatasetType(DatasetType("metric3", dimensions, storageClass))

        dataId1 = {"instrument": "DummyCamComp", "visit": 423}
        dataId2 = {"instrument": "DummyCamComp", "visit": 423, "physical_filter": "d-r"}

        # Put with exactly the data ID keys needed
        ref = butler.put(metric, "metric1", dataId1)
        uri = butler.getURI(ref)
        self.assertTrue(uri.exists())
        self.assertTrue(
            uri.unquoted_path.endswith(f"{self.default_run}/metric1/??#?/d-r/DummyCamComp_423.pickle")
        )

        # Check the template based on dimensions
        if hasattr(butler._datastore, "templates"):
            butler._datastore.templates.validateTemplates([ref])

        # Put with extra data ID keys (physical_filter is an optional
        # dependency); should not change template (at least the way we're
        # defining them to behave now; the important thing is that they
        # must be consistent).
        ref = butler.put(metric, "metric2", dataId2)
        uri = butler.getURI(ref)
        self.assertTrue(uri.exists())
        self.assertTrue(
            uri.unquoted_path.endswith(f"{self.default_run}/metric2/d-r/DummyCamComp_v423.pickle")
        )

        # Check the template based on dimensions
        if hasattr(butler._datastore, "templates"):
            butler._datastore.templates.validateTemplates([ref])

        # Use a template that has a typo in dimension record metadata.
        # Easier to test with a butler that has a ref with records attached.
        template = FileTemplate("a/{visit.name}/{id}_{visit.namex:?}.fits")
        with self.assertLogs("lsst.daf.butler.datastore.file_templates", "INFO"):
            path = template.format(ref)
        self.assertEqual(path, f"a/v423/{ref.id}_fits")

        template = FileTemplate("a/{visit.name}/{id}_{visit.namex}.fits")
        with self.assertRaises(KeyError):
            with self.assertLogs("lsst.daf.butler.datastore.file_templates", "INFO"):
                template.format(ref)

        # Now use a file template that will not result in unique filenames
        with self.assertRaises(FileTemplateValidationError):
            butler.put(metric, "metric3", dataId1)

    def testImportExport(self) -> None:
        # Run put/get tests just to create and populate a repo.
        storageClass = self.storageClassFactory.getStorageClass("StructuredDataNoComponents")
        self.runImportExportTest(storageClass)

    @unittest.expectedFailure
    def testImportExportVirtualComposite(self) -> None:
        # Run put/get tests just to create and populate a repo.
        storageClass = self.storageClassFactory.getStorageClass("StructuredComposite")
        self.runImportExportTest(storageClass)

    def runImportExportTest(self, storageClass: StorageClass) -> None:
        """Test exporting and importing.

        This test does an export to a temp directory and an import back
        into a new temp directory repo. It does not assume a posix datastore.
        """
        exportButler = self.runPutGetTest(storageClass, "test_metric")

        # Test that we must have a file extension.
        with self.assertRaises(ValueError):
            with exportButler.export(filename="dump", directory=".") as export:
                pass

        # Test that unknown format is not allowed.
        with self.assertRaises(ValueError):
            with exportButler.export(filename="dump.fits", directory=".") as export:
                pass

        # Test that the repo actually has at least one dataset.
        datasets = list(exportButler.registry.queryDatasets(..., collections=...))
        self.assertGreater(len(datasets), 0)
        # Add a DimensionRecord that's unused by those datasets.
        skymapRecord = {"name": "example_skymap", "hash": (50).to_bytes(8, byteorder="little")}
        exportButler.registry.insertDimensionData("skymap", skymapRecord)
        # Export and then import datasets.
        with safeTestTempDir(TESTDIR) as exportDir:
            exportFile = os.path.join(exportDir, "exports.yaml")
            with exportButler.export(filename=exportFile, directory=exportDir, transfer="auto") as export:
                export.saveDatasets(datasets)
                # Export the same datasets again. This should quietly do
                # nothing because of internal deduplication, and it shouldn't
                # complain about being asked to export the "htm7" elements even
                # though there aren't any in these datasets or in the database.
                export.saveDatasets(datasets, elements=["htm7"])
                # Save one of the data IDs again; this should be harmless
                # because of internal deduplication.
                export.saveDataIds([datasets[0].dataId])
                # Save some dimension records directly.
                export.saveDimensionData("skymap", [skymapRecord])
            self.assertTrue(os.path.exists(exportFile))
            with safeTestTempDir(TESTDIR) as importDir:
                # We always want this to be a local posix butler
                Butler.makeRepo(importDir, config=Config(os.path.join(TESTDIR, "config/basic/butler.yaml")))
                # Calling script.butlerImport tests the implementation of the
                # butler command line interface "import" subcommand. Functions
                # in the script folder are generally considered protected and
                # should not be used as public api.
                with open(exportFile) as f:
                    script.butlerImport(
                        importDir,
                        export_file=f,
                        directory=exportDir,
                        transfer="auto",
                        skip_dimensions=None,
                    )
                importButler = Butler.from_config(importDir, run=self.default_run)
                self.enterContext(importButler)
                for ref in datasets:
                    with self.subTest(ref=repr(ref)):
                        # Test for existence by passing in the DatasetType and
                        # data ID separately, to avoid lookup by dataset_id.
                        self.assertTrue(importButler.exists(ref.datasetType, ref.dataId))
                self.assertEqual(
                    list(importButler.registry.queryDimensionRecords("skymap")),
                    [importButler.dimensions["skymap"].RecordClass(**skymapRecord)],
                )

    def testRemoveRuns(self) -> None:
        """Test removal of RUN collections and their datasets/artifacts."""
        storageClass = self.storageClassFactory.getStorageClass("StructuredDataNoComponents")
        butler = self.create_empty_butler(writeable=True)
        # Load registry data with dimensions to hang datasets off of.
        butler.import_(filename=ResourcePath("resource://lsst.daf.butler/tests/registry_data/base.yaml"))
        # Add some RUN-type collection.
        run1 = "run1"
        butler.collections.register(run1)
        run2 = "run2"
        butler.collections.register(run2)
        # put a dataset in each
        metric = makeExampleMetrics()
        dimensions = butler.dimensions.conform(["instrument", "physical_filter"])
        datasetType = self.addDatasetType(
            "prune_collections_test_dataset", dimensions, storageClass, butler.registry
        )
        ref1 = butler.put(metric, datasetType, {"instrument": "Cam1", "physical_filter": "Cam1-G"}, run=run1)
        ref2 = butler.put(metric, datasetType, {"instrument": "Cam1", "physical_filter": "Cam1-G"}, run=run2)
        uri1 = butler.getURI(ref1)
        uri2 = butler.getURI(ref2)

        # Put one of the runs in a chain.
        butler.collections.register("Chain", CollectionType.CHAINED)
        butler.collections.extend_chain("Chain", run1)

        with self.assertRaises(OrphanedRecordError):
            butler.registry.removeDatasetType(datasetType.name)

        # Remove a non-run.
        with self.assertRaises(TypeError):
            butler.removeRuns(["Chain"])

        # Remove without unlinking from chain should fail.
        with self.assertRaises(IntegrityError):
            butler.removeRuns([run1])

        # Remove from both runs. No longer use unstore parameter since it
        # always purges.
        butler.removeRuns([run1, run2], unlink_from_chains=True)

        # Should be nothing in registry for either one, and datastore should
        # not think either exists.
        with self.assertRaises(MissingCollectionError):
            butler.collections.get_info(run1)
        with self.assertRaises(MissingCollectionError):
            # Previously this re-checked run1 (copy-paste bug); run2 must be
            # verified as removed too.
            butler.collections.get_info(run2)
        self.assertFalse(butler.stored(ref1))
        self.assertFalse(butler.stored(ref2))
        # We always unstore so both URIs should be gone.
        self.assertFalse(uri1.exists())
        self.assertFalse(uri2.exists())

        # Now that the collections have been pruned we can remove the
        # dataset type
        butler.registry.removeDatasetType(datasetType.name)

        with self.assertLogs("lsst.daf.butler.registry", "INFO") as cm:
            butler.registry.removeDatasetType(("test*", "test*"))
        self.assertIn("not defined", "\n".join(cm.output))

    def remove_dataset_out_of_band(self, butler: Butler, ref: DatasetRef) -> None:
        """Simulate an external actor removing a file outside of Butler's
        knowledge.

        Subclasses may override to handle more complicated datastore
        configurations.
        """
        uri = butler.getURI(ref)
        uri.remove()
        datastore = cast(FileDatastore, butler._datastore)
        datastore.cacheManager.remove_from_cache(ref)

    def testPruneDatasets(self) -> None:
        """Test pruning datasets and the resulting existence states."""
        storageClass = self.storageClassFactory.getStorageClass("StructuredDataNoComponents")
        butler = self.create_empty_butler(writeable=True)
        # Load registry data with dimensions to hang datasets off of.
        butler.import_(filename=_get_test_data_path("base.yaml"))
        # Add some RUN-type collections.
        run1 = "run1"
        butler.collections.register(run1)
        run2 = "run2"
        butler.collections.register(run2)
        # put some datasets. ref1 and ref2 have the same data ID, and are in
        # different runs. ref3 has a different data ID.
        metric = makeExampleMetrics()
        dimensions = butler.dimensions.conform(["instrument", "physical_filter"])
        datasetType = self.addDatasetType(
            "prune_collections_test_dataset", dimensions, storageClass, butler.registry
        )
        ref1 = butler.put(metric, datasetType, {"instrument": "Cam1", "physical_filter": "Cam1-G"}, run=run1)
        ref2 = butler.put(metric, datasetType, {"instrument": "Cam1", "physical_filter": "Cam1-G"}, run=run2)
        ref3 = butler.put(metric, datasetType, {"instrument": "Cam1", "physical_filter": "Cam1-R1"}, run=run1)

        many_stored = butler.stored_many([ref1, ref2, ref3])
        for ref, stored in many_stored.items():
            self.assertTrue(stored, f"Ref {ref} should be stored")

        many_exists = butler._exists_many([ref1, ref2, ref3])
        for ref, exists in many_exists.items():
            self.assertTrue(exists, f"Checking ref {ref} exists.")
            self.assertEqual(exists, DatasetExistence.VERIFIED, f"Ref {ref} should be stored")

        # Simple prune.
        butler.pruneDatasets([ref1, ref2, ref3], purge=True, unstore=True)
        self.assertFalse(butler.exists(ref1.datasetType, ref1.dataId, collections=run1))

        many_stored = butler.stored_many([ref1, ref2, ref3])
        for ref, stored in many_stored.items():
            self.assertFalse(stored, f"Ref {ref} should not be stored")

        many_exists = butler._exists_many([ref1, ref2, ref3])
        for ref, exists in many_exists.items():
            self.assertEqual(exists, DatasetExistence.UNRECOGNIZED, f"Ref {ref} should not be stored")

        # Put data back.
        ref1_new = butler.put(metric, ref1)
        self.assertEqual(ref1_new, ref1)  # Reuses original ID.
        ref2 = butler.put(metric, ref2)

        many_stored = butler.stored_many([ref1, ref2, ref3])
        self.assertTrue(many_stored[ref1])
        self.assertTrue(many_stored[ref2])
        self.assertFalse(many_stored[ref3])

        ref3 = butler.put(metric, ref3)

        many_exists = butler._exists_many([ref1, ref2, ref3])
        for ref, exists in many_exists.items():
            # Message fixed: everything was just re-put, so these refs are
            # expected to be stored (the old message said the opposite).
            self.assertTrue(exists, f"Ref {ref} should be stored")

        # Clear out the datasets from registry and start again.
        refs = [ref1, ref2, ref3]
        butler.pruneDatasets(refs, purge=True, unstore=True)
        for ref in refs:
            butler.put(metric, ref)

        # Confirm we can retrieve deferred.
        dref1 = butler.getDeferred(ref1)  # known and exists
        metric1 = dref1.get()
        self.assertEqual(metric1, metric)

        # Test different forms of file availability.
        # Need to be in a state where:
        # - one ref just has registry record.
        # - one ref has a missing file but a datastore record.
        # - one ref has a missing datastore record but file is there.
        # - one ref does not exist anywhere.
        # Do not need to test a ref that has everything since that is tested
        # above.
        ref0 = DatasetRef(
            datasetType,
            DataCoordinate.standardize(
                {"instrument": "Cam1", "physical_filter": "Cam1-G"}, universe=butler.dimensions
            ),
            run=run1,
        )

        # Delete from datastore and retain in Registry.
        butler.pruneDatasets([ref1], purge=False, unstore=True, disassociate=False)

        # File has been removed.
        self.remove_dataset_out_of_band(butler, ref2)

        # Datastore has lost track.
        butler._datastore.forget([ref3])

        # First test with a standard butler.
        exists_many = butler._exists_many([ref0, ref1, ref2, ref3], full_check=True)
        self.assertEqual(exists_many[ref0], DatasetExistence.UNRECOGNIZED)
        self.assertEqual(exists_many[ref1], DatasetExistence.RECORDED)
        self.assertEqual(exists_many[ref2], DatasetExistence.RECORDED | DatasetExistence.DATASTORE)
        self.assertEqual(exists_many[ref3], DatasetExistence.RECORDED)

        exists_many = butler._exists_many([ref0, ref1, ref2, ref3], full_check=False)
        self.assertEqual(exists_many[ref0], DatasetExistence.UNRECOGNIZED)
        self.assertEqual(exists_many[ref1], DatasetExistence.RECORDED | DatasetExistence._ASSUMED)
        self.assertEqual(exists_many[ref2], DatasetExistence.KNOWN)
        self.assertEqual(exists_many[ref3], DatasetExistence.RECORDED | DatasetExistence._ASSUMED)
        self.assertTrue(exists_many[ref2])

        # Check that per-ref query gives the same answer as many query.
        for ref, exists in exists_many.items():
            self.assertEqual(butler.exists(ref, full_check=False), exists)

        # Get deferred checks for existence before it allows it to be
        # retrieved.
        with self.assertRaises(LookupError):
            butler.getDeferred(ref3)  # not known, file exists
        dref2 = butler.getDeferred(ref2)  # known but file missing
        with self.assertRaises(FileNotFoundError):
            dref2.get()

        # Test again with a trusting butler.
        if self.trustModeSupported:
            butler._datastore.trustGetRequest = True
            exists_many = butler._exists_many([ref0, ref1, ref2, ref3], full_check=True)
            self.assertEqual(exists_many[ref0], DatasetExistence.UNRECOGNIZED)
            self.assertEqual(exists_many[ref1], DatasetExistence.RECORDED)
            self.assertEqual(exists_many[ref2], DatasetExistence.RECORDED | DatasetExistence.DATASTORE)
            self.assertEqual(exists_many[ref3], DatasetExistence.RECORDED | DatasetExistence._ARTIFACT)

            # When trusting we can get a deferred dataset handle that is not
            # known but does exist.
            dref3 = butler.getDeferred(ref3)
            metric3 = dref3.get()
            self.assertEqual(metric3, metric)

            # Check that per-ref query gives the same answer as many query.
            for ref, exists in exists_many.items():
                self.assertEqual(butler.exists(ref, full_check=True), exists)

            # Create a ref that surprisingly has the UUID of an existing ref
            # but is not the same.
            ref_bad = DatasetRef(datasetType, dataId=ref3.dataId, run=ref3.run, id=ref2.id)
            with self.assertRaises(ValueError):
                butler.exists(ref_bad)

            # Create a ref that has a compatible storage class.
            ref_compat = ref2.overrideStorageClass("StructuredDataDict")
            exists = butler.exists(ref_compat)
            self.assertEqual(exists, exists_many[ref2])

        # Remove everything and start from scratch.
        butler._datastore.trustGetRequest = False
        butler.pruneDatasets(refs, purge=True, unstore=True)
        for ref in refs:
            butler.put(metric, ref)

        # These tests mess directly with the trash table and can leave the
        # datastore in an odd state. Do them at the end.
        # Check that in normal mode, deleting the record will lead to
        # trash not touching the file.
        uri1 = butler.getURI(ref1)
        butler._datastore.bridge.moveToTrash(
            [ref1], transaction=None
        )  # Update the dataset_location table
        butler._datastore.forget([ref1])
        butler._datastore.trash(ref1)
        butler._datastore.emptyTrash()
        self.assertTrue(uri1.exists())
        uri1.remove()  # Clean it up.

        # Simulate execution butler setup by deleting the datastore
        # record but keeping the file around and trusting.
        butler._datastore.trustGetRequest = True
        uris = butler.get_many_uris([ref2, ref3])
        uri2 = uris[ref2].primaryURI
        uri3 = uris[ref3].primaryURI
        self.assertTrue(uri2.exists())
        self.assertTrue(uri3.exists())

        # Remove the datastore record.
        butler._datastore.bridge.moveToTrash(
            [ref2], transaction=None
        )  # Update the dataset_location table
        butler._datastore.forget([ref2])
        self.assertTrue(uri2.exists())
        butler._datastore.trash([ref2, ref3])
        # Immediate removal for ref2 file
        self.assertFalse(uri2.exists())
        # But ref3 has to wait for the empty.
        self.assertTrue(uri3.exists())
        butler._datastore.emptyTrash()
        self.assertFalse(uri3.exists())

        # Clear out the datasets from registry.
        butler.pruneDatasets([ref1, ref2, ref3], purge=True, unstore=True)

    def test_butler_metrics(self):
        """Test that metrics are collected."""
        run = "test_run"
        metrics = ButlerMetrics()
        butler, datasetType = self.create_butler(
            run, "MetricsExampleModelProvenance", "prov_metric", metrics=metrics
        )
        data = MetricsExampleModel(
            summary={"AM1": 5.2, "AM2": 30.6},
            output={"a": [1, 2, 3], "b": {"blue": 5, "red": "green"}},
            data=[563, 234, 456.7, 752, 8, 9, 27],
        )

        data_ref = butler.put(data, datasetType, visit=424, instrument="DummyCamComp")
        butler.get(data_ref)
        butler.get(data_ref)
        self.assertEqual(metrics.n_get, 2)
        self.assertGreater(metrics.time_in_get, 0.0)
        self.assertEqual(metrics.n_put, 1)
        self.assertGreater(metrics.time_in_put, 0.0)

        deferred = butler.getDeferred(data_ref)
        deferred.get()
        self.assertEqual(metrics.n_get, 3)

        with butler.record_metrics() as new:
            data_ref_2 = butler.put(data, datasetType, visit=425, instrument="DummyCamComp")
            butler.get(data_ref)

            butler.pruneDatasets([data_ref, data_ref_2], purge=True, unstore=True)
            with ResourcePath.temporary_uri(suffix=".json") as tmpFile:
                tmpFile.write(data.model_dump_json().encode())
                refs = [
                    DatasetRef(datasetType, data_ref_2.dataId, run),
                    DatasetRef(datasetType, data_ref.dataId, run),
                ]
                datasets = [FileDataset(path=tmpFile, refs=refs)]
                butler.ingest(*datasets, transfer="copy")

        self.assertEqual(new.n_get, 1)
        self.assertEqual(new.n_put, 1)
        self.assertEqual(new.n_ingest, 2)
2298class PosixDatastoreButlerTestCase(FileDatastoreButlerTests, unittest.TestCase):
2299 """PosixDatastore specialization of a butler"""
2301 configFile = os.path.join(TESTDIR, "config/basic/butler.yaml")
2302 fullConfigKey: str | None = ".datastore.formatters"
2303 validationCanFail = True
2304 datastoreStr = ["/tmp"]
2305 datastoreName = [f"FileDatastore@{BUTLER_ROOT_TAG}"]
2306 registryStr = "/gen3.sqlite3"
2308 def testPathConstructor(self) -> None:
2309 """Independent test of constructor using PathLike."""
2310 butler = Butler.from_config(self.tmpConfigFile, run=self.default_run)
2311 self.enterContext(butler)
2312 self.assertIsInstance(butler, Butler)
2314 # And again with a Path object with the butler yaml
2315 path = pathlib.Path(self.tmpConfigFile)
2316 butler = Butler.from_config(path, writeable=False)
2317 self.enterContext(butler)
2318 self.assertIsInstance(butler, Butler)
2320 # And again with a Path object without the butler yaml
2321 # (making sure we skip it if the tmp config doesn't end
2322 # in butler.yaml -- which is the case for a subclass)
2323 if self.tmpConfigFile.endswith("butler.yaml"):
2324 path = pathlib.Path(os.path.dirname(self.tmpConfigFile))
2325 butler = Butler.from_config(path, writeable=False)
2326 self.enterContext(butler)
2327 self.assertIsInstance(butler, Butler)
2329 def testExportTransferCopy(self) -> None:
2330 """Test local export using all transfer modes"""
2331 storageClass = self.storageClassFactory.getStorageClass("StructuredDataNoComponents")
2332 exportButler = self.runPutGetTest(storageClass, "test_metric")
2333 # Test that the repo actually has at least one dataset.
2334 datasets = list(exportButler.registry.queryDatasets(..., collections=...))
2335 self.assertGreater(len(datasets), 0)
2336 uris = [exportButler.getURI(d) for d in datasets]
2337 assert isinstance(exportButler._datastore, FileDatastore)
2338 datastoreRoot = exportButler.get_datastore_roots()[exportButler.get_datastore_names()[0]]
2340 pathsInStore = [uri.relative_to(datastoreRoot) for uri in uris]
2342 for path in pathsInStore:
2343 # Assume local file system
2344 assert path is not None
2345 self.assertTrue(self.checkFileExists(datastoreRoot, path), f"Checking path {path}")
2347 for transfer in ("copy", "link", "symlink", "relsymlink"):
2348 with safeTestTempDir(TESTDIR) as exportDir:
2349 with exportButler.export(directory=exportDir, format="yaml", transfer=transfer) as export:
2350 export.saveDatasets(datasets)
2351 for path in pathsInStore:
2352 assert path is not None
2353 self.assertTrue(
2354 self.checkFileExists(exportDir, path),
2355 f"Check that mode {transfer} exported files",
2356 )
    def testPytypeCoercion(self) -> None:
        """Test python type coercion on Butler.get and put."""
        # Store some data with the normal example storage class.
        storageClass = self.storageClassFactory.getStorageClass("StructuredDataNoComponents")
        datasetTypeName = "test_metric"
        butler = self.runPutGetTest(storageClass, datasetTypeName)

        dataId = {"instrument": "DummyCamComp", "visit": 423}
        metric = butler.get(datasetTypeName, dataId=dataId)
        self.assertEqual(get_full_type_name(metric), "lsst.daf.butler.tests.MetricsExample")

        datasetType_ori = butler.get_dataset_type(datasetTypeName)
        self.assertEqual(datasetType_ori.storageClass.name, "StructuredDataNoComponents")

        # Now need to hack the registry dataset type definition.
        # There is no API for this.
        assert isinstance(butler._registry, SqlRegistry)
        manager = butler._registry._managers.datasets
        assert hasattr(manager, "_db") and hasattr(manager, "_static")
        # NOTE(review): the row dict deliberately uses the dataset type name
        # itself as a key — presumably the `where` mapping
        # ({"name": datasetTypeName}) ties the column "name" to that row key
        # per the Database.update() convention; confirm against the
        # Database.update API before changing.
        manager._db.update(
            manager._static.dataset_type,
            {"name": datasetTypeName},
            {datasetTypeName: datasetTypeName, "storage_class": "StructuredDataNoComponentsModel"},
        )

        # Force reset of dataset type cache
        butler.registry.refresh()

        # The same dataset type name should now report the hacked storage
        # class.
        datasetType_new = butler.get_dataset_type(datasetTypeName)
        self.assertEqual(datasetType_new.name, datasetType_ori.name)
        self.assertEqual(datasetType_new.storageClass.name, "StructuredDataNoComponentsModel")

        # A get() should now coerce to the model python type.
        metric_model = butler.get(datasetTypeName, dataId=dataId)
        self.assertNotEqual(type(metric_model), type(metric))
        self.assertEqual(get_full_type_name(metric_model), "lsst.daf.butler.tests.MetricsExampleModel")

        # Put the model and read it back to show that everything now
        # works as normal.
        metric_ref = butler.put(metric_model, datasetTypeName, dataId=dataId, visit=424)
        metric_model_new = butler.get(metric_ref)
        self.assertEqual(metric_model_new, metric_model)

        # Hack the storage class again to something that will fail on the
        # get with no conversion class.
        manager._db.update(
            manager._static.dataset_type,
            {"name": datasetTypeName},
            {datasetTypeName: datasetTypeName, "storage_class": "StructuredDataListYaml"},
        )
        butler.registry.refresh()

        # With no conversion path available the get must fail.
        with self.assertRaises(ValueError):
            butler.get(datasetTypeName, dataId=dataId)
    def test_provenance(self) -> None:
        """Test that provenance is attached on put.

        Exercises a put with and without an explicit `DatasetProvenance`,
        then round-trips the provenance through its flat-dict serialization,
        including error paths for corrupted IDs and non-standard separators.
        """
        run = "test_run"
        butler, datasetType = self.create_butler(run, "MetricsExampleModelProvenance", "prov_metric")
        metric = MetricsExampleModel(
            summary={"AM1": 5.2, "AM2": 30.6},
            output={"a": [1, 2, 3], "b": {"blue": 5, "red": "green"}},
            data=[563, 234, 456.7, 752, 8, 9, 27],
        )
        # Provenance can be attached to the object being put. Whether
        # it is or not is dependent on the formatter. For this test we
        # copy on adding provenance to ensure they differ.
        self.assertIsNone(metric.dataset_id)
        metric_ref = butler.put(metric, datasetType, visit=424, instrument="DummyCamComp")
        # The original in-memory object must not have been mutated by put.
        self.assertIsNone(metric.dataset_id)
        metric_2 = butler.get(metric_ref)
        self.assertEqual(metric_2.data, metric.data)
        self.assertEqual(metric_2.dataset_id, metric_ref.id)
        self.assertIsNone(metric_2.provenance)

        # Put with provenance.
        prov = DatasetProvenance(quantum_id=uuid.uuid4())
        prov.add_input(metric_ref)
        prov.add_extra_provenance(metric_ref.id, {"answer": 42})
        metric_ref2 = butler.put(metric, datasetType, visit=423, instrument="DummyCamComp", provenance=prov)
        metric_3 = butler.get(metric_ref2)
        self.assertEqual(metric_3.provenance, prov)

        # Check that we can extract provenance from dict form.
        prov_dict = prov.to_flat_dict(metric_ref2)
        prov_from_prov, ref_from_prov = DatasetProvenance.from_flat_dict(prov_dict, butler)
        self.assertEqual(ref_from_prov, metric_ref2)
        # Direct __eq__ of the provenance does not work because one side
        # includes dimension records.
        self.assertEqual({ref.id for ref in prov_from_prov.inputs}, {ref.id for ref in prov.inputs})
        self.assertEqual(prov_from_prov.quantum_id, prov.quantum_id)
        self.assertEqual(prov_from_prov.extras, prov.extras)

        # Force a bad ID into the dict.
        prov_dict["id"] = uuid.uuid4()
        with self.assertRaises(ValueError):
            DatasetProvenance.from_flat_dict(prov_dict, butler)
        # Restore the top-level id and corrupt an input id instead.
        del prov_dict["id"]
        prov_dict["input 0 id"] = uuid.uuid4()
        with self.assertRaises(ValueError):
            DatasetProvenance.from_flat_dict(prov_dict, butler)

        # Check that simple types can be reconstructed with non-standard
        # separators.
        prov_dict = prov.to_flat_dict(metric_ref2, prefix="XYZ", sep="😎", simple_types=True)
        prov_from_prov, ref_from_prov = DatasetProvenance.from_flat_dict(prov_dict, butler)
        self.assertEqual(ref_from_prov, metric_ref2)
        self.assertEqual({ref.id for ref in prov_from_prov.inputs}, {ref.id for ref in prov.inputs})

        # A dict with no recognizable provenance keys must be rejected.
        with self.assertRaises(ValueError):
            DatasetProvenance.from_flat_dict({"unknown": 42}, butler)
    def test_specialized_file_datasets_functions(self) -> None:
        """Test a workflow used in Prompt Processing where we export datasets
        from one repository and write them in-place to the datastore of
        another, without immediately inserting registry entries for the
        datasets.
        """
        repo = MetricTestRepo.create_from_butler(
            self.create_empty_butler(writeable=True),
            self.tmpConfigFile,
            "StructuredCompositeReadCompNoDisassembly",
        )
        source_butler = repo.butler

        # Test writing outputs to a FileDatastore.
        with tempfile.TemporaryDirectory() as tempdir:
            target_repo_config = Butler.makeRepo(tempdir)
            refs = [repo.ref1, repo.ref2]
            datasets = transfer_datasets_to_datastore(source_butler, ButlerConfig(target_repo_config), refs)
            self.assertEqual(len(datasets), 2)
            self.assertEqual({ref.id for ref in refs}, {dataset.refs[0].id for dataset in datasets})
            for dataset in datasets:
                path = ResourcePath(dataset.path, forceAbsolute=False)
                # Paths should be relative paths to the target datastore.
                self.assertFalse(path.isabs())
                # Files should have been copied into the target datastore
                self.assertTrue(ResourcePath(tempdir).join(path).exists())

            # Make sure the target Butler can ingest the datasets.
            target_butler = Butler(target_repo_config, writeable=True)
            self.enterContext(target_butler)
            target_butler.transfer_dimension_records_from(source_butler, refs)
            # transfer=None: files are already in place, only registry entries
            # are created.
            target_butler.ingest(*datasets, transfer=None)
            self.assertIsNotNone(target_butler.get(repo.ref1))
            self.assertIsNotNone(target_butler.get(repo.ref2))

            # Giving an empty list of files is a no-op.
            no_datasets = transfer_datasets_to_datastore(source_butler, ButlerConfig(target_repo_config), [])
            self.assertEqual(len(no_datasets), 0)

        # Test writing outputs to a ChainedDatastore.
        with tempfile.TemporaryDirectory() as tempdir:
            # Set up a second dataset type, so we can split the files across
            # multiple datastore roots.
            dt1 = repo.datasetType
            dt2 = DatasetType("other", dt1.dimensions, dt1.storageClass)
            source_butler.registry.registerDatasetType(dt2)
            other_ref = repo.addDataset(repo.ref1.dataId, datasetType=dt2)
            # Chain of two file datastores, each constrained to accept only
            # one of the two dataset types.
            config = Config.fromString(
                f"""
                datastore:
                  cls: lsst.daf.butler.datastores.chainedDatastore.ChainedDatastore
                  datastore_constraints:
                    - constraints:
                        accept:
                          - {dt1.name}
                    - constraints:
                        accept:
                          - {dt2.name}
                  datastores:
                    - datastore:
                        cls: lsst.daf.butler.datastores.fileDatastore.FileDatastore
                        root: <butlerRoot>/FileDatastore_0
                    - datastore:
                        cls: lsst.daf.butler.datastores.fileDatastore.FileDatastore
                        root: <butlerRoot>/FileDatastore_1
                """
            )
            target_repo_config = Butler.makeRepo(tempdir, config)
            refs = [repo.ref1, repo.ref2, other_ref]
            datasets = transfer_datasets_to_datastore(source_butler, ButlerConfig(target_repo_config), refs)
            self.assertEqual(len(datasets), 3)
            self.assertEqual({ref.id for ref in refs}, {dataset.refs[0].id for dataset in datasets})
            for dataset in datasets:
                path = ResourcePath(dataset.path, forceAbsolute=False)
                # Paths should be relative paths to the target datastore.
                self.assertFalse(path.isabs())
                # Files should have been split up between the two datastores
                # in the chain.
                datastore_root = ResourcePath(tempdir)
                if dataset.refs[0].datasetType.name == dt1.name:
                    datastore_root = datastore_root.join("FileDatastore_0")
                else:
                    datastore_root = datastore_root.join("FileDatastore_1")
                self.assertTrue(datastore_root.join(path).exists())

            # Make sure the target Butler can ingest the datasets.
            target_butler = Butler(target_repo_config, writeable=True)
            self.enterContext(target_butler)
            target_butler.transfer_dimension_records_from(source_butler, refs)
            target_butler.ingest(*datasets, transfer=None)
            self.assertIsNotNone(target_butler.get(repo.ref1))
            self.assertIsNotNone(target_butler.get(repo.ref2))
            self.assertIsNotNone(target_butler.get(other_ref))
2563 def test_temporary_for_ingest(self) -> None:
2564 """Test the `lsst.daf.butler._rubin.ingest_from_temporary` module."""
2565 with self.create_empty_butler("example_run") as butler:
2566 dataset_type = DatasetType("example", butler.dimensions.empty, "StructuredDataDict")
2567 butler.registry.registerDatasetType(dataset_type)
2568 ref = DatasetRef(dataset_type, DataCoordinate.make_empty(butler.dimensions), "example_run")
2569 with TemporaryForIngest(butler, ref) as temporary:
2570 temporary.path.write(b"three: 3")
2571 found = TemporaryForIngest.find_orphaned_temporaries_by_ref(ref, butler)
2572 self.assertEqual(found, [temporary.path])
2573 self.assertIn(".tmp", temporary.ospath)
2574 temporary.ingest()
2575 loaded = butler.get(ref)
2576 self.assertEqual(loaded, {"three": 3})
class PostgresPosixDatastoreButlerTestCase(FileDatastoreButlerTests, unittest.TestCase):
    """PosixDatastore specialization of a butler using Postgres"""

    configFile = os.path.join(TESTDIR, "config/basic/butler.yaml")
    fullConfigKey = ".datastore.formatters"
    validationCanFail = True
    datastoreStr = ["/tmp"]
    datastoreName = [f"FileDatastore@{BUTLER_ROOT_TAG}"]
    registryStr = "PostgreSQL@test"

    @classmethod
    def setUpClass(cls) -> None:
        # Spin up a temporary Postgres server for the whole test class.
        cls.postgresql = cls.enterClassContext(setup_postgres_test_db())
        super().setUpClass()

    def setUp(self) -> None:
        # The base config has no Postgres registry section, so patch one in
        # and point ``configFile`` at a temporary copy for the duration of
        # this test.
        self._temp_config = False
        butler_config = Config(self.configFile)
        self.postgresql.patch_butler_config(butler_config)
        with tempfile.NamedTemporaryFile("w", suffix=".yaml", delete=False) as stream:
            butler_config.dump(stream)
            self.configFile = stream.name
            self._temp_config = True
        super().setUp()

    def tearDown(self) -> None:
        # Remove the temporary config written in setUp, if any.
        if self._temp_config and os.path.exists(self.configFile):
            os.remove(self.configFile)
        super().tearDown()

    def testMakeRepo(self) -> None:
        # The base class test assumes that it's using sqlite and assumes
        # the config file is acceptable to sqlite.
        raise unittest.SkipTest("Postgres config is not compatible with this test.")
class ClonedPostgresPosixDatastoreButlerTestCase(PostgresPosixDatastoreButlerTestCase, unittest.TestCase):
    """Test that Butler with a Postgres registry still works after cloning."""

    def create_butler(
        self,
        run: str,
        storageClass: StorageClass | str,
        datasetTypeName: str,
        metrics: ButlerMetrics | None = None,
    ) -> tuple[DirectButler, DatasetType]:
        # Build the butler as the base class does, then hand back a clone so
        # every test in the hierarchy exercises the cloned instance.
        original, dataset_type = super().create_butler(run, storageClass, datasetTypeName, metrics=metrics)
        cloned = original.clone(run=run, metrics=metrics)
        return cloned, dataset_type
class InMemoryDatastoreButlerTestCase(ButlerTests, unittest.TestCase):
    """InMemoryDatastore specialization of a butler"""

    configFile = os.path.join(TESTDIR, "config/basic/butler-inmemory.yaml")
    fullConfigKey = None
    useTempRoot = False
    validationCanFail = False
    datastoreStr = ["datastore='InMemory"]
    datastoreName = ["InMemoryDatastore@"]
    registryStr = "/gen3.sqlite3"

    def testIngest(self) -> None:
        # File ingest does not apply to an in-memory datastore; disable the
        # inherited test.
        pass

    def test_ingest_zip(self) -> None:
        # Zip ingest likewise requires a file-backed datastore.
        pass
class ClonedSqliteButlerTestCase(InMemoryDatastoreButlerTestCase, unittest.TestCase):
    """Test that a Butler with a Sqlite registry still works after cloning."""

    def create_butler(
        self,
        run: str,
        storageClass: StorageClass | str,
        datasetTypeName: str,
        metrics: ButlerMetrics | None = None,
    ) -> tuple[DirectButler, DatasetType]:
        # Delegate creation to the base class, then run everything against a
        # clone of the resulting butler.
        original, dataset_type = super().create_butler(run, storageClass, datasetTypeName, metrics=metrics)
        return original.clone(run=run), dataset_type
class ChainedDatastoreButlerTestCase(FileDatastoreButlerTests, unittest.TestCase):
    """PosixDatastore specialization"""

    configFile = os.path.join(TESTDIR, "config/basic/butler-chained.yaml")
    fullConfigKey = ".datastore.datastores.1.formatters"
    validationCanFail = True
    datastoreStr = ["datastore='InMemory", "/FileDatastore_1/,", "/FileDatastore_2/'"]
    datastoreName = [
        "InMemoryDatastore@",
        f"FileDatastore@{BUTLER_ROOT_TAG}/FileDatastore_1",
        "SecondDatastore",
    ]
    registryStr = "/gen3.sqlite3"

    def testPruneDatasets(self) -> None:
        # Disabled: the inherited test deletes files behind the butler's
        # back, which cannot work while an InMemoryDatastore participates in
        # the chain.
        pass
class ButlerExplicitRootTestCase(PosixDatastoreButlerTestCase):
    """Test that a yaml file in one location can refer to a root in another."""

    datastoreStr = ["dir1"]
    # Disable the makeRepo test since we are deliberately not using
    # butler.yaml as the config name.
    fullConfigKey = None

    def setUp(self) -> None:
        self.root = makeTestTempDir(TESTDIR)

        # Create the repository itself under dir1.
        self.dir1 = os.path.join(self.root, "dir1")
        Butler.makeRepo(self.dir1, config=Config(self.configFile))

        # Relocate the config into dir2 under a non-default name, recording
        # the true repository root explicitly, then delete the original.
        self.dir2 = os.path.join(self.root, "dir2")
        os.makedirs(self.dir2, exist_ok=True)
        original_config = os.path.join(self.dir1, "butler.yaml")
        relocated = Config(original_config)
        relocated["root"] = self.dir1
        moved_config = os.path.join(self.dir2, "butler2.yaml")
        relocated.dumpToUri(moved_config)
        os.remove(original_config)
        self.tmpConfigFile = moved_config

    def testFileLocations(self) -> None:
        # The two directories must be distinct, with only the relocated
        # config in dir2 while the database stayed in dir1.
        self.assertNotEqual(self.dir1, self.dir2)
        self.assertTrue(os.path.exists(os.path.join(self.dir2, "butler2.yaml")))
        self.assertFalse(os.path.exists(os.path.join(self.dir1, "butler.yaml")))
        self.assertTrue(os.path.exists(os.path.join(self.dir1, "gen3.sqlite3")))
class ButlerMakeRepoOutfileTestCase(ButlerPutGetTests, unittest.TestCase):
    """Test that a config file created by makeRepo outside of repo works."""

    configFile = os.path.join(TESTDIR, "config/basic/butler.yaml")

    def setUp(self) -> None:
        # Repo lives in one temp dir, its config in a second one.
        self.root = makeTestTempDir(TESTDIR)
        self.root2 = makeTestTempDir(TESTDIR)

        self.tmpConfigFile = os.path.join(self.root2, "different.yaml")
        Butler.makeRepo(self.root, config=Config(self.configFile), outfile=self.tmpConfigFile)

    def tearDown(self) -> None:
        # Clean up the external config directory; base class removes root.
        if os.path.exists(self.root2):
            shutil.rmtree(self.root2, ignore_errors=True)
        super().tearDown()

    def testConfigExistence(self) -> None:
        config = Config(self.tmpConfigFile)
        actual = ResourcePath(config["root"])
        expected = ResourcePath(self.root, forceDirectory=True)
        self.assertEqual(actual.geturl(), expected.geturl())
        self.assertNotIn(":", actual.path, "Check for URI concatenated with normal path")

    def testPutGet(self) -> None:
        sc = self.storageClassFactory.getStorageClass("StructuredDataNoComponents")
        self.runPutGetTest(sc, "test_metric")
class ButlerMakeRepoOutfileDirTestCase(ButlerMakeRepoOutfileTestCase):
    """Test that a config file created by makeRepo outside of repo works."""

    configFile = os.path.join(TESTDIR, "config/basic/butler.yaml")

    def setUp(self) -> None:
        self.root = makeTestTempDir(TESTDIR)
        self.root2 = makeTestTempDir(TESTDIR)

        # Pass a bare directory as outfile: makeRepo should write a
        # butler.yaml inside it.
        self.tmpConfigFile = self.root2
        Butler.makeRepo(self.root, config=Config(self.configFile), outfile=self.tmpConfigFile)

    def testConfigExistence(self) -> None:
        # Append the yaml file else Config constructor does not know the file
        # type.
        self.tmpConfigFile = os.path.join(self.tmpConfigFile, "butler.yaml")
        super().testConfigExistence()
class ButlerMakeRepoOutfileUriTestCase(ButlerMakeRepoOutfileTestCase):
    """Test that a config file created by makeRepo outside of repo works."""

    configFile = os.path.join(TESTDIR, "config/basic/butler.yaml")

    def setUp(self) -> None:
        self.root = makeTestTempDir(TESTDIR)
        self.root2 = makeTestTempDir(TESTDIR)

        # Supply the outfile as a full URI rather than a filesystem path.
        outfile_uri = ResourcePath(os.path.join(self.root2, "something.yaml")).geturl()
        self.tmpConfigFile = outfile_uri
        Butler.makeRepo(self.root, config=Config(self.configFile), outfile=self.tmpConfigFile)
@unittest.skipIf(not boto3, "Warning: boto3 AWS SDK not found!")
class S3DatastoreButlerTestCase(FileDatastoreButlerTests, unittest.TestCase):
    """S3Datastore specialization of a butler; an S3 storage Datastore +
    a local in-memory SqlRegistry.
    """

    configFile = os.path.join(TESTDIR, "config/basic/butler-s3store.yaml")
    fullConfigKey = None
    validationCanFail = True

    bucketName = "anybucketname"
    """Name of the Bucket that will be used in the tests. The name is read from
    the config file used with the tests during set-up.
    """

    root = "butlerRoot/"
    """Root repository directory expected to be used in case useTempRoot=False.
    Otherwise the root is set to a 20 characters long randomly generated string
    during set-up.
    """

    datastoreStr = [f"datastore={root}"]
    """Contains all expected root locations in a format expected to be
    returned by Butler stringification.
    """

    # Literal braces (not an f-string); overwritten in setUp with the real
    # root once the mocked bucket is known.
    datastoreName = ["FileDatastore@s3://{bucketName}/{root}"]
    """The expected format of the S3 Datastore string."""

    registryStr = "/gen3.sqlite3"
    """Expected format of the Registry string."""

    # Intentionally shadows the imported moto helper with a single shared
    # mock instance, started/stopped per test in setUp/tearDown.
    mock_aws = mock_aws()
    """The mocked s3 interface from moto."""

    def genRoot(self) -> str:
        """Return a random string of len 20 to serve as a root
        name for the temporary bucket repo.

        This is equivalent to tempfile.mkdtemp as this is what self.root
        becomes when useTempRoot is True.
        """
        rndstr = "".join(random.choice(string.ascii_uppercase + string.digits) for _ in range(20))
        return rndstr + "/"

    def setUp(self) -> None:
        # Bucket name comes from the datastore root URI in the config.
        config = Config(self.configFile)
        uri = ResourcePath(config[".datastore.datastore.root"])
        self.bucketName = uri.netloc

        # Enable S3 mocking of tests.
        self.enterContext(clean_test_environment_for_s3())
        self.mock_aws.start()

        if self.useTempRoot:
            self.root = self.genRoot()
        rooturi = f"s3://{self.bucketName}/{self.root}"
        config.update({"datastore": {"datastore": {"root": rooturi}}})

        # need local folder to store registry database
        self.reg_dir = makeTestTempDir(TESTDIR)
        config["registry", "db"] = f"sqlite:///{self.reg_dir}/gen3.sqlite3"

        # MOTO needs to know that we expect Bucket bucketname to exist
        # (this used to be the class attribute bucketName)
        s3 = boto3.resource("s3")
        s3.create_bucket(Bucket=self.bucketName)

        self.datastoreStr = [f"datastore='{rooturi}'"]
        self.datastoreName = [f"FileDatastore@{rooturi}"]
        Butler.makeRepo(rooturi, config=config, forceConfigRoot=False)
        self.tmpConfigFile = posixpath.join(rooturi, "butler.yaml")

    def tearDown(self) -> None:
        # Empty the mocked bucket before deleting it; S3 refuses to delete a
        # non-empty bucket.
        s3 = boto3.resource("s3")
        bucket = s3.Bucket(self.bucketName)
        try:
            bucket.objects.all().delete()
        except botocore.exceptions.ClientError as e:
            if e.response["Error"]["Code"] == "404":
                # the key was not reachable - pass
                pass
            else:
                raise

        bucket = s3.Bucket(self.bucketName)
        bucket.delete()

        # Stop the S3 mock.
        self.mock_aws.stop()

        if self.reg_dir is not None and os.path.exists(self.reg_dir):
            shutil.rmtree(self.reg_dir, ignore_errors=True)

        if self.useTempRoot and os.path.exists(self.root):
            shutil.rmtree(self.root, ignore_errors=True)

        super().tearDown()
class DatastoreTransfers(TestCaseMixin):
    """Base test setup for data transfers between butlers. The concrete tests
    for specific configurations are in other classes, below.
    """

    # Shared factory; reset per-test in setUp.
    storageClassFactory: StorageClassFactory

    @classmethod
    def setUpClass(cls) -> None:
        cls.storageClassFactory = StorageClassFactory()

    def setUp(self) -> None:
        self.root = makeTestTempDir(TESTDIR)
        self.config = Config(self.configFile)

        # Some tests cause convertors to be replaced so ensure
        # the storage class factory is reset each time.
        self.storageClassFactory.reset()
        self.storageClassFactory.addFromConfig(self.configFile)

    def tearDown(self) -> None:
        removeTestTempDir(self.root)

    def create_butler(self, manager: str | None, label: str, config_file: str | None = None) -> Butler:
        """Create a writeable butler in a fresh sub-repo, optionally with a
        specific dataset-record manager and/or config file.
        """
        if manager is None:
            manager = (
                "lsst.daf.butler.registry.datasets.byDimensions.ByDimensionsDatasetRecordStorageManagerUUID"
            )
        config = Config(config_file if config_file is not None else self.configFile)
        config["registry", "managers", "datasets"] = manager
        butler = Butler.from_config(
            Butler.makeRepo(f"{self.root}/butler{label}", config=config), writeable=True
        )
        self.enterContext(butler)
        return butler

    def assertButlerTransfers(
        self,
        purge: bool = False,
        storageClassName: str = "StructuredData",
        storageClassNameTarget: str | None = None,
    ) -> None:
        """Test that a run can be transferred to another butler.

        Parameters
        ----------
        purge : `bool`
            If True, strip datastore records (and delete one file) for a
            third of the datasets to emulate execution-butler "trust" mode.
        storageClassName : `str`
            Storage class used for the source dataset types.
        storageClassNameTarget : `str` or `None`
            If given, a different (but convertible) storage class to use for
            one dataset type in the target butler.
        """
        storageClass = self.storageClassFactory.getStorageClass(storageClassName)
        if storageClassNameTarget is not None:
            storageClassTarget = self.storageClassFactory.getStorageClass(storageClassNameTarget)
        else:
            storageClassTarget = storageClass

        datasetTypeName = "random_data"

        # Test will create 3 collections and we will want to transfer
        # two of those three.
        runs = ["run1", "run2", "other"]

        # Also want to use two different dataset types to ensure that
        # grouping works.
        datasetTypeNames = ["random_data", "random_data_2"]

        # Create the run collections in the source butler.
        for run in runs:
            self.source_butler.collections.register(run)

        # Create dimensions in source butler.
        n_exposures = 30
        self.source_butler.registry.insertDimensionData("instrument", {"name": "DummyCamComp"})
        self.source_butler.registry.insertDimensionData(
            "physical_filter", {"instrument": "DummyCamComp", "name": "d-r", "band": "R"}
        )
        self.source_butler.registry.insertDimensionData(
            "detector", {"instrument": "DummyCamComp", "id": 1, "full_name": "det1"}
        )
        self.source_butler.registry.insertDimensionData(
            "day_obs",
            {
                "instrument": "DummyCamComp",
                "id": 20250101,
            },
        )

        # One group + exposure record per dataset that will be written.
        for i in range(n_exposures):
            self.source_butler.registry.insertDimensionData(
                "group", {"instrument": "DummyCamComp", "name": f"group{i}"}
            )
            self.source_butler.registry.insertDimensionData(
                "exposure",
                {
                    "instrument": "DummyCamComp",
                    "id": i,
                    "obs_id": f"exp{i}",
                    "physical_filter": "d-r",
                    "group": f"group{i}",
                    "day_obs": 20250101,
                },
            )

        # Create dataset types in the source butler.
        dimensions = self.source_butler.dimensions.conform(["instrument", "exposure"])
        for datasetTypeName in datasetTypeNames:
            datasetType = DatasetType(datasetTypeName, dimensions, storageClass)
            self.source_butler.registry.registerDatasetType(datasetType)

        # Write a dataset to an unrelated run -- this will ensure that
        # we are rewriting integer dataset ids in the target if necessary.
        # Will not be relevant for UUID.
        run = "distraction"
        butler = Butler.from_config(butler=self.source_butler, run=run)
        self.enterContext(butler)
        butler.put(
            makeExampleMetrics(),
            datasetTypeName,
            exposure=1,
            instrument="DummyCamComp",
            physical_filter="d-r",
        )

        # Write some example metrics to the source
        butler = Butler.from_config(butler=self.source_butler)
        self.enterContext(butler)

        # Set of DatasetRefs that should be in the list of refs to transfer
        # but which will not be transferred.
        deleted: set[DatasetRef] = set()

        n_expected = 20  # Number of datasets expected to be transferred
        source_refs = []
        for i in range(n_exposures):
            # Put a third of datasets into each collection, only retain
            # two thirds.
            index = i % 3
            run = runs[index]
            datasetTypeName = datasetTypeNames[i % 2]

            metric = MetricsExample(
                summary={"counter": i}, output={"text": "metric"}, data=[2 * x for x in range(i)]
            )
            dataId = {"exposure": i, "instrument": "DummyCamComp", "physical_filter": "d-r"}
            ref = butler.put(metric, datasetTypeName, dataId=dataId, run=run)

            # Remove the datastore record using low-level API, but only
            # for a specific index.
            if purge and index == 1:
                # For one of these delete the file as well.
                # This allows the "missing" code to filter the
                # file out.
                # Access the individual datastores.
                datastores = []
                if hasattr(butler._datastore, "datastores"):
                    datastores.extend(butler._datastore.datastores)
                else:
                    datastores.append(butler._datastore)

                if not deleted:
                    # For a chained datastore we need to remove
                    # files in each chain.
                    for datastore in datastores:
                        # The file might not be known to the datastore
                        # if constraints are used.
                        try:
                            primary, uris = datastore.getURIs(ref)
                        except FileNotFoundError:
                            continue
                        if primary and primary.scheme != "mem":
                            primary.remove()
                        for uri in uris.values():
                            if uri.scheme != "mem":
                                uri.remove()
                    n_expected -= 1
                    deleted.add(ref)

                # Remove the datastore record.
                for datastore in datastores:
                    if hasattr(datastore, "removeStoredItemInfo"):
                        datastore.removeStoredItemInfo(ref)

            if index < 2:
                source_refs.append(ref)
            if ref not in deleted:
                new_metric = butler.get(ref)
                self.assertEqual(new_metric, metric)

        # Create some bad dataset types to ensure we check for inconsistent
        # definitions.
        badStorageClass = self.storageClassFactory.getStorageClass("StructuredDataList")
        for datasetTypeName in datasetTypeNames:
            datasetType = DatasetType(datasetTypeName, dimensions, badStorageClass)
            self.target_butler.registry.registerDatasetType(datasetType)
        with self.assertRaises(ConflictingDefinitionError) as cm:
            self.target_butler.transfer_from(self.source_butler, source_refs)
        self.assertIn("dataset type differs", str(cm.exception))

        # And remove the bad definitions.
        for datasetTypeName in datasetTypeNames:
            self.target_butler.registry.removeDatasetType(datasetTypeName)

        # Transfer without creating dataset types should fail.
        with self.assertRaises(KeyError):
            self.target_butler.transfer_from(self.source_butler, source_refs)

        # Transfer without creating dimensions should fail.
        with self.assertRaises(ConflictingDefinitionError) as cm:
            self.target_butler.transfer_from(self.source_butler, source_refs, register_dataset_types=True)
        self.assertIn("dimension", str(cm.exception))

        # The dry run test requires dataset types to exist. If we have
        # been given distinct storage classes for the target we have
        # to redefine at least one of the dataset types in the target butler.
        if storageClass != storageClassTarget:
            self.target_butler.registry.removeDatasetType(datasetTypeNames[0])
            datasetType = DatasetType(datasetTypeNames[0], dimensions, storageClassTarget)
            self.target_butler.registry.registerDatasetType(datasetType)

        # The failed transfer above leaves registry in an inconsistent
        # state because the run is created but then rolled back without
        # the collection cache being cleared. For now force a refresh.
        # Can remove with DM-35498.
        self.target_butler.registry.refresh()

        # Do a dry run -- this should not have any effect on the target butler.
        self.target_butler.transfer_from(self.source_butler, source_refs, dry_run=True)

        # Transfer the records for one ref to test the alternative API.
        with self.assertLogs(logger="lsst", level=logging.DEBUG) as log_cm:
            self.target_butler.transfer_dimension_records_from(self.source_butler, [source_refs[0]])
        self.assertIn("number of records transferred: 1", ";".join(log_cm.output))

        # Now transfer them to the second butler, including dimensions.
        with self.assertLogs(logger="lsst", level=logging.DEBUG) as log_cm:
            transferred = self.target_butler.transfer_from(
                self.source_butler,
                source_refs,
                register_dataset_types=True,
                transfer_dimensions=True,
            )
        self.assertEqual(len(transferred), n_expected)
        log_output = ";".join(log_cm.output)

        # A ChainedDatastore will use the in-memory datastore for mexists
        # so we can not rely on the mexists log message.
        self.assertIn("Number of datastore records found in source", log_output)
        self.assertIn("Creating output run", log_output)

        # Do the transfer twice to ensure that it will do nothing extra.
        # Only do this if purge=True because it does not work for int
        # dataset_id.
        if purge:
            # This should not need to register dataset types.
            transferred = self.target_butler.transfer_from(self.source_butler, source_refs)
            self.assertEqual(len(transferred), n_expected)

        # Passing a Butler where a Datastore is expected must fail.
        with self.assertRaises((TypeError, AttributeError)):
            self.target_butler._datastore.transfer_from(self.source_butler, source_refs)  # type: ignore

        # "split" transfer mode is not valid here.
        with self.assertRaises(ValueError):
            self.target_butler._datastore.transfer_from(
                self.source_butler._datastore, source_refs, transfer="split"
            )

        # Now try to get the same refs from the new butler.
        for ref in source_refs:
            if ref not in deleted:
                new_metric = self.target_butler.get(ref)
                old_metric = self.source_butler.get(ref)
                self.assertEqual(new_metric, old_metric)

                # Try again without implicit storage class conversion
                # triggered by using the source ref. This will do conversion
                # since the formatter will be returning the source python type.
                target_ref = self.target_butler.get_dataset(ref.id)
                if target_ref.datasetType.storageClass != ref.datasetType.storageClass:
                    new_metric = self.target_butler.get(target_ref)
                    self.assertNotEqual(type(new_metric), type(old_metric))

                    # Remove the dataset from the target and put it again
                    # as if it was the right type all along for this butler.
                    self.target_butler.pruneDatasets(
                        [target_ref], unstore=True, purge=True, disassociate=True
                    )
                    self.target_butler.put(new_metric, target_ref)
                    new_new_metric = self.target_butler.get(target_ref)
                    new_old_metric = self.target_butler.get(
                        target_ref, storageClass=ref.datasetType.storageClass
                    )
                    self.assertEqual(new_new_metric, new_metric)
                    self.assertEqual(new_old_metric, old_metric)

        # Now prune run2 collection and create instead a CHAINED collection.
        # This should block the transfer.
        self.target_butler.removeRuns(["run2"])
        self.target_butler.collections.register("run2", CollectionType.CHAINED)
        with self.assertRaises(CollectionTypeError):
            # Re-importing the run1 datasets can be problematic if they
            # use integer IDs so filter those out.
            to_transfer = [ref for ref in source_refs if ref.run == "run2"]
            self.target_butler.transfer_from(self.source_butler, to_transfer)
class PosixDatastoreTransfers(DatastoreTransfers, unittest.TestCase):
    """Test data transfers between butlers.

    Test for different managers. UUID to UUID and integer to integer are
    tested. UUID to integer is not supported since we do not currently
    want to allow that. Integer to UUID is supported with the caveat
    that UUID4 will be generated and this will be incorrect for raw
    dataset types. The test ignores that.

    NOTE(review): only UUID-to-UUID combinations appear to be exercised by
    the test methods below — the integer-manager claims above may be stale;
    confirm against the ``create_butler``/``create_butlers`` helpers.
    """

    # Default single-datastore configuration used for both butlers unless a
    # test overrides the source via ``source_config``.
    configFile = os.path.join(TESTDIR, "config/basic/butler.yaml")

    def create_butlers(
        self, manager1: str | None = None, manager2: str | None = None, source_config: str | None = None
    ) -> None:
        """Create the source/target butler pair used by the base-class
        assertions; the "1"/"2" suffixes keep their repo roots distinct.
        """
        self.source_butler = self.create_butler(manager1, "1", config_file=source_config)
        self.target_butler = self.create_butler(manager2, "2")

    def testTransferUuidToUuid(self) -> None:
        self.create_butlers()
        self.assertButlerTransfers()

    def testTransferFromChainedUuidToUuid(self) -> None:
        """Force the source butler to be a ChainedDatastore."""
        self.create_butlers(source_config=os.path.join(TESTDIR, "config/basic/butler-chained.yaml"))
        self.assertButlerTransfers()

    def testTransferFromIncompatibleUuidToUuid(self) -> None:
        """Force the source butler to be a incompatible datastore."""
        self.create_butlers(source_config=os.path.join(TESTDIR, "config/basic/butler-inmemory.yaml"))
        # In-memory source cannot provide files to a file-based target.
        with self.assertRaises(NotImplementedError):
            self.assertButlerTransfers()

    def testTransferFromIncompatibleChainUuidToUuid(self) -> None:
        """Force the source butler to be a incompatible datastore."""
        self.create_butlers(source_config=os.path.join(TESTDIR, "config/basic/butler-inmemory-chain.yaml"))
        with self.assertRaises(TypeError):
            self.assertButlerTransfers()

    def testTransferFromFileUuidToUuid(self) -> None:
        """Force the source butler to be a FileDatastore."""
        self.create_butlers(source_config=os.path.join(TESTDIR, "config/basic/butler.yaml"))
        self.assertButlerTransfers()

    def testTransferMissing(self) -> None:
        """Test transfers where datastore records are missing.

        This is how execution butler works.
        """
        self.create_butlers()

        # Configure the source butler to allow trust.
        self.source_butler._datastore._set_trust_mode(True)

        # ``purge=True`` removes the datastore records so the transfer must
        # rely on trust mode to locate the files.
        self.assertButlerTransfers(purge=True)

    def testTransferMissingDisassembly(self) -> None:
        """Test transfers where datastore records are missing.

        This is how execution butler works.
        """
        self.create_butlers()

        # Configure the source butler to allow trust.
        self.source_butler._datastore._set_trust_mode(True)

        # Test disassembly.
        self.assertButlerTransfers(purge=True, storageClassName="StructuredComposite")

    def testTransferDifferingStorageClasses(self) -> None:
        """Test transfers when the source butler dataset type has a different
        but compatible storage class.
        """
        self.create_butlers()

        self.assertButlerTransfers(storageClassNameTarget="MetricsConversion")

    def testTransferDifferingStorageClassesDisassembly(self) -> None:
        """Test transfers when the source butler dataset type has a different
        but compatible storage class and where the source butler has
        disassembled.
        """
        self.create_butlers()

        self.assertButlerTransfers(
            storageClassName="StructuredComposite", storageClassNameTarget="MetricsConversion"
        )

    def testUnsafeDirectTransfer(self) -> None:
        """Test that transfer='unsafe_direct' records the absolute URI of
        source files in the target datastore.
        """
        self.create_butlers()
        dataset_type = DatasetType("dt", [], "int", universe=self.source_butler.dimensions)
        self.source_butler.registry.registerDatasetType(dataset_type)
        self.source_butler.collections.register("run")
        ref = self.source_butler.put(123, "dt", [], run="run")
        self.target_butler.transfer_from(
            self.source_butler, [ref], transfer="unsafe_direct", register_dataset_types=True
        )
        self.assertEqual(self.target_butler.get(ref), 123)
        # Both butlers must resolve the ref to the *same* file: no copy made.
        self.assertEqual(self.source_butler.getURI(ref), self.target_butler.getURI(ref))

    def testAbsoluteURITransferDirect(self) -> None:
        """Test transfer using an absolute URI."""
        self._absolute_transfer("auto")

    def testAbsoluteURITransferUnsafeDirect(self) -> None:
        """Test transfer using an absolute URI."""
        self._absolute_transfer("unsafe_direct")

    def testAbsoluteURITransferCopy(self) -> None:
        """Test transfer using an absolute URI."""
        self._absolute_transfer("copy")

    def _absolute_transfer(self, transfer: str) -> None:
        """Ingest a dataset from an absolute (temporary) URI into the source
        butler with transfer="direct", then transfer it to the target butler
        with the given ``transfer`` mode and check whether the target points
        at the original absolute URI or at its own copy.
        """
        self.create_butlers()

        storageClassName = "StructuredData"
        storageClass = self.storageClassFactory.getStorageClass(storageClassName)
        datasetTypeName = "random_data"
        run = "run1"
        self.source_butler.collections.register(run)

        dimensions = self.source_butler.dimensions.conform(())
        datasetType = DatasetType(datasetTypeName, dimensions, storageClass)
        self.source_butler.registry.registerDatasetType(datasetType)

        metrics = makeExampleMetrics()
        with ResourcePath.temporary_uri(suffix=".json") as temp:
            dataId = DataCoordinate.make_empty(self.source_butler.dimensions)
            source_refs = [DatasetRef(datasetType, dataId, run=run)]
            temp.write(json.dumps(metrics.exportAsDict()).encode())
            dataset = FileDataset(path=temp, refs=source_refs)
            # "direct" ingest records the absolute URI without copying.
            self.source_butler.ingest(dataset, transfer="direct")

            self.target_butler.transfer_from(
                self.source_butler, dataset.refs, register_dataset_types=True, transfer=transfer
            )

            uri = self.target_butler.getURI(dataset.refs[0])
            if transfer == "auto" or transfer == "unsafe_direct":
                # These modes preserve the absolute URI in the target.
                self.assertEqual(uri, temp)
            else:
                # "copy" must give the target its own file.
                self.assertNotEqual(uri, temp)

    def test_shared_dimension_group(self) -> None:
        """Test internal logic that divides dataset types by dimension group
        when doing registry updates.
        """
        self.create_butlers()
        # Populate the source registry only; no datastore files needed.
        self.source_butler.import_(filename=_get_test_data_path("base.yaml"), without_datastore=True)
        self.source_butler.import_(filename=_get_test_data_path("datasets.yaml"), without_datastore=True)

        source_butler = self.source_butler
        target_butler = self.target_butler

        # Create a dataset type with the same dimensions as the 'bias' dataset
        # type from base.yaml
        dataset_type = DatasetType(
            "test_type", ["instrument", "detector"], "int", universe=source_butler.dimensions
        )
        source_butler.registry.registerDatasetType(dataset_type)
        # This has the same data ID as one of the bias datasets in
        # datasets.yaml.
        test_ref = source_butler.registry.insertDatasets(
            "test_type", [{"instrument": "Cam1", "detector": 2}], run="imported_g"
        )[0]

        biases = source_butler.query_datasets("bias", ["imported_g", "imported_r"])
        flats = source_butler.query_datasets("flat", ["imported_g", "imported_r"])
        refs = [test_ref, *biases, *flats]

        # Test setup will be even more convoluted if we want the datastore to
        # actually transfer files. For testing the dimension group behavior,
        # we really only care about the registry.
        with unittest.mock.patch.object(target_butler._datastore, "transfer_from") as mock:
            mock.return_value = (set(refs), set())
            target_butler.transfer_from(
                source_butler,
                refs,
                transfer=None,
                register_dataset_types=True,
                skip_missing=False,
                transfer_dimensions=True,
            )

        # Dataset IDs must be preserved across the transfer.
        transferred_test_ref = target_butler.find_dataset(
            "test_type", {"instrument": "Cam1", "detector": 2}, collections="imported_g"
        )
        self.assertEqual(transferred_test_ref.id, test_ref.id)

        # Fixed UUIDs below come from datasets.yaml.
        transferred_bias = target_butler.find_dataset(
            "bias", {"instrument": "Cam1", "detector": 2}, collections="imported_g"
        )
        self.assertEqual(transferred_bias.id, uuid.UUID("51352db4-a47a-447c-b12d-a50b206b17cd"))

        transferred_flat = target_butler.find_dataset(
            "flat",
            {"instrument": "Cam1", "detector": 2, "physical_filter": "Cam1-R1", "band": "r"},
            collections="imported_r",
        )
        self.assertEqual(transferred_flat.id, uuid.UUID("c1296796-56c5-4acf-9b49-40d920c6f840"))
class ChainedDatastoreTransfers(PosixDatastoreTransfers):
    """Test transfers using a chained datastore.

    Reruns every PosixDatastoreTransfers test with a ChainedDatastore
    configuration for both butlers.
    """

    configFile = os.path.join(TESTDIR, "config/basic/butler-chained.yaml")
@unittest.skipIf(not butler_server_is_available, butler_server_import_error)
class ButlerServerDatastoreTransfers(DatastoreTransfers, unittest.TestCase):
    """Test ``transfer_from`` involving Butler server."""

    configFile = os.path.join(TESTDIR, "config/basic/butler.yaml")

    def test_transfers_from_remote_to_direct(self) -> None:
        """Transfer datasets from a server-backed (hybrid) butler into a
        direct butler, remapping the server's unresolvable transfer URLs.
        """
        # Imported locally because the remote-butler package is optional
        # (see the class-level skipIf guard).
        from lsst.daf.butler.remote_butler._remote_file_transfer_source import (
            mock_file_transfer_uris_for_unit_test,
        )

        self.target_butler = self.create_butler(None, "2")
        with create_test_server(TESTDIR) as server:
            self.source_butler = server.hybrid_butler

            def _remap_transfer_url(path: HttpResourcePath) -> HttpResourcePath:
                # The Butler server returns HTTP URIs with a domain name that
                # is not resolvable because there is no actual HTTP server
                # involved in these tests.  Strip this first layer of
                # indirection, and return the target of the redirect instead.
                response = server.client.get(str(path), follow_redirects=False, headers=path._extra_headers)
                return ResourcePath(str(response.next_request.url))

            with mock_file_transfer_uris_for_unit_test(_remap_transfer_url):
                self.assertButlerTransfers()
class TransferDatasetsInPlace(unittest.TestCase):
    """Test behavior of transfer_datasets_in_place() specialty function used by
    Prompt Publication service.

    Both repositories in each test are configured to share the same
    datastore root, so "in place" transfers copy only registry content.
    """

    def test_file_datastore(self) -> None:
        """Run the in-place transfer checks with a plain FileDatastore."""
        configFile = os.path.join(TESTDIR, "config/basic/butler.yaml")
        with (
            tempfile.TemporaryDirectory() as datastore_root,
            tempfile.TemporaryDirectory() as other_repo_root,
        ):
            config = Config(configFile)
            config["datastore", "datastore", "name"] = "file_datastore"
            Butler.makeRepo(datastore_root, config=config)
            # Point the second repository at the first one's datastore root
            # so that both share the same files on disk.
            config["datastore", "datastore", "root"] = datastore_root
            Butler.makeRepo(other_repo_root, config, forceConfigRoot=False)
            with (
                Butler(datastore_root, writeable=True) as source_butler,
                Butler(other_repo_root, writeable=True) as target_butler,
            ):
                self._test_transfer_datasets_in_place(source_butler, target_butler)

    def test_chained_datastore(self) -> None:
        """Run the in-place transfer checks with a ChainedDatastore."""
        configFile = os.path.join(TESTDIR, "config/basic/butler-chained-posix.yaml")
        with (
            tempfile.TemporaryDirectory() as datastore_root,
            tempfile.TemporaryDirectory() as other_repo_root,
        ):
            config = Config(configFile)
            # Give both child datastores absolute roots so that the two
            # repositories share them.
            config["datastore", "datastore", "datastores", 0, "datastore", "root"] = (
                f"{datastore_root}/butler_test_repository"
            )
            config["datastore", "datastore", "datastores", 1, "datastore", "root"] = (
                f"{datastore_root}/butler_test_repository2"
            )
            Butler.makeRepo(datastore_root, config=config, forceConfigRoot=False)
            Butler.makeRepo(other_repo_root, config=config, forceConfigRoot=False)
            with (
                Butler(datastore_root, writeable=True) as source_butler,
                Butler(other_repo_root, writeable=True) as target_butler,
            ):
                self._test_transfer_datasets_in_place(source_butler, target_butler)

    def _test_transfer_datasets_in_place(
        self, source_butler: DirectButler, target_butler: DirectButler
    ) -> None:
        """Shared assertions: transfer datasets registry-only between two
        butlers that share a datastore root, checking idempotence and URI
        equality.
        """
        metric_repo = MetricTestRepo.create_from_butler(
            source_butler,
            source_butler._config,
        )
        target_butler.transfer_dimension_records_from(source_butler, [metric_repo.ref1, metric_repo.ref2])
        # Verify that the setup was correct and the two repos have
        # independent registries.
        self.assertIsNone(target_butler.get_dataset(metric_repo.ref1.id))
        # Copy one dataset, and make sure we can load it from the
        # target repo.
        self.assertEqual(
            transfer_datasets_in_place(source_butler, target_butler, [metric_repo.ref1]),
            [metric_repo.ref1],
        )
        self.assertEqual(target_butler.get(metric_repo.ref1), source_butler.get(metric_repo.ref1))
        self.assertIsNone(target_butler.get_dataset(metric_repo.ref2.id))
        # "In place" means no file copy: the URIs must match exactly.
        self.assertEqual(source_butler.getURIs(metric_repo.ref1), target_butler.getURIs(metric_repo.ref1))
        # Trying to copy the same dataset again is a no-op.
        self.assertEqual(
            transfer_datasets_in_place(source_butler, target_butler, [metric_repo.ref1]),
            [],
        )
        self.assertEqual(target_butler.get(metric_repo.ref1), source_butler.get(metric_repo.ref1))
        # A mix of existing and non-existing datasets.
        self.assertEqual(
            transfer_datasets_in_place(source_butler, target_butler, [metric_repo.ref1, metric_repo.ref2]),
            [metric_repo.ref2],
        )
        self.assertEqual(target_butler.get(metric_repo.ref1), source_butler.get(metric_repo.ref1))
        self.assertEqual(target_butler.get(metric_repo.ref2), source_butler.get(metric_repo.ref2))

        # For testing datastore chaining, set up a dataset that is only
        # accepted by one of the datastores.
        source_butler.registry.registerDatasetType(
            DatasetType("rejected_by_first", source_butler.dimensions.conform([]), "int")
        )
        source_butler.registry.registerRun("run")
        ref = source_butler.put(1, "rejected_by_first", dataId={}, run="run")
        self.assertEqual(
            transfer_datasets_in_place(source_butler, target_butler, [ref]),
            [ref],
        )
        self.assertEqual(1, target_butler.get(ref))
class NullDatastoreTestCase(unittest.TestCase):
    """Verify the fall-back to a NullDatastore when the configured
    datastore class cannot be loaded.
    """

    # A valid configuration is still needed to create the repository.
    configFile = os.path.join(TESTDIR, "config/basic/butler.yaml")
    storageClassFactory: StorageClassFactory

    @classmethod
    def setUpClass(cls) -> None:
        factory = StorageClassFactory()
        factory.addFromConfig(cls.configFile)
        cls.storageClassFactory = factory

    def setUp(self) -> None:
        """Create a new butler root for each test."""
        self.root = makeTestTempDir(TESTDIR)
        Butler.makeRepo(self.root, config=Config(self.configFile))

    def tearDown(self) -> None:
        # Discard the per-test repository root.
        removeTestTempDir(self.root)

    def test_fallback(self) -> None:
        # Corrupt the datastore section of the on-disk butler config so the
        # datastore class can no longer be imported.
        config_uri = os.path.join(self.root, "butler.yaml")
        broken = Config(config_uri)
        broken["datastore", "cls"] = "lsst.not.a.datastore.Datastore"
        broken.dumpToUri(config_uri)

        # Both construction paths must fail when a datastore is required.
        for make_butler in (Butler, Butler.from_config):
            with self.assertRaises(RuntimeError):
                make_butler(self.root, without_datastore=False)

        butler = Butler.from_config(self.root, writeable=True, without_datastore=True)
        self.enterContext(butler)
        self.assertIsInstance(butler._datastore, NullDatastore)

        # Registry operations should still work without a datastore.
        butler.collections.register("MYRUN")
        self.assertIn("MYRUN", set(butler.collections.query("*")))

        # Build a ref for a freshly registered dataset type.
        storageClass = self.storageClassFactory.getStorageClass("StructuredDataDict")
        datasetType = DatasetType("metric", butler.dimensions.conform([]), storageClass)
        butler.registry.registerDatasetType(datasetType)
        ref = DatasetRef(datasetType, {}, run="MYRUN")

        # The null datastore cannot serve any artifact.
        for accessor in (butler.get, butler.getURI):
            with self.assertRaises(FileNotFoundError):
                accessor(ref)
@unittest.skipIf(not butler_server_is_available, butler_server_import_error)
class ButlerServerTests(FileDatastoreButlerTests):
    """Test RemoteButler and Butler server.

    Runs the shared FileDatastoreButlerTests suite against a butler backed
    by a test server, overriding tests that do not apply to the
    client/server configuration.
    """

    # No on-disk config: butlers come from the test-server fixture.
    configFile = None
    # Features of FileDatastore not supported through the server.
    predictionSupported = False
    trustModeSupported = False

    # Optional database fixture; concrete subclasses set this.
    postgres: TemporaryPostgresInstance | None

    def setUp(self) -> None:
        self.server_instance = self.enterContext(create_test_server(TESTDIR))

    def tearDown(self) -> None:
        # Cleanup is handled by the context entered in setUp().
        pass

    def are_uris_equivalent(self, uri1: ResourcePath, uri2: ResourcePath) -> bool:
        # S3 pre-signed URLs may end up with differing expiration times in the
        # query parameters, so ignore query parameters when comparing.
        return uri1.scheme == uri2.scheme and uri1.netloc == uri2.netloc and uri1.path == uri2.path

    def create_empty_butler(
        self,
        run: str | None = None,
        writeable: bool | None = None,
        metrics: ButlerMetrics | None = None,
        cleanup: bool = True,
    ) -> Butler:
        # ``writeable``/``cleanup`` are ignored here; the hybrid butler's
        # lifetime is managed by the server fixture.
        return self.server_instance.hybrid_butler.clone(run=run, metrics=metrics)

    def remove_dataset_out_of_band(self, butler: Butler, ref: DatasetRef) -> None:
        # Can't delete a file via S3 signed URLs, so we need to reach in
        # through DirectButler to delete the dataset.
        uri = self.server_instance.direct_butler.getURI(ref)
        uri.remove()

    def testConstructor(self) -> None:
        # RemoteButler constructor is tested in test_server.py and
        # test_remote_butler.py.
        pass

    def testDafButlerRepositories(self) -> None:
        # Loading of RemoteButler via repository index is tested in
        # test_server.py.
        pass

    def testGetDatasetTypes(self) -> None:
        # This is mostly a test of validateConfiguration, which is for
        # validating Datastore configuration and thus isn't relevant to
        # RemoteButler.
        pass

    def testMakeRepo(self) -> None:
        # Only applies to DirectButler.
        pass

    # Pickling not yet implemented for RemoteButler/HybridButler.
    @unittest.expectedFailure
    def testPickle(self) -> None:
        return super().testPickle()

    def testStringification(self) -> None:
        self.assertEqual(
            str(self.server_instance.remote_butler),
            "RemoteButler(https://test.example/api/butler/repo/testrepo/)",
        )

    def testTransaction(self) -> None:
        # Transactions will never be supported for RemoteButler.
        pass

    def testPutTemplates(self) -> None:
        # The Butler server instance is configured with different file naming
        # templates than this test is expecting.
        pass
@unittest.skipIf(not butler_server_is_available, butler_server_import_error)
class ButlerServerSqliteTests(ButlerServerTests, unittest.TestCase):
    """Tests for RemoteButler's registry shim, with a SQLite DB backing the
    server.
    """

    # No Postgres fixture needed for the SQLite-backed server.
    postgres = None
@unittest.skipIf(not butler_server_is_available, butler_server_import_error)
class ButlerServerPostgresTests(ButlerServerTests, unittest.TestCase):
    """Tests for RemoteButler's registry shim, with a Postgres DB backing the
    server.
    """

    @classmethod
    def setUpClass(cls) -> None:
        # Start a temporary Postgres instance scoped to the class; it is
        # torn down automatically via enterClassContext().
        cls.postgres = cls.enterClassContext(setup_postgres_test_db())
        super().setUpClass()
def setup_module(module: types.ModuleType) -> None:
    """Set up the module for pytest.

    Delegates to ``clean_environment`` (defined earlier in this file) so
    tests start from a predictable environment; presumably this clears
    butler-related environment variables — verify against its definition.
    """
    clean_environment()
def _get_test_data_path(filename: str) -> ResourcePath:
    """Return the path to a registry test-data file packaged with
    ``lsst.daf.butler``.

    Parameters
    ----------
    filename : `str`
        Name of the file inside the ``tests/registry_data`` package data
        directory (e.g. ``"base.yaml"``).

    Returns
    -------
    path : `ResourcePath`
        ``resource://`` URI pointing at the requested file.
    """
    # Bug fix: the f-string previously contained no placeholder, so the
    # ``filename`` argument was ignored and every caller got the same
    # nonexistent path.
    return ResourcePath(f"resource://lsst.daf.butler/tests/registry_data/{filename}")
if __name__ == "__main__":
    # Mirror the pytest setup_module() hook when run as a plain script.
    clean_environment()
    unittest.main()