Coverage for tests / test_groups.py: 23%
101 statements
« prev ^ index » next coverage.py v7.13.5, created at 2026-04-30 08:43 +0000
« prev ^ index » next coverage.py v7.13.5, created at 2026-04-30 08:43 +0000
1# This file is part of astro_metadata_translator.
2#
3# Developed for the LSST Data Management System.
4# This product includes software developed by the LSST Project
5# (http://www.lsst.org).
6# See the LICENSE file at the top-level directory of this distribution
7# for details of code ownership.
8#
9# Use of this source code is governed by a 3-clause BSD-style
10# license that can be found in the LICENSE file.
12import math
13import os.path
14import unittest
15from collections.abc import MutableMapping, Sequence
16from typing import Any
18from pydantic import BaseModel, ConfigDict
20from astro_metadata_translator import ObservationGroup, ObservationInfo
21from astro_metadata_translator.serialize import group_to_fits, info_to_fits
22from astro_metadata_translator.tests import read_test_file
24TESTDIR = os.path.abspath(os.path.dirname(__file__))
27class ModelWithObsGroup(BaseModel):
28 """Pydantic model that contains an ObservationGroup."""
30 model_config = ConfigDict(
31 ser_json_inf_nan="constants", # Pydantic does not preserve NaN support from child models.
32 )
34 obs_group: ObservationGroup
35 number: int
38class ObservationGroupTestCase(unittest.TestCase):
39 """Test grouping of observations."""
41 datadir = os.path.join(TESTDIR, "data")
43 def setUp(self) -> None:
44 self.decam_files = (
45 "fitsheader-decam.yaml",
46 "fitsheader-decam-0160496.yaml",
47 "fitsheader-decam-calexp-0412037_10.yaml",
48 )
49 self.hsc_files = ("fitsheader-hsc-HSCA04090107.yaml", "fitsheader-hsc.yaml")
51 def _files_to_headers(self, files: Sequence[str]) -> Sequence[MutableMapping[str, Any]]:
52 return [read_test_file(os.path.join(self.datadir, f)) for f in files]
54 def test_groups(self) -> None:
55 headers = self._files_to_headers(self.decam_files)
57 obs_group = ObservationGroup(headers)
58 self.assertEqual(len(obs_group), 3)
59 self.assertEqual(
60 str(obs_group),
61 "[(DECam, 2013-09-01T06:02:55.754), (DECam, 2012-12-11T22:06:32.859),"
62 " (DECam, 2015-02-20T00:47:21.127)]",
63 )
65 # Ensure that pedantic conversion is allowed.
66 obs_group = ObservationGroup(headers, pedantic=True)
68 sorted_group = ObservationGroup(sorted(obs_group))
69 self.assertIsInstance(sorted_group, ObservationGroup)
70 self.assertEqual(len(sorted_group), 3)
71 self.assertEqual(sorted_group[0], obs_group[1])
73 self.assertNotEqual(obs_group, sorted_group)
74 obs_group.sort()
75 self.assertEqual(obs_group, sorted_group)
76 obs_group.reverse()
77 self.assertEqual(obs_group[0], sorted_group[-1])
79 newest = obs_group.newest()
80 oldest = obs_group.oldest()
81 self.assertEqual(newest, sorted_group[-1])
82 self.assertEqual(oldest, sorted_group[0])
84 self.assertLess(oldest, newest)
85 self.assertGreater(newest, oldest)
87 self.assertNotEqual(oldest, obs_group)
88 self.assertNotEqual(obs_group, oldest)
90 # Add some headers and check that sorting still works
91 obs_group.extend(self._files_to_headers(self.hsc_files))
92 self.assertEqual(len(obs_group), 5)
93 self.assertEqual(obs_group.newest(), obs_group[3])
95 instruments = obs_group.property_values("instrument")
96 self.assertEqual(instruments, {"HSC", "DECam"})
98 # Check that simplified form round trips
99 self.assertEqual(ObservationGroup.from_simple(obs_group.to_simple()), obs_group)
101 # Delete some entries.
102 first = obs_group.pop()
103 self.assertIsInstance(first, ObservationInfo)
104 self.assertEqual(len(obs_group), 4)
106 # Override an entry with type coercion.
107 obs_group[0] = headers[-1]
108 self.assertIsInstance(obs_group[0], ObservationInfo)
110 with self.assertRaises(ValueError):
111 # None is not allowed here.
112 ObservationGroup([None, *headers])
114 with self.assertRaises(ValueError):
115 # Passing in a bad translator class.
116 ObservationGroup(headers, translator_class=[])
118 def test_sort_key(self) -> None:
119 """Sorting with keys."""
120 headers = self._files_to_headers(self.decam_files)
122 obs_group = ObservationGroup(headers)
124 # Force sorting.
125 obs_group.sort()
126 self.assertIsNotNone(obs_group._sorted)
128 def by_airmass(value: ObservationInfo) -> float:
129 am = value.boresight_airmass
130 if am is None:
131 return 0.0
132 return am
134 obs_group.sort(key=by_airmass, reverse=True)
135 self.assertEqual(obs_group[0].boresight_airmass, 1.67)
137 def test_fits_group(self) -> None:
138 headers = self._files_to_headers(self.decam_files)
140 obs_group = ObservationGroup(headers)
141 cards, comments = group_to_fits(obs_group)
143 expected = {
144 "INSTRUME": "DECam",
145 "TIMESYS": "TAI",
146 "DATE-OBS": "2012-12-11T22:07:07.859",
147 "MJD-OBS": 56272.92161874134,
148 "DATE-BEG": "2012-12-11T22:07:07.859",
149 "MJD-BEG": 56272.92161874134,
150 "DATE-END": "2015-02-20T00:50:11.000",
151 "MJD-END": 57073.034849537034,
152 "DATE-AVG": "2014-01-15T23:28:39.430",
153 "MJD-AVG": 56672.97823413919,
154 }
155 self.assertEqual(cards, expected)
157 def test_fits_info(self) -> None:
158 header = self._files_to_headers(self.decam_files)[0]
159 obs_group = ObservationGroup([header])
160 cards, comments = info_to_fits(obs_group[0])
162 expected = {
163 "INSTRUME": "DECam",
164 "TIMESYS": "TAI",
165 "MJD-AVG": 56536.25417681625,
166 "MJD-END": 56536.25591435185,
167 "MJD-OBS": 56536.25243928065,
168 "MJD-BEG": 56536.25243928065,
169 "DATE-OBS": "2013-09-01T06:03:30.754",
170 "DATE-BEG": "2013-09-01T06:03:30.754",
171 "DATE-AVG": "2013-09-01T06:06:00.877",
172 "DATE-END": "2013-09-01T06:08:31.000",
173 }
174 self.assertEqual(cards, expected)
176 def test_pydantic_roundtrip(self) -> None:
177 """Test that ObservationGroup can round trip using Pydantic APIs."""
178 headers = self._files_to_headers(self.decam_files)
180 obsinfo = ObservationInfo(detector_name="detA", boresight_airmass=math.nan)
181 headers.append(obsinfo)
183 obs_group = ObservationGroup(headers)
184 j_str = obs_group.model_dump_json()
185 new_group = ObservationGroup.model_validate_json(j_str)
186 self.assertEqual(new_group, obs_group)
187 self.assertTrue(math.isnan(new_group[-1].boresight_airmass))
189 # Now try a model containing a group.
190 test_model = ModelWithObsGroup(obs_group=new_group, number=42)
191 j_str = test_model.model_dump_json()
192 new_model = ModelWithObsGroup.model_validate_json(j_str)
193 self.assertEqual(new_model, test_model)
194 self.assertTrue(math.isnan(new_model.obs_group[-1].boresight_airmass))
196 def test_iteration_semantics(self) -> None:
197 """Test that iterating the group yields ObservationInfo members."""
198 headers = self._files_to_headers(self.decam_files)
199 obs_group = ObservationGroup(headers)
200 self.assertEqual(list(obs_group), obs_group._members)
203if __name__ == "__main__":
204 unittest.main()