Coverage for python / lsst / obs / lsst / translators / lsst.py: 23%
451 statements
« prev ^ index » next coverage.py v7.13.5, created at 2026-04-28 09:11 +0000
# This file is currently part of obs_lsst but is written to allow it
# to be migrated to the astro_metadata_translator package at a later date.
#
# This product includes software developed by the LSST Project
# (http://www.lsst.org).
# See the LICENSE file in this directory for details of code ownership.
#
# Use of this source code is governed by a 3-clause BSD-style
# license that can be found in the LICENSE file.
11"""Metadata translation support code for LSST headers"""
13__all__ = ("TZERO", "SIMONYI_LOCATION", "read_detector_ids",
14 "compute_detector_exposure_id_generic", "LsstBaseTranslator",
15 "SIMONYI_TELESCOPE")
17import yaml
18import logging
19import re
20import datetime
21import hashlib
23import astropy.coordinates
24import astropy.units as u
25from astropy.time import Time, TimeDelta
26from astropy.coordinates import EarthLocation
28from lsst.resources import ResourcePath, ResourcePathExpression
30from astro_metadata_translator import cache_translation, FitsTranslator
31from astro_metadata_translator.translators.helpers import tracking_from_degree_headers, \
32 altaz_from_degree_headers
# Reference epoch used when deriving visit IDs from exposure groups.
TZERO = Time("2015-01-01T00:00", format="isot", scale="utc")
TZERO_DATETIME = TZERO.to_datetime()

# Delimiter to use for multiple filters/gratings
FILTER_DELIMITER = "~"

# Regex to use for parsing a GROUPID string: ISO date, millisecond
# fraction, and an optional "+N" or "#N" extension.
GROUP_RE = re.compile(r"^(\d\d\d\d\-\d\d\-\d\dT\d\d:\d\d:\d\d)\.(\d\d\d)(?:[\+#](\d+))?$")

# LSST Default location in the absence of headers
SIMONYI_LOCATION = EarthLocation.from_geodetic(-70.749417, -30.244639, 2663.0)

# Name of the main survey telescope
SIMONYI_TELESCOPE = "Simonyi Survey Telescope"

# Supported controller codes.
# The order here directly relates to the resulting exposure ID
# calculation. Do not reorder. Add new ones to the end.
# O for OCS, C for CCS, H for pHosim, P for simulated OCS, Q for
# simulated CCS, S for simulated images.
SIMULATED_CONTROLLERS = "HPQS"
CONTROLLERS = "OC" + SIMULATED_CONTROLLERS

# Number of decimal digits allocated to the sequence number in exposure_ids.
_SEQNUM_MAXDIGITS = 5

# Number of decimal digits allocated to the day of observation (and controller
# code) in exposure_ids.
_DAYOBS_MAXDIGITS = 8

# Value added to day_obs for controllers after the default.
_CONTROLLER_INCREMENT = 1000_00_00

# Number of decimal digits used by exposure_ids.
EXPOSURE_ID_MAXDIGITS = _SEQNUM_MAXDIGITS + _DAYOBS_MAXDIGITS

log = logging.getLogger(__name__)
def read_detector_ids(policyFile: ResourcePathExpression):
    """Read a camera policy file and retrieve the mapping from CCD name
    to ID.

    Parameters
    ----------
    policyFile : `lsst.resources.ResourcePathExpression`
        Name of YAML policy file to read, relative to the resources root
        directory. This usually means that the "policy" directory is included.

    Returns
    -------
    mapping : `dict` of `str` to (`int`, `str`)
        A `dict` with keys being the full names of the detectors, and the
        value is a `tuple` containing the integer detector number and the
        detector serial number.

    Notes
    -----
    Reads the camera YAML definition file directly and extracts just the
    IDs and serials. This routine does not use the standard
    `~lsst.obs.base.yamlCamera.YAMLCamera` infrastructure or
    `lsst.afw.cameraGeom`. This is because the translators are intended to
    have minimal dependencies on LSST infrastructure.
    """
    root = ResourcePath("resource://lsst.obs.lsst/resources/", forceDirectory=True)
    resource = ResourcePath(policyFile, root=root)
    try:
        # Use the fast C parser since these files are large.
        camera = yaml.load(resource.read(), Loader=yaml.CSafeLoader)
    except OSError as e:
        raise ValueError(f"Could not load camera policy file {resource}") from e

    # Only the detector number and serial are of interest here.
    return {name: (int(info["id"]), info["serial"])
            for name, info in camera["CCDs"].items()}
def compute_detector_exposure_id_generic(exposure_id, detector_num, max_num):
    """Compute the detector_exposure_id from the exposure id and the
    detector number.

    Parameters
    ----------
    exposure_id : `int`
        The exposure ID.
    detector_num : `int`
        The detector number.
    max_num : `int`
        Maximum number of detectors to make space for.

    Returns
    -------
    detector_exposure_id : `int`
        Computed ID.

    Raises
    ------
    ValueError
        The detector number is out of range.
    """
    if detector_num is None:
        raise ValueError("Detector number must be defined.")
    if not 0 <= detector_num < max_num:
        raise ValueError(f"Detector number out of range 0 <= {detector_num} < {max_num}")

    # Shift the exposure ID left by the detector capacity and add the
    # detector number in the low digits.
    return exposure_id * max_num + detector_num
class LsstBaseTranslator(FitsTranslator):
    """Translation methods useful for all LSST-style headers."""

    # No constant or trivial translations at this level; subclasses
    # populate these.
    _const_map = {}
    _trivial_map = {}

    default_resource_package = "lsst.obs.lsst"
    default_resource_root = "resources/corrections/"

    # Do not specify a name for this translator
    cameraPolicyFile = None
    """Path to policy file relative to obs_lsst root."""

    detectorMapping = None
    """Mapping of detector name to detector number and serial."""

    detectorSerials = None
    """Mapping of detector serial number to raft, number, and name."""

    DETECTOR_MAX = 1000
    """Maximum number of detectors to use when calculating the
    detector_exposure_id.

    Note that because this is the maximum number *of* detectors, for
    zero-based ``detector_num`` values this is one greater than the maximum
    ``detector_num``. It is also often rounded up to the nearest power of
    10 anyway, to allow ``detector_exposure_id`` values to be easily decoded by
    humans.
    """

    _DEFAULT_LOCATION = SIMONYI_LOCATION
    """Default telescope location in absence of relevant FITS headers."""

    _ROLLOVER_TIME = TimeDelta(12*60*60, scale="tai", format="sec")
    """Time delta for the definition of a Rubin Observatory start of day.
    Used when the header is missing. See LSE-400 or SITCOMTN-032 for details.
    """

    _non_sky_observation_types: tuple[str, ...] = ("bias", "dark", "flat")
    """Observation types that correspond to an observation where the detector
    can not see sky photons.
    """

    _can_check_obstype_for_can_see_sky: bool = True
    """If can_see_sky can not be determined, allow usage of observation type
    if `True`.
    """
193 @classmethod
194 def __init_subclass__(cls, **kwargs):
195 """Ensure that subclasses clear their own detector mapping entries
196 such that subclasses of translators that use detector mappings
197 do not pick up the incorrect values from a parent."""
199 cls.detectorMapping = None
200 cls.detectorSerials = None
202 super().__init_subclass__(**kwargs)
204 @classmethod
205 def observing_date_to_offset(cls, observing_date: astropy.time.Time) -> astropy.time.TimeDelta | None:
206 """Return the offset to use when calculating the observing day.
208 Parameters
209 ----------
210 observing_date : `astropy.time.Time`
211 The date of the observation. Unused.
213 Returns
214 -------
215 offset : `astropy.time.TimeDelta`
216 The offset to apply. The default implementation returns a fixed
217 number but subclasses can return a different value depending
218 on whether the instrument is in the instrument lab or on the
219 mountain.
220 """
221 return cls._ROLLOVER_TIME
223 @classmethod
224 def compute_detector_exposure_id(cls, exposure_id, detector_num):
225 """Compute the detector exposure ID from detector number and
226 exposure ID.
228 This is a helper method to allow code working outside the translator
229 infrastructure to use the same algorithm.
231 Parameters
232 ----------
233 exposure_id : `int`
234 Unique exposure ID.
235 detector_num : `int`
236 Detector number.
238 Returns
239 -------
240 detector_exposure_id : `int`
241 The calculated ID.
242 """
243 from .._packer import RubinDimensionPacker
245 config = RubinDimensionPacker.ConfigClass()
246 config.use_controllers()
247 return RubinDimensionPacker.pack_id_pair(exposure_id, detector_num, config=config)
249 @classmethod
250 def max_detector_exposure_id(cls):
251 """The maximum detector exposure ID expected to be generated by
252 this instrument.
254 Returns
255 -------
256 max_id : `int`
257 The maximum value.
258 """
259 max_exposure_id = cls.max_exposure_id()
260 # We subtract 1 from DETECTOR_MAX because LSST detector_num values are
261 # zero-based, and detector_max is the maximum number *of* detectors,
262 # while this returns the (inclusive) maximum ID value.
263 return cls.compute_detector_exposure_id(max_exposure_id, cls.DETECTOR_MAX - 1)
265 @classmethod
266 def max_exposure_id(cls):
267 """The maximum exposure ID expected from this instrument.
269 Returns
270 -------
271 max_exposure_id : `int`
272 The maximum value.
274 Notes
275 -----
276 The value is hard-coded to reflect historical values that were used
277 for various controllers before the sequence counter was unified.
278 """
279 # Assumes maximum observing date of 2050-12-31, 99,999 exposures per
280 # day and 6 controllers.
281 return 7050123199999
283 @classmethod
284 def detector_mapping(cls):
285 """Returns the mapping of full name to detector ID and serial.
287 Returns
288 -------
289 mapping : `dict` of `str`:`tuple`
290 Returns the mapping of full detector name (group+detector)
291 to detector number and serial.
293 Raises
294 ------
295 ValueError
296 Raised if no camera policy file has been registered with this
297 translation class.
299 Notes
300 -----
301 Will construct the mapping if none has previously been constructed.
302 """
303 if cls.cameraPolicyFile is not None:
304 if cls.detectorMapping is None:
305 cls.detectorMapping = read_detector_ids(cls.cameraPolicyFile)
306 else:
307 raise ValueError(f"Translation class '{cls.__name__}' has no registered camera policy file")
309 return cls.detectorMapping
311 @classmethod
312 def detector_serials(cls):
313 """Obtain the mapping of detector serial to detector group, name,
314 and number.
316 Returns
317 -------
318 info : `dict` of `tuple` of (`str`, `str`, `int`)
319 A `dict` with the serial numbers as keys and values of detector
320 group, name, and number.
321 """
322 if cls.detectorSerials is None:
323 detector_mapping = cls.detector_mapping()
325 if detector_mapping is not None:
326 # Form mapping to go from serial number to names/numbers
327 serials = {}
328 for fullname, (id, serial) in cls.detectorMapping.items():
329 raft, detector_name = fullname.split("_")
330 if serial in serials:
331 raise RuntimeError(f"Serial {serial} is defined in multiple places")
332 serials[serial] = (raft, detector_name, id)
333 cls.detectorSerials = serials
334 else:
335 raise RuntimeError("Unable to obtain detector mapping information")
337 return cls.detectorSerials
339 @classmethod
340 def compute_detector_num_from_name(cls, detector_group, detector_name):
341 """Helper method to return the detector number from the name.
343 Parameters
344 ----------
345 detector_group : `str`
346 Name of the detector grouping. This is generally the raft name.
347 detector_name : `str`
348 Detector name.
350 Returns
351 -------
352 num : `int`
353 Detector number.
354 """
355 fullname = f"{detector_group}_{detector_name}"
357 num = None
358 detector_mapping = cls.detector_mapping()
359 if detector_mapping is None:
360 raise RuntimeError("Unable to obtain detector mapping information")
362 if fullname in detector_mapping:
363 num = detector_mapping[fullname]
364 else:
365 log.warning(f"Unable to determine detector number from detector name {fullname}")
366 return None
368 return num[0]
370 @classmethod
371 def compute_detector_info_from_serial(cls, detector_serial):
372 """Helper method to return the detector information from the serial.
374 Parameters
375 ----------
376 detector_serial : `str`
377 Detector serial ID.
379 Returns
380 -------
381 info : `tuple` of (`str`, `str`, `int`)
382 Detector group, name, and number.
383 """
384 serial_mapping = cls.detector_serials()
385 if serial_mapping is None:
386 raise RuntimeError("Unable to obtain serial mapping information")
388 if detector_serial in serial_mapping:
389 info = serial_mapping[detector_serial]
390 else:
391 raise RuntimeError("Unable to determine detector information from detector serial"
392 f" {detector_serial}")
394 return info
396 @staticmethod
397 def compute_exposure_id(dayobs, seqnum, controller=None):
398 """Helper method to calculate the exposure_id.
400 Parameters
401 ----------
402 dayobs : `str` or `int`
403 Day of observation in either YYYYMMDD or YYYY-MM-DD format.
404 If the string looks like ISO format it will be truncated before the
405 ``T`` before being handled.
406 seqnum : `int` or `str`
407 Sequence number.
408 controller : `str`, optional
409 Controller to use. If this is "O", no change is made to the
410 exposure ID. Before Oct 5 2023, if it is "C" a 1000 is added to the
411 year component of the exposure ID. If it is "H" a 2000 is added to
412 the year component. Before Apr 18 2025, this sequence continues
413 with "P", "Q", and "S" controllers. `None` indicates that the
414 controller is not relevant to the exposure ID calculation
415 (generally this is the case for test stand data).
417 Returns
418 -------
419 exposure_id : `int`
420 Exposure ID in form YYYYMMDDnnnnn form.
421 """
422 if isinstance(seqnum, str):
423 seqnum = int(seqnum)
424 # We really want an integer but the checks require a str.
425 if isinstance(dayobs, int):
426 dayobs = str(dayobs)
428 if "T" in dayobs:
429 dayobs = dayobs[:dayobs.find("T")]
431 dayobs = dayobs.replace("-", "")
433 if len(dayobs) != 8:
434 raise ValueError(f"Malformed dayobs: {dayobs}")
436 # Expect no more than 99,999 exposures in a day
437 if seqnum >= 10**_SEQNUM_MAXDIGITS:
438 raise ValueError(f"Sequence number ({seqnum}) exceeds limit")
440 dayobs = int(dayobs)
441 if dayobs > 20231004 and controller == "C":
442 # As of this date the CCS controller has a unified counter
443 # with the OCS, so there is no need to adjust the dayobs
444 # to make unique exposure IDs.
445 controller = None
446 elif dayobs > 20250417 and controller in {"P", "S", "Q"}:
447 # At some point in the past the PSQ and OC controller sequence
448 # counters were unified. To avoid confusion with previous files
449 # that may already be ingested where we do not want to change
450 # the exposure ID, only assume identical sequences from this date.
451 controller = None
453 # Camera control changes the exposure ID
454 if controller is not None:
455 index = CONTROLLERS.find(controller)
456 if index == -1:
457 raise ValueError(f"Supplied controller, '{controller}' is not "
458 f"in supported list: {CONTROLLERS}")
460 # Increment a thousand years per controller
461 dayobs += _CONTROLLER_INCREMENT * index
463 # Form the number as a string zero padding the sequence number
464 idstr = f"{dayobs}{seqnum:0{_SEQNUM_MAXDIGITS}d}"
466 # Exposure ID has to be an integer
467 return int(idstr)
    @staticmethod
    def unpack_exposure_id(exposure_id):
        """Unpack an exposure ID into dayobs, seqnum, and controller.

        Parameters
        ----------
        exposure_id : `int`
            Integer exposure ID produced by `compute_exposure_id`.

        Returns
        -------
        dayobs : `str`
            Day of observation as a YYYYMMDD string.
        seqnum : `int`
            Sequence number.
        controller : `str`
            Controller code. Will be ``O`` (but should be ignored) for IDs
            produced by calling `compute_exposure_id` with ``controller=None``.
        """
        # The low _SEQNUM_MAXDIGITS decimal digits hold the sequence number.
        dayobs, seqnum = divmod(exposure_id, 10**_SEQNUM_MAXDIGITS)
        # A plain YYYYMMDD day in the 2000s has 2 in the 10^7 digit;
        # compute_exposure_id added _CONTROLLER_INCREMENT per controller
        # position, so the excess over 2 recovers the controller index.
        controller_index = dayobs // _CONTROLLER_INCREMENT - 2
        dayobs -= controller_index * _CONTROLLER_INCREMENT
        return (str(dayobs), seqnum, CONTROLLERS[controller_index], )
493 def _is_on_mountain(self):
494 """Indicate whether these data are coming from the instrument
495 installed on the mountain.
497 Returns
498 -------
499 is : `bool`
500 `True` if instrument is on the mountain.
501 """
502 if "TSTAND" in self._header:
503 return False
504 return True
506 def is_on_sky(self):
507 """Determine if this is an on-sky observation.
509 Returns
510 -------
511 is_on_sky : `bool`
512 Returns True if this is a observation on sky on the
513 summit.
514 """
515 # For LSST we think on sky unless tracksys is local
516 if self.is_key_ok("TRACKSYS"):
517 if self._header["TRACKSYS"].lower() == "local":
518 # not on sky
519 return False
521 # These are obviously not on sky
522 if self.to_observation_type() in self._non_sky_observation_types:
523 return False
525 return self._is_on_mountain()
    @cache_translation
    def to_location(self):
        # Docstring will be inherited. Property defined in properties.py
        # Lab/test-stand data have no meaningful telescope location.
        if not self._is_on_mountain():
            return None
        try:
            # Try standard FITS headers
            return super().to_location()
        except (KeyError, TypeError):
            # Fall back to the nominal Simonyi telescope site.
            return self._DEFAULT_LOCATION
    @cache_translation
    def to_datetime_begin(self):
        # Docstring will be inherited. Property defined in properties.py
        # Prefer -BEG over -OBS. Let it fail with KeyError if no headers
        # can be found.
        date_key = "MJD-BEG"
        date_fmt = "mjd"
        for k in ("MJD-BEG", "DATE-BEG", "MJD-OBS", "DATE-OBS"):
            if self.is_key_ok(k):
                date_key = k
                # MJD-* keys hold MJD floats; DATE-* keys hold FITS strings.
                date_fmt = "mjd" if k.startswith("MJD") else "fits"
                break

        self._used_these_cards(date_key)
        # Times are recorded in the TAI scale.
        return Time(self._header[date_key], scale="tai", format=date_fmt)
554 @cache_translation
555 def to_datetime_end(self):
556 # Docstring will be inherited. Property defined in properties.py
557 if self.is_key_ok("DATE-END"):
558 return super().to_datetime_end()
560 exposure_time = self.to_exposure_time()
561 if exposure_time.value < 0.0:
562 # Some translators deliberately return -1.0s if the exposure
563 # time can not be determined. In that scenario set end time
564 # to the same value as the start time.
565 return self.to_datetime_begin()
567 return self.to_datetime_begin() + exposure_time
569 @cache_translation
570 def to_detector_num(self):
571 # Docstring will be inherited. Property defined in properties.py
572 raft = self.to_detector_group()
573 detector = self.to_detector_name()
574 return self.compute_detector_num_from_name(raft, detector)
576 @cache_translation
577 def to_detector_exposure_id(self):
578 # Docstring will be inherited. Property defined in properties.py
579 exposure_id = self.to_exposure_id()
580 num = self.to_detector_num()
581 return self.compute_detector_exposure_id(exposure_id, num)
583 @cache_translation
584 def to_observation_type(self):
585 # Docstring will be inherited. Property defined in properties.py
586 obstype = self._header["IMGTYPE"]
587 self._used_these_cards("IMGTYPE")
588 obstype = obstype.lower()
589 if obstype in ("skyexp", "object"):
590 obstype = "science"
591 return obstype
593 @cache_translation
594 def to_observation_reason(self):
595 # Docstring will be inherited. Property defined in properties.py
596 for key in ("REASON", "TESTTYPE"):
597 if self.is_key_ok(key):
598 reason = self._header[key]
599 self._used_these_cards(key)
600 return reason.lower()
601 # no specific header present so use the default translation
602 return super().to_observation_reason()
604 @cache_translation
605 def to_dark_time(self):
606 """Calculate the dark time.
608 If a DARKTIME header is not found, the value is assumed to be
609 identical to the exposure time.
611 Returns
612 -------
613 dark : `astropy.units.Quantity`
614 The dark time in seconds.
615 """
616 if self.is_key_ok("DARKTIME"):
617 darktime = self._header["DARKTIME"]*u.s
618 self._used_these_cards("DARKTIME")
619 else:
620 log.warning("%s: Unable to determine dark time. Setting from exposure time.",
621 self._log_prefix)
622 darktime = self.to_exposure_time()
623 return darktime
625 def _get_controller_code(self) -> str | None:
626 """Return the controller code.
628 Returns
629 -------
630 code : `str`
631 Single character code representing the controller. Returns
632 `None` if no controller can be determined.
633 """
634 key = "CONTRLLR"
635 if self.is_key_ok(key):
636 controller = self._header[key]
637 self._used_these_cards(key)
638 else:
639 controller = None
640 return controller
642 @cache_translation
643 def to_exposure_id(self):
644 """Generate a unique exposure ID number
646 This is a combination of DAYOBS and SEQNUM, and optionally
647 CONTRLLR.
649 Returns
650 -------
651 exposure_id : `int`
652 Unique exposure number.
653 """
654 if "CALIB_ID" in self._header:
655 self._used_these_cards("CALIB_ID")
656 return None
658 dayobs = self._header["DAYOBS"]
659 seqnum = self._header["SEQNUM"]
660 self._used_these_cards("DAYOBS", "SEQNUM")
662 controller = self._get_controller_code()
664 return self.compute_exposure_id(dayobs, seqnum, controller=controller)
    @cache_translation
    def to_visit_id(self):
        """Calculate the visit associated with this exposure.

        Notes
        -----
        For LATISS and LSSTCam the default visit is derived from the
        exposure group. For other instruments we return the exposure_id.
        """
        exposure_group = self.to_exposure_group()

        # If the group is an int we return it
        try:
            visit_id = int(exposure_group)
            return visit_id
        except ValueError:
            pass

        # A Group is defined as ISO date with an extension
        # The integer must be the same for a given group so we can never
        # use datetime_begin.
        # Nominally a GROUPID looks like "ISODATE+N" where the +N is
        # optional. This can be converted to seconds since epoch with
        # an adjustment for N.
        # For early data lacking that form we hash the group and return
        # the int.
        matches_date = GROUP_RE.match(exposure_group)
        if matches_date:
            iso_str = matches_date.group(1)
            fraction = matches_date.group(2)
            n = matches_date.group(3)
            if n is not None:
                n = int(n)
            else:
                n = 0
            iso = datetime.datetime.strptime(iso_str, "%Y-%m-%dT%H:%M:%S")

            # Seconds since the fixed TZERO reference epoch.
            tdelta = iso - TZERO_DATETIME
            epoch = int(tdelta.total_seconds())

            # Form the integer from EPOCH + 3 DIGIT FRAC + 0-pad N
            visit_id = int(f"{epoch}{fraction}{n:04d}")
        else:
            # Non-standard string so convert to numbers
            # using a hash function. Use the first N hex digits
            group_bytes = exposure_group.encode("us-ascii")
            hasher = hashlib.blake2b(group_bytes)
            # Need to be big enough it does not possibly clash with the
            # date-based version above
            digest = hasher.hexdigest()[:14]
            visit_id = int(digest, base=16)

            # To help with hash collision, append the string length
            visit_id = int(f"{visit_id}{len(exposure_group):02d}")

        return visit_id
723 @cache_translation
724 def to_physical_filter(self):
725 """Calculate the physical filter name.
727 Returns
728 -------
729 filter : `str`
730 Name of filter. Can be a combination of FILTER, FILTER1 and FILTER2
731 headers joined by a "~". Returns "unknown" if no filter is declared
732 """
733 joined = self._join_keyword_values(["FILTER", "FILTER1", "FILTER2"], delim=FILTER_DELIMITER)
734 if not joined:
735 joined = "unknown"
737 # Replace instances of "NONE" with "none".
738 joined = joined.replace("NONE", "none")
740 return joined
    @cache_translation
    def to_tracking_radec(self):
        # Do not even attempt to attach an RA/Dec for observations that we
        # know are not going to be tracking. The Rubin OCS can sometimes
        # report the telescope is tracking when it's not when doing
        # calibrations like these. Darks are sometimes taken whilst tracking
        # to test stability so those are special-cased.
        non_sky_obstypes = {t for t in self._non_sky_observation_types if t != "dark"}
        if self.to_observation_type() in non_sky_obstypes:
            return None

        # Not an observation that is tracking in RA/Dec so it is not
        # appropriate to report a value for this.
        if self.are_keys_ok(["TRACKSYS"]) and self._header["TRACKSYS"] != "RADEC":
            return None

        # RA/DEC are *derived* headers and for the case where the DATE-BEG
        # is 1970 they are garbage and should not be used.
        try:
            if self._header.get("DATE-OBS") == self._header["DATE"]:
                # A fixed up date -- use AZEL as source of truth
                altaz = self.to_altaz_begin()
                radec = astropy.coordinates.SkyCoord(altaz.transform_to(astropy.coordinates.ICRS()),
                                                     obstime=altaz.obstime,
                                                     location=altaz.location)
            else:
                radecsys = ("RADESYS",)
                radecpairs = (("RASTART", "DECSTART"), ("RA", "DEC"))
                radec = tracking_from_degree_headers(self, radecsys, radecpairs)
        except Exception:
            # If this observation was not formally on sky then we are allowed
            # to return None.
            if self.is_on_sky():
                raise
            radec = None

        return radec
    @cache_translation
    def to_altaz_begin(self):
        # Docstring will be inherited. Property defined in properties.py
        # Alt/az of the telescope at the start of the observation.
        return self._to_altaz("AZSTART", "ELSTART")
    @cache_translation
    def to_altaz_end(self):
        # Docstring will be inherited. Property defined in properties.py
        # Alt/az of the telescope at the end of the observation.
        return self._to_altaz("AZEND", "ELEND")
    def _to_altaz(self, az_key, el_key):
        """Calculate the alt/az coordinate from the named azimuth and
        elevation headers.

        Parameters
        ----------
        az_key : `str`
            Header keyword holding the azimuth in degrees.
        el_key : `str`
            Header keyword holding the elevation in degrees.

        Returns
        -------
        altaz : `astropy.coordinates.AltAz` or `None`
            The coordinate, or `None` for lab data or for H-controller
            data lacking the headers.
        """
        if not self._is_on_mountain():
            return None

        # H controller data are sometimes science observations without
        # having AZx header. The code lets those return nothing.
        if self._get_controller_code() == "H" and not self.are_keys_ok([el_key, az_key]):
            return None

        # Always attempt to find the alt/az values regardless of observation
        # type.
        altaz = altaz_from_degree_headers(self, ((el_key, az_key),),
                                          self.to_datetime_begin(), is_zd=False, max_alt=95.55, min_alt=-5.55)
        self._used_these_cards(el_key, az_key)
        return altaz
804 @cache_translation
805 def to_exposure_group(self):
806 """Calculate the exposure group string.
808 For LSSTCam and LATISS this is read from the ``GROUPID`` header.
809 If that header is missing the exposure_id is returned instead as
810 a string.
811 """
812 if self.is_key_ok("GROUPID"):
813 exposure_group = self._header["GROUPID"]
814 self._used_these_cards("GROUPID")
815 # Sometimes people forget to quote date strings in YAML
816 # correction files. This is a problem because we are assuming
817 # strings for matching across multiple exposures and if the
818 # value in the YAML file is not milliseconds then there is
819 # a potential disaster.
820 if isinstance(exposure_group, datetime.datetime):
821 exposure_group = exposure_group.isoformat(timespec="milliseconds")
822 return exposure_group
823 return super().to_exposure_group()
825 @cache_translation
826 def to_focus_z(self):
827 """Return the defocal distance of the camera in units of mm.
828 If there is no ``FOCUSZ`` value in the header it will return
829 the default 0.0mm value.
831 Returns
832 -------
833 focus_z: `astropy.units.Quantity`
834 The defocal distance from header in mm or the 0.0mm default
835 """
836 if self.is_key_ok("FOCUSZ"):
837 # Some broken files have strings instead of floats.
838 focus_z = float(self._header["FOCUSZ"])
839 return focus_z * u.mm
840 return super().to_focus_z()
842 @staticmethod
843 def _is_filter_empty(filter):
844 """Return true if the supplied filter indicates an empty filter slot
846 Parameters
847 ----------
848 filter : `str`
849 The filter string to check.
851 Returns
852 -------
853 is_empty : `bool`
854 `True` if the filter string looks like it is referring to an
855 empty filter slot. For example this can be if the filter is
856 "empty" or "empty_2".
857 """
858 return bool(re.match(r"empty_?\d*$", filter.lower()))
860 def _determine_primary_filter(self):
861 """Determine the primary filter from the ``FILTER`` header.
863 Returns
864 -------
865 filter : `str`
866 The contents of the ``FILTER`` header with some appropriate
867 defaulting.
868 """
870 if self.is_key_ok("FILTER"):
871 physical_filter = self._header["FILTER"]
872 self._used_these_cards("FILTER")
874 if self._is_filter_empty(physical_filter):
875 physical_filter = "empty"
876 else:
877 # Be explicit about having no knowledge of the filter
878 # by setting it to "unknown". It should always have a value.
879 physical_filter = "unknown"
881 # Warn if the filter being unknown is important
882 obstype = self.to_observation_type()
883 if obstype not in ("bias", "dark"):
884 log.warning("%s: Unable to determine the filter",
885 self._log_prefix)
887 return physical_filter
889 @cache_translation
890 def to_observing_day(self):
891 """Return the day of observation as YYYYMMDD integer.
893 For LSSTCam and other compliant instruments this is the value
894 of the DAYOBS header.
896 Returns
897 -------
898 obs_day : `int`
899 The day of observation.
900 """
901 if self.is_key_ok("DAYOBS"):
902 self._used_these_cards("DAYOBS")
903 return int(self._header["DAYOBS"])
905 return super().to_observing_day()
907 @cache_translation
908 def to_observation_counter(self):
909 """Return the sequence number within the observing day.
911 Returns
912 -------
913 counter : `int`
914 The sequence number for this day.
915 """
916 if self.is_key_ok("SEQNUM"):
917 # Some older LATISS data may not have the header
918 # but this is corrected in fix_header for LATISS.
919 self._used_these_cards("SEQNUM")
920 return int(self._header["SEQNUM"])
922 # This indicates a problem so we warn and return a 0
923 log.warning("%s: Unable to determine the observation counter so returning 0",
924 self._log_prefix)
925 return 0
927 @cache_translation
928 def to_boresight_rotation_coord(self):
929 """Boresight rotation angle.
931 Only relevant for science observations.
932 """
933 unknown = "unknown"
934 if not self.is_on_sky():
935 return unknown
937 self._used_these_cards("ROTCOORD")
938 coord = self._header.get("ROTCOORD", unknown)
939 if coord is None:
940 coord = unknown
941 return coord
943 @cache_translation
944 def to_boresight_airmass(self):
945 """Calculate airmass at boresight at start of observation.
947 Notes
948 -----
949 Early data are missing AMSTART header so we fall back to calculating
950 it from ELSTART.
951 """
952 if not self.is_on_sky():
953 return None
955 # This observation should have AMSTART
956 amkey = "AMSTART"
957 if self.is_key_ok(amkey):
958 self._used_these_cards(amkey)
959 return self._header[amkey]
961 # Instead we need to look at azel
962 altaz = self.to_altaz_begin()
963 if altaz is not None:
964 return altaz.secz.to_value()
966 log.warning("%s: Unable to determine airmass of a science observation, returning 1.",
967 self._log_prefix)
968 return 1.0
970 @cache_translation
971 def to_group_counter_start(self):
972 # Effectively the start of the visit as determined by the headers.
973 counter = self.to_observation_counter()
974 # Older data does not have the CURINDEX header.
975 if self.is_key_ok("CURINDEX"):
976 # CURINDEX is 1-based.
977 seq_start = counter - self._header["CURINDEX"] + 1
978 self._used_these_cards("CURINDEX")
979 return seq_start
980 else:
981 # If the counter is 0 we need to pick something else
982 # that is not going to confuse the visit calculation
983 # (since setting everything to 0 will make one big visit).
984 return counter if counter != 0 else self.to_exposure_id()
986 @cache_translation
987 def to_group_counter_end(self):
988 # Effectively the end of the visit as determined by the headers.
989 counter = self.to_observation_counter()
990 # Older data does not have the CURINDEX or MAXINDEX headers.
991 if self.is_key_ok("CURINDEX") and self.is_key_ok("MAXINDEX"):
992 # CURINDEX is 1-based. CURINDEX == MAXINDEX indicates the
993 # final exposure in the sequence.
994 remaining = self._header["MAXINDEX"] - self._header["CURINDEX"]
995 seq_end = counter + remaining
996 self._used_these_cards("CURINDEX", "MAXINDEX")
997 return seq_end
998 else:
999 # If the counter is 0 we need to pick something else
1000 # that is not going to confuse the visit calculation
1001 # (since setting everything to 0 will make one big visit).
1002 return counter if counter != 0 else self.to_exposure_id()
1004 @cache_translation
1005 def to_has_simulated_content(self):
1006 # Check all the simulation flags.
1007 # We do not know all the simulation flags that we may have so
1008 # must check every header key. Ideally HIERARCH SIMULATE would
1009 # be a hierarchical header so _header["SIMULATE"] would return
1010 # everything. The header looks like:
1011 #
1012 # HIERARCH SIMULATE ATMCS = / ATMCS Simulation Mode
1013 # HIERARCH SIMULATE ATHEXAPOD = 0 / ATHexapod Simulation Mode
1014 # HIERARCH SIMULATE ATPNEUMATICS = / ATPneumatics Simulation Mode
1015 # HIERARCH SIMULATE ATDOME = 1 / ATDome Simulation Mode
1016 # HIERARCH SIMULATE ATSPECTROGRAPH = 0 / ATSpectrograph Simulation Mode
1017 #
1018 # So any header that includes "SIMULATE" in the key name and has a
1019 # true value implies that something in the data is simulated.
1020 for k, v in self._header.items():
1021 if "SIMULATE" in k and v:
1022 self._used_these_cards(k)
1023 return True
1025 # If the controller is H, P, S, or Q then the data are simulated.
1026 controller = self._get_controller_code()
1027 if controller:
1028 if controller in SIMULATED_CONTROLLERS:
1029 return True
1031 # No simulation flags set.
1032 return False
1034 @cache_translation
1035 def to_relative_humidity(self) -> float | None:
1036 key = "HUMIDITY"
1037 if self.is_key_ok(key):
1038 self._used_these_cards(key)
1039 return self._header[key]
1041 return None
1043 @cache_translation
1044 def to_pressure(self):
1045 key = "PRESSURE"
1046 if self.is_key_ok(key):
1047 value = self._header[key]
1048 self._used_these_cards(key)
1049 # There has been an inconsistency in units for the pressure reading
1050 # so we need to adjust for this.
1051 if value > 10_000:
1052 unit = u.Pa
1053 else:
1054 unit = u.hPa
1055 return value * unit
1057 return None
1059 @cache_translation
1060 def to_temperature(self):
1061 key = "AIRTEMP"
1062 if self.is_key_ok(key):
1063 self._used_these_cards(key)
1064 return self._header[key] * u.deg_C
1065 return None
1067 @cache_translation
1068 def to_can_see_sky(self) -> bool | None:
1069 key = "SHUTTIME"
1070 if self.is_key_ok(key) and self._header[key] == 0.0:
1071 # Shutter never opened so impossible to see sky.
1072 self._used_these_cards(key)
1073 return False
1075 key = "VIGN_MIN"
1076 if self.is_key_ok(key):
1077 self._used_these_cards(key)
1078 vignetted = self._header[key]
1079 match vignetted:
1080 case "FULLY":
1081 return False
1082 case "UNKNOWN":
1083 return None
1084 case _:
1085 return True
1087 # Fallback to using the observation type if the key is missing.
1088 # PhoSim always falls back.
1089 if self._can_check_obstype_for_can_see_sky or self._get_controller_code() == "H":
1090 return super().to_can_see_sky()
1092 # Unknown state.
1093 return None