Coverage for python / lsst / ctrl / bps / generic_workflow.py: 28%
555 statements
« prev ^ index » next coverage.py v7.13.5, created at 2026-04-26 09:03 +0000
« prev ^ index » next coverage.py v7.13.5, created at 2026-04-26 09:03 +0000
1# This file is part of ctrl_bps.
2#
3# Developed for the LSST Data Management System.
4# This product includes software developed by the LSST Project
5# (https://www.lsst.org).
6# See the COPYRIGHT file at the top-level directory of this distribution
7# for details of code ownership.
8#
9# This software is dual licensed under the GNU General Public License and also
10# under a 3-clause BSD license. Recipients may choose which of these licenses
11# to use; please see the files gpl-3.0.txt and/or bsd_license.txt,
12# respectively. If you choose the GPL option then the following text applies
13# (but note that there is still no warranty even if you opt for BSD instead):
14#
15# This program is free software: you can redistribute it and/or modify
16# it under the terms of the GNU General Public License as published by
17# the Free Software Foundation, either version 3 of the License, or
18# (at your option) any later version.
19#
20# This program is distributed in the hope that it will be useful,
21# but WITHOUT ANY WARRANTY; without even the implied warranty of
22# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
23# GNU General Public License for more details.
24#
25# You should have received a copy of the GNU General Public License
26# along with this program. If not, see <https://www.gnu.org/licenses/>.
28"""Class definitions for a Generic Workflow Graph."""
30__all__ = [
31 "GenericWorkflow",
32 "GenericWorkflowExec",
33 "GenericWorkflowFile",
34 "GenericWorkflowGroup",
35 "GenericWorkflowJob",
36 "GenericWorkflowNode",
37 "GenericWorkflowNodeType",
38 "GenericWorkflowNoopJob",
39]
42import dataclasses
43import itertools
44import logging
45import pickle
46from collections import Counter, defaultdict
47from collections.abc import Iterable, Iterator
48from enum import IntEnum, auto
49from typing import IO, Any, BinaryIO, Literal, cast, overload
51from networkx import DiGraph, topological_sort
52from networkx.algorithms.dag import is_directed_acyclic_graph
54from lsst.utils.iteration import ensure_iterable
56from .bps_draw import draw_networkx_dot
57from .bps_utils import subset_dimension_values
59_LOG = logging.getLogger(__name__)
@dataclasses.dataclass(slots=True)
class GenericWorkflowFile:
    """Information about a file that may be needed by various workflow
    management services.

    Instances are stored centrally in a `GenericWorkflow` keyed by ``name``,
    so hashing is based solely on ``name``.
    """

    name: str
    """Lookup key (logical file name) of file/directory. Must be unique
    within run.
    """

    src_uri: str | None = None  # don't know that need ResourcePath
    """Original location of file/directory.
    """

    wms_transfer: bool = False
    """Whether the WMS is responsible for transferring this file (see
    ``GenericWorkflow.get_files(transfer_only=True)``). Default is False.
    """

    job_access_remote: bool = False
    """Whether the job can remotely access file (using separately specified
    file access protocols). Default is False.
    """

    job_shared: bool = False
    """Whether job requires its own copy of this file. Default is False.
    """
    # NOTE(review): the name ``job_shared`` reads as "file can be shared
    # across jobs", the opposite of the docstring above — confirm intent.

    def __hash__(self) -> int:
        # Identity is the unique logical name only.
        return hash(self.name)
@dataclasses.dataclass(slots=True)
class GenericWorkflowExec:
    """Information about an executable that may be needed by various workflow
    management services.

    Instances are stored centrally in a `GenericWorkflow` keyed by ``name``,
    so hashing is based solely on ``name``.
    """

    name: str
    """Lookup key (logical file name) of executable. Must be unique
    within run.
    """

    src_uri: str | None = None  # don't know that need ResourcePath
    """Original location of executable.
    """

    transfer_executable: bool = False
    """Whether the WMS/plugin is responsible for staging executable to
    location usable by job.
    """

    def __hash__(self) -> int:
        # Identity is the unique logical name only.
        return hash(self.name)
class GenericWorkflowNodeType(IntEnum):
    """Type of valid types for nodes in the GenericWorkflow."""

    # Values are assigned by declaration order via auto(); do not reorder.
    NOOP = auto()
    """Does nothing, but enforces special dependencies."""

    PAYLOAD = auto()
    """Typical workflow job."""

    GROUP = auto()
    """A special group (subdag) of jobs."""
@dataclasses.dataclass(slots=True)
class GenericWorkflowNode:
    """Base class for nodes in the GenericWorkflow.

    Subclasses must override the ``node_type`` property.
    """

    name: str
    """Name of node. Must be unique within workflow."""

    label: str
    """Primary user-facing label for job. Does not need to be unique and
    may be used for summary reports or to group nodes."""

    def __hash__(self) -> int:
        # Identity is the unique node name only.
        return hash(self.name)

    @property
    def node_type(self) -> GenericWorkflowNodeType:
        """Type of node."""
        raise NotImplementedError(f"{type(self).__name__} needs to override node_type.")
@dataclasses.dataclass(slots=True)
class GenericWorkflowNoopJob(GenericWorkflowNode):
    """Job that does no work. Used for special dependencies."""

    @property
    def node_type(self) -> GenericWorkflowNodeType:
        """Indicate this is a noop job."""
        return GenericWorkflowNodeType.NOOP
@dataclasses.dataclass(slots=True)
class GenericWorkflowJob(GenericWorkflowNode):
    """Information about a job that may be needed by various workflow
    management services.

    All fields are optional with defaults; ``name`` and ``label`` are
    inherited (and required) from `GenericWorkflowNode`.
    """

    quanta_counts: Counter[str] = dataclasses.field(default_factory=Counter)
    """Counts of quanta per task label in job.
    """

    tags: dict[str, Any] = dataclasses.field(default_factory=dict)
    """Other key/value pairs for job that user may want to use as a filter.
    """

    executable: GenericWorkflowExec | None = None
    """Executable for job.
    """

    arguments: str | None = None
    """Command line arguments for job.
    """

    cmdvals: dict[str, Any] = dataclasses.field(default_factory=dict)
    """Values for variables in cmdline when using lazy command line creation.
    """

    memory_multiplier: float | None = None
    """Memory growth rate between retries.
    """

    request_memory: int | None = None  # MB
    """Max memory (in MB) that the job is expected to need.
    """

    request_memory_max: int | None = None  # MB
    """Max memory (in MB) that the job should ever use.
    """

    request_cpus: int | None = None  # cores
    """Max number of cpus that the job is expected to need.
    """

    request_disk: int | None = None  # MB
    """Max amount of job scratch disk (in MB) that the job is expected to need.
    """

    request_walltime: str | None = None  # minutes
    """Max amount of time (in seconds) that the job is expected to need.
    """
    # NOTE(review): docstring says seconds but the inline comment says
    # minutes — confirm the intended unit before relying on either.

    compute_site: str | None = None
    """Key to look up site-specific information for running the job.
    """

    accounting_group: str | None = None
    """Name of the accounting group to use.
    """

    accounting_user: str | None = None
    """Name of the user to use for accounting purposes.
    """

    mail_to: str | None = None
    """Comma separated list of email addresses for emailing job status.
    """

    when_to_mail: str | None = None
    """WMS-specific terminology for when to email job status.
    """

    number_of_retries: int | None = None
    """Number of times to automatically retry a failed job.
    """

    retry_unless_exit: int | list[int] | None = None
    """Exit code(s) for job that means to not automatically retry.
    """

    abort_on_value: int | None = None
    """Job exit value for signals to abort the entire workflow.
    """

    abort_return_value: int | None = None
    """Exit value to use when aborting the entire workflow.
    """

    priority: str | None = None
    """Initial priority of job in WMS-format.
    """

    category: str | None = None
    """WMS-facing label of job within single workflow (e.g., can be used for
    throttling jobs within a single workflow).
    """

    concurrency_limit: str | None = None
    """Names of concurrency limits that the WMS plugin can appropriately
    translate to limit the number of this job across all running workflows.
    """

    queue: str | None = None
    """Name of queue to use. Different WMS can translate this concept
    differently.
    """

    pre_cmdline: str | None = None
    """Command line to be executed prior to executing job.
    """

    post_cmdline: str | None = None
    """Command line to be executed after job executes.

    Should be executed regardless of exit status.
    """

    preemptible: bool | None = None
    """The flag indicating whether the job can be preempted.
    """

    profile: dict[str, Any] = dataclasses.field(default_factory=dict)
    """Nested dictionary of WMS-specific key/value pairs with primary key being
    WMS key (e.g., pegasus, condor, panda).
    """

    attrs: dict[str, Any] = dataclasses.field(default_factory=dict)
    """Key/value pairs of job attributes (for WMS that have attributes in
    addition to commands).
    """

    environment: dict[str, Any] = dataclasses.field(default_factory=dict)
    """Environment variable names and values to be explicitly set inside job.
    """

    compute_cloud: str | None = None
    """Key to look up cloud-specific information for running the job.
    """

    @property
    def node_type(self) -> GenericWorkflowNodeType:
        """Indicate this is a payload job."""
        return GenericWorkflowNodeType.PAYLOAD
304class GenericWorkflow(DiGraph):
305 """A generic representation of a workflow used to submit to specific
306 workflow management systems.
308 Parameters
309 ----------
310 name : `str`
311 Name of generic workflow.
312 incoming_graph_data : `~typing.Any`, optional
313 Data used to initialized graph that is passed through to DiGraph
314 constructor. Can be any type supported by networkx.DiGraph.
315 **attr : `dict`
316 Keyword arguments passed through to DiGraph constructor.
317 """
319 def __init__(self, name: str, incoming_graph_data: Any | None = None, **attr: Any) -> None:
320 super().__init__(incoming_graph_data, **attr)
321 self._name = name
322 self.run_attrs: dict[str, str] = {}
323 self._job_labels = GenericWorkflowLabels()
324 self._files: dict[str, GenericWorkflowFile] = {}
325 self._executables: dict[str, GenericWorkflowExec] = {}
326 self._inputs: dict[
327 str, list[GenericWorkflowFile]
328 ] = {} # mapping job.names to list of GenericWorkflowFile
329 self._outputs: dict[
330 str, list[GenericWorkflowFile]
331 ] = {} # mapping job.names to list of GenericWorkflowFile
332 self.run_id = None
333 self._final: GenericWorkflowJob | GenericWorkflow | None = None
    # Starting from ver. 3.6 of NetworkX, the DiGraph class defines its custom
    # __new__ method that explicitly defines arguments it accepts. As a result,
    # we need to override it to let our subclass use different ones.
    #
    # Notes
    # -----
    # Most likely overriding __new__ in this manner will prevent us from using
    # different graph backends with our subclass. However, since we are not
    # using any backends, this should not be a problem at the moment.
    def __new__(cls, *args, **kwargs) -> "GenericWorkflow":
        # Bypass DiGraph.__new__ entirely; all setup happens in __init__.
        return object.__new__(cls)
    @property
    def name(self) -> str:
        """Retrieve name of generic workflow.

        Returns
        -------
        name : `str`
            Name of generic workflow.
        """
        # Read-only; set once in __init__.
        return self._name
358 @property
359 def quanta_counts(self) -> Counter[str]:
360 """Count of quanta per task label (`collections.Counter`)."""
361 qcounts: Counter[str] = Counter()
362 for job_name in self:
363 gwjob = self.get_job(job_name)
364 if hasattr(gwjob, "quanta_counts"):
365 qcounts += gwjob.quanta_counts
366 return qcounts
    @property
    def labels(self) -> list[str]:
        """Job labels (`list` [`str`], read-only)."""
        # Delegates to the GenericWorkflowLabels bookkeeping object.
        return self._job_labels.labels
373 def regenerate_labels(self) -> None:
374 """Regenerate the list of job labels."""
375 self._job_labels = GenericWorkflowLabels()
376 for job_name in self:
377 job = self.get_job(job_name)
378 if job.node_type == GenericWorkflowNodeType.PAYLOAD:
379 job = cast(GenericWorkflowJob, job)
380 parents_labels: list[str] = []
381 for parent_name in self.predecessors(job.name):
382 parent_job = self.get_job(parent_name)
383 if parent_job.node_type == GenericWorkflowNodeType.PAYLOAD:
384 # parent_job = cast(GenericWorkflowJob, parent_job)
385 parents_labels.append(parent_job.label)
386 children_labels: list[str] = []
387 for child_name in self.successors(job.name):
388 child_job = self.get_job(child_name)
389 if child_job.node_type == GenericWorkflowNodeType.PAYLOAD:
390 # child_job = cast(GenericWorkflowJob, child_job)
391 children_labels.append(child_job.label)
392 self._job_labels.add_job(job, parents_labels, children_labels)
394 @property
395 def job_counts(self) -> Counter[str]:
396 """Count of jobs per job label (`collections.Counter`)."""
397 jcounts = self._job_labels.job_counts
399 # Final is separate
400 final = self.get_final()
401 if final:
402 if isinstance(final, GenericWorkflow):
403 jcounts.update(final.job_counts)
404 else:
405 jcounts[final.label] += 1
407 return jcounts
409 def __iter__(self) -> Iterator[str]:
410 """Return iterator of job names in topologically sorted order."""
411 return topological_sort(self)
413 @overload
414 def get_files(self, data: Literal[False], transfer_only: bool = True) -> list[str]: ... 414 ↛ exitline 414 didn't return from function 'get_files' because
416 @overload
417 def get_files(self, data: Literal[True], transfer_only: bool = True) -> list[GenericWorkflowFile]: ... 417 ↛ exitline 417 didn't return from function 'get_files' because
419 def get_files(
420 self, data: bool = False, transfer_only: bool = True
421 ) -> list[GenericWorkflowFile] | list[str]:
422 """Retrieve files from generic workflow.
424 Need API in case change way files are stored (e.g., make
425 workflow a bipartite graph with jobs and files nodes).
427 Parameters
428 ----------
429 data : `bool`, optional
430 Whether to return the file data as well as the file object name
431 (The default is `False`).
432 transfer_only : `bool`, optional
433 Whether to only return files for which a workflow management system
434 would be responsible for transferring.
436 Returns
437 -------
438 files : `list` [`lsst.ctrl.bps.GenericWorkflowFile`] or `list` [`str`]
439 File names or objects from generic workflow meeting specifications.
440 """
441 files: list[Any] = [] # Any for mypy to allow the different append lines.
442 for filename, file in self._files.items():
443 if not transfer_only or file.wms_transfer:
444 if not data:
445 files.append(filename)
446 else:
447 files.append(file)
448 return files
450 def add_job(
451 self,
452 job: GenericWorkflowNode,
453 parent_names: str | list[str] | None = None,
454 child_names: str | list[str] | None = None,
455 ) -> None:
456 """Add job to generic workflow.
458 Parameters
459 ----------
460 job : `lsst.ctrl.bps.GenericWorkflowNode`
461 Job to add to the generic workflow.
462 parent_names : `str` | `list` [`str`], optional
463 Names of jobs that are parents of given job.
464 child_names : `str` | `list` [`str`], optional
465 Names of jobs that are children of given job.
466 """
467 _LOG.debug("job: %s (%s)", job.name, job.label)
468 _LOG.debug("parent_names: %s", parent_names)
469 _LOG.debug("child_names: %s", child_names)
470 if not isinstance(job, GenericWorkflowNode):
471 raise RuntimeError(f"Invalid type for job to be added to GenericWorkflowGraph ({type(job)}).")
472 if self.has_node(job.name):
473 raise RuntimeError(f"Job {job.name} already exists in GenericWorkflowGraph.")
474 super().add_node(job.name, job=job)
475 self.add_job_relationships(parent_names, job.name)
476 self.add_job_relationships(job.name, child_names)
477 if job.node_type == GenericWorkflowNodeType.PAYLOAD:
478 job = cast(GenericWorkflowJob, job)
479 self.add_executable(job.executable)
480 self._job_labels.add_job(
481 job,
482 [self.get_job(p).label for p in self.predecessors(job.name)],
483 [self.get_job(p).label for p in self.successors(job.name)],
484 )
    def add_node(self, node_for_adding: GenericWorkflowNode, **attr: Any) -> None:
        """Override networkx function to call more specific add_job function.

        Parameters
        ----------
        node_for_adding : `lsst.ctrl.bps.GenericWorkflowJob`
            Job to be added to generic workflow.
        **attr : `~typing.Any`
            Needed to match original networkx function, but not used.
        """
        # Route through add_job so label/executable bookkeeping happens.
        self.add_job(node_for_adding)
498 def add_job_relationships(
499 self, parents: str | list[str] | None, children: str | list[str] | None
500 ) -> None:
501 """Add dependencies between parent and child jobs. All parents will
502 be connected to all children.
504 Parameters
505 ----------
506 parents : `str` or `list` [`str`], optional
507 Parent job names.
508 children : `str` or `list` [`str`], optional
509 Children job names.
510 """
511 # Allow this to be a noop if no parents or no children
512 if parents is not None and children is not None:
513 self.add_edges_from(itertools.product(ensure_iterable(parents), ensure_iterable(children)))
514 self._job_labels.add_job_relationships(
515 [self.get_job(n).label for n in ensure_iterable(parents)],
516 [self.get_job(n).label for n in ensure_iterable(children)],
517 )
519 def add_edges_from(self, ebunch_to_add: Iterable[tuple[str, str]], **attr: Any) -> None:
520 """Add several edges between jobs in the generic workflow.
522 Parameters
523 ----------
524 ebunch_to_add : Iterable [`tuple` [`str`, `str`]]
525 Iterable of job name pairs between which a dependency should be
526 saved.
527 **attr : `~typing.Any`
528 Data can be assigned using keyword arguments (not currently used).
529 """
530 for edge_to_add in ebunch_to_add:
531 self.add_edge(edge_to_add[0], edge_to_add[1], **attr)
533 def add_edge(self, u_of_edge: str, v_of_edge: str, **attr: Any) -> None:
534 """Add edge connecting jobs in workflow.
536 Parameters
537 ----------
538 u_of_edge : `str`
539 Name of parent job.
540 v_of_edge : `str`
541 Name of child job.
542 **attr
543 Attributes to save with edge.
544 """
545 if u_of_edge not in self:
546 raise RuntimeError(f"{u_of_edge} not in GenericWorkflow")
547 if v_of_edge not in self:
548 raise RuntimeError(f"{v_of_edge} not in GenericWorkflow")
549 super().add_edge(u_of_edge, v_of_edge, **attr)
    def get_job(self, job_name: str) -> GenericWorkflowNode:
        """Retrieve job by name from workflow.

        Parameters
        ----------
        job_name : `str`
            Name of job to retrieve.

        Returns
        -------
        job : `lsst.ctrl.bps.GenericWorkflowNode`
            Job matching given job_name.
        """
        # The node payload is stored under the "job" attribute by add_job.
        return self.nodes[job_name]["job"]
566 def del_job(self, job_name: str) -> None:
567 """Delete job from generic workflow leaving connected graph.
569 Parameters
570 ----------
571 job_name : `str`
572 Name of job to delete from workflow.
573 """
574 job = self.get_job(job_name)
576 # Remove from job labels
577 if isinstance(job, GenericWorkflowJob):
578 self._job_labels.del_job(job)
580 # Connect all parent jobs to all children jobs.
581 parents = list(self.predecessors(job_name))
582 children = list(self.successors(job_name))
583 self.add_job_relationships(parents, children)
585 # Delete job node (which deletes edges).
586 self.remove_node(job_name)
588 def add_job_inputs(self, job_name: str, files: GenericWorkflowFile | list[GenericWorkflowFile]) -> None:
589 """Add files as inputs to specified job.
591 Parameters
592 ----------
593 job_name : `str`
594 Name of job to which inputs should be added.
595 files : `lsst.ctrl.bps.GenericWorkflowFile` or \
596 `list` [`lsst.ctrl.bps.GenericWorkflowFile`]
597 File object(s) to be added as inputs to the specified job.
598 """
599 self._inputs.setdefault(job_name, [])
600 for file in ensure_iterable(files):
601 # Save the central copy
602 if file.name not in self._files:
603 self._files[file.name] = file
605 # Save the job reference to the file
606 self._inputs[job_name].append(file)
    def get_file(self, name: str) -> GenericWorkflowFile:
        """Retrieve a file object by name.

        Parameters
        ----------
        name : `str`
            Name of file object.

        Returns
        -------
        gwfile : `lsst.ctrl.bps.GenericWorkflowFile`
            File matching given name.
        """
        # Lookup in the central file table; raises KeyError if absent.
        return self._files[name]
623 def add_file(self, gwfile: GenericWorkflowFile) -> None:
624 """Add file object.
626 Parameters
627 ----------
628 gwfile : `lsst.ctrl.bps.GenericWorkflowFile`
629 File object to add to workflow.
630 """
631 if gwfile.name not in self._files:
632 self._files[gwfile.name] = gwfile
633 else:
634 _LOG.debug("Skipped add_file for existing file %s", gwfile.name)
636 @overload
637 def get_job_inputs( 637 ↛ exitline 637 didn't return from function 'get_job_inputs' because
638 self, job_name: str, data: Literal[False], transfer_only: bool = False
639 ) -> list[str]: ...
641 @overload
642 def get_job_inputs( 642 ↛ exitline 642 didn't return from function 'get_job_inputs' because
643 self, job_name: str, data: Literal[True], transfer_only: bool = False
644 ) -> list[GenericWorkflowFile]: ...
646 def get_job_inputs(
647 self, job_name: str, data: bool = True, transfer_only: bool = False
648 ) -> list[GenericWorkflowFile] | list[str]:
649 """Return the input files for the given job.
651 Parameters
652 ----------
653 job_name : `str`
654 Name of the job.
655 data : `bool`, optional
656 Whether to return the file data as well as the file object name.
657 transfer_only : `bool`, optional
658 Whether to only return files for which a workflow management system
659 would be responsible for transferring.
661 Returns
662 -------
663 inputs : `list` [`lsst.ctrl.bps.GenericWorkflowFile`] or `list` [`str`]
664 Input files for the given job. If no input files for the job,
665 returns an empty list.
666 """
667 inputs: list[Any] = [] # Any for mypy to allow the different append lines.
668 if job_name in self._inputs:
669 for gwfile in self._inputs[job_name]:
670 if not transfer_only or gwfile.wms_transfer:
671 if not data:
672 inputs.append(gwfile.name)
673 else:
674 inputs.append(gwfile)
675 return inputs
677 def add_job_outputs(self, job_name: str, files: list[GenericWorkflowFile]) -> None:
678 """Add output files to a job.
680 Parameters
681 ----------
682 job_name : `str`
683 Name of job to which the files should be added as outputs.
684 files : `list` [`lsst.ctrl.bps.GenericWorkflowFile`]
685 File objects to be added as outputs for specified job.
686 """
687 self._outputs.setdefault(job_name, [])
689 for file_ in ensure_iterable(files):
690 # Save the central copy
691 if file_.name not in self._files:
692 self._files[file_.name] = file_
694 # Save the job reference to the file
695 self._outputs[job_name].append(file_)
697 @overload
698 def get_job_outputs( 698 ↛ exitline 698 didn't return from function 'get_job_outputs' because
699 self, job_name: str, data: Literal[False], transfer_only: bool = False
700 ) -> list[str]: ...
702 @overload
703 def get_job_outputs( 703 ↛ exitline 703 didn't return from function 'get_job_outputs' because
704 self, job_name: str, data: Literal[True], transfer_only: bool = False
705 ) -> list[GenericWorkflowFile]: ...
707 def get_job_outputs(
708 self, job_name: str, data: bool = True, transfer_only: bool = False
709 ) -> list[GenericWorkflowFile] | list[str]:
710 """Return the output files for the given job.
712 Parameters
713 ----------
714 job_name : `str`
715 Name of the job.
716 data : `bool`
717 Whether to return the file data as well as the file object name.
718 It defaults to `True` thus returning file data as well.
719 transfer_only : `bool`
720 Whether to only return files for which a workflow management system
721 would be responsible for transferring. It defaults to `False` thus
722 returning all output files.
724 Returns
725 -------
726 outputs : `list` [`lsst.ctrl.bps.GenericWorkflowFile`] or \
727 `list` [`str`]
728 Output files for the given job. If no output files for the job,
729 returns an empty list.
730 """
731 outputs: list[Any] = [] # Any for mypy to allow the different append lines.
732 if not data:
733 outputs = cast(list[str], outputs)
734 else:
735 outputs = cast(list[GenericWorkflowFile], outputs)
737 if job_name in self._outputs:
738 for gwfile in self._outputs[job_name]:
739 if not transfer_only or gwfile.wms_transfer:
740 if not data:
741 outputs.append(gwfile.name)
742 else:
743 outputs.append(gwfile)
744 return outputs
746 def draw(self, stream: str | IO[str], format_: str = "dot") -> None:
747 """Output generic workflow in a visualization format.
749 Parameters
750 ----------
751 stream : `str` or `io.BufferedIOBase`
752 Stream to which the visualization should be written.
753 format_ : `str`, optional
754 Which visualization format to use. It defaults to the format for
755 the dot program.
756 """
757 draw_funcs = {"dot": draw_networkx_dot}
758 if format_ in draw_funcs:
759 draw_funcs[format_](self, stream)
760 else:
761 raise RuntimeError(f"Unknown draw format ({format_})")
763 def save(self, stream: str | IO[bytes], format_: str = "pickle") -> None:
764 """Save the generic workflow in a format that is loadable.
766 Parameters
767 ----------
768 stream : `str` or `io.BufferedIOBase`
769 Stream to pass to the format-specific writer. Accepts anything
770 that the writer accepts.
771 format_ : `str`, optional
772 Format in which to write the data. It defaults to pickle format.
773 """
774 if format_ == "pickle":
775 stream = cast(BinaryIO, stream)
776 pickle.dump(self, stream)
777 else:
778 raise RuntimeError(f"Unknown format ({format_})")
780 @classmethod
781 def load(cls, stream: str | IO[bytes], format_: str = "pickle") -> "GenericWorkflow":
782 """Load a GenericWorkflow from the given stream.
784 Parameters
785 ----------
786 stream : `str` or `io.BufferedIOBase`
787 Stream to pass to the format-specific loader. Accepts anything that
788 the loader accepts.
789 format_ : `str`, optional
790 Format of data to expect when loading from stream. It defaults
791 to pickle format.
793 Returns
794 -------
795 generic_workflow : `lsst.ctrl.bps.GenericWorkflow`
796 Generic workflow loaded from the given stream.
797 """
798 if format_ == "pickle":
799 stream = cast(BinaryIO, stream)
800 object_ = pickle.load(stream)
801 assert isinstance(object_, GenericWorkflow) # for mypy
802 return object_
804 raise RuntimeError(f"Unknown format ({format_})")
806 def validate(self) -> None:
807 """Run checks to ensure that the generic workflow graph is valid."""
808 # Make sure a directed acyclic graph
809 assert is_directed_acyclic_graph(self)
    def add_workflow_source(self, workflow: "GenericWorkflow") -> None:
        """Add given workflow as new source to this workflow.

        Every sink of the given workflow becomes a parent of every current
        source of this workflow, and the given workflow's separately stored
        labels, executables, and input/output files are copied over.

        Parameters
        ----------
        workflow : `lsst.ctrl.bps.GenericWorkflow`
            The given workflow.
        """
        # Find source nodes in self.
        self_sources = [n for n in self if self.in_degree(n) == 0]
        _LOG.debug("self_sources = %s", self_sources)

        # Find sink nodes of workflow.
        new_sinks = [n for n in workflow if workflow.out_degree(n) == 0]
        _LOG.debug("new sinks = %s", new_sinks)

        # Add new workflow nodes to self graph and make new edges.
        self.add_nodes_from(workflow.nodes(data=True))
        self.add_edges_from(workflow.edges())
        # Each new sink points at each old source, making workflow run first.
        for source in self_sources:
            for sink in new_sinks:
                self.add_edge(sink, source)

        # Add separately stored info
        for job_name in workflow:
            job = self.get_job(job_name)
            # Add job labels
            if isinstance(job, GenericWorkflowJob):
                self._job_labels.add_job(
                    job,
                    [self.get_job(p).label for p in self.predecessors(job.name)],
                    [self.get_job(p).label for p in self.successors(job.name)],
                )
                # Executables are stored separately so copy them.
                self.add_executable(job.executable)

            # Files are stored separately so copy them.
            self.add_job_inputs(job_name, workflow.get_job_inputs(job_name, data=True))
            self.add_job_outputs(job_name, workflow.get_job_outputs(job_name, data=True))
851 def add_final(self, final: "GenericWorkflowJob | GenericWorkflow") -> None:
852 """Add special final job/workflow to the generic workflow.
854 Parameters
855 ----------
856 final : `lsst.ctrl.bps.GenericWorkflowJob` or \
857 `lsst.ctrl.bps.GenericWorkflow`
858 Information needed to execute the special final job(s), the
859 job(s) to be executed after all jobs that can be executed
860 have been executed regardless of exit status of any of the
861 jobs.
862 """
863 if not isinstance(final, GenericWorkflowJob) and not isinstance(final, GenericWorkflow):
864 raise TypeError("Invalid type for GenericWorkflow final ({type(final)})")
866 self._final = final
867 if isinstance(final, GenericWorkflowJob):
868 self.add_executable(final.executable)
    def get_final(self) -> "GenericWorkflowJob | GenericWorkflow | None":
        """Return job/workflow to be executed after all jobs that can be
        executed have been executed regardless of exit status of any of
        the jobs.

        Returns
        -------
        final : `lsst.ctrl.bps.GenericWorkflowJob` or \
                `lsst.ctrl.bps.GenericWorkflow` or `None`
            Information needed to execute final job(s); `None` when no
            final job/workflow has been set via add_final.
        """
        return self._final
883 def add_executable(self, executable: GenericWorkflowExec | None) -> None:
884 """Add executable to workflow's list of executables.
886 Parameters
887 ----------
888 executable : `lsst.ctrl.bps.GenericWorkflowExec`
889 Executable object to be added to workflow.
890 """
891 if executable is not None:
892 self._executables[executable.name] = executable
893 else:
894 _LOG.warning("executable not specified (None); cannot add to the workflow's list of executables")
896 @overload
897 def get_executables(self, data: Literal[False], transfer_only: bool = False) -> list[str]: ... 897 ↛ exitline 897 didn't return from function 'get_executables' because
899 @overload
900 def get_executables( 900 ↛ exitline 900 didn't return from function 'get_executables' because
901 self, data: Literal[True], transfer_only: bool = False
902 ) -> list[GenericWorkflowExec]: ...
904 def get_executables(
905 self, data: bool = False, transfer_only: bool = True
906 ) -> list[GenericWorkflowExec] | list[str]:
907 """Retrieve executables from generic workflow.
909 Parameters
910 ----------
911 data : `bool`, optional
912 Whether to return the executable data as well as the exec object
913 name (The defaults is False).
914 transfer_only : `bool`, optional
915 Whether to only return executables for which transfer_executable
916 is True.
918 Returns
919 -------
920 execs : `list` [`lsst.ctrl.bps.GenericWorkflowExec`] or `list` [`str`]
921 Filtered executable names or objects from generic workflow.
922 """
923 execs: list[Any] = [] # This and cast lines for mypy
924 if not data:
925 execs = cast(list[str], execs)
926 else:
927 execs = cast(list[GenericWorkflowExec], execs)
929 for name, executable in self._executables.items():
930 if not transfer_only or executable.transfer_executable:
931 if not data:
932 execs.append(name)
933 else:
934 execs.append(executable)
935 return execs
    def get_jobs_by_label(self, label: str) -> list[GenericWorkflowJob]:
        """Retrieve jobs by label from workflow.

        Parameters
        ----------
        label : `str`
            Label of jobs to retrieve.

        Returns
        -------
        jobs : list[`lsst.ctrl.bps.GenericWorkflowJob`]
            Jobs having given label.
        """
        # Delegates to the GenericWorkflowLabels bookkeeping object.
        return self._job_labels.get_jobs_by_label(label)
    def _check_job_ordering_config(self, ordering_config: dict[str, Any]) -> dict[str, DiGraph]:
        """Check configuration related to job ordering.

        Parameters
        ----------
        ordering_config : `dict` [`str`, `~typing.Any`]
            Job ordering configuration to check.

        Returns
        -------
        group_to_label_subgraph : `dict` [`str`, `network.DiGraph`]
            Mapping of group name to a graph of the job labels in the group.

        Raises
        ------
        RuntimeError
            If a group has an invalid implementation or ordering_type, or
            if a job label appears in more than one group.
        KeyError
            If a group is missing its dimensions entry.
        """
        group_to_label_subgraph = {}
        job_label_to_group: dict[str, str] = {}  # Checking label appears only in one group
        for group, group_vals in ordering_config.items():
            # Validate the per-group settings against the supported values.
            implementation = group_vals.get("implementation", "group")
            if implementation not in ["noop", "group"]:
                raise RuntimeError(f"Invalid implementation for {group}: {implementation}")
            ordering_type = group_vals.get("ordering_type", "sort")
            if ordering_type != "sort":
                raise RuntimeError(f"Invalid ordering_type for {group}: {ordering_type}")
            if "dimensions" not in group_vals:
                raise KeyError(f"Missing dimensions entry in ordering group {group}")

            # Labels are given as a comma-separated string in the config.
            job_labels = [x.strip() for x in group_vals["labels"].split(",")]
            _LOG.debug("group %s: job_labels=%s", group, job_labels)
            unused_labels = []
            for job_label in job_labels:
                if job_label not in self._job_labels.labels:
                    # Unknown labels are collected and warned about below
                    # rather than treated as an error.
                    unused_labels.append(job_label)
                elif job_label in job_label_to_group:
                    raise RuntimeError(
                        f"Job label {job_label} appears in more than one job ordering group "
                        f"({group} {job_label_to_group[job_label]})"
                    )
                else:
                    job_label_to_group[job_label] = group

            if unused_labels:
                _LOG.info("Workflow job labels = %s", ",".join(self._job_labels.labels))
                _LOG.warning(
                    "Job label(s) (%s) from job ordering group %s does not exist in workflow.",
                    ",".join(unused_labels),
                    group,
                )

            label_subgraph = self._job_labels.subgraph(job_labels)
            group_to_label_subgraph[group] = label_subgraph

        return group_to_label_subgraph
1004 def _group_jobs_by_values(
1005 self, group: str, group_config: dict[str, Any], label_subgraph: DiGraph
1006 ) -> dict[tuple[Any, ...], list[str]]:
1007 """Create job mapping of special sortable dimension key
1008 to job name by comparing dimension values.
1010 Parameters
1011 ----------
1012 group : `str`
1013 Name of group for which creating mapping.
1014 group_config : `dict` [`str`, `~typing.Any`]
1015 Config for group for which creating mapping.
1016 label_subgraph : `networkx.DiGraph`
1017 The graph of job labels to be used in mapping.
1019 Returns
1020 -------
1021 dims_to_jobs : `dict` [`tuple` [`~typing.Any`, ...], `list` [`str`]]
1022 Mapping of dimensions to job names.
1023 """
1024 dims_to_jobs: dict[tuple[Any, ...], list[str]] = {}
1025 for job_label in label_subgraph:
1026 jobs = self.get_jobs_by_label(job_label)
1027 for job in jobs:
1028 job_dim_values = subset_dimension_values(
1029 f"Job {job.name}",
1030 f"order group {group}",
1031 group_config["dimensions"],
1032 job.tags,
1033 group_config.get("equalDimensions", None),
1034 )
1035 job_dims = []
1036 for dim in [d.strip() for d in group_config["dimensions"].split(",")]:
1037 job_dims.append(job_dim_values[dim])
1038 dims_job_list = dims_to_jobs.setdefault(tuple(job_dims), [])
1039 dims_job_list.append(job.name)
1040 return dims_to_jobs
    def _group_jobs_by_dependencies(
        self, group: str, group_config: dict[str, Any], label_subgraph: DiGraph
    ) -> dict[tuple[Any, ...], list[str]]:
        """Create job mapping of special sortable dimension key
        to job name by following dependencies.

        Parameters
        ----------
        group : `str`
            Name of group for which creating mapping.
        group_config : `dict` [`str`, `~typing.Any`]
            Config for group for which creating mapping.  Must contain
            ``findDependencyMethod`` ("source" or "sink") and ``dimensions``;
            may contain ``equalDimensions``.
        label_subgraph : `networkx.DiGraph`
            The graph of job labels to be used in mapping.

        Returns
        -------
        dims_to_jobs : `dict` [`tuple` [`~typing.Any`, ...], `list` [`str`]]
            Mapping of dimensions to job names.

        Raises
        ------
        RuntimeError
            If ``findDependencyMethod`` is neither "source" nor "sink".
        """
        method = group_config["findDependencyMethod"]
        # Visit labels in dependency order so the jobs that seed each
        # dimension key are processed before their dependents.
        dim_labels = list(topological_sort(label_subgraph))
        match method:
            case "source":
                # Seed from upstream jobs and sweep downstream.
                find_potential_jobs = self.successors
            case "sink":
                # Seed from downstream jobs and sweep upstream, so walk
                # the labels in reverse topological order.
                find_potential_jobs = self.predecessors
                dim_labels.reverse()
            case _:
                raise RuntimeError(f"Invalid findDependencyMethod ({method})")

        jobs_seen: set[str] = set()  # Jobs already assigned to some key.
        dims_to_jobs: dict[tuple[Any, ...], list[str]] = {}
        for label in dim_labels:
            jobs = self.get_jobs_by_label(label)
            for job in jobs:
                if job.name not in jobs_seen:
                    jobs_seen.add(job.name)
                    job_dim_values = subset_dimension_values(
                        f"Job {job.name}",
                        f"order group {group}",
                        group_config["dimensions"],
                        job.tags,
                        group_config.get("equalDimensions", None),
                    )
                    # Key is this seed job's values for the configured
                    # dimensions; dependent jobs inherit the same key.
                    job_dims = []
                    for dim in [d.strip() for d in group_config["dimensions"].split(",")]:
                        job_dims.append(job_dim_values[dim])
                    dims_job_list = dims_to_jobs.setdefault(tuple(job_dims), [])
                    dims_job_list.append(job.name)
                    # Use dependencies to find other quantum to add
                    # Note: in testing, using the following code was faster
                    # than using networkx descendants and ancestors functions
                    # While traversing the subgraph, nodes may appear
                    # repeatedly in potential_jobs.
                    jobs_to_use = [job]
                    while jobs_to_use:
                        job_to_use = jobs_to_use.pop()

                        potential_job_names = find_potential_jobs(job_to_use.name)
                        for potential_job_name in potential_job_names:
                            potential_job = cast(GenericWorkflowJob, self.get_job(potential_job_name))
                            if potential_job.label in label_subgraph:
                                # Membership in dims_job_list guards against
                                # re-adding a job reached via multiple paths.
                                if potential_job.name not in dims_job_list:
                                    _LOG.debug(
                                        "Adding potential job %s (%s) to group %s",
                                        potential_job.name,
                                        potential_job.label,
                                        group,
                                    )
                                    dims_job_list.append(potential_job.name)
                                    jobs_to_use.append(potential_job)
                                    jobs_seen.add(potential_job.name)
                            else:
                                _LOG.debug(
                                    "label (%s) not in ordered_tasks. Not adding potential quantum %s",
                                    potential_job.label,
                                    potential_job.name,
                                )
        return dims_to_jobs
1123 def _update_by_group_sort(
1124 self,
1125 group: str,
1126 dims_to_jobs: dict[tuple[Any, ...], list[str]],
1127 blocking: bool = False,
1128 ) -> None:
1129 """Update portion of workflow for special job ordering using sort.
1131 Parameters
1132 ----------
1133 group : `str`
1134 Ordering group label used for job name and messages.
1135 dims_to_jobs: `dict` [`tuple` [`~typing.Any`, ...], `list` [`str`]]
1136 Mapping of special dimension keys to workflow job names.
1137 The sort for the special ordering is over the keys.
1138 blocking: `bool`
1139 Whether a failure in a group blocks execution of remaining groups.
1140 """
1141 group_job_names: list[str] = []
1142 for dim_key, job_list in sorted(dims_to_jobs.items()):
1143 _LOG.debug("group %s: dim_key=%s", group, dim_key)
1144 group_job_name = f"group_{group}_{'=='.join([str(dk) for dk in dim_key])}"
1145 prev_name = group_job_names[-1] if group_job_names else None
1146 group_job_names.append(group_job_name)
1148 self._replace_subgraph_with_job_group(group_job_name, group, job_list, blocking)
1150 # ordering between groups
1151 if prev_name:
1152 self.add_edge(prev_name, group_job_name)
    def _replace_subgraph_with_job_group(
        self, group_name: str, group_label: str, job_list: list[str], blocking: bool
    ) -> None:
        """Update portion of workflow for special job ordering using groups.

        Replaces the jobs named in ``job_list`` with a single
        `GenericWorkflowGroup` node containing them; edges crossing the
        group boundary are reattached to the group node.

        Parameters
        ----------
        group_name : `str`
            Ordering group name.
        group_label : `str`
            Ordering group label.
        job_list : `list` [`str`]
            List of job names to put in the group.
        blocking : `bool`
            Whether a failure in a group blocks execution of remaining groups.
        """
        job_group = GenericWorkflowGroup(group_name, group_label, blocking=blocking)
        self.add_node(job_group)

        # Add jobs, files, and executables first
        # then add edges later to avoid order issues
        for job_name in job_list:
            job = cast(GenericWorkflowJob, self.get_job(job_name))
            job_group.add_job(job)
            files = self.get_job_inputs(job_name, data=True)
            job_group.add_job_inputs(job_name, files)
            files = self.get_job_outputs(job_name, data=True)
            job_group.add_job_outputs(job_name, files)
            job_group.add_executable(job.executable)

        # Can't remove edges while looping through edge view,
        # so save to remove after loops
        edges_to_remove: list[tuple[str, str]] = []
        for job_name in job_list:
            in_edges = self.in_edges(job_name)
            for u, v in in_edges:
                if u in job_list:
                    # Edge internal to the group: keep it in the group graph.
                    job_group.add_edge(u, v)
                else:
                    # Incoming boundary edge: reattach to the group node.
                    self.add_edge(u, group_name)
                    edges_to_remove.append((u, v))

            out_edges = self.out_edges(job_name)
            for u, v in out_edges:
                if v in job_list:
                    # Internal edge (also seen from the other endpoint's
                    # in_edges); re-adding an existing edge is harmless.
                    job_group.add_edge(u, v)
                else:
                    # Outgoing boundary edge: reattach from the group node.
                    self.add_edge(group_name, v)
                    edges_to_remove.append((u, v))

        # Remove edges collected earlier
        self.remove_edges_from(edges_to_remove)

        # Remove nodes from main GenericWorkflow
        self.remove_nodes_from(job_list)
1210 def add_special_job_ordering(self, ordering: dict[str, Any]) -> None:
1211 """Add special nodes and dependencies to enforce given ordering.
1213 Parameters
1214 ----------
1215 ordering : `dict` [`str`, `~typing.Any`]
1216 Description of the job ordering to enforce.
1217 """
1218 group_to_label_subgraph = self._check_job_ordering_config(ordering)
1220 for group, group_vals in ordering.items():
1221 if "findDependencyMethod" in group_vals:
1222 job_grouping_func = self._group_jobs_by_dependencies
1223 else:
1224 job_grouping_func = self._group_jobs_by_values
1226 dims = [x.strip() for x in group_vals["dimensions"].split(",")]
1227 _LOG.debug("group %s: dims=%s", group, dims)
1228 job_groups = job_grouping_func(group, group_vals, group_to_label_subgraph[group])
1230 # Update the workflow
1231 implementation = group_vals.get("implementation", "group")
1232 ordering_type = group_vals.get("ordering_type", "sort")
1233 match (implementation, ordering_type):
1234 case ("noop", "sort"):
1235 self._update_by_noop_sort(group, job_groups)
1236 case ("group", "sort"):
1237 blocking = group_vals.get("blocking", False)
1238 self._update_by_group_sort(group, job_groups, blocking)
1239 case _:
1240 raise RuntimeError(
1241 f"Invalid implementation, ordering_type pair for group ({implementation},"
1242 f" {ordering_type})"
1243 )
1245 def _update_by_noop_sort(self, group: str, dims_to_jobs: dict[tuple[Any, ...], list[str]]) -> None:
1246 """Update portion of workflow for special ordering of jobs using sort.
1248 Parameters
1249 ----------
1250 group : `str`
1251 Ordering group label used for job name and messages.
1252 dims_to_jobs: `dict` [`tuple`[`str`,...], `list` [`str`]]
1253 Mapping of special dimension keys to workflow job names.
1254 The sort for the special ordering is over the keys.
1255 """
1256 noop_names: list[str] = []
1257 prev_name: str | None = None
1258 for dim_key, job_list in sorted(dims_to_jobs.items()):
1259 _LOG.debug("group %s: dim_key=%s", group, dim_key)
1260 noop_name = f"noop_{group}_{'=='.join([str(dk) for dk in dim_key])}"
1261 if noop_names:
1262 prev_name = noop_names[-1]
1263 noop_names.append(noop_name)
1265 self._update_single_noop(job_list, noop_name, group, prev_name)
1267 # As implemented, loop adds one last NOOP job with
1268 # 0 children. Remove it as not useful.
1269 assert self.out_degree(noop_names[-1]) == 0
1270 self.remove_node(noop_names[-1])
1272 def _update_single_noop(
1273 self, job_list: list[str], order_node_name: str, order_label: str, prev_order_name: str | None
1274 ) -> None:
1275 """Update the workflow around the single job making special NOOP
1276 jobs when necessary.
1278 Parameters
1279 ----------
1280 job_list : `list` [`str`]
1281 Current jobs involved in special ordering
1282 order_node_name : `str`
1283 Name for the order NOOP job. If it does not exist in workflow, a
1284 new NOOP job with this name will be created and added.
1285 order_label : `str`
1286 Label for the order NOOP job.
1287 prev_order_name: `str` or None
1288 Name of the previous order NOOP job used to add edge as predecessor
1289 of current job.
1290 """
1291 if order_node_name not in self:
1292 _LOG.debug("Adding new ordering node %s", order_node_name)
1293 order_node = GenericWorkflowNoopJob(order_node_name, order_label)
1294 self.add_job(order_node)
1296 subgraph = DiGraph(self).subgraph(job_list)
1297 sinks = [n for n in job_list if subgraph.out_degree(n) == 0]
1298 for job_name in sinks:
1299 self.add_edge(job_name, order_node_name)
1301 if prev_order_name:
1302 sources = [n for n in job_list if subgraph.in_degree(n) == 0]
1303 for job_name in sources:
1304 _LOG.debug("Adding edge %s to %s", prev_order_name, job_name)
1305 self.add_edge(prev_order_name, job_name)
@dataclasses.dataclass(slots=True)
class GenericWorkflowGroup(GenericWorkflowNode, GenericWorkflow):
    """Node representing a group of jobs. Used for special dependencies.

    Parameters
    ----------
    name : `str`
        Name of node. Must be unique within workflow.
    label : `str`
        Primary user-facing label for job. Does not need to be unique and
        may be used for summary reports or to group nodes.
    blocking : `bool`
        Whether a failure inside group prunes executions of remaining groups.
    """

    blocking: bool = False
    """Whether a failure inside group prunes executions of remaining groups."""

    @property
    def node_type(self) -> GenericWorkflowNodeType:
        """Indicate this is a group job."""
        return GenericWorkflowNodeType.GROUP

    def __init__(self, name: str, label: str, blocking: bool = False) -> None:
        """Initialize each parent class."""
        # dataclass does not generate __init__ when the class defines one,
        # so this constructor controls initialization.  Each base is called
        # directly (not via super()) because the two parents take different
        # constructor arguments.
        _LOG.debug("%s %s %s", name, label, blocking)
        GenericWorkflowNode.__init__(self, name, label)
        # NOTE(review): the group's name is reused as the embedded
        # workflow's name — confirm that is intended.
        GenericWorkflow.__init__(self, name)
        self.blocking = blocking
class GenericWorkflowLabels:
    """Label-oriented representation of the GenericWorkflowJobs."""

    def __init__(self) -> None:
        # Dependency DAG whose nodes are job labels.
        self._label_graph = DiGraph()
        # Job label -> list of GenericWorkflowJob objects with that label.
        self._label_to_jobs: defaultdict[str, list[GenericWorkflowJob]] = defaultdict(list)
1348 @property
1349 def labels(self) -> list[str]:
1350 """List of job labels (`list` [`str`], read-only)."""
1351 return list(topological_sort(self._label_graph))
1353 @property
1354 def job_counts(self) -> Counter[str]:
1355 """Count of jobs per job label (`collections.Counter`)."""
1356 return Counter({label: len(self._label_to_jobs[label]) for label in self.labels})
1358 def get_jobs_by_label(self, label: str) -> list[GenericWorkflowJob]:
1359 """Retrieve jobs by label from workflow.
1361 Parameters
1362 ----------
1363 label : `str`
1364 Label of jobs to retrieve.
1366 Returns
1367 -------
1368 jobs : list[`lsst.ctrl.bps.GenericWorkflowJob`]
1369 Jobs having given label.
1370 """
1371 return self._label_to_jobs[label]
1373 def add_job(self, job: GenericWorkflowJob, parent_labels: list[str], child_labels: list[str]) -> None:
1374 """Add job's label to labels.
1376 Parameters
1377 ----------
1378 job : `lsst.ctrl.bps.GenericWorkflowJob`
1379 The job to add to the job labels.
1380 parent_labels : `list` [`str`]
1381 Parent job labels.
1382 child_labels : `list` [`str`]
1383 Children job labels.
1384 """
1385 _LOG.debug("job: %s (%s)", job.name, job.label)
1386 _LOG.debug("parent_labels: %s", parent_labels)
1387 _LOG.debug("child_labels: %s", child_labels)
1388 self._label_to_jobs[job.label].append(job)
1389 self._label_graph.add_node(job.label)
1390 for parent in parent_labels:
1391 self._label_graph.add_edge(parent, job.label)
1392 for child in child_labels:
1393 self._label_graph.add_edge(job.label, child)
1395 def add_job_relationships(self, parent_labels: list[str], children_labels: list[str]) -> None:
1396 """Add dependencies between parent and child job labels.
1397 All parents will be connected to all children.
1399 Parameters
1400 ----------
1401 parent_labels : `list` [`str`]
1402 Parent job labels.
1403 children_labels : `list` [`str`]
1404 Children job labels.
1405 """
1406 # Since labels, must ensure not adding edge from label to itself.
1407 edges = [
1408 e
1409 for e in itertools.product(ensure_iterable(parent_labels), ensure_iterable(children_labels))
1410 if e[0] != e[1]
1411 ]
1413 self._label_graph.add_edges_from(edges)
    def del_job(self, job: GenericWorkflowJob) -> None:
        """Delete job and its label from job labels.

        Parameters
        ----------
        job : `lsst.ctrl.bps.GenericWorkflowJob`
            The job to delete from the job labels.
        """
        self._label_to_jobs[job.label].remove(job)
        # Don't leave keys around if removed last job
        if not self._label_to_jobs[job.label]:
            del self._label_to_jobs[job.label]

            # The label node is dropped from the label graph only when its
            # last job is gone; reconnect every parent label to every child
            # label so indirect ordering between them is preserved.
            parents = self._label_graph.predecessors(job.label)
            children = self._label_graph.successors(job.label)
            self._label_graph.remove_node(job.label)
            self._label_graph.add_edges_from(
                itertools.product(ensure_iterable(parents), ensure_iterable(children))
            )
1435 def subgraph(self, labels: Iterable[str]) -> DiGraph:
1436 """Create subgraph of workflow label graph with given labels.
1438 Parameters
1439 ----------
1440 labels : Iterable [`str`]
1441 Labels to appear in subgraph.
1443 Returns
1444 -------
1445 subgraph : `networkx.DiGraph`
1446 Subgraph of workflow label graph with given labels.
1447 """
1448 return self._label_graph.subgraph(labels)