Coverage for python / lsst / pipe / base / pipelineIR.py: 18%

474 statements  

« prev     ^ index     » next       coverage.py v7.13.5, created at 2026-04-22 08:57 +0000

1# This file is part of pipe_base. 

2# 

3# Developed for the LSST Data Management System. 

4# This product includes software developed by the LSST Project 

5# (http://www.lsst.org). 

6# See the COPYRIGHT file at the top-level directory of this distribution 

7# for details of code ownership. 

8# 

9# This software is dual licensed under the GNU General Public License and also 

10# under a 3-clause BSD license. Recipients may choose which of these licenses 

11# to use; please see the files gpl-3.0.txt and/or bsd_license.txt, 

12# respectively. If you choose the GPL option then the following text applies 

13# (but note that there is still no warranty even if you opt for BSD instead): 

14# 

15# This program is free software: you can redistribute it and/or modify 

16# it under the terms of the GNU General Public License as published by 

17# the Free Software Foundation, either version 3 of the License, or 

18# (at your option) any later version. 

19# 

20# This program is distributed in the hope that it will be useful, 

21# but WITHOUT ANY WARRANTY; without even the implied warranty of 

22# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the 

23# GNU General Public License for more details. 

24# 

25# You should have received a copy of the GNU General Public License 

26# along with this program. If not, see <http://www.gnu.org/licenses/>. 

27from __future__ import annotations 

28 

29__all__ = ( 

30 "ConfigIR", 

31 "ContractError", 

32 "ContractIR", 

33 "ImportIR", 

34 "LabeledSubset", 

35 "ParametersIR", 

36 "PipelineIR", 

37 "TaskIR", 

38) 

39 

40import copy 

41import enum 

42import os 

43import re 

44import warnings 

45from collections import Counter 

46from collections.abc import Generator, Hashable, Iterable, MutableMapping 

47from dataclasses import dataclass, field 

48from typing import Any, Literal, cast 

49 

50import yaml 

51 

52from lsst.resources import ResourcePath, ResourcePathExpression 

53from lsst.utils.introspection import find_outside_stacklevel 

54 

55 

class PipelineSubsetCtrl(enum.Enum):
    """An Enumeration of the various ways a pipeline subsetting operation will
    handle labeled subsets when task labels they defined are missing.
    """

    # Consumed by pipeline subsetting (e.g. ``subset_from_labels`` calls seen
    # in ImportIR.toPipelineIR) to decide the fate of a labeled subset whose
    # member task labels were removed.
    DROP = enum.auto()
    """Drop any subsets that contain labels which are no longer in the set of
    task labels when subsetting an entire pipeline
    """
    EDIT = enum.auto()
    """Edit any subsets that contain labels which are no longer in the set of
    task labels to remove the missing label, but leave the subset when
    subsetting a pipeline.
    """

70 

71 

class _Tags(enum.Enum):
    """Private sentinel values used to distinguish "not specified" from
    legitimate user-supplied values (e.g. `ImportIR.instrument`, where `None`
    itself is meaningful).
    """

    KeepInstrument = enum.auto()

74 

75 

class PipelineYamlLoader(yaml.SafeLoader):
    """Specialized version of yaml's SafeLoader.

    It checks and raises an exception if it finds that there are multiple
    instances of the same key found inside a pipeline file at a given scope.
    """

    def construct_mapping(self, node: yaml.MappingNode, deep: bool = False) -> dict[Hashable, Any]:
        # Delegate to the base class first so it can perform all of its own
        # validation on this node. Checking key uniqueness up front would save
        # work on failure, but if the node were malformed due to a parsing
        # error the resulting exception would be much harder to interpret.
        mapping = super().construct_mapping(node, deep)
        # Count how often each key appears at this scope and reject repeats.
        occurrences = Counter(key_node.value for key_node, _ in node.value)
        duplicates = {key for key, count in occurrences.items() if count > 1}
        if duplicates:
            raise KeyError(
                f"Pipeline files must not have duplicated keys, {duplicates} appeared multiple times"
            )
        return mapping

99 

100 

class MultilineStringDumper(yaml.Dumper):
    """Custom YAML dumper that makes multi-line strings use the '|'
    continuation style instead of unreadable newlines and tons of quotes.

    Basic approach is taken from
    https://stackoverflow.com/questions/8640959/how-can-i-control-what-scalar-form-pyyaml-uses-for-my-data,
    but is written as a Dumper subclass to make its effects non-global (vs
    `yaml.add_representer`).
    """

    def represent_scalar(self, tag: str, value: Any, style: str | None = None) -> yaml.ScalarNode:
        # Only override the style when the caller did not pick one and the
        # scalar is a multi-line string.
        is_multiline_str = tag == "tag:yaml.org,2002:str" and len(value.splitlines()) > 1
        if style is None and is_multiline_str:
            style = "|"
        return super().represent_scalar(tag, value, style)

115 

116 

class ContractError(Exception):
    """An exception that is raised when a pipeline contract is not
    satisfied.
    """

    pass

123 

124 

@dataclass
class ContractIR:
    """Intermediate representation of configuration contracts read from a
    pipeline yaml file.
    """

    contract: str
    """A string of python code representing one or more conditions on configs
    in a pipeline. This code-as-string should, once evaluated, should be True
    if the configs are fine, and False otherwise.
    """
    msg: str | None = None
    """An optional message to be shown to the user if a contract fails
    """

    def to_primitives(self) -> dict[str, str]:
        """Convert to a representation used in yaml serialization."""
        primitives: dict[str, str] = {"contract": self.contract}
        if self.msg is not None:
            primitives["msg"] = self.msg
        return primitives

    def __eq__(self, other: object) -> bool:
        if not isinstance(other, ContractIR):
            return False
        return (self.contract, self.msg) == (other.contract, other.msg)

151 

152 

@dataclass
class LabeledSubset:
    """Intermediate representation of named subset of task labels read from
    a pipeline yaml file.
    """

    label: str
    """The label used to identify the subset of task labels.
    """
    subset: set[str]
    """A set of task labels contained in this subset.
    """
    description: str | None
    """A description of what this subset of tasks is intended to do
    """

    @staticmethod
    def from_primitives(label: str, value: list[str] | dict) -> LabeledSubset:
        """Generate `LabeledSubset` objects given a properly formatted object
        that has been created by a yaml loader.

        Parameters
        ----------
        label : `str`
            The label that will be used to identify this labeled subset.
        value : `list` of `str` or `dict`
            Object returned from loading a labeled subset section from a yaml
            document.

        Returns
        -------
        labeledSubset : `LabeledSubset`
            A `LabeledSubset` object built from the inputs.

        Raises
        ------
        ValueError
            Raised if the value input is not properly formatted for parsing.
        """
        if isinstance(value, MutableMapping):
            task_labels = value.pop("subset", None)
            if task_labels is None:
                raise ValueError(
                    "If a labeled subset is specified as a mapping, it must contain the key 'subset'"
                )
            note = value.pop("description", None)
        elif isinstance(value, Iterable):
            task_labels, note = value, None
        else:
            raise ValueError(
                f"There was a problem parsing the labeled subset {label}, make sure the "
                "definition is either a valid yaml list, or a mapping with keys "
                "(subset, description) where subset points to a yaml list, and description is "
                "associated with a string"
            )
        return LabeledSubset(label, set(task_labels), note)

    def to_primitives(self) -> dict[str, list[str] | str]:
        """Convert to a representation used in yaml serialization."""
        primitives: dict[str, list[str] | str] = {"subset": list(self.subset)}
        if self.description is not None:
            primitives["description"] = self.description
        return primitives

217 

218 

@dataclass
class ParametersIR:
    """Intermediate representation of parameters that are global to a pipeline.

    Notes
    -----
    These parameters are specified under a top level key named ``parameters``
    and are declared as a yaml mapping. These entries can then be used inside
    task configuration blocks to specify configuration values. They may not be
    used in the special ``file`` or ``python`` blocks.

    Examples
    --------
    .. code-block:: yaml

        parameters:
          shared_value: 14
        tasks:
          taskA:
            class: modA
            config:
              field1: parameters.shared_value
          taskB:
            class: modB
            config:
              field2: parameters.shared_value
    """

    mapping: MutableMapping[str, Any]
    """A mutable mapping of identifiers as keys, and shared configuration
    as values.
    """

    def update(self, other: ParametersIR | None) -> None:
        # A None argument is a no-op so callers need not guard themselves.
        if other is None:
            return
        self.mapping.update(other.mapping)

    def to_primitives(self) -> MutableMapping[str, str]:
        """Convert to a representation used in yaml serialization."""
        return self.mapping

    def __contains__(self, value: str) -> bool:
        return value in self.mapping

    def __getitem__(self, item: str) -> Any:
        return self.mapping[item]

    def __bool__(self) -> bool:
        return bool(self.mapping)

268 

269 

@dataclass
class ConfigIR:
    """Intermediate representation of configurations read from a pipeline yaml
    file.
    """

    python: str | None = None
    """A string of python code that is used to modify a configuration. This can
    also be None if there are no modifications to do.
    """
    dataId: dict | None = None
    """A dataId that is used to constrain these config overrides to only quanta
    with matching dataIds. This field can be None if there is no constraint.
    This is currently an unimplemented feature, and is placed here for future
    use.
    """
    file: list[str] = field(default_factory=list)
    """A list of paths which points to a file containing config overrides to be
    applied. This value may be an empty list if there are no overrides to
    apply.
    """
    rest: dict = field(default_factory=dict)
    """This is a dictionary of key value pairs, where the keys are strings
    corresponding to qualified fields on a config to override, and the values
    are strings representing the values to apply.
    """

    def to_primitives(self) -> dict[str, str | dict | list[str]]:
        """Convert to a representation used in yaml serialization."""
        primitives: dict[str, str | dict | list[str]] = {}
        # Only truthy attributes make it into the serialized form.
        for attr in ("python", "dataId", "file"):
            attr_value = getattr(self, attr)
            if attr_value:
                primitives[attr] = attr_value
        # Fold in the free-form config key/value overrides.
        primitives.update(self.rest)
        return primitives

    def formatted(self, parameters: ParametersIR) -> ConfigIR:
        """Return a new ConfigIR object that is formatted according to the
        specified parameters.

        Parameters
        ----------
        parameters : `ParametersIR`
            Object that contains variable mappings used in substitution.

        Returns
        -------
        config : `ConfigIR`
            A new ConfigIR object formatted with the input parameters.
        """
        substituted = copy.deepcopy(self)
        for key, value in substituted.rest.items():
            if not isinstance(value, str):
                continue
            match = re.match("parameters[.](.*)", value)
            if not match:
                continue
            name = match.group(1)
            if name in parameters:
                substituted.rest[key] = parameters[name]
            else:
                # Looks like a parameter reference but nothing was declared
                # with that name; warn rather than fail in case it is literal.
                warnings.warn(
                    f"config {key} contains value {match.group(0)} which is formatted like a "
                    "Pipeline parameter but was not found within the Pipeline, if this was not "
                    "intentional, check for a typo",
                    stacklevel=find_outside_stacklevel("lsst.pipe.base"),
                )
        return substituted

    def maybe_merge(self, other_config: ConfigIR) -> Generator[ConfigIR]:
        """Merge another instance of a `ConfigIR` into this instance if
        possible. This function returns a generator that is either self
        if the configs were merged, or self, and other_config if that could
        not be merged.

        Parameters
        ----------
        other_config : `ConfigIR`
            An instance of `ConfigIR` to merge into this instance.

        Yields
        ------
        Generator : `ConfigIR`
            A generator containing either self, or self and other_config if
            the configs could be merged or not respectively.
        """
        # Merging is only safe for plain key/value overrides with identical
        # dataId constraints: python blocks and override files can interact
        # in ways we cannot predict, so those always stay separate.
        unmergeable = (
            self.dataId != other_config.dataId
            or self.python
            or other_config.python
            or self.file
            or other_config.file
        )
        if unmergeable:
            yield self
            yield other_config
            return

        # Any key present in both must carry the same value, otherwise the
        # two blocks are kept distinct.
        for shared_key in self.rest.keys() & other_config.rest.keys():
            if self.rest[shared_key] != other_config.rest[shared_key]:
                yield self
                yield other_config
                return
        self.rest.update(other_config.rest)

        # Union the override-file lists (deduplicated).
        self.file = list(set(self.file) | set(other_config.file))

        yield self

    def __eq__(self, other: object) -> bool:
        if not isinstance(other, ConfigIR):
            return False
        attrs = ("python", "dataId", "file", "rest")
        return all(getattr(self, a) == getattr(other, a) for a in attrs)

390 

391 

@dataclass
class TaskIR:
    """Intermediate representation of tasks read from a pipeline yaml file."""

    label: str
    """An identifier used to refer to a task.
    """
    klass: str
    """A string containing a fully qualified python class to be run in a
    pipeline.
    """
    config: list[ConfigIR] | None = None
    """list of all configs overrides associated with this task, and may be
    `None` if there are no config overrides.
    """

    def to_primitives(self) -> dict[str, str | list[dict]]:
        """Convert to a representation used in yaml serialization."""
        primitives: dict[str, str | list[dict]] = {"class": self.klass}
        if self.config:
            primitives["config"] = [entry.to_primitives() for entry in self.config]
        return primitives

    def add_or_update_config(self, other_config: ConfigIR) -> None:
        """Add a `ConfigIR` to this task if one is not present. Merges configs
        if there is a `ConfigIR` present and the dataId keys of both configs
        match, otherwise adds a new entry to the config list. The exception to
        the above is that if either the last config or other_config has a
        python block, then other_config is always added, as python blocks can
        modify configs in ways that cannot be predicted.

        Parameters
        ----------
        other_config : `ConfigIR`
            A `ConfigIR` instance to add or merge into the config attribute of
            this task.
        """
        if not self.config:
            self.config = [other_config]
            return
        # Try to merge into the most recent config block; maybe_merge yields
        # one element on success and both on failure, so extend covers both.
        last = self.config.pop()
        self.config.extend(last.maybe_merge(other_config))

    def __eq__(self, other: object) -> bool:
        if not isinstance(other, TaskIR):
            return False
        attrs = ("label", "klass", "config")
        return all(getattr(self, a) == getattr(other, a) for a in attrs)

438 

439 

@dataclass
class ImportIR:
    """An intermediate representation of imported pipelines."""

    location: str
    """This is the location of the pipeline to inherit. The path should be
    specified as an absolute path. Environment variables may be used in the
    path and should be specified as a python string template, with the name of
    the environment variable inside braces.
    """
    include: list[str] | None = None
    """list of tasks that should be included when inheriting this pipeline.
    Either the include or exclude attributes may be specified, but not both.
    """
    exclude: list[str] | None = None
    """list of tasks that should be excluded when inheriting this pipeline.
    Either the include or exclude attributes may be specified, but not both.
    """
    rename: dict[str, str] = field(default_factory=dict)
    """dict of tasks to rename, keyed by old name with new name value."""
    importContracts: bool = True
    """Boolean attribute to dictate if contracts should be inherited with the
    pipeline or not.
    """
    importSteps: bool = True
    """Boolean attribute to dictate if steps should be inherited with the
    pipeline or not.
    """
    labeledSubsetModifyMode: PipelineSubsetCtrl = PipelineSubsetCtrl.DROP
    """Controls how labeled subsets are handled when an import ends up not
    including (either through an include or exclusion list) a task label that
    is defined in the `Pipeline` being imported. DROP will remove any
    subsets which contain a missing label. EDIT will change any subsets to not
    include the missing label.
    """
    instrument: Literal[_Tags.KeepInstrument] | str | None = _Tags.KeepInstrument
    """Instrument to assign to the Pipeline at import. The default value of
    ``_Tags.KeepInstrument`` indicates that whatever instrument the pipeline is
    declared with will not be modified. Setting this value to None will drop
    any declared instrument prior to import.
    """

    def toPipelineIR(self) -> PipelineIR:
        """Load in the Pipeline specified by this object, and turn it into a
        PipelineIR instance.

        Returns
        -------
        pipeline : `PipelineIR`
            A pipeline generated from the imported pipeline file.

        Raises
        ------
        ValueError
            Raised if both an include and an exclude list are specified, if
            the rename mapping has duplicate or overlapping names, or if a
            rename target collides with an existing task label.
        """
        if self.include and self.exclude:
            raise ValueError(
                "An include list and an exclude list cannot both be specified"
                " when declaring a pipeline import."
            )
        if rename_keys := self.rename.keys():
            # The rename mapping must be a bijection that does not chain:
            # no duplicate new names, and no new name that is also an old one.
            rename_values_set = set(self.rename.values())
            if len(rename_values_set) != len(rename_keys):
                raise ValueError(f"rename {rename_keys=} must not have duplicates")
            if rename_values_set.intersection(rename_keys):
                raise ValueError(
                    f"rename keys={rename_keys} must not intersect with values={self.rename.values()}"
                )

        tmp_pipeline = PipelineIR.from_uri(os.path.expandvars(self.location))
        if self.instrument is not _Tags.KeepInstrument:
            tmp_pipeline.instrument = self.instrument

        # Determine which task labels survive the include/exclude filters,
        # recording any renames to apply afterwards.
        included_labels = set()
        renamed_tasks = {}
        for label in tmp_pipeline.tasks:
            is_included = self.include and label in self.include
            if (
                is_included
                or (self.exclude and label not in self.exclude)
                or (self.include is None and self.exclude is None)
            ):
                if (label_new := self.rename.get(label)) is not None:
                    renamed_tasks[label] = label_new
                    if is_included:
                        # Replace only the entry that matches this label;
                        # previously every NON-matching entry was overwritten
                        # with ``label``, corrupting the include list (the
                        # else branch wrongly produced ``label`` instead of
                        # the original element ``x``).
                        self.include = [
                            label_new if (x == label) else x for x in cast(list[str], self.include)
                        ]
                else:
                    label_new = label
                included_labels.add(label_new)

        # Apply the renames, collecting all collisions so they can be
        # reported together.
        rename_errors = []
        for label, label_new in renamed_tasks.items():
            if label_new in tmp_pipeline.tasks:
                rename_errors.append(f"Can't rename {label=} to existing {label_new=}")
            else:
                task = tmp_pipeline.tasks.pop(label)
                task.label = label_new
                tmp_pipeline.tasks[label_new] = task

        if rename_errors:
            raise ValueError("; ".join(rename_errors))

        # Handle labeled subsets being specified in the include or exclude
        # list, adding or removing their member labels.
        if self.include is not None:
            subsets_in_include = tmp_pipeline.labeled_subsets.keys() & self.include
            for label in subsets_in_include:
                included_labels.update(tmp_pipeline.labeled_subsets[label].subset)

        elif self.exclude is not None:
            subsets_in_exclude = tmp_pipeline.labeled_subsets.keys() & self.exclude
            for label in subsets_in_exclude:
                included_labels.difference_update(tmp_pipeline.labeled_subsets[label].subset)

        if not self.importSteps:
            tmp_pipeline.steps = []

        tmp_pipeline = tmp_pipeline.subset_from_labels(included_labels, self.labeledSubsetModifyMode)

        if not self.importContracts:
            tmp_pipeline.contracts = []

        return tmp_pipeline

    def __eq__(self, other: object) -> bool:
        if not isinstance(other, ImportIR):
            return False
        return all(
            getattr(self, attr) == getattr(other, attr)
            for attr in ("location", "include", "exclude", "importContracts")
        )

569 

570 

@dataclass
class StepIR:
    """Intermediate representation of a step definition."""

    label: str
    """The label associated with this step."""

    dimensions: list[str]
    """The dimensions to use when sharding this step."""

580 

581 

582class PipelineIR: 

583 """Intermediate representation of a pipeline definition. 

584 

585 Parameters 

586 ---------- 

587 loaded_yaml : `dict` 

588 A dictionary which matches the structure that would be produced by a 

589 yaml reader which parses a pipeline definition document. 

590 

591 Raises 

592 ------ 

593 ValueError 

594 Raised if: 

595 

596 - a pipeline is declared without a description; 

597 - no tasks are declared in a pipeline, and no pipelines are to be 

598 inherited; 

599 - more than one instrument is specified; 

600 - more than one inherited pipeline share a label. 

601 """ 

602 

    def __init__(self, loaded_yaml: dict[str, Any]):
        # Check required fields are present
        if "description" not in loaded_yaml:
            raise ValueError("A pipeline must be declared with a description")
        # A pipeline without a "tasks" section is only legal when it brings
        # tasks in from elsewhere via "imports" (or the legacy "inherits" key).
        if "tasks" not in loaded_yaml and len({"imports", "inherits"} - loaded_yaml.keys()) == 2:
            raise ValueError("A pipeline must be declared with one or more tasks")

        # These steps below must happen in this call order; each _read_* pops
        # its section out of loaded_yaml, and the verification steps depend on
        # imports having already been merged in.

        # Process pipeline description
        self.description = loaded_yaml.pop("description")

        # Process tasks
        self._read_tasks(loaded_yaml)

        # Process instrument keys
        inst = loaded_yaml.pop("instrument", None)
        if isinstance(inst, list):
            raise ValueError("Only one top level instrument can be defined in a pipeline")
        self.instrument: str | None = inst

        # Process any contracts
        self._read_contracts(loaded_yaml)

        # Process any defined parameters
        self._read_parameters(loaded_yaml)

        # Process any named label subsets
        self._read_labeled_subsets(loaded_yaml)

        # Process defined sets
        self._read_step_declaration(loaded_yaml)

        # Process any inherited pipelines
        self._read_imports(loaded_yaml)

        # verify named subsets, must be done after inheriting
        self._verify_labeled_subsets()

        # verify steps, must be done after inheriting
        self._verify_steps()

644 

645 def _read_contracts(self, loaded_yaml: dict[str, Any]) -> None: 

646 """Process the contracts portion of the loaded yaml document 

647 

648 Parameters 

649 ---------- 

650 loaded_yaml : `dict` 

651 A dictionary which matches the structure that would be produced by 

652 a yaml reader which parses a pipeline definition document 

653 """ 

654 loaded_contracts = loaded_yaml.pop("contracts", []) 

655 if isinstance(loaded_contracts, str): 

656 loaded_contracts = [loaded_contracts] 

657 self.contracts: list[ContractIR] = [] 

658 for contract in loaded_contracts: 

659 if isinstance(contract, dict): 

660 self.contracts.append(ContractIR(**contract)) 

661 if isinstance(contract, str): 

662 self.contracts.append(ContractIR(contract=contract)) 

663 

664 def _read_parameters(self, loaded_yaml: dict[str, Any]) -> None: 

665 """Process the parameters portion of the loaded yaml document 

666 

667 Parameters 

668 ---------- 

669 loaded_yaml : `dict` 

670 A dictionary which matches the structure that would be produced by 

671 a yaml reader which parses a pipeline definition document 

672 """ 

673 loaded_parameters = loaded_yaml.pop("parameters", {}) 

674 if not isinstance(loaded_parameters, dict): 

675 raise ValueError("The parameters section must be a yaml mapping") 

676 self.parameters = ParametersIR(loaded_parameters) 

677 

678 def _read_labeled_subsets(self, loaded_yaml: dict[str, Any]) -> None: 

679 """Process the subsets portion of the loaded yaml document 

680 

681 Parameters 

682 ---------- 

683 loaded_yaml : `MutableMapping` 

684 A dictionary which matches the structure that would be produced 

685 by a yaml reader which parses a pipeline definition document 

686 """ 

687 loaded_subsets = loaded_yaml.pop("subsets", {}) 

688 self.labeled_subsets: dict[str, LabeledSubset] = {} 

689 if not loaded_subsets and "subset" in loaded_yaml: 

690 raise ValueError("Top level key should be subsets and not subset, add an s") 

691 for key, value in loaded_subsets.items(): 

692 self.labeled_subsets[key] = LabeledSubset.from_primitives(key, value) 

693 

694 def _read_step_declaration(self, loaded_yaml: dict[str, Any]) -> None: 

695 """Process the steps portion of the loaded yaml document 

696 

697 Steps are subsets that are declared to be normal parts of the overall 

698 processing of the pipeline. Not all subsets need to be a step, as they 

699 can exist for certain targeted processing, such as debugging. 

700 

701 Parameters 

702 ---------- 

703 loaded_yaml : `dict` 

704 A dictionary which matches the structure that would be produced 

705 by a yaml reader which parses a pipeline definition document 

706 """ 

707 loaded_steps = loaded_yaml.pop("steps", []) 

708 temp_steps: dict[str, StepIR] = {} 

709 for declaration in loaded_steps: 

710 new_step = StepIR(**declaration) 

711 existing = temp_steps.setdefault(new_step.label, new_step) 

712 if existing is not new_step: 

713 raise ValueError(f"Step {existing.label} was declared twice.") 

714 self.steps = [step for step in temp_steps.values()] 

715 

716 def _verify_labeled_subsets(self) -> None: 

717 """Verify that all the labels in each named subset exist within the 

718 pipeline. 

719 """ 

720 # Verify that all labels defined in a labeled subset are in the 

721 # Pipeline 

722 for labeled_subset in self.labeled_subsets.values(): 

723 if not labeled_subset.subset.issubset(self.tasks.keys()): 

724 raise ValueError( 

725 f"Labels {labeled_subset.subset - self.tasks.keys()} were not found in the " 

726 "declared pipeline" 

727 ) 

728 # Verify subset labels are not already task labels 

729 label_intersection = self.labeled_subsets.keys() & self.tasks.keys() 

730 if label_intersection: 

731 raise ValueError(f"Labeled subsets can not use the same label as a task: {label_intersection}") 

732 

733 def _verify_steps(self) -> None: 

734 """Verify that all step definitions have a corresponding labeled 

735 subset. 

736 """ 

737 for step in self.steps: 

738 if step.label not in self.labeled_subsets: 

739 raise ValueError( 

740 f"{step.label} was declared to be a step, but was not declared to be a labeled subset" 

741 ) 

742 

743 def _read_imports(self, loaded_yaml: dict[str, Any]) -> None: 

744 """Process the inherits portion of the loaded yaml document 

745 

746 Parameters 

747 ---------- 

748 loaded_yaml : `dict` 

749 A dictionary which matches the structure that would be produced by 

750 a yaml reader which parses a pipeline definition document 

751 """ 

752 

753 def process_args(argument: str | dict) -> dict: 

754 if isinstance(argument, str): 

755 return {"location": argument} 

756 elif isinstance(argument, dict): 

757 if "exclude" in argument and isinstance(argument["exclude"], str): 

758 argument["exclude"] = [argument["exclude"]] 

759 if "include" in argument and isinstance(argument["include"], str): 

760 argument["include"] = [argument["include"]] 

761 if "instrument" in argument and argument["instrument"] == "None": 

762 argument["instrument"] = None 

763 if "labeledSubsetModifyMode" in argument: 

764 match argument["labeledSubsetModifyMode"]: 

765 case "DROP": 

766 argument["labeledSubsetModifyMode"] = PipelineSubsetCtrl.DROP 

767 case "EDIT": 

768 argument["labeledSubsetModifyMode"] = PipelineSubsetCtrl.EDIT 

769 case unknown: 

770 raise ValueError(f"{unknown} is not a valid mode for labeledSubsetModifyMode") 

771 return argument 

772 

773 if not {"inherits", "imports"} - loaded_yaml.keys(): 

774 raise ValueError("Cannot define both inherits and imports sections, use imports") 

775 tmp_import = loaded_yaml.pop("inherits", None) 

776 if tmp_import is None: 

777 tmp_import = loaded_yaml.pop("imports", None) 

778 else: 

779 raise ValueError("The 'inherits' key is not supported. Please use the key 'imports' instead") 

780 if tmp_import is None: 

781 self.imports: list[ImportIR] = [] 

782 elif isinstance(tmp_import, list): 

783 self.imports = [ImportIR(**process_args(args)) for args in tmp_import] 

784 else: 

785 self.imports = [ImportIR(**process_args(tmp_import))] 

786 

787 self.merge_pipelines([fragment.toPipelineIR() for fragment in self.imports]) 

788 

789 def merge_pipelines(self, pipelines: Iterable[PipelineIR]) -> None: 

790 """Merge one or more other `PipelineIR` objects into this object. 

791 

792 Parameters 

793 ---------- 

794 pipelines : `~collections.abc.Iterable` of `PipelineIR` objects 

795 An `~collections.abc.Iterable` that contains one or more 

796 `PipelineIR` objects to merge into this object. 

797 

798 Raises 

799 ------ 

800 ValueError 

801 Raised if there is a conflict in instrument specifications. 

802 Raised if a task label appears in more than one of the input 

803 `PipelineIR` objects which are to be merged. 

804 Raised if a labeled subset appears in more than one of the input 

805 `PipelineIR` objects which are to be merged, and with any subset 

806 existing in this object. 

807 """ 

808 # integrate any imported pipelines 

809 accumulate_tasks: dict[str, TaskIR] = {} 

810 accumulate_labeled_subsets: dict[str, LabeledSubset] = {} 

811 accumulated_parameters = ParametersIR({}) 

812 accumulated_steps: dict[str, StepIR] = {} 

813 

814 for tmp_IR in pipelines: 

815 if self.instrument is None: 

816 self.instrument = tmp_IR.instrument 

817 elif self.instrument != tmp_IR.instrument and tmp_IR.instrument is not None: 

818 msg = ( 

819 "Only one instrument can be declared in a pipeline or its imports. " 

820 f"Top level pipeline defines {self.instrument} but pipeline to merge " 

821 f"defines {tmp_IR.instrument}." 

822 ) 

823 raise ValueError(msg) 

824 if duplicate_labels := accumulate_tasks.keys() & tmp_IR.tasks.keys(): 

825 msg = ( 

826 "Task labels in the imported pipelines must be unique. " 

827 f"These labels appear multiple times: {duplicate_labels}" 

828 ) 

829 raise ValueError(msg) 

830 accumulate_tasks.update(tmp_IR.tasks) 

831 self.contracts.extend(tmp_IR.contracts) 

832 # verify that tmp_IR has unique labels for named subset among 

833 # existing labeled subsets, and with existing task labels. 

834 overlapping_subsets = accumulate_labeled_subsets.keys() & tmp_IR.labeled_subsets.keys() 

835 task_subset_overlap = ( 

836 accumulate_labeled_subsets.keys() | tmp_IR.labeled_subsets.keys() 

837 ) & accumulate_tasks.keys() 

838 if overlapping_subsets or task_subset_overlap: 

839 raise ValueError( 

840 "Labeled subset names must be unique amongst imports in both labels and " 

841 f" named Subsets. Duplicate: {overlapping_subsets | task_subset_overlap}" 

842 ) 

843 accumulate_labeled_subsets.update(tmp_IR.labeled_subsets) 

844 accumulated_parameters.update(tmp_IR.parameters) 

845 for tmp_step in tmp_IR.steps: 

846 existing = accumulated_steps.setdefault(tmp_step.label, tmp_step) 

847 if existing != tmp_step: 

848 raise ValueError( 

849 f"There were conflicting step definitions in import {tmp_step}, {existing}" 

850 ) 

851 

852 for tmp_step in self.steps: 

853 existing = accumulated_steps.setdefault(tmp_step.label, tmp_step) 

854 if existing != tmp_step: 

855 raise ValueError(f"There were conflicting step definitions in import {tmp_step}, {existing}") 

856 

857 # verify that any accumulated labeled subsets dont clash with a label 

858 # from this pipeline 

859 if accumulate_labeled_subsets.keys() & self.tasks.keys(): 

860 raise ValueError( 

861 "Labeled subset names must be unique amongst imports in both labels and named Subsets" 

862 ) 

863 # merge in the named subsets for self so this document can override any 

864 # that have been declared 

865 accumulate_labeled_subsets.update(self.labeled_subsets) 

866 self.labeled_subsets = accumulate_labeled_subsets 

867 

868 # merge the dict of label:TaskIR objects, preserving any configs in the 

869 # imported pipeline if the labels point to the same class 

870 for label, task in self.tasks.items(): 

871 if label not in accumulate_tasks: 

872 accumulate_tasks[label] = task 

873 elif accumulate_tasks[label].klass == task.klass: 

874 if task.config is not None: 

875 for config in task.config: 

876 accumulate_tasks[label].add_or_update_config(config) 

877 else: 

878 accumulate_tasks[label] = task 

879 self.tasks: dict[str, TaskIR] = accumulate_tasks 

880 accumulated_parameters.update(self.parameters) 

881 self.parameters = accumulated_parameters 

882 self.steps = list(accumulated_steps.values()) 

883 

884 def _read_tasks(self, loaded_yaml: dict[str, Any]) -> None: 

885 """Process the tasks portion of the loaded yaml document 

886 

887 Parameters 

888 ---------- 

889 loaded_yaml : `dict` 

890 A dictionary which matches the structure that would be produced by 

891 a yaml reader which parses a pipeline definition document 

892 """ 

893 self.tasks = {} 

894 tmp_tasks = loaded_yaml.pop("tasks", None) 

895 if tmp_tasks is None: 

896 tmp_tasks = {} 

897 

898 if "parameters" in tmp_tasks: 

899 raise ValueError("parameters is a reserved word and cannot be used as a task label") 

900 

901 for label, definition in tmp_tasks.items(): 

902 if isinstance(definition, str): 

903 definition = {"class": definition} 

904 config = definition.get("config", None) 

905 if config is None: 

906 task_config_ir = None 

907 else: 

908 if isinstance(config, dict): 

909 config = [config] 

910 task_config_ir = [] 

911 for c in config: 

912 file = c.pop("file", None) 

913 if file is None: 

914 file = [] 

915 elif not isinstance(file, list): 

916 file = [file] 

917 task_config_ir.append( 

918 ConfigIR( 

919 python=c.pop("python", None), dataId=c.pop("dataId", None), file=file, rest=c 

920 ) 

921 ) 

922 self.tasks[label] = TaskIR(label, definition["class"], task_config_ir) 

923 

924 def _remove_contracts(self, label: str) -> None: 

925 """Remove any contracts that contain the given label 

926 

927 String comparison used in this way is not the most elegant and may 

928 have issues, but it is the only feasible way when users can specify 

929 contracts with generic strings. 

930 """ 

931 new_contracts = [] 

932 for contract in self.contracts: 

933 # match a label that is not preceded by an ASCII identifier, or 

934 # is the start of a line and is followed by a dot 

935 if re.match(f".*([^A-Za-z0-9_]|^){label}[.]", contract.contract): 

936 continue 

937 new_contracts.append(contract) 

938 self.contracts = new_contracts 

939 

    def subset_from_labels(
        self, labelSpecifier: set[str], subsetCtrl: PipelineSubsetCtrl = PipelineSubsetCtrl.DROP
    ) -> PipelineIR:
        """Subset a pipelineIR to contain only labels specified in
        labelSpecifier.

        Parameters
        ----------
        labelSpecifier : `set` of `str`
            Set containing labels that describes how to subset a pipeline.
            This set is modified in place: any named-subset entries are
            replaced by the task labels they expand to.
        subsetCtrl : `PipelineSubsetCtrl`
            Control object which decides how subsets with missing labels are
            handled. Setting to `PipelineSubsetCtrl.DROP` (the default) will
            cause any subsets that have labels which are not in the set of all
            task labels to be dropped. Setting to `PipelineSubsetCtrl.EDIT`
            will cause the subset to instead be edited to remove the
            nonexistent label.

        Returns
        -------
        pipeline : `PipelineIR`
            A new pipelineIR object that is a subset of the old pipelineIR.

        Raises
        ------
        ValueError
            Raised if there is an issue with specified labels.

        Notes
        -----
        This method attempts to prune any contracts that contain labels which
        are not in the declared subset of labels. This pruning is done using a
        string based matching due to the nature of contracts and may prune more
        than it should.
        """
        # Work on a deep copy so the original PipelineIR is left untouched.
        pipeline = copy.deepcopy(self)

        # update the label specifier to expand any named subsets
        toRemove = set()
        toAdd = set()
        for label in labelSpecifier:
            if label in pipeline.labeled_subsets:
                toRemove.add(label)
                toAdd.update(pipeline.labeled_subsets[label].subset)
        # NOTE(review): these two calls mutate the caller's set in place —
        # confirm no caller relies on labelSpecifier being unchanged.
        labelSpecifier.difference_update(toRemove)
        labelSpecifier.update(toAdd)
        # verify all the labels are in the pipeline
        if not labelSpecifier.issubset(pipeline.tasks.keys() | pipeline.labeled_subsets):
            # NOTE(review): the reported difference is computed against task
            # labels only, so the message may include entries that are valid
            # subset names — confirm this is the intended report.
            difference = labelSpecifier.difference(pipeline.tasks.keys())
            raise ValueError(
                "Not all supplied labels (specified or named subsets) are in the pipeline "
                f"definition, extra labels: {difference}"
            )
        # copy needed so as to not modify while iterating
        pipeline_labels = set(pipeline.tasks.keys())
        # Remove the labels from the pipelineIR, and any contracts that contain
        # those labels (see docstring on _remove_contracts for why this may
        # cause issues)
        for label in pipeline_labels:
            if label not in labelSpecifier:
                pipeline.tasks.pop(label)
                pipeline._remove_contracts(label)

        # create a copy of the object to iterate over
        labeled_subsets = copy.copy(pipeline.labeled_subsets)
        # remove or edit any labeled subsets that no longer have a complete
        # set of task labels, according to the requested subset-control mode
        for label, labeled_subset in labeled_subsets.items():
            if extraTaskLabels := (labeled_subset.subset - pipeline.tasks.keys()):
                match subsetCtrl:
                    case PipelineSubsetCtrl.DROP:
                        del pipeline.labeled_subsets[label]
                    case PipelineSubsetCtrl.EDIT:
                        for extra in extraTaskLabels:
                            labeled_subset.subset.discard(extra)
            elif subsetCtrl is PipelineSubsetCtrl.DROP and not labeled_subset.subset:
                # When mode is DROP, also drop any subsets that were already
                # empty. This ensures we drop steps that were emptied-out by
                # (earlier) imports with exclude in EDIT mode. Note that we
                # don't want to drop those steps when they're first excluded
                # down to nothing, because the pipeline might be about to add
                # new tasks back into them, and then we'd want to preserve the
                # step definitions.
                del pipeline.labeled_subsets[label]

        # remove any steps that correspond to removed subsets
        new_steps = []
        for step in pipeline.steps:
            if step.label not in pipeline.labeled_subsets:
                continue
            new_steps.append(step)
        pipeline.steps = new_steps

        return pipeline

1033 

1034 @classmethod 

1035 def from_string(cls, pipeline_string: str) -> PipelineIR: 

1036 """Create a `PipelineIR` object from a string formatted like a pipeline 

1037 document. 

1038 

1039 Parameters 

1040 ---------- 

1041 pipeline_string : `str` 

1042 A string that is formatted according like a pipeline document. 

1043 """ 

1044 loaded_yaml = yaml.load(pipeline_string, Loader=PipelineYamlLoader) 

1045 return cls(loaded_yaml) 

1046 

1047 @classmethod 

1048 def from_uri(cls, uri: ResourcePathExpression) -> PipelineIR: 

1049 """Create a `PipelineIR` object from the document specified by the 

1050 input uri. 

1051 

1052 Parameters 

1053 ---------- 

1054 uri : convertible to `~lsst.resources.ResourcePath` 

1055 Location of document to use in creating a `PipelineIR` object. 

1056 

1057 Returns 

1058 ------- 

1059 pipelineIR : `PipelineIR` 

1060 The loaded pipeline. 

1061 """ 

1062 loaded_uri = ResourcePath(uri) 

1063 with loaded_uri.open("r") as buffer: 

1064 loaded_yaml = yaml.load(buffer, Loader=PipelineYamlLoader) 

1065 return cls(loaded_yaml) 

1066 

1067 def write_to_uri(self, uri: ResourcePathExpression) -> None: 

1068 """Serialize this `PipelineIR` object into a yaml formatted string and 

1069 write the output to a file at the specified uri. 

1070 

1071 Parameters 

1072 ---------- 

1073 uri : convertible to `~lsst.resources.ResourcePath` 

1074 Location of document to write a `PipelineIR` object. 

1075 """ 

1076 with ResourcePath(uri).open("w") as buffer: 

1077 yaml.dump(self.to_primitives(), buffer, sort_keys=False, Dumper=MultilineStringDumper) 

1078 

1079 def to_primitives(self) -> dict[str, Any]: 

1080 """Convert to a representation used in yaml serialization. 

1081 

1082 Returns 

1083 ------- 

1084 primitives : `dict` 

1085 Dictionary that maps directly to the serialized YAML form. 

1086 """ 

1087 accumulate = {"description": self.description} 

1088 if self.instrument is not None: 

1089 accumulate["instrument"] = self.instrument 

1090 if self.parameters: 

1091 accumulate["parameters"] = self.parameters.to_primitives() 

1092 accumulate["tasks"] = {m: t.to_primitives() for m, t in self.tasks.items()} 

1093 if len(self.contracts) > 0: 

1094 # sort contracts lexicographical order by the contract string in 

1095 # absence of any other ordering principle 

1096 contracts_list = [c.to_primitives() for c in self.contracts] 

1097 contracts_list.sort(key=lambda x: x["contract"]) 

1098 accumulate["contracts"] = contracts_list 

1099 if self.labeled_subsets: 

1100 accumulate["subsets"] = {k: v.to_primitives() for k, v in self.labeled_subsets.items()} 

1101 return accumulate 

1102 

1103 def __str__(self) -> str: 

1104 """Instance formatting as how it would look in yaml representation""" 

1105 return yaml.dump(self.to_primitives(), sort_keys=False, Dumper=MultilineStringDumper) 

1106 

1107 def __repr__(self) -> str: 

1108 """Instance formatting as how it would look in yaml representation""" 

1109 return str(self) 

1110 

1111 def __eq__(self, other: object) -> bool: 

1112 if not isinstance(other, PipelineIR): 

1113 return False 

1114 # special case contracts because it is a list, but order is not 

1115 # important 

1116 return ( 

1117 all( 

1118 getattr(self, attr) == getattr(other, attr) 

1119 for attr in ("tasks", "instrument", "labeled_subsets", "parameters") 

1120 ) 

1121 and len(self.contracts) == len(other.contracts) 

1122 and all(c in self.contracts for c in other.contracts) 

1123 )