Coverage for python / lsst / ctrl / bps / tests / gw_test_utils.py: 4%

265 statements  

« prev     ^ index     » next       coverage.py v7.13.5, created at 2026-04-28 08:49 +0000

1# This file is part of ctrl_bps. 

2# 

3# Developed for the LSST Data Management System. 

4# This product includes software developed by the LSST Project 

5# (https://www.lsst.org). 

6# See the COPYRIGHT file at the top-level directory of this distribution 

7# for details of code ownership. 

8# 

9# This software is dual licensed under the GNU General Public License and also 

10# under a 3-clause BSD license. Recipients may choose which of these licenses 

11# to use; please see the files gpl-3.0.txt and/or bsd_license.txt, 

12# respectively. If you choose the GPL option then the following text applies 

13# (but note that there is still no warranty even if you opt for BSD instead): 

14# 

15# This program is free software: you can redistribute it and/or modify 

16# it under the terms of the GNU General Public License as published by 

17# the Free Software Foundation, either version 3 of the License, or 

18# (at your option) any later version. 

19# 

20# This program is distributed in the hope that it will be useful, 

21# but WITHOUT ANY WARRANTY; without even the implied warranty of 

22# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the 

23# GNU General Public License for more details. 

24# 

25# You should have received a copy of the GNU General Public License 

26# along with this program. If not, see <https://www.gnu.org/licenses/>. 

27"""GenericWorkflow-related utilities to support ctrl_bps testing.""" 

28 

# Public names exported by a star-import of this module (workflow builders).
# NOTE(review): compare_generic_workflows is defined in this module but is not
# listed here -- confirm whether that omission is intentional.
__all__ = [
    "make_3_label_workflow",
    "make_3_label_workflow_groups_sort",
    "make_3_label_workflow_noop_sort",
    "make_5_label_workflow",
    "make_5_label_workflow_2_groups",
    "make_5_label_workflow_middle_groups",
]

37 

38import logging 

39from collections import Counter 

40from typing import cast 

41 

42from lsst.ctrl.bps import ( 

43 GenericWorkflow, 

44 GenericWorkflowExec, 

45 GenericWorkflowGroup, 

46 GenericWorkflowJob, 

47 GenericWorkflowNodeType, 

48 GenericWorkflowNoopJob, 

49) 

50 

# Module-level logger; compare_generic_workflows reports differences through
# it at DEBUG level.
_LOG = logging.getLogger(__name__)

52 

53 

def make_3_label_workflow(workflow_name: str, final: bool) -> GenericWorkflow:
    """Build a small test workflow made of three-job chains.

    A ``pipetaskInit`` job is added first.  Then, for every
    (visit, detector) pair, a chain ``label1 -> label2 -> label3`` is
    added whose first job has ``pipetaskInit`` as its parent.  Each chain
    job is tagged with its visit, detector and a per-visit group value.

    Parameters
    ----------
    workflow_name : `str`
        Name of the test workflow.
    final : `bool`
        Whether to add a final job.

    Returns
    -------
    gwf : `lsst.ctrl.bps.GenericWorkflow`
        The test workflow.
    """
    init_name = "pipetaskInit"
    uptime_exec = GenericWorkflowExec("exec1", "/usr/bin/uptime", False)

    workflow = GenericWorkflow(workflow_name)
    workflow.add_job(GenericWorkflowJob(init_name, label=init_name, executable=uptime_exec))

    # Visit 301 is included to ensure numeric sorting is exercised.
    visit_groups = (
        (10001, "2024-06-26T07:28:26.289"),
        (10002, "2024-06-26T07:29:06.969"),
        (301, "2024-06-26T07:27:45.775"),
    )
    for visit, vgroup in visit_groups:
        for detector in (10, 11):
            # Every chain starts from the init job.
            parent_name = init_name
            for label in ("label1", "label2", "label3"):
                job_name = f"{label}_{visit}_{detector}"
                chain_job = GenericWorkflowJob(
                    job_name,
                    label=label,
                    executable=uptime_exec,
                    quanta_counts=Counter({label: 1}),
                    tags={"visit": visit, "detector": detector, "group": vgroup},
                )
                workflow.add_job(chain_job, [parent_name], None)
                parent_name = job_name

    if final:
        final_exec = GenericWorkflowExec("finalJob.bash", "finalJob.bash", True)
        workflow.add_final(GenericWorkflowJob("finalJob", label="finalJob", executable=final_exec))

    return workflow

98 

99 

def make_3_label_workflow_noop_sort(workflow_name: str, final: bool) -> GenericWorkflow:
    """Build a three-label test workflow whose visits are ordered by noop jobs.

    Visits are processed in ascending numeric order.  Every visit except
    10002 gets a ``noop_order1_<visit>`` job that is a child of that
    visit's ``label2`` jobs; the ``label1`` jobs of the following visit
    are children of the most recently created noop job.

    Parameters
    ----------
    workflow_name : `str`
        Name of the test workflow.
    final : `bool`
        Whether to add a final job.

    Returns
    -------
    gwf : `lsst.ctrl.bps.GenericWorkflow`
        The test workflow.
    """
    init_name = "pipetaskInit"
    uptime_exec = GenericWorkflowExec("exec1", "/usr/bin/uptime", False)

    workflow = GenericWorkflow(workflow_name)
    workflow.add_job(GenericWorkflowJob(init_name, label=init_name, executable=uptime_exec))

    previous_noop: GenericWorkflowNoopJob | None = None
    # Visit 301 is included to ensure numeric sorting is exercised.
    for visit in sorted((10001, 10002, 301)):
        has_noop = visit != 10002
        if has_noop:
            noop_job = GenericWorkflowNoopJob(f"noop_order1_{visit}", "order1")
            workflow.add_job(noop_job)

        for detector in (10, 11):
            parent_name = init_name
            for label in ("label1", "label2", "label3"):
                job_name = f"{label}_{visit}_{detector}"
                chain_job = GenericWorkflowJob(
                    job_name,
                    label=label,
                    executable=uptime_exec,
                    tags={"visit": visit, "detector": detector},
                )
                workflow.add_job(chain_job, [parent_name], None)

                # First label waits on the noop job carried over from the
                # previous visit (if there was one).
                if label == "label1" and previous_noop:
                    workflow.add_job_relationships([previous_noop.name], [job_name])
                # Second label feeds this visit's noop job.
                if label == "label2" and has_noop:
                    workflow.add_job_relationships([job_name], [noop_job.name])

                parent_name = job_name

        # For the visit without its own noop job this re-assigns the noop
        # job created for the preceding visit.
        previous_noop = noop_job

    if final:
        final_exec = GenericWorkflowExec("finalJob.bash", "finalJob.bash", True)
        workflow.add_final(GenericWorkflowJob("finalJob", label="finalJob", executable=final_exec))
    return workflow

144 

145 

def make_3_label_workflow_groups_sort(workflow_name: str, final: bool) -> GenericWorkflow:
    """Build a three-label test workflow in which jobs are held in groups.

    For each visit (ascending numeric order) a ``group_order1_<visit>``
    group holds the ``label1 -> label2`` chains of both detectors.  Every
    group is a child of ``pipetaskInit`` and of the previous visit's
    group.  The ``label3`` jobs are added to the top-level workflow as
    children of their visit's group.

    Parameters
    ----------
    workflow_name : `str`
        Name of the test workflow.
    final : `bool`
        Whether to add a final job.

    Returns
    -------
    gwf : `lsst.ctrl.bps.GenericWorkflow`
        The test workflow.
    """
    init_name = "pipetaskInit"
    detectors = (10, 11)
    # Visit 301 is included to ensure numeric sorting is exercised.
    visits = sorted((10001, 10002, 301))
    uptime_exec = GenericWorkflowExec("exec1", "/usr/bin/uptime", False)

    workflow = GenericWorkflow(workflow_name)
    workflow.add_job(GenericWorkflowJob(init_name, label=init_name, executable=uptime_exec))

    # One group per visit holding the label1 -> label2 chains.
    previous_group: GenericWorkflowGroup | None = None
    for visit in visits:
        visit_group = GenericWorkflowGroup(f"group_order1_{visit}", "order1")
        for detector in detectors:
            parent_name: str | None = None
            for label in ("label1", "label2"):
                job_name = f"{label}_{visit}_{detector}"
                visit_group.add_job(
                    GenericWorkflowJob(
                        job_name,
                        label=label,
                        executable=uptime_exec,
                        tags={"visit": visit, "detector": detector},
                    )
                )
                if parent_name:
                    visit_group.add_job_relationships(parent_name, job_name)
                parent_name = job_name

        workflow.add_job(visit_group, [init_name], None)
        if previous_group:
            workflow.add_job_relationships([previous_group.name], [visit_group.name])
        previous_group = visit_group

    # The label3 jobs live outside the groups, one per (visit, detector).
    for visit in visits:
        for detector in detectors:
            label = "label3"
            job_name = f"{label}_{visit}_{detector}"
            tail_job = GenericWorkflowJob(
                job_name,
                label=label,
                executable=uptime_exec,
                tags={"visit": visit, "detector": detector},
            )
            workflow.add_job(tail_job, [f"group_order1_{visit}"], None)

    if final:
        final_exec = GenericWorkflowExec("finalJob.bash", "finalJob.bash", True)
        workflow.add_final(GenericWorkflowJob("finalJob", label="finalJob", executable=final_exec))

    return workflow

199 

200 

# Full (dim1, dim2) grid used by the 5-label workflows, dim1-major.
# 301 is to ensure numeric sorting.
DEFAULT_DIMS = [(dim1, dim2) for dim1 in (10001, 10002, 301) for dim2 in (10, 11, 20)]

# Alternative value for each dim1, used when building names/tags with
# "equivalent" dimensions.
DIM_MAPPING = {301: "gval1", 10001: "gval2", 10002: "gval3"}

# Per-label dims where the earlier labels are missing some entries.
UNEVEN_LABEL_DIMS = {
    "T1": [(10002, 11), (10002, 20)],
    "T2": [(10001, 11), (10001, 20), (10002, 10), (10002, 11), (10002, 20)],
    "T2b": [
        (301, 11),
        (301, 20),
        (10001, 11),
        (10001, 20),
        (10002, 10),
        (10002, 11),
        (10002, 20),
    ],
    "T3": DEFAULT_DIMS,
    "T4": DEFAULT_DIMS,
}

# Per-label dims where every label covers the full grid (all values share
# the same DEFAULT_DIMS list object).
EVEN_LABEL_DIMS = dict.fromkeys(("T1", "T2", "T2b", "T3", "T4"), DEFAULT_DIMS)

230 

231 

def make_5_label_workflow(
    workflow_name: str, final: bool, uneven: bool = False, equiv_dims: bool = False
) -> GenericWorkflow:
    """Create a 5 label (T1, T2, T2b, T3, T4) test workflow.

    Labels are processed in sorted order.  A job's parent is the job with
    the same dims in the upstream label when that label has those dims,
    otherwise ``pipetaskInit``.  T2b never becomes the upstream label, so
    both T2b and T3 hang off T2.

    Parameters
    ----------
    workflow_name : `str`
        Name of the test workflow.
    final : `bool`
        Whether to add a final job.
    uneven : `bool`, optional
        Whether some of the jobs for initial tasks are
        not included as if finished in previous run.
    equiv_dims : `bool`, optional
        Whether first label jobs have a different but equivalent
        dim (like group and visit in AP pipeline).

    Returns
    -------
    gwf : `lsst.ctrl.bps.GenericWorkflow`
        The test workflow.
    """
    init_name = "pipetaskInit"
    uptime_exec = GenericWorkflowExec("exec1", "/usr/bin/uptime", False)

    workflow = GenericWorkflow(workflow_name)
    workflow.add_job(GenericWorkflowJob(init_name, label=init_name, executable=uptime_exec))

    label_dims = UNEVEN_LABEL_DIMS if uneven else EVEN_LABEL_DIMS

    upstream_label = init_name
    for label in sorted(label_dims):
        for dim1, dim2 in label_dims[label]:
            tags: dict[str, str | int] = {"detector": dim2}
            # With equivalent dims, T1 jobs are tagged and named by the
            # mapped "group" value rather than by visit.
            if equiv_dims and label == "T1":
                tags["group"] = DIM_MAPPING[dim1]
                job_name = f"{label}_{DIM_MAPPING[dim1]}_{dim2}"
            else:
                tags["visit"] = dim1
                job_name = f"{label}_{dim1}_{dim2}"

            new_job = GenericWorkflowJob(
                job_name,
                label=label,
                executable=uptime_exec,
                quanta_counts=Counter({label: 1}),
                tags=tags,
            )

            # T1 jobs, and jobs whose upstream counterpart is absent,
            # attach directly to the init job.
            if label != "T1" and (dim1, dim2) in label_dims[upstream_label]:
                # T2's upstream is T1, whose names use the mapped value
                # when equivalent dims are enabled.
                upstream_dim1 = DIM_MAPPING[dim1] if equiv_dims and label == "T2" else dim1
                parents = [f"{upstream_label}_{upstream_dim1}_{dim2}"]
            else:
                parents = [init_name]

            workflow.add_job(new_job, parents, None)

        if label != "T2b":  # nothing is a descendant of T2b
            upstream_label = label

    if final:
        final_exec = GenericWorkflowExec("finalJob.bash", "finalJob.bash", True)
        workflow.add_final(GenericWorkflowJob("finalJob", label="finalJob", executable=final_exec))

    return workflow

302 

303 

def make_5_label_workflow_2_groups(
    workflow_name: str, final: bool, uneven: bool = False, equiv_dims: bool = False, blocking: bool = False
) -> GenericWorkflow:
    """Create a 5 label test workflow with two sets of job groups.

    The jobs come from `make_5_label_workflow`.  T1 and T2 jobs are put in
    ``group_order1_<dim1>`` groups, T3 and T4 jobs in
    ``group_order2_<dim1>`` groups, and T2b jobs stay in the top-level
    workflow.  Edges between top-level nodes are listed explicitly.

    Parameters
    ----------
    workflow_name : `str`
        Name of the test workflow.
    final : `bool`
        Whether to add a final job.
    uneven : `bool`, optional
        Whether some of the jobs for initial tasks are
        not included as if finished in previous run.
    equiv_dims : `bool`, optional
        Whether first label jobs have a different but equivalent
        dim (like group and visit in AP pipeline).
    blocking : `bool`, optional
        Value to use in group nodes.

    Returns
    -------
    gwf : `lsst.ctrl.bps.GenericWorkflow`
        The test workflow.
    """
    gwf_orig = make_5_label_workflow("sink_uneven", final, uneven, equiv_dims)

    label_dims = UNEVEN_LABEL_DIMS if uneven else EVEN_LABEL_DIMS

    # make job lists
    job_lists: dict[str, list[str]] = {}
    group_labels: dict[str, str] = {}

    group_label = "order1"
    for dim1, dim2 in label_dims["T1"]:
        # T1 job names use the mapped value when equivalent dims are on,
        # but the group is always keyed by the raw dim1.
        if equiv_dims:
            job_name = f"T1_{DIM_MAPPING[dim1]}_{dim2}"
        else:
            job_name = f"T1_{dim1}_{dim2}"
        group_name = f"group_{group_label}_{dim1}"
        group_labels[group_name] = group_label
        job_lists.setdefault(group_name, []).append(job_name)

    for dim1, dim2 in label_dims["T2"]:
        job_name = f"T2_{dim1}_{dim2}"
        group_name = f"group_{group_label}_{dim1}"
        group_labels[group_name] = group_label
        job_lists.setdefault(group_name, []).append(job_name)

    group_label = "order2"
    for label in ["T3", "T4"]:
        for dim1, dim2 in label_dims[label]:
            job_name = f"{label}_{dim1}_{dim2}"
            group_name = f"group_{group_label}_{dim1}"
            group_labels[group_name] = group_label
            job_lists.setdefault(group_name, []).append(job_name)

    # make groups of jobs
    groups: dict[str, GenericWorkflowGroup] = {}
    for group_name, job_names in job_lists.items():
        if job_names:
            group = GenericWorkflowGroup(group_name, group_labels[group_name], blocking=blocking)
            # Add all jobs first then add edges
            for job_name in job_names:
                group.add_job(gwf_orig.get_job(job_name))

            # Copy the original edges whose both ends are in this group.
            # Each edge is (parent, child) so the direction inside the
            # group matches the direction in the original workflow.
            members = set(job_names)
            for name in job_names:
                group_edges = [(p, name) for p in gwf_orig.predecessors(name) if p in members]
                group.add_edges_from(group_edges)
            groups[group_name] = group

    gwf = GenericWorkflow(workflow_name)

    # add main workflow nodes
    gwf.add_job(gwf_orig.get_job("pipetaskInit"))
    for dim1, dim2 in label_dims["T2b"]:
        job_name = f"T2b_{dim1}_{dim2}"
        gwf.add_job(gwf_orig.get_job(job_name))

    for group in groups.values():
        gwf.add_job(group)

    # add main workflow edges
    edges = [
        ("pipetaskInit", "group_order1_10001"),
        ("pipetaskInit", "group_order1_10002"),
        ("group_order1_10001", "group_order2_10001"),
        ("group_order1_10002", "group_order2_10002"),
        ("group_order1_10001", "T2b_10001_11"),
        ("group_order1_10001", "T2b_10001_20"),
        ("group_order1_10002", "T2b_10002_10"),
        ("group_order1_10002", "T2b_10002_11"),
        ("group_order1_10002", "T2b_10002_20"),
        # group order dependencies
        ("group_order1_10001", "group_order1_10002"),
        ("group_order2_301", "group_order2_10001"),
        ("group_order2_10001", "group_order2_10002"),
    ]

    if uneven:
        # No order1 group exists for 301 in the uneven case, so its T2b
        # jobs and order2 group hang directly off the init job.
        edges.extend(
            [
                ("pipetaskInit", "T2b_301_11"),
                ("pipetaskInit", "T2b_301_20"),
                ("pipetaskInit", "group_order2_301"),
                ("pipetaskInit", "group_order2_10001"),
            ]
        )
    else:
        edges.extend(
            [
                ("pipetaskInit", "group_order1_301"),
                ("group_order1_301", "group_order1_10001"),
                ("group_order1_301", "group_order2_301"),
                ("group_order1_301", "T2b_301_10"),
                ("group_order1_301", "T2b_301_11"),
                ("group_order1_301", "T2b_301_20"),
                ("group_order1_10001", "T2b_10001_10"),
            ]
        )
    gwf.add_edges_from(edges)

    if final:
        job = cast(GenericWorkflowJob, gwf_orig.get_final())
        gwf.add_final(job)

    return gwf

434 

435 

def make_5_label_workflow_middle_groups(
    workflow_name: str, final: bool, uneven: bool = False, equiv_dims: bool = False, blocking: bool = False
) -> GenericWorkflow:
    """Create a test workflow with a group in middle of workflow
    (T2, T2b, and T3).

    The jobs come from `make_5_label_workflow`.  T2, T2b and T3 jobs are
    put in ``group_mid_<dim1>`` groups while T1 and T4 jobs stay in the
    top-level workflow.  Edges between top-level nodes are listed
    explicitly.

    Parameters
    ----------
    workflow_name : `str`
        Name of the test workflow.
    final : `bool`
        Whether to add a final job.
    uneven : `bool`, optional
        Whether some of the jobs for initial tasks are
        not included as if finished in previous run.
    equiv_dims : `bool`, optional
        Whether first label jobs have a different but equivalent
        dim (like group and visit in AP pipeline).
    blocking : `bool`, optional
        Value to use in group nodes.

    Returns
    -------
    gwf : `lsst.ctrl.bps.GenericWorkflow`
        The test workflow.
    """
    gwf_orig = make_5_label_workflow(workflow_name, final, uneven, equiv_dims)

    label_dims = UNEVEN_LABEL_DIMS if uneven else EVEN_LABEL_DIMS

    # make job lists
    job_lists: dict[str, list[str]] = {}
    group_labels: dict[str, str] = {}

    group_label = "mid"
    for label in ["T2", "T2b", "T3"]:
        for dim1, dim2 in label_dims[label]:
            job_name = f"{label}_{dim1}_{dim2}"
            group_name = f"group_{group_label}_{dim1}"
            group_labels[group_name] = group_label
            job_lists.setdefault(group_name, []).append(job_name)

    # make groups of jobs
    groups: dict[str, GenericWorkflowGroup] = {}
    for group_name, job_names in job_lists.items():
        if job_names:
            group = GenericWorkflowGroup(group_name, group_labels[group_name], blocking=blocking)
            # Add all jobs first then add edges
            for job_name in job_names:
                group.add_job(gwf_orig.get_job(job_name))

            # Copy the original edges whose both ends are in this group.
            # Each edge is (parent, child) so the direction inside the
            # group matches the direction in the original workflow.
            members = set(job_names)
            for name in job_names:
                group_edges = [(p, name) for p in gwf_orig.predecessors(name) if p in members]
                group.add_edges_from(group_edges)

            groups[group_name] = group

    gwf = GenericWorkflow(workflow_name)

    # add main workflow nodes
    gwf.add_job(gwf_orig.get_job("pipetaskInit"))
    for label in ["T1", "T4"]:
        for dim1, dim2 in label_dims[label]:
            # T1 job names use the mapped value when equivalent dims are on.
            if equiv_dims and label == "T1":
                job_name = f"T1_{DIM_MAPPING[dim1]}_{dim2}"
            else:
                job_name = f"{label}_{dim1}_{dim2}"
            gwf.add_job(gwf_orig.get_job(job_name))

    for group in groups.values():
        gwf.add_job(group)

    # add main workflow edges
    edges = [
        ("group_mid_301", "T4_301_10"),
        ("group_mid_301", "T4_301_11"),
        ("group_mid_301", "T4_301_20"),
        ("group_mid_10001", "T4_10001_10"),
        ("group_mid_10001", "T4_10001_11"),
        ("group_mid_10001", "T4_10001_20"),
        ("group_mid_10002", "T4_10002_10"),
        ("group_mid_10002", "T4_10002_11"),
        ("group_mid_10002", "T4_10002_20"),
        # group order dependencies
        ("group_mid_301", "group_mid_10001"),
        ("group_mid_10001", "group_mid_10002"),
    ]

    if uneven:
        if equiv_dims:
            edges.extend(
                [
                    ("pipetaskInit", "T1_gval3_11"),
                    ("pipetaskInit", "T1_gval3_20"),
                    ("T1_gval3_11", "group_mid_10002"),
                    ("T1_gval3_20", "group_mid_10002"),
                ]
            )
        else:
            edges.extend(
                [
                    ("pipetaskInit", "T1_10002_11"),
                    ("pipetaskInit", "T1_10002_20"),
                    ("T1_10002_11", "group_mid_10002"),
                    ("T1_10002_20", "group_mid_10002"),
                ]
            )

        # Because in orig workflow, pipetaskInit has edge to T2(10002, 10),
        # there will be an "extra" edge from pipetaskInit to group_mid_10002.
        edges.extend(
            [
                ("pipetaskInit", "group_mid_301"),
                ("pipetaskInit", "group_mid_10001"),
                ("pipetaskInit", "group_mid_10002"),
            ]
        )
    else:
        for dim1 in [301, 10001, 10002]:
            # Name component used by T1 jobs; group names keep the raw dim1.
            t1_dim1: str | int = DIM_MAPPING[dim1] if equiv_dims else dim1

            edges.extend(
                [
                    ("pipetaskInit", f"T1_{t1_dim1}_10"),
                    ("pipetaskInit", f"T1_{t1_dim1}_11"),
                    ("pipetaskInit", f"T1_{t1_dim1}_20"),
                    (f"T1_{t1_dim1}_10", f"group_mid_{dim1}"),
                    (f"T1_{t1_dim1}_11", f"group_mid_{dim1}"),
                    (f"T1_{t1_dim1}_20", f"group_mid_{dim1}"),
                ]
            )
    gwf.add_edges_from(edges)

    if final:
        job = cast(GenericWorkflowJob, gwf_orig.get_final())
        gwf.add_final(job)

    return gwf

580 

581 

def compare_generic_workflows(gwf1: GenericWorkflow, gwf2: GenericWorkflow) -> bool:
    """Compare two workflows printing log messages where not equal.

    Edges, node names, node types, group ``blocking`` values, the
    contents of group nodes (recursively) and the final jobs are
    compared.  Every difference found is logged at DEBUG level; the
    comparison does not stop at the first difference.

    Parameters
    ----------
    gwf1 : `lsst.ctrl.bps.GenericWorkflow`
        First workflow to compare.
    gwf2 : `lsst.ctrl.bps.GenericWorkflow`
        Second workflow to compare.

    Returns
    -------
    equal : `bool`
        Whether the two workflows are the same.
    """
    equal = True

    # check edges
    edges1 = set(gwf1.edges)
    edges2 = set(gwf2.edges)
    only_in_first = edges1 - edges2
    only_in_second = edges2 - edges1
    if only_in_first:
        _LOG.debug("Edges only in %s, but not in %s: %s", gwf1.name, gwf2.name, only_in_first)
        equal = False
    if only_in_second:
        _LOG.debug("Edges only in %s, but not in %s: %s", gwf2.name, gwf1.name, only_in_second)
        equal = False

    # check nodes
    names1 = set(gwf1.nodes)
    names2 = set(gwf2.nodes)
    only_in_first = names1 - names2
    only_in_second = names2 - names1
    if only_in_first:
        _LOG.debug("Jobs only in %s, but not in %s: %s", gwf1.name, gwf2.name, only_in_first)
        equal = False
    if only_in_second:
        _LOG.debug("Jobs only in %s, but not in %s: %s", gwf2.name, gwf1.name, only_in_second)
        equal = False

    # check node values
    for name in names1 & names2:
        job1 = gwf1.get_job(name)
        job2 = gwf2.get_job(name)

        if job1.node_type != job2.node_type:
            # Report the attribute that actually differs; ``blocking`` is
            # only read below once both nodes are known to be groups.
            _LOG.debug(
                "Jobs' node_type not equal %s=%s, %s=%s",
                job1.name,
                job1.node_type,
                job2.name,
                job2.node_type,
            )
            equal = False
        elif job1.node_type == GenericWorkflowNodeType.GROUP:
            if job1.blocking != job2.blocking:
                _LOG.debug(
                    "Group jobs` blocking not equal %s=%s, %s=%s",
                    job1.name,
                    job1.blocking,
                    job2.name,
                    job2.blocking,
                )
                equal = False

            # Compare the groups' contents.  The recursive call is made
            # unconditionally so that all differences get logged, and its
            # result can only turn ``equal`` False, never back to True.
            if not compare_generic_workflows(job1, job2):
                equal = False

    # check final
    final1 = gwf1.get_final()
    final2 = gwf2.get_final()
    if final1 != final2:
        _LOG.debug("Final jobs are not equal: %s vs %s", final1, final2)
        equal = False

    return equal