Coverage for python/lsst/daf/butler/registry/interfaces/_datasets.py: 77%

78 statements  

coverage.py v7.13.5, created at 2026-05-06 08:30 +0000

# This file is part of daf_butler.
#
# Developed for the LSST Data Management System.
# This product includes software developed by the LSST Project
# (http://www.lsst.org).
# See the COPYRIGHT file at the top-level directory of this distribution
# for details of code ownership.
#
# This software is dual licensed under the GNU General Public License and also
# under a 3-clause BSD license. Recipients may choose which of these licenses
# to use; please see the files gpl-3.0.txt and/or bsd_license.txt,
# respectively. If you choose the GPL option then the following text applies
# (but note that there is still no warranty even if you opt for BSD instead):
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.

from __future__ import annotations

from ... import ddl

__all__ = ("DatasetRecordStorageManager",)

from abc import abstractmethod
from collections.abc import Iterable, Mapping, Sequence, Set
from typing import TYPE_CHECKING, Any

from ..._dataset_ref import DatasetId, DatasetIdGenEnum, DatasetRef
from ..._dataset_type import DatasetType
from ..._exceptions import DatasetTypeError, DatasetTypeNotSupportedError
from ..._timespan import Timespan
from ...dimensions import DataCoordinate
from ...queries import QueryFactoryFunction
from ...queries.tree import AnyDatasetFieldName
from ._versioning import VersionedExtension, VersionTuple

if TYPE_CHECKING:
    from ...direct_query_driver import SqlJoinsBuilder  # new query system, server+direct only
    from .._caching_context import CachingContext
    from .._collection_summary import CollectionSummary
    from ._collections import CollectionManager, CollectionRecord, RunRecord
    from ._database import Database, StaticTablesContext
    from ._dimensions import DimensionRecordStorageManager


class DatasetRecordStorageManager(VersionedExtension):
    """An interface that manages the tables that describe datasets.

    `DatasetRecordStorageManager` primarily serves as a container and factory
    for `DatasetRecordStorage` instances, which each provide access to the
    records for a different `DatasetType`.

    Parameters
    ----------
    registry_schema_version : `VersionTuple` or `None`, optional
        Version of registry schema.
    """

    def __init__(self, *, registry_schema_version: VersionTuple | None = None) -> None:
        super().__init__(registry_schema_version=registry_schema_version)

    @abstractmethod
    def clone(
        self,
        *,
        db: Database,
        collections: CollectionManager,
        dimensions: DimensionRecordStorageManager,
        caching_context: CachingContext,
    ) -> DatasetRecordStorageManager:
        """Make an independent copy of this manager instance bound to new
        instances of `Database` and other managers.

        Parameters
        ----------
        db : `Database`
            New `Database` object to use when instantiating the manager.
        collections : `CollectionManager`
            New `CollectionManager` object to use when instantiating the
            manager.
        dimensions : `DimensionRecordStorageManager`
            New `DimensionRecordStorageManager` object to use when
            instantiating the manager.
        caching_context : `CachingContext`
            New `CachingContext` object to use when instantiating the manager.

        Returns
        -------
        instance : `DatasetRecordStorageManager`
            New manager instance with the same configuration as this instance,
            but bound to a new `Database` object.
        """
        raise NotImplementedError()

    @abstractmethod
    def preload_cache(self) -> None:
        """Fetch data from the database and use it to pre-populate caches to
        speed up later operations.
        """
        raise NotImplementedError()

    @classmethod
    @abstractmethod
    def initialize(
        cls,
        db: Database,
        context: StaticTablesContext,
        *,
        collections: CollectionManager,
        dimensions: DimensionRecordStorageManager,
        caching_context: CachingContext,
        config: Mapping,
        registry_schema_version: VersionTuple | None = None,
    ) -> DatasetRecordStorageManager:
        """Construct an instance of the manager.

        Parameters
        ----------
        db : `Database`
            Interface to the underlying database engine and namespace.
        context : `StaticTablesContext`
            Context object obtained from `Database.declareStaticTables`; used
            to declare any tables that should always be present.
        collections : `CollectionManager`
            Manager object for the collections in this `Registry`.
        dimensions : `DimensionRecordStorageManager`
            Manager object for the dimensions in this `Registry`.
        caching_context : `CachingContext`
            Object controlling caching of information returned by managers.
        config : `~collections.abc.Mapping`
            Additional configuration for this manager.
        registry_schema_version : `VersionTuple` or `None`
            Schema version of this extension as defined in registry.

        Returns
        -------
        manager : `DatasetRecordStorageManager`
            An instance of a concrete `DatasetRecordStorageManager` subclass.
        """
        raise NotImplementedError()

    @classmethod
    @abstractmethod
    def addDatasetForeignKey(
        cls,
        tableSpec: ddl.TableSpec,
        *,
        name: str = "dataset",
        constraint: bool = True,
        onDelete: str | None = None,
        **kwargs: Any,
    ) -> ddl.FieldSpec:
        """Add a foreign key (field and constraint) referencing the dataset
        table.

        Parameters
        ----------
        tableSpec : `ddl.TableSpec`
            Specification for the table that should reference the dataset
            table. Will be modified in place.
        name : `str`, optional
            A name to use for the prefix of the new field; the full name is
            ``{name}_id``.
        constraint : `bool`, optional
            If `False` (`True` is the default), add a field that can be
            joined to the dataset primary key, but do not add a foreign key
            constraint.
        onDelete : `str`, optional
            One of "CASCADE" or "SET NULL", indicating what should happen to
            the referencing row if the referenced dataset row is deleted.
            `None` indicates that this should be an integrity error.
        **kwargs
            Additional keyword arguments are forwarded to the `ddl.FieldSpec`
            constructor (only the ``name`` and ``dtype`` arguments are
            otherwise provided).

        Returns
        -------
        idSpec : `ddl.FieldSpec`
            Specification for the ID field.
        """
        raise NotImplementedError()
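
    # Illustrative usage sketch (not part of the interface): building a table
    # spec that references the dataset table. ``SomeDatasetManager`` and
    # ``tags_spec`` are hypothetical names.
    #
    #     tags_spec = ddl.TableSpec(fields=[])
    #     id_field = SomeDatasetManager.addDatasetForeignKey(tags_spec, onDelete="CASCADE")
    #     # ``tags_spec`` now has a ``dataset_id`` field, plus a foreign key
    #     # constraint that deletes referencing rows along with their dataset.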

    @abstractmethod
    def refresh(self) -> None:
        """Ensure all other operations on this manager are aware of any
        dataset types that may have been registered by other clients since
        it was initialized or last refreshed.
        """
        raise NotImplementedError()

    @abstractmethod
    def get_dataset_type(self, name: str) -> DatasetType:
        """Look up a dataset type by name.

        Parameters
        ----------
        name : `str`
            Name of a parent dataset type.

        Returns
        -------
        dataset_type : `DatasetType`
            The object representing the records for the given dataset type.

        Raises
        ------
        MissingDatasetTypeError
            Raised if there is no dataset type with the given name.
        """
        raise NotImplementedError()

    def conform_exact_dataset_type(self, dataset_type: DatasetType | str) -> DatasetType:
        """Conform a value that may be a dataset type or dataset type name to
        the registered dataset type, while checking that the dataset type is
        not a component and (if a `DatasetType` instance is given) has the
        exact same definition in the registry.

        Parameters
        ----------
        dataset_type : `str` or `DatasetType`
            Dataset type object or name.

        Returns
        -------
        dataset_type : `DatasetType`
            The corresponding registered dataset type.

        Raises
        ------
        DatasetTypeError
            Raised if ``dataset_type`` is a component, or if its definition
            does not exactly match the registered dataset type.
        MissingDatasetTypeError
            Raised if this dataset type is not registered at all.
        """
        if isinstance(dataset_type, DatasetType):
            dataset_type_name = dataset_type.name
            given_dataset_type = dataset_type
        else:
            dataset_type_name = dataset_type
            given_dataset_type = None
        parent_name, component = DatasetType.splitDatasetTypeName(dataset_type_name)
        if component is not None:
            raise DatasetTypeNotSupportedError(
                f"Component dataset {dataset_type_name!r} is not supported in this context."
            )
        registered_dataset_type = self.get_dataset_type(dataset_type_name)
        if given_dataset_type is not None and registered_dataset_type != given_dataset_type:
            raise DatasetTypeError(
                f"Given dataset type {given_dataset_type} is not identical to the "
                f"registered one {registered_dataset_type}."
            )
        return registered_dataset_type
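
    # Illustrative sketch of the accepted argument forms, assuming
    # ``manager`` is a concrete instance and "raw" is a registered dataset
    # type:
    #
    #     dataset_type = manager.conform_exact_dataset_type("raw")  # by name
    #     manager.conform_exact_dataset_type(dataset_type)   # round-trips
    #     manager.conform_exact_dataset_type("raw.wcs")      # raises: component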

    @abstractmethod
    def register_dataset_type(self, dataset_type: DatasetType) -> bool:
        """Ensure that this `Registry` can hold records for the given
        `DatasetType`, creating new tables as necessary.

        Parameters
        ----------
        dataset_type : `DatasetType`
            Dataset type for which a table should be created (as necessary).

        Returns
        -------
        inserted : `bool`
            `True` if the dataset type did not exist in the registry before.

        Notes
        -----
        This operation may not be invoked within a `Database.transaction`
        context.
        """
        raise NotImplementedError()
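
    # Illustrative sketch: the return value distinguishes first-time
    # registration, assuming ``manager`` is a concrete instance and
    # ``dataset_type`` is a `DatasetType`:
    #
    #     assert manager.register_dataset_type(dataset_type)       # newly added
    #     assert not manager.register_dataset_type(dataset_type)   # already there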

    @abstractmethod
    def remove_dataset_type(self, name: str) -> None:
        """Remove the dataset type.

        Parameters
        ----------
        name : `str`
            Name of the dataset type.
        """
        raise NotImplementedError()

    @abstractmethod
    def resolve_wildcard(
        self,
        expression: Any,
        missing: list[str] | None = None,
        explicit_only: bool = False,
    ) -> list[DatasetType]:
        """Resolve a dataset type wildcard expression.

        Parameters
        ----------
        expression : `~typing.Any`
            Expression to resolve. Will be passed to
            `DatasetTypeWildcard.from_expression`.
        missing : `list` of `str`, optional
            String dataset type names that were explicitly given (i.e. not
            regular expression patterns) but not found will be appended to
            this list, if it is provided.
        explicit_only : `bool`, optional
            If `True`, require explicit `DatasetType` instances or `str`
            names, with `re.Pattern` instances deprecated and ``...``
            prohibited.

        Returns
        -------
        dataset_types : `list` [ `DatasetType` ]
            A list of resolved dataset types.
        """
        raise NotImplementedError()
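
    # Illustrative sketch of the ``missing`` out-parameter, assuming
    # ``manager`` is a concrete instance and "no_such_type" is unregistered:
    #
    #     not_found: list[str] = []
    #     found = manager.resolve_wildcard(["raw", "no_such_type"], missing=not_found)
    #     # ``found`` holds the resolved `DatasetType` objects; "no_such_type"
    #     # lands in ``not_found`` instead of raising.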

    @abstractmethod
    def get_dataset_refs(self, ids: list[DatasetId]) -> list[DatasetRef]:
        """Return a `DatasetRef` for each of the given dataset UUID values.

        Parameters
        ----------
        ids : `list` [ `DatasetId` ]
            List of UUID instances to look up.

        Returns
        -------
        refs : `list` [ `DatasetRef` ]
            A list containing a `DatasetRef` for each of the given UUIDs that
            was found in the database. If a dataset was not found, no error is
            raised -- it is just not included in the list. The returned
            datasets are in no particular order.
        """
        raise NotImplementedError()
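
    # Illustrative sketch: unknown UUIDs are silently dropped, so callers
    # must match results back up by ID. ``manager``, ``known_id``, and
    # ``unknown_id`` are hypothetical.
    #
    #     refs = manager.get_dataset_refs([known_id, unknown_id])
    #     found_ids = {ref.id for ref in refs}  # contains known_id only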

    @abstractmethod
    def getCollectionSummary(self, collection: CollectionRecord) -> CollectionSummary:
        """Return a summary for the given collection.

        Parameters
        ----------
        collection : `CollectionRecord`
            Record describing the collection for which a summary is to be
            retrieved.

        Returns
        -------
        summary : `CollectionSummary`
            Summary of the dataset types and governor dimension values in
            this collection.
        """
        raise NotImplementedError()

    @abstractmethod
    def fetch_summaries(
        self,
        collections: Iterable[CollectionRecord],
        dataset_types: Iterable[DatasetType] | Iterable[str] | None = None,
    ) -> Mapping[Any, CollectionSummary]:
        """Fetch collection summaries given their names and dataset types.

        Parameters
        ----------
        collections : `~collections.abc.Iterable` [`CollectionRecord`]
            Collection records to query.
        dataset_types : `~collections.abc.Iterable` [`DatasetType` or `str`] \
                or `None`
            Dataset types to include in the returned summaries. If `None`
            then all dataset types will be included.

        Returns
        -------
        summaries : `~collections.abc.Mapping` [`typing.Any`, \
                `CollectionSummary`]
            Collection summaries indexed by collection record key. This
            mapping will also contain all nested non-chained collections of
            the chained collections.
        """
        raise NotImplementedError()

    @abstractmethod
    def fetch_run_dataset_ids(self, run: RunRecord) -> list[DatasetId]:
        """Return the IDs of all datasets in the given ``RUN`` collection.

        Parameters
        ----------
        run : `RunRecord`
            Record describing the collection.

        Returns
        -------
        dataset_ids : `list` [`uuid.UUID`]
            List of dataset IDs.
        """
        raise NotImplementedError()

    @abstractmethod
    def ingest_date_dtype(self) -> type:
        """Return type of the ``ingest_date`` column."""
        raise NotImplementedError()

    @abstractmethod
    def insert(
        self,
        dataset_type_name: str,
        run: RunRecord,
        data_ids: Iterable[DataCoordinate],
        id_generation_mode: DatasetIdGenEnum = DatasetIdGenEnum.UNIQUE,
    ) -> list[DatasetRef]:
        """Insert one or more dataset entries into the database.

        Parameters
        ----------
        dataset_type_name : `str`
            Name of the dataset type.
        run : `RunRecord`
            The record object describing the `~CollectionType.RUN` collection
            these datasets will be associated with.
        data_ids : `~collections.abc.Iterable` [ `DataCoordinate` ]
            Expanded data IDs (`DataCoordinate` instances) for the
            datasets to be added. The dimensions of all data IDs must be the
            same as ``dataset_type.dimensions``.
        id_generation_mode : `DatasetIdGenEnum`
            With `~DatasetIdGenEnum.UNIQUE` each new dataset is inserted with
            its own unique ID. With a non-`~DatasetIdGenEnum.UNIQUE` mode the
            ID is computed from some combination of dataset type, data ID,
            and run collection name; if the same ID is already in the
            database then the new record is not inserted.

        Returns
        -------
        datasets : `list` [ `DatasetRef` ]
            References to the inserted datasets.
        """
        raise NotImplementedError()
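
    # Illustrative sketch, assuming ``manager`` is a concrete instance,
    # ``run_record`` describes an existing RUN collection, and ``data_ids``
    # holds expanded `DataCoordinate` instances:
    #
    #     refs = manager.insert("raw", run_record, data_ids)
    #     # Default UNIQUE mode: each returned `DatasetRef` carries a freshly
    #     # generated dataset ID.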

    @abstractmethod
    def import_(self, run: RunRecord, refs: list[DatasetRef], assume_new: bool = False) -> None:
        """Insert one or more dataset entries into the database.

        Parameters
        ----------
        run : `RunRecord`
            The record object describing the `~CollectionType.RUN` collection
            these datasets will be associated with.
        refs : `list` [ `DatasetRef` ]
            List of datasets to be inserted. All of the ``DatasetRef``
            ``run`` attributes must match the ``run`` parameter.
        assume_new : `bool`, optional
            If `True`, assume all datasets are new and skip conflict
            resolution logic.
        """
        raise NotImplementedError()

    @abstractmethod
    def delete(self, datasets: Iterable[DatasetId | DatasetRef]) -> None:
        """Fully delete the given datasets from the registry.

        Parameters
        ----------
        datasets : `~collections.abc.Iterable` [ `DatasetId` or `DatasetRef` ]
            Datasets to be deleted. If `DatasetRef` instances are passed,
            only the `DatasetRef.id` attribute is used.
        """
        raise NotImplementedError()

    @abstractmethod
    def associate(
        self, dataset_type: DatasetType, collection: CollectionRecord, datasets: Iterable[DatasetRef]
    ) -> None:
        """Associate one or more datasets with a collection.

        Parameters
        ----------
        dataset_type : `DatasetType`
            Type of all datasets.
        collection : `CollectionRecord`
            The record object describing the collection. ``collection.type``
            must be `~CollectionType.TAGGED`.
        datasets : `~collections.abc.Iterable` [ `DatasetRef` ]
            Datasets to be associated. All datasets must have the same
            `DatasetType` as ``dataset_type``, but this is not checked.

        Notes
        -----
        Associating a dataset into a collection that already contains a
        different dataset with the same `DatasetType` and data ID will remove
        the existing dataset from that collection.

        Associating the same dataset into a collection multiple times is a
        no-op, but is still not permitted on read-only databases.
        """
        raise NotImplementedError()
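
    # Illustrative sketch, assuming ``manager`` is a concrete instance and
    # ``tagged_record`` describes a TAGGED collection:
    #
    #     manager.associate(dataset_type, tagged_record, refs)
    #     # Repeating the call is a no-op; a *different* dataset with the
    #     # same data ID would silently replace the one tagged above.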

    @abstractmethod
    def disassociate(
        self, dataset_type: DatasetType, collection: CollectionRecord, datasets: Iterable[DatasetRef]
    ) -> None:
        """Remove one or more datasets from a collection.

        Parameters
        ----------
        dataset_type : `DatasetType`
            Type of all datasets.
        collection : `CollectionRecord`
            The record object describing the collection. ``collection.type``
            must be `~CollectionType.TAGGED`.
        datasets : `~collections.abc.Iterable` [ `DatasetRef` ]
            Datasets to be disassociated. All datasets must have the same
            `DatasetType` as ``dataset_type``, but this is not checked.
        """
        raise NotImplementedError()

    @abstractmethod
    def certify(
        self,
        dataset_type: DatasetType,
        collection: CollectionRecord,
        datasets: Iterable[DatasetRef],
        timespan: Timespan,
        query_func: QueryFactoryFunction,
    ) -> None:
        """Associate one or more datasets with a calibration collection and a
        validity range within it.

        Parameters
        ----------
        dataset_type : `DatasetType`
            Type of all datasets.
        collection : `CollectionRecord`
            The record object describing the collection. ``collection.type``
            must be `~CollectionType.CALIBRATION`.
        datasets : `~collections.abc.Iterable` [ `DatasetRef` ]
            Datasets to be associated. All datasets must have the same
            `DatasetType` as ``dataset_type``, but this is not checked.
        timespan : `Timespan`
            The validity range for these datasets within the collection.
        query_func : `QueryFactoryFunction`
            Function returning a context manager that sets up a `Query`
            object for querying the registry. (That is, a function equivalent
            to ``Butler.query()``).

        Raises
        ------
        ConflictingDefinitionError
            Raised if the collection already contains a different dataset
            with the same `DatasetType` and data ID and an overlapping
            validity range.
        DatasetTypeError
            Raised if ``dataset_type.isCalibration() is False``.
        CollectionTypeError
            Raised if
            ``collection.type is not CollectionType.CALIBRATION``.
        """
        raise NotImplementedError()
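
    # Illustrative sketch, assuming ``manager`` is a concrete instance,
    # ``calib_record`` describes a CALIBRATION collection, ``bias_type`` and
    # ``bias_refs`` are a calibration dataset type and its datasets, and
    # ``query_func`` comes from the hosting `Butler`:
    #
    #     import astropy.time
    #     january = Timespan(
    #         begin=astropy.time.Time("2026-01-01", scale="tai"),
    #         end=astropy.time.Time("2026-02-01", scale="tai"),
    #     )
    #     manager.certify(bias_type, calib_record, bias_refs, january, query_func)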

    @abstractmethod
    def decertify(
        self,
        dataset_type: DatasetType,
        collection: CollectionRecord,
        timespan: Timespan,
        *,
        data_ids: Iterable[DataCoordinate] | None = None,
        query_func: QueryFactoryFunction,
    ) -> None:
        """Remove or adjust datasets to clear a validity range within a
        calibration collection.

        Parameters
        ----------
        dataset_type : `DatasetType`
            Type of all datasets.
        collection : `CollectionRecord`
            The record object describing the collection. ``collection.type``
            must be `~CollectionType.CALIBRATION`.
        timespan : `Timespan`
            The validity range to remove datasets from within the collection.
            Datasets that overlap this range but are not contained by it will
            have their validity ranges adjusted to not overlap it, which may
            split a single dataset validity range into two.
        data_ids : `~collections.abc.Iterable` [ `DataCoordinate` ], optional
            Data IDs that should be decertified within the given validity
            range. If `None`, all data IDs for ``dataset_type`` in
            ``collection`` will be decertified.
        query_func : `QueryFactoryFunction`
            Function returning a context manager that sets up a `Query`
            object for querying the registry. (That is, a function equivalent
            to ``Butler.query()``).

        Raises
        ------
        DatasetTypeError
            Raised if ``dataset_type.isCalibration() is False``.
        CollectionTypeError
            Raised if
            ``collection.type is not CollectionType.CALIBRATION``.
        """
        raise NotImplementedError()
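
    # Illustrative sketch of the range-splitting behavior, continuing the
    # ``certify`` example above with a hypothetical ``mid_january``
    # `Timespan` contained in ``january``: clearing mid-January out of a
    # dataset certified for all of January leaves it with two validity
    # ranges, one ending and one beginning at the cleared range's bounds.
    #
    #     manager.decertify(bias_type, calib_record, mid_january, query_func=query_func)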

    @abstractmethod
    def make_joins_builder(
        self,
        dataset_type: DatasetType,
        collections: Sequence[CollectionRecord],
        fields: Set[AnyDatasetFieldName],
        is_union: bool = False,
    ) -> SqlJoinsBuilder:
        """Make a `lsst.daf.butler.direct_query_driver.SqlJoinsBuilder`
        that represents a search for datasets of this type.

        Parameters
        ----------
        dataset_type : `DatasetType`
            Type of dataset to query for.
        collections : `~collections.abc.Sequence` [ `CollectionRecord` ]
            Collections to search, in order, after filtering out collections
            with no datasets of this type via collection summaries.
        fields : `~collections.abc.Set` [ `str` ]
            Names of fields to make available in the builder. Options
            include:

            - ``dataset_id`` (UUID)
            - ``run`` (collection name, `str`)
            - ``collection`` (collection name, `str`)
            - ``collection_key`` (collection primary key, manager-dependent)
            - ``timespan`` (validity range, or unbounded for non-calibrations)
            - ``ingest_date`` (time dataset was ingested into repository)

            Dimension keys for the dataset type's required dimensions are
            always included.
        is_union : `bool`, optional
            If `True`, this search is being joined in as part of one term in
            a union over all dataset types. This causes fields to be added to
            the builder via the special ``...`` instead of the dataset type
            name.

        Returns
        -------
        builder : `lsst.daf.butler.direct_query_driver.SqlJoinsBuilder`
            A query-construction object representing a table or subquery.
        """
        raise NotImplementedError()

    @abstractmethod
    def refresh_collection_summaries(self, dataset_type: DatasetType) -> None:
        """Make sure that collection summaries for this dataset type are
        consistent with the contents of the dataset tables.

        Parameters
        ----------
        dataset_type : `DatasetType`
            Dataset type whose summary entries should be refreshed.
        """
        raise NotImplementedError()