Coverage for python / lsst / images / _image.py: 25%
213 statements
« prev ^ index » next coverage.py v7.13.5, created at 2026-04-25 08:35 +0000
« prev ^ index » next coverage.py v7.13.5, created at 2026-04-25 08:35 +0000
1# This file is part of lsst-images.
2#
3# Developed for the LSST Data Management System.
4# This product includes software developed by the LSST Project
5# (https://www.lsst.org).
6# See the COPYRIGHT file at the top-level directory of this distribution
7# for details of code ownership.
8#
9# Use of this source code is governed by a 3-clause BSD-style
10# license that can be found in the LICENSE file.
12from __future__ import annotations
14__all__ = ("Image", "ImageSerializationModel")
16from collections.abc import Callable, Sequence
17from contextlib import ExitStack
18from types import EllipsisType
19from typing import Any, ClassVar, final
21import astropy.io.fits
22import astropy.units
23import astropy.wcs
24import numpy as np
25import numpy.typing as npt
26import pydantic
27from astro_metadata_translator import ObservationInfo
29from lsst.resources import ResourcePath, ResourcePathExpression
31from . import fits
32from ._generalized_image import GeneralizedImage
33from ._geom import YX, Box
34from ._transforms import Frame, Projection, ProjectionSerializationModel
35from .serialization import (
36 ArchiveTree,
37 ArrayReferenceModel,
38 ArrayReferenceQuantityModel,
39 InputArchive,
40 MetadataValue,
41 OutputArchive,
42 no_header_updates,
43)
44from .utils import is_none
47@final
48class Image(GeneralizedImage):
49 """A 2-d array that may be augmented with units and a nonzero origin.
51 Parameters
52 ----------
53 array_or_fill
54 Array or fill value for the image. If a fill value, ``bbox`` or
55 ``shape`` must be provided.
56 bbox
57 Bounding box for the image.
58 start
59 Logical coordinates of the first pixel in the array, ordered ``y``,
60 ``x`` (unless an `XY` instance is passed). Ignored if
61 ``bbox`` is provided. Defaults to zeros.
62 shape
63 Leading dimensions of the array, ordered ``y``, ``x`` (unless an `XY`
64 instance is passed). Only needed if ``array_or_fill`` is not an
65 array and ``bbox`` is not provided. Like the bbox, this does not
66 include the last dimension of the array.
67 dtype
68 Pixel data type override.
69 unit
70 Units for the image's pixel values.
71 projection
72 Projection that maps the pixel grid to the sky.
73 obs_info
74 General information about the associated observation in standardized
75 form.
76 metadata
77 Arbitrary flexible metadata to associate with the image.
79 Notes
80 -----
81 Indexing the `array` attribute of an `Image` does not take into account its
82 ``start`` offset, but accessing a subimage by indexing an `Image` with a
83 `Box` does, and the `bbox` of the subimage is set to match its location
84 within the original image.
86 Indexed assignment to a subimage requires consistency between the
87 coordinate systems and units of both operands, but it will automatically
88 select a subimage of the right-hand side and convert compatible units when
89 possible. In other words::
91 a[box] = b
93 is a shortcut for
95 a[box].quantity = b[box].quantity
97 An ellipsis (``...``) can be used instead of a `Box` to assign to the full
98 image.
99 """
101 def __init__(
102 self,
103 array_or_fill: np.ndarray | int | float = 0,
104 /,
105 *,
106 bbox: Box | None = None,
107 start: Sequence[int] | None = None,
108 shape: Sequence[int] | None = None,
109 dtype: npt.DTypeLike | None = None,
110 unit: astropy.units.UnitBase | None = None,
111 projection: Projection[Any] | None = None,
112 obs_info: ObservationInfo | None = None,
113 metadata: dict[str, MetadataValue] | None = None,
114 ):
115 super().__init__(metadata)
116 if isinstance(array_or_fill, np.ndarray):
117 if dtype is not None:
118 array = np.array(array_or_fill, dtype=dtype)
119 else:
120 array = array_or_fill
121 if bbox is None:
122 bbox = Box.from_shape(array.shape, start=start)
123 elif bbox.shape != array.shape:
124 raise ValueError(
125 f"Explicit bbox shape {bbox.shape} does not match array with shape {array.shape}."
126 )
127 if shape is not None and shape != array.shape:
128 raise ValueError(f"Explicit shape {shape} does not match array with shape {array.shape}.")
129 else:
130 if bbox is None:
131 if shape is None:
132 raise TypeError("No bbox, shape, or array provided.")
133 bbox = Box.from_shape(shape, start=start)
134 elif shape is not None and shape != bbox.shape:
135 raise ValueError(f"Explicit shape {shape} does not match bbox shape {bbox.shape}.")
136 array = np.full(bbox.shape, array_or_fill, dtype=dtype)
137 self._array: np.ndarray = array
138 self._bbox: Box = bbox
139 self._unit = unit
140 self._projection = projection
141 self._obs_info = obs_info
143 @property
144 def array(self) -> np.ndarray:
145 """The low-level array (`numpy.ndarray`).
147 Assigning to this attribute modifies the existing array in place; the
148 bounding box and underlying data pointer are never changed.
149 """
150 return self._array
152 @array.setter
153 def array(self, value: np.ndarray | int | float) -> None:
154 self._array[...] = value
156 @property
157 def quantity(self) -> astropy.units.Quantity:
158 """The low-level array with units (`astropy.units.Quantity`).
160 Assigning to this attribute modifies the existing array in place; the
161 bounding box and underlying data pointer are never changed.
162 """
163 return astropy.units.Quantity(self._array, self._unit, copy=False)
165 @quantity.setter
166 def quantity(self, value: astropy.units.Quantity) -> None:
167 self.quantity[...] = value
169 @property
170 def bbox(self) -> Box:
171 """Bounding box for the image (`Box`)."""
172 return self._bbox
174 @property
175 def unit(self) -> astropy.units.UnitBase | None:
176 """Units for the image's pixel values (`astropy.units.Unit` or
177 `None`).
178 """
179 return self._unit
181 @property
182 def projection(self) -> Projection[Any] | None:
183 """The projection that maps this image's pixel grid to the sky
184 (`Projection` | `None`).
186 Notes
187 -----
188 The pixel coordinates used by this projection account for the bounding
189 box ``start``; they are not just array indices.
190 """
191 return self._projection
193 @property
194 def obs_info(self) -> ObservationInfo | None:
195 """General information about the associated observation in standard
196 form. (`~astro_metadata_translator.ObservationInfo` | `None`).
197 """
198 return self._obs_info
200 def __getitem__(self, bbox: Box | EllipsisType) -> Image:
201 if bbox is ...:
202 return self
203 super().__getitem__(bbox)
204 indices = bbox.slice_within(self._bbox)
205 return self._transfer_metadata(
206 Image(
207 self._array[indices],
208 bbox=bbox,
209 unit=self._unit,
210 projection=self._projection,
211 obs_info=self._obs_info,
212 ),
213 bbox=bbox,
214 )
216 def __setitem__(self, bbox: Box | EllipsisType, value: Image) -> None:
217 self[bbox].quantity[...] = value.quantity
219 def __str__(self) -> str:
220 return f"Image({self.bbox!s}, {self.array.dtype.type.__name__})"
222 def __repr__(self) -> str:
223 return f"Image(..., bbox={self.bbox!r}, dtype={self.array.dtype!r})"
225 def __eq__(self, other: object) -> bool:
226 if not isinstance(other, Image):
227 return NotImplemented
228 return (
229 self._bbox == other._bbox
230 and self._unit == other._unit
231 and np.array_equal(self._array, other._array, equal_nan=True)
232 )
234 def copy(self) -> Image:
235 return self._transfer_metadata(
236 Image(
237 self._array.copy(),
238 bbox=self._bbox,
239 unit=self._unit,
240 projection=self._projection,
241 obs_info=self._obs_info,
242 ),
243 copy=True,
244 )
246 def view(
247 self,
248 *,
249 unit: astropy.units.UnitBase | None | EllipsisType = ...,
250 projection: Projection | None | EllipsisType = ...,
251 start: Sequence[int] | EllipsisType = ...,
252 obs_info: ObservationInfo | None | EllipsisType = ...,
253 ) -> Image:
254 """Make a view of the image, with optional updates."""
255 if unit is ...:
256 unit = self._unit
257 if projection is ...:
258 projection = self._projection
259 if start is ...:
260 start = self._bbox.start
261 if obs_info is ...:
262 obs_info = self._obs_info
263 return self._transfer_metadata(
264 Image(self._array, start=start, unit=unit, projection=projection, obs_info=obs_info)
265 )
267 def serialize[P: pydantic.BaseModel](
268 self,
269 archive: OutputArchive[P],
270 *,
271 update_header: Callable[[astropy.io.fits.Header], None] = no_header_updates,
272 save_projection: bool = True,
273 save_obs_info: bool = True,
274 add_offset_wcs: str | None = "A",
275 ) -> ImageSerializationModel[P]:
276 """Serialize the image to an output archive.
278 Parameters
279 ----------
280 archive
281 Archive to write to.
282 update_header
283 A callback that will be given the FITS header for the HDU
284 containing this image in order to add keys to it. This callback
285 may be provided but will not be called if the output format is not
286 FITS.
287 save_projection
288 If `True`, save the `Projection` attached to the image, if there
289 is one. This does not affect whether a FITS WCS corresponding to
290 the projection is written (it always is, if available, and if
291 ``add_offset_wcs`` is not ``" "``).
292 save_obs_info
293 If `True`, save the
294 `~astro_metadata_translator.ObservationInfo` attached to the
295 image, if there is one.
296 add_offset_wcs
297 A FITS WCS single-character suffix to use when adding a linear
298 WCS that maps the FITS array to the logical pixel coordinates
299 defined by ``bbox.start``. Set to `None` to not write this WCS.
300 If this is set to ``" "``, it will prevent the `Projection` from
301 being saved as a FITS WCS.
302 """
304 def _update_header(header: astropy.io.fits.Header) -> None:
305 update_header(header)
306 if self.unit is not None:
307 header["BUNIT"] = self.unit.to_string(format="fits")
308 if self.projection is not None and add_offset_wcs != " ":
309 if self.fits_wcs:
310 header.update(self.fits_wcs.to_header(relax=True))
311 if add_offset_wcs is not None:
312 fits.add_offset_wcs(header, x=self.bbox.x.start, y=self.bbox.y.start, key=add_offset_wcs)
314 ref = archive.add_array(self.array, update_header=_update_header)
315 serialized_projection: ProjectionSerializationModel[P] | None = None
316 if save_projection and self.projection is not None:
317 serialized_projection = archive.serialize_direct("projection", self.projection.serialize)
318 data = (
319 ref
320 if self.unit is None
321 else ArrayReferenceQuantityModel.model_construct(value=ref, unit=self.unit)
322 )
323 return ImageSerializationModel.model_construct(
324 data=data,
325 start=list(self.bbox.start),
326 projection=serialized_projection,
327 obs_info=self._obs_info if save_obs_info else None,
328 metadata=self.metadata,
329 )
331 @staticmethod
332 def deserialize(
333 model: ImageSerializationModel[Any],
334 archive: InputArchive[Any],
335 *,
336 bbox: Box | None = None,
337 strip_header: Callable[[astropy.io.fits.Header], None] = no_header_updates,
338 ) -> Image:
339 """Deserialize an image from an input archive.
341 Parameters
342 ----------
343 model
344 A Pydantic model representation of the image, holding references
345 to data stored in the archive.
346 archive
347 Archive to read from.
348 bbox
349 Bounding box of a subimage to read instead.
350 strip_header
351 A callable that strips out any FITS header cards added by the
352 ``update_header`` argument in the corresponding call to
353 `serialize`.
354 """
355 ref: ArrayReferenceModel
356 unit: astropy.units.UnitBase | None = None
357 if isinstance(model.data, ArrayReferenceQuantityModel):
358 ref = model.data.value
359 unit = model.data.unit
360 else:
361 ref = model.data
363 def _strip_header(header: astropy.io.fits.Header) -> None:
364 if unit is not None:
365 header.pop("BUNIT", None)
366 fits.strip_wcs_cards(header)
367 strip_header(header)
369 slices = bbox.slice_within(model.bbox) if bbox is not None else ...
370 array = archive.get_array(ref, strip_header=_strip_header, slices=slices)
371 projection = (
372 Projection.deserialize(model.projection, archive) if model.projection is not None else None
373 )
374 return Image(
375 array,
376 start=model.start if bbox is None else bbox.start,
377 unit=unit,
378 projection=projection,
379 obs_info=model.obs_info,
380 )._finish_deserialize(model)
382 @staticmethod
383 def _get_archive_tree_type[P: pydantic.BaseModel](
384 pointer_type: type[P],
385 ) -> type[ImageSerializationModel[P]]:
386 """Return the serialization model type for this object for an archive
387 type that uses the given pointer type.
388 """
389 return ImageSerializationModel[pointer_type] # type: ignore
391 _archive_default_name: ClassVar[str] = "image"
392 """The name this object should be serialized with when written as the
393 top-level object.
394 """
396 def write_fits(
397 self,
398 filename: str,
399 *,
400 compression: fits.FitsCompressionOptions | None = fits.FitsCompressionOptions.DEFAULT,
401 compression_seed: int | None = None,
402 ) -> None:
403 """Write the image to a FITS file.
405 Parameters
406 ----------
407 filename
408 Name of the file to write to. Must be a local file.
409 compression
410 Compression options.
411 compression_seed
412 A FITS tile compression seed to use whenever the configured
413 compression seed is `None` or (for backwards compatibility) ``0``.
414 """
415 compression_options = {}
416 if compression is not fits.FitsCompressionOptions.DEFAULT:
417 compression_options[self._archive_default_name] = compression
418 fits.write(self, filename, compression_options, compression_seed=compression_seed)
420 @staticmethod
421 def read_fits(url: ResourcePathExpression, *, bbox: Box | None = None) -> Image:
422 """Read an image from a FITS file.
424 Parameters
425 ----------
426 url
427 URL of the file to read; may be any type supported by
428 `lsst.resources.ResourcePath`.
429 bbox
430 Bounding box of a subimage to read instead.
431 """
432 return fits.read(Image, url, bbox=bbox).deserialized
434 @staticmethod
435 def from_legacy(legacy: Any, unit: astropy.units.UnitBase | None = None) -> Image:
436 """Convert from an `lsst.afw.image.Image` instance.
438 Parameters
439 ----------
440 legacy
441 An `lsst.afw.image.Image` instance that will share pixel data with
442 the returned object.
443 unit
444 Units of the image.
445 """
446 return Image(legacy.array, start=(legacy.getY0(), legacy.getX0()), unit=unit)
448 def to_legacy(self, *, copy: bool | None = None) -> Any:
449 """Convert to an `lsst.afw.image.Image` instance.
451 Parameters
452 ----------
453 copy
454 If `True`, always copy the pixel data. If `False`, return a view,
455 and raise `TypeError` if the pixel data is read-only (this is not
456 supported by afw). If `None`, onyl if the pixel data is
457 read-only.
458 """
459 import lsst.afw.image
460 import lsst.geom
462 array = self._array
463 if copy:
464 array = array.copy()
465 elif not self._array.flags.writeable:
466 if copy is None:
467 array = array.copy()
468 else:
469 raise TypeError("Cannot create a legacy lsst.afw.image.Image view into a read-only array.")
471 return lsst.afw.image.Image(
472 array,
473 deep=False,
474 dtype=array.dtype.type,
475 xy0=lsst.geom.Point2I(self._bbox.x.min, self._bbox.y.min),
476 )
478 @staticmethod
479 def read_legacy(
480 uri: ResourcePathExpression,
481 *,
482 preserve_quantization: bool = False,
483 ext: str | int = 1,
484 fits_wcs_frame: Frame | None = None,
485 ) -> Image:
486 """Read a FITS file written by `lsst.afw.image.Image.writeFits`.
488 Parameters
489 ----------
490 uri
491 URI or file name.
492 preserve_quantization
493 If `True`, ensure that writing the image back out again will
494 exactly preserve quantization-compressed pixel values. This causes
495 the arrays to be marked as read-only and stores the original binary
496 table data for those planes in memory. If the `Image` is copied,
497 the precompressed pixel values are not transferred to the copy.
498 ext
499 Name or index of the FITS HDU to read.
500 fits_wcs_frame
501 If not `None` and the HDU containing the image has a FITS WCS,
502 attach a `Projection` to the returned image by converting that
503 WCS.
504 """
505 opaque_metadata = fits.FitsOpaqueMetadata()
506 with ExitStack() as exit_stack:
507 fs, fspath = ResourcePath(uri).to_fsspec()
508 stream = exit_stack.enter_context(fs.open(fspath))
509 hdu_list = exit_stack.enter_context(astropy.io.fits.open(stream))
510 opaque_metadata.extract_legacy_primary_header(hdu_list[0].header)
511 bintable_hdu: astropy.io.fits.BinTableHDU | None = None
512 if preserve_quantization:
513 bintable_stream = exit_stack.enter_context(fs.open(fspath))
514 bintable_hdu_list = exit_stack.enter_context(
515 astropy.io.fits.open(bintable_stream, disable_image_compression=True)
516 )
517 bintable_hdu = bintable_hdu_list[ext]
518 result = Image._read_legacy_hdu(
519 hdu_list[ext], opaque_metadata, preserve_bintable=bintable_hdu, fits_wcs_frame=fits_wcs_frame
520 )
521 result._opaque_metadata = opaque_metadata
522 return result
524 @staticmethod
525 def _read_legacy_hdu(
526 hdu: astropy.io.fits.ImageHDU | astropy.io.fits.CompImageHDU,
527 opaque_metadata: fits.FitsOpaqueMetadata,
528 *,
529 preserve_bintable: astropy.io.fits.BinTableHDU | None,
530 fits_wcs_frame: Frame | None = None,
531 ) -> Image:
532 unit: astropy.units.UnitBase | None = None
533 if (fits_unit := hdu.header.pop("BUNIT", None)) is not None:
534 unit = astropy.units.Unit(fits_unit, format="fits")
535 dx: int = hdu.header.pop("LTV1")
536 dy: int = hdu.header.pop("LTV2")
537 start = YX(y=-dy, x=-dx)
538 read_only: bool = False
539 if preserve_bintable is not None:
540 opaque_metadata.precompressed[hdu.name] = fits.PrecompressedImage.from_bintable(preserve_bintable)
541 read_only = True
542 projection: Projection | None = None
543 if fits_wcs_frame is not None:
544 try:
545 fits_wcs = astropy.wcs.WCS(hdu.header)
546 except KeyError:
547 pass
548 else:
549 projection = Projection.from_fits_wcs(
550 fits_wcs, pixel_frame=fits_wcs_frame, x0=start.x, y0=start.y
551 )
552 image = Image(hdu.data, start=start, unit=unit, projection=projection)
553 if read_only:
554 image._array.flags["WRITEABLE"] = False
555 fits.strip_wcs_cards(hdu.header)
556 hdu.header.strip()
557 hdu.header.remove("EXTTYPE", ignore_missing=True)
558 hdu.header.remove("INHERIT", ignore_missing=True)
559 hdu.header.remove("UZSCALE", ignore_missing=True)
560 opaque_metadata.add_header(hdu.header)
561 return image
564class ImageSerializationModel[P: pydantic.BaseModel](ArchiveTree):
565 """Pydantic model used to represent the serialized form of an `.Image`."""
567 data: ArrayReferenceQuantityModel | ArrayReferenceModel = pydantic.Field(
568 description="Reference to pixel data."
569 )
570 start: list[int] = pydantic.Field(
571 description="Coordinate of the first pixels in the array, ordered (y, x)."
572 )
573 projection: ProjectionSerializationModel[P] | None = pydantic.Field(
574 default=None,
575 exclude_if=is_none,
576 description="Projection that maps the logical pixel grid onto the sky.",
577 )
578 obs_info: ObservationInfo | None = pydantic.Field(
579 default=None,
580 exclude_if=is_none,
581 description="Standardized description of image metadata",
582 )
584 @property
585 def bbox(self) -> Box:
586 """The bounding box of the image."""
587 if isinstance(self.data, ArrayReferenceQuantityModel):
588 shape = self.data.value.shape
589 else:
590 shape = self.data.shape
591 return Box.from_shape(shape, self.start)