Coverage for python / astro_metadata_translator / observationGroup.py: 36%
111 statements
« prev ^ index » next coverage.py v7.13.5, created at 2026-04-26 08:50 +0000
« prev ^ index » next coverage.py v7.13.5, created at 2026-04-26 08:50 +0000
1# This file is part of astro_metadata_translator.
2#
3# Developed for the LSST Data Management System.
4# This product includes software developed by the LSST Project
5# (http://www.lsst.org).
6# See the LICENSE file at the top-level directory of this distribution
7# for details of code ownership.
8#
9# Use of this source code is governed by a 3-clause BSD-style
10# license that can be found in the LICENSE file.
12"""Represent a collection of translated headers."""
14from __future__ import annotations
16__all__ = ("ObservationGroup",)
18import logging
19from collections.abc import Callable, Iterable, MutableMapping, MutableSequence, Sequence
20from itertools import zip_longest
21from typing import TYPE_CHECKING, Any, cast, overload
23from pydantic import ConfigDict, GetCoreSchemaHandler, RootModel
24from pydantic_core import CoreSchema, core_schema
26from .observationInfo import ObservationInfo
28if TYPE_CHECKING:
29 from .translator import MetadataTranslator
31log = logging.getLogger(__name__)
class _ObservationGroupPydanticModel(RootModel[list[ObservationInfo]]):
    """Root model wrapping a plain list of `ObservationInfo`.

    Private helper that lets `ObservationGroup` delegate JSON
    serialization and validation to Pydantic.
    """

    # Non-finite floats are written to JSON using the "constants" mode
    # rather than Pydantic's default handling.
    model_config = ConfigDict(ser_json_inf_nan="constants", arbitrary_types_allowed=True)
class ObservationGroup(MutableSequence[ObservationInfo]):
    """A collection of `ObservationInfo` headers.

    Parameters
    ----------
    members : iterable of `ObservationInfo` or `dict`-like
        `ObservationInfo` to seed the group membership. If `dict`-like
        values are used they will be passed to the `ObservationInfo`
        constructor.
    translator_class : `MetadataTranslator`-class, optional
        If any of the members is not an `ObservationInfo`, translator class
        to pass to the `ObservationInfo` constructor. If `None` the
        translation class will be determined automatically.
    pedantic : `bool`, optional
        If any of the members is not an `ObservationInfo`, passed to the
        `ObservationInfo` constructor to control whether
        a failed translation is fatal or not. `None` indicates that the
        `ObservationInfo` constructor default should be used.
    """

    def __init__(
        self,
        members: Iterable[ObservationInfo | MutableMapping[str, Any]],
        translator_class: type[MetadataTranslator] | None = None,
        pedantic: bool | None = None,
    ) -> None:
        self._members = [
            self._coerce_value(m, translator_class=translator_class, pedantic=pedantic) for m in members
        ]
        # Cache of the members in default sort order. `None` means the cache
        # is stale and must be rebuilt before use.
        self._sorted: list[ObservationInfo] | None = None

    def __len__(self) -> int:
        return len(self._members)

    def __delitem__(self, index: int | slice) -> None:
        del self._members[index]
        # Membership changed so the sorted cache is no longer valid.
        self._sorted = None

    @overload
    def __getitem__(self, index: int) -> ObservationInfo: ...

    @overload
    def __getitem__(self, index: slice) -> list[ObservationInfo]: ...

    def __getitem__(self, index: int | slice) -> ObservationInfo | list[ObservationInfo]:
        # A slice returns a plain list of members, not a new group.
        return self._members[index]

    def __str__(self) -> str:
        summaries = ", ".join(f"({obs_info.instrument}, {obs_info.datetime_begin})" for obs_info in self)
        return "[" + summaries + "]"

    def _coerce_value(
        self,
        value: object,
        translator_class: type[MetadataTranslator] | None = None,
        pedantic: bool | None = None,
    ) -> ObservationInfo:
        """Given a value, ensure it is an `ObservationInfo`.

        Parameters
        ----------
        value : `ObservationInfo` or `dict`-like
            Either an `ObservationInfo` or something that can be passed to
            an `ObservationInfo` constructor.
        translator_class : `MetadataTranslator`-class, optional
            If value is not an `ObservationInfo`, translator class to pass to
            the `ObservationInfo` constructor. If `None` the
            translation class will be determined automatically.
        pedantic : `bool`, optional
            If value is not an `ObservationInfo`, passed to the
            `ObservationInfo` constructor to control whether
            a failed translation is fatal or not. `None` indicates that the
            `ObservationInfo` constructor default should be used.

        Returns
        -------
        info : `ObservationInfo`
            The supplied value if it was already an `ObservationInfo`, else
            a new `ObservationInfo` constructed from it.

        Raises
        ------
        ValueError
            Raised if supplied value is not an `ObservationInfo` and can
            not be turned into one.
        """
        if value is None:
            raise ValueError("An ObservationGroup cannot contain 'None'")

        if not isinstance(value, ObservationInfo):
            # Any failure during conversion, including the TypeError raised
            # below for non dict-like input, is reported as a ValueError
            # chained to the original exception.
            try:
                if not isinstance(value, MutableMapping):
                    if hasattr(value, "items"):
                        # Mapping-like but not a MutableMapping: copy the
                        # items into a real dict.
                        value = cast(MutableMapping[str, Any], dict(cast(Any, value).items()))
                    else:
                        raise TypeError(f"Value is not dict-like: {type(value)}")
                kwargs: dict[str, Any] = {"translator_class": translator_class}
                # Only forward pedantic when explicitly set so that the
                # constructor default applies otherwise.
                if pedantic is not None:
                    kwargs["pedantic"] = pedantic
                value = ObservationInfo(value, **kwargs)
            except Exception as e:
                raise ValueError("Could not convert value to ObservationInfo") from e

        return value

    def __eq__(self, other: Any) -> bool:
        """Check equality with another group.

        Compare equal if all the members are equal in the same order.

        Parameters
        ----------
        other : `typing.Any`
            Thing to compare with current group.

        Returns
        -------
        equal : `bool`
            `True` if both groups hold equal members in the same order.
            `NotImplemented` is returned if ``other`` is not an
            `ObservationGroup`.
        """
        if not isinstance(other, ObservationGroup):
            return NotImplemented

        # zip_longest pads the shorter group with None, which never matches
        # a member, so groups of differing length compare unequal.
        return not any(info1 != info2 for info1, info2 in zip_longest(self, other))

    @overload
    def __setitem__(self, index: int, value: ObservationInfo | MutableMapping[str, Any]) -> None: ...

    @overload
    def __setitem__(
        self, index: slice, value: Iterable[ObservationInfo | MutableMapping[str, Any]]
    ) -> None: ...

    def __setitem__(
        self,
        index: int | slice,
        value: ObservationInfo
        | MutableMapping[str, Any]
        | Iterable[ObservationInfo | MutableMapping[str, Any]],
    ) -> None:
        """Store item in group.

        Parameters
        ----------
        index : `int` or `slice`
            Index to use to store the item, or slice to replace with
            multiple items.
        value : `ObservationInfo` or `~collections.abc.MutableMapping` \
                or iterable of these
            Information to store in group. For an integer index the item
            must be an `ObservationInfo` or something that can be passed to
            an `ObservationInfo` constructor. For a slice it must be an
            iterable of such items.

        Raises
        ------
        TypeError
            Raised if a single `ObservationInfo` or `dict`-like value is
            assigned to a slice.
        ValueError
            Raised if an item can not be converted to an `ObservationInfo`.
        """
        if isinstance(index, slice):
            # A lone ObservationInfo or mapping is itself iterable-looking,
            # so reject it explicitly rather than iterating over its keys.
            if isinstance(value, ObservationInfo) or hasattr(value, "items"):
                raise TypeError("Can only assign an iterable to an ObservationGroup slice")
            self._members[index] = [self._coerce_value(v) for v in value]
        else:
            self._members[index] = self._coerce_value(value)
        self._sorted = None

    def insert(self, index: int, value: ObservationInfo | MutableMapping[str, Any]) -> None:
        """Insert an item into the group before the given index.

        Parameters
        ----------
        index : `int`
            Position before which the item is inserted.
        value : `ObservationInfo` or `~collections.abc.MutableMapping`
            Item to insert. Converted to an `ObservationInfo` if necessary.

        Raises
        ------
        ValueError
            Raised if the item can not be converted to an `ObservationInfo`.
        """
        value = self._coerce_value(value)
        self._members.insert(index, value)
        self._sorted = None

    def reverse(self) -> None:
        """Reverse the order of the members in place."""
        # The sorted cache is independent of member order so it stays valid.
        self._members.reverse()

    def sort(self, key: Callable | None = None, reverse: bool = False) -> None:
        """Sort the members of the group in place.

        Parameters
        ----------
        key : `~collections.abc.Callable`, optional
            Function used to extract a comparison key from each member, as
            for `list.sort`. If `None` the members are compared directly.
        reverse : `bool`, optional
            If `True` sort in descending order.
        """
        self._members.sort(key=key, reverse=reverse)
        if key is None and not reverse and self._sorted is None:
            # Store sorted order in cache. We only cache the sorted order
            # if we are doing a default time-based sort so that newest
            # and oldest can work properly without having to resort each time.
            # We know that if the cache is populated that that is already
            # the correct answer so no need to re-copy.
            self._sorted = self._members.copy()

    def extremes(self) -> tuple[ObservationInfo, ObservationInfo]:
        """Return the oldest observation in the group and the newest.

        If there is only one member of the group, the newest and oldest
        can be the same observation.

        Returns
        -------
        oldest : `ObservationInfo`
            Oldest observation.
        newest : `ObservationInfo`
            Newest observation.

        Raises
        ------
        IndexError
            Raised if the group is empty.
        """
        if not self._members:
            # Fail with a meaningful message rather than the bare
            # "list index out of range" from indexing an empty cache.
            raise IndexError("Cannot determine extremes of an empty ObservationGroup")
        if self._sorted is None:
            self._sorted = sorted(self._members)
        return self._sorted[0], self._sorted[-1]

    def newest(self) -> ObservationInfo:
        """Return the newest observation in the group.

        Returns
        -------
        newest : `ObservationInfo`
            The newest `ObservationInfo` in the `ObservationGroup`.

        Raises
        ------
        IndexError
            Raised if the group is empty.
        """
        return self.extremes()[1]

    def oldest(self) -> ObservationInfo:
        """Return the oldest observation in the group.

        Returns
        -------
        oldest : `ObservationInfo`
            The oldest `ObservationInfo` in the `ObservationGroup`.

        Raises
        ------
        IndexError
            Raised if the group is empty.
        """
        return self.extremes()[0]

    def property_values(self, property: str) -> set[Any]:
        """Return a set of values associated with the specified property.

        Parameters
        ----------
        property : `str`
            Property of an `ObservationInfo`.

        Returns
        -------
        values : `set`
            All the distinct values for that property within this group.
        """
        return {getattr(obs_info, property) for obs_info in self}

    def to_simple(self) -> list[MutableMapping[str, Any]]:
        """Convert the group to simplified form.

        Returns
        -------
        simple : `list` of `dict`
            Simple form is a list containing the simplified dict form of
            each `ObservationInfo`.
        """
        return [obsinfo.to_simple() for obsinfo in self]

    def model_dump_json(self, **kwargs: Any) -> str:
        """Serialize to JSON using Pydantic-compatible semantics.

        Parameters
        ----------
        **kwargs : `~typing.Any`
            Parameters passed to `pydantic.BaseModel.model_dump_json`.

        Returns
        -------
        json_data : `str`
            JSON string representing the model.
        """
        return _ObservationGroupPydanticModel(self._members).model_dump_json(**kwargs)

    @classmethod
    def model_validate_json(cls, json_data: str | bytes | bytearray, **kwargs: Any) -> ObservationGroup:
        """Deserialize from JSON using Pydantic-compatible semantics.

        Parameters
        ----------
        json_data : `str` | `bytes` | `bytearray`
            JSON representation of the model.
        **kwargs : `~typing.Any`
            Parameters passed to `pydantic.BaseModel.model_validate_json`.

        Returns
        -------
        group : `ObservationGroup`
            Model constructed from the JSON.
        """
        model = _ObservationGroupPydanticModel.model_validate_json(json_data, **kwargs)
        return cls(model.root)

    @classmethod
    def __get_pydantic_core_schema__(cls, source: Any, handler: GetCoreSchemaHandler) -> CoreSchema:
        """Return the Pydantic core schema for this class.

        The group is validated as a list of `ObservationInfo` that is then
        converted to an `ObservationGroup`, and serialized back to a list
        of `ObservationInfo`.

        Parameters
        ----------
        source : `typing.Any`
            Source type supplied by Pydantic. Unused.
        handler : `pydantic.GetCoreSchemaHandler`
            Handler used to generate the schema for `ObservationInfo`.

        Returns
        -------
        schema : `pydantic_core.CoreSchema`
            Schema describing validation and serialization of the group.
        """
        # Integrate ObservationGroup as a custom type in Pydantic models.
        list_schema = core_schema.list_schema(handler.generate_schema(ObservationInfo))

        return core_schema.no_info_after_validator_function(
            cls._validate_pydantic,
            list_schema,
            serialization=core_schema.plain_serializer_function_ser_schema(
                cls._serialize_pydantic, return_schema=list_schema
            ),
        )

    @classmethod
    def _validate_pydantic(cls, value: Any) -> ObservationGroup:
        """Convert a value validated by Pydantic into an `ObservationGroup`.

        Parameters
        ----------
        value : `typing.Any`
            Either an existing group, returned unchanged, or a `list` used
            to construct a new group.

        Returns
        -------
        group : `ObservationGroup`
            The resulting group.

        Raises
        ------
        TypeError
            Raised if the value is neither a group nor a `list`.
        """
        if isinstance(value, cls):
            return value
        if isinstance(value, list):
            return cls(value)
        raise TypeError(f"Unexpected type for {cls.__name__}: {type(value)}")

    @staticmethod
    def _serialize_pydantic(value: ObservationGroup) -> list[ObservationInfo]:
        """Return the members of the group for serialization by Pydantic.

        Parameters
        ----------
        value : `ObservationGroup`
            Group to serialize.

        Returns
        -------
        members : `list` of `ObservationInfo`
            The group's internal member list (not a copy).
        """
        return value._members

    @classmethod
    def from_simple(cls, simple: Sequence[MutableMapping[str, Any]]) -> ObservationGroup:
        """Convert simplified form back to `ObservationGroup`.

        Parameters
        ----------
        simple : `list` of `dict`
            Object returned by `to_simple`.

        Returns
        -------
        group : `ObservationGroup`
            Reconstructed group.
        """
        return cls(ObservationInfo.from_simple(o) for o in simple)