Coverage for tests / test_butler.py: 13%

1900 statements  

« prev     ^ index     » next       coverage.py v7.13.5, created at 2026-04-22 08:55 +0000

1# This file is part of daf_butler. 

2# 

3# Developed for the LSST Data Management System. 

4# This product includes software developed by the LSST Project 

5# (http://www.lsst.org). 

6# See the COPYRIGHT file at the top-level directory of this distribution 

7# for details of code ownership. 

8# 

9# This software is dual licensed under the GNU General Public License and also 

10# under a 3-clause BSD license. Recipients may choose which of these licenses 

11# to use; please see the files gpl-3.0.txt and/or bsd_license.txt, 

12# respectively. If you choose the GPL option then the following text applies 

13# (but note that there is still no warranty even if you opt for BSD instead): 

14# 

15# This program is free software: you can redistribute it and/or modify 

16# it under the terms of the GNU General Public License as published by 

17# the Free Software Foundation, either version 3 of the License, or 

18# (at your option) any later version. 

19# 

20# This program is distributed in the hope that it will be useful, 

21# but WITHOUT ANY WARRANTY; without even the implied warranty of 

22# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the 

23# GNU General Public License for more details. 

24# 

25# You should have received a copy of the GNU General Public License 

26# along with this program. If not, see <http://www.gnu.org/licenses/>. 

27 

28"""Tests for Butler.""" 

29 

30from __future__ import annotations 

31 

32import json 

33import logging 

34import os 

35import pathlib 

36import pickle 

37import posixpath 

38import random 

39import re 

40import shutil 

41import string 

42import tempfile 

43import unittest 

44import unittest.mock 

45import uuid 

46import warnings 

47import weakref 

48from collections.abc import Callable, Mapping 

49from typing import TYPE_CHECKING, Any, cast 

50 

51try: 

52 import boto3 

53 import botocore 

54 

55 from lsst.resources.s3utils import clean_test_environment_for_s3 

56 

57 try: 

58 from moto import mock_aws # v5 

59 except ImportError: 

60 from moto import mock_s3 as mock_aws 

61except ImportError: 

62 boto3 = None 

63 

64 def mock_aws(*args: Any, **kwargs: Any) -> Any: # type: ignore[no-untyped-def] 

65 """No-op decorator in case moto mock_aws can not be imported.""" 

66 return None 

67 

68 

69import astropy.time 

70from sqlalchemy.exc import IntegrityError 

71 

72from lsst.daf.butler import ( 

73 Butler, 

74 ButlerConfig, 

75 ButlerMetrics, 

76 ButlerRepoIndex, 

77 CollectionCycleError, 

78 CollectionType, 

79 Config, 

80 DataCoordinate, 

81 DatasetExistence, 

82 DatasetNotFoundError, 

83 DatasetProvenance, 

84 DatasetRef, 

85 DatasetType, 

86 DimensionRecord, 

87 FileDataset, 

88 NoDefaultCollectionError, 

89 StorageClassFactory, 

90 ValidationError, 

91 script, 

92) 

93from lsst.daf.butler._rubin.file_datasets import transfer_datasets_to_datastore 

94from lsst.daf.butler._rubin.temporary_for_ingest import TemporaryForIngest 

95from lsst.daf.butler._rubin.transfer_datasets_in_place import transfer_datasets_in_place 

96from lsst.daf.butler.datastore import NullDatastore 

97from lsst.daf.butler.datastore.file_templates import FileTemplate, FileTemplateValidationError 

98from lsst.daf.butler.datastores.file_datastore.retrieve_artifacts import ZipIndex 

99from lsst.daf.butler.datastores.fileDatastore import FileDatastore 

100from lsst.daf.butler.direct_butler import DirectButler 

101from lsst.daf.butler.registry import ( 

102 CollectionError, 

103 CollectionTypeError, 

104 ConflictingDefinitionError, 

105 DataIdValueError, 

106 DatasetTypeExpressionError, 

107 MissingCollectionError, 

108 OrphanedRecordError, 

109) 

110from lsst.daf.butler.registry.sql_registry import SqlRegistry 

111from lsst.daf.butler.repo_relocation import BUTLER_ROOT_TAG 

112from lsst.daf.butler.tests import MetricsExample, MetricsExampleModel, MultiDetectorFormatter 

113from lsst.daf.butler.tests.postgresql import TemporaryPostgresInstance, setup_postgres_test_db 

114from lsst.daf.butler.tests.server_available import butler_server_import_error, butler_server_is_available 

115from lsst.daf.butler.tests.utils import ( 

116 MetricTestRepo, 

117 TestCaseMixin, 

118 create_populated_sqlite_registry, 

119 makeTestTempDir, 

120 removeTestTempDir, 

121 safeTestTempDir, 

122) 

123from lsst.resources import ResourcePath 

124from lsst.resources.http import HttpResourcePath 

125from lsst.utils import doImportType 

126from lsst.utils.introspection import get_full_type_name 

127 

128if butler_server_is_available: 

129 from lsst.daf.butler.tests.server import create_test_server 

130 

131 

132if TYPE_CHECKING: 

133 import types 

134 

135 from lsst.daf.butler import DimensionGroup, Registry, StorageClass 

136 

137TESTDIR = os.path.abspath(os.path.dirname(__file__)) 

138 

139 

def clean_environment() -> None:
    """Strip environment variables that could influence test behavior.

    Currently only ``DAF_BUTLER_REPOSITORY_INDEX`` is removed; variables
    that are not set are silently ignored.
    """
    affected_variables = ("DAF_BUTLER_REPOSITORY_INDEX",)
    for name in affected_variables:
        os.environ.pop(name, None)

144 

145 

def makeExampleMetrics() -> MetricsExample:
    """Construct a small, fixed ``MetricsExample`` instance for tests."""
    summary = {"AM1": 5.2, "AM2": 30.6}
    output = {"a": [1, 2, 3], "b": {"blue": 5, "red": "green"}}
    data = [563, 234, 456.7, 752, 8, 9, 27]
    return MetricsExample(summary, output, data)

153 

154 

class TransactionTestError(Exception):
    """Specific error for testing transactions.

    A dedicated exception type prevents the misdiagnosis that might
    otherwise occur when a standard exception is used.
    """

    pass

161 

162 

class ButlerConfigTests(unittest.TestCase):
    """Tests for ButlerConfig behavior not exercised by other test cases."""

    def testSearchPath(self) -> None:
        base_config_path = os.path.join(TESTDIR, "config", "basic", "butler.yaml")
        with self.assertLogs("lsst.daf.butler", level="DEBUG") as log_capture:
            plain_config = ButlerConfig(base_config_path)
        self.assertNotIn("testConfigs", "\n".join(log_capture.output))

        override_dir = os.path.join(TESTDIR, "config", "testConfigs")
        with self.assertLogs("lsst.daf.butler", level="DEBUG") as log_capture:
            override_config = ButlerConfig(base_config_path, searchPaths=[override_dir])
        self.assertIn("testConfigs", "\n".join(log_capture.output))

        # The override directory should have replaced the records table name.
        record_key = ("datastore", "records", "table")
        self.assertEqual(override_config[record_key], "override_record")
        self.assertNotEqual(plain_config[record_key], override_config[record_key])

182 

183 

184class ButlerPutGetTests(TestCaseMixin): 

185 """Helper method for running a suite of put/get tests from different 

186 butler configurations. 

187 """ 

188 

189 root: str 

190 default_run = "ingésτ😺" 

191 storageClassFactory: StorageClassFactory 

192 configFile: str | None 

193 tmpConfigFile: str 

194 

195 @staticmethod 

196 def addDatasetType( 

197 datasetTypeName: str, dimensions: DimensionGroup, storageClass: StorageClass | str, registry: Registry 

198 ) -> DatasetType: 

199 """Create a DatasetType and register it""" 

200 datasetType = DatasetType(datasetTypeName, dimensions, storageClass) 

201 registry.registerDatasetType(datasetType) 

202 return datasetType 

203 

204 @classmethod 

205 def setUpClass(cls) -> None: 

206 cls.storageClassFactory = StorageClassFactory() 

207 if cls.configFile is not None: 

208 cls.storageClassFactory.addFromConfig(cls.configFile) 

209 

210 def assertGetComponents( 

211 self, 

212 butler: Butler, 

213 datasetRef: DatasetRef, 

214 components: tuple[str, ...], 

215 reference: Any, 

216 collections: Any = None, 

217 ) -> None: 

218 datasetType = datasetRef.datasetType 

219 dataId = datasetRef.dataId 

220 deferred = butler.getDeferred(datasetRef) 

221 

222 for component in components: 

223 compTypeName = datasetType.componentTypeName(component) 

224 result = butler.get(compTypeName, dataId, collections=collections) 

225 self.assertEqual(result, getattr(reference, component)) 

226 result_deferred = deferred.get(component=component) 

227 self.assertEqual(result_deferred, result) 

228 

229 def tearDown(self) -> None: 

230 if self.root is not None: 

231 removeTestTempDir(self.root) 

232 

    def create_empty_butler(
        self,
        run: str | None = None,
        writeable: bool | None = None,
        metrics: ButlerMetrics | None = None,
        cleanup: bool = True,
    ) -> DirectButler:
        """Create a Butler for the test repository, without inserting test
        data.

        Parameters
        ----------
        run : `str` or `None`
            Run collection to use as the default for writes.
        writeable : `bool` or `None`
            Whether the butler should be writeable; `None` uses the
            default inferred by `Butler.from_config`.
        metrics : `ButlerMetrics` or `None`
            Object in which to accumulate butler metrics.
        cleanup : `bool`
            If `True`, register the butler with the test's cleanup context
            so it is closed automatically at the end of the test.
        """
        butler = Butler.from_config(self.tmpConfigFile, run=run, writeable=writeable, metrics=metrics)
        if cleanup:
            self.enterContext(butler)
        # Test repositories are always configured for DirectButler.
        assert isinstance(butler, DirectButler), "Expect DirectButler in configuration"
        return butler

248 

    def create_butler(
        self,
        run: str,
        storageClass: StorageClass | str,
        datasetTypeName: str,
        metrics: ButlerMetrics | None = None,
    ) -> tuple[Butler, DatasetType]:
        """Create a Butler for the test repository and insert some test data
        into it.

        Registers a dataset type with ``instrument`` and ``visit``
        dimensions and inserts the dimension records the put/get tests
        rely on: instrument ``DummyCamComp``, filter ``d-r``, day_obs
        ``20200101`` and visits 423, 424 and 425.

        Returns
        -------
        butler : `Butler`
            Butler whose default run collection is ``run``.
        datasetType : `DatasetType`
            The newly registered dataset type.
        """
        butler = self.create_empty_butler(run=run, metrics=metrics)

        collections = set(butler.collections.query("*"))
        self.assertEqual(collections, {run})
        # Create and register a DatasetType
        dimensions = butler.dimensions.conform(["instrument", "visit"])

        datasetType = self.addDatasetType(datasetTypeName, dimensions, storageClass, butler.registry)

        # Add needed Dimensions
        butler.registry.insertDimensionData("instrument", {"name": "DummyCamComp"})
        butler.registry.insertDimensionData(
            "physical_filter", {"instrument": "DummyCamComp", "name": "d-r", "band": "R"}
        )
        butler.registry.insertDimensionData(
            "visit_system", {"instrument": "DummyCamComp", "id": 1, "name": "default"}
        )
        butler.registry.insertDimensionData("day_obs", {"instrument": "DummyCamComp", "id": 20200101})
        visit_start = astropy.time.Time("2020-01-01 08:00:00.123456789", scale="tai")
        visit_end = astropy.time.Time("2020-01-01 08:00:36.66", scale="tai")
        butler.registry.insertDimensionData(
            "visit",
            {
                "instrument": "DummyCamComp",
                "id": 423,
                "name": "fourtwentythree",
                "physical_filter": "d-r",
                "datetime_begin": visit_start,
                "datetime_end": visit_end,
                "day_obs": 20200101,
            },
        )

        # Add more visits for some later tests
        for visit_id in (424, 425):
            butler.registry.insertDimensionData(
                "visit",
                {
                    "instrument": "DummyCamComp",
                    "id": visit_id,
                    "name": f"fourtwentyfour_{visit_id}",
                    "physical_filter": "d-r",
                    "day_obs": 20200101,
                },
            )
        return butler, datasetType

305 

    def runPutGetTest(self, storageClass: StorageClass, datasetTypeName: str) -> Butler:
        """Exercise put/get/deferred-get/artifact-retrieval/removal round
        trips for a dataset type with the given storage class.

        Returns the butler with one dataset left in place, since some
        downstream tests require something to be present.
        """
        # New datasets will be added to run and tag, but we will only look in
        # tag when looking up datasets.
        run = self.default_run
        butler, datasetType = self.create_butler(run, storageClass, datasetTypeName)
        assert butler.run is not None

        # Create and store a dataset
        metric = makeExampleMetrics()
        dataId = butler.registry.expandDataId({"instrument": "DummyCamComp", "visit": 423})

        # Dataset should not exist if we haven't added it
        with self.assertRaises(DatasetNotFoundError):
            butler.get(datasetTypeName, dataId)

        # Put and remove the dataset once as a DatasetRef, once as a dataId,
        # and once with a DatasetType

        # Keep track of any collections we add and do not clean up
        expected_collections = {run}

        counter = 0
        ref = DatasetRef(datasetType, dataId, id=uuid.UUID(int=1), run="put_run_1")
        # NOTE(review): the next line assigns a type expression to ``args``;
        # it looks like it was intended as a type annotation for the loop
        # variable below — confirm.
        args = tuple[DatasetRef] | tuple[str | DatasetType, DataCoordinate]
        for args in ((ref,), (datasetTypeName, dataId), (datasetType, dataId)):
            # Since we are using subTest we can get cascading failures
            # here with the first attempt failing and the others failing
            # immediately because the dataset already exists. Work around
            # this by using a distinct run collection each time
            counter += 1
            this_run = f"put_run_{counter}"
            butler.collections.register(this_run)
            expected_collections.update({this_run})

            with self.subTest(args=repr(args)):
                kwargs: dict[str, Any] = {}
                if not isinstance(args[0], DatasetRef):  # type: ignore
                    kwargs["run"] = this_run
                ref = butler.put(metric, *args, **kwargs)
                self.assertIsInstance(ref, DatasetRef)

                # Test get of a ref.
                metricOut = butler.get(ref)
                self.assertEqual(metric, metricOut)
                # Test get
                metricOut = butler.get(ref.datasetType.name, dataId, collections=this_run)
                self.assertEqual(metric, metricOut)
                # Test get with a datasetRef
                metricOut = butler.get(ref)
                self.assertEqual(metric, metricOut)
                # Test getDeferred with dataId
                metricOut = butler.getDeferred(ref.datasetType.name, dataId, collections=this_run).get()
                self.assertEqual(metric, metricOut)
                # Test getDeferred with a ref
                metricOut = butler.getDeferred(ref).get()
                self.assertEqual(metric, metricOut)

                # Check we can get components
                if storageClass.isComposite():
                    self.assertGetComponents(
                        butler, ref, ("summary", "data", "output"), metric, collections=this_run
                    )

                primary_uri, secondary_uris = butler.getURIs(ref)
                n_uris = len(secondary_uris)
                if primary_uri:
                    n_uris += 1

                # Can the artifacts themselves be retrieved?
                if not butler._datastore.isEphemeral:
                    # Create a temporary directory to hold the retrieved
                    # artifacts.
                    with tempfile.TemporaryDirectory(
                        prefix="butler-artifacts-", ignore_cleanup_errors=True
                    ) as artifact_root:
                        root_uri = ResourcePath(artifact_root, forceDirectory=True)

                        for preserve_path in (True, False):
                            destination = root_uri.join(f"{preserve_path}_{counter}/")
                            log = logging.getLogger("lsst.x")
                            log.debug("Using destination %s for args %s", destination, args)
                            # Use copy so that we can test that overwrite
                            # protection works (using "auto" for File URIs
                            # would use hard links and subsequent transfer
                            # would work because it knows they are the same
                            # file).
                            transferred = butler.retrieveArtifacts(
                                [ref], destination, preserve_path=preserve_path, transfer="copy"
                            )
                            self.assertGreater(len(transferred), 0)
                            artifacts = list(ResourcePath.findFileResources([destination]))
                            # Filter out the index file.
                            artifacts = [a for a in artifacts if a.basename() != ZipIndex.index_name]
                            self.assertEqual(set(transferred), set(artifacts))

                            for artifact in transferred:
                                path_in_destination = artifact.relative_to(destination)
                                self.assertIsNotNone(path_in_destination)
                                assert path_in_destination is not None

                                # When path is not preserved there should not
                                # be any path separators.
                                num_seps = path_in_destination.count("/")
                                if preserve_path:
                                    self.assertGreater(num_seps, 0)
                                else:
                                    self.assertEqual(num_seps, 0)

                            self.assertEqual(
                                len(artifacts),
                                n_uris,
                                "Comparing expected artifacts vs actual:"
                                f" {artifacts} vs {primary_uri} and {secondary_uris}",
                            )

                            if preserve_path:
                                # No need to run these twice
                                with self.assertRaises(ValueError):
                                    butler.retrieveArtifacts([ref], destination, transfer="move")

                                with self.assertRaisesRegex(
                                    ValueError, "^Destination location must refer to a directory"
                                ):
                                    butler.retrieveArtifacts(
                                        [ref], ResourcePath("/some/file.txt", forceDirectory=False)
                                    )

                                with self.assertRaises(FileExistsError):
                                    butler.retrieveArtifacts([ref], destination)

                                transferred_again = butler.retrieveArtifacts(
                                    [ref], destination, preserve_path=preserve_path, overwrite=True
                                )
                                self.assertEqual(set(transferred_again), set(transferred))

                # Now remove the dataset completely.
                butler.pruneDatasets([ref], purge=True, unstore=True)
                # Lookup with original args should still fail.
                kwargs = {"collections": this_run}
                if isinstance(args[0], DatasetRef):
                    kwargs = {}  # Prevent warning from being issued.
                self.assertFalse(butler.exists(*args, **kwargs))
                # get() should still fail.
                with self.assertRaises((FileNotFoundError, DatasetNotFoundError)):
                    butler.get(ref)
                # Registry shouldn't be able to find it by dataset_id anymore.
                self.assertIsNone(butler.get_dataset(ref.id))

                # Do explicit registry removal since we know they are
                # empty
                butler.collections.x_remove(this_run)
                expected_collections.remove(this_run)

        # Create DatasetRef for put using default run.
        refIn = DatasetRef(datasetType, dataId, id=uuid.UUID(int=1), run=butler.run)

        # Check that getDeferred fails with standalone ref.
        with self.assertRaises(LookupError):
            butler.getDeferred(refIn)

        # Put the dataset again, since the last thing we did was remove it
        # and we want to use the default collection.
        ref = butler.put(metric, refIn)

        # Get with parameters
        stop = 4
        sliced = butler.get(ref, parameters={"slice": slice(stop)})
        self.assertNotEqual(metric, sliced)
        self.assertEqual(metric.summary, sliced.summary)
        self.assertEqual(metric.output, sliced.output)
        assert metric.data is not None  # for mypy
        self.assertEqual(metric.data[:stop], sliced.data)
        # getDeferred with parameters
        sliced = butler.getDeferred(ref, parameters={"slice": slice(stop)}).get()
        self.assertNotEqual(metric, sliced)
        self.assertEqual(metric.summary, sliced.summary)
        self.assertEqual(metric.output, sliced.output)
        self.assertEqual(metric.data[:stop], sliced.data)
        # getDeferred with deferred parameters
        sliced = butler.getDeferred(ref).get(parameters={"slice": slice(stop)})
        self.assertNotEqual(metric, sliced)
        self.assertEqual(metric.summary, sliced.summary)
        self.assertEqual(metric.output, sliced.output)
        self.assertEqual(metric.data[:stop], sliced.data)

        if storageClass.isComposite():
            # Check that components can be retrieved
            metricOut = butler.get(ref.datasetType.name, dataId)
            compNameS = ref.datasetType.componentTypeName("summary")
            compNameD = ref.datasetType.componentTypeName("data")
            summary = butler.get(compNameS, dataId)
            self.assertEqual(summary, metric.summary)
            data = butler.get(compNameD, dataId)
            self.assertEqual(data, metric.data)

            if "counter" in storageClass.derivedComponents:
                count = butler.get(ref.datasetType.componentTypeName("counter"), dataId)
                self.assertEqual(count, len(data))

                count = butler.get(
                    ref.datasetType.componentTypeName("counter"), dataId, parameters={"slice": slice(stop)}
                )
                self.assertEqual(count, stop)

            compRef = butler.find_dataset(compNameS, dataId, collections=butler.collections.defaults)
            assert compRef is not None
            summary = butler.get(compRef)
            self.assertEqual(summary, metric.summary)

        # Create a Dataset type that has the same name but is inconsistent.
        inconsistentDatasetType = DatasetType(
            datasetTypeName, datasetType.dimensions, self.storageClassFactory.getStorageClass("Config")
        )

        # Getting with a dataset type that does not match registry fails
        with self.assertRaisesRegex(
            ValueError,
            "(Supplied dataset type .* inconsistent with registry)"
            "|(The new storage class .* is not compatible with the existing storage class)",
        ):
            butler.get(inconsistentDatasetType, dataId)

        # Combining a DatasetRef with a dataId should fail
        with self.assertRaisesRegex(ValueError, "DatasetRef given, cannot use dataId as well"):
            butler.get(ref, dataId)
        # Getting with an explicit ref should fail if the id doesn't match.
        with self.assertRaises((FileNotFoundError, DatasetNotFoundError)):
            butler.get(DatasetRef(ref.datasetType, ref.dataId, id=uuid.UUID(int=101), run=butler.run))

        # Getting a dataset with unknown parameters should fail
        with self.assertRaisesRegex(KeyError, "Parameter 'unsupported' not understood"):
            butler.get(ref, parameters={"unsupported": True})

        # Check we have a collection
        collections = set(butler.collections.query("*"))
        self.assertEqual(collections, expected_collections)

        # Clean up to check that we can remove something that may have
        # already had a component removed
        butler.pruneDatasets([ref], unstore=True, purge=True)

        # Add the same ref again, so we can check that duplicate put fails.
        ref = butler.put(metric, datasetType, dataId)

        # Repeat put will fail.
        with self.assertRaisesRegex(
            ConflictingDefinitionError, "A database constraint failure was triggered"
        ):
            butler.put(metric, datasetType, dataId)

        # Remove the datastore entry.
        butler.pruneDatasets([ref], unstore=True, purge=False, disassociate=False)

        # Put will still fail
        with self.assertRaisesRegex(
            ConflictingDefinitionError, "A database constraint failure was triggered"
        ):
            butler.put(metric, datasetType, dataId)

        # Repeat the same sequence with resolved ref.
        butler.pruneDatasets([ref], unstore=True, purge=True)
        ref = butler.put(metric, refIn)

        # Repeat put will fail.
        with self.assertRaisesRegex(ConflictingDefinitionError, "Datastore already contains dataset"):
            butler.put(metric, refIn)

        # Remove the datastore entry.
        butler.pruneDatasets([ref], unstore=True, purge=False, disassociate=False)

        # In case of resolved ref this write will succeed.
        ref = butler.put(metric, refIn)

        # Leave the dataset in place since some downstream tests require
        # something to be present

        return butler

583 

    def testDeferredCollectionPassing(self) -> None:
        """Test that a butler constructed without a default run can still
        put and get datasets when collections are passed explicitly.
        """
        # Construct a butler with no run or collection, but make it writeable.
        butler = self.create_empty_butler(writeable=True)
        # Create and register a DatasetType
        dimensions = butler.dimensions.conform(["instrument", "visit"])
        datasetType = self.addDatasetType(
            "example", dimensions, self.storageClassFactory.getStorageClass("StructuredData"), butler.registry
        )
        # Add needed Dimensions
        butler.registry.insertDimensionData("instrument", {"name": "DummyCamComp"})
        butler.registry.insertDimensionData(
            "physical_filter", {"instrument": "DummyCamComp", "name": "d-r", "band": "R"}
        )
        butler.registry.insertDimensionData("day_obs", {"instrument": "DummyCamComp", "id": 20250101})
        butler.registry.insertDimensionData(
            "visit",
            {
                "instrument": "DummyCamComp",
                "id": 423,
                "name": "fourtwentythree",
                "physical_filter": "d-r",
                "day_obs": 20250101,
            },
        )
        dataId = {"instrument": "DummyCamComp", "visit": 423}
        # Create dataset.
        metric = makeExampleMetrics()
        # Register a new run and put dataset.
        run = "deferred"
        self.assertTrue(butler.collections.register(run))
        # Second time it will be allowed but indicate no-op
        self.assertFalse(butler.collections.register(run))
        ref = butler.put(metric, datasetType, dataId, run=run)
        # Putting with no run should fail with CollectionError.
        with self.assertRaises(CollectionError):
            butler.put(metric, datasetType, dataId)
        # Dataset should exist.
        self.assertTrue(butler.exists(datasetType, dataId, collections=[run]))
        # We should be able to get the dataset back, but with and without
        # a deferred dataset handle.
        self.assertEqual(metric, butler.get(datasetType, dataId, collections=[run]))
        self.assertEqual(metric, butler.getDeferred(datasetType, dataId, collections=[run]).get())
        # Trying to find the dataset without any collection is an error.
        with self.assertRaises(NoDefaultCollectionError):
            butler.exists(datasetType, dataId)
        with self.assertRaises(CollectionError):
            butler.get(datasetType, dataId)
        # Associate the dataset with a different collection.
        butler.collections.register("tagged", type=CollectionType.TAGGED)
        butler.registry.associate("tagged", [ref])
        # Deleting the dataset from the new collection should make it findable
        # in the original collection.
        butler.pruneDatasets([ref], tags=["tagged"])
        self.assertTrue(butler.exists(datasetType, dataId, collections=[run]))

638 

639 

class ButlerTests(ButlerPutGetTests):
    """Tests for Butler."""

    # Whether setUp should create a fresh temporary repository root.
    useTempRoot = True
    # Expectations supplied by concrete subclasses for the various
    # datastore/registry configurations under test.
    validationCanFail: bool
    fullConfigKey: str | None
    registryStr: str | None
    datastoreName: list[str] | None
    datastoreStr: list[str]
    predictionSupported = True
    """Does getURIs support 'prediction mode'?"""

651 

652 def setUp(self) -> None: 

653 """Create a new butler root for each test.""" 

654 self.root = makeTestTempDir(TESTDIR) 

655 Butler.makeRepo(self.root, config=Config(self.configFile)) 

656 self.tmpConfigFile = os.path.join(self.root, "butler.yaml") 

657 

658 def are_uris_equivalent(self, uri1: ResourcePath, uri2: ResourcePath) -> bool: 

659 """Return True if two URIs refer to the same resource. 

660 

661 Subclasses may override to handle unique requirements. 

662 """ 

663 return uri1 == uri2 

664 

    def testConstructor(self) -> None:
        """Independent test of constructor."""
        butler = Butler.from_config(self.tmpConfigFile, run=self.default_run)
        self.enterContext(butler)
        self.assertIsInstance(butler, Butler)

        # Check that butler.yaml is added automatically.
        if self.tmpConfigFile.endswith(end := "/butler.yaml"):
            config_dir = self.tmpConfigFile[: -len(end)]
            butler = Butler.from_config(config_dir, run=self.default_run)
            self.enterContext(butler)
            self.assertIsInstance(butler, Butler)

            # Even with a ResourcePath.
            butler = Butler.from_config(ResourcePath(config_dir, forceDirectory=True), run=self.default_run)
            self.enterContext(butler)
            self.assertIsInstance(butler, Butler)

        collections = set(butler.collections.query("*"))
        self.assertEqual(collections, {self.default_run})

        # Check that some special characters can be included in run name.
        special_run = "u@b.c-A"
        butler_special = Butler.from_config(butler=butler, run=special_run)
        self.enterContext(butler_special)
        collections = set(butler_special.registry.queryCollections("*@*"))
        self.assertEqual(collections, {special_run})

        butler2 = Butler.from_config(butler=butler, collections=["other"])
        self.enterContext(butler2)
        self.assertEqual(butler2.collections.defaults, ("other",))
        self.assertIsNone(butler2.run)
        self.assertEqual(type(butler._datastore), type(butler2._datastore))
        self.assertEqual(butler._datastore.config, butler2._datastore.config)

        # Test that we can use an environment variable to find this
        # repository.
        butler_index = Config()
        butler_index["label"] = self.tmpConfigFile
        for suffix in (".yaml", ".json"):
            # Ensure that the content differs so that we know that
            # we aren't reusing the cache.
            bad_label = f"file://bucket/not_real{suffix}"
            butler_index["bad_label"] = bad_label
            with ResourcePath.temporary_uri(suffix=suffix) as temp_file:
                butler_index.dumpToUri(temp_file)
                with unittest.mock.patch.dict(os.environ, {"DAF_BUTLER_REPOSITORY_INDEX": str(temp_file)}):
                    self.assertEqual(Butler.get_known_repos(), {"label", "bad_label"})
                    uri = Butler.get_repo_uri("bad_label")
                    self.assertEqual(uri, ResourcePath(bad_label))
                    uri = Butler.get_repo_uri("label")
                    butler = Butler.from_config(uri, writeable=False)
                    self.assertIsInstance(butler, Butler)
                    butler.close()
                    butler = Butler.from_config("label", writeable=False)
                    self.assertIsInstance(butler, Butler)
                    butler.close()
                    with self.assertRaisesRegex(FileNotFoundError, "aliases:.*bad_label"):
                        Butler.from_config("not_there", writeable=False)
                    with self.assertRaisesRegex(FileNotFoundError, "resolved from alias 'bad_label'"):
                        Butler.from_config("bad_label")
                    with self.assertRaises(FileNotFoundError):
                        # Should ignore aliases.
                        Butler.from_config(ResourcePath("label", forceAbsolute=False))
                    with self.assertRaises(KeyError) as cm:
                        Butler.get_repo_uri("missing")
                    self.assertEqual(
                        Butler.get_repo_uri("missing", True), ResourcePath("missing", forceAbsolute=False)
                    )
                    self.assertIn("not known to", str(cm.exception))
                    # Should report no failure.
                    self.assertEqual(ButlerRepoIndex.get_failure_reason(), "")
        with ResourcePath.temporary_uri(suffix=suffix) as temp_file:
            # Now with empty configuration.
            butler_index = Config()
            butler_index.dumpToUri(temp_file)
            with unittest.mock.patch.dict(os.environ, {"DAF_BUTLER_REPOSITORY_INDEX": str(temp_file)}):
                with self.assertRaisesRegex(FileNotFoundError, "(no known aliases)"):
                    Butler.from_config("label")
        with ResourcePath.temporary_uri(suffix=suffix) as temp_file:
            # Now with bad contents.
            with open(temp_file.ospath, "w") as fh:
                print("'", file=fh)
            with unittest.mock.patch.dict(os.environ, {"DAF_BUTLER_REPOSITORY_INDEX": str(temp_file)}):
                with self.assertRaisesRegex(FileNotFoundError, "(no known aliases:.*could not be read)"):
                    Butler.from_config("label")
        with unittest.mock.patch.dict(os.environ, {"DAF_BUTLER_REPOSITORY_INDEX": "file://not_found/x.yaml"}):
            with self.assertRaises(FileNotFoundError):
                Butler.get_repo_uri("label")
            self.assertEqual(Butler.get_known_repos(), set())

            with self.assertRaisesRegex(FileNotFoundError, "index file not found"):
                Butler.from_config("label")

        # Check that we can create Butler when the alias file is not found.
        butler = Butler.from_config(self.tmpConfigFile, writeable=False)
        self.enterContext(butler)
        self.assertIsInstance(butler, Butler)
        with self.assertRaises(RuntimeError) as cm:
            # No environment variable set.
            Butler.get_repo_uri("label")
        self.assertEqual(Butler.get_repo_uri("label", True), ResourcePath("label", forceAbsolute=False))
        self.assertIn("No repository index defined", str(cm.exception))
        with self.assertRaisesRegex(FileNotFoundError, "no known aliases.*No repository index"):
            # No aliases registered.
            Butler.from_config("not_there")
        self.assertEqual(Butler.get_known_repos(), set())

772 

    def testClose(self) -> None:
        """Test that closing a butler (directly and via context manager)
        marks it closed and makes further use raise.
        """
        butler = self.create_empty_butler(cleanup=False)
        is_direct_butler = isinstance(butler, DirectButler)
        if is_direct_butler:
            self.assertFalse(butler._closed)

        # Exiting the context manager closes the butler.
        with butler as butler_from_context_manager:
            self.assertIs(butler, butler_from_context_manager)
        if is_direct_butler:
            self.assertTrue(butler._closed)
            with self.assertRaisesRegex(RuntimeError, "has been closed"):
                butler.get_dataset_type("raw")

        # Close may be called multiple times.
        butler.close()
        if is_direct_butler:
            self.assertTrue(butler._closed)

790 

    def testGarbageCollection(self) -> None:
        """Test that Butler does not have any circular references that prevent
        it from being garbage collected immediately when it goes out of scope.
        """
        butler = self.create_empty_butler(cleanup=False)
        is_direct_butler = isinstance(butler, DirectButler)
        # Weak references let us observe collection without keeping the
        # objects alive ourselves.
        butler_ref = weakref.ref(butler)
        if is_direct_butler:
            registry_ref = weakref.ref(butler._registry)
            managers_ref = weakref.ref(butler._registry._managers)
            datastore_ref = weakref.ref(butler._datastore)
            db_ref = weakref.ref(butler._registry._db)
            engine_ref = weakref.ref(butler._registry._db._engine)

        with warnings.catch_warnings():
            # Hide warnings from unclosed database handles.
            warnings.simplefilter("ignore", ResourceWarning)
            del butler
        self.assertIsNone(butler_ref(), "Butler should have been garbage collected")
        if is_direct_butler:
            self.assertIsNone(registry_ref(), "SqlRegistry should have been garbage collected")
            self.assertIsNone(managers_ref(), "Registry managers should have been garbage collected")
            self.assertIsNone(datastore_ref(), "Datastore should have been garbage collected")
            self.assertIsNone(db_ref(), "Database should have been garbage collected")
        # SQLAlchemy has internal reference cycles, so the Engine instance
        # is not cleaned up promptly even if we release our reference to
        # it. Explicitly clean it up here to avoid file handles leaking.
        if is_direct_butler:
            engine = engine_ref()
            if engine is not None:
                engine.dispose()

822 

823 def testDafButlerRepositories(self): 

824 with unittest.mock.patch.dict( 

825 os.environ, 

826 {"DAF_BUTLER_REPOSITORIES": "label: 'https://someuri.com'\notherLabel: 'https://otheruri.com'\n"}, 

827 ): 

828 self.assertEqual(str(Butler.get_repo_uri("label")), "https://someuri.com") 

829 

830 with unittest.mock.patch.dict( 

831 os.environ, 

832 { 

833 "DAF_BUTLER_REPOSITORIES": "label: https://someuri.com", 

834 "DAF_BUTLER_REPOSITORY_INDEX": "https://someuri.com", 

835 }, 

836 ): 

837 with self.assertRaisesRegex(RuntimeError, "Only one of the environment variables"): 

838 Butler.get_repo_uri("label") 

839 

840 with unittest.mock.patch.dict( 

841 os.environ, 

842 {"DAF_BUTLER_REPOSITORIES": "invalid"}, 

843 ): 

844 with self.assertRaisesRegex(ValueError, "Repository index not in expected format"): 

845 Butler.get_repo_uri("label") 

846 

847 def testBasicPutGet(self) -> None: 

848 storageClass = self.storageClassFactory.getStorageClass("StructuredDataNoComponents") 

849 self.runPutGetTest(storageClass, "test_metric") 

850 

    def testCompositePutGetConcrete(self) -> None:
        """Round-trip a composite storage class configured *not* to be
        disassembled, and check the resulting URIs (including predicted ones).
        """
        storageClass = self.storageClassFactory.getStorageClass("StructuredCompositeReadCompNoDisassembly")
        butler = self.runPutGetTest(storageClass, "test_metric")

        # Should *not* be disassembled: a single URI and no component URIs.
        datasets = list(butler.registry.queryDatasets(..., collections=self.default_run))
        self.assertEqual(len(datasets), 1)
        uri, components = butler.getURIs(datasets[0])
        self.assertIsInstance(uri, ResourcePath)
        self.assertFalse(components)
        self.assertEqual(uri.fragment, "", f"Checking absence of fragment in {uri}")
        self.assertIn("423", str(uri), f"Checking visit is in URI {uri}")

        # Predicted dataset: ask for the URI of a dataset that has not been
        # written; its URI should carry the "predicted" fragment.
        if self.predictionSupported:
            dataId = {"instrument": "DummyCamComp", "visit": 424}
            uri, components = butler.getURIs(datasets[0].datasetType, dataId=dataId, predict=True)
            self.assertFalse(components)
            self.assertIsInstance(uri, ResourcePath)
            self.assertIn("424", str(uri), f"Checking visit is in URI {uri}")
            self.assertEqual(uri.fragment, "predicted", f"Checking for fragment in {uri}")
            # Repeat with a DatasetRef to test that code path.
            ref = DatasetRef(
                datasets[0].datasetType,
                dataId=DataCoordinate.standardize(dataId, universe=butler.dimensions),
                run=self.default_run,
            )
            # Both call styles must predict the same URI.
            uri2, components2 = butler.getURIs(ref, predict=True)
            self.assertFalse(components2)
            self.assertEqual(uri, uri2)

881 

    def testCompositePutGetVirtual(self) -> None:
        """Round-trip a composite storage class that is disassembled into
        per-component files (except for ephemeral datastores), checking both
        real and predicted URIs.
        """
        storageClass = self.storageClassFactory.getStorageClass("StructuredCompositeReadComp")
        butler = self.runPutGetTest(storageClass, "test_metric_comp")

        # Should be disassembled
        datasets = list(butler.registry.queryDatasets(..., collections=self.default_run))
        self.assertEqual(len(datasets), 1)
        uri, components = butler.getURIs(datasets[0])

        if butler._datastore.isEphemeral:
            # Never disassemble in-memory datastore: single URI, no
            # component URIs.
            self.assertIsInstance(uri, ResourcePath)
            self.assertFalse(components)
            self.assertEqual(uri.fragment, "", f"Checking absence of fragment in {uri}")
            self.assertIn("423", str(uri), f"Checking visit is in URI {uri}")
        else:
            # Disassembled: no composite URI, one URI per declared component.
            self.assertIsNone(uri)
            self.assertEqual(set(components), set(storageClass.components))
            for compuri in components.values():
                self.assertIsInstance(compuri, ResourcePath)
                self.assertIn("423", str(compuri), f"Checking visit is in URI {compuri}")
                self.assertEqual(compuri.fragment, "", f"Checking absence of fragment in {compuri}")

        if self.predictionSupported:
            # Predicted dataset: URIs for a visit that was never written
            # must carry the "predicted" fragment.
            dataId = {"instrument": "DummyCamComp", "visit": 424}
            uri, components = butler.getURIs(datasets[0].datasetType, dataId=dataId, predict=True)

            if butler._datastore.isEphemeral:
                # Never disassembled
                self.assertIsInstance(uri, ResourcePath)
                self.assertFalse(components)
                self.assertIn("424", str(uri), f"Checking visit is in URI {uri}")
                self.assertEqual(uri.fragment, "predicted", f"Checking for fragment in {uri}")
            else:
                self.assertIsNone(uri)
                self.assertEqual(set(components), set(storageClass.components))
                for compuri in components.values():
                    self.assertIsInstance(compuri, ResourcePath)
                    self.assertIn("424", str(compuri), f"Checking visit is in URI {compuri}")
                    self.assertEqual(compuri.fragment, "predicted", f"Checking for fragment in {compuri}")

923 

    def testStorageClassOverrideGet(self) -> None:
        """Test storage class conversion on get with override.

        Covers overrides supplied to ``get``, to ``getDeferred`` (both at
        deferral time and at retrieval time), component gets, and parameter
        handling across write/read storage classes.
        """
        storageClass = self.storageClassFactory.getStorageClass("StructuredData")
        datasetTypeName = "anything"
        run = self.default_run

        butler, datasetType = self.create_butler(run, storageClass, datasetTypeName)

        # Create and store a dataset.
        metric = makeExampleMetrics()
        dataId = {"instrument": "DummyCamComp", "visit": 423}

        ref = butler.put(metric, datasetType, dataId)

        # Return native type.
        retrieved = butler.get(ref)
        self.assertEqual(retrieved, metric)

        # Specify an override: the result should be a different python type
        # but still compare equal to the original.
        new_sc = self.storageClassFactory.getStorageClass("MetricsConversion")
        model = butler.get(ref, storageClass=new_sc)
        self.assertNotEqual(type(model), type(retrieved))
        self.assertIs(type(model), new_sc.pytype)
        self.assertEqual(retrieved, model)

        # Defer but override later.
        deferred = butler.getDeferred(ref)
        model = deferred.get(storageClass=new_sc)
        self.assertIs(type(model), new_sc.pytype)
        self.assertEqual(retrieved, model)

        # Defer but override up front.
        deferred = butler.getDeferred(ref, storageClass=new_sc)
        model = deferred.get()
        self.assertIs(type(model), new_sc.pytype)
        self.assertEqual(retrieved, model)

        # Retrieve a component. Should be a tuple.
        data = butler.get("anything.data", dataId, storageClass="StructuredDataDataTestTuple")
        self.assertIs(type(data), tuple)
        self.assertEqual(data, tuple(retrieved.data))

        # Parameter on the write storage class should work regardless
        # of read storage class.
        data = butler.get(
            "anything.data",
            dataId,
            storageClass="StructuredDataDataTestTuple",
            parameters={"slice": slice(2, 4)},
        )
        self.assertEqual(len(data), 2)

        # Try a parameter that is known to the read storage class but not
        # the write storage class.
        with self.assertRaises(KeyError):
            butler.get(
                "anything.data",
                dataId,
                storageClass="StructuredDataDataTestTuple",
                parameters={"xslice": slice(2, 4)},
            )

985 

    def testPytypePutCoercion(self) -> None:
        """Test python type coercion on Butler.get and put."""
        # Store some data with the normal example storage class.
        storageClass = self.storageClassFactory.getStorageClass("StructuredDataNoComponents")
        datasetTypeName = "test_metric"
        butler, _ = self.create_butler(self.default_run, storageClass, datasetTypeName)

        dataId = {"instrument": "DummyCamComp", "visit": 423}

        # Put a dict and this should coerce to a MetricsExample
        test_dict = {"summary": {"a": 1}, "output": {"b": 2}}
        metric_ref = butler.put(test_dict, datasetTypeName, dataId=dataId, visit=424)
        test_metric = butler.get(metric_ref)
        self.assertEqual(get_full_type_name(test_metric), "lsst.daf.butler.tests.MetricsExample")
        self.assertEqual(test_metric.summary, test_dict["summary"])
        self.assertEqual(test_metric.output, test_dict["output"])

        # Check that the put still works if a DatasetType is given with
        # a definition matching this python type.
        registry_type = butler.get_dataset_type(datasetTypeName)
        this_type = DatasetType(datasetTypeName, registry_type.dimensions, "StructuredDataDictJson")
        metric2_ref = butler.put(test_dict, this_type, dataId=dataId, visit=425)
        # The returned ref carries the registry's definition, not ours.
        self.assertEqual(metric2_ref.datasetType, registry_type)

        # The get will return the type expected by registry.
        test_metric2 = butler.get(metric2_ref)
        self.assertEqual(get_full_type_name(test_metric2), "lsst.daf.butler.tests.MetricsExample")

        # Make a new DatasetRef with the compatible but different DatasetType.
        # This should now return a dict.
        new_ref = DatasetRef(this_type, metric2_ref.dataId, id=metric2_ref.id, run=metric2_ref.run)
        test_dict2 = butler.get(new_ref)
        self.assertEqual(get_full_type_name(test_dict2), "dict")

        # Get it again, this time passing the dataset type and dataId rather
        # than a DatasetRef. This should be consistent with the ref-based
        # get() behavior above and return the python type of the given
        # DatasetType (a dict).
        test_dict3 = butler.get(this_type, dataId=dataId, visit=425)
        self.assertEqual(get_full_type_name(test_dict3), "dict")

1025 

    def test_ingest_zip(self) -> None:
        """Create butler, export data, delete data, import from Zip.

        Also exercises conflict handling (re-ingest), ``skip_existing``,
        ``dry_run``, dimension-record transfer into a fresh repo, and
        deletion behavior of datasets backed by a shared Zip artifact.
        """
        butler, dataset_type = self.create_butler(
            run=self.default_run, storageClass="StructuredData", datasetTypeName="metrics"
        )

        metric = makeExampleMetrics()
        refs = []
        for visit in (423, 424, 425):
            ref = butler.put(metric, dataset_type, instrument="DummyCamComp", visit=visit)
            refs.append(ref)

        # Retrieve a Zip file.
        with tempfile.TemporaryDirectory(ignore_cleanup_errors=True) as tmpdir:
            zip = butler.retrieve_artifacts_zip(refs, destination=tmpdir)

            # Ingest will fail: the datasets are still registered.
            with self.assertRaises(ConflictingDefinitionError):
                butler.ingest_zip(zip)

            # Clear out the collection.
            butler.removeRuns([self.default_run])
            self.assertFalse(butler.exists(refs[0]))

            # Now the ingest succeeds and the ingest metrics are populated.
            butler.ingest_zip(zip, transfer="copy")
            self.assertGreater(butler._metrics.time_in_ingest, 0.0)
            self.assertEqual(butler._metrics.n_ingest, len(refs))

            # Check that it fails if we try it again.
            with self.assertRaises(ConflictingDefinitionError):
                butler.ingest_zip(zip, transfer="copy")

            # This will be a no-op.
            butler.ingest_zip(zip, transfer="copy", skip_existing=True)

            # Create an entirely new local file butler in this temp directory.
            new_butler_cfg = Butler.makeRepo(tmpdir)
            new_butler = Butler.from_config(new_butler_cfg, writeable=True)
            self.enterContext(new_butler)

            # This will fail since dimensions records are missing.
            with self.assertRaises(ConflictingDefinitionError):
                new_butler.ingest_zip(zip, transfer="copy")

            # Dry run should work.
            new_butler.ingest_zip(zip, transfer="copy", dry_run=True)

            # Transferring the dimension records along makes it succeed.
            new_butler.ingest_zip(zip, transfer="copy", transfer_dimensions=True)
            self.assertTrue(butler.exists(refs[0]))

            # Check that the refs can be read again.
            _ = [butler.get(ref) for ref in refs]

            uri = butler.getURI(refs[2])
            self.assertTrue(uri.exists())

            # Delete one dataset. The Zip file should still exist and allow
            # remaining refs to be read.
            butler.pruneDatasets([refs[0]], purge=True, unstore=True)
            self.assertTrue(uri.exists())

            metric2 = butler.get(refs[1])
            self.assertEqual(metric2, metric, msg=f"{metric2} != {metric}")

            # Removing the whole run finally deletes the Zip artifact.
            butler.removeRuns([self.default_run])
            self.assertFalse(uri.exists())
            self.assertFalse(butler.exists(refs[-1]))

        # An empty ref list is rejected.
        with self.assertRaises(ValueError):
            butler.retrieve_artifacts_zip([], destination=".")

1096 

    def testIngest(self) -> None:
        """Test file ingest into the datastore.

        Covers: single-file-per-ref ingest with ``copy`` (including implicit
        run-collection creation), conflict/skip_existing behavior,
        multi-ref-single-file ingest, ``move`` transfer (source file must
        disappear), disabled validation-info recording, and zero-dataset
        ingest as a no-op.
        """
        butler = self.create_empty_butler(run=self.default_run)

        # Create and register a DatasetType
        dimensions = butler.dimensions.conform(["instrument", "visit", "detector"])

        storageClass = self.storageClassFactory.getStorageClass("StructuredDataDictYaml")
        datasetTypeName = "metric"

        datasetType = self.addDatasetType(datasetTypeName, dimensions, storageClass, butler.registry)

        # Add needed Dimensions
        butler.registry.insertDimensionData("instrument", {"name": "DummyCamComp"})
        butler.registry.insertDimensionData(
            "physical_filter", {"instrument": "DummyCamComp", "name": "d-r", "band": "R"}
        )
        butler.registry.insertDimensionData("day_obs", {"instrument": "DummyCamComp", "id": 20250101})
        for detector in (1, 2):
            butler.registry.insertDimensionData(
                "detector", {"instrument": "DummyCamComp", "id": detector, "full_name": f"detector{detector}"}
            )

        butler.registry.insertDimensionData(
            "visit",
            {
                "instrument": "DummyCamComp",
                "id": 423,
                "name": "fourtwentythree",
                "physical_filter": "d-r",
                "day_obs": 20250101,
            },
            {
                "instrument": "DummyCamComp",
                "id": 424,
                "name": "fourtwentyfour",
                "physical_filter": "d-r",
                "day_obs": 20250101,
            },
        )

        formatter = doImportType("lsst.daf.butler.formatters.yaml.YamlFormatter")
        dataRoot = os.path.join(TESTDIR, "data", "basic")
        datasets = []
        # Test one DatasetRef with a run that exists, and the other with a run
        # that doesn't exist, to verify that run collections are created when
        # required.
        runs = {1: self.default_run, 2: "a/new/run"}
        for detector in (1, 2):
            detector_name = f"detector_{detector}"
            metricFile = os.path.join(dataRoot, f"{detector_name}.yaml")
            dataId = butler.registry.expandDataId(
                {"instrument": "DummyCamComp", "visit": 423, "detector": detector}
            )
            # Create a DatasetRef for ingest
            refIn = DatasetRef(datasetType, dataId, run=runs[detector])

            datasets.append(FileDataset(path=metricFile, refs=[refIn], formatter=formatter))

        butler.ingest(*datasets, transfer="copy")

        dataId1 = {"instrument": "DummyCamComp", "detector": 1, "visit": 423}
        dataId2 = {"instrument": "DummyCamComp", "detector": 2, "visit": 423}

        # The two detectors were ingested from different files, so the
        # retrieved contents must differ.
        metrics1 = butler.get(datasetTypeName, dataId1)
        metrics2 = butler.get(datasetTypeName, dataId2, collections="a/new/run")
        self.assertNotEqual(metrics1, metrics2)

        # Compare URIs
        uri1 = butler.getURI(datasetTypeName, dataId1)
        uri2 = butler.getURI(datasetTypeName, dataId2, collections="a/new/run")
        self.assertFalse(self.are_uris_equivalent(uri1, uri2), f"Cf. {uri1} with {uri2}")

        # Re-ingesting the same datasets raises an error with
        # skip_existing=False.
        with self.assertRaises(ConflictingDefinitionError):
            butler.ingest(*datasets, transfer="copy")
        # skip_existing=True makes it a no-op to re-ingest the same datasets.
        butler.ingest(*datasets, transfer="copy", skip_existing=True)

        # Now do a multi-dataset but single file ingest
        metricFile = os.path.join(dataRoot, "detectors.yaml")
        refs = []
        for detector in (1, 2):
            detector_name = f"detector_{detector}"
            dataId = butler.registry.expandDataId(
                {"instrument": "DummyCamComp", "visit": 424, "detector": detector}
            )
            # Create a DatasetRef for ingest
            refs.append(DatasetRef(datasetType, dataId, run=self.default_run))

        # Test "move" transfer to ensure that the files themselves
        # have disappeared following ingest.
        with ResourcePath.temporary_uri(suffix=".yaml") as tempFile:
            tempFile.transfer_from(ResourcePath(metricFile), transfer="copy")

            datasets = []
            datasets.append(FileDataset(path=tempFile, refs=refs, formatter=MultiDetectorFormatter))

            # For first ingest use copy.
            butler.ingest(*datasets, transfer="copy", record_validation_info=False)

            # Now try to ingest again in "execution butler" mode where
            # the registry entries exist but the datastore does not have
            # the files. We also need to strip the dimension records to ensure
            # that they will be re-added by the ingest.
            ref = datasets[0].refs[0]
            datasets[0].refs = [
                cast(
                    DatasetRef,
                    butler.find_dataset(ref.datasetType, data_id=ref.dataId, collections=ref.run),
                )
                for ref in datasets[0].refs
            ]
            all_refs = []
            for dataset in datasets:
                refs = []
                for ref in dataset.refs:
                    # Create a dict from the dataId to drop the records.
                    new_data_id = dict(ref.dataId.required)
                    new_ref = butler.find_dataset(ref.datasetType, new_data_id, collections=ref.run)
                    assert new_ref is not None
                    self.assertFalse(new_ref.dataId.hasRecords())
                    refs.append(new_ref)
                dataset.refs = refs
                all_refs.extend(dataset.refs)
            # Remove the files but keep the registry entries.
            butler.pruneDatasets(all_refs, disassociate=False, unstore=True, purge=False)

            # Use move mode to test that the file is deleted. Also
            # disable recording of file size.
            butler.ingest(*datasets, transfer="move", record_validation_info=False)

            # Check that every ref now has records.
            for dataset in datasets:
                for ref in dataset.refs:
                    self.assertTrue(ref.dataId.hasRecords())

            # Ensure that the file has disappeared.
            self.assertFalse(tempFile.exists())

            # Check that the datastore recorded no file size.
            # Not all datastores can support this.
            try:
                infos = butler._datastore.getStoredItemsInfo(datasets[0].refs[0])  # type: ignore[attr-defined]
                self.assertEqual(infos[0].file_size, -1)
            except AttributeError:
                pass

        dataId1 = {"instrument": "DummyCamComp", "detector": 1, "visit": 424}
        dataId2 = {"instrument": "DummyCamComp", "detector": 2, "visit": 424}

        multi1 = butler.get(datasetTypeName, dataId1)
        multi2 = butler.get(datasetTypeName, dataId2)

        self.assertEqual(multi1, metrics1)
        self.assertEqual(multi2, metrics2)

        # Compare URIs: both refs share the one ingested file.
        uri1 = butler.getURI(datasetTypeName, dataId1)
        uri2 = butler.getURI(datasetTypeName, dataId2)
        self.assertTrue(self.are_uris_equivalent(uri1, uri2), f"Cf. {uri1} with {uri2}")

        # Test that removing one does not break the second
        # This line will issue a warning log message for a ChainedDatastore
        # that uses an InMemoryDatastore since in-memory can not ingest
        # files.
        butler.pruneDatasets([datasets[0].refs[0]], unstore=True, disassociate=False)
        self.assertFalse(butler.exists(datasetTypeName, dataId1))
        self.assertTrue(butler.exists(datasetTypeName, dataId2))
        multi2b = butler.get(datasetTypeName, dataId2)
        self.assertEqual(multi2, multi2b)

        # Ensure we can ingest 0 datasets
        datasets = []
        butler.ingest(*datasets)

1271 

1272 def testPickle(self) -> None: 

1273 """Test pickle support.""" 

1274 butler = self.create_empty_butler(run=self.default_run) 

1275 assert isinstance(butler, DirectButler), "Expect DirectButler in configuration" 

1276 butlerOut = pickle.loads(pickle.dumps(butler)) 

1277 self.enterContext(butlerOut) 

1278 self.assertIsInstance(butlerOut, Butler) 

1279 self.assertEqual(butlerOut._config, butler._config) 

1280 self.assertEqual(list(butlerOut.collections.defaults), list(butler.collections.defaults)) 

1281 self.assertEqual(butlerOut.run, butler.run) 

1282 

    def testGetDatasetTypes(self) -> None:
        """Test dataset type registration, querying (including component
        expansion and wildcards), and configuration validation.
        """
        butler = self.create_empty_butler(run=self.default_run)
        dimensions = butler.dimensions.conform(["instrument", "visit", "physical_filter"])
        dimensionEntries: list[tuple[str, list[Mapping[str, Any]]]] = [
            (
                "instrument",
                [
                    {"instrument": "DummyCam"},
                    {"instrument": "DummyHSC"},
                    {"instrument": "DummyCamComp"},
                ],
            ),
            ("physical_filter", [{"instrument": "DummyCam", "name": "d-r", "band": "R"}]),
            ("day_obs", [{"instrument": "DummyCam", "id": 20250101}]),
            (
                "visit",
                [
                    {
                        "instrument": "DummyCam",
                        "id": 42,
                        "name": "fortytwo",
                        "physical_filter": "d-r",
                        "day_obs": 20250101,
                    }
                ],
            ),
        ]
        storageClass = self.storageClassFactory.getStorageClass("StructuredData")
        # Add needed Dimensions
        for element, data in dimensionEntries:
            butler.registry.insertDimensionData(element, *data)

        # When a DatasetType is added to the registry entries are not created
        # for components but querying them can return the components.
        datasetTypeNames = {"metric", "metric2", "metric4", "metric33", "pvi", "paramtest"}
        components = set()
        for datasetTypeName in datasetTypeNames:
            # Create and register a DatasetType
            self.addDatasetType(datasetTypeName, dimensions, storageClass, butler.registry)

            # Track the component names we expect to be derivable.
            for componentName in storageClass.components:
                components.add(DatasetType.nameWithComponent(datasetTypeName, componentName))

        # Expand each registered parent into its component dataset types and
        # check the full expected set is reached.
        fromRegistry: set[DatasetType] = set()
        for parent_dataset_type in butler.registry.queryDatasetTypes():
            fromRegistry.add(parent_dataset_type)
            fromRegistry.update(parent_dataset_type.makeAllComponentDatasetTypes())
        self.assertEqual({d.name for d in fromRegistry}, datasetTypeNames | components)

        # Query with wildcard.
        dataset_types = butler.registry.queryDatasetTypes("metric*")
        self.assertEqual(len(dataset_types), 4, f"Got: {dataset_types}")
        # but not regex.
        with self.assertRaises(DatasetTypeExpressionError):
            butler.registry.queryDatasetTypes(["pvi", re.compile("metric.*")])

        # Now that we have some dataset types registered, validate them
        butler.validateConfiguration(
            ignore=[
                "test_metric_comp",
                "metric3",
                "metric5",
                "calexp",
                "DummySC",
                "datasetType.component",
                "random_data",
                "random_data_2",
            ]
        )

        # Add a new datasetType that will fail template validation
        self.addDatasetType("test_metric_comp", dimensions, storageClass, butler.registry)
        if self.validationCanFail:
            with self.assertRaises(ValidationError):
                butler.validateConfiguration()

        # Rerun validation but with a subset of dataset type names
        butler.validateConfiguration(datasetTypeNames=["metric4"])

        # Rerun validation but ignore the bad datasetType
        butler.validateConfiguration(
            ignore=[
                "test_metric_comp",
                "metric3",
                "metric5",
                "calexp",
                "DummySC",
                "datasetType.component",
                "random_data",
                "random_data_2",
            ]
        )

1375 

    def testTransaction(self) -> None:
        """Test that an exception raised inside a butler transaction rolls
        back both registry inserts and datastore writes.
        """
        butler = self.create_empty_butler(run=self.default_run)
        datasetTypeName = "test_metric"
        dimensions = butler.dimensions.conform(["instrument", "visit"])
        dimensionEntries: tuple[tuple[str, Mapping[str, Any]], ...] = (
            ("instrument", {"instrument": "DummyCam"}),
            ("physical_filter", {"instrument": "DummyCam", "name": "d-r", "band": "R"}),
            ("day_obs", {"instrument": "DummyCam", "id": 20250101}),
            (
                "visit",
                {
                    "instrument": "DummyCam",
                    "id": 42,
                    "name": "fortytwo",
                    "physical_filter": "d-r",
                    "day_obs": 20250101,
                },
            ),
        )
        storageClass = self.storageClassFactory.getStorageClass("StructuredData")
        metric = makeExampleMetrics()
        dataId = {"instrument": "DummyCam", "visit": 42}
        # Create and register a DatasetType
        datasetType = self.addDatasetType(datasetTypeName, dimensions, storageClass, butler.registry)
        with self.assertRaises(TransactionTestError):
            with butler.transaction():
                # Add needed Dimensions
                for args in dimensionEntries:
                    butler.registry.insertDimensionData(*args)
                # Store a dataset
                ref = butler.put(metric, datasetTypeName, dataId)
                self.assertIsInstance(ref, DatasetRef)
                # Test get of a ref.
                metricOut = butler.get(ref)
                self.assertEqual(metric, metricOut)
                # Test get
                metricOut = butler.get(datasetTypeName, dataId)
                self.assertEqual(metric, metricOut)
                # Check we can get components
                self.assertGetComponents(butler, ref, ("summary", "data", "output"), metric)
                # Abort: everything above must be undone by the rollback.
                raise TransactionTestError("This should roll back the entire transaction")
        # After rollback the dimension records must be gone ...
        with self.assertRaises(DataIdValueError, msg=f"Check can't expand DataId {dataId}"):
            butler.registry.expandDataId(dataId)
        # Should raise LookupError for missing data ID value
        with self.assertRaises(LookupError, msg=f"Check can't get by {datasetTypeName} and {dataId}"):
            butler.get(datasetTypeName, dataId)
        # Also check explicitly if Dataset entry is missing
        self.assertIsNone(butler.find_dataset(datasetType, dataId, collections=butler.collections.defaults))
        # Direct retrieval should not find the file in the Datastore
        with self.assertRaises(FileNotFoundError, msg=f"Check {ref} can't be retrieved directly"):
            butler.get(ref)

1427 

    def testMakeRepo(self) -> None:
        """Test that we can write butler configuration to a new repository via
        the Butler.makeRepo interface and then instantiate a butler from the
        repo root.

        Also compares a default (non-standalone) repo against a standalone
        one, and checks failure modes for configs without a file name and for
        overwriting an existing repo.
        """
        # Do not run the test if we know this datastore configuration does
        # not support a file system root
        if self.fullConfigKey is None:
            return

        # create two separate directories
        root1 = tempfile.mkdtemp(dir=self.root)
        root2 = tempfile.mkdtemp(dir=self.root)

        self.assertFalse(Butler.has_repo_config(root1))
        butlerConfig = Butler.makeRepo(root1, config=Config(self.configFile))
        self.assertTrue(Butler.has_repo_config(root1))
        limited = Config(self.configFile)
        butler1 = Butler.from_config(butlerConfig)
        self.enterContext(butler1)
        assert isinstance(butler1, DirectButler), "Expect DirectButler in configuration"
        # Standalone repo: defaults are expanded into the written config.
        butlerConfig = Butler.makeRepo(root2, standalone=True, config=Config(self.configFile))
        full = Config(self.tmpConfigFile)
        butler2 = Butler.from_config(butlerConfig)
        self.enterContext(butler2)
        assert isinstance(butler2, DirectButler), "Expect DirectButler in configuration"
        # Butlers should have the same configuration regardless of whether
        # defaults were expanded.
        self.assertEqual(butler1._config, butler2._config)
        # Config files loaded directly should not be the same.
        self.assertNotEqual(limited, full)
        # Make sure "limited" doesn't have a few keys we know it should be
        # inheriting from defaults.
        self.assertIn(self.fullConfigKey, full)
        self.assertNotIn(self.fullConfigKey, limited)

        # Collections don't appear until something is put in them
        collections1 = set(butler1.registry.queryCollections())
        self.assertEqual(collections1, set())
        self.assertEqual(set(butler2.registry.queryCollections()), collections1)

        # Check that a config with no associated file name will not
        # work properly with relocatable Butler repo
        butlerConfig.configFile = None
        with self.assertRaises(ValueError):
            Butler.from_config(butlerConfig)

        # Refusing to overwrite an existing repository must raise.
        with self.assertRaises(FileExistsError):
            Butler.makeRepo(self.root, standalone=True, config=Config(self.configFile), overwrite=False)

1477 

1478 def testStringification(self) -> None: 

1479 butler = Butler.from_config(self.tmpConfigFile, run=self.default_run) 

1480 self.enterContext(butler) 

1481 butlerStr = str(butler) 

1482 

1483 if self.datastoreStr is not None: 

1484 for testStr in self.datastoreStr: 

1485 self.assertIn(testStr, butlerStr) 

1486 if self.registryStr is not None: 

1487 self.assertIn(self.registryStr, butlerStr) 

1488 

1489 datastoreName = butler._datastore.name 

1490 if self.datastoreName is not None: 

1491 for testStr in self.datastoreName: 

1492 self.assertIn(testStr, datastoreName) 

1493 

    def testButlerRewriteDataId(self) -> None:
        """Test that dataIds can be rewritten based on dimension records.

        Puts datasets keyed by (day_obs, seq_num) and checks the butler
        rewrites those to the implied exposure ID, then verifies queries by
        ``day_obs`` and ``exposure.day_obs`` agree.
        """
        butler = self.create_empty_butler(run=self.default_run)

        storageClass = self.storageClassFactory.getStorageClass("StructuredDataDict")
        datasetTypeName = "random_data"

        # Create dimension records.
        butler.registry.insertDimensionData("instrument", {"name": "DummyCamComp"})
        butler.registry.insertDimensionData(
            "physical_filter", {"instrument": "DummyCamComp", "name": "d-r", "band": "R"}
        )
        butler.registry.insertDimensionData(
            "detector", {"instrument": "DummyCamComp", "id": 1, "full_name": "det1"}
        )

        dimensions = butler.dimensions.conform(["instrument", "exposure"])
        datasetType = DatasetType(datasetTypeName, dimensions, storageClass)
        butler.registry.registerDatasetType(datasetType)

        n_exposures = 5
        dayobs = 20210530

        # Create records for multiple day_obs but same seq_num to test that
        # we are constraining gets properly when day_obs/seq_num is used
        # for an exposure. Second day is year in future but is not used.
        for day_obs in (dayobs, dayobs + 1_00_00):
            butler.registry.insertDimensionData("day_obs", {"instrument": "DummyCamComp", "id": day_obs})

            for i in range(n_exposures):
                group_name = f"group_{day_obs}_{i}"
                butler.registry.insertDimensionData(
                    "group", {"instrument": "DummyCamComp", "name": group_name}
                )
                butler.registry.insertDimensionData(
                    "exposure",
                    {
                        "instrument": "DummyCamComp",
                        "id": day_obs + i,
                        "obs_id": f"exp_{day_obs}_{i}",
                        "seq_num": i,
                        "day_obs": day_obs,
                        "physical_filter": "d-r",
                        "group": group_name,
                    },
                )

        # Write some data.
        for i in range(n_exposures):
            metric = {"something": i, "other": "metric", "list": [2 * x for x in range(i)]}

            # Use the seq_num for the put to test rewriting.
            dataId = {"seq_num": i, "day_obs": dayobs, "instrument": "DummyCamComp", "physical_filter": "d-r"}
            ref = butler.put(metric, datasetTypeName, dataId=dataId)

            # Check that the exposure is correct in the dataId
            self.assertEqual(ref.dataId["exposure"], dayobs + i)

            # and check that we can get the dataset back with the same dataId
            new_metric = butler.get(datasetTypeName, dataId=dataId)
            self.assertEqual(new_metric, metric)

        # Check that we can find the datasets using the day_obs or the
        # exposure.day_obs.
        datasets_1 = list(
            butler.registry.queryDatasets(
                datasetType,
                collections=self.default_run,
                where="day_obs = :dayObs AND instrument = :instr",
                bind={"dayObs": dayobs, "instr": "DummyCamComp"},
            )
        )
        datasets_2 = list(
            butler.registry.queryDatasets(
                datasetType,
                collections=self.default_run,
                where="exposure.day_obs = :dayObs AND instrument = :instr",
                bind={"dayObs": dayobs, "instr": "DummyCamComp"},
            )
        )
        self.assertEqual(datasets_1, datasets_2)

1575 

    def testGetDatasetCollectionCaching(self) -> None:
        """Test that get_dataset sees collections created after the cache
        was populated.
        """
        # Prior to DM-41117, there was a bug where get_dataset would throw
        # MissingCollectionError if you tried to fetch a dataset that was added
        # after the collection cache was last updated.
        reader_butler, datasetType = self.create_butler(self.default_run, "int", "datasettypename")
        writer_butler = self.create_empty_butler(writeable=True, run="new_run")
        dataId = {"instrument": "DummyCamComp", "visit": 423}
        put_ref = writer_butler.put(123, datasetType, dataId)
        # reader_butler's collection cache was filled before "new_run"
        # existed; the lookup must still succeed.
        get_ref = reader_butler.get_dataset(put_ref.id)
        self.assertEqual(get_ref.id, put_ref.id)
        # Also works when looking up via a hexadecimal string instead of a UUID
        # instance.
        hex_ref = reader_butler.get_dataset(put_ref.id.hex)
        self.assertEqual(hex_ref.id, put_ref.id)

1590 

1591 def testCollectionChainRedefine(self): 

1592 butler = self._setup_to_test_collection_chain() 

1593 

1594 butler.collections.redefine_chain("chain", "a") 

1595 self._check_chain(butler, ["a"]) 

1596 

1597 # Duplicates are removed from the list of children 

1598 butler.collections.redefine_chain("chain", ["c", "b", "c"]) 

1599 self._check_chain(butler, ["c", "b"]) 

1600 

1601 # Empty list clears the chain 

1602 butler.collections.redefine_chain("chain", []) 

1603 self._check_chain(butler, []) 

1604 

1605 self._test_common_chain_functionality(butler, butler.collections.redefine_chain) 

1606 

1607 def testCollectionChainPrepend(self): 

1608 butler = self._setup_to_test_collection_chain() 

1609 

1610 # Duplicates are removed from the list of children 

1611 butler.collections.prepend_chain("chain", ["c", "b", "c"]) 

1612 self._check_chain(butler, ["c", "b"]) 

1613 

1614 # Prepend goes on the front of existing chain 

1615 butler.collections.prepend_chain("chain", ["a"]) 

1616 self._check_chain(butler, ["a", "c", "b"]) 

1617 

1618 # Empty prepend does nothing 

1619 butler.collections.prepend_chain("chain", []) 

1620 self._check_chain(butler, ["a", "c", "b"]) 

1621 

1622 # Prepending children that already exist in the chain removes them from 

1623 # their current position. 

1624 butler.collections.prepend_chain("chain", ["d", "b", "c"]) 

1625 self._check_chain(butler, ["d", "b", "c", "a"]) 

1626 

1627 self._test_common_chain_functionality(butler, butler.collections.prepend_chain) 

1628 

1629 def testCollectionChainExtend(self): 

1630 butler = self._setup_to_test_collection_chain() 

1631 

1632 # Duplicates are removed from the list of children 

1633 butler.collections.extend_chain("chain", ["c", "b", "c"]) 

1634 self._check_chain(butler, ["c", "b"]) 

1635 

1636 # Extend goes on the end of existing chain 

1637 butler.collections.extend_chain("chain", ["a"]) 

1638 self._check_chain(butler, ["c", "b", "a"]) 

1639 

1640 # Empty extend does nothing 

1641 butler.collections.extend_chain("chain", []) 

1642 self._check_chain(butler, ["c", "b", "a"]) 

1643 

1644 # Extending children that already exist in the chain removes them from 

1645 # their current position. 

1646 butler.collections.extend_chain("chain", ["d", "b", "c"]) 

1647 self._check_chain(butler, ["a", "d", "b", "c"]) 

1648 

1649 self._test_common_chain_functionality(butler, butler.collections.extend_chain) 

1650 

1651 def testCollectionChainRemove(self) -> None: 

1652 butler = self._setup_to_test_collection_chain() 

1653 

1654 butler.collections.redefine_chain("chain", ["a", "b", "c", "d"]) 

1655 

1656 butler.collections.remove_from_chain("chain", "c") 

1657 self._check_chain(butler, ["a", "b", "d"]) 

1658 

1659 # Duplicates are allowed in the list of children 

1660 butler.collections.remove_from_chain("chain", ["b", "b", "a"]) 

1661 self._check_chain(butler, ["d"]) 

1662 

1663 # Empty remove does nothing 

1664 butler.collections.remove_from_chain("chain", []) 

1665 self._check_chain(butler, ["d"]) 

1666 

1667 # Removing children that aren't in the chain does nothing 

1668 butler.collections.remove_from_chain("chain", ["a", "chain"]) 

1669 self._check_chain(butler, ["d"]) 

1670 

1671 self._test_common_chain_functionality( 

1672 butler, butler.collections.remove_from_chain, skip_cycle_check=True 

1673 ) 

1674 

1675 def _setup_to_test_collection_chain(self) -> Butler: 

1676 butler = self.create_empty_butler(writeable=True) 

1677 

1678 butler.collections.register("chain", CollectionType.CHAINED) 

1679 

1680 runs = ["a", "b", "c", "d"] 

1681 for run in runs: 

1682 butler.collections.register(run) 

1683 

1684 butler.collections.register("staticchain", CollectionType.CHAINED) 

1685 butler.collections.redefine_chain("staticchain", ["a", "b"]) 

1686 

1687 return butler 

1688 

1689 def _check_chain(self, butler: Butler, expected: list[str]) -> None: 

1690 children = butler.collections.get_info("chain").children 

1691 self.assertEqual(expected, list(children)) 

1692 

    def _test_common_chain_functionality(
        self, butler: Butler, func: Callable[[str, str | list[str]], Any], *, skip_cycle_check: bool = False
    ) -> None:
        """Run checks shared by all chain-modification operations.

        Parameters
        ----------
        butler : `Butler`
            Butler to run the checks against.
        func : `Callable`
            Bound chain-modification method taking ``(parent, children)``.
        skip_cycle_check : `bool`, optional
            If `True`, skip the collection-cycle check; used for operations
            (e.g. removal) that cannot introduce a cycle.
        """
        # Missing parent collection
        with self.assertRaises(MissingCollectionError):
            func("doesnotexist", [])
        # Missing child collection
        with self.assertRaises(MissingCollectionError):
            func("chain", ["doesnotexist"])
        # Forbid operations on non-chained collections
        with self.assertRaises(CollectionTypeError):
            func("d", ["a"])

        # Prevent collection cycles
        if not skip_cycle_check:
            butler.collections.register("chain2", CollectionType.CHAINED)
            func("chain2", "chain")
            with self.assertRaises(CollectionCycleError):
                func("chain", "chain2")

        # Make sure none of the earlier operations interfered with unrelated
        # chains.
        self.assertEqual(["a", "b"], list(butler.collections.get_info("staticchain").children))

        # Chain modification is forbidden while caching is active.
        with butler._caching_context():
            with self.assertRaisesRegex(RuntimeError, "Chained collection modification not permitted"):
                func("chain", "a")

1720 

1721 def test_transfer_dimension_records_from(self) -> None: 

1722 source_butler = self.create_empty_butler(writeable=True) 

1723 source_butler.import_(filename=_get_test_data_path("lsstcam-subset.yaml")) 

1724 

1725 visit_id = 2025120200439 

1726 exposure_id = visit_id 

1727 target_butler = self.enterContext(create_populated_sqlite_registry()) 

1728 target_butler.transfer_dimension_records_from( 

1729 source_butler, 

1730 [ 

1731 # Should trigger the lookup of visit and all its associated 

1732 # "populated_by" records (visit_detector_region, 

1733 # visit_definition, etc.) 

1734 DataCoordinate.standardize( 

1735 {"instrument": "LSSTCam", "visit": visit_id, "detector": 10}, 

1736 universe=source_butler.dimensions, 

1737 ), 

1738 # Shouldn't add any records to the lookup. 

1739 DataCoordinate.make_empty(source_butler.dimensions), 

1740 ], 

1741 ) 

1742 

1743 def _fetch_record(dimension: str) -> DimensionRecord: 

1744 records = target_butler.query_dimension_records(dimension) 

1745 self.assertEqual(len(records), 1) 

1746 return records[0] 

1747 

1748 visit = _fetch_record("visit") 

1749 self.assertEqual(visit.id, visit_id) 

1750 self.assertEqual(visit.day_obs, 20251202) 

1751 self.assertEqual(visit.target_name, "lowdust") 

1752 self.assertEqual(visit.seq_num, 439) 

1753 original_visit = source_butler.query_dimension_records("visit", instrument="LSSTCam", visit=visit_id)[ 

1754 0 

1755 ] 

1756 self.assertEqual(visit.region, original_visit.region) 

1757 self.assertEqual(visit.timespan, original_visit.timespan) 

1758 

1759 visit_detector_region = _fetch_record("visit_detector_region") 

1760 self.assertEqual(visit_detector_region.instrument, "LSSTCam") 

1761 self.assertEqual(visit_detector_region.detector, 10) 

1762 self.assertEqual(visit_detector_region.visit, visit_id) 

1763 original_visit_detector_region = source_butler.query_dimension_records( 

1764 "visit_detector_region", instrument="LSSTCam", visit=visit_id, detector=10 

1765 )[0] 

1766 self.assertEqual(visit_detector_region.region, original_visit_detector_region.region) 

1767 

1768 visit_definition = _fetch_record("visit_definition") 

1769 self.assertEqual(visit_definition.instrument, "LSSTCam") 

1770 self.assertEqual(visit_definition.exposure, 2025120200439) 

1771 self.assertEqual(visit_definition.visit, visit_id) 

1772 

1773 # The matching exposure record should have been pulled in via 

1774 # visit -> visit_definition. 

1775 exposure = _fetch_record("exposure") 

1776 self.assertEqual(exposure.instrument, "LSSTCam") 

1777 self.assertEqual(exposure.id, 2025120200439) 

1778 self.assertEqual(exposure.obs_id, "MC_O_20251202_000439") 

1779 original_exposure = source_butler.query_dimension_records( 

1780 "exposure", instrument="LSSTCam", exposure=exposure_id 

1781 )[0] 

1782 self.assertEqual(exposure.timespan, original_exposure.timespan) 

1783 

1784 group = _fetch_record("group") 

1785 self.assertEqual(group.instrument, "LSSTCam") 

1786 self.assertEqual(group.name, "2025-12-03T07:58:10.858") 

1787 

1788 visit_system_memberships = target_butler.query_dimension_records("visit_system_membership") 

1789 visit_system_memberships.sort(key=lambda record: record.visit_system) 

1790 self.assertEqual(len(visit_system_memberships), 2) 

1791 self.assertEqual(visit_system_memberships[0].visit_system, 0) 

1792 self.assertEqual(visit_system_memberships[1].visit_system, 2) 

1793 self.assertEqual(visit_system_memberships[0].visit, visit_id) 

1794 self.assertEqual(visit_system_memberships[1].visit, visit_id) 

1795 

1796 visit_systems = target_butler.query_dimension_records("visit_system") 

1797 visit_systems.sort(key=lambda record: record.id) 

1798 visit_system_memberships.sort(key=lambda record: record.visit_system) 

1799 self.assertEqual(visit_systems[0].id, 0) 

1800 self.assertEqual(visit_systems[1].id, 2) 

1801 self.assertEqual(visit_systems[0].name, "one-to-one") 

1802 self.assertEqual(visit_systems[1].name, "by-seq-start-end") 

1803 

1804 

1805class FileDatastoreButlerTests(ButlerTests): 

1806 """Common tests and specialization of ButlerTests for butlers backed 

1807 by datastores that inherit from FileDatastore. 

1808 """ 

1809 

1810 trustModeSupported = True 

1811 

1812 def checkFileExists(self, root: str | ResourcePath, relpath: str | ResourcePath) -> bool: 

1813 """Check if file exists at a given path (relative to root). 

1814 

1815 Test testPutTemplates verifies actual physical existance of the files 

1816 in the requested location. 

1817 """ 

1818 uri = ResourcePath(root, forceDirectory=True) 

1819 return uri.join(relpath).exists() 

1820 

    def testPutTemplates(self) -> None:
        """Test that file templates produce the expected paths on put and
        that template validation catches bad or non-unique templates.
        """
        storageClass = self.storageClassFactory.getStorageClass("StructuredDataNoComponents")
        butler = self.create_empty_butler(run=self.default_run)

        # Add needed Dimensions
        butler.registry.insertDimensionData("instrument", {"name": "DummyCamComp"})
        butler.registry.insertDimensionData(
            "physical_filter", {"instrument": "DummyCamComp", "name": "d-r", "band": "R"}
        )
        butler.registry.insertDimensionData("day_obs", {"instrument": "DummyCamComp", "id": 20250101})
        butler.registry.insertDimensionData(
            "visit",
            {
                "instrument": "DummyCamComp",
                "id": 423,
                "name": "v423",
                "physical_filter": "d-r",
                "day_obs": 20250101,
            },
        )
        butler.registry.insertDimensionData(
            "visit",
            {
                "instrument": "DummyCamComp",
                "id": 425,
                "name": "v425",
                "physical_filter": "d-r",
                "day_obs": 20250101,
            },
        )

        # Create and store a dataset
        metric = makeExampleMetrics()

        # Create two almost-identical DatasetTypes (both will use default
        # template)
        dimensions = butler.dimensions.conform(["instrument", "visit"])
        butler.registry.registerDatasetType(DatasetType("metric1", dimensions, storageClass))
        butler.registry.registerDatasetType(DatasetType("metric2", dimensions, storageClass))
        butler.registry.registerDatasetType(DatasetType("metric3", dimensions, storageClass))

        dataId1 = {"instrument": "DummyCamComp", "visit": 423}
        dataId2 = {"instrument": "DummyCamComp", "visit": 423, "physical_filter": "d-r"}

        # Put with exactly the data ID keys needed
        ref = butler.put(metric, "metric1", dataId1)
        uri = butler.getURI(ref)
        self.assertTrue(uri.exists())
        self.assertTrue(
            uri.unquoted_path.endswith(f"{self.default_run}/metric1/??#?/d-r/DummyCamComp_423.pickle")
        )

        # Check the template based on dimensions
        if hasattr(butler._datastore, "templates"):
            butler._datastore.templates.validateTemplates([ref])

        # Put with extra data ID keys (physical_filter is an optional
        # dependency); should not change template (at least the way we're
        # defining them to behave now; the important thing is that they
        # must be consistent).
        ref = butler.put(metric, "metric2", dataId2)
        uri = butler.getURI(ref)
        self.assertTrue(uri.exists())
        self.assertTrue(
            uri.unquoted_path.endswith(f"{self.default_run}/metric2/d-r/DummyCamComp_v423.pickle")
        )

        # Check the template based on dimensions
        if hasattr(butler._datastore, "templates"):
            butler._datastore.templates.validateTemplates([ref])

        # Use a template that has a typo in dimension record metadata.
        # Easier to test with a butler that has a ref with records attached.
        # The "?" suffix marks the field optional, so formatting should only
        # log and drop the missing field.
        template = FileTemplate("a/{visit.name}/{id}_{visit.namex:?}.fits")
        with self.assertLogs("lsst.daf.butler.datastore.file_templates", "INFO"):
            path = template.format(ref)
        self.assertEqual(path, f"a/v423/{ref.id}_fits")

        # Without the "?" suffix the missing field is an error.
        template = FileTemplate("a/{visit.name}/{id}_{visit.namex}.fits")
        with self.assertRaises(KeyError):
            with self.assertLogs("lsst.daf.butler.datastore.file_templates", "INFO"):
                template.format(ref)

        # Now use a file template that will not result in unique filenames
        with self.assertRaises(FileTemplateValidationError):
            butler.put(metric, "metric3", dataId1)

1907 

1908 def testImportExport(self) -> None: 

1909 # Run put/get tests just to create and populate a repo. 

1910 storageClass = self.storageClassFactory.getStorageClass("StructuredDataNoComponents") 

1911 self.runImportExportTest(storageClass) 

1912 

1913 @unittest.expectedFailure 

1914 def testImportExportVirtualComposite(self) -> None: 

1915 # Run put/get tests just to create and populate a repo. 

1916 storageClass = self.storageClassFactory.getStorageClass("StructuredComposite") 

1917 self.runImportExportTest(storageClass) 

1918 

    def runImportExportTest(self, storageClass: StorageClass) -> None:
        """Test exporting and importing.

        This test does an export to a temp directory and an import back
        into a new temp directory repo. It does not assume a posix datastore.

        Parameters
        ----------
        storageClass : `StorageClass`
            Storage class used by the put/get test that populates the
            source repository.
        """
        exportButler = self.runPutGetTest(storageClass, "test_metric")

        # Test that we must have a file extension.
        with self.assertRaises(ValueError):
            with exportButler.export(filename="dump", directory=".") as export:
                pass

        # Test that unknown format is not allowed.
        with self.assertRaises(ValueError):
            with exportButler.export(filename="dump.fits", directory=".") as export:
                pass

        # Test that the repo actually has at least one dataset.
        datasets = list(exportButler.registry.queryDatasets(..., collections=...))
        self.assertGreater(len(datasets), 0)
        # Add a DimensionRecord that's unused by those datasets.
        skymapRecord = {"name": "example_skymap", "hash": (50).to_bytes(8, byteorder="little")}
        exportButler.registry.insertDimensionData("skymap", skymapRecord)
        # Export and then import datasets.
        with safeTestTempDir(TESTDIR) as exportDir:
            exportFile = os.path.join(exportDir, "exports.yaml")
            with exportButler.export(filename=exportFile, directory=exportDir, transfer="auto") as export:
                export.saveDatasets(datasets)
                # Export the same datasets again. This should quietly do
                # nothing because of internal deduplication, and it shouldn't
                # complain about being asked to export the "htm7" elements even
                # though there aren't any in these datasets or in the database.
                export.saveDatasets(datasets, elements=["htm7"])
                # Save one of the data IDs again; this should be harmless
                # because of internal deduplication.
                export.saveDataIds([datasets[0].dataId])
                # Save some dimension records directly.
                export.saveDimensionData("skymap", [skymapRecord])
            self.assertTrue(os.path.exists(exportFile))
            with safeTestTempDir(TESTDIR) as importDir:
                # We always want this to be a local posix butler
                Butler.makeRepo(importDir, config=Config(os.path.join(TESTDIR, "config/basic/butler.yaml")))
                # Calling script.butlerImport tests the implementation of the
                # butler command line interface "import" subcommand. Functions
                # in the script folder are generally considered protected and
                # should not be used as public api.
                with open(exportFile) as f:
                    script.butlerImport(
                        importDir,
                        export_file=f,
                        directory=exportDir,
                        transfer="auto",
                        skip_dimensions=None,
                    )
                importButler = Butler.from_config(importDir, run=self.default_run)
                self.enterContext(importButler)
                for ref in datasets:
                    with self.subTest(ref=repr(ref)):
                        # Test for existence by passing in the DatasetType and
                        # data ID separately, to avoid lookup by dataset_id.
                        self.assertTrue(importButler.exists(ref.datasetType, ref.dataId))
                self.assertEqual(
                    list(importButler.registry.queryDimensionRecords("skymap")),
                    [importButler.dimensions["skymap"].RecordClass(**skymapRecord)],
                )

1985 

1986 def testRemoveRuns(self) -> None: 

1987 storageClass = self.storageClassFactory.getStorageClass("StructuredDataNoComponents") 

1988 butler = self.create_empty_butler(writeable=True) 

1989 # Load registry data with dimensions to hang datasets off of. 

1990 butler.import_(filename=ResourcePath("resource://lsst.daf.butler/tests/registry_data/base.yaml")) 

1991 # Add some RUN-type collection. 

1992 run1 = "run1" 

1993 butler.collections.register(run1) 

1994 run2 = "run2" 

1995 butler.collections.register(run2) 

1996 # put a dataset in each 

1997 metric = makeExampleMetrics() 

1998 dimensions = butler.dimensions.conform(["instrument", "physical_filter"]) 

1999 datasetType = self.addDatasetType( 

2000 "prune_collections_test_dataset", dimensions, storageClass, butler.registry 

2001 ) 

2002 ref1 = butler.put(metric, datasetType, {"instrument": "Cam1", "physical_filter": "Cam1-G"}, run=run1) 

2003 ref2 = butler.put(metric, datasetType, {"instrument": "Cam1", "physical_filter": "Cam1-G"}, run=run2) 

2004 uri1 = butler.getURI(ref1) 

2005 uri2 = butler.getURI(ref2) 

2006 

2007 # Put one of the runs in a chain. 

2008 butler.collections.register("Chain", CollectionType.CHAINED) 

2009 butler.collections.extend_chain("Chain", run1) 

2010 

2011 with self.assertRaises(OrphanedRecordError): 

2012 butler.registry.removeDatasetType(datasetType.name) 

2013 

2014 # Remove a non-run. 

2015 with self.assertRaises(TypeError): 

2016 butler.removeRuns(["Chain"]) 

2017 

2018 # Remove without unlinking from chain should fail. 

2019 with self.assertRaises(IntegrityError): 

2020 butler.removeRuns([run1]) 

2021 

2022 # Remove from both runs. No longer use unstore parameter since it 

2023 # always purges. 

2024 butler.removeRuns([run1, run2], unlink_from_chains=True) 

2025 

2026 # Should be nothing in registry for either one, and datastore should 

2027 # not think either exists. 

2028 with self.assertRaises(MissingCollectionError): 

2029 butler.collections.get_info(run1) 

2030 with self.assertRaises(MissingCollectionError): 

2031 butler.collections.get_info(run1) 

2032 self.assertFalse(butler.stored(ref1)) 

2033 self.assertFalse(butler.stored(ref2)) 

2034 # We always unstore so both URIs should be gone. 

2035 self.assertFalse(uri1.exists()) 

2036 self.assertFalse(uri2.exists()) 

2037 

2038 # Now that the collections have been pruned we can remove the 

2039 # dataset type 

2040 butler.registry.removeDatasetType(datasetType.name) 

2041 

2042 with self.assertLogs("lsst.daf.butler.registry", "INFO") as cm: 

2043 butler.registry.removeDatasetType(("test*", "test*")) 

2044 self.assertIn("not defined", "\n".join(cm.output)) 

2045 

2046 def remove_dataset_out_of_band(self, butler: Butler, ref: DatasetRef) -> None: 

2047 """Simulate an external actor removing a file outside of Butler's 

2048 knowledge. 

2049 

2050 Subclasses may override to handle more complicated datastore 

2051 configurations. 

2052 """ 

2053 uri = butler.getURI(ref) 

2054 uri.remove() 

2055 datastore = cast(FileDatastore, butler._datastore) 

2056 datastore.cacheManager.remove_from_cache(ref) 

2057 

    def testPruneDatasets(self) -> None:
        """Test pruning of datasets, existence reporting, trust mode, and
        direct manipulation of the datastore trash table.
        """
        storageClass = self.storageClassFactory.getStorageClass("StructuredDataNoComponents")
        butler = self.create_empty_butler(writeable=True)
        # Load registry data with dimensions to hang datasets off of.
        butler.import_(filename=_get_test_data_path("base.yaml"))
        # Add some RUN-type collections.
        run1 = "run1"
        butler.collections.register(run1)
        run2 = "run2"
        butler.collections.register(run2)
        # put some datasets. ref1 and ref2 have the same data ID, and are in
        # different runs. ref3 has a different data ID.
        metric = makeExampleMetrics()
        dimensions = butler.dimensions.conform(["instrument", "physical_filter"])
        datasetType = self.addDatasetType(
            "prune_collections_test_dataset", dimensions, storageClass, butler.registry
        )
        ref1 = butler.put(metric, datasetType, {"instrument": "Cam1", "physical_filter": "Cam1-G"}, run=run1)
        ref2 = butler.put(metric, datasetType, {"instrument": "Cam1", "physical_filter": "Cam1-G"}, run=run2)
        ref3 = butler.put(metric, datasetType, {"instrument": "Cam1", "physical_filter": "Cam1-R1"}, run=run1)

        many_stored = butler.stored_many([ref1, ref2, ref3])
        for ref, stored in many_stored.items():
            self.assertTrue(stored, f"Ref {ref} should be stored")

        many_exists = butler._exists_many([ref1, ref2, ref3])
        for ref, exists in many_exists.items():
            self.assertTrue(exists, f"Checking ref {ref} exists.")
            self.assertEqual(exists, DatasetExistence.VERIFIED, f"Ref {ref} should be stored")

        # Simple prune.
        butler.pruneDatasets([ref1, ref2, ref3], purge=True, unstore=True)
        self.assertFalse(butler.exists(ref1.datasetType, ref1.dataId, collections=run1))

        many_stored = butler.stored_many([ref1, ref2, ref3])
        for ref, stored in many_stored.items():
            self.assertFalse(stored, f"Ref {ref} should not be stored")

        many_exists = butler._exists_many([ref1, ref2, ref3])
        for ref, exists in many_exists.items():
            self.assertEqual(exists, DatasetExistence.UNRECOGNIZED, f"Ref {ref} should not be stored")

        # Put data back.
        ref1_new = butler.put(metric, ref1)
        self.assertEqual(ref1_new, ref1)  # Reuses original ID.
        ref2 = butler.put(metric, ref2)

        many_stored = butler.stored_many([ref1, ref2, ref3])
        self.assertTrue(many_stored[ref1])
        self.assertTrue(many_stored[ref2])
        self.assertFalse(many_stored[ref3])

        ref3 = butler.put(metric, ref3)

        # NOTE(review): the message below looks inverted — the assertion
        # checks that the ref DOES exist.
        many_exists = butler._exists_many([ref1, ref2, ref3])
        for ref, exists in many_exists.items():
            self.assertTrue(exists, f"Ref {ref} should not be stored")

        # Clear out the datasets from registry and start again.
        refs = [ref1, ref2, ref3]
        butler.pruneDatasets(refs, purge=True, unstore=True)
        for ref in refs:
            butler.put(metric, ref)

        # Confirm we can retrieve deferred.
        dref1 = butler.getDeferred(ref1)  # known and exists
        metric1 = dref1.get()
        self.assertEqual(metric1, metric)

        # Test different forms of file availability.
        # Need to be in a state where:
        # - one ref just has registry record.
        # - one ref has a missing file but a datastore record.
        # - one ref has a missing datastore record but file is there.
        # - one ref does not exist anywhere.
        # Do not need to test a ref that has everything since that is tested
        # above.
        ref0 = DatasetRef(
            datasetType,
            DataCoordinate.standardize(
                {"instrument": "Cam1", "physical_filter": "Cam1-G"}, universe=butler.dimensions
            ),
            run=run1,
        )

        # Delete from datastore and retain in Registry.
        butler.pruneDatasets([ref1], purge=False, unstore=True, disassociate=False)

        # File has been removed.
        self.remove_dataset_out_of_band(butler, ref2)

        # Datastore has lost track.
        butler._datastore.forget([ref3])

        # First test with a standard butler.
        exists_many = butler._exists_many([ref0, ref1, ref2, ref3], full_check=True)
        self.assertEqual(exists_many[ref0], DatasetExistence.UNRECOGNIZED)
        self.assertEqual(exists_many[ref1], DatasetExistence.RECORDED)
        self.assertEqual(exists_many[ref2], DatasetExistence.RECORDED | DatasetExistence.DATASTORE)
        self.assertEqual(exists_many[ref3], DatasetExistence.RECORDED)

        exists_many = butler._exists_many([ref0, ref1, ref2, ref3], full_check=False)
        self.assertEqual(exists_many[ref0], DatasetExistence.UNRECOGNIZED)
        self.assertEqual(exists_many[ref1], DatasetExistence.RECORDED | DatasetExistence._ASSUMED)
        self.assertEqual(exists_many[ref2], DatasetExistence.KNOWN)
        self.assertEqual(exists_many[ref3], DatasetExistence.RECORDED | DatasetExistence._ASSUMED)
        self.assertTrue(exists_many[ref2])

        # Check that per-ref query gives the same answer as many query.
        for ref, exists in exists_many.items():
            self.assertEqual(butler.exists(ref, full_check=False), exists)

        # Get deferred checks for existence before it allows it to be
        # retrieved.
        with self.assertRaises(LookupError):
            butler.getDeferred(ref3)  # not known, file exists
        dref2 = butler.getDeferred(ref2)  # known but file missing
        with self.assertRaises(FileNotFoundError):
            dref2.get()

        # Test again with a trusting butler.
        if self.trustModeSupported:
            butler._datastore.trustGetRequest = True
            exists_many = butler._exists_many([ref0, ref1, ref2, ref3], full_check=True)
            self.assertEqual(exists_many[ref0], DatasetExistence.UNRECOGNIZED)
            self.assertEqual(exists_many[ref1], DatasetExistence.RECORDED)
            self.assertEqual(exists_many[ref2], DatasetExistence.RECORDED | DatasetExistence.DATASTORE)
            self.assertEqual(exists_many[ref3], DatasetExistence.RECORDED | DatasetExistence._ARTIFACT)

            # When trusting we can get a deferred dataset handle that is not
            # known but does exist.
            dref3 = butler.getDeferred(ref3)
            metric3 = dref3.get()
            self.assertEqual(metric3, metric)

            # Check that per-ref query gives the same answer as many query.
            for ref, exists in exists_many.items():
                self.assertEqual(butler.exists(ref, full_check=True), exists)

            # Create a ref that surprisingly has the UUID of an existing ref
            # but is not the same.
            ref_bad = DatasetRef(datasetType, dataId=ref3.dataId, run=ref3.run, id=ref2.id)
            with self.assertRaises(ValueError):
                butler.exists(ref_bad)

            # Create a ref that has a compatible storage class.
            ref_compat = ref2.overrideStorageClass("StructuredDataDict")
            exists = butler.exists(ref_compat)
            self.assertEqual(exists, exists_many[ref2])

            # Remove everything and start from scratch.
            butler._datastore.trustGetRequest = False
            butler.pruneDatasets(refs, purge=True, unstore=True)
            for ref in refs:
                butler.put(metric, ref)

        # These tests mess directly with the trash table and can leave the
        # datastore in an odd state. Do them at the end.
        # Check that in normal mode, deleting the record will lead to
        # trash not touching the file.
        uri1 = butler.getURI(ref1)
        butler._datastore.bridge.moveToTrash(
            [ref1], transaction=None
        )  # Update the dataset_location table
        butler._datastore.forget([ref1])
        butler._datastore.trash(ref1)
        butler._datastore.emptyTrash()
        self.assertTrue(uri1.exists())
        uri1.remove()  # Clean it up.

        # Simulate execution butler setup by deleting the datastore
        # record but keeping the file around and trusting.
        butler._datastore.trustGetRequest = True
        uris = butler.get_many_uris([ref2, ref3])
        uri2 = uris[ref2].primaryURI
        uri3 = uris[ref3].primaryURI
        self.assertTrue(uri2.exists())
        self.assertTrue(uri3.exists())

        # Remove the datastore record.
        butler._datastore.bridge.moveToTrash(
            [ref2], transaction=None
        )  # Update the dataset_location table
        butler._datastore.forget([ref2])
        self.assertTrue(uri2.exists())
        butler._datastore.trash([ref2, ref3])
        # Immediate removal for ref2 file
        self.assertFalse(uri2.exists())
        # But ref3 has to wait for the empty.
        self.assertTrue(uri3.exists())
        butler._datastore.emptyTrash()
        self.assertFalse(uri3.exists())

        # Clear out the datasets from registry.
        butler.pruneDatasets([ref1, ref2, ref3], purge=True, unstore=True)

2253 

    def test_butler_metrics(self) -> None:
        """Test that metrics are collected for put/get/ingest operations."""
        run = "test_run"
        metrics = ButlerMetrics()
        butler, datasetType = self.create_butler(
            run, "MetricsExampleModelProvenance", "prov_metric", metrics=metrics
        )
        data = MetricsExampleModel(
            summary={"AM1": 5.2, "AM2": 30.6},
            output={"a": [1, 2, 3], "b": {"blue": 5, "red": "green"}},
            data=[563, 234, 456.7, 752, 8, 9, 27],
        )

        # One put and two gets must be reflected in the counters.
        data_ref = butler.put(data, datasetType, visit=424, instrument="DummyCamComp")
        butler.get(data_ref)
        butler.get(data_ref)
        self.assertEqual(metrics.n_get, 2)
        self.assertGreater(metrics.time_in_get, 0.0)
        self.assertEqual(metrics.n_put, 1)
        self.assertGreater(metrics.time_in_put, 0.0)

        # A deferred get counts as a get once materialized.
        deferred = butler.getDeferred(data_ref)
        deferred.get()
        self.assertEqual(metrics.n_get, 3)

        # record_metrics() scopes a fresh counter to the with-block.
        with butler.record_metrics() as new:
            data_ref_2 = butler.put(data, datasetType, visit=425, instrument="DummyCamComp")
            butler.get(data_ref)

            butler.pruneDatasets([data_ref, data_ref_2], purge=True, unstore=True)
            with ResourcePath.temporary_uri(suffix=".json") as tmpFile:
                tmpFile.write(data.model_dump_json().encode())
                refs = [
                    DatasetRef(datasetType, data_ref_2.dataId, run),
                    DatasetRef(datasetType, data_ref.dataId, run),
                ]
                datasets = [FileDataset(path=tmpFile, refs=refs)]
                butler.ingest(*datasets, transfer="copy")

        self.assertEqual(new.n_get, 1)
        self.assertEqual(new.n_put, 1)
        self.assertEqual(new.n_ingest, 2)

2296 

2297 

class PosixDatastoreButlerTestCase(FileDatastoreButlerTests, unittest.TestCase):
    """PosixDatastore specialization of a butler"""

    # Configuration and expectations consumed by the shared base-class tests.
    configFile = os.path.join(TESTDIR, "config/basic/butler.yaml")
    fullConfigKey: str | None = ".datastore.formatters"
    validationCanFail = True
    datastoreStr = ["/tmp"]
    datastoreName = [f"FileDatastore@{BUTLER_ROOT_TAG}"]
    registryStr = "/gen3.sqlite3"

    def testPathConstructor(self) -> None:
        """Independent test of constructor using PathLike."""
        butler = Butler.from_config(self.tmpConfigFile, run=self.default_run)
        self.enterContext(butler)
        self.assertIsInstance(butler, Butler)

        # And again with a Path object with the butler yaml
        path = pathlib.Path(self.tmpConfigFile)
        butler = Butler.from_config(path, writeable=False)
        self.enterContext(butler)
        self.assertIsInstance(butler, Butler)

        # And again with a Path object without the butler yaml
        # (making sure we skip it if the tmp config doesn't end
        # in butler.yaml -- which is the case for a subclass)
        if self.tmpConfigFile.endswith("butler.yaml"):
            path = pathlib.Path(os.path.dirname(self.tmpConfigFile))
            butler = Butler.from_config(path, writeable=False)
            self.enterContext(butler)
            self.assertIsInstance(butler, Butler)

    def testExportTransferCopy(self) -> None:
        """Test local export using all transfer modes"""
        storageClass = self.storageClassFactory.getStorageClass("StructuredDataNoComponents")
        exportButler = self.runPutGetTest(storageClass, "test_metric")
        # Test that the repo actually has at least one dataset.
        datasets = list(exportButler.registry.queryDatasets(..., collections=...))
        self.assertGreater(len(datasets), 0)
        uris = [exportButler.getURI(d) for d in datasets]
        assert isinstance(exportButler._datastore, FileDatastore)
        datastoreRoot = exportButler.get_datastore_roots()[exportButler.get_datastore_names()[0]]

        # Paths relative to the datastore root; relative_to can return None
        # for URIs outside the root, hence the asserts below.
        pathsInStore = [uri.relative_to(datastoreRoot) for uri in uris]

        for path in pathsInStore:
            # Assume local file system
            assert path is not None
            self.assertTrue(self.checkFileExists(datastoreRoot, path), f"Checking path {path}")

        # Each transfer mode should reproduce the in-store layout in the
        # export directory.
        for transfer in ("copy", "link", "symlink", "relsymlink"):
            with safeTestTempDir(TESTDIR) as exportDir:
                with exportButler.export(directory=exportDir, format="yaml", transfer=transfer) as export:
                    export.saveDatasets(datasets)
                    for path in pathsInStore:
                        assert path is not None
                        self.assertTrue(
                            self.checkFileExists(exportDir, path),
                            f"Check that mode {transfer} exported files",
                        )

    def testPytypeCoercion(self) -> None:
        """Test python type coercion on Butler.get and put."""
        # Store some data with the normal example storage class.
        storageClass = self.storageClassFactory.getStorageClass("StructuredDataNoComponents")
        datasetTypeName = "test_metric"
        butler = self.runPutGetTest(storageClass, datasetTypeName)

        dataId = {"instrument": "DummyCamComp", "visit": 423}
        metric = butler.get(datasetTypeName, dataId=dataId)
        self.assertEqual(get_full_type_name(metric), "lsst.daf.butler.tests.MetricsExample")

        datasetType_ori = butler.get_dataset_type(datasetTypeName)
        self.assertEqual(datasetType_ori.storageClass.name, "StructuredDataNoComponents")

        # Now need to hack the registry dataset type definition.
        # There is no API for this.
        assert isinstance(butler._registry, SqlRegistry)
        manager = butler._registry._managers.datasets
        assert hasattr(manager, "_db") and hasattr(manager, "_static")
        manager._db.update(
            manager._static.dataset_type,
            {"name": datasetTypeName},
            {datasetTypeName: datasetTypeName, "storage_class": "StructuredDataNoComponentsModel"},
        )

        # Force reset of dataset type cache
        butler.registry.refresh()

        datasetType_new = butler.get_dataset_type(datasetTypeName)
        self.assertEqual(datasetType_new.name, datasetType_ori.name)
        self.assertEqual(datasetType_new.storageClass.name, "StructuredDataNoComponentsModel")

        # The same stored file should now come back as the model type.
        metric_model = butler.get(datasetTypeName, dataId=dataId)
        self.assertNotEqual(type(metric_model), type(metric))
        self.assertEqual(get_full_type_name(metric_model), "lsst.daf.butler.tests.MetricsExampleModel")

        # Put the model and read it back to show that everything now
        # works as normal.
        metric_ref = butler.put(metric_model, datasetTypeName, dataId=dataId, visit=424)
        metric_model_new = butler.get(metric_ref)
        self.assertEqual(metric_model_new, metric_model)

        # Hack the storage class again to something that will fail on the
        # get with no conversion class.
        manager._db.update(
            manager._static.dataset_type,
            {"name": datasetTypeName},
            {datasetTypeName: datasetTypeName, "storage_class": "StructuredDataListYaml"},
        )
        butler.registry.refresh()

        with self.assertRaises(ValueError):
            butler.get(datasetTypeName, dataId=dataId)

    def test_provenance(self):
        """Test that provenance is attached on put."""
        run = "test_run"
        butler, datasetType = self.create_butler(run, "MetricsExampleModelProvenance", "prov_metric")
        metric = MetricsExampleModel(
            summary={"AM1": 5.2, "AM2": 30.6},
            output={"a": [1, 2, 3], "b": {"blue": 5, "red": "green"}},
            data=[563, 234, 456.7, 752, 8, 9, 27],
        )
        # Provenance can be attached to the object being put. Whether
        # it is or not is dependent on the formatter. For this test we
        # copy on adding provenance to ensure they differ.
        self.assertIsNone(metric.dataset_id)
        metric_ref = butler.put(metric, datasetType, visit=424, instrument="DummyCamComp")
        self.assertIsNone(metric.dataset_id)
        metric_2 = butler.get(metric_ref)
        self.assertEqual(metric_2.data, metric.data)
        self.assertEqual(metric_2.dataset_id, metric_ref.id)
        self.assertIsNone(metric_2.provenance)

        # Put with provenance.
        prov = DatasetProvenance(quantum_id=uuid.uuid4())
        prov.add_input(metric_ref)
        prov.add_extra_provenance(metric_ref.id, {"answer": 42})
        metric_ref2 = butler.put(metric, datasetType, visit=423, instrument="DummyCamComp", provenance=prov)
        metric_3 = butler.get(metric_ref2)
        self.assertEqual(metric_3.provenance, prov)

        # Check that we can extract provenance from dict form.
        prov_dict = prov.to_flat_dict(metric_ref2)
        prov_from_prov, ref_from_prov = DatasetProvenance.from_flat_dict(prov_dict, butler)
        self.assertEqual(ref_from_prov, metric_ref2)
        # Direct __eq__ of the provenance does not work because one side
        # includes dimension records.
        self.assertEqual({ref.id for ref in prov_from_prov.inputs}, {ref.id for ref in prov.inputs})
        self.assertEqual(prov_from_prov.quantum_id, prov.quantum_id)
        self.assertEqual(prov_from_prov.extras, prov.extras)

        # Force a bad ID into the dict.
        prov_dict["id"] = uuid.uuid4()
        with self.assertRaises(ValueError):
            DatasetProvenance.from_flat_dict(prov_dict, butler)
        del prov_dict["id"]
        prov_dict["input 0 id"] = uuid.uuid4()
        with self.assertRaises(ValueError):
            DatasetProvenance.from_flat_dict(prov_dict, butler)

        # Check that simple types can be reconstructed with non-standard
        # separators.
        prov_dict = prov.to_flat_dict(metric_ref2, prefix="XYZ", sep="😎", simple_types=True)
        prov_from_prov, ref_from_prov = DatasetProvenance.from_flat_dict(prov_dict, butler)
        self.assertEqual(ref_from_prov, metric_ref2)
        self.assertEqual({ref.id for ref in prov_from_prov.inputs}, {ref.id for ref in prov.inputs})

        # Unrecognized keys must be rejected.
        with self.assertRaises(ValueError):
            DatasetProvenance.from_flat_dict({"unknown": 42}, butler)

    def test_specialized_file_datasets_functions(self):
        """Test a workflow used in Prompt Processing where we export datasets
        from one repository and write them in-place to the datastore of
        another, without immediately inserting registry entries for the
        datasets.
        """
        repo = MetricTestRepo.create_from_butler(
            self.create_empty_butler(writeable=True),
            self.tmpConfigFile,
            "StructuredCompositeReadCompNoDisassembly",
        )
        source_butler = repo.butler

        # Test writing outputs to a FileDatastore.
        with tempfile.TemporaryDirectory() as tempdir:
            target_repo_config = Butler.makeRepo(tempdir)
            refs = [repo.ref1, repo.ref2]
            datasets = transfer_datasets_to_datastore(source_butler, ButlerConfig(target_repo_config), refs)
            self.assertEqual(len(datasets), 2)
            self.assertEqual({ref.id for ref in refs}, {dataset.refs[0].id for dataset in datasets})
            for dataset in datasets:
                path = ResourcePath(dataset.path, forceAbsolute=False)
                # Paths should be relative paths to the target datastore.
                self.assertFalse(path.isabs())
                # Files should have been copied into the target datastore
                self.assertTrue(ResourcePath(tempdir).join(path).exists())

            # Make sure the target Butler can ingest the datasets.
            target_butler = Butler(target_repo_config, writeable=True)
            self.enterContext(target_butler)
            target_butler.transfer_dimension_records_from(source_butler, refs)
            target_butler.ingest(*datasets, transfer=None)
            self.assertIsNotNone(target_butler.get(repo.ref1))
            self.assertIsNotNone(target_butler.get(repo.ref2))

            # Giving an empty list of files is a no-op.
            no_datasets = transfer_datasets_to_datastore(source_butler, ButlerConfig(target_repo_config), [])
            self.assertEqual(len(no_datasets), 0)

        # Test writing outputs to a ChainedDatastore.
        with tempfile.TemporaryDirectory() as tempdir:
            # Set up a second dataset type, so we can split the files across
            # multiple datastore roots.
            dt1 = repo.datasetType
            dt2 = DatasetType("other", dt1.dimensions, dt1.storageClass)
            source_butler.registry.registerDatasetType(dt2)
            other_ref = repo.addDataset(repo.ref1.dataId, datasetType=dt2)
            # Chained datastore with per-child constraints so each dataset
            # type lands in a different child root.
            config = Config.fromString(
                f"""
                datastore:
                  cls: lsst.daf.butler.datastores.chainedDatastore.ChainedDatastore
                  datastore_constraints:
                    - constraints:
                        accept:
                          - {dt1.name}
                    - constraints:
                        accept:
                          - {dt2.name}
                  datastores:
                    - datastore:
                        cls: lsst.daf.butler.datastores.fileDatastore.FileDatastore
                        root: <butlerRoot>/FileDatastore_0
                    - datastore:
                        cls: lsst.daf.butler.datastores.fileDatastore.FileDatastore
                        root: <butlerRoot>/FileDatastore_1
                """
            )
            target_repo_config = Butler.makeRepo(tempdir, config)
            refs = [repo.ref1, repo.ref2, other_ref]
            datasets = transfer_datasets_to_datastore(source_butler, ButlerConfig(target_repo_config), refs)
            self.assertEqual(len(datasets), 3)
            self.assertEqual({ref.id for ref in refs}, {dataset.refs[0].id for dataset in datasets})
            for dataset in datasets:
                path = ResourcePath(dataset.path, forceAbsolute=False)
                # Paths should be relative paths to the target datastore.
                self.assertFalse(path.isabs())
                # Files should have been split up between the two datastores
                # in the chain.
                datastore_root = ResourcePath(tempdir)
                if dataset.refs[0].datasetType.name == dt1.name:
                    datastore_root = datastore_root.join("FileDatastore_0")
                else:
                    datastore_root = datastore_root.join("FileDatastore_1")
                self.assertTrue(datastore_root.join(path).exists())

            # Make sure the target Butler can ingest the datasets.
            target_butler = Butler(target_repo_config, writeable=True)
            self.enterContext(target_butler)
            target_butler.transfer_dimension_records_from(source_butler, refs)
            target_butler.ingest(*datasets, transfer=None)
            self.assertIsNotNone(target_butler.get(repo.ref1))
            self.assertIsNotNone(target_butler.get(repo.ref2))
            self.assertIsNotNone(target_butler.get(other_ref))

    def test_temporary_for_ingest(self) -> None:
        """Test the `lsst.daf.butler._rubin.ingest_from_temporary` module."""
        with self.create_empty_butler("example_run") as butler:
            dataset_type = DatasetType("example", butler.dimensions.empty, "StructuredDataDict")
            butler.registry.registerDatasetType(dataset_type)
            ref = DatasetRef(dataset_type, DataCoordinate.make_empty(butler.dimensions), "example_run")
            with TemporaryForIngest(butler, ref) as temporary:
                temporary.path.write(b"three: 3")
                # The temporary should be discoverable as an orphan until
                # it is ingested, and its name carries a .tmp marker.
                found = TemporaryForIngest.find_orphaned_temporaries_by_ref(ref, butler)
                self.assertEqual(found, [temporary.path])
                self.assertIn(".tmp", temporary.ospath)
                temporary.ingest()
            loaded = butler.get(ref)
            self.assertEqual(loaded, {"three": 3})

2577 

2578 

class PostgresPosixDatastoreButlerTestCase(FileDatastoreButlerTests, unittest.TestCase):
    """PosixDatastore butler tests backed by a PostgreSQL registry."""

    configFile = os.path.join(TESTDIR, "config/basic/butler.yaml")
    fullConfigKey = ".datastore.formatters"
    validationCanFail = True
    datastoreStr = ["/tmp"]
    datastoreName = [f"FileDatastore@{BUTLER_ROOT_TAG}"]
    registryStr = "PostgreSQL@test"

    @classmethod
    def setUpClass(cls) -> None:
        # Bring up a class-scoped test database before the base class runs.
        cls.postgresql = cls.enterClassContext(setup_postgres_test_db())
        super().setUpClass()

    def setUp(self) -> None:
        # Rewrite the stock config so its registry section points at the
        # test database, saving the result to a throwaway yaml file that
        # tearDown removes.
        self._temp_config = False
        patched = Config(self.configFile)
        self.postgresql.patch_butler_config(patched)
        with tempfile.NamedTemporaryFile("w", suffix=".yaml", delete=False) as stream:
            patched.dump(stream)
        self.configFile = stream.name
        self._temp_config = True
        super().setUp()

    def tearDown(self) -> None:
        # Only delete the config if we actually wrote a temporary one.
        if self._temp_config and os.path.exists(self.configFile):
            os.remove(self.configFile)
        super().tearDown()

    def testMakeRepo(self) -> None:
        # The inherited test presumes an sqlite-compatible configuration.
        raise unittest.SkipTest("Postgres config is not compatible with this test.")

2614 

2615 

class ClonedPostgresPosixDatastoreButlerTestCase(PostgresPosixDatastoreButlerTestCase, unittest.TestCase):
    """Test that Butler with a Postgres registry still works after cloning."""

    def create_butler(
        self,
        run: str,
        storageClass: StorageClass | str,
        datasetTypeName: str,
        metrics: ButlerMetrics | None = None,
    ) -> tuple[DirectButler, DatasetType]:
        """Create a butler as the parent does, then return a clone of it so
        every subsequent operation exercises the cloned instance.
        """
        original, dataset_type = super().create_butler(run, storageClass, datasetTypeName, metrics=metrics)
        cloned = original.clone(run=run, metrics=metrics)
        return cloned, dataset_type

2628 

2629 

class InMemoryDatastoreButlerTestCase(ButlerTests, unittest.TestCase):
    """InMemoryDatastore specialization of a butler"""

    configFile = os.path.join(TESTDIR, "config/basic/butler-inmemory.yaml")
    fullConfigKey = None
    useTempRoot = False
    validationCanFail = False
    datastoreStr = ["datastore='InMemory"]
    datastoreName = ["InMemoryDatastore@"]
    registryStr = "/gen3.sqlite3"

    def testIngest(self) -> None:
        # File ingest is meaningless for an in-memory datastore; disable
        # the inherited test.
        pass

    def test_ingest_zip(self) -> None:
        # Zip ingest likewise requires a file-backed datastore.
        pass

2646 

2647 

class ClonedSqliteButlerTestCase(InMemoryDatastoreButlerTestCase, unittest.TestCase):
    """Test that a Butler with a Sqlite registry still works after cloning."""

    def create_butler(
        self,
        run: str,
        storageClass: StorageClass | str,
        datasetTypeName: str,
        metrics: ButlerMetrics | None = None,
    ) -> tuple[DirectButler, DatasetType]:
        """Build the butler normally, then hand back a clone so the tests
        run against the cloned instance.
        """
        original, dataset_type = super().create_butler(run, storageClass, datasetTypeName, metrics=metrics)
        return original.clone(run=run), dataset_type

2660 

2661 

class ChainedDatastoreButlerTestCase(FileDatastoreButlerTests, unittest.TestCase):
    """Butler tests run against a ChainedDatastore combining an in-memory
    datastore with two file datastores.
    """

    configFile = os.path.join(TESTDIR, "config/basic/butler-chained.yaml")
    fullConfigKey = ".datastore.datastores.1.formatters"
    validationCanFail = True
    datastoreStr = ["datastore='InMemory", "/FileDatastore_1/,", "/FileDatastore_2/'"]
    datastoreName = [
        "InMemoryDatastore@",
        f"FileDatastore@{BUTLER_ROOT_TAG}/FileDatastore_1",
        "SecondDatastore",
    ]
    registryStr = "/gen3.sqlite3"

    def testPruneDatasets(self) -> None:
        # The inherited test manipulates files out-of-band, which cannot
        # work here: the InMemoryDatastore member of the chain has no files
        # to manipulate. Disable it.
        pass

2681 

2682 

class ButlerExplicitRootTestCase(PosixDatastoreButlerTestCase):
    """Test that a yaml file in one location can refer to a root in another."""

    datastoreStr = ["dir1"]
    # The makeRepo check is disabled: this test deliberately stores the
    # config under a name other than butler.yaml.
    fullConfigKey = None

    def setUp(self) -> None:
        self.root = makeTestTempDir(TESTDIR)

        # Build a repository in its own subdirectory.
        self.dir1 = os.path.join(self.root, "dir1")
        Butler.makeRepo(self.dir1, config=Config(self.configFile))

        # Relocate the yaml to a second directory, pointing "root" back at
        # the original repository, and delete the original config so only
        # the relocated one can be found.
        self.dir2 = os.path.join(self.root, "dir2")
        os.makedirs(self.dir2, exist_ok=True)
        original_config = os.path.join(self.dir1, "butler.yaml")
        relocated = Config(original_config)
        relocated["root"] = self.dir1
        moved_config = os.path.join(self.dir2, "butler2.yaml")
        relocated.dumpToUri(moved_config)
        os.remove(original_config)
        self.tmpConfigFile = moved_config

    def testFileLocations(self) -> None:
        # Config lives in dir2; repository contents remain in dir1.
        self.assertNotEqual(self.dir1, self.dir2)
        self.assertTrue(os.path.exists(os.path.join(self.dir2, "butler2.yaml")))
        self.assertFalse(os.path.exists(os.path.join(self.dir1, "butler.yaml")))
        self.assertTrue(os.path.exists(os.path.join(self.dir1, "gen3.sqlite3")))

2714 

2715 

class ButlerMakeRepoOutfileTestCase(ButlerPutGetTests, unittest.TestCase):
    """Test that a config file created by makeRepo outside of repo works."""

    configFile = os.path.join(TESTDIR, "config/basic/butler.yaml")

    def setUp(self) -> None:
        self.root = makeTestTempDir(TESTDIR)
        self.root2 = makeTestTempDir(TESTDIR)

        # The repository lands in root while its config goes to root2.
        self.tmpConfigFile = os.path.join(self.root2, "different.yaml")
        Butler.makeRepo(self.root, config=Config(self.configFile), outfile=self.tmpConfigFile)

    def tearDown(self) -> None:
        # The base class cleans up root; root2 is ours to remove.
        if os.path.exists(self.root2):
            shutil.rmtree(self.root2, ignore_errors=True)
        super().tearDown()

    def testConfigExistence(self) -> None:
        # The external config must carry an absolute root URI that matches
        # the repository location.
        config = Config(self.tmpConfigFile)
        actual = ResourcePath(config["root"])
        expected = ResourcePath(self.root, forceDirectory=True)
        self.assertEqual(actual.geturl(), expected.geturl())
        self.assertNotIn(":", actual.path, "Check for URI concatenated with normal path")

    def testPutGet(self) -> None:
        metrics_class = self.storageClassFactory.getStorageClass("StructuredDataNoComponents")
        self.runPutGetTest(metrics_class, "test_metric")

2743 

2744 

class ButlerMakeRepoOutfileDirTestCase(ButlerMakeRepoOutfileTestCase):
    """Variant of the outfile test where the external config target is a
    directory rather than a file name.
    """

    configFile = os.path.join(TESTDIR, "config/basic/butler.yaml")

    def setUp(self) -> None:
        self.root = makeTestTempDir(TESTDIR)
        self.root2 = makeTestTempDir(TESTDIR)

        # Hand makeRepo a directory; it writes butler.yaml inside it.
        self.tmpConfigFile = self.root2
        Butler.makeRepo(self.root, config=Config(self.configFile), outfile=self.tmpConfigFile)

    def testConfigExistence(self) -> None:
        # Point at the yaml file itself so the Config constructor can
        # determine the file type, then run the inherited checks.
        self.tmpConfigFile = os.path.join(self.tmpConfigFile, "butler.yaml")
        super().testConfigExistence()

2762 

2763 

class ButlerMakeRepoOutfileUriTestCase(ButlerMakeRepoOutfileTestCase):
    """Variant of the outfile test where the external config target is
    given as a URI string.
    """

    configFile = os.path.join(TESTDIR, "config/basic/butler.yaml")

    def setUp(self) -> None:
        self.root = makeTestTempDir(TESTDIR)
        self.root2 = makeTestTempDir(TESTDIR)

        # Express the outfile as a file:// URL instead of a plain path.
        config_uri = ResourcePath(os.path.join(self.root2, "something.yaml"))
        self.tmpConfigFile = config_uri.geturl()
        Butler.makeRepo(self.root, config=Config(self.configFile), outfile=self.tmpConfigFile)

2775 

2776 

@unittest.skipIf(not boto3, "Warning: boto3 AWS SDK not found!")
class S3DatastoreButlerTestCase(FileDatastoreButlerTests, unittest.TestCase):
    """S3Datastore specialization of a butler; an S3 storage Datastore +
    a local in-memory SqlRegistry.
    """

    configFile = os.path.join(TESTDIR, "config/basic/butler-s3store.yaml")
    fullConfigKey = None
    validationCanFail = True

    bucketName = "anybucketname"
    """Name of the Bucket that will be used in the tests. The name is read from
    the config file used with the tests during set-up.
    """

    root = "butlerRoot/"
    """Root repository directory expected to be used in case useTempRoot=False.
    Otherwise the root is set to a 20 characters long randomly generated string
    during set-up.
    """

    datastoreStr = [f"datastore={root}"]
    """Contains all expected root locations in a format expected to be
    returned by Butler stringification.
    """

    datastoreName = ["FileDatastore@s3://{bucketName}/{root}"]
    """The expected format of the S3 Datastore string."""

    registryStr = "/gen3.sqlite3"
    """Expected format of the Registry string."""

    mock_aws = mock_aws()
    """The mocked s3 interface from moto."""

    def genRoot(self) -> str:
        """Return a random string of len 20 to serve as a root
        name for the temporary bucket repo.

        This is equivalent to tempfile.mkdtemp as this is what self.root
        becomes when useTempRoot is True.
        """
        rndstr = "".join(random.choice(string.ascii_uppercase + string.digits) for _ in range(20))
        return rndstr + "/"

    def setUp(self) -> None:
        # The bucket name comes from the datastore root URI in the config,
        # not from the class attribute default.
        config = Config(self.configFile)
        uri = ResourcePath(config[".datastore.datastore.root"])
        self.bucketName = uri.netloc

        # Enable S3 mocking of tests.
        self.enterContext(clean_test_environment_for_s3())
        self.mock_aws.start()

        if self.useTempRoot:
            self.root = self.genRoot()
        rooturi = f"s3://{self.bucketName}/{self.root}"
        config.update({"datastore": {"datastore": {"root": rooturi}}})

        # need local folder to store registry database
        self.reg_dir = makeTestTempDir(TESTDIR)
        config["registry", "db"] = f"sqlite:///{self.reg_dir}/gen3.sqlite3"

        # MOTO needs to know that we expect Bucket bucketname to exist
        # (this used to be the class attribute bucketName)
        s3 = boto3.resource("s3")
        s3.create_bucket(Bucket=self.bucketName)

        self.datastoreStr = [f"datastore='{rooturi}'"]
        self.datastoreName = [f"FileDatastore@{rooturi}"]
        Butler.makeRepo(rooturi, config=config, forceConfigRoot=False)
        self.tmpConfigFile = posixpath.join(rooturi, "butler.yaml")

    def tearDown(self) -> None:
        # Empty the bucket before deleting it; moto requires the bucket to
        # be empty for a successful delete.
        s3 = boto3.resource("s3")
        bucket = s3.Bucket(self.bucketName)
        try:
            bucket.objects.all().delete()
        except botocore.exceptions.ClientError as e:
            if e.response["Error"]["Code"] == "404":
                # the key was not reachable - pass
                pass
            else:
                raise

        bucket = s3.Bucket(self.bucketName)
        bucket.delete()

        # Stop the S3 mock.
        self.mock_aws.stop()

        if self.reg_dir is not None and os.path.exists(self.reg_dir):
            shutil.rmtree(self.reg_dir, ignore_errors=True)

        if self.useTempRoot and os.path.exists(self.root):
            shutil.rmtree(self.root, ignore_errors=True)

        super().tearDown()

2875 

2876 

2877class DatastoreTransfers(TestCaseMixin): 

2878 """Base test setup for data transfers between butlers. The concrete tests 

2879 for specific configurations are in other classes, below. 

2880 """ 

2881 

2882 storageClassFactory: StorageClassFactory 

2883 

    @classmethod
    def setUpClass(cls) -> None:
        # One factory shared by the class; setUp resets it per test.
        cls.storageClassFactory = StorageClassFactory()

2887 

2888 def setUp(self) -> None: 

2889 self.root = makeTestTempDir(TESTDIR) 

2890 self.config = Config(self.configFile) 

2891 

2892 # Some tests cause convertors to be replaced so ensure 

2893 # the storage class factory is reset each time. 

2894 self.storageClassFactory.reset() 

2895 self.storageClassFactory.addFromConfig(self.configFile) 

2896 

    def tearDown(self) -> None:
        # Discard the per-test temporary repository root.
        removeTestTempDir(self.root)

2899 

2900 def create_butler(self, manager: str | None, label: str, config_file: str | None = None) -> Butler: 

2901 if manager is None: 

2902 manager = ( 

2903 "lsst.daf.butler.registry.datasets.byDimensions.ByDimensionsDatasetRecordStorageManagerUUID" 

2904 ) 

2905 config = Config(config_file if config_file is not None else self.configFile) 

2906 config["registry", "managers", "datasets"] = manager 

2907 butler = Butler.from_config( 

2908 Butler.makeRepo(f"{self.root}/butler{label}", config=config), writeable=True 

2909 ) 

2910 self.enterContext(butler) 

2911 return butler 

2912 

2913 def assertButlerTransfers( 

2914 self, 

2915 purge: bool = False, 

2916 storageClassName: str = "StructuredData", 

2917 storageClassNameTarget: str | None = None, 

2918 ) -> None: 

2919 """Test that a run can be transferred to another butler.""" 

2920 storageClass = self.storageClassFactory.getStorageClass(storageClassName) 

2921 if storageClassNameTarget is not None: 

2922 storageClassTarget = self.storageClassFactory.getStorageClass(storageClassNameTarget) 

2923 else: 

2924 storageClassTarget = storageClass 

2925 

2926 datasetTypeName = "random_data" 

2927 

2928 # Test will create 3 collections and we will want to transfer 

2929 # two of those three. 

2930 runs = ["run1", "run2", "other"] 

2931 

2932 # Also want to use two different dataset types to ensure that 

2933 # grouping works. 

2934 datasetTypeNames = ["random_data", "random_data_2"] 

2935 

2936 # Create the run collections in the source butler. 

2937 for run in runs: 

2938 self.source_butler.collections.register(run) 

2939 

2940 # Create dimensions in source butler. 

2941 n_exposures = 30 

2942 self.source_butler.registry.insertDimensionData("instrument", {"name": "DummyCamComp"}) 

2943 self.source_butler.registry.insertDimensionData( 

2944 "physical_filter", {"instrument": "DummyCamComp", "name": "d-r", "band": "R"} 

2945 ) 

2946 self.source_butler.registry.insertDimensionData( 

2947 "detector", {"instrument": "DummyCamComp", "id": 1, "full_name": "det1"} 

2948 ) 

2949 self.source_butler.registry.insertDimensionData( 

2950 "day_obs", 

2951 { 

2952 "instrument": "DummyCamComp", 

2953 "id": 20250101, 

2954 }, 

2955 ) 

2956 

2957 for i in range(n_exposures): 

2958 self.source_butler.registry.insertDimensionData( 

2959 "group", {"instrument": "DummyCamComp", "name": f"group{i}"} 

2960 ) 

2961 self.source_butler.registry.insertDimensionData( 

2962 "exposure", 

2963 { 

2964 "instrument": "DummyCamComp", 

2965 "id": i, 

2966 "obs_id": f"exp{i}", 

2967 "physical_filter": "d-r", 

2968 "group": f"group{i}", 

2969 "day_obs": 20250101, 

2970 }, 

2971 ) 

2972 

2973 # Create dataset types in the source butler. 

2974 dimensions = self.source_butler.dimensions.conform(["instrument", "exposure"]) 

2975 for datasetTypeName in datasetTypeNames: 

2976 datasetType = DatasetType(datasetTypeName, dimensions, storageClass) 

2977 self.source_butler.registry.registerDatasetType(datasetType) 

2978 

2979 # Write a dataset to an unrelated run -- this will ensure that 

2980 # we are rewriting integer dataset ids in the target if necessary. 

2981 # Will not be relevant for UUID. 

2982 run = "distraction" 

2983 butler = Butler.from_config(butler=self.source_butler, run=run) 

2984 self.enterContext(butler) 

2985 butler.put( 

2986 makeExampleMetrics(), 

2987 datasetTypeName, 

2988 exposure=1, 

2989 instrument="DummyCamComp", 

2990 physical_filter="d-r", 

2991 ) 

2992 

2993 # Write some example metrics to the source 

2994 butler = Butler.from_config(butler=self.source_butler) 

2995 self.enterContext(butler) 

2996 

2997 # Set of DatasetRefs that should be in the list of refs to transfer 

2998 # but which will not be transferred. 

2999 deleted: set[DatasetRef] = set() 

3000 

3001 n_expected = 20 # Number of datasets expected to be transferred 

3002 source_refs = [] 

3003 for i in range(n_exposures): 

3004 # Put a third of datasets into each collection, only retain 

3005 # two thirds. 

3006 index = i % 3 

3007 run = runs[index] 

3008 datasetTypeName = datasetTypeNames[i % 2] 

3009 

3010 metric = MetricsExample( 

3011 summary={"counter": i}, output={"text": "metric"}, data=[2 * x for x in range(i)] 

3012 ) 

3013 dataId = {"exposure": i, "instrument": "DummyCamComp", "physical_filter": "d-r"} 

3014 ref = butler.put(metric, datasetTypeName, dataId=dataId, run=run) 

3015 

3016 # Remove the datastore record using low-level API, but only 

3017 # for a specific index. 

3018 if purge and index == 1: 

3019 # For one of these delete the file as well. 

3020 # This allows the "missing" code to filter the 

3021 # file out. 

3022 # Access the individual datastores. 

3023 datastores = [] 

3024 if hasattr(butler._datastore, "datastores"): 

3025 datastores.extend(butler._datastore.datastores) 

3026 else: 

3027 datastores.append(butler._datastore) 

3028 

3029 if not deleted: 

3030 # For a chained datastore we need to remove 

3031 # files in each chain. 

3032 for datastore in datastores: 

3033 # The file might not be known to the datastore 

3034 # if constraints are used. 

3035 try: 

3036 primary, uris = datastore.getURIs(ref) 

3037 except FileNotFoundError: 

3038 continue 

3039 if primary and primary.scheme != "mem": 

3040 primary.remove() 

3041 for uri in uris.values(): 

3042 if uri.scheme != "mem": 

3043 uri.remove() 

3044 n_expected -= 1 

3045 deleted.add(ref) 

3046 

3047 # Remove the datastore record. 

3048 for datastore in datastores: 

3049 if hasattr(datastore, "removeStoredItemInfo"): 

3050 datastore.removeStoredItemInfo(ref) 

3051 

3052 if index < 2: 

3053 source_refs.append(ref) 

3054 if ref not in deleted: 

3055 new_metric = butler.get(ref) 

3056 self.assertEqual(new_metric, metric) 

3057 

3058 # Create some bad dataset types to ensure we check for inconsistent 

3059 # definitions. 

3060 badStorageClass = self.storageClassFactory.getStorageClass("StructuredDataList") 

3061 for datasetTypeName in datasetTypeNames: 

3062 datasetType = DatasetType(datasetTypeName, dimensions, badStorageClass) 

3063 self.target_butler.registry.registerDatasetType(datasetType) 

3064 with self.assertRaises(ConflictingDefinitionError) as cm: 

3065 self.target_butler.transfer_from(self.source_butler, source_refs) 

3066 self.assertIn("dataset type differs", str(cm.exception)) 

3067 

3068 # And remove the bad definitions. 

3069 for datasetTypeName in datasetTypeNames: 

3070 self.target_butler.registry.removeDatasetType(datasetTypeName) 

3071 

3072 # Transfer without creating dataset types should fail. 

3073 with self.assertRaises(KeyError): 

3074 self.target_butler.transfer_from(self.source_butler, source_refs) 

3075 

3076 # Transfer without creating dimensions should fail. 

3077 with self.assertRaises(ConflictingDefinitionError) as cm: 

3078 self.target_butler.transfer_from(self.source_butler, source_refs, register_dataset_types=True) 

3079 self.assertIn("dimension", str(cm.exception)) 

3080 

3081 # The dry run test requires dataset types to exist. If we have 

3082 # been given distinct storage classes for the target we have 

3083 # to redefine at least one of the dataset types in the target butler. 

3084 if storageClass != storageClassTarget: 

3085 self.target_butler.registry.removeDatasetType(datasetTypeNames[0]) 

3086 datasetType = DatasetType(datasetTypeNames[0], dimensions, storageClassTarget) 

3087 self.target_butler.registry.registerDatasetType(datasetType) 

3088 

3089 # The failed transfer above leaves registry in an inconsistent 

3090 # state because the run is created but then rolled back without 

3091 # the collection cache being cleared. For now force a refresh. 

3092 # Can remove with DM-35498. 

3093 self.target_butler.registry.refresh() 

3094 

3095 # Do a dry run -- this should not have any effect on the target butler. 

3096 self.target_butler.transfer_from(self.source_butler, source_refs, dry_run=True) 

3097 

3098 # Transfer the records for one ref to test the alternative API. 

3099 with self.assertLogs(logger="lsst", level=logging.DEBUG) as log_cm: 

3100 self.target_butler.transfer_dimension_records_from(self.source_butler, [source_refs[0]]) 

3101 self.assertIn("number of records transferred: 1", ";".join(log_cm.output)) 

3102 

3103 # Now transfer them to the second butler, including dimensions. 

3104 with self.assertLogs(logger="lsst", level=logging.DEBUG) as log_cm: 

3105 transferred = self.target_butler.transfer_from( 

3106 self.source_butler, 

3107 source_refs, 

3108 register_dataset_types=True, 

3109 transfer_dimensions=True, 

3110 ) 

3111 self.assertEqual(len(transferred), n_expected) 

3112 log_output = ";".join(log_cm.output) 

3113 

3114 # A ChainedDatastore will use the in-memory datastore for mexists 

3115 # so we can not rely on the mexists log message. 

3116 self.assertIn("Number of datastore records found in source", log_output) 

3117 self.assertIn("Creating output run", log_output) 

3118 

3119 # Do the transfer twice to ensure that it will do nothing extra. 

3120 # Only do this if purge=True because it does not work for int 

3121 # dataset_id. 

3122 if purge: 

3123 # This should not need to register dataset types. 

3124 transferred = self.target_butler.transfer_from(self.source_butler, source_refs) 

3125 self.assertEqual(len(transferred), n_expected) 

3126 

3127 with self.assertRaises((TypeError, AttributeError)): 

3128 self.target_butler._datastore.transfer_from(self.source_butler, source_refs) # type: ignore 

3129 

3130 with self.assertRaises(ValueError): 

3131 self.target_butler._datastore.transfer_from( 

3132 self.source_butler._datastore, source_refs, transfer="split" 

3133 ) 

3134 

3135 # Now try to get the same refs from the new butler. 

3136 for ref in source_refs: 

3137 if ref not in deleted: 

3138 new_metric = self.target_butler.get(ref) 

3139 old_metric = self.source_butler.get(ref) 

3140 self.assertEqual(new_metric, old_metric) 

3141 

3142 # Try again without implicit storage class conversion 

3143 # triggered by using the source ref. This will do conversion 

3144 # since the formatter will be returning the source python type. 

3145 target_ref = self.target_butler.get_dataset(ref.id) 

3146 if target_ref.datasetType.storageClass != ref.datasetType.storageClass: 

3147 new_metric = self.target_butler.get(target_ref) 

3148 self.assertNotEqual(type(new_metric), type(old_metric)) 

3149 

3150 # Remove the dataset from the target and put it again 

3151 # as if it was the right type all along for this butler. 

3152 self.target_butler.pruneDatasets( 

3153 [target_ref], unstore=True, purge=True, disassociate=True 

3154 ) 

3155 self.target_butler.put(new_metric, target_ref) 

3156 new_new_metric = self.target_butler.get(target_ref) 

3157 new_old_metric = self.target_butler.get( 

3158 target_ref, storageClass=ref.datasetType.storageClass 

3159 ) 

3160 self.assertEqual(new_new_metric, new_metric) 

3161 self.assertEqual(new_old_metric, old_metric) 

3162 

3163 # Now prune run2 collection and create instead a CHAINED collection. 

3164 # This should block the transfer. 

3165 self.target_butler.removeRuns(["run2"]) 

3166 self.target_butler.collections.register("run2", CollectionType.CHAINED) 

3167 with self.assertRaises(CollectionTypeError): 

3168 # Re-importing the run1 datasets can be problematic if they 

3169 # use integer IDs so filter those out. 

3170 to_transfer = [ref for ref in source_refs if ref.run == "run2"] 

3171 self.target_butler.transfer_from(self.source_butler, to_transfer) 

3172 

3173 

class PosixDatastoreTransfers(DatastoreTransfers, unittest.TestCase):
    """Test data transfers between butlers.

    Test for different managers. UUID to UUID and integer to integer are
    tested. UUID to integer is not supported since we do not currently
    want to allow that. Integer to UUID is supported with the caveat
    that UUID4 will be generated and this will be incorrect for raw
    dataset types. The test ignores that.
    """

    # Default repository configuration used for both source and target
    # butlers unless a test supplies an explicit ``source_config``.
    configFile = os.path.join(TESTDIR, "config/basic/butler.yaml")

    def create_butlers(
        self, manager1: str | None = None, manager2: str | None = None, source_config: str | None = None
    ) -> None:
        """Create the source and target butlers used by a single test.

        Parameters
        ----------
        manager1 : `str` or `None`, optional
            Registry manager specification for the source butler.
        manager2 : `str` or `None`, optional
            Registry manager specification for the target butler.
        source_config : `str` or `None`, optional
            Path to an alternative butler configuration for the source
            butler only; the target always uses the class default.
        """
        self.source_butler = self.create_butler(manager1, "1", config_file=source_config)
        self.target_butler = self.create_butler(manager2, "2")

    def testTransferUuidToUuid(self) -> None:
        """Transfer between two butlers with default configurations."""
        self.create_butlers()
        self.assertButlerTransfers()

    def testTransferFromChainedUuidToUuid(self) -> None:
        """Force the source butler to be a ChainedDatastore."""
        self.create_butlers(source_config=os.path.join(TESTDIR, "config/basic/butler-chained.yaml"))
        self.assertButlerTransfers()

    def testTransferFromIncompatibleUuidToUuid(self) -> None:
        """Force the source butler to be a incompatible datastore."""
        self.create_butlers(source_config=os.path.join(TESTDIR, "config/basic/butler-inmemory.yaml"))
        with self.assertRaises(NotImplementedError):
            self.assertButlerTransfers()

    def testTransferFromIncompatibleChainUuidToUuid(self) -> None:
        """Force the source butler to be a incompatible datastore."""
        self.create_butlers(source_config=os.path.join(TESTDIR, "config/basic/butler-inmemory-chain.yaml"))
        with self.assertRaises(TypeError):
            self.assertButlerTransfers()

    def testTransferFromFileUuidToUuid(self) -> None:
        """Force the source butler to be a FileDatastore."""
        self.create_butlers(source_config=os.path.join(TESTDIR, "config/basic/butler.yaml"))
        self.assertButlerTransfers()

    def testTransferMissing(self) -> None:
        """Test transfers where datastore records are missing.

        This is how execution butler works.
        """
        self.create_butlers()

        # Configure the source butler to allow trust.
        self.source_butler._datastore._set_trust_mode(True)

        self.assertButlerTransfers(purge=True)

    def testTransferMissingDisassembly(self) -> None:
        """Test transfers where datastore records are missing.

        This is how execution butler works.
        """
        self.create_butlers()

        # Configure the source butler to allow trust.
        self.source_butler._datastore._set_trust_mode(True)

        # Test disassembly.
        self.assertButlerTransfers(purge=True, storageClassName="StructuredComposite")

    def testTransferDifferingStorageClasses(self) -> None:
        """Test transfers when the source butler dataset type has a different
        but compatible storage class.
        """
        self.create_butlers()

        self.assertButlerTransfers(storageClassNameTarget="MetricsConversion")

    def testTransferDifferingStorageClassesDisassembly(self) -> None:
        """Test transfers when the source butler dataset type has a different
        but compatible storage class and where the source butler has
        disassembled.
        """
        self.create_butlers()

        self.assertButlerTransfers(
            storageClassName="StructuredComposite", storageClassNameTarget="MetricsConversion"
        )

    def testUnsafeDirectTransfer(self) -> None:
        """Test that transfer='unsafe_direct' records the absolute URI of
        source files in the target datastore.
        """
        self.create_butlers()
        dataset_type = DatasetType("dt", [], "int", universe=self.source_butler.dimensions)
        self.source_butler.registry.registerDatasetType(dataset_type)
        self.source_butler.collections.register("run")
        ref = self.source_butler.put(123, "dt", [], run="run")
        self.target_butler.transfer_from(
            self.source_butler, [ref], transfer="unsafe_direct", register_dataset_types=True
        )
        # The target must read the dataset and report the identical URI
        # as the source.
        self.assertEqual(self.target_butler.get(ref), 123)
        self.assertEqual(self.source_butler.getURI(ref), self.target_butler.getURI(ref))

    def testAbsoluteURITransferDirect(self) -> None:
        """Test transfer using an absolute URI."""
        self._absolute_transfer("auto")

    def testAbsoluteURITransferUnsafeDirect(self) -> None:
        """Test transfer using an absolute URI."""
        self._absolute_transfer("unsafe_direct")

    def testAbsoluteURITransferCopy(self) -> None:
        """Test transfer using an absolute URI."""
        self._absolute_transfer("copy")

    def _absolute_transfer(self, transfer: str) -> None:
        """Ingest a dataset at an absolute URI and transfer it to the
        target butler.

        Parameters
        ----------
        transfer : `str`
            Transfer mode handed to ``Butler.transfer_from``.
        """
        self.create_butlers()

        storageClassName = "StructuredData"
        storageClass = self.storageClassFactory.getStorageClass(storageClassName)
        datasetTypeName = "random_data"
        run = "run1"
        self.source_butler.collections.register(run)

        dimensions = self.source_butler.dimensions.conform(())
        datasetType = DatasetType(datasetTypeName, dimensions, storageClass)
        self.source_butler.registry.registerDatasetType(datasetType)

        metrics = makeExampleMetrics()
        with ResourcePath.temporary_uri(suffix=".json") as temp:
            dataId = DataCoordinate.make_empty(self.source_butler.dimensions)
            source_refs = [DatasetRef(datasetType, dataId, run=run)]
            temp.write(json.dumps(metrics.exportAsDict()).encode())
            # "direct" ingest leaves the file at its absolute location.
            dataset = FileDataset(path=temp, refs=source_refs)
            self.source_butler.ingest(dataset, transfer="direct")

            self.target_butler.transfer_from(
                self.source_butler, dataset.refs, register_dataset_types=True, transfer=transfer
            )

            uri = self.target_butler.getURI(dataset.refs[0])
            # "auto" and "unsafe_direct" retain the original absolute URI;
            # other modes result in a different URI in the target.
            if transfer == "auto" or transfer == "unsafe_direct":
                self.assertEqual(uri, temp)
            else:
                self.assertNotEqual(uri, temp)

    def test_shared_dimension_group(self) -> None:
        """Test internal logic that divides dataset types by dimension group
        when doing registry updates.
        """
        self.create_butlers()
        self.source_butler.import_(filename=_get_test_data_path("base.yaml"), without_datastore=True)
        self.source_butler.import_(filename=_get_test_data_path("datasets.yaml"), without_datastore=True)

        source_butler = self.source_butler
        target_butler = self.target_butler

        # Create a dataset type with the same dimensions as the 'bias' dataset
        # type from base.yaml
        dataset_type = DatasetType(
            "test_type", ["instrument", "detector"], "int", universe=source_butler.dimensions
        )
        source_butler.registry.registerDatasetType(dataset_type)
        # This has the same data ID as one of the bias datasets in
        # datasets.yaml.
        test_ref = source_butler.registry.insertDatasets(
            "test_type", [{"instrument": "Cam1", "detector": 2}], run="imported_g"
        )[0]

        biases = source_butler.query_datasets("bias", ["imported_g", "imported_r"])
        flats = source_butler.query_datasets("flat", ["imported_g", "imported_r"])
        refs = [test_ref, *biases, *flats]

        # Test setup will be even more convoluted if we want the datastore to
        # actually transfer files. For testing the dimension group behavior,
        # we really only care about the registry.
        with unittest.mock.patch.object(target_butler._datastore, "transfer_from") as mock:
            mock.return_value = (set(refs), set())
            target_butler.transfer_from(
                source_butler,
                refs,
                transfer=None,
                register_dataset_types=True,
                skip_missing=False,
                transfer_dimensions=True,
            )

        # Transferred datasets must retain their original dataset IDs.
        transferred_test_ref = target_butler.find_dataset(
            "test_type", {"instrument": "Cam1", "detector": 2}, collections="imported_g"
        )
        self.assertEqual(transferred_test_ref.id, test_ref.id)

        transferred_bias = target_butler.find_dataset(
            "bias", {"instrument": "Cam1", "detector": 2}, collections="imported_g"
        )
        self.assertEqual(transferred_bias.id, uuid.UUID("51352db4-a47a-447c-b12d-a50b206b17cd"))

        transferred_flat = target_butler.find_dataset(
            "flat",
            {"instrument": "Cam1", "detector": 2, "physical_filter": "Cam1-R1", "band": "r"},
            collections="imported_r",
        )
        self.assertEqual(transferred_flat.id, uuid.UUID("c1296796-56c5-4acf-9b49-40d920c6f840"))

3377 

3378 

class ChainedDatastoreTransfers(PosixDatastoreTransfers):
    """Run the full PosixDatastoreTransfers suite with butlers configured
    to use a chained datastore.
    """

    # Point the shared fixtures at the chained-datastore configuration.
    configFile = os.path.join(TESTDIR, "config/basic/butler-chained.yaml")

3383 

3384 

@unittest.skipIf(not butler_server_is_available, butler_server_import_error)
class ButlerServerDatastoreTransfers(DatastoreTransfers, unittest.TestCase):
    """Test ``transfer_from`` involving Butler server."""

    # Configuration used for the direct (target) butler.
    configFile = os.path.join(TESTDIR, "config/basic/butler.yaml")

    def test_transfers_from_remote_to_direct(self) -> None:
        """Transfer datasets from a server-backed (hybrid) butler into a
        direct butler, rewriting the server's unresolvable transfer URLs
        along the way.
        """
        # Imported here rather than at module scope; the remote-butler
        # dependencies are only guaranteed when the class-level skipIf
        # guard passes.
        from lsst.daf.butler.remote_butler._remote_file_transfer_source import (
            mock_file_transfer_uris_for_unit_test,
        )

        self.target_butler = self.create_butler(None, "2")
        with create_test_server(TESTDIR) as server:
            self.source_butler = server.hybrid_butler

            def _remap_transfer_url(path: HttpResourcePath) -> HttpResourcePath:
                # The Butler server returns HTTP URIs with a domain name that
                # is not resolvable because there is no actual HTTP server
                # involved in these tests. Strip this first layer of
                # indirection, and return the target of the redirect instead.
                response = server.client.get(str(path), follow_redirects=False, headers=path._extra_headers)
                return ResourcePath(str(response.next_request.url))

            with mock_file_transfer_uris_for_unit_test(_remap_transfer_url):
                self.assertButlerTransfers()

3410 

3411 

class TransferDatasetsInPlace(unittest.TestCase):
    """Test behavior of transfer_datasets_in_place() specialty function used by
    Prompt Publication service.
    """

    def test_file_datastore(self) -> None:
        """Run the in-place transfer checks with file datastores."""
        configFile = os.path.join(TESTDIR, "config/basic/butler.yaml")
        with (
            tempfile.TemporaryDirectory() as datastore_root,
            tempfile.TemporaryDirectory() as other_repo_root,
        ):
            config = Config(configFile)
            config["datastore", "datastore", "name"] = "file_datastore"
            Butler.makeRepo(datastore_root, config=config)
            # Point the second repository's datastore at the first
            # repository's root so both repositories share files on disk.
            config["datastore", "datastore", "root"] = datastore_root
            Butler.makeRepo(other_repo_root, config, forceConfigRoot=False)
            with (
                Butler(datastore_root, writeable=True) as source_butler,
                Butler(other_repo_root, writeable=True) as target_butler,
            ):
                self._test_transfer_datasets_in_place(source_butler, target_butler)

    def test_chained_datastore(self) -> None:
        """Run the in-place transfer checks with chained datastores."""
        configFile = os.path.join(TESTDIR, "config/basic/butler-chained-posix.yaml")
        with (
            tempfile.TemporaryDirectory() as datastore_root,
            tempfile.TemporaryDirectory() as other_repo_root,
        ):
            config = Config(configFile)
            # Pin both child datastores to fixed roots so the two
            # repositories use the same storage locations.
            config["datastore", "datastore", "datastores", 0, "datastore", "root"] = (
                f"{datastore_root}/butler_test_repository"
            )
            config["datastore", "datastore", "datastores", 1, "datastore", "root"] = (
                f"{datastore_root}/butler_test_repository2"
            )
            Butler.makeRepo(datastore_root, config=config, forceConfigRoot=False)
            Butler.makeRepo(other_repo_root, config=config, forceConfigRoot=False)
            with (
                Butler(datastore_root, writeable=True) as source_butler,
                Butler(other_repo_root, writeable=True) as target_butler,
            ):
                self._test_transfer_datasets_in_place(source_butler, target_butler)

    def _test_transfer_datasets_in_place(
        self, source_butler: DirectButler, target_butler: DirectButler
    ) -> None:
        """Exercise transfer_datasets_in_place() between two butlers whose
        datastores share storage locations.
        """
        metric_repo = MetricTestRepo.create_from_butler(
            source_butler,
            source_butler._config,
        )
        target_butler.transfer_dimension_records_from(source_butler, [metric_repo.ref1, metric_repo.ref2])
        # Verify that the setup was correct and the two repos have
        # independent registries.
        self.assertIsNone(target_butler.get_dataset(metric_repo.ref1.id))
        # Copy one dataset, and make sure we can load it from the
        # target repo.
        self.assertEqual(
            transfer_datasets_in_place(source_butler, target_butler, [metric_repo.ref1]),
            [metric_repo.ref1],
        )
        self.assertEqual(target_butler.get(metric_repo.ref1), source_butler.get(metric_repo.ref1))
        self.assertIsNone(target_butler.get_dataset(metric_repo.ref2.id))
        # An in-place transfer must leave the file URIs identical.
        self.assertEqual(source_butler.getURIs(metric_repo.ref1), target_butler.getURIs(metric_repo.ref1))
        # Trying to copy the same dataset again is a no-op.
        self.assertEqual(
            transfer_datasets_in_place(source_butler, target_butler, [metric_repo.ref1]),
            [],
        )
        self.assertEqual(target_butler.get(metric_repo.ref1), source_butler.get(metric_repo.ref1))
        # A mix of existing and non-existing datasets.
        self.assertEqual(
            transfer_datasets_in_place(source_butler, target_butler, [metric_repo.ref1, metric_repo.ref2]),
            [metric_repo.ref2],
        )
        self.assertEqual(target_butler.get(metric_repo.ref1), source_butler.get(metric_repo.ref1))
        self.assertEqual(target_butler.get(metric_repo.ref2), source_butler.get(metric_repo.ref2))

        # For testing datastore chaining, set up a dataset that is only
        # accepted by one of the datastores.
        source_butler.registry.registerDatasetType(
            DatasetType("rejected_by_first", source_butler.dimensions.conform([]), "int")
        )
        source_butler.registry.registerRun("run")
        ref = source_butler.put(1, "rejected_by_first", dataId={}, run="run")
        self.assertEqual(
            transfer_datasets_in_place(source_butler, target_butler, [ref]),
            [ref],
        )
        self.assertEqual(1, target_butler.get(ref))

3501 

3502 

class NullDatastoreTestCase(unittest.TestCase):
    """Verify that a butler falls back to a null datastore when asked to run
    without one, and fails cleanly when the configured datastore class is
    unloadable.
    """

    # A valid configuration is still required to create the repository.
    configFile = os.path.join(TESTDIR, "config/basic/butler.yaml")
    storageClassFactory: StorageClassFactory

    @classmethod
    def setUpClass(cls) -> None:
        factory = StorageClassFactory()
        factory.addFromConfig(cls.configFile)
        cls.storageClassFactory = factory

    def setUp(self) -> None:
        """Create a new butler root for each test."""
        self.root = makeTestTempDir(TESTDIR)
        Butler.makeRepo(self.root, config=Config(self.configFile))

    def tearDown(self) -> None:
        removeTestTempDir(self.root)

    def test_fallback(self) -> None:
        # Corrupt the datastore section of the on-disk butler config so
        # that the datastore class can no longer be imported.
        butler_yaml = os.path.join(self.root, "butler.yaml")
        broken = Config(butler_yaml)
        broken["datastore", "cls"] = "lsst.not.a.datastore.Datastore"
        broken.dumpToUri(butler_yaml)

        # With a datastore requested, construction must fail through
        # either entry point.
        for construct in (Butler, Butler.from_config):
            with self.assertRaises(RuntimeError):
                construct(self.root, without_datastore=False)

        butler = Butler.from_config(self.root, writeable=True, without_datastore=True)
        self.enterContext(butler)
        self.assertIsInstance(butler._datastore, NullDatastore)

        # Registry operations should still work without a datastore.
        butler.collections.register("MYRUN")
        found = butler.collections.query("*")
        self.assertIn("MYRUN", set(found))

        # Build a ref for a dimensionless dataset type.
        empty_dims = butler.dimensions.conform([])
        sc = self.storageClassFactory.getStorageClass("StructuredDataDict")
        dataset_type = DatasetType("metric", empty_dims, sc)
        butler.registry.registerDatasetType(dataset_type)
        ref = DatasetRef(dataset_type, {}, run="MYRUN")

        # Any attempt to touch the datastore must raise.
        for accessor in (butler.get, butler.getURI):
            with self.assertRaises(FileNotFoundError):
                accessor(ref)

3558 

3559 

@unittest.skipIf(not butler_server_is_available, butler_server_import_error)
class ButlerServerTests(FileDatastoreButlerTests):
    """Exercise RemoteButler and the Butler server through the shared
    FileDatastoreButlerTests suite.
    """

    configFile = None
    predictionSupported = False
    trustModeSupported = False

    postgres: TemporaryPostgresInstance | None

    def setUp(self):
        self.server_instance = self.enterContext(create_test_server(TESTDIR))

    def tearDown(self):
        # Cleanup happens through the context registered in setUp().
        pass

    def are_uris_equivalent(self, uri1: ResourcePath, uri2: ResourcePath) -> bool:
        # Pre-signed S3 URLs can differ only in their expiration query
        # parameters, so equivalence deliberately ignores the query string.
        return all(
            getattr(uri1, part) == getattr(uri2, part) for part in ("scheme", "netloc", "path")
        )

    def create_empty_butler(
        self,
        run: str | None = None,
        writeable: bool | None = None,
        metrics: ButlerMetrics | None = None,
        cleanup: bool = True,
    ) -> Butler:
        # Every test butler is a clone of the server's hybrid butler.
        return self.server_instance.hybrid_butler.clone(run=run, metrics=metrics)

    def remove_dataset_out_of_band(self, butler: Butler, ref: DatasetRef) -> None:
        # S3 signed URLs cannot delete files, so reach in through the
        # DirectButler to locate and remove the artifact.
        artifact = self.server_instance.direct_butler.getURI(ref)
        artifact.remove()

    def testConstructor(self):
        # RemoteButler construction is covered by test_server.py and
        # test_remote_butler.py.
        pass

    def testDafButlerRepositories(self):
        # Loading RemoteButler via the repository index is covered by
        # test_server.py.
        pass

    def testGetDatasetTypes(self) -> None:
        # Primarily exercises validateConfiguration, a Datastore
        # configuration check that does not apply to RemoteButler.
        pass

    def testMakeRepo(self) -> None:
        # Repository creation only applies to DirectButler.
        pass

    # Pickling is not yet implemented for RemoteButler/HybridButler.
    @unittest.expectedFailure
    def testPickle(self) -> None:
        return super().testPickle()

    def testStringification(self) -> None:
        self.assertEqual(
            str(self.server_instance.remote_butler),
            "RemoteButler(https://test.example/api/butler/repo/testrepo/)",
        )

    def testTransaction(self) -> None:
        # Transactions will never be supported for RemoteButler.
        pass

    def testPutTemplates(self) -> None:
        # The server fixture is configured with different file naming
        # templates than this test expects, so it cannot run here.
        pass

3635 

3636 

@unittest.skipIf(not butler_server_is_available, butler_server_import_error)
class ButlerServerSqliteTests(ButlerServerTests, unittest.TestCase):
    """Run the RemoteButler registry-shim tests against a server backed by
    a SQLite database.
    """

    # SQLite needs no external database fixture.
    postgres = None

3644 

3645 

@unittest.skipIf(not butler_server_is_available, butler_server_import_error)
class ButlerServerPostgresTests(ButlerServerTests, unittest.TestCase):
    """Run the RemoteButler registry-shim tests against a server backed by
    a Postgres database.
    """

    @classmethod
    def setUpClass(cls):
        # Stand up the temporary Postgres instance before the rest of the
        # class-level setup; teardown is handled by the class context stack.
        database = setup_postgres_test_db()
        cls.postgres = cls.enterClassContext(database)
        super().setUpClass()

3656 

3657 

def setup_module(module: types.ModuleType) -> None:
    """Set up the module for pytest.

    Parameters
    ----------
    module : `types.ModuleType`
        The test module being initialized. Unused directly; required by
        the pytest ``setup_module`` hook signature.
    """
    # Reset the environment via the shared test helper before any test
    # in this module runs.
    clean_environment()

3661 

3662 

def _get_test_data_path(filename: str) -> ResourcePath:
    """Return the URI of a file in the packaged registry test data.

    Parameters
    ----------
    filename : `str`
        Base name of the test data file, e.g. ``"base.yaml"``.

    Returns
    -------
    path : `ResourcePath`
        ``resource://`` URI pointing at the packaged data file.
    """
    # Bug fix: the f-string previously contained no placeholder, so the
    # ``filename`` argument was ignored and every caller received the
    # same (nonexistent) path.
    return ResourcePath(f"resource://lsst.daf.butler/tests/registry_data/{filename}")

3665 

3666 

if __name__ == "__main__":
    # Mirror the pytest setup_module() hook when run directly under
    # unittest: reset the environment before starting the test runner.
    clean_environment()
    unittest.main()