Coverage for python/lsst/daf/butler/dimensions/_record_set.py: 22%
283 statements
# This file is part of daf_butler.
#
# Developed for the LSST Data Management System.
# This product includes software developed by the LSST Project
# (http://www.lsst.org).
# See the COPYRIGHT file at the top-level directory of this distribution
# for details of code ownership.
#
# This software is dual licensed under the GNU General Public License and also
# under a 3-clause BSD license. Recipients may choose which of these licenses
# to use; please see the files gpl-3.0.txt and/or bsd_license.txt,
# respectively. If you choose the GPL option then the following text applies
# (but note that there is still no warranty even if you opt for BSD instead):
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.

from __future__ import annotations

__all__ = (
    "DimensionDataAttacher",
    "DimensionDataExtractor",
    "DimensionRecordFactory",
    "DimensionRecordSet",
    "DimensionRecordSetDeserializer",
    "SerializableDimensionData",
)

import dataclasses
from collections.abc import Collection, Iterable, Iterator
from typing import TYPE_CHECKING, Any, Protocol, Self, TypeAlias, final

import pydantic

from ._coordinate import DataCoordinate, DataIdValue
from ._records import DimensionRecord, SerializedKeyValueDimensionRecord

if TYPE_CHECKING:
    from ..queries import Query
    from ._elements import DimensionElement
    from ._group import DimensionGroup
    from ._skypix import SkyPixDimension
    from ._universe import DimensionUniverse
    from .record_cache import DimensionRecordCache


SerializedDimensionRecordSetMapping: TypeAlias = dict[str, list[SerializedKeyValueDimensionRecord]]


class DimensionRecordFactory(Protocol):
    """Protocol for a callback that can be used to create a dimension record
    to add to a `DimensionRecordSet` when a search for an existing one fails.
    """

    def __call__(
        self, record_class: type[DimensionRecord], required_values: tuple[DataIdValue, ...]
    ) -> DimensionRecord:
        """Make a new `DimensionRecord` instance.

        Parameters
        ----------
        record_class : `type` [ `DimensionRecord` ]
            A concrete `DimensionRecord` subclass.
        required_values : `tuple`
            Tuple of data ID values, corresponding to
            ``record_class.definition.required``.
        """
        ...  # pragma: no cover


def fail_record_lookup(
    record_class: type[DimensionRecord], required_values: tuple[DataIdValue, ...]
) -> DimensionRecord:
    """Raise `LookupError` to indicate that a `DimensionRecord` could not be
    found or created.

    This is intended for use as the default value for arguments that take a
    `DimensionRecordFactory` callback.

    Parameters
    ----------
    record_class : `type` [ `DimensionRecord` ]
        Type of record to create.
    required_values : `tuple`
        Tuple of data ID required values that are sufficient to identify a
        record that exists in the data repository.

    Returns
    -------
    record : `DimensionRecord`
        Never returned; this function always raises `LookupError`.
    """
    raise LookupError(
        f"No {record_class.definition.name!r} record with data ID "
        f"{DataCoordinate.from_required_values(record_class.definition.minimal_group, required_values)}."
    )


@final
class DimensionRecordSet(Collection[DimensionRecord]):  # numpydoc ignore=PR01
    """A mutable set-like container specialized for `DimensionRecord` objects.

    Parameters
    ----------
    element : `DimensionElement` or `str`, optional
        The dimension element that defines the records held by this set. If
        not a `DimensionElement` instance, ``universe`` must be provided.
    records : `~collections.abc.Iterable` [ `DimensionRecord` ], optional
        Dimension records to add to the set.
    universe : `DimensionUniverse`, optional
        Object that defines all dimensions. Ignored if ``element`` is a
        `DimensionElement` instance.

    Notes
    -----
    `DimensionRecordSet` maintains its insertion order (like `dict`, and unlike
    `set`).

    `DimensionRecordSet` implements `collections.abc.Collection` but not
    `collections.abc.Set` because the latter would require interoperability
    with all other `~collections.abc.Set` implementations rather than just
    `DimensionRecordSet`, and that adds a lot of complexity without much clear
    value. To help make this clear to type checkers it implements only the
    named-method versions of these operations (e.g. `issubset`) rather than the
    operator special methods (e.g. ``__le__``).

    `DimensionRecord` equality is defined in terms of a record's data ID fields
    only, and `DimensionRecordSet` does not generally specify which record
    "wins" when two records with the same data ID interact (e.g. in
    `intersection`). The `add` and `update` methods are notable exceptions:
    they always replace the existing record with the new one.

    Dimension records can also be held by `DimensionRecordTable`, which
    provides column-oriented access and Arrow interoperability.
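
    Examples
    --------
    A minimal sketch of construction and membership tests, assuming
    ``universe`` is a `DimensionUniverse` and ``record`` is a
    `DimensionRecord` for a (hypothetical here) ``detector`` element:

    >>> detectors = DimensionRecordSet("detector", universe=universe)
    >>> detectors.add(record)
    >>> assert record in detectors
    >>> assert record.dataId in detectors  # data IDs can be tested too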
145 """
147 def __init__(
148 self,
149 element: DimensionElement | str,
150 records: Iterable[DimensionRecord] = (),
151 universe: DimensionUniverse | None = None,
152 *,
153 _by_required_values: dict[tuple[DataIdValue, ...], DimensionRecord] | None = None,
154 ):
155 if isinstance(element, str):
156 if universe is None:
157 raise TypeError("'universe' must be provided if 'element' is not a DimensionElement.")
158 element = universe[element]
159 else:
160 universe = element.universe
161 if _by_required_values is None:
162 _by_required_values = {}
163 self._record_type = element.RecordClass
164 self._by_required_values = _by_required_values
165 self._dimensions = element.minimal_group
166 self.update(records)
168 @property
169 def element(self) -> DimensionElement:
170 """Name of the dimension element these records correspond to."""
        return self._record_type.definition

    def __contains__(self, key: object) -> bool:
        match key:
            case DimensionRecord() if key.definition == self.element:
                required_values = key.dataId.required_values
            case DataCoordinate() if key.dimensions == self.element.minimal_group:
                required_values = key.required_values
            case _:
                return False
        return required_values in self._by_required_values

    def __len__(self) -> int:
        return len(self._by_required_values)

    def __iter__(self) -> Iterator[DimensionRecord]:
        return iter(self._by_required_values.values())

    def __eq__(self, other: object) -> bool:
        if not isinstance(other, DimensionRecordSet):
            return False
        return (
            self._record_type is other._record_type
            and self._by_required_values.keys() == other._by_required_values.keys()
        )

    def __repr__(self) -> str:
        lines = [f"DimensionRecordSet({self.element.name}, {{"]
        for record in self:
            lines.append(f"    {record!r},")
        lines.append("})")
        return "\n".join(lines)

    def issubset(self, other: DimensionRecordSet) -> bool:
        """Test whether all elements in ``self`` are in ``other``.

        Parameters
        ----------
        other : `DimensionRecordSet`
            Another record set with the same record type.

        Returns
        -------
        issubset : `bool`
            Whether all elements in ``self`` are in ``other``.
        """
        if self._record_type is not other._record_type:
            raise ValueError(
                "Invalid comparison between dimension record sets for elements "
                f"{self.element.name!r} and {other.element.name!r}."
            )
        return self._by_required_values.keys() <= other._by_required_values.keys()

    def issuperset(self, other: DimensionRecordSet) -> bool:
        """Test whether all elements in ``other`` are in ``self``.

        Parameters
        ----------
        other : `DimensionRecordSet`
            Another record set with the same record type.

        Returns
        -------
        issuperset : `bool`
            Whether all elements in ``other`` are in ``self``.
        """
        if self._record_type is not other._record_type:
            raise ValueError(
                "Invalid comparison between dimension record sets for elements "
                f"{self.element.name!r} and {other.element.name!r}."
            )
        return self._by_required_values.keys() >= other._by_required_values.keys()

    def isdisjoint(self, other: DimensionRecordSet) -> bool:
        """Test whether the intersection of ``self`` and ``other`` is empty.

        Parameters
        ----------
        other : `DimensionRecordSet`
            Another record set with the same record type.

        Returns
        -------
        isdisjoint : `bool`
            Whether the intersection of ``self`` and ``other`` is empty.
        """
        if self._record_type is not other._record_type:
            raise ValueError(
                "Invalid comparison between dimension record sets for elements "
                f"{self.element.name!r} and {other.element.name!r}."
            )
        return self._by_required_values.keys().isdisjoint(other._by_required_values.keys())

    def intersection(self, other: DimensionRecordSet) -> DimensionRecordSet:
        """Return a new set with only records that are in both ``self`` and
        ``other``.

        Parameters
        ----------
        other : `DimensionRecordSet`
            Another record set with the same record type.

        Returns
        -------
        intersection : `DimensionRecordSet`
            A new record set with all elements in both sets.
        """
        if self._record_type is not other._record_type:
            raise ValueError(
                "Invalid intersection between dimension record sets for elements "
                f"{self.element.name!r} and {other.element.name!r}."
            )
        return DimensionRecordSet(
            self.element,
            _by_required_values={
                k: v for k, v in self._by_required_values.items() if k in other._by_required_values
            },
        )

    def difference(self, other: DimensionRecordSet) -> DimensionRecordSet:
        """Return a new set with only records that are in ``self`` and not in
        ``other``.

        Parameters
        ----------
        other : `DimensionRecordSet`
            Another record set with the same record type.

        Returns
        -------
        difference : `DimensionRecordSet`
            A new record set with all elements in ``self`` that are not in
            ``other``.
304 """
305 if self._record_type is not other._record_type:
306 raise ValueError(
307 "Invalid difference between dimension record sets for elements "
308 f"{self.element.name!r} and {other.element.name!r}."
309 )
310 return DimensionRecordSet(
311 self.element,
312 _by_required_values={
313 k: v for k, v in self._by_required_values.items() if k not in other._by_required_values
314 },
315 )
317 def union(self, other: DimensionRecordSet) -> DimensionRecordSet:
318 """Return a new set with all records that are either in ``self`` or
319 ``other``.
321 Parameters
322 ----------
323 other : `DimensionRecordSet`
324 Another record set with the same record type.
326 Returns
327 -------
        union : `DimensionRecordSet`
            A new record set with all elements in either set.
        """
        if self._record_type is not other._record_type:
            raise ValueError(
                "Invalid union between dimension record sets for elements "
                f"{self.element.name!r} and {other.element.name!r}."
            )
        return DimensionRecordSet(
            self.element,
            _by_required_values=self._by_required_values | other._by_required_values,
        )

    def find(
        self,
        data_id: DataCoordinate,
        or_add: DimensionRecordFactory = fail_record_lookup,
    ) -> DimensionRecord:
        """Return the record with the given data ID.

        Parameters
        ----------
        data_id : `DataCoordinate`
            Data ID to match.
        or_add : `DimensionRecordFactory`
            Callback that is invoked if no existing record is found, to create
            a new record that is added to the set and returned. The return
            value of this callback is *not* checked to see if it is a valid
            dimension record with the right element and data ID.

        Returns
        -------
        record : `DimensionRecord`
            Matching record.

        Raises
        ------
        LookupError
            Raised (by the default ``or_add`` callback, `fail_record_lookup`)
            if no record with this data ID was found.
        ValueError
            Raised if the data ID did not have the right dimensions.
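
        Examples
        --------
        A sketch of both lookup modes, assuming ``record_set``, ``data_id``,
        and ``default_record`` (all hypothetical names) are a populated set, a
        matching `DataCoordinate`, and a pre-built `DimensionRecord`:

        >>> record = record_set.find(data_id)  # raises LookupError if absent
        >>> # Fall back to a pre-built record; it is added to the set:
        >>> record = record_set.find(data_id, or_add=lambda cls, values: default_record)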
369 """
370 if data_id.dimensions != self._dimensions:
371 raise ValueError(
372 f"data ID {data_id} has incorrect dimensions for dimension records for {self.element!r}."
373 )
374 return self.find_with_required_values(data_id.required_values, or_add)
376 def find_with_required_values(
377 self, required_values: tuple[DataIdValue, ...], or_add: DimensionRecordFactory = fail_record_lookup
378 ) -> DimensionRecord:
379 """Return the record whose data ID has the given required values.
381 Parameters
382 ----------
383 required_values : `tuple` [ `int` or `str` ]
384 Data ID values to match.
385 or_add : `DimensionRecordFactory`
386 Callback that is invoked if no existing record is found, to create
387 a new record that is added to the set and returned. The return
388 value of this callback is *not* checked to see if it is a valid
389 dimension record with the right element and data ID.
391 Returns
392 -------
393 record : `DimensionRecord`
394 Matching record.
396 Raises
397 ------
        LookupError
            Raised (by the default ``or_add`` callback, `fail_record_lookup`)
            if no record with these required values was found.
400 """
401 if (result := self._by_required_values.get(required_values)) is None:
402 result = or_add(self._record_type, required_values)
403 self._by_required_values[required_values] = result
404 return result
406 def add(self, value: DimensionRecord, replace: bool = True) -> None:
407 """Add a new record to the set.
409 Parameters
410 ----------
411 value : `DimensionRecord`
412 Record to add.
413 replace : `bool`, optional
414 If `True` (default) replace any existing record with the same data
415 ID. If `False` the existing record will be kept.
417 Raises
418 ------
419 ValueError
            Raised if ``value.definition`` does not match ``self.element``.
421 """
422 if value.definition.name != self.element:
423 raise ValueError(
424 f"Cannot add record {value} for {value.definition.name!r} to set for {self.element!r}."
425 )
426 if replace:
427 self._by_required_values[value.dataId.required_values] = value
428 else:
429 self._by_required_values.setdefault(value.dataId.required_values, value)
431 def update(self, values: Iterable[DimensionRecord], replace: bool = True) -> None:
432 """Add new records to the set.
434 Parameters
435 ----------
436 values : `~collections.abc.Iterable` [ `DimensionRecord` ]
437 Records to add.
438 replace : `bool`, optional
439 If `True` (default) replace any existing records with the same data
440 IDs. If `False` the existing records will be kept.
442 Raises
443 ------
444 ValueError
            Raised if any record's element does not match ``self.element``.
446 """
447 for value in values:
448 self.add(value, replace=replace)
450 def update_from_data_coordinates(self, data_coordinates: Iterable[DataCoordinate]) -> None:
451 """Add records to the set by extracting and deduplicating them from
452 data coordinates.
454 Parameters
455 ----------
456 data_coordinates : `~collections.abc.Iterable` [ `DataCoordinate` ]
457 Data coordinates to extract from. `DataCoordinate.hasRecords` must
458 be `True`.
459 """
460 for data_coordinate in data_coordinates:
461 if record := data_coordinate._record(self.element.name):
462 self._by_required_values[record.dataId.required_values] = record
464 def discard(self, value: DimensionRecord | DataCoordinate) -> None:
465 """Remove a record if it exists.
467 Parameters
468 ----------
469 value : `DimensionRecord` or `DataCoordinate`
470 Record to remove, or its data ID.
471 """
472 if isinstance(value, DimensionRecord):
473 value = value.dataId
474 if value.dimensions != self._dimensions:
475 raise ValueError(f"{value} has incorrect dimensions for dimension records for {self.element!r}.")
476 self._by_required_values.pop(value.required_values, None)
478 def remove(self, value: DimensionRecord | DataCoordinate) -> None:
479 """Remove a record.
481 Parameters
482 ----------
483 value : `DimensionRecord` or `DataCoordinate`
484 Record to remove, or its data ID.
486 Raises
487 ------
488 KeyError
489 Raised if there is no matching record.
490 """
491 if isinstance(value, DimensionRecord):
492 value = value.dataId
493 if value.dimensions != self._dimensions:
494 raise ValueError(f"{value} has incorrect dimensions for dimension records for {self.element!r}.")
495 del self._by_required_values[value.required_values]
497 def pop(self) -> DimensionRecord:
498 """Remove and return an arbitrary record."""
499 return self._by_required_values.popitem()[1]
501 def __deepcopy__(self, memo: dict[str, Any]) -> DimensionRecordSet:
502 return DimensionRecordSet(self.element, _by_required_values=self._by_required_values.copy())
504 def serialize_records(self) -> list[SerializedKeyValueDimensionRecord]:
505 """Serialize the records to a list.
507 Returns
508 -------
509 raw_records : `list` [ `list` ]
510 Serialized records, in the form returned by
511 `DimensionRecord.serialize_key_value`.
513 Notes
514 -----
515 This does not include the dimension element shared by all of the
516 records, on the assumption that this is usually more conveniently saved
517 separately (e.g. as the key of a dictionary of which the list of
518 records is a value).
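
        Examples
        --------
        A sketch of a round trip through the serialized form, assuming
        ``record_set`` is a populated `DimensionRecordSet`:

        >>> raw = record_set.serialize_records()
        >>> restored = DimensionRecordSet(record_set.element)
        >>> restored.deserialize_records(raw)
        >>> assert restored == record_set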
519 """
520 return [record.serialize_key_value() for record in self]
522 def deserialize_records(self, raw_records: Iterable[SerializedKeyValueDimensionRecord]) -> None:
523 """Deserialize records and add them to this set.
525 Parameters
526 ----------
527 raw_records : `~collections.abc.Iterable` [ `list` ]
528 Serialized records, as returned by `serialize_records` or repeated
529 calls to `DimensionRecord.serialize_key_value`.
531 Notes
532 -----
533 The caller is responsible for ensuring that the serialized records have
534 the same dimension element as this set, as this cannot be checked.
535 Mismatches will probably result in a (confusing) type-validation error,
536 but are not guaranteed to.
537 """
538 deserializer = DimensionRecordSetDeserializer.from_raw(self.element, raw_records)
539 self.update(deserializer)
542class DimensionRecordSetDeserializer:
543 """A helper class for deserializing sets of dimension records, with support
544 for only fully deserializing certain records.

    The `from_raw` factory method should generally be used instead of calling
    the constructor directly.

    Parameters
    ----------
    element : `DimensionElement`
        Dimension element that defines all records.
    mapping : `dict` [ `tuple`, `list` ]
        A dictionary that maps the data ID required-values `tuple` for each
        record to the remainder of its raw serialization (i.e. an item in this
        `dict` is a pair returned by `DimensionRecord.deserialize_key`). This
        `dict` will be used directly to back the deserializer, not copied.

    Notes
    -----
    The keys (data ID required-values tuples) of all rows are deserialized
    immediately, but the remaining fields are deserialized only on demand; use
    `__iter__` to deserialize all records or `__getitem__` to deserialize only
    a few. An instance should really only be used for a single iteration or
    multiple `__getitem__` calls, as each call will re-deserialize the records
    in play; deserialized records are not cached.

    The caller is responsible for ensuring that the serialized records are for
    the given dimension element, as this cannot be checked. Mismatches will
    probably result in a (confusing) type-validation error, but are not
    guaranteed to.
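
    Examples
    --------
    A sketch of lazy deserialization, assuming ``element`` is a
    `DimensionElement`, ``raw_records`` came from
    `DimensionRecordSet.serialize_records`, and ``(1,)`` is a hypothetical
    required-values key:

    >>> deserializer = DimensionRecordSetDeserializer.from_raw(element, raw_records)
    >>> one_record = deserializer[(1,)]  # fully deserialize a single record
    >>> all_records = list(deserializer)  # fully deserialize everything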
572 """
574 def __init__(
575 self,
576 element: DimensionElement,
577 mapping: dict[tuple[DataIdValue, ...], SerializedKeyValueDimensionRecord],
578 ):
579 self.element = element
580 self._mapping = mapping
582 @classmethod
583 def from_raw(
584 cls, element: DimensionElement, raw_records: Iterable[SerializedKeyValueDimensionRecord]
585 ) -> Self:
586 """Construct from raw serialized records.
588 Parameters
589 ----------
590 element : `DimensionElement`
591 Dimension element that defines all records.
592 raw_records : `~collections.abc.Iterable` [ `list` ]
593 Serialized records, as returned by
594 `DimensionRecordSet.serialize_records` or repeated calls to
595 `DimensionRecord.serialize_key_value`.
597 Returns
598 -------
599 deserializer : `DimensionRecordSetDeserializer`
600 New deserializer instance.
601 """
602 return cls(element=element, mapping=dict(map(element.RecordClass.deserialize_key, raw_records)))

    def __len__(self) -> int:
        return len(self._mapping)

    def __iter__(self) -> Iterator[DimensionRecord]:
        deserialize = self.element.RecordClass.deserialize_value
        return (deserialize(k, v) for k, v in self._mapping.items())

    def __getitem__(self, key: tuple[DataIdValue, ...]) -> DimensionRecord:
        return self.element.RecordClass.deserialize_value(key, self._mapping[key])


@dataclasses.dataclass
class DimensionDataExtractor:
    """A helper class for extracting dimension records from expanded data IDs
    (e.g. for normalized serialization).

    Instances of this class must be initialized with empty record sets
    (usually via one of the factory class methods) for all of the dimension
    elements that should be extracted from the data IDs passed to `update`.
    Dimension elements not included will not be extracted (which may be
    useful).
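
    Examples
    --------
    A sketch that pulls ``visit`` and ``detector`` records (hypothetical
    element names) out of expanded data IDs, assuming ``universe`` is a
    `DimensionUniverse` and ``expanded_data_ids`` have records attached:

    >>> extractor = DimensionDataExtractor.from_element_names(["visit", "detector"], universe)
    >>> extractor.update(expanded_data_ids)
    >>> visit_records = extractor.records["visit"]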
625 """
627 records: dict[str, DimensionRecordSet] = dataclasses.field(default_factory=dict)
629 @classmethod
630 def from_element_names(
631 cls, element_names: Iterable[str], universe: DimensionUniverse
632 ) -> DimensionDataExtractor:
633 """Construct from an iterable of dimension element names.
635 Parameters
636 ----------
637 element_names : `~collections.abc.Iterable` [ `str` ]
638 Names of dimension elements to include.
639 universe : `DimensionUniverse`
640 Definitions of all dimensions.
642 Returns
643 -------
644 extractor : `DimensionDataExtractor`
645 New extractor.
646 """
647 return cls(
648 records={
649 element_name: DimensionRecordSet(element_name, universe=universe)
650 for element_name in element_names
651 }
652 )
654 @classmethod
655 def from_dimension_group(
656 cls,
657 dimensions: DimensionGroup,
658 *,
659 ignore: Iterable[str] = (),
660 ignore_cached: bool = False,
661 include_skypix: bool = False,
662 ) -> DimensionDataExtractor:
663 """Construct from a `DimensionGroup` and a set of dimension element
664 names to ignore.
666 Parameters
667 ----------
668 dimensions : `DimensionGroup`
            Dimensions that span the set of elements whose records are to be
            extracted.
        ignore : `~collections.abc.Iterable` [ `str` ], optional
            Names of dimension elements that should not be extracted.
        ignore_cached : `bool`, optional
            If `True`, ignore all dimension elements for which
            `DimensionElement.is_cached` is `True`.
        include_skypix : `bool`, optional
            If `True`, include skypix dimensions. These are ignored by default
            because they can always be recomputed from their IDs on-the-fly.

        Returns
        -------
        extractor : `DimensionDataExtractor`
            New extractor.
        """
        elements = set(dimensions.elements)
        elements.difference_update(ignore)
        if ignore_cached:
            elements.difference_update([e for e in elements if dimensions.universe[e].is_cached])
        if not include_skypix:
            elements.difference_update(dimensions.skypix)
        return cls.from_element_names(elements, universe=dimensions.universe)

    def update(self, data_ids: Iterable[DataCoordinate]) -> None:
        """Extract dimension records from an iterable of data IDs.

        Parameters
        ----------
        data_ids : `~collections.abc.Iterable` [ `DataCoordinate` ]
            Data IDs to extract dimension records from.
        """
        for data_id in data_ids:
            for element in data_id.dimensions.elements & self.records.keys():
                if (record := data_id.records[element]) is not None:
                    self.records[element].add(record)


class SerializableDimensionData(pydantic.RootModel):
    """A pydantic model for normalized serialization of dimension records.

    While dimension records are serialized directly via this model, they are
    deserialized by constructing a `DimensionRecordSetDeserializer` from this
    model, which allows full validation to be performed only on the records
    that are actually loaded.
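
    Examples
    --------
    A sketch of normalized serialization and two-phase deserialization,
    assuming ``record_sets`` is an iterable of `DimensionRecordSet` objects
    and ``universe`` is their `DimensionUniverse`:

    >>> model = SerializableDimensionData.from_record_sets(record_sets)
    >>> json_str = model.model_dump_json()
    >>> restored = SerializableDimensionData.model_validate_json(json_str)
    >>> deserializers = restored.make_deserializers(universe)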
714 """
716 root: dict[str, list[SerializedKeyValueDimensionRecord]] = pydantic.Field(default_factory=dict)
718 @classmethod
719 def from_record_sets(cls, record_sets: Iterable[DimensionRecordSet]) -> SerializableDimensionData:
720 """Construct from an iterable of `DimensionRecordSet` objects.
722 Parameters
723 ----------
724 record_sets : `~collections.abc.Iterable` [ `DimensionRecordSet` ]
725 Sets of dimension records, each for a different dimension element.
727 Returns
728 -------
729 model : `SerializableDimensionData`
730 New model instance.
731 """
732 return cls.model_construct(
733 root={record_set.element.name: record_set.serialize_records() for record_set in record_sets}
734 )
736 def make_deserializers(self, universe: DimensionUniverse) -> list[DimensionRecordSetDeserializer]:
737 """Make objects from this model that handle the second phase of
738 deserialization.
740 Parameters
741 ----------
742 universe : `DimensionUniverse`
743 Definitions of all dimensions.
745 Returns
746 -------
747 deserializers : `list` [ `DimensionRecordSetDeserializer` ]
            A list of deserializer objects, one for each dimension element.
749 """
750 return [
751 DimensionRecordSetDeserializer.from_raw(universe[element_name], raw_records)
752 for element_name, raw_records in self.root.items()
753 ]
756class DimensionDataAttacher:
757 """A helper class for attaching dimension records to data IDs.
759 Parameters
760 ----------
    records : `~collections.abc.Iterable` [ `DimensionRecordSet` ], optional
        Regular dimension record sets; stored internally keyed by dimension
        element name. Not copied, and may be modified in-place.
    deserializers : `~collections.abc.Iterable` [ `DimensionRecordSetDeserializer` ], optional
        Partially-deserialized dimension records; stored internally keyed by
        dimension element name. Records will be fully deserialized on demand
        and then cached.
    cache : `DimensionRecordCache`, optional
        A cache of dimension records from a butler instance. If present, this
        is assumed to have records for elements that are not in ``records`` and
        ``deserializers``.
    dimensions : `DimensionGroup`, optional
        Dimensions for which empty record sets should be added when no other
        source of records is given. This allows data IDs with these dimensions
        to have records attached by fetching them via the ``query`` argument
        to the ``attach`` method, or by computing regions on the skypix
        dimensions.
    """

    def __init__(
        self,
        *,
        records: Iterable[DimensionRecordSet] = (),
        deserializers: Iterable[DimensionRecordSetDeserializer] = (),
        cache: DimensionRecordCache | None = None,
        dimensions: DimensionGroup | None = None,
    ):
        self.records = {record_set.element.name: record_set for record_set in records}
        self.deserializers: dict[str, DimensionRecordSetDeserializer] = {}
        for deserializer in deserializers:
            self.deserializers[deserializer.element.name] = deserializer
            if deserializer.element.name not in self.records:
                self.records[deserializer.element.name] = DimensionRecordSet(deserializer.element)
        self.cache = cache
        if dimensions is not None:
            for element in dimensions.elements:
                if element not in self.records and (self.cache is None or element not in self.cache):
                    self.records[element] = DimensionRecordSet(element, universe=dimensions.universe)

    def attach(
        self, dimensions: DimensionGroup, data_ids: Iterable[DataCoordinate], query: Query | None = None
    ) -> list[DataCoordinate]:
        """Attach dimension records to data IDs.

        Parameters
        ----------
        dimensions : `DimensionGroup`
            Dimensions of all given data IDs. All dimension elements must have
            been referenced in at least one of the constructor arguments.
        data_ids : `~collections.abc.Iterable` [ `DataCoordinate` ]
            Data IDs to attach dimension records to (not in place; data
            coordinates are immutable).
        query : `.queries.Query`, optional
            A butler query that can be used to look up missing dimension
            records. Records fetched via query are cached in the ``records``
            attribute.

        Returns
        -------
        expanded : `list` [ `DataCoordinate` ]
            Data IDs with dimension records attached, in the same order as the
            original iterable.
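
        Examples
        --------
        A sketch, assuming ``dimensions``, ``data_ids``, and a butler
        ``query`` are in hand, and that the attacher is constructed with
        ``dimensions=dimensions`` so missing records can be fetched:

        >>> attacher = DimensionDataAttacher(dimensions=dimensions)
        >>> expanded = attacher.attach(dimensions, data_ids, query=query)
        >>> assert all(data_id.hasRecords() for data_id in expanded)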
822 """
823 lookup_helpers = [
824 _DimensionRecordLookupHelper.build(dimensions, element_name, self)
825 for element_name in dimensions.lookup_order
826 ]
827 records = [_InProgressRecordDicts(data_id) for data_id in data_ids]
828 for lookup_helper in lookup_helpers:
829 for r in records:
830 lookup_helper.lookup(r)
831 incomplete = lookup_helper.incomplete_records
832 if incomplete:
833 if query is not None:
834 lookup_helper.fetch_missing(query)
835 # We may still be missing records at this point, if they
836 # were not available in the database.
837 # This is intentional, because in existing Butler
838 # repositories dimension records are not always fully
839 # populated. (For example, it is common for a visit to
840 # exist without corresponding visit_detector_region
841 # records, since these are populated at different times
842 # by different processes.)
843 else:
844 raise LookupError(
845 f"No dimension record for element '{lookup_helper.element}' "
846 f"for data ID {incomplete[0].data_id}. "
847 f"{len(incomplete)} data ID{' was' if len(incomplete) == 1 else 's were'} "
848 "missing at least one record."
849 )
851 return [r.data_id.expanded(r.done) for r in records]
853 def serialized(
854 self, *, ignore: Iterable[str] = (), ignore_cached: bool = False, include_skypix: bool = False
855 ) -> SerializableDimensionData:
856 """Serialize all dimension data in this attacher, with deduplication
857 across fully- and partially-deserialized records.
859 Parameters
860 ----------
861 ignore : `~collections.abc.Iterable` [ `str` ], optional
862 Names of dimension elements that should not be serialized.
863 ignore_cached : `bool`, optional
864 If `True`, ignore all dimension elements for which
865 `DimensionElement.is_cached` is `True`.
866 include_skypix : `bool`, optional
867 If `True`, include skypix dimensions. These are ignored by default
868 because they can always be recomputed from their IDs on-the-fly.
870 Returns
871 -------
        serialized : `SerializableDimensionData`
            Serialized dimension records.
        """
        from ._skypix import SkyPixDimension

        ignore = set(ignore)
        result = SerializableDimensionData()
        for record_set in self.records.values():
            if record_set.element.name in ignore:
                continue
            if not include_skypix and isinstance(record_set.element, SkyPixDimension):
                continue
            if ignore_cached and record_set.element.is_cached:
                continue
            serialized_records: dict[tuple[DataIdValue, ...], SerializedKeyValueDimensionRecord] = {}
            if (deserializer := self.deserializers.get(record_set.element.name)) is not None:
                for key, value in deserializer._mapping.items():
                    serialized_record = list(key)
                    serialized_record.extend(value)
                    serialized_records[key] = serialized_record
            for key, record in record_set._by_required_values.items():
                if key not in serialized_records:
                    serialized_records[key] = record.serialize_key_value()
            result.root[record_set.element.name] = list(serialized_records.values())
        if self.cache is not None and not ignore_cached:
            for record_set in self.cache.values():
                result.root[record_set.element.name] = record_set.serialize_records()
        return result


@dataclasses.dataclass
class _InProgressRecordDicts:
    data_id: DataCoordinate
    done: dict[str, DimensionRecord] = dataclasses.field(default_factory=dict)


@dataclasses.dataclass
class _DimensionRecordLookupHelper:
    # These are the indices of the dimension record's data ID's required_values
    # tuple in the to-be-expanded data ID's full-values tuple.
    indices: list[int]
    record_set: DimensionRecordSet
    incomplete_records: list[_InProgressRecordDicts] = dataclasses.field(default_factory=list)

    @property
    def element(self) -> str:
        return self.record_set.element.name

    @staticmethod
    def build(
        dimensions: DimensionGroup, element: str, attacher: DimensionDataAttacher
    ) -> _DimensionRecordLookupHelper:
        indices = [
            dimensions._data_coordinate_indices[k]
            for k in dimensions.universe.elements[element].minimal_group.required
        ]
        if attacher.cache is not None and element in attacher.cache:
            return _DimensionRecordLookupHelper(indices, attacher.cache[element])
        elif element in dimensions.skypix:
            return _SkyPixDimensionRecordLookupHelper(
                indices,
                attacher.records[element],
                dimension=dimensions.universe.skypix_dimensions[element],
            )
        elif element in attacher.deserializers:
            return _DeserializingDimensionRecordLookupHelper(
                indices, attacher.records[element], deserializer=attacher.deserializers[element]
            )
        else:
            return _DimensionRecordLookupHelper(indices, attacher.records[element])

    def lookup(self, records: _InProgressRecordDicts) -> None:
        required_values = self._get_required_values(records)
        if (result := self.record_set._by_required_values.get(required_values)) is None:
            result = self.fallback(required_values)
            if result is not None:
                self.record_set.add(result)
                records.done[self.element] = result
            else:
                self.incomplete_records.append(records)
        else:
            records.done[self.element] = result

    def _get_required_values(self, records: _InProgressRecordDicts) -> tuple[DataIdValue, ...]:
        if records.data_id.hasFull():
            full_values = records.data_id.full_values
            return tuple([full_values[i] for i in self.indices])
        else:
            values = []
            dimensions = self.record_set.element.minimal_group.required
            for dimension in dimensions:
                value = records.data_id.get(dimension)
                if value is None:
                    value = self._find_implied_value(dimension, records)
                values.append(value)
            return tuple(values)

    def _find_implied_value(self, implied_dimension: str, records: _InProgressRecordDicts) -> DataIdValue:
        for rec in records.done.values():
            if implied_dimension in rec.definition.implied:
                return rec.get(implied_dimension)

        raise LookupError(
            f"Implied value for dimension '{implied_dimension}' not found in records for"
            f" {list(records.done.keys())}"
        )

    def fallback(self, required_values: tuple[DataIdValue, ...]) -> DimensionRecord | None:
        return None

    def fetch_missing(self, query: Query) -> None:
        if self.incomplete_records:
            missing_values = set(self._get_required_values(r) for r in self.incomplete_records)
            self.record_set.update(
                query.join_data_coordinates(
                    [
                        DataCoordinate.from_required_values(self.record_set.element.minimal_group, values)
                        for values in missing_values
                    ]
                ).dimension_records(self.record_set.element.name)
            )

            missing = self.incomplete_records
            self.incomplete_records = list()
            for record in missing:
                self.lookup(record)


@dataclasses.dataclass
class _DeserializingDimensionRecordLookupHelper(_DimensionRecordLookupHelper):
    deserializer: DimensionRecordSetDeserializer = dataclasses.field(kw_only=True)

    def fallback(self, required_values: tuple[DataIdValue, ...]) -> DimensionRecord | None:
        try:
            return self.deserializer[required_values]
        except KeyError:
            return None


@dataclasses.dataclass
class _SkyPixDimensionRecordLookupHelper(_DimensionRecordLookupHelper):
    dimension: SkyPixDimension = dataclasses.field(kw_only=True)

    def fallback(self, required_values: tuple[DataIdValue, ...]) -> DimensionRecord:
        id = required_values[0]
        return self.dimension.RecordClass(id=id, region=self.dimension.pixelization.pixel(id))