Coverage for python / lsst / obs / base / gen2to3 / calibRepoConverter.py: 13%
117 statements
« prev ^ index » next coverage.py v7.13.5, created at 2026-05-05 22:29 +0000
« prev ^ index » next coverage.py v7.13.5, created at 2026-05-05 22:29 +0000
1# This file is part of obs_base.
2#
3# Developed for the LSST Data Management System.
4# This product includes software developed by the LSST Project
5# (https://www.lsst.org).
6# See the COPYRIGHT file at the top-level directory of this distribution
7# for details of code ownership.
8#
9# This program is free software: you can redistribute it and/or modify
10# it under the terms of the GNU General Public License as published by
11# the Free Software Foundation, either version 3 of the License, or
12# (at your option) any later version.
13#
14# This program is distributed in the hope that it will be useful,
15# but WITHOUT ANY WARRANTY; without even the implied warranty of
16# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
17# GNU General Public License for more details.
18#
19# You should have received a copy of the GNU General Public License
20# along with this program. If not, see <http://www.gnu.org/licenses/>.
21from __future__ import annotations
23__all__ = ["CalibRepoConverter"]
25import os
26import sqlite3
27from collections import defaultdict
28from typing import TYPE_CHECKING, Dict, Iterator, List, Mapping, Optional, Sequence, Tuple
30import astropy.time
31import astropy.units as u
32from lsst.daf.butler import CollectionType, DataCoordinate, FileDataset, Timespan
34from .repoConverter import RepoConverter
35from .repoWalker import RepoWalker
37if TYPE_CHECKING:
38 from lsst.daf.butler import DatasetType, FormatterParameter, StorageClass
40 from ..cameraMapper import CameraMapper
41 from ..mapping import Mapping as CameraMapperMapping # disambiguate from collections.abc.Mapping
42 from .repoWalker.scanner import PathElementHandler
class CalibRepoConverter(RepoConverter):
    """A specialization of `RepoConverter` for calibration repositories.

    Parameters
    ----------
    mapper : `CameraMapper`
        Gen2 mapper for the data repository.  The root associated with the
        mapper is ignored and need not match the root of the repository.
    labels : `Sequence` [ `str` ]
        Strings injected into the names of the collections that calibration
        datasets are written and certified into (forwarded as the ``extra``
        argument to `Instrument` methods that generate collection names and
        write curated calibrations).
    **kwargs
        Additional keyword arguments are forwarded to (and required by)
        `RepoConverter`.
    """

    def __init__(self, *, mapper: CameraMapper, labels: Sequence[str] = (), **kwargs):
        # Calibration datasets get per-calibDate runs (see `getRun`), so no
        # single default run is passed to the base class.
        super().__init__(run=None, **kwargs)
        self.mapper = mapper
        self.collection = self.task.instrument.makeCalibrationCollectionName(*labels)
        self._labels = tuple(labels)
        # Dataset types for which a RepoWalker target has been created.
        self._datasetTypes = set()

    def isDatasetTypeSpecial(self, datasetTypeName: str) -> bool:
        # Docstring inherited from RepoConverter.
        return datasetTypeName in self.instrument.getCuratedCalibrationNames()

    def iterMappings(self) -> Iterator[Tuple[str, CameraMapperMapping]]:
        # Docstring inherited from RepoConverter.
        yield from self.mapper.calibrations.items()

    def makeRepoWalkerTarget(
        self,
        datasetTypeName: str,
        template: str,
        keys: Dict[str, type],
        storageClass: StorageClass,
        formatter: FormatterParameter = None,
        targetHandler: Optional[PathElementHandler] = None,
    ) -> RepoWalker.Target:
        # Docstring inherited from RepoConverter.
        target = RepoWalker.Target(
            datasetTypeName=datasetTypeName,
            storageClass=storageClass,
            template=template,
            keys=keys,
            instrument=self.task.instrument.getName(),
            universe=self.task.registry.dimensions,
            formatter=formatter,
            targetHandler=targetHandler,
            translatorFactory=self.task.translatorFactory,
        )
        # Remember every dataset type we create a walker target for.
        self._datasetTypes.add(target.datasetType)
        return target

    def _queryGen2CalibRegistry(
        self, db: sqlite3.Connection, datasetType: DatasetType, calibDate: str
    ) -> Iterator[sqlite3.Row]:
        """Query the Gen2 calibration registry for the validity ranges and
        optionally detectors and filters associated with the given dataset type
        and ``calibDate``.

        Parameters
        ----------
        db : `sqlite3.Connection`
            DBAPI connection to the Gen2 ``calibRegistry.sqlite3`` file.
        datasetType : `DatasetType`
            Gen3 dataset type being queried.
        calibDate : `str`
            String extracted from the ``calibDate`` template entry in Gen2
            filenames.

        Yields
        ------
        row : `sqlite3.Row`
            SQLite result object; will have ``validStart`` and ``validEnd``
            columns, may have a detector column (named
            ``self.task.config.ccdKey``) and/or a ``filter`` column, depending
            on whether ``datasetType.dimensions`` includes ``detector`` and
            ``physical_filter``, respectively.

        Notes
        -----
        If the Gen2 mapper declares no tables for the dataset type, or the
        query fails with `sqlite3.OperationalError`, a warning is logged and
        nothing is yielded.
        """
        ccdKey = self.task.config.ccdKey
        dimensionNames = datasetType.dimensions.names
        fields = ["validStart", "validEnd"]
        # Columns that do not apply to this dataset type are selected as NULL
        # so every row has the same set of column names.
        if "detector" in dimensionNames:
            fields.append(ccdKey)
        else:
            fields.append(f"NULL AS {ccdKey}")
        if "physical_filter" in dimensionNames:
            fields.append("filter")
        else:
            assert "band" not in dimensionNames
            fields.append("NULL AS filter")
        tables = self.mapper.mappings[datasetType.name].tables
        if tables is None or len(tables) == 0:
            # There is no table to name here: indexing ``tables`` in this
            # branch would raise instead of logging the warning.
            self.task.log.warning(
                "Could not extract calibration ranges for %s in %s; no tables in Gen2 mapper.",
                datasetType.name,
                self.root,
            )
            return
        table = tables[0]
        # Column and table names come from the mapper/config and cannot be
        # bound as parameters; the calibDate value is bound.
        query = f"SELECT DISTINCT {', '.join(fields)} FROM {table} WHERE calibDate = ?;"
        try:
            results = db.execute(query, (calibDate,))
        except sqlite3.OperationalError as e:
            self.task.log.warning(
                "Could not extract calibration ranges for %s in %s from table %s: %r",
                datasetType.name,
                self.root,
                table,
                e,
            )
            return
        yield from results

    def _finish(
        self, datasets: Mapping[DatasetType, Mapping[Optional[str], List[FileDataset]]], count: int
    ) -> None:
        # Docstring inherited from RepoConverter.
        # Read Gen2 calibration repository and extract validity ranges for
        # all datasetType + calibDate combinations we ingested.
        calibFile = os.path.join(self.root, "calibRegistry.sqlite3")
        # If the registry file does not exist this indicates a problem.
        # We check explicitly because sqlite will try to create the
        # missing file if it can.
        if not os.path.exists(calibFile):
            raise RuntimeError(
                f"Attempting to convert calibrations but no registry database found in {self.root}"
            )

        # Initially we collate timespans for each dataId + dataset type
        # combination. This allows us to check for small gaps or overlaps
        # inherent in the ambiguous usage of validity ranges in gen2
        timespansByDataId = defaultdict(list)

        ccdKey = self.task.config.ccdKey
        db = sqlite3.connect(calibFile)
        # Make sure the connection is released even if a query, translation
        # or timespan construction below raises.
        try:
            db.row_factory = sqlite3.Row

            with self.progress.bar(desc="Querying Gen2 calibRegistry", total=count) as progressBar:
                for datasetType, datasetsByCalibDate in datasets.items():
                    if not datasetType.isCalibration():
                        continue
                    # These do not change per row, so evaluate them once per
                    # dataset type.
                    hasDetector = "detector" in datasetType.dimensions.names
                    hasFilter = "physical_filter" in datasetType.dimensions.names
                    gen2keys = {}
                    if hasDetector:
                        gen2keys[ccdKey] = int
                    if hasFilter:
                        gen2keys["filter"] = str
                    translator = self.instrument.makeDataIdTranslatorFactory().makeMatching(
                        datasetType.name, gen2keys, instrument=self.instrument.getName()
                    )
                    for calibDate, datasetsForCalibDate in datasetsByCalibDate.items():
                        assert calibDate is not None, (
                            "datasetType.isCalibration() is set by "
                            "the presence of calibDate in the Gen2 template"
                        )
                        # Build a mapping that lets us find DatasetRefs by data ID,
                        # for this DatasetType and calibDate.  We know there is
                        # only one ref for each data ID (given DatasetType and
                        # calibDate as well).
                        refsByDataId = {}
                        for dataset in datasetsForCalibDate:
                            refsByDataId.update((ref.dataId, ref) for ref in dataset.refs)
                        # Query the Gen2 calibration repo for the validity ranges
                        # for this DatasetType and calibDate, and look up the
                        # appropriate refs by data ID.
                        for row in self._queryGen2CalibRegistry(db, datasetType, calibDate):
                            # For validity times we use TAI as some gen2 repos have
                            # validity dates very far in the past or future.
                            timespan = Timespan(
                                astropy.time.Time(row["validStart"], format="iso", scale="tai"),
                                astropy.time.Time(row["validEnd"], format="iso", scale="tai"),
                            )
                            # Make a Gen2 data ID from query results.
                            gen2id = {}
                            if hasDetector:
                                gen2id[ccdKey] = row[ccdKey]
                            if hasFilter:
                                gen2id["filter"] = row["filter"]
                            # Translate that to Gen3.
                            gen3id, _ = translator(gen2id)
                            dataId = DataCoordinate.standardize(gen3id, graph=datasetType.dimensions)
                            ref = refsByDataId.get(dataId)
                            if ref is not None:
                                # Validity ranges must not overlap for the same
                                # dataID datasetType combination. Use that as a
                                # primary key and store the timespan and ref in a
                                # tuple as the value for later timespan validation.
                                timespansByDataId[(ref.dataId, ref.datasetType.name)].append(
                                    (timespan, ref)
                                )
                            else:
                                # The Gen2 calib registry mentions this dataset,
                                # but it isn't included in what we've ingested.
                                # This might sometimes be a problem, but it should
                                # usually represent someone just trying to convert
                                # a subset of the Gen2 repo, so I don't think it's
                                # appropriate to warn or even log at info, since in
                                # that case there may be a _lot_ of these messages.
                                self.task.log.debug(
                                    "Gen2 calibration registry entry has no dataset: %s for calibDate=%s, %s.",
                                    datasetType.name,
                                    calibDate,
                                    dataId,
                                )
                        progressBar.update(len(datasetsForCalibDate))
        finally:
            db.close()

        # Analyze the timespans to check for overlap problems
        # Gaps of a day should be closed since we assume differing
        # conventions in gen2 repos.

        # We need to correct any validity range issues and store the
        # results in a dict-of-lists keyed by Timespan, since
        # Registry.certify operates on one Timespan and multiple refs at a
        # time.
        refsByTimespan = defaultdict(list)

        # A day with a bit of fuzz to indicate the largest gap we will close
        max_gap = astropy.time.TimeDelta(1.001, format="jd", scale="tai")

        # Since in many cases the validity ranges are relevant for multiple
        # dataset types and dataIds we don't want to over-report and so
        # cache the messages for later.
        info_messages = set()
        warn_messages = set()
        for timespans in self.progress.wrap(timespansByDataId.values(), desc="Fixing validity ranges"):
            # Sort all the timespans and check overlaps.  Every list stored
            # in timespansByDataId has at least one entry, so pop(0) is safe.
            sorted_timespans = sorted(timespans, key=lambda x: x[0])
            timespan_prev, ref_prev = sorted_timespans.pop(0)
            for timespan, ref in sorted_timespans:
                # See if we have a suspicious gap
                delta = timespan.begin - timespan_prev.end
                abs_delta = abs(delta)
                if abs_delta > 0 and abs_delta < max_gap:
                    if delta > 0:
                        # Gap between timespans
                        msg = f"Calibration validity gap closed from {timespan_prev.end} to {timespan.begin}"
                        info_messages.add(msg)
                    else:
                        # Overlap of timespans
                        msg = (
                            f"Calibration validity overlap of {abs_delta.to(u.s)} removed for period "
                            f"{timespan.begin} to {timespan_prev.end}"
                        )
                        warn_messages.add(msg)

                    self.task.log.debug(
                        "Correcting validity range for %s with end %s", ref_prev, timespan_prev.end
                    )

                    # Assume this gap is down to convention in gen2.
                    # We have to adjust the previous timespan to fit
                    # since we always trust validStart.
                    timespan_prev = Timespan(begin=timespan_prev.begin, end=timespan.begin)
                # Store the previous timespan and ref since it has now
                # been verified
                refsByTimespan[timespan_prev].append(ref_prev)

                # And update the previous values for the next iteration
                timespan_prev = timespan
                ref_prev = ref

            # Store the final timespan/ref pair
            refsByTimespan[timespan_prev].append(ref_prev)

        # Issue any pending log messages we have recorded
        for msg in sorted(info_messages):
            self.task.log.info(msg)
        for msg in sorted(warn_messages):
            self.task.log.warning(msg)

        # Done reading from Gen2, time to certify into Gen3.
        self.task.registry.registerCollection(self.collection, type=CollectionType.CALIBRATION)
        for timespan, refs in refsByTimespan.items():
            self.task.registry.certify(self.collection, refs, timespan)

    def getRun(self, datasetTypeName: str, calibDate: Optional[str] = None) -> str:
        # Docstring inherited from RepoConverter.
        if calibDate is None:
            return super().getRun(datasetTypeName)
        else:
            # The calibDate is appended to the labels, so each calibDate gets
            # its own collection name.
            return self.instrument.makeCalibrationCollectionName(
                *self._labels,
                self.instrument.formatCollectionTimestamp(calibDate),
            )

    # Class attributes that will be shadowed by public instance attributes;
    # defined here only for documentation purposes.

    mapper: CameraMapper
    """Gen2 mapper associated with this repository.
    """