Coverage for python / lsst / analysis / tools / interfaces / _task.py: 16%
268 statements
« prev ^ index » next coverage.py v7.13.5, created at 2026-04-17 09:36 +0000
« prev ^ index » next coverage.py v7.13.5, created at 2026-04-17 09:36 +0000
1# This file is part of analysis_tools.
2#
3# Developed for the LSST Data Management System.
4# This product includes software developed by the LSST Project
5# (https://www.lsst.org).
6# See the COPYRIGHT file at the top-level directory of this distribution
7# for details of code ownership.
8#
9# This program is free software: you can redistribute it and/or modify
10# it under the terms of the GNU General Public License as published by
11# the Free Software Foundation, either version 3 of the License, or
12# (at your option) any later version.
13#
14# This program is distributed in the hope that it will be useful,
15# but WITHOUT ANY WARRANTY; without even the implied warranty of
16# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
17# GNU General Public License for more details.
18#
19# You should have received a copy of the GNU General Public License
20# along with this program. If not, see <https://www.gnu.org/licenses/>.
21from __future__ import annotations
23__all__ = ("AnalysisBaseConnections", "AnalysisBaseConfig", "AnalysisPipelineTask")
25import datetime
26import logging
27import pprint
28import traceback
29import warnings
30import weakref
31from collections.abc import Collection, Iterable, Mapping, MutableMapping
32from copy import deepcopy
33from typing import TYPE_CHECKING, Any, cast
35import matplotlib.pyplot as plt
37from lsst.verify import Measurement
39if TYPE_CHECKING:
40 from lsst.daf.butler import DatasetRef, DeferredDatasetHandle
41 from lsst.pipe.base import QuantumContext
43from lsst.daf.butler import DataCoordinate
44from lsst.pex.config import Field, ListField
45from lsst.pex.config.configurableActions import ConfigurableActionStructField
46from lsst.pipe.base import (
47 AlgorithmError,
48 AnnotatedPartialOutputsError,
49 Instrument,
50 PipelineTask,
51 PipelineTaskConfig,
52 PipelineTaskConnections,
53 Struct,
54)
55from lsst.pipe.base import connectionTypes as ct
56from lsst.pipe.base.connections import InputQuantizedConnection, OutputQuantizedConnection
57from lsst.pipe.base.pipelineIR import ConfigIR, ParametersIR
59from ._actions import JointAction, MetricAction, NoMetric
60from ._analysisTools import AnalysisTool
61from ._interfaces import KeyedData, PlotTypes
62from ._metricMeasurementBundle import MetricMeasurementBundle
64"""Base class implementation for the classes needed in creating `PipelineTasks`
65which execute `AnalysisTools`.
67The classes defined in this module have all the required behaviors for
68defining, introspecting, and executing `AnalysisTools` against an input dataset
69type.
71Subclasses of these tasks should specify specific datasets to consume in their
connection classes and should specify a unique name.
"""
# TODO: This rcParams modification is a temporary solution, hiding
# a matplotlib warning indicating too many figures have been opened.
# When DM-39114 is implemented, this should be removed.
# (Setting the threshold to 0 disables the "More than N figures have been
# opened" RuntimeWarning entirely for the whole process.)
plt.rcParams.update({"figure.max_open_warning": 0})
81# TODO: This _plotCloser function assists in closing all open plots at the
82# conclusion of a PipelineTask. When DM-39114 is implemented, this function and
83# all associated usage thereof should be removed.
84def _plotCloser(*args):
85 """Close all the plots in the given list."""
86 for plot in args:
87 plt.close(plot)
class PartialAtoolsFailureError(AlgorithmError):
    """Raised if an analysis tools action has partially failed.

    Parameters
    ----------
    caughtErrors : `dict` of (`str`, `str`)
        A dictionary mapping the action name to the string representation of
        the exception raised by that action.
    """

    def __init__(self, caughtErrors) -> None:
        self._caughtErrors = caughtErrors
        # Build one "<name>: <pretty-printed error>, " fragment per failed
        # action, then strip the trailing ", " separator from the message.
        fragments = [f"{errName}: {pprint.pformat(err)}, " for errName, err in caughtErrors.items()]
        message = ("These tasks have failed for the following reasons: " + "".join(fragments))[:-2]
        super().__init__(message)

    @property
    def metadata(self) -> dict:
        # Expose the per-action error descriptions as the error metadata.
        return self._caughtErrors
class AnalysisBaseConnections(
    PipelineTaskConnections, dimensions={}, defaultTemplates={"outputName": "Placeholder"}
):
    r"""Base class for Connections used for AnalysisTools PipelineTasks.

    This class has a pre-defined output connection for the
    MetricMeasurementMapping. The dataset type name for this connection is
    determined by the template ``outputName``.

    Output connections for plots created by `AnalysisPlot`\ s are created
    dynamically when an instance of the class is created. The init method
    examines all the `AnalysisPlot` actions specified in the associated
    `AnalysisBaseConfig` subclass accumulating all the info needed to
    create the output connections.

    The dimensions for all of the output connections (metric and plot) will
    be the same as the dimensions specified for the AnalysisBaseConnections
    subclass (i.e. quantum dimensions).
    """

    # Single bundled-metrics output; removed in __init__ when no configured
    # tool actually produces metrics. Dimensions are filled in at init time.
    metrics = ct.Output(
        doc="Metrics calculated on input dataset type",
        name="{outputName}_metrics",
        storageClass="MetricMeasurementBundle",
    )

    def __init__(self, *, config: AnalysisBaseConfig = None):  # type: ignore
        # Validate that the outputName template has been set in config. This
        # should have been checked early with the configs validate method, but
        # it is possible for someone to manually create everything in a script
        # without running validate, so also check it late here.
        if (outputName := config.connections.outputName) == "Placeholder":  # type: ignore
            raise RuntimeError(
                "Subclasses must specify an alternative value for the defaultTemplate `outputName`"
            )
        super().__init__(config=config)

        # All arguments must be passed by kw, but python has not method to do
        # that without specifying a default, so None is used. Validate that
        # it is not None. This is largely for typing reasons, as in the normal
        # course of operation code execution paths ensure this will not be None
        assert config is not None

        # Determine whether any configured tool produces at least one metric.
        # Uses a for/else: the `else` clause runs only if no `break` fired,
        # i.e. no metric-producing tool was found.
        for tool in config.atools:
            match tool.produce:
                case JointAction():
                    if isinstance(tool.produce.metric, NoMetric):
                        continue
                    if len(tool.produce.metric.units) != 0:
                        hasMetrics = True
                        break
                case MetricAction():
                    hasMetrics = True
                    break
        else:
            hasMetrics = False

        # Set the dimensions for the metric
        if hasMetrics:
            self.metrics = ct.Output(
                name=self.metrics.name,
                doc=self.metrics.doc,
                storageClass=self.metrics.storageClass,
                dimensions=self.dimensions,
                multiple=False,
                isCalibration=False,
            )
        else:
            # There are no metrics to produce, remove the output connection
            self.outputs.remove("metrics")

        # Look for any conflicting names, creating a set of them, as these
        # will be added to the instance as well as recorded in the outputs
        # set.
        existingNames = set(dir(self))

        # Accumulate all the names to be used from all of the defined
        # AnalysisPlots.
        names: dict[str, AnalysisTool] = {}
        for action in config.atools:
            if action.dynamicOutputNames:
                outNames = action.getOutputNames(config=config)
            else:
                outNames = action.getOutputNames()
            if action.parameterizedBand:
                # One output per configured band; the tool's output name
                # templates contain a "{band}" placeholder.
                for band in config.bands:
                    names.update({name.format(band=band): action for name in outNames})
            else:
                names.update({name: action for name in outNames})

        # For each of the names found, create output connections.
        for name, action in names.items():
            name = f"{outputName}_{name}"
            if name in self.outputs or name in existingNames:
                raise NameError(
                    f"Plot with name {name} conflicts with existing connection are two plots named the same?"
                )

            if action.parameterizedBand and "band" not in self.dimensions:
                # If band is in self.dimensions, but the bands still appear
                # in the output names, then it is likely a user error or the
                # user is intentional in having a pair-wise plot. So we will
                # let the band names pass through in the else block. To avoid
                # the band from appearing, parametrizedBand needs to be False.
                multiple = True
                dimensions = self.dimensions.union({"band"})
                # Struct attributes have the structure
                # {outputName}_{band}_{name}.
                band_name = name.split(outputName, maxsplit=1)[1]  # _{band}_{name}.
                # Limit the maximum split to 2 to avoid splitting the name
                # that may contain underscores.
                band, name = band_name.split("_", maxsplit=2)[1:]  # ["", band, name].
                name = f"{outputName}_{name}"
            else:
                multiple = False
                dimensions = self.dimensions

            outConnection = ct.Output(
                name=name,
                storageClass="Plot",
                doc="Dynamic connection for plotting",
                dimensions=dimensions,
                multiple=multiple,
            )
            # Register the connection both as an attribute and (via the
            # connections machinery) in the outputs set.
            setattr(self, name, outConnection)

    def adjustQuantum(
        self,
        inputs: dict[str, tuple[ct.BaseInput, Collection[DatasetRef]]],
        outputs: dict[str, tuple[ct.Output, Collection[DatasetRef]]],
        label: str,
        data_id: DataCoordinate,
    ) -> tuple[
        Mapping[str, tuple[ct.BaseInput, Collection[DatasetRef]]],
        Mapping[str, tuple[ct.Output, Collection[DatasetRef]]],
    ]:
        # If the task does not have band in its dimensions but an output
        # connection does, only keep the output refs that are consistent with
        # the producing AnalysisTool's 'bands' attribute.
        if "band" in data_id.dimensions.names:
            # Nothing to do, since the task dimensions already have band; just
            # delegate to super. Note that we use the data ID dimensions since
            # self.dimensions is not an (expanded) DimensionGroup.
            return super().adjustQuantum(inputs, outputs, label, data_id)
        adjusted_outputs: dict[str, tuple[ct.Output, list[DatasetRef]]] = {}
        for name, (connection, old_refs) in outputs.items():
            # We also don't want to rely on connection.dimensions; the
            # dimensions in the refs are expanded and hence safer.
            new_refs: list[DatasetRef] | None = None
            for old_ref in old_refs:
                if new_refs is None:
                    if "band" in old_ref.datasetType.dimensions:
                        new_refs = []
                    else:
                        # If this ref doesn't have 'band' none of the others
                        # for this connection will either; move on to the next.
                        break
                # Keep only refs whose band is one the task is configured for.
                if old_ref.dataId["band"] in self.config.bands:
                    new_refs.append(old_ref)
            if new_refs is not None:
                adjusted_outputs[name] = (connection, new_refs)
        # Update the original outputs so we can pass them to super.
        outputs.update(adjusted_outputs)
        super().adjustQuantum(inputs, outputs, label, data_id)
        # Return only the connections we modified.
        return {}, adjusted_outputs
283def _timestampValidator(value: str) -> bool:
284 if value in ("reference_package_timestamp", "run_timestamp", "current_timestamp", "dataset_timestamp"):
285 return True
286 elif "explicit_timestamp" in value:
287 try:
288 _, splitTime = value.split(":")
289 except ValueError:
290 logging.error(
291 "Explicit timestamp must be given in the format 'explicit_timestamp:datetime', "
292 r"where datetime is given in the form '%Y%m%dT%H%M%S%z"
293 )
294 return False
295 try:
296 datetime.datetime.strptime(splitTime, r"%Y%m%dT%H%M%S%z")
297 except ValueError:
298 # This is explicitly chosen to be an f string as the string
299 # contains control characters.
300 logging.error(
301 f"The supplied datetime {splitTime} could not be parsed correctly into "
302 r"%Y%m%dT%H%M%S%z format"
303 )
304 return False
305 return True
306 else:
307 return False
class AnalysisBaseConfig(PipelineTaskConfig, pipelineConnections=AnalysisBaseConnections):
    """Base class for all configs used to define an `AnalysisPipelineTask`.

    This base class defines two fields that should be used in all subclasses,
    atools, and bands.

    The ``atools`` field is where the user configures which analysis tools will
    be run as part of this `PipelineTask`.

    The bands field specifies which bands will be looped over for
    `AnalysisTools` which support parameterized bands. I.e. called once for
    each band in the list.
    """

    atools = ConfigurableActionStructField[AnalysisTool](
        doc="The analysis tools that are to be run by this task at execution"
    )
    # Temporarily alias these for backwards compatibility
    plots = atools
    metrics = atools
    bands = ListField[str](
        doc="Filter bands on which to run all of the actions", default=["u", "g", "r", "i", "z", "y"]
    )
    metric_tags = ListField[str](
        doc="List of tags which will be added to all configurable actions", default=[]
    )
    dataset_identifier = Field[str](doc="An identifier to be associated with output Metrics", optional=True)
    reference_package = Field[str](
        doc="A package who's version, at the time of metric upload to a "
        "time series database, will be converted to a timestamp of when "
        "that version was produced",
        default="lsst_distrib",
    )
    timestamp_version = Field[str](
        doc="Which time stamp should be used as the reference timestamp for a "
        "metric in a time series database, valid values are; "
        "reference_package_timestamp, run_timestamp, current_timestamp, "
        "dataset_timestamp and explicit_timestamp:datetime where datetime is "
        "given in the form %Y%m%dT%H%M%S%z",
        default="run_timestamp",
        check=_timestampValidator,
    )
    addOutputNamePrefix = Field[bool](
        doc="If True, the connections class output name will be prefixed "
        "to the analysis 'atools' name when the value is dispatched to sasquatch.",
        default=False,
    )

    def applyConfigOverrides(
        self,
        instrument: Instrument | None,
        taskDefaultName: str,
        pipelineConfigs: Iterable[ConfigIR] | None,
        parameters: ParametersIR,
        label: str,
    ) -> None:
        """Apply config overrides, additionally honoring the pipeline-level
        ``sasquatch_*`` parameters.

        Recognized parameters (``sasquatch_dataset_identifier``,
        ``sasquatch_reference_package``, ``sasquatch_timestamp_version``) are
        translated into config overrides that are applied *before* any
        explicit pipeline configs, so explicit configs still win.

        Raises
        ------
        ValueError
            Raised if an ``explicit_timestamp`` parameter value is malformed.
        """
        extraConfig = {}
        if (value := parameters.mapping.get("sasquatch_dataset_identifier", None)) is not None:
            extraConfig["dataset_identifier"] = value
        if (value := parameters.mapping.get("sasquatch_reference_package", None)) is not None:
            extraConfig["reference_package"] = value
        if (value := parameters.mapping.get("sasquatch_timestamp_version", None)) is not None:
            # Validate explicit timestamps eagerly so a bad pipeline
            # parameter fails with a clear message rather than at field
            # validation time.
            if "explicit_timestamp" in value:
                try:
                    _, splitTime = value.split(":")
                except ValueError as excpt:
                    # Fixed: the closing quote around the datetime format was
                    # missing in the original message.
                    raise ValueError(
                        "Explicit timestamp must be given in the format 'explicit_timestamp:datetime', "
                        "where datetime is given in the form '%Y%m%dT%H%M%S%z'"
                    ) from excpt
                try:
                    datetime.datetime.strptime(splitTime, r"%Y%m%dT%H%M%S%z")
                except ValueError as excpt:
                    raise ValueError(
                        f"The supplied datetime {splitTime} could not be parsed correctly into "
                        "%Y%m%dT%H%M%S%z format"
                    ) from excpt
            extraConfig["timestamp_version"] = value
        if extraConfig:
            newPipelineConfigs = [ConfigIR(rest=extraConfig)]
            if pipelineConfigs is not None:
                newPipelineConfigs.extend(pipelineConfigs)
            pipelineConfigs = newPipelineConfigs
        return super().applyConfigOverrides(instrument, taskDefaultName, pipelineConfigs, parameters, label)

    def freeze(self):
        """Freeze the config, first propagating ``metric_tags`` to every
        configured tool.
        """
        # Copy the meta configuration values to each of the configured tools
        # only do this if the tool has not been further specialized
        if not self._frozen:
            for tool in self.atools:
                for tag in self.metric_tags:
                    tool.metric_tags.insert(-1, tag)
        super().freeze()

    def validate(self):
        """Validate the config, additionally requiring that the connections
        ``outputName`` template has been overridden.
        """
        super().validate()
        # Validate that the required connections template is set.
        if self.connections.outputName == "Placeholder":  # type: ignore
            raise RuntimeError("Connections class 'outputName' must have a config explicitly set")
411class _StandinPlotInfo(dict):
412 """This class is an implementation detail to support plots in the instance
413 no PlotInfo object is present in the call to run.
414 """
416 def __missing__(self, key):
417 return ""
class AnalysisPipelineTask(PipelineTask):
    """Base class for `PipelineTasks` intended to run `AnalysisTools`.

    The run method will run all of the `AnalysisTools` defined in the config
    class.
    """

    # Typing config because type checkers don't know about our Task magic
    config: AnalysisBaseConfig
    ConfigClass = AnalysisBaseConfig

    # Known numpy RuntimeWarning messages emitted by analysis tools. See the
    # note inside ``_runTools`` on how this tuple can be used during
    # development to surface otherwise unfiltered warnings.
    warnings_all = (
        "divide by zero encountered in divide",
        "invalid value encountered in arcsin",
        "invalid value encountered in cos",
        "invalid value encountered in divide",
        "invalid value encountered in log10",
        "invalid value encountered in scalar divide",
        "invalid value encountered in sin",
        "invalid value encountered in sqrt",
        "invalid value encountered in true_divide",
        "Mean of empty slice",
    )

    def _runTools(self, data: KeyedData, **kwargs) -> Struct:
        """Run every configured `AnalysisTool` on ``data``, accumulating all
        plots and metrics into a single `~lsst.pipe.base.Struct`.

        Exceptions raised by individual tools are caught and recorded as
        formatted tracebacks (keyed by tool name) instead of propagating, so
        one failing tool does not prevent the others from running.
        """
        caughtErrors: dict[str, str] = {}
        with warnings.catch_warnings():
            # Change below to "in self.warnings_all" to find otherwise
            # unfiltered numpy warnings.
            for warning in ():
                warnings.filterwarnings("error", warning, RuntimeWarning)
            results = Struct()
            prefixName = f"{self.config.connections.outputName}_" if self.config.addOutputNamePrefix else ""
            results.metrics = MetricMeasurementBundle(
                dataset_identifier=self.config.dataset_identifier,
                reference_package=self.config.reference_package,
                timestamp_version=self.config.timestamp_version,
                metricNamePrefix=prefixName,
            )
            # copy plot info to be sure each action sees its own copy
            plotInfo = kwargs.get("plotInfo")
            plotKey = f"{self.config.connections.outputName}_{{name}}"
            weakrefArgs = []
            for name, action in self.config.atools.items():
                kwargs["plotInfo"] = deepcopy(plotInfo)
                try:
                    actionResult = action(data, **kwargs)
                except Exception:
                    # Record the traceback; it is reported later (in
                    # putByBand) via a PartialAtoolsFailureError.
                    tb = traceback.format_exc()
                    caughtErrors[name] = tb

                    continue
                metricAccumulate = []
                for resultName, value in actionResult.items():
                    # Dispatch each produced value by type: plots become
                    # struct attributes, measurements are bundled per tool.
                    match value:
                        case PlotTypes():
                            setattr(results, plotKey.format(name=resultName), value)
                            weakrefArgs.append(value)
                        case Measurement():
                            metricAccumulate.append(value)
                # only add the metrics if there are some
                if metricAccumulate:
                    results.metrics[name] = metricAccumulate
            # Wrap the return struct in a finalizer so that when results is
            # garbage collected the plots will be closed.
            # TODO: This finalize step closes all open plots at the conclusion
            # of a task. When DM-39114 is implemented, this step should not
            # be required and may be removed.
            weakref.finalize(results, _plotCloser, *weakrefArgs)
        # NOTE: the double-underscore attribute is name-mangled to
        # _AnalysisPipelineTask__caughtErrors; this works because it is read
        # back inside this same class (in ``putByBand``).
        results.__caughtErrors = caughtErrors
        return results

    def run(self, *, data: KeyedData | None = None, **kwargs) -> Struct:
        """Produce the outputs associated with this `PipelineTask`.

        Parameters
        ----------
        data : `KeyedData`
            The input data from which all `AnalysisTools` will run and produce
            outputs. A side note, the python typing specifies that this can be
            None, but this is only due to a limitation in python where in order
            to specify that all arguments be passed only as keywords the
            argument must be given a default. This argument must not actually
            be None.
        **kwargs
            Additional arguments that are passed through to the `AnalysisTools`
            specified in the configuration.

        Returns
        -------
        results : `~lsst.pipe.base.Struct`
            The accumulated results of all the plots and metrics produced by
            this `PipelineTask`.

        Raises
        ------
        ValueError
            Raised if the supplied data argument is `None`
        """
        if data is None:
            raise ValueError("data must not be none")
        # Supply defaults for the keyword arguments every tool expects.
        if "bands" not in kwargs:
            kwargs["bands"] = list(self.config.bands)
        if "plotInfo" not in kwargs:
            kwargs["plotInfo"] = _StandinPlotInfo()
        kwargs["plotInfo"]["bands"] = kwargs["bands"]
        return self._runTools(data, **kwargs)

    def runQuantum(
        self,
        butlerQC: QuantumContext,
        inputRefs: InputQuantizedConnection,
        outputRefs: OutputQuantizedConnection,
    ) -> None:
        """Override default runQuantum to load the minimal columns necessary
        to complete the action.

        Parameters
        ----------
        butlerQC : `~lsst.pipe.base.QuantumContext`
            A butler which is specialized to operate in the context of a
            `lsst.daf.butler.Quantum`.
        inputRefs : `InputQuantizedConnection`
            Datastructure whose attribute names are the names that identify
            connections defined in corresponding `PipelineTaskConnections`
            class. The values of these attributes are the
            `lsst.daf.butler.DatasetRef` objects associated with the defined
            input/prerequisite connections.
        outputRefs : `OutputQuantizedConnection`
            Datastructure whose attribute names are the names that identify
            connections defined in corresponding `PipelineTaskConnections`
            class. The values of these attributes are the
            `lsst.daf.butler.DatasetRef` objects associated with the defined
            output connections.
        """
        inputs = butlerQC.get(inputRefs)
        dataId = butlerQC.quantum.dataId
        plotInfo = self.parsePlotInfo(inputs, dataId)
        # We implicitly assume that 'data' has been defined, but do not have a
        # corresponding input connection in the base class. Thus, we capture
        # and re-raise the error with a more helpful message.
        try:
            # data has to be popped out to avoid duplication in the call to the
            # `run` method.
            inputData = inputs.pop("data")
        except KeyError:
            raise RuntimeError("'data' is a required input connection, but is not defined.")
        data = self.loadData(inputData)
        outputs = self.run(data=data, plotInfo=plotInfo, **inputs)
        self.putByBand(butlerQC, outputs, outputRefs)

    def putByBand(self, butlerQC: QuantumContext, outputs: Struct, outputRefs: OutputQuantizedConnection):
        """Handle the outputs by band.

        This is a convenience method to handle the case where the
        PipelineTaskConnection had to instantiate multiple output connections
        for plots to loop over bands.

        Parameters
        ----------
        butlerQC : `~lsst.pipe.base.QuantumContext`
            A butler which is specialized to operate in the context of a
            `lsst.daf.butler.Quantum`.
        outputs : `~lsst.pipe.base.Struct`
            The accumulated results of all the plots and metrics produced by
            the `run` method of this `PipelineTask`.
        outputRefs : `OutputQuantizedConnection`
            Datastructure whose attribute names are the names that identify
            connections defined in corresponding `PipelineTaskConnections`
            class. The values of these attributes are the
            `lsst.daf.butler.DatasetRef` objects associated with the defined
            output connections.

        Raises
        ------
        AnnotatedPartialOutputsError
            Raised (after all successful outputs are written) if any of the
            configured tools failed during `run`.
        """
        for outputRefName in outputRefs.keys():
            if outputRefName == "metrics":
                butlerQC.put(getattr(outputs, outputRefName), getattr(outputRefs, outputRefName))
                continue

            # Strip the connection's outputName prefix; the remainder keeps
            # its leading underscore (e.g. "_myPlot").
            name = outputRefName.split(self.config.connections.outputName, maxsplit=1)[1]

            datasetRef = getattr(outputRefs, outputRefName)
            if hasattr(datasetRef, "__iter__"):
                # A multiple=True (per-band) connection: match each ref's band
                # back to the corresponding struct attribute.
                for outputRef in datasetRef:
                    band = outputRef.dataId.get("band", "")
                    # name would already have a leading underscore.
                    newOutputName = f"{self.config.connections.outputName}_{band}{name}"
                    if dataset := getattr(outputs, newOutputName, None):
                        butlerQC.put(dataset, outputRef)
            else:
                if dataset := getattr(outputs, outputRefName, None):
                    butlerQC.put(dataset, datasetRef)

        # If any of the actions encountered errors, raise an
        # AnnotatedPartialOutputsError with the details.
        # (The attribute below name-mangles to the same name _runTools set,
        # because both methods live in this class.)
        if outputs.__caughtErrors:
            e = PartialAtoolsFailureError(outputs.__caughtErrors)
            error = AnnotatedPartialOutputsError.annotate(e, self, log=self.log)
            raise error from e

    def _populatePlotInfoWithDataId(
        self, plotInfo: MutableMapping[str, Any], dataId: DataCoordinate | None
    ) -> None:
        """Update the plotInfo with the dataId values.

        Parameters
        ----------
        plotInfo : `dict`
            The plotInfo dictionary to update.
        dataId : `lsst.daf.butler.DataCoordinate`
            The dataId to use to update the plotInfo.
        """
        if dataId is not None:
            plotInfo.update(dataId.mapping)

    def parsePlotInfo(
        self, inputs: Mapping[str, Any] | None, dataId: DataCoordinate | None, connectionName: str = "data"
    ) -> Mapping[str, str]:
        """Parse the inputs and dataId to get the information needed to
        to add to the figure.

        Parameters
        ----------
        inputs: `dict`
            The inputs to the task
        dataCoordinate: `lsst.daf.butler.DataCoordinate`
            The dataId that the task is being run on.
        connectionName: `str`, optional
            Name of the input connection to use for determining table name.

        Returns
        -------
        plotInfo : `dict`
        """

        if inputs is None:
            tableName = ""
            run = ""
        else:
            tableName = inputs[connectionName].ref.datasetType.name
            run = inputs[connectionName].ref.run

        # Initialize the plot info dictionary
        plotInfo = {"tableName": tableName, "run": run}

        self._populatePlotInfoWithDataId(plotInfo, dataId)
        return plotInfo

    def loadData(self, handle: DeferredDatasetHandle, names: Iterable[str] | None = None) -> KeyedData:
        """Load the minimal set of keyed data from the input dataset.

        Parameters
        ----------
        handle : `DeferredDatasetHandle`
            Handle to load the dataset with only the specified columns.
        names : `Iterable` of `str`
            The names of keys to extract from the dataset.
            If `names` is `None` then the `collectInputNames` method
            is called to generate the names.
            For most purposes these are the names of columns to load from
            a catalog or data frame.

        Returns
        -------
        result: `KeyedData`
            The dataset with only the specified keys loaded.
        """
        if names is None:
            names = self.collectInputNames()
        # Restrict the read to just the needed columns to minimize I/O.
        return cast(KeyedData, handle.get(parameters={"columns": names}))

    def collectInputNames(self) -> Iterable[str]:
        """Get the names of the inputs.

        If using the default `loadData` method this will gather the names
        of the keys to be loaded from an input dataset.

        Returns
        -------
        inputs : `Iterable` of `str`
            The names of the keys in the `KeyedData` object to extract.

        """
        inputs = set()

        # With no configured bands, query each tool's schema once with an
        # empty band placeholder.
        if not (localBands := self.config.bands):
            localBands = [""]
        for band in localBands:
            for action in self.config.atools:
                for key, _ in action.getFormattedInputSchema(band=band):
                    inputs.add(key)
        return inputs