Coverage for python / lsst / obs / lsst / translators / lsst.py: 23%

453 statements  

« prev     ^ index     » next       coverage.py v7.13.5, created at 2026-04-15 00:13 +0000

1# This file is currently part of obs_lsst but is written to allow it 

2# to be migrated to the astro_metadata_translator package at a later date. 

3# 

4# This product includes software developed by the LSST Project 

5# (http://www.lsst.org). 

6# See the LICENSE file in this directory for details of code ownership. 

7# 

8# Use of this source code is governed by a 3-clause BSD-style 

9# license that can be found in the LICENSE file. 

10 

11"""Metadata translation support code for LSST headers""" 

12 

13__all__ = ("TZERO", "SIMONYI_LOCATION", "read_detector_ids", 

14 "compute_detector_exposure_id_generic", "LsstBaseTranslator", 

15 "SIMONYI_TELESCOPE") 

16 

17import os.path 

18import yaml 

19import logging 

20import re 

21import datetime 

22import hashlib 

23 

24import astropy.coordinates 

25import astropy.units as u 

26from astropy.time import Time, TimeDelta 

27from astropy.coordinates import EarthLocation 

28 

29from lsst.utils import getPackageDir 

30 

31from astro_metadata_translator import cache_translation, FitsTranslator 

32from astro_metadata_translator.translators.helpers import tracking_from_degree_headers, \ 

33 altaz_from_degree_headers 

34 

35 

# Epoch from which the date-based visit IDs count their seconds.
TZERO = Time("2015-01-01T00:00", format="isot", scale="utc")
TZERO_DATETIME = TZERO.to_datetime()

# Separator placed between the names of multiple filters/gratings.
FILTER_DELIMITER = "~"

# Pattern for a GROUPID string: an ISO date-time, a three digit fraction of
# a second and an optional integer suffix introduced by "+" or "#".
GROUP_RE = re.compile(r"^(\d\d\d\d\-\d\d\-\d\dT\d\d:\d\d:\d\d)\.(\d\d\d)(?:[\+#](\d+))?$")

# Location assumed for LSST data when the headers do not provide one.
SIMONYI_LOCATION = EarthLocation.from_geodetic(-70.749417, -30.244639, 2663.0)

# Name of the main survey telescope.
SIMONYI_TELESCOPE = "Simonyi Survey Telescope"

# Supported controller codes: O (OCS), C (CCS), H (pHosim), P (simulated
# OCS), Q (simulated CCS) and S (simulated images).
# The position of a code within CONTROLLERS feeds directly into the exposure
# ID calculation, so existing codes must never be reordered; new codes may
# only be appended.
SIMULATED_CONTROLLERS = "HPQS"
CONTROLLERS = "OC" + SIMULATED_CONTROLLERS

# Decimal digits reserved for the sequence number within an exposure_id.
_SEQNUM_MAXDIGITS = 5

# Decimal digits reserved for the day of observation (including the
# controller offset) within an exposure_id.
_DAYOBS_MAXDIGITS = 8

# Amount added to day_obs for each controller after the default one;
# in YYYYMMDD form this is one thousand years.
_CONTROLLER_INCREMENT = 1000_00_00

# Total number of decimal digits that an exposure_id can occupy.
EXPOSURE_ID_MAXDIGITS = _SEQNUM_MAXDIGITS + _DAYOBS_MAXDIGITS

# Root of the obs_lsst package; policy and correction files are located
# relative to this directory.
obs_lsst_packageDir = getPackageDir("obs_lsst")

log = logging.getLogger(__name__)

75 

76 

def read_detector_ids(policyFile):
    """Read a camera policy file and return the mapping from CCD name to
    detector number and serial.

    Parameters
    ----------
    policyFile : `str`
        Name of the YAML policy file to read, relative to the root of the
        obs_lsst package.

    Returns
    -------
    mapping : `dict` of `str` to (`int`, `str`)
        A `dict` keyed by the full detector name. Each value is a `tuple`
        of the integer detector number and the detector serial number.

    Raises
    ------
    ValueError
        Raised if the policy file can not be opened or read.

    Notes
    -----
    The camera YAML definition is parsed directly and only the IDs and
    serials are extracted. The standard
    `~lsst.obs.base.yamlCamera.YAMLCamera` infrastructure and
    `lsst.afw.cameraGeom` are deliberately not used because the translators
    are intended to have minimal dependencies on LSST infrastructure.
    """
    path = os.path.join(obs_lsst_packageDir, policyFile)

    # The camera files are large so the LibYAML-backed parser is preferred.
    # PyYAML only provides CSafeLoader when it was built against LibYAML, so
    # fall back to the pure-Python SafeLoader rather than failing with an
    # AttributeError on such installations.
    loader = getattr(yaml, "CSafeLoader", yaml.SafeLoader)

    try:
        with open(path) as fh:
            camera = yaml.load(fh, Loader=loader)
    except OSError as e:
        raise ValueError(f"Could not load camera policy file {path}") from e

    return {ccd: (int(info["id"]), info["serial"]) for ccd, info in camera["CCDs"].items()}

116 

117 

def compute_detector_exposure_id_generic(exposure_id, detector_num, max_num):
    """Combine an exposure ID and a detector number into a single integer.

    Parameters
    ----------
    exposure_id : `int`
        The exposure ID.
    detector_num : `int`
        The detector number. Must satisfy ``0 <= detector_num < max_num``.
    max_num : `int`
        Maximum number of detectors to make space for.

    Returns
    -------
    detector_exposure_id : `int`
        The exposure ID scaled by ``max_num`` with the detector number added.

    Raises
    ------
    ValueError
        Raised if the detector number is `None` or out of range.
    """
    if detector_num is None:
        raise ValueError("Detector number must be defined.")

    too_small = detector_num < 0
    if too_small or detector_num >= max_num:
        raise ValueError(f"Detector number out of range 0 <= {detector_num} < {max_num}")

    # The detector number occupies the slots left free by the scaling.
    scaled_exposure = max_num*exposure_id
    return scaled_exposure + detector_num

148 

149 

150class LsstBaseTranslator(FitsTranslator): 

151 """Translation methods useful for all LSST-style headers.""" 

152 

153 _const_map = {} 

154 _trivial_map = {} 

155 

156 # Do not specify a name for this translator 

157 cameraPolicyFile = None 

158 """Path to policy file relative to obs_lsst root.""" 

159 

160 detectorMapping = None 

161 """Mapping of detector name to detector number and serial.""" 

162 

163 detectorSerials = None 

164 """Mapping of detector serial number to raft, number, and name.""" 

165 

166 DETECTOR_MAX = 1000 

167 """Maximum number of detectors to use when calculating the 

168 detector_exposure_id. 

169 

170 Note that because this is the maximum number *of* detectors, for 

171 zero-based ``detector_num`` values this is one greater than the maximum 

172 ``detector_num``. It is also often rounded up to the nearest power of 

173 10 anyway, to allow ``detector_exposure_id`` values to be easily decoded by 

174 humans. 

175 """ 

176 

177 _DEFAULT_LOCATION = SIMONYI_LOCATION 

178 """Default telescope location in absence of relevant FITS headers.""" 

179 

180 _ROLLOVER_TIME = TimeDelta(12*60*60, scale="tai", format="sec") 

181 """Time delta for the definition of a Rubin Observatory start of day. 

182 Used when the header is missing. See LSE-400 or SITCOMTN-032 for details. 

183 """ 

184 

185 _non_sky_observation_types: tuple[str, ...] = ("bias", "dark", "flat") 

186 """Observation types that correspond to an observation where the detector 

187 can not see sky photons. 

188 """ 

189 

190 _can_check_obstype_for_can_see_sky: bool = True 

191 """If can_see_sky can not be determined, allow usage of observation type 

192 if `True`. 

193 """ 

194 

@classmethod
def __init_subclass__(cls, **kwargs):
    """Reset the cached detector tables on every newly defined subclass.

    The detector mapping and serial tables are cached as class attributes.
    Clearing them here means a subclass of a translator that has already
    populated its tables does not pick up the parent's values.
    """
    for cached_attribute in ("detectorMapping", "detectorSerials"):
        setattr(cls, cached_attribute, None)

    super().__init_subclass__(**kwargs)

205 

def search_paths(self):
    """Return the directories to search for LSST header correction files.

    Returns
    -------
    path : `list`
        Single-element list holding the full path of the ``corrections``
        directory inside the ``obs_lsst`` package.
    """
    corrections_dir = os.path.join(obs_lsst_packageDir, "corrections")
    return [corrections_dir]

217 

@classmethod
def observing_date_to_offset(cls, observing_date: astropy.time.Time) -> astropy.time.TimeDelta | None:
    """Return the offset used to work out the observing day.

    Parameters
    ----------
    observing_date : `astropy.time.Time`
        The date of the observation. Not used by this implementation.

    Returns
    -------
    offset : `astropy.time.TimeDelta`
        The offset to apply. This implementation always returns the
        class-level ``_ROLLOVER_TIME``; subclasses can return a different
        value depending on whether the instrument is in the instrument
        lab or on the mountain.
    """
    rollover = cls._ROLLOVER_TIME
    return rollover

236 

@classmethod
def compute_detector_exposure_id(cls, exposure_id, detector_num):
    """Pack an exposure ID and detector number into a detector exposure ID.

    This helper lets code working outside the translator infrastructure
    use the same algorithm as the translators.

    Parameters
    ----------
    exposure_id : `int`
        Unique exposure ID.
    detector_num : `int`
        Detector number.

    Returns
    -------
    detector_exposure_id : `int`
        The calculated ID.
    """
    # The packer is imported at call time rather than at module import.
    from .._packer import RubinDimensionPacker as packer_class

    packer_config = packer_class.ConfigClass()
    packer_config.use_controllers()
    return packer_class.pack_id_pair(exposure_id, detector_num, config=packer_config)

262 

@classmethod
def max_detector_exposure_id(cls):
    """Return the largest detector exposure ID this instrument is expected
    to generate.

    Returns
    -------
    max_id : `int`
        The maximum value.
    """
    largest_exposure_id = cls.max_exposure_id()

    # DETECTOR_MAX is the number *of* detectors and LSST detector numbers
    # are zero-based, so the highest usable detector number is one less.
    # That makes the returned ID an inclusive maximum.
    largest_detector_num = cls.DETECTOR_MAX - 1

    return cls.compute_detector_exposure_id(largest_exposure_id, largest_detector_num)

278 

@classmethod
def max_exposure_id(cls):
    """Return the largest exposure ID expected from this instrument.

    Returns
    -------
    max_exposure_id : `int`
        The maximum value.

    Notes
    -----
    The value is hard-coded to reflect historical values that were used
    for various controllers before the sequence counter was unified.
    """
    # Digit groups: controller-shifted year, month, day, sequence number.
    # This corresponds to a latest observing date of 2050-12-31, 99,999
    # exposures per day and 6 controllers.
    return 7050_12_31_99999

296 

@classmethod
def detector_mapping(cls):
    """Return the mapping of full detector name to detector ID and serial.

    Returns
    -------
    mapping : `dict` of `str`:`tuple`
        Mapping of full detector name (group+detector) to detector number
        and serial.

    Raises
    ------
    ValueError
        Raised if no camera policy file has been registered with this
        translation class.

    Notes
    -----
    The mapping is read from the policy file the first time it is needed
    and then kept on the class for later calls.
    """
    if cls.cameraPolicyFile is None:
        raise ValueError(f"Translation class '{cls.__name__}' has no registered camera policy file")

    if cls.detectorMapping is None:
        cls.detectorMapping = read_detector_ids(cls.cameraPolicyFile)

    return cls.detectorMapping

324 

@classmethod
def detector_serials(cls):
    """Return the mapping of detector serial to detector group, name, and
    number.

    Returns
    -------
    info : `dict` of `tuple` of (`str`, `str`, `int`)
        A `dict` keyed by serial number. Each value holds the detector
        group, name, and number.

    Raises
    ------
    RuntimeError
        Raised if no detector mapping is available or if a serial number
        appears more than once in the mapping.

    Notes
    -----
    The table is built on first use and then kept on the class.
    """
    if cls.detectorSerials is not None:
        return cls.detectorSerials

    # Calling this also makes sure the class-level mapping is populated.
    if cls.detector_mapping() is None:
        raise RuntimeError("Unable to obtain detector mapping information")

    # Invert the name -> (number, serial) table into serial -> details.
    by_serial = {}
    for full_name, (number, serial) in cls.detectorMapping.items():
        group, name = full_name.split("_")
        if serial in by_serial:
            raise RuntimeError(f"Serial {serial} is defined in multiple places")
        by_serial[serial] = (group, name, number)

    cls.detectorSerials = by_serial
    return cls.detectorSerials

352 

@classmethod
def compute_detector_num_from_name(cls, detector_group, detector_name):
    """Look up the detector number for a detector group and name.

    Parameters
    ----------
    detector_group : `str`
        Name of the detector grouping. This is generally the raft name.
    detector_name : `str`
        Detector name.

    Returns
    -------
    num : `int` or `None`
        Detector number. `None` is returned, after logging a warning, if
        the combined name is not present in the detector mapping.

    Raises
    ------
    RuntimeError
        Raised if no detector mapping is available.
    """
    mapping = cls.detector_mapping()
    if mapping is None:
        raise RuntimeError("Unable to obtain detector mapping information")

    # The mapping is keyed by "<group>_<name>".
    full_name = f"{detector_group}_{detector_name}"
    if full_name not in mapping:
        log.warning(f"Unable to determine detector number from detector name {full_name}")
        return None

    # Entries are (number, serial); only the number is wanted here.
    return mapping[full_name][0]

383 

@classmethod
def compute_detector_info_from_serial(cls, detector_serial):
    """Look up the detector information for a detector serial.

    Parameters
    ----------
    detector_serial : `str`
        Detector serial ID.

    Returns
    -------
    info : `tuple` of (`str`, `str`, `int`)
        Detector group, name, and number.

    Raises
    ------
    RuntimeError
        Raised if the serial table is unavailable or does not contain the
        requested serial.
    """
    lookup = cls.detector_serials()
    if lookup is None:
        raise RuntimeError("Unable to obtain serial mapping information")

    if detector_serial not in lookup:
        raise RuntimeError("Unable to determine detector information from detector serial"
                           f" {detector_serial}")

    return lookup[detector_serial]

409 

@staticmethod
def compute_exposure_id(dayobs, seqnum, controller=None):
    """Helper method to calculate the exposure_id.

    Parameters
    ----------
    dayobs : `str` or `int`
        Day of observation in either YYYYMMDD or YYYY-MM-DD format.
        If the string looks like ISO format it will be truncated before the
        ``T`` before being handled.
    seqnum : `int` or `str`
        Sequence number. Must be non-negative and fit in the number of
        digits reserved for it.
    controller : `str`, optional
        Controller to use. If this is "O", no change is made to the
        exposure ID. Before Oct 5 2023, if it is "C" a 1000 is added to the
        year component of the exposure ID. If it is "H" a 2000 is added to
        the year component. Before Apr 18 2025, this sequence continues
        with "P", "Q", and "S" controllers. `None` indicates that the
        controller is not relevant to the exposure ID calculation
        (generally this is the case for test stand data).

    Returns
    -------
    exposure_id : `int`
        Exposure ID in form YYYYMMDDnnnnn form.

    Raises
    ------
    ValueError
        Raised if the day of observation does not reduce to eight
        characters, if the sequence number is negative or too large, or if
        the controller code is not one of the supported codes.
    """
    if isinstance(seqnum, str):
        seqnum = int(seqnum)

    # An integer is wanted in the end but the checks below need a string.
    if isinstance(dayobs, int):
        dayobs = str(dayobs)

    # Keep only the date part of an ISO string and drop the separators.
    dayobs = dayobs.partition("T")[0].replace("-", "")

    if len(dayobs) != 8:
        raise ValueError(f"Malformed dayobs: {dayobs}")

    # A negative value would otherwise be formatted with a sign into the
    # middle of the ID string and fail with an unhelpful int() error.
    if seqnum < 0:
        raise ValueError(f"Sequence number ({seqnum}) must not be negative")

    # Expect no more than 99,999 exposures in a day
    if seqnum >= 10**_SEQNUM_MAXDIGITS:
        raise ValueError(f"Sequence number ({seqnum}) exceeds limit")

    dayobs = int(dayobs)
    if dayobs > 20231004 and controller == "C":
        # As of this date the CCS controller has a unified counter
        # with the OCS, so there is no need to adjust the dayobs
        # to make unique exposure IDs.
        controller = None
    elif dayobs > 20250417 and controller in {"P", "S", "Q"}:
        # The PSQ and OC controller sequence counters were unified at some
        # point in the past. Files ingested before then must keep their
        # exposure ID, so identical sequences are only assumed from this
        # date onwards.
        controller = None

    # Camera control changes the exposure ID
    if controller is not None:
        index = CONTROLLERS.find(controller)
        if index == -1:
            raise ValueError(f"Supplied controller, '{controller}' is not "
                             f"in supported list: {CONTROLLERS}")

        # Increment a thousand years per controller
        dayobs += _CONTROLLER_INCREMENT * index

    # Form the number as a string zero padding the sequence number
    idstr = f"{dayobs}{seqnum:0{_SEQNUM_MAXDIGITS}d}"

    # Exposure ID has to be an integer
    return int(idstr)

482 

@staticmethod
def unpack_exposure_id(exposure_id):
    """Unpack an exposure ID into dayobs, seqnum, and controller.

    Parameters
    ----------
    exposure_id : `int`
        Integer exposure ID produced by `compute_exposure_id`.

    Returns
    -------
    dayobs : `str`
        Day of observation as a YYYYMMDD string.
    seqnum : `int`
        Sequence number.
    controller : `str`
        Controller code. Will be ``O`` (but should be ignored) for IDs
        produced by calling `compute_exposure_id` with ``controller=None``.

    Raises
    ------
    ValueError
        Raised if the day-of-observation part of the ID is too small to
        correspond to any controller.
    IndexError
        Raised if the day-of-observation part of the ID is beyond the range
        of the known controllers.
    """
    dayobs, seqnum = divmod(exposure_id, 10**_SEQNUM_MAXDIGITS)

    # The leading digit of the eight digit dayobs is 2 for the first
    # controller (years 2000-2999) and grows by one per later controller.
    controller_index = dayobs // _CONTROLLER_INCREMENT - 2

    # A negative index would silently wrap around to the end of CONTROLLERS
    # and report the wrong controller and day, so reject it explicitly.
    if controller_index < 0:
        raise ValueError(f"Exposure ID {exposure_id} does not contain a valid day of observation")

    dayobs -= controller_index * _CONTROLLER_INCREMENT
    return (str(dayobs), seqnum, CONTROLLERS[controller_index], )

506 

def _is_on_mountain(self):
    """Report whether these data come from the instrument installed on the
    mountain.

    Returns
    -------
    is : `bool`
        `True` if the instrument is on the mountain. The data are treated
        as not from the mountain when a ``TSTAND`` header is present.
    """
    return "TSTAND" not in self._header

519 

def is_on_sky(self):
    """Determine if this is an on-sky observation.

    Returns
    -------
    is_on_sky : `bool`
        Returns True if this is a observation on sky on the summit.
    """
    # For LSST an observation counts as on sky unless tracksys is local.
    tracking_is_local = self.is_key_ok("TRACKSYS") and self._header["TRACKSYS"].lower() == "local"
    if tracking_is_local:
        return False

    # Observation types that can never see the sky.
    if self.to_observation_type() in self._non_sky_observation_types:
        return False

    # Otherwise it is on sky exactly when the instrument is on the mountain.
    return self._is_on_mountain()

540 

@cache_translation
def to_location(self):
    # Docstring will be inherited. Property defined in properties.py
    if self._is_on_mountain():
        try:
            # First choice is whatever the standard FITS headers give.
            location = super().to_location()
        except (KeyError, TypeError):
            # Headers missing or unusable: use the class default.
            location = self._DEFAULT_LOCATION
        return location

    # No location is reported for data not taken on the mountain.
    return None

551 

@cache_translation
def to_datetime_begin(self):
    # Docstring will be inherited. Property defined in properties.py
    # Candidate headers in order of preference: -BEG before -OBS.
    candidates = ("MJD-BEG", "DATE-BEG", "MJD-OBS", "DATE-OBS")

    # If none of them is usable fall back to the first one so that the
    # header lookup below fails with KeyError.
    date_key = next((key for key in candidates if self.is_key_ok(key)), candidates[0])
    date_fmt = "mjd" if date_key.startswith("MJD") else "fits"

    self._used_these_cards(date_key)
    return Time(self._header[date_key], scale="tai", format=date_fmt)

567 

@cache_translation
def to_datetime_end(self):
    # Docstring will be inherited. Property defined in properties.py
    if self.is_key_ok("DATE-END"):
        return super().to_datetime_end()

    # No usable end header: derive the end from start plus exposure time.
    duration = self.to_exposure_time()

    # Some translators deliberately return -1.0s if the exposure time can
    # not be determined. In that case the end time is reported as equal
    # to the start time.
    duration_unknown = duration.value < 0.0

    begin = self.to_datetime_begin()
    return begin if duration_unknown else begin + duration

582 

@cache_translation
def to_detector_num(self):
    # Docstring will be inherited. Property defined in properties.py
    # The number is looked up from the combined group and detector name.
    return self.compute_detector_num_from_name(self.to_detector_group(), self.to_detector_name())

589 

@cache_translation
def to_detector_exposure_id(self):
    # Docstring will be inherited. Property defined in properties.py
    # Pack the translated exposure ID and detector number together.
    return self.compute_detector_exposure_id(self.to_exposure_id(), self.to_detector_num())

596 

@cache_translation
def to_observation_type(self):
    # Docstring will be inherited. Property defined in properties.py
    raw_type = self._header["IMGTYPE"]
    self._used_these_cards("IMGTYPE")

    # Types are reported in lower case, with the two header spellings for
    # a sky exposure folded into "science".
    lowered = raw_type.lower()
    return "science" if lowered in ("skyexp", "object") else lowered

606 

@cache_translation
def to_observation_reason(self):
    # Docstring will be inherited. Property defined in properties.py
    # REASON is preferred; TESTTYPE is consulted only if REASON is not ok.
    usable_key = next((key for key in ("REASON", "TESTTYPE") if self.is_key_ok(key)), None)

    if usable_key is None:
        # no specific header present so use the default translation
        return super().to_observation_reason()

    reason = self._header[usable_key]
    self._used_these_cards(usable_key)
    return reason.lower()

617 

@cache_translation
def to_dark_time(self):
    """Calculate the dark time.

    When the DARKTIME header can not be used, a warning is logged and the
    exposure time is returned in its place.

    Returns
    -------
    dark : `astropy.units.Quantity`
        The dark time in seconds.
    """
    if not self.is_key_ok("DARKTIME"):
        log.warning("%s: Unable to determine dark time. Setting from exposure time.",
                    self._log_prefix)
        return self.to_exposure_time()

    # The header value is a plain number of seconds.
    dark = self._header["DARKTIME"]*u.s
    self._used_these_cards("DARKTIME")
    return dark

638 

639 def _get_controller_code(self) -> str | None: 

640 """Return the controller code. 

641 

642 Returns 

643 ------- 

644 code : `str` 

645 Single character code representing the controller. Returns 

646 `None` if no controller can be determined. 

647 """ 

648 key = "CONTRLLR" 

649 if self.is_key_ok(key): 

650 controller = self._header[key] 

651 self._used_these_cards(key) 

652 else: 

653 controller = None 

654 return controller 

655 

@cache_translation
def to_exposure_id(self):
    """Generate a unique exposure ID number.

    The ID combines DAYOBS and SEQNUM and, when a controller code is
    available, CONTRLLR.

    Returns
    -------
    exposure_id : `int` or `None`
        Unique exposure number. `None` is returned when the header
        contains a ``CALIB_ID`` key.
    """
    header = self._header

    if "CALIB_ID" in header:
        self._used_these_cards("CALIB_ID")
        return None

    dayobs, seqnum = header["DAYOBS"], header["SEQNUM"]
    self._used_these_cards("DAYOBS", "SEQNUM")

    return self.compute_exposure_id(dayobs, seqnum, controller=self._get_controller_code())

679 

@cache_translation
def to_visit_id(self):
    """Calculate the visit associated with this exposure.

    Notes
    -----
    For LATISS and LSSTCam the default visit is derived from the
    exposure group. For other instruments we return the exposure_id.
    """
    group = self.to_exposure_group()

    # A group that is already an integer is used as the visit directly.
    try:
        return int(group)
    except ValueError:
        pass

    # A group is nominally "ISODATE+N" with the +N optional. The integer
    # has to be the same for every exposure in a group so it must come
    # from the group string and never from datetime_begin.
    parsed = GROUP_RE.match(group)

    if parsed is None:
        # Early data lacking the standard form: hash the string and keep
        # the leading hex digits. The result needs to be big enough that
        # it can not clash with the date-based form.
        hex_digest = hashlib.blake2b(group.encode("us-ascii")).hexdigest()
        hashed = int(hex_digest[:14], base=16)

        # To help with hash collision, append the string length.
        return int(f"{hashed}{len(group):02d}")

    iso_str, fraction, suffix = parsed.groups()
    counter = 0 if suffix is None else int(suffix)

    # Whole seconds between the group date and the reference epoch.
    group_time = datetime.datetime.strptime(iso_str, "%Y-%m-%dT%H:%M:%S")
    epoch = int((group_time - TZERO_DATETIME).total_seconds())

    # The ID is the epoch seconds, the three digit fraction and the
    # zero-padded counter written one after another.
    return int(f"{epoch}{fraction}{counter:04d}")

736 

@cache_translation
def to_physical_filter(self):
    """Calculate the physical filter name.

    Returns
    -------
    filter : `str`
        Name of filter. Can be a combination of FILTER, FILTER1 and FILTER2
        headers joined by a "~". Returns "unknown" if no filter is declared.
    """
    filter_keys = ["FILTER", "FILTER1", "FILTER2"]
    name = self._join_keyword_values(filter_keys, delim=FILTER_DELIMITER) or "unknown"

    # Instances of "NONE" are reported in lower case.
    return name.replace("NONE", "none")

755 

@cache_translation
def to_tracking_radec(self):
    # Do not even attempt to attach an RA/Dec to observation types that
    # are known not to be tracking. The Rubin OCS can sometimes report
    # that the telescope is tracking when it is not for calibrations like
    # these. Darks are sometimes taken whilst tracking to test stability,
    # so they are excluded from this short cut.
    never_tracking = set(self._non_sky_observation_types) - {"dark"}
    if self.to_observation_type() in never_tracking:
        return None

    # An observation that is not tracking in RA/Dec has no meaningful
    # value to report.
    if self.are_keys_ok(["TRACKSYS"]) and self._header["TRACKSYS"] != "RADEC":
        return None

    # RA/DEC are *derived* headers and for the case where the DATE-BEG
    # is 1970 they are garbage and should not be used.
    try:
        if self._header["DATE-OBS"] == self._header["DATE"]:
            # A fixed up date -- use AZEL as source of truth
            altaz = self.to_altaz_begin()
            return astropy.coordinates.SkyCoord(altaz.transform_to(astropy.coordinates.ICRS()),
                                                obstime=altaz.obstime,
                                                location=altaz.location)

        frame_keys = ("RADESYS",)
        coordinate_keys = (("RASTART", "DECSTART"), ("RA", "DEC"))
        return tracking_from_degree_headers(self, frame_keys, coordinate_keys)
    except Exception:
        # Failure is only tolerated for observations that were not
        # formally on sky; those report no coordinate.
        if self.is_on_sky():
            raise
        return None

793 

@cache_translation
def to_altaz_begin(self):
    # Alt/az built from the start-of-observation headers.
    az_key, el_key = "AZSTART", "ELSTART"
    return self._to_altaz(az_key, el_key)

797 

@cache_translation
def to_altaz_end(self):
    # Alt/az built from the end-of-observation headers.
    az_key, el_key = "AZEND", "ELEND"
    return self._to_altaz(az_key, el_key)

801 

def _to_altaz(self, az_key, el_key):
    # Build an alt/az coordinate from the named azimuth and elevation
    # headers. Nothing is reported for data not taken on the mountain.
    if not self._is_on_mountain():
        return None

    header_pair = (el_key, az_key)

    # H controller data are sometimes science observations without
    # having AZx header. Those are allowed to return nothing.
    if self._get_controller_code() == "H" and not self.are_keys_ok(list(header_pair)):
        return None

    # Always attempt to find the alt/az values regardless of observation
    # type.
    obstime = self.to_datetime_begin()
    altaz = altaz_from_degree_headers(self, (header_pair,), obstime,
                                      is_zd=False, max_alt=95.55, min_alt=-5.55)
    self._used_these_cards(el_key, az_key)
    return altaz

817 

@cache_translation
def to_exposure_group(self):
    """Calculate the exposure group string.

    For LSSTCam and LATISS this is read from the ``GROUPID`` header.
    If that header is missing the exposure_id is returned instead as
    a string.
    """
    if not self.is_key_ok("GROUPID"):
        return super().to_exposure_group()

    group = self._header["GROUPID"]
    self._used_these_cards("GROUPID")

    # Sometimes people forget to quote date strings in YAML correction
    # files, so the value arrives as a datetime. Groups are matched as
    # strings across multiple exposures, so convert it back to a string
    # with millisecond precision.
    if isinstance(group, datetime.datetime):
        group = group.isoformat(timespec="milliseconds")

    return group

838 

@cache_translation
def to_focus_z(self):
    """Return the defocal distance of the camera in units of mm.

    If there is no usable ``FOCUSZ`` value in the header the default
    translation of the parent class is returned instead.

    Returns
    -------
    focus_z: `astropy.units.Quantity`
        The defocal distance from header in mm or the 0.0mm default
    """
    if self.is_key_ok("FOCUSZ"):
        # Some broken files have strings instead of floats.
        focus_z = float(self._header["FOCUSZ"])
        # Record the header as consumed, as the other translations in this
        # class do for the headers they read.
        self._used_these_cards("FOCUSZ")
        return focus_z * u.mm
    return super().to_focus_z()

855 

@staticmethod
def _is_filter_empty(filter):
    """Return true if the supplied filter indicates an empty filter slot.

    Parameters
    ----------
    filter : `str`
        The filter string to check.

    Returns
    -------
    is_empty : `bool`
        `True` if the filter string looks like it is referring to an
        empty filter slot. For example this can be if the filter is
        "empty" or "empty_2". The comparison ignores case.
    """
    lowered = filter.lower()
    return re.match(r"empty_?\d*$", lowered) is not None

873 

def _determine_primary_filter(self):
    """Determine the primary filter from the ``FILTER`` header.

    Returns
    -------
    filter : `str`
        The contents of the ``FILTER`` header, with any empty-slot value
        reported as "empty", or "unknown" if the header is not usable.
    """
    if not self.is_key_ok("FILTER"):
        # The filter is unknown; only warn when that matters, which is not
        # the case for bias and dark observations.
        if self.to_observation_type() not in ("bias", "dark"):
            log.warning("%s: Unable to determine the filter",
                        self._log_prefix)

        # Be explicit about having no knowledge of the filter. It should
        # always have a value.
        return "unknown"

    filter_name = self._header["FILTER"]
    self._used_these_cards("FILTER")

    return "empty" if self._is_filter_empty(filter_name) else filter_name

902 

@cache_translation
def to_observing_day(self):
    """Return the day of observation as YYYYMMDD integer.

    For LSSTCam and other compliant instruments this is the value
    of the DAYOBS header. Without a usable header the default
    translation of the parent class is used.

    Returns
    -------
    obs_day : `int`
        The day of observation.
    """
    if not self.is_key_ok("DAYOBS"):
        return super().to_observing_day()

    self._used_these_cards("DAYOBS")
    return int(self._header["DAYOBS"])

920 

@cache_translation
def to_observation_counter(self):
    """Return the sequence number within the observing day.

    Returns
    -------
    counter : `int`
        The sequence number for this day, or 0 (with a warning) if the
        ``SEQNUM`` header is not usable.
    """
    if not self.is_key_ok("SEQNUM"):
        # This indicates a problem so we warn and return a 0
        log.warning("%s: Unable to determine the observation counter so returning 0",
                    self._log_prefix)
        return 0

    # Some older LATISS data may not have the header
    # but this is corrected in fix_header for LATISS.
    self._used_these_cards("SEQNUM")
    return int(self._header["SEQNUM"])

940 

941 @cache_translation 

def to_boresight_rotation_coord(self):
    """Return the coordinate frame of the boresight rotation angle.

    This is only relevant for science observations.

    Returns
    -------
    coord : `str`
        The value of the ROTCOORD header. ``"unknown"`` is returned when
        the observation is not on sky, or when the header is missing or
        has no value.
    """
    fallback = "unknown"
    if self.is_on_sky():
        self._used_these_cards("ROTCOORD")
        frame = self._header.get("ROTCOORD", fallback)
        return fallback if frame is None else frame
    return fallback

956 

957 @cache_translation 

def to_boresight_airmass(self):
    """Return the boresight airmass at the start of the observation.

    Returns
    -------
    airmass : `float` or `None`
        `None` if the observation is not on sky. Otherwise the value of
        the AMSTART header, or, failing that, a value derived from the
        starting alt/az. If neither is available a warning is issued
        and 1.0 is returned.

    Notes
    -----
    Early data are missing AMSTART header so we fall back to calculating
    it from ELSTART.
    """
    if not self.is_on_sky():
        return None

    # On-sky observations are expected to carry AMSTART.
    if self.is_key_ok("AMSTART"):
        self._used_these_cards("AMSTART")
        return self._header["AMSTART"]

    # No usable header so derive the airmass from the pointing.
    start_altaz = self.to_altaz_begin()
    if start_altaz is None:
        log.warning("%s: Unable to determine airmass of a science observation, returning 1.",
                    self._log_prefix)
        return 1.0

    return start_altaz.secz.to_value()

983 

984 @cache_translation 

def to_group_counter_start(self):
    """Return the observation counter at which this visit starts.

    Returns
    -------
    counter : `int`
        The observation counter adjusted by the CURINDEX header when
        that header is usable. Otherwise the observation counter itself,
        or the exposure ID if the counter is 0.
    """
    seqnum = self.to_observation_counter()

    if not self.is_key_ok("CURINDEX"):
        # Older data have no CURINDEX header. A counter of 0 must not be
        # reported because setting everything to 0 would merge all such
        # exposures into one big visit.
        if seqnum != 0:
            return seqnum
        return self.to_exposure_id()

    # CURINDEX counts from 1.
    first_in_group = seqnum - self._header["CURINDEX"] + 1
    self._used_these_cards("CURINDEX")
    return first_in_group

999 

1000 @cache_translation 

def to_group_counter_end(self):
    """Return the observation counter at which this visit ends.

    Returns
    -------
    counter : `int`
        The observation counter advanced by the number of exposures
        remaining according to the CURINDEX and MAXINDEX headers when
        both are usable. Otherwise the observation counter itself, or
        the exposure ID if the counter is 0.
    """
    seqnum = self.to_observation_counter()

    if not self.is_key_ok("CURINDEX") or not self.is_key_ok("MAXINDEX"):
        # Older data lack CURINDEX or MAXINDEX. A counter of 0 must not
        # be reported because setting everything to 0 would merge all
        # such exposures into one big visit.
        if seqnum != 0:
            return seqnum
        return self.to_exposure_id()

    # CURINDEX counts from 1 and equals MAXINDEX for the final exposure
    # in the sequence.
    still_to_come = self._header["MAXINDEX"] - self._header["CURINDEX"]
    last_in_group = seqnum + still_to_come
    self._used_these_cards("CURINDEX", "MAXINDEX")
    return last_in_group

1017 

1018 @cache_translation 

def to_has_simulated_content(self):
    """Report whether any part of this observation is simulated.

    Returns
    -------
    is_simulated : `bool`
        `True` if a header whose key contains ``SIMULATE`` has a true
        value, or if the controller code is one of the simulated
        controllers. `False` otherwise.
    """
    # The full set of simulation flags is not known in advance so every
    # header key has to be examined. Ideally HIERARCH SIMULATE would be
    # a hierarchical header so that _header["SIMULATE"] returned
    # everything at once. The headers look like:
    #
    # HIERARCH SIMULATE ATMCS =           / ATMCS Simulation Mode
    # HIERARCH SIMULATE ATHEXAPOD = 0     / ATHexapod Simulation Mode
    # HIERARCH SIMULATE ATPNEUMATICS =    / ATPneumatics Simulation Mode
    # HIERARCH SIMULATE ATDOME = 1        / ATDome Simulation Mode
    # HIERARCH SIMULATE ATSPECTROGRAPH = 0 / ATSpectrograph Simulation Mode
    #
    # A key containing "SIMULATE" with a true value therefore means
    # that something in the data is simulated. Only the first such key
    # is recorded as used.
    flagged_key = next(
        (key for key, value in self._header.items() if "SIMULATE" in key and value),
        None,
    )
    if flagged_key is not None:
        self._used_these_cards(flagged_key)
        return True

    # Data from controllers H, P, S, or Q are simulated.
    code = self._get_controller_code()
    if code and code in SIMULATED_CONTROLLERS:
        return True

    # Nothing indicated simulated content.
    return False

1047 

1048 @cache_translation 

1049 def to_relative_humidity(self) -> float | None: 

1050 key = "HUMIDITY" 

1051 if self.is_key_ok(key): 

1052 self._used_these_cards(key) 

1053 return self._header[key] 

1054 

1055 return None 

1056 

1057 @cache_translation 

def to_pressure(self):
    """Return the pressure recorded in the PRESSURE header.

    Returns
    -------
    pressure : `astropy.units.Quantity` or `None`
        The header value with a pressure unit attached, or `None` if
        the header can not be used.
    """
    if not self.is_key_ok("PRESSURE"):
        return None

    reading = self._header["PRESSURE"]
    self._used_these_cards("PRESSURE")

    # The pressure reading has not been written with consistent units:
    # values above 10,000 are taken to be in Pa and anything else in
    # hPa.
    return reading * (u.Pa if reading > 10_000 else u.hPa)

1072 

1073 @cache_translation 

def to_temperature(self):
    """Return the temperature recorded in the AIRTEMP header.

    Returns
    -------
    temperature : `astropy.units.Quantity` or `None`
        The header value in degrees Celsius, or `None` if the header
        can not be used.
    """
    if not self.is_key_ok("AIRTEMP"):
        return None

    self._used_these_cards("AIRTEMP")
    return self._header["AIRTEMP"] * u.deg_C

1080 

1081 @cache_translation 

1082 def to_can_see_sky(self) -> bool | None: 

1083 key = "SHUTTIME" 

1084 if self.is_key_ok(key) and self._header[key] == 0.0: 

1085 # Shutter never opened so impossible to see sky. 

1086 self._used_these_cards(key) 

1087 return False 

1088 

1089 key = "VIGN_MIN" 

1090 if self.is_key_ok(key): 

1091 self._used_these_cards(key) 

1092 vignetted = self._header[key] 

1093 match vignetted: 

1094 case "FULLY": 

1095 return False 

1096 case "UNKNOWN": 

1097 return None 

1098 case _: 

1099 return True 

1100 

1101 # Fallback to using the observation type if the key is missing. 

1102 # PhoSim always falls back. 

1103 if self._can_check_obstype_for_can_see_sky or self._get_controller_code() == "H": 

1104 return super().to_can_see_sky() 

1105 

1106 # Unknown state. 

1107 return None