Coverage for python / lsst / obs / base / ingest_tests.py: 27%

237 statements  

« prev     ^ index     » next       coverage.py v7.13.5, created at 2026-04-18 08:46 +0000

1# This file is part of obs_base. 

2# 

3# Developed for the LSST Data Management System. 

4# This product includes software developed by the LSST Project 

5# (https://www.lsst.org). 

6# See the COPYRIGHT file at the top-level directory of this distribution 

7# for details of code ownership. 

8# 

9# This program is free software: you can redistribute it and/or modify 

10# it under the terms of the GNU General Public License as published by 

11# the Free Software Foundation, either version 3 of the License, or 

12# (at your option) any later version. 

13# 

14# This program is distributed in the hope that it will be useful, 

15# but WITHOUT ANY WARRANTY; without even the implied warranty of 

16# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the 

17# GNU General Public License for more details. 

18# 

19# You should have received a copy of the GNU General Public License 

20# along with this program. If not, see <https://www.gnu.org/licenses/>. 

21 

22"""Base class for writing Gen3 raw data ingest tests.""" 

23 

24from __future__ import annotations 

25 

26__all__ = ("IngestTestBase",) 

27 

28import abc 

29import os 

30import shutil 

31import tempfile 

32import unittest 

33from typing import TYPE_CHECKING 

34 

35import lsst.afw.cameraGeom 

36import lsst.afw.cameraGeom.testUtils # For assertDetectorsEqual 

37import lsst.obs.base 

38from lsst.daf.butler import Butler, DataCoordinate, Registry 

39from lsst.daf.butler.cli.butler import cli as butlerCli 

40from lsst.daf.butler.cli.utils import LogCliRunner 

41from lsst.obs.base import Instrument 

42from lsst.pipe.base import Task 

43from lsst.resources import ResourcePath 

44from lsst.utils import doImportType 

45 

46from . import script 

47 

48if TYPE_CHECKING: 

49 from collections.abc import Callable 

50 

51 import lsst.afw.image 

52 import lsst.sphgeom 

53 

54 

class IngestTestBase(metaclass=abc.ABCMeta):
    """Base class for tests of gen3 ingest. Subclass from this, then
    `unittest.TestCase` to get a working test suite.
    """

    ingestDir: str = ""
    """Root path to ingest files into. Typically ``obs_package/tests/``; the
    actual directory will be a tempdir under this one.
    """

    ingestDatasetTypeName: str = "raw"
    """The DatasetType to use for the ingest.

    If this is not an Exposure dataset type the tests will be more limited.
    """

    dataIds: list[DataCoordinate] = []
    """list of butler data IDs of files that should have been ingested."""

    file: str = ""
    """Full path to a file to ingest in tests."""

    filterLabel: lsst.afw.image.FilterLabel | None = None
    """The lsst.afw.image.FilterLabel that should be returned by the above
    file."""

    rawIngestTask: str = "lsst.obs.base.RawIngestTask"
    """The task to use in the Ingest test."""

    curatedCalibrationDatasetTypes: list[str] | None = None
    """List or tuple of Datasets types that should be present after calling
    writeCuratedCalibrations. If `None` writeCuratedCalibrations will
    not be called and the test will be skipped."""

    defineVisitsTask: type[Task] = lsst.obs.base.DefineVisitsTask
    """The task to use to define visits from groups of exposures.
    This is ignored if ``visits`` is `None`.
    """

    visits: dict[DataCoordinate, list[DataCoordinate]] = {}
    """A dictionary mapping visit data IDs to the lists of exposure data IDs
    that are associated with them.
    If this is empty (but not `None`), visit definition will be run but no
    visits will be expected (e.g. because no exposures are on-sky
    observations).
    """

    seed_config: str | None = None
    """Location of a seed configuration file to pass to butler create.

    Useful if additional formatters or storage classes need to be defined.
    """

    root: str
    """Root directory of the test butler."""

    datastore_root: ResourcePath
    """Root of the file datastore used for testing."""

    if TYPE_CHECKING:
        # Declarations of the unittest.TestCase API that the concrete
        # subclass (which also inherits from TestCase) will provide. These
        # exist only to satisfy type checkers; no TestCase import is needed.
        enterContext: Callable
        assertEqual: Callable
        assertIsNotNone: Callable
        assertTrue: Callable
        assertRaises: Callable
        assertIn: Callable
        assertFalse: Callable
        assertIsInstance: Callable
        assertCountEqual: Callable
        skipTest: Callable
        assertGreater: Callable
        assertDetectorsEqual: Callable
        subTest: Callable

        def id(self) -> str: ...

130 

    @property
    @abc.abstractmethod
    def instrumentClassName(self) -> str:
        """The fully qualified instrument class name.

        Must be overridden by subclasses (typically as a plain class
        attribute string).

        Returns
        -------
        `str`
            The fully qualified instrument class name.
        """
        pass

142 

143 @property 

144 def instrumentClass(self) -> type[Instrument]: 

145 """The instrument class.""" 

146 inst_class = doImportType(self.instrumentClassName) 

147 assert issubclass(inst_class, Instrument) 

148 return inst_class 

149 

150 @property 

151 def instrumentName(self) -> str: 

152 """The name of the instrument. 

153 

154 Returns 

155 ------- 

156 `str` 

157 The name of the instrument. 

158 """ 

159 return self.instrumentClass.getName() 

160 

    @classmethod
    def setUpClass(cls) -> None:
        """Create and initialize a temporary butler repo shared by all tests
        in this class.
        """
        # Use a temporary working directory.
        cls.root = tempfile.mkdtemp(dir=cls.ingestDir)
        cls._createRepo()

        # Register the instrument and its static metadata.
        cls._registerInstrument()

        # Determine the relevant datastore root to use for testing.
        with Butler.from_config(cls.root) as butler:
            roots = butler.get_datastore_roots()
            assert len(roots) == 1  # Only one datastore.
            _, root = roots.popitem()
            assert isinstance(root, ResourcePath)
            cls.datastore_root = root

177 

178 def setUp(self) -> None: 

179 # Want a unique run name per test. 

180 self.outputRun = "raw_ingest_" + self.id() 

181 

182 @classmethod 

183 def tearDownClass(cls) -> None: 

184 if os.path.exists(cls.root): 

185 shutil.rmtree(cls.root, ignore_errors=True) 

186 

    def verifyIngest(
        self, files: list[str] | None = None, cli: bool = False, fullCheck: bool = False
    ) -> None:
        """Test that RawIngestTask ingested the expected files.

        Parameters
        ----------
        files : `list` [`str`], or None
            List of files to be ingested, or None to use ``self.file``.
            Only forwarded to `checkRepo`; not read directly here.
        cli : `bool`, optional
            Unused.
        fullCheck : `bool`, optional
            If `True`, read the full raw dataset and check component
            consistency. If `False` check that a component can be read
            but do not read the entire raw exposure.

        Notes
        -----
        Reading all the ingested test data can be expensive. The code paths
        for reading the second raw are the same as reading the first so
        we do not gain anything by doing full checks of everything.
        Only read full pixel data for first dataset from file.
        Don't even do that if we are requested not to by the caller.
        This only really affects files that contain multiple datasets.
        """
        butler = Butler.from_config(self.root, run=self.outputRun)
        self.enterContext(butler)
        # There should be exactly one ingested dataset per expected data ID.
        datasets = list(butler.registry.queryDatasets(self.ingestDatasetTypeName, collections=self.outputRun))
        self.assertEqual(len(datasets), len(self.dataIds))

        # Can check that the timespan in the day_obs matches the exposure
        # record.
        if "day_obs" in butler.dimensions:
            days = {
                (rec.instrument, rec.id): rec.timespan
                for rec in butler.registry.queryDimensionRecords("day_obs")
            }

            exp_records = list(butler.registry.queryDimensionRecords("exposure"))
            for exp in exp_records:
                day_span = days[exp.instrument, exp.day_obs]
                # A day_obs record without a timespan cannot be checked.
                if day_span is not None:
                    self.assertTrue(
                        day_span.contains(exp.timespan.begin), f"Timespan mismatch of {exp} and {day_span}"
                    )

        # Get the URI to the first dataset and check it is inside the
        # datastore.
        datasetUri = butler.getURI(datasets[0])
        self.assertIsNotNone(datasetUri.relative_to(self.datastore_root))

        # Get the relevant dataset type.
        datasetType = butler.get_dataset_type(self.ingestDatasetTypeName)

        for dataId in self.dataIds:
            # For testing we only read the entire dataset the first time
            # round if this is an Exposure. If it's not an Exposure
            # we always read it completely but we don't read components
            # because for an arbitrary dataset type we can't easily tell
            # what component to test.

            if not datasetType.storageClass.name.startswith("Exposure"):
                # Read it to prove the dataset is retrievable at all.
                exposure = butler.get(self.ingestDatasetTypeName, dataId)
                # Could be anything so nothing to test by default
                continue

            # Check that we can read metadata from a raw.
            metadata = butler.get(f"{self.ingestDatasetTypeName}.metadata", dataId)
            if not fullCheck:
                continue
            # Only the first Exposure data ID gets the expensive full read.
            fullCheck = False
            exposure = butler.get(self.ingestDatasetTypeName, dataId)

            # Comparing headers will not work directly because of header
            # fix up provenance.
            metadata_headers = metadata.toDict()
            exposure_headers = exposure.getMetadata().toDict()
            metadata_headers.pop("HIERARCH ASTRO METADATA FIX DATE", None)
            exposure_headers.pop("HIERARCH ASTRO METADATA FIX DATE", None)
            self.assertEqual(metadata_headers, exposure_headers)

            # Since components follow a different code path we check that
            # WCS match and also we check that at least the shape
            # of the image is the same (rather than doing per-pixel equality)
            wcs = butler.get(f"{self.ingestDatasetTypeName}.wcs", dataId)
            self.assertEqual(wcs, exposure.getWcs())

            rawImage = butler.get(f"{self.ingestDatasetTypeName}.image", dataId)
            self.assertEqual(rawImage.getBBox(), exposure.getBBox())

            # Check that the filter label got the correct band.
            filterLabel = butler.get(f"{self.ingestDatasetTypeName}.filter", dataId)
            self.assertEqual(filterLabel, self.filterLabel)

            # Check that the exposure's Detector is the same as the component
            # we would read (this is tricky for LSST, which modifies its
            # detector at read time; for most other cameras it should be
            # trivially satisfied.
            detector = butler.get(f"{self.ingestDatasetTypeName}.detector", dataId)
            self.assertDetectorsEqual(detector, exposure.getDetector(), compareTransforms=False)

        # Give subclasses a chance to run their own checks.
        self.checkRepo(files=files)

289 

290 def checkRepo(self, files: list[str] | None = None) -> None: 

291 """Check the state of the repository after ingest. 

292 

293 This is an optional hook provided for subclasses; by default it does 

294 nothing. 

295 

296 Parameters 

297 ---------- 

298 files : `list` [`str`], or None 

299 List of files to be ingested, or None to use ``self.file`` 

300 """ 

301 return 

302 

303 @classmethod 

304 def _createRepo(cls) -> None: 

305 """Use the Click `testing` module to call the butler command line api 

306 to create a repository. 

307 """ 

308 runner = LogCliRunner() 

309 args = [] 

310 if cls.seed_config: 

311 args.extend(["--seed-config", cls.seed_config]) 

312 result = runner.invoke(butlerCli, ["create", cls.root, *args]) 

313 # Classmethod so assertEqual does not work. 

314 assert result.exit_code == 0, f"output: {result.output} exception: {result.exception}" 

315 

316 def _ingestRaws(self, transfer: str, file: str | None = None, skip_existing: bool = True) -> None: 

317 """Use the Click `testing` module to call the butler command line api 

318 to ingest raws. 

319 

320 Parameters 

321 ---------- 

322 transfer : `str` 

323 The external data transfer type. 

324 file : `str` or `None`, optional 

325 Path to a file to ingest instead of the default associated with 

326 the object. 

327 skip_existing : `bool`, optional 

328 Whether to use the ``--no-skip-existing`` flag for ingest. 

329 """ 

330 if file is None: 

331 file = self.file 

332 

333 args = [ 

334 "ingest-raws", 

335 self.root, 

336 file, 

337 "--output-run", 

338 self.outputRun, 

339 "--transfer", 

340 transfer, 

341 "--ingest-task", 

342 self.rawIngestTask, 

343 ] 

344 if not skip_existing: 

345 args.append("--no-skip-existing") 

346 runner = LogCliRunner() 

347 result = runner.invoke(butlerCli, args) 

348 self.assertEqual(result.exit_code, 0, f"output: {result.output} exception: {result.exception}") 

349 

350 @classmethod 

351 def _registerInstrument(cls) -> None: 

352 """Use the Click `testing` module to call the butler command line api 

353 to register the instrument. 

354 """ 

355 runner = LogCliRunner() 

356 result = runner.invoke(butlerCli, ["register-instrument", cls.root, cls.instrumentClassName]) 

357 # Classmethod so assertEqual does not work. 

358 assert result.exit_code == 0, f"output: {result.output} exception: {result.exception}" 

359 

360 def _writeCuratedCalibrations(self) -> None: 

361 """Use the Click `testing` module to call the butler command line api 

362 to write curated calibrations. 

363 """ 

364 runner = LogCliRunner() 

365 result = runner.invoke( 

366 butlerCli, ["write-curated-calibrations", self.root, self.instrumentName, "test"] 

367 ) 

368 self.assertEqual(result.exit_code, 0, f"output: {result.output} exception: {result.exception}") 

369 

370 def testLink(self) -> None: 

371 self._ingestRaws(transfer="link") 

372 self.verifyIngest() 

373 

374 def testSymLink(self) -> None: 

375 self._ingestRaws(transfer="symlink") 

376 self.verifyIngest() 

377 

378 def testDirect(self) -> None: 

379 self._ingestRaws(transfer="direct") 

380 

381 # Check that it really did have a URI outside of datastore. 

382 srcUri = ResourcePath(self.file, forceAbsolute=True) 

383 butler = Butler.from_config(self.root, run=self.outputRun) 

384 self.enterContext(butler) 

385 datasets = list(butler.registry.queryDatasets(self.ingestDatasetTypeName, collections=self.outputRun)) 

386 datastoreUri = butler.getURI(datasets[0]) 

387 self.assertEqual(datastoreUri, srcUri) 

388 

389 def testCopy(self) -> None: 

390 self._ingestRaws(transfer="copy") 

391 # Only test full read of raws for the copy test. No need to do it 

392 # in the other tests since the formatter will be the same in all 

393 # cases. 

394 self.verifyIngest(fullCheck=True) 

395 

396 def testHardLink(self) -> None: 

397 try: 

398 self._ingestRaws(transfer="hardlink") 

399 # Running ingest through the Click testing infrastructure causes 

400 # the original exception indicating that we can't hard-link 

401 # on this filesystem to be turned into a nonzero exit code, which 

402 # then trips the test assertion. 

403 except (AssertionError, PermissionError) as err: 

404 raise unittest.SkipTest( 

405 "Skipping hard-link test because input data is on a different filesystem." 

406 ) from err 

407 self.verifyIngest() 

408 

    def testInPlace(self) -> None:
        """Test that files already in the directory can be added to the
        registry in-place.
        """
        butler = Butler.from_config(self.root, run=self.outputRun)
        self.enterContext(butler)

        # If the test uses an index file the index file needs to also
        # appear in the datastore root along with the file to be ingested.
        # In that scenario the file name being used for ingest can not
        # be modified and must have the same name as found in the index
        # file itself.
        source_file_uri = ResourcePath(self.file)
        index_file = source_file_uri.dirname().join("_index.json")
        pathInStore = source_file_uri.basename()
        if index_file.exists():
            os.symlink(index_file.ospath, self.datastore_root.join("_index.json").ospath)
        else:
            # No index file so we are free to pick any name.
            pathInStore = "prefix-" + pathInStore

        # Create a symlink to the original file so that it looks like it
        # is now inside the datastore.
        newPath = self.datastore_root.join(pathInStore)
        os.symlink(os.path.abspath(self.file), newPath.ospath)

        # If there is a sidecar file it needs to be linked in as well
        # since ingest code does not follow symlinks.
        sidecar_uri = ResourcePath(source_file_uri).updatedExtension(".json")
        if sidecar_uri.exists():
            newSidecar = ResourcePath(newPath).updatedExtension(".json")
            os.symlink(sidecar_uri.ospath, newSidecar.ospath)

        # Run ingest with auto mode since that should automatically determine
        # that an in-place ingest is happening.
        self._ingestRaws(transfer="auto", file=newPath.ospath)
        self.verifyIngest()

        # Recreate a butler post-ingest (the earlier one won't see the
        # ingested files).
        butler = Butler.from_config(self.root, run=self.outputRun)
        self.enterContext(butler)

        # Check that the URI associated with this path is the right one.
        uri = butler.getURI(self.ingestDatasetTypeName, self.dataIds[0])
        self.assertEqual(uri.relative_to(self.datastore_root), pathInStore)

455 

456 def testFailOnConflict(self) -> None: 

457 """Re-ingesting the same data into the repository should fail.""" 

458 self._ingestRaws(transfer="symlink") 

459 self._ingestRaws(transfer="symlink") # Default is to skip. 

460 with self.assertRaises(AssertionError): 

461 self._ingestRaws(transfer="symlink", skip_existing=False) 

462 

    def testWriteCuratedCalibrations(self) -> None:
        """Test that we can ingest the curated calibrations, and read them
        with `~lsst.obs.base.loadCamera` both before and after.
        """
        if self.curatedCalibrationDatasetTypes is None:
            raise unittest.SkipTest("Class requests disabling of writeCuratedCalibrations test")

        butler = Butler.from_config(self.root, writeable=False)
        self.enterContext(butler)
        collection = self.instrumentClass().makeCalibrationCollectionName("test")

        # Trying to load a camera with a data ID not known to the registry
        # is an error, because we can't get any temporal information.
        with self.assertRaises(LookupError):
            lsst.obs.base.loadCamera(butler, {"exposure": 0}, collections=collection)

        # Ingest raws in order to get some exposure records.
        self._ingestRaws(transfer="auto")

        # Load camera should return an unversioned camera because there's
        # nothing in the repo.
        camera, isVersioned = lsst.obs.base.loadCamera(butler, self.dataIds[0], collections=collection)
        self.assertFalse(isVersioned)
        self.assertIsInstance(camera, lsst.afw.cameraGeom.Camera)

        self._writeCuratedCalibrations()

        # Make a new butler instance to make sure we don't have any stale
        # caches (e.g. of DatasetTypes). Note that we didn't give
        # _writeCuratedCalibrations the butler instance we had, because it's
        # trying to test the CLI interface anyway.
        butler = Butler.from_config(self.root, writeable=False)
        self.enterContext(butler)

        instrumentClass = self.instrumentClass()
        calibration_names = instrumentClass.getCuratedCalibrationNames()

        # Every expected calibration dataset type must now have at least one
        # association in the calibration collection, and must be one of the
        # names the instrument says it curates.
        for datasetTypeName in self.curatedCalibrationDatasetTypes:
            with self.subTest(dtype=datasetTypeName):
                found = list(
                    butler.registry.queryDatasetAssociations(
                        datasetTypeName,
                        collections=collection,
                    )
                )
                self.assertGreater(len(found), 0, f"Checking {datasetTypeName}")
                self.assertIn(datasetTypeName, calibration_names)

        # Load camera should return the versioned camera from the repo.
        camera, isVersioned = lsst.obs.base.loadCamera(butler, self.dataIds[0], collections=collection)
        self.assertTrue(isVersioned)
        self.assertIsInstance(camera, lsst.afw.cameraGeom.Camera)

515 

    def testDefineVisits(self) -> None:
        """Test that visits can be defined from ingested exposures and that
        visit regions contain all their detector regions.
        """
        if not self.visits:
            self.skipTest("Expected visits were not defined.")
        self._ingestRaws(transfer="link")

        # Check that obscore table (if configured) has correct contents.
        butler = Butler.from_config(self.root, run=self.outputRun)
        self.enterContext(butler)
        self._check_obscore(butler.registry, has_visits=False)

        # Calling defineVisits tests the implementation of the butler command
        # line interface "define-visits" subcommand. Functions in the script
        # folder are generally considered protected and should not be used
        # as public api.
        script.defineVisits(
            self.root,
            config_file=None,
            collections=[self.outputRun],
            instrument=self.instrumentName,
        )

        # Test that we got the visits we expected.
        visits = butler.registry.queryDataIds(["visit"]).expanded().toSet()
        self.assertCountEqual(visits, self.visits.keys())
        instr = Instrument.from_string(self.instrumentName, butler.registry)
        camera = instr.getCamera()
        for foundVisit, (expectedVisit, expectedExposures) in zip(visits, self.visits.items(), strict=True):
            # Test that this visit is associated with the expected exposures.
            foundExposures = (
                butler.registry.queryDataIds(["exposure"], dataId=expectedVisit).expanded().toSet()
            )
            self.assertCountEqual(foundExposures, expectedExposures)
            # Test that we have a visit region, and that it contains all of
            # the detector+visit regions.
            self.assertIsNotNone(foundVisit.region)
            detectorVisitDataIds = (
                butler.registry.queryDataIds(["visit", "detector"], dataId=expectedVisit).expanded().toSet()
            )
            # One detector+visit data ID per detector in the camera.
            self.assertEqual(len(detectorVisitDataIds), len(camera))
            for dataId in detectorVisitDataIds:
                assert isinstance(foundVisit.region, lsst.sphgeom.Region)
                self.assertTrue(foundVisit.region.contains(dataId.region))

        # Check obscore table again.
        self._check_obscore(butler.registry, has_visits=True)

561 

562 def _check_obscore(self, registry: Registry, has_visits: bool) -> None: 

563 """Verify contents of obscore table.""" 

564 return