Coverage for tests / test_headers.py: 24%

193 statements  

« prev     ^ index     » next       coverage.py v7.13.5, created at 2026-04-26 08:50 +0000

1# This file is part of astro_metadata_translator. 

2# 

3# Developed for the LSST Data Management System. 

4# This product includes software developed by the LSST Project 

5# (http://www.lsst.org). 

6# See the LICENSE file at the top-level directory of this distribution 

7# for details of code ownership. 

8# 

9# Use of this source code is governed by a 3-clause BSD-style 

10# license that can be found in the LICENSE file. 

11 

12import copy 

13import os.path 

14import unittest 

15from collections.abc import MutableMapping 

16from typing import Any, Never 

17 

18from astro_metadata_translator import ( 

19 DecamTranslator, 

20 HscTranslator, 

21 ObservationInfo, 

22 fix_header, 

23 merge_headers, 

24) 

25from astro_metadata_translator.file_helpers import read_basic_metadata_from_file 

26from astro_metadata_translator.tests import read_test_file 

27 

28TESTDIR = os.path.abspath(os.path.dirname(__file__)) 

29 

30 

31class NotDecamTranslator(DecamTranslator): 

32 """A DECam translator with override list of header corrections.""" 

33 

34 name = None 

35 

36 @classmethod 

37 def fix_header( 

38 cls, header: MutableMapping[str, Any], instrument: str, obsid: str, filename: str | None = None 

39 ) -> bool: 

40 header["DTSITE"] = "hi" 

41 return True 

42 

43 @classmethod 

44 def translator_version(cls) -> str: 

45 # Hardcode a version so we can test for it 

46 return "1.0.0" 

47 

48 

49class NotDecamTranslator2(NotDecamTranslator): 

50 """Similar to NotDecamTranslator but has a fixup that will break on 

51 repeat. 

52 """ 

53 

54 name = None 

55 

56 @classmethod 

57 def fix_header( 

58 cls, header: MutableMapping[str, Any], instrument: str, obsid: str, filename: str | None = None 

59 ) -> bool: 

60 header["DTSITE"] += "hi" 

61 return True 

62 

63 

64class AlsoNotDecamTranslator(DecamTranslator): 

65 """A DECam translator with override list of header corrections 

66 that fails. 

67 """ 

68 

69 name = None 

70 

71 @classmethod 

72 def fix_header( 

73 cls, header: MutableMapping[str, Any], instrument: str, obsid: str, filename: str | None = None 

74 ) -> Never: 

75 raise RuntimeError("Failure to work something out from header") 

76 

77 

78class NullDecamTranslator(DecamTranslator): 

79 """A DECam translator that doesn't do any fixes.""" 

80 

81 name = None 

82 

83 @classmethod 

84 def fix_header( 

85 cls, header: MutableMapping[str, Any], instrument: str, obsid: str, filename: str | None = None 

86 ) -> bool: 

87 return False 

88 

89 

90class HeadersTestCase(unittest.TestCase): 

91 """Test header manipulation utilities.""" 

92 

93 def setUp(self) -> None: 

94 # Define reference headers 

95 self.h1 = dict( 

96 ORIGIN="LSST", 

97 KEY0=0, 

98 KEY1=1, 

99 KEY2=3, 

100 KEY3=3.1415, 

101 KEY4="a", 

102 ) 

103 

104 self.h2 = dict(ORIGIN="LSST", KEY0="0", KEY2=4, KEY5=42) 

105 self.h3 = dict( 

106 ORIGIN="AUXTEL", 

107 KEY3=3.1415, 

108 KEY2=50, 

109 KEY5=42, 

110 ) 

111 self.h4 = dict( 

112 KEY6="New", 

113 KEY1="Exists", 

114 ) 

115 

116 # Add keys for sorting by time 

117 # Sorted order: h2, h1, h4, h3 

118 self.h1["MJD-OBS"] = 50000.0 

119 self.h2["MJD-OBS"] = 49000.0 

120 self.h3["MJD-OBS"] = 53000.0 

121 self.h4["MJD-OBS"] = 52000.0 

122 

123 def test_fail(self) -> None: 

124 with self.assertRaises(ValueError): 

125 merge_headers([self.h1, self.h2], mode="wrong") 

126 

127 with self.assertRaises(ValueError): 

128 merge_headers([]) 

129 

130 def test_one(self) -> None: 

131 merged = merge_headers([self.h1], mode="drop") 

132 self.assertEqual(merged, self.h1) 

133 

134 def test_merging_overwrite(self) -> None: 

135 merged = merge_headers([self.h1, self.h2], mode="overwrite") 

136 # The merged header should be the same type as the first header 

137 self.assertIsInstance(merged, type(self.h1)) 

138 

139 expected = { 

140 "MJD-OBS": self.h2["MJD-OBS"], 

141 "ORIGIN": self.h2["ORIGIN"], 

142 "KEY0": self.h2["KEY0"], 

143 "KEY1": self.h1["KEY1"], 

144 "KEY2": self.h2["KEY2"], 

145 "KEY3": self.h1["KEY3"], 

146 "KEY4": self.h1["KEY4"], 

147 "KEY5": self.h2["KEY5"], 

148 } 

149 self.assertEqual(merged, expected) 

150 

151 merged = merge_headers([self.h1, self.h2, self.h3, self.h4], mode="overwrite") 

152 

153 expected = { 

154 "MJD-OBS": self.h4["MJD-OBS"], 

155 "ORIGIN": self.h3["ORIGIN"], 

156 "KEY0": self.h2["KEY0"], 

157 "KEY1": self.h4["KEY1"], 

158 "KEY2": self.h3["KEY2"], 

159 "KEY3": self.h3["KEY3"], 

160 "KEY4": self.h1["KEY4"], 

161 "KEY5": self.h3["KEY5"], 

162 "KEY6": self.h4["KEY6"], 

163 } 

164 

165 self.assertEqual(merged, expected) 

166 

167 def test_merging_first(self) -> None: 

168 merged = merge_headers([self.h1, self.h2, self.h3, self.h4], mode="first") 

169 

170 expected = { 

171 "MJD-OBS": self.h1["MJD-OBS"], 

172 "ORIGIN": self.h1["ORIGIN"], 

173 "KEY0": self.h1["KEY0"], 

174 "KEY1": self.h1["KEY1"], 

175 "KEY2": self.h1["KEY2"], 

176 "KEY3": self.h1["KEY3"], 

177 "KEY4": self.h1["KEY4"], 

178 "KEY5": self.h2["KEY5"], 

179 "KEY6": self.h4["KEY6"], 

180 } 

181 

182 self.assertEqual(merged, expected) 

183 

184 def test_merging_drop(self) -> None: 

185 merged = merge_headers([self.h1, self.h2, self.h3, self.h4], mode="drop") 

186 

187 expected = { 

188 "KEY3": self.h1["KEY3"], 

189 "KEY4": self.h1["KEY4"], 

190 "KEY5": self.h2["KEY5"], 

191 "KEY6": self.h4["KEY6"], 

192 } 

193 

194 self.assertEqual(merged, expected) 

195 

196 # Sorting the headers should make no difference to drop mode 

197 merged = merge_headers([self.h1, self.h2, self.h3, self.h4], mode="drop", sort=True) 

198 self.assertEqual(merged, expected) 

199 

200 # Now retain some headers 

201 merged = merge_headers( 

202 [self.h1, self.h2, self.h3, self.h4], 

203 mode="drop", 

204 sort=False, 

205 first=["ORIGIN"], 

206 last=["KEY2", "KEY1"], 

207 ) 

208 

209 expected = { 

210 "KEY2": self.h3["KEY2"], 

211 "ORIGIN": self.h1["ORIGIN"], 

212 "KEY1": self.h4["KEY1"], 

213 "KEY3": self.h1["KEY3"], 

214 "KEY4": self.h1["KEY4"], 

215 "KEY5": self.h2["KEY5"], 

216 "KEY6": self.h4["KEY6"], 

217 } 

218 self.assertEqual(merged, expected) 

219 

220 # Now retain some headers with sorting 

221 merged = merge_headers( 

222 [self.h1, self.h2, self.h3, self.h4], 

223 mode="drop", 

224 sort=True, 

225 first=["ORIGIN"], 

226 last=["KEY2", "KEY1"], 

227 ) 

228 

229 expected = { 

230 "KEY2": self.h3["KEY2"], 

231 "ORIGIN": self.h2["ORIGIN"], 

232 "KEY1": self.h4["KEY1"], 

233 "KEY3": self.h1["KEY3"], 

234 "KEY4": self.h1["KEY4"], 

235 "KEY5": self.h2["KEY5"], 

236 "KEY6": self.h4["KEY6"], 

237 } 

238 self.assertEqual(merged, expected) 

239 

240 def test_merging_diff(self) -> None: 

241 self.maxDiff = None 

242 

243 # Nothing in common for diff 

244 merged = merge_headers([self.h1, self.h2, self.h3, self.h4], mode="diff") 

245 

246 expected = {"__DIFF__": [self.h1, self.h2, self.h3, self.h4]} 

247 

248 self.assertEqual(merged, expected) 

249 

250 # Now with a subset that does have overlap 

251 merged = merge_headers([self.h1, self.h2], mode="diff") 

252 expected = { 

253 "ORIGIN": "LSST", 

254 "__DIFF__": [ 

255 {k: self.h1[k] for k in ("KEY0", "KEY1", "KEY2", "KEY3", "KEY4", "MJD-OBS")}, 

256 {k: self.h2[k] for k in ("KEY0", "KEY2", "KEY5", "MJD-OBS")}, 

257 ], 

258 } 

259 self.assertEqual(merged, expected) 

260 

261 # Reverse to make sure there is nothing special about the first header 

262 merged = merge_headers([self.h2, self.h1], mode="diff") 

263 expected = { 

264 "ORIGIN": "LSST", 

265 "__DIFF__": [ 

266 {k: self.h2[k] for k in ("KEY0", "KEY2", "KEY5", "MJD-OBS")}, 

267 {k: self.h1[k] for k in ("KEY0", "KEY1", "KEY2", "KEY3", "KEY4", "MJD-OBS")}, 

268 ], 

269 } 

270 self.assertEqual(merged, expected) 

271 

272 # Check that identical headers have empty diff 

273 merged = merge_headers([self.h1, self.h1], mode="diff") 

274 expected = { 

275 **self.h1, 

276 "__DIFF__": [ 

277 {}, 

278 {}, 

279 ], 

280 } 

281 self.assertEqual(merged, expected) 

282 

283 def test_merging_append(self) -> None: 

284 # Try with two headers first 

285 merged = merge_headers([self.h1, self.h2], mode="append") 

286 

287 expected = { 

288 "MJD-OBS": [self.h1["MJD-OBS"], self.h2["MJD-OBS"]], 

289 "ORIGIN": self.h1["ORIGIN"], 

290 "KEY0": [self.h1["KEY0"], self.h2["KEY0"]], 

291 "KEY1": self.h1["KEY1"], 

292 "KEY2": [self.h1["KEY2"], self.h2["KEY2"]], 

293 "KEY3": self.h1["KEY3"], 

294 "KEY4": self.h1["KEY4"], 

295 "KEY5": self.h2["KEY5"], 

296 } 

297 

298 self.assertEqual(merged, expected) 

299 

300 merged = merge_headers([self.h1, self.h2, self.h3, self.h4], mode="append") 

301 

302 expected = { 

303 "MJD-OBS": [self.h1["MJD-OBS"], self.h2["MJD-OBS"], self.h3["MJD-OBS"], self.h4["MJD-OBS"]], 

304 "ORIGIN": [self.h1["ORIGIN"], self.h2["ORIGIN"], self.h3["ORIGIN"], None], 

305 "KEY0": [self.h1["KEY0"], self.h2["KEY0"], None, None], 

306 "KEY1": [self.h1["KEY1"], None, None, self.h4["KEY1"]], 

307 "KEY2": [self.h1["KEY2"], self.h2["KEY2"], self.h3["KEY2"], None], 

308 "KEY3": self.h3["KEY3"], 

309 "KEY4": self.h1["KEY4"], 

310 "KEY5": self.h3["KEY5"], 

311 "KEY6": self.h4["KEY6"], 

312 } 

313 

314 self.assertEqual(merged, expected) 

315 

316 def test_merging_overwrite_sort(self) -> None: 

317 merged = merge_headers([self.h1, self.h2], mode="overwrite", sort=True) 

318 

319 expected = { 

320 "MJD-OBS": self.h1["MJD-OBS"], 

321 "ORIGIN": self.h1["ORIGIN"], 

322 "KEY0": self.h1["KEY0"], 

323 "KEY1": self.h1["KEY1"], 

324 "KEY2": self.h1["KEY2"], 

325 "KEY3": self.h1["KEY3"], 

326 "KEY4": self.h1["KEY4"], 

327 "KEY5": self.h2["KEY5"], 

328 } 

329 self.assertEqual(merged, expected) 

330 

331 merged = merge_headers([self.h1, self.h2, self.h3, self.h4], mode="overwrite", sort=True) 

332 

333 expected = { 

334 "MJD-OBS": self.h3["MJD-OBS"], 

335 "ORIGIN": self.h3["ORIGIN"], 

336 "KEY0": self.h1["KEY0"], 

337 "KEY1": self.h4["KEY1"], 

338 "KEY2": self.h3["KEY2"], 

339 "KEY3": self.h3["KEY3"], 

340 "KEY4": self.h1["KEY4"], 

341 "KEY5": self.h3["KEY5"], 

342 "KEY6": self.h4["KEY6"], 

343 } 

344 

345 self.assertEqual(merged, expected) 

346 

347 # Changing the order should not change the result 

348 merged = merge_headers([self.h4, self.h1, self.h3, self.h2], mode="overwrite", sort=True) 

349 

350 self.assertEqual(merged, expected) 

351 

352 def test_merging_first_sort(self) -> None: 

353 merged = merge_headers([self.h1, self.h2, self.h3, self.h4], mode="first", sort=True) 

354 

355 expected = { 

356 "MJD-OBS": self.h2["MJD-OBS"], 

357 "ORIGIN": self.h2["ORIGIN"], 

358 "KEY0": self.h2["KEY0"], 

359 "KEY1": self.h1["KEY1"], 

360 "KEY2": self.h2["KEY2"], 

361 "KEY3": self.h1["KEY3"], 

362 "KEY4": self.h1["KEY4"], 

363 "KEY5": self.h2["KEY5"], 

364 "KEY6": self.h4["KEY6"], 

365 } 

366 

367 self.assertEqual(merged, expected) 

368 

369 def test_merging_append_sort(self) -> None: 

370 # Try with two headers first 

371 merged = merge_headers([self.h1, self.h2], mode="append", sort=True) 

372 

373 expected = { 

374 "MJD-OBS": [self.h2["MJD-OBS"], self.h1["MJD-OBS"]], 

375 "ORIGIN": self.h1["ORIGIN"], 

376 "KEY0": [self.h2["KEY0"], self.h1["KEY0"]], 

377 "KEY1": self.h1["KEY1"], 

378 "KEY2": [self.h2["KEY2"], self.h1["KEY2"]], 

379 "KEY3": self.h1["KEY3"], 

380 "KEY4": self.h1["KEY4"], 

381 "KEY5": self.h2["KEY5"], 

382 } 

383 

384 self.assertEqual(merged, expected) 

385 

386 merged = merge_headers([self.h1, self.h2, self.h3, self.h4], mode="append", sort=True) 

387 

388 expected = { 

389 "MJD-OBS": [self.h2["MJD-OBS"], self.h1["MJD-OBS"], self.h4["MJD-OBS"], self.h3["MJD-OBS"]], 

390 "ORIGIN": [self.h2["ORIGIN"], self.h1["ORIGIN"], None, self.h3["ORIGIN"]], 

391 "KEY0": [self.h2["KEY0"], self.h1["KEY0"], None, None], 

392 "KEY1": [None, self.h1["KEY1"], self.h4["KEY1"], None], 

393 "KEY2": [self.h2["KEY2"], self.h1["KEY2"], None, self.h3["KEY2"]], 

394 "KEY3": self.h3["KEY3"], 

395 "KEY4": self.h1["KEY4"], 

396 "KEY5": self.h3["KEY5"], 

397 "KEY6": self.h4["KEY6"], 

398 } 

399 

400 self.assertEqual(merged, expected) 

401 

402 # Order should not matter 

403 merged = merge_headers([self.h4, self.h3, self.h2, self.h1], mode="append", sort=True) 

404 self.assertEqual(merged, expected) 

405 

406 def test_stripped_header_after_mutation(self) -> None: 

407 """Check stripping tolerates changes to the original header.""" 

408 header = read_test_file("fitsheader-hsc.yaml", dir=os.path.join(TESTDIR, "data")) 

409 info = ObservationInfo(header, pedantic=False) 

410 used = info.cards_used 

411 self.assertTrue(used) 

412 

413 # Mutate original header after translation. 

414 key = next(iter(used)) 

415 header.pop(key, None) 

416 

417 # Should not raise if cards_used include keys no longer present. 

418 stripped = info.stripped_header() 

419 self.assertNotIn(key, stripped) 

420 

421 

422class FixHeadersTestCase(unittest.TestCase): 

423 """Test header fix up.""" 

424 

425 def test_basic_fix_header(self) -> None: 

426 """Test that a header can be fixed if we specify a local path.""" 

427 header = read_test_file("fitsheader-decam-0160496.yaml", dir=os.path.join(TESTDIR, "data")) 

428 self.assertEqual(header["DETECTOR"], "S3-111_107419-8-3") 

429 

430 # First fix header but using no search path (should work as no-op) 

431 fixed = fix_header(copy.copy(header), translator_class=NullDecamTranslator) 

432 self.assertFalse(fixed) 

433 

434 # Now using the test corrections directory 

435 header2 = copy.copy(header) 

436 fixed = fix_header( 

437 header2, 

438 search_path=os.path.join(TESTDIR, "data", "corrections"), 

439 translator_class=NullDecamTranslator, 

440 ) 

441 self.assertTrue(fixed) 

442 self.assertEqual(header2["DETECTOR"], "NEW-ID") 

443 

444 # Now with a corrections directory that has bad YAML in it 

445 header2 = copy.copy(header) 

446 with self.assertLogs(level="WARN"): 

447 fixed = fix_header( 

448 header2, 

449 search_path=os.path.join(TESTDIR, "data", "bad_corrections"), 

450 translator_class=NullDecamTranslator, 

451 ) 

452 self.assertFalse(fixed) 

453 

454 # Test that fix_header of unknown header is allowed 

455 header = {"SOMETHING": "UNKNOWN"} 

456 fixed = fix_header(copy.copy(header), translator_class=NullDecamTranslator) 

457 self.assertFalse(fixed) 

458 

459 def test_hsc_fix_header(self) -> None: 

460 """Check that one of the known HSC corrections is being applied 

461 properly. 

462 """ 

463 header = {"EXP-ID": "HSCA00120800", "INSTRUME": "HSC", "DATA-TYP": "FLAT"} 

464 

465 fixed = fix_header(header, translator_class=HscTranslator) 

466 self.assertTrue(fixed) 

467 self.assertEqual(header["DATA-TYP"], "OBJECT") 

468 

469 # Check provenance 

470 self.assertIn("HSC-HSCA00120800.yaml", header["HIERARCH ASTRO METADATA FIX FILE"]) 

471 

472 # And that this header won't be corrected 

473 header = {"EXP-ID": "HSCA00120800X", "INSTRUME": "HSC", "DATA-TYP": "FLAT"} 

474 

475 fixed = fix_header(header, translator_class=HscTranslator) 

476 self.assertFalse(fixed) 

477 self.assertEqual(header["DATA-TYP"], "FLAT") 

478 

479 def test_decam_fix_header(self) -> None: 

480 """Check that one of the known DECam corrections is being applied 

481 properly. 

482 """ 

483 # This header is a bias (zero) with an erroneous Y filter 

484 header = read_test_file("fitsheader-decam-0160496.yaml", dir=os.path.join(TESTDIR, "data")) 

485 fixed = fix_header(header, translator_class=DecamTranslator) 

486 self.assertTrue(fixed) 

487 self.assertEqual(header["FILTER"], "solid plate 0.0 0.0") 

488 

489 def test_translator_fix_header(self) -> None: 

490 """Check that translator classes can fix headers.""" 

491 # Read in a known header 

492 header = read_test_file("fitsheader-decam-0160496.yaml", dir=os.path.join(TESTDIR, "data")) 

493 self.assertEqual(header["DTSITE"], "ct") 

494 

495 header2 = copy.copy(header) 

496 fixed = fix_header(header2, translator_class=NotDecamTranslator) 

497 self.assertTrue(fixed) 

498 self.assertEqual(header2["DTSITE"], "hi") 

499 

500 header2 = copy.copy(header) 

501 header2["DTSITE"] = "reset" 

502 with self.assertLogs("astro_metadata_translator", level="FATAL"): 

503 fixed = fix_header(header2, translator_class=AlsoNotDecamTranslator) 

504 self.assertFalse(fixed) 

505 self.assertEqual(header2["DTSITE"], "reset") 

506 

507 def test_no_double_fix(self) -> None: 

508 """Check that header fixup only happens once.""" 

509 # Read in a known header 

510 header = read_test_file("fitsheader-decam-0160496.yaml", dir=os.path.join(TESTDIR, "data")) 

511 self.assertEqual(header["DTSITE"], "ct") 

512 

513 # First time it will modifiy DTSITE 

514 fixed = fix_header(header, translator_class=NotDecamTranslator2) 

515 self.assertTrue(fixed) 

516 self.assertEqual(header["DTSITE"], "cthi") 

517 

518 # Get the fix up date 

519 date = header["HIERARCH ASTRO METADATA FIX DATE"] 

520 

521 # Second time it will do nothing but still report it was fixed 

522 fixed = fix_header(header, translator_class=NotDecamTranslator2) 

523 self.assertTrue(fixed) 

524 self.assertEqual(header["DTSITE"], "cthi") 

525 

526 # Date of fixup should be the same 

527 self.assertEqual(header["HIERARCH ASTRO METADATA FIX DATE"], date) 

528 

529 # Test the translator version in provenance 

530 self.assertEqual(header["HIERARCH ASTRO METADATA FIX VERSION"], "1.0.0") 

531 

532 def test_bad_file(self) -> None: 

533 """Test that we get a log message if no translator can be 

534 determined. 

535 """ 

536 bad_file = os.path.join(TESTDIR, "data", "corrections", "SCUBA_test-20000101_00002.yaml") 

537 md = read_basic_metadata_from_file(bad_file, 0) 

538 assert md is not None # for mypy. 

539 with self.assertLogs(level="DEBUG") as cm: 

540 result = fix_header(md) 

541 self.assertFalse(result) 

542 self.assertIn("Unable to determine translator class", "\n".join(cm.output)) 

543 

544 

545if __name__ == "__main__": 

546 unittest.main()