Coverage for python / lsst / obs / cfht / rawFormatter.py: 0%
43 statements
« prev ^ index » next coverage.py v7.13.5, created at 2026-04-15 00:11 +0000
« prev ^ index » next coverage.py v7.13.5, created at 2026-04-15 00:11 +0000
1# This file is part of obs_cfht.
2#
3# Developed for the LSST Data Management System.
4# This product includes software developed by the LSST Project
5# (http://www.lsst.org).
6# See the COPYRIGHT file at the top-level directory of this distribution
7# for details of code ownership.
8#
9# This program is free software: you can redistribute it and/or modify
10# it under the terms of the GNU General Public License as published by
11# the Free Software Foundation, either version 3 of the License, or
12# (at your option) any later version.
13#
14# This program is distributed in the hope that it will be useful,
15# but WITHOUT ANY WARRANTY; without even the implied warranty of
16# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
17# GNU General Public License for more details.
18#
19# You should have received a copy of the GNU General Public License
20# along with this program. If not, see <http://www.gnu.org/licenses/>.
22import lsst.afw.image
23import lsst.afw.fits
24from lsst.obs.base import FitsRawFormatterBase
25from astro_metadata_translator import MegaPrimeTranslator, fix_header
26import logging
28from .cfhtFilters import MEGAPRIME_FILTER_DEFINITIONS
29from ._instrument import MegaPrime
31__all__ = ("MegaPrimeRawFormatter",)
33log = logging.getLogger(__name__)
36class MegaPrimeRawFormatter(FitsRawFormatterBase):
37 """Gen3 Butler formatter for MegaPrime raw data.
39 MegaPrime uses multi-extension FITS files.
40 """
42 translatorClass = MegaPrimeTranslator
43 filterDefinitions = MEGAPRIME_FILTER_DEFINITIONS
45 def getDetector(self, id):
46 return MegaPrime().getCamera()[id]
48 def _toExtName(self, detectorId):
49 """Return the extension name associated with the given detector.
51 Parameters
52 ----------
53 detectorId : `int`
54 The detector ID to search for.
56 Returns
57 -------
58 name : `str`
59 The name of the extension associated with this detector.
60 """
61 return f"ccd{detectorId:02d}"
63 def _scanHdus(self, filename, detectorId):
64 """Scan through a file for the HDU containing data from one detector.
66 Parameters
67 ----------
68 filename : `str`
69 The file to search through.
70 detectorId : `int`
71 The detector id to search for.
73 Returns
74 -------
75 index : `int`
76 The index of the HDU with the requested data.
77 metadata: `lsst.daf.base.PropertyList`
78 The metadata read from the header for that detector id.
80 Raises
81 ------
82 ValueError
83 Raised if detectorId is not found in any of the file HDUs
84 """
85 fitsData = lsst.afw.fits.Fits(filename, 'r')
86 # NOTE: The primary header (HDU=0) does not contain detector data.
87 extname = self._toExtName(detectorId)
88 for i in range(1, fitsData.countHdus()):
89 fitsData.setHdu(i)
90 metadata = fitsData.readMetadata()
91 if metadata.get("EXTNAME") == extname:
92 log.debug("Found detector %s in extension %s", detectorId, i)
93 return i, metadata
94 else:
95 raise ValueError(f"Did not find detectorId={detectorId} in any HDU of {filename}.")
97 def _determineHDU(self, detectorId):
98 """Determine the correct HDU number for a given detector id.
100 Parameters
101 ----------
102 detectorId : `int`
103 The detector id to search for.
105 Returns
106 -------
107 index : `int`
108 The index of the HDU with the requested data.
109 metadata : `lsst.daf.base.PropertyList`
110 The metadata read from the header for that detector id.
112 Raises
113 ------
114 ValueError
115 Raised if detectorId is not found in any of the file HDUs
116 """
117 filename = self._reader_path
118 # We start by assuming that ccdN is HDU N+1
119 index = detectorId + 1
120 reader = self.ReaderClass(filename, hdu=index)
121 metadata = reader.readMetadata()
123 # There may be two EXTNAME headers but the second one is the one
124 # we want (the first indicates compression).
125 if metadata.get("EXTNAME") == self._toExtName(detectorId):
126 return index, metadata
128 log.info("Did not find detector=%s at expected HDU=%s in %s: scanning through all HDUs.",
129 detectorId, index, filename)
130 return self._scanHdus(filename, detectorId)
132 def readMetadata(self):
133 # Headers are duplicated so no need to merge with primary
134 index, metadata = self._determineHDU(self.data_id["detector"])
135 fix_header(metadata)
136 return metadata
138 def readImage(self):
139 index, metadata = self._determineHDU(self.data_id["detector"])
140 reader = self.ReaderClass(self._reader_path, hdu=index)
141 return reader.read()