Coverage for python / lsst / dax / images / cutout / projection_finders.py: 27%
110 statements
« prev ^ index » next coverage.py v7.13.5, created at 2026-05-01 08:54 +0000
« prev ^ index » next coverage.py v7.13.5, created at 2026-05-01 08:54 +0000
1# This file is part of dax_images_cutout.
2#
3# Developed for the LSST Data Management System.
4# This product includes software developed by the LSST Project
5# (https://www.lsst.org).
6# See the COPYRIGHT file at the top-level directory of this distribution
7# for details of code ownership.
8#
9# This program is free software: you can redistribute it and/or modify
10# it under the terms of the GNU General Public License as published by
11# the Free Software Foundation, either version 3 of the License, or
12# (at your option) any later version.
13#
14# This program is distributed in the hope that it will be useful,
15# but WITHOUT ANY WARRANTY; without even the implied warranty of
16# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
17# GNU General Public License for more details.
18#
19# You should have received a copy of the GNU General Public License
20# along with this program. If not, see <https://www.gnu.org/licenses/>.
22from __future__ import annotations
24__all__ = (
25 "Chain",
26 "MatchDatasetTypeName",
27 "ProjectionFinder",
28 "ReadComponents",
29 "ReadComponentsAstropyFits",
30 "TryComponentParents",
31 "UseSkyMap",
32)
34import logging
35import re
36from abc import ABC, abstractmethod
37from collections.abc import Iterable
38from typing import cast
40import astropy.io.fits
42import lsst.geom
43from lsst.afw.geom import SkyWcs, getImageXY0FromMetadata, makeSkyWcs
44from lsst.daf.base import PropertyList
45from lsst.daf.butler import Butler, DatasetRef
46from lsst.geom import Box2I
47from lsst.skymap import BaseSkyMap
48from lsst.utils.timer import time_this
50_LOG = logging.getLogger(__name__)
51_TIMER_LOG_LEVEL = logging.INFO
54class ProjectionFinder(ABC):
55 """An interface for objects that can find the WCS and bounding box of a
56 butler dataset.
58 Notes
59 -----
60 Concrete `ProjectionFinder` implementations are intended to be composed to
61 define rules for how to find this projection information for a particular
62 dataset type; a finder that cannot handle a particular `DatasetRef` should
63 implement `find_projection` to return `None`, and implementations that
64 compose (e.g. `Chain`) can use this to decide when to try another nested
65 finder.
66 """
68 @abstractmethod
69 def find_projection(
70 self, ref: DatasetRef, butler: Butler, logger: logging.Logger | None = None
71 ) -> tuple[SkyWcs, Box2I] | None:
72 """Run the finder on the given dataset with the given butler.
74 Parameters
75 ----------
76 ref : `DatasetRef`
77 Fully-resolved reference to the dataset.
78 butler : `Butler`
79 Butler client to use for reads. Need not support writes, and any
80 default search collections will be ignored.
81 logger : `logging.Logger`, optional
82 Logger to use for timing messages. If `None`, a default logger
83 will be used.
85 Returns
86 -------
87 wcs : `SkyWcs` (only if result is not `None`)
88 Mapping from sky to pixel coordinates for this dataset.
89 bbox : `Box2I` (only if result is not `None`)
90 Bounding box of the image dataset (or an image closely associated
91 with the dataset) in pixel coordinates.
92 """
93 raise NotImplementedError()
95 def __call__(
96 self, ref: DatasetRef, butler: Butler, logger: logging.Logger | None = None
97 ) -> tuple[SkyWcs, Box2I]:
98 """Call `find_projection` but raise `LookupError` when no projection
99 information is found.
101 Parameters
102 ----------
103 ref : `DatasetRef`
104 Fully-resolved reference to the dataset.
105 butler : `Butler`
106 Butler client to use for reads. Need not support writes, and any
107 default search collections will be ignored.
108 logger : `logging.Logger`, optional
109 Logger to use for timing messages. If `None`, a default logger
110 will be used.
112 Returns
113 -------
114 wcs : `SkyWcs`
115 Mapping from sky to pixel coordinates for this dataset.
116 bbox : `Box2I`
117 Bounding box of the image dataset (or an image closely associated
118 with the dataset) in pixel coordinates.
119 """
120 result = self.find_projection(ref, butler, logger=logger)
121 if result is None:
122 raise LookupError(f"No way to obtain WCS and bounding box information for ref {ref}.")
123 return result
125 @staticmethod
126 def make_default() -> ProjectionFinder:
127 """Return a concrete finder appropriate for most pipelines.
129 Returns
130 -------
131 finder : `ProjectionFinder`
132 A finder that prefers to read, use, and cache a skymap when the
133 data ID includes tract or patch, and falls back to reading the WCS
134 and bbox from the dataset itself (or its parent, if the dataset is
135 a component).
136 """
137 return TryComponentParents(
138 Chain(
139 UseSkyMap(),
140 ReadComponentsAstropyFits(),
141 ReadComponents(),
142 )
143 )
146class ReadComponents(ProjectionFinder):
147 """A `ProjectionFinder` implementation that reads ``wcs`` and ``bbox``
148 from datasets that have them (e.g. ``Exposure``).
150 Notes
151 -----
152 This should usually be the final finder attempted in any chain; it's the
153 one most likely to work, but in many cases will not be the most efficient
154 or yield the most accurate WCS.
155 """
157 def find_projection(
158 self, ref: DatasetRef, butler: Butler, logger: logging.Logger | None = None
159 ) -> tuple[SkyWcs, Box2I] | None:
160 # Docstring inherited.
161 if {"wcs", "bbox"}.issubset(ref.datasetType.storageClass.allComponents().keys()):
162 logger = logger if logger is not None else _LOG
163 with time_this(_LOG, msg="Read projection info from butler components", level=_TIMER_LOG_LEVEL):
164 wcs = butler.get(ref.makeComponentRef("wcs"))
165 bbox = butler.get(ref.makeComponentRef("bbox"))
166 return wcs, bbox
167 return None
170class ReadComponentsAstropyFits(ProjectionFinder):
171 """A `ProjectionFinder` implementation that reads ``wcs`` and ``bbox``
172 from datasets that have them (e.g. ``Exposure``) and assumes there is
173 a WCS associated with an IMAGE HDU in a FITS file.
175 Notes
176 -----
177 This might be more efficient for remote datasets where a full file download
178 is needed for butler to work using AFW.
179 """
181 def find_projection(
182 self, ref: DatasetRef, butler: Butler, logger: logging.Logger | None = None
183 ) -> tuple[SkyWcs, Box2I] | None:
184 # Docstring inherited.
185 logger = logger if logger is not None else _LOG
186 with time_this(logger, msg="Read projection info using Astropy", level=_TIMER_LOG_LEVEL):
187 if {"wcs", "bbox"}.issubset(ref.datasetType.storageClass.allComponents().keys()):
188 try:
189 fs, fspath = butler.getURI(ref).to_fsspec()
190 with (
191 fs.open(fspath) as f,
192 astropy.io.fits.open(f) as fits_obj,
193 ):
194 # Look for first pixel HDU.
195 pixel_components = {"mask", "image", "variance"}
196 for i, hdu in enumerate(fits_obj):
197 if i == 0:
198 # Assumes WCS is in the IMAGE extension
199 # and not stored in the primary.
200 continue
201 hdr = hdu.header
202 extname = hdr.get("EXTNAME")
203 if extname and extname.lower() in pixel_components:
204 shape = hdu.shape
205 dimensions = lsst.geom.Extent2I(shape[1], shape[0])
206 pl = PropertyList()
207 pl.update(hdr)
208 # XY0 is defined in the A WCS.
209 xy0 = getImageXY0FromMetadata(pl, "A", strip=False)
210 bbox = lsst.geom.Box2I(xy0, dimensions)
211 wcs = makeSkyWcs(pl)
212 return wcs, bbox
213 except Exception:
214 # Any failure and we will try the next option.
215 pass
216 return None
219class TryComponentParents(ProjectionFinder):
220 """A composite `ProjectionFinder` that walks from component dataset to its
221 parent composite until its nested finder succeeds.
223 Parameters
224 ----------
225 nested : `ProjectionFinder`
226 Nested finder to delegate to.
228 Notes
229 -----
230 This is a good choice for the outermost composite finder, so that the same
231 sequence of nested rules are applied to each level of the
232 component-composite tree.
233 """
235 def __init__(self, nested: ProjectionFinder):
236 self._nested = nested
238 def find_projection(
239 self, ref: DatasetRef, butler: Butler, logger: logging.Logger | None = None
240 ) -> tuple[SkyWcs, Box2I] | None:
241 # Docstring inherited.
242 while True:
243 if (result := self._nested.find_projection(ref, butler, logger=logger)) is not None:
244 return result
245 if ref.isComponent():
246 ref = ref.makeCompositeRef()
247 else:
248 return None
251class UseSkyMap(ProjectionFinder):
252 """A `ProjectionFinder` implementation that reads and caches
253 `lsst.skymap.BaseSkyMap` instances, allowing projections for coadds to be
254 found without requiring I/O for each one.
256 Parameters
257 ----------
258 dataset_type_name : `str`, optional
259 Name of the dataset type used to load `BaseSkyMap` instances.
260 collections : `Iterable` [ `str` ]
261 Collection search path for skymap datasets.
263 Notes
264 -----
265 This finder assumes any dataset with ``patch`` or ``tract`` dimensions
266 should get its WCS and bounding box from the skymap, and that datasets
267 without these dimensions never should (i.e. `find_projection` will return
268 `None` for these).
270 `BaseSkyMap` instances are never removed from the cache after being loaded;
271 we expect the number of distinct skymaps to be very small.
272 """
274 def __init__(self, dataset_type_name: str = "skyMap", collections: Iterable[str] = ("skymaps",)):
275 self._dataset_type_name = dataset_type_name
276 self._collections = tuple(collections)
277 self._cache: dict[str, BaseSkyMap] = {}
279 def find_projection(
280 self, ref: DatasetRef, butler: Butler, logger: logging.Logger | None = None
281 ) -> tuple[SkyWcs, Box2I] | None:
282 # Docstring inherited.
283 if "tract" in ref.dataId.dimensions:
284 assert "skymap" in ref.dataId.dimensions, "Guaranteed by expected dimension schema."
285 if (skymap := self._cache.get(cast(str, ref.dataId["skymap"]))) is None:
286 skymap = butler.get(
287 self._dataset_type_name, skymap=ref.dataId["skymap"], collections=self._collections
288 )
289 tractInfo = skymap[ref.dataId["tract"]]
290 if "patch" in ref.dataId.dimensions:
291 patchInfo = tractInfo[ref.dataId["patch"]]
292 return patchInfo.wcs, patchInfo.outer_bbox
293 else:
294 return tractInfo.wcs, tractInfo.bbox
295 return None
298class Chain(ProjectionFinder):
299 """A composite `ProjectionFinder` that attempts each finder in a sequence
300 until one succeeds.
302 Parameters
303 ----------
304 *nested : `ProjectionFinder`
305 Nested finders to delegate to, in order.
306 """
308 def __init__(self, *nested: ProjectionFinder):
309 self._nested = tuple(nested)
311 def find_projection(
312 self, ref: DatasetRef, butler: Butler, logger: logging.Logger | None = None
313 ) -> tuple[SkyWcs, Box2I] | None:
314 # Docstring inherited.
315 for f in self._nested:
316 if (result := f.find_projection(ref, butler, logger=logger)) is not None:
317 return result
318 return None
321class MatchDatasetTypeName(ProjectionFinder):
322 """A composite `ProjectionFinder` that delegates to different nested
323 finders based on whether the dataset type name matches a regular
324 expression.
326 Parameters
327 ----------
328 regex : `str`
329 Regular expression the dataset type name must match (in full).
330 on_match : `ProjectionFinder`, optional
331 Finder to try when the match succeeds, or `None` to return `None`.
332 otherwise : `ProjectionFinder`, optional
333 Finder to try when the match does not succeed, or `None` to return
334 `None`.
335 """
337 def __init__(
338 self,
339 regex: str,
340 on_match: ProjectionFinder | None = None,
341 otherwise: ProjectionFinder | None = None,
342 ):
343 self._regex = re.compile(regex)
344 self._on_match = on_match
345 self._otherwise = otherwise
347 def find_projection(
348 self, ref: DatasetRef, butler: Butler, logger: logging.Logger | None = None
349 ) -> tuple[SkyWcs, Box2I] | None:
350 # Docstring inherited.
351 if self._regex.match(ref.datasetType.name):
352 if self._on_match is not None:
353 return self._on_match.find_projection(ref, butler, logger=logger)
354 else:
355 if self._otherwise is not None:
356 return self._otherwise.find_projection(ref, butler, logger=logger)
357 return None