Coverage for python / lsst / daf / butler / direct_query_driver / _result_page_converter.py: 31%

173 statements  

« prev     ^ index     » next       coverage.py v7.13.5, created at 2026-04-28 08:36 +0000

1# This file is part of daf_butler. 

2# 

3# Developed for the LSST Data Management System. 

4# This product includes software developed by the LSST Project 

5# (http://www.lsst.org). 

6# See the COPYRIGHT file at the top-level directory of this distribution 

7# for details of code ownership. 

8# 

9# This software is dual licensed under the GNU General Public License and also 

10# under a 3-clause BSD license. Recipients may choose which of these licenses 

11# to use; please see the files gpl-3.0.txt and/or bsd_license.txt, 

12# respectively. If you choose the GPL option then the following text applies 

13# (but note that there is still no warranty even if you opt for BSD instead): 

14# 

15# This program is free software: you can redistribute it and/or modify 

16# it under the terms of the GNU General Public License as published by 

17# the Free Software Foundation, either version 3 of the License, or 

18# (at your option) any later version. 

19# 

20# This program is distributed in the hope that it will be useful, 

21# but WITHOUT ANY WARRANTY; without even the implied warranty of 

22# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the 

23# GNU General Public License for more details. 

24# 

25# You should have received a copy of the GNU General Public License 

26# along with this program. If not, see <http://www.gnu.org/licenses/>. 

27 

28from __future__ import annotations 

29 

30import datetime 

31from abc import abstractmethod 

32from collections.abc import Iterable 

33from dataclasses import dataclass 

34from typing import TYPE_CHECKING, Any 

35 

36import astropy.time 

37import sqlalchemy 

38 

39from .._dataset_ref import DatasetRef 

40from .._dataset_type import DatasetType 

41from ..dimensions import ( 

42 DataCoordinate, 

43 DimensionElement, 

44 DimensionGroup, 

45 DimensionRecord, 

46 DimensionRecordSet, 

47 SkyPixDimension, 

48) 

49from ..dimensions.record_cache import DimensionRecordCache 

50from ..queries import tree as qt 

51from ..queries.driver import ( 

52 DataCoordinateResultPage, 

53 DatasetRefResultPage, 

54 DimensionRecordResultPage, 

55 GeneralResultPage, 

56 ResultPage, 

57) 

58from ..queries.result_specs import ( 

59 DataCoordinateResultSpec, 

60 DatasetRefResultSpec, 

61 DimensionRecordResultSpec, 

62 GeneralResultSpec, 

63) 

64from ..timespan_database_representation import TimespanDatabaseRepresentation 

65 

66if TYPE_CHECKING: 

67 from ..registry.interfaces import Database 

68 

69 

class ResultPageConverter:
    """Abstract interface for turning raw SQL result rows into a
    `ResultPage`.
    """

    @abstractmethod
    def convert(self, raw_rows: Iterable[sqlalchemy.Row]) -> ResultPage:
        """Translate an iterable of raw database rows into a single
        `ResultPage` of high-level `Butler` objects.

        Parameters
        ----------
        raw_rows : `~collections.abc.Iterable` [ `sqlalchemy.Row` ]
            SQLAlchemy result rows; any `Postprocessing` filtering has
            already been applied to them.

        Returns
        -------
        result_page : `ResultPage`
            Page of converted results.
        """
        raise NotImplementedError()

90 

91 

@dataclass(frozen=True)
class ResultPageConverterContext:
    """Parameters used by all result page converters."""

    # Database abstraction; converters use it for the timespan column
    # representation and for shrinking over-long column names.
    db: Database
    # Order of dimension-key columns in the raw SQL result rows.
    column_order: qt.ColumnOrder
    # Client-side cache of records for "cached" dimension elements.
    dimension_record_cache: DimensionRecordCache

99 

100 

class DimensionRecordResultPageConverter(ResultPageConverter):  # numpydoc ignore=PR01
    """Build pages of `DimensionRecord` query results from raw SQL rows."""

    def __init__(self, spec: DimensionRecordResultSpec, ctx: ResultPageConverterContext) -> None:
        self._result_spec = spec
        # use_cache=False: records are always constructed from the row data
        # rather than looked up in the client-side cache.
        self._converter = _create_dimension_record_row_converter(spec.element, ctx, use_cache=False)

    def convert(self, raw_rows: Iterable[sqlalchemy.Row]) -> DimensionRecordResultPage:
        records = DimensionRecordSet(self._result_spec.element)
        convert_row = self._converter.convert
        for row in raw_rows:
            records.add(convert_row(row))
        return DimensionRecordResultPage(spec=self._result_spec, rows=records)

113 

114 

def _create_dimension_record_row_converter(
    element: DimensionElement, ctx: ResultPageConverterContext, *, use_cache: bool = True
) -> _DimensionRecordRowConverter:
    # Select the row-to-record converter appropriate for this dimension
    # element: cached lookup when allowed, skypix computation, or a plain
    # column-by-column conversion.
    if use_cache and element.is_cached:
        return _CachedDimensionRecordRowConverter(element, ctx.dimension_record_cache)
    if isinstance(element, SkyPixDimension):
        return _SkypixDimensionRecordRowConverter(element)
    return _NormalDimensionRecordRowConverter(element, ctx.db)

124 

125 

126class _DimensionRecordRowConverter: 

127 """Interface definition for helper objects that convert a result row into a 

128 DimensionRecord instance. 

129 """ 

130 

131 @abstractmethod 

132 def convert(self, row: sqlalchemy.Row) -> DimensionRecord: 

133 raise NotImplementedError() 

134 

135 

class _NormalDimensionRecordRowConverter(_DimensionRecordRowConverter):
    """Convert result rows into `DimensionRecord` instances for typical
    dimensions (all non-skypix dimensions).
    """

    def __init__(self, element: DimensionElement, db: Database) -> None:
        self._db = db
        self._element = element
        self._record_cls = element.RecordClass

        # Pairs of (record attribute name, result-row column name), kept as a
        # list of tuples since we only ever iterate over them.  Key columns
        # use bare dimension names; other fields use qualified names.
        pairs = list(zip(element.schema.dimensions.names, element.dimensions.names))
        pairs.extend(
            (field, qt.ColumnSet.get_qualified_name(element.name, field))
            for field in element.schema.remainder.names
            if field != "timespan"
        )
        self._column_map = pairs

        # Timespans need a DB-specific extractor, so they are handled
        # separately from the plain columns above.
        self._timespan_qualified_name: str | None = (
            qt.ColumnSet.get_qualified_name(element.name, "timespan") if element.temporal else None
        )

    def convert(self, row: sqlalchemy.Row) -> DimensionRecord:
        mapping = row._mapping
        values = {attr: mapping[column] for attr, column in self._column_map}
        if self._timespan_qualified_name is not None:
            values["timespan"] = self._db.getTimespanRepresentation().extract(
                mapping, name=self._timespan_qualified_name
            )
        return self._record_cls(**values)

172 

173 

class _SkypixDimensionRecordRowConverter(_DimensionRecordRowConverter):
    """Convert result rows into `DimensionRecord` instances for skypix
    dimensions.
    """

    def __init__(self, element: SkyPixDimension):
        self._pixelization = element.pixelization
        self._id_qualified_name = qt.ColumnSet.get_qualified_name(element.name, None)
        self._record_cls = element.RecordClass

    def convert(self, row: sqlalchemy.Row) -> DimensionRecord:
        pixel_id = row._mapping[self._id_qualified_name]
        # The region is computed client-side from the pixel index rather
        # than read from the database row.
        region = self._pixelization.pixel(pixel_id)
        return self._record_cls(id=pixel_id, region=region)

187 

188 

class _CachedDimensionRecordRowConverter(_DimensionRecordRowConverter):
    """Convert result rows into `DimensionRecord` instances for "cached"
    dimensions.  These are dimensions with few records, used by many/all
    dataset types; a complete client-side cache of their records is kept
    instead of re-fetching them from the DB constantly.
    """

    def __init__(self, element: DimensionElement, cache: DimensionRecordCache) -> None:
        self._element = element
        self._cache = cache
        # Only the required-key columns are needed to look a record up in
        # the cache.
        self._key_columns = [
            qt.ColumnSet.get_qualified_name(name, None) for name in element.required.names
        ]

    def convert(self, row: sqlalchemy.Row) -> DimensionRecord:
        mapping = row._mapping
        key = tuple(mapping[column] for column in self._key_columns)
        return self._cache[self._element.name].find_with_required_values(key)

205 

206 

class DataCoordinateResultPageConverter(ResultPageConverter):  # numpydoc ignore=PR01
    """Build pages of `DataCoordinate` query results from raw SQL result
    iterables.
    """

    def __init__(
        self,
        spec: DataCoordinateResultSpec,
        ctx: ResultPageConverterContext,
    ) -> None:
        self._spec = spec
        self._converter = _DataCoordinateRowConverter(
            spec.dimensions, ctx, include_dimension_records=spec.include_dimension_records
        )

    def convert(
        self,
        raw_rows: Iterable[sqlalchemy.Row],
    ) -> DataCoordinateResultPage:
        # Bind the bound method once; this loop can run over large pages.
        to_coordinate = self._converter.convert
        return DataCoordinateResultPage(
            spec=self._spec, rows=[to_coordinate(row) for row in raw_rows]
        )

229 

230 

class DatasetRefResultPageConverter(ResultPageConverter):  # numpydoc ignore=PR01
    """Convert raw SQL result iterables into pages of `DatasetRef` query
    results.
    """

    def __init__(
        self,
        spec: DatasetRefResultSpec,
        dataset_type: DatasetType,
        ctx: ResultPageConverterContext,
    ) -> None:
        self._spec = spec
        self._dataset_type = dataset_type
        self._data_coordinate_converter = _DataCoordinateRowConverter(
            spec.dimensions, ctx, include_dimension_records=spec.include_dimension_records
        )
        self._column_order = ctx.column_order
        self._name_shrinker = ctx.db.name_shrinker
        # The shrunken column names depend only on the result spec, which is
        # fixed at construction, so compute them once here instead of on
        # every call to `convert`.
        self._run_column = self._name_shrinker.shrink(
            qt.ColumnSet.get_qualified_name(spec.dataset_type_name, "run")
        )
        self._dataset_id_column = self._name_shrinker.shrink(
            qt.ColumnSet.get_qualified_name(spec.dataset_type_name, "dataset_id")
        )

    def convert(
        self,
        raw_rows: Iterable[sqlalchemy.Row],
    ) -> DatasetRefResultPage:
        """Convert raw rows into a page of `DatasetRef` results.

        Parameters
        ----------
        raw_rows : `~collections.abc.Iterable` [ `sqlalchemy.Row` ]
            Raw result rows.

        Returns
        -------
        result_page : `DatasetRefResultPage`
            Converted results.
        """
        rows = [
            DatasetRef(
                datasetType=self._dataset_type,
                dataId=self._data_coordinate_converter.convert(row),
                run=row._mapping[self._run_column],
                id=row._mapping[self._dataset_id_column],
                # conform=False skips re-standardization of the data ID;
                # presumably query results are already in standard form.
                conform=False,
            )
            for row in raw_rows
        ]

        return DatasetRefResultPage(spec=self._spec, rows=rows)

272 

273 

274class _DataCoordinateRowConverter: 

275 """Helper for converting a raw SQL result row into a DataCoordinate 

276 instance. 

277 """ 

278 

279 def __init__( 

280 self, 

281 dimensions: DimensionGroup, 

282 ctx: ResultPageConverterContext, 

283 include_dimension_records: bool, 

284 ): 

285 assert list(dimensions.data_coordinate_keys) == ctx.column_order.dimension_key_names, ( 

286 "Dimension keys in result row should be in same order as those specified by the result spec" 

287 ) 

288 

289 self._dimensions = dimensions 

290 self._column_order = ctx.column_order 

291 self._dimension_record_converter = None 

292 if include_dimension_records: 

293 self._dimension_record_converter = _DimensionGroupRecordRowConverter(dimensions, ctx) 

294 

295 def convert(self, row: sqlalchemy.Row) -> DataCoordinate: 

296 coordinate = DataCoordinate.from_full_values( 

297 self._dimensions, tuple(self._column_order.extract_dimension_key_columns(row)) 

298 ) 

299 

300 if self._dimension_record_converter is None: 

301 return coordinate 

302 else: 

303 return coordinate.expanded(self._dimension_record_converter.convert(row)) 

304 

305 

306class _DimensionGroupRecordRowConverter: # numpydoc ignore=PR01 

307 """Helper for pulling out all the DimensionRecords in a raw SQL result 

308 row. 

309 """ 

310 

311 def __init__(self, dimensions: DimensionGroup, ctx: ResultPageConverterContext) -> None: 

312 self._record_converters = { 

313 name: _create_dimension_record_row_converter(dimensions.universe[name], ctx) 

314 for name in dimensions.elements 

315 } 

316 

317 def convert(self, row: sqlalchemy.Row) -> dict[str, DimensionRecord]: # numpydoc ignore=PR01 

318 """Return a mapping from dimension name to dimension records for all 

319 the dimensions in the database row. 

320 """ 

321 return {name: converter.convert(row) for name, converter in self._record_converters.items()} 

322 

323 

class GeneralResultPageConverter(ResultPageConverter):  # numpydoc ignore=PR01
    """Converts raw SQL rows into pages of `GeneralResult` query results."""

    def __init__(self, spec: GeneralResultSpec, ctx: ResultPageConverterContext) -> None:
        self.spec = spec
        # In case `spec.include_dimension_records` is True then in addition
        # to columns returned by the query we have to add columns from
        # dimension records that are not returned by the query.  These
        # columns belong to either cached or skypix dimensions.
        columns = spec.get_result_columns()
        self.converters: list[_GeneralColumnConverter] = []
        self.record_converters: dict[DimensionElement, _DimensionRecordRowConverter] = {}
        for column in columns:
            column_name = qt.ColumnSet.get_qualified_name(column.logical_table, column.field)
            converter: _GeneralColumnConverter
            if column.field == TimespanDatabaseRepresentation.NAME:
                # Timespans may span multiple columns and need DB support.
                converter = _TimespanGeneralColumnConverter(column_name, ctx.db)
            elif column.field == "ingest_date":
                # May be stored as a native timestamp in old schema versions.
                converter = _TimestampGeneralColumnConverter(column_name)
            else:
                converter = _DefaultGeneralColumnConverter(column_name)
            self.converters.append(converter)

        if spec.include_dimension_records:
            # NOTE: a redundant, never-read `universe` assignment that used
            # to precede the column loop has been removed; this is the only
            # place the universe is needed.
            universe = self.spec.dimensions.universe
            for element_name in self.spec.dimensions.elements:
                element = universe[element_name]
                if isinstance(element, SkyPixDimension):
                    self.record_converters[element] = _SkypixDimensionRecordRowConverter(element)
                elif element.is_cached:
                    self.record_converters[element] = _CachedDimensionRecordRowConverter(
                        element, ctx.dimension_record_cache
                    )

    def convert(self, raw_rows: Iterable[sqlalchemy.Row]) -> GeneralResultPage:
        """Convert raw rows into a `GeneralResultPage`.

        Parameters
        ----------
        raw_rows : `~collections.abc.Iterable` [ `sqlalchemy.Row` ]
            Raw result rows.

        Returns
        -------
        result_page : `GeneralResultPage`
            Converted results, including dimension record sets when the
            result spec asked for them.
        """
        rows = []
        dimension_records = None
        if self.spec.include_dimension_records:
            dimension_records = {element: DimensionRecordSet(element) for element in self.record_converters}
        for row in raw_rows:
            rows.append(tuple(cvt.convert(row) for cvt in self.converters))
            if dimension_records:
                for element, converter in self.record_converters.items():
                    dimension_records[element].add(converter.convert(row))

        return GeneralResultPage(spec=self.spec, rows=rows, dimension_records=dimension_records)

371 

372 

373class _GeneralColumnConverter: 

374 """Interface for converting one or more columns in a result row to a single 

375 column value in output row. 

376 """ 

377 

378 @abstractmethod 

379 def convert(self, row: sqlalchemy.Row) -> Any: 

380 """Convert one or more columns in the row into single value. 

381 

382 Parameters 

383 ---------- 

384 row : `sqlalchemy.Row` 

385 Row of values. 

386 

387 Returns 

388 ------- 

389 value : `typing.Any` 

390 Result of the conversion. 

391 """ 

392 raise NotImplementedError() 

393 

394 

class _DefaultGeneralColumnConverter(_GeneralColumnConverter):
    """Converter that passes a column value through unchanged.

    Parameters
    ----------
    name : `str`
        Column name
    """

    def __init__(self, name: str):
        self.name = name

    def convert(self, row: sqlalchemy.Row) -> Any:
        value = row._mapping[self.name]
        return value

409 

410 

class _TimestampGeneralColumnConverter(_GeneralColumnConverter):
    """Converter that turns native ``datetime`` values into astropy Time.

    Only the ``dataset.ingest_date`` column used native timestamps in the
    initial schema version; newer schema versions store that column in our
    common nanoseconds-since-epoch representation.  For both schema versions
    clients should receive astropy time values.

    Parameters
    ----------
    name : `str`
        Column name
    """

    def __init__(self, name: str):
        self.name = name

    def convert(self, row: sqlalchemy.Row) -> Any:
        value = row._mapping[self.name]
        if not isinstance(value, datetime.datetime):
            # Already in the newer representation; pass through unchanged.
            return value
        # Native timestamps are interpreted as UTC and returned as TAI.
        return astropy.time.Time(value, scale="utc").tai

432 

433 

class _TimespanGeneralColumnConverter(_GeneralColumnConverter):
    """Converter that pulls a timespan out of the row.

    Parameters
    ----------
    name : `str`
        Column name or prefix.
    db : `Database`
        Database instance.
    """

    def __init__(self, name: str, db: Database):
        # The timespan may occupy several physical columns; the DB-specific
        # representation class knows how to extract it from the mapping.
        self.timespan_class = db.getTimespanRepresentation()
        self.name = name

    def convert(self, row: sqlalchemy.Row) -> Any:
        return self.timespan_class.extract(row._mapping, self.name)