Coverage for tests/test_construct.py: 22%
189 statements
« prev ^ index » next coverage.py v7.13.5, created at 2026-04-18 08:47 +0000
« prev ^ index » next coverage.py v7.13.5, created at 2026-04-18 08:47 +0000
1# This file is part of ctrl_bps.
2#
3# Developed for the LSST Data Management System.
4# This product includes software developed by the LSST Project
5# (https://www.lsst.org).
6# See the COPYRIGHT file at the top-level directory of this distribution
7# for details of code ownership.
8#
9# This software is dual licensed under the GNU General Public License and also
10# under a 3-clause BSD license. Recipients may choose which of these licenses
11# to use; please see the files gpl-3.0.txt and/or bsd_license.txt,
12# respectively. If you choose the GPL option then the following text applies
13# (but note that there is still no warranty even if you opt for BSD instead):
14#
15# This program is free software: you can redistribute it and/or modify
16# it under the terms of the GNU General Public License as published by
17# the Free Software Foundation, either version 3 of the License, or
18# (at your option) any later version.
19#
20# This program is distributed in the hope that it will be useful,
21# but WITHOUT ANY WARRANTY; without even the implied warranty of
22# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
23# GNU General Public License for more details.
24#
25# You should have received a copy of the GNU General Public License
26# along with this program. If not, see <https://www.gnu.org/licenses/>.
28"""Unit tests for the methods in construct.py."""
30import os
31import tempfile
32import unittest
33from pathlib import Path
34from unittest.mock import patch
36from lsst.ctrl.bps import BpsConfig, GenericWorkflowFile, GenericWorkflowJob
37from lsst.ctrl.bps.construct import (
38 construct,
39 create_custom_job,
40 create_custom_workflow,
41 create_input_path,
42 create_job_files,
43 create_output_path,
44)
class ConstructTestCase(unittest.TestCase):
    """Exercise ``construct`` with a minimal custom-job configuration."""

    def setUp(self):
        # Handles are kept on the instance so tearDown can release them.
        self.script = tempfile.NamedTemporaryFile(prefix="foo", suffix=".sh")
        self.submit_dir = tempfile.TemporaryDirectory()

        custom_job = {
            "executable": self.script.name,
            "arguments": "test_arg",
        }
        settings = {
            "submitPath": self.submit_dir.name,
            "uniqProcName": "test_workflow",
            "project": "test_project",
            "campaign": "test_campaign",
            "operator": "test_operator",
            "payloadName": "test_payload",
            "computeSite": "test_site",
            "customJob": custom_job,
        }
        self.config = BpsConfig(settings, defaults={})

    def tearDown(self):
        # Release the temporary script first, then the submit directory.
        self.script.close()
        self.submit_dir.cleanup()

    def testConstructSuccess(self):
        """Check that construct yields a named workflow and a config."""
        generic_workflow, returned_config = construct(self.config)

        self.assertIsNotNone(generic_workflow)
        self.assertIsNotNone(returned_config)
        self.assertEqual(generic_workflow.name, "test_workflow")
        self.assertEqual(returned_config["workflowName"], "test_workflow")
class CreateCustomWorkflowTestCase(unittest.TestCase):
    """Exercise ``create_custom_workflow`` with and without job files."""

    def setUp(self):
        # Handles are kept on the instance so tearDown can release them.
        self.script = tempfile.NamedTemporaryFile(prefix="bar", suffix=".sh")
        self.submit_dir = tempfile.TemporaryDirectory()

        custom_job = {
            "executable": self.script.name,
            "arguments": "arg",
        }
        settings = {
            "submitPath": self.submit_dir.name,
            "customJob": custom_job,
            "project": "dev",
            "campaign": "test",
            "operator": "tester",
            "payloadName": "custom/workflow",
            "computeCloud": "test_cloud",
            "computeSite": "test_site",
        }
        self.config = BpsConfig(settings, defaults={})

    def tearDown(self):
        # Release the temporary script first, then the submit directory.
        self.script.close()
        self.submit_dir.cleanup()

    def testSuccess(self):
        """Check the name, job counts and run attributes of the workflow."""
        # The submit path doubles as the unique process name in this test.
        self.config["uniqProcName"] = self.config["submitPath"]

        generic_workflow, returned_config = create_custom_workflow(self.config)
        run_attrs = generic_workflow.run_attrs

        self.assertEqual(generic_workflow.name, self.config["uniqProcName"])
        self.assertEqual(generic_workflow.job_counts, {"customJob": 1})
        self.assertTrue(run_attrs["bps_isjob"])
        self.assertTrue(run_attrs["bps_iscustom"])
        self.assertEqual(run_attrs["bps_project"], "dev")
        self.assertEqual(run_attrs["bps_campaign"], "test")
        self.assertEqual(run_attrs["bps_operator"], "tester")
        self.assertEqual(run_attrs["bps_payload"], "custom/workflow")
        self.assertEqual(run_attrs["bps_run"], generic_workflow.name)
        self.assertEqual(run_attrs["bps_runsite"], "test_site")
        self.assertEqual(returned_config["workflowName"], self.config["uniqProcName"])
        self.assertEqual(returned_config["workflowPath"], self.config["submitPath"])

    def testEmptyInputs(self):
        """Check workflow creation when the job has outputs but no inputs."""
        self.config["uniqProcName"] = "test_custom"
        job = GenericWorkflowJob(name="test_job", label="test_job")
        output_file = GenericWorkflowFile(name="test_output", src_uri="test_output")

        # Replace job creation so the workflow sees exactly these files.
        stubbed = (job, [], [output_file])
        with patch("lsst.ctrl.bps.construct.create_custom_job", return_value=stubbed):
            generic_workflow, _ = create_custom_workflow(self.config)

            self.assertEqual(len(generic_workflow.get_job_inputs(job.name)), 0)
            self.assertGreater(len(generic_workflow.get_job_outputs(job.name)), 0)

    def testEmptyOutputs(self):
        """Check workflow creation when the job has inputs but no outputs."""
        self.config["uniqProcName"] = "test_custom"
        job = GenericWorkflowJob(name="test_job", label="test_job")
        input_file = GenericWorkflowFile(name="test_input", src_uri="test_input")

        # Replace job creation so the workflow sees exactly these files.
        stubbed = (job, [input_file], [])
        with patch("lsst.ctrl.bps.construct.create_custom_job", return_value=stubbed):
            generic_workflow, _ = create_custom_workflow(self.config)

            self.assertGreater(len(generic_workflow.get_job_inputs(job.name)), 0)
            self.assertEqual(len(generic_workflow.get_job_outputs(job.name)), 0)

    def testEmptyInputsAndOutputs(self):
        """Check workflow creation when the job has no files at all."""
        self.config["uniqProcName"] = "test_custom"
        job = GenericWorkflowJob(name="test_job", label="test_job")

        # Replace job creation so the workflow sees a job without files.
        stubbed = (job, [], [])
        with patch("lsst.ctrl.bps.construct.create_custom_job", return_value=stubbed):
            generic_workflow, _ = create_custom_workflow(self.config)

            self.assertEqual(len(generic_workflow.get_job_inputs(job.name)), 0)
            self.assertEqual(len(generic_workflow.get_job_outputs(job.name)), 0)
class CreateCustomJobTestCase(unittest.TestCase):
    """Tests for creating a custom job."""

    def setUp(self):
        # Handles are kept on the instance so tearDown can release them.
        self.script = tempfile.NamedTemporaryFile(prefix="foo", suffix=".sh")
        self.submit_dir = tempfile.TemporaryDirectory()
        self.config = BpsConfig(
            {
                "submitPath": self.submit_dir.name,
                "computeCloud": "test_cloud",
                "computeSite": "test_site",
                "customJob": {
                    "executable": self.script.name,
                    "arguments": "arg",
                },
            },
            defaults={},
        )

    def tearDown(self):
        self.script.close()
        self.submit_dir.cleanup()

    def testJobCreationNoFilesSuccess(self):
        """Test successful creation of a custom job without job files."""
        script_name = Path(self.script.name).name
        # Location where the executable is expected in the submit directory.
        staged_script = f"{self.submit_dir.name}/{script_name}"

        job, inputs, outputs = create_custom_job(self.config)

        self.assertEqual(job.name, script_name)
        self.assertEqual(job.label, "customJob")
        self.assertEqual(job.compute_cloud, "test_cloud")
        self.assertEqual(job.compute_site, "test_site")
        self.assertEqual(job.executable.name, script_name)
        self.assertEqual(job.executable.src_uri, staged_script)
        self.assertTrue(job.executable.transfer_executable)
        self.assertTrue(Path(staged_script).exists())
        self.assertEqual(job.arguments, "arg")
        self.assertEqual(inputs, [])
        self.assertEqual(outputs, [])

    def testJobCreationWithFilesSuccess(self):
        """Test custom job creation with input and output files."""
        # The input file is created with delete=False, so nothing removes it
        # automatically.  Register the removal immediately after creation so
        # the file cannot leak if any later step of the test raises.
        with tempfile.NamedTemporaryFile(prefix="input", suffix=".txt", delete=False) as input_file:
            self.addCleanup(Path(input_file.name).unlink, missing_ok=True)
            input_file.write(b"test input data")

        self.config[".customJob.arguments"] = "--input {input1} --output {output1}"
        self.config[".customJob.inputs.input1"] = input_file.name
        self.config[".customJob.outputs.output1"] = "output.txt"

        job, inputs, outputs = create_custom_job(self.config)

        self.assertEqual(len(inputs), 1)
        self.assertEqual(len(outputs), 1)
        self.assertEqual(inputs[0].name, "input1")
        self.assertEqual(outputs[0].name, "output1")
        # The "{name}" placeholders should show up as "<FILE:name>".
        self.assertIn("<FILE:input1>", job.arguments)
        self.assertIn("<FILE:output1>", job.arguments)

    def testJobCreationMissingExecutable(self):
        """Test custom job creation fails with missing executable."""
        self.config[".customJob.executable"] = "/nonexistent/script.sh"

        with self.assertRaises(FileNotFoundError):
            create_custom_job(self.config)
class CreateJobFilesTestCase(unittest.TestCase):
    """Exercise ``create_job_files`` with empty and populated file specs."""

    def setUp(self):
        self.temp_dir = tempfile.TemporaryDirectory()
        self.prefix = Path(self.temp_dir.name)

    def tearDown(self):
        self.temp_dir.cleanup()

    @staticmethod
    def _under_prefix(path, prefix):
        """Return the location of ``path``'s file name under ``prefix``."""
        return prefix / path.name

    def testJobFileCreationNoFiles(self):
        """Check that empty file specs produce an empty list."""
        _, filespecs = BpsConfig({"inputs": {}}).search("inputs")

        job_files = create_job_files(filespecs, self.prefix, self._under_prefix)

        self.assertEqual(job_files, [])

    def testJobFileCreationWithFiles(self):
        """Check the names and transfer flags of the created files."""
        specs = {
            "inputs": {
                "file1": "/path/to/file1.txt",
                "file2": "/path/to/file2.txt",
            }
        }
        _, filespecs = BpsConfig(specs).search("inputs")

        job_files = create_job_files(filespecs, self.prefix, self._under_prefix)

        self.assertEqual(len(job_files), 2)
        self.assertEqual(job_files[0].name, "file1")
        self.assertEqual(job_files[1].name, "file2")
        self.assertTrue(job_files[0].wms_transfer)
        self.assertTrue(job_files[1].wms_transfer)
class CreateInputPathTestCase(unittest.TestCase):
    """Tests for create_input_path function."""

    def setUp(self):
        self.temp_dir = tempfile.TemporaryDirectory()
        self.prefix = Path(self.temp_dir.name)

        # Create a test input file.  It is made with delete=False and without
        # a ``dir`` argument, so it is not placed in ``temp_dir`` and must be
        # removed explicitly in tearDown.
        with tempfile.NamedTemporaryFile(prefix="input_", suffix=".txt", delete=False) as input_file:
            input_file.write(b"test content")
        self.input_file = input_file

    def tearDown(self):
        # Remove the input file created in setUp; cleaning up ``temp_dir``
        # does not remove it, so without this every test method would leave
        # one stray file behind.
        try:
            Path(self.input_file.name).unlink(missing_ok=True)
        finally:
            self.temp_dir.cleanup()

    def testInputPathCreationSuccess(self):
        """Test successful input path creation."""
        input_path = Path(self.input_file.name)
        result_path = create_input_path(input_path, self.prefix)

        expected_path = self.prefix / input_path.name
        self.assertEqual(result_path, expected_path)
        self.assertTrue(result_path.exists())

        # Verify file content was copied
        with open(result_path) as f:
            content = f.read()
        self.assertEqual(content, "test content")

    def testInputPathCreationFileIsMissing(self):
        """Test input path creation fails if file does not exist."""
        nonexistent_path = Path("/nonexistent/file.txt")

        with self.assertRaisesRegex(ValueError, "does not exist"):
            create_input_path(nonexistent_path, self.prefix)

    def testInputPathCreationFileIsDirectory(self):
        """Test input path creation fails if path is a directory."""
        dir_path = Path(self.temp_dir.name)

        with self.assertRaisesRegex(ValueError, "is a directory"):
            create_input_path(dir_path, self.prefix)

    def testInputPathCreationPermissionError(self):
        """Test input path creation with permission denied."""
        # Make the copy step raise so the error is seen by the caller.
        with patch("shutil.copy2", side_effect=PermissionError("Permission denied")):
            with self.assertRaises(PermissionError):
                create_input_path(Path(self.input_file.name), self.prefix)
class CreateOutputPathTestCase(unittest.TestCase):
    """Exercise ``create_output_path`` with relative and absolute paths."""

    def setUp(self):
        self.temp_dir = tempfile.TemporaryDirectory()
        self.prefix = Path(self.temp_dir.name)

    def tearDown(self):
        self.temp_dir.cleanup()

    def testOutputPathCreationRelativePathNew(self):
        """Check a relative path whose parent directory does not exist yet."""
        expected = self.prefix / "foo/bar.txt"

        actual = create_output_path(Path("foo/bar.txt"), self.prefix)

        self.assertEqual(actual, expected)
        self.assertTrue(actual.parent.exists())

    def testOutputPathCreationRelativePathParentExits(self):
        """Check a relative path whose parent directory already exists."""
        # Make the parent directory before asking for the output path.
        existing_parent = self.prefix / "foo"
        existing_parent.mkdir()
        expected = self.prefix / "foo/bar.txt"

        actual = create_output_path(Path("foo/bar.txt"), self.prefix)

        self.assertEqual(actual, expected)
        self.assertTrue(actual.parent.exists())

    def testOutputPathCreationAbsolutePath(self):
        """Check that an absolute path under the prefix is returned as is."""
        absolute_output = self.prefix / "foo/bar.txt"

        actual = create_output_path(absolute_output, self.prefix)

        self.assertEqual(actual, absolute_output)
        self.assertTrue(actual.parent.exists())