Coverage for tests / test_butler.py: 13%

1898 statements  

« prev     ^ index     » next       coverage.py v7.13.5, created at 2026-04-14 23:37 +0000

1# This file is part of daf_butler. 

2# 

3# Developed for the LSST Data Management System. 

4# This product includes software developed by the LSST Project 

5# (http://www.lsst.org). 

6# See the COPYRIGHT file at the top-level directory of this distribution 

7# for details of code ownership. 

8# 

9# This software is dual licensed under the GNU General Public License and also 

10# under a 3-clause BSD license. Recipients may choose which of these licenses 

11# to use; please see the files gpl-3.0.txt and/or bsd_license.txt, 

12# respectively. If you choose the GPL option then the following text applies 

13# (but note that there is still no warranty even if you opt for BSD instead): 

14# 

15# This program is free software: you can redistribute it and/or modify 

16# it under the terms of the GNU General Public License as published by 

17# the Free Software Foundation, either version 3 of the License, or 

18# (at your option) any later version. 

19# 

20# This program is distributed in the hope that it will be useful, 

21# but WITHOUT ANY WARRANTY; without even the implied warranty of 

22# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the 

23# GNU General Public License for more details. 

24# 

25# You should have received a copy of the GNU General Public License 

26# along with this program. If not, see <http://www.gnu.org/licenses/>. 

27 

28"""Tests for Butler.""" 

29 

30from __future__ import annotations 

31 

32import json 

33import logging 

34import os 

35import pathlib 

36import pickle 

37import posixpath 

38import random 

39import re 

40import shutil 

41import string 

42import tempfile 

43import unittest 

44import unittest.mock 

45import uuid 

46import warnings 

47import weakref 

48from collections.abc import Callable, Mapping 

49from typing import TYPE_CHECKING, Any, cast 

50 

# Optional AWS/S3 test dependencies. When boto3/moto are unavailable,
# define harmless fallbacks so this module still imports; presumably the
# S3 tests are skipped elsewhere when ``boto3 is None`` — confirm against
# the skip decorators later in this file.
try:
    import boto3
    import botocore

    from lsst.resources.s3utils import clean_test_environment_for_s3

    try:
        # moto v5 renamed the combined mock entry point to ``mock_aws``.
        from moto import mock_aws  # v5
    except ImportError:
        # moto v4 fallback: the per-service S3 mock under the new name.
        from moto import mock_s3 as mock_aws
except ImportError:
    # Sentinel indicating that the AWS stack could not be imported.
    boto3 = None

    def mock_aws(*args: Any, **kwargs: Any) -> Any:  # type: ignore[no-untyped-def]
        """No-op decorator in case moto mock_aws can not be imported."""
        return None

67 

68 

69import astropy.time 

70from sqlalchemy.exc import IntegrityError 

71 

72from lsst.daf.butler import ( 

73 Butler, 

74 ButlerConfig, 

75 ButlerMetrics, 

76 ButlerRepoIndex, 

77 CollectionCycleError, 

78 CollectionType, 

79 Config, 

80 DataCoordinate, 

81 DatasetExistence, 

82 DatasetNotFoundError, 

83 DatasetProvenance, 

84 DatasetRef, 

85 DatasetType, 

86 DimensionRecord, 

87 FileDataset, 

88 NoDefaultCollectionError, 

89 StorageClassFactory, 

90 ValidationError, 

91 script, 

92) 

93from lsst.daf.butler._rubin.file_datasets import transfer_datasets_to_datastore 

94from lsst.daf.butler._rubin.temporary_for_ingest import TemporaryForIngest 

95from lsst.daf.butler._rubin.transfer_datasets_in_place import transfer_datasets_in_place 

96from lsst.daf.butler.datastore import NullDatastore 

97from lsst.daf.butler.datastore.file_templates import FileTemplate, FileTemplateValidationError 

98from lsst.daf.butler.datastores.file_datastore.retrieve_artifacts import ZipIndex 

99from lsst.daf.butler.datastores.fileDatastore import FileDatastore 

100from lsst.daf.butler.direct_butler import DirectButler 

101from lsst.daf.butler.registry import ( 

102 CollectionError, 

103 CollectionTypeError, 

104 ConflictingDefinitionError, 

105 DataIdValueError, 

106 DatasetTypeExpressionError, 

107 MissingCollectionError, 

108 OrphanedRecordError, 

109) 

110from lsst.daf.butler.registry.sql_registry import SqlRegistry 

111from lsst.daf.butler.repo_relocation import BUTLER_ROOT_TAG 

112from lsst.daf.butler.tests import MetricsExample, MetricsExampleModel, MultiDetectorFormatter 

113from lsst.daf.butler.tests.postgresql import TemporaryPostgresInstance, setup_postgres_test_db 

114from lsst.daf.butler.tests.server_available import butler_server_import_error, butler_server_is_available 

115from lsst.daf.butler.tests.utils import ( 

116 MetricTestRepo, 

117 TestCaseMixin, 

118 create_populated_sqlite_registry, 

119 makeTestTempDir, 

120 removeTestTempDir, 

121 safeTestTempDir, 

122) 

123from lsst.resources import ResourcePath 

124from lsst.resources.http import HttpResourcePath 

125from lsst.utils import doImportType 

126from lsst.utils.introspection import get_full_type_name 

127 

128if butler_server_is_available: 

129 from lsst.daf.butler.tests.server import create_test_server 

130 

131 

132if TYPE_CHECKING: 

133 import types 

134 

135 from lsst.daf.butler import DimensionGroup, Registry, StorageClass 

136 

# Absolute path of the directory containing this test module; used to
# locate the test configuration files shipped alongside the tests.
TESTDIR = os.path.dirname(os.path.abspath(__file__))

138 

139 

def clean_environment() -> None:
    """Remove external environment variables that affect the tests."""
    _EXTERNAL_VARS = ("DAF_BUTLER_REPOSITORY_INDEX",)
    for name in _EXTERNAL_VARS:
        # pop() with a default is a no-op when the variable is unset.
        os.environ.pop(name, None)

144 

145 

def makeExampleMetrics() -> MetricsExample:
    """Return example dataset suitable for tests."""
    summary = {"AM1": 5.2, "AM2": 30.6}
    output = {"a": [1, 2, 3], "b": {"blue": 5, "red": "green"}}
    data = [563, 234, 456.7, 752, 8, 9, 27]
    return MetricsExample(summary, output, data)

153 

154 

class TransactionTestError(Exception):
    """Dedicated exception for testing transactions.

    Using a distinct class prevents the misdiagnosing that might otherwise
    occur when a standard exception is used.
    """

161 

162 

class ButlerConfigTests(unittest.TestCase):
    """Simple tests for ButlerConfig that are not tested in any other test
    cases.
    """

    def testSearchPath(self) -> None:
        """Verify that extra search paths override base configuration values
        and that use of the override directory shows up in debug logging.
        """
        base_config = os.path.join(TESTDIR, "config", "basic", "butler.yaml")

        # Without search paths the override directory must not be consulted.
        with self.assertLogs("lsst.daf.butler", level="DEBUG") as log_cm:
            plain = ButlerConfig(base_config)
        self.assertNotIn("testConfigs", "\n".join(log_cm.output))

        # With a search path the override directory should be mentioned.
        override_dir = os.path.join(TESTDIR, "config", "testConfigs")
        with self.assertLogs("lsst.daf.butler", level="DEBUG") as log_cm:
            overridden = ButlerConfig(base_config, searchPaths=[override_dir])
        self.assertIn("testConfigs", "\n".join(log_cm.output))

        # The override must actually change the configured value.
        key = ("datastore", "records", "table")
        self.assertNotEqual(plain[key], overridden[key])
        self.assertEqual(overridden[key], "override_record")

182 

183 

class ButlerPutGetTests(TestCaseMixin):
    """Helper methods for running a suite of put/get tests from different
    butler configurations.
    """

    # Root directory of the test repository; removed in tearDown().
    root: str
    # Default RUN collection name; deliberately includes non-ASCII characters.
    default_run = "ingésτ😺"
    # Shared factory configured in setUpClass() from ``configFile``.
    storageClassFactory: StorageClassFactory
    # Butler config used to seed the factory; may be None (subclass-provided).
    configFile: str | None
    # Path to the butler config of the temporary test repository.
    tmpConfigFile: str

    @staticmethod
    def addDatasetType(
        datasetTypeName: str, dimensions: DimensionGroup, storageClass: StorageClass | str, registry: Registry
    ) -> DatasetType:
        """Create a DatasetType and register it"""
        datasetType = DatasetType(datasetTypeName, dimensions, storageClass)
        registry.registerDatasetType(datasetType)
        return datasetType

    @classmethod
    def setUpClass(cls) -> None:
        """Build the shared storage class factory for all tests."""
        cls.storageClassFactory = StorageClassFactory()
        if cls.configFile is not None:
            cls.storageClassFactory.addFromConfig(cls.configFile)

    def assertGetComponents(
        self,
        butler: Butler,
        datasetRef: DatasetRef,
        components: tuple[str, ...],
        reference: Any,
        collections: Any = None,
    ) -> None:
        """Assert that each named component retrieved via ``butler.get`` and
        via a deferred handle matches the corresponding attribute of
        ``reference``.
        """
        datasetType = datasetRef.datasetType
        dataId = datasetRef.dataId
        deferred = butler.getDeferred(datasetRef)

        for component in components:
            compTypeName = datasetType.componentTypeName(component)
            result = butler.get(compTypeName, dataId, collections=collections)
            self.assertEqual(result, getattr(reference, component))
            # The deferred handle must agree with the direct get.
            result_deferred = deferred.get(component=component)
            self.assertEqual(result_deferred, result)

    def tearDown(self) -> None:
        """Remove the temporary repository root created by the test."""
        if self.root is not None:
            removeTestTempDir(self.root)

    def create_empty_butler(
        self,
        run: str | None = None,
        writeable: bool | None = None,
        metrics: ButlerMetrics | None = None,
        cleanup: bool = True,
    ) -> Butler:
        """Create a Butler for the test repository, without inserting test
        data.
        """
        butler = Butler.from_config(self.tmpConfigFile, run=run, writeable=writeable, metrics=metrics)
        if cleanup:
            # Register the butler for automatic close at test end.
            self.enterContext(butler)
        assert isinstance(butler, DirectButler), "Expect DirectButler in configuration"
        return butler

    def create_butler(
        self,
        run: str,
        storageClass: StorageClass | str,
        datasetTypeName: str,
        metrics: ButlerMetrics | None = None,
    ) -> tuple[Butler, DatasetType]:
        """Create a Butler for the test repository and insert some test data
        into it.

        Registers ``datasetTypeName`` over (instrument, visit) dimensions and
        inserts dimension records for instrument DummyCamComp with visits
        423, 424 and 425.
        """
        butler = self.create_empty_butler(run=run, metrics=metrics)

        # A fresh repository should contain only the requested run.
        collections = set(butler.collections.query("*"))
        self.assertEqual(collections, {run})
        # Create and register a DatasetType
        dimensions = butler.dimensions.conform(["instrument", "visit"])

        datasetType = self.addDatasetType(datasetTypeName, dimensions, storageClass, butler.registry)

        # Add needed Dimensions
        butler.registry.insertDimensionData("instrument", {"name": "DummyCamComp"})
        butler.registry.insertDimensionData(
            "physical_filter", {"instrument": "DummyCamComp", "name": "d-r", "band": "R"}
        )
        butler.registry.insertDimensionData(
            "visit_system", {"instrument": "DummyCamComp", "id": 1, "name": "default"}
        )
        butler.registry.insertDimensionData("day_obs", {"instrument": "DummyCamComp", "id": 20200101})
        visit_start = astropy.time.Time("2020-01-01 08:00:00.123456789", scale="tai")
        visit_end = astropy.time.Time("2020-01-01 08:00:36.66", scale="tai")
        butler.registry.insertDimensionData(
            "visit",
            {
                "instrument": "DummyCamComp",
                "id": 423,
                "name": "fourtwentythree",
                "physical_filter": "d-r",
                "datetime_begin": visit_start,
                "datetime_end": visit_end,
                "day_obs": 20200101,
            },
        )

        # Add more visits for some later tests
        for visit_id in (424, 425):
            butler.registry.insertDimensionData(
                "visit",
                {
                    "instrument": "DummyCamComp",
                    "id": visit_id,
                    "name": f"fourtwentyfour_{visit_id}",
                    "physical_filter": "d-r",
                    "day_obs": 20200101,
                },
            )
        return butler, datasetType

    def runPutGetTest(self, storageClass: StorageClass, datasetTypeName: str) -> Butler:
        """Exercise put/get/exists/prune round trips for ``datasetTypeName``
        with the given storage class, returning the butler with one dataset
        left in place for downstream tests.
        """
        # New datasets will be added to run and tag, but we will only look in
        # tag when looking up datasets.
        run = self.default_run
        butler, datasetType = self.create_butler(run, storageClass, datasetTypeName)
        assert butler.run is not None

        # Create and store a dataset
        metric = makeExampleMetrics()
        dataId = butler.registry.expandDataId({"instrument": "DummyCamComp", "visit": 423})

        # Dataset should not exist if we haven't added it
        with self.assertRaises(DatasetNotFoundError):
            butler.get(datasetTypeName, dataId)

        # Put and remove the dataset once as a DatasetRef, once as a dataId,
        # and once with a DatasetType

        # Keep track of any collections we add and do not clean up
        expected_collections = {run}

        counter = 0
        ref = DatasetRef(datasetType, dataId, id=uuid.UUID(int=1), run="put_run_1")
        args: tuple[DatasetRef] | tuple[str | DatasetType, DataCoordinate]
        for args in ((ref,), (datasetTypeName, dataId), (datasetType, dataId)):
            # Since we are using subTest we can get cascading failures
            # here with the first attempt failing and the others failing
            # immediately because the dataset already exists. Work around
            # this by using a distinct run collection each time
            counter += 1
            this_run = f"put_run_{counter}"
            butler.collections.register(this_run)
            expected_collections.update({this_run})

            with self.subTest(args=repr(args)):
                kwargs: dict[str, Any] = {}
                if not isinstance(args[0], DatasetRef):  # type: ignore
                    # Only the unresolved forms need an explicit run.
                    kwargs["run"] = this_run
                ref = butler.put(metric, *args, **kwargs)
                self.assertIsInstance(ref, DatasetRef)

                # Test get of a ref.
                metricOut = butler.get(ref)
                self.assertEqual(metric, metricOut)
                # Test get
                metricOut = butler.get(ref.datasetType.name, dataId, collections=this_run)
                self.assertEqual(metric, metricOut)
                # Test get with a datasetRef
                metricOut = butler.get(ref)
                self.assertEqual(metric, metricOut)
                # Test getDeferred with dataId
                metricOut = butler.getDeferred(ref.datasetType.name, dataId, collections=this_run).get()
                self.assertEqual(metric, metricOut)
                # Test getDeferred with a ref
                metricOut = butler.getDeferred(ref).get()
                self.assertEqual(metric, metricOut)

                # Check we can get components
                if storageClass.isComposite():
                    self.assertGetComponents(
                        butler, ref, ("summary", "data", "output"), metric, collections=this_run
                    )

                primary_uri, secondary_uris = butler.getURIs(ref)
                n_uris = len(secondary_uris)
                if primary_uri:
                    n_uris += 1

                # Can the artifacts themselves be retrieved?
                if not butler._datastore.isEphemeral:
                    # Create a temporary directory to hold the retrieved
                    # artifacts.
                    with tempfile.TemporaryDirectory(
                        prefix="butler-artifacts-", ignore_cleanup_errors=True
                    ) as artifact_root:
                        root_uri = ResourcePath(artifact_root, forceDirectory=True)

                        for preserve_path in (True, False):
                            destination = root_uri.join(f"{preserve_path}_{counter}/")
                            log = logging.getLogger("lsst.x")
                            log.debug("Using destination %s for args %s", destination, args)
                            # Use copy so that we can test that overwrite
                            # protection works (using "auto" for File URIs
                            # would use hard links and subsequent transfer
                            # would work because it knows they are the same
                            # file).
                            transferred = butler.retrieveArtifacts(
                                [ref], destination, preserve_path=preserve_path, transfer="copy"
                            )
                            self.assertGreater(len(transferred), 0)
                            artifacts = list(ResourcePath.findFileResources([destination]))
                            # Filter out the index file.
                            artifacts = [a for a in artifacts if a.basename() != ZipIndex.index_name]
                            self.assertEqual(set(transferred), set(artifacts))

                            for artifact in transferred:
                                path_in_destination = artifact.relative_to(destination)
                                self.assertIsNotNone(path_in_destination)
                                assert path_in_destination is not None

                                # When path is not preserved there should not
                                # be any path separators.
                                num_seps = path_in_destination.count("/")
                                if preserve_path:
                                    self.assertGreater(num_seps, 0)
                                else:
                                    self.assertEqual(num_seps, 0)

                            self.assertEqual(
                                len(artifacts),
                                n_uris,
                                "Comparing expected artifacts vs actual:"
                                f" {artifacts} vs {primary_uri} and {secondary_uris}",
                            )

                            if preserve_path:
                                # No need to run these twice
                                with self.assertRaises(ValueError):
                                    butler.retrieveArtifacts([ref], destination, transfer="move")

                                with self.assertRaisesRegex(
                                    ValueError, "^Destination location must refer to a directory"
                                ):
                                    butler.retrieveArtifacts(
                                        [ref], ResourcePath("/some/file.txt", forceDirectory=False)
                                    )

                            with self.assertRaises(FileExistsError):
                                butler.retrieveArtifacts([ref], destination)

                            transferred_again = butler.retrieveArtifacts(
                                [ref], destination, preserve_path=preserve_path, overwrite=True
                            )
                            self.assertEqual(set(transferred_again), set(transferred))

                # Now remove the dataset completely.
                butler.pruneDatasets([ref], purge=True, unstore=True)
                # Lookup with original args should still fail.
                kwargs = {"collections": this_run}
                if isinstance(args[0], DatasetRef):
                    kwargs = {}  # Prevent warning from being issued.
                self.assertFalse(butler.exists(*args, **kwargs))
                # get() should still fail.
                with self.assertRaises((FileNotFoundError, DatasetNotFoundError)):
                    butler.get(ref)
                # Registry shouldn't be able to find it by dataset_id anymore.
                self.assertIsNone(butler.get_dataset(ref.id))

                # Do explicit registry removal since we know they are
                # empty
                butler.collections.x_remove(this_run)
                expected_collections.remove(this_run)

        # Create DatasetRef for put using default run.
        refIn = DatasetRef(datasetType, dataId, id=uuid.UUID(int=1), run=butler.run)

        # Check that getDeferred fails with standalone ref.
        with self.assertRaises(LookupError):
            butler.getDeferred(refIn)

        # Put the dataset again, since the last thing we did was remove it
        # and we want to use the default collection.
        ref = butler.put(metric, refIn)

        # Get with parameters
        stop = 4
        sliced = butler.get(ref, parameters={"slice": slice(stop)})
        self.assertNotEqual(metric, sliced)
        self.assertEqual(metric.summary, sliced.summary)
        self.assertEqual(metric.output, sliced.output)
        assert metric.data is not None  # for mypy
        self.assertEqual(metric.data[:stop], sliced.data)
        # getDeferred with parameters
        sliced = butler.getDeferred(ref, parameters={"slice": slice(stop)}).get()
        self.assertNotEqual(metric, sliced)
        self.assertEqual(metric.summary, sliced.summary)
        self.assertEqual(metric.output, sliced.output)
        self.assertEqual(metric.data[:stop], sliced.data)
        # getDeferred with deferred parameters
        sliced = butler.getDeferred(ref).get(parameters={"slice": slice(stop)})
        self.assertNotEqual(metric, sliced)
        self.assertEqual(metric.summary, sliced.summary)
        self.assertEqual(metric.output, sliced.output)
        self.assertEqual(metric.data[:stop], sliced.data)

        if storageClass.isComposite():
            # Check that components can be retrieved
            metricOut = butler.get(ref.datasetType.name, dataId)
            compNameS = ref.datasetType.componentTypeName("summary")
            compNameD = ref.datasetType.componentTypeName("data")
            summary = butler.get(compNameS, dataId)
            self.assertEqual(summary, metric.summary)
            data = butler.get(compNameD, dataId)
            self.assertEqual(data, metric.data)

            if "counter" in storageClass.derivedComponents:
                count = butler.get(ref.datasetType.componentTypeName("counter"), dataId)
                self.assertEqual(count, len(data))

                count = butler.get(
                    ref.datasetType.componentTypeName("counter"), dataId, parameters={"slice": slice(stop)}
                )
                self.assertEqual(count, stop)

            compRef = butler.find_dataset(compNameS, dataId, collections=butler.collections.defaults)
            assert compRef is not None
            summary = butler.get(compRef)
            self.assertEqual(summary, metric.summary)

        # Create a Dataset type that has the same name but is inconsistent.
        inconsistentDatasetType = DatasetType(
            datasetTypeName, datasetType.dimensions, self.storageClassFactory.getStorageClass("Config")
        )

        # Getting with a dataset type that does not match registry fails
        with self.assertRaisesRegex(
            ValueError,
            "(Supplied dataset type .* inconsistent with registry)"
            "|(The new storage class .* is not compatible with the existing storage class)",
        ):
            butler.get(inconsistentDatasetType, dataId)

        # Combining a DatasetRef with a dataId should fail
        with self.assertRaisesRegex(ValueError, "DatasetRef given, cannot use dataId as well"):
            butler.get(ref, dataId)
        # Getting with an explicit ref should fail if the id doesn't match.
        with self.assertRaises((FileNotFoundError, DatasetNotFoundError)):
            butler.get(DatasetRef(ref.datasetType, ref.dataId, id=uuid.UUID(int=101), run=butler.run))

        # Getting a dataset with unknown parameters should fail
        with self.assertRaisesRegex(KeyError, "Parameter 'unsupported' not understood"):
            butler.get(ref, parameters={"unsupported": True})

        # Check we have a collection
        collections = set(butler.collections.query("*"))
        self.assertEqual(collections, expected_collections)

        # Clean up to check that we can remove something that may have
        # already had a component removed
        butler.pruneDatasets([ref], unstore=True, purge=True)

        # Add the same ref again, so we can check that duplicate put fails.
        ref = butler.put(metric, datasetType, dataId)

        # Repeat put will fail.
        with self.assertRaisesRegex(
            ConflictingDefinitionError, "A database constraint failure was triggered"
        ):
            butler.put(metric, datasetType, dataId)

        # Remove the datastore entry.
        butler.pruneDatasets([ref], unstore=True, purge=False, disassociate=False)

        # Put will still fail
        with self.assertRaisesRegex(
            ConflictingDefinitionError, "A database constraint failure was triggered"
        ):
            butler.put(metric, datasetType, dataId)

        # Repeat the same sequence with resolved ref.
        butler.pruneDatasets([ref], unstore=True, purge=True)
        ref = butler.put(metric, refIn)

        # Repeat put will fail.
        with self.assertRaisesRegex(ConflictingDefinitionError, "Datastore already contains dataset"):
            butler.put(metric, refIn)

        # Remove the datastore entry.
        butler.pruneDatasets([ref], unstore=True, purge=False, disassociate=False)

        # In case of resolved ref this write will succeed.
        ref = butler.put(metric, refIn)

        # Leave the dataset in place since some downstream tests require
        # something to be present

        return butler

    def testDeferredCollectionPassing(self) -> None:
        """Check that collections can be passed per-call instead of being
        fixed at Butler construction time.
        """
        # Construct a butler with no run or collection, but make it writeable.
        butler = self.create_empty_butler(writeable=True)
        # Create and register a DatasetType
        dimensions = butler.dimensions.conform(["instrument", "visit"])
        datasetType = self.addDatasetType(
            "example", dimensions, self.storageClassFactory.getStorageClass("StructuredData"), butler.registry
        )
        # Add needed Dimensions
        butler.registry.insertDimensionData("instrument", {"name": "DummyCamComp"})
        butler.registry.insertDimensionData(
            "physical_filter", {"instrument": "DummyCamComp", "name": "d-r", "band": "R"}
        )
        butler.registry.insertDimensionData("day_obs", {"instrument": "DummyCamComp", "id": 20250101})
        butler.registry.insertDimensionData(
            "visit",
            {
                "instrument": "DummyCamComp",
                "id": 423,
                "name": "fourtwentythree",
                "physical_filter": "d-r",
                "day_obs": 20250101,
            },
        )
        dataId = {"instrument": "DummyCamComp", "visit": 423}
        # Create dataset.
        metric = makeExampleMetrics()
        # Register a new run and put dataset.
        run = "deferred"
        self.assertTrue(butler.collections.register(run))
        # Second time it will be allowed but indicate no-op
        self.assertFalse(butler.collections.register(run))
        ref = butler.put(metric, datasetType, dataId, run=run)
        # Putting with no run should fail with TypeError.
        with self.assertRaises(CollectionError):
            butler.put(metric, datasetType, dataId)
        # Dataset should exist.
        self.assertTrue(butler.exists(datasetType, dataId, collections=[run]))
        # We should be able to get the dataset back, but with and without
        # a deferred dataset handle.
        self.assertEqual(metric, butler.get(datasetType, dataId, collections=[run]))
        self.assertEqual(metric, butler.getDeferred(datasetType, dataId, collections=[run]).get())
        # Trying to find the dataset without any collection is an error.
        with self.assertRaises(NoDefaultCollectionError):
            butler.exists(datasetType, dataId)
        with self.assertRaises(CollectionError):
            butler.get(datasetType, dataId)
        # Associate the dataset with a different collection.
        butler.collections.register("tagged", type=CollectionType.TAGGED)
        butler.registry.associate("tagged", [ref])
        # Deleting the dataset from the new collection should make it findable
        # in the original collection.
        butler.pruneDatasets([ref], tags=["tagged"])
        self.assertTrue(butler.exists(datasetType, dataId, collections=[run]))

638 

639 

640class ButlerTests(ButlerPutGetTests): 

641 """Tests for Butler.""" 

642 

643 useTempRoot = True 

644 validationCanFail: bool 

645 fullConfigKey: str | None 

646 registryStr: str | None 

647 datastoreName: list[str] | None 

648 datastoreStr: list[str] 

649 predictionSupported = True 

650 """Does getURIs support 'prediction mode'?""" 

651 

652 def setUp(self) -> None: 

653 """Create a new butler root for each test.""" 

654 self.root = makeTestTempDir(TESTDIR) 

655 Butler.makeRepo(self.root, config=Config(self.configFile)) 

656 self.tmpConfigFile = os.path.join(self.root, "butler.yaml") 

657 

658 def are_uris_equivalent(self, uri1: ResourcePath, uri2: ResourcePath) -> bool: 

659 """Return True if two URIs refer to the same resource. 

660 

661 Subclasses may override to handle unique requirements. 

662 """ 

663 return uri1 == uri2 

664 

    def testConstructor(self) -> None:
        """Independent test of constructor."""
        butler = Butler.from_config(self.tmpConfigFile, run=self.default_run)
        self.enterContext(butler)
        self.assertIsInstance(butler, Butler)

        # Check that butler.yaml is added automatically.
        if self.tmpConfigFile.endswith(end := "/butler.yaml"):
            config_dir = self.tmpConfigFile[: -len(end)]
            butler = Butler.from_config(config_dir, run=self.default_run)
            self.enterContext(butler)
            self.assertIsInstance(butler, Butler)

            # Even with a ResourcePath.
            butler = Butler.from_config(ResourcePath(config_dir, forceDirectory=True), run=self.default_run)
            self.enterContext(butler)
            self.assertIsInstance(butler, Butler)

        collections = set(butler.collections.query("*"))
        self.assertEqual(collections, {self.default_run})

        # Check that some special characters can be included in run name.
        special_run = "u@b.c-A"
        butler_special = Butler.from_config(butler=butler, run=special_run)
        self.enterContext(butler_special)
        collections = set(butler_special.registry.queryCollections("*@*"))
        self.assertEqual(collections, {special_run})

        # Butler created from an existing butler shares datastore config.
        butler2 = Butler.from_config(butler=butler, collections=["other"])
        self.enterContext(butler2)
        self.assertEqual(butler2.collections.defaults, ("other",))
        self.assertIsNone(butler2.run)
        self.assertEqual(type(butler._datastore), type(butler2._datastore))
        self.assertEqual(butler._datastore.config, butler2._datastore.config)

        # Test that we can use an environment variable to find this
        # repository.
        butler_index = Config()
        butler_index["label"] = self.tmpConfigFile
        for suffix in (".yaml", ".json"):
            # Ensure that the content differs so that we know that
            # we aren't reusing the cache.
            bad_label = f"file://bucket/not_real{suffix}"
            butler_index["bad_label"] = bad_label
            with ResourcePath.temporary_uri(suffix=suffix) as temp_file:
                butler_index.dumpToUri(temp_file)
                with unittest.mock.patch.dict(os.environ, {"DAF_BUTLER_REPOSITORY_INDEX": str(temp_file)}):
                    self.assertEqual(Butler.get_known_repos(), {"label", "bad_label"})
                    uri = Butler.get_repo_uri("bad_label")
                    self.assertEqual(uri, ResourcePath(bad_label))
                    uri = Butler.get_repo_uri("label")
                    butler = Butler.from_config(uri, writeable=False)
                    self.assertIsInstance(butler, Butler)
                    butler.close()
                    butler = Butler.from_config("label", writeable=False)
                    self.assertIsInstance(butler, Butler)
                    butler.close()
                    with self.assertRaisesRegex(FileNotFoundError, "aliases:.*bad_label"):
                        Butler.from_config("not_there", writeable=False)
                    with self.assertRaisesRegex(FileNotFoundError, "resolved from alias 'bad_label'"):
                        Butler.from_config("bad_label")
                    with self.assertRaises(FileNotFoundError):
                        # Should ignore aliases.
                        Butler.from_config(ResourcePath("label", forceAbsolute=False))
                    with self.assertRaises(KeyError) as cm:
                        Butler.get_repo_uri("missing")
                    self.assertEqual(
                        Butler.get_repo_uri("missing", True), ResourcePath("missing", forceAbsolute=False)
                    )
                    self.assertIn("not known to", str(cm.exception))
                    # Should report no failure.
                    self.assertEqual(ButlerRepoIndex.get_failure_reason(), "")
            with ResourcePath.temporary_uri(suffix=suffix) as temp_file:
                # Now with empty configuration.
                butler_index = Config()
                butler_index.dumpToUri(temp_file)
                with unittest.mock.patch.dict(os.environ, {"DAF_BUTLER_REPOSITORY_INDEX": str(temp_file)}):
                    with self.assertRaisesRegex(FileNotFoundError, "(no known aliases)"):
                        Butler.from_config("label")
            with ResourcePath.temporary_uri(suffix=suffix) as temp_file:
                # Now with bad contents (an unterminated quote makes the
                # index unparseable).
                with open(temp_file.ospath, "w") as fh:
                    print("'", file=fh)
                with unittest.mock.patch.dict(os.environ, {"DAF_BUTLER_REPOSITORY_INDEX": str(temp_file)}):
                    with self.assertRaisesRegex(FileNotFoundError, "(no known aliases:.*could not be read)"):
                        Butler.from_config("label")
        # Index environment variable pointing at a nonexistent file.
        with unittest.mock.patch.dict(os.environ, {"DAF_BUTLER_REPOSITORY_INDEX": "file://not_found/x.yaml"}):
            with self.assertRaises(FileNotFoundError):
                Butler.get_repo_uri("label")
            self.assertEqual(Butler.get_known_repos(), set())

            with self.assertRaisesRegex(FileNotFoundError, "index file not found"):
                Butler.from_config("label")

        # Check that we can create Butler when the alias file is not found.
        butler = Butler.from_config(self.tmpConfigFile, writeable=False)
        self.enterContext(butler)
        self.assertIsInstance(butler, Butler)
        with self.assertRaises(RuntimeError) as cm:
            # No environment variable set.
            Butler.get_repo_uri("label")
        self.assertEqual(Butler.get_repo_uri("label", True), ResourcePath("label", forceAbsolute=False))
        self.assertIn("No repository index defined", str(cm.exception))
        with self.assertRaisesRegex(FileNotFoundError, "no known aliases.*No repository index"):
            # No aliases registered.
            Butler.from_config("not_there")
        self.assertEqual(Butler.get_known_repos(), set())

772 

773 def testClose(self): 

774 butler = self.create_empty_butler(cleanup=False) 

775 is_direct_butler = isinstance(butler, DirectButler) 

776 if is_direct_butler: 

777 self.assertFalse(butler._closed) 

778 

779 with butler as butler_from_context_manager: 

780 self.assertIs(butler, butler_from_context_manager) 

781 if is_direct_butler: 

782 self.assertTrue(butler._closed) 

783 with self.assertRaisesRegex(RuntimeError, "has been closed"): 

784 butler.get_dataset_type("raw") 

785 

786 # Close may be called multiple times. 

787 butler.close() 

788 if is_direct_butler: 

789 self.assertTrue(butler._closed) 

790 

    def testGarbageCollection(self):
        """Test that Butler does not have any circular references that prevent
        it from being garbage collected immediately when it goes out of scope.
        """
        butler = self.create_empty_butler(cleanup=False)
        is_direct_butler = isinstance(butler, DirectButler)
        # Weak references let us observe collection without keeping the
        # objects alive ourselves.
        butler_ref = weakref.ref(butler)
        if is_direct_butler:
            registry_ref = weakref.ref(butler._registry)
            managers_ref = weakref.ref(butler._registry._managers)
            datastore_ref = weakref.ref(butler._datastore)
            db_ref = weakref.ref(butler._registry._db)
            engine_ref = weakref.ref(butler._registry._db._engine)

        with warnings.catch_warnings():
            # Hide warnings from unclosed database handles.
            warnings.simplefilter("ignore", ResourceWarning)
            # Dropping the last strong reference should collect immediately
            # (CPython refcounting) if there are no reference cycles.
            del butler
        self.assertIsNone(butler_ref(), "Butler should have been garbage collected")
        if is_direct_butler:
            self.assertIsNone(registry_ref(), "SqlRegistry should have been garbage collected")
            self.assertIsNone(managers_ref(), "Registry managers should have been garbage collected")
            self.assertIsNone(datastore_ref(), "Datastore should have been garbage collected")
            self.assertIsNone(db_ref(), "Database should have been garbage collected")
        # SQLAlchemy has internal reference cycles, so the Engine instance
        # is not cleaned up promptly even if we release our reference to
        # it. Explicitly clean it up here to avoid file handles leaking.
        if is_direct_butler:
            engine = engine_ref()
            if engine is not None:
                engine.dispose()

822 

823 def testDafButlerRepositories(self): 

824 with unittest.mock.patch.dict( 

825 os.environ, 

826 {"DAF_BUTLER_REPOSITORIES": "label: 'https://someuri.com'\notherLabel: 'https://otheruri.com'\n"}, 

827 ): 

828 self.assertEqual(str(Butler.get_repo_uri("label")), "https://someuri.com") 

829 

830 with unittest.mock.patch.dict( 

831 os.environ, 

832 { 

833 "DAF_BUTLER_REPOSITORIES": "label: https://someuri.com", 

834 "DAF_BUTLER_REPOSITORY_INDEX": "https://someuri.com", 

835 }, 

836 ): 

837 with self.assertRaisesRegex(RuntimeError, "Only one of the environment variables"): 

838 Butler.get_repo_uri("label") 

839 

840 with unittest.mock.patch.dict( 

841 os.environ, 

842 {"DAF_BUTLER_REPOSITORIES": "invalid"}, 

843 ): 

844 with self.assertRaisesRegex(ValueError, "Repository index not in expected format"): 

845 Butler.get_repo_uri("label") 

846 

847 def testBasicPutGet(self) -> None: 

848 storageClass = self.storageClassFactory.getStorageClass("StructuredDataNoComponents") 

849 self.runPutGetTest(storageClass, "test_metric") 

850 

    def testCompositePutGetConcrete(self) -> None:
        """Check that a composite storage class configured for no disassembly
        is stored as a single artifact, and that real and predicted URIs are
        reported without component entries.
        """
        storageClass = self.storageClassFactory.getStorageClass("StructuredCompositeReadCompNoDisassembly")
        butler = self.runPutGetTest(storageClass, "test_metric")

        # Should *not* be disassembled
        datasets = list(butler.registry.queryDatasets(..., collections=self.default_run))
        self.assertEqual(len(datasets), 1)
        uri, components = butler.getURIs(datasets[0])
        # Single primary URI, no per-component URIs.
        self.assertIsInstance(uri, ResourcePath)
        self.assertFalse(components)
        self.assertEqual(uri.fragment, "", f"Checking absence of fragment in {uri}")
        self.assertIn("423", str(uri), f"Checking visit is in URI {uri}")

        # Predicted dataset
        if self.predictionSupported:
            dataId = {"instrument": "DummyCamComp", "visit": 424}
            uri, components = butler.getURIs(datasets[0].datasetType, dataId=dataId, predict=True)
            self.assertFalse(components)
            self.assertIsInstance(uri, ResourcePath)
            self.assertIn("424", str(uri), f"Checking visit is in URI {uri}")
            # Predicted URIs are tagged with a "predicted" fragment.
            self.assertEqual(uri.fragment, "predicted", f"Checking for fragment in {uri}")
            # Repeat with a DatasetRef to test that code path.
            ref = DatasetRef(
                datasets[0].datasetType,
                dataId=DataCoordinate.standardize(dataId, universe=butler.dimensions),
                run=self.default_run,
            )
            uri2, components2 = butler.getURIs(ref, predict=True)
            self.assertFalse(components2)
            # Both lookup paths must agree on the predicted URI.
            self.assertEqual(uri, uri2)

881 

    def testCompositePutGetVirtual(self) -> None:
        """Check that a composite storage class with read components is
        disassembled by non-ephemeral datastores, reporting one URI per
        component, while ephemeral (in-memory) datastores keep a single URI.
        """
        storageClass = self.storageClassFactory.getStorageClass("StructuredCompositeReadComp")
        butler = self.runPutGetTest(storageClass, "test_metric_comp")

        # Should be disassembled
        datasets = list(butler.registry.queryDatasets(..., collections=self.default_run))
        self.assertEqual(len(datasets), 1)
        uri, components = butler.getURIs(datasets[0])

        if butler._datastore.isEphemeral:
            # Never disassemble in-memory datastore
            self.assertIsInstance(uri, ResourcePath)
            self.assertFalse(components)
            self.assertEqual(uri.fragment, "", f"Checking absence of fragment in {uri}")
            self.assertIn("423", str(uri), f"Checking visit is in URI {uri}")
        else:
            # Disassembled: no primary URI, one URI per storage-class component.
            self.assertIsNone(uri)
            self.assertEqual(set(components), set(storageClass.components))
            for compuri in components.values():
                self.assertIsInstance(compuri, ResourcePath)
                self.assertIn("423", str(compuri), f"Checking visit is in URI {compuri}")
                self.assertEqual(compuri.fragment, "", f"Checking absence of fragment in {compuri}")

        if self.predictionSupported:
            # Predicted dataset
            dataId = {"instrument": "DummyCamComp", "visit": 424}
            uri, components = butler.getURIs(datasets[0].datasetType, dataId=dataId, predict=True)

            if butler._datastore.isEphemeral:
                # Never disassembled
                self.assertIsInstance(uri, ResourcePath)
                self.assertFalse(components)
                self.assertIn("424", str(uri), f"Checking visit is in URI {uri}")
                self.assertEqual(uri.fragment, "predicted", f"Checking for fragment in {uri}")
            else:
                # Predicted component URIs each carry the "predicted" fragment.
                self.assertIsNone(uri)
                self.assertEqual(set(components), set(storageClass.components))
                for compuri in components.values():
                    self.assertIsInstance(compuri, ResourcePath)
                    self.assertIn("424", str(compuri), f"Checking visit is in URI {compuri}")
                    self.assertEqual(compuri.fragment, "predicted", f"Checking for fragment in {compuri}")

923 

    def testStorageClassOverrideGet(self) -> None:
        """Test storage class conversion on get with override.

        Covers direct get, deferred get (override supplied at deferral and at
        retrieval), component get with an overridden storage class, and
        parameter handling across write/read storage classes.
        """
        storageClass = self.storageClassFactory.getStorageClass("StructuredData")
        datasetTypeName = "anything"
        run = self.default_run

        butler, datasetType = self.create_butler(run, storageClass, datasetTypeName)

        # Create and store a dataset.
        metric = makeExampleMetrics()
        dataId = {"instrument": "DummyCamComp", "visit": 423}

        ref = butler.put(metric, datasetType, dataId)

        # Return native type.
        retrieved = butler.get(ref)
        self.assertEqual(retrieved, metric)

        # Specify an override.
        new_sc = self.storageClassFactory.getStorageClass("MetricsConversion")
        model = butler.get(ref, storageClass=new_sc)
        # Converted object compares equal but has the override's python type.
        self.assertNotEqual(type(model), type(retrieved))
        self.assertIs(type(model), new_sc.pytype)
        self.assertEqual(retrieved, model)

        # Defer but override later.
        deferred = butler.getDeferred(ref)
        model = deferred.get(storageClass=new_sc)
        self.assertIs(type(model), new_sc.pytype)
        self.assertEqual(retrieved, model)

        # Defer but override up front.
        deferred = butler.getDeferred(ref, storageClass=new_sc)
        model = deferred.get()
        self.assertIs(type(model), new_sc.pytype)
        self.assertEqual(retrieved, model)

        # Retrieve a component. Should be a tuple.
        data = butler.get("anything.data", dataId, storageClass="StructuredDataDataTestTuple")
        self.assertIs(type(data), tuple)
        self.assertEqual(data, tuple(retrieved.data))

        # Parameter on the write storage class should work regardless
        # of read storage class.
        data = butler.get(
            "anything.data",
            dataId,
            storageClass="StructuredDataDataTestTuple",
            parameters={"slice": slice(2, 4)},
        )
        self.assertEqual(len(data), 2)

        # Try a parameter that is known to the read storage class but not
        # the write storage class.
        with self.assertRaises(KeyError):
            butler.get(
                "anything.data",
                dataId,
                storageClass="StructuredDataDataTestTuple",
                parameters={"xslice": slice(2, 4)},
            )

985 

    def testPytypePutCoercion(self) -> None:
        """Test python type coercion on Butler.get and put."""
        # Store some data with the normal example storage class.
        storageClass = self.storageClassFactory.getStorageClass("StructuredDataNoComponents")
        datasetTypeName = "test_metric"
        butler, _ = self.create_butler(self.default_run, storageClass, datasetTypeName)

        dataId = {"instrument": "DummyCamComp", "visit": 423}

        # Put a dict and this should coerce to a MetricsExample
        test_dict = {"summary": {"a": 1}, "output": {"b": 2}}
        metric_ref = butler.put(test_dict, datasetTypeName, dataId=dataId, visit=424)
        test_metric = butler.get(metric_ref)
        self.assertEqual(get_full_type_name(test_metric), "lsst.daf.butler.tests.MetricsExample")
        self.assertEqual(test_metric.summary, test_dict["summary"])
        self.assertEqual(test_metric.output, test_dict["output"])

        # Check that the put still works if a DatasetType is given with
        # a definition matching this python type.
        registry_type = butler.get_dataset_type(datasetTypeName)
        this_type = DatasetType(datasetTypeName, registry_type.dimensions, "StructuredDataDictJson")
        metric2_ref = butler.put(test_dict, this_type, dataId=dataId, visit=425)
        # The returned ref carries the registry's definition, not the
        # compatible one supplied to put().
        self.assertEqual(metric2_ref.datasetType, registry_type)

        # The get will return the type expected by registry.
        test_metric2 = butler.get(metric2_ref)
        self.assertEqual(get_full_type_name(test_metric2), "lsst.daf.butler.tests.MetricsExample")

        # Make a new DatasetRef with the compatible but different DatasetType.
        # This should now return a dict.
        new_ref = DatasetRef(this_type, metric2_ref.dataId, id=metric2_ref.id, run=metric2_ref.run)
        test_dict2 = butler.get(new_ref)
        self.assertEqual(get_full_type_name(test_dict2), "dict")

        # Get it again with the overridden dataset type definition, this time
        # looked up by dataset type + data ID rather than via a DatasetRef.
        # This should be consistent with the DatasetRef get() behavior above
        # and return the python type of the given DatasetType.
        test_dict3 = butler.get(this_type, dataId=dataId, visit=425)
        self.assertEqual(get_full_type_name(test_dict3), "dict")

1025 

    def test_ingest_zip(self) -> None:
        """Create butler, export data, delete data, import from Zip."""
        butler, dataset_type = self.create_butler(
            run=self.default_run, storageClass="StructuredData", datasetTypeName="metrics"
        )

        # Store the same metric under three different visits.
        metric = makeExampleMetrics()
        refs = []
        for visit in (423, 424, 425):
            ref = butler.put(metric, dataset_type, instrument="DummyCamComp", visit=visit)
            refs.append(ref)

        # Retrieve a Zip file.
        with tempfile.TemporaryDirectory(ignore_cleanup_errors=True) as tmpdir:
            zip = butler.retrieve_artifacts_zip(refs, destination=tmpdir)

            # Ingest will fail.
            # (The datasets are still registered, so re-ingest conflicts.)
            with self.assertRaises(ConflictingDefinitionError):
                butler.ingest_zip(zip)

            # Clear out the collection.
            butler.removeRuns([self.default_run])
            self.assertFalse(butler.exists(refs[0]))

            butler.ingest_zip(zip, transfer="copy")
            # Ingest metrics should have been recorded.
            self.assertGreater(butler._metrics.time_in_ingest, 0.0)
            self.assertEqual(butler._metrics.n_ingest, len(refs))

            # Check that it fails if we try it again.
            with self.assertRaises(ConflictingDefinitionError):
                butler.ingest_zip(zip, transfer="copy")

            # This will be a no-op.
            butler.ingest_zip(zip, transfer="copy", skip_existing=True)

            # Create an entirely new local file butler in this temp directory.
            new_butler_cfg = Butler.makeRepo(tmpdir)
            new_butler = Butler.from_config(new_butler_cfg, writeable=True)
            self.enterContext(new_butler)

            # This will fail since dimensions records are missing.
            with self.assertRaises(ConflictingDefinitionError):
                new_butler.ingest_zip(zip, transfer="copy")

            # Dry run should work.
            new_butler.ingest_zip(zip, transfer="copy", dry_run=True)

            new_butler.ingest_zip(zip, transfer="copy", transfer_dimensions=True)
            # NOTE(review): this asserts on the original butler, which already
            # holds these refs from the earlier ingest_zip above; confirm
            # whether new_butler.exists(refs[0]) was intended here.
            self.assertTrue(butler.exists(refs[0]))

        # Check that the refs can be read again.
        _ = [butler.get(ref) for ref in refs]

        uri = butler.getURI(refs[2])
        self.assertTrue(uri.exists())

        # Delete one dataset. The Zip file should still exist and allow
        # remaining refs to be read.
        butler.pruneDatasets([refs[0]], purge=True, unstore=True)
        self.assertTrue(uri.exists())

        metric2 = butler.get(refs[1])
        self.assertEqual(metric2, metric, msg=f"{metric2} != {metric}")

        # Removing the run removes the shared Zip artifact as well.
        butler.removeRuns([self.default_run])
        self.assertFalse(uri.exists())
        self.assertFalse(butler.exists(refs[-1]))

        # An empty ref list is rejected.
        with self.assertRaises(ValueError):
            butler.retrieve_artifacts_zip([], destination=".")

1096 

    def testIngest(self) -> None:
        """Test file ingest: per-file refs, run auto-creation, re-ingest
        conflict/skip behavior, multi-ref single-file ingest with "move"
        transfer, and pruning one ref without breaking others.
        """
        butler = self.create_empty_butler(run=self.default_run)

        # Create and register a DatasetType
        dimensions = butler.dimensions.conform(["instrument", "visit", "detector"])

        storageClass = self.storageClassFactory.getStorageClass("StructuredDataDictYaml")
        datasetTypeName = "metric"

        datasetType = self.addDatasetType(datasetTypeName, dimensions, storageClass, butler.registry)

        # Add needed Dimensions
        butler.registry.insertDimensionData("instrument", {"name": "DummyCamComp"})
        butler.registry.insertDimensionData(
            "physical_filter", {"instrument": "DummyCamComp", "name": "d-r", "band": "R"}
        )
        butler.registry.insertDimensionData("day_obs", {"instrument": "DummyCamComp", "id": 20250101})
        for detector in (1, 2):
            butler.registry.insertDimensionData(
                "detector", {"instrument": "DummyCamComp", "id": detector, "full_name": f"detector{detector}"}
            )

        butler.registry.insertDimensionData(
            "visit",
            {
                "instrument": "DummyCamComp",
                "id": 423,
                "name": "fourtwentythree",
                "physical_filter": "d-r",
                "day_obs": 20250101,
            },
            {
                "instrument": "DummyCamComp",
                "id": 424,
                "name": "fourtwentyfour",
                "physical_filter": "d-r",
                "day_obs": 20250101,
            },
        )

        formatter = doImportType("lsst.daf.butler.formatters.yaml.YamlFormatter")
        dataRoot = os.path.join(TESTDIR, "data", "basic")
        datasets = []
        # Test one DatasetRef with a run that exists, and the other with a run
        # that doesn't exist, to verify that run collections are created when
        # required.
        runs = {1: self.default_run, 2: "a/new/run"}
        for detector in (1, 2):
            detector_name = f"detector_{detector}"
            metricFile = os.path.join(dataRoot, f"{detector_name}.yaml")
            dataId = butler.registry.expandDataId(
                {"instrument": "DummyCamComp", "visit": 423, "detector": detector}
            )
            # Create a DatasetRef for ingest
            refIn = DatasetRef(datasetType, dataId, run=runs[detector])

            datasets.append(FileDataset(path=metricFile, refs=[refIn], formatter=formatter))

        butler.ingest(*datasets, transfer="copy")

        dataId1 = {"instrument": "DummyCamComp", "detector": 1, "visit": 423}
        dataId2 = {"instrument": "DummyCamComp", "detector": 2, "visit": 423}

        metrics1 = butler.get(datasetTypeName, dataId1)
        metrics2 = butler.get(datasetTypeName, dataId2, collections="a/new/run")
        self.assertNotEqual(metrics1, metrics2)

        # Compare URIs
        uri1 = butler.getURI(datasetTypeName, dataId1)
        uri2 = butler.getURI(datasetTypeName, dataId2, collections="a/new/run")
        # Two separate source files => two distinct artifacts.
        self.assertFalse(self.are_uris_equivalent(uri1, uri2), f"Cf. {uri1} with {uri2}")

        # Re-ingesting the same datasets raises an error with
        # skip_existing=False.
        with self.assertRaises(ConflictingDefinitionError):
            butler.ingest(*datasets, transfer="copy")
        # skip_existing=True makes it a no-op to re-ingest the same datasets.
        butler.ingest(*datasets, transfer="copy", skip_existing=True)

        # Now do a multi-dataset but single file ingest
        metricFile = os.path.join(dataRoot, "detectors.yaml")
        refs = []
        for detector in (1, 2):
            detector_name = f"detector_{detector}"
            dataId = butler.registry.expandDataId(
                {"instrument": "DummyCamComp", "visit": 424, "detector": detector}
            )
            # Create a DatasetRef for ingest
            refs.append(DatasetRef(datasetType, dataId, run=self.default_run))

        # Test "move" transfer to ensure that the files themselves
        # have disappeared following ingest.
        with ResourcePath.temporary_uri(suffix=".yaml") as tempFile:
            tempFile.transfer_from(ResourcePath(metricFile), transfer="copy")

            datasets = []
            datasets.append(FileDataset(path=tempFile, refs=refs, formatter=MultiDetectorFormatter))

            # For first ingest use copy.
            butler.ingest(*datasets, transfer="copy", record_validation_info=False)

            # Now try to ingest again in "execution butler" mode where
            # the registry entries exist but the datastore does not have
            # the files. We also need to strip the dimension records to ensure
            # that they will be re-added by the ingest.
            ref = datasets[0].refs[0]
            datasets[0].refs = [
                cast(
                    DatasetRef,
                    butler.find_dataset(ref.datasetType, data_id=ref.dataId, collections=ref.run),
                )
                for ref in datasets[0].refs
            ]
            all_refs = []
            for dataset in datasets:
                refs = []
                for ref in dataset.refs:
                    # Create a dict from the dataId to drop the records.
                    new_data_id = dict(ref.dataId.required)
                    new_ref = butler.find_dataset(ref.datasetType, new_data_id, collections=ref.run)
                    assert new_ref is not None
                    self.assertFalse(new_ref.dataId.hasRecords())
                    refs.append(new_ref)
                dataset.refs = refs
                all_refs.extend(dataset.refs)
            # Remove datastore entries but keep registry entries.
            butler.pruneDatasets(all_refs, disassociate=False, unstore=True, purge=False)

            # Use move mode to test that the file is deleted. Also
            # disable recording of file size.
            butler.ingest(*datasets, transfer="move", record_validation_info=False)

            # Check that every ref now has records.
            for dataset in datasets:
                for ref in dataset.refs:
                    self.assertTrue(ref.dataId.hasRecords())

            # Ensure that the file has disappeared.
            self.assertFalse(tempFile.exists())

            # Check that the datastore recorded no file size.
            # Not all datastores can support this.
            try:
                infos = butler._datastore.getStoredItemsInfo(datasets[0].refs[0])  # type: ignore[attr-defined]
                self.assertEqual(infos[0].file_size, -1)
            except AttributeError:
                pass

        dataId1 = {"instrument": "DummyCamComp", "detector": 1, "visit": 424}
        dataId2 = {"instrument": "DummyCamComp", "detector": 2, "visit": 424}

        multi1 = butler.get(datasetTypeName, dataId1)
        multi2 = butler.get(datasetTypeName, dataId2)

        self.assertEqual(multi1, metrics1)
        self.assertEqual(multi2, metrics2)

        # Compare URIs
        uri1 = butler.getURI(datasetTypeName, dataId1)
        uri2 = butler.getURI(datasetTypeName, dataId2)
        # Both refs came from the same single file => same artifact.
        self.assertTrue(self.are_uris_equivalent(uri1, uri2), f"Cf. {uri1} with {uri2}")

        # Test that removing one does not break the second
        # This line will issue a warning log message for a ChainedDatastore
        # that uses an InMemoryDatastore since in-memory can not ingest
        # files.
        butler.pruneDatasets([datasets[0].refs[0]], unstore=True, disassociate=False)
        self.assertFalse(butler.exists(datasetTypeName, dataId1))
        self.assertTrue(butler.exists(datasetTypeName, dataId2))
        multi2b = butler.get(datasetTypeName, dataId2)
        self.assertEqual(multi2, multi2b)

        # Ensure we can ingest 0 datasets
        datasets = []
        butler.ingest(*datasets)

1271 

1272 def testPickle(self) -> None: 

1273 """Test pickle support.""" 

1274 butler = self.create_empty_butler(run=self.default_run) 

1275 assert isinstance(butler, DirectButler), "Expect DirectButler in configuration" 

1276 butlerOut = pickle.loads(pickle.dumps(butler)) 

1277 self.enterContext(butlerOut) 

1278 self.assertIsInstance(butlerOut, Butler) 

1279 self.assertEqual(butlerOut._config, butler._config) 

1280 self.assertEqual(list(butlerOut.collections.defaults), list(butler.collections.defaults)) 

1281 self.assertEqual(butlerOut.run, butler.run) 

1282 

1283 def testGetDatasetTypes(self) -> None: 

1284 butler = self.create_empty_butler(run=self.default_run) 

1285 dimensions = butler.dimensions.conform(["instrument", "visit", "physical_filter"]) 

1286 dimensionEntries: list[tuple[str, list[Mapping[str, Any]]]] = [ 

1287 ( 

1288 "instrument", 

1289 [ 

1290 {"instrument": "DummyCam"}, 

1291 {"instrument": "DummyHSC"}, 

1292 {"instrument": "DummyCamComp"}, 

1293 ], 

1294 ), 

1295 ("physical_filter", [{"instrument": "DummyCam", "name": "d-r", "band": "R"}]), 

1296 ("day_obs", [{"instrument": "DummyCam", "id": 20250101}]), 

1297 ( 

1298 "visit", 

1299 [ 

1300 { 

1301 "instrument": "DummyCam", 

1302 "id": 42, 

1303 "name": "fortytwo", 

1304 "physical_filter": "d-r", 

1305 "day_obs": 20250101, 

1306 } 

1307 ], 

1308 ), 

1309 ] 

1310 storageClass = self.storageClassFactory.getStorageClass("StructuredData") 

1311 # Add needed Dimensions 

1312 for element, data in dimensionEntries: 

1313 butler.registry.insertDimensionData(element, *data) 

1314 

1315 # When a DatasetType is added to the registry entries are not created 

1316 # for components but querying them can return the components. 

1317 datasetTypeNames = {"metric", "metric2", "metric4", "metric33", "pvi", "paramtest"} 

1318 components = set() 

1319 for datasetTypeName in datasetTypeNames: 

1320 # Create and register a DatasetType 

1321 self.addDatasetType(datasetTypeName, dimensions, storageClass, butler.registry) 

1322 

1323 for componentName in storageClass.components: 

1324 components.add(DatasetType.nameWithComponent(datasetTypeName, componentName)) 

1325 

1326 fromRegistry: set[DatasetType] = set() 

1327 for parent_dataset_type in butler.registry.queryDatasetTypes(): 

1328 fromRegistry.add(parent_dataset_type) 

1329 fromRegistry.update(parent_dataset_type.makeAllComponentDatasetTypes()) 

1330 self.assertEqual({d.name for d in fromRegistry}, datasetTypeNames | components) 

1331 

1332 # Query with wildcard. 

1333 dataset_types = butler.registry.queryDatasetTypes("metric*") 

1334 self.assertEqual(len(dataset_types), 4, f"Got: {dataset_types}") 

1335 # but not regex. 

1336 with self.assertRaises(DatasetTypeExpressionError): 

1337 butler.registry.queryDatasetTypes(["pvi", re.compile("metric.*")]) 

1338 

1339 # Now that we have some dataset types registered, validate them 

1340 butler.validateConfiguration( 

1341 ignore=[ 

1342 "test_metric_comp", 

1343 "metric3", 

1344 "metric5", 

1345 "calexp", 

1346 "DummySC", 

1347 "datasetType.component", 

1348 "random_data", 

1349 "random_data_2", 

1350 ] 

1351 ) 

1352 

1353 # Add a new datasetType that will fail template validation 

1354 self.addDatasetType("test_metric_comp", dimensions, storageClass, butler.registry) 

1355 if self.validationCanFail: 

1356 with self.assertRaises(ValidationError): 

1357 butler.validateConfiguration() 

1358 

1359 # Rerun validation but with a subset of dataset type names 

1360 butler.validateConfiguration(datasetTypeNames=["metric4"]) 

1361 

1362 # Rerun validation but ignore the bad datasetType 

1363 butler.validateConfiguration( 

1364 ignore=[ 

1365 "test_metric_comp", 

1366 "metric3", 

1367 "metric5", 

1368 "calexp", 

1369 "DummySC", 

1370 "datasetType.component", 

1371 "random_data", 

1372 "random_data_2", 

1373 ] 

1374 ) 

1375 

    def testTransaction(self) -> None:
        """Test that raising inside a butler transaction rolls back both the
        registry inserts (dimensions, dataset entries) and the datastore
        write, leaving no trace of the put.
        """
        butler = self.create_empty_butler(run=self.default_run)
        datasetTypeName = "test_metric"
        dimensions = butler.dimensions.conform(["instrument", "visit"])
        dimensionEntries: tuple[tuple[str, Mapping[str, Any]], ...] = (
            ("instrument", {"instrument": "DummyCam"}),
            ("physical_filter", {"instrument": "DummyCam", "name": "d-r", "band": "R"}),
            ("day_obs", {"instrument": "DummyCam", "id": 20250101}),
            (
                "visit",
                {
                    "instrument": "DummyCam",
                    "id": 42,
                    "name": "fortytwo",
                    "physical_filter": "d-r",
                    "day_obs": 20250101,
                },
            ),
        )
        storageClass = self.storageClassFactory.getStorageClass("StructuredData")
        metric = makeExampleMetrics()
        dataId = {"instrument": "DummyCam", "visit": 42}
        # Create and register a DatasetType
        datasetType = self.addDatasetType(datasetTypeName, dimensions, storageClass, butler.registry)
        with self.assertRaises(TransactionTestError):
            with butler.transaction():
                # Add needed Dimensions
                for args in dimensionEntries:
                    butler.registry.insertDimensionData(*args)
                # Store a dataset
                ref = butler.put(metric, datasetTypeName, dataId)
                self.assertIsInstance(ref, DatasetRef)
                # Test get of a ref.
                metricOut = butler.get(ref)
                self.assertEqual(metric, metricOut)
                # Test get by dataset type name and data ID.
                metricOut = butler.get(datasetTypeName, dataId)
                self.assertEqual(metric, metricOut)
                # Check we can get components
                self.assertGetComponents(butler, ref, ("summary", "data", "output"), metric)
                raise TransactionTestError("This should roll back the entire transaction")
        # After rollback the dimension records must be gone.
        with self.assertRaises(DataIdValueError, msg=f"Check can't expand DataId {dataId}"):
            butler.registry.expandDataId(dataId)
        # Should raise LookupError for missing data ID value
        with self.assertRaises(LookupError, msg=f"Check can't get by {datasetTypeName} and {dataId}"):
            butler.get(datasetTypeName, dataId)
        # Also check explicitly if Dataset entry is missing
        self.assertIsNone(butler.find_dataset(datasetType, dataId, collections=butler.collections.defaults))
        # Direct retrieval should not find the file in the Datastore
        with self.assertRaises(FileNotFoundError, msg=f"Check {ref} can't be retrieved directly"):
            butler.get(ref)

1427 

    def testMakeRepo(self) -> None:
        """Test that we can write butler configuration to a new repository via
        the Butler.makeRepo interface and then instantiate a butler from the
        repo root.
        """
        # Do not run the test if we know this datastore configuration does
        # not support a file system root
        if self.fullConfigKey is None:
            return

        # create two separate directories
        root1 = tempfile.mkdtemp(dir=self.root)
        root2 = tempfile.mkdtemp(dir=self.root)

        # First repo: default makeRepo; "limited" is the raw seed config.
        butlerConfig = Butler.makeRepo(root1, config=Config(self.configFile))
        limited = Config(self.configFile)
        butler1 = Butler.from_config(butlerConfig)
        self.enterContext(butler1)
        assert isinstance(butler1, DirectButler), "Expect DirectButler in configuration"
        # Second repo: standalone=True; "full" is a previously written,
        # expanded config.
        butlerConfig = Butler.makeRepo(root2, standalone=True, config=Config(self.configFile))
        full = Config(self.tmpConfigFile)
        butler2 = Butler.from_config(butlerConfig)
        self.enterContext(butler2)
        assert isinstance(butler2, DirectButler), "Expect DirectButler in configuration"
        # Butlers should have the same configuration regardless of whether
        # defaults were expanded.
        self.assertEqual(butler1._config, butler2._config)
        # Config files loaded directly should not be the same.
        self.assertNotEqual(limited, full)
        # Make sure "limited" doesn't have a few keys we know it should be
        # inheriting from defaults.
        self.assertIn(self.fullConfigKey, full)
        self.assertNotIn(self.fullConfigKey, limited)

        # Collections don't appear until something is put in them
        collections1 = set(butler1.registry.queryCollections())
        self.assertEqual(collections1, set())
        self.assertEqual(set(butler2.registry.queryCollections()), collections1)

        # Check that a config with no associated file name will not
        # work properly with relocatable Butler repo
        butlerConfig.configFile = None
        with self.assertRaises(ValueError):
            Butler.from_config(butlerConfig)

        # Refusing to overwrite an existing repo must raise.
        with self.assertRaises(FileExistsError):
            Butler.makeRepo(self.root, standalone=True, config=Config(self.configFile), overwrite=False)

1475 

1476 def testStringification(self) -> None: 

1477 butler = Butler.from_config(self.tmpConfigFile, run=self.default_run) 

1478 self.enterContext(butler) 

1479 butlerStr = str(butler) 

1480 

1481 if self.datastoreStr is not None: 

1482 for testStr in self.datastoreStr: 

1483 self.assertIn(testStr, butlerStr) 

1484 if self.registryStr is not None: 

1485 self.assertIn(self.registryStr, butlerStr) 

1486 

1487 datastoreName = butler._datastore.name 

1488 if self.datastoreName is not None: 

1489 for testStr in self.datastoreName: 

1490 self.assertIn(testStr, datastoreName) 

1491 

    def testButlerRewriteDataId(self) -> None:
        """Test that dataIds can be rewritten based on dimension records.

        Puts datasets using (day_obs, seq_num) and checks that the implied
        exposure ID is filled in, and that day_obs queries match whether
        expressed bare or as exposure.day_obs.
        """
        butler = self.create_empty_butler(run=self.default_run)

        storageClass = self.storageClassFactory.getStorageClass("StructuredDataDict")
        datasetTypeName = "random_data"

        # Create dimension records.
        butler.registry.insertDimensionData("instrument", {"name": "DummyCamComp"})
        butler.registry.insertDimensionData(
            "physical_filter", {"instrument": "DummyCamComp", "name": "d-r", "band": "R"}
        )
        butler.registry.insertDimensionData(
            "detector", {"instrument": "DummyCamComp", "id": 1, "full_name": "det1"}
        )

        dimensions = butler.dimensions.conform(["instrument", "exposure"])
        datasetType = DatasetType(datasetTypeName, dimensions, storageClass)
        butler.registry.registerDatasetType(datasetType)

        n_exposures = 5
        dayobs = 20210530

        # Create records for multiple day_obs but same seq_num to test that
        # we are constraining gets properly when day_obs/seq_num is used
        # for an exposure. Second day is year in future but is not used.
        for day_obs in (dayobs, dayobs + 1_00_00):
            butler.registry.insertDimensionData("day_obs", {"instrument": "DummyCamComp", "id": day_obs})

            for i in range(n_exposures):
                group_name = f"group_{day_obs}_{i}"
                butler.registry.insertDimensionData(
                    "group", {"instrument": "DummyCamComp", "name": group_name}
                )
                # Exposure IDs encode the day so both days get distinct IDs
                # while sharing seq_num values 0..n_exposures-1.
                butler.registry.insertDimensionData(
                    "exposure",
                    {
                        "instrument": "DummyCamComp",
                        "id": day_obs + i,
                        "obs_id": f"exp_{day_obs}_{i}",
                        "seq_num": i,
                        "day_obs": day_obs,
                        "physical_filter": "d-r",
                        "group": group_name,
                    },
                )

        # Write some data.
        for i in range(n_exposures):
            metric = {"something": i, "other": "metric", "list": [2 * x for x in range(i)]}

            # Use the seq_num for the put to test rewriting.
            dataId = {"seq_num": i, "day_obs": dayobs, "instrument": "DummyCamComp", "physical_filter": "d-r"}
            ref = butler.put(metric, datasetTypeName, dataId=dataId)

            # Check that the exposure is correct in the dataId
            self.assertEqual(ref.dataId["exposure"], dayobs + i)

            # and check that we can get the dataset back with the same dataId
            new_metric = butler.get(datasetTypeName, dataId=dataId)
            self.assertEqual(new_metric, metric)

        # Check that we can find the datasets using the day_obs or the
        # exposure.day_obs.
        datasets_1 = list(
            butler.registry.queryDatasets(
                datasetType,
                collections=self.default_run,
                where="day_obs = :dayObs AND instrument = :instr",
                bind={"dayObs": dayobs, "instr": "DummyCamComp"},
            )
        )
        datasets_2 = list(
            butler.registry.queryDatasets(
                datasetType,
                collections=self.default_run,
                where="exposure.day_obs = :dayObs AND instrument = :instr",
                bind={"dayObs": dayobs, "instr": "DummyCamComp"},
            )
        )
        self.assertEqual(datasets_1, datasets_2)

1573 

1574 def testGetDatasetCollectionCaching(self): 

1575 # Prior to DM-41117, there was a bug where get_dataset would throw 

1576 # MissingCollectionError if you tried to fetch a dataset that was added 

1577 # after the collection cache was last updated. 

1578 reader_butler, datasetType = self.create_butler(self.default_run, "int", "datasettypename") 

1579 writer_butler = self.create_empty_butler(writeable=True, run="new_run") 

1580 dataId = {"instrument": "DummyCamComp", "visit": 423} 

1581 put_ref = writer_butler.put(123, datasetType, dataId) 

1582 get_ref = reader_butler.get_dataset(put_ref.id) 

1583 self.assertEqual(get_ref.id, put_ref.id) 

1584 # Also works when looking up via a hexadecimal string instead of a UUID 

1585 # instance. 

1586 hex_ref = reader_butler.get_dataset(put_ref.id.hex) 

1587 self.assertEqual(hex_ref.id, put_ref.id) 

1588 

1589 def testCollectionChainRedefine(self): 

1590 butler = self._setup_to_test_collection_chain() 

1591 

1592 butler.collections.redefine_chain("chain", "a") 

1593 self._check_chain(butler, ["a"]) 

1594 

1595 # Duplicates are removed from the list of children 

1596 butler.collections.redefine_chain("chain", ["c", "b", "c"]) 

1597 self._check_chain(butler, ["c", "b"]) 

1598 

1599 # Empty list clears the chain 

1600 butler.collections.redefine_chain("chain", []) 

1601 self._check_chain(butler, []) 

1602 

1603 self._test_common_chain_functionality(butler, butler.collections.redefine_chain) 

1604 

1605 def testCollectionChainPrepend(self): 

1606 butler = self._setup_to_test_collection_chain() 

1607 

1608 # Duplicates are removed from the list of children 

1609 butler.collections.prepend_chain("chain", ["c", "b", "c"]) 

1610 self._check_chain(butler, ["c", "b"]) 

1611 

1612 # Prepend goes on the front of existing chain 

1613 butler.collections.prepend_chain("chain", ["a"]) 

1614 self._check_chain(butler, ["a", "c", "b"]) 

1615 

1616 # Empty prepend does nothing 

1617 butler.collections.prepend_chain("chain", []) 

1618 self._check_chain(butler, ["a", "c", "b"]) 

1619 

1620 # Prepending children that already exist in the chain removes them from 

1621 # their current position. 

1622 butler.collections.prepend_chain("chain", ["d", "b", "c"]) 

1623 self._check_chain(butler, ["d", "b", "c", "a"]) 

1624 

1625 self._test_common_chain_functionality(butler, butler.collections.prepend_chain) 

1626 

1627 def testCollectionChainExtend(self): 

1628 butler = self._setup_to_test_collection_chain() 

1629 

1630 # Duplicates are removed from the list of children 

1631 butler.collections.extend_chain("chain", ["c", "b", "c"]) 

1632 self._check_chain(butler, ["c", "b"]) 

1633 

1634 # Extend goes on the end of existing chain 

1635 butler.collections.extend_chain("chain", ["a"]) 

1636 self._check_chain(butler, ["c", "b", "a"]) 

1637 

1638 # Empty extend does nothing 

1639 butler.collections.extend_chain("chain", []) 

1640 self._check_chain(butler, ["c", "b", "a"]) 

1641 

1642 # Extending children that already exist in the chain removes them from 

1643 # their current position. 

1644 butler.collections.extend_chain("chain", ["d", "b", "c"]) 

1645 self._check_chain(butler, ["a", "d", "b", "c"]) 

1646 

1647 self._test_common_chain_functionality(butler, butler.collections.extend_chain) 

1648 

1649 def testCollectionChainRemove(self) -> None: 

1650 butler = self._setup_to_test_collection_chain() 

1651 

1652 butler.collections.redefine_chain("chain", ["a", "b", "c", "d"]) 

1653 

1654 butler.collections.remove_from_chain("chain", "c") 

1655 self._check_chain(butler, ["a", "b", "d"]) 

1656 

1657 # Duplicates are allowed in the list of children 

1658 butler.collections.remove_from_chain("chain", ["b", "b", "a"]) 

1659 self._check_chain(butler, ["d"]) 

1660 

1661 # Empty remove does nothing 

1662 butler.collections.remove_from_chain("chain", []) 

1663 self._check_chain(butler, ["d"]) 

1664 

1665 # Removing children that aren't in the chain does nothing 

1666 butler.collections.remove_from_chain("chain", ["a", "chain"]) 

1667 self._check_chain(butler, ["d"]) 

1668 

1669 self._test_common_chain_functionality( 

1670 butler, butler.collections.remove_from_chain, skip_cycle_check=True 

1671 ) 

1672 

1673 def _setup_to_test_collection_chain(self) -> Butler: 

1674 butler = self.create_empty_butler(writeable=True) 

1675 

1676 butler.collections.register("chain", CollectionType.CHAINED) 

1677 

1678 runs = ["a", "b", "c", "d"] 

1679 for run in runs: 

1680 butler.collections.register(run) 

1681 

1682 butler.collections.register("staticchain", CollectionType.CHAINED) 

1683 butler.collections.redefine_chain("staticchain", ["a", "b"]) 

1684 

1685 return butler 

1686 

1687 def _check_chain(self, butler: Butler, expected: list[str]) -> None: 

1688 children = butler.collections.get_info("chain").children 

1689 self.assertEqual(expected, list(children)) 

1690 

1691 def _test_common_chain_functionality( 

1692 self, butler, func: Callable[[str, str | list[str]], Any], *, skip_cycle_check=False 

1693 ) -> None: 

1694 # Missing parent collection 

1695 with self.assertRaises(MissingCollectionError): 

1696 func("doesnotexist", []) 

1697 # Missing child collection 

1698 with self.assertRaises(MissingCollectionError): 

1699 func("chain", ["doesnotexist"]) 

1700 # Forbid operations on non-chained collections 

1701 with self.assertRaises(CollectionTypeError): 

1702 func("d", ["a"]) 

1703 

1704 # Prevent collection cycles 

1705 if not skip_cycle_check: 

1706 butler.collections.register("chain2", CollectionType.CHAINED) 

1707 func("chain2", "chain") 

1708 with self.assertRaises(CollectionCycleError): 

1709 func("chain", "chain2") 

1710 

1711 # Make sure none of the earlier operations interfered with unrelated 

1712 # chains. 

1713 self.assertEqual(["a", "b"], list(butler.collections.get_info("staticchain").children)) 

1714 

1715 with butler._caching_context(): 

1716 with self.assertRaisesRegex(RuntimeError, "Chained collection modification not permitted"): 

1717 func("chain", "a") 

1718 

1719 def test_transfer_dimension_records_from(self) -> None: 

1720 source_butler = self.create_empty_butler(writeable=True) 

1721 source_butler.import_(filename=_get_test_data_path("lsstcam-subset.yaml")) 

1722 

1723 visit_id = 2025120200439 

1724 exposure_id = visit_id 

1725 target_butler = self.enterContext(create_populated_sqlite_registry()) 

1726 target_butler.transfer_dimension_records_from( 

1727 source_butler, 

1728 [ 

1729 # Should trigger the lookup of visit and all its associated 

1730 # "populated_by" records (visit_detector_region, 

1731 # visit_definition, etc.) 

1732 DataCoordinate.standardize( 

1733 {"instrument": "LSSTCam", "visit": visit_id, "detector": 10}, 

1734 universe=source_butler.dimensions, 

1735 ), 

1736 # Shouldn't add any records to the lookup. 

1737 DataCoordinate.make_empty(source_butler.dimensions), 

1738 ], 

1739 ) 

1740 

1741 def _fetch_record(dimension: str) -> DimensionRecord: 

1742 records = target_butler.query_dimension_records(dimension) 

1743 self.assertEqual(len(records), 1) 

1744 return records[0] 

1745 

1746 visit = _fetch_record("visit") 

1747 self.assertEqual(visit.id, visit_id) 

1748 self.assertEqual(visit.day_obs, 20251202) 

1749 self.assertEqual(visit.target_name, "lowdust") 

1750 self.assertEqual(visit.seq_num, 439) 

1751 original_visit = source_butler.query_dimension_records("visit", instrument="LSSTCam", visit=visit_id)[ 

1752 0 

1753 ] 

1754 self.assertEqual(visit.region, original_visit.region) 

1755 self.assertEqual(visit.timespan, original_visit.timespan) 

1756 

1757 visit_detector_region = _fetch_record("visit_detector_region") 

1758 self.assertEqual(visit_detector_region.instrument, "LSSTCam") 

1759 self.assertEqual(visit_detector_region.detector, 10) 

1760 self.assertEqual(visit_detector_region.visit, visit_id) 

1761 original_visit_detector_region = source_butler.query_dimension_records( 

1762 "visit_detector_region", instrument="LSSTCam", visit=visit_id, detector=10 

1763 )[0] 

1764 self.assertEqual(visit_detector_region.region, original_visit_detector_region.region) 

1765 

1766 visit_definition = _fetch_record("visit_definition") 

1767 self.assertEqual(visit_definition.instrument, "LSSTCam") 

1768 self.assertEqual(visit_definition.exposure, 2025120200439) 

1769 self.assertEqual(visit_definition.visit, visit_id) 

1770 

1771 # The matching exposure record should have been pulled in via 

1772 # visit -> visit_definition. 

1773 exposure = _fetch_record("exposure") 

1774 self.assertEqual(exposure.instrument, "LSSTCam") 

1775 self.assertEqual(exposure.id, 2025120200439) 

1776 self.assertEqual(exposure.obs_id, "MC_O_20251202_000439") 

1777 original_exposure = source_butler.query_dimension_records( 

1778 "exposure", instrument="LSSTCam", exposure=exposure_id 

1779 )[0] 

1780 self.assertEqual(exposure.timespan, original_exposure.timespan) 

1781 

1782 group = _fetch_record("group") 

1783 self.assertEqual(group.instrument, "LSSTCam") 

1784 self.assertEqual(group.name, "2025-12-03T07:58:10.858") 

1785 

1786 visit_system_memberships = target_butler.query_dimension_records("visit_system_membership") 

1787 visit_system_memberships.sort(key=lambda record: record.visit_system) 

1788 self.assertEqual(len(visit_system_memberships), 2) 

1789 self.assertEqual(visit_system_memberships[0].visit_system, 0) 

1790 self.assertEqual(visit_system_memberships[1].visit_system, 2) 

1791 self.assertEqual(visit_system_memberships[0].visit, visit_id) 

1792 self.assertEqual(visit_system_memberships[1].visit, visit_id) 

1793 

1794 visit_systems = target_butler.query_dimension_records("visit_system") 

1795 visit_systems.sort(key=lambda record: record.id) 

1796 visit_system_memberships.sort(key=lambda record: record.visit_system) 

1797 self.assertEqual(visit_systems[0].id, 0) 

1798 self.assertEqual(visit_systems[1].id, 2) 

1799 self.assertEqual(visit_systems[0].name, "one-to-one") 

1800 self.assertEqual(visit_systems[1].name, "by-seq-start-end") 

1801 

1802 

1803class FileDatastoreButlerTests(ButlerTests): 

1804 """Common tests and specialization of ButlerTests for butlers backed 

1805 by datastores that inherit from FileDatastore. 

1806 """ 

1807 

1808 trustModeSupported = True 

1809 

1810 def checkFileExists(self, root: str | ResourcePath, relpath: str | ResourcePath) -> bool: 

1811 """Check if file exists at a given path (relative to root). 

1812 

1813 Test testPutTemplates verifies actual physical existance of the files 

1814 in the requested location. 

1815 """ 

1816 uri = ResourcePath(root, forceDirectory=True) 

1817 return uri.join(relpath).exists() 

1818 

1819 def testPutTemplates(self) -> None: 

1820 storageClass = self.storageClassFactory.getStorageClass("StructuredDataNoComponents") 

1821 butler = self.create_empty_butler(run=self.default_run) 

1822 

1823 # Add needed Dimensions 

1824 butler.registry.insertDimensionData("instrument", {"name": "DummyCamComp"}) 

1825 butler.registry.insertDimensionData( 

1826 "physical_filter", {"instrument": "DummyCamComp", "name": "d-r", "band": "R"} 

1827 ) 

1828 butler.registry.insertDimensionData("day_obs", {"instrument": "DummyCamComp", "id": 20250101}) 

1829 butler.registry.insertDimensionData( 

1830 "visit", 

1831 { 

1832 "instrument": "DummyCamComp", 

1833 "id": 423, 

1834 "name": "v423", 

1835 "physical_filter": "d-r", 

1836 "day_obs": 20250101, 

1837 }, 

1838 ) 

1839 butler.registry.insertDimensionData( 

1840 "visit", 

1841 { 

1842 "instrument": "DummyCamComp", 

1843 "id": 425, 

1844 "name": "v425", 

1845 "physical_filter": "d-r", 

1846 "day_obs": 20250101, 

1847 }, 

1848 ) 

1849 

1850 # Create and store a dataset 

1851 metric = makeExampleMetrics() 

1852 

1853 # Create two almost-identical DatasetTypes (both will use default 

1854 # template) 

1855 dimensions = butler.dimensions.conform(["instrument", "visit"]) 

1856 butler.registry.registerDatasetType(DatasetType("metric1", dimensions, storageClass)) 

1857 butler.registry.registerDatasetType(DatasetType("metric2", dimensions, storageClass)) 

1858 butler.registry.registerDatasetType(DatasetType("metric3", dimensions, storageClass)) 

1859 

1860 dataId1 = {"instrument": "DummyCamComp", "visit": 423} 

1861 dataId2 = {"instrument": "DummyCamComp", "visit": 423, "physical_filter": "d-r"} 

1862 

1863 # Put with exactly the data ID keys needed 

1864 ref = butler.put(metric, "metric1", dataId1) 

1865 uri = butler.getURI(ref) 

1866 self.assertTrue(uri.exists()) 

1867 self.assertTrue( 

1868 uri.unquoted_path.endswith(f"{self.default_run}/metric1/??#?/d-r/DummyCamComp_423.pickle") 

1869 ) 

1870 

1871 # Check the template based on dimensions 

1872 if hasattr(butler._datastore, "templates"): 

1873 butler._datastore.templates.validateTemplates([ref]) 

1874 

1875 # Put with extra data ID keys (physical_filter is an optional 

1876 # dependency); should not change template (at least the way we're 

1877 # defining them to behave now; the important thing is that they 

1878 # must be consistent). 

1879 ref = butler.put(metric, "metric2", dataId2) 

1880 uri = butler.getURI(ref) 

1881 self.assertTrue(uri.exists()) 

1882 self.assertTrue( 

1883 uri.unquoted_path.endswith(f"{self.default_run}/metric2/d-r/DummyCamComp_v423.pickle") 

1884 ) 

1885 

1886 # Check the template based on dimensions 

1887 if hasattr(butler._datastore, "templates"): 

1888 butler._datastore.templates.validateTemplates([ref]) 

1889 

1890 # Use a template that has a typo in dimension record metadata. 

1891 # Easier to test with a butler that has a ref with records attached. 

1892 template = FileTemplate("a/{visit.name}/{id}_{visit.namex:?}.fits") 

1893 with self.assertLogs("lsst.daf.butler.datastore.file_templates", "INFO"): 

1894 path = template.format(ref) 

1895 self.assertEqual(path, f"a/v423/{ref.id}_fits") 

1896 

1897 template = FileTemplate("a/{visit.name}/{id}_{visit.namex}.fits") 

1898 with self.assertRaises(KeyError): 

1899 with self.assertLogs("lsst.daf.butler.datastore.file_templates", "INFO"): 

1900 template.format(ref) 

1901 

1902 # Now use a file template that will not result in unique filenames 

1903 with self.assertRaises(FileTemplateValidationError): 

1904 butler.put(metric, "metric3", dataId1) 

1905 

1906 def testImportExport(self) -> None: 

1907 # Run put/get tests just to create and populate a repo. 

1908 storageClass = self.storageClassFactory.getStorageClass("StructuredDataNoComponents") 

1909 self.runImportExportTest(storageClass) 

1910 

1911 @unittest.expectedFailure 

1912 def testImportExportVirtualComposite(self) -> None: 

1913 # Run put/get tests just to create and populate a repo. 

1914 storageClass = self.storageClassFactory.getStorageClass("StructuredComposite") 

1915 self.runImportExportTest(storageClass) 

1916 

1917 def runImportExportTest(self, storageClass: StorageClass) -> None: 

1918 """Test exporting and importing. 

1919 

1920 This test does an export to a temp directory and an import back 

1921 into a new temp directory repo. It does not assume a posix datastore. 

1922 """ 

1923 exportButler = self.runPutGetTest(storageClass, "test_metric") 

1924 

1925 # Test that we must have a file extension. 

1926 with self.assertRaises(ValueError): 

1927 with exportButler.export(filename="dump", directory=".") as export: 

1928 pass 

1929 

1930 # Test that unknown format is not allowed. 

1931 with self.assertRaises(ValueError): 

1932 with exportButler.export(filename="dump.fits", directory=".") as export: 

1933 pass 

1934 

1935 # Test that the repo actually has at least one dataset. 

1936 datasets = list(exportButler.registry.queryDatasets(..., collections=...)) 

1937 self.assertGreater(len(datasets), 0) 

1938 # Add a DimensionRecord that's unused by those datasets. 

1939 skymapRecord = {"name": "example_skymap", "hash": (50).to_bytes(8, byteorder="little")} 

1940 exportButler.registry.insertDimensionData("skymap", skymapRecord) 

1941 # Export and then import datasets. 

1942 with safeTestTempDir(TESTDIR) as exportDir: 

1943 exportFile = os.path.join(exportDir, "exports.yaml") 

1944 with exportButler.export(filename=exportFile, directory=exportDir, transfer="auto") as export: 

1945 export.saveDatasets(datasets) 

1946 # Export the same datasets again. This should quietly do 

1947 # nothing because of internal deduplication, and it shouldn't 

1948 # complain about being asked to export the "htm7" elements even 

1949 # though there aren't any in these datasets or in the database. 

1950 export.saveDatasets(datasets, elements=["htm7"]) 

1951 # Save one of the data IDs again; this should be harmless 

1952 # because of internal deduplication. 

1953 export.saveDataIds([datasets[0].dataId]) 

1954 # Save some dimension records directly. 

1955 export.saveDimensionData("skymap", [skymapRecord]) 

1956 self.assertTrue(os.path.exists(exportFile)) 

1957 with safeTestTempDir(TESTDIR) as importDir: 

1958 # We always want this to be a local posix butler 

1959 Butler.makeRepo(importDir, config=Config(os.path.join(TESTDIR, "config/basic/butler.yaml"))) 

1960 # Calling script.butlerImport tests the implementation of the 

1961 # butler command line interface "import" subcommand. Functions 

1962 # in the script folder are generally considered protected and 

1963 # should not be used as public api. 

1964 with open(exportFile) as f: 

1965 script.butlerImport( 

1966 importDir, 

1967 export_file=f, 

1968 directory=exportDir, 

1969 transfer="auto", 

1970 skip_dimensions=None, 

1971 ) 

1972 importButler = Butler.from_config(importDir, run=self.default_run) 

1973 self.enterContext(importButler) 

1974 for ref in datasets: 

1975 with self.subTest(ref=repr(ref)): 

1976 # Test for existence by passing in the DatasetType and 

1977 # data ID separately, to avoid lookup by dataset_id. 

1978 self.assertTrue(importButler.exists(ref.datasetType, ref.dataId)) 

1979 self.assertEqual( 

1980 list(importButler.registry.queryDimensionRecords("skymap")), 

1981 [importButler.dimensions["skymap"].RecordClass(**skymapRecord)], 

1982 ) 

1983 

1984 def testRemoveRuns(self) -> None: 

1985 storageClass = self.storageClassFactory.getStorageClass("StructuredDataNoComponents") 

1986 butler = self.create_empty_butler(writeable=True) 

1987 # Load registry data with dimensions to hang datasets off of. 

1988 butler.import_(filename=ResourcePath("resource://lsst.daf.butler/tests/registry_data/base.yaml")) 

1989 # Add some RUN-type collection. 

1990 run1 = "run1" 

1991 butler.collections.register(run1) 

1992 run2 = "run2" 

1993 butler.collections.register(run2) 

1994 # put a dataset in each 

1995 metric = makeExampleMetrics() 

1996 dimensions = butler.dimensions.conform(["instrument", "physical_filter"]) 

1997 datasetType = self.addDatasetType( 

1998 "prune_collections_test_dataset", dimensions, storageClass, butler.registry 

1999 ) 

2000 ref1 = butler.put(metric, datasetType, {"instrument": "Cam1", "physical_filter": "Cam1-G"}, run=run1) 

2001 ref2 = butler.put(metric, datasetType, {"instrument": "Cam1", "physical_filter": "Cam1-G"}, run=run2) 

2002 uri1 = butler.getURI(ref1) 

2003 uri2 = butler.getURI(ref2) 

2004 

2005 # Put one of the runs in a chain. 

2006 butler.collections.register("Chain", CollectionType.CHAINED) 

2007 butler.collections.extend_chain("Chain", run1) 

2008 

2009 with self.assertRaises(OrphanedRecordError): 

2010 butler.registry.removeDatasetType(datasetType.name) 

2011 

2012 # Remove a non-run. 

2013 with self.assertRaises(TypeError): 

2014 butler.removeRuns(["Chain"]) 

2015 

2016 # Remove without unlinking from chain should fail. 

2017 with self.assertRaises(IntegrityError): 

2018 butler.removeRuns([run1]) 

2019 

2020 # Remove from both runs. No longer use unstore parameter since it 

2021 # always purges. 

2022 butler.removeRuns([run1, run2], unlink_from_chains=True) 

2023 

2024 # Should be nothing in registry for either one, and datastore should 

2025 # not think either exists. 

2026 with self.assertRaises(MissingCollectionError): 

2027 butler.collections.get_info(run1) 

2028 with self.assertRaises(MissingCollectionError): 

2029 butler.collections.get_info(run1) 

2030 self.assertFalse(butler.stored(ref1)) 

2031 self.assertFalse(butler.stored(ref2)) 

2032 # We always unstore so both URIs should be gone. 

2033 self.assertFalse(uri1.exists()) 

2034 self.assertFalse(uri2.exists()) 

2035 

2036 # Now that the collections have been pruned we can remove the 

2037 # dataset type 

2038 butler.registry.removeDatasetType(datasetType.name) 

2039 

2040 with self.assertLogs("lsst.daf.butler.registry", "INFO") as cm: 

2041 butler.registry.removeDatasetType(("test*", "test*")) 

2042 self.assertIn("not defined", "\n".join(cm.output)) 

2043 

2044 def remove_dataset_out_of_band(self, butler: Butler, ref: DatasetRef) -> None: 

2045 """Simulate an external actor removing a file outside of Butler's 

2046 knowledge. 

2047 

2048 Subclasses may override to handle more complicated datastore 

2049 configurations. 

2050 """ 

2051 uri = butler.getURI(ref) 

2052 uri.remove() 

2053 datastore = cast(FileDatastore, butler._datastore) 

2054 datastore.cacheManager.remove_from_cache(ref) 

2055 

2056 def testPruneDatasets(self) -> None: 

2057 storageClass = self.storageClassFactory.getStorageClass("StructuredDataNoComponents") 

2058 butler = self.create_empty_butler(writeable=True) 

2059 # Load registry data with dimensions to hang datasets off of. 

2060 butler.import_(filename=_get_test_data_path("base.yaml")) 

2061 # Add some RUN-type collections. 

2062 run1 = "run1" 

2063 butler.collections.register(run1) 

2064 run2 = "run2" 

2065 butler.collections.register(run2) 

2066 # put some datasets. ref1 and ref2 have the same data ID, and are in 

2067 # different runs. ref3 has a different data ID. 

2068 metric = makeExampleMetrics() 

2069 dimensions = butler.dimensions.conform(["instrument", "physical_filter"]) 

2070 datasetType = self.addDatasetType( 

2071 "prune_collections_test_dataset", dimensions, storageClass, butler.registry 

2072 ) 

2073 ref1 = butler.put(metric, datasetType, {"instrument": "Cam1", "physical_filter": "Cam1-G"}, run=run1) 

2074 ref2 = butler.put(metric, datasetType, {"instrument": "Cam1", "physical_filter": "Cam1-G"}, run=run2) 

2075 ref3 = butler.put(metric, datasetType, {"instrument": "Cam1", "physical_filter": "Cam1-R1"}, run=run1) 

2076 

2077 many_stored = butler.stored_many([ref1, ref2, ref3]) 

2078 for ref, stored in many_stored.items(): 

2079 self.assertTrue(stored, f"Ref {ref} should be stored") 

2080 

2081 many_exists = butler._exists_many([ref1, ref2, ref3]) 

2082 for ref, exists in many_exists.items(): 

2083 self.assertTrue(exists, f"Checking ref {ref} exists.") 

2084 self.assertEqual(exists, DatasetExistence.VERIFIED, f"Ref {ref} should be stored") 

2085 

2086 # Simple prune. 

2087 butler.pruneDatasets([ref1, ref2, ref3], purge=True, unstore=True) 

2088 self.assertFalse(butler.exists(ref1.datasetType, ref1.dataId, collections=run1)) 

2089 

2090 many_stored = butler.stored_many([ref1, ref2, ref3]) 

2091 for ref, stored in many_stored.items(): 

2092 self.assertFalse(stored, f"Ref {ref} should not be stored") 

2093 

2094 many_exists = butler._exists_many([ref1, ref2, ref3]) 

2095 for ref, exists in many_exists.items(): 

2096 self.assertEqual(exists, DatasetExistence.UNRECOGNIZED, f"Ref {ref} should not be stored") 

2097 

2098 # Put data back. 

2099 ref1_new = butler.put(metric, ref1) 

2100 self.assertEqual(ref1_new, ref1) # Reuses original ID. 

2101 ref2 = butler.put(metric, ref2) 

2102 

2103 many_stored = butler.stored_many([ref1, ref2, ref3]) 

2104 self.assertTrue(many_stored[ref1]) 

2105 self.assertTrue(many_stored[ref2]) 

2106 self.assertFalse(many_stored[ref3]) 

2107 

2108 ref3 = butler.put(metric, ref3) 

2109 

2110 many_exists = butler._exists_many([ref1, ref2, ref3]) 

2111 for ref, exists in many_exists.items(): 

2112 self.assertTrue(exists, f"Ref {ref} should not be stored") 

2113 

2114 # Clear out the datasets from registry and start again. 

2115 refs = [ref1, ref2, ref3] 

2116 butler.pruneDatasets(refs, purge=True, unstore=True) 

2117 for ref in refs: 

2118 butler.put(metric, ref) 

2119 

2120 # Confirm we can retrieve deferred. 

2121 dref1 = butler.getDeferred(ref1) # known and exists 

2122 metric1 = dref1.get() 

2123 self.assertEqual(metric1, metric) 

2124 

2125 # Test different forms of file availability. 

2126 # Need to be in a state where: 

2127 # - one ref just has registry record. 

2128 # - one ref has a missing file but a datastore record. 

2129 # - one ref has a missing datastore record but file is there. 

2130 # - one ref does not exist anywhere. 

2131 # Do not need to test a ref that has everything since that is tested 

2132 # above. 

2133 ref0 = DatasetRef( 

2134 datasetType, 

2135 DataCoordinate.standardize( 

2136 {"instrument": "Cam1", "physical_filter": "Cam1-G"}, universe=butler.dimensions 

2137 ), 

2138 run=run1, 

2139 ) 

2140 

2141 # Delete from datastore and retain in Registry. 

2142 butler.pruneDatasets([ref1], purge=False, unstore=True, disassociate=False) 

2143 

2144 # File has been removed. 

2145 self.remove_dataset_out_of_band(butler, ref2) 

2146 

2147 # Datastore has lost track. 

2148 butler._datastore.forget([ref3]) 

2149 

2150 # First test with a standard butler. 

2151 exists_many = butler._exists_many([ref0, ref1, ref2, ref3], full_check=True) 

2152 self.assertEqual(exists_many[ref0], DatasetExistence.UNRECOGNIZED) 

2153 self.assertEqual(exists_many[ref1], DatasetExistence.RECORDED) 

2154 self.assertEqual(exists_many[ref2], DatasetExistence.RECORDED | DatasetExistence.DATASTORE) 

2155 self.assertEqual(exists_many[ref3], DatasetExistence.RECORDED) 

2156 

2157 exists_many = butler._exists_many([ref0, ref1, ref2, ref3], full_check=False) 

2158 self.assertEqual(exists_many[ref0], DatasetExistence.UNRECOGNIZED) 

2159 self.assertEqual(exists_many[ref1], DatasetExistence.RECORDED | DatasetExistence._ASSUMED) 

2160 self.assertEqual(exists_many[ref2], DatasetExistence.KNOWN) 

2161 self.assertEqual(exists_many[ref3], DatasetExistence.RECORDED | DatasetExistence._ASSUMED) 

2162 self.assertTrue(exists_many[ref2]) 

2163 

2164 # Check that per-ref query gives the same answer as many query. 

2165 for ref, exists in exists_many.items(): 

2166 self.assertEqual(butler.exists(ref, full_check=False), exists) 

2167 

2168 # Get deferred checks for existence before it allows it to be 

2169 # retrieved. 

2170 with self.assertRaises(LookupError): 

2171 butler.getDeferred(ref3) # not known, file exists 

2172 dref2 = butler.getDeferred(ref2) # known but file missing 

2173 with self.assertRaises(FileNotFoundError): 

2174 dref2.get() 

2175 

2176 # Test again with a trusting butler. 

2177 if self.trustModeSupported: 

2178 butler._datastore.trustGetRequest = True 

2179 exists_many = butler._exists_many([ref0, ref1, ref2, ref3], full_check=True) 

2180 self.assertEqual(exists_many[ref0], DatasetExistence.UNRECOGNIZED) 

2181 self.assertEqual(exists_many[ref1], DatasetExistence.RECORDED) 

2182 self.assertEqual(exists_many[ref2], DatasetExistence.RECORDED | DatasetExistence.DATASTORE) 

2183 self.assertEqual(exists_many[ref3], DatasetExistence.RECORDED | DatasetExistence._ARTIFACT) 

2184 

2185 # When trusting we can get a deferred dataset handle that is not 

2186 # known but does exist. 

2187 dref3 = butler.getDeferred(ref3) 

2188 metric3 = dref3.get() 

2189 self.assertEqual(metric3, metric) 

2190 

2191 # Check that per-ref query gives the same answer as many query. 

2192 for ref, exists in exists_many.items(): 

2193 self.assertEqual(butler.exists(ref, full_check=True), exists) 

2194 

2195 # Create a ref that surprisingly has the UUID of an existing ref 

2196 # but is not the same. 

2197 ref_bad = DatasetRef(datasetType, dataId=ref3.dataId, run=ref3.run, id=ref2.id) 

2198 with self.assertRaises(ValueError): 

2199 butler.exists(ref_bad) 

2200 

2201 # Create a ref that has a compatible storage class. 

2202 ref_compat = ref2.overrideStorageClass("StructuredDataDict") 

2203 exists = butler.exists(ref_compat) 

2204 self.assertEqual(exists, exists_many[ref2]) 

2205 

2206 # Remove everything and start from scratch. 

2207 butler._datastore.trustGetRequest = False 

2208 butler.pruneDatasets(refs, purge=True, unstore=True) 

2209 for ref in refs: 

2210 butler.put(metric, ref) 

2211 

2212 # These tests mess directly with the trash table and can leave the 

2213 # datastore in an odd state. Do them at the end. 

2214 # Check that in normal mode, deleting the record will lead to 

2215 # trash not touching the file. 

2216 uri1 = butler.getURI(ref1) 

2217 butler._datastore.bridge.moveToTrash( 

2218 [ref1], transaction=None 

2219 ) # Update the dataset_location table 

2220 butler._datastore.forget([ref1]) 

2221 butler._datastore.trash(ref1) 

2222 butler._datastore.emptyTrash() 

2223 self.assertTrue(uri1.exists()) 

2224 uri1.remove() # Clean it up. 

2225 

2226 # Simulate execution butler setup by deleting the datastore 

2227 # record but keeping the file around and trusting. 

2228 butler._datastore.trustGetRequest = True 

2229 uris = butler.get_many_uris([ref2, ref3]) 

2230 uri2 = uris[ref2].primaryURI 

2231 uri3 = uris[ref3].primaryURI 

2232 self.assertTrue(uri2.exists()) 

2233 self.assertTrue(uri3.exists()) 

2234 

2235 # Remove the datastore record. 

2236 butler._datastore.bridge.moveToTrash( 

2237 [ref2], transaction=None 

2238 ) # Update the dataset_location table 

2239 butler._datastore.forget([ref2]) 

2240 self.assertTrue(uri2.exists()) 

2241 butler._datastore.trash([ref2, ref3]) 

2242 # Immediate removal for ref2 file 

2243 self.assertFalse(uri2.exists()) 

2244 # But ref3 has to wait for the empty. 

2245 self.assertTrue(uri3.exists()) 

2246 butler._datastore.emptyTrash() 

2247 self.assertFalse(uri3.exists()) 

2248 

2249 # Clear out the datasets from registry. 

2250 butler.pruneDatasets([ref1, ref2, ref3], purge=True, unstore=True) 

2251 

2252 def test_butler_metrics(self): 

2253 """Test that metrics are collected.""" 

2254 run = "test_run" 

2255 metrics = ButlerMetrics() 

2256 butler, datasetType = self.create_butler( 

2257 run, "MetricsExampleModelProvenance", "prov_metric", metrics=metrics 

2258 ) 

2259 data = MetricsExampleModel( 

2260 summary={"AM1": 5.2, "AM2": 30.6}, 

2261 output={"a": [1, 2, 3], "b": {"blue": 5, "red": "green"}}, 

2262 data=[563, 234, 456.7, 752, 8, 9, 27], 

2263 ) 

2264 

2265 data_ref = butler.put(data, datasetType, visit=424, instrument="DummyCamComp") 

2266 butler.get(data_ref) 

2267 butler.get(data_ref) 

2268 self.assertEqual(metrics.n_get, 2) 

2269 self.assertGreater(metrics.time_in_get, 0.0) 

2270 self.assertEqual(metrics.n_put, 1) 

2271 self.assertGreater(metrics.time_in_put, 0.0) 

2272 

2273 deferred = butler.getDeferred(data_ref) 

2274 deferred.get() 

2275 self.assertEqual(metrics.n_get, 3) 

2276 

2277 with butler.record_metrics() as new: 

2278 data_ref_2 = butler.put(data, datasetType, visit=425, instrument="DummyCamComp") 

2279 butler.get(data_ref) 

2280 

2281 butler.pruneDatasets([data_ref, data_ref_2], purge=True, unstore=True) 

2282 with ResourcePath.temporary_uri(suffix=".json") as tmpFile: 

2283 tmpFile.write(data.model_dump_json().encode()) 

2284 refs = [ 

2285 DatasetRef(datasetType, data_ref_2.dataId, run), 

2286 DatasetRef(datasetType, data_ref.dataId, run), 

2287 ] 

2288 datasets = [FileDataset(path=tmpFile, refs=refs)] 

2289 butler.ingest(*datasets, transfer="copy") 

2290 

2291 self.assertEqual(new.n_get, 1) 

2292 self.assertEqual(new.n_put, 1) 

2293 self.assertEqual(new.n_ingest, 2) 

2294 

2295 

class PosixDatastoreButlerTestCase(FileDatastoreButlerTests, unittest.TestCase):
    """Butler tests specialized to a POSIX file datastore."""

    configFile = os.path.join(TESTDIR, "config/basic/butler.yaml")
    fullConfigKey: str | None = ".datastore.formatters"
    validationCanFail = True
    datastoreStr = ["/tmp"]
    datastoreName = [f"FileDatastore@{BUTLER_ROOT_TAG}"]
    registryStr = "/gen3.sqlite3"

2305 

2306 def testPathConstructor(self) -> None: 

2307 """Independent test of constructor using PathLike.""" 

2308 butler = Butler.from_config(self.tmpConfigFile, run=self.default_run) 

2309 self.enterContext(butler) 

2310 self.assertIsInstance(butler, Butler) 

2311 

2312 # And again with a Path object with the butler yaml 

2313 path = pathlib.Path(self.tmpConfigFile) 

2314 butler = Butler.from_config(path, writeable=False) 

2315 self.enterContext(butler) 

2316 self.assertIsInstance(butler, Butler) 

2317 

2318 # And again with a Path object without the butler yaml 

2319 # (making sure we skip it if the tmp config doesn't end 

2320 # in butler.yaml -- which is the case for a subclass) 

2321 if self.tmpConfigFile.endswith("butler.yaml"): 

2322 path = pathlib.Path(os.path.dirname(self.tmpConfigFile)) 

2323 butler = Butler.from_config(path, writeable=False) 

2324 self.enterContext(butler) 

2325 self.assertIsInstance(butler, Butler) 

2326 

2327 def testExportTransferCopy(self) -> None: 

2328 """Test local export using all transfer modes""" 

2329 storageClass = self.storageClassFactory.getStorageClass("StructuredDataNoComponents") 

2330 exportButler = self.runPutGetTest(storageClass, "test_metric") 

2331 # Test that the repo actually has at least one dataset. 

2332 datasets = list(exportButler.registry.queryDatasets(..., collections=...)) 

2333 self.assertGreater(len(datasets), 0) 

2334 uris = [exportButler.getURI(d) for d in datasets] 

2335 assert isinstance(exportButler._datastore, FileDatastore) 

2336 datastoreRoot = exportButler.get_datastore_roots()[exportButler.get_datastore_names()[0]] 

2337 

2338 pathsInStore = [uri.relative_to(datastoreRoot) for uri in uris] 

2339 

2340 for path in pathsInStore: 

2341 # Assume local file system 

2342 assert path is not None 

2343 self.assertTrue(self.checkFileExists(datastoreRoot, path), f"Checking path {path}") 

2344 

2345 for transfer in ("copy", "link", "symlink", "relsymlink"): 

2346 with safeTestTempDir(TESTDIR) as exportDir: 

2347 with exportButler.export(directory=exportDir, format="yaml", transfer=transfer) as export: 

2348 export.saveDatasets(datasets) 

2349 for path in pathsInStore: 

2350 assert path is not None 

2351 self.assertTrue( 

2352 self.checkFileExists(exportDir, path), 

2353 f"Check that mode {transfer} exported files", 

2354 ) 

2355 

2356 def testPytypeCoercion(self) -> None: 

2357 """Test python type coercion on Butler.get and put.""" 

2358 # Store some data with the normal example storage class. 

2359 storageClass = self.storageClassFactory.getStorageClass("StructuredDataNoComponents") 

2360 datasetTypeName = "test_metric" 

2361 butler = self.runPutGetTest(storageClass, datasetTypeName) 

2362 

2363 dataId = {"instrument": "DummyCamComp", "visit": 423} 

2364 metric = butler.get(datasetTypeName, dataId=dataId) 

2365 self.assertEqual(get_full_type_name(metric), "lsst.daf.butler.tests.MetricsExample") 

2366 

2367 datasetType_ori = butler.get_dataset_type(datasetTypeName) 

2368 self.assertEqual(datasetType_ori.storageClass.name, "StructuredDataNoComponents") 

2369 

2370 # Now need to hack the registry dataset type definition. 

2371 # There is no API for this. 

2372 assert isinstance(butler._registry, SqlRegistry) 

2373 manager = butler._registry._managers.datasets 

2374 assert hasattr(manager, "_db") and hasattr(manager, "_static") 

2375 manager._db.update( 

2376 manager._static.dataset_type, 

2377 {"name": datasetTypeName}, 

2378 {datasetTypeName: datasetTypeName, "storage_class": "StructuredDataNoComponentsModel"}, 

2379 ) 

2380 

2381 # Force reset of dataset type cache 

2382 butler.registry.refresh() 

2383 

2384 datasetType_new = butler.get_dataset_type(datasetTypeName) 

2385 self.assertEqual(datasetType_new.name, datasetType_ori.name) 

2386 self.assertEqual(datasetType_new.storageClass.name, "StructuredDataNoComponentsModel") 

2387 

2388 metric_model = butler.get(datasetTypeName, dataId=dataId) 

2389 self.assertNotEqual(type(metric_model), type(metric)) 

2390 self.assertEqual(get_full_type_name(metric_model), "lsst.daf.butler.tests.MetricsExampleModel") 

2391 

2392 # Put the model and read it back to show that everything now 

2393 # works as normal. 

2394 metric_ref = butler.put(metric_model, datasetTypeName, dataId=dataId, visit=424) 

2395 metric_model_new = butler.get(metric_ref) 

2396 self.assertEqual(metric_model_new, metric_model) 

2397 

2398 # Hack the storage class again to something that will fail on the 

2399 # get with no conversion class. 

2400 manager._db.update( 

2401 manager._static.dataset_type, 

2402 {"name": datasetTypeName}, 

2403 {datasetTypeName: datasetTypeName, "storage_class": "StructuredDataListYaml"}, 

2404 ) 

2405 butler.registry.refresh() 

2406 

2407 with self.assertRaises(ValueError): 

2408 butler.get(datasetTypeName, dataId=dataId) 

2409 

2410 def test_provenance(self): 

2411 """Test that provenance is attached on put.""" 

2412 run = "test_run" 

2413 butler, datasetType = self.create_butler(run, "MetricsExampleModelProvenance", "prov_metric") 

2414 metric = MetricsExampleModel( 

2415 summary={"AM1": 5.2, "AM2": 30.6}, 

2416 output={"a": [1, 2, 3], "b": {"blue": 5, "red": "green"}}, 

2417 data=[563, 234, 456.7, 752, 8, 9, 27], 

2418 ) 

2419 # Provenance can be attached to the object being put. Whether 

2420 # it is or not is dependent on the formatter. For this test we 

2421 # copy on adding provenance to ensure they differ. 

2422 self.assertIsNone(metric.dataset_id) 

2423 metric_ref = butler.put(metric, datasetType, visit=424, instrument="DummyCamComp") 

2424 self.assertIsNone(metric.dataset_id) 

2425 metric_2 = butler.get(metric_ref) 

2426 self.assertEqual(metric_2.data, metric.data) 

2427 self.assertEqual(metric_2.dataset_id, metric_ref.id) 

2428 self.assertIsNone(metric_2.provenance) 

2429 

2430 # Put with provenance. 

2431 prov = DatasetProvenance(quantum_id=uuid.uuid4()) 

2432 prov.add_input(metric_ref) 

2433 prov.add_extra_provenance(metric_ref.id, {"answer": 42}) 

2434 metric_ref2 = butler.put(metric, datasetType, visit=423, instrument="DummyCamComp", provenance=prov) 

2435 metric_3 = butler.get(metric_ref2) 

2436 self.assertEqual(metric_3.provenance, prov) 

2437 

2438 # Check that we can extract provenance from dict form. 

2439 prov_dict = prov.to_flat_dict(metric_ref2) 

2440 prov_from_prov, ref_from_prov = DatasetProvenance.from_flat_dict(prov_dict, butler) 

2441 self.assertEqual(ref_from_prov, metric_ref2) 

2442 # Direct __eq__ of the provenance does not work because one side 

2443 # includes dimension records. 

2444 self.assertEqual({ref.id for ref in prov_from_prov.inputs}, {ref.id for ref in prov.inputs}) 

2445 self.assertEqual(prov_from_prov.quantum_id, prov.quantum_id) 

2446 self.assertEqual(prov_from_prov.extras, prov.extras) 

2447 

2448 # Force a bad ID into the dict. 

2449 prov_dict["id"] = uuid.uuid4() 

2450 with self.assertRaises(ValueError): 

2451 DatasetProvenance.from_flat_dict(prov_dict, butler) 

2452 del prov_dict["id"] 

2453 prov_dict["input 0 id"] = uuid.uuid4() 

2454 with self.assertRaises(ValueError): 

2455 DatasetProvenance.from_flat_dict(prov_dict, butler) 

2456 

2457 # Check that simple types can be reconstructed with non-standard 

2458 # separators. 

2459 prov_dict = prov.to_flat_dict(metric_ref2, prefix="XYZ", sep="😎", simple_types=True) 

2460 prov_from_prov, ref_from_prov = DatasetProvenance.from_flat_dict(prov_dict, butler) 

2461 self.assertEqual(ref_from_prov, metric_ref2) 

2462 self.assertEqual({ref.id for ref in prov_from_prov.inputs}, {ref.id for ref in prov.inputs}) 

2463 

2464 with self.assertRaises(ValueError): 

2465 DatasetProvenance.from_flat_dict({"unknown": 42}, butler) 

2466 

2467 def test_specialized_file_datasets_functions(self): 

2468 """Test a workflow used in Prompt Processing where we export datasets 

2469 from one repository and write them in-place to the datastore of 

2470 another, without immediately inserting registry entries for the 

2471 datasets. 

2472 """ 

2473 repo = MetricTestRepo.create_from_butler( 

2474 self.create_empty_butler(writeable=True), 

2475 self.tmpConfigFile, 

2476 "StructuredCompositeReadCompNoDisassembly", 

2477 ) 

2478 source_butler = repo.butler 

2479 

2480 # Test writing outputs to a FileDatastore. 

2481 with tempfile.TemporaryDirectory() as tempdir: 

2482 target_repo_config = Butler.makeRepo(tempdir) 

2483 refs = [repo.ref1, repo.ref2] 

2484 datasets = transfer_datasets_to_datastore(source_butler, ButlerConfig(target_repo_config), refs) 

2485 self.assertEqual(len(datasets), 2) 

2486 self.assertEqual({ref.id for ref in refs}, {dataset.refs[0].id for dataset in datasets}) 

2487 for dataset in datasets: 

2488 path = ResourcePath(dataset.path, forceAbsolute=False) 

2489 # Paths should be relative paths to the target datastore. 

2490 self.assertFalse(path.isabs()) 

2491 # Files should have been copied into the target datastore 

2492 self.assertTrue(ResourcePath(tempdir).join(path).exists()) 

2493 

2494 # Make sure the target Butler can ingest the datasets. 

2495 target_butler = Butler(target_repo_config, writeable=True) 

2496 self.enterContext(target_butler) 

2497 target_butler.transfer_dimension_records_from(source_butler, refs) 

2498 target_butler.ingest(*datasets, transfer=None) 

2499 self.assertIsNotNone(target_butler.get(repo.ref1)) 

2500 self.assertIsNotNone(target_butler.get(repo.ref2)) 

2501 

2502 # Giving an empty list of files is a no-op. 

2503 no_datasets = transfer_datasets_to_datastore(source_butler, ButlerConfig(target_repo_config), []) 

2504 self.assertEqual(len(no_datasets), 0) 

2505 

2506 # Test writing outputs to a ChainedDatastore. 

2507 with tempfile.TemporaryDirectory() as tempdir: 

2508 # Set up a second dataset type, so we can split the files across 

2509 # multiple datastore roots. 

2510 dt1 = repo.datasetType 

2511 dt2 = DatasetType("other", dt1.dimensions, dt1.storageClass) 

2512 source_butler.registry.registerDatasetType(dt2) 

2513 other_ref = repo.addDataset(repo.ref1.dataId, datasetType=dt2) 

2514 config = Config.fromString( 

2515 f""" 

2516 datastore: 

2517 cls: lsst.daf.butler.datastores.chainedDatastore.ChainedDatastore 

2518 datastore_constraints: 

2519 - constraints: 

2520 accept: 

2521 - {dt1.name} 

2522 - constraints: 

2523 accept: 

2524 - {dt2.name} 

2525 datastores: 

2526 - datastore: 

2527 cls: lsst.daf.butler.datastores.fileDatastore.FileDatastore 

2528 root: <butlerRoot>/FileDatastore_0 

2529 - datastore: 

2530 cls: lsst.daf.butler.datastores.fileDatastore.FileDatastore 

2531 root: <butlerRoot>/FileDatastore_1 

2532 """ 

2533 ) 

2534 target_repo_config = Butler.makeRepo(tempdir, config) 

2535 refs = [repo.ref1, repo.ref2, other_ref] 

2536 datasets = transfer_datasets_to_datastore(source_butler, ButlerConfig(target_repo_config), refs) 

2537 self.assertEqual(len(datasets), 3) 

2538 self.assertEqual({ref.id for ref in refs}, {dataset.refs[0].id for dataset in datasets}) 

2539 for dataset in datasets: 

2540 path = ResourcePath(dataset.path, forceAbsolute=False) 

2541 # Paths should be relative paths to the target datastore. 

2542 self.assertFalse(path.isabs()) 

2543 # Files should have been split up between the two datastores 

2544 # in the chain. 

2545 datastore_root = ResourcePath(tempdir) 

2546 if dataset.refs[0].datasetType.name == dt1.name: 

2547 datastore_root = datastore_root.join("FileDatastore_0") 

2548 else: 

2549 datastore_root = datastore_root.join("FileDatastore_1") 

2550 self.assertTrue(datastore_root.join(path).exists()) 

2551 

2552 # Make sure the target Butler can ingest the datasets. 

2553 target_butler = Butler(target_repo_config, writeable=True) 

2554 self.enterContext(target_butler) 

2555 target_butler.transfer_dimension_records_from(source_butler, refs) 

2556 target_butler.ingest(*datasets, transfer=None) 

2557 self.assertIsNotNone(target_butler.get(repo.ref1)) 

2558 self.assertIsNotNone(target_butler.get(repo.ref2)) 

2559 self.assertIsNotNone(target_butler.get(other_ref)) 

2560 

2561 def test_temporary_for_ingest(self) -> None: 

2562 """Test the `lsst.daf.butler._rubin.ingest_from_temporary` module.""" 

2563 with self.create_empty_butler("example_run") as butler: 

2564 dataset_type = DatasetType("example", butler.dimensions.empty, "StructuredDataDict") 

2565 butler.registry.registerDatasetType(dataset_type) 

2566 ref = DatasetRef(dataset_type, DataCoordinate.make_empty(butler.dimensions), "example_run") 

2567 with TemporaryForIngest(butler, ref) as temporary: 

2568 temporary.path.write(b"three: 3") 

2569 found = TemporaryForIngest.find_orphaned_temporaries_by_ref(ref, butler) 

2570 self.assertEqual(found, [temporary.path]) 

2571 self.assertIn(".tmp", temporary.ospath) 

2572 temporary.ingest() 

2573 loaded = butler.get(ref) 

2574 self.assertEqual(loaded, {"three": 3}) 

2575 

2576 

class PostgresPosixDatastoreButlerTestCase(FileDatastoreButlerTests, unittest.TestCase):
    """POSIX-datastore butler tests backed by a Postgres registry."""

    configFile = os.path.join(TESTDIR, "config/basic/butler.yaml")
    fullConfigKey = ".datastore.formatters"
    validationCanFail = True
    datastoreStr = ["/tmp"]
    datastoreName = [f"FileDatastore@{BUTLER_ROOT_TAG}"]
    registryStr = "PostgreSQL@test"

    @classmethod
    def setUpClass(cls) -> None:
        cls.postgresql = cls.enterClassContext(setup_postgres_test_db())
        super().setUpClass()

    def setUp(self) -> None:
        # The stock config has no Postgres registry section, so patch one in
        # and point ``configFile`` at a temporary copy for the duration of
        # the test.
        self._temp_config = False
        config = Config(self.configFile)
        self.postgresql.patch_butler_config(config)
        with tempfile.NamedTemporaryFile("w", suffix=".yaml", delete=False) as stream:
            config.dump(stream)
            self.configFile = stream.name
            self._temp_config = True
        super().setUp()

    def tearDown(self) -> None:
        # Remove the temporary patched config written by setUp.
        if self._temp_config and os.path.exists(self.configFile):
            os.remove(self.configFile)
        super().tearDown()

    def testMakeRepo(self) -> None:
        # The base class test assumes that it's using sqlite and assumes
        # the config file is acceptable to sqlite.
        raise unittest.SkipTest("Postgres config is not compatible with this test.")

2613 

class ClonedPostgresPosixDatastoreButlerTestCase(PostgresPosixDatastoreButlerTestCase, unittest.TestCase):
    """Test that Butler with a Postgres registry still works after cloning."""

    def create_butler(
        self,
        run: str,
        storageClass: StorageClass | str,
        datasetTypeName: str,
        metrics: ButlerMetrics | None = None,
    ) -> tuple[DirectButler, DatasetType]:
        # Build the butler normally, then hand back a clone so every test in
        # the parent class exercises the cloned instance instead.
        original, datasetType = super().create_butler(run, storageClass, datasetTypeName, metrics=metrics)
        return original.clone(run=run, metrics=metrics), datasetType

2627 

class InMemoryDatastoreButlerTestCase(ButlerTests, unittest.TestCase):
    """Butler tests specialized to the in-memory datastore."""

    configFile = os.path.join(TESTDIR, "config/basic/butler-inmemory.yaml")
    fullConfigKey = None
    useTempRoot = False
    validationCanFail = False
    datastoreStr = ["datastore='InMemory"]
    datastoreName = ["InMemoryDatastore@"]
    registryStr = "/gen3.sqlite3"

    def testIngest(self) -> None:
        # File ingest does not apply to an in-memory datastore.
        pass

    def test_ingest_zip(self) -> None:
        # Zip ingest does not apply to an in-memory datastore.
        pass

2645 

class ClonedSqliteButlerTestCase(InMemoryDatastoreButlerTestCase, unittest.TestCase):
    """Test that a Butler with a Sqlite registry still works after cloning."""

    def create_butler(
        self,
        run: str,
        storageClass: StorageClass | str,
        datasetTypeName: str,
        metrics: ButlerMetrics | None = None,
    ) -> tuple[DirectButler, DatasetType]:
        # Build the butler normally, then substitute a clone so the parent
        # tests run against the cloned instance.
        original, datasetType = super().create_butler(run, storageClass, datasetTypeName, metrics=metrics)
        return original.clone(run=run), datasetType

2659 

class ChainedDatastoreButlerTestCase(FileDatastoreButlerTests, unittest.TestCase):
    """Butler tests specialized to a chained datastore configuration."""

    configFile = os.path.join(TESTDIR, "config/basic/butler-chained.yaml")
    fullConfigKey = ".datastore.datastores.1.formatters"
    validationCanFail = True
    datastoreStr = ["datastore='InMemory", "/FileDatastore_1/,", "/FileDatastore_2/'"]
    datastoreName = [
        "InMemoryDatastore@",
        f"FileDatastore@{BUTLER_ROOT_TAG}/FileDatastore_1",
        "SecondDatastore",
    ]
    registryStr = "/gen3.sqlite3"

    def testPruneDatasets(self) -> None:
        # This test relies on manipulating files out-of-band, which is
        # impossible for this configuration because of the InMemoryDatastore
        # in the ChainedDatastore.
        pass

2680 

class ButlerExplicitRootTestCase(PosixDatastoreButlerTestCase):
    """Test that a yaml file in one location can refer to a root in another."""

    datastoreStr = ["dir1"]
    # Disable the makeRepo test since we are deliberately not using
    # butler.yaml as the config name.
    fullConfigKey = None

    def setUp(self) -> None:
        self.root = makeTestTempDir(TESTDIR)

        # Create the repository itself in one directory...
        self.dir1 = os.path.join(self.root, "dir1")
        Butler.makeRepo(self.dir1, config=Config(self.configFile))

        # ...then relocate the yaml file elsewhere, recording the original
        # location in its "root" key, and delete the original copy.
        self.dir2 = os.path.join(self.root, "dir2")
        os.makedirs(self.dir2, exist_ok=True)
        configFile1 = os.path.join(self.dir1, "butler.yaml")
        config = Config(configFile1)
        config["root"] = self.dir1
        configFile2 = os.path.join(self.dir2, "butler2.yaml")
        config.dumpToUri(configFile2)
        os.remove(configFile1)
        self.tmpConfigFile = configFile2

    def testFileLocations(self) -> None:
        # Config lives in dir2, while the registry database stays in dir1.
        self.assertNotEqual(self.dir1, self.dir2)
        self.assertTrue(os.path.exists(os.path.join(self.dir2, "butler2.yaml")))
        self.assertFalse(os.path.exists(os.path.join(self.dir1, "butler.yaml")))
        self.assertTrue(os.path.exists(os.path.join(self.dir1, "gen3.sqlite3")))

2713 

class ButlerMakeRepoOutfileTestCase(ButlerPutGetTests, unittest.TestCase):
    """Test that a config file created by makeRepo outside of repo works."""

    configFile = os.path.join(TESTDIR, "config/basic/butler.yaml")

    def setUp(self) -> None:
        # Repo in one temp dir, its config written to a second one.
        self.root = makeTestTempDir(TESTDIR)
        self.root2 = makeTestTempDir(TESTDIR)

        self.tmpConfigFile = os.path.join(self.root2, "different.yaml")
        Butler.makeRepo(self.root, config=Config(self.configFile), outfile=self.tmpConfigFile)

    def tearDown(self) -> None:
        if os.path.exists(self.root2):
            shutil.rmtree(self.root2, ignore_errors=True)
        super().tearDown()

    def testConfigExistence(self) -> None:
        c = Config(self.tmpConfigFile)
        uri_config = ResourcePath(c["root"])
        uri_expected = ResourcePath(self.root, forceDirectory=True)
        self.assertEqual(uri_config.geturl(), uri_expected.geturl())
        self.assertNotIn(":", uri_config.path, "Check for URI concatenated with normal path")

    def testPutGet(self) -> None:
        storageClass = self.storageClassFactory.getStorageClass("StructuredDataNoComponents")
        self.runPutGetTest(storageClass, "test_metric")

2742 

class ButlerMakeRepoOutfileDirTestCase(ButlerMakeRepoOutfileTestCase):
    """Test makeRepo writing its config into a bare output directory."""

    configFile = os.path.join(TESTDIR, "config/basic/butler.yaml")

    def setUp(self) -> None:
        self.root = makeTestTempDir(TESTDIR)
        self.root2 = makeTestTempDir(TESTDIR)

        # Hand makeRepo a directory rather than a file name.
        self.tmpConfigFile = self.root2
        Butler.makeRepo(self.root, config=Config(self.configFile), outfile=self.tmpConfigFile)

    def testConfigExistence(self) -> None:
        # Append the yaml file else Config constructor does not know the
        # file type.
        self.tmpConfigFile = os.path.join(self.tmpConfigFile, "butler.yaml")
        super().testConfigExistence()

2761 

class ButlerMakeRepoOutfileUriTestCase(ButlerMakeRepoOutfileTestCase):
    """Test makeRepo writing its config to a URI outside the repo."""

    configFile = os.path.join(TESTDIR, "config/basic/butler.yaml")

    def setUp(self) -> None:
        self.root = makeTestTempDir(TESTDIR)
        self.root2 = makeTestTempDir(TESTDIR)

        # Pass the outfile as a file:// URL rather than a plain path.
        self.tmpConfigFile = ResourcePath(os.path.join(self.root2, "something.yaml")).geturl()
        Butler.makeRepo(self.root, config=Config(self.configFile), outfile=self.tmpConfigFile)

2774 

@unittest.skipIf(not boto3, "Warning: boto3 AWS SDK not found!")
class S3DatastoreButlerTestCase(FileDatastoreButlerTests, unittest.TestCase):
    """S3Datastore specialization of a butler; an S3 storage Datastore +
    a local in-memory SqlRegistry.
    """

    configFile = os.path.join(TESTDIR, "config/basic/butler-s3store.yaml")
    fullConfigKey = None
    validationCanFail = True

    bucketName = "anybucketname"
    """Name of the Bucket that will be used in the tests. The name is read
    from the config file used with the tests during set-up.
    """

    root = "butlerRoot/"
    """Root repository directory expected to be used in case
    useTempRoot=False. Otherwise the root is set to a 20 characters long
    randomly generated string during set-up.
    """

    datastoreStr = [f"datastore={root}"]
    """Contains all expected root locations in a format expected to be
    returned by Butler stringification.
    """

    datastoreName = ["FileDatastore@s3://{bucketName}/{root}"]
    """The expected format of the S3 Datastore string."""

    registryStr = "/gen3.sqlite3"
    """Expected format of the Registry string."""

    mock_aws = mock_aws()
    """The mocked s3 interface from moto."""

    def genRoot(self) -> str:
        """Return a random string of len 20 to serve as a root
        name for the temporary bucket repo.

        This is equivalent to tempfile.mkdtemp as this is what self.root
        becomes when useTempRoot is True.
        """
        alphabet = string.ascii_uppercase + string.digits
        return "".join(random.choice(alphabet) for _ in range(20)) + "/"

    def setUp(self) -> None:
        config = Config(self.configFile)
        uri = ResourcePath(config[".datastore.datastore.root"])
        self.bucketName = uri.netloc

        # Enable S3 mocking for the duration of the test.
        self.enterContext(clean_test_environment_for_s3())
        self.mock_aws.start()

        if self.useTempRoot:
            self.root = self.genRoot()
        rooturi = f"s3://{self.bucketName}/{self.root}"
        config.update({"datastore": {"datastore": {"root": rooturi}}})

        # The registry database still needs a local folder.
        self.reg_dir = makeTestTempDir(TESTDIR)
        config["registry", "db"] = f"sqlite:///{self.reg_dir}/gen3.sqlite3"

        # MOTO needs to know that we expect Bucket bucketname to exist
        # (this used to be the class attribute bucketName).
        s3 = boto3.resource("s3")
        s3.create_bucket(Bucket=self.bucketName)

        self.datastoreStr = [f"datastore='{rooturi}'"]
        self.datastoreName = [f"FileDatastore@{rooturi}"]
        Butler.makeRepo(rooturi, config=config, forceConfigRoot=False)
        self.tmpConfigFile = posixpath.join(rooturi, "butler.yaml")

    def tearDown(self) -> None:
        s3 = boto3.resource("s3")
        bucket = s3.Bucket(self.bucketName)
        try:
            bucket.objects.all().delete()
        except botocore.exceptions.ClientError as e:
            # A 404 simply means the key was never created; anything else
            # is a real failure.
            if e.response["Error"]["Code"] != "404":
                raise

        bucket = s3.Bucket(self.bucketName)
        bucket.delete()

        # Stop the S3 mock.
        self.mock_aws.stop()

        if self.reg_dir is not None and os.path.exists(self.reg_dir):
            shutil.rmtree(self.reg_dir, ignore_errors=True)

        if self.useTempRoot and os.path.exists(self.root):
            shutil.rmtree(self.root, ignore_errors=True)

        super().tearDown()

2874 

class DatastoreTransfers(TestCaseMixin):
    """Base test setup for data transfers between butlers. The concrete tests
    for specific configurations are in other classes, below.
    """

    # Shared storage-class factory, created once per class in setUpClass.
    storageClassFactory: StorageClassFactory

2881 

2882 @classmethod 

2883 def setUpClass(cls) -> None: 

2884 cls.storageClassFactory = StorageClassFactory() 

2885 

2886 def setUp(self) -> None: 

2887 self.root = makeTestTempDir(TESTDIR) 

2888 self.config = Config(self.configFile) 

2889 

2890 # Some tests cause convertors to be replaced so ensure 

2891 # the storage class factory is reset each time. 

2892 self.storageClassFactory.reset() 

2893 self.storageClassFactory.addFromConfig(self.configFile) 

2894 

2895 def tearDown(self) -> None: 

2896 removeTestTempDir(self.root) 

2897 

2898 def create_butler(self, manager: str | None, label: str, config_file: str | None = None) -> Butler: 

2899 if manager is None: 

2900 manager = ( 

2901 "lsst.daf.butler.registry.datasets.byDimensions.ByDimensionsDatasetRecordStorageManagerUUID" 

2902 ) 

2903 config = Config(config_file if config_file is not None else self.configFile) 

2904 config["registry", "managers", "datasets"] = manager 

2905 butler = Butler.from_config( 

2906 Butler.makeRepo(f"{self.root}/butler{label}", config=config), writeable=True 

2907 ) 

2908 self.enterContext(butler) 

2909 return butler 

2910 

2911 def assertButlerTransfers( 

2912 self, 

2913 purge: bool = False, 

2914 storageClassName: str = "StructuredData", 

2915 storageClassNameTarget: str | None = None, 

2916 ) -> None: 

2917 """Test that a run can be transferred to another butler.""" 

2918 storageClass = self.storageClassFactory.getStorageClass(storageClassName) 

2919 if storageClassNameTarget is not None: 

2920 storageClassTarget = self.storageClassFactory.getStorageClass(storageClassNameTarget) 

2921 else: 

2922 storageClassTarget = storageClass 

2923 

2924 datasetTypeName = "random_data" 

2925 

2926 # Test will create 3 collections and we will want to transfer 

2927 # two of those three. 

2928 runs = ["run1", "run2", "other"] 

2929 

2930 # Also want to use two different dataset types to ensure that 

2931 # grouping works. 

2932 datasetTypeNames = ["random_data", "random_data_2"] 

2933 

2934 # Create the run collections in the source butler. 

2935 for run in runs: 

2936 self.source_butler.collections.register(run) 

2937 

2938 # Create dimensions in source butler. 

2939 n_exposures = 30 

2940 self.source_butler.registry.insertDimensionData("instrument", {"name": "DummyCamComp"}) 

2941 self.source_butler.registry.insertDimensionData( 

2942 "physical_filter", {"instrument": "DummyCamComp", "name": "d-r", "band": "R"} 

2943 ) 

2944 self.source_butler.registry.insertDimensionData( 

2945 "detector", {"instrument": "DummyCamComp", "id": 1, "full_name": "det1"} 

2946 ) 

2947 self.source_butler.registry.insertDimensionData( 

2948 "day_obs", 

2949 { 

2950 "instrument": "DummyCamComp", 

2951 "id": 20250101, 

2952 }, 

2953 ) 

2954 

2955 for i in range(n_exposures): 

2956 self.source_butler.registry.insertDimensionData( 

2957 "group", {"instrument": "DummyCamComp", "name": f"group{i}"} 

2958 ) 

2959 self.source_butler.registry.insertDimensionData( 

2960 "exposure", 

2961 { 

2962 "instrument": "DummyCamComp", 

2963 "id": i, 

2964 "obs_id": f"exp{i}", 

2965 "physical_filter": "d-r", 

2966 "group": f"group{i}", 

2967 "day_obs": 20250101, 

2968 }, 

2969 ) 

2970 

2971 # Create dataset types in the source butler. 

2972 dimensions = self.source_butler.dimensions.conform(["instrument", "exposure"]) 

2973 for datasetTypeName in datasetTypeNames: 

2974 datasetType = DatasetType(datasetTypeName, dimensions, storageClass) 

2975 self.source_butler.registry.registerDatasetType(datasetType) 

2976 

2977 # Write a dataset to an unrelated run -- this will ensure that 

2978 # we are rewriting integer dataset ids in the target if necessary. 

2979 # Will not be relevant for UUID. 

2980 run = "distraction" 

2981 butler = Butler.from_config(butler=self.source_butler, run=run) 

2982 self.enterContext(butler) 

2983 butler.put( 

2984 makeExampleMetrics(), 

2985 datasetTypeName, 

2986 exposure=1, 

2987 instrument="DummyCamComp", 

2988 physical_filter="d-r", 

2989 ) 

2990 

2991 # Write some example metrics to the source 

2992 butler = Butler.from_config(butler=self.source_butler) 

2993 self.enterContext(butler) 

2994 

2995 # Set of DatasetRefs that should be in the list of refs to transfer 

2996 # but which will not be transferred. 

2997 deleted: set[DatasetRef] = set() 

2998 

2999 n_expected = 20 # Number of datasets expected to be transferred 

3000 source_refs = [] 

3001 for i in range(n_exposures): 

3002 # Put a third of datasets into each collection, only retain 

3003 # two thirds. 

3004 index = i % 3 

3005 run = runs[index] 

3006 datasetTypeName = datasetTypeNames[i % 2] 

3007 

3008 metric = MetricsExample( 

3009 summary={"counter": i}, output={"text": "metric"}, data=[2 * x for x in range(i)] 

3010 ) 

3011 dataId = {"exposure": i, "instrument": "DummyCamComp", "physical_filter": "d-r"} 

3012 ref = butler.put(metric, datasetTypeName, dataId=dataId, run=run) 

3013 

3014 # Remove the datastore record using low-level API, but only 

3015 # for a specific index. 

3016 if purge and index == 1: 

3017 # For one of these delete the file as well. 

3018 # This allows the "missing" code to filter the 

3019 # file out. 

3020 # Access the individual datastores. 

3021 datastores = [] 

3022 if hasattr(butler._datastore, "datastores"): 

3023 datastores.extend(butler._datastore.datastores) 

3024 else: 

3025 datastores.append(butler._datastore) 

3026 

3027 if not deleted: 

3028 # For a chained datastore we need to remove 

3029 # files in each chain. 

3030 for datastore in datastores: 

3031 # The file might not be known to the datastore 

3032 # if constraints are used. 

3033 try: 

3034 primary, uris = datastore.getURIs(ref) 

3035 except FileNotFoundError: 

3036 continue 

3037 if primary and primary.scheme != "mem": 

3038 primary.remove() 

3039 for uri in uris.values(): 

3040 if uri.scheme != "mem": 

3041 uri.remove() 

3042 n_expected -= 1 

3043 deleted.add(ref) 

3044 

3045 # Remove the datastore record. 

3046 for datastore in datastores: 

3047 if hasattr(datastore, "removeStoredItemInfo"): 

3048 datastore.removeStoredItemInfo(ref) 

3049 

3050 if index < 2: 

3051 source_refs.append(ref) 

3052 if ref not in deleted: 

3053 new_metric = butler.get(ref) 

3054 self.assertEqual(new_metric, metric) 

3055 

3056 # Create some bad dataset types to ensure we check for inconsistent 

3057 # definitions. 

3058 badStorageClass = self.storageClassFactory.getStorageClass("StructuredDataList") 

3059 for datasetTypeName in datasetTypeNames: 

3060 datasetType = DatasetType(datasetTypeName, dimensions, badStorageClass) 

3061 self.target_butler.registry.registerDatasetType(datasetType) 

3062 with self.assertRaises(ConflictingDefinitionError) as cm: 

3063 self.target_butler.transfer_from(self.source_butler, source_refs) 

3064 self.assertIn("dataset type differs", str(cm.exception)) 

3065 

3066 # And remove the bad definitions. 

3067 for datasetTypeName in datasetTypeNames: 

3068 self.target_butler.registry.removeDatasetType(datasetTypeName) 

3069 

3070 # Transfer without creating dataset types should fail. 

3071 with self.assertRaises(KeyError): 

3072 self.target_butler.transfer_from(self.source_butler, source_refs) 

3073 

3074 # Transfer without creating dimensions should fail. 

3075 with self.assertRaises(ConflictingDefinitionError) as cm: 

3076 self.target_butler.transfer_from(self.source_butler, source_refs, register_dataset_types=True) 

3077 self.assertIn("dimension", str(cm.exception)) 

3078 

3079 # The dry run test requires dataset types to exist. If we have 

3080 # been given distinct storage classes for the target we have 

3081 # to redefine at least one of the dataset types in the target butler. 

3082 if storageClass != storageClassTarget: 

3083 self.target_butler.registry.removeDatasetType(datasetTypeNames[0]) 

3084 datasetType = DatasetType(datasetTypeNames[0], dimensions, storageClassTarget) 

3085 self.target_butler.registry.registerDatasetType(datasetType) 

3086 

3087 # The failed transfer above leaves registry in an inconsistent 

3088 # state because the run is created but then rolled back without 

3089 # the collection cache being cleared. For now force a refresh. 

3090 # Can remove with DM-35498. 

3091 self.target_butler.registry.refresh() 

3092 

3093 # Do a dry run -- this should not have any effect on the target butler. 

3094 self.target_butler.transfer_from(self.source_butler, source_refs, dry_run=True) 

3095 

3096 # Transfer the records for one ref to test the alternative API. 

3097 with self.assertLogs(logger="lsst", level=logging.DEBUG) as log_cm: 

3098 self.target_butler.transfer_dimension_records_from(self.source_butler, [source_refs[0]]) 

3099 self.assertIn("number of records transferred: 1", ";".join(log_cm.output)) 

3100 

3101 # Now transfer them to the second butler, including dimensions. 

3102 with self.assertLogs(logger="lsst", level=logging.DEBUG) as log_cm: 

3103 transferred = self.target_butler.transfer_from( 

3104 self.source_butler, 

3105 source_refs, 

3106 register_dataset_types=True, 

3107 transfer_dimensions=True, 

3108 ) 

3109 self.assertEqual(len(transferred), n_expected) 

3110 log_output = ";".join(log_cm.output) 

3111 

3112 # A ChainedDatastore will use the in-memory datastore for mexists 

3113 # so we can not rely on the mexists log message. 

3114 self.assertIn("Number of datastore records found in source", log_output) 

3115 self.assertIn("Creating output run", log_output) 

3116 

3117 # Do the transfer twice to ensure that it will do nothing extra. 

3118 # Only do this if purge=True because it does not work for int 

3119 # dataset_id. 

3120 if purge: 

3121 # This should not need to register dataset types. 

3122 transferred = self.target_butler.transfer_from(self.source_butler, source_refs) 

3123 self.assertEqual(len(transferred), n_expected) 

3124 

3125 with self.assertRaises((TypeError, AttributeError)): 

3126 self.target_butler._datastore.transfer_from(self.source_butler, source_refs) # type: ignore 

3127 

3128 with self.assertRaises(ValueError): 

3129 self.target_butler._datastore.transfer_from( 

3130 self.source_butler._datastore, source_refs, transfer="split" 

3131 ) 

3132 

3133 # Now try to get the same refs from the new butler. 

3134 for ref in source_refs: 

3135 if ref not in deleted: 

3136 new_metric = self.target_butler.get(ref) 

3137 old_metric = self.source_butler.get(ref) 

3138 self.assertEqual(new_metric, old_metric) 

3139 

3140 # Try again without implicit storage class conversion 

3141 # triggered by using the source ref. This will do conversion 

3142 # since the formatter will be returning the source python type. 

3143 target_ref = self.target_butler.get_dataset(ref.id) 

3144 if target_ref.datasetType.storageClass != ref.datasetType.storageClass: 

3145 new_metric = self.target_butler.get(target_ref) 

3146 self.assertNotEqual(type(new_metric), type(old_metric)) 

3147 

3148 # Remove the dataset from the target and put it again 

3149 # as if it was the right type all along for this butler. 

3150 self.target_butler.pruneDatasets( 

3151 [target_ref], unstore=True, purge=True, disassociate=True 

3152 ) 

3153 self.target_butler.put(new_metric, target_ref) 

3154 new_new_metric = self.target_butler.get(target_ref) 

3155 new_old_metric = self.target_butler.get( 

3156 target_ref, storageClass=ref.datasetType.storageClass 

3157 ) 

3158 self.assertEqual(new_new_metric, new_metric) 

3159 self.assertEqual(new_old_metric, old_metric) 

3160 

3161 # Now prune run2 collection and create instead a CHAINED collection. 

3162 # This should block the transfer. 

3163 self.target_butler.removeRuns(["run2"]) 

3164 self.target_butler.collections.register("run2", CollectionType.CHAINED) 

3165 with self.assertRaises(CollectionTypeError): 

3166 # Re-importing the run1 datasets can be problematic if they 

3167 # use integer IDs so filter those out. 

3168 to_transfer = [ref for ref in source_refs if ref.run == "run2"] 

3169 self.target_butler.transfer_from(self.source_butler, to_transfer) 

3170 

3171 

class PosixDatastoreTransfers(DatastoreTransfers, unittest.TestCase):
    """Test data transfers between butlers.

    Test for different managers. UUID to UUID and integer to integer are
    tested. UUID to integer is not supported since we do not currently
    want to allow that. Integer to UUID is supported with the caveat
    that UUID4 will be generated and this will be incorrect for raw
    dataset types. The test ignores that.
    """

    # Repository configuration shared by source and target butlers.
    configFile = os.path.join(TESTDIR, "config/basic/butler.yaml")

    def create_butlers(
        self, manager1: str | None = None, manager2: str | None = None, source_config: str | None = None
    ) -> None:
        # Build the source/target butler pair used by every test below.
        # The source may use a different repo configuration (e.g. chained
        # or in-memory datastores) via source_config.
        self.source_butler = self.create_butler(manager1, "1", config_file=source_config)
        self.target_butler = self.create_butler(manager2, "2")

    def testTransferUuidToUuid(self) -> None:
        # Default configuration on both sides.
        self.create_butlers()
        self.assertButlerTransfers()

    def testTransferFromChainedUuidToUuid(self) -> None:
        """Force the source butler to be a ChainedDatastore."""
        self.create_butlers(source_config=os.path.join(TESTDIR, "config/basic/butler-chained.yaml"))
        self.assertButlerTransfers()

    def testTransferFromIncompatibleUuidToUuid(self) -> None:
        """Force the source butler to be an incompatible datastore."""
        self.create_butlers(source_config=os.path.join(TESTDIR, "config/basic/butler-inmemory.yaml"))
        # In-memory sources cannot feed a file-based target.
        with self.assertRaises(NotImplementedError):
            self.assertButlerTransfers()

    def testTransferFromIncompatibleChainUuidToUuid(self) -> None:
        """Force the source butler to be an incompatible datastore."""
        self.create_butlers(source_config=os.path.join(TESTDIR, "config/basic/butler-inmemory-chain.yaml"))
        with self.assertRaises(TypeError):
            self.assertButlerTransfers()

    def testTransferFromFileUuidToUuid(self) -> None:
        """Force the source butler to be a FileDatastore."""
        self.create_butlers(source_config=os.path.join(TESTDIR, "config/basic/butler.yaml"))
        self.assertButlerTransfers()

    def testTransferMissing(self) -> None:
        """Test transfers where datastore records are missing.

        This is how execution butler works.
        """
        self.create_butlers()

        # Configure the source butler to allow trust.
        self.source_butler._datastore._set_trust_mode(True)

        self.assertButlerTransfers(purge=True)

    def testTransferMissingDisassembly(self) -> None:
        """Test transfers where datastore records are missing.

        This is how execution butler works.
        """
        self.create_butlers()

        # Configure the source butler to allow trust.
        self.source_butler._datastore._set_trust_mode(True)

        # Test disassembly.
        self.assertButlerTransfers(purge=True, storageClassName="StructuredComposite")

    def testTransferDifferingStorageClasses(self) -> None:
        """Test transfers when the source butler dataset type has a different
        but compatible storage class.
        """
        self.create_butlers()

        self.assertButlerTransfers(storageClassNameTarget="MetricsConversion")

    def testTransferDifferingStorageClassesDisassembly(self) -> None:
        """Test transfers when the source butler dataset type has a different
        but compatible storage class and where the source butler has
        disassembled.
        """
        self.create_butlers()

        self.assertButlerTransfers(
            storageClassName="StructuredComposite", storageClassNameTarget="MetricsConversion"
        )

    def testUnsafeDirectTransfer(self) -> None:
        """Test that transfer='unsafe_direct' records the absolute URI of
        source files in the target datastore.
        """
        self.create_butlers()
        dataset_type = DatasetType("dt", [], "int", universe=self.source_butler.dimensions)
        self.source_butler.registry.registerDatasetType(dataset_type)
        self.source_butler.collections.register("run")
        ref = self.source_butler.put(123, "dt", [], run="run")
        self.target_butler.transfer_from(
            self.source_butler, [ref], transfer="unsafe_direct", register_dataset_types=True
        )
        # The target should resolve the dataset to the *source* file.
        self.assertEqual(self.target_butler.get(ref), 123)
        self.assertEqual(self.source_butler.getURI(ref), self.target_butler.getURI(ref))

    def testAbsoluteURITransferDirect(self) -> None:
        """Test transfer using an absolute URI."""
        self._absolute_transfer("auto")

    def testAbsoluteURITransferUnsafeDirect(self) -> None:
        """Test transfer using an absolute URI."""
        self._absolute_transfer("unsafe_direct")

    def testAbsoluteURITransferCopy(self) -> None:
        """Test transfer using an absolute URI."""
        self._absolute_transfer("copy")

    def _absolute_transfer(self, transfer: str) -> None:
        # Helper: ingest a dataset with an absolute (direct) URI into the
        # source butler, transfer it with the given mode, and check whether
        # the target ends up pointing at the original file or a copy.
        self.create_butlers()

        storageClassName = "StructuredData"
        storageClass = self.storageClassFactory.getStorageClass(storageClassName)
        datasetTypeName = "random_data"
        run = "run1"
        self.source_butler.collections.register(run)

        dimensions = self.source_butler.dimensions.conform(())
        datasetType = DatasetType(datasetTypeName, dimensions, storageClass)
        self.source_butler.registry.registerDatasetType(datasetType)

        metrics = makeExampleMetrics()
        with ResourcePath.temporary_uri(suffix=".json") as temp:
            dataId = DataCoordinate.make_empty(self.source_butler.dimensions)
            source_refs = [DatasetRef(datasetType, dataId, run=run)]
            temp.write(json.dumps(metrics.exportAsDict()).encode())
            dataset = FileDataset(path=temp, refs=source_refs)
            # "direct" ingest records the absolute temp URI in the source.
            self.source_butler.ingest(dataset, transfer="direct")

            self.target_butler.transfer_from(
                self.source_butler, dataset.refs, register_dataset_types=True, transfer=transfer
            )

            uri = self.target_butler.getURI(dataset.refs[0])
            if transfer == "auto" or transfer == "unsafe_direct":
                # These modes keep pointing at the original absolute URI.
                self.assertEqual(uri, temp)
            else:
                # "copy" must write a new file inside the target datastore.
                self.assertNotEqual(uri, temp)

    def test_shared_dimension_group(self):
        """Test internal logic that divides dataset types by dimension group
        when doing registry updates.
        """
        self.create_butlers()
        self.source_butler.import_(filename=_get_test_data_path("base.yaml"), without_datastore=True)
        self.source_butler.import_(filename=_get_test_data_path("datasets.yaml"), without_datastore=True)

        source_butler = self.source_butler
        target_butler = self.target_butler

        # Create a dataset type with the same dimensions as the 'bias' dataset
        # type from base.yaml
        dataset_type = DatasetType(
            "test_type", ["instrument", "detector"], "int", universe=source_butler.dimensions
        )
        source_butler.registry.registerDatasetType(dataset_type)
        # This has the same data ID as one of the bias datasets in
        # datasets.yaml.
        test_ref = source_butler.registry.insertDatasets(
            "test_type", [{"instrument": "Cam1", "detector": 2}], run="imported_g"
        )[0]

        biases = source_butler.query_datasets("bias", ["imported_g", "imported_r"])
        flats = source_butler.query_datasets("flat", ["imported_g", "imported_r"])
        refs = [test_ref, *biases, *flats]

        # Test setup will be even more convoluted if we want the datastore to
        # actually transfer files. For testing the dimension group behavior,
        # we really only care about the registry.
        with unittest.mock.patch.object(target_butler._datastore, "transfer_from") as mock:
            mock.return_value = (set(refs), set())
            target_butler.transfer_from(
                source_butler,
                refs,
                transfer=None,
                register_dataset_types=True,
                skip_missing=False,
                transfer_dimensions=True,
            )

        # Each transferred dataset must keep its original UUID.
        transferred_test_ref = target_butler.find_dataset(
            "test_type", {"instrument": "Cam1", "detector": 2}, collections="imported_g"
        )
        self.assertEqual(transferred_test_ref.id, test_ref.id)

        transferred_bias = target_butler.find_dataset(
            "bias", {"instrument": "Cam1", "detector": 2}, collections="imported_g"
        )
        self.assertEqual(transferred_bias.id, uuid.UUID("51352db4-a47a-447c-b12d-a50b206b17cd"))

        transferred_flat = target_butler.find_dataset(
            "flat",
            {"instrument": "Cam1", "detector": 2, "physical_filter": "Cam1-R1", "band": "r"},
            collections="imported_r",
        )
        self.assertEqual(transferred_flat.id, uuid.UUID("c1296796-56c5-4acf-9b49-40d920c6f840"))

3375 

3376 

class ChainedDatastoreTransfers(PosixDatastoreTransfers):
    """Run the full butler-to-butler transfer test battery with the
    repositories configured to use a chained datastore.
    """

    # Same tests as the POSIX case; only the repo configuration differs.
    configFile = os.path.join(TESTDIR, "config/basic/butler-chained.yaml")

3381 

3382 

@unittest.skipIf(not butler_server_is_available, butler_server_import_error)
class ButlerServerDatastoreTransfers(DatastoreTransfers, unittest.TestCase):
    """Test ``transfer_from`` involving Butler server."""

    # Repository configuration for the direct (target) butler.
    configFile = os.path.join(TESTDIR, "config/basic/butler.yaml")

    def test_transfers_from_remote_to_direct(self) -> None:
        # Imported lazily so the module still loads when the server
        # dependencies are unavailable (the skipIf above guards the class).
        from lsst.daf.butler.remote_butler._remote_file_transfer_source import (
            mock_file_transfer_uris_for_unit_test,
        )

        self.target_butler = self.create_butler(None, "2")
        with create_test_server(TESTDIR) as server:
            self.source_butler = server.hybrid_butler

            def _remap_transfer_url(path: HttpResourcePath) -> HttpResourcePath:
                # The Butler server returns HTTP URIs with a domain name that
                # is not resolvable because there is no actual HTTP server
                # involved in these tests. Strip this first layer of
                # indirection, and return the target of the redirect instead.
                response = server.client.get(str(path), follow_redirects=False, headers=path._extra_headers)
                return ResourcePath(str(response.next_request.url))

            with mock_file_transfer_uris_for_unit_test(_remap_transfer_url):
                self.assertButlerTransfers()

3408 

3409 

class TransferDatasetsInPlace(unittest.TestCase):
    """Test behavior of transfer_datasets_in_place() specialty function used by
    Prompt Publication service.
    """

    def test_file_datastore(self) -> None:
        # Two repos sharing a single file datastore root, so "in place"
        # transfer only has to copy registry content.
        configFile = os.path.join(TESTDIR, "config/basic/butler.yaml")
        with (
            tempfile.TemporaryDirectory() as datastore_root,
            tempfile.TemporaryDirectory() as other_repo_root,
        ):
            config = Config(configFile)
            config["datastore", "datastore", "name"] = "file_datastore"
            Butler.makeRepo(datastore_root, config=config)
            # Point the second repo's datastore at the first repo's root.
            config["datastore", "datastore", "root"] = datastore_root
            Butler.makeRepo(other_repo_root, config, forceConfigRoot=False)
            with (
                Butler(datastore_root, writeable=True) as source_butler,
                Butler(other_repo_root, writeable=True) as target_butler,
            ):
                self._test_transfer_datasets_in_place(source_butler, target_butler)

    def test_chained_datastore(self) -> None:
        # Same as above but with a chained datastore; both child datastores
        # share roots between the two repos.
        configFile = os.path.join(TESTDIR, "config/basic/butler-chained-posix.yaml")
        with (
            tempfile.TemporaryDirectory() as datastore_root,
            tempfile.TemporaryDirectory() as other_repo_root,
        ):
            config = Config(configFile)
            config["datastore", "datastore", "datastores", 0, "datastore", "root"] = (
                f"{datastore_root}/butler_test_repository"
            )
            config["datastore", "datastore", "datastores", 1, "datastore", "root"] = (
                f"{datastore_root}/butler_test_repository2"
            )
            Butler.makeRepo(datastore_root, config=config, forceConfigRoot=False)
            Butler.makeRepo(other_repo_root, config=config, forceConfigRoot=False)
            with (
                Butler(datastore_root, writeable=True) as source_butler,
                Butler(other_repo_root, writeable=True) as target_butler,
            ):
                self._test_transfer_datasets_in_place(source_butler, target_butler)

    def _test_transfer_datasets_in_place(
        self, source_butler: DirectButler, target_butler: DirectButler
    ) -> None:
        # Shared test body: exercise first-time transfer, repeated (no-op)
        # transfer, and a mixed new/existing batch.
        metric_repo = MetricTestRepo.create_from_butler(
            source_butler,
            source_butler._config,
        )
        target_butler.transfer_dimension_records_from(source_butler, [metric_repo.ref1, metric_repo.ref2])
        # Verify that the setup was correct and the two repos have
        # independent registries.
        self.assertIsNone(target_butler.get_dataset(metric_repo.ref1.id))
        # Copy one dataset, and make sure we can load it from the
        # target repo.
        self.assertEqual(
            transfer_datasets_in_place(source_butler, target_butler, [metric_repo.ref1]),
            [metric_repo.ref1],
        )
        self.assertEqual(target_butler.get(metric_repo.ref1), source_butler.get(metric_repo.ref1))
        self.assertIsNone(target_butler.get_dataset(metric_repo.ref2.id))
        # "In place" means both butlers must resolve to the same URIs.
        self.assertEqual(source_butler.getURIs(metric_repo.ref1), target_butler.getURIs(metric_repo.ref1))
        # Trying to copy the same dataset again is a no-op.
        self.assertEqual(
            transfer_datasets_in_place(source_butler, target_butler, [metric_repo.ref1]),
            [],
        )
        self.assertEqual(target_butler.get(metric_repo.ref1), source_butler.get(metric_repo.ref1))
        # A mix of existing and non-existing datasets.
        self.assertEqual(
            transfer_datasets_in_place(source_butler, target_butler, [metric_repo.ref1, metric_repo.ref2]),
            [metric_repo.ref2],
        )
        self.assertEqual(target_butler.get(metric_repo.ref1), source_butler.get(metric_repo.ref1))
        self.assertEqual(target_butler.get(metric_repo.ref2), source_butler.get(metric_repo.ref2))

        # For testing datastore chaining, set up a dataset that is only
        # accepted by one of the datastores.
        source_butler.registry.registerDatasetType(
            DatasetType("rejected_by_first", source_butler.dimensions.conform([]), "int")
        )
        source_butler.registry.registerRun("run")
        ref = source_butler.put(1, "rejected_by_first", dataId={}, run="run")
        self.assertEqual(
            transfer_datasets_in_place(source_butler, target_butler, [ref]),
            [ref],
        )
        self.assertEqual(1, target_butler.get(ref))

3499 

3500 

class NullDatastoreTestCase(unittest.TestCase):
    """Test that we can fall back to a null datastore."""

    # Need a good config to create the repo.
    configFile = os.path.join(TESTDIR, "config/basic/butler.yaml")
    storageClassFactory: StorageClassFactory

    @classmethod
    def setUpClass(cls) -> None:
        # Storage classes are read-only and shared across all tests here.
        cls.storageClassFactory = StorageClassFactory()
        cls.storageClassFactory.addFromConfig(cls.configFile)

    def setUp(self) -> None:
        """Create a new butler root for each test."""
        self.root = makeTestTempDir(TESTDIR)
        Butler.makeRepo(self.root, config=Config(self.configFile))

    def tearDown(self) -> None:
        removeTestTempDir(self.root)

    def test_fallback(self) -> None:
        # Read the butler config and mess with the datastore section.
        config_path = os.path.join(self.root, "butler.yaml")
        bad_config = Config(config_path)
        bad_config["datastore", "cls"] = "lsst.not.a.datastore.Datastore"
        bad_config.dumpToUri(config_path)

        # With a datastore requested, the bogus class must be fatal.
        with self.assertRaises(RuntimeError):
            Butler(self.root, without_datastore=False)

        with self.assertRaises(RuntimeError):
            Butler.from_config(self.root, without_datastore=False)

        # Asking for no datastore should substitute a NullDatastore instead.
        butler = Butler.from_config(self.root, writeable=True, without_datastore=True)
        self.enterContext(butler)
        self.assertIsInstance(butler._datastore, NullDatastore)

        # Check that registry is working.
        butler.collections.register("MYRUN")
        collections = butler.collections.query("*")
        self.assertIn("MYRUN", set(collections))

        # Create a ref.
        dimensions = butler.dimensions.conform([])
        storageClass = self.storageClassFactory.getStorageClass("StructuredDataDict")
        datasetTypeName = "metric"
        datasetType = DatasetType(datasetTypeName, dimensions, storageClass)
        butler.registry.registerDatasetType(datasetType)
        ref = DatasetRef(datasetType, {}, run="MYRUN")

        # Check that datastore will complain.
        with self.assertRaises(FileNotFoundError):
            butler.get(ref)
        with self.assertRaises(FileNotFoundError):
            butler.getURI(ref)

3556 

3557 

@unittest.skipIf(not butler_server_is_available, butler_server_import_error)
class ButlerServerTests(FileDatastoreButlerTests):
    """Test RemoteButler and Butler server."""

    # No config file: the test server supplies repository configuration.
    configFile = None
    predictionSupported = False
    trustModeSupported = False

    # Set by concrete subclasses; None means SQLite backs the server.
    postgres: TemporaryPostgresInstance | None

    def setUp(self):
        # The server context owns all cleanup, so tearDown is a no-op.
        self.server_instance = self.enterContext(create_test_server(TESTDIR))

    def tearDown(self):
        pass

    def are_uris_equivalent(self, uri1: ResourcePath, uri2: ResourcePath) -> bool:
        # S3 pre-signed URLs may end up with differing expiration times in the
        # query parameters, so ignore query parameters when comparing.
        return uri1.scheme == uri2.scheme and uri1.netloc == uri2.netloc and uri1.path == uri2.path

    def create_empty_butler(
        self,
        run: str | None = None,
        writeable: bool | None = None,
        metrics: ButlerMetrics | None = None,
        cleanup: bool = True,
    ) -> Butler:
        # writeable/cleanup are accepted for interface compatibility but are
        # not applicable to the shared server-backed hybrid butler.
        return self.server_instance.hybrid_butler.clone(run=run, metrics=metrics)

    def remove_dataset_out_of_band(self, butler: Butler, ref: DatasetRef) -> None:
        # Can't delete a file via S3 signed URLs, so we need to reach in
        # through DirectButler to delete the dataset.
        uri = self.server_instance.direct_butler.getURI(ref)
        uri.remove()

    def testConstructor(self):
        # RemoteButler constructor is tested in test_server.py and
        # test_remote_butler.py.
        pass

    def testDafButlerRepositories(self):
        # Loading of RemoteButler via repository index is tested in
        # test_server.py.
        pass

    def testGetDatasetTypes(self) -> None:
        # This is mostly a test of validateConfiguration, which is for
        # validating Datastore configuration and thus isn't relevant to
        # RemoteButler.
        pass

    def testMakeRepo(self) -> None:
        # Only applies to DirectButler.
        pass

    # Pickling not yet implemented for RemoteButler/HybridButler.
    @unittest.expectedFailure
    def testPickle(self) -> None:
        return super().testPickle()

    def testStringification(self) -> None:
        self.assertEqual(
            str(self.server_instance.remote_butler),
            "RemoteButler(https://test.example/api/butler/repo/testrepo/)",
        )

    def testTransaction(self) -> None:
        # Transactions will never be supported for RemoteButler.
        pass

    def testPutTemplates(self) -> None:
        # The Butler server instance is configured with different file naming
        # templates than this test is expecting.
        pass

3633 

3634 

@unittest.skipIf(not butler_server_is_available, butler_server_import_error)
class ButlerServerSqliteTests(ButlerServerTests, unittest.TestCase):
    """Exercise RemoteButler's registry shim against a server backed by a
    SQLite database.
    """

    # No external database is required for the SQLite-backed server.
    postgres = None

3642 

3643 

@unittest.skipIf(not butler_server_is_available, butler_server_import_error)
class ButlerServerPostgresTests(ButlerServerTests, unittest.TestCase):
    """Tests for RemoteButler's registry shim, with a Postgres DB backing the
    server.
    """

    @classmethod
    def setUpClass(cls) -> None:
        # Start the temporary Postgres instance before deferring to the base
        # class so the server created there can use it.
        cls.postgres = cls.enterClassContext(setup_postgres_test_db())
        super().setUpClass()

3654 

3655 

def setup_module(module: types.ModuleType) -> None:
    """Prepare the module for pytest by scrubbing butler-related state from
    the environment.
    """
    clean_environment()

3659 

3660 

def _get_test_data_path(filename: str) -> ResourcePath:
    """Return the resource URI for a file in the registry test data package.

    Parameters
    ----------
    filename : `str`
        Name of the file within ``lsst.daf.butler/tests/registry_data``
        (e.g. ``"base.yaml"``).

    Returns
    -------
    path : `ResourcePath`
        ``resource://`` URI pointing at the requested test-data file.
    """
    # Bug fix: the f-string previously did not interpolate ``filename``,
    # so every caller received the same nonexistent resource path.
    return ResourcePath(f"resource://lsst.daf.butler/tests/registry_data/{filename}")

3663 

3664 

3665if __name__ == "__main__": 

3666 clean_environment() 

3667 unittest.main()