Coverage for python / lsst / images / _image.py: 25%
216 statements
« prev ^ index » next coverage.py v7.13.5, created at 2026-04-22 09:13 +0000
« prev ^ index » next coverage.py v7.13.5, created at 2026-04-22 09:13 +0000
1# This file is part of lsst-images.
2#
3# Developed for the LSST Data Management System.
4# This product includes software developed by the LSST Project
5# (https://www.lsst.org).
6# See the COPYRIGHT file at the top-level directory of this distribution
7# for details of code ownership.
8#
9# Use of this source code is governed by a 3-clause BSD-style
10# license that can be found in the LICENSE file.
12from __future__ import annotations
14__all__ = ("Image", "ImageSerializationModel")
16from collections.abc import Callable, Sequence
17from contextlib import ExitStack
18from types import EllipsisType
19from typing import Any, ClassVar, final
21import astropy.io.fits
22import astropy.units
23import astropy.wcs
24import numpy as np
25import numpy.typing as npt
26import pydantic
27from astro_metadata_translator import ObservationInfo
29from lsst.resources import ResourcePath, ResourcePathExpression
31from . import fits
32from ._generalized_image import GeneralizedImage
33from ._geom import YX, Box
34from ._transforms import Frame, Projection, ProjectionSerializationModel
35from .serialization import (
36 ArchiveTree,
37 ArrayReferenceModel,
38 ArrayReferenceQuantityModel,
39 InputArchive,
40 MetadataValue,
41 OutputArchive,
42 no_header_updates,
43)
44from .utils import is_none
47@final
48class Image(GeneralizedImage):
49 """A 2-d array that may be augmented with units and a nonzero origin.
51 Parameters
52 ----------
53 array_or_fill
54 Array or fill value for the image. If a fill value, ``bbox`` or
55 ``shape`` must be provided.
56 bbox
57 Bounding box for the image.
58 start
59 Logical coordinates of the first pixel in the array, ordered ``y``,
60 ``x`` (unless an `XY` instance is passed). Ignored if
61 ``bbox`` is provided. Defaults to zeros.
62 shape
63 Leading dimensions of the array, ordered ``y``, ``x`` (unless an `XY`
64 instance is passed). Only needed if ``array_or_fill`` is not an
65 array and ``bbox`` is not provided. Like the bbox, this does not
66 include the last dimension of the array.
67 dtype
68 Pixel data type override.
69 unit
70 Units for the image's pixel values.
71 projection
72 Projection that maps the pixel grid to the sky.
73 obs_info
74 General information about the associated observation in standardized
75 form.
76 metadata
77 Arbitrary flexible metadata to associate with the image.
79 Notes
80 -----
81 Indexing the `array` attribute of an `Image` does not take into account its
82 ``start`` offset, but accessing a subimage by indexing an `Image` with a
83 `Box` does, and the `bbox` of the subimage is set to match its location
84 within the original image.
86 Indexed assignment to a subimage requires consistency between the
87 coordinate systems and units of both operands, but it will automatically
88 select a subimage of the right-hand side and convert compatible units when
89 possible. In other words::
91 a[box] = b
93 is a shortcut for
95 a[box].quantity = b[box].quantity
97 An ellipsis (``...``) can be used instead of a `Box` to assign to the full
98 image.
99 """
101 def __init__(
102 self,
103 array_or_fill: np.ndarray | int | float = 0,
104 /,
105 *,
106 bbox: Box | None = None,
107 start: Sequence[int] | None = None,
108 shape: Sequence[int] | None = None,
109 dtype: npt.DTypeLike | None = None,
110 unit: astropy.units.UnitBase | None = None,
111 projection: Projection[Any] | None = None,
112 obs_info: ObservationInfo | None = None,
113 metadata: dict[str, MetadataValue] | None = None,
114 ):
115 super().__init__(metadata)
116 if isinstance(array_or_fill, np.ndarray):
117 if dtype is not None:
118 array = np.array(array_or_fill, dtype=dtype)
119 else:
120 array = array_or_fill
121 if bbox is None:
122 bbox = Box.from_shape(array.shape, start=start)
123 elif bbox.shape != array.shape:
124 raise ValueError(
125 f"Explicit bbox shape {bbox.shape} does not match array with shape {array.shape}."
126 )
127 if shape is not None and shape != array.shape:
128 raise ValueError(f"Explicit shape {shape} does not match array with shape {array.shape}.")
129 else:
130 if bbox is None:
131 if shape is None:
132 raise TypeError("No bbox, shape, or array provided.")
133 bbox = Box.from_shape(shape, start=start)
134 elif shape is not None and shape != bbox.shape:
135 raise ValueError(f"Explicit shape {shape} does not match bbox shape {bbox.shape}.")
136 array = np.full(bbox.shape, array_or_fill, dtype=dtype)
137 self._array: np.ndarray = array
138 self._bbox: Box = bbox
139 self._unit = unit
140 self._projection = projection
141 self._obs_info = obs_info
143 @property
144 def array(self) -> np.ndarray:
145 """The low-level array (`numpy.ndarray`).
147 Assigning to this attribute modifies the existing array in place; the
148 bounding box and underlying data pointer are never changed.
149 """
150 return self._array
152 @array.setter
153 def array(self, value: np.ndarray | int | float) -> None:
154 self._array[...] = value
156 @property
157 def quantity(self) -> astropy.units.Quantity:
158 """The low-level array with units (`astropy.units.Quantity`).
160 Assigning to this attribute modifies the existing array in place; the
161 bounding box and underlying data pointer are never changed.
162 """
163 return astropy.units.Quantity(self._array, self._unit, copy=False)
165 @quantity.setter
166 def quantity(self, value: astropy.units.Quantity) -> None:
167 self.quantity[...] = value
169 @property
170 def bbox(self) -> Box:
171 """Bounding box for the image (`Box`)."""
172 return self._bbox
174 @property
175 def unit(self) -> astropy.units.UnitBase | None:
176 """Units for the image's pixel values (`astropy.units.Unit` or
177 `None`).
178 """
179 return self._unit
181 @property
182 def projection(self) -> Projection[Any] | None:
183 """The projection that maps this image's pixel grid to the sky
184 (`Projection` | `None`).
186 Notes
187 -----
188 The pixel coordinates used by this projection account for the bounding
189 box ``start``; they are not just array indices.
190 """
191 return self._projection
193 @property
194 def obs_info(self) -> ObservationInfo | None:
195 """General information about the associated observation in standard
196 form. (`~astro_metadata_translator.ObservationInfo` | `None`).
197 """
198 return self._obs_info
200 def __getitem__(self, bbox: Box | EllipsisType) -> Image:
201 if bbox is ...:
202 return self
203 super().__getitem__(bbox)
204 indices = bbox.slice_within(self._bbox)
205 return self._transfer_metadata(
206 Image(
207 self._array[indices],
208 bbox=bbox,
209 unit=self._unit,
210 projection=self._projection,
211 obs_info=self._obs_info,
212 ),
213 bbox=bbox,
214 )
216 def __setitem__(self, bbox: Box | EllipsisType, value: Image) -> None:
217 self[bbox].quantity[...] = value.quantity
219 def __str__(self) -> str:
220 return f"Image({self.bbox!s}, {self.array.dtype.type.__name__})"
222 def __repr__(self) -> str:
223 return f"Image(..., bbox={self.bbox!r}, dtype={self.array.dtype!r})"
225 def __eq__(self, other: object) -> bool:
226 if not isinstance(other, Image):
227 return NotImplemented
228 return (
229 self._bbox == other._bbox
230 and self._unit == other._unit
231 and np.array_equal(self._array, other._array, equal_nan=True)
232 )
234 def copy(self) -> Image:
235 return self._transfer_metadata(
236 Image(
237 self._array.copy(),
238 bbox=self._bbox,
239 unit=self._unit,
240 projection=self._projection,
241 obs_info=self._obs_info,
242 ),
243 copy=True,
244 )
246 def view(
247 self,
248 *,
249 unit: astropy.units.UnitBase | None | EllipsisType = ...,
250 projection: Projection | None | EllipsisType = ...,
251 start: Sequence[int] | EllipsisType = ...,
252 obs_info: ObservationInfo | None | EllipsisType = ...,
253 ) -> Image:
254 """Make a view of the image, with optional updates."""
255 if unit is ...:
256 unit = self._unit
257 if projection is ...:
258 projection = self._projection
259 if start is ...:
260 start = self._bbox.start
261 if obs_info is ...:
262 obs_info = self._obs_info
263 return self._transfer_metadata(
264 Image(self._array, start=start, unit=unit, projection=projection, obs_info=obs_info)
265 )
267 def serialize[P: pydantic.BaseModel](
268 self,
269 archive: OutputArchive[P],
270 *,
271 update_header: Callable[[astropy.io.fits.Header], None] = no_header_updates,
272 save_projection: bool = True,
273 add_offset_wcs: str | None = "A",
274 ) -> ImageSerializationModel[P]:
275 """Serialize the image to an output archive.
277 Parameters
278 ----------
279 archive
280 Archive to write to.
281 update_header
282 A callback that will be given the FITS header for the HDU
283 containing this image in order to add keys to it. This callback
284 may be provided but will not be called if the output format is not
285 FITS.
286 save_projection
287 If `True`, save the `Projection` attached to the image, if there
288 is one.
289 add_offset_wcs
290 A FITS WCS single-character suffix to use when adding a linear
291 WCS that maps the FITS array to the logical pixel coordinates
292 defined by ``bbox.start``. Set to `None` to not write this WCS.
293 """
294 if save_projection and add_offset_wcs == "":
295 raise TypeError("save_projection=True is not compatible with add_offset_wcs=''.")
297 def _update_header(header: astropy.io.fits.Header) -> None:
298 update_header(header)
299 if self.unit is not None:
300 header["BUNIT"] = self.unit.to_string(format="fits")
301 if self.projection is not None:
302 if self.fits_wcs:
303 header.update(self.fits_wcs.to_header(relax=True))
304 if add_offset_wcs is not None:
305 fits.add_offset_wcs(header, x=self.bbox.x.start, y=self.bbox.y.start, key=add_offset_wcs)
307 ref = archive.add_array(self.array, update_header=_update_header)
308 serialized_projection: ProjectionSerializationModel[P] | None = None
309 if save_projection and self.projection is not None:
310 serialized_projection = archive.serialize_direct("projection", self.projection.serialize)
311 if self.unit is None:
312 return ImageSerializationModel.model_construct(
313 data=ref,
314 start=list(self.bbox.start),
315 projection=serialized_projection,
316 obs_info=self._obs_info,
317 metadata=self.metadata,
318 )
319 else:
320 return ImageSerializationModel.model_construct(
321 data=ArrayReferenceQuantityModel.model_construct(value=ref, unit=self.unit),
322 start=list(self.bbox.start),
323 projection=serialized_projection,
324 obs_info=self._obs_info,
325 metadata=self.metadata,
326 )
328 @staticmethod
329 def deserialize(
330 model: ImageSerializationModel[Any],
331 archive: InputArchive[Any],
332 *,
333 bbox: Box | None = None,
334 strip_header: Callable[[astropy.io.fits.Header], None] = no_header_updates,
335 ) -> Image:
336 """Deserialize an image from an input archive.
338 Parameters
339 ----------
340 model
341 A Pydantic model representation of the image, holding references
342 to data stored in the archive.
343 archive
344 Archive to read from.
345 bbox
346 Bounding box of a subimage to read instead.
347 strip_header
348 A callable that strips out any FITS header cards added by the
349 ``update_header`` argument in the corresponding call to
350 `serialize`.
351 """
352 ref: ArrayReferenceModel
353 unit: astropy.units.UnitBase | None = None
354 if isinstance(model.data, ArrayReferenceQuantityModel):
355 ref = model.data.value
356 unit = model.data.unit
357 else:
358 ref = model.data
360 def _strip_header(header: astropy.io.fits.Header) -> None:
361 if unit is not None:
362 header.pop("BUNIT", None)
363 fits.strip_wcs_cards(header)
364 strip_header(header)
366 slices = bbox.slice_within(model.bbox) if bbox is not None else ...
367 array = archive.get_array(ref, strip_header=_strip_header, slices=slices)
368 projection = (
369 Projection.deserialize(model.projection, archive) if model.projection is not None else None
370 )
371 return Image(
372 array,
373 start=model.start if bbox is None else bbox.start,
374 unit=unit,
375 projection=projection,
376 obs_info=model.obs_info,
377 )._finish_deserialize(model)
379 @staticmethod
380 def _get_archive_tree_type[P: pydantic.BaseModel](
381 pointer_type: type[P],
382 ) -> type[ImageSerializationModel[P]]:
383 """Return the serialization model type for this object for an archive
384 type that uses the given pointer type.
385 """
386 return ImageSerializationModel[pointer_type] # type: ignore
388 _archive_default_name: ClassVar[str] = "image"
389 """The name this object should be serialized with when written as the
390 top-level object.
391 """
393 def write_fits(
394 self,
395 filename: str,
396 *,
397 compression: fits.FitsCompressionOptions | None = fits.FitsCompressionOptions.DEFAULT,
398 compression_seed: int | None = None,
399 ) -> None:
400 """Write the image to a FITS file.
402 Parameters
403 ----------
404 filename
405 Name of the file to write to. Must be a local file.
406 compression
407 Compression options.
408 compression_seed
409 A FITS tile compression seed to use whenever the configured
410 compression seed is `None` or (for backwards compatibility) ``0``.
411 """
412 compression_options = {}
413 if compression is not fits.FitsCompressionOptions.DEFAULT:
414 compression_options[self._archive_default_name] = compression
415 fits.write(self, filename, compression_options, compression_seed=compression_seed)
417 @staticmethod
418 def read_fits(url: ResourcePathExpression, *, bbox: Box | None = None) -> Image:
419 """Read an image from a FITS file.
421 Parameters
422 ----------
423 url
424 URL of the file to read; may be any type supported by
425 `lsst.resources.ResourcePath`.
426 bbox
427 Bounding box of a subimage to read instead.
428 """
429 return fits.read(Image, url, bbox=bbox).deserialized
431 @staticmethod
432 def from_legacy(legacy: Any, unit: astropy.units.UnitBase | None = None) -> Image:
433 """Convert from an `lsst.afw.image.Image` instance.
435 Parameters
436 ----------
437 legacy
438 An `lsst.afw.image.Image` instance that will share pixel data with
439 the returned object.
440 unit
441 Units of the image.
442 """
443 return Image(legacy.array, start=(legacy.getY0(), legacy.getX0()), unit=unit)
445 def to_legacy(self, *, copy: bool | None = None) -> Any:
446 """Convert to an `lsst.afw.image.Image` instance.
448 Parameters
449 ----------
450 copy
451 If `True`, always copy the pixel data. If `False`, return a view,
452 and raise `TypeError` if the pixel data is read-only (this is not
453 supported by afw). If `None`, onyl if the pixel data is
454 read-only.
455 """
456 import lsst.afw.image
457 import lsst.geom
459 array = self._array
460 if copy:
461 array = array.copy()
462 elif not self._array.flags.writeable:
463 if copy is None:
464 array = array.copy()
465 else:
466 raise TypeError("Cannot create a legacy lsst.afw.image.Image view into a read-only array.")
468 return lsst.afw.image.Image(
469 array,
470 deep=False,
471 dtype=array.dtype.type,
472 xy0=lsst.geom.Point2I(self._bbox.x.min, self._bbox.y.min),
473 )
475 @staticmethod
476 def read_legacy(
477 uri: ResourcePathExpression,
478 *,
479 preserve_quantization: bool = False,
480 ext: str | int = 1,
481 fits_wcs_frame: Frame | None = None,
482 ) -> Image:
483 """Read a FITS file written by `lsst.afw.image.Image.writeFits`.
485 Parameters
486 ----------
487 uri
488 URI or file name.
489 preserve_quantization
490 If `True`, ensure that writing the image back out again will
491 exactly preserve quantization-compressed pixel values. This causes
492 the arrays to be marked as read-only and stores the original binary
493 table data for those planes in memory. If the `Image` is copied,
494 the precompressed pixel values are not transferred to the copy.
495 ext
496 Name or index of the FITS HDU to read.
497 fits_wcs_frame
498 If not `None` and the HDU containing the image has a FITS WCS,
499 attach a `Projection` to the returned image by converting that
500 WCS.
501 """
502 opaque_metadata = fits.FitsOpaqueMetadata()
503 with ExitStack() as exit_stack:
504 fs, fspath = ResourcePath(uri).to_fsspec()
505 stream = exit_stack.enter_context(fs.open(fspath))
506 hdu_list = exit_stack.enter_context(astropy.io.fits.open(stream))
507 opaque_metadata.extract_legacy_primary_header(hdu_list[0].header)
508 bintable_hdu: astropy.io.fits.BinTableHDU | None = None
509 if preserve_quantization:
510 bintable_stream = exit_stack.enter_context(fs.open(fspath))
511 bintable_hdu_list = exit_stack.enter_context(
512 astropy.io.fits.open(bintable_stream, disable_image_compression=True)
513 )
514 bintable_hdu = bintable_hdu_list[ext]
515 result = Image._read_legacy_hdu(
516 hdu_list[ext], opaque_metadata, preserve_bintable=bintable_hdu, fits_wcs_frame=fits_wcs_frame
517 )
518 result._opaque_metadata = opaque_metadata
519 return result
521 @staticmethod
522 def _read_legacy_hdu(
523 hdu: astropy.io.fits.ImageHDU | astropy.io.fits.CompImageHDU,
524 opaque_metadata: fits.FitsOpaqueMetadata,
525 *,
526 preserve_bintable: astropy.io.fits.BinTableHDU | None,
527 fits_wcs_frame: Frame | None = None,
528 ) -> Image:
529 unit: astropy.units.UnitBase | None = None
530 if (fits_unit := hdu.header.pop("BUNIT", None)) is not None:
531 unit = astropy.units.Unit(fits_unit, format="fits")
532 dx: int = hdu.header.pop("LTV1")
533 dy: int = hdu.header.pop("LTV2")
534 start = YX(y=-dy, x=-dx)
535 read_only: bool = False
536 if preserve_bintable is not None:
537 opaque_metadata.precompressed[hdu.name] = fits.PrecompressedImage.from_bintable(preserve_bintable)
538 read_only = True
539 projection: Projection | None = None
540 if fits_wcs_frame is not None:
541 try:
542 fits_wcs = astropy.wcs.WCS(hdu.header)
543 except KeyError:
544 pass
545 else:
546 projection = Projection.from_fits_wcs(
547 fits_wcs, pixel_frame=fits_wcs_frame, x0=start.x, y0=start.y
548 )
549 image = Image(hdu.data, start=start, unit=unit, projection=projection)
550 if read_only:
551 image._array.flags["WRITEABLE"] = False
552 fits.strip_wcs_cards(hdu.header)
553 hdu.header.strip()
554 hdu.header.remove("EXTTYPE", ignore_missing=True)
555 hdu.header.remove("INHERIT", ignore_missing=True)
556 hdu.header.remove("UZSCALE", ignore_missing=True)
557 opaque_metadata.add_header(hdu.header)
558 return image
561class ImageSerializationModel[P: pydantic.BaseModel](ArchiveTree):
562 """Pydantic model used to represent the serialized form of an `.Image`."""
564 data: ArrayReferenceQuantityModel | ArrayReferenceModel = pydantic.Field(
565 description="Reference to pixel data."
566 )
567 start: list[int] = pydantic.Field(
568 description="Coordinate of the first pixels in the array, ordered (y, x)."
569 )
570 projection: ProjectionSerializationModel[P] | None = pydantic.Field(
571 default=None,
572 exclude_if=is_none,
573 description="Projection that maps the logical pixel grid onto the sky.",
574 )
575 obs_info: ObservationInfo | None = pydantic.Field(
576 default=None,
577 exclude_if=is_none,
578 description="Standardized description of image metadata",
579 )
581 @property
582 def bbox(self) -> Box:
583 """The bounding box of the image."""
584 if isinstance(self.data, ArrayReferenceQuantityModel):
585 shape = self.data.value.shape
586 else:
587 shape = self.data.shape
588 return Box.from_shape(shape, self.start)