Coverage for tests / test_construct.py: 22%

189 statements  

« prev     ^ index     » next       coverage.py v7.13.5, created at 2026-04-17 09:04 +0000

1# This file is part of ctrl_bps. 

2# 

3# Developed for the LSST Data Management System. 

4# This product includes software developed by the LSST Project 

5# (https://www.lsst.org). 

6# See the COPYRIGHT file at the top-level directory of this distribution 

7# for details of code ownership. 

8# 

9# This software is dual licensed under the GNU General Public License and also 

10# under a 3-clause BSD license. Recipients may choose which of these licenses 

11# to use; please see the files gpl-3.0.txt and/or bsd_license.txt, 

12# respectively. If you choose the GPL option then the following text applies 

13# (but note that there is still no warranty even if you opt for BSD instead): 

14# 

15# This program is free software: you can redistribute it and/or modify 

16# it under the terms of the GNU General Public License as published by 

17# the Free Software Foundation, either version 3 of the License, or 

18# (at your option) any later version. 

19# 

20# This program is distributed in the hope that it will be useful, 

21# but WITHOUT ANY WARRANTY; without even the implied warranty of 

22# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the 

23# GNU General Public License for more details. 

24# 

25# You should have received a copy of the GNU General Public License 

26# along with this program. If not, see <https://www.gnu.org/licenses/>. 

27 

28"""Unit tests for the methods in construct.py.""" 

29 

30import os 

31import tempfile 

32import unittest 

33from pathlib import Path 

34from unittest.mock import patch 

35 

36from lsst.ctrl.bps import BpsConfig, GenericWorkflowFile, GenericWorkflowJob 

37from lsst.ctrl.bps.construct import ( 

38 construct, 

39 create_custom_job, 

40 create_custom_workflow, 

41 create_input_path, 

42 create_job_files, 

43 create_output_path, 

44) 

45 

46 

class ConstructTestCase(unittest.TestCase):
    """Exercise the top-level ``construct`` entry point."""

    def setUp(self):
        # Scratch executable and submit directory; both are released in tearDown.
        self.script = tempfile.NamedTemporaryFile(prefix="foo", suffix=".sh")
        self.submit_dir = tempfile.TemporaryDirectory()

        custom_job = {
            "executable": self.script.name,
            "arguments": "test_arg",
        }
        settings = {
            "submitPath": self.submit_dir.name,
            "uniqProcName": "test_workflow",
            "project": "test_project",
            "campaign": "test_campaign",
            "operator": "test_operator",
            "payloadName": "test_payload",
            "computeSite": "test_site",
            "customJob": custom_job,
        }
        self.config = BpsConfig(settings, defaults={})

    def tearDown(self):
        self.script.close()
        self.submit_dir.cleanup()

    def testConstructSuccess(self):
        """Check that construct() hands back a workflow and a config that
        both carry the configured workflow name.
        """
        generic_workflow, updated_config = construct(self.config)

        self.assertIsNotNone(generic_workflow)
        self.assertIsNotNone(updated_config)
        self.assertEqual(generic_workflow.name, "test_workflow")
        self.assertEqual(updated_config["workflowName"], "test_workflow")

82 

83 

class CreateCustomWorkflowTestCase(unittest.TestCase):
    """Exercise ``create_custom_workflow`` with real and stubbed jobs."""

    def setUp(self):
        # Scratch executable and submit directory; both are released in tearDown.
        self.script = tempfile.NamedTemporaryFile(prefix="bar", suffix=".sh")
        self.submit_dir = tempfile.TemporaryDirectory()

        custom_job = {
            "executable": self.script.name,
            "arguments": "arg",
        }
        settings = {
            "submitPath": self.submit_dir.name,
            "customJob": custom_job,
            "project": "dev",
            "campaign": "test",
            "operator": "tester",
            "payloadName": "custom/workflow",
            "computeCloud": "test_cloud",
            "computeSite": "test_site",
        }
        self.config = BpsConfig(settings, defaults={})

    def tearDown(self):
        self.script.close()
        self.submit_dir.cleanup()

    def testSuccess(self):
        """Check the workflow name, job counts, run attributes and the
        workflow-related config entries for a real custom job.
        """
        self.config["uniqProcName"] = self.config["submitPath"]

        workflow, config = create_custom_workflow(self.config)

        self.assertEqual(workflow.name, self.config["uniqProcName"])
        self.assertEqual(workflow.job_counts, {"customJob": 1})
        self.assertTrue(workflow.run_attrs["bps_isjob"])
        self.assertTrue(workflow.run_attrs["bps_iscustom"])

        # Run attributes that must match a specific value, checked in order.
        expected_attrs = {
            "bps_project": "dev",
            "bps_campaign": "test",
            "bps_operator": "tester",
            "bps_payload": "custom/workflow",
            "bps_run": workflow.name,
            "bps_runsite": "test_site",
        }
        for attr_name, attr_value in expected_attrs.items():
            self.assertEqual(workflow.run_attrs[attr_name], attr_value)

        self.assertEqual(config["workflowName"], self.config["uniqProcName"])
        self.assertEqual(config["workflowPath"], self.config["submitPath"])

    @patch("lsst.ctrl.bps.construct.create_custom_job")
    def testEmptyInputs(self, create_job_stub):
        """Workflow creation when the job has outputs but no input files."""
        self.config["uniqProcName"] = "test_custom"

        job = GenericWorkflowJob(name="test_job", label="test_job")
        output_file = GenericWorkflowFile(name="test_output", src_uri="test_output")
        create_job_stub.return_value = (job, [], [output_file])

        workflow, _ = create_custom_workflow(self.config)

        self.assertEqual(len(workflow.get_job_inputs(job.name)), 0)
        self.assertGreater(len(workflow.get_job_outputs(job.name)), 0)

    @patch("lsst.ctrl.bps.construct.create_custom_job")
    def testEmptyOutputs(self, create_job_stub):
        """Workflow creation when the job has inputs but no output files."""
        self.config["uniqProcName"] = "test_custom"

        job = GenericWorkflowJob(name="test_job", label="test_job")
        input_file = GenericWorkflowFile(name="test_input", src_uri="test_input")
        create_job_stub.return_value = (job, [input_file], [])

        workflow, _ = create_custom_workflow(self.config)

        self.assertGreater(len(workflow.get_job_inputs(job.name)), 0)
        self.assertEqual(len(workflow.get_job_outputs(job.name)), 0)

    @patch("lsst.ctrl.bps.construct.create_custom_job")
    def testEmptyInputsAndOutputs(self, create_job_stub):
        """Workflow creation when the job has neither input nor output files."""
        self.config["uniqProcName"] = "test_custom"

        job = GenericWorkflowJob(name="test_job", label="test_job")
        create_job_stub.return_value = (job, [], [])

        workflow, _ = create_custom_workflow(self.config)

        self.assertEqual(len(workflow.get_job_inputs(job.name)), 0)
        self.assertEqual(len(workflow.get_job_outputs(job.name)), 0)

169 

170 

class CreateCustomJobTestCase(unittest.TestCase):
    """Tests for creating a custom job."""

    def setUp(self):
        # Scratch executable and submit directory; both are released in tearDown.
        self.script = tempfile.NamedTemporaryFile(prefix="foo", suffix=".sh")
        self.submit_dir = tempfile.TemporaryDirectory()
        self.config = BpsConfig(
            {
                "submitPath": self.submit_dir.name,
                "computeCloud": "test_cloud",
                "computeSite": "test_site",
                "customJob": {
                    "executable": self.script.name,
                    "arguments": "arg",
                },
            },
            defaults={},
        )

    def tearDown(self):
        self.script.close()
        self.submit_dir.cleanup()

    def testJobCreationNoFilesSuccess(self):
        """Test successful creation of a custom job."""
        script_name = Path(self.script.name).name
        staged_script = f"{self.submit_dir.name}/{script_name}"

        job, inputs, outputs = create_custom_job(self.config)

        self.assertEqual(job.name, script_name)
        self.assertEqual(job.label, "customJob")
        self.assertEqual(job.compute_cloud, "test_cloud")
        self.assertEqual(job.compute_site, "test_site")
        self.assertEqual(job.executable.name, script_name)
        self.assertEqual(job.executable.src_uri, staged_script)
        self.assertTrue(job.executable.transfer_executable)
        # The executable is expected to be present in the submit directory.
        self.assertTrue(Path(staged_script).exists())
        self.assertEqual(job.arguments, "arg")
        self.assertEqual(inputs, [])
        self.assertEqual(outputs, [])

    def testJobCreationWithFilesSuccess(self):
        """Test custom job creation with input and output files."""
        # delete=False keeps the file on disk after it is closed so the code
        # under test can read it. The cleanup is registered as soon as the
        # file exists, so it is removed even if a later step in this test
        # (including the config updates below) raises.
        with tempfile.NamedTemporaryFile(prefix="input", suffix=".txt", delete=False) as input_file:
            self.addCleanup(os.unlink, input_file.name)
            input_file.write(b"test input data")

        self.config[".customJob.arguments"] = "--input {input1} --output {output1}"
        self.config[".customJob.inputs.input1"] = input_file.name
        self.config[".customJob.outputs.output1"] = "output.txt"

        job, inputs, outputs = create_custom_job(self.config)

        self.assertEqual(len(inputs), 1)
        self.assertEqual(len(outputs), 1)
        self.assertEqual(inputs[0].name, "input1")
        self.assertEqual(outputs[0].name, "output1")
        self.assertIn("<FILE:input1>", job.arguments)
        self.assertIn("<FILE:output1>", job.arguments)

    def testJobCreationMissingExecutable(self):
        """Test custom job creation fails with missing executable."""
        self.config[".customJob.executable"] = "/nonexistent/script.sh"

        with self.assertRaises(FileNotFoundError):
            create_custom_job(self.config)

242 

243 

class CreateJobFilesTestCase(unittest.TestCase):
    """Exercise ``create_job_files`` with empty and populated file specs."""

    def setUp(self):
        self.temp_dir = tempfile.TemporaryDirectory()
        self.prefix = Path(self.temp_dir.name)

    def tearDown(self):
        self.temp_dir.cleanup()

    @staticmethod
    def _under_prefix(path, prefix):
        """Path factory handed to create_job_files: ``prefix`` joined with
        the file name of ``path``.
        """
        return prefix / path.name

    def testJobFileCreationNoFiles(self):
        """An empty ``inputs`` section is expected to yield no files."""
        empty_config = BpsConfig({"inputs": {}})
        _, filespecs = empty_config.search("inputs")

        created = create_job_files(filespecs, self.prefix, self._under_prefix)

        self.assertEqual(created, [])

    def testJobFileCreationWithFiles(self):
        """Two input specs are expected to yield two named files, in order,
        each flagged for WMS transfer.
        """
        inputs = {
            "file1": "/path/to/file1.txt",
            "file2": "/path/to/file2.txt",
        }
        _, filespecs = BpsConfig({"inputs": inputs}).search("inputs")

        created = create_job_files(filespecs, self.prefix, self._under_prefix)

        # One comparison covers both the count and the per-file names.
        self.assertEqual([gwfile.name for gwfile in created], ["file1", "file2"])
        for gwfile in created:
            self.assertTrue(gwfile.wms_transfer)

280 

281 

class CreateInputPathTestCase(unittest.TestCase):
    """Tests for create_input_path function."""

    def setUp(self):
        self.temp_dir = tempfile.TemporaryDirectory()
        self.prefix = Path(self.temp_dir.name)

        # Create a test input file.  delete=False keeps it on disk once the
        # handle is closed; tearDown is responsible for removing it.
        with tempfile.NamedTemporaryFile(prefix="input_", suffix=".txt", delete=False) as input_file:
            input_file.write(b"test content")
        self.input_file = input_file

    def tearDown(self):
        # The input file is not inside ``temp_dir``, so it has to be removed
        # explicitly or every test would leave a stray file behind.
        Path(self.input_file.name).unlink(missing_ok=True)
        self.temp_dir.cleanup()

    def testInputPathCreationSuccess(self):
        """Test successful input path creation."""
        input_path = Path(self.input_file.name)
        result_path = create_input_path(input_path, self.prefix)

        expected_path = self.prefix / input_path.name
        self.assertEqual(result_path, expected_path)
        self.assertTrue(result_path.exists())

        # Verify file content was copied
        self.assertEqual(result_path.read_text(), "test content")

    def testInputPathCreationFileIsMissing(self):
        """Test input path creation fails if file does not exist."""
        nonexistent_path = Path("/nonexistent/file.txt")

        with self.assertRaisesRegex(ValueError, "does not exist"):
            create_input_path(nonexistent_path, self.prefix)

    def testInputPathCreationFileIsDirectory(self):
        """Test input path creation fails if path is a directory."""
        dir_path = Path(self.temp_dir.name)

        with self.assertRaisesRegex(ValueError, "is a directory"):
            create_input_path(dir_path, self.prefix)

    def testInputPathCreationPermissionError(self):
        """Test input path creation with permission denied."""
        # Simulate a copy failure rather than relying on real file permissions.
        with patch("shutil.copy2", side_effect=PermissionError("Permission denied")):
            with self.assertRaises(PermissionError):
                create_input_path(Path(self.input_file.name), self.prefix)

330 

331 

class CreateOutputPathTestCase(unittest.TestCase):
    """Exercise ``create_output_path`` with relative and absolute paths."""

    def setUp(self):
        self.temp_dir = tempfile.TemporaryDirectory()
        self.prefix = Path(self.temp_dir.name)

    def tearDown(self):
        self.temp_dir.cleanup()

    def _assert_resolves_to(self, requested, expected):
        """Call create_output_path for ``requested`` and check that the
        result equals ``expected`` and that its parent directory exists.
        """
        resolved = create_output_path(requested, self.prefix)

        self.assertEqual(resolved, expected)
        self.assertTrue(resolved.parent.exists())

    def testOutputPathCreationRelativePathNew(self):
        """Relative path whose parent directory does not exist beforehand."""
        self._assert_resolves_to(Path("foo/bar.txt"), self.prefix / "foo/bar.txt")

    def testOutputPathCreationRelativePathParentExits(self):
        """Relative path whose parent directory already exists."""
        # Pre-create the parent so the call has to cope with an existing directory.
        (self.prefix / "foo").mkdir()

        self._assert_resolves_to(Path("foo/bar.txt"), self.prefix / "foo/bar.txt")

    def testOutputPathCreationAbsolutePath(self):
        """Absolute path under the prefix is expected to come back unchanged."""
        absolute_path = self.prefix / "foo/bar.txt"

        self._assert_resolves_to(absolute_path, absolute_path)

370 self.assertEqual(result_path, expected_path) 

371 self.assertTrue(result_path.parent.exists())