# This file is part of daf_butler.
#
# Developed for the LSST Data Management System.
# This product includes software developed by the LSST Project
# (http://www.lsst.org).
# See the COPYRIGHT file at the top-level directory of this distribution
# for details of code ownership.
#
# This software is dual licensed under the GNU General Public License and also
# under a 3-clause BSD license. Recipients may choose which of these licenses
# to use; please see the files gpl-3.0.txt and/or bsd_license.txt,
# respectively. If you choose the GPL option then the following text applies
# (but note that there is still no warranty even if you opt for BSD instead):
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.

from __future__ import annotations

__all__ = ("DimensionRecordsAccumulator", "Quantum", "SerializedQuantum")

import sys
from collections.abc import Iterable, Mapping, MutableMapping, Sequence
from typing import Any

import pydantic

from lsst.utils import doImportType

from ._dataset_ref import DatasetRef, SerializedDatasetRef
from ._dataset_type import DatasetType, SerializedDatasetType
from ._named import NamedKeyDict, NamedKeyMapping
from .datastore.record_data import DatastoreRecordData, SerializedDatastoreRecordData
from .dimensions import (
    DataCoordinate,
    DimensionRecord,
    DimensionUniverse,
    SerializedDataCoordinate,
    SerializedDimensionRecord,
)


def _reconstructDatasetRef(
    simple: SerializedDatasetRef,
    type_: DatasetType | None,
    ids: Iterable[int],
    dimensionRecords: dict[int, SerializedDimensionRecord] | None,
    universe: DimensionUniverse,
) -> DatasetRef:
    """Reconstruct a DatasetRef stored in a SerializedQuantum."""
    # Reconstruct the dimension records: if a record has been loaded
    # previously, reuse it; otherwise load it from the dict of serialized
    # DimensionRecords.
    if dimensionRecords is None and ids:
        raise ValueError("Cannot construct from a SerializedQuantum with no dimension records.")
    records = {}
    for dId in ids:
        # Ignore typing here because mypy cannot see that the check above
        # guarantees dimensionRecords is not None whenever this loop runs.
        tmpSerialized = dimensionRecords[dId]  # type: ignore
        records[tmpSerialized.definition] = tmpSerialized
    if simple.dataId is not None:
        simple.dataId.records = records or None
    rebuiltDatasetRef = DatasetRef.from_simple(simple, universe, datasetType=type_)
    return rebuiltDatasetRef
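
# A minimal sketch (hypothetical values) of the indirection this helper
# undoes: each serialized ref carries only integer ids into a shared table
# of serialized dimension records, e.g.
#
#     shared_records = {0: rec_a, 1: rec_b}  # SerializedDimensionRecord
#     ref = _reconstructDatasetRef(simple_ref, dataset_type, [0, 1],
#                                  shared_records, universe)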


class SerializedQuantum(pydantic.BaseModel):
    """Simplified model of a `Quantum` suitable for serialization."""

    taskName: str | None = None
    dataId: SerializedDataCoordinate | None = None
    datasetTypeMapping: Mapping[str, SerializedDatasetType]
    initInputs: Mapping[str, tuple[SerializedDatasetRef, list[int]]]
    inputs: Mapping[str, list[tuple[SerializedDatasetRef, list[int]]]]
    outputs: Mapping[str, list[tuple[SerializedDatasetRef, list[int]]]]
    dimensionRecords: dict[int, SerializedDimensionRecord] | None = None
    datastoreRecords: dict[str, SerializedDatastoreRecordData] | None = None

    @classmethod
    def direct(
        cls,
        *,
        taskName: str | None,
        dataId: dict | None,
        datasetTypeMapping: Mapping[str, dict],
        initInputs: Mapping[str, tuple[dict, list[int]]],
        inputs: Mapping[str, list[tuple[dict, list[int]]]],
        outputs: Mapping[str, list[tuple[dict, list[int]]]],
        dimensionRecords: dict[int, dict] | None,
        datastoreRecords: dict[str, dict] | None,
    ) -> SerializedQuantum:
        """Construct a `SerializedQuantum` directly without validators.

        Parameters
        ----------
        taskName : `str` or `None`
            The name of the task.
        dataId : `dict` or `None`
            The dataId of the quantum.
        datasetTypeMapping : `~collections.abc.Mapping` [`str`, `dict`]
            Dataset type definitions.
        initInputs : `~collections.abc.Mapping`
            The quantum init inputs.
        inputs : `~collections.abc.Mapping`
            The quantum inputs.
        outputs : `~collections.abc.Mapping`
            The quantum outputs.
        dimensionRecords : `dict` [`int`, `dict`] or `None`
            The dimension records.
        datastoreRecords : `dict` [`str`, `dict`] or `None`
            The datastore records.

        Returns
        -------
        quantum : `SerializedQuantum`
            Serializable model of the quantum.

        Notes
        -----
        This differs from the pydantic "construct" method in that the
        arguments are explicitly what the model requires, and it will recurse
        through members, constructing them from their corresponding `direct`
        methods.

        This method should only be called when the inputs are trusted.
        """
        serialized_dataId = SerializedDataCoordinate.direct(**dataId) if dataId is not None else None
        serialized_datasetTypeMapping = {
            k: SerializedDatasetType.direct(**v) for k, v in datasetTypeMapping.items()
        }
        serialized_initInputs = {
            k: (SerializedDatasetRef.direct(**v), refs) for k, (v, refs) in initInputs.items()
        }
        serialized_inputs = {
            k: [(SerializedDatasetRef.direct(**ref), id) for ref, id in v] for k, v in inputs.items()
        }
        serialized_outputs = {
            k: [(SerializedDatasetRef.direct(**ref), id) for ref, id in v] for k, v in outputs.items()
        }
        serialized_records = (
            {int(k): SerializedDimensionRecord.direct(**v) for k, v in dimensionRecords.items()}
            if dimensionRecords is not None
            else None
        )
        serialized_datastore_records = (
            {k: SerializedDatastoreRecordData.direct(**v) for k, v in datastoreRecords.items()}
            if datastoreRecords is not None
            else None
        )

        node = cls.model_construct(
            taskName=sys.intern(taskName or ""),
            dataId=serialized_dataId,
            datasetTypeMapping=serialized_datasetTypeMapping,
            initInputs=serialized_initInputs,
            inputs=serialized_inputs,
            outputs=serialized_outputs,
            dimensionRecords=serialized_records,
            datastoreRecords=serialized_datastore_records,
        )

        return node
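
# A hedged usage sketch: `direct` is meant for rehydrating already-validated
# JSON. The `payload` below is hypothetical; its shape is assumed to match
# what serializing this model produces.
#
#     import json
#
#     data = json.loads(payload)
#     sq = SerializedQuantum.direct(
#         taskName=data["taskName"],
#         dataId=data["dataId"],
#         datasetTypeMapping=data["datasetTypeMapping"],
#         initInputs=data["initInputs"],
#         inputs=data["inputs"],
#         outputs=data["outputs"],
#         dimensionRecords=data["dimensionRecords"],
#         datastoreRecords=data["datastoreRecords"],
#     )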


class Quantum:
    """Class representing a discrete unit of work.

    A Quantum may depend on one or more datasets and produce one or more
    datasets.

    Most Quanta will be executions of a particular ``PipelineTask``’s
    ``runQuantum`` method, but they can also be used to represent discrete
    units of work performed manually by human operators or other software
    agents.

    Parameters
    ----------
    taskName : `str`, optional
        Fully-qualified name of the Task class that executed or will execute
        this Quantum. If not provided, ``taskClass`` must be.
    taskClass : `type`, optional
        The Task class that executed or will execute this Quantum. If not
        provided, ``taskName`` must be. Overrides ``taskName`` if both are
        provided.
    dataId : `DataId`, optional
        The dimension values that identify this `Quantum`.
    initInputs : collection of `DatasetRef`, optional
        Datasets that are needed to construct an instance of the Task. May
        be a flat iterable of `DatasetRef` instances or a mapping from
        `DatasetType` to `DatasetRef`.
    inputs : `~collections.abc.Mapping`, optional
        Inputs identified prior to execution, organized as a mapping from
        `DatasetType` to a list of `DatasetRef`.
    outputs : `~collections.abc.Mapping`, optional
        Outputs from executing this quantum of work, organized as a mapping
        from `DatasetType` to a list of `DatasetRef`.
    datastore_records : `DatastoreRecordData`, optional
        Datastore record data for input or initInput datasets that already
        exist.
    """

    __slots__ = (
        "_taskName",
        "_taskClass",
        "_dataId",
        "_initInputs",
        "_inputs",
        "_outputs",
        "_datastore_records",
    )

    def __init__(
        self,
        *,
        taskName: str | None = None,
        taskClass: type | None = None,
        dataId: DataCoordinate | None = None,
        initInputs: Mapping[DatasetType, DatasetRef] | Iterable[DatasetRef] | None = None,
        inputs: Mapping[DatasetType, Sequence[DatasetRef]] | None = None,
        outputs: Mapping[DatasetType, Sequence[DatasetRef]] | None = None,
        datastore_records: Mapping[str, DatastoreRecordData] | None = None,
    ):
        if taskClass is not None:
            taskName = f"{taskClass.__module__}.{taskClass.__name__}"
        self._taskName = taskName
        self._taskClass = taskClass
        self._dataId = dataId
        if initInputs is None:
            initInputs = {}
        elif not isinstance(initInputs, Mapping):
            initInputs = {ref.datasetType: ref for ref in initInputs}
        if inputs is None:
            inputs = {}
        if outputs is None:
            outputs = {}
        self._initInputs = NamedKeyDict[DatasetType, DatasetRef](initInputs).freeze()
        self._inputs = NamedKeyDict[DatasetType, tuple[DatasetRef, ...]](
            (k, tuple(v)) for k, v in inputs.items()
        ).freeze()
        self._outputs = NamedKeyDict[DatasetType, tuple[DatasetRef, ...]](
            (k, tuple(v)) for k, v in outputs.items()
        ).freeze()
        if datastore_records is None:
            datastore_records = {}
        self._datastore_records = datastore_records
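
    # A minimal construction sketch; the refs, dataset types, and data ID
    # below are hypothetical placeholders, not values this module provides:
    #
    #     quantum = Quantum(
    #         taskName="lsst.example.ExampleTask",
    #         dataId=data_id,                  # a DataCoordinate
    #         initInputs=[schema_ref],         # flat iterable form
    #         inputs={input_type: [ref_a, ref_b]},
    #         outputs={output_type: [out_ref]},
    #     )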

    def to_simple(self, accumulator: DimensionRecordsAccumulator | None = None) -> SerializedQuantum:
        """Convert this class to a simple python type.

        This makes it suitable for serialization.

        Parameters
        ----------
        accumulator : `DimensionRecordsAccumulator`, optional
            This accumulator can be used to aggregate dimension records across
            multiple Quanta. If this is `None` (the default), dimension
            records are serialized with this Quantum. If an accumulator is
            supplied, it is assumed something else is responsible for
            serializing the records, and they will not be stored with the
            SerializedQuantum.

        Returns
        -------
        simple : `SerializedQuantum`
            This object converted to a serializable representation.
        """
        typeMapping = {}
        initInputs = {}

        if accumulator is None:
            accumulator = DimensionRecordsAccumulator()
            writeDimensionRecords = True
        else:
            writeDimensionRecords = False

        # Collect the init inputs for serialization, recording the types into
        # their own mapping, used throughout to minimize saving the same
        # object multiple times. The string name of the type is used to index
        # the mappings.
        for key, value in self._initInputs.items():
            # add the type to the typeMapping
            typeMapping[key.name] = key.to_simple()
            # convert to a simple DatasetRef representation
            simple = value.to_simple()
            # extract the dimension records
            recIds = []
            if simple.dataId is not None and simple.dataId.records is not None:
                # for each dimension record get an id by adding it to the
                # record accumulator.
                for element_name in value.dataId.dimensions.elements:
                    rec = value.dataId.records[element_name]
                    if rec is not None:
                        recordId = accumulator.addRecord(rec)
                        recIds.append(recordId)
                # Set properties to None to save space
                simple.dataId.records = None
            simple.datasetType = None
            initInputs[key.name] = (simple, recIds)

        # container for all the SerializedDatasetRefs, keyed on the
        # DatasetType name.
        inputs = {}

        # collect the inputs
        for key, values in self._inputs.items():
            # collect the type if it is not already in the mapping
            if key.name not in typeMapping:
                typeMapping[key.name] = key.to_simple()
            # for each input type there is a list of inputs; collect them
            tmp = []
            for e in values:
                simp = e.to_simple()
                # This container will hold ids (hashes) that point to all the
                # dimension records within the SerializedDatasetRef dataId.
                # These dimension records repeat in almost every DatasetRef,
                # so it is hugely wasteful in terms of disk and cpu time to
                # store them over and over again.
                recIds = []
                if simp.dataId is not None and simp.dataId.records is not None:
                    for element_name in e.dataId.dimensions.elements:
                        rec = e.dataId.records[element_name]
                        # for each dimension record get an id by adding it to
                        # the record accumulator.
                        if rec is not None:
                            recordId = accumulator.addRecord(rec)
                            recIds.append(recordId)
                    # Set the records to None to avoid serializing them
                    simp.dataId.records = None
                # Dataset type is the same as the key in _inputs, no need
                # to serialize it out multiple times, set it to None
                simp.datasetType = None
                # append a tuple of the simplified SerializedDatasetRef, along
                # with the list of all the keys for the dimension records
                # needed for reconstruction.
                tmp.append((simp, recIds))
            inputs[key.name] = tmp

        # container for all the SerializedDatasetRefs, keyed on the
        # DatasetType name.
        outputs = {}
        for key, values in self._outputs.items():
            # collect the type if it is not already in the mapping
            if key.name not in typeMapping:
                typeMapping[key.name] = key.to_simple()
            # for each output type there is a list of outputs; collect them
            tmp = []
            for e in values:
                simp = e.to_simple()
                # This container will hold ids (hashes) that point to all the
                # dimension records within the SerializedDatasetRef dataId.
                # These dimension records repeat in almost every DatasetRef,
                # so it is hugely wasteful in terms of disk and cpu time to
                # store them over and over again.
                recIds = []
                if simp.dataId is not None and simp.dataId.records is not None:
                    for element_name in e.dataId.dimensions.elements:
                        rec = e.dataId.records[element_name]
                        # for each dimension record get an id by adding it to
                        # the record accumulator.
                        if rec is not None:
                            recordId = accumulator.addRecord(rec)
                            recIds.append(recordId)
                    # Set the records to None to avoid serializing them
                    simp.dataId.records = None
                # Dataset type is the same as the key in _outputs, no need
                # to serialize it out multiple times, set it to None
                simp.datasetType = None
                # append a tuple of the simplified SerializedDatasetRef, along
                # with the list of all the keys for the dimension records
                # needed for reconstruction.
                tmp.append((simp, recIds))
            outputs[key.name] = tmp

        dimensionRecords: Mapping[int, SerializedDimensionRecord] | None
        if writeDimensionRecords:
            dimensionRecords = accumulator.makeSerializedDimensionRecordMapping()
        else:
            dimensionRecords = None

        datastore_records: dict[str, SerializedDatastoreRecordData] | None = None
        if self.datastore_records is not None:
            datastore_records = {
                datastore_name: record_data.to_simple()
                for datastore_name, record_data in self.datastore_records.items()
            }

        return SerializedQuantum(
            taskName=self._taskName,
            dataId=self.dataId.to_simple() if self.dataId is not None else None,
            datasetTypeMapping=typeMapping,
            initInputs=initInputs,
            inputs=inputs,
            outputs=outputs,
            dimensionRecords=dimensionRecords,
            datastoreRecords=datastore_records,
        )
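
    # A hedged round-trip sketch: the serialized model is a pydantic
    # BaseModel (v2, given the `model_construct` usage above), so the
    # standard pydantic JSON methods apply; `universe` is assumed to come
    # from the repository's butler:
    #
    #     json_str = quantum.to_simple().model_dump_json()
    #     restored = Quantum.from_simple(
    #         SerializedQuantum.model_validate_json(json_str), universe
    #     )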

    @classmethod
    def from_simple(
        cls,
        simple: SerializedQuantum,
        universe: DimensionUniverse,
    ) -> Quantum:
        """Construct a new object from a simplified form.

        Generally this is data returned from the `to_simple` method.

        Parameters
        ----------
        simple : `SerializedQuantum`
            The value returned by a call to `to_simple`.
        universe : `DimensionUniverse`
            The special graph of all known dimensions.

        Returns
        -------
        quantum : `Quantum`
            The reconstructed `Quantum`.
        """
        initInputs: MutableMapping[DatasetType, DatasetRef] = {}

        # Unpersist all the init inputs
        for key, (value, dimensionIds) in simple.initInputs.items():
            type_ = DatasetType.from_simple(simple.datasetTypeMapping[key], universe=universe)
            # reconstruct the dimension records
            rebuiltDatasetRef = _reconstructDatasetRef(
                value, type_, dimensionIds, simple.dimensionRecords, universe
            )
            initInputs[type_] = rebuiltDatasetRef

        # containers for the dataset refs
        inputs: MutableMapping[DatasetType, list[DatasetRef]] = {}
        outputs: MutableMapping[DatasetType, list[DatasetRef]] = {}

        for container, simpleRefs in ((inputs, simple.inputs), (outputs, simple.outputs)):
            for key, values in simpleRefs.items():
                type_ = DatasetType.from_simple(simple.datasetTypeMapping[key], universe=universe)
                # reconstruct the list of DatasetRefs for this DatasetType
                tmp: list[DatasetRef] = []
                for v, recIds in values:
                    rebuiltDatasetRef = _reconstructDatasetRef(
                        v, type_, recIds, simple.dimensionRecords, universe
                    )
                    tmp.append(rebuiltDatasetRef)
                container[type_] = tmp

        dataId = (
            DataCoordinate.from_simple(simple.dataId, universe=universe)
            if simple.dataId is not None
            else None
        )

        datastore_records: dict[str, DatastoreRecordData] | None = None
        if simple.datastoreRecords is not None:
            datastore_records = {
                datastore_name: DatastoreRecordData.from_simple(record_data)
                for datastore_name, record_data in simple.datastoreRecords.items()
            }

        quant = Quantum(
            taskName=simple.taskName,
            dataId=dataId,
            initInputs=initInputs,
            inputs=inputs,
            outputs=outputs,
            datastore_records=datastore_records,
        )
        return quant
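
    # When quanta were serialized against a shared accumulator, their
    # `dimensionRecords` field is None and the shared record mapping must be
    # restored before reconstruction (otherwise `_reconstructDatasetRef`
    # raises). A hedged sketch; `sq` and `records` are hypothetical:
    #
    #     sq.dimensionRecords = records   # mapping saved with the batch
    #     quantum = Quantum.from_simple(sq, universe)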

    @property
    def taskClass(self) -> type | None:
        """Task class associated with this `Quantum` (`type`)."""
        if self._taskClass is None:
            if self._taskName is None:
                raise ValueError("No task class defined and task name is None")
            task_class = doImportType(self._taskName)
            self._taskClass = task_class
        return self._taskClass

    @property
    def taskName(self) -> str | None:
        """Return the fully-qualified name of the task associated with this
        `Quantum` (`str`).
        """
        return self._taskName

    @property
    def dataId(self) -> DataCoordinate | None:
        """Return dimension values of the unit of processing (`DataId`)."""
        return self._dataId

    @property
    def initInputs(self) -> NamedKeyMapping[DatasetType, DatasetRef]:
        """Return mapping of datasets used to construct the Task.

        Has `DatasetType` instances as keys (names can also be used for
        lookups) and `DatasetRef` instances as values.
        """
        return self._initInputs

    @property
    def inputs(self) -> NamedKeyMapping[DatasetType, tuple[DatasetRef, ...]]:
        """Return mapping of input datasets that were expected to be used.

        Has `DatasetType` instances as keys (names can also be used for
        lookups) and a tuple of `DatasetRef` instances as values.

        Notes
        -----
        We cannot use `set` instead of a sequence for the nested container
        because `DatasetRef` instances cannot be compared reliably when some
        have integer IDs and others do not.
        """
        return self._inputs

    @property
    def outputs(self) -> NamedKeyMapping[DatasetType, tuple[DatasetRef, ...]]:
        """Return mapping of output datasets (to be) generated by this
        quantum.

        Has the same form as ``inputs``.

        Notes
        -----
        We cannot use `set` instead of a sequence for the nested container
        because `DatasetRef` instances cannot be compared reliably when some
        have integer IDs and others do not.
        """
        return self._outputs

    @property
    def datastore_records(self) -> Mapping[str, DatastoreRecordData]:
        """Tabular data stored with this quantum (`dict`).

        This attribute may be modified in place, but not assigned to.
        """
        return self._datastore_records

    def __eq__(self, other: object) -> bool:
        if not isinstance(other, Quantum):
            return False
        for item in ("taskClass", "dataId", "initInputs", "inputs", "outputs"):
            if getattr(self, item) != getattr(other, item):
                return False
        return True

    def __hash__(self) -> int:
        return hash((self.taskClass, self.dataId))

    def __reduce__(self) -> str | tuple[Any, ...]:
        return (
            self._reduceFactory,
            (
                self.taskName,
                self.taskClass,
                self.dataId,
                dict(self.initInputs.items()),
                dict(self.inputs),
                dict(self.outputs),
                self.datastore_records,
            ),
        )
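
    # `__reduce__` makes Quantum instances picklable; a minimal sketch
    # (`quantum` is a hypothetical existing instance):
    #
    #     import pickle
    #
    #     clone = pickle.loads(pickle.dumps(quantum))
    #     assert clone == quantum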

    def __str__(self) -> str:
        return f"{self.__class__.__name__}(taskName={self.taskName}, dataId={self.dataId})"

    @staticmethod
    def _reduceFactory(
        taskName: str | None,
        taskClass: type | None,
        dataId: DataCoordinate | None,
        initInputs: Mapping[DatasetType, DatasetRef] | Iterable[DatasetRef] | None,
        inputs: Mapping[DatasetType, list[DatasetRef]] | None,
        outputs: Mapping[DatasetType, list[DatasetRef]] | None,
        datastore_records: Mapping[str, DatastoreRecordData],
    ) -> Quantum:
        return Quantum(
            taskName=taskName,
            taskClass=taskClass,
            dataId=dataId,
            initInputs=initInputs,
            inputs=inputs,
            outputs=outputs,
            datastore_records=datastore_records,
        )


class DimensionRecordsAccumulator:
    """Class used to accumulate dimension records for serialization.

    This class generates an auto-increment key for each unique dimension
    record added to it. This allows serialization of dimension records to
    occur once for each record but be referred to multiple times.
    """

    def __init__(self) -> None:
        self._counter = 0
        self.mapping: MutableMapping[DimensionRecord, tuple[int, SerializedDimensionRecord]] = {}

    def addRecord(self, record: DimensionRecord) -> int:
        """Add a dimension record to the accumulator if not already present.

        When a record is inserted for the first time it is assigned a unique
        integer key. This method returns the key associated with the record
        (either the newly allocated key or the existing one).

        Parameters
        ----------
        record : `DimensionRecord`
            The record to add to the accumulator.

        Returns
        -------
        accumulatorKey : `int`
            The key that is associated with the supplied record.
        """
        if (mappingValue := self.mapping.get(record)) is None:
            simple = record.to_simple()
            mappingValue = (self._counter, simple)
            self._counter += 1
            self.mapping[record] = mappingValue
        return mappingValue[0]

    def makeSerializedDimensionRecordMapping(self) -> dict[int, SerializedDimensionRecord]:
        """Return the mapping from accumulator key to serialized record."""
        return dict(self.mapping.values())
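
# Sketch of sharing one accumulator across several quanta so that common
# dimension records are serialized only once (`quanta` is a hypothetical
# iterable of Quantum instances):
#
#     accumulator = DimensionRecordsAccumulator()
#     serialized = [q.to_simple(accumulator=accumulator) for q in quanta]
#     records = accumulator.makeSerializedDimensionRecordMapping()
#     # Each SerializedQuantum now has dimensionRecords=None, so `records`
#     # must be persisted alongside `serialized` for later reconstruction.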