Coverage for python / lsst / images / tests / extract_legacy_test_data.py: 0%

57 statements  

« prev     ^ index     » next       coverage.py v7.13.5, created at 2026-04-22 09:12 +0000

1# This file is part of lsst-images. 

2# 

3# Developed for the LSST Data Management System. 

4# This product includes software developed by the LSST Project 

5# (https://www.lsst.org). 

6# See the COPYRIGHT file at the top-level directory of this distribution 

7# for details of code ownership. 

8# 

9# Use of this source code is governed by a 3-clause BSD-style 

10# license that can be found in the LICENSE file. 

11 

12from __future__ import annotations 

13 

14__all__ = () 

15 

16import os 

17 

18import numpy as np 

19 

# These dependencies are only needed to regenerate the test data, so a failed
# import is re-raised with an explanatory note instead of a bare ImportError.
try:
    import click

    from lsst.afw.fits import (
        CompressionAlgorithm,
        CompressionOptions,
        DitherAlgorithm,
        QuantizationOptions,
        ScalingAlgorithm,
    )
    from lsst.daf.butler import Butler, DatasetRef
    from lsst.geom import Box2I, Extent2I, Point2I
    from lsst.utils import getPackageDir
except ImportError as err:
    # add_note keeps the original exception (and its traceback) intact while
    # appending guidance for whoever hit it.
    err.add_note(
        "Updating the test data requires a full Rubin development environment with at least "
        "'click', 'afw', 'obs_base', 'meas_extensions_psfex', and 'meas_extensions_piff' importable.  "
        "This is not necessary for just running the tests."
    )
    raise

40 

41 

42from ._data_ids import DP2_VISIT_DETECTOR_DATA_ID 

43 

44 

def extract_visit_image(
    butler: Butler,
    output_path: str,
    dataset_ref: DatasetRef,
    shuffle: bool,
    wcs_dataset_ref: DatasetRef | None = None,
) -> None:
    """Load a subimage of a processed visit image from a butler repository
    and save it to testdata_images.

    Parameters
    ----------
    butler
        Butler used to load ``dataset_ref`` (and ``wcs_dataset_ref``, if
        given).
    output_path
        Path of the FITS file to write.  Missing parent directories are
        created; a bare filename is written to the current directory.
    dataset_ref
        Reference to the image dataset to load.  Only a fixed 256x250 pixel
        cutout is read.
    shuffle
        If `True`, scramble the pixel positions before writing.  The same
        permutation is applied to the image, mask, and variance planes, so
        the values belonging to any one pixel stay together.
    wcs_dataset_ref
        Optional reference to a visit-summary dataset.  When given, the WCS
        of the row matching ``dataset_ref``'s detector replaces the WCS of
        the loaded image.
    """
    visit_image = butler.get(dataset_ref, parameters={"bbox": Box2I(Point2I(5, 4), Extent2I(256, 250))})
    if shuffle:
        # Take the shape from the data actually loaded instead of repeating
        # the bbox extent, so the two cannot drift apart.
        shape = visit_image.image.array.shape
        # Unseeded on purpose-or-not: every run yields a different permutation.
        rng = np.random.default_rng()
        indices = rng.permutation(visit_image.image.array.size)
        for plane in (visit_image.image, visit_image.mask, visit_image.variance):
            # Fancy indexing on ``.flat`` produces a copy, so assigning back
            # into the same array is safe.
            plane.array[:, :] = plane.array.flat[indices].reshape(shape)
    if wcs_dataset_ref is not None:
        visit_summary = butler.get(wcs_dataset_ref)
        visit_summary_row = visit_summary.find(dataset_ref.dataId["detector"])
        visit_image.setWcs(visit_summary_row.getWcs())
    # Lossy (quantized) compression for the floating-point planes; the fixed
    # seed keeps the dithering repeatable.
    float_compression = CompressionOptions(
        algorithm=CompressionAlgorithm.RICE_1,
        tile_height=50,
        tile_width=64,
        quantization=QuantizationOptions(
            dither=DitherAlgorithm.SUBTRACTIVE_DITHER_2,
            scaling=ScalingAlgorithm.STDEV_MASKED,
            level=16,
            seed=747,
        ),
    )
    # The mask plane is compressed without quantization.
    mask_compression = CompressionOptions(
        algorithm=CompressionAlgorithm.GZIP_2,
        tile_height=50,
        tile_width=64,
        quantization=None,
    )
    # os.path.dirname() returns "" for a bare filename, and os.makedirs("")
    # raises FileNotFoundError, so only create a directory when there is one.
    output_dir = os.path.dirname(output_path)
    if output_dir:
        os.makedirs(output_dir, exist_ok=True)
    visit_image.writeFits(
        output_path,
        imageOptions=float_compression,
        maskOptions=mask_compression,
        varianceOptions=float_compression,
    )

91 

92 

def extract_camera(butler: Butler, output_path: str, dataset_ref: DatasetRef) -> None:
    """Load a camera dataset from a butler repository and write it as FITS.

    Parameters
    ----------
    butler
        Butler used to load ``dataset_ref``.
    output_path
        Path of the FITS file to write.  Missing parent directories are
        created; a bare filename is written to the current directory.
    dataset_ref
        Reference to the camera dataset to load.
    """
    camera = butler.get(dataset_ref)
    # os.path.dirname() returns "" for a bare filename, and os.makedirs("")
    # raises FileNotFoundError, so only create a directory when there is one.
    output_dir = os.path.dirname(output_path)
    if output_dir:
        os.makedirs(output_dir, exist_ok=True)
    camera.writeFits(output_path)

97 

98 

def find_dataset_or_raise(
    butler: Butler, dataset_type: str, *, collections: str | None = None, **kwargs
) -> DatasetRef:
    """Find a single dataset, treating "not found" as an error.

    Parameters
    ----------
    butler
        Butler to query.
    dataset_type
        Name of the dataset type to search for.
    collections
        Collection to search, forwarded to ``butler.find_dataset``; `None`
        is passed through unchanged.
    **kwargs
        Data ID key-value pairs, forwarded to ``butler.find_dataset``.

    Returns
    -------
    DatasetRef
        The reference returned by ``butler.find_dataset``.

    Raises
    ------
    LookupError
        Raised if ``butler.find_dataset`` returns `None`.
    """
    found = butler.find_dataset(dataset_type, collections=collections, **kwargs)
    if found is not None:
        return found
    raise LookupError(f"Could not find dataset {dataset_type} with data ID {kwargs}.")

106 

107 

@click.group("extract_test_data")
def extract_test_data() -> None:
    """Extract test data from butler repositories into testdata_images."""
    # The docstring above is what click shows as the group's ``--help``
    # description; the group itself does no work beyond hosting subcommands.

111 

112 

@extract_test_data.command("dp2")
@click.option("-b", "--butler-repo", help="Path to the butler repository.")
@click.option("-d", "--testdata-dir", help="Path to the testdata_images directory.")
@click.option(
    "-c",
    "--collection",
    default="LSSTCam/runs/DRP/20250515-20251214/v30_0_0_rc2/DM-53697",
    help="Collection to use for most data products.",
)
@click.option(
    "--wcs-collection",
    default="LSSTCam/runs/DRP/v30_0_0/DM-53877",
    help="Collection to search for visit_summary datasets used to update the WCS.",
)
def extract_dp2(
    butler_repo: str | None, testdata_dir: str | None, collection: str, wcs_collection: str
) -> None:
    """Extract test data from a butler repository."""
    # The one-line docstring is also the command's click help text, so the
    # details live in comments instead.
    #
    # Writes three files under <testdata_dir>/dp2/legacy:
    #   visit_image.fits              shuffled cutout, WCS taken from visit_summary
    #   preliminary_visit_image.fits  shuffled cutout, WCS left as loaded
    #   camera.fits                   the LSSTCam camera dataset

    # Omitted options fall back to the "dp2_prep" repository and the
    # testdata_images package directory.
    repo = "dp2_prep" if butler_repo is None else butler_repo
    data_root = getPackageDir("testdata_images") if testdata_dir is None else testdata_dir

    butler = Butler.from_config(repo, collections=[collection])
    legacy_dir = os.path.join(data_root, "dp2", "legacy")

    # visit_image: the visit_summary lookup is the only one that overrides
    # the butler's default collection.
    visit_image_ref = find_dataset_or_raise(butler, "visit_image", **DP2_VISIT_DETECTOR_DATA_ID)
    visit_summary_ref = find_dataset_or_raise(
        butler, "visit_summary", **DP2_VISIT_DETECTOR_DATA_ID, collections=wcs_collection
    )
    extract_visit_image(
        butler,
        os.path.join(legacy_dir, "visit_image.fits"),
        visit_image_ref,
        shuffle=True,
        wcs_dataset_ref=visit_summary_ref,
    )

    # preliminary_visit_image: no WCS replacement.
    preliminary_ref = find_dataset_or_raise(
        butler, "preliminary_visit_image", **DP2_VISIT_DETECTOR_DATA_ID
    )
    extract_visit_image(
        butler,
        os.path.join(legacy_dir, "preliminary_visit_image.fits"),
        preliminary_ref,
        shuffle=True,
    )

    # camera: identified by instrument alone.
    camera_ref = find_dataset_or_raise(butler, "camera", instrument="LSSTCam")
    extract_camera(butler, os.path.join(legacy_dir, "camera.fits"), camera_ref)

171 

172 

# Invoke the click command group only when this file is run as a script,
# never on import.
if __name__ == "__main__":
    extract_test_data()