Coverage for python / lsst / obs / base / _read_curated_calibs.py: 11%
95 statements
« prev ^ index » next coverage.py v7.13.5, created at 2026-04-28 08:47 +0000
1# This file is part of obs_base.
2#
3# Developed for the LSST Data Management System.
4# This product includes software developed by the LSST Project
5# (http://www.lsst.org).
6# See the COPYRIGHT file at the top-level directory of this distribution
7# for details of code ownership.
8#
9# This program is free software: you can redistribute it and/or modify
10# it under the terms of the GNU General Public License as published by
11# the Free Software Foundation, either version 3 of the License, or
12# (at your option) any later version.
13#
14# This program is distributed in the hope that it will be useful,
15# but WITHOUT ANY WARRANTY; without even the implied warranty of
16# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
17# GNU General Public License for more details.
18#
19# You should have received a copy of the GNU General Public License
20# along with this program. If not, see <http://www.gnu.org/licenses/>.
22from __future__ import annotations
24__all__ = ["CuratedCalibration", "read_all"]
26import datetime
27import os
28import re
29from collections.abc import Mapping
30from typing import TYPE_CHECKING, Any, NamedTuple, Protocol
32from lsst.resources import ResourcePath
34if TYPE_CHECKING:
35 import lsst.afw.cameraGeom
36 from lsst.resources import ResourcePathExpression
class _SearchPath(NamedTuple):
    """A single location to search for curated calibration files.

    Combines the top level of a calibration data tree with the ordered
    subdirectory names (e.g. detector and/or physical filter) that must
    be joined below it to reach the calibration files.
    """

    # Top-level directory of the data tree for one calibration type.
    root: ResourcePath
    # Ordered subdirectory names below ``root``; empty for global
    # (detector- and filter-independent) calibrations.
    children: tuple[str, ...]
class CuratedCalibration(Protocol):
    """Protocol that describes the methods needed by this module when dealing
    with curated calibration datasets.
    """

    @classmethod
    def readText(cls, path: str) -> CuratedCalibration:
        """Construct a calibration by reading the given local text file."""
        ...

    def getMetadata(self) -> Mapping:
        """Return the metadata mapping associated with this calibration."""
        ...
def read_one_calib(
    path: _SearchPath,
    chip_id: int | None,
    filter_name: str | None,
    calib_class: type[CuratedCalibration],
) -> tuple[dict[datetime.datetime, CuratedCalibration], str]:
    """Read data for a particular path from the standard format at a
    particular root.

    Parameters
    ----------
    path : `_SearchPath`
        This tuple contains the top level of the data tree
        and then further optional subdirectories in subsequent
        indices. See Notes below for more details.
    chip_id : `int` or None
        The identifier for the sensor in question. To be used in
        validation.
    filter_name : `str` or None
        The identifier for the filter in question. To be used in
        validation.
    calib_class : `type` [`CuratedCalibration`]
        The class to use to read the curated calibration text file. Must
        support the ``readText()`` method.

    Returns
    -------
    data_dict : `dict`
        A dictionary of calibration objects constructed from the
        appropriate factory class. The key is the validity start time
        as a `datetime.datetime` object.
    calib_type : `str`
        The type of calibrations that have been read and are included
        in the ``data_dict``.

    Notes
    -----
    Curated calibrations are read from the appropriate ``obs_*_data``
    package, and are required to have a common directory structure to
    be identified and ingested properly. The top-level directories
    are organized by the instrument's ``policyName``. These names are
    generally all lower-case, but this is not universally true.

    Below the top-level instrument directory, subdirectories named
    after the curated calibration type contained within, with the
    dataset_type_name forced to lowercase. For calibrations that
    depend on the detector (i.e., the defects), the next level of
    subdirectories should contain directories named with the detector
    name, again forced to lowercase.

    For filter dependent calibrations that do not depend on the
    detector (i.e., transmission_filter), the calibrations should be
    grouped into directories named with the physical filter name
    (again, all lowercase) below the dataset_type_name directory.
    Filter dependent calibrations that do depend on the detector
    (i.e., transmission_system), have physical filter named
    directories below the detector level directories.

    The files themselves are required to be named with the validity start
    date and the appropriate file extension. The date must be in an ISO format
    that can be parsed by `datetime.datetime.fromisoformat`. This can include
    variants commonly used in data packages such as ``2025-04-30T12:23:00`` or
    the more compact ``20250430T123000``.
    """
    extensions = (".ecsv", ".yaml", ".json")
    search_root = path.root
    for subdir in path.children:
        search_root = search_root.join(subdir, forceDirectory=True)

    # Only match files ending in one of the supported extensions.
    file_filter = "(" + "|".join(re.escape(ext) for ext in extensions) + ")$"
    files = list(ResourcePath.findFileResources([search_root], file_filter=file_filter))

    # Convention is that data reside in location where the final two parts of
    # the directory root are <instrument>/<calib type>.
    # os.path.split() does not like a trailing "/" directory indicator.
    parts = os.path.split(path.root.path.removesuffix("/"))
    instrument = os.path.split(parts[0])[1]  # convention: <instrument>/<calib_type>
    calib_type = parts[1]

    data_dict: dict[datetime.datetime, CuratedCalibration] = {}
    for f in files:
        # The file stem (name minus extension) is the validity start date.
        date_str = f.updatedExtension("").basename()
        # Assume files are using some form of ISO date string.
        valid_start = datetime.datetime.fromisoformat(date_str)
        # For now readText does not know about URIs.
        with f.as_local() as local_uri:
            data_dict[valid_start] = calib_class.readText(local_uri.ospath)
        check_metadata(data_dict[valid_start], valid_start, instrument, chip_id, filter_name, f, calib_type)
    return data_dict, calib_type
def check_metadata(
    obj: Any,
    valid_start: datetime.datetime,
    instrument: str,
    chip_id: int | None,
    filter_name: str | None,
    filepath: ResourcePath,
    calib_type: str,
) -> None:
    """Check that the metadata is complete and self consistent.

    Parameters
    ----------
    obj : object of same type as the factory
        Object to retrieve metadata from in order to compare with
        metadata inferred from the path.
    valid_start : `datetime.datetime`
        Start of the validity range for data.
    instrument : `str`
        Name of the instrument in question.
    chip_id : `int` or `None`
        Identifier of the sensor in question.
    filter_name : `str` or `None`
        Identifier of the filter in question.
    filepath : `lsst.resources.ResourcePath`
        Path of the file read to construct the data.
    calib_type : `str`
        Name of the type of data being read.

    Raises
    ------
    ValueError
        If the metadata from the file and the metadata inferred from
        the path do not match for any reason.
    """
    md = obj.getMetadata()
    # It is an error if these two do not exist.
    finst = md["INSTRUME"]
    fcalib_type = md["OBSTYPE"]
    # These may optionally not exist.
    fchip_id = md.get("DETECTOR", None)
    ffilter_name = md.get("FILTER", None)

    # Normalize for comparison. Guard against a missing DETECTOR/FILTER
    # entry so that a mismatch raises the documented ValueError below
    # rather than a TypeError/AttributeError from int(None)/None.lower().
    if chip_id is not None and fchip_id is not None:
        fchip_id = int(fchip_id)
    if filter_name is not None:
        filter_name = filter_name.lower()
        if ffilter_name is not None:
            ffilter_name = ffilter_name.lower()

    if not (
        (finst.lower(), fchip_id, ffilter_name, fcalib_type.lower())
        == (instrument.lower(), chip_id, filter_name, calib_type.lower())
    ):
        raise ValueError(
            "Path and file metadata do not agree:\n"
            f"Path metadata: {instrument} {chip_id} {filter_name} {calib_type}\n"
            f"File metadata: {finst} {fchip_id} {ffilter_name} {fcalib_type}\n"
            f"File read from : {filepath}\n"
        )
def read_all(
    root: ResourcePathExpression,
    camera: lsst.afw.cameraGeom.Camera,
    calib_class: type[CuratedCalibration],
    required_dimensions: list[str],
    filters: set[str],
) -> tuple[dict[_SearchPath, dict[datetime.datetime, CuratedCalibration]], str]:
    """Read all data from the standard format at a particular root.

    Parameters
    ----------
    root : `lsst.resources.ResourcePathExpression`
        URI to the top level of the data tree. This is expected to hold
        directories named after the sensor names. They are expected to be
        lower case.
    camera : `lsst.afw.cameraGeom.Camera`
        The camera that goes with the data being read.
    calib_class : `typing.Any`
        The class to use to read the curated calibration text file. Must
        support the ``readText()`` and ``getMetadata()`` methods.
    required_dimensions : `list` [`str`]
        Dimensions required for the calibration.
    filters : `set` [`str`]
        The known filters for this camera. Used to identify
        filter-dependent calibrations.

    Returns
    -------
    calibration_data : `dict`
        A dictionary of dictionaries of calibration objects
        constructed with the appropriate factory class. The first key
        is the `_SearchPath` the data were found at, and the second is
        the validity start time as a `datetime.datetime` object.
    calib_type : `str`
        The type of calibrations that have been read and are included
        in the ``data_dict``.

    Notes
    -----
    Each leaf object in the constructed dictionary has metadata associated with
    it. The detector ID may be retrieved from the DETECTOR entry of that
    metadata.
    """
    calibration_data: dict[_SearchPath, dict[datetime.datetime, CuratedCalibration]] = {}

    root_uri = ResourcePath(root, forceDirectory=True, forceAbsolute=True)

    # Read all the subdirectories. These will be things like detectors or
    # physical filters. We assume that every sub directory contains calibration
    # data. We only walk the top level.
    _, dirs, _ = next(root_uri.walk())

    # If there are no sub directories this is a global calibration.
    if not dirs:
        dirs = [""]

    calib_types = set()
    # We assume the directories have been lowered.
    detector_map = {det.getName().lower(): det.getName() for det in camera}
    filter_map = {filterName.lower().replace(" ", "_"): filterName for filterName in filters}

    # Turn each directory (and, where needed, subdirectory) into a
    # concrete _SearchPath, validating names against the camera.
    paths_to_search: list[_SearchPath] = []
    for dir_name in dirs:
        # Catch possible mistakes:
        if "detector" in required_dimensions:
            if dir_name not in detector_map:
                # Top level directories must be detectors if they're
                # required.
                detectors = list(detector_map)
                max_detectors = 10
                note_str = "knows"
                if len(detectors) > max_detectors:
                    # report example subset
                    note_str = "examples"
                    detectors = detectors[:max_detectors]
                raise RuntimeError(
                    f"Detector {dir_name} not known to supplied camera "
                    f"{camera.getName()} ({note_str}: {','.join(detectors)})"
                )
            elif "physical_filter" in required_dimensions:
                # If the calibration depends on both detector and
                # physical_filter, the subdirs here should contain the
                # filter name.
                subdir = root_uri.join(dir_name, forceDirectory=True)
                _, subdirs, _ = next(subdir.walk())
                for subdir_name in subdirs:
                    if subdir_name not in filter_map:
                        raise RuntimeError(f"Filter {subdir_name} not known to supplied camera in {subdir}.")
                    else:
                        paths_to_search.append(_SearchPath(root_uri, (dir_name, subdir_name)))
            else:
                paths_to_search.append(_SearchPath(root_uri, (dir_name,)))
        elif "physical_filter" in required_dimensions:
            # If detector is not required, but physical_filter is,
            # then the top level should contain the filter
            # directories.
            if dir_name not in filter_map:
                raise RuntimeError(f"Filter {dir_name} not known to supplied camera in {root_uri}.")
            paths_to_search.append(_SearchPath(root_uri, (dir_name,)))
        else:
            # Neither detector nor physical_filter are required, so
            # the calibration is global, and will not be found in
            # subdirectories.
            paths_to_search.append(_SearchPath(root_uri, ()))

    if not paths_to_search:
        raise RuntimeError(f"Found no data files in {root_uri}")

    calib_type = "undefined"
    for path in paths_to_search:
        # Recover the chip_id/filter_name that the directory names imply
        # so read_one_calib can validate them against file metadata.
        chip_id = None
        filter_name = None
        if "detector" in required_dimensions:
            chip_id = camera[detector_map[path.children[0]]].getId()
        if "physical_filter" in required_dimensions:
            filter_name = filter_map[path.children[-1]]

        calibration_data[path], calib_type = read_one_calib(path, chip_id, filter_name, calib_class)

        # All paths must yield the same calibration type.
        calib_types.add(calib_type)
        if len(calib_types) != 1:  # set.add(None) has length 1 so None is OK here.
            raise ValueError(f"Error mixing calib types: {calib_types}")

    no_data = all(v == {} for v in calibration_data.values())
    if no_data:
        raise RuntimeError("No data to ingest")

    return calibration_data, calib_type