Coverage for python / lsst / pipe / base / pipeline.py: 21%

285 statements  

« prev     ^ index     » next       coverage.py v7.13.5, created at 2026-04-30 08:49 +0000

1# This file is part of pipe_base. 

2# 

3# Developed for the LSST Data Management System. 

4# This product includes software developed by the LSST Project 

5# (http://www.lsst.org). 

6# See the COPYRIGHT file at the top-level directory of this distribution 

7# for details of code ownership. 

8# 

9# This software is dual licensed under the GNU General Public License and also 

10# under a 3-clause BSD license. Recipients may choose which of these licenses 

11# to use; please see the files gpl-3.0.txt and/or bsd_license.txt, 

12# respectively. If you choose the GPL option then the following text applies 

13# (but note that there is still no warranty even if you opt for BSD instead): 

14# 

15# This program is free software: you can redistribute it and/or modify 

16# it under the terms of the GNU General Public License as published by 

17# the Free Software Foundation, either version 3 of the License, or 

18# (at your option) any later version. 

19# 

20# This program is distributed in the hope that it will be useful, 

21# but WITHOUT ANY WARRANTY; without even the implied warranty of 

22# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the 

23# GNU General Public License for more details. 

24# 

25# You should have received a copy of the GNU General Public License 

26# along with this program. If not, see <http://www.gnu.org/licenses/>. 

27 

28"""Module defining Pipeline class and related methods.""" 

29 

30from __future__ import annotations 

31 

32__all__ = ["LabelSpecifier", "Pipeline", "TaskDef"] 

33 

34import copy 

35import logging 

36import re 

37import urllib.parse 

38 

39# ------------------------------- 

40# Imports of standard modules -- 

41# ------------------------------- 

42from collections.abc import Callable, Set 

43from dataclasses import dataclass 

44from types import MappingProxyType 

45from typing import TYPE_CHECKING, cast 

46 

47# ----------------------------- 

48# Imports for other modules -- 

49# ----------------------------- 

50from lsst.daf.butler import DataCoordinate, DimensionUniverse, Registry 

51from lsst.resources import ResourcePath, ResourcePathExpression 

52from lsst.utils import doImportType 

53from lsst.utils.introspection import get_full_type_name 

54 

55from . import automatic_connection_constants as acc 

56from . import pipeline_graph, pipelineIR 

57from ._instrument import Instrument as Instrument 

58from .config import PipelineTaskConfig 

59from .connections import PipelineTaskConnections 

60from .pipelineTask import PipelineTask 

61 

62if TYPE_CHECKING: # Imports needed only for type annotations; may be circular. 

63 from lsst.pex.config import Config 

64 

65# ---------------------------------- 

66# Local non-exported definitions -- 

67# ---------------------------------- 

68 

69_LOG = logging.getLogger(__name__) 

70 

71# ------------------------ 

72# Exported definitions -- 

73# ------------------------ 

74 

75 

76@dataclass 

77class LabelSpecifier: 

78 """A structure to specify a subset of labels to load. 

79 

80 This structure may contain a set of labels to be used in subsetting a 

81 pipeline, or a beginning and end point. Beginning or end may be empty, in 

82 which case the range will be a half open interval. Unlike python iteration 

83 bounds, end bounds are *INCLUDED*. 

84 

85 There are multiple potential definitions of range-based slicing for graphs 

86 that are not a simple linear sequence. The definition used here is the 

87 intersection of the tasks downstream of ``begin`` and the tasks upstream of 

88 ``end``, i.e. tasks with no dependency relationship to a bounding task are 

89 not included. 

90 """ 

91 

92 labels: set[str] | None = None 

93 begin: str | None = None 

94 end: str | None = None 

95 

96 def __post_init__(self) -> None: 

97 if self.labels is not None and (self.begin or self.end): 

98 raise ValueError( 

99 "This struct can only be initialized with a labels set or a begin (and/or) end specifier" 

100 ) 

101 

102 

class TaskDef:
    """A bundle of the information about one task that a Pipeline needs.

    This holds the task name, its configuration object, and (optionally) the
    task class itself. It is a plain attribute container; attributes may be
    modified in place (e.g. to apply extra configuration overrides).

    Parameters
    ----------
    taskName : `str`, optional
        The fully-qualified `PipelineTask` class name. If not provided,
        ``taskClass`` must be.
    config : `lsst.pipe.base.config.PipelineTaskConfig`, optional
        Instance of the configuration class corresponding to this task class,
        usually with all overrides applied. This config will be frozen. If
        not provided, ``taskClass`` must be provided and
        ``taskClass.ConfigClass()`` will be used.
    taskClass : `type`, optional
        `PipelineTask` class object; if provided together with ``taskName``,
        the caller guarantees that they are consistent. If not provided,
        ``taskName`` is used to import the type.
    label : `str`, optional
        Task label, usually a short string unique in a pipeline. If not
        provided, ``taskClass`` must be, and ``taskClass._DefaultName`` is
        used.
    connections : `PipelineTaskConnections`, optional
        Object that describes the dataset types used by the task. If not
        provided, one is constructed from the given configuration; if it is
        provided, ``config`` is assumed to be validated and frozen already.
    """

    def __init__(
        self,
        taskName: str | None = None,
        config: PipelineTaskConfig | None = None,
        taskClass: type[PipelineTask] | None = None,
        label: str | None = None,
        connections: PipelineTaskConnections | None = None,
    ):
        # Resolve the name/class pair: at least one must be given, and
        # either can be derived from the other.
        if taskName is None and taskClass is None:
            raise ValueError("At least one of `taskName` and `taskClass` must be provided.")
        if taskName is None:
            taskName = get_full_type_name(taskClass)
        elif taskClass is None:
            taskClass = doImportType(taskName)
        # Fall back to the class's default config when none was supplied.
        if config is None:
            if taskClass is None:
                raise ValueError("`taskClass` must be provided if `config` is not.")
            config = taskClass.ConfigClass()
        # Fall back to the class's default label when none was supplied.
        if label is None:
            if taskClass is None:
                raise ValueError("`taskClass` must be provided if `label` is not.")
            label = taskClass._DefaultName
        self.taskName = taskName
        if connections is None:
            # No connections given: assume the config has not been validated
            # yet, so validate and freeze it before building them.
            try:
                config.validate()
            except Exception:
                _LOG.error("Configuration validation failed for task %s (%s)", label, taskName)
                raise
            config.freeze()
            connections = config.connections.ConnectionsClass(config=config)
        self.config = config
        self.taskClass = taskClass
        self.label = label
        self.connections = connections

    @property
    def configDatasetName(self) -> str:
        """Name of a dataset type for configuration of this task (`str`)."""
        return acc.CONFIG_INIT_OUTPUT_TEMPLATE.format(label=self.label)

    @property
    def metadataDatasetName(self) -> str:
        """Name of a dataset type for metadata of this task (`str`)."""
        return self.makeMetadataDatasetName(self.label)

    @classmethod
    def makeMetadataDatasetName(cls, label: str) -> str:
        """Construct the name of the dataset type for metadata for a task.

        Parameters
        ----------
        label : `str`
            Label for the task within its pipeline.

        Returns
        -------
        name : `str`
            Name of the task's metadata dataset type.
        """
        return acc.METADATA_OUTPUT_TEMPLATE.format(label=label)

    @property
    def logOutputDatasetName(self) -> str | None:
        """Name of a dataset type for log output from this task, `None` if
        logs are not to be saved (`str`).
        """
        if not self.config.saveLogOutput:
            return None
        return acc.LOG_OUTPUT_TEMPLATE.format(label=self.label)

    def __str__(self) -> str:
        if self.label:
            return f"TaskDef({self.taskName}, label={self.label})"
        return f"TaskDef({self.taskName})"

    def __eq__(self, other: object) -> bool:
        # Configs are deliberately excluded from the comparison: config
        # equality is hard to define well. Revisit after DM-27847.
        if isinstance(other, TaskDef):
            return self.taskClass == other.taskClass and self.label == other.label
        return False

    def __hash__(self) -> int:
        return hash((self.taskClass, self.label))

    @classmethod
    def _unreduce(cls, taskName: str, config: PipelineTaskConfig, label: str) -> TaskDef:
        """Reconstruct a `TaskDef` during unpickling.

        All arguments are forwarded directly to the constructor; this
        trampoline exists only because ``__reduce__`` callables cannot be
        invoked with keyword arguments.
        """
        return cls(taskName=taskName, config=config, label=label)

    def __reduce__(self) -> tuple[Callable[[str, PipelineTaskConfig, str], TaskDef], tuple[str, Config, str]]:
        return (self._unreduce, (self.taskName, self.config, self.label))

240 

241 

242class Pipeline: 

243 """A `Pipeline` is a representation of a series of tasks to run, and the 

244 configuration for those tasks. 

245 

246 Parameters 

247 ---------- 

248 description : `str` 

249 A description of that this pipeline does. 

250 """ 

251 

252 PipelineSubsetCtrl = pipelineIR.PipelineSubsetCtrl 

253 

254 def __init__(self, description: str): 

255 pipeline_dict = {"description": description, "tasks": {}} 

256 self._pipelineIR = pipelineIR.PipelineIR(pipeline_dict) 

257 

258 @classmethod 

259 def fromFile(cls, filename: str) -> Pipeline: 

260 """Load a pipeline defined in a pipeline yaml file. 

261 

262 Parameters 

263 ---------- 

264 filename : `str` 

265 A path that points to a pipeline defined in yaml format. This 

266 filename may also supply additional labels to be used in 

267 subsetting the loaded Pipeline. These labels are separated from 

268 the path by a ``#``, and may be specified as a comma separated 

269 list, or a range denoted as beginning..end. Beginning or end may 

270 be empty, in which case the range will be a half open interval. 

271 Unlike python iteration bounds, end bounds are *INCLUDED*. Note 

272 that range based selection is not well defined for pipelines that 

273 are not linear in nature, and correct behavior is not guaranteed, 

274 or may vary from run to run. 

275 

276 Returns 

277 ------- 

278 pipeline: `Pipeline` 

279 The pipeline loaded from specified location with appropriate (if 

280 any) subsetting. 

281 

282 Notes 

283 ----- 

284 This method attempts to prune any contracts that contain labels which 

285 are not in the declared subset of labels. This pruning is done using a 

286 string based matching due to the nature of contracts and may prune more 

287 than it should. 

288 """ 

289 return cls.from_uri(filename) 

290 

291 @classmethod 

292 def from_uri(cls, uri: ResourcePathExpression) -> Pipeline: 

293 """Load a pipeline defined in a pipeline yaml file at a location 

294 specified by a URI. 

295 

296 Parameters 

297 ---------- 

298 uri : convertible to `~lsst.resources.ResourcePath` 

299 If a string is supplied this should be a URI path that points to a 

300 pipeline defined in yaml format, either as a direct path to the 

301 yaml file, or as a directory containing a ``pipeline.yaml`` file 

302 the form used by `write_to_uri` with ``expand=True``). This uri may 

303 also supply additional labels to be used in subsetting the loaded 

304 `Pipeline`. These labels are separated from the path by a ``#``, 

305 and may be specified as a comma separated list, or a range denoted 

306 as beginning..end. Beginning or end may be empty, in which case the 

307 range will be a half open interval. Unlike python iteration bounds, 

308 end bounds are *INCLUDED*. Note that range based selection is not 

309 well defined for pipelines that are not linear in nature, and 

310 correct behavior is not guaranteed, or may vary from run to run. 

311 The same specifiers can be used with a 

312 `~lsst.resources.ResourcePath` object, by being the sole contents 

313 in the fragments attribute. 

314 

315 Returns 

316 ------- 

317 pipeline : `Pipeline` 

318 The pipeline loaded from specified location with appropriate (if 

319 any) subsetting. 

320 

321 Notes 

322 ----- 

323 This method attempts to prune any contracts that contain labels which 

324 are not in the declared subset of labels. This pruning is done using a 

325 string based matching due to the nature of contracts and may prune more 

326 than it should. 

327 """ 

328 # Split up the uri and any labels that were supplied 

329 uri, label_specifier = cls._parse_file_specifier(uri) 

330 pipeline: Pipeline = cls.fromIR(pipelineIR.PipelineIR.from_uri(uri)) 

331 

332 # If there are labels supplied, only keep those 

333 if label_specifier is not None: 

334 pipeline = pipeline.subsetFromLabels(label_specifier) 

335 return pipeline 

336 

337 def subsetFromLabels( 

338 self, 

339 labelSpecifier: LabelSpecifier, 

340 subsetCtrl: pipelineIR.PipelineSubsetCtrl = PipelineSubsetCtrl.DROP, 

341 ) -> Pipeline: 

342 """Subset a pipeline to contain only labels specified in 

343 ``labelSpecifier``. 

344 

345 Parameters 

346 ---------- 

347 labelSpecifier : `labelSpecifier` 

348 Object containing labels that describes how to subset a pipeline. 

349 subsetCtrl : `PipelineSubsetCtrl` 

350 Control object which decides how subsets with missing labels are 

351 handled. Setting to `PipelineSubsetCtrl.DROP` (the default) will 

352 cause any subsets that have labels which are not in the set of all 

353 task labels to be dropped. Setting to `PipelineSubsetCtrl.EDIT` 

354 will cause the subset to instead be edited to remove the 

355 nonexistent label. 

356 

357 Returns 

358 ------- 

359 pipeline : `Pipeline` 

360 A new pipeline object that is a subset of the old pipeline. 

361 

362 Raises 

363 ------ 

364 ValueError 

365 Raised if there is an issue with specified labels 

366 

367 Notes 

368 ----- 

369 This method attempts to prune any contracts that contain labels which 

370 are not in the declared subset of labels. This pruning is done using a 

371 string based matching due to the nature of contracts and may prune more 

372 than it should. 

373 """ 

374 # Labels supplied as a set 

375 if labelSpecifier.labels: 

376 labelSet = labelSpecifier.labels 

377 # Labels supplied as a range, first create a list of all the labels 

378 # in the pipeline sorted according to task dependency. Then only 

379 # keep labels that lie between the supplied bounds 

380 else: 

381 # Create a copy of the pipeline to use when assessing the label 

382 # ordering. Use a dict for fast searching while preserving order. 

383 # Remove contracts so they do not fail in the expansion step. This 

384 # is needed because a user may only configure the tasks they intend 

385 # to run, which may cause some contracts to fail if they will later 

386 # be dropped 

387 pipeline = copy.deepcopy(self) 

388 pipeline._pipelineIR.contracts = [] 

389 graph = pipeline.to_graph() 

390 

391 # Verify the bounds are in the labels 

392 if labelSpecifier.begin is not None and labelSpecifier.begin not in graph.tasks: 

393 raise ValueError( 

394 f"Beginning of range subset, {labelSpecifier.begin}, not found in pipeline definition" 

395 ) 

396 if labelSpecifier.end is not None and labelSpecifier.end not in graph.tasks: 

397 raise ValueError( 

398 f"End of range subset, {labelSpecifier.end}, not found in pipeline definition" 

399 ) 

400 

401 labelSet = set(graph.tasks.between(labelSpecifier.begin, labelSpecifier.end)) 

402 return Pipeline.fromIR(self._pipelineIR.subset_from_labels(labelSet, subsetCtrl)) 

403 

404 @staticmethod 

405 def _parse_file_specifier(uri: ResourcePathExpression) -> tuple[ResourcePath, LabelSpecifier | None]: 

406 """Split appart a uri and any possible label subsets""" 

407 if isinstance(uri, str): 

408 # This is to support legacy pipelines during transition 

409 uri, num_replace = re.subn("[:](?!\\/\\/)", "#", uri) 

410 if num_replace: 

411 raise ValueError( 

412 f"The pipeline file {uri} seems to use the legacy :" 

413 " to separate labels, please use # instead." 

414 ) 

415 if uri.count("#") > 1: 

416 raise ValueError("Only one set of labels is allowed when specifying a pipeline to load") 

417 # Everything else can be converted directly to ResourcePath. 

418 uri = ResourcePath(uri) 

419 label_subset = uri.fragment or None 

420 

421 specifier: LabelSpecifier | None 

422 if label_subset is not None: 

423 label_subset = urllib.parse.unquote(label_subset) 

424 args: dict[str, set[str] | str | None] 

425 # labels supplied as a list 

426 if "," in label_subset: 

427 if ".." in label_subset: 

428 raise ValueError( 

429 "Can only specify a list of labels or a range when loading a Pipeline, not both." 

430 ) 

431 args = {"labels": set(label_subset.split(","))} 

432 # labels supplied as a range 

433 elif ".." in label_subset: 

434 # Try to de-structure the labelSubset, this will fail if more 

435 # than one range is specified 

436 begin, end, *rest = label_subset.split("..") 

437 if rest: 

438 raise ValueError("Only one range can be specified when loading a pipeline") 

439 args = {"begin": begin if begin else None, "end": end if end else None} 

440 # Assume anything else is a single label 

441 else: 

442 args = {"labels": {label_subset}} 

443 

444 # MyPy doesn't like how cavalier kwarg construction is with types. 

445 specifier = LabelSpecifier(**args) # type: ignore 

446 else: 

447 specifier = None 

448 

449 return uri, specifier 

450 

451 @classmethod 

452 def fromString(cls, pipeline_string: str) -> Pipeline: 

453 """Create a pipeline from string formatted as a pipeline document. 

454 

455 Parameters 

456 ---------- 

457 pipeline_string : `str` 

458 A string that is formatted according like a pipeline document. 

459 

460 Returns 

461 ------- 

462 pipeline: `Pipeline` 

463 The new pipeline. 

464 """ 

465 pipeline = cls.fromIR(pipelineIR.PipelineIR.from_string(pipeline_string)) 

466 return pipeline 

467 

468 @classmethod 

469 def fromIR(cls, deserialized_pipeline: pipelineIR.PipelineIR) -> Pipeline: 

470 """Create a pipeline from an already created `PipelineIR` object. 

471 

472 Parameters 

473 ---------- 

474 deserialized_pipeline : `PipelineIR` 

475 An already created pipeline intermediate representation object. 

476 

477 Returns 

478 ------- 

479 pipeline: `Pipeline` 

480 The new pipeline. 

481 """ 

482 pipeline = cls.__new__(cls) 

483 pipeline._pipelineIR = deserialized_pipeline 

484 return pipeline 

485 

486 @classmethod 

487 def fromPipeline(cls, pipeline: Pipeline) -> Pipeline: 

488 """Create a new pipeline by copying an already existing `Pipeline`. 

489 

490 Parameters 

491 ---------- 

492 pipeline : `Pipeline` 

493 An already created pipeline intermediate representation object. 

494 

495 Returns 

496 ------- 

497 pipeline: `Pipeline` 

498 The new pipeline. 

499 """ 

500 return cls.fromIR(copy.deepcopy(pipeline._pipelineIR)) 

501 

    def __str__(self) -> str:
        """Return the pipeline rendered as its IR's string form."""
        return str(self._pipelineIR)

504 

505 def mergePipeline(self, pipeline: Pipeline) -> None: 

506 """Merge another in-memory `Pipeline` object into this one. 

507 

508 This merges another pipeline into this object, as if it were declared 

509 in the import block of the yaml definition of this pipeline. This 

510 modifies this pipeline in place. 

511 

512 Parameters 

513 ---------- 

514 pipeline : `Pipeline` 

515 The `Pipeline` object that is to be merged into this object. 

516 """ 

517 self._pipelineIR.merge_pipelines((pipeline._pipelineIR,)) 

518 

    def addLabelToSubset(self, subset: str, label: str) -> None:
        """Add a task label to the specified subset.

        Parameters
        ----------
        subset : `str`
            The labeled subset to modify.
        label : `str`
            The task label to add to the specified subset.

        Raises
        ------
        ValueError
            Raised if the specified subset does not exist within the pipeline.
            Raised if the specified label does not exist within the pipeline.
        """
        if label not in self._pipelineIR.tasks:
            raise ValueError(f"Label {label} does not appear within the pipeline")
        if subset not in self._pipelineIR.labeled_subsets:
            raise ValueError(f"Subset {subset} does not appear within the pipeline")
        self._pipelineIR.labeled_subsets[subset].subset.add(label)

540 

541 def removeLabelFromSubset(self, subset: str, label: str) -> None: 

542 """Remove a task label from the specified subset. 

543 

544 Parameters 

545 ---------- 

546 subset : `str` 

547 The labeled subset to modify. 

548 label : `str` 

549 The task label to remove from the specified subset. 

550 

551 Raises 

552 ------ 

553 ValueError 

554 Raised if the specified subset does not exist in the pipeline. 

555 Raised if the specified label does not exist within the specified 

556 subset. 

557 """ 

558 if subset not in self._pipelineIR.labeled_subsets: 

559 raise ValueError(f"Subset {subset} does not appear within the pipeline") 

560 if label not in self._pipelineIR.labeled_subsets[subset].subset: 

561 raise ValueError(f"Label {label} does not appear within the pipeline") 

562 self._pipelineIR.labeled_subsets[subset].subset.remove(label) 

563 

564 def findSubsetsWithLabel(self, label: str) -> set[str]: 

565 """Find any subsets which may contain the specified label. 

566 

567 This function returns the name of subsets which return the specified 

568 label. May return an empty set if there are no subsets, or no subsets 

569 containing the specified label. 

570 

571 Parameters 

572 ---------- 

573 label : `str` 

574 The task label to use in membership check. 

575 

576 Returns 

577 ------- 

578 subsets : `set` of `str` 

579 Returns a set (possibly empty) of subsets names which contain the 

580 specified label. 

581 

582 Raises 

583 ------ 

584 ValueError 

585 Raised if the specified label does not exist within this pipeline. 

586 """ 

587 results = set() 

588 if label not in self._pipelineIR.tasks: 

589 raise ValueError(f"Label {label} does not appear within the pipeline") 

590 for subset in self._pipelineIR.labeled_subsets.values(): 

591 if label in subset.subset: 

592 results.add(subset.label) 

593 return results 

594 

595 @property 

596 def task_labels(self) -> Set[str]: 

597 """Labels of all tasks in the pipelines. 

598 

599 For simple pipelines with no imports, iteration over this set will 

600 match the order in which tasks are defined in the pipeline file. In 

601 all other cases the order is unspecified but deterministic. It is not 

602 dependency-ordered (use ``to_graph().tasks.keys()`` for that). 

603 """ 

604 return self._pipelineIR.tasks.keys() 

605 

606 @property 

607 def subsets(self) -> MappingProxyType[str, set]: 

608 """Returns a `types.MappingProxyType` where the keys are the labels of 

609 labeled subsets in the `Pipeline` and the values are the set of task 

610 labels contained within that subset. 

611 """ 

612 return MappingProxyType( 

613 {label: subsetIr.subset for label, subsetIr in self._pipelineIR.labeled_subsets.items()} 

614 ) 

615 

616 def addLabeledSubset(self, label: str, description: str, taskLabels: set[str]) -> None: 

617 """Add a new labeled subset to the `Pipeline`. 

618 

619 Parameters 

620 ---------- 

621 label : `str` 

622 The label to assign to the subset. 

623 description : `str` 

624 A description of what the subset is for. 

625 taskLabels : `set` [`str`] 

626 The set of task labels to be associated with the labeled subset. 

627 

628 Raises 

629 ------ 

630 ValueError 

631 Raised if label already exists in the `Pipeline`. 

632 Raised if a task label is not found within the `Pipeline`. 

633 """ 

634 if label in self._pipelineIR.labeled_subsets.keys(): 

635 raise ValueError(f"Subset label {label} is already found within the Pipeline") 

636 if extra := (taskLabels - self._pipelineIR.tasks.keys()): 

637 raise ValueError(f"Task labels {extra} were not found within the Pipeline") 

638 self._pipelineIR.labeled_subsets[label] = pipelineIR.LabeledSubset(label, taskLabels, description) 

639 

640 def removeLabeledSubset(self, label: str) -> None: 

641 """Remove a labeled subset from the `Pipeline`. 

642 

643 Parameters 

644 ---------- 

645 label : `str` 

646 The label of the subset to remove from the `Pipeline`. 

647 

648 Raises 

649 ------ 

650 ValueError 

651 Raised if the label is not found within the `Pipeline`. 

652 """ 

653 if label not in self._pipelineIR.labeled_subsets.keys(): 

654 raise ValueError(f"Subset label {label} was not found in the pipeline") 

655 self._pipelineIR.labeled_subsets.pop(label) 

656 

657 def addInstrument(self, instrument: Instrument | str) -> None: 

658 """Add an instrument to the pipeline, or replace an instrument that is 

659 already defined. 

660 

661 Parameters 

662 ---------- 

663 instrument : `~lsst.daf.butler.instrument.Instrument` or `str` 

664 Either a derived class object of a `lsst.daf.butler.instrument` or 

665 a string corresponding to a fully qualified 

666 `lsst.daf.butler.instrument` name. 

667 """ 

668 if isinstance(instrument, str): 

669 pass 

670 else: 

671 # TODO: assume that this is a subclass of Instrument, no type 

672 # checking 

673 instrument = get_full_type_name(instrument) 

674 self._pipelineIR.instrument = instrument 

675 

676 def getInstrument(self) -> str | None: 

677 """Get the instrument from the pipeline. 

678 

679 Returns 

680 ------- 

681 instrument : `str`, or None 

682 The fully qualified name of a `lsst.obs.base.Instrument` subclass, 

683 name, or None if the pipeline does not have an instrument. 

684 """ 

685 return self._pipelineIR.instrument 

686 

687 def get_data_id(self, universe: DimensionUniverse) -> DataCoordinate: 

688 """Return a data ID with all dimension constraints embedded in the 

689 pipeline. 

690 

691 Parameters 

692 ---------- 

693 universe : `lsst.daf.butler.DimensionUniverse` 

694 Object that defines all dimensions. 

695 

696 Returns 

697 ------- 

698 data_id : `lsst.daf.butler.DataCoordinate` 

699 Data ID with all dimension constraints embedded in the 

700 pipeline. 

701 """ 

702 instrument_class_name = self._pipelineIR.instrument 

703 if instrument_class_name is not None: 

704 instrument_class = cast(Instrument, doImportType(instrument_class_name)) 

705 if instrument_class is not None: 

706 return DataCoordinate.standardize(instrument=instrument_class.getName(), universe=universe) 

707 return DataCoordinate.make_empty(universe) 

708 

709 def addTask(self, task: type[PipelineTask] | str, label: str) -> None: 

710 """Add a new task to the pipeline, or replace a task that is already 

711 associated with the supplied label. 

712 

713 Parameters 

714 ---------- 

715 task : `PipelineTask` or `str` 

716 Either a derived class object of a `PipelineTask` or a string 

717 corresponding to a fully qualified `PipelineTask` name. 

718 label : `str` 

719 A label that is used to identify the `PipelineTask` being added. 

720 """ 

721 if isinstance(task, str): 

722 taskName = task 

723 elif issubclass(task, PipelineTask): 

724 taskName = get_full_type_name(task) 

725 else: 

726 raise ValueError( 

727 "task must be either a child class of PipelineTask or a string containing" 

728 " a fully qualified name to one" 

729 ) 

730 if not label: 

731 # in some cases (with command line-generated pipeline) tasks can 

732 # be defined without label which is not acceptable, use task 

733 # _DefaultName in that case 

734 if isinstance(task, str): 

735 task_class = cast(PipelineTask, doImportType(task)) 

736 label = task_class._DefaultName 

737 self._pipelineIR.tasks[label] = pipelineIR.TaskIR(label, taskName) 

738 

739 def removeTask(self, label: str) -> None: 

740 """Remove a task from the pipeline. 

741 

742 Parameters 

743 ---------- 

744 label : `str` 

745 The label used to identify the task that is to be removed. 

746 

747 Raises 

748 ------ 

749 KeyError 

750 If no task with that label exists in the pipeline. 

751 """ 

752 self._pipelineIR.tasks.pop(label) 

753 

754 def addConfigOverride(self, label: str, key: str, value: object) -> None: 

755 """Apply single config override. 

756 

757 Parameters 

758 ---------- 

759 label : `str` 

760 Label of the task. 

761 key : `str` 

762 Fully-qualified field name. 

763 value : object 

764 Value to be given to a field. 

765 """ 

766 self._addConfigImpl(label, pipelineIR.ConfigIR(rest={key: value})) 

767 

768 def addConfigFile(self, label: str, filename: str) -> None: 

769 """Add overrides from a specified file. 

770 

771 Parameters 

772 ---------- 

773 label : `str` 

774 The label used to identify the task associated with config to 

775 modify. 

776 filename : `str` 

777 Path to the override file. 

778 """ 

779 self._addConfigImpl(label, pipelineIR.ConfigIR(file=[filename])) 

780 

781 def addConfigPython(self, label: str, pythonString: str) -> None: 

782 """Add Overrides by running a snippet of python code against a config. 

783 

784 Parameters 

785 ---------- 

786 label : `str` 

787 The label used to identity the task associated with config to 

788 modify. 

789 pythonString : `str` 

790 A string which is valid python code to be executed. This is done 

791 with config as the only local accessible value. 

792 """ 

793 self._addConfigImpl(label, pipelineIR.ConfigIR(python=pythonString)) 

794 

795 def _addConfigImpl(self, label: str, newConfig: pipelineIR.ConfigIR) -> None: 

796 if label == "parameters": 

797 self._pipelineIR.parameters.mapping.update(newConfig.rest) 

798 if newConfig.file: 

799 raise ValueError("Setting parameters section with config file is not supported") 

800 if newConfig.python: 

801 raise ValueError("Setting parameters section using python block in unsupported") 

802 return 

803 if label not in self._pipelineIR.tasks: 

804 raise LookupError(f"There are no tasks labeled '{label}' in the pipeline") 

805 self._pipelineIR.tasks[label].add_or_update_config(newConfig) 

806 

807 def write_to_uri(self, uri: ResourcePathExpression) -> None: 

808 """Write the pipeline to a file or directory. 

809 

810 Parameters 

811 ---------- 

812 uri : convertible to `~lsst.resources.ResourcePath` 

813 URI to write to; may have any scheme with 

814 `~lsst.resources.ResourcePath` write support or no scheme for a 

815 local file/directory. Should have a ``.yaml`` extension. 

816 """ 

817 self._pipelineIR.write_to_uri(uri) 

818 

def to_graph(
    self, registry: Registry | None = None, visualization_only: bool = False
) -> pipeline_graph.PipelineGraph:
    """Construct a pipeline graph from this pipeline.

    Constructing a graph applies all configuration overrides, freezes all
    configuration, checks all contracts, and checks for dataset type
    consistency between tasks (as much as possible without access to a data
    repository). It cannot be reversed.

    Parameters
    ----------
    registry : `lsst.daf.butler.Registry`, optional
        Data repository client. If provided, the graph's dataset types
        and dimensions will be resolved (see `PipelineGraph.resolve`).
    visualization_only : `bool`, optional
        Resolve the graph as well as possible even when dimensions and
        storage classes cannot really be determined. This can include
        using the ``universe.commonSkyPix`` as the assumed dimensions of
        connections that use the "skypix" placeholder and using "<UNKNOWN>"
        as a storage class name (which will fail if the storage class
        itself is ever actually loaded).

    Returns
    -------
    graph : `pipeline_graph.PipelineGraph`
        Representation of the pipeline as a graph.

    Raises
    ------
    pipelineIR.ContractError
        Raised if any contract expression in the pipeline evaluates falsy.
    """
    # Seed the graph data ID with the instrument name when the pipeline
    # declares an instrument class.
    instrument_class_name = self._pipelineIR.instrument
    data_id = {}
    if instrument_class_name is not None:
        instrument_class: type[Instrument] = doImportType(instrument_class_name)
        # NOTE(review): doImportType presumably raises on failure rather
        # than returning None, making this check defensive — confirm.
        if instrument_class is not None:
            data_id["instrument"] = instrument_class.getName()
    graph = pipeline_graph.PipelineGraph(data_id=data_id)
    graph.description = self._pipelineIR.description
    # Tasks must be added (with overrides applied) before contracts are
    # checked, since contracts are evaluated against the frozen configs.
    for label in self._pipelineIR.tasks:
        self._add_task_to_graph(label, graph)
    if self._pipelineIR.contracts is not None:
        # Contract expressions see each task's config under its label.
        label_to_config = {x.label: x.config for x in graph.tasks.values()}
        for contract in self._pipelineIR.contracts:
            # execute this in its own line so it can raise a good error
            # message if there was problems with the eval
            # NOTE(review): eval() runs pipeline-author-supplied
            # expressions; pipeline definitions are treated as trusted
            # input here.
            success = eval(contract.contract, None, label_to_config)
            if not success:
                extra_info = f": {contract.msg}" if contract.msg is not None else ""
                raise pipelineIR.ContractError(
                    f"Contract(s) '{contract.contract}' were not satisfied{extra_info}"
                )
    # Carry over labeled subsets, normalizing a missing description to "".
    for label, subset in self._pipelineIR.labeled_subsets.items():
        graph.add_task_subset(
            label, subset.subset, subset.description if subset.description is not None else ""
        )
    # graph.steps is a project-specific container; append takes the step's
    # label and its dimensions.
    for step_ir in self._pipelineIR.steps:
        graph.steps.append(step_ir.label, step_ir.dimensions)
    # Resolve when we have a registry (or were asked for a best-effort
    # visualization resolve); otherwise just sort deterministically.
    if registry is not None or visualization_only:
        graph.resolve(registry=registry, visualization_only=visualization_only)
    else:
        graph.sort()
    return graph

879 

def _add_task_to_graph(self, label: str, graph: pipeline_graph.PipelineGraph) -> None:
    """Add one labeled task from this pipeline to a pipeline graph that is
    being built.

    Parameters
    ----------
    label : `str`
        Label for the task to be added.
    graph : `pipeline_graph.PipelineGraph`
        Graph to add the task to.
    """
    task_ir = self._pipelineIR.tasks.get(label)
    if task_ir is None:
        raise NameError(f"Label {label} does not appear in this pipeline")
    task_class: type[PipelineTask] = doImportType(task_ir.klass)
    config = task_class.ConfigClass()
    # Instantiate the pipeline's instrument (when one is declared) so that
    # instrument-specific config overrides can be applied below.
    instrument: Instrument | None = None
    instrument_name = self._pipelineIR.instrument
    if instrument_name is not None:
        instrument_type: type = doImportType(instrument_name)
        instrument = instrument_type()
    config.applyConfigOverrides(
        instrument,
        getattr(task_class, "_DefaultName", ""),
        task_ir.config,
        self._pipelineIR.parameters,
        label,
    )
    graph.add_task(label, task_class, config)

907 

def __len__(self) -> int:
    """Return the number of tasks defined in this pipeline."""
    task_mapping = self._pipelineIR.tasks
    return len(task_mapping)

910 

def __eq__(self, other: object) -> bool:
    """Compare pipelines for equality, as far as that is currently
    implementable.
    """
    if not isinstance(other, Pipeline):
        return False
    if self._pipelineIR == other._pipelineIR:
        # Shortcut: if the IR is the same, the expanded pipeline must be
        # the same as well. But the converse is not true.
        return True
    # Compare as much as we can (task classes and their edges).
    if self.to_graph().diff_tasks(other.to_graph()):
        return False
    # After DM-27847, we should compare configuration here.
    raise NotImplementedError(
        "Pipelines cannot be compared because config instances cannot be compared; see DM-27847."
    )