Coverage for python / lsst / pipe / tasks / multiBand.py: 0%
344 statements
« prev ^ index » next coverage.py v7.13.5, created at 2026-04-28 09:06 +0000
« prev ^ index » next coverage.py v7.13.5, created at 2026-04-28 09:06 +0000
1# This file is part of pipe_tasks.
2#
3# Developed for the LSST Data Management System.
4# This product includes software developed by the LSST Project
5# (https://www.lsst.org).
6# See the COPYRIGHT file at the top-level directory of this distribution
7# for details of code ownership.
8#
9# This program is free software: you can redistribute it and/or modify
10# it under the terms of the GNU General Public License as published by
11# the Free Software Foundation, either version 3 of the License, or
12# (at your option) any later version.
13#
14# This program is distributed in the hope that it will be useful,
15# but WITHOUT ANY WARRANTY; without even the implied warranty of
16# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
17# GNU General Public License for more details.
18#
19# You should have received a copy of the GNU General Public License
20# along with this program. If not, see <https://www.gnu.org/licenses/>.
# Public API of this module; the merge/deblend re-exports below are kept
# importable but are intentionally not part of __all__.
__all__ = [
    "DetectCoaddSourcesConfig",
    "DetectCoaddSourcesTask",
    "MeasureMergedCoaddSourcesConfig",
    "MeasureMergedCoaddSourcesTask",
]
26import numpy as np
28from lsst.geom import Extent2I
29from lsst.pipe.base import (
30 AnnotatedPartialOutputsError,
31 Struct,
32 PipelineTask,
33 PipelineTaskConfig,
34 PipelineTaskConnections
35)
36import lsst.pipe.base.connectionTypes as cT
37from lsst.pex.config import Field, ConfigurableField, ChoiceField
38from lsst.meas.algorithms import (
39 DynamicDetectionTask,
40 ExceedsMaxVarianceScaleError,
41 InsufficientSourcesError,
42 PsfGenerationError,
43 ReferenceObjectLoader,
44 ScaleVarianceTask,
45 SetPrimaryFlagsTask,
46 TooManyMaskedPixelsError,
47 ZeroFootprintError,
48)
49from lsst.meas.base import (
50 SingleFrameMeasurementTask,
51 ApplyApCorrTask,
52 CatalogCalculationTask,
53 SkyMapIdGeneratorConfig,
54)
55from lsst.meas.extensions.scarlet.io import updateCatalogFootprints
56from lsst.meas.astrom import DirectMatchTask, denormalizeMatches
57from lsst.pipe.tasks.propagateSourceFlags import PropagateSourceFlagsTask
58import lsst.afw.image as afwImage
59import lsst.afw.math as afwMath
60import lsst.afw.table as afwTable
61from lsst.daf.base import PropertyList
62from lsst.skymap import BaseSkyMap
64# NOTE: these imports are a convenience so multiband users only have to import this file.
65from .mergeDetections import MergeDetectionsConfig, MergeDetectionsTask # noqa: F401
66from .mergeMeasurements import MergeMeasurementsConfig, MergeMeasurementsTask # noqa: F401
67from .multiBandUtils import CullPeaksConfig # noqa: F401
68from .deblendCoaddSourcesPipeline import DeblendCoaddSourcesMultiConfig # noqa: F401
69from .deblendCoaddSourcesPipeline import DeblendCoaddSourcesMultiTask # noqa: F401
72"""
73New set types:
74* deepCoadd_det: detections from what used to be processCoadd (tract, patch, filter)
75* deepCoadd_mergeDet: merged detections (tract, patch)
76* deepCoadd_meas: measurements of merged detections (tract, patch, filter)
77* deepCoadd_ref: reference sources (tract, patch)
78All of these have associated *_schema catalogs that require no data ID and hold no records.
80In addition, we have a schema-only dataset, which saves the schema for the PeakRecords in
81the mergeDet, meas, and ref dataset Footprints:
82* deepCoadd_peak_schema
83"""
86##############################################################################################################
87class DetectCoaddSourcesConnections(PipelineTaskConnections,
88 dimensions=("tract", "patch", "band", "skymap"),
89 defaultTemplates={"inputCoaddName": "deep", "outputCoaddName": "deep"}):
90 detectionSchema = cT.InitOutput(
91 doc="Schema of the detection catalog",
92 name="{outputCoaddName}Coadd_det_schema",
93 storageClass="SourceCatalog",
94 )
95 exposure = cT.Input(
96 doc="Exposure on which detections are to be performed. ",
97 name="{inputCoaddName}Coadd",
98 storageClass="ExposureF",
99 dimensions=("tract", "patch", "band", "skymap")
100 )
101 exposure_cells = cT.Input(
102 doc="Exposure on which detections are to be performed. ",
103 name="{inputCoaddName}CoaddCell",
104 storageClass="MultipleCellCoadd",
105 dimensions=("tract", "patch", "band", "skymap"),
106 )
107 skyMap = cT.Input(
108 doc="Description of the skymap's tracts and patches.",
109 name=BaseSkyMap.SKYMAP_DATASET_TYPE_NAME,
110 storageClass="SkyMap",
111 dimensions=("skymap",),
112 )
113 outputBackgrounds = cT.Output(
114 doc="Output Backgrounds used in detection",
115 name="{outputCoaddName}Coadd_calexp_background",
116 storageClass="Background",
117 dimensions=("tract", "patch", "band", "skymap")
118 )
119 outputSources = cT.Output(
120 doc="Detected sources catalog",
121 name="{outputCoaddName}Coadd_det",
122 storageClass="SourceCatalog",
123 dimensions=("tract", "patch", "band", "skymap")
124 )
125 outputExposure = cT.Output(
126 doc="Exposure post detection",
127 name="{outputCoaddName}Coadd_calexp",
128 storageClass="ExposureF",
129 dimensions=("tract", "patch", "band", "skymap")
130 )
132 def __init__(self, *, config=None):
133 super().__init__(config=config)
134 assert isinstance(config, DetectCoaddSourcesConfig)
136 if config.useCellCoadds:
137 del self.exposure
138 else:
139 del self.exposure_cells
141 if not self.config.forceExactBinning:
142 del self.skyMap
143 if self.config.writeOnlyBackgrounds:
144 del self.outputExposure
145 del self.outputSources
146 del self.detectionSchema
class DetectCoaddSourcesConfig(PipelineTaskConfig, pipelineConnections=DetectCoaddSourcesConnections):
    """Configuration parameters for the DetectCoaddSourcesTask
    """

    doScaleVariance = Field(dtype=bool, default=True, doc="Scale variance plane using empirical noise?")
    scaleVariance = ConfigurableField(target=ScaleVarianceTask, doc="Variance rescaling")
    detection = ConfigurableField(target=DynamicDetectionTask, doc="Source detection")
    coaddName = Field(dtype=str, default="deep", doc="Name of coadd")
    useCellCoadds = Field(dtype=bool, default=False, doc="Whether to use cell coadds?")
    hasFakes = Field(
        dtype=bool,
        default=False,
        doc="Should be set to True if fake sources have been inserted into the input data.",
    )
    idGenerator = SkyMapIdGeneratorConfig.make_field()
    forceExactBinning = Field(
        dtype=bool,
        default=False,
        doc=(
            "Check that the background bin size evenly divides the patch inner region, and "
            "crop the outer region to an integer number of bins."
        )
    )
    writeOnlyBackgrounds = Field(dtype=bool, default=False, doc="If true, only save the background models.")
    writeEmptyBackgrounds = Field(
        dtype=bool,
        default=True,
        doc=(
            "If true, save a placeholder background with NaNs in all bins (but the right geometry) when "
            "there are no pixels to compute a background from. This can be useful if a later task combines "
            "backgrounds from multiple patches as input."
        )
    )

    def setDefaults(self):
        """Override detection defaults for coadd-appropriate behavior."""
        super().setDefaults()
        self.detection.thresholdType = "pixel_stdev"
        self.detection.isotropicGrow = True
        # Coadds are made from background-subtracted CCDs, so any background subtraction should be very basic
        self.detection.reEstimateBackground = False
        self.detection.background.useApprox = False
        self.detection.background.binSize = 4096
        self.detection.background.undersampleStyle = 'REDUCE_INTERP_ORDER'
        self.detection.doTempWideBackground = True  # Suppress large footprints that overwhelm the deblender
        # Include band in packed data IDs that go into object IDs (None -> "as
        # many bands as are defined", rather than the default of zero).
        self.idGenerator.packer.n_bands = None
class DetectCoaddSourcesTask(PipelineTask):
    """Detect sources on a single filter coadd.

    Coadding individual visits requires each exposure to be warped. This
    introduces covariance in the noise properties across pixels. Before
    detection, we correct the coadd variance by scaling the variance plane in
    the coadd to match the observed variance. This is an approximate
    approach -- strictly, we should propagate the full covariance matrix --
    but it is simple and works well in practice.

    After scaling the variance plane, we detect sources and generate footprints
    by delegating to the @ref SourceDetectionTask_ "detection" subtask.

    DetectCoaddSourcesTask is meant to be run after assembling a coadded image
    in a given band. The purpose of the task is to update the background,
    detect all sources in a single band and generate a set of parent
    footprints. Subsequent tasks in the multi-band processing procedure will
    merge sources across bands and, eventually, perform forced photometry.

    Parameters
    ----------
    schema : `lsst.afw.table.Schema`, optional
        Initial schema for the output catalog, modified-in place to include all
        fields set by this task. If None, the source minimal schema will be used.
    **kwargs
        Additional keyword arguments.
    """

    _DefaultName = "detectCoaddSources"
    ConfigClass = DetectCoaddSourcesConfig

    def __init__(self, schema=None, **kwargs):
        # N.B. Super is used here to handle the multiple inheritance of PipelineTasks, the init tree
        # call structure has been reviewed carefully to be sure super will work as intended.
        super().__init__(**kwargs)
        if schema is None:
            schema = afwTable.SourceTable.makeMinimalSchema()
        self.schema = schema
        # The detection subtask extends self.schema in place.
        self.makeSubtask("detection", schema=self.schema)
        if self.config.doScaleVariance:
            self.makeSubtask("scaleVariance")

        # Empty catalog carrying the final schema; exposed through the
        # ``detectionSchema`` init-output connection.
        self.detectionSchema = afwTable.SourceCatalog(self.schema)

    def runQuantum(self, butlerQC, inputRefs, outputRefs):
        """Fetch inputs, run detection, and write outputs; on a recognized
        detection failure, write partial outputs and annotate the error.
        """
        inputs = butlerQC.get(inputRefs)
        idGenerator = self.config.idGenerator.apply(butlerQC.quantum.dataId)

        # Stitch cell-based coadds into a single exposure if configured.
        if self.config.useCellCoadds:
            multiple_cell_coadd = inputs.pop("exposure_cells")
            exposure = multiple_cell_coadd.stitch().asExposure()
        else:
            exposure = inputs.pop("exposure")

        # skyMap is only present when forceExactBinning is configured (the
        # connection is deleted otherwise), so default to None.
        skyMap = inputs.pop("skyMap", None)
        if skyMap is not None:
            patchInfo = skyMap[butlerQC.quantum.dataId["tract"]][butlerQC.quantum.dataId["patch"]]
        else:
            patchInfo = None

        assert not inputs, "runQuantum got more inputs than expected."
        try:
            outputs = self.run(
                exposure=exposure,
                idFactory=idGenerator.make_table_id_factory(),
                expId=idGenerator.catalog_id,
                patchInfo=patchInfo,
            )
        except (
            TooManyMaskedPixelsError,
            ExceedsMaxVarianceScaleError,
            InsufficientSourcesError,
            PsfGenerationError,
            ZeroFootprintError,
        ) as e:
            # Write what we can (placeholder background, input exposure)
            # before re-raising as an annotated partial-outputs error.
            if self.config.writeEmptyBackgrounds:
                butlerQC.put(self._makeEmptyBackground(exposure, patchInfo), outputRefs.outputBackgrounds)
            # NOTE(review): if writeOnlyBackgrounds is True the
            # ``outputExposure`` connection was deleted in the connections
            # class, so this put would fail — confirm this handler cannot be
            # reached in that configuration.
            butlerQC.put(exposure, outputRefs.outputExposure)
            error = AnnotatedPartialOutputsError.annotate(
                e,
                self,
                exposure,
                log=self.log,
            )
            raise error from e

        butlerQC.put(outputs, outputRefs)

    def run(self, exposure, idFactory, expId, patchInfo=None):
        """Run detection on an exposure.

        First scale the variance plane to match the observed variance
        using ``ScaleVarianceTask``. Then invoke the ``SourceDetectionTask_`` "detection" subtask to
        detect sources.

        Parameters
        ----------
        exposure : `lsst.afw.image.Exposure`
            Exposure on which to detect (may be background-subtracted and scaled,
            depending on configuration).
        idFactory : `lsst.afw.table.IdFactory`
            IdFactory to set source identifiers.
        expId : `int`
            Exposure identifier (integer) for RNG seed.
        patchInfo : `lsst.skymap.PatchInfo`, optional
            Description of the patch geometry. Only needed if
            `~DetectCoaddSourceConfig.forceExactBinning` is `True`.

        Returns
        -------
        result : `lsst.pipe.base.Struct`
            Results as a struct with attributes:

            ``sources``
                Catalog of detections (`lsst.afw.table.SourceCatalog`).
            ``backgrounds``
                List of backgrounds (`list`).
        """
        if self.config.forceExactBinning:
            exposure = self._cropToExactBinning(exposure, patchInfo)
        if self.config.doScaleVariance:
            # Record the applied scale factor in the exposure metadata so it
            # survives to the output coadd.
            varScale = self.scaleVariance.run(exposure.maskedImage)
            exposure.getMetadata().add("VARIANCE_SCALE", varScale)
        backgrounds = afwMath.BackgroundList()
        table = afwTable.SourceTable.make(self.schema, idFactory)
        detections = self.detection.run(table, exposure, expId=expId)
        sources = detections.sources
        # The detection subtask only returns a background when it re-estimates
        # one; collect whatever it produced.
        if hasattr(detections, "background") and detections.background:
            for bg in detections.background:
                backgrounds.append(bg)
        if len(backgrounds) == 0:
            # Persist a constant background with value of NaN to get around
            # inability to persist empty BackgroundList.
            emptyBg = self._makeEmptyBackground(exposure, patchInfo)
            backgrounds.append(emptyBg)

        return Struct(outputSources=sources, outputBackgrounds=backgrounds, outputExposure=exposure)

    def _cropToExactBinning(self, exposure, patchInfo):
        """Crop a coadd `~lsst.afw.image.Exposure` instance to ensure exact
        background binning.

        Parameters
        ----------
        exposure : `lsst.afw.image.Exposure`
            Exposure to crop, assumed to cover the patch outer bounding box.
        patchInfo : `lsst.skymap.PatchInfo`
            Description of the patch geometry.

        Returns
        -------
        cropped : `lsst.afw.image.Exposure`
            View of ``exposure`` with background bins that evenly divide both
            the full cropped image and the patch inner region. The bounding
            box is guaranteed to contain the patch inner bounding box and be
            contained by the patch outer bounding box.

        Raises
        ------
        ValueError
            Raised if the patch inner region width or height is not a multiple
            of the background bin size.
        """
        bbox = patchInfo.getInnerBBox()
        # NOTE(review): the message wording below is inverted relative to the
        # check — it is the bin size that fails to divide the patch dimension.
        if bbox.width % self.detection.background.binSizeX:
            raise ValueError(
                f"Patch inner width {bbox.width} does not evenly "
                f"divide bin width {self.detection.background.binSizeX}."
            )
        if bbox.height % self.detection.background.binSizeY:
            raise ValueError(
                f"Patch inner height {bbox.height} does not evenly "
                f"divide bin height {self.detection.background.binSizeY}."
            )
        outer_bbox = patchInfo.getOuterBBox()
        # Grow the inner box symmetrically by a whole number of bins on each
        # side, as far as fits inside the outer box.
        n_bins_grow_x = (bbox.x.begin - outer_bbox.x.begin) // self.detection.background.binSizeX
        n_bins_grow_y = (bbox.y.begin - outer_bbox.y.begin) // self.detection.background.binSizeY
        bbox.grow(
            Extent2I(
                n_bins_grow_x*self.detection.background.binSizeX,
                n_bins_grow_y*self.detection.background.binSizeY,
            )
        )
        assert outer_bbox.contains(bbox)
        assert bbox.contains(patchInfo.getInnerBBox())
        assert bbox.width % self.detection.background.binSizeX == 0
        assert bbox.height % self.detection.background.binSizeY == 0
        return exposure[bbox]

    def _makeEmptyBackground(self, exposure, patchInfo=None):
        """Construct an empty `lsst.afw.math.BackgroundList` with NaN values.

        Parameters
        ----------
        exposure : `lsst.afw.image.Exposure`
            Exposure that the background should correspond to.
        patchInfo : `lsst.skymap.PatchInfo`, optional
            Description of the patch geometry. Only needed if
            `~DetectCoaddSourceConfig.forceExactBinning` is `True`.

        Returns
        -------
        background : `lsst.afw.math.BackgroundList`
            A background object with a single layer and the same bin geometry
            that a background for that exposure would have had if it had enough
            usable pixels. This object cannot actually be used for background
            subtraction.
        """
        # Create a backgroundList with one entry whose "stats image" is NaNs
        # and has all pixels set as NO_DATA.
        if self.config.forceExactBinning:
            exposure = self._cropToExactBinning(exposure, patchInfo).clone()

        bgLevel = np.nan
        bgStats = afwImage.MaskedImageF(1, 1)
        bgStats.set(bgLevel, 0, bgLevel)
        bg = afwMath.BackgroundMI(exposure.getBBox(), bgStats)
        # Tuple layout matches what BackgroundList.append expects for a
        # persistable background layer.
        bgData = (bg, afwMath.Interpolate.LINEAR, afwMath.REDUCE_INTERP_ORDER,
                  afwMath.ApproximateControl.UNKNOWN, 0, 0, False)
        background = afwMath.BackgroundList()
        background.append(bgData)
        # Mark every stats-image pixel NO_DATA with zero variance so the
        # placeholder cannot be mistaken for a usable background.
        for bg, *_ in background:
            stats = bg.getStatsImage()
            stats.mask.array[:, :] = stats.mask.getPlaneBitMask("NO_DATA")
            stats.variance.array[:, :] = 0.0
        return background
class MeasureMergedCoaddSourcesConnections(
    PipelineTaskConnections,
    dimensions=("tract", "patch", "band", "skymap"),
    defaultTemplates={
        "inputCoaddName": "deep",
        "outputCoaddName": "deep",
        "deblendedCatalog": "deblendedFlux",
    },
    deprecatedTemplates={
        # TODO[DM-47797]: remove this deprecated connection template.
        "deblendedCatalog": "Support for old deblender outputs will be removed after v29."
    },
):
    """Butler connections for `MeasureMergedCoaddSourcesTask`: per-band coadd
    plus merged/deblended catalogs in, measured per-band catalog out.

    Optional connections are pruned in ``__init__`` according to the task
    configuration.
    """

    inputSchema = cT.InitInput(
        doc="Input schema for measure merged task produced by a deblender or detection task",
        name="{inputCoaddName}Coadd_deblendedFlux_schema",
        storageClass="SourceCatalog"
    )
    outputSchema = cT.InitOutput(
        doc="Output schema after all new fields are added by task",
        name="{inputCoaddName}Coadd_meas_schema",
        storageClass="SourceCatalog"
    )
    # TODO[DM-47797]: remove this deprecated connection.
    refCat = cT.PrerequisiteInput(
        doc="Reference catalog used to match measured sources against known sources",
        name="ref_cat",
        storageClass="SimpleCatalog",
        dimensions=("skypix",),
        deferLoad=True,
        multiple=True,
        deprecated="Reference matching in measureCoaddSources will be removed after v29.",
    )
    # Only one of ``exposure`` / ``exposure_cells`` survives __init__,
    # depending on config.useCellCoadds.
    exposure = cT.Input(
        doc="Input non-cell-based coadd image",
        name="{inputCoaddName}Coadd_calexp",
        storageClass="ExposureF",
        dimensions=("tract", "patch", "band", "skymap")
    )
    exposure_cells = cT.Input(
        doc="Input cell-based coadd image",
        name="{inputCoaddName}CoaddCell",
        storageClass="MultipleCellCoadd",
        dimensions=("tract", "patch", "band", "skymap"),
    )
    # Only used with cell-based coadds, which are not background-subtracted.
    background = cT.Input(
        doc="Background to subtract from cell-based coadd image",
        name="{inputCoaddName}Coadd_calexp_background",
        storageClass="Background",
        dimensions=("tract", "patch", "band", "skymap")
    )
    skyMap = cT.Input(
        doc="SkyMap to use in processing",
        name=BaseSkyMap.SKYMAP_DATASET_TYPE_NAME,
        storageClass="SkyMap",
        dimensions=("skymap",),
    )
    # TODO[DM-47424]: remove this deprecated connection.
    visitCatalogs = cT.Input(
        doc="Deprecated and unused.",
        name="src",
        dimensions=("instrument", "visit", "detector"),
        storageClass="SourceCatalog",
        multiple=True,
        deprecated="Deprecated and unused. Will be removed after v29.",
    )
    sourceTableHandles = cT.Input(
        doc=("Source tables that are derived from the ``CalibrateTask`` sources. "
             "These tables contain astrometry and photometry flags, and optionally "
             "PSF flags."),
        name="sourceTable_visit",
        storageClass="ArrowAstropy",
        dimensions=("instrument", "visit"),
        multiple=True,
        deferLoad=True,
    )
    finalizedSourceTableHandles = cT.Input(
        doc=("Finalized source tables from ``FinalizeCalibrationTask``. These "
             "tables contain PSF flags from the finalized PSF estimation."),
        name="finalized_src_table",
        storageClass="ArrowAstropy",
        dimensions=("instrument", "visit"),
        multiple=True,
        deferLoad=True,
    )
    finalVisitSummaryHandles = cT.Input(
        doc="Final visit summary table",
        name="finalVisitSummary",
        storageClass="ExposureCatalog",
        dimensions=("instrument", "visit"),
        multiple=True,
        deferLoad=True,
    )
    # TODO[DM-47797]: remove this deprecated connection.
    inputCatalog = cT.Input(
        doc=("Name of the input catalog to use."
             "If the single band deblender was used this should be 'deblendedFlux."
             "If the multi-band deblender was used this should be 'deblendedModel, "
             "or deblendedFlux if the multiband deblender was configured to output "
             "deblended flux catalogs. If no deblending was performed this should "
             "be 'mergeDet'"),
        name="{inputCoaddName}Coadd_{deblendedCatalog}",
        storageClass="SourceCatalog",
        deprecated="Support for old deblender outputs will be removed after v29.",
        dimensions=("tract", "patch", "band", "skymap"),
    )
    scarletCatalog = cT.Input(
        doc="Catalogs produced by multiband deblending",
        name="{inputCoaddName}Coadd_deblendedCatalog",
        storageClass="SourceCatalog",
        dimensions=("tract", "patch", "skymap"),
    )
    scarletModels = cT.Input(
        doc="Multiband scarlet models produced by the deblender",
        name="{inputCoaddName}Coadd_scarletModelData",
        storageClass="LsstScarletModelData",
        dimensions=("tract", "patch", "skymap"),
    )
    outputSources = cT.Output(
        doc="Source catalog containing all the measurement information generated in this task",
        name="{outputCoaddName}Coadd_meas",
        dimensions=("tract", "patch", "band", "skymap"),
        storageClass="SourceCatalog",
    )
    # TODO[DM-47797]: remove this deprecated connection.
    matchResult = cT.Output(
        doc="Match catalog produced by configured matcher, optional on doMatchSources",
        name="{outputCoaddName}Coadd_measMatch",
        dimensions=("tract", "patch", "band", "skymap"),
        storageClass="Catalog",
        deprecated="Reference matching in measureCoaddSources will be removed after v29.",
    )
    # TODO[DM-47797]: remove this deprecated connection.
    denormMatches = cT.Output(
        doc="Denormalized Match catalog produced by configured matcher, optional on "
            "doWriteMatchesDenormalized",
        name="{outputCoaddName}Coadd_measMatchFull",
        dimensions=("tract", "patch", "band", "skymap"),
        storageClass="Catalog",
        deprecated="Reference matching in measureCoaddSources will be removed after v29.",
    )

    def __init__(self, *, config=None):
        """Prune optional connections according to ``config``.

        Parameters
        ----------
        config : `MeasureMergedCoaddSourcesConfig`
            Configuration for the task whose connections these are.
        """
        super().__init__(config=config)
        del self.visitCatalogs
        if not config.doPropagateFlags:
            del self.sourceTableHandles
            del self.finalizedSourceTableHandles
        else:
            # Check for types of flags required.
            if not config.propagateFlags.source_flags:
                del self.sourceTableHandles
            if not config.propagateFlags.finalized_source_flags:
                del self.finalizedSourceTableHandles
        # TODO[DM-47797]: only the 'if' block contents here should survive.
        if config.inputCatalog == "deblendedCatalog":
            del self.inputCatalog
            if not config.doAddFootprints:
                del self.scarletModels
        else:
            # BUGFIX: this previously did ``del self.deblendedCatalog``, but no
            # connection of that name exists on this class (the scarlet catalog
            # connection is named ``scarletCatalog``), so selecting the old
            # deblender outputs raised AttributeError here.
            del self.scarletCatalog
            del self.scarletModels

        # TODO[DM-47797]: delete the conditionals below.
        if not config.doMatchSources:
            del self.refCat
            del self.matchResult

        if not config.doWriteMatchesDenormalized:
            del self.denormMatches

        # Keep exactly one exposure input; the background is only consumed
        # alongside cell-based coadds.
        if config.useCellCoadds:
            del self.exposure
        else:
            del self.exposure_cells
            del self.background
class MeasureMergedCoaddSourcesConfig(PipelineTaskConfig,
                                      pipelineConnections=MeasureMergedCoaddSourcesConnections):
    """Configuration parameters for the MeasureMergedCoaddSourcesTask
    """

    inputCatalog = ChoiceField(
        dtype=str,
        default="deblendedCatalog",
        allowed={
            "deblendedCatalog": "Output catalog from ScarletDeblendTask",
            "deblendedFlux": "Output catalog from SourceDeblendTask",
            "mergeDet": "The merged detections before deblending."
        },
        doc="The name of the input catalog.",
        # TODO[DM-47797]: remove this config option and anything using it.
        deprecated="Support for old deblender outputs will be removed after v29.",
    )
    doAddFootprints = Field(dtype=bool,
                            default=True,
                            doc="Whether or not to add footprints to the input catalog from scarlet models. "
                                "This should be true whenever using the multi-band deblender, "
                                "otherwise this should be False.")
    doConserveFlux = Field(dtype=bool, default=True,
                           doc="Whether to use the deblender models as templates to re-distribute the flux "
                               "from the 'exposure' (True), or to perform measurements on the deblender "
                               "model footprints.")
    doStripFootprints = Field(dtype=bool, default=True,
                              doc="Whether to strip footprints from the output catalog before "
                                  "saving to disk. "
                                  "This is usually done when using scarlet models to save disk space.")
    useCellCoadds = Field(dtype=bool, default=False, doc="Whether to use cell coadds?")
    measurement = ConfigurableField(target=SingleFrameMeasurementTask, doc="Source measurement")
    setPrimaryFlags = ConfigurableField(target=SetPrimaryFlagsTask, doc="Set flags for primary tract/patch")
    doPropagateFlags = Field(
        dtype=bool, default=True,
        doc="Whether to match sources to CCD catalogs to propagate flags (to e.g. identify PSF stars)"
    )
    propagateFlags = ConfigurableField(target=PropagateSourceFlagsTask, doc="Propagate source flags to coadd")
    # TODO[DM-47797]: the reference-matching options below are all deprecated.
    doMatchSources = Field(
        dtype=bool,
        default=False,
        doc="Match sources to reference catalog?",
        deprecated="Reference matching in measureCoaddSources will be removed after v29.",
    )
    match = ConfigurableField(
        target=DirectMatchTask,
        doc="Matching to reference catalog",
        deprecated="Reference matching in measureCoaddSources will be removed after v29.",
    )
    doWriteMatchesDenormalized = Field(
        dtype=bool,
        default=False,
        doc=("Write reference matches in denormalized format? "
             "This format uses more disk space, but is more convenient to read."),
        deprecated="Reference matching in measureCoaddSources will be removed after v29.",
    )
    coaddName = Field(dtype=str, default="deep", doc="Name of coadd")
    psfCache = Field(dtype=int, default=100, doc="Size of psfCache")
    checkUnitsParseStrict = Field(
        doc="Strictness of Astropy unit compatibility check, can be 'raise', 'warn' or 'silent'",
        dtype=str,
        default="raise",
    )
    doApCorr = Field(
        dtype=bool,
        default=True,
        doc="Apply aperture corrections"
    )
    applyApCorr = ConfigurableField(
        target=ApplyApCorrTask,
        doc="Subtask to apply aperture corrections"
    )
    doRunCatalogCalculation = Field(
        dtype=bool,
        default=True,
        doc='Run catalogCalculation task'
    )
    catalogCalculation = ConfigurableField(
        target=CatalogCalculationTask,
        doc="Subtask to run catalogCalculation plugins on catalog"
    )

    hasFakes = Field(
        dtype=bool,
        default=False,
        doc="Should be set to True if fake sources have been inserted into the input data."
    )
    idGenerator = SkyMapIdGeneratorConfig.make_field()

    @property
    def refObjLoader(self):
        # Convenience alias for the reference-object loader configured on the
        # (deprecated) match subtask.
        return self.match.refObjLoader

    def setDefaults(self):
        """Enable extra measurement plugins and coadd-specific pixel flags."""
        super().setDefaults()
        self.measurement.plugins.names |= ['base_InputCount',
                                           'base_Variance',
                                           'base_LocalPhotoCalib',
                                           'base_LocalWcs']

        # TODO: Remove STREAK in DM-44658, streak masking to happen only in
        # ip_diffim; if we can propagate the streak mask from diffim, we can
        # still set flags with it here.
        self.measurement.plugins['base_PixelFlags'].masksFpAnywhere = ['CLIPPED', 'SENSOR_EDGE',
                                                                       'INEXACT_PSF']
        self.measurement.plugins['base_PixelFlags'].masksFpCenter = ['CLIPPED', 'SENSOR_EDGE',
                                                                     'INEXACT_PSF']

    def validate(self):
        """Check cross-field consistency.

        Raises
        ------
        ValueError
            Raised if denormalized matches are requested without matching.
        """
        super().validate()

        if not self.doMatchSources and self.doWriteMatchesDenormalized:
            raise ValueError("Cannot set doWriteMatchesDenormalized if doMatchSources is False.")
718class MeasureMergedCoaddSourcesTask(PipelineTask):
719 """Deblend sources from main catalog in each coadd seperately and measure.
721 Use peaks and footprints from a master catalog to perform deblending and
722 measurement in each coadd.
Given a master input catalog of sources (peaks and footprints) or deblender
outputs (including a HeavyFootprint in each band), measure each source on
726 the coadd. Repeating this procedure with the same master catalog across
727 multiple coadds will generate a consistent set of child sources.
729 The deblender retains all peaks and deblends any missing peaks (dropouts in
730 that band) as PSFs. Source properties are measured and the @c is-primary
731 flag (indicating sources with no children) is set. Visit flags are
732 propagated to the coadd sources.
734 Optionally, we can match the coadd sources to an external reference
735 catalog.
737 After MeasureMergedCoaddSourcesTask has been run on multiple coadds, we
738 have a set of per-band catalogs. The next stage in the multi-band
739 processing procedure will merge these measurements into a suitable catalog
740 for driving forced photometry.
742 Parameters
743 ----------
744 schema : ``lsst.afw.table.Schema`, optional
745 The schema of the merged detection catalog used as input to this one.
746 peakSchema : ``lsst.afw.table.Schema`, optional
747 The schema of the PeakRecords in the Footprints in the merged detection catalog.
748 refObjLoader : `lsst.meas.algorithms.ReferenceObjectLoader`, optional
749 An instance of ReferenceObjectLoader that supplies an external reference
750 catalog. May be None if the loader can be constructed from the butler argument or all steps
751 requiring a reference catalog are disabled.
752 initInputs : `dict`, optional
753 Dictionary that can contain a key ``inputSchema`` containing the
754 input schema. If present will override the value of ``schema``.
755 **kwargs
756 Additional keyword arguments.
757 """
759 _DefaultName = "measureCoaddSources"
760 ConfigClass = MeasureMergedCoaddSourcesConfig
    def __init__(self, schema=None, peakSchema=None, refObjLoader=None, initInputs=None,
                 **kwargs):
        # NOTE(review): ``peakSchema`` is accepted but not referenced in this
        # constructor — presumably retained for API compatibility; confirm.
        super().__init__(**kwargs)
        self.deblended = self.config.inputCatalog.startswith("deblended")
        self.inputCatalog = "Coadd_" + self.config.inputCatalog
        # A schema from initInputs (the butler-provided input schema) takes
        # precedence over the ``schema`` argument.
        if initInputs is not None:
            schema = initInputs['inputSchema'].schema
        if schema is None:
            raise ValueError("Schema must be defined.")
        # Map the input schema through unchanged; subtasks below extend the
        # output schema in place, so creation order matters.
        self.schemaMapper = afwTable.SchemaMapper(schema)
        self.schemaMapper.addMinimalSchema(schema)
        self.schema = self.schemaMapper.getOutputSchema()
        self.algMetadata = PropertyList()
        self.makeSubtask("measurement", schema=self.schema, algMetadata=self.algMetadata)
        self.makeSubtask("setPrimaryFlags", schema=self.schema)
        # TODO[DM-47797]: remove match subtask
        if self.config.doMatchSources:
            self.makeSubtask("match", refObjLoader=refObjLoader)
        if self.config.doPropagateFlags:
            self.makeSubtask("propagateFlags", schema=self.schema)
        self.schema.checkUnits(parse_strict=self.config.checkUnitsParseStrict)
        if self.config.doApCorr:
            self.makeSubtask("applyApCorr", schema=self.schema)
        if self.config.doRunCatalogCalculation:
            self.makeSubtask("catalogCalculation", schema=self.schema)

        # Empty catalog carrying the final schema; exposed through the
        # ``outputSchema`` init-output connection.
        self.outputSchema = afwTable.SourceCatalog(self.schema)
790 def runQuantum(self, butlerQC, inputRefs, outputRefs):
791 inputs = butlerQC.get(inputRefs)
793 # TODO[DM-47797]: remove this block
794 if self.config.doMatchSources:
795 refObjLoader = ReferenceObjectLoader([ref.datasetRef.dataId for ref in inputRefs.refCat],
796 inputs.pop('refCat'),
797 name=self.config.connections.refCat,
798 config=self.config.refObjLoader,
799 log=self.log)
800 self.match.setRefObjLoader(refObjLoader)
802 if self.config.useCellCoadds:
803 multiple_cell_coadd = inputs.pop("exposure_cells")
804 stitched_coadd = multiple_cell_coadd.stitch()
805 exposure = stitched_coadd.asExposure()
806 background = inputs.pop("background")
807 exposure.image -= background.getImage()
809 ccdInputs = stitched_coadd.ccds
810 apCorrMap = stitched_coadd.ap_corr_map
811 band = inputRefs.exposure_cells.dataId["band"]
812 else:
813 exposure = inputs.pop("exposure")
814 # Set psfcache
815 # move this to run after gen2 deprecation
816 exposure.getPsf().setCacheCapacity(self.config.psfCache)
818 ccdInputs = exposure.getInfo().getCoaddInputs().ccds
819 apCorrMap = exposure.getInfo().getApCorrMap()
820 band = inputRefs.exposure.dataId["band"]
822 # Get unique integer ID for IdFactory and RNG seeds; only the latter
823 # should really be used as the IDs all come from the input catalog.
824 idGenerator = self.config.idGenerator.apply(butlerQC.quantum.dataId)
826 # Transform inputCatalog
827 table = afwTable.SourceTable.make(self.schema, idGenerator.make_table_id_factory())
828 sources = afwTable.SourceCatalog(table)
829 # Load the correct input catalog
830 if "scarletCatalog" in inputs:
831 inputCatalog = inputs.pop("scarletCatalog")
832 catalogRef = inputRefs.scarletCatalog
833 else:
834 inputCatalog = inputs.pop("inputCatalog")
835 catalogRef = inputRefs.inputCatalog
836 sources.extend(inputCatalog, self.schemaMapper)
837 del inputCatalog
838 # Add the HeavyFootprints to the deblended sources
839 if self.config.doAddFootprints:
840 modelData = inputs.pop('scarletModels')
841 if self.config.doConserveFlux:
842 imageForRedistribution = exposure
843 else:
844 imageForRedistribution = None
845 updateCatalogFootprints(
846 modelData=modelData,
847 catalog=sources,
848 band=band,
849 imageForRedistribution=imageForRedistribution,
850 removeScarletData=True,
851 updateFluxColumns=True,
852 )
853 table = sources.getTable()
854 table.setMetadata(self.algMetadata) # Capture algorithm metadata to write out to the source catalog.
856 skyMap = inputs.pop('skyMap')
857 tractNumber = catalogRef.dataId['tract']
858 tractInfo = skyMap[tractNumber]
859 patchInfo = tractInfo.getPatchInfo(catalogRef.dataId['patch'])
860 skyInfo = Struct(
861 skyMap=skyMap,
862 tractInfo=tractInfo,
863 patchInfo=patchInfo,
864 wcs=tractInfo.getWcs(),
865 bbox=patchInfo.getOuterBBox()
866 )
868 if self.config.doPropagateFlags:
869 if "sourceTableHandles" in inputs:
870 sourceTableHandles = inputs.pop("sourceTableHandles")
871 sourceTableHandleDict = {handle.dataId["visit"]: handle for handle in sourceTableHandles}
872 else:
873 sourceTableHandleDict = None
874 if "finalizedSourceTableHandles" in inputs:
875 finalizedSourceTableHandles = inputs.pop("finalizedSourceTableHandles")
876 finalizedSourceTableHandleDict = {handle.dataId["visit"]: handle
877 for handle in finalizedSourceTableHandles}
878 else:
879 finalizedSourceTableHandleDict = None
880 if "finalVisitSummaryHandles" in inputs:
881 finalVisitSummaryHandles = inputs.pop("finalVisitSummaryHandles")
882 finalVisitSummaryHandleDict = {handle.dataId["visit"]: handle
883 for handle in finalVisitSummaryHandles}
884 else:
885 finalVisitSummaryHandleDict = None
887 assert not inputs, "runQuantum got more inputs than expected."
888 outputs = self.run(
889 exposure=exposure,
890 sources=sources,
891 skyInfo=skyInfo,
892 exposureId=idGenerator.catalog_id,
893 ccdInputs=ccdInputs,
894 sourceTableHandleDict=sourceTableHandleDict,
895 finalizedSourceTableHandleDict=finalizedSourceTableHandleDict,
896 finalVisitSummaryHandleDict=finalVisitSummaryHandleDict,
897 apCorrMap=apCorrMap,
898 )
899 # Strip HeavyFootprints to save space on disk
900 if self.config.doStripFootprints:
901 sources = outputs.outputSources
902 for source in sources[sources["parent"] != 0]:
903 source.setFootprint(None)
904 butlerQC.put(outputs, outputRefs)
906 def run(self, exposure, sources, skyInfo, exposureId, ccdInputs=None,
907 sourceTableHandleDict=None, finalizedSourceTableHandleDict=None, finalVisitSummaryHandleDict=None,
908 apCorrMap=None):
909 """Run measurement algorithms on the input exposure, and optionally populate the
910 resulting catalog with extra information.
912 Parameters
913 ----------
914 exposure : `lsst.afw.exposure.Exposure`
915 The input exposure on which measurements are to be performed.
916 sources : `lsst.afw.table.SourceCatalog`
917 A catalog built from the results of merged detections, or
918 deblender outputs.
919 parentCatalog : `lsst.afw.table.SourceCatalog`
920 Catalog of parent sources corresponding to sources.
921 skyInfo : `lsst.pipe.base.Struct`
922 A struct containing information about the position of the input exposure within
923 a `SkyMap`, the `SkyMap`, its `Wcs`, and its bounding box.
924 exposureId : `int` or `bytes`
925 Packed unique number or bytes unique to the input exposure.
926 ccdInputs : `lsst.afw.table.ExposureCatalog`, optional
927 Catalog containing information on the individual visits which went into making
928 the coadd.
929 sourceTableHandleDict : `dict` [`int`, `lsst.daf.butler.DeferredDatasetHandle`], optional
930 Dict for sourceTable_visit handles (key is visit) for propagating flags.
931 These tables contain astrometry and photometry flags, and optionally PSF flags.
932 finalizedSourceTableHandleDict : `dict` [`int`, `lsst.daf.butler.DeferredDatasetHandle`], optional
933 Dict for finalized_src_table handles (key is visit) for propagating flags.
934 These tables contain PSF flags from the finalized PSF estimation.
935 finalVisitSummaryHandleDict : `dict` [`int`, `lsst.daf.butler.DeferredDatasetHandle`], optional
936 Dict for visit_summary handles (key is visit) for visit-level information.
937 These tables contain the WCS information of the single-visit input images.
938 apCorrMap : `lsst.afw.image.ApCorrMap`, optional
939 Aperture correction map attached to the ``exposure``. If None, it
940 will be read from the ``exposure``.
942 Returns
943 -------
944 results : `lsst.pipe.base.Struct`
945 Results of running measurement task. Will contain the catalog in the
946 sources attribute. Optionally will have results of matching to a
947 reference catalog in the matchResults attribute, and denormalized
948 matches in the denormMatches attribute.
949 """
950 if self.config.doPropagateFlags:
951 # These mask planes may not be defined on the coadds always.
952 # We add the mask planes, which is a no-op if already defined.
953 for maskPlane in self.config.measurement.plugins["base_PixelFlags"].masksFpAnywhere:
954 exposure.mask.addMaskPlane(maskPlane)
955 for maskPlane in self.config.measurement.plugins["base_PixelFlags"].masksFpCenter:
956 exposure.mask.addMaskPlane(maskPlane)
958 self.measurement.run(sources, exposure, exposureId=exposureId)
960 if self.config.doApCorr:
961 if apCorrMap is None:
962 apCorrMap = exposure.getInfo().getApCorrMap()
963 self.applyApCorr.run(
964 catalog=sources,
965 apCorrMap=apCorrMap,
966 )
968 # TODO DM-11568: this contiguous check-and-copy could go away if we
969 # reserve enough space during SourceDetection and/or SourceDeblend.
970 # NOTE: sourceSelectors require contiguous catalogs, so ensure
971 # contiguity now, so views are preserved from here on.
972 if not sources.isContiguous():
973 sources = sources.copy(deep=True)
975 if self.config.doRunCatalogCalculation:
976 self.catalogCalculation.run(sources)
978 self.setPrimaryFlags.run(sources, skyMap=skyInfo.skyMap, tractInfo=skyInfo.tractInfo,
979 patchInfo=skyInfo.patchInfo)
980 if self.config.doPropagateFlags:
981 self.propagateFlags.run(
982 sources,
983 ccdInputs,
984 sourceTableHandleDict,
985 finalizedSourceTableHandleDict,
986 finalVisitSummaryHandleDict,
987 )
989 results = Struct()
991 # TODO[DM-47797]: remove this block
992 if self.config.doMatchSources:
993 matchResult = self.match.run(sources, exposure.getInfo().getFilter().bandLabel)
994 matches = afwTable.packMatches(matchResult.matches)
995 matches.table.setMetadata(matchResult.matchMeta)
996 results.matchResult = matches
997 if self.config.doWriteMatchesDenormalized:
998 if matchResult.matches:
999 denormMatches = denormalizeMatches(matchResult.matches, matchResult.matchMeta)
1000 else:
1001 self.log.warning("No matches, so generating dummy denormalized matches file")
1002 denormMatches = afwTable.BaseCatalog(afwTable.Schema())
1003 denormMatches.setMetadata(PropertyList())
1004 denormMatches.getMetadata().add("COMMENT",
1005 "This catalog is empty because no matches were found.")
1006 results.denormMatches = denormMatches
1007 results.denormMatches = denormMatches
1009 results.outputSources = sources
1010 return results