Coverage for python / lsst / daf / butler / registry / datasets / byDimensions / _manager.py: 0%
1from __future__ import annotations
3__all__ = ("ByDimensionsDatasetRecordStorageManagerUUID",)
5import dataclasses
6import datetime
7import logging
8from collections import defaultdict
9from collections.abc import Iterable, Mapping, Sequence, Set
10from typing import TYPE_CHECKING, Any, ClassVar
12import astropy.time
13import sqlalchemy
15from lsst.utils.iteration import chunk_iterable
17from .... import ddl
18from ...._collection_type import CollectionType
19from ...._dataset_ref import DatasetId, DatasetIdFactory, DatasetIdGenEnum, DatasetRef
20from ...._dataset_type import DatasetType, get_dataset_type_name
21from ...._exceptions import CollectionTypeError, MissingDatasetTypeError
22from ...._exceptions_legacy import DatasetTypeError
23from ...._timespan import Timespan
24from ....dimensions import DataCoordinate, DimensionGroup, DimensionUniverse
25from ....direct_query_driver import SqlJoinsBuilder, SqlSelectBuilder # new query system, server+direct only
26from ....queries import QueryFactoryFunction
27from ....queries import tree as qt # new query system, both clients + server
28from ..._caching_context import CachingContext
29from ..._collection_summary import CollectionSummary
30from ..._exceptions import ConflictingDefinitionError, DatasetTypeExpressionError, OrphanedRecordError
31from ...interfaces import DatasetRecordStorageManager, RunRecord, VersionTuple
32from ...wildcards import DatasetTypeWildcard
33from ._dataset_type_cache import DatasetTypeCache
34from .summaries import CollectionSummaryManager
35from .tables import (
36 DynamicTables,
37 StaticDatasetTableSpecTuple,
38 StaticDatasetTablesTuple,
39 addDatasetForeignKey,
40 makeStaticTableSpecs,
41 makeTagTableSpec,
42)
44if TYPE_CHECKING:
45 from ...interfaces import (
46 CollectionManager,
47 CollectionRecord,
48 Database,
49 DimensionRecordStorageManager,
50 StaticTablesContext,
51 )
54# This has to be updated on every schema change
55# TODO: 1.0.0 can be removed once all repos have been migrated to 2.0.0.
56_VERSION_UUID = VersionTuple(1, 0, 0)
57# Starting with 2.0.0 the `ingest_date` column type uses nanoseconds instead
58# of TIMESTAMP. The code supports both 1.0.0 and 2.0.0 for the duration of
59# the client migration period.
60_VERSION_UUID_NS = VersionTuple(2, 0, 0)
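# A minimal sketch (an illustrative assumption, not part of this module's API)
# of how the major schema version selects the ``ingest_date`` column type; it
# mirrors ``ingest_date_dtype()`` on the manager class below:
#
#     def _ingest_date_dtype_for(version: VersionTuple) -> type:
#         # 2.0.0 and later store nanosecond TAI; 1.0.0 uses a SQL TIMESTAMP.
#         return ddl.AstropyTimeNsecTai if version.major > 1 else sqlalchemy.TIMESTAMP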
62_LOG = logging.getLogger(__name__)
65@dataclasses.dataclass
66class _DatasetTypeRecord:
67 """Contents of a single dataset type record."""
69 dataset_type: DatasetType
70 dataset_type_id: int
71 dimensions_key: int
72 tag_table_name: str
73 calib_table_name: str | None
75 def make_dynamic_tables(self) -> DynamicTables:
76 return DynamicTables(
77 self.dataset_type.dimensions, self.dimensions_key, self.tag_table_name, self.calib_table_name
78 )
80 def update_dynamic_tables(self, current: DynamicTables) -> DynamicTables:
81 assert self.dimensions_key == current.dimensions_key
82 assert self.tag_table_name == current.tags_name
83 if self.calib_table_name is not None:
84 if current.calibs_name is not None:
85 assert self.calib_table_name == current.calibs_name
86 else:
87 # Some previously-cached dataset type had the same dimensions
88 # but was not a calibration.
89 current = current.copy(calibs_name=self.calib_table_name)
90 # If some previously-cached dataset type was a calibration but this
91 # one isn't, we don't want to forget the calibs table.
92 return current
95@dataclasses.dataclass
96class _DatasetRecordStorage:
97 """Information cached about a dataset type.
99 This combines information cached with different keys - the dataset type
100 and its ID are cached by name, while the tables are cached by the dataset
101 types dimensions (and hence shared with other dataset types that have the
102    type's dimensions (and hence shared with other dataset types that have the
103 """
105 dataset_type: DatasetType
106 dataset_type_id: int
107 dynamic_tables: DynamicTables
110class ByDimensionsDatasetRecordStorageManagerUUID(DatasetRecordStorageManager):
111 """A manager class for datasets that uses one dataset-collection table for
112 each group of dataset types that share the same dimensions.
114 In addition to the table organization, this class makes a number of
115 other design choices that would have been cumbersome (to say the least) to
116 try to pack into its name:
118 - It uses a private surrogate integer autoincrement field to identify
119 dataset types, instead of using the name as the primary and foreign key
120 directly.
122 - It aggressively loads all DatasetTypes into memory instead of fetching
123 them from the database only when needed or attempting more clever forms
124 of caching.
126 Alternative implementations that make different choices for these while
127 keeping the same general table organization might be reasonable as well.
129 Parameters
130 ----------
131 db : `Database`
132 Interface to the underlying database engine and namespace.
133 collections : `CollectionManager`
134 Manager object for the collections in this `Registry`.
135 dimensions : `DimensionRecordStorageManager`
136 Manager object for the dimensions in this `Registry`.
137 static : `StaticDatasetTablesTuple`
138 Named tuple of `sqlalchemy.schema.Table` instances for all static
139 tables used by this class.
140 summaries : `CollectionSummaryManager`
141 Structure containing tables that summarize the contents of collections.
142 registry_schema_version : `VersionTuple` or `None`, optional
143 Version of registry schema.
144    _cache : `DatasetTypeCache` or `None`, optional
145 For internal use only.
146 """
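    # Hedged construction sketch: this manager is normally created by the
    # registry's manager framework rather than by user code; ``db``,
    # ``context``, ``collections``, ``dimensions``, and ``caching_context``
    # below are assumed to be supplied by that framework.
    #
    #     manager = ByDimensionsDatasetRecordStorageManagerUUID.initialize(
    #         db,
    #         context,
    #         collections=collections,
    #         dimensions=dimensions,
    #         caching_context=caching_context,
    #     )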
148 def __init__(
149 self,
150 *,
151 db: Database,
152 collections: CollectionManager,
153 dimensions: DimensionRecordStorageManager,
154 static: StaticDatasetTablesTuple,
155 summaries: CollectionSummaryManager,
156 registry_schema_version: VersionTuple | None = None,
157 _cache: DatasetTypeCache | None = None,
158 ):
159 super().__init__(registry_schema_version=registry_schema_version)
160 self._db = db
161 self._collections = collections
162 self._dimensions = dimensions
163 self._static = static
164 self._summaries = summaries
165 self._cache = _cache if _cache is not None else DatasetTypeCache()
166 self._use_astropy_ingest_date = self.ingest_date_dtype() is ddl.AstropyTimeNsecTai
167 self._run_key_column = collections.getRunForeignKeyName()
169 _versions: ClassVar[list[VersionTuple]] = [_VERSION_UUID, _VERSION_UUID_NS]
171 _id_maker: ClassVar[DatasetIdFactory] = DatasetIdFactory()
172 """Factory for dataset IDs. In the future this factory may be shared with
173 other classes (e.g. Registry).
174 """
176 @classmethod
177 def initialize(
178 cls,
179 db: Database,
180 context: StaticTablesContext,
181 *,
182 collections: CollectionManager,
183 dimensions: DimensionRecordStorageManager,
184 caching_context: CachingContext,
185 registry_schema_version: VersionTuple | None = None,
186 ) -> DatasetRecordStorageManager:
187 # Docstring inherited from DatasetRecordStorageManager.
188 specs = cls.makeStaticTableSpecs(
189 type(collections), universe=dimensions.universe, schema_version=registry_schema_version
190 )
191 static: StaticDatasetTablesTuple = context.addTableTuple(specs) # type: ignore
192 summaries = CollectionSummaryManager.initialize(
193 db,
194 context,
195 collections=collections,
196 dimensions=dimensions,
197 dataset_type_table=static.dataset_type,
198 caching_context=caching_context,
199 )
200 return cls(
201 db=db,
202 collections=collections,
203 dimensions=dimensions,
204 static=static,
205 summaries=summaries,
206 registry_schema_version=registry_schema_version,
207 )
209 @classmethod
210 def currentVersions(cls) -> list[VersionTuple]:
211 # Docstring inherited from VersionedExtension.
212 return cls._versions
214 @classmethod
215 def makeStaticTableSpecs(
216 cls,
217 collections: type[CollectionManager],
218 universe: DimensionUniverse,
219 schema_version: VersionTuple | None,
220 ) -> StaticDatasetTableSpecTuple:
221 """Construct all static tables used by the classes in this package.
223 Static tables are those that are present in all Registries and do not
224 depend on what DatasetTypes have been registered.
226 Parameters
227 ----------
228        collections : `type` [ `CollectionManager` ]
229            Manager class used for the collections in this `Registry`.
230 universe : `DimensionUniverse`
231 Universe graph containing all dimensions known to this `Registry`.
232 schema_version : `VersionTuple` or `None`
233 Version of the schema that should be created, if `None` then
234 default schema should be used.
236 Returns
237 -------
238 specs : `StaticDatasetTablesTuple`
239 A named tuple containing `ddl.TableSpec` instances.
240 """
241 schema_version = cls.clsNewSchemaVersion(schema_version)
242 assert schema_version is not None, "New schema version cannot be None"
243 return makeStaticTableSpecs(
244 collections,
245 universe=universe,
246 schema_version=schema_version,
247 )
249 @classmethod
250 def addDatasetForeignKey(
251 cls,
252 tableSpec: ddl.TableSpec,
253 *,
254 name: str = "dataset",
255 constraint: bool = True,
256 onDelete: str | None = None,
257 **kwargs: Any,
258 ) -> ddl.FieldSpec:
259 # Docstring inherited from DatasetRecordStorageManager.
260 return addDatasetForeignKey(tableSpec, name=name, onDelete=onDelete, constraint=constraint, **kwargs)
262 @classmethod
263 def _newDefaultSchemaVersion(cls) -> VersionTuple:
264 # Docstring inherited from VersionedExtension.
265 return _VERSION_UUID_NS
267 def clone(
268 self,
269 *,
270 db: Database,
271 collections: CollectionManager,
272 dimensions: DimensionRecordStorageManager,
273 caching_context: CachingContext,
274 ) -> ByDimensionsDatasetRecordStorageManagerUUID:
275 return ByDimensionsDatasetRecordStorageManagerUUID(
276 db=db,
277 collections=collections,
278 dimensions=dimensions,
279 static=self._static,
280 summaries=self._summaries.clone(db=db, collections=collections, caching_context=caching_context),
281 registry_schema_version=self._registry_schema_version,
282 # See notes on DatasetTypeCache.clone() about cache behavior after
283 # cloning.
284 _cache=self._cache.clone(),
285 )
287 def refresh(self) -> None:
288 # Docstring inherited from DatasetRecordStorageManager.
289 self._cache.clear()
291 def remove_dataset_type(self, name: str) -> None:
292 # Docstring inherited from DatasetRecordStorageManager.
293 compositeName, componentName = DatasetType.splitDatasetTypeName(name)
294 if componentName is not None:
295 raise ValueError(f"Cannot delete a dataset type of a component of a composite (given {name})")
297 # Delete the row
298 try:
299 self._db.delete(self._static.dataset_type, ["name"], {"name": name})
300 except sqlalchemy.exc.IntegrityError as e:
301 raise OrphanedRecordError(
302 f"Dataset type {name} can not be removed."
303 " It is associated with datasets that must be removed first."
304 ) from e
306 # Now refresh everything -- removal is rare enough that this does
307 # not need to be fast.
308 self.refresh()
310 def get_dataset_type(self, name: str) -> DatasetType:
311 # Docstring inherited from DatasetRecordStorageManager.
312 return self._find_storage(name).dataset_type
314 def register_dataset_type(self, dataset_type: DatasetType) -> bool:
315 # Docstring inherited from DatasetRecordStorageManager.
316 #
317 # This is one of the places where we populate the dataset type cache.
318 # See the comment in _fetch_dataset_types for how these are related and
319 # invariants they must maintain.
320 #
321 if dataset_type.isComponent():
322 raise ValueError(
323 f"Component dataset types can not be stored in registry. Rejecting {dataset_type.name}"
324 )
326        # If the database universe and the dimension group universe differ,
327        # it can cause unexpected effects.
328 if dataset_type.dimensions.universe is not self._dimensions.universe:
329 raise ValueError(
330 "Incompatible dimension universe versions - "
331 f"database universe: {self._dimensions.universe}, "
332 f"dataset type universe: {dataset_type.dimensions.universe}."
333 )
335 record = self._fetch_dataset_type_record(dataset_type.name)
336 if record is None:
337 if (dynamic_tables := self._cache.get_by_dimensions(dataset_type.dimensions)) is None:
338 dimensions_key = self._dimensions.save_dimension_group(dataset_type.dimensions)
339 dynamic_tables = DynamicTables.from_dimensions_key(
340 dataset_type.dimensions, dimensions_key, dataset_type.isCalibration()
341 )
342 dynamic_tables.create(self._db, type(self._collections), self._cache.tables)
343 elif dataset_type.isCalibration() and dynamic_tables.calibs_name is None:
344 dynamic_tables = dynamic_tables.add_calibs(
345 self._db, type(self._collections), self._cache.tables
346 )
347 row, inserted = self._db.sync(
348 self._static.dataset_type,
349 keys={"name": dataset_type.name},
350 compared={
351 "dimensions_key": dynamic_tables.dimensions_key,
352 # Force the storage class to be loaded to ensure it
353 # exists and there is no typo in the name.
354 "storage_class": dataset_type.storageClass.name,
355 },
356 extra={
357 "tag_association_table": dynamic_tables.tags_name,
358 "calibration_association_table": (
359 dynamic_tables.calibs_name if dataset_type.isCalibration() else None
360 ),
361 },
362 returning=["id", "tag_association_table"],
363 )
364 # Make sure that cache is updated
365 if row is not None:
366 self._cache.add(dataset_type, row["id"])
367 self._cache.add_by_dimensions(dataset_type.dimensions, dynamic_tables)
368 else:
369 if dataset_type != record.dataset_type:
370 raise ConflictingDefinitionError(
371 f"Given dataset type {dataset_type} is inconsistent "
372 f"with database definition {record.dataset_type}."
373 )
374 inserted = False
375 return bool(inserted)
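    # Hedged usage sketch ("flat" and its dimensions are illustrative, and
    # ``universe`` is assumed to be this repository's DimensionUniverse):
    #
    #     flat = DatasetType(
    #         "flat",
    #         universe.conform(["instrument", "detector", "physical_filter"]),
    #         "ExposureF",
    #         isCalibration=True,
    #     )
    #     manager.register_dataset_type(flat)  # True: newly registered
    #     manager.register_dataset_type(flat)  # False: identical definition already stored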
377 def resolve_wildcard(
378 self,
379 expression: Any,
380 missing: list[str] | None = None,
381 explicit_only: bool = False,
382 ) -> list[DatasetType]:
383 wildcard = DatasetTypeWildcard.from_expression(expression)
384 result: list[DatasetType] = []
385 for name, dataset_type in wildcard.values.items():
386 parent_name, component_name = DatasetType.splitDatasetTypeName(name)
387 if component_name is not None:
388 raise DatasetTypeError(
389 "Component dataset types are not supported in Registry methods; use DatasetRef or "
390 "DatasetType methods to obtain components from parents instead."
391 )
392 try:
393 resolved_dataset_type = self.get_dataset_type(parent_name)
394 except MissingDatasetTypeError:
395 if missing is not None:
396 missing.append(name)
397 else:
398 if dataset_type is not None:
399 if dataset_type.is_compatible_with(resolved_dataset_type):
400 # Prefer the given dataset type to enable storage class
401 # conversions.
402 resolved_dataset_type = dataset_type
403 else:
404 raise DatasetTypeError(
405 f"Dataset type definition in query expression {dataset_type} is "
406 f"not compatible with the registered type {resolved_dataset_type}."
407 )
408 result.append(resolved_dataset_type)
409 if wildcard.patterns is ...:
410 if explicit_only:
411 raise TypeError(
412 "Universal wildcard '...' is not permitted for dataset types in this context."
413 )
414 for datasetType in self._fetch_dataset_types():
415 result.append(datasetType)
416 elif wildcard.patterns:
417 if explicit_only:
418 raise DatasetTypeExpressionError(
419 "Dataset type wildcard expressions are not supported in this context."
420 )
421 dataset_types = self._fetch_dataset_types()
422 for datasetType in dataset_types:
423 if any(p.fullmatch(datasetType.name) for p in wildcard.patterns):
424 result.append(datasetType)
426 return result
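    # Hedged examples of expressions accepted above (``manager`` is assumed):
    #
    #     manager.resolve_wildcard("bias")                   # a single name
    #     manager.resolve_wildcard(re.compile("deep_.*"))    # a regex pattern
    #     manager.resolve_wildcard(..., explicit_only=True)  # raises TypeError
    #
    # The universal wildcard ``...`` expands to every registered dataset type
    # unless ``explicit_only`` forbids it.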
428 def get_dataset_refs(self, ids: list[DatasetId]) -> list[DatasetRef]:
429 dataset_type_map: dict[DatasetId, DatasetType] = {}
430 for batch in chunk_iterable(set(ids), 50000):
431 # Look up the dataset types corresponding to the given Dataset IDs.
432 id_col = self._static.dataset.columns["id"]
433 sql = sqlalchemy.sql.select(
434 id_col,
435 self._static.dataset.columns["dataset_type_id"],
436 ).where(id_col.in_(batch))
437 with self._db.query(sql) as sql_result:
438 dataset_rows = sql_result.mappings().all()
439 for row in dataset_rows:
440 dataset_type_map[row["id"]] = self._get_dataset_type_by_id(row["dataset_type_id"])
442 # Group the given dataset IDs by the DimensionGroup of their dataset
443 # types -- there is a separate tags table for each DimensionGroup.
444 dimension_groups = defaultdict[DimensionGroup, set[DatasetId]](set)
445 for id, dataset_type in dataset_type_map.items():
446 dimension_groups[dataset_type.dimensions].add(id)
448 output_refs: list[DatasetRef] = []
449 for dimension_group, datasets in dimension_groups.items():
450 # Query the tags table for each dimension group to look up the
451 # data IDs corresponding to the UUIDs found from the dataset table.
452 dynamic_tables = self._get_dynamic_tables(dimension_group)
453 tags_table = self._get_tags_table(dynamic_tables)
454 for batch in chunk_iterable(datasets, 50000):
455 tags_sql = tags_table.select().where(tags_table.columns["dataset_id"].in_(batch))
456 # Join in the collection table to fetch the run name.
457 collection_column = tags_table.columns[self._collections.getCollectionForeignKeyName()]
458 joined_collections = self._collections.join_collections_sql(collection_column, tags_sql)
459 tags_sql = joined_collections.joined_sql
460 run_name_column = joined_collections.name_column
461 tags_sql = tags_sql.add_columns(run_name_column)
462 # Tags table includes run collections and tagged
463 # collections.
464 # In theory the data ID for a given dataset should be the
465 # same in both, but nothing actually guarantees this.
466 # So skip any tagged collections, using the run collection
467 # as the definitive definition.
468 tags_sql = tags_sql.where(joined_collections.type_column == int(CollectionType.RUN))
470 with self._db.query(tags_sql) as sql_result:
471 data_id_rows = sql_result.mappings().all()
473 assert run_name_column.key is not None
474 for data_id_row in data_id_rows:
475 id = data_id_row["dataset_id"]
476 dataset_type = dataset_type_map[id]
477 run_name = data_id_row[run_name_column.key]
478 data_id = DataCoordinate.from_required_values(
479 dimension_group,
480 tuple(data_id_row[dimension] for dimension in dimension_group.required),
481 )
482 ref = DatasetRef(
483 datasetType=dataset_type,
484 dataId=data_id,
485 id=id,
486 run=run_name,
487 )
488 output_refs.append(ref)
490 return output_refs
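    # Hedged usage sketch (the UUIDs are assumed to exist in the repository):
    #
    #     refs = manager.get_dataset_refs([uuid_a, uuid_b])
    #
    # Each returned DatasetRef carries its dataset type, data ID, UUID, and
    # RUN collection name, reconstructed from the dataset and tags tables as
    # described in the comments above.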
492 def _fetch_dataset_type_record(self, name: str) -> _DatasetTypeRecord | None:
493        """Retrieve the record for a single dataset type, if it is defined.
495        Returns
496        -------
497        record : `_DatasetTypeRecord` or `None`
498            Information from the matching database record, or `None` if the named dataset type does not exist.
499 """
500 c = self._static.dataset_type.columns
501 stmt = self._static.dataset_type.select().where(c.name == name)
502 with self._db.query(stmt) as sql_result:
503 row = sql_result.mappings().one_or_none()
504 if row is None:
505 return None
506 else:
507 return self._record_from_row(row)
509 def _record_from_row(self, row: Mapping) -> _DatasetTypeRecord:
510 name = row["name"]
511 dimensions = self._dimensions.load_dimension_group(row["dimensions_key"])
512 calibTableName = row["calibration_association_table"]
513 datasetType = DatasetType(
514 name, dimensions, row["storage_class"], isCalibration=(calibTableName is not None)
515 )
516 return _DatasetTypeRecord(
517 dataset_type=datasetType,
518 dataset_type_id=row["id"],
519 dimensions_key=row["dimensions_key"],
520 tag_table_name=row["tag_association_table"],
521 calib_table_name=calibTableName,
522 )
524 def _dataset_type_from_row(self, row: Mapping) -> DatasetType:
525 return self._record_from_row(row).dataset_type
527 def preload_cache(self) -> None:
528 self._fetch_dataset_types()
530 def _fetch_dataset_types(self, force_refresh: bool = False) -> list[DatasetType]:
531 """Fetch list of all defined dataset types."""
532 # This is one of two places we populate the dataset type cache:
533 #
534 # - This method handles almost all requests for dataset types that
535 # should already exist. It always marks the cache as "full" in both
536 # dataset type names and dimensions.
537 #
538 # - register_dataset_type handles the case where the dataset type might
539        # not exist yet. Since it can only add a single dataset type, it
540 # never changes whether the cache is full.
541 #
542 # In both cases, we require that the per-dimensions data be cached
543 # whenever a dataset type is added to the cache by name, to reduce the
544 # number of possible states the cache can be in and minimize the number
545 # of queries.
546 if self._cache.full and not force_refresh:
547 return [dataset_type for dataset_type, _ in self._cache.items()]
548 with self._db.query(self._static.dataset_type.select()) as sql_result:
549 sql_rows = sql_result.mappings().fetchall()
550 records = [self._record_from_row(row) for row in sql_rows]
551 # Cache everything and specify that cache is complete.
552 cache_data: list[tuple[DatasetType, int]] = []
553 cache_dimensions_data: dict[DimensionGroup, DynamicTables] = {}
554 for record in records:
555 cache_data.append((record.dataset_type, record.dataset_type_id))
556 if (dynamic_tables := cache_dimensions_data.get(record.dataset_type.dimensions)) is None:
557 tables = record.make_dynamic_tables()
558 else:
559 tables = record.update_dynamic_tables(dynamic_tables)
560 cache_dimensions_data[record.dataset_type.dimensions] = tables
561 self._cache.set(
562 cache_data, full=True, dimensions_data=cache_dimensions_data.items(), dimensions_full=True
563 )
564 return [record.dataset_type for record in records]
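    # The invariant described above, written as a hedged check that could be
    # asserted inside any method of this class:
    #
    #     dataset_type, _ = self._cache.get(name)
    #     assert dataset_type is None or (
    #         self._cache.get_by_dimensions(dataset_type.dimensions) is not None
    #     )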
566 def _get_dataset_type_by_id(self, id: int) -> DatasetType:
567 dt = self._cache.get_by_id(id)
568 if dt is None:
569 # Since the ID is not a concept exposed to the public API, it
570 # had to have come from a dataset table, and our cache must be
571 # empty or out of date.
572 self._fetch_dataset_types(force_refresh=True)
573 dt = self._cache.get_by_id(id)
574 if dt is None:
575 raise RuntimeError(f"Failed to look up dataset type with ID {id}")
576 return dt
578 def _find_storage(self, name: str) -> _DatasetRecordStorage:
579 """Find a dataset type and the extra information needed to work with
580 it, utilizing and populating the cache as needed.
581 """
582 dataset_type, dataset_type_id = self._cache.get(name)
583 if dataset_type is not None:
584 tables = self._get_dynamic_tables(dataset_type.dimensions)
585 assert dataset_type_id is not None, "Dataset type cache population is incomplete."
586 return _DatasetRecordStorage(
587 dataset_type=dataset_type, dataset_type_id=dataset_type_id, dynamic_tables=tables
588 )
589 else:
590            # On the first cache miss, populate the cache with the complete
591            # list of dataset types (if that has not been done yet).
592 if not self._cache.full:
593 self._fetch_dataset_types()
594 # Try again
595 dataset_type, dataset_type_id = self._cache.get(name)
596 if dataset_type is not None:
597 tables = self._get_dynamic_tables(dataset_type.dimensions)
598 assert dataset_type_id is not None, "Dataset type cache population is incomplete."
599 return _DatasetRecordStorage(
600 dataset_type=dataset_type, dataset_type_id=dataset_type_id, dynamic_tables=tables
601 )
602 record = self._fetch_dataset_type_record(name)
603 if record is not None:
604 self._cache.add(record.dataset_type, record.dataset_type_id)
605 tables = record.make_dynamic_tables()
606 self._cache.add_by_dimensions(record.dataset_type.dimensions, tables)
607 return _DatasetRecordStorage(record.dataset_type, record.dataset_type_id, tables)
608 raise MissingDatasetTypeError(f"Dataset type {name!r} does not exist.")
610 def _get_dynamic_tables(self, dimensions: DimensionGroup) -> DynamicTables:
611 tables = self._cache.get_by_dimensions(dimensions)
612 assert tables is not None, (
613 "_fetch_dataset_types is supposed to guarantee that the tables cache is populated."
614 )
615 return tables
617 def getCollectionSummary(self, collection: CollectionRecord) -> CollectionSummary:
618 # Docstring inherited from DatasetRecordStorageManager.
619 summaries = self._summaries.fetch_summaries([collection], None, self._dataset_type_from_row)
620 return summaries[collection.key]
622 def fetch_summaries(
623 self,
624 collections: Iterable[CollectionRecord],
625 dataset_types: Iterable[DatasetType] | Iterable[str] | None = None,
626 ) -> Mapping[Any, CollectionSummary]:
627 # Docstring inherited from DatasetRecordStorageManager.
628 dataset_type_names: Iterable[str] | None = None
629 if dataset_types is not None:
630 dataset_type_names = set(get_dataset_type_name(dt) for dt in dataset_types)
631 return self._summaries.fetch_summaries(collections, dataset_type_names, self._dataset_type_from_row)
633 def fetch_run_dataset_ids(self, run: RunRecord) -> list[DatasetId]:
634 # Docstring inherited.
635 sql = sqlalchemy.select(self._static.dataset.c.id).where(
636 self._static.dataset.c[self._run_key_column] == run.key
637 )
638 with self._db.query(sql) as result:
639 return list(result.scalars())
641 def ingest_date_dtype(self) -> type:
642 """Return type of the ``ingest_date`` column."""
643 schema_version = self.newSchemaVersion()
644 if schema_version is not None and schema_version.major > 1:
645 return ddl.AstropyTimeNsecTai
646 else:
647 return sqlalchemy.TIMESTAMP
649 def insert(
650 self,
651 dataset_type_name: str,
652 run: RunRecord,
653 data_ids: Iterable[DataCoordinate],
654 id_generation_mode: DatasetIdGenEnum = DatasetIdGenEnum.UNIQUE,
655 ) -> list[DatasetRef]:
656 # Docstring inherited from DatasetRecordStorageManager.
657 if (storage := self._find_storage(dataset_type_name)) is None:
658 raise MissingDatasetTypeError(f"Dataset type {dataset_type_name!r} has not been registered.")
659 # Current timestamp, type depends on schema version. Use microsecond
660 # precision for astropy time to keep things consistent with
661 # TIMESTAMP(6) SQL type.
662 timestamp: datetime.datetime | astropy.time.Time
663 if self._use_astropy_ingest_date:
664            # Astropy `Time.now()` precision should be the same as
665            # `datetime.now()`, which should mean microseconds.
666 timestamp = astropy.time.Time.now()
667 else:
668 timestamp = datetime.datetime.now(datetime.UTC)
670 # Iterate over data IDs, transforming a possibly-single-pass iterable
671 # into a list.
672 data_id_list: list[DataCoordinate] = []
673 rows = []
674 summary = CollectionSummary()
675 for dataId in summary.add_data_ids_generator(storage.dataset_type, data_ids):
676 data_id_list.append(dataId)
677 rows.append(
678 {
679 "id": self._id_maker.makeDatasetId(
680 run.name, storage.dataset_type, dataId, id_generation_mode
681 ),
682 "dataset_type_id": storage.dataset_type_id,
683 self._run_key_column: run.key,
684 "ingest_date": timestamp,
685 }
686 )
687 if not rows:
688 # Just in case an empty collection is provided we want to avoid
689 # adding dataset type to summary tables.
690 return []
692 with self._db.transaction():
693 # Insert into the static dataset table.
694 self._db.insert(self._static.dataset, *rows)
695 # Update the summary tables for this collection in case this is the
696 # first time this dataset type or these governor values will be
697 # inserted there.
698 self._summaries.update(run, [storage.dataset_type_id], summary)
699 # Combine the generated dataset_id values and data ID fields to
700 # form rows to be inserted into the tags table.
701 protoTagsRow = {
702 "dataset_type_id": storage.dataset_type_id,
703 self._collections.getCollectionForeignKeyName(): run.key,
704 }
705 tagsRows = [
706 dict(protoTagsRow, dataset_id=row["id"], **dataId.required)
707 for dataId, row in zip(data_id_list, rows, strict=True)
708 ]
709 # Insert those rows into the tags table.
710 self._db.insert(self._get_tags_table(storage.dynamic_tables), *tagsRows)
712 return [
713 DatasetRef(
714 datasetType=storage.dataset_type,
715 dataId=dataId,
716 id=row["id"],
717 run=run.name,
718 )
719 for dataId, row in zip(data_id_list, rows, strict=True)
720 ]
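    # Hedged usage sketch (``run_record`` is a RunRecord obtained from the
    # collection manager and ``data_ids`` is an iterable of DataCoordinate):
    #
    #     refs = manager.insert("raw", run_record, data_ids)
    #
    # Each returned ref gets a freshly generated UUID and its run is
    # ``run_record.name``; the static dataset table, the per-dimension tags
    # table, and the collection summary are all updated in one transaction.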
722 def import_(self, run: RunRecord, refs: list[DatasetRef], assume_new: bool = False) -> None:
723 # Docstring inherited from DatasetRecordStorageManager.
724 if not refs:
725 # Just in case an empty mapping is provided we want to avoid
726 # adding dataset type to summary tables.
727 return
728 assert all(ref.run == run.name for ref in refs), (
729 "Run names in refs must match the run we are inserting into"
730 )
732 dataset_types = {ref.datasetType for ref in refs}
733 dimensions = _ensure_dimension_groups_match(dataset_types)
734 dataset_type_storage: dict[str, _DatasetRecordStorage] = {}
735 for dt in dataset_types:
736 if (storage := self._find_storage(dt.name)) is None:
737 raise MissingDatasetTypeError(f"Dataset type {dt.name!r} has not been registered.")
738 dataset_type_storage[dt.name] = storage
740 dynamic_tables = self._get_dynamic_tables(dimensions)
741 tags_table = self._get_tags_table(dynamic_tables)
742 # Current timestamp, type depends on schema version.
743 if self._use_astropy_ingest_date:
744            # Astropy `Time.now()` precision should be the same as
745            # `datetime.now()`, which should mean microseconds.
746 timestamp = sqlalchemy.sql.literal(astropy.time.Time.now(), type_=ddl.AstropyTimeNsecTai)
747 else:
748 timestamp = sqlalchemy.sql.literal(datetime.datetime.now(datetime.UTC))
749 collection_fkey_name = self._collections.getCollectionForeignKeyName()
750 tags_rows = [
751 {
752 "dataset_type_id": dataset_type_storage[ref.datasetType.name].dataset_type_id,
753 collection_fkey_name: run.key,
754 "dataset_id": ref.id,
755 **ref.dataId.required,
756 }
757 for ref in refs
758 ]
759 if assume_new:
760 self._import_new(run, refs, dataset_type_storage, tags_table, tags_rows, timestamp)
761 else:
762 self._import_guarded(
763 run, refs, dimensions, dataset_type_storage, tags_table, tags_rows, timestamp
764 )
766 def _import_guarded(
767 self,
768 run: RunRecord,
769 refs: list[DatasetRef],
770 dimensions: DimensionGroup,
771 dataset_type_storage: dict[str, _DatasetRecordStorage],
772 tags_table: sqlalchemy.Table,
773 tags_rows: list[dict[str, object]],
774 timestamp: sqlalchemy.BindParameter[astropy.time.Time | datetime.datetime],
775 ) -> None:
776 # We'll insert all new rows into a temporary table
777 table_spec = makeTagTableSpec(dimensions, type(self._collections), constraints=False)
778 collection_fkey_name = self._collections.getCollectionForeignKeyName()
779 with self._db.transaction(for_temp_tables=True), self._db.temporary_table(table_spec) as tmp_tags:
780 # store all incoming data in a temporary table
781 self._db.insert(tmp_tags, *tags_rows)
782 # There are some checks that we want to make for consistency
783 # of the new datasets with existing ones.
784 self._validate_import(dimensions, tags_table, tmp_tags, run)
785 # Before we merge temporary table into dataset/tags we need to
786 # drop datasets which are already there (and do not conflict).
787 self._db.deleteWhere(
788 tmp_tags,
789 tmp_tags.columns.dataset_id.in_(sqlalchemy.sql.select(self._static.dataset.columns.id)),
790 )
791            # Copy it into the dataset table; we need to re-label some columns.
792 self._db.insert(
793 self._static.dataset,
794 select=sqlalchemy.sql.select(
795 tmp_tags.columns.dataset_id.label("id"),
796 tmp_tags.columns.dataset_type_id,
797 tmp_tags.columns[collection_fkey_name].label(self._run_key_column),
798 timestamp.label("ingest_date"),
799 ),
800 )
801 self._update_summaries(run, refs, dataset_type_storage)
802 # Copy from temp table into tags table.
803 self._db.insert(tags_table, select=tmp_tags.select())
805 def _update_summaries(
806 self, run: RunRecord, refs: list[DatasetRef], dataset_type_storage: dict[str, _DatasetRecordStorage]
807 ) -> None:
808 summary = CollectionSummary()
809 summary.add_datasets(refs)
810 self._summaries.update(
811 run, [storage.dataset_type_id for storage in dataset_type_storage.values()], summary
812 )
814 def _validate_import(
815 self,
816 dimensions: DimensionGroup,
817 tags: sqlalchemy.schema.Table,
818 tmp_tags: sqlalchemy.schema.Table,
819 run: RunRecord,
820 ) -> None:
821 """Validate imported refs against existing datasets.
823 Parameters
824 ----------
825 dimensions : `DimensionGroup`
826 Dimensions to validate.
827 tags : `sqlalchemy.schema.Table`
828            Existing tags table for datasets with these dimensions.
829 tmp_tags : `sqlalchemy.schema.Table`
830 Temporary table with new datasets and the same schema as tags
831 table.
832 run : `RunRecord`
833 The record object describing the `~CollectionType.RUN` collection.
835 Raises
836 ------
837 ConflictingDefinitionError
838            Raised if new datasets conflict with existing ones.
839 """
840 dataset = self._static.dataset
841 collection_fkey_name = self._collections.getCollectionForeignKeyName()
843 # Check that existing datasets have the same dataset type and
844 # run.
845 query = (
846 sqlalchemy.sql.select(
847 dataset.columns.id.label("dataset_id"),
848 dataset.columns.dataset_type_id.label("dataset_type_id"),
849 tmp_tags.columns.dataset_type_id.label("new_dataset_type_id"),
850 dataset.columns[self._run_key_column].label("run"),
851 tmp_tags.columns[collection_fkey_name].label("new_run"),
852 )
853 .select_from(dataset.join(tmp_tags, dataset.columns.id == tmp_tags.columns.dataset_id))
854 .where(
855 sqlalchemy.sql.or_(
856 dataset.columns.dataset_type_id != tmp_tags.columns.dataset_type_id,
857 dataset.columns[self._run_key_column] != tmp_tags.columns[collection_fkey_name],
858 )
859 )
860 .limit(1)
861 )
862 with self._db.query(query) as result:
863 # Only include the first one in the exception message
864 if (row := result.first()) is not None:
865 existing_run = self._collections[row.run].name
866 new_run = self._collections[row.new_run].name
867 if row.dataset_type_id == row.new_dataset_type_id:
868 raise ConflictingDefinitionError(
869 f"Current run {existing_run!r} and new run {new_run!r} do not agree for "
870 f"dataset {row.dataset_id}."
871 )
872 else:
873 raise ConflictingDefinitionError(
874 f"Dataset {row.dataset_id} was provided with type ID {row.new_dataset_type_id} "
875 f"in run {new_run!r}, but was already defined with type ID "
876                        f"{row.dataset_type_id} in run {existing_run!r}."
877 )
879 # Check that matching dataset in tags table has the same DataId.
880 query = (
881 sqlalchemy.sql.select(
882 tags.columns.dataset_id,
883 tags.columns.dataset_type_id.label("type_id"),
884 tmp_tags.columns.dataset_type_id.label("new_type_id"),
885 *[tags.columns[dim] for dim in dimensions.required],
886 *[tmp_tags.columns[dim].label(f"new_{dim}") for dim in dimensions.required],
887 )
888 .select_from(tags.join(tmp_tags, tags.columns.dataset_id == tmp_tags.columns.dataset_id))
889 .where(
890 sqlalchemy.sql.or_(
891 tags.columns.dataset_type_id != tmp_tags.columns.dataset_type_id,
892 *[tags.columns[dim] != tmp_tags.columns[dim] for dim in dimensions.required],
893 )
894 )
895 .limit(1)
896 )
898 with self._db.query(query) as result:
899 if (row := result.first()) is not None:
900 # Only include the first one in the exception message
901 raise ConflictingDefinitionError(
902 f"Existing dataset type or dataId do not match new dataset: {row._asdict()}"
903 )
905 # Check that matching run+dataId have the same dataset ID.
906 query = (
907 sqlalchemy.sql.select(
908 *[tags.columns[dim] for dim in dimensions.required],
909 tags.columns.dataset_id,
910 tmp_tags.columns.dataset_id.label("new_dataset_id"),
911 tmp_tags.columns.dataset_type_id.label("new_dataset_type_id"),
912 tags.columns[collection_fkey_name],
913 tmp_tags.columns[collection_fkey_name].label(f"new_{collection_fkey_name}"),
914 )
915 .select_from(
916 tags.join(
917 tmp_tags,
918 sqlalchemy.sql.and_(
919 tags.columns.dataset_type_id == tmp_tags.columns.dataset_type_id,
920 tags.columns[collection_fkey_name] == tmp_tags.columns[collection_fkey_name],
921 *[tags.columns[dim] == tmp_tags.columns[dim] for dim in dimensions.required],
922 ),
923 )
924 )
925 .where(tags.columns.dataset_id != tmp_tags.columns.dataset_id)
926 .limit(1)
927 )
928 with self._db.query(query) as result:
929 # only include the first one in the exception message
930 if (row := result.first()) is not None:
931 data_id = {dim: getattr(row, dim) for dim in dimensions.required}
932 existing_collection = self._collections[getattr(row, collection_fkey_name)].name
933 new_collection = self._collections[getattr(row, f"new_{collection_fkey_name}")].name
934 raise ConflictingDefinitionError(
935 f"Dataset with type ID {row.new_dataset_type_id} and data ID {data_id} "
936 f"has ID {row.dataset_id} in existing collection {existing_collection!r} "
937 f"but ID {row.new_dataset_id} in new collection {new_collection!r}."
938 )
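    # A hedged example of what the second check above rejects: the same
    # dataset UUID arriving with a different data ID than the one already
    # stored (column names are illustrative),
    #
    #     existing tags row:  dataset_id=u1, detector=10
    #     incoming tmp row:   dataset_id=u1, detector=11
    #
    # which raises ConflictingDefinitionError instead of silently keeping
    # either row.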
940 def _import_new(
941 self,
942 run: RunRecord,
943 refs: list[DatasetRef],
944 dataset_type_storage: dict[str, _DatasetRecordStorage],
945 tags_table: sqlalchemy.Table,
946 tags_rows: list[dict[str, object]],
947 timestamp: sqlalchemy.BindParameter[astropy.time.Time | datetime.datetime],
948 ) -> None:
949 static_rows = [
950 {
951 "id": ref.id,
952 "dataset_type_id": dataset_type_storage[ref.datasetType.name].dataset_type_id,
953 self._run_key_column: run.key,
954 "ingest_date": timestamp.value,
955 }
956 for ref in refs
957 ]
958 with self._db.transaction():
959 self._db.insert(self._static.dataset, *static_rows)
960 self._update_summaries(run, refs, dataset_type_storage)
961 self._db.insert(tags_table, *tags_rows)
963 def delete(self, datasets: Iterable[DatasetId | DatasetRef]) -> None:
964 # Docstring inherited from DatasetRecordStorageManager.
965 # Only delete from common dataset table; ON DELETE foreign key clauses
966 # will handle the rest.
967 self._db.delete(
968 self._static.dataset,
969 ["id"],
970 *[{"id": getattr(dataset, "id", dataset)} for dataset in datasets],
971 )
973 def associate(
974 self, dataset_type: DatasetType, collection: CollectionRecord, datasets: Iterable[DatasetRef]
975 ) -> None:
976 # Docstring inherited from DatasetRecordStorageManager.
977 if (storage := self._find_storage(dataset_type.name)) is None:
978 raise MissingDatasetTypeError(f"Dataset type {dataset_type.name!r} has not been registered.")
979 if collection.type is not CollectionType.TAGGED:
980 raise CollectionTypeError(
981 f"Cannot associate into collection '{collection.name}' "
982 f"of type {collection.type.name}; must be TAGGED."
983 )
984 proto_row = {
985 self._collections.getCollectionForeignKeyName(): collection.key,
986 "dataset_type_id": storage.dataset_type_id,
987 }
988 rows = []
989 summary = CollectionSummary()
990 for dataset in summary.add_datasets_generator(datasets):
991 rows.append(dict(proto_row, dataset_id=dataset.id, **dataset.dataId.required))
992 if rows:
993 # Update the summary tables for this collection in case this is the
994 # first time this dataset type or these governor values will be
995 # inserted there.
996 self._summaries.update(collection, [storage.dataset_type_id], summary)
997 # Update the tag table itself.
998 self._db.replace(self._get_tags_table(storage.dynamic_tables), *rows)
1000 def disassociate(
1001 self, dataset_type: DatasetType, collection: CollectionRecord, datasets: Iterable[DatasetRef]
1002 ) -> None:
1003 # Docstring inherited from DatasetRecordStorageManager.
1004 if (storage := self._find_storage(dataset_type.name)) is None:
1005 raise MissingDatasetTypeError(f"Dataset type {dataset_type.name!r} has not been registered.")
1006 if collection.type is not CollectionType.TAGGED:
1007 raise CollectionTypeError(
1008 f"Cannot disassociate from collection '{collection.name}' "
1009 f"of type {collection.type.name}; must be TAGGED."
1010 )
1011 rows = [
1012 {
1013 "dataset_id": dataset.id,
1014 self._collections.getCollectionForeignKeyName(): collection.key,
1015 }
1016 for dataset in datasets
1017 ]
1018 self._db.delete(
1019 self._get_tags_table(storage.dynamic_tables),
1020 ["dataset_id", self._collections.getCollectionForeignKeyName()],
1021 *rows,
1022 )
1024 def certify(
1025 self,
1026 dataset_type: DatasetType,
1027 collection: CollectionRecord,
1028 datasets: Iterable[DatasetRef],
1029 timespan: Timespan,
1030 query_func: QueryFactoryFunction,
1031 ) -> None:
1032 # Docstring inherited from DatasetRecordStorageManager.
1033 if (storage := self._find_storage(dataset_type.name)) is None:
1034 raise MissingDatasetTypeError(f"Dataset type {dataset_type.name!r} has not been registered.")
1035 if not dataset_type.isCalibration():
1036 raise DatasetTypeError(
1037 f"Cannot certify datasets of type {dataset_type.name!r}, for which "
1038 "DatasetType.isCalibration() is False."
1039 )
1040 if collection.type is not CollectionType.CALIBRATION:
1041 raise CollectionTypeError(
1042 f"Cannot certify into collection '{collection.name}' "
1043 f"of type {collection.type.name}; must be CALIBRATION."
1044 )
1045 TimespanReprClass = self._db.getTimespanRepresentation()
1046 proto_row = {
1047 self._collections.getCollectionForeignKeyName(): collection.key,
1048 "dataset_type_id": storage.dataset_type_id,
1049 }
1050 rows = []
1051 data_ids: set[DataCoordinate] | None = (
1052 set() if not TimespanReprClass.hasExclusionConstraint() else None
1053 )
1054 summary = CollectionSummary()
1055 for dataset in summary.add_datasets_generator(datasets):
1056 row = dict(proto_row, dataset_id=dataset.id, **dataset.dataId.required)
1057 TimespanReprClass.update(timespan, result=row)
1058 rows.append(row)
1059 if data_ids is not None:
1060 data_ids.add(dataset.dataId)
1061 if not rows:
1062 # Just in case an empty dataset collection is provided we want to
1063 # avoid adding dataset type to summary tables.
1064 return
1065 # Update the summary tables for this collection in case this is the
1066 # first time this dataset type or these governor values will be
1067 # inserted there.
1068 self._summaries.update(collection, [storage.dataset_type_id], summary)
1069 # Update the association table itself.
1070 calibs_table = self._get_calibs_table(storage.dynamic_tables)
1071 if TimespanReprClass.hasExclusionConstraint():
1072 # Rely on database constraint to enforce invariants; we just
1073 # reraise the exception for consistency across DB engines.
1074 try:
1075 self._db.insert(calibs_table, *rows)
1076 except sqlalchemy.exc.IntegrityError as err:
1077 raise ConflictingDefinitionError(
1078 f"Validity range conflict certifying datasets of type {dataset_type.name!r} "
1079 f"into {collection.name!r} for range {timespan}."
1080 ) from err
1081 else:
1082 # Have to implement exclusion constraint ourselves.
1083            # Acquire a table lock to ensure there are no concurrent writes that
1084            # could invalidate our checks before we finish the inserts. We
1085 # use a SAVEPOINT in case there is an outer transaction that a
1086 # failure here should not roll back.
1087 with self._db.transaction(
1088 lock=[calibs_table],
1089 savepoint=True,
1090 # join_data_coordinates sometimes requires a temp table
1091 for_temp_tables=True,
1092 ):
1093 # Query for any rows that would overlap this one.
1094 with query_func() as query:
1095 if data_ids is not None:
1096 query = query.join_data_coordinates(data_ids)
1097 timespan_column = query.expression_factory[dataset_type.name].timespan
1098 result = query.datasets(dataset_type, collection.name, find_first=False).where(
1099 timespan_column.overlaps(timespan)
1100 )
1101 conflicting = result.count()
1102 if conflicting > 0:
1103 raise ConflictingDefinitionError(
1104 f"{conflicting} validity range conflicts certifying datasets of type "
1105 f"{dataset_type.name} into {collection.name} for range "
1106 f"[{timespan.begin}, {timespan.end})."
1107 )
1108 # Proceed with the insert.
1109 self._db.insert(calibs_table, *rows)
1111 def decertify(
1112 self,
1113 dataset_type: DatasetType,
1114 collection: CollectionRecord,
1115 timespan: Timespan,
1116 *,
1117 data_ids: Iterable[DataCoordinate] | None = None,
1118 query_func: QueryFactoryFunction,
1119 ) -> None:
1120 # Docstring inherited from DatasetRecordStorageManager.
1121 if (storage := self._find_storage(dataset_type.name)) is None:
1122 raise MissingDatasetTypeError(f"Dataset type {dataset_type.name!r} has not been registered.")
1123 if not dataset_type.isCalibration():
1124 raise DatasetTypeError(
1125                f"Cannot decertify datasets of type {dataset_type.name!r}, for which "
1126 "DatasetType.isCalibration() is False."
1127 )
1128 if collection.type is not CollectionType.CALIBRATION:
1129 raise CollectionTypeError(
1130 f"Cannot decertify from collection '{collection.name}' "
1131 f"of type {collection.type.name}; must be CALIBRATION."
1132 )
1133 TimespanReprClass = self._db.getTimespanRepresentation()
1134 data_id_set: set[DataCoordinate] | None
1135 if data_ids is not None:
1136 data_id_set = set(data_ids)
1137 else:
1138 data_id_set = None
1140 # Set up collections to populate with the rows we'll want to modify.
1141 # The insert rows will have the same values for collection and
1142 # dataset type.
1143 proto_insert_row = {
1144 self._collections.getCollectionForeignKeyName(): collection.key,
1145 "dataset_type_id": storage.dataset_type_id,
1146 }
1147 rows_to_delete = []
1148 rows_to_insert = []
1149 # Acquire a table lock to ensure there are no concurrent writes
1150 # between the SELECT and the DELETE and INSERT queries based on it.
1151 calibs_table = self._get_calibs_table(storage.dynamic_tables)
1152 with self._db.transaction(lock=[calibs_table], savepoint=True):
1153 # Find rows overlapping our inputs.
1154 with query_func() as query:
1155 query = query.join_dataset_search(dataset_type, [collection.name])
1156 if data_id_set is not None:
1157 query = query.join_data_coordinates(data_id_set)
1158 timespan_column = query.expression_factory[dataset_type.name].timespan
1159 query = query.where(timespan_column.overlaps(timespan))
1160 result = query.general(
1161 dataset_type.dimensions,
1162 dataset_fields={dataset_type.name: {"dataset_id", "timespan"}},
1163 find_first=False,
1164 )._with_added_dataset_field(dataset_type.name, "calib_pkey")
1166 calib_pkey_key = f"{dataset_type.name}.calib_pkey"
1167 dataset_id_key = f"{dataset_type.name}.dataset_id"
1168 timespan_key = f"{dataset_type.name}.timespan"
1169 for row in result.iter_tuples():
1170 rows_to_delete.append({"id": row.raw_row[calib_pkey_key]})
1171 # Construct the insert row(s) by copying the prototype row,
1172 # then adding the dimension column values, then adding
1173 # what's left of the timespan from that row after we
1174 # subtract the given timespan.
1175 new_insert_row = proto_insert_row.copy()
1176 new_insert_row["dataset_id"] = row.raw_row[dataset_id_key]
1177 for name, value in row.data_id.required.items():
1178 new_insert_row[name] = value
1179 row_timespan = row.raw_row[timespan_key]
1180 assert row_timespan is not None, "Field should have a NOT NULL constraint."
1181 for diff_timespan in row_timespan.difference(timespan):
1182 rows_to_insert.append(
1183 TimespanReprClass.update(diff_timespan, result=new_insert_row.copy())
1184 )
1185 # Run the DELETE and INSERT queries.
1186 self._db.delete(calibs_table, ["id"], *rows_to_delete)
1187 self._db.insert(calibs_table, *rows_to_insert)
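    # Hedged illustration of the timespan arithmetic used above: decertifying
    # the middle of an existing validity range splits it into two rows
    # (``t1 < t2 < t3 < t4`` are astropy times):
    #
    #     existing = Timespan(t1, t4)
    #     removed = Timespan(t2, t3)
    #     list(existing.difference(removed))  # [Timespan(t1, t2), Timespan(t3, t4)]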
1189 def make_joins_builder(
1190 self,
1191 dataset_type: DatasetType,
1192 collections: Sequence[CollectionRecord],
1193 fields: Set[qt.AnyDatasetFieldName],
1194 is_union: bool = False,
1195 ) -> SqlJoinsBuilder:
1196 if (storage := self._find_storage(dataset_type.name)) is None:
1197 raise MissingDatasetTypeError(f"Dataset type {dataset_type.name!r} has not been registered.")
1198 # This method largely mimics `make_relation`, but it uses the new query
1199 # system primitives instead of the old one. In terms of the SQL
1200        # queries it builds, there are two other main differences:
1201 #
1202 # - Collection and run columns are now string names rather than IDs.
1203 # This insulates the query result-processing code from collection
1204 # caching and the collection manager subclass details.
1205 #
1206 # - The subquery always has unique rows, which is achieved by using
1207 # SELECT DISTINCT when necessary.
1208 #
1209 collection_types = {collection.type for collection in collections}
1210 assert CollectionType.CHAINED not in collection_types, "CHAINED collections must be flattened."
1211 #
1212 # There are two kinds of table in play here:
1213 #
1214 # - the static dataset table (with the dataset ID, dataset type ID,
1215 # run ID/name, and ingest date);
1216 #
1217 # - the dynamic tags/calibs table (with the dataset ID, dataset type
1218 # type ID, collection ID/name, data ID, and possibly validity
1219 # range).
1220 #
1221 # That means that we might want to return a query against either table
1222 # or a JOIN of both, depending on which quantities the caller wants.
1223 # But the data ID is always included, which means we'll always include
1224 # the tags/calibs table and join in the static dataset table only if we
1225 # need things from it that we can't get from the tags/calibs table.
1226 #
1227 # Note that it's important that we include a WHERE constraint on both
1228 # tables for any column (e.g. dataset_type_id) that is in both when
1229        # it's given explicitly; not doing so can prevent the query planner from
1230 # using very important indexes. At present, we don't include those
1231 # redundant columns in the JOIN ON expression, however, because the
1232 # FOREIGN KEY (and its index) are defined only on dataset_id.
1233 columns = qt.ColumnSet(dataset_type.dimensions)
1234 columns.drop_implied_dimension_keys()
1235 fields_key: str | qt.AnyDatasetType = qt.ANY_DATASET if is_union else dataset_type.name
1236 columns.dataset_fields[fields_key].update(fields)
1237 tags_builder: SqlSelectBuilder | None = None
1238 if collection_types != {CollectionType.CALIBRATION}:
1239 # We'll need a subquery for the tags table if any of the given
1240 # collections are not a CALIBRATION collection. This intentionally
1241 # also fires when the list of collections is empty as a way to
1242 # create a dummy subquery that we know will fail.
1243 # We give the table an alias because it might appear multiple times
1244 # in the same query, for different dataset types.
1245 tags_table = self._get_tags_table(storage.dynamic_tables).alias(
1246 f"{dataset_type.name}_tags{'_union' if is_union else ''}"
1247 )
1248 tags_builder = self._finish_query_builder(
1249 storage,
1250 SqlJoinsBuilder(db=self._db, from_clause=tags_table).to_select_builder(columns),
1251 [record for record in collections if record.type is not CollectionType.CALIBRATION],
1252 fields,
1253 fields_key,
1254 )
1255 if "timespan" in fields:
1256 tags_builder.joins.timespans[fields_key] = self._db.getTimespanRepresentation().fromLiteral(
1257 Timespan(None, None)
1258 )
1259 assert "calib_pkey" not in fields, (
1260 "Calibration primary key for internal use only on calibration collections."
1261 )
1262 calibs_builder: SqlSelectBuilder | None = None
1263 if CollectionType.CALIBRATION in collection_types:
1264 # If at least one collection is a CALIBRATION collection, we'll
1265 # need a subquery for the calibs table, and could include the
1266 # timespan as a result or constraint.
1267 calibs_table = self._get_calibs_table(storage.dynamic_tables).alias(
1268 f"{dataset_type.name}_calibs{'_union' if is_union else ''}"
1269 )
1270 calibs_builder = self._finish_query_builder(
1271 storage,
1272 SqlJoinsBuilder(db=self._db, from_clause=calibs_table).to_select_builder(columns),
1273 [record for record in collections if record.type is CollectionType.CALIBRATION],
1274 fields,
1275 fields_key,
1276 )
1277 if "timespan" in fields:
1278 calibs_builder.joins.timespans[fields_key] = (
1279 self._db.getTimespanRepresentation().from_columns(calibs_table.columns)
1280 )
1281 if "calib_pkey" in fields:
1282 calibs_builder.joins.fields[fields_key]["calib_pkey"] = calibs_table.columns["id"]
1284 # In calibration collections, we need timespan as well as data ID
1285 # to ensure unique rows.
1286 calibs_builder.distinct = calibs_builder.distinct and "timespan" not in fields
1287 if tags_builder is not None:
1288 if calibs_builder is not None:
1289 # Need a UNION subquery.
1290 return tags_builder.union_subquery([calibs_builder])
1291 else:
1292 return tags_builder.into_joins_builder(postprocessing=None)
1293 elif calibs_builder is not None:
1294 return calibs_builder.into_joins_builder(postprocessing=None)
1295 else:
1296 raise AssertionError("Branch should be unreachable.")
1298 def _finish_query_builder(
1299 self,
1300 storage: _DatasetRecordStorage,
1301 sql_projection: SqlSelectBuilder,
1302 collections: Sequence[CollectionRecord],
1303 fields: Set[qt.AnyDatasetFieldName],
1304 fields_key: str | qt.AnyDatasetType,
1305 ) -> SqlSelectBuilder:
1306        # This method plays the same role in the new query system that
1307        # _finish_single_relation plays in the old one. It is called exactly
1308        # one or two times by make_joins_builder, just as _finish_single_relation
1309        # is called exactly one or two times by make_relation. See the
1310        # make_joins_builder comments for what's different.
1311 assert sql_projection.joins.from_clause is not None
1312 run_collections_only = all(record.type is CollectionType.RUN for record in collections)
1313 sql_projection.joins.where(
1314 sql_projection.joins.from_clause.c.dataset_type_id == storage.dataset_type_id
1315 )
1316 dataset_id_col = sql_projection.joins.from_clause.c.dataset_id
1317 collection_col = sql_projection.joins.from_clause.c[self._collections.getCollectionForeignKeyName()]
1318 fields_provided = sql_projection.joins.fields[fields_key]
1319 # We always constrain and optionally retrieve the collection(s) via the
1320 # tags/calibs table.
1321 if "collection_key" in fields:
1322 sql_projection.joins.fields[fields_key]["collection_key"] = collection_col
1323 if len(collections) == 1:
1324 only_collection_record = collections[0]
1325 sql_projection.joins.where(collection_col == only_collection_record.key)
1326 if "collection" in fields:
1327 fields_provided["collection"] = sqlalchemy.literal(only_collection_record.name).cast(
1328 # This cast is necessary to ensure that Postgres knows the
1329 # type of this column if it is used in an aggregate
1330 # function.
1331 sqlalchemy.String
1332 )
1334 elif not collections:
1335 sql_projection.joins.where(sqlalchemy.literal(False))
1336 if "collection" in fields:
1337 fields_provided["collection"] = sqlalchemy.literal("NO COLLECTIONS")
1338 else:
1339 sql_projection.joins.where(collection_col.in_([collection.key for collection in collections]))
1340 if "collection" in fields:
1341 # Avoid a join to the collection table to get the name by using
1342 # a CASE statement. The SQL will be a bit more verbose but
1343 # more efficient.
1344 fields_provided["collection"] = _create_case_expression_for_collections(
1345 collections, collection_col
1346 )
1347 # Add more column definitions, starting with the data ID.
1348 sql_projection.joins.extract_dimensions(storage.dataset_type.dimensions.required)
1349        # We can always get the dataset_id from the tags/calibs table, even if we
1350 # could also get it from the 'static' dataset table.
1351 if "dataset_id" in fields:
1352 fields_provided["dataset_id"] = dataset_id_col
1354 # It's possible we now have everything we need, from just the
1355 # tags/calibs table. The things we might need to get from the static
1356 # dataset table are the run key and the ingest date.
1357 need_static_table = False
1358 need_collection_table = False
1359 # Ingest date can only come from the static table.
1360 if "ingest_date" in fields:
1361 fields_provided["ingest_date"] = self._static.dataset.c.ingest_date
1362 need_static_table = True
1363 if "run" in fields:
1364 if len(collections) == 1 and run_collections_only:
1365 # If we are searching exactly one RUN collection, we
1366 # know that if we find the dataset in that collection,
1367                # then that's the dataset's run; we don't need to
1368 # query for it.
1369 #
1370 fields_provided["run"] = sqlalchemy.literal(only_collection_record.name).cast(
1371 # This cast is necessary to ensure that Postgres knows the
1372 # type of this column if it is used in an aggregate
1373 # function.
1374 sqlalchemy.String
1375 )
1376 elif run_collections_only:
1377 # Once again we can avoid joining to the collection table by
1378 # adding a CASE statement.
1379 fields_provided["run"] = _create_case_expression_for_collections(
1380 collections, self._static.dataset.c[self._run_key_column]
1381 )
1382 need_static_table = True
1383 else:
1384 # Here we can't avoid a join to the collection table, because
1385 # we might find a dataset via something other than its RUN
1386 # collection.
1387 #
1388 # We have to defer adding the join until after we have joined
1389 # in the static dataset table, because the ON clause involves
1390 # the run collection from the static dataset table. Postgres
1391                # cares about the join ordering (though SQLite does not).
1392 need_collection_table = True
1393 need_static_table = True
1394 if need_static_table:
1395 # If we need the static table, join it in via dataset_id. We don't
1396 # use SqlJoinsBuilder.join because we're joining on dataset ID, not
1397 # dimensions.
1398 sql_projection.joins.from_clause = sql_projection.joins.from_clause.join(
1399 self._static.dataset, onclause=(dataset_id_col == self._static.dataset.c.id)
1400 )
1401 # Also constrain dataset_type_id in static table in case that helps
1402 # generate a better plan. We could also include this in the JOIN ON
1403 # clause, but my guess is that that's a good idea IFF it's in the
1404 # foreign key, and right now it isn't.
1405 sql_projection.joins.where(self._static.dataset.c.dataset_type_id == storage.dataset_type_id)
1406 if need_collection_table:
1407 # Join the collection table to look up the RUN collection name
1408 # associated with the dataset.
1409 (
1410 fields_provided["run"],
1411 sql_projection.joins.from_clause,
1412 ) = self._collections.lookup_name_sql(
1413 self._static.dataset.c[self._run_key_column],
1414 sql_projection.joins.from_clause,
1415 )
1417 sql_projection.distinct = (
1418 # If there are multiple collections, this subquery might have
1419 # non-unique rows.
1420 len(collections) > 1 and not fields
1421 )
1422 return sql_projection
1424 def refresh_collection_summaries(self, dataset_type: DatasetType) -> None:
1425 # Docstring inherited.
1426 if (storage := self._find_storage(dataset_type.name)) is None:
1427 raise MissingDatasetTypeError(f"Dataset type {dataset_type.name!r} has not been registered.")
1428 with self._db.transaction():
1429 # The main issue here is consistency in the presence of concurrent
1430 # updates (using default READ COMMITTED isolation). Regular clients
1431 # only add to summary tables, and we want to avoid deleting what
1432 # other concurrent transactions may add while we are in this
1433 # transaction. This ordering of operations should guarantee it:
1434 # - read collections for this dataset type from summary tables,
1435 # - read collections for this dataset type from dataset tables
1436 # (both tags and calibs),
1437 # - whatever is in the first set but not in the second can be
1438 # dropped from summary tables.
1439 summary_collection_ids = set(self._summaries.get_collection_ids(storage.dataset_type_id))
1441 # Query datasets tables for associated collections.
1442 column_name = self._collections.getCollectionForeignKeyName()
1443 tags_table = self._get_tags_table(storage.dynamic_tables)
1444 query: sqlalchemy.sql.expression.SelectBase = (
1445 sqlalchemy.select(tags_table.columns[column_name])
1446 .where(tags_table.columns.dataset_type_id == storage.dataset_type_id)
1447 .distinct()
1448 )
1449 if dataset_type.isCalibration():
1450 calibs_table = self._get_calibs_table(storage.dynamic_tables)
1451 query2 = (
1452 sqlalchemy.select(calibs_table.columns[column_name])
1453 .where(calibs_table.columns.dataset_type_id == storage.dataset_type_id)
1454 .distinct()
1455 )
1456 query = sqlalchemy.sql.expression.union(query, query2)
1458 with self._db.query(query) as result:
1459 collection_ids = set(result.scalars())
1461 collections_to_delete = summary_collection_ids - collection_ids
1462 self._summaries.delete_collections(storage.dataset_type_id, collections_to_delete)
1464 def _get_tags_table(self, table: DynamicTables) -> sqlalchemy.Table:
1465 return table.tags(self._db, type(self._collections), self._cache.tables)
1467 def _get_calibs_table(self, table: DynamicTables) -> sqlalchemy.Table:
1468 return table.calibs(self._db, type(self._collections), self._cache.tables)
1471def _create_case_expression_for_collections(
1472 collections: Iterable[CollectionRecord], id_column: sqlalchemy.ColumnElement
1473) -> sqlalchemy.ColumnElement:
1474 """Return a SQLAlchemy Case expression that converts collection IDs to
1475 collection names for the given set of collections.
1477 Parameters
1478 ----------
1479 collections : `~collections.abc.Iterable` [ `CollectionRecord` ]
1480 List of collections to include in conversion table. This should be an
1481 exhaustive list of collections that could appear in `id_column`.
1482 id_column : `sqlalchemy.ColumnElement`
1483 The column containing the collection ID that we want to convert to a
1484 collection name.
1485 """
1486 mapping = {record.key: record.name for record in collections}
1487 if not mapping:
1488 # SQLAlchemy does not correctly handle an empty mapping in case() -- it
1489 # crashes when trying to compile the expression with an
1490 # "AttributeError('NoneType' object has no attribute 'dialect_impl')"
1491 # when trying to access the 'type' property of the Case object. If you
1492 # explicitly specify a type via type_coerce it instead generates
1493 # invalid SQL syntax.
1494 #
1495 # We can end up with empty mappings here in certain "doomed query" edge
1496 # cases, e.g. we start with a list of valid collections but they are
1497 # all filtered out by higher-level code on the basis of collection
1498 # summaries.
1499 return sqlalchemy.cast(sqlalchemy.null(), sqlalchemy.String)
1501 return sqlalchemy.case(mapping, value=id_column)
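# A hedged example of the expression this helper builds for two collections
# with keys 1 and 2 named "run/a" and "run/b" (the rendered SQL is approximate):
#
#     expr = _create_case_expression_for_collections(records, table.c.collection_id)
#     # CASE table.collection_id WHEN 1 THEN 'run/a' WHEN 2 THEN 'run/b' END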
1504def _ensure_dimension_groups_match(dataset_types: Iterable[DatasetType]) -> DimensionGroup:
1505 dimensions = set(dt.dimensions for dt in dataset_types)
1506 assert len(dimensions) > 0, "At least one dataset type is required"
1507 if len(dimensions) != 1:
1508 raise DatasetTypeError(
1509 "Dataset types have more than one dimension group.\n"
1510 f"Dataset types: {dataset_types}\n"
1511 f"Dimension groups: {dimensions}"
1512 )
1513 return dimensions.pop()
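# Hedged example (``type_a``, ``type_b``, and ``type_c`` are hypothetical
# DatasetType instances): ``import_`` requires all refs in one call to share a
# single dimension group,
#
#     _ensure_dimension_groups_match({type_a, type_b})  # OK when both share one group
#     _ensure_dimension_groups_match({type_a, type_c})  # DatasetTypeError otherwise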