Coverage for python / lsst / ctrl / bps / generic_workflow.py: 28%

555 statements  

« prev     ^ index     » next       coverage.py v7.13.5, created at 2026-04-28 08:49 +0000

1# This file is part of ctrl_bps. 

2# 

3# Developed for the LSST Data Management System. 

4# This product includes software developed by the LSST Project 

5# (https://www.lsst.org). 

6# See the COPYRIGHT file at the top-level directory of this distribution 

7# for details of code ownership. 

8# 

9# This software is dual licensed under the GNU General Public License and also 

10# under a 3-clause BSD license. Recipients may choose which of these licenses 

11# to use; please see the files gpl-3.0.txt and/or bsd_license.txt, 

12# respectively. If you choose the GPL option then the following text applies 

13# (but note that there is still no warranty even if you opt for BSD instead): 

14# 

15# This program is free software: you can redistribute it and/or modify 

16# it under the terms of the GNU General Public License as published by 

17# the Free Software Foundation, either version 3 of the License, or 

18# (at your option) any later version. 

19# 

20# This program is distributed in the hope that it will be useful, 

21# but WITHOUT ANY WARRANTY; without even the implied warranty of 

22# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the 

23# GNU General Public License for more details. 

24# 

25# You should have received a copy of the GNU General Public License 

26# along with this program. If not, see <https://www.gnu.org/licenses/>. 

27 

28"""Class definitions for a Generic Workflow Graph.""" 

29 

30__all__ = [ 

31 "GenericWorkflow", 

32 "GenericWorkflowExec", 

33 "GenericWorkflowFile", 

34 "GenericWorkflowGroup", 

35 "GenericWorkflowJob", 

36 "GenericWorkflowNode", 

37 "GenericWorkflowNodeType", 

38 "GenericWorkflowNoopJob", 

39] 

40 

41 

42import dataclasses 

43import itertools 

44import logging 

45import pickle 

46from collections import Counter, defaultdict 

47from collections.abc import Iterable, Iterator 

48from enum import IntEnum, auto 

49from typing import IO, Any, BinaryIO, Literal, cast, overload 

50 

51from networkx import DiGraph, topological_sort 

52from networkx.algorithms.dag import is_directed_acyclic_graph 

53 

54from lsst.utils.iteration import ensure_iterable 

55 

56from .bps_draw import draw_networkx_dot 

57from .bps_utils import subset_dimension_values 

58 

59_LOG = logging.getLogger(__name__) 

60 

61 

62@dataclasses.dataclass(slots=True) 

63class GenericWorkflowFile: 

64 """Information about a file that may be needed by various workflow 

65 management services. 

66 """ 

67 

68 name: str 

69 """Lookup key (logical file name) of file/directory. Must be unique 

70 within run. 

71 """ 

72 

73 src_uri: str | None = None # don't know that need ResourcePath 

74 """Original location of file/directory. 

75 """ 

76 

77 wms_transfer: bool = False 

78 """Whether the WMS should ignore file or not. Default is False. 

79 """ 

80 

81 job_access_remote: bool = False 

82 """Whether the job can remotely access file (using separately specified 

83 file access protocols). Default is False. 

84 """ 

85 

86 job_shared: bool = False 

87 """Whether job requires its own copy of this file. Default is False. 

88 """ 

89 

90 def __hash__(self) -> int: 

91 return hash(self.name) 

92 

93 

94@dataclasses.dataclass(slots=True) 

95class GenericWorkflowExec: 

96 """Information about an executable that may be needed by various workflow 

97 management services. 

98 """ 

99 

100 name: str 

101 """Lookup key (logical file name) of executable. Must be unique 

102 within run. 

103 """ 

104 

105 src_uri: str | None = None # don't know that need ResourcePath 

106 """Original location of executable. 

107 """ 

108 

109 transfer_executable: bool = False 

110 """Whether the WMS/plugin is responsible for staging executable to 

111 location usable by job. 

112 """ 

113 

114 def __hash__(self) -> int: 

115 return hash(self.name) 

116 

117 

class GenericWorkflowNodeType(IntEnum):
    """Valid types for nodes in the GenericWorkflow."""

    NOOP = auto()
    """Does nothing, but enforces special dependencies."""

    PAYLOAD = auto()
    """Typical workflow job."""

    GROUP = auto()
    """A special group (subdag) of jobs."""

129 

130 

@dataclasses.dataclass(slots=True)
class GenericWorkflowNode:
    """Base class for nodes in the GenericWorkflow."""

    name: str
    """Name of node. Must be unique within workflow."""

    label: str
    """Primary user-facing label for job. Does not need to be unique and
    may be used for summary reports or to group nodes."""
    # Bug fix: the original attribute docstring opened with four quotes
    # (""""Primary...), leaving a stray leading quote character in the text.

    def __hash__(self) -> int:
        # Identity is determined solely by the unique node name.
        return hash(self.name)

    @property
    def node_type(self) -> GenericWorkflowNodeType:
        """Type of node.

        Subclasses must override this to report their concrete type.
        """
        raise NotImplementedError(f"{type(self).__name__} needs to override node_type.")

149 

150 

@dataclasses.dataclass(slots=True)
class GenericWorkflowNoopJob(GenericWorkflowNode):
    """Placeholder job that performs no work.

    Used solely to express special dependency patterns in the graph.
    """

    @property
    def node_type(self) -> GenericWorkflowNodeType:
        """Indicate this is a noop job."""
        return GenericWorkflowNodeType.NOOP

159 

160 

@dataclasses.dataclass(slots=True)
class GenericWorkflowJob(GenericWorkflowNode):
    """Information about a job that may be needed by various workflow
    management services.
    """

    quanta_counts: Counter[str] = dataclasses.field(default_factory=Counter)
    """Counts of quanta per task label in job.
    """

    tags: dict[str, Any] = dataclasses.field(default_factory=dict)
    """Other key/value pairs for job that user may want to use as a filter.
    """

    executable: GenericWorkflowExec | None = None
    """Executable for job.
    """

    arguments: str | None = None
    """Command line arguments for job.
    """

    cmdvals: dict[str, Any] = dataclasses.field(default_factory=dict)
    """Values for variables in cmdline when using lazy command line creation.
    """

    memory_multiplier: float | None = None
    """Memory growth rate between retries.
    """

    request_memory: int | None = None  # MB
    """Max memory (in MB) that the job is expected to need.
    """

    request_memory_max: int | None = None  # MB
    """Max memory (in MB) that the job should ever use.
    """

    request_cpus: int | None = None  # cores
    """Max number of cpus that the job is expected to need.
    """

    request_disk: int | None = None  # MB
    """Max amount of job scratch disk (in MB) that the job is expected to need.
    """

    # NOTE(review): the inline comment says minutes but the docstring says
    # seconds — confirm the actual unit expected by the WMS plugins.
    request_walltime: str | None = None  # minutes
    """Max amount of time (in seconds) that the job is expected to need.
    """

    compute_site: str | None = None
    """Key to look up site-specific information for running the job.
    """

    accounting_group: str | None = None
    """Name of the accounting group to use.
    """

    accounting_user: str | None = None
    """Name of the user to use for accounting purposes.
    """

    mail_to: str | None = None
    """Comma separated list of email addresses for emailing job status.
    """

    when_to_mail: str | None = None
    """WMS-specific terminology for when to email job status.
    """

    number_of_retries: int | None = None
    """Number of times to automatically retry a failed job.
    """

    retry_unless_exit: int | list[int] | None = None
    """Exit code(s) for job that means to not automatically retry.
    """

    abort_on_value: int | None = None
    """Job exit value for signals to abort the entire workflow.
    """

    abort_return_value: int | None = None
    """Exit value to use when aborting the entire workflow.
    """

    priority: str | None = None
    """Initial priority of job in WMS-format.
    """

    category: str | None = None
    """WMS-facing label of job within single workflow (e.g., can be used for
    throttling jobs within a single workflow).
    """

    concurrency_limit: str | None = None
    """Names of concurrency limits that the WMS plugin can appropriately
    translate to limit the number of this job across all running workflows.
    """

    queue: str | None = None
    """Name of queue to use. Different WMS can translate this concept
    differently.
    """

    pre_cmdline: str | None = None
    """Command line to be executed prior to executing job.
    """

    post_cmdline: str | None = None
    """Command line to be executed after job executes.

    Should be executed regardless of exit status.
    """

    preemptible: bool | None = None
    """The flag indicating whether the job can be preempted.
    """

    profile: dict[str, Any] = dataclasses.field(default_factory=dict)
    """Nested dictionary of WMS-specific key/value pairs with primary key being
    WMS key (e.g., pegasus, condor, panda).
    """

    attrs: dict[str, Any] = dataclasses.field(default_factory=dict)
    """Key/value pairs of job attributes (for WMS that have attributes in
    addition to commands).
    """

    environment: dict[str, Any] = dataclasses.field(default_factory=dict)
    """Environment variable names and values to be explicitly set inside job.
    """

    compute_cloud: str | None = None
    """Key to look up cloud-specific information for running the job.
    """

    @property
    def node_type(self) -> GenericWorkflowNodeType:
        """Indicate this is a payload job."""
        return GenericWorkflowNodeType.PAYLOAD

302 

303 

304class GenericWorkflow(DiGraph): 

305 """A generic representation of a workflow used to submit to specific 

306 workflow management systems. 

307 

308 Parameters 

309 ---------- 

310 name : `str` 

311 Name of generic workflow. 

312 incoming_graph_data : `~typing.Any`, optional 

313 Data used to initialized graph that is passed through to DiGraph 

314 constructor. Can be any type supported by networkx.DiGraph. 

315 **attr : `dict` 

316 Keyword arguments passed through to DiGraph constructor. 

317 """ 

318 

319 def __init__(self, name: str, incoming_graph_data: Any | None = None, **attr: Any) -> None: 

320 super().__init__(incoming_graph_data, **attr) 

321 self._name = name 

322 self.run_attrs: dict[str, str] = {} 

323 self._job_labels = GenericWorkflowLabels() 

324 self._files: dict[str, GenericWorkflowFile] = {} 

325 self._executables: dict[str, GenericWorkflowExec] = {} 

326 self._inputs: dict[ 

327 str, list[GenericWorkflowFile] 

328 ] = {} # mapping job.names to list of GenericWorkflowFile 

329 self._outputs: dict[ 

330 str, list[GenericWorkflowFile] 

331 ] = {} # mapping job.names to list of GenericWorkflowFile 

332 self.run_id = None 

333 self._final: GenericWorkflowJob | GenericWorkflow | None = None 

334 

    # Starting from ver. 3.6 of NetworkX, the DiGraph class defines its custom
    # __new__ method that explicitly defines arguments it accepts. As a result,
    # we need to override it to let our subclass use different ones.
    #
    # Notes
    # -----
    # Most likely overriding __new__ in this manner will prevent us from using
    # different graph backends with our subclass. However, since we are not
    # using any backends, this should not be a problem at the moment.
    def __new__(cls, *args: Any, **kwargs: Any) -> "GenericWorkflow":
        # Bypass DiGraph.__new__ entirely; positional/keyword arguments are
        # consumed by __init__ instead.
        return object.__new__(cls)

346 

347 @property 

348 def name(self) -> str: 

349 """Retrieve name of generic workflow. 

350 

351 Returns 

352 ------- 

353 name : `str` 

354 Name of generic workflow. 

355 """ 

356 return self._name 

357 

358 @property 

359 def quanta_counts(self) -> Counter[str]: 

360 """Count of quanta per task label (`collections.Counter`).""" 

361 qcounts: Counter[str] = Counter() 

362 for job_name in self: 

363 gwjob = self.get_job(job_name) 

364 if hasattr(gwjob, "quanta_counts"): 

365 qcounts += gwjob.quanta_counts 

366 return qcounts 

367 

368 @property 

369 def labels(self) -> list[str]: 

370 """Job labels (`list` [`str`], read-only).""" 

371 return self._job_labels.labels 

372 

373 def regenerate_labels(self) -> None: 

374 """Regenerate the list of job labels.""" 

375 self._job_labels = GenericWorkflowLabels() 

376 for job_name in self: 

377 job = self.get_job(job_name) 

378 if job.node_type == GenericWorkflowNodeType.PAYLOAD: 

379 job = cast(GenericWorkflowJob, job) 

380 parents_labels: list[str] = [] 

381 for parent_name in self.predecessors(job.name): 

382 parent_job = self.get_job(parent_name) 

383 if parent_job.node_type == GenericWorkflowNodeType.PAYLOAD: 

384 # parent_job = cast(GenericWorkflowJob, parent_job) 

385 parents_labels.append(parent_job.label) 

386 children_labels: list[str] = [] 

387 for child_name in self.successors(job.name): 

388 child_job = self.get_job(child_name) 

389 if child_job.node_type == GenericWorkflowNodeType.PAYLOAD: 

390 # child_job = cast(GenericWorkflowJob, child_job) 

391 children_labels.append(child_job.label) 

392 self._job_labels.add_job(job, parents_labels, children_labels) 

393 

394 @property 

395 def job_counts(self) -> Counter[str]: 

396 """Count of jobs per job label (`collections.Counter`).""" 

397 jcounts = self._job_labels.job_counts 

398 

399 # Final is separate 

400 final = self.get_final() 

401 if final: 

402 if isinstance(final, GenericWorkflow): 

403 jcounts.update(final.job_counts) 

404 else: 

405 jcounts[final.label] += 1 

406 

407 return jcounts 

408 

    def __iter__(self) -> Iterator[str]:
        """Return iterator of job names in topologically sorted order.

        Overrides the base ``DiGraph`` iteration so that parents are
        always yielded before their children.
        """
        return topological_sort(self)

412 

413 @overload 

414 def get_files(self, data: Literal[False], transfer_only: bool = True) -> list[str]: ... 414 ↛ exitline 414 didn't return from function 'get_files' because

415 

416 @overload 

417 def get_files(self, data: Literal[True], transfer_only: bool = True) -> list[GenericWorkflowFile]: ... 417 ↛ exitline 417 didn't return from function 'get_files' because

418 

419 def get_files( 

420 self, data: bool = False, transfer_only: bool = True 

421 ) -> list[GenericWorkflowFile] | list[str]: 

422 """Retrieve files from generic workflow. 

423 

424 Need API in case change way files are stored (e.g., make 

425 workflow a bipartite graph with jobs and files nodes). 

426 

427 Parameters 

428 ---------- 

429 data : `bool`, optional 

430 Whether to return the file data as well as the file object name 

431 (The default is `False`). 

432 transfer_only : `bool`, optional 

433 Whether to only return files for which a workflow management system 

434 would be responsible for transferring. 

435 

436 Returns 

437 ------- 

438 files : `list` [`lsst.ctrl.bps.GenericWorkflowFile`] or `list` [`str`] 

439 File names or objects from generic workflow meeting specifications. 

440 """ 

441 files: list[Any] = [] # Any for mypy to allow the different append lines. 

442 for filename, file in self._files.items(): 

443 if not transfer_only or file.wms_transfer: 

444 if not data: 

445 files.append(filename) 

446 else: 

447 files.append(file) 

448 return files 

449 

    def add_job(
        self,
        job: GenericWorkflowNode,
        parent_names: str | list[str] | None = None,
        child_names: str | list[str] | None = None,
    ) -> None:
        """Add job to generic workflow.

        Parameters
        ----------
        job : `lsst.ctrl.bps.GenericWorkflowNode`
            Job to add to the generic workflow.
        parent_names : `str` | `list` [`str`], optional
            Names of jobs that are parents of given job.
        child_names : `str` | `list` [`str`], optional
            Names of jobs that are children of given job.

        Raises
        ------
        RuntimeError
            Raised if ``job`` is not a `GenericWorkflowNode` or if a job
            with the same name already exists in the workflow.
        """
        _LOG.debug("job: %s (%s)", job.name, job.label)
        _LOG.debug("parent_names: %s", parent_names)
        _LOG.debug("child_names: %s", child_names)
        if not isinstance(job, GenericWorkflowNode):
            raise RuntimeError(f"Invalid type for job to be added to GenericWorkflowGraph ({type(job)}).")
        if self.has_node(job.name):
            raise RuntimeError(f"Job {job.name} already exists in GenericWorkflowGraph.")
        # The node must be in the graph before edges to parents/children
        # can be added.
        super().add_node(job.name, job=job)
        self.add_job_relationships(parent_names, job.name)
        self.add_job_relationships(job.name, child_names)
        if job.node_type == GenericWorkflowNodeType.PAYLOAD:
            job = cast(GenericWorkflowJob, job)
            # Executables are stored separately from graph nodes.
            self.add_executable(job.executable)
            # Register with label bookkeeping using the edges created just
            # above to determine neighboring labels.
            self._job_labels.add_job(
                job,
                [self.get_job(p).label for p in self.predecessors(job.name)],
                [self.get_job(p).label for p in self.successors(job.name)],
            )

485 

486 def add_node(self, node_for_adding: GenericWorkflowNode, **attr: Any) -> None: 

487 """Override networkx function to call more specific add_job function. 

488 

489 Parameters 

490 ---------- 

491 node_for_adding : `lsst.ctrl.bps.GenericWorkflowJob` 

492 Job to be added to generic workflow. 

493 **attr : `~typing.Any` 

494 Needed to match original networkx function, but not used. 

495 """ 

496 self.add_job(node_for_adding) 

497 

498 def add_job_relationships( 

499 self, parents: str | list[str] | None, children: str | list[str] | None 

500 ) -> None: 

501 """Add dependencies between parent and child jobs. All parents will 

502 be connected to all children. 

503 

504 Parameters 

505 ---------- 

506 parents : `str` or `list` [`str`], optional 

507 Parent job names. 

508 children : `str` or `list` [`str`], optional 

509 Children job names. 

510 """ 

511 # Allow this to be a noop if no parents or no children 

512 if parents is not None and children is not None: 

513 self.add_edges_from(itertools.product(ensure_iterable(parents), ensure_iterable(children))) 

514 self._job_labels.add_job_relationships( 

515 [self.get_job(n).label for n in ensure_iterable(parents)], 

516 [self.get_job(n).label for n in ensure_iterable(children)], 

517 ) 

518 

519 def add_edges_from(self, ebunch_to_add: Iterable[tuple[str, str]], **attr: Any) -> None: 

520 """Add several edges between jobs in the generic workflow. 

521 

522 Parameters 

523 ---------- 

524 ebunch_to_add : Iterable [`tuple` [`str`, `str`]] 

525 Iterable of job name pairs between which a dependency should be 

526 saved. 

527 **attr : `~typing.Any` 

528 Data can be assigned using keyword arguments (not currently used). 

529 """ 

530 for edge_to_add in ebunch_to_add: 

531 self.add_edge(edge_to_add[0], edge_to_add[1], **attr) 

532 

533 def add_edge(self, u_of_edge: str, v_of_edge: str, **attr: Any) -> None: 

534 """Add edge connecting jobs in workflow. 

535 

536 Parameters 

537 ---------- 

538 u_of_edge : `str` 

539 Name of parent job. 

540 v_of_edge : `str` 

541 Name of child job. 

542 **attr 

543 Attributes to save with edge. 

544 """ 

545 if u_of_edge not in self: 

546 raise RuntimeError(f"{u_of_edge} not in GenericWorkflow") 

547 if v_of_edge not in self: 

548 raise RuntimeError(f"{v_of_edge} not in GenericWorkflow") 

549 super().add_edge(u_of_edge, v_of_edge, **attr) 

550 

551 def get_job(self, job_name: str) -> GenericWorkflowNode: 

552 """Retrieve job by name from workflow. 

553 

554 Parameters 

555 ---------- 

556 job_name : `str` 

557 Name of job to retrieve. 

558 

559 Returns 

560 ------- 

561 job : `lsst.ctrl.bps.GenericWorkflowNode` 

562 Job matching given job_name. 

563 """ 

564 return self.nodes[job_name]["job"] 

565 

566 def del_job(self, job_name: str) -> None: 

567 """Delete job from generic workflow leaving connected graph. 

568 

569 Parameters 

570 ---------- 

571 job_name : `str` 

572 Name of job to delete from workflow. 

573 """ 

574 job = self.get_job(job_name) 

575 

576 # Remove from job labels 

577 if isinstance(job, GenericWorkflowJob): 

578 self._job_labels.del_job(job) 

579 

580 # Connect all parent jobs to all children jobs. 

581 parents = list(self.predecessors(job_name)) 

582 children = list(self.successors(job_name)) 

583 self.add_job_relationships(parents, children) 

584 

585 # Delete job node (which deletes edges). 

586 self.remove_node(job_name) 

587 

588 def add_job_inputs(self, job_name: str, files: GenericWorkflowFile | list[GenericWorkflowFile]) -> None: 

589 """Add files as inputs to specified job. 

590 

591 Parameters 

592 ---------- 

593 job_name : `str` 

594 Name of job to which inputs should be added. 

595 files : `lsst.ctrl.bps.GenericWorkflowFile` or \ 

596 `list` [`lsst.ctrl.bps.GenericWorkflowFile`] 

597 File object(s) to be added as inputs to the specified job. 

598 """ 

599 self._inputs.setdefault(job_name, []) 

600 for file in ensure_iterable(files): 

601 # Save the central copy 

602 if file.name not in self._files: 

603 self._files[file.name] = file 

604 

605 # Save the job reference to the file 

606 self._inputs[job_name].append(file) 

607 

608 def get_file(self, name: str) -> GenericWorkflowFile: 

609 """Retrieve a file object by name. 

610 

611 Parameters 

612 ---------- 

613 name : `str` 

614 Name of file object. 

615 

616 Returns 

617 ------- 

618 gwfile : `lsst.ctrl.bps.GenericWorkflowFile` 

619 File matching given name. 

620 """ 

621 return self._files[name] 

622 

623 def add_file(self, gwfile: GenericWorkflowFile) -> None: 

624 """Add file object. 

625 

626 Parameters 

627 ---------- 

628 gwfile : `lsst.ctrl.bps.GenericWorkflowFile` 

629 File object to add to workflow. 

630 """ 

631 if gwfile.name not in self._files: 

632 self._files[gwfile.name] = gwfile 

633 else: 

634 _LOG.debug("Skipped add_file for existing file %s", gwfile.name) 

635 

636 @overload 

637 def get_job_inputs( 637 ↛ exitline 637 didn't return from function 'get_job_inputs' because

638 self, job_name: str, data: Literal[False], transfer_only: bool = False 

639 ) -> list[str]: ... 

640 

641 @overload 

642 def get_job_inputs( 642 ↛ exitline 642 didn't return from function 'get_job_inputs' because

643 self, job_name: str, data: Literal[True], transfer_only: bool = False 

644 ) -> list[GenericWorkflowFile]: ... 

645 

646 def get_job_inputs( 

647 self, job_name: str, data: bool = True, transfer_only: bool = False 

648 ) -> list[GenericWorkflowFile] | list[str]: 

649 """Return the input files for the given job. 

650 

651 Parameters 

652 ---------- 

653 job_name : `str` 

654 Name of the job. 

655 data : `bool`, optional 

656 Whether to return the file data as well as the file object name. 

657 transfer_only : `bool`, optional 

658 Whether to only return files for which a workflow management system 

659 would be responsible for transferring. 

660 

661 Returns 

662 ------- 

663 inputs : `list` [`lsst.ctrl.bps.GenericWorkflowFile`] or `list` [`str`] 

664 Input files for the given job. If no input files for the job, 

665 returns an empty list. 

666 """ 

667 inputs: list[Any] = [] # Any for mypy to allow the different append lines. 

668 if job_name in self._inputs: 

669 for gwfile in self._inputs[job_name]: 

670 if not transfer_only or gwfile.wms_transfer: 

671 if not data: 

672 inputs.append(gwfile.name) 

673 else: 

674 inputs.append(gwfile) 

675 return inputs 

676 

677 def add_job_outputs(self, job_name: str, files: list[GenericWorkflowFile]) -> None: 

678 """Add output files to a job. 

679 

680 Parameters 

681 ---------- 

682 job_name : `str` 

683 Name of job to which the files should be added as outputs. 

684 files : `list` [`lsst.ctrl.bps.GenericWorkflowFile`] 

685 File objects to be added as outputs for specified job. 

686 """ 

687 self._outputs.setdefault(job_name, []) 

688 

689 for file_ in ensure_iterable(files): 

690 # Save the central copy 

691 if file_.name not in self._files: 

692 self._files[file_.name] = file_ 

693 

694 # Save the job reference to the file 

695 self._outputs[job_name].append(file_) 

696 

697 @overload 

698 def get_job_outputs( 698 ↛ exitline 698 didn't return from function 'get_job_outputs' because

699 self, job_name: str, data: Literal[False], transfer_only: bool = False 

700 ) -> list[str]: ... 

701 

702 @overload 

703 def get_job_outputs( 703 ↛ exitline 703 didn't return from function 'get_job_outputs' because

704 self, job_name: str, data: Literal[True], transfer_only: bool = False 

705 ) -> list[GenericWorkflowFile]: ... 

706 

707 def get_job_outputs( 

708 self, job_name: str, data: bool = True, transfer_only: bool = False 

709 ) -> list[GenericWorkflowFile] | list[str]: 

710 """Return the output files for the given job. 

711 

712 Parameters 

713 ---------- 

714 job_name : `str` 

715 Name of the job. 

716 data : `bool` 

717 Whether to return the file data as well as the file object name. 

718 It defaults to `True` thus returning file data as well. 

719 transfer_only : `bool` 

720 Whether to only return files for which a workflow management system 

721 would be responsible for transferring. It defaults to `False` thus 

722 returning all output files. 

723 

724 Returns 

725 ------- 

726 outputs : `list` [`lsst.ctrl.bps.GenericWorkflowFile`] or \ 

727 `list` [`str`] 

728 Output files for the given job. If no output files for the job, 

729 returns an empty list. 

730 """ 

731 outputs: list[Any] = [] # Any for mypy to allow the different append lines. 

732 if not data: 

733 outputs = cast(list[str], outputs) 

734 else: 

735 outputs = cast(list[GenericWorkflowFile], outputs) 

736 

737 if job_name in self._outputs: 

738 for gwfile in self._outputs[job_name]: 

739 if not transfer_only or gwfile.wms_transfer: 

740 if not data: 

741 outputs.append(gwfile.name) 

742 else: 

743 outputs.append(gwfile) 

744 return outputs 

745 

746 def draw(self, stream: str | IO[str], format_: str = "dot") -> None: 

747 """Output generic workflow in a visualization format. 

748 

749 Parameters 

750 ---------- 

751 stream : `str` or `io.BufferedIOBase` 

752 Stream to which the visualization should be written. 

753 format_ : `str`, optional 

754 Which visualization format to use. It defaults to the format for 

755 the dot program. 

756 """ 

757 draw_funcs = {"dot": draw_networkx_dot} 

758 if format_ in draw_funcs: 

759 draw_funcs[format_](self, stream) 

760 else: 

761 raise RuntimeError(f"Unknown draw format ({format_})") 

762 

763 def save(self, stream: str | IO[bytes], format_: str = "pickle") -> None: 

764 """Save the generic workflow in a format that is loadable. 

765 

766 Parameters 

767 ---------- 

768 stream : `str` or `io.BufferedIOBase` 

769 Stream to pass to the format-specific writer. Accepts anything 

770 that the writer accepts. 

771 format_ : `str`, optional 

772 Format in which to write the data. It defaults to pickle format. 

773 """ 

774 if format_ == "pickle": 

775 stream = cast(BinaryIO, stream) 

776 pickle.dump(self, stream) 

777 else: 

778 raise RuntimeError(f"Unknown format ({format_})") 

779 

780 @classmethod 

781 def load(cls, stream: str | IO[bytes], format_: str = "pickle") -> "GenericWorkflow": 

782 """Load a GenericWorkflow from the given stream. 

783 

784 Parameters 

785 ---------- 

786 stream : `str` or `io.BufferedIOBase` 

787 Stream to pass to the format-specific loader. Accepts anything that 

788 the loader accepts. 

789 format_ : `str`, optional 

790 Format of data to expect when loading from stream. It defaults 

791 to pickle format. 

792 

793 Returns 

794 ------- 

795 generic_workflow : `lsst.ctrl.bps.GenericWorkflow` 

796 Generic workflow loaded from the given stream. 

797 """ 

798 if format_ == "pickle": 

799 stream = cast(BinaryIO, stream) 

800 object_ = pickle.load(stream) 

801 assert isinstance(object_, GenericWorkflow) # for mypy 

802 return object_ 

803 

804 raise RuntimeError(f"Unknown format ({format_})") 

805 

806 def validate(self) -> None: 

807 """Run checks to ensure that the generic workflow graph is valid.""" 

808 # Make sure a directed acyclic graph 

809 assert is_directed_acyclic_graph(self) 

810 

    def add_workflow_source(self, workflow: "GenericWorkflow") -> None:
        """Add given workflow as new source to this workflow.

        The sink jobs of ``workflow`` become parents of the current
        source jobs of this workflow, so the given workflow's jobs run
        before the existing ones.

        Parameters
        ----------
        workflow : `lsst.ctrl.bps.GenericWorkflow`
            The given workflow.
        """
        # Find source nodes in self.
        self_sources = [n for n in self if self.in_degree(n) == 0]
        _LOG.debug("self_sources = %s", self_sources)

        # Find sink nodes of workflow.
        new_sinks = [n for n in workflow if workflow.out_degree(n) == 0]
        _LOG.debug("new sinks = %s", new_sinks)

        # Add new workflow nodes to self graph and make new edges
        # (edge direction sink -> source makes the new jobs parents).
        self.add_nodes_from(workflow.nodes(data=True))
        self.add_edges_from(workflow.edges())
        for source in self_sources:
            for sink in new_sinks:
                self.add_edge(sink, source)

        # Add separately stored info
        for job_name in workflow:
            job = self.get_job(job_name)
            # Add job labels
            if isinstance(job, GenericWorkflowJob):
                self._job_labels.add_job(
                    job,
                    [self.get_job(p).label for p in self.predecessors(job.name)],
                    [self.get_job(p).label for p in self.successors(job.name)],
                )
                # Executables are stored separately so copy them.
                self.add_executable(job.executable)

            # Files are stored separately so copy them.
            self.add_job_inputs(job_name, workflow.get_job_inputs(job_name, data=True))
            self.add_job_outputs(job_name, workflow.get_job_outputs(job_name, data=True))

850 

851 def add_final(self, final: "GenericWorkflowJob | GenericWorkflow") -> None: 

852 """Add special final job/workflow to the generic workflow. 

853 

854 Parameters 

855 ---------- 

856 final : `lsst.ctrl.bps.GenericWorkflowJob` or \ 

857 `lsst.ctrl.bps.GenericWorkflow` 

858 Information needed to execute the special final job(s), the 

859 job(s) to be executed after all jobs that can be executed 

860 have been executed regardless of exit status of any of the 

861 jobs. 

862 """ 

863 if not isinstance(final, GenericWorkflowJob) and not isinstance(final, GenericWorkflow): 

864 raise TypeError("Invalid type for GenericWorkflow final ({type(final)})") 

865 

866 self._final = final 

867 if isinstance(final, GenericWorkflowJob): 

868 self.add_executable(final.executable) 

869 

870 def get_final(self) -> "GenericWorkflowJob | GenericWorkflow | None": 

871 """Return job/workflow to be executed after all jobs that can be 

872 executed have been executed regardless of exit status of any of 

873 the jobs. 

874 

875 Returns 

876 ------- 

877 final : `lsst.ctrl.bps.GenericWorkflowJob` or \ 

878 `lsst.ctrl.bps.GenericWorkflow` 

879 Information needed to execute final job(s). 

880 """ 

881 return self._final 

882 

883 def add_executable(self, executable: GenericWorkflowExec | None) -> None: 

884 """Add executable to workflow's list of executables. 

885 

886 Parameters 

887 ---------- 

888 executable : `lsst.ctrl.bps.GenericWorkflowExec` 

889 Executable object to be added to workflow. 

890 """ 

891 if executable is not None: 

892 self._executables[executable.name] = executable 

893 else: 

894 _LOG.warning("executable not specified (None); cannot add to the workflow's list of executables") 

895 

896 @overload 

897 def get_executables(self, data: Literal[False], transfer_only: bool = False) -> list[str]: ... 897 ↛ exitline 897 didn't return from function 'get_executables' because

898 

899 @overload 

900 def get_executables( 900 ↛ exitline 900 didn't return from function 'get_executables' because

901 self, data: Literal[True], transfer_only: bool = False 

902 ) -> list[GenericWorkflowExec]: ... 

903 

904 def get_executables( 

905 self, data: bool = False, transfer_only: bool = True 

906 ) -> list[GenericWorkflowExec] | list[str]: 

907 """Retrieve executables from generic workflow. 

908 

909 Parameters 

910 ---------- 

911 data : `bool`, optional 

912 Whether to return the executable data as well as the exec object 

913 name (The defaults is False). 

914 transfer_only : `bool`, optional 

915 Whether to only return executables for which transfer_executable 

916 is True. 

917 

918 Returns 

919 ------- 

920 execs : `list` [`lsst.ctrl.bps.GenericWorkflowExec`] or `list` [`str`] 

921 Filtered executable names or objects from generic workflow. 

922 """ 

923 execs: list[Any] = [] # This and cast lines for mypy 

924 if not data: 

925 execs = cast(list[str], execs) 

926 else: 

927 execs = cast(list[GenericWorkflowExec], execs) 

928 

929 for name, executable in self._executables.items(): 

930 if not transfer_only or executable.transfer_executable: 

931 if not data: 

932 execs.append(name) 

933 else: 

934 execs.append(executable) 

935 return execs 

936 

    def get_jobs_by_label(self, label: str) -> list[GenericWorkflowJob]:
        """Retrieve jobs by label from workflow.

        Parameters
        ----------
        label : `str`
            Label of jobs to retrieve.

        Returns
        -------
        jobs : list[`lsst.ctrl.bps.GenericWorkflowJob`]
            Jobs having given label.
        """
        # Delegate to the label-oriented index maintained alongside the
        # workflow graph.
        return self._job_labels.get_jobs_by_label(label)

951 

952 def _check_job_ordering_config(self, ordering_config: dict[str, Any]) -> dict[str, DiGraph]: 

953 """Check configuration related to job ordering. 

954 

955 Parameters 

956 ---------- 

957 ordering_config : `dict` [`str`, `~typing.Any`] 

958 Job ordering configuration to check. 

959 

960 Returns 

961 ------- 

962 group_to_label_subgraph : `dict` [`str`, `network.DiGraph`] 

963 Mapping of group name to a graph of the job labels in the group. 

964 """ 

965 group_to_label_subgraph = {} 

966 job_label_to_group: dict[str, str] = {} # Checking label appears only in one group 

967 for group, group_vals in ordering_config.items(): 

968 implementation = group_vals.get("implementation", "group") 

969 if implementation not in ["noop", "group"]: 

970 raise RuntimeError(f"Invalid implementation for {group}: {implementation}") 

971 ordering_type = group_vals.get("ordering_type", "sort") 

972 if ordering_type != "sort": 

973 raise RuntimeError(f"Invalid ordering_type for {group}: {ordering_type}") 

974 if "dimensions" not in group_vals: 

975 raise KeyError(f"Missing dimensions entry in ordering group {group}") 

976 

977 job_labels = [x.strip() for x in group_vals["labels"].split(",")] 

978 _LOG.debug("group %s: job_labels=%s", group, job_labels) 

979 unused_labels = [] 

980 for job_label in job_labels: 

981 if job_label not in self._job_labels.labels: 

982 unused_labels.append(job_label) 

983 elif job_label in job_label_to_group: 

984 raise RuntimeError( 

985 f"Job label {job_label} appears in more than one job ordering group " 

986 f"({group} {job_label_to_group[job_label]})" 

987 ) 

988 else: 

989 job_label_to_group[job_label] = group 

990 

991 if unused_labels: 

992 _LOG.info("Workflow job labels = %s", ",".join(self._job_labels.labels)) 

993 _LOG.warning( 

994 "Job label(s) (%s) from job ordering group %s does not exist in workflow.", 

995 ",".join(unused_labels), 

996 group, 

997 ) 

998 

999 label_subgraph = self._job_labels.subgraph(job_labels) 

1000 group_to_label_subgraph[group] = label_subgraph 

1001 

1002 return group_to_label_subgraph 

1003 

1004 def _group_jobs_by_values( 

1005 self, group: str, group_config: dict[str, Any], label_subgraph: DiGraph 

1006 ) -> dict[tuple[Any, ...], list[str]]: 

1007 """Create job mapping of special sortable dimension key 

1008 to job name by comparing dimension values. 

1009 

1010 Parameters 

1011 ---------- 

1012 group : `str` 

1013 Name of group for which creating mapping. 

1014 group_config : `dict` [`str`, `~typing.Any`] 

1015 Config for group for which creating mapping. 

1016 label_subgraph : `networkx.DiGraph` 

1017 The graph of job labels to be used in mapping. 

1018 

1019 Returns 

1020 ------- 

1021 dims_to_jobs : `dict` [`tuple` [`~typing.Any`, ...], `list` [`str`]] 

1022 Mapping of dimensions to job names. 

1023 """ 

1024 dims_to_jobs: dict[tuple[Any, ...], list[str]] = {} 

1025 for job_label in label_subgraph: 

1026 jobs = self.get_jobs_by_label(job_label) 

1027 for job in jobs: 

1028 job_dim_values = subset_dimension_values( 

1029 f"Job {job.name}", 

1030 f"order group {group}", 

1031 group_config["dimensions"], 

1032 job.tags, 

1033 group_config.get("equalDimensions", None), 

1034 ) 

1035 job_dims = [] 

1036 for dim in [d.strip() for d in group_config["dimensions"].split(",")]: 

1037 job_dims.append(job_dim_values[dim]) 

1038 dims_job_list = dims_to_jobs.setdefault(tuple(job_dims), []) 

1039 dims_job_list.append(job.name) 

1040 return dims_to_jobs 

1041 

    def _group_jobs_by_dependencies(
        self, group: str, group_config: dict[str, Any], label_subgraph: DiGraph
    ) -> dict[tuple[Any, ...], list[str]]:
        """Create job mapping of special sortable dimension key
        to job name by following dependencies.

        Parameters
        ----------
        group : `str`
            Name of group for which creating mapping.
        group_config : `dict` [`str`, `~typing.Any`]
            Config for group for which creating mapping.
        label_subgraph : `networkx.DiGraph`
            The graph of job labels to be used in mapping.

        Returns
        -------
        dims_to_jobs : `dict` [`tuple` [`~typing.Any`, ...], `list` [`str`]]
            Mapping of dimensions to job names.

        Raises
        ------
        RuntimeError
            Raised if ``findDependencyMethod`` is neither "source" nor
            "sink".
        """
        method = group_config["findDependencyMethod"]
        dim_labels = list(topological_sort(label_subgraph))
        # "source" walks downstream (successors) from each seed job;
        # "sink" walks upstream (predecessors), so seed labels are
        # processed in reverse topological order.
        match method:
            case "source":
                find_potential_jobs = self.successors
            case "sink":
                find_potential_jobs = self.predecessors
                dim_labels.reverse()
            case _:
                raise RuntimeError(f"Invalid findDependencyMethod ({method})")

        jobs_seen: set[str] = set()
        dims_to_jobs: dict[tuple[Any, ...], list[str]] = {}
        for label in dim_labels:
            jobs = self.get_jobs_by_label(label)
            for job in jobs:
                # Only jobs not already pulled into a group via the
                # dependency traversal below act as new group seeds.
                if job.name not in jobs_seen:
                    jobs_seen.add(job.name)
                    job_dim_values = subset_dimension_values(
                        f"Job {job.name}",
                        f"order group {group}",
                        group_config["dimensions"],
                        job.tags,
                        group_config.get("equalDimensions", None),
                    )
                    job_dims = []
                    for dim in [d.strip() for d in group_config["dimensions"].split(",")]:
                        job_dims.append(job_dim_values[dim])
                    dims_job_list = dims_to_jobs.setdefault(tuple(job_dims), [])
                    dims_job_list.append(job.name)
                    # Use dependencies to find other quantum to add
                    # Note: in testing, using the following code was faster
                    # than using networkx descendants and ancestors functions
                    # While traversing the subgraph, nodes may appear
                    # repeatedly in potential_jobs.
                    jobs_to_use = [job]
                    while jobs_to_use:
                        job_to_use = jobs_to_use.pop()

                        potential_job_names = find_potential_jobs(job_to_use.name)
                        for potential_job_name in potential_job_names:
                            potential_job = cast(GenericWorkflowJob, self.get_job(potential_job_name))
                            if potential_job.label in label_subgraph:
                                # Membership test on the list guards against
                                # re-adding a node reached via multiple paths.
                                if potential_job.name not in dims_job_list:
                                    _LOG.debug(
                                        "Adding potential job %s (%s) to group %s",
                                        potential_job.name,
                                        potential_job.label,
                                        group,
                                    )
                                    dims_job_list.append(potential_job.name)
                                    jobs_to_use.append(potential_job)
                                    jobs_seen.add(potential_job.name)
                            else:
                                _LOG.debug(
                                    "label (%s) not in ordered_tasks. Not adding potential quantum %s",
                                    potential_job.label,
                                    potential_job.name,
                                )
        return dims_to_jobs

1122 

1123 def _update_by_group_sort( 

1124 self, 

1125 group: str, 

1126 dims_to_jobs: dict[tuple[Any, ...], list[str]], 

1127 blocking: bool = False, 

1128 ) -> None: 

1129 """Update portion of workflow for special job ordering using sort. 

1130 

1131 Parameters 

1132 ---------- 

1133 group : `str` 

1134 Ordering group label used for job name and messages. 

1135 dims_to_jobs: `dict` [`tuple` [`~typing.Any`, ...], `list` [`str`]] 

1136 Mapping of special dimension keys to workflow job names. 

1137 The sort for the special ordering is over the keys. 

1138 blocking: `bool` 

1139 Whether a failure in a group blocks execution of remaining groups. 

1140 """ 

1141 group_job_names: list[str] = [] 

1142 for dim_key, job_list in sorted(dims_to_jobs.items()): 

1143 _LOG.debug("group %s: dim_key=%s", group, dim_key) 

1144 group_job_name = f"group_{group}_{'=='.join([str(dk) for dk in dim_key])}" 

1145 prev_name = group_job_names[-1] if group_job_names else None 

1146 group_job_names.append(group_job_name) 

1147 

1148 self._replace_subgraph_with_job_group(group_job_name, group, job_list, blocking) 

1149 

1150 # ordering between groups 

1151 if prev_name: 

1152 self.add_edge(prev_name, group_job_name) 

1153 

    def _replace_subgraph_with_job_group(
        self, group_name: str, group_label: str, job_list: list[str], blocking: bool
    ) -> None:
        """Update portion of workflow for special job ordering using groups.

        Replaces the jobs named in ``job_list`` by a single
        ``GenericWorkflowGroup`` node containing them; edges crossing the
        group boundary are rewired to the group node.

        Parameters
        ----------
        group_name : `str`
            Ordering group name.
        group_label : `str`
            Ordering group label.
        job_list: `list` [`str`]
            List of job names to put in the group
        blocking: `bool`
            Whether a failure in a group blocks execution of remaining groups.
        """
        job_group = GenericWorkflowGroup(group_name, group_label, blocking=blocking)
        self.add_node(job_group)

        # Add jobs, files, and executables first
        # then add edges later to avoid order issues
        for job_name in job_list:
            job = cast(GenericWorkflowJob, self.get_job(job_name))
            job_group.add_job(job)
            files = self.get_job_inputs(job_name, data=True)
            job_group.add_job_inputs(job_name, files)
            files = self.get_job_outputs(job_name, data=True)
            job_group.add_job_outputs(job_name, files)
            job_group.add_executable(job.executable)

        # Can't remove edges while looping through edge view,
        # so save to remove after loops
        edges_to_remove: list[tuple[str, str]] = []
        for job_name in job_list:
            in_edges = self.in_edges(job_name)
            for u, v in in_edges:
                if u in job_list:
                    # Edge internal to the group: recreate inside group.
                    job_group.add_edge(u, v)
                else:
                    # Boundary edge: redirect to the group node itself.
                    self.add_edge(u, group_name)
                    edges_to_remove.append((u, v))

            out_edges = self.out_edges(job_name)
            for u, v in out_edges:
                if v in job_list:
                    job_group.add_edge(u, v)
                else:
                    self.add_edge(group_name, v)
                    edges_to_remove.append((u, v))

        # Remove edges collected earlier
        self.remove_edges_from(edges_to_remove)

        # Remove nodes from main GenericWorkflow
        # (also drops any remaining internal edges)
        self.remove_nodes_from(job_list)

1209 

1210 def add_special_job_ordering(self, ordering: dict[str, Any]) -> None: 

1211 """Add special nodes and dependencies to enforce given ordering. 

1212 

1213 Parameters 

1214 ---------- 

1215 ordering : `dict` [`str`, `~typing.Any`] 

1216 Description of the job ordering to enforce. 

1217 """ 

1218 group_to_label_subgraph = self._check_job_ordering_config(ordering) 

1219 

1220 for group, group_vals in ordering.items(): 

1221 if "findDependencyMethod" in group_vals: 

1222 job_grouping_func = self._group_jobs_by_dependencies 

1223 else: 

1224 job_grouping_func = self._group_jobs_by_values 

1225 

1226 dims = [x.strip() for x in group_vals["dimensions"].split(",")] 

1227 _LOG.debug("group %s: dims=%s", group, dims) 

1228 job_groups = job_grouping_func(group, group_vals, group_to_label_subgraph[group]) 

1229 

1230 # Update the workflow 

1231 implementation = group_vals.get("implementation", "group") 

1232 ordering_type = group_vals.get("ordering_type", "sort") 

1233 match (implementation, ordering_type): 

1234 case ("noop", "sort"): 

1235 self._update_by_noop_sort(group, job_groups) 

1236 case ("group", "sort"): 

1237 blocking = group_vals.get("blocking", False) 

1238 self._update_by_group_sort(group, job_groups, blocking) 

1239 case _: 

1240 raise RuntimeError( 

1241 f"Invalid implementation, ordering_type pair for group ({implementation}," 

1242 f" {ordering_type})" 

1243 ) 

1244 

1245 def _update_by_noop_sort(self, group: str, dims_to_jobs: dict[tuple[Any, ...], list[str]]) -> None: 

1246 """Update portion of workflow for special ordering of jobs using sort. 

1247 

1248 Parameters 

1249 ---------- 

1250 group : `str` 

1251 Ordering group label used for job name and messages. 

1252 dims_to_jobs: `dict` [`tuple`[`str`,...], `list` [`str`]] 

1253 Mapping of special dimension keys to workflow job names. 

1254 The sort for the special ordering is over the keys. 

1255 """ 

1256 noop_names: list[str] = [] 

1257 prev_name: str | None = None 

1258 for dim_key, job_list in sorted(dims_to_jobs.items()): 

1259 _LOG.debug("group %s: dim_key=%s", group, dim_key) 

1260 noop_name = f"noop_{group}_{'=='.join([str(dk) for dk in dim_key])}" 

1261 if noop_names: 

1262 prev_name = noop_names[-1] 

1263 noop_names.append(noop_name) 

1264 

1265 self._update_single_noop(job_list, noop_name, group, prev_name) 

1266 

1267 # As implemented, loop adds one last NOOP job with 

1268 # 0 children. Remove it as not useful. 

1269 assert self.out_degree(noop_names[-1]) == 0 

1270 self.remove_node(noop_names[-1]) 

1271 

1272 def _update_single_noop( 

1273 self, job_list: list[str], order_node_name: str, order_label: str, prev_order_name: str | None 

1274 ) -> None: 

1275 """Update the workflow around the single job making special NOOP 

1276 jobs when necessary. 

1277 

1278 Parameters 

1279 ---------- 

1280 job_list : `list` [`str`] 

1281 Current jobs involved in special ordering 

1282 order_node_name : `str` 

1283 Name for the order NOOP job. If it does not exist in workflow, a 

1284 new NOOP job with this name will be created and added. 

1285 order_label : `str` 

1286 Label for the order NOOP job. 

1287 prev_order_name: `str` or None 

1288 Name of the previous order NOOP job used to add edge as predecessor 

1289 of current job. 

1290 """ 

1291 if order_node_name not in self: 

1292 _LOG.debug("Adding new ordering node %s", order_node_name) 

1293 order_node = GenericWorkflowNoopJob(order_node_name, order_label) 

1294 self.add_job(order_node) 

1295 

1296 subgraph = DiGraph(self).subgraph(job_list) 

1297 sinks = [n for n in job_list if subgraph.out_degree(n) == 0] 

1298 for job_name in sinks: 

1299 self.add_edge(job_name, order_node_name) 

1300 

1301 if prev_order_name: 

1302 sources = [n for n in job_list if subgraph.in_degree(n) == 0] 

1303 for job_name in sources: 

1304 _LOG.debug("Adding edge %s to %s", prev_order_name, job_name) 

1305 self.add_edge(prev_order_name, job_name) 

1306 

1307 

@dataclasses.dataclass(slots=True)
class GenericWorkflowGroup(GenericWorkflowNode, GenericWorkflow):
    """Node representing a group of jobs. Used for special dependencies.

    Parameters
    ----------
    name : `str`
        Name of node. Must be unique within workflow.
    label : `str`
        Primary user-facing label for job. Does not need to be unique and
        may be used for summary reports or to group nodes.
    blocking : `bool`
        Whether a failure inside group prunes executions of remaining groups.
    """

    blocking: bool = False
    """Whether a failure inside group prunes executions of remaining groups."""

    @property
    def node_type(self) -> GenericWorkflowNodeType:
        """Indicate this is a group job."""
        return GenericWorkflowNodeType.GROUP

    # NOTE: per the dataclasses docs, the decorator does not replace an
    # __init__ already defined in the class body, so this one is used.
    def __init__(self, name: str, label: str, blocking: bool = False) -> None:
        """Initialize each parent class."""
        _LOG.debug("%s %s %s", name, label, blocking)
        # Call each base explicitly (not super()) because the two bases
        # require different initialization arguments.
        GenericWorkflowNode.__init__(self, name, label)
        GenericWorkflow.__init__(self, name)
        self.blocking = blocking

1337 

1338 

class GenericWorkflowLabels:
    """Label-oriented representation of the GenericWorkflowJobs."""

    def __init__(self) -> None:
        self._label_graph = DiGraph()  # Dependency graph of job labels
        self._label_to_jobs: defaultdict[str, list[GenericWorkflowJob]] = defaultdict(
            list
        )  # mapping job label to list of GenericWorkflowJob

    @property
    def labels(self) -> list[str]:
        """List of job labels (`list` [`str`], read-only)."""
        # Topologically ordered so parent labels precede children.
        return list(topological_sort(self._label_graph))

    @property
    def job_counts(self) -> Counter[str]:
        """Count of jobs per job label (`collections.Counter`)."""
        return Counter({label: len(self._label_to_jobs[label]) for label in self.labels})

    def get_jobs_by_label(self, label: str) -> list[GenericWorkflowJob]:
        """Retrieve jobs by label from workflow.

        Parameters
        ----------
        label : `str`
            Label of jobs to retrieve.

        Returns
        -------
        jobs : list[`lsst.ctrl.bps.GenericWorkflowJob`]
            Jobs having given label.
        """
        # NOTE(review): lookup on the defaultdict inserts an empty list for
        # an unknown label as a side effect — confirm this is intended.
        return self._label_to_jobs[label]

    def add_job(self, job: GenericWorkflowJob, parent_labels: list[str], child_labels: list[str]) -> None:
        """Add job's label to labels.

        Parameters
        ----------
        job : `lsst.ctrl.bps.GenericWorkflowJob`
            The job to add to the job labels.
        parent_labels : `list` [`str`]
            Parent job labels.
        child_labels : `list` [`str`]
            Children job labels.
        """
        _LOG.debug("job: %s (%s)", job.name, job.label)
        _LOG.debug("parent_labels: %s", parent_labels)
        _LOG.debug("child_labels: %s", child_labels)
        self._label_to_jobs[job.label].append(job)
        # add_node is a no-op if the label already exists in the graph.
        self._label_graph.add_node(job.label)
        for parent in parent_labels:
            self._label_graph.add_edge(parent, job.label)
        for child in child_labels:
            self._label_graph.add_edge(job.label, child)

    def add_job_relationships(self, parent_labels: list[str], children_labels: list[str]) -> None:
        """Add dependencies between parent and child job labels.
        All parents will be connected to all children.

        Parameters
        ----------
        parent_labels : `list` [`str`]
            Parent job labels.
        children_labels : `list` [`str`]
            Children job labels.
        """
        # Since labels, must ensure not adding edge from label to itself.
        edges = [
            e
            for e in itertools.product(ensure_iterable(parent_labels), ensure_iterable(children_labels))
            if e[0] != e[1]
        ]

        self._label_graph.add_edges_from(edges)

    def del_job(self, job: GenericWorkflowJob) -> None:
        """Delete job and its label from job labels.

        Parameters
        ----------
        job : `lsst.ctrl.bps.GenericWorkflowJob`
            The job to delete from the job labels.
        """
        self._label_to_jobs[job.label].remove(job)
        # Don't leave keys around if removed last job
        if not self._label_to_jobs[job.label]:
            del self._label_to_jobs[job.label]

        # NOTE(review): these iterators are consumed only after
        # remove_node below; this relies on networkx leaving the removed
        # node's own adjacency dicts intact — verify on networkx upgrades.
        parents = self._label_graph.predecessors(job.label)
        children = self._label_graph.successors(job.label)
        self._label_graph.remove_node(job.label)
        # Preserve transitive label ordering by bridging parents to children.
        self._label_graph.add_edges_from(
            itertools.product(ensure_iterable(parents), ensure_iterable(children))
        )

    def subgraph(self, labels: Iterable[str]) -> DiGraph:
        """Create subgraph of workflow label graph with given labels.

        Parameters
        ----------
        labels : Iterable [`str`]
            Labels to appear in subgraph.

        Returns
        -------
        subgraph : `networkx.DiGraph`
            Subgraph of workflow label graph with given labels.
        """
        return self._label_graph.subgraph(labels)