Coverage for python / lsst / images / tests / extract_legacy_test_data.py: 0%
57 statements
« prev ^ index » next coverage.py v7.13.5, created at 2026-04-22 09:12 +0000
« prev ^ index » next coverage.py v7.13.5, created at 2026-04-22 09:12 +0000
1# This file is part of lsst-images.
2#
3# Developed for the LSST Data Management System.
4# This product includes software developed by the LSST Project
5# (https://www.lsst.org).
6# See the COPYRIGHT file at the top-level directory of this distribution
7# for details of code ownership.
8#
9# Use of this source code is governed by a 3-clause BSD-style
10# license that can be found in the LICENSE file.
12from __future__ import annotations
14__all__ = ()
16import os
18import numpy as np
20try:
21 import click
23 from lsst.afw.fits import (
24 CompressionAlgorithm,
25 CompressionOptions,
26 DitherAlgorithm,
27 QuantizationOptions,
28 ScalingAlgorithm,
29 )
30 from lsst.daf.butler import Butler, DatasetRef
31 from lsst.geom import Box2I, Extent2I, Point2I
32 from lsst.utils import getPackageDir
33except ImportError as err:
34 err.add_note(
35 "Updating the test data requires a full Rubin development enviroment with at least "
36 "'click', 'afw', 'obs_base', 'meas_extensions_psfex', and 'meas_extensions_piff' importable. "
37 "This is not necessary for just running the tests."
38 )
39 raise
42from ._data_ids import DP2_VISIT_DETECTOR_DATA_ID
45def extract_visit_image(
46 butler: Butler,
47 output_path: str,
48 dataset_ref: DatasetRef,
49 shuffle: bool,
50 wcs_dataset_ref: DatasetRef | None = None,
51) -> None:
52 """Load a subimage of a processed visit image from a butler repository
53 and save it to testdata_images.
54 """
55 visit_image = butler.get(dataset_ref, parameters={"bbox": Box2I(Point2I(5, 4), Extent2I(256, 250))})
56 if shuffle:
57 indices = np.arange(visit_image.image.array.size, dtype=int)
58 rng = np.random.default_rng()
59 rng.shuffle(indices)
60 visit_image.image.array[:, :] = visit_image.image.array.flat[indices].reshape(250, 256)
61 visit_image.mask.array[:, :] = visit_image.mask.array.flat[indices].reshape(250, 256)
62 visit_image.variance.array[:, :] = visit_image.variance.array.flat[indices].reshape(250, 256)
63 if wcs_dataset_ref is not None:
64 visit_summary = butler.get(wcs_dataset_ref)
65 visit_summary_row = visit_summary.find(dataset_ref.dataId["detector"])
66 visit_image.setWcs(visit_summary_row.getWcs())
67 float_compression = CompressionOptions(
68 algorithm=CompressionAlgorithm.RICE_1,
69 tile_height=50,
70 tile_width=64,
71 quantization=QuantizationOptions(
72 dither=DitherAlgorithm.SUBTRACTIVE_DITHER_2,
73 scaling=ScalingAlgorithm.STDEV_MASKED,
74 level=16,
75 seed=747,
76 ),
77 )
78 mask_compression = CompressionOptions(
79 algorithm=CompressionAlgorithm.GZIP_2,
80 tile_height=50,
81 tile_width=64,
82 quantization=None,
83 )
84 os.makedirs(os.path.dirname(output_path), exist_ok=True)
85 visit_image.writeFits(
86 output_path,
87 imageOptions=float_compression,
88 maskOptions=mask_compression,
89 varianceOptions=float_compression,
90 )
93def extract_camera(butler: Butler, output_path: str, dataset_ref: DatasetRef) -> None:
94 camera = butler.get(dataset_ref)
95 os.makedirs(os.path.dirname(output_path), exist_ok=True)
96 camera.writeFits(output_path)
99def find_dataset_or_raise(
100 butler: Butler, dataset_type: str, *, collections: str | None = None, **kwargs
101) -> DatasetRef:
102 ref = butler.find_dataset(dataset_type, collections=collections, **kwargs)
103 if ref is None:
104 raise LookupError(f"Could not find dataset {dataset_type} with data ID {kwargs}.")
105 return ref
108@click.group("extract_test_data")
109def extract_test_data() -> None:
110 pass
113@extract_test_data.command("dp2")
114@click.option("-b", "--butler-repo", help="Path to the butler repository.")
115@click.option("-d", "--testdata-dir", help="Path to the testdata_images directory.")
116@click.option(
117 "-c",
118 "--collection",
119 default="LSSTCam/runs/DRP/20250515-20251214/v30_0_0_rc2/DM-53697",
120 help="Collection to use for most data products.",
121)
122@click.option(
123 "--wcs-collection",
124 default="LSSTCam/runs/DRP/v30_0_0/DM-53877",
125 help="Collection to search for visit_summary datasets used to update the WCS.",
126)
127def extract_dp2(
128 butler_repo: str | None, testdata_dir: str | None, collection: str, wcs_collection: str
129) -> None:
130 """Extract test data from a butler repository."""
131 if butler_repo is None:
132 butler_repo = "dp2_prep"
133 if testdata_dir is None:
134 testdata_dir = getPackageDir("testdata_images")
135 butler = Butler.from_config(butler_repo, collections=[collection])
136 extract_visit_image(
137 butler,
138 os.path.join(
139 testdata_dir,
140 "dp2",
141 "legacy",
142 "visit_image.fits",
143 ),
144 find_dataset_or_raise(butler, "visit_image", **DP2_VISIT_DETECTOR_DATA_ID),
145 shuffle=True,
146 wcs_dataset_ref=find_dataset_or_raise(
147 butler, "visit_summary", **DP2_VISIT_DETECTOR_DATA_ID, collections=wcs_collection
148 ),
149 )
150 extract_visit_image(
151 butler,
152 os.path.join(
153 testdata_dir,
154 "dp2",
155 "legacy",
156 "preliminary_visit_image.fits",
157 ),
158 find_dataset_or_raise(butler, "preliminary_visit_image", **DP2_VISIT_DETECTOR_DATA_ID),
159 shuffle=True,
160 )
161 extract_camera(
162 butler,
163 os.path.join(
164 testdata_dir,
165 "dp2",
166 "legacy",
167 "camera.fits",
168 ),
169 find_dataset_or_raise(butler, "camera", instrument="LSSTCam"),
170 )
173if __name__ == "__main__":
174 extract_test_data()