Coverage for python / astro_metadata_translator / translator.py: 44%

440 statements  

« prev     ^ index     » next       coverage.py v7.13.5, created at 2026-04-28 08:38 +0000

1# This file is part of astro_metadata_translator. 

2# 

3# Developed for the LSST Data Management System. 

4# This product includes software developed by the LSST Project 

5# (http://www.lsst.org). 

6# See the LICENSE file at the top-level directory of this distribution 

7# for details of code ownership. 

8# 

9# Use of this source code is governed by a 3-clause BSD-style 

10# license that can be found in the LICENSE file. 

11 

12"""Classes and support code for metadata translation.""" 

13 

14from __future__ import annotations 

15 

16__all__ = ("MetadataTranslator", "StubTranslator", "cache_translation") 

17 

18import functools 

19import importlib 

20import inspect 

21import logging 

22import math 

23import numbers 

24import textwrap 

25import warnings 

26from abc import abstractmethod 

27from collections.abc import Callable, Iterable, Iterator, Mapping, MutableMapping, Sequence 

28from importlib.metadata import entry_points 

29from typing import TYPE_CHECKING, Any, ClassVar, Concatenate, ParamSpec, TypeVar, cast 

30 

31import astropy.io.fits.card 

32import astropy.time 

33import astropy.units as u 

34from astropy.coordinates import Angle 

35from lsst.resources import ResourcePath 

36from lsst.utils.iteration import ensure_iterable 

37 

38from .properties import PROPERTIES, PropertyDefinition 

39 

40if TYPE_CHECKING: 

41 import astropy.coordinates 

42 from lsst.resources import ResourcePathExpression 

43 

44log = logging.getLogger(__name__) 

45 

46# Location of the root of the corrections resource files 

47CORRECTIONS_RESOURCE_ROOT = "corrections" 

48 

49"""Cache of version strings indexed by class.""" 

50_VERSION_CACHE: dict[type, str] = {} 

51 

52P = ParamSpec("P") 

53R = TypeVar("R") 

54SelfT = TypeVar("SelfT", bound="MetadataTranslator") 

55 

56 

def cache_translation(
    func: Callable[Concatenate[SelfT, P], R], method: str | None = None
) -> Callable[Concatenate[SelfT, P], R]:
    """Wrap a translation method so that its result is computed only once.

    Parameters
    ----------
    func : `~collections.abc.Callable`
        Translation method whose result should be cached.
    method : `str`, optional
        Key under which the result is stored. Defaults to the name of
        ``func``. It has to be given explicitly when the callable does not
        carry the name of the method it will become, as is the case for
        translation methods that are generated dynamically.

    Returns
    -------
    wrapped : `~collections.abc.Callable`
        Callable that consults the cache before calling ``func``.

    Notes
    -----
    Results are stored in the ``_translation_cache`` mapping of the object
    the method is called on. The key is the method name alone; call
    arguments are not part of the key, so the first computed result is
    returned for every later call. Intended for ``to_x()`` methods,
    particularly those that call many other translation methods or do
    significant computation.

    .. code-block:: python

        @cache_translation
        def to_detector_num(self):
            ....
    """
    cache_key = method if method is not None else func.__name__

    @functools.wraps(func)
    def cached_method(self: SelfT, *args: P.args, **kwargs: P.kwargs) -> R:
        # Fast path: a previous call already stored the answer.
        if cache_key in self._translation_cache:
            return cast(R, self._translation_cache[cache_key])
        self._translation_cache[cache_key] = func(self, *args, **kwargs)
        return cast(R, self._translation_cache[cache_key])

    return cached_method

97 

98 

99def _set_method_metadata(func: Callable, cls: type, method: str) -> None: 

100 target_qualname = f"{cls.__qualname__}.{method}" 

101 target_module = cls.__module__ 

102 target_name = method 

103 current = func 

104 while True: 

105 current.__name__ = target_name 

106 current.__qualname__ = target_qualname 

107 current.__module__ = target_module 

108 wrapped = getattr(current, "__wrapped__", None) 

109 if wrapped is None or wrapped is current: 

110 break 

111 current = wrapped 

112 

113 

114class MetadataTranslator: 

115 """Per-instrument metadata translation support. 

116 

117 Parameters 

118 ---------- 

119 header : `dict`-like 

120 Representation of an instrument header that can be manipulated 

121 as if it was a `dict`. 

122 filename : `str` or `~lsst.resources.ResourcePathExpression`, optional 

123 Name of the file whose header is being translated. For some 

124 datasets with missing header information this can sometimes 

125 allow for some fixups in translations. It is usually used for error 

126 reporting and logging. 

127 """ 

128 

129 # These are all deliberately empty in the base class. 

130 name: str | None = None 

131 """The declared name of the translator.""" 

132 

133 default_search_path: Sequence[str] | None = None 

134 """Default search path to use to locate header correction files.""" 

135 

136 default_resource_package = __name__.split(".")[0] 

137 """Module name to use to locate the correction resources.""" 

138 

139 default_resource_root: str | None = None 

140 """Default package resource path root to use to locate header correction 

141 files within the ``default_resource_package`` package.""" 

142 

143 _trivial_map: dict[str, str | list[str] | tuple[Any, ...]] = {} 

144 """Dict of one-to-one mappings for header translation from standard 

145 property to corresponding keyword.""" 

146 

147 _const_map: dict[str, Any] = {} 

148 """Dict defining a constant for specified standard properties.""" 

149 

150 translators: dict[str, type[MetadataTranslator]] = {} 

151 """All registered metadata translation classes.""" 

152 

153 supported_instrument: str | None = None 

154 """Name of instrument understood by this translation class.""" 

155 

156 all_properties: dict[str, PropertyDefinition] = {} 

157 """All the valid properties for this translator including extensions.""" 

158 

159 extensions: dict[str, PropertyDefinition] = {} 

160 """Extension properties (`str`: `PropertyDefinition`) 

161 

162 Some instruments have important properties beyond the standard set; this is 

163 the place to declare that they exist, and they will be treated in the same 

164 way as the standard set, except that their names will everywhere be 

165 prefixed with ``ext_``. 

166 

167 Each property is indexed by name (`str`), with a corresponding 

168 `PropertyDefinition`. 

169 """ 

170 

171 _sky_observation_types: tuple[str, ...] = ("science", "object") 

172 """Observation types that correspond to an observation where the detector 

173 can see sky photons. This is used by the default implementation of 

174 ``can_see_sky`` determination.""" 

175 

176 _non_sky_observation_types: tuple[str, ...] = ("bias", "dark") 

177 """Observation types that correspond to an observation where the detector 

178 can not see sky photons. This is used by the default implementation of 

179 ``can_see_sky`` determination.""" 

180 

181 # Static typing requires that we define the standard dynamic properties 

182 # statically. Translator methods that refer to on-sky observations can 

183 # return None if the observation is a calibration. It seems that SDSS 

184 # can sometimes fail to calculate a detector_exposure_id so we allow None 

185 # there too. That is effectively a deprecated property anyhow. The 

186 # date calculations currently allow a value to not be found to allow 

187 # subclasses to try alternative options. 

188 if TYPE_CHECKING: 

189 to_telescope: ClassVar[Callable[[MetadataTranslator], str]] 

190 to_instrument: ClassVar[Callable[[MetadataTranslator], str]] 

191 to_location: ClassVar[Callable[[MetadataTranslator], astropy.coordinates.EarthLocation]] 

192 to_exposure_id: ClassVar[Callable[[MetadataTranslator], int]] 

193 to_visit_id: ClassVar[Callable[[MetadataTranslator], int]] 

194 to_physical_filter: ClassVar[Callable[[MetadataTranslator], str]] 

195 to_datetime_begin: ClassVar[Callable[[MetadataTranslator], astropy.time.Time | None]] 

196 to_datetime_end: ClassVar[Callable[[MetadataTranslator], astropy.time.Time | None]] 

197 to_exposure_time: ClassVar[Callable[[MetadataTranslator], u.Quantity]] 

198 to_dark_time: ClassVar[Callable[[MetadataTranslator], u.Quantity]] 

199 to_boresight_airmass: ClassVar[Callable[[MetadataTranslator], float | None]] 

200 to_boresight_rotation_angle: ClassVar[Callable[[MetadataTranslator], u.Quantity | None]] 

201 to_boresight_rotation_coord: ClassVar[Callable[[MetadataTranslator], str]] 

202 to_detector_num: ClassVar[Callable[[MetadataTranslator], int]] 

203 to_detector_name: ClassVar[Callable[[MetadataTranslator], str]] 

204 to_detector_serial: ClassVar[Callable[[MetadataTranslator], str]] 

205 to_detector_group: ClassVar[Callable[[MetadataTranslator], str | None]] 

206 to_detector_exposure_id: ClassVar[Callable[[MetadataTranslator], int | None]] 

207 to_object: ClassVar[Callable[[MetadataTranslator], str]] 

208 to_temperature: ClassVar[Callable[[MetadataTranslator], u.Quantity]] 

209 to_pressure: ClassVar[Callable[[MetadataTranslator], u.Quantity]] 

210 to_relative_humidity: ClassVar[Callable[[MetadataTranslator], float]] 

211 to_tracking_radec: ClassVar[Callable[[MetadataTranslator], astropy.coordinates.SkyCoord | None]] 

212 to_altaz_begin: ClassVar[Callable[[MetadataTranslator], astropy.coordinates.AltAz | None]] 

213 to_science_program: ClassVar[Callable[[MetadataTranslator], str]] 

214 to_observation_type: ClassVar[Callable[[MetadataTranslator], str]] 

215 to_observation_id: ClassVar[Callable[[MetadataTranslator], str]] 

216 

217 @classmethod 

218 def defined_in_this_class(cls, name: str) -> bool | None: 

219 """Report if the specified class attribute is defined specifically in 

220 this class. 

221 

222 Parameters 

223 ---------- 

224 name : `str` 

225 Name of the attribute to test. 

226 

227 Returns 

228 ------- 

229 in_class : `bool` 

230 `True` if there is a attribute of that name defined in this 

231 specific subclass. 

232 `False` if the method is not defined in this specific subclass 

233 but is defined in a parent class. 

234 Returns `None` if the attribute is not defined anywhere 

235 in the class hierarchy (which can happen if translators have 

236 typos in their mapping tables). 

237 

238 Notes 

239 ----- 

240 Retrieves the attribute associated with the given name. 

241 Then looks in all the parent classes to determine whether that 

242 attribute comes from a parent class or from the current class. 

243 Attributes are compared using :py:func:`id`. 

244 """ 

245 # The attribute to compare. 

246 if not hasattr(cls, name): 

247 return None 

248 attr_id = id(getattr(cls, name)) 

249 

250 # Get all the classes in the hierarchy 

251 mro = list(inspect.getmro(cls)) 

252 

253 # Remove the first entry from the list since that will be the 

254 # current class 

255 mro.pop(0) 

256 

257 for parent in mro: 

258 # Some attributes may only exist in subclasses. Skip base classes 

259 # that are missing the attribute (such as object). 

260 if hasattr(parent, name): 

261 if id(getattr(parent, name)) == attr_id: 

262 return False 

263 return True 

264 

265 @classmethod 

266 def _make_const_mapping(cls, property_key: str, constant: Any) -> Callable: 

267 """Make a translator method that returns a constant value. 

268 

269 Parameters 

270 ---------- 

271 property_key : `str` 

272 Name of the property to be calculated (for the docstring). 

273 constant : `str` or `numbers.Number` 

274 Value to return for this translator. 

275 

276 Returns 

277 ------- 

278 f : `~collections.abc.Callable` 

279 Function returning the constant. 

280 """ 

281 

282 def constant_translator(self: MetadataTranslator) -> Any: 

283 return constant 

284 

285 if property_key in cls.all_properties: 285 ↛ 289line 285 didn't jump to line 289 because the condition on line 285 was always true

286 property_doc = cls.all_properties[property_key].doc 

287 return_type = cls.all_properties[property_key].py_type 

288 else: 

289 return_type = type(constant) 

290 property_doc = f"Returns constant value for '{property_key}' property" 

291 

292 constant_translator.__annotations__["return"] = return_type 

293 if return_type.__module__ == "builtins": 

294 full_name = return_type.__name__ 

295 else: 

296 full_name = f"{return_type.__module__}.{return_type.__qualname__}" 

297 

298 constant_translator.__doc__ = f"""{textwrap.dedent(property_doc)} 

299 

300:returns: Translated property that is fixed to a single value by the translator. 

301:rtype: `{full_name}` 

302""" 

303 return constant_translator 

304 

305 @classmethod 

306 def _make_trivial_mapping( 

307 cls, 

308 property_key: str, 

309 header_key: str | Sequence[str], 

310 default: Any | None = None, 

311 minimum: Any | None = None, 

312 maximum: Any | None = None, 

313 unit: astropy.unit.Unit | None = None, 

314 checker: Callable | None = None, 

315 ) -> Callable: 

316 """Make a translator method returning a header value. 

317 

318 The header value can be converted to a `~astropy.units.Quantity` 

319 if desired, and can also have its value validated. 

320 

321 See `MetadataTranslator.validate_value()` for details on the use 

322 of default parameters. 

323 

324 Parameters 

325 ---------- 

326 property_key : `str` 

327 Name of the translator to be constructed (for the docstring). 

328 header_key : `str` or `list` of `str` 

329 Name of the key to look up in the header. If a `list` each 

330 key will be tested in turn until one matches. This can deal with 

331 header styles that evolve over time. 

332 default : `numbers.Number` or `astropy.units.Quantity`, `str`, optional 

333 If not `None`, default value to be used if the parameter read from 

334 the header is not defined or if the header is missing. 

335 minimum : `numbers.Number` or `astropy.units.Quantity`, optional 

336 If not `None`, and if ``default`` is not `None`, minimum value 

337 acceptable for this parameter. 

338 maximum : `numbers.Number` or `astropy.units.Quantity`, optional 

339 If not `None`, and if ``default`` is not `None`, maximum value 

340 acceptable for this parameter. 

341 unit : `astropy.units.Unit`, optional 

342 If not `None`, the value read from the header will be converted 

343 to a `~astropy.units.Quantity`. Only supported for numeric values. 

344 checker : `~collections.abc.Callable`, optional 

345 Callback function to be used by the translator method in case the 

346 keyword is not present. Function will be executed as if it is 

347 a method of the translator class. Running without raising an 

348 exception will allow the default to be used. Should usually raise 

349 `KeyError`. 

350 

351 Returns 

352 ------- 

353 t : `~collections.abc.Callable` 

354 Function implementing a translator with the specified 

355 parameters. 

356 """ 

357 if property_key in cls.all_properties: 357 ↛ 362line 357 didn't jump to line 362 because the condition on line 357 was always true

358 property_doc = cls.all_properties[property_key].doc 

359 return_type = cls.all_properties[property_key].str_type 

360 return_pytype = cls.all_properties[property_key].py_type 

361 else: 

362 return_type = "str` or `numbers.Number" 

363 return_pytype = Any 

364 property_doc = f"Map '{header_key}' header keyword to '{property_key}' property" 

365 

366 def trivial_translator(self: MetadataTranslator) -> Any: 

367 if unit is not None: 

368 q = self.quantity_from_card( 

369 header_key, unit, default=default, minimum=minimum, maximum=maximum, checker=checker 

370 ) 

371 # Convert to Angle if this quantity is an angle 

372 # The extra checks come from pylance complaining about the 

373 # else branch above assigning Any to return_pytype. 

374 if isinstance(return_pytype, type): 

375 return_pytype_cls = cast(type[Any], return_pytype) 

376 if issubclass(return_pytype_cls, Angle): 

377 q = Angle(q) 

378 return q 

379 

380 for key in ensure_iterable(header_key): 

381 if self.is_key_ok(key): 

382 value = self._header[key] 

383 if default is not None and not isinstance(value, str): 

384 value = self.validate_value(value, default, minimum=minimum, maximum=maximum) 

385 self._used_these_cards(key) 

386 break 

387 else: 

388 # No keywords found, use default, checking first, or raise 

389 # A None default is only allowed if a checker is provided. 

390 if checker is not None: 

391 try: 

392 checker(self) 

393 except Exception as e: 

394 raise KeyError(f"Could not find {header_key} in header") from e 

395 return default 

396 elif default is not None: 

397 value = default 

398 else: 

399 raise KeyError(f"Could not find {header_key} in header") 

400 

401 # If we know this is meant to be a string, force to a string. 

402 # Sometimes headers represent items as integers which generically 

403 # we want as strings (eg OBSID). Sometimes also floats are 

404 # written as "NaN" strings. 

405 casts = {"str": str, "float": float, "int": int} 

406 if return_type in casts and not isinstance(value, casts[return_type]) and value is not None: 

407 value = casts[return_type](value) 

408 

409 return value 

410 

411 trivial_translator.__annotations__["return"] = return_pytype 

412 

413 # Docstring inheritance means it is confusing to specify here 

414 # exactly which header value is being used. 

415 trivial_translator.__doc__ = f"""{textwrap.dedent(property_doc)} 

416 

417:returns: Translated value derived directly from a single header. 

418:rtype: `{return_type}` 

419""" 

420 return trivial_translator 

421 

@classmethod
def __init_subclass__(cls, **kwargs: Any) -> None:
    """Register all subclasses with the base class and create dynamic
    translator methods.

    The method provides two facilities. Firstly, every subclass
    of `MetadataTranslator` that includes a ``name`` class property is
    registered as a translator class that could be selected when automatic
    header translation is attempted. Only name translator subclasses that
    correspond to a complete instrument. Translation classes providing
    generic translation support for multiple instrument translators should
    not be named.

    The second feature of this method is to convert simple translations
    to full translator methods. Sometimes a translation is fixed (for
    example a specific instrument name should be used) and rather than
    provide a full ``to_property()`` translation method the mapping can be
    defined in a class variable named ``_const_map``. Similarly, for
    one-to-one trivial mappings from a header to a property,
    ``_trivial_map`` can be defined. Trivial mappings are a dict mapping a
    generic property to either a header keyword, or a tuple consisting of
    the header keyword and a dict containing key value pairs suitable for
    the `MetadataTranslator.quantity_from_card` method.

    Parameters
    ----------
    **kwargs : `dict`
        Arbitrary parameters passed to parent class.

    Notes
    -----
    A warning is logged when a named translator replaces one that is
    already registered, when a property appears in both maps, when an
    explicitly defined ``to_`` method is about to be replaced by a map
    entry, and when a map refers to a property that is neither standard
    nor declared in ``extensions``. Constant mappings are installed after
    trivial mappings and therefore win when a property is in both.
    """
    super().__init_subclass__(**kwargs)

    # Only register classes with declared names
    if hasattr(cls, "name") and cls.name is not None:
        if cls.name in MetadataTranslator.translators:
            log.warning(
                "%s: Replacing %s translator with %s",
                cls.name,
                MetadataTranslator.translators[cls.name],
                cls,
            )
        MetadataTranslator.translators[cls.name] = cls

    # Check that we have not inherited constant/trivial mappings from
    # parent class that we have already applied. Empty maps are always
    # assumed okay
    const_map = cls._const_map if cls._const_map and cls.defined_in_this_class("_const_map") else {}
    trivial_map = (
        cls._trivial_map if cls._trivial_map and cls.defined_in_this_class("_trivial_map") else {}
    )

    # Check for shadowing
    trivials = set(trivial_map)
    constants = set(const_map)
    both = trivials & constants
    if both:
        # Sorted so that the message does not depend on set ordering.
        log.warning(
            "%s: defined in both const_map and trivial_map: %s", cls.__name__, ", ".join(sorted(both))
        )

    for mapped_property in trivials | constants:
        if cls.defined_in_this_class(f"to_{mapped_property}"):
            # Must be one of trivial or constant. If in both then constant
            # overrides trivial.
            location = "by _const_map" if mapped_property in constants else "by _trivial_map"
            log.warning(
                "%s: %s is defined explicitly but will be replaced %s",
                cls.__name__,
                mapped_property,
                location,
            )

    properties = set(PROPERTIES) | {"ext_" + pp for pp in cls.extensions}
    cls.all_properties = dict(PROPERTIES)
    cls.all_properties.update({"ext_" + pp: dd for pp, dd in cls.extensions.items()})

    # Go through the trivial mappings for this class and create
    # corresponding translator methods
    for property_key, header_key in trivial_map.items():
        # Options for the generated method; kept separate from the
        # ``kwargs`` parameter of this method.
        mapping_options: dict[str, Any] = {}
        if type(header_key) is tuple:
            mapping_options = header_key[1]
            header_key = header_key[0]
        method = f"to_{property_key}"
        translator = cls._make_trivial_mapping(property_key, header_key, **mapping_options)
        translator = cache_translation(translator, method=method)
        _set_method_metadata(translator, cls, method)
        setattr(cls, method, translator)
        if property_key not in properties:
            log.warning("Unexpected trivial translator for '%s' defined in %s", property_key, cls)

    # Go through the constant mappings for this class and create
    # corresponding translator methods
    for property_key, constant in const_map.items():
        translator = cls._make_const_mapping(property_key, constant)
        method = f"to_{property_key}"
        _set_method_metadata(translator, cls, method)
        setattr(cls, method, translator)
        if property_key not in properties:
            log.warning("Unexpected constant translator for '%s' defined in %s", property_key, cls)

519 

520 def __init__(self, header: Mapping[str, Any], filename: ResourcePathExpression | None = None) -> None: 

521 self._header = header 

522 if filename is not None: 

523 filename = str(ResourcePath(filename, forceAbsolute=True)) 

524 self.filename = filename 

525 self._used_cards: set[str] = set() 

526 

527 # Prefix to use for warnings about failed translations 

528 self._log_prefix_cache: str | None = None 

529 

530 # Cache assumes header is read-only once stored in object 

531 self._translation_cache: dict[str, Any] = {} 

532 

533 @classmethod 

534 @abstractmethod 

535 def can_translate(cls, header: Mapping[str, Any], filename: str | None = None) -> bool: 

536 """Indicate whether this translation class can translate the 

537 supplied header. 

538 

539 Parameters 

540 ---------- 

541 header : `dict`-like 

542 Header to convert to standardized form. 

543 filename : `str`, optional 

544 Name of file being translated. 

545 

546 Returns 

547 ------- 

548 can : `bool` 

549 `True` if the header is recognized by this class. `False` 

550 otherwise. 

551 """ 

552 raise NotImplementedError() 

553 

554 @classmethod 

555 def can_translate_with_options( 

556 cls, header: Mapping[str, Any], options: dict[str, Any], filename: str | None = None 

557 ) -> bool: 

558 """Determine if a header can be translated with different criteria. 

559 

560 Parameters 

561 ---------- 

562 header : `dict`-like 

563 Header to convert to standardized form. 

564 options : `dict` 

565 Headers to try to determine whether this header can 

566 be translated by this class. If a card is found it will 

567 be compared with the expected value and will return that 

568 comparison. Each card will be tried in turn until one is 

569 found. 

570 filename : `str`, optional 

571 Name of file being translated. 

572 

573 Returns 

574 ------- 

575 can : `bool` 

576 `True` if the header is recognized by this class. `False` 

577 otherwise. 

578 

579 Notes 

580 ----- 

581 Intended to be used from within `can_translate` implementations 

582 for specific translators. Is not intended to be called directly 

583 from `determine_translator`. 

584 """ 

585 for card, value in options.items(): 

586 if card in header: 

587 return header[card] == value 

588 return False 

589 

590 @classmethod 

591 def determine_translator( 

592 cls, header: Mapping[str, Any], filename: str | None = None 

593 ) -> type[MetadataTranslator]: 

594 """Determine a translation class by examining the header. 

595 

596 Parameters 

597 ---------- 

598 header : `dict`-like 

599 Representation of a header. 

600 filename : `str`, optional 

601 Name of file being translated. 

602 

603 Returns 

604 ------- 

605 translator : `type` [`MetadataTranslator`] 

606 Translation class that knows how to extract metadata from 

607 the supplied header. 

608 

609 Raises 

610 ------ 

611 ValueError 

612 None of the registered translation classes understood the supplied 

613 header. 

614 """ 

615 file_msg = "" 

616 if filename is not None: 

617 file_msg = f" from {filename}" 

618 for name, trans in cls.translators.items(): 

619 if trans.can_translate(header, filename=filename): 

620 log.debug("Using translation class %s%s", name, file_msg) 

621 return trans 

622 

623 plugins = [p.name for p in entry_points(group="astro_metadata_translators")] 

624 plugin_msg = "" 

625 if plugins: 

626 plugin_names = ", ".join(plugins) 

627 plugin_msg = f". (available plugins: {plugin_names})" 

628 

629 raise ValueError( 

630 f"None of the registered translation classes {list(cls.translators.keys())}" 

631 f" understood this header{file_msg}{plugin_msg}" 

632 ) 

633 

634 @classmethod 

635 def get_translator_by_name(cls, translator_name: str) -> type[MetadataTranslator] | None: 

636 """Given the name of a translator, return the corresponding class. 

637 

638 Parameters 

639 ---------- 

640 translator_name : `str` 

641 The registered name of the translator. 

642 

643 Returns 

644 ------- 

645 translator : `type` [`MetadataTranslator`] or `None` 

646 Translation class corresponding to the supplied name. Returns 

647 `None` if the translator is not known. 

648 """ 

649 if translator_name in cls.translators: 

650 return cls.translators[translator_name] 

651 return None 

652 

@classmethod
def translator_version(cls) -> str:
    """Return the version string for this translator class.

    Returns
    -------
    version : `str`
        String identifying the version of this translator, or
        ``"unknown"`` if no version could be found.

    Notes
    -----
    The module defining the class is examined first, followed by each of
    its parent packages in turn, until one with a ``__version__``
    attribute is found. If that version is the string ``"unknown"`` and
    the same module has a ``__fingerprint__`` attribute, the fingerprint
    is used instead. The result is remembered per class so the search is
    done only once. A translator whose version is not available this way
    should override this method.
    """
    try:
        return _VERSION_CACHE[cls]
    except KeyError:
        pass

    version = "unknown"
    parts = cls.__module__.split(".")
    while parts:
        # This class has already been imported so importing its module
        # (or a parent package) should work.
        module = importlib.import_module(".".join(parts))
        if not hasattr(module, "__version__"):
            # Remove last component from module name and try again
            parts.pop()
            continue
        version = module.__version__
        # LSST software will have a fingerprint
        if version == "unknown" and hasattr(module, "__fingerprint__"):
            version = module.__fingerprint__
        break

    _VERSION_CACHE[cls] = version
    return version

691 

692 @classmethod 

693 def fix_header( 

694 cls, header: MutableMapping[str, Any], instrument: str, obsid: str, filename: str | None = None 

695 ) -> bool: 

696 """Apply global fixes to a supplied header. 

697 

698 Parameters 

699 ---------- 

700 header : `dict` 

701 The header to correct. Correction is in place. 

702 instrument : `str` 

703 The name of the instrument. 

704 obsid : `str` 

705 Unique observation identifier associated with this header. 

706 Will always be provided. 

707 filename : `str`, optional 

708 Filename associated with this header. May not be set since headers 

709 can be fixed independently of any filename being known. 

710 

711 Returns 

712 ------- 

713 modified : `bool` 

714 `True` if a correction was applied. 

715 

716 Notes 

717 ----- 

718 This method is intended to support major discrepancies in headers 

719 such as: 

720 

721 * Periods of time where headers are known to be incorrect in some 

722 way that can be fixed either by deriving the correct value from 

723 the existing value or understanding the that correction is static 

724 for the given time. This requires that the date header is 

725 known. 

726 * The presence of a certain value is always wrong and should be 

727 corrected with a new static value regardless of date. 

728 

729 It is assumed that one off problems with headers have been applied 

730 before this method is called using the per-obsid correction system. 

731 

732 Usually called from `astro_metadata_translator.fix_header`. 

733 

734 For log messages, do not assume that the filename will be present. 

735 Always write log messages to fall back on using the ``obsid`` if 

736 ``filename`` is `None`. 

737 """ 

738 return False 

739 

740 @staticmethod 

741 def _construct_log_prefix(obsid: str, filename: str | None = None) -> str: 

742 """Construct a log prefix string from the obsid and filename. 

743 

744 Parameters 

745 ---------- 

746 obsid : `str` 

747 The observation identifier. 

748 filename : `str`, optional 

749 The filename associated with the header being translated. 

750 Can be `None`. 

751 """ 

752 if filename: 

753 return f"{filename}({obsid})" 

754 return obsid 

755 

756 @property 

757 def _log_prefix(self) -> str: 

758 """Return standard prefix that can be used for log messages to report 

759 useful context. 

760 

761 Will be either the filename and obsid, or just the obsid depending 

762 on whether a filename is known. 

763 

764 Returns 

765 ------- 

766 prefix : `str` 

767 The prefix to use. 

768 """ 

769 if self._log_prefix_cache is None: 

770 # Protect against the unfortunate event of the obsid failing to 

771 # be calculated. This should be rare but should not prevent a log 

772 # message from appearing. 

773 try: 

774 obsid = self.to_observation_id() 

775 except Exception: 

776 obsid = "unknown_obsid" 

777 self._log_prefix_cache = self._construct_log_prefix(obsid, self.filename) 

778 return self._log_prefix_cache 

779 

780 def _used_these_cards(self, *args: str) -> None: 

781 """Indicate that the supplied cards have been used for translation. 

782 

783 Parameters 

784 ---------- 

785 *args : sequence of `str` 

786 Keywords used to process a translation. 

787 """ 

788 self._used_cards.update(set(args)) 

789 

790 def cards_used(self) -> frozenset[str]: 

791 """Cards used during metadata extraction. 

792 

793 Returns 

794 ------- 

795 used : `frozenset` of `str` 

796 Cards used when extracting metadata. 

797 """ 

798 return frozenset(self._used_cards) 

799 

800 @staticmethod 

801 def validate_value( 

802 value: float, default: float, minimum: float | None = None, maximum: float | None = None 

803 ) -> float: 

804 """Validate the supplied value, returning a new value if out of range. 

805 

806 Parameters 

807 ---------- 

808 value : `float` 

809 Value to be validated. 

810 default : `float` 

811 Default value to use if supplied value is invalid or out of range. 

812 Assumed to be in the same units as the value expected in the 

813 header. 

814 minimum : `float` 

815 Minimum possible valid value, optional. If the calculated value 

816 is below this value, the default value will be used. 

817 maximum : `float` 

818 Maximum possible valid value, optional. If the calculated value 

819 is above this value, the default value will be used. 

820 

821 Returns 

822 ------- 

823 value : `float` 

824 Either the supplied value, or a default value. 

825 """ 

826 if value is None or math.isnan(value): 

827 value = default 

828 else: 

829 if minimum is not None and value < minimum: 

830 value = default 

831 elif maximum is not None and value > maximum: 

832 value = default 

833 return value 

834 

835 @staticmethod 

836 def is_keyword_defined(header: Mapping[str, Any], keyword: str | None) -> bool: 

837 """Return `True` if the value associated with the named keyword is 

838 present in the supplied header and defined. 

839 

840 Parameters 

841 ---------- 

842 header : `dict`-lik 

843 Header to use as reference. 

844 keyword : `str` 

845 Keyword to check against header. 

846 

847 Returns 

848 ------- 

849 is_defined : `bool` 

850 `True` if the header is present and not-`None`. `False` otherwise. 

851 """ 

852 if keyword is None or keyword not in header: 

853 return False 

854 

855 if header[keyword] is None: 

856 return False 

857 

858 # Special case Astropy undefined value 

859 if isinstance(header[keyword], astropy.io.fits.card.Undefined): 

860 return False 

861 

862 return True 

863 

864 def resource_root(self) -> tuple[str | None, str | None]: 

865 """Return package resource to use to locate correction resources within 

866 an installed package. 

867 

868 Returns 

869 ------- 

870 resource_package : `str` 

871 Package resource name. `None` if no package resource are to be 

872 used. 

873 resource_root : `str` 

874 The name of the resource root. `None` if no package resources 

875 are to be used. 

876 """ 

877 return (self.default_resource_package, self.default_resource_root) 

878 

def search_paths(self) -> list[str]:
    """Return the search paths to use when looking for header fix up
    correction files.

    Returns
    -------
    paths : `list`
        Directory paths to search. Can be an empty list if no special
        directories are defined.

    Notes
    -----
    Uses the class ``default_search_path`` property if it is not `None`.
    A new list is returned on every call.
    """
    configured = self.default_search_path
    return [] if configured is None else list(configured)

896 

897 def is_key_ok(self, keyword: str | None) -> bool: 

898 """Return `True` if the value associated with the named keyword is 

899 present in this header and defined. 

900 

901 Parameters 

902 ---------- 

903 keyword : `str` 

904 Keyword to check against header. 

905 

906 Returns 

907 ------- 

908 is_ok : `bool` 

909 `True` if the header is present and not-`None`. `False` otherwise. 

910 """ 

911 return self.is_keyword_defined(self._header, keyword) 

912 

def are_keys_ok(self, keywords: Iterable[str]) -> bool:
    """Report whether the supplied keys are all present and defined.

    Parameters
    ----------
    keywords : iterable of `str`
        Keywords to test.

    Returns
    -------
    all_ok : `bool`
        `True` if all supplied keys are present and defined. Testing
        stops at the first key that fails. An empty iterable gives `True`.
    """
    return all(self.is_key_ok(key) for key in keywords)

930 

def quantity_from_card(
    self,
    keywords: str | Sequence[str],
    unit: u.Unit,
    default: float | None = None,
    minimum: float | None = None,
    maximum: float | None = None,
    checker: Callable | None = None,
) -> u.Quantity:
    """Calculate an Astropy Quantity from a header card and a unit.

    Parameters
    ----------
    keywords : `str` or `list` of `str`
        Keyword to use from header. If a list each keyword will be tried
        in turn until one matches.
    unit : `astropy.units.UnitBase`
        Unit of the item in the header.
    default : `float`, optional
        Default value to use if the header value is invalid. Assumed
        to be in the same units as the value expected in the header. If
        `None`, no default value is used and the header value is not
        range checked.
    minimum : `float`, optional
        Minimum possible valid value. If the calculated value is below
        this value, the default value will be used.
    maximum : `float`, optional
        Maximum possible valid value. If the calculated value is above
        this value, the default value will be used.
    checker : `~collections.abc.Callable`, optional
        Callback function to be used by the translator method in case the
        keyword is not present. Function will be executed as if it is
        a method of the translator class. Running without raising an
        exception will allow the default to be used. Should usually raise
        `KeyError`.

    Returns
    -------
    q : `astropy.units.Quantity`
        Quantity representing the header value. If no keyword matched and
        the checker allowed the default to be used, this is the default
        as a quantity, or `None` when no default was supplied.

    Raises
    ------
    KeyError
        The supplied header key is not present. If a checker was supplied
        and the fallback to the default failed, the underlying exception
        is attached as the cause of the `KeyError`.
    """
    keyword_list = [keywords] if isinstance(keywords, str) else list(keywords)
    for k in keyword_list:
        if self.is_key_ok(k):
            value = self._header[k]
            keyword = k
            break
    else:
        # None of the requested keywords is usable.
        message = f"Could not find {keywords} in header"
        if checker is None:
            raise KeyError(message)
        try:
            checker(self)
            value = default
            if value is not None:
                value = u.Quantity(value, unit=unit)
            return value
        except Exception as err:
            # Callers rely on KeyError, so keep that type but retain the
            # original failure as the cause rather than discarding it.
            raise KeyError(message) from err
    if isinstance(value, str):
        # Sometimes the header has the wrong type in it but this must
        # be a number if we are creating a quantity.
        value = float(value)
    self._used_these_cards(keyword)
    if default is not None:
        value = self.validate_value(value, default, maximum=maximum, minimum=minimum)
    return u.Quantity(value, unit=unit)

1001 

1002 def _join_keyword_values(self, keywords: Iterable[str], delim: str = "+") -> str: 

1003 """Join values of all defined keywords with the specified delimiter. 

1004 

1005 Parameters 

1006 ---------- 

1007 keywords : iterable of `str` 

1008 Keywords to look for in header. 

1009 delim : `str`, optional 

1010 Character to use to join the values together. 

1011 

1012 Returns 

1013 ------- 

1014 joined : `str` 

1015 String formed from all the keywords found in the header with 

1016 defined values joined by the delimiter. Empty string if no 

1017 defined keywords found. 

1018 """ 

1019 values = [] 

1020 for k in keywords: 

1021 if self.is_key_ok(k): 

1022 values.append(self._header[k]) 

1023 self._used_these_cards(k) 

1024 

1025 if values: 

1026 joined = delim.join(str(v) for v in values) 

1027 else: 

1028 joined = "" 

1029 

1030 return joined 

1031 

@cache_translation
def to_detector_unique_name(self) -> str:
    """Return a unique name for the detector.

    Base class implementation attempts to combine ``detector_name`` with
    ``detector_group``. Group is only used if not `None`.

    Can be over-ridden by specialist translator class.

    Returns
    -------
    name : `str`
        ``detector_group``_``detector_name`` if ``detector_group`` is
        defined, else the ``detector_name`` is assumed to be unique.
        If neither return a valid value an exception is raised.

    Raises
    ------
    NotImplementedError
        Raised if neither detector_name nor detector_group is defined.
    """
    detector = self.to_detector_name()
    group = self.to_detector_group()

    # A defined group always prefixes the detector name.
    if group is not None:
        return f"{group}_{detector}"

    if detector is None:
        raise NotImplementedError("Can not determine unique name from detector_group and detector_name")

    return detector

1063 

@cache_translation
def to_exposure_group(self) -> str | None:
    """Return the group label associated with this exposure.

    Base class implementation returns the ``exposure_id`` in string
    form. A subclass may do something different.

    Returns
    -------
    name : `str` or `None`
        The ``exposure_id`` converted to a string, or `None` if the
        ``exposure_id`` is itself `None`.
    """
    identifier = self.to_exposure_id()
    if identifier is None:
        # mypy does not think this can ever happen but play it safe
        # with subclasses.
        return None  # type: ignore
    return str(identifier)

1083 

@cache_translation
def to_observation_reason(self) -> str:
    """Return the reason this observation was taken.

    Base class implementation returns ``science`` if the
    ``observation_type`` is science, else ``unknown``.
    A subclass may do something different.

    Returns
    -------
    name : `str`
        The reason for this observation.
    """
    is_science = self.to_observation_type() == "science"
    return "science" if is_science else "unknown"

1101 

@cache_translation
def to_exposure_time_requested(self) -> astropy.units.Quantity:
    """Return the requested exposure time in seconds.

    Base class implementation returns the same value as
    ``exposure_time``. This information may not be available for all
    instruments.

    Returns
    -------
    exptime : `astropy.units.Quantity`
        The recorded exposure time in seconds.
    """
    # No separate record of the request is assumed; reuse the actual time.
    actual = self.to_exposure_time()
    return actual

1115 

@cache_translation
def to_altaz_end(self) -> astropy.coordinates.AltAz | None:
    """Return the AltAz for the end of the observation.

    Base class implementation always returns `None`. Subclasses should
    override this method if the value is known.

    Returns
    -------
    altaz : `astropy.coordinates.AltAz` or `None`
        The AltAz for the end of the observation.
    """
    return None

1129 

@classmethod
def observing_date_to_offset(cls, observing_date: astropy.time.Time) -> astropy.time.TimeDelta | None:
    """Calculate the observing day offset to apply for a given observation.

    In some cases the definition of the observing day offset has changed
    during the lifetime of the instrument. For example lab data might
    have a different offset to that when the instrument is on the
    telescope.

    Parameters
    ----------
    observing_date : `astropy.time.Time`
        The observation date. Not used by this base implementation.

    Returns
    -------
    offset : `astropy.time.TimeDelta` or `None`
        The offset to apply when calculating the observing day for a
        specific time of observation. `None` implies the offset
        is not known for that date. This base implementation always
        returns `None`.
    """
    return None

1152 

@classmethod
def observing_date_to_observing_day(
    cls, observing_date: astropy.time.Time, offset: astropy.time.TimeDelta | int | None
) -> int:
    """Return the YYYYMMDD integer corresponding to the observing day.

    The date is converted to TAI and the offset is subtracted from it
    before the year, month and day are calculated.

    Parameters
    ----------
    observing_date : `astropy.time.Time`
        The observation date.
    offset : `astropy.time.TimeDelta` or `numbers.Real` or `None`
        The offset to subtract from the observing date when calculating
        the observing day. If a plain number is given it is taken to be
        in units of seconds. If `None` no offset is applied.

    Returns
    -------
    day : `int`
        The observing day as an integer of form YYYYMMDD.

    Notes
    -----
    For example, if the offset is +12 hours both 2023-07-06T13:00 and
    2023-07-07T11:00 will return an observing day of 20230706 because
    the observing day goes from 2023-07-06T12:00 to 2023-07-07T12:00.
    """
    tai_date = observing_date.tai
    if offset:
        # Plain numbers are interpreted as a number of TAI seconds.
        delta = (
            astropy.time.TimeDelta(offset, format="sec", scale="tai")
            if isinstance(offset, numbers.Real)
            else offset
        )
        tai_date -= delta
    day_string = tai_date.strftime("%Y%m%d")
    return int(day_string)

1188 

@cache_translation
def to_observing_day_offset(self) -> astropy.time.TimeDelta | None:
    """Return the offset required to calculate observing day.

    Base class implementation passes the start time of the observation
    to `observing_date_to_offset`, and returns `None` if that start time
    is not available.

    Returns
    -------
    offset : `astropy.time.TimeDelta` or `None`
        The offset to apply. Returns `None` if the offset is not defined.

    Notes
    -----
    This offset must be subtracted from a time of observation to calculate
    the observing day. This offset must be added to the YYYYMMDDT00:00
    observing day to calculate the time span coverage of the observing day.
    """
    begin = self.to_datetime_begin()
    return None if begin is None else self.observing_date_to_offset(begin)

1210 

@cache_translation
def to_observing_day(self) -> int:
    """Return the YYYYMMDD integer corresponding to the observing day.

    Returns
    -------
    day : `int`
        The observing day as an integer of form YYYYMMDD. If the header
        is broken and is unable to obtain a date of observation, ``0``
        is returned and the assumption is made that the problem will
        be caught elsewhere.

    Notes
    -----
    Base class implementation uses the TAI date of the start of the
    observation corrected by the observing day offset. If that offset
    is `None` no offset will be applied.

    The offset is subtracted from the time of observation before
    calculating the year, month and day.

    For example, if the offset is +12 hours both 2023-07-06T13:00 and
    2023-07-07T11:00 will return an observing day of 20230706 because
    the observing day goes from 2023-07-06T12:00 to 2023-07-07T12:00.
    """
    begin = self.to_datetime_begin()
    if begin is None:
        # No usable date of observation; signal that with a zero day.
        return 0

    day_offset = self.to_observing_day_offset()
    return self.observing_date_to_observing_day(begin.tai, day_offset)

1241 

@cache_translation
def to_observation_counter(self) -> int:
    """Return an integer corresponding to how this observation relates
    to other observations.

    Returns
    -------
    sequence : `int`
        The observation counter. Always ``0`` for this implementation.

    Notes
    -----
    Base class implementation returns ``0`` to indicate that it is not
    known how an observatory will define a counter. Some observatories
    may not use the concept, others may use a counter that increases
    for every observation taken for that instrument, and others may
    define it to be a counter within an observing day.
    """
    unknown_counter = 0
    return unknown_counter

1261 

@cache_translation
def to_group_counter_start(self) -> int:
    """Return the observation counter of the observation that began
    this group.

    The definition of the relevant group is up to the metadata
    translator. It can be the first observation in the exposure_group
    or the first observation in the visit, but must be derivable
    from the metadata of this observation.

    Returns
    -------
    counter : `int`
        The observation counter for the start of the relevant group.
        Default implementation always returns the observation counter
        of this observation.
    """
    # By default the observation is treated as the start of its group.
    own_counter = self.to_observation_counter()
    return own_counter

1280 

@cache_translation
def to_group_counter_end(self) -> int:
    """Return the observation counter of the observation that ends
    this group.

    The definition of the relevant group is up to the metadata
    translator. It can be the last observation in the exposure_group
    or the last observation in the visit, but must be derivable
    from the metadata of this observation. It is of course possible
    that the last observation in the group does not exist if a sequence
    of observations was not completed.

    Returns
    -------
    counter : `int`
        The observation counter for the end of the relevant group.
        Default implementation always returns the observation counter
        of this observation.
    """
    # By default the observation is treated as the end of its group.
    own_counter = self.to_observation_counter()
    return own_counter

1301 

@cache_translation
def to_has_simulated_content(self) -> bool:
    """Return a boolean indicating whether any part of the observation
    was simulated.

    Returns
    -------
    is_simulated : `bool`
        `True` if this exposure has simulated content. This can be
        if some parts of the metadata or data were simulated. Default
        implementation always returns `False`.
    """
    simulated = False
    return simulated

1315 

@cache_translation
def to_focus_z(self) -> u.Quantity:
    """Return the default defocal distance of 0.0 mm.

    This base class implementation does not read the header and is the
    value used if there is no keyword for defocal distance in the
    header. The default keyword for defocal distance is ``FOCUSZ``.

    Returns
    -------
    focus_z : `astropy.units.Quantity`
        The defocal distance; always the 0.0 mm default in this
        implementation.
    """
    default_focus = 0.0 * u.mm
    return default_focus

1328 

@cache_translation
def to_can_see_sky(self) -> bool | None:
    """Return whether the observation can see the sky or not.

    Returns
    -------
    can_see_sky : `bool` or `None`
        `True` if the detector is receiving photons from the sky.
        `False` if the sky is not visible to the detector.
        `None` if the metadata translator does not know one way or the
        other.

    Notes
    -----
    The base class translator uses a simple heuristic: the lower-cased
    observation type is looked up in ``_sky_observation_types`` (giving
    `True`) and then in ``_non_sky_observation_types`` (giving `False`).
    For all other cases it will return `None`.
    """
    raw_type = self.to_observation_type()
    # Comparison is case-insensitive; a missing type is passed through.
    kind = raw_type.lower() if raw_type is not None else None

    if kind in self._sky_observation_types:
        return True
    return False if kind in self._non_sky_observation_types else None

1357 

@classmethod
def determine_translatable_headers(
    cls, filename: ResourcePathExpression, primary: MutableMapping[str, Any] | None = None
) -> Iterator[MutableMapping[str, Any]]:
    """Given a file return all the headers usable for metadata translation.

    This method can optionally be given a header from the file. This
    header will generally be the primary header or a merge of the first
    two headers.

    In the base class implementation it is assumed that
    this supplied header is the only useful header for metadata translation
    and it will be returned unchanged if given. This can avoid
    unnecessarily re-opening the file and re-reading the header when the
    content is already known.

    If no header is supplied, a header will be read from the supplied
    file using `~.file_helpers.read_basic_metadata_from_file`, allowing it
    to merge the primary and secondary header of a multi-extension FITS
    file. Subclasses can read the header from the data file using whatever
    technique is best for that instrument.

    Subclasses can return multiple headers and ignore the externally
    supplied header. They can also merge it with another header and return
    a new derived header if that is required by the particular data file.
    There is no requirement for the supplied header to be used.

    Parameters
    ----------
    filename : `str` or `lsst.resources.ResourcePathExpression`
        Path to a file in a format understood by this translator.
    primary : `dict`-like, optional
        The primary header obtained by the caller. This is sometimes
        already known, for example if a system is trying to bootstrap
        without already knowing what data is in the file. For many
        instruments where the primary header is the only relevant
        header, the primary header will be returned with no further
        action.

    Yields
    ------
    headers : iterator of `dict`-like
        A header usable for metadata translation. For this base
        implementation it will be either the supplied primary header
        or a header read from the file. This implementation will only
        ever yield a single header.

    Notes
    -----
    Each translator class can have code specifically tailored to its
    own file format. It is important not to call this method with
    an incorrect translator class. The normal paradigm is for the
    caller to have read the first header and then called
    `determine_translator()` on the result to work out which translator
    class to then call to obtain the real headers to be used for
    translation.
    """
    # A header supplied by the caller is trusted and the file is not read.
    if primary is not None:
        yield primary
        return

    # Prevent circular import by deferring
    from .file_helpers import read_basic_metadata_from_file

    # Merge primary and secondary header if they exist.
    merged = read_basic_metadata_from_file(filename, -1)
    assert merged is not None  # for mypy since can_raise=True
    yield merged

1425 

1426 

1427def _make_abstract_translator_method( 

1428 property: str, doc: str, return_typedoc: str, return_type: type 

1429) -> Callable: 

1430 """Create a an abstract translation method for this property. 

1431 

1432 Parameters 

1433 ---------- 

1434 property : `str` 

1435 Name of the translator for property to be created. 

1436 doc : `str` 

1437 Description of the property. 

1438 return_typedoc : `str` 

1439 Type string of this property (used in the doc string). 

1440 return_type : `type` 

1441 Type of this property. 

1442 

1443 Returns 

1444 ------- 

1445 m : `~collections.abc.Callable` 

1446 Translator method for this property. 

1447 """ 

1448 

1449 def to_property(self: MetadataTranslator) -> None: 

1450 raise NotImplementedError(f"Translator for '{property}' undefined.") 

1451 

1452 to_property.__doc__ = f"""Return value of {property} from headers. 

1453 

1454{textwrap.dedent(doc)} 

1455 

1456:returns: The translated property. 

1457:rtype: `{return_typedoc}` 

1458 """ 

1459 return to_property 

1460 

1461 

# Make abstract methods for all the translator methods.
# Unfortunately registering them as abstractmethods does not work
# as these assignments come after the class has been created.
# Assigning to __abstractmethods__ directly does work but interacts
# poorly with the metaclass automatically generating methods from
# _trivialMap and _constMap.
# Note that subclasses that provide extension properties are assumed to not
# need abstract methods created for them.

# Allow for concrete translator methods to exist in the base class.
# These translator methods can be defined in terms of other properties.
CONCRETE = set()

for name, definition in PROPERTIES.items():
    method = f"to_{name}"
    if MetadataTranslator.defined_in_this_class(method):
        # The base class already implements this translation.
        CONCRETE.add(method)
        continue
    func = _make_abstract_translator_method(name, definition.doc, definition.str_type, definition.py_type)
    _set_method_metadata(func, MetadataTranslator, method)
    setattr(MetadataTranslator, method, abstractmethod(func))

1483 

1484 

class StubTranslator(MetadataTranslator):
    """Translator where all the translations are stubbed out and issue
    warnings.

    This translator is intended as a temporary base class whilst a new
    translator is being developed. It allows testing to proceed without
    being required to fully define all translation methods. Once the new
    translator is complete this class should be removed from the
    inheritance tree.
    """

1496 

1497 

1498def _make_forwarded_stub_translator_method( 

1499 cls_: type[MetadataTranslator], property: str, doc: str, return_typedoc: str, return_type: type 

1500) -> Callable: 

1501 """Create a stub translation method for this property that calls the 

1502 base method and catches `NotImplementedError`. 

1503 

1504 Parameters 

1505 ---------- 

1506 cls_ : `type` 

1507 Class to use when referencing `super()`. This would usually be 

1508 `StubTranslator`. 

1509 property : `str` 

1510 Name of the translator for property to be created. 

1511 doc : `str` 

1512 Description of the property. 

1513 return_typedoc : `str` 

1514 Type string of this property (used in the doc string). 

1515 return_type : `type` 

1516 Type of this property. 

1517 

1518 Returns 

1519 ------- 

1520 m : `~collections.abc.Callable` 

1521 Stub translator method for this property. 

1522 """ 

1523 method = f"to_{property}" 

1524 

1525 def to_stub(self: MetadataTranslator) -> Any: 

1526 parent = getattr(super(cls_, self), method, None) 

1527 try: 

1528 if parent is not None: 

1529 return parent() 

1530 except NotImplementedError: 

1531 pass 

1532 

1533 warnings.warn( 

1534 f"Please implement translator for property '{property}' for translator {self}", stacklevel=3 

1535 ) 

1536 return None 

1537 

1538 to_stub.__doc__ = f"""Unimplemented forwarding translator for {property}. 

1539 

1540{textwrap.dedent(doc)} 

1541 

1542Calls the base class translation method and if that fails with 

1543`NotImplementedError` issues a warning reminding the implementer to 

1544override this method. 

1545 

1546:returns: Always returns `None`. 

1547:rtype: `None` or `{return_typedoc}` 

1548""" 

1549 return to_stub 

1550 

1551 

# Create stub translation methods for each property. These stubs warn
# rather than fail and should be overridden by translators.
for name, definition in PROPERTIES.items():
    method = f"to_{name}"
    func = _make_forwarded_stub_translator_method(
        StubTranslator,  # type: ignore
        name,
        definition.doc,
        definition.str_type,
        definition.py_type,
    )
    _set_method_metadata(func, StubTranslator, method)
    setattr(StubTranslator, method, func)