Coverage for python / lsst / daf / butler / registry / datasets / byDimensions / _manager.py: 0%

560 statements  

« prev     ^ index     » next       coverage.py v7.13.5, created at 2026-04-26 08:49 +0000

1from __future__ import annotations 

2 

3__all__ = ("ByDimensionsDatasetRecordStorageManagerUUID",) 

4 

5import dataclasses 

6import datetime 

7import logging 

8from collections import defaultdict 

9from collections.abc import Iterable, Mapping, Sequence, Set 

10from typing import TYPE_CHECKING, Any, ClassVar 

11 

12import astropy.time 

13import sqlalchemy 

14 

15from lsst.utils.iteration import chunk_iterable 

16 

17from .... import ddl 

18from ...._collection_type import CollectionType 

19from ...._dataset_ref import DatasetId, DatasetIdFactory, DatasetIdGenEnum, DatasetRef 

20from ...._dataset_type import DatasetType, get_dataset_type_name 

21from ...._exceptions import CollectionTypeError, MissingDatasetTypeError 

22from ...._exceptions_legacy import DatasetTypeError 

23from ...._timespan import Timespan 

24from ....dimensions import DataCoordinate, DimensionGroup, DimensionUniverse 

25from ....direct_query_driver import SqlJoinsBuilder, SqlSelectBuilder # new query system, server+direct only 

26from ....queries import QueryFactoryFunction 

27from ....queries import tree as qt # new query system, both clients + server 

28from ..._caching_context import CachingContext 

29from ..._collection_summary import CollectionSummary 

30from ..._exceptions import ConflictingDefinitionError, DatasetTypeExpressionError, OrphanedRecordError 

31from ...interfaces import DatasetRecordStorageManager, RunRecord, VersionTuple 

32from ...wildcards import DatasetTypeWildcard 

33from ._dataset_type_cache import DatasetTypeCache 

34from .summaries import CollectionSummaryManager 

35from .tables import ( 

36 DynamicTables, 

37 StaticDatasetTableSpecTuple, 

38 StaticDatasetTablesTuple, 

39 addDatasetForeignKey, 

40 makeStaticTableSpecs, 

41 makeTagTableSpec, 

42) 

43 

44if TYPE_CHECKING: 

45 from ...interfaces import ( 

46 CollectionManager, 

47 CollectionRecord, 

48 Database, 

49 DimensionRecordStorageManager, 

50 StaticTablesContext, 

51 ) 

52 

53 

# This has to be updated on every schema change.
# TODO: 1.0.0 can be removed once all repos were migrated to 2.0.0.
_VERSION_UUID = VersionTuple(1, 0, 0)
# Starting with 2.0.0 the `ingest_date` column type uses nanoseconds instead
# of TIMESTAMP. The code supports both 1.0.0 and 2.0.0 for the duration of
# client migration period.
_VERSION_UUID_NS = VersionTuple(2, 0, 0)

# Module-level logger for this manager implementation.
_LOG = logging.getLogger(__name__)

63 

64 

@dataclasses.dataclass
class _DatasetTypeRecord:
    """Contents of a single dataset type record."""

    dataset_type: DatasetType
    dataset_type_id: int
    dimensions_key: int
    tag_table_name: str
    calib_table_name: str | None

    def make_dynamic_tables(self) -> DynamicTables:
        """Build a fresh `DynamicTables` description from this record."""
        return DynamicTables(
            self.dataset_type.dimensions,
            self.dimensions_key,
            self.tag_table_name,
            self.calib_table_name,
        )

    def update_dynamic_tables(self, current: DynamicTables) -> DynamicTables:
        """Merge this record into an existing `DynamicTables` that was built
        from another dataset type with the same dimensions, returning the
        (possibly updated) result.
        """
        assert self.dimensions_key == current.dimensions_key
        assert self.tag_table_name == current.tags_name
        if self.calib_table_name is None:
            # This dataset type is not a calibration.  If some
            # previously-cached dataset type was a calibration, we don't want
            # to forget its calibs table.
            return current
        if current.calibs_name is None:
            # Some previously-cached dataset type had the same dimensions
            # but was not a calibration; adopt our calibs table.
            return current.copy(calibs_name=self.calib_table_name)
        assert self.calib_table_name == current.calibs_name
        return current

93 

94 

@dataclasses.dataclass
class _DatasetRecordStorage:
    """Information cached about a dataset type.

    This combines information cached with different keys - the dataset type
    and its ID are cached by name, while the tables are cached by the dataset
    types dimensions (and hence shared with other dataset types that have the
    same dimensions).
    """

    # The dataset type definition itself (cached by name).
    dataset_type: DatasetType
    # Surrogate integer primary key of the dataset type row.
    dataset_type_id: int
    # Tags/calibs table information, shared by all dataset types with the
    # same dimensions.
    dynamic_tables: DynamicTables

108 

109 

110class ByDimensionsDatasetRecordStorageManagerUUID(DatasetRecordStorageManager): 

111 """A manager class for datasets that uses one dataset-collection table for 

112 each group of dataset types that share the same dimensions. 

113 

114 In addition to the table organization, this class makes a number of 

115 other design choices that would have been cumbersome (to say the least) to 

116 try to pack into its name: 

117 

118 - It uses a private surrogate integer autoincrement field to identify 

119 dataset types, instead of using the name as the primary and foreign key 

120 directly. 

121 

122 - It aggressively loads all DatasetTypes into memory instead of fetching 

123 them from the database only when needed or attempting more clever forms 

124 of caching. 

125 

126 Alternative implementations that make different choices for these while 

127 keeping the same general table organization might be reasonable as well. 

128 

129 Parameters 

130 ---------- 

131 db : `Database` 

132 Interface to the underlying database engine and namespace. 

133 collections : `CollectionManager` 

134 Manager object for the collections in this `Registry`. 

135 dimensions : `DimensionRecordStorageManager` 

136 Manager object for the dimensions in this `Registry`. 

137 static : `StaticDatasetTablesTuple` 

138 Named tuple of `sqlalchemy.schema.Table` instances for all static 

139 tables used by this class. 

140 summaries : `CollectionSummaryManager` 

141 Structure containing tables that summarize the contents of collections. 

142 registry_schema_version : `VersionTuple` or `None`, optional 

143 Version of registry schema. 

144 _cache : `None`, optional 

145 For internal use only. 

146 """ 

147 

    def __init__(
        self,
        *,
        db: Database,
        collections: CollectionManager,
        dimensions: DimensionRecordStorageManager,
        static: StaticDatasetTablesTuple,
        summaries: CollectionSummaryManager,
        registry_schema_version: VersionTuple | None = None,
        _cache: DatasetTypeCache | None = None,
    ):
        super().__init__(registry_schema_version=registry_schema_version)
        self._db = db
        self._collections = collections
        self._dimensions = dimensions
        self._static = static
        self._summaries = summaries
        # Cache of dataset types (by name) and dynamic tables (by
        # dimensions); a fresh empty cache unless one was handed over by
        # clone().
        self._cache = _cache if _cache is not None else DatasetTypeCache()
        # ingest_date_dtype() consults the schema version stored by the
        # super().__init__ call above, so this must come after it.
        self._use_astropy_ingest_date = self.ingest_date_dtype() is ddl.AstropyTimeNsecTai
        # Name of the dataset table's foreign-key column pointing at the RUN
        # collection.
        self._run_key_column = collections.getRunForeignKeyName()

168 

    # All schema versions supported by this manager implementation.
    _versions: ClassVar[list[VersionTuple]] = [_VERSION_UUID, _VERSION_UUID_NS]

    _id_maker: ClassVar[DatasetIdFactory] = DatasetIdFactory()
    """Factory for dataset IDs. In the future this factory may be shared with
    other classes (e.g. Registry).
    """

175 

    @classmethod
    def initialize(
        cls,
        db: Database,
        context: StaticTablesContext,
        *,
        collections: CollectionManager,
        dimensions: DimensionRecordStorageManager,
        caching_context: CachingContext,
        registry_schema_version: VersionTuple | None = None,
    ) -> DatasetRecordStorageManager:
        # Docstring inherited from DatasetRecordStorageManager.
        # Declare the static tables first; the summary manager needs the
        # dataset_type table from this tuple.
        specs = cls.makeStaticTableSpecs(
            type(collections), universe=dimensions.universe, schema_version=registry_schema_version
        )
        static: StaticDatasetTablesTuple = context.addTableTuple(specs)  # type: ignore
        summaries = CollectionSummaryManager.initialize(
            db,
            context,
            collections=collections,
            dimensions=dimensions,
            dataset_type_table=static.dataset_type,
            caching_context=caching_context,
        )
        return cls(
            db=db,
            collections=collections,
            dimensions=dimensions,
            static=static,
            summaries=summaries,
            registry_schema_version=registry_schema_version,
        )

208 

    @classmethod
    def currentVersions(cls) -> list[VersionTuple]:
        # Docstring inherited from VersionedExtension.
        # Both the legacy TIMESTAMP (1.0.0) and nanosecond (2.0.0) schemas
        # are supported.
        return cls._versions

213 

    @classmethod
    def makeStaticTableSpecs(
        cls,
        collections: type[CollectionManager],
        universe: DimensionUniverse,
        schema_version: VersionTuple | None,
    ) -> StaticDatasetTableSpecTuple:
        """Construct all static tables used by the classes in this package.

        Static tables are those that are present in all Registries and do not
        depend on what DatasetTypes have been registered.

        Parameters
        ----------
        collections : `type` [ `CollectionManager` ]
            Manager class for the collections in this `Registry`.
        universe : `DimensionUniverse`
            Universe graph containing all dimensions known to this `Registry`.
        schema_version : `VersionTuple` or `None`
            Version of the schema that should be created, if `None` then
            default schema should be used.

        Returns
        -------
        specs : `StaticDatasetTablesTuple`
            A named tuple containing `ddl.TableSpec` instances.
        """
        # Resolve `None` to the default (newest supported) schema version
        # before delegating to the tables module.
        schema_version = cls.clsNewSchemaVersion(schema_version)
        assert schema_version is not None, "New schema version cannot be None"
        return makeStaticTableSpecs(
            collections,
            universe=universe,
            schema_version=schema_version,
        )

248 

    @classmethod
    def addDatasetForeignKey(
        cls,
        tableSpec: ddl.TableSpec,
        *,
        name: str = "dataset",
        constraint: bool = True,
        onDelete: str | None = None,
        **kwargs: Any,
    ) -> ddl.FieldSpec:
        # Docstring inherited from DatasetRecordStorageManager.
        # Pure pass-through to the module-level helper in `tables`.
        return addDatasetForeignKey(tableSpec, name=name, onDelete=onDelete, constraint=constraint, **kwargs)

261 

    @classmethod
    def _newDefaultSchemaVersion(cls) -> VersionTuple:
        # Docstring inherited from VersionedExtension.
        # New repositories get the nanosecond-ingest_date schema (2.0.0).
        return _VERSION_UUID_NS

266 

    def clone(
        self,
        *,
        db: Database,
        collections: CollectionManager,
        dimensions: DimensionRecordStorageManager,
        caching_context: CachingContext,
    ) -> ByDimensionsDatasetRecordStorageManagerUUID:
        """Make an independent copy of this manager bound to new database and
        sibling-manager instances, preserving the static table definitions
        and schema version.
        """
        return ByDimensionsDatasetRecordStorageManagerUUID(
            db=db,
            collections=collections,
            dimensions=dimensions,
            static=self._static,
            summaries=self._summaries.clone(db=db, collections=collections, caching_context=caching_context),
            registry_schema_version=self._registry_schema_version,
            # See notes on DatasetTypeCache.clone() about cache behavior after
            # cloning.
            _cache=self._cache.clone(),
        )

286 

    def refresh(self) -> None:
        # Docstring inherited from DatasetRecordStorageManager.
        # Drop everything; the cache is repopulated lazily on next use.
        self._cache.clear()

290 

291 def remove_dataset_type(self, name: str) -> None: 

292 # Docstring inherited from DatasetRecordStorageManager. 

293 compositeName, componentName = DatasetType.splitDatasetTypeName(name) 

294 if componentName is not None: 

295 raise ValueError(f"Cannot delete a dataset type of a component of a composite (given {name})") 

296 

297 # Delete the row 

298 try: 

299 self._db.delete(self._static.dataset_type, ["name"], {"name": name}) 

300 except sqlalchemy.exc.IntegrityError as e: 

301 raise OrphanedRecordError( 

302 f"Dataset type {name} can not be removed." 

303 " It is associated with datasets that must be removed first." 

304 ) from e 

305 

306 # Now refresh everything -- removal is rare enough that this does 

307 # not need to be fast. 

308 self.refresh() 

309 

    def get_dataset_type(self, name: str) -> DatasetType:
        # Docstring inherited from DatasetRecordStorageManager.
        # Delegates to _find_storage, which raises MissingDatasetTypeError
        # for unknown names.
        return self._find_storage(name).dataset_type

313 

    def register_dataset_type(self, dataset_type: DatasetType) -> bool:
        # Docstring inherited from DatasetRecordStorageManager.
        #
        # This is one of the places where we populate the dataset type cache.
        # See the comment in _fetch_dataset_types for how these are related and
        # invariants they must maintain.
        #
        if dataset_type.isComponent():
            raise ValueError(
                f"Component dataset types can not be stored in registry. Rejecting {dataset_type.name}"
            )

        # If database universe and dimension group universe are different it
        # can cause unexpected effects.
        if dataset_type.dimensions.universe is not self._dimensions.universe:
            raise ValueError(
                "Incompatible dimension universe versions - "
                f"database universe: {self._dimensions.universe}, "
                f"dataset type universe: {dataset_type.dimensions.universe}."
            )

        record = self._fetch_dataset_type_record(dataset_type.name)
        if record is None:
            # No dataset type with this name in the database yet.  Make sure
            # the dynamic (per-dimension-group) tables exist before inserting
            # the dataset_type row.
            if (dynamic_tables := self._cache.get_by_dimensions(dataset_type.dimensions)) is None:
                dimensions_key = self._dimensions.save_dimension_group(dataset_type.dimensions)
                dynamic_tables = DynamicTables.from_dimensions_key(
                    dataset_type.dimensions, dimensions_key, dataset_type.isCalibration()
                )
                dynamic_tables.create(self._db, type(self._collections), self._cache.tables)
            elif dataset_type.isCalibration() and dynamic_tables.calibs_name is None:
                # Tables for these dimensions already exist, but no calibs
                # table yet; add one for this calibration dataset type.
                dynamic_tables = dynamic_tables.add_calibs(
                    self._db, type(self._collections), self._cache.tables
                )
            # sync() inserts the row if absent and otherwise verifies the
            # "compared" fields, guarding against concurrent registration of
            # a conflicting definition.
            row, inserted = self._db.sync(
                self._static.dataset_type,
                keys={"name": dataset_type.name},
                compared={
                    "dimensions_key": dynamic_tables.dimensions_key,
                    # Force the storage class to be loaded to ensure it
                    # exists and there is no typo in the name.
                    "storage_class": dataset_type.storageClass.name,
                },
                extra={
                    "tag_association_table": dynamic_tables.tags_name,
                    "calibration_association_table": (
                        dynamic_tables.calibs_name if dataset_type.isCalibration() else None
                    ),
                },
                returning=["id", "tag_association_table"],
            )
            # Make sure that cache is updated
            if row is not None:
                self._cache.add(dataset_type, row["id"])
                self._cache.add_by_dimensions(dataset_type.dimensions, dynamic_tables)
        else:
            if dataset_type != record.dataset_type:
                raise ConflictingDefinitionError(
                    f"Given dataset type {dataset_type} is inconsistent "
                    f"with database definition {record.dataset_type}."
                )
            inserted = False
        return bool(inserted)

376 

377 def resolve_wildcard( 

378 self, 

379 expression: Any, 

380 missing: list[str] | None = None, 

381 explicit_only: bool = False, 

382 ) -> list[DatasetType]: 

383 wildcard = DatasetTypeWildcard.from_expression(expression) 

384 result: list[DatasetType] = [] 

385 for name, dataset_type in wildcard.values.items(): 

386 parent_name, component_name = DatasetType.splitDatasetTypeName(name) 

387 if component_name is not None: 

388 raise DatasetTypeError( 

389 "Component dataset types are not supported in Registry methods; use DatasetRef or " 

390 "DatasetType methods to obtain components from parents instead." 

391 ) 

392 try: 

393 resolved_dataset_type = self.get_dataset_type(parent_name) 

394 except MissingDatasetTypeError: 

395 if missing is not None: 

396 missing.append(name) 

397 else: 

398 if dataset_type is not None: 

399 if dataset_type.is_compatible_with(resolved_dataset_type): 

400 # Prefer the given dataset type to enable storage class 

401 # conversions. 

402 resolved_dataset_type = dataset_type 

403 else: 

404 raise DatasetTypeError( 

405 f"Dataset type definition in query expression {dataset_type} is " 

406 f"not compatible with the registered type {resolved_dataset_type}." 

407 ) 

408 result.append(resolved_dataset_type) 

409 if wildcard.patterns is ...: 

410 if explicit_only: 

411 raise TypeError( 

412 "Universal wildcard '...' is not permitted for dataset types in this context." 

413 ) 

414 for datasetType in self._fetch_dataset_types(): 

415 result.append(datasetType) 

416 elif wildcard.patterns: 

417 if explicit_only: 

418 raise DatasetTypeExpressionError( 

419 "Dataset type wildcard expressions are not supported in this context." 

420 ) 

421 dataset_types = self._fetch_dataset_types() 

422 for datasetType in dataset_types: 

423 if any(p.fullmatch(datasetType.name) for p in wildcard.patterns): 

424 result.append(datasetType) 

425 

426 return result 

427 

    def get_dataset_refs(self, ids: list[DatasetId]) -> list[DatasetRef]:
        """Resolve dataset UUIDs into `DatasetRef` objects by querying the
        static dataset table for dataset types and the per-dimension-group
        tags tables for data IDs and run names.
        """
        dataset_type_map: dict[DatasetId, DatasetType] = {}
        # Batch to keep IN(...) clauses to a manageable size.
        for batch in chunk_iterable(set(ids), 50000):
            # Look up the dataset types corresponding to the given Dataset IDs.
            id_col = self._static.dataset.columns["id"]
            sql = sqlalchemy.sql.select(
                id_col,
                self._static.dataset.columns["dataset_type_id"],
            ).where(id_col.in_(batch))
            with self._db.query(sql) as sql_result:
                dataset_rows = sql_result.mappings().all()
            for row in dataset_rows:
                dataset_type_map[row["id"]] = self._get_dataset_type_by_id(row["dataset_type_id"])

        # Group the given dataset IDs by the DimensionGroup of their dataset
        # types -- there is a separate tags table for each DimensionGroup.
        dimension_groups = defaultdict[DimensionGroup, set[DatasetId]](set)
        for id, dataset_type in dataset_type_map.items():
            dimension_groups[dataset_type.dimensions].add(id)

        output_refs: list[DatasetRef] = []
        for dimension_group, datasets in dimension_groups.items():
            # Query the tags table for each dimension group to look up the
            # data IDs corresponding to the UUIDs found from the dataset table.
            dynamic_tables = self._get_dynamic_tables(dimension_group)
            tags_table = self._get_tags_table(dynamic_tables)
            for batch in chunk_iterable(datasets, 50000):
                tags_sql = tags_table.select().where(tags_table.columns["dataset_id"].in_(batch))
                # Join in the collection table to fetch the run name.
                collection_column = tags_table.columns[self._collections.getCollectionForeignKeyName()]
                joined_collections = self._collections.join_collections_sql(collection_column, tags_sql)
                tags_sql = joined_collections.joined_sql
                run_name_column = joined_collections.name_column
                tags_sql = tags_sql.add_columns(run_name_column)
                # Tags table includes run collections and tagged
                # collections.
                # In theory the data ID for a given dataset should be the
                # same in both, but nothing actually guarantees this.
                # So skip any tagged collections, using the run collection
                # as the definitive definition.
                tags_sql = tags_sql.where(joined_collections.type_column == int(CollectionType.RUN))

                with self._db.query(tags_sql) as sql_result:
                    data_id_rows = sql_result.mappings().all()

                assert run_name_column.key is not None
                for data_id_row in data_id_rows:
                    id = data_id_row["dataset_id"]
                    dataset_type = dataset_type_map[id]
                    run_name = data_id_row[run_name_column.key]
                    # Required dimension values are stored as individual
                    # columns of the tags table.
                    data_id = DataCoordinate.from_required_values(
                        dimension_group,
                        tuple(data_id_row[dimension] for dimension in dimension_group.required),
                    )
                    ref = DatasetRef(
                        datasetType=dataset_type,
                        dataId=data_id,
                        id=id,
                        run=run_name,
                    )
                    output_refs.append(ref)

        return output_refs

491 

    def _fetch_dataset_type_record(self, name: str) -> _DatasetTypeRecord | None:
        """Retrieve a single dataset type record from the database.

        Parameters
        ----------
        name : `str`
            Name of the dataset type to look up.

        Returns
        -------
        record : `_DatasetTypeRecord` or `None`
            Information from a single database record, or `None` if no
            dataset type with this name is registered.
        """
        c = self._static.dataset_type.columns
        stmt = self._static.dataset_type.select().where(c.name == name)
        with self._db.query(stmt) as sql_result:
            row = sql_result.mappings().one_or_none()
        if row is None:
            return None
        else:
            return self._record_from_row(row)

508 

509 def _record_from_row(self, row: Mapping) -> _DatasetTypeRecord: 

510 name = row["name"] 

511 dimensions = self._dimensions.load_dimension_group(row["dimensions_key"]) 

512 calibTableName = row["calibration_association_table"] 

513 datasetType = DatasetType( 

514 name, dimensions, row["storage_class"], isCalibration=(calibTableName is not None) 

515 ) 

516 return _DatasetTypeRecord( 

517 dataset_type=datasetType, 

518 dataset_type_id=row["id"], 

519 dimensions_key=row["dimensions_key"], 

520 tag_table_name=row["tag_association_table"], 

521 calib_table_name=calibTableName, 

522 ) 

523 

    def _dataset_type_from_row(self, row: Mapping) -> DatasetType:
        # Convenience adapter used as a row-parsing callback by the summary
        # manager; discards everything but the DatasetType itself.
        return self._record_from_row(row).dataset_type

526 

    def preload_cache(self) -> None:
        """Fetch all dataset type definitions up front, filling the cache."""
        self._fetch_dataset_types()

529 

    def _fetch_dataset_types(self, force_refresh: bool = False) -> list[DatasetType]:
        """Fetch list of all defined dataset types.

        Parameters
        ----------
        force_refresh : `bool`, optional
            If `True`, re-query the database even when the cache is marked
            full (used when an unknown dataset type ID is encountered).
        """
        # This is one of two places we populate the dataset type cache:
        #
        # - This method handles almost all requests for dataset types that
        #   should already exist.  It always marks the cache as "full" in both
        #   dataset type names and dimensions.
        #
        # - register_dataset_type handles the case where the dataset type might
        #   not exist yet.  Since it can only add a single dataset type, it
        #   never changes whether the cache is full.
        #
        # In both cases, we require that the per-dimensions data be cached
        # whenever a dataset type is added to the cache by name, to reduce the
        # number of possible states the cache can be in and minimize the number
        # of queries.
        if self._cache.full and not force_refresh:
            return [dataset_type for dataset_type, _ in self._cache.items()]
        with self._db.query(self._static.dataset_type.select()) as sql_result:
            sql_rows = sql_result.mappings().fetchall()
        records = [self._record_from_row(row) for row in sql_rows]
        # Cache everything and specify that cache is complete.
        cache_data: list[tuple[DatasetType, int]] = []
        cache_dimensions_data: dict[DimensionGroup, DynamicTables] = {}
        for record in records:
            cache_data.append((record.dataset_type, record.dataset_type_id))
            # Dataset types with the same dimensions share one DynamicTables
            # entry; merge rather than overwrite.
            if (dynamic_tables := cache_dimensions_data.get(record.dataset_type.dimensions)) is None:
                tables = record.make_dynamic_tables()
            else:
                tables = record.update_dynamic_tables(dynamic_tables)
            cache_dimensions_data[record.dataset_type.dimensions] = tables
        self._cache.set(
            cache_data, full=True, dimensions_data=cache_dimensions_data.items(), dimensions_full=True
        )
        return [record.dataset_type for record in records]

565 

566 def _get_dataset_type_by_id(self, id: int) -> DatasetType: 

567 dt = self._cache.get_by_id(id) 

568 if dt is None: 

569 # Since the ID is not a concept exposed to the public API, it 

570 # had to have come from a dataset table, and our cache must be 

571 # empty or out of date. 

572 self._fetch_dataset_types(force_refresh=True) 

573 dt = self._cache.get_by_id(id) 

574 if dt is None: 

575 raise RuntimeError(f"Failed to look up dataset type with ID {id}") 

576 return dt 

577 

578 def _find_storage(self, name: str) -> _DatasetRecordStorage: 

579 """Find a dataset type and the extra information needed to work with 

580 it, utilizing and populating the cache as needed. 

581 """ 

582 dataset_type, dataset_type_id = self._cache.get(name) 

583 if dataset_type is not None: 

584 tables = self._get_dynamic_tables(dataset_type.dimensions) 

585 assert dataset_type_id is not None, "Dataset type cache population is incomplete." 

586 return _DatasetRecordStorage( 

587 dataset_type=dataset_type, dataset_type_id=dataset_type_id, dynamic_tables=tables 

588 ) 

589 else: 

590 # On the first cache miss populate the cache with complete list 

591 # of dataset types (if it was not done yet). 

592 if not self._cache.full: 

593 self._fetch_dataset_types() 

594 # Try again 

595 dataset_type, dataset_type_id = self._cache.get(name) 

596 if dataset_type is not None: 

597 tables = self._get_dynamic_tables(dataset_type.dimensions) 

598 assert dataset_type_id is not None, "Dataset type cache population is incomplete." 

599 return _DatasetRecordStorage( 

600 dataset_type=dataset_type, dataset_type_id=dataset_type_id, dynamic_tables=tables 

601 ) 

602 record = self._fetch_dataset_type_record(name) 

603 if record is not None: 

604 self._cache.add(record.dataset_type, record.dataset_type_id) 

605 tables = record.make_dynamic_tables() 

606 self._cache.add_by_dimensions(record.dataset_type.dimensions, tables) 

607 return _DatasetRecordStorage(record.dataset_type, record.dataset_type_id, tables) 

608 raise MissingDatasetTypeError(f"Dataset type {name!r} does not exist.") 

609 

610 def _get_dynamic_tables(self, dimensions: DimensionGroup) -> DynamicTables: 

611 tables = self._cache.get_by_dimensions(dimensions) 

612 assert tables is not None, ( 

613 "_fetch_dataset_types is supposed to guarantee that the tables cache is populated." 

614 ) 

615 return tables 

616 

    def getCollectionSummary(self, collection: CollectionRecord) -> CollectionSummary:
        # Docstring inherited from DatasetRecordStorageManager.
        # Single-collection convenience wrapper around the bulk fetch.
        summaries = self._summaries.fetch_summaries([collection], None, self._dataset_type_from_row)
        return summaries[collection.key]

621 

622 def fetch_summaries( 

623 self, 

624 collections: Iterable[CollectionRecord], 

625 dataset_types: Iterable[DatasetType] | Iterable[str] | None = None, 

626 ) -> Mapping[Any, CollectionSummary]: 

627 # Docstring inherited from DatasetRecordStorageManager. 

628 dataset_type_names: Iterable[str] | None = None 

629 if dataset_types is not None: 

630 dataset_type_names = set(get_dataset_type_name(dt) for dt in dataset_types) 

631 return self._summaries.fetch_summaries(collections, dataset_type_names, self._dataset_type_from_row) 

632 

633 def fetch_run_dataset_ids(self, run: RunRecord) -> list[DatasetId]: 

634 # Docstring inherited. 

635 sql = sqlalchemy.select(self._static.dataset.c.id).where( 

636 self._static.dataset.c[self._run_key_column] == run.key 

637 ) 

638 with self._db.query(sql) as result: 

639 return list(result.scalars()) 

640 

641 def ingest_date_dtype(self) -> type: 

642 """Return type of the ``ingest_date`` column.""" 

643 schema_version = self.newSchemaVersion() 

644 if schema_version is not None and schema_version.major > 1: 

645 return ddl.AstropyTimeNsecTai 

646 else: 

647 return sqlalchemy.TIMESTAMP 

648 

649 def insert( 

650 self, 

651 dataset_type_name: str, 

652 run: RunRecord, 

653 data_ids: Iterable[DataCoordinate], 

654 id_generation_mode: DatasetIdGenEnum = DatasetIdGenEnum.UNIQUE, 

655 ) -> list[DatasetRef]: 

656 # Docstring inherited from DatasetRecordStorageManager. 

657 if (storage := self._find_storage(dataset_type_name)) is None: 

658 raise MissingDatasetTypeError(f"Dataset type {dataset_type_name!r} has not been registered.") 

659 # Current timestamp, type depends on schema version. Use microsecond 

660 # precision for astropy time to keep things consistent with 

661 # TIMESTAMP(6) SQL type. 

662 timestamp: datetime.datetime | astropy.time.Time 

663 if self._use_astropy_ingest_date: 

664 # Astropy `now()` precision should be the same as `now()` which 

665 # should mean microsecond. 

666 timestamp = astropy.time.Time.now() 

667 else: 

668 timestamp = datetime.datetime.now(datetime.UTC) 

669 

670 # Iterate over data IDs, transforming a possibly-single-pass iterable 

671 # into a list. 

672 data_id_list: list[DataCoordinate] = [] 

673 rows = [] 

674 summary = CollectionSummary() 

675 for dataId in summary.add_data_ids_generator(storage.dataset_type, data_ids): 

676 data_id_list.append(dataId) 

677 rows.append( 

678 { 

679 "id": self._id_maker.makeDatasetId( 

680 run.name, storage.dataset_type, dataId, id_generation_mode 

681 ), 

682 "dataset_type_id": storage.dataset_type_id, 

683 self._run_key_column: run.key, 

684 "ingest_date": timestamp, 

685 } 

686 ) 

687 if not rows: 

688 # Just in case an empty collection is provided we want to avoid 

689 # adding dataset type to summary tables. 

690 return [] 

691 

692 with self._db.transaction(): 

693 # Insert into the static dataset table. 

694 self._db.insert(self._static.dataset, *rows) 

695 # Update the summary tables for this collection in case this is the 

696 # first time this dataset type or these governor values will be 

697 # inserted there. 

698 self._summaries.update(run, [storage.dataset_type_id], summary) 

699 # Combine the generated dataset_id values and data ID fields to 

700 # form rows to be inserted into the tags table. 

701 protoTagsRow = { 

702 "dataset_type_id": storage.dataset_type_id, 

703 self._collections.getCollectionForeignKeyName(): run.key, 

704 } 

705 tagsRows = [ 

706 dict(protoTagsRow, dataset_id=row["id"], **dataId.required) 

707 for dataId, row in zip(data_id_list, rows, strict=True) 

708 ] 

709 # Insert those rows into the tags table. 

710 self._db.insert(self._get_tags_table(storage.dynamic_tables), *tagsRows) 

711 

712 return [ 

713 DatasetRef( 

714 datasetType=storage.dataset_type, 

715 dataId=dataId, 

716 id=row["id"], 

717 run=run.name, 

718 ) 

719 for dataId, row in zip(data_id_list, rows, strict=True) 

720 ] 

721 

722 def import_(self, run: RunRecord, refs: list[DatasetRef], assume_new: bool = False) -> None: 

723 # Docstring inherited from DatasetRecordStorageManager. 

724 if not refs: 

725 # Just in case an empty mapping is provided we want to avoid 

726 # adding dataset type to summary tables. 

727 return 

728 assert all(ref.run == run.name for ref in refs), ( 

729 "Run names in refs must match the run we are inserting into" 

730 ) 

731 

732 dataset_types = {ref.datasetType for ref in refs} 

733 dimensions = _ensure_dimension_groups_match(dataset_types) 

734 dataset_type_storage: dict[str, _DatasetRecordStorage] = {} 

735 for dt in dataset_types: 

736 if (storage := self._find_storage(dt.name)) is None: 

737 raise MissingDatasetTypeError(f"Dataset type {dt.name!r} has not been registered.") 

738 dataset_type_storage[dt.name] = storage 

739 

740 dynamic_tables = self._get_dynamic_tables(dimensions) 

741 tags_table = self._get_tags_table(dynamic_tables) 

742 # Current timestamp, type depends on schema version. 

743 if self._use_astropy_ingest_date: 

744 # Astropy `now()` precision should be the same as `now()` which 

745 # should mean microsecond. 

746 timestamp = sqlalchemy.sql.literal(astropy.time.Time.now(), type_=ddl.AstropyTimeNsecTai) 

747 else: 

748 timestamp = sqlalchemy.sql.literal(datetime.datetime.now(datetime.UTC)) 

749 collection_fkey_name = self._collections.getCollectionForeignKeyName() 

750 tags_rows = [ 

751 { 

752 "dataset_type_id": dataset_type_storage[ref.datasetType.name].dataset_type_id, 

753 collection_fkey_name: run.key, 

754 "dataset_id": ref.id, 

755 **ref.dataId.required, 

756 } 

757 for ref in refs 

758 ] 

759 if assume_new: 

760 self._import_new(run, refs, dataset_type_storage, tags_table, tags_rows, timestamp) 

761 else: 

762 self._import_guarded( 

763 run, refs, dimensions, dataset_type_storage, tags_table, tags_rows, timestamp 

764 ) 

765 

    def _import_guarded(
        self,
        run: RunRecord,
        refs: list[DatasetRef],
        dimensions: DimensionGroup,
        dataset_type_storage: dict[str, _DatasetRecordStorage],
        tags_table: sqlalchemy.Table,
        tags_rows: list[dict[str, object]],
        timestamp: sqlalchemy.BindParameter[astropy.time.Time | datetime.datetime],
    ) -> None:
        """Import refs with full conflict checking against existing datasets.

        All incoming rows are first staged in a temporary table, validated
        against the existing dataset/tags tables, and only genuinely new rows
        are then merged into the real tables, all inside one transaction.
        """
        # We'll insert all new rows into a temporary table
        table_spec = makeTagTableSpec(dimensions, type(self._collections), constraints=False)
        collection_fkey_name = self._collections.getCollectionForeignKeyName()
        with self._db.transaction(for_temp_tables=True), self._db.temporary_table(table_spec) as tmp_tags:
            # store all incoming data in a temporary table
            self._db.insert(tmp_tags, *tags_rows)
            # There are some checks that we want to make for consistency
            # of the new datasets with existing ones.
            self._validate_import(dimensions, tags_table, tmp_tags, run)
            # Before we merge temporary table into dataset/tags we need to
            # drop datasets which are already there (and do not conflict).
            self._db.deleteWhere(
                tmp_tags,
                tmp_tags.columns.dataset_id.in_(sqlalchemy.sql.select(self._static.dataset.columns.id)),
            )
            # Copy it into dataset table, need to re-label some columns.
            self._db.insert(
                self._static.dataset,
                select=sqlalchemy.sql.select(
                    tmp_tags.columns.dataset_id.label("id"),
                    tmp_tags.columns.dataset_type_id,
                    tmp_tags.columns[collection_fkey_name].label(self._run_key_column),
                    timestamp.label("ingest_date"),
                ),
            )
            self._update_summaries(run, refs, dataset_type_storage)
            # Copy from temp table into tags table.
            self._db.insert(tags_table, select=tmp_tags.select())

804 

805 def _update_summaries( 

806 self, run: RunRecord, refs: list[DatasetRef], dataset_type_storage: dict[str, _DatasetRecordStorage] 

807 ) -> None: 

808 summary = CollectionSummary() 

809 summary.add_datasets(refs) 

810 self._summaries.update( 

811 run, [storage.dataset_type_id for storage in dataset_type_storage.values()], summary 

812 ) 

813 

814 def _validate_import( 

815 self, 

816 dimensions: DimensionGroup, 

817 tags: sqlalchemy.schema.Table, 

818 tmp_tags: sqlalchemy.schema.Table, 

819 run: RunRecord, 

820 ) -> None: 

821 """Validate imported refs against existing datasets. 

822 

823 Parameters 

824 ---------- 

825 dimensions : `DimensionGroup` 

826 Dimensions to validate. 

827 tags : `sqlalchemy.schema.Table` 

828 ??? 

829 tmp_tags : `sqlalchemy.schema.Table` 

830 Temporary table with new datasets and the same schema as tags 

831 table. 

832 run : `RunRecord` 

833 The record object describing the `~CollectionType.RUN` collection. 

834 

835 Raises 

836 ------ 

837 ConflictingDefinitionError 

838 Raise if new datasets conflict with existing ones. 

839 """ 

840 dataset = self._static.dataset 

841 collection_fkey_name = self._collections.getCollectionForeignKeyName() 

842 

843 # Check that existing datasets have the same dataset type and 

844 # run. 

845 query = ( 

846 sqlalchemy.sql.select( 

847 dataset.columns.id.label("dataset_id"), 

848 dataset.columns.dataset_type_id.label("dataset_type_id"), 

849 tmp_tags.columns.dataset_type_id.label("new_dataset_type_id"), 

850 dataset.columns[self._run_key_column].label("run"), 

851 tmp_tags.columns[collection_fkey_name].label("new_run"), 

852 ) 

853 .select_from(dataset.join(tmp_tags, dataset.columns.id == tmp_tags.columns.dataset_id)) 

854 .where( 

855 sqlalchemy.sql.or_( 

856 dataset.columns.dataset_type_id != tmp_tags.columns.dataset_type_id, 

857 dataset.columns[self._run_key_column] != tmp_tags.columns[collection_fkey_name], 

858 ) 

859 ) 

860 .limit(1) 

861 ) 

862 with self._db.query(query) as result: 

863 # Only include the first one in the exception message 

864 if (row := result.first()) is not None: 

865 existing_run = self._collections[row.run].name 

866 new_run = self._collections[row.new_run].name 

867 if row.dataset_type_id == row.new_dataset_type_id: 

868 raise ConflictingDefinitionError( 

869 f"Current run {existing_run!r} and new run {new_run!r} do not agree for " 

870 f"dataset {row.dataset_id}." 

871 ) 

872 else: 

873 raise ConflictingDefinitionError( 

874 f"Dataset {row.dataset_id} was provided with type ID {row.new_dataset_type_id} " 

875 f"in run {new_run!r}, but was already defined with type ID " 

876 f"{row.dataset_type_id} in run {run!r}." 

877 ) 

878 

879 # Check that matching dataset in tags table has the same DataId. 

880 query = ( 

881 sqlalchemy.sql.select( 

882 tags.columns.dataset_id, 

883 tags.columns.dataset_type_id.label("type_id"), 

884 tmp_tags.columns.dataset_type_id.label("new_type_id"), 

885 *[tags.columns[dim] for dim in dimensions.required], 

886 *[tmp_tags.columns[dim].label(f"new_{dim}") for dim in dimensions.required], 

887 ) 

888 .select_from(tags.join(tmp_tags, tags.columns.dataset_id == tmp_tags.columns.dataset_id)) 

889 .where( 

890 sqlalchemy.sql.or_( 

891 tags.columns.dataset_type_id != tmp_tags.columns.dataset_type_id, 

892 *[tags.columns[dim] != tmp_tags.columns[dim] for dim in dimensions.required], 

893 ) 

894 ) 

895 .limit(1) 

896 ) 

897 

898 with self._db.query(query) as result: 

899 if (row := result.first()) is not None: 

900 # Only include the first one in the exception message 

901 raise ConflictingDefinitionError( 

902 f"Existing dataset type or dataId do not match new dataset: {row._asdict()}" 

903 ) 

904 

905 # Check that matching run+dataId have the same dataset ID. 

906 query = ( 

907 sqlalchemy.sql.select( 

908 *[tags.columns[dim] for dim in dimensions.required], 

909 tags.columns.dataset_id, 

910 tmp_tags.columns.dataset_id.label("new_dataset_id"), 

911 tmp_tags.columns.dataset_type_id.label("new_dataset_type_id"), 

912 tags.columns[collection_fkey_name], 

913 tmp_tags.columns[collection_fkey_name].label(f"new_{collection_fkey_name}"), 

914 ) 

915 .select_from( 

916 tags.join( 

917 tmp_tags, 

918 sqlalchemy.sql.and_( 

919 tags.columns.dataset_type_id == tmp_tags.columns.dataset_type_id, 

920 tags.columns[collection_fkey_name] == tmp_tags.columns[collection_fkey_name], 

921 *[tags.columns[dim] == tmp_tags.columns[dim] for dim in dimensions.required], 

922 ), 

923 ) 

924 ) 

925 .where(tags.columns.dataset_id != tmp_tags.columns.dataset_id) 

926 .limit(1) 

927 ) 

928 with self._db.query(query) as result: 

929 # only include the first one in the exception message 

930 if (row := result.first()) is not None: 

931 data_id = {dim: getattr(row, dim) for dim in dimensions.required} 

932 existing_collection = self._collections[getattr(row, collection_fkey_name)].name 

933 new_collection = self._collections[getattr(row, f"new_{collection_fkey_name}")].name 

934 raise ConflictingDefinitionError( 

935 f"Dataset with type ID {row.new_dataset_type_id} and data ID {data_id} " 

936 f"has ID {row.dataset_id} in existing collection {existing_collection!r} " 

937 f"but ID {row.new_dataset_id} in new collection {new_collection!r}." 

938 ) 

939 

940 def _import_new( 

941 self, 

942 run: RunRecord, 

943 refs: list[DatasetRef], 

944 dataset_type_storage: dict[str, _DatasetRecordStorage], 

945 tags_table: sqlalchemy.Table, 

946 tags_rows: list[dict[str, object]], 

947 timestamp: sqlalchemy.BindParameter[astropy.time.Time | datetime.datetime], 

948 ) -> None: 

949 static_rows = [ 

950 { 

951 "id": ref.id, 

952 "dataset_type_id": dataset_type_storage[ref.datasetType.name].dataset_type_id, 

953 self._run_key_column: run.key, 

954 "ingest_date": timestamp.value, 

955 } 

956 for ref in refs 

957 ] 

958 with self._db.transaction(): 

959 self._db.insert(self._static.dataset, *static_rows) 

960 self._update_summaries(run, refs, dataset_type_storage) 

961 self._db.insert(tags_table, *tags_rows) 

962 

963 def delete(self, datasets: Iterable[DatasetId | DatasetRef]) -> None: 

964 # Docstring inherited from DatasetRecordStorageManager. 

965 # Only delete from common dataset table; ON DELETE foreign key clauses 

966 # will handle the rest. 

967 self._db.delete( 

968 self._static.dataset, 

969 ["id"], 

970 *[{"id": getattr(dataset, "id", dataset)} for dataset in datasets], 

971 ) 

972 

973 def associate( 

974 self, dataset_type: DatasetType, collection: CollectionRecord, datasets: Iterable[DatasetRef] 

975 ) -> None: 

976 # Docstring inherited from DatasetRecordStorageManager. 

977 if (storage := self._find_storage(dataset_type.name)) is None: 

978 raise MissingDatasetTypeError(f"Dataset type {dataset_type.name!r} has not been registered.") 

979 if collection.type is not CollectionType.TAGGED: 

980 raise CollectionTypeError( 

981 f"Cannot associate into collection '{collection.name}' " 

982 f"of type {collection.type.name}; must be TAGGED." 

983 ) 

984 proto_row = { 

985 self._collections.getCollectionForeignKeyName(): collection.key, 

986 "dataset_type_id": storage.dataset_type_id, 

987 } 

988 rows = [] 

989 summary = CollectionSummary() 

990 for dataset in summary.add_datasets_generator(datasets): 

991 rows.append(dict(proto_row, dataset_id=dataset.id, **dataset.dataId.required)) 

992 if rows: 

993 # Update the summary tables for this collection in case this is the 

994 # first time this dataset type or these governor values will be 

995 # inserted there. 

996 self._summaries.update(collection, [storage.dataset_type_id], summary) 

997 # Update the tag table itself. 

998 self._db.replace(self._get_tags_table(storage.dynamic_tables), *rows) 

999 

1000 def disassociate( 

1001 self, dataset_type: DatasetType, collection: CollectionRecord, datasets: Iterable[DatasetRef] 

1002 ) -> None: 

1003 # Docstring inherited from DatasetRecordStorageManager. 

1004 if (storage := self._find_storage(dataset_type.name)) is None: 

1005 raise MissingDatasetTypeError(f"Dataset type {dataset_type.name!r} has not been registered.") 

1006 if collection.type is not CollectionType.TAGGED: 

1007 raise CollectionTypeError( 

1008 f"Cannot disassociate from collection '{collection.name}' " 

1009 f"of type {collection.type.name}; must be TAGGED." 

1010 ) 

1011 rows = [ 

1012 { 

1013 "dataset_id": dataset.id, 

1014 self._collections.getCollectionForeignKeyName(): collection.key, 

1015 } 

1016 for dataset in datasets 

1017 ] 

1018 self._db.delete( 

1019 self._get_tags_table(storage.dynamic_tables), 

1020 ["dataset_id", self._collections.getCollectionForeignKeyName()], 

1021 *rows, 

1022 ) 

1023 

    def certify(
        self,
        dataset_type: DatasetType,
        collection: CollectionRecord,
        datasets: Iterable[DatasetRef],
        timespan: Timespan,
        query_func: QueryFactoryFunction,
    ) -> None:
        # Docstring inherited from DatasetRecordStorageManager.
        if (storage := self._find_storage(dataset_type.name)) is None:
            raise MissingDatasetTypeError(f"Dataset type {dataset_type.name!r} has not been registered.")
        if not dataset_type.isCalibration():
            raise DatasetTypeError(
                f"Cannot certify datasets of type {dataset_type.name!r}, for which "
                "DatasetType.isCalibration() is False."
            )
        if collection.type is not CollectionType.CALIBRATION:
            raise CollectionTypeError(
                f"Cannot certify into collection '{collection.name}' "
                f"of type {collection.type.name}; must be CALIBRATION."
            )
        TimespanReprClass = self._db.getTimespanRepresentation()
        # Columns common to every calibs row.
        proto_row = {
            self._collections.getCollectionForeignKeyName(): collection.key,
            "dataset_type_id": storage.dataset_type_id,
        }
        rows = []
        # Only collect data IDs when we must emulate the exclusion constraint
        # ourselves (below); otherwise the database enforces it for us.
        data_ids: set[DataCoordinate] | None = (
            set() if not TimespanReprClass.hasExclusionConstraint() else None
        )
        summary = CollectionSummary()
        # The generator yields each dataset while folding it into ``summary``.
        for dataset in summary.add_datasets_generator(datasets):
            row = dict(proto_row, dataset_id=dataset.id, **dataset.dataId.required)
            TimespanReprClass.update(timespan, result=row)
            rows.append(row)
            if data_ids is not None:
                data_ids.add(dataset.dataId)
        if not rows:
            # Just in case an empty dataset collection is provided we want to
            # avoid adding dataset type to summary tables.
            return
        # Update the summary tables for this collection in case this is the
        # first time this dataset type or these governor values will be
        # inserted there.
        self._summaries.update(collection, [storage.dataset_type_id], summary)
        # Update the association table itself.
        calibs_table = self._get_calibs_table(storage.dynamic_tables)
        if TimespanReprClass.hasExclusionConstraint():
            # Rely on database constraint to enforce invariants; we just
            # reraise the exception for consistency across DB engines.
            try:
                self._db.insert(calibs_table, *rows)
            except sqlalchemy.exc.IntegrityError as err:
                raise ConflictingDefinitionError(
                    f"Validity range conflict certifying datasets of type {dataset_type.name!r} "
                    f"into {collection.name!r} for range {timespan}."
                ) from err
        else:
            # Have to implement exclusion constraint ourselves.
            # Acquire a table lock to ensure there are no concurrent writes
            # could invalidate our checking before we finish the inserts. We
            # use a SAVEPOINT in case there is an outer transaction that a
            # failure here should not roll back.
            with self._db.transaction(
                lock=[calibs_table],
                savepoint=True,
                # join_data_coordinates sometimes requires a temp table
                for_temp_tables=True,
            ):
                # Query for any rows that would overlap this one.
                with query_func() as query:
                    if data_ids is not None:
                        query = query.join_data_coordinates(data_ids)
                    timespan_column = query.expression_factory[dataset_type.name].timespan
                    result = query.datasets(dataset_type, collection.name, find_first=False).where(
                        timespan_column.overlaps(timespan)
                    )
                    conflicting = result.count()
                if conflicting > 0:
                    raise ConflictingDefinitionError(
                        f"{conflicting} validity range conflicts certifying datasets of type "
                        f"{dataset_type.name} into {collection.name} for range "
                        f"[{timespan.begin}, {timespan.end})."
                    )
                # Proceed with the insert.
                self._db.insert(calibs_table, *rows)
1111 def decertify( 

1112 self, 

1113 dataset_type: DatasetType, 

1114 collection: CollectionRecord, 

1115 timespan: Timespan, 

1116 *, 

1117 data_ids: Iterable[DataCoordinate] | None = None, 

1118 query_func: QueryFactoryFunction, 

1119 ) -> None: 

1120 # Docstring inherited from DatasetRecordStorageManager. 

1121 if (storage := self._find_storage(dataset_type.name)) is None: 

1122 raise MissingDatasetTypeError(f"Dataset type {dataset_type.name!r} has not been registered.") 

1123 if not dataset_type.isCalibration(): 

1124 raise DatasetTypeError( 

1125 f"Cannot certify datasets of type {dataset_type.name!r}, for which " 

1126 "DatasetType.isCalibration() is False." 

1127 ) 

1128 if collection.type is not CollectionType.CALIBRATION: 

1129 raise CollectionTypeError( 

1130 f"Cannot decertify from collection '{collection.name}' " 

1131 f"of type {collection.type.name}; must be CALIBRATION." 

1132 ) 

1133 TimespanReprClass = self._db.getTimespanRepresentation() 

1134 data_id_set: set[DataCoordinate] | None 

1135 if data_ids is not None: 

1136 data_id_set = set(data_ids) 

1137 else: 

1138 data_id_set = None 

1139 

1140 # Set up collections to populate with the rows we'll want to modify. 

1141 # The insert rows will have the same values for collection and 

1142 # dataset type. 

1143 proto_insert_row = { 

1144 self._collections.getCollectionForeignKeyName(): collection.key, 

1145 "dataset_type_id": storage.dataset_type_id, 

1146 } 

1147 rows_to_delete = [] 

1148 rows_to_insert = [] 

1149 # Acquire a table lock to ensure there are no concurrent writes 

1150 # between the SELECT and the DELETE and INSERT queries based on it. 

1151 calibs_table = self._get_calibs_table(storage.dynamic_tables) 

1152 with self._db.transaction(lock=[calibs_table], savepoint=True): 

1153 # Find rows overlapping our inputs. 

1154 with query_func() as query: 

1155 query = query.join_dataset_search(dataset_type, [collection.name]) 

1156 if data_id_set is not None: 

1157 query = query.join_data_coordinates(data_id_set) 

1158 timespan_column = query.expression_factory[dataset_type.name].timespan 

1159 query = query.where(timespan_column.overlaps(timespan)) 

1160 result = query.general( 

1161 dataset_type.dimensions, 

1162 dataset_fields={dataset_type.name: {"dataset_id", "timespan"}}, 

1163 find_first=False, 

1164 )._with_added_dataset_field(dataset_type.name, "calib_pkey") 

1165 

1166 calib_pkey_key = f"{dataset_type.name}.calib_pkey" 

1167 dataset_id_key = f"{dataset_type.name}.dataset_id" 

1168 timespan_key = f"{dataset_type.name}.timespan" 

1169 for row in result.iter_tuples(): 

1170 rows_to_delete.append({"id": row.raw_row[calib_pkey_key]}) 

1171 # Construct the insert row(s) by copying the prototype row, 

1172 # then adding the dimension column values, then adding 

1173 # what's left of the timespan from that row after we 

1174 # subtract the given timespan. 

1175 new_insert_row = proto_insert_row.copy() 

1176 new_insert_row["dataset_id"] = row.raw_row[dataset_id_key] 

1177 for name, value in row.data_id.required.items(): 

1178 new_insert_row[name] = value 

1179 row_timespan = row.raw_row[timespan_key] 

1180 assert row_timespan is not None, "Field should have a NOT NULL constraint." 

1181 for diff_timespan in row_timespan.difference(timespan): 

1182 rows_to_insert.append( 

1183 TimespanReprClass.update(diff_timespan, result=new_insert_row.copy()) 

1184 ) 

1185 # Run the DELETE and INSERT queries. 

1186 self._db.delete(calibs_table, ["id"], *rows_to_delete) 

1187 self._db.insert(calibs_table, *rows_to_insert) 

1188 

    def make_joins_builder(
        self,
        dataset_type: DatasetType,
        collections: Sequence[CollectionRecord],
        fields: Set[qt.AnyDatasetFieldName],
        is_union: bool = False,
    ) -> SqlJoinsBuilder:
        """Build a SQL joins builder for a dataset search over the given
        collections, exposing the requested dataset fields.

        Parameters
        ----------
        dataset_type : `DatasetType`
            Dataset type to search for.
        collections : `~collections.abc.Sequence` [ `CollectionRecord` ]
            Collections to search; CHAINED collections must already be
            flattened out by the caller.
        fields : `~collections.abc.Set`
            Names of dataset fields (e.g. ``run``, ``timespan``,
            ``ingest_date``) the resulting subquery must provide.
        is_union : `bool`, optional
            If `True`, this subquery is one branch of a dataset-type union
            query; table aliases and the fields key are adjusted accordingly.
        """
        if (storage := self._find_storage(dataset_type.name)) is None:
            raise MissingDatasetTypeError(f"Dataset type {dataset_type.name!r} has not been registered.")
        # This method largely mimics `make_relation`, but it uses the new query
        # system primitives instead of the old one. In terms of the SQL
        # queries it builds, there are two more main differences:
        #
        # - Collection and run columns are now string names rather than IDs.
        #   This insulates the query result-processing code from collection
        #   caching and the collection manager subclass details.
        #
        # - The subquery always has unique rows, which is achieved by using
        #   SELECT DISTINCT when necessary.
        #
        collection_types = {collection.type for collection in collections}
        assert CollectionType.CHAINED not in collection_types, "CHAINED collections must be flattened."
        #
        # There are two kinds of table in play here:
        #
        #  - the static dataset table (with the dataset ID, dataset type ID,
        #    run ID/name, and ingest date);
        #
        #  - the dynamic tags/calibs table (with the dataset ID, dataset type
        #    type ID, collection ID/name, data ID, and possibly validity
        #    range).
        #
        # That means that we might want to return a query against either table
        # or a JOIN of both, depending on which quantities the caller wants.
        # But the data ID is always included, which means we'll always include
        # the tags/calibs table and join in the static dataset table only if we
        # need things from it that we can't get from the tags/calibs table.
        #
        # Note that it's important that we include a WHERE constraint on both
        # tables for any column (e.g. dataset_type_id) that is in both when
        # it's given explicitly; not doing can prevent the query planner from
        # using very important indexes. At present, we don't include those
        # redundant columns in the JOIN ON expression, however, because the
        # FOREIGN KEY (and its index) are defined only on dataset_id.
        columns = qt.ColumnSet(dataset_type.dimensions)
        columns.drop_implied_dimension_keys()
        # In a union query all branches share one fields key (ANY_DATASET).
        fields_key: str | qt.AnyDatasetType = qt.ANY_DATASET if is_union else dataset_type.name
        columns.dataset_fields[fields_key].update(fields)
        tags_builder: SqlSelectBuilder | None = None
        if collection_types != {CollectionType.CALIBRATION}:
            # We'll need a subquery for the tags table if any of the given
            # collections are not a CALIBRATION collection. This intentionally
            # also fires when the list of collections is empty as a way to
            # create a dummy subquery that we know will fail.
            # We give the table an alias because it might appear multiple times
            # in the same query, for different dataset types.
            tags_table = self._get_tags_table(storage.dynamic_tables).alias(
                f"{dataset_type.name}_tags{'_union' if is_union else ''}"
            )
            tags_builder = self._finish_query_builder(
                storage,
                SqlJoinsBuilder(db=self._db, from_clause=tags_table).to_select_builder(columns),
                [record for record in collections if record.type is not CollectionType.CALIBRATION],
                fields,
                fields_key,
            )
            if "timespan" in fields:
                # Tags rows have no validity range; use an unbounded literal
                # so the tags and calibs branches have matching columns.
                tags_builder.joins.timespans[fields_key] = self._db.getTimespanRepresentation().fromLiteral(
                    Timespan(None, None)
                )
        assert "calib_pkey" not in fields, (
            "Calibration primary key for internal use only on calibration collections."
        )
        calibs_builder: SqlSelectBuilder | None = None
        if CollectionType.CALIBRATION in collection_types:
            # If at least one collection is a CALIBRATION collection, we'll
            # need a subquery for the calibs table, and could include the
            # timespan as a result or constraint.
            calibs_table = self._get_calibs_table(storage.dynamic_tables).alias(
                f"{dataset_type.name}_calibs{'_union' if is_union else ''}"
            )
            calibs_builder = self._finish_query_builder(
                storage,
                SqlJoinsBuilder(db=self._db, from_clause=calibs_table).to_select_builder(columns),
                [record for record in collections if record.type is CollectionType.CALIBRATION],
                fields,
                fields_key,
            )
            if "timespan" in fields:
                calibs_builder.joins.timespans[fields_key] = (
                    self._db.getTimespanRepresentation().from_columns(calibs_table.columns)
                )
            if "calib_pkey" in fields:
                calibs_builder.joins.fields[fields_key]["calib_pkey"] = calibs_table.columns["id"]

            # In calibration collections, we need timespan as well as data ID
            # to ensure unique rows.
            calibs_builder.distinct = calibs_builder.distinct and "timespan" not in fields
        # Combine the tags and calibs branches as needed.
        if tags_builder is not None:
            if calibs_builder is not None:
                # Need a UNION subquery.
                return tags_builder.union_subquery([calibs_builder])
            else:
                return tags_builder.into_joins_builder(postprocessing=None)
        elif calibs_builder is not None:
            return calibs_builder.into_joins_builder(postprocessing=None)
        else:
            raise AssertionError("Branch should be unreachable.")
    def _finish_query_builder(
        self,
        storage: _DatasetRecordStorage,
        sql_projection: SqlSelectBuilder,
        collections: Sequence[CollectionRecord],
        fields: Set[qt.AnyDatasetFieldName],
        fields_key: str | qt.AnyDatasetType,
    ) -> SqlSelectBuilder:
        """Constrain a tags/calibs subquery to the given collections and wire
        up the requested dataset-field columns, returning the same builder.
        """
        # This method plays the same role as _finish_single_relation in the new
        # query system. It is called exactly one or two times by
        # make_sql_builder, just as _finish_single_relation is called exactly
        # one or two times by make_relation. See make_sql_builder comments for
        # what's different.
        assert sql_projection.joins.from_clause is not None
        run_collections_only = all(record.type is CollectionType.RUN for record in collections)
        sql_projection.joins.where(
            sql_projection.joins.from_clause.c.dataset_type_id == storage.dataset_type_id
        )
        dataset_id_col = sql_projection.joins.from_clause.c.dataset_id
        collection_col = sql_projection.joins.from_clause.c[self._collections.getCollectionForeignKeyName()]
        fields_provided = sql_projection.joins.fields[fields_key]
        # We always constrain and optionally retrieve the collection(s) via the
        # tags/calibs table.
        if "collection_key" in fields:
            sql_projection.joins.fields[fields_key]["collection_key"] = collection_col
        if len(collections) == 1:
            only_collection_record = collections[0]
            sql_projection.joins.where(collection_col == only_collection_record.key)
            if "collection" in fields:
                fields_provided["collection"] = sqlalchemy.literal(only_collection_record.name).cast(
                    # This cast is necessary to ensure that Postgres knows the
                    # type of this column if it is used in an aggregate
                    # function.
                    sqlalchemy.String
                )

        elif not collections:
            # No collections to search: force an empty result set rather than
            # scanning the whole table.
            sql_projection.joins.where(sqlalchemy.literal(False))
            if "collection" in fields:
                fields_provided["collection"] = sqlalchemy.literal("NO COLLECTIONS")
        else:
            sql_projection.joins.where(collection_col.in_([collection.key for collection in collections]))
            if "collection" in fields:
                # Avoid a join to the collection table to get the name by using
                # a CASE statement. The SQL will be a bit more verbose but
                # more efficient.
                fields_provided["collection"] = _create_case_expression_for_collections(
                    collections, collection_col
                )
        # Add more column definitions, starting with the data ID.
        sql_projection.joins.extract_dimensions(storage.dataset_type.dimensions.required)
        # We can always get the dataset_id from the tags/calibs table, even if
        # could also get it from the 'static' dataset table.
        if "dataset_id" in fields:
            fields_provided["dataset_id"] = dataset_id_col

        # It's possible we now have everything we need, from just the
        # tags/calibs table. The things we might need to get from the static
        # dataset table are the run key and the ingest date.
        need_static_table = False
        need_collection_table = False
        # Ingest date can only come from the static table.
        if "ingest_date" in fields:
            fields_provided["ingest_date"] = self._static.dataset.c.ingest_date
            need_static_table = True
        if "run" in fields:
            if len(collections) == 1 and run_collections_only:
                # If we are searching exactly one RUN collection, we
                # know that if we find the dataset in that collection,
                # then that's the datasets's run; we don't need to
                # query for it.
                #
                fields_provided["run"] = sqlalchemy.literal(only_collection_record.name).cast(
                    # This cast is necessary to ensure that Postgres knows the
                    # type of this column if it is used in an aggregate
                    # function.
                    sqlalchemy.String
                )
            elif run_collections_only:
                # Once again we can avoid joining to the collection table by
                # adding a CASE statement.
                fields_provided["run"] = _create_case_expression_for_collections(
                    collections, self._static.dataset.c[self._run_key_column]
                )
                need_static_table = True
            else:
                # Here we can't avoid a join to the collection table, because
                # we might find a dataset via something other than its RUN
                # collection.
                #
                # We have to defer adding the join until after we have joined
                # in the static dataset table, because the ON clause involves
                # the run collection from the static dataset table. Postgres
                # cares about the join ordering (though SQLite does not.)
                need_collection_table = True
                need_static_table = True
        if need_static_table:
            # If we need the static table, join it in via dataset_id. We don't
            # use SqlJoinsBuilder.join because we're joining on dataset ID, not
            # dimensions.
            sql_projection.joins.from_clause = sql_projection.joins.from_clause.join(
                self._static.dataset, onclause=(dataset_id_col == self._static.dataset.c.id)
            )
            # Also constrain dataset_type_id in static table in case that helps
            # generate a better plan. We could also include this in the JOIN ON
            # clause, but my guess is that that's a good idea IFF it's in the
            # foreign key, and right now it isn't.
            sql_projection.joins.where(self._static.dataset.c.dataset_type_id == storage.dataset_type_id)
        if need_collection_table:
            # Join the collection table to look up the RUN collection name
            # associated with the dataset.
            (
                fields_provided["run"],
                sql_projection.joins.from_clause,
            ) = self._collections.lookup_name_sql(
                self._static.dataset.c[self._run_key_column],
                sql_projection.joins.from_clause,
            )

        sql_projection.distinct = (
            # If there are multiple collections, this subquery might have
            # non-unique rows.
            len(collections) > 1 and not fields
        )
        return sql_projection
    def refresh_collection_summaries(self, dataset_type: DatasetType) -> None:
        # Docstring inherited.
        if (storage := self._find_storage(dataset_type.name)) is None:
            raise MissingDatasetTypeError(f"Dataset type {dataset_type.name!r} has not been registered.")
        with self._db.transaction():
            # The main issue here is consistency in the presence of concurrent
            # updates (using default READ COMMITTED isolation). Regular clients
            # only add to summary tables, and we want to avoid deleting what
            # other concurrent transactions may add while we are in this
            # transaction. This ordering of operations should guarantee it:
            #   - read collections for this dataset type from summary tables,
            #   - read collections for this dataset type from dataset tables
            #     (both tags and calibs),
            #   - whatever is in the first set but not in the second can be
            #     dropped from summary tables.
            summary_collection_ids = set(self._summaries.get_collection_ids(storage.dataset_type_id))

            # Query datasets tables for associated collections.
            column_name = self._collections.getCollectionForeignKeyName()
            tags_table = self._get_tags_table(storage.dynamic_tables)
            query: sqlalchemy.sql.expression.SelectBase = (
                sqlalchemy.select(tags_table.columns[column_name])
                .where(tags_table.columns.dataset_type_id == storage.dataset_type_id)
                .distinct()
            )
            if dataset_type.isCalibration():
                # Calibration dataset types can also appear in the calibs
                # table, so take the union of both tables' collections.
                calibs_table = self._get_calibs_table(storage.dynamic_tables)
                query2 = (
                    sqlalchemy.select(calibs_table.columns[column_name])
                    .where(calibs_table.columns.dataset_type_id == storage.dataset_type_id)
                    .distinct()
                )
                query = sqlalchemy.sql.expression.union(query, query2)

            with self._db.query(query) as result:
                collection_ids = set(result.scalars())

            # Summary entries with no backing dataset rows are stale.
            collections_to_delete = summary_collection_ids - collection_ids
            self._summaries.delete_collections(storage.dataset_type_id, collections_to_delete)
1464 def _get_tags_table(self, table: DynamicTables) -> sqlalchemy.Table: 

1465 return table.tags(self._db, type(self._collections), self._cache.tables) 

1466 

1467 def _get_calibs_table(self, table: DynamicTables) -> sqlalchemy.Table: 

1468 return table.calibs(self._db, type(self._collections), self._cache.tables) 

1469 

1470 

def _create_case_expression_for_collections(
    collections: Iterable[CollectionRecord], id_column: sqlalchemy.ColumnElement
) -> sqlalchemy.ColumnElement:
    """Return a SQLAlchemy CASE expression translating collection IDs to
    collection names.

    Parameters
    ----------
    collections : `~collections.abc.Iterable` [ `CollectionRecord` ]
        Exhaustive list of the collections that may appear in ``id_column``.
    id_column : `sqlalchemy.ColumnElement`
        The column containing the collection ID that we want to convert to a
        collection name.
    """
    id_to_name = {record.key: record.name for record in collections}
    if id_to_name:
        return sqlalchemy.case(id_to_name, value=id_column)
    # SQLAlchemy does not correctly handle an empty mapping in case() -- it
    # crashes when trying to compile the expression with an
    # "AttributeError('NoneType' object has no attribute 'dialect_impl')"
    # when trying to access the 'type' property of the Case object. If you
    # explicitly specify a type via type_coerce it instead generates
    # invalid SQL syntax.
    #
    # We can end up with empty mappings here in certain "doomed query" edge
    # cases, e.g. we start with a list of valid collections but they are
    # all filtered out by higher-level code on the basis of collection
    # summaries.  Emit a typed NULL instead.
    return sqlalchemy.cast(sqlalchemy.null(), sqlalchemy.String)
1504def _ensure_dimension_groups_match(dataset_types: Iterable[DatasetType]) -> DimensionGroup: 

1505 dimensions = set(dt.dimensions for dt in dataset_types) 

1506 assert len(dimensions) > 0, "At least one dataset type is required" 

1507 if len(dimensions) != 1: 

1508 raise DatasetTypeError( 

1509 "Dataset types have more than one dimension group.\n" 

1510 f"Dataset types: {dataset_types}\n" 

1511 f"Dimension groups: {dimensions}" 

1512 ) 

1513 return dimensions.pop()