Coverage for python / lsst / obs / base / gen2to3 / calibRepoConverter.py: 13%

117 statements  

« prev     ^ index     » next       coverage.py v7.13.5, created at 2026-05-05 22:29 +0000

1# This file is part of obs_base. 

2# 

3# Developed for the LSST Data Management System. 

4# This product includes software developed by the LSST Project 

5# (https://www.lsst.org). 

6# See the COPYRIGHT file at the top-level directory of this distribution 

7# for details of code ownership. 

8# 

9# This program is free software: you can redistribute it and/or modify 

10# it under the terms of the GNU General Public License as published by 

11# the Free Software Foundation, either version 3 of the License, or 

12# (at your option) any later version. 

13# 

14# This program is distributed in the hope that it will be useful, 

15# but WITHOUT ANY WARRANTY; without even the implied warranty of 

16# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the 

17# GNU General Public License for more details. 

18# 

19# You should have received a copy of the GNU General Public License 

20# along with this program. If not, see <http://www.gnu.org/licenses/>. 

21from __future__ import annotations 

22 

23__all__ = ["CalibRepoConverter"] 

24 

25import os 

26import sqlite3 

27from collections import defaultdict 

28from typing import TYPE_CHECKING, Dict, Iterator, List, Mapping, Optional, Sequence, Tuple 

29 

30import astropy.time 

31import astropy.units as u 

32from lsst.daf.butler import CollectionType, DataCoordinate, FileDataset, Timespan 

33 

34from .repoConverter import RepoConverter 

35from .repoWalker import RepoWalker 

36 

37if TYPE_CHECKING: 

38 from lsst.daf.butler import DatasetType, FormatterParameter, StorageClass 

39 

40 from ..cameraMapper import CameraMapper 

41 from ..mapping import Mapping as CameraMapperMapping # disambiguate from collections.abc.Mapping 

42 from .repoWalker.scanner import PathElementHandler 

43 

44 

class CalibRepoConverter(RepoConverter):
    """A specialization of `RepoConverter` for calibration repositories.

    Parameters
    ----------
    mapper : `CameraMapper`
        Gen2 mapper for the data repository.  The root associated with the
        mapper is ignored and need not match the root of the repository.
    labels : `Sequence` [ `str` ]
        Strings injected into the names of the collections that calibration
        datasets are written and certified into (forwarded as the ``extra``
        argument to `Instrument` methods that generate collection names and
        write curated calibrations).
    **kwargs
        Additional keyword arguments are forwarded to (and required by)
        `RepoConverter`.
    """

    def __init__(self, *, mapper: CameraMapper, labels: Sequence[str] = (), **kwargs):
        # Calibration datasets go into per-calibDate RUN collections chosen by
        # getRun, so no single run is passed to the base class.
        super().__init__(run=None, **kwargs)
        self.mapper = mapper
        # CALIBRATION collection that validity ranges are certified into.
        self.collection = self.task.instrument.makeCalibrationCollectionName(*labels)
        self._labels = tuple(labels)
        # Gen3 dataset types of every target created by makeRepoWalkerTarget.
        self._datasetTypes = set()

    def isDatasetTypeSpecial(self, datasetTypeName: str) -> bool:
        # Docstring inherited from RepoConverter.
        return datasetTypeName in self.instrument.getCuratedCalibrationNames()

    def iterMappings(self) -> Iterator[Tuple[str, CameraMapperMapping]]:
        # Docstring inherited from RepoConverter.
        yield from self.mapper.calibrations.items()

    def makeRepoWalkerTarget(
        self,
        datasetTypeName: str,
        template: str,
        keys: Dict[str, type],
        storageClass: StorageClass,
        formatter: FormatterParameter = None,
        targetHandler: Optional[PathElementHandler] = None,
    ) -> RepoWalker.Target:
        # Docstring inherited from RepoConverter.
        target = RepoWalker.Target(
            datasetTypeName=datasetTypeName,
            storageClass=storageClass,
            template=template,
            keys=keys,
            instrument=self.task.instrument.getName(),
            universe=self.task.registry.dimensions,
            formatter=formatter,
            targetHandler=targetHandler,
            translatorFactory=self.task.translatorFactory,
        )
        self._datasetTypes.add(target.datasetType)
        return target

    def _queryGen2CalibRegistry(
        self, db: sqlite3.Connection, datasetType: DatasetType, calibDate: str
    ) -> Iterator[sqlite3.Row]:
        """Query the Gen2 calibration registry for the validity ranges and
        optionally detectors and filters associated with the given dataset type
        and ``calibDate``.

        Parameters
        ----------
        db : `sqlite3.Connection`
            DBAPI connection to the Gen2 ``calibRegistry.sqlite3`` file.
        datasetType : `DatasetType`
            Gen3 dataset type being queried.
        calibDate : `str`
            String extracted from the ``calibDate`` template entry in Gen2
            filenames.

        Yields
        ------
        row : `sqlite3.Row`
            SQLite result object; will have ``validStart`` and ``validEnd``
            columns, may have a detector column (named
            ``self.task.config.ccdKey``) and/or a ``filter`` column, depending
            on whether ``datasetType.dimensions`` includes ``detector`` and
            ``physical_filter``, respectively.  Nothing is yielded (and a
            warning is logged) if the mapper lists no table for this dataset
            type or the query fails with `sqlite3.OperationalError`.
        """
        fields = ["validStart", "validEnd"]
        if "detector" in datasetType.dimensions.names:
            fields.append(self.task.config.ccdKey)
        else:
            fields.append(f"NULL AS {self.task.config.ccdKey}")
        if "physical_filter" in datasetType.dimensions.names:
            fields.append("filter")
        else:
            assert "band" not in datasetType.dimensions.names
            fields.append("NULL AS filter")
        tables = self.mapper.mappings[datasetType.name].tables
        if not tables:
            # There is no table to report here, so only the dataset type and
            # root are interpolated (indexing ``tables`` would raise).
            self.task.log.warning(
                "Could not extract calibration ranges for %s in %s; no tables in Gen2 mapper.",
                datasetType.name,
                self.root,
            )
            return
        # Table and column names come from the Gen2 mapper configuration, not
        # from user input; only the calibDate value is bound as a parameter.
        query = f"SELECT DISTINCT {', '.join(fields)} FROM {tables[0]} WHERE calibDate = ?;"
        try:
            results = db.execute(query, (calibDate,))
        except sqlite3.OperationalError as e:
            self.task.log.warning(
                "Could not extract calibration ranges for %s in %s from table %s: %r",
                datasetType.name,
                self.root,
                tables[0],
                e,
            )
            return
        yield from results

    def _finish(
        self, datasets: Mapping[DatasetType, Mapping[Optional[str], List[FileDataset]]], count: int
    ) -> None:
        # Docstring inherited from RepoConverter.
        # Read the Gen2 calibration registry and extract validity ranges for
        # all datasetType + calibDate combinations we ingested.
        calibFile = os.path.join(self.root, "calibRegistry.sqlite3")
        # If the registry file does not exist this indicates a problem.
        # We check explicitly because sqlite will try to create the
        # missing file if it can.
        if not os.path.exists(calibFile):
            raise RuntimeError(
                f"Attempting to convert calibrations but no registry database found in {self.root}"
            )

        db = sqlite3.connect(calibFile)
        try:
            db.row_factory = sqlite3.Row
            timespansByDataId = self._readGen2Timespans(db, datasets, count)
        finally:
            # sqlite3.Connection's context manager only commits/rolls back and
            # does not close the connection, so close it explicitly.
            db.close()

        refsByTimespan = self._resolveTimespans(timespansByDataId)

        # Done reading from Gen2, time to certify into Gen3.
        self.task.registry.registerCollection(self.collection, type=CollectionType.CALIBRATION)
        for timespan, refs in refsByTimespan.items():
            self.task.registry.certify(self.collection, refs, timespan)

    def _readGen2Timespans(
        self,
        db: sqlite3.Connection,
        datasets: Mapping[DatasetType, Mapping[Optional[str], List[FileDataset]]],
        count: int,
    ) -> Dict[Tuple[DataCoordinate, str], List[tuple]]:
        """Collect Gen2 validity ranges for every ingested calibration dataset.

        Parameters
        ----------
        db : `sqlite3.Connection`
            Open connection to the Gen2 ``calibRegistry.sqlite3`` file, with
            ``row_factory`` set to `sqlite3.Row`.
        datasets : `Mapping`
            Ingested datasets, grouped by dataset type and then ``calibDate``
            (as passed to `_finish`).
        count : `int`
            Total used for the progress bar.

        Returns
        -------
        timespansByDataId : `dict`
            Mapping from ``(dataId, datasetTypeName)`` to a list of
            ``(Timespan, DatasetRef)`` tuples, one per Gen2 registry row that
            matched an ingested dataset.
        """
        # Initially we collate timespans for each dataId + dataset type
        # combination.  This allows us to check for small gaps or overlaps
        # inherent in the ambiguous usage of validity ranges in gen2.
        timespansByDataId = defaultdict(list)

        with self.progress.bar(desc="Querying Gen2 calibRegistry", total=count) as progressBar:
            for datasetType, datasetsByCalibDate in datasets.items():
                if not datasetType.isCalibration():
                    continue
                gen2keys = {}
                if "detector" in datasetType.dimensions.names:
                    gen2keys[self.task.config.ccdKey] = int
                if "physical_filter" in datasetType.dimensions.names:
                    gen2keys["filter"] = str
                translator = self.instrument.makeDataIdTranslatorFactory().makeMatching(
                    datasetType.name, gen2keys, instrument=self.instrument.getName()
                )
                for calibDate, datasetsForCalibDate in datasetsByCalibDate.items():
                    assert calibDate is not None, (
                        "datasetType.isCalibration() is set by "
                        "the presence of calibDate in the Gen2 template"
                    )
                    # Build a mapping that lets us find DatasetRefs by data ID,
                    # for this DatasetType and calibDate.  We know there is
                    # only one ref for each data ID (given DatasetType and
                    # calibDate as well).
                    refsByDataId = {}
                    for dataset in datasetsForCalibDate:
                        refsByDataId.update((ref.dataId, ref) for ref in dataset.refs)
                    # Query the Gen2 calibration repo for the validity ranges
                    # for this DatasetType and calibDate, and look up the
                    # appropriate refs by data ID.
                    for row in self._queryGen2CalibRegistry(db, datasetType, calibDate):
                        # For validity times we use TAI as some gen2 repos have
                        # validity dates very far in the past or future.
                        timespan = Timespan(
                            astropy.time.Time(row["validStart"], format="iso", scale="tai"),
                            astropy.time.Time(row["validEnd"], format="iso", scale="tai"),
                        )
                        # Make a Gen2 data ID from query results.
                        gen2id = {}
                        if "detector" in datasetType.dimensions.names:
                            gen2id[self.task.config.ccdKey] = row[self.task.config.ccdKey]
                        if "physical_filter" in datasetType.dimensions.names:
                            gen2id["filter"] = row["filter"]
                        # Translate that to Gen3.
                        gen3id, _ = translator(gen2id)
                        dataId = DataCoordinate.standardize(gen3id, graph=datasetType.dimensions)
                        ref = refsByDataId.get(dataId)
                        if ref is not None:
                            # Validity ranges must not overlap for the same
                            # dataID datasetType combination.  Use that as a
                            # primary key and store the timespan and ref in a
                            # tuple as the value for later timespan validation.
                            timespansByDataId[(ref.dataId, ref.datasetType.name)].append((timespan, ref))
                        else:
                            # The Gen2 calib registry mentions this dataset,
                            # but it isn't included in what we've ingested.
                            # This usually means only a subset of the Gen2
                            # repo is being converted, and there may be a
                            # _lot_ of these, so log at debug level only.
                            self.task.log.debug(
                                "Gen2 calibration registry entry has no dataset: %s for calibDate=%s, %s.",
                                datasetType.name,
                                calibDate,
                                dataId,
                            )
                    progressBar.update(len(datasetsForCalibDate))
        return timespansByDataId

    def _resolveTimespans(
        self, timespansByDataId: Mapping[Tuple[DataCoordinate, str], List[tuple]]
    ) -> Dict[Timespan, list]:
        """Close small gaps and overlaps between consecutive validity ranges.

        Gaps or overlaps shorter than about a day are assumed to come from
        differing Gen2 conventions.  They are removed by moving the end of the
        earlier range to the start of the later one, since ``validStart`` is
        always trusted.  Deduplicated info/warning messages describing each
        correction are logged once all ranges have been processed.

        Parameters
        ----------
        timespansByDataId : `Mapping`
            Mapping from ``(dataId, datasetTypeName)`` to a list of
            ``(Timespan, DatasetRef)`` tuples, as returned by
            `_readGen2Timespans`.

        Returns
        -------
        refsByTimespan : `dict` [ `Timespan`, `list` [ `DatasetRef` ] ]
            Corrected validity ranges, each mapped to the refs to certify with
            it (`Registry.certify` takes one Timespan and many refs).
        """
        refsByTimespan = defaultdict(list)

        # A day with a bit of fuzz to indicate the largest gap we will close.
        max_gap = astropy.time.TimeDelta(1.001, format="jd", scale="tai")

        def beginSortKey(item: tuple) -> float:
            # Sort on the begin time explicitly: Timespan's own ordering is not
            # a total order for overlapping ranges, which are exactly the ones
            # this method must repair.  Unbounded (or otherwise non-Time)
            # begins sort first.
            begin = item[0].begin
            if not isinstance(begin, astropy.time.Time):
                return float("-inf")
            return begin.tai.mjd

        # Since in many cases the validity ranges are relevant for multiple
        # dataset types and dataIds we don't want to over-report and so
        # cache the messages for later.
        info_messages = set()
        warn_messages = set()
        for timespans in self.progress.wrap(timespansByDataId.values(), desc="Fixing validity ranges"):
            sorted_timespans = sorted(timespans, key=beginSortKey)
            timespan_prev, ref_prev = sorted_timespans[0]
            for timespan, ref in sorted_timespans[1:]:
                begin = timespan.begin
                prev_end = timespan_prev.end
                # An unbounded bound cannot form a "small" gap; leave such
                # pairs untouched and let Registry.certify judge any conflict.
                if isinstance(begin, astropy.time.Time) and isinstance(prev_end, astropy.time.Time):
                    # See if we have a suspicious gap.
                    delta = begin - prev_end
                    abs_delta = abs(delta)
                    if abs_delta > 0 and abs_delta < max_gap:
                        if delta > 0:
                            # Gap between timespans.
                            msg = f"Calibration validity gap closed from {timespan_prev.end} to {timespan.begin}"
                            info_messages.add(msg)
                        else:
                            # Overlap of timespans.
                            msg = (
                                f"Calibration validity overlap of {abs(delta).to(u.s)} removed for period "
                                f"{timespan.begin} to {timespan_prev.end}"
                            )
                            warn_messages.add(msg)

                        self.task.log.debug(
                            "Correcting validity range for %s with end %s", ref_prev, timespan_prev.end
                        )

                        # Assume this gap is down to convention in gen2.
                        # We have to adjust the previous timespan to fit
                        # since we always trust validStart.
                        timespan_prev = Timespan(begin=timespan_prev.begin, end=begin)
                # The previous timespan and ref have now been verified.
                refsByTimespan[timespan_prev].append(ref_prev)
                timespan_prev, ref_prev = timespan, ref

            # Store the final timespan/ref pair.
            refsByTimespan[timespan_prev].append(ref_prev)

        # Issue any pending log messages we have recorded.
        for msg in sorted(info_messages):
            self.task.log.info(msg)
        for msg in sorted(warn_messages):
            self.task.log.warning(msg)

        return refsByTimespan

    def getRun(self, datasetTypeName: str, calibDate: Optional[str] = None) -> str:
        # Docstring inherited from RepoConverter.
        if calibDate is None:
            return super().getRun(datasetTypeName)
        else:
            return self.instrument.makeCalibrationCollectionName(
                *self._labels,
                self.instrument.formatCollectionTimestamp(calibDate),
            )

    # Class attributes that will be shadowed by public instance attributes;
    # defined here only for documentation purposes.

    mapper: CameraMapper
    """Gen2 mapper associated with this repository.
    """