Coverage for python / lsst / meas / extensions / multiprofit / utils.py: 12%
52 statements
« prev ^ index » next coverage.py v7.13.5, created at 2026-04-28 09:22 +0000
« prev ^ index » next coverage.py v7.13.5, created at 2026-04-28 09:22 +0000
1# This file is part of meas_extensions_multiprofit.
2#
3# Developed for the LSST Data Management System.
4# This product includes software developed by the LSST Project
5# (https://www.lsst.org).
6# See the COPYRIGHT file at the top-level directory of this distribution
7# for details of code ownership.
8#
9# This program is free software: you can redistribute it and/or modify
10# it under the terms of the GNU General Public License as published by
11# the Free Software Foundation, either version 3 of the License, or
12# (at your option) any later version.
13#
14# This program is distributed in the hope that it will be useful,
15# but WITHOUT ANY WARRANTY; without even the implied warranty of
16# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
17# GNU General Public License for more details.
18#
19# You should have received a copy of the GNU General Public License
20# along with this program. If not, see <https://www.gnu.org/licenses/>.
22from collections import defaultdict
23from typing import Iterable
25import lsst.afw.detection as afwDetect
26import lsst.afw.image as afwImage
27import lsst.geom as geom
28import numpy as np
def defaultdictNested():
    """Build a `defaultdict` that nests itself to arbitrary depth.

    Returns
    -------
    nested : `defaultdict`
        An empty `defaultdict` whose default factory is this function, so
        every missing key yields another nested `defaultdict`.
    """
    nested = defaultdict(defaultdictNested)
    return nested
def get_all_subclasses(cls, children_first: bool = True):
    """Return all subclasses of a class recursively.

    Parameters
    ----------
    cls
        The class whose subclasses should be collected.
    children_first
        If true, return all direct subclasses first, followed by their
        descendants (recursively). Otherwise, return each direct subclass
        immediately followed by its own descendants (depth-first).

    Returns
    -------
    subclasses
        A list of the unique subclasses of ``cls`` in the requested order.
        ``cls`` itself is not included.
    """
    direct = cls.__subclasses__()
    # A dict serves as an insertion-ordered set, so a class reachable via
    # multiple inheritance paths is only listed once (at its first position).
    ordered = dict.fromkeys(direct) if children_first else {}
    for subclass in direct:
        if not children_first:
            # Depth-first: the child must precede its own descendants.
            ordered[subclass] = None
        # Propagate the ordering so it applies at every level of the tree.
        for descendant in get_all_subclasses(subclass, children_first=children_first):
            ordered[descendant] = None
    return list(ordered)
62# TODO: Allow addition to existing image
def get_spanned_image(
    exposure: afwImage.Exposure,
    footprint: afwDetect.Footprint | None = None,
    bbox: geom.Box2I | None = None,
    spans: np.ndarray | None = None,
    get_sig_inv: bool = False,
    calibrate: bool = True,
) -> tuple[np.ndarray | None, geom.Box2I, np.ndarray | None]:
    """Get an image masked by its spanset.

    Parameters
    ----------
    exposure
        An exposure to extract arrays from.
    footprint
        The footprint to get spans/bboxes from. Not needed if both of
        `bbox` and `spans` are provided.
    bbox
        The bounding box to subset the exposure with.
        Defaults to the footprint's bbox.
    spans
        A spanset array (inverse mask/selection).
        Defaults to the footprint's spans.
    get_sig_inv
        Whether to get the inverse variance and return its square root.
    calibrate
        Whether to calibrate the image; set to False if already calibrated.

    Returns
    -------
    image
        The image array as float64. If a footprint is provided, pixels
        outside of the spans are set to NaN; otherwise the whole subimage
        within `bbox` is returned. Set to None if `bbox` is empty.
    bbox
        The bounding box used to subset the exposure.
    sig_inv
        The inverse sigma array as float64, with negative and non-finite
        NaN values set to zero. Only pixels within the spans are assigned;
        the rest keep the initial value of a new image (expected to be
        zero - TODO confirm). Set to None if `get_sig_inv` is False or if
        `bbox` is empty.

    Raises
    ------
    ValueError
        Raised if `footprint` is None and `bbox` or `spans` is needed from
        it.
    """
    if bbox is None:
        if footprint is None:
            raise ValueError("footprint must be provided if bbox is None")
        bbox = footprint.getBBox()
    if not (bbox.getHeight() > 0 and bbox.getWidth() > 0):
        # Always return a 3-tuple so callers can unpack unconditionally
        return None, bbox, None
    if spans is None:
        if footprint is None:
            raise ValueError("footprint must be provided if spans is None")
        spans = footprint.getSpans().asArray()

    sig_inv = afwImage.ImageF(bbox) if get_sig_inv else None
    img = None
    img_array = None

    if footprint is None:
        maskedIm = exposure.maskedImage.subset(bbox)
        if not calibrate:
            img_array = maskedIm.image.array
            if get_sig_inv:
                sig_inv.array[spans] = 1 / np.sqrt(maskedIm.variance.array[spans])
    else:
        img = afwImage.ImageF(bbox)
        img.array[:] = np.nan
        img.array[spans] = footprint.getImageArray()
        # footprint.getVarianceArray() returns zeros
        variance = exposure.variance[bbox] if get_sig_inv else None
        if get_sig_inv and not calibrate:
            sig_inv.array[spans] = 1 / np.sqrt(variance.array[spans])
        if calibrate:
            # Have to calibrate with the original image
            maskedIm = afwImage.MaskedImageF(
                image=exposure.image[bbox],
                variance=variance,
            )

    if calibrate:
        maskedIm = exposure.photoCalib.calibrateImage(maskedIm)
        if footprint is None:
            img_array = maskedIm.image.array
        else:
            # Apply the calibration to the deblended footprint
            # ... hopefully it's multiplicative enough
            img.array[spans] *= maskedIm.image.array[spans] / exposure.image[bbox].array[spans]
        if get_sig_inv:
            sig_inv.array[spans] = 1 / np.sqrt(maskedIm.variance.array[spans])

    if img is not None:
        # Convert the afw image to its numpy array, calibrated or not
        img_array = img.array
    image_out = np.array(img_array, dtype="float64")

    if not get_sig_inv:
        return image_out, bbox, None

    # Should not happen but does with footprints having nans
    sig_inv.array[~(sig_inv.array >= 0)] = 0
    return image_out, bbox, np.array(sig_inv.array, dtype="float64")
146def join_and_filter(separator: str, items: Iterable[str], exclusion: str | None = None) -> str:
147 """Join an iterable of items by a separator, filtering out an exclusion.
149 Parameters
150 ----------
151 separator
152 The separator to join items by.
153 items
154 Items to join.
155 exclusion
156 The pattern to exclude.
158 Returns
159 -------
160 joined
161 The joined string.
162 """
163 return separator.join(filter(exclusion, items))