Coverage for python / astro_metadata_translator / observationGroup.py: 36%

111 statements  

« prev     ^ index     » next       coverage.py v7.13.5, created at 2026-04-14 23:38 +0000

1# This file is part of astro_metadata_translator. 

2# 

3# Developed for the LSST Data Management System. 

4# This product includes software developed by the LSST Project 

5# (http://www.lsst.org). 

6# See the LICENSE file at the top-level directory of this distribution 

7# for details of code ownership. 

8# 

9# Use of this source code is governed by a 3-clause BSD-style 

10# license that can be found in the LICENSE file. 

11 

12"""Represent a collection of translated headers.""" 

13 

14from __future__ import annotations 

15 

16__all__ = ("ObservationGroup",) 

17 

18import logging 

19from collections.abc import Callable, Iterable, MutableMapping, MutableSequence, Sequence 

20from itertools import zip_longest 

21from typing import TYPE_CHECKING, Any, cast, overload 

22 

23from pydantic import ConfigDict, GetCoreSchemaHandler, RootModel 

24from pydantic_core import CoreSchema, core_schema 

25 

26from .observationInfo import ObservationInfo 

27 

28if TYPE_CHECKING: 

29 from .translator import MetadataTranslator 

30 

31log = logging.getLogger(__name__) 

32 

33 

class _ObservationGroupPydanticModel(RootModel[list[ObservationInfo]]):
    """Private root model wrapping a plain list of `ObservationInfo`.

    This exists purely for Pydantic interoperability and is not part of
    the public API.
    """

    # Allow member types that Pydantic has no built-in schema for, and
    # serialize non-finite floats (inf/nan) using the JSON constants.
    model_config = ConfigDict(
        arbitrary_types_allowed=True,
        ser_json_inf_nan="constants",
    )

38 

39 

class ObservationGroup(MutableSequence[ObservationInfo]):
    """A collection of `ObservationInfo` headers.

    Parameters
    ----------
    members : iterable of `ObservationInfo` or `dict`-like
        `ObservationInfo` to seed the group membership. If `dict`-like
        values are used they will be passed to the `ObservationInfo`
        constructor.
    translator_class : `MetadataTranslator`-class, optional
        If any of the members is not an `ObservationInfo`, translator class
        to pass to the `ObservationInfo` constructor. If `None` the
        translation class will be determined automatically.
    pedantic : `bool`, optional
        If any of the members is not an `ObservationInfo`, passed to the
        `ObservationInfo` constructor to control whether
        a failed translation is fatal or not. `None` indicates that the
        `ObservationInfo` constructor default should be used.

    Notes
    -----
    The group keeps a cache of its members in default sorted order so that
    `oldest`, `newest` and `extremes` do not have to re-sort on every call.
    Every operation that changes the membership discards that cache.
    """

    def __init__(
        self,
        members: Iterable[ObservationInfo | MutableMapping[str, Any]],
        translator_class: type[MetadataTranslator] | None = None,
        pedantic: bool | None = None,
    ) -> None:
        self._members = [
            self._coerce_value(m, translator_class=translator_class, pedantic=pedantic) for m in members
        ]
        # Lazily populated cache of the members in default sorted order.
        self._sorted: list[ObservationInfo] | None = None

    def __len__(self) -> int:
        return len(self._members)

    def __delitem__(self, index: int | slice) -> None:
        del self._members[index]
        # Membership changed so the sorted cache can no longer be trusted.
        self._sorted = None

    @overload
    def __getitem__(self, index: int) -> ObservationInfo: ...

    @overload
    def __getitem__(self, index: slice) -> list[ObservationInfo]: ...

    def __getitem__(self, index: int | slice) -> ObservationInfo | list[ObservationInfo]:
        # A slice yields a plain list of members, not a new group.
        return self._members[index]

    def __str__(self) -> str:
        results = [f"({obs_info.instrument}, {obs_info.datetime_begin})" for obs_info in self]
        return "[" + ", ".join(results) + "]"

    def _coerce_value(
        self,
        value: object,
        translator_class: type[MetadataTranslator] | None = None,
        pedantic: bool | None = None,
    ) -> ObservationInfo:
        """Given a value, ensure it is an `ObservationInfo`.

        Parameters
        ----------
        value : `ObservationInfo` or `dict`-like
            Either an `ObservationInfo` or something that can be passed to
            an `ObservationInfo` constructor.
        translator_class : `MetadataTranslator`-class, optional
            If value is not an `ObservationInfo`, translator class to pass to
            the `ObservationInfo` constructor. If `None` the
            translation class will be determined automatically.
        pedantic : `bool`, optional
            If value is not an `ObservationInfo`, passed to the
            `ObservationInfo` constructor to control whether
            a failed translation is fatal or not. `None` indicates that the
            `ObservationInfo` constructor default should be used.

        Returns
        -------
        info : `ObservationInfo`
            The supplied value if it already was an `ObservationInfo`,
            else a new `ObservationInfo` constructed from it.

        Raises
        ------
        ValueError
            Raised if supplied value is not an `ObservationInfo` and can
            not be turned into one.
        """
        if value is None:
            raise ValueError("An ObservationGroup cannot contain 'None'")

        if isinstance(value, ObservationInfo):
            return value

        try:
            if not isinstance(value, MutableMapping):
                if not hasattr(value, "items"):
                    raise TypeError(f"Value is not dict-like: {type(value)}")
                # Anything exposing items() is copied into a real dict
                # before being handed to the constructor.
                value = cast(MutableMapping[str, Any], dict(cast(Any, value).items()))
            kwargs: dict[str, Any] = {"translator_class": translator_class}
            # Only forward pedantic when explicitly set so that the
            # ObservationInfo constructor default applies otherwise.
            if pedantic is not None:
                kwargs["pedantic"] = pedantic
            return ObservationInfo(value, **kwargs)
        except Exception as e:
            # Any failure, including the TypeError above, is reported to
            # the caller as a ValueError with the cause chained.
            raise ValueError("Could not convert value to ObservationInfo") from e

    def __eq__(self, other: Any) -> bool:
        """Check equality with another group.

        Compare equal if all the members are equal in the same order.

        Parameters
        ----------
        other : `typing.Any`
            Thing to compare with current group.

        Returns
        -------
        equal : `bool`
            `True` if both groups hold equal members in the same order.
            `NotImplemented` is returned if ``other`` is not an
            `ObservationGroup`.
        """
        if not isinstance(other, ObservationGroup):
            return NotImplemented

        # zip_longest pads the shorter group with None, so groups of
        # different length differ provided an ObservationInfo compares
        # unequal to None.
        for info1, info2 in zip_longest(self, other):
            if info1 != info2:
                return False
        return True

    @overload
    def __setitem__(self, index: int, value: ObservationInfo | MutableMapping[str, Any]) -> None: ...

    @overload
    def __setitem__(
        self, index: slice, value: Iterable[ObservationInfo | MutableMapping[str, Any]]
    ) -> None: ...

    def __setitem__(
        self,
        index: int | slice,
        value: ObservationInfo
        | MutableMapping[str, Any]
        | Iterable[ObservationInfo | MutableMapping[str, Any]],
    ) -> None:
        """Store item or items in group.

        Parameters
        ----------
        index : `int` or `slice`
            Index, or slice, to use to store the item or items.
        value : `ObservationInfo` or `~collections.abc.MutableMapping` \
                or iterable of those
            Information to store in group. For an integer index the item
            must be an `ObservationInfo` or something that can be passed to
            an `ObservationInfo` constructor. For a slice it must be an
            iterable of such items.

        Raises
        ------
        TypeError
            Raised if a single `ObservationInfo` or `dict`-like value is
            assigned to a slice.
        ValueError
            Raised if an item can not be converted to an `ObservationInfo`.
        """
        if isinstance(index, slice):
            # A mapping is iterable (over its keys) so it has to be
            # rejected explicitly rather than relying on iteration failing.
            if isinstance(value, ObservationInfo) or hasattr(value, "items"):
                raise TypeError("Can only assign an iterable to an ObservationGroup slice")
            self._members[index] = [self._coerce_value(v) for v in value]
        else:
            self._members[index] = self._coerce_value(value)
        self._sorted = None

    def insert(self, index: int, value: ObservationInfo | MutableMapping[str, Any]) -> None:
        """Insert an item into the group before the given index.

        Parameters
        ----------
        index : `int`
            Position before which the item is inserted.
        value : `ObservationInfo` or `~collections.abc.MutableMapping`
            Item to insert. Must be an `ObservationInfo` or something that
            can be passed to an `ObservationInfo` constructor.

        Raises
        ------
        ValueError
            Raised if the item can not be converted to an `ObservationInfo`.
        """
        value = self._coerce_value(value)
        self._members.insert(index, value)
        self._sorted = None

    def reverse(self) -> None:
        """Reverse the order of the members in place.

        The sorted cache does not depend on the current member order so it
        is left untouched.
        """
        self._members.reverse()

    def sort(self, key: Callable[[ObservationInfo], Any] | None = None, reverse: bool = False) -> None:
        """Sort the members of the group in place.

        Parameters
        ----------
        key : `~collections.abc.Callable`, optional
            Function of one argument used to extract a comparison key from
            each member. If `None` the members are compared directly.
        reverse : `bool`, optional
            If `True` the members are sorted in descending order.
        """
        self._members.sort(key=key, reverse=reverse)
        if key is None and not reverse and self._sorted is None:
            # Store sorted order in cache. We only cache the sorted order
            # if we are doing a default time-based sort so that newest
            # and oldest can work properly without having to resort each time.
            # We know that if the cache is populated that that is already
            # the correct answer so no need to re-copy.
            self._sorted = self._members.copy()

    def extremes(self) -> tuple[ObservationInfo, ObservationInfo]:
        """Return the oldest observation in the group and the newest.

        If there is only one member of the group, the newest and oldest
        can be the same observation.

        Returns
        -------
        oldest : `ObservationInfo`
            Oldest observation.
        newest : `ObservationInfo`
            Newest observation.

        Raises
        ------
        IndexError
            Raised if the group is empty.
        """
        if self._sorted is None:
            self._sorted = sorted(self._members)
        if not self._sorted:
            # Indexing the empty list would raise IndexError anyway; raise
            # the same exception type with a message naming the real cause.
            raise IndexError("Cannot determine extremes of an empty ObservationGroup")
        return self._sorted[0], self._sorted[-1]

    def newest(self) -> ObservationInfo:
        """Return the newest observation in the group.

        Returns
        -------
        newest : `ObservationInfo`
            The newest `ObservationInfo` in the `ObservationGroup`.

        Raises
        ------
        IndexError
            Raised if the group is empty.
        """
        return self.extremes()[1]

    def oldest(self) -> ObservationInfo:
        """Return the oldest observation in the group.

        Returns
        -------
        oldest : `ObservationInfo`
            The oldest `ObservationInfo` in the `ObservationGroup`.

        Raises
        ------
        IndexError
            Raised if the group is empty.
        """
        return self.extremes()[0]

    def property_values(self, property: str) -> set[Any]:
        """Return a set of values associated with the specified property.

        Parameters
        ----------
        property : `str`
            Property of an `ObservationInfo`.

        Returns
        -------
        values : `set`
            All the distinct values for that property within this group.
        """
        return {getattr(obs_info, property) for obs_info in self}

    def to_simple(self) -> list[MutableMapping[str, Any]]:
        """Convert the group to simplified form.

        Returns
        -------
        simple : `list` of `dict`
            Simple form is a list containing the simplified dict form of
            each `ObservationInfo`.
        """
        return [obsinfo.to_simple() for obsinfo in self]

    def model_dump_json(self, **kwargs: Any) -> str:
        """Serialize to JSON using Pydantic-compatible semantics.

        Parameters
        ----------
        **kwargs : `~typing.Any`
            Parameters passed to `pydantic.BaseModel.model_dump_json`.

        Returns
        -------
        json_data : `str`
            JSON string representing the model.
        """
        return _ObservationGroupPydanticModel(self._members).model_dump_json(**kwargs)

    @classmethod
    def model_validate_json(cls, json_data: str | bytes | bytearray, **kwargs: Any) -> ObservationGroup:
        """Deserialize from JSON using Pydantic-compatible semantics.

        Parameters
        ----------
        json_data : `str` | `bytes` | `bytearray`
            JSON representation of the model.
        **kwargs : `~typing.Any`
            Parameters passed to `pydantic.BaseModel.model_validate_json`.

        Returns
        -------
        group : `ObservationGroup`
            Model constructed from the JSON.
        """
        model = _ObservationGroupPydanticModel.model_validate_json(json_data, **kwargs)
        return cls(model.root)

    @classmethod
    def __get_pydantic_core_schema__(cls, source: Any, handler: GetCoreSchemaHandler) -> CoreSchema:
        """Describe this class to Pydantic as a list of `ObservationInfo`.

        Parameters
        ----------
        source : `typing.Any`
            Source type supplied by Pydantic. Not used.
        handler : `pydantic.GetCoreSchemaHandler`
            Handler used to generate the schema for `ObservationInfo`.

        Returns
        -------
        schema : `pydantic_core.CoreSchema`
            Schema that validates input against a list of `ObservationInfo`
            and then passes the result to `_validate_pydantic`, and that
            serializes through `_serialize_pydantic`.
        """
        # Integrate ObservationGroup as a custom type in Pydantic models.
        list_schema = core_schema.list_schema(handler.generate_schema(ObservationInfo))

        return core_schema.no_info_after_validator_function(
            cls._validate_pydantic,
            list_schema,
            serialization=core_schema.plain_serializer_function_ser_schema(
                cls._serialize_pydantic, return_schema=list_schema
            ),
        )

    @classmethod
    def _validate_pydantic(cls, value: Any) -> ObservationGroup:
        """Convert a validated Pydantic value into a group.

        Parameters
        ----------
        value : `typing.Any`
            Either an existing group, which is returned unchanged, or a
            `list` used to construct a new group.

        Returns
        -------
        group : `ObservationGroup`
            The resulting group.

        Raises
        ------
        TypeError
            Raised if the value is neither a group nor a `list`.
        """
        if isinstance(value, cls):
            return value
        if isinstance(value, list):
            return cls(value)
        raise TypeError(f"Unexpected type for {cls.__name__}: {type(value)}")

    @staticmethod
    def _serialize_pydantic(value: ObservationGroup) -> list[ObservationInfo]:
        """Return the members of the group for Pydantic serialization.

        Parameters
        ----------
        value : `ObservationGroup`
            Group to serialize.

        Returns
        -------
        members : `list` of `ObservationInfo`
            The internal member list of the group (not a copy).
        """
        return value._members

    @classmethod
    def from_simple(cls, simple: Sequence[MutableMapping[str, Any]]) -> ObservationGroup:
        """Convert simplified form back to `ObservationGroup`.

        Parameters
        ----------
        simple : `list` of `dict`
            Object returned by `to_simple`.

        Returns
        -------
        group : `ObservationGroup`
            Reconstructed group.
        """
        return cls(ObservationInfo.from_simple(o) for o in simple)