Coverage for python / lsst / analysis / tools / interfaces / _task.py: 16%
268 statements
« prev ^ index » next coverage.py v7.13.5, created at 2026-04-15 00:23 +0000
« prev ^ index » next coverage.py v7.13.5, created at 2026-04-15 00:23 +0000
1# This file is part of analysis_tools.
2#
3# Developed for the LSST Data Management System.
4# This product includes software developed by the LSST Project
5# (https://www.lsst.org).
6# See the COPYRIGHT file at the top-level directory of this distribution
7# for details of code ownership.
8#
9# This program is free software: you can redistribute it and/or modify
10# it under the terms of the GNU General Public License as published by
11# the Free Software Foundation, either version 3 of the License, or
12# (at your option) any later version.
13#
14# This program is distributed in the hope that it will be useful,
15# but WITHOUT ANY WARRANTY; without even the implied warranty of
16# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
17# GNU General Public License for more details.
18#
19# You should have received a copy of the GNU General Public License
20# along with this program. If not, see <https://www.gnu.org/licenses/>.
21from __future__ import annotations
23"""Base class implementation for the classes needed in creating `PipelineTasks`
24which execute `AnalysisTools`.
26The classes defined in this module have all the required behaviors for
27defining, introspecting, and executing `AnalysisTools` against an input dataset
28type.
30Subclasses of these tasks should specify specific datasets to consume in their
connection classes and should specify a unique name.
32"""
34__all__ = ("AnalysisBaseConnections", "AnalysisBaseConfig", "AnalysisPipelineTask")
36import datetime
37import logging
38import pprint
39import traceback
40import warnings
41import weakref
42from collections.abc import Collection, Iterable
43from copy import deepcopy
44from typing import TYPE_CHECKING, Any, Mapping, MutableMapping, cast
46import matplotlib.pyplot as plt
47from lsst.verify import Measurement
49if TYPE_CHECKING:
50 from lsst.daf.butler import DatasetRef, DeferredDatasetHandle
51 from lsst.pipe.base import QuantumContext
53from lsst.daf.butler import DataCoordinate
54from lsst.pex.config import Field, ListField
55from lsst.pex.config.configurableActions import ConfigurableActionStructField
56from lsst.pipe.base import (
57 AlgorithmError,
58 AnnotatedPartialOutputsError,
59 Instrument,
60 PipelineTask,
61 PipelineTaskConfig,
62 PipelineTaskConnections,
63 Struct,
64)
65from lsst.pipe.base import connectionTypes as ct
66from lsst.pipe.base.connections import InputQuantizedConnection, OutputQuantizedConnection
67from lsst.pipe.base.pipelineIR import ConfigIR, ParametersIR
69from ._actions import JointAction, MetricAction, NoMetric
70from ._analysisTools import AnalysisTool
71from ._interfaces import KeyedData, PlotTypes
72from ._metricMeasurementBundle import MetricMeasurementBundle
# TODO: Temporary workaround, to be removed when DM-39114 is implemented:
# silence matplotlib's "more than N figures open" warning by disabling the
# figure-count threshold entirely.
plt.rcParams["figure.max_open_warning"] = 0
# TODO: Temporary helper, to be removed (along with all usages) when
# DM-39114 is implemented: guarantees figures made during a PipelineTask
# are eventually closed.
def _plotCloser(*figures):
    """Close every matplotlib figure passed as a positional argument."""
    for figure in figures:
        plt.close(figure)
class PartialAtoolsFailureError(AlgorithmError):
    """Raised if an analysis tools action has partially failed.

    Parameters
    ----------
    caughtErrors : `dict` of (`str`, `str`)
        A dictionary mapping the action name to the string representation of
        the exception raised by that action.
    """

    def __init__(self, caughtErrors) -> None:
        self._caughtErrors = caughtErrors
        # Build the message with a single join instead of repeated string
        # concatenation; this also removes the old trailing-separator strip,
        # which chopped two characters off the header when caughtErrors was
        # (unexpectedly) empty.
        details = ", ".join(
            f"{errName}: {pprint.pformat(errMessage)}" for errName, errMessage in caughtErrors.items()
        )
        super().__init__(f"These tasks have failed for the following reasons: {details}")

    @property
    def metadata(self) -> dict:
        """The per-action failure tracebacks, keyed by action name (`dict`)."""
        return self._caughtErrors
class AnalysisBaseConnections(
    PipelineTaskConnections, dimensions={}, defaultTemplates={"outputName": "Placeholder"}
):
    r"""Base class for Connections used for AnalysisTools PipelineTasks.

    This class has a pre-defined output connection for the
    MetricMeasurementMapping. The dataset type name for this connection is
    determined by the template ``outputName``.

    Output connections for plots created by `AnalysisPlot`\ s are created
    dynamically when an instance of the class is created. The init method
    examines all the `AnalysisPlot` actions specified in the associated
    `AnalysisBaseConfig` subclass, accumulating all the info needed to
    create the output connections.

    The dimensions for all of the output connections (metric and plot) will
    be the same as the dimensions specified for the AnalysisBaseConnections
    subclass (i.e. quantum dimensions).
    """

    # Single bundle holding every metric the task produces; removed in
    # __init__ if no configured tool actually produces metrics.
    metrics = ct.Output(
        doc="Metrics calculated on input dataset type",
        name="{outputName}_metrics",
        storageClass="MetricMeasurementBundle",
    )

    def __init__(self, *, config: AnalysisBaseConfig = None):  # type: ignore
        # Validate that the outputName template has been set in config. This
        # should have been checked early with the configs validate method, but
        # it is possible for someone to manually create everything in a script
        # without running validate, so also check it late here.
        if (outputName := config.connections.outputName) == "Placeholder":  # type: ignore
            raise RuntimeError(
                "Subclasses must specify an alternative value for the defaultTemplate `outputName`"
            )
        super().__init__(config=config)

        # All arguments must be passed by kw, but python has no method to do
        # that without specifying a default, so None is used. Validate that
        # it is not None. This is largely for typing reasons, as in the normal
        # course of operation code execution paths ensure this will not be None
        assert config is not None

        # Determine whether any configured tool produces at least one metric.
        # Note the for/else: hasMetrics is only set False when the loop
        # completes without hitting a break.
        for tool in config.atools:
            match tool.produce:
                case JointAction():
                    if isinstance(tool.produce.metric, NoMetric):
                        continue
                    if len(tool.produce.metric.units) != 0:
                        hasMetrics = True
                        break
                case MetricAction():
                    hasMetrics = True
                    break
        else:
            hasMetrics = False

        # Set the dimensions for the metric
        if hasMetrics:
            self.metrics = ct.Output(
                name=self.metrics.name,
                doc=self.metrics.doc,
                storageClass=self.metrics.storageClass,
                dimensions=self.dimensions,
                multiple=False,
                isCalibration=False,
            )
        else:
            # There are no metrics to produce, remove the output connection
            self.outputs.remove("metrics")

        # Look for any conflicting names, creating a set of them, as these
        # will be added to the instance as well as recorded in the outputs
        # set.
        existingNames = set(dir(self))

        # Accumulate all the names to be used from all of the defined
        # AnalysisPlots.  (dict, not Mapping: it is mutated below.)
        names: dict[str, AnalysisTool] = {}
        for action in config.atools:
            if action.dynamicOutputNames:
                outNames = action.getOutputNames(config=config)
            else:
                outNames = action.getOutputNames()
            if action.parameterizedBand:
                # One output per configured band, substituted into the
                # {band} placeholder of each template name.
                for band in config.bands:
                    names.update({name.format(band=band): action for name in outNames})
            else:
                names.update({name: action for name in outNames})

        # For each of the names found, create output connections.
        for name, action in names.items():
            name = f"{outputName}_{name}"
            if name in self.outputs or name in existingNames:
                raise NameError(
                    f"Plot with name {name} conflicts with existing connection"
                    " are two plots named the same?"
                )
            if action.parameterizedBand and "band" not in self.dimensions:
                # If band is in self.dimensions, but the bands still appear
                # in the output names, then it is likely a user error or the
                # user is intentional in having a pair-wise plot. So we will
                # let the band names pass through in the else block. To avoid
                # the band from appearing, parametrizedBand needs to be False.
                multiple = True
                dimensions = self.dimensions.union({"band"})
                # Struct attributes have the structure
                # {outputName}_{band}_{name}.
                band_name = name.split(outputName, maxsplit=1)[1]  # _{band}_{name}.
                # Limit the maximum split to 2 to avoid splitting the name
                # that may contain underscores.
                band, name = band_name.split("_", maxsplit=2)[1:]  # ["", band, name].
                name = f"{outputName}_{name}"
            else:
                multiple = False
                dimensions = self.dimensions

            outConnection = ct.Output(
                name=name,
                storageClass="Plot",
                doc="Dynamic connection for plotting",
                dimensions=dimensions,
                multiple=multiple,
            )
            setattr(self, name, outConnection)

    def adjustQuantum(
        self,
        inputs: dict[str, tuple[ct.BaseInput, Collection[DatasetRef]]],
        outputs: dict[str, tuple[ct.Output, Collection[DatasetRef]]],
        label: str,
        data_id: DataCoordinate,
    ) -> tuple[
        Mapping[str, tuple[ct.BaseInput, Collection[DatasetRef]]],
        Mapping[str, tuple[ct.Output, Collection[DatasetRef]]],
    ]:
        """Filter per-band output refs down to the configured bands.

        If the task does not have band in its dimensions but an output
        connection does, only keep the output refs that are consistent with
        the producing AnalysisTool's 'bands' attribute.
        """
        if "band" in data_id.dimensions.names:
            # Nothing to do, since the task dimensions already have band; just
            # delegate to super. Note that we use the data ID dimensions since
            # self.dimensions is not an (expanded) DimensionGroup.
            return super().adjustQuantum(inputs, outputs, label, data_id)
        adjusted_outputs: dict[str, tuple[ct.Output, list[DatasetRef]]] = {}
        for name, (connection, old_refs) in outputs.items():
            # We also don't want to rely on connection.dimensions; the
            # dimensions in the refs are expanded and hence safer.
            new_refs: list[DatasetRef] | None = None
            for old_ref in old_refs:
                if new_refs is None:
                    if "band" in old_ref.datasetType.dimensions:
                        new_refs = []
                    else:
                        # If this ref doesn't have 'band' none of the others
                        # for this connection will either; move on to the next.
                        break
                if old_ref.dataId["band"] in self.config.bands:
                    new_refs.append(old_ref)
            if new_refs is not None:
                adjusted_outputs[name] = (connection, new_refs)
        # Update the original outputs so we can pass them to super.
        outputs.update(adjusted_outputs)
        super().adjustQuantum(inputs, outputs, label, data_id)
        # Return only the connections we modified.
        return {}, adjusted_outputs
283def _timestampValidator(value: str) -> bool:
284 if value in ("reference_package_timestamp", "run_timestamp", "current_timestamp", "dataset_timestamp"):
285 return True
286 elif "explicit_timestamp" in value:
287 try:
288 _, splitTime = value.split(":")
289 except ValueError:
290 logging.error(
291 "Explicit timestamp must be given in the format 'explicit_timestamp:datetime', "
292 r"where datetime is given in the form '%Y%m%dT%H%M%S%z"
293 )
294 return False
295 try:
296 datetime.datetime.strptime(splitTime, r"%Y%m%dT%H%M%S%z")
297 except ValueError:
298 # This is explicitly chosen to be an f string as the string
299 # contains control characters.
300 logging.error(
301 f"The supplied datetime {splitTime} could not be parsed correctly into "
302 r"%Y%m%dT%H%M%S%z format"
303 )
304 return False
305 return True
306 else:
307 return False
class AnalysisBaseConfig(PipelineTaskConfig, pipelineConnections=AnalysisBaseConnections):
    """Base class for all configs used to define an `AnalysisPipelineTask`.

    This base class defines two fields that should be used in all subclasses,
    atools, and bands.

    The ``atools`` field is where the user configures which analysis tools
    will be run as part of this `PipelineTask`.

    The ``bands`` field specifies which bands will be looped over for
    `AnalysisTools` which support parameterized bands, i.e. called once for
    each band in the list.
    """

    atools = ConfigurableActionStructField[AnalysisTool](
        doc="The analysis tools that are to be run by this task at execution"
    )
    # Temporarily alias these for backwards compatibility
    plots = atools
    metrics = atools
    bands = ListField[str](
        doc="Filter bands on which to run all of the actions", default=["u", "g", "r", "i", "z", "y"]
    )
    metric_tags = ListField[str](
        doc="List of tags which will be added to all configurable actions", default=[]
    )
    dataset_identifier = Field[str](doc="An identifier to be associated with output Metrics", optional=True)
    reference_package = Field[str](
        doc="A package whose version, at the time of metric upload to a "
        "time series database, will be converted to a timestamp of when "
        "that version was produced",
        default="lsst_distrib",
    )
    timestamp_version = Field[str](
        doc="Which time stamp should be used as the reference timestamp for a "
        "metric in a time series database, valid values are; "
        "reference_package_timestamp, run_timestamp, current_timestamp, "
        "dataset_timestamp and explicit_timestamp:datetime where datetime is "
        "given in the form %Y%m%dT%H%M%S%z",
        default="run_timestamp",
        check=_timestampValidator,
    )
    addOutputNamePrefix = Field[bool](
        doc="If True, the connections class output name will be prefixed "
        "to the analysis 'atools' name when the value is dispatched to sasquatch.",
        default=False,
    )

    def applyConfigOverrides(
        self,
        instrument: Instrument | None,
        taskDefaultName: str,
        pipelineConfigs: Iterable[ConfigIR] | None,
        parameters: ParametersIR,
        label: str,
    ) -> None:
        """Apply pipeline-level ``sasquatch_*`` parameter overrides.

        When present, these parameters are injected as an extra `ConfigIR`
        ahead of any existing pipeline configs, so that explicit task-level
        configs still take precedence, before delegating to the standard
        override machinery.
        """
        extraConfig = {}
        if (value := parameters.mapping.get("sasquatch_dataset_identifier", None)) is not None:
            extraConfig["dataset_identifier"] = value
        if (value := parameters.mapping.get("sasquatch_reference_package", None)) is not None:
            extraConfig["reference_package"] = value
        if (value := parameters.mapping.get("sasquatch_timestamp_version", None)) is not None:
            if "explicit_timestamp" in value:
                # Validate eagerly so a malformed explicit timestamp fails at
                # pipeline construction rather than at metric upload time.
                try:
                    _, splitTime = value.split(":")
                except ValueError as excpt:
                    raise ValueError(
                        "Explicit timestamp must be given in the format 'explicit_timestamp:datetime', "
                        "where datetime is given in the form '%Y%m%dT%H%M%S%z'"
                    ) from excpt
                try:
                    datetime.datetime.strptime(splitTime, r"%Y%m%dT%H%M%S%z")
                except ValueError as excpt:
                    raise ValueError(
                        f"The supplied datetime {splitTime} could not be parsed correctly into "
                        "%Y%m%dT%H%M%S%z format"
                    ) from excpt
            extraConfig["timestamp_version"] = value
        if extraConfig:
            newPipelineConfigs = [ConfigIR(rest=extraConfig)]
            if pipelineConfigs is not None:
                newPipelineConfigs.extend(pipelineConfigs)
            pipelineConfigs = newPipelineConfigs
        return super().applyConfigOverrides(instrument, taskDefaultName, pipelineConfigs, parameters, label)

    def freeze(self):
        # Copy the meta configuration values to each of the configured tools;
        # only do this if the config has not already been frozen.
        if not self._frozen:
            for tool in self.atools:
                for tag in self.metric_tags:
                    tool.metric_tags.insert(-1, tag)
        super().freeze()

    def validate(self):
        super().validate()
        # Validate that the required connections template is set.
        if self.connections.outputName == "Placeholder":  # type: ignore
            raise RuntimeError("Connections class 'outputName' must have a config explicitly set")
411class _StandinPlotInfo(dict):
412 """This class is an implementation detail to support plots in the instance
413 no PlotInfo object is present in the call to run.
414 """
416 def __missing__(self, key):
417 return ""
class AnalysisPipelineTask(PipelineTask):
    """Base class for `PipelineTasks` intended to run `AnalysisTools`.

    The run method will run all of the `AnalysisTools` defined in the config
    class.
    """

    # Typing config because type checkers don't know about our Task magic
    config: AnalysisBaseConfig
    ConfigClass = AnalysisBaseConfig

    # Numpy/RuntimeWarning message substrings known to occur while running
    # analysis tools; see the note in `_runTools` for how to promote these to
    # errors when hunting for unfiltered warnings.
    warnings_all = (
        "divide by zero encountered in divide",
        "invalid value encountered in arcsin",
        "invalid value encountered in cos",
        "invalid value encountered in divide",
        "invalid value encountered in log10",
        "invalid value encountered in scalar divide",
        "invalid value encountered in sin",
        "invalid value encountered in sqrt",
        "invalid value encountered in true_divide",
        "Mean of empty slice",
    )

    def _runTools(self, data: KeyedData, **kwargs) -> Struct:
        """Run every configured `AnalysisTool` on ``data`` and accumulate the
        plots and metrics into a single `~lsst.pipe.base.Struct`.

        Exceptions raised by individual tools are caught and recorded so the
        remaining tools still run; the collected tracebacks are attached to
        the returned struct and inspected later by `putByBand`.
        """
        caughtErrors = {}
        with warnings.catch_warnings():
            # Change below to "in self.warnings_all" to find otherwise
            # unfiltered numpy warnings.
            for warning in ():
                warnings.filterwarnings("error", warning, RuntimeWarning)
            results = Struct()
            prefixName = f"{self.config.connections.outputName}_" if self.config.addOutputNamePrefix else ""
            results.metrics = MetricMeasurementBundle(
                dataset_identifier=self.config.dataset_identifier,
                reference_package=self.config.reference_package,
                timestamp_version=self.config.timestamp_version,
                metricNamePrefix=prefixName,
            )
            # copy plot info to be sure each action sees its own copy
            plotInfo = kwargs.get("plotInfo")
            plotKey = f"{self.config.connections.outputName}_{{name}}"
            weakrefArgs = []
            for name, action in self.config.atools.items():
                kwargs["plotInfo"] = deepcopy(plotInfo)
                try:
                    actionResult = action(data, **kwargs)
                except Exception:
                    # Record the traceback and keep running the other tools;
                    # the failure is surfaced later in putByBand.
                    tb = traceback.format_exc()
                    caughtErrors[name] = tb
                    continue
                metricAccumulate = []
                for resultName, value in actionResult.items():
                    match value:
                        case PlotTypes():
                            setattr(results, plotKey.format(name=resultName), value)
                            weakrefArgs.append(value)
                        case Measurement():
                            metricAccumulate.append(value)
                # only add the metrics if there are some
                if metricAccumulate:
                    results.metrics[name] = metricAccumulate
            # Wrap the return struct in a finalizer so that when results is
            # garbage collected the plots will be closed.
            # TODO: This finalize step closes all open plots at the conclusion
            # of a task. When DM-39114 is implemented, this step should not
            # be required and may be removed.
            weakref.finalize(results, _plotCloser, *weakrefArgs)
        # NOTE: name-mangled to _AnalysisPipelineTask__caughtErrors; read back
        # (with the same mangling) in putByBand.
        results.__caughtErrors = caughtErrors
        return results

    def run(self, *, data: KeyedData | None = None, **kwargs) -> Struct:
        """Produce the outputs associated with this `PipelineTask`.

        Parameters
        ----------
        data : `KeyedData`
            The input data from which all `AnalysisTools` will run and produce
            outputs. A side note, the python typing specifies that this can be
            None, but this is only due to a limitation in python where in order
            to specify that all arguments be passed only as keywords the
            argument must be given a default. This argument must not actually
            be None.
        **kwargs
            Additional arguments that are passed through to the `AnalysisTools`
            specified in the configuration.

        Returns
        -------
        results : `~lsst.pipe.base.Struct`
            The accumulated results of all the plots and metrics produced by
            this `PipelineTask`.

        Raises
        ------
        ValueError
            Raised if the supplied data argument is `None`
        """
        if data is None:
            raise ValueError("data must not be none")
        if "bands" not in kwargs:
            kwargs["bands"] = list(self.config.bands)
        if "plotInfo" not in kwargs:
            # Stand-in returns "" for any key a plot asks for but we lack.
            kwargs["plotInfo"] = _StandinPlotInfo()
        kwargs["plotInfo"]["bands"] = kwargs["bands"]
        return self._runTools(data, **kwargs)

    def runQuantum(
        self,
        butlerQC: QuantumContext,
        inputRefs: InputQuantizedConnection,
        outputRefs: OutputQuantizedConnection,
    ) -> None:
        """Override default runQuantum to load the minimal columns necessary
        to complete the action.

        Parameters
        ----------
        butlerQC : `~lsst.pipe.base.QuantumContext`
            A butler which is specialized to operate in the context of a
            `lsst.daf.butler.Quantum`.
        inputRefs : `InputQuantizedConnection`
            Datastructure whose attribute names are the names that identify
            connections defined in corresponding `PipelineTaskConnections`
            class. The values of these attributes are the
            `lsst.daf.butler.DatasetRef` objects associated with the defined
            input/prerequisite connections.
        outputRefs : `OutputQuantizedConnection`
            Datastructure whose attribute names are the names that identify
            connections defined in corresponding `PipelineTaskConnections`
            class. The values of these attributes are the
            `lsst.daf.butler.DatasetRef` objects associated with the defined
            output connections.
        """
        inputs = butlerQC.get(inputRefs)
        dataId = butlerQC.quantum.dataId
        plotInfo = self.parsePlotInfo(inputs, dataId)
        # We implicitly assume that 'data' has been defined, but do not have a
        # corresponding input connection in the base class. Thus, we capture
        # and re-raise the error with a more helpful message.
        try:
            # data has to be popped out to avoid duplication in the call to the
            # `run` method.
            inputData = inputs.pop("data")
        except KeyError:
            raise RuntimeError("'data' is a required input connection, but is not defined.")
        data = self.loadData(inputData)
        outputs = self.run(data=data, plotInfo=plotInfo, **inputs)
        self.putByBand(butlerQC, outputs, outputRefs)

    def putByBand(self, butlerQC: QuantumContext, outputs: Struct, outputRefs: OutputQuantizedConnection):
        """Handle the outputs by band.

        This is a convenience method to handle the case where the
        PipelineTaskConnection had to instantiate multiple output connections
        for plots to loop over bands.

        Parameters
        ----------
        butlerQC : `~lsst.pipe.base.QuantumContext`
            A butler which is specialized to operate in the context of a
            `lsst.daf.butler.Quantum`.
        outputs : `~lsst.pipe.base.Struct`
            The accumulated results of all the plots and metrics produced by
            the `run` method of this `PipelineTask`.
        outputRefs : `OutputQuantizedConnection`
            Datastructure whose attribute names are the names that identify
            connections defined in corresponding `PipelineTaskConnections`
            class. The values of these attributes are the
            `lsst.daf.butler.DatasetRef` objects associated with the defined
            output connections.

        Raises
        ------
        AnnotatedPartialOutputsError
            Raised (after all successful outputs are written) if any of the
            configured actions failed during `run`.
        """
        for outputRefName in outputRefs.keys():
            if outputRefName == "metrics":
                butlerQC.put(getattr(outputs, outputRefName), getattr(outputRefs, outputRefName))
                continue

            # Strip the outputName prefix: what remains is "_{name}" (or
            # "_{band}_{name}" components are reassembled below).
            name = outputRefName.split(self.config.connections.outputName, maxsplit=1)[1]

            datasetRef = getattr(outputRefs, outputRefName)
            if hasattr(datasetRef, "__iter__"):
                # Multiple refs: one per band; match each ref to the
                # band-specific attribute on the results struct.
                for outputRef in datasetRef:
                    band = outputRef.dataId.get("band", "")
                    # name would already have a leading underscore.
                    newOutputName = f"{self.config.connections.outputName}_{band}{name}"
                    if dataset := getattr(outputs, newOutputName, None):
                        butlerQC.put(dataset, outputRef)
            else:
                if dataset := getattr(outputs, outputRefName, None):
                    butlerQC.put(dataset, datasetRef)

        # If any of the actions encountered errors, raise an
        # AnnotatedPartialOutputsError with the details. (The attribute name
        # mangles identically here and in _runTools, both being in this class.)
        if outputs.__caughtErrors:
            e = PartialAtoolsFailureError(outputs.__caughtErrors)
            error = AnnotatedPartialOutputsError.annotate(e, self, log=self.log)
            raise error from e

    def _populatePlotInfoWithDataId(
        self, plotInfo: MutableMapping[str, Any], dataId: DataCoordinate | None
    ) -> None:
        """Update the plotInfo with the dataId values.

        Parameters
        ----------
        plotInfo : `dict`
            The plotInfo dictionary to update.
        dataId : `lsst.daf.butler.DataCoordinate`
            The dataId to use to update the plotInfo.
        """
        if dataId is not None:
            plotInfo.update(dataId.mapping)

    def parsePlotInfo(
        self, inputs: Mapping[str, Any] | None, dataId: DataCoordinate | None, connectionName: str = "data"
    ) -> Mapping[str, str]:
        """Parse the inputs and dataId to get the information needed
        to add to the figure.

        Parameters
        ----------
        inputs : `dict`
            The inputs to the task
        dataId : `lsst.daf.butler.DataCoordinate`
            The dataId that the task is being run on.
        connectionName : `str`, optional
            Name of the input connection to use for determining table name.

        Returns
        -------
        plotInfo : `dict`
            Dictionary with at least ``tableName`` and ``run`` keys, plus the
            dataId values.
        """
        if inputs is None:
            tableName = ""
            run = ""
        else:
            tableName = inputs[connectionName].ref.datasetType.name
            run = inputs[connectionName].ref.run

        # Initialize the plot info dictionary
        plotInfo = {"tableName": tableName, "run": run}

        self._populatePlotInfoWithDataId(plotInfo, dataId)
        return plotInfo

    def loadData(self, handle: DeferredDatasetHandle, names: Iterable[str] | None = None) -> KeyedData:
        """Load the minimal set of keyed data from the input dataset.

        Parameters
        ----------
        handle : `DeferredDatasetHandle`
            Handle to load the dataset with only the specified columns.
        names : `Iterable` of `str`
            The names of keys to extract from the dataset.
            If `names` is `None` then the `collectInputNames` method
            is called to generate the names.
            For most purposes these are the names of columns to load from
            a catalog or data frame.

        Returns
        -------
        result : `KeyedData`
            The dataset with only the specified keys loaded.
        """
        if names is None:
            names = self.collectInputNames()
        return cast(KeyedData, handle.get(parameters={"columns": names}))

    def collectInputNames(self) -> Iterable[str]:
        """Get the names of the inputs.

        If using the default `loadData` method this will gather the names
        of the keys to be loaded from an input dataset.

        Returns
        -------
        inputs : `Iterable` of `str`
            The names of the keys in the `KeyedData` object to extract.
        """
        inputs = set()
        # With no bands configured, still query each tool's schema once with
        # an empty band string.
        if not (localBands := self.config.bands):
            localBands = [""]
        for band in localBands:
            for action in self.config.atools:
                for key, _ in action.getFormattedInputSchema(band=band):
                    inputs.add(key)
        return inputs