Coverage for python / lsst / meas / algorithms / stamps.py: 22%

191 statements  

« prev     ^ index     » next       coverage.py v7.13.5, created at 2026-04-23 08:32 +0000

1# This file is part of meas_algorithms. 

2# 

3# Developed for the LSST Data Management System. 

4# This product includes software developed by the LSST Project 

5# (https://www.lsst.org). 

6# See the COPYRIGHT file at the top-level directory of this distribution 

7# for details of code ownership. 

8# 

9# This program is free software: you can redistribute it and/or modify 

10# it under the terms of the GNU General Public License as published by 

11# the Free Software Foundation, either version 3 of the License, or 

12# (at your option) any later version. 

13# 

14# This program is distributed in the hope that it will be useful, 

15# but WITHOUT ANY WARRANTY; without even the implied warranty of 

16# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the 

17# GNU General Public License for more details. 

18# 

19# You should have received a copy of the GNU General Public License 

20# along with this program. If not, see <https://www.gnu.org/licenses/>. 

21 

22"""Collection of small images (stamps).""" 

23 

24__all__ = ["Stamp", "Stamps", "StampsBase", "writeFits", "readFitsWithOptions"] 

25 

26import abc 

27from collections.abc import Sequence 

28import copy 

29from dataclasses import dataclass, field, fields 

30 

31import numpy as np 

32from lsst.afw.fits import Fits, readMetadata 

33from lsst.afw.image import ImageFitsReader, MaskedImage, MaskedImageF, MaskFitsReader 

34from lsst.afw.table.io import InputArchive, OutputArchive, Persistable 

35from lsst.daf.base import PropertyList 

36from lsst.geom import Angle, Box2I, Extent2I, Point2I, SpherePoint, degrees 

37from lsst.utils import doImport 

38from lsst.utils.introspection import get_full_type_name 

39 

40 

41def writeFits(filename, stamps, metadata, type_name, write_mask, write_variance, write_archive=False): 

42 """Write a single FITS file containing all stamps. 

43 

44 Parameters 

45 ---------- 

46 filename : `str` 

47 A string indicating the output filename 

48 stamps : iterable of `BaseStamp` 

49 An iterable of Stamp objects 

50 metadata : `PropertyList` 

51 A collection of key, value metadata pairs to be 

52 written to the primary header 

53 type_name : `str` 

54 Python type name of the StampsBase subclass to use 

55 write_mask : `bool` 

56 Write the mask data to the output file? 

57 write_variance : `bool` 

58 Write the variance data to the output file? 

59 write_archive : `bool`, optional 

60 Write an archive to store Persistables along with each stamp? 

61 Default: ``False``. 

62 """ 

63 metadata["HAS_MASK"] = write_mask 

64 metadata["HAS_VARIANCE"] = write_variance 

65 metadata["HAS_ARCHIVE"] = write_archive 

66 metadata["N_STAMPS"] = len(stamps) 

67 metadata["STAMPCLS"] = type_name 

68 # Record version number in case of future code changes 

69 metadata["VERSION"] = 1 

70 # create primary HDU with global metadata 

71 fitsFile = Fits(filename, "w") 

72 fitsFile.createEmpty() 

73 # Store Persistables in an OutputArchive and write it 

74 if write_archive: 

75 oa = OutputArchive() 

76 archive_ids = [oa.put(stamp.archive_element) for stamp in stamps] 

77 metadata["ARCHIVE_IDS"] = archive_ids 

78 fitsFile.writeMetadata(metadata) 

79 oa.writeFits(fitsFile) 

80 else: 

81 fitsFile.writeMetadata(metadata) 

82 fitsFile.closeFile() 

83 # add all pixel data optionally writing mask and variance information 

84 for i, stamp in enumerate(stamps): 

85 metadata = PropertyList() 

86 # EXTVER should be 1-based, the index from enumerate is 0-based 

87 metadata.update({"EXTVER": i + 1, "EXTNAME": "IMAGE"}) 

88 stamp.stamp_im.getImage().writeFits(filename, metadata=metadata, mode="a") 

89 if write_mask: 

90 metadata = PropertyList() 

91 metadata.update({"EXTVER": i + 1, "EXTNAME": "MASK"}) 

92 stamp.stamp_im.getMask().writeFits(filename, metadata=metadata, mode="a") 

93 if write_variance: 

94 metadata = PropertyList() 

95 metadata.update({"EXTVER": i + 1, "EXTNAME": "VARIANCE"}) 

96 stamp.stamp_im.getVariance().writeFits(filename, metadata=metadata, mode="a") 

97 return None 

98 

99 

100def readFitsWithOptions(filename, stamp_factory, options): 

101 """Read stamps from FITS file, allowing for only a subregion of the stamps 

102 to be read. 

103 

104 Parameters 

105 ---------- 

106 filename : `str` 

107 A string indicating the file to read 

108 stamp_factory : classmethod 

109 A factory function defined on a dataclass for constructing 

110 stamp objects a la `~lsst.meas.algorithm.Stamp` 

111 options : `PropertyList` or `dict` 

112 A collection of parameters. If it contains a bounding box 

113 (``bbox`` key), or if certain other keys (``llcX``, ``llcY``, 

114 ``width``, ``height``) are available for one to be constructed, 

115 the bounding box is passed to the ``FitsReader`` in order to 

116 return a sub-image. 

117 

118 Returns 

119 ------- 

120 stamps : `list` of dataclass objects like `Stamp`, PropertyList 

121 A tuple of a list of `Stamp`-like objects 

122 metadata : `PropertyList` 

123 The metadata 

124 

125 Notes 

126 ----- 

127 The data are read using the data type expected by the 

128 `~lsst.afw.image.MaskedImage` class attached to the `AbstractStamp` 

129 dataclass associated with the factory method. 

130 """ 

131 # extract necessary info from metadata 

132 metadata = readMetadata(filename, hdu=0) 

133 nStamps = metadata["N_STAMPS"] 

134 has_archive = metadata["HAS_ARCHIVE"] 

135 if has_archive: 

136 archive_ids = metadata.getArray("ARCHIVE_IDS") 

137 with Fits(filename, "r") as f: 

138 nExtensions = f.countHdus() 

139 # check if a bbox was provided 

140 kwargs = {} 

141 if options: 

142 # gen3 API 

143 if "bbox" in options.keys(): 

144 kwargs["bbox"] = options["bbox"] 

145 # gen2 API 

146 elif "llcX" in options.keys(): 

147 llcX = options["llcX"] 

148 llcY = options["llcY"] 

149 width = options["width"] 

150 height = options["height"] 

151 bbox = Box2I(Point2I(llcX, llcY), Extent2I(width, height)) 

152 kwargs["bbox"] = bbox 

153 stamp_parts = {} 

154 

155 # Determine the dtype from the factory. This allows a Stamp class 

156 # to be defined in terms of MaskedImageD or MaskedImageI without 

157 # forcing everything to floats. 

158 masked_image_cls = None 

159 for stamp_field in fields(stamp_factory.__self__): 

160 if issubclass(stamp_field.type, MaskedImage): 

161 masked_image_cls = stamp_field.type 

162 break 

163 else: 

164 raise RuntimeError("Stamp factory does not use MaskedImage.") 

165 default_dtype = np.dtype(masked_image_cls.dtype) 

166 variance_dtype = np.dtype(np.float32) # Variance is always the same type. 

167 

168 # We need to be careful because nExtensions includes the primary HDU. 

169 stamp_metadata = [] 

170 for idx in range(nExtensions - 1): 

171 dtype = None 

172 _metadata = copy.copy(metadata) 

173 md = readMetadata(filename, hdu=idx + 1) 

174 _metadata.update(md) 

175 stamp_metadata.append(_metadata) 

176 # Skip binary tables that aren't images or archives. 

177 if md["XTENSION"] == "BINTABLE" and not ("ZIMAGE" in md and md["ZIMAGE"]): 

178 if md["EXTNAME"] != "ARCHIVE_INDEX": 

179 continue 

180 if md["EXTNAME"] in ("IMAGE", "VARIANCE"): 

181 reader = ImageFitsReader(filename, hdu=idx + 1) 

182 if md["EXTNAME"] == "VARIANCE": 

183 dtype = variance_dtype 

184 else: 

185 dtype = default_dtype 

186 elif md["EXTNAME"] == "MASK": 

187 reader = MaskFitsReader(filename, hdu=idx + 1) 

188 elif md["EXTNAME"] == "ARCHIVE_INDEX": 

189 f.setHdu(idx + 1) 

190 archive = InputArchive.readFits(f) 

191 continue 

192 elif md["EXTTYPE"] == "ARCHIVE_DATA": 

193 continue 

194 else: 

195 raise ValueError(f"Unknown extension type: {md['EXTNAME']}") 

196 stamp_parts.setdefault(md["EXTVER"], {})[md["EXTNAME"].lower()] = reader.read(dtype=dtype, 

197 **kwargs) 

198 if len(stamp_parts) != nStamps: 

199 raise ValueError( 

200 f"Number of stamps read ({len(stamp_parts)}) does not agree with the " 

201 f"number of stamps recorded in the metadata ({nStamps})." 

202 ) 

203 # construct stamps themselves 

204 stamps = [] 

205 for k in range(nStamps): 

206 # Need to increment by one since EXTVER starts at 1 

207 maskedImage = masked_image_cls(**stamp_parts[k + 1]) 

208 archive_element = archive.get(archive_ids[k]) if has_archive else None 

209 stamps.append(stamp_factory(maskedImage, stamp_metadata[k], k, archive_element)) 

210 

211 return stamps, metadata 

212 

213 

214@dataclass 

215class AbstractStamp(abc.ABC): 

216 """Single abstract stamp. 

217 

218 Parameters 

219 ---------- 

220 Inherit from this class to add metadata to the stamp. 

221 """ 

222 

223 @classmethod 

224 @abc.abstractmethod 

225 def factory(cls, stamp_im, metadata, index, archive_element=None): 

226 """This method is needed to service the FITS reader. We need a standard 

227 interface to construct objects like this. Parameters needed to 

228 construct this object are passed in via a metadata dictionary and then 

229 passed to the constructor of this class. 

230 

231 Parameters 

232 ---------- 

233 stamp : `~lsst.afw.image.MaskedImage` 

234 Pixel data to pass to the constructor 

235 metadata : `dict` 

236 Dictionary containing the information 

237 needed by the constructor. 

238 idx : `int` 

239 Index into the lists in ``metadata`` 

240 archive_element : `~lsst.afw.table.io.Persistable`, optional 

241 Archive element (e.g. Transform or WCS) associated with this stamp. 

242 

243 Returns 

244 ------- 

245 stamp : `AbstractStamp` 

246 An instance of this class 

247 """ 

248 raise NotImplementedError 

249 

250 

251def _default_position(): 

252 # SpherePoint is nominally mutable in C++ so we must use a factory 

253 # and return an entirely new SpherePoint each time a Stamps is created. 

254 return SpherePoint(Angle(np.nan), Angle(np.nan)) 

255 

256 

257@dataclass 

258class Stamp(AbstractStamp): 

259 """Single stamp. 

260 

261 Parameters 

262 ---------- 

263 stamp_im : `~lsst.afw.image.MaskedImageF` 

264 The actual pixel values for the postage stamp. 

265 archive_element : `~lsst.afw.table.io.Persistable` or `None`, optional 

266 Archive element (e.g. Transform or WCS) associated with this stamp. 

267 position : `~lsst.geom.SpherePoint` or `None`, optional 

268 Position of the center of the stamp. Note the user must keep track of 

269 the coordinate system. 

270 """ 

271 

272 stamp_im: MaskedImageF 

273 archive_element: Persistable | None = None 

274 position: SpherePoint | None = field(default_factory=_default_position) 

275 metadata: PropertyList | None = None 

276 

277 @classmethod 

278 def factory(cls, stamp_im, metadata, index, archive_element=None): 

279 """This method is needed to service the FITS reader. We need a standard 

280 interface to construct objects like this. Parameters needed to 

281 construct this object are passed in via a metadata dictionary and then 

282 passed to the constructor of this class. If lists of values are passed 

283 with the following keys, they will be passed to the constructor, 

284 otherwise dummy values will be passed: RA_DEG, DEC_DEG. They should 

285 each point to lists of values. 

286 

287 Parameters 

288 ---------- 

289 stamp : `~lsst.afw.image.MaskedImage` 

290 Pixel data to pass to the constructor 

291 metadata : `PropertyList` 

292 A collection of key, value metadata pairs containing the 

293 information needed by the constructor. 

294 idx : `int` 

295 Index into the lists in ``metadata`` 

296 archive_element : `~lsst.afw.table.io.Persistable`, optional 

297 Archive element (e.g. Transform or WCS) associated with this stamp. 

298 

299 Returns 

300 ------- 

301 stamp : `Stamp` 

302 An instance of this class 

303 """ 

304 if "RA_DEG" in metadata and "DEC_DEG" in metadata: 

305 return cls( 

306 stamp_im=stamp_im, 

307 archive_element=archive_element, 

308 position=SpherePoint( 

309 Angle(metadata.getArray("RA_DEG")[index], degrees), 

310 Angle(metadata.getArray("DEC_DEG")[index], degrees), 

311 ), 

312 metadata=metadata, 

313 ) 

314 else: 

315 return cls( 

316 stamp_im=stamp_im, 

317 archive_element=archive_element, 

318 position=SpherePoint(Angle(np.nan), Angle(np.nan)), 

319 metadata=metadata, 

320 ) 

321 

322 

323class StampsBase(abc.ABC, Sequence): 

324 """Collection of stamps and associated metadata. 

325 

326 Parameters 

327 ---------- 

328 stamps : iterable 

329 This should be an iterable of dataclass objects 

330 a la ``~lsst.meas.algorithms.Stamp``. 

331 metadata : `~lsst.daf.base.PropertyList`, optional 

332 Metadata associated with the objects within the stamps. 

333 use_mask : `bool`, optional 

334 If ``True`` read and write the mask data. Default ``True``. 

335 use_variance : `bool`, optional 

336 If ``True`` read and write the variance data. Default ``True``. 

337 use_archive : `bool`, optional 

338 If ``True``, read and write an Archive that contains a Persistable 

339 associated with each stamp, for example a Transform or a WCS. 

340 Default ``False``. 

341 

342 Notes 

343 ----- 

344 A butler can be used to read only a part of the stamps, 

345 specified by a bbox: 

346 

347 >>> starSubregions = butler.get( 

348 "brightStarStamps", 

349 dataId, 

350 parameters={"bbox": bbox} 

351 ) 

352 """ 

353 

354 def __init__(self, stamps, metadata=None, use_mask=True, use_variance=True, use_archive=False): 

355 for stamp in stamps: 

356 if not isinstance(stamp, AbstractStamp): 

357 raise ValueError(f"The entries in stamps must inherit from AbstractStamp. Got {type(stamp)}.") 

358 self._stamps = stamps 

359 self._metadata = PropertyList() if metadata is None else metadata.deepCopy() 

360 self.use_mask = use_mask 

361 self.use_variance = use_variance 

362 self.use_archive = use_archive 

363 

364 @classmethod 

365 def readFits(cls, filename): 

366 """Build an instance of this class from a file. 

367 

368 Parameters 

369 ---------- 

370 filename : `str` 

371 Name of the file to read 

372 """ 

373 

374 return cls.readFitsWithOptions(filename, None) 

375 

376 @classmethod 

377 def readFitsWithOptions(cls, filename, options): 

378 """Build an instance of this class with options. 

379 

380 Parameters 

381 ---------- 

382 filename : `str` 

383 Name of the file to read 

384 options : `PropertyList` 

385 Collection of metadata parameters 

386 """ 

387 # To avoid problems since this is no longer an abstract method. 

388 # TO-DO: Consider refactoring this method. This class check was added 

389 # to allow the butler formatter to use a generic type but still end up 

390 # giving the correct type back, ensuring that the abstract base class 

391 # is not used by mistake. Perhaps this logic can be optimised. 

392 if cls is not StampsBase: 

393 raise NotImplementedError(f"Please implement specific FITS reader for class {cls}") 

394 

395 # Load metadata to get class 

396 metadata = readMetadata(filename, hdu=0) 

397 type_name = metadata.get("STAMPCLS") 

398 if type_name is None: 

399 raise RuntimeError( 

400 f"No class name in file {filename}. Unable to instantiate correct stamps subclass. " 

401 "Is this an old version format Stamps file?" 

402 ) 

403 

404 # Import class and override `cls` 

405 stamp_type = doImport(type_name) 

406 cls = stamp_type 

407 

408 return cls.readFitsWithOptions(filename, options) 

409 

410 @abc.abstractmethod 

411 def _refresh_metadata(self): 

412 """Make sure metadata is up to date, as this object can be extended.""" 

413 raise NotImplementedError 

414 

415 def writeFits(self, filename): 

416 """Write this object to a file. 

417 

418 Parameters 

419 ---------- 

420 filename : `str` 

421 Name of file to write. 

422 """ 

423 self._refresh_metadata() 

424 type_name = get_full_type_name(self) 

425 writeFits( 

426 filename, 

427 self._stamps, 

428 self._metadata, 

429 type_name, 

430 self.use_mask, 

431 self.use_variance, 

432 self.use_archive, 

433 ) 

434 

435 def __len__(self): 

436 return len(self._stamps) 

437 

438 def __getitem__(self, index): 

439 return self._stamps[index] 

440 

441 def __iter__(self): 

442 return iter(self._stamps) 

443 

444 def getMaskedImages(self): 

445 """Retrieve star images. 

446 

447 Returns 

448 ------- 

449 maskedImages : 

450 `list` [`~lsst.afw.image.MaskedImageF`] 

451 """ 

452 return [stamp.stamp_im for stamp in self._stamps] 

453 

454 def getArchiveElements(self): 

455 """Retrieve archive elements associated with each stamp. 

456 

457 Returns 

458 ------- 

459 archiveElements : 

460 `list` [`~lsst.afw.table.io.Persistable`] 

461 """ 

462 return [stamp.archive_element for stamp in self._stamps] 

463 

464 @property 

465 def metadata(self): 

466 return self._metadata 

467 

468 

469class Stamps(StampsBase): 

470 def _refresh_metadata(self): 

471 positions = self.getPositions() 

472 self._metadata["RA_DEG"] = [p.getRa().asDegrees() for p in positions] 

473 self._metadata["DEC_DEG"] = [p.getDec().asDegrees() for p in positions] 

474 

475 def getPositions(self): 

476 return [s.position for s in self._stamps] 

477 

478 def append(self, item): 

479 """Add an additional stamp. 

480 

481 Parameters 

482 ---------- 

483 item : `Stamp` 

484 Stamp object to append. 

485 """ 

486 if not isinstance(item, Stamp): 

487 raise ValueError("Objects added must be a Stamp object.") 

488 self._stamps.append(item) 

489 return None 

490 

491 def extend(self, stamp_list): 

492 """Extend Stamps instance by appending elements from another instance. 

493 

494 Parameters 

495 ---------- 

496 stamps_list : `list` [`Stamp`] 

497 List of Stamp object to append. 

498 """ 

499 for s in stamp_list: 

500 if not isinstance(s, Stamp): 

501 raise ValueError("Can only extend with Stamp objects") 

502 self._stamps += stamp_list 

503 

504 @classmethod 

505 def readFits(cls, filename): 

506 """Build an instance of this class from a file. 

507 

508 Parameters 

509 ---------- 

510 filename : `str` 

511 Name of the file to read. 

512 

513 Returns 

514 ------- 

515 object : `Stamps` 

516 An instance of this class. 

517 """ 

518 return cls.readFitsWithOptions(filename, None) 

519 

520 @classmethod 

521 def readFitsWithOptions(cls, filename, options): 

522 """Build an instance of this class with options. 

523 

524 Parameters 

525 ---------- 

526 filename : `str` 

527 Name of the file to read. 

528 options : `PropertyList` or `dict` 

529 Collection of metadata parameters. 

530 

531 Returns 

532 ------- 

533 object : `Stamps` 

534 An instance of this class. 

535 """ 

536 stamps, metadata = readFitsWithOptions(filename, Stamp.factory, options) 

537 return cls( 

538 stamps, 

539 metadata=metadata, 

540 use_mask=metadata["HAS_MASK"], 

541 use_variance=metadata["HAS_VARIANCE"], 

542 use_archive=metadata["HAS_ARCHIVE"], 

543 )