Coverage for python / lsst / images / _image.py: 25%

213 statements  

« prev     ^ index     » next       coverage.py v7.13.5, created at 2026-04-25 08:35 +0000

1# This file is part of lsst-images. 

2# 

3# Developed for the LSST Data Management System. 

4# This product includes software developed by the LSST Project 

5# (https://www.lsst.org). 

6# See the COPYRIGHT file at the top-level directory of this distribution 

7# for details of code ownership. 

8# 

9# Use of this source code is governed by a 3-clause BSD-style 

10# license that can be found in the LICENSE file. 

11 

12from __future__ import annotations 

13 

14__all__ = ("Image", "ImageSerializationModel") 

15 

16from collections.abc import Callable, Sequence 

17from contextlib import ExitStack 

18from types import EllipsisType 

19from typing import Any, ClassVar, final 

20 

21import astropy.io.fits 

22import astropy.units 

23import astropy.wcs 

24import numpy as np 

25import numpy.typing as npt 

26import pydantic 

27from astro_metadata_translator import ObservationInfo 

28 

29from lsst.resources import ResourcePath, ResourcePathExpression 

30 

31from . import fits 

32from ._generalized_image import GeneralizedImage 

33from ._geom import YX, Box 

34from ._transforms import Frame, Projection, ProjectionSerializationModel 

35from .serialization import ( 

36 ArchiveTree, 

37 ArrayReferenceModel, 

38 ArrayReferenceQuantityModel, 

39 InputArchive, 

40 MetadataValue, 

41 OutputArchive, 

42 no_header_updates, 

43) 

44from .utils import is_none 

45 

46 

47@final 

48class Image(GeneralizedImage): 

49 """A 2-d array that may be augmented with units and a nonzero origin. 

50 

51 Parameters 

52 ---------- 

53 array_or_fill 

54 Array or fill value for the image. If a fill value, ``bbox`` or 

55 ``shape`` must be provided. 

56 bbox 

57 Bounding box for the image. 

58 start 

59 Logical coordinates of the first pixel in the array, ordered ``y``, 

60 ``x`` (unless an `XY` instance is passed). Ignored if 

61 ``bbox`` is provided. Defaults to zeros. 

62 shape 

63 Leading dimensions of the array, ordered ``y``, ``x`` (unless an `XY` 

64 instance is passed). Only needed if ``array_or_fill`` is not an 

65 array and ``bbox`` is not provided. Like the bbox, this does not 

66 include the last dimension of the array. 

67 dtype 

68 Pixel data type override. 

69 unit 

70 Units for the image's pixel values. 

71 projection 

72 Projection that maps the pixel grid to the sky. 

73 obs_info 

74 General information about the associated observation in standardized 

75 form. 

76 metadata 

77 Arbitrary flexible metadata to associate with the image. 

78 

79 Notes 

80 ----- 

81 Indexing the `array` attribute of an `Image` does not take into account its 

82 ``start`` offset, but accessing a subimage by indexing an `Image` with a 

83 `Box` does, and the `bbox` of the subimage is set to match its location 

84 within the original image. 

85 

86 Indexed assignment to a subimage requires consistency between the 

87 coordinate systems and units of both operands, but it will automatically 

88 select a subimage of the right-hand side and convert compatible units when 

89 possible. In other words:: 

90 

91 a[box] = b 

92 

93 is a shortcut for 

94 

95 a[box].quantity = b[box].quantity 

96 

97 An ellipsis (``...``) can be used instead of a `Box` to assign to the full 

98 image. 

99 """ 

100 

101 def __init__( 

102 self, 

103 array_or_fill: np.ndarray | int | float = 0, 

104 /, 

105 *, 

106 bbox: Box | None = None, 

107 start: Sequence[int] | None = None, 

108 shape: Sequence[int] | None = None, 

109 dtype: npt.DTypeLike | None = None, 

110 unit: astropy.units.UnitBase | None = None, 

111 projection: Projection[Any] | None = None, 

112 obs_info: ObservationInfo | None = None, 

113 metadata: dict[str, MetadataValue] | None = None, 

114 ): 

115 super().__init__(metadata) 

116 if isinstance(array_or_fill, np.ndarray): 

117 if dtype is not None: 

118 array = np.array(array_or_fill, dtype=dtype) 

119 else: 

120 array = array_or_fill 

121 if bbox is None: 

122 bbox = Box.from_shape(array.shape, start=start) 

123 elif bbox.shape != array.shape: 

124 raise ValueError( 

125 f"Explicit bbox shape {bbox.shape} does not match array with shape {array.shape}." 

126 ) 

127 if shape is not None and shape != array.shape: 

128 raise ValueError(f"Explicit shape {shape} does not match array with shape {array.shape}.") 

129 else: 

130 if bbox is None: 

131 if shape is None: 

132 raise TypeError("No bbox, shape, or array provided.") 

133 bbox = Box.from_shape(shape, start=start) 

134 elif shape is not None and shape != bbox.shape: 

135 raise ValueError(f"Explicit shape {shape} does not match bbox shape {bbox.shape}.") 

136 array = np.full(bbox.shape, array_or_fill, dtype=dtype) 

137 self._array: np.ndarray = array 

138 self._bbox: Box = bbox 

139 self._unit = unit 

140 self._projection = projection 

141 self._obs_info = obs_info 

142 

143 @property 

144 def array(self) -> np.ndarray: 

145 """The low-level array (`numpy.ndarray`). 

146 

147 Assigning to this attribute modifies the existing array in place; the 

148 bounding box and underlying data pointer are never changed. 

149 """ 

150 return self._array 

151 

152 @array.setter 

153 def array(self, value: np.ndarray | int | float) -> None: 

154 self._array[...] = value 

155 

156 @property 

157 def quantity(self) -> astropy.units.Quantity: 

158 """The low-level array with units (`astropy.units.Quantity`). 

159 

160 Assigning to this attribute modifies the existing array in place; the 

161 bounding box and underlying data pointer are never changed. 

162 """ 

163 return astropy.units.Quantity(self._array, self._unit, copy=False) 

164 

165 @quantity.setter 

166 def quantity(self, value: astropy.units.Quantity) -> None: 

167 self.quantity[...] = value 

168 

169 @property 

170 def bbox(self) -> Box: 

171 """Bounding box for the image (`Box`).""" 

172 return self._bbox 

173 

174 @property 

175 def unit(self) -> astropy.units.UnitBase | None: 

176 """Units for the image's pixel values (`astropy.units.Unit` or 

177 `None`). 

178 """ 

179 return self._unit 

180 

181 @property 

182 def projection(self) -> Projection[Any] | None: 

183 """The projection that maps this image's pixel grid to the sky 

184 (`Projection` | `None`). 

185 

186 Notes 

187 ----- 

188 The pixel coordinates used by this projection account for the bounding 

189 box ``start``; they are not just array indices. 

190 """ 

191 return self._projection 

192 

193 @property 

194 def obs_info(self) -> ObservationInfo | None: 

195 """General information about the associated observation in standard 

196 form. (`~astro_metadata_translator.ObservationInfo` | `None`). 

197 """ 

198 return self._obs_info 

199 

200 def __getitem__(self, bbox: Box | EllipsisType) -> Image: 

201 if bbox is ...: 

202 return self 

203 super().__getitem__(bbox) 

204 indices = bbox.slice_within(self._bbox) 

205 return self._transfer_metadata( 

206 Image( 

207 self._array[indices], 

208 bbox=bbox, 

209 unit=self._unit, 

210 projection=self._projection, 

211 obs_info=self._obs_info, 

212 ), 

213 bbox=bbox, 

214 ) 

215 

216 def __setitem__(self, bbox: Box | EllipsisType, value: Image) -> None: 

217 self[bbox].quantity[...] = value.quantity 

218 

219 def __str__(self) -> str: 

220 return f"Image({self.bbox!s}, {self.array.dtype.type.__name__})" 

221 

222 def __repr__(self) -> str: 

223 return f"Image(..., bbox={self.bbox!r}, dtype={self.array.dtype!r})" 

224 

225 def __eq__(self, other: object) -> bool: 

226 if not isinstance(other, Image): 

227 return NotImplemented 

228 return ( 

229 self._bbox == other._bbox 

230 and self._unit == other._unit 

231 and np.array_equal(self._array, other._array, equal_nan=True) 

232 ) 

233 

234 def copy(self) -> Image: 

235 return self._transfer_metadata( 

236 Image( 

237 self._array.copy(), 

238 bbox=self._bbox, 

239 unit=self._unit, 

240 projection=self._projection, 

241 obs_info=self._obs_info, 

242 ), 

243 copy=True, 

244 ) 

245 

246 def view( 

247 self, 

248 *, 

249 unit: astropy.units.UnitBase | None | EllipsisType = ..., 

250 projection: Projection | None | EllipsisType = ..., 

251 start: Sequence[int] | EllipsisType = ..., 

252 obs_info: ObservationInfo | None | EllipsisType = ..., 

253 ) -> Image: 

254 """Make a view of the image, with optional updates.""" 

255 if unit is ...: 

256 unit = self._unit 

257 if projection is ...: 

258 projection = self._projection 

259 if start is ...: 

260 start = self._bbox.start 

261 if obs_info is ...: 

262 obs_info = self._obs_info 

263 return self._transfer_metadata( 

264 Image(self._array, start=start, unit=unit, projection=projection, obs_info=obs_info) 

265 ) 

266 

267 def serialize[P: pydantic.BaseModel]( 

268 self, 

269 archive: OutputArchive[P], 

270 *, 

271 update_header: Callable[[astropy.io.fits.Header], None] = no_header_updates, 

272 save_projection: bool = True, 

273 save_obs_info: bool = True, 

274 add_offset_wcs: str | None = "A", 

275 ) -> ImageSerializationModel[P]: 

276 """Serialize the image to an output archive. 

277 

278 Parameters 

279 ---------- 

280 archive 

281 Archive to write to. 

282 update_header 

283 A callback that will be given the FITS header for the HDU 

284 containing this image in order to add keys to it. This callback 

285 may be provided but will not be called if the output format is not 

286 FITS. 

287 save_projection 

288 If `True`, save the `Projection` attached to the image, if there 

289 is one. This does not affect whether a FITS WCS corresponding to 

290 the projection is written (it always is, if available, and if 

291 ``add_offset_wcs`` is not ``" "``). 

292 save_obs_info 

293 If `True`, save the 

294 `~astro_metadata_translator.ObservationInfo` attached to the 

295 image, if there is one. 

296 add_offset_wcs 

297 A FITS WCS single-character suffix to use when adding a linear 

298 WCS that maps the FITS array to the logical pixel coordinates 

299 defined by ``bbox.start``. Set to `None` to not write this WCS. 

300 If this is set to ``" "``, it will prevent the `Projection` from 

301 being saved as a FITS WCS. 

302 """ 

303 

304 def _update_header(header: astropy.io.fits.Header) -> None: 

305 update_header(header) 

306 if self.unit is not None: 

307 header["BUNIT"] = self.unit.to_string(format="fits") 

308 if self.projection is not None and add_offset_wcs != " ": 

309 if self.fits_wcs: 

310 header.update(self.fits_wcs.to_header(relax=True)) 

311 if add_offset_wcs is not None: 

312 fits.add_offset_wcs(header, x=self.bbox.x.start, y=self.bbox.y.start, key=add_offset_wcs) 

313 

314 ref = archive.add_array(self.array, update_header=_update_header) 

315 serialized_projection: ProjectionSerializationModel[P] | None = None 

316 if save_projection and self.projection is not None: 

317 serialized_projection = archive.serialize_direct("projection", self.projection.serialize) 

318 data = ( 

319 ref 

320 if self.unit is None 

321 else ArrayReferenceQuantityModel.model_construct(value=ref, unit=self.unit) 

322 ) 

323 return ImageSerializationModel.model_construct( 

324 data=data, 

325 start=list(self.bbox.start), 

326 projection=serialized_projection, 

327 obs_info=self._obs_info if save_obs_info else None, 

328 metadata=self.metadata, 

329 ) 

330 

331 @staticmethod 

332 def deserialize( 

333 model: ImageSerializationModel[Any], 

334 archive: InputArchive[Any], 

335 *, 

336 bbox: Box | None = None, 

337 strip_header: Callable[[astropy.io.fits.Header], None] = no_header_updates, 

338 ) -> Image: 

339 """Deserialize an image from an input archive. 

340 

341 Parameters 

342 ---------- 

343 model 

344 A Pydantic model representation of the image, holding references 

345 to data stored in the archive. 

346 archive 

347 Archive to read from. 

348 bbox 

349 Bounding box of a subimage to read instead. 

350 strip_header 

351 A callable that strips out any FITS header cards added by the 

352 ``update_header`` argument in the corresponding call to 

353 `serialize`. 

354 """ 

355 ref: ArrayReferenceModel 

356 unit: astropy.units.UnitBase | None = None 

357 if isinstance(model.data, ArrayReferenceQuantityModel): 

358 ref = model.data.value 

359 unit = model.data.unit 

360 else: 

361 ref = model.data 

362 

363 def _strip_header(header: astropy.io.fits.Header) -> None: 

364 if unit is not None: 

365 header.pop("BUNIT", None) 

366 fits.strip_wcs_cards(header) 

367 strip_header(header) 

368 

369 slices = bbox.slice_within(model.bbox) if bbox is not None else ... 

370 array = archive.get_array(ref, strip_header=_strip_header, slices=slices) 

371 projection = ( 

372 Projection.deserialize(model.projection, archive) if model.projection is not None else None 

373 ) 

374 return Image( 

375 array, 

376 start=model.start if bbox is None else bbox.start, 

377 unit=unit, 

378 projection=projection, 

379 obs_info=model.obs_info, 

380 )._finish_deserialize(model) 

381 

382 @staticmethod 

383 def _get_archive_tree_type[P: pydantic.BaseModel]( 

384 pointer_type: type[P], 

385 ) -> type[ImageSerializationModel[P]]: 

386 """Return the serialization model type for this object for an archive 

387 type that uses the given pointer type. 

388 """ 

389 return ImageSerializationModel[pointer_type] # type: ignore 

390 

391 _archive_default_name: ClassVar[str] = "image" 

392 """The name this object should be serialized with when written as the 

393 top-level object. 

394 """ 

395 

396 def write_fits( 

397 self, 

398 filename: str, 

399 *, 

400 compression: fits.FitsCompressionOptions | None = fits.FitsCompressionOptions.DEFAULT, 

401 compression_seed: int | None = None, 

402 ) -> None: 

403 """Write the image to a FITS file. 

404 

405 Parameters 

406 ---------- 

407 filename 

408 Name of the file to write to. Must be a local file. 

409 compression 

410 Compression options. 

411 compression_seed 

412 A FITS tile compression seed to use whenever the configured 

413 compression seed is `None` or (for backwards compatibility) ``0``. 

414 """ 

415 compression_options = {} 

416 if compression is not fits.FitsCompressionOptions.DEFAULT: 

417 compression_options[self._archive_default_name] = compression 

418 fits.write(self, filename, compression_options, compression_seed=compression_seed) 

419 

420 @staticmethod 

421 def read_fits(url: ResourcePathExpression, *, bbox: Box | None = None) -> Image: 

422 """Read an image from a FITS file. 

423 

424 Parameters 

425 ---------- 

426 url 

427 URL of the file to read; may be any type supported by 

428 `lsst.resources.ResourcePath`. 

429 bbox 

430 Bounding box of a subimage to read instead. 

431 """ 

432 return fits.read(Image, url, bbox=bbox).deserialized 

433 

434 @staticmethod 

435 def from_legacy(legacy: Any, unit: astropy.units.UnitBase | None = None) -> Image: 

436 """Convert from an `lsst.afw.image.Image` instance. 

437 

438 Parameters 

439 ---------- 

440 legacy 

441 An `lsst.afw.image.Image` instance that will share pixel data with 

442 the returned object. 

443 unit 

444 Units of the image. 

445 """ 

446 return Image(legacy.array, start=(legacy.getY0(), legacy.getX0()), unit=unit) 

447 

448 def to_legacy(self, *, copy: bool | None = None) -> Any: 

449 """Convert to an `lsst.afw.image.Image` instance. 

450 

451 Parameters 

452 ---------- 

453 copy 

454 If `True`, always copy the pixel data. If `False`, return a view, 

455 and raise `TypeError` if the pixel data is read-only (this is not 

456 supported by afw). If `None`, onyl if the pixel data is 

457 read-only. 

458 """ 

459 import lsst.afw.image 

460 import lsst.geom 

461 

462 array = self._array 

463 if copy: 

464 array = array.copy() 

465 elif not self._array.flags.writeable: 

466 if copy is None: 

467 array = array.copy() 

468 else: 

469 raise TypeError("Cannot create a legacy lsst.afw.image.Image view into a read-only array.") 

470 

471 return lsst.afw.image.Image( 

472 array, 

473 deep=False, 

474 dtype=array.dtype.type, 

475 xy0=lsst.geom.Point2I(self._bbox.x.min, self._bbox.y.min), 

476 ) 

477 

478 @staticmethod 

479 def read_legacy( 

480 uri: ResourcePathExpression, 

481 *, 

482 preserve_quantization: bool = False, 

483 ext: str | int = 1, 

484 fits_wcs_frame: Frame | None = None, 

485 ) -> Image: 

486 """Read a FITS file written by `lsst.afw.image.Image.writeFits`. 

487 

488 Parameters 

489 ---------- 

490 uri 

491 URI or file name. 

492 preserve_quantization 

493 If `True`, ensure that writing the image back out again will 

494 exactly preserve quantization-compressed pixel values. This causes 

495 the arrays to be marked as read-only and stores the original binary 

496 table data for those planes in memory. If the `Image` is copied, 

497 the precompressed pixel values are not transferred to the copy. 

498 ext 

499 Name or index of the FITS HDU to read. 

500 fits_wcs_frame 

501 If not `None` and the HDU containing the image has a FITS WCS, 

502 attach a `Projection` to the returned image by converting that 

503 WCS. 

504 """ 

505 opaque_metadata = fits.FitsOpaqueMetadata() 

506 with ExitStack() as exit_stack: 

507 fs, fspath = ResourcePath(uri).to_fsspec() 

508 stream = exit_stack.enter_context(fs.open(fspath)) 

509 hdu_list = exit_stack.enter_context(astropy.io.fits.open(stream)) 

510 opaque_metadata.extract_legacy_primary_header(hdu_list[0].header) 

511 bintable_hdu: astropy.io.fits.BinTableHDU | None = None 

512 if preserve_quantization: 

513 bintable_stream = exit_stack.enter_context(fs.open(fspath)) 

514 bintable_hdu_list = exit_stack.enter_context( 

515 astropy.io.fits.open(bintable_stream, disable_image_compression=True) 

516 ) 

517 bintable_hdu = bintable_hdu_list[ext] 

518 result = Image._read_legacy_hdu( 

519 hdu_list[ext], opaque_metadata, preserve_bintable=bintable_hdu, fits_wcs_frame=fits_wcs_frame 

520 ) 

521 result._opaque_metadata = opaque_metadata 

522 return result 

523 

524 @staticmethod 

525 def _read_legacy_hdu( 

526 hdu: astropy.io.fits.ImageHDU | astropy.io.fits.CompImageHDU, 

527 opaque_metadata: fits.FitsOpaqueMetadata, 

528 *, 

529 preserve_bintable: astropy.io.fits.BinTableHDU | None, 

530 fits_wcs_frame: Frame | None = None, 

531 ) -> Image: 

532 unit: astropy.units.UnitBase | None = None 

533 if (fits_unit := hdu.header.pop("BUNIT", None)) is not None: 

534 unit = astropy.units.Unit(fits_unit, format="fits") 

535 dx: int = hdu.header.pop("LTV1") 

536 dy: int = hdu.header.pop("LTV2") 

537 start = YX(y=-dy, x=-dx) 

538 read_only: bool = False 

539 if preserve_bintable is not None: 

540 opaque_metadata.precompressed[hdu.name] = fits.PrecompressedImage.from_bintable(preserve_bintable) 

541 read_only = True 

542 projection: Projection | None = None 

543 if fits_wcs_frame is not None: 

544 try: 

545 fits_wcs = astropy.wcs.WCS(hdu.header) 

546 except KeyError: 

547 pass 

548 else: 

549 projection = Projection.from_fits_wcs( 

550 fits_wcs, pixel_frame=fits_wcs_frame, x0=start.x, y0=start.y 

551 ) 

552 image = Image(hdu.data, start=start, unit=unit, projection=projection) 

553 if read_only: 

554 image._array.flags["WRITEABLE"] = False 

555 fits.strip_wcs_cards(hdu.header) 

556 hdu.header.strip() 

557 hdu.header.remove("EXTTYPE", ignore_missing=True) 

558 hdu.header.remove("INHERIT", ignore_missing=True) 

559 hdu.header.remove("UZSCALE", ignore_missing=True) 

560 opaque_metadata.add_header(hdu.header) 

561 return image 

562 

563 

564class ImageSerializationModel[P: pydantic.BaseModel](ArchiveTree): 

565 """Pydantic model used to represent the serialized form of an `.Image`.""" 

566 

567 data: ArrayReferenceQuantityModel | ArrayReferenceModel = pydantic.Field( 

568 description="Reference to pixel data." 

569 ) 

570 start: list[int] = pydantic.Field( 

571 description="Coordinate of the first pixels in the array, ordered (y, x)." 

572 ) 

573 projection: ProjectionSerializationModel[P] | None = pydantic.Field( 

574 default=None, 

575 exclude_if=is_none, 

576 description="Projection that maps the logical pixel grid onto the sky.", 

577 ) 

578 obs_info: ObservationInfo | None = pydantic.Field( 

579 default=None, 

580 exclude_if=is_none, 

581 description="Standardized description of image metadata", 

582 ) 

583 

584 @property 

585 def bbox(self) -> Box: 

586 """The bounding box of the image.""" 

587 if isinstance(self.data, ArrayReferenceQuantityModel): 

588 shape = self.data.value.shape 

589 else: 

590 shape = self.data.shape 

591 return Box.from_shape(shape, self.start)