Coverage for python / lsst / images / tests / extract_legacy_test_data.py: 0%
91 statements
« prev ^ index » next coverage.py v7.13.5, created at 2026-05-01 08:35 +0000
« prev ^ index » next coverage.py v7.13.5, created at 2026-05-01 08:35 +0000
1# This file is part of lsst-images.
2#
3# Developed for the LSST Data Management System.
4# This product includes software developed by the LSST Project
5# (https://www.lsst.org).
6# See the COPYRIGHT file at the top-level directory of this distribution
7# for details of code ownership.
8#
9# Use of this source code is governed by a 3-clause BSD-style
10# license that can be found in the LICENSE file.
12from __future__ import annotations
14__all__ = ()
16import os
18import numpy as np
20try:
21 import click
23 from lsst.afw.fits import (
24 CompressionAlgorithm,
25 CompressionOptions,
26 DitherAlgorithm,
27 QuantizationOptions,
28 ScalingAlgorithm,
29 )
30 from lsst.cell_coadds import MultipleCellCoadd
31 from lsst.daf.butler import Butler, DatasetRef
32 from lsst.geom import Box2I, Extent2I, Point2I
33 from lsst.utils import getPackageDir
34except ImportError as err:
35 err.add_note(
36 "Updating the test data requires a full Rubin development enviroment with at least "
37 "'click', 'afw', 'obs_base', 'meas_extensions_psfex', 'meas_extensions_piff' and 'cell_coadds' "
38 "importable. This is not necessary for just running the tests."
39 )
40 raise
43from ._data_ids import DP2_COADD_DATA_ID, DP2_COADD_MISSING_CELL, DP2_VISIT_DETECTOR_DATA_ID
46def extract_visit_image(
47 butler: Butler,
48 output_path: str,
49 dataset_ref: DatasetRef,
50 shuffle: bool,
51) -> None:
52 """Load a subimage of a processed visit image from a butler repository
53 and save it to testdata_images.
54 """
55 visit_image = butler.get(dataset_ref, parameters={"bbox": Box2I(Point2I(5, 4), Extent2I(256, 250))})
56 if shuffle:
57 indices = np.arange(visit_image.image.array.size, dtype=int)
58 rng = np.random.default_rng()
59 rng.shuffle(indices)
60 visit_image.image.array[:, :] = visit_image.image.array.flat[indices].reshape(250, 256)
61 visit_image.mask.array[:, :] = visit_image.mask.array.flat[indices].reshape(250, 256)
62 visit_image.variance.array[:, :] = visit_image.variance.array.flat[indices].reshape(250, 256)
63 float_compression = CompressionOptions(
64 algorithm=CompressionAlgorithm.RICE_1,
65 tile_height=50,
66 tile_width=64,
67 quantization=QuantizationOptions(
68 dither=DitherAlgorithm.SUBTRACTIVE_DITHER_2,
69 scaling=ScalingAlgorithm.STDEV_MASKED,
70 level=16,
71 seed=747,
72 ),
73 )
74 mask_compression = CompressionOptions(
75 algorithm=CompressionAlgorithm.GZIP_2,
76 tile_height=50,
77 tile_width=64,
78 quantization=None,
79 )
80 os.makedirs(os.path.dirname(output_path), exist_ok=True)
81 visit_image.writeFits(
82 output_path,
83 imageOptions=float_compression,
84 maskOptions=mask_compression,
85 varianceOptions=float_compression,
86 )
89def extract_visit_image_background(
90 butler: Butler,
91 output_path: str,
92 dataset_ref: DatasetRef,
93) -> None:
94 """Load the background model of a processed visit image from a butler
95 repository and save it to testdata_images.
96 """
97 visit_image_background = butler.get(dataset_ref)
98 visit_image_background.writeFits(output_path)
101def extract_cell_coadd(
102 butler: Butler,
103 output_path: str,
104 dataset_ref: DatasetRef,
105 shuffle: bool,
106) -> None:
107 """Load a subimage of a cell coadd from a butler repository and save it
108 to testdata_images.
109 """
110 full_cell_coadd = butler.get(dataset_ref)
111 cell_coadd = MultipleCellCoadd(
112 [
113 full_cell_coadd.cells[x, y]
114 for y in range(7, 11)
115 for x in range(5, 8)
116 if {"i": y, "j": x} != DP2_COADD_MISSING_CELL
117 ],
118 grid=full_cell_coadd.grid,
119 outer_cell_size=full_cell_coadd.outer_cell_size,
120 psf_image_size=full_cell_coadd.psf_image_size,
121 common=full_cell_coadd.common,
122 )
123 if shuffle:
124 rng = np.random.default_rng()
125 for cell in cell_coadd.cells.values():
126 indices = np.arange(cell.outer.image.array.size, dtype=int)
127 rng.shuffle(indices)
128 cell.outer.image.array[:, :] = cell.outer.image.array.flat[indices].reshape(150, 150)
129 cell.outer.mask.array[:, :] = cell.outer.mask.array.flat[indices].reshape(150, 150)
130 cell.outer.variance.array[:, :] = cell.outer.variance.array.flat[indices].reshape(150, 150)
131 for n in cell.outer.noise_realizations:
132 n.array[:, :] = n.array.flat[indices].reshape(150, 150)
133 if cell.outer.mask_fractions is not None:
134 cell.outer.mask_fractions.array[:, :] = cell.outer.mask_fractions.array.flat[indices].reshape(
135 150, 150
136 )
137 os.makedirs(os.path.dirname(output_path), exist_ok=True)
138 cell_coadd.writeFits(output_path)
141def extract_camera(butler: Butler, output_path: str, dataset_ref: DatasetRef) -> None:
142 """Read camera geometry from a butler repository and save it to
143 testdata_images.
144 """
145 camera = butler.get(dataset_ref)
146 os.makedirs(os.path.dirname(output_path), exist_ok=True)
147 camera.writeFits(output_path)
150def extract_skymap(butler: Butler, output_path: str, dataset_ref: DatasetRef) -> None:
151 """Read a skymap definition from a butler repository and save it to
152 testdata_images.
153 """
154 os.makedirs(os.path.dirname(output_path), exist_ok=True)
155 (path,) = butler.retrieveArtifacts(
156 [dataset_ref],
157 destination=os.path.dirname(output_path),
158 transfer="copy",
159 preserve_path=False,
160 overwrite=True,
161 )
162 if path.ospath != output_path:
163 os.rename(path.ospath, output_path)
166def find_dataset_or_raise(
167 butler: Butler, dataset_type: str, *, collections: str | None = None, **kwargs
168) -> DatasetRef:
169 """Call `lsst.daf.butler.Butler.find_dataset` with the given arguments and
170 raise `LookupError` if it returns `None`.
171 """
172 ref = butler.find_dataset(dataset_type, collections=collections, **kwargs)
173 if ref is None:
174 raise LookupError(f"Could not find dataset {dataset_type} with data ID {kwargs}.")
175 return ref
178@click.group("extract_test_data")
179def extract_test_data() -> None:
180 pass
183@extract_test_data.command("dp2")
184@click.option("-b", "--butler-repo", help="Path to the butler repository.")
185@click.option("-d", "--testdata-dir", help="Path to the testdata_images directory.")
186@click.option(
187 "-c",
188 "--collection",
189 default="LSSTCam/runs/DRP/DP2/v30_0_0/DM-53881/stage4",
190 help="Collection to use for most data products.",
191)
192@click.option(
193 "--wcs-collection",
194 default="LSSTCam/runs/DRP/v30_0_0/DM-53877",
195 help="Collection to search for visit_summary datasets used to update the WCS.",
196)
197@click.option(
198 "--visit-images/--no-visit-images",
199 default=True,
200 help="Whether to extract [preliminary_]visit_image datasets.",
201)
202@click.option(
203 "--coadds/--no-coadds",
204 default=True,
205 help="Whether to extract coadd datasets.",
206)
207@click.option(
208 "--camera/--no-camera",
209 default=True,
210 help="Whether to extract the camera.",
211)
212@click.option(
213 "--skymap/--no-skymap",
214 default=True,
215 help="Whether to extract the skymap.",
216)
217def extract_dp2(
218 butler_repo: str | None,
219 testdata_dir: str | None,
220 collection: str,
221 wcs_collection: str,
222 *,
223 visit_images: bool,
224 coadds: bool,
225 camera: bool,
226 skymap: bool,
227) -> None:
228 """Extract test data from a butler repository."""
229 if butler_repo is None:
230 butler_repo = "dp2_prep"
231 if testdata_dir is None:
232 testdata_dir = getPackageDir("testdata_images")
233 butler = Butler.from_config(butler_repo, collections=[collection])
234 if visit_images:
235 extract_visit_image(
236 butler,
237 os.path.join(
238 testdata_dir,
239 "dp2",
240 "legacy",
241 "visit_image.fits",
242 ),
243 find_dataset_or_raise(butler, "visit_image", **DP2_VISIT_DETECTOR_DATA_ID),
244 shuffle=True,
245 )
246 extract_visit_image_background(
247 butler,
248 os.path.join(
249 testdata_dir,
250 "dp2",
251 "legacy",
252 "visit_image_background.fits",
253 ),
254 find_dataset_or_raise(butler, "visit_image_background", **DP2_VISIT_DETECTOR_DATA_ID),
255 )
256 extract_visit_image(
257 butler,
258 os.path.join(
259 testdata_dir,
260 "dp2",
261 "legacy",
262 "preliminary_visit_image.fits",
263 ),
264 find_dataset_or_raise(butler, "preliminary_visit_image", **DP2_VISIT_DETECTOR_DATA_ID),
265 shuffle=True,
266 )
267 extract_visit_image_background(
268 butler,
269 os.path.join(
270 testdata_dir,
271 "dp2",
272 "legacy",
273 "preliminary_visit_image_background.fits",
274 ),
275 find_dataset_or_raise(butler, "preliminary_visit_image_background", **DP2_VISIT_DETECTOR_DATA_ID),
276 )
277 if coadds:
278 extract_cell_coadd(
279 butler,
280 os.path.join(
281 testdata_dir,
282 "dp2",
283 "legacy",
284 "deep_coadd_cell_predetection.fits",
285 ),
286 find_dataset_or_raise(butler, "deep_coadd_cell_predetection", **DP2_COADD_DATA_ID),
287 shuffle=True,
288 )
289 if skymap:
290 extract_skymap(
291 butler,
292 os.path.join(
293 testdata_dir,
294 "dp2",
295 "legacy",
296 "skyMap.pickle",
297 ),
298 find_dataset_or_raise(butler, "skyMap", skymap=DP2_COADD_DATA_ID["skymap"]),
299 )
300 if camera:
301 extract_camera(
302 butler,
303 os.path.join(
304 testdata_dir,
305 "dp2",
306 "legacy",
307 "camera.fits",
308 ),
309 find_dataset_or_raise(butler, "camera", instrument="LSSTCam"),
310 )
313if __name__ == "__main__":
314 extract_test_data()