Coverage for python / lsst / drp / tasks / forcedPhotCoadd.py: 0%
129 statements
« prev ^ index » next coverage.py v7.13.5, created at 2026-05-07 08:43 +0000
« prev ^ index » next coverage.py v7.13.5, created at 2026-05-07 08:43 +0000
1# This file is part of drp_tasks.
2#
3# Developed for the LSST Data Management System.
4# This product includes software developed by the LSST Project
5# (https://www.lsst.org).
6# See the COPYRIGHT file at the top-level directory of this distribution
7# for details of code ownership.
8#
9# This program is free software: you can redistribute it and/or modify
10# it under the terms of the GNU General Public License as published by
11# the Free Software Foundation, either version 3 of the License, or
12# (at your option) any later version.
13#
14# This program is distributed in the hope that it will be useful,
15# but WITHOUT ANY WARRANTY; without even the implied warranty of
16# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
17# GNU General Public License for more details.
18#
19# You should have received a copy of the GNU General Public License
20# along with this program. If not, see <https://www.gnu.org/licenses/>.
22import lsst.afw.table
23import lsst.pex.config
24import lsst.pipe.base as pipeBase
25from lsst.meas.base._id_generator import SkyMapIdGeneratorConfig
26from lsst.meas.base.applyApCorr import ApplyApCorrTask
27from lsst.meas.base.catalogCalculation import CatalogCalculationTask
28from lsst.meas.base.forcedMeasurement import ForcedMeasurementTask
29from lsst.meas.extensions.scarlet.io import updateCatalogFootprints
31__all__ = ("ForcedPhotCoaddConfig", "ForcedPhotCoaddTask")
34class ForcedPhotCoaddConnections(
35 pipeBase.PipelineTaskConnections,
36 dimensions=("band", "skymap", "tract", "patch"),
37 defaultTemplates={"inputCoaddName": "deep", "outputCoaddName": "deep"},
38):
39 inputSchema = pipeBase.connectionTypes.InitInput(
40 doc="Schema for the input measurement catalogs.",
41 name="{inputCoaddName}Coadd_ref_schema",
42 storageClass="SourceCatalog",
43 )
44 outputSchema = pipeBase.connectionTypes.InitOutput(
45 doc="Schema for the output forced measurement catalogs.",
46 name="{outputCoaddName}Coadd_forced_src_schema",
47 storageClass="SourceCatalog",
48 )
49 exposure = pipeBase.connectionTypes.Input(
50 doc="Input exposure to perform photometry on.",
51 name="{inputCoaddName}Coadd_calexp",
52 storageClass="ExposureF",
53 dimensions=["band", "skymap", "tract", "patch"],
54 )
55 exposure_cell = pipeBase.connectionTypes.Input(
56 doc="Input cell-based coadd exposure to perform photometry on.",
57 name="{inputCoaddName}CoaddCell",
58 storageClass="MultipleCellCoadd",
59 dimensions=["band", "skymap", "tract", "patch"],
60 )
61 background = pipeBase.connectionTypes.Input(
62 doc="Background to subtract from the exposure_cell.",
63 name="{inputCoaddName}Coadd_calexp_background",
64 storageClass="Background",
65 dimensions=["band", "skymap", "tract", "patch"],
66 )
67 refCat = pipeBase.connectionTypes.Input(
68 doc="Catalog of shapes and positions at which to force photometry.",
69 name="{inputCoaddName}Coadd_ref",
70 storageClass="SourceCatalog",
71 dimensions=["skymap", "tract", "patch"],
72 )
73 refCatInBand = pipeBase.connectionTypes.Input(
74 doc="Catalog of shapes and positions in the band having forced photometry done",
75 name="{inputCoaddName}Coadd_meas",
76 storageClass="SourceCatalog",
77 dimensions=("band", "skymap", "tract", "patch"),
78 )
79 footprintCatInBand = pipeBase.connectionTypes.Input(
80 doc="Catalog of footprints to attach to sources",
81 name="{inputCoaddName}Coadd_deblendedFlux",
82 storageClass="SourceCatalog",
83 dimensions=("band", "skymap", "tract", "patch"),
84 )
85 scarletModels = pipeBase.connectionTypes.Input(
86 doc="Multiband scarlet models produced by the deblender",
87 name="{inputCoaddName}Coadd_scarletModelData",
88 storageClass="LsstScarletModelData",
89 dimensions=("tract", "patch", "skymap"),
90 )
91 refWcs = pipeBase.connectionTypes.Input(
92 doc="Reference world coordinate system.",
93 name="{inputCoaddName}Coadd.wcs",
94 storageClass="Wcs",
95 dimensions=["band", "skymap", "tract", "patch"],
96 ) # used in place of a skymap wcs because of DM-28880
97 measCat = pipeBase.connectionTypes.Output(
98 doc="Output forced photometry catalog.",
99 name="{outputCoaddName}Coadd_forced_src",
100 storageClass="SourceCatalog",
101 dimensions=["band", "skymap", "tract", "patch"],
102 )
104 def __init__(self, *, config=None):
105 super().__init__(config=config)
106 if config is None:
107 return
109 if config.footprintDatasetName != "LsstScarletModelData":
110 self.inputs.remove("scarletModels")
111 if config.footprintDatasetName != "DeblendedFlux":
112 self.inputs.remove("footprintCatInBand")
113 if config.useCellCoadds:
114 self.inputs.remove("exposure")
115 else:
116 self.inputs.remove("exposure_cell")
117 self.inputs.remove("background")
120class ForcedPhotCoaddConfig(pipeBase.PipelineTaskConfig, pipelineConnections=ForcedPhotCoaddConnections):
121 measurement = lsst.pex.config.ConfigurableField(
122 target=ForcedMeasurementTask, doc="subtask to do forced measurement"
123 )
124 coaddName = lsst.pex.config.Field(
125 doc="coadd name: typically one of deep or goodSeeing",
126 dtype=str,
127 default="deep",
128 )
129 useCellCoadds = lsst.pex.config.Field(
130 doc="Use cell-based coadds for forced measurements?",
131 dtype=bool,
132 default=False,
133 )
134 doApCorr = lsst.pex.config.Field(
135 dtype=bool, default=True, doc="Run subtask to apply aperture corrections"
136 )
137 applyApCorr = lsst.pex.config.ConfigurableField(
138 target=ApplyApCorrTask, doc="Subtask to apply aperture corrections"
139 )
140 catalogCalculation = lsst.pex.config.ConfigurableField(
141 target=CatalogCalculationTask, doc="Subtask to run catalogCalculation plugins on catalog"
142 )
143 footprintDatasetName = lsst.pex.config.Field(
144 doc="Dataset (without coadd prefix) that should be used to obtain (Heavy)Footprints for sources. "
145 "Must have IDs that match those of the reference catalog."
146 "If None, Footprints will be generated by transforming the reference Footprints.",
147 dtype=str,
148 default="LsstScarletModelData",
149 optional=True,
150 )
151 doConserveFlux = lsst.pex.config.Field(
152 dtype=bool,
153 default=True,
154 doc="Whether to use the deblender models as templates to re-distribute the flux "
155 "from the 'exposure' (True), or to perform measurements on the deblender model footprints. "
156 "If footprintDatasetName != 'LsstScarletModelData' then this field is ignored.",
157 )
158 doStripFootprints = lsst.pex.config.Field(
159 dtype=bool,
160 default=True,
161 doc="Whether to strip footprints from the output catalog before "
162 "saving to disk. "
163 "This is usually done when using scarlet models to save disk space.",
164 )
165 hasFakes = lsst.pex.config.Field(
166 dtype=bool,
167 default=False,
168 doc="Should be set to True if fake sources have been inserted into the input data.",
169 )
170 idGenerator = SkyMapIdGeneratorConfig.make_field()
172 def setDefaults(self):
173 # Docstring inherited.
174 # Make catalogCalculation a no-op by default as no modelFlux is setup
175 # by default in ForcedMeasurementTask
176 super().setDefaults()
178 self.catalogCalculation.plugins.names = []
179 self.measurement.copyColumns["id"] = "id"
180 self.measurement.copyColumns["parent"] = "parent"
181 self.measurement.plugins.names |= ["base_InputCount", "base_Variance"]
182 self.measurement.plugins["base_PixelFlags"].masksFpAnywhere = [
183 "CLIPPED",
184 "SENSOR_EDGE",
185 "REJECTED",
186 "INEXACT_PSF",
187 # TODO DM-44658 and DM-45980: don't have STREAK propagated yet.
188 # "STREAK",
189 ]
190 self.measurement.plugins["base_PixelFlags"].masksFpCenter = [
191 "CLIPPED",
192 "SENSOR_EDGE",
193 "REJECTED",
194 "INEXACT_PSF",
195 # "STREAK",
196 ]
199class ForcedPhotCoaddTask(pipeBase.PipelineTask):
200 """A pipeline task for performing forced measurement on coadd images.
202 Parameters
203 ----------
204 refSchema : `lsst.afw.table.Schema`, optional
205 The schema of the reference catalog, passed to the constructor of the
206 references subtask. Optional, but must be specified if ``initInputs``
207 is not; if both are specified, ``initInputs`` takes precedence.
208 initInputs : `dict`
209 Dictionary that can contain a key ``inputSchema`` containing the
210 schema. If present will override the value of ``refSchema``.
211 **kwds
212 Keyword arguments are passed to the supertask constructor.
213 """
215 ConfigClass = ForcedPhotCoaddConfig
216 _DefaultName = "forcedPhotCoadd"
217 dataPrefix = "deepCoadd_"
219 def __init__(self, refSchema=None, initInputs=None, **kwds):
220 super().__init__(**kwds)
222 if initInputs is not None:
223 refSchema = initInputs["inputSchema"].schema
225 if refSchema is None:
226 raise ValueError("No reference schema provided.")
227 self.makeSubtask("measurement", refSchema=refSchema)
228 # It is necessary to get the schema internal to the forced measurement
229 # task until such a time that the schema is not owned by the
230 # measurement task, but is passed in by an external caller.
231 if self.config.doApCorr:
232 self.makeSubtask("applyApCorr", schema=self.measurement.schema)
233 self.makeSubtask("catalogCalculation", schema=self.measurement.schema)
234 self.outputSchema = lsst.afw.table.SourceCatalog(self.measurement.schema)
236 def runQuantum(self, butlerQC, inputRefs, outputRefs):
237 inputs = butlerQC.get(inputRefs)
239 refCatInBand = inputs.pop("refCatInBand")
240 if self.config.footprintDatasetName == "LsstScarletModelData":
241 footprintData = inputs.pop("scarletModels")
242 elif self.config.footprintDatasetName == "DeblendedFlux":
243 footprintData = inputs.pop("footprintCatIndBand")
244 else:
245 footprintData = None
247 refCat = inputs.pop("refCat")
248 refWcs = inputs.pop("refWcs")
250 if self.config.useCellCoadds:
251 multiple_cell_coadd = inputs.pop("exposure_cell")
252 stitched_coadd = multiple_cell_coadd.stitch()
253 exposure = stitched_coadd.asExposure()
254 background = inputs.pop("background")
255 exposure.image -= background.getImage()
256 apCorrMap = stitched_coadd.ap_corr_map
257 dataId = inputRefs.exposure_cell.dataId
258 else:
259 exposure = inputs.pop("exposure")
260 apCorrMap = exposure.getInfo().getApCorrMap()
261 dataId = inputRefs.exposure.dataId
263 assert not inputs, "runQuantum got extra inputs."
265 measCat, exposureId = self.generateMeasCat(
266 dataId=dataId,
267 exposure=exposure,
268 refCat=refCat,
269 refCatInBand=refCatInBand,
270 refWcs=refWcs,
271 footprintData=footprintData,
272 )
273 outputs = self.run(
274 measCat=measCat,
275 exposure=exposure,
276 refCat=refCat,
277 refWcs=refWcs,
278 exposureId=exposureId,
279 apCorrMap=apCorrMap,
280 )
281 # Strip HeavyFootprints to save space on disk
282 if self.config.footprintDatasetName == "LsstScarletModelData" and self.config.doStripFootprints:
283 sources = outputs.measCat
284 for source in sources[sources["parent"] != 0]:
285 source.setFootprint(None)
286 butlerQC.put(outputs, outputRefs)
288 def generateMeasCat(self, dataId, exposure, refCat, refCatInBand, refWcs, footprintData):
289 """Generate a measurement catalog.
291 Parameters
292 ----------
293 dataId : `lsst.daf.butler.DataCoordinate`
294 Butler data ID for this image, with ``{tract, patch, band}`` keys.
295 exposure : `lsst.afw.image.exposure.Exposure`
296 Exposure to generate the catalog for.
297 refCat : `lsst.afw.table.SourceCatalog`
298 Catalog of shapes and positions at which to force photometry.
299 refCatInBand : `lsst.afw.table.SourceCatalog`
300 Catalog of shapes and position in the band forced photometry is
301 currently being performed
302 refWcs : `lsst.afw.image.SkyWcs`
303 Reference world coordinate system.
304 footprintData : `ScarletDataModel` or `lsst.afw.table.SourceCatalog`
305 Either the scarlet data models or the deblended catalog containings
306 footprints. If `footprintData` is `None` then the footprints
307 contained in `refCatInBand` are used.
309 Returns
310 -------
311 measCat : `lsst.afw.table.SourceCatalog`
312 Catalog of forced sources to measure.
313 expId : `int`
314 Unique binary id associated with the input exposure
316 Raises
317 ------
318 LookupError
319 Raised if a footprint with a given source id was in the reference
320 catalog but not in the reference catalog in band (meaning there was
321 some sort of mismatch in the two input catalogs)
322 """
323 id_generator = self.config.idGenerator.apply(dataId)
324 measCat = self.measurement.generateMeasCat(
325 exposure, refCat, refWcs, idFactory=id_generator.make_table_id_factory()
326 )
327 # attach footprints here as this can naturally live inside this method
328 if self.config.footprintDatasetName == "LsstScarletModelData":
329 # Load the scarlet models
330 self._attachScarletFootprints(
331 catalog=measCat, modelData=footprintData, exposure=exposure, band=dataId["band"]
332 )
333 else:
334 if self.config.footprintDatasetName is None:
335 footprintCat = refCatInBand
336 else:
337 footprintCat = footprintData
338 for srcRecord in measCat:
339 fpRecord = footprintCat.find(srcRecord.getId())
340 if fpRecord is None:
341 raise LookupError(
342 "Cannot find Footprint for source {}; please check that {} "
343 "IDs are compatible with reference source IDs".format(srcRecord.getId(), footprintCat)
344 )
345 srcRecord.setFootprint(fpRecord.getFootprint())
346 return measCat, id_generator.catalog_id
348 def run(self, measCat, exposure, refCat, refWcs, exposureId=None, apCorrMap=None):
349 """Perform forced measurement on a single exposure.
351 Parameters
352 ----------
353 measCat : `lsst.afw.table.SourceCatalog`
354 The measurement catalog, based on the sources listed in the
355 reference catalog.
356 exposure : `lsst.afw.image.Exposure`
357 The measurement image upon which to perform forced detection.
358 refCat : `lsst.afw.table.SourceCatalog`
359 The reference catalog of sources to measure.
360 refWcs : `lsst.afw.image.SkyWcs`
361 The WCS for the references.
362 exposureId : `int`
363 Optional unique exposureId used for random seed in measurement
364 task.
365 apCorrMap : `~lsst.afw.image.ApCorrMap`, optional
366 Aperture correction map to use for aperture corrections.
367 If not provided, the map is read from the exposure.
369 Returns
370 -------
371 result : ~`lsst.pipe.base.Struct`
372 Structure with fields:
374 ``measCat``
375 Catalog of forced measurement results
376 (`lsst.afw.table.SourceCatalog`).
377 """
378 # We want to cache repeated PSF evaluations at the same point coming
379 # from different measurement plugins. We assume each algorithm tries
380 # to evaluate the PSF twice, which is more than enough since many don't
381 # evaluate it at all, and there's no *good* reason for any algorithm to
382 # evaluate it more than once.
383 exposure.psf.setCacheCapacity(2 * len(self.config.measurement.plugins.names))
384 # Some mask planes may not be defined on the coadds always.
385 # We add the mask planes, which is a no-op if already defined.
386 for maskPlane in self.config.measurement.plugins["base_PixelFlags"].masksFpAnywhere:
387 exposure.mask.addMaskPlane(maskPlane)
388 for maskPlane in self.config.measurement.plugins["base_PixelFlags"].masksFpCenter:
389 exposure.mask.addMaskPlane(maskPlane)
390 self.measurement.run(measCat, exposure, refCat, refWcs, exposureId=exposureId)
391 if self.config.doApCorr:
392 if apCorrMap is None:
393 apCorrMap = exposure.getInfo().getApCorrMap()
394 self.applyApCorr.run(catalog=measCat, apCorrMap=apCorrMap)
396 self.catalogCalculation.run(measCat)
398 return pipeBase.Struct(measCat=measCat)
400 def _attachScarletFootprints(self, catalog, modelData, exposure, band):
401 """Attach scarlet models as HeavyFootprints"""
402 if self.config.doConserveFlux:
403 redistributeImage = exposure
404 else:
405 redistributeImage = None
406 # Attach the footprints
407 updateCatalogFootprints(
408 modelData=modelData,
409 catalog=catalog,
410 band=band,
411 imageForRedistribution=redistributeImage,
412 removeScarletData=True,
413 updateFluxColumns=False,
414 )