Coverage for python / lsst / ctrl / bps / generic_workflow.py: 28%

555 statements  

« prev     ^ index     » next       coverage.py v7.13.5, created at 2026-04-14 23:52 +0000

1# This file is part of ctrl_bps. 

2# 

3# Developed for the LSST Data Management System. 

4# This product includes software developed by the LSST Project 

5# (https://www.lsst.org). 

6# See the COPYRIGHT file at the top-level directory of this distribution 

7# for details of code ownership. 

8# 

9# This software is dual licensed under the GNU General Public License and also 

10# under a 3-clause BSD license. Recipients may choose which of these licenses 

11# to use; please see the files gpl-3.0.txt and/or bsd_license.txt, 

12# respectively. If you choose the GPL option then the following text applies 

13# (but note that there is still no warranty even if you opt for BSD instead): 

14# 

15# This program is free software: you can redistribute it and/or modify 

16# it under the terms of the GNU General Public License as published by 

17# the Free Software Foundation, either version 3 of the License, or 

18# (at your option) any later version. 

19# 

20# This program is distributed in the hope that it will be useful, 

21# but WITHOUT ANY WARRANTY; without even the implied warranty of 

22# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the 

23# GNU General Public License for more details. 

24# 

25# You should have received a copy of the GNU General Public License 

26# along with this program. If not, see <https://www.gnu.org/licenses/>. 

27 

28"""Class definitions for a Generic Workflow Graph.""" 

29 

30__all__ = [ 

31 "GenericWorkflow", 

32 "GenericWorkflowExec", 

33 "GenericWorkflowFile", 

34 "GenericWorkflowGroup", 

35 "GenericWorkflowJob", 

36 "GenericWorkflowNode", 

37 "GenericWorkflowNodeType", 

38 "GenericWorkflowNoopJob", 

39] 

40 

41 

42import dataclasses 

43import itertools 

44import logging 

45import pickle 

46from collections import Counter, defaultdict 

47from collections.abc import Iterable, Iterator 

48from enum import IntEnum, auto 

49from typing import IO, Any, BinaryIO, Literal, cast, overload 

50 

51from networkx import DiGraph, topological_sort 

52from networkx.algorithms.dag import is_directed_acyclic_graph 

53 

54from lsst.utils.iteration import ensure_iterable 

55 

56from .bps_draw import draw_networkx_dot 

57from .bps_utils import subset_dimension_values 

58 

59_LOG = logging.getLogger(__name__) 

60 

61 

@dataclasses.dataclass(slots=True)
class GenericWorkflowFile:
    """Information about a file that may be needed by various workflow
    management services.
    """

    name: str
    """Lookup key (logical file name) of file/directory. Must be unique
    within run.
    """

    src_uri: str | None = None  # don't know that need ResourcePath
    """Original location of file/directory.
    """

    wms_transfer: bool = False
    """Whether the WMS is responsible for transferring this file (files
    with this flag set are the ones returned by the ``transfer_only``
    queries such as `GenericWorkflow.get_files`). Default is False.
    """

    job_access_remote: bool = False
    """Whether the job can remotely access file (using separately specified
    file access protocols). Default is False.
    """

    job_shared: bool = False
    """Whether job requires its own copy of this file. Default is False.

    NOTE(review): the attribute name suggests the inverse meaning (whether
    the file can be shared between jobs) -- confirm against consumers.
    """

    def __hash__(self) -> int:
        # Name is the unique key within a run, so it alone defines identity.
        return hash(self.name)

92 

93 

@dataclasses.dataclass(slots=True)
class GenericWorkflowExec:
    """Information about an executable that may be needed by various workflow
    management services.
    """

    name: str
    """Lookup key (logical file name) of executable. Must be unique
    within run.
    """

    src_uri: str | None = None  # don't know that need ResourcePath
    """Original location of executable.
    """

    transfer_executable: bool = False
    """Whether the WMS/plugin is responsible for staging executable to
    location usable by job.
    """

    def __hash__(self) -> int:
        # Name is the unique key within a run, so it alone defines identity.
        return hash(self.name)

116 

117 

class GenericWorkflowNodeType(IntEnum):
    """Valid types for nodes in the GenericWorkflow."""

    NOOP = auto()
    """Does nothing, but enforces special dependencies."""

    PAYLOAD = auto()
    """Typical workflow job."""

    GROUP = auto()
    """A special group (subdag) of jobs."""

129 

130 

@dataclasses.dataclass(slots=True)
class GenericWorkflowNode:
    """Base class for nodes in the GenericWorkflow."""

    name: str
    """Name of node. Must be unique within workflow."""

    label: str
    """Primary user-facing label for job. Does not need to be unique and
    may be used for summary reports or to group nodes."""

    def __hash__(self) -> int:
        # Name is the unique key within a workflow, so it defines identity.
        return hash(self.name)

    @property
    def node_type(self) -> GenericWorkflowNodeType:
        """Type of node."""
        # Subclasses must declare their own node type.
        raise NotImplementedError(f"{type(self).__name__} needs to override node_type.")

149 

150 

@dataclasses.dataclass(slots=True)
class GenericWorkflowNoopJob(GenericWorkflowNode):
    """Job that does no work. Used for special dependencies."""

    @property
    def node_type(self) -> GenericWorkflowNodeType:
        """Indicate this is a noop job."""
        return GenericWorkflowNodeType.NOOP

159 

160 

@dataclasses.dataclass(slots=True)
class GenericWorkflowJob(GenericWorkflowNode):
    """Information about a job that may be needed by various workflow
    management services.
    """

    quanta_counts: Counter[str] = dataclasses.field(default_factory=Counter)
    """Counts of quanta per task label in job.
    """

    tags: dict[str, Any] = dataclasses.field(default_factory=dict)
    """Other key/value pairs for job that user may want to use as a filter.
    """

    executable: GenericWorkflowExec | None = None
    """Executable for job.
    """

    arguments: str | None = None
    """Command line arguments for job.
    """

    cmdvals: dict[str, Any] = dataclasses.field(default_factory=dict)
    """Values for variables in cmdline when using lazy command line creation.
    """

    memory_multiplier: float | None = None
    """Memory growth rate between retries.
    """

    request_memory: int | None = None  # MB
    """Max memory (in MB) that the job is expected to need.
    """

    request_memory_max: int | None = None  # MB
    """Max memory (in MB) that the job should ever use.
    """

    request_cpus: int | None = None  # cores
    """Max number of cpus that the job is expected to need.
    """

    request_disk: int | None = None  # MB
    """Max amount of job scratch disk (in MB) that the job is expected to need.
    """

    request_walltime: str | None = None  # minutes
    """Max amount of time (in seconds) that the job is expected to need.

    NOTE(review): the inline comment says minutes while this docstring says
    seconds -- confirm the intended units.
    """

    compute_site: str | None = None
    """Key to look up site-specific information for running the job.
    """

    accounting_group: str | None = None
    """Name of the accounting group to use.
    """

    accounting_user: str | None = None
    """Name of the user to use for accounting purposes.
    """

    mail_to: str | None = None
    """Comma separated list of email addresses for emailing job status.
    """

    when_to_mail: str | None = None
    """WMS-specific terminology for when to email job status.
    """

    number_of_retries: int | None = None
    """Number of times to automatically retry a failed job.
    """

    retry_unless_exit: int | list[int] | None = None
    """Exit code(s) for job that means to not automatically retry.
    """

    abort_on_value: int | None = None
    """Job exit value for signals to abort the entire workflow.
    """

    abort_return_value: int | None = None
    """Exit value to use when aborting the entire workflow.
    """

    priority: str | None = None
    """Initial priority of job in WMS-format.
    """

    category: str | None = None
    """WMS-facing label of job within single workflow (e.g., can be used for
    throttling jobs within a single workflow).
    """

    concurrency_limit: str | None = None
    """Names of concurrency limits that the WMS plugin can appropriately
    translate to limit the number of this job across all running workflows.
    """

    queue: str | None = None
    """Name of queue to use. Different WMS can translate this concept
    differently.
    """

    pre_cmdline: str | None = None
    """Command line to be executed prior to executing job.
    """

    post_cmdline: str | None = None
    """Command line to be executed after job executes.

    Should be executed regardless of exit status.
    """

    preemptible: bool | None = None
    """The flag indicating whether the job can be preempted.
    """

    profile: dict[str, Any] = dataclasses.field(default_factory=dict)
    """Nested dictionary of WMS-specific key/value pairs with primary key being
    WMS key (e.g., pegasus, condor, panda).
    """

    attrs: dict[str, Any] = dataclasses.field(default_factory=dict)
    """Key/value pairs of job attributes (for WMS that have attributes in
    addition to commands).
    """

    environment: dict[str, Any] = dataclasses.field(default_factory=dict)
    """Environment variable names and values to be explicitly set inside job.
    """

    compute_cloud: str | None = None
    """Key to look up cloud-specific information for running the job.
    """

    @property
    def node_type(self) -> GenericWorkflowNodeType:
        """Indicate this is a payload job."""
        return GenericWorkflowNodeType.PAYLOAD

302 

303 

class GenericWorkflow(DiGraph):
    """A generic representation of a workflow used to submit to specific
    workflow management systems.

    Job nodes store the job object itself as node data; files and
    executables are stored separately from the graph (see ``_files``,
    ``_executables``, ``_inputs``, ``_outputs`` set up in ``__init__``).

    Parameters
    ----------
    name : `str`
        Name of generic workflow.
    incoming_graph_data : `~typing.Any`, optional
        Data used to initialize graph that is passed through to DiGraph
        constructor. Can be any type supported by networkx.DiGraph.
    **attr : `dict`
        Keyword arguments passed through to DiGraph constructor.
    """

318 

    def __init__(self, name: str, incoming_graph_data: Any | None = None, **attr: Any) -> None:
        """Initialize the generic workflow (see class docstring for params)."""
        super().__init__(incoming_graph_data, **attr)
        self._name = name
        # Run-level key/value attributes.
        self.run_attrs: dict[str, str] = {}
        # Bookkeeping of job labels and their ordering.
        self._job_labels = GenericWorkflowLabels()
        # Central store of file objects keyed by logical file name.
        self._files: dict[str, GenericWorkflowFile] = {}
        # Central store of executables keyed by executable name.
        self._executables: dict[str, GenericWorkflowExec] = {}
        self._inputs: dict[
            str, list[GenericWorkflowFile]
        ] = {}  # mapping job.names to list of GenericWorkflowFile
        self._outputs: dict[
            str, list[GenericWorkflowFile]
        ] = {}  # mapping job.names to list of GenericWorkflowFile
        self.run_id = None
        # Special final job/workflow run after everything else (see add_final).
        self._final: GenericWorkflowJob | GenericWorkflow | None = None

334 

    # Starting from ver. 3.6 of NetworkX, the DiGraph class defines its custom
    # __new__ method that explicitly defines arguments it accepts. As a result,
    # we need to override it to let our subclass use different ones.
    #
    # Notes
    # -----
    # Most likely overriding __new__ in this manner will prevent us from using
    # different graph backends with our subclass. However, since we are not
    # using any backends, this should not be a problem at the moment.
    def __new__(cls, *args, **kwargs) -> "GenericWorkflow":
        # Constructor arguments are intentionally ignored here; they are
        # consumed by __init__.
        return object.__new__(cls)

346 

    @property
    def name(self) -> str:
        """Retrieve name of generic workflow.

        Returns
        -------
        name : `str`
            Name of generic workflow.
        """
        return self._name

357 

358 @property 

359 def quanta_counts(self) -> Counter[str]: 

360 """Count of quanta per task label (`collections.Counter`).""" 

361 qcounts: Counter[str] = Counter() 

362 for job_name in self: 

363 gwjob = self.get_job(job_name) 

364 if hasattr(gwjob, "quanta_counts"): 

365 qcounts += gwjob.quanta_counts 

366 return qcounts 

367 

    @property
    def labels(self) -> list[str]:
        """Job labels (`list` [`str`], read-only)."""
        return self._job_labels.labels

372 

373 def regenerate_labels(self) -> None: 

374 """Regenerate the list of job labels.""" 

375 self._job_labels = GenericWorkflowLabels() 

376 for job_name in self: 

377 job = self.get_job(job_name) 

378 if job.node_type == GenericWorkflowNodeType.PAYLOAD: 

379 job = cast(GenericWorkflowJob, job) 

380 parents_labels: list[str] = [] 

381 for parent_name in self.predecessors(job.name): 

382 parent_job = self.get_job(parent_name) 

383 if parent_job.node_type == GenericWorkflowNodeType.PAYLOAD: 

384 # parent_job = cast(GenericWorkflowJob, parent_job) 

385 parents_labels.append(parent_job.label) 

386 children_labels: list[str] = [] 

387 for child_name in self.successors(job.name): 

388 child_job = self.get_job(child_name) 

389 if child_job.node_type == GenericWorkflowNodeType.PAYLOAD: 

390 # child_job = cast(GenericWorkflowJob, child_job) 

391 children_labels.append(child_job.label) 

392 self._job_labels.add_job(job, parents_labels, children_labels) 

393 

    @property
    def job_counts(self) -> Counter[str]:
        """Count of jobs per job label (`collections.Counter`)."""
        jcounts = self._job_labels.job_counts

        # Final is separate: the final job/workflow is not a graph node.
        # NOTE(review): the updates below mutate the Counter returned by
        # GenericWorkflowLabels.job_counts; confirm that property returns a
        # fresh copy rather than internal state.
        final = self.get_final()
        if final:
            if isinstance(final, GenericWorkflow):
                jcounts.update(final.job_counts)
            else:
                jcounts[final.label] += 1

        return jcounts

408 

    def __iter__(self) -> Iterator[str]:
        """Return iterator of job names in topologically sorted order."""
        # Guarantees parents are yielded before their children.
        return topological_sort(self)

412 

413 @overload 

414 def get_files(self, data: Literal[False], transfer_only: bool = True) -> list[str]: ... 414 ↛ exitline 414 didn't return from function 'get_files' because

415 

416 @overload 

417 def get_files(self, data: Literal[True], transfer_only: bool = True) -> list[GenericWorkflowFile]: ... 417 ↛ exitline 417 didn't return from function 'get_files' because

418 

419 def get_files( 

420 self, data: bool = False, transfer_only: bool = True 

421 ) -> list[GenericWorkflowFile] | list[str]: 

422 """Retrieve files from generic workflow. 

423 

424 Need API in case change way files are stored (e.g., make 

425 workflow a bipartite graph with jobs and files nodes). 

426 

427 Parameters 

428 ---------- 

429 data : `bool`, optional 

430 Whether to return the file data as well as the file object name 

431 (The default is `False`). 

432 transfer_only : `bool`, optional 

433 Whether to only return files for which a workflow management system 

434 would be responsible for transferring. 

435 

436 Returns 

437 ------- 

438 files : `list` [`lsst.ctrl.bps.GenericWorkflowFile`] or `list` [`str`] 

439 File names or objects from generic workflow meeting specifications. 

440 """ 

441 files: list[Any] = [] # Any for mypy to allow the different append lines. 

442 for filename, file in self._files.items(): 

443 if not transfer_only or file.wms_transfer: 

444 if not data: 

445 files.append(filename) 

446 else: 

447 files.append(file) 

448 return files 

449 

    def add_job(
        self,
        job: GenericWorkflowNode,
        parent_names: str | list[str] | None = None,
        child_names: str | list[str] | None = None,
    ) -> None:
        """Add job to generic workflow.

        Parameters
        ----------
        job : `lsst.ctrl.bps.GenericWorkflowNode`
            Job to add to the generic workflow.
        parent_names : `str` | `list` [`str`], optional
            Names of jobs that are parents of given job.
        child_names : `str` | `list` [`str`], optional
            Names of jobs that are children of given job.

        Raises
        ------
        RuntimeError
            Raised if job is not a GenericWorkflowNode or a job with the
            same name already exists.
        """
        _LOG.debug("job: %s (%s)", job.name, job.label)
        _LOG.debug("parent_names: %s", parent_names)
        _LOG.debug("child_names: %s", child_names)
        if not isinstance(job, GenericWorkflowNode):
            raise RuntimeError(f"Invalid type for job to be added to GenericWorkflowGraph ({type(job)}).")
        if self.has_node(job.name):
            raise RuntimeError(f"Job {job.name} already exists in GenericWorkflowGraph.")
        # Store the job object itself as node data.
        super().add_node(job.name, job=job)
        # Node must exist before its edges can be added.
        self.add_job_relationships(parent_names, job.name)
        self.add_job_relationships(job.name, child_names)
        if job.node_type == GenericWorkflowNodeType.PAYLOAD:
            job = cast(GenericWorkflowJob, job)
            self.add_executable(job.executable)
            # Mirror this payload job and its neighbor labels in the
            # label-level bookkeeping.
            self._job_labels.add_job(
                job,
                [self.get_job(p).label for p in self.predecessors(job.name)],
                [self.get_job(p).label for p in self.successors(job.name)],
            )

485 

    def add_node(self, node_for_adding: GenericWorkflowNode, **attr: Any) -> None:
        """Override networkx function to call more specific add_job function.

        Parameters
        ----------
        node_for_adding : `lsst.ctrl.bps.GenericWorkflowJob`
            Job to be added to generic workflow.
        **attr : `~typing.Any`
            Needed to match original networkx function, but not used.
        """
        self.add_job(node_for_adding)

497 

    def add_job_relationships(
        self, parents: str | list[str] | None, children: str | list[str] | None
    ) -> None:
        """Add dependencies between parent and child jobs. All parents will
        be connected to all children.

        Parameters
        ----------
        parents : `str` or `list` [`str`], optional
            Parent job names.
        children : `str` or `list` [`str`], optional
            Children job names.
        """
        # Allow this to be a noop if no parents or no children
        if parents is not None and children is not None:
            # Full cross product: every parent gets an edge to every child.
            self.add_edges_from(itertools.product(ensure_iterable(parents), ensure_iterable(children)))
            # Mirror the new relationships in the label-level bookkeeping.
            self._job_labels.add_job_relationships(
                [self.get_job(n).label for n in ensure_iterable(parents)],
                [self.get_job(n).label for n in ensure_iterable(children)],
            )

518 

519 def add_edges_from(self, ebunch_to_add: Iterable[tuple[str, str]], **attr: Any) -> None: 

520 """Add several edges between jobs in the generic workflow. 

521 

522 Parameters 

523 ---------- 

524 ebunch_to_add : Iterable [`tuple` [`str`, `str`]] 

525 Iterable of job name pairs between which a dependency should be 

526 saved. 

527 **attr : `~typing.Any` 

528 Data can be assigned using keyword arguments (not currently used). 

529 """ 

530 for edge_to_add in ebunch_to_add: 

531 self.add_edge(edge_to_add[0], edge_to_add[1], **attr) 

532 

533 def add_edge(self, u_of_edge: str, v_of_edge: str, **attr: Any) -> None: 

534 """Add edge connecting jobs in workflow. 

535 

536 Parameters 

537 ---------- 

538 u_of_edge : `str` 

539 Name of parent job. 

540 v_of_edge : `str` 

541 Name of child job. 

542 **attr 

543 Attributes to save with edge. 

544 """ 

545 if u_of_edge not in self: 

546 raise RuntimeError(f"{u_of_edge} not in GenericWorkflow") 

547 if v_of_edge not in self: 

548 raise RuntimeError(f"{v_of_edge} not in GenericWorkflow") 

549 super().add_edge(u_of_edge, v_of_edge, **attr) 

550 

    def get_job(self, job_name: str) -> GenericWorkflowNode:
        """Retrieve job by name from workflow.

        Parameters
        ----------
        job_name : `str`
            Name of job to retrieve.

        Returns
        -------
        job : `lsst.ctrl.bps.GenericWorkflowNode`
            Job matching given job_name.
        """
        # The job object is stored as node data under the "job" key.
        return self.nodes[job_name]["job"]

565 

    def del_job(self, job_name: str) -> None:
        """Delete job from generic workflow leaving connected graph.

        Parameters
        ----------
        job_name : `str`
            Name of job to delete from workflow.
        """
        job = self.get_job(job_name)

        # Remove from job labels
        if isinstance(job, GenericWorkflowJob):
            self._job_labels.del_job(job)

        # Connect all parent jobs to all children jobs.
        parents = list(self.predecessors(job_name))
        children = list(self.successors(job_name))
        self.add_job_relationships(parents, children)

        # Delete job node (which deletes edges).
        self.remove_node(job_name)

587 

588 def add_job_inputs(self, job_name: str, files: GenericWorkflowFile | list[GenericWorkflowFile]) -> None: 

589 """Add files as inputs to specified job. 

590 

591 Parameters 

592 ---------- 

593 job_name : `str` 

594 Name of job to which inputs should be added. 

595 files : `lsst.ctrl.bps.GenericWorkflowFile` or \ 

596 `list` [`lsst.ctrl.bps.GenericWorkflowFile`] 

597 File object(s) to be added as inputs to the specified job. 

598 """ 

599 self._inputs.setdefault(job_name, []) 

600 for file in ensure_iterable(files): 

601 # Save the central copy 

602 if file.name not in self._files: 

603 self._files[file.name] = file 

604 

605 # Save the job reference to the file 

606 self._inputs[job_name].append(file) 

607 

    def get_file(self, name: str) -> GenericWorkflowFile:
        """Retrieve a file object by name.

        Parameters
        ----------
        name : `str`
            Name of file object.

        Returns
        -------
        gwfile : `lsst.ctrl.bps.GenericWorkflowFile`
            File matching given name.

        Raises
        ------
        KeyError
            Raised if no file with the given name is known to the workflow.
        """
        return self._files[name]

622 

623 def add_file(self, gwfile: GenericWorkflowFile) -> None: 

624 """Add file object. 

625 

626 Parameters 

627 ---------- 

628 gwfile : `lsst.ctrl.bps.GenericWorkflowFile` 

629 File object to add to workflow. 

630 """ 

631 if gwfile.name not in self._files: 

632 self._files[gwfile.name] = gwfile 

633 else: 

634 _LOG.debug("Skipped add_file for existing file %s", gwfile.name) 

635 

636 @overload 

637 def get_job_inputs( 637 ↛ exitline 637 didn't return from function 'get_job_inputs' because

638 self, job_name: str, data: Literal[False], transfer_only: bool = False 

639 ) -> list[str]: ... 

640 

641 @overload 

642 def get_job_inputs( 642 ↛ exitline 642 didn't return from function 'get_job_inputs' because

643 self, job_name: str, data: Literal[True], transfer_only: bool = False 

644 ) -> list[GenericWorkflowFile]: ... 

645 

646 def get_job_inputs( 

647 self, job_name: str, data: bool = True, transfer_only: bool = False 

648 ) -> list[GenericWorkflowFile] | list[str]: 

649 """Return the input files for the given job. 

650 

651 Parameters 

652 ---------- 

653 job_name : `str` 

654 Name of the job. 

655 data : `bool`, optional 

656 Whether to return the file data as well as the file object name. 

657 transfer_only : `bool`, optional 

658 Whether to only return files for which a workflow management system 

659 would be responsible for transferring. 

660 

661 Returns 

662 ------- 

663 inputs : `list` [`lsst.ctrl.bps.GenericWorkflowFile`] or `list` [`str`] 

664 Input files for the given job. If no input files for the job, 

665 returns an empty list. 

666 """ 

667 inputs: list[Any] = [] # Any for mypy to allow the different append lines. 

668 if job_name in self._inputs: 

669 for gwfile in self._inputs[job_name]: 

670 if not transfer_only or gwfile.wms_transfer: 

671 if not data: 

672 inputs.append(gwfile.name) 

673 else: 

674 inputs.append(gwfile) 

675 return inputs 

676 

677 def add_job_outputs(self, job_name: str, files: list[GenericWorkflowFile]) -> None: 

678 """Add output files to a job. 

679 

680 Parameters 

681 ---------- 

682 job_name : `str` 

683 Name of job to which the files should be added as outputs. 

684 files : `list` [`lsst.ctrl.bps.GenericWorkflowFile`] 

685 File objects to be added as outputs for specified job. 

686 """ 

687 self._outputs.setdefault(job_name, []) 

688 

689 for file_ in ensure_iterable(files): 

690 # Save the central copy 

691 if file_.name not in self._files: 

692 self._files[file_.name] = file_ 

693 

694 # Save the job reference to the file 

695 self._outputs[job_name].append(file_) 

696 

697 @overload 

698 def get_job_outputs( 698 ↛ exitline 698 didn't return from function 'get_job_outputs' because

699 self, job_name: str, data: Literal[False], transfer_only: bool = False 

700 ) -> list[str]: ... 

701 

702 @overload 

703 def get_job_outputs( 703 ↛ exitline 703 didn't return from function 'get_job_outputs' because

704 self, job_name: str, data: Literal[True], transfer_only: bool = False 

705 ) -> list[GenericWorkflowFile]: ... 

706 

707 def get_job_outputs( 

708 self, job_name: str, data: bool = True, transfer_only: bool = False 

709 ) -> list[GenericWorkflowFile] | list[str]: 

710 """Return the output files for the given job. 

711 

712 Parameters 

713 ---------- 

714 job_name : `str` 

715 Name of the job. 

716 data : `bool` 

717 Whether to return the file data as well as the file object name. 

718 It defaults to `True` thus returning file data as well. 

719 transfer_only : `bool` 

720 Whether to only return files for which a workflow management system 

721 would be responsible for transferring. It defaults to `False` thus 

722 returning all output files. 

723 

724 Returns 

725 ------- 

726 outputs : `list` [`lsst.ctrl.bps.GenericWorkflowFile`] or \ 

727 `list` [`str`] 

728 Output files for the given job. If no output files for the job, 

729 returns an empty list. 

730 """ 

731 outputs: list[Any] = [] # Any for mypy to allow the different append lines. 

732 if not data: 

733 outputs = cast(list[str], outputs) 

734 else: 

735 outputs = cast(list[GenericWorkflowFile], outputs) 

736 

737 if job_name in self._outputs: 

738 for gwfile in self._outputs[job_name]: 

739 if not transfer_only or gwfile.wms_transfer: 

740 if not data: 

741 outputs.append(gwfile.name) 

742 else: 

743 outputs.append(gwfile) 

744 return outputs 

745 

746 def draw(self, stream: str | IO[str], format_: str = "dot") -> None: 

747 """Output generic workflow in a visualization format. 

748 

749 Parameters 

750 ---------- 

751 stream : `str` or `io.BufferedIOBase` 

752 Stream to which the visualization should be written. 

753 format_ : `str`, optional 

754 Which visualization format to use. It defaults to the format for 

755 the dot program. 

756 """ 

757 draw_funcs = {"dot": draw_networkx_dot} 

758 if format_ in draw_funcs: 

759 draw_funcs[format_](self, stream) 

760 else: 

761 raise RuntimeError(f"Unknown draw format ({format_})") 

762 

763 def save(self, stream: str | IO[bytes], format_: str = "pickle") -> None: 

764 """Save the generic workflow in a format that is loadable. 

765 

766 Parameters 

767 ---------- 

768 stream : `str` or `io.BufferedIOBase` 

769 Stream to pass to the format-specific writer. Accepts anything 

770 that the writer accepts. 

771 format_ : `str`, optional 

772 Format in which to write the data. It defaults to pickle format. 

773 """ 

774 if format_ == "pickle": 

775 stream = cast(BinaryIO, stream) 

776 pickle.dump(self, stream) 

777 else: 

778 raise RuntimeError(f"Unknown format ({format_})") 

779 

780 @classmethod 

781 def load(cls, stream: str | IO[bytes], format_: str = "pickle") -> "GenericWorkflow": 

782 """Load a GenericWorkflow from the given stream. 

783 

784 Parameters 

785 ---------- 

786 stream : `str` or `io.BufferedIOBase` 

787 Stream to pass to the format-specific loader. Accepts anything that 

788 the loader accepts. 

789 format_ : `str`, optional 

790 Format of data to expect when loading from stream. It defaults 

791 to pickle format. 

792 

793 Returns 

794 ------- 

795 generic_workflow : `lsst.ctrl.bps.GenericWorkflow` 

796 Generic workflow loaded from the given stream. 

797 """ 

798 if format_ == "pickle": 

799 stream = cast(BinaryIO, stream) 

800 object_ = pickle.load(stream) 

801 assert isinstance(object_, GenericWorkflow) # for mypy 

802 return object_ 

803 

804 raise RuntimeError(f"Unknown format ({format_})") 

805 

    def validate(self) -> None:
        """Run checks to ensure that the generic workflow graph is valid."""
        # Make sure a directed acyclic graph
        # NOTE(review): assert statements are stripped when Python runs with
        # -O; consider raising an exception if this check must always run.
        assert is_directed_acyclic_graph(self)

810 

    def add_workflow_source(self, workflow: "GenericWorkflow") -> None:
        """Add given workflow as new source to this workflow.

        Parameters
        ----------
        workflow : `lsst.ctrl.bps.GenericWorkflow`
            The given workflow.
        """
        # Find source nodes in self.
        self_sources = [n for n in self if self.in_degree(n) == 0]
        _LOG.debug("self_sources = %s", self_sources)

        # Find sink nodes of workflow.
        new_sinks = [n for n in workflow if workflow.out_degree(n) == 0]
        _LOG.debug("new sinks = %s", new_sinks)

        # Add new workflow nodes to self graph and make new edges.
        self.add_nodes_from(workflow.nodes(data=True))
        self.add_edges_from(workflow.edges())
        # Every sink of the incoming workflow feeds every current source,
        # so the incoming workflow runs first.
        for source in self_sources:
            for sink in new_sinks:
                self.add_edge(sink, source)

        # Add separately stored info
        for job_name in workflow:
            job = self.get_job(job_name)
            # Add job labels
            if isinstance(job, GenericWorkflowJob):
                self._job_labels.add_job(
                    job,
                    [self.get_job(p).label for p in self.predecessors(job.name)],
                    [self.get_job(p).label for p in self.successors(job.name)],
                )
                # Executables are stored separately so copy them.
                self.add_executable(job.executable)

            # Files are stored separately so copy them.
            self.add_job_inputs(job_name, workflow.get_job_inputs(job_name, data=True))
            self.add_job_outputs(job_name, workflow.get_job_outputs(job_name, data=True))

850 

851 def add_final(self, final: "GenericWorkflowJob | GenericWorkflow") -> None: 

852 """Add special final job/workflow to the generic workflow. 

853 

854 Parameters 

855 ---------- 

856 final : `lsst.ctrl.bps.GenericWorkflowJob` or \ 

857 `lsst.ctrl.bps.GenericWorkflow` 

858 Information needed to execute the special final job(s), the 

859 job(s) to be executed after all jobs that can be executed 

860 have been executed regardless of exit status of any of the 

861 jobs. 

862 """ 

863 if not isinstance(final, GenericWorkflowJob) and not isinstance(final, GenericWorkflow): 

864 raise TypeError("Invalid type for GenericWorkflow final ({type(final)})") 

865 

866 self._final = final 

867 if isinstance(final, GenericWorkflowJob): 

868 self.add_executable(final.executable) 

869 

    def get_final(self) -> "GenericWorkflowJob | GenericWorkflow | None":
        """Return job/workflow to be executed after all jobs that can be
        executed have been executed regardless of exit status of any of
        the jobs.

        Returns
        -------
        final : `lsst.ctrl.bps.GenericWorkflowJob` or \
                `lsst.ctrl.bps.GenericWorkflow`
            Information needed to execute final job(s). `None` if no final
            job/workflow has been set via `add_final`.
        """
        return self._final

882 

883 def add_executable(self, executable: GenericWorkflowExec | None) -> None: 

884 """Add executable to workflow's list of executables. 

885 

886 Parameters 

887 ---------- 

888 executable : `lsst.ctrl.bps.GenericWorkflowExec` 

889 Executable object to be added to workflow. 

890 """ 

891 if executable is not None: 

892 self._executables[executable.name] = executable 

893 else: 

894 _LOG.warning("executable not specified (None); cannot add to the workflow's list of executables") 

895 

896 @overload 

897 def get_executables(self, data: Literal[False], transfer_only: bool = False) -> list[str]: ... 897 ↛ exitline 897 didn't return from function 'get_executables' because

898 

899 @overload 

900 def get_executables( 900 ↛ exitline 900 didn't return from function 'get_executables' because

901 self, data: Literal[True], transfer_only: bool = False 

902 ) -> list[GenericWorkflowExec]: ... 

903 

904 def get_executables( 

905 self, data: bool = False, transfer_only: bool = True 

906 ) -> list[GenericWorkflowExec] | list[str]: 

907 """Retrieve executables from generic workflow. 

908 

909 Parameters 

910 ---------- 

911 data : `bool`, optional 

912 Whether to return the executable data as well as the exec object 

913 name (The defaults is False). 

914 transfer_only : `bool`, optional 

915 Whether to only return executables for which transfer_executable 

916 is True. 

917 

918 Returns 

919 ------- 

920 execs : `list` [`lsst.ctrl.bps.GenericWorkflowExec`] or `list` [`str`] 

921 Filtered executable names or objects from generic workflow. 

922 """ 

923 execs: list[Any] = [] # This and cast lines for mypy 

924 if not data: 

925 execs = cast(list[str], execs) 

926 else: 

927 execs = cast(list[GenericWorkflowExec], execs) 

928 

929 for name, executable in self._executables.items(): 

930 if not transfer_only or executable.transfer_executable: 

931 if not data: 

932 execs.append(name) 

933 else: 

934 execs.append(executable) 

935 return execs 

936 

    def get_jobs_by_label(self, label: str) -> list[GenericWorkflowJob]:
        """Retrieve jobs by label from workflow.

        Thin delegation to the workflow's internal label index.

        Parameters
        ----------
        label : `str`
            Label of jobs to retrieve.

        Returns
        -------
        jobs : list[`lsst.ctrl.bps.GenericWorkflowJob`]
            Jobs having given label.
        """
        return self._job_labels.get_jobs_by_label(label)

951 

952 def _check_job_ordering_config(self, ordering_config: dict[str, Any]) -> dict[str, DiGraph]: 

953 """Check configuration related to job ordering. 

954 

955 Parameters 

956 ---------- 

957 ordering_config : `dict` [`str`, `~typing.Any`] 

958 Job ordering configuration to check. 

959 

960 Returns 

961 ------- 

962 group_to_label_subgraph : `dict` [`str`, `network.DiGraph`] 

963 Mapping of group name to a graph of the job labels in the group. 

964 """ 

965 group_to_label_subgraph = {} 

966 job_label_to_group: dict[str, str] = {} # Checking label appears only in one group 

967 for group, group_vals in ordering_config.items(): 

968 implementation = group_vals.get("implementation", "group") 

969 if implementation not in ["noop", "group"]: 

970 raise RuntimeError(f"Invalid implementation for {group}: {implementation}") 

971 ordering_type = group_vals.get("ordering_type", "sort") 

972 if ordering_type != "sort": 

973 raise RuntimeError(f"Invalid ordering_type for {group}: {ordering_type}") 

974 if "dimensions" not in group_vals: 

975 raise KeyError(f"Missing dimensions entry in ordering group {group}") 

976 

977 job_labels = [x.strip() for x in group_vals["labels"].split(",")] 

978 _LOG.debug("group %s: job_labels=%s", group, job_labels) 

979 unused_labels = [] 

980 for job_label in job_labels: 

981 if job_label not in self._job_labels.labels: 

982 unused_labels.append(job_label) 

983 elif job_label in job_label_to_group: 

984 raise RuntimeError( 

985 f"Job label {job_label} appears in more than one job ordering group " 

986 f"({group} {job_label_to_group[job_label]})" 

987 ) 

988 else: 

989 job_label_to_group[job_label] = group 

990 

991 if unused_labels: 

992 _LOG.info("Workflow job labels = %s", ",".join(self._job_labels.labels)) 

993 raise RuntimeError( 

994 f"Job label(s) ({','.join(unused_labels)}) from job ordering group " 

995 f"{group} does not exist in workflow. Aborting." 

996 ) 

997 

998 label_subgraph = self._job_labels.subgraph(job_labels) 

999 group_to_label_subgraph[group] = label_subgraph 

1000 

1001 return group_to_label_subgraph 

1002 

1003 def _group_jobs_by_values( 

1004 self, group: str, group_config: dict[str, Any], label_subgraph: DiGraph 

1005 ) -> dict[tuple[Any, ...], list[str]]: 

1006 """Create job mapping of special sortable dimension key 

1007 to job name by comparing dimension values. 

1008 

1009 Parameters 

1010 ---------- 

1011 group : `str` 

1012 Name of group for which creating mapping. 

1013 group_config : `dict` [`str`, `~typing.Any`] 

1014 Config for group for which creating mapping. 

1015 label_subgraph : `networkx.DiGraph` 

1016 The graph of job labels to be used in mapping. 

1017 

1018 Returns 

1019 ------- 

1020 dims_to_jobs : `dict` [`tuple` [`~typing.Any`, ...], `list` [`str`]] 

1021 Mapping of dimensions to job names. 

1022 """ 

1023 dims_to_jobs: dict[tuple[Any, ...], list[str]] = {} 

1024 for job_label in label_subgraph: 

1025 jobs = self.get_jobs_by_label(job_label) 

1026 for job in jobs: 

1027 job_dim_values = subset_dimension_values( 

1028 f"Job {job.name}", 

1029 f"order group {group}", 

1030 group_config["dimensions"], 

1031 job.tags, 

1032 group_config.get("equalDimensions", None), 

1033 ) 

1034 job_dims = [] 

1035 for dim in [d.strip() for d in group_config["dimensions"].split(",")]: 

1036 job_dims.append(job_dim_values[dim]) 

1037 dims_job_list = dims_to_jobs.setdefault(tuple(job_dims), []) 

1038 dims_job_list.append(job.name) 

1039 return dims_to_jobs 

1040 

    def _group_jobs_by_dependencies(
        self, group: str, group_config: dict[str, Any], label_subgraph: DiGraph
    ) -> dict[tuple[Any, ...], list[str]]:
        """Create job mapping of special sortable dimension key
        to job name by following dependencies.

        Parameters
        ----------
        group : `str`
            Name of group for which creating mapping.
        group_config : `dict` [`str`, `~typing.Any`]
            Config for group for which creating mapping.  Must contain
            ``findDependencyMethod`` ("source" or "sink") and
            ``dimensions``; may contain ``equalDimensions``.
        label_subgraph : `networkx.DiGraph`
            The graph of job labels to be used in mapping.

        Returns
        -------
        dims_to_jobs : `dict` [`tuple` [`~typing.Any`, ...], `list` [`str`]]
            Mapping of dimensions to job names.

        Raises
        ------
        RuntimeError
            Raised if ``findDependencyMethod`` is neither "source" nor
            "sink".
        """
        method = group_config["findDependencyMethod"]
        # Labels are walked in dependency order; for "sink" the walk goes
        # upstream (predecessors), so the label order is reversed.
        dim_labels = list(topological_sort(label_subgraph))
        match method:
            case "source":
                find_potential_jobs = self.successors
            case "sink":
                find_potential_jobs = self.predecessors
                dim_labels.reverse()
            case _:
                raise RuntimeError(f"Invalid findDependencyMethod ({method})")

        jobs_seen: set[str] = set()  # Guards against assigning a job twice.
        dims_to_jobs: dict[tuple[Any, ...], list[str]] = {}
        for label in dim_labels:
            jobs = self.get_jobs_by_label(label)
            for job in jobs:
                if job.name not in jobs_seen:
                    jobs_seen.add(job.name)
                    # Resolve this job's values for the group's dimensions
                    # (honoring any equalDimensions aliases); these become
                    # the sortable key for the job and everything reachable
                    # from it within the group's labels.
                    job_dim_values = subset_dimension_values(
                        f"Job {job.name}",
                        f"order group {group}",
                        group_config["dimensions"],
                        job.tags,
                        group_config.get("equalDimensions", None),
                    )
                    job_dims = []
                    for dim in [d.strip() for d in group_config["dimensions"].split(",")]:
                        job_dims.append(job_dim_values[dim])
                    dims_job_list = dims_to_jobs.setdefault(tuple(job_dims), [])
                    dims_job_list.append(job.name)
                    # Use dependencies to find other quantum to add
                    # Note: in testing, using the following code was faster
                    # than using networkx descendants and ancestors functions
                    # While traversing the subgraph, nodes may appear
                    # repeatedly in potential_jobs.
                    jobs_to_use = [job]
                    while jobs_to_use:
                        job_to_use = jobs_to_use.pop()

                        potential_job_names = find_potential_jobs(job_to_use.name)
                        for potential_job_name in potential_job_names:
                            potential_job = cast(GenericWorkflowJob, self.get_job(potential_job_name))
                            if potential_job.label in label_subgraph:
                                # Membership check in dims_job_list keeps
                                # repeated graph visits from duplicating a
                                # job in the current key's list.
                                if potential_job.name not in dims_job_list:
                                    _LOG.debug(
                                        "Adding potential job %s (%s) to group %s",
                                        potential_job.name,
                                        potential_job.label,
                                        group,
                                    )
                                    dims_job_list.append(potential_job.name)
                                    jobs_to_use.append(potential_job)
                                    jobs_seen.add(potential_job.name)
                            else:
                                _LOG.debug(
                                    "label (%s) not in ordered_tasks. Not adding potential quantum %s",
                                    potential_job.label,
                                    potential_job.name,
                                )
        return dims_to_jobs

1121 

1122 def _update_by_group_sort( 

1123 self, 

1124 group: str, 

1125 dims_to_jobs: dict[tuple[Any, ...], list[str]], 

1126 blocking: bool = False, 

1127 ) -> None: 

1128 """Update portion of workflow for special job ordering using sort. 

1129 

1130 Parameters 

1131 ---------- 

1132 group : `str` 

1133 Ordering group label used for job name and messages. 

1134 dims_to_jobs: `dict` [`tuple` [`~typing.Any`, ...], `list` [`str`]] 

1135 Mapping of special dimension keys to workflow job names. 

1136 The sort for the special ordering is over the keys. 

1137 blocking: `bool` 

1138 Whether a failure in a group blocks execution of remaining groups. 

1139 """ 

1140 group_job_names: list[str] = [] 

1141 for dim_key, job_list in sorted(dims_to_jobs.items()): 

1142 _LOG.debug("group %s: dim_key=%s", group, dim_key) 

1143 group_job_name = f"group_{group}_{'=='.join([str(dk) for dk in dim_key])}" 

1144 prev_name = group_job_names[-1] if group_job_names else None 

1145 group_job_names.append(group_job_name) 

1146 

1147 self._replace_subgraph_with_job_group(group_job_name, group, job_list, blocking) 

1148 

1149 # ordering between groups 

1150 if prev_name: 

1151 self.add_edge(prev_name, group_job_name) 

1152 

    def _replace_subgraph_with_job_group(
        self, group_name: str, group_label: str, job_list: list[str], blocking: bool
    ) -> None:
        """Update portion of workflow for special job ordering using groups.

        Replaces the subgraph formed by ``job_list`` with a single
        `GenericWorkflowGroup` node containing those jobs, rewiring any
        edge that crosses the subgraph boundary to the new group node.

        Parameters
        ----------
        group_name : `str`
            Ordering group name.
        group_label : `str`
            Ordering group label.
        job_list: `list` [`str`]
            List of job names to put in the group
        blocking: `bool`
            Whether a failure in a group blocks execution of remaining groups.
        """
        job_group = GenericWorkflowGroup(group_name, group_label, blocking=blocking)
        self.add_node(job_group)

        # Add jobs, files, and executables first
        # then add edges later to avoid order issues
        for job_name in job_list:
            job = cast(GenericWorkflowJob, self.get_job(job_name))
            job_group.add_job(job)
            files = self.get_job_inputs(job_name, data=True)
            job_group.add_job_inputs(job_name, files)
            files = self.get_job_outputs(job_name, data=True)
            job_group.add_job_outputs(job_name, files)
            job_group.add_executable(job.executable)

        # Can't remove edges while looping through edge view,
        # so save to remove after loops
        edges_to_remove: list[tuple[str, str]] = []
        for job_name in job_list:
            in_edges = self.in_edges(job_name)
            for u, v in in_edges:
                if u in job_list:
                    # Internal edge: recreate it inside the group.
                    job_group.add_edge(u, v)
                else:
                    # Boundary edge: external parent now feeds the group.
                    self.add_edge(u, group_name)
                edges_to_remove.append((u, v))

            out_edges = self.out_edges(job_name)
            for u, v in out_edges:
                if v in job_list:
                    # Internal edge (also seen via the other endpoint's
                    # in_edges; re-adding is harmless).
                    job_group.add_edge(u, v)
                else:
                    # Boundary edge: the group now feeds the external child.
                    self.add_edge(group_name, v)
                edges_to_remove.append((u, v))

        # Remove edges collected earlier
        self.remove_edges_from(edges_to_remove)

        # Remove nodes from main GenericWorkflow
        self.remove_nodes_from(job_list)

1208 

1209 def add_special_job_ordering(self, ordering: dict[str, Any]) -> None: 

1210 """Add special nodes and dependencies to enforce given ordering. 

1211 

1212 Parameters 

1213 ---------- 

1214 ordering : `dict` [`str`, `~typing.Any`] 

1215 Description of the job ordering to enforce. 

1216 """ 

1217 group_to_label_subgraph = self._check_job_ordering_config(ordering) 

1218 

1219 for group, group_vals in ordering.items(): 

1220 if "findDependencyMethod" in group_vals: 

1221 job_grouping_func = self._group_jobs_by_dependencies 

1222 else: 

1223 job_grouping_func = self._group_jobs_by_values 

1224 

1225 dims = [x.strip() for x in group_vals["dimensions"].split(",")] 

1226 _LOG.debug("group %s: dims=%s", group, dims) 

1227 job_groups = job_grouping_func(group, group_vals, group_to_label_subgraph[group]) 

1228 

1229 # Update the workflow 

1230 implementation = group_vals.get("implementation", "group") 

1231 ordering_type = group_vals.get("ordering_type", "sort") 

1232 match (implementation, ordering_type): 

1233 case ("noop", "sort"): 

1234 self._update_by_noop_sort(group, job_groups) 

1235 case ("group", "sort"): 

1236 blocking = group_vals.get("blocking", False) 

1237 self._update_by_group_sort(group, job_groups, blocking) 

1238 case _: 

1239 raise RuntimeError( 

1240 f"Invalid implementation, ordering_type pair for group ({implementation}," 

1241 f" {ordering_type})" 

1242 ) 

1243 

1244 def _update_by_noop_sort(self, group: str, dims_to_jobs: dict[tuple[Any, ...], list[str]]) -> None: 

1245 """Update portion of workflow for special ordering of jobs using sort. 

1246 

1247 Parameters 

1248 ---------- 

1249 group : `str` 

1250 Ordering group label used for job name and messages. 

1251 dims_to_jobs: `dict` [`tuple`[`str`,...], `list` [`str`]] 

1252 Mapping of special dimension keys to workflow job names. 

1253 The sort for the special ordering is over the keys. 

1254 """ 

1255 noop_names: list[str] = [] 

1256 prev_name: str | None = None 

1257 for dim_key, job_list in sorted(dims_to_jobs.items()): 

1258 _LOG.debug("group %s: dim_key=%s", group, dim_key) 

1259 noop_name = f"noop_{group}_{'=='.join([str(dk) for dk in dim_key])}" 

1260 if noop_names: 

1261 prev_name = noop_names[-1] 

1262 noop_names.append(noop_name) 

1263 

1264 self._update_single_noop(job_list, noop_name, group, prev_name) 

1265 

1266 # As implemented, loop adds one last NOOP job with 

1267 # 0 children. Remove it as not useful. 

1268 assert self.out_degree(noop_names[-1]) == 0 

1269 self.remove_node(noop_names[-1]) 

1270 

1271 def _update_single_noop( 

1272 self, job_list: list[str], order_node_name: str, order_label: str, prev_order_name: str | None 

1273 ) -> None: 

1274 """Update the workflow around the single job making special NOOP 

1275 jobs when necessary. 

1276 

1277 Parameters 

1278 ---------- 

1279 job_list : `list` [`str`] 

1280 Current jobs involved in special ordering 

1281 order_node_name : `str` 

1282 Name for the order NOOP job. If it does not exist in workflow, a 

1283 new NOOP job with this name will be created and added. 

1284 order_label : `str` 

1285 Label for the order NOOP job. 

1286 prev_order_name: `str` or None 

1287 Name of the previous order NOOP job used to add edge as predecessor 

1288 of current job. 

1289 """ 

1290 if order_node_name not in self: 

1291 _LOG.debug("Adding new ordering node %s", order_node_name) 

1292 order_node = GenericWorkflowNoopJob(order_node_name, order_label) 

1293 self.add_job(order_node) 

1294 

1295 subgraph = DiGraph(self).subgraph(job_list) 

1296 sinks = [n for n in job_list if subgraph.out_degree(n) == 0] 

1297 for job_name in sinks: 

1298 self.add_edge(job_name, order_node_name) 

1299 

1300 if prev_order_name: 

1301 sources = [n for n in job_list if subgraph.in_degree(n) == 0] 

1302 for job_name in sources: 

1303 _LOG.debug("Adding edge %s to %s", prev_order_name, job_name) 

1304 self.add_edge(prev_order_name, job_name) 

1305 

1306 

@dataclasses.dataclass(slots=True)
class GenericWorkflowGroup(GenericWorkflowNode, GenericWorkflow):
    """Node representing a group of jobs. Used for special dependencies.

    Parameters
    ----------
    name : `str`
        Name of node. Must be unique within workflow.
    label : `str`
        Primary user-facing label for job. Does not need to be unique and
        may be used for summary reports or to group nodes.
    blocking : `bool`
        Whether a failure inside group prunes executions of remaining groups.
    """

    blocking: bool = False
    """Whether a failure inside group prunes executions of remaining groups."""

    @property
    def node_type(self) -> GenericWorkflowNodeType:
        """Indicate this is a group job."""
        return GenericWorkflowNodeType.GROUP

    def __init__(self, name: str, label: str, blocking: bool = False) -> None:
        """Initialize each parent class.

        Parameters
        ----------
        name : `str`
            Name of node; also passed to the workflow base initializer.
        label : `str`
            Label for the node.
        blocking : `bool`, optional
            Whether a failure inside group prunes executions of remaining
            groups.  Defaults to False.
        """
        # Explicit __init__ replaces the dataclass-generated one because the
        # two base classes take different initializer arguments and each
        # must be initialized explicitly.
        _LOG.debug("%s %s %s", name, label, blocking)
        GenericWorkflowNode.__init__(self, name, label)
        GenericWorkflow.__init__(self, name)
        self.blocking = blocking

1336 

1337 

class GenericWorkflowLabels:
    """Label-oriented representation of the GenericWorkflowJobs.

    Maintains a dependency graph of job labels alongside a mapping from
    each label to the jobs carrying that label.
    """

    def __init__(self) -> None:
        self._label_graph = DiGraph()  # Dependency graph of job labels
        self._label_to_jobs: defaultdict[str, list[GenericWorkflowJob]] = defaultdict(
            list
        )  # mapping job label to list of GenericWorkflowJob

    @property
    def labels(self) -> list[str]:
        """List of job labels (`list` [`str`], read-only)."""
        # Topological order, so parent labels appear before their children.
        return list(topological_sort(self._label_graph))

    @property
    def job_counts(self) -> Counter[str]:
        """Count of jobs per job label (`collections.Counter`)."""
        return Counter({label: len(self._label_to_jobs[label]) for label in self.labels})

    def get_jobs_by_label(self, label: str) -> list[GenericWorkflowJob]:
        """Retrieve jobs by label from workflow.

        Parameters
        ----------
        label : `str`
            Label of jobs to retrieve.

        Returns
        -------
        jobs : list[`lsst.ctrl.bps.GenericWorkflowJob`]
            Jobs having given label.
        """
        # NOTE(review): defaultdict lookup inserts an empty list for an
        # unknown label as a side effect — confirm callers rely on that.
        return self._label_to_jobs[label]

    def add_job(self, job: GenericWorkflowJob, parent_labels: list[str], child_labels: list[str]) -> None:
        """Add job's label to labels.

        Parameters
        ----------
        job : `lsst.ctrl.bps.GenericWorkflowJob`
            The job to add to the job labels.
        parent_labels : `list` [`str`]
            Parent job labels.
        child_labels : `list` [`str`]
            Children job labels.
        """
        _LOG.debug("job: %s (%s)", job.name, job.label)
        _LOG.debug("parent_labels: %s", parent_labels)
        _LOG.debug("child_labels: %s", child_labels)
        self._label_to_jobs[job.label].append(job)
        # add_node/add_edge are no-ops for labels already present, so
        # repeated calls for jobs sharing a label are safe.
        self._label_graph.add_node(job.label)
        for parent in parent_labels:
            self._label_graph.add_edge(parent, job.label)
        for child in child_labels:
            self._label_graph.add_edge(job.label, child)

    def add_job_relationships(self, parent_labels: list[str], children_labels: list[str]) -> None:
        """Add dependencies between parent and child job labels.
        All parents will be connected to all children.

        Parameters
        ----------
        parent_labels : `list` [`str`]
            Parent job labels.
        children_labels : `list` [`str`]
            Children job labels.
        """
        # Since labels, must ensure not adding edge from label to itself.
        edges = [
            e
            for e in itertools.product(ensure_iterable(parent_labels), ensure_iterable(children_labels))
            if e[0] != e[1]
        ]

        self._label_graph.add_edges_from(edges)

    def del_job(self, job: GenericWorkflowJob) -> None:
        """Delete job and its label from job labels.

        Parameters
        ----------
        job : `lsst.ctrl.bps.GenericWorkflowJob`
            The job to delete from the job labels.
        """
        self._label_to_jobs[job.label].remove(job)
        # Don't leave keys around if removed last job
        if not self._label_to_jobs[job.label]:
            del self._label_to_jobs[job.label]

        # Splice the label out of the graph, reconnecting its parents
        # directly to its children so label ordering is preserved.
        # NOTE(review): the label node is removed even when other jobs
        # still carry this label — confirm that is intended.
        # NOTE(review): predecessors/successors return lazy iterators that
        # are consumed only after remove_node; this works with current
        # networkx internals, but materializing with list() first would be
        # safer.
        parents = self._label_graph.predecessors(job.label)
        children = self._label_graph.successors(job.label)
        self._label_graph.remove_node(job.label)
        self._label_graph.add_edges_from(
            itertools.product(ensure_iterable(parents), ensure_iterable(children))
        )

    def subgraph(self, labels: Iterable[str]) -> DiGraph:
        """Create subgraph of workflow label graph with given labels.

        Parameters
        ----------
        labels : Iterable [`str`]
            Labels to appear in subgraph.

        Returns
        -------
        subgraph : `networkx.DiGraph`
            Subgraph of workflow label graph with given labels.
        """
        return self._label_graph.subgraph(labels)