Coverage report for python/lsst/daf/butler/datastore/record_data.py: 27%
79 statements
« prev ^ index » next — coverage.py v7.13.5, created at 2026-04-14 23:37 +0000
1# This file is part of daf_butler.
2#
3# Developed for the LSST Data Management System.
4# This product includes software developed by the LSST Project
5# (http://www.lsst.org).
6# See the COPYRIGHT file at the top-level directory of this distribution
7# for details of code ownership.
8#
9# This software is dual licensed under the GNU General Public License and also
10# under a 3-clause BSD license. Recipients may choose which of these licenses
11# to use; please see the files gpl-3.0.txt and/or bsd_license.txt,
12# respectively. If you choose the GPL option then the following text applies
13# (but note that there is still no warranty even if you opt for BSD instead):
14#
15# This program is free software: you can redistribute it and/or modify
16# it under the terms of the GNU General Public License as published by
17# the Free Software Foundation, either version 3 of the License, or
18# (at your option) any later version.
19#
20# This program is distributed in the hope that it will be useful,
21# but WITHOUT ANY WARRANTY; without even the implied warranty of
22# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
23# GNU General Public License for more details.
24#
25# You should have received a copy of the GNU General Public License
26# along with this program. If not, see <http://www.gnu.org/licenses/>.
28"""Support for generic data stores."""
30from __future__ import annotations
32__all__ = ("DatastoreRecordData", "SerializedDatastoreRecordData")
34import dataclasses
35import uuid
36from collections.abc import Mapping
37from typing import TYPE_CHECKING, TypeAlias
39import pydantic
41from .._dataset_ref import DatasetId
42from ..dimensions import DimensionUniverse
43from ..persistence_context import PersistenceContextVars
44from .stored_file_info import StoredDatastoreItemInfo
46if TYPE_CHECKING:
47 from ..registry import Registry
49# Pydantic requires the possible value types to be explicitly enumerated in
50# order for `uuid.UUID` in particular to work. `typing.Any` does not work
51# here.
52_Record: TypeAlias = dict[str, int | str | None]
class SerializedDatastoreRecordData(pydantic.BaseModel):
    """Representation of a `DatastoreRecordData` suitable for serialization."""

    dataset_ids: list[uuid.UUID]
    """List of dataset IDs"""

    records: Mapping[str, Mapping[str, Mapping[str, list[_Record]]]]
    """Mapping of records indexed by record class name, dataset ID (encoded
    as str, because JSON), and opaque table name.
    """

    @classmethod
    def direct(
        cls,
        *,
        dataset_ids: list[str | uuid.UUID],
        records: dict[str, dict[str, dict[str, list[_Record]]]],
    ) -> SerializedDatastoreRecordData:
        """Construct a `SerializedDatastoreRecordData` directly without
        validators.

        Parameters
        ----------
        dataset_ids : `list` [`str` or `uuid.UUID`]
            The dataset UUIDs.
        records : `dict`
            The datastore records.

        Returns
        -------
        serialized : `SerializedDatastoreRecordData`
            Model constructed without validation.

        Notes
        -----
        This differs from the pydantic "construct" method in that the
        arguments are explicitly what the model requires, and it will recurse
        through members, constructing them from their corresponding `direct`
        methods.

        This method should only be called when the inputs are trusted.
        """
        return cls.model_construct(
            _fields_set={"dataset_ids", "records"},
            # JSON serializes UUIDs as strings; convert them back here since
            # model_construct skips pydantic validation.  (Avoid shadowing
            # the builtin `id` in the comprehension.)
            dataset_ids=[
                uuid.UUID(dataset_id) if isinstance(dataset_id, str) else dataset_id
                for dataset_id in dataset_ids
            ],
            records=records,
        )
@dataclasses.dataclass
class DatastoreRecordData:
    """A struct that represents a tabular data export from a single
    datastore.
    """

    records: dict[DatasetId, dict[str, list[StoredDatastoreItemInfo]]] = dataclasses.field(
        default_factory=dict
    )
    """Opaque table data, indexed by dataset ID and grouped by opaque table
    name."""

    @staticmethod
    def merge_mappings(*args: Mapping[str, DatastoreRecordData]) -> dict[str, DatastoreRecordData]:
        """Merge mappings of datastore record data.

        Parameters
        ----------
        *args : `~collections.abc.Mapping` [ `str`, `DatastoreRecordData` ]
            Mappings of record data, keyed by datastore name.

        Returns
        -------
        merged : `dict` [ `str`, `DatastoreRecordData` ]
            Merged mapping of record data, keyed by datastore name.
        """
        result: dict[str, DatastoreRecordData] = {}
        for arg in args:
            for datastore_name, record_data in arg.items():
                # Lazily create an accumulator per datastore, then merge this
                # mapping's records into it; for any (dataset_id, table_name)
                # combination the first occurrence wins (see `update`).
                result.setdefault(datastore_name, DatastoreRecordData()).update(record_data)
        return result

    def update(self, other: DatastoreRecordData) -> None:
        """Update contents of this instance with data from another instance.

        Parameters
        ----------
        other : `DatastoreRecordData`
            Records to merge into this instance.

        Notes
        -----
        If a ``(dataset_id, table_name)`` combination has any records in
        ``self``, it is assumed that all records for that combination are
        already present. This allows duplicates of the same dataset to be
        handled gracefully.
        """
        for dataset_id, table_records in other.records.items():
            this_table_records = self.records.setdefault(dataset_id, {})
            for table_name, records in table_records.items():
                # If this (dataset_id, table_name) combination already has
                # records in `self`, we assume that means all of the records
                # for that combination; we require other code to ensure entire
                # (parent) datasets are exported to these data structures
                # (never components).
                if not (this_records := this_table_records.setdefault(table_name, [])):
                    this_records.extend(records)

    def subset(self, dataset_ids: set[DatasetId]) -> DatastoreRecordData | None:
        """Extract a subset of the records that match given dataset IDs.

        Parameters
        ----------
        dataset_ids : `set` [ `DatasetId` ]
            Dataset IDs to match.

        Returns
        -------
        record_data : `DatastoreRecordData` or `None`
            `None` is returned if there are no matching refs.

        Notes
        -----
        Records in the returned instance are shared with this instance, clients
        should not update or extend records in the returned instance.
        """
        matching_records: dict[DatasetId, dict[str, list[StoredDatastoreItemInfo]]] = {}
        for dataset_id in dataset_ids:
            if (id_records := self.records.get(dataset_id)) is not None:
                matching_records[dataset_id] = id_records
        return DatastoreRecordData(records=matching_records) if matching_records else None

    def to_simple(self, minimal: bool = False) -> SerializedDatastoreRecordData:
        """Make representation of the object for serialization.

        Implements `~lsst.daf.butler.json.SupportsSimple` protocol.

        Parameters
        ----------
        minimal : `bool`, optional
            If True produce minimal representation, not used by this method.

        Returns
        -------
        simple : `SerializedDatastoreRecordData`
            Representation of this instance suitable for serialization.
        """
        records: dict[str, dict[str, dict[str, list[_Record]]]] = {}
        for dataset_id, table_data in self.records.items():
            for table_name, table_records in table_data.items():
                class_name, infos = StoredDatastoreItemInfo.to_records(table_records)
                class_records = records.setdefault(class_name, {})
                # JSON object keys must be strings, so the dataset ID is
                # stored in its hex form.
                dataset_records = class_records.setdefault(dataset_id.hex, {})
                dataset_records.setdefault(table_name, []).extend(dict(info) for info in infos)
        return SerializedDatastoreRecordData(dataset_ids=list(self.records.keys()), records=records)

    @classmethod
    def from_simple(
        cls,
        simple: SerializedDatastoreRecordData,
        universe: DimensionUniverse | None = None,
        registry: Registry | None = None,
    ) -> DatastoreRecordData:
        """Make an instance of this class from serialized data.

        Implements `~lsst.daf.butler.json.SupportsSimple` protocol.

        Parameters
        ----------
        simple : `SerializedDatastoreRecordData`
            Serialized representation returned from `to_simple` method.
        universe : `DimensionUniverse`, optional
            Dimension universe, not used by this method.
        registry : `Registry`, optional
            Registry instance, not used by this method.

        Returns
        -------
        record_data : `DatastoreRecordData`
            De-serialized instance of `DatastoreRecordData`.

        Raises
        ------
        RuntimeError
            Raised if a record class name in ``simple`` does not refer to a
            `StoredDatastoreItemInfo` subclass.
        """
        cache = PersistenceContextVars.dataStoreRecords.get()
        key = frozenset(simple.dataset_ids)
        if cache is not None and (cachedRecord := cache.get(key)) is not None:
            return cachedRecord
        records: dict[DatasetId, dict[str, list[StoredDatastoreItemInfo]]] = {}
        # Make sure that all dataset IDs appear in the dict even if they don't
        # have records.
        for dataset_id in simple.dataset_ids:
            records[dataset_id] = {}
        for class_name, class_data in simple.records.items():
            for dataset_id_str, dataset_data in class_data.items():
                for table_name, table_records in dataset_data.items():
                    try:
                        infos = StoredDatastoreItemInfo.from_records(class_name, table_records)
                    except TypeError as exc:
                        raise RuntimeError(
                            "The class specified in the SerializedDatastoreRecordData "
                            f"({class_name}) is not a StoredDatastoreItemInfo."
                        ) from exc
                    dataset_records = records.setdefault(uuid.UUID(dataset_id_str), {})
                    dataset_records.setdefault(table_name, []).extend(infos)
        newRecord = cls(records=records)
        if cache is not None:
            cache[key] = newRecord
        return newRecord