Coverage for python / lsst / images / _image.py: 25%
215 statements
« prev ^ index » next coverage.py v7.13.5, created at 2026-05-06 08:48 +0000
« prev ^ index » next coverage.py v7.13.5, created at 2026-05-06 08:48 +0000
1# This file is part of lsst-images.
2#
3# Developed for the LSST Data Management System.
4# This product includes software developed by the LSST Project
5# (https://www.lsst.org).
6# See the COPYRIGHT file at the top-level directory of this distribution
7# for details of code ownership.
8#
9# Use of this source code is governed by a 3-clause BSD-style
10# license that can be found in the LICENSE file.
12from __future__ import annotations
14__all__ = ("Image", "ImageSerializationModel")
16from collections.abc import Callable, Sequence
17from contextlib import ExitStack
18from types import EllipsisType
19from typing import Any, ClassVar, final
21import astropy.io.fits
22import astropy.units
23import astropy.wcs
24import numpy as np
25import numpy.typing as npt
26import pydantic
27from astro_metadata_translator import ObservationInfo
29from lsst.resources import ResourcePath, ResourcePathExpression
31from . import fits
32from ._generalized_image import GeneralizedImage
33from ._geom import YX, Box
34from ._transforms import Frame, Projection, ProjectionSerializationModel
35from .serialization import (
36 ArchiveTree,
37 ArrayReferenceModel,
38 ArrayReferenceQuantityModel,
39 InlineArrayModel,
40 InlineArrayQuantityModel,
41 InputArchive,
42 MetadataValue,
43 OutputArchive,
44 no_header_updates,
45)
46from .utils import is_none
49@final
50class Image(GeneralizedImage):
51 """A 2-d array that may be augmented with units and a nonzero origin.
53 Parameters
54 ----------
55 array_or_fill
56 Array or fill value for the image. If a fill value, ``bbox`` or
57 ``shape`` must be provided.
58 bbox
59 Bounding box for the image.
60 start
61 Logical coordinates of the first pixel in the array, ordered ``y``,
62 ``x`` (unless an `XY` instance is passed). Ignored if
63 ``bbox`` is provided. Defaults to zeros.
64 shape
65 Leading dimensions of the array, ordered ``y``, ``x`` (unless an `XY`
66 instance is passed). Only needed if ``array_or_fill`` is not an
67 array and ``bbox`` is not provided. Like the bbox, this does not
68 include the last dimension of the array.
69 dtype
70 Pixel data type override.
71 unit
72 Units for the image's pixel values.
73 projection
74 Projection that maps the pixel grid to the sky.
75 obs_info
76 General information about the associated observation in standardized
77 form.
78 metadata
79 Arbitrary flexible metadata to associate with the image.
81 Notes
82 -----
83 Indexing the `array` attribute of an `Image` does not take into account its
84 ``start`` offset, but accessing a subimage by indexing an `Image` with a
85 `Box` does, and the `bbox` of the subimage is set to match its location
86 within the original image.
88 Indexed assignment to a subimage requires consistency between the
89 coordinate systems and units of both operands, but it will automatically
90 select a subimage of the right-hand side and convert compatible units when
91 possible. In other words::
93 a[box] = b
95 is a shortcut for
97 a[box].quantity = b[box].quantity
99 An ellipsis (``...``) can be used instead of a `Box` to assign to the full
100 image.
101 """
103 def __init__(
104 self,
105 array_or_fill: np.ndarray | int | float = 0,
106 /,
107 *,
108 bbox: Box | None = None,
109 start: Sequence[int] | None = None,
110 shape: Sequence[int] | None = None,
111 dtype: npt.DTypeLike | None = None,
112 unit: astropy.units.UnitBase | None = None,
113 projection: Projection[Any] | None = None,
114 obs_info: ObservationInfo | None = None,
115 metadata: dict[str, MetadataValue] | None = None,
116 ):
117 super().__init__(metadata)
118 if isinstance(array_or_fill, np.ndarray):
119 if dtype is not None:
120 array = np.array(array_or_fill, dtype=dtype)
121 else:
122 array = array_or_fill
123 if bbox is None:
124 bbox = Box.from_shape(array.shape, start=start)
125 elif bbox.shape != array.shape:
126 raise ValueError(
127 f"Explicit bbox shape {bbox.shape} does not match array with shape {array.shape}."
128 )
129 if shape is not None and shape != array.shape:
130 raise ValueError(f"Explicit shape {shape} does not match array with shape {array.shape}.")
131 else:
132 if bbox is None:
133 if shape is None:
134 raise TypeError("No bbox, shape, or array provided.")
135 bbox = Box.from_shape(shape, start=start)
136 elif shape is not None and shape != bbox.shape:
137 raise ValueError(f"Explicit shape {shape} does not match bbox shape {bbox.shape}.")
138 array = np.full(bbox.shape, array_or_fill, dtype=dtype)
139 self._array: np.ndarray = array
140 self._bbox: Box = bbox
141 self._unit = unit
142 self._projection = projection
143 self._obs_info = obs_info
145 @property
146 def array(self) -> np.ndarray:
147 """The low-level array (`numpy.ndarray`).
149 Assigning to this attribute modifies the existing array in place; the
150 bounding box and underlying data pointer are never changed.
151 """
152 return self._array
154 @array.setter
155 def array(self, value: np.ndarray | int | float) -> None:
156 self._array[...] = value
158 @property
159 def quantity(self) -> astropy.units.Quantity:
160 """The low-level array with units (`astropy.units.Quantity`).
162 Assigning to this attribute modifies the existing array in place; the
163 bounding box and underlying data pointer are never changed.
164 """
165 return astropy.units.Quantity(self._array, self._unit, copy=False)
167 @quantity.setter
168 def quantity(self, value: astropy.units.Quantity) -> None:
169 self.quantity[...] = value
171 @property
172 def bbox(self) -> Box:
173 """Bounding box for the image (`Box`)."""
174 return self._bbox
176 @property
177 def unit(self) -> astropy.units.UnitBase | None:
178 """Units for the image's pixel values (`astropy.units.Unit` or
179 `None`).
180 """
181 return self._unit
183 @property
184 def projection(self) -> Projection[Any] | None:
185 """The projection that maps this image's pixel grid to the sky
186 (`Projection` | `None`).
188 Notes
189 -----
190 The pixel coordinates used by this projection account for the bounding
191 box ``start``; they are not just array indices.
192 """
193 return self._projection
195 @property
196 def obs_info(self) -> ObservationInfo | None:
197 """General information about the associated observation in standard
198 form. (`~astro_metadata_translator.ObservationInfo` | `None`).
199 """
200 return self._obs_info
202 def __getitem__(self, bbox: Box | EllipsisType) -> Image:
203 if bbox is ...:
204 return self
205 super().__getitem__(bbox)
206 indices = bbox.slice_within(self._bbox)
207 return self._transfer_metadata(
208 Image(
209 self._array[indices],
210 bbox=bbox,
211 unit=self._unit,
212 projection=self._projection,
213 obs_info=self._obs_info,
214 ),
215 bbox=bbox,
216 )
218 def __setitem__(self, bbox: Box | EllipsisType, value: Image) -> None:
219 self[bbox].quantity[...] = value.quantity
221 def __str__(self) -> str:
222 return f"Image({self.bbox!s}, {self.array.dtype.type.__name__})"
224 def __repr__(self) -> str:
225 return f"Image(..., bbox={self.bbox!r}, dtype={self.array.dtype!r})"
227 def __eq__(self, other: object) -> bool:
228 if not isinstance(other, Image):
229 return NotImplemented
230 return (
231 self._bbox == other._bbox
232 and self._unit == other._unit
233 and np.array_equal(self._array, other._array, equal_nan=True)
234 )
236 def copy(self) -> Image:
237 return self._transfer_metadata(
238 Image(
239 self._array.copy(),
240 bbox=self._bbox,
241 unit=self._unit,
242 projection=self._projection,
243 obs_info=self._obs_info,
244 ),
245 copy=True,
246 )
248 def view(
249 self,
250 *,
251 unit: astropy.units.UnitBase | None | EllipsisType = ...,
252 projection: Projection | None | EllipsisType = ...,
253 start: Sequence[int] | EllipsisType = ...,
254 obs_info: ObservationInfo | None | EllipsisType = ...,
255 ) -> Image:
256 """Make a view of the image, with optional updates."""
257 if unit is ...:
258 unit = self._unit
259 if projection is ...:
260 projection = self._projection
261 if start is ...:
262 start = self._bbox.start
263 if obs_info is ...:
264 obs_info = self._obs_info
265 return self._transfer_metadata(
266 Image(self._array, start=start, unit=unit, projection=projection, obs_info=obs_info)
267 )
269 def serialize[P: pydantic.BaseModel](
270 self,
271 archive: OutputArchive[P],
272 *,
273 update_header: Callable[[astropy.io.fits.Header], None] = no_header_updates,
274 save_projection: bool = True,
275 save_obs_info: bool = True,
276 add_offset_wcs: str | None = "A",
277 ) -> ImageSerializationModel[P]:
278 """Serialize the image to an output archive.
280 Parameters
281 ----------
282 archive
283 Archive to write to.
284 update_header
285 A callback that will be given the FITS header for the HDU
286 containing this image in order to add keys to it. This callback
287 may be provided but will not be called if the output format is not
288 FITS.
289 save_projection
290 If `True`, save the `Projection` attached to the image, if there
291 is one. This does not affect whether a FITS WCS corresponding to
292 the projection is written (it always is, if available, and if
293 ``add_offset_wcs`` is not ``" "``).
294 save_obs_info
295 If `True`, save the
296 `~astro_metadata_translator.ObservationInfo` attached to the
297 image, if there is one.
298 add_offset_wcs
299 A FITS WCS single-character suffix to use when adding a linear
300 WCS that maps the FITS array to the logical pixel coordinates
301 defined by ``bbox.start``. Set to `None` to not write this WCS.
302 If this is set to ``" "``, it will prevent the `Projection` from
303 being saved as a FITS WCS.
304 """
306 def _update_header(header: astropy.io.fits.Header) -> None:
307 update_header(header)
308 if self.unit is not None:
309 header["BUNIT"] = self.unit.to_string(format="fits")
310 if self.projection is not None and add_offset_wcs != " ":
311 if self.fits_wcs:
312 header.update(self.fits_wcs.to_header(relax=True))
313 if add_offset_wcs is not None:
314 fits.add_offset_wcs(header, x=self.bbox.x.start, y=self.bbox.y.start, key=add_offset_wcs)
316 array_model = archive.add_array(self.array, update_header=_update_header)
317 serialized_projection: ProjectionSerializationModel[P] | None = None
318 if save_projection and self.projection is not None:
319 serialized_projection = archive.serialize_direct("projection", self.projection.serialize)
320 data = array_model if self.unit is None else array_model.with_units(self.unit)
321 return ImageSerializationModel.model_construct(
322 data=data,
323 start=list(self.bbox.start),
324 projection=serialized_projection,
325 obs_info=self._obs_info if save_obs_info else None,
326 metadata=self.metadata,
327 )
329 @staticmethod
330 def deserialize(
331 model: ImageSerializationModel[Any],
332 archive: InputArchive[Any],
333 *,
334 bbox: Box | None = None,
335 strip_header: Callable[[astropy.io.fits.Header], None] = no_header_updates,
336 ) -> Image:
337 """Deserialize an image from an input archive.
339 Parameters
340 ----------
341 model
342 A Pydantic model representation of the image, holding references
343 to data stored in the archive.
344 archive
345 Archive to read from.
346 bbox
347 Bounding box of a subimage to read instead.
348 strip_header
349 A callable that strips out any FITS header cards added by the
350 ``update_header`` argument in the corresponding call to
351 `serialize`.
352 """
353 array_model: ArrayReferenceModel | InlineArrayModel
354 unit: astropy.units.UnitBase | None = None
355 if isinstance(model.data, ArrayReferenceQuantityModel | InlineArrayQuantityModel):
356 array_model = model.data.value
357 unit = model.data.unit
358 else:
359 array_model = model.data
361 def _strip_header(header: astropy.io.fits.Header) -> None:
362 if unit is not None:
363 header.pop("BUNIT", None)
364 fits.strip_wcs_cards(header)
365 strip_header(header)
367 slices = bbox.slice_within(model.bbox) if bbox is not None else ...
368 array = archive.get_array(array_model, strip_header=_strip_header, slices=slices)
369 projection = (
370 Projection.deserialize(model.projection, archive) if model.projection is not None else None
371 )
372 return Image(
373 array,
374 start=model.start if bbox is None else bbox.start,
375 unit=unit,
376 projection=projection,
377 obs_info=model.obs_info,
378 )._finish_deserialize(model)
380 @staticmethod
381 def _get_archive_tree_type[P: pydantic.BaseModel](
382 pointer_type: type[P],
383 ) -> type[ImageSerializationModel[P]]:
384 """Return the serialization model type for this object for an archive
385 type that uses the given pointer type.
386 """
387 return ImageSerializationModel[pointer_type] # type: ignore
389 _archive_default_name: ClassVar[str] = "image"
390 """The name this object should be serialized with when written as the
391 top-level object.
392 """
394 def write_fits(
395 self,
396 filename: str,
397 *,
398 compression: fits.FitsCompressionOptions | None = fits.FitsCompressionOptions.DEFAULT,
399 compression_seed: int | None = None,
400 ) -> None:
401 """Write the image to a FITS file.
403 Parameters
404 ----------
405 filename
406 Name of the file to write to. Must be a local file.
407 compression
408 Compression options.
409 compression_seed
410 A FITS tile compression seed to use whenever the configured
411 compression seed is `None` or (for backwards compatibility) ``0``.
412 """
413 compression_options = {}
414 if compression is not fits.FitsCompressionOptions.DEFAULT:
415 compression_options[self._archive_default_name] = compression
416 fits.write(self, filename, compression_options, compression_seed=compression_seed)
418 @staticmethod
419 def read_fits(url: ResourcePathExpression, *, bbox: Box | None = None) -> Image:
420 """Read an image from a FITS file.
422 Parameters
423 ----------
424 url
425 URL of the file to read; may be any type supported by
426 `lsst.resources.ResourcePath`.
427 bbox
428 Bounding box of a subimage to read instead.
429 """
430 return fits.read(Image, url, bbox=bbox).deserialized
432 @staticmethod
433 def from_legacy(legacy: Any, unit: astropy.units.UnitBase | None = None) -> Image:
434 """Convert from an `lsst.afw.image.Image` instance.
436 Parameters
437 ----------
438 legacy
439 An `lsst.afw.image.Image` instance that will share pixel data with
440 the returned object.
441 unit
442 Units of the image.
443 """
444 return Image(legacy.array, start=(legacy.getY0(), legacy.getX0()), unit=unit)
446 def to_legacy(self, *, copy: bool | None = None) -> Any:
447 """Convert to an `lsst.afw.image.Image` instance.
449 Parameters
450 ----------
451 copy
452 If `True`, always copy the pixel data. If `False`, return a view,
453 and raise `TypeError` if the pixel data is read-only (this is not
454 supported by afw). If `None`, onyl if the pixel data is
455 read-only.
456 """
457 import lsst.afw.image
458 import lsst.geom
460 array = self._array
461 if copy:
462 array = array.copy()
463 elif not self._array.flags.writeable:
464 if copy is None:
465 array = array.copy()
466 else:
467 raise TypeError("Cannot create a legacy lsst.afw.image.Image view into a read-only array.")
469 return lsst.afw.image.Image(
470 array,
471 deep=False,
472 dtype=array.dtype.type,
473 xy0=lsst.geom.Point2I(self._bbox.x.min, self._bbox.y.min),
474 )
476 @staticmethod
477 def read_legacy(
478 uri: ResourcePathExpression,
479 *,
480 preserve_quantization: bool = False,
481 ext: str | int = 1,
482 fits_wcs_frame: Frame | None = None,
483 ) -> Image:
484 """Read a FITS file written by `lsst.afw.image.Image.writeFits`.
486 Parameters
487 ----------
488 uri
489 URI or file name.
490 preserve_quantization
491 If `True`, ensure that writing the image back out again will
492 exactly preserve quantization-compressed pixel values. This causes
493 the arrays to be marked as read-only and stores the original binary
494 table data for those planes in memory. If the `Image` is copied,
495 the precompressed pixel values are not transferred to the copy.
496 ext
497 Name or index of the FITS HDU to read.
498 fits_wcs_frame
499 If not `None` and the HDU containing the image has a FITS WCS,
500 attach a `Projection` to the returned image by converting that
501 WCS.
502 """
503 opaque_metadata = fits.FitsOpaqueMetadata()
504 with ExitStack() as exit_stack:
505 fs, fspath = ResourcePath(uri).to_fsspec()
506 stream = exit_stack.enter_context(fs.open(fspath))
507 hdu_list = exit_stack.enter_context(astropy.io.fits.open(stream))
508 opaque_metadata.extract_legacy_primary_header(hdu_list[0].header)
509 bintable_hdu: astropy.io.fits.BinTableHDU | None = None
510 if preserve_quantization:
511 bintable_stream = exit_stack.enter_context(fs.open(fspath))
512 bintable_hdu_list = exit_stack.enter_context(
513 astropy.io.fits.open(bintable_stream, disable_image_compression=True)
514 )
515 bintable_hdu = bintable_hdu_list[ext]
516 result = Image._read_legacy_hdu(
517 hdu_list[ext], opaque_metadata, preserve_bintable=bintable_hdu, fits_wcs_frame=fits_wcs_frame
518 )
519 result._opaque_metadata = opaque_metadata
520 return result
522 @staticmethod
523 def _read_legacy_hdu(
524 hdu: astropy.io.fits.ImageHDU | astropy.io.fits.CompImageHDU,
525 opaque_metadata: fits.FitsOpaqueMetadata,
526 *,
527 preserve_bintable: astropy.io.fits.BinTableHDU | None,
528 fits_wcs_frame: Frame | None = None,
529 ) -> Image:
530 unit: astropy.units.UnitBase | None = None
531 if (fits_unit := hdu.header.pop("BUNIT", None)) is not None:
532 unit = astropy.units.Unit(fits_unit, format="fits")
533 dx: int = hdu.header.pop("LTV1")
534 dy: int = hdu.header.pop("LTV2")
535 start = YX(y=-dy, x=-dx)
536 read_only: bool = False
537 if preserve_bintable is not None:
538 opaque_metadata.precompressed[hdu.name] = fits.PrecompressedImage.from_bintable(preserve_bintable)
539 read_only = True
540 projection: Projection | None = None
541 if fits_wcs_frame is not None:
542 try:
543 fits_wcs = astropy.wcs.WCS(hdu.header)
544 except KeyError:
545 pass
546 else:
547 projection = Projection.from_fits_wcs(
548 fits_wcs, pixel_frame=fits_wcs_frame, x0=start.x, y0=start.y
549 )
550 image = Image(hdu.data, start=start, unit=unit, projection=projection)
551 if read_only:
552 image._array.flags["WRITEABLE"] = False
553 fits.strip_wcs_cards(hdu.header)
554 hdu.header.strip()
555 hdu.header.remove("EXTTYPE", ignore_missing=True)
556 hdu.header.remove("INHERIT", ignore_missing=True)
557 hdu.header.remove("UZSCALE", ignore_missing=True)
558 opaque_metadata.add_header(hdu.header)
559 return image
562class ImageSerializationModel[P: pydantic.BaseModel](ArchiveTree):
563 """Pydantic model used to represent the serialized form of an `.Image`."""
565 data: ArrayReferenceQuantityModel | ArrayReferenceModel | InlineArrayModel | InlineArrayQuantityModel = (
566 pydantic.Field(description="Reference to pixel data.")
567 )
568 start: list[int] = pydantic.Field(
569 description="Coordinate of the first pixels in the array, ordered (y, x)."
570 )
571 projection: ProjectionSerializationModel[P] | None = pydantic.Field(
572 default=None,
573 exclude_if=is_none,
574 description="Projection that maps the logical pixel grid onto the sky.",
575 )
576 obs_info: ObservationInfo | None = pydantic.Field(
577 default=None,
578 exclude_if=is_none,
579 description="Standardized description of image metadata",
580 )
582 @property
583 def bbox(self) -> Box:
584 """The bounding box of the image."""
585 match self.data:
586 case ArrayReferenceQuantityModel() | InlineArrayQuantityModel():
587 shape = self.data.value.shape
588 case ArrayReferenceModel() | InlineArrayModel():
589 shape = self.data.shape
590 return Box.from_shape(shape, self.start)