Coverage for python / lsst / ctrl / bps / tests / gw_test_utils.py: 4%
265 statements
« prev ^ index » next coverage.py v7.13.5, created at 2026-04-26 09:03 +0000
1# This file is part of ctrl_bps.
2#
3# Developed for the LSST Data Management System.
4# This product includes software developed by the LSST Project
5# (https://www.lsst.org).
6# See the COPYRIGHT file at the top-level directory of this distribution
7# for details of code ownership.
8#
9# This software is dual licensed under the GNU General Public License and also
10# under a 3-clause BSD license. Recipients may choose which of these licenses
11# to use; please see the files gpl-3.0.txt and/or bsd_license.txt,
12# respectively. If you choose the GPL option then the following text applies
13# (but note that there is still no warranty even if you opt for BSD instead):
14#
15# This program is free software: you can redistribute it and/or modify
16# it under the terms of the GNU General Public License as published by
17# the Free Software Foundation, either version 3 of the License, or
18# (at your option) any later version.
19#
20# This program is distributed in the hope that it will be useful,
21# but WITHOUT ANY WARRANTY; without even the implied warranty of
22# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
23# GNU General Public License for more details.
24#
25# You should have received a copy of the GNU General Public License
26# along with this program. If not, see <https://www.gnu.org/licenses/>.
27"""GenericWorkflow-related utilities to support ctrl_bps testing."""
# Public API of this test-support module.  compare_generic_workflows was
# previously omitted even though it is a documented public helper.
__all__ = [
    "compare_generic_workflows",
    "make_3_label_workflow",
    "make_3_label_workflow_groups_sort",
    "make_3_label_workflow_noop_sort",
    "make_5_label_workflow",
    "make_5_label_workflow_2_groups",
    "make_5_label_workflow_middle_groups",
]
38import logging
39from collections import Counter
40from typing import cast
42from lsst.ctrl.bps import (
43 GenericWorkflow,
44 GenericWorkflowExec,
45 GenericWorkflowGroup,
46 GenericWorkflowJob,
47 GenericWorkflowNodeType,
48 GenericWorkflowNoopJob,
49)
51_LOG = logging.getLogger(__name__)
def make_3_label_workflow(workflow_name: str, final: bool) -> GenericWorkflow:
    """Create a simple 3 label test workflow.

    Parameters
    ----------
    workflow_name : `str`
        Name of the test workflow.
    final : `bool`
        Whether to add a final job.

    Returns
    -------
    gwf : `lsst.ctrl.bps.GenericWorkflow`
        The test workflow.
    """
    executable = GenericWorkflowExec("exec1", "/usr/bin/uptime", False)
    gwf = GenericWorkflow(workflow_name)
    gwf.add_job(GenericWorkflowJob("pipetaskInit", label="pipetaskInit", executable=executable))

    # 301 is to ensure numeric sorting
    visit_groups = [
        (10001, "2024-06-26T07:28:26.289"),
        (10002, "2024-06-26T07:29:06.969"),
        (301, "2024-06-26T07:27:45.775"),
    ]
    for visit, vgroup in visit_groups:
        for detector in (10, 11):
            # Chain label1 -> label2 -> label3 per visit/detector, hanging
            # the first off pipetaskInit.
            parent = "pipetaskInit"
            for label in ("label1", "label2", "label3"):
                job_name = f"{label}_{visit}_{detector}"
                new_job = GenericWorkflowJob(
                    job_name,
                    label=label,
                    executable=executable,
                    quanta_counts=Counter({label: 1}),
                    tags={"visit": visit, "detector": detector, "group": vgroup},
                )
                gwf.add_job(new_job, [parent], None)
                parent = job_name

    if final:
        final_exec = GenericWorkflowExec("finalJob.bash", "finalJob.bash", True)
        gwf.add_final(GenericWorkflowJob("finalJob", label="finalJob", executable=final_exec))

    return gwf
def make_3_label_workflow_noop_sort(workflow_name: str, final: bool) -> GenericWorkflow:
    """Create a test workflow that has noop jobs.

    Noop jobs named ``noop_order1_<visit>`` impose an ordering between
    visits: a visit's label1 jobs depend on the previous visit's noop job,
    and its label2 jobs feed its own noop job.  Visit 10002 deliberately
    gets no noop job so consumers must handle gaps in the ordering chain.

    Parameters
    ----------
    workflow_name : `str`
        Name of the test workflow.
    final : `bool`
        Whether to add a final job.

    Returns
    -------
    gwf : `lsst.ctrl.bps.GenericWorkflow`
        The test workflow.
    """
    gwexec = GenericWorkflowExec("exec1", "/usr/bin/uptime", False)
    gwf = GenericWorkflow(workflow_name)
    job = GenericWorkflowJob("pipetaskInit", label="pipetaskInit", executable=gwexec)
    gwf.add_job(job)
    # Noop job belonging to the previously processed visit (None initially).
    prev_noop: GenericWorkflowNoopJob | None = None
    for visit in sorted([10001, 10002, 301]):  # 301 is to ensure numeric sorting
        if visit != 10002:
            # Visit 10002 intentionally has no noop job.
            noop_job = GenericWorkflowNoopJob(f"noop_order1_{visit}", "order1")
            gwf.add_job(noop_job)
        for detector in [10, 11]:
            prev_name = "pipetaskInit"
            for label in ["label1", "label2", "label3"]:
                name = f"{label}_{visit}_{detector}"
                job = GenericWorkflowJob(
                    name, label=label, executable=gwexec, tags={"visit": visit, "detector": detector}
                )
                # Chain label1 -> label2 -> label3 within this visit/detector.
                gwf.add_job(job, [prev_name], None)
                if label == "label1" and prev_noop:
                    # Order visits: label1 waits on the previous visit's noop.
                    gwf.add_job_relationships([prev_noop.name], [name])
                if label == "label2" and visit != 10002:
                    # label2 feeds this visit's noop job (when one exists).
                    gwf.add_job_relationships([name], [noop_job.name])
                prev_name = name
        # NOTE(review): for visit 10002 this re-assigns the previous visit's
        # noop job (no new one was made); harmless since 10002 sorts last.
        prev_noop = noop_job

    if final:
        gwexec = GenericWorkflowExec("finalJob.bash", "finalJob.bash", True)
        job = GenericWorkflowJob("finalJob", label="finalJob", executable=gwexec)
        gwf.add_final(job)
    return gwf
def make_3_label_workflow_groups_sort(workflow_name: str, final: bool) -> GenericWorkflow:
    """Create a test workflow that has job groups.

    Each visit's label1 and label2 jobs are placed inside a
    ``group_order1_<visit>`` group node; the groups are chained to
    enforce visit ordering; label3 jobs live outside the groups and
    depend on their visit's group.

    Parameters
    ----------
    workflow_name : `str`
        Name of the test workflow.
    final : `bool`
        Whether to add a final job.

    Returns
    -------
    gwf : `lsst.ctrl.bps.GenericWorkflow`
        The test workflow.
    """
    gwexec = GenericWorkflowExec("exec1", "/usr/bin/uptime", False)
    gwf = GenericWorkflow(workflow_name)
    job = GenericWorkflowJob("pipetaskInit", label="pipetaskInit", executable=gwexec)
    gwf.add_job(job)
    # Group of the previously processed visit (None initially).
    prev_group: GenericWorkflowGroup | None = None
    for visit in sorted([10001, 10002, 301]):  # 301 is to ensure numeric sorting
        job_group = GenericWorkflowGroup(f"group_order1_{visit}", "order1")
        for detector in [10, 11]:
            prev_name: str | None = None
            for label in ["label1", "label2"]:
                name = f"{label}_{visit}_{detector}"
                job = GenericWorkflowJob(
                    name, label=label, executable=gwexec, tags={"visit": visit, "detector": detector}
                )
                job_group.add_job(job)
                # Chain label1 -> label2 inside the group.
                if prev_name:
                    job_group.add_job_relationships(prev_name, name)
                prev_name = name
        gwf.add_job(job_group, ["pipetaskInit"], None)
        if prev_group:
            # Order the groups by visit.
            gwf.add_job_relationships([prev_group.name], [job_group.name])
        prev_group = job_group

    # label3 jobs are outside the groups, each depending on its visit's group.
    for visit in sorted([10001, 10002, 301]):  # 301 is to ensure numeric sorting
        for detector in [10, 11]:
            for label in ["label3"]:
                name = f"{label}_{visit}_{detector}"
                job = GenericWorkflowJob(
                    name, label=label, executable=gwexec, tags={"visit": visit, "detector": detector}
                )
                gwf.add_job(job, [f"group_order1_{visit}"], None)

    if final:
        gwexec = GenericWorkflowExec("finalJob.bash", "finalJob.bash", True)
        job = GenericWorkflowJob("finalJob", label="finalJob", executable=gwexec)
        gwf.add_final(job)

    return gwf
# 301 is to ensure numeric sorting
# Default (visit, detector) pairs shared by all task labels in the
# 5-label test workflows.
DEFAULT_DIMS = [
    (10001, 10),
    (10001, 11),
    (10001, 20),
    (10002, 10),
    (10002, 11),
    (10002, 20),
    (301, 10),
    (301, 11),
    (301, 20),
]

# Visit -> equivalent "group" dimension value, used when testing
# equivalent dims (like group and visit in the AP pipeline).
DIM_MAPPING = {301: "gval1", 10001: "gval2", 10002: "gval3"}

# Per-label (visit, detector) pairs where the early labels are missing
# some quanta, as if those had finished in a previous run.
UNEVEN_LABEL_DIMS = {
    "T1": [(10002, 11), (10002, 20)],
    "T2": [(10001, 11), (10001, 20), (10002, 10), (10002, 11), (10002, 20)],
    "T2b": [(301, 11), (301, 20), (10001, 11), (10001, 20), (10002, 10), (10002, 11), (10002, 20)],
    "T3": DEFAULT_DIMS,
    "T4": DEFAULT_DIMS,
}

# Per-label (visit, detector) pairs where every label has the full set.
EVEN_LABEL_DIMS = {
    "T1": DEFAULT_DIMS,
    "T2": DEFAULT_DIMS,
    "T2b": DEFAULT_DIMS,
    "T3": DEFAULT_DIMS,
    "T4": DEFAULT_DIMS,
}
def make_5_label_workflow(
    workflow_name: str, final: bool, uneven: bool = False, equiv_dims: bool = False
) -> GenericWorkflow:
    """Create a simple 5 label test workflow.

    Labels (T1, T2, T2b, T3, T4 — processed in sorted order) form a
    mostly linear chain; nothing depends on T2b's jobs.

    Parameters
    ----------
    workflow_name : `str`
        Name of the test workflow.
    final : `bool`
        Whether to add a final job.
    uneven : `bool`, optional
        Whether some of the jobs for initial tasks are
        not included as if finished in previous run.
    equiv_dims : `bool`, optional
        Whether first label jobs have a different but equivalent
        dim (like group and visit in AP pipeline).

    Returns
    -------
    gwf : `lsst.ctrl.bps.GenericWorkflow`
        The test workflow.
    """
    gwexec = GenericWorkflowExec("exec1", "/usr/bin/uptime", False)
    gwf = GenericWorkflow(workflow_name)
    job = GenericWorkflowJob("pipetaskInit", label="pipetaskInit", executable=gwexec)
    gwf.add_job(job)
    if uneven:
        label_dims = UNEVEN_LABEL_DIMS
    else:
        label_dims = EVEN_LABEL_DIMS

    prev_label = "pipetaskInit"
    for label in sorted(label_dims):
        for dim1, dim2 in label_dims[label]:
            tags: dict[str, str | int] = {"detector": dim2}
            # if want to test with equivalent dims (e.g., group and visit)
            if equiv_dims and label == "T1":
                tags["group"] = DIM_MAPPING[dim1]
                name = f"{label}_{DIM_MAPPING[dim1]}_{dim2}"
            else:
                tags["visit"] = dim1
                name = f"{label}_{dim1}_{dim2}"

            job = GenericWorkflowJob(
                name, label=label, executable=gwexec, quanta_counts=Counter({label: 1}), tags=tags
            )
            parents = []
            if label == "T1":
                parents = ["pipetaskInit"]
            elif (dim1, dim2) in label_dims[prev_label]:
                # Parent exists in the previous label; T1 parents may have
                # been named with the equivalent "group" dim value.
                if equiv_dims and label == "T2":
                    prev_name = f"{prev_label}_{DIM_MAPPING[dim1]}_{dim2}"
                else:
                    prev_name = f"{prev_label}_{dim1}_{dim2}"
                parents = [prev_name]
            else:
                # Parent quantum missing (uneven case): hang off the init job.
                parents = ["pipetaskInit"]

            gwf.add_job(job, parents, None)

        if label != "T2b":  # nothing is a descendant of T2b
            prev_label = label

    if final:
        gwexec = GenericWorkflowExec("finalJob.bash", "finalJob.bash", True)
        job = GenericWorkflowJob("finalJob", label="finalJob", executable=gwexec)
        gwf.add_final(job)

    return gwf
def make_5_label_workflow_2_groups(
    workflow_name: str, final: bool, uneven: bool = False, equiv_dims: bool = False, blocking: bool = False
) -> GenericWorkflow:
    """Create a 5 label test workflow with two levels of job groups.

    T1 and T2 jobs are grouped per visit under ``order1`` groups; T3 and
    T4 jobs under ``order2`` groups.  T2b jobs remain ungrouped.

    Parameters
    ----------
    workflow_name : `str`
        Name of the test workflow.
    final : `bool`
        Whether to add a final job.
    uneven : `bool`, optional
        Whether some of the jobs for initial tasks are
        not included as if finished in previous run.
    equiv_dims : `bool`, optional
        Whether first label jobs have a different but equivalent
        dim (like group and visit in AP pipeline).
    blocking : `bool`, optional
        Value to use in group nodes.

    Returns
    -------
    gwf : `lsst.ctrl.bps.GenericWorkflow`
        The test workflow.
    """
    # Build the ungrouped workflow first; its jobs are reused in the groups.
    # (Previously a hard-coded "sink_uneven" name was passed here; use
    # workflow_name for consistency with make_5_label_workflow_middle_groups.
    # The intermediate workflow's name is not used after construction.)
    gwf_orig = make_5_label_workflow(workflow_name, final, uneven, equiv_dims)

    if uneven:
        label_dims = UNEVEN_LABEL_DIMS
    else:
        label_dims = EVEN_LABEL_DIMS

    # make job lists
    job_lists: dict[str, list[str]] = {}  # group name -> names of member jobs
    group_labels: dict[str, str] = {}  # group name -> group label

    # First-level groups (order1) hold T1 and T2 jobs per visit.
    group_label = "order1"
    for dim1, dim2 in label_dims["T1"]:
        # T1 job names may use the equivalent "group" dim value.
        if equiv_dims:
            job_name = f"T1_{DIM_MAPPING[dim1]}_{dim2}"
        else:
            job_name = f"T1_{dim1}_{dim2}"
        group_name = f"group_{group_label}_{dim1}"
        group_labels[group_name] = group_label
        job_lists.setdefault(group_name, []).append(job_name)

    for dim1, dim2 in label_dims["T2"]:
        job_name = f"T2_{dim1}_{dim2}"
        group_name = f"group_{group_label}_{dim1}"
        group_labels[group_name] = group_label
        job_lists.setdefault(group_name, []).append(job_name)

    # Second-level groups (order2) hold T3 and T4 jobs per visit.
    group_label = "order2"
    for label in ["T3", "T4"]:
        for dim1, dim2 in label_dims[label]:
            job_name = f"{label}_{dim1}_{dim2}"
            group_name = f"group_{group_label}_{dim1}"
            group_labels[group_name] = group_label
            job_lists.setdefault(group_name, []).append(job_name)

    # make groups of jobs
    groups = {}
    for group_name, job_names in job_lists.items():
        if job_names:
            group = GenericWorkflowGroup(group_name, group_labels[group_name], blocking=blocking)
            # Add all jobs first then add edges
            for job_name in job_names:
                group.add_job(gwf_orig.get_job(job_name))

            for name in job_names:
                # NOTE(review): edge tuples are (job, predecessor); confirm
                # this matches the parent->child edge direction that
                # add_edges_from expects.
                edges = [(name, p) for p in gwf_orig.predecessors(name) if p in job_names]
                group.add_edges_from(edges)
            groups[group_name] = group

    gwf = GenericWorkflow(workflow_name)

    # add main workflow nodes
    gwf.add_job(gwf_orig.get_job("pipetaskInit"))
    for dim1, dim2 in label_dims["T2b"]:
        job_name = f"T2b_{dim1}_{dim2}"
        gwf.add_job(gwf_orig.get_job(job_name))

    for group in groups.values():
        gwf.add_job(group)

    # add main workflow edges
    edges = [
        ("pipetaskInit", "group_order1_10001"),
        ("pipetaskInit", "group_order1_10002"),
        ("group_order1_10001", "group_order2_10001"),
        ("group_order1_10002", "group_order2_10002"),
        ("group_order1_10001", "T2b_10001_11"),
        ("group_order1_10001", "T2b_10001_20"),
        ("group_order1_10002", "T2b_10002_10"),
        ("group_order1_10002", "T2b_10002_11"),
        ("group_order1_10002", "T2b_10002_20"),
        # group order dependencies
        ("group_order1_10001", "group_order1_10002"),
        ("group_order2_301", "group_order2_10001"),
        ("group_order2_10001", "group_order2_10002"),
    ]

    if uneven:
        # Visit 301 has no T1/T2 quanta, so its T2b jobs and order2 group
        # hang directly off pipetaskInit.
        edges.extend(
            [
                ("pipetaskInit", "T2b_301_11"),
                ("pipetaskInit", "T2b_301_20"),
                ("pipetaskInit", "group_order2_301"),
                ("pipetaskInit", "group_order2_10001"),
            ]
        )
    else:
        edges.extend(
            [
                ("pipetaskInit", "group_order1_301"),
                ("group_order1_301", "group_order1_10001"),
                ("group_order1_301", "group_order2_301"),
                ("group_order1_301", "T2b_301_10"),
                ("group_order1_301", "T2b_301_11"),
                ("group_order1_301", "T2b_301_20"),
                ("group_order1_10001", "T2b_10001_10"),
            ]
        )
    gwf.add_edges_from(edges)

    if final:
        job = cast(GenericWorkflowJob, gwf_orig.get_final())
        gwf.add_final(job)

    return gwf
def make_5_label_workflow_middle_groups(
    workflow_name: str, final: bool, uneven: bool = False, equiv_dims: bool = False, blocking: bool = False
) -> GenericWorkflow:
    """Create a test workflow with a group in middle of workflow
    (T2, T2b, and T3).

    Parameters
    ----------
    workflow_name : `str`
        Name of the test workflow.
    final : `bool`
        Whether to add a final job.
    uneven : `bool`, optional
        Whether some of the jobs for initial tasks are
        not included as if finished in previous run.
    equiv_dims : `bool`, optional
        Whether first label jobs have a different but equivalent
        dim (like group and visit in AP pipeline).
    blocking : `bool`, optional
        Value to use in group nodes.

    Returns
    -------
    gwf : `lsst.ctrl.bps.GenericWorkflow`
        The test workflow.
    """
    # Build the ungrouped workflow first; its jobs are reused in the groups.
    gwf_orig = make_5_label_workflow(workflow_name, final, uneven, equiv_dims)

    if uneven:
        label_dims = UNEVEN_LABEL_DIMS
    else:
        label_dims = EVEN_LABEL_DIMS

    # make job lists
    job_lists: dict[str, list[str]] = {}  # group name -> names of member jobs
    group_labels: dict[str, str] = {}  # group name -> group label

    # Middle labels (T2, T2b, T3) are grouped per visit under "mid" groups.
    group_label = "mid"
    for label in ["T2", "T2b", "T3"]:
        for dim1, dim2 in label_dims[label]:
            job_name = f"{label}_{dim1}_{dim2}"
            group_name = f"group_{group_label}_{dim1}"
            group_labels[group_name] = group_label
            job_lists.setdefault(group_name, []).append(job_name)

    # make groups of jobs
    groups = {}
    for group_name, job_names in job_lists.items():
        if job_names:
            group = GenericWorkflowGroup(group_name, group_labels[group_name], blocking=blocking)
            # Add all jobs first then add edges
            for job_name in job_names:
                group.add_job(gwf_orig.get_job(job_name))

            for name in job_names:
                # NOTE(review): edge tuples are (job, predecessor); confirm
                # this matches the parent->child edge direction that
                # add_edges_from expects.
                edges = [(name, p) for p in gwf_orig.predecessors(name) if p in job_names]
                group.add_edges_from(edges)

            groups[group_name] = group

    gwf = GenericWorkflow(workflow_name)

    # add main workflow nodes
    gwf.add_job(gwf_orig.get_job("pipetaskInit"))
    # T1 and T4 jobs stay outside the groups.
    for label in ["T1", "T4"]:
        for dim1, dim2 in label_dims[label]:
            # T1 job names may use the equivalent "group" dim value.
            if equiv_dims and label == "T1":
                job_name = f"T1_{DIM_MAPPING[dim1]}_{dim2}"
            else:
                job_name = f"{label}_{dim1}_{dim2}"
            gwf.add_job(gwf_orig.get_job(job_name))

    for group in groups.values():
        gwf.add_job(group)

    # add main workflow edges
    edges = [
        ("group_mid_301", "T4_301_10"),
        ("group_mid_301", "T4_301_11"),
        ("group_mid_301", "T4_301_20"),
        ("group_mid_10001", "T4_10001_10"),
        ("group_mid_10001", "T4_10001_11"),
        ("group_mid_10001", "T4_10001_20"),
        ("group_mid_10002", "T4_10002_10"),
        ("group_mid_10002", "T4_10002_11"),
        ("group_mid_10002", "T4_10002_20"),
        # group order dependencies
        ("group_mid_301", "group_mid_10001"),
        ("group_mid_10001", "group_mid_10002"),
    ]

    if uneven:
        # Only visit 10002 has T1 quanta (detectors 11 and 20).
        if equiv_dims:
            edges.extend(
                [
                    ("pipetaskInit", "T1_gval3_11"),
                    ("pipetaskInit", "T1_gval3_20"),
                    ("T1_gval3_11", "group_mid_10002"),
                    ("T1_gval3_20", "group_mid_10002"),
                ]
            )
        else:
            edges.extend(
                [
                    ("pipetaskInit", "T1_10002_11"),
                    ("pipetaskInit", "T1_10002_20"),
                    ("T1_10002_11", "group_mid_10002"),
                    ("T1_10002_20", "group_mid_10002"),
                ]
            )

        # Because in orig workflow, pipetaskInit has edge to T2(10002, 10),
        # there will be an "extra" edge from pipetaskInit to group_mid_10002.
        edges.extend(
            [
                ("pipetaskInit", "group_mid_301"),
                ("pipetaskInit", "group_mid_10001"),
                ("pipetaskInit", "group_mid_10002"),
            ]
        )
    else:
        # Full set of T1 jobs: each visit's T1 jobs hang off pipetaskInit
        # and feed the visit's mid group.
        dim1s = [301, 10001, 10002]
        for dim1 in dim1s:
            T1_dim1: str | int = dim1
            if equiv_dims:
                T1_dim1 = DIM_MAPPING[dim1]

            edges.extend(
                [
                    ("pipetaskInit", f"T1_{T1_dim1}_10"),
                    ("pipetaskInit", f"T1_{T1_dim1}_11"),
                    ("pipetaskInit", f"T1_{T1_dim1}_20"),
                    (f"T1_{T1_dim1}_10", f"group_mid_{dim1}"),
                    (f"T1_{T1_dim1}_11", f"group_mid_{dim1}"),
                    (f"T1_{T1_dim1}_20", f"group_mid_{dim1}"),
                ]
            )
    gwf.add_edges_from(edges)

    if final:
        job = cast(GenericWorkflowJob, gwf_orig.get_final())
        gwf.add_final(job)

    return gwf
def compare_generic_workflows(gwf1: GenericWorkflow, gwf2: GenericWorkflow) -> bool:
    """Compare two workflows printing log messages where not equal.

    Parameters
    ----------
    gwf1 : `lsst.ctrl.bps.GenericWorkflow`
        First workflow to compare.
    gwf2 : `lsst.ctrl.bps.GenericWorkflow`
        Second workflow to compare.

    Returns
    -------
    equal : `bool`
        Whether the two workflows are the same.
    """
    equal = True

    # check edges
    edges1 = set(gwf1.edges)
    edges2 = set(gwf2.edges)
    only_in_first = edges1 - edges2
    only_in_second = edges2 - edges1
    if only_in_first:
        _LOG.debug("Edges only in %s, but not in %s: %s", gwf1.name, gwf2.name, only_in_first)
        equal = False
    if only_in_second:
        _LOG.debug("Edges only in %s, but not in %s: %s", gwf2.name, gwf1.name, only_in_second)
        equal = False

    # check nodes
    names1 = set(gwf1.nodes)
    names2 = set(gwf2.nodes)
    only_in_first = names1 - names2
    only_in_second = names2 - names1
    if only_in_first:
        _LOG.debug("Jobs only in %s, but not in %s: %s", gwf1.name, gwf2.name, only_in_first)
        equal = False
    if only_in_second:
        _LOG.debug("Jobs only in %s, but not in %s: %s", gwf2.name, gwf1.name, only_in_second)
        equal = False

    # check node values
    for name in names1 & names2:
        job1 = gwf1.get_job(name)
        job2 = gwf2.get_job(name)

        if job1.node_type != job2.node_type:
            # Bug fix: log the differing node_type values (previously the
            # unrelated blocking attribute was logged here).
            _LOG.debug(
                "Group jobs` node_type not equal %s=%s, %s=%s",
                job1.name,
                job1.node_type,
                job2.name,
                job2.node_type,
            )
            equal = False
        elif job1.node_type == GenericWorkflowNodeType.GROUP:
            if job1.blocking != job2.blocking:
                _LOG.debug(
                    "Group jobs` blocking not equal %s=%s, %s=%s",
                    job1.name,
                    job1.blocking,
                    job2.name,
                    job2.blocking,
                )
                equal = False

            # compare subworkflows
            # Bug fix: was `equal = equal or compare_generic_workflows(...)`,
            # which skipped the recursive comparison whenever equal was
            # already True and could flip an accumulated False back to True.
            if not compare_generic_workflows(job1, job2):
                equal = False

    # check final
    final1 = gwf1.get_final()
    final2 = gwf2.get_final()
    if final1 != final2:
        _LOG.debug("Final jobs are not equal: %s vs %s", final1, final2)
        equal = False

    return equal