Coverage for tests / test_calibrate.py: 27%

117 statements  

« prev     ^ index     » next       coverage.py v7.13.5, created at 2026-04-24 08:38 +0000

1# This file is part of pipe_tasks. 

2# 

3# Developed for the LSST Data Management System. 

4# This product includes software developed by the LSST Project 

5# (https://www.lsst.org). 

6# See the COPYRIGHT file at the top-level directory of this distribution 

7# for details of code ownership. 

8# 

9# This program is free software: you can redistribute it and/or modify 

10# it under the terms of the GNU General Public License as published by 

11# the Free Software Foundation, either version 3 of the License, or 

12# (at your option) any later version. 

13# 

14# This program is distributed in the hope that it will be useful, 

15# but WITHOUT ANY WARRANTY; without even the implied warranty of 

16# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the 

17# GNU General Public License for more details. 

18# 

19# You should have received a copy of the GNU General Public License 

20# along with this program. If not, see <https://www.gnu.org/licenses/>. 

21 

22"""Test ProcessCcdTask and its immediate subtasks. 

23""" 

24import logging 

25import os 

26import shutil 

27import tempfile 

28import unittest 

29 

30import lsst.utils.tests 

31import lsst.afw.image 

32import lsst.afw.math 

33import lsst.afw.table 

34import lsst.daf.butler.tests as butlerTests 

35from lsst.pipe.base import testUtils 

36from lsst.pipe.tasks.calibrate import CalibrateTask, CalibrateConfig 

37from lsst.pipe.tasks.characterizeImage import CharacterizeImageTask, CharacterizeImageConfig 

38import lsst.meas.extensions.piff.piffPsfDeterminer 

39 

40TESTDIR = os.path.abspath(os.path.dirname(__file__)) 

41 

42 

43class CalibrateTaskTestCaseWithButler(lsst.utils.tests.TestCase): 

44 

45 @classmethod 

46 def _makeTestRepo(cls, root): 

47 """Create a repository with the metadata assumed by CalibrateTask. 

48 """ 

49 # In-memory for performance 

50 config = lsst.daf.butler.Config() 

51 config["datastore", "cls"] = "lsst.daf.butler.datastores.inMemoryDatastore.InMemoryDatastore" 

52 config["datastore", "checksum"] = False 

53 config["registry", "db"] = "sqlite:///:memory:" 

54 

55 butler = lsst.daf.butler.Butler.from_config( 

56 lsst.daf.butler.Butler.makeRepo(root, config=config), writeable=True 

57 ) 

58 cls.enterClassContext(butler) 

59 butler.registry.insertDimensionData( 

60 "instrument", 

61 {"name": "notACam", "visit_max": 256, "exposure_max": 256, "detector_max": 64}) 

62 butler.registry.insertDimensionData( 

63 "physical_filter", 

64 {"instrument": "notACam", "name": "r", "band": "r"}, 

65 ) 

66 if "day_obs" in butler.dimensions: 

67 butler.registry.insertDimensionData( 

68 "day_obs", 

69 {"id": 20240201, "instrument": "notACam"}, 

70 ) 

71 butler.registry.insertDimensionData( 

72 "visit", 

73 {"instrument": "notACam", "id": 101, "name": "101", "physical_filter": "r", "day_obs": 20240201}, 

74 ) 

75 butler.registry.insertDimensionData("detector", 

76 {"instrument": "notACam", "id": 42, "full_name": "42"}) 

77 return butler 

78 

79 @classmethod 

80 def setUpClass(cls): 

81 super().setUpClass() 

82 

83 cls.root = tempfile.mkdtemp() 

84 cls.repo = cls._makeTestRepo(cls.root) 

85 

86 butlerTests.addDatasetType( 

87 cls.repo, "icExp", {"instrument", "visit", "detector"}, 

88 "ExposureF") 

89 butlerTests.addDatasetType( 

90 cls.repo, "icExpBackground", {"instrument", "visit", "detector"}, 

91 "Background") 

92 butlerTests.addDatasetType( 

93 cls.repo, "icSrc", {"instrument", "visit", "detector"}, 

94 "SourceCatalog") 

95 butlerTests.addDatasetType( 

96 cls.repo, "cal_ref_cat", {"htm7"}, 

97 "SimpleCatalog") 

98 butlerTests.addDatasetType( 

99 cls.repo, "calexp", {"instrument", "visit", "detector"}, 

100 "ExposureF") 

101 butlerTests.addDatasetType( 

102 cls.repo, "src", {"instrument", "visit", "detector"}, 

103 "SourceCatalog") 

104 butlerTests.addDatasetType( 

105 cls.repo, "calexpBackground", {"instrument", "visit", "detector"}, 

106 "Background") 

107 butlerTests.addDatasetType( 

108 cls.repo, "srcMatch", {"instrument", "visit", "detector"}, 

109 "Catalog") 

110 butlerTests.addDatasetType( 

111 cls.repo, "srcMatchFull", {"instrument", "visit", "detector"}, 

112 "Catalog") 

113 

114 @classmethod 

115 def tearDownClass(cls): 

116 shutil.rmtree(cls.root, ignore_errors=True) 

117 super().tearDownClass() 

118 

119 def setUp(self): 

120 super().setUp() 

121 self.butler = butlerTests.makeTestCollection(self.repo, uniqueId=self.id()) 

122 

123 self.dataId = {"instrument": "notACam", "visit": 101, "detector": 42} 

124 # CalibrateTask absolutely requires an ExpandedDataCoordinate 

125 self.dataId = self.butler.registry.expandDataId(self.dataId) 

126 self.refcatId = {"htm7": 189584} 

127 

128 # Tests do no processing, so we don't need real data 

129 self.exposure = lsst.afw.image.ExposureF(10, 10) 

130 background = lsst.afw.math.BackgroundMI(self.exposure.getBBox(), self.exposure.getMaskedImage()) 

131 self.backgroundlist = lsst.afw.math.BackgroundList( 

132 (background, lsst.afw.math.Interpolate.UNKNOWN, lsst.afw.math.UndersampleStyle.THROW_EXCEPTION, 

133 lsst.afw.math.ApproximateControl.UNKNOWN, 0, 0, 1)) 

134 self.icSrc = lsst.afw.table.SourceCatalog() 

135 self.refcat = lsst.afw.table.SimpleCatalog() 

136 

137 self.butler.put(self.exposure, "icExp", self.dataId) 

138 self.butler.put(self.backgroundlist, "icExpBackground", self.dataId) 

139 self.butler.put(self.icSrc, "icSrc", self.dataId) 

140 self.butler.put(self.refcat, "cal_ref_cat", self.refcatId) 

141 

142 def testDoAstrometry(self): 

143 """Ensure correct inputs passed to run whether or not doAstrometry 

144 is set. 

145 """ 

146 allIds = {key: self.dataId for key in { 

147 "exposure", "background", "icSourceCat", "outputExposure", "outputCat", "outputBackground", 

148 "matches", "matchesDenormalized" 

149 }} 

150 allIds.update({key: [self.refcatId] for key in {"astromRefCat", "photoRefCat"}}) 

151 

152 self._checkDoRefcats(doAstrometry=True, doPhotoCal=True, ids=allIds) 

153 self._checkDoRefcats(doAstrometry=False, doPhotoCal=True, ids=allIds) 

154 

155 def testDoPhotoCal(self): 

156 """Ensure correct inputs passed to run whether or not doPhotoCal 

157 is set. 

158 """ 

159 allIds = {key: self.dataId for key in { 

160 "exposure", "background", "icSourceCat", "outputExposure", "outputCat", "outputBackground", 

161 "matches", "matchesDenormalized" 

162 }} 

163 allIds.update({key: [self.refcatId] for key in {"astromRefCat", "photoRefCat"}}) 

164 

165 self._checkDoRefcats(doAstrometry=True, doPhotoCal=True, ids=allIds) 

166 self._checkDoRefcats(doAstrometry=True, doPhotoCal=False, ids=allIds) 

167 

168 def _checkDoRefcats(self, doAstrometry, doPhotoCal, ids): 

169 """Test whether run is called with the correct arguments. 

170 

171 In the case of `CalibrateTask`, the inputs should not depend on the 

172 task configuration. 

173 

174 Parameters 

175 ---------- 

176 doAstrometry, doPhotoCal : `bool` 

177 Values of the config flags of the same name. 

178 ids : `dict` [`str`] 

179 A mapping from the input dataset type to the data ID of the 

180 dataset to process. 

181 """ 

182 config = CalibrateConfig() 

183 config.doWriteMatches = False # no real output to write 

184 config.doAstrometry = doAstrometry 

185 config.doPhotoCal = doPhotoCal 

186 config.connections.photoRefCat = "cal_ref_cat" 

187 config.connections.astromRefCat = "cal_ref_cat" 

188 config.idGenerator.packer.name = "observation" 

189 task = CalibrateTask(config=config) 

190 quantumId = ids["exposure"] 

191 

192 quantum = testUtils.makeQuantum(task, self.butler, quantumId, ids) 

193 run = testUtils.runTestQuantum(task, self.butler, quantum) 

194 

195 run.assert_called_once() 

196 self.assertEqual(run.call_args[0], ()) 

197 # Some arguments unprintable because we don't have a full environment 

198 # So just check which ones were passed in 

199 self.assertEqual(run.call_args[1].keys(), 

200 {"exposure", "idGenerator", "background", "icSourceCat"}) 

201 

202 

203class CalibrateTaskTestCaseNoButler(lsst.utils.tests.TestCase): 

204 

205 def testNoAperCorrMap(self): 

206 expPath = os.path.join(TESTDIR, "data", "v695833-e0-c000-a00.sci.fits") 

207 exposure = lsst.afw.image.ExposureF(expPath) 

208 

209 charImConfig = CharacterizeImageConfig() 

210 charImConfig.measurePsf.psfDeterminer = 'piff' 

211 charImConfig.measurePsf.psfDeterminer['piff'].spatialOrder = 0 

212 charImConfig.measureApCorr.sourceSelector["science"].doSignalToNoise = False 

213 charImTask = CharacterizeImageTask(config=charImConfig) 

214 charImResults = charImTask.run(exposure) 

215 calibConfig = CalibrateConfig() 

216 calibConfig.doAstrometry = False 

217 calibConfig.doPhotoCal = False 

218 calibConfig.doSkySources = False 

219 calibConfig.doComputeSummaryStats = False 

220 

221 # Force the aperture correction map to None (DM-39626) 

222 charImResults.exposure.info.setApCorrMap(None) 

223 calibTask = CalibrateTask(config=calibConfig) 

224 with self.assertLogs(level=logging.WARNING) as cm: 

225 _ = calibTask.run(charImResults.exposure) 

226 # Other warnings may also be emitted. 

227 warnings = '\n'.join(cm.output) 

228 self.assertIn("Image does not have valid aperture correction map", warnings) 

229 

230 

231def setup_module(module): 

232 lsst.utils.tests.init() 

233 

234 

235class MemoryTestCase(lsst.utils.tests.MemoryTestCase): 

236 pass 

237 

238 

239if __name__ == "__main__": 239 ↛ 240line 239 didn't jump to line 240 because the condition on line 239 was never true

240 lsst.utils.tests.init() 

241 unittest.main()