Coverage for tests / test_forcedPhot.py: 32%
129 statements
« prev ^ index » next coverage.py v7.13.5, created at 2026-04-26 09:21 +0000
« prev ^ index » next coverage.py v7.13.5, created at 2026-04-26 09:21 +0000
1# This file is part of meas_base.
2#
3# Developed for the LSST Data Management System.
4# This product includes software developed by the LSST Project
5# (https://www.lsst.org).
6# See the COPYRIGHT file at the top-level directory of this distribution
7# for details of code ownership.
8#
9# This program is free software: you can redistribute it and/or modify
10# it under the terms of the GNU General Public License as published by
11# the Free Software Foundation, either version 3 of the License, or
12# (at your option) any later version.
13#
14# This program is distributed in the hope that it will be useful,
15# but WITHOUT ANY WARRANTY; without even the implied warranty of
16# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
17# GNU General Public License for more details.
18#
19# You should have received a copy of the GNU General Public License
20# along with this program. If not, see <https://www.gnu.org/licenses/>.
22"""Tests of the various forced photometry tasks.
24These tests primarily confirm that their respective Tasks can be configured and
25run without errors, but do not check anything about their algorithmic quality.
26"""
29import dataclasses
30import unittest
32import numpy as np
34import lsst.afw.image
35from lsst.afw.geom import SkyWcs
36from lsst.afw.math import ChebyshevBoundedField
37from lsst.daf.butler import DataCoordinate, DatasetRef, DimensionUniverse
38from lsst.pipe.tasks.forcedPhotDetector import ForcedPhotDetectorTask
39from lsst.pipe.base import InMemoryDatasetHandle, PipelineGraph, Struct
40import lsst.meas.base.tests
41import lsst.utils.tests
43skyCenter = lsst.geom.SpherePoint(245.0, -45.0, lsst.geom.degrees)
46@dataclasses.dataclass
47class _MockQuantum:
48 dataId: DataCoordinate
51class _MockRefsStruct:
53 def __init__(self, datasets: dict[str, object], refs: dict[str, DatasetRef | list[DatasetRef]]):
54 self._datasets = datasets
55 self._refs = refs
57 def __getattr__(self, name):
58 return self._refs[name]
61@dataclasses.dataclass
62class _MockQuantumContext:
64 quantum: _MockQuantum
65 outputs: dict
67 def get(self, inputs: _MockRefsStruct) -> object:
68 return inputs._datasets
70 def put(self, datasets: Struct, outputs: _MockRefsStruct) -> None:
71 outputs._datasets = datasets.__dict__.copy()
74@dataclasses.dataclass
75class _MockTractInfo:
77 wcs: SkyWcs
79 def getWcs(self):
80 return self.wcs
83@dataclasses.dataclass
84class _MockSkyMap:
86 wcs: SkyWcs
88 def __getitem__(self, tract):
89 return _MockTractInfo(self.wcs)
92class ForcedPhotometryTests:
93 """Base class for tests of forced photometry tasks.
95 Creates a simple test image and catalog to run forced photometry on.
96 """
97 def setUp(self):
98 bbox = lsst.geom.Box2I(lsst.geom.Point2I(0, 0), lsst.geom.Point2I(100, 100))
99 dataset = lsst.meas.base.tests.TestDataset(bbox, crval=skyCenter)
100 dataset.addSource(instFlux=1000, centroid=lsst.geom.Point2D(30, 30))
101 dataset.addSource(instFlux=10000, centroid=lsst.geom.Point2D(60, 70))
103 diaDataset = lsst.meas.base.tests.TestDataset(bbox, crval=skyCenter)
104 diaDataset.addSource(instFlux=500, centroid=lsst.geom.Point2D(30, 30))
105 diaDataset.addSource(instFlux=12000, centroid=lsst.geom.Point2D(60, 70))
107 schema = dataset.makeMinimalSchema()
108 self.exposure, self.refCat = dataset.realize(noise=10, schema=schema)
109 self.diaExposure, self.diaRefCat = diaDataset.realize(noise=10, schema=schema)
110 # Simple aperture correction map in case the task needs it.
111 apCorrMap = lsst.afw.image.ApCorrMap()
112 apCorrMap["base_PsfFlux_instFlux"] = ChebyshevBoundedField(bbox, np.array([[2.0]]))
113 apCorrMap["base_PsfFlux_instFluxErr"] = ChebyshevBoundedField(bbox, np.array([[3.0]]))
114 self.exposure.info.setApCorrMap(apCorrMap)
116 # Convert the reference catalog to an astropy table.
117 refTable = self.refCat.asAstropy(copy=True)
118 refTable.rename_column("id", "objectId")
119 refTable.rename_column("slot_Centroid_x", "x")
120 refTable.rename_column("slot_Centroid_y", "y")
121 refTable["coord_ra"] = refTable["coord_ra"].to("deg")
122 refTable["coord_dec"] = refTable["coord_dec"].to("deg")
123 self.refTable = refTable
125 # Convert the dia reference catalog to an astropy table.
126 diaRefTable = self.diaRefCat.asAstropy(copy=True)
127 diaRefTable.rename_column("id", "objectId")
128 diaRefTable.rename_column("coord_ra", "ra")
129 diaRefTable.rename_column("coord_dec", "dec")
130 diaRefTable.rename_column("slot_Centroid_x", "x")
131 diaRefTable.rename_column("slot_Centroid_y", "y")
132 diaRefTable["ra"] = diaRefTable["ra"].to("deg")
133 diaRefTable["dec"] = diaRefTable["dec"].to("deg")
134 self.diaRefTable = diaRefTable
136 # Offset WCS so that the forced coordinates don't match the truth.
137 self.offsetWcs = dataset.makePerturbedWcs(self.exposure.wcs)
139 self.universe = DimensionUniverse()
140 self.data_id = DataCoordinate.standardize(
141 instrument="cam", skymap="map", tract=0, visit=1, detector=2, universe=self.universe,
142 band="i", physical_filter="LsstCam-i", day_obs=20250814,
143 )
144 self.quantum_context = _MockQuantumContext(
145 quantum=_MockQuantum(dataId=self.data_id),
146 outputs={},
147 )
148 self.inputs = {
149 "exposure": self.exposure,
150 "diaExposure": self.diaExposure,
151 "skyMap": _MockSkyMap(self.offsetWcs),
152 }
155class ForcedPhotDetectorTaskTestCase(ForcedPhotometryTests, lsst.utils.tests.TestCase):
156 def _check_results(self, measTable, refCat):
157 # Check that something was measured.
158 self.assertTrue(np.isfinite(measTable["base_TransformedCentroidFromCoord_x"]).all())
159 self.assertTrue(np.isfinite(measTable["base_TransformedCentroidFromCoord_y"]).all())
160 self.assertTrue(np.isfinite(measTable["base_PsfFlux_instFlux"]).all())
161 # We use an offset WCS, so the transformed centroids should not exactly
162 # match the original positions.
163 self.assertFloatsNotEqual(measTable["base_TransformedCentroidFromCoord_x"], refCat['truth_x'])
164 self.assertFloatsNotEqual(measTable["base_TransformedCentroidFromCoord_y"], refCat['truth_y'])
166 def testRun(self):
167 """Test ForcedPhotDetectorTask.run."""
168 config = ForcedPhotDetectorTask.ConfigClass()
169 task = ForcedPhotDetectorTask(config=config)
170 refTable = self.refTable
171 diaRefTable = self.diaRefTable
172 visit = self.data_id['visit']
173 detector = self.data_id['detector']
174 band = self.data_id['band']
175 refCat = task._makeMinimalSourceCatalogFromAstropy(refTable)
176 directCat = task._generateMeasCat(refCat)
177 diffCat = task._generateMeasCat(refCat)
178 result = task.run(
179 refCat,
180 np.arange((len(refTable)), dtype=np.int64),
181 visit,
182 detector,
183 self.offsetWcs,
184 directCat,
185 diffCat,
186 self.exposure,
187 self.diaExposure,
188 band,
189 )
190 catalog = result.outputCatalog
191 self._check_results(catalog['calexp'], refTable)
192 self._check_results(catalog['diff'], diaRefTable)
194 def testRunQuantum(self):
195 """Test ForcedPhotDetectorTask.runQuantum."""
196 config = ForcedPhotDetectorTask.ConfigClass()
197 config.idGenerator.packer.name = "observation"
198 config.idGenerator.packer["observation"].n_detectors = 5
199 config.idGenerator.packer["observation"].n_observations = 10
201 pipeline_graph = PipelineGraph(universe=self.universe)
202 pipeline_graph.add_task("ForcedPhotDetector", ForcedPhotDetectorTask, config)
203 pipeline_graph.resolve(dimensions=self.universe)
204 init_outputs = []
205 (task,) = pipeline_graph.instantiate_tasks(
206 get_init_input=None,
207 init_outputs=init_outputs,
208 )
209 self.inputs["refCat"] = [InMemoryDatasetHandle(self.refTable, storageClass="ArrowAstropy")]
210 exposure_dataset_type = pipeline_graph.dataset_types["visit_image"].dataset_type
211 dia_exposure_dataset_type = pipeline_graph.dataset_types["difference_image"].dataset_type
212 input_refs = _MockRefsStruct(
213 self.inputs,
214 {
215 # This particular runQuantum mostly just gets all inputs at
216 # once, but it does need one DatasetRef with a proper data ID.
217 "exposure": DatasetRef(
218 exposure_dataset_type,
219 self.quantum_context.quantum.dataId.subset(exposure_dataset_type.dimensions),
220 run="arbitrary",
221 ),
222 "diaExposure": DatasetRef(
223 dia_exposure_dataset_type,
224 self.quantum_context.quantum.dataId.subset(exposure_dataset_type.dimensions),
225 run="arbitrary",
226 ),
227 }
228 )
229 output_refs = _MockRefsStruct({}, {})
230 task.runQuantum(self.quantum_context, input_refs, output_refs)
231 catalog = output_refs._datasets["outputCatalog"]
233 for table, refCat in zip(
234 (catalog["calexp"], catalog["diff"]),
235 (self.refTable, self.diaRefTable),
236 ):
237 centroid_name = "base_TransformedCentroidFromCoord"
238 # Check that something was measured.
239 self.assertTrue(np.isfinite(table[f"{centroid_name}_x"]).all())
240 self.assertTrue(np.isfinite(table[f"{centroid_name}_y"]).all())
241 self.assertTrue(np.isfinite(table["base_PsfFlux_instFlux"]).all())
242 # We use an offset WCS, so the transformed centroids should not exactly
243 # match the original positions.
244 self.assertFloatsNotEqual(table[f"{centroid_name}_x"], refCat['truth_x'])
245 self.assertFloatsNotEqual(table[f"{centroid_name}_y"], refCat['truth_y'])
248class MemoryTester(lsst.utils.tests.MemoryTestCase):
249 pass
252def setup_module(module):
253 lsst.utils.tests.init()
256if __name__ == "__main__": 256 ↛ 257line 256 didn't jump to line 257 because the condition on line 256 was never true
257 lsst.utils.tests.init()
258 unittest.main()