Coverage for python / lsst / daf / butler / registry / dimensions / static.py: 0%

361 statements  

« prev     ^ index     » next       coverage.py v7.13.5, created at 2026-04-30 08:41 +0000

1# This file is part of daf_butler. 

2# 

3# Developed for the LSST Data Management System. 

4# This product includes software developed by the LSST Project 

5# (http://www.lsst.org). 

6# See the COPYRIGHT file at the top-level directory of this distribution 

7# for details of code ownership. 

8# 

9# This software is dual licensed under the GNU General Public License and also 

10# under a 3-clause BSD license. Recipients may choose which of these licenses 

11# to use; please see the files gpl-3.0.txt and/or bsd_license.txt, 

12# respectively. If you choose the GPL option then the following text applies 

13# (but note that there is still no warranty even if you opt for BSD instead): 

14# 

15# This program is free software: you can redistribute it and/or modify 

16# it under the terms of the GNU General Public License as published by 

17# the Free Software Foundation, either version 3 of the License, or 

18# (at your option) any later version. 

19# 

20# This program is distributed in the hope that it will be useful, 

21# but WITHOUT ANY WARRANTY; without even the implied warranty of 

22# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the 

23# GNU General Public License for more details. 

24# 

25# You should have received a copy of the GNU General Public License 

26# along with this program. If not, see <http://www.gnu.org/licenses/>. 

27from __future__ import annotations 

28 

29import dataclasses 

30import itertools 

31import logging 

32from collections import defaultdict 

33from collections.abc import Iterable, Mapping, Sequence, Set 

34from typing import Any 

35 

36import sqlalchemy 

37 

38from lsst.sphgeom import Region 

39 

40from ... import ddl 

41from ..._exceptions import UnimplementedQueryError 

42from ..._named import NamedKeyDict 

43from ...dimensions import ( 

44 DatabaseDimensionElement, 

45 DatabaseTopologicalFamily, 

46 DataCoordinate, 

47 DataIdValue, 

48 Dimension, 

49 DimensionElement, 

50 DimensionGroup, 

51 DimensionRecord, 

52 DimensionRecordSet, 

53 DimensionUniverse, 

54 SkyPixDimension, 

55 addDimensionForeignKey, 

56) 

57from ...dimensions.record_cache import DimensionRecordCache 

58from ...direct_query_driver import ( # Future query system (direct,server). 

59 Postprocessing, 

60 SqlJoinsBuilder, 

61 SqlSelectBuilder, 

62) 

63from ...queries import tree as qt # Future query system (direct,client,server) 

64from ...queries.overlaps import OverlapsVisitor 

65from ...queries.visitors import PredicateVisitFlags 

66from ..interfaces import Database, DimensionRecordStorageManager, StaticTablesContext, VersionTuple 

67 

# This has to be updated on every schema change.
_VERSION = VersionTuple(6, 0, 2)

# Module-level logger for this dimension-record storage implementation.
_LOG = logging.getLogger(__name__)

72 

73 

class StaticDimensionRecordStorageManager(DimensionRecordStorageManager):
    """An implementation of `DimensionRecordStorageManager` for single-layer
    `Registry` and the base layers of multi-layer `Registry`.

    This manager creates `DimensionRecordStorage` instances for all elements
    in the `DimensionUniverse` in its own `initialize` method, as part of
    static table creation, so it never needs to manage any dynamic registry
    tables.

    Parameters
    ----------
    db : `Database`
        Interface to the underlying database engine and namespace.
    tables : `dict` [ `str`, `sqlalchemy.Table` ]
        Mapping from dimension element name to SQL table, for all elements
        that have `DimensionElement.has_own_table` `True`.
    overlap_tables : `dict` [ `str`, `tuple` [ `sqlalchemy.Table`, \
            `sqlalchemy.Table` ] ]
        Mapping from dimension element name to SQL table holding overlaps
        between the common skypix dimension and that element, for all elements
        that have `DimensionElement.has_own_table` `True` and
        `DimensionElement.spatial` not `None`.
    dimension_group_storage : `_DimensionGroupStorage`
        Object that manages saved `DimensionGroup` definitions.
    universe : `DimensionUniverse`
        All known dimensions.
    registry_schema_version : `VersionTuple` or `None`, optional
        Version of registry schema.
    """

103 

    def __init__(
        self,
        db: Database,
        *,
        tables: dict[str, sqlalchemy.Table],
        overlap_tables: dict[str, tuple[sqlalchemy.Table, sqlalchemy.Table]],
        dimension_group_storage: _DimensionGroupStorage,
        universe: DimensionUniverse,
        registry_schema_version: VersionTuple | None = None,
    ):
        super().__init__(universe=universe, registry_schema_version=registry_schema_version)
        # Database interface used for all queries and writes.
        self._db = db
        # Per-element main tables, keyed by element name.
        self._tables = tables
        # Per-element (summary, overlap) table pairs, keyed by element name.
        self._overlap_tables = overlap_tables
        # Helper that persists DimensionGroup definitions.
        self._dimension_group_storage = dimension_group_storage

119 

120 def clone(self, db: Database) -> StaticDimensionRecordStorageManager: 

121 return StaticDimensionRecordStorageManager( 

122 db, 

123 tables=self._tables, 

124 overlap_tables=self._overlap_tables, 

125 dimension_group_storage=self._dimension_group_storage.clone(db), 

126 universe=self.universe, 

127 registry_schema_version=self._registry_schema_version, 

128 ) 

129 

130 @classmethod 

131 def initialize( 

132 cls, 

133 db: Database, 

134 context: StaticTablesContext, 

135 *, 

136 universe: DimensionUniverse, 

137 registry_schema_version: VersionTuple | None = None, 

138 ) -> DimensionRecordStorageManager: 

139 # Docstring inherited from DimensionRecordStorageManager. 

140 tables: dict[str, sqlalchemy.Table] = {} 

141 # Define tables for governor dimensions, which are never spatial or 

142 # temporal and always have tables. 

143 for dimension in universe.governor_dimensions: 

144 spec = dimension.RecordClass.fields.makeTableSpec( 

145 TimespanReprClass=db.getTimespanRepresentation() 

146 ) 

147 tables[dimension.name] = context.addTable(dimension.name, spec) 

148 # Define tables for database dimension elements, which may or may not 

149 # have their own tables and may be spatial or temporal. 

150 spatial = NamedKeyDict[DatabaseTopologicalFamily, list[DimensionElement]]() 

151 overlap_tables: dict[str, tuple[sqlalchemy.Table, sqlalchemy.Table]] = {} 

152 for element in universe.database_elements: 

153 if not element.has_own_table: 

154 continue 

155 spec = element.RecordClass.fields.makeTableSpec(TimespanReprClass=db.getTimespanRepresentation()) 

156 tables[element.name] = context.addTable(element.name, spec) 

157 if element.spatial is not None: 

158 spatial.setdefault(element.spatial, []).append(element) 

159 overlap_tables[element.name] = cls._make_skypix_overlap_tables(context, element) 

160 for field_name in spec.fields.names: 

161 if ( 

162 len(qt.ColumnSet.get_qualified_name(element.name, field_name)) 

163 >= db.dialect.max_identifier_length 

164 ): 

165 # Being able to assume that all dimension fields fit inside 

166 # the DB's identifier limit is really convenient and very 

167 # unlikely to cause trouble in practice. We'll just make 

168 # sure we catch any such trouble as early as possible. 

169 raise RuntimeError( 

170 f"Dimension filed '{element.name}.{field_name}' is too long for this database. " 

171 "Please file a ticket for long-field support if this was not a mistake." 

172 ) 

173 # Add some tables for materialized overlaps between database 

174 # dimensions. We've never used these and no longer plan to, but we 

175 # have to keep creating them to keep schema versioning consistent. 

176 cls._make_legacy_overlap_tables(context, spatial) 

177 # Create tables that store DimensionGroup definitions. 

178 dimension_group_storage = _DimensionGroupStorage.initialize(db, context, universe=universe) 

179 return cls( 

180 db=db, 

181 tables=tables, 

182 overlap_tables=overlap_tables, 

183 universe=universe, 

184 dimension_group_storage=dimension_group_storage, 

185 registry_schema_version=registry_schema_version, 

186 ) 

187 

    def fetch_cache_dict(self) -> dict[str, DimensionRecordSet]:
        # Docstring inherited.
        result: dict[str, DimensionRecordSet] = {}
        # Read all cached elements inside a single transaction so the caches
        # reflect one consistent view of the database.
        with self._db.transaction():
            for element in self.universe.elements:
                if not element.is_cached:
                    continue
                assert not element.temporal, (
                    "Cached dimension elements should not be spatial or temporal, as that "
                    "suggests a large number of records."
                )
                if element.implied_union_target is not None:
                    # This element has no table of its own; its values are the
                    # distinct values of a column in the target's table.
                    assert isinstance(element, Dimension), "Only dimensions can be implied dependencies."
                    table = self._tables[element.implied_union_target.name]
                    sql = sqlalchemy.select(
                        table.columns[element.name].label(element.primary_key.name)
                    ).distinct()
                else:
                    table = self._tables[element.name]
                    sql = table.select()
                with self._db.query(sql) as results:
                    result[element.name] = DimensionRecordSet(
                        element=element,
                        records=[element.RecordClass(**row) for row in results.mappings()],
                    )
        return result

214 

    def insert(
        self,
        element: DimensionElement,
        *records: DimensionRecord,
        replace: bool = False,
        skip_existing: bool = False,
    ) -> None:
        # Docstring inherited.
        if not element.has_own_table:
            raise TypeError(f"Cannot insert {element.name} records.")
        # Compute main-table rows and overlap rows up front, outside the
        # transaction below.
        db_rows = self._make_record_db_rows(element, records, replace=replace)
        table = self._tables[element.name]
        with self._db.transaction():
            if replace:
                self._db.replace(table, *db_rows.main_rows)
            elif skip_existing:
                self._db.ensure(table, *db_rows.main_rows, primary_key_only=True)
            else:
                self._db.insert(table, *db_rows.main_rows)
            self._insert_overlaps(
                element, db_rows.overlap_insert_rows, db_rows.overlap_delete_rows, skip_existing=skip_existing
            )
            # Record (idempotently) which skypix overlaps are materialized for
            # elements governed by this one.
            for related_element_name, summary_rows in db_rows.overlap_summary_rows.items():
                self._db.ensure(self._overlap_tables[related_element_name][0], *summary_rows)

239 

    def sync(self, record: DimensionRecord, update: bool = False) -> bool | dict[str, Any]:
        # Docstring inherited.
        if not record.definition.has_own_table:
            raise TypeError(f"Cannot sync {record.definition.name} records.")
        # We might not need the overlap rows at all; we won't know until we try
        # to insert the main row.  But we figure it's better to spend the time
        # to compute them in advance always *outside* the database transaction
        # than to compute them only as-needed inside the database transaction,
        # since in-transaction time is especially precious.
        db_rows = self._make_record_db_rows(record.definition, [record], replace=True)
        (compared,) = db_rows.main_rows
        # Split the single row into key columns and compared (non-key)
        # columns, as Database.sync requires.
        keys = {}
        for name in record.fields.required.names:
            keys[name] = compared.pop(name)
        with self._db.transaction():
            _, inserted_or_updated = self._db.sync(
                self._tables[record.definition.name],
                keys=keys,
                compared=compared,
                update=update,
            )
            if inserted_or_updated:
                if inserted_or_updated is True:
                    # Inserted a new row, so we just need to insert new
                    # overlap rows (if there are any).
                    self._insert_overlaps(
                        record.definition, db_rows.overlap_insert_rows, overlap_delete_rows=[]
                    )
                elif "region" in inserted_or_updated:
                    # Updated the region, so we need to delete old overlap
                    # rows and insert new ones.
                    self._insert_overlaps(
                        record.definition, db_rows.overlap_insert_rows, db_rows.overlap_delete_rows
                    )
                for related_element_name, summary_rows in db_rows.overlap_summary_rows.items():
                    self._db.ensure(self._overlap_tables[related_element_name][0], *summary_rows)
        return inserted_or_updated

277 

    def fetch_one(
        self,
        element_name: str,
        data_id: DataCoordinate,
        cache: DimensionRecordCache,
    ) -> DimensionRecord | None:
        # Docstring inherited.
        element = self.universe[element_name]
        # Serve from the cache when this element is cached at all; a cache
        # miss for a cached element means the record does not exist.
        if element_name in cache:
            try:
                return cache[element_name].find(data_id)
            except LookupError:
                return None
        if element.implied_union_target is not None:
            # No table of its own; look up the value in the target's table.
            assert isinstance(element, Dimension), "Only dimensions can be implied dependencies."
            table = self._tables[element.implied_union_target.name]
            sql = sqlalchemy.select(table.columns[element.name].label(element.primary_key.name)).where(
                table.columns[element_name] == data_id[element_name]
            )
        elif isinstance(element, SkyPixDimension):
            # SkyPix records are computed, never stored.
            id = data_id[element_name]
            return element.RecordClass(id=id, region=element.pixelization.pixel(id))
        else:
            table = self._tables[element.name]
            sql = table.select().where(
                *[
                    table.columns[column_name] == data_id[dimension_name]
                    for column_name, dimension_name in zip(
                        element.schema.required.names, element.required.names
                    )
                ]
            )
        with self._db.query(sql) as results:
            row = results.fetchone()
        if row is None:
            return None
        mapping: Mapping
        if element.temporal is not None:
            # Fold the database's timespan columns back into a single
            # "timespan" entry before constructing the record.
            mapping = dict(**row._mapping)
            timespan = self._db.getTimespanRepresentation().extract(mapping)
            for name in self._db.getTimespanRepresentation().getFieldNames():
                del mapping[name]
            mapping["timespan"] = timespan
        else:
            mapping = row._mapping
        return element.RecordClass(**mapping)

324 

325 def save_dimension_group(self, group: DimensionGroup) -> int: 

326 # Docstring inherited from DimensionRecordStorageManager. 

327 return self._dimension_group_storage.save(group) 

328 

329 def load_dimension_group(self, key: int) -> DimensionGroup: 

330 # Docstring inherited from DimensionRecordStorageManager. 

331 return self._dimension_group_storage.load(key) 

332 

    def make_joins_builder(self, element: DimensionElement, fields: Set[str]) -> SqlJoinsBuilder:
        # Build a SQL-joins builder exposing this element's key columns and
        # the requested record fields.
        if element.implied_union_target is not None:
            assert not fields, "Dimensions with implied-union storage never have fields."
            # No table of its own: SELECT DISTINCT the column from the
            # implied-union target's builder instead.
            return SqlSelectBuilder(
                self.make_joins_builder(element.implied_union_target, fields),
                columns=qt.ColumnSet(element.minimal_group).drop_implied_dimension_keys(),
                distinct=True,
            ).into_joins_builder(postprocessing=None)
        if not element.has_own_table:
            raise UnimplementedQueryError(f"Cannot join dimension element {element} with no table.")
        table = self._tables[element.name]
        result = SqlJoinsBuilder(db=self._db, from_clause=table)
        # Required dimensions may have table column names that differ from
        # the dimension names, so zip the two name sequences together.
        for dimension_name, column_name in zip(element.required.names, element.schema.required.names):
            result.dimension_keys[dimension_name].append(table.columns[column_name])
        result.extract_dimensions(element.implied.names)
        for field in fields:
            if field == "timespan":
                # Timespans span multiple DB columns; delegate assembly to
                # the database's timespan representation.
                result.timespans[element.name] = self._db.getTimespanRepresentation().from_columns(
                    table.columns
                )
            else:
                result.fields[element.name][field] = table.columns[field]
        return result

356 

357 def process_query_overlaps( 

358 self, 

359 dimensions: DimensionGroup, 

360 predicate: qt.Predicate, 

361 join_operands: Iterable[DimensionGroup], 

362 calibration_dataset_types: Set[str | qt.AnyDatasetType], 

363 allow_duplicates: bool, 

364 constraint_data_id: Mapping[str, DataIdValue], 

365 ) -> tuple[qt.Predicate, SqlSelectBuilder, Postprocessing]: 

366 overlaps_visitor = _CommonSkyPixMediatedOverlapsVisitor( 

367 self._db, 

368 dimensions, 

369 calibration_dataset_types, 

370 self._overlap_tables, 

371 allow_duplicates, 

372 constraint_data_id=constraint_data_id, 

373 ) 

374 new_predicate = overlaps_visitor.run(predicate, join_operands) 

375 return new_predicate, overlaps_visitor.builder, overlaps_visitor.postprocessing 

376 

    @classmethod
    def currentVersions(cls) -> list[VersionTuple]:
        # Docstring inherited from VersionedExtension.
        # _VERSION is the module-level constant bumped on every schema change.
        return [_VERSION]

381 

    @classmethod
    def _make_skypix_overlap_tables(
        cls, context: StaticTablesContext, element: DimensionElement
    ) -> tuple[sqlalchemy.Table, sqlalchemy.Table]:
        """Declare the (summary, overlap) table pair that records overlaps
        between ``element`` and skypix dimensions, returning the two tables.
        """
        assert element.governor is not None
        # Summary table: one row per (skypix system, level, governor value)
        # combination for which overlaps are materialized.
        summary_spec = ddl.TableSpec(
            fields=[
                ddl.FieldSpec(
                    name="skypix_system",
                    dtype=sqlalchemy.String,
                    length=16,
                    nullable=False,
                    primaryKey=True,
                ),
                ddl.FieldSpec(
                    name="skypix_level",
                    dtype=sqlalchemy.SmallInteger,
                    nullable=False,
                    primaryKey=True,
                ),
            ]
        )
        addDimensionForeignKey(summary_spec, element.governor, primaryKey=True)
        overlap_spec = ddl.TableSpec(
            fields=[
                ddl.FieldSpec(
                    name="skypix_system",
                    dtype=sqlalchemy.String,
                    length=16,
                    nullable=False,
                    primaryKey=True,
                ),
                ddl.FieldSpec(
                    name="skypix_level",
                    dtype=sqlalchemy.SmallInteger,
                    nullable=False,
                    primaryKey=True,
                ),
                # (more columns added below)
            ],
            unique=set(),
            indexes={
                # This index has the same fields as the PK, in a different
                # order, to facilitate queries that know skypix_index and want
                # to find the other element.
                ddl.IndexSpec(
                    "skypix_system",
                    "skypix_level",
                    "skypix_index",
                    *element.minimal_group.required,
                ),
            },
            foreignKeys=[
                # Foreign key to summary table.  This makes sure we don't
                # materialize any overlaps without remembering that we've done
                # so in the summary table, though it can't prevent the converse
                # of adding a summary row without adding overlap row (either of
                # those is a logic bug, of course, but we want to be defensive
                # about those).  Using ON DELETE CASCADE, it'd be very easy to
                # implement "disabling" an overlap materialization, because we
                # can just delete the summary row.
                # Note that the governor dimension column is added below, in
                # the call to addDimensionForeignKey.
                ddl.ForeignKeySpec(
                    f"{element.name}_skypix_overlap_summary",
                    source=("skypix_system", "skypix_level", element.governor.name),
                    target=("skypix_system", "skypix_level", element.governor.name),
                    onDelete="CASCADE",
                ),
            ],
        )
        # Add fields for the standard element this class manages overlaps for.
        # This is guaranteed to add a column for the governor dimension,
        # because that's a required dependency of element.
        for dimension in element.required:
            addDimensionForeignKey(overlap_spec, dimension, primaryKey=True)
        # Add field for the actual skypix index.  We do this later because I
        # think we care (at least a bit) about the order in which the primary
        # key is defined, in that we want a non-summary column like this one
        # to appear after the governor dimension column.
        overlap_spec.fields.add(
            ddl.FieldSpec(
                name="skypix_index",
                dtype=sqlalchemy.BigInteger,
                nullable=False,
                primaryKey=True,
            )
        )
        return (
            context.addTable(f"{element.name}_skypix_overlap_summary", summary_spec),
            context.addTable(f"{element.name}_skypix_overlap", overlap_spec),
        )

474 

    @classmethod
    def _make_legacy_overlap_tables(
        cls,
        context: StaticTablesContext,
        spatial: NamedKeyDict[DatabaseTopologicalFamily, list[DimensionElement]],
    ) -> None:
        """Declare the never-populated overlap tables between pairs of
        database dimension elements; they exist only to keep the schema
        (and hence schema versioning) consistent with older repositories.
        """
        # One (summary, overlap) table pair per cross-family element pair,
        # with the pair ordered deterministically so table names are stable.
        for (_, elements1), (_, elements2) in itertools.combinations(spatial.items(), 2):
            for element1, element2 in itertools.product(elements1, elements2):
                if element1 > element2:
                    (element2, element1) = (element1, element2)
                assert element1.spatial is not None and element2.spatial is not None
                assert element1.governor != element2.governor
                assert element1.governor is not None and element2.governor is not None
                summary_spec = ddl.TableSpec(fields=[])
                addDimensionForeignKey(summary_spec, element1.governor, primaryKey=True)
                addDimensionForeignKey(summary_spec, element2.governor, primaryKey=True)
                context.addTable(f"{element1.name}_{element2.name}_overlap_summary", summary_spec)
                overlap_spec = ddl.TableSpec(fields=[])
                addDimensionForeignKey(overlap_spec, element1.governor, primaryKey=True)
                addDimensionForeignKey(overlap_spec, element2.governor, primaryKey=True)
                # Non-governor required dimensions follow the governors in
                # the primary key.
                for dimension in element1.required:
                    if dimension != element1.governor:
                        addDimensionForeignKey(overlap_spec, dimension, primaryKey=True)
                for dimension in element2.required:
                    if dimension != element2.governor:
                        addDimensionForeignKey(overlap_spec, dimension, primaryKey=True)
                context.addTable(f"{element1.name}_{element2.name}_overlap", overlap_spec)

502 

    def _make_record_db_rows(
        self, element: DimensionElement, records: Sequence[DimensionRecord], replace: bool
    ) -> _DimensionRecordDatabaseRows:
        """Convert `DimensionRecord` objects into the bundle of database rows
        (main, overlap-insert, overlap-delete, overlap-summary) that an
        insert or sync of those records requires.
        """
        result = _DimensionRecordDatabaseRows()
        result.main_rows = [record.toDict() for record in records]
        if element.temporal is not None:
            # Expand each record's single "timespan" value into the DB's
            # timespan column(s), in place.
            TimespanReprClass = self._db.getTimespanRepresentation()
            for row in result.main_rows:
                timespan = row.pop("timespan")
                TimespanReprClass.update(timespan, result=row)
        if element.spatial is not None:
            result.overlap_insert_rows = self._compute_common_skypix_overlap_inserts(element, records)
            if replace:
                # Replacing records may invalidate overlaps computed for the
                # old regions, so schedule deletions too.
                result.overlap_delete_rows = self._compute_common_skypix_overlap_deletes(records)
        if element in self.universe.governor_dimensions:
            # Inserting governor rows also records, per dependent spatial
            # element, that common-skypix overlaps are materialized.
            for related_element_name in self._overlap_tables.keys():
                if self.universe[related_element_name].governor == element:
                    result.overlap_summary_rows[related_element_name] = [
                        {
                            "skypix_system": self.universe.commonSkyPix.system.name,
                            "skypix_level": self.universe.commonSkyPix.level,
                            element.name: record.dataId[element.name],
                        }
                        for record in records
                    ]
        return result

529 

530 def _compute_common_skypix_overlap_deletes( 

531 self, records: Sequence[DimensionRecord] 

532 ) -> list[dict[str, Any]]: 

533 return [ 

534 { 

535 "skypix_system": self.universe.commonSkyPix.system.name, 

536 "skypix_level": self.universe.commonSkyPix.level, 

537 **record.dataId.required, 

538 } 

539 for record in records 

540 ] 

541 

542 def _compute_common_skypix_overlap_inserts( 

543 self, 

544 element: DimensionElement, 

545 records: Sequence[DimensionRecord], 

546 ) -> list[dict[str, Any]]: 

547 _LOG.debug("Precomputing common skypix overlaps for %s.", element.name) 

548 overlap_records: list[dict[str, Any]] = [] 

549 for record in records: 

550 if record.region is None: 

551 continue 

552 base_overlap_record = dict(record.dataId.required) 

553 base_overlap_record["skypix_system"] = self.universe.commonSkyPix.system.name 

554 base_overlap_record["skypix_level"] = self.universe.commonSkyPix.level 

555 for begin, end in self.universe.commonSkyPix.pixelization.envelope(record.region): 

556 for index in range(begin, end): 

557 overlap_records.append({"skypix_index": index, **base_overlap_record}) 

558 return overlap_records 

559 

560 def _insert_overlaps( 

561 self, 

562 element: DimensionElement, 

563 overlap_insert_rows: list[dict[str, Any]], 

564 overlap_delete_rows: list[dict[str, Any]], 

565 skip_existing: bool = False, 

566 ) -> None: 

567 if overlap_delete_rows: 

568 # Since any of the new records might have replaced existing ones 

569 # that already have overlap records, and we don't know which, we 

570 # have no choice but to delete all overlaps for these records and 

571 # recompute them. We include the skypix_system and skypix_level 

572 # column values explicitly instead of just letting the query search 

573 # for all of those related to the given records, because they are 

574 # the first columns in the primary key, and hence searching with 

575 # them will be way faster (and we don't want to add a new index 

576 # just for this operation). 

577 _LOG.debug("Deleting old common skypix overlaps for %s.", element.name) 

578 self._db.delete( 

579 self._overlap_tables[element.name][1], 

580 ["skypix_system", "skypix_level"] + list(element.minimal_group.required), 

581 *overlap_delete_rows, 

582 ) 

583 if overlap_insert_rows: 

584 _LOG.debug("Inserting %d new skypix overlap rows for %s.", len(overlap_insert_rows), element.name) 

585 if skip_existing: 

586 self._db.ensure( 

587 self._overlap_tables[element.name][1], *overlap_insert_rows, primary_key_only=True 

588 ) 

589 else: 

590 self._db.insert(self._overlap_tables[element.name][1], *overlap_insert_rows) 

591 # We have only ever put overlaps with the commonSkyPix system into 

592 # this table, and *probably* only ever will. But the schema leaves 

593 # open the possibility that we should be inserting overlaps for 

594 # some other skypix system, as we once thought we'd support. In 

595 # case that door opens again in the future, we need to check the 

596 # "overlap summary" table to see if are any skypix systems other 

597 # than the common skypix system and raise (rolling back the entire 

598 # transaction) if there are. 

599 summary_table = self._overlap_tables[element.name][0] 

600 check_sql = ( 

601 sqlalchemy.sql.select(summary_table.columns.skypix_system, summary_table.columns.skypix_level) 

602 .select_from(summary_table) 

603 .where( 

604 sqlalchemy.sql.not_( 

605 sqlalchemy.sql.and_( 

606 summary_table.columns.skypix_system == self.universe.commonSkyPix.system.name, 

607 summary_table.columns.skypix_level == self.universe.commonSkyPix.level, 

608 ) 

609 ) 

610 ) 

611 ) 

612 with self._db.query(check_sql) as sql_result: 

613 bad_summary_rows = sql_result.fetchall() 

614 if bad_summary_rows: 

615 bad_skypix_names = [f"{row.skypix_system}{row.skypix.level}" for row in bad_summary_rows] 

616 raise RuntimeError( 

617 f"Data repository has overlaps between {element} and {bad_skypix_names} that " 

618 "are not supported by this version of daf_butler. Please use a newer version." 

619 ) 

620 

621 

@dataclasses.dataclass
class _DimensionRecordDatabaseRows:
    """Rows to be inserted into the database whenever a DimensionRecord is
    added.
    """

    main_rows: list[dict[str, Any]] = dataclasses.field(default_factory=list)
    """Rows for the dimension element table itself."""

    overlap_insert_rows: list[dict[str, Any]] = dataclasses.field(default_factory=list)
    """Rows for overlaps with the common skypix dimension."""

    overlap_delete_rows: list[dict[str, Any]] = dataclasses.field(default_factory=list)
    """Rows for overlaps with the common skypix dimension that should be
    deleted before inserting new ones.
    """

    overlap_summary_rows: dict[str, list[dict[str, Any]]] = dataclasses.field(default_factory=dict)
    """Rows that record which overlaps between skypix dimensions and other
    dimension elements are stored.

    This is populated when inserting governor dimension rows, with keys being
    the names of spatial dimension elements associated with that governor.
    """

646 

647 

class _DimensionGroupStorage:
    """Helper object that manages saved DimensionGroup definitions.

    Should generally be constructed by calling `initialize` instead of
    invoking the constructor directly.

    Parameters
    ----------
    db : `Database`
        Interface to the underlying database engine and namespace.
    idTable : `sqlalchemy.schema.Table`
        Table that just holds unique IDs for dimension graphs.
    definitionTable : `sqlalchemy.schema.Table`
        Table that maps dimension names to the IDs of the dimension graphs to
        which they belong.
    universe : `DimensionUniverse`
        All known dimensions.
    """

666 

    def __init__(
        self,
        db: Database,
        idTable: sqlalchemy.schema.Table,
        definitionTable: sqlalchemy.schema.Table,
        universe: DimensionUniverse,
    ):
        self._db = db
        self._idTable = idTable
        self._definitionTable = definitionTable
        self._universe = universe
        # In-memory caches of saved definitions; the empty dimension group
        # always has the reserved key 0 and is never stored in the database.
        self._keysByGroup: dict[DimensionGroup, int] = {universe.empty: 0}
        self._groupsByKey: dict[int, DimensionGroup] = {0: universe.empty}

680 

681 def clone(self, db: Database) -> _DimensionGroupStorage: 

682 """Make an independent copy of this manager instance bound to a new 

683 `Database` instance. 

684 

685 Parameters 

686 ---------- 

687 db : `Database` 

688 New `Database` object to use when instantiating the manager. 

689 

690 Returns 

691 ------- 

692 instance : `_DimensionGroupStorage` 

693 New manager instance with the same configuration as this instance, 

694 but bound to a new Database object. 

695 """ 

696 return _DimensionGroupStorage( 

697 db=db, idTable=self._idTable, definitionTable=self._definitionTable, universe=self._universe 

698 ) 

699 

    @classmethod
    def initialize(
        cls,
        db: Database,
        context: StaticTablesContext,
        *,
        universe: DimensionUniverse,
    ) -> _DimensionGroupStorage:
        """Construct a new instance, including creating tables if necessary.

        Parameters
        ----------
        db : `Database`
            Interface to the underlying database engine and namespace.
        context : `StaticTablesContext`
            Context object obtained from `Database.declareStaticTables`; used
            to declare any tables that should always be present.
        universe : `DimensionUniverse`
            All known dimensions.

        Returns
        -------
        storage : `_DimensionGroupStorage`
            New instance of this class.
        """
        # We need two tables just so we have one where the autoincrement key
        # is the only primary key column, as is required by (at least) SQLite.
        # In other databases, we might be able to use a Sequence directly.
        idTable = context.addTable(
            "dimension_graph_key",
            ddl.TableSpec(
                fields=[
                    ddl.FieldSpec(
                        name="id",
                        dtype=sqlalchemy.BigInteger,
                        autoincrement=True,
                        primaryKey=True,
                    ),
                ],
            ),
        )
        # One row per (graph key, dimension name) pair.
        definitionTable = context.addTable(
            "dimension_graph_definition",
            ddl.TableSpec(
                fields=[
                    ddl.FieldSpec(name="dimension_graph_id", dtype=sqlalchemy.BigInteger, primaryKey=True),
                    ddl.FieldSpec(name="dimension_name", dtype=sqlalchemy.Text, primaryKey=True),
                ],
                foreignKeys=[
                    ddl.ForeignKeySpec(
                        "dimension_graph_key",
                        source=("dimension_graph_id",),
                        target=("id",),
                        onDelete="CASCADE",
                    ),
                ],
            ),
        )
        return cls(db, idTable, definitionTable, universe=universe)

759 

760 def refresh(self) -> None: 

761 """Refresh the in-memory cache of saved DimensionGroup definitions. 

762 

763 This should be done automatically whenever needed, but it can also 

764 be called explicitly. 

765 """ 

766 dimensionNamesByKey: dict[int, set[str]] = defaultdict(set) 

767 with self._db.query(self._definitionTable.select()) as sql_result: 

768 sql_rows = sql_result.mappings().fetchall() 

769 for row in sql_rows: 

770 key = row[self._definitionTable.columns.dimension_graph_id] 

771 dimensionNamesByKey[key].add(row[self._definitionTable.columns.dimension_name]) 

772 keysByGraph: dict[DimensionGroup, int] = {self._universe.empty: 0} 

773 graphsByKey: dict[int, DimensionGroup] = {0: self._universe.empty} 

774 for key, dimensionNames in dimensionNamesByKey.items(): 

775 graph = DimensionGroup(self._universe, names=dimensionNames) 

776 keysByGraph[graph] = key 

777 graphsByKey[key] = graph 

778 self._groupsByKey = graphsByKey 

779 self._keysByGroup = keysByGraph 

780 

781 def save(self, group: DimensionGroup) -> int: 

782 """Save a `DimensionGroup` definition to the database, allowing it to 

783 be retrieved later via the returned key. 

784 

785 Parameters 

786 ---------- 

787 group : `DimensionGroup` 

788 Set of dimensions to save. 

789 

790 Returns 

791 ------- 

792 key : `int` 

793 Integer used as the unique key for this `DimensionGroup` in the 

794 database. 

795 """ 

796 key = self._keysByGroup.get(group) 

797 if key is not None: 

798 return key 

799 # Lock tables and then refresh to guard against races where some other 

800 # process is trying to register the exact same dimension graph. This 

801 # is probably not the most efficient way to do it, but it should be a 

802 # rare operation, especially since the short-circuit above will usually 

803 # work in long-lived data repositories. 

804 with self._db.transaction(lock=[self._idTable, self._definitionTable]): 

805 self.refresh() 

806 key = self._keysByGroup.get(group) 

807 if key is None: 

808 (key,) = self._db.insert(self._idTable, {}, returnIds=True) # type: ignore 

809 self._db.insert( 

810 self._definitionTable, 

811 *[{"dimension_graph_id": key, "dimension_name": name} for name in group.required], 

812 ) 

813 self._keysByGroup[group] = key 

814 self._groupsByKey[key] = group 

815 return key 

816 

817 def load(self, key: int) -> DimensionGroup: 

818 """Retrieve a `DimensionGroup` that was previously saved in the 

819 database. 

820 

821 Parameters 

822 ---------- 

823 key : `int` 

824 Integer used as the unique key for this `DimensionGroup` in the 

825 database. 

826 

827 Returns 

828 ------- 

829 graph : `DimensionGroup` 

830 Retrieved graph. 

831 """ 

832 graph = self._groupsByKey.get(key) 

833 if graph is None: 

834 self.refresh() 

835 graph = self._groupsByKey[key] 

836 return graph 

837 

838 

839class _CommonSkyPixMediatedOverlapsVisitor(OverlapsVisitor): 

840 def __init__( 

841 self, 

842 db: Database, 

843 dimensions: DimensionGroup, 

844 calibration_dataset_types: Set[str | qt.AnyDatasetType], 

845 overlap_tables: Mapping[str, tuple[sqlalchemy.Table, sqlalchemy.Table]], 

846 allow_duplicates: bool, 

847 constraint_data_id: Mapping[str, DataIdValue], 

848 ): 

849 super().__init__(dimensions, calibration_dataset_types) 

850 self.builder: SqlSelectBuilder = SqlJoinsBuilder(db=db).to_select_builder(qt.ColumnSet(dimensions)) 

851 self.postprocessing = Postprocessing() 

852 self.common_skypix = dimensions.universe.commonSkyPix 

853 self.overlap_tables: Mapping[str, tuple[sqlalchemy.Table, sqlalchemy.Table]] = overlap_tables 

854 self.common_skypix_overlaps_done: set[DatabaseDimensionElement] = set() 

855 self.allow_duplicates = allow_duplicates 

856 self.constraint_data_id = constraint_data_id 

857 

    def visit_spatial_constraint(
        self,
        element: DimensionElement,
        region: Region,
        flags: PredicateVisitFlags,
    ) -> qt.Predicate | None:
        """Translate a region-overlap constraint on ``element`` into a SQL
        skypix-index range test (plus Python postprocessing), returning the
        predicate that replaces the original constraint.
        """
        # Reject spatial constraints that are nested inside OR or NOT, because
        # the postprocessing needed for those would be a lot harder.
        if flags & PredicateVisitFlags.INVERTED or flags & PredicateVisitFlags.HAS_OR_SIBLINGS:
            raise UnimplementedQueryError(
                "Spatial overlap constraints nested inside OR or NOT are not supported."
            )
        # Delegate to super just because that's good practice with
        # OverlapVisitor.
        super().visit_spatial_constraint(element, region, flags)
        match element:
            case DatabaseDimensionElement():
                # If this is a database dimension element like tract, patch, or
                # visit, we need to:
                # - join in the common skypix overlap table for this element;
                # - constrain the common skypix index to be inside the
                #   ranges that overlap the region as a SQL where clause;
                # - add postprocessing to reject rows where the database
                #   dimension element's region doesn't actually overlap the
                #   region.
                self.postprocessing.spatial_where_filtering.append((element, region))
                if self.common_skypix.name in self.dimensions:
                    # The common skypix dimension should be part of the query
                    # as a first-class dimension, so we can join in the overlap
                    # table directly, and fall through to the end of this
                    # function to construct a Predicate that will turn into the
                    # SQL WHERE clause we want.
                    self._join_common_skypix_overlap(element)
                    skypix = self.common_skypix
                else:
                    # We need to hide the common skypix dimension from the
                    # larger query, so we make a subquery out of the overlap
                    # table that embeds the SQL WHERE clause we want and then
                    # projects out that dimension (with SELECT DISTINCT, to
                    # avoid introducing duplicate rows into the larger query).
                    joins_builder = self._make_common_skypix_overlap_joins_builder(element)
                    sql_where_or: list[sqlalchemy.ColumnElement[bool]] = []
                    sql_skypix_col = joins_builder.dimension_keys[self.common_skypix.name][0]
                    # envelope() yields [begin, end) index ranges covering the
                    # region; each becomes one AND term in the OR.
                    for begin, end in self.common_skypix.pixelization.envelope(region):
                        sql_where_or.append(sqlalchemy.and_(sql_skypix_col >= begin, sql_skypix_col < end))
                    joins_builder.where(sqlalchemy.or_(*sql_where_or))
                    self.builder.join(
                        joins_builder.to_select_builder(
                            qt.ColumnSet(element.minimal_group).drop_implied_dimension_keys(),
                            distinct=not self.allow_duplicates,
                        ).into_joins_builder(postprocessing=None)
                    )
                    # Short circuit here since the SQL WHERE clause has already
                    # been embedded in the subquery.
                    return qt.Predicate.from_bool(True)
            case SkyPixDimension():
                # If this is a skypix dimension, we can do a index-in-ranges
                # test directly on that dimension.  Note that this doesn't on
                # its own guarantee the skypix dimension column will be in the
                # query; that'll be the job of the DirectQueryDriver to sort
                # out (generally this will require a dataset using that skypix
                # dimension to be joined in, unless this is the common skypix
                # system).
                assert element.name in self.dimensions, (
                    "QueryTree guarantees dimensions are expanded when constraints are added."
                )
                skypix = element
            case _:
                raise UnimplementedQueryError(
                    f"Spatial overlap constraint for dimension {element} not supported."
                )
        # Convert the region-overlap constraint into a skypix
        # index range-membership constraint in SQL.
        result = qt.Predicate.from_bool(False)
        skypix_col_ref = qt.DimensionKeyReference.model_construct(dimension=skypix)
        for begin, end in skypix.pixelization.envelope(region):
            result = result.logical_or(qt.Predicate.in_range(skypix_col_ref, start=begin, stop=end))
        return result

936 

    def visit_spatial_join(
        self, a: DimensionElement, b: DimensionElement, flags: PredicateVisitFlags
    ) -> qt.Predicate | None:
        """Join two spatial dimension elements via the common skypix overlap
        tables, returning the (always-true) predicate that replaces the join
        in the query tree.
        """
        # Reject spatial joins that are nested inside OR or NOT, because the
        # postprocessing needed for those would be a lot harder.
        if flags & PredicateVisitFlags.INVERTED or flags & PredicateVisitFlags.HAS_OR_SIBLINGS:
            raise UnimplementedQueryError("Spatial overlap joins nested inside OR or NOT are not supported.")
        # Delegate to super to check for invalid joins and record this
        # "connection" for use when seeing whether to add an automatic join
        # later.
        super().visit_spatial_join(a, b, flags)
        # NOTE: the dotted patterns below are *value* patterns — they match
        # by equality against self.common_skypix, not by type.
        match (a, b):
            case (self.common_skypix, DatabaseDimensionElement() as b):
                self._join_common_skypix_overlap(b)
            case (DatabaseDimensionElement() as a, self.common_skypix):
                self._join_common_skypix_overlap(a)
            case (DatabaseDimensionElement() as a, DatabaseDimensionElement() as b):
                if self.common_skypix.name in self.dimensions:
                    # We want the common skypix dimension to appear in the
                    # query as a first-class dimension, so just join in the
                    # two overlap tables directly.
                    self._join_common_skypix_overlap(a)
                    self._join_common_skypix_overlap(b)
                else:
                    # We do not want the common skypix system to appear in the
                    # query or cause duplicate rows, so we join the two overlap
                    # tables in a subquery that projects out the common skypix
                    # index column with SELECT DISTINCT.

                    self.builder.join(
                        self._make_common_skypix_overlap_joins_builder(a)
                        .join(self._make_common_skypix_overlap_joins_builder(b))
                        .to_select_builder(
                            qt.ColumnSet(a.minimal_group | b.minimal_group).drop_implied_dimension_keys(),
                            distinct=not self.allow_duplicates,
                        )
                        .into_joins_builder(postprocessing=None)
                    )
                # In both cases we add postprocessing to check that the regions
                # really do overlap, since overlapping the same common skypix
                # tile is necessary but not sufficient for that.
                self.postprocessing.spatial_join_filtering.append((a, b))
            case _:
                raise UnimplementedQueryError(f"Unsupported combination for spatial join: {a, b}.")
        return qt.Predicate.from_bool(True)

982 

983 def _join_common_skypix_overlap(self, element: DatabaseDimensionElement) -> None: 

984 if element not in self.common_skypix_overlaps_done: 

985 self.builder.join(self._make_common_skypix_overlap_joins_builder(element)) 

986 self.common_skypix_overlaps_done.add(element) 

987 

988 def _make_common_skypix_overlap_joins_builder(self, element: DatabaseDimensionElement) -> SqlJoinsBuilder: 

989 _, overlap_table = self.overlap_tables[element.name] 

990 where_terms: list[sqlalchemy.ColumnElement] = [ 

991 overlap_table.c.skypix_system == self.common_skypix.system.name, 

992 overlap_table.c.skypix_level == self.common_skypix.level, 

993 ] 

994 for dimension in element.minimal_group.required & self.constraint_data_id.keys(): 

995 where_terms.append( 

996 overlap_table.columns[dimension] == sqlalchemy.literal(self.constraint_data_id[dimension]) 

997 ) 

998 return ( 

999 SqlJoinsBuilder(db=self.builder.joins.db, from_clause=overlap_table) 

1000 .extract_dimensions(element.required.names, skypix_index=self.common_skypix.name) 

1001 .where(sqlalchemy.and_(*where_terms)) 

1002 )