Coverage for python / lsst / ctrl / bps / generic_workflow.py: 28%
555 statements
« prev ^ index » next coverage.py v7.13.5, created at 2026-04-14 23:52 +0000
1# This file is part of ctrl_bps.
2#
3# Developed for the LSST Data Management System.
4# This product includes software developed by the LSST Project
5# (https://www.lsst.org).
6# See the COPYRIGHT file at the top-level directory of this distribution
7# for details of code ownership.
8#
9# This software is dual licensed under the GNU General Public License and also
10# under a 3-clause BSD license. Recipients may choose which of these licenses
11# to use; please see the files gpl-3.0.txt and/or bsd_license.txt,
12# respectively. If you choose the GPL option then the following text applies
13# (but note that there is still no warranty even if you opt for BSD instead):
14#
15# This program is free software: you can redistribute it and/or modify
16# it under the terms of the GNU General Public License as published by
17# the Free Software Foundation, either version 3 of the License, or
18# (at your option) any later version.
19#
20# This program is distributed in the hope that it will be useful,
21# but WITHOUT ANY WARRANTY; without even the implied warranty of
22# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
23# GNU General Public License for more details.
24#
25# You should have received a copy of the GNU General Public License
26# along with this program. If not, see <https://www.gnu.org/licenses/>.
28"""Class definitions for a Generic Workflow Graph."""
# Public API of this module (kept alphabetized).
__all__ = [
    "GenericWorkflow",
    "GenericWorkflowExec",
    "GenericWorkflowFile",
    "GenericWorkflowGroup",
    "GenericWorkflowJob",
    "GenericWorkflowNode",
    "GenericWorkflowNodeType",
    "GenericWorkflowNoopJob",
]
42import dataclasses
43import itertools
44import logging
45import pickle
46from collections import Counter, defaultdict
47from collections.abc import Iterable, Iterator
48from enum import IntEnum, auto
49from typing import IO, Any, BinaryIO, Literal, cast, overload
51from networkx import DiGraph, topological_sort
52from networkx.algorithms.dag import is_directed_acyclic_graph
54from lsst.utils.iteration import ensure_iterable
56from .bps_draw import draw_networkx_dot
57from .bps_utils import subset_dimension_values
59_LOG = logging.getLogger(__name__)
@dataclasses.dataclass(slots=True)
class GenericWorkflowFile:
    """Information about a file that may be needed by various workflow
    management services.
    """

    name: str
    """Lookup key (logical file name) of file/directory. Must be unique
    within run.
    """

    src_uri: str | None = None  # don't know that need ResourcePath
    """Original location of file/directory.
    """

    # NOTE(review): the attribute name suggests "WMS should transfer this
    # file", but the docstring says "ignore" — confirm intended polarity.
    wms_transfer: bool = False
    """Whether the WMS should ignore file or not. Default is False.
    """

    job_access_remote: bool = False
    """Whether the job can remotely access file (using separately specified
    file access protocols). Default is False.
    """

    job_shared: bool = False
    """Whether job requires its own copy of this file. Default is False.
    """

    def __hash__(self) -> int:
        # Files are identified solely by logical name within a run, so the
        # hash deliberately ignores the other fields.
        return hash(self.name)
@dataclasses.dataclass(slots=True)
class GenericWorkflowExec:
    """Information about an executable that may be needed by various workflow
    management services.
    """

    name: str
    """Lookup key (logical file name) of executable. Must be unique
    within run.
    """

    src_uri: str | None = None  # don't know that need ResourcePath
    """Original location of executable.
    """

    transfer_executable: bool = False
    """Whether the WMS/plugin is responsible for staging executable to
    location usable by job.
    """

    def __hash__(self) -> int:
        # Executables are keyed solely by name in the workflow's store, so
        # the hash deliberately ignores the other fields.
        return hash(self.name)
class GenericWorkflowNodeType(IntEnum):
    """Type of valid types for nodes in the GenericWorkflow."""

    NOOP = auto()
    """Does nothing, but enforces special dependencies."""

    PAYLOAD = auto()
    """Typical workflow job."""

    GROUP = auto()
    """A special group (subdag) of jobs."""
@dataclasses.dataclass(slots=True)
class GenericWorkflowNode:
    """Base class for nodes in the GenericWorkflow."""

    name: str
    """Name of node. Must be unique within workflow."""

    label: str
    """Primary user-facing label for job. Does not need to be unique and
    may be used for summary reports or to group nodes."""

    def __hash__(self) -> int:
        # Nodes are uniquely identified by name within a workflow.
        return hash(self.name)

    @property
    def node_type(self) -> GenericWorkflowNodeType:
        """Type of node."""
        # Subclasses must say what kind of node they are; there is no
        # sensible default.
        raise NotImplementedError(f"{type(self).__name__} needs to override node_type.")
@dataclasses.dataclass(slots=True)
class GenericWorkflowNoopJob(GenericWorkflowNode):
    """Job that does no work. Used for special dependencies."""

    @property
    def node_type(self) -> GenericWorkflowNodeType:
        """Indicate this is a noop job."""
        return GenericWorkflowNodeType.NOOP
@dataclasses.dataclass(slots=True)
class GenericWorkflowJob(GenericWorkflowNode):
    """Information about a job that may be needed by various workflow
    management services.
    """

    quanta_counts: Counter[str] = dataclasses.field(default_factory=Counter)
    """Counts of quanta per task label in job.
    """

    tags: dict[str, Any] = dataclasses.field(default_factory=dict)
    """Other key/value pairs for job that user may want to use as a filter.
    """

    executable: GenericWorkflowExec | None = None
    """Executable for job.
    """

    arguments: str | None = None
    """Command line arguments for job.
    """

    cmdvals: dict[str, Any] = dataclasses.field(default_factory=dict)
    """Values for variables in cmdline when using lazy command line creation.
    """

    memory_multiplier: float | None = None
    """Memory growth rate between retries.
    """

    request_memory: int | None = None  # MB
    """Max memory (in MB) that the job is expected to need.
    """

    request_memory_max: int | None = None  # MB
    """Max memory (in MB) that the job should ever use.
    """

    request_cpus: int | None = None  # cores
    """Max number of cpus that the job is expected to need.
    """

    request_disk: int | None = None  # MB
    """Max amount of job scratch disk (in MB) that the job is expected to need.
    """

    # NOTE(review): the inline comment said "minutes" but the docstring says
    # "seconds" — confirm the intended unit with callers/plugins.
    request_walltime: str | None = None  # minutes
    """Max amount of time (in seconds) that the job is expected to need.
    """

    compute_site: str | None = None
    """Key to look up site-specific information for running the job.
    """

    accounting_group: str | None = None
    """Name of the accounting group to use.
    """

    accounting_user: str | None = None
    """Name of the user to use for accounting purposes.
    """

    mail_to: str | None = None
    """Comma separated list of email addresses for emailing job status.
    """

    when_to_mail: str | None = None
    """WMS-specific terminology for when to email job status.
    """

    number_of_retries: int | None = None
    """Number of times to automatically retry a failed job.
    """

    retry_unless_exit: int | list[int] | None = None
    """Exit code(s) for job that means to not automatically retry.
    """

    abort_on_value: int | None = None
    """Job exit value for signals to abort the entire workflow.
    """

    abort_return_value: int | None = None
    """Exit value to use when aborting the entire workflow.
    """

    priority: str | None = None
    """Initial priority of job in WMS-format.
    """

    category: str | None = None
    """WMS-facing label of job within single workflow (e.g., can be used for
    throttling jobs within a single workflow).
    """

    concurrency_limit: str | None = None
    """Names of concurrency limits that the WMS plugin can appropriately
    translate to limit the number of this job across all running workflows.
    """

    queue: str | None = None
    """Name of queue to use. Different WMS can translate this concept
    differently.
    """

    pre_cmdline: str | None = None
    """Command line to be executed prior to executing job.
    """

    post_cmdline: str | None = None
    """Command line to be executed after job executes.

    Should be executed regardless of exit status.
    """

    preemptible: bool | None = None
    """The flag indicating whether the job can be preempted.
    """

    profile: dict[str, Any] = dataclasses.field(default_factory=dict)
    """Nested dictionary of WMS-specific key/value pairs with primary key being
    WMS key (e.g., pegasus, condor, panda).
    """

    attrs: dict[str, Any] = dataclasses.field(default_factory=dict)
    """Key/value pairs of job attributes (for WMS that have attributes in
    addition to commands).
    """

    environment: dict[str, Any] = dataclasses.field(default_factory=dict)
    """Environment variable names and values to be explicitly set inside job.
    """

    compute_cloud: str | None = None
    """Key to look up cloud-specific information for running the job.
    """

    @property
    def node_type(self) -> GenericWorkflowNodeType:
        """Indicate this is a payload job."""
        return GenericWorkflowNodeType.PAYLOAD
304class GenericWorkflow(DiGraph):
305 """A generic representation of a workflow used to submit to specific
306 workflow management systems.
308 Parameters
309 ----------
310 name : `str`
311 Name of generic workflow.
312 incoming_graph_data : `~typing.Any`, optional
313 Data used to initialized graph that is passed through to DiGraph
314 constructor. Can be any type supported by networkx.DiGraph.
315 **attr : `dict`
316 Keyword arguments passed through to DiGraph constructor.
317 """
    def __init__(self, name: str, incoming_graph_data: Any | None = None, **attr: Any) -> None:
        super().__init__(incoming_graph_data, **attr)
        self._name = name
        # Run-level key/value attributes (string pairs).
        self.run_attrs: dict[str, str] = {}
        # Bookkeeping of job labels and their relationships.
        self._job_labels = GenericWorkflowLabels()
        # Central store of file objects, keyed by logical file name.
        self._files: dict[str, GenericWorkflowFile] = {}
        # Central store of executables, keyed by executable name.
        self._executables: dict[str, GenericWorkflowExec] = {}
        self._inputs: dict[
            str, list[GenericWorkflowFile]
        ] = {}  # mapping job.names to list of GenericWorkflowFile
        self._outputs: dict[
            str, list[GenericWorkflowFile]
        ] = {}  # mapping job.names to list of GenericWorkflowFile
        # NOTE(review): presumably the WMS-assigned run id set after
        # submission — not set anywhere in this view; confirm type.
        self.run_id = None
        # Special final job/sub-workflow, stored outside the graph proper.
        self._final: GenericWorkflowJob | GenericWorkflow | None = None
    # Starting from ver. 3.6 of NetworkX, the DiGraph class defines its custom
    # __new__ method that explicitly defines arguments it accepts. As a result,
    # we need to override it to let our subclass use different ones.
    #
    # Notes
    # -----
    # Most likely overriding __new__ in this manner will prevent us from using
    # different graph backends with our subclass. However, since we are not
    # using any backends, this should not be a problem at the moment.
    def __new__(cls, *args: Any, **kwargs: Any) -> "GenericWorkflow":
        # Plain allocation, bypassing DiGraph.__new__'s argument inspection.
        return object.__new__(cls)
    @property
    def name(self) -> str:
        """Retrieve name of generic workflow.

        Returns
        -------
        name : `str`
            Name of generic workflow.
        """
        return self._name
358 @property
359 def quanta_counts(self) -> Counter[str]:
360 """Count of quanta per task label (`collections.Counter`)."""
361 qcounts: Counter[str] = Counter()
362 for job_name in self:
363 gwjob = self.get_job(job_name)
364 if hasattr(gwjob, "quanta_counts"):
365 qcounts += gwjob.quanta_counts
366 return qcounts
    @property
    def labels(self) -> list[str]:
        """Job labels (`list` [`str`], read-only)."""
        return self._job_labels.labels
373 def regenerate_labels(self) -> None:
374 """Regenerate the list of job labels."""
375 self._job_labels = GenericWorkflowLabels()
376 for job_name in self:
377 job = self.get_job(job_name)
378 if job.node_type == GenericWorkflowNodeType.PAYLOAD:
379 job = cast(GenericWorkflowJob, job)
380 parents_labels: list[str] = []
381 for parent_name in self.predecessors(job.name):
382 parent_job = self.get_job(parent_name)
383 if parent_job.node_type == GenericWorkflowNodeType.PAYLOAD:
384 # parent_job = cast(GenericWorkflowJob, parent_job)
385 parents_labels.append(parent_job.label)
386 children_labels: list[str] = []
387 for child_name in self.successors(job.name):
388 child_job = self.get_job(child_name)
389 if child_job.node_type == GenericWorkflowNodeType.PAYLOAD:
390 # child_job = cast(GenericWorkflowJob, child_job)
391 children_labels.append(child_job.label)
392 self._job_labels.add_job(job, parents_labels, children_labels)
394 @property
395 def job_counts(self) -> Counter[str]:
396 """Count of jobs per job label (`collections.Counter`)."""
397 jcounts = self._job_labels.job_counts
399 # Final is separate
400 final = self.get_final()
401 if final:
402 if isinstance(final, GenericWorkflow):
403 jcounts.update(final.job_counts)
404 else:
405 jcounts[final.label] += 1
407 return jcounts
    def __iter__(self) -> Iterator[str]:
        """Return iterator of job names in topologically sorted order."""
        # Topological order guarantees parents are yielded before children.
        return topological_sort(self)
413 @overload
414 def get_files(self, data: Literal[False], transfer_only: bool = True) -> list[str]: ... 414 ↛ exitline 414 didn't return from function 'get_files' because
416 @overload
417 def get_files(self, data: Literal[True], transfer_only: bool = True) -> list[GenericWorkflowFile]: ... 417 ↛ exitline 417 didn't return from function 'get_files' because
419 def get_files(
420 self, data: bool = False, transfer_only: bool = True
421 ) -> list[GenericWorkflowFile] | list[str]:
422 """Retrieve files from generic workflow.
424 Need API in case change way files are stored (e.g., make
425 workflow a bipartite graph with jobs and files nodes).
427 Parameters
428 ----------
429 data : `bool`, optional
430 Whether to return the file data as well as the file object name
431 (The default is `False`).
432 transfer_only : `bool`, optional
433 Whether to only return files for which a workflow management system
434 would be responsible for transferring.
436 Returns
437 -------
438 files : `list` [`lsst.ctrl.bps.GenericWorkflowFile`] or `list` [`str`]
439 File names or objects from generic workflow meeting specifications.
440 """
441 files: list[Any] = [] # Any for mypy to allow the different append lines.
442 for filename, file in self._files.items():
443 if not transfer_only or file.wms_transfer:
444 if not data:
445 files.append(filename)
446 else:
447 files.append(file)
448 return files
450 def add_job(
451 self,
452 job: GenericWorkflowNode,
453 parent_names: str | list[str] | None = None,
454 child_names: str | list[str] | None = None,
455 ) -> None:
456 """Add job to generic workflow.
458 Parameters
459 ----------
460 job : `lsst.ctrl.bps.GenericWorkflowNode`
461 Job to add to the generic workflow.
462 parent_names : `str` | `list` [`str`], optional
463 Names of jobs that are parents of given job.
464 child_names : `str` | `list` [`str`], optional
465 Names of jobs that are children of given job.
466 """
467 _LOG.debug("job: %s (%s)", job.name, job.label)
468 _LOG.debug("parent_names: %s", parent_names)
469 _LOG.debug("child_names: %s", child_names)
470 if not isinstance(job, GenericWorkflowNode):
471 raise RuntimeError(f"Invalid type for job to be added to GenericWorkflowGraph ({type(job)}).")
472 if self.has_node(job.name):
473 raise RuntimeError(f"Job {job.name} already exists in GenericWorkflowGraph.")
474 super().add_node(job.name, job=job)
475 self.add_job_relationships(parent_names, job.name)
476 self.add_job_relationships(job.name, child_names)
477 if job.node_type == GenericWorkflowNodeType.PAYLOAD:
478 job = cast(GenericWorkflowJob, job)
479 self.add_executable(job.executable)
480 self._job_labels.add_job(
481 job,
482 [self.get_job(p).label for p in self.predecessors(job.name)],
483 [self.get_job(p).label for p in self.successors(job.name)],
484 )
    def add_node(self, node_for_adding: GenericWorkflowNode, **attr: Any) -> None:
        """Override networkx function to call more specific add_job function.

        Parameters
        ----------
        node_for_adding : `lsst.ctrl.bps.GenericWorkflowJob`
            Job to be added to generic workflow.
        **attr : `~typing.Any`
            Needed to match original networkx function, but not used.
        """
        self.add_job(node_for_adding)
498 def add_job_relationships(
499 self, parents: str | list[str] | None, children: str | list[str] | None
500 ) -> None:
501 """Add dependencies between parent and child jobs. All parents will
502 be connected to all children.
504 Parameters
505 ----------
506 parents : `str` or `list` [`str`], optional
507 Parent job names.
508 children : `str` or `list` [`str`], optional
509 Children job names.
510 """
511 # Allow this to be a noop if no parents or no children
512 if parents is not None and children is not None:
513 self.add_edges_from(itertools.product(ensure_iterable(parents), ensure_iterable(children)))
514 self._job_labels.add_job_relationships(
515 [self.get_job(n).label for n in ensure_iterable(parents)],
516 [self.get_job(n).label for n in ensure_iterable(children)],
517 )
519 def add_edges_from(self, ebunch_to_add: Iterable[tuple[str, str]], **attr: Any) -> None:
520 """Add several edges between jobs in the generic workflow.
522 Parameters
523 ----------
524 ebunch_to_add : Iterable [`tuple` [`str`, `str`]]
525 Iterable of job name pairs between which a dependency should be
526 saved.
527 **attr : `~typing.Any`
528 Data can be assigned using keyword arguments (not currently used).
529 """
530 for edge_to_add in ebunch_to_add:
531 self.add_edge(edge_to_add[0], edge_to_add[1], **attr)
533 def add_edge(self, u_of_edge: str, v_of_edge: str, **attr: Any) -> None:
534 """Add edge connecting jobs in workflow.
536 Parameters
537 ----------
538 u_of_edge : `str`
539 Name of parent job.
540 v_of_edge : `str`
541 Name of child job.
542 **attr
543 Attributes to save with edge.
544 """
545 if u_of_edge not in self:
546 raise RuntimeError(f"{u_of_edge} not in GenericWorkflow")
547 if v_of_edge not in self:
548 raise RuntimeError(f"{v_of_edge} not in GenericWorkflow")
549 super().add_edge(u_of_edge, v_of_edge, **attr)
    def get_job(self, job_name: str) -> GenericWorkflowNode:
        """Retrieve job by name from workflow.

        Parameters
        ----------
        job_name : `str`
            Name of job to retrieve.

        Returns
        -------
        job : `lsst.ctrl.bps.GenericWorkflowNode`
            Job matching given job_name.
        """
        # Jobs are stored as the "job" attribute of their graph node.
        return self.nodes[job_name]["job"]
566 def del_job(self, job_name: str) -> None:
567 """Delete job from generic workflow leaving connected graph.
569 Parameters
570 ----------
571 job_name : `str`
572 Name of job to delete from workflow.
573 """
574 job = self.get_job(job_name)
576 # Remove from job labels
577 if isinstance(job, GenericWorkflowJob):
578 self._job_labels.del_job(job)
580 # Connect all parent jobs to all children jobs.
581 parents = list(self.predecessors(job_name))
582 children = list(self.successors(job_name))
583 self.add_job_relationships(parents, children)
585 # Delete job node (which deletes edges).
586 self.remove_node(job_name)
588 def add_job_inputs(self, job_name: str, files: GenericWorkflowFile | list[GenericWorkflowFile]) -> None:
589 """Add files as inputs to specified job.
591 Parameters
592 ----------
593 job_name : `str`
594 Name of job to which inputs should be added.
595 files : `lsst.ctrl.bps.GenericWorkflowFile` or \
596 `list` [`lsst.ctrl.bps.GenericWorkflowFile`]
597 File object(s) to be added as inputs to the specified job.
598 """
599 self._inputs.setdefault(job_name, [])
600 for file in ensure_iterable(files):
601 # Save the central copy
602 if file.name not in self._files:
603 self._files[file.name] = file
605 # Save the job reference to the file
606 self._inputs[job_name].append(file)
    def get_file(self, name: str) -> GenericWorkflowFile:
        """Retrieve a file object by name.

        Parameters
        ----------
        name : `str`
            Name of file object.

        Returns
        -------
        gwfile : `lsst.ctrl.bps.GenericWorkflowFile`
            File matching given name.

        Raises
        ------
        KeyError
            Raised if no file with the given name exists in the workflow.
        """
        return self._files[name]
623 def add_file(self, gwfile: GenericWorkflowFile) -> None:
624 """Add file object.
626 Parameters
627 ----------
628 gwfile : `lsst.ctrl.bps.GenericWorkflowFile`
629 File object to add to workflow.
630 """
631 if gwfile.name not in self._files:
632 self._files[gwfile.name] = gwfile
633 else:
634 _LOG.debug("Skipped add_file for existing file %s", gwfile.name)
636 @overload
637 def get_job_inputs( 637 ↛ exitline 637 didn't return from function 'get_job_inputs' because
638 self, job_name: str, data: Literal[False], transfer_only: bool = False
639 ) -> list[str]: ...
641 @overload
642 def get_job_inputs( 642 ↛ exitline 642 didn't return from function 'get_job_inputs' because
643 self, job_name: str, data: Literal[True], transfer_only: bool = False
644 ) -> list[GenericWorkflowFile]: ...
646 def get_job_inputs(
647 self, job_name: str, data: bool = True, transfer_only: bool = False
648 ) -> list[GenericWorkflowFile] | list[str]:
649 """Return the input files for the given job.
651 Parameters
652 ----------
653 job_name : `str`
654 Name of the job.
655 data : `bool`, optional
656 Whether to return the file data as well as the file object name.
657 transfer_only : `bool`, optional
658 Whether to only return files for which a workflow management system
659 would be responsible for transferring.
661 Returns
662 -------
663 inputs : `list` [`lsst.ctrl.bps.GenericWorkflowFile`] or `list` [`str`]
664 Input files for the given job. If no input files for the job,
665 returns an empty list.
666 """
667 inputs: list[Any] = [] # Any for mypy to allow the different append lines.
668 if job_name in self._inputs:
669 for gwfile in self._inputs[job_name]:
670 if not transfer_only or gwfile.wms_transfer:
671 if not data:
672 inputs.append(gwfile.name)
673 else:
674 inputs.append(gwfile)
675 return inputs
677 def add_job_outputs(self, job_name: str, files: list[GenericWorkflowFile]) -> None:
678 """Add output files to a job.
680 Parameters
681 ----------
682 job_name : `str`
683 Name of job to which the files should be added as outputs.
684 files : `list` [`lsst.ctrl.bps.GenericWorkflowFile`]
685 File objects to be added as outputs for specified job.
686 """
687 self._outputs.setdefault(job_name, [])
689 for file_ in ensure_iterable(files):
690 # Save the central copy
691 if file_.name not in self._files:
692 self._files[file_.name] = file_
694 # Save the job reference to the file
695 self._outputs[job_name].append(file_)
697 @overload
698 def get_job_outputs( 698 ↛ exitline 698 didn't return from function 'get_job_outputs' because
699 self, job_name: str, data: Literal[False], transfer_only: bool = False
700 ) -> list[str]: ...
702 @overload
703 def get_job_outputs( 703 ↛ exitline 703 didn't return from function 'get_job_outputs' because
704 self, job_name: str, data: Literal[True], transfer_only: bool = False
705 ) -> list[GenericWorkflowFile]: ...
707 def get_job_outputs(
708 self, job_name: str, data: bool = True, transfer_only: bool = False
709 ) -> list[GenericWorkflowFile] | list[str]:
710 """Return the output files for the given job.
712 Parameters
713 ----------
714 job_name : `str`
715 Name of the job.
716 data : `bool`
717 Whether to return the file data as well as the file object name.
718 It defaults to `True` thus returning file data as well.
719 transfer_only : `bool`
720 Whether to only return files for which a workflow management system
721 would be responsible for transferring. It defaults to `False` thus
722 returning all output files.
724 Returns
725 -------
726 outputs : `list` [`lsst.ctrl.bps.GenericWorkflowFile`] or \
727 `list` [`str`]
728 Output files for the given job. If no output files for the job,
729 returns an empty list.
730 """
731 outputs: list[Any] = [] # Any for mypy to allow the different append lines.
732 if not data:
733 outputs = cast(list[str], outputs)
734 else:
735 outputs = cast(list[GenericWorkflowFile], outputs)
737 if job_name in self._outputs:
738 for gwfile in self._outputs[job_name]:
739 if not transfer_only or gwfile.wms_transfer:
740 if not data:
741 outputs.append(gwfile.name)
742 else:
743 outputs.append(gwfile)
744 return outputs
746 def draw(self, stream: str | IO[str], format_: str = "dot") -> None:
747 """Output generic workflow in a visualization format.
749 Parameters
750 ----------
751 stream : `str` or `io.BufferedIOBase`
752 Stream to which the visualization should be written.
753 format_ : `str`, optional
754 Which visualization format to use. It defaults to the format for
755 the dot program.
756 """
757 draw_funcs = {"dot": draw_networkx_dot}
758 if format_ in draw_funcs:
759 draw_funcs[format_](self, stream)
760 else:
761 raise RuntimeError(f"Unknown draw format ({format_})")
763 def save(self, stream: str | IO[bytes], format_: str = "pickle") -> None:
764 """Save the generic workflow in a format that is loadable.
766 Parameters
767 ----------
768 stream : `str` or `io.BufferedIOBase`
769 Stream to pass to the format-specific writer. Accepts anything
770 that the writer accepts.
771 format_ : `str`, optional
772 Format in which to write the data. It defaults to pickle format.
773 """
774 if format_ == "pickle":
775 stream = cast(BinaryIO, stream)
776 pickle.dump(self, stream)
777 else:
778 raise RuntimeError(f"Unknown format ({format_})")
780 @classmethod
781 def load(cls, stream: str | IO[bytes], format_: str = "pickle") -> "GenericWorkflow":
782 """Load a GenericWorkflow from the given stream.
784 Parameters
785 ----------
786 stream : `str` or `io.BufferedIOBase`
787 Stream to pass to the format-specific loader. Accepts anything that
788 the loader accepts.
789 format_ : `str`, optional
790 Format of data to expect when loading from stream. It defaults
791 to pickle format.
793 Returns
794 -------
795 generic_workflow : `lsst.ctrl.bps.GenericWorkflow`
796 Generic workflow loaded from the given stream.
797 """
798 if format_ == "pickle":
799 stream = cast(BinaryIO, stream)
800 object_ = pickle.load(stream)
801 assert isinstance(object_, GenericWorkflow) # for mypy
802 return object_
804 raise RuntimeError(f"Unknown format ({format_})")
806 def validate(self) -> None:
807 """Run checks to ensure that the generic workflow graph is valid."""
808 # Make sure a directed acyclic graph
809 assert is_directed_acyclic_graph(self)
    def add_workflow_source(self, workflow: "GenericWorkflow") -> None:
        """Add given workflow as new source to this workflow.

        Parameters
        ----------
        workflow : `lsst.ctrl.bps.GenericWorkflow`
            The given workflow.
        """
        # Find source nodes in self.
        self_sources = [n for n in self if self.in_degree(n) == 0]
        _LOG.debug("self_sources = %s", self_sources)

        # Find sink nodes of workflow.
        new_sinks = [n for n in workflow if workflow.out_degree(n) == 0]
        _LOG.debug("new sinks = %s", new_sinks)

        # Add new workflow nodes to self graph and make new edges.
        self.add_nodes_from(workflow.nodes(data=True))
        self.add_edges_from(workflow.edges())
        # The incoming workflow's sinks become parents of this workflow's
        # former sources, making the new workflow run first.
        for source in self_sources:
            for sink in new_sinks:
                self.add_edge(sink, source)

        # Add separately stored info
        for job_name in workflow:
            job = self.get_job(job_name)
            # Add job labels
            if isinstance(job, GenericWorkflowJob):
                self._job_labels.add_job(
                    job,
                    [self.get_job(p).label for p in self.predecessors(job.name)],
                    [self.get_job(p).label for p in self.successors(job.name)],
                )
                # Executables are stored separately so copy them.
                self.add_executable(job.executable)
            # Files are stored separately so copy them.
            # NOTE(review): indentation was lost in extraction; the file
            # copies are placed at loop level (harmless empty lists for
            # non-payload jobs) — confirm against upstream source.
            self.add_job_inputs(job_name, workflow.get_job_inputs(job_name, data=True))
            self.add_job_outputs(job_name, workflow.get_job_outputs(job_name, data=True))
851 def add_final(self, final: "GenericWorkflowJob | GenericWorkflow") -> None:
852 """Add special final job/workflow to the generic workflow.
854 Parameters
855 ----------
856 final : `lsst.ctrl.bps.GenericWorkflowJob` or \
857 `lsst.ctrl.bps.GenericWorkflow`
858 Information needed to execute the special final job(s), the
859 job(s) to be executed after all jobs that can be executed
860 have been executed regardless of exit status of any of the
861 jobs.
862 """
863 if not isinstance(final, GenericWorkflowJob) and not isinstance(final, GenericWorkflow):
864 raise TypeError("Invalid type for GenericWorkflow final ({type(final)})")
866 self._final = final
867 if isinstance(final, GenericWorkflowJob):
868 self.add_executable(final.executable)
    def get_final(self) -> "GenericWorkflowJob | GenericWorkflow | None":
        """Return job/workflow to be executed after all jobs that can be
        executed have been executed regardless of exit status of any of
        the jobs.

        Returns
        -------
        final : `lsst.ctrl.bps.GenericWorkflowJob` or \
            `lsst.ctrl.bps.GenericWorkflow` or `None`
            Information needed to execute final job(s), or `None` if no
            final job/workflow has been added.
        """
        return self._final
883 def add_executable(self, executable: GenericWorkflowExec | None) -> None:
884 """Add executable to workflow's list of executables.
886 Parameters
887 ----------
888 executable : `lsst.ctrl.bps.GenericWorkflowExec`
889 Executable object to be added to workflow.
890 """
891 if executable is not None:
892 self._executables[executable.name] = executable
893 else:
894 _LOG.warning("executable not specified (None); cannot add to the workflow's list of executables")
896 @overload
897 def get_executables(self, data: Literal[False], transfer_only: bool = False) -> list[str]: ... 897 ↛ exitline 897 didn't return from function 'get_executables' because
899 @overload
900 def get_executables( 900 ↛ exitline 900 didn't return from function 'get_executables' because
901 self, data: Literal[True], transfer_only: bool = False
902 ) -> list[GenericWorkflowExec]: ...
904 def get_executables(
905 self, data: bool = False, transfer_only: bool = True
906 ) -> list[GenericWorkflowExec] | list[str]:
907 """Retrieve executables from generic workflow.
909 Parameters
910 ----------
911 data : `bool`, optional
912 Whether to return the executable data as well as the exec object
913 name (The defaults is False).
914 transfer_only : `bool`, optional
915 Whether to only return executables for which transfer_executable
916 is True.
918 Returns
919 -------
920 execs : `list` [`lsst.ctrl.bps.GenericWorkflowExec`] or `list` [`str`]
921 Filtered executable names or objects from generic workflow.
922 """
923 execs: list[Any] = [] # This and cast lines for mypy
924 if not data:
925 execs = cast(list[str], execs)
926 else:
927 execs = cast(list[GenericWorkflowExec], execs)
929 for name, executable in self._executables.items():
930 if not transfer_only or executable.transfer_executable:
931 if not data:
932 execs.append(name)
933 else:
934 execs.append(executable)
935 return execs
    def get_jobs_by_label(self, label: str) -> list[GenericWorkflowJob]:
        """Retrieve jobs by label from workflow.

        Parameters
        ----------
        label : `str`
            Label of jobs to retrieve.

        Returns
        -------
        jobs : `list` [`lsst.ctrl.bps.GenericWorkflowJob`]
            Jobs having given label.
        """
        return self._job_labels.get_jobs_by_label(label)
952 def _check_job_ordering_config(self, ordering_config: dict[str, Any]) -> dict[str, DiGraph]:
953 """Check configuration related to job ordering.
955 Parameters
956 ----------
957 ordering_config : `dict` [`str`, `~typing.Any`]
958 Job ordering configuration to check.
960 Returns
961 -------
962 group_to_label_subgraph : `dict` [`str`, `network.DiGraph`]
963 Mapping of group name to a graph of the job labels in the group.
964 """
965 group_to_label_subgraph = {}
966 job_label_to_group: dict[str, str] = {} # Checking label appears only in one group
967 for group, group_vals in ordering_config.items():
968 implementation = group_vals.get("implementation", "group")
969 if implementation not in ["noop", "group"]:
970 raise RuntimeError(f"Invalid implementation for {group}: {implementation}")
971 ordering_type = group_vals.get("ordering_type", "sort")
972 if ordering_type != "sort":
973 raise RuntimeError(f"Invalid ordering_type for {group}: {ordering_type}")
974 if "dimensions" not in group_vals:
975 raise KeyError(f"Missing dimensions entry in ordering group {group}")
977 job_labels = [x.strip() for x in group_vals["labels"].split(",")]
978 _LOG.debug("group %s: job_labels=%s", group, job_labels)
979 unused_labels = []
980 for job_label in job_labels:
981 if job_label not in self._job_labels.labels:
982 unused_labels.append(job_label)
983 elif job_label in job_label_to_group:
984 raise RuntimeError(
985 f"Job label {job_label} appears in more than one job ordering group "
986 f"({group} {job_label_to_group[job_label]})"
987 )
988 else:
989 job_label_to_group[job_label] = group
991 if unused_labels:
992 _LOG.info("Workflow job labels = %s", ",".join(self._job_labels.labels))
993 raise RuntimeError(
994 f"Job label(s) ({','.join(unused_labels)}) from job ordering group "
995 f"{group} does not exist in workflow. Aborting."
996 )
998 label_subgraph = self._job_labels.subgraph(job_labels)
999 group_to_label_subgraph[group] = label_subgraph
1001 return group_to_label_subgraph
1003 def _group_jobs_by_values(
1004 self, group: str, group_config: dict[str, Any], label_subgraph: DiGraph
1005 ) -> dict[tuple[Any, ...], list[str]]:
1006 """Create job mapping of special sortable dimension key
1007 to job name by comparing dimension values.
1009 Parameters
1010 ----------
1011 group : `str`
1012 Name of group for which creating mapping.
1013 group_config : `dict` [`str`, `~typing.Any`]
1014 Config for group for which creating mapping.
1015 label_subgraph : `networkx.DiGraph`
1016 The graph of job labels to be used in mapping.
1018 Returns
1019 -------
1020 dims_to_jobs : `dict` [`tuple` [`~typing.Any`, ...], `list` [`str`]]
1021 Mapping of dimensions to job names.
1022 """
1023 dims_to_jobs: dict[tuple[Any, ...], list[str]] = {}
1024 for job_label in label_subgraph:
1025 jobs = self.get_jobs_by_label(job_label)
1026 for job in jobs:
1027 job_dim_values = subset_dimension_values(
1028 f"Job {job.name}",
1029 f"order group {group}",
1030 group_config["dimensions"],
1031 job.tags,
1032 group_config.get("equalDimensions", None),
1033 )
1034 job_dims = []
1035 for dim in [d.strip() for d in group_config["dimensions"].split(",")]:
1036 job_dims.append(job_dim_values[dim])
1037 dims_job_list = dims_to_jobs.setdefault(tuple(job_dims), [])
1038 dims_job_list.append(job.name)
1039 return dims_to_jobs
    def _group_jobs_by_dependencies(
        self, group: str, group_config: dict[str, Any], label_subgraph: DiGraph
    ) -> dict[tuple[Any, ...], list[str]]:
        """Create job mapping of special sortable dimension key
        to job name by following dependencies.

        Labels are visited in topological order ("source" method) or
        reverse topological order ("sink" method).  The first time a job
        is encountered its dimension key is computed from its tags; the
        job's transitive successors (or predecessors) whose labels are in
        the subgraph are then assigned to the same key.

        Parameters
        ----------
        group : `str`
            Name of group for which creating mapping.
        group_config : `dict` [`str`, `~typing.Any`]
            Config for group for which creating mapping.
        label_subgraph : `networkx.DiGraph`
            The graph of job labels to be used in mapping.

        Returns
        -------
        dims_to_jobs : `dict` [`tuple` [`~typing.Any`, ...], `list` [`str`]]
            Mapping of dimensions to job names.

        Raises
        ------
        RuntimeError
            If findDependencyMethod is neither "source" nor "sink".
        """
        method = group_config["findDependencyMethod"]
        dim_labels = list(topological_sort(label_subgraph))
        match method:
            case "source":
                # Walk top-down, pulling in each job's descendants.
                find_potential_jobs = self.successors
            case "sink":
                # Walk bottom-up, pulling in each job's ancestors.
                find_potential_jobs = self.predecessors
                dim_labels.reverse()
            case _:
                raise RuntimeError(f"Invalid findDependencyMethod ({method})")

        jobs_seen: set[str] = set()  # Jobs already assigned to some dimension key.
        dims_to_jobs: dict[tuple[Any, ...], list[str]] = {}
        for label in dim_labels:
            jobs = self.get_jobs_by_label(label)
            for job in jobs:
                if job.name not in jobs_seen:
                    jobs_seen.add(job.name)
                    job_dim_values = subset_dimension_values(
                        f"Job {job.name}",
                        f"order group {group}",
                        group_config["dimensions"],
                        job.tags,
                        group_config.get("equalDimensions", None),
                    )
                    job_dims = []
                    for dim in [d.strip() for d in group_config["dimensions"].split(",")]:
                        job_dims.append(job_dim_values[dim])
                    # Shared list: related jobs found below are appended to it.
                    dims_job_list = dims_to_jobs.setdefault(tuple(job_dims), [])
                    dims_job_list.append(job.name)
                    # Use dependencies to find other quantum to add
                    # Note: in testing, using the following code was faster
                    # than using networkx descendants and ancestors functions
                    # While traversing the subgraph, nodes may appear
                    # repeatedly in potential_jobs.
                    jobs_to_use = [job]
                    while jobs_to_use:
                        job_to_use = jobs_to_use.pop()

                        potential_job_names = find_potential_jobs(job_to_use.name)
                        for potential_job_name in potential_job_names:
                            potential_job = cast(GenericWorkflowJob, self.get_job(potential_job_name))
                            if potential_job.label in label_subgraph:
                                # Membership test on dims_job_list guards
                                # against revisiting jobs reached twice.
                                if potential_job.name not in dims_job_list:
                                    _LOG.debug(
                                        "Adding potential job %s (%s) to group %s",
                                        potential_job.name,
                                        potential_job.label,
                                        group,
                                    )
                                    dims_job_list.append(potential_job.name)
                                    jobs_to_use.append(potential_job)
                                    jobs_seen.add(potential_job.name)
                            else:
                                _LOG.debug(
                                    "label (%s) not in ordered_tasks. Not adding potential quantum %s",
                                    potential_job.label,
                                    potential_job.name,
                                )
        return dims_to_jobs
1122 def _update_by_group_sort(
1123 self,
1124 group: str,
1125 dims_to_jobs: dict[tuple[Any, ...], list[str]],
1126 blocking: bool = False,
1127 ) -> None:
1128 """Update portion of workflow for special job ordering using sort.
1130 Parameters
1131 ----------
1132 group : `str`
1133 Ordering group label used for job name and messages.
1134 dims_to_jobs: `dict` [`tuple` [`~typing.Any`, ...], `list` [`str`]]
1135 Mapping of special dimension keys to workflow job names.
1136 The sort for the special ordering is over the keys.
1137 blocking: `bool`
1138 Whether a failure in a group blocks execution of remaining groups.
1139 """
1140 group_job_names: list[str] = []
1141 for dim_key, job_list in sorted(dims_to_jobs.items()):
1142 _LOG.debug("group %s: dim_key=%s", group, dim_key)
1143 group_job_name = f"group_{group}_{'=='.join([str(dk) for dk in dim_key])}"
1144 prev_name = group_job_names[-1] if group_job_names else None
1145 group_job_names.append(group_job_name)
1147 self._replace_subgraph_with_job_group(group_job_name, group, job_list, blocking)
1149 # ordering between groups
1150 if prev_name:
1151 self.add_edge(prev_name, group_job_name)
1153 def _replace_subgraph_with_job_group(
1154 self, group_name: str, group_label: str, job_list: list[str], blocking: bool
1155 ) -> None:
1156 """Update portion of workflow for special job ordering using groups.
1158 Parameters
1159 ----------
1160 group_name : `str`
1161 Ordering group name.
1162 group_label : `str`
1163 Ordering group label.
1164 job_list: `list` [`str`]
1165 List of job names to put in the group
1166 blocking: `bool`
1167 Whether a failure in a group blocks execution of remaining groups.
1168 """
1169 job_group = GenericWorkflowGroup(group_name, group_label, blocking=blocking)
1170 self.add_node(job_group)
1172 # Add jobs, files, and executables first
1173 # then add edges later to avoid order issues
1174 for job_name in job_list:
1175 job = cast(GenericWorkflowJob, self.get_job(job_name))
1176 job_group.add_job(job)
1177 files = self.get_job_inputs(job_name, data=True)
1178 job_group.add_job_inputs(job_name, files)
1179 files = self.get_job_outputs(job_name, data=True)
1180 job_group.add_job_outputs(job_name, files)
1181 job_group.add_executable(job.executable)
1183 # Can't remove edges while looping through edge view,
1184 # so save to remove after loops
1185 edges_to_remove: list[tuple[str, str]] = []
1186 for job_name in job_list:
1187 in_edges = self.in_edges(job_name)
1188 for u, v in in_edges:
1189 if u in job_list:
1190 job_group.add_edge(u, v)
1191 else:
1192 self.add_edge(u, group_name)
1193 edges_to_remove.append((u, v))
1195 out_edges = self.out_edges(job_name)
1196 for u, v in out_edges:
1197 if v in job_list:
1198 job_group.add_edge(u, v)
1199 else:
1200 self.add_edge(group_name, v)
1201 edges_to_remove.append((u, v))
1203 # Remove edges collected earlier
1204 self.remove_edges_from(edges_to_remove)
1206 # Remove nodes from main GenericWorkflow
1207 self.remove_nodes_from(job_list)
1209 def add_special_job_ordering(self, ordering: dict[str, Any]) -> None:
1210 """Add special nodes and dependencies to enforce given ordering.
1212 Parameters
1213 ----------
1214 ordering : `dict` [`str`, `~typing.Any`]
1215 Description of the job ordering to enforce.
1216 """
1217 group_to_label_subgraph = self._check_job_ordering_config(ordering)
1219 for group, group_vals in ordering.items():
1220 if "findDependencyMethod" in group_vals:
1221 job_grouping_func = self._group_jobs_by_dependencies
1222 else:
1223 job_grouping_func = self._group_jobs_by_values
1225 dims = [x.strip() for x in group_vals["dimensions"].split(",")]
1226 _LOG.debug("group %s: dims=%s", group, dims)
1227 job_groups = job_grouping_func(group, group_vals, group_to_label_subgraph[group])
1229 # Update the workflow
1230 implementation = group_vals.get("implementation", "group")
1231 ordering_type = group_vals.get("ordering_type", "sort")
1232 match (implementation, ordering_type):
1233 case ("noop", "sort"):
1234 self._update_by_noop_sort(group, job_groups)
1235 case ("group", "sort"):
1236 blocking = group_vals.get("blocking", False)
1237 self._update_by_group_sort(group, job_groups, blocking)
1238 case _:
1239 raise RuntimeError(
1240 f"Invalid implementation, ordering_type pair for group ({implementation},"
1241 f" {ordering_type})"
1242 )
1244 def _update_by_noop_sort(self, group: str, dims_to_jobs: dict[tuple[Any, ...], list[str]]) -> None:
1245 """Update portion of workflow for special ordering of jobs using sort.
1247 Parameters
1248 ----------
1249 group : `str`
1250 Ordering group label used for job name and messages.
1251 dims_to_jobs: `dict` [`tuple`[`str`,...], `list` [`str`]]
1252 Mapping of special dimension keys to workflow job names.
1253 The sort for the special ordering is over the keys.
1254 """
1255 noop_names: list[str] = []
1256 prev_name: str | None = None
1257 for dim_key, job_list in sorted(dims_to_jobs.items()):
1258 _LOG.debug("group %s: dim_key=%s", group, dim_key)
1259 noop_name = f"noop_{group}_{'=='.join([str(dk) for dk in dim_key])}"
1260 if noop_names:
1261 prev_name = noop_names[-1]
1262 noop_names.append(noop_name)
1264 self._update_single_noop(job_list, noop_name, group, prev_name)
1266 # As implemented, loop adds one last NOOP job with
1267 # 0 children. Remove it as not useful.
1268 assert self.out_degree(noop_names[-1]) == 0
1269 self.remove_node(noop_names[-1])
1271 def _update_single_noop(
1272 self, job_list: list[str], order_node_name: str, order_label: str, prev_order_name: str | None
1273 ) -> None:
1274 """Update the workflow around the single job making special NOOP
1275 jobs when necessary.
1277 Parameters
1278 ----------
1279 job_list : `list` [`str`]
1280 Current jobs involved in special ordering
1281 order_node_name : `str`
1282 Name for the order NOOP job. If it does not exist in workflow, a
1283 new NOOP job with this name will be created and added.
1284 order_label : `str`
1285 Label for the order NOOP job.
1286 prev_order_name: `str` or None
1287 Name of the previous order NOOP job used to add edge as predecessor
1288 of current job.
1289 """
1290 if order_node_name not in self:
1291 _LOG.debug("Adding new ordering node %s", order_node_name)
1292 order_node = GenericWorkflowNoopJob(order_node_name, order_label)
1293 self.add_job(order_node)
1295 subgraph = DiGraph(self).subgraph(job_list)
1296 sinks = [n for n in job_list if subgraph.out_degree(n) == 0]
1297 for job_name in sinks:
1298 self.add_edge(job_name, order_node_name)
1300 if prev_order_name:
1301 sources = [n for n in job_list if subgraph.in_degree(n) == 0]
1302 for job_name in sources:
1303 _LOG.debug("Adding edge %s to %s", prev_order_name, job_name)
1304 self.add_edge(prev_order_name, job_name)
@dataclasses.dataclass(slots=True)
class GenericWorkflowGroup(GenericWorkflowNode, GenericWorkflow):
    """Node representing a group of jobs.  Used for special dependencies.

    Parameters
    ----------
    name : `str`
        Name of node.  Must be unique within workflow.
    label : `str`
        Primary user-facing label for job.  Does not need to be unique and
        may be used for summary reports or to group nodes.
    blocking : `bool`
        Whether a failure inside group prunes executions of remaining groups.
    """

    blocking: bool = False
    """Whether a failure inside group prunes executions of remaining groups."""

    @property
    def node_type(self) -> GenericWorkflowNodeType:
        """Indicate this is a group job."""
        return GenericWorkflowNodeType.GROUP

    # dataclass() does not generate __init__ when one is defined in the class
    # body; both parents are initialized explicitly here because a group is
    # simultaneously a workflow node and a nested workflow of its members.
    def __init__(self, name: str, label: str, blocking: bool = False) -> None:
        """Initialize each parent class."""
        _LOG.debug("%s %s %s", name, label, blocking)
        GenericWorkflowNode.__init__(self, name, label)
        GenericWorkflow.__init__(self, name)
        self.blocking = blocking
class GenericWorkflowLabels:
    """Label-oriented representation of the GenericWorkflowJobs."""

    def __init__(self) -> None:
        # Dependency graph of job labels.
        self._label_graph = DiGraph()
        # Mapping of job label to list of GenericWorkflowJob.
        self._label_to_jobs: defaultdict[str, list[GenericWorkflowJob]] = defaultdict(list)

    @property
    def labels(self) -> list[str]:
        """List of job labels (`list` [`str`], read-only)."""
        return list(topological_sort(self._label_graph))

    @property
    def job_counts(self) -> Counter[str]:
        """Count of jobs per job label (`collections.Counter`)."""
        return Counter({label: len(self._label_to_jobs[label]) for label in self.labels})

    def get_jobs_by_label(self, label: str) -> list[GenericWorkflowJob]:
        """Retrieve jobs by label from workflow.

        Parameters
        ----------
        label : `str`
            Label of jobs to retrieve.

        Returns
        -------
        jobs : `list` [`lsst.ctrl.bps.GenericWorkflowJob`]
            Jobs having given label.
        """
        return self._label_to_jobs[label]

    def add_job(self, job: GenericWorkflowJob, parent_labels: list[str], child_labels: list[str]) -> None:
        """Add job's label to labels.

        Parameters
        ----------
        job : `lsst.ctrl.bps.GenericWorkflowJob`
            The job to add to the job labels.
        parent_labels : `list` [`str`]
            Parent job labels.
        child_labels : `list` [`str`]
            Children job labels.
        """
        _LOG.debug("job: %s (%s)", job.name, job.label)
        _LOG.debug("parent_labels: %s", parent_labels)
        _LOG.debug("child_labels: %s", child_labels)
        self._label_to_jobs[job.label].append(job)
        self._label_graph.add_node(job.label)
        for parent in parent_labels:
            self._label_graph.add_edge(parent, job.label)
        for child in child_labels:
            self._label_graph.add_edge(job.label, child)

    def add_job_relationships(self, parent_labels: list[str], children_labels: list[str]) -> None:
        """Add dependencies between parent and child job labels.
        All parents will be connected to all children.

        Parameters
        ----------
        parent_labels : `list` [`str`]
            Parent job labels.
        children_labels : `list` [`str`]
            Children job labels.
        """
        # Since labels, must ensure not adding edge from label to itself.
        edges = [
            e
            for e in itertools.product(ensure_iterable(parent_labels), ensure_iterable(children_labels))
            if e[0] != e[1]
        ]

        self._label_graph.add_edges_from(edges)

    def del_job(self, job: GenericWorkflowJob) -> None:
        """Delete job and its label from job labels.

        Parameters
        ----------
        job : `lsst.ctrl.bps.GenericWorkflowJob`
            The job to delete from the job labels.
        """
        self._label_to_jobs[job.label].remove(job)
        # Don't leave keys around if removed last job
        if not self._label_to_jobs[job.label]:
            del self._label_to_jobs[job.label]

            # Remove the label from the dependency graph only when no jobs
            # with that label remain; reconnect its parents to its children
            # so label-level ordering is preserved.  Materialize the lazy
            # neighbor views BEFORE mutating the graph.
            parents = list(self._label_graph.predecessors(job.label))
            children = list(self._label_graph.successors(job.label))
            self._label_graph.remove_node(job.label)
            self._label_graph.add_edges_from(itertools.product(parents, children))

    def subgraph(self, labels: Iterable[str]) -> DiGraph:
        """Create subgraph of workflow label graph with given labels.

        Parameters
        ----------
        labels : `~collections.abc.Iterable` [`str`]
            Labels to appear in subgraph.

        Returns
        -------
        subgraph : `networkx.DiGraph`
            Subgraph of workflow label graph with given labels.
        """
        return self._label_graph.subgraph(labels)