Coverage for python / lsst / meas / extensions / scarlet / scarletDeblendTask.py: 18%
513 statements
« prev ^ index » next coverage.py v7.13.5, created at 2026-04-18 08:58 +0000
« prev ^ index » next coverage.py v7.13.5, created at 2026-04-18 08:58 +0000
1# This file is part of meas_extensions_scarlet.
2#
3# Developed for the LSST Data Management System.
4# This product includes software developed by the LSST Project
5# (https://www.lsst.org).
6# See the COPYRIGHT file at the top-level directory of this distribution
7# for details of code ownership.
8#
9# This program is free software: you can redistribute it and/or modify
10# it under the terms of the GNU General Public License as published by
11# the Free Software Foundation, either version 3 of the License, or
12# (at your option) any later version.
13#
14# This program is distributed in the hope that it will be useful,
15# but WITHOUT ANY WARRANTY; without even the implied warranty of
16# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
17# GNU General Public License for more details.
18#
19# You should have received a copy of the GNU General Public License
20# along with this program. If not, see <https://www.gnu.org/licenses/>.
22from __future__ import annotations
24import logging
25from dataclasses import dataclass
26from functools import partial
27import time
28from typing import cast
30import lsst.afw.detection as afwDet
31import lsst.afw.geom.ellipses as afwEll
32import lsst.afw.image as afwImage
33import lsst.afw.table as afwTable
34import lsst.pex.config as pexConfig
35import lsst.pipe.base as pipeBase
36import lsst.scarlet.lite as scl
37import numpy as np
38from lsst.scarlet.lite.detect_pybind11 import get_footprints
39from scipy import ndimage
40from lsst.utils.logging import PeriodicLogger
41from lsst.utils.timer import timeMethod
43from . import io, utils
44from .footprint import scarletFootprintToAfw
45from .source import IsolatedSource
# Scarlet and proxmin have a different definition of log levels than the stack,
# so even "warnings" occur far more often than we would like.
# So for now we only display scarlet and proxmin errors, as all other
# scarlet outputs would be considered "TRACE" by our standards.
scarletLogger = logging.getLogger("scarlet")
scarletLogger.setLevel(logging.ERROR)
proxminLogger = logging.getLogger("proxmin")
proxminLogger.setLevel(logging.ERROR)

# Public API of this module.
__all__ = ["deblend", "ScarletDeblendContext", "ScarletDeblendConfig", "ScarletDeblendTask"]

# Module-level logger used by `deblend` and the task below.
logger = logging.getLogger(__name__)
class DeblenderError(Exception):
    """Raised when the deblender fails on a parent source.

    Catching this exception allows the caller to set the appropriate
    error flags on the parent source instead of aborting the whole run.

    Parameters
    ----------
    message :
        Description of the failure.
    parent :
        The parent source record that failed to deblend.
    errorName :
        Short name identifying the kind of error.
    """

    def __init__(self, message: str, parent: afwTable.source.SourceRecord, errorName: str):
        super().__init__(message)
        # Keep the raw pieces around so that callers can record them
        # on the parent record.
        self.message = message
        self.parent = parent
        self.errorName = errorName

    def __str__(self) -> str:
        return f"DeblenderError: {self.args[0]} (parent: {self.parent})"
class DeblenderSkippedError(Exception):
    """Raised when a deconvolved parent blend is skipped.

    Used to catch cases where the deblender does not process a
    deconvolved parent because it was skipped for some reason.

    Parameters
    ----------
    message :
        Reason the blend was skipped.
    parent :
        The parent source record that was skipped.
    skipKey :
        Key identifying which skip flag applies to this parent.
    """

    def __init__(self, message: str, parent: afwTable.source.SourceRecord, skipKey):
        super().__init__(message)
        self.message = message
        self.parent = parent
        self.skipKey = skipKey

    def __str__(self) -> str:
        return f"DeblenderSkippedError: {self.args[0]} (parent: {self.parent}, skipKey: {self.skipKey})"
99def _checkBlendConvergence(blend: scl.Blend, f_rel: float) -> bool:
100 """Check whether or not a blend has converged"""
101 deltaLoss = np.abs(blend.loss[-2] - blend.loss[-1])
102 convergence = f_rel * np.abs(blend.loss[-1])
103 return deltaLoss < convergence
def isPseudoSource(source: afwTable.source.SourceRecord, pseudoColumns: list[str]) -> bool:
    """Check if a source is a pseudo source.

    This is mostly for skipping sky objects, but any other column can
    also be added to disable deblending on a parent or individual source
    when set to `True`.

    Parameters
    ----------
    source :
        The source to check for the pseudo bit.
    pseudoColumns :
        A list of columns to check for pseudo sources.
    """
    result = False
    for column in pseudoColumns:
        try:
            value = source[column]
        except KeyError:
            # Records missing the column are simply not flagged by it.
            continue
        result = result | value
    return result
def _getDeconvolvedFootprints(
    mDeconvolved: afwImage.MultibandExposure,
    sources: afwTable.SourceCatalog,
    config: ScarletDeblendConfig,
) -> tuple[list[scl.detect.Footprint], scl.Image]:
    """Detect footprints in the deconvolved image

    Parameters
    ----------
    mDeconvolved :
        The deconvolved multiband exposure to detect footprints in.
    sources :
        The source catalog for the entire coadd.
    config :
        The configuration for the deblender.

    Returns
    -------
    footprints :
        The detected footprints in the deconvolved image.
    footprintImage :
        A footprint image as returned by `scarlet.detect.footprints_to_image`.
    """
    bbox = mDeconvolved.getBBox()
    xmin, ymin = bbox.getMin()
    # Per-band noise estimate: median of sqrt(variance) over each band's plane.
    sigma = np.nanmedian(np.sqrt(mDeconvolved.variance.array), axis=(1, 2))
    # Noise-weighted sum over bands to make a single detection image.
    detect = np.nansum(mDeconvolved.image.array/sigma[:, None, None], axis=0)

    # We don't use the variance here because testing in DM-47738
    # has shown that we get better results without it
    # (we're detecting footprints without peaks in the deconvolved,
    # noise reduced image).
    footprints = scl.detect.detect_footprints(
        images=np.array([detect]),
        variance=np.ones((1, detect.shape[0], detect.shape[1]), dtype=detect.dtype),
        scales=1,
        min_area=config.minDeconvolvedArea,
        footprint_thresh=config.footprintSNRThresh,
        find_peaks=False,
        origin=(ymin, xmin)
    )

    # Create an indexed image of the footprints so that the value of a pixel
    # gives the index + 1 of the footprints that contain that pixel.
    scarletBox = utils.bboxToScarletBox(bbox)
    footprintImage = scl.detect.footprints_to_image(footprints, scarletBox)

    # Ensure that there is a minimal footprint for all peaks
    # in the detection catalog.
    footprintArray = footprintImage.data > 0
    for source in sources:
        for peak in source.getFootprint().peaks:
            x, y = peak.getI()
            # Mark the single pixel at each catalog peak as detected.
            footprintArray[y - ymin, x - xmin] = True

    # Grow the footprints by dilating with a circular structuring element
    # of diameter 2 * growSize + 1.
    psfMinimalSize = config.growSize * 2 + 1
    detectionArray = ndimage.binary_dilation(
        footprintArray,
        scl.utils.get_circle_mask(psfMinimalSize, bool)
    )

    # Ensure that all of the deconvolved footprints are contained within
    # a single footprint from the input catalog.
    # This should be unnecessary, however creating entries in the output
    # catalog will produce unexpected results if a deconvolved footprint
    # is in more than one footprint from the source catalog or has
    # flux outside of its parent footprint.
    sourceImage = afwDet.footprintsToNumpy(sources, shape=detect.shape, xy0=(xmin, ymin))
    detectionArray = detectionArray * sourceImage

    # Re-extract footprints from the grown, clipped detection mask.
    # Thresholds are zero because detectionArray is already binary.
    footprints = get_footprints(
        image=detectionArray.astype(np.float32),
        min_separation=0,
        min_area=1,
        peak_thresh=0,
        footprint_thresh=0,
        find_peaks=False,
        y0=ymin,
        x0=xmin,
    )

    # Rebuild the indexed image from the final footprints.
    footprintImage = scl.detect.footprints_to_image(footprints, scarletBox)

    return footprints, footprintImage
@dataclass(kw_only=True)
class ScarletDeblendContext:
    """Context with parameters and config options for deblending

    Attributes
    ----------
    monotonicity :
        The monotonicity operator.
    observation :
        The observation for the entire coadd.
    deconvolved :
        The deconvolved image.
    footprints :
        The footprints in the deconvolved image.
    footprintImage :
        An indexed image of the scarlet footprints so that the value
        of a pixel gives the index + 1 of the footprints that
        contain that pixel.
    config :
        The configuration for the deblender.
    """
    monotonicity: scl.operators.Monotonicity
    observation: scl.Observation
    deconvolved: scl.Image
    footprints: list[scl.Footprint]
    footprintImage: scl.Image
    config: ScarletDeblendConfig

    @staticmethod
    def build(
        mExposure: afwImage.MultibandExposure,
        mDeconvolved: afwImage.MultibandExposure,
        catalog: afwTable.SourceCatalog,
        config: ScarletDeblendConfig,
    ) -> ScarletDeblendContext:
        """Build the context from a minimal set of inputs

        Parameters
        ----------
        mExposure :
            The multiband exposure for the entire coadd.
        mDeconvolved :
            The deconvolved multiband exposure for the entire coadd.
        catalog :
            The parent catalog for the entire coadd.
        config :
            The configuration for the deblender.
        """
        # The PSF of the model in the deconvolved space
        modelPsf = scl.utils.integrated_circular_gaussian(
            sigma=config.modelPsfSigma
        )
        # Initialize the monotonicity operator with a size of 101 x 101 pixels.
        # Note: If a component is > 101x101 in either axis then the
        # monotonicity operator will resize itself.
        monotonicity = scl.operators.Monotonicity((101, 101))
        # Build the observation for the entire coadd
        observation = utils.buildObservation(
            modelPsf=modelPsf,
            psfCenter=mExposure.getBBox().getCenter(),
            mExposure=mExposure,
            badPixelMasks=config.badMask,
            useWeights=config.useWeights,
            convolutionType=config.convolutionType,
            catalog=catalog,
        )

        # Create the deconvolved image, aligned with the observation origin.
        yx0 = observation.images.yx0
        bands = observation.images.bands
        deconvolved = scl.Image(mDeconvolved.image.array, bands=mDeconvolved.bands, yx0=yx0)
        # Restrict the deconvolved image to the bands present in the
        # observation (the two may differ — TODO confirm when/why).
        if len(bands) == 1:
            deconvolved = deconvolved[bands[0]:]
        else:
            deconvolved = deconvolved[bands]

        # Detect footprints in the deconvolved image
        footprints, footprintImage = _getDeconvolvedFootprints(
            mDeconvolved=mDeconvolved,
            sources=catalog,
            config=config,
        )

        return ScarletDeblendContext(
            monotonicity=monotonicity,
            observation=observation,
            deconvolved=deconvolved,
            footprints=footprints,
            footprintImage=footprintImage,
            config=config,
        )
def deblend(
    context: ScarletDeblendContext,
    footprint: afwDet.Footprint,
    config: ScarletDeblendConfig,
    spectrumInit: bool = True,
) -> scl.Blend:
    """Deblend a parent footprint

    Parameters
    ----------
    context :
        Context with parameters and config options for deblending
    footprint :
        The parent footprint to deblend
    config :
        The configuration for the deblender
    spectrumInit :
        Whether or not to initialize the sources with their best-fit spectra

    Returns
    -------
    blend : `scarlet.lite.Blend`
        The fitted blend model for the parent footprint.

    Raises
    ------
    ValueError
        If ``config.optimizer`` is not ``'adaprox'`` or ``'fista'``, or if
        an initialized source's center does not match its seed peak.
    """
    # Define the bounding boxes in lsst.geom.Box2I and lsst.scarlet.lite.Box
    footBox = footprint.getBBox()
    bbox = utils.bboxToScarletBox(footBox)

    # Extract the observation that covers the footprint and make
    # a copy so that the changes don't affect the original observation.
    observation = context.observation[:, bbox].copy()
    footprintData = footprint.spans.asArray()

    # Mask the pixels outside of the footprint
    observation.weights.data[:] *= footprintData

    # Convert the peaks into an array, skipping pseudo sources
    # (e.g. sky objects).
    peaks = [
        np.array([peak.getIy(), peak.getIx()], dtype=int)
        for peak in footprint.peaks
        if not isPseudoSource(peak, config.pseudoColumns)
    ]

    # Sum the deconvolved bands into a single detection image.
    detect_image = np.sum(context.deconvolved[:, bbox].data, axis=0)

    # Initialize the sources
    sources = scl.initialization.FactorizedInitialization(
        observation=observation,
        centers=peaks,
        detect=detect_image,
        min_snr=config.minSNR,
        monotonicity=context.monotonicity,
        bg_thresh=config.backgroundThresh,
        initial_bg_thresh=config.initialBackgroundThresh,
    ).sources

    blend = scl.Blend(sources, observation)

    # Initialize each source with its best fit spectrum
    if spectrumInit:
        try:
            blend.fit_spectra()
        except Exception as e:
            # If the spectrum initialization fails, we will just skip it
            # and use the default spectrum.
            logger.warning(
                "Spectrum initialization failed with error: %s", e, exc_info=True
            )

    # Set the optimizer
    if config.optimizer == "adaprox":
        blend.parameterize(
            partial(
                scl.component.default_adaprox_parameterization,
                noise_rms=observation.noise_rms / 10,
            )
        )
    elif config.optimizer == "fista":
        blend.parameterize(scl.component.default_fista_parameterization)
    else:
        raise ValueError("Unrecognized optimizer. Must be either 'adaprox' or 'fista'.")

    if config.maxIter > 0:
        blend.fit(
            max_iter=config.maxIter,
            e_rel=config.relativeError,
            min_iter=config.minIter,
            resize=config.resizeFrequency,
        )
    else:
        # maxIter <= 0: skip fitting entirely and record the same loss
        # twice so that convergence checks see zero change.
        loss = (blend.observation.images - blend.get_model(convolve=True)).data
        blend.loss = [np.sum(loss), np.sum(loss)]

    # Attach the peak to all of the initialized sources
    for k, center in enumerate(peaks):
        # This is just to make sure that there isn't a coding bug
        if len(sources[k].components) > 0 and np.any(sources[k].center != center):
            raise ValueError(
                f"Misaligned center, expected {center} but got {sources[k].center}"
            )
        # Store the record for the peak with the appropriate source
        # NOTE(review): `peaks` excludes pseudo sources while
        # `footprint.peaks[k]` indexes the unfiltered peak list, so the
        # two could go out of sync if a pseudo peak precedes a real one —
        # confirm that pseudo peaks cannot occur in deblended footprints.
        sources[k].detectedPeak = footprint.peaks[k]

    return blend
class ScarletDeblendConfig(pexConfig.Config):
    """MultibandDeblendConfig

    Configuration for the multiband deblender.
    The parameters are organized by the parameter types, which are
    - Stopping Criteria: Used to determine if the fit has converged
    - Position Fitting Criteria: Used to fit the positions of the peaks
    - Constraints: Used to apply constraints to the peaks and their components
    - Other: Parameters that don't fit into the above categories
    """

    # Stopping Criteria
    minIter = pexConfig.Field[int](
        default=5,
        doc="Minimum number of iterations before the optimizer is allowed to stop.",
    )
    maxIter = pexConfig.Field[int](
        default=300,
        doc="Maximum number of iterations to deblend a single parent",
    )
    relativeError = pexConfig.Field[float](
        default=1e-3,
        doc=(
            "Change in the loss function between iterations to exit fitter. "
            "Typically this is `1e-3` if measurements will be made on the "
            "flux re-distributed models and `1e-4` when making measurements "
            "on the models themselves."
        ),
    )
    resizeFrequency = pexConfig.Field[int](
        default=3,
        doc="Number of iterations between resizing sources.",
    )

    # Constraints
    morphThresh = pexConfig.Field[float](
        default=1,
        doc="Fraction of background RMS a pixel must have "
        "to be included in the initial morphology",
    )
    # Lite Parameters
    # All of these parameters (except version) are only valid if version='lite'
    version = pexConfig.ChoiceField[str](
        default="lite",
        allowed={
            "lite": "LSST optimized version of scarlet for survey data from a single instrument",
        },
        doc="The version of scarlet to use.",
        deprecated="This field is deprecated since the only available `version` is `lite` "
        "and will be removed after v29.0",
    )
    optimizer = pexConfig.ChoiceField[str](
        default="adaprox",
        allowed={
            "adaprox": "Proximal ADAM optimization",
            "fista": "Accelerated proximal gradient method",
        },
        doc="The optimizer to use for fitting parameters.",
    )
    morphImage = pexConfig.ChoiceField[str](
        default="chi2",
        allowed={
            "chi2": "Initialize sources on a chi^2 image made from all available bands",
        },
        doc="The type of image to use for initializing the morphology. "
        "Must be either 'chi2' or 'wavelet'. ",
        deprecated="This field is deprecated since testing has shown that only 'chi2' should be used "
        "and 'wavelet' has been broken since v27.0. "
        "This field will be removed in v29.0",
    )
    backgroundThresh = pexConfig.Field[float](
        default=1.0,
        doc="Fraction of background to use for a sparsity threshold. "
        "This prevents sources from growing unrealistically outside "
        "the parent footprint while still modeling flux correctly "
        "for bright sources.",
    )
    initialBackgroundThresh = pexConfig.Field[float](
        default=1.0,
        doc="Same as `backgroundThresh` but used only for source initialization.",
    )
    maxProxIter = pexConfig.Field[int](
        default=1,
        doc="Maximum number of proximal operator iterations inside of each "
        "iteration of the optimizer. "
        "This config field is only used if version='lite' and optimizer='adaprox'.",
    )
    waveletScales = pexConfig.Field[int](
        default=5,
        doc="Number of wavelet scales to use for wavelet initialization. "
        "This field is only used when `version`='lite' and `morphImage`='wavelet'.",
        deprecated="This field is deprecated along with `morphImage` and will be removed in v29.0.",
    )

    # Other scarlet parameters
    useWeights = pexConfig.Field[bool](
        default=True,
        doc=(
            "Whether or not to use inverse variance weighting. "
            "If `useWeights` is `False` then flat weights are used"
        ),
    )
    modelPsfSize = pexConfig.Field[int](
        default=11, doc="Model PSF side length in pixels"
    )
    modelPsfSigma = pexConfig.Field[float](
        default=0.8, doc="Define sigma for the model frame PSF"
    )
    minSNR = pexConfig.Field[float](
        default=50,
        doc="Minimum Signal to noise to accept the source. "
        "Sources with lower flux will be initialized with the PSF but updated "
        "like an ordinary ExtendedSource (known in scarlet as a `CompactSource`).",
    )
    saveTemplates = pexConfig.Field[bool](
        default=True, doc="Whether or not to save the SEDs and templates"
    )
    processSingles = pexConfig.Field[bool](
        default=False,
        doc="Whether or not to process isolated sources in the deblender",
    )
    persistIsolated = pexConfig.Field[bool](
        default=True,
        doc="Whether or not to persist isolated sources in the scarlet models",
    )
    convolutionType = pexConfig.Field[str](
        default="fft",
        doc="Type of convolution to render the model to the observations.\n"
        "- 'fft': perform convolutions in Fourier space\n"
        "- 'real': perform convolutions in real space.",
    )
    sourceModel = pexConfig.Field[str](
        default="double",
        doc=(
            "How to determine which model to use for sources, from\n"
            "- 'single': use a single component for all sources\n"
            "- 'double': use a bulge disk model for all sources\n"
            "- 'compact': use a single component model, initialized with a point source morphology, "
            " for all sources\n"
            "- 'point': use a point-source model for all sources\n"
            "- 'fit': use a PSF fitting model to determine the number of components (not yet implemented)"
        ),
        deprecated="This field will be deprecated when the default for `version` is changed to `lite`.",
    )
    setSpectra = pexConfig.Field[bool](
        default=True,
        doc="Whether or not to solve for the best-fit spectra during initialization. "
        "This makes initialization slightly longer, as it requires a convolution "
        "to set the optimal spectra, but results in a much better initial log-likelihood "
        "and reduced total runtime, with convergence in fewer iterations. "
        "This option is only used when "
        "peaks*area < `maxSpectrumCutoff` will use the improved initialization.",
    )
    footprintSNRThresh = pexConfig.Field[float](
        default=5.0,
        doc="Minimum SNR for a pixel to be detected in a footprint.",
    )
    growSize = pexConfig.Field[int](
        default=2,
        doc="Number of pixels to grow the deconvolved footprints before final detection.",
    )

    # Mask-plane restrictions
    badMask = pexConfig.ListField[str](
        default=utils.defaultBadPixelMasks,
        doc="Mask planes that mark bad pixels to exclude from the observation "
        "used for deblending.",
    )
    statsMask = pexConfig.ListField[str](
        default=["SAT", "INTRP", "NO_DATA"],
        doc="Mask planes to ignore when performing statistics",
    )
    maskLimits = pexConfig.DictField(
        keytype=str,
        itemtype=float,
        default={},
        doc=(
            "Mask planes with the corresponding limit on the fraction of masked pixels. "
            "Sources violating this limit will not be deblended. "
            "If the fraction is `0` then the limit is a single pixel."
        ),
    )
    minDeconvolvedArea = pexConfig.Field[int](
        default=9,
        doc="Minimum area for a single footprint in the deconvolved image. "
        "Detected footprints smaller than this will not be created.",
    )

    # Size restrictions
    maxNumberOfPeaks = pexConfig.Field[int](
        default=600,
        doc=(
            "Only deblend the brightest maxNumberOfPeaks peaks in the parent"
            " (<= 0: unlimited)"
        ),
    )
    maxFootprintArea = pexConfig.Field[int](
        default=2_000_000,
        doc=(
            "Maximum area for footprints before they are ignored as large; "
            "non-positive means no threshold applied"
        ),
    )
    maxAreaTimesPeaks = pexConfig.Field[int](
        default=1_000_000_000,
        doc=(
            "Maximum rectangular footprint area * nPeaks in the footprint. "
            "This was introduced in DM-33690 to prevent fields that are crowded or have a "
            "LSB galaxy that causes memory intensive initialization in scarlet from dominating "
            "the overall runtime and/or causing the task to run out of memory. "
            "(<= 0: unlimited)"
        ),
    )
    maxFootprintSize = pexConfig.Field[int](
        default=0,
        doc=(
            "Maximum linear dimension for footprints before they are ignored "
            "as large; non-positive means no threshold applied"
        ),
    )
    minFootprintAxisRatio = pexConfig.Field[float](
        default=0.0,
        doc=(
            "Minimum axis ratio for footprints before they are ignored "
            "as large; non-positive means no threshold applied"
        ),
    )
    maxSpectrumCutoff = pexConfig.Field[int](
        default=1_000_000,
        doc=(
            "Maximum number of pixels * number of sources in a blend. "
            "This is different than `maxFootprintArea` because this isn't "
            "the footprint area but the area of the bounding box that "
            "contains the footprint, and is also multiplied by the number of "
            "sources in the footprint. This prevents large skinny blends with "
            "a high density of sources from running out of memory. "
            "If `maxSpectrumCutoff == -1` then there is no cutoff."
        ),
    )
    # Failure modes
    fallback = pexConfig.Field[bool](
        default=True,
        doc="Whether or not to fallback to a smaller number of components if a source does not initialize",
    )
    notDeblendedMask = pexConfig.Field[str](
        default="NOT_DEBLENDED",
        optional=True,
        doc="Mask name for footprints not deblended, or None",
    )
    catchFailures = pexConfig.Field[bool](
        default=True,
        doc=(
            "If True, catch exceptions thrown by the deblender, log them, "
            "and set a flag on the parent, instead of letting them propagate up"
        ),
    )

    # Other options
    columnInheritance = pexConfig.DictField(
        keytype=str,
        itemtype=str,
        default={
            "deblend_nChild": "deblend_parentNChild",
            "deblend_nPeaks": "deblend_parentNPeaks",
        },
        doc="Columns to pass from the parent to the child. "
        "This is no longer used since the object and parent catalogs contain different columns.",
        deprecated="This field is deprecated along with `morphImage` and will be removed after v30.0.",
    )
    pseudoColumns = pexConfig.ListField[str](
        default=["merge_peak_sky", "sky_source"],
        doc="Names of flags which should never be deblended.",
    )
    measureParents = pexConfig.Field[bool](
        default=False,
        doc="Whether to add parents to the object catalog for measurement in downstream tasks.",
    )

    # Testing options
    # Some obs packages and ci packages run the full pipeline on a small
    # subset of data to test that the pipeline is functioning properly.
    # This is not meant as scientific validation, so it can be useful
    # to only run on a small subset of the data that is large enough to
    # test the desired pipeline features but not so long that the deblender
    # is the tall pole in terms of execution times.
    useCiLimits = pexConfig.Field[bool](
        default=False,
        doc="Limit the number of sources deblended for CI to prevent long build times",
    )
    ciDeblendChildRange = pexConfig.ListField[int](
        default=[5, 10],
        doc="Only deblend parent Footprints with a number of peaks in the (inclusive) range indicated. "
        "If `useCiLimits==False` then this parameter is ignored.",
    )
    ciNumParentsToDeblend = pexConfig.Field[int](
        default=10,
        doc="Only use the first `ciNumParentsToDeblend` parent footprints with a total peak count "
        "within `ciDeblendChildRange`. "
        "If `useCiLimits==False` then this parameter is ignored.",
    )
class ScarletDeblendTask(pipeBase.Task):
    """ScarletDeblendTask

    Split blended sources into individual sources.

    This task has no return value; it only modifies the SourceCatalog in-place.
    """

    # Configuration class used to configure this task.
    ConfigClass = ScarletDeblendConfig
    # Default label under which this task is registered.
    _DefaultName = "scarletDeblend"
    def __init__(
        self,
        schema: afwTable.Schema,
        peakSchema: afwTable.Schema | None = None,
        **kwargs
    ):
        """Create the task, adding necessary fields to the given schema.

        Parameters
        ----------
        schema :
            Schema object for measurement fields; will be modified in-place.
        peakSchema :
            Schema of Footprint Peaks that will be passed to the deblender.
            Any fields beyond the PeakTable minimal schema will be transferred
            to the main source Schema. If None, no fields will be transferred
            from the Peaks.
        **kwargs
            Passed to Task.__init__.
        """
        pipeBase.Task.__init__(self, **kwargs)

        peakMinimalSchema = afwDet.PeakTable.makeMinimalSchema()
        if peakSchema is None:
            # In this case, the peakSchemaMapper will transfer nothing, but
            # we'll still have one
            # to simplify downstream code
            self.peakSchemaMapper = afwTable.SchemaMapper(peakMinimalSchema, schema)
        else:
            self.peakSchemaMapper = afwTable.SchemaMapper(peakSchema, schema)
            for item in peakSchema:
                if item.key not in peakMinimalSchema:
                    self.peakSchemaMapper.addMapping(item.key, item.field)
                    # Because SchemaMapper makes a copy of the output schema
                    # you give its ctor, it isn't updating this Schema in
                    # place. That's probably a design flaw, but in the
                    # meantime, we'll keep that schema in sync with the
                    # peakSchemaMapper.getOutputSchema() manually, by adding
                    # the same fields to both.
                    schema.addField(item.field)
            assert (
                schema == self.peakSchemaMapper.getOutputSchema()
            ), "Logic bug mapping schemas"

        # Add the parent keys to the parent catalog schema
        self.parentSchemaMapper = afwTable.SchemaMapper(schema)
        self.parentSchemaMapper.addMinimalSchema(schema, doMap=True)
        parentOutSchema = self.parentSchemaMapper.editOutputSchema()
        self._addParentSchemaKeys(parentOutSchema)
        self.parentSchema = parentOutSchema
        self.parentPeakSchemaMapper = afwTable.SchemaMapper(peakMinimalSchema, self.parentSchema)

        # Add keys for isolated sources and deblended children to the schema.
        self._addChildSchemaKeys(schema)
        self.objectSchema = schema
        self.objectPeakSchemaMapper = afwTable.SchemaMapper(peakMinimalSchema, self.objectSchema)
        # Field names copied verbatim from a parent record to its children
        # (all of the merge_footprint* and merge_peak* flags).
        self.toCopyFromParent = [
            name
            for item in self.objectSchema
            if ((name := item.field.getName()).startswith("merge_footprint")
                or (name := item.field.getName()).startswith("merge_peak"))
        ]
800 def _addParentSchemaKeys(self, schema: afwTable.Schema):
801 """Add parent specific keys to the schema"""
802 # Parent (blend) fields
803 schema.addField(
804 "deblend_runtime", type=np.float32, doc="runtime in ms"
805 )
806 schema.addField(
807 "deblend_iterations", type=np.int32, doc="iterations to converge"
808 )
809 schema.addField(
810 "deblend_nChild",
811 type=np.int32,
812 doc="Number of children this object has (defaults to 0)",
813 )
814 schema.addField(
815 "deblend_nPeaks",
816 type=np.int32,
817 doc="Number of initial peaks in the blend. "
818 "This includes peaks that may have been culled "
819 "during deblending or failed to deblend",
820 )
821 schema.addField(
822 "deblend_spectrumInitFlag",
823 type="Flag",
824 doc="True when scarlet initializes sources "
825 "in the blend with a more accurate spectrum. "
826 "The algorithm uses a lot of memory, "
827 "so large dense blends will use "
828 "a less accurate initialization.",
829 )
830 # Skipped flags
831 schema.addField(
832 "deblend_skipped", type="Flag", doc="Deblender skipped this source"
833 )
834 schema.addField(
835 "deblend_skipped_isolatedParent",
836 type="Flag",
837 doc="The source has only a single peak " "and was not deblended",
838 )
839 schema.addField(
840 "deblend_skipped_isPseudo",
841 type="Flag",
842 doc='The source is identified as a "pseudo" source and '
843 "was not deblended",
844 )
845 schema.addField(
846 "deblend_skipped_tooManyPeaks",
847 type="Flag",
848 doc="Source had too many peaks; " "only the brightest were included",
849 )
850 schema.addField(
851 "deblend_skipped_parentTooBig",
852 type="Flag",
853 doc="Parent footprint covered too many pixels",
854 )
855 schema.addField(
856 "deblend_skipped_masked",
857 type="Flag",
858 doc="Parent footprint had too many masked pixels",
859 )
860 # Convergence flags
861 schema.addField(
862 "deblend_blendConvergenceFailedFlag",
863 type="Flag",
864 doc="at least one source in the blend" "failed to converge",
865 )
866 # Error flags
867 schema.addField(
868 "deblend_failed", type="Flag", doc="Deblending failed on source"
869 )
870 schema.addField(
871 "deblend_childFailed",
872 type="Flag",
873 doc="Deblending failed on at least one child blend. "
874 " This is set in a parent when at least one of its children "
875 "is a blend that failed to deblend.",
876 )
877 schema.addField(
878 "deblend_error",
879 type="String",
880 size=25,
881 doc="Name of error if the blend failed",
882 )
883 schema.addField(
884 "deblend_incompleteData",
885 type="Flag",
886 doc="True when a blend has at least one band "
887 "that could not generate a PSF and was "
888 "not included in the model.",
889 )
890 schema.addField(
891 "deblend_nComponents",
892 type=np.int32,
893 doc="Number of components in a ScarletLiteSource. "
894 "If `config.version != 'lite' then "
895 "this column is set to zero.",
896 )
897 schema.addField(
898 "deblend_chi2",
899 type=np.float32,
900 doc="Final reduced chi2 (per pixel), used to identify goodness of fit.",
901 )
903 def _addChildSchemaKeys(self, schema: afwTable.Schema):
904 """Add deblender specific keys to the schema"""
905 afwTable.Point2IKey.addFields(
906 schema,
907 name="deblend_peak_center",
908 doc="Center used to apply constraints in scarlet",
909 unit="pixel",
910 )
911 schema.addField(
912 "deblend_nChild",
913 type=np.int32,
914 doc="Number of children this object has (defaults to 0)",
915 )
916 schema.addField(
917 "deblend_peakId",
918 type=np.int32,
919 doc="ID of the peak in the parent footprint. "
920 "This is not unique, but the combination of 'parent'"
921 "and 'peakId' should be for all child sources. "
922 "Top level blends with no parents have 'peakId=0'",
923 )
924 schema.addField(
925 "deblend_peak_instFlux",
926 type=float,
927 units="count",
928 doc="The instFlux at the peak position of deblended mode",
929 )
930 schema.addField(
931 "deblend_scarletFlux", type=np.float32, doc="Flux measurement from scarlet"
932 )
933 schema.addField(
934 "deblend_edgePixels",
935 type="Flag",
936 doc="Source had flux on the edge of the parent footprint",
937 )
938 schema.addField(
939 "deblend_dataCoverage",
940 type=np.float32,
941 doc="Fraction of pixels with data. "
942 "In other words, 1 - fraction of pixels with NO_DATA set.",
943 )
944 schema.addField(
945 "deblend_zeroFlux", type="Flag", doc="Source has zero flux."
946 )
947 # Blendedness/classification metrics
948 schema.addField(
949 "deblend_maxOverlap",
950 type=np.float32,
951 doc="Maximum overlap with all of the other neighbors flux "
952 "combined."
953 "This is useful as a metric for determining how blended a "
954 "source is because if it only overlaps with other sources "
955 "at or below the noise level, it is likely to be a mostly "
956 "isolated source in the deconvolved model frame.",
957 )
958 schema.addField(
959 "deblend_fluxOverlap",
960 type=np.float32,
961 doc="This is the total flux from neighboring objects that "
962 "overlaps with this source.",
963 )
964 schema.addField(
965 "deblend_fluxOverlapFraction",
966 type=np.float32,
967 doc="This is the fraction of "
968 "`flux from neighbors/source flux` "
969 "for a given source within the source's"
970 "footprint.",
971 )
972 schema.addField(
973 "deblend_blendedness",
974 type=np.float32,
975 doc="The Bosch et al. 2018 metric for 'blendedness.' ",
976 )
977 schema.addField(
978 "deblend_blendId",
979 type=np.int64,
980 doc="Parents in the catalog may be subdivided by deblending "
981 "into multiple deconvolved blends that are each "
982 "deblended separately. This is the ID of the "
983 "deconvolved blend in the catalog."
984 )
985 schema.addField(
986 "deblend_blendNChild",
987 type=np.int32,
988 doc="The number of children in the deconvolved blend."
989 )
990 schema.addField(
991 "deblend_parentNPeaks",
992 type=np.int32,
993 doc="deblend_nPeaks from this records parent.",
994 )
995 schema.addField(
996 "deblend_parentNChild",
997 type=np.int32,
998 doc="deblend_nChild from this records parent.",
999 )
1000 schema.addField(
1001 "deblend_nComponents",
1002 type=np.int32,
1003 doc="Number of components in a ScarletLiteSource. "
1004 "If `config.version != 'lite'`then "
1005 "this column is set to zero.",
1006 )
1007 schema.addField(
1008 "deblend_chi2",
1009 type=np.float32,
1010 doc="Final reduced chi2 (per pixel), used to identify goodness of fit.",
1011 )
1013 @timeMethod
1014 def run(
1015 self,
1016 mExposure: afwImage.MultibandExposure,
1017 mDeconvolved: afwImage.MultibandExposure,
1018 mergedSources: afwTable.SourceCatalog,
1019 ) -> pipeBase.Struct:
1020 """Get the psf from each exposure and then run deblend().
1022 Parameters
1023 ----------
1024 mExposure :
1025 The exposures should be co-added images of the same
1026 shape and region of the sky.
1027 mDeconvolved :
1028 The deconvolved images of the same shape and region of the sky.
1029 mergedSources :
1030 The merged `SourceCatalog` that contains parent footprints
1031 to (potentially) deblend.
1033 Returns
1034 -------
1035 templateCatalogs: dict
1036 Keys are the names of the bands and the values are
1037 `lsst.afw.table.source.source.SourceCatalog`'s.
1038 These are catalogs with heavy footprints that are the templates
1039 created by the multiband templates.
1040 """
1041 # Create a table to hold object records
1042 table = afwTable.SourceTable.make(self.objectSchema)
1043 objectCatalog = afwTable.SourceCatalog(table)
1045 # Create a table to hold parent records
1046 table = afwTable.SourceTable.make(self.parentSchema, mergedSources.getIdFactory())
1047 parentCatalog = afwTable.SourceCatalog(table)
1048 parentCatalog.extend(mergedSources, self.parentSchemaMapper)
1050 return self.deblend(mExposure, mDeconvolved, objectCatalog, parentCatalog)
1052 @timeMethod
1053 def deblend(
1054 self,
1055 mExposure: afwImage.MultibandExposure,
1056 mDeconvolved: afwImage.MultibandExposure,
1057 objectCatalog: afwTable.SourceCatalog,
1058 parentCatalog: afwTable.SourceCatalog,
1059 ) -> pipeBase.Struct:
1060 """Deblend a data cube of multiband images
1062 Deblending iterates over sources from the input catalog,
1063 which are blends of peaks with overlapping PSFs (depth 0 parents).
1064 In many cases those footprints can be subdived into multiple
1065 deconvolved footprints, which have an intermediate
1066 parent record added to the catalog and are be deblended separately.
1067 All deblended peaks have a source record added to the catalog,
1068 each of which has a depth one greater than the parent.
1070 Parameters
1071 ----------
1072 mExposure :
1073 The exposures should be co-added images of the same
1074 shape and region of the sky.
1075 mDeconvolved :
1076 The deconvolved images of the same shape and region of the sky.
1077 objectCatalog :
1078 An empty `SourceCatalog` with the schema to hold isolated sources
1079 and deblended children. This catalog is filled in place.
1080 parentCatalog :
1081 The merged `SourceCatalog` that contains parent footprints
1082 to (potentially) deblend. If a parent is subdivided into
1083 multiple deconvolved parents, the deconvolved parents are
1084 added to this catalog in place.
1086 Returns
1087 -------
1088 deblendedCatalog :
1089 The ``deblendedCatalog`` isolated and deblended child sources.
1090 scarletModelData :
1091 The persistable data model for the deblender.
1092 objectParents :
1093 The parent catalog with deconvolved parents added.
1094 """
1096 # Cull footprints if required by ci
1097 if self.config.useCiLimits:
1098 self.log.info(
1099 "Using CI catalog limits, the original number of sources to deblend was %d.",
1100 len(objectCatalog),
1101 )
1102 # Select parents with a number of children in the range
1103 # config.ciDeblendChildRange
1104 minChildren, maxChildren = self.config.ciDeblendChildRange
1105 nPeaks = np.array([len(src.getFootprint().peaks) for src in parentCatalog])
1106 childrenInRange = np.where(
1107 (nPeaks >= minChildren) & (nPeaks <= maxChildren)
1108 )[0]
1109 if len(childrenInRange) < self.config.ciNumParentsToDeblend:
1110 raise ValueError(
1111 "Fewer than ciNumParentsToDeblend children were contained in the range "
1112 "indicated by ciDeblendChildRange. Adjust this range to include more "
1113 "parents."
1114 )
1115 # Keep all of the isolated parents and the first
1116 # `ciNumParentsToDeblend` children
1117 parents = nPeaks == 1
1118 children = np.zeros((len(parentCatalog),), dtype=bool)
1119 children[childrenInRange[: self.config.ciNumParentsToDeblend]] = True
1120 parentCatalog = parentCatalog[parents | children]
1121 # We need to update the IdFactory, otherwise the the source ids
1122 # will not be sequential
1123 idFactory = parentCatalog.getIdFactory()
1124 maxId = np.max(parentCatalog["id"])
1125 idFactory.notify(maxId)
1126 del children
1128 self.log.info(
1129 "Deblending %d sources in %d exposure bands", len(parentCatalog), len(mExposure),
1130 )
1131 periodicLog = PeriodicLogger(self.log)
1133 # Add the NOT_DEBLENDED mask to the mask plane in each band
1134 if self.config.notDeblendedMask:
1135 for mask in mExposure.mask:
1136 mask.addMaskPlane(self.config.notDeblendedMask)
1138 # Create the context for the entire coadd
1139 context = ScarletDeblendContext.build(
1140 mExposure=mExposure,
1141 mDeconvolved=mDeconvolved,
1142 config=self.config,
1143 catalog=parentCatalog,
1144 )
1145 nBands = len(context.observation.bands)
1147 # Initialize the persistable ScarletModelData object
1148 modelData = io.LsstScarletModelData(metadata={
1149 "model_psf": context.observation.model_psf[0],
1150 "psf": context.observation.psfs,
1151 "bands": context.observation.bands,
1152 })
1154 if self.config.persistIsolated:
1155 # Add isolated sources to the model data
1156 for source in parentCatalog:
1157 if (
1158 len(source.getFootprint().peaks) == 1
1159 and not isPseudoSource(source, self.config.pseudoColumns)
1160 ):
1161 isolated = IsolatedSource.from_footprint(
1162 footprint=source.getFootprint(),
1163 mCoadd=mExposure,
1164 dtype=np.float32,
1165 )
1166 modelData.isolated[source.getId()] = isolated.to_data()
1168 # Attach full image objects to the task to simplify the API
1169 # and use for debugging.
1170 self.objectCatalog = objectCatalog
1171 self.parentCatalog = parentCatalog
1172 self.context = context
1173 self.modelData = modelData
1174 self.mExposure = mExposure
1176 # Subdivide the psf blended parents into deconvolved parents
1177 # using the deconvolved footprints stored in the context.
1178 nParents = len(parentCatalog)
1179 self._initializeCatalogs(
1180 objectCatalog,
1181 parentCatalog,
1182 context,
1183 )
1184 nBlends = len(parentCatalog)
1186 self.log.info(
1187 "Subdivided %d top level parents to create %d deconvolved parents.",
1188 nParents,
1189 nBlends-nParents,
1190 )
1192 # Deblend sources
1193 for parentIndex in range(nParents):
1194 # Log a message if it has been a while since the last log.
1195 periodicLog.log(
1196 "Deblended %d out of %d parents",
1197 parentIndex,
1198 nParents,
1199 )
1201 parentRecord = parentCatalog[parentIndex]
1202 blendRecords = parentCatalog[parentCatalog["parent"] == parentRecord.getId()]
1204 if len(blendRecords) == 0:
1205 # The source is isolated.
1206 if self.config.processSingles:
1207 blendRecords = [parentRecord]
1208 else:
1209 continue
1211 self.log.trace(
1212 "Split parent %d into %d deconvolved parents",
1213 parentRecord.getId(),
1214 len(blendRecords),
1215 )
1216 # Create an image to keep track of the cumulative model
1217 # for all sub blends in the parent footprint.
1218 parentFootprint = parentRecord.getFootprint()
1219 bbox = parentFootprint.getBBox()
1220 width, height = bbox.getDimensions()
1221 x0, y0 = bbox.getMin()
1222 emptyModel = np.zeros(
1223 (nBands, height, width),
1224 dtype=mExposure.image.array.dtype,
1225 )
1226 parentModel = scl.Image(
1227 emptyModel,
1228 bands=context.observation.images.bands,
1229 yx0=(y0, x0),
1230 )
1232 sourceRecords = []
1233 parentBlends = {}
1234 for blendRecord in blendRecords:
1235 # Log a message if it has been a while since the last log.
1236 periodicLog.log(
1237 "Deblended %d out of %d parents",
1238 parentIndex,
1239 nParents,
1240 )
1241 try:
1242 blend, blendModel, chi2 = self._deblendParent(blendRecord)
1243 except DeblenderSkippedError as e:
1244 self._skipBlend(blendRecord, e.skipKey, e.message)
1245 parentRecord.set("deblend_skipped", True)
1246 parentRecord.set(e.skipKey, True)
1247 continue
1248 except DeblenderError as e:
1249 blendRecord.set("deblend_error", e.errorName)
1250 blendRecord.set("deblend_failed", True)
1251 self._skipBlend(blendRecord, "deblend_failed", e.message)
1252 parentRecord.set("deblend_childFailed", True)
1253 continue
1255 # Update the parent model
1256 parentModel.insert(blendModel)
1258 # Add each deblended source to the catalog
1259 for scarletSource in blend.sources:
1260 # Add all fields except the HeavyFootprint to the
1261 # source record
1262 scarletSource.metadata = {}
1263 scarletSource.metadata["peak_id"] = scarletSource.detectedPeak.getId()
1264 sourceRecord = self._addDeblendedSource(
1265 parentId=parentRecord.getId(),
1266 blendRecord=blendRecord,
1267 peak=scarletSource.detectedPeak,
1268 objectCatalog=objectCatalog,
1269 scarletSource=scarletSource,
1270 chi2=chi2,
1271 )
1272 scarletSource.metadata["id"] = sourceRecord.getId()
1273 sourceRecords.append(sourceRecord)
1275 # Store the blend information so that it can be persisted
1276 blendData = blend.to_data()
1277 parentBlends[blendRecord.getId()] = blendData
1279 if len(parentBlends) == 0:
1280 # All of the deconvolved blends failed to deblend
1281 self._updateParentRecord(
1282 parentRecord=blendRecord,
1283 nPeaks=len(parentRecord.getFootprint().peaks),
1284 nChild=0,
1285 nComponents=0,
1286 runtime=np.nan,
1287 iterations=0,
1288 logL=np.nan,
1289 chi2=np.nan,
1290 spectrumInit=False,
1291 converged=False,
1292 )
1293 continue
1295 # Calculate the reduced chi2 for the PSF parent
1296 parentFootprintImage = parentModel.data > 0
1297 chi2 = utils.calcChi2(parentModel, context.observation, parentFootprintImage)
1299 # Update the parent record with the deblending results
1300 self._updateParentRecord(
1301 parentRecord=parentRecord,
1302 nPeaks=len(parentFootprint.peaks),
1303 nChild=np.sum([child["deblend_nChild"] for child in blendRecords]),
1304 nComponents=np.sum([child["deblend_nComponents"] for child in blendRecords]),
1305 runtime=np.sum([child["deblend_runtime"] for child in blendRecords]),
1306 iterations=np.sum([child["deblend_iterations"] for child in blendRecords]),
1307 logL=np.nan,
1308 chi2=np.sum(chi2.data)/np.sum(parentFootprintImage),
1309 spectrumInit=np.all([
1310 child["deblend_spectrumInitFlag"]
1311 for child in blendRecords
1312 ]), # type: ignore
1313 converged=np.all([
1314 child["deblend_blendConvergenceFailedFlag"]
1315 for child in blendRecords
1316 ]), # type: ignore
1317 )
1318 # Persist parent columns to the children
1319 for child in sourceRecords:
1320 for key in self.toCopyFromParent:
1321 child.set(key, parentRecord.get(key))
1322 for parentColumn, childColumn in self.config.columnInheritance.items():
1323 child.set(childColumn, parentRecord.get(parentColumn))
1325 # Persist the blend data
1326 modelData.blends[parentRecord.getId()] = scl.io.HierarchicalBlendData(
1327 children=parentBlends,
1328 metadata={
1329 "spans": parentFootprint.getSpans().asArray(),
1330 "origin": (y0, x0),
1331 }
1332 )
1334 nDeblendedSources = np.sum(objectCatalog["parent"] != 0)
1335 self.log.info(
1336 "Deblender results: %d parent sources were "
1337 "split into %d deconvolved parents,"
1338 "resulting in %d deblended sources, "
1339 "for a total catalog size of %d sources",
1340 nParents,
1341 nBlends,
1342 nDeblendedSources,
1343 len(objectCatalog),
1344 )
1346 table = afwTable.SourceTable.make(self.objectSchema)
1347 sortedCatalog = afwTable.SourceCatalog(table)
1348 sortedCatalog.extend(objectCatalog, deep=True)
1349 table = afwTable.SourceTable.make(self.parentSchema)
1350 sortedBlendCatalog = afwTable.SourceCatalog(table)
1351 sortedBlendCatalog.extend(parentCatalog, deep=True)
1353 return pipeBase.Struct(
1354 deblendedCatalog=sortedCatalog,
1355 scarletModelData=modelData,
1356 objectParents=sortedBlendCatalog,
1357 )
1359 def _deblendParent(
1360 self,
1361 blendRecord: afwTable.SourceRecord,
1362 ) -> tuple[scl.Blend, scl.Image, scl.Image]:
1363 """Deblend a parent source record
1365 Parameters
1366 ----------
1367 blendRecord :
1368 The parent source record to deblend.
1370 Returns
1371 -------
1372 blend :
1373 The `scl.Blend` object that contains the deblended sources.
1374 blendModel :
1375 The `scl.Image` model of the blend.
1376 chi2 :
1377 The reduced chi2 of the blend model.
1378 """
1379 footprint = blendRecord.getFootprint()
1380 bbox = footprint.getBBox()
1381 peaks = footprint.getPeaks()
1383 # Since we use the first peak for the parent object, we should
1384 # propagate its flags to the parent source.
1385 blendRecord.assign(peaks[0], self.parentPeakSchemaMapper)
1387 # Skip the source if it meets the skipping criteria
1388 isSkipped = self._checkSkipped(blendRecord, self.mExposure)
1389 if isSkipped is not None:
1390 skipKey, skipMessage = isSkipped
1391 raise DeblenderSkippedError(
1392 skipMessage,
1393 blendRecord.getId(),
1394 skipKey,
1395 )
1397 self.log.trace(
1398 "Blend %d: deblending {%d} peaks",
1399 blendRecord.getId(),
1400 len(peaks),
1401 )
1402 # Choose whether or not to use improved spectral initialization.
1403 # This significantly cuts down on the number of iterations
1404 # that the optimizer needs and usually results in a better
1405 # fit.
1406 # But using least squares on a very large blend causes memory
1407 # issues, so it is not done for large blends
1408 if self.config.setSpectra:
1409 if self.config.maxSpectrumCutoff <= 0:
1410 spectrumInit = True
1411 else:
1412 spectrumInit = (
1413 len(footprint.peaks) * bbox.getArea() < self.config.maxSpectrumCutoff
1414 )
1415 else:
1416 spectrumInit = False
1418 try:
1419 t0 = time.monotonic()
1420 # Build the parameter lists with the same ordering
1421 blend = deblend(self.context, footprint, self.config, spectrumInit)
1422 tf = time.monotonic()
1423 runtime = (tf - t0) * 1000
1424 converged = _checkBlendConvergence(blend, self.config.relativeError)
1425 # Store the number of components in the blend
1426 nComponents = len(blend.components)
1427 nChild = len(blend.sources)
1428 # Catch all errors and filter out the ones that we know about
1429 except Exception as e:
1430 blendError = type(e).__name__
1431 if self.config.catchFailures:
1432 # Make it easy to find UnknownErrors in the log file
1433 self.log.warn("UnknownError")
1434 import traceback
1435 traceback.print_exc()
1436 else:
1437 raise
1439 raise DeblenderError(
1440 f"Unable to deblend parent {blendRecord.getId()}: {blendError}",
1441 blendRecord.getId(),
1442 blendError,
1443 )
1445 # Calculate the reduced chi2
1446 blendModel = blend.get_model(convolve=False)
1447 blendFootprintImage = blendModel.data > 0
1448 chi2 = utils.calcChi2(blendModel, self.context.observation, blendFootprintImage)
1450 # Update the blend record with the deblending results
1451 self._updateParentRecord(
1452 parentRecord=blendRecord,
1453 nPeaks=len(peaks),
1454 nChild=nChild,
1455 nComponents=nComponents,
1456 runtime=runtime,
1457 iterations=len(blend.loss),
1458 logL=blend.loss[-1],
1459 chi2=np.sum(chi2.data)/np.sum(blendFootprintImage),
1460 spectrumInit=spectrumInit,
1461 converged=converged,
1462 )
1464 return blend, blendModel, chi2
1466 def _isLargeFootprint(self, footprint: afwDet.Footprint) -> bool:
1467 """Returns whether a Footprint is large
1469 'Large' is defined by thresholds on the area, size and axis ratio,
1470 and total area of the bounding box multiplied by
1471 the number of children.
1472 These may be disabled independently by configuring them to be
1473 non-positive.
1474 """
1475 if (
1476 self.config.maxFootprintArea > 0
1477 and footprint.getBBox().getArea() > self.config.maxFootprintArea
1478 ):
1479 return True
1480 if self.config.maxFootprintSize > 0:
1481 bbox = footprint.getBBox()
1482 if max(bbox.getWidth(), bbox.getHeight()) > self.config.maxFootprintSize:
1483 return True
1484 if self.config.minFootprintAxisRatio > 0:
1485 axes = afwEll.Axes(footprint.getShape())
1486 if axes.getB() < self.config.minFootprintAxisRatio * axes.getA():
1487 return True
1488 if self.config.maxAreaTimesPeaks > 0:
1489 if (
1490 footprint.getBBox().getArea() * len(footprint.peaks)
1491 > self.config.maxAreaTimesPeaks
1492 ):
1493 return True
1494 return False
1496 def _isMasked(self, footprint: afwDet.Footprint, mExposure: afwImage.MultibandExposure) -> bool:
1497 """Returns whether the footprint violates the mask limits
1499 Parameters
1500 ----------
1501 footprint :
1502 The footprint to check for masked pixels
1503 mExposure :
1504 The multiband exposure to check for masked pixels.
1506 Returns
1507 -------
1508 isMasked : `bool`
1509 `True` if `self.config.maskPlaneLimits` is less than the
1510 fraction of pixels for a given mask in
1511 `self.config.maskLimits`.
1512 """
1513 bbox = footprint.getBBox()
1514 mask = np.bitwise_or.reduce(mExposure.mask[:, bbox].array, axis=0)
1515 size = float(footprint.getArea())
1516 for maskName, limit in self.config.maskLimits.items():
1517 maskVal = mExposure.mask.getPlaneBitMask(maskName)
1518 _mask = afwImage.MaskX(mask & maskVal, xy0=bbox.getMin())
1519 # spanset of masked pixels
1520 maskedSpan = footprint.spans.intersect(_mask, maskVal)
1521 if (maskedSpan.getArea()) / size > limit:
1522 return True
1523 return False
1525 def _skipBlend(
1526 self,
1527 blendRecord: afwTable.SourceRecord,
1528 skipKey: str,
1529 logMessage: str | None,
1530 ):
1531 """Update a parent record that is not being deblended.
1533 This is a fairly trivial function but is implemented to ensure
1534 that a skipped blend updates the appropriate columns
1535 consistently, and always has a flag to mark the reason that
1536 it is being skipped.
1538 Parameters
1539 ----------
1540 blendRecord :
1541 The blend record to flag as skipped.
1542 skipKey :
1543 The name of the flag to mark the reason for skipping.
1544 logMessage :
1545 The message to display in a log.trace when a source
1546 is skipped.
1547 """
1548 if logMessage is not None:
1549 self.log.trace(logMessage)
1550 footprint = blendRecord.getFootprint()
1551 self._updateParentRecord(
1552 parentRecord=blendRecord,
1553 nPeaks=len(footprint.peaks),
1554 nChild=0,
1555 nComponents=0,
1556 runtime=np.nan,
1557 iterations=0,
1558 logL=np.nan,
1559 chi2=np.nan,
1560 spectrumInit=False,
1561 converged=False,
1562 )
1564 # Mark the source as skipped by the deblender and
1565 # flag the reason why.
1566 blendRecord.set("deblend_skipped", True)
1567 blendRecord.set(skipKey, True)
1569 # Add the NOT_DEBLENDED mask to the mask plane in each band
1570 if self.config.notDeblendedMask:
1571 for mask in self.mExposure.mask:
1572 footprint.spans.setMask(
1573 mask, mask.getPlaneBitMask(self.config.notDeblendedMask)
1574 )
    def _checkSkipped(
        self,
        parent: afwTable.SourceRecord,
        mExposure: afwImage.MultibandExposure
    ) -> tuple[afwTable.Key, str] | None:
        """Check whether a parent record should be skipped by the deblender.

        A parent is skipped when it is a pseudo source (e.g. a sky object),
        when its footprint is too large, when too large a fraction of its
        footprint pixels are masked, or when it has too many peaks.

        Parameters
        ----------
        parent :
            The parent record to check.
        mExposure :
            The exposures should be co-added images of the same
            shape and region of the sky.

        Returns
        -------
        skip :
            `None` when the parent should be deblended; otherwise a tuple
            of the name of the flag column recording the reason for the
            skip and a (possibly empty) message to log.
        """
        skipKey = None
        skipMessage = ""
        footprint = parent.getFootprint()
        if isPseudoSource(parent, self.config.pseudoColumns):
            # We also skip pseudo sources, like sky objects, which
            # are intended to be skipped.
            skipKey = "deblend_skipped_isPseudo"
        elif self._isLargeFootprint(footprint):
            # The footprint is above the maximum footprint size limit
            skipKey = "deblend_skipped_parentTooBig"
            skipMessage = f"Parent {parent.getId()}: skipping large footprint"
        elif self._isMasked(footprint, mExposure):
            # The footprint exceeds the maximum number of masked pixels
            skipKey = "deblend_skipped_masked"
            skipMessage = f"Parent {parent.getId()}: skipping masked footprint"
        elif (
            self.config.maxNumberOfPeaks > 0
            and len(footprint.peaks) > self.config.maxNumberOfPeaks
        ):
            # Unlike meas_deblender, in scarlet we skip the entire blend
            # if the number of peaks exceeds max peaks, since neglecting
            # to model any peaks often results in catastrophic failure
            # of scarlet to generate models for the brighter sources.
            skipKey = "deblend_skipped_tooManyPeaks"
            skipMessage = f"Parent {parent.getId()}: skipping blend with too many peaks"
        if skipKey is not None:
            # NOTE(review): skipKey is a column-name string, not an
            # afwTable.Key; the cast only satisfies the annotated
            # return type — confirm downstream users expect a string.
            return (cast(afwTable.Key, skipKey), skipMessage)
        return None
    def _updateParentRecord(
        self,
        parentRecord: afwTable.SourceRecord,
        nPeaks: int,
        nChild: int,
        nComponents: int,
        runtime: float,
        iterations: int,
        logL: float,
        chi2: float,
        spectrumInit: bool,
        converged: bool,
    ):
        """Update the deblending columns of a parent (or blend) record.

        Ensure that all locations that update a blend record,
        whether it is skipped or updated after deblending,
        update all of the appropriate columns.

        Parameters
        ----------
        parentRecord :
            The catalog record to update.
        nPeaks :
            Number of peaks in the parent footprint.
        nChild :
            Number of children deblended from the parent.
            This may differ from `nPeaks` if some of the peaks
            were culled and have no deblended model.
        nComponents :
            Total number of components in the parent.
            This is usually different than the number of children,
            since it is common for a single source to have multiple
            components.
        runtime :
            Total runtime for deblending.
        iterations :
            Total number of iterations in scarlet before convergence.
        logL :
            Final log likelihood of the blend.
            NOTE(review): this parameter is accepted but not written to
            any column below — confirm whether a `deblend_logL` column
            was intended.
        chi2 :
            Final reduced chi2 of the blend.
        spectrumInit :
            True when scarlet used `set_spectra` to initialize all
            sources with better initial intensities.
        converged :
            True when the optimizer reached convergence before
            reaching the maximum number of iterations.
        """
        parentRecord.set("deblend_nPeaks", nPeaks)
        parentRecord.set("deblend_nChild", nChild)
        parentRecord.set("deblend_nComponents", nComponents)
        parentRecord.set("deblend_runtime", runtime)
        parentRecord.set("deblend_iterations", iterations)
        parentRecord.set("deblend_spectrumInitFlag", spectrumInit)
        # NOTE(review): `converged` (True on success) is stored in a column
        # whose name and schema doc describe a convergence *failure* —
        # confirm whether this should be `not converged`.
        parentRecord.set("deblend_blendConvergenceFailedFlag", converged)
        parentRecord.set("deblend_chi2", chi2)
    def _initializeCatalogs(
        self,
        objectCatalog: afwTable.SourceCatalog,
        parentCatalog: afwTable.SourceCatalog,
        context: ScarletDeblendContext,
    ) -> None:
        """Create records for deconvolved parents and isolated sources.

        Each parent may be subdivided into multiple blends that are
        isolated in deconvolved space but still blended in the image.
        This method finds all of the deconvolved footprints that overlap
        with each parent footprint in the input catalog and appends a
        deconvolved parent record to ``parentCatalog`` for each of them
        (in place). Pseudo sources and isolated (single-peak) parents are
        instead added directly to ``objectCatalog``.

        Parameters
        ----------
        objectCatalog :
            The catalog of isolated and deblended sources.
        parentCatalog :
            The catalog of parents and deconvolved parents used for deblending.
        context :
            The context for the entire coadd.
        """
        nParents = len(parentCatalog)

        isolated = []
        for n in range(nParents):
            parent = parentCatalog[n]
            parentFoot = parent.getFootprint()
            # Since we use the first peak for the parent object, we should
            # propagate its flags to the parent source.
            # For example, this propagates `merge_peak_sky` to the parent
            parent.assign(parent.getFootprint().peaks[0], self.peakSchemaMapper)

            if isPseudoSource(parent, self.config.pseudoColumns):
                # Skip pseudo sources
                # Note: this does not flag isolated sources as skipped or
                # set the NOT_DEBLENDED mask in the exposure,
                # since these aren't really any skipped blends.
                isolated.append(parent)
                continue

            if len(parentFoot.peaks) == 1:
                # Single-peak parent: treated as isolated.
                # NOTE(review): the skipped flag is set regardless of
                # `config.processSingles` — confirm this is intended.
                isolated.append(parent)
                parent.set("deblend_skipped_isolatedParent", True)
                continue

            # Find deconvolved footprints that intersect with the parent
            # and add them to the blend catalog.
            parentId = parent.getId()
            self._buildIntersectingFootprints(
                parentId,
                parentFoot,
                parentCatalog,
                context.footprints,
                context.footprintImage
            )

        # Update the IdFactory to account for all of the parents
        # and deconvolved parents added to the catalog.
        idFactory = objectCatalog.getIdFactory()
        maxId = np.max(parentCatalog["id"])
        idFactory.notify(maxId)

        # Add the isolated sources to the object catalog.
        # This has to be done before adding the other deblended children
        # because a SourceCatalog must be ordered by parentId and all isolated
        # sources have parentId == 0.
        for parent in isolated:
            self._addIsolatedSource(parent, objectCatalog)
1761 def _buildIntersectingFootprints(
1762 self,
1763 parentId: int,
1764 afwFootprint: afwDet.Footprint,
1765 parentCatalog: afwTable.SourceCatalog,
1766 sclFootprints: list[scl.detect.Footprint],
1767 footprintImage: scl.Image,
1768 ) -> None:
1769 """Get the intersection of two footprints
1771 Parameters
1772 ----------
1773 parentId :
1774 The parent id containing the footprints.
1775 afwFootprint :
1776 The afw footprint
1777 parentCatalog :
1778 The catalog of parents and deconvolved parents.
1779 sclFootprints :
1780 List of scarlet lite Footprints.
1781 footprintImage :
1782 An indexed image of the scarlet footprints so that the value
1783 of a pixel gives the index + 1 of the footprints that
1784 contain that pixel.
1786 Returns
1787 -------
1788 intersection :
1789 The intersection of the two footprints
1790 """
1791 footprintIndices = set()
1792 ymin, xmin = footprintImage.bbox.origin
1794 # Get the index of the deconvolved footprint at the peak location
1795 for peak in afwFootprint.peaks:
1796 x = peak["i_x"] - xmin
1797 y = peak["i_y"] - ymin
1798 try:
1799 footprintIndex = footprintImage.data[y, x] - 1
1800 except IndexError:
1801 raise RuntimeError(f"no footprint at ({y}, {x})")
1802 if footprintIndex >= 0:
1803 footprintIndices.add(footprintIndex)
1805 # Get the intersection of each deconvolved footprint with
1806 # the parent footprint.
1807 for index in footprintIndices:
1808 _sclFootprint = scarletFootprintToAfw(sclFootprints[index])
1809 intersection = afwFootprint.intersect(_sclFootprint, copyPeaks=True)
1810 if len(intersection.peaks) > 0:
1811 self._addBlendRecord(
1812 parentId=parentId,
1813 parentCatalog=parentCatalog,
1814 footprint=intersection,
1815 )
1817 def _addBlendRecord(
1818 self,
1819 parentId: int,
1820 parentCatalog: afwTable.SourceCatalog,
1821 footprint: afwDet.Footprint,
1822 ) -> None:
1823 """Add deconvolved parents to the parent catalog
1825 Each parent may be subdivided into multiple blends that are
1826 isolated in deconvolved space but still blended in the image.
1827 This function adds the sub-parents to the catalog.
1829 Parameters
1830 ----------
1831 parentId :
1832 The ID of the parent of the sub-parents.
1833 parentCatalog :
1834 The catalog of parents and deconvolved parents.
1835 footprint :
1836 The footprint of the deconvolved parent.
1837 """
1838 blendRecord = parentCatalog.addNew()
1839 blendRecord.setParent(parentId)
1840 blendRecord.setFootprint(footprint)
    def _addDeblendedSource(
        self,
        parentId: int,
        blendRecord: afwTable.SourceRecord,
        peak: afwDet.PeakRecord,
        objectCatalog: afwTable.SourceCatalog,
        scarletSource: scl.Source,
        chi2: scl.Image,
    ):
        """Add a deblended source to a catalog.

        This creates a new child in the source catalog,
        assigning it a parent id, and adding all columns
        that are independent across all filter bands and not
        updated after deblending.

        Parameters
        ----------
        parentId :
            The id of the parent of the new child record.
        blendRecord :
            The deconvolved parent of the new child record.
        peak :
            The peak record for the peak from the parent peak catalog.
        objectCatalog :
            The source catalog that the child is added to.
        scarletSource :
            The scarlet model for the new source record.
        chi2 :
            The chi2 for each pixel.

        Returns
        -------
        src :
            The new child source record.
        """
        src = objectCatalog.addNew()
        # The peak catalog is the same for all bands,
        # so we just use the first peak catalog
        src.assign(peak, self.peakSchemaMapper)
        src.setParent(parentId)
        src.set("deblend_blendId", blendRecord.getId())
        src.set("deblend_blendNChild", len(blendRecord.getFootprint().peaks))
        src.set("deblend_nChild", 0)

        # Set the position of the peak from the parent footprint
        # This will make it easier to match the same source across
        # deblenders and across observations, where the peak
        # position is unlikely to change unless enough time passes
        # for a source to move on the sky.
        src.set("deblend_peak_center_x", peak["i_x"])
        src.set("deblend_peak_center_y", peak["i_y"])
        src.set("deblend_peakId", peak["id"])

        # Store the number of components for the source
        src.set("deblend_nComponents", len(scarletSource.components))

        # Calculate the reduced chi2 for the source:
        # the summed chi2 within the source bounding box divided by the
        # number of non-zero pixels in the source model.
        area = np.sum(scarletSource.get_model().data > 0)
        src.set("deblend_chi2", np.sum(chi2[:, scarletSource.bbox].data/area))

        return src
1905 def _addIsolatedSource(
1906 self,
1907 parentRecord: afwTable.SourceRecord,
1908 objectCatalog: afwTable.SourceCatalog,
1909 ) -> afwTable.SourceRecord:
1910 """Add an isolated source to the catalog.
1912 This creates a new record in the object catalog,
1913 and assigns values for all fields not dependent on deblending.
1915 Parameters
1916 ----------
1917 parentRecord :
1918 The parent record to copy values from.
1919 objectCatalog :
1920 The source catalog that the child is added to.
1922 Returns
1923 -------
1924 src :
1925 The new isolated source record.
1926 """
1927 # The isolated source has the same peak as the parent
1928 footprint = parentRecord.getFootprint()
1929 peak = footprint.peaks[0]
1930 src = objectCatalog.addNew()
1931 src.assign(peak, self.peakSchemaMapper)
1932 src.setParent(0)
1933 # The id of the isolated source is the same as the parent
1934 src.setId(parentRecord.getId())
1935 src.setFootprint(footprint)
1936 src.set("deblend_blendId", 0)
1937 src.set("deblend_blendNChild", 0)
1938 src.set("deblend_peak_center_x", peak["i_x"])
1939 src.set("deblend_peak_center_y", peak["i_y"])
1940 src.set("deblend_peakId", peak["id"])
1942 # Isolated sources are not modeled, so they don't have components
1943 src.set("deblend_nComponents", 0)
1945 # There are no children so we need to update the parent
1946 # record and add the isolated source to the object catalog
1947 self._updateParentRecord(
1948 parentRecord=parentRecord,
1949 nPeaks=len(parentRecord.getFootprint().peaks),
1950 nChild=0,
1951 nComponents=0,
1952 runtime=0.0,
1953 iterations=0,
1954 logL=np.nan,
1955 chi2=np.nan,
1956 spectrumInit=False,
1957 converged=True,
1958 )
1960 # Persist parent columns to the isolated source
1961 # This is usually detection flags, like merge_peak_sky
1962 # or merge_footprint_r, etc.
1963 for key in self.toCopyFromParent:
1964 src.set(key, parentRecord.get(key))
1966 return src