Coverage for python / lsst / obs / lsst / translators / lsst.py: 23%

451 statements  

« prev     ^ index     » next       coverage.py v7.13.5, created at 2026-05-05 08:39 +0000

1# This file is currently part of obs_lsst but is written to allow it 

2# to be migrated to the astro_metadata_translator package at a later date. 

3# 

4# This product includes software developed by the LSST Project 

5# (http://www.lsst.org). 

6# See the LICENSE file in this directory for details of code ownership. 

7# 

8# Use of this source code is governed by a 3-clause BSD-style 

9# license that can be found in the LICENSE file. 

10 

11"""Metadata translation support code for LSST headers""" 

12 

13__all__ = ("TZERO", "SIMONYI_LOCATION", "read_detector_ids", 

14 "compute_detector_exposure_id_generic", "LsstBaseTranslator", 

15 "SIMONYI_TELESCOPE") 

16 

17import yaml 

18import logging 

19import re 

20import datetime 

21import hashlib 

22 

23import astropy.coordinates 

24import astropy.units as u 

25from astropy.time import Time, TimeDelta 

26from astropy.coordinates import EarthLocation 

27 

28from lsst.resources import ResourcePath, ResourcePathExpression 

29 

30from astro_metadata_translator import cache_translation, FitsTranslator 

31from astro_metadata_translator.translators.helpers import tracking_from_degree_headers, \ 

32 altaz_from_degree_headers 

33 

34 

35TZERO = Time("2015-01-01T00:00", format="isot", scale="utc") 

36TZERO_DATETIME = TZERO.to_datetime() 

37 

38# Delimiter to use for multiple filters/gratings 

39FILTER_DELIMITER = "~" 

40 

41# Regex to use for parsing a GROUPID string 

42GROUP_RE = re.compile(r"^(\d\d\d\d\-\d\d\-\d\dT\d\d:\d\d:\d\d)\.(\d\d\d)(?:[\+#](\d+))?$") 

43 

44# LSST Default location in the absence of headers 

45SIMONYI_LOCATION = EarthLocation.from_geodetic(-70.749417, -30.244639, 2663.0) 

46 

47# Name of the main survey telescope 

48SIMONYI_TELESCOPE = "Simonyi Survey Telescope" 

49 

50# Supported controller codes. 

51# The order here directly relates to the resulting exposure ID 

52# calculation. Do not reorder. Add new ones to the end. 

53# OCS, CCS, pHosim, P for simulated OCS, Q for simulated CCS, S for 

54# simulated images. 

55SIMULATED_CONTROLLERS = "HPQS" 

56CONTROLLERS = "OC" + SIMULATED_CONTROLLERS 

57 

58# Number of decimal digits allocated to the sequence number in exposure_ids. 

59_SEQNUM_MAXDIGITS = 5 

60 

61# Number of decimal digits allocated to the day of observation (and controller 

62# code) in exposure_ids. 

63_DAYOBS_MAXDIGITS = 8 

64 

65# Value added to day_obs for controllers after the default. 

66_CONTROLLER_INCREMENT = 1000_00_00 

67 

68# Number of decimal digits used by exposure_ids. 

69EXPOSURE_ID_MAXDIGITS = _SEQNUM_MAXDIGITS + _DAYOBS_MAXDIGITS 

70 

71log = logging.getLogger(__name__) 

72 

73 

74def read_detector_ids(policyFile: ResourcePathExpression): 

75 """Read a camera policy file and retrieve the mapping from CCD name 

76 to ID. 

77 

78 Parameters 

79 ---------- 

80 policyFile : `lsst.resources.ResourcePathExpression` 

81 Name of YAML policy file to read, relative to the resources root 

82 directory. This usually means that the "policy" directory is included. 

83 

84 Returns 

85 ------- 

86 mapping : `dict` of `str` to (`int`, `str`) 

87 A `dict` with keys being the full names of the detectors, and the 

88 value is a `tuple` containing the integer detector number and the 

89 detector serial number. 

90 

91 Notes 

92 ----- 

93 Reads the camera YAML definition file directly and extracts just the 

94 IDs and serials. This routine does not use the standard 

95 `~lsst.obs.base.yamlCamera.YAMLCamera` infrastructure or 

96 `lsst.afw.cameraGeom`. This is because the translators are intended to 

97 have minimal dependencies on LSST infrastructure. 

98 """ 

99 root = ResourcePath("resource://lsst.obs.lsst/resources/", forceDirectory=True) 

100 resource = ResourcePath(policyFile, root=root) 

101 try: 

102 # Use the fast parser since these files are large 

103 camera = yaml.load(resource.read(), Loader=yaml.CSafeLoader) 

104 except OSError as e: 

105 raise ValueError(f"Could not load camera policy file {resource}") from e 

106 

107 mapping = {} 

108 for ccd, value in camera["CCDs"].items(): 

109 mapping[ccd] = (int(value["id"]), value["serial"]) 

110 

111 return mapping 

112 

113 

114def compute_detector_exposure_id_generic(exposure_id, detector_num, max_num): 

115 """Compute the detector_exposure_id from the exposure id and the 

116 detector number. 

117 

118 Parameters 

119 ---------- 

120 exposure_id : `int` 

121 The exposure ID. 

122 detector_num : `int` 

123 The detector number. 

124 max_num : `int` 

125 Maximum number of detectors to make space for. 

126 

127 Returns 

128 ------- 

129 detector_exposure_id : `int` 

130 Computed ID. 

131 

132 Raises 

133 ------ 

134 ValueError 

135 The detector number is out of range. 

136 """ 

137 

138 if detector_num is None: 

139 raise ValueError("Detector number must be defined.") 

140 if detector_num >= max_num or detector_num < 0: 

141 raise ValueError(f"Detector number out of range 0 <= {detector_num} < {max_num}") 

142 

143 return max_num*exposure_id + detector_num 

144 

145 

146class LsstBaseTranslator(FitsTranslator): 

147 """Translation methods useful for all LSST-style headers.""" 

148 

149 _const_map = {} 

150 _trivial_map = {} 

151 default_resource_package = "lsst.obs.lsst" 

152 default_resource_root = "resources/corrections/" 

153 

154 # Do not specify a name for this translator 

155 cameraPolicyFile = None 

156 """Path to policy file relative to obs_lsst root.""" 

157 

158 detectorMapping = None 

159 """Mapping of detector name to detector number and serial.""" 

160 

161 detectorSerials = None 

162 """Mapping of detector serial number to raft, number, and name.""" 

163 

164 DETECTOR_MAX = 1000 

165 """Maximum number of detectors to use when calculating the 

166 detector_exposure_id. 

167 

168 Note that because this is the maximum number *of* detectors, for 

169 zero-based ``detector_num`` values this is one greater than the maximum 

170 ``detector_num``. It is also often rounded up to the nearest power of 

171 10 anyway, to allow ``detector_exposure_id`` values to be easily decoded by 

172 humans. 

173 """ 

174 

175 _DEFAULT_LOCATION = SIMONYI_LOCATION 

176 """Default telescope location in absence of relevant FITS headers.""" 

177 

178 _ROLLOVER_TIME = TimeDelta(12*60*60, scale="tai", format="sec") 

179 """Time delta for the definition of a Rubin Observatory start of day. 

180 Used when the header is missing. See LSE-400 or SITCOMTN-032 for details. 

181 """ 

182 

183 _non_sky_observation_types: tuple[str, ...] = ("bias", "dark", "flat") 

184 """Observation types that correspond to an observation where the detector 

185 can not see sky photons. 

186 """ 

187 

188 _can_check_obstype_for_can_see_sky: bool = True 

189 """If can_see_sky can not be determined, allow usage of observation type 

190 if `True`. 

191 """ 

192 

193 @classmethod 

194 def __init_subclass__(cls, **kwargs): 

195 """Ensure that subclasses clear their own detector mapping entries 

196 such that subclasses of translators that use detector mappings 

197 do not pick up the incorrect values from a parent.""" 

198 

199 cls.detectorMapping = None 

200 cls.detectorSerials = None 

201 

202 super().__init_subclass__(**kwargs) 

203 

204 @classmethod 

205 def observing_date_to_offset(cls, observing_date: astropy.time.Time) -> astropy.time.TimeDelta | None: 

206 """Return the offset to use when calculating the observing day. 

207 

208 Parameters 

209 ---------- 

210 observing_date : `astropy.time.Time` 

211 The date of the observation. Unused. 

212 

213 Returns 

214 ------- 

215 offset : `astropy.time.TimeDelta` 

216 The offset to apply. The default implementation returns a fixed 

217 number but subclasses can return a different value depending 

218 on whether the instrument is in the instrument lab or on the 

219 mountain. 

220 """ 

221 return cls._ROLLOVER_TIME 

222 

223 @classmethod 

224 def compute_detector_exposure_id(cls, exposure_id, detector_num): 

225 """Compute the detector exposure ID from detector number and 

226 exposure ID. 

227 

228 This is a helper method to allow code working outside the translator 

229 infrastructure to use the same algorithm. 

230 

231 Parameters 

232 ---------- 

233 exposure_id : `int` 

234 Unique exposure ID. 

235 detector_num : `int` 

236 Detector number. 

237 

238 Returns 

239 ------- 

240 detector_exposure_id : `int` 

241 The calculated ID. 

242 """ 

243 from .._packer import RubinDimensionPacker 

244 

245 config = RubinDimensionPacker.ConfigClass() 

246 config.use_controllers() 

247 return RubinDimensionPacker.pack_id_pair(exposure_id, detector_num, config=config) 

248 

249 @classmethod 

250 def max_detector_exposure_id(cls): 

251 """The maximum detector exposure ID expected to be generated by 

252 this instrument. 

253 

254 Returns 

255 ------- 

256 max_id : `int` 

257 The maximum value. 

258 """ 

259 max_exposure_id = cls.max_exposure_id() 

260 # We subtract 1 from DETECTOR_MAX because LSST detector_num values are 

261 # zero-based, and detector_max is the maximum number *of* detectors, 

262 # while this returns the (inclusive) maximum ID value. 

263 return cls.compute_detector_exposure_id(max_exposure_id, cls.DETECTOR_MAX - 1) 

264 

265 @classmethod 

266 def max_exposure_id(cls): 

267 """The maximum exposure ID expected from this instrument. 

268 

269 Returns 

270 ------- 

271 max_exposure_id : `int` 

272 The maximum value. 

273 

274 Notes 

275 ----- 

276 The value is hard-coded to reflect historical values that were used 

277 for various controllers before the sequence counter was unified. 

278 """ 

279 # Assumes maximum observing date of 2050-12-31, 99,999 exposures per 

280 # day and 6 controllers. 

281 return 7050123199999 

282 

283 @classmethod 

284 def detector_mapping(cls): 

285 """Returns the mapping of full name to detector ID and serial. 

286 

287 Returns 

288 ------- 

289 mapping : `dict` of `str`:`tuple` 

290 Returns the mapping of full detector name (group+detector) 

291 to detector number and serial. 

292 

293 Raises 

294 ------ 

295 ValueError 

296 Raised if no camera policy file has been registered with this 

297 translation class. 

298 

299 Notes 

300 ----- 

301 Will construct the mapping if none has previously been constructed. 

302 """ 

303 if cls.cameraPolicyFile is not None: 

304 if cls.detectorMapping is None: 

305 cls.detectorMapping = read_detector_ids(cls.cameraPolicyFile) 

306 else: 

307 raise ValueError(f"Translation class '{cls.__name__}' has no registered camera policy file") 

308 

309 return cls.detectorMapping 

310 

311 @classmethod 

312 def detector_serials(cls): 

313 """Obtain the mapping of detector serial to detector group, name, 

314 and number. 

315 

316 Returns 

317 ------- 

318 info : `dict` of `tuple` of (`str`, `str`, `int`) 

319 A `dict` with the serial numbers as keys and values of detector 

320 group, name, and number. 

321 """ 

322 if cls.detectorSerials is None: 

323 detector_mapping = cls.detector_mapping() 

324 

325 if detector_mapping is not None: 

326 # Form mapping to go from serial number to names/numbers 

327 serials = {} 

328 for fullname, (id, serial) in cls.detectorMapping.items(): 

329 raft, detector_name = fullname.split("_") 

330 if serial in serials: 

331 raise RuntimeError(f"Serial {serial} is defined in multiple places") 

332 serials[serial] = (raft, detector_name, id) 

333 cls.detectorSerials = serials 

334 else: 

335 raise RuntimeError("Unable to obtain detector mapping information") 

336 

337 return cls.detectorSerials 

338 

339 @classmethod 

340 def compute_detector_num_from_name(cls, detector_group, detector_name): 

341 """Helper method to return the detector number from the name. 

342 

343 Parameters 

344 ---------- 

345 detector_group : `str` 

346 Name of the detector grouping. This is generally the raft name. 

347 detector_name : `str` 

348 Detector name. 

349 

350 Returns 

351 ------- 

352 num : `int` 

353 Detector number. 

354 """ 

355 fullname = f"{detector_group}_{detector_name}" 

356 

357 num = None 

358 detector_mapping = cls.detector_mapping() 

359 if detector_mapping is None: 

360 raise RuntimeError("Unable to obtain detector mapping information") 

361 

362 if fullname in detector_mapping: 

363 num = detector_mapping[fullname] 

364 else: 

365 log.warning(f"Unable to determine detector number from detector name {fullname}") 

366 return None 

367 

368 return num[0] 

369 

370 @classmethod 

371 def compute_detector_info_from_serial(cls, detector_serial): 

372 """Helper method to return the detector information from the serial. 

373 

374 Parameters 

375 ---------- 

376 detector_serial : `str` 

377 Detector serial ID. 

378 

379 Returns 

380 ------- 

381 info : `tuple` of (`str`, `str`, `int`) 

382 Detector group, name, and number. 

383 """ 

384 serial_mapping = cls.detector_serials() 

385 if serial_mapping is None: 

386 raise RuntimeError("Unable to obtain serial mapping information") 

387 

388 if detector_serial in serial_mapping: 

389 info = serial_mapping[detector_serial] 

390 else: 

391 raise RuntimeError("Unable to determine detector information from detector serial" 

392 f" {detector_serial}") 

393 

394 return info 

395 

396 @staticmethod 

397 def compute_exposure_id(dayobs, seqnum, controller=None): 

398 """Helper method to calculate the exposure_id. 

399 

400 Parameters 

401 ---------- 

402 dayobs : `str` or `int` 

403 Day of observation in either YYYYMMDD or YYYY-MM-DD format. 

404 If the string looks like ISO format it will be truncated before the 

405 ``T`` before being handled. 

406 seqnum : `int` or `str` 

407 Sequence number. 

408 controller : `str`, optional 

409 Controller to use. If this is "O", no change is made to the 

410 exposure ID. Before Oct 5 2023, if it is "C" a 1000 is added to the 

411 year component of the exposure ID. If it is "H" a 2000 is added to 

412 the year component. Before Apr 18 2025, this sequence continues 

413 with "P", "Q", and "S" controllers. `None` indicates that the 

414 controller is not relevant to the exposure ID calculation 

415 (generally this is the case for test stand data). 

416 

417 Returns 

418 ------- 

419 exposure_id : `int` 

420 Exposure ID in form YYYYMMDDnnnnn form. 

421 """ 

422 if isinstance(seqnum, str): 

423 seqnum = int(seqnum) 

424 # We really want an integer but the checks require a str. 

425 if isinstance(dayobs, int): 

426 dayobs = str(dayobs) 

427 

428 if "T" in dayobs: 

429 dayobs = dayobs[:dayobs.find("T")] 

430 

431 dayobs = dayobs.replace("-", "") 

432 

433 if len(dayobs) != 8: 

434 raise ValueError(f"Malformed dayobs: {dayobs}") 

435 

436 # Expect no more than 99,999 exposures in a day 

437 if seqnum >= 10**_SEQNUM_MAXDIGITS: 

438 raise ValueError(f"Sequence number ({seqnum}) exceeds limit") 

439 

440 dayobs = int(dayobs) 

441 if dayobs > 20231004 and controller == "C": 

442 # As of this date the CCS controller has a unified counter 

443 # with the OCS, so there is no need to adjust the dayobs 

444 # to make unique exposure IDs. 

445 controller = None 

446 elif dayobs > 20250417 and controller in {"P", "S", "Q"}: 

447 # At some point in the past the PSQ and OC controller sequence 

448 # counters were unified. To avoid confusion with previous files 

449 # that may already be ingested where we do not want to change 

450 # the exposure ID, only assume identical sequences from this date. 

451 controller = None 

452 

453 # Camera control changes the exposure ID 

454 if controller is not None: 

455 index = CONTROLLERS.find(controller) 

456 if index == -1: 

457 raise ValueError(f"Supplied controller, '{controller}' is not " 

458 f"in supported list: {CONTROLLERS}") 

459 

460 # Increment a thousand years per controller 

461 dayobs += _CONTROLLER_INCREMENT * index 

462 

463 # Form the number as a string zero padding the sequence number 

464 idstr = f"{dayobs}{seqnum:0{_SEQNUM_MAXDIGITS}d}" 

465 

466 # Exposure ID has to be an integer 

467 return int(idstr) 

468 

469 @staticmethod 

470 def unpack_exposure_id(exposure_id): 

471 """Unpack an exposure ID into dayobs, seqnum, and controller. 

472 

473 Parameters 

474 ---------- 

475 exposure_id : `int` 

476 Integer exposure ID produced by `compute_exposure_id`. 

477 

478 Returns 

479 ------- 

480 dayobs : `str` 

481 Day of observation as a YYYYMMDD string. 

482 seqnum : `int` 

483 Sequence number. 

484 controller : `str` 

485 Controller code. Will be ``O`` (but should be ignored) for IDs 

486 produced by calling `compute_exposure_id` with ``controller=None``. 

487 """ 

488 dayobs, seqnum = divmod(exposure_id, 10**_SEQNUM_MAXDIGITS) 

489 controller_index = dayobs // _CONTROLLER_INCREMENT - 2 

490 dayobs -= controller_index * _CONTROLLER_INCREMENT 

491 return (str(dayobs), seqnum, CONTROLLERS[controller_index], ) 

492 

493 def _is_on_mountain(self): 

494 """Indicate whether these data are coming from the instrument 

495 installed on the mountain. 

496 

497 Returns 

498 ------- 

499 is : `bool` 

500 `True` if instrument is on the mountain. 

501 """ 

502 if "TSTAND" in self._header: 

503 return False 

504 return True 

505 

506 def is_on_sky(self): 

507 """Determine if this is an on-sky observation. 

508 

509 Returns 

510 ------- 

511 is_on_sky : `bool` 

512 Returns True if this is a observation on sky on the 

513 summit. 

514 """ 

515 # For LSST we think on sky unless tracksys is local 

516 if self.is_key_ok("TRACKSYS"): 

517 if self._header["TRACKSYS"].lower() == "local": 

518 # not on sky 

519 return False 

520 

521 # These are obviously not on sky 

522 if self.to_observation_type() in self._non_sky_observation_types: 

523 return False 

524 

525 return self._is_on_mountain() 

526 

527 @cache_translation 

528 def to_location(self): 

529 # Docstring will be inherited. Property defined in properties.py 

530 if not self._is_on_mountain(): 

531 return None 

532 try: 

533 # Try standard FITS headers 

534 return super().to_location() 

535 except (KeyError, TypeError): 

536 return self._DEFAULT_LOCATION 

537 

538 @cache_translation 

539 def to_datetime_begin(self): 

540 # Docstring will be inherited. Property defined in properties.py 

541 # Prefer -BEG over -OBS. Let it fail with KeyError if no headers 

542 # can be found. 

543 date_key = "MJD-BEG" 

544 date_fmt = "mjd" 

545 for k in ("MJD-BEG", "DATE-BEG", "MJD-OBS", "DATE-OBS"): 

546 if self.is_key_ok(k): 

547 date_key = k 

548 date_fmt = "mjd" if k.startswith("MJD") else "fits" 

549 break 

550 

551 self._used_these_cards(date_key) 

552 return Time(self._header[date_key], scale="tai", format=date_fmt) 

553 

554 @cache_translation 

555 def to_datetime_end(self): 

556 # Docstring will be inherited. Property defined in properties.py 

557 if self.is_key_ok("DATE-END"): 

558 return super().to_datetime_end() 

559 

560 exposure_time = self.to_exposure_time() 

561 if exposure_time.value < 0.0: 

562 # Some translators deliberately return -1.0s if the exposure 

563 # time can not be determined. In that scenario set end time 

564 # to the same value as the start time. 

565 return self.to_datetime_begin() 

566 

567 return self.to_datetime_begin() + exposure_time 

568 

569 @cache_translation 

570 def to_detector_num(self): 

571 # Docstring will be inherited. Property defined in properties.py 

572 raft = self.to_detector_group() 

573 detector = self.to_detector_name() 

574 return self.compute_detector_num_from_name(raft, detector) 

575 

576 @cache_translation 

577 def to_detector_exposure_id(self): 

578 # Docstring will be inherited. Property defined in properties.py 

579 exposure_id = self.to_exposure_id() 

580 num = self.to_detector_num() 

581 return self.compute_detector_exposure_id(exposure_id, num) 

582 

583 @cache_translation 

584 def to_observation_type(self): 

585 # Docstring will be inherited. Property defined in properties.py 

586 obstype = self._header["IMGTYPE"] 

587 self._used_these_cards("IMGTYPE") 

588 obstype = obstype.lower() 

589 if obstype in ("skyexp", "object"): 

590 obstype = "science" 

591 return obstype 

592 

593 @cache_translation 

594 def to_observation_reason(self): 

595 # Docstring will be inherited. Property defined in properties.py 

596 for key in ("REASON", "TESTTYPE"): 

597 if self.is_key_ok(key): 

598 reason = self._header[key] 

599 self._used_these_cards(key) 

600 return reason.lower() 

601 # no specific header present so use the default translation 

602 return super().to_observation_reason() 

603 

604 @cache_translation 

605 def to_dark_time(self): 

606 """Calculate the dark time. 

607 

608 If a DARKTIME header is not found, the value is assumed to be 

609 identical to the exposure time. 

610 

611 Returns 

612 ------- 

613 dark : `astropy.units.Quantity` 

614 The dark time in seconds. 

615 """ 

616 if self.is_key_ok("DARKTIME"): 

617 darktime = self._header["DARKTIME"]*u.s 

618 self._used_these_cards("DARKTIME") 

619 else: 

620 log.warning("%s: Unable to determine dark time. Setting from exposure time.", 

621 self._log_prefix) 

622 darktime = self.to_exposure_time() 

623 return darktime 

624 

625 def _get_controller_code(self) -> str | None: 

626 """Return the controller code. 

627 

628 Returns 

629 ------- 

630 code : `str` 

631 Single character code representing the controller. Returns 

632 `None` if no controller can be determined. 

633 """ 

634 key = "CONTRLLR" 

635 if self.is_key_ok(key): 

636 controller = self._header[key] 

637 self._used_these_cards(key) 

638 else: 

639 controller = None 

640 return controller 

641 

642 @cache_translation 

643 def to_exposure_id(self): 

644 """Generate a unique exposure ID number 

645 

646 This is a combination of DAYOBS and SEQNUM, and optionally 

647 CONTRLLR. 

648 

649 Returns 

650 ------- 

651 exposure_id : `int` 

652 Unique exposure number. 

653 """ 

654 if "CALIB_ID" in self._header: 

655 self._used_these_cards("CALIB_ID") 

656 return None 

657 

658 dayobs = self._header["DAYOBS"] 

659 seqnum = self._header["SEQNUM"] 

660 self._used_these_cards("DAYOBS", "SEQNUM") 

661 

662 controller = self._get_controller_code() 

663 

664 return self.compute_exposure_id(dayobs, seqnum, controller=controller) 

665 

666 @cache_translation 

667 def to_visit_id(self): 

668 """Calculate the visit associated with this exposure. 

669 

670 Notes 

671 ----- 

672 For LATISS and LSSTCam the default visit is derived from the 

673 exposure group. For other instruments we return the exposure_id. 

674 """ 

675 

676 exposure_group = self.to_exposure_group() 

677 # If the group is an int we return it 

678 try: 

679 visit_id = int(exposure_group) 

680 return visit_id 

681 except ValueError: 

682 pass 

683 

684 # A Group is defined as ISO date with an extension 

685 # The integer must be the same for a given group so we can never 

686 # use datetime_begin. 

687 # Nominally a GROUPID looks like "ISODATE+N" where the +N is 

688 # optional. This can be converted to seconds since epoch with 

689 # an adjustment for N. 

690 # For early data lacking that form we hash the group and return 

691 # the int. 

692 matches_date = GROUP_RE.match(exposure_group) 

693 if matches_date: 

694 iso_str = matches_date.group(1) 

695 fraction = matches_date.group(2) 

696 n = matches_date.group(3) 

697 if n is not None: 

698 n = int(n) 

699 else: 

700 n = 0 

701 iso = datetime.datetime.strptime(iso_str, "%Y-%m-%dT%H:%M:%S") 

702 

703 tdelta = iso - TZERO_DATETIME 

704 epoch = int(tdelta.total_seconds()) 

705 

706 # Form the integer from EPOCH + 3 DIGIT FRAC + 0-pad N 

707 visit_id = int(f"{epoch}{fraction}{n:04d}") 

708 else: 

709 # Non-standard string so convert to numbers 

710 # using a hash function. Use the first N hex digits 

711 group_bytes = exposure_group.encode("us-ascii") 

712 hasher = hashlib.blake2b(group_bytes) 

713 # Need to be big enough it does not possibly clash with the 

714 # date-based version above 

715 digest = hasher.hexdigest()[:14] 

716 visit_id = int(digest, base=16) 

717 

718 # To help with hash collision, append the string length 

719 visit_id = int(f"{visit_id}{len(exposure_group):02d}") 

720 

721 return visit_id 

722 

723 @cache_translation 

724 def to_physical_filter(self): 

725 """Calculate the physical filter name. 

726 

727 Returns 

728 ------- 

729 filter : `str` 

730 Name of filter. Can be a combination of FILTER, FILTER1 and FILTER2 

731 headers joined by a "~". Returns "unknown" if no filter is declared 

732 """ 

733 joined = self._join_keyword_values(["FILTER", "FILTER1", "FILTER2"], delim=FILTER_DELIMITER) 

734 if not joined: 

735 joined = "unknown" 

736 

737 # Replace instances of "NONE" with "none". 

738 joined = joined.replace("NONE", "none") 

739 

740 return joined 

741 

742 @cache_translation 

743 def to_tracking_radec(self): 

744 # Do not even attempt to attach an RA/Dec for observations that we 

745 # know are not going to be tracking. The Rubin OCS can sometimes 

746 # report the telescope is tracking when it's not when doing 

747 # calibrations like these. Darks are sometimes taken whilst tracking 

748 # to test stability so those are special-cased. 

749 non_sky_obstypes = {t for t in self._non_sky_observation_types if t != "dark"} 

750 if self.to_observation_type() in non_sky_obstypes: 

751 return None 

752 

753 # Not an observation that is tracking in RA/Dec so it is not 

754 # appropriate to report a value for this. 

755 if self.are_keys_ok(["TRACKSYS"]) and self._header["TRACKSYS"] != "RADEC": 

756 return None 

757 

758 # RA/DEC are *derived* headers and for the case where the DATE-BEG 

759 # is 1970 they are garbage and should not be used. 

760 try: 

761 if self._header.get("DATE-OBS") == self._header["DATE"]: 

762 # A fixed up date -- use AZEL as source of truth 

763 altaz = self.to_altaz_begin() 

764 radec = astropy.coordinates.SkyCoord(altaz.transform_to(astropy.coordinates.ICRS()), 

765 obstime=altaz.obstime, 

766 location=altaz.location) 

767 else: 

768 radecsys = ("RADESYS",) 

769 radecpairs = (("RASTART", "DECSTART"), ("RA", "DEC")) 

770 radec = tracking_from_degree_headers(self, radecsys, radecpairs) 

771 except Exception: 

772 # If this observation was not formally on sky then we are allowed 

773 # to return None. 

774 if self.is_on_sky(): 

775 raise 

776 radec = None 

777 

778 return radec 

779 

780 @cache_translation 

781 def to_altaz_begin(self): 

782 return self._to_altaz("AZSTART", "ELSTART") 

783 

784 @cache_translation 

785 def to_altaz_end(self): 

786 return self._to_altaz("AZEND", "ELEND") 

787 

788 def _to_altaz(self, az_key, el_key): 

789 if not self._is_on_mountain(): 

790 return None 

791 

792 # H controller data are sometimes science observations without 

793 # having AZx header. The code lets those return nothing. 

794 if self._get_controller_code() == "H" and not self.are_keys_ok([el_key, az_key]): 

795 return None 

796 

797 # Always attempt to find the alt/az values regardless of observation 

798 # type. 

799 altaz = altaz_from_degree_headers(self, ((el_key, az_key),), 

800 self.to_datetime_begin(), is_zd=False, max_alt=95.55, min_alt=-5.55) 

801 self._used_these_cards(el_key, az_key) 

802 return altaz 

803 

804 @cache_translation 

805 def to_exposure_group(self): 

806 """Calculate the exposure group string. 

807 

808 For LSSTCam and LATISS this is read from the ``GROUPID`` header. 

809 If that header is missing the exposure_id is returned instead as 

810 a string. 

811 """ 

812 if self.is_key_ok("GROUPID"): 

813 exposure_group = self._header["GROUPID"] 

814 self._used_these_cards("GROUPID") 

815 # Sometimes people forget to quote date strings in YAML 

816 # correction files. This is a problem because we are assuming 

817 # strings for matching across multiple exposures and if the 

818 # value in the YAML file is not milliseconds then there is 

819 # a potential disaster. 

820 if isinstance(exposure_group, datetime.datetime): 

821 exposure_group = exposure_group.isoformat(timespec="milliseconds") 

822 return exposure_group 

823 return super().to_exposure_group() 

824 

825 @cache_translation 

826 def to_focus_z(self): 

827 """Return the defocal distance of the camera in units of mm. 

828 If there is no ``FOCUSZ`` value in the header it will return 

829 the default 0.0mm value. 

830 

831 Returns 

832 ------- 

833 focus_z: `astropy.units.Quantity` 

834 The defocal distance from header in mm or the 0.0mm default 

835 """ 

836 if self.is_key_ok("FOCUSZ"): 

837 # Some broken files have strings instead of floats. 

838 focus_z = float(self._header["FOCUSZ"]) 

839 return focus_z * u.mm 

840 return super().to_focus_z() 

841 

842 @staticmethod 

843 def _is_filter_empty(filter): 

844 """Return true if the supplied filter indicates an empty filter slot 

845 

846 Parameters 

847 ---------- 

848 filter : `str` 

849 The filter string to check. 

850 

851 Returns 

852 ------- 

853 is_empty : `bool` 

854 `True` if the filter string looks like it is referring to an 

855 empty filter slot. For example this can be if the filter is 

856 "empty" or "empty_2". 

857 """ 

858 return bool(re.match(r"empty_?\d*$", filter.lower())) 

859 

860 def _determine_primary_filter(self): 

861 """Determine the primary filter from the ``FILTER`` header. 

862 

863 Returns 

864 ------- 

865 filter : `str` 

866 The contents of the ``FILTER`` header with some appropriate 

867 defaulting. 

868 """ 

869 

870 if self.is_key_ok("FILTER"): 

871 physical_filter = self._header["FILTER"] 

872 self._used_these_cards("FILTER") 

873 

874 if self._is_filter_empty(physical_filter): 

875 physical_filter = "empty" 

876 else: 

877 # Be explicit about having no knowledge of the filter 

878 # by setting it to "unknown". It should always have a value. 

879 physical_filter = "unknown" 

880 

881 # Warn if the filter being unknown is important 

882 obstype = self.to_observation_type() 

883 if obstype not in ("bias", "dark"): 

884 log.warning("%s: Unable to determine the filter", 

885 self._log_prefix) 

886 

887 return physical_filter 

888 

889 @cache_translation 

890 def to_observing_day(self): 

891 """Return the day of observation as YYYYMMDD integer. 

892 

893 For LSSTCam and other compliant instruments this is the value 

894 of the DAYOBS header. 

895 

896 Returns 

897 ------- 

898 obs_day : `int` 

899 The day of observation. 

900 """ 

901 if self.is_key_ok("DAYOBS"): 

902 self._used_these_cards("DAYOBS") 

903 return int(self._header["DAYOBS"]) 

904 

905 return super().to_observing_day() 

906 

907 @cache_translation 

908 def to_observation_counter(self): 

909 """Return the sequence number within the observing day. 

910 

911 Returns 

912 ------- 

913 counter : `int` 

914 The sequence number for this day. 

915 """ 

916 if self.is_key_ok("SEQNUM"): 

917 # Some older LATISS data may not have the header 

918 # but this is corrected in fix_header for LATISS. 

919 self._used_these_cards("SEQNUM") 

920 return int(self._header["SEQNUM"]) 

921 

922 # This indicates a problem so we warn and return a 0 

923 log.warning("%s: Unable to determine the observation counter so returning 0", 

924 self._log_prefix) 

925 return 0 

926 

927 @cache_translation 

928 def to_boresight_rotation_coord(self): 

929 """Boresight rotation angle. 

930 

931 Only relevant for science observations. 

932 """ 

933 unknown = "unknown" 

934 if not self.is_on_sky(): 

935 return unknown 

936 

937 self._used_these_cards("ROTCOORD") 

938 coord = self._header.get("ROTCOORD", unknown) 

939 if coord is None: 

940 coord = unknown 

941 return coord 

942 

943 @cache_translation 

944 def to_boresight_airmass(self): 

945 """Calculate airmass at boresight at start of observation. 

946 

947 Notes 

948 ----- 

949 Early data are missing AMSTART header so we fall back to calculating 

950 it from ELSTART. 

951 """ 

952 if not self.is_on_sky(): 

953 return None 

954 

955 # This observation should have AMSTART 

956 amkey = "AMSTART" 

957 if self.is_key_ok(amkey): 

958 self._used_these_cards(amkey) 

959 return self._header[amkey] 

960 

961 # Instead we need to look at azel 

962 altaz = self.to_altaz_begin() 

963 if altaz is not None: 

964 return altaz.secz.to_value() 

965 

966 log.warning("%s: Unable to determine airmass of a science observation, returning 1.", 

967 self._log_prefix) 

968 return 1.0 

969 

970 @cache_translation 

971 def to_group_counter_start(self): 

972 # Effectively the start of the visit as determined by the headers. 

973 counter = self.to_observation_counter() 

974 # Older data does not have the CURINDEX header. 

975 if self.is_key_ok("CURINDEX"): 

976 # CURINDEX is 1-based. 

977 seq_start = counter - self._header["CURINDEX"] + 1 

978 self._used_these_cards("CURINDEX") 

979 return seq_start 

980 else: 

981 # If the counter is 0 we need to pick something else 

982 # that is not going to confuse the visit calculation 

983 # (since setting everything to 0 will make one big visit). 

984 return counter if counter != 0 else self.to_exposure_id() 

985 

986 @cache_translation 

987 def to_group_counter_end(self): 

988 # Effectively the end of the visit as determined by the headers. 

989 counter = self.to_observation_counter() 

990 # Older data does not have the CURINDEX or MAXINDEX headers. 

991 if self.is_key_ok("CURINDEX") and self.is_key_ok("MAXINDEX"): 

992 # CURINDEX is 1-based. CURINDEX == MAXINDEX indicates the 

993 # final exposure in the sequence. 

994 remaining = self._header["MAXINDEX"] - self._header["CURINDEX"] 

995 seq_end = counter + remaining 

996 self._used_these_cards("CURINDEX", "MAXINDEX") 

997 return seq_end 

998 else: 

999 # If the counter is 0 we need to pick something else 

1000 # that is not going to confuse the visit calculation 

1001 # (since setting everything to 0 will make one big visit). 

1002 return counter if counter != 0 else self.to_exposure_id() 

1003 

1004 @cache_translation 

1005 def to_has_simulated_content(self): 

1006 # Check all the simulation flags. 

1007 # We do not know all the simulation flags that we may have so 

1008 # must check every header key. Ideally HIERARCH SIMULATE would 

1009 # be a hierarchical header so _header["SIMULATE"] would return 

1010 # everything. The header looks like: 

1011 # 

1012 # HIERARCH SIMULATE ATMCS = / ATMCS Simulation Mode 

1013 # HIERARCH SIMULATE ATHEXAPOD = 0 / ATHexapod Simulation Mode 

1014 # HIERARCH SIMULATE ATPNEUMATICS = / ATPneumatics Simulation Mode 

1015 # HIERARCH SIMULATE ATDOME = 1 / ATDome Simulation Mode 

1016 # HIERARCH SIMULATE ATSPECTROGRAPH = 0 / ATSpectrograph Simulation Mode 

1017 # 

1018 # So any header that includes "SIMULATE" in the key name and has a 

1019 # true value implies that something in the data is simulated. 

1020 for k, v in self._header.items(): 

1021 if "SIMULATE" in k and v: 

1022 self._used_these_cards(k) 

1023 return True 

1024 

1025 # If the controller is H, P, S, or Q then the data are simulated. 

1026 controller = self._get_controller_code() 

1027 if controller: 

1028 if controller in SIMULATED_CONTROLLERS: 

1029 return True 

1030 

1031 # No simulation flags set. 

1032 return False 

1033 

1034 @cache_translation 

1035 def to_relative_humidity(self) -> float | None: 

1036 key = "HUMIDITY" 

1037 if self.is_key_ok(key): 

1038 self._used_these_cards(key) 

1039 return self._header[key] 

1040 

1041 return None 

1042 

1043 @cache_translation 

1044 def to_pressure(self): 

1045 key = "PRESSURE" 

1046 if self.is_key_ok(key): 

1047 value = self._header[key] 

1048 self._used_these_cards(key) 

1049 # There has been an inconsistency in units for the pressure reading 

1050 # so we need to adjust for this. 

1051 if value > 10_000: 

1052 unit = u.Pa 

1053 else: 

1054 unit = u.hPa 

1055 return value * unit 

1056 

1057 return None 

1058 

1059 @cache_translation 

1060 def to_temperature(self): 

1061 key = "AIRTEMP" 

1062 if self.is_key_ok(key): 

1063 self._used_these_cards(key) 

1064 return self._header[key] * u.deg_C 

1065 return None 

1066 

1067 @cache_translation 

1068 def to_can_see_sky(self) -> bool | None: 

1069 key = "SHUTTIME" 

1070 if self.is_key_ok(key) and self._header[key] == 0.0: 

1071 # Shutter never opened so impossible to see sky. 

1072 self._used_these_cards(key) 

1073 return False 

1074 

1075 key = "VIGN_MIN" 

1076 if self.is_key_ok(key): 

1077 self._used_these_cards(key) 

1078 vignetted = self._header[key] 

1079 match vignetted: 

1080 case "FULLY": 

1081 return False 

1082 case "UNKNOWN": 

1083 return None 

1084 case _: 

1085 return True 

1086 

1087 # Fallback to using the observation type if the key is missing. 

1088 # PhoSim always falls back. 

1089 if self._can_check_obstype_for_can_see_sky or self._get_controller_code() == "H": 

1090 return super().to_can_see_sky() 

1091 

1092 # Unknown state. 

1093 return None