Coverage for tests / test_visit_image.py: 15%

212 statements  

« prev     ^ index     » next       coverage.py v7.13.5, created at 2026-04-15 00:04 +0000

1# This file is part of lsst-images. 

2# 

3# Developed for the LSST Data Management System. 

4# This product includes software developed by the LSST Project 

5# (https://www.lsst.org). 

6# See the COPYRIGHT file at the top-level directory of this distribution 

7# for details of code ownership. 

8# 

9# Use of this source code is governed by a 3-clause BSD-style 

10# license that can be found in the LICENSE file. 

11 

12from __future__ import annotations 

13 

14import os 

15import unittest 

16import warnings 

17from typing import Any 

18 

19import astropy.io.fits 

20import astropy.units as u 

21import astropy.wcs 

22import numpy as np 

23from astro_metadata_translator import ObservationInfo 

24 

25from lsst.images import ( 

26 Box, 

27 DetectorFrame, 

28 Image, 

29 MaskPlane, 

30 MaskSchema, 

31 ProjectionAstropyView, 

32 TractFrame, 

33 VisitImage, 

34 get_legacy_visit_image_mask_planes, 

35) 

36from lsst.images.fits import ExtensionKey, FitsOpaqueMetadata 

37from lsst.images.psfs import GaussianPointSpreadFunction, PointSpreadFunction 

38from lsst.images.tests import ( 

39 DP2_VISIT_DETECTOR_DATA_ID, 

40 RoundtripFits, 

41 TemporaryButler, 

42 assert_masked_images_equal, 

43 assert_projections_equal, 

44 compare_visit_image_to_legacy, 

45 make_random_projection, 

46) 

47 

48DATA_DIR = os.environ.get("TESTDATA_IMAGES_DIR", None) 

49 

50 

51class VisitImageTestCase(unittest.TestCase): 

52 """Basic Tests for VisitImage.""" 

53 

54 @classmethod 

55 def setUpClass(cls) -> None: 

56 cls.rng = np.random.default_rng(500) 

57 det_frame = DetectorFrame(instrument="Inst", visit=1234, detector=1, bbox=Box.factory[1:4096, 1:4096]) 

58 cls.projection = make_random_projection(cls.rng, det_frame, Box.factory[1:4096, 1:4096]) 

59 cls.mask_schema = MaskSchema([MaskPlane("M1", "D1")]) 

60 cls.obs_info = ObservationInfo(instrument="LSSTCam", detector_num=4) 

61 cls.gaussian_psf = GaussianPointSpreadFunction(2.5, stamp_size=33, bounds=Box.factory[-10:10, -12:13]) 

62 

63 opaque = FitsOpaqueMetadata() 

64 hdr = astropy.io.fits.Header() 

65 with warnings.catch_warnings(): 

66 # Silence warnings about long keys becoming HIERARCH. 

67 warnings.simplefilter("ignore", category=astropy.io.fits.verify.VerifyWarning) 

68 hdr.update({"PLATFORM": "lsstcam", "LSST BUTLER ID": "123456789"}) 

69 opaque.extract_legacy_primary_header(hdr) 

70 

71 cls.image = Image(42, shape=(1024, 1024), unit=u.nJy) 

72 cls.variance = Image(5.0, shape=(1024, 1024), unit=u.nJy * u.nJy) 

73 # API signature suggests projection and obs_info can be None but they 

74 # are required. 

75 cls.visit_image = VisitImage( 

76 cls.image, 

77 variance=cls.variance, 

78 psf=GaussianPointSpreadFunction(2.5, stamp_size=33, bounds=Box.factory[-10:10, -12:13]), 

79 mask_schema=cls.mask_schema, 

80 projection=cls.projection, 

81 obs_info=cls.obs_info, 

82 ) 

83 cls.visit_image._opaque_metadata = opaque 

84 cls.simplest_visit_image = VisitImage( 

85 cls.image, 

86 psf=GaussianPointSpreadFunction(2.5, stamp_size=33, bounds=Box.factory[-10:10, -12:13]), 

87 mask_schema=cls.mask_schema, 

88 projection=cls.projection, 

89 obs_info=cls.obs_info, 

90 ) 

91 

92 def test_basics(self) -> None: 

93 """Test basic constructor patterns.""" 

94 # Test default fill of variance. 

95 visit = self.simplest_visit_image 

96 self.assertEqual(visit.variance.array[0, 0], 1.0) 

97 self.assertIs(visit[...], visit) 

98 self.assertEqual(str(visit), "VisitImage(Image([y=0:1024, x=0:1024], int64), ['M1'])") 

99 self.assertEqual( 

100 repr(visit), 

101 "VisitImage(Image(..., bbox=Box(y=Interval(start=0, stop=1024), x=Interval(start=0, stop=1024))," 

102 " dtype=dtype('int64')), mask_schema=MaskSchema([MaskPlane(name='M1', description='D1')]," 

103 " dtype=dtype('uint8')))", 

104 ) 

105 

106 astropy_wcs = visit.astropy_wcs 

107 self.assertIsInstance(astropy_wcs, ProjectionAstropyView) 

108 approx_wcs = visit.fits_wcs 

109 self.assertIsInstance(approx_wcs, astropy.wcs.WCS) 

110 

111 # Check that it is a deep copy. 

112 copy = visit.copy() 

113 copy.image.array[0, 0] = 30.0 

114 self.assertEqual(visit.image.array[0, 0], 42.0) 

115 self.assertEqual(copy.image.array[0, 0], 30.0) 

116 

117 with self.assertRaises(TypeError): 

118 # Requires a PSF. 

119 VisitImage( 

120 self.image, 

121 mask_schema=self.mask_schema, 

122 projection=self.projection, 

123 obs_info=self.obs_info, 

124 ) 

125 

126 with self.assertRaises(TypeError): 

127 # Requires ObservationInfo. 

128 VisitImage( 

129 self.image, 

130 psf=self.gaussian_psf, 

131 mask_schema=self.mask_schema, 

132 projection=self.projection, 

133 ) 

134 

135 with self.assertRaises(TypeError): 

136 # Requires a projection. 

137 VisitImage( 

138 self.image, 

139 psf=self.gaussian_psf, 

140 mask_schema=self.mask_schema, 

141 obs_info=self.obs_info, 

142 ) 

143 

144 with self.assertRaises(TypeError): 

145 # Requires some form of mask. 

146 VisitImage( 

147 self.image, 

148 psf=self.gaussian_psf, 

149 projection=self.projection, 

150 obs_info=self.obs_info, 

151 ) 

152 

153 with self.assertRaises(TypeError): 

154 VisitImage( 

155 Image(42, shape=(5, 5)), 

156 psf=self.gaussian_psf, 

157 mask_schema=self.mask_schema, 

158 projection=self.projection, 

159 obs_info=self.obs_info, 

160 ) 

161 

162 # Requires a DetectorFrame. 

163 tract_frame = TractFrame(skymap="Skymap", tract=1, bbox=Box.factory[1:10, 1:10]) 

164 tract_proj = make_random_projection(self.rng, tract_frame, Box.factory[1:4096, 1:4096]) 

165 with self.assertRaises(TypeError): 

166 VisitImage( 

167 self.image, 

168 projection=tract_proj, 

169 psf=self.gaussian_psf, 

170 mask_schema=self.mask_schema, 

171 obs_info=self.obs_info, 

172 ) 

173 

174 # Variance unit mismatch. 

175 with self.assertRaises(ValueError): 

176 VisitImage( 

177 self.image, 

178 variance=self.image, 

179 psf=self.gaussian_psf, 

180 mask_schema=self.mask_schema, 

181 projection=self.projection, 

182 obs_info=self.obs_info, 

183 ) 

184 

185 def test_obs_info(self) -> None: 

186 """Check that ObservationInfo has been constructured.""" 

187 visit = self.visit_image 

188 self.assertIsNotNone(visit.obs_info) 

189 self.maxDiff = None 

190 assert visit.obs_info is not None # for mypy. 

191 self.assertEqual(visit.obs_info.instrument, "LSSTCam") 

192 

193 def test_read_write(self) -> None: 

194 """Test that a visit can round trip through a FITS file.""" 

195 with RoundtripFits(self, self.visit_image, "VisitImage") as roundtrip: 

196 # Check that we're still using the right compression, and that we 

197 # wrote WCSs. 

198 fits = roundtrip.inspect() 

199 self.assertEqual(fits[1].header["ZCMPTYPE"], "GZIP_2") 

200 self.assertEqual(fits[1].header["CTYPE1"], "RA---TAN") 

201 self.assertEqual(fits[2].header["ZCMPTYPE"], "GZIP_2") 

202 self.assertEqual(fits[2].header["CTYPE1"], "RA---TAN") 

203 self.assertEqual(fits[3].header["ZCMPTYPE"], "GZIP_2") 

204 self.assertEqual(fits[3].header["CTYPE1"], "RA---TAN") 

205 # Check a subimage read. 

206 subbox = Box.factory[8:13, 9:30] 

207 subimage = roundtrip.get(bbox=subbox) 

208 assert_masked_images_equal(self, subimage, self.visit_image[subbox], expect_view=False) 

209 with self.subTest(): 

210 self.assertEqual(roundtrip.get("bbox"), self.visit_image.bbox) 

211 with self.subTest(): 

212 obs_info = roundtrip.get("obs_info") 

213 self.assertIsInstance(obs_info, ObservationInfo) 

214 self.assertEqual(obs_info, self.visit_image.obs_info) 

215 

216 assert_masked_images_equal(self, roundtrip.result, self.visit_image, expect_view=False) 

217 # Check that the round-tripped headers are the same (up to card order). 

218 self.assertEqual(len(roundtrip.result._opaque_metadata.headers[ExtensionKey()]), 1) 

219 self.assertEqual( 

220 dict(self.visit_image._opaque_metadata.headers[ExtensionKey()]), 

221 dict(roundtrip.result._opaque_metadata.headers[ExtensionKey()]), 

222 ) 

223 self.assertFalse(roundtrip.result._opaque_metadata.headers[ExtensionKey("IMAGE")]) 

224 self.assertFalse(roundtrip.result._opaque_metadata.headers[ExtensionKey("MASK")]) 

225 self.assertFalse(roundtrip.result._opaque_metadata.headers[ExtensionKey("VARIANCE")]) 

226 self.assertEqual(roundtrip.result.obs_info, self.visit_image.obs_info) 

227 

228 

229@unittest.skipUnless(DATA_DIR is not None, "TESTDATA_IMAGES_DIR is not in the environment.") 

230class VisitImageLegacyTestCase(unittest.TestCase): 

231 """Tests for the VisitImage class and the basics of the archive system. 

232 

233 Requires legacy code. 

234 """ 

235 

236 @classmethod 

237 def setUpClass(cls) -> None: 

238 assert DATA_DIR is not None, "Guaranteed by decorator." 

239 cls.filename = os.path.join(DATA_DIR, "dp2", "legacy", "visit_image.fits") 

240 cls.plane_map = plane_map = get_legacy_visit_image_mask_planes() 

241 cls.visit_image = VisitImage.read_legacy( 

242 cls.filename, preserve_quantization=True, plane_map=plane_map 

243 ) 

244 cls.legacy_exposure: Any = None 

245 try: 

246 from lsst.afw.image import ExposureFitsReader 

247 

248 cls.legacy_exposure = ExposureFitsReader(cls.filename).read() 

249 except ImportError: 

250 pass 

251 

252 def test_legacy_errors(self) -> None: 

253 """Legacy read failure modes.""" 

254 with self.assertRaises(ValueError): 

255 VisitImage.from_legacy(self.legacy_exposure, instrument="HSC") 

256 with self.assertRaises(ValueError): 

257 VisitImage.from_legacy(self.legacy_exposure, visit=123456) 

258 with self.assertRaises(ValueError): 

259 VisitImage.from_legacy(self.legacy_exposure, unit=u.mJy) 

260 visit = VisitImage.from_legacy( 

261 self.legacy_exposure, instrument="LSSTCam", unit=u.nJy, visit=2025052000177 

262 ) 

263 self.assertEqual(visit.unit, u.nJy) 

264 

265 with self.assertRaises(ValueError): 

266 VisitImage.read_legacy(self.filename, instrument="HSC") 

267 with self.assertRaises(ValueError): 

268 VisitImage.read_legacy(self.filename, visit=123456) 

269 

270 def test_component_reads(self) -> None: 

271 """Test reads of components from legacy file.""" 

272 visit = VisitImage.read_legacy(self.filename) 

273 proj = VisitImage.read_legacy(self.filename, component="projection") 

274 assert_projections_equal(self, proj, visit.projection, expect_identity=False) 

275 image = VisitImage.read_legacy(self.filename, component="image") 

276 self.assertEqual(image, visit.image) 

277 variance = VisitImage.read_legacy(self.filename, component="variance") 

278 self.assertEqual(variance, visit.variance) 

279 mask = VisitImage.read_legacy(self.filename, component="mask") 

280 self.assertEqual(mask, visit.mask) 

281 psf = VisitImage.read_legacy(self.filename, component="psf") 

282 self.assertIsInstance(psf, PointSpreadFunction) 

283 obs_info = VisitImage.read_legacy(self.filename, component="obs_info") 

284 self.assertIsInstance(obs_info, ObservationInfo) 

285 

286 def test_obs_info(self) -> None: 

287 """Check that ObservationInfo has been constructed.""" 

288 legacy = VisitImage.from_legacy(self.legacy_exposure, plane_map=self.plane_map) 

289 self.assertIsNotNone(legacy.obs_info) 

290 self.maxDiff = None 

291 self.assertEqual(legacy.obs_info, self.visit_image.obs_info) 

292 assert legacy.obs_info is not None # for mypy. 

293 self.assertEqual(legacy.obs_info.instrument, "LSSTCam") 

294 

295 def test_read_legacy_headers(self) -> None: 

296 """Test that headers were correctly stripped and interpreted in 

297 `VisitImage.read_legacy`. 

298 """ 

299 # Check that we read the units from BUNIT. 

300 self.assertEqual(self.visit_image.unit, astropy.units.nJy) 

301 # Check that the primary header has the keys we want, and none of the 

302 # keys we don't want. 

303 header = self.visit_image._opaque_metadata.headers[ExtensionKey()] 

304 self.assertIn("EXPTIME", header) 

305 self.assertEqual(header["PLATFORM"], "lsstcam") 

306 self.assertNotIn("LSST BUTLER ID", header) 

307 self.assertNotIn("AR HDU", header) 

308 self.assertNotIn("A_ORDER", header) 

309 # Check that the extension HDUs do not have any custom headers. 

310 self.assertFalse(self.visit_image._opaque_metadata.headers[ExtensionKey("IMAGE")]) 

311 self.assertFalse(self.visit_image._opaque_metadata.headers[ExtensionKey("MASK")]) 

312 self.assertFalse(self.visit_image._opaque_metadata.headers[ExtensionKey("VARIANCE")]) 

313 

314 def test_from_legacy_headers(self) -> None: 

315 """Test that from_legacy handles headers properly.""" 

316 legacy = VisitImage.from_legacy(self.legacy_exposure, plane_map=self.plane_map) 

317 header = legacy._opaque_metadata.headers[ExtensionKey()] 

318 self.assertIn("EXPTIME", header) 

319 self.assertEqual(header["PLATFORM"], "lsstcam") 

320 self.assertNotIn("LSST BUTLER ID", header) 

321 self.assertNotIn("AR HDU", header) 

322 self.assertNotIn("A_ORDER", header) 

323 # Check that the extension HDUs do not have any custom headers. 

324 self.assertFalse(self.visit_image._opaque_metadata.headers[ExtensionKey("IMAGE")]) 

325 self.assertFalse(self.visit_image._opaque_metadata.headers[ExtensionKey("MASK")]) 

326 self.assertFalse(self.visit_image._opaque_metadata.headers[ExtensionKey("VARIANCE")]) 

327 

328 def test_rewrite(self) -> None: 

329 """Test that we can rewrite the visit image and preserve both 

330 lossy-compressed pixel values and components exactly. 

331 """ 

332 with RoundtripFits(self, self.visit_image, "VisitImage") as roundtrip: 

333 # Check that we're still using the right compression, and that we 

334 # wrote WCSs. 

335 fits = roundtrip.inspect() 

336 self.assertEqual(fits[1].header["ZCMPTYPE"], "RICE_1") 

337 self.assertEqual(fits[1].header["CTYPE1"], "RA---TAN-SIP") 

338 self.assertEqual(fits[2].header["ZCMPTYPE"], "GZIP_2") 

339 self.assertEqual(fits[2].header["CTYPE1"], "RA---TAN-SIP") 

340 self.assertEqual(fits[3].header["ZCMPTYPE"], "RICE_1") 

341 self.assertEqual(fits[3].header["CTYPE1"], "RA---TAN-SIP") 

342 # Check a subimage read. 

343 subbox = Box.factory[8:13, 9:30] 

344 subimage = roundtrip.get(bbox=subbox) 

345 assert_masked_images_equal(self, subimage, self.visit_image[subbox], expect_view=False) 

346 alternates: dict[str, Any] = {} 

347 with self.subTest(): 

348 self.assertEqual(roundtrip.get("bbox"), self.visit_image.bbox) 

349 alternates = { 

350 k: roundtrip.get(k) 

351 for k in ["projection", "image", "mask", "variance", "psf", "obs_info"] 

352 } 

353 # Try to do a butler get of a component with storage class 

354 # override. 

355 with self.subTest(): 

356 if self.legacy_exposure is not None: 

357 import lsst.afw.image 

358 

359 # We have VisitInfo available. 

360 visit_info = roundtrip.get("obs_info", storageClass="VisitInfo") 

361 self.assertIsInstance(visit_info, lsst.afw.image.VisitInfo) 

362 self.assertEqual(visit_info.getInstrumentLabel(), "LSSTCam") 

363 else: 

364 raise unittest.SkipTest("Can not test VisitInfo conversion without afw") 

365 

366 assert_masked_images_equal(self, roundtrip.result, self.visit_image, expect_view=False) 

367 # Check that the round-tripped headers are the same (up to card order). 

368 self.assertEqual( 

369 dict(self.visit_image._opaque_metadata.headers[ExtensionKey()]), 

370 dict(roundtrip.result._opaque_metadata.headers[ExtensionKey()]), 

371 ) 

372 self.assertFalse(roundtrip.result._opaque_metadata.headers[ExtensionKey("IMAGE")]) 

373 self.assertFalse(roundtrip.result._opaque_metadata.headers[ExtensionKey("MASK")]) 

374 self.assertFalse(roundtrip.result._opaque_metadata.headers[ExtensionKey("VARIANCE")]) 

375 self.assertEqual(roundtrip.result._opaque_metadata.headers[ExtensionKey()]["PLATFORM"], "lsstcam") 

376 with self.subTest(): 

377 if self.legacy_exposure is None: 

378 raise unittest.SkipTest("'lsst.afw.image' could not be imported.") from None 

379 compare_visit_image_to_legacy( 

380 self, 

381 roundtrip.result, 

382 self.legacy_exposure, 

383 expect_view=False, 

384 plane_map=self.plane_map, 

385 **DP2_VISIT_DETECTOR_DATA_ID, 

386 alternates=alternates, 

387 ) 

388 # Check converting from the legacy object in-memory. 

389 compare_visit_image_to_legacy( 

390 self, 

391 VisitImage.from_legacy(self.legacy_exposure, plane_map=self.plane_map), 

392 self.legacy_exposure, 

393 expect_view=True, 

394 plane_map=self.plane_map, 

395 **DP2_VISIT_DETECTOR_DATA_ID, 

396 ) 

397 

398 def test_butler_converters(self) -> None: 

399 """Test that we can read a VisitImage and its components from a butler 

400 dataset written as an `lsst.afw.image.Exposure`. 

401 """ 

402 if self.legacy_exposure is None: 

403 raise unittest.SkipTest("lsst.afw.image.afw could not be imported.") 

404 with TemporaryButler(legacy="ExposureF") as helper: 

405 from lsst.daf.butler import FileDataset 

406 

407 helper.butler.ingest(FileDataset(path=self.filename, refs=[helper.legacy]), transfer="symlink") 

408 visit_image_ref = helper.legacy.overrideStorageClass("VisitImage") 

409 visit_image = helper.butler.get(visit_image_ref) 

410 bbox = helper.butler.get(visit_image_ref.makeComponentRef("bbox")) 

411 self.assertEqual(bbox, visit_image.bbox) 

412 alternates = { 

413 k: helper.butler.get(visit_image_ref.makeComponentRef(k)) 

414 # TODO: including "projection" or "obs_info" here fails because 

415 # there's code in daf_butler that expects any component to be 

416 # valid for the *internal* storage class, not the requested 

417 # one, and that's difficult to fix because it's tied up with 

418 # the data ID standardization logic. 

419 for k in ["image", "mask", "variance", "psf"] 

420 } 

421 compare_visit_image_to_legacy( 

422 self, 

423 visit_image, 

424 self.legacy_exposure, 

425 expect_view=False, 

426 plane_map=self.plane_map, 

427 alternates=alternates, 

428 **DP2_VISIT_DETECTOR_DATA_ID, 

429 ) 

430 

431 

432if __name__ == "__main__": 

433 unittest.main()