Coverage for python / lsst / daf / butler / direct_query_driver / _driver.py: 14%

457 statements  

coverage.py v7.13.5, created at 2026-04-24 08:17 +0000

1# This file is part of daf_butler. 

2# 

3# Developed for the LSST Data Management System. 

4# This product includes software developed by the LSST Project 

5# (http://www.lsst.org). 

6# See the COPYRIGHT file at the top-level directory of this distribution 

7# for details of code ownership. 

8# 

9# This software is dual licensed under the GNU General Public License and also 

10# under a 3-clause BSD license. Recipients may choose which of these licenses 

11# to use; please see the files gpl-3.0.txt and/or bsd_license.txt, 

12# respectively. If you choose the GPL option then the following text applies 

13# (but note that there is still no warranty even if you opt for BSD instead): 

14# 

15# This program is free software: you can redistribute it and/or modify 

16# it under the terms of the GNU General Public License as published by 

17# the Free Software Foundation, either version 3 of the License, or 

18# (at your option) any later version. 

19# 

20# This program is distributed in the hope that it will be useful, 

21# but WITHOUT ANY WARRANTY; without even the implied warranty of 

22# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the 

23# GNU General Public License for more details. 

24# 

25# You should have received a copy of the GNU General Public License 

26# along with this program. If not, see <http://www.gnu.org/licenses/>. 

27 

28from __future__ import annotations 

29 

30__all__ = ("DirectQueryDriver",) 

31 

32import dataclasses 

33import itertools 

34import logging 

35import sys 

36import uuid 

37from collections import defaultdict 

38from collections.abc import Iterable, Iterator, Mapping, Set 

39from contextlib import ExitStack 

40from typing import TYPE_CHECKING, Any, TypeVar, cast, overload 

41 

42import sqlalchemy 

43 

44from .. import ddl 

45from .._collection_type import CollectionType 

46from .._dataset_type import DatasetType 

47from .._exceptions import InvalidQueryError 

48from ..dimensions import DataCoordinate, DataIdValue, DimensionElement, DimensionGroup, DimensionUniverse 

49from ..dimensions.record_cache import DimensionRecordCache 

50from ..queries import tree as qt 

51from ..queries.driver import ( 

52 DataCoordinateResultPage, 

53 DatasetRefResultPage, 

54 DimensionRecordResultPage, 

55 GeneralResultPage, 

56 QueryDriver, 

57 ResultPage, 

58) 

59from ..queries.predicate_constraints_summary import PredicateConstraintsSummary 

60from ..queries.result_specs import ( 

61 DataCoordinateResultSpec, 

62 DatasetRefResultSpec, 

63 DimensionRecordResultSpec, 

64 GeneralResultSpec, 

65 ResultSpec, 

66) 

67from ..registry import CollectionSummary, NoDefaultCollectionError 

68from ..registry.interfaces import ChainedCollectionRecord, CollectionRecord 

69from ..registry.managers import RegistryManagerInstances 

70from ..registry.wildcards import CollectionWildcard 

71from ._postprocessing import Postprocessing 

72from ._query_analysis import ( 

73 QueryCollectionAnalysis, 

74 QueryFindFirstAnalysis, 

75 QueryJoinsAnalysis, 

76 QueryTreeAnalysis, 

77 ResolvedDatasetSearch, 

78) 

79from ._query_builder import QueryBuilder, SingleSelectQueryBuilder, UnionQueryBuilder 

80from ._result_page_converter import ( 

81 DataCoordinateResultPageConverter, 

82 DatasetRefResultPageConverter, 

83 DimensionRecordResultPageConverter, 

84 GeneralResultPageConverter, 

85 ResultPageConverter, 

86 ResultPageConverterContext, 

87) 

88from ._sql_builders import SqlJoinsBuilder, SqlSelectBuilder, make_table_spec 

89from ._sql_column_visitor import SqlColumnVisitor 

90 

91if TYPE_CHECKING: 

92 from ..registry.interfaces import Database 

93 

94 

95_LOG = logging.getLogger(__name__) 

96 

97_T = TypeVar("_T", bound=str | qt.AnyDatasetType) 

98 

99 

100class DirectQueryDriver(QueryDriver): 

101 """The `QueryDriver` implementation for `DirectButler`. 

102 

103 Parameters 

104 ---------- 

105 db : `Database` 

106 Abstraction for the SQL database. 

107 universe : `DimensionUniverse` 

108 Definitions of all dimensions. 

109 managers : `RegistryManagerInstances` 

110 Struct of registry manager objects. 

111 dimension_record_cache : `DimensionRecordCache` 

112 Cache of dimension records for infrequently-changing, commonly-used 

113 dimensions. 

114 default_collections : `~collections.abc.Sequence` [ `str` ] 

115 Default collection search path. 

116 default_data_id : `DataCoordinate` 

117 Default governor dimension values. 

118 raw_page_size : `int`, optional 

119 Number of database rows to fetch for each result page. The actual 

120 number of rows in a page may be smaller due to postprocessing. 

121 constant_rows_limit : `int`, optional 

122 Maximum number of uploaded rows to include in queries via 

123 `Database.constant_rows`; above this limit a temporary table is used 

124 instead. 

125 postprocessing_filter_factor : `int`, optional 

126 The number of database rows we expect to have to fetch to yield a 

127 single output row for queries that involve postprocessing. This is 

128 purely a performance tuning parameter that attempts to balance between 

129 fetching too much and requiring multiple fetches; the true value is 

130 highly dependent on the actual query. 

131 """ 

132 

133 def __init__( 

134 self, 

135 db: Database, 

136 universe: DimensionUniverse, 

137 managers: RegistryManagerInstances, 

138 dimension_record_cache: DimensionRecordCache, 

139 default_collections: Iterable[str], 

140 default_data_id: DataCoordinate, 

141 # Increasing raw_page_size increases memory usage for queries on 

142 # Butler server, so if you increase this you may need to increase the 

143 # memory allocation for the server in Phalanx as well. 

144 raw_page_size: int = 2000, 

145 constant_rows_limit: int = 1000, 

146 postprocessing_filter_factor: int = 10, 

147 ): 

148 self.db = db 

149 self.managers = managers 

150 self._dimension_record_cache = dimension_record_cache 

151 self._universe = universe 

152 self._default_collections = tuple(default_collections) 

153 self._default_data_id = default_data_id 

154 self._materializations: dict[qt.MaterializationKey, _MaterializationState] = {} 

155 self._upload_tables: dict[qt.DataCoordinateUploadKey, sqlalchemy.FromClause] = {} 

156 self._exit_stack: ExitStack | None = None 

157 self._raw_page_size = raw_page_size 

158 self._postprocessing_filter_factor = postprocessing_filter_factor 

159 self._constant_rows_limit = min(constant_rows_limit, db.get_constant_rows_max()) 

160 self._cursors: set[_Cursor] = set() 

161 

162 def __enter__(self) -> None: 

163 self._exit_stack = ExitStack() 

164 # It might be nice to defer opening a transaction here until first use 

165 # to reduce the time spent in transactions. But it's worth noting that 

166 # this is the default low-level behavior of the Python SQLite driver, 

167 # and it makes it incredibly prone to deadlocks. We might be okay 

168 # here, because Query doesn't do true write operations - just temp 

169 # table writes - but I'm not confident that's enough to make delayed 

170 # transaction starts safe against deadlocks, and it'd be more 

171 # complicated to implement anyway. 

172 # 

173 # We start a transaction rather than just opening a connection to make 

174 # temp table and cursors work with pg_bouncer transaction affinity. 

175 self._exit_stack.enter_context(self.db.transaction(for_temp_tables=True)) 

176 

177 def __exit__(self, exc_type: Any, exc_value: Any, traceback: Any) -> None: 

178 assert self._exit_stack is not None 

179 self._materializations.clear() 

180 self._upload_tables.clear() 

181 # Transfer open cursors' close methods to the exit stack; this helps

182 # with cleanup in case a cursor raises an exception on close.

183 for cursor in self._cursors: 

184 self._exit_stack.push(cursor.close) 

185 self._exit_stack.__exit__(exc_type, exc_value, traceback) 

186 self._cursors = set() 

187 self._exit_stack = None 

188 

189 @property 

190 def universe(self) -> DimensionUniverse: 

191 return self._universe 

192 

193 @overload 

194 def execute(

195 self, result_spec: DataCoordinateResultSpec, tree: qt.QueryTree 

196 ) -> Iterator[DataCoordinateResultPage]: ... 

197 

198 @overload 

199 def execute(

200 self, result_spec: DimensionRecordResultSpec, tree: qt.QueryTree 

201 ) -> Iterator[DimensionRecordResultPage]: ... 

202 

203 @overload 

204 def execute(

205 self, result_spec: DatasetRefResultSpec, tree: qt.QueryTree 

206 ) -> Iterator[DatasetRefResultPage]: ... 

207 

208 @overload 

209 def execute(self, result_spec: GeneralResultSpec, tree: qt.QueryTree) -> Iterator[GeneralResultPage]: ...

210 

211 def execute(self, result_spec: ResultSpec, tree: qt.QueryTree) -> Iterator[ResultPage]: 

212 # Docstring inherited. 

213 if self._exit_stack is None: 

214 raise RuntimeError("QueryDriver context must be entered before queries can be executed.") 

215 builder = self.build_query( 

216 tree, 

217 final_columns=result_spec.get_result_columns(), 

218 order_by=result_spec.order_by, 

219 find_first_dataset=result_spec.find_first_dataset, 

220 allow_duplicate_overlaps=result_spec.allow_duplicate_overlaps, 

221 ) 

222 sql_select, sql_columns = builder.finish_select() 

223 if result_spec.order_by: 

224 visitor = SqlColumnVisitor(sql_columns, self) 

225 sql_select = sql_select.order_by(*[visitor.expect_scalar(term) for term in result_spec.order_by]) 

226 if result_spec.limit is not None: 

227 if builder.postprocessing: 

228 builder.postprocessing.limit = result_spec.limit 

229 else: 

230 sql_select = sql_select.limit(result_spec.limit) 

231 if builder.postprocessing.limit is not None: 

232 # We might want to fetch many fewer rows than the default page 

233 # size if we have to implement limit in postprocessing. 

234 raw_page_size = min( 

235 self._postprocessing_filter_factor * builder.postprocessing.limit, 

236 self._raw_page_size, 

237 ) 

238 else: 

239 raw_page_size = self._raw_page_size 

240 # Execute the query by initializing a _Cursor object that manages the 

241 # lifetime of the result. 

242 cursor = _Cursor( 

243 self.db, 

244 sql_select, 

245 postprocessing=builder.postprocessing, 

246 raw_page_size=raw_page_size, 

247 page_converter=self._create_result_page_converter(result_spec, builder.final_columns), 

248 ) 

249 # Since this function isn't a context manager and the caller could stop 

250 # iterating before we retrieve all the results, we have to track open 

251 # cursors to ensure we can close them as part of higher-level cleanup. 

252 self._cursors.add(cursor) 

253 

254 # Return the iterator as a separate function, so that all the code 

255 # above runs immediately instead of later when we first read from the 

256 # iterator. This ensures that any exceptions that are triggered during 

257 # set-up for this query occur immediately. 

258 return self._read_results(cursor) 

259 

260 def _read_results(self, cursor: _Cursor) -> Iterator[ResultPage]: 

261 """Read out all of the result pages from the database.""" 

262 try: 

263 while (result_page := cursor.next()) is not None: 

264 yield result_page 

265 finally: 

266 self._cursors.discard(cursor) 

267 cursor.close() 

268 

269 def _create_result_page_converter(self, spec: ResultSpec, columns: qt.ColumnSet) -> ResultPageConverter: 

270 context = ResultPageConverterContext( 

271 db=self.db, 

272 column_order=columns.get_column_order(), 

273 dimension_record_cache=self._dimension_record_cache, 

274 ) 

275 match spec: 

276 case DimensionRecordResultSpec(): 

277 return DimensionRecordResultPageConverter(spec, context) 

278 case DataCoordinateResultSpec(): 

279 return DataCoordinateResultPageConverter(spec, context) 

280 case DatasetRefResultSpec(): 

281 return DatasetRefResultPageConverter( 

282 spec, self.get_dataset_type(spec.dataset_type_name), context 

283 ) 

284 case GeneralResultSpec(): 

285 return GeneralResultPageConverter(spec, context) 

286 case _: 

287 raise NotImplementedError(f"Result type '{spec.result_type}' not yet implemented") 

288 

289 def materialize( 

290 self, 

291 tree: qt.QueryTree, 

292 dimensions: DimensionGroup, 

293 datasets: frozenset[str], 

294 allow_duplicate_overlaps: bool = False, 

295 key: qt.MaterializationKey | None = None, 

296 ) -> qt.MaterializationKey: 

297 # Docstring inherited. 

298 if self._exit_stack is None: 

299 raise RuntimeError("QueryDriver context must be entered before 'materialize' is called.") 

300 plan = self.build_query( 

301 tree, qt.ColumnSet(dimensions), allow_duplicate_overlaps=allow_duplicate_overlaps 

302 ) 

303 # Current implementation ignores 'datasets' aside from remembering 

304 # them, because figuring out what to put in the temporary table for 

305 # them is tricky, especially if calibration collections are involved. 

306 # That's okay because: 

307 # 

308 # - the query whose results we materialize includes the dataset 

309 # searches as constraints; 

310 # 

311 # - we still (in Query.materialize) join the dataset searches back in 

312 # anyway, and given materialized data IDs the join to the dataset 

313 # search is straightforward and definitely well-indexed, and not much 

314 # (if at all) worse than joining back in on a materialized UUID. 

315 # 

316 sql_select, _ = plan.finish_select(return_columns=False) 

317 table = self._exit_stack.enter_context( 

318 self.db.temporary_table( 

319 make_table_spec(plan.final_columns, self.db, plan.postprocessing, make_indices=True) 

320 ) 

321 ) 

322 self.db.insert(table, select=sql_select) 
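# Roughly (illustrative), the two statements above amount to
#     CREATE TEMPORARY TABLE <materialization> (...);
#     INSERT INTO <materialization> SELECT ...;
# with the temporary table living until the driver's transaction (opened in
# ``__enter__``) is closed by the exit stack.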

323 if key is None: 

324 key = uuid.uuid4() 

325 self._materializations[key] = _MaterializationState(table, datasets, plan.postprocessing) 

326 return key 

327 

328 def upload_data_coordinates( 

329 self, 

330 dimensions: DimensionGroup, 

331 rows: Iterable[tuple[DataIdValue, ...]], 

332 key: qt.DataCoordinateUploadKey | None = None, 

333 ) -> qt.DataCoordinateUploadKey: 

334 # Docstring inherited. 

335 if self._exit_stack is None: 

336 raise RuntimeError( 

337 "QueryDriver context must be entered before 'upload_data_coordinates' is called." 

338 ) 

339 columns = qt.ColumnSet(dimensions).drop_implied_dimension_keys() 

340 table_spec = ddl.TableSpec( 

341 [columns.get_column_spec(logical_table, field).to_sql_spec() for logical_table, field in columns] 

342 ) 

343 dict_rows: list[dict[str, Any]] 

344 if not columns: 

345 table_spec.fields.add( 

346 ddl.FieldSpec( 

347 SqlSelectBuilder.EMPTY_COLUMNS_NAME, 

348 dtype=SqlSelectBuilder.EMPTY_COLUMNS_TYPE, 

349 nullable=True, 

350 ) 

351 ) 

352 dict_rows = [{SqlSelectBuilder.EMPTY_COLUMNS_NAME: None}] 

353 else: 

354 dict_rows = [dict(zip(dimensions.required, values)) for values in rows] 
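# For example (illustrative values): with dimensions.required ==
# ("instrument", "detector") and rows == [("HSC", 42)], this produces
# dict_rows == [{"instrument": "HSC", "detector": 42}].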

355 from_clause: sqlalchemy.FromClause 

356 if self.db.supports_temporary_tables and len(dict_rows) > self._constant_rows_limit: 

357 from_clause = self._exit_stack.enter_context(self.db.temporary_table(table_spec)) 

358 self.db.insert(from_clause, *dict_rows) 

359 else: 

360 from_clause = self.db.constant_rows(table_spec.fields, *dict_rows) 

361 if key is None: 

362 key = uuid.uuid4() 

363 self._upload_tables[key] = from_clause 

364 return key 

365 

366 def count( 

367 self, 

368 tree: qt.QueryTree, 

369 result_spec: ResultSpec, 

370 *, 

371 exact: bool, 

372 discard: bool, 

373 ) -> int: 

374 # Docstring inherited. 

375 columns = result_spec.get_result_columns() 

376 builder = self.build_query(tree, columns, find_first_dataset=result_spec.find_first_dataset) 

377 if not all(d.collection_records for d in builder.joins_analysis.datasets.values()): 

378 return 0 

379 # No need to do similar check on 

380 if not exact: 

381 builder.postprocessing = Postprocessing() 

382 if builder.postprocessing: 

383 if not discard: 

384 raise InvalidQueryError("Cannot count query rows exactly without discarding them.") 

385 sql_select, _ = builder.finish_select(return_columns=False) 

386 builder.postprocessing.limit = result_spec.limit 

387 n = 0 

388 with self.db.query(sql_select.execution_options(yield_per=self._raw_page_size)) as results: 

389 for _ in builder.postprocessing.apply(results): 

390 n += 1 

391 return n 

392 # If the query has DISTINCT, GROUP BY, or UNION [ALL], nest it in a 

393 # subquery so we count deduplicated rows. 
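# Sketch of the SQL this produces (illustrative):
#
#     SELECT COUNT(*) FROM (SELECT DISTINCT ... original query ...) AS nested
#
# so duplicates removed by DISTINCT/GROUP BY/UNION are not double-counted.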

394 select_builder = builder.finish_nested() 

395 # Replace the columns of the query with just COUNT(*). 

396 select_builder.columns = qt.ColumnSet(self._universe.empty) 

397 select_builder.joins.special.clear() 

398 count_func: sqlalchemy.ColumnElement[int] = sqlalchemy.func.count() 

399 select_builder.joins.special["_ROWCOUNT"] = count_func 

400 # Render and run the query. 

401 sql_select = select_builder.select(builder.postprocessing) 

402 with self.db.query(sql_select) as result: 

403 count = cast(int, result.scalar()) 

404 if result_spec.limit is not None: 

405 count = min(count, result_spec.limit) 

406 return count 

407 

408 def any(self, tree: qt.QueryTree, *, execute: bool, exact: bool) -> bool: 

409 # Docstring inherited. 

410 builder = self.build_query(tree, qt.ColumnSet(tree.dimensions), allow_duplicate_overlaps=True) 

411 if not all(d.collection_records for d in builder.joins_analysis.datasets.values()): 

412 return False 

413 if not execute: 

414 if exact: 

415 raise InvalidQueryError("Cannot obtain exact result for 'any' without executing.") 

416 return True 

417 if builder.postprocessing and exact: 

418 sql_select, _ = builder.finish_select(return_columns=False) 

419 with self.db.query( 

420 sql_select.execution_options(yield_per=self._postprocessing_filter_factor) 

421 ) as result: 

422 for _ in builder.postprocessing.apply(result): 

423 return True 

424 return False 

425 sql_select, _ = builder.finish_select() 

426 with self.db.query(sql_select.limit(1)) as result: 

427 return result.first() is not None 

428 

429 def explain_no_results(self, tree: qt.QueryTree, execute: bool) -> Iterable[str]: 

430 # Docstring inherited. 

431 plan = self.build_query(tree, qt.ColumnSet(tree.dimensions), analyze_only=True) 

432 if plan.joins_analysis.messages or not execute: 

433 return plan.joins_analysis.messages 

434 # TODO: guess at ways to split up query that might fail or succeed if 

435 # run separately, execute them with LIMIT 1 and report the results. 

436 return [] 

437 

438 def get_dataset_type(self, name: str) -> DatasetType: 

439 # Docstring inherited 

440 return self.managers.datasets.get_dataset_type(name) 

441 

442 def get_default_collections(self) -> tuple[str, ...]: 

443 # Docstring inherited. 

444 if not self._default_collections: 

445 raise NoDefaultCollectionError("No collections provided and no default collections.") 

446 return self._default_collections 

447 

448 def build_query( 

449 self, 

450 tree: qt.QueryTree, 

451 final_columns: qt.ColumnSet, 

452 *, 

453 order_by: Iterable[qt.OrderExpression] = (), 

454 find_first_dataset: str | qt.AnyDatasetType | None = None, 

455 analyze_only: bool = False, 

456 allow_duplicate_overlaps: bool = False, 

457 ) -> QueryBuilder: 

458 """Convert a query description into a nearly-complete builder object 

459 for the SQL version of that query. 

460 

461 Parameters 

462 ---------- 

463 tree : `.queries.tree.QueryTree` 

464 Description of the joins and row filters in the query. 

465 final_columns : `.queries.tree.ColumnSet` 

466 Final output columns that should be emitted by the SQL query. 

467 order_by : `~collections.abc.Iterable` [ \ 

468 `.queries.tree.OrderExpression` ], optional 

469 Column expressions to sort by. 

470 find_first_dataset : `str`, ``ANY_DATASET``, or `None`, optional 

471 Name of a dataset type for which only one result row for each data 

472 ID should be returned, with the collections searched in order. 

473 ``ANY_DATASET`` is used to represent the search for all dataset 

474 types with a particular set of dimensions in ``tree.any_dataset``. 

475 analyze_only : `bool`, optional 

476 If `True`, perform the initial analysis needed to construct the 

477 builder, but do not call methods that build its SQL form. This can 

478 be useful for obtaining diagnostic information about the query that 

479 would be generated. 

480 allow_duplicate_overlaps : `bool`, optional 

481 If set to `True`, the query will be allowed to generate

482 non-distinct rows for spatial overlaps. 

483 

484 Returns 

485 ------- 

486 builder : `QueryBuilder` 

487 An object that contains an analysis of the queries columns and 

488 general structure, SQLAlchemy representations of most of the 

489 constructed query, and a description of Python-side postprocessing 

490 to be performed after executing it. Callers should generally only 

491 have to call `finish_select` or `finish_nested`; all ``apply`` 

492 methods will have already been called. 

493 

494 Notes 

495 ----- 

496 The SQL queries produced by this driver are built in three steps: 

497 

498 - First we "analyze" the entire query, fetching extra information as 

499 needed to identify the tables we'll join in, the columns we'll have 

500 at each level of what may be a nested SELECT, and the degree to which 

501 we'll have to use GROUP BY / DISTINCT or window functions to obtain 

502 just the rows we want. This process initializes a `QueryBuilder` 

503 instance, but does not call any of its ``apply*`` methods to actually 

504 build the SQLAlchemy version of the query, and it is all that is done 

505 when ``analyze_only=True``. 

506 - In the next step we call `QueryBuilder` ``apply*`` methods to mutate 

507 and/or replace the `QueryBuilder` object's nested `SqlSelectBuilder` 

508 instances to reflect that analysis, building the SQL from the inside 

509 out (subqueries before their parents). This is also done by the 

510 `build_query` method by default.

511 - The returned builder can be used by calling either `finish_select` 

512 (to get the full executable query) or `finish_nested` (to wrap 

513 that query as a subquery - if needed - in order to do related 

514 queries, like ``COUNT(*)`` or ``LIMIT 1`` checks). 

515 

516 Within both the analysis and application steps, we further split the 

517 query *structure* and building process into three stages, mapping 

518 roughly to levels of (potential) subquery nesting: 

519 

520 - In the "joins" stage, we join all the tables we need columns or 

521 constraints from and apply the predicate as a WHERE clause. All 

522 non-calculated columns are included in the query at this stage. 

523 - In the optional "projection" stage, we apply a GROUP BY or DISTINCT 

524 to reduce the set of columns and/or rows, and in some cases add new 

525 calculated columns. 

526 - In the optional "find first" stage, we use a common table expression 

527 with PARTITION BY to search for datasets in a sequence of collections

528 in the order those collections were provided. 

529 

530 In addition, there are two different overall structures for butler 

531 queries, modeled as two different `QueryBuilder` subclasses. 

532 

533 - `SingleSelectQueryBuilder` produces a single possibly-nested SELECT 

534 query structured directly according to the stages above. This is 

535 used for almost all queries - specifically whenever 

536 `QueryTree.any_dataset` is `None`. 

537 - `UnionQueryBuilder` implements queries for datasets of multiple types 

538 with the same dimensions as a UNION ALL combination of SELECTs, in 

539 which each component SELECT has the joins/projection/find-first 

540 structure. This cannot be implemented as a sequence of 

541 `SingleSelectQueryBuilder` objects, however, since the need for the 

542 terms to share a single `Postprocessing` object and column list means 

543 that they are not independent. 

544 

545 The `build_query` method delegates to methods of the `QueryBuilder` 

546 object when "applying" to let them handle these differences. These 

547 often call back to other methods on the driver (so code shared by both 

548 builders mostly lives in the driver class). 

549 """ 

550 # Analyze the dimensions, dataset searches, and other join operands 

551 # that will go into the query. This also initializes a 

552 # SqlSelectBuilder and Postprocessing with spatial/temporal constraints 

553 # potentially transformed by the dimensions manager (but none of the 

554 # rest of the analysis reflected in that SqlSelectBuilder). 

555 query_tree_analysis = self._analyze_query_tree(tree, allow_duplicate_overlaps) 

556 # The "projection" columns differ from the final columns by not 

557 # omitting any dimension keys (this keeps queries for different result 

558 # types more similar during construction), including any columns needed 

559 # only by order_by terms, and including the collection key if we need 

560 # it for GROUP BY or DISTINCT. 

561 projection_columns = final_columns.copy() 

562 projection_columns.restore_dimension_keys() 

563 for term in order_by: 

564 term.gather_required_columns(projection_columns) 

565 # There are two kinds of query builders: simple SELECTS and UNIONs 

566 # over dataset types. 

567 builder: QueryBuilder 

568 if tree.any_dataset is not None: 

569 builder = UnionQueryBuilder( 

570 query_tree_analysis, 

571 union_dataset_dimensions=tree.any_dataset.dimensions, 

572 projection_columns=projection_columns, 

573 final_columns=final_columns, 

574 find_first_dataset=find_first_dataset, 

575 ) 

576 else: 

577 assert find_first_dataset != qt.AnyDatasetType.ANY_DATASET 

578 builder = SingleSelectQueryBuilder( 

579 tree_analysis=query_tree_analysis, 

580 projection_columns=projection_columns, 

581 final_columns=final_columns, 

582 find_first_dataset=find_first_dataset, 

583 ) 

584 # Finish setting up the projection part of the builder. 

585 builder.analyze_projection() 

586 # The joins-stage query also needs to include all columns needed by the 

587 # downstream projection query. Note that this: 

588 # - never adds new dimensions to the joins stage (since those are 

589 # always a superset of the projection-stage dimensions); 

590 # - does not affect our previous determination of 

591 # needs_dataset_distinct, because any dataset fields being added to 

592 # the joins stage here are already in the projection. 

593 builder.joins_analysis.columns.update(builder.projection_columns) 

594 # Set up the find-first part of the builder. 

595 if find_first_dataset is not None: 

596 builder.analyze_find_first() 

597 # At this point, analysis is complete, and we can proceed to making 

598 # the select_builder(s) reflect that analysis. 

599 if not analyze_only: 

600 builder.apply_joins(self) 

601 builder.apply_projection(self, order_by) 

602 builder.apply_find_first(self) 

603 return builder 

604 

605 def _analyze_query_tree(self, tree: qt.QueryTree, allow_duplicate_overlaps: bool) -> QueryTreeAnalysis: 

606 """Analyze a `.queries.tree.QueryTree` as the first step in building 

607 a SQL query. 

608 

609 Parameters 

610 ---------- 

611 tree : `.queries.tree.QueryTree` 

612 Description of the joins and row filters in the query. 

613 allow_duplicate_overlaps : `bool`, optional 

614 If set to `True`, the query will be allowed to generate

615 non-distinct rows for spatial overlaps. 

616 

617 Returns 

618 ------- 

619 tree_analysis : `QueryTreeAnalysis` 

620 Struct containing additional information needed to build the joins

621 stage of a query. 

622 

623 Notes 

624 ----- 

625 See `build_query` for the complete picture of how SQL queries are 

626 constructed. This method is the very first step for all queries. 

627 

628 The fact that this method returns an initial SqlSelectBuilder along with

629 the analysis (rather than just the analysis) is a tradeoff that lets

630 DimensionRecordStorageManager.process_query_overlaps (which is called

631 below) pull out overlap expressions from the predicate at the same

632 time it turns them into SQL table joins

633 (in the builder).

634 """ 

635 # Fetch the records and summaries for any collections we might be 

636 # searching for datasets and organize them for the kind of lookups 

637 # we'll do later. 

638 collection_analysis = self._analyze_collections(tree) 

639 # Extract the data ID implied by the predicate; we can use the governor 

640 # dimensions in that to constrain the collections we search for 

641 # datasets later. 

642 predicate_constraints = PredicateConstraintsSummary(tree.predicate) 

643 # Use the default data ID to apply additional constraints where needed. 

644 predicate_constraints.apply_default_data_id( 

645 self._default_data_id, 

646 tree.dimensions, 

647 validate_governor_constraints=tree.validateGovernorConstraints, 

648 ) 

649 predicate = predicate_constraints.predicate 

650 # Delegate to the dimensions manager to rewrite the predicate and start 

651 # a SqlSelectBuilder to cover any spatial overlap joins or constraints. 

652 # We'll return that SqlSelectBuilder (or copies of it) at the end. 

653 ( 

654 predicate, 

655 select_builder, 

656 postprocessing, 

657 ) = self.managers.dimensions.process_query_overlaps( 

658 tree.dimensions, 

659 predicate, 

660 tree.get_joined_dimension_groups(), 

661 collection_analysis.calibration_dataset_types, 

662 allow_duplicate_overlaps, 

663 predicate_constraints.constraint_data_id, 

664 ) 

665 # Initialize the plan we'll return at the end of the method.

666 joins = QueryJoinsAnalysis(predicate=predicate, columns=select_builder.columns) 

667 joins.messages.extend(predicate_constraints.messages) 

668 # Add columns required by postprocessing. 

669 postprocessing.gather_columns_required(joins.columns) 

670 # Add materializations, which can also bring in more postprocessing. 

671 for m_key, m_dimensions in tree.materializations.items(): 

672 m_state = self._materializations[m_key] 

673 joins.materializations[m_key] = m_dimensions 

674 # When a query is materialized, the new tree has an empty 

675 # (trivially true) predicate because the original was used to make 

676 # the materialized rows. But the original postprocessing isn't 

677 # executed when the materialization happens, so we have to include 

678 # it here. 

679 postprocessing.spatial_join_filtering.extend(m_state.postprocessing.spatial_join_filtering) 

680 postprocessing.spatial_where_filtering.extend(m_state.postprocessing.spatial_where_filtering) 

681 postprocessing.spatial_expression_filtering.extend( 

682 m_state.postprocessing.spatial_expression_filtering 

683 ) 

684 # Add data coordinate uploads. 

685 joins.data_coordinate_uploads.update(tree.data_coordinate_uploads) 

686 # Add dataset_searches and filter out collections that don't have the 

687 # right dataset type or governor dimensions. We re-resolve dataset 

688 # searches now that we have a constraint data ID. 

689 for dataset_type_name, dataset_search in tree.datasets.items(): 

690 resolved_dataset_search = self._resolve_dataset_search( 

691 dataset_type_name, 

692 dataset_search, 

693 predicate_constraints.constraint_data_id, 

694 collection_analysis.summaries_by_dataset_type[dataset_type_name], 

695 ) 

696 if resolved_dataset_search.dimensions != self.get_dataset_type(dataset_type_name).dimensions: 

697 # This is really for server-side defensiveness; it's hard to 

698 # imagine the query getting different dimensions for a dataset 

699 # type in two calls to the same query driver. 

700 raise InvalidQueryError( 

701 f"Incorrect dimensions {resolved_dataset_search.dimensions} for dataset " 

702 f"{dataset_type_name!r} in query " 

703 f"(vs. {self.get_dataset_type(dataset_type_name).dimensions})." 

704 ) 

705 joins.datasets[dataset_type_name] = resolved_dataset_search 

706 if not resolved_dataset_search.collection_records: 

707 joins.messages.append( 

708 f"Search for dataset type {resolved_dataset_search.name!r} in " 

709 f"{list(dataset_search.collections)} is doomed to fail." 

710 ) 

711 joins.messages.extend(resolved_dataset_search.messages) 

712 # Process the special any_dataset search, if there is one. This entails 

713 # making a modified copy of the plan for each distinct post-filtering 

714 # collection search path. 

715 if tree.any_dataset is None: 

716 return QueryTreeAnalysis( 

717 joins, union_datasets=[], initial_select_builder=select_builder, postprocessing=postprocessing 

718 ) 

719 union_datasets = self._resolve_union_datasets(tree.any_dataset.dimensions, collection_analysis) 

720 return QueryTreeAnalysis( 

721 joins, 

722 union_datasets=union_datasets, 

723 initial_select_builder=select_builder, 

724 postprocessing=postprocessing, 

725 ) 

726 

727 def _resolve_union_datasets( 

728 self, dimensions: DimensionGroup, collection_analysis: QueryCollectionAnalysis 

729 ) -> list[ResolvedDatasetSearch[list[str]]]: 

730 """Resolve searches for union datasets. 

731 

732 Parameters 

733 ---------- 

734 dimensions : `DimensionGroup` 

735 Dimensions of the union dataset types in this query. 

736 collection_analysis : `QueryCollectionAnalysis`

737 Information about the collections appearing in this query.

738 

739 Returns 

740 ------- 

741 searches : `list` [ `ResolvedDatasetSearch` ] 

742 Resolved dataset searches for all union dataset types with these 

743 dimensions. Each item in the list groups dataset types with the 

744 same collection search path. 

745 """ 

746 # Gather the filtered collection search path for each union dataset 

747 # type. 

748 collections_by_dataset_type = defaultdict[str, list[str]](list) 

749 for collection_record, collection_summary in collection_analysis.summaries_by_dataset_type[ 

750 qt.ANY_DATASET 

751 ]: 

752 for dataset_type in collection_summary.dataset_types: 

753 if dataset_type.dimensions == dimensions: 

754 collections_by_dataset_type[dataset_type.name].append(collection_record.name) 

755 # Reverse the lookup order on the mapping we just made to group 

756 # dataset types by their collection search path. Each such group 

757 # yields a term in a union query builder.

758 dataset_searches_by_collections: dict[tuple[str, ...], ResolvedDatasetSearch[list[str]]] = {} 

759 for dataset_type_name, collection_path in collections_by_dataset_type.items(): 

760 key = tuple(collection_path) 

761 if (resolved_search := dataset_searches_by_collections.get(key)) is None: 

762 resolved_search = ResolvedDatasetSearch[list[str]]( 

763 [], 

764 dimensions=dimensions, 

765 collection_records=[ 

766 collection_analysis.collection_records[collection_name] 

767 for collection_name in collection_path 

768 ], 

769 messages=[], 

770 ) 

771 resolved_search.is_calibration_search = any( 

772 r.type is CollectionType.CALIBRATION for r in resolved_search.collection_records 

773 ) 

774 dataset_searches_by_collections[key] = resolved_search 

775 resolved_search.name.append(dataset_type_name) 

776 return list(dataset_searches_by_collections.values()) 

777 

778 def apply_initial_query_joins( 

779 self, 

780 select_builder: SqlSelectBuilder, 

781 joins_analysis: QueryJoinsAnalysis, 

782 union_dataset_dimensions: DimensionGroup | None, 

783 ) -> None: 

784 """Apply most of the "joins" stage of query construction to a single 

785 SELECT. 

786 

787 This method is expected to be invoked by `QueryBuilder.apply_joins` 

788 implementations. It handles all tables and subqueries in the FROM 

789 clause, except: 

790 

791 - the `QueryTree.any_dataset` search (handled by `UnionQueryBuilder` 

792 directly); 

793 - joins of dimension tables needed only for their keys (handled by 

794 `apply_missing_dimension_joins`). 

795 

796 Parameters 

797 ---------- 

798 select_builder : `SqlSelectBuilder` 

799 Low-level SQL builder for a single SELECT term, modified in place. 

800 joins_analysis : `QueryJoinsAnalysis` 

801 Information about the joins stage of query construction. 

802 union_dataset_dimensions : `DimensionGroup` or `None` 

803 Dimensions of the union dataset types, or `None` if this is not 

804 a union dataset query. 

805 """ 

806 # Process data coordinate upload joins. 

807 for upload_key, upload_dimensions in joins_analysis.data_coordinate_uploads.items(): 

808 select_builder.joins.join( 

809 SqlJoinsBuilder(db=self.db, from_clause=self._upload_tables[upload_key]).extract_dimensions( 

810 upload_dimensions.required 

811 ) 

812 ) 

813 # Process materialization joins. We maintain a set of dataset types 

814 # that were included in a materialization; searches for these datasets 

815 # can be dropped if they are only present to provide a constraint on 

816 # data IDs, since that's already embedded in a materialization. 

817 materialized_datasets: set[str] = set() 

818 for materialization_key, materialization_dimensions in joins_analysis.materializations.items(): 

819 materialized_datasets.update( 

820 self._join_materialization( 

821 select_builder.joins, materialization_key, materialization_dimensions 

822 ) 

823 ) 

824 # Process dataset joins (not including any union dataset). Dataset

825 # searches included in a materialization can be skipped unless we need

826 # something from their tables. 

830 for dataset_type_name, dataset_search in joins_analysis.datasets.items(): 

831 if ( 

832 dataset_type_name not in materialized_datasets 

833 or dataset_type_name in select_builder.columns.dataset_fields 

834 ): 

835 self.join_dataset_search( 

836 select_builder.joins, 

837 dataset_search, 

838 joins_analysis.columns.dataset_fields[dataset_search.name], 

839 ) 

840 # Join in dimension element tables that we know we need relationships 

841 # or columns from. 

842 for element in joins_analysis.iter_mandatory(union_dataset_dimensions): 

843 select_builder.joins.join( 

844 self.managers.dimensions.make_joins_builder( 

845 element, joins_analysis.columns.dimension_fields[element.name] 

846 ) 

847 ) 

848 

849 def apply_missing_dimension_joins( 

850 self, select_builder: SqlSelectBuilder, joins_analysis: QueryJoinsAnalysis 

851 ) -> None: 

852 """Apply dimension-table joins to a single SQL SELECT builder to ensure 

853 the full set of desired dimension keys is present.

854 

855 This method is expected to be invoked by `QueryBuilder.apply_joins` 

856 implementations. 

857 

858 Parameters 

859 ---------- 

860 select_builder : `SqlSelectBuilder` 

861 Low-level SQL builder for a single SELECT term, modified in place. 

862 joins_analysis : `QueryJoinsAnalysis` 

863 Information about the joins stage of query construction. 

864 """ 

865 # See if any dimension keys are still missing, and if so join in 

866 # their tables. Note that we know there are no fields needed from 

867 # these. 
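# For example (illustrative): if both "visit" and "detector" keys are still
# missing, the loop below prefers a single element whose table carries both
# keys over joining two separate dimension tables.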

868 while not (select_builder.joins.dimension_keys.keys() >= joins_analysis.columns.dimensions.names): 

869 # Look for opportunities to join in multiple dimensions via 

870 # single table, to reduce the total number of tables joined in. 

871 missing_dimension_names = ( 

872 joins_analysis.columns.dimensions.names - select_builder.joins.dimension_keys.keys() 

873 ) 

874 best = self._universe[ 

875 max( 

876 missing_dimension_names, 

877 key=lambda name: len(self._universe[name].dimensions.names & missing_dimension_names), 

878 ) 

879 ] 

880 to_join = self.managers.dimensions.make_joins_builder(best, frozenset()) 

881 select_builder.joins.join(to_join) 

882 # Add the WHERE clause to the builder. 

883 select_builder.joins.where( 

884 joins_analysis.predicate.visit(SqlColumnVisitor(select_builder.joins, self)) 

885 ) 

886 

887 def project_spatial_join_filtering( 

888 self, 

889 columns: qt.ColumnSet, 

890 postprocessing: Postprocessing, 

891 select_builders: Iterable[SqlSelectBuilder], 

892 ) -> None: 

893 """Transform spatial join postprocessing into expressions that can be 

894 OR'd together via an aggregate function in a GROUP BY. 

895 

896 This only affects spatial join constraints involving region columns 

897 whose dimensions are being projected away. 

898 

899 Parameters 

900 ---------- 

901 columns : `.queries.tree.ColumnSet` 

902 Columns that will be included in the final query. 

903 postprocessing : `Postprocessing` 

904 Object that describes post-query processing; modified in place. 

905 select_builders : `~collections.abc.Iterable` [ `SqlSelectBuilder` ] 

906 SQL Builder objects to be modified in place. 

907 """ 

908 kept: list[tuple[DimensionElement, DimensionElement]] = [] 

909 for a, b in postprocessing.spatial_join_filtering: 

910 if a.name not in columns.dimensions.elements or b.name not in columns.dimensions.elements: 

911 expr_name = f"_{a}_OVERLAPS_{b}" 

912 postprocessing.spatial_expression_filtering.append(expr_name) 

913 for select_builder in select_builders: 

914 expr = sqlalchemy.cast( 

915 sqlalchemy.cast( 

916 select_builder.joins.fields[a.name]["region"], type_=sqlalchemy.String 

917 ) 

918 + sqlalchemy.literal("&", type_=sqlalchemy.String) 

919 + sqlalchemy.cast( 

920 select_builder.joins.fields[b.name]["region"], type_=sqlalchemy.String 

921 ), 

922 type_=sqlalchemy.LargeBinary, 

923 ) 

924 select_builder.joins.special[expr_name] = expr 

925 else: 

926 kept.append((a, b)) 

927 postprocessing.spatial_join_filtering = kept 

928 

929 def apply_query_projection( 

930 self, 

931 select_builder: SqlSelectBuilder, 

932 postprocessing: Postprocessing, 

933 *, 

934 join_datasets: Mapping[str, ResolvedDatasetSearch[str]], 

935 union_datasets: ResolvedDatasetSearch[list[str]] | None, 

936 projection_columns: qt.ColumnSet, 

937 needs_dimension_distinct: bool, 

938 needs_dataset_distinct: bool, 

939 needs_validity_match_count: bool, 

940 find_first_dataset: str | qt.AnyDatasetType | None, 

941 order_by: Iterable[qt.OrderExpression], 

942 ) -> None: 

943 """Apply the "projection" stage of query construction to a single 

944 SQL SELECT builder. 

945 

946 This method is expected to be invoked by 

947 `QueryBuilder.apply_projection` implementations. 

948 

949 Parameters 

950 ---------- 

951 select_builder : `SqlSelectBuilder` 

952 Low-level SQL builder for a single SELECT term, modified in place. 

953 postprocessing : `Postprocessing` 

954 Description of query processing that happens in Python after the 

955 query is executed by the database. 

956 join_datasets : `~collections.abc.Mapping` [ `str`, \ 

957 `ResolvedDatasetSearch` [ `str` ] ] 

958 Information about regular (non-union) dataset searches joined into 

959 the query. 

960 union_datasets : `ResolvedDatasetSearch` [ `list` [ `str` ] ] or `None`

961 Information about a search for datasets of multiple types with the

962 same dimensions and the same post-filtering collection search path. 

963 projection_columns : `.queries.tree.ColumnSet` 

964 Columns to include in this projection stage of the query. 

965 needs_dimension_distinct : `bool` 

966 Whether this query needs a GROUP BY or DISTINCT to filter out rows 

967 where the only differences are dimension values that are not being 

968 returned to the user. 

969 needs_dataset_distinct : `bool` 

970 Whether this query needs a GROUP BY or DISTINCT to filter out rows 

971 that correspond to entries for different collections. 

972 needs_validity_match_count : `bool` 

973 Whether this query needs a COUNT column to track the number of 

974 datasets for each data ID and dataset type. If this is `False` but 

975 ``postprocessing.check_validity_match_count`` is `True`, a dummy 

976 count column that is just "1" should be added, because the check 

977 is needed only for some other SELECT term in a UNION ALL. 

978 find_first_dataset : `str` or ``ANY_DATASET`` or `None` 

979 Name of the dataset type that will need a find-first stage. 

980 ``ANY_DATASET`` is used when the union datasets need a find-first 

981 search, while `None` is used to represent both ``find_first=False`` 

982 and the case when ``find_first=True`` but only one collection has

983 survived filtering. 

984 order_by : `~collections.abc.Iterable` [ \ 

985 `.queries.tree.OrderExpression` ] 

986 Order by clause associated with the query. 

987 """ 

988 select_builder.columns = projection_columns 

989 if not needs_dimension_distinct and not needs_dataset_distinct and not needs_validity_match_count: 

990 if postprocessing.check_validity_match_count: 

991 select_builder.joins.special[postprocessing.VALIDITY_MATCH_COUNT] = sqlalchemy.literal(1) 

992 # Rows are already unique; nothing else to do in this method. 

993 return 

994 # This method generates either a SELECT DISTINCT [ON] or a SELECT with 

995 # GROUP BY. We'll work out which as we go. 

996 have_aggregates: bool = False 

997 # Dimension key columns form at least most of our GROUP BY or DISTINCT 

998 # ON clause. 

999 unique_keys: list[sqlalchemy.ColumnElement[Any]] = [ 

1000 select_builder.joins.dimension_keys[k][0] 

1001 for k in projection_columns.dimensions.data_coordinate_keys 

1002 ] 

1003 

1004 # Many of our fields derive their uniqueness from the unique_key 

1005 # fields: if rows are unique over the 'unique_key' fields, then they're 

1006 # automatically unique over these 'derived_fields'. We just remember 

1007 # these as pairs of (logical_table, field) for now. 
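# For example (illustrative): if rows are already unique over the dimension
# keys ("visit", "detector"), a column like the visit region is functionally
# dependent on those keys, so it can go through ANY_VALUE (or be appended to
# GROUP BY) rather than becoming a grouping key itself.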

1008 derived_fields: list[tuple[str | qt.AnyDatasetType, str]] = [] 

1009 

1010 # There are two reasons we might need an aggregate function: 

1011 # - to make sure temporal constraints and joins have resulted in at 

1012 # most one validity range match for each data ID and collection, 

1013 # when we're doing a find-first query. 

1014 # - to compute the unions of regions we need for postprocessing, when 

1015 # the data IDs for those regions are not wholly included in the 

1016 # results (i.e. we need to postprocess on 

1017 # visit_detector_region.region, but the output rows don't have 

1018 # detector, just visit - so we pack the overlap expression into a 

1019 # blob via an aggregate function and interpret it later). 

1020 if postprocessing.check_validity_match_count: 

1021 if needs_validity_match_count: 

1022 select_builder.joins.special[postprocessing.VALIDITY_MATCH_COUNT] = ( 

1023 sqlalchemy.func.count().label(postprocessing.VALIDITY_MATCH_COUNT) 

1024 ) 

1025 have_aggregates = True 

1026 else: 

1027 select_builder.joins.special[postprocessing.VALIDITY_MATCH_COUNT] = sqlalchemy.literal(1) 

1028 

1029 for element in postprocessing.iter_missing(projection_columns): 

1030 if element.name in projection_columns.dimensions.elements: 

1031 # The region associated with dimension keys returned by the 

1032 # query are derived fields, since there is only one region 

1033 # associated with each dimension key value. 

1034 derived_fields.append((element.name, "region")) 

1035 else: 

1036 # If there's a projection and we're doing postprocessing, we 

1037 # might be collapsing the dimensions of the postprocessing 

1038 # regions. When that happens, we want to apply an aggregate 

1039 # function to them that computes the union of the regions that 

1040 # are grouped together. Note that this should only happen for 

1041 # constraints that involve a "given", external-to-the-database 

1042 # region (postprocessing.spatial_where_filtering); join 

1043 # constraints that need aggregates should have already been 

1044 # transformed in advance. 

1045 select_builder.joins.fields[element.name]["region"] = ddl.Base64Region.union_aggregate( 

1046 select_builder.joins.fields[element.name]["region"] 

1047 ) 

1048 have_aggregates = True 

1049 # Postprocessing spatial join constraints where at least one region's 

1050 # dimensions are being projected away will have already been turned 

1051 # into the kind of expression that sphgeom.Region.decodeOverlapsBase64 

1052 # processes. We can just apply an aggregate function to these. Note 

1053 # that we don't do this to other constraints in order to minimize 

1054 # duplicate fetches of the same region blob. 

1055 for expr_name in postprocessing.spatial_expression_filtering: 

1056 select_builder.joins.special[expr_name] = sqlalchemy.cast( 

1057 sqlalchemy.func.aggregate_strings(select_builder.joins.special[expr_name], "|"), 

1058 type_=sqlalchemy.LargeBinary, 

1059 ) 

1060 have_aggregates = True 

1061 

1062 # All dimension record fields are derived fields. 

1063 for element_name, fields_for_element in projection_columns.dimension_fields.items(): 

1064 for element_field in fields_for_element: 

1065 derived_fields.append((element_name, element_field)) 

1066 # Some dataset fields are derived fields and some are unique keys, and 

1067 # it depends on the kinds of collection(s) we're searching and whether 

1068 # it's a find-first query. 

1069 for dataset_type, fields_for_dataset in projection_columns.dataset_fields.items(): 

1070 is_find_first = dataset_type == find_first_dataset 

1071 dataset_search: ResolvedDatasetSearch[Any] 

1072 if dataset_type is qt.ANY_DATASET: 

1073 assert union_datasets is not None 

1074 dataset_search = union_datasets 

1075 else: 

1076 dataset_search = join_datasets[dataset_type] 

1077 for dataset_field in fields_for_dataset: 

1078 if dataset_field == "collection_key": 

1079 # If the collection_key field is present, it's needed for 

1080 # uniqueness if we're looking in more than one collection. 

1081 # If not, it's a derived field. 

1082 if len(dataset_search.collection_records) > 1: 

1083 unique_keys.append(select_builder.joins.fields[dataset_type]["collection_key"]) 

1084 else: 

1085 derived_fields.append((dataset_type, "collection_key")) 

1086 elif dataset_field == "timespan" and dataset_search.is_calibration_search: 

1087 # The timespan is also a unique key... 

1088 if is_find_first: 

1089 # ...unless we're doing a find-first search on this 

1090 # dataset, in which case we need to use ANY_VALUE on 

1091 # the timespan and check that _VALIDITY_MATCH_COUNT 

1092 # (added earlier) is one, indicating that there was 

1093 # indeed only one timespan for each data ID in each 

1094 # collection that survived the base query's WHERE 

1095 # clauses and JOINs. 

1096 if not self.db.has_any_aggregate: 

1097 raise NotImplementedError( 

1098 f"Cannot generate query that returns timespan for {dataset_type!r} after a " 

1099 "find-first search, because this database does not support the ANY_VALUE " 

1100 "aggregate function (or equivalent)." 

1101 ) 

1102 select_builder.joins.timespans[dataset_type] = select_builder.joins.timespans[ 

1103 dataset_type 

1104 ].apply_any_aggregate(self.db.apply_any_aggregate) 

1105 else: 

1106 unique_keys.extend(select_builder.joins.timespans[dataset_type].flatten()) 

1107 elif ( 

1108 dataset_field == "dataset_id" 

1109 and len(dataset_search.collection_records) > 1 

1110 and not is_find_first 

1111 ): 

1112 # If we have more than one collection in the search, we can 

1113 # find multiple dataset IDs with the same data ID, in 

1114 # different collections. 

1115 # In a non-find-first search, we have to make dataset ID a 

1116 # unique key to prevent de-duplication for rows with the 

1117 # same data ID but different dataset IDs. 

1118 # We don't do this for a find-first search because the 

1119 # window function will take care of it. 

1120 unique_keys.append(select_builder.joins.fields[dataset_type]["dataset_id"]) 

1121 else: 

1122 # Other dataset fields derive their uniqueness from key 

1123 # fields. 

1124 derived_fields.append((dataset_type, dataset_field)) 

1125 if not have_aggregates and not derived_fields: 

1126 # SELECT DISTINCT is sufficient. 

1127 select_builder.distinct = True 

1128 # With DISTINCT ON, Postgres requires that the leftmost parts of the 

1129 # ORDER BY match the DISTINCT ON expressions. It's somewhat tricky to 

1130 # enforce that, so instead we just don't use DISTINCT ON if ORDER BY is 

1131 # present. There may be an optimization opportunity by relaxing this 

1132 # restriction. 

1133 elif not have_aggregates and self.db.has_distinct_on and len(list(order_by)) == 0: 

1134 # SELECT DISTINCT ON is sufficient and supported by this database. 

1135 select_builder.distinct = tuple(unique_keys) 

1136 else: 

1137 # GROUP BY is the only option. 

1138 if derived_fields: 

1139 if self.db.has_any_aggregate: 

1140 for logical_table, field in derived_fields: 

1141 if field == "timespan": 

1142 select_builder.joins.timespans[logical_table] = select_builder.joins.timespans[ 

1143 logical_table 

1144 ].apply_any_aggregate(self.db.apply_any_aggregate) 

1145 else: 

1146 select_builder.joins.fields[logical_table][field] = self.db.apply_any_aggregate( 

1147 select_builder.joins.fields[logical_table][field] 

1148 ) 

1149 else: 

1150 _LOG.debug( 

1151 "Adding %d fields to GROUP BY because this database backend does not support the " 

1152 "ANY_VALUE aggregate function (or equivalent). This may result in a poor query " 

1153 "plan. Materializing the query first sometimes avoids this problem. This warning " 

1154 "can be ignored unless query performance is a problem.", 

1155 len(derived_fields), 

1156 ) 

1157 for logical_table, field in derived_fields: 

1158 if field == "timespan": 

1159 unique_keys.extend(select_builder.joins.timespans[logical_table].flatten()) 

1160 else: 

1161 unique_keys.append(select_builder.joins.fields[logical_table][field]) 

1162 select_builder.group_by = tuple(unique_keys) 

1163 

1164 def apply_query_find_first( 

1165 self, 

1166 select_builder: SqlSelectBuilder, 

1167 postprocessing: Postprocessing, 

1168 find_first_analysis: QueryFindFirstAnalysis, 

1169 ) -> SqlSelectBuilder: 

1170 """Apply the "find first" stage of query construction to a single 

1171 SQL SELECT builder. 

1172 

1173 This method is expected to be invoked by 

1174 `QueryBuilder.apply_find_first` implementations. 

1175 

1176 Parameters 

1177 ---------- 

1178 select_builder : `SqlSelectBuilder` 

1179 Low-level SQL builder for a single SELECT term, consumed on return. 

1180 postprocessing : `Postprocessing` 

1181 Description of query processing that happens in Python after the 

1182 query is executed by the database. 

1183 find_first_analysis : `QueryFindFirstAnalysis` 

1184 Information about the find-first stage gathered during the analysis 

1185 phase of query construction. 

1186 

1187 Returns 

1188 ------- 

1189 select_builder : `SqlSelectBuilder` 

1190 Low-level SQL builder that includes the find-first logic. 

1191 """ 

1192 # The query we're building looks like this: 

1193 # 

1194 # WITH {dst}_base AS ( 

1195 # {target} 

1196 # ... 

1197 # ) 

1198 # SELECT 

1199 # {dst}_window.*, 

1200 # FROM ( 

1201 # SELECT 

1202 # {dst}_base.*, 

1203 # ROW_NUMBER() OVER ( 

1204 # PARTITION BY {dst_base}.{dimensions} 

1205 # ORDER BY {rank} 

1206 # ) AS rownum 

1207 # ) {dst}_window 

1208 # WHERE 

1209 # {dst}_window.rownum = 1; 

1210 # 

1211 # The outermost SELECT will be represented by the SqlSelectBuilder we 

1212 # return. The SqlSelectBuilder we're given corresponds to the Common 

1213 # Table Expression (CTE) at the top. 

1214 # 

1215 # For SQLite only, we could use a much simpler GROUP BY instead, 

1216 # because it extends the standard to do exactly what we want when MIN 

1217 # or MAX appears once and a column does not have an aggregate function 

1218 # (https://www.sqlite.org/quirks.html). But since that doesn't work 

1219 # with PostgreSQL it doesn't help us. 

1220 # 

1221 select_builder = select_builder.nested(cte=True, force=True, postprocessing=postprocessing) 

1222 # We start by filling out the "window" SELECT statement... 

1223 partition_by = [ 

1224 select_builder.joins.dimension_keys[d][0] for d in select_builder.columns.dimensions.required 

1225 ] 

1226 rank_sql_column = sqlalchemy.case( 

1227 {record.key: n for n, record in enumerate(find_first_analysis.search.collection_records)}, 

1228 value=select_builder.joins.fields[find_first_analysis.dataset_type]["collection_key"], 

1229 ) 

1230 if partition_by: 

1231 select_builder.joins.special["_ROWNUM"] = sqlalchemy.sql.func.row_number().over( 

1232 partition_by=partition_by, order_by=rank_sql_column 

1233 ) 

1234 else: 

1235 select_builder.joins.special["_ROWNUM"] = sqlalchemy.sql.func.row_number().over( 

1236 order_by=rank_sql_column 

1237 ) 

1238 # ... and then turn that into a subquery with a constraint on rownum. 

1239 select_builder = select_builder.nested(force=True, postprocessing=postprocessing) 

1240 # We can now add the WHERE constraint on rownum into the outer query. 

1241 select_builder.joins.where(select_builder.joins.special["_ROWNUM"] == 1) 

1242 # Don't propagate _ROWNUM into downstream queries. 

1243 del select_builder.joins.special["_ROWNUM"] 

1244 return select_builder 

1245 
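# Illustrative sketch, not part of this module: the window-function
# find-first pattern described in the comment inside
# ``apply_query_find_first`` above, written directly in SQLAlchemy Core.
# The table and column names are hypothetical stand-ins for the
# ``{dst}_base`` CTE and its columns; the real code assembles the same
# shape through ``SqlSelectBuilder``.
#
#     import sqlalchemy
#
#     metadata = sqlalchemy.MetaData()
#     base = sqlalchemy.Table(
#         "dataset_search_base",  # hypothetical stand-in for the CTE
#         metadata,
#         sqlalchemy.Column("data_id", sqlalchemy.BigInteger),
#         sqlalchemy.Column("collection_rank", sqlalchemy.Integer),
#         sqlalchemy.Column("dataset_id", sqlalchemy.String),
#     )
#     window = sqlalchemy.select(
#         base.c.data_id,
#         base.c.dataset_id,
#         sqlalchemy.func.row_number()
#         .over(partition_by=[base.c.data_id], order_by=base.c.collection_rank)
#         .label("rownum"),
#     ).subquery("window")
#     # Keep only the highest-priority (lowest-rank) dataset per data ID.
#     find_first = sqlalchemy.select(window.c.data_id, window.c.dataset_id).where(
#         window.c.rownum == 1
#     )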

1246 def _analyze_collections(self, tree: qt.QueryTree) -> QueryCollectionAnalysis: 

1247 """Fetch and organize information about all collections appearing in a 

1248 query. 

1249 

1250 Parameters 

1251 ---------- 

1252 tree : `.queries.tree.QueryTree` 

1253 Description of the joins and row filters in the query. 

1254 

1255 Returns 

1256 ------- 

1257 collection_analysis : `QueryCollectionAnalysis` 

1258 Struct containing collection records and summaries, organized 

1259 for later access by dataset type. 

1260 """ 

1261 # Retrieve collection information for all collections in a tree. 

1262 collection_names = set( 

1263 itertools.chain.from_iterable( 

1264 dataset_search.collections for dataset_search in tree.datasets.values() 

1265 ) 

1266 ) 

1267 if tree.any_dataset is not None: 

1268 collection_names.update(tree.any_dataset.collections) 

1269 collection_records = { 

1270 record.name: record 

1271 for record in self.managers.collections.resolve_wildcard( 

1272 CollectionWildcard.from_names(collection_names), flatten_chains=True, include_chains=True 

1273 ) 

1274 } 

1275 non_chain_records = [ 

1276 record for record in collection_records.values() if record.type is not CollectionType.CHAINED 

1277 ] 

1278 # Fetch summaries for a subset of dataset types. 

1279 if tree.any_dataset is not None: 

1280 summaries = self.managers.datasets.fetch_summaries(non_chain_records, dataset_types=None) 

1281 else: 

1282 dataset_types = [self.get_dataset_type(dataset_type_name) for dataset_type_name in tree.datasets] 

1283 summaries = self.managers.datasets.fetch_summaries(non_chain_records, dataset_types) 

1284 result = QueryCollectionAnalysis(collection_records=collection_records) 

1285 # Do a preliminary resolution for dataset searches to identify any 

1286 # calibration lookups that might participate in temporal joins. 

1287 for dataset_type_name, dataset_search in tree.iter_all_dataset_searches(): 

1288 collection_summaries = self._filter_collections( 

1289 dataset_search.collections, collection_records, summaries 

1290 ) 

1291 result.summaries_by_dataset_type[dataset_type_name] = collection_summaries 

1292 resolved_dataset_search = self._resolve_dataset_search( 

1293 dataset_type_name, dataset_search, {}, collection_summaries 

1294 ) 

1295 if resolved_dataset_search.is_calibration_search: 

1296 result.calibration_dataset_types.add(dataset_type_name) 

1297 return result 

1298 

1299 def _filter_collections( 

1300 self, 

1301 collection_names: Iterable[str], 

1302 records: Mapping[str, CollectionRecord], 

1303 summaries: Mapping[Any, CollectionSummary], 

1304 ) -> list[tuple[CollectionRecord, CollectionSummary]]: 

1305 """Return a subset of collection records and summaries ordered 

1306 according to the input collection list. 

1307 

1308 Parameters 

1309 ---------- 

1310 collection_names : `~collections.abc.Iterable` [`str`] 

1311 List of collection names. 

1312 records : `~collections.abc.Mapping` [`str`, `CollectionRecord`] 

1313 Mapping of collection names to collection records, must contain 

1314 records for all collections in ``collection_names`` and all their 

1315 child collections. 

1316 summaries : `~collections.abc.Mapping` [`typing.Any`, \ 

1317 `CollectionSummary`] 

1318 Mapping of collection IDs to collection summaries, must contain 

1319 summaries for all non-chained collections in the collection tree. 

1320 

1321 Returns 

1322 ------- 

1323 result : `list` [`tuple` [`CollectionRecord`, `CollectionSummary`]] 

1324 Sequence of collection records and their corresponding summaries 

1325 ordered according to the order of input collections and their 

1326 child collections. Does not include chained collections. 

1327 """ 

1328 done: set[str] = set() 

1329 

1330 def recurse(names: Iterable[str]) -> Iterator[tuple[CollectionRecord, CollectionSummary]]: 

1331 for name in names: 

1332 if name not in done: 

1333 done.add(name) 

1334 record = records[name] 

1335 if record.type is CollectionType.CHAINED: 

1336 yield from recurse(cast(ChainedCollectionRecord, record).children) 

1337 else: 

1338 yield record, summaries[record.key] 

1339 

1340 return list(recurse(collection_names)) 

1341 

1342 def _resolve_dataset_search( 

1343 self, 

1344 dataset_type_name: _T, 

1345 dataset_search: qt.DatasetSearch, 

1346 constraint_data_id: Mapping[str, DataIdValue], 

1347 collections: list[tuple[CollectionRecord, CollectionSummary]], 

1348 ) -> ResolvedDatasetSearch[_T]: 

1349 """Resolve the collections that should actually be searched for 

1350 datasets of a particular type. 

1351 

1352 Parameters 

1353 ---------- 

1354 dataset_type_name : `str` or ``ANY_DATASET`` 

1355 Name of the dataset type being searched for. 

1356 dataset_search : `.queries.tree.DatasetSearch` 

1357 Struct holding the dimensions and original collection search path. 

1358 constraint_data_id : `~collections.abc.Mapping` 

1359 Data ID mapping derived from the query predicate that may be used 

1360 to filter out some collections based on their governor dimensions. 

1361 collections : `list` [ `tuple` [ \ 

1362 `.registry.interfaces.CollectionRecord`, \ 

1363 `.registry.CollectionSummary` ] ] 

1364 Tuples of collection record and summary. 

1365 

1366 Returns 

1367 ------- 

1368 resolved : `ResolvedDatasetSearch` 

1369 Struct that extends ``dataset_search`` with the dataset type name 

1370 and resolved collection records. 

1371 """ 

1372 result = ResolvedDatasetSearch(dataset_type_name, dataset_search.dimensions) 

1373 if not collections: 

1374 result.messages.append("No datasets can be found because the collection list is empty.") 

1375 for collection_record, collection_summary in collections: 

1376 rejected: bool = False 

1377 if ( 

1378 result.name is not qt.ANY_DATASET 

1379 and result.name not in collection_summary.dataset_types.names 

1380 ): 

1381 result.messages.append( 

1382 f"No datasets of type {result.name!r} in collection {collection_record.name!r}." 

1383 ) 

1384 rejected = True 

1385 for governor in ( 

1386 constraint_data_id.keys() 

1387 & collection_summary.governors.keys() 

1388 & dataset_search.dimensions.names 

1389 ): 

1390 if constraint_data_id[governor] not in collection_summary.governors[governor]: 

1391 result.messages.append( 

1392 f"No datasets with {governor}={constraint_data_id[governor]!r} " 

1393 f"in collection {collection_record.name!r}." 

1394 ) 

1395 rejected = True 

1396 if not rejected: 

1397 if collection_record.type is CollectionType.CALIBRATION: 

1398 result.is_calibration_search = True 

1399 result.collection_records.append(collection_record) 

1400 return result 

1401 

1402 def _join_materialization( 

1403 self, 

1404 joins_builder: SqlJoinsBuilder, 

1405 key: qt.MaterializationKey, 

1406 dimensions: DimensionGroup, 

1407 ) -> frozenset[str]: 

1408 """Join a materialization into an under-construction query. 

1409 

1410 Parameters 

1411 ---------- 

1412 joins_builder : `SqlJoinsBuilder` 

1413 Component of a `SqlSelectBuilder` that holds the FROM and WHERE 

1414 clauses. This will be modified in-place on return. 

1415 key : `.queries.tree.MaterializationKey` 

1416 Unique identifier assigned to this materialization when it was 

1417 created. 

1418 dimensions : `DimensionGroup` 

1419 Dimensions of the materialization. 

1420 

1421 Returns 

1422 ------- 

1423 datasets : `frozenset` [ `str` ] 

1424 Dataset types that were included as constraints when this 

1425 materialization was created. 

1426 """ 

1427 columns = qt.ColumnSet(dimensions) 

1428 m_state = self._materializations[key] 

1429 joins_builder.join( 

1430 SqlJoinsBuilder(db=self.db, from_clause=m_state.table).extract_columns( 

1431 columns, m_state.postprocessing 

1432 ) 

1433 ) 

1434 return m_state.datasets 

1435 

1436 @overload 

1437 def join_dataset_search( 

1438 self, 

1439 joins_builder: SqlJoinsBuilder, 

1440 resolved_search: ResolvedDatasetSearch[list[str]], 

1441 fields: Set[qt.AnyDatasetFieldName], 

1442 union_dataset_type_name: str, 

1443 ) -> None: ... 

1444 

1445 @overload 

1446 def join_dataset_search( 

1447 self, 

1448 joins_builder: SqlJoinsBuilder, 

1449 resolved_search: ResolvedDatasetSearch[str], 

1450 fields: Set[qt.AnyDatasetFieldName], 

1451 ) -> None: ... 

1452 

1453 def join_dataset_search( 

1454 self, 

1455 joins_builder: SqlJoinsBuilder, 

1456 resolved_search: ResolvedDatasetSearch[Any], 

1457 fields: Set[qt.AnyDatasetFieldName], 

1458 union_dataset_type_name: str | None = None, 

1459 ) -> None: 

1460 """Join a dataset search into an under-construction query. 

1461 

1462 Parameters 

1463 ---------- 

1464 joins_builder : `SqlJoinsBuilder` 

1465 Component of a `SqlSelectBuilder` that holds the FROM and WHERE 

1466 clauses. This will be modified in-place on return. 

1467 resolved_search : `ResolvedDatasetSearch` 

1468 Struct that describes the dataset type and collections. 

1469 fields : `~collections.abc.Set` [ `str` ] 

1470 Dataset fields to include. 

1471 union_dataset_type_name : `str`, optional 

1472 Dataset type name to use when ``resolved_search`` represents 

1473 multiple union datasets. 

1474 """ 

1475 # The asserts below will need to be dropped (and the implications 

1476 # dealt with instead) if materializations start having dataset fields. 

1477 if union_dataset_type_name is None: 

1478 dataset_type = self.get_dataset_type(cast(str, resolved_search.name)) 

1479 assert dataset_type.name not in joins_builder.fields, ( 

1480 "Dataset fields have unexpectedly already been joined in." 

1481 ) 

1482 assert dataset_type.name not in joins_builder.timespans, ( 

1483 "Dataset timespan has unexpectedly already been joined in." 

1484 ) 

1485 else: 

1486 dataset_type = self.get_dataset_type(union_dataset_type_name) 

1487 assert qt.ANY_DATASET not in joins_builder.fields, ( 

1488 "Union dataset fields have unexpectedly already been joined in." 

1489 ) 

1490 assert qt.ANY_DATASET not in joins_builder.timespans, ( 

1491 "Union dataset timespan has unexpectedly already been joined in." 

1492 ) 

1493 

1494 joins_builder.join( 

1495 self.managers.datasets.make_joins_builder( 

1496 dataset_type, 

1497 resolved_search.collection_records, 

1498 fields, 

1499 is_union=(union_dataset_type_name is not None), 

1500 ) 

1501 ) 

1502 

1503 

1504@dataclasses.dataclass 

1505class _MaterializationState: 

1506 table: sqlalchemy.Table 

1507 datasets: frozenset[str] 

1508 postprocessing: Postprocessing 

1509 

1510 

1511class _Cursor: 

1512 """A helper class for managing paged query results and cursor lifetimes. 

1513 

1514 This class holds a context manager for the SQLAlchemy cursor object but is 

1515 not itself a context manager. It always cleans up (i.e. calls its `close` 

1516 method) when it raises an exception or exhausts the cursor, but external 

1517 code is responsible for calling `close` when the cursor is abandoned before 

1518 it is exhausted, including when that happens due to an external exception. 

1519 

1520 Parameters 

1521 ---------- 

1522 db : `.registry.interfaces.Database` 

1523 Database to run the query against. 

1524 sql : `sqlalchemy.Executable` 

1525 SQL query to execute. 

1526 postprocessing : `Postprocessing` 

1527 Post-query filtering and checks to perform. 

1528 raw_page_size : `int` 

1529 Maximum number of SQL result rows to return in each page, before 

1530 postprocessing. 

1531 page_converter : `ResultPageConverter` 

1532 Object for converting raw SQL result rows into `ResultPage` instances. 

1533 """ 

1534 

1535 def __init__( 

1536 self, 

1537 db: Database, 

1538 sql: sqlalchemy.Executable, 

1539 postprocessing: Postprocessing, 

1540 raw_page_size: int, 

1541 page_converter: ResultPageConverter, 

1542 ): 

1543 self._raw_page_size = raw_page_size 

1544 self._postprocessing = postprocessing 

1545 self._context = db.query(sql, execution_options=dict(yield_per=raw_page_size)) 

1546 self._page_converter = page_converter 

1547 self._closed = False 

1548 cursor = self._context.__enter__() 

1549 try: 

1550 self._iterator = cursor.partitions() 

1551 except BaseException: 

1552 self.close(*sys.exc_info()) 

1553 raise 

1554 

1555 def close(self, exc_type: Any = None, exc_value: Any = None, traceback: Any = None) -> None: 

1556 """Close this cursor. 

1557 

1558 Parameters 

1559 ---------- 

1560 exc_type : `type` 

1561 Exception type as obtained from `sys.exc_info`, or `None` if there 

1562 was no error. 

1563 exc_value : `BaseException` or `None` 

1564 Exception instance as obtained from `sys.exc_info`, or `None` if 

1565 there was no error. 

1566 traceback : `object` 

1567 Traceback as obtained from `sys.exc_info`, or `None` if there was 

1568 no error. 

1569 """ 

1570 if not self._closed: 

1571 self._context.__exit__(exc_type, exc_value, traceback) 

1572 self._closed = True 

1573 

1574 def next(self) -> ResultPage | None: 

1575 """Return the next result page from this query. 

1576 

1577 Returns `None` when there are no more result pages, at which point 

1578 the cursor is closed. The cursor is also closed if this method 

1579 raises an exception. 

1580 """ 

1581 if self._closed: 

1582 raise RuntimeError("Cannot continue query result iteration: cursor has been closed") 

1583 try: 

1584 raw_page = next(self._iterator, None) 

1585 if raw_page is None: 

1586 self.close() 

1587 return None 

1588 

1589 postprocessed_rows = self._postprocessing.apply(raw_page) 

1590 return self._page_converter.convert(postprocessed_rows) 

1591 except BaseException: 

1592 self.close(*sys.exc_info()) 

1593 raise
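# Illustrative usage sketch, not part of this module: how a caller is
# expected to drive ``_Cursor`` given the contract documented above.  The
# ``db``, ``sql``, ``postprocessing``, ``page_size``, and ``converter``
# objects are assumed to have been prepared by the driver; only the paging
# and cleanup protocol is shown.
#
#     cursor = _Cursor(
#         db, sql, postprocessing, raw_page_size=page_size, page_converter=converter
#     )
#     try:
#         while (page := cursor.next()) is not None:
#             ...  # consume the ResultPage
#     finally:
#         # next() closes the cursor itself on exhaustion or error, and
#         # close() is idempotent, so this only matters if iteration is
#         # abandoned early.
#         cursor.close()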