Coverage for python / lsst / pipe / base / _quantumContext.py: 17%
154 statements
« prev ^ index » next coverage.py v7.13.5, created at 2026-05-01 08:20 +0000
« prev ^ index » next coverage.py v7.13.5, created at 2026-05-01 08:20 +0000
1# This file is part of pipe_base.
2#
3# Developed for the LSST Data Management System.
4# This product includes software developed by the LSST Project
5# (http://www.lsst.org).
6# See the COPYRIGHT file at the top-level directory of this distribution
7# for details of code ownership.
8#
9# This software is dual licensed under the GNU General Public License and also
10# under a 3-clause BSD license. Recipients may choose which of these licenses
11# to use; please see the files gpl-3.0.txt and/or bsd_license.txt,
12# respectively. If you choose the GPL option then the following text applies
13# (but note that there is still no warranty even if you opt for BSD instead):
14#
15# This program is free software: you can redistribute it and/or modify
16# it under the terms of the GNU General Public License as published by
17# the Free Software Foundation, either version 3 of the License, or
18# (at your option) any later version.
19#
20# This program is distributed in the hope that it will be useful,
21# but WITHOUT ANY WARRANTY; without even the implied warranty of
22# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
23# GNU General Public License for more details.
24#
25# You should have received a copy of the GNU General Public License
26# along with this program. If not, see <http://www.gnu.org/licenses/>.
28"""Module defining variants for valid values used to constrain datasets in a
29graph building query.
30"""
32from __future__ import annotations
34__all__ = ("ExecutionResources", "QuantumContext")
36import numbers
37import uuid
38from collections.abc import Callable, Sequence
39from dataclasses import dataclass
40from typing import Any
42import astropy.units as u
44from lsst.daf.butler import (
45 DataCoordinate,
46 DatasetProvenance,
47 DatasetRef,
48 DatasetType,
49 DimensionUniverse,
50 LimitedButler,
51 Quantum,
52)
53from lsst.utils.introspection import get_full_type_name
54from lsst.utils.logging import PeriodicLogger, getLogger
56from .automatic_connection_constants import LOG_OUTPUT_CONNECTION_NAME, METADATA_OUTPUT_CONNECTION_NAME
57from .connections import DeferredDatasetRef, InputQuantizedConnection, OutputQuantizedConnection
58from .struct import Struct
# Module-level logger; also drives the PeriodicLogger used to report
# progress during long dataset retrievals in QuantumContext.get.
_LOG = getLogger(__name__)
@dataclass(init=False, frozen=True)
class ExecutionResources:
    """Resources available to a task while it executes a single quantum.

    Parameters
    ----------
    num_cores : `int`, optional
        Number of cores allocated to the task; must be at least 1.
    max_mem : `~astropy.units.Quantity`, `numbers.Real`, `str`, or `None`,\
            optional
        Memory allocated to the task. May be given as a byte-compatible
        `~astropy.units.Quantity`, a plain number, a string containing a
        plain number, or a string representing a quantity. `None` (or an
        empty string) means no limit.
    default_mem_units : `astropy.units.Unit`, optional
        Unit assumed when ``max_mem`` is given as a plain number.
    """

    num_cores: int = 1
    """The maximum number of cores that the task can use."""

    max_mem: u.Quantity | None = None
    """If defined, the amount of memory allocated to the task.
    """

    def __init__(
        self,
        *,
        num_cores: int = 1,
        max_mem: u.Quantity | numbers.Real | str | None = None,
        default_mem_units: u.Unit = u.B,
    ):
        # The dataclass itself is frozen and strictly typed, but we want
        # callers to be able to supply memory limits in several convenient
        # forms, so we write __init__ by hand and normalize everything here.
        if num_cores < 1:
            raise ValueError("The number of cores must be a positive integer")
        object.__setattr__(self, "num_cores", num_cores)

        memory: u.Quantity | None = None

        if max_mem is None or isinstance(max_mem, u.Quantity):
            memory = max_mem
        elif max_mem == "":
            # Command-line tooling can represent "no value" as empty string;
            # treat it the same as None.
            memory = None
        else:
            try:
                plain_number = float(max_mem)
            except ValueError:
                # Not a bare number; let astropy parse it as a full quantity
                # string (unit included).
                memory = u.Quantity(max_mem)
            else:
                memory = plain_number * default_mem_units

        if memory is not None:
            # Normalize to bytes; this also verifies the unit is convertible.
            memory = memory.to(u.B)

        object.__setattr__(self, "max_mem", memory)

    def __deepcopy__(self, memo: Any) -> ExecutionResources:
        """Return ``self``: instances are frozen, so copying is unnecessary."""
        return self

    def _reduce_kwargs(self) -> dict[str, Any]:
        """Return the keyword arguments `__reduce__` should record.

        Needed because the dataclass is keyword-only and we prefer pickles
        to carry a plain byte count rather than a full
        `~astropy.units.Quantity`.

        Returns
        -------
        kwargs : `dict`
            Keyword arguments to be used when pickling.
        """
        reduced: dict[str, Any] = {"num_cores": self.num_cores}
        if self.max_mem is not None:
            # max_mem is always stored in bytes (enforced by the
            # constructor), so .value is the byte count; cast the numpy
            # float to a Python int since fractional bytes are meaningless.
            reduced["max_mem"] = int(self.max_mem.value)
        return reduced

    @classmethod
    def _unpickle_via_factory(
        cls: type[ExecutionResources], args: Sequence[Any], kwargs: dict[str, Any]
    ) -> ExecutionResources:
        """Reconstruct an instance from pickled state.

        Exists so that `__reduce__` can forward keyword arguments (as well
        as positional ones) through the pickle machinery.
        """
        return cls(**kwargs)

    def __reduce__(
        self,
    ) -> tuple[
        Callable[[Sequence[Any], dict[str, Any]], ExecutionResources],
        tuple[Sequence[Any], dict[str, Any]],
    ]:
        """Pickle via the keyword-argument factory."""
        return self._unpickle_via_factory, ([], self._reduce_kwargs())
class QuantumContext:
    """A Butler-like class specialized for a single quantum along with
    context information that can influence how the task is executed.

    Parameters
    ----------
    butler : `lsst.daf.butler.LimitedButler`
        Butler object from/to which datasets will be get/put.
    quantum : `lsst.daf.butler.Quantum`
        Quantum object that describes the datasets which will be get/put by a
        single execution of this node in the pipeline graph.
    resources : `ExecutionResources`, optional
        The resources allocated for executing quanta.
    quantum_id : `uuid.UUID` or `None`, optional
        The ID of the quantum being executed. Used for provenance.

    Notes
    -----
    A `QuantumContext` class wraps a standard butler interface and
    specializes it to the context of a given quantum. What this means
    in practice is that the only gets and puts that this class allows
    are DatasetRefs that are contained in the quantum.

    In the future this class will also be used to record provenance on
    what was actually get and put. This is in contrast to what the
    preflight expects to be get and put by looking at the graph before
    execution.
    """

    # The resources allocated for executing this quantum.
    resources: ExecutionResources

    def __init__(
        self,
        butler: LimitedButler,
        quantum: Quantum,
        *,
        resources: ExecutionResources | None = None,
        quantum_id: uuid.UUID | None = None,
    ):
        self.quantum = quantum
        if resources is None:
            resources = ExecutionResources()
        self.resources = resources

        # Index the quantum's datasets by (datasetType, dataId, id) so that
        # _checkMembership can validate every get/put with an O(1) set lookup.
        self.allInputs: set[tuple[DatasetType, DataCoordinate, uuid.UUID]] = set()
        self.allOutputs: set[tuple[DatasetType, DataCoordinate, uuid.UUID]] = set()
        for refs in quantum.inputs.values():
            for ref in refs:
                self.allInputs.add((ref.datasetType, ref.dataId, ref.id))
        for dataset_type, refs in quantum.outputs.items():
            if dataset_type.name.endswith(METADATA_OUTPUT_CONNECTION_NAME) or dataset_type.name.endswith(
                LOG_OUTPUT_CONNECTION_NAME
            ):
                # Don't consider log and metadata datasets to be outputs in
                # this context, because we don't want the task to be able to
                # write them itself; that's for the execution system to do.
                continue
            for ref in refs:
                self.allOutputs.add((ref.datasetType, ref.dataId, ref.id))
        # Records everything successfully written via _put.
        self.outputsPut: set[tuple[DatasetType, DataCoordinate, uuid.UUID]] = set()
        self.__butler = butler
        self.dataset_provenance = DatasetProvenance(quantum_id=quantum_id)

    def _get(self, ref: DeferredDatasetRef | DatasetRef | None) -> Any:
        """Retrieve a single dataset after checking quantum membership.

        Returns `None` if ``ref`` is `None`, a deferred-load handle for a
        `DeferredDatasetRef`, and the dataset itself otherwise. Every
        non-`None` ref is recorded in ``dataset_provenance``.
        """
        # Butler methods below will check for unresolved DatasetRefs and
        # raise appropriately, so no need for us to do that here.
        if isinstance(ref, DeferredDatasetRef):
            self._checkMembership(ref.datasetRef, self.allInputs)
            self.dataset_provenance.add_input(ref.datasetRef)
            return self.__butler.getDeferred(ref.datasetRef)
        elif ref is None:
            return None
        else:
            self._checkMembership(ref, self.allInputs)
            self.dataset_provenance.add_input(ref)
            return self.__butler.get(ref)

    def _put(self, value: Any, ref: DatasetRef) -> None:
        """Store data in butler after checking quantum membership."""
        self._checkMembership(ref, self.allOutputs)
        self.__butler.put(value, ref, provenance=self.dataset_provenance)
        # Track what was actually written.
        self.outputsPut.add((ref.datasetType, ref.dataId, ref.id))

    def get(
        self,
        dataset: (
            InputQuantizedConnection
            | list[DatasetRef | None]
            | list[DeferredDatasetRef | None]
            | DatasetRef
            | DeferredDatasetRef
            | None
        ),
    ) -> Any:
        """Fetch data from the butler.

        Parameters
        ----------
        dataset : see description
            This argument may either be an `InputQuantizedConnection` which
            describes all the inputs of a quantum, a list of
            `~lsst.daf.butler.DatasetRef`, or a single
            `~lsst.daf.butler.DatasetRef`. The function will get and return
            the corresponding datasets from the butler. If `None` is passed in
            place of a `~lsst.daf.butler.DatasetRef` then the corresponding
            returned object will be `None`.

        Returns
        -------
        return : `object`
            This function returns arbitrary objects fetched from the butler.
            The structure these objects are returned in depends on the type of
            the input argument. If the input dataset argument is a
            `InputQuantizedConnection`, then the return type will be a
            dictionary with keys corresponding to the attributes of the
            `InputQuantizedConnection` (which in turn are the attribute
            identifiers of the connections). If the input argument is of type
            `list` of `~lsst.daf.butler.DatasetRef` then the return type will
            be a list of objects. If the input argument is a single
            `~lsst.daf.butler.DatasetRef` then a single object will be
            returned.

        Raises
        ------
        ValueError
            Raised if a `~lsst.daf.butler.DatasetRef` is passed to get that is
            not defined in the quantum object.
        TypeError
            Raised if ``dataset`` is not one of the supported argument types.
        """
        # Set up a periodic logger so log messages can be issued if things
        # are taking too long.
        periodic = PeriodicLogger(_LOG)

        if isinstance(dataset, InputQuantizedConnection):
            retVal = {}
            n_connections = len(dataset)
            n_retrieved = 0
            for i, (name, ref) in enumerate(dataset):
                if isinstance(ref, list | tuple):
                    # Multiple-ref connection: fetch each and return a list.
                    val = []
                    n_refs = len(ref)
                    for j, r in enumerate(ref):
                        val.append(self._get(r))
                        n_retrieved += 1
                        periodic.log(
                            "Retrieved %d out of %d datasets for connection '%s' (%d out of %d)",
                            j + 1,
                            n_refs,
                            name,
                            i + 1,
                            n_connections,
                        )
                else:
                    # Single-ref connection: return the object directly.
                    val = self._get(ref)
                    periodic.log(
                        "Retrieved dataset for connection '%s' (%d out of %d)",
                        name,
                        i + 1,
                        n_connections,
                    )
                    n_retrieved += 1
                retVal[name] = val
            if periodic.num_issued > 0:
                # This took long enough that we issued some periodic log
                # messages, so issue a final confirmation message as well.
                _LOG.verbose(
                    "Completed retrieval of %d datasets from %d connections", n_retrieved, n_connections
                )
            return retVal
        elif isinstance(dataset, list | tuple):
            n_datasets = len(dataset)
            retrieved = []
            for i, x in enumerate(dataset):
                # Mypy is not sure of the type of x because of the union
                # of lists so complains. Ignoring it is more efficient
                # than adding an isinstance assert.
                retrieved.append(self._get(x))
                periodic.log("Retrieved %d out of %d datasets", i + 1, n_datasets)
            if periodic.num_issued > 0:
                _LOG.verbose("Completed retrieval of %d datasets", n_datasets)
            return retrieved
        elif isinstance(dataset, DatasetRef | DeferredDatasetRef) or dataset is None:
            return self._get(dataset)
        else:
            raise TypeError(
                f"Dataset argument ({get_full_type_name(dataset)}) is not a type that can be used to get"
            )

    def put(
        self,
        values: Struct | list[Any] | Any,
        dataset: OutputQuantizedConnection | list[DatasetRef] | DatasetRef,
    ) -> None:
        """Put data into the butler.

        Parameters
        ----------
        values : `Struct` or `list` of `object` or `object`
            The data that should be put with the butler. If the type of the
            dataset is `OutputQuantizedConnection` then this argument should be
            a `Struct` with corresponding attribute names. Each attribute
            should then correspond to either a list of object or a single
            object depending of the type of the corresponding attribute on
            dataset. I.e. if ``dataset.calexp`` is
            ``[datasetRef1, datasetRef2]`` then ``values.calexp`` should be
            ``[calexp1, calexp2]``. Like wise if there is a single ref, then
            only a single object need be passed. The same restriction applies
            if dataset is directly a `list` of `~lsst.daf.butler.DatasetRef`
            or a single `~lsst.daf.butler.DatasetRef`. If ``values.NAME`` is
            None, no output is written.
        dataset : `OutputQuantizedConnection` or `list` \
                [`lsst.daf.butler.DatasetRef`] or `lsst.daf.butler.DatasetRef`
            This argument may either be an `OutputQuantizedConnection` which
            describes all the outputs of a quantum, a list of
            `lsst.daf.butler.DatasetRef`, or a single
            `lsst.daf.butler.DatasetRef`. The corresponding values will be
            written to the butler.

        Raises
        ------
        ValueError
            Raised if a `~lsst.daf.butler.DatasetRef` is passed to put that is
            not defined in the `~lsst.daf.butler.Quantum` object, or the type
            of values does not match what is expected from the type of dataset.
        TypeError
            Raised if ``dataset`` is not one of the supported argument types.
        """
        if isinstance(dataset, OutputQuantizedConnection):
            if not isinstance(values, Struct):
                raise ValueError(
                    "dataset is a OutputQuantizedConnection, a Struct with corresponding"
                    " attributes must be passed as the values to put"
                )
            for name, refs in dataset:
                # Attributes that are unset or None are skipped, allowing
                # tasks to omit optional outputs.
                if (valuesAttribute := getattr(values, name, None)) is None:
                    continue
                if isinstance(refs, list | tuple):
                    if len(refs) != len(valuesAttribute):
                        raise ValueError(f"There must be a object to put for every Dataset ref in {name}")
                    for i, ref in enumerate(refs):
                        self._put(valuesAttribute[i], ref)
                else:
                    self._put(valuesAttribute, refs)
        elif isinstance(dataset, list | tuple):
            if not isinstance(values, Sequence):
                raise ValueError("Values to put must be a sequence")
            if len(dataset) != len(values):
                raise ValueError("There must be a common number of references and values to put")
            for i, ref in enumerate(dataset):
                self._put(values[i], ref)
        elif isinstance(dataset, DatasetRef):
            self._put(values, dataset)
        else:
            raise TypeError("Dataset argument is not a type that can be used to put")

    def _checkMembership(self, ref: list[DatasetRef] | DatasetRef, inout: set) -> None:
        """Check if a `~lsst.daf.butler.DatasetRef` is part of the input
        `~lsst.daf.butler.Quantum`.

        This function will raise an exception if the `QuantumContext` is
        used to get/put a `~lsst.daf.butler.DatasetRef` which is not defined
        in the quantum.

        Parameters
        ----------
        ref : `list` [ `~lsst.daf.butler.DatasetRef` ] or \
                `~lsst.daf.butler.DatasetRef`
            Either a `list` or a single `~lsst.daf.butler.DatasetRef` to
            check.
        inout : `set`
            The connection type to check, e.g. either an input or an output.
            This prevents both types needing to be checked for every operation,
            which may be important for Quanta with lots of
            `~lsst.daf.butler.DatasetRef`.

        Raises
        ------
        ValueError
            Raised if any of the given refs is not present in ``inout``.
        """
        if not isinstance(ref, list | tuple):
            ref = [ref]
        for r in ref:
            if (r.datasetType, r.dataId, r.id) not in inout:
                raise ValueError("DatasetRef is not part of the Quantum being processed")

    @property
    def dimensions(self) -> DimensionUniverse:
        """Structure managing all dimensions recognized by this data
        repository (`~lsst.daf.butler.DimensionUniverse`).
        """
        return self.__butler.dimensions

    def add_additional_provenance(self, ref: DatasetRef, extra: dict[str, int | float | str | bool]) -> None:
        """Add additional provenance information to the dataset provenance.

        Parameters
        ----------
        ref : `lsst.daf.butler.DatasetRef`
            The dataset to attach provenance to. This dataset must have been
            retrieved by this quantum context.
        extra : `dict` [ `str`, `int` | `float` | `str` | `bool` ]
            Additional information to attach as provenance information. Keys
            must be strings and values must be simple scalars.
        """
        self.dataset_provenance.add_extra_provenance(ref.id, extra)