Coverage for python / lsst / obs / base / _read_curated_calibs.py: 11%

95 statements  

« prev     ^ index     » next       coverage.py v7.13.5, created at 2026-05-01 08:21 +0000

1# This file is part of obs_base. 

2# 

3# Developed for the LSST Data Management System. 

4# This product includes software developed by the LSST Project 

5# (http://www.lsst.org). 

6# See the COPYRIGHT file at the top-level directory of this distribution 

7# for details of code ownership. 

8# 

9# This program is free software: you can redistribute it and/or modify 

10# it under the terms of the GNU General Public License as published by 

11# the Free Software Foundation, either version 3 of the License, or 

12# (at your option) any later version. 

13# 

14# This program is distributed in the hope that it will be useful, 

15# but WITHOUT ANY WARRANTY; without even the implied warranty of 

16# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the 

17# GNU General Public License for more details. 

18# 

19# You should have received a copy of the GNU General Public License 

20# along with this program. If not, see <http://www.gnu.org/licenses/>. 

21 

22from __future__ import annotations 

23 

24__all__ = ["CuratedCalibration", "read_all"] 

25 

26import datetime 

27import os 

28import re 

29from collections.abc import Mapping 

30from typing import TYPE_CHECKING, Any, NamedTuple, Protocol 

31 

32from lsst.resources import ResourcePath 

33 

34if TYPE_CHECKING: 

35 import lsst.afw.cameraGeom 

36 from lsst.resources import ResourcePathExpression 

37 

38 

class _SearchPath(NamedTuple):
    """A location in which to search for curated calibration files.

    ``root`` is the top of an ``<instrument>/<calib_type>`` data tree
    (see ``read_one_calib``) and ``children`` holds optional
    subdirectory names (e.g. detector and/or physical filter) below it.
    """

    # Top of the data tree for one calibration type.
    root: ResourcePath
    # Optional subdirectory components joined below ``root``.
    children: tuple[str, ...]

42 

43 

class CuratedCalibration(Protocol):
    """Protocol that describes the methods needed by this class when dealing
    with curated calibration datasets.
    """

    # Alternate constructor: parse a calibration object from a local
    # text file (local paths only; URIs are not supported).
    @classmethod
    def readText(cls, path: str) -> CuratedCalibration: ...

    # Return the calibration's metadata mapping. check_metadata()
    # requires the INSTRUME and OBSTYPE keys and optionally reads
    # DETECTOR and FILTER.
    def getMetadata(self) -> Mapping: ...

53 

54 

def read_one_calib(
    path: _SearchPath,
    chip_id: int | None,
    filter_name: str | None,
    calib_class: type[CuratedCalibration],
) -> tuple[dict[datetime.datetime, CuratedCalibration], str]:
    """Read data for a particular path from the standard format at a
    particular root.

    Parameters
    ----------
    path : `_SearchPath`
        This tuple contains the top level of the data tree
        and then further optional subdirectories in subsequent
        indices. See Notes below for more details.
    chip_id : `int` or `None`
        The identifier for the sensor in question. To be used in
        validation.
    filter_name : `str` or `None`
        The identifier for the filter in question. To be used in
        validation.
    calib_class : `type` [`CuratedCalibration`]
        The class to use to read the curated calibration text file. Must
        support the ``readText()`` method.

    Returns
    -------
    data_dict : `dict`
        A dictionary of calibration objects constructed from the
        appropriate factory class. The key is the validity start time
        as a `datetime.datetime` object.
    calib_type : `str`
        The type of calibrations that have been read and are included
        in the ``data_dict``.

    Notes
    -----
    Curated calibrations are read from the appropriate ``obs_*_data``
    package, and are required to have a common directory structure to
    be identified and ingested properly.  The top-level directories
    are organized by the instrument's ``policyName``.  These names are
    generally all lower-case, but this is not universally true.

    Below the top-level instrument directory are subdirectories named
    after the curated calibration type contained within, with the
    dataset_type_name forced to lowercase.  For calibrations that
    depend on the detector (i.e., the defects), the next level of
    subdirectories should contain directories named with the detector
    name, again forced to lowercase.

    For filter dependent calibrations that do not depend on the
    detector (i.e., transmission_filter), the calibrations should be
    grouped into directories named with the physical filter name
    (again, all lowercase) below the dataset_type_name directory.
    Filter dependent calibrations that do depend on the detector
    (i.e., transmission_system), have physical filter named
    directories below the detector level directories.

    The files themselves are required to be named with the validity start
    date and the appropriate file extension. The date must be in an ISO format
    that can be parsed by `datetime.datetime.fromisoformat`. This can include
    variants commonly used in data packages such as ``2025-04-30T12:23:00`` or
    the more compact ``20250430T123000``.
    """
    extensions = (".ecsv", ".yaml", ".json")

    # Descend from the root through any detector/filter subdirectories.
    search_root = path.root
    for subdir in path.children:
        search_root = search_root.join(subdir, forceDirectory=True)

    # Only match files with one of the supported extensions.
    file_filter = "(" + "|".join(re.escape(ext) for ext in extensions) + ")$"
    files = list(ResourcePath.findFileResources([search_root], file_filter=file_filter))

    # Convention is that data reside in location where the final two parts of
    # the directory root are <instrument>/<calib type>.
    # os.path.split() does not like a trailing "/" directory indicator.
    parts = os.path.split(path.root.path.removesuffix("/"))
    instrument = os.path.split(parts[0])[1]
    calib_type = parts[1]

    data_dict: dict[datetime.datetime, CuratedCalibration] = {}
    for f in files:
        # The file stem is the validity start date.
        date_str = f.updatedExtension("").basename()
        # Assume files are using some form of ISO date string.
        valid_start = datetime.datetime.fromisoformat(date_str)
        # For now readText does not know about URIs.
        with f.as_local() as local_uri:
            data_dict[valid_start] = calib_class.readText(local_uri.ospath)
        check_metadata(data_dict[valid_start], valid_start, instrument, chip_id, filter_name, f, calib_type)
    return data_dict, calib_type

145 

146 

def check_metadata(
    obj: Any,
    valid_start: datetime.datetime,
    instrument: str,
    chip_id: int | None,
    filter_name: str | None,
    filepath: ResourcePath,
    calib_type: str,
) -> None:
    """Check that the metadata is complete and self consistent.

    Parameters
    ----------
    obj : object of same type as the factory
        Object to retrieve metadata from in order to compare with
        metadata inferred from the path.
    valid_start : `datetime.datetime`
        Start of the validity range for data.  Currently unused but
        retained for API compatibility.
    instrument : `str`
        Name of the instrument in question.
    chip_id : `int` or `None`
        Identifier of the sensor in question, or `None` if the
        calibration is not detector-dependent.
    filter_name : `str` or `None`
        Identifier of the filter in question, or `None` if the
        calibration is not filter-dependent.
    filepath : `lsst.resources.ResourcePath`
        Path of the file read to construct the data.
    calib_type : `str`
        Name of the type of data being read.

    Returns
    -------
    None

    Raises
    ------
    ValueError
        If the metadata from the file and the metadata encoded
        in the path do not match for any reason, including an expected
        ``DETECTOR`` or ``FILTER`` entry being absent from the file.
    KeyError
        If the ``INSTRUME`` or ``OBSTYPE`` metadata entries are missing.
    """
    md = obj.getMetadata()
    # It is an error if these two do not exist.
    finst = md["INSTRUME"]
    fcalib_type = md["OBSTYPE"]
    # These may optionally not exist.
    fchip_id = md.get("DETECTOR", None)
    ffilter_name = md.get("FILTER", None)

    # Normalize file values only when present; a missing entry is left
    # as None so that the comparison below raises an informative
    # ValueError instead of TypeError/AttributeError here.
    if chip_id is not None and fchip_id is not None:
        fchip_id = int(fchip_id)
    if filter_name is not None:
        filter_name = filter_name.lower()
        if ffilter_name is not None:
            ffilter_name = ffilter_name.lower()

    # Case-insensitive comparison of path-derived vs file metadata.
    found = (finst.lower(), fchip_id, ffilter_name, fcalib_type.lower())
    expected = (instrument.lower(), chip_id, filter_name, calib_type.lower())
    if found != expected:
        raise ValueError(
            "Path and file metadata do not agree:\n"
            f"Path metadata: {instrument} {chip_id} {filter_name} {calib_type}\n"
            f"File metadata: {finst} {fchip_id} {ffilter_name} {fcalib_type}\n"
            f"File read from : {filepath}\n"
        )

210 

211 

def read_all(
    root: ResourcePathExpression,
    camera: lsst.afw.cameraGeom.Camera,
    calib_class: type[CuratedCalibration],
    required_dimensions: list[str],
    filters: set[str],
) -> tuple[dict[_SearchPath, dict[datetime.datetime, CuratedCalibration]], str]:
    """Read all data from the standard format at a particular root.

    Parameters
    ----------
    root : `lsst.resources.ResourcePathExpression`
        URI to the top level of the data tree. This is expected to hold
        directories named after the sensor names. They are expected to be
        lower case.
    camera : `lsst.afw.cameraGeom.Camera`
        The camera that goes with the data being read.
    calib_class : `type` [`CuratedCalibration`]
        The class to use to read the curated calibration text file. Must
        support the ``readText()`` and ``getMetadata()`` methods.
    required_dimensions : `list` [`str`]
        Dimensions required for the calibration.
    filters : `set` [`str`]
        The known filters for this camera. Used to identify
        filter-dependent calibrations.

    Returns
    -------
    calibration_data : `dict`
        A dictionary of dictionaries of calibration objects
        constructed with the appropriate factory class. The first key
        is the `_SearchPath` describing where the data were found, and
        the second is the validity start time as a `datetime.datetime`
        object.
    calib_type : `str`
        The type of calibrations that have been read and are included
        in the ``data_dict``.

    Raises
    ------
    RuntimeError
        If a directory name does not match a known detector or filter,
        or if no data files were found to ingest.
    ValueError
        If more than one calibration type was read.

    Notes
    -----
    Each leaf object in the constructed dictionary has metadata associated with
    it. The detector ID may be retrieved from the DETECTOR entry of that
    metadata.
    """
    calibration_data: dict[_SearchPath, dict[datetime.datetime, CuratedCalibration]] = {}

    root_uri = ResourcePath(root, forceDirectory=True, forceAbsolute=True)

    # Read all the subdirectories. These will be things like detectors or
    # physical filters. We assume that every sub directory contains calibration
    # data. We only walk the top level.
    _, dirs, _ = next(root_uri.walk())

    # If there are no sub directories this is a global calibration.
    if not dirs:
        dirs = [""]

    calib_types = set()
    # We assume the directories have been lowered.
    detector_map = {det.getName().lower(): det.getName() for det in camera}
    # Filter directory names also replace spaces with underscores.
    filter_map = {filterName.lower().replace(" ", "_"): filterName for filterName in filters}

    # First pass: turn the directory layout into a list of search paths,
    # validating each directory name against the camera as we go.
    paths_to_search: list[_SearchPath] = []
    for dir_name in dirs:
        # Catch possible mistakes:
        if "detector" in required_dimensions:
            if dir_name not in detector_map:
                # Top level directories must be detectors if they're
                # required.
                detectors = list(detector_map)
                max_detectors = 10
                note_str = "knows"
                if len(detectors) > max_detectors:
                    # report example subset
                    note_str = "examples"
                    detectors = detectors[:max_detectors]
                raise RuntimeError(
                    f"Detector {dir_name} not known to supplied camera "
                    f"{camera.getName()} ({note_str}: {','.join(detectors)})"
                )
            elif "physical_filter" in required_dimensions:
                # If the calibration depends on both detector and
                # physical_filter, the subdirs here should contain the
                # filter name.
                subdir = root_uri.join(dir_name, forceDirectory=True)
                _, subdirs, _ = next(subdir.walk())
                for subdir_name in subdirs:
                    if subdir_name not in filter_map:
                        raise RuntimeError(f"Filter {subdir_name} not known to supplied camera in {subdir}.")
                    else:
                        paths_to_search.append(_SearchPath(root_uri, (dir_name, subdir_name)))
            else:
                paths_to_search.append(_SearchPath(root_uri, (dir_name,)))
        elif "physical_filter" in required_dimensions:
            # If detector is not required, but physical_filter is,
            # then the top level should contain the filter
            # directories.
            if dir_name not in filter_map:
                raise RuntimeError(f"Filter {dir_name} not known to supplied camera in {root_uri}.")
            paths_to_search.append(_SearchPath(root_uri, (dir_name,)))
        else:
            # Neither detector nor physical_filter are required, so
            # the calibration is global, and will not be found in
            # subdirectories.
            paths_to_search.append(_SearchPath(root_uri, ()))

    if not paths_to_search:
        raise RuntimeError(f"Found no data files in {root_uri}")

    # Second pass: read every search path, validating each calibration's
    # metadata against the detector/filter implied by its path.
    calib_type = "undefined"
    for path in paths_to_search:
        chip_id = None
        filter_name = None
        if "detector" in required_dimensions:
            # First path component is always the detector directory.
            chip_id = camera[detector_map[path.children[0]]].getId()
        if "physical_filter" in required_dimensions:
            # Filter directory is the last (possibly only) component.
            filter_name = filter_map[path.children[-1]]

        calibration_data[path], calib_type = read_one_calib(path, chip_id, filter_name, calib_class)

        calib_types.add(calib_type)
        if len(calib_types) != 1:  # set.add(None) has length 1 so None is OK here.
            raise ValueError(f"Error mixing calib types: {calib_types}")

    no_data = all(v == {} for v in calibration_data.values())
    if no_data:
        raise RuntimeError("No data to ingest")

    return calibration_data, calib_type