Coverage for python / lsst / drp / tasks / fit_visit_background.py: 0%
302 statements
« prev ^ index » next coverage.py v7.13.5, created at 2026-04-15 00:19 +0000
« prev ^ index » next coverage.py v7.13.5, created at 2026-04-15 00:19 +0000
1# This file is part of drp_tasks.
2#
3# Developed for the LSST Data Management System.
4# This product includes software developed by the LSST Project
5# (https://www.lsst.org).
6# See the COPYRIGHT file at the top-level directory of this distribution
7# for details of code ownership.
8#
9# This program is free software: you can redistribute it and/or modify
10# it under the terms of the GNU General Public License as published by
11# the Free Software Foundation, either version 3 of the License, or
12# (at your option) any later version.
13#
14# This program is distributed in the hope that it will be useful,
15# but WITHOUT ANY WARRANTY; without even the implied warranty of
16# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
17# GNU General Public License for more details.
18#
19# You should have received a copy of the GNU General Public License
20# along with this program. If not, see <https://www.gnu.org/licenses/>.
22from __future__ import annotations
24__all__ = ()
26import dataclasses
27import uuid
28from collections.abc import Iterable, Mapping
29from functools import cached_property
30from typing import ClassVar, cast
32import numpy as np
33import pydantic
35from lsst.afw.cameraGeom import FOCAL_PLANE, PIXELS, Camera, Detector
36from lsst.afw.image import MaskedImageF
37from lsst.afw.math import (
38 ApproximateControl,
39 BackgroundList,
40 BackgroundMI,
41 ChebyshevBoundedField,
42 ChebyshevBoundedFieldControl,
43 Interpolate,
44 UndersampleStyle,
45)
46from lsst.geom import Box2D, Box2I, Point2I
47from lsst.pex.config import Field
48from lsst.pipe.base import (
49 InputQuantizedConnection,
50 InvalidQuantumError,
51 OutputQuantizedConnection,
52 PipelineTask,
53 PipelineTaskConfig,
54 PipelineTaskConnections,
55 QuantumContext,
56 Struct,
57)
58from lsst.pipe.base import connectionTypes as cT
61@dataclasses.dataclass(frozen=True)
62class FitInputData:
63 """Data points input to a focal-plane background fit.
65 All arrays are 1-d and have same length, and are considered read-only.
66 """
68 x: np.ndarray
69 """Focal plane X coordinates of the centers of bins."""
71 y: np.ndarray
72 """Focal plane Y coordinates of the centers of bins."""
74 z: np.ndarray
75 """Values of background-estimation bins."""
77 w: np.ndarray
78 """Weights of background-estimation bins, from variance**(-1/2)."""
80 sky_frame: np.ndarray | None = None
81 """Binned sky frame values.
83 This will be `None` if the sky frame is not included as a basis function.
84 """
86 detector_slices: dict[int, slice] = dataclasses.field(default_factory=dict)
87 """A mapping from detector ID to a slice that corresponds to the points
88 in that detector in the arrays.
90 This may be empty for single-detector objects.
91 """
93 def __post_init__(self) -> None:
94 # Make arrays immutable to avoid bugs, especially with caching.
95 self.x.flags.writeable = False
96 self.y.flags.writeable = False
97 self.z.flags.writeable = False
98 self.w.flags.writeable = False
99 if self.sky_frame is not None:
100 self.sky_frame.flags.writeable = False
102 def __len__(self) -> int:
103 return len(self.z)
105 def masked(self, mask: np.ndarray) -> FitInputData:
106 """Returned a masked copy of the data.
108 Parameters
109 ----------
110 mask : `numpy.ndarray`
111 Boolean or index array that selects the data to keep.
113 Returns
114 -------
115 masked : `FitInputData`
116 Data with mask applied. Detector slices are dropped.
117 """
118 return FitInputData(
119 x=self.x[mask],
120 y=self.y[mask],
121 z=self.z[mask],
122 w=self.w[mask],
123 sky_frame=self.sky_frame[mask] if self.sky_frame is not None else None,
124 )
126 def fit(self, matrix: np.ndarray) -> VisitBackgroundFit:
127 """Perform a linear least-squares fit with this data.
129 Parameters
130 ----------
131 matrix : `np.ndarray`
132 Design matrix to solve. First-dimension shape should equal
133 ``len(self)``.
135 Returns
136 -------
137 fit : `VisitBackgroundFit`
138 Object holding the best-fit coefficients that also remembers the
139 matrix and the input data.
140 """
141 if self.sky_frame is not None:
142 full_matrix = np.zeros((matrix.shape[0], matrix.shape[1] + 1), dtype=float)
143 full_matrix[:, :-1] = matrix
144 full_matrix[:, -1] = self.sky_frame
145 else:
146 full_matrix = matrix
147 coefficients, _, _, _ = np.linalg.lstsq(full_matrix * self.w[:, np.newaxis], self.z * self.w)
148 sky_frame_factor: float | None = None
149 if self.sky_frame is not None:
150 sky_frame_factor = coefficients[-1]
151 coefficients = coefficients[:-1]
152 return VisitBackgroundFit(
153 coefficients=coefficients, matrix=matrix, sky_frame_factor=sky_frame_factor, data=self
154 )
156 @classmethod
157 def concatenate(cls, data: Mapping[int, FitInputData]) -> FitInputData:
158 """Combine input data from multiple detectors.
160 Parameters
161 ----------
162 data : `~collections.abc.Mapping` [ `int`, `FitInputData` ]
163 Mapping from detector ID to the input data from that detector.
165 Returns
166 -------
167 concatenated : `FitInputData`
168 Concatenated input data.
169 """
170 i = 0
171 has_sky: bool = False
172 detector_slices: dict[int, slice] = {}
173 for detector_id, detector_data in data.items():
174 detector_slices[detector_id] = slice(i, i + len(detector_data))
175 i += len(detector_data)
176 has_sky = detector_data.sky_frame is not None
177 return cls(
178 x=np.concatenate([d.x for d in data.values()]),
179 y=np.concatenate([d.y for d in data.values()]),
180 z=np.concatenate([d.z for d in data.values()]),
181 w=np.concatenate([d.w for d in data.values()]),
182 sky_frame=(
183 np.concatenate([cast(np.ndarray, d.sky_frame) for d in data.values()]) if has_sky else None
184 ),
185 detector_slices=detector_slices,
186 )
189@dataclasses.dataclass(frozen=True)
190class VisitBackgroundFit:
191 """Results of a linear fit to a focal-plane background."""
193 coefficients: np.ndarray
194 """Coefficients of the fit (i.e. model parameters)."""
196 matrix: np.ndarray
197 """Matrix that evaluates the model at the data points.
199 Shape is ``(len(data), len(self.coefficients))``.
200 """
202 sky_frame_factor: float | None
203 """Coefficient for the sky frame basis function."""
205 data: FitInputData
206 """Data that went into the fit."""
208 def __post_init__(self) -> None:
209 # Make arrays immutable to avoid bugs, especially with caching.
210 self.coefficients.flags.writeable = False
211 self.matrix.flags.writeable = False
213 def masked(self, mask: np.ndarray) -> VisitBackgroundFit:
214 """Apply a mask to the data and matrix, and re-fit the coefficients
215 to return a new instance.
217 Parameters
218 ----------
219 mask : `numpy.ndarray`
220 Boolean or index array that selects the data to keep.
222 Returns
223 -------
224 masked_fit : `VisitBackgroundFit`
225 New fit with mask applied to data.
226 """
227 data = self.data.masked(mask)
228 matrix = self.matrix[mask, :]
229 return data.fit(matrix)
231 @cached_property
232 def model(self) -> np.ndarray:
233 """The model evaluated at the data points."""
234 result = np.dot(self.matrix, self.coefficients)
235 if self.sky_frame_factor is not None and self.data.sky_frame is not None:
236 result += self.sky_frame_factor * self.data.sky_frame
237 return result
239 @cached_property
240 def residuals(self) -> np.ndarray:
241 """The difference between the data and the model."""
242 return self.data.z - self.model
245class ChebyshevBasis(pydantic.BaseModel):
246 """A persistable description of a 2-d Chebyshev polynonmial basis."""
248 x_min: pydantic.StrictInt
249 x_max: pydantic.StrictInt
250 y_min: pydantic.StrictInt
251 y_max: pydantic.StrictInt
252 x_order: pydantic.StrictInt
253 y_order: pydantic.StrictInt
255 @classmethod
256 def from_bbox(cls, bbox: Box2I, x_order: int, y_order: int) -> ChebyshevBasis:
257 """Construct from a box object and polynomial orders.
259 Parameters
260 ----------
261 bbox : `lsst.geom.Box2I`
262 Bounding box used to scale the range of the Chebyshev polynomials
263 from (-1, 1).
264 x_order : `int`
265 Polynomial order (inclusive) in the X direction.
266 y_order : `int`
267 Polynomial order (inclusive) in the Y direction.
268 """
269 return cls(
270 x_min=bbox.x.min,
271 x_max=bbox.x.max,
272 y_min=bbox.y.min,
273 y_max=bbox.y.max,
274 x_order=x_order,
275 y_order=y_order,
276 )
278 @property
279 def bbox(self) -> Box2I:
280 """Bounding box used to scale the range of the Chebyshev polynomials
281 from (-1, 1).
282 """
283 return Box2I(minimum=Point2I(self.x_min, self.y_min), maximum=Point2I(self.x_max, self.y_max))
286class VisitBackgroundModel(pydantic.BaseModel):
287 """A persistable background model for a full visit."""
289 version: int = 1
290 """File format version number."""
292 chebyshev_basis: ChebyshevBasis
293 """Chebyshev basis in focal-plane coordinates."""
295 detector_ids: list[pydantic.StrictInt] | None = None
296 """IDs of detectors that have pistons.
298 When this is not `None`, `detector_types` must be `None` and the
299 zeroth-order Chebyshev term is dropped.
300 """
302 detector_types: list[pydantic.StrictStr] | None = None
303 """Detector physical types that have pistons.
305 When this is not `None`, `detector_ids` must be `None` and the zeroth-order
306 Chebyshev term is dropped.
307 """
309 sky_frame_datasets: dict[int, uuid.UUID] | None = None
310 """UUIDs of the binned sky frame datasets that were used as a basis
311 function in this fit, keyed by detector.
313 This is only set by `FitBackgroundTask.runQuantum`; it may be `None` even
314 if the sky frame was fit.
315 """
317 sky_frame_factor: float | None = None
318 """Factor the sky frame is multiplied by in this model."""
320 coefficients: list[pydantic.StrictFloat] = pydantic.Field(default_factory=list)
321 """Model parameters, not including the sky frame factor.
323 This may be empty if the model has not been fit yet. Chebyshev
324 coefficients are first, followed optionally by either ``detector_ids``
325 pistons or ``detector_types`` pistons, in the order set by those
326 attributes.
327 """
329 def set_detector_types(self, detectors: Iterable[Detector]) -> None:
330 """Set the `detector_types` attribute with all of the unique physical
331 types of the given detectors, in deterministic order.
332 """
333 self.detector_types = sorted(set(detector.getPhysicalType() for detector in detectors))
335 def make_matrix(
336 self,
337 x: np.ndarray,
338 y: np.ndarray,
339 detector_slices: Mapping[int, slice],
340 detectors: Mapping[int, Detector],
341 drop_chebyshev0: bool,
342 ) -> np.ndarray:
343 """Construct a matrix that can be used to fit for the coefficients of
344 this model or evaluate the model given coefficients.
346 Like the coefficients, this matrix never includes the sky frame term,
347 which must be applied separately.
349 Parameters
350 ----------
351 x : `numpy.ndarray`
352 Focal plane X coordinates of the data points.
353 y : `numpy.ndarray`
354 Focal plane Y coordinates of the data points.
355 detector_slices : `~collections.abc.Mapping` [`int`, `slice`]
356 Mapping from detector ID to the slice that corresponds to that
357 detector in ``x`` and ``y``.
358 detectors : `~collections.abc.Mapping` [`int`,\
359 `lsst.afw.cameraGeom.Detector`]
360 Descriptions of all detectors that the matrix will evaluate on.
361 drop_chebyshev0 : `bool`
362 Drop the zeroth-order Chebyshev polynomial term. This is useful
363 for models where some other combination of terms would be fully
364 degenerate with it.
366 Returns
367 -------
368 matrix : `numpy.ndarray`
369 A 2-d matrix with shape ``(len(x), len(coefficients))``, where the
370 latter is the length the ``coefficients`` *will* have after the
371 fit if the array is empty.
372 """
373 ctrl = ChebyshevBoundedFieldControl()
374 ctrl.orderX = self.chebyshev_basis.x_order
375 ctrl.orderY = self.chebyshev_basis.y_order
376 chebyshev_matrix = ChebyshevBoundedField.makeFitMatrix(self.chebyshev_basis.bbox, x, y, ctrl)
377 if drop_chebyshev0:
378 chebyshev_matrix = chebyshev_matrix[:, 1:]
379 n_data = chebyshev_matrix.shape[0]
380 n_chebyshev = chebyshev_matrix.shape[1]
381 n_coeffs = n_chebyshev
382 if self.detector_ids is not None:
383 n_coeffs = n_chebyshev + len(self.detector_ids)
384 matrix = np.zeros((n_data, n_coeffs), dtype=float)
385 matrix[:, :n_chebyshev] = chebyshev_matrix
386 for j, detector_id in enumerate(self.detector_ids, start=n_chebyshev):
387 detector_slice = detector_slices.get(detector_id, slice(0, 0))
388 matrix[detector_slice, j] = 1.0
389 elif self.detector_types is not None:
390 n_coeffs = n_chebyshev + len(self.detector_types)
391 matrix = np.zeros((n_data, n_coeffs), dtype=float)
392 matrix[:, :n_chebyshev] = chebyshev_matrix
393 for detector_id, detector in detectors.items():
394 j = n_chebyshev + self.detector_types.index(detector.getPhysicalType())
395 matrix[detector_slices[detector_id], j] = 1.0
396 else:
397 matrix = chebyshev_matrix
398 return matrix
401class FitVisitBackgroundConnections(PipelineTaskConnections, dimensions=["visit"]):
402 camera = cT.PrerequisiteInput(
403 "camera",
404 storageClass="Camera",
405 dimensions=["instrument"],
406 isCalibration=True,
407 )
408 input_backgrounds = cT.Input(
409 "preliminary_visit_image_background",
410 storageClass="Background",
411 multiple=True,
412 dimensions=["visit", "detector"],
413 )
414 sky_frame_backgrounds = cT.PrerequisiteInput(
415 "sky_frame_background",
416 storageClass="Background",
417 multiple=True,
418 dimensions=["detector", "physical_filter"],
419 isCalibration=True,
420 )
421 output_backgrounds = cT.Output(
422 "visit_background_diff",
423 storageClass="Background",
424 multiple=True,
425 dimensions=["visit", "detector"],
426 )
427 model = cT.Output(
428 "visit_background_model",
429 storageClass="VisitBackgroundModel",
430 dimensions=["visit"],
431 )
433 def __init__(self, *, config=None):
434 assert isinstance(config, FitVisitBackgroundConfig)
435 if not config.fit_sky_frame:
436 del self.sky_frame_binned
438 def adjustQuantum(self, inputs, outputs, label, data_id):
439 if self.config.fit_sky_frame:
440 # Make sure we have a sky frame for every detector, and trim out
441 # any we don't need.
442 _, input_refs = inputs["input_backgrounds"]
443 sky_connection, sky_refs = inputs["sky_frame_backgrounds"]
444 input_refs_by_detector = {ref.dataId["detector"]: ref for ref in input_refs}
445 sky_refs_by_detector = {ref.dataId["detector"]: ref for ref in sky_refs}
446 if not (sky_refs_by_detector.keys() >= input_refs_by_detector.keys()):
447 missing = input_refs_by_detector.keys() - sky_refs_by_detector.keys()
448 raise FileNotFoundError(
449 f"No binned sky frame for detector(s) {missing} in visit {data_id['visit']}."
450 )
451 inputs["sky_frame_backgrounds"] = (
452 sky_connection,
453 [sky_refs_by_detector[d] for d in input_refs_by_detector.keys()],
454 )
455 return super().adjustQuantum(inputs, outputs, label, data_id)
458class FitVisitBackgroundConfig(PipelineTaskConfig, pipelineConnections=FitVisitBackgroundConnections):
459 x_order = Field[int](
460 "Chebyshev polynomial order in focal-plane X.",
461 dtype=int,
462 default=6,
463 )
464 y_order = Field[int](
465 "Chebyshev polynomial order in focal-plane Y.",
466 dtype=int,
467 default=6,
468 )
469 fit_detector_pistons = Field[bool](
470 "If True, fit a constant to each detector in instead of the zeroth-order Chebyshev term.",
471 dtype=bool,
472 default=False,
473 )
474 fit_type_pistons = Field[bool](
475 "If True, fit a constant to each detector type (as defined by Detector.getPhysicalType()) "
476 "instead of the zeroth-order Chebyshev term.",
477 dtype=bool,
478 default=False,
479 )
480 fit_sky_frame = Field[bool](
481 "If True, fit with the binned sky frame as another basis function.",
482 dtype=bool,
483 default=True,
484 )
485 clip_positive_threshold = Field[float](
486 "Clip sample points with (data - model) greater than this threshold and re-fit.",
487 dtype=float,
488 optional=True,
489 default=50.0,
490 )
491 clip_negative_threshold = Field[float](
492 "Clip sample points with (data - model) less than this threshold and re-fit.",
493 dtype=float,
494 optional=True,
495 default=None,
496 )
497 clip_iterations = Field[int](
498 "Number of clip-and-refit iterations to perform. Ignored if no clipping is enabled.",
499 dtype=int,
500 default=10,
501 )
503 def validate(self):
504 if self.fit_detector_pistons and self.fit_type_pistons:
505 raise ValueError("Only one of 'fit_detector_pistons' and 'fit_type_pistons' may be True.")
506 return super().validate()
509class FitVisitBackgroundTask(PipelineTask):
510 """A task that fits a visit-level background by fitting a larger-scale
511 model to the binned values originally used in per-detector backgrounds.
513 Notes
514 -----
515 This task requires input `lsst.afw.math.BackgroundList` instances in which
516 all layers (i.e. from different rounds of subtraction) either use the same
517 bin size or have a single bin.
518 """
520 ConfigClass: ClassVar[type[FitVisitBackgroundConfig]] = FitVisitBackgroundConfig
522 _DefaultName: ClassVar[str] = "fitVisitBackground"
524 config: FitVisitBackgroundConfig
526 def runQuantum(
527 self,
528 butlerQC: QuantumContext,
529 inputRefs: InputQuantizedConnection,
530 outputRefs: OutputQuantizedConnection,
531 ) -> None:
532 camera = butlerQC.get(inputRefs.camera)
533 input_backgrounds = {
534 ref.dataId["detector"]: butlerQC.get(ref)
535 for ref in sorted(inputRefs.input_backgrounds, key=lambda ref: ref.dataId["detector"])
536 }
537 sky_frame_backgrounds: dict[int, BackgroundList] | None = None
538 if self.config.fit_sky_frame:
539 sky_frame_backgrounds = {
540 ref.dataId["detector"]: butlerQC.get(ref)
541 for ref in sorted(inputRefs.sky_frame_backgrounds, key=lambda ref: ref.dataId["detector"])
542 }
543 outputs = self.run(
544 camera=camera, input_backgrounds=input_backgrounds, sky_frame_backgrounds=sky_frame_backgrounds
545 )
546 # Add provenance that `run` doesn't have access to.
547 outputs.model.sky_frame_datasets = {
548 ref.dataId["detector"]: ref.id for ref in inputRefs.sky_frame_backgrounds
549 }
550 for ref in outputRefs.output_backgrounds:
551 if (bg := outputs.output_backgrounds.get(ref.dataId["detector"])) is not None:
552 butlerQC.put(bg, ref)
553 butlerQC.put(outputs.model, outputRefs.model)
555 def run(
556 self,
557 *,
558 camera: Camera,
559 input_backgrounds: Mapping[int, BackgroundList],
560 sky_frame_backgrounds: Mapping[int, BackgroundList] | None = None,
561 ) -> Struct:
562 """Fit a Chebyshev and optional detector pistons to pre-binned
563 background statistics images.
565 Parameters
566 ----------
567 camera : `lsst.afw.cameraGeom.Camera`
568 Description of the camera.
569 input_backgrounds : `~collections.abc.Mapping` [
570 `int`, `lsst.afw.math.BackgroundList`]
571 Input background models.
572 sky_frame_backgrounds : `~collections.abc.Mapping` [
573 `int`, `lsst.afw.math.BackgroundList`]
574 Background models subtracted from the sky frame.
576 Returns
577 -------
578 outputs : `lsst.pipe.base.Struct`
579 Result struct with the following attributes:
581 - ``output_backgrounds``: `dict` mapping detector ID to the new
582 `lsst.afw.math.BackgroundList`. These should be applied to
583 images that already have the original input background
584 subtracted.
585 - ``fit``: `VisitBackgroundFit` instance with the details of the
586 final fit.
587 - ``model``: `VisitBackgroundModel` instance that allows the
588 full model to be serialized.
589 """
590 self.log.info("Extracting input samples from %s detectors.", len(input_backgrounds))
591 per_detector_data: dict[int, FitInputData] = {}
592 detectors: dict[int, Detector] = {}
593 for detector_id, bg_list in input_backgrounds.items():
594 self.log.verbose("Extracting input samples for detector=%s.", detector_id)
595 detector = camera[detector_id]
596 sky_frame_bg_list = None
597 if self.config.fit_sky_frame:
598 sky_frame_bg_list = sky_frame_backgrounds[detector_id]
599 per_detector_data[detector_id] = self._gather_input_samples(
600 detector, bg_list, sky_frame_bg_list=sky_frame_bg_list
601 )
602 detectors[detector_id] = detector
603 full_data = FitInputData.concatenate(per_detector_data)
604 self.log.info(
605 "Fitting focal-plane model to %s data points from %s detectors.",
606 len(full_data),
607 len(detectors),
608 )
609 model = VisitBackgroundModel(
610 chebyshev_basis=ChebyshevBasis.from_bbox(
611 self._compute_camera_bbox(camera),
612 self.config.x_order,
613 self.config.y_order,
614 )
615 )
616 if self.config.fit_detector_pistons:
617 model.detector_ids = list(full_data.detector_slices.keys())
618 elif self.config.fit_type_pistons:
619 model.set_detector_types(detectors.values())
620 matrix = model.make_matrix(
621 full_data.x,
622 full_data.y,
623 full_data.detector_slices,
624 detectors=detectors,
625 drop_chebyshev0=(self.config.fit_detector_pistons or self.config.fit_type_pistons),
626 )
627 fit = full_data.fit(matrix)
628 self.log.info(
629 "Fit %s-parameter focal-plane model to %s data points.",
630 len(fit.coefficients) + (fit.sky_frame_factor is not None),
631 len(fit.data),
632 )
633 if self.config.clip_positive_threshold is not None or self.config.clip_negative_threshold is not None:
634 for n_iter in range(self.config.clip_iterations):
635 if self.config.clip_positive_threshold is not None:
636 bad = fit.residuals > self.config.clip_positive_threshold
637 if self.config.clip_negative_threshold is not None:
638 bad = np.logical_or(bad, fit.residuals < self.config.clip_negative_threshold)
639 else:
640 bad = fit.residuals < self.config.clip_negative_threshold
641 n_clipped = sum(bad)
642 if not n_clipped:
643 break
644 fit = fit.masked(np.logical_not(bad))
645 self.log.info("Clipped %s data points and re-fit for iteration %s", sum(bad), n_iter)
646 model.coefficients = fit.coefficients.tolist()
647 model.sky_frame_factor = fit.sky_frame_factor
648 self.log.info("Projecting focal-plane model back to detectors.")
649 output_backgrounds = {
650 detector_id: self._make_output_background(
651 detector,
652 model,
653 input_backgrounds[detector_id],
654 sky_frame_bg_list=(
655 sky_frame_backgrounds[detector_id] if sky_frame_backgrounds is not None else None
656 ),
657 )
658 for detector_id, detector in detectors.items()
659 }
660 return Struct(output_backgrounds=output_backgrounds, model=model, fit=fit)
662 @staticmethod
663 def _compute_camera_bbox(camera: Camera) -> Box2I:
664 """Compute the bounding box of a camera in focal-plane coordinates."""
665 bbox = Box2D()
666 for detector in camera:
667 for corner in detector.getCorners(FOCAL_PLANE):
668 bbox.include(corner)
669 return Box2I(bbox)
671 def _make_bin_grid_fp_coordinates(
672 self, detector: Detector, background: BackgroundMI
673 ) -> tuple[np.ndarray, np.ndarray]:
674 """Construct arrays of focal-plane coordinates that correspond to the
675 bin centers of the given background object.
677 Parameters
678 ----------
679 detector : `lsst.afw.cameraGeom.Detector`
680 Detector the background is for.
681 background : `lsst.afw.math.BackgroundMI`
682 A single layer of a background model. All other models for this
683 detector are assumed to have the same bins.
685 Returns
686 -------
687 x_fp : `numpy.ndarray`
688 Array of focal-plane X coordinates.
689 y_fp : `numpy.ndarray`
690 Array of focal-plane Y coordinates.
691 """
692 pix_to_fp = detector.getTransform(PIXELS, FOCAL_PLANE)
693 x2_pix, y2_pix = np.meshgrid(background.getBinCentersX(), background.getBinCentersY())
694 xy_fp = pix_to_fp.getMapping().applyForward(np.vstack((x2_pix.ravel(), y2_pix.ravel())))
695 return xy_fp[0], xy_fp[1]
697 def _gather_input_samples(
698 self, detector: Detector, bg_list: BackgroundList, sky_frame_bg_list: BackgroundList | None = None
699 ) -> FitInputData:
700 """Transform input background-estimation bin values to focal-plane
701 coordinates.
703 Parameters
704 ----------
705 detector : `lsst.afw.cameraGeom.Detector`
706 Detector the background is for.
707 bg_list : `lsst.afw.math.BackgroundList`
708 Input background model for this detector.
709 sky_frame_bg_list : `lsst.afw.math.BackgroundList`, optional
710 Background model constructed from the sky frame for this detector.
711 Ignored unless `~FitVisitBackgroundConfig.fit_sky_frame` is `True`.
713 Returns
714 -------
715 data : `FitInputData`
716 Struct containing finiteness-filtered bin values and their
717 focal plane coordinates.
718 """
719 sum_image, bg_for_coordinates = self._sum_bg_list(bg_list)
720 x_fp, y_fp = self._make_bin_grid_fp_coordinates(detector, bg_for_coordinates)
721 z = sum_image.image.array.ravel().astype(float)
722 w = sum_image.variance.array.ravel().astype(float) ** (-0.5)
723 sky_frame: np.ndarray | None = None
724 if self.config.fit_sky_frame:
725 assert sky_frame_bg_list is not None, "Should be guaranteed by caller."
726 sky_frame_sum_image, _ = self._sum_bg_list(sky_frame_bg_list)
727 if sky_frame_sum_image.getBBox() != sum_image.getBBox():
728 raise InvalidQuantumError(
729 f"Binned sky frame for detector {detector.getId()} has "
730 f"{sky_frame_sum_image.getWidth()}×{sky_frame_sum_image.getHeight()} bins, but "
731 f"background has {sum_image.getWidth()}×{sum_image.getHeight()}."
732 )
733 sky_frame = sky_frame_sum_image.image.array.ravel().astype(float)
734 return FitInputData(x=x_fp, y=y_fp, z=z, w=w, sky_frame=sky_frame).masked(
735 np.logical_and(w > 0.0, np.isfinite(z))
736 )
738 def _sum_bg_list(self, bg_list: BackgroundList) -> tuple[MaskedImageF, BackgroundMI]:
739 sum_image: MaskedImageF | None = None
740 sum_constant_value: float = 0.0
741 sum_constant_variance: float = 0.0
742 bg_for_coordinates: BackgroundMI | None = None
743 for bg, *_ in bg_list:
744 layer_image = bg.getStatsImage()
745 if layer_image.getBBox().getArea() == 1:
746 sum_constant_value += layer_image.image.array[0, 0]
747 sum_constant_variance += layer_image.variance.array[0, 0]
748 elif sum_image is not None:
749 sum_image += layer_image
750 else:
751 sum_image = layer_image.clone()
752 bg_for_coordinates = bg
753 sum_image.image.array[:, :] += sum_constant_value
754 sum_image.variance.array[:, :] += sum_constant_variance
755 return sum_image, bg_for_coordinates
757 def _make_output_background(
758 self,
759 detector: Detector,
760 model: VisitBackgroundModel,
761 original_bg_list: BackgroundList,
762 sky_frame_bg_list: BackgroundList | None,
763 ) -> BackgroundMI:
764 """Convert the focal-plane background model into a per-detector
765 background objet.
767 Parameters
768 ----------
769 detector : `lsst.afw.cameraGeom.Detector`
770 Detector the background is for.
771 model : `VisitBackgroundModel`
772 Focal-plane background model.
773 original_bg_list : `lsst.afw.math.BackgroundList`
774 Input background model for this detector. THESE MODELS ARE
775 MODIFIED IN PLACE AND SHOULD BE CONSIDERED CONSUMED.
776 sky_frame_bg_list : `lsst.afw.math.BackgroundList`, optional
777 Background model constructed from the sky frame for this detector.
778 Ignored unless `~FitVisitBackgroundConfig.fit_sky_frame` is `True`.
779 THESE MODELS ARE MODIFIED IN PLACE AND SHOULD BE CONSIDERED
780 CONSUMED.
782 Returns
783 -------
784 bg_list : `lsst.afw.math.BackgroundList`
785 Updated background model for this detector. This reverts the
786 input background before applying the new one.
787 """
788 # Make the background's "stats image" plenty big to exactly constrain
789 # a perfect Chebyshev fit (should actually be ~2x as many points as
790 # we need), since the background object insists on re-fitting a
791 # Chebyshev to the stats image every time you make a full-size image.
792 n_bins_x = self.config.x_order + 1
793 n_bins_y = self.config.y_order + 1
794 stats_image = MaskedImageF(n_bins_x, n_bins_y)
795 new_bg = BackgroundMI(detector.getBBox(), stats_image)
796 x_fp, y_fp = self._make_bin_grid_fp_coordinates(detector, new_bg)
797 matrix = model.make_matrix(
798 x_fp,
799 y_fp,
800 detector_slices={detector.getId(): slice(0, None)},
801 detectors={detector.getId(): detector},
802 drop_chebyshev0=(self.config.fit_detector_pistons or self.config.fit_type_pistons),
803 )
804 z = np.dot(matrix, model.coefficients)
805 stats_image.image.array[:, :] = z.reshape(n_bins_y, n_bins_x)
806 stats_image.variance.array[:, :] = 1.0 # just to keep everything finite; we won't use weights
807 ctrl = new_bg.getBackgroundControl()
808 ctrl.setInterpStyle(Interpolate.Style.CUBIC_SPLINE)
809 ctrl.setUndersampleStyle(UndersampleStyle.THROW_EXCEPTION)
810 actrl = ApproximateControl(
811 ApproximateControl.Style.UNKNOWN,
812 self.config.x_order,
813 self.config.y_order,
814 False,
815 )
816 ctrl.setApproximateControl(actrl)
817 result = original_bg_list
818 for revert_bg, *_ in result:
819 stats_image = revert_bg.getStatsImage()
820 stats_image *= -1
821 if sky_frame_bg_list:
822 for sky_frame_bg, *rest in sky_frame_bg_list:
823 sky_frame_stats_image = sky_frame_bg.getStatsImage()
824 sky_frame_stats_image *= model.sky_frame_factor
825 result.append((sky_frame_bg, *rest))
826 result.append(
827 (
828 new_bg,
829 ctrl.getInterpStyle(),
830 ctrl.getUndersampleStyle(),
831 actrl.getStyle(),
832 actrl.getOrderX(),
833 actrl.getOrderY(),
834 False,
835 )
836 )
837 return result