Coverage for python / lsst / images / _image.py: 25%

216 statements  

« prev     ^ index     » next       coverage.py v7.13.5, created at 2026-04-22 09:13 +0000

1# This file is part of lsst-images. 

2# 

3# Developed for the LSST Data Management System. 

4# This product includes software developed by the LSST Project 

5# (https://www.lsst.org). 

6# See the COPYRIGHT file at the top-level directory of this distribution 

7# for details of code ownership. 

8# 

9# Use of this source code is governed by a 3-clause BSD-style 

10# license that can be found in the LICENSE file. 

11 

12from __future__ import annotations 

13 

14__all__ = ("Image", "ImageSerializationModel") 

15 

16from collections.abc import Callable, Sequence 

17from contextlib import ExitStack 

18from types import EllipsisType 

19from typing import Any, ClassVar, final 

20 

21import astropy.io.fits 

22import astropy.units 

23import astropy.wcs 

24import numpy as np 

25import numpy.typing as npt 

26import pydantic 

27from astro_metadata_translator import ObservationInfo 

28 

29from lsst.resources import ResourcePath, ResourcePathExpression 

30 

31from . import fits 

32from ._generalized_image import GeneralizedImage 

33from ._geom import YX, Box 

34from ._transforms import Frame, Projection, ProjectionSerializationModel 

35from .serialization import ( 

36 ArchiveTree, 

37 ArrayReferenceModel, 

38 ArrayReferenceQuantityModel, 

39 InputArchive, 

40 MetadataValue, 

41 OutputArchive, 

42 no_header_updates, 

43) 

44from .utils import is_none 

45 

46 

47@final 

48class Image(GeneralizedImage): 

49 """A 2-d array that may be augmented with units and a nonzero origin. 

50 

51 Parameters 

52 ---------- 

53 array_or_fill 

54 Array or fill value for the image. If a fill value, ``bbox`` or 

55 ``shape`` must be provided. 

56 bbox 

57 Bounding box for the image. 

58 start 

59 Logical coordinates of the first pixel in the array, ordered ``y``, 

60 ``x`` (unless an `XY` instance is passed). Ignored if 

61 ``bbox`` is provided. Defaults to zeros. 

62 shape 

63 Leading dimensions of the array, ordered ``y``, ``x`` (unless an `XY` 

64 instance is passed). Only needed if ``array_or_fill`` is not an 

65 array and ``bbox`` is not provided. Like the bbox, this does not 

66 include the last dimension of the array. 

67 dtype 

68 Pixel data type override. 

69 unit 

70 Units for the image's pixel values. 

71 projection 

72 Projection that maps the pixel grid to the sky. 

73 obs_info 

74 General information about the associated observation in standardized 

75 form. 

76 metadata 

77 Arbitrary flexible metadata to associate with the image. 

78 

79 Notes 

80 ----- 

81 Indexing the `array` attribute of an `Image` does not take into account its 

82 ``start`` offset, but accessing a subimage by indexing an `Image` with a 

83 `Box` does, and the `bbox` of the subimage is set to match its location 

84 within the original image. 

85 

86 Indexed assignment to a subimage requires consistency between the 

87 coordinate systems and units of both operands, but it will automatically 

88 select a subimage of the right-hand side and convert compatible units when 

89 possible. In other words:: 

90 

91 a[box] = b 

92 

93 is a shortcut for 

94 

95 a[box].quantity = b[box].quantity 

96 

97 An ellipsis (``...``) can be used instead of a `Box` to assign to the full 

98 image. 

99 """ 

100 

101 def __init__( 

102 self, 

103 array_or_fill: np.ndarray | int | float = 0, 

104 /, 

105 *, 

106 bbox: Box | None = None, 

107 start: Sequence[int] | None = None, 

108 shape: Sequence[int] | None = None, 

109 dtype: npt.DTypeLike | None = None, 

110 unit: astropy.units.UnitBase | None = None, 

111 projection: Projection[Any] | None = None, 

112 obs_info: ObservationInfo | None = None, 

113 metadata: dict[str, MetadataValue] | None = None, 

114 ): 

115 super().__init__(metadata) 

116 if isinstance(array_or_fill, np.ndarray): 

117 if dtype is not None: 

118 array = np.array(array_or_fill, dtype=dtype) 

119 else: 

120 array = array_or_fill 

121 if bbox is None: 

122 bbox = Box.from_shape(array.shape, start=start) 

123 elif bbox.shape != array.shape: 

124 raise ValueError( 

125 f"Explicit bbox shape {bbox.shape} does not match array with shape {array.shape}." 

126 ) 

127 if shape is not None and shape != array.shape: 

128 raise ValueError(f"Explicit shape {shape} does not match array with shape {array.shape}.") 

129 else: 

130 if bbox is None: 

131 if shape is None: 

132 raise TypeError("No bbox, shape, or array provided.") 

133 bbox = Box.from_shape(shape, start=start) 

134 elif shape is not None and shape != bbox.shape: 

135 raise ValueError(f"Explicit shape {shape} does not match bbox shape {bbox.shape}.") 

136 array = np.full(bbox.shape, array_or_fill, dtype=dtype) 

137 self._array: np.ndarray = array 

138 self._bbox: Box = bbox 

139 self._unit = unit 

140 self._projection = projection 

141 self._obs_info = obs_info 

142 

143 @property 

144 def array(self) -> np.ndarray: 

145 """The low-level array (`numpy.ndarray`). 

146 

147 Assigning to this attribute modifies the existing array in place; the 

148 bounding box and underlying data pointer are never changed. 

149 """ 

150 return self._array 

151 

152 @array.setter 

153 def array(self, value: np.ndarray | int | float) -> None: 

154 self._array[...] = value 

155 

156 @property 

157 def quantity(self) -> astropy.units.Quantity: 

158 """The low-level array with units (`astropy.units.Quantity`). 

159 

160 Assigning to this attribute modifies the existing array in place; the 

161 bounding box and underlying data pointer are never changed. 

162 """ 

163 return astropy.units.Quantity(self._array, self._unit, copy=False) 

164 

165 @quantity.setter 

166 def quantity(self, value: astropy.units.Quantity) -> None: 

167 self.quantity[...] = value 

168 

169 @property 

170 def bbox(self) -> Box: 

171 """Bounding box for the image (`Box`).""" 

172 return self._bbox 

173 

174 @property 

175 def unit(self) -> astropy.units.UnitBase | None: 

176 """Units for the image's pixel values (`astropy.units.Unit` or 

177 `None`). 

178 """ 

179 return self._unit 

180 

181 @property 

182 def projection(self) -> Projection[Any] | None: 

183 """The projection that maps this image's pixel grid to the sky 

184 (`Projection` | `None`). 

185 

186 Notes 

187 ----- 

188 The pixel coordinates used by this projection account for the bounding 

189 box ``start``; they are not just array indices. 

190 """ 

191 return self._projection 

192 

193 @property 

194 def obs_info(self) -> ObservationInfo | None: 

195 """General information about the associated observation in standard 

196 form. (`~astro_metadata_translator.ObservationInfo` | `None`). 

197 """ 

198 return self._obs_info 

199 

200 def __getitem__(self, bbox: Box | EllipsisType) -> Image: 

201 if bbox is ...: 

202 return self 

203 super().__getitem__(bbox) 

204 indices = bbox.slice_within(self._bbox) 

205 return self._transfer_metadata( 

206 Image( 

207 self._array[indices], 

208 bbox=bbox, 

209 unit=self._unit, 

210 projection=self._projection, 

211 obs_info=self._obs_info, 

212 ), 

213 bbox=bbox, 

214 ) 

215 

216 def __setitem__(self, bbox: Box | EllipsisType, value: Image) -> None: 

217 self[bbox].quantity[...] = value.quantity 

218 

219 def __str__(self) -> str: 

220 return f"Image({self.bbox!s}, {self.array.dtype.type.__name__})" 

221 

222 def __repr__(self) -> str: 

223 return f"Image(..., bbox={self.bbox!r}, dtype={self.array.dtype!r})" 

224 

225 def __eq__(self, other: object) -> bool: 

226 if not isinstance(other, Image): 

227 return NotImplemented 

228 return ( 

229 self._bbox == other._bbox 

230 and self._unit == other._unit 

231 and np.array_equal(self._array, other._array, equal_nan=True) 

232 ) 

233 

234 def copy(self) -> Image: 

235 return self._transfer_metadata( 

236 Image( 

237 self._array.copy(), 

238 bbox=self._bbox, 

239 unit=self._unit, 

240 projection=self._projection, 

241 obs_info=self._obs_info, 

242 ), 

243 copy=True, 

244 ) 

245 

246 def view( 

247 self, 

248 *, 

249 unit: astropy.units.UnitBase | None | EllipsisType = ..., 

250 projection: Projection | None | EllipsisType = ..., 

251 start: Sequence[int] | EllipsisType = ..., 

252 obs_info: ObservationInfo | None | EllipsisType = ..., 

253 ) -> Image: 

254 """Make a view of the image, with optional updates.""" 

255 if unit is ...: 

256 unit = self._unit 

257 if projection is ...: 

258 projection = self._projection 

259 if start is ...: 

260 start = self._bbox.start 

261 if obs_info is ...: 

262 obs_info = self._obs_info 

263 return self._transfer_metadata( 

264 Image(self._array, start=start, unit=unit, projection=projection, obs_info=obs_info) 

265 ) 

266 

267 def serialize[P: pydantic.BaseModel]( 

268 self, 

269 archive: OutputArchive[P], 

270 *, 

271 update_header: Callable[[astropy.io.fits.Header], None] = no_header_updates, 

272 save_projection: bool = True, 

273 add_offset_wcs: str | None = "A", 

274 ) -> ImageSerializationModel[P]: 

275 """Serialize the image to an output archive. 

276 

277 Parameters 

278 ---------- 

279 archive 

280 Archive to write to. 

281 update_header 

282 A callback that will be given the FITS header for the HDU 

283 containing this image in order to add keys to it. This callback 

284 may be provided but will not be called if the output format is not 

285 FITS. 

286 save_projection 

287 If `True`, save the `Projection` attached to the image, if there 

288 is one. 

289 add_offset_wcs 

290 A FITS WCS single-character suffix to use when adding a linear 

291 WCS that maps the FITS array to the logical pixel coordinates 

292 defined by ``bbox.start``. Set to `None` to not write this WCS. 

293 """ 

294 if save_projection and add_offset_wcs == "": 

295 raise TypeError("save_projection=True is not compatible with add_offset_wcs=''.") 

296 

297 def _update_header(header: astropy.io.fits.Header) -> None: 

298 update_header(header) 

299 if self.unit is not None: 

300 header["BUNIT"] = self.unit.to_string(format="fits") 

301 if self.projection is not None: 

302 if self.fits_wcs: 

303 header.update(self.fits_wcs.to_header(relax=True)) 

304 if add_offset_wcs is not None: 

305 fits.add_offset_wcs(header, x=self.bbox.x.start, y=self.bbox.y.start, key=add_offset_wcs) 

306 

307 ref = archive.add_array(self.array, update_header=_update_header) 

308 serialized_projection: ProjectionSerializationModel[P] | None = None 

309 if save_projection and self.projection is not None: 

310 serialized_projection = archive.serialize_direct("projection", self.projection.serialize) 

311 if self.unit is None: 

312 return ImageSerializationModel.model_construct( 

313 data=ref, 

314 start=list(self.bbox.start), 

315 projection=serialized_projection, 

316 obs_info=self._obs_info, 

317 metadata=self.metadata, 

318 ) 

319 else: 

320 return ImageSerializationModel.model_construct( 

321 data=ArrayReferenceQuantityModel.model_construct(value=ref, unit=self.unit), 

322 start=list(self.bbox.start), 

323 projection=serialized_projection, 

324 obs_info=self._obs_info, 

325 metadata=self.metadata, 

326 ) 

327 

328 @staticmethod 

329 def deserialize( 

330 model: ImageSerializationModel[Any], 

331 archive: InputArchive[Any], 

332 *, 

333 bbox: Box | None = None, 

334 strip_header: Callable[[astropy.io.fits.Header], None] = no_header_updates, 

335 ) -> Image: 

336 """Deserialize an image from an input archive. 

337 

338 Parameters 

339 ---------- 

340 model 

341 A Pydantic model representation of the image, holding references 

342 to data stored in the archive. 

343 archive 

344 Archive to read from. 

345 bbox 

346 Bounding box of a subimage to read instead. 

347 strip_header 

348 A callable that strips out any FITS header cards added by the 

349 ``update_header`` argument in the corresponding call to 

350 `serialize`. 

351 """ 

352 ref: ArrayReferenceModel 

353 unit: astropy.units.UnitBase | None = None 

354 if isinstance(model.data, ArrayReferenceQuantityModel): 

355 ref = model.data.value 

356 unit = model.data.unit 

357 else: 

358 ref = model.data 

359 

360 def _strip_header(header: astropy.io.fits.Header) -> None: 

361 if unit is not None: 

362 header.pop("BUNIT", None) 

363 fits.strip_wcs_cards(header) 

364 strip_header(header) 

365 

366 slices = bbox.slice_within(model.bbox) if bbox is not None else ... 

367 array = archive.get_array(ref, strip_header=_strip_header, slices=slices) 

368 projection = ( 

369 Projection.deserialize(model.projection, archive) if model.projection is not None else None 

370 ) 

371 return Image( 

372 array, 

373 start=model.start if bbox is None else bbox.start, 

374 unit=unit, 

375 projection=projection, 

376 obs_info=model.obs_info, 

377 )._finish_deserialize(model) 

378 

379 @staticmethod 

380 def _get_archive_tree_type[P: pydantic.BaseModel]( 

381 pointer_type: type[P], 

382 ) -> type[ImageSerializationModel[P]]: 

383 """Return the serialization model type for this object for an archive 

384 type that uses the given pointer type. 

385 """ 

386 return ImageSerializationModel[pointer_type] # type: ignore 

387 

388 _archive_default_name: ClassVar[str] = "image" 

389 """The name this object should be serialized with when written as the 

390 top-level object. 

391 """ 

392 

393 def write_fits( 

394 self, 

395 filename: str, 

396 *, 

397 compression: fits.FitsCompressionOptions | None = fits.FitsCompressionOptions.DEFAULT, 

398 compression_seed: int | None = None, 

399 ) -> None: 

400 """Write the image to a FITS file. 

401 

402 Parameters 

403 ---------- 

404 filename 

405 Name of the file to write to. Must be a local file. 

406 compression 

407 Compression options. 

408 compression_seed 

409 A FITS tile compression seed to use whenever the configured 

410 compression seed is `None` or (for backwards compatibility) ``0``. 

411 """ 

412 compression_options = {} 

413 if compression is not fits.FitsCompressionOptions.DEFAULT: 

414 compression_options[self._archive_default_name] = compression 

415 fits.write(self, filename, compression_options, compression_seed=compression_seed) 

416 

417 @staticmethod 

418 def read_fits(url: ResourcePathExpression, *, bbox: Box | None = None) -> Image: 

419 """Read an image from a FITS file. 

420 

421 Parameters 

422 ---------- 

423 url 

424 URL of the file to read; may be any type supported by 

425 `lsst.resources.ResourcePath`. 

426 bbox 

427 Bounding box of a subimage to read instead. 

428 """ 

429 return fits.read(Image, url, bbox=bbox).deserialized 

430 

431 @staticmethod 

432 def from_legacy(legacy: Any, unit: astropy.units.UnitBase | None = None) -> Image: 

433 """Convert from an `lsst.afw.image.Image` instance. 

434 

435 Parameters 

436 ---------- 

437 legacy 

438 An `lsst.afw.image.Image` instance that will share pixel data with 

439 the returned object. 

440 unit 

441 Units of the image. 

442 """ 

443 return Image(legacy.array, start=(legacy.getY0(), legacy.getX0()), unit=unit) 

444 

445 def to_legacy(self, *, copy: bool | None = None) -> Any: 

446 """Convert to an `lsst.afw.image.Image` instance. 

447 

448 Parameters 

449 ---------- 

450 copy 

451 If `True`, always copy the pixel data. If `False`, return a view, 

452 and raise `TypeError` if the pixel data is read-only (this is not 

453 supported by afw). If `None`, onyl if the pixel data is 

454 read-only. 

455 """ 

456 import lsst.afw.image 

457 import lsst.geom 

458 

459 array = self._array 

460 if copy: 

461 array = array.copy() 

462 elif not self._array.flags.writeable: 

463 if copy is None: 

464 array = array.copy() 

465 else: 

466 raise TypeError("Cannot create a legacy lsst.afw.image.Image view into a read-only array.") 

467 

468 return lsst.afw.image.Image( 

469 array, 

470 deep=False, 

471 dtype=array.dtype.type, 

472 xy0=lsst.geom.Point2I(self._bbox.x.min, self._bbox.y.min), 

473 ) 

474 

475 @staticmethod 

476 def read_legacy( 

477 uri: ResourcePathExpression, 

478 *, 

479 preserve_quantization: bool = False, 

480 ext: str | int = 1, 

481 fits_wcs_frame: Frame | None = None, 

482 ) -> Image: 

483 """Read a FITS file written by `lsst.afw.image.Image.writeFits`. 

484 

485 Parameters 

486 ---------- 

487 uri 

488 URI or file name. 

489 preserve_quantization 

490 If `True`, ensure that writing the image back out again will 

491 exactly preserve quantization-compressed pixel values. This causes 

492 the arrays to be marked as read-only and stores the original binary 

493 table data for those planes in memory. If the `Image` is copied, 

494 the precompressed pixel values are not transferred to the copy. 

495 ext 

496 Name or index of the FITS HDU to read. 

497 fits_wcs_frame 

498 If not `None` and the HDU containing the image has a FITS WCS, 

499 attach a `Projection` to the returned image by converting that 

500 WCS. 

501 """ 

502 opaque_metadata = fits.FitsOpaqueMetadata() 

503 with ExitStack() as exit_stack: 

504 fs, fspath = ResourcePath(uri).to_fsspec() 

505 stream = exit_stack.enter_context(fs.open(fspath)) 

506 hdu_list = exit_stack.enter_context(astropy.io.fits.open(stream)) 

507 opaque_metadata.extract_legacy_primary_header(hdu_list[0].header) 

508 bintable_hdu: astropy.io.fits.BinTableHDU | None = None 

509 if preserve_quantization: 

510 bintable_stream = exit_stack.enter_context(fs.open(fspath)) 

511 bintable_hdu_list = exit_stack.enter_context( 

512 astropy.io.fits.open(bintable_stream, disable_image_compression=True) 

513 ) 

514 bintable_hdu = bintable_hdu_list[ext] 

515 result = Image._read_legacy_hdu( 

516 hdu_list[ext], opaque_metadata, preserve_bintable=bintable_hdu, fits_wcs_frame=fits_wcs_frame 

517 ) 

518 result._opaque_metadata = opaque_metadata 

519 return result 

520 

521 @staticmethod 

522 def _read_legacy_hdu( 

523 hdu: astropy.io.fits.ImageHDU | astropy.io.fits.CompImageHDU, 

524 opaque_metadata: fits.FitsOpaqueMetadata, 

525 *, 

526 preserve_bintable: astropy.io.fits.BinTableHDU | None, 

527 fits_wcs_frame: Frame | None = None, 

528 ) -> Image: 

529 unit: astropy.units.UnitBase | None = None 

530 if (fits_unit := hdu.header.pop("BUNIT", None)) is not None: 

531 unit = astropy.units.Unit(fits_unit, format="fits") 

532 dx: int = hdu.header.pop("LTV1") 

533 dy: int = hdu.header.pop("LTV2") 

534 start = YX(y=-dy, x=-dx) 

535 read_only: bool = False 

536 if preserve_bintable is not None: 

537 opaque_metadata.precompressed[hdu.name] = fits.PrecompressedImage.from_bintable(preserve_bintable) 

538 read_only = True 

539 projection: Projection | None = None 

540 if fits_wcs_frame is not None: 

541 try: 

542 fits_wcs = astropy.wcs.WCS(hdu.header) 

543 except KeyError: 

544 pass 

545 else: 

546 projection = Projection.from_fits_wcs( 

547 fits_wcs, pixel_frame=fits_wcs_frame, x0=start.x, y0=start.y 

548 ) 

549 image = Image(hdu.data, start=start, unit=unit, projection=projection) 

550 if read_only: 

551 image._array.flags["WRITEABLE"] = False 

552 fits.strip_wcs_cards(hdu.header) 

553 hdu.header.strip() 

554 hdu.header.remove("EXTTYPE", ignore_missing=True) 

555 hdu.header.remove("INHERIT", ignore_missing=True) 

556 hdu.header.remove("UZSCALE", ignore_missing=True) 

557 opaque_metadata.add_header(hdu.header) 

558 return image 

559 

560 

561class ImageSerializationModel[P: pydantic.BaseModel](ArchiveTree): 

562 """Pydantic model used to represent the serialized form of an `.Image`.""" 

563 

564 data: ArrayReferenceQuantityModel | ArrayReferenceModel = pydantic.Field( 

565 description="Reference to pixel data." 

566 ) 

567 start: list[int] = pydantic.Field( 

568 description="Coordinate of the first pixels in the array, ordered (y, x)." 

569 ) 

570 projection: ProjectionSerializationModel[P] | None = pydantic.Field( 

571 default=None, 

572 exclude_if=is_none, 

573 description="Projection that maps the logical pixel grid onto the sky.", 

574 ) 

575 obs_info: ObservationInfo | None = pydantic.Field( 

576 default=None, 

577 exclude_if=is_none, 

578 description="Standardized description of image metadata", 

579 ) 

580 

581 @property 

582 def bbox(self) -> Box: 

583 """The bounding box of the image.""" 

584 if isinstance(self.data, ArrayReferenceQuantityModel): 

585 shape = self.data.value.shape 

586 else: 

587 shape = self.data.shape 

588 return Box.from_shape(shape, self.start)