Coverage for python / lsst / obs / cfht / rawFormatter.py: 0%

43 statements  

« prev     ^ index     » next       coverage.py v7.13.5, created at 2026-04-26 09:23 +0000

1# This file is part of obs_cfht. 

2# 

3# Developed for the LSST Data Management System. 

4# This product includes software developed by the LSST Project 

5# (http://www.lsst.org). 

6# See the COPYRIGHT file at the top-level directory of this distribution 

7# for details of code ownership. 

8# 

9# This program is free software: you can redistribute it and/or modify 

10# it under the terms of the GNU General Public License as published by 

11# the Free Software Foundation, either version 3 of the License, or 

12# (at your option) any later version. 

13# 

14# This program is distributed in the hope that it will be useful, 

15# but WITHOUT ANY WARRANTY; without even the implied warranty of 

16# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the 

17# GNU General Public License for more details. 

18# 

19# You should have received a copy of the GNU General Public License 

20# along with this program. If not, see <http://www.gnu.org/licenses/>. 

21 

22import lsst.afw.image 

23import lsst.afw.fits 

24from lsst.obs.base import FitsRawFormatterBase 

25from astro_metadata_translator import MegaPrimeTranslator, fix_header 

26import logging 

27 

28from .cfhtFilters import MEGAPRIME_FILTER_DEFINITIONS 

29from ._instrument import MegaPrime 

30 

31__all__ = ("MegaPrimeRawFormatter",) 

32 

33log = logging.getLogger(__name__) 

34 

35 

class MegaPrimeRawFormatter(FitsRawFormatterBase):
    """Gen3 Butler formatter for MegaPrime raw data.

    MegaPrime uses multi-extension FITS files.
    """

    translatorClass = MegaPrimeTranslator
    filterDefinitions = MEGAPRIME_FILTER_DEFINITIONS

    def getDetector(self, id):
        """Return the camera detector for the given detector id.

        Parameters
        ----------
        id : `int`
            The detector ID used to index the MegaPrime camera.

        Returns
        -------
        detector
            The entry of ``MegaPrime().getCamera()`` selected by ``id``.
        """
        return MegaPrime().getCamera()[id]

    def _toExtName(self, detectorId):
        """Return the extension name associated with the given detector.

        Parameters
        ----------
        detectorId : `int`
            The detector ID to search for.

        Returns
        -------
        name : `str`
            The name of the extension associated with this detector.
        """
        return f"ccd{detectorId:02d}"

    def _scanHdus(self, filename, detectorId):
        """Scan through a file for the HDU containing data from one detector.

        Parameters
        ----------
        filename : `str`
            The file to search through.
        detectorId : `int`
            The detector id to search for.

        Returns
        -------
        index : `int`
            The index of the HDU with the requested data.
        metadata : `lsst.daf.base.PropertyList`
            The metadata read from the header for that detector id.

        Raises
        ------
        ValueError
            Raised if detectorId is not found in any of the file HDUs
        """
        extname = self._toExtName(detectorId)
        # Use the FITS handle as a context manager so the file is closed
        # deterministically, whether we return a match or fall through to
        # the error below.
        with lsst.afw.fits.Fits(filename, 'r') as fitsData:
            # NOTE: The primary header (HDU=0) does not contain detector data.
            for i in range(1, fitsData.countHdus()):
                fitsData.setHdu(i)
                metadata = fitsData.readMetadata()
                if metadata.get("EXTNAME") == extname:
                    log.debug("Found detector %s in extension %s", detectorId, i)
                    return i, metadata

        # Only reached when every extension HDU was inspected without a match.
        raise ValueError(f"Did not find detectorId={detectorId} in any HDU of {filename}.")

    def _determineHDU(self, detectorId):
        """Determine the correct HDU number for a given detector id.

        The expected location (HDU ``detectorId + 1``) is checked first; if
        its ``EXTNAME`` does not match, every HDU in the file is scanned.

        Parameters
        ----------
        detectorId : `int`
            The detector id to search for.

        Returns
        -------
        index : `int`
            The index of the HDU with the requested data.
        metadata : `lsst.daf.base.PropertyList`
            The metadata read from the header for that detector id.

        Raises
        ------
        ValueError
            Raised if detectorId is not found in any of the file HDUs
        """
        filename = self._reader_path
        # We start by assuming that ccdN is HDU N+1
        index = detectorId + 1
        reader = self.ReaderClass(filename, hdu=index)
        metadata = reader.readMetadata()

        # There may be two EXTNAME headers but the second one is the one
        # we want (the first indicates compression).
        if metadata.get("EXTNAME") == self._toExtName(detectorId):
            return index, metadata

        log.info("Did not find detector=%s at expected HDU=%s in %s: scanning through all HDUs.",
                 detectorId, index, filename)
        return self._scanHdus(filename, detectorId)

    def readMetadata(self):
        """Read the header of the HDU holding this data ID's detector.

        Returns
        -------
        metadata : `lsst.daf.base.PropertyList`
            The detector HDU header after ``fix_header`` has been applied
            to it.

        Raises
        ------
        ValueError
            Raised if the detector is not found in any of the file HDUs
        """
        # Headers are duplicated so no need to merge with primary
        _, metadata = self._determineHDU(self.data_id["detector"])
        fix_header(metadata)
        return metadata

    def readImage(self):
        """Read the pixel data from the HDU holding this data ID's detector.

        Returns
        -------
        image
            Whatever ``ReaderClass(...).read()`` returns for the located HDU.

        Raises
        ------
        ValueError
            Raised if the detector is not found in any of the file HDUs
        """
        # Only the HDU index is needed here; the header is discarded.
        index, _ = self._determineHDU(self.data_id["detector"])
        reader = self.ReaderClass(self._reader_path, hdu=index)
        return reader.read()

141 return reader.read()