Coverage for tests / test_ingest.py: 18%
266 statements
« prev ^ index » next coverage.py v7.13.5, created at 2026-04-17 09:02 +0000
« prev ^ index » next coverage.py v7.13.5, created at 2026-04-17 09:02 +0000
1# This file is part of obs_base.
2#
3# Developed for the LSST Data Management System.
4# This product includes software developed by the LSST Project
5# (https://www.lsst.org).
6# See the COPYRIGHT file at the top-level directory of this distribution
7# for details of code ownership.
8#
9# This program is free software: you can redistribute it and/or modify
10# it under the terms of the GNU General Public License as published by
11# the Free Software Foundation, either version 3 of the License, or
12# (at your option) any later version.
13#
14# This program is distributed in the hope that it will be useful,
15# but WITHOUT ANY WARRANTY; without even the implied warranty of
16# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
17# GNU General Public License for more details.
18#
19# You should have received a copy of the GNU General Public License
20# along with this program. If not, see <http://www.gnu.org/licenses/>.
22import json
23import os
24import pickle
25import shutil
26import tempfile
27import unittest
29import lsst.daf.butler.tests as butlerTests
30from lsst.daf.butler import Butler, Config, DataCoordinate, Registry
31from lsst.daf.butler.registry import ConflictingDefinitionError
32from lsst.obs.base import RawIngestTask
33from lsst.obs.base.ingest_tests import IngestTestBase
34from lsst.obs.base.instrument_tests import DummyCam
35from lsst.utils.introspection import get_full_type_name
# Directory containing this test module; all test data paths are derived
# from it so the tests work regardless of the current working directory.
TESTDIR = os.path.abspath(os.path.dirname(__file__))
INGESTDIR = os.path.join(TESTDIR, "data", "ingest")
class RawIngestTestCase(IngestTestBase, unittest.TestCase):
    """Test ingest using JSON sidecar files."""

    # Configuration attributes consumed by the IngestTestBase mixin.
    ingestDatasetTypeName = "raw_dict"
    rawIngestTask = get_full_type_name(RawIngestTask)
    curatedCalibrationDatasetTypes = ("testCalib",)
    ingestDir = TESTDIR
    instrumentClassName = "lsst.obs.base.instrument_tests.DummyCam"
    file = os.path.join(INGESTDIR, "sidecar_data", "dataset_1.yaml")
    dataIds = [{"instrument": "DummyCam", "exposure": 100, "detector": 0}]
    seed_config = os.path.join(TESTDIR, "data", "curated", "seed.yaml")

    @property
    def visits(self):
        # Mapping of visit data ID -> list of constituent exposure data IDs
        # expected after visit definition; read by the base-class tests.
        with Butler.from_config(self.root, collections=[self.outputRun]) as butler:
            return {
                DataCoordinate.standardize(instrument="DummyCam", visit=100, universe=butler.dimensions): [
                    DataCoordinate.standardize(
                        instrument="DummyCam", exposure=100, universe=butler.dimensions
                    )
                ]
            }

    def testWriteCuratedCalibrations(self):
        # Inject the "data package" location before delegating to the
        # base-class implementation of the test.
        DummyCam.dataPackageDir = os.path.join(TESTDIR, "data", "curated")
        return super().testWriteCuratedCalibrations()

    def _check_obscore(self, registry: Registry, has_visits: bool) -> None:
        # Docstring inherited from base class.
        assert registry.obsCoreTableManager is not None
        with registry.obsCoreTableManager.query(lsst_run=self.outputRun) as result:
            rows = list(result)
        self.assertEqual(len(rows), 1)
        row = rows[0]

        # No spatial information until visits are defined
        if not has_visits:
            self.assertIsNone(row.s_ra)
            self.assertIsNone(row.s_dec)
            self.assertIsNone(row.s_fov)
            self.assertIsNone(row.s_region)
        else:
            self.assertIsNotNone(row.s_ra)
            self.assertIsNotNone(row.s_dec)
            self.assertIsNotNone(row.s_fov)
            self.assertRegex(row.s_region, "POLYGON ICRS .*")
class RawIngestImpliedIndexTestCase(RawIngestTestCase):
    """Test ingest using JSON index files."""

    # Re-runs every inherited test but with data that is described by an
    # implied _index.json file instead of per-file sidecars.
    file = os.path.join(INGESTDIR, "indexed_data", "dataset_1.yaml")
class RawIngestEdgeCaseTestCase(unittest.TestCase):
    """Test ingest using non-standard approaches including failures.

    Must create a new butler for each test because dimension records are
    globals.
    """

    def setUp(self):
        butlerConfig = """
datastore:
  # Want to ingest real files so can't use in-memory datastore
  cls: lsst.daf.butler.datastores.fileDatastore.FileDatastore
"""
        self.root = tempfile.mkdtemp(dir=TESTDIR)
        self.creatorButler = butlerTests.makeTestRepo(self.root, {}, config=Config.fromYaml(butlerConfig))
        self.enterContext(self.creatorButler)
        DummyCam().register(self.creatorButler.registry)

        # Per-test collection so tests do not interfere with each other.
        self.butler = butlerTests.makeTestCollection(self.creatorButler, uniqueId=self.id())
        self.enterContext(self.butler)
        self.outputRun = self.butler.run

        config = RawIngestTask.ConfigClass()
        self.task = RawIngestTask(config=config, butler=self.butler)

        # Different test files.
        self.bad_metadata_file = os.path.join(TESTDIR, "data", "small.fits")
        self.good_file = os.path.join(INGESTDIR, "sidecar_data", "dataset_2.yaml")
        self.bad_instrument_file = os.path.join(TESTDIR, "data", "calexp.fits")

    def tearDown(self):
        if self.root is not None:
            shutil.rmtree(self.root, ignore_errors=True)

    def testSimpleIngest(self):
        """Ingest one good file, then two files with multiple workers."""
        # Use the default per-instrument run for this.
        self.task.run([self.good_file])
        datasets = list(self.butler.registry.queryDatasets("raw_dict", collections="DummyCam/raw/all"))
        self.assertEqual(len(datasets), 1)
        self.assertGreater(self.task.metrics.time_for_ingest, 0.0)

        # Now parallelized.
        files = [self.good_file, os.path.join(INGESTDIR, "sidecar_data", "dataset_1.yaml")]
        self.task.run(files, num_workers=2, run=self.outputRun)
        datasets = list(self.butler.registry.queryDatasets("raw_dict", collections=self.outputRun))
        self.assertEqual(len(datasets), 2)

    def testTimeStampWarning(self):
        """Check handling of an exposure whose end time precedes its
        begin time.
        """
        # Now ingest a dataset which should generate a warning because of
        # the end time being before the begin time.
        files = [os.path.join(INGESTDIR, "sidecar_data", "dataset_end.yaml")]
        with self.assertLogs("lsst.obs.base._instrument", level="WARNING") as cm:
            self.task.run(files, run=self.outputRun)
        self.assertIn("has end time before begin time", cm.output[0])
        records = list(
            self.butler.registry.queryDimensionRecords(
                "exposure",
                where="exposure = exp AND instrument = inst",
                bind={"exp": 3000, "inst": "DummyCam"},
            )
        )
        record = records[0]
        timespan = record.timespan
        # After ingest the stored timespan has identical begin and end.
        self.assertEqual(timespan.begin.isot, timespan.end.isot)

    def testExplicitIndex(self):
        """Ingest using explicitly-named index files, including failure
        modes for bad index content.
        """
        files = [os.path.join(INGESTDIR, "indexed_data", "_index.json")]
        self.task.run(files, run=self.outputRun)

        datasets = list(self.butler.registry.queryDatasets("raw_dict", collections=self.outputRun))
        self.assertEqual(len(datasets), 2)

        # Try again with an explicit index and a file that is in that index.
        files.append(os.path.join(INGESTDIR, "indexed_data", "dataset_2.yaml"))
        new_run = self.outputRun + "b"
        self.task.run(files, run=new_run)

        # NOTE(review): this queries the original output run rather than
        # new_run -- confirm whether the new run was meant to be checked.
        datasets = list(self.butler.registry.queryDatasets("raw_dict", collections=self.outputRun))
        self.assertEqual(len(datasets), 2)

        # Now with two index files that point to the same files.
        # Look for the warning from duplication.
        files = [
            os.path.join(INGESTDIR, "indexed_data", "_index.json"),
            os.path.join(INGESTDIR, "indexed_data", "translated_subdir", "_index.json"),
        ]
        new_run = self.outputRun + "c"

        with self.assertLogs(level="WARNING") as cm:
            self.task.run(files, run=new_run)
        self.assertIn("already specified in an index file, ignoring content", cm.output[0])

        # NOTE(review): again queries self.outputRun, not new_run.
        datasets = list(self.butler.registry.queryDatasets("raw_dict", collections=self.outputRun))
        self.assertEqual(len(datasets), 2)

        # Again with an index file of metadata and one of translated.
        # Translated should win.
        # Put the metadata one first to test that order is preserved.
        files = [
            os.path.join(INGESTDIR, "indexed_data", "metadata_subdir", "_index.json"),
            os.path.join(INGESTDIR, "indexed_data", "_index.json"),
        ]
        new_run = self.outputRun + "d"
        with self.assertLogs(level="WARNING") as cm:
            self.task.run(files, run=new_run)
        self.assertIn("already specified in an index file but overriding", cm.output[0])

        # Reversing the order should change the warning: the metadata
        # index is now second and so is ignored rather than overridden.
        files = [
            os.path.join(INGESTDIR, "indexed_data", "_index.json"),
            os.path.join(INGESTDIR, "indexed_data", "metadata_subdir", "_index.json"),
        ]

        new_run = self.outputRun + "e"
        with self.assertLogs(level="WARNING") as cm:
            self.task.run(files, run=new_run)
        self.assertIn("already specified in an index file, ignoring", cm.output[0])

        # Bad index file.
        files = [os.path.join(INGESTDIR, "indexed_data", "bad_index", "_index.json")]
        with self.assertRaises(RuntimeError):
            self.task.run(files, run=self.outputRun)

        # Bad index file due to bad instrument.
        files = [os.path.join(INGESTDIR, "indexed_data", "bad_instrument", "_index.json")]
        with self.assertLogs(level="WARNING") as cm:
            with self.assertRaises(RuntimeError):
                self.task.run(files, run=self.outputRun)
        self.assertIn("Instrument HSC for file", cm.output[0])

    def testBadExposure(self):
        """Test that bad exposures trigger the correct failure modes.

        This is the only test that uses the bad definition of dataset 4
        because exposure definitions are defined globally in a butler registry.
        """
        # Ingest 3 files. 2 of them will implicitly find an index and one
        # will use a sidecar.
        files = [os.path.join(INGESTDIR, "indexed_data", f"dataset_{n}.yaml") for n in (1, 2, 3)]
        new_run = self.outputRun
        self.task.run(files, run=new_run)

        datasets = list(self.butler.registry.queryDatasets("raw_dict", collections=new_run))
        self.assertEqual(len(datasets), 3)

        # Test fail fast.
        self.task.config.failFast = True

        # Ingest files with conflicting exposure definitions.
        # Ingest 3 files. One of them will implicitly find an index and one
        # will use a sidecar. The 3rd will fail due to exposure conflict.
        files = [os.path.join(INGESTDIR, "indexed_data", f"dataset_{n}.yaml") for n in (1, 3, 4)]
        new_run = self.outputRun + "_bad_exposure"
        with self.assertRaises(ConflictingDefinitionError):
            self.task.run(files, run=new_run)

    def testBadFile(self):
        """Try to ingest a bad file."""
        files = [self.bad_metadata_file]

        with self.assertRaises(RuntimeError) as cm:
            # Default is to raise an error at the end.
            self.task.run(files, run=self.outputRun)
        self.assertIn("Some failures", str(cm.exception))

        # Including a good file will result in ingest working but still
        # raises (we might want to move this to solely happen in the
        # command line invocation).
        files.append(self.good_file)

        # Also include a file with unknown instrument.
        files.append(self.bad_instrument_file)

        with self.assertRaises(RuntimeError):
            self.task.run(files, run=self.outputRun)
        datasets = list(self.butler.registry.queryDatasets("raw_dict", collections=self.outputRun))
        self.assertEqual(len(datasets), 1)

        # Fail fast will trigger a run time error with different text.
        # Use a different output run to be sure we are not failing because
        # of the attempt to ingest twice.
        self.task.config.failFast = True
        new_run = self.outputRun + "b"
        with self.assertRaises(RuntimeError) as cm:
            self.task.run([self.bad_metadata_file, self.good_file], run=new_run)
        self.assertIn("Problem extracting metadata", str(cm.exception))

        # Attempt to ingest good file again -- this will fail for a different
        # reason than failed metadata extraction.
        with self.assertRaises(ConflictingDefinitionError):
            self.task.run([self.good_file], run=self.outputRun)

        # Ingest a file with good metadata but unknown instrument.
        with self.assertRaises(RuntimeError) as cm:
            self.task.run([self.bad_instrument_file], run=self.outputRun)
        self.assertIn("Instrument HSC", str(cm.exception))

        # Ingest of a metadata index file that will fail translation.
        with self.assertRaises(RuntimeError) as cm:
            self.task.run([os.path.join(INGESTDIR, "indexed_data", "metadata_subdir", "_index.json")])
        self.assertIn("Problem extracting metadata", str(cm.exception))

        # Ingest of a bad index file.
        with self.assertRaises(RuntimeError) as cm:
            self.task.run([os.path.join(INGESTDIR, "indexed_data", "bad_index", "_index.json")])
        self.assertIn("Problem reading index file", str(cm.exception))

        # Ingest of an implied bad index file.
        with self.assertRaises(RuntimeError) as cm:
            self.task.run([os.path.join(INGESTDIR, "indexed_data", "bad_implied", "dataset_2.yaml")])

    def testCallbacks(self):
        """Test the callbacks for failures."""
        # Define the callbacks.
        metadata_failures = []
        successes = []
        ingest_failures = []
        records = []

        def on_metadata_failure(filename, exc):
            metadata_failures.append(filename)

        def on_success(datasets):
            successes.append(datasets)

        def on_exposure_record(record):
            records.append(record)

        def on_ingest_failure(exposure, exc):
            ingest_failures.append(exposure)

        # Need our own task instance
        config = RawIngestTask.ConfigClass()
        self.task = RawIngestTask(
            config=config,
            butler=self.butler,
            on_metadata_failure=on_metadata_failure,
            on_success=on_success,
            on_exposure_record=on_exposure_record,
            on_ingest_failure=on_ingest_failure,
        )

        files = [self.good_file, self.bad_metadata_file, self.bad_instrument_file]

        with self.assertRaises(RuntimeError):
            self.task.run(files, run=self.outputRun)

        self.assertEqual(len(successes), 1)
        self.assertEqual(len(records), 1)
        self.assertEqual(len(metadata_failures), 2)
        self.assertEqual(len(ingest_failures), 0)

        # Try the good one a second time.
        with self.assertRaises(RuntimeError):
            self.task.run([self.good_file], run=self.outputRun)

        self.assertEqual(len(successes), 1)
        self.assertEqual(len(ingest_failures), 1)

        # An index file with metadata that won't translate.
        metadata_failures[:] = []
        files = [os.path.join(INGESTDIR, "indexed_data", "metadata_subdir", "_index.json")]
        with self.assertRaises(RuntimeError):
            self.task.run(files, run=self.outputRun)
        self.assertEqual(len(metadata_failures), 2)

        # Bad index file.
        metadata_failures[:] = []
        files = [os.path.join(INGESTDIR, "indexed_data", "bad_index", "_index.json")]
        with self.assertRaises(RuntimeError):
            self.task.run(files, run=self.outputRun)
        self.assertEqual(len(metadata_failures), 1)

        # Ingest two files that have conflicting exposure metadata.
        ingest_failures[:] = []
        successes[:] = []
        # Ingest 4 files. 2 of them will implicitly find an index and one
        # will use a sidecar. The 4th will fail due to exposure conflict.
        files = [os.path.join(INGESTDIR, "indexed_data", f"dataset_{n}.yaml") for n in (1, 2, 3, 4)]
        new_run = self.outputRun + "_fail"
        with self.assertRaises(RuntimeError):
            self.task.run(files, run=new_run)
        self.assertEqual(len(ingest_failures), 1)
        self.assertEqual(len(successes), 3)

    def testSkipExistingExposures(self):
        """Test that skip_existing_exposures=True avoids exceptions from trying
        to ingest the same file twice.

        Notes
        -----
        This option also prevents not-ingested-yet raws from being ingested
        when exposure already exists, but that's (A) hard to test given the
        test data we have now and (B) not really ideal behavior, just behavior
        we can live with in order to have a way to avoid keep duplicate ingests
        from being an error.
        """
        # Ingest the first time.
        self.task.run([self.good_file], run=self.outputRun)
        # Attempt to ingest a second time with skip_existing_exposures=False
        # (default). This should fail.
        with self.assertRaises(RuntimeError):
            self.task.run([self.good_file], run=self.outputRun)
        # Try again with skip_existing_exposures=True.
        self.task.run([self.good_file], run=self.outputRun, skip_existing_exposures=True)

        # Now skip ingest completely.
        self.task.run([self.good_file], run=self.outputRun, skip_ingest=True)

    def testUpdateExposureRecords(self):
        """Test that update_exposure_records=True allows metadata to be
        modified.
        """
        config = RawIngestTask.ConfigClass(failFast=True)
        task = RawIngestTask(config=config, butler=self.butler)
        with open(os.path.join(INGESTDIR, "sidecar_data", "dataset_1.json")) as file:
            metadata = json.load(file)
        # Modify unique identifiers to avoid clashes with ingests from
        # other test methods in this test case, because those share a
        # data repository.
        metadata["observation_id"] = "DummyDataset_testUpdateExposureRecords"
        metadata["observation_counter"] = 10
        metadata["exposure_id"] = 500
        metadata["exposure_group"] = "50"
        metadata["visit_id"] = 500
        base_filename = "dataset"
        # Create the temporary directory *before* entering the try block;
        # previously it was created inside, so a failure in mkdtemp would
        # have raised NameError on tmp_dir in the finally clause.
        tmp_dir = tempfile.mkdtemp(dir=TESTDIR)
        try:
            # Copy the original file to be ingested (.yaml) to the temporary
            # directory, and write the new metadata next to it.
            raw_filename = os.path.join(tmp_dir, f"{base_filename}.yaml")
            sidecar_filename = os.path.join(tmp_dir, f"{base_filename}.json")
            shutil.copy(self.good_file, raw_filename)
            with open(sidecar_filename, "w") as sidecar_file:
                json.dump(metadata, sidecar_file)
            task.run([raw_filename], run=self.outputRun)
            (record1,) = set(
                self.butler.registry.queryDimensionRecords("exposure", instrument="DummyCam", exposure=500)
            )
            self.assertEqual(record1.exposure_time, metadata["exposure_time"])
            # Modify some metadata and repeat the process to update the
            # exposure.
            metadata["exposure_time"] *= 2.0
            metadata["exposure_time_requested"] *= 2.0
            with open(sidecar_filename, "w") as sidecar_file:
                json.dump(metadata, sidecar_file)
            task.run(
                [raw_filename], run=self.outputRun, skip_existing_exposures=True, update_exposure_records=True
            )
            (record2,) = set(
                self.butler.registry.queryDimensionRecords("exposure", instrument="DummyCam", exposure=500)
            )
            self.assertEqual(record2.exposure_time, record1.exposure_time * 2)
        finally:
            shutil.rmtree(tmp_dir, ignore_errors=True)
class TestRawIngestTaskPickle(unittest.TestCase):
    """Test that pickling of the RawIngestTask works properly."""

    @classmethod
    def setUpClass(cls):
        # One repository shared by every test in this case; removed again
        # in tearDownClass.
        cls.root = tempfile.mkdtemp(dir=TESTDIR)
        cls.creatorButler = butlerTests.makeTestRepo(cls.root, {})

    @classmethod
    def tearDownClass(cls):
        if cls.root is not None:
            shutil.rmtree(cls.root, ignore_errors=True)

    def setUp(self):
        # Fresh collection and task per test.
        self.butler = butlerTests.makeTestCollection(self.creatorButler, uniqueId=self.id())

        self.config = RawIngestTask.ConfigClass()
        self.config.transfer = "copy"  # safe non-default value
        self.task = RawIngestTask(config=self.config, butler=self.butler)

    def testPickleTask(self):
        """Round-trip the task through pickle and verify its state survives."""
        clone = pickle.loads(pickle.dumps(self.task))
        self.enterContext(clone.butler)
        # The clone should match the original in every externally visible way.
        self.assertEqual(self.task.getFullName(), clone.getFullName())
        self.assertEqual(self.task.log.name, clone.log.name)
        self.assertEqual(self.task.config, clone.config)
        self.assertEqual(self.task.butler._config, clone.butler._config)
        self.assertEqual(
            list(self.task.butler.collections.defaults), list(clone.butler.collections.defaults)
        )
        self.assertEqual(self.task.butler.run, clone.butler.run)
        self.assertEqual(self.task.universe, clone.universe)
        self.assertEqual(self.task.datasetType, clone.datasetType)
# Allow the tests to be run directly as a script.
if __name__ == "__main__":
    unittest.main()