Coverage for tests/test_forcedPhot.py: 39%

123 statements  

« prev     ^ index     » next       coverage.py v7.13.5, created at 2026-04-28 08:50 +0000

1# This file is part of meas_base. 

2# 

3# Developed for the LSST Data Management System. 

4# This product includes software developed by the LSST Project 

5# (https://www.lsst.org). 

6# See the COPYRIGHT file at the top-level directory of this distribution 

7# for details of code ownership. 

8# 

9# This program is free software: you can redistribute it and/or modify 

10# it under the terms of the GNU General Public License as published by 

11# the Free Software Foundation, either version 3 of the License, or 

12# (at your option) any later version. 

13# 

14# This program is distributed in the hope that it will be useful, 

15# but WITHOUT ANY WARRANTY; without even the implied warranty of 

16# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the 

17# GNU General Public License for more details. 

18# 

19# You should have received a copy of the GNU General Public License 

20# along with this program. If not, see <https://www.gnu.org/licenses/>. 

21 

22"""Tests of the various forced photometry tasks. 

23 

24These tests primarily confirm that their respective Tasks can be configured and 

25run without errors, but do not check anything about their algorithmic quality. 

26""" 

27 

28 

29import dataclasses 

30import unittest 

31 

32import astropy.table 

33import numpy as np 

34import pandas as pd 

35 

36import lsst.afw.image 

37from lsst.afw.geom import SkyWcs 

38from lsst.afw.math import ChebyshevBoundedField 

39from lsst.afw.table import SourceCatalog 

40from lsst.daf.butler import DataCoordinate, DatasetRef, DimensionUniverse 

41from lsst.meas.base import ForcedPhotCcdTask, ForcedPhotCcdFromDataFrameTask 

42from lsst.pipe.base import InMemoryDatasetHandle, PipelineGraph, Struct 

43import lsst.meas.base.tests 

44import lsst.utils.tests 

45 

# Sky position (in degrees) shared by all tests; passed as ``crval`` when the
# simulated test dataset is built in ``ForcedPhotometryTests.setUp``.
skyCenter = lsst.geom.SpherePoint(245.0, -45.0, lsst.geom.degrees)

47 

48 

@dataclasses.dataclass
class _MockQuantum:
    """Minimal stand-in for a quantum: it carries a data ID and nothing else.
    """

    # Data ID reported by the mocked quantum.
    dataId: DataCoordinate

52 

53 

class _MockRefsStruct:
    """Stand-in for the input/output reference structs given to
    ``runQuantum``.

    Parameters
    ----------
    datasets : `dict` [`str`, `object`]
        In-memory datasets keyed by name.  Stored as ``_datasets``, which is
        what `_MockQuantumContext.get` returns and what
        `_MockQuantumContext.put` overwrites.
    refs : `dict` [`str`, `DatasetRef` or `list` [`DatasetRef`]]
        Dataset references keyed by name; each key is readable as an
        attribute of this object.
    """

    def __init__(self, datasets: dict[str, object], refs: dict[str, DatasetRef | list[DatasetRef]]):
        self._datasets = datasets
        self._refs = refs

    def __getattr__(self, name):
        """Look up ``name`` among the dataset references.

        Raises
        ------
        AttributeError
            Raised if there is no reference called ``name``.
        """
        # This hook only runs after normal attribute lookup has failed.
        # ``_refs`` is read through the instance dict so that an instance
        # that does not have it yet (e.g. one created by ``__new__``) cannot
        # recurse back into this method, and unknown names are reported as
        # AttributeError so that ``hasattr`` and three-argument ``getattr``
        # behave normally instead of leaking a KeyError.
        try:
            return self.__dict__["_refs"][name]
        except KeyError:
            raise AttributeError(
                f"{type(self).__name__!r} object has no attribute {name!r}"
            ) from None

62 

63 

@dataclasses.dataclass
class _MockQuantumContext:
    """Stand-in for the quantum context handed to ``runQuantum``.

    Datasets are passed around through the ``_datasets`` attribute of the
    struct given to `get` or `put`.
    """

    # Mock quantum held by this context.
    quantum: _MockQuantum
    # Not read or written by `get` or `put`.
    outputs: dict

    def get(self, inputs: _MockRefsStruct) -> object:
        """Return everything stored in ``inputs._datasets`` at once."""
        loaded = inputs._datasets
        return loaded

    def put(self, datasets: Struct, outputs: _MockRefsStruct) -> None:
        """Save a shallow copy of the attributes of ``datasets`` as
        ``outputs._datasets``.
        """
        outputs._datasets = dict(vars(datasets))

75 

76 

@dataclasses.dataclass
class _MockTractInfo:
    """Stand-in for a tract description that only knows its WCS."""

    # WCS handed back by `getWcs`.
    wcs: SkyWcs

    def getWcs(self):
        """Return the WCS this object was constructed with."""
        return self.wcs

84 

85 

@dataclasses.dataclass
class _MockSkyMap:
    """Stand-in for a skymap in which every tract shares a single WCS."""

    # WCS given to every tract returned by indexing.
    wcs: SkyWcs

    def __getitem__(self, tract):
        # The index is ignored: any key yields a tract built from ``self.wcs``.
        tract_info = _MockTractInfo(wcs=self.wcs)
        return tract_info

93 

94 

class ForcedPhotometryTests:
    """Shared fixture for tests of the forced photometry tasks.

    `setUp` builds a small test exposure containing two sources, the
    matching reference catalog, a perturbed WCS, and the mock objects that
    are later passed to ``runQuantum``.
    """

    def setUp(self):
        image_bbox = lsst.geom.Box2I(lsst.geom.Point2I(0, 0), lsst.geom.Point2I(100, 100))
        sim = lsst.meas.base.tests.TestDataset(image_bbox, crval=skyCenter)
        # One faint and one bright source.
        for flux, (x, y) in ((1000, (30, 30)), (10000, (60, 70))):
            sim.addSource(instFlux=flux, centroid=lsst.geom.Point2D(x, y))

        self.exposure, self.refCat = sim.realize(noise=10, schema=sim.makeMinimalSchema())

        # Simple aperture correction map in case the task needs it.
        ap_corr = lsst.afw.image.ApCorrMap()
        for field_name, value in (
            ("base_PsfFlux_instFlux", 2.0),
            ("base_PsfFlux_instFluxErr", 3.0),
        ):
            ap_corr[field_name] = ChebyshevBoundedField(image_bbox, np.array([[value]]))
        self.exposure.info.setApCorrMap(ap_corr)

        # Perturb the WCS so that the forced coordinates don't match the truth.
        self.offsetWcs = sim.makePerturbedWcs(self.exposure.wcs)

        self.universe = DimensionUniverse()
        self.data_id = DataCoordinate.standardize(
            instrument="cam",
            skymap="map",
            tract=0,
            visit=1,
            detector=2,
            universe=self.universe,
        )
        mock_quantum = _MockQuantum(dataId=self.data_id)
        self.quantum_context = _MockQuantumContext(quantum=mock_quantum, outputs={})
        # Datasets later returned wholesale by the mock quantum context.
        self.inputs = {"exposure": self.exposure, "skyMap": _MockSkyMap(self.offsetWcs)}

129 

130 

class ForcedPhotCcdTaskTestCase(ForcedPhotometryTests, lsst.utils.tests.TestCase):
    """Tests that `ForcedPhotCcdTask` and `ForcedPhotCcdFromDataFrameTask`
    can be configured and run, through both ``run`` and ``runQuantum``.
    """

    def testRun(self):
        """Test ForcedPhotCcdTask.run."""
        config = ForcedPhotCcdTask.ConfigClass()
        task = ForcedPhotCcdTask(refSchema=self.refCat.schema, config=config)
        measCat = task.measurement.generateMeasCat(self.exposure, self.refCat, self.exposure.wcs)

        task.run(measCat, self.exposure, self.refCat, self.offsetWcs)

        self._checkMeasured(measCat, "base_TransformedCentroid")

    def testRunQuantum(self):
        """Test ForcedPhotCcdTask.runQuantum."""
        config = ForcedPhotCcdTask.ConfigClass()
        self.checkRunQuantum(
            config,
            get_init_input=lambda _: SourceCatalog(self.refCat.schema),
            ref_cat_input=InMemoryDatasetHandle(self.refCat),
            centroid_name="base_TransformedCentroid",
        )

    def testRunQuantumArrowAstropy(self):
        """Test ForcedPhotCcdTask.runQuantum, with input reconfigured to
        ArrowAstropy.
        """
        config = ForcedPhotCcdTask.ConfigClass()
        config.configureParquetRefCat()
        ref_cat = astropy.table.Table(self._makeRefCatColumns())
        self.checkRunQuantum(
            config,
            get_init_input=None,
            ref_cat_input=InMemoryDatasetHandle(ref_cat, storageClass="ArrowAstropy"),
            centroid_name="base_TransformedCentroidFromCoord",
        )

    def testFromDataFrameRun(self):
        """Test ForcedPhotCcdFromDataFrameTask.run."""
        # Testing run() for this task ignores the dataframe->SourceCatalog
        # conversion that happens in runQuantum, but that should be tested
        # separately.
        config = ForcedPhotCcdFromDataFrameTask.ConfigClass()
        task = ForcedPhotCcdFromDataFrameTask(refSchema=self.refCat.schema, config=config)
        measCat = task.measurement.generateMeasCat(self.exposure, self.refCat, self.exposure.wcs)

        task.run(measCat, self.exposure, self.refCat, self.offsetWcs)

        self._checkMeasured(measCat, "base_TransformedCentroidFromCoord")

    def testFromDataFrameRunQuantum(self):
        """Test ForcedPhotCcdFromDataFrameTask.runQuantum."""
        config = ForcedPhotCcdFromDataFrameTask.ConfigClass()
        ref_cat = pd.DataFrame(self._makeRefCatColumns())
        self.checkRunQuantum(
            config,
            get_init_input=None,
            ref_cat_input=InMemoryDatasetHandle(ref_cat, storageClass="DataFrame"),
            centroid_name="base_TransformedCentroidFromCoord",
            # Without this the helper falls back to its default task class
            # and the task named in this test is never instantiated.
            task_class=ForcedPhotCcdFromDataFrameTask,
        )

    def checkRunQuantum(self, config, get_init_input, ref_cat_input, centroid_name,
                        task_class=ForcedPhotCcdTask):
        """Run tests on a runQuantum method.

        Parameters
        ----------
        config : `lsst.meas.base.ForcedPhotCcdConfig`
            Configuration for the task.  Modified in place before the task
            is constructed.
        get_init_input : callable or `None`
            Callable that takes a single ignored argument and returns the
            init-input object expected by the task at construction.
        ref_cat_input : `lsst.pipe.base.InMemoryDatasetHandle`
            Handle holding the input reference catalog.
        centroid_name : `str`
            Base name of the centroid plugin.
        task_class : `type`, optional
            Subclass of `ForcedPhotCcdTask` to use.
        """
        config.useVisitSummary = False
        config.doApplySkyCorr = False
        config.idGenerator.packer.name = "observation"
        config.idGenerator.packer["observation"].n_detectors = 5
        config.idGenerator.packer["observation"].n_observations = 10
        pipeline_graph = PipelineGraph(universe=self.universe)
        pipeline_graph.add_task("forcedPhotCcd", task_class, config)
        pipeline_graph.resolve(dimensions=self.universe)
        init_outputs = []
        (task,) = pipeline_graph.instantiate_tasks(get_init_input=get_init_input, init_outputs=init_outputs)
        # Exactly one init-output is expected: the output schema catalog.
        ((output_schema_cat, _),) = init_outputs
        self.inputs["refCat"] = [ref_cat_input]
        exposure_dataset_type = pipeline_graph.dataset_types["calexp"].dataset_type
        input_refs = _MockRefsStruct(
            self.inputs,
            {
                # This particular runQuantum mostly just gets all inputs at
                # once, but it does need one DatasetRef with a proper data ID.
                "exposure": DatasetRef(
                    exposure_dataset_type,
                    self.quantum_context.quantum.dataId.subset(exposure_dataset_type.dimensions),
                    run="arbitrary",
                ),
            },
        )
        output_refs = _MockRefsStruct({}, {})
        task.runQuantum(self.quantum_context, input_refs, output_refs)
        measCat = output_refs._datasets["measCat"]
        self.assertEqual(output_schema_cat.schema, measCat.schema)
        self._checkMeasured(measCat, centroid_name)

    def _makeRefCatColumns(self):
        """Return the ID and sky-coordinate columns of the reference catalog
        as a `dict`, with both coordinates scaled by ``180/pi``.
        """
        return {
            "diaObjectId": self.refCat["id"],
            "ra": self.refCat["coord_ra"]*180/np.pi,
            "dec": self.refCat["coord_dec"]*180/np.pi,
        }

    def _checkMeasured(self, measCat, centroid_name):
        """Check that a measurement catalog holds finite, shifted results.

        Parameters
        ----------
        measCat : catalog
            Output measurement catalog to inspect; indexed by column name.
        centroid_name : `str`
            Base name of the centroid plugin; the ``_x`` and ``_y`` columns
            with this prefix are checked.
        """
        centroid_x = measCat[f"{centroid_name}_x"]
        centroid_y = measCat[f"{centroid_name}_y"]
        # Check that something was measured.
        self.assertTrue(np.isfinite(centroid_x).all())
        self.assertTrue(np.isfinite(centroid_y).all())
        self.assertTrue(np.isfinite(measCat["base_PsfFlux_instFlux"]).all())
        # We use an offset WCS, so the transformed centroids should not exactly
        # match the original positions.
        self.assertFloatsNotEqual(centroid_x, self.refCat['truth_x'])
        self.assertFloatsNotEqual(centroid_y, self.refCat['truth_y'])

271 

272 

class MemoryTester(lsst.utils.tests.MemoryTestCase):
    """Run the checks inherited from `lsst.utils.tests.MemoryTestCase`;
    nothing is added or overridden here.
    """

275 

276 

def setup_module(module):
    """Initialise `lsst.utils.tests` for this module.

    The name follows the pytest module-level setup convention; ``module``
    is accepted but not used.
    """
    lsst.utils.tests.init()

279 

280 

if __name__ == "__main__":
    # When executed directly as a script, perform the same initialisation as
    # ``setup_module`` before handing control to unittest.
    lsst.utils.tests.init()
    unittest.main()