Coverage for python / lsst / ctrl / bps / transform.py: 8%

322 statements  

« prev     ^ index     » next       coverage.py v7.13.5, created at 2026-04-22 09:00 +0000

1# This file is part of ctrl_bps. 

2# 

3# Developed for the LSST Data Management System. 

4# This product includes software developed by the LSST Project 

5# (https://www.lsst.org). 

6# See the COPYRIGHT file at the top-level directory of this distribution 

7# for details of code ownership. 

8# 

9# This software is dual licensed under the GNU General Public License and also 

10# under a 3-clause BSD license. Recipients may choose which of these licenses 

11# to use; please see the files gpl-3.0.txt and/or bsd_license.txt, 

12# respectively. If you choose the GPL option then the following text applies 

13# (but note that there is still no warranty even if you opt for BSD instead): 

14# 

15# This program is free software: you can redistribute it and/or modify 

16# it under the terms of the GNU General Public License as published by 

17# the Free Software Foundation, either version 3 of the License, or 

18# (at your option) any later version. 

19# 

20# This program is distributed in the hope that it will be useful, 

21# but WITHOUT ANY WARRANTY; without even the implied warranty of 

22# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the 

23# GNU General Public License for more details. 

24# 

25# You should have received a copy of the GNU General Public License 

26# along with this program. If not, see <https://www.gnu.org/licenses/>. 

27 

28"""Driver for the transformation of a QuantumGraph into a generic workflow.""" 

29 

30import copy 

31import dataclasses 

32import logging 

33import math 

34import os 

35import re 

36 

37from lsst.ctrl.bps import ClusteredQuantumGraph 

38from lsst.pipe.base import QuantumGraph 

39from lsst.utils.logging import VERBOSE 

40from lsst.utils.timer import timeMethod 

41 

42from . import ( 

43 DEFAULT_MEM_RETRIES, 

44 BpsConfig, 

45 GenericWorkflow, 

46 GenericWorkflowExec, 

47 GenericWorkflowFile, 

48 GenericWorkflowJob, 

49) 

50from .bps_utils import WhenToSaveQuantumGraphs, create_job_quantum_graph_filename, save_qg_subgraph 

51 

# All available job attributes (the field names of GenericWorkflowJob).
_ATTRS_ALL = frozenset([field.name for field in dataclasses.fields(GenericWorkflowJob)])

# Job attributes that need to be set to their maximal value in the cluster.
_ATTRS_MAX = frozenset(
    {
        "memory_multiplier",
        "number_of_retries",
        "request_cpus",
        "request_memory",
        "request_memory_max",
    }
)

# Job attributes that need to be set to sum of their values in the cluster.
_ATTRS_SUM = frozenset(
    {
        "request_disk",
        "request_walltime",
    }
)

# Job attributes that do not fall into a specific aggregation category;
# these are handled specially (or set internally) by the transform code.
_ATTRS_MISC = frozenset(
    {
        "label",  # taskDef labels aren't same in job and may not match job label
        "cmdvals",
        "profile",
        "attrs",
    }
)

# Attributes that need to be the same for each quanta in the cluster.
_ATTRS_UNIVERSAL = frozenset(_ATTRS_ALL - (_ATTRS_MAX | _ATTRS_MISC | _ATTRS_SUM))

_LOG = logging.getLogger(__name__)

88 

89 

@timeMethod(logger=_LOG, logLevel=VERBOSE)
def transform(
    config: BpsConfig, cqgraph: ClusteredQuantumGraph, prefix: str
) -> tuple[GenericWorkflow, BpsConfig]:
    """Transform a ClusteredQuantumGraph to a GenericWorkflow.

    Parameters
    ----------
    config : `lsst.ctrl.bps.BpsConfig`
        BPS configuration.
    cqgraph : `lsst.ctrl.bps.ClusteredQuantumGraph`
        A clustered quantum graph to transform into a generic workflow.
    prefix : `str`
        Root path for any output files.

    Returns
    -------
    generic_workflow : `lsst.ctrl.bps.GenericWorkflow`
        The generic workflow transformed from the clustered quantum graph.
    generic_workflow_config : `lsst.ctrl.bps.BpsConfig`
        Configuration to accompany GenericWorkflow.
    """
    # Prefer the name carried by the clustered graph; otherwise fall back
    # to the unique process name from the config (required in that case).
    if cqgraph.name is None:
        _, workflow_name = config.search("uniqProcName", opt={"required": True})
    else:
        workflow_name = cqgraph.name

    workflow = create_generic_workflow(config, cqgraph, workflow_name, prefix)
    workflow_config = create_generic_workflow_config(config, prefix)

    return workflow, workflow_config

121 

122 

def add_workflow_init_nodes(config, qgraph, generic_workflow):
    """Add nodes to workflow graph that perform initialization steps.

    Assumes that all of the initialization should be executed prior to any
    of the current workflow.

    Parameters
    ----------
    config : `lsst.ctrl.bps.BpsConfig`
        BPS configuration.
    qgraph : `lsst.pipe.base.graph.QuantumGraph`
        The quantum graph the generic workflow represents.
    generic_workflow : `lsst.ctrl.bps.GenericWorkflow`
        Generic workflow to which the initialization steps should be added.
    """
    # Build a sub-workflow with the task and file nodes needed to
    # initialize the pipeline execution, then hook it up so it runs
    # before everything already in the workflow.
    run_qgraph_gwfile = generic_workflow.get_file("runQgraphFile")
    init_workflow = create_init_workflow(config, qgraph, run_qgraph_gwfile)
    _LOG.debug("init_workflow nodes = %s", init_workflow.nodes())
    generic_workflow.add_workflow_source(init_workflow)

143 

144 

def create_init_workflow(
    config: BpsConfig, qgraph: QuantumGraph, qgraph_gwfile: GenericWorkflowFile
) -> GenericWorkflow:
    """Create workflow for running initialization job(s).

    Parameters
    ----------
    config : `lsst.ctrl.bps.BpsConfig`
        BPS configuration.
    qgraph : `lsst.pipe.base.graph.QuantumGraph`
        The quantum graph the generic workflow represents.
    qgraph_gwfile : `lsst.ctrl.bps.GenericWorkflowFile`
        File object for the full run QuantumGraph file.

    Returns
    -------
    init_workflow : `lsst.ctrl.bps.GenericWorkflow`
        GenericWorkflow consisting of job(s) to initialize workflow.
    """
    _LOG.debug("creating init subgraph")
    _LOG.debug("creating init task input(s)")
    search_opt = {
        "curvals": {"curr_pipetask": "pipetaskInit"},
        "replaceVars": False,
        "expandEnvVars": False,
        "replaceEnvVars": True,
        "required": False,
    }
    # Narrow subsequent config searches by site/cloud when configured.
    # Site is looked up first so the cloud lookup can depend on it.
    for config_key, curval_key in (("computeSite", "curr_site"), ("computeCloud", "curr_cloud")):
        found, value = config.search(config_key, opt=search_opt)
        if found:
            search_opt["curvals"][curval_key] = value

    init_workflow = GenericWorkflow("init")
    init_workflow.add_file(qgraph_gwfile)

    # Single job that executes pipetask with --init-only.
    gwjob = GenericWorkflowJob("pipetaskInit", "pipetaskInit")

    job_values = _get_job_values(config, search_opt, "runQuantumCommand")
    job_values["name"] = "pipetaskInit"
    job_values["label"] = "pipetaskInit"

    # Adjust job attributes values if necessary.
    _handle_job_values(job_values, gwjob)

    init_workflow.add_job(gwjob)
    init_workflow.add_job_inputs(gwjob.name, [qgraph_gwfile])
    _enhance_command(config, init_workflow, gwjob, {})

    return init_workflow

198 

199 

def _enhance_command(config, generic_workflow, gwjob, cached_job_values):
    """Enhance command line with env and file placeholders
    and gather command line values.

    Parameters
    ----------
    config : `lsst.ctrl.bps.BpsConfig`
        BPS configuration.
    generic_workflow : `lsst.ctrl.bps.GenericWorkflow`
        Generic workflow that contains the job.
    gwjob : `lsst.ctrl.bps.GenericWorkflowJob`
        Generic workflow job to which the updated executable, arguments,
        and values should be saved.
    cached_job_values : `dict` [`str`, dict[`str`, `~typing.Any`]]
        Cached values common across jobs with same label.  Updated if values
        aren't already saved for given gwjob's label.
    """
    _LOG.debug("gwjob given to _enhance_command: %s", gwjob)

    # Values used to resolve config variables for this particular job.
    curvals = {
        "curr_pipetask": gwjob.label,
        "curr_cluster": gwjob.label,
        "jobName": gwjob.name,
        "jobLabel": gwjob.label,
    }
    for key, value in gwjob.tags.items():
        curvals[key] = value

    search_opt = {
        "curvals": curvals,
        "replaceVars": False,
        "expandEnvVars": False,
        "replaceEnvVars": True,
        "required": False,
    }

    if gwjob.label not in cached_job_values:
        cached_job_values[gwjob.label] = {}
        # Allowing whenSaveJobQgraph and useLazyCommands per pipetask label.
        key = "whenSaveJobQgraph"
        _, when_save = config.search(key, opt=search_opt)
        cached_job_values[gwjob.label][key] = WhenToSaveQuantumGraphs[when_save.upper()]

        key = "useLazyCommands"
        search_opt["default"] = True
        _, cached_job_values[gwjob.label][key] = config.search(key, opt=search_opt)
        # Remove the temporary default so later searches are unaffected.
        del search_opt["default"]

    # Change qgraph variable to match whether using run or per-job qgraph
    # Note: these are lookup keys, not actual physical filenames.
    if cached_job_values[gwjob.label]["whenSaveJobQgraph"] == WhenToSaveQuantumGraphs.NEVER:
        gwjob.arguments = gwjob.arguments.replace("{qgraphFile}", "{runQgraphFile}")
    elif gwjob.name == "pipetaskInit":
        gwjob.arguments = gwjob.arguments.replace("{qgraphFile}", "{runQgraphFile}")
    else:  # Needed unique file keys for per-job QuantumGraphs
        gwjob.arguments = gwjob.arguments.replace("{qgraphFile}", f"{{qgraphFile_{gwjob.name}}}")

    # Replace files with special placeholders
    for gwfile in generic_workflow.get_job_inputs(gwjob.name):
        gwjob.arguments = gwjob.arguments.replace(f"{{{gwfile.name}}}", f"<FILE:{gwfile.name}>")
    for gwfile in generic_workflow.get_job_outputs(gwjob.name):
        gwjob.arguments = gwjob.arguments.replace(f"{{{gwfile.name}}}", f"<FILE:{gwfile.name}>")

    # Replace wms variables with wms placeholders.
    # The first character after the "wms" prefix is lower-cased, e.g.
    # {wmsFoo} -> <WMS:foo>.
    gwjob.arguments = re.sub(
        r"{wms([^}]+)}", lambda x: f"<WMS:{x[1][0].lower() + x[1][1:]}>", gwjob.arguments
    )

    # Save dict of other values needed to complete command line.
    # (Be careful to not replace env variables as they may
    # be different in compute job.)
    search_opt["replaceVars"] = True
    for key in re.findall(r"{([^}]+)}", gwjob.arguments):
        if key in gwjob.cmdvals:
            continue
        elif key in cached_job_values[gwjob.label]:
            gwjob.cmdvals[key] = cached_job_values[gwjob.label][key]
        else:
            _, gwjob.cmdvals[key] = config.search(key, opt=search_opt)

    # backwards compatibility: when lazy commands are disabled, resolve
    # all placeholders right now instead of leaving them to the plugin.
    if not cached_job_values[gwjob.label]["useLazyCommands"]:
        if "bpsUseShared" not in cached_job_values[gwjob.label]:
            key = "bpsUseShared"
            search_opt["default"] = True
            _, cached_job_values[gwjob.label][key] = config.search(key, opt=search_opt)
            del search_opt["default"]

        gwjob.arguments = _fill_arguments(
            cached_job_values[gwjob.label]["bpsUseShared"], generic_workflow, gwjob.arguments, gwjob.cmdvals
        )

291 

292 

def _fill_arguments(use_shared, generic_workflow, arguments, cmdvals):
    """Replace placeholders in command line string in job.

    Parameters
    ----------
    use_shared : `bool`
        Whether using shared filesystem.
    generic_workflow : `lsst.ctrl.bps.GenericWorkflow`
        Generic workflow containing the job.
    arguments : `str`
        String containing placeholders.
    cmdvals : `dict` [`str`, `~typing.Any`]
        Any command line values that can be used to replace placeholders.

    Returns
    -------
    arguments : `str`
        Command line with FILE and ENV placeholders replaced.
    """
    # Resolve every <FILE:...> placeholder to either the full URI or just
    # the basename, depending on who is responsible for the transfer.
    for file_key in re.findall(r"<FILE:([^>]+)>", arguments):
        gwfile = generic_workflow.get_file(file_key)
        if not gwfile.wms_transfer:
            # WMS isn't transferring the file, so the command line must
            # carry the full URI.
            uri = gwfile.src_uri
        elif use_shared and gwfile.job_shared:
            # Shared filesystem and jobs can share the file directly.
            uri = gwfile.src_uri
        else:
            # Push transfer (or file not shareable): file lands in the
            # job's working directory under its basename.
            uri = os.path.basename(gwfile.src_uri)
        arguments = arguments.replace(f"<FILE:{file_key}>", uri)

    # Turn <ENV:NAME> into $NAME, then expand with submit-side values.
    arguments = os.path.expandvars(re.sub(r"<ENV:([^>]+)>", r"$\1", arguments))

    # Fill in any remaining {var} placeholders from the command values.
    return arguments.format(**cmdvals)

338 

339 

def _get_qgraph_gwfile(config, save_qgraph_per_job, gwjob, run_qgraph_file, prefix):
    """Get qgraph location to be used by job.

    Parameters
    ----------
    config : `lsst.ctrl.bps.BpsConfig`
        Bps configuration.
    save_qgraph_per_job : `lsst.ctrl.bps.bps_utils.WhenToSaveQuantumGraphs`
        What submission stage to save per-job qgraph files (or NEVER).
    gwjob : `lsst.ctrl.bps.GenericWorkflowJob`
        Job for which determining QuantumGraph file.
    run_qgraph_file : `lsst.ctrl.bps.GenericWorkflowFile`
        File representation of the full run QuantumGraph.
    prefix : `str`
        Path prefix for any files written.

    Returns
    -------
    gwfile : `lsst.ctrl.bps.GenericWorkflowFile`
        Representation of butler location (may not include filename).
    """
    # Without per-job graphs, every job reads the full run QuantumGraph.
    if save_qgraph_per_job == WhenToSaveQuantumGraphs.NEVER:
        return run_qgraph_file

    # Per-job graph: unique file key per job name.
    return GenericWorkflowFile(
        f"qgraphFile_{gwjob.name}",
        src_uri=create_job_quantum_graph_filename(config, gwjob, prefix),
        wms_transfer=True,
        job_access_remote=True,
        job_shared=True,
    )

374 

375 

def _get_job_values(config, search_opt, cmd_line_key):
    """Gather generic workflow job values from the bps config.

    Parameters
    ----------
    config : `lsst.ctrl.bps.BpsConfig`
        Bps configuration.
    search_opt : `dict` [`str`, `~typing.Any`]
        Search options to be used when searching config.  Temporarily
        modified keys ("searchobj", "replaceVars") are restored before
        returning.
    cmd_line_key : `str` or None
        Which command line key to search for (e.g., "runQuantumCommand").
        If None, no executable/arguments are derived.

    Returns
    -------
    job_values : `dict` [ `str`, `~typing.Any` ]`
        A mapping between job attributes and their values.
    """
    _LOG.debug("cmd_line_key=%s, search_opt=%s", cmd_line_key, search_opt)

    # Create a dummy job to easily access the default values.
    default_gwjob = GenericWorkflowJob("default_job", "default_label")

    job_values = {}
    for attr in _ATTRS_ALL:
        # Variable names in yaml are camel case instead of snake case.
        yaml_name = re.sub(r"_(\S)", lambda match: match.group(1).upper(), attr)
        found, value = config.search(yaml_name, opt=search_opt)
        if found:
            job_values[attr] = value
        else:
            job_values[attr] = getattr(default_gwjob, attr)

    # Need to replace all config variables in environment values
    # While replacing variables, convert to plain dict
    if job_values["environment"]:
        # Temporarily repoint the search at the environment mapping with
        # variable replacement enabled; restore original options after.
        old_searchobj = search_opt.get("searchobj", None)
        old_replace_vars = search_opt.get("replaceVars", None)
        job_env = job_values["environment"]
        search_opt["searchobj"] = job_env
        search_opt["replaceVars"] = True
        job_values["environment"] = {}
        for name in job_env:
            job_values["environment"][name] = str(config.search(name, search_opt)[1])
        if old_searchobj is None:
            del search_opt["searchobj"]
        else:
            search_opt["searchobj"] = old_searchobj
        if old_replace_vars is None:
            del search_opt["replaceVars"]
        else:
            search_opt["replaceVars"] = old_replace_vars

    # If the automatic memory scaling is enabled (i.e. the memory multiplier
    # is set and it is a positive number greater than 1.0), adjust number
    # of retries when necessary. If the memory multiplier is invalid, disable
    # automatic memory scaling.
    if job_values["memory_multiplier"] is not None:
        if math.ceil(float(job_values["memory_multiplier"])) > 1:
            if job_values["number_of_retries"] is None:
                job_values["number_of_retries"] = DEFAULT_MEM_RETRIES
        else:
            job_values["memory_multiplier"] = None

    if cmd_line_key:
        found, cmdline = config.search(cmd_line_key, opt=search_opt)
        # Make sure cmdline isn't None as that could be sent in as a
        # default value in search_opt.
        if found and cmdline:
            # First token is the executable; the remainder become the
            # job arguments.
            # NOTE(review): assumes cmdline contains at least one space;
            # a bare executable with no arguments would raise ValueError
            # on this unpacking — confirm against expected config values.
            cmd, args = cmdline.split(" ", 1)
            job_values["executable"] = GenericWorkflowExec(os.path.basename(cmd), cmd, False)
            if args:
                job_values["arguments"] = args

    return job_values

450 

451 

def _handle_job_values(quantum_job_values, gwjob, attributes=_ATTRS_ALL):
    """Set the job attributes in the cluster to their correct values.

    Parameters
    ----------
    quantum_job_values : `dict` [`str`, Any]
        Job values for running single Quantum.
    gwjob : `lsst.ctrl.bps.GenericWorkflowJob`
        Generic workflow job in which to store the values.
    attributes : `~collections.abc.Iterable` [`str`], optional
        Job attributes to be set in the job following different rules.
        The default value is _ATTRS_ALL.
    """
    _LOG.debug("Call to _handle_job_values")
    # Each handler applies its own aggregation rule (must-match, max, sum)
    # to its own subset of the requested attributes.
    for handler in (
        _handle_job_values_universal,
        _handle_job_values_max,
        _handle_job_values_sum,
    ):
        handler(quantum_job_values, gwjob, attributes)

469 

470 

def _handle_job_values_universal(quantum_job_values, gwjob, attributes=_ATTRS_UNIVERSAL):
    """Handle job attributes that must have the same value for every quantum
    in the cluster.

    Parameters
    ----------
    quantum_job_values : `dict` [`str`, Any]
        Job values for running single Quantum.
    gwjob : `lsst.ctrl.bps.GenericWorkflowJob`
        Generic workflow job in which to store the universal values.
    attributes : `~collections.abc.Iterable` [`str`], optional
        Job attributes to be set in the job following different rules.
        The default value is _ATTRS_UNIVERSAL.

    Raises
    ------
    RuntimeError
        Raised if a quantum's value disagrees with the value already
        stored in the job.
    """
    for attr in _ATTRS_UNIVERSAL & set(attributes):
        _LOG.debug(
            "Handling job %s (job=%s, quantum=%s)",
            attr,
            getattr(gwjob, attr),
            quantum_job_values.get(attr, "MISSING"),
        )
        # Attributes the quantum doesn't provide are simply skipped.
        if attr not in quantum_job_values:
            continue
        quantum_value = quantum_job_values[attr]
        current_value = getattr(gwjob, attr)
        if not current_value:
            # Job has no value yet; adopt the quantum's value.
            setattr(gwjob, attr, quantum_value)
        elif current_value != quantum_value:
            # Conflicting values within one cluster is a hard error.
            _LOG.error(
                "Inconsistent value for %s in Cluster %s Quantum Number %s\n"
                "Current cluster value: %s\n"
                "Quantum value: %s",
                attr,
                gwjob.name,
                quantum_job_values.get("qgraphNodeId", "MISSING"),
                current_value,
                quantum_value,
            )
            raise RuntimeError(f"Inconsistent value for {attr} in cluster {gwjob.name}.")

512 

513 

def _handle_job_values_max(quantum_job_values, gwjob, attributes=_ATTRS_MAX):
    """Handle job attributes that should be set to their maximum value in
    the cluster.

    Parameters
    ----------
    quantum_job_values : `dict` [`str`, `~typing.Any`]
        Job values for running single Quantum.
    gwjob : `lsst.ctrl.bps.GenericWorkflowJob`
        Generic workflow job in which to store the aggregate values.
    attributes : `~collections.abc.Iterable` [`str`], optional
        Job attributes to be set in the job following different rules.
        The default value is _ATTRS_MAX.
    """
    for attr in _ATTRS_MAX & set(attributes):
        current_value = getattr(gwjob, attr)
        try:
            quantum_value = quantum_job_values[attr]
        except KeyError:
            # Quantum doesn't provide this attribute; keep the job's value.
            continue
        else:
            # Update only when the quantum's value is defined and is larger
            # than the job's (or the job has no value at all yet).
            needs_update = False
            if current_value is None:
                if quantum_value is not None:
                    needs_update = True
            else:
                if quantum_value is not None and current_value < quantum_value:
                    needs_update = True
            if needs_update:
                setattr(gwjob, attr, quantum_value)

                # When updating memory requirements for a job, check if memory
                # autoscaling is enabled. If it is, always use the memory
                # multiplier and the number of retries which comes with the
                # quantum.
                #
                # Note that as a result, the quantum with the biggest memory
                # requirements will determine whether the memory autoscaling
                # will be enabled (or disabled) depending on the value of its
                # memory multiplier.
                if attr == "request_memory":
                    gwjob.memory_multiplier = quantum_job_values["memory_multiplier"]
                    if gwjob.memory_multiplier is not None:
                        gwjob.number_of_retries = quantum_job_values["number_of_retries"]

558 

559 

def _handle_job_values_sum(quantum_job_values, gwjob, attributes=_ATTRS_SUM):
    """Handle job attributes that are the sum of their values in the cluster.

    Parameters
    ----------
    quantum_job_values : `dict` [`str`, `~typing.Any`]
        Job values for running single Quantum.
    gwjob : `lsst.ctrl.bps.GenericWorkflowJob`
        Generic workflow job in which to store the aggregate values.
    attributes : `~collections.abc.Iterable` [`str`], optional
        Job attributes to be set in the job following different rules.
        The default value is _ATTRS_SUM.
    """
    for attr in _ATTRS_SUM & set(attributes):
        try:
            quantum_value = quantum_job_values[attr]
        except KeyError:
            # Be consistent with the universal/max handlers: quietly skip
            # attributes the quantum does not provide instead of raising
            # KeyError on the unguarded lookup.
            continue
        current_value = getattr(gwjob, attr)
        if not current_value:
            # Job has no value yet (None/0); adopt the quantum's value.
            setattr(gwjob, attr, quantum_value)
        else:
            # Accumulate across all quanta in the cluster.
            setattr(gwjob, attr, current_value + quantum_value)

579 

580 

def create_generic_workflow(
    config: BpsConfig, cqgraph: ClusteredQuantumGraph, name: str, prefix: str
) -> GenericWorkflow:
    """Create a generic workflow from a ClusteredQuantumGraph such that it
    has information needed for WMS (e.g., command lines).

    Parameters
    ----------
    config : `lsst.ctrl.bps.BpsConfig`
        BPS configuration.
    cqgraph : `lsst.ctrl.bps.ClusteredQuantumGraph`
        ClusteredQuantumGraph for running a specific pipeline on a specific
        payload.
    name : `str`
        Name for the workflow (typically unique).
    prefix : `str`
        Root path for any output files.

    Returns
    -------
    generic_workflow : `lsst.ctrl.bps.GenericWorkflow`
        Generic workflow for the given ClusteredQuantumGraph + config.
    """
    # Determine whether saving per-job QuantumGraph files in the loop.
    _, when_save = config.search("whenSaveJobQgraph", {"default": WhenToSaveQuantumGraphs.TRANSFORM.name})
    save_qgraph_per_job = WhenToSaveQuantumGraphs[when_save.upper()]

    search_opt = {"replaceVars": False, "expandEnvVars": False, "replaceEnvVars": True, "required": False}

    generic_workflow = GenericWorkflow(name)

    # Save full run QuantumGraph for use by jobs
    generic_workflow.add_file(
        GenericWorkflowFile(
            "runQgraphFile",
            src_uri=config["runQgraphFile"],
            wms_transfer=True,
            job_access_remote=True,
            job_shared=True,
        )
    )

    # Cache pipetask specific or more generic job values to minimize number
    # of config searches.
    cached_job_values = {}
    cached_pipetask_values = {}

    for cluster in cqgraph.clusters():
        _LOG.debug("Loop over clusters: %s, %s", cluster, type(cluster))
        _LOG.debug(
            "cqgraph: name=%s, len=%s, label=%s, ids=%s",
            cluster.name,
            len(cluster.qgraph_node_ids),
            cluster.label,
            cluster.qgraph_node_ids,
        )

        gwjob = GenericWorkflowJob(cluster.name, cluster.label)

        # First get job values from cluster or cluster config
        search_opt["curvals"] = {"curr_cluster": cluster.label}
        found, value = config.search("computeSite", opt=search_opt)
        if found:
            search_opt["curvals"]["curr_site"] = value
        found, value = config.search("computeCloud", opt=search_opt)
        if found:
            search_opt["curvals"]["curr_cloud"] = value

        # If some config values are set for this cluster
        if cluster.label not in cached_job_values:
            _LOG.debug("config['cluster'][%s] = %s", cluster.label, config["cluster"][cluster.label])
            cached_job_values[cluster.label] = {}

            # Allowing whenSaveJobQgraph and useLazyCommands per cluster label.
            key = "whenSaveJobQgraph"
            _, when_save = config.search(key, opt=search_opt)
            cached_job_values[cluster.label][key] = WhenToSaveQuantumGraphs[when_save.upper()]

            key = "useLazyCommands"
            search_opt["default"] = True
            _, cached_job_values[cluster.label][key] = config.search(key, opt=search_opt)
            del search_opt["default"]

            if cluster.label in config["cluster"]:
                # Don't want to get global defaults here so only look in
                # cluster section.
                cached_job_values[cluster.label].update(
                    _get_job_values(config["cluster"][cluster.label], search_opt, "runQuantumCommand")
                )
        # Shallow copy so per-cluster additions below don't pollute cache.
        cluster_job_values = copy.copy(cached_job_values[cluster.label])

        cluster_job_values["name"] = cluster.name
        cluster_job_values["label"] = cluster.label
        cluster_job_values["quanta_counts"] = cluster.quanta_counts
        cluster_job_values["tags"] = cluster.tags
        _LOG.debug("cluster_job_values = %s", cluster_job_values)
        _handle_job_values(cluster_job_values, gwjob, cluster_job_values.keys())

        # For purposes of whether to continue searching for a value is whether
        # the value evaluates to False.
        unset_attributes = {attr for attr in _ATTRS_ALL if not getattr(gwjob, attr)}

        _LOG.debug("unset_attributes=%s", unset_attributes)
        _LOG.debug("set=%s", _ATTRS_ALL - unset_attributes)

        # For job info not defined at cluster level, attempt to get job info
        # either common or aggregate for all Quanta in cluster.
        for node_id in iter(cluster.qgraph_node_ids):
            _LOG.debug("node_id=%s", node_id)
            quantum_info = cqgraph.get_quantum_info(node_id)

            task_label = quantum_info["task_label"]
            if task_label not in cached_pipetask_values:
                search_opt["curvals"]["curr_pipetask"] = task_label
                cached_pipetask_values[task_label] = _get_job_values(config, search_opt, "runQuantumCommand")
            _handle_job_values(cached_pipetask_values[task_label], gwjob, unset_attributes)

        # Update job with workflow attribute and profile values.
        qgraph_gwfile = _get_qgraph_gwfile(
            config, save_qgraph_per_job, gwjob, generic_workflow.get_file("runQgraphFile"), prefix
        )

        generic_workflow.add_job(gwjob)
        generic_workflow.add_job_inputs(gwjob.name, [qgraph_gwfile])

        gwjob.cmdvals["qgraphNodeId"] = ",".join(
            sorted([f"{node_id}" for node_id in cluster.qgraph_node_ids])
        )
        _enhance_command(config, generic_workflow, gwjob, cached_job_values)

        # If writing per-job QuantumGraph files during TRANSFORM stage,
        # write it now while in memory.
        if save_qgraph_per_job == WhenToSaveQuantumGraphs.TRANSFORM:
            save_qg_subgraph(cqgraph.qgraph, qgraph_gwfile.src_uri, cluster.qgraph_node_ids)

    # Create job dependencies.
    for parent in cqgraph.clusters():
        for child in cqgraph.successors(parent):
            generic_workflow.add_job_relationships(parent.name, child.name)

    # Add initial workflow.
    # NOTE(review): the fallback "{default: False}" is a truthy string, so
    # it presumably acts as a BpsConfig default-spec rather than a plain
    # dict default — confirm BpsConfig.get semantics.
    if config.get("runInit", "{default: False}"):
        add_workflow_init_nodes(config, cqgraph.qgraph, generic_workflow)

    generic_workflow.run_attrs.update(
        {
            "bps_isjob": "True",
            "bps_project": config["project"],
            "bps_campaign": config["campaign"],
            "bps_run": generic_workflow.name,
            "bps_operator": config["operator"],
            "bps_payload": config["payloadName"],
            "bps_runsite": config["computeSite"],
        }
    )

    # Add final job
    add_final_job(config, generic_workflow, prefix)

    if "ordering" in config:
        generic_workflow.add_special_job_ordering(config["ordering"])

    return generic_workflow

744 

745 

def create_generic_workflow_config(config, prefix):
    """Create generic workflow configuration.

    Parameters
    ----------
    config : `lsst.ctrl.bps.BpsConfig`
        Bps configuration.
    prefix : `str`
        Root path for any output files.

    Returns
    -------
    generic_workflow_config : `lsst.ctrl.bps.BpsConfig`
        Configuration accompanying the GenericWorkflow.
    """
    # Copy the submit config and record the workflow identity/location.
    gw_config = BpsConfig(config)
    gw_config["workflowName"] = config["uniqProcName"]
    gw_config["workflowPath"] = prefix
    return gw_config

765 

766 

def add_final_job(config: BpsConfig, generic_workflow: GenericWorkflow, prefix: str) -> None:
    """Add final workflow job depending upon configuration.

    Depending on configuration, the final job will be added as a special job
    which will always run regardless of the exit status of the workflow or
    a regular sink node which will only run if the workflow execution finished
    with no errors.

    Parameters
    ----------
    config : `lsst.ctrl.bps.BpsConfig`
        Bps configuration.
    generic_workflow : `lsst.ctrl.bps.GenericWorkflow`
        Generic workflow to which attributes should be added.
    prefix : `str`
        Directory in which to output final script.
    """
    _, when_run = config.search(".finalJob.whenRun")
    when_run_upper = when_run.upper()

    # NEVER means no final job at all.
    if when_run_upper == "NEVER":
        return

    gwjob = create_final_job(config, generic_workflow, prefix)
    if when_run_upper == "ALWAYS":
        # Special job that runs regardless of workflow exit status.
        generic_workflow.add_final(gwjob)
    elif when_run_upper == "SUCCESS":
        # Regular sink node that runs only after a clean workflow.
        add_final_job_as_sink(generic_workflow, gwjob)
    else:
        raise ValueError(f"Invalid value for finalJob.whenRun: {when_run}")

793 

794 

def create_final_job(config: BpsConfig, generic_workflow: GenericWorkflow, prefix: str) -> GenericWorkflowJob:
    """Create the final workflow job.

    Parameters
    ----------
    config : `lsst.ctrl.bps.BpsConfig`
        Bps configuration.
    generic_workflow : `lsst.ctrl.bps.GenericWorkflow`
        Generic workflow to which attributes should be added.
    prefix : `str`
        Directory in which to output final script.

    Returns
    -------
    final_job : `lsst.ctrl.bps.GenericWorkflowJob`
        Final workflow job.
    """
    job_name = "finalJob"
    gwjob = GenericWorkflowJob(job_name, job_name)

    # Searches below are limited to the finalJob config section.
    search_opt = {"searchobj": config[job_name], "curvals": {}, "default": None}
    found, value = config.search("computeSite", opt=search_opt)
    if found:
        search_opt["curvals"]["curr_site"] = value
    found, value = config.search("computeCloud", opt=search_opt)
    if found:
        search_opt["curvals"]["curr_cloud"] = value

    # Set job attributes based on the values found in the config excluding
    # the ones in the _ATTRS_MISC group. The attributes in this group are
    # somewhat "special":
    # * HTCondor plugin, which uses 'attrs' and 'profile', has its own
    #   mechanism for setting them,
    # * 'cmdvals' is being set internally, not via config.
    job_values = _get_job_values(config, search_opt, None)
    for attr in _ATTRS_ALL - _ATTRS_MISC:
        if not getattr(gwjob, attr) and job_values.get(attr, None):
            setattr(gwjob, attr, job_values[attr])

    # Create script and add command line to job.
    gwjob.executable, gwjob.arguments = create_final_command(config, prefix)

    # Determine inputs from command line.
    for file_key in re.findall(r"<FILE:([^>]+)>", gwjob.arguments):
        gwfile = generic_workflow.get_file(file_key)
        generic_workflow.add_job_inputs(gwjob.name, gwfile)

    _enhance_command(config, generic_workflow, gwjob, {})
    return gwjob

844 

845 

def create_final_command(config: BpsConfig, prefix: str) -> tuple[GenericWorkflowExec, str]:
    """Create the command and shell script for the final job.

    Parameters
    ----------
    config : `lsst.ctrl.bps.BpsConfig`
        Bps configuration.
    prefix : `str`
        Directory in which to output final script.

    Returns
    -------
    executable : `lsst.ctrl.bps.GenericWorkflowExec`
        Executable object for the final script.
    arguments : `str`
        Command line needed to call the final script.

    Raises
    ------
    RuntimeError
        Raised if no finalJob commands are found in the config.
    """
    search_opt = {
        "replaceVars": True,
        "skipNames": ["butlerConfig", "qgraphFile"],
        "replaceEnvVars": False,
        "expandEnvVars": False,
        "searchobj": config["finalJob"],
    }

    script_file = os.path.join(prefix, "final_job.bash")
    with open(script_file, "w", encoding="utf8") as fh:
        print("#!/bin/bash\n", file=fh)
        print("set -e", file=fh)
        print("set -x", file=fh)

        # The script takes the qgraph file and butler config as arguments.
        print("qgraphFile=$1", file=fh)
        print("butlerConfig=$2", file=fh)

        command_len = 0  # Make sure at least write one actual command
        i = 1
        found, command = config.search(f"command{i}", opt=search_opt)
        # Gather numbered commands (command1, command2, ...) until one is
        # missing.
        while found:
            # The files will be args to script, so change to shell vars
            command = command.replace("{qgraphFile}", "${qgraphFile}")
            command = command.replace("{butlerConfig}", "${butlerConfig}")

            print(command, file=fh)
            command_len += len(command.strip())

            # Search for next command
            i += 1
            found, command = config.search(f"command{i}", opt=search_opt)
        if command_len == 0:
            raise RuntimeError(
                "No finalJob commands were found. Use NEVER for finalJob.whenRun to turn off finalJob"
            )
    os.chmod(script_file, 0o755)
    executable = GenericWorkflowExec(os.path.basename(script_file), script_file, True)

    # Pass the original (unresolved) butler config on the command line.
    _, orig_butler = config.search("butlerConfig")
    return executable, f"<FILE:runQgraphFile> {orig_butler}"

907 

908 

def add_final_job_as_sink(generic_workflow, final_job):
    """Add final job as the single sink for the workflow.

    Parameters
    ----------
    generic_workflow : `lsst.ctrl.bps.GenericWorkflow`
        Generic workflow to which attributes should be added.
    final_job : `lsst.ctrl.bps.GenericWorkflowJob`
        Job to add as new sink node depending upon all previous sink nodes.
    """
    # Collect the current sink nodes (no outgoing edges) before adding the
    # final job, then make the final job depend on all of them.
    current_sinks = [node for node in generic_workflow if generic_workflow.out_degree(node) == 0]
    _LOG.debug("gw_sinks = %s", current_sinks)

    generic_workflow.add_job(final_job)
    generic_workflow.add_job_relationships(current_sinks, final_job.name)