Coverage for python / lsst / obs / lsst / translators / lsst.py: 23%
453 statements
« prev ^ index » next coverage.py v7.13.5, created at 2026-04-22 08:58 +0000
1# This file is currently part of obs_lsst but is written to allow it
2# to be migrated to the astro_metadata_translator package at a later date.
3#
4# This product includes software developed by the LSST Project
5# (http://www.lsst.org).
6# See the LICENSE file in this directory for details of code ownership.
7#
8# Use of this source code is governed by a 3-clause BSD-style
9# license that can be found in the LICENSE file.
"""Metadata translation support code for LSST headers"""

# Public API of this module.
__all__ = ("TZERO", "SIMONYI_LOCATION", "read_detector_ids",
           "compute_detector_exposure_id_generic", "LsstBaseTranslator",
           "SIMONYI_TELESCOPE")
17import os.path
18import yaml
19import logging
20import re
21import datetime
22import hashlib
24import astropy.coordinates
25import astropy.units as u
26from astropy.time import Time, TimeDelta
27from astropy.coordinates import EarthLocation
29from lsst.utils import getPackageDir
31from astro_metadata_translator import cache_translation, FitsTranslator
32from astro_metadata_translator.translators.helpers import tracking_from_degree_headers, \
33 altaz_from_degree_headers
# Reference epoch used when deriving visit IDs from exposure group
# timestamps (see ``LsstBaseTranslator.to_visit_id``).
TZERO = Time("2015-01-01T00:00", format="isot", scale="utc")
TZERO_DATETIME = TZERO.to_datetime()

# Delimiter to use for multiple filters/gratings
FILTER_DELIMITER = "~"

# Regex to use for parsing a GROUPID string.
# Groups: (1) ISO date/time, (2) millisecond fraction, (3) optional counter
# following "+" or "#".
GROUP_RE = re.compile(r"^(\d\d\d\d\-\d\d\-\d\dT\d\d:\d\d:\d\d)\.(\d\d\d)(?:[\+#](\d+))?$")

# LSST Default location in the absence of headers
SIMONYI_LOCATION = EarthLocation.from_geodetic(-70.749417, -30.244639, 2663.0)

# Name of the main survey telescope
SIMONYI_TELESCOPE = "Simonyi Survey Telescope"

# Supported controller codes.
# The order here directly relates to the resulting exposure ID
# calculation. Do not reorder. Add new ones to the end.
# OCS, CCS, pHosim, P for simulated OCS, Q for simulated CCS, S for
# simulated images.
SIMULATED_CONTROLLERS = "HPQS"
CONTROLLERS = "OC" + SIMULATED_CONTROLLERS

# Number of decimal digits allocated to the sequence number in exposure_ids.
_SEQNUM_MAXDIGITS = 5

# Number of decimal digits allocated to the day of observation (and controller
# code) in exposure_ids.
_DAYOBS_MAXDIGITS = 8

# Value added to day_obs for controllers after the default.
_CONTROLLER_INCREMENT = 1000_00_00

# Number of decimal digits used by exposure_ids.
EXPOSURE_ID_MAXDIGITS = _SEQNUM_MAXDIGITS + _DAYOBS_MAXDIGITS

# Root of the obs_lsst package, used to locate policy and correction files.
obs_lsst_packageDir = getPackageDir("obs_lsst")

log = logging.getLogger(__name__)
def read_detector_ids(policyFile):
    """Read a camera policy file and retrieve the mapping from CCD name
    to ID.

    Parameters
    ----------
    policyFile : `str`
        Name of YAML policy file to read, relative to the obs_lsst
        package.

    Returns
    -------
    mapping : `dict` of `str` to (`int`, `str`)
        A `dict` with keys being the full names of the detectors, and the
        value is a `tuple` containing the integer detector number and the
        detector serial number.

    Notes
    -----
    Reads the camera YAML definition file directly and extracts just the
    IDs and serials.  This routine does not use the standard
    `~lsst.obs.base.yamlCamera.YAMLCamera` infrastructure or
    `lsst.afw.cameraGeom`.  This is because the translators are intended to
    have minimal dependencies on LSST infrastructure.
    """
    file = os.path.join(obs_lsst_packageDir, policyFile)
    try:
        with open(file) as stream:
            # Use the fast C parser since these files are large.
            camera = yaml.load(stream, Loader=yaml.CSafeLoader)
    except OSError as e:
        raise ValueError(f"Could not load camera policy file {file}") from e

    # Keep only the detector number and serial for each named CCD.
    return {name: (int(props["id"]), props["serial"])
            for name, props in camera["CCDs"].items()}
def compute_detector_exposure_id_generic(exposure_id, detector_num, max_num):
    """Compute the detector_exposure_id from the exposure id and the
    detector number.

    Parameters
    ----------
    exposure_id : `int`
        The exposure ID.
    detector_num : `int`
        The detector number.
    max_num : `int`
        Maximum number of detectors to make space for.

    Returns
    -------
    detector_exposure_id : `int`
        Computed ID: the exposure ID scaled by ``max_num`` plus the
        detector number.

    Raises
    ------
    ValueError
        The detector number is out of range.
    """
    if detector_num is None:
        raise ValueError("Detector number must be defined.")
    if not 0 <= detector_num < max_num:
        raise ValueError(f"Detector number out of range 0 <= {detector_num} < {max_num}")

    return max_num * exposure_id + detector_num
class LsstBaseTranslator(FitsTranslator):
    """Translation methods useful for all LSST-style headers."""

    # Constant and trivial header mappings; populated by subclasses.
    _const_map = {}
    _trivial_map = {}

    # Do not specify a name for this translator
    cameraPolicyFile = None
    """Path to policy file relative to obs_lsst root."""

    detectorMapping = None
    """Mapping of detector name to detector number and serial."""

    detectorSerials = None
    """Mapping of detector serial number to raft, number, and name."""

    DETECTOR_MAX = 1000
    """Maximum number of detectors to use when calculating the
    detector_exposure_id.

    Note that because this is the maximum number *of* detectors, for
    zero-based ``detector_num`` values this is one greater than the maximum
    ``detector_num``. It is also often rounded up to the nearest power of
    10 anyway, to allow ``detector_exposure_id`` values to be easily decoded by
    humans.
    """

    _DEFAULT_LOCATION = SIMONYI_LOCATION
    """Default telescope location in absence of relevant FITS headers."""

    _ROLLOVER_TIME = TimeDelta(12*60*60, scale="tai", format="sec")
    """Time delta for the definition of a Rubin Observatory start of day.
    Used when the header is missing. See LSE-400 or SITCOMTN-032 for details.
    """

    _non_sky_observation_types: tuple[str, ...] = ("bias", "dark", "flat")
    """Observation types that correspond to an observation where the detector
    can not see sky photons.
    """

    _can_check_obstype_for_can_see_sky: bool = True
    """If can_see_sky can not be determined, allow usage of observation type
    if `True`.
    """
    @classmethod
    def __init_subclass__(cls, **kwargs):
        """Ensure that subclasses clear their own detector mapping entries
        such that subclasses of translators that use detector mappings
        do not pick up the incorrect values from a parent."""
        # Reset the lazily-populated class-level caches for each subclass.
        cls.detectorMapping = None
        cls.detectorSerials = None

        super().__init_subclass__(**kwargs)
206 def search_paths(self):
207 """Search paths to use for LSST data when looking for header correction
208 files.
210 Returns
211 -------
212 path : `list`
213 List with a single element containing the full path to the
214 ``corrections`` directory within the ``obs_lsst`` package.
215 """
216 return [os.path.join(obs_lsst_packageDir, "corrections")]
    @classmethod
    def observing_date_to_offset(cls, observing_date: astropy.time.Time) -> astropy.time.TimeDelta | None:
        """Return the offset to use when calculating the observing day.

        Parameters
        ----------
        observing_date : `astropy.time.Time`
            The date of the observation. Unused.

        Returns
        -------
        offset : `astropy.time.TimeDelta`
            The offset to apply. The default implementation returns a fixed
            number but subclasses can return a different value depending
            on whether the instrument is in the instrument lab or on the
            mountain.
        """
        # Fixed 12-hour rollover for the base class; see _ROLLOVER_TIME.
        return cls._ROLLOVER_TIME
    @classmethod
    def compute_detector_exposure_id(cls, exposure_id, detector_num):
        """Compute the detector exposure ID from detector number and
        exposure ID.

        This is a helper method to allow code working outside the translator
        infrastructure to use the same algorithm.

        Parameters
        ----------
        exposure_id : `int`
            Unique exposure ID.
        detector_num : `int`
            Detector number.

        Returns
        -------
        detector_exposure_id : `int`
            The calculated ID.
        """
        # Local import to avoid a circular dependency at module import time.
        from .._packer import RubinDimensionPacker

        config = RubinDimensionPacker.ConfigClass()
        config.use_controllers()
        return RubinDimensionPacker.pack_id_pair(exposure_id, detector_num, config=config)
    @classmethod
    def max_detector_exposure_id(cls):
        """The maximum detector exposure ID expected to be generated by
        this instrument.

        Returns
        -------
        max_id : `int`
            The maximum value.
        """
        max_exposure_id = cls.max_exposure_id()
        # We subtract 1 from DETECTOR_MAX because LSST detector_num values are
        # zero-based, and detector_max is the maximum number *of* detectors,
        # while this returns the (inclusive) maximum ID value.
        return cls.compute_detector_exposure_id(max_exposure_id, cls.DETECTOR_MAX - 1)
    @classmethod
    def max_exposure_id(cls):
        """The maximum exposure ID expected from this instrument.

        Returns
        -------
        max_exposure_id : `int`
            The maximum value.

        Notes
        -----
        The value is hard-coded to reflect historical values that were used
        for various controllers before the sequence counter was unified.
        """
        # Assumes maximum observing date of 2050-12-31, 99,999 exposures per
        # day and 6 controllers.
        return 7050123199999
297 @classmethod
298 def detector_mapping(cls):
299 """Returns the mapping of full name to detector ID and serial.
301 Returns
302 -------
303 mapping : `dict` of `str`:`tuple`
304 Returns the mapping of full detector name (group+detector)
305 to detector number and serial.
307 Raises
308 ------
309 ValueError
310 Raised if no camera policy file has been registered with this
311 translation class.
313 Notes
314 -----
315 Will construct the mapping if none has previously been constructed.
316 """
317 if cls.cameraPolicyFile is not None:
318 if cls.detectorMapping is None:
319 cls.detectorMapping = read_detector_ids(cls.cameraPolicyFile)
320 else:
321 raise ValueError(f"Translation class '{cls.__name__}' has no registered camera policy file")
323 return cls.detectorMapping
325 @classmethod
326 def detector_serials(cls):
327 """Obtain the mapping of detector serial to detector group, name,
328 and number.
330 Returns
331 -------
332 info : `dict` of `tuple` of (`str`, `str`, `int`)
333 A `dict` with the serial numbers as keys and values of detector
334 group, name, and number.
335 """
336 if cls.detectorSerials is None:
337 detector_mapping = cls.detector_mapping()
339 if detector_mapping is not None:
340 # Form mapping to go from serial number to names/numbers
341 serials = {}
342 for fullname, (id, serial) in cls.detectorMapping.items():
343 raft, detector_name = fullname.split("_")
344 if serial in serials:
345 raise RuntimeError(f"Serial {serial} is defined in multiple places")
346 serials[serial] = (raft, detector_name, id)
347 cls.detectorSerials = serials
348 else:
349 raise RuntimeError("Unable to obtain detector mapping information")
351 return cls.detectorSerials
353 @classmethod
354 def compute_detector_num_from_name(cls, detector_group, detector_name):
355 """Helper method to return the detector number from the name.
357 Parameters
358 ----------
359 detector_group : `str`
360 Name of the detector grouping. This is generally the raft name.
361 detector_name : `str`
362 Detector name.
364 Returns
365 -------
366 num : `int`
367 Detector number.
368 """
369 fullname = f"{detector_group}_{detector_name}"
371 num = None
372 detector_mapping = cls.detector_mapping()
373 if detector_mapping is None:
374 raise RuntimeError("Unable to obtain detector mapping information")
376 if fullname in detector_mapping:
377 num = detector_mapping[fullname]
378 else:
379 log.warning(f"Unable to determine detector number from detector name {fullname}")
380 return None
382 return num[0]
384 @classmethod
385 def compute_detector_info_from_serial(cls, detector_serial):
386 """Helper method to return the detector information from the serial.
388 Parameters
389 ----------
390 detector_serial : `str`
391 Detector serial ID.
393 Returns
394 -------
395 info : `tuple` of (`str`, `str`, `int`)
396 Detector group, name, and number.
397 """
398 serial_mapping = cls.detector_serials()
399 if serial_mapping is None:
400 raise RuntimeError("Unable to obtain serial mapping information")
402 if detector_serial in serial_mapping:
403 info = serial_mapping[detector_serial]
404 else:
405 raise RuntimeError("Unable to determine detector information from detector serial"
406 f" {detector_serial}")
408 return info
    @staticmethod
    def compute_exposure_id(dayobs, seqnum, controller=None):
        """Helper method to calculate the exposure_id.

        Parameters
        ----------
        dayobs : `str` or `int`
            Day of observation in either YYYYMMDD or YYYY-MM-DD format.
            If the string looks like ISO format it will be truncated before
            the ``T`` before being handled.
        seqnum : `int` or `str`
            Sequence number.
        controller : `str`, optional
            Controller to use. If this is "O", no change is made to the
            exposure ID. Before Oct 5 2023, if it is "C" a 1000 is added to
            the year component of the exposure ID. If it is "H" a 2000 is
            added to the year component. Before Apr 18 2025, this sequence
            continues with "P", "Q", and "S" controllers. `None` indicates
            that the controller is not relevant to the exposure ID
            calculation (generally this is the case for test stand data).

        Returns
        -------
        exposure_id : `int`
            Exposure ID in form YYYYMMDDnnnnn form.

        Raises
        ------
        ValueError
            Raised if the day string is malformed, the sequence number is
            too large, or the controller code is unsupported.
        """
        if isinstance(seqnum, str):
            seqnum = int(seqnum)
        # We really want an integer but the checks require a str.
        if isinstance(dayobs, int):
            dayobs = str(dayobs)
        # Truncate an ISO-style datetime string to just its date part.
        if "T" in dayobs:
            dayobs = dayobs[:dayobs.find("T")]
        dayobs = dayobs.replace("-", "")
        if len(dayobs) != 8:
            raise ValueError(f"Malformed dayobs: {dayobs}")

        # Expect no more than 99,999 exposures in a day
        if seqnum >= 10**_SEQNUM_MAXDIGITS:
            raise ValueError(f"Sequence number ({seqnum}) exceeds limit")

        dayobs = int(dayobs)
        if dayobs > 20231004 and controller == "C":
            # As of this date the CCS controller has a unified counter
            # with the OCS, so there is no need to adjust the dayobs
            # to make unique exposure IDs.
            controller = None
        elif dayobs > 20250417 and controller in {"P", "S", "Q"}:
            # At some point in the past the PSQ and OC controller sequence
            # counters were unified. To avoid confusion with previous files
            # that may already be ingested where we do not want to change
            # the exposure ID, only assume identical sequences from this date.
            controller = None

        # Camera control changes the exposure ID
        if controller is not None:
            index = CONTROLLERS.find(controller)
            if index == -1:
                raise ValueError(f"Supplied controller, '{controller}' is not "
                                 f"in supported list: {CONTROLLERS}")

            # Increment a thousand years per controller
            dayobs += _CONTROLLER_INCREMENT * index

        # Form the number as a string zero padding the sequence number
        idstr = f"{dayobs}{seqnum:0{_SEQNUM_MAXDIGITS}d}"

        # Exposure ID has to be an integer
        return int(idstr)
    @staticmethod
    def unpack_exposure_id(exposure_id):
        """Unpack an exposure ID into dayobs, seqnum, and controller.

        Parameters
        ----------
        exposure_id : `int`
            Integer exposure ID produced by `compute_exposure_id`.

        Returns
        -------
        dayobs : `str`
            Day of observation as a YYYYMMDD string.
        seqnum : `int`
            Sequence number.
        controller : `str`
            Controller code. Will be ``O`` (but should be ignored) for IDs
            produced by calling `compute_exposure_id` with ``controller=None``.
        """
        # Split off the zero-padded sequence number digits.
        dayobs, seqnum = divmod(exposure_id, 10**_SEQNUM_MAXDIGITS)
        # A plain 20YYMMDD value divided by _CONTROLLER_INCREMENT gives 2,
        # so subtracting 2 recovers the zero-based controller index added
        # by compute_exposure_id.
        controller_index = dayobs // _CONTROLLER_INCREMENT - 2
        dayobs -= controller_index * _CONTROLLER_INCREMENT
        return (str(dayobs), seqnum, CONTROLLERS[controller_index], )
507 def _is_on_mountain(self):
508 """Indicate whether these data are coming from the instrument
509 installed on the mountain.
511 Returns
512 -------
513 is : `bool`
514 `True` if instrument is on the mountain.
515 """
516 if "TSTAND" in self._header:
517 return False
518 return True
520 def is_on_sky(self):
521 """Determine if this is an on-sky observation.
523 Returns
524 -------
525 is_on_sky : `bool`
526 Returns True if this is a observation on sky on the
527 summit.
528 """
529 # For LSST we think on sky unless tracksys is local
530 if self.is_key_ok("TRACKSYS"):
531 if self._header["TRACKSYS"].lower() == "local":
532 # not on sky
533 return False
535 # These are obviously not on sky
536 if self.to_observation_type() in self._non_sky_observation_types:
537 return False
539 return self._is_on_mountain()
    @cache_translation
    def to_location(self):
        # Docstring will be inherited. Property defined in properties.py
        # Test-stand data have no meaningful telescope location.
        if not self._is_on_mountain():
            return None
        try:
            # Try standard FITS headers
            return super().to_location()
        except (KeyError, TypeError):
            # Fall back to the fixed Simonyi telescope location.
            return self._DEFAULT_LOCATION
552 @cache_translation
553 def to_datetime_begin(self):
554 # Docstring will be inherited. Property defined in properties.py
555 # Prefer -BEG over -OBS. Let it fail with KeyError if no headers
556 # can be found.
557 date_key = "MJD-BEG"
558 date_fmt = "mjd"
559 for k in ("MJD-BEG", "DATE-BEG", "MJD-OBS", "DATE-OBS"):
560 if self.is_key_ok(k):
561 date_key = k
562 date_fmt = "mjd" if k.startswith("MJD") else "fits"
563 break
565 self._used_these_cards(date_key)
566 return Time(self._header[date_key], scale="tai", format=date_fmt)
568 @cache_translation
569 def to_datetime_end(self):
570 # Docstring will be inherited. Property defined in properties.py
571 if self.is_key_ok("DATE-END"):
572 return super().to_datetime_end()
574 exposure_time = self.to_exposure_time()
575 if exposure_time.value < 0.0:
576 # Some translators deliberately return -1.0s if the exposure
577 # time can not be determined. In that scenario set end time
578 # to the same value as the start time.
579 return self.to_datetime_begin()
581 return self.to_datetime_begin() + exposure_time
    @cache_translation
    def to_detector_num(self):
        # Docstring will be inherited. Property defined in properties.py
        # Derive the number from the raft/detector names via the policy map.
        raft = self.to_detector_group()
        detector = self.to_detector_name()
        return self.compute_detector_num_from_name(raft, detector)
    @cache_translation
    def to_detector_exposure_id(self):
        # Docstring will be inherited. Property defined in properties.py
        # Combine exposure ID and detector number into a single unique ID.
        exposure_id = self.to_exposure_id()
        num = self.to_detector_num()
        return self.compute_detector_exposure_id(exposure_id, num)
597 @cache_translation
598 def to_observation_type(self):
599 # Docstring will be inherited. Property defined in properties.py
600 obstype = self._header["IMGTYPE"]
601 self._used_these_cards("IMGTYPE")
602 obstype = obstype.lower()
603 if obstype in ("skyexp", "object"):
604 obstype = "science"
605 return obstype
607 @cache_translation
608 def to_observation_reason(self):
609 # Docstring will be inherited. Property defined in properties.py
610 for key in ("REASON", "TESTTYPE"):
611 if self.is_key_ok(key):
612 reason = self._header[key]
613 self._used_these_cards(key)
614 return reason.lower()
615 # no specific header present so use the default translation
616 return super().to_observation_reason()
618 @cache_translation
619 def to_dark_time(self):
620 """Calculate the dark time.
622 If a DARKTIME header is not found, the value is assumed to be
623 identical to the exposure time.
625 Returns
626 -------
627 dark : `astropy.units.Quantity`
628 The dark time in seconds.
629 """
630 if self.is_key_ok("DARKTIME"):
631 darktime = self._header["DARKTIME"]*u.s
632 self._used_these_cards("DARKTIME")
633 else:
634 log.warning("%s: Unable to determine dark time. Setting from exposure time.",
635 self._log_prefix)
636 darktime = self.to_exposure_time()
637 return darktime
639 def _get_controller_code(self) -> str | None:
640 """Return the controller code.
642 Returns
643 -------
644 code : `str`
645 Single character code representing the controller. Returns
646 `None` if no controller can be determined.
647 """
648 key = "CONTRLLR"
649 if self.is_key_ok(key):
650 controller = self._header[key]
651 self._used_these_cards(key)
652 else:
653 controller = None
654 return controller
    @cache_translation
    def to_exposure_id(self):
        """Generate a unique exposure ID number

        This is a combination of DAYOBS and SEQNUM, and optionally
        CONTRLLR.

        Returns
        -------
        exposure_id : `int` or `None`
            Unique exposure number, or `None` for calibration files
            (identified by the presence of a ``CALIB_ID`` header).
        """
        if "CALIB_ID" in self._header:
            self._used_these_cards("CALIB_ID")
            return None

        dayobs = self._header["DAYOBS"]
        seqnum = self._header["SEQNUM"]
        self._used_these_cards("DAYOBS", "SEQNUM")

        controller = self._get_controller_code()
        return self.compute_exposure_id(dayobs, seqnum, controller=controller)
    @cache_translation
    def to_visit_id(self):
        """Calculate the visit associated with this exposure.

        Returns
        -------
        visit_id : `int`
            Integer derived deterministically from the exposure group so
            that all exposures in the same group share a visit.

        Notes
        -----
        For LATISS and LSSTCam the default visit is derived from the
        exposure group. For other instruments we return the exposure_id.
        """
        exposure_group = self.to_exposure_group()
        # If the group is an int we return it
        try:
            visit_id = int(exposure_group)
            return visit_id
        except ValueError:
            pass

        # A Group is defined as ISO date with an extension
        # The integer must be the same for a given group so we can never
        # use datetime_begin.
        # Nominally a GROUPID looks like "ISODATE+N" where the +N is
        # optional. This can be converted to seconds since epoch with
        # an adjustment for N.
        # For early data lacking that form we hash the group and return
        # the int.
        matches_date = GROUP_RE.match(exposure_group)
        if matches_date:
            iso_str = matches_date.group(1)
            fraction = matches_date.group(2)
            n = matches_date.group(3)
            if n is not None:
                n = int(n)
            else:
                n = 0
            iso = datetime.datetime.strptime(iso_str, "%Y-%m-%dT%H:%M:%S")

            tdelta = iso - TZERO_DATETIME
            epoch = int(tdelta.total_seconds())

            # Form the integer from EPOCH + 3 DIGIT FRAC + 0-pad N
            visit_id = int(f"{epoch}{fraction}{n:04d}")
        else:
            # Non-standard string so convert to numbers
            # using a hash function. Use the first N hex digits
            group_bytes = exposure_group.encode("us-ascii")
            hasher = hashlib.blake2b(group_bytes)
            # Need to be big enough it does not possibly clash with the
            # date-based version above
            digest = hasher.hexdigest()[:14]
            visit_id = int(digest, base=16)

            # To help with hash collision, append the string length
            visit_id = int(f"{visit_id}{len(exposure_group):02d}")

        return visit_id
737 @cache_translation
738 def to_physical_filter(self):
739 """Calculate the physical filter name.
741 Returns
742 -------
743 filter : `str`
744 Name of filter. Can be a combination of FILTER, FILTER1 and FILTER2
745 headers joined by a "~". Returns "unknown" if no filter is declared
746 """
747 joined = self._join_keyword_values(["FILTER", "FILTER1", "FILTER2"], delim=FILTER_DELIMITER)
748 if not joined:
749 joined = "unknown"
751 # Replace instances of "NONE" with "none".
752 joined = joined.replace("NONE", "none")
754 return joined
    @cache_translation
    def to_tracking_radec(self):
        # Docstring will be inherited. Property defined in properties.py
        # Do not even attempt to attach an RA/Dec for observations that we
        # know are not going to be tracking. The Rubin OCS can sometimes
        # report the telescope is tracking when it's not when doing
        # calibrations like these. Darks are sometimes taken whilst tracking
        # to test stability so those are special-cased.
        non_sky_obstypes = {t for t in self._non_sky_observation_types if t != "dark"}
        if self.to_observation_type() in non_sky_obstypes:
            return None

        # Not an observation that is tracking in RA/Dec so it is not
        # appropriate to report a value for this.
        if self.are_keys_ok(["TRACKSYS"]) and self._header["TRACKSYS"] != "RADEC":
            return None

        # RA/DEC are *derived* headers and for the case where the DATE-BEG
        # is 1970 they are garbage and should not be used.
        try:
            if self._header["DATE-OBS"] == self._header["DATE"]:
                # A fixed up date -- use AZEL as source of truth
                altaz = self.to_altaz_begin()
                radec = astropy.coordinates.SkyCoord(altaz.transform_to(astropy.coordinates.ICRS()),
                                                     obstime=altaz.obstime,
                                                     location=altaz.location)
            else:
                radecsys = ("RADESYS",)
                radecpairs = (("RASTART", "DECSTART"), ("RA", "DEC"))
                radec = tracking_from_degree_headers(self, radecsys, radecpairs)
        except Exception:
            # If this observation was not formally on sky then we are allowed
            # to return None.
            if self.is_on_sky():
                raise
            radec = None

        return radec
    @cache_translation
    def to_altaz_begin(self):
        # Docstring will be inherited. Property defined in properties.py
        # Start-of-exposure pointing from the AZSTART/ELSTART headers.
        return self._to_altaz("AZSTART", "ELSTART")
    @cache_translation
    def to_altaz_end(self):
        # Docstring will be inherited. Property defined in properties.py
        # End-of-exposure pointing from the AZEND/ELEND headers.
        return self._to_altaz("AZEND", "ELEND")
    def _to_altaz(self, az_key, el_key):
        """Construct an AltAz coordinate from the named az/el headers.

        Parameters
        ----------
        az_key : `str`
            Header keyword holding the azimuth in degrees.
        el_key : `str`
            Header keyword holding the elevation in degrees.

        Returns
        -------
        altaz : `astropy.coordinates.AltAz` or `None`
            The coordinate, or `None` for test-stand data or simulated
            ("H" controller) data lacking the headers.
        """
        if not self._is_on_mountain():
            return None

        # H controller data are sometimes science observations without
        # having AZx header. The code lets those return nothing.
        if self._get_controller_code() == "H" and not self.are_keys_ok([el_key, az_key]):
            return None

        # Always attempt to find the alt/az values regardless of observation
        # type.
        altaz = altaz_from_degree_headers(self, ((el_key, az_key),),
                                          self.to_datetime_begin(), is_zd=False, max_alt=95.55, min_alt=-5.55)
        self._used_these_cards(el_key, az_key)
        return altaz
818 @cache_translation
819 def to_exposure_group(self):
820 """Calculate the exposure group string.
822 For LSSTCam and LATISS this is read from the ``GROUPID`` header.
823 If that header is missing the exposure_id is returned instead as
824 a string.
825 """
826 if self.is_key_ok("GROUPID"):
827 exposure_group = self._header["GROUPID"]
828 self._used_these_cards("GROUPID")
829 # Sometimes people forget to quote date strings in YAML
830 # correction files. This is a problem because we are assuming
831 # strings for matching across multiple exposures and if the
832 # value in the YAML file is not milliseconds then there is
833 # a potential disaster.
834 if isinstance(exposure_group, datetime.datetime):
835 exposure_group = exposure_group.isoformat(timespec="milliseconds")
836 return exposure_group
837 return super().to_exposure_group()
839 @cache_translation
840 def to_focus_z(self):
841 """Return the defocal distance of the camera in units of mm.
842 If there is no ``FOCUSZ`` value in the header it will return
843 the default 0.0mm value.
845 Returns
846 -------
847 focus_z: `astropy.units.Quantity`
848 The defocal distance from header in mm or the 0.0mm default
849 """
850 if self.is_key_ok("FOCUSZ"):
851 # Some broken files have strings instead of floats.
852 focus_z = float(self._header["FOCUSZ"])
853 return focus_z * u.mm
854 return super().to_focus_z()
856 @staticmethod
857 def _is_filter_empty(filter):
858 """Return true if the supplied filter indicates an empty filter slot
860 Parameters
861 ----------
862 filter : `str`
863 The filter string to check.
865 Returns
866 -------
867 is_empty : `bool`
868 `True` if the filter string looks like it is referring to an
869 empty filter slot. For example this can be if the filter is
870 "empty" or "empty_2".
871 """
872 return bool(re.match(r"empty_?\d*$", filter.lower()))
874 def _determine_primary_filter(self):
875 """Determine the primary filter from the ``FILTER`` header.
877 Returns
878 -------
879 filter : `str`
880 The contents of the ``FILTER`` header with some appropriate
881 defaulting.
882 """
884 if self.is_key_ok("FILTER"):
885 physical_filter = self._header["FILTER"]
886 self._used_these_cards("FILTER")
888 if self._is_filter_empty(physical_filter):
889 physical_filter = "empty"
890 else:
891 # Be explicit about having no knowledge of the filter
892 # by setting it to "unknown". It should always have a value.
893 physical_filter = "unknown"
895 # Warn if the filter being unknown is important
896 obstype = self.to_observation_type()
897 if obstype not in ("bias", "dark"):
898 log.warning("%s: Unable to determine the filter",
899 self._log_prefix)
901 return physical_filter
    @cache_translation
    def to_observing_day(self):
        """Return the day of observation as YYYYMMDD integer.

        For LSSTCam and other compliant instruments this is the value
        of the DAYOBS header.

        Returns
        -------
        obs_day : `int`
            The day of observation.
        """
        if self.is_key_ok("DAYOBS"):
            self._used_these_cards("DAYOBS")
            return int(self._header["DAYOBS"])

        # Fall back to deriving the day from the observation time.
        return super().to_observing_day()
    @cache_translation
    def to_observation_counter(self):
        """Return the sequence number within the observing day.

        Returns
        -------
        counter : `int`
            The sequence number for this day, or 0 (with a warning) if the
            ``SEQNUM`` header is unusable.
        """
        if self.is_key_ok("SEQNUM"):
            # Some older LATISS data may not have the header
            # but this is corrected in fix_header for LATISS.
            self._used_these_cards("SEQNUM")
            return int(self._header["SEQNUM"])

        # This indicates a problem so we warn and return a 0
        log.warning("%s: Unable to determine the observation counter so returning 0",
                    self._log_prefix)
        return 0
    @cache_translation
    def to_boresight_rotation_coord(self):
        """Boresight rotation angle.

        Only relevant for science observations.
        """
        unknown = "unknown"
        if not self.is_on_sky():
            return unknown

        # Mark the card used even if absent; a missing or `None` ROTCOORD
        # value maps to "unknown".
        self._used_these_cards("ROTCOORD")
        coord = self._header.get("ROTCOORD", unknown)
        if coord is None:
            coord = unknown
        return coord
    @cache_translation
    def to_boresight_airmass(self):
        """Calculate airmass at boresight at start of observation.

        Notes
        -----
        Early data are missing AMSTART header so we fall back to calculating
        it from ELSTART.
        """
        if not self.is_on_sky():
            return None

        # This observation should have AMSTART
        amkey = "AMSTART"
        if self.is_key_ok(amkey):
            self._used_these_cards(amkey)
            return self._header[amkey]

        # Instead we need to look at azel
        altaz = self.to_altaz_begin()
        if altaz is not None:
            return altaz.secz.to_value()

        # Last resort for on-sky data with no usable pointing information.
        log.warning("%s: Unable to determine airmass of a science observation, returning 1.",
                    self._log_prefix)
        return 1.0
    @cache_translation
    def to_group_counter_start(self):
        # Docstring will be inherited. Property defined in properties.py
        # Effectively the start of the visit as determined by the headers.
        counter = self.to_observation_counter()
        # Older data does not have the CURINDEX header.
        if self.is_key_ok("CURINDEX"):
            # CURINDEX is 1-based.
            seq_start = counter - self._header["CURINDEX"] + 1
            self._used_these_cards("CURINDEX")
            return seq_start
        else:
            # If the counter is 0 we need to pick something else
            # that is not going to confuse the visit calculation
            # (since setting everything to 0 will make one big visit).
            return counter if counter != 0 else self.to_exposure_id()
    @cache_translation
    def to_group_counter_end(self):
        # Docstring will be inherited. Property defined in properties.py
        # Effectively the end of the visit as determined by the headers.
        counter = self.to_observation_counter()
        # Older data does not have the CURINDEX or MAXINDEX headers.
        if self.is_key_ok("CURINDEX") and self.is_key_ok("MAXINDEX"):
            # CURINDEX is 1-based. CURINDEX == MAXINDEX indicates the
            # final exposure in the sequence.
            remaining = self._header["MAXINDEX"] - self._header["CURINDEX"]
            seq_end = counter + remaining
            self._used_these_cards("CURINDEX", "MAXINDEX")
            return seq_end
        else:
            # If the counter is 0 we need to pick something else
            # that is not going to confuse the visit calculation
            # (since setting everything to 0 will make one big visit).
            return counter if counter != 0 else self.to_exposure_id()
1018 @cache_translation
1019 def to_has_simulated_content(self):
1020 # Check all the simulation flags.
1021 # We do not know all the simulation flags that we may have so
1022 # must check every header key. Ideally HIERARCH SIMULATE would
1023 # be a hierarchical header so _header["SIMULATE"] would return
1024 # everything. The header looks like:
1025 #
1026 # HIERARCH SIMULATE ATMCS = / ATMCS Simulation Mode
1027 # HIERARCH SIMULATE ATHEXAPOD = 0 / ATHexapod Simulation Mode
1028 # HIERARCH SIMULATE ATPNEUMATICS = / ATPneumatics Simulation Mode
1029 # HIERARCH SIMULATE ATDOME = 1 / ATDome Simulation Mode
1030 # HIERARCH SIMULATE ATSPECTROGRAPH = 0 / ATSpectrograph Simulation Mode
1031 #
1032 # So any header that includes "SIMULATE" in the key name and has a
1033 # true value implies that something in the data is simulated.
1034 for k, v in self._header.items():
1035 if "SIMULATE" in k and v:
1036 self._used_these_cards(k)
1037 return True
1039 # If the controller is H, P, S, or Q then the data are simulated.
1040 controller = self._get_controller_code()
1041 if controller:
1042 if controller in SIMULATED_CONTROLLERS:
1043 return True
1045 # No simulation flags set.
1046 return False
    @cache_translation
    def to_relative_humidity(self) -> float | None:
        # Docstring will be inherited. Property defined in properties.py
        key = "HUMIDITY"
        if self.is_key_ok(key):
            self._used_these_cards(key)
            return self._header[key]

        # No usable humidity header.
        return None
1057 @cache_translation
1058 def to_pressure(self):
1059 key = "PRESSURE"
1060 if self.is_key_ok(key):
1061 value = self._header[key]
1062 self._used_these_cards(key)
1063 # There has been an inconsistency in units for the pressure reading
1064 # so we need to adjust for this.
1065 if value > 10_000:
1066 unit = u.Pa
1067 else:
1068 unit = u.hPa
1069 return value * unit
1071 return None
    @cache_translation
    def to_temperature(self):
        # Docstring will be inherited. Property defined in properties.py
        key = "AIRTEMP"
        if self.is_key_ok(key):
            self._used_these_cards(key)
            # Header value is taken to be in degrees Celsius.
            return self._header[key] * u.deg_C
        return None
    @cache_translation
    def to_can_see_sky(self) -> bool | None:
        # Docstring will be inherited. Property defined in properties.py
        key = "SHUTTIME"
        if self.is_key_ok(key) and self._header[key] == 0.0:
            # Shutter never opened so impossible to see sky.
            self._used_these_cards(key)
            return False

        # Minimum vignetting state over the exposure, if recorded.
        key = "VIGN_MIN"
        if self.is_key_ok(key):
            self._used_these_cards(key)
            vignetted = self._header[key]
            match vignetted:
                case "FULLY":
                    # Fully vignetted throughout: no sky visible.
                    return False
                case "UNKNOWN":
                    return None
                case _:
                    # Partially vignetted or unvignetted: sky was visible.
                    return True

        # Fallback to using the observation type if the key is missing.
        # PhoSim always falls back.
        if self._can_check_obstype_for_can_see_sky or self._get_controller_code() == "H":
            return super().to_can_see_sky()

        # Unknown state.
        return None