Coverage for tests / test_headers.py: 24%
193 statements
« prev ^ index » next coverage.py v7.13.5, created at 2026-04-26 08:50 +0000
« prev ^ index » next coverage.py v7.13.5, created at 2026-04-26 08:50 +0000
# This file is part of astro_metadata_translator.
#
# Developed for the LSST Data Management System.
# This product includes software developed by the LSST Project
# (http://www.lsst.org).
# See the LICENSE file at the top-level directory of this distribution
# for details of code ownership.
#
# Use of this source code is governed by a 3-clause BSD-style
# license that can be found in the LICENSE file.
12import copy
13import os.path
14import unittest
15from collections.abc import MutableMapping
16from typing import Any, Never
18from astro_metadata_translator import (
19 DecamTranslator,
20 HscTranslator,
21 ObservationInfo,
22 fix_header,
23 merge_headers,
24)
25from astro_metadata_translator.file_helpers import read_basic_metadata_from_file
26from astro_metadata_translator.tests import read_test_file
# Absolute path of the directory holding this test module; the tests below
# join "data" (and sub-directories of it) onto this path.
TESTDIR = os.path.abspath(os.path.dirname(__file__))
class NotDecamTranslator(DecamTranslator):
    """A DECam translator with override list of header corrections."""

    # NOTE(review): presumably a ``None`` name keeps this test-only subclass
    # out of automatic translator registration -- confirm against the base
    # class.
    name = None

    @classmethod
    def fix_header(
        cls, header: MutableMapping[str, Any], instrument: str, obsid: str, filename: str | None = None
    ) -> bool:
        """Overwrite ``DTSITE`` with a fixed value.

        Parameters
        ----------
        header : `~collections.abc.MutableMapping`
            Header to modify in place.
        instrument : `str`
            Unused by this override.
        obsid : `str`
            Unused by this override.
        filename : `str` or `None`, optional
            Unused by this override.

        Returns
        -------
        modified : `bool`
            Always `True`.
        """
        header["DTSITE"] = "hi"
        return True

    @classmethod
    def translator_version(cls) -> str:
        """Return a fixed version string.

        Returns
        -------
        version : `str`
            Always ``"1.0.0"``.
        """
        # Hardcode a version so we can test for it
        return "1.0.0"
class NotDecamTranslator2(NotDecamTranslator):
    """Similar to NotDecamTranslator but has a fixup that will break on
    repeat.
    """

    # NOTE(review): presumably a ``None`` name keeps this test-only subclass
    # out of automatic translator registration -- confirm against the base
    # class.
    name = None

    @classmethod
    def fix_header(
        cls, header: MutableMapping[str, Any], instrument: str, obsid: str, filename: str | None = None
    ) -> bool:
        """Append ``"hi"`` to the existing ``DTSITE`` value.

        Because the value is appended to rather than replaced, applying
        this fix twice to the same header yields a different result from
        applying it once.

        Parameters
        ----------
        header : `~collections.abc.MutableMapping`
            Header to modify in place. Must already contain ``DTSITE``.
        instrument : `str`
            Unused by this override.
        obsid : `str`
            Unused by this override.
        filename : `str` or `None`, optional
            Unused by this override.

        Returns
        -------
        modified : `bool`
            Always `True`.
        """
        header["DTSITE"] += "hi"
        return True
class AlsoNotDecamTranslator(DecamTranslator):
    """A DECam translator with override list of header corrections
    that fails.
    """

    # NOTE(review): presumably a ``None`` name keeps this test-only subclass
    # out of automatic translator registration -- confirm against the base
    # class.
    name = None

    @classmethod
    def fix_header(
        cls, header: MutableMapping[str, Any], instrument: str, obsid: str, filename: str | None = None
    ) -> Never:
        """Fail unconditionally without touching the header.

        Parameters
        ----------
        header : `~collections.abc.MutableMapping`
            Unused by this override.
        instrument : `str`
            Unused by this override.
        obsid : `str`
            Unused by this override.
        filename : `str` or `None`, optional
            Unused by this override.

        Raises
        ------
        RuntimeError
            Always raised.
        """
        raise RuntimeError("Failure to work something out from header")
class NullDecamTranslator(DecamTranslator):
    """A DECam translator that doesn't do any fixes."""

    # NOTE(review): presumably a ``None`` name keeps this test-only subclass
    # out of automatic translator registration -- confirm against the base
    # class.
    name = None

    @classmethod
    def fix_header(
        cls, header: MutableMapping[str, Any], instrument: str, obsid: str, filename: str | None = None
    ) -> bool:
        """Leave the header untouched.

        Parameters
        ----------
        header : `~collections.abc.MutableMapping`
            Unused by this override.
        instrument : `str`
            Unused by this override.
        obsid : `str`
            Unused by this override.
        filename : `str` or `None`, optional
            Unused by this override.

        Returns
        -------
        modified : `bool`
            Always `False`.
        """
        return False
class HeadersTestCase(unittest.TestCase):
    """Test header manipulation utilities."""

    def setUp(self) -> None:
        """Build four small reference headers shared by the merge tests."""
        # Define reference headers
        self.h1 = dict(
            ORIGIN="LSST",
            KEY0=0,
            KEY1=1,
            KEY2=3,
            KEY3=3.1415,
            KEY4="a",
        )

        self.h2 = dict(ORIGIN="LSST", KEY0="0", KEY2=4, KEY5=42)
        self.h3 = dict(
            ORIGIN="AUXTEL",
            KEY3=3.1415,
            KEY2=50,
            KEY5=42,
        )
        self.h4 = dict(
            KEY6="New",
            KEY1="Exists",
        )

        # Add keys for sorting by time
        # Sorted order: h2, h1, h4, h3
        self.h1["MJD-OBS"] = 50000.0
        self.h2["MJD-OBS"] = 49000.0
        self.h3["MJD-OBS"] = 53000.0
        self.h4["MJD-OBS"] = 52000.0

    def test_fail(self) -> None:
        """Check that a bad mode and an empty header list are rejected."""
        with self.assertRaises(ValueError):
            merge_headers([self.h1, self.h2], mode="wrong")

        with self.assertRaises(ValueError):
            merge_headers([])

    def test_one(self) -> None:
        """Check that merging a single header gives back an equal header."""
        merged = merge_headers([self.h1], mode="drop")
        self.assertEqual(merged, self.h1)

    def test_merging_overwrite(self) -> None:
        """Check ``overwrite`` mode: values from later headers win."""
        merged = merge_headers([self.h1, self.h2], mode="overwrite")
        # The merged header should be the same type as the first header
        self.assertIsInstance(merged, type(self.h1))

        expected = {
            "MJD-OBS": self.h2["MJD-OBS"],
            "ORIGIN": self.h2["ORIGIN"],
            "KEY0": self.h2["KEY0"],
            "KEY1": self.h1["KEY1"],
            "KEY2": self.h2["KEY2"],
            "KEY3": self.h1["KEY3"],
            "KEY4": self.h1["KEY4"],
            "KEY5": self.h2["KEY5"],
        }
        self.assertEqual(merged, expected)

        merged = merge_headers([self.h1, self.h2, self.h3, self.h4], mode="overwrite")

        expected = {
            "MJD-OBS": self.h4["MJD-OBS"],
            "ORIGIN": self.h3["ORIGIN"],
            "KEY0": self.h2["KEY0"],
            "KEY1": self.h4["KEY1"],
            "KEY2": self.h3["KEY2"],
            "KEY3": self.h3["KEY3"],
            "KEY4": self.h1["KEY4"],
            "KEY5": self.h3["KEY5"],
            "KEY6": self.h4["KEY6"],
        }

        self.assertEqual(merged, expected)

    def test_merging_first(self) -> None:
        """Check ``first`` mode: the first header supplying a key wins."""
        merged = merge_headers([self.h1, self.h2, self.h3, self.h4], mode="first")

        expected = {
            "MJD-OBS": self.h1["MJD-OBS"],
            "ORIGIN": self.h1["ORIGIN"],
            "KEY0": self.h1["KEY0"],
            "KEY1": self.h1["KEY1"],
            "KEY2": self.h1["KEY2"],
            "KEY3": self.h1["KEY3"],
            "KEY4": self.h1["KEY4"],
            "KEY5": self.h2["KEY5"],
            "KEY6": self.h4["KEY6"],
        }

        self.assertEqual(merged, expected)

    def test_merging_drop(self) -> None:
        """Check ``drop`` mode, including the ``first``/``last`` overrides.

        Keys whose values disagree between headers are expected to be
        absent from the result unless they are named in ``first`` or
        ``last``.
        """
        merged = merge_headers([self.h1, self.h2, self.h3, self.h4], mode="drop")

        expected = {
            "KEY3": self.h1["KEY3"],
            "KEY4": self.h1["KEY4"],
            "KEY5": self.h2["KEY5"],
            "KEY6": self.h4["KEY6"],
        }

        self.assertEqual(merged, expected)

        # Sorting the headers should make no difference to drop mode
        merged = merge_headers([self.h1, self.h2, self.h3, self.h4], mode="drop", sort=True)
        self.assertEqual(merged, expected)

        # Now retain some headers
        merged = merge_headers(
            [self.h1, self.h2, self.h3, self.h4],
            mode="drop",
            sort=False,
            first=["ORIGIN"],
            last=["KEY2", "KEY1"],
        )

        expected = {
            "KEY2": self.h3["KEY2"],
            "ORIGIN": self.h1["ORIGIN"],
            "KEY1": self.h4["KEY1"],
            "KEY3": self.h1["KEY3"],
            "KEY4": self.h1["KEY4"],
            "KEY5": self.h2["KEY5"],
            "KEY6": self.h4["KEY6"],
        }
        self.assertEqual(merged, expected)

        # Now retain some headers with sorting
        merged = merge_headers(
            [self.h1, self.h2, self.h3, self.h4],
            mode="drop",
            sort=True,
            first=["ORIGIN"],
            last=["KEY2", "KEY1"],
        )

        expected = {
            "KEY2": self.h3["KEY2"],
            "ORIGIN": self.h2["ORIGIN"],
            "KEY1": self.h4["KEY1"],
            "KEY3": self.h1["KEY3"],
            "KEY4": self.h1["KEY4"],
            "KEY5": self.h2["KEY5"],
            "KEY6": self.h4["KEY6"],
        }
        self.assertEqual(merged, expected)

    def test_merging_diff(self) -> None:
        """Check ``diff`` mode: shared values stay at the top level and
        per-header differences are listed under ``__DIFF__``.
        """
        # Show the full dict comparison if an assertion fails.
        self.maxDiff = None

        # Nothing in common for diff
        merged = merge_headers([self.h1, self.h2, self.h3, self.h4], mode="diff")

        expected = {"__DIFF__": [self.h1, self.h2, self.h3, self.h4]}

        self.assertEqual(merged, expected)

        # Now with a subset that does have overlap
        merged = merge_headers([self.h1, self.h2], mode="diff")
        expected = {
            "ORIGIN": "LSST",
            "__DIFF__": [
                {k: self.h1[k] for k in ("KEY0", "KEY1", "KEY2", "KEY3", "KEY4", "MJD-OBS")},
                {k: self.h2[k] for k in ("KEY0", "KEY2", "KEY5", "MJD-OBS")},
            ],
        }
        self.assertEqual(merged, expected)

        # Reverse to make sure there is nothing special about the first header
        merged = merge_headers([self.h2, self.h1], mode="diff")
        expected = {
            "ORIGIN": "LSST",
            "__DIFF__": [
                {k: self.h2[k] for k in ("KEY0", "KEY2", "KEY5", "MJD-OBS")},
                {k: self.h1[k] for k in ("KEY0", "KEY1", "KEY2", "KEY3", "KEY4", "MJD-OBS")},
            ],
        }
        self.assertEqual(merged, expected)

        # Check that identical headers have empty diff
        merged = merge_headers([self.h1, self.h1], mode="diff")
        expected = {
            **self.h1,
            "__DIFF__": [
                {},
                {},
            ],
        }
        self.assertEqual(merged, expected)

    def test_merging_append(self) -> None:
        """Check ``append`` mode: differing values are gathered into lists,
        with `None` standing in for headers that lack the key.
        """
        # Try with two headers first
        merged = merge_headers([self.h1, self.h2], mode="append")

        expected = {
            "MJD-OBS": [self.h1["MJD-OBS"], self.h2["MJD-OBS"]],
            "ORIGIN": self.h1["ORIGIN"],
            "KEY0": [self.h1["KEY0"], self.h2["KEY0"]],
            "KEY1": self.h1["KEY1"],
            "KEY2": [self.h1["KEY2"], self.h2["KEY2"]],
            "KEY3": self.h1["KEY3"],
            "KEY4": self.h1["KEY4"],
            "KEY5": self.h2["KEY5"],
        }

        self.assertEqual(merged, expected)

        merged = merge_headers([self.h1, self.h2, self.h3, self.h4], mode="append")

        expected = {
            "MJD-OBS": [self.h1["MJD-OBS"], self.h2["MJD-OBS"], self.h3["MJD-OBS"], self.h4["MJD-OBS"]],
            "ORIGIN": [self.h1["ORIGIN"], self.h2["ORIGIN"], self.h3["ORIGIN"], None],
            "KEY0": [self.h1["KEY0"], self.h2["KEY0"], None, None],
            "KEY1": [self.h1["KEY1"], None, None, self.h4["KEY1"]],
            "KEY2": [self.h1["KEY2"], self.h2["KEY2"], self.h3["KEY2"], None],
            "KEY3": self.h3["KEY3"],
            "KEY4": self.h1["KEY4"],
            "KEY5": self.h3["KEY5"],
            "KEY6": self.h4["KEY6"],
        }

        self.assertEqual(merged, expected)

    def test_merging_overwrite_sort(self) -> None:
        """Check ``overwrite`` mode when headers are sorted before merging."""
        merged = merge_headers([self.h1, self.h2], mode="overwrite", sort=True)

        expected = {
            "MJD-OBS": self.h1["MJD-OBS"],
            "ORIGIN": self.h1["ORIGIN"],
            "KEY0": self.h1["KEY0"],
            "KEY1": self.h1["KEY1"],
            "KEY2": self.h1["KEY2"],
            "KEY3": self.h1["KEY3"],
            "KEY4": self.h1["KEY4"],
            "KEY5": self.h2["KEY5"],
        }
        self.assertEqual(merged, expected)

        merged = merge_headers([self.h1, self.h2, self.h3, self.h4], mode="overwrite", sort=True)

        expected = {
            "MJD-OBS": self.h3["MJD-OBS"],
            "ORIGIN": self.h3["ORIGIN"],
            "KEY0": self.h1["KEY0"],
            "KEY1": self.h4["KEY1"],
            "KEY2": self.h3["KEY2"],
            "KEY3": self.h3["KEY3"],
            "KEY4": self.h1["KEY4"],
            "KEY5": self.h3["KEY5"],
            "KEY6": self.h4["KEY6"],
        }

        self.assertEqual(merged, expected)

        # Changing the order should not change the result
        merged = merge_headers([self.h4, self.h1, self.h3, self.h2], mode="overwrite", sort=True)

        self.assertEqual(merged, expected)

    def test_merging_first_sort(self) -> None:
        """Check ``first`` mode when headers are sorted before merging."""
        merged = merge_headers([self.h1, self.h2, self.h3, self.h4], mode="first", sort=True)

        expected = {
            "MJD-OBS": self.h2["MJD-OBS"],
            "ORIGIN": self.h2["ORIGIN"],
            "KEY0": self.h2["KEY0"],
            "KEY1": self.h1["KEY1"],
            "KEY2": self.h2["KEY2"],
            "KEY3": self.h1["KEY3"],
            "KEY4": self.h1["KEY4"],
            "KEY5": self.h2["KEY5"],
            "KEY6": self.h4["KEY6"],
        }

        self.assertEqual(merged, expected)

    def test_merging_append_sort(self) -> None:
        """Check ``append`` mode when headers are sorted before merging."""
        # Try with two headers first
        merged = merge_headers([self.h1, self.h2], mode="append", sort=True)

        expected = {
            "MJD-OBS": [self.h2["MJD-OBS"], self.h1["MJD-OBS"]],
            "ORIGIN": self.h1["ORIGIN"],
            "KEY0": [self.h2["KEY0"], self.h1["KEY0"]],
            "KEY1": self.h1["KEY1"],
            "KEY2": [self.h2["KEY2"], self.h1["KEY2"]],
            "KEY3": self.h1["KEY3"],
            "KEY4": self.h1["KEY4"],
            "KEY5": self.h2["KEY5"],
        }

        self.assertEqual(merged, expected)

        merged = merge_headers([self.h1, self.h2, self.h3, self.h4], mode="append", sort=True)

        expected = {
            "MJD-OBS": [self.h2["MJD-OBS"], self.h1["MJD-OBS"], self.h4["MJD-OBS"], self.h3["MJD-OBS"]],
            "ORIGIN": [self.h2["ORIGIN"], self.h1["ORIGIN"], None, self.h3["ORIGIN"]],
            "KEY0": [self.h2["KEY0"], self.h1["KEY0"], None, None],
            "KEY1": [None, self.h1["KEY1"], self.h4["KEY1"], None],
            "KEY2": [self.h2["KEY2"], self.h1["KEY2"], None, self.h3["KEY2"]],
            "KEY3": self.h3["KEY3"],
            "KEY4": self.h1["KEY4"],
            "KEY5": self.h3["KEY5"],
            "KEY6": self.h4["KEY6"],
        }

        self.assertEqual(merged, expected)

        # Order should not matter
        merged = merge_headers([self.h4, self.h3, self.h2, self.h1], mode="append", sort=True)
        self.assertEqual(merged, expected)

    def test_stripped_header_after_mutation(self) -> None:
        """Check stripping tolerates changes to the original header."""
        header = read_test_file("fitsheader-hsc.yaml", dir=os.path.join(TESTDIR, "data"))
        info = ObservationInfo(header, pedantic=False)
        used = info.cards_used
        self.assertTrue(used)

        # Mutate original header after translation.
        key = next(iter(used))
        header.pop(key, None)

        # Should not raise if cards_used include keys no longer present.
        stripped = info.stripped_header()
        self.assertNotIn(key, stripped)
class FixHeadersTestCase(unittest.TestCase):
    """Test header fix up."""

    def test_basic_fix_header(self) -> None:
        """Test that a header can be fixed if we specify a local path."""
        header = read_test_file("fitsheader-decam-0160496.yaml", dir=os.path.join(TESTDIR, "data"))
        self.assertEqual(header["DETECTOR"], "S3-111_107419-8-3")

        # First fix header but using no search path (should work as no-op)
        fixed = fix_header(copy.copy(header), translator_class=NullDecamTranslator)
        self.assertFalse(fixed)

        # Now using the test corrections directory
        header2 = copy.copy(header)
        fixed = fix_header(
            header2,
            search_path=os.path.join(TESTDIR, "data", "corrections"),
            translator_class=NullDecamTranslator,
        )
        self.assertTrue(fixed)
        self.assertEqual(header2["DETECTOR"], "NEW-ID")

        # Now with a corrections directory that has bad YAML in it
        header2 = copy.copy(header)
        with self.assertLogs(level="WARN"):
            fixed = fix_header(
                header2,
                search_path=os.path.join(TESTDIR, "data", "bad_corrections"),
                translator_class=NullDecamTranslator,
            )
        self.assertFalse(fixed)

        # Test that fix_header of unknown header is allowed
        header = {"SOMETHING": "UNKNOWN"}
        fixed = fix_header(copy.copy(header), translator_class=NullDecamTranslator)
        self.assertFalse(fixed)

    def test_hsc_fix_header(self) -> None:
        """Check that one of the known HSC corrections is being applied
        properly.
        """
        header = {"EXP-ID": "HSCA00120800", "INSTRUME": "HSC", "DATA-TYP": "FLAT"}

        fixed = fix_header(header, translator_class=HscTranslator)
        self.assertTrue(fixed)
        self.assertEqual(header["DATA-TYP"], "OBJECT")

        # Check provenance
        self.assertIn("HSC-HSCA00120800.yaml", header["HIERARCH ASTRO METADATA FIX FILE"])

        # And that this header won't be corrected
        header = {"EXP-ID": "HSCA00120800X", "INSTRUME": "HSC", "DATA-TYP": "FLAT"}

        fixed = fix_header(header, translator_class=HscTranslator)
        self.assertFalse(fixed)
        self.assertEqual(header["DATA-TYP"], "FLAT")

    def test_decam_fix_header(self) -> None:
        """Check that one of the known DECam corrections is being applied
        properly.
        """
        # This header is a bias (zero) with an erroneous Y filter
        header = read_test_file("fitsheader-decam-0160496.yaml", dir=os.path.join(TESTDIR, "data"))
        fixed = fix_header(header, translator_class=DecamTranslator)
        self.assertTrue(fixed)
        self.assertEqual(header["FILTER"], "solid plate 0.0 0.0")

    def test_translator_fix_header(self) -> None:
        """Check that translator classes can fix headers."""
        # Read in a known header
        header = read_test_file("fitsheader-decam-0160496.yaml", dir=os.path.join(TESTDIR, "data"))
        self.assertEqual(header["DTSITE"], "ct")

        header2 = copy.copy(header)
        fixed = fix_header(header2, translator_class=NotDecamTranslator)
        self.assertTrue(fixed)
        self.assertEqual(header2["DTSITE"], "hi")

        # A translator whose fix raises must be reported via the log and
        # must leave the header as it was.
        header2 = copy.copy(header)
        header2["DTSITE"] = "reset"
        with self.assertLogs("astro_metadata_translator", level="FATAL"):
            fixed = fix_header(header2, translator_class=AlsoNotDecamTranslator)
        self.assertFalse(fixed)
        self.assertEqual(header2["DTSITE"], "reset")

    def test_no_double_fix(self) -> None:
        """Check that header fixup only happens once."""
        # Read in a known header
        header = read_test_file("fitsheader-decam-0160496.yaml", dir=os.path.join(TESTDIR, "data"))
        self.assertEqual(header["DTSITE"], "ct")

        # First time it will modify DTSITE
        fixed = fix_header(header, translator_class=NotDecamTranslator2)
        self.assertTrue(fixed)
        self.assertEqual(header["DTSITE"], "cthi")

        # Get the fix up date
        date = header["HIERARCH ASTRO METADATA FIX DATE"]

        # Second time it will do nothing but still report it was fixed
        fixed = fix_header(header, translator_class=NotDecamTranslator2)
        self.assertTrue(fixed)
        self.assertEqual(header["DTSITE"], "cthi")

        # Date of fixup should be the same
        self.assertEqual(header["HIERARCH ASTRO METADATA FIX DATE"], date)

        # Test the translator version in provenance
        self.assertEqual(header["HIERARCH ASTRO METADATA FIX VERSION"], "1.0.0")

    def test_bad_file(self) -> None:
        """Test that we get a log message if no translator can be
        determined.
        """
        bad_file = os.path.join(TESTDIR, "data", "corrections", "SCUBA_test-20000101_00002.yaml")
        md = read_basic_metadata_from_file(bad_file, 0)
        assert md is not None  # for mypy.
        with self.assertLogs(level="DEBUG") as cm:
            result = fix_header(md)
        self.assertFalse(result)
        self.assertIn("Unable to determine translator class", "\n".join(cm.output))
# Run the test cases in this module when it is executed as a script.
if __name__ == "__main__":
    unittest.main()