Coverage for python/lsst/daf/butler/_dataset_ref.py: 30%

314 statements  


# This file is part of daf_butler.
#
# Developed for the LSST Data Management System.
# This product includes software developed by the LSST Project
# (http://www.lsst.org).
# See the COPYRIGHT file at the top-level directory of this distribution
# for details of code ownership.
#
# This software is dual licensed under the GNU General Public License and also
# under a 3-clause BSD license. Recipients may choose which of these licenses
# to use; please see the files gpl-3.0.txt and/or bsd_license.txt,
# respectively. If you choose the GPL option then the following text applies
# (but note that there is still no warranty even if you opt for BSD instead):
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.

from __future__ import annotations

__all__ = [
    "AmbiguousDatasetError",
    "DatasetDatastoreRecords",
    "DatasetId",
    "DatasetIdFactory",
    "DatasetIdGenEnum",
    "DatasetRef",
    "SerializedDatasetRef",
    "SerializedDatasetRefContainerV1",
    "SerializedDatasetRefContainers",
]

import enum
import logging
import sys
import uuid
from collections.abc import Callable, Iterable, Mapping
from typing import (
    TYPE_CHECKING,
    Annotated,
    Any,
    ClassVar,
    Literal,
    Protocol,
    Self,
    TypeAlias,
    cast,
    runtime_checkable,
)

import pydantic
from pydantic import StrictStr

from lsst.utils.classes import immutable

from ._config_support import LookupKey
from ._dataset_type import DatasetType, SerializedDatasetType
from ._named import NamedKeyDict
from ._uuid import generate_uuidv7
from .datastore.stored_file_info import StoredDatastoreItemInfo
from .dimensions import (
    DataCoordinate,
    DimensionDataAttacher,
    DimensionDataExtractor,
    DimensionGroup,
    DimensionUniverse,
    SerializableDimensionData,
    SerializedDataCoordinate,
    SerializedDataId,
)
from .json import from_json_pydantic, to_json_pydantic
from .persistence_context import PersistenceContextVars

if TYPE_CHECKING:
    from ._storage_class import StorageClass
    from .registry import Registry

# Per-dataset records grouped by opaque table name; usually there is just one
# opaque table.
DatasetDatastoreRecords: TypeAlias = Mapping[str, list[StoredDatastoreItemInfo]]


_LOG = logging.getLogger(__name__)

class AmbiguousDatasetError(Exception):
    """Raised when a `DatasetRef` is not resolved but should be.

    This happens when the `DatasetRef` has no ID or run but the requested
    operation requires one of them.
    """


@runtime_checkable
class _DatasetRefGroupedIterable(Protocol):
    """A package-private interface for iterables of `DatasetRef` that know how
    to efficiently group their contents by `DatasetType`.
    """

    def _iter_by_dataset_type(self) -> Iterable[tuple[DatasetType, Iterable[DatasetRef]]]:
        """Iterate over `DatasetRef` instances, one `DatasetType` at a time.

        Returns
        -------
        grouped : `~collections.abc.Iterable` [ `tuple` [ `DatasetType`, \
                `~collections.abc.Iterable` [ `DatasetRef` ] ] ]
            An iterable of tuples, in which the first element is a dataset
            type and the second is an iterable of `DatasetRef` objects with
            exactly that dataset type.
        """
        ...
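
# Illustrative sketch (not part of the original module): because
# `_DatasetRefGroupedIterable` is a `runtime_checkable` `Protocol`, any
# container exposing an `_iter_by_dataset_type` method is detected
# structurally by `isinstance`; this is how `DatasetRef.iter_by_type`
# (defined below) picks the efficient grouping path. The `_DemoGroupedList`
# name and this demo function are hypothetical.
def _demo_grouped_iterable_check() -> None:
    class _DemoGroupedList(list):
        # A pre-grouped container; a real implementation would yield
        # (dataset type, refs) pairs already bucketed by dataset type.
        def _iter_by_dataset_type(self):
            return iter([])

    # Structural check: no inheritance from the protocol is needed.
    assert isinstance(_DemoGroupedList(), _DatasetRefGroupedIterable)
    # A plain list lacks the hook, so it falls back to full grouping.
    assert not isinstance([], _DatasetRefGroupedIterable)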


class DatasetIdGenEnum(enum.Enum):
    """Enum used to specify dataset ID generation options."""

    UNIQUE = 0
    """Unique mode generates a unique ID for each inserted dataset, e.g.
    auto-generated by the database or a random UUID.
    """

    DATAID_TYPE = 1
    """In this mode the ID is computed deterministically from a combination of
    dataset type and dataId.
    """

    DATAID_TYPE_RUN = 2
    """In this mode the ID is computed deterministically from a combination of
    dataset type, dataId, and run collection name.
    """


class DatasetIdFactory:
    """Factory for dataset IDs (UUIDs).

    For now the logic is hard-coded and is controlled by the user-provided
    value of `DatasetIdGenEnum`. In the future we may implement a configurable
    logic that can guess the `DatasetIdGenEnum` value from other parameters.
    """

    NS_UUID = uuid.UUID("840b31d9-05cd-5161-b2c8-00d32b280d0f")
    """Namespace UUID used for UUID5 generation. Do not change. This was
    produced by ``uuid.uuid5(uuid.NAMESPACE_DNS, "lsst.org")``.
    """

    def makeDatasetId(
        self,
        run: str,
        datasetType: DatasetType,
        dataId: DataCoordinate,
        idGenerationMode: DatasetIdGenEnum,
    ) -> uuid.UUID:
        """Generate a dataset ID for a dataset.

        Parameters
        ----------
        run : `str`
            Name of the RUN collection for the dataset.
        datasetType : `DatasetType`
            Dataset type.
        dataId : `DataCoordinate`
            Expanded data ID for the dataset.
        idGenerationMode : `DatasetIdGenEnum`
            ID generation option. `~DatasetIdGenEnum.UNIQUE` makes a random
            unique ID (currently a UUIDv7). `~DatasetIdGenEnum.DATAID_TYPE`
            makes a deterministic UUID5-type ID based on a dataset type name
            and ``dataId``. `~DatasetIdGenEnum.DATAID_TYPE_RUN` makes a
            deterministic UUID5-type ID based on a dataset type name, run
            collection name, and ``dataId``.

        Returns
        -------
        datasetId : `uuid.UUID`
            Dataset identifier.
        """
        if idGenerationMode is DatasetIdGenEnum.UNIQUE:
            # Earlier versions of this code used UUIDv4. However, totally
            # random IDs create problems for Postgres insert performance,
            # because it scatters index updates randomly around the disk.
            # UUIDv7 has similar uniqueness properties to v4, but IDs
            # generated at the same time are close together in the index.
            return generate_uuidv7()
        else:
            # WARNING: If you modify this code make sure that the order of
            # items in the `items` list below never changes.
            items: list[tuple[str, str]] = []
            if idGenerationMode is DatasetIdGenEnum.DATAID_TYPE:
                items = [
                    ("dataset_type", datasetType.name),
                ]
            elif idGenerationMode is DatasetIdGenEnum.DATAID_TYPE_RUN:
                items = [
                    ("dataset_type", datasetType.name),
                    ("run", run),
                ]
            else:
                raise ValueError(f"Unexpected ID generation mode: {idGenerationMode}")

            for name, value in sorted(dataId.required.items()):
                items.append((name, str(value)))
            data = ",".join(f"{key}={value}" for key, value in items)
            return uuid.uuid5(self.NS_UUID, data)
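
# Illustrative sketch (not part of the original module): the deterministic
# modes above reduce the inputs to a stable, order-sensitive string that is
# hashed with `uuid.uuid5` under `NS_UUID`. This stdlib-only rewrite of that
# scheme shows why equal (dataset type, run, data ID) inputs always map to
# the same UUID; the function names and the plain-dict stand-in for a data
# ID are hypothetical.
def _demo_deterministic_id(dataset_type_name: str, run: str, data_id: dict[str, Any]) -> uuid.UUID:
    # Same namespace constant as DatasetIdFactory.NS_UUID.
    ns_uuid = uuid.UUID("840b31d9-05cd-5161-b2c8-00d32b280d0f")
    # DATAID_TYPE_RUN ordering: dataset type first, then run.
    items = [("dataset_type", dataset_type_name), ("run", run)]
    # Sort data ID keys so dict insertion order cannot change the hash.
    items += [(k, str(v)) for k, v in sorted(data_id.items())]
    return uuid.uuid5(ns_uuid, ",".join(f"{k}={v}" for k, v in items))


def _demo_dataset_id_stability() -> None:
    # Same inputs, same UUID, regardless of key order -- this is what makes
    # DATAID_TYPE_RUN idempotent across repeated ingests into the same run.
    a = _demo_deterministic_id("raw", "demo/run", {"instrument": "Cam", "detector": 1})
    b = _demo_deterministic_id("raw", "demo/run", {"detector": 1, "instrument": "Cam"})
    assert a == b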


# This is constant, so don't recreate a set for each instance.
_serializedDatasetRefFieldsSet = {"id", "datasetType", "dataId", "run", "component"}


class SerializedDatasetRef(pydantic.BaseModel):
    """Simplified model of a `DatasetRef` suitable for serialization."""

    id: uuid.UUID
    datasetType: SerializedDatasetType | None = None
    dataId: SerializedDataCoordinate | None = None
    run: StrictStr | None = None
    component: StrictStr | None = None

    # Cannot use an "after" validator, since in some cases the validator
    # seems to trigger with the datasetType field not yet set.
    @pydantic.model_validator(mode="before")  # type: ignore[attr-defined]
    @classmethod
    def check_consistent_parameters(cls, data: dict[str, Any]) -> dict[str, Any]:
        has_datasetType = data.get("datasetType") is not None
        has_dataId = data.get("dataId") is not None
        if has_datasetType is not has_dataId:
            raise ValueError("If specifying datasetType or dataId, must specify both.")

        if data.get("component") is not None and has_datasetType:
            raise ValueError("datasetType can not be set if component is given.")
        return data

    @classmethod
    def direct(
        cls,
        *,
        id: str,
        run: str,
        datasetType: dict[str, Any] | None = None,
        dataId: dict[str, Any] | None = None,
        component: str | None = None,
    ) -> SerializedDatasetRef:
        """Construct a `SerializedDatasetRef` directly without validators.

        Parameters
        ----------
        id : `str`
            The UUID in string form.
        run : `str`
            The run for this dataset.
        datasetType : `dict` [`str`, `typing.Any`], optional
            A representation of the dataset type.
        dataId : `dict` [`str`, `typing.Any`], optional
            A representation of the data ID.
        component : `str` or `None`, optional
            Any component associated with this ref.

        Returns
        -------
        serialized : `SerializedDatasetRef`
            A Pydantic model representing the given parameters.

        Notes
        -----
        This differs from the pydantic "construct" method in that the
        arguments are explicitly what the model requires, and it will recurse
        through members, constructing them from their corresponding `direct`
        methods.

        The ``id`` parameter is a string representation of the dataset ID;
        it is converted to a UUID by this method.

        This method should only be called when the inputs are trusted.
        """
        serialized_datasetType = (
            SerializedDatasetType.direct(**datasetType) if datasetType is not None else None
        )
        serialized_dataId = SerializedDataCoordinate.direct(**dataId) if dataId is not None else None

        node = cls.model_construct(
            _fields_set=_serializedDatasetRefFieldsSet,
            id=uuid.UUID(id),
            datasetType=serialized_datasetType,
            dataId=serialized_dataId,
            run=sys.intern(run),
            component=component,
        )

        return node
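
# Illustrative sketch (not part of the original module): the "before"
# validator above enforces that datasetType and dataId travel together and
# that component excludes datasetType, and it runs on the raw input dict
# before any field parsing. This demo function is hypothetical and assumes
# only pydantic plus the model defined above.
def _demo_serialized_ref_validation() -> None:
    # A minimal (ID-only) record is valid; this is what
    # to_simple(minimal=True) produces for a non-component ref.
    SerializedDatasetRef(id=uuid.uuid4())

    # An ID plus component is also a valid minimal form.
    SerializedDatasetRef(id=uuid.uuid4(), component="wcs")

    # Supplying datasetType without dataId is rejected before field
    # validation even starts, so a bare dict is enough to trigger it.
    try:
        SerializedDatasetRef(id=uuid.uuid4(), datasetType={"name": "raw"})
    except pydantic.ValidationError:
        pass
    else:
        raise AssertionError("inconsistent arguments should have been rejected")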


DatasetId: TypeAlias = uuid.UUID
"""A type-annotation alias for dataset ID providing typing flexibility."""


@immutable
class DatasetRef:
    """Reference to a Dataset in a `Registry`.

    A `DatasetRef` may point to a Dataset that does not yet exist
    (e.g., because it is a predicted input for provenance).

    Parameters
    ----------
    datasetType : `DatasetType`
        The `DatasetType` for this Dataset.
    dataId : `DataCoordinate`
        A mapping of dimensions that labels the Dataset within a Collection.
    run : `str`
        The name of the run this dataset was associated with when it was
        created.
    id : `DatasetId`, optional
        The unique identifier assigned when the dataset is created. If ``id``
        is not specified, a new unique ID will be created.
    conform : `bool`, optional
        If `True` (default), call `DataCoordinate.standardize` to ensure that
        the data ID's dimensions are consistent with the dataset type's.
        `DatasetRef` instances for which those dimensions are not equal should
        not be created in new code, but are still supported for backwards
        compatibility. New code should only pass `False` if it can guarantee
        that the dimensions are already consistent.
    id_generation_mode : `DatasetIdGenEnum`
        ID generation option. `~DatasetIdGenEnum.UNIQUE` makes a random
        unique ID (currently a UUIDv7). `~DatasetIdGenEnum.DATAID_TYPE`
        makes a deterministic UUID5-type ID based on a dataset type name and
        ``dataId``. `~DatasetIdGenEnum.DATAID_TYPE_RUN` makes a
        deterministic UUID5-type ID based on a dataset type name, run
        collection name, and ``dataId``.
    datastore_records : `DatasetDatastoreRecords` or `None`
        Datastore records to attach.

    Notes
    -----
    See also :ref:`daf_butler_organizing_datasets`.
    """

    _serializedType: ClassVar[type[pydantic.BaseModel]] = SerializedDatasetRef
    __slots__ = (
        "_id",
        "datasetType",
        "dataId",
        "run",
        "_datastore_records",
    )

    def __init__(
        self,
        datasetType: DatasetType,
        dataId: DataCoordinate,
        run: str,
        *,
        id: DatasetId | None = None,
        conform: bool = True,
        id_generation_mode: DatasetIdGenEnum = DatasetIdGenEnum.UNIQUE,
        datastore_records: DatasetDatastoreRecords | None = None,
    ):
        self.datasetType = datasetType
        if conform:
            self.dataId = DataCoordinate.standardize(dataId, dimensions=datasetType.dimensions)
        else:
            self.dataId = dataId
        self.run = run
        if id is not None:
            self._id = id.int
        else:
            self._id = (
                DatasetIdFactory()
                .makeDatasetId(self.run, self.datasetType, self.dataId, id_generation_mode)
                .int
            )
        self._datastore_records = datastore_records

    @property
    def id(self) -> DatasetId:
        """Primary key of the dataset (`DatasetId`).

        Cannot be changed after a `DatasetRef` is constructed.
        """
        return uuid.UUID(int=self._id)

    def __eq__(self, other: Any) -> bool:
        try:
            return (self.datasetType, self.dataId, self.id) == (other.datasetType, other.dataId, other.id)
        except AttributeError:
            return NotImplemented

    def __hash__(self) -> int:
        return hash((self.datasetType, self.dataId, self.id))

    @property
    def dimensions(self) -> DimensionGroup:
        """Dimensions associated with the underlying `DatasetType`."""
        return self.datasetType.dimensions

    def __repr__(self) -> str:
        # We delegate to __str__ (i.e. use "!s") for the data ID below because
        # DataCoordinate's __repr__ - while adhering to the guidelines for
        # __repr__ - is much harder for users to read, while its __str__ just
        # produces a dict that can also be passed to DatasetRef's constructor.
        return f"DatasetRef({self.datasetType!r}, {self.dataId!s}, run={self.run!r}, id={self.id})"

    def __str__(self) -> str:
        s = (
            f"{self.datasetType.name}@{self.dataId!s} [sc={self.datasetType.storageClass_name}]"
            f" (run={self.run} id={self.id})"
        )
        return s

    def __lt__(self, other: Any) -> bool:
        # Sort by run, DatasetType name, and then by DataCoordinate.
        # The __str__ representation is probably close enough, but we
        # need to ensure that sorting a DatasetRef matches what you would
        # get if you sorted DatasetType + DataCoordinate.
        if not isinstance(other, type(self)):
            return NotImplemented

        # Group by run if defined; run takes precedence over DatasetType.
        self_run = "" if self.run is None else self.run
        other_run = "" if other.run is None else other.run

        # Compare tuples in the priority order.
        return (self_run, self.datasetType, self.dataId) < (other_run, other.datasetType, other.dataId)


    def to_simple(self, minimal: bool = False) -> SerializedDatasetRef:
        """Convert this class to a simple python type.

        This makes it suitable for serialization.

        Parameters
        ----------
        minimal : `bool`, optional
            Use minimal serialization. Requires Registry to convert
            back to a full type.

        Returns
        -------
        simple : `SerializedDatasetRef`
            The object converted to its serializable form.
        """
        if minimal:
            # The only thing needed to uniquely define a DatasetRef is its id
            # so that can be used directly if it is not a component DatasetRef.
            # Store it in a dict to allow us to easily add the planned origin
            # information later without having to support an int and dict in
            # simple form.
            simple: dict[str, Any] = {"id": self.id}
            if self.isComponent():
                # We can still be a little minimalist with a component,
                # but we will also need to record the datasetType component.
                simple["component"] = self.datasetType.component()
            return SerializedDatasetRef(**simple)

        return SerializedDatasetRef(
            datasetType=self.datasetType.to_simple(minimal=minimal),
            dataId=self.dataId.to_simple(),
            run=self.run,
            id=self.id,
        )

    @classmethod
    def from_simple(
        cls,
        simple: SerializedDatasetRef,
        universe: DimensionUniverse | None = None,
        registry: Registry | None = None,
        datasetType: DatasetType | None = None,
    ) -> DatasetRef:
        """Construct a new object from simplified form.

        Generally this is data returned from the `to_simple` method.

        Parameters
        ----------
        simple : `SerializedDatasetRef`
            The value returned by `to_simple()`.
        universe : `DimensionUniverse`, optional
            The special graph of all known dimensions.
            Can be `None` if a registry is provided.
        registry : `lsst.daf.butler.Registry`, optional
            Registry to use to convert the simple form of a DatasetRef to
            a full `DatasetRef`. Can be `None` if a full description of
            the type is provided along with a universe.
        datasetType : `DatasetType`, optional
            If supplied, this will be used as the dataset type of the
            resulting DatasetRef instead of being read from the
            `SerializedDatasetRef`. This is useful when many refs share the
            same type, since memory can be saved. Defaults to `None`.

        Returns
        -------
        ref : `DatasetRef`
            Newly-constructed object.
        """
        cache = PersistenceContextVars.datasetRefs.get()
        key = simple.id.int
        if cache is not None and (ref := cache.get(key, None)) is not None:
            if datasetType is not None:
                if (component := datasetType.component()) is not None:
                    ref = ref.makeComponentRef(component)
                ref = ref.overrideStorageClass(datasetType.storageClass_name)
                return ref
            if simple.datasetType is not None:
                _, component = DatasetType.splitDatasetTypeName(simple.datasetType.name)
                if component is not None:
                    ref = ref.makeComponentRef(component)
                if simple.datasetType.storageClass is not None:
                    ref = ref.overrideStorageClass(simple.datasetType.storageClass)
                return ref
            # If the dataset type is not given, ignore the cache, because we
            # can't reliably return the right storage class.
        # A minimalist component ref will specify only component and id and
        # requires the registry to reconstruct the rest.
        if simple.datasetType is None and simple.dataId is None and simple.run is None:
            if registry is None:
                raise ValueError("Registry is required to construct component DatasetRef from integer id")
            if simple.id is None:
                raise ValueError("For minimal DatasetRef the ID must be defined.")
            ref = registry.getDataset(simple.id)
            if ref is None:
                raise RuntimeError(f"No matching dataset found in registry for id {simple.id}")
            if simple.component:
                ref = ref.makeComponentRef(simple.component)
        else:
            if universe is None:
                if registry is None:
                    raise ValueError("One of universe or registry must be provided.")
                universe = registry.dimensions
            if datasetType is None:
                if simple.datasetType is None:
                    raise ValueError("Cannot determine Dataset type of this serialized class")
                datasetType = DatasetType.from_simple(
                    simple.datasetType, universe=universe, registry=registry
                )
            if simple.dataId is None:
                # mypy
                raise ValueError("The DataId must be specified to construct a DatasetRef")
            dataId = DataCoordinate.from_simple(simple.dataId, universe=universe)
            # Check that the simple ref is resolved.
            if simple.run is None:
                dstr = ""
                if simple.datasetType is None:
                    dstr = f" (datasetType={datasetType.name!r})"
                raise ValueError(
                    "Run collection name is missing from serialized representation. "
                    f"Encountered with {simple!r}{dstr}."
                )
            ref = cls(
                datasetType,
                dataId,
                id=simple.id,
                run=simple.run,
            )
        if cache is not None:
            if ref.datasetType.component() is not None:
                cache[key] = ref.makeCompositeRef()
            else:
                cache[key] = ref
        return ref


    to_json = to_json_pydantic
    from_json: ClassVar[Callable[..., Self]] = cast(Callable[..., Self], classmethod(from_json_pydantic))

    @classmethod
    def _unpickle(
        cls,
        datasetType: DatasetType,
        dataId: DataCoordinate,
        id: DatasetId,
        run: str,
        datastore_records: DatasetDatastoreRecords | None,
    ) -> DatasetRef:
        """Create a new `DatasetRef`.

        A custom factory method for use by `__reduce__` as a workaround for
        its lack of support for keyword arguments.
        """
        return cls(datasetType, dataId, id=id, run=run, datastore_records=datastore_records)

    def __reduce__(self) -> tuple:
        return (
            self._unpickle,
            (self.datasetType, self.dataId, self.id, self.run, self._datastore_records),
        )

    def __deepcopy__(self, memo: dict) -> DatasetRef:
        # DatasetRef is recursively immutable; see note in @immutable
        # decorator.
        return self

    def expanded(self, dataId: DataCoordinate) -> DatasetRef:
        """Return a new `DatasetRef` with the given expanded data ID.

        Parameters
        ----------
        dataId : `DataCoordinate`
            Data ID for the new `DatasetRef`. Must compare equal to the
            original data ID.

        Returns
        -------
        ref : `DatasetRef`
            A new `DatasetRef` with the given data ID.
        """
        assert dataId == self.dataId
        return DatasetRef(
            datasetType=self.datasetType,
            dataId=dataId,
            id=self.id,
            run=self.run,
            conform=False,
            datastore_records=self._datastore_records,
        )

    def isComponent(self) -> bool:
        """Indicate whether this `DatasetRef` refers to a component.

        Returns
        -------
        isComponent : `bool`
            `True` if this `DatasetRef` is a component, `False` otherwise.
        """
        return self.datasetType.isComponent()

    def isComposite(self) -> bool:
        """Indicate whether this `DatasetRef` is a composite type.

        Returns
        -------
        isComposite : `bool`
            `True` if this `DatasetRef` is a composite type, `False`
            otherwise.
        """
        return self.datasetType.isComposite()

    def _lookupNames(self) -> tuple[LookupKey, ...]:
        """Name keys to use when looking up this DatasetRef in a
        configuration.

        The names are returned in order of priority.

        Returns
        -------
        names : `tuple` of `LookupKey`
            Tuple of the `DatasetType` name and the `StorageClass` name.
            If ``instrument`` is defined in the dataId, each of those names
            is added to the start of the tuple with a key derived from the
            value of ``instrument``.
        """
        # Special case the instrument Dimension since we allow configs
        # to include the instrument name in the hierarchy.
        names: tuple[LookupKey, ...] = self.datasetType._lookupNames()

        if "instrument" in self.dataId:
            names = tuple(n.clone(dataId={"instrument": self.dataId["instrument"]}) for n in names) + names

        return names

    @staticmethod
    def groupByType(refs: Iterable[DatasetRef]) -> NamedKeyDict[DatasetType, list[DatasetRef]]:
        """Group an iterable of `DatasetRef` by `DatasetType`.

        Parameters
        ----------
        refs : `~collections.abc.Iterable` [ `DatasetRef` ]
            `DatasetRef` instances to group.

        Returns
        -------
        grouped : `NamedKeyDict` [ `DatasetType`, `list` [ `DatasetRef` ] ]
            Grouped `DatasetRef` instances.

        Notes
        -----
        When lazy item-iterables are acceptable instead of a full mapping,
        `iter_by_type` can in some cases be far more efficient.
        """
        result: NamedKeyDict[DatasetType, list[DatasetRef]] = NamedKeyDict()
        for ref in refs:
            result.setdefault(ref.datasetType, []).append(ref)
        return result

    @staticmethod
    def iter_by_type(
        refs: Iterable[DatasetRef],
    ) -> Iterable[tuple[DatasetType, Iterable[DatasetRef]]]:
        """Group an iterable of `DatasetRef` by `DatasetType` with special
        hooks for custom iterables that can do this efficiently.

        Parameters
        ----------
        refs : `~collections.abc.Iterable` [ `DatasetRef` ]
            `DatasetRef` instances to group. If this satisfies the
            `_DatasetRefGroupedIterable` protocol, its
            `~_DatasetRefGroupedIterable._iter_by_dataset_type` method will
            be called.

        Returns
        -------
        grouped : `~collections.abc.Iterable` [ `tuple` [ `DatasetType`, \
                `~collections.abc.Iterable` [ `DatasetRef` ] ] ]
            Grouped `DatasetRef` instances.
        """
        if isinstance(refs, _DatasetRefGroupedIterable):
            return refs._iter_by_dataset_type()
        return DatasetRef.groupByType(refs).items()

    def makeCompositeRef(self) -> DatasetRef:
        """Create a `DatasetRef` of the composite from a component ref.

        Requires that this `DatasetRef` is a component.

        Returns
        -------
        ref : `DatasetRef`
            A `DatasetRef` with a dataset type that corresponds to the
            composite parent of this component, and the same ID and run.
        """
        # Assume that the data ID does not need to be standardized
        # and should match whatever this ref already has.
        return DatasetRef(
            self.datasetType.makeCompositeDatasetType(),
            self.dataId,
            id=self.id,
            run=self.run,
            conform=False,
            datastore_records=self._datastore_records,
        )

    def makeComponentRef(self, name: str) -> DatasetRef:
        """Create a `DatasetRef` that corresponds to a component.

        Parameters
        ----------
        name : `str`
            Name of the component.

        Returns
        -------
        ref : `DatasetRef`
            A `DatasetRef` with a dataset type that corresponds to the given
            component, and the same ID and run.
        """
        # Assume that the data ID does not need to be standardized
        # and should match whatever this ref already has.
        return DatasetRef(
            self.datasetType.makeComponentDatasetType(name),
            self.dataId,
            id=self.id,
            run=self.run,
            conform=False,
            datastore_records=self._datastore_records,
        )

    def overrideStorageClass(self, storageClass: str | StorageClass) -> DatasetRef:
        """Create a new `DatasetRef` from this one, but with a modified
        `DatasetType` that has a different `StorageClass`.

        Parameters
        ----------
        storageClass : `str` or `StorageClass`
            The new storage class.

        Returns
        -------
        modified : `DatasetRef`
            A new dataset reference that is the same as the current one but
            with a different storage class in the `DatasetType`.
        """
        return self.replace(storage_class=storageClass)

    def replace(
        self,
        *,
        id: DatasetId | None = None,
        run: str | None = None,
        storage_class: str | StorageClass | None = None,
        datastore_records: DatasetDatastoreRecords | None | Literal[False] = False,
    ) -> DatasetRef:
        """Create a new `DatasetRef` from this one, but with some modified
        attributes.

        Parameters
        ----------
        id : `DatasetId` or `None`
            If not `None`, update the dataset ID.
        run : `str` or `None`
            If not `None`, update the run collection name. If ``id`` is
            `None`, this will also cause a new dataset ID to be generated.
        storage_class : `str` or `StorageClass` or `None`
            The new storage class. If not `None`, replaces the existing
            storage class.
        datastore_records : `DatasetDatastoreRecords` or `None`
            New datastore records. If `None`, remove all records. By default
            datastore records are preserved.

        Returns
        -------
        modified : `DatasetRef`
            A new dataset reference with updated attributes.
        """
        if datastore_records is False:
            datastore_records = self._datastore_records
        if storage_class is None:
            datasetType = self.datasetType
        else:
            datasetType = self.datasetType.overrideStorageClass(storage_class)
        if run is None:
            run = self.run
            # Do not regenerate dataset ID if run is the same.
            if id is None:
                id = self.id
        return DatasetRef(
            datasetType=datasetType,
            dataId=self.dataId,
            run=run,
            id=id,
            conform=False,
            datastore_records=datastore_records,
        )

    def is_compatible_with(self, other: DatasetRef) -> bool:
        """Determine if the given `DatasetRef` is compatible with this one.

        Parameters
        ----------
        other : `DatasetRef`
            Dataset ref to check.

        Returns
        -------
        is_compatible : `bool`
            Returns `True` if the other dataset ref is either the same as
            this one, or its dataset type is compatible with this one's and
            the data ID and dataset ID match.

        Notes
        -----
        Compatibility requires that the data ID, dataset ID, and run match
        and that the `DatasetType` is compatible. Compatibility is defined
        as the storage class associated with the dataset type of the other
        ref being convertible to this storage class.

        Specifically this means that if you have done:

        .. code-block:: py

            new_ref = ref.overrideStorageClass(sc)

        and this is successful, then the guarantee is that:

        .. code-block:: py

            assert ref.is_compatible_with(new_ref) is True

        since we know that the python type associated with the new ref can
        be converted to the original python type. The reverse is not
        guaranteed and depends on whether bidirectional converters have been
        registered.
        """
        if self.id != other.id:
            return False
        if self.dataId != other.dataId:
            return False
        if self.run != other.run:
            return False
        return self.datasetType.is_compatible_with(other.datasetType)

    datasetType: DatasetType
    """The definition of this dataset (`DatasetType`).

    Cannot be changed after a `DatasetRef` is constructed.
    """

    dataId: DataCoordinate
    """A mapping of `Dimension` primary key values that labels the dataset
    within a Collection (`DataCoordinate`).

    Cannot be changed after a `DatasetRef` is constructed.
    """

    run: str
    """The name of the run that produced the dataset.

    Cannot be changed after a `DatasetRef` is constructed.
    """

    datastore_records: DatasetDatastoreRecords | None
    """Optional datastore records (`DatasetDatastoreRecords`).

    Cannot be changed after a `DatasetRef` is constructed.
    """
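
# Illustrative sketch (not part of the original module): end-to-end use of
# the DatasetRef APIs above -- construct a ref, round-trip it through
# to_simple/from_simple and JSON, and derive a new ref with replace(). The
# dataset type name, run names, data ID values, and the use of the
# "StructuredDataDict" storage class are illustrative; this assumes the
# default dimension universe shipped with daf_butler.
def _demo_dataset_ref_round_trip() -> None:
    universe = DimensionUniverse()  # default dimension configuration
    dataset_type = DatasetType(
        "demo_metadata",
        dimensions=["instrument", "detector"],
        storageClass="StructuredDataDict",
        universe=universe,
    )
    data_id = DataCoordinate.standardize(
        {"instrument": "DemoCam", "detector": 1}, universe=universe
    )
    ref = DatasetRef(dataset_type, data_id, run="demo/run")

    # Serialization round trip preserves identity (type, data ID, and UUID).
    assert DatasetRef.from_simple(ref.to_simple(), universe=universe) == ref

    # JSON works the same way via to_json/from_json.
    assert DatasetRef.from_json(ref.to_json(), universe=universe) == ref

    # replace() with a new run and no explicit id generates a fresh UUID,
    # matching the behavior documented in replace().
    moved = ref.replace(run="demo/other_run")
    assert moved.run == "demo/other_run" and moved.id != ref.id

    # Sorting orders by run first, then dataset type, then data ID.
    assert sorted([ref, moved])[0] is moved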


class MinimalistSerializableDatasetRef(pydantic.BaseModel):
    """Minimal information needed to define a DatasetRef.

    The ID is not included and is presumed to be the key to a mapping
    to this information.
    """

    model_config = pydantic.ConfigDict(frozen=True)

    dataset_type_name: str
    """Name of the dataset type."""

    run: str
    """Name of the RUN collection."""

    data_id: SerializedDataId
    """Data coordinate of this dataset."""

    def to_dataset_ref(
        self,
        id: DatasetId,
        *,
        dataset_type: DatasetType,
        universe: DimensionUniverse,
        attacher: DimensionDataAttacher | None = None,
    ) -> DatasetRef:
        """Convert this serialized object to a `DatasetRef`.

        Parameters
        ----------
        id : `DatasetId`
            UUID identifying the dataset.
        dataset_type : `DatasetType`
            `DatasetType` record corresponding to the dataset type name in
            the serialized object.
        universe : `DimensionUniverse`
            Dimension universe for the dataset.
        attacher : `DimensionDataAttacher`, optional
            If provided, will be used to add dimension records to the
            deserialized `DatasetRef` instance.

        Returns
        -------
        ref : `DatasetRef`
            The deserialized object.
        """
        assert dataset_type.name == self.dataset_type_name, (
            "Given DatasetType does not match the serialized dataset type name"
        )
        simple_data_id = SerializedDataCoordinate(dataId=self.data_id)
        data_id = DataCoordinate.from_simple(simple=simple_data_id, universe=universe)
        if attacher:
            data_ids = attacher.attach(dataset_type.dimensions, [data_id])
            data_id = data_ids[0]
        return DatasetRef(
            id=id,
            run=self.run,
            datasetType=dataset_type,
            dataId=data_id,
        )

    @staticmethod
    def from_dataset_ref(ref: DatasetRef) -> MinimalistSerializableDatasetRef:
        """Serialize a `DatasetRef` to this simplified format.

        Parameters
        ----------
        ref : `DatasetRef`
            `DatasetRef` object to serialize.

        Returns
        -------
        minimal : `MinimalistSerializableDatasetRef`
            The serialized form.
        """
        return MinimalistSerializableDatasetRef(
            dataset_type_name=ref.datasetType.name, run=ref.run, data_id=dict(ref.dataId.mapping)
        )


class SerializedDatasetRefContainer(pydantic.BaseModel):
    """Serializable model for a collection of DatasetRef.

    Dimension records are not included.
    """

    model_config = pydantic.ConfigDict(extra="allow", frozen=True)
    container_version: str


class SerializedDatasetRefContainerV1(SerializedDatasetRefContainer):
    """Serializable model for a collection of DatasetRef.

    Dimension records may be included if they were attached to every ref.
    """

    container_version: Literal["V1"] = "V1"

    universe_version: int
    """Dimension universe version."""

    universe_namespace: str
    """Dimension universe namespace."""

    dataset_types: dict[str, SerializedDatasetType]
    """Dataset types indexed by their name."""

    compact_refs: dict[uuid.UUID, MinimalistSerializableDatasetRef]
    """Minimal dataset ref information indexed by UUID."""

    dimension_records: SerializableDimensionData | None = None
    """Dimension record information."""

    def __len__(self) -> int:
        """Return the number of datasets in the container."""
        return len(self.compact_refs)

    @classmethod
    def from_refs(cls, refs: Iterable[DatasetRef]) -> Self:
        """Construct a serializable form from a list of `DatasetRef`.

        Parameters
        ----------
        refs : `~collections.abc.Iterable` [ `DatasetRef` ]
            The datasets to include in the container.

        Returns
        -------
        container : `SerializedDatasetRefContainerV1`
            A container holding the given refs in compact form.
        """
        # The serialized DatasetRef contains a lot of duplicated information.
        # Dimension records are stored once for the whole container, and only
        # when they were attached to every ref.
        universe: DimensionUniverse | None = None
        dataset_types: dict[str, SerializedDatasetType] = {}
        compact_refs: dict[uuid.UUID, MinimalistSerializableDatasetRef] = {}
        data_ids: list[DataCoordinate] = []
        dimensions: list[DimensionGroup] = []
        for ref in refs:
            if universe is None:
                universe = ref.datasetType.dimensions.universe
            if (name := ref.datasetType.name) not in dataset_types:
                dataset_types[name] = ref.datasetType.to_simple()
            compact_refs[ref.id] = MinimalistSerializableDatasetRef.from_dataset_ref(ref)
            if ref.dataId.hasRecords():
                dimensions.append(ref.datasetType.dimensions)
                data_ids.append(ref.dataId)

        # Extract dimension record metadata if present.
        dimension_records = None
        if data_ids and len(compact_refs) == len(data_ids):
            dimension_group = DimensionGroup.union(*dimensions, universe=universe)

            # Records were attached to all refs. Store them.
            extractor = DimensionDataExtractor.from_dimension_group(
                dimension_group,
                ignore_cached=False,
                include_skypix=False,
            )
            extractor.update(data_ids)
            dimension_records = SerializableDimensionData.from_record_sets(extractor.records.values())

        if universe:
            universe_version = universe.version
            universe_namespace = universe.namespace
        else:
            # No refs, so no universe.
            universe_version = 0
            universe_namespace = "unknown"
        return cls(
            universe_version=universe_version,
            universe_namespace=universe_namespace,
            dataset_types=dataset_types,
            compact_refs=compact_refs,
            dimension_records=dimension_records,
        )


    def to_refs(self, universe: DimensionUniverse) -> list[DatasetRef]:
        """Construct the original `DatasetRef` instances.

        Parameters
        ----------
        universe : `DimensionUniverse`
            The universe to use when constructing the `DatasetRef` instances.

        Returns
        -------
        refs : `list` [ `DatasetRef` ]
            The `DatasetRef` instances that were serialized.
        """
        if not self.compact_refs:
            return []

        if universe.namespace != self.universe_namespace:
            raise RuntimeError(
                f"Can not convert to refs in universe {universe.namespace} that were created from "
                f"universe {self.universe_namespace}"
            )

        if universe.version != self.universe_version:
            _LOG.warning(
                "Universe mismatch when attempting to reconstruct DatasetRef from serialized form. "
                "Serialized with version %d but asked to use version %d.",
                self.universe_version,
                universe.version,
            )

        # Reconstruct the DatasetType objects.
        dataset_types = {
            name: DatasetType.from_simple(dtype, universe=universe)
            for name, dtype in self.dataset_types.items()
        }

        # Dimension records can be attached if available.
        # We assume that all dimension information was stored.
        attacher = None
        if self.dimension_records:
            attacher = DimensionDataAttacher(
                deserializers=self.dimension_records.make_deserializers(universe)
            )

        refs: list[DatasetRef] = []
        for id_, minimal in self.compact_refs.items():
            ref = minimal.to_dataset_ref(
                id_,
                dataset_type=dataset_types[minimal.dataset_type_name],
                universe=universe,
                attacher=attacher,
            )
            refs.append(ref)
        return refs
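
# Illustrative sketch (not part of the original module): pack refs into the
# V1 container, dump to JSON with plain pydantic, and rebuild the refs.
# Names and data ID values are illustrative; this assumes the default
# dimension universe and a storage class from the default configuration.
def _demo_container_round_trip() -> None:
    universe = DimensionUniverse()
    dataset_type = DatasetType(
        "demo_metadata",
        dimensions=["instrument", "detector"],
        storageClass="StructuredDataDict",
        universe=universe,
    )
    refs = [
        DatasetRef(
            dataset_type,
            DataCoordinate.standardize({"instrument": "DemoCam", "detector": d}, universe=universe),
            run="demo/run",
        )
        for d in (1, 2)
    ]

    container = SerializedDatasetRefContainerV1.from_refs(refs)
    assert len(container) == 2

    # The container is an ordinary pydantic model, so standard JSON
    # (de)serialization applies; each dataset type is stored once, not
    # once per ref.
    json_str = container.model_dump_json()
    restored = SerializedDatasetRefContainerV1.model_validate_json(json_str)
    assert restored.to_refs(universe=universe) == refs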


SerializedDatasetRefContainers: TypeAlias = Annotated[
    SerializedDatasetRefContainerV1,
    pydantic.Field(discriminator="container_version"),
]