Coverage for python / lsst / daf / butler / registry / datasets / byDimensions / _manager.py: 0%

582 statements  

coverage.py v7.13.5, created at 2026-05-06 08:30 +0000

1from __future__ import annotations 

2 

3__all__ = ("ByDimensionsDatasetRecordStorageManagerUUID",) 

4 

5import dataclasses 

6import datetime 

7import logging 

8from collections import defaultdict 

9from collections.abc import Iterable, Mapping, Sequence, Set 

10from typing import TYPE_CHECKING, Any, ClassVar 

11 

12import astropy.time 

13import pydantic 

14import sqlalchemy 

15 

16from lsst.utils.iteration import chunk_iterable 

17 

18from .... import ddl 

19from ...._collection_type import CollectionType 

20from ...._dataset_ref import DatasetId, DatasetIdFactory, DatasetIdGenEnum, DatasetRef 

21from ...._dataset_type import DatasetType, get_dataset_type_name 

22from ...._exceptions import CollectionTypeError, MissingDatasetTypeError 

23from ...._exceptions_legacy import DatasetTypeError 

24from ...._timespan import Timespan 

25from ....dimensions import DataCoordinate, DimensionGroup, DimensionUniverse 

26from ....direct_query_driver import SqlJoinsBuilder, SqlSelectBuilder # new query system, server+direct only 

27from ....queries import QueryFactoryFunction 

28from ....queries import tree as qt # new query system, both clients + server 

29from ..._caching_context import CachingContext 

30from ..._collection_summary import CollectionSummary 

31from ..._exceptions import ConflictingDefinitionError, DatasetTypeExpressionError, OrphanedRecordError 

32from ...interfaces import DatasetRecordStorageManager, RunRecord, VersionTuple 

33from ...wildcards import DatasetTypeWildcard 

34from ._dataset_type_cache import DatasetTypeCache 

35from .summaries import CollectionSummaryManager 

36from .tables import ( 

37 DynamicTables, 

38 StaticDatasetTableSpecTuple, 

39 StaticDatasetTablesTuple, 

40 addDatasetForeignKey, 

41 makeStaticTableSpecs, 

42 makeTagTableSpec, 

43) 

44 

45if TYPE_CHECKING: 

46 from ...interfaces import ( 

47 CollectionManager, 

48 CollectionRecord, 

49 Database, 

50 DimensionRecordStorageManager, 

51 StaticTablesContext, 

52 ) 

53 

54 

55# This has to be updated on every schema change 

56# TODO: 1.0.0 can be removed once all repos have been migrated to 2.0.0. 

57_VERSION_UUID = VersionTuple(1, 0, 0) 

58# Starting with 2.0.0 the `ingest_date` column type uses nanoseconds instead 

59# of TIMESTAMP. The code supports both 1.0.0 and 2.0.0 for the duration of 

60# the client migration period. 

61_VERSION_UUID_NS = VersionTuple(2, 0, 0) 

62 

63_LOG = logging.getLogger(__name__) 

64 

65 

66@dataclasses.dataclass 

67class _DatasetTypeRecord: 

68 """Contents of a single dataset type record.""" 

69 

70 dataset_type: DatasetType 

71 dataset_type_id: int 

72 dimensions_key: int 

73 tag_table_name: str 

74 calib_table_name: str | None 

75 

76 def make_dynamic_tables(self) -> DynamicTables: 

77 return DynamicTables( 

78 self.dataset_type.dimensions, self.dimensions_key, self.tag_table_name, self.calib_table_name 

79 ) 

80 

81 def update_dynamic_tables(self, current: DynamicTables) -> DynamicTables: 

82 assert self.dimensions_key == current.dimensions_key 

83 assert self.tag_table_name == current.tags_name 

84 if self.calib_table_name is not None: 

85 if current.calibs_name is not None: 

86 assert self.calib_table_name == current.calibs_name 

87 else: 

88 # Some previously-cached dataset type had the same dimensions 

89 # but was not a calibration. 

90 current = current.copy(calibs_name=self.calib_table_name) 

91 # If some previously-cached dataset type was a calibration but this 

92 # one isn't, we don't want to forget the calibs table. 

93 return current 

94 

95 

96@dataclasses.dataclass 

97class _DatasetRecordStorage: 

98 """Information cached about a dataset type. 

99 

100 This combines information cached with different keys - the dataset type 

101 and its ID are cached by name, while the tables are cached by the dataset 

102 type's dimensions (and hence shared with other dataset types that have the 

103 same dimensions). 

104 """ 

105 

106 dataset_type: DatasetType 

107 dataset_type_id: int 

108 dynamic_tables: DynamicTables 

109 

110 

111class _DatasetTypeOverride(pydantic.BaseModel): 

112 """A configuration struct that holds storage class overrides and/or a new 

113 name for a dataset type. 

114 """ 

115 

116 storageClass: str | None = None 

117 rename: str | None = None 
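# A minimal sketch of the ``overrides`` configuration this model validates
# (the dataset type and storage class names below are purely hypothetical):
#
#   overrides:
#     some_dataset_type: {storageClass: SomeOtherStorageClass}
#     old_name: {rename: new_name}
#
# ``initialize`` converts entries like these into the ``renames``,
# ``reversed_renames``, and ``storage_class_overrides`` mappings passed to
# the manager constructor.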

118 

119 

120class ByDimensionsDatasetRecordStorageManagerUUID(DatasetRecordStorageManager): 

121 """A manager class for datasets that uses one dataset-collection table for 

122 each group of dataset types that share the same dimensions. 

123 

124 In addition to the table organization, this class makes a number of 

125 other design choices that would have been cumbersome (to say the least) to 

126 try to pack into its name: 

127 

128 - It uses a private surrogate integer autoincrement field to identify 

129 dataset types, instead of using the name as the primary and foreign key 

130 directly. 

131 

132 - It aggressively loads all DatasetTypes into memory instead of fetching 

133 them from the database only when needed or attempting more clever forms 

134 of caching. 

135 

136 Alternative implementations that make different choices for these while 

137 keeping the same general table organization might be reasonable as well. 

138 

139 Parameters 

140 ---------- 

141 db : `Database` 

142 Interface to the underlying database engine and namespace. 

143 collections : `CollectionManager` 

144 Manager object for the collections in this `Registry`. 

145 dimensions : `DimensionRecordStorageManager` 

146 Manager object for the dimensions in this `Registry`. 

147 static : `StaticDatasetTablesTuple` 

148 Named tuple of `sqlalchemy.schema.Table` instances for all static 

149 tables used by this class. 

150 summaries : `CollectionSummaryManager` 

151 Structure containing tables that summarize the contents of collections. 

152 registry_schema_version : `VersionTuple` or `None`, optional 

153 Version of registry schema. 

154 renames 

155 Mapping from a dataset type name in the database to a client-level 

156 override name. 

157 reversed_renames 

158 Mapping from the override name for a dataset type to its true (DB) 

159 name. 

160 storage_class_overrides 

161 Mapping from the true (DB) name of a dataset type to an override 

162 storage class this client should use instead of the one in the 

163 database. 

164 _cache : `None`, optional 

165 For internal use only. 

166 """ 

167 

168 def __init__( 

169 self, 

170 *, 

171 db: Database, 

172 collections: CollectionManager, 

173 dimensions: DimensionRecordStorageManager, 

174 static: StaticDatasetTablesTuple, 

175 summaries: CollectionSummaryManager, 

176 renames: Mapping[str, str], 

177 reversed_renames: Mapping[str, str], 

178 storage_class_overrides: Mapping[str, str], 

179 registry_schema_version: VersionTuple | None = None, 

180 _cache: DatasetTypeCache | None = None, 

181 ): 

182 super().__init__(registry_schema_version=registry_schema_version) 

183 self._db = db 

184 self._collections = collections 

185 self._dimensions = dimensions 

186 self._static = static 

187 self._summaries = summaries 

188 self._cache = _cache if _cache is not None else DatasetTypeCache() 

189 self._use_astropy_ingest_date = self.ingest_date_dtype() is ddl.AstropyTimeNsecTai 

190 self._run_key_column = collections.getRunForeignKeyName() 

191 self._renames = renames 

192 self._reversed_renames = reversed_renames 

193 self._storage_class_overrides = storage_class_overrides 

194 

195 _versions: ClassVar[list[VersionTuple]] = [_VERSION_UUID, _VERSION_UUID_NS] 

196 

197 _id_maker: ClassVar[DatasetIdFactory] = DatasetIdFactory() 

198 """Factory for dataset IDs. In the future this factory may be shared with 

199 other classes (e.g. Registry). 

200 """ 

201 

202 @classmethod 

203 def initialize( 

204 cls, 

205 db: Database, 

206 context: StaticTablesContext, 

207 *, 

208 collections: CollectionManager, 

209 dimensions: DimensionRecordStorageManager, 

210 caching_context: CachingContext, 

211 config: Mapping, 

212 registry_schema_version: VersionTuple | None = None, 

213 ) -> DatasetRecordStorageManager: 

214 # Docstring inherited from DatasetRecordStorageManager. 

215 specs = cls.makeStaticTableSpecs( 

216 type(collections), universe=dimensions.universe, schema_version=registry_schema_version 

217 ) 

218 static: StaticDatasetTablesTuple = context.addTableTuple(specs) # type: ignore 

219 dataset_type_overrides = { 

220 dataset_type_name: _DatasetTypeOverride.model_validate(override_config) 

221 for dataset_type_name, override_config in config.get("overrides", {}).items() 

222 } 

223 renames = { 

224 db_name: override.rename 

225 for db_name, override in dataset_type_overrides.items() 

226 if override.rename is not None 

227 } 

228 reversed_renames = {v: k for k, v in renames.items()} 

229 storage_class_overrides = { 

230 db_name: override.storageClass 

231 for db_name, override in dataset_type_overrides.items() 

232 if override.storageClass is not None 

233 } 
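# Continuing the illustrative sketch next to _DatasetTypeOverride: an
# override entry like {"old_name": {"rename": "new_name"}} (hypothetical
# names) yields renames == {"old_name": "new_name"},
# reversed_renames == {"new_name": "old_name"}, and no storage class
# override.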

234 summaries = CollectionSummaryManager.initialize( 

235 db, 

236 context, 

237 collections=collections, 

238 dimensions=dimensions, 

239 dataset_type_table=static.dataset_type, 

240 caching_context=caching_context, 

241 reversed_renames=reversed_renames, 

242 ) 

243 return cls( 

244 db=db, 

245 collections=collections, 

246 dimensions=dimensions, 

247 static=static, 

248 summaries=summaries, 

249 registry_schema_version=registry_schema_version, 

250 renames=renames, 

251 reversed_renames=reversed_renames, 

252 storage_class_overrides=storage_class_overrides, 

253 ) 

254 

255 @classmethod 

256 def currentVersions(cls) -> list[VersionTuple]: 

257 # Docstring inherited from VersionedExtension. 

258 return cls._versions 

259 

260 @classmethod 

261 def makeStaticTableSpecs( 

262 cls, 

263 collections: type[CollectionManager], 

264 universe: DimensionUniverse, 

265 schema_version: VersionTuple | None, 

266 ) -> StaticDatasetTableSpecTuple: 

267 """Construct all static tables used by the classes in this package. 

268 

269 Static tables are those that are present in all Registries and do not 

270 depend on what DatasetTypes have been registered. 

271 

272 Parameters 

273 ---------- 

274 collections : `CollectionManager` 

275 Manager object for the collections in this `Registry`. 

276 universe : `DimensionUniverse` 

277 Universe graph containing all dimensions known to this `Registry`. 

278 schema_version : `VersionTuple` or `None` 

279 Version of the schema that should be created, if `None` then 

280 default schema should be used. 

281 

282 Returns 

283 ------- 

284 specs : `StaticDatasetTablesTuple` 

285 A named tuple containing `ddl.TableSpec` instances. 

286 """ 

287 schema_version = cls.clsNewSchemaVersion(schema_version) 

288 assert schema_version is not None, "New schema version cannot be None" 

289 return makeStaticTableSpecs( 

290 collections, 

291 universe=universe, 

292 schema_version=schema_version, 

293 ) 

294 

295 @classmethod 

296 def addDatasetForeignKey( 

297 cls, 

298 tableSpec: ddl.TableSpec, 

299 *, 

300 name: str = "dataset", 

301 constraint: bool = True, 

302 onDelete: str | None = None, 

303 **kwargs: Any, 

304 ) -> ddl.FieldSpec: 

305 # Docstring inherited from DatasetRecordStorageManager. 

306 return addDatasetForeignKey(tableSpec, name=name, onDelete=onDelete, constraint=constraint, **kwargs) 

307 

308 @classmethod 

309 def _newDefaultSchemaVersion(cls) -> VersionTuple: 

310 # Docstring inherited from VersionedExtension. 

311 return _VERSION_UUID_NS 

312 

313 def clone( 

314 self, 

315 *, 

316 db: Database, 

317 collections: CollectionManager, 

318 dimensions: DimensionRecordStorageManager, 

319 caching_context: CachingContext, 

320 ) -> ByDimensionsDatasetRecordStorageManagerUUID: 

321 return ByDimensionsDatasetRecordStorageManagerUUID( 

322 db=db, 

323 collections=collections, 

324 dimensions=dimensions, 

325 static=self._static, 

326 summaries=self._summaries.clone(db=db, collections=collections, caching_context=caching_context), 

327 renames=self._renames, 

328 reversed_renames=self._reversed_renames, 

329 storage_class_overrides=self._storage_class_overrides, 

330 registry_schema_version=self._registry_schema_version, 

331 # See notes on DatasetTypeCache.clone() about cache behavior after 

332 # cloning. 

333 _cache=self._cache.clone(), 

334 ) 

335 

336 def refresh(self) -> None: 

337 # Docstring inherited from DatasetRecordStorageManager. 

338 self._cache.clear() 

339 

340 def remove_dataset_type(self, name: str) -> None: 

341 # Docstring inherited from DatasetRecordStorageManager. 

342 db_name = self._reversed_renames.get(name, name) 

343 compositeName, componentName = DatasetType.splitDatasetTypeName(name) 

344 if componentName is not None: 

345 raise ValueError(f"Cannot delete a dataset type of a component of a composite (given {name})") 

346 

347 # Delete the row 

348 try: 

349 self._db.delete(self._static.dataset_type, ["name"], {"name": db_name}) 

350 except sqlalchemy.exc.IntegrityError as e: 

351 msg = name if db_name == name else f"{db_name} (renamed to {name})" 

352 raise OrphanedRecordError( 

353 f"Dataset type {msg} can not be removed." 

354 " It is associated with datasets that must be removed first." 

355 ) from e 

356 

357 # Now refresh everything -- removal is rare enough that this does 

358 # not need to be fast. 

359 self.refresh() 

360 

361 def get_dataset_type(self, name: str) -> DatasetType: 

362 # Docstring inherited from DatasetRecordStorageManager. 

363 return self._find_storage(name).dataset_type 

364 

365 def register_dataset_type(self, dataset_type: DatasetType) -> bool: 

366 # Docstring inherited from DatasetRecordStorageManager. 

367 # 

368 # This is one of the places where we populate the dataset type cache. 

369 # See the comment in _fetch_dataset_types for how these are related and 

370 # invariants they must maintain. 

371 # 

372 if dataset_type.isComponent(): 

373 raise ValueError( 

374 f"Component dataset types can not be stored in registry. Rejecting {dataset_type.name}" 

375 ) 

376 

377 # If database universe and dimension group universe are different it 

378 # can cause unexpected effects. 

379 if dataset_type.dimensions.universe is not self._dimensions.universe: 

380 raise ValueError( 

381 "Incompatible dimension universe versions - " 

382 f"database universe: {self._dimensions.universe}, " 

383 f"dataset type universe: {dataset_type.dimensions.universe}." 

384 ) 

385 

386 record = self._fetch_dataset_type_record(dataset_type.name) 

387 if record is None: 

388 if (dynamic_tables := self._cache.get_by_dimensions(dataset_type.dimensions)) is None: 

389 dimensions_key = self._dimensions.save_dimension_group(dataset_type.dimensions) 

390 dynamic_tables = DynamicTables.from_dimensions_key( 

391 dataset_type.dimensions, dimensions_key, dataset_type.isCalibration() 

392 ) 

393 dynamic_tables.create(self._db, type(self._collections), self._cache.tables) 

394 elif dataset_type.isCalibration() and dynamic_tables.calibs_name is None: 

395 dynamic_tables = dynamic_tables.add_calibs( 

396 self._db, type(self._collections), self._cache.tables 

397 ) 

398 db_name = self._reversed_renames.get(dataset_type.name, dataset_type.name) 

399 if db_name == dataset_type.name and db_name in self._renames: 

400 # We were asked to register a dataset type that has been renamed via 

401 # a config override, with no other existing dataset type renamed 

402 # to replace it. It should appear missing, but we still can't 

403 # register it. 

404 raise ConflictingDefinitionError( 

405 f"A dataset type with the name {db_name!r} has been renamed to , " 

406 f"{self._renames[db_name]!r} in this client; it can only be registered " 

407 "from a client without any rename configuration." 

408 ) 

409 if db_name in self._storage_class_overrides: 

410 raise ConflictingDefinitionError( 

411 f"Dataset type {db_name!r} has had its storage class overridden to " 

412 f"{dataset_type.storageClass_name!r} via butler configuration, and cannot " 

413 "be registered via this client since the database-level storage class is unknown." 

414 ) 

415 row, inserted = self._db.sync( 

416 self._static.dataset_type, 

417 keys={"name": db_name}, 

418 compared={ 

419 "dimensions_key": dynamic_tables.dimensions_key, 

420 # Force the storage class to be loaded to ensure it 

421 # exists and there is no typo in the name. 

422 "storage_class": dataset_type.storageClass.name, 

423 }, 

424 extra={ 

425 "tag_association_table": dynamic_tables.tags_name, 

426 "calibration_association_table": ( 

427 dynamic_tables.calibs_name if dataset_type.isCalibration() else None 

428 ), 

429 }, 

430 returning=["id", "tag_association_table"], 

431 ) 

432 # Make sure that cache is updated 

433 if row is not None: 

434 self._cache.add(dataset_type, row["id"]) 

435 self._cache.add_by_dimensions(dataset_type.dimensions, dynamic_tables) 

436 else: 

437 if dataset_type != record.dataset_type: 

438 raise ConflictingDefinitionError( 

439 f"Given dataset type {dataset_type} is inconsistent " 

440 f"with database definition {record.dataset_type}." 

441 ) 

442 inserted = False 

443 return bool(inserted) 

444 

445 def resolve_wildcard( 

446 self, 

447 expression: Any, 

448 missing: list[str] | None = None, 

449 explicit_only: bool = False, 

450 ) -> list[DatasetType]: 

451 wildcard = DatasetTypeWildcard.from_expression(expression) 

452 result: list[DatasetType] = [] 

453 for name, dataset_type in wildcard.values.items(): 

454 parent_name, component_name = DatasetType.splitDatasetTypeName(name) 

455 if component_name is not None: 

456 raise DatasetTypeError( 

457 "Component dataset types are not supported in Registry methods; use DatasetRef or " 

458 "DatasetType methods to obtain components from parents instead." 

459 ) 

460 try: 

461 resolved_dataset_type = self.get_dataset_type(parent_name) 

462 except MissingDatasetTypeError: 

463 if missing is not None: 

464 missing.append(name) 

465 else: 

466 if dataset_type is not None: 

467 if dataset_type.is_compatible_with(resolved_dataset_type): 

468 # Prefer the given dataset type to enable storage class 

469 # conversions. 

470 resolved_dataset_type = dataset_type 

471 else: 

472 raise DatasetTypeError( 

473 f"Dataset type definition in query expression {dataset_type} is " 

474 f"not compatible with the registered type {resolved_dataset_type}." 

475 ) 

476 result.append(resolved_dataset_type) 

477 if wildcard.patterns is ...: 

478 if explicit_only: 

479 raise TypeError( 

480 "Universal wildcard '...' is not permitted for dataset types in this context." 

481 ) 

482 for datasetType in self._fetch_dataset_types(): 

483 result.append(datasetType) 

484 elif wildcard.patterns: 

485 if explicit_only: 

486 raise DatasetTypeExpressionError( 

487 "Dataset type wildcard expressions are not supported in this context." 

488 ) 

489 dataset_types = self._fetch_dataset_types() 

490 for datasetType in dataset_types: 

491 if any(p.fullmatch(datasetType.name) for p in wildcard.patterns): 

492 result.append(datasetType) 

493 

494 return result 

495 

496 def get_dataset_refs(self, ids: list[DatasetId]) -> list[DatasetRef]: 

497 dataset_type_map: dict[DatasetId, DatasetType] = {} 

498 for batch in chunk_iterable(set(ids), 50000): 

499 # Look up the dataset types corresponding to the given Dataset IDs. 

500 id_col = self._static.dataset.columns["id"] 

501 sql = sqlalchemy.sql.select( 

502 id_col, 

503 self._static.dataset.columns["dataset_type_id"], 

504 ).where(id_col.in_(batch)) 

505 with self._db.query(sql) as sql_result: 

506 dataset_rows = sql_result.mappings().all() 

507 for row in dataset_rows: 

508 dataset_type_map[row["id"]] = self._get_dataset_type_by_id(row["dataset_type_id"]) 

509 

510 # Group the given dataset IDs by the DimensionGroup of their dataset 

511 # types -- there is a separate tags table for each DimensionGroup. 
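# (Illustrative: every dataset type whose dimensions are, say,
# {"instrument", "detector"} shares a single tags table, so their data IDs
# can be looked up together, one group at a time.)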

512 dimension_groups = defaultdict[DimensionGroup, set[DatasetId]](set) 

513 for id, dataset_type in dataset_type_map.items(): 

514 dimension_groups[dataset_type.dimensions].add(id) 

515 

516 output_refs: list[DatasetRef] = [] 

517 for dimension_group, datasets in dimension_groups.items(): 

518 # Query the tags table for each dimension group to look up the 

519 # data IDs corresponding to the UUIDs found from the dataset table. 

520 dynamic_tables = self._get_dynamic_tables(dimension_group) 

521 tags_table = self._get_tags_table(dynamic_tables) 

522 for batch in chunk_iterable(datasets, 50000): 

523 tags_sql = tags_table.select().where(tags_table.columns["dataset_id"].in_(batch)) 

524 # Join in the collection table to fetch the run name. 

525 collection_column = tags_table.columns[self._collections.getCollectionForeignKeyName()] 

526 joined_collections = self._collections.join_collections_sql(collection_column, tags_sql) 

527 tags_sql = joined_collections.joined_sql 

528 run_name_column = joined_collections.name_column 

529 tags_sql = tags_sql.add_columns(run_name_column) 

530 # Tags table includes run collections and tagged 

531 # collections. 

532 # In theory the data ID for a given dataset should be the 

533 # same in both, but nothing actually guarantees this. 

534 # So skip any tagged collections, using the run collection 

535 # as the definitive definition. 

536 tags_sql = tags_sql.where(joined_collections.type_column == int(CollectionType.RUN)) 

537 

538 with self._db.query(tags_sql) as sql_result: 

539 data_id_rows = sql_result.mappings().all() 

540 

541 assert run_name_column.key is not None 

542 for data_id_row in data_id_rows: 

543 id = data_id_row["dataset_id"] 

544 dataset_type = dataset_type_map[id] 

545 run_name = data_id_row[run_name_column.key] 

546 data_id = DataCoordinate.from_required_values( 

547 dimension_group, 

548 tuple(data_id_row[dimension] for dimension in dimension_group.required), 

549 ) 

550 ref = DatasetRef( 

551 datasetType=dataset_type, 

552 dataId=data_id, 

553 id=id, 

554 run=run_name, 

555 ) 

556 output_refs.append(ref) 

557 

558 return output_refs 

559 

560 def _fetch_dataset_type_record(self, name: str) -> _DatasetTypeRecord | None: 

561 """Retrieve all dataset types defined in database. 

562 

563 Returns 

564 ------- 

565 dataset_type : `_DatasetTypeRecord` or `None` 

566 Information from a single database record, or `None` if no dataset type with the given name exists. 

567 """ 

568 db_name = self._reversed_renames.get(name, name) 

569 if db_name == name and db_name in self._renames: 

570 # We were asked to fetch a dataset type that has been renamed via 

571 # a config override, with no other existing dataset type renamed 

572 # to replace it. It should appear missing. 

573 return None 

574 c = self._static.dataset_type.columns 

575 stmt = self._static.dataset_type.select().where(c.name == db_name) 

576 with self._db.query(stmt) as sql_result: 

577 row = sql_result.mappings().one_or_none() 

578 if row is None: 

579 return None 

580 else: 

581 return self._record_from_row(row) 

582 

583 def _record_from_row(self, row: Mapping) -> _DatasetTypeRecord: 

584 name = self._renames.get(row["name"], row["name"]) 

585 dimensions = self._dimensions.load_dimension_group(row["dimensions_key"]) 

586 calibTableName = row["calibration_association_table"] 

587 storage_class_name = self._storage_class_overrides.get(row["name"], row["storage_class"]) 

588 datasetType = DatasetType( 

589 name, dimensions, storage_class_name, isCalibration=(calibTableName is not None) 

590 ) 

591 return _DatasetTypeRecord( 

592 dataset_type=datasetType, 

593 dataset_type_id=row["id"], 

594 dimensions_key=row["dimensions_key"], 

595 tag_table_name=row["tag_association_table"], 

596 calib_table_name=calibTableName, 

597 ) 

598 

599 def _dataset_type_from_row(self, row: Mapping) -> DatasetType: 

600 return self._record_from_row(row).dataset_type 

601 

602 def preload_cache(self) -> None: 

603 self._fetch_dataset_types() 

604 

605 def _fetch_dataset_types(self, force_refresh: bool = False) -> list[DatasetType]: 

606 """Fetch list of all defined dataset types.""" 

607 # This is one of two places we populate the dataset type cache: 

608 # 

609 # - This method handles almost all requests for dataset types that 

610 # should already exist. It always marks the cache as "full" in both 

611 # dataset type names and dimensions. 

612 # 

613 # - register_dataset_type handles the case where the dataset type might 

614 # not exist yet. Since it can only add a single dataset type, it 

615 # never changes whether the cache is full. 

616 # 

617 # In both cases, we require that the per-dimensions data be cached 

618 # whenever a dataset type is added to the cache by name, to reduce the 

619 # number of possible states the cache can be in and minimize the number 

620 # of queries. 

621 if self._cache.full and not force_refresh: 

622 return [dataset_type for dataset_type, _ in self._cache.items()] 

623 with self._db.query(self._static.dataset_type.select()) as sql_result: 

624 sql_rows = sql_result.mappings().fetchall() 

625 records = [self._record_from_row(row) for row in sql_rows] 

626 # Cache everything and specify that cache is complete. 

627 cache_data: list[tuple[DatasetType, int]] = [] 

628 cache_dimensions_data: dict[DimensionGroup, DynamicTables] = {} 

629 for record in records: 

630 cache_data.append((record.dataset_type, record.dataset_type_id)) 

631 if (dynamic_tables := cache_dimensions_data.get(record.dataset_type.dimensions)) is None: 

632 tables = record.make_dynamic_tables() 

633 else: 

634 tables = record.update_dynamic_tables(dynamic_tables) 

635 cache_dimensions_data[record.dataset_type.dimensions] = tables 

636 self._cache.set( 

637 cache_data, full=True, dimensions_data=cache_dimensions_data.items(), dimensions_full=True 

638 ) 

639 return [record.dataset_type for record in records] 

640 

641 def _get_dataset_type_by_id(self, id: int) -> DatasetType: 

642 dt = self._cache.get_by_id(id) 

643 if dt is None: 

644 # Since the ID is not a concept exposed to the public API, it 

645 # had to have come from a dataset table, and our cache must be 

646 # empty or out of date. 

647 self._fetch_dataset_types(force_refresh=True) 

648 dt = self._cache.get_by_id(id) 

649 if dt is None: 

650 raise RuntimeError(f"Failed to look up dataset type with ID {id}") 

651 return dt 

652 

653 def _find_storage(self, name: str) -> _DatasetRecordStorage: 

654 """Find a dataset type and the extra information needed to work with 

655 it, utilizing and populating the cache as needed. 

656 """ 

657 dataset_type, dataset_type_id = self._cache.get(name) 

658 if dataset_type is not None: 

659 tables = self._get_dynamic_tables(dataset_type.dimensions) 

660 assert dataset_type_id is not None, "Dataset type cache population is incomplete." 

661 return _DatasetRecordStorage( 

662 dataset_type=dataset_type, dataset_type_id=dataset_type_id, dynamic_tables=tables 

663 ) 

664 else: 

665 # On the first cache miss populate the cache with complete list 

666 # of dataset types (if it was not done yet). 

667 if not self._cache.full: 

668 self._fetch_dataset_types() 

669 # Try again 

670 dataset_type, dataset_type_id = self._cache.get(name) 

671 if dataset_type is not None: 

672 tables = self._get_dynamic_tables(dataset_type.dimensions) 

673 assert dataset_type_id is not None, "Dataset type cache population is incomplete." 

674 return _DatasetRecordStorage( 

675 dataset_type=dataset_type, dataset_type_id=dataset_type_id, dynamic_tables=tables 

676 ) 

677 record = self._fetch_dataset_type_record(name) 

678 if record is not None: 

679 self._cache.add(record.dataset_type, record.dataset_type_id) 

680 tables = record.make_dynamic_tables() 

681 self._cache.add_by_dimensions(record.dataset_type.dimensions, tables) 

682 return _DatasetRecordStorage(record.dataset_type, record.dataset_type_id, tables) 

683 raise MissingDatasetTypeError(f"Dataset type {name!r} does not exist.") 

684 

685 def _get_dynamic_tables(self, dimensions: DimensionGroup) -> DynamicTables: 

686 tables = self._cache.get_by_dimensions(dimensions) 

687 assert tables is not None, ( 

688 "_fetch_dataset_types is supposed to guarantee that the tables cache is populated." 

689 ) 

690 return tables 

691 

692 def getCollectionSummary(self, collection: CollectionRecord) -> CollectionSummary: 

693 # Docstring inherited from DatasetRecordStorageManager. 

694 summaries = self._summaries.fetch_summaries([collection], None, self._dataset_type_from_row) 

695 return summaries[collection.key] 

696 

697 def fetch_summaries( 

698 self, 

699 collections: Iterable[CollectionRecord], 

700 dataset_types: Iterable[DatasetType] | Iterable[str] | None = None, 

701 ) -> Mapping[Any, CollectionSummary]: 

702 # Docstring inherited from DatasetRecordStorageManager. 

703 dataset_type_names: Iterable[str] | None = None 

704 if dataset_types is not None: 

705 dataset_type_names = set(get_dataset_type_name(dt) for dt in dataset_types) 

706 return self._summaries.fetch_summaries(collections, dataset_type_names, self._dataset_type_from_row) 

707 

708 def fetch_run_dataset_ids(self, run: RunRecord) -> list[DatasetId]: 

709 # Docstring inherited. 

710 sql = sqlalchemy.select(self._static.dataset.c.id).where( 

711 self._static.dataset.c[self._run_key_column] == run.key 

712 ) 

713 with self._db.query(sql) as result: 

714 return list(result.scalars()) 

715 

716 def ingest_date_dtype(self) -> type: 

717 """Return type of the ``ingest_date`` column.""" 

718 schema_version = self.newSchemaVersion() 

719 if schema_version is not None and schema_version.major > 1: 

720 return ddl.AstropyTimeNsecTai 

721 else: 

722 return sqlalchemy.TIMESTAMP 

723 

724 def insert( 

725 self, 

726 dataset_type_name: str, 

727 run: RunRecord, 

728 data_ids: Iterable[DataCoordinate], 

729 id_generation_mode: DatasetIdGenEnum = DatasetIdGenEnum.UNIQUE, 

730 ) -> list[DatasetRef]: 

731 # Docstring inherited from DatasetRecordStorageManager. 

732 if (storage := self._find_storage(dataset_type_name)) is None: 

733 raise MissingDatasetTypeError(f"Dataset type {dataset_type_name!r} has not been registered.") 

734 # Current timestamp, type depends on schema version. Use microsecond 

735 # precision for astropy time to keep things consistent with 

736 # TIMESTAMP(6) SQL type. 

737 timestamp: datetime.datetime | astropy.time.Time 

738 if self._use_astropy_ingest_date: 

739 # Astropy `now()` precision should be the same as `datetime.now()`, which 

740 # should mean microsecond. 

741 timestamp = astropy.time.Time.now() 

742 else: 

743 timestamp = datetime.datetime.now(datetime.UTC) 

744 

745 # Iterate over data IDs, transforming a possibly-single-pass iterable 

746 # into a list. 

747 data_id_list: list[DataCoordinate] = [] 

748 rows = [] 

749 summary = CollectionSummary() 

750 for dataId in summary.add_data_ids_generator(storage.dataset_type, data_ids): 

751 data_id_list.append(dataId) 

752 rows.append( 

753 { 

754 "id": self._id_maker.makeDatasetId( 

755 run.name, storage.dataset_type, dataId, id_generation_mode 

756 ), 

757 "dataset_type_id": storage.dataset_type_id, 

758 self._run_key_column: run.key, 

759 "ingest_date": timestamp, 

760 } 

761 ) 

762 if not rows: 

763 # Just in case an empty collection is provided we want to avoid 

764 # adding dataset type to summary tables. 

765 return [] 

766 

767 with self._db.transaction(): 

768 # Insert into the static dataset table. 

769 self._db.insert(self._static.dataset, *rows) 

770 # Update the summary tables for this collection in case this is the 

771 # first time this dataset type or these governor values will be 

772 # inserted there. 

773 self._summaries.update(run, [storage.dataset_type_id], summary) 

774 # Combine the generated dataset_id values and data ID fields to 

775 # form rows to be inserted into the tags table. 

776 protoTagsRow = { 

777 "dataset_type_id": storage.dataset_type_id, 

778 self._collections.getCollectionForeignKeyName(): run.key, 

779 } 

780 tagsRows = [ 

781 dict(protoTagsRow, dataset_id=row["id"], **dataId.required) 

782 for dataId, row in zip(data_id_list, rows, strict=True) 

783 ] 
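# Purely illustrative shape of the rows built above (the data ID columns
# and FK column names depend on the dimensions and the collection manager;
# values here are hypothetical):
#   dataset row: {"id": <uuid>, "dataset_type_id": 42, <run FK>: <run key>,
#                 "ingest_date": <timestamp>}
#   tags row:    {"dataset_type_id": 42, <collection FK>: <run key>,
#                 "dataset_id": <uuid>, "instrument": "X", "detector": 5}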

784 # Insert those rows into the tags table. 

785 self._db.insert(self._get_tags_table(storage.dynamic_tables), *tagsRows) 

786 

787 return [ 

788 DatasetRef( 

789 datasetType=storage.dataset_type, 

790 dataId=dataId, 

791 id=row["id"], 

792 run=run.name, 

793 ) 

794 for dataId, row in zip(data_id_list, rows, strict=True) 

795 ] 

796 

797 def import_(self, run: RunRecord, refs: list[DatasetRef], assume_new: bool = False) -> None: 

798 # Docstring inherited from DatasetRecordStorageManager. 

799 if not refs: 

800 # Just in case an empty mapping is provided we want to avoid 

801 # adding dataset type to summary tables. 

802 return 

803 assert all(ref.run == run.name for ref in refs), ( 

804 "Run names in refs must match the run we are inserting into" 

805 ) 

806 

807 dataset_types = {ref.datasetType for ref in refs} 

808 dimensions = _ensure_dimension_groups_match(dataset_types) 

809 dataset_type_storage: dict[str, _DatasetRecordStorage] = {} 

810 for dt in dataset_types: 

811 if (storage := self._find_storage(dt.name)) is None: 

812 raise MissingDatasetTypeError(f"Dataset type {dt.name!r} has not been registered.") 

813 dataset_type_storage[dt.name] = storage 

814 

815 dynamic_tables = self._get_dynamic_tables(dimensions) 

816 tags_table = self._get_tags_table(dynamic_tables) 

817 # Current timestamp, type depends on schema version. 

818 if self._use_astropy_ingest_date: 

819 # Astropy `now()` precision should be the same as `datetime.now()`, which 

820 # should mean microsecond. 

821 timestamp = sqlalchemy.sql.literal(astropy.time.Time.now(), type_=ddl.AstropyTimeNsecTai) 

822 else: 

823 timestamp = sqlalchemy.sql.literal(datetime.datetime.now(datetime.UTC)) 

824 collection_fkey_name = self._collections.getCollectionForeignKeyName() 

825 tags_rows = [ 

826 { 

827 "dataset_type_id": dataset_type_storage[ref.datasetType.name].dataset_type_id, 

828 collection_fkey_name: run.key, 

829 "dataset_id": ref.id, 

830 **ref.dataId.required, 

831 } 

832 for ref in refs 

833 ] 

834 if assume_new: 

835 self._import_new(run, refs, dataset_type_storage, tags_table, tags_rows, timestamp) 

836 else: 

837 self._import_guarded( 

838 run, refs, dimensions, dataset_type_storage, tags_table, tags_rows, timestamp 

839 ) 

840 

841 def _import_guarded( 

842 self, 

843 run: RunRecord, 

844 refs: list[DatasetRef], 

845 dimensions: DimensionGroup, 

846 dataset_type_storage: dict[str, _DatasetRecordStorage], 

847 tags_table: sqlalchemy.Table, 

848 tags_rows: list[dict[str, object]], 

849 timestamp: sqlalchemy.BindParameter[astropy.time.Time | datetime.datetime], 

850 ) -> None: 

851 # We'll insert all new rows into a temporary table 

852 table_spec = makeTagTableSpec(dimensions, type(self._collections), constraints=False) 

853 collection_fkey_name = self._collections.getCollectionForeignKeyName() 

854 with self._db.transaction(for_temp_tables=True), self._db.temporary_table(table_spec) as tmp_tags: 

855 # store all incoming data in a temporary table 

856 self._db.insert(tmp_tags, *tags_rows) 

857 # There are some checks that we want to make for consistency 

858 # of the new datasets with existing ones. 

859 self._validate_import(dimensions, tags_table, tmp_tags, run) 

860 # Before we merge temporary table into dataset/tags we need to 

861 # drop datasets which are already there (and do not conflict). 

862 self._db.deleteWhere( 

863 tmp_tags, 

864 tmp_tags.columns.dataset_id.in_(sqlalchemy.sql.select(self._static.dataset.columns.id)), 

865 ) 

866 # Copy it into dataset table, need to re-label some columns. 

867 self._db.insert( 

868 self._static.dataset, 

869 select=sqlalchemy.sql.select( 

870 tmp_tags.columns.dataset_id.label("id"), 

871 tmp_tags.columns.dataset_type_id, 

872 tmp_tags.columns[collection_fkey_name].label(self._run_key_column), 

873 timestamp.label("ingest_date"), 

874 ), 

875 ) 

876 self._update_summaries(run, refs, dataset_type_storage) 

877 # Copy from temp table into tags table. 

878 self._db.insert(tags_table, select=tmp_tags.select()) 

879 

880 def _update_summaries( 

881 self, run: RunRecord, refs: list[DatasetRef], dataset_type_storage: dict[str, _DatasetRecordStorage] 

882 ) -> None: 

883 summary = CollectionSummary() 

884 summary.add_datasets(refs) 

885 self._summaries.update( 

886 run, [storage.dataset_type_id for storage in dataset_type_storage.values()], summary 

887 ) 

888 

889 def _validate_import( 

890 self, 

891 dimensions: DimensionGroup, 

892 tags: sqlalchemy.schema.Table, 

893 tmp_tags: sqlalchemy.schema.Table, 

894 run: RunRecord, 

895 ) -> None: 

896 """Validate imported refs against existing datasets. 

897 

898 Parameters 

899 ---------- 

900 dimensions : `DimensionGroup` 

901 Dimensions to validate. 

902 tags : `sqlalchemy.schema.Table` 

903 Tags table holding existing datasets for the dataset type's dimension group. 

904 tmp_tags : `sqlalchemy.schema.Table` 

905 Temporary table with new datasets and the same schema as tags 

906 table. 

907 run : `RunRecord` 

908 The record object describing the `~CollectionType.RUN` collection. 

909 

910 Raises 

911 ------ 

912 ConflictingDefinitionError 

913 Raise if new datasets conflict with existing ones. 

914 """ 

915 dataset = self._static.dataset 

916 collection_fkey_name = self._collections.getCollectionForeignKeyName() 

917 

918 # Check that existing datasets have the same dataset type and 

919 # run. 

920 query = ( 

921 sqlalchemy.sql.select( 

922 dataset.columns.id.label("dataset_id"), 

923 dataset.columns.dataset_type_id.label("dataset_type_id"), 

924 tmp_tags.columns.dataset_type_id.label("new_dataset_type_id"), 

925 dataset.columns[self._run_key_column].label("run"), 

926 tmp_tags.columns[collection_fkey_name].label("new_run"), 

927 ) 

928 .select_from(dataset.join(tmp_tags, dataset.columns.id == tmp_tags.columns.dataset_id)) 

929 .where( 

930 sqlalchemy.sql.or_( 

931 dataset.columns.dataset_type_id != tmp_tags.columns.dataset_type_id, 

932 dataset.columns[self._run_key_column] != tmp_tags.columns[collection_fkey_name], 

933 ) 

934 ) 

935 .limit(1) 

936 ) 

937 with self._db.query(query) as result: 

938 # Only include the first one in the exception message 

939 if (row := result.first()) is not None: 

940 existing_run = self._collections[row.run].name 

941 new_run = self._collections[row.new_run].name 

942 if row.dataset_type_id == row.new_dataset_type_id: 

943 raise ConflictingDefinitionError( 

944 f"Current run {existing_run!r} and new run {new_run!r} do not agree for " 

945 f"dataset {row.dataset_id}." 

946 ) 

947 else: 

948 raise ConflictingDefinitionError( 

949 f"Dataset {row.dataset_id} was provided with type ID {row.new_dataset_type_id} " 

950 f"in run {new_run!r}, but was already defined with type ID " 

951 f"{row.dataset_type_id} in run {run!r}." 

952 ) 

953 

954 # Check that matching dataset in tags table has the same DataId. 

955 query = ( 

956 sqlalchemy.sql.select( 

957 tags.columns.dataset_id, 

958 tags.columns.dataset_type_id.label("type_id"), 

959 tmp_tags.columns.dataset_type_id.label("new_type_id"), 

960 *[tags.columns[dim] for dim in dimensions.required], 

961 *[tmp_tags.columns[dim].label(f"new_{dim}") for dim in dimensions.required], 

962 ) 

963 .select_from(tags.join(tmp_tags, tags.columns.dataset_id == tmp_tags.columns.dataset_id)) 

964 .where( 

965 sqlalchemy.sql.or_( 

966 tags.columns.dataset_type_id != tmp_tags.columns.dataset_type_id, 

967 *[tags.columns[dim] != tmp_tags.columns[dim] for dim in dimensions.required], 

968 ) 

969 ) 

970 .limit(1) 

971 ) 

972 

973 with self._db.query(query) as result: 

974 if (row := result.first()) is not None: 

975 # Only include the first one in the exception message 

976 raise ConflictingDefinitionError( 

977 f"Existing dataset type or dataId do not match new dataset: {row._asdict()}" 

978 ) 

979 

980 # Check that matching run+dataId have the same dataset ID. 

981 query = ( 

982 sqlalchemy.sql.select( 

983 *[tags.columns[dim] for dim in dimensions.required], 

984 tags.columns.dataset_id, 

985 tmp_tags.columns.dataset_id.label("new_dataset_id"), 

986 tmp_tags.columns.dataset_type_id.label("new_dataset_type_id"), 

987 tags.columns[collection_fkey_name], 

988 tmp_tags.columns[collection_fkey_name].label(f"new_{collection_fkey_name}"), 

989 ) 

990 .select_from( 

991 tags.join( 

992 tmp_tags, 

993 sqlalchemy.sql.and_( 

994 tags.columns.dataset_type_id == tmp_tags.columns.dataset_type_id, 

995 tags.columns[collection_fkey_name] == tmp_tags.columns[collection_fkey_name], 

996 *[tags.columns[dim] == tmp_tags.columns[dim] for dim in dimensions.required], 

997 ), 

998 ) 

999 ) 

1000 .where(tags.columns.dataset_id != tmp_tags.columns.dataset_id) 

1001 .limit(1) 

1002 ) 

1003 with self._db.query(query) as result: 

1004 # only include the first one in the exception message 

1005 if (row := result.first()) is not None: 

1006 data_id = {dim: getattr(row, dim) for dim in dimensions.required} 

1007 existing_collection = self._collections[getattr(row, collection_fkey_name)].name 

1008 new_collection = self._collections[getattr(row, f"new_{collection_fkey_name}")].name 

1009 raise ConflictingDefinitionError( 

1010 f"Dataset with type ID {row.new_dataset_type_id} and data ID {data_id} " 

1011 f"has ID {row.dataset_id} in existing collection {existing_collection!r} " 

1012 f"but ID {row.new_dataset_id} in new collection {new_collection!r}." 

1013 ) 

1014 

1015 def _import_new( 

1016 self, 

1017 run: RunRecord, 

1018 refs: list[DatasetRef], 

1019 dataset_type_storage: dict[str, _DatasetRecordStorage], 

1020 tags_table: sqlalchemy.Table, 

1021 tags_rows: list[dict[str, object]], 

1022 timestamp: sqlalchemy.BindParameter[astropy.time.Time | datetime.datetime], 

1023 ) -> None: 

1024 static_rows = [ 

1025 { 

1026 "id": ref.id, 

1027 "dataset_type_id": dataset_type_storage[ref.datasetType.name].dataset_type_id, 

1028 self._run_key_column: run.key, 

1029 "ingest_date": timestamp.value, 

1030 } 

1031 for ref in refs 

1032 ] 

1033 with self._db.transaction(): 

1034 self._db.insert(self._static.dataset, *static_rows) 

1035 self._update_summaries(run, refs, dataset_type_storage) 

1036 self._db.insert(tags_table, *tags_rows) 

1037 

1038 def delete(self, datasets: Iterable[DatasetId | DatasetRef]) -> None: 

1039 # Docstring inherited from DatasetRecordStorageManager. 

1040 # Only delete from common dataset table; ON DELETE foreign key clauses 

1041 # will handle the rest. 

1042 self._db.delete( 

1043 self._static.dataset, 

1044 ["id"], 

1045 *[{"id": getattr(dataset, "id", dataset)} for dataset in datasets], 

1046 ) 

1047 

1048 def associate( 

1049 self, dataset_type: DatasetType, collection: CollectionRecord, datasets: Iterable[DatasetRef] 

1050 ) -> None: 

1051 # Docstring inherited from DatasetRecordStorageManager. 

1052 if (storage := self._find_storage(dataset_type.name)) is None: 

1053 raise MissingDatasetTypeError(f"Dataset type {dataset_type.name!r} has not been registered.") 

1054 if collection.type is not CollectionType.TAGGED: 

1055 raise CollectionTypeError( 

1056 f"Cannot associate into collection '{collection.name}' " 

1057 f"of type {collection.type.name}; must be TAGGED." 

1058 ) 

1059 proto_row = { 

1060 self._collections.getCollectionForeignKeyName(): collection.key, 

1061 "dataset_type_id": storage.dataset_type_id, 

1062 } 

1063 rows = [] 

1064 summary = CollectionSummary() 

1065 for dataset in summary.add_datasets_generator(datasets): 

1066 rows.append(dict(proto_row, dataset_id=dataset.id, **dataset.dataId.required)) 

1067 if rows: 

1068 # Update the summary tables for this collection in case this is the 

1069 # first time this dataset type or these governor values will be 

1070 # inserted there. 

1071 self._summaries.update(collection, [storage.dataset_type_id], summary) 

1072 # Update the tag table itself. 

1073 self._db.replace(self._get_tags_table(storage.dynamic_tables), *rows) 

1074 

1075 def disassociate( 

1076 self, dataset_type: DatasetType, collection: CollectionRecord, datasets: Iterable[DatasetRef] 

1077 ) -> None: 

1078 # Docstring inherited from DatasetRecordStorageManager. 

1079 if (storage := self._find_storage(dataset_type.name)) is None: 

1080 raise MissingDatasetTypeError(f"Dataset type {dataset_type.name!r} has not been registered.") 

1081 if collection.type is not CollectionType.TAGGED: 

1082 raise CollectionTypeError( 

1083 f"Cannot disassociate from collection '{collection.name}' " 

1084 f"of type {collection.type.name}; must be TAGGED." 

1085 ) 

1086 rows = [ 

1087 { 

1088 "dataset_id": dataset.id, 

1089 self._collections.getCollectionForeignKeyName(): collection.key, 

1090 } 

1091 for dataset in datasets 

1092 ] 

1093 self._db.delete( 

1094 self._get_tags_table(storage.dynamic_tables), 

1095 ["dataset_id", self._collections.getCollectionForeignKeyName()], 

1096 *rows, 

1097 ) 

1098 

1099 def certify( 

1100 self, 

1101 dataset_type: DatasetType, 

1102 collection: CollectionRecord, 

1103 datasets: Iterable[DatasetRef], 

1104 timespan: Timespan, 

1105 query_func: QueryFactoryFunction, 

1106 ) -> None: 

1107 # Docstring inherited from DatasetRecordStorageManager. 

1108 if (storage := self._find_storage(dataset_type.name)) is None: 

1109 raise MissingDatasetTypeError(f"Dataset type {dataset_type.name!r} has not been registered.") 

1110 if not dataset_type.isCalibration(): 

1111 raise DatasetTypeError( 

1112 f"Cannot certify datasets of type {dataset_type.name!r}, for which " 

1113 "DatasetType.isCalibration() is False." 

1114 ) 

1115 if collection.type is not CollectionType.CALIBRATION: 

1116 raise CollectionTypeError( 

1117 f"Cannot certify into collection '{collection.name}' " 

1118 f"of type {collection.type.name}; must be CALIBRATION." 

1119 ) 

1120 TimespanReprClass = self._db.getTimespanRepresentation() 

1121 proto_row = { 

1122 self._collections.getCollectionForeignKeyName(): collection.key, 

1123 "dataset_type_id": storage.dataset_type_id, 

1124 } 

1125 rows = [] 

1126 data_ids: set[DataCoordinate] | None = ( 

1127 set() if not TimespanReprClass.hasExclusionConstraint() else None 

1128 ) 

1129 summary = CollectionSummary() 

1130 for dataset in summary.add_datasets_generator(datasets): 

1131 row = dict(proto_row, dataset_id=dataset.id, **dataset.dataId.required) 

1132 TimespanReprClass.update(timespan, result=row) 

1133 rows.append(row) 

1134 if data_ids is not None: 

1135 data_ids.add(dataset.dataId) 

1136 if not rows: 

1137 # Just in case an empty dataset collection is provided we want to 

1138 # avoid adding dataset type to summary tables. 

1139 return 

1140 # Update the summary tables for this collection in case this is the 

1141 # first time this dataset type or these governor values will be 

1142 # inserted there. 

1143 self._summaries.update(collection, [storage.dataset_type_id], summary) 

1144 # Update the association table itself. 

1145 calibs_table = self._get_calibs_table(storage.dynamic_tables) 

1146 if TimespanReprClass.hasExclusionConstraint(): 

1147 # Rely on database constraint to enforce invariants; we just 

1148 # reraise the exception for consistency across DB engines. 

1149 try: 

1150 self._db.insert(calibs_table, *rows) 

1151 except sqlalchemy.exc.IntegrityError as err: 

1152 raise ConflictingDefinitionError( 

1153 f"Validity range conflict certifying datasets of type {dataset_type.name!r} " 

1154 f"into {collection.name!r} for range {timespan}." 

1155 ) from err 

1156 else: 

1157 # Have to implement exclusion constraint ourselves. 

1158 # Acquire a table lock to ensure there are no concurrent writes 

1159 # that could invalidate our checking before we finish the inserts. We 

1160 # use a SAVEPOINT in case there is an outer transaction that a 

1161 # failure here should not roll back. 

1162 with self._db.transaction( 

1163 lock=[calibs_table], 

1164 savepoint=True, 

1165 # join_data_coordinates sometimes requires a temp table 

1166 for_temp_tables=True, 

1167 ): 

1168 # Query for any rows that would overlap this one. 

1169 with query_func() as query: 

1170 if data_ids is not None: 

1171 query = query.join_data_coordinates(data_ids) 

1172 timespan_column = query.expression_factory[dataset_type.name].timespan 

1173 result = query.datasets(dataset_type, collection.name, find_first=False).where( 

1174 timespan_column.overlaps(timespan) 

1175 ) 

1176 conflicting = result.count() 

1177 if conflicting > 0: 

1178 raise ConflictingDefinitionError( 

1179 f"{conflicting} validity range conflicts certifying datasets of type " 

1180 f"{dataset_type.name} into {collection.name} for range " 

1181 f"[{timespan.begin}, {timespan.end})." 

1182 ) 

1183 # Proceed with the insert. 

1184 self._db.insert(calibs_table, *rows) 

1185 

1186 def decertify( 

1187 self, 

1188 dataset_type: DatasetType, 

1189 collection: CollectionRecord, 

1190 timespan: Timespan, 

1191 *, 

1192 data_ids: Iterable[DataCoordinate] | None = None, 

1193 query_func: QueryFactoryFunction, 

1194 ) -> None: 

1195 # Docstring inherited from DatasetRecordStorageManager. 

1196 if (storage := self._find_storage(dataset_type.name)) is None: 

1197 raise MissingDatasetTypeError(f"Dataset type {dataset_type.name!r} has not been registered.") 

1198 if not dataset_type.isCalibration(): 

1199 raise DatasetTypeError( 

1200 f"Cannot certify datasets of type {dataset_type.name!r}, for which " 

1201 "DatasetType.isCalibration() is False." 

1202 ) 

1203 if collection.type is not CollectionType.CALIBRATION: 

1204 raise CollectionTypeError( 

1205 f"Cannot decertify from collection '{collection.name}' " 

1206 f"of type {collection.type.name}; must be CALIBRATION." 

1207 ) 

1208 TimespanReprClass = self._db.getTimespanRepresentation() 

1209 data_id_set: set[DataCoordinate] | None 

1210 if data_ids is not None: 

1211 data_id_set = set(data_ids) 

1212 else: 

1213 data_id_set = None 

1214 

1215 # Set up collections to populate with the rows we'll want to modify. 

1216 # The insert rows will have the same values for collection and 

1217 # dataset type. 

1218 proto_insert_row = { 

1219 self._collections.getCollectionForeignKeyName(): collection.key, 

1220 "dataset_type_id": storage.dataset_type_id, 

1221 } 

1222 rows_to_delete = [] 

1223 rows_to_insert = [] 

1224 # Acquire a table lock to ensure there are no concurrent writes 

1225 # between the SELECT and the DELETE and INSERT queries based on it. 

1226 calibs_table = self._get_calibs_table(storage.dynamic_tables) 

1227 with self._db.transaction(lock=[calibs_table], savepoint=True): 

1228 # Find rows overlapping our inputs. 

1229 with query_func() as query: 

1230 query = query.join_dataset_search(dataset_type, [collection.name]) 

1231 if data_id_set is not None: 

1232 query = query.join_data_coordinates(data_id_set) 

1233 timespan_column = query.expression_factory[dataset_type.name].timespan 

1234 query = query.where(timespan_column.overlaps(timespan)) 

1235 result = query.general( 

1236 dataset_type.dimensions, 

1237 dataset_fields={dataset_type.name: {"dataset_id", "timespan"}}, 

1238 find_first=False, 

1239 )._with_added_dataset_field(dataset_type.name, "calib_pkey") 

1240 

1241 calib_pkey_key = f"{dataset_type.name}.calib_pkey" 

1242 dataset_id_key = f"{dataset_type.name}.dataset_id" 

1243 timespan_key = f"{dataset_type.name}.timespan" 

1244 for row in result.iter_tuples(): 

1245 rows_to_delete.append({"id": row.raw_row[calib_pkey_key]}) 

1246 # Construct the insert row(s) by copying the prototype row, 

1247 # then adding the dimension column values, then adding 

1248 # what's left of the timespan from that row after we 

1249 # subtract the given timespan. 
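# For example (hypothetical values): decertifying [t2, t3) from an
# existing row whose validity range is [t1, t4) deletes that row and
# re-inserts two rows covering [t1, t2) and [t3, t4).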

1250 new_insert_row = proto_insert_row.copy() 

1251 new_insert_row["dataset_id"] = row.raw_row[dataset_id_key] 

1252 for name, value in row.data_id.required.items(): 

1253 new_insert_row[name] = value 

1254 row_timespan = row.raw_row[timespan_key] 

1255 assert row_timespan is not None, "Field should have a NOT NULL constraint." 

1256 for diff_timespan in row_timespan.difference(timespan): 

1257 rows_to_insert.append( 

1258 TimespanReprClass.update(diff_timespan, result=new_insert_row.copy()) 

1259 ) 

1260 # Run the DELETE and INSERT queries. 

1261 self._db.delete(calibs_table, ["id"], *rows_to_delete) 

1262 self._db.insert(calibs_table, *rows_to_insert) 

1263 

1264 def make_joins_builder( 

1265 self, 

1266 dataset_type: DatasetType, 

1267 collections: Sequence[CollectionRecord], 

1268 fields: Set[qt.AnyDatasetFieldName], 

1269 is_union: bool = False, 

1270 ) -> SqlJoinsBuilder: 

1271 if (storage := self._find_storage(dataset_type.name)) is None: 

1272 raise MissingDatasetTypeError(f"Dataset type {dataset_type.name!r} has not been registered.") 

1273 # This method largely mimics `make_relation`, but it uses the new query 

1274 # system primitives instead of the old one. In terms of the SQL 

1275 # queries it builds, there are two more main differences: 

1276 # 

1277 # - Collection and run columns are now string names rather than IDs. 

1278 # This insulates the query result-processing code from collection 

1279 # caching and the collection manager subclass details. 

1280 # 

1281 # - The subquery always has unique rows, which is achieved by using 

1282 # SELECT DISTINCT when necessary. 

1283 # 

1284 collection_types = {collection.type for collection in collections} 

1285 assert CollectionType.CHAINED not in collection_types, "CHAINED collections must be flattened." 

1286 # 

1287 # There are two kinds of table in play here: 

1288 # 

1289 # - the static dataset table (with the dataset ID, dataset type ID, 

1290 # run ID/name, and ingest date); 

1291 # 

1292 # - the dynamic tags/calibs table (with the dataset ID, dataset type 

1293 # ID, collection ID/name, data ID, and possibly validity 

1294 # range). 

1295 # 

1296 # That means that we might want to return a query against either table 

1297 # or a JOIN of both, depending on which quantities the caller wants. 

1298 # But the data ID is always included, which means we'll always include 

1299 # the tags/calibs table and join in the static dataset table only if we 

1300 # need things from it that we can't get from the tags/calibs table. 

1301 # 

1302 # Note that it's important that we include a WHERE constraint on both 

1303 # tables for any column (e.g. dataset_type_id) that is in both when 

1305 # it's given explicitly; not doing so can prevent the query planner from 

1305 # using very important indexes. At present, we don't include those 

1306 # redundant columns in the JOIN ON expression, however, because the 

1307 # FOREIGN KEY (and its index) are defined only on dataset_id. 
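# For instance, a query that only needs the data ID and collection can be
# answered from the tags/calibs table alone, while one that also needs
# ``ingest_date`` would have to join in the static dataset table as well.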

1308 columns = qt.ColumnSet(dataset_type.dimensions) 

1309 columns.drop_implied_dimension_keys() 

1310 fields_key: str | qt.AnyDatasetType = qt.ANY_DATASET if is_union else dataset_type.name 

1311 columns.dataset_fields[fields_key].update(fields) 

1312 tags_builder: SqlSelectBuilder | None = None 

1313 if collection_types != {CollectionType.CALIBRATION}: 

1314 # We'll need a subquery for the tags table if any of the given 

1315 # collections are not a CALIBRATION collection. This intentionally 

1316 # also fires when the list of collections is empty as a way to 

1317 # create a dummy subquery that we know will fail. 

1318 # We give the table an alias because it might appear multiple times 

1319 # in the same query, for different dataset types. 

1320 tags_table = self._get_tags_table(storage.dynamic_tables).alias( 

1321 f"{dataset_type.name}_tags{'_union' if is_union else ''}" 

1322 ) 

1323 tags_builder = self._finish_query_builder( 

1324 storage, 

1325 SqlJoinsBuilder(db=self._db, from_clause=tags_table).to_select_builder(columns), 

1326 [record for record in collections if record.type is not CollectionType.CALIBRATION], 

1327 fields, 

1328 fields_key, 

1329 ) 

1330 if "timespan" in fields: 

1331 tags_builder.joins.timespans[fields_key] = self._db.getTimespanRepresentation().fromLiteral( 

1332 Timespan(None, None) 

1333 ) 

1334 assert "calib_pkey" not in fields, ( 

1335 "Calibration primary key for internal use only on calibration collections." 

1336 ) 

1337 calibs_builder: SqlSelectBuilder | None = None 

1338 if CollectionType.CALIBRATION in collection_types: 

1339 # If at least one collection is a CALIBRATION collection, we'll 

1340 # need a subquery for the calibs table, and could include the 

1341 # timespan as a result or constraint. 

1342 calibs_table = self._get_calibs_table(storage.dynamic_tables).alias( 

1343 f"{dataset_type.name}_calibs{'_union' if is_union else ''}" 

1344 ) 

1345 calibs_builder = self._finish_query_builder( 

1346 storage, 

1347 SqlJoinsBuilder(db=self._db, from_clause=calibs_table).to_select_builder(columns), 

1348 [record for record in collections if record.type is CollectionType.CALIBRATION], 

1349 fields, 

1350 fields_key, 

1351 ) 

1352 if "timespan" in fields: 

1353 calibs_builder.joins.timespans[fields_key] = ( 

1354 self._db.getTimespanRepresentation().from_columns(calibs_table.columns) 

1355 ) 

1356 if "calib_pkey" in fields: 

1357 calibs_builder.joins.fields[fields_key]["calib_pkey"] = calibs_table.columns["id"] 

1358 

1359 # In calibration collections, we need timespan as well as data ID 

1360 # to ensure unique rows. 

1361 calibs_builder.distinct = calibs_builder.distinct and "timespan" not in fields 
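
# (Illustrative example, not in the original source; the dataset type name is
# hypothetical.) A "bias" dataset certified twice into one CALIBRATION
# collection over disjoint validity ranges produces two rows that differ only
# in their timespans, so once the timespan column is selected the rows are
# already unique and SELECT DISTINCT can be dropped.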

1362 if tags_builder is not None: 

1363 if calibs_builder is not None: 

1364 # Need a UNION subquery. 

1365 return tags_builder.union_subquery([calibs_builder]) 

1366 else: 

1367 return tags_builder.into_joins_builder(postprocessing=None) 

1368 elif calibs_builder is not None: 

1369 return calibs_builder.into_joins_builder(postprocessing=None) 

1370 else: 

1371 raise AssertionError("Branch should be unreachable.") 

1372 

1373 def _finish_query_builder( 

1374 self, 

1375 storage: _DatasetRecordStorage, 

1376 sql_projection: SqlSelectBuilder, 

1377 collections: Sequence[CollectionRecord], 

1378 fields: Set[qt.AnyDatasetFieldName], 

1379 fields_key: str | qt.AnyDatasetType, 

1380 ) -> SqlSelectBuilder: 

1381 # This method plays, in the new query system, the same role that 

1382 # _finish_single_relation plays in the old one. It is called exactly one or two times by 

1383 # make_sql_builder, just as _finish_single_relation is called exactly 

1384 # one or two times by make_relation. See make_sql_builder comments for 

1385 # what's different. 

1386 assert sql_projection.joins.from_clause is not None 

1387 run_collections_only = all(record.type is CollectionType.RUN for record in collections) 

1388 sql_projection.joins.where( 

1389 sql_projection.joins.from_clause.c.dataset_type_id == storage.dataset_type_id 

1390 ) 

1391 dataset_id_col = sql_projection.joins.from_clause.c.dataset_id 

1392 collection_col = sql_projection.joins.from_clause.c[self._collections.getCollectionForeignKeyName()] 

1393 fields_provided = sql_projection.joins.fields[fields_key] 

1394 # We always constrain and optionally retrieve the collection(s) via the 

1395 # tags/calibs table. 

1396 if "collection_key" in fields: 

1397 sql_projection.joins.fields[fields_key]["collection_key"] = collection_col 

1398 if len(collections) == 1: 

1399 only_collection_record = collections[0] 

1400 sql_projection.joins.where(collection_col == only_collection_record.key) 

1401 if "collection" in fields: 

1402 fields_provided["collection"] = sqlalchemy.literal(only_collection_record.name).cast( 

1403 # This cast is necessary to ensure that Postgres knows the 

1404 # type of this column if it is used in an aggregate 

1405 # function. 

1406 sqlalchemy.String 

1407 ) 

1408 

1409 elif not collections: 

1410 sql_projection.joins.where(sqlalchemy.literal(False)) 

1411 if "collection" in fields: 

1412 fields_provided["collection"] = sqlalchemy.literal("NO COLLECTIONS") 

1413 else: 

1414 sql_projection.joins.where(collection_col.in_([collection.key for collection in collections])) 

1415 if "collection" in fields: 

1416 # Avoid a join to the collection table to get the name by using 

1417 # a CASE statement. The SQL will be a bit more verbose but 

1418 # more efficient. 

1419 fields_provided["collection"] = _create_case_expression_for_collections( 

1420 collections, collection_col 

1421 ) 
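
# (Illustrative, not in the original source; keys and names are made up.)
# With collections keyed 3 -> "alpha" and 7 -> "beta", the expression above
# compiles to roughly:
#     CASE <collection FK column> WHEN 3 THEN 'alpha' WHEN 7 THEN 'beta' END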

1422 # Add more column definitions, starting with the data ID. 

1423 sql_projection.joins.extract_dimensions(storage.dataset_type.dimensions.required) 

1424 # We can always get the dataset_id from the tags/calibs table, even 

1425 # though we could also get it from the 'static' dataset table. 

1426 if "dataset_id" in fields: 

1427 fields_provided["dataset_id"] = dataset_id_col 

1428 

1429 # It's possible we now have everything we need, from just the 

1430 # tags/calibs table. The things we might need to get from the static 

1431 # dataset table are the run key and the ingest date. 
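
# (Summary comment, not in the original source.) The branches below handle
# the "run" field in three ways: a single RUN collection becomes a string
# literal, several RUN collections become a CASE expression over the run key
# in the static table, and a mix of collection types falls back to joining
# the collection table.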

1432 need_static_table = False 

1433 need_collection_table = False 

1434 # Ingest date can only come from the static table. 

1435 if "ingest_date" in fields: 

1436 fields_provided["ingest_date"] = self._static.dataset.c.ingest_date 

1437 need_static_table = True 

1438 if "run" in fields: 

1439 if len(collections) == 1 and run_collections_only: 

1440 # If we are searching exactly one RUN collection, we 

1441 # know that if we find the dataset in that collection, 

1442 # then that's the dataset's run; we don't need to 

1443 # query for it. 

1444 # 

1445 fields_provided["run"] = sqlalchemy.literal(only_collection_record.name).cast( 

1446 # This cast is necessary to ensure that Postgres knows the 

1447 # type of this column if it is used in an aggregate 

1448 # function. 

1449 sqlalchemy.String 

1450 ) 

1451 elif run_collections_only: 

1452 # Once again we can avoid joining to the collection table by 

1453 # adding a CASE statement. 

1454 fields_provided["run"] = _create_case_expression_for_collections( 

1455 collections, self._static.dataset.c[self._run_key_column] 

1456 ) 

1457 need_static_table = True 

1458 else: 

1459 # Here we can't avoid a join to the collection table, because 

1460 # we might find a dataset via something other than its RUN 

1461 # collection. 

1462 # 

1463 # We have to defer adding the join until after we have joined 

1464 # in the static dataset table, because the ON clause involves 

1465 # the run collection from the static dataset table. Postgres 

1466 # cares about the join ordering (though SQLite does not). 

1467 need_collection_table = True 

1468 need_static_table = True 

1469 if need_static_table: 

1470 # If we need the static table, join it in via dataset_id. We don't 

1471 # use SqlJoinsBuilder.join because we're joining on dataset ID, not 

1472 # dimensions. 

1473 sql_projection.joins.from_clause = sql_projection.joins.from_clause.join( 

1474 self._static.dataset, onclause=(dataset_id_col == self._static.dataset.c.id) 

1475 ) 

1476 # Also constrain dataset_type_id in static table in case that helps 

1477 # generate a better plan. We could also include this in the JOIN ON 

1478 # clause, but my guess is that that's a good idea IFF it's in the 

1479 # foreign key, and right now it isn't. 

1480 sql_projection.joins.where(self._static.dataset.c.dataset_type_id == storage.dataset_type_id) 

1481 if need_collection_table: 

1482 # Join the collection table to look up the RUN collection name 

1483 # associated with the dataset. 

1484 ( 

1485 fields_provided["run"], 

1486 sql_projection.joins.from_clause, 

1487 ) = self._collections.lookup_name_sql( 

1488 self._static.dataset.c[self._run_key_column], 

1489 sql_projection.joins.from_clause, 

1490 ) 

1491 

1492 sql_projection.distinct = ( 

1493 # If there are multiple collections, this subquery might have 

1494 # non-unique rows. 

1495 len(collections) > 1 and not fields 

1496 ) 

1497 return sql_projection 

1498 

1499 def refresh_collection_summaries(self, dataset_type: DatasetType) -> None: 

1500 # Docstring inherited. 

1501 if (storage := self._find_storage(dataset_type.name)) is None: 

1502 raise MissingDatasetTypeError(f"Dataset type {dataset_type.name!r} has not been registered.") 

1503 with self._db.transaction(): 

1504 # The main issue here is consistency in the presence of concurrent 

1505 # updates (using default READ COMMITTED isolation). Regular clients 

1506 # only add to summary tables, and we want to avoid deleting what 

1507 # other concurrent transactions may add while we are in this 

1508 # transaction. This ordering of operations should guarantee it: 

1509 # - read collections for this dataset type from summary tables, 

1510 # - read collections for this dataset type from dataset tables 

1511 # (both tags and calibs), 

1512 # - whatever is in the first set but not in the second can be 

1513 # dropped from summary tables. 
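
# (Illustrative, not in the original source; table names are schematic.)
# The second read below is roughly:
#     SELECT DISTINCT <collection FK> FROM <tags> WHERE dataset_type_id = :id
#     UNION
#     SELECT DISTINCT <collection FK> FROM <calibs> WHERE dataset_type_id = :id
# with the UNION branch present only for calibration dataset types.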

1514 summary_collection_ids = set(self._summaries.get_collection_ids(storage.dataset_type_id)) 

1515 

1516 # Query datasets tables for associated collections. 

1517 column_name = self._collections.getCollectionForeignKeyName() 

1518 tags_table = self._get_tags_table(storage.dynamic_tables) 

1519 query: sqlalchemy.sql.expression.SelectBase = ( 

1520 sqlalchemy.select(tags_table.columns[column_name]) 

1521 .where(tags_table.columns.dataset_type_id == storage.dataset_type_id) 

1522 .distinct() 

1523 ) 

1524 if dataset_type.isCalibration(): 

1525 calibs_table = self._get_calibs_table(storage.dynamic_tables) 

1526 query2 = ( 

1527 sqlalchemy.select(calibs_table.columns[column_name]) 

1528 .where(calibs_table.columns.dataset_type_id == storage.dataset_type_id) 

1529 .distinct() 

1530 ) 

1531 query = sqlalchemy.sql.expression.union(query, query2) 

1532 

1533 with self._db.query(query) as result: 

1534 collection_ids = set(result.scalars()) 

1535 

1536 collections_to_delete = summary_collection_ids - collection_ids 

1537 self._summaries.delete_collections(storage.dataset_type_id, collections_to_delete) 

1538 

1539 def _get_tags_table(self, table: DynamicTables) -> sqlalchemy.Table: 

1540 return table.tags(self._db, type(self._collections), self._cache.tables) 

1541 

1542 def _get_calibs_table(self, table: DynamicTables) -> sqlalchemy.Table: 

1543 return table.calibs(self._db, type(self._collections), self._cache.tables) 

1544 

1545 

1546def _create_case_expression_for_collections( 

1547 collections: Iterable[CollectionRecord], id_column: sqlalchemy.ColumnElement 

1548) -> sqlalchemy.ColumnElement: 

1549 """Return a SQLAlchemy Case expression that converts collection IDs to 

1550 collection names for the given set of collections. 

1551 

1552 Parameters 

1553 ---------- 

1554 collections : `~collections.abc.Iterable` [ `CollectionRecord` ] 

1555 List of collections to include in the conversion table. This should be an 

1556 exhaustive list of collections that could appear in `id_column`. 

1557 id_column : `sqlalchemy.ColumnElement` 

1558 The column containing the collection ID that we want to convert to a 

1559 collection name. 

1560 """ 

1561 mapping = {record.key: record.name for record in collections} 

1562 if not mapping: 

1563 # SQLAlchemy does not correctly handle an empty mapping in case() -- it 

1564 # crashes when trying to compile the expression with an 

1565 # "AttributeError('NoneType' object has no attribute 'dialect_impl')" 

1566 # when trying to access the 'type' property of the Case object. If you 

1567 # explicitly specify a type via type_coerce it instead generates 

1568 # invalid SQL syntax. 

1569 # 

1570 # We can end up with empty mappings here in certain "doomed query" edge 

1571 # cases, e.g. we start with a list of valid collections but they are 

1572 # all filtered out by higher-level code on the basis of collection 

1573 # summaries. 
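
# (Illustrative, not in the original source.) The fallback below typically
# compiles to something like CAST(NULL AS VARCHAR); per the note above, this
# only arises for doomed queries, which are already constrained to return no
# rows, so the NULL collection name is never observed.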

1574 return sqlalchemy.cast(sqlalchemy.null(), sqlalchemy.String) 

1575 

1576 return sqlalchemy.case(mapping, value=id_column) 

1577 

1578 

1579def _ensure_dimension_groups_match(dataset_types: Iterable[DatasetType]) -> DimensionGroup: 

1580 dimensions = set(dt.dimensions for dt in dataset_types) 

1581 assert len(dimensions) > 0, "At least one dataset type is required" 

1582 if len(dimensions) != 1: 

1583 raise DatasetTypeError( 

1584 "Dataset types have more than one dimension group.\n" 

1585 f"Dataset types: {dataset_types}\n" 

1586 f"Dimension groups: {dimensions}" 

1587 ) 

1588 return dimensions.pop()
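
# (Illustrative usage, not part of the original source; dataset type names and
# dimensions are hypothetical.) Given dataset types "bias" and "dark" that
# both have dimensions {instrument, detector}, this helper returns that common
# DimensionGroup; including a type with dimensions {instrument, exposure,
# detector} instead raises DatasetTypeError listing the conflicting groups.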