Coverage for python / lsst / daf / butler / registry / dimensions / static.py: 0%
361 statements
« prev ^ index » next coverage.py v7.13.5, created at 2026-05-01 08:18 +0000
« prev ^ index » next coverage.py v7.13.5, created at 2026-05-01 08:18 +0000
1# This file is part of daf_butler.
2#
3# Developed for the LSST Data Management System.
4# This product includes software developed by the LSST Project
5# (http://www.lsst.org).
6# See the COPYRIGHT file at the top-level directory of this distribution
7# for details of code ownership.
8#
9# This software is dual licensed under the GNU General Public License and also
10# under a 3-clause BSD license. Recipients may choose which of these licenses
11# to use; please see the files gpl-3.0.txt and/or bsd_license.txt,
12# respectively. If you choose the GPL option then the following text applies
13# (but note that there is still no warranty even if you opt for BSD instead):
14#
15# This program is free software: you can redistribute it and/or modify
16# it under the terms of the GNU General Public License as published by
17# the Free Software Foundation, either version 3 of the License, or
18# (at your option) any later version.
19#
20# This program is distributed in the hope that it will be useful,
21# but WITHOUT ANY WARRANTY; without even the implied warranty of
22# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
23# GNU General Public License for more details.
24#
25# You should have received a copy of the GNU General Public License
26# along with this program. If not, see <http://www.gnu.org/licenses/>.
27from __future__ import annotations
29import dataclasses
30import itertools
31import logging
32from collections import defaultdict
33from collections.abc import Iterable, Mapping, Sequence, Set
34from typing import Any
36import sqlalchemy
38from lsst.sphgeom import Region
40from ... import ddl
41from ..._exceptions import UnimplementedQueryError
42from ..._named import NamedKeyDict
43from ...dimensions import (
44 DatabaseDimensionElement,
45 DatabaseTopologicalFamily,
46 DataCoordinate,
47 DataIdValue,
48 Dimension,
49 DimensionElement,
50 DimensionGroup,
51 DimensionRecord,
52 DimensionRecordSet,
53 DimensionUniverse,
54 SkyPixDimension,
55 addDimensionForeignKey,
56)
57from ...dimensions.record_cache import DimensionRecordCache
58from ...direct_query_driver import ( # Future query system (direct,server).
59 Postprocessing,
60 SqlJoinsBuilder,
61 SqlSelectBuilder,
62)
63from ...queries import tree as qt # Future query system (direct,client,server)
64from ...queries.overlaps import OverlapsVisitor
65from ...queries.visitors import PredicateVisitFlags
66from ..interfaces import Database, DimensionRecordStorageManager, StaticTablesContext, VersionTuple
# Schema version for this manager's tables.
# This has to be updated on every schema change
_VERSION = VersionTuple(6, 0, 2)

# Module-level logger, named after this module.
_LOG = logging.getLogger(__name__)
class StaticDimensionRecordStorageManager(DimensionRecordStorageManager):
    """An implementation of `DimensionRecordStorageManager` for single-layer
    `Registry` and the base layers of multi-layer `Registry`.

    This manager creates `DimensionRecordStorage` instances for all elements
    in the `DimensionUniverse` in its own `initialize` method, as part of
    static table creation, so it never needs to manage any dynamic registry
    tables.

    Parameters
    ----------
    db : `Database`
        Interface to the underlying database engine and namespace.
    tables : `dict` [ `str`, `sqlalchemy.Table` ]
        Mapping from dimension element name to SQL table, for all elements that
        have `DimensionElement.has_own_table` `True`.
    overlap_tables : `dict` [ `str`, `tuple` [ `sqlalchemy.Table`, \
            `sqlalchemy.Table` ] ]
        Mapping from dimension element name to SQL table holding overlaps
        between the common skypix dimension and that element, for all elements
        that have `DimensionElement.has_own_table` `True` and
        `DimensionElement.spatial` not `None`.
    dimension_group_storage : `_DimensionGroupStorage`
        Object that manages saved `DimensionGroup` definitions.
    universe : `DimensionUniverse`
        All known dimensions.
    registry_schema_version : `VersionTuple` or `None`, optional
        Version of registry schema.
    """

    def __init__(
        self,
        db: Database,
        *,
        tables: dict[str, sqlalchemy.Table],
        overlap_tables: dict[str, tuple[sqlalchemy.Table, sqlalchemy.Table]],
        dimension_group_storage: _DimensionGroupStorage,
        universe: DimensionUniverse,
        registry_schema_version: VersionTuple | None = None,
    ):
        super().__init__(universe=universe, registry_schema_version=registry_schema_version)
        self._db = db
        self._tables = tables
        # Each value is (summary_table, overlap_table) for a spatial element.
        self._overlap_tables = overlap_tables
        self._dimension_group_storage = dimension_group_storage

    def clone(self, db: Database) -> StaticDimensionRecordStorageManager:
        # Docstring inherited from DimensionRecordStorageManager.
        # Tables are immutable SQLAlchemy metadata and can be shared; only the
        # Database connection and the group storage need to be rebound.
        return StaticDimensionRecordStorageManager(
            db,
            tables=self._tables,
            overlap_tables=self._overlap_tables,
            dimension_group_storage=self._dimension_group_storage.clone(db),
            universe=self.universe,
            registry_schema_version=self._registry_schema_version,
        )

    @classmethod
    def initialize(
        cls,
        db: Database,
        context: StaticTablesContext,
        *,
        universe: DimensionUniverse,
        registry_schema_version: VersionTuple | None = None,
    ) -> DimensionRecordStorageManager:
        # Docstring inherited from DimensionRecordStorageManager.
        tables: dict[str, sqlalchemy.Table] = {}
        # Define tables for governor dimensions, which are never spatial or
        # temporal and always have tables.
        for dimension in universe.governor_dimensions:
            spec = dimension.RecordClass.fields.makeTableSpec(
                TimespanReprClass=db.getTimespanRepresentation()
            )
            tables[dimension.name] = context.addTable(dimension.name, spec)
        # Define tables for database dimension elements, which may or may not
        # have their own tables and may be spatial or temporal.
        spatial = NamedKeyDict[DatabaseTopologicalFamily, list[DimensionElement]]()
        overlap_tables: dict[str, tuple[sqlalchemy.Table, sqlalchemy.Table]] = {}
        for element in universe.database_elements:
            if not element.has_own_table:
                continue
            spec = element.RecordClass.fields.makeTableSpec(TimespanReprClass=db.getTimespanRepresentation())
            tables[element.name] = context.addTable(element.name, spec)
            if element.spatial is not None:
                spatial.setdefault(element.spatial, []).append(element)
                overlap_tables[element.name] = cls._make_skypix_overlap_tables(context, element)
            for field_name in spec.fields.names:
                if (
                    len(qt.ColumnSet.get_qualified_name(element.name, field_name))
                    >= db.dialect.max_identifier_length
                ):
                    # Being able to assume that all dimension fields fit inside
                    # the DB's identifier limit is really convenient and very
                    # unlikely to cause trouble in practice.  We'll just make
                    # sure we catch any such trouble as early as possible.
                    raise RuntimeError(
                        f"Dimension field '{element.name}.{field_name}' is too long for this database. "
                        "Please file a ticket for long-field support if this was not a mistake."
                    )
        # Add some tables for materialized overlaps between database
        # dimensions.  We've never used these and no longer plan to, but we
        # have to keep creating them to keep schema versioning consistent.
        cls._make_legacy_overlap_tables(context, spatial)
        # Create tables that store DimensionGroup definitions.
        dimension_group_storage = _DimensionGroupStorage.initialize(db, context, universe=universe)
        return cls(
            db=db,
            tables=tables,
            overlap_tables=overlap_tables,
            universe=universe,
            dimension_group_storage=dimension_group_storage,
            registry_schema_version=registry_schema_version,
        )

    def fetch_cache_dict(self) -> dict[str, DimensionRecordSet]:
        # Docstring inherited.
        result: dict[str, DimensionRecordSet] = {}
        # Single transaction so all cached elements reflect one consistent
        # snapshot of the database.
        with self._db.transaction():
            for element in self.universe.elements:
                if not element.is_cached:
                    continue
                assert not element.temporal, (
                    "Cached dimension elements should not be spatial or temporal, as that "
                    "suggests a large number of records."
                )
                if element.implied_union_target is not None:
                    assert isinstance(element, Dimension), "Only dimensions can be implied dependencies."
                    # Derive records from the distinct values of the column in
                    # the union-target's table rather than a dedicated table.
                    table = self._tables[element.implied_union_target.name]
                    sql = sqlalchemy.select(
                        table.columns[element.name].label(element.primary_key.name)
                    ).distinct()
                else:
                    table = self._tables[element.name]
                    sql = table.select()
                with self._db.query(sql) as results:
                    result[element.name] = DimensionRecordSet(
                        element=element,
                        records=[element.RecordClass(**row) for row in results.mappings()],
                    )
        return result

    def insert(
        self,
        element: DimensionElement,
        *records: DimensionRecord,
        replace: bool = False,
        skip_existing: bool = False,
    ) -> None:
        # Docstring inherited.
        if not element.has_own_table:
            raise TypeError(f"Cannot insert {element.name} records.")
        # Compute all database rows (including overlap rows) before opening
        # the transaction, to keep in-transaction time short.
        db_rows = self._make_record_db_rows(element, records, replace=replace)
        table = self._tables[element.name]
        with self._db.transaction():
            if replace:
                self._db.replace(table, *db_rows.main_rows)
            elif skip_existing:
                self._db.ensure(table, *db_rows.main_rows, primary_key_only=True)
            else:
                self._db.insert(table, *db_rows.main_rows)
            self._insert_overlaps(
                element, db_rows.overlap_insert_rows, db_rows.overlap_delete_rows, skip_existing=skip_existing
            )
            for related_element_name, summary_rows in db_rows.overlap_summary_rows.items():
                self._db.ensure(self._overlap_tables[related_element_name][0], *summary_rows)

    def sync(self, record: DimensionRecord, update: bool = False) -> bool | dict[str, Any]:
        # Docstring inherited.
        if not record.definition.has_own_table:
            raise TypeError(f"Cannot sync {record.definition.name} records.")
        # We might not need the overlap rows at all; we won't know until we try
        # to insert the main row.  But we figure it's better to spend the time
        # to compute them in advance always *outside* the database transaction
        # than to compute them only as-needed inside the database transaction,
        # since in-transaction time is especially precious.
        db_rows = self._make_record_db_rows(record.definition, [record], replace=True)
        (compared,) = db_rows.main_rows
        # Split the single row into key columns (used to find the row) and
        # compared columns (used to detect/perform updates).
        keys = {}
        for name in record.fields.required.names:
            keys[name] = compared.pop(name)
        with self._db.transaction():
            _, inserted_or_updated = self._db.sync(
                self._tables[record.definition.name],
                keys=keys,
                compared=compared,
                update=update,
            )
            if inserted_or_updated:
                if inserted_or_updated is True:
                    # Inserted a new row, so we just need to insert new
                    # overlap rows (if there are any).
                    self._insert_overlaps(
                        record.definition, db_rows.overlap_insert_rows, overlap_delete_rows=[]
                    )
                elif "region" in inserted_or_updated:
                    # Updated the region, so we need to delete old overlap
                    # rows and insert new ones.
                    self._insert_overlaps(
                        record.definition, db_rows.overlap_insert_rows, db_rows.overlap_delete_rows
                    )
                for related_element_name, summary_rows in db_rows.overlap_summary_rows.items():
                    self._db.ensure(self._overlap_tables[related_element_name][0], *summary_rows)
        return inserted_or_updated

    def fetch_one(
        self,
        element_name: str,
        data_id: DataCoordinate,
        cache: DimensionRecordCache,
    ) -> DimensionRecord | None:
        # Docstring inherited.
        element = self.universe[element_name]
        # Prefer the cache when this element is cached at all; a miss there is
        # authoritative (the cache holds the full record set).
        if element_name in cache:
            try:
                return cache[element_name].find(data_id)
            except LookupError:
                return None
        if element.implied_union_target is not None:
            assert isinstance(element, Dimension), "Only dimensions can be implied dependencies."
            table = self._tables[element.implied_union_target.name]
            sql = sqlalchemy.select(table.columns[element.name].label(element.primary_key.name)).where(
                table.columns[element_name] == data_id[element_name]
            )
        elif isinstance(element, SkyPixDimension):
            # SkyPix records are computed on the fly, never stored.
            id = data_id[element_name]
            return element.RecordClass(id=id, region=element.pixelization.pixel(id))
        else:
            table = self._tables[element.name]
            sql = table.select().where(
                *[
                    table.columns[column_name] == data_id[dimension_name]
                    for column_name, dimension_name in zip(
                        element.schema.required.names, element.required.names
                    )
                ]
            )
        with self._db.query(sql) as results:
            row = results.fetchone()
        if row is None:
            return None
        mapping: Mapping
        if element.temporal is not None:
            # Repack the database's timespan representation (multiple columns)
            # into a single "timespan" value for the record constructor.
            mapping = dict(**row._mapping)
            timespan = self._db.getTimespanRepresentation().extract(mapping)
            for name in self._db.getTimespanRepresentation().getFieldNames():
                del mapping[name]
            mapping["timespan"] = timespan
        else:
            mapping = row._mapping
        return element.RecordClass(**mapping)

    def save_dimension_group(self, group: DimensionGroup) -> int:
        # Docstring inherited from DimensionRecordStorageManager.
        return self._dimension_group_storage.save(group)

    def load_dimension_group(self, key: int) -> DimensionGroup:
        # Docstring inherited from DimensionRecordStorageManager.
        return self._dimension_group_storage.load(key)

    def make_joins_builder(self, element: DimensionElement, fields: Set[str]) -> SqlJoinsBuilder:
        # Docstring inherited.
        if element.implied_union_target is not None:
            assert not fields, "Dimensions with implied-union storage never have fields."
            # SELECT DISTINCT over the union-target's table stands in for a
            # dedicated table for this dimension.
            return SqlSelectBuilder(
                self.make_joins_builder(element.implied_union_target, fields),
                columns=qt.ColumnSet(element.minimal_group).drop_implied_dimension_keys(),
                distinct=True,
            ).into_joins_builder(postprocessing=None)
        if not element.has_own_table:
            raise UnimplementedQueryError(f"Cannot join dimension element {element} with no table.")
        table = self._tables[element.name]
        result = SqlJoinsBuilder(db=self._db, from_clause=table)
        for dimension_name, column_name in zip(element.required.names, element.schema.required.names):
            result.dimension_keys[dimension_name].append(table.columns[column_name])
        result.extract_dimensions(element.implied.names)
        for field in fields:
            if field == "timespan":
                result.timespans[element.name] = self._db.getTimespanRepresentation().from_columns(
                    table.columns
                )
            else:
                result.fields[element.name][field] = table.columns[field]
        return result

    def process_query_overlaps(
        self,
        dimensions: DimensionGroup,
        predicate: qt.Predicate,
        join_operands: Iterable[DimensionGroup],
        calibration_dataset_types: Set[str | qt.AnyDatasetType],
        allow_duplicates: bool,
        constraint_data_id: Mapping[str, DataIdValue],
    ) -> tuple[qt.Predicate, SqlSelectBuilder, Postprocessing]:
        # Docstring inherited.
        overlaps_visitor = _CommonSkyPixMediatedOverlapsVisitor(
            self._db,
            dimensions,
            calibration_dataset_types,
            self._overlap_tables,
            allow_duplicates,
            constraint_data_id=constraint_data_id,
        )
        new_predicate = overlaps_visitor.run(predicate, join_operands)
        return new_predicate, overlaps_visitor.builder, overlaps_visitor.postprocessing

    @classmethod
    def currentVersions(cls) -> list[VersionTuple]:
        # Docstring inherited from VersionedExtension.
        return [_VERSION]

    @classmethod
    def _make_skypix_overlap_tables(
        cls, context: StaticTablesContext, element: DimensionElement
    ) -> tuple[sqlalchemy.Table, sqlalchemy.Table]:
        """Declare the summary and overlap tables for one spatial element.

        Returns the ``(summary_table, overlap_table)`` pair that is stored in
        ``self._overlap_tables[element.name]``.
        """
        assert element.governor is not None
        summary_spec = ddl.TableSpec(
            fields=[
                ddl.FieldSpec(
                    name="skypix_system",
                    dtype=sqlalchemy.String,
                    length=16,
                    nullable=False,
                    primaryKey=True,
                ),
                ddl.FieldSpec(
                    name="skypix_level",
                    dtype=sqlalchemy.SmallInteger,
                    nullable=False,
                    primaryKey=True,
                ),
            ]
        )
        addDimensionForeignKey(summary_spec, element.governor, primaryKey=True)
        overlap_spec = ddl.TableSpec(
            fields=[
                ddl.FieldSpec(
                    name="skypix_system",
                    dtype=sqlalchemy.String,
                    length=16,
                    nullable=False,
                    primaryKey=True,
                ),
                ddl.FieldSpec(
                    name="skypix_level",
                    dtype=sqlalchemy.SmallInteger,
                    nullable=False,
                    primaryKey=True,
                ),
                # (more columns added below)
            ],
            unique=set(),
            indexes={
                # This index has the same fields as the PK, in a different
                # order, to facilitate queries that know skypix_index and want
                # to find the other element.
                ddl.IndexSpec(
                    "skypix_system",
                    "skypix_level",
                    "skypix_index",
                    *element.minimal_group.required,
                ),
            },
            foreignKeys=[
                # Foreign key to summary table.  This makes sure we don't
                # materialize any overlaps without remembering that we've done
                # so in the summary table, though it can't prevent the converse
                # of adding a summary row without adding overlap row (either of
                # those is a logic bug, of course, but we want to be defensive
                # about those).  Using ON DELETE CASCADE, it'd be very easy to
                # implement "disabling" an overlap materialization, because we
                # can just delete the summary row.
                # Note that the governor dimension column is added below, in
                # the call to addDimensionForeignKey.
                ddl.ForeignKeySpec(
                    f"{element.name}_skypix_overlap_summary",
                    source=("skypix_system", "skypix_level", element.governor.name),
                    target=("skypix_system", "skypix_level", element.governor.name),
                    onDelete="CASCADE",
                ),
            ],
        )
        # Add fields for the standard element this class manages overlaps for.
        # This is guaranteed to add a column for the governor dimension,
        # because that's a required dependency of element.
        for dimension in element.required:
            addDimensionForeignKey(overlap_spec, dimension, primaryKey=True)
        # Add field for the actual skypix index.  We do this later because I
        # think we care (at least a bit) about the order in which the primary
        # key is defined, in that we want a non-summary column like this one
        # to appear after the governor dimension column.
        overlap_spec.fields.add(
            ddl.FieldSpec(
                name="skypix_index",
                dtype=sqlalchemy.BigInteger,
                nullable=False,
                primaryKey=True,
            )
        )
        return (
            context.addTable(f"{element.name}_skypix_overlap_summary", summary_spec),
            context.addTable(f"{element.name}_skypix_overlap", overlap_spec),
        )

    @classmethod
    def _make_legacy_overlap_tables(
        cls,
        context: StaticTablesContext,
        spatial: NamedKeyDict[DatabaseTopologicalFamily, list[DimensionElement]],
    ) -> None:
        """Declare the unused element-element overlap tables that must still
        exist for schema-version compatibility.
        """
        for (_, elements1), (_, elements2) in itertools.combinations(spatial.items(), 2):
            for element1, element2 in itertools.product(elements1, elements2):
                if element1 > element2:
                    # Normalize the pair ordering so table names are stable.
                    (element2, element1) = (element1, element2)
                assert element1.spatial is not None and element2.spatial is not None
                assert element1.governor != element2.governor
                assert element1.governor is not None and element2.governor is not None
                summary_spec = ddl.TableSpec(fields=[])
                addDimensionForeignKey(summary_spec, element1.governor, primaryKey=True)
                addDimensionForeignKey(summary_spec, element2.governor, primaryKey=True)
                context.addTable(f"{element1.name}_{element2.name}_overlap_summary", summary_spec)
                overlap_spec = ddl.TableSpec(fields=[])
                addDimensionForeignKey(overlap_spec, element1.governor, primaryKey=True)
                addDimensionForeignKey(overlap_spec, element2.governor, primaryKey=True)
                for dimension in element1.required:
                    if dimension != element1.governor:
                        addDimensionForeignKey(overlap_spec, dimension, primaryKey=True)
                for dimension in element2.required:
                    if dimension != element2.governor:
                        addDimensionForeignKey(overlap_spec, dimension, primaryKey=True)
                context.addTable(f"{element1.name}_{element2.name}_overlap", overlap_spec)

    def _make_record_db_rows(
        self, element: DimensionElement, records: Sequence[DimensionRecord], replace: bool
    ) -> _DimensionRecordDatabaseRows:
        """Expand dimension records into all of the database rows their
        insertion implies (main rows, overlap rows, and summary rows).
        """
        result = _DimensionRecordDatabaseRows()
        result.main_rows = [record.toDict() for record in records]
        if element.temporal is not None:
            # Flatten the in-memory timespan into the database's column
            # representation.
            TimespanReprClass = self._db.getTimespanRepresentation()
            for row in result.main_rows:
                timespan = row.pop("timespan")
                TimespanReprClass.update(timespan, result=row)
        if element.spatial is not None:
            result.overlap_insert_rows = self._compute_common_skypix_overlap_inserts(element, records)
            if replace:
                # Replacement may change regions, so stale overlap rows must
                # be removed first.
                result.overlap_delete_rows = self._compute_common_skypix_overlap_deletes(records)
        if element in self.universe.governor_dimensions:
            # Inserting a governor row implies summary rows for every spatial
            # element governed by it.
            for related_element_name in self._overlap_tables.keys():
                if self.universe[related_element_name].governor == element:
                    result.overlap_summary_rows[related_element_name] = [
                        {
                            "skypix_system": self.universe.commonSkyPix.system.name,
                            "skypix_level": self.universe.commonSkyPix.level,
                            element.name: record.dataId[element.name],
                        }
                        for record in records
                    ]
        return result

    def _compute_common_skypix_overlap_deletes(
        self, records: Sequence[DimensionRecord]
    ) -> list[dict[str, Any]]:
        """Build the key rows used to delete all common-skypix overlap rows
        for the given records.
        """
        return [
            {
                "skypix_system": self.universe.commonSkyPix.system.name,
                "skypix_level": self.universe.commonSkyPix.level,
                **record.dataId.required,
            }
            for record in records
        ]

    def _compute_common_skypix_overlap_inserts(
        self,
        element: DimensionElement,
        records: Sequence[DimensionRecord],
    ) -> list[dict[str, Any]]:
        """Build overlap-table rows relating each record's region to the
        common skypix pixels it intersects.  Records with no region produce
        no rows.
        """
        _LOG.debug("Precomputing common skypix overlaps for %s.", element.name)
        overlap_records: list[dict[str, Any]] = []
        for record in records:
            if record.region is None:
                continue
            base_overlap_record = dict(record.dataId.required)
            base_overlap_record["skypix_system"] = self.universe.commonSkyPix.system.name
            base_overlap_record["skypix_level"] = self.universe.commonSkyPix.level
            # envelope() yields [begin, end) ranges of pixel indices.
            for begin, end in self.universe.commonSkyPix.pixelization.envelope(record.region):
                for index in range(begin, end):
                    overlap_records.append({"skypix_index": index, **base_overlap_record})
        return overlap_records

    def _insert_overlaps(
        self,
        element: DimensionElement,
        overlap_insert_rows: list[dict[str, Any]],
        overlap_delete_rows: list[dict[str, Any]],
        skip_existing: bool = False,
    ) -> None:
        """Delete stale and insert new common-skypix overlap rows for
        ``element``, raising if the repository contains overlaps for any
        other skypix system.
        """
        if overlap_delete_rows:
            # Since any of the new records might have replaced existing ones
            # that already have overlap records, and we don't know which, we
            # have no choice but to delete all overlaps for these records and
            # recompute them.  We include the skypix_system and skypix_level
            # column values explicitly instead of just letting the query search
            # for all of those related to the given records, because they are
            # the first columns in the primary key, and hence searching with
            # them will be way faster (and we don't want to add a new index
            # just for this operation).
            _LOG.debug("Deleting old common skypix overlaps for %s.", element.name)
            self._db.delete(
                self._overlap_tables[element.name][1],
                ["skypix_system", "skypix_level"] + list(element.minimal_group.required),
                *overlap_delete_rows,
            )
        if overlap_insert_rows:
            _LOG.debug("Inserting %d new skypix overlap rows for %s.", len(overlap_insert_rows), element.name)
            if skip_existing:
                self._db.ensure(
                    self._overlap_tables[element.name][1], *overlap_insert_rows, primary_key_only=True
                )
            else:
                self._db.insert(self._overlap_tables[element.name][1], *overlap_insert_rows)
        # We have only ever put overlaps with the commonSkyPix system into
        # this table, and *probably* only ever will.  But the schema leaves
        # open the possibility that we should be inserting overlaps for
        # some other skypix system, as we once thought we'd support.  In
        # case that door opens again in the future, we need to check the
        # "overlap summary" table to see if are any skypix systems other
        # than the common skypix system and raise (rolling back the entire
        # transaction) if there are.
        summary_table = self._overlap_tables[element.name][0]
        check_sql = (
            sqlalchemy.sql.select(summary_table.columns.skypix_system, summary_table.columns.skypix_level)
            .select_from(summary_table)
            .where(
                sqlalchemy.sql.not_(
                    sqlalchemy.sql.and_(
                        summary_table.columns.skypix_system == self.universe.commonSkyPix.system.name,
                        summary_table.columns.skypix_level == self.universe.commonSkyPix.level,
                    )
                )
            )
        )
        with self._db.query(check_sql) as sql_result:
            bad_summary_rows = sql_result.fetchall()
        if bad_summary_rows:
            # Fixed: was ``row.skypix.level``, which would raise
            # AttributeError instead of producing the intended message; the
            # result column is ``skypix_level``.
            bad_skypix_names = [f"{row.skypix_system}{row.skypix_level}" for row in bad_summary_rows]
            raise RuntimeError(
                f"Data repository has overlaps between {element} and {bad_skypix_names} that "
                "are not supported by this version of daf_butler. Please use a newer version."
            )
@dataclasses.dataclass
class _DimensionRecordDatabaseRows:
    """Rows to be inserted into the database whenever a DimensionRecord is
    added.
    """

    main_rows: list[dict[str, Any]] = dataclasses.field(default_factory=list)
    """Rows for the dimension element table itself."""

    overlap_insert_rows: list[dict[str, Any]] = dataclasses.field(default_factory=list)
    """Rows for overlaps with the common skypix dimension."""

    overlap_delete_rows: list[dict[str, Any]] = dataclasses.field(default_factory=list)
    """Rows for overlaps with the common skypix dimension that should be
    deleted before inserting new ones.
    """

    overlap_summary_rows: dict[str, list[dict[str, Any]]] = dataclasses.field(default_factory=dict)
    """Rows that record which overlaps between skypix dimensions and other
    dimension elements are stored.

    This is populated when inserting governor dimension rows, with keys being
    the names of spatial dimension elements associated with that governor.
    """
class _DimensionGroupStorage:
    """Helper object that manages saved DimensionGroup definitions.

    Should generally be constructed by calling `initialize` instead of invoking
    the constructor directly.

    Parameters
    ----------
    db : `Database`
        Interface to the underlying database engine and namespace.
    idTable : `sqlalchemy.schema.Table`
        Table that just holds unique IDs for dimension graphs.
    definitionTable : `sqlalchemy.schema.Table`
        Table that maps dimension names to the IDs of the dimension graphs to
        which they belong.
    universe : `DimensionUniverse`
        All known dimensions.
    """

    def __init__(
        self,
        db: Database,
        idTable: sqlalchemy.schema.Table,
        definitionTable: sqlalchemy.schema.Table,
        universe: DimensionUniverse,
    ):
        self._db = db
        self._idTable = idTable
        self._definitionTable = definitionTable
        self._universe = universe
        # In-memory caches of the key<->group mapping, refreshed lazily from
        # the database.  Key 0 is reserved for the empty dimension group and
        # is never stored in the database.
        self._keysByGroup: dict[DimensionGroup, int] = {universe.empty: 0}
        self._groupsByKey: dict[int, DimensionGroup] = {0: universe.empty}

    def clone(self, db: Database) -> _DimensionGroupStorage:
        """Make an independent copy of this manager instance bound to a new
        `Database` instance.

        Parameters
        ----------
        db : `Database`
            New `Database` object to use when instantiating the manager.

        Returns
        -------
        instance : `_DimensionGroupStorage`
            New manager instance with the same configuration as this instance,
            but bound to a new Database object.
        """
        return _DimensionGroupStorage(
            db=db, idTable=self._idTable, definitionTable=self._definitionTable, universe=self._universe
        )

    @classmethod
    def initialize(
        cls,
        db: Database,
        context: StaticTablesContext,
        *,
        universe: DimensionUniverse,
    ) -> _DimensionGroupStorage:
        """Construct a new instance, including creating tables if necessary.

        Parameters
        ----------
        db : `Database`
            Interface to the underlying database engine and namespace.
        context : `StaticTablesContext`
            Context object obtained from `Database.declareStaticTables`; used
            to declare any tables that should always be present.
        universe : `DimensionUniverse`
            All known dimensions.

        Returns
        -------
        storage : `_DimensionGroupStorage`
            New instance of this class.
        """
        # We need two tables just so we have one where the autoincrement key is
        # the only primary key column, as is required by (at least) SQLite.  In
        # other databases, we might be able to use a Sequence directly.
        idTable = context.addTable(
            "dimension_graph_key",
            ddl.TableSpec(
                fields=[
                    ddl.FieldSpec(
                        name="id",
                        dtype=sqlalchemy.BigInteger,
                        autoincrement=True,
                        primaryKey=True,
                    ),
                ],
            ),
        )
        definitionTable = context.addTable(
            "dimension_graph_definition",
            ddl.TableSpec(
                fields=[
                    ddl.FieldSpec(name="dimension_graph_id", dtype=sqlalchemy.BigInteger, primaryKey=True),
                    ddl.FieldSpec(name="dimension_name", dtype=sqlalchemy.Text, primaryKey=True),
                ],
                foreignKeys=[
                    ddl.ForeignKeySpec(
                        "dimension_graph_key",
                        source=("dimension_graph_id",),
                        target=("id",),
                        onDelete="CASCADE",
                    ),
                ],
            ),
        )
        return cls(db, idTable, definitionTable, universe=universe)

    def refresh(self) -> None:
        """Refresh the in-memory cache of saved DimensionGroup definitions.

        This should be done automatically whenever needed, but it can also
        be called explicitly.
        """
        # Gather the dimension names belonging to each saved graph key, then
        # rebuild both cache dicts from scratch and swap them in atomically.
        dimensionNamesByKey: dict[int, set[str]] = defaultdict(set)
        with self._db.query(self._definitionTable.select()) as sql_result:
            sql_rows = sql_result.mappings().fetchall()
        for row in sql_rows:
            key = row[self._definitionTable.columns.dimension_graph_id]
            dimensionNamesByKey[key].add(row[self._definitionTable.columns.dimension_name])
        keysByGraph: dict[DimensionGroup, int] = {self._universe.empty: 0}
        graphsByKey: dict[int, DimensionGroup] = {0: self._universe.empty}
        for key, dimensionNames in dimensionNamesByKey.items():
            graph = DimensionGroup(self._universe, names=dimensionNames)
            keysByGraph[graph] = key
            graphsByKey[key] = graph
        self._groupsByKey = graphsByKey
        self._keysByGroup = keysByGraph

    def save(self, group: DimensionGroup) -> int:
        """Save a `DimensionGroup` definition to the database, allowing it to
        be retrieved later via the returned key.

        Parameters
        ----------
        group : `DimensionGroup`
            Set of dimensions to save.

        Returns
        -------
        key : `int`
            Integer used as the unique key for this `DimensionGroup` in the
            database.
        """
        key = self._keysByGroup.get(group)
        if key is not None:
            return key
        # Lock tables and then refresh to guard against races where some other
        # process is trying to register the exact same dimension graph.  This
        # is probably not the most efficient way to do it, but it should be a
        # rare operation, especially since the short-circuit above will usually
        # work in long-lived data repositories.
        with self._db.transaction(lock=[self._idTable, self._definitionTable]):
            self.refresh()
            key = self._keysByGroup.get(group)
            if key is None:
                # Still absent after refresh under the lock: allocate a new
                # key and write the definition rows.
                (key,) = self._db.insert(self._idTable, {}, returnIds=True)  # type: ignore
                self._db.insert(
                    self._definitionTable,
                    *[{"dimension_graph_id": key, "dimension_name": name} for name in group.required],
                )
                self._keysByGroup[group] = key
                self._groupsByKey[key] = group
        return key

    def load(self, key: int) -> DimensionGroup:
        """Retrieve a `DimensionGroup` that was previously saved in the
        database.

        Parameters
        ----------
        key : `int`
            Integer used as the unique key for this `DimensionGroup` in the
            database.

        Returns
        -------
        graph : `DimensionGroup`
            Retrieved graph.
        """
        graph = self._groupsByKey.get(key)
        if graph is None:
            # Cache miss: reload all definitions; a missing key here raises
            # KeyError, which is the intended failure mode.
            self.refresh()
            graph = self._groupsByKey[key]
        return graph
class _CommonSkyPixMediatedOverlapsVisitor(OverlapsVisitor):
    """An `OverlapsVisitor` that translates spatial overlap constraints and
    joins into SQL, mediated by the universe's common skypix dimension and
    its per-element overlap tables.

    Parameters
    ----------
    db : `Database`
        Database interface used to construct the SQL builders.
    dimensions : `DimensionGroup`
        Dimensions of the query being built; forwarded to `OverlapsVisitor`.
    calibration_dataset_types : `~collections.abc.Set`
        Set of dataset type names (or wildcard); forwarded to
        `OverlapsVisitor`.
    overlap_tables : `~collections.abc.Mapping`
        Mapping from database dimension element name to a pair of tables.
        Only the second table of each pair is used here; its rows relate an
        element's data ID to skypix indices (it carries ``skypix_system``
        and ``skypix_level`` columns).
    allow_duplicates : `bool`
        If `True`, skip the SELECT DISTINCT normally used to deduplicate
        subquery rows when the common skypix dimension is projected out.
    constraint_data_id : `~collections.abc.Mapping`
        Data ID values used to constrain overlap-table rows directly in SQL.
    """

    def __init__(
        self,
        db: Database,
        dimensions: DimensionGroup,
        calibration_dataset_types: Set[str | qt.AnyDatasetType],
        overlap_tables: Mapping[str, tuple[sqlalchemy.Table, sqlalchemy.Table]],
        allow_duplicates: bool,
        constraint_data_id: Mapping[str, DataIdValue],
    ):
        super().__init__(dimensions, calibration_dataset_types)
        # Select-builder that accumulates the joins and constraints this
        # visitor adds; starts as an empty query over the query dimensions.
        self.builder: SqlSelectBuilder = SqlJoinsBuilder(db=db).to_select_builder(qt.ColumnSet(dimensions))
        # Filters applied in Python after the SQL query runs (exact region
        # overlap tests recorded by the visit_* methods below).
        self.postprocessing = Postprocessing()
        # The common skypix dimension used to mediate all overlaps.
        self.common_skypix = dimensions.universe.commonSkyPix
        self.overlap_tables: Mapping[str, tuple[sqlalchemy.Table, sqlalchemy.Table]] = overlap_tables
        # Elements whose common-skypix overlap table has already been joined
        # into the main query, to avoid joining the same table twice.
        self.common_skypix_overlaps_done: set[DatabaseDimensionElement] = set()
        self.allow_duplicates = allow_duplicates
        self.constraint_data_id = constraint_data_id

    def visit_spatial_constraint(
        self,
        element: DimensionElement,
        region: Region,
        flags: PredicateVisitFlags,
    ) -> qt.Predicate | None:
        """Handle a constraint that ``element``'s region overlaps ``region``.

        Parameters
        ----------
        element : `DimensionElement`
            Dimension element whose region is being constrained.
        region : `lsst.sphgeom.Region`
            Region the element's region must overlap.
        flags : `PredicateVisitFlags`
            Flags describing where this constraint appears in the predicate
            tree.

        Returns
        -------
        replacement : `qt.Predicate` or `None`
            Replacement predicate: a skypix index-in-ranges test, or an
            always-true predicate when the constraint has already been
            embedded in a joined subquery.

        Raises
        ------
        UnimplementedQueryError
            Raised if the constraint is nested inside OR or NOT, or if
            ``element`` is not a database dimension element or skypix
            dimension.
        """
        # Reject spatial constraints that are nested inside OR or NOT, because
        # the postprocessing needed for those would be a lot harder.
        if flags & PredicateVisitFlags.INVERTED or flags & PredicateVisitFlags.HAS_OR_SIBLINGS:
            raise UnimplementedQueryError(
                "Spatial overlap constraints nested inside OR or NOT are not supported."
            )
        # Delegate to super just because that's good practice with
        # OverlapVisitor.
        super().visit_spatial_constraint(element, region, flags)
        match element:
            case DatabaseDimensionElement():
                # If this is a database dimension element like tract, patch, or
                # visit, we need to:
                # - join in the common skypix overlap table for this element;
                # - constrain the common skypix index to be inside the
                #   ranges that overlap the region as a SQL where clause;
                # - add postprocessing to reject rows where the database
                #   dimension element's region doesn't actually overlap the
                #   region.
                self.postprocessing.spatial_where_filtering.append((element, region))
                if self.common_skypix.name in self.dimensions:
                    # The common skypix dimension should be part of the query
                    # as a first-class dimension, so we can join in the overlap
                    # table directly, and fall through to the end of this
                    # function to construct a Predicate that will turn into the
                    # SQL WHERE clause we want.
                    self._join_common_skypix_overlap(element)
                    skypix = self.common_skypix
                else:
                    # We need to hide the common skypix dimension from the
                    # larger query, so we make a subquery out of the overlap
                    # table that embeds the SQL WHERE clause we want and then
                    # projects out that dimension (with SELECT DISTINCT, to
                    # avoid introducing duplicate rows into the larger query).
                    joins_builder = self._make_common_skypix_overlap_joins_builder(element)
                    sql_where_or: list[sqlalchemy.ColumnElement[bool]] = []
                    sql_skypix_col = joins_builder.dimension_keys[self.common_skypix.name][0]
                    # Each envelope entry is a half-open [begin, end) index
                    # range of common-skypix pixels overlapping the region.
                    for begin, end in self.common_skypix.pixelization.envelope(region):
                        sql_where_or.append(sqlalchemy.and_(sql_skypix_col >= begin, sql_skypix_col < end))
                    joins_builder.where(sqlalchemy.or_(*sql_where_or))
                    self.builder.join(
                        joins_builder.to_select_builder(
                            qt.ColumnSet(element.minimal_group).drop_implied_dimension_keys(),
                            distinct=not self.allow_duplicates,
                        ).into_joins_builder(postprocessing=None)
                    )
                    # Short circuit here since the SQL WHERE clause has already
                    # been embedded in the subquery.
                    return qt.Predicate.from_bool(True)
            case SkyPixDimension():
                # If this is a skypix dimension, we can do a index-in-ranges
                # test directly on that dimension.  Note that this doesn't on
                # its own guarantee the skypix dimension column will be in the
                # query; that'll be the job of the DirectQueryDriver to sort
                # out (generally this will require a dataset using that skypix
                # dimension to be joined in, unless this is the common skypix
                # system).
                assert element.name in self.dimensions, (
                    "QueryTree guarantees dimensions are expanded when constraints are added."
                )
                skypix = element
            case _:
                raise UnimplementedQueryError(
                    f"Spatial overlap constraint for dimension {element} not supported."
                )
        # Convert the region-overlap constraint into a skypix
        # index range-membership constraint in SQL.
        result = qt.Predicate.from_bool(False)
        skypix_col_ref = qt.DimensionKeyReference.model_construct(dimension=skypix)
        for begin, end in skypix.pixelization.envelope(region):
            result = result.logical_or(qt.Predicate.in_range(skypix_col_ref, start=begin, stop=end))
        return result

    def visit_spatial_join(
        self, a: DimensionElement, b: DimensionElement, flags: PredicateVisitFlags
    ) -> qt.Predicate | None:
        """Handle a spatial join between two dimension elements.

        Parameters
        ----------
        a, b : `DimensionElement`
            The two elements being joined; at least one must be a database
            dimension element, and the other may be either a database
            dimension element or the common skypix dimension.
        flags : `PredicateVisitFlags`
            Flags describing where this join appears in the predicate tree.

        Returns
        -------
        replacement : `qt.Predicate` or `None`
            Always-true predicate, since the join itself is expressed via
            table joins added to ``self.builder`` (plus postprocessing).

        Raises
        ------
        UnimplementedQueryError
            Raised if the join is nested inside OR or NOT, or the element
            combination is unsupported.
        """
        # Reject spatial joins that are nested inside OR or NOT, because the
        # postprocessing needed for those would be a lot harder.
        if flags & PredicateVisitFlags.INVERTED or flags & PredicateVisitFlags.HAS_OR_SIBLINGS:
            raise UnimplementedQueryError("Spatial overlap joins nested inside OR or NOT are not supported.")
        # Delegate to super to check for invalid joins and record this
        # "connection" for use when seeing whether to add an automatic join
        # later.
        super().visit_spatial_join(a, b, flags)
        # Note: the first two cases match against the *value* of
        # self.common_skypix (a dotted-name value pattern), not a type.
        match (a, b):
            case (self.common_skypix, DatabaseDimensionElement() as b):
                self._join_common_skypix_overlap(b)
            case (DatabaseDimensionElement() as a, self.common_skypix):
                self._join_common_skypix_overlap(a)
            case (DatabaseDimensionElement() as a, DatabaseDimensionElement() as b):
                if self.common_skypix.name in self.dimensions:
                    # We want the common skypix dimension to appear in the
                    # query as a first-class dimension, so just join in the
                    # two overlap tables directly.
                    self._join_common_skypix_overlap(a)
                    self._join_common_skypix_overlap(b)
                else:
                    # We do not want the common skypix system to appear in the
                    # query or cause duplicate rows, so we join the two overlap
                    # tables in a subquery that projects out the common skypix
                    # index column with SELECT DISTINCT.
                    self.builder.join(
                        self._make_common_skypix_overlap_joins_builder(a)
                        .join(self._make_common_skypix_overlap_joins_builder(b))
                        .to_select_builder(
                            qt.ColumnSet(a.minimal_group | b.minimal_group).drop_implied_dimension_keys(),
                            distinct=not self.allow_duplicates,
                        )
                        .into_joins_builder(postprocessing=None)
                    )
                # In both cases we add postprocessing to check that the regions
                # really do overlap, since overlapping the same common skypix
                # tile is necessary but not sufficient for that.
                self.postprocessing.spatial_join_filtering.append((a, b))
            case _:
                raise UnimplementedQueryError(f"Unsupported combination for spatial join: {a, b}.")
        return qt.Predicate.from_bool(True)

    def _join_common_skypix_overlap(self, element: DatabaseDimensionElement) -> None:
        # Join this element's common-skypix overlap table into the main
        # query, at most once per element (tracked in
        # self.common_skypix_overlaps_done).
        if element not in self.common_skypix_overlaps_done:
            self.builder.join(self._make_common_skypix_overlap_joins_builder(element))
            self.common_skypix_overlaps_done.add(element)

    def _make_common_skypix_overlap_joins_builder(self, element: DatabaseDimensionElement) -> SqlJoinsBuilder:
        # Build a SqlJoinsBuilder over this element's skypix overlap table,
        # restricted to the common skypix system and level, and further
        # constrained by any of the element's required dimensions that appear
        # in the constraint data ID.
        _, overlap_table = self.overlap_tables[element.name]
        where_terms: list[sqlalchemy.ColumnElement] = [
            overlap_table.c.skypix_system == self.common_skypix.system.name,
            overlap_table.c.skypix_level == self.common_skypix.level,
        ]
        for dimension in element.minimal_group.required & self.constraint_data_id.keys():
            where_terms.append(
                overlap_table.columns[dimension] == sqlalchemy.literal(self.constraint_data_id[dimension])
            )
        # Expose the element's required dimension keys plus the common skypix
        # index column to downstream joins.
        return (
            SqlJoinsBuilder(db=self.builder.joins.db, from_clause=overlap_table)
            .extract_dimensions(element.required.names, skypix_index=self.common_skypix.name)
            .where(sqlalchemy.and_(*where_terms))
        )