Coverage for python / lsst / images / tests / extract_legacy_test_data.py: 0%
57 statements
« prev ^ index » next coverage.py v7.13.5, created at 2026-04-23 08:41 +0000
« prev ^ index » next coverage.py v7.13.5, created at 2026-04-23 08:41 +0000
1# This file is part of lsst-images.
2#
3# Developed for the LSST Data Management System.
4# This product includes software developed by the LSST Project
5# (https://www.lsst.org).
6# See the COPYRIGHT file at the top-level directory of this distribution
7# for details of code ownership.
8#
9# Use of this source code is governed by a 3-clause BSD-style
10# license that can be found in the LICENSE file.
12from __future__ import annotations
14__all__ = ()
16import os
18import numpy as np
20try:
21 import click
23 from lsst.afw.fits import (
24 CompressionAlgorithm,
25 CompressionOptions,
26 DitherAlgorithm,
27 QuantizationOptions,
28 ScalingAlgorithm,
29 )
30 from lsst.daf.butler import Butler, DatasetRef
31 from lsst.geom import Box2I, Extent2I, Point2I
32 from lsst.utils import getPackageDir
33except ImportError as err:
34 err.add_note(
35 "Updating the test data requires a full Rubin development enviroment with at least "
36 "'click', 'afw', 'obs_base', 'meas_extensions_psfex', and 'meas_extensions_piff' importable. "
37 "This is not necessary for just running the tests."
38 )
39 raise
42from ._data_ids import DP2_VISIT_DETECTOR_DATA_ID
45def extract_visit_image(
46 butler: Butler,
47 output_path: str,
48 dataset_ref: DatasetRef,
49 shuffle: bool,
50) -> None:
51 """Load a subimage of a processed visit image from a butler repository
52 and save it to testdata_images.
53 """
54 visit_image = butler.get(dataset_ref, parameters={"bbox": Box2I(Point2I(5, 4), Extent2I(256, 250))})
55 if shuffle:
56 indices = np.arange(visit_image.image.array.size, dtype=int)
57 rng = np.random.default_rng()
58 rng.shuffle(indices)
59 visit_image.image.array[:, :] = visit_image.image.array.flat[indices].reshape(250, 256)
60 visit_image.mask.array[:, :] = visit_image.mask.array.flat[indices].reshape(250, 256)
61 visit_image.variance.array[:, :] = visit_image.variance.array.flat[indices].reshape(250, 256)
62 float_compression = CompressionOptions(
63 algorithm=CompressionAlgorithm.RICE_1,
64 tile_height=50,
65 tile_width=64,
66 quantization=QuantizationOptions(
67 dither=DitherAlgorithm.SUBTRACTIVE_DITHER_2,
68 scaling=ScalingAlgorithm.STDEV_MASKED,
69 level=16,
70 seed=747,
71 ),
72 )
73 mask_compression = CompressionOptions(
74 algorithm=CompressionAlgorithm.GZIP_2,
75 tile_height=50,
76 tile_width=64,
77 quantization=None,
78 )
79 os.makedirs(os.path.dirname(output_path), exist_ok=True)
80 visit_image.writeFits(
81 output_path,
82 imageOptions=float_compression,
83 maskOptions=mask_compression,
84 varianceOptions=float_compression,
85 )
88def extract_visit_image_background(
89 butler: Butler,
90 output_path: str,
91 dataset_ref: DatasetRef,
92) -> None:
93 """Load the background model of a processed visit image from a butler
94 repository and save it to testdata_images.
95 """
96 visit_image_background = butler.get(dataset_ref)
97 visit_image_background.writeFits(output_path)
100def extract_camera(butler: Butler, output_path: str, dataset_ref: DatasetRef) -> None:
101 camera = butler.get(dataset_ref)
102 os.makedirs(os.path.dirname(output_path), exist_ok=True)
103 camera.writeFits(output_path)
106def find_dataset_or_raise(
107 butler: Butler, dataset_type: str, *, collections: str | None = None, **kwargs
108) -> DatasetRef:
109 ref = butler.find_dataset(dataset_type, collections=collections, **kwargs)
110 if ref is None:
111 raise LookupError(f"Could not find dataset {dataset_type} with data ID {kwargs}.")
112 return ref
115@click.group("extract_test_data")
116def extract_test_data() -> None:
117 pass
120@extract_test_data.command("dp2")
121@click.option("-b", "--butler-repo", help="Path to the butler repository.")
122@click.option("-d", "--testdata-dir", help="Path to the testdata_images directory.")
123@click.option(
124 "-c",
125 "--collection",
126 default="LSSTCam/runs/DRP/DP2/v30_0_0/DM-53881/stage4",
127 help="Collection to use for most data products.",
128)
129def extract_dp2(butler_repo: str | None, testdata_dir: str | None, collection: str) -> None:
130 """Extract test data from a butler repository."""
131 if butler_repo is None:
132 butler_repo = "dp2_prep"
133 if testdata_dir is None:
134 testdata_dir = getPackageDir("testdata_images")
135 butler = Butler.from_config(butler_repo, collections=[collection])
136 extract_visit_image(
137 butler,
138 os.path.join(
139 testdata_dir,
140 "dp2",
141 "legacy",
142 "visit_image.fits",
143 ),
144 find_dataset_or_raise(butler, "visit_image", **DP2_VISIT_DETECTOR_DATA_ID),
145 shuffle=True,
146 )
147 extract_visit_image_background(
148 butler,
149 os.path.join(
150 testdata_dir,
151 "dp2",
152 "legacy",
153 "visit_image_background.fits",
154 ),
155 find_dataset_or_raise(butler, "visit_image_background", **DP2_VISIT_DETECTOR_DATA_ID),
156 )
157 extract_visit_image(
158 butler,
159 os.path.join(
160 testdata_dir,
161 "dp2",
162 "legacy",
163 "preliminary_visit_image.fits",
164 ),
165 find_dataset_or_raise(butler, "preliminary_visit_image", **DP2_VISIT_DETECTOR_DATA_ID),
166 shuffle=True,
167 )
168 extract_visit_image_background(
169 butler,
170 os.path.join(
171 testdata_dir,
172 "dp2",
173 "legacy",
174 "preliminary_visit_image_background.fits",
175 ),
176 find_dataset_or_raise(butler, "preliminary_visit_image_background", **DP2_VISIT_DETECTOR_DATA_ID),
177 )
178 extract_camera(
179 butler,
180 os.path.join(
181 testdata_dir,
182 "dp2",
183 "legacy",
184 "camera.fits",
185 ),
186 find_dataset_or_raise(butler, "camera", instrument="LSSTCam"),
187 )
190if __name__ == "__main__":
191 extract_test_data()