Coverage for tests / test_cell_coadd.py: 35%

51 statements  

« prev     ^ index     » next       coverage.py v7.13.5, created at 2026-05-06 08:48 +0000

1# This file is part of lsst-images. 

2# 

3# Developed for the LSST Data Management System. 

4# This product includes software developed by the LSST Project 

5# (https://www.lsst.org). 

6# See the COPYRIGHT file at the top-level directory of this distribution 

7# for details of code ownership. 

8# 

9# Use of this source code is governed by a 3-clause BSD-style 

10# license that can be found in the LICENSE file. 

11 

12from __future__ import annotations 

13 

14import os 

15import pickle 

16import unittest 

17from typing import Any 

18 

19import numpy as np 

20 

21from lsst.images import YX, Box, Interval, get_legacy_deep_coadd_mask_planes 

22from lsst.images.cells import CellCoadd, CellIJ 

23from lsst.images.tests import ( 

24 DP2_COADD_DATA_ID, 

25 DP2_COADD_MISSING_CELL, 

26 RoundtripFits, 

27 assert_masked_images_equal, 

28 assert_psfs_equal, 

29 compare_cell_coadd_to_legacy, 

30) 

31 

32DATA_DIR = os.environ.get("TESTDATA_IMAGES_DIR", None) 

33 

34 

35@unittest.skipUnless(DATA_DIR is not None, "TESTDATA_IMAGES_DIR is not in the environment.") 

36class CellCoaddTestCase(unittest.TestCase): 

37 """Tests for the CellCoadd class and its many component classes.""" 

38 

39 @classmethod 

40 def setUpClass(cls) -> None: 

41 assert DATA_DIR is not None, "Guaranteed by decorator." 

42 cls.filename = os.path.join(DATA_DIR, "dp2", "legacy", "deep_coadd_cell_predetection.fits") 

43 cls.plane_map = get_legacy_deep_coadd_mask_planes() 

44 cls.missing_cell = CellIJ(**DP2_COADD_MISSING_CELL) 

45 try: 

46 from lsst.cell_coadds import MultipleCellCoadd 

47 

48 cls.legacy_cell_coadd = MultipleCellCoadd.read_fits(cls.filename) 

49 except ImportError: 

50 raise unittest.SkipTest("lsst.cell_coadds could not be imported.") from None 

51 with open(os.path.join(DATA_DIR, "dp2", "legacy", "skyMap.pickle"), "rb") as stream: 

52 cls.skymap = pickle.load(stream) 

53 cls.cell_coadd = CellCoadd.from_legacy( 

54 cls.legacy_cell_coadd, 

55 plane_map=cls.plane_map, 

56 tract_info=cls.skymap[DP2_COADD_DATA_ID["tract"]], 

57 ) 

58 

59 def make_psf_points(self, bbox: Box) -> YX[np.ndarray]: 

60 """Make arrays of points to test PSFs at, given a bbox that is assumed 

61 to be snapped to the cell_coadd grid. 

62 """ 

63 xc, yc = np.meshgrid( 

64 np.arange( 

65 bbox.x.start + self.cell_coadd.grid.cell_shape.x * 0.5, 

66 bbox.x.stop, 

67 self.cell_coadd.grid.cell_shape.x, 

68 ), 

69 np.arange( 

70 bbox.y.start + self.cell_coadd.grid.cell_shape.y * 0.5, 

71 bbox.y.stop, 

72 self.cell_coadd.grid.cell_shape.y, 

73 ), 

74 ) 

75 return YX( 

76 y=yc.ravel() + self.rng.uniform(-0.4, 0.4, size=yc.size), 

77 x=xc.ravel() + self.rng.uniform(-0.4, 0.4, size=xc.size), 

78 ) 

79 

80 def setUp(self) -> None: 

81 self.rng = np.random.default_rng(44) 

82 self.psf_points = self.make_psf_points(self.cell_coadd.bbox) 

83 

84 def test_from_legacy(self) -> None: 

85 """Test constructing a CellCoadd by converting a legacy 

86 lsst.cell_coadds.MultipleCellCoadd. 

87 """ 

88 self.assertEqual(self.cell_coadd.bounds.missing, {self.missing_cell}) 

89 self.assertEqual(self.cell_coadd.bbox, Box.factory[12900:13500, 9600:10050]) 

90 compare_cell_coadd_to_legacy( 

91 self, 

92 self.cell_coadd, 

93 self.legacy_cell_coadd, 

94 tract_bbox=Box.from_legacy(self.skymap[DP2_COADD_DATA_ID["tract"]].getBBox()), 

95 plane_map=self.plane_map, 

96 psf_points=self.psf_points, 

97 ) 

98 

99 def test_roundtrip(self) -> None: 

100 """Test serializing a CellCoadd and reading it back in, including 

101 subimage and component reads. 

102 """ 

103 with RoundtripFits(self, self.cell_coadd, "CellCoadd") as roundtrip: 

104 # Check a subimage read. The subbox only overlaps (but does not 

105 # fully cover) the middle 2 (of 4) cells in y, while covering 

106 # exactly the last column of cells in x. It does not cover the 

107 # missing cell. 

108 subbox = Box.factory[ 

109 self.cell_coadd.bbox.y.start + 252 : self.cell_coadd.bbox.y.stop - 175, 

110 self.cell_coadd.bbox.x.stop - 150 : self.cell_coadd.bbox.x.stop, 

111 ] 

112 subimage = roundtrip.get(bbox=subbox) 

113 assert_masked_images_equal(self, subimage, self.cell_coadd[subbox], expect_view=False) 

114 alternates: dict[str, Any] = {} 

115 with self.subTest(): 

116 subpsf = roundtrip.get("psf", bbox=subbox) 

117 self.assertEqual( 

118 subpsf.bounds.bbox, 

119 Box( 

120 y=Interval.factory[ 

121 self.cell_coadd.bbox.y.start + 150 : self.cell_coadd.bbox.y.stop - 150 

122 ], 

123 x=subbox.x, 

124 ), 

125 ) 

126 assert_psfs_equal(self, subpsf, self.cell_coadd.psf, points=self.make_psf_points(subbox)) 

127 self.assertEqual(roundtrip.get("bbox"), self.cell_coadd.bbox) 

128 alternates = { 

129 k: roundtrip.get(k) 

130 for k in ["projection", "image", "mask", "variance", "psf", "provenance"] 

131 } 

132 self.assertEqual(self.cell_coadd.bounds.missing, {self.missing_cell}) 

133 self.assertEqual(self.cell_coadd.bbox, Box.factory[12900:13500, 9600:10050]) 

134 compare_cell_coadd_to_legacy( 

135 self, 

136 roundtrip.result, 

137 self.legacy_cell_coadd, 

138 tract_bbox=Box.from_legacy(self.skymap[DP2_COADD_DATA_ID["tract"]].getBBox()), 

139 plane_map=self.plane_map, 

140 alternates=alternates, 

141 psf_points=self.psf_points, 

142 ) 

143 

144 

145if __name__ == "__main__": 

146 unittest.main()