Coverage for tests / test_datastore.py: 13%

1214 statements  

« prev     ^ index     » next       coverage.py v7.13.5, created at 2026-05-06 08:30 +0000

1# This file is part of daf_butler. 

2# 

3# Developed for the LSST Data Management System. 

4# This product includes software developed by the LSST Project 

5# (http://www.lsst.org). 

6# See the COPYRIGHT file at the top-level directory of this distribution 

7# for details of code ownership. 

8# 

9# This software is dual licensed under the GNU General Public License and also 

10# under a 3-clause BSD license. Recipients may choose which of these licenses 

11# to use; please see the files gpl-3.0.txt and/or bsd_license.txt, 

12# respectively. If you choose the GPL option then the following text applies 

13# (but note that there is still no warranty even if you opt for BSD instead): 

14# 

15# This program is free software: you can redistribute it and/or modify 

16# it under the terms of the GNU General Public License as published by 

17# the Free Software Foundation, either version 3 of the License, or 

18# (at your option) any later version. 

19# 

20# This program is distributed in the hope that it will be useful, 

21# but WITHOUT ANY WARRANTY; without even the implied warranty of 

22# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the 

23# GNU General Public License for more details. 

24# 

25# You should have received a copy of the GNU General Public License 

26# along with this program. If not, see <http://www.gnu.org/licenses/>. 

27 

28from __future__ import annotations 

29 

30import contextlib 

31import os 

32import pickle 

33import shutil 

34import tempfile 

35import time 

36import unittest 

37import unittest.mock 

38import uuid 

39from collections.abc import Callable, Iterator 

40from typing import Any, cast 

41 

42import yaml 

43 

44import lsst.utils.tests 

45from lsst.daf.butler import ( 

46 Config, 

47 DataCoordinate, 

48 DatasetIdGenEnum, 

49 DatasetRef, 

50 DatasetType, 

51 DatasetTypeNotSupportedError, 

52 Datastore, 

53 DimensionUniverse, 

54 FileDataset, 

55 StorageClass, 

56 StorageClassFactory, 

57) 

58from lsst.daf.butler.datastore import DatasetRefURIs, DatastoreConfig, DatastoreValidationError, NullDatastore 

59from lsst.daf.butler.datastore.cache_manager import ( 

60 DatastoreCacheManager, 

61 DatastoreCacheManagerConfig, 

62 DatastoreDisabledCacheManager, 

63) 

64from lsst.daf.butler.datastore.record_data import DatastoreRecordData, SerializedDatastoreRecordData 

65from lsst.daf.butler.datastore.stored_file_info import StoredFileInfo, make_datastore_path_relative 

66from lsst.daf.butler.formatters.yaml import YamlFormatter 

67from lsst.daf.butler.tests import ( 

68 BadNoWriteFormatter, 

69 BadWriteFormatter, 

70 DatasetTestHelper, 

71 DatastoreTestHelper, 

72 DummyRegistry, 

73 MetricsExample, 

74 MetricsExampleDataclass, 

75 MetricsExampleModel, 

76) 

77from lsst.daf.butler.tests.dict_convertible_model import DictConvertibleModel 

78from lsst.daf.butler.tests.utils import TestCaseMixin 

79from lsst.resources import ResourcePath 

80from lsst.utils import doImport 

81from lsst.utils.introspection import get_full_type_name 

82 

83TESTDIR = os.path.dirname(__file__) 

84 

85 

def makeExampleMetrics(use_none: bool = False) -> MetricsExample:
    """Make example dataset that can be stored in butler.

    Parameters
    ----------
    use_none : `bool`, optional
        If `True`, the array component is `None` rather than a list of
        numbers.

    Returns
    -------
    metrics : `MetricsExample`
        Example metrics populated with fixed test values.
    """
    array = None if use_none else [563, 234, 456.7, 105, 2054, -1045]
    summary = {"AM1": 5.2, "AM2": 30.6}
    output = {"a": [1, 2, 3], "b": {"blue": 5, "red": "green"}}
    return MetricsExample(summary, output, array)

97 

98 

class TransactionTestError(Exception):
    """Specific error for transactions, to prevent misdiagnosing
    that might otherwise occur when a standard exception is used.
    """

    # The redundant ``pass`` was removed: a class body containing a
    # docstring needs no additional statement.

105 

106 

class DatastoreTestsBase(DatasetTestHelper, DatastoreTestHelper, TestCaseMixin):
    """Support routines for datastore testing"""

    # Filesystem root used by the datastore; cleaned up in tearDown.
    root: str | None = None
    universe: DimensionUniverse
    storageClassFactory: StorageClassFactory

    @classmethod
    def setUpClass(cls) -> None:
        """Configure shared fixtures for all tests in the class."""
        # Storage Classes are fixed for all datastores in these tests.
        cls.storageClassFactory = StorageClassFactory()
        cls.storageClassFactory.addFromConfig(os.path.join(TESTDIR, "config/basic/storageClasses.yaml"))

        # Read the Datastore config so we can get the class information:
        # we should not assume the constructor name here but rely on the
        # configuration file itself.
        cfg = DatastoreConfig(cls.configFile)
        cls.datastoreType = cast(type[Datastore], doImport(cfg["cls"]))
        cls.universe = DimensionUniverse()

    def setUp(self) -> None:
        """Create per-test datastore helpers."""
        self.setUpDatastoreTests(DummyRegistry, DatastoreConfig)

    def tearDown(self) -> None:
        """Remove any on-disk state the datastore created."""
        root = self.root
        if root is not None and os.path.exists(root):
            shutil.rmtree(root, ignore_errors=True)

134 

135 

136class DatastoreTests(DatastoreTestsBase): 

137 """Some basic tests of a simple datastore.""" 

138 

139 hasUnsupportedPut = True 

140 rootKeys: tuple[str, ...] | None = None 

141 isEphemeral: bool = False 

142 validationCanFail: bool = False 

143 

144 def testConfigRoot(self) -> None: 

145 full = DatastoreConfig(self.configFile) 

146 config = DatastoreConfig(self.configFile, mergeDefaults=False) 

147 newroot = "/random/location" 

148 self.datastoreType.setConfigRoot(newroot, config, full) 

149 if self.rootKeys: 

150 for k in self.rootKeys: 

151 self.assertIn(newroot, config[k]) 

152 

153 def testConstructor(self) -> None: 

154 datastore = self.makeDatastore() 

155 self.assertIsNotNone(datastore) 

156 self.assertIs(datastore.isEphemeral, self.isEphemeral) 

157 

158 def testConfigurationValidation(self) -> None: 

159 datastore = self.makeDatastore() 

160 sc = self.storageClassFactory.getStorageClass("ThingOne") 

161 datastore.validateConfiguration([sc]) 

162 

163 sc2 = self.storageClassFactory.getStorageClass("ThingTwo") 

164 if self.validationCanFail: 

165 with self.assertRaises(DatastoreValidationError): 

166 datastore.validateConfiguration([sc2], logFailures=True) 

167 

168 dimensions = self.universe.conform(("visit", "physical_filter")) 

169 dataId = { 

170 "instrument": "dummy", 

171 "visit": 52, 

172 "physical_filter": "V", 

173 "band": "v", 

174 "day_obs": 20250101, 

175 } 

176 ref = self.makeDatasetRef("metric", dimensions, sc, dataId) 

177 datastore.validateConfiguration([ref]) 

178 

179 def testParameterValidation(self) -> None: 

180 """Check that parameters are validated""" 

181 sc = self.storageClassFactory.getStorageClass("ThingOne") 

182 dimensions = self.universe.conform(("visit", "physical_filter")) 

183 dataId = { 

184 "instrument": "dummy", 

185 "visit": 52, 

186 "physical_filter": "V", 

187 "band": "v", 

188 "day_obs": 20250101, 

189 } 

190 ref = self.makeDatasetRef("metric", dimensions, sc, dataId) 

191 datastore = self.makeDatastore() 

192 data = {1: 2, 3: 4} 

193 datastore.put(data, ref) 

194 newdata = datastore.get(ref) 

195 self.assertEqual(data, newdata) 

196 with self.assertRaises(KeyError): 

197 newdata = datastore.get(ref, parameters={"missing": 5}) 

198 

199 def testBasicPutGet(self) -> None: 

200 metrics = makeExampleMetrics() 

201 datastore = self.makeDatastore() 

202 

203 # Create multiple storage classes for testing different formulations 

204 storageClasses = [ 

205 self.storageClassFactory.getStorageClass(sc) 

206 for sc in ("StructuredData", "StructuredDataJson", "StructuredDataPickle") 

207 ] 

208 

209 dimensions = self.universe.conform(("visit", "physical_filter")) 

210 dataId = { 

211 "instrument": "dummy", 

212 "visit": 52, 

213 "physical_filter": "V", 

214 "band": "v", 

215 "day_obs": 20250101, 

216 } 

217 dataId2 = { 

218 "instrument": "dummy", 

219 "visit": 53, 

220 "physical_filter": "V", 

221 "band": "v", 

222 "day_obs": 20250101, 

223 } 

224 

225 for sc in storageClasses: 

226 ref = self.makeDatasetRef("metric", dimensions, sc, dataId) 

227 ref2 = self.makeDatasetRef("metric", dimensions, sc, dataId2) 

228 

229 # Make sure that using getManyURIs without predicting before the 

230 # dataset has been put raises. 

231 with self.assertRaises(FileNotFoundError): 

232 datastore.getManyURIs([ref], predict=False) 

233 

234 # Make sure that using getManyURIs with predicting before the 

235 # dataset has been put predicts the URI. 

236 uris = datastore.getManyURIs([ref, ref2], predict=True) 

237 self.assertIn("52", uris[ref].primaryURI.geturl()) 

238 self.assertIn("#predicted", uris[ref].primaryURI.geturl()) 

239 self.assertIn("53", uris[ref2].primaryURI.geturl()) 

240 self.assertIn("#predicted", uris[ref2].primaryURI.geturl()) 

241 

242 datastore.put(metrics, ref) 

243 

244 # Does it exist? 

245 self.assertTrue(datastore.exists(ref)) 

246 self.assertTrue(datastore.knows(ref)) 

247 multi = datastore.knows_these([ref]) 

248 self.assertTrue(multi[ref]) 

249 multi = datastore.mexists([ref, ref2]) 

250 self.assertTrue(multi[ref]) 

251 self.assertFalse(multi[ref2]) 

252 

253 # Get 

254 metricsOut = datastore.get(ref, parameters=None) 

255 self.assertEqual(metrics, metricsOut) 

256 

257 uri = datastore.getURI(ref) 

258 self.assertEqual(uri.scheme, self.uriScheme) 

259 

260 uris = datastore.getManyURIs([ref]) 

261 self.assertEqual(len(uris), 1) 

262 ref, uri = uris.popitem() 

263 self.assertTrue(uri.primaryURI.exists()) 

264 self.assertFalse(uri.componentURIs) 

265 

266 # Get a component -- we need to construct new refs for them 

267 # with derived storage classes but with parent ID 

268 for comp in ("data", "output"): 

269 compRef = ref.makeComponentRef(comp) 

270 output = datastore.get(compRef) 

271 self.assertEqual(output, getattr(metricsOut, comp)) 

272 

273 uri = datastore.getURI(compRef) 

274 self.assertEqual(uri.scheme, self.uriScheme) 

275 

276 uris = datastore.getManyURIs([compRef]) 

277 self.assertEqual(len(uris), 1) 

278 

279 storageClass = sc 

280 

281 # Check that we can put a metric with None in a component and 

282 # get it back as None 

283 metricsNone = makeExampleMetrics(use_none=True) 

284 dataIdNone = { 

285 "instrument": "dummy", 

286 "visit": 54, 

287 "physical_filter": "V", 

288 "band": "v", 

289 "day_obs": 20250101, 

290 } 

291 refNone = self.makeDatasetRef("metric", dimensions, sc, dataIdNone) 

292 datastore.put(metricsNone, refNone) 

293 

294 comp = "data" 

295 for comp in ("data", "output"): 

296 compRef = refNone.makeComponentRef(comp) 

297 output = datastore.get(compRef) 

298 self.assertEqual(output, getattr(metricsNone, comp)) 

299 

300 # Check that a put fails if the dataset type is not supported 

301 if self.hasUnsupportedPut: 

302 sc = StorageClass("UnsupportedSC", pytype=type(metrics)) 

303 ref = self.makeDatasetRef("unsupportedType", dimensions, sc, dataId) 

304 with self.assertRaises(DatasetTypeNotSupportedError): 

305 datastore.put(metrics, ref) 

306 

307 # These should raise 

308 ref = self.makeDatasetRef("metrics", dimensions, storageClass, dataId) 

309 with self.assertRaises(FileNotFoundError): 

310 # non-existing file 

311 datastore.get(ref) 

312 

313 # Get a URI from it 

314 uri = datastore.getURI(ref, predict=True) 

315 self.assertEqual(uri.scheme, self.uriScheme) 

316 

317 with self.assertRaises(FileNotFoundError): 

318 datastore.getURI(ref) 

319 

320 def testTrustGetRequest(self) -> None: 

321 """Check that we can get datasets that registry knows nothing about.""" 

322 datastore = self.makeDatastore() 

323 

324 # Skip test if the attribute is not defined 

325 if not hasattr(datastore, "trustGetRequest"): 

326 return 

327 

328 metrics = makeExampleMetrics() 

329 

330 i = 0 

331 for sc_name in ("StructuredDataNoComponents", "StructuredData", "StructuredComposite"): 

332 i += 1 

333 datasetTypeName = f"test_metric{i}" # Different dataset type name each time. 

334 

335 if sc_name == "StructuredComposite": 

336 disassembled = True 

337 else: 

338 disassembled = False 

339 

340 # Start datastore in default configuration of using registry 

341 datastore.trustGetRequest = False 

342 

343 # Create multiple storage classes for testing with or without 

344 # disassembly 

345 sc = self.storageClassFactory.getStorageClass(sc_name) 

346 dimensions = self.universe.conform(("visit", "physical_filter")) 

347 

348 dataId = { 

349 "instrument": "dummy", 

350 "visit": 52 + i, 

351 "physical_filter": "V", 

352 "band": "v", 

353 "day_obs": 20250101, 

354 } 

355 

356 ref = self.makeDatasetRef(datasetTypeName, dimensions, sc, dataId) 

357 datastore.put(metrics, ref) 

358 

359 # Does it exist? 

360 self.assertTrue(datastore.exists(ref)) 

361 self.assertTrue(datastore.knows(ref)) 

362 multi = datastore.knows_these([ref]) 

363 self.assertTrue(multi[ref]) 

364 multi = datastore.mexists([ref]) 

365 self.assertTrue(multi[ref]) 

366 

367 # Get 

368 metricsOut = datastore.get(ref) 

369 self.assertEqual(metrics, metricsOut) 

370 

371 # Get the URI(s) 

372 allURIs = datastore.getURIs(ref) 

373 primaryURI, componentURIs = allURIs 

374 if disassembled: 

375 self.assertIsNone(primaryURI) 

376 self.assertEqual(len(componentURIs), 3) 

377 self.assertEqual(list(allURIs.iter_all()), list(componentURIs.values())) 

378 else: 

379 self.assertIn(datasetTypeName, primaryURI.path) 

380 self.assertFalse(componentURIs) 

381 self.assertEqual(list(allURIs.iter_all()), [primaryURI]) 

382 

383 # Delete registry entry so now we are trusting 

384 datastore.removeStoredItemInfo(ref) 

385 

386 # Now stop trusting and check that things break 

387 datastore.trustGetRequest = False 

388 

389 # Does it exist? 

390 self.assertFalse(datastore.exists(ref)) 

391 self.assertFalse(datastore.knows(ref)) 

392 multi = datastore.knows_these([ref]) 

393 self.assertFalse(multi[ref]) 

394 multi = datastore.mexists([ref]) 

395 self.assertFalse(multi[ref]) 

396 

397 with self.assertRaises(FileNotFoundError): 

398 datastore.get(ref) 

399 

400 if sc_name != "StructuredDataNoComponents": 

401 with self.assertRaises(FileNotFoundError): 

402 datastore.get(ref.makeComponentRef("data")) 

403 

404 # URI should fail unless we ask for prediction 

405 with self.assertRaises(FileNotFoundError): 

406 datastore.getURIs(ref) 

407 

408 predicted_primary, predicted_disassembled = datastore.getURIs(ref, predict=True) 

409 if disassembled: 

410 self.assertIsNone(predicted_primary) 

411 self.assertEqual(len(predicted_disassembled), 3) 

412 for uri in predicted_disassembled.values(): 

413 self.assertEqual(uri.fragment, "predicted") 

414 self.assertIn(datasetTypeName, uri.path) 

415 else: 

416 self.assertIn(datasetTypeName, predicted_primary.path) 

417 self.assertFalse(predicted_disassembled) 

418 self.assertEqual(predicted_primary.fragment, "predicted") 

419 

420 # Now enable registry-free trusting mode 

421 datastore.trustGetRequest = True 

422 

423 # Try again to get it 

424 metricsOut = datastore.get(ref) 

425 self.assertEqual(metricsOut, metrics) 

426 

427 # Does it exist? 

428 self.assertTrue(datastore.exists(ref)) 

429 

430 # Get a component 

431 if sc_name != "StructuredDataNoComponents": 

432 comp = "data" 

433 compRef = ref.makeComponentRef(comp) 

434 output = datastore.get(compRef) 

435 self.assertEqual(output, getattr(metrics, comp)) 

436 

437 # Get the URI -- if we trust this should work even without 

438 # enabling prediction. 

439 primaryURI2, componentURIs2 = datastore.getURIs(ref) 

440 self.assertEqual(primaryURI2, primaryURI) 

441 self.assertEqual(componentURIs2, componentURIs) 

442 

443 # Check for compatible storage class. 

444 if sc_name in ("StructuredDataNoComponents", "StructuredData"): 

445 # Make new dataset ref with compatible storage class. 

446 ref_comp = ref.overrideStorageClass("StructuredDataDictJson") 

447 

448 # Without `set_retrieve_dataset_type_method` it will fail to 

449 # find correct file. 

450 self.assertFalse(datastore.exists(ref_comp)) 

451 with self.assertRaises(FileNotFoundError): 

452 datastore.get(ref_comp) 

453 with self.assertRaises(FileNotFoundError): 

454 datastore.get(ref, storageClass="StructuredDataDictJson") 

455 

456 # Need a special method to generate stored dataset type. 

457 def _stored_dataset_type(name: str, ref: DatasetRef = ref) -> DatasetType: 

458 if name == ref.datasetType.name: 

459 return ref.datasetType 

460 raise ValueError(f"Unexpected dataset type name {ref.datasetType.name}") 

461 

462 datastore.set_retrieve_dataset_type_method(_stored_dataset_type) 

463 

464 # Storage class override with original dataset ref. 

465 metrics_as_dict = datastore.get(ref, storageClass="StructuredDataDictJson") 

466 self.assertIsInstance(metrics_as_dict, dict) 

467 

468 # get() should return a dict now. 

469 metrics_as_dict = datastore.get(ref_comp) 

470 self.assertIsInstance(metrics_as_dict, dict) 

471 

472 # exists() should work as well. 

473 self.assertTrue(datastore.exists(ref_comp)) 

474 

475 datastore.set_retrieve_dataset_type_method(None) 

476 

    def testDisassembly(self) -> None:
        """Test disassembly within datastore."""
        metrics = makeExampleMetrics()
        if self.isEphemeral:
            # in-memory datastore does not disassemble
            return

        # Create multiple storage classes for testing different formulations
        # of composites. One of these will not disassemble to provide
        # a reference.
        storageClasses = [
            self.storageClassFactory.getStorageClass(sc)
            for sc in (
                "StructuredComposite",
                "StructuredCompositeTestA",
                "StructuredCompositeTestB",
                "StructuredCompositeReadComp",
                "StructuredData",  # No disassembly
                "StructuredCompositeReadCompNoDisassembly",
            )
        ]

        # Create the test datastore
        datastore = self.makeDatastore()

        # Dummy dataId
        dimensions = self.universe.conform(("visit", "physical_filter"))
        dataId = {"instrument": "dummy", "visit": 428, "physical_filter": "R"}

        for i, sc in enumerate(storageClasses):
            with self.subTest(storageClass=sc.name):
                # Create a different dataset type each time round
                # so that a test failure in this subtest does not trigger
                # a cascade of tests because of file clashes
                ref = self.makeDatasetRef(f"metric_comp_{i}", dimensions, sc, dataId)

                # These two storage classes are expected to be stored as a
                # single file; the others are split into components.
                disassembled = sc.name not in {"StructuredData", "StructuredCompositeReadCompNoDisassembly"}

                datastore.put(metrics, ref)

                baseURI, compURIs = datastore.getURIs(ref)
                if disassembled:
                    # No single-file URI; one URI per component instead.
                    self.assertIsNone(baseURI)
                    self.assertEqual(set(compURIs), {"data", "output", "summary"})
                else:
                    self.assertIsNotNone(baseURI)
                    self.assertEqual(compURIs, {})

                metrics_get = datastore.get(ref)
                self.assertEqual(metrics_get, metrics)

                # Retrieve the composite with read parameter
                stop = 4
                metrics_get = datastore.get(ref, parameters={"slice": slice(stop)})
                self.assertEqual(metrics_get.summary, metrics.summary)
                self.assertEqual(metrics_get.output, metrics.output)
                self.assertEqual(metrics_get.data, metrics.data[:stop])

                # Retrieve a component
                data = datastore.get(ref.makeComponentRef("data"))
                self.assertEqual(data, metrics.data)

                # On supported storage classes attempt to access a read
                # only component
                if "ReadComp" in sc.name:
                    cRef = ref.makeComponentRef("counter")
                    counter = datastore.get(cRef)
                    self.assertEqual(counter, len(metrics.data))

                    counter = datastore.get(cRef, parameters={"slice": slice(stop)})
                    self.assertEqual(counter, stop)

                # Remove so the next subTest starts from a clean store.
                datastore.remove(ref)

551 def prepDeleteTest(self, n_refs: int = 1) -> tuple[Datastore, tuple[DatasetRef, ...]]: 

552 metrics = makeExampleMetrics() 

553 datastore = self.makeDatastore() 

554 # Put 

555 dimensions = self.universe.conform(("visit", "physical_filter")) 

556 sc = self.storageClassFactory.getStorageClass("StructuredData") 

557 refs = [] 

558 for i in range(n_refs): 

559 dataId = { 

560 "instrument": "dummy", 

561 "visit": 638 + i, 

562 "physical_filter": "U", 

563 "band": "u", 

564 "day_obs": 20250101, 

565 } 

566 ref = self.makeDatasetRef("metric", dimensions, sc, dataId) 

567 datastore.put(metrics, ref) 

568 

569 # Does it exist? 

570 self.assertTrue(datastore.exists(ref)) 

571 

572 # Get 

573 metricsOut = datastore.get(ref) 

574 self.assertEqual(metrics, metricsOut) 

575 refs.append(ref) 

576 

577 return datastore, *refs 

578 

579 def testRemove(self) -> None: 

580 datastore, ref = self.prepDeleteTest() 

581 

582 # Remove 

583 datastore.remove(ref) 

584 

585 # Does it exist? 

586 self.assertFalse(datastore.exists(ref)) 

587 

588 # Do we now get a predicted URI? 

589 uri = datastore.getURI(ref, predict=True) 

590 self.assertEqual(uri.fragment, "predicted") 

591 

592 # Get should now fail 

593 with self.assertRaises(FileNotFoundError): 

594 datastore.get(ref) 

595 # Can only delete once 

596 with self.assertRaises(FileNotFoundError): 

597 datastore.remove(ref) 

598 

599 def testForget(self) -> None: 

600 datastore, ref = self.prepDeleteTest() 

601 

602 # Remove 

603 datastore.forget([ref]) 

604 

605 # Does it exist (as far as we know)? 

606 self.assertFalse(datastore.exists(ref)) 

607 

608 # Do we now get a predicted URI? 

609 uri = datastore.getURI(ref, predict=True) 

610 self.assertEqual(uri.fragment, "predicted") 

611 

612 # Get should now fail 

613 with self.assertRaises(FileNotFoundError): 

614 datastore.get(ref) 

615 

616 # Forgetting again is a silent no-op 

617 datastore.forget([ref]) 

618 

619 # Predicted URI should still point to the file. 

620 self.assertTrue(uri.exists()) 

621 

622 def testTransfer(self) -> None: 

623 metrics = makeExampleMetrics() 

624 

625 dimensions = self.universe.conform(("visit", "physical_filter")) 

626 dataId = { 

627 "instrument": "dummy", 

628 "visit": 2048, 

629 "physical_filter": "Uprime", 

630 "band": "u", 

631 "day_obs": 20250101, 

632 } 

633 

634 sc = self.storageClassFactory.getStorageClass("StructuredData") 

635 ref = self.makeDatasetRef("metric", dimensions, sc, dataId) 

636 

637 inputDatastore = self.makeDatastore("test_input_datastore") 

638 outputDatastore = self.makeDatastore("test_output_datastore") 

639 

640 inputDatastore.put(metrics, ref) 

641 outputDatastore.transfer(inputDatastore, ref) 

642 

643 metricsOut = outputDatastore.get(ref) 

644 self.assertEqual(metrics, metricsOut) 

645 

    def testBasicTransaction(self) -> None:
        """Check that puts inside a successful transaction persist while
        puts inside a failed transaction are rolled back.
        """
        datastore = self.makeDatastore()
        storageClass = self.storageClassFactory.getStorageClass("StructuredData")
        dimensions = self.universe.conform(("visit", "physical_filter"))
        nDatasets = 6
        dataIds = [
            {"instrument": "dummy", "visit": i, "physical_filter": "V", "band": "v", "day_obs": 20250101}
            for i in range(nDatasets)
        ]
        data = [
            (
                self.makeDatasetRef("metric", dimensions, storageClass, dataId),
                makeExampleMetrics(),
            )
            for dataId in dataIds
        ]
        # First half goes into a committing transaction, second half into
        # one that raises and must roll back.
        succeed = data[: nDatasets // 2]
        fail = data[nDatasets // 2 :]
        # All datasets added in this transaction should continue to exist
        with datastore.transaction():
            for ref, metrics in succeed:
                datastore.put(metrics, ref)
        # Whereas datasets added in this transaction should not
        with self.assertRaises(TransactionTestError):
            with datastore.transaction():
                for ref, metrics in fail:
                    datastore.put(metrics, ref)
                raise TransactionTestError("This should propagate out of the context manager")
        # Check for datasets that should exist
        for ref, metrics in succeed:
            # Does it exist?
            self.assertTrue(datastore.exists(ref))
            # Get
            metricsOut = datastore.get(ref, parameters=None)
            self.assertEqual(metrics, metricsOut)
            # URI
            uri = datastore.getURI(ref)
            self.assertEqual(uri.scheme, self.uriScheme)
        # Check for datasets that should not exist
        for ref, _ in fail:
            # These should raise
            with self.assertRaises(FileNotFoundError):
                # non-existing file
                datastore.get(ref)
            with self.assertRaises(FileNotFoundError):
                datastore.getURI(ref)

692 

    def testNestedTransaction(self) -> None:
        """Check that a failure in an outer transaction rolls back work done
        in a committed inner transaction, while datasets stored before the
        outer transaction survive.
        """
        datastore = self.makeDatastore()
        storageClass = self.storageClassFactory.getStorageClass("StructuredData")
        dimensions = self.universe.conform(("visit", "physical_filter"))
        metrics = makeExampleMetrics()

        dataId = {"instrument": "dummy", "visit": 0, "physical_filter": "V", "band": "v", "day_obs": 20250101}
        refBefore = self.makeDatasetRef("metric", dimensions, storageClass, dataId)
        datastore.put(metrics, refBefore)
        with self.assertRaises(TransactionTestError):
            with datastore.transaction():
                dataId = {
                    "instrument": "dummy",
                    "visit": 1,
                    "physical_filter": "V",
                    "band": "v",
                    "day_obs": 20250101,
                }
                refOuter = self.makeDatasetRef("metric", dimensions, storageClass, dataId)
                datastore.put(metrics, refOuter)
                with datastore.transaction():
                    dataId = {
                        "instrument": "dummy",
                        "visit": 2,
                        "physical_filter": "V",
                        "band": "v",
                        "day_obs": 20250101,
                    }
                    refInner = self.makeDatasetRef("metric", dimensions, storageClass, dataId)
                    datastore.put(metrics, refInner)
                # All datasets should exist
                for ref in (refBefore, refOuter, refInner):
                    metricsOut = datastore.get(ref, parameters=None)
                    self.assertEqual(metrics, metricsOut)
                raise TransactionTestError("This should roll back the transaction")
        # Dataset(s) inserted before the transaction should still exist
        metricsOut = datastore.get(refBefore, parameters=None)
        self.assertEqual(metrics, metricsOut)
        # But all datasets inserted during the (rolled back) transaction
        # should be gone
        with self.assertRaises(FileNotFoundError):
            datastore.get(refOuter)
        with self.assertRaises(FileNotFoundError):
            datastore.get(refInner)

737 

738 def _prepareIngestTest(self) -> tuple[MetricsExample, DatasetRef]: 

739 storageClass = self.storageClassFactory.getStorageClass("StructuredData") 

740 dimensions = self.universe.conform(("visit", "physical_filter")) 

741 metrics = makeExampleMetrics() 

742 dataId = {"instrument": "dummy", "visit": 0, "physical_filter": "V", "band": "v", "day_obs": 20250101} 

743 ref = self.makeDatasetRef("metric", dimensions, storageClass, dataId) 

744 return metrics, ref 

745 

746 def runIngestTest(self, func: Callable[[MetricsExample, str, DatasetRef], None]) -> None: 

747 metrics, ref = self._prepareIngestTest() 

748 # The file will be deleted after the test. 

749 # For symlink tests this leads to a situation where the datastore 

750 # points to a file that does not exist. This will make os.path.exist 

751 # return False but then the new symlink will fail with 

752 # FileExistsError later in the code so the test still passes. 

753 with _temp_yaml_file(metrics._asdict()) as path: 

754 func(metrics, path, ref) 

755 

    def testIngestNoTransfer(self) -> None:
        """Test ingesting existing files with no transfer."""
        for mode in (None, "auto"):
            # Some datastores have auto but can't do in place transfer
            if mode == "auto" and "auto" in self.ingestTransferModes and not self.canIngestNoTransferAuto:
                continue

            with self.subTest(mode=mode):
                datastore = self.makeDatastore()

                # NOTE: the nested helpers bind ``mode`` and ``datastore``
                # as default arguments so each closure captures this
                # iteration's values rather than the loop variable.
                def succeed(
                    obj: MetricsExample,
                    path: str,
                    ref: DatasetRef,
                    mode: str | None = mode,
                    datastore: Datastore = datastore,
                ) -> None:
                    """Ingest a file already in the datastore root."""
                    # first move it into the root, and adjust the path
                    # accordingly.
                    # In the case of a ChainedDatastore, we have multiple
                    # roots, all of which will accept the file, so we
                    # have to copy it into all the roots.
                    relative_path = None
                    for root in datastore.roots.values():
                        if root is not None:
                            copied_path = shutil.copy(path, root.ospath)
                            relative_path = os.path.relpath(copied_path, start=root.ospath)
                    assert relative_path is not None, (
                        "Running a FileDatastore test on a Datastore instance without any roots"
                    )
                    datastore.ingest(FileDataset(path=relative_path, refs=ref), transfer=mode)
                    self.assertEqual(obj, datastore.get(ref))

                def failInputDoesNotExist(
                    obj: MetricsExample,
                    path: str,
                    ref: DatasetRef,
                    mode: str | None = mode,
                    datastore: Datastore = datastore,
                ) -> None:
                    """Can't ingest files if we're given a bad path."""
                    with self.assertRaises(FileNotFoundError):
                        datastore.ingest(
                            FileDataset(path="this-file-does-not-exist.yaml", refs=ref), transfer=mode
                        )
                    self.assertFalse(datastore.exists(ref))

                def failOutsideRoot(
                    obj: MetricsExample,
                    path: str,
                    ref: DatasetRef,
                    mode: str | None = mode,
                    datastore: Datastore = datastore,
                ) -> None:
                    """Can't ingest files outside of datastore root unless
                    auto.
                    """
                    if mode == "auto":
                        datastore.ingest(FileDataset(path=os.path.abspath(path), refs=ref), transfer=mode)
                        self.assertTrue(datastore.exists(ref))
                    else:
                        with self.assertRaises(RuntimeError):
                            datastore.ingest(FileDataset(path=os.path.abspath(path), refs=ref), transfer=mode)
                        self.assertFalse(datastore.exists(ref))

                def failNotImplemented(
                    obj: MetricsExample,
                    path: str,
                    ref: DatasetRef,
                    mode: str | None = mode,
                    datastore: Datastore = datastore,
                ) -> None:
                    # Datastores that do not support this transfer mode must
                    # raise rather than silently ignore the request.
                    with self.assertRaises(NotImplementedError):
                        datastore.ingest(FileDataset(path=path, refs=ref), transfer=mode)

                if mode in self.ingestTransferModes:
                    self.runIngestTest(failOutsideRoot)
                    self.runIngestTest(failInputDoesNotExist)
                    self.runIngestTest(succeed)
                else:
                    self.runIngestTest(failNotImplemented)

838 

    def testIngestTransfer(self) -> None:
        """Test ingesting existing files after transferring them."""
        for mode in ("copy", "move", "link", "hardlink", "symlink", "relsymlink", "auto"):
            with self.subTest(mode=mode):
                datastore = self.makeDatastore(mode)

                # NOTE: ``mode`` and ``datastore`` are bound as default
                # arguments so each helper captures this iteration's values.
                def succeed(
                    obj: MetricsExample,
                    path: str,
                    ref: DatasetRef,
                    mode: str | None = mode,
                    datastore: Datastore = datastore,
                ) -> None:
                    """Ingest a file by transferring it to the template
                    location.
                    """
                    datastore.ingest(FileDataset(path=os.path.abspath(path), refs=ref), transfer=mode)
                    self.assertEqual(obj, datastore.get(ref))
                    file_exists = os.path.exists(path)
                    # "move" consumes the source file; all other modes
                    # leave it in place.
                    if mode == "move":
                        self.assertFalse(file_exists)
                    else:
                        self.assertTrue(file_exists)

                def failInputDoesNotExist(
                    obj: MetricsExample,
                    path: str,
                    ref: DatasetRef,
                    mode: str | None = mode,
                    datastore: Datastore = datastore,
                ) -> None:
                    """Can't ingest files if we're given a bad path."""
                    with self.assertRaises(FileNotFoundError):
                        # Ensure the file does not look like it is in
                        # datastore for auto mode
                        datastore.ingest(
                            FileDataset(path="../this-file-does-not-exist.yaml", refs=ref), transfer=mode
                        )
                    self.assertFalse(datastore.exists(ref), f"Checking not in datastore using mode {mode}")

                def failNotImplemented(
                    obj: MetricsExample,
                    path: str,
                    ref: DatasetRef,
                    mode: str | None = mode,
                    datastore: Datastore = datastore,
                ) -> None:
                    # Unsupported transfer modes must raise.
                    with self.assertRaises(NotImplementedError):
                        datastore.ingest(FileDataset(path=os.path.abspath(path), refs=ref), transfer=mode)

                if mode in self.ingestTransferModes:
                    self.runIngestTest(failInputDoesNotExist)
                    self.runIngestTest(succeed)
                else:
                    self.runIngestTest(failNotImplemented)

894 

895 def testIngestSymlinkOfSymlink(self) -> None: 

896 """Special test for symlink to a symlink ingest""" 

897 metrics, ref = self._prepareIngestTest() 

898 # The aim of this test is to create a dataset on disk, then 

899 # create a symlink to it and finally ingest the symlink such that 

900 # the symlink in the datastore points to the original dataset. 

901 for mode in ("symlink", "relsymlink"): 

902 if mode not in self.ingestTransferModes: 

903 continue 

904 

905 print(f"Trying mode {mode}") 

906 with _temp_yaml_file(metrics._asdict()) as realpath: 

907 with tempfile.TemporaryDirectory() as tmpdir: 

908 sympath = os.path.join(tmpdir, "symlink.yaml") 

909 os.symlink(os.path.realpath(realpath), sympath) 

910 

911 datastore = self.makeDatastore() 

912 datastore.ingest(FileDataset(path=os.path.abspath(sympath), refs=ref), transfer=mode) 

913 

914 uri = datastore.getURI(ref) 

915 self.assertTrue(uri.isLocal, f"Check {uri.scheme}") 

916 self.assertTrue(os.path.islink(uri.ospath), f"Check {uri} is a symlink") 

917 

918 linkTarget = os.readlink(uri.ospath) 

919 if mode == "relsymlink": 

920 self.assertFalse(os.path.isabs(linkTarget)) 

921 else: 

922 self.assertTrue(os.path.samefile(linkTarget, realpath)) 

923 

924 # Check that we can get the dataset back regardless of mode 

925 metric2 = datastore.get(ref) 

926 self.assertEqual(metric2, metrics) 

927 

928 # Cleanup the file for next time round loop 

929 # since it will get the same file name in store 

930 datastore.remove(ref) 

931 

932 def _populate_export_datastore(self, name: str) -> tuple[Datastore, list[DatasetRef]]: 

933 datastore = self.makeDatastore(name) 

934 

935 # For now only the FileDatastore can be used for this test. 

936 # ChainedDatastore that only includes InMemoryDatastores have to be 

937 # skipped as well. 

938 for name in datastore.names: 

939 if not name.startswith("InMemoryDatastore"): 

940 break 

941 else: 

942 raise unittest.SkipTest("in-memory datastore does not support record export/import") 

943 

944 metrics = makeExampleMetrics() 

945 dimensions = self.universe.conform(("visit", "physical_filter")) 

946 sc = self.storageClassFactory.getStorageClass("StructuredData") 

947 

948 refs = [] 

949 for visit in (2048, 2049, 2050): 

950 dataId = { 

951 "instrument": "dummy", 

952 "visit": visit, 

953 "physical_filter": "Uprime", 

954 "band": "u", 

955 "day_obs": 20250101, 

956 } 

957 ref = self.makeDatasetRef("metric", dimensions, sc, dataId) 

958 datastore.put(metrics, ref) 

959 refs.append(ref) 

960 return datastore, refs 

961 

    def testExportImportRecords(self) -> None:
        """Test for export_records and import_records methods."""
        datastore, refs = self._populate_export_datastore("test_datastore")
        for exported_refs in (refs, refs[1:]):
            n_refs = len(exported_refs)
            records = datastore.export_records(exported_refs)
            self.assertGreater(len(records), 0)
            self.assertTrue(set(records.keys()) <= set(datastore.names))
            # In a ChainedDatastore each FileDatastore will have a complete set
            for datastore_name in records:
                record_data = records[datastore_name]
                self.assertEqual(len(record_data.records), n_refs)

            # Check that subsetting works, include non-existing dataset ID.
            # NOTE: ``record_data`` deliberately carries over from the last
            # iteration of the loop above.
            dataset_ids = {exported_refs[0].id, uuid.uuid4()}
            subset = record_data.subset(dataset_ids)
            assert subset is not None
            self.assertEqual(len(subset.records), 1)
            # Subsetting to only unknown IDs yields no record data at all.
            subset = record_data.subset({uuid.uuid4()})
            self.assertIsNone(subset)

        # Use the same datastore name to import relative path.
        datastore2 = self.makeDatastore("test_datastore")

        # Import only the records for the last two refs; the first ref
        # must then be unknown to the new datastore.
        records = datastore.export_records(refs[1:])
        datastore2.import_records(records)

        with self.assertRaises(FileNotFoundError):
            data = datastore2.get(refs[0])
        data = datastore2.get(refs[1])
        self.assertIsNotNone(data)
        data = datastore2.get(refs[2])
        self.assertIsNotNone(data)

995 

    def testExportPredictedRecords(self) -> None:
        """Test export of predicted records for a dataset never stored."""
        if self.isEphemeral:
            raise unittest.SkipTest("in-memory datastore does not support record export/import")
        sc = self.storageClassFactory.getStorageClass("ThingOne")
        dimensions = self.universe.conform(("visit", "physical_filter"))
        dataId = {
            "instrument": "dummy",
            "visit": 52,
            "physical_filter": "V",
            "band": "v",
            "day_obs": 20250101,
        }
        ref = self.makeDatasetRef("metric", dimensions, sc, dataId)

        datastore = self.makeDatastore("test_datastore")
        # In-memory children cannot predict records; exclude them from the
        # expected key set.
        names = {n for n in datastore.names if not n.startswith("InMemory")}
        records = datastore.export_predicted_records([ref])

        # Expect predicted records from all datastores.
        self.assertEqual(set(records.keys()), names)

        for record_data in records.values():
            self.assertEqual(len(record_data.records), 1)

1019 

    def testExport(self) -> None:
        """Test the Datastore.export API, including its rejection of
        transfer modes that require a target directory.
        """
        datastore, refs = self._populate_export_datastore("test_datastore")

        datasets = list(datastore.export(refs))
        self.assertEqual(len(datasets), 3)

        for transfer in (None, "auto"):
            # Both will default to None
            datasets = list(datastore.export(refs, transfer=transfer))
            self.assertEqual(len(datasets), 3)

        # A real transfer mode without a directory is an error.
        with self.assertRaises(TypeError):
            list(datastore.export(refs, transfer="copy"))

        # "move" is never a valid export mode, even with a directory.
        with self.assertRaises(TypeError):
            list(datastore.export(refs, directory="exportDir", transfer="move"))

        # Create a new ref that is not known to the datastore and try to
        # export it.
        sc = self.storageClassFactory.getStorageClass("ThingOne")
        dimensions = self.universe.conform(("visit", "physical_filter"))
        dataId = {
            "instrument": "dummy",
            "visit": 52,
            "physical_filter": "V",
            "band": "v",
            "day_obs": 20250101,
        }
        ref = self.makeDatasetRef("metric", dimensions, sc, dataId)
        with self.assertRaises(FileNotFoundError):
            list(datastore.export(refs + [ref], transfer=None))

1051 

1052 def test_pydantic_dict_storage_class_conversions(self) -> None: 

1053 """Test converting a dataset stored as a pydantic model into a dict on 

1054 read. 

1055 """ 

1056 datastore = self.makeDatastore() 

1057 store_as_model = self.makeDatasetRef( 

1058 "store_as_model", 

1059 dimensions=self.universe.empty, 

1060 storageClass="DictConvertibleModel", 

1061 dataId=DataCoordinate.make_empty(self.universe), 

1062 ) 

1063 content = {"a": "one", "b": "two"} 

1064 model = DictConvertibleModel.from_dict(content, extra="original content") 

1065 datastore.put(model, store_as_model) 

1066 retrieved_model = datastore.get(store_as_model) 

1067 self.assertEqual(retrieved_model, model) 

1068 loaded = datastore.get(store_as_model.overrideStorageClass("NativeDictForConvertibleModel")) 

1069 self.assertEqual(type(loaded), dict) 

1070 self.assertEqual(loaded, content) 

1071 

1072 def test_simple_class_put_get(self) -> None: 

1073 """Test that we can put and get a simple class with dict() 

1074 constructor. 

1075 """ 

1076 datastore = self.makeDatastore() 

1077 data = MetricsExample(summary={"a": 1}, data=[1, 2, 3], output={"b": 2}) 

1078 self._assert_different_puts(datastore, "MetricsExample", data) 

1079 

1080 def test_dataclass_put_get(self) -> None: 

1081 """Test that we can put and get a simple dataclass.""" 

1082 datastore = self.makeDatastore() 

1083 data = MetricsExampleDataclass(summary={"a": 1}, data=[1, 2, 3], output={"b": 2}) 

1084 self._assert_different_puts(datastore, "MetricsExampleDataclass", data) 

1085 

1086 def test_pydantic_put_get(self) -> None: 

1087 """Test that we can put and get a simple Pydantic model.""" 

1088 datastore = self.makeDatastore() 

1089 data = MetricsExampleModel(summary={"a": 1}, data=[1, 2, 3], output={"b": 2}) 

1090 self._assert_different_puts(datastore, "MetricsExampleModel", data) 

1091 

1092 def test_tuple_put_get(self) -> None: 

1093 """Test that we can put and get a tuple.""" 

1094 datastore = self.makeDatastore() 

1095 data = ("a", "b", 1) 

1096 self._assert_different_puts(datastore, "TupleExample", data) 

1097 

1098 def _assert_different_puts(self, datastore: Datastore, storageClass_root: str, data: Any) -> None: 

1099 refs = { 

1100 x: self.makeDatasetRef( 

1101 f"stora_as_{x}", 

1102 dimensions=self.universe.empty, 

1103 storageClass=f"{storageClass_root}{x}", 

1104 dataId=DataCoordinate.make_empty(self.universe), 

1105 ) 

1106 for x in ["A", "B"] 

1107 } 

1108 

1109 for ref in refs.values(): 

1110 datastore.put(data, ref) 

1111 

1112 self.assertEqual(datastore.get(refs["A"]), datastore.get(refs["B"])) 

1113 

1114 

class PosixDatastoreTestCase(DatastoreTests, unittest.TestCase):
    """PosixDatastore specialization"""

    configFile = os.path.join(TESTDIR, "config/basic/butler.yaml")
    uriScheme = "file"
    canIngestNoTransferAuto = True
    ingestTransferModes = (None, "copy", "move", "link", "hardlink", "symlink", "relsymlink", "auto")
    isEphemeral = False
    rootKeys = ("root",)
    validationCanFail = True

    def setUp(self) -> None:
        # The call to os.path.realpath is necessary because Mac temporary files
        # can end up in either /private/var/folders or /var/folders, which
        # refer to the same location but don't appear to.
        # This matters for "relsymlink" transfer mode, because it needs to be
        # able to read the file through a relative symlink, but some of the
        # intermediate directories are not traversable if you try to get from a
        # tempfile in /var/folders to one in /private/var/folders via a
        # relative path.
        self.root = os.path.realpath(self.enterContext(tempfile.TemporaryDirectory()))
        super().setUp()

    def testAtomicWrite(self) -> None:
        """Test that we write to a temporary and then rename"""
        datastore = self.makeDatastore()
        storageClass = self.storageClassFactory.getStorageClass("StructuredData")
        dimensions = self.universe.conform(("visit", "physical_filter"))
        metrics = makeExampleMetrics()

        dataId = {"instrument": "dummy", "visit": 0, "physical_filter": "V", "band": "v", "day_obs": 20250101}
        ref = self.makeDatasetRef("metric", dimensions, storageClass, dataId)

        # Capture lsst.resources debug logging to observe the final rename.
        with self.assertLogs("lsst.resources", "DEBUG") as cm:
            datastore.put(metrics, ref)
        move_logs = [ll for ll in cm.output if "transfer=" in ll]
        self.assertIn("transfer=move", move_logs[0])

        # And the transfer should be file to file.
        self.assertEqual(move_logs[0].count("file://"), 2)

    def testCanNotDeterminePutFormatterLocation(self) -> None:
        """Verify that the expected exception is raised if the FileDatastore
        can not determine the put formatter location.
        """
        _ = makeExampleMetrics()
        datastore = self.makeDatastore()

        # Create multiple storage classes for testing different formulations
        storageClass = self.storageClassFactory.getStorageClass("StructuredData")

        sccomp = StorageClass("Dummy")
        compositeStorageClass = StorageClass(
            "StructuredComposite", components={"dummy": sccomp, "dummy2": sccomp}
        )

        dimensions = self.universe.conform(("visit", "physical_filter"))
        dataId = {
            "instrument": "dummy",
            "visit": 52,
            "physical_filter": "V",
            "band": "v",
            "day_obs": 20250101,
        }

        ref = self.makeDatasetRef("metric", dimensions, storageClass, dataId)
        compRef = self.makeDatasetRef("metric", dimensions, compositeStorageClass, dataId)

        def raiser(ref: DatasetRef) -> None:
            # Stand-in for a datastore that cannot place this dataset type.
            raise DatasetTypeNotSupportedError()

        with unittest.mock.patch.object(
            lsst.daf.butler.datastores.fileDatastore.FileDatastore,
            "_determine_put_formatter_location",
            side_effect=raiser,
        ):
            # verify the non-composite ref execution path:
            with self.assertRaises(DatasetTypeNotSupportedError):
                datastore.getURIs(ref, predict=True)

            # verify the composite-ref execution path:
            with self.assertRaises(DatasetTypeNotSupportedError):
                datastore.getURIs(compRef, predict=True)

    def test_roots(self) -> None:
        """Check that every named datastore reports an existing root."""
        datastore = self.makeDatastore()

        self.assertEqual(set(datastore.names), set(datastore.roots.keys()))
        for root in datastore.roots.values():
            if root is not None:
                self.assertTrue(root.exists())

    def test_prepare_get_for_external_client(self) -> None:
        """Check that a plain file datastore declines external-client gets."""
        datastore = self.makeDatastore()
        storageClass = self.storageClassFactory.getStorageClass("StructuredData")
        dimensions = self.universe.conform(("visit", "physical_filter"))
        dataId = {"instrument": "dummy", "visit": 52, "physical_filter": "V", "band": "v"}
        ref = self.makeDatasetRef("metric", dimensions, storageClass, dataId)
        # Most of the coverage for this function is in test_server.py,
        # because it requires a file backend that supports URL signing.
        self.assertIsNone(datastore.prepare_get_for_external_client(ref))

1216 

1217 

class PosixDatastoreNoChecksumsTestCase(PosixDatastoreTestCase):
    """Posix datastore tests but with checksums disabled."""

    configFile = os.path.join(TESTDIR, "config/basic/posixDatastoreNoChecksums.yaml")

    def testChecksum(self) -> None:
        """Ensure that checksums have not been calculated."""
        datastore = self.makeDatastore()
        storageClass = self.storageClassFactory.getStorageClass("StructuredData")
        dimensions = self.universe.conform(("visit", "physical_filter"))
        metrics = makeExampleMetrics()

        dataId = {"instrument": "dummy", "visit": 0, "physical_filter": "V", "band": "v", "day_obs": 20250101}
        ref = self.makeDatasetRef("metric", dimensions, storageClass, dataId)

        # Configuration should have disabled checksum calculation
        datastore.put(metrics, ref)
        infos = datastore.getStoredItemsInfo(ref)
        self.assertIsNone(infos[0].checksum)

        # Remove put back but with checksums enabled explicitly
        datastore.remove(ref)
        datastore.useChecksum = True
        datastore.put(metrics, ref)

        infos = datastore.getStoredItemsInfo(ref)
        self.assertIsNotNone(infos[0].checksum)

    def test_repeat_ingest(self) -> None:
        """Test that repeatedly ingesting the same file in direct mode
        is allowed.

        Test can only run with FileDatastore since that is the only one
        supporting "direct" ingest.
        """
        metrics, v4ref = self._prepareIngestTest()
        datastore = self.makeDatastore()
        # Deterministic (UUIDv5) ref for the same dataset; direct ingest
        # treats re-ingest of the same ID as a no-op.
        v5ref = DatasetRef(
            v4ref.datasetType, v4ref.dataId, v4ref.run, id_generation_mode=DatasetIdGenEnum.DATAID_TYPE_RUN
        )

        with _temp_yaml_file(metrics._asdict()) as path:
            datastore.ingest(FileDataset(path=path, refs=v4ref), transfer="direct")

            # This will fail because the ref is using UUIDv4.
            with self.assertRaises(RuntimeError):
                datastore.ingest(FileDataset(path=path, refs=v4ref), transfer="direct")

            # UUIDv5 can be repeatedly ingested in direct mode.
            datastore.ingest(FileDataset(path=path, refs=v5ref), transfer="direct")
            datastore.ingest(FileDataset(path=path, refs=v5ref), transfer="direct")

            # Non-direct re-ingest of an existing dataset is still an error.
            with self.assertRaises(RuntimeError):
                datastore.ingest(FileDataset(path=path, refs=v5ref), transfer="copy")

1272 

1273 

class TrashDatastoreTestCase(PosixDatastoreTestCase):
    """Restrict trash test to FileDatastore."""

    configFile = os.path.join(TESTDIR, "config/basic/butler.yaml")

    def testTrash(self) -> None:
        """Test trashing single refs, lists of refs, and trust mode."""
        datastore, *refs = self.prepDeleteTest(n_refs=10)

        # Trash one of them.
        ref = refs.pop()
        uri = datastore.getURI(ref)
        datastore.trash(ref)
        self.assertTrue(uri.exists(), uri)  # Not deleted yet
        datastore.emptyTrash()
        self.assertFalse(uri.exists(), uri)

        # Trash it again should be fine.
        datastore.trash(ref)

        # Trash multiple items at once.
        subset = [refs.pop(), refs.pop()]
        datastore.trash(subset)
        datastore.emptyTrash()

        # Remove a record and trash should do nothing.
        # This is execution butler scenario.
        ref = refs.pop()
        uri = datastore.getURI(ref)
        datastore._table.delete(["dataset_id"], {"dataset_id": ref.id})
        self.assertTrue(uri.exists())
        datastore.trash(ref)
        datastore.emptyTrash()
        self.assertTrue(uri.exists())

        # Switch on trust and it should delete the file.
        datastore.trustGetRequest = True
        datastore.trash([ref])
        self.assertFalse(uri.exists())

        # Remove multiples at once in trust mode.
        # (Loop variable renamed to ``_`` since it is unused.)
        subset = [refs.pop() for _ in range(3)]
        datastore.trash(subset)
        datastore.trash(refs.pop())  # Check that a single ref can trash

    def test_empty_trash(self) -> None:
        """Test parameters and return value for empty trash."""
        datastore, *refs = self.prepDeleteTest(n_refs=10)

        # Trash one of them.
        ref = refs.pop()
        uri = datastore.getURI(ref)
        datastore.trash(ref)
        self.assertTrue(uri.exists(), uri)  # Not deleted yet

        # Empty trash but with a list of refs that does not include the
        # one in the trash table.
        removed = datastore.emptyTrash(refs=refs)
        self.assertEqual(len(removed), 0)
        self.assertTrue(uri.exists(), uri)

        # Empty the entire trash but in dry_run mode.
        removed = datastore.emptyTrash(dry_run=True)
        self.assertEqual(len(removed), 1)
        self.assertEqual(removed.pop(), uri)
        self.assertTrue(uri.exists(), uri)

        # Empty the trash specifying the actual ref.
        removed = datastore.emptyTrash(refs=[ref])
        self.assertEqual(len(removed), 1)
        self.assertEqual(removed.pop(), uri)
        self.assertFalse(uri.exists(), uri)

        # Trash everything and empty.
        datastore.trash(refs)
        removed = datastore.emptyTrash(dry_run=True)
        for u in removed:
            self.assertTrue(u.exists())
        removed = datastore.emptyTrash()
        for u in removed:
            self.assertFalse(u.exists())

1354 

1355 

class CleanupPosixDatastoreTestCase(DatastoreTestsBase, unittest.TestCase):
    """Test datastore cleans up on failure."""

    configFile = os.path.join(TESTDIR, "config/basic/butler.yaml")

    def setUp(self) -> None:
        # Override the working directory before calling the base class
        self.root = tempfile.mkdtemp()
        super().setUp()

    def testCleanup(self) -> None:
        """Test that a failed formatter write does cleanup a partial file."""
        metrics = makeExampleMetrics()
        datastore = self.makeDatastore()

        storageClass = self.storageClassFactory.getStorageClass("StructuredData")

        dimensions = self.universe.conform(("visit", "physical_filter"))
        dataId = {
            "instrument": "dummy",
            "visit": 52,
            "physical_filter": "V",
            "band": "v",
            "day_obs": 20250101,
        }

        ref = self.makeDatasetRef("metric", dimensions, storageClass, dataId)

        # Determine where the file will end up (we assume Formatters use
        # the same file extension)
        expectedUri = datastore.getURI(ref, predict=True)
        self.assertEqual(expectedUri.fragment, "predicted")

        self.assertEqual(expectedUri.getExtension(), ".yaml", f"Is there a file extension in {expectedUri}")

        # Try formatter that fails and formatter that fails and leaves
        # a file behind
        for formatter in (BadWriteFormatter, BadNoWriteFormatter):
            with self.subTest(formatter=str(formatter)):
                # Monkey patch the formatter
                datastore.formatterFactory.registerFormatter(ref.datasetType, formatter, overwrite=True)

                # Try to put the dataset, it should fail
                with self.assertRaises(RuntimeError):
                    datastore.put(metrics, ref)

                # Check that there is no file on disk
                self.assertFalse(expectedUri.exists(), f"Check for existence of {expectedUri}")

                # Check that there is a directory. (Local renamed from
                # ``dir`` to avoid shadowing the builtin.)
                parent_dir = expectedUri.dirname()
                self.assertTrue(parent_dir.exists(), f"Check for existence of directory {parent_dir}")

        # Force YamlFormatter and check that this time a file is written
        datastore.formatterFactory.registerFormatter(ref.datasetType, YamlFormatter, overwrite=True)
        datastore.put(metrics, ref)
        self.assertTrue(expectedUri.exists(), f"Check for existence of {expectedUri}")
        datastore.remove(ref)
        self.assertFalse(expectedUri.exists(), f"Check for existence of now removed {expectedUri}")

1415 

1416 

class InMemoryDatastoreTestCase(DatastoreTests, unittest.TestCase):
    """InMemoryDatastore specialization."""

    configFile = os.path.join(TESTDIR, "config/basic/inMemoryDatastore.yaml")
    uriScheme = "mem"
    # In-memory datastores accept any put and cannot ingest files.
    hasUnsupportedPut = False
    ingestTransferModes = ()
    isEphemeral = True
    rootKeys = None
    validationCanFail = False

1427 

1428 

class ChainedDatastoreTestCase(PosixDatastoreTestCase):
    """ChainedDatastore specialization using a POSIXDatastore"""

    configFile = os.path.join(TESTDIR, "config/basic/chainedDatastore.yaml")
    hasUnsupportedPut = False
    canIngestNoTransferAuto = False
    ingestTransferModes = (None, "copy", "move", "hardlink", "symlink", "relsymlink", "link", "auto")
    isEphemeral = False
    # Config paths to the roots of the file-backed child datastores.
    rootKeys = (".datastores.1.root", ".datastores.2.root")
    validationCanFail = True

1439 

1440 

class ChainedDatastoreMemoryTestCase(InMemoryDatastoreTestCase):
    """ChainedDatastore specialization using all InMemoryDatastore"""

    configFile = os.path.join(TESTDIR, "config/basic/chainedDatastore2.yaml")
    validationCanFail = False
    isEphemeral = True

1447 

1448 

class DatastoreConstraintsTests(DatastoreTestsBase):
    """Basic tests of constraints model of Datastores."""

    def testConstraints(self) -> None:
        """Test constraints model. Assumes that each test class has the
        same constraints.
        """
        metrics = makeExampleMetrics()
        datastore = self.makeDatastore()

        sc1 = self.storageClassFactory.getStorageClass("StructuredData")
        sc2 = self.storageClassFactory.getStorageClass("StructuredDataJson")
        dimensions = self.universe.conform(("visit", "physical_filter", "instrument"))
        dataId = {
            "visit": 52,
            "physical_filter": "V",
            "band": "v",
            "instrument": "DummyCamComp",
            "day_obs": 20250101,
        }

        # Write empty file suitable for ingest check (JSON and YAML variants).
        # Close (and hence delete) the temporary files at test teardown
        # instead of leaking the handles until garbage collection.
        testfile_y = tempfile.NamedTemporaryFile(suffix=".yaml")
        self.addCleanup(testfile_y.close)
        testfile_j = tempfile.NamedTemporaryFile(suffix=".json")
        self.addCleanup(testfile_j.close)
        for datasetTypeName, sc, accepted in (
            ("metric", sc1, True),
            ("metric5", sc1, False),
            ("metric33", sc1, True),
            ("metric5", sc2, True),
        ):
            # Choose different temp file depending on StorageClass
            testfile = testfile_j if sc.name.endswith("Json") else testfile_y

            with self.subTest(datasetTypeName=datasetTypeName, storageClass=sc.name, file=testfile.name):
                ref = self.makeDatasetRef(datasetTypeName, dimensions, sc, dataId)
                if accepted:
                    datastore.put(metrics, ref)
                    self.assertTrue(datastore.exists(ref))
                    datastore.remove(ref)

                    # Try ingest
                    if self.canIngest:
                        datastore.ingest(FileDataset(testfile.name, [ref]), transfer="link")
                        self.assertTrue(datastore.exists(ref))
                        datastore.remove(ref)
                else:
                    with self.assertRaises(DatasetTypeNotSupportedError):
                        datastore.put(metrics, ref)
                    self.assertFalse(datastore.exists(ref))

                    # Again with ingest
                    if self.canIngest:
                        with self.assertRaises(DatasetTypeNotSupportedError):
                            datastore.ingest(FileDataset(testfile.name, [ref]), transfer="link")
                        self.assertFalse(datastore.exists(ref))

1504 

1505 

class PosixDatastoreConstraintsTestCase(DatastoreConstraintsTests, unittest.TestCase):
    """PosixDatastore specialization"""

    configFile = os.path.join(TESTDIR, "config/basic/posixDatastoreP.yaml")
    canIngest = True

    def setUp(self) -> None:
        # Override the working directory before calling the base class
        self.root = tempfile.mkdtemp()
        super().setUp()

1516 

1517 

class InMemoryDatastoreConstraintsTestCase(DatastoreConstraintsTests, unittest.TestCase):
    """InMemoryDatastore specialization."""

    configFile = os.path.join(TESTDIR, "config/basic/inMemoryDatastoreP.yaml")
    # In-memory datastores cannot ingest files.
    canIngest = False

1523 

1524 

class ChainedDatastoreConstraintsNativeTestCase(PosixDatastoreConstraintsTestCase):
    """ChainedDatastore specialization using a POSIXDatastore and constraints
    at the ChainedDatstore.
    """

    configFile = os.path.join(TESTDIR, "config/basic/chainedDatastorePa.yaml")

1531 

1532 

class ChainedDatastoreConstraintsTestCase(PosixDatastoreConstraintsTestCase):
    """ChainedDatastore specialization using a POSIXDatastore."""

    configFile = os.path.join(TESTDIR, "config/basic/chainedDatastoreP.yaml")

1537 

1538 

class ChainedDatastoreMemoryConstraintsTestCase(InMemoryDatastoreConstraintsTestCase):
    """ChainedDatastore specialization using all InMemoryDatastore."""

    configFile = os.path.join(TESTDIR, "config/basic/chainedDatastore2P.yaml")
    canIngest = False

1544 

1545 

class ChainedDatastorePerStoreConstraintsTests(DatastoreTestsBase, unittest.TestCase):
    """Test that a chained datastore can control constraints per-datastore
    even if child datastore would accept.
    """

    configFile = os.path.join(TESTDIR, "config/basic/chainedDatastorePb.yaml")

    def setUp(self) -> None:
        # Override the working directory before calling the base class
        self.root = tempfile.mkdtemp()
        super().setUp()

    def testConstraints(self) -> None:
        """Test chained datastore constraints model."""
        metrics = makeExampleMetrics()
        datastore = self.makeDatastore()

        sc1 = self.storageClassFactory.getStorageClass("StructuredData")
        sc2 = self.storageClassFactory.getStorageClass("StructuredDataJson")
        dimensions = self.universe.conform(("visit", "physical_filter", "instrument"))
        dataId1 = {
            "visit": 52,
            "physical_filter": "V",
            "band": "v",
            "instrument": "DummyCamComp",
            "day_obs": 20250101,
        }
        dataId2 = {"visit": 52, "physical_filter": "V", "band": "v", "instrument": "HSC", "day_obs": 20250101}

        # Write empty file suitable for ingest check (JSON and YAML variants).
        # Close (and hence delete) the temporary files at test teardown
        # instead of leaking the handles until garbage collection.
        testfile_y = tempfile.NamedTemporaryFile(suffix=".yaml")
        self.addCleanup(testfile_y.close)
        testfile_j = tempfile.NamedTemporaryFile(suffix=".json")
        self.addCleanup(testfile_j.close)

        # ``accept`` gives the expected acceptance for each child datastore.
        for typeName, dataId, sc, accept, ingest in (
            ("metric", dataId1, sc1, (False, True, False), True),
            ("metric5", dataId1, sc1, (False, False, False), False),
            ("metric5", dataId2, sc1, (True, False, False), False),
            ("metric33", dataId2, sc2, (True, True, False), True),
            ("metric5", dataId1, sc2, (False, True, False), True),
        ):
            # Choose different temp file depending on StorageClass
            testfile = testfile_j if sc.name.endswith("Json") else testfile_y

            with self.subTest(datasetTypeName=typeName, dataId=dataId, sc=sc.name):
                ref = self.makeDatasetRef(typeName, dimensions, sc, dataId)
                if any(accept):
                    datastore.put(metrics, ref)
                    self.assertTrue(datastore.exists(ref))

                    # Check each datastore inside the chained datastore
                    for childDatastore, expected in zip(datastore.datastores, accept, strict=True):
                        self.assertEqual(
                            childDatastore.exists(ref),
                            expected,
                            f"Testing presence of {ref} in datastore {childDatastore.name}",
                        )

                    datastore.remove(ref)

                    # Check that ingest works
                    if ingest:
                        datastore.ingest(FileDataset(testfile.name, [ref]), transfer="link")
                        self.assertTrue(datastore.exists(ref))

                        # Check each datastore inside the chained datastore
                        for childDatastore, expected in zip(datastore.datastores, accept, strict=True):
                            # Ephemeral datastores means InMemory at the moment
                            # and that does not accept ingest of files.
                            if childDatastore.isEphemeral:
                                expected = False
                            self.assertEqual(
                                childDatastore.exists(ref),
                                expected,
                                f"Testing presence of ingested {ref} in datastore {childDatastore.name}",
                            )

                        datastore.remove(ref)
                    else:
                        with self.assertRaises(DatasetTypeNotSupportedError):
                            datastore.ingest(FileDataset(testfile.name, [ref]), transfer="link")

                else:
                    with self.assertRaises(DatasetTypeNotSupportedError):
                        datastore.put(metrics, ref)
                    self.assertFalse(datastore.exists(ref))

                    # Again with ingest
                    with self.assertRaises(DatasetTypeNotSupportedError):
                        datastore.ingest(FileDataset(testfile.name, [ref]), transfer="link")
                    self.assertFalse(datastore.exists(ref))

1636 

1637 

1638@unittest.mock.patch.dict(os.environ, {}, clear=True) 

1639class DatastoreCacheTestCase(DatasetTestHelper, unittest.TestCase): 

1640 """Tests for datastore caching infrastructure.""" 

1641 

    @classmethod
    def setUpClass(cls) -> None:
        # Shared, read-only fixtures for all cache tests.
        cls.storageClassFactory = StorageClassFactory()
        cls.universe = DimensionUniverse()

        # Ensure that we load the test storage class definitions.
        scConfigFile = os.path.join(TESTDIR, "config/basic/storageClasses.yaml")
        cls.storageClassFactory.addFromConfig(scConfigFile)

1650 

    def setUp(self) -> None:
        """Create per-test refs and on-disk files used by the cache tests."""
        self.id = 0

        # Create a root that we can use for caching tests.
        self.root = tempfile.mkdtemp()

        # Create some test dataset refs and associated test files
        sc = self.storageClassFactory.getStorageClass("StructuredDataDict")
        dimensions = self.universe.conform(("visit", "physical_filter"))
        dataId = {
            "instrument": "dummy",
            "visit": 52,
            "physical_filter": "V",
            "band": "v",
            "day_obs": 20250101,
        }

        # Create list of refs and list of temporary files
        n_datasets = 10
        self.refs = [self.makeDatasetRef(f"metric{n}", dimensions, sc, dataId) for n in range(n_datasets)]

        root_uri = ResourcePath(self.root, forceDirectory=True)
        self.files = [root_uri.join(f"file{n}.txt") for n in range(n_datasets)]

        # Create test files.
        for uri in self.files:
            uri.write(b"0123456789")

        # Create some composite refs with component files.
        sc = self.storageClassFactory.getStorageClass("StructuredData")
        self.composite_refs = [self.makeDatasetRef(f"composite{n}", dimensions, sc, dataId) for n in range(3)]
        # Parallel lists: comp_files[i] holds the component files backing
        # the component refs in comp_refs[i].
        self.comp_files = []
        self.comp_refs = []
        for n, ref in enumerate(self.composite_refs):
            component_refs = []
            component_files = []
            for component in sc.components:
                component_ref = ref.makeComponentRef(component)
                file = root_uri.join(f"composite_file-{n}-{component}.txt")
                component_refs.append(component_ref)
                component_files.append(file)
                file.write(b"9876543210")

            self.comp_files.append(component_files)
            self.comp_refs.append(component_refs)

1696 

1697 def tearDown(self) -> None: 

1698 if self.root is not None and os.path.exists(self.root): 

1699 shutil.rmtree(self.root, ignore_errors=True) 

1700 

1701 def _make_cache_manager(self, config_str: str) -> DatastoreCacheManager: 

1702 config = Config.fromYaml(config_str) 

1703 return DatastoreCacheManager(DatastoreCacheManagerConfig(config), universe=self.universe) 

1704 

1705 def testNoCacheDir(self) -> None: 

1706 config_str = """ 

1707cached: 

1708 root: null 

1709 cacheable: 

1710 metric0: true 

1711 """ 

1712 cache_manager = self._make_cache_manager(config_str) 

1713 

1714 # Look inside to check we don't have a cache directory 

1715 self.assertIsNone(cache_manager._cache_directory) 

1716 

1717 self.assertCache(cache_manager) 

1718 

1719 # Test that the cache directory is marked temporary 

1720 self.assertTrue(cache_manager.cache_directory.isTemporary) 

1721 

1722 def testNoCacheDirReversed(self) -> None: 

1723 """Use default caching status and metric1 to false""" 

1724 config_str = """ 

1725cached: 

1726 root: null 

1727 default: true 

1728 cacheable: 

1729 metric1: false 

1730 """ 

1731 cache_manager = self._make_cache_manager(config_str) 

1732 

1733 self.assertCache(cache_manager) 

1734 

1735 def testEnvvarCacheDir(self) -> None: 

1736 config_str = f""" 

1737cached: 

1738 root: '{self.root}' 

1739 cacheable: 

1740 metric0: true 

1741 """ 

1742 

1743 root = ResourcePath(self.root, forceDirectory=True) 

1744 env_dir = root.join("somewhere", forceDirectory=True) 

1745 elsewhere = root.join("elsewhere", forceDirectory=True) 

1746 

1747 # Environment variable should override the config value. 

1748 with unittest.mock.patch.dict(os.environ, {"DAF_BUTLER_CACHE_DIRECTORY": env_dir.ospath}): 

1749 cache_manager = self._make_cache_manager(config_str) 

1750 self.assertEqual(cache_manager.cache_directory, env_dir) 

1751 

1752 # This environment variable should not override the config value. 

1753 with unittest.mock.patch.dict(os.environ, {"DAF_BUTLER_CACHE_DIRECTORY_IF_UNSET": env_dir.ospath}): 

1754 cache_manager = self._make_cache_manager(config_str) 

1755 self.assertEqual(cache_manager.cache_directory, root) 

1756 

1757 # No default setting. 

1758 config_str = """ 

1759cached: 

1760 root: null 

1761 default: true 

1762 cacheable: 

1763 metric1: false 

1764 """ 

1765 cache_manager = self._make_cache_manager(config_str) 

1766 

1767 # This environment variable should override the config value. 

1768 with unittest.mock.patch.dict(os.environ, {"DAF_BUTLER_CACHE_DIRECTORY_IF_UNSET": env_dir.ospath}): 

1769 cache_manager = self._make_cache_manager(config_str) 

1770 self.assertEqual(cache_manager.cache_directory, env_dir) 

1771 

1772 # If both environment variables are set the main (not IF_UNSET) 

1773 # variable should win. 

1774 with unittest.mock.patch.dict( 

1775 os.environ, 

1776 { 

1777 "DAF_BUTLER_CACHE_DIRECTORY": env_dir.ospath, 

1778 "DAF_BUTLER_CACHE_DIRECTORY_IF_UNSET": elsewhere.ospath, 

1779 }, 

1780 ): 

1781 cache_manager = self._make_cache_manager(config_str) 

1782 self.assertEqual(cache_manager.cache_directory, env_dir) 

1783 

1784 # Use the API to set the environment variable, making sure that the 

1785 # variable is reset on exit. 

1786 with unittest.mock.patch.dict( 

1787 os.environ, 

1788 {"DAF_BUTLER_CACHE_DIRECTORY_IF_UNSET": ""}, 

1789 ): 

1790 defined, cache_dir = DatastoreCacheManager.set_fallback_cache_directory_if_unset() 

1791 self.assertTrue(defined) 

1792 cache_manager = self._make_cache_manager(config_str) 

1793 self.assertEqual(cache_manager.cache_directory, ResourcePath(cache_dir, forceDirectory=True)) 

1794 

1795 # Now create the cache manager ahead of time and set the fallback 

1796 # later. 

1797 cache_manager = self._make_cache_manager(config_str) 

1798 self.assertIsNone(cache_manager._cache_directory) 

1799 with unittest.mock.patch.dict( 

1800 os.environ, 

1801 {"DAF_BUTLER_CACHE_DIRECTORY_IF_UNSET": ""}, 

1802 ): 

1803 defined, cache_dir = DatastoreCacheManager.set_fallback_cache_directory_if_unset() 

1804 self.assertTrue(defined) 

1805 self.assertEqual(cache_manager.cache_directory, ResourcePath(cache_dir, forceDirectory=True)) 

1806 

1807 def testExplicitCacheDir(self) -> None: 

1808 config_str = f""" 

1809cached: 

1810 root: '{self.root}' 

1811 cacheable: 

1812 metric0: true 

1813 """ 

1814 cache_manager = self._make_cache_manager(config_str) 

1815 

1816 # Look inside to check we do have a cache directory. 

1817 self.assertEqual(cache_manager.cache_directory, ResourcePath(self.root, forceDirectory=True)) 

1818 

1819 self.assertCache(cache_manager) 

1820 

1821 # Test that the cache directory is not marked temporary 

1822 self.assertFalse(cache_manager.cache_directory.isTemporary) 

1823 

1824 def testUnexpectedFilesInCacheDir(self) -> None: 

1825 """Test for regression of a bug where extraneous files in a cache 

1826 directory would cause all cache lookups to raise an exception. 

1827 """ 

1828 config_str = f""" 

1829cached: 

1830 root: '{self.root}' 

1831 cacheable: 

1832 metric0: true 

1833 """ 

1834 

1835 for filename in ["unexpected.txt", "unexpected", "un_expected", "un_expected.txt"]: 

1836 unexpected_file = os.path.join(self.root, filename) 

1837 with open(unexpected_file, "w") as fh: 

1838 fh.write("test") 

1839 

1840 cache_manager = self._make_cache_manager(config_str) 

1841 cache_manager.scan_cache() 

1842 self.assertCache(cache_manager) 

1843 

1844 def assertCache(self, cache_manager: DatastoreCacheManager) -> None: 

1845 self.assertTrue(cache_manager.should_be_cached(self.refs[0])) 

1846 self.assertFalse(cache_manager.should_be_cached(self.refs[1])) 

1847 

1848 uri = cache_manager.move_to_cache(self.files[0], self.refs[0]) 

1849 self.assertIsInstance(uri, ResourcePath) 

1850 self.assertIsNone(cache_manager.move_to_cache(self.files[1], self.refs[1])) 

1851 

1852 # Check presence in cache using ref and then using file extension. 

1853 self.assertFalse(cache_manager.known_to_cache(self.refs[1])) 

1854 self.assertTrue(cache_manager.known_to_cache(self.refs[0])) 

1855 self.assertFalse(cache_manager.known_to_cache(self.refs[1], self.files[1].getExtension())) 

1856 self.assertTrue(cache_manager.known_to_cache(self.refs[0], self.files[0].getExtension())) 

1857 

1858 # Cached file should no longer exist but uncached file should be 

1859 # unaffected. 

1860 self.assertFalse(self.files[0].exists()) 

1861 self.assertTrue(self.files[1].exists()) 

1862 

1863 # Should find this file and it should be within the cache directory. 

1864 with cache_manager.find_in_cache(self.refs[0], ".txt") as found: 

1865 self.assertTrue(found.exists()) 

1866 self.assertIsNotNone(found.relative_to(cache_manager.cache_directory)) 

1867 

1868 # Should not be able to find these in cache 

1869 with cache_manager.find_in_cache(self.refs[0], ".fits") as found: 

1870 self.assertIsNone(found) 

1871 with cache_manager.find_in_cache(self.refs[1], ".fits") as found: 

1872 self.assertIsNone(found) 

1873 

1874 def testNoCache(self) -> None: 

1875 cache_manager = DatastoreDisabledCacheManager("", universe=self.universe) 

1876 for uri, ref in zip(self.files, self.refs, strict=True): 

1877 self.assertFalse(cache_manager.should_be_cached(ref)) 

1878 self.assertIsNone(cache_manager.move_to_cache(uri, ref)) 

1879 self.assertFalse(cache_manager.known_to_cache(ref)) 

1880 with cache_manager.find_in_cache(ref, ".txt") as found: 

1881 self.assertIsNone(found, msg=f"{cache_manager}") 

1882 

1883 def _expiration_config(self, mode: str, threshold: int) -> str: 

1884 return f""" 

1885cached: 

1886 default: true 

1887 expiry: 

1888 mode: {mode} 

1889 threshold: {threshold} 

1890 cacheable: 

1891 unused: true 

1892 """ 

1893 

1894 def testCacheExpiryFiles(self) -> None: 

1895 threshold = 2 # Keep at least 2 files. 

1896 mode = "files" 

1897 config_str = self._expiration_config(mode, threshold) 

1898 

1899 cache_manager = self._make_cache_manager(config_str) 

1900 

1901 # Check that an empty cache returns unknown for arbitrary ref 

1902 self.assertFalse(cache_manager.known_to_cache(self.refs[0])) 

1903 

1904 # Should end with datasets: 2, 3, 4 

1905 self.assertExpiration(cache_manager, 5, threshold + 1) 

1906 self.assertIn(f"{mode}={threshold}", str(cache_manager)) 

1907 

1908 # Check that we will not expire a file that is actively in use. 

1909 with cache_manager.find_in_cache(self.refs[2], ".txt") as found: 

1910 self.assertIsNotNone(found) 

1911 

1912 # Trigger cache expiration that should remove the file 

1913 # we just retrieved. Should now have: 3, 4, 5 

1914 cached = cache_manager.move_to_cache(self.files[5], self.refs[5]) 

1915 self.assertIsNotNone(cached) 

1916 

1917 # Cache should still report the standard file count. 

1918 self.assertEqual(cache_manager.file_count, threshold + 1) 

1919 

1920 # Add additional entry to cache. 

1921 # Should now have 4, 5, 6 

1922 cached = cache_manager.move_to_cache(self.files[6], self.refs[6]) 

1923 self.assertIsNotNone(cached) 

1924 

1925 # Is the file still there? 

1926 self.assertTrue(found.exists()) 

1927 

1928 # Can we read it? 

1929 data = found.read() 

1930 self.assertGreater(len(data), 0) 

1931 

1932 # Outside context the file should no longer exist. 

1933 self.assertFalse(found.exists()) 

1934 

1935 # File count should not have changed. 

1936 self.assertEqual(cache_manager.file_count, threshold + 1) 

1937 

1938 # Dataset 2 was in the exempt directory but because hardlinks 

1939 # are used it was deleted from the main cache during cache expiry 

1940 # above and so should no longer be found. 

1941 with cache_manager.find_in_cache(self.refs[2], ".txt") as found: 

1942 self.assertIsNone(found) 

1943 

1944 # And the one stored after it is also gone. 

1945 with cache_manager.find_in_cache(self.refs[3], ".txt") as found: 

1946 self.assertIsNone(found) 

1947 

1948 # But dataset 4 is present. 

1949 with cache_manager.find_in_cache(self.refs[4], ".txt") as found: 

1950 self.assertIsNotNone(found) 

1951 

1952 # Adding a new dataset to the cache should now delete it. 

1953 cache_manager.move_to_cache(self.files[7], self.refs[7]) 

1954 

1955 with cache_manager.find_in_cache(self.refs[2], ".txt") as found: 

1956 self.assertIsNone(found) 

1957 

1958 def testCacheExpiryDatasets(self) -> None: 

1959 threshold = 2 # Keep 2 datasets. 

1960 mode = "datasets" 

1961 config_str = self._expiration_config(mode, threshold) 

1962 

1963 cache_manager = self._make_cache_manager(config_str) 

1964 self.assertExpiration(cache_manager, 5, threshold + 1) 

1965 self.assertIn(f"{mode}={threshold}", str(cache_manager)) 

1966 

1967 def testCacheExpiryDatasetsFromDisabled(self) -> None: 

1968 threshold = 2 

1969 mode = "datasets" 

1970 with unittest.mock.patch.dict( 

1971 os.environ, 

1972 {"DAF_BUTLER_CACHE_EXPIRATION_MODE": f"{mode}={threshold}"}, 

1973 ): 

1974 cache_manager = DatastoreCacheManager.create_disabled(universe=DimensionUniverse()) 

1975 self.assertExpiration(cache_manager, 5, threshold + 1) 

1976 self.assertIn(f"{mode}={threshold}", str(cache_manager)) 

1977 

1978 def testExpirationModeOverride(self) -> None: 

1979 threshold = 2 # Keep 2 datasets. 

1980 mode = "datasets" 

1981 config_str = self._expiration_config(mode, threshold) 

1982 

1983 mode = "size" 

1984 threshold = 55 

1985 with unittest.mock.patch.dict( 

1986 os.environ, 

1987 {"DAF_BUTLER_CACHE_EXPIRATION_MODE": f"{mode}={threshold}"}, 

1988 ): 

1989 cache_manager = self._make_cache_manager(config_str) 

1990 self.assertExpiration(cache_manager, 10, 6) 

1991 self.assertIn(f"{mode}={threshold}", str(cache_manager)) 

1992 

1993 # Check we get a warning with unrecognized form. 

1994 with unittest.mock.patch.dict( 

1995 os.environ, 

1996 {"DAF_BUTLER_CACHE_EXPIRATION_MODE": "something"}, 

1997 ): 

1998 with self.assertLogs(level="WARNING") as cm: 

1999 self._make_cache_manager(config_str) 

2000 self.assertIn("Unrecognized form (something)", cm.output[0]) 

2001 

2002 with unittest.mock.patch.dict( 

2003 os.environ, 

2004 {"DAF_BUTLER_CACHE_EXPIRATION_MODE": "something=5"}, 

2005 ): 

2006 with self.assertRaises(ValueError) as cm: 

2007 self._make_cache_manager(config_str) 

2008 self.assertIn("Unrecognized value", str(cm.exception)) 

2009 

2010 def testMissingThreshold(self) -> None: 

2011 threshold = "" 

2012 mode = "datasets" 

2013 config_str = self._expiration_config(mode, threshold) 

2014 

2015 with self.assertRaises(ValueError) as cm: 

2016 self._make_cache_manager(config_str) 

2017 self.assertIn("Cache expiration threshold", str(cm.exception)) 

2018 

2019 def testCacheExpiryDatasetsComposite(self) -> None: 

2020 threshold = 2 # Keep 2 datasets. 

2021 mode = "datasets" 

2022 config_str = self._expiration_config(mode, threshold) 

2023 

2024 cache_manager = self._make_cache_manager(config_str) 

2025 

2026 n_datasets = 3 

2027 for i in range(n_datasets): 

2028 for component_file, component_ref in zip(self.comp_files[i], self.comp_refs[i], strict=True): 

2029 cached = cache_manager.move_to_cache(component_file, component_ref) 

2030 self.assertIsNotNone(cached) 

2031 self.assertTrue(cache_manager.known_to_cache(component_ref)) 

2032 self.assertTrue(cache_manager.known_to_cache(component_ref.makeCompositeRef())) 

2033 self.assertTrue(cache_manager.known_to_cache(component_ref, component_file.getExtension())) 

2034 

2035 self.assertEqual(cache_manager.file_count, 6) # 2 datasets each of 3 files 

2036 

2037 # Write two new non-composite and the number of files should drop. 

2038 self.assertExpiration(cache_manager, 2, 5) 

2039 

2040 def testCacheExpirySize(self) -> None: 

2041 threshold = 55 # Each file is 10 bytes 

2042 mode = "size" 

2043 config_str = self._expiration_config(mode, threshold) 

2044 

2045 cache_manager = self._make_cache_manager(config_str) 

2046 self.assertExpiration(cache_manager, 10, 6) 

2047 self.assertIn(f"{mode}={threshold}", str(cache_manager)) 

2048 

2049 def testDisabledCache(self) -> None: 

2050 # Configure an active cache but disable via environment. 

2051 threshold = 2 

2052 mode = "datasets" 

2053 config_str = self._expiration_config(mode, threshold) 

2054 

2055 with unittest.mock.patch.dict( 

2056 os.environ, 

2057 {"DAF_BUTLER_CACHE_EXPIRATION_MODE": "disabled"}, 

2058 ): 

2059 env_cache_manager = self._make_cache_manager(config_str) 

2060 

2061 # Configure to be disabled 

2062 threshold = 0 

2063 mode = "disabled" 

2064 config_str = self._expiration_config(mode, threshold) 

2065 cfg_cache_manager = self._make_cache_manager(config_str) 

2066 

2067 for cache_manager in ( 

2068 cfg_cache_manager, 

2069 env_cache_manager, 

2070 DatastoreCacheManager.create_disabled(universe=DimensionUniverse()), 

2071 ): 

2072 for uri, ref in zip(self.files, self.refs, strict=True): 

2073 self.assertFalse(cache_manager.should_be_cached(ref)) 

2074 self.assertIsNone(cache_manager.move_to_cache(uri, ref)) 

2075 self.assertFalse(cache_manager.known_to_cache(ref)) 

2076 with cache_manager.find_in_cache(ref, ".txt") as found: 

2077 self.assertIsNone(found, msg=f"{cache_manager}") 

2078 self.assertIn("disabled", str(cache_manager)) 

2079 

2080 def assertExpiration( 

2081 self, cache_manager: DatastoreCacheManager, n_datasets: int, n_retained: int 

2082 ) -> None: 

2083 """Insert the datasets and then check the number retained.""" 

2084 for i in range(n_datasets): 

2085 cached = cache_manager.move_to_cache(self.files[i], self.refs[i]) 

2086 self.assertIsNotNone(cached) 

2087 

2088 self.assertEqual(cache_manager.file_count, n_retained) 

2089 

2090 # The oldest file should not be in the cache any more. 

2091 for i in range(n_datasets): 

2092 with cache_manager.find_in_cache(self.refs[i], ".txt") as found: 

2093 if i >= n_datasets - n_retained: 

2094 self.assertIsInstance(found, ResourcePath) 

2095 else: 

2096 self.assertIsNone(found) 

2097 

2098 def testCacheExpiryAge(self) -> None: 

2099 threshold = 1 # Expire older than 2 seconds 

2100 mode = "age" 

2101 config_str = self._expiration_config(mode, threshold) 

2102 

2103 cache_manager = self._make_cache_manager(config_str) 

2104 self.assertIn(f"{mode}={threshold}", str(cache_manager)) 

2105 

2106 # Insert 3 files, then sleep, then insert more. 

2107 for i in range(2): 

2108 cached = cache_manager.move_to_cache(self.files[i], self.refs[i]) 

2109 self.assertIsNotNone(cached) 

2110 time.sleep(2.0) 

2111 for j in range(4): 

2112 i = 2 + j # Continue the counting 

2113 cached = cache_manager.move_to_cache(self.files[i], self.refs[i]) 

2114 self.assertIsNotNone(cached) 

2115 

2116 # Only the files written after the sleep should exist. 

2117 self.assertEqual(cache_manager.file_count, 4) 

2118 with cache_manager.find_in_cache(self.refs[1], ".txt") as found: 

2119 self.assertIsNone(found) 

2120 with cache_manager.find_in_cache(self.refs[2], ".txt") as found: 

2121 self.assertIsInstance(found, ResourcePath) 

2122 

2123 

class NullDatastoreTestCase(DatasetTestHelper, unittest.TestCase):
    """Test the null datastore."""

    storageClassFactory = StorageClassFactory()

    def test_basics(self) -> None:
        """Check that a NullDatastore answers negatively to queries and
        raises for every operation that would require real storage.
        """
        storage_class = self.storageClassFactory.getStorageClass("StructuredDataDict")
        ref = self.makeDatasetRef("metric", DimensionUniverse().empty, storage_class, {})

        datastore = NullDatastore(None, None)

        # Existence/knowledge queries are always negative but never raise.
        self.assertFalse(datastore.exists(ref))
        self.assertFalse(datastore.knows(ref))
        self.assertFalse(datastore.knows_these([ref])[ref])
        datastore.validateConfiguration(ref)

        # Retrieval APIs report the dataset as missing.
        missing_file_ops: list[Callable[[], Any]] = [
            lambda: datastore.get(ref),
            lambda: datastore.getURI(ref),
            lambda: datastore.getURIs(ref),
            lambda: datastore.getManyURIs([ref]),
        ]
        for operation in missing_file_ops:
            with self.assertRaises(FileNotFoundError):
                operation()

        # Every mutating or exporting API is unimplemented by design.
        unsupported_ops: list[Callable[[], Any]] = [
            lambda: datastore.put("", ref),
            datastore.getLookupKeys,
            lambda: datastore.import_records({}),
            lambda: datastore.export_records([]),
            lambda: datastore.export_predicted_records([]),
            lambda: datastore.export([ref]),
            lambda: datastore.transfer(datastore, ref),
            datastore.emptyTrash,
            lambda: datastore.trash(ref),
            lambda: datastore.forget([ref]),
            lambda: datastore.remove(ref),
            lambda: datastore.retrieveArtifacts([ref], ResourcePath(".")),
            lambda: datastore.transfer_from(datastore, [ref]),
            datastore.ingest,
        ]
        for operation in unsupported_ops:
            with self.assertRaises(NotImplementedError):
                operation()

2177 

2178 

class DatasetRefURIsTestCase(unittest.TestCase):
    """Tests for DatasetRefURIs."""

    def testSequenceAccess(self) -> None:
        """Verify that DatasetRefURIs can be treated like a two-item tuple."""
        uris = DatasetRefURIs()

        # Default construction: no primary URI and an empty component map.
        self.assertEqual(len(uris), 2)
        self.assertEqual(uris[0], None)
        self.assertEqual(uris[1], {})

        primary = ResourcePath("1/2/3")
        component = ResourcePath("a/b/c")

        # Item assignment must be rejected: DatasetRefURIs is not a
        # MutableSequence.
        for index, value in ((0, primary), (1, {"foo": component})):
            with self.assertRaises(TypeError):
                uris[index] = value

        # The named properties, however, are writable and are reflected
        # in indexed access.
        uris.primaryURI = primary
        uris.componentURIs = {"foo": component}
        self.assertEqual(uris.primaryURI, primary)
        self.assertEqual(uris[0], primary)

        # Tuple-style unpacking works as well.
        unpacked_primary, unpacked_components = uris
        self.assertEqual(unpacked_primary, primary)
        self.assertEqual(unpacked_components, {"foo": component})

    def testRepr(self) -> None:
        """Verify __repr__ output."""
        uris = DatasetRefURIs(ResourcePath("/1/2/3"), {"comp": ResourcePath("/a/b/c")})
        expected = 'DatasetRefURIs(ResourcePath("file:///1/2/3"), {\'comp\': ResourcePath("file:///a/b/c")})'
        self.assertEqual(repr(uris), expected)

2216 

2217 

class StoredFileInfoTestCase(DatasetTestHelper, unittest.TestCase):
    """Test the StoredFileInfo class."""

    storageClassFactory = StorageClassFactory()

    def test_StoredFileInfo(self) -> None:
        """Test record round-tripping, rebasing, update validation and
        pickling of StoredFileInfo.
        """
        storageClass = self.storageClassFactory.getStorageClass("StructuredDataDict")
        ref = self.makeDatasetRef("metric", DimensionUniverse().empty, storageClass, {})

        record = dict(
            storage_class="StructuredDataDict",
            formatter="lsst.daf.butler.Formatter",
            path="a/b/c.txt",
            component="component",
            checksum=None,
            file_size=5,
        )
        info = StoredFileInfo.from_record(record)

        # from_record/to_record must round-trip exactly.
        self.assertEqual(info.to_record(), record)

        # Rebasing to another ref and back must reproduce the original.
        ref2 = self.makeDatasetRef("metric", DimensionUniverse().empty, storageClass, {})
        rebased = info.rebase(ref2)
        self.assertEqual(rebased.rebase(ref), info)

        # update() must reject values of the wrong type...
        with self.assertRaises(TypeError):
            rebased.update(formatter=42)

        # ...and unknown field names.
        with self.assertRaises(ValueError):
            rebased.update(something=42, new="42")

        # Check that pickle works on StoredFileInfo.
        pickled_info = pickle.dumps(info)
        unpickled_info = pickle.loads(pickled_info)
        self.assertEqual(unpickled_info, info)

    def test_make_datastore_path_relative(self) -> None:
        """Check that absolute URIs are stripped to their path component
        while already-relative paths (with or without fragments) pass
        through unchanged.
        """
        self.assertEqual(make_datastore_path_relative("a/relative/path"), "a/relative/path")
        self.assertEqual(make_datastore_path_relative("path/with#fragment"), "path/with#fragment")
        self.assertEqual(make_datastore_path_relative("http://server.com/some/path"), "some/path")
        self.assertEqual(make_datastore_path_relative("http://server.com/some/path#frag"), "some/path#frag")

    def test_datastore_record_data_json_types(self) -> None:
        """Test that we don't round-trip checksums to UUIDs when deserializing
        datastore record data.
        """
        # The checksum below is deliberately UUID-shaped: the bug being
        # guarded against was over-eager string-to-UUID coercion.
        test_json = """
        {
          "dataset_ids": [
            "74478304-abf1-4a9c-9eb2-926090a84446"
          ],
          "records": {
            "lsst.daf.butler.datastore.stored_file_info.StoredFileInfo": {
              "74478304abf14a9c9eb2926090a84446": {
                "file_datastore_records": [
                  {
                    "formatter": "lsst.daf.butler.formatters.yaml.YamlFormatter",
                    "path": "gain_factors/base-2025-158/gain_factors_spx_base-2025-158.yaml",
                    "storage_class": "GainFactors",
                    "component": "__NULL_STRING__",
                    "checksum": "cab515f6-ab67-0484-393f-aaa525dd526f",
                    "file_size": 5412
                  }
                ]
              }
            }
          }
        }
        """
        id_str = "74478304abf14a9c9eb2926090a84446"
        s = SerializedDatastoreRecordData.model_validate_json(test_json)
        # The serialized form must keep the checksum as a plain string.
        self.assertIsInstance(
            s.records[get_full_type_name(StoredFileInfo)][id_str]["file_datastore_records"][0]["checksum"],
            str,
        )
        id = uuid.UUID(id_str)
        d = DatastoreRecordData.from_simple(s)
        # After deserialization the record is a StoredFileInfo and the
        # checksum remains a string, not a UUID.
        self.assertIsInstance(d.records[id]["file_datastore_records"][0], StoredFileInfo)
        self.assertIsInstance(d.records[id]["file_datastore_records"][0].checksum, str)

2297 

2298 

@contextlib.contextmanager
def _temp_yaml_file(data: Any) -> Iterator[str]:
    """Yield the name of a temporary file holding ``data`` serialized as
    YAML.

    The file is removed when the context exits; if the caller has already
    deleted it, the cleanup error is silently ignored.
    """
    handle = tempfile.NamedTemporaryFile(mode="w", suffix=".yaml")
    try:
        yaml.dump(data, stream=handle)
        handle.flush()
        yield handle.name
    finally:
        # Some tests delete the file
        with contextlib.suppress(FileNotFoundError):
            handle.close()

2310 

2311 

if __name__ == "__main__":
    # Allow the test suite to be run directly, e.g. ``python test_datastore.py``.
    unittest.main()