Coverage for tests / test_visit_image.py: 15%
212 statements
« prev ^ index » next coverage.py v7.13.5, created at 2026-04-15 00:04 +0000
« prev ^ index » next coverage.py v7.13.5, created at 2026-04-15 00:04 +0000
1# This file is part of lsst-images.
2#
3# Developed for the LSST Data Management System.
4# This product includes software developed by the LSST Project
5# (https://www.lsst.org).
6# See the COPYRIGHT file at the top-level directory of this distribution
7# for details of code ownership.
8#
9# Use of this source code is governed by a 3-clause BSD-style
10# license that can be found in the LICENSE file.
12from __future__ import annotations
14import os
15import unittest
16import warnings
17from typing import Any
19import astropy.io.fits
20import astropy.units as u
21import astropy.wcs
22import numpy as np
23from astro_metadata_translator import ObservationInfo
25from lsst.images import (
26 Box,
27 DetectorFrame,
28 Image,
29 MaskPlane,
30 MaskSchema,
31 ProjectionAstropyView,
32 TractFrame,
33 VisitImage,
34 get_legacy_visit_image_mask_planes,
35)
36from lsst.images.fits import ExtensionKey, FitsOpaqueMetadata
37from lsst.images.psfs import GaussianPointSpreadFunction, PointSpreadFunction
38from lsst.images.tests import (
39 DP2_VISIT_DETECTOR_DATA_ID,
40 RoundtripFits,
41 TemporaryButler,
42 assert_masked_images_equal,
43 assert_projections_equal,
44 compare_visit_image_to_legacy,
45 make_random_projection,
46)
48DATA_DIR = os.environ.get("TESTDATA_IMAGES_DIR", None)
51class VisitImageTestCase(unittest.TestCase):
52 """Basic Tests for VisitImage."""
54 @classmethod
55 def setUpClass(cls) -> None:
56 cls.rng = np.random.default_rng(500)
57 det_frame = DetectorFrame(instrument="Inst", visit=1234, detector=1, bbox=Box.factory[1:4096, 1:4096])
58 cls.projection = make_random_projection(cls.rng, det_frame, Box.factory[1:4096, 1:4096])
59 cls.mask_schema = MaskSchema([MaskPlane("M1", "D1")])
60 cls.obs_info = ObservationInfo(instrument="LSSTCam", detector_num=4)
61 cls.gaussian_psf = GaussianPointSpreadFunction(2.5, stamp_size=33, bounds=Box.factory[-10:10, -12:13])
63 opaque = FitsOpaqueMetadata()
64 hdr = astropy.io.fits.Header()
65 with warnings.catch_warnings():
66 # Silence warnings about long keys becoming HIERARCH.
67 warnings.simplefilter("ignore", category=astropy.io.fits.verify.VerifyWarning)
68 hdr.update({"PLATFORM": "lsstcam", "LSST BUTLER ID": "123456789"})
69 opaque.extract_legacy_primary_header(hdr)
71 cls.image = Image(42, shape=(1024, 1024), unit=u.nJy)
72 cls.variance = Image(5.0, shape=(1024, 1024), unit=u.nJy * u.nJy)
73 # API signature suggests projection and obs_info can be None but they
74 # are required.
75 cls.visit_image = VisitImage(
76 cls.image,
77 variance=cls.variance,
78 psf=GaussianPointSpreadFunction(2.5, stamp_size=33, bounds=Box.factory[-10:10, -12:13]),
79 mask_schema=cls.mask_schema,
80 projection=cls.projection,
81 obs_info=cls.obs_info,
82 )
83 cls.visit_image._opaque_metadata = opaque
84 cls.simplest_visit_image = VisitImage(
85 cls.image,
86 psf=GaussianPointSpreadFunction(2.5, stamp_size=33, bounds=Box.factory[-10:10, -12:13]),
87 mask_schema=cls.mask_schema,
88 projection=cls.projection,
89 obs_info=cls.obs_info,
90 )
92 def test_basics(self) -> None:
93 """Test basic constructor patterns."""
94 # Test default fill of variance.
95 visit = self.simplest_visit_image
96 self.assertEqual(visit.variance.array[0, 0], 1.0)
97 self.assertIs(visit[...], visit)
98 self.assertEqual(str(visit), "VisitImage(Image([y=0:1024, x=0:1024], int64), ['M1'])")
99 self.assertEqual(
100 repr(visit),
101 "VisitImage(Image(..., bbox=Box(y=Interval(start=0, stop=1024), x=Interval(start=0, stop=1024)),"
102 " dtype=dtype('int64')), mask_schema=MaskSchema([MaskPlane(name='M1', description='D1')],"
103 " dtype=dtype('uint8')))",
104 )
106 astropy_wcs = visit.astropy_wcs
107 self.assertIsInstance(astropy_wcs, ProjectionAstropyView)
108 approx_wcs = visit.fits_wcs
109 self.assertIsInstance(approx_wcs, astropy.wcs.WCS)
111 # Check that it is a deep copy.
112 copy = visit.copy()
113 copy.image.array[0, 0] = 30.0
114 self.assertEqual(visit.image.array[0, 0], 42.0)
115 self.assertEqual(copy.image.array[0, 0], 30.0)
117 with self.assertRaises(TypeError):
118 # Requires a PSF.
119 VisitImage(
120 self.image,
121 mask_schema=self.mask_schema,
122 projection=self.projection,
123 obs_info=self.obs_info,
124 )
126 with self.assertRaises(TypeError):
127 # Requires ObservationInfo.
128 VisitImage(
129 self.image,
130 psf=self.gaussian_psf,
131 mask_schema=self.mask_schema,
132 projection=self.projection,
133 )
135 with self.assertRaises(TypeError):
136 # Requires a projection.
137 VisitImage(
138 self.image,
139 psf=self.gaussian_psf,
140 mask_schema=self.mask_schema,
141 obs_info=self.obs_info,
142 )
144 with self.assertRaises(TypeError):
145 # Requires some form of mask.
146 VisitImage(
147 self.image,
148 psf=self.gaussian_psf,
149 projection=self.projection,
150 obs_info=self.obs_info,
151 )
153 with self.assertRaises(TypeError):
154 VisitImage(
155 Image(42, shape=(5, 5)),
156 psf=self.gaussian_psf,
157 mask_schema=self.mask_schema,
158 projection=self.projection,
159 obs_info=self.obs_info,
160 )
162 # Requires a DetectorFrame.
163 tract_frame = TractFrame(skymap="Skymap", tract=1, bbox=Box.factory[1:10, 1:10])
164 tract_proj = make_random_projection(self.rng, tract_frame, Box.factory[1:4096, 1:4096])
165 with self.assertRaises(TypeError):
166 VisitImage(
167 self.image,
168 projection=tract_proj,
169 psf=self.gaussian_psf,
170 mask_schema=self.mask_schema,
171 obs_info=self.obs_info,
172 )
174 # Variance unit mismatch.
175 with self.assertRaises(ValueError):
176 VisitImage(
177 self.image,
178 variance=self.image,
179 psf=self.gaussian_psf,
180 mask_schema=self.mask_schema,
181 projection=self.projection,
182 obs_info=self.obs_info,
183 )
185 def test_obs_info(self) -> None:
186 """Check that ObservationInfo has been constructured."""
187 visit = self.visit_image
188 self.assertIsNotNone(visit.obs_info)
189 self.maxDiff = None
190 assert visit.obs_info is not None # for mypy.
191 self.assertEqual(visit.obs_info.instrument, "LSSTCam")
193 def test_read_write(self) -> None:
194 """Test that a visit can round trip through a FITS file."""
195 with RoundtripFits(self, self.visit_image, "VisitImage") as roundtrip:
196 # Check that we're still using the right compression, and that we
197 # wrote WCSs.
198 fits = roundtrip.inspect()
199 self.assertEqual(fits[1].header["ZCMPTYPE"], "GZIP_2")
200 self.assertEqual(fits[1].header["CTYPE1"], "RA---TAN")
201 self.assertEqual(fits[2].header["ZCMPTYPE"], "GZIP_2")
202 self.assertEqual(fits[2].header["CTYPE1"], "RA---TAN")
203 self.assertEqual(fits[3].header["ZCMPTYPE"], "GZIP_2")
204 self.assertEqual(fits[3].header["CTYPE1"], "RA---TAN")
205 # Check a subimage read.
206 subbox = Box.factory[8:13, 9:30]
207 subimage = roundtrip.get(bbox=subbox)
208 assert_masked_images_equal(self, subimage, self.visit_image[subbox], expect_view=False)
209 with self.subTest():
210 self.assertEqual(roundtrip.get("bbox"), self.visit_image.bbox)
211 with self.subTest():
212 obs_info = roundtrip.get("obs_info")
213 self.assertIsInstance(obs_info, ObservationInfo)
214 self.assertEqual(obs_info, self.visit_image.obs_info)
216 assert_masked_images_equal(self, roundtrip.result, self.visit_image, expect_view=False)
217 # Check that the round-tripped headers are the same (up to card order).
218 self.assertEqual(len(roundtrip.result._opaque_metadata.headers[ExtensionKey()]), 1)
219 self.assertEqual(
220 dict(self.visit_image._opaque_metadata.headers[ExtensionKey()]),
221 dict(roundtrip.result._opaque_metadata.headers[ExtensionKey()]),
222 )
223 self.assertFalse(roundtrip.result._opaque_metadata.headers[ExtensionKey("IMAGE")])
224 self.assertFalse(roundtrip.result._opaque_metadata.headers[ExtensionKey("MASK")])
225 self.assertFalse(roundtrip.result._opaque_metadata.headers[ExtensionKey("VARIANCE")])
226 self.assertEqual(roundtrip.result.obs_info, self.visit_image.obs_info)
229@unittest.skipUnless(DATA_DIR is not None, "TESTDATA_IMAGES_DIR is not in the environment.")
230class VisitImageLegacyTestCase(unittest.TestCase):
231 """Tests for the VisitImage class and the basics of the archive system.
233 Requires legacy code.
234 """
236 @classmethod
237 def setUpClass(cls) -> None:
238 assert DATA_DIR is not None, "Guaranteed by decorator."
239 cls.filename = os.path.join(DATA_DIR, "dp2", "legacy", "visit_image.fits")
240 cls.plane_map = plane_map = get_legacy_visit_image_mask_planes()
241 cls.visit_image = VisitImage.read_legacy(
242 cls.filename, preserve_quantization=True, plane_map=plane_map
243 )
244 cls.legacy_exposure: Any = None
245 try:
246 from lsst.afw.image import ExposureFitsReader
248 cls.legacy_exposure = ExposureFitsReader(cls.filename).read()
249 except ImportError:
250 pass
252 def test_legacy_errors(self) -> None:
253 """Legacy read failure modes."""
254 with self.assertRaises(ValueError):
255 VisitImage.from_legacy(self.legacy_exposure, instrument="HSC")
256 with self.assertRaises(ValueError):
257 VisitImage.from_legacy(self.legacy_exposure, visit=123456)
258 with self.assertRaises(ValueError):
259 VisitImage.from_legacy(self.legacy_exposure, unit=u.mJy)
260 visit = VisitImage.from_legacy(
261 self.legacy_exposure, instrument="LSSTCam", unit=u.nJy, visit=2025052000177
262 )
263 self.assertEqual(visit.unit, u.nJy)
265 with self.assertRaises(ValueError):
266 VisitImage.read_legacy(self.filename, instrument="HSC")
267 with self.assertRaises(ValueError):
268 VisitImage.read_legacy(self.filename, visit=123456)
270 def test_component_reads(self) -> None:
271 """Test reads of components from legacy file."""
272 visit = VisitImage.read_legacy(self.filename)
273 proj = VisitImage.read_legacy(self.filename, component="projection")
274 assert_projections_equal(self, proj, visit.projection, expect_identity=False)
275 image = VisitImage.read_legacy(self.filename, component="image")
276 self.assertEqual(image, visit.image)
277 variance = VisitImage.read_legacy(self.filename, component="variance")
278 self.assertEqual(variance, visit.variance)
279 mask = VisitImage.read_legacy(self.filename, component="mask")
280 self.assertEqual(mask, visit.mask)
281 psf = VisitImage.read_legacy(self.filename, component="psf")
282 self.assertIsInstance(psf, PointSpreadFunction)
283 obs_info = VisitImage.read_legacy(self.filename, component="obs_info")
284 self.assertIsInstance(obs_info, ObservationInfo)
286 def test_obs_info(self) -> None:
287 """Check that ObservationInfo has been constructed."""
288 legacy = VisitImage.from_legacy(self.legacy_exposure, plane_map=self.plane_map)
289 self.assertIsNotNone(legacy.obs_info)
290 self.maxDiff = None
291 self.assertEqual(legacy.obs_info, self.visit_image.obs_info)
292 assert legacy.obs_info is not None # for mypy.
293 self.assertEqual(legacy.obs_info.instrument, "LSSTCam")
295 def test_read_legacy_headers(self) -> None:
296 """Test that headers were correctly stripped and interpreted in
297 `VisitImage.read_legacy`.
298 """
299 # Check that we read the units from BUNIT.
300 self.assertEqual(self.visit_image.unit, astropy.units.nJy)
301 # Check that the primary header has the keys we want, and none of the
302 # keys we don't want.
303 header = self.visit_image._opaque_metadata.headers[ExtensionKey()]
304 self.assertIn("EXPTIME", header)
305 self.assertEqual(header["PLATFORM"], "lsstcam")
306 self.assertNotIn("LSST BUTLER ID", header)
307 self.assertNotIn("AR HDU", header)
308 self.assertNotIn("A_ORDER", header)
309 # Check that the extension HDUs do not have any custom headers.
310 self.assertFalse(self.visit_image._opaque_metadata.headers[ExtensionKey("IMAGE")])
311 self.assertFalse(self.visit_image._opaque_metadata.headers[ExtensionKey("MASK")])
312 self.assertFalse(self.visit_image._opaque_metadata.headers[ExtensionKey("VARIANCE")])
314 def test_from_legacy_headers(self) -> None:
315 """Test that from_legacy handles headers properly."""
316 legacy = VisitImage.from_legacy(self.legacy_exposure, plane_map=self.plane_map)
317 header = legacy._opaque_metadata.headers[ExtensionKey()]
318 self.assertIn("EXPTIME", header)
319 self.assertEqual(header["PLATFORM"], "lsstcam")
320 self.assertNotIn("LSST BUTLER ID", header)
321 self.assertNotIn("AR HDU", header)
322 self.assertNotIn("A_ORDER", header)
323 # Check that the extension HDUs do not have any custom headers.
324 self.assertFalse(self.visit_image._opaque_metadata.headers[ExtensionKey("IMAGE")])
325 self.assertFalse(self.visit_image._opaque_metadata.headers[ExtensionKey("MASK")])
326 self.assertFalse(self.visit_image._opaque_metadata.headers[ExtensionKey("VARIANCE")])
328 def test_rewrite(self) -> None:
329 """Test that we can rewrite the visit image and preserve both
330 lossy-compressed pixel values and components exactly.
331 """
332 with RoundtripFits(self, self.visit_image, "VisitImage") as roundtrip:
333 # Check that we're still using the right compression, and that we
334 # wrote WCSs.
335 fits = roundtrip.inspect()
336 self.assertEqual(fits[1].header["ZCMPTYPE"], "RICE_1")
337 self.assertEqual(fits[1].header["CTYPE1"], "RA---TAN-SIP")
338 self.assertEqual(fits[2].header["ZCMPTYPE"], "GZIP_2")
339 self.assertEqual(fits[2].header["CTYPE1"], "RA---TAN-SIP")
340 self.assertEqual(fits[3].header["ZCMPTYPE"], "RICE_1")
341 self.assertEqual(fits[3].header["CTYPE1"], "RA---TAN-SIP")
342 # Check a subimage read.
343 subbox = Box.factory[8:13, 9:30]
344 subimage = roundtrip.get(bbox=subbox)
345 assert_masked_images_equal(self, subimage, self.visit_image[subbox], expect_view=False)
346 alternates: dict[str, Any] = {}
347 with self.subTest():
348 self.assertEqual(roundtrip.get("bbox"), self.visit_image.bbox)
349 alternates = {
350 k: roundtrip.get(k)
351 for k in ["projection", "image", "mask", "variance", "psf", "obs_info"]
352 }
353 # Try to do a butler get of a component with storage class
354 # override.
355 with self.subTest():
356 if self.legacy_exposure is not None:
357 import lsst.afw.image
359 # We have VisitInfo available.
360 visit_info = roundtrip.get("obs_info", storageClass="VisitInfo")
361 self.assertIsInstance(visit_info, lsst.afw.image.VisitInfo)
362 self.assertEqual(visit_info.getInstrumentLabel(), "LSSTCam")
363 else:
364 raise unittest.SkipTest("Can not test VisitInfo conversion without afw")
366 assert_masked_images_equal(self, roundtrip.result, self.visit_image, expect_view=False)
367 # Check that the round-tripped headers are the same (up to card order).
368 self.assertEqual(
369 dict(self.visit_image._opaque_metadata.headers[ExtensionKey()]),
370 dict(roundtrip.result._opaque_metadata.headers[ExtensionKey()]),
371 )
372 self.assertFalse(roundtrip.result._opaque_metadata.headers[ExtensionKey("IMAGE")])
373 self.assertFalse(roundtrip.result._opaque_metadata.headers[ExtensionKey("MASK")])
374 self.assertFalse(roundtrip.result._opaque_metadata.headers[ExtensionKey("VARIANCE")])
375 self.assertEqual(roundtrip.result._opaque_metadata.headers[ExtensionKey()]["PLATFORM"], "lsstcam")
376 with self.subTest():
377 if self.legacy_exposure is None:
378 raise unittest.SkipTest("'lsst.afw.image' could not be imported.") from None
379 compare_visit_image_to_legacy(
380 self,
381 roundtrip.result,
382 self.legacy_exposure,
383 expect_view=False,
384 plane_map=self.plane_map,
385 **DP2_VISIT_DETECTOR_DATA_ID,
386 alternates=alternates,
387 )
388 # Check converting from the legacy object in-memory.
389 compare_visit_image_to_legacy(
390 self,
391 VisitImage.from_legacy(self.legacy_exposure, plane_map=self.plane_map),
392 self.legacy_exposure,
393 expect_view=True,
394 plane_map=self.plane_map,
395 **DP2_VISIT_DETECTOR_DATA_ID,
396 )
398 def test_butler_converters(self) -> None:
399 """Test that we can read a VisitImage and its components from a butler
400 dataset written as an `lsst.afw.image.Exposure`.
401 """
402 if self.legacy_exposure is None:
403 raise unittest.SkipTest("lsst.afw.image.afw could not be imported.")
404 with TemporaryButler(legacy="ExposureF") as helper:
405 from lsst.daf.butler import FileDataset
407 helper.butler.ingest(FileDataset(path=self.filename, refs=[helper.legacy]), transfer="symlink")
408 visit_image_ref = helper.legacy.overrideStorageClass("VisitImage")
409 visit_image = helper.butler.get(visit_image_ref)
410 bbox = helper.butler.get(visit_image_ref.makeComponentRef("bbox"))
411 self.assertEqual(bbox, visit_image.bbox)
412 alternates = {
413 k: helper.butler.get(visit_image_ref.makeComponentRef(k))
414 # TODO: including "projection" or "obs_info" here fails because
415 # there's code in daf_butler that expects any component to be
416 # valid for the *internal* storage class, not the requested
417 # one, and that's difficult to fix because it's tied up with
418 # the data ID standardization logic.
419 for k in ["image", "mask", "variance", "psf"]
420 }
421 compare_visit_image_to_legacy(
422 self,
423 visit_image,
424 self.legacy_exposure,
425 expect_view=False,
426 plane_map=self.plane_map,
427 alternates=alternates,
428 **DP2_VISIT_DETECTOR_DATA_ID,
429 )
432if __name__ == "__main__":
433 unittest.main()