Coverage for python / lsst / ctrl / bps / transform.py: 8%
322 statements
« prev ^ index » next coverage.py v7.13.5, created at 2026-04-26 09:03 +0000
« prev ^ index » next coverage.py v7.13.5, created at 2026-04-26 09:03 +0000
1# This file is part of ctrl_bps.
2#
3# Developed for the LSST Data Management System.
4# This product includes software developed by the LSST Project
5# (https://www.lsst.org).
6# See the COPYRIGHT file at the top-level directory of this distribution
7# for details of code ownership.
8#
9# This software is dual licensed under the GNU General Public License and also
10# under a 3-clause BSD license. Recipients may choose which of these licenses
11# to use; please see the files gpl-3.0.txt and/or bsd_license.txt,
12# respectively. If you choose the GPL option then the following text applies
13# (but note that there is still no warranty even if you opt for BSD instead):
14#
15# This program is free software: you can redistribute it and/or modify
16# it under the terms of the GNU General Public License as published by
17# the Free Software Foundation, either version 3 of the License, or
18# (at your option) any later version.
19#
20# This program is distributed in the hope that it will be useful,
21# but WITHOUT ANY WARRANTY; without even the implied warranty of
22# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
23# GNU General Public License for more details.
24#
25# You should have received a copy of the GNU General Public License
26# along with this program. If not, see <https://www.gnu.org/licenses/>.
28"""Driver for the transformation of a QuantumGraph into a generic workflow."""
30import copy
31import dataclasses
32import logging
33import math
34import os
35import re
37from lsst.ctrl.bps import ClusteredQuantumGraph
38from lsst.pipe.base import QuantumGraph
39from lsst.utils.logging import VERBOSE
40from lsst.utils.timer import timeMethod
42from . import (
43 DEFAULT_MEM_RETRIES,
44 BpsConfig,
45 GenericWorkflow,
46 GenericWorkflowExec,
47 GenericWorkflowFile,
48 GenericWorkflowJob,
49)
50from .bps_utils import WhenToSaveQuantumGraphs, create_job_quantum_graph_filename, save_qg_subgraph
# All available job attributes (derived from the GenericWorkflowJob dataclass
# fields, so it automatically tracks additions to that class).
_ATTRS_ALL = frozenset([field.name for field in dataclasses.fields(GenericWorkflowJob)])

# Job attributes that need to be set to their maximal value in the cluster.
_ATTRS_MAX = frozenset(
    {
        "memory_multiplier",
        "number_of_retries",
        "request_cpus",
        "request_memory",
        "request_memory_max",
    }
)

# Job attributes that need to be set to sum of their values in the cluster.
_ATTRS_SUM = frozenset(
    {
        "request_disk",
        "request_walltime",
    }
)

# Job attributes that do not fall into a specific aggregation category and
# are handled separately elsewhere (e.g., _enhance_command for cmdvals).
_ATTRS_MISC = frozenset(
    {
        "label",  # taskDef labels aren't same in job and may not match job label
        "cmdvals",
        "profile",
        "attrs",
    }
)

# Attributes that need to be the same for each quanta in the cluster.
_ATTRS_UNIVERSAL = frozenset(_ATTRS_ALL - (_ATTRS_MAX | _ATTRS_MISC | _ATTRS_SUM))

# Module-level logger for this driver.
_LOG = logging.getLogger(__name__)
@timeMethod(logger=_LOG, logLevel=VERBOSE)
def transform(
    config: BpsConfig, cqgraph: ClusteredQuantumGraph, prefix: str
) -> tuple[GenericWorkflow, BpsConfig]:
    """Transform a ClusteredQuantumGraph to a GenericWorkflow.

    Parameters
    ----------
    config : `lsst.ctrl.bps.BpsConfig`
        BPS configuration.
    cqgraph : `lsst.ctrl.bps.ClusteredQuantumGraph`
        A clustered quantum graph to transform into a generic workflow.
    prefix : `str`
        Root path for any output files.

    Returns
    -------
    generic_workflow : `lsst.ctrl.bps.GenericWorkflow`
        The generic workflow transformed from the clustered quantum graph.
    generic_workflow_config : `lsst.ctrl.bps.BpsConfig`
        Configuration to accompany GenericWorkflow.
    """
    # Prefer the name carried by the clustered graph; otherwise fall back
    # to the mandatory unique process name from the config.
    name = cqgraph.name
    if name is None:
        _, name = config.search("uniqProcName", opt={"required": True})

    workflow = create_generic_workflow(config, cqgraph, name, prefix)
    workflow_config = create_generic_workflow_config(config, prefix)

    return workflow, workflow_config
def add_workflow_init_nodes(config, qgraph, generic_workflow):
    """Add nodes to workflow graph that perform initialization steps.

    Assumes that all of the initialization should be executed prior to any
    of the current workflow.

    Parameters
    ----------
    config : `lsst.ctrl.bps.BpsConfig`
        BPS configuration.
    qgraph : `lsst.pipe.base.graph.QuantumGraph`
        The quantum graph the generic workflow represents.
    generic_workflow : `lsst.ctrl.bps.GenericWorkflow`
        Generic workflow to which the initialization steps should be added.
    """
    # Build a small workflow holding the init job(s) and their file nodes,
    # then prepend it so it runs before everything already in the graph.
    run_qgraph_file = generic_workflow.get_file("runQgraphFile")
    init_workflow = create_init_workflow(config, qgraph, run_qgraph_file)
    _LOG.debug("init_workflow nodes = %s", init_workflow.nodes())
    generic_workflow.add_workflow_source(init_workflow)
def create_init_workflow(
    config: BpsConfig, qgraph: QuantumGraph, qgraph_gwfile: GenericWorkflowFile
) -> GenericWorkflow:
    """Create workflow for running initialization job(s).

    Parameters
    ----------
    config : `lsst.ctrl.bps.BpsConfig`
        BPS configuration.
    qgraph : `lsst.pipe.base.graph.QuantumGraph`
        The quantum graph the generic workflow represents.
    qgraph_gwfile : `lsst.ctrl.bps.GenericWorkflowFile`
        File object for the full run QuantumGraph file.

    Returns
    -------
    init_workflow : `lsst.ctrl.bps.GenericWorkflow`
        GenericWorkflow consisting of job(s) to initialize workflow.
    """
    _LOG.debug("creating init subgraph")
    _LOG.debug("creating init task input(s)")
    search_opt = {
        "curvals": {"curr_pipetask": "pipetaskInit"},
        "replaceVars": False,
        "expandEnvVars": False,
        "replaceEnvVars": True,
        "required": False,
    }
    # Narrow subsequent config searches by compute site/cloud when defined.
    for config_key, curval_key in (("computeSite", "curr_site"), ("computeCloud", "curr_cloud")):
        found, value = config.search(config_key, opt=search_opt)
        if found:
            search_opt["curvals"][curval_key] = value

    init_workflow = GenericWorkflow("init")
    init_workflow.add_file(qgraph_gwfile)

    # Create job for executing --init-only.
    gwjob = GenericWorkflowJob("pipetaskInit", "pipetaskInit")

    job_values = _get_job_values(config, search_opt, "runQuantumCommand")
    job_values["name"] = "pipetaskInit"
    job_values["label"] = "pipetaskInit"

    # Adjust job attributes values if necessary.
    _handle_job_values(job_values, gwjob)

    init_workflow.add_job(gwjob)
    init_workflow.add_job_inputs(gwjob.name, [qgraph_gwfile])
    _enhance_command(config, init_workflow, gwjob, {})

    return init_workflow
def _enhance_command(config, generic_workflow, gwjob, cached_job_values):
    """Enhance command line with env and file placeholders
    and gather command line values.

    Parameters
    ----------
    config : `lsst.ctrl.bps.BpsConfig`
        BPS configuration.
    generic_workflow : `lsst.ctrl.bps.GenericWorkflow`
        Generic workflow that contains the job.
    gwjob : `lsst.ctrl.bps.GenericWorkflowJob`
        Generic workflow job to which the updated executable, arguments,
        and values should be saved.
    cached_job_values : `dict` [`str`, dict[`str`, `~typing.Any`]]
        Cached values common across jobs with same label. Updated if values
        aren't already saved for given gwjob's label.
    """
    _LOG.debug("gwjob given to _enhance_command: %s", gwjob)

    # Current values used to resolve variables during config searches; job
    # tags may add (or override) entries.
    curvals = {
        "curr_pipetask": gwjob.label,
        "curr_cluster": gwjob.label,
        "jobName": gwjob.name,
        "jobLabel": gwjob.label,
    }
    for key, value in gwjob.tags.items():
        curvals[key] = value

    search_opt = {
        "curvals": curvals,
        "replaceVars": False,
        "expandEnvVars": False,
        "replaceEnvVars": True,
        "required": False,
    }

    # Populate the per-label cache on first sight of this label so repeated
    # jobs with the same label avoid redundant config searches.
    if gwjob.label not in cached_job_values:
        cached_job_values[gwjob.label] = {}
        # Allowing whenSaveJobQgraph and useLazyCommands per pipetask label.
        key = "whenSaveJobQgraph"
        _, when_save = config.search(key, opt=search_opt)
        cached_job_values[gwjob.label][key] = WhenToSaveQuantumGraphs[when_save.upper()]

        key = "useLazyCommands"
        # Temporarily add a default; remove it afterwards so later searches
        # through the same dict are unaffected.
        search_opt["default"] = True
        _, cached_job_values[gwjob.label][key] = config.search(key, opt=search_opt)
        del search_opt["default"]

    # Change qgraph variable to match whether using run or per-job qgraph
    # Note: these are lookup keys, not actual physical filenames.
    if cached_job_values[gwjob.label]["whenSaveJobQgraph"] == WhenToSaveQuantumGraphs.NEVER:
        gwjob.arguments = gwjob.arguments.replace("{qgraphFile}", "{runQgraphFile}")
    elif gwjob.name == "pipetaskInit":
        # The init job always runs against the full run QuantumGraph.
        gwjob.arguments = gwjob.arguments.replace("{qgraphFile}", "{runQgraphFile}")
    else:  # Needed unique file keys for per-job QuantumGraphs
        gwjob.arguments = gwjob.arguments.replace("{qgraphFile}", f"{{qgraphFile_{gwjob.name}}}")

    # Replace files with special placeholders
    for gwfile in generic_workflow.get_job_inputs(gwjob.name):
        gwjob.arguments = gwjob.arguments.replace(f"{{{gwfile.name}}}", f"<FILE:{gwfile.name}>")
    for gwfile in generic_workflow.get_job_outputs(gwjob.name):
        gwjob.arguments = gwjob.arguments.replace(f"{{{gwfile.name}}}", f"<FILE:{gwfile.name}>")

    # Replace wms variables with wms placeholders.
    # {wmsFoo} -> <WMS:foo> (leading character lower-cased).
    gwjob.arguments = re.sub(
        r"{wms([^}]+)}", lambda x: f"<WMS:{x[1][0].lower() + x[1][1:]}>", gwjob.arguments
    )

    # Save dict of other values needed to complete command line.
    # (Be careful to not replace env variables as they may
    # be different in compute job.)
    search_opt["replaceVars"] = True
    for key in re.findall(r"{([^}]+)}", gwjob.arguments):
        if key in gwjob.cmdvals:
            continue
        elif key in cached_job_values[gwjob.label]:
            gwjob.cmdvals[key] = cached_job_values[gwjob.label][key]
        else:
            _, gwjob.cmdvals[key] = config.search(key, opt=search_opt)

    # backwards compatibility: if not deferring to the WMS plugin (lazy
    # commands), resolve all placeholders submit-side right now.
    if not cached_job_values[gwjob.label]["useLazyCommands"]:
        if "bpsUseShared" not in cached_job_values[gwjob.label]:
            key = "bpsUseShared"
            search_opt["default"] = True
            _, cached_job_values[gwjob.label][key] = config.search(key, opt=search_opt)
            del search_opt["default"]

        gwjob.arguments = _fill_arguments(
            cached_job_values[gwjob.label]["bpsUseShared"], generic_workflow, gwjob.arguments, gwjob.cmdvals
        )
293def _fill_arguments(use_shared, generic_workflow, arguments, cmdvals):
294 """Replace placeholders in command line string in job.
296 Parameters
297 ----------
298 use_shared : `bool`
299 Whether using shared filesystem.
300 generic_workflow : `lsst.ctrl.bps.GenericWorkflow`
301 Generic workflow containing the job.
302 arguments : `str`
303 String containing placeholders.
304 cmdvals : `dict` [`str`, `~typing.Any`]
305 Any command line values that can be used to replace placeholders.
307 Returns
308 -------
309 arguments : `str`
310 Command line with FILE and ENV placeholders replaced.
311 """
312 # Replace file placeholders
313 for file_key in re.findall(r"<FILE:([^>]+)>", arguments):
314 gwfile = generic_workflow.get_file(file_key)
315 if not gwfile.wms_transfer:
316 # Must assume full URI if in command line and told WMS is not
317 # responsible for transferring file.
318 uri = gwfile.src_uri
319 elif use_shared:
320 if gwfile.job_shared:
321 # Have shared filesystems and jobs can share file.
322 uri = gwfile.src_uri
323 else:
324 uri = os.path.basename(gwfile.src_uri)
325 else: # Using push transfer
326 uri = os.path.basename(gwfile.src_uri)
328 arguments = arguments.replace(f"<FILE:{file_key}>", uri)
330 # Replace env placeholder with submit-side values
331 arguments = re.sub(r"<ENV:([^>]+)>", r"$\1", arguments)
332 arguments = os.path.expandvars(arguments)
334 # Replace remaining vars
335 arguments = arguments.format(**cmdvals)
337 return arguments
def _get_qgraph_gwfile(config, save_qgraph_per_job, gwjob, run_qgraph_file, prefix):
    """Get qgraph location to be used by job.

    Parameters
    ----------
    config : `lsst.ctrl.bps.BpsConfig`
        Bps configuration.
    save_qgraph_per_job : `lsst.ctrl.bps.bps_utils.WhenToSaveQuantumGraphs`
        What submission stage to save per-job qgraph files (or NEVER).
    gwjob : `lsst.ctrl.bps.GenericWorkflowJob`
        Job for which determining QuantumGraph file.
    run_qgraph_file : `lsst.ctrl.bps.GenericWorkflowFile`
        File representation of the full run QuantumGraph.
    prefix : `str`
        Path prefix for any files written.

    Returns
    -------
    gwfile : `lsst.ctrl.bps.GenericWorkflowFile`
        Representation of butler location (may not include filename).
    """
    # No per-job graphs: every job reads the full run QuantumGraph.
    if save_qgraph_per_job == WhenToSaveQuantumGraphs.NEVER:
        return run_qgraph_file

    # Otherwise each job gets its own uniquely keyed QuantumGraph file.
    return GenericWorkflowFile(
        f"qgraphFile_{gwjob.name}",
        src_uri=create_job_quantum_graph_filename(config, gwjob, prefix),
        wms_transfer=True,
        job_access_remote=True,
        job_shared=True,
    )
def _get_job_values(config, search_opt, cmd_line_key):
    """Gather generic workflow job values from the bps config.

    Parameters
    ----------
    config : `lsst.ctrl.bps.BpsConfig`
        Bps configuration.
    search_opt : `dict` [`str`, `~typing.Any`]
        Search options to be used when searching config.  Temporarily
        modified while resolving environment values, then restored.
    cmd_line_key : `str` or `None`
        Which command line key to search for (e.g., "runQuantumCommand").
        If `None` or empty, no executable/arguments are derived.

    Returns
    -------
    job_values : `dict` [ `str`, `~typing.Any` ]
        A mapping between job attributes and their values.
    """
    _LOG.debug("cmd_line_key=%s, search_opt=%s", cmd_line_key, search_opt)

    # Create a dummy job to easily access the default values.
    default_gwjob = GenericWorkflowJob("default_job", "default_label")

    job_values = {}
    for attr in _ATTRS_ALL:
        # Variable names in yaml are camel case instead of snake case.
        yaml_name = re.sub(r"_(\S)", lambda match: match.group(1).upper(), attr)
        found, value = config.search(yaml_name, opt=search_opt)
        job_values[attr] = value if found else getattr(default_gwjob, attr)

    # Need to replace all config variables in environment values.
    # While replacing variables, convert to plain dict.
    if job_values["environment"]:
        old_searchobj = search_opt.get("searchobj", None)
        old_replace_vars = search_opt.get("replaceVars", None)
        job_env = job_values["environment"]
        search_opt["searchobj"] = job_env
        search_opt["replaceVars"] = True
        job_values["environment"] = {}
        for name in job_env:
            # Keyword form for consistency with every other search call.
            job_values["environment"][name] = str(config.search(name, opt=search_opt)[1])
        # Restore the caller's search options so no changes leak out.
        if old_searchobj is None:
            del search_opt["searchobj"]
        else:
            search_opt["searchobj"] = old_searchobj
        if old_replace_vars is None:
            del search_opt["replaceVars"]
        else:
            search_opt["replaceVars"] = old_replace_vars

    # If the automatic memory scaling is enabled (i.e. the memory multiplier
    # is set and it is a positive number greater than 1.0), adjust number
    # of retries when necessary. If the memory multiplier is invalid, disable
    # automatic memory scaling.
    if job_values["memory_multiplier"] is not None:
        if math.ceil(float(job_values["memory_multiplier"])) > 1:
            if job_values["number_of_retries"] is None:
                job_values["number_of_retries"] = DEFAULT_MEM_RETRIES
        else:
            job_values["memory_multiplier"] = None

    if cmd_line_key:
        found, cmdline = config.search(cmd_line_key, opt=search_opt)
        # Make sure cmdline isn't None as that could be sent in as a
        # default value in search_opt.
        if found and cmdline:
            # Use partition instead of split(" ", 1) so a command line
            # without any arguments does not raise ValueError.
            cmd, _, args = cmdline.partition(" ")
            job_values["executable"] = GenericWorkflowExec(os.path.basename(cmd), cmd, False)
            if args:
                job_values["arguments"] = args

    return job_values
def _handle_job_values(quantum_job_values, gwjob, attributes=_ATTRS_ALL):
    """Set the job attributes in the cluster to their correct values.

    Parameters
    ----------
    quantum_job_values : `dict` [`str`, Any]
        Job values for running single Quantum.
    gwjob : `lsst.ctrl.bps.GenericWorkflowJob`
        Generic workflow job in which to store the universal values.
    attributes : `~collections.abc.Iterable` [`str`], optional
        Job attributes to be set in the job following different rules.
        The default value is _ATTRS_ALL.
    """
    _LOG.debug("Call to _handle_job_values")
    # Each handler applies its own aggregation rule (same-for-all, max, sum)
    # to the subset of attributes it owns.
    for handler in (
        _handle_job_values_universal,
        _handle_job_values_max,
        _handle_job_values_sum,
    ):
        handler(quantum_job_values, gwjob, attributes)
def _handle_job_values_universal(quantum_job_values, gwjob, attributes=_ATTRS_UNIVERSAL):
    """Handle job attributes that must have the same value for every quantum
    in the cluster.

    Parameters
    ----------
    quantum_job_values : `dict` [`str`, Any]
        Job values for running single Quantum.
    gwjob : `lsst.ctrl.bps.GenericWorkflowJob`
        Generic workflow job in which to store the universal values.
    attributes : `~collections.abc.Iterable` [`str`], optional
        Job attributes to be set in the job following different rules.
        The default value is _ATTRS_UNIVERSAL.

    Raises
    ------
    RuntimeError
        Raised if a quantum's value conflicts with the value already stored
        in the job.
    """
    for attr in _ATTRS_UNIVERSAL & set(attributes):
        _LOG.debug(
            "Handling job %s (job=%s, quantum=%s)",
            attr,
            getattr(gwjob, attr),
            quantum_job_values.get(attr, "MISSING"),
        )
        # Quanta without a value for this attribute are simply skipped.
        if attr not in quantum_job_values:
            continue
        quantum_value = quantum_job_values[attr]
        current_value = getattr(gwjob, attr)
        if not current_value:
            # Falsy current value means "not set yet"; take the quantum's.
            setattr(gwjob, attr, quantum_value)
        elif current_value != quantum_value:
            # Conflicting non-falsy values violate the universal contract.
            _LOG.error(
                "Inconsistent value for %s in Cluster %s Quantum Number %s\n"
                "Current cluster value: %s\n"
                "Quantum value: %s",
                attr,
                gwjob.name,
                quantum_job_values.get("qgraphNodeId", "MISSING"),
                current_value,
                quantum_value,
            )
            raise RuntimeError(f"Inconsistent value for {attr} in cluster {gwjob.name}.")
def _handle_job_values_max(quantum_job_values, gwjob, attributes=_ATTRS_MAX):
    """Handle job attributes that should be set to their maximum value in
    the cluster.

    Parameters
    ----------
    quantum_job_values : `dict` [`str`, `~typing.Any`]
        Job values for running single Quantum.
    gwjob : `lsst.ctrl.bps.GenericWorkflowJob`
        Generic workflow job in which to store the aggregate values.
    attributes : `~collections.abc.Iterable` [`str`], optional
        Job attributes to be set in the job following different rules.
        The default value is _ATTRS_MAX.
    """
    for attr in _ATTRS_MAX & set(attributes):
        current_value = getattr(gwjob, attr)
        try:
            quantum_value = quantum_job_values[attr]
        except KeyError:
            # Quantum provides no value for this attribute; keep current.
            continue
        else:
            # Take the quantum's value when the job has none yet or the
            # quantum's is larger; a None quantum value never replaces
            # an existing one.
            needs_update = False
            if current_value is None:
                if quantum_value is not None:
                    needs_update = True
            else:
                if quantum_value is not None and current_value < quantum_value:
                    needs_update = True
            if needs_update:
                setattr(gwjob, attr, quantum_value)

                # When updating memory requirements for a job, check if memory
                # autoscaling is enabled. If it is, always use the memory
                # multiplier and the number of retries which comes with the
                # quantum.
                #
                # Note that as a result, the quantum with the biggest memory
                # requirements will determine whether the memory autoscaling
                # will be enabled (or disabled) depending on the value of its
                # memory multiplier.
                if attr == "request_memory":
                    gwjob.memory_multiplier = quantum_job_values["memory_multiplier"]
                    if gwjob.memory_multiplier is not None:
                        gwjob.number_of_retries = quantum_job_values["number_of_retries"]
def _handle_job_values_sum(quantum_job_values, gwjob, attributes=_ATTRS_SUM):
    """Handle job attributes that are the sum of their values in the cluster.

    Parameters
    ----------
    quantum_job_values : `dict` [`str`, `~typing.Any`]
        Job values for running single Quantum.
    gwjob : `lsst.ctrl.bps.GenericWorkflowJob`
        Generic workflow job in which to store the aggregate values.
    attributes : `~collections.abc.Iterable` [`str`], optional
        Job attributes to be set in the job following different rules.
        The default value is _ATTRS_SUM.
    """
    for attr in _ATTRS_SUM & set(attributes):
        # Skip attributes the quantum provides no value for, mirroring the
        # KeyError handling in the universal and max handlers (previously a
        # missing attribute raised KeyError here).
        try:
            quantum_value = quantum_job_values[attr]
        except KeyError:
            continue
        current_value = getattr(gwjob, attr)
        if not current_value:
            # Falsy current value means "not set yet"; start the sum here.
            setattr(gwjob, attr, quantum_value)
        else:
            setattr(gwjob, attr, current_value + quantum_value)
def create_generic_workflow(
    config: BpsConfig, cqgraph: ClusteredQuantumGraph, name: str, prefix: str
) -> GenericWorkflow:
    """Create a generic workflow from a ClusteredQuantumGraph such that it
    has information needed for WMS (e.g., command lines).

    Parameters
    ----------
    config : `lsst.ctrl.bps.BpsConfig`
        BPS configuration.
    cqgraph : `lsst.ctrl.bps.ClusteredQuantumGraph`
        ClusteredQuantumGraph for running a specific pipeline on a specific
        payload.
    name : `str`
        Name for the workflow (typically unique).
    prefix : `str`
        Root path for any output files.

    Returns
    -------
    generic_workflow : `lsst.ctrl.bps.GenericWorkflow`
        Generic workflow for the given ClusteredQuantumGraph + config.
    """
    # Determine whether saving per-job QuantumGraph files in the loop.
    _, when_save = config.search("whenSaveJobQgraph", {"default": WhenToSaveQuantumGraphs.TRANSFORM.name})
    save_qgraph_per_job = WhenToSaveQuantumGraphs[when_save.upper()]

    search_opt = {"replaceVars": False, "expandEnvVars": False, "replaceEnvVars": True, "required": False}

    generic_workflow = GenericWorkflow(name)

    # Save full run QuantumGraph for use by jobs.
    generic_workflow.add_file(
        GenericWorkflowFile(
            "runQgraphFile",
            src_uri=config["runQgraphFile"],
            wms_transfer=True,
            job_access_remote=True,
            job_shared=True,
        )
    )

    # Cache pipetask specific or more generic job values to minimize number
    # of config searches.
    cached_job_values = {}
    cached_pipetask_values = {}

    for cluster in cqgraph.clusters():
        _LOG.debug("Loop over clusters: %s, %s", cluster, type(cluster))
        _LOG.debug(
            "cqgraph: name=%s, len=%s, label=%s, ids=%s",
            cluster.name,
            len(cluster.qgraph_node_ids),
            cluster.label,
            cluster.qgraph_node_ids,
        )

        gwjob = GenericWorkflowJob(cluster.name, cluster.label)

        # First get job values from cluster or cluster config.
        search_opt["curvals"] = {"curr_cluster": cluster.label}
        found, value = config.search("computeSite", opt=search_opt)
        if found:
            search_opt["curvals"]["curr_site"] = value
        found, value = config.search("computeCloud", opt=search_opt)
        if found:
            search_opt["curvals"]["curr_cloud"] = value

        # If some config values are set for this cluster (populate the
        # per-label cache the first time the label is seen).
        if cluster.label not in cached_job_values:
            _LOG.debug("config['cluster'][%s] = %s", cluster.label, config["cluster"][cluster.label])
            cached_job_values[cluster.label] = {}

            # Allowing whenSaveJobQgraph and useLazyCommands per cluster label.
            key = "whenSaveJobQgraph"
            _, when_save = config.search(key, opt=search_opt)
            cached_job_values[cluster.label][key] = WhenToSaveQuantumGraphs[when_save.upper()]

            key = "useLazyCommands"
            search_opt["default"] = True
            _, cached_job_values[cluster.label][key] = config.search(key, opt=search_opt)
            del search_opt["default"]

            if cluster.label in config["cluster"]:
                # Don't want to get global defaults here so only look in
                # cluster section.
                cached_job_values[cluster.label].update(
                    _get_job_values(config["cluster"][cluster.label], search_opt, "runQuantumCommand")
                )
        # Shallow copy so per-cluster additions below don't pollute the cache.
        cluster_job_values = copy.copy(cached_job_values[cluster.label])

        cluster_job_values["name"] = cluster.name
        cluster_job_values["label"] = cluster.label
        cluster_job_values["quanta_counts"] = cluster.quanta_counts
        cluster_job_values["tags"] = cluster.tags
        _LOG.debug("cluster_job_values = %s", cluster_job_values)
        _handle_job_values(cluster_job_values, gwjob, cluster_job_values.keys())

        # The criterion for whether to continue searching for a value is
        # whether the current job value evaluates to False.
        unset_attributes = {attr for attr in _ATTRS_ALL if not getattr(gwjob, attr)}

        _LOG.debug("unset_attributes=%s", unset_attributes)
        _LOG.debug("set=%s", _ATTRS_ALL - unset_attributes)

        # For job info not defined at cluster level, attempt to get job info
        # either common or aggregate for all Quanta in cluster.
        for node_id in iter(cluster.qgraph_node_ids):
            _LOG.debug("node_id=%s", node_id)
            quantum_info = cqgraph.get_quantum_info(node_id)

            task_label = quantum_info["task_label"]
            if task_label not in cached_pipetask_values:
                search_opt["curvals"]["curr_pipetask"] = task_label
                cached_pipetask_values[task_label] = _get_job_values(config, search_opt, "runQuantumCommand")
            _handle_job_values(cached_pipetask_values[task_label], gwjob, unset_attributes)

        # Update job with workflow attribute and profile values.
        qgraph_gwfile = _get_qgraph_gwfile(
            config, save_qgraph_per_job, gwjob, generic_workflow.get_file("runQgraphFile"), prefix
        )

        generic_workflow.add_job(gwjob)
        generic_workflow.add_job_inputs(gwjob.name, [qgraph_gwfile])

        # Comma-separated, sorted node ids used by the command line.
        gwjob.cmdvals["qgraphNodeId"] = ",".join(
            sorted([f"{node_id}" for node_id in cluster.qgraph_node_ids])
        )
        _enhance_command(config, generic_workflow, gwjob, cached_job_values)

        # If writing per-job QuantumGraph files during TRANSFORM stage,
        # write it now while in memory.
        if save_qgraph_per_job == WhenToSaveQuantumGraphs.TRANSFORM:
            save_qg_subgraph(cqgraph.qgraph, qgraph_gwfile.src_uri, cluster.qgraph_node_ids)

    # Create job dependencies.
    for parent in cqgraph.clusters():
        for child in cqgraph.successors(parent):
            generic_workflow.add_job_relationships(parent.name, child.name)

    # Add initial workflow.
    # NOTE(review): the fallback "{default: False}" is a truthy *string*;
    # presumably BpsConfig.get parses it specially — confirm intent.
    if config.get("runInit", "{default: False}"):
        add_workflow_init_nodes(config, cqgraph.qgraph, generic_workflow)

    generic_workflow.run_attrs.update(
        {
            "bps_isjob": "True",
            "bps_project": config["project"],
            "bps_campaign": config["campaign"],
            "bps_run": generic_workflow.name,
            "bps_operator": config["operator"],
            "bps_payload": config["payloadName"],
            "bps_runsite": config["computeSite"],
        }
    )

    # Add final job.
    add_final_job(config, generic_workflow, prefix)

    if "ordering" in config:
        generic_workflow.add_special_job_ordering(config["ordering"])

    return generic_workflow
def create_generic_workflow_config(config, prefix):
    """Create generic workflow configuration.

    Parameters
    ----------
    config : `lsst.ctrl.bps.BpsConfig`
        Bps configuration.
    prefix : `str`
        Root path for any output files.

    Returns
    -------
    generic_workflow_config : `lsst.ctrl.bps.BpsConfig`
        Configuration accompanying the GenericWorkflow.
    """
    # Start from a copy of the submit config and record where the
    # workflow lives and what it is called.
    gw_config = BpsConfig(config)
    gw_config["workflowName"] = config["uniqProcName"]
    gw_config["workflowPath"] = prefix
    return gw_config
def add_final_job(config: BpsConfig, generic_workflow: GenericWorkflow, prefix: str) -> None:
    """Add final workflow job depending upon configuration.

    Depending on configuration, the final job will be added as a special job
    which will always run regardless of the exit status of the workflow or
    a regular sink node which will only run if the workflow execution finished
    with no errors.

    Parameters
    ----------
    config : `lsst.ctrl.bps.BpsConfig`
        Bps configuration.
    generic_workflow : `lsst.ctrl.bps.GenericWorkflow`
        Generic workflow to which attributes should be added.
    prefix : `str`
        Directory in which to output final script.

    Raises
    ------
    ValueError
        Raised if finalJob.whenRun has an unrecognized value.
    """
    _, when_run = config.search(".finalJob.whenRun")
    when_run_upper = when_run.upper()
    if when_run_upper == "NEVER":
        return

    final_job = create_final_job(config, generic_workflow, prefix)
    if when_run_upper == "ALWAYS":
        # Special job: runs regardless of workflow exit status.
        generic_workflow.add_final(final_job)
    elif when_run_upper == "SUCCESS":
        # Regular sink node: runs only after a fully successful workflow.
        add_final_job_as_sink(generic_workflow, final_job)
    else:
        raise ValueError(f"Invalid value for finalJob.whenRun: {when_run}")
def create_final_job(config: BpsConfig, generic_workflow: GenericWorkflow, prefix: str) -> GenericWorkflowJob:
    """Create the final workflow job.

    Parameters
    ----------
    config : `lsst.ctrl.bps.BpsConfig`
        Bps configuration.
    generic_workflow : `lsst.ctrl.bps.GenericWorkflow`
        Generic workflow to which attributes should be added.
    prefix : `str`
        Directory in which to output final script.

    Returns
    -------
    final_job : `lsst.ctrl.bps.GenericWorkflowJob`
        Final workflow job.
    """
    job_name = "finalJob"
    final_job = GenericWorkflowJob(job_name, job_name)

    search_opt = {"searchobj": config[job_name], "curvals": {}, "default": None}
    # Narrow subsequent config searches by compute site/cloud when defined.
    for config_key, curval_key in (("computeSite", "curr_site"), ("computeCloud", "curr_cloud")):
        found, value = config.search(config_key, opt=search_opt)
        if found:
            search_opt["curvals"][curval_key] = value

    # Set job attributes based on the values found in the config excluding
    # the ones in the _ATTRS_MISC group.  The attributes in this group are
    # somewhat "special":
    # * HTCondor plugin, which uses 'attrs' and 'profile', has its own
    #   mechanism for setting them,
    # * 'cmdvals' is being set internally, not via config.
    job_values = _get_job_values(config, search_opt, None)
    for attr in _ATTRS_ALL - _ATTRS_MISC:
        if not getattr(final_job, attr) and job_values.get(attr):
            setattr(final_job, attr, job_values[attr])

    # Create script and add command line to job.
    final_job.executable, final_job.arguments = create_final_command(config, prefix)

    # Determine inputs from command line.
    for file_key in re.findall(r"<FILE:([^>]+)>", final_job.arguments):
        generic_workflow.add_job_inputs(final_job.name, generic_workflow.get_file(file_key))

    _enhance_command(config, generic_workflow, final_job, {})
    return final_job
def create_final_command(config: BpsConfig, prefix: str) -> tuple[GenericWorkflowExec, str]:
    """Create the command and shell script for the final job.

    Parameters
    ----------
    config : `lsst.ctrl.bps.BpsConfig`
        Bps configuration.
    prefix : `str`
        Directory in which to output final script.

    Returns
    -------
    executable : `lsst.ctrl.bps.GenericWorkflowExec`
        Executable object for the final script.
    arguments : `str`
        Command line needed to call the final script.

    Raises
    ------
    RuntimeError
        Raised if no commands were found in the config.
    """
    search_opt = {
        "replaceVars": True,
        "skipNames": ["butlerConfig", "qgraphFile"],
        "replaceEnvVars": False,
        "expandEnvVars": False,
        "searchobj": config["finalJob"],
    }

    # Gather the numbered commands (command1, command2, ...) from the config.
    commands = []
    index = 1
    found, command = config.search(f"command{index}", opt=search_opt)
    while found:
        # The files will be args to script, so change to shell vars.
        command = command.replace("{qgraphFile}", "${qgraphFile}")
        command = command.replace("{butlerConfig}", "${butlerConfig}")
        commands.append(command)

        # Search for next command.
        index += 1
        found, command = config.search(f"command{index}", opt=search_opt)

    script_file = os.path.join(prefix, "final_job.bash")
    with open(script_file, "w", encoding="utf8") as fh:
        print("#!/bin/bash\n", file=fh)
        print("set -e", file=fh)
        print("set -x", file=fh)

        print("qgraphFile=$1", file=fh)
        print("butlerConfig=$2", file=fh)

        for command in commands:
            print(command, file=fh)

    # Make sure at least one actual (non-blank) command was written.
    if not any(command.strip() for command in commands):
        raise RuntimeError(
            "No finalJob commands were found. Use NEVER for finalJob.whenRun to turn off finalJob"
        )
    os.chmod(script_file, 0o755)
    executable = GenericWorkflowExec(os.path.basename(script_file), script_file, True)

    _, orig_butler = config.search("butlerConfig")
    return executable, f"<FILE:runQgraphFile> {orig_butler}"
def add_final_job_as_sink(generic_workflow, final_job):
    """Add final job as the single sink for the workflow.

    Parameters
    ----------
    generic_workflow : `lsst.ctrl.bps.GenericWorkflow`
        Generic workflow to which attributes should be added.
    final_job : `lsst.ctrl.bps.GenericWorkflowJob`
        Job to add as new sink node depending upon all previous sink nodes.
    """
    # Every current sink node becomes a parent of the new final job.
    sinks = [node for node in generic_workflow if generic_workflow.out_degree(node) == 0]
    _LOG.debug("gw_sinks = %s", sinks)

    generic_workflow.add_job(final_job)
    generic_workflow.add_job_relationships(sinks, final_job.name)