Coverage for python / lsst / ctrl / bps / construct.py: 13%
78 statements
« prev ^ index » next coverage.py v7.13.5, created at 2026-04-17 09:04 +0000
« prev ^ index » next coverage.py v7.13.5, created at 2026-04-17 09:04 +0000
1# This file is part of ctrl_bps.
2#
3# Developed for the LSST Data Management System.
4# This product includes software developed by the LSST Project
5# (https://www.lsst.org).
6# See the COPYRIGHT file at the top-level directory of this distribution
7# for details of code ownership.
8#
9# This software is dual licensed under the GNU General Public License and also
10# under a 3-clause BSD license. Recipients may choose which of these licenses
11# to use; please see the files gpl-3.0.txt and/or bsd_license.txt,
12# respectively. If you choose the GPL option then the following text applies
13# (but note that there is still no warranty even if you opt for BSD instead):
14#
15# This program is free software: you can redistribute it and/or modify
16# it under the terms of the GNU General Public License as published by
17# the Free Software Foundation, either version 3 of the License, or
18# (at your option) any later version.
19#
20# This program is distributed in the hope that it will be useful,
21# but WITHOUT ANY WARRANTY; without even the implied warranty of
22# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
23# GNU General Public License for more details.
24#
25# You should have received a copy of the GNU General Public License
26# along with this program. If not, see <https://www.gnu.org/licenses/>.
28"""Driver for constructing a generic workflow running a custom job."""
# Only the driver entry point ``construct`` is exported by ``import *``.
__all__ = ["construct"]
32import logging
33import shutil
34from collections.abc import Callable
35from pathlib import Path
37from lsst.ctrl.bps import (
38 BpsConfig,
39 GenericWorkflow,
40 GenericWorkflowExec,
41 GenericWorkflowFile,
42 GenericWorkflowJob,
43)
44from lsst.ctrl.bps.transform import _get_job_values
# Module-level logger (not referenced by the functions in this module).
_LOG = logging.getLogger(__name__)
def construct(config: BpsConfig) -> tuple[GenericWorkflow, BpsConfig]:
    """Build the generic workflow and its configuration for a custom job.

    This is the module's public entry point; all of the work is delegated
    to `create_custom_workflow`.

    Parameters
    ----------
    config : `lsst.ctrl.bps.BpsConfig`
        Configuration values to be used by submission.

    Returns
    -------
    generic_workflow : `lsst.ctrl.bps.GenericWorkflow`
        Generic workflow for running a standalone job.
    generic_workflow_config : `lsst.ctrl.BpsConfig`
        Configuration to accompany created generic workflow.
    """
    workflow, workflow_config = create_custom_workflow(config)
    return workflow, workflow_config
def create_custom_workflow(config: BpsConfig) -> tuple[GenericWorkflow, BpsConfig]:
    """Wrap the custom job in a single-job generic workflow.

    The workflow is named after ``uniqProcName``. It is tagged with run
    attributes describing the submission (project, campaign, operator,
    payload, compute site). The job's input and output files, if any, are
    registered with the workflow.

    Parameters
    ----------
    config : `lsst.ctrl.bps.BpsConfig`
        BPS configuration.

    Returns
    -------
    generic_workflow : `lsst.ctrl.bps.GenericWorkflow`
        Generic workflow for running a custom job.
    generic_workflow_config : `lsst.ctrl.BpsConfig`
        Copy of ``config`` with ``workflowName`` and ``workflowPath`` set.
    """
    custom_job, job_inputs, job_outputs = create_custom_job(config)

    _, workflow_name = config.search("uniqProcName", opt={"required": True})
    workflow = GenericWorkflow(workflow_name)
    workflow.add_job(custom_job)

    run_attributes = {
        "bps_isjob": "True",
        "bps_iscustom": "True",
        "bps_project": config["project"],
        "bps_campaign": config["campaign"],
        "bps_run": workflow.name,
        "bps_operator": config["operator"],
        "bps_payload": config["payloadName"],
        "bps_runsite": config["computeSite"],
    }
    workflow.run_attrs.update(run_attributes)

    # Only register file lists that are non-empty.
    if job_inputs:
        workflow.add_job_inputs(custom_job.name, job_inputs)
    if job_outputs:
        workflow.add_job_outputs(custom_job.name, job_outputs)

    workflow_config = BpsConfig(config)
    workflow_config["workflowName"] = config["uniqProcName"]
    workflow_config["workflowPath"] = config["submitPath"]

    return workflow, workflow_config
def create_custom_job(
    config: BpsConfig,
) -> tuple[GenericWorkflowJob, list[GenericWorkflowFile], list[GenericWorkflowFile]]:
    """Create a job that will run a custom command or script.

    The executable given by ``customJob.executable`` is copied into the
    submit directory (``submitPath``) and attached to the job as an
    executable to be transferred. Files listed under ``inputs`` are copied
    into the submit directory, and parent directories are created for files
    listed under ``outputs``. In the job's arguments, every ``{key}`` whose
    ``key`` names an input or output file is replaced with ``<FILE:key>``.

    Parameters
    ----------
    config : `lsst.ctrl.bps.BpsConfig`
        BPS configuration.

    Returns
    -------
    job : `lsst.ctrl.bps.GenericWorkflowJob`
        A custom job responsible for running the command.
    inputs : `list` [`lsst.ctrl.bps.GenericWorkflowFile`]
        List of job's input files, empty if the job has no input files.
    outputs : `list` [`lsst.ctrl.bps.GenericWorkflowFile`]
        List of job's output files, empty if the job has no output files.

    Raises
    ------
    FileNotFoundError
        Raised if the executable or the submit directory does not exist.
    ValueError
        Raised if an input path does not exist or is a directory.
    """
    prefix = Path(config["submitPath"])
    job_label = "customJob"

    # Values found for the compute site/cloud are passed to the later
    # searches as current values (presumably so site-/cloud-specific
    # settings are resolved; confirm against BpsConfig.search).
    search_opts = {"searchobj": config[job_label], "curvals": {}}
    found, value = config.search("computeSite", opt=search_opts)
    if found:
        search_opts["curvals"]["curr_site"] = value
    found, value = config.search("computeCloud", opt=search_opts)
    if found:
        search_opts["curvals"]["curr_cloud"] = value

    script_file = Path(config[f".{job_label}.executable"])
    script_name = script_file.name
    script_dest = prefix / script_name

    # Copy to an explicit file path rather than to ``prefix`` itself. If
    # ``prefix`` were not an existing directory, copying "into" it would
    # silently create a file named ``prefix``, and the executable URI below
    # would point at nothing. With an explicit destination this fails loudly.
    shutil.copy2(script_file, script_dest)

    job = GenericWorkflowJob(name=script_name, label=job_label)
    job_values = _get_job_values(config, search_opts, "")
    for attr, value in job_values.items():
        # Only fill in attributes that are not already set on the job.
        if not getattr(job, attr):
            setattr(job, attr, value)
    job.executable = GenericWorkflowExec(
        name=script_name, src_uri=str(script_dest), transfer_executable=True
    )
    # Variables are not replaced here, presumably so that ``{key}`` file
    # placeholders survive until the substitution at the end.
    _, job.arguments = config.search("arguments", opt=search_opts | {"replaceVars": False})

    inputs = []
    found, mapping = config.search("inputs", opt=search_opts)
    if found:
        inputs = create_job_files(mapping, prefix, path_creator=create_input_path)

    outputs = []
    found, mapping = config.search("outputs", opt=search_opts)
    if found:
        outputs = create_job_files(mapping, prefix, path_creator=create_output_path)

    # Turn "{key}" placeholders into WMS file references. Skip this when no
    # arguments were configured, so an empty/None value is left untouched.
    if job.arguments:
        for gwfile in inputs + outputs:
            job.arguments = job.arguments.replace(f"{{{gwfile.name}}}", f"<FILE:{gwfile.name}>")

    return job, inputs, outputs
def create_job_files(
    file_specs: BpsConfig, prefix: str | Path, path_creator: Callable[[Path, Path], Path]
) -> list[GenericWorkflowFile]:
    """Create workflow file objects for a job's inputs or outputs.

    Parameters
    ----------
    file_specs : `lsst.ctrl.bps.BpsConfig`
        The mapping between file keys and file paths. Each key becomes the
        name of the corresponding `~lsst.ctrl.bps.GenericWorkflowFile`.
    prefix : `str` | `pathlib.Path`
        The submit directory; it is converted to `pathlib.Path` and passed
        to ``path_creator``.
    path_creator : `~collections.abc.Callable` \
            [[`pathlib.Path`, `pathlib.Path`], `pathlib.Path`]
        Function called as ``path_creator(source_path, prefix)``. It performs
        any per-file preparation (e.g. copying an input file or creating an
        output's parent directories) and returns the path the workflow
        should use for the file.

    Returns
    -------
    gwfiles : `list` [`lsst.ctrl.bps.GenericWorkflowFile`]
        Files created for the job, in the iteration order of
        ``file_specs``. Every file is marked for WMS transfer.
    """
    prefix = Path(prefix)
    return [
        GenericWorkflowFile(
            name=key, src_uri=str(path_creator(Path(path), prefix)), wms_transfer=True
        )
        for key, path in file_specs.items()
    ]
def create_input_path(path: Path, prefix: Path) -> Path:
    """Process an input path.

    The input file is copied (with metadata, via `shutil.copy2`) into
    ``prefix`` under its original file name.

    Parameters
    ----------
    path : `pathlib.Path`
        The input path.
    prefix : `pathlib.Path`
        The root directory to which the file will be written.

    Returns
    -------
    dest : `pathlib.Path`
        Path of the copy, ``prefix / path.name``.

    Raises
    ------
    ValueError
        Raised if the input path does not exist or is a directory.
    """
    if not path.exists():
        raise ValueError(f"input path '{path}' does not exist")
    if path.is_dir():
        raise ValueError(f"input path '{path}' is a directory, must be file")
    dest = prefix / path.name
    shutil.copy2(path, dest)
    return dest
def create_output_path(path: Path, prefix: Path) -> Path:
    """Decide where an output file goes and make sure its directory exists.

    Parameters
    ----------
    path : `pathlib.Path`
        The output path. A relative path is taken relative to ``prefix``;
        an absolute path is used unchanged.
    prefix : `pathlib.Path`
        The root directory for relative output paths.

    Returns
    -------
    dest : `pathlib.Path`
        The output path; its parent directory exists on return.
    """
    target = path if path.is_absolute() else prefix / path
    # The file itself is not created, only the directory it will live in.
    target.parent.mkdir(parents=True, exist_ok=True)
    return target