Coverage for python / lsst / ctrl / bps / construct.py: 13%

78 statements  

« prev     ^ index     » next       coverage.py v7.13.5, created at 2026-04-18 08:47 +0000

1# This file is part of ctrl_bps. 

2# 

3# Developed for the LSST Data Management System. 

4# This product includes software developed by the LSST Project 

5# (https://www.lsst.org). 

6# See the COPYRIGHT file at the top-level directory of this distribution 

7# for details of code ownership. 

8# 

9# This software is dual licensed under the GNU General Public License and also 

10# under a 3-clause BSD license. Recipients may choose which of these licenses 

11# to use; please see the files gpl-3.0.txt and/or bsd_license.txt, 

12# respectively. If you choose the GPL option then the following text applies 

13# (but note that there is still no warranty even if you opt for BSD instead): 

14# 

15# This program is free software: you can redistribute it and/or modify 

16# it under the terms of the GNU General Public License as published by 

17# the Free Software Foundation, either version 3 of the License, or 

18# (at your option) any later version. 

19# 

20# This program is distributed in the hope that it will be useful, 

21# but WITHOUT ANY WARRANTY; without even the implied warranty of 

22# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the 

23# GNU General Public License for more details. 

24# 

25# You should have received a copy of the GNU General Public License 

26# along with this program. If not, see <https://www.gnu.org/licenses/>. 

27 

"""Driver for constructing a generic workflow that runs a single custom job.

The job's executable, arguments and optional input/output files are looked up
starting from the ``customJob`` section of the BPS configuration.
"""

__all__ = ["construct"]

31 

32import logging 

33import shutil 

34from collections.abc import Callable 

35from pathlib import Path 

36 

37from lsst.ctrl.bps import ( 

38 BpsConfig, 

39 GenericWorkflow, 

40 GenericWorkflowExec, 

41 GenericWorkflowFile, 

42 GenericWorkflowJob, 

43) 

44from lsst.ctrl.bps.transform import _get_job_values 

45 

# Module-level logger.  NOTE(review): no function in this module currently
# logs through it; keep for future diagnostics or remove.
_LOG = logging.getLogger(__name__)

47 

48 

def construct(config: BpsConfig) -> tuple[GenericWorkflow, BpsConfig]:
    """Create a workflow for running a custom job.

    This is the public entry point of the module; all of the work is
    delegated to `create_custom_workflow`.

    Parameters
    ----------
    config : `lsst.ctrl.bps.BpsConfig`
        Configuration values to be used by submission.

    Returns
    -------
    generic_workflow : `lsst.ctrl.bps.GenericWorkflow`
        Generic workflow for running the custom job.
    generic_workflow_config : `lsst.ctrl.bps.BpsConfig`
        Configuration to accompany created generic workflow.
    """
    return create_custom_workflow(config)

66 

67 

def create_custom_workflow(config: BpsConfig) -> tuple[GenericWorkflow, BpsConfig]:
    """Create a workflow that will run a custom job.

    The workflow is named after the required ``uniqProcName`` setting,
    contains exactly one job (built by `create_custom_job`), and is tagged
    with run attributes identifying it as a custom job.

    Parameters
    ----------
    config : `lsst.ctrl.bps.BpsConfig`
        BPS configuration.

    Returns
    -------
    generic_workflow : `lsst.ctrl.bps.GenericWorkflow`
        Generic workflow for running a custom job.
    generic_workflow_config : `lsst.ctrl.bps.BpsConfig`
        A new configuration built from ``config`` with ``workflowName`` and
        ``workflowPath`` set, to accompany the created generic workflow.
    """
    job, job_inputs, job_outputs = create_custom_job(config)

    _, workflow_name = config.search("uniqProcName", opt={"required": True})
    workflow = GenericWorkflow(workflow_name)
    workflow.add_job(job)

    # Attributes describing the run; values are read from the configuration
    # in this order so any missing key is reported the same way as before.
    run_attrs = {
        "bps_isjob": "True",
        "bps_iscustom": "True",
        "bps_project": config["project"],
        "bps_campaign": config["campaign"],
        "bps_run": workflow.name,
        "bps_operator": config["operator"],
        "bps_payload": config["payloadName"],
        "bps_runsite": config["computeSite"],
    }
    workflow.run_attrs.update(run_attrs)

    # Only register file dependencies that actually exist.
    if job_inputs:
        workflow.add_job_inputs(job.name, job_inputs)
    if job_outputs:
        workflow.add_job_outputs(job.name, job_outputs)

    workflow_config = BpsConfig(config)
    workflow_config["workflowName"] = config["uniqProcName"]
    workflow_config["workflowPath"] = config["submitPath"]

    return workflow, workflow_config

110 

111 

def create_custom_job(
    config: BpsConfig,
) -> tuple[GenericWorkflowJob, list[GenericWorkflowFile], list[GenericWorkflowFile]]:
    """Create a job that will run a custom command or script.

    The executable named by ``customJob.executable`` is copied into the
    submit directory (``submitPath``), and the job is configured to transfer
    it from there.  Files listed under ``inputs`` are copied into the submit
    directory as well, and parent directories for ``outputs`` are created.
    Any ``{key}`` placeholder in the job arguments whose key names one of
    those files is rewritten to ``<FILE:key>``.

    Parameters
    ----------
    config : `lsst.ctrl.bps.BpsConfig`
        BPS configuration.

    Returns
    -------
    job : `lsst.ctrl.bps.GenericWorkflowJob`
        A custom job responsible for running the command.
    inputs : `list` [`lsst.ctrl.bps.GenericWorkflowFile`]
        List of job's input files, empty if the job has no input files.
    outputs : `list` [`lsst.ctrl.bps.GenericWorkflowFile`]
        List of job's output files, empty if the job has no output files.
    """
    submit_dir = Path(config["submitPath"])
    label = "customJob"

    # Search context for job-specific values.  The site (if set) is recorded
    # before the cloud lookup, so that lookup runs with ``curr_site`` known.
    opts = {"searchobj": config[label], "curvals": {}}
    for setting, curval_key in (("computeSite", "curr_site"), ("computeCloud", "curr_cloud")):
        is_set, setting_value = config.search(setting, opt=opts)
        if is_set:
            opts["curvals"][curval_key] = setting_value

    executable = Path(config[f".{label}.executable"])
    shutil.copy2(executable, submit_dir)

    job = GenericWorkflowJob(name=executable.name, label=label)
    # Fill remaining attributes from the configuration without overriding
    # anything the freshly created job already has set.
    for attr_name, attr_value in _get_job_values(config, opts, "").items():
        if getattr(job, attr_name):
            continue
        setattr(job, attr_name, attr_value)
    job.executable = GenericWorkflowExec(
        name=executable.name,
        src_uri=str(submit_dir / executable.name),
        transfer_executable=True,
    )
    # Variable replacement is disabled here, presumably so that ``{key}``
    # file placeholders survive for the rewriting done below.
    _, job.arguments = config.search("arguments", opt={**opts, "replaceVars": False})

    inputs = []
    has_inputs, input_specs = config.search("inputs", opt=opts)
    if has_inputs:
        inputs = create_job_files(input_specs, submit_dir, path_creator=create_input_path)

    outputs = []
    has_outputs, output_specs = config.search("outputs", opt=opts)
    if has_outputs:
        outputs = create_job_files(output_specs, submit_dir, path_creator=create_output_path)

    for gwfile in [*inputs, *outputs]:
        job.arguments = job.arguments.replace(f"{{{gwfile.name}}}", f"<FILE:{gwfile.name}>")

    return job, inputs, outputs

171 

172 

def create_job_files(
    file_specs: BpsConfig, prefix: str | Path, path_creator: Callable[[Path, Path], Path]
) -> list[GenericWorkflowFile]:
    """Create files for a job.

    Each entry of ``file_specs`` is turned into a `GenericWorkflowFile`
    whose name is the entry's key and whose source URI is the destination
    path returned by ``path_creator``.  Every file is marked for transfer by
    the WMS.

    Parameters
    ----------
    file_specs : `lsst.ctrl.bps.BpsConfig`
        The mapping between file keys and file paths.
    prefix : `str` | `pathlib.Path`
        The root directory to which the files will be written.
    path_creator : `~collections.abc.Callable` \
            [[`pathlib.Path`, `pathlib.Path`], `pathlib.Path`]
        Callable invoked as ``path_creator(src, prefix)`` for every file; it
        performs any file-type specific preparation (e.g. copying an input,
        creating an output directory) and returns the destination path.

    Returns
    -------
    gwfiles : `list` [`lsst.ctrl.bps.GenericWorkflowFile`]
        List of files created for the job, in ``file_specs`` order.
    """
    root = Path(prefix)
    return [
        GenericWorkflowFile(
            name=key,
            src_uri=str(path_creator(Path(raw_path), root)),
            wms_transfer=True,
        )
        for key, raw_path in file_specs.items()
    ]

202 

203 

def create_input_path(path: Path, prefix: Path) -> Path:
    """Copy an input file into the root directory.

    Parameters
    ----------
    path : `pathlib.Path`
        The input path.
    prefix : `pathlib.Path`
        The root directory to which the file will be written.

    Returns
    -------
    dest : `pathlib.Path`
        Path of the copy, i.e. ``prefix / path.name``.

    Raises
    ------
    ValueError
        Raised if the input path does not exist or is a directory.
    """
    if not path.exists():
        raise ValueError(f"input path '{path}' does not exist")
    if path.is_dir():
        raise ValueError(f"input path '{path}' is a directory, must be file")
    # Only the file name is kept, so inputs are flattened into ``prefix``.
    # NOTE(review): two inputs sharing a basename will overwrite each other.
    dest = prefix / path.name
    shutil.copy2(path, dest)
    return dest

227 

228 

def create_output_path(path: Path, prefix: Path) -> Path:
    """Resolve where an output file will be written and create its directory.

    Parameters
    ----------
    path : `pathlib.Path`
        The output path; a relative path is interpreted relative to
        ``prefix``.
    prefix : `pathlib.Path`
        The root directory to which the file will be written.

    Returns
    -------
    dest : `pathlib.Path`
        ``path`` itself if it is absolute, otherwise ``prefix / path``.
        Its parent directory exists when this function returns.
    """
    dest = path if path.is_absolute() else prefix / path
    # The file itself is not created; only the directory that will hold it.
    dest.parent.mkdir(parents=True, exist_ok=True)
    return dest