Coverage for tests / test_common_utils.py: 34%
119 statements
« prev ^ index » next coverage.py v7.13.5, created at 2026-04-17 09:05 +0000
« prev ^ index » next coverage.py v7.13.5, created at 2026-04-17 09:05 +0000
1# This file is part of ctrl_bps_htcondor.
2#
3# Developed for the LSST Data Management System.
4# This product includes software developed by the LSST Project
5# (https://www.lsst.org).
6# See the COPYRIGHT file at the top-level directory of this distribution
7# for details of code ownership.
8#
9# This software is dual licensed under the GNU General Public License and also
10# under a 3-clause BSD license. Recipients may choose which of these licenses
11# to use; please see the files gpl-3.0.txt and/or bsd_license.txt,
12# respectively. If you choose the GPL option then the following text applies
13# (but note that there is still no warranty even if you opt for BSD instead):
14#
15# This program is free software: you can redistribute it and/or modify
16# it under the terms of the GNU General Public License as published by
17# the Free Software Foundation, either version 3 of the License, or
18# (at your option) any later version.
19#
20# This program is distributed in the hope that it will be useful,
21# but WITHOUT ANY WARRANTY; without even the implied warranty of
22# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
23# GNU General Public License for more details.
24#
25# You should have received a copy of the GNU General Public License
26# along with this program. If not, see <https://www.gnu.org/licenses/>.
28"""Unit tests for the common utility functions."""
30import logging
31import os
32import unittest
33from pathlib import Path
35import htcondor
37from lsst.ctrl.bps import (
38 WmsStates,
39)
40from lsst.ctrl.bps.htcondor import common_utils
41from lsst.utils.tests import temporaryDirectory
43logger = logging.getLogger("lsst.ctrl.bps.htcondor")
46class HtcNodeStatusToWmsStateTestCase(unittest.TestCase):
47 """Test assigning WMS state base on HTCondor node status."""
49 def setUp(self):
50 pass
52 def tearDown(self):
53 pass
55 def testNotReady(self):
56 job = {"NodeStatus": common_utils.NodeStatus.NOT_READY}
57 result = common_utils._htc_node_status_to_wms_state(job)
58 self.assertEqual(result, WmsStates.UNREADY)
60 def testReady(self):
61 job = {"NodeStatus": common_utils.NodeStatus.READY}
62 result = common_utils._htc_node_status_to_wms_state(job)
63 self.assertEqual(result, WmsStates.READY)
65 def testPrerun(self):
66 job = {"NodeStatus": common_utils.NodeStatus.PRERUN}
67 result = common_utils._htc_node_status_to_wms_state(job)
68 self.assertEqual(result, WmsStates.MISFIT)
70 def testSubmittedHeld(self):
71 job = {
72 "NodeStatus": common_utils.NodeStatus.SUBMITTED,
73 "JobProcsHeld": 1,
74 "StatusDetails": "",
75 "JobProcsQueued": 0,
76 }
77 result = common_utils._htc_node_status_to_wms_state(job)
78 self.assertEqual(result, WmsStates.HELD)
80 def testSubmittedRunning(self):
81 job = {
82 "NodeStatus": common_utils.NodeStatus.SUBMITTED,
83 "JobProcsHeld": 0,
84 "StatusDetails": "not_idle",
85 "JobProcsQueued": 0,
86 }
87 result = common_utils._htc_node_status_to_wms_state(job)
88 self.assertEqual(result, WmsStates.RUNNING)
90 def testSubmittedPending(self):
91 job = {
92 "NodeStatus": common_utils.NodeStatus.SUBMITTED,
93 "JobProcsHeld": 0,
94 "StatusDetails": "",
95 "JobProcsQueued": 1,
96 }
97 result = common_utils._htc_node_status_to_wms_state(job)
98 self.assertEqual(result, WmsStates.PENDING)
100 def testPostrun(self):
101 job = {"NodeStatus": common_utils.NodeStatus.POSTRUN}
102 result = common_utils._htc_node_status_to_wms_state(job)
103 self.assertEqual(result, WmsStates.MISFIT)
105 def testDone(self):
106 job = {"NodeStatus": common_utils.NodeStatus.DONE}
107 result = common_utils._htc_node_status_to_wms_state(job)
108 self.assertEqual(result, WmsStates.SUCCEEDED)
110 def testErrorDagmanSuccess(self):
111 job = {"NodeStatus": common_utils.NodeStatus.ERROR, "StatusDetails": "DAGMAN error 0"}
112 result = common_utils._htc_node_status_to_wms_state(job)
113 self.assertEqual(result, WmsStates.SUCCEEDED)
115 def testErrorDagmanFailure(self):
116 job = {"NodeStatus": common_utils.NodeStatus.ERROR, "StatusDetails": "DAGMAN error 1"}
117 result = common_utils._htc_node_status_to_wms_state(job)
118 self.assertEqual(result, WmsStates.FAILED)
120 def testFutile(self):
121 job = {"NodeStatus": common_utils.NodeStatus.FUTILE}
122 result = common_utils._htc_node_status_to_wms_state(job)
123 self.assertEqual(result, WmsStates.PRUNED)
125 def testDeletedJob(self):
126 job = {
127 "NodeStatus": common_utils.NodeStatus.ERROR,
128 "StatusDetails": "HTCondor reported ULOG_JOB_ABORTED event for job proc (1.0.0)",
129 "JobProcsQueued": 0,
130 }
131 result = common_utils._htc_node_status_to_wms_state(job)
132 self.assertEqual(result, WmsStates.DELETED)
135class HtcStatusToWmsStateTestCase(unittest.TestCase):
136 """Test assigning WMS state base on HTCondor status."""
138 def testJobStatus(self):
139 job = {
140 "ClusterId": 1,
141 "JobStatus": htcondor.JobStatus.IDLE,
142 "bps_job_label": "foo",
143 }
144 result = common_utils._htc_status_to_wms_state(job)
145 self.assertEqual(result, WmsStates.PENDING)
147 def testNodeStatus(self):
148 # Hold/Release test case
149 job = {
150 "ClusterId": 1,
151 "JobStatus": None,
152 "NodeStatus": common_utils.NodeStatus.SUBMITTED,
153 "JobProcsHeld": 0,
154 "StatusDetails": "",
155 "JobProcsQueued": 1,
156 }
157 result = common_utils._htc_status_to_wms_state(job)
158 self.assertEqual(result, WmsStates.PENDING)
160 def testNeitherStatus(self):
161 job = {"ClusterId": 1}
162 result = common_utils._htc_status_to_wms_state(job)
163 self.assertEqual(result, WmsStates.MISFIT)
165 def testRetrySuccess(self):
166 job = {
167 "NodeStatus": 5,
168 "Node": "8e62c569-ae2e-44e8-be36-d1aee333a129_isr_903342_10",
169 "RetryCount": 0,
170 "ClusterId": 851,
171 "ProcId": 0,
172 "MyType": "JobTerminatedEvent",
173 "EventTypeNumber": 5,
174 "HoldReasonCode": 3,
175 "HoldReason": "Job raised a signal 9. Handling signal as if job has gone over memory limit.",
176 "HoldReasonSubCode": 34,
177 "ToE": {
178 "ExitBySignal": False,
179 "ExitCode": 0,
180 },
181 "JobStatus": htcondor.JobStatus.COMPLETED,
182 "ExitBySignal": False,
183 "ExitCode": 0,
184 }
185 result = common_utils._htc_status_to_wms_state(job)
186 self.assertEqual(result, WmsStates.SUCCEEDED)
189class WmsIdToDirTestCase(unittest.TestCase):
190 """Test _wms_id_to_dir function."""
192 @unittest.mock.patch("lsst.ctrl.bps.htcondor.common_utils._wms_id_type")
193 def testInvalidIdType(self, _wms_id_type_mock):
194 _wms_id_type_mock.return_value = common_utils.WmsIdType.UNKNOWN
195 with self.assertRaises(TypeError) as cm:
196 _, _ = common_utils._wms_id_to_dir("not_used")
197 self.assertIn("Invalid job id type", str(cm.exception))
199 @unittest.mock.patch("lsst.ctrl.bps.htcondor.common_utils._wms_id_type")
200 def testAbsPathId(self, mock_wms_id_type):
201 mock_wms_id_type.return_value = common_utils.WmsIdType.PATH
202 with temporaryDirectory() as tmp_dir:
203 wms_path, id_type = common_utils._wms_id_to_dir(tmp_dir)
204 self.assertEqual(id_type, common_utils.WmsIdType.PATH)
205 self.assertEqual(Path(tmp_dir).resolve(), wms_path)
207 @unittest.mock.patch("lsst.ctrl.bps.htcondor.common_utils._wms_id_type")
208 def testRelPathId(self, _wms_id_type_mock):
209 _wms_id_type_mock.return_value = common_utils.WmsIdType.PATH
210 orig_dir = Path.cwd()
211 with temporaryDirectory() as tmp_dir:
212 os.chdir(tmp_dir)
213 abs_path = Path(tmp_dir) / "newdir"
214 abs_path.mkdir()
215 wms_path, id_type = common_utils._wms_id_to_dir("newdir")
216 self.assertEqual(id_type, common_utils.WmsIdType.PATH)
217 self.assertEqual(abs_path.resolve(), wms_path)
218 os.chdir(orig_dir)
221class WmsIdTypeTestCase(unittest.TestCase):
222 """Test _wms_id_type function."""
224 def testIntId(self):
225 id_type = common_utils._wms_id_type("4")
226 self.assertEqual(id_type, common_utils.WmsIdType.LOCAL)
228 def testPathId(self):
229 with temporaryDirectory() as tmp_dir:
230 id_type = common_utils._wms_id_type(str(tmp_dir))
231 self.assertEqual(id_type, common_utils.WmsIdType.PATH)
233 def testGlobalId(self):
234 id_type = common_utils._wms_id_type("testmachine#5044.0#1757720957")
235 self.assertEqual(id_type, common_utils.WmsIdType.GLOBAL)
237 def testUnknownType(self):
238 id_type = common_utils._wms_id_type(["bad param"])
239 self.assertEqual(id_type, common_utils.WmsIdType.UNKNOWN)
242if __name__ == "__main__":
243 unittest.main()