Coverage for tests / test_ingest.py: 18%

266 statements  

« prev     ^ index     » next       coverage.py v7.13.5, created at 2026-05-01 08:21 +0000

1# This file is part of obs_base. 

2# 

3# Developed for the LSST Data Management System. 

4# This product includes software developed by the LSST Project 

5# (https://www.lsst.org). 

6# See the COPYRIGHT file at the top-level directory of this distribution 

7# for details of code ownership. 

8# 

9# This program is free software: you can redistribute it and/or modify 

10# it under the terms of the GNU General Public License as published by 

11# the Free Software Foundation, either version 3 of the License, or 

12# (at your option) any later version. 

13# 

14# This program is distributed in the hope that it will be useful, 

15# but WITHOUT ANY WARRANTY; without even the implied warranty of 

16# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the 

17# GNU General Public License for more details. 

18# 

19# You should have received a copy of the GNU General Public License 

20# along with this program. If not, see <http://www.gnu.org/licenses/>. 

21 

22import json 

23import os 

24import pickle 

25import shutil 

26import tempfile 

27import unittest 

28 

29import lsst.daf.butler.tests as butlerTests 

30from lsst.daf.butler import Butler, Config, DataCoordinate, Registry 

31from lsst.daf.butler.registry import ConflictingDefinitionError 

32from lsst.obs.base import RawIngestTask 

33from lsst.obs.base.ingest_tests import IngestTestBase 

34from lsst.obs.base.instrument_tests import DummyCam 

35from lsst.utils.introspection import get_full_type_name 

36 

37TESTDIR = os.path.abspath(os.path.dirname(__file__)) 

38INGESTDIR = os.path.join(TESTDIR, "data", "ingest") 

39 

40 

class RawIngestTestCase(IngestTestBase, unittest.TestCase):
    """Test ingest using JSON sidecar files."""

    # Configuration attributes consumed by ``IngestTestBase``.  The exact
    # attribute names are part of the base-class contract and must not be
    # renamed (including ``file``, despite the generic name).
    ingestDatasetTypeName = "raw_dict"
    rawIngestTask = get_full_type_name(RawIngestTask)
    curatedCalibrationDatasetTypes = ("testCalib",)
    ingestDir = TESTDIR
    instrumentClassName = "lsst.obs.base.instrument_tests.DummyCam"
    # Dataset whose metadata comes from a per-file JSON sidecar.
    file = os.path.join(INGESTDIR, "sidecar_data", "dataset_1.yaml")
    dataIds = [{"instrument": "DummyCam", "exposure": 100, "detector": 0}]
    seed_config = os.path.join(TESTDIR, "data", "curated", "seed.yaml")

    @property
    def visits(self):
        """Mapping of visit data ID to the exposure data IDs that make it
        up: a single visit 100 comprising the single exposure 100.

        ``self.root`` and ``self.outputRun`` are provided by the base
        class.
        """
        with Butler.from_config(self.root, collections=[self.outputRun]) as butler:
            return {
                DataCoordinate.standardize(instrument="DummyCam", visit=100, universe=butler.dimensions): [
                    DataCoordinate.standardize(
                        instrument="DummyCam", exposure=100, universe=butler.dimensions
                    )
                ]
            }

    def testWriteCuratedCalibrations(self):
        # Inject the "data package" location.
        # DummyCam reads this class attribute to locate curated calibration
        # files before the base-class implementation runs.
        DummyCam.dataPackageDir = os.path.join(TESTDIR, "data", "curated")
        return super().testWriteCuratedCalibrations()

    def _check_obscore(self, registry: Registry, has_visits: bool) -> None:
        # Docstring inherited from base class.
        # The repository under test is expected to be configured with an
        # obscore table manager.
        assert registry.obsCoreTableManager is not None
        with registry.obsCoreTableManager.query(lsst_run=self.outputRun) as result:
            # Exactly one obscore row should exist for the single ingested
            # raw in this run.
            rows = list(result)
            self.assertEqual(len(rows), 1)
            row = rows[0]

        # No spatial information until visits are defined
        if not has_visits:
            self.assertIsNone(row.s_ra)
            self.assertIsNone(row.s_dec)
            self.assertIsNone(row.s_fov)
            self.assertIsNone(row.s_region)
        else:
            self.assertIsNotNone(row.s_ra)
            self.assertIsNotNone(row.s_dec)
            self.assertIsNotNone(row.s_fov)
            self.assertRegex(row.s_region, "POLYGON ICRS .*")

88 

89 

class RawIngestImpliedIndexTestCase(RawIngestTestCase):
    """Test ingest using JSON index files.

    Reruns every test from `RawIngestTestCase` but with a dataset that
    lives in a directory containing a ``_index.json`` file, so metadata
    are found via the implied index rather than a per-file sidecar.
    """

    file = os.path.join(INGESTDIR, "indexed_data", "dataset_1.yaml")

94 

95 

class RawIngestEdgeCaseTestCase(unittest.TestCase):
    """Test ingest using non-standard approaches including failures.

    Must create a new butler for each test because dimension records are
    globals.
    """

    def setUp(self):
        # Minimal butler config: a real file datastore is required because
        # these tests ingest actual files from disk.
        butlerConfig = """
datastore:
  # Want to ingest real files so can't use in-memory datastore
  cls: lsst.daf.butler.datastores.fileDatastore.FileDatastore
"""
        self.root = tempfile.mkdtemp(dir=TESTDIR)
        self.creatorButler = butlerTests.makeTestRepo(self.root, {}, config=Config.fromYaml(butlerConfig))
        # enterContext ensures the butler is closed at test teardown
        # (requires Python 3.11+ unittest).
        self.enterContext(self.creatorButler)
        DummyCam().register(self.creatorButler.registry)

        # Per-test collection keyed on the test id so runs don't collide.
        self.butler = butlerTests.makeTestCollection(self.creatorButler, uniqueId=self.id())
        self.enterContext(self.butler)
        self.outputRun = self.butler.run

        config = RawIngestTask.ConfigClass()
        self.task = RawIngestTask(config=config, butler=self.butler)

        # Different test files.
        # bad_metadata_file: a FITS file whose metadata cannot be extracted.
        self.bad_metadata_file = os.path.join(TESTDIR, "data", "small.fits")
        # good_file: a DummyCam raw with a valid JSON sidecar.
        self.good_file = os.path.join(INGESTDIR, "sidecar_data", "dataset_2.yaml")
        # bad_instrument_file: valid metadata but for an instrument (HSC)
        # that is not registered in this repo.
        self.bad_instrument_file = os.path.join(TESTDIR, "data", "calexp.fits")

    def tearDown(self):
        # Remove the temporary repository created in setUp.
        if self.root is not None:
            shutil.rmtree(self.root, ignore_errors=True)

    def testSimpleIngest(self):
        """Ingest a single good file, then two files in parallel."""
        # Use the default per-instrument run for this.
        self.task.run([self.good_file])
        datasets = list(self.butler.registry.queryDatasets("raw_dict", collections="DummyCam/raw/all"))
        self.assertEqual(len(datasets), 1)
        # The task records timing metrics; ingest must have taken some time.
        self.assertGreater(self.task.metrics.time_for_ingest, 0.0)

        # Now parallelized.
        files = [self.good_file, os.path.join(INGESTDIR, "sidecar_data", "dataset_1.yaml")]
        self.task.run(files, num_workers=2, run=self.outputRun)
        datasets = list(self.butler.registry.queryDatasets("raw_dict", collections=self.outputRun))
        self.assertEqual(len(datasets), 2)

    def testTimeStampWarning(self):
        """Check the warning and clamping for inverted exposure timespans."""
        # Now ingest a dataset which should generate a warning because of
        # the end time being before the begin time.
        files = [os.path.join(INGESTDIR, "sidecar_data", "dataset_end.yaml")]
        with self.assertLogs("lsst.obs.base._instrument", level="WARNING") as cm:
            self.task.run(files, run=self.outputRun)

        self.assertIn("has end time before begin time", cm.output[0])
        # Fetch the exposure record that was just registered (exposure 3000
        # comes from the dataset_end.yaml test data).
        records = list(
            self.butler.registry.queryDimensionRecords(
                "exposure",
                where="exposure = exp AND instrument = inst",
                bind={"exp": 3000, "inst": "DummyCam"},
            )
        )
        record = records[0]
        timespan = record.timespan
        # The bad end time should have been clamped so begin == end.
        self.assertEqual(timespan.begin.isot, timespan.end.isot)

    def testExplicitIndex(self):
        """Exercise ingest driven by explicitly-named _index.json files,
        including duplicate and conflicting index combinations.
        """
        files = [os.path.join(INGESTDIR, "indexed_data", "_index.json")]
        self.task.run(files, run=self.outputRun)

        # The index names two datasets.
        datasets = list(self.butler.registry.queryDatasets("raw_dict", collections=self.outputRun))
        self.assertEqual(len(datasets), 2)

        # Try again with an explicit index and a file that is in that index.
        files.append(os.path.join(INGESTDIR, "indexed_data", "dataset_2.yaml"))
        new_run = self.outputRun + "b"
        self.task.run(files, run=new_run)

        # NOTE(review): this queries self.outputRun, not new_run, so it
        # re-checks the first ingest rather than the one just performed —
        # confirm whether new_run was intended here.
        datasets = list(self.butler.registry.queryDatasets("raw_dict", collections=self.outputRun))
        self.assertEqual(len(datasets), 2)

        # Now with two index files that point to the same files.
        # Look for the warning from duplication.
        files = [
            os.path.join(INGESTDIR, "indexed_data", "_index.json"),
            os.path.join(INGESTDIR, "indexed_data", "translated_subdir", "_index.json"),
        ]
        new_run = self.outputRun + "c"

        with self.assertLogs(level="WARNING") as cm:
            self.task.run(files, run=new_run)
        self.assertIn("already specified in an index file, ignoring content", cm.output[0])

        # NOTE(review): as above, this queries self.outputRun rather than
        # new_run — verify the intended collection.
        datasets = list(self.butler.registry.queryDatasets("raw_dict", collections=self.outputRun))
        self.assertEqual(len(datasets), 2)

        # Again with an index file of metadata and one of translated.
        # Translated should win.
        # Put the metadata one first to test that order is preserved.
        files = [
            os.path.join(INGESTDIR, "indexed_data", "metadata_subdir", "_index.json"),
            os.path.join(INGESTDIR, "indexed_data", "_index.json"),
        ]
        new_run = self.outputRun + "d"
        with self.assertLogs(level="WARNING") as cm:
            self.task.run(files, run=new_run)
        self.assertIn("already specified in an index file but overriding", cm.output[0])

        # Reversing the order should change the warning.
        # Again with an index file of metadata and one of translated.
        # Translated should win.
        # Put the metadata one first to test that order is preserved.
        files = [
            os.path.join(INGESTDIR, "indexed_data", "_index.json"),
            os.path.join(INGESTDIR, "indexed_data", "metadata_subdir", "_index.json"),
        ]

        new_run = self.outputRun + "e"
        with self.assertLogs(level="WARNING") as cm:
            self.task.run(files, run=new_run)
        self.assertIn("already specified in an index file, ignoring", cm.output[0])

        # Bad index file.
        files = [os.path.join(INGESTDIR, "indexed_data", "bad_index", "_index.json")]
        with self.assertRaises(RuntimeError):
            self.task.run(files, run=self.outputRun)

        # Bad index file due to bad instrument.
        files = [os.path.join(INGESTDIR, "indexed_data", "bad_instrument", "_index.json")]
        with self.assertLogs(level="WARNING") as cm:
            with self.assertRaises(RuntimeError):
                self.task.run(files, run=self.outputRun)
        self.assertIn("Instrument HSC for file", cm.output[0])

    def testBadExposure(self):
        """Test that bad exposures trigger the correct failure modes.

        This is the only test that uses the bad definition of dataset 4
        because exposure definitions are defined globally in a butler registry.
        """
        # Ingest 3 files. 2 of them will implicitly find an index and one
        # will use a sidecar.
        files = [os.path.join(INGESTDIR, "indexed_data", f"dataset_{n}.yaml") for n in (1, 2, 3)]
        new_run = self.outputRun
        self.task.run(files, run=new_run)

        datasets = list(self.butler.registry.queryDatasets("raw_dict", collections=new_run))
        self.assertEqual(len(datasets), 3)

        # Test fail fast.
        self.task.config.failFast = True

        # Ingest files with conflicting exposure definitions.
        # Ingest 3 files. One of them will implicitly find an index and one
        # will use a sidecar. The 3rd will fail due to exposure conflict.
        files = [os.path.join(INGESTDIR, "indexed_data", f"dataset_{n}.yaml") for n in (1, 3, 4)]
        new_run = self.outputRun + "_bad_exposure"
        # With failFast the conflicting exposure definition surfaces as
        # the registry's ConflictingDefinitionError rather than the
        # aggregated RuntimeError.
        with self.assertRaises(ConflictingDefinitionError):
            self.task.run(files, run=new_run)

    def testBadFile(self):
        """Try to ingest a bad file."""
        files = [self.bad_metadata_file]

        with self.assertRaises(RuntimeError) as cm:
            # Default is to raise an error at the end.
            self.task.run(files, run=self.outputRun)
        self.assertIn("Some failures", str(cm.exception))

        # Including a good file will result in ingest working but still
        # raises (we might want to move this to solely happen in the
        # command line invocation).
        files.append(self.good_file)

        # Also include a file with unknown instrument.
        files.append(self.bad_instrument_file)

        with self.assertRaises(RuntimeError):
            self.task.run(files, run=self.outputRun)
        # Only the good file should have been ingested.
        datasets = list(self.butler.registry.queryDatasets("raw_dict", collections=self.outputRun))
        self.assertEqual(len(datasets), 1)

        # Fail fast will trigger a run time error with different text.
        # Use a different output run to be sure we are not failing because
        # of the attempt to ingest twice.
        self.task.config.failFast = True
        new_run = self.outputRun + "b"
        with self.assertRaises(RuntimeError) as cm:
            self.task.run([self.bad_metadata_file, self.good_file], run=new_run)
        self.assertIn("Problem extracting metadata", str(cm.exception))

        # Attempt to ingest good file again -- this will fail for a different
        # reason than failed metadata extraction.
        with self.assertRaises(ConflictingDefinitionError):
            self.task.run([self.good_file], run=self.outputRun)

        # Ingest a file with good metadata but unknown instrument.
        with self.assertRaises(RuntimeError) as cm:
            self.task.run([self.bad_instrument_file], run=self.outputRun)
        self.assertIn("Instrument HSC", str(cm.exception))

        # Ingest of a metadata index file that will fail translation.
        with self.assertRaises(RuntimeError) as cm:
            self.task.run([os.path.join(INGESTDIR, "indexed_data", "metadata_subdir", "_index.json")])
        self.assertIn("Problem extracting metadata", str(cm.exception))

        # Ingest of a bad index file.
        with self.assertRaises(RuntimeError) as cm:
            self.task.run([os.path.join(INGESTDIR, "indexed_data", "bad_index", "_index.json")])
        self.assertIn("Problem reading index file", str(cm.exception))

        # Ingest of an implied bad index file.
        with self.assertRaises(RuntimeError) as cm:
            self.task.run([os.path.join(INGESTDIR, "indexed_data", "bad_implied", "dataset_2.yaml")])

    def testCallbacks(self):
        """Test the callbacks for failures."""
        # Define the callbacks.
        # Each callback appends to one of these lists; the assertions below
        # count how many times each fired.
        metadata_failures = []
        successes = []
        ingest_failures = []
        records = []

        def on_metadata_failure(filename, exc):
            # Fired for each file whose metadata could not be extracted.
            metadata_failures.append(filename)

        def on_success(datasets):
            # Fired for each successful ingest.
            successes.append(datasets)

        def on_exposure_record(record):
            # Fired for each new exposure dimension record.
            records.append(record)

        def on_ingest_failure(exposure, exc):
            # Fired when ingest fails after metadata extraction succeeded.
            ingest_failures.append(exposure)

        # Need our own task instance
        config = RawIngestTask.ConfigClass()
        self.task = RawIngestTask(
            config=config,
            butler=self.butler,
            on_metadata_failure=on_metadata_failure,
            on_success=on_success,
            on_exposure_record=on_exposure_record,
            on_ingest_failure=on_ingest_failure,
        )

        files = [self.good_file, self.bad_metadata_file, self.bad_instrument_file]

        with self.assertRaises(RuntimeError):
            self.task.run(files, run=self.outputRun)

        # One good file; two files fail at the metadata-extraction stage.
        self.assertEqual(len(successes), 1)
        self.assertEqual(len(records), 1)
        self.assertEqual(len(metadata_failures), 2)
        self.assertEqual(len(ingest_failures), 0)

        # Try the good one a second time.
        with self.assertRaises(RuntimeError):
            self.task.run([self.good_file], run=self.outputRun)

        # The duplicate counts as an ingest failure, not a new success.
        self.assertEqual(len(successes), 1)
        self.assertEqual(len(ingest_failures), 1)

        # An index file with metadata that won't translate.
        metadata_failures[:] = []
        files = [os.path.join(INGESTDIR, "indexed_data", "metadata_subdir", "_index.json")]
        with self.assertRaises(RuntimeError):
            self.task.run(files, run=self.outputRun)
        self.assertEqual(len(metadata_failures), 2)

        # Bad index file.
        metadata_failures[:] = []
        files = [os.path.join(INGESTDIR, "indexed_data", "bad_index", "_index.json")]
        with self.assertRaises(RuntimeError):
            self.task.run(files, run=self.outputRun)
        self.assertEqual(len(metadata_failures), 1)

        # Ingest two files that have conflicting exposure metadata.
        ingest_failures[:] = []
        successes[:] = []
        # Ingest 4 files. 2 of them will implicitly find an index and one
        # will use a sidecar. The 4th will fail due to exposure conflict.
        files = [os.path.join(INGESTDIR, "indexed_data", f"dataset_{n}.yaml") for n in (1, 2, 3, 4)]
        new_run = self.outputRun + "_fail"
        with self.assertRaises(RuntimeError):
            self.task.run(files, run=new_run)
        self.assertEqual(len(ingest_failures), 1)
        self.assertEqual(len(successes), 3)

    def testSkipExistingExposures(self):
        """Test that skip_existing_exposures=True avoids exceptions from trying
        to ingest the same file twice.

        Notes
        -----
        This option also prevents not-ingested-yet raws from being ingested
        when exposure already exists, but that's (A) hard to test given the
        test data we have now and (B) not really ideal behavior, just behavior
        we can live with in order to have a way to avoid keep duplicate ingests
        from being an error.
        """
        # Ingest the first time.
        self.task.run([self.good_file], run=self.outputRun)
        # Attempt to ingest a second time with skip_existing_exposures=False
        # (default). This should fail.
        with self.assertRaises(RuntimeError):
            self.task.run([self.good_file], run=self.outputRun)
        # Try again with `skip_existing_exposures=True.
        self.task.run([self.good_file], run=self.outputRun, skip_existing_exposures=True)

        # Now skip ingest completely.
        self.task.run([self.good_file], run=self.outputRun, skip_ingest=True)

    def testUpdateExposureRecords(self):
        """Test that update_exposure_records=True allows metadata to be
        modified.
        """
        config = RawIngestTask.ConfigClass(failFast=True)
        task = RawIngestTask(config=config, butler=self.butler)
        with open(os.path.join(INGESTDIR, "sidecar_data", "dataset_1.json")) as file:
            metadata = json.load(file)
        # Modify unique identifiers to avoid clashes with ingests from
        # other test methods in this test case, because those share a
        # data repository.
        metadata["observation_id"] = "DummyDataset_testUpdateExposureRecords"
        metadata["observation_counter"] = 10
        metadata["exposure_id"] = 500
        metadata["exposure_group"] = "50"
        metadata["visit_id"] = 500
        base_filename = "dataset"
        try:
            # Copy the original file to be ingested (.yaml) to a temporary
            # directory, and write the new metadata next to it.
            # NOTE(review): tmp_dir is assigned inside the try block; if
            # mkdtemp raised, the finally clause would hit a NameError.
            tmp_dir = tempfile.mkdtemp(dir=TESTDIR)
            raw_filename = os.path.join(tmp_dir, f"{base_filename}.yaml")
            sidecar_filename = os.path.join(tmp_dir, f"{base_filename}.json")
            shutil.copy(self.good_file, raw_filename)
            with open(sidecar_filename, "w") as sidecar_file:
                json.dump(metadata, sidecar_file)
            task.run([raw_filename], run=self.outputRun)
            (record1,) = set(
                self.butler.registry.queryDimensionRecords("exposure", instrument="DummyCam", exposure=500)
            )
            self.assertEqual(record1.exposure_time, metadata["exposure_time"])
            # Modify some metadata and repeat the process to update the
            # exposure.
            metadata["exposure_time"] *= 2.0
            metadata["exposure_time_requested"] *= 2.0
            with open(sidecar_filename, "w") as sidecar_file:
                json.dump(metadata, sidecar_file)
            task.run(
                [raw_filename], run=self.outputRun, skip_existing_exposures=True, update_exposure_records=True
            )
            (record2,) = set(
                self.butler.registry.queryDimensionRecords("exposure", instrument="DummyCam", exposure=500)
            )
            # The stored exposure record should reflect the doubled time.
            self.assertEqual(record2.exposure_time, record1.exposure_time * 2)
        finally:
            shutil.rmtree(tmp_dir, ignore_errors=True)

455 

456 

class TestRawIngestTaskPickle(unittest.TestCase):
    """Verify that a RawIngestTask survives a pickle round trip intact."""

    @classmethod
    def setUpClass(cls):
        # One shared repository for the whole test class.
        cls.root = tempfile.mkdtemp(dir=TESTDIR)
        cls.creatorButler = butlerTests.makeTestRepo(cls.root, {})

    @classmethod
    def tearDownClass(cls):
        if cls.root is None:
            return
        shutil.rmtree(cls.root, ignore_errors=True)

    def setUp(self):
        # Fresh collection per test, keyed on the test id.
        self.butler = butlerTests.makeTestCollection(self.creatorButler, uniqueId=self.id())

        cfg = RawIngestTask.ConfigClass()
        cfg.transfer = "copy"  # safe non-default value
        self.config = cfg
        self.task = RawIngestTask(config=self.config, butler=self.butler)

    def testPickleTask(self):
        # Round-trip the task through pickle in one step.
        unpickled = pickle.loads(pickle.dumps(self.task))
        self.enterContext(unpickled.butler)
        # Every externally visible property must match the original.
        self.assertEqual(self.task.getFullName(), unpickled.getFullName())
        self.assertEqual(self.task.log.name, unpickled.log.name)
        self.assertEqual(self.task.config, unpickled.config)
        self.assertEqual(self.task.butler._config, unpickled.butler._config)
        self.assertEqual(
            list(self.task.butler.collections.defaults), list(unpickled.butler.collections.defaults)
        )
        self.assertEqual(self.task.butler.run, unpickled.butler.run)
        self.assertEqual(self.task.universe, unpickled.universe)
        self.assertEqual(self.task.datasetType, unpickled.datasetType)

489 

490 

# Allow the test file to be run directly as a script.
if __name__ == "__main__":
    unittest.main()