Coverage for python / lsst / analysis / tools / interfaces / _task.py: 16%

268 statements  

« prev     ^ index     » next       coverage.py v7.13.5, created at 2026-04-15 00:23 +0000

1# This file is part of analysis_tools. 

2# 

3# Developed for the LSST Data Management System. 

4# This product includes software developed by the LSST Project 

5# (https://www.lsst.org). 

6# See the COPYRIGHT file at the top-level directory of this distribution 

7# for details of code ownership. 

8# 

9# This program is free software: you can redistribute it and/or modify 

10# it under the terms of the GNU General Public License as published by 

11# the Free Software Foundation, either version 3 of the License, or 

12# (at your option) any later version. 

13# 

14# This program is distributed in the hope that it will be useful, 

15# but WITHOUT ANY WARRANTY; without even the implied warranty of 

16# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the 

17# GNU General Public License for more details. 

18# 

19# You should have received a copy of the GNU General Public License 

20# along with this program. If not, see <https://www.gnu.org/licenses/>. 

21from __future__ import annotations 

22 

23"""Base class implementation for the classes needed in creating `PipelineTasks` 

24which execute `AnalysisTools`. 

25 

26The classes defined in this module have all the required behaviors for 

27defining, introspecting, and executing `AnalysisTools` against an input dataset 

28type. 

29 

30Subclasses of these tasks should specify specific datasets to consume in their 

31connection classes and should specify a unique name 

32""" 

33 

34__all__ = ("AnalysisBaseConnections", "AnalysisBaseConfig", "AnalysisPipelineTask") 

35 

36import datetime 

37import logging 

38import pprint 

39import traceback 

40import warnings 

41import weakref 

42from collections.abc import Collection, Iterable 

43from copy import deepcopy 

44from typing import TYPE_CHECKING, Any, Mapping, MutableMapping, cast 

45 

46import matplotlib.pyplot as plt 

47from lsst.verify import Measurement 

48 

49if TYPE_CHECKING: 

50 from lsst.daf.butler import DatasetRef, DeferredDatasetHandle 

51 from lsst.pipe.base import QuantumContext 

52 

53from lsst.daf.butler import DataCoordinate 

54from lsst.pex.config import Field, ListField 

55from lsst.pex.config.configurableActions import ConfigurableActionStructField 

56from lsst.pipe.base import ( 

57 AlgorithmError, 

58 AnnotatedPartialOutputsError, 

59 Instrument, 

60 PipelineTask, 

61 PipelineTaskConfig, 

62 PipelineTaskConnections, 

63 Struct, 

64) 

65from lsst.pipe.base import connectionTypes as ct 

66from lsst.pipe.base.connections import InputQuantizedConnection, OutputQuantizedConnection 

67from lsst.pipe.base.pipelineIR import ConfigIR, ParametersIR 

68 

69from ._actions import JointAction, MetricAction, NoMetric 

70from ._analysisTools import AnalysisTool 

71from ._interfaces import KeyedData, PlotTypes 

72from ._metricMeasurementBundle import MetricMeasurementBundle 

73 

# TODO: This rcParams modification is a temporary solution, hiding
# a matplotlib warning indicating too many figures have been opened.
# When DM-39114 is implemented, this should be removed.
# Setting figure.max_open_warning to 0 disables the warning entirely;
# _plotCloser (below) is what actually closes the figures.
plt.rcParams.update({"figure.max_open_warning": 0})

78 

79 

80# TODO: This _plotCloser function assists in closing all open plots at the 

81# conclusion of a PipelineTask. When DM-39114 is implemented, this function and 

82# all associated usage thereof should be removed. 

def _plotCloser(*args):
    """Close every matplotlib figure passed in.

    Used as a ``weakref.finalize`` callback so that figures produced by a
    task are released once the result struct is garbage collected.
    """
    for figure in args:
        plt.close(figure)

87 

88 

class PartialAtoolsFailureError(AlgorithmError):
    """Raised if an analysis tools action has partially failed.

    Parameters
    ----------
    caughtErrors : `dict` of (`str`, `str`)
        A dictionary mapping the action name to the string representation of
        the exception raised by that action.
    """

    def __init__(self, caughtErrors) -> None:
        self._caughtErrors = caughtErrors
        # Assemble the message with a single join rather than repeated
        # concatenation followed by slicing off the trailing ", ".  The
        # message is identical for non-empty inputs, and the prefix is no
        # longer clipped if caughtErrors happens to be empty.
        errString = "These tasks have failed for the following reasons: " + ", ".join(
            f"{errName}: {pprint.pformat(error)}" for errName, error in caughtErrors.items()
        )
        super().__init__(errString)

    @property
    def metadata(self) -> dict:
        # Expose the name -> traceback mapping for task metadata capture.
        return self._caughtErrors

112 

113 

class AnalysisBaseConnections(
    PipelineTaskConnections, dimensions={}, defaultTemplates={"outputName": "Placeholder"}
):
    r"""Base class for Connections used for AnalysisTools PipelineTasks.

    This class has a pre-defined output connection for the
    MetricMeasurementMapping. The dataset type name for this connection is
    determined by the template ``outputName``.

    Output connections for plots created by `AnalysisPlot`\ s are created
    dynamically when an instance of the class is created. The init method
    examines all the `AnalysisPlot` actions specified in the associated
    `AnalysisBaseConfig` subclass accumulating all the info needed to
    create the output connections.

    The dimensions for all of the output connections (metric and plot) will
    be the same as the dimensions specified for the AnalysisBaseConnections
    subclass (i.e. quantum dimensions).
    """

    metrics = ct.Output(
        doc="Metrics calculated on input dataset type",
        name="{outputName}_metrics",
        storageClass="MetricMeasurementBundle",
    )

    def __init__(self, *, config: AnalysisBaseConfig = None):  # type: ignore
        """Build the dynamic output connections from the configured atools.

        Parameters
        ----------
        config : `AnalysisBaseConfig`
            The config instance whose ``atools`` field determines which
            metric and plot output connections are created.
        """
        # Validate that the outputName template has been set in config. This
        # should have been checked early with the configs validate method, but
        # it is possible for someone to manually create everything in a script
        # without running validate, so also check it late here.
        if (outputName := config.connections.outputName) == "Placeholder":  # type: ignore
            raise RuntimeError(
                "Subclasses must specify an alternative value for the defaultTemplate `outputName`"
            )
        super().__init__(config=config)

        # All arguments must be passed by kw, but python has not method to do
        # that without specifying a default, so None is used. Validate that
        # it is not None. This is largely for typing reasons, as in the normal
        # course of operation code execution paths ensure this will not be None
        assert config is not None

        # Determine whether any configured tool actually produces metrics;
        # the for/else sets hasMetrics = False only when no tool breaks out.
        for tool in config.atools:
            match tool.produce:
                case JointAction():
                    if isinstance(tool.produce.metric, NoMetric):
                        continue
                    if len(tool.produce.metric.units) != 0:
                        hasMetrics = True
                        break
                case MetricAction():
                    hasMetrics = True
                    break
        else:
            hasMetrics = False

        # Set the dimensions for the metric
        if hasMetrics:
            self.metrics = ct.Output(
                name=self.metrics.name,
                doc=self.metrics.doc,
                storageClass=self.metrics.storageClass,
                dimensions=self.dimensions,
                multiple=False,
                isCalibration=False,
            )
        else:
            # There are no metrics to produce, remove the output connection
            self.outputs.remove("metrics")

        # Look for any conflicting names, creating a set of them, as these
        # will be added to the instance as well as recorded in the outputs
        # set.
        existingNames = set(dir(self))

        # Accumulate all the names to be used from all of the defined
        # AnalysisPlots.
        names: MutableMapping[str, AnalysisTool] = {}
        for action in config.atools:
            if action.dynamicOutputNames:
                outNames = action.getOutputNames(config=config)
            else:
                outNames = action.getOutputNames()
            if action.parameterizedBand:
                # One output per configured band; the tool's name templates
                # contain a "{band}" placeholder to fill in.
                for band in config.bands:
                    names.update({name.format(band=band): action for name in outNames})
            else:
                names.update({name: action for name in outNames})

        # For each of the names found, create output connections.
        for name, action in names.items():
            name = f"{outputName}_{name}"
            if name in self.outputs or name in existingNames:
                raise NameError(
                    f"Plot with name {name} conflicts with existing connection"
                    " are two plots named the same?"
                )

            if action.parameterizedBand and "band" not in self.dimensions:
                # If band is in self.dimensions, but the bands still appear
                # in the output names, then it is likely a user error or the
                # user is intentional in having a pair-wise plot. So we will
                # let the band names pass through in the else block. To avoid
                # the band from appearing, parametrizedBand needs to be False.
                multiple = True
                dimensions = self.dimensions.union({"band"})
                # Struct attributes have the structure
                # {outputName}_{band}_{name}.
                band_name = name.split(outputName, maxsplit=1)[1]  # _{band}_{name}.
                # Limit the maximum split to 2 to avoid splitting the name
                # that may contain underscores.
                band, name = band_name.split("_", maxsplit=2)[1:]  # ["", band, name].
                name = f"{outputName}_{name}"
            else:
                multiple = False
                dimensions = self.dimensions

            outConnection = ct.Output(
                name=name,
                storageClass="Plot",
                doc="Dynamic connection for plotting",
                dimensions=dimensions,
                multiple=multiple,
            )
            # Attach the connection as an attribute; the PipelineTaskConnections
            # machinery records it in self.outputs via __setattr__.
            setattr(self, name, outConnection)

    def adjustQuantum(
        self,
        inputs: dict[str, tuple[ct.BaseInput, Collection[DatasetRef]]],
        outputs: dict[str, tuple[ct.Output, Collection[DatasetRef]]],
        label: str,
        data_id: DataCoordinate,
    ) -> tuple[
        Mapping[str, tuple[ct.BaseInput, Collection[DatasetRef]]],
        Mapping[str, tuple[ct.Output, Collection[DatasetRef]]],
    ]:
        """Drop band-dimensioned output refs for bands not in the config.

        See `~lsst.pipe.base.PipelineTaskConnections.adjustQuantum` for the
        full contract of this override point.
        """
        # If the task does not have band in its dimensions but an output
        # connection does, only keep the output refs that are consistent with
        # the producing AnalysisTool's 'bands' attribute.
        if "band" in data_id.dimensions.names:
            # Nothing to do, since the task dimensions already have band; just
            # delegate to super. Note that we use the data ID dimensions since
            # self.dimensions is not an (expanded) DimensionGroup.
            return super().adjustQuantum(inputs, outputs, label, data_id)
        adjusted_outputs: dict[str, tuple[ct.Output, list[DatasetRef]]] = {}
        for name, (connection, old_refs) in outputs.items():
            # We also don't want to rely on connection.dimensions; the
            # dimensions in the refs are expanded and hence safer.
            new_refs: list[DatasetRef] | None = None
            for old_ref in old_refs:
                if new_refs is None:
                    if "band" in old_ref.datasetType.dimensions:
                        new_refs = []
                    else:
                        # If this ref doesn't have 'band' none of the others
                        # for this connection will either; move on to the next.
                        break
                if old_ref.dataId["band"] in self.config.bands:
                    new_refs.append(old_ref)
            if new_refs is not None:
                adjusted_outputs[name] = (connection, new_refs)
        # Update the original outputs so we can pass them to super.
        outputs.update(adjusted_outputs)
        super().adjustQuantum(inputs, outputs, label, data_id)
        # Return only the connections we modified.
        return {}, adjusted_outputs

281 

282 

283def _timestampValidator(value: str) -> bool: 

284 if value in ("reference_package_timestamp", "run_timestamp", "current_timestamp", "dataset_timestamp"): 

285 return True 

286 elif "explicit_timestamp" in value: 

287 try: 

288 _, splitTime = value.split(":") 

289 except ValueError: 

290 logging.error( 

291 "Explicit timestamp must be given in the format 'explicit_timestamp:datetime', " 

292 r"where datetime is given in the form '%Y%m%dT%H%M%S%z" 

293 ) 

294 return False 

295 try: 

296 datetime.datetime.strptime(splitTime, r"%Y%m%dT%H%M%S%z") 

297 except ValueError: 

298 # This is explicitly chosen to be an f string as the string 

299 # contains control characters. 

300 logging.error( 

301 f"The supplied datetime {splitTime} could not be parsed correctly into " 

302 r"%Y%m%dT%H%M%S%z format" 

303 ) 

304 return False 

305 return True 

306 else: 

307 return False 

308 

309 

class AnalysisBaseConfig(PipelineTaskConfig, pipelineConnections=AnalysisBaseConnections):
    """Base class for all configs used to define an `AnalysisPipelineTask`.

    This base class defines two fields that should be used in all subclasses,
    atools, and bands.

    The ``atools`` field is where the user configures which analysis tools will
    be run as part of this `PipelineTask`.

    The bands field specifies which bands will be looped over for
    `AnalysisTools` which support parameterized bands. I.e. called once for
    each band in the list.
    """

    atools = ConfigurableActionStructField[AnalysisTool](
        doc="The analysis tools that are to be run by this task at execution"
    )
    # Temporarily alias these for backwards compatibility
    plots = atools
    metrics = atools
    bands = ListField[str](
        doc="Filter bands on which to run all of the actions", default=["u", "g", "r", "i", "z", "y"]
    )
    metric_tags = ListField[str](
        doc="List of tags which will be added to all configurable actions", default=[]
    )
    dataset_identifier = Field[str](doc="An identifier to be associated with output Metrics", optional=True)
    reference_package = Field[str](
        doc="A package who's version, at the time of metric upload to a "
        "time series database, will be converted to a timestamp of when "
        "that version was produced",
        default="lsst_distrib",
    )
    timestamp_version = Field[str](
        doc="Which time stamp should be used as the reference timestamp for a "
        "metric in a time series database, valid values are; "
        "reference_package_timestamp, run_timestamp, current_timestamp, "
        "dataset_timestamp and explicit_timestamp:datetime where datetime is "
        "given in the form %Y%m%dT%H%M%S%z",
        default="run_timestamp",
        check=_timestampValidator,
    )
    addOutputNamePrefix = Field[bool](
        doc="If True, the connections class output name will be prefixed "
        "to the analysis 'atools' name when the value is dispatched to sasquatch.",
        default=False,
    )

    def applyConfigOverrides(
        self,
        instrument: Instrument | None,
        taskDefaultName: str,
        pipelineConfigs: Iterable[ConfigIR] | None,
        parameters: ParametersIR,
        label: str,
    ) -> None:
        """Apply sasquatch-related pipeline parameters as config overrides.

        Recognized pipeline parameters (``sasquatch_dataset_identifier``,
        ``sasquatch_reference_package``, ``sasquatch_timestamp_version``) are
        translated into an extra `ConfigIR` prepended to ``pipelineConfigs``
        before delegating to the base-class implementation.
        """
        extraConfig = {}
        if (value := parameters.mapping.get("sasquatch_dataset_identifier", None)) is not None:
            extraConfig["dataset_identifier"] = value
        if (value := parameters.mapping.get("sasquatch_reference_package", None)) is not None:
            extraConfig["reference_package"] = value
        if (value := parameters.mapping.get("sasquatch_timestamp_version", None)) is not None:
            # Validate explicit timestamps eagerly here (raising, rather than
            # the log-and-return-False behavior of _timestampValidator) so a
            # bad pipeline parameter fails loudly.
            if "explicit_timestamp" in value:
                try:
                    _, splitTime = value.split(":")
                except ValueError as excpt:
                    raise ValueError(
                        "Explicit timestamp must be given in the format 'explicit_timestamp:datetime', "
                        "where datetime is given in the form '%Y%m%dT%H%M%S%z"
                    ) from excpt
                try:
                    datetime.datetime.strptime(splitTime, r"%Y%m%dT%H%M%S%z")
                except ValueError as excpt:
                    raise ValueError(
                        f"The supplied datetime {splitTime} could not be parsed correctly into "
                        "%Y%m%dT%H%M%S%z format"
                    ) from excpt
            extraConfig["timestamp_version"] = value
        if extraConfig:
            newPipelineConfigs = [ConfigIR(rest=extraConfig)]
            if pipelineConfigs is not None:
                newPipelineConfigs.extend(pipelineConfigs)
            pipelineConfigs = newPipelineConfigs
        return super().applyConfigOverrides(instrument, taskDefaultName, pipelineConfigs, parameters, label)

    def freeze(self):
        # Copy the meta configuration values to each of the configured tools
        # only do this if the tool has not been further specialized
        if not self._frozen:
            for tool in self.atools:
                for tag in self.metric_tags:
                    # NOTE(review): insert(-1, tag) places each tag before the
                    # last existing element rather than appending — presumably
                    # intentional for ListField ordering; confirm.
                    tool.metric_tags.insert(-1, tag)
        super().freeze()

    def validate(self):
        """Validate the config, additionally requiring ``outputName`` be set."""
        super().validate()
        # Validate that the required connections template is set.
        if self.connections.outputName == "Placeholder":  # type: ignore
            raise RuntimeError("Connections class 'outputName' must have a config explicitly set")

409 

410 

411class _StandinPlotInfo(dict): 

412 """This class is an implementation detail to support plots in the instance 

413 no PlotInfo object is present in the call to run. 

414 """ 

415 

416 def __missing__(self, key): 

417 return "" 

418 

419 

class AnalysisPipelineTask(PipelineTask):
    """Base class for `PipelineTasks` intended to run `AnalysisTools`.

    The run method will run all of the `AnalysisTools` defined in the config
    class.
    """

    # Typing config because type checkers don't know about our Task magic
    config: AnalysisBaseConfig
    ConfigClass = AnalysisBaseConfig

    # Reference list of known-noisy numpy RuntimeWarning messages; see the
    # commented switch in _runTools for how to promote these to errors.
    warnings_all = (
        "divide by zero encountered in divide",
        "invalid value encountered in arcsin",
        "invalid value encountered in cos",
        "invalid value encountered in divide",
        "invalid value encountered in log10",
        "invalid value encountered in scalar divide",
        "invalid value encountered in sin",
        "invalid value encountered in sqrt",
        "invalid value encountered in true_divide",
        "Mean of empty slice",
    )

    def _runTools(self, data: KeyedData, **kwargs) -> Struct:
        """Run every configured `AnalysisTool` over ``data``.

        Plots are attached to the returned struct under
        ``{outputName}_{plotName}`` attributes and metrics are accumulated
        into a `MetricMeasurementBundle` under ``metrics``. Exceptions from
        individual tools are captured (not raised) and stored on the struct
        for later reporting by `putByBand`.
        """
        caughtErrors = {}
        with warnings.catch_warnings():
            # Change below to "in self.warnings_all" to find otherwise
            # unfiltered numpy warnings.
            for warning in ():
                warnings.filterwarnings("error", warning, RuntimeWarning)
            results = Struct()
            prefixName = f"{self.config.connections.outputName}_" if self.config.addOutputNamePrefix else ""
            results.metrics = MetricMeasurementBundle(
                dataset_identifier=self.config.dataset_identifier,
                reference_package=self.config.reference_package,
                timestamp_version=self.config.timestamp_version,
                metricNamePrefix=prefixName,
            )
            # copy plot info to be sure each action sees its own copy
            plotInfo = kwargs.get("plotInfo")
            plotKey = f"{self.config.connections.outputName}_{{name}}"
            weakrefArgs = []
            for name, action in self.config.atools.items():
                kwargs["plotInfo"] = deepcopy(plotInfo)
                try:
                    actionResult = action(data, **kwargs)
                except Exception:
                    # Record the full traceback keyed by tool name; execution
                    # continues so other tools still produce their outputs.
                    tb = traceback.format_exc()
                    caughtErrors[name] = tb

                    continue
                metricAccumulate = []
                for resultName, value in actionResult.items():
                    match value:
                        case PlotTypes():
                            setattr(results, plotKey.format(name=resultName), value)
                            weakrefArgs.append(value)
                        case Measurement():
                            metricAccumulate.append(value)
                # only add the metrics if there are some
                if metricAccumulate:
                    results.metrics[name] = metricAccumulate
            # Wrap the return struct in a finalizer so that when results is
            # garbage collected the plots will be closed.
            # TODO: This finalize step closes all open plots at the conclusion
            # of a task. When DM-39114 is implemented, this step should not
            # be required and may be removed.
            weakref.finalize(results, _plotCloser, *weakrefArgs)
        # Name-mangled to _AnalysisPipelineTask__caughtErrors; read back in
        # putByBand, which is inside this same class so the mangling matches.
        results.__caughtErrors = caughtErrors
        return results

    def run(self, *, data: KeyedData | None = None, **kwargs) -> Struct:
        """Produce the outputs associated with this `PipelineTask`.

        Parameters
        ----------
        data : `KeyedData`
            The input data from which all `AnalysisTools` will run and produce
            outputs. A side note, the python typing specifies that this can be
            None, but this is only due to a limitation in python where in order
            to specify that all arguments be passed only as keywords the
            argument must be given a default. This argument must not actually
            be None.
        **kwargs
            Additional arguments that are passed through to the `AnalysisTools`
            specified in the configuration.

        Returns
        -------
        results : `~lsst.pipe.base.Struct`
            The accumulated results of all the plots and metrics produced by
            this `PipelineTask`.

        Raises
        ------
        ValueError
            Raised if the supplied data argument is `None`
        """
        if data is None:
            raise ValueError("data must not be none")
        if "bands" not in kwargs:
            kwargs["bands"] = list(self.config.bands)
        if "plotInfo" not in kwargs:
            kwargs["plotInfo"] = _StandinPlotInfo()
        kwargs["plotInfo"]["bands"] = kwargs["bands"]
        return self._runTools(data, **kwargs)

    def runQuantum(
        self,
        butlerQC: QuantumContext,
        inputRefs: InputQuantizedConnection,
        outputRefs: OutputQuantizedConnection,
    ) -> None:
        """Override default runQuantum to load the minimal columns necessary
        to complete the action.

        Parameters
        ----------
        butlerQC : `~lsst.pipe.base.QuantumContext`
            A butler which is specialized to operate in the context of a
            `lsst.daf.butler.Quantum`.
        inputRefs : `InputQuantizedConnection`
            Datastructure whose attribute names are the names that identify
            connections defined in corresponding `PipelineTaskConnections`
            class. The values of these attributes are the
            `lsst.daf.butler.DatasetRef` objects associated with the defined
            input/prerequisite connections.
        outputRefs : `OutputQuantizedConnection`
            Datastructure whose attribute names are the names that identify
            connections defined in corresponding `PipelineTaskConnections`
            class. The values of these attributes are the
            `lsst.daf.butler.DatasetRef` objects associated with the defined
            output connections.
        """
        inputs = butlerQC.get(inputRefs)
        dataId = butlerQC.quantum.dataId
        plotInfo = self.parsePlotInfo(inputs, dataId)
        # We implicitly assume that 'data' has been defined, but do not have a
        # corresponding input connection in the base class. Thus, we capture
        # and re-raise the error with a more helpful message.
        try:
            # data has to be popped out to avoid duplication in the call to the
            # `run` method.
            inputData = inputs.pop("data")
        except KeyError:
            raise RuntimeError("'data' is a required input connection, but is not defined.")
        data = self.loadData(inputData)
        outputs = self.run(data=data, plotInfo=plotInfo, **inputs)
        self.putByBand(butlerQC, outputs, outputRefs)

    def putByBand(self, butlerQC: QuantumContext, outputs: Struct, outputRefs: OutputQuantizedConnection):
        """Handle the outputs by band.

        This is a convenience method to handle the case where the
        PipelineTaskConnection had to instantiate multiple output connections
        for plots to loop over bands.

        Parameters
        ----------
        butlerQC : `~lsst.pipe.base.QuantumContext`
            A butler which is specialized to operate in the context of a
            `lsst.daf.butler.Quantum`.
        outputs : `~lsst.pipe.base.Struct`
            The accumulated results of all the plots and metrics produced by
            the `run` method of this `PipelineTask`.
        outputRefs : `OutputQuantizedConnection`
            Datastructure whose attribute names are the names that identify
            connections defined in corresponding `PipelineTaskConnections`
            class. The values of these attributes are the
            `lsst.daf.butler.DatasetRef` objects associated with the defined
            output connections.

        Raises
        ------
        AnnotatedPartialOutputsError
            Raised (after all successful outputs have been put) if any of the
            atools actions failed during `run`.
        """
        for outputRefName in outputRefs.keys():
            if outputRefName == "metrics":
                butlerQC.put(getattr(outputs, outputRefName), getattr(outputRefs, outputRefName))
                continue

            name = outputRefName.split(self.config.connections.outputName, maxsplit=1)[1]

            datasetRef = getattr(outputRefs, outputRefName)
            if hasattr(datasetRef, "__iter__"):
                # A multiple=True (band-expanded) connection; match each ref's
                # band back to the corresponding attribute on the struct.
                for outputRef in datasetRef:
                    band = outputRef.dataId.get("band", "")
                    # name would already have a leading underscore.
                    newOutputName = f"{self.config.connections.outputName}_{band}{name}"
                    if dataset := getattr(outputs, newOutputName, None):
                        butlerQC.put(dataset, outputRef)
            else:
                if dataset := getattr(outputs, outputRefName, None):
                    butlerQC.put(dataset, datasetRef)

        # If any of the actions encountered errors, raise an
        # AnnotatedPartialOutputsError with the details.
        # (outputs.__caughtErrors mangles to the same attribute name that
        # _runTools set, since both accesses are inside this class.)
        if outputs.__caughtErrors:
            e = PartialAtoolsFailureError(outputs.__caughtErrors)
            error = AnnotatedPartialOutputsError.annotate(e, self, log=self.log)
            raise error from e

    def _populatePlotInfoWithDataId(
        self, plotInfo: MutableMapping[str, Any], dataId: DataCoordinate | None
    ) -> None:
        """Update the plotInfo with the dataId values.

        Parameters
        ----------
        plotInfo : `dict`
            The plotInfo dictionary to update.
        dataId : `lsst.daf.butler.DataCoordinate`
            The dataId to use to update the plotInfo.
        """
        if dataId is not None:
            plotInfo.update(dataId.mapping)

    def parsePlotInfo(
        self, inputs: Mapping[str, Any] | None, dataId: DataCoordinate | None, connectionName: str = "data"
    ) -> Mapping[str, str]:
        """Parse the inputs and dataId to get the information needed
        to add to the figure.

        Parameters
        ----------
        inputs: `dict`
            The inputs to the task
        dataCoordinate: `lsst.daf.butler.DataCoordinate`
            The dataId that the task is being run on.
        connectionName: `str`, optional
            Name of the input connection to use for determining table name.

        Returns
        -------
        plotInfo : `dict`
        """

        if inputs is None:
            tableName = ""
            run = ""
        else:
            tableName = inputs[connectionName].ref.datasetType.name
            run = inputs[connectionName].ref.run

        # Initialize the plot info dictionary
        plotInfo = {"tableName": tableName, "run": run}

        self._populatePlotInfoWithDataId(plotInfo, dataId)
        return plotInfo

    def loadData(self, handle: DeferredDatasetHandle, names: Iterable[str] | None = None) -> KeyedData:
        """Load the minimal set of keyed data from the input dataset.

        Parameters
        ----------
        handle : `DeferredDatasetHandle`
            Handle to load the dataset with only the specified columns.
        names : `Iterable` of `str`
            The names of keys to extract from the dataset.
            If `names` is `None` then the `collectInputNames` method
            is called to generate the names.
            For most purposes these are the names of columns to load from
            a catalog or data frame.

        Returns
        -------
        result: `KeyedData`
            The dataset with only the specified keys loaded.
        """
        if names is None:
            names = self.collectInputNames()
        return cast(KeyedData, handle.get(parameters={"columns": names}))

    def collectInputNames(self) -> Iterable[str]:
        """Get the names of the inputs.

        If using the default `loadData` method this will gather the names
        of the keys to be loaded from an input dataset.

        Returns
        -------
        inputs : `Iterable` of `str`
            The names of the keys in the `KeyedData` object to extract.

        """
        inputs = set()

        # An empty bands config still needs one pass through the schema, so
        # substitute a single empty-string band.
        if not (localBands := self.config.bands):
            localBands = [""]
        for band in localBands:
            for action in self.config.atools:
                for key, _ in action.getFormattedInputSchema(band=band):
                    inputs.add(key)
        return inputs