Coverage for python / lsst / dax / images / cutout / projection_finders.py: 27%

110 statements  

« prev     ^ index     » next       coverage.py v7.13.5, created at 2026-05-07 08:52 +0000

1# This file is part of dax_images_cutout. 

2# 

3# Developed for the LSST Data Management System. 

4# This product includes software developed by the LSST Project 

5# (https://www.lsst.org). 

6# See the COPYRIGHT file at the top-level directory of this distribution 

7# for details of code ownership. 

8# 

9# This program is free software: you can redistribute it and/or modify 

10# it under the terms of the GNU General Public License as published by 

11# the Free Software Foundation, either version 3 of the License, or 

12# (at your option) any later version. 

13# 

14# This program is distributed in the hope that it will be useful, 

15# but WITHOUT ANY WARRANTY; without even the implied warranty of 

16# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the 

17# GNU General Public License for more details. 

18# 

19# You should have received a copy of the GNU General Public License 

20# along with this program. If not, see <https://www.gnu.org/licenses/>. 

21 

22from __future__ import annotations 

23 

24__all__ = ( 

25 "Chain", 

26 "MatchDatasetTypeName", 

27 "ProjectionFinder", 

28 "ReadComponents", 

29 "ReadComponentsAstropyFits", 

30 "TryComponentParents", 

31 "UseSkyMap", 

32) 

33 

34import logging 

35import re 

36from abc import ABC, abstractmethod 

37from collections.abc import Iterable 

38from typing import cast 

39 

40import astropy.io.fits 

41 

42import lsst.geom 

43from lsst.afw.geom import SkyWcs, getImageXY0FromMetadata, makeSkyWcs 

44from lsst.daf.base import PropertyList 

45from lsst.daf.butler import Butler, DatasetRef 

46from lsst.geom import Box2I 

47from lsst.skymap import BaseSkyMap 

48from lsst.utils.timer import time_this 

49 

50_LOG = logging.getLogger(__name__) 

51_TIMER_LOG_LEVEL = logging.INFO 

52 

53 

54class ProjectionFinder(ABC): 

55 """An interface for objects that can find the WCS and bounding box of a 

56 butler dataset. 

57 

58 Notes 

59 ----- 

60 Concrete `ProjectionFinder` implementations are intended to be composed to 

61 define rules for how to find this projection information for a particular 

62 dataset type; a finder that cannot handle a particular `DatasetRef` should 

63 implement `find_projection` to return `None`, and implementations that 

64 compose (e.g. `Chain`) can use this to decide when to try another nested 

65 finder. 

66 """ 

67 

68 @abstractmethod 

69 def find_projection( 

70 self, ref: DatasetRef, butler: Butler, logger: logging.Logger | None = None 

71 ) -> tuple[SkyWcs, Box2I] | None: 

72 """Run the finder on the given dataset with the given butler. 

73 

74 Parameters 

75 ---------- 

76 ref : `DatasetRef` 

77 Fully-resolved reference to the dataset. 

78 butler : `Butler` 

79 Butler client to use for reads. Need not support writes, and any 

80 default search collections will be ignored. 

81 logger : `logging.Logger`, optional 

82 Logger to use for timing messages. If `None`, a default logger 

83 will be used. 

84 

85 Returns 

86 ------- 

87 wcs : `SkyWcs` (only if result is not `None`) 

88 Mapping from sky to pixel coordinates for this dataset. 

89 bbox : `Box2I` (only if result is not `None`) 

90 Bounding box of the image dataset (or an image closely associated 

91 with the dataset) in pixel coordinates. 

92 """ 

93 raise NotImplementedError() 

94 

95 def __call__( 

96 self, ref: DatasetRef, butler: Butler, logger: logging.Logger | None = None 

97 ) -> tuple[SkyWcs, Box2I]: 

98 """Call `find_projection` but raise `LookupError` when no projection 

99 information is found. 

100 

101 Parameters 

102 ---------- 

103 ref : `DatasetRef` 

104 Fully-resolved reference to the dataset. 

105 butler : `Butler` 

106 Butler client to use for reads. Need not support writes, and any 

107 default search collections will be ignored. 

108 logger : `logging.Logger`, optional 

109 Logger to use for timing messages. If `None`, a default logger 

110 will be used. 

111 

112 Returns 

113 ------- 

114 wcs : `SkyWcs` 

115 Mapping from sky to pixel coordinates for this dataset. 

116 bbox : `Box2I` 

117 Bounding box of the image dataset (or an image closely associated 

118 with the dataset) in pixel coordinates. 

119 """ 

120 result = self.find_projection(ref, butler, logger=logger) 

121 if result is None: 

122 raise LookupError(f"No way to obtain WCS and bounding box information for ref {ref}.") 

123 return result 

124 

125 @staticmethod 

126 def make_default() -> ProjectionFinder: 

127 """Return a concrete finder appropriate for most pipelines. 

128 

129 Returns 

130 ------- 

131 finder : `ProjectionFinder` 

132 A finder that prefers to read, use, and cache a skymap when the 

133 data ID includes tract or patch, and falls back to reading the WCS 

134 and bbox from the dataset itself (or its parent, if the dataset is 

135 a component). 

136 """ 

137 return TryComponentParents( 

138 Chain( 

139 UseSkyMap(), 

140 ReadComponentsAstropyFits(), 

141 ReadComponents(), 

142 ) 

143 ) 

144 

145 

146class ReadComponents(ProjectionFinder): 

147 """A `ProjectionFinder` implementation that reads ``wcs`` and ``bbox`` 

148 from datasets that have them (e.g. ``Exposure``). 

149 

150 Notes 

151 ----- 

152 This should usually be the final finder attempted in any chain; it's the 

153 one most likely to work, but in many cases will not be the most efficient 

154 or yield the most accurate WCS. 

155 """ 

156 

157 def find_projection( 

158 self, ref: DatasetRef, butler: Butler, logger: logging.Logger | None = None 

159 ) -> tuple[SkyWcs, Box2I] | None: 

160 # Docstring inherited. 

161 if {"wcs", "bbox"}.issubset(ref.datasetType.storageClass.allComponents().keys()): 

162 logger = logger if logger is not None else _LOG 

163 with time_this(_LOG, msg="Read projection info from butler components", level=_TIMER_LOG_LEVEL): 

164 wcs = butler.get(ref.makeComponentRef("wcs")) 

165 bbox = butler.get(ref.makeComponentRef("bbox")) 

166 return wcs, bbox 

167 return None 

168 

169 

170class ReadComponentsAstropyFits(ProjectionFinder): 

171 """A `ProjectionFinder` implementation that reads ``wcs`` and ``bbox`` 

172 from datasets that have them (e.g. ``Exposure``) and assumes there is 

173 a WCS associated with an IMAGE HDU in a FITS file. 

174 

175 Notes 

176 ----- 

177 This might be more efficient for remote datasets where a full file download 

178 is needed for butler to work using AFW. 

179 """ 

180 

181 def find_projection( 

182 self, ref: DatasetRef, butler: Butler, logger: logging.Logger | None = None 

183 ) -> tuple[SkyWcs, Box2I] | None: 

184 # Docstring inherited. 

185 logger = logger if logger is not None else _LOG 

186 with time_this(logger, msg="Read projection info using Astropy", level=_TIMER_LOG_LEVEL): 

187 if {"wcs", "bbox"}.issubset(ref.datasetType.storageClass.allComponents().keys()): 

188 try: 

189 fs, fspath = butler.getURI(ref).to_fsspec() 

190 with ( 

191 fs.open(fspath) as f, 

192 astropy.io.fits.open(f) as fits_obj, 

193 ): 

194 # Look for first pixel HDU. 

195 pixel_components = {"mask", "image", "variance"} 

196 for i, hdu in enumerate(fits_obj): 

197 if i == 0: 

198 # Assumes WCS is in the IMAGE extension 

199 # and not stored in the primary. 

200 continue 

201 hdr = hdu.header 

202 extname = hdr.get("EXTNAME") 

203 if extname and extname.lower() in pixel_components: 

204 shape = hdu.shape 

205 dimensions = lsst.geom.Extent2I(shape[1], shape[0]) 

206 pl = PropertyList() 

207 pl.update(hdr) 

208 # XY0 is defined in the A WCS. 

209 xy0 = getImageXY0FromMetadata(pl, "A", strip=False) 

210 bbox = lsst.geom.Box2I(xy0, dimensions) 

211 wcs = makeSkyWcs(pl) 

212 return wcs, bbox 

213 except Exception: 

214 # Any failure and we will try the next option. 

215 pass 

216 return None 

217 

218 

219class TryComponentParents(ProjectionFinder): 

220 """A composite `ProjectionFinder` that walks from component dataset to its 

221 parent composite until its nested finder succeeds. 

222 

223 Parameters 

224 ---------- 

225 nested : `ProjectionFinder` 

226 Nested finder to delegate to. 

227 

228 Notes 

229 ----- 

230 This is a good choice for the outermost composite finder, so that the same 

231 sequence of nested rules are applied to each level of the 

232 component-composite tree. 

233 """ 

234 

235 def __init__(self, nested: ProjectionFinder): 

236 self._nested = nested 

237 

238 def find_projection( 

239 self, ref: DatasetRef, butler: Butler, logger: logging.Logger | None = None 

240 ) -> tuple[SkyWcs, Box2I] | None: 

241 # Docstring inherited. 

242 while True: 

243 if (result := self._nested.find_projection(ref, butler, logger=logger)) is not None: 

244 return result 

245 if ref.isComponent(): 

246 ref = ref.makeCompositeRef() 

247 else: 

248 return None 

249 

250 

251class UseSkyMap(ProjectionFinder): 

252 """A `ProjectionFinder` implementation that reads and caches 

253 `lsst.skymap.BaseSkyMap` instances, allowing projections for coadds to be 

254 found without requiring I/O for each one. 

255 

256 Parameters 

257 ---------- 

258 dataset_type_name : `str`, optional 

259 Name of the dataset type used to load `BaseSkyMap` instances. 

260 collections : `Iterable` [ `str` ] 

261 Collection search path for skymap datasets. 

262 

263 Notes 

264 ----- 

265 This finder assumes any dataset with ``patch`` or ``tract`` dimensions 

266 should get its WCS and bounding box from the skymap, and that datasets 

267 without these dimensions never should (i.e. `find_projection` will return 

268 `None` for these). 

269 

270 `BaseSkyMap` instances are never removed from the cache after being loaded; 

271 we expect the number of distinct skymaps to be very small. 

272 """ 

273 

274 def __init__(self, dataset_type_name: str = "skyMap", collections: Iterable[str] = ("skymaps",)): 

275 self._dataset_type_name = dataset_type_name 

276 self._collections = tuple(collections) 

277 self._cache: dict[str, BaseSkyMap] = {} 

278 

279 def find_projection( 

280 self, ref: DatasetRef, butler: Butler, logger: logging.Logger | None = None 

281 ) -> tuple[SkyWcs, Box2I] | None: 

282 # Docstring inherited. 

283 if "tract" in ref.dataId.dimensions: 

284 assert "skymap" in ref.dataId.dimensions, "Guaranteed by expected dimension schema." 

285 if (skymap := self._cache.get(cast(str, ref.dataId["skymap"]))) is None: 

286 skymap = butler.get( 

287 self._dataset_type_name, skymap=ref.dataId["skymap"], collections=self._collections 

288 ) 

289 tractInfo = skymap[ref.dataId["tract"]] 

290 if "patch" in ref.dataId.dimensions: 

291 patchInfo = tractInfo[ref.dataId["patch"]] 

292 return patchInfo.wcs, patchInfo.outer_bbox 

293 else: 

294 return tractInfo.wcs, tractInfo.bbox 

295 return None 

296 

297 

298class Chain(ProjectionFinder): 

299 """A composite `ProjectionFinder` that attempts each finder in a sequence 

300 until one succeeds. 

301 

302 Parameters 

303 ---------- 

304 *nested : `ProjectionFinder` 

305 Nested finders to delegate to, in order. 

306 """ 

307 

308 def __init__(self, *nested: ProjectionFinder): 

309 self._nested = tuple(nested) 

310 

311 def find_projection( 

312 self, ref: DatasetRef, butler: Butler, logger: logging.Logger | None = None 

313 ) -> tuple[SkyWcs, Box2I] | None: 

314 # Docstring inherited. 

315 for f in self._nested: 

316 if (result := f.find_projection(ref, butler, logger=logger)) is not None: 

317 return result 

318 return None 

319 

320 

321class MatchDatasetTypeName(ProjectionFinder): 

322 """A composite `ProjectionFinder` that delegates to different nested 

323 finders based on whether the dataset type name matches a regular 

324 expression. 

325 

326 Parameters 

327 ---------- 

328 regex : `str` 

329 Regular expression the dataset type name must match (in full). 

330 on_match : `ProjectionFinder`, optional 

331 Finder to try when the match succeeds, or `None` to return `None`. 

332 otherwise : `ProjectionFinder`, optional 

333 Finder to try when the match does not succeed, or `None` to return 

334 `None`. 

335 """ 

336 

337 def __init__( 

338 self, 

339 regex: str, 

340 on_match: ProjectionFinder | None = None, 

341 otherwise: ProjectionFinder | None = None, 

342 ): 

343 self._regex = re.compile(regex) 

344 self._on_match = on_match 

345 self._otherwise = otherwise 

346 

347 def find_projection( 

348 self, ref: DatasetRef, butler: Butler, logger: logging.Logger | None = None 

349 ) -> tuple[SkyWcs, Box2I] | None: 

350 # Docstring inherited. 

351 if self._regex.match(ref.datasetType.name): 

352 if self._on_match is not None: 

353 return self._on_match.find_projection(ref, butler, logger=logger) 

354 else: 

355 if self._otherwise is not None: 

356 return self._otherwise.find_projection(ref, butler, logger=logger) 

357 return None