# This file is part of daf_butler.
#
# Developed for the LSST Data Management System.
# This product includes software developed by the LSST Project
# (http://www.lsst.org).
# See the COPYRIGHT file at the top-level directory of this distribution
# for details of code ownership.
#
# This software is dual licensed under the GNU General Public License and also
# under a 3-clause BSD license. Recipients may choose which of these licenses
# to use; please see the files gpl-3.0.txt and/or bsd_license.txt,
# respectively. If you choose the GPL option then the following text applies
# (but note that there is still no warranty even if you opt for BSD instead):
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
from __future__ import annotations

__all__ = [
    "AmbiguousDatasetError",
    "DatasetDatastoreRecords",
    "DatasetId",
    "DatasetIdFactory",
    "DatasetIdGenEnum",
    "DatasetRef",
    "SerializedDatasetRef",
    "SerializedDatasetRefContainerV1",
    "SerializedDatasetRefContainers",
]

import enum
import logging
import sys
import uuid
from collections.abc import Callable, Iterable, Mapping
from typing import (
    TYPE_CHECKING,
    Annotated,
    Any,
    ClassVar,
    Literal,
    Protocol,
    Self,
    TypeAlias,
    cast,
    runtime_checkable,
)

import pydantic
from pydantic import StrictStr

from lsst.utils.classes import immutable

from ._config_support import LookupKey
from ._dataset_type import DatasetType, SerializedDatasetType
from ._named import NamedKeyDict
from ._uuid import generate_uuidv7
from .datastore.stored_file_info import StoredDatastoreItemInfo
from .dimensions import (
    DataCoordinate,
    DimensionDataAttacher,
    DimensionDataExtractor,
    DimensionGroup,
    DimensionUniverse,
    SerializableDimensionData,
    SerializedDataCoordinate,
    SerializedDataId,
)
from .json import from_json_pydantic, to_json_pydantic
from .persistence_context import PersistenceContextVars

if TYPE_CHECKING:
    from ._storage_class import StorageClass
    from .registry import Registry

# Per-dataset records grouped by opaque table name; usually there is just one
# opaque table.
DatasetDatastoreRecords: TypeAlias = Mapping[str, list[StoredDatastoreItemInfo]]


_LOG = logging.getLogger(__name__)


class AmbiguousDatasetError(Exception):
    """Raised when a `DatasetRef` is not resolved but should be.

    This happens when the `DatasetRef` has no ID or run but the requested
    operation requires one of them.
    """


@runtime_checkable
class _DatasetRefGroupedIterable(Protocol):
    """A package-private interface for iterables of `DatasetRef` that know how
    to efficiently group their contents by `DatasetType`.
    """

    def _iter_by_dataset_type(self) -> Iterable[tuple[DatasetType, Iterable[DatasetRef]]]:
        """Iterate over `DatasetRef` instances, one `DatasetType` at a time.

        Returns
        -------
        grouped : `~collections.abc.Iterable` [ `tuple` [ `DatasetType`, \
                `~collections.abc.Iterable` [ `DatasetRef` ] ] ]
            An iterable of tuples, in which the first element is a dataset
            type and the second is an iterable of `DatasetRef` objects with
            exactly that dataset type.
        """
        ...


class DatasetIdGenEnum(enum.Enum):
    """Enum used to specify dataset ID generation options."""

    UNIQUE = 0
    """Unique mode generates a unique ID for each inserted dataset, e.g.
    auto-generated by the database or a random UUID.
    """

    DATAID_TYPE = 1
    """In this mode the ID is computed deterministically from a combination
    of dataset type and dataId.
    """

    DATAID_TYPE_RUN = 2
    """In this mode the ID is computed deterministically from a combination
    of dataset type, dataId, and run collection name.
    """


class DatasetIdFactory:
    """Factory for dataset IDs (UUIDs).

    For now the logic is hard-coded and is controlled by the user-provided
    value of `DatasetIdGenEnum`. In the future we may implement configurable
    logic that can guess the `DatasetIdGenEnum` value from other parameters.
    """

    NS_UUID = uuid.UUID("840b31d9-05cd-5161-b2c8-00d32b280d0f")
    """Namespace UUID used for UUID5 generation. Do not change. This was
    produced by ``uuid.uuid5(uuid.NAMESPACE_DNS, "lsst.org")``.
    """

    def makeDatasetId(
        self,
        run: str,
        datasetType: DatasetType,
        dataId: DataCoordinate,
        idGenerationMode: DatasetIdGenEnum,
    ) -> uuid.UUID:
        """Generate a dataset ID for a dataset.

        Parameters
        ----------
        run : `str`
            Name of the RUN collection for the dataset.
        datasetType : `DatasetType`
            Dataset type.
        dataId : `DataCoordinate`
            Expanded data ID for the dataset.
        idGenerationMode : `DatasetIdGenEnum`
            ID generation option. `~DatasetIdGenEnum.UNIQUE` makes a random,
            time-ordered UUID (UUIDv7). `~DatasetIdGenEnum.DATAID_TYPE` makes
            a deterministic UUID5-type ID based on a dataset type name and
            ``dataId``. `~DatasetIdGenEnum.DATAID_TYPE_RUN` makes a
            deterministic UUID5-type ID based on a dataset type name, run
            collection name, and ``dataId``.

        Returns
        -------
        datasetId : `uuid.UUID`
            Dataset identifier.
        """
        if idGenerationMode is DatasetIdGenEnum.UNIQUE:
            # Earlier versions of this code used UUIDv4. However, totally
            # random IDs create problems for Postgres insert performance,
            # because it scatters index updates randomly around the disk.
            # UUIDv7 has similar uniqueness properties to v4, but IDs
            # generated at the same time are close together in the index.
            return generate_uuidv7()
        else:
            # WARNING: If you modify this code make sure that the order of
            # items in the `items` list below never changes.
            items: list[tuple[str, str]] = []
            if idGenerationMode is DatasetIdGenEnum.DATAID_TYPE:
                items = [
                    ("dataset_type", datasetType.name),
                ]
            elif idGenerationMode is DatasetIdGenEnum.DATAID_TYPE_RUN:
                items = [
                    ("dataset_type", datasetType.name),
                    ("run", run),
                ]
            else:
                raise ValueError(f"Unexpected ID generation mode: {idGenerationMode}")

            for name, value in sorted(dataId.required.items()):
                items.append((name, str(value)))
            data = ",".join(f"{key}={value}" for key, value in items)
            return uuid.uuid5(self.NS_UUID, data)
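

# A minimal usage sketch (illustrative only, not part of the API contract;
# assumes a ``DatasetType`` ``dt`` and an expanded ``DataCoordinate``
# ``data_id`` are in scope):
#
#     factory = DatasetIdFactory()
#     dataset_id = factory.makeDatasetId(
#         "HSC/runs/example", dt, data_id, DatasetIdGenEnum.DATAID_TYPE_RUN
#     )
#     # Deterministic modes always yield the same UUID5 for the same inputs.
#     assert dataset_id == factory.makeDatasetId(
#         "HSC/runs/example", dt, data_id, DatasetIdGenEnum.DATAID_TYPE_RUN
#     )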


# This is constant, so don't recreate a set for each instance.
_serializedDatasetRefFieldsSet = {"id", "datasetType", "dataId", "run", "component"}


class SerializedDatasetRef(pydantic.BaseModel):
    """Simplified model of a `DatasetRef` suitable for serialization."""

    id: uuid.UUID
    datasetType: SerializedDatasetType | None = None
    dataId: SerializedDataCoordinate | None = None
    run: StrictStr | None = None
    component: StrictStr | None = None

    # Cannot use an "after" validator since in some cases the validator
    # seems to trigger with the datasetType field not yet set.
    @pydantic.model_validator(mode="before")  # type: ignore[attr-defined]
    @classmethod
    def check_consistent_parameters(cls, data: dict[str, Any]) -> dict[str, Any]:
        has_datasetType = data.get("datasetType") is not None
        has_dataId = data.get("dataId") is not None
        if has_datasetType is not has_dataId:
            raise ValueError("If specifying datasetType or dataId, must specify both.")

        if data.get("component") is not None and has_datasetType:
            raise ValueError("datasetType cannot be set if component is given.")
        return data

    @classmethod
    def direct(
        cls,
        *,
        id: str,
        run: str,
        datasetType: dict[str, Any] | None = None,
        dataId: dict[str, Any] | None = None,
        component: str | None = None,
    ) -> SerializedDatasetRef:
        """Construct a `SerializedDatasetRef` directly without validators.

        Parameters
        ----------
        id : `str`
            The UUID in string form.
        run : `str`
            The run for this dataset.
        datasetType : `dict` [`str`, `typing.Any`], optional
            A representation of the dataset type.
        dataId : `dict` [`str`, `typing.Any`], optional
            A representation of the data ID.
        component : `str` or `None`, optional
            Any component associated with this ref.

        Returns
        -------
        serialized : `SerializedDatasetRef`
            A Pydantic model representing the given parameters.

        Notes
        -----
        This differs from the pydantic "construct" method in that the
        arguments are explicitly what the model requires, and it will recurse
        through members, constructing them from their corresponding `direct`
        methods.

        The ``id`` parameter is a string representation of the dataset ID; it
        is converted to `~uuid.UUID` by this method.

        This method should only be called when the inputs are trusted.
        """
        serialized_datasetType = (
            SerializedDatasetType.direct(**datasetType) if datasetType is not None else None
        )
        serialized_dataId = SerializedDataCoordinate.direct(**dataId) if dataId is not None else None

        node = cls.model_construct(
            _fields_set=_serializedDatasetRefFieldsSet,
            id=uuid.UUID(id),
            datasetType=serialized_datasetType,
            dataId=serialized_dataId,
            run=sys.intern(run),
            component=component,
        )

        return node
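
    # Hedged usage sketch (field values are illustrative): ``direct`` bypasses
    # validation, so it should only be fed trusted, already-serialized data.
    #
    #     serialized = SerializedDatasetRef.direct(
    #         id="3f28a8a6-4a04-5b6e-9a0f-2f3d2f0e2a11",
    #         run="example/run",
    #     )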


DatasetId: TypeAlias = uuid.UUID
"""A type-annotation alias for dataset ID providing typing flexibility.
"""


@immutable
class DatasetRef:
    """Reference to a Dataset in a `Registry`.

    A `DatasetRef` may point to a Dataset that does not yet exist
    (e.g., because it is a predicted input for provenance).

    Parameters
    ----------
    datasetType : `DatasetType`
        The `DatasetType` for this Dataset.
    dataId : `DataCoordinate`
        A mapping of dimensions that labels the Dataset within a Collection.
    run : `str`
        The name of the run this dataset was associated with when it was
        created.
    id : `DatasetId`, optional
        The unique identifier assigned when the dataset is created. If ``id``
        is not specified, a new unique ID will be created.
    conform : `bool`, optional
        If `True` (default), call `DataCoordinate.standardize` to ensure that
        the data ID's dimensions are consistent with the dataset type's.
        `DatasetRef` instances for which those dimensions are not equal should
        not be created in new code, but are still supported for backwards
        compatibility. New code should only pass `False` if it can guarantee
        that the dimensions are already consistent.
    id_generation_mode : `DatasetIdGenEnum`
        ID generation option. `~DatasetIdGenEnum.UNIQUE` makes a random,
        time-ordered UUID (UUIDv7). `~DatasetIdGenEnum.DATAID_TYPE` makes a
        deterministic UUID5-type ID based on a dataset type name and
        ``dataId``. `~DatasetIdGenEnum.DATAID_TYPE_RUN` makes a
        deterministic UUID5-type ID based on a dataset type name, run
        collection name, and ``dataId``.
    datastore_records : `DatasetDatastoreRecords` or `None`
        Datastore records to attach.

    Notes
    -----
    See also :ref:`daf_butler_organizing_datasets`.
    """

    _serializedType: ClassVar[type[pydantic.BaseModel]] = SerializedDatasetRef
    __slots__ = (
        "_id",
        "datasetType",
        "dataId",
        "run",
        "_datastore_records",
    )

    def __init__(
        self,
        datasetType: DatasetType,
        dataId: DataCoordinate,
        run: str,
        *,
        id: DatasetId | None = None,
        conform: bool = True,
        id_generation_mode: DatasetIdGenEnum = DatasetIdGenEnum.UNIQUE,
        datastore_records: DatasetDatastoreRecords | None = None,
    ):
        self.datasetType = datasetType
        if conform:
            self.dataId = DataCoordinate.standardize(dataId, dimensions=datasetType.dimensions)
        else:
            self.dataId = dataId
        self.run = run
        if id is not None:
            self._id = id.int
        else:
            self._id = (
                DatasetIdFactory()
                .makeDatasetId(self.run, self.datasetType, self.dataId, id_generation_mode)
                .int
            )
        self._datastore_records = datastore_records
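
    # Construction sketch (illustrative; assumes ``butler`` is a configured
    # ``Butler`` providing a dataset type and a dimension universe, and the
    # data ID values are made up):
    #
    #     dataset_type = butler.get_dataset_type("calexp")
    #     data_id = DataCoordinate.standardize(
    #         {"instrument": "HSC", "visit": 903334, "detector": 16},
    #         universe=butler.dimensions,
    #     )
    #     ref = DatasetRef(dataset_type, data_id, run="HSC/runs/example")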

    @property
    def id(self) -> DatasetId:
        """Primary key of the dataset (`DatasetId`).

        Cannot be changed after a `DatasetRef` is constructed.
        """
        return uuid.UUID(int=self._id)

    def __eq__(self, other: Any) -> bool:
        try:
            return (self.datasetType, self.dataId, self.id) == (other.datasetType, other.dataId, other.id)
        except AttributeError:
            return NotImplemented

    def __hash__(self) -> int:
        return hash((self.datasetType, self.dataId, self.id))

    @property
    def dimensions(self) -> DimensionGroup:
        """Dimensions associated with the underlying `DatasetType`."""
        return self.datasetType.dimensions

    def __repr__(self) -> str:
        # We delegate to __str__ (i.e., use "!s") for the data ID below
        # because DataCoordinate's __repr__ - while adhering to the
        # guidelines for __repr__ - is much harder for users to read, while
        # its __str__ just produces a dict that can also be passed to
        # DatasetRef's constructor.
        return f"DatasetRef({self.datasetType!r}, {self.dataId!s}, run={self.run!r}, id={self.id})"

    def __str__(self) -> str:
        s = (
            f"{self.datasetType.name}@{self.dataId!s} [sc={self.datasetType.storageClass_name}]"
            f" (run={self.run} id={self.id})"
        )
        return s

    def __lt__(self, other: Any) -> bool:
        # Sort by run, then DatasetType name, then DataCoordinate.
        # The __str__ representation is probably close enough, but we
        # need to ensure that sorting a DatasetRef matches what you would
        # get if you sorted DatasetType+DataCoordinate.
        if not isinstance(other, type(self)):
            return NotImplemented

        # Group by run if defined; it takes precedence over DatasetType.
        self_run = "" if self.run is None else self.run
        other_run = "" if other.run is None else other.run

        # Compare tuples in the priority order.
        return (self_run, self.datasetType, self.dataId) < (other_run, other.datasetType, other.dataId)

    def to_simple(self, minimal: bool = False) -> SerializedDatasetRef:
        """Convert this class to a simple python type.

        This makes it suitable for serialization.

        Parameters
        ----------
        minimal : `bool`, optional
            Use minimal serialization. Requires Registry to convert
            back to a full type.

        Returns
        -------
        simple : `SerializedDatasetRef`
            The object converted to a serializable pydantic model.
        """
        if minimal:
            # The only thing needed to uniquely define a DatasetRef is its id
            # so that can be used directly if it is not a component DatasetRef.
            # Store it in a dict to allow us to easily add the planned origin
            # information later without having to support an int and dict in
            # simple form.
            simple: dict[str, Any] = {"id": self.id}
            if self.isComponent():
                # We can still be a little minimalist with a component
                # but we will also need to record the datasetType component.
                simple["component"] = self.datasetType.component()
            return SerializedDatasetRef(**simple)

        return SerializedDatasetRef(
            datasetType=self.datasetType.to_simple(minimal=minimal),
            dataId=self.dataId.to_simple(),
            run=self.run,
            id=self.id,
        )
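
    # Serialization round-trip sketch (illustrative; assumes ``ref`` and a
    # matching ``universe`` are in scope). ``to_simple`` yields a pydantic
    # model, so the standard pydantic JSON methods apply:
    #
    #     json_str = ref.to_simple().model_dump_json()
    #     simple = SerializedDatasetRef.model_validate_json(json_str)
    #     same_ref = DatasetRef.from_simple(simple, universe=universe)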

    @classmethod
    def from_simple(
        cls,
        simple: SerializedDatasetRef,
        universe: DimensionUniverse | None = None,
        registry: Registry | None = None,
        datasetType: DatasetType | None = None,
    ) -> DatasetRef:
        """Construct a new object from simplified form.

        Generally this is data returned from the `to_simple` method.

        Parameters
        ----------
        simple : `SerializedDatasetRef`
            The value returned by `to_simple()`.
        universe : `DimensionUniverse`, optional
            The universe of all known dimensions.
            Can be `None` if a registry is provided.
        registry : `lsst.daf.butler.Registry`, optional
            Registry to use to convert simple form of a DatasetRef to
            a full `DatasetRef`. Can be `None` if a full description of
            the type is provided along with a universe.
        datasetType : `DatasetType`, optional
            If supplied, this will be used as the dataset type of the
            resulting `DatasetRef` instead of being read from the
            `SerializedDatasetRef`. This is useful when many refs share the
            same type, as memory can be saved. Defaults to `None`.

        Returns
        -------
        ref : `DatasetRef`
            Newly-constructed object.
        """
        cache = PersistenceContextVars.datasetRefs.get()
        key = simple.id.int
        if cache is not None and (ref := cache.get(key, None)) is not None:
            if datasetType is not None:
                if (component := datasetType.component()) is not None:
                    ref = ref.makeComponentRef(component)
                ref = ref.overrideStorageClass(datasetType.storageClass_name)
                return ref
            if simple.datasetType is not None:
                _, component = DatasetType.splitDatasetTypeName(simple.datasetType.name)
                if component is not None:
                    ref = ref.makeComponentRef(component)
                if simple.datasetType.storageClass is not None:
                    ref = ref.overrideStorageClass(simple.datasetType.storageClass)
                return ref
            # If dataset type is not given ignore the cache, because we can't
            # reliably return the right storage class.
        # Minimalist component will just specify component and id and
        # require registry to reconstruct.
        if simple.datasetType is None and simple.dataId is None and simple.run is None:
            if registry is None:
                raise ValueError("Registry is required to construct component DatasetRef from id")
            if simple.id is None:
                raise ValueError("For minimal DatasetRef the ID must be defined.")
            ref = registry.getDataset(simple.id)
            if ref is None:
                raise RuntimeError(f"No matching dataset found in registry for id {simple.id}")
            if simple.component:
                ref = ref.makeComponentRef(simple.component)
        else:
            if universe is None:
                if registry is None:
                    raise ValueError("One of universe or registry must be provided.")
                universe = registry.dimensions
            if datasetType is None:
                if simple.datasetType is None:
                    raise ValueError("Cannot determine Dataset type of this serialized class")
                datasetType = DatasetType.from_simple(
                    simple.datasetType, universe=universe, registry=registry
                )
            if simple.dataId is None:
                # mypy
                raise ValueError("The DataId must be specified to construct a DatasetRef")
            dataId = DataCoordinate.from_simple(simple.dataId, universe=universe)
            # Check that the simple ref is resolved.
            if simple.run is None:
                dstr = ""
                if simple.datasetType is None:
                    dstr = f" (datasetType={datasetType.name!r})"
                raise ValueError(
                    "Run collection name is missing from serialized representation. "
                    f"Encountered with {simple!r}{dstr}."
                )
            ref = cls(
                datasetType,
                dataId,
                id=simple.id,
                run=simple.run,
            )
        if cache is not None:
            if ref.datasetType.component() is not None:
                cache[key] = ref.makeCompositeRef()
            else:
                cache[key] = ref
        return ref

    to_json = to_json_pydantic
    from_json: ClassVar[Callable[..., Self]] = cast(Callable[..., Self], classmethod(from_json_pydantic))

    @classmethod
    def _unpickle(
        cls,
        datasetType: DatasetType,
        dataId: DataCoordinate,
        id: DatasetId,
        run: str,
        datastore_records: DatasetDatastoreRecords | None,
    ) -> DatasetRef:
        """Create a new `DatasetRef`.

        A custom factory method for use by `__reduce__` as a workaround for
        its lack of support for keyword arguments.
        """
        return cls(datasetType, dataId, id=id, run=run, datastore_records=datastore_records)

    def __reduce__(self) -> tuple:
        return (
            self._unpickle,
            (self.datasetType, self.dataId, self.id, self.run, self._datastore_records),
        )

    def __deepcopy__(self, memo: dict) -> DatasetRef:
        # DatasetRef is recursively immutable; see note in @immutable
        # decorator.
        return self

    def expanded(self, dataId: DataCoordinate) -> DatasetRef:
        """Return a new `DatasetRef` with the given expanded data ID.

        Parameters
        ----------
        dataId : `DataCoordinate`
            Data ID for the new `DatasetRef`. Must compare equal to the
            original data ID.

        Returns
        -------
        ref : `DatasetRef`
            A new `DatasetRef` with the given data ID.
        """
        assert dataId == self.dataId
        return DatasetRef(
            datasetType=self.datasetType,
            dataId=dataId,
            id=self.id,
            run=self.run,
            conform=False,
            datastore_records=self._datastore_records,
        )

    def isComponent(self) -> bool:
        """Indicate whether this `DatasetRef` refers to a component.

        Returns
        -------
        isComponent : `bool`
            `True` if this `DatasetRef` is a component, `False` otherwise.
        """
        return self.datasetType.isComponent()

    def isComposite(self) -> bool:
        """Indicate whether this `DatasetRef` is a composite type.

        Returns
        -------
        isComposite : `bool`
            `True` if this `DatasetRef` is a composite type, `False`
            otherwise.
        """
        return self.datasetType.isComposite()

    def _lookupNames(self) -> tuple[LookupKey, ...]:
        """Name keys to use when looking up this DatasetRef in a configuration.

        The names are returned in order of priority.

        Returns
        -------
        names : `tuple` of `LookupKey`
            Tuple of the `DatasetType` name and the `StorageClass` name.
            If ``instrument`` is defined in the dataId, each of those names
            is added to the start of the tuple with a key derived from the
            value of ``instrument``.
        """
        # Special case the instrument Dimension since we allow configs
        # to include the instrument name in the hierarchy.
        names: tuple[LookupKey, ...] = self.datasetType._lookupNames()

        if "instrument" in self.dataId:
            names = tuple(n.clone(dataId={"instrument": self.dataId["instrument"]}) for n in names) + names

        return names

    @staticmethod
    def groupByType(refs: Iterable[DatasetRef]) -> NamedKeyDict[DatasetType, list[DatasetRef]]:
        """Group an iterable of `DatasetRef` by `DatasetType`.

        Parameters
        ----------
        refs : `~collections.abc.Iterable` [ `DatasetRef` ]
            `DatasetRef` instances to group.

        Returns
        -------
        grouped : `NamedKeyDict` [ `DatasetType`, `list` [ `DatasetRef` ] ]
            Grouped `DatasetRef` instances.

        Notes
        -----
        When lazy item-iterables are acceptable instead of a full mapping,
        `iter_by_type` can in some cases be far more efficient.
        """
        result: NamedKeyDict[DatasetType, list[DatasetRef]] = NamedKeyDict()
        for ref in refs:
            result.setdefault(ref.datasetType, []).append(ref)
        return result

    @staticmethod
    def iter_by_type(
        refs: Iterable[DatasetRef],
    ) -> Iterable[tuple[DatasetType, Iterable[DatasetRef]]]:
        """Group an iterable of `DatasetRef` by `DatasetType` with special
        hooks for custom iterables that can do this efficiently.

        Parameters
        ----------
        refs : `~collections.abc.Iterable` [ `DatasetRef` ]
            `DatasetRef` instances to group. If this satisfies the
            `_DatasetRefGroupedIterable` protocol, its
            `~_DatasetRefGroupedIterable._iter_by_dataset_type` method will
            be called.

        Returns
        -------
        grouped : `~collections.abc.Iterable` [ `tuple` [ `DatasetType`, \
                `~collections.abc.Iterable` [ `DatasetRef` ] ] ]
            Grouped `DatasetRef` instances.
        """
        if isinstance(refs, _DatasetRefGroupedIterable):
            return refs._iter_by_dataset_type()
        return DatasetRef.groupByType(refs).items()
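
    # Grouping sketch (illustrative; ``refs`` is any iterable of DatasetRef):
    #
    #     for dataset_type, refs_of_type in DatasetRef.iter_by_type(refs):
    #         print(dataset_type.name, len(list(refs_of_type)))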

    def makeCompositeRef(self) -> DatasetRef:
        """Create a `DatasetRef` of the composite from a component ref.

        Requires that this `DatasetRef` is a component.

        Returns
        -------
        ref : `DatasetRef`
            A `DatasetRef` with a dataset type that corresponds to the
            composite parent of this component, and the same ID and run.
        """
        # Assume that the data ID does not need to be standardized
        # and should match whatever this ref already has.
        return DatasetRef(
            self.datasetType.makeCompositeDatasetType(),
            self.dataId,
            id=self.id,
            run=self.run,
            conform=False,
            datastore_records=self._datastore_records,
        )

    def makeComponentRef(self, name: str) -> DatasetRef:
        """Create a `DatasetRef` that corresponds to a component.

        Parameters
        ----------
        name : `str`
            Name of the component.

        Returns
        -------
        ref : `DatasetRef`
            A `DatasetRef` with a dataset type that corresponds to the given
            component, and the same ID and run.
        """
        # Assume that the data ID does not need to be standardized
        # and should match whatever this ref already has.
        return DatasetRef(
            self.datasetType.makeComponentDatasetType(name),
            self.dataId,
            id=self.id,
            run=self.run,
            conform=False,
            datastore_records=self._datastore_records,
        )
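
    # Component sketch (illustrative; assumes ``ref`` points to a composite
    # dataset with a "wcs" component). A component ref shares the parent's
    # ID, data ID, and run:
    #
    #     wcs_ref = ref.makeComponentRef("wcs")
    #     assert wcs_ref.isComponent() and wcs_ref.id == ref.id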

    def overrideStorageClass(self, storageClass: str | StorageClass) -> DatasetRef:
        """Create a new `DatasetRef` from this one, but with a modified
        `DatasetType` that has a different `StorageClass`.

        Parameters
        ----------
        storageClass : `str` or `StorageClass`
            The new storage class.

        Returns
        -------
        modified : `DatasetRef`
            A new dataset reference that is the same as the current one but
            with a different storage class in the `DatasetType`.
        """
        return self.replace(storage_class=storageClass)

    def replace(
        self,
        *,
        id: DatasetId | None = None,
        run: str | None = None,
        storage_class: str | StorageClass | None = None,
        datastore_records: DatasetDatastoreRecords | None | Literal[False] = False,
    ) -> DatasetRef:
        """Create a new `DatasetRef` from this one, but with some modified
        attributes.

        Parameters
        ----------
        id : `DatasetId` or `None`
            If not `None`, update the dataset ID.
        run : `str` or `None`
            If not `None`, update the run collection name. If ``id`` is
            `None`, this will also cause a new dataset ID to be generated.
        storage_class : `str` or `StorageClass` or `None`
            The new storage class. If not `None`, replaces the existing
            storage class.
        datastore_records : `DatasetDatastoreRecords` or `None`
            New datastore records. If `None`, remove all records. By default
            datastore records are preserved.

        Returns
        -------
        modified : `DatasetRef`
            A new dataset reference with updated attributes.
        """
        if datastore_records is False:
            datastore_records = self._datastore_records
        if storage_class is None:
            datasetType = self.datasetType
        else:
            datasetType = self.datasetType.overrideStorageClass(storage_class)
        if run is None:
            run = self.run
            # Do not regenerate dataset ID if run is the same.
            if id is None:
                id = self.id
        return DatasetRef(
            datasetType=datasetType,
            dataId=self.dataId,
            run=run,
            id=id,
            conform=False,
            datastore_records=datastore_records,
        )
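
    # Replacement sketch (illustrative): passing a new run without an ID
    # leaves ``id=None``, so the constructor generates a fresh UUID.
    #
    #     moved = ref.replace(run="other/run")
    #     assert moved.id != ref.id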

    def is_compatible_with(self, other: DatasetRef) -> bool:
        """Determine if the given `DatasetRef` is compatible with this one.

        Parameters
        ----------
        other : `DatasetRef`
            Dataset ref to check.

        Returns
        -------
        is_compatible : `bool`
            Returns `True` if the other dataset ref is either the same as
            this one, or its dataset type is compatible with this one's and
            the data ID, dataset ID, and run match.

        Notes
        -----
        Compatibility requires that the data ID, dataset ID, and run match
        and that the `DatasetType` is compatible. Compatibility is defined as
        the storage class associated with the dataset type of the other ref
        being convertible to this storage class.

        Specifically this means that if you have done:

        .. code-block:: py

            new_ref = ref.overrideStorageClass(sc)

        and this is successful, then the guarantee is that:

        .. code-block:: py

            assert ref.is_compatible_with(new_ref) is True

        since we know that the python type associated with the new ref can
        be converted to the original python type. The reverse is not
        guaranteed and depends on whether bidirectional converters have been
        registered.
        """
        if self.id != other.id:
            return False
        if self.dataId != other.dataId:
            return False
        if self.run != other.run:
            return False
        return self.datasetType.is_compatible_with(other.datasetType)

    datasetType: DatasetType
    """The definition of this dataset (`DatasetType`).

    Cannot be changed after a `DatasetRef` is constructed.
    """

    dataId: DataCoordinate
    """A mapping of `Dimension` primary key values that labels the dataset
    within a Collection (`DataCoordinate`).

    Cannot be changed after a `DatasetRef` is constructed.
    """

    run: str
    """The name of the run that produced the dataset.

    Cannot be changed after a `DatasetRef` is constructed.
    """

    _datastore_records: DatasetDatastoreRecords | None
    """Optional datastore records (`DatasetDatastoreRecords`).

    Cannot be changed after a `DatasetRef` is constructed.
    """


class MinimalistSerializableDatasetRef(pydantic.BaseModel):
    """Minimal information needed to define a DatasetRef.

    The ID is not included and is presumed to be the key to a mapping
    to this information.
    """

    model_config = pydantic.ConfigDict(frozen=True)

    dataset_type_name: str
    """Name of the dataset type."""

    run: str
    """Name of the RUN collection."""

    data_id: SerializedDataId
    """Data coordinate of this dataset."""

    def to_dataset_ref(
        self,
        id: DatasetId,
        *,
        dataset_type: DatasetType,
        universe: DimensionUniverse,
        attacher: DimensionDataAttacher | None = None,
    ) -> DatasetRef:
        """Convert serialized object to a `DatasetRef`.

        Parameters
        ----------
        id : `DatasetId`
            UUID identifying the dataset.
        dataset_type : `DatasetType`
            `DatasetType` record corresponding to the dataset type name in
            the serialized object.
        universe : `DimensionUniverse`
            Dimension universe for the dataset.
        attacher : `DimensionDataAttacher`, optional
            If provided, will be used to add dimension records to the
            deserialized `DatasetRef` instance.

        Returns
        -------
        ref : `DatasetRef`
            The deserialized object.
        """
        assert dataset_type.name == self.dataset_type_name, (
            "Given DatasetType does not match the serialized dataset type name"
        )
        simple_data_id = SerializedDataCoordinate(dataId=self.data_id)
        data_id = DataCoordinate.from_simple(simple=simple_data_id, universe=universe)
        if attacher:
            data_ids = attacher.attach(dataset_type.dimensions, [data_id])
            data_id = data_ids[0]
        return DatasetRef(
            id=id,
            run=self.run,
            datasetType=dataset_type,
            dataId=data_id,
        )

    @staticmethod
    def from_dataset_ref(ref: DatasetRef) -> MinimalistSerializableDatasetRef:
        """Serialize a `DatasetRef` to a simplified format.

        Parameters
        ----------
        ref : `DatasetRef`
            `DatasetRef` object to serialize.

        Returns
        -------
        minimal : `MinimalistSerializableDatasetRef`
            Simplified form of the given `DatasetRef`.
        """
        return MinimalistSerializableDatasetRef(
            dataset_type_name=ref.datasetType.name, run=ref.run, data_id=dict(ref.dataId.mapping)
        )
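

# Round-trip sketch (illustrative; assumes ``ref`` and a matching
# ``universe`` are in scope):
#
#     minimal = MinimalistSerializableDatasetRef.from_dataset_ref(ref)
#     restored = minimal.to_dataset_ref(
#         ref.id, dataset_type=ref.datasetType, universe=universe
#     )
#     assert restored == ref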


class SerializedDatasetRefContainer(pydantic.BaseModel):
    """Serializable model for a collection of DatasetRef.

    Dimension records are not included.
    """

    model_config = pydantic.ConfigDict(extra="allow", frozen=True)
    container_version: str


class SerializedDatasetRefContainerV1(SerializedDatasetRefContainer):
    """Serializable model for a collection of DatasetRef.

    Dimension records are included only if they were attached to every ref
    being serialized.
    """

    container_version: Literal["V1"] = "V1"

    universe_version: int
    """Dimension universe version."""

    universe_namespace: str
    """Dimension universe namespace."""

    dataset_types: dict[str, SerializedDatasetType]
    """Dataset types indexed by their name."""

    compact_refs: dict[uuid.UUID, MinimalistSerializableDatasetRef]
    """Minimal dataset ref information indexed by UUID."""

    dimension_records: SerializableDimensionData | None = None
    """Dimension record information."""

    def __len__(self) -> int:
        """Return the number of datasets in the container."""
        return len(self.compact_refs)

    @classmethod
    def from_refs(cls, refs: Iterable[DatasetRef]) -> Self:
        """Construct a serializable form from a list of `DatasetRef`.

        Parameters
        ----------
        refs : `~collections.abc.Iterable` [ `DatasetRef` ]
            The datasets to include in the container.

        Returns
        -------
        container : `SerializedDatasetRefContainerV1`
            Serializable container holding the given refs.
        """
        # The serialized DatasetRef contains a lot of duplicated information.
        # Dimension records are stored only if they are attached to every
        # ref; otherwise we assume the records are already in the registry.
        universe: DimensionUniverse | None = None
        dataset_types: dict[str, SerializedDatasetType] = {}
        compact_refs: dict[uuid.UUID, MinimalistSerializableDatasetRef] = {}
        data_ids: list[DataCoordinate] = []
        dimensions: list[DimensionGroup] = []
        for ref in refs:
            if universe is None:
                universe = ref.datasetType.dimensions.universe
            if (name := ref.datasetType.name) not in dataset_types:
                dataset_types[name] = ref.datasetType.to_simple()
            compact_refs[ref.id] = MinimalistSerializableDatasetRef.from_dataset_ref(ref)
            if ref.dataId.hasRecords():
                dimensions.append(ref.datasetType.dimensions)
                data_ids.append(ref.dataId)

        # Extract dimension record metadata if present.
        dimension_records = None
        if data_ids and len(compact_refs) == len(data_ids):
            dimension_group = DimensionGroup.union(*dimensions, universe=universe)

            # Records were attached to all refs. Store them.
            extractor = DimensionDataExtractor.from_dimension_group(
                dimension_group,
                ignore_cached=False,
                include_skypix=False,
            )
            extractor.update(data_ids)
            dimension_records = SerializableDimensionData.from_record_sets(extractor.records.values())

        if universe:
            universe_version = universe.version
            universe_namespace = universe.namespace
        else:
            # No refs so no universe.
            universe_version = 0
            universe_namespace = "unknown"
        return cls(
            universe_version=universe_version,
            universe_namespace=universe_namespace,
            dataset_types=dataset_types,
            compact_refs=compact_refs,
            dimension_records=dimension_records,
        )

    def to_refs(self, universe: DimensionUniverse) -> list[DatasetRef]:
        """Construct the original `DatasetRef` objects.

        Parameters
        ----------
        universe : `DimensionUniverse`
            The universe to use when constructing the `DatasetRef`.

        Returns
        -------
        refs : `list` [ `DatasetRef` ]
            The `DatasetRef` objects that were serialized.
        """
        if not self.compact_refs:
            return []

        if universe.namespace != self.universe_namespace:
            raise RuntimeError(
                f"Cannot convert to refs in universe {universe.namespace} that were created from "
                f"universe {self.universe_namespace}"
            )

        if universe.version != self.universe_version:
            _LOG.warning(
                "Universe mismatch when attempting to reconstruct DatasetRef from serialized form. "
                "Serialized with version %d but asked to use version %d.",
                self.universe_version,
                universe.version,
            )

        # Reconstruct the DatasetType objects.
        dataset_types = {
            name: DatasetType.from_simple(dtype, universe=universe)
            for name, dtype in self.dataset_types.items()
        }

        # Dimension records can be attached if available.
        # We assume that all dimension information was stored.
        attacher = None
        if self.dimension_records:
            attacher = DimensionDataAttacher(
                deserializers=self.dimension_records.make_deserializers(universe)
            )

        refs: list[DatasetRef] = []
        for id_, minimal in self.compact_refs.items():
            ref = minimal.to_dataset_ref(
                id_,
                dataset_type=dataset_types[minimal.dataset_type_name],
                universe=universe,
                attacher=attacher,
            )
            refs.append(ref)
        return refs
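
    # Container round-trip sketch (illustrative; assumes ``refs`` and a
    # matching ``universe`` are in scope):
    #
    #     container = SerializedDatasetRefContainerV1.from_refs(refs)
    #     json_str = container.model_dump_json()
    #     rebuilt = SerializedDatasetRefContainerV1.model_validate_json(json_str)
    #     original_refs = rebuilt.to_refs(universe=universe)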


SerializedDatasetRefContainers: TypeAlias = Annotated[
    SerializedDatasetRefContainerV1,
    pydantic.Field(discriminator="container_version"),
]