Coverage for tests / test_lssthtc.py: 19%
570 statements
« prev ^ index » next coverage.py v7.13.5, created at 2026-05-06 08:36 +0000
« prev ^ index » next coverage.py v7.13.5, created at 2026-05-06 08:36 +0000
1# This file is part of ctrl_bps_htcondor.
2#
3# Developed for the LSST Data Management System.
4# This product includes software developed by the LSST Project
5# (https://www.lsst.org).
6# See the COPYRIGHT file at the top-level directory of this distribution
7# for details of code ownership.
8#
9# This software is dual licensed under the GNU General Public License and also
10# under a 3-clause BSD license. Recipients may choose which of these licenses
11# to use; please see the files gpl-3.0.txt and/or bsd_license.txt,
12# respectively. If you choose the GPL option then the following text applies
13# (but note that there is still no warranty even if you opt for BSD instead):
14#
15# This program is free software: you can redistribute it and/or modify
16# it under the terms of the GNU General Public License as published by
17# the Free Software Foundation, either version 3 of the License, or
18# (at your option) any later version.
19#
20# This program is distributed in the hope that it will be useful,
21# but WITHOUT ANY WARRANTY; without even the implied warranty of
22# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
23# GNU General Public License for more details.
24#
25# You should have received a copy of the GNU General Public License
26# along with this program. If not, see <https://www.gnu.org/licenses/>.
27"""Unit tests for classes and functions in lssthtc.py."""
29import io
30import logging
31import os
32import pathlib
33import stat
34import sys
35import tempfile
36import unittest
37from shutil import copy2, copytree, ignore_patterns, rmtree, which
39import htcondor
41from lsst.ctrl.bps import BpsConfig
42from lsst.ctrl.bps.htcondor import dagman_configurator, htcondor_config, lssthtc
43from lsst.daf.butler import Config
44from lsst.utils.tests import temporaryDirectory
# Logger of the package under test; tests use it with assertLogs to capture
# messages emitted by lssthtc functions.
logger = logging.getLogger("lsst.ctrl.bps.htcondor")

# Absolute path of this test directory; used to locate the data/ fixtures.
TESTDIR = os.path.abspath(os.path.dirname(__file__))
class TestLsstHtc(unittest.TestCase):
    """Test basic usage of small lssthtc helper functions."""

    def testHtcEscapeInt(self):
        # Non-string values pass through unchanged.
        self.assertEqual(lssthtc.htc_escape(100), 100)

    def testHtcEscapeDouble(self):
        # Double quotes are doubled for HTCondor submit-file syntax.
        self.assertEqual(lssthtc.htc_escape('"double"'), '""double""')

    def testHtcEscapeSingle(self):
        # Single quotes are doubled as well.
        self.assertEqual(lssthtc.htc_escape("'single'"), "''single''")

    def testHtcEscapeNoSideEffect(self):
        # Escaping must not mutate the original string.
        val = "'val'"
        self.assertEqual(lssthtc.htc_escape(val), "''val''")
        self.assertEqual(val, "'val'")

    def testHtcEscapeQuot(self):
        # The HTML entity &quot; is translated to a literal double quote.
        # (Fixed: the argument had been mangled to ""val"" — invalid Python —
        # by HTML rendering of the original &quot; entities.)
        self.assertEqual(lssthtc.htc_escape("&quot;val&quot;"), '"val"')

    def testHtcVersion(self):
        # Version string must look like MAJOR.MINOR.PATCH.
        ver = lssthtc.htc_version()
        self.assertRegex(ver, r"^\d+\.\d+\.\d+$")
class HtcTweakJobInfoTestCase(unittest.TestCase):
    """Test the function responsible for massaging job information."""

    def setUp(self):
        # Scratch directory standing in for the job's working directory.
        self._tmp = tempfile.TemporaryDirectory()
        self._tmp_path = pathlib.Path(self._tmp.name)
        # Minimal job ad shared by the tests below.
        self.job = {
            "Cluster": 1,
            "Proc": 0,
            "Iwd": str(self._tmp_path),
            "Owner": self._tmp_path.owner(),
            "MyType": None,
            "TerminatedNormally": True,
        }

    def tearDown(self):
        self._tmp.cleanup()

    def _tweak(self, **extra):
        """Return a tweaked copy of the base job ad updated with ``extra``."""
        ad = self.job | extra
        lssthtc.htc_tweak_log_info(self._tmp_path, ad)
        return ad

    def testDirectAssignments(self):
        lssthtc.htc_tweak_log_info(self._tmp_path, self.job)
        self.assertEqual(self.job["ClusterId"], self.job["Cluster"])
        self.assertEqual(self.job["ProcId"], self.job["Proc"])
        self.assertEqual(self.job["Iwd"], str(self._tmp_path))
        self.assertEqual(self.job["Owner"], self._tmp_path.owner())

    def testIncompatibleAdPassThru(self):
        # Passing a job ad with insufficient information should be a no-op.
        expected = {"foo": "bar"}
        result = dict(expected)
        lssthtc.htc_tweak_log_info(self._tmp_path, result)
        self.assertEqual(result, expected)

    def testJobStatusAssignmentJobAbortedEvent(self):
        ad = self._tweak(MyType="JobAbortedEvent")
        self.assertIn("JobStatus", ad)
        self.assertEqual(ad["JobStatus"], htcondor.JobStatus.REMOVED)

    def testJobStatusAssignmentExecuteEvent(self):
        ad = self._tweak(MyType="ExecuteEvent")
        self.assertIn("JobStatus", ad)
        self.assertEqual(ad["JobStatus"], htcondor.JobStatus.RUNNING)

    def testJobStatusAssignmentSubmitEvent(self):
        ad = self._tweak(MyType="SubmitEvent")
        self.assertIn("JobStatus", ad)
        self.assertEqual(ad["JobStatus"], htcondor.JobStatus.IDLE)

    def testJobStatusAssignmentJobHeldEvent(self):
        ad = self._tweak(MyType="JobHeldEvent")
        self.assertIn("JobStatus", ad)
        self.assertEqual(ad["JobStatus"], htcondor.JobStatus.HELD)

    def testJobStatusAssignmentJobTerminatedEvent(self):
        ad = self._tweak(MyType="JobTerminatedEvent")
        self.assertIn("JobStatus", ad)
        self.assertEqual(ad["JobStatus"], htcondor.JobStatus.COMPLETED)

    def testJobStatusAssignmentPostScriptTerminatedEvent(self):
        ad = self._tweak(MyType="PostScriptTerminatedEvent")
        self.assertIn("JobStatus", ad)
        self.assertEqual(ad["JobStatus"], htcondor.JobStatus.COMPLETED)

    def testJobStatusAssignmentReleaseEventMainDagJob(self):
        ad = self._tweak(MyType="JobReleaseEvent")
        self.assertIn("JobStatus", ad)
        self.assertEqual(ad["JobStatus"], htcondor.JobStatus.RUNNING)

    def testJobStatusAssignmentReleaseEventForNodeJob(self):
        # A release event for a payload node job should not set a status.
        ad = self._tweak(MyType="JobReleaseEvent", DAGNodeName="test_payload_job")
        self.assertIn("JobStatus", ad)
        self.assertIsNone(ad["JobStatus"])

    def testAddingExitStatusSuccess(self):
        ad = self._tweak(
            MyType="JobTerminatedEvent",
            ToE={"ExitBySignal": False, "ExitCode": 1},
        )
        self.assertIn("ExitBySignal", ad)
        self.assertIs(ad["ExitBySignal"], False)
        self.assertIn("ExitCode", ad)
        self.assertEqual(ad["ExitCode"], 1)

    def testAddingExitStatusFailure(self):
        # Held job with no ToE information: exit status cannot be derived.
        ad = self.job | {"MyType": "JobHeldEvent"}
        with self.assertLogs(logger=logger, level="ERROR") as cm:
            lssthtc.htc_tweak_log_info(self._tmp_path, ad)
        self.assertIn("Could not determine exit status", cm.output[0])

    def testLoggingUnknownLogEvent(self):
        ad = self.job | {"MyType": "Foo"}
        with self.assertLogs(logger=logger, level="DEBUG") as cm:
            lssthtc.htc_tweak_log_info(self._tmp_path, ad)
        self.assertIn("Unknown log event", cm.output[1])

    def testMissingKey(self):
        # Without the Cluster key the function must raise KeyError.
        del self.job["Cluster"]
        with self.assertRaises(KeyError) as cm:
            lssthtc.htc_tweak_log_info(self._tmp_path, self.job)
        self.assertEqual(str(cm.exception), "'Cluster'")
class HtcCheckDagmanOutputTestCase(unittest.TestCase):
    """Test htc_check_dagman_output function."""

    def test_missing_output_file(self):
        # No *.dagman.out file at all: the function must raise.
        with temporaryDirectory() as workdir:
            with self.assertRaises(FileNotFoundError):
                _ = lssthtc.htc_check_dagman_output(workdir)

    def test_permissions_output_file(self):
        # An unreadable output file produces a warning message, not a crash.
        with temporaryDirectory() as workdir:
            copy2(os.path.join(TESTDIR, "data", "test_tmpdir_abort.dag.dagman.out"), workdir)
            target = os.path.join(workdir, "test_tmpdir_abort.dag.dagman.out")
            os.chmod(target, 0o200)
            print(os.stat(target))
            results = lssthtc.htc_check_dagman_output(workdir)
            # Restore permissions so the temporary directory can be removed.
            os.chmod(target, 0o600)
            self.assertIn("Could not read dagman output file", results)

    def test_submit_failure(self):
        with temporaryDirectory() as workdir:
            copy2(os.path.join(TESTDIR, "data", "bad_submit.dag.dagman.out"), workdir)
            results = lssthtc.htc_check_dagman_output(workdir)
            self.assertIn("Warn: Job submission issues (last: ", results)

    def test_tmpdir_abort(self):
        with temporaryDirectory() as workdir:
            copy2(os.path.join(TESTDIR, "data", "test_tmpdir_abort.dag.dagman.out"), workdir)
            results = lssthtc.htc_check_dagman_output(workdir)
            self.assertIn("Cannot submit from /tmp", results)

    def test_no_messages(self):
        # A clean run yields an empty message string.
        with temporaryDirectory() as workdir:
            copy2(os.path.join(TESTDIR, "data", "test_no_messages.dag.dagman.out"), workdir)
            results = lssthtc.htc_check_dagman_output(workdir)
            self.assertEqual("", results)
class SummarizeDagTestCase(unittest.TestCase):
    """Test summarize_dag function.

    The function returns a triple: a ``label:count`` summary string, a
    mapping of DAG node name to job label, and a mapping of DAG node name
    to its WmsNodeType.
    """

    def test_no_dag_file(self):
        # A directory without a DAG file yields empty results, not an error.
        with temporaryDirectory() as tmp_dir:
            summary, job_name_to_pipetask, job_name_to_type = lssthtc.summarize_dag(tmp_dir)
            self.assertFalse(len(job_name_to_pipetask))
            self.assertFalse(len(job_name_to_type))
            self.assertFalse(summary)

    def test_success(self):
        # Ordinary DAG: payload nodes plus a FINAL node (finalJob).
        with temporaryDirectory() as tmp_dir:
            copy2(f"{TESTDIR}/data/good.dag", tmp_dir)
            summary, job_name_to_label, job_name_to_type = lssthtc.summarize_dag(tmp_dir)
            self.assertEqual(summary, "pipetaskInit:1;label1:1;label2:1;label3:1;finalJob:1")
            self.assertEqual(
                job_name_to_label,
                {
                    "pipetaskInit": "pipetaskInit",
                    "0682f8f9-12f0-40a5-971e-8b30c7231e5c_label1_val1_val2": "label1",
                    "d0305e2d-f164-4a85-bd24-06afe6c84ed9_label2_val1_val2": "label2",
                    "2806ecc9-1bba-4362-8fff-ab4e6abb9f83_label3_val1_val2": "label3",
                    "finalJob": "finalJob",
                },
            )
            self.assertEqual(
                job_name_to_type,
                {
                    "pipetaskInit": lssthtc.WmsNodeType.PAYLOAD,
                    "0682f8f9-12f0-40a5-971e-8b30c7231e5c_label1_val1_val2": lssthtc.WmsNodeType.PAYLOAD,
                    "d0305e2d-f164-4a85-bd24-06afe6c84ed9_label2_val1_val2": lssthtc.WmsNodeType.PAYLOAD,
                    "2806ecc9-1bba-4362-8fff-ab4e6abb9f83_label3_val1_val2": lssthtc.WmsNodeType.PAYLOAD,
                    "finalJob": lssthtc.WmsNodeType.FINAL,
                },
            )

    def test_service(self):
        # DAG that also contains a SERVICE node (provisioningJob); service
        # jobs appear in the mappings but not in the label summary.
        with temporaryDirectory() as tmp_dir:
            copy2(f"{TESTDIR}/data/tiny_problems/tiny_problems.dag", tmp_dir)
            summary, job_name_to_label, job_name_to_type = lssthtc.summarize_dag(tmp_dir)
            self.assertEqual(summary, "pipetaskInit:1;label1:2;label2:2;finalJob:1")
            self.assertEqual(
                job_name_to_label,
                {
                    "pipetaskInit": "pipetaskInit",
                    "057c8caf-66f6-4612-abf7-cdea5b666b1b_label1_val1a_val2b": "label1",
                    "4a7f478b-2e9b-435c-a730-afac3f621658_label1_val1a_val2a": "label1",
                    "40040b97-606d-4997-98d3-e0493055fe7e_label2_val1a_val2b": "label2",
                    "696ee50d-e711-40d6-9caf-ee29ae4a656d_label2_val1a_val2a": "label2",
                    "finalJob": "finalJob",
                    "provisioningJob": "provisioningJob",
                },
            )
            self.assertEqual(
                job_name_to_type,
                {
                    "pipetaskInit": lssthtc.WmsNodeType.PAYLOAD,
                    "057c8caf-66f6-4612-abf7-cdea5b666b1b_label1_val1a_val2b": lssthtc.WmsNodeType.PAYLOAD,
                    "4a7f478b-2e9b-435c-a730-afac3f621658_label1_val1a_val2a": lssthtc.WmsNodeType.PAYLOAD,
                    "40040b97-606d-4997-98d3-e0493055fe7e_label2_val1a_val2b": lssthtc.WmsNodeType.PAYLOAD,
                    "696ee50d-e711-40d6-9caf-ee29ae4a656d_label2_val1a_val2a": lssthtc.WmsNodeType.PAYLOAD,
                    "finalJob": lssthtc.WmsNodeType.FINAL,
                    "provisioningJob": lssthtc.WmsNodeType.SERVICE,
                },
            )

    def test_noop(self):
        # DAG with NOOP ordering nodes (wms_noop_*). Summary label order is
        # not guaranteed here, so compare it as a set of label:count pieces.
        with temporaryDirectory() as tmp_dir:
            copy2(f"{TESTDIR}/data/noop_running_1/noop_running_1.dag", tmp_dir)
            summary, job_name_to_label, job_name_to_type = lssthtc.summarize_dag(tmp_dir)
            self.assertEqual(
                set(summary.split(";")),
                {"pipetaskInit:1", "label1:6", "label2:6", "label3:6", "label4:6", "label5:6", "finalJob:1"},
            )
            self.assertEqual(
                job_name_to_label,
                {
                    "label1_val1a_val2a": "label1",
                    "label1_val1a_val2b": "label1",
                    "label1_val1b_val2a": "label1",
                    "label1_val1b_val2b": "label1",
                    "label1_val1c_val2a": "label1",
                    "label1_val1c_val2b": "label1",
                    "label2_val1a_val2a": "label2",
                    "label2_val1a_val2b": "label2",
                    "label2_val1b_val2a": "label2",
                    "label2_val1b_val2b": "label2",
                    "label2_val1c_val2a": "label2",
                    "label2_val1c_val2b": "label2",
                    "label3_val1a_val2a": "label3",
                    "label3_val1a_val2b": "label3",
                    "label3_val1b_val2a": "label3",
                    "label3_val1b_val2b": "label3",
                    "label3_val1c_val2a": "label3",
                    "label3_val1c_val2b": "label3",
                    "label4_val1a_val2a": "label4",
                    "label4_val1a_val2b": "label4",
                    "label4_val1b_val2a": "label4",
                    "label4_val1b_val2b": "label4",
                    "label4_val1c_val2a": "label4",
                    "label4_val1c_val2b": "label4",
                    "label5_val1a_val2a": "label5",
                    "label5_val1a_val2b": "label5",
                    "label5_val1b_val2a": "label5",
                    "label5_val1b_val2b": "label5",
                    "label5_val1c_val2a": "label5",
                    "label5_val1c_val2b": "label5",
                    "finalJob": "finalJob",
                    "pipetaskInit": "pipetaskInit",
                    "wms_noop_order1_val1a": "order1",
                    "wms_noop_order1_val1b": "order1",
                },
            )
            self.assertEqual(
                job_name_to_type,
                {
                    "label1_val1a_val2a": lssthtc.WmsNodeType.PAYLOAD,
                    "label1_val1a_val2b": lssthtc.WmsNodeType.PAYLOAD,
                    "label1_val1b_val2a": lssthtc.WmsNodeType.PAYLOAD,
                    "label1_val1b_val2b": lssthtc.WmsNodeType.PAYLOAD,
                    "label1_val1c_val2a": lssthtc.WmsNodeType.PAYLOAD,
                    "label1_val1c_val2b": lssthtc.WmsNodeType.PAYLOAD,
                    "label2_val1a_val2a": lssthtc.WmsNodeType.PAYLOAD,
                    "label2_val1a_val2b": lssthtc.WmsNodeType.PAYLOAD,
                    "label2_val1b_val2a": lssthtc.WmsNodeType.PAYLOAD,
                    "label2_val1b_val2b": lssthtc.WmsNodeType.PAYLOAD,
                    "label2_val1c_val2a": lssthtc.WmsNodeType.PAYLOAD,
                    "label2_val1c_val2b": lssthtc.WmsNodeType.PAYLOAD,
                    "label3_val1a_val2a": lssthtc.WmsNodeType.PAYLOAD,
                    "label3_val1a_val2b": lssthtc.WmsNodeType.PAYLOAD,
                    "label3_val1b_val2a": lssthtc.WmsNodeType.PAYLOAD,
                    "label3_val1b_val2b": lssthtc.WmsNodeType.PAYLOAD,
                    "label3_val1c_val2a": lssthtc.WmsNodeType.PAYLOAD,
                    "label3_val1c_val2b": lssthtc.WmsNodeType.PAYLOAD,
                    "label4_val1a_val2a": lssthtc.WmsNodeType.PAYLOAD,
                    "label4_val1a_val2b": lssthtc.WmsNodeType.PAYLOAD,
                    "label4_val1b_val2a": lssthtc.WmsNodeType.PAYLOAD,
                    "label4_val1b_val2b": lssthtc.WmsNodeType.PAYLOAD,
                    "label4_val1c_val2a": lssthtc.WmsNodeType.PAYLOAD,
                    "label4_val1c_val2b": lssthtc.WmsNodeType.PAYLOAD,
                    "label5_val1a_val2a": lssthtc.WmsNodeType.PAYLOAD,
                    "label5_val1a_val2b": lssthtc.WmsNodeType.PAYLOAD,
                    "label5_val1b_val2a": lssthtc.WmsNodeType.PAYLOAD,
                    "label5_val1b_val2b": lssthtc.WmsNodeType.PAYLOAD,
                    "label5_val1c_val2a": lssthtc.WmsNodeType.PAYLOAD,
                    "label5_val1c_val2b": lssthtc.WmsNodeType.PAYLOAD,
                    "finalJob": lssthtc.WmsNodeType.FINAL,
                    "pipetaskInit": lssthtc.WmsNodeType.PAYLOAD,
                    "wms_noop_order1_val1a": lssthtc.WmsNodeType.NOOP,
                    "wms_noop_order1_val1b": lssthtc.WmsNodeType.NOOP,
                },
            )

    def test_subdags(self):
        # DAG using subdag group nodes (wms_group_*) and their companion
        # status-check nodes (wms_check_status_*); whole fixture tree is
        # needed, hence copytree instead of copy2.
        with temporaryDirectory() as tmp_dir:
            submit_dir = os.path.join(tmp_dir, "group_running_1")
            copytree(f"{TESTDIR}/data/group_running_1", submit_dir, ignore=ignore_patterns("*~", ".???*"))
            summary, job_name_to_label, job_name_to_type = lssthtc.summarize_dag(submit_dir)
            self.assertEqual(
                set(summary.split(";")),
                {"pipetaskInit:1", "label1:6", "label2:6", "label3:6", "label4:6", "label5:6", "finalJob:1"},
            )

            self.assertEqual(
                job_name_to_label,
                {
                    "pipetaskInit": "pipetaskInit",
                    "label1_val1b_val2a": "label1",
                    "label1_val1c_val2a": "label1",
                    "label1_val1a_val2b": "label1",
                    "label1_val1b_val2b": "label1",
                    "label1_val1c_val2b": "label1",
                    "label1_val1a_val2a": "label1",
                    "label2_val1a_val2b": "label2",
                    "label2_val1a_val2a": "label2",
                    "label2_val1b_val2a": "label2",
                    "label2_val1b_val2b": "label2",
                    "label2_val1c_val2a": "label2",
                    "label2_val1c_val2b": "label2",
                    "label3_val1b_val2a": "label3",
                    "label3_val1c_val2a": "label3",
                    "label3_val1a_val2b": "label3",
                    "label3_val1b_val2b": "label3",
                    "label3_val1c_val2b": "label3",
                    "label3_val1a_val2a": "label3",
                    "label4_val1a_val2b": "label4",
                    "label4_val1a_val2a": "label4",
                    "label4_val1b_val2a": "label4",
                    "label4_val1b_val2b": "label4",
                    "label4_val1c_val2a": "label4",
                    "label4_val1c_val2b": "label4",
                    "label5_val1a_val2b": "label5",
                    "label5_val1a_val2a": "label5",
                    "label5_val1b_val2a": "label5",
                    "label5_val1b_val2b": "label5",
                    "label5_val1c_val2a": "label5",
                    "label5_val1c_val2b": "label5",
                    "finalJob": "finalJob",
                    "provisioningJob": "provisioningJob",
                    "wms_group_order1_val1a": "order1",
                    "wms_group_order1_val1b": "order1",
                    "wms_group_order1_val1c": "order1",
                    "wms_check_status_wms_group_order1_val1a": "order1",
                    "wms_check_status_wms_group_order1_val1b": "order1",
                    "wms_check_status_wms_group_order1_val1c": "order1",
                },
            )

            self.assertEqual(
                job_name_to_type,
                {
                    "pipetaskInit": lssthtc.WmsNodeType.PAYLOAD,
                    "label1_val1b_val2a": lssthtc.WmsNodeType.PAYLOAD,
                    "label1_val1c_val2a": lssthtc.WmsNodeType.PAYLOAD,
                    "label1_val1a_val2b": lssthtc.WmsNodeType.PAYLOAD,
                    "label1_val1b_val2b": lssthtc.WmsNodeType.PAYLOAD,
                    "label1_val1c_val2b": lssthtc.WmsNodeType.PAYLOAD,
                    "label1_val1a_val2a": lssthtc.WmsNodeType.PAYLOAD,
                    "label2_val1a_val2b": lssthtc.WmsNodeType.PAYLOAD,
                    "label2_val1a_val2a": lssthtc.WmsNodeType.PAYLOAD,
                    "label2_val1b_val2a": lssthtc.WmsNodeType.PAYLOAD,
                    "label2_val1b_val2b": lssthtc.WmsNodeType.PAYLOAD,
                    "label2_val1c_val2a": lssthtc.WmsNodeType.PAYLOAD,
                    "label2_val1c_val2b": lssthtc.WmsNodeType.PAYLOAD,
                    "label3_val1b_val2a": lssthtc.WmsNodeType.PAYLOAD,
                    "label3_val1c_val2a": lssthtc.WmsNodeType.PAYLOAD,
                    "label3_val1a_val2b": lssthtc.WmsNodeType.PAYLOAD,
                    "label3_val1b_val2b": lssthtc.WmsNodeType.PAYLOAD,
                    "label3_val1c_val2b": lssthtc.WmsNodeType.PAYLOAD,
                    "label3_val1a_val2a": lssthtc.WmsNodeType.PAYLOAD,
                    "label4_val1a_val2b": lssthtc.WmsNodeType.PAYLOAD,
                    "label4_val1a_val2a": lssthtc.WmsNodeType.PAYLOAD,
                    "label4_val1b_val2a": lssthtc.WmsNodeType.PAYLOAD,
                    "label4_val1b_val2b": lssthtc.WmsNodeType.PAYLOAD,
                    "label4_val1c_val2a": lssthtc.WmsNodeType.PAYLOAD,
                    "label4_val1c_val2b": lssthtc.WmsNodeType.PAYLOAD,
                    "label5_val1a_val2b": lssthtc.WmsNodeType.PAYLOAD,
                    "label5_val1a_val2a": lssthtc.WmsNodeType.PAYLOAD,
                    "label5_val1b_val2a": lssthtc.WmsNodeType.PAYLOAD,
                    "label5_val1b_val2b": lssthtc.WmsNodeType.PAYLOAD,
                    "label5_val1c_val2a": lssthtc.WmsNodeType.PAYLOAD,
                    "label5_val1c_val2b": lssthtc.WmsNodeType.PAYLOAD,
                    "finalJob": lssthtc.WmsNodeType.FINAL,
                    "provisioningJob": lssthtc.WmsNodeType.SERVICE,
                    "wms_group_order1_val1a": lssthtc.WmsNodeType.SUBDAG,
                    "wms_group_order1_val1b": lssthtc.WmsNodeType.SUBDAG,
                    "wms_group_order1_val1c": lssthtc.WmsNodeType.SUBDAG,
                    "wms_check_status_wms_group_order1_val1a": lssthtc.WmsNodeType.SUBDAG_CHECK,
                    "wms_check_status_wms_group_order1_val1b": lssthtc.WmsNodeType.SUBDAG_CHECK,
                    "wms_check_status_wms_group_order1_val1c": lssthtc.WmsNodeType.SUBDAG_CHECK,
                },
            )
class ReadDagNodesLogTestCase(unittest.TestCase):
    """Test read_dag_nodes_log function."""

    def setUp(self):
        self.tmpdir = tempfile.mkdtemp()

    def tearDown(self):
        rmtree(self.tmpdir, ignore_errors=True)

    def testFileMissing(self):
        # No nodes log anywhere under the directory: must raise.
        with self.assertRaisesRegex(FileNotFoundError, "DAGMan node log not found in"):
            _ = lssthtc.read_dag_nodes_log(self.tmpdir)

    def testRegular(self):
        with temporaryDirectory() as scratch:
            workdir = os.path.join(scratch, "tiny_problems")
            copytree(f"{TESTDIR}/data/tiny_problems", workdir, ignore=ignore_patterns("*~", ".???*"))
            info = lssthtc.read_dag_nodes_log(workdir)
            self.assertEqual(len(info), 6)
            entry = info["9231.0"]
            self.assertEqual(entry["Cluster"], 9231)
            self.assertEqual(entry["Proc"], 0)
            self.assertEqual(entry["ToE"]["ExitCode"], 1)

    def testSubdags(self):
        """Making sure it gets data from subdag dirs and doesn't
        fail if some subdags haven't started running yet.
        """
        with temporaryDirectory() as scratch:
            workdir = os.path.join(scratch, "group_running_1")
            copytree(f"{TESTDIR}/data/group_running_1", workdir, ignore=ignore_patterns("*~", ".???*"))
            info = lssthtc.read_dag_nodes_log(workdir)
            # main dag
            self.assertEqual(info["10094.0"]["Cluster"], 10094)
            # subdag
            self.assertEqual(info["10112.0"]["Cluster"], 10112)
            self.assertEqual(info["10116.0"]["Cluster"], 10116)
class ReadNodeStatusTestCase(unittest.TestCase):
    """Test read_node_status function."""

    def setUp(self):
        self.tmpdir = tempfile.mkdtemp()

    def tearDown(self):
        rmtree(self.tmpdir, ignore_errors=True)

    @staticmethod
    def _index_jobs(jobs):
        """Index job info by DAG node name and by WMS node type.

        Job ids are not guaranteed to be the same between runs, so the
        tests look jobs up by node name instead.  (This replaces two
        identical pairs of indexing loops that were duplicated in the
        subdag tests; it also builds both indexes in a single pass.)

        Parameters
        ----------
        jobs : `dict`
            Mapping of job id to job information as returned by
            ``lssthtc.read_node_status``.

        Returns
        -------
        job_name_to_id : `dict`
            Mapping of DAG node name (job id if name missing) to job id.
        job_type_to_names : `dict`
            Mapping of ``lssthtc.WmsNodeType`` to the set of node names
            of that type.
        """
        job_name_to_id = {}
        job_type_to_names = {}
        for id_, info in jobs.items():
            name = info.get("DAGNodeName", id_)
            job_name_to_id[name] = id_
            job_type_to_names.setdefault(
                info.get("wms_node_type", lssthtc.WmsNodeType.UNKNOWN), set()
            ).add(name)
        return job_name_to_id, job_type_to_names

    def testServiceJobNotSubmitted(self):
        # tiny_prov_no_submit files have successful workflow
        # but provisioningJob could not submit.
        copy2(f"{TESTDIR}/data/tiny_prov_no_submit/tiny_prov_no_submit.dag.nodes.log", self.tmpdir)
        copy2(f"{TESTDIR}/data/tiny_prov_no_submit/tiny_prov_no_submit.dag.dagman.log", self.tmpdir)
        copy2(f"{TESTDIR}/data/tiny_prov_no_submit/tiny_prov_no_submit.node_status", self.tmpdir)
        copy2(f"{TESTDIR}/data/tiny_prov_no_submit/tiny_prov_no_submit.dag", self.tmpdir)

        jobs = lssthtc.read_node_status(self.tmpdir)
        found = [
            id_
            for id_ in jobs
            if jobs[id_].get("wms_node_type", lssthtc.WmsNodeType.UNKNOWN) == lssthtc.WmsNodeType.SERVICE
        ]
        self.assertEqual(len(found), 1)
        self.assertEqual(jobs[found[0]]["DAGNodeName"], "provisioningJob")
        self.assertEqual(jobs[found[0]]["NodeStatus"], lssthtc.NodeStatus.NOT_READY)

    def testMissingStatusFile(self):
        # Function must cope with the *.node_status file being absent.
        copy2(f"{TESTDIR}/data/tiny_problems/tiny_problems.dag.nodes.log", self.tmpdir)
        copy2(f"{TESTDIR}/data/tiny_problems/tiny_problems.dag.dagman.log", self.tmpdir)
        copy2(f"{TESTDIR}/data/tiny_problems/tiny_problems.dag", self.tmpdir)

        jobs = lssthtc.read_node_status(self.tmpdir)
        self.assertEqual(len(jobs), 7)
        self.assertEqual(jobs["9230.0"]["DAGNodeName"], "pipetaskInit")
        self.assertEqual(jobs["9230.0"]["wms_node_type"], lssthtc.WmsNodeType.PAYLOAD)
        found = [
            id_
            for id_ in jobs
            if jobs[id_].get("wms_node_type", lssthtc.WmsNodeType.UNKNOWN) == lssthtc.WmsNodeType.SERVICE
        ]
        self.assertEqual(len(found), 1)
        self.assertEqual(jobs[found[0]]["DAGNodeName"], "provisioningJob")

    def testSubdagsRunning(self):
        with temporaryDirectory() as tmp_dir:
            test_tmp_dir = pathlib.Path(tmp_dir)
            submit_dir = test_tmp_dir / "submit"
            copytree(f"{TESTDIR}/data/group_running_1", submit_dir, ignore=ignore_patterns("*~", ".???*"))
            jobs = lssthtc.read_node_status(submit_dir)
            self.assertEqual(len(jobs), 39)  # includes non-payload jobs
            job_name_to_id, job_type_to_names = self._index_jobs(jobs)

            # check counts
            self.assertNotIn(lssthtc.WmsNodeType.NOOP, job_type_to_names)
            self.assertEqual(len(job_type_to_names[lssthtc.WmsNodeType.PAYLOAD]), 31)
            self.assertEqual(len(job_type_to_names[lssthtc.WmsNodeType.FINAL]), 1)
            self.assertEqual(len(job_type_to_names[lssthtc.WmsNodeType.SERVICE]), 1)
            self.assertEqual(len(job_type_to_names[lssthtc.WmsNodeType.SUBDAG]), 3)
            self.assertEqual(len(job_type_to_names[lssthtc.WmsNodeType.SUBDAG_CHECK]), 3)

            # spot check some statuses
            self.assertEqual(
                jobs[job_name_to_id["label3_val1a_val2b"]]["NodeStatus"], lssthtc.NodeStatus.DONE
            )
            self.assertEqual(
                jobs[job_name_to_id["wms_group_order1_val1a"]]["NodeStatus"], lssthtc.NodeStatus.SUBMITTED
            )
            self.assertEqual(
                jobs[job_name_to_id["label5_val1a_val2a"]]["NodeStatus"], lssthtc.NodeStatus.NOT_READY
            )
            self.assertEqual(
                jobs[job_name_to_id["label2_val1a_val2a"]]["NodeStatus"], lssthtc.NodeStatus.DONE
            )

    def testSubdagsFailed(self):
        with temporaryDirectory() as tmp_dir:
            test_tmp_dir = pathlib.Path(tmp_dir)
            submit_dir = test_tmp_dir / "submit"
            copytree(f"{TESTDIR}/data/group_failed_1", submit_dir, ignore=ignore_patterns("*~", ".???*"))
            jobs = lssthtc.read_node_status(submit_dir)
            self.assertEqual(len(jobs), 39)
            job_name_to_id, job_type_to_names = self._index_jobs(jobs)

            # check counts
            self.assertNotIn(lssthtc.WmsNodeType.NOOP, job_type_to_names)
            self.assertEqual(len(job_type_to_names[lssthtc.WmsNodeType.PAYLOAD]), 31)
            self.assertEqual(len(job_type_to_names[lssthtc.WmsNodeType.FINAL]), 1)
            self.assertEqual(len(job_type_to_names[lssthtc.WmsNodeType.SERVICE]), 1)
            self.assertEqual(len(job_type_to_names[lssthtc.WmsNodeType.SUBDAG]), 3)
            self.assertEqual(len(job_type_to_names[lssthtc.WmsNodeType.SUBDAG_CHECK]), 3)

            # spot check some statuses
            self.assertEqual(
                jobs[job_name_to_id["label3_val1a_val2b"]]["NodeStatus"], lssthtc.NodeStatus.DONE
            )
            self.assertEqual(
                jobs[job_name_to_id["wms_group_order1_val1a"]]["NodeStatus"], lssthtc.NodeStatus.DONE
            )
            self.assertEqual(
                jobs[job_name_to_id["label5_val1a_val2a"]]["NodeStatus"], lssthtc.NodeStatus.DONE
            )

            self.assertEqual(
                jobs[job_name_to_id["label5_val1b_val2a"]]["NodeStatus"], lssthtc.NodeStatus.FUTILE
            )
            self.assertEqual(
                jobs[job_name_to_id["wms_group_order1_val1b"]]["NodeStatus"], lssthtc.NodeStatus.DONE
            )
            self.assertEqual(
                jobs[job_name_to_id["wms_check_status_wms_group_order1_val1b"]]["NodeStatus"],
                lssthtc.NodeStatus.ERROR,
            )
class HTCJobTestCase(unittest.TestCase):
    """Test HTCJob methods."""

    @staticmethod
    def _sleep_job(name, label):
        """Make a trivial sleep job shared by several tests."""
        return lssthtc.HTCJob(
            name,
            label,
            {"executable": "/bin/sleep", "arguments": "60", "log": "job1.log"},
        )

    def testWriteDagCommandsPayload(self):
        job = lssthtc.HTCJob(
            "job1",
            "label1",
            {"executable": "/bin/sleep", "arguments": "60", "log": "job1.log"},
            {"dir": "jobs/label1"},
        )
        job.subfile = "job1.sub"

        stream = io.StringIO()
        job.write_dag_commands(stream, "../..")
        self.assertIn('JOB job1 "job1.sub" DIR "../../jobs/label1"', stream.getvalue())

    def testWriteDagCommandsNotJob(self):
        # Testing giving command_name, no dag_rel_path and no dir
        job = self._sleep_job("finalJob", "finalJob")
        job.subfile = "jobs/finalJob/finalJob.sub"
        stream = io.StringIO()
        job.write_dag_commands(stream, "", "FINAL")
        self.assertIn('FINAL finalJob "jobs/finalJob/finalJob.sub"', stream.getvalue())

    def testWriteDagCommandsNoop(self):
        job = lssthtc.HTCJob("wms_noop_job1", "label1", {}, {"noop": True})
        job.subfile = "notthere.sub"
        stream = io.StringIO()
        job.write_dag_commands(stream, "")
        self.assertIn("NOOP", stream.getvalue())

    def testWriteSubmitFile(self):
        job = self._sleep_job("job1", "label1")
        with temporaryDirectory() as tmp_dir:
            subfile = pathlib.Path(tmp_dir) / "label1/job1.sub"
            job.write_submit_file(subfile.parent)
            self.assertTrue(subfile.exists())
            # Try to make Submit object from file to find any syntax issues
            _ = lssthtc.htc_create_submit_from_file(subfile)

    def testWriteSubmitFileExists(self):
        job = self._sleep_job("job1", "label1")
        with temporaryDirectory() as tmp_dir:
            subfile = pathlib.Path(tmp_dir) / "job1.sub"
            job.subfile = subfile
            with open(subfile, "w"):
                pass  # make empty file
            job.write_submit_file(subfile.parent)
            # make sure didn't overwrite file
            self.assertEqual(subfile.stat().st_size, 0, "Incorrectly overwrote existing file")
class HtcWriteJobCommands(unittest.TestCase):
    """Test _htc_write_job_commands function.

    The expected (``truth``) strings below are exact DAG-file text; their
    continuation lines intentionally start at column 0 inside the
    triple-quoted literals.
    """

    def testAllCommands(self):
        # Every supported command present: SCRIPT PRE/POST with DEFER and
        # DEBUG options, VARS, PRE_SKIP, RETRY with UNLESS-EXIT,
        # ABORT-DAG-ON, and PRIORITY.
        dag_cmds = {
            "pre": {
                "defer": {"status": 1, "time": 120},
                "debug": {"filename": "debug_pre.txt", "type": "ALL"},
                "executable": "exec1",
                "arguments": "arg1 arg2",
            },
            "post": {
                "defer": {"status": 2, "time": 180},
                "debug": {"filename": "debug_post.txt", "type": "ALL"},
                "executable": "exec2",
                "arguments": "arg3 arg4",
            },
            "vars": {"num": 8, "spaces": "a space"},
            "pre_skip": "1",
            "retry": 3,
            "retry_unless_exit": 1,
            "abort_dag_on": {"node_exit": 100, "abort_exit": 4},
            "priority": 123,
        }

        truth = """SCRIPT DEFER 1 120 DEBUG debug_pre.txt ALL PRE job1 exec1 arg1 arg2
SCRIPT DEFER 2 180 DEBUG debug_post.txt ALL POST job1 exec2 arg3 arg4
VARS job1 num="8"
VARS job1 spaces="a space"
PRE_SKIP job1 1
RETRY job1 3 UNLESS-EXIT 1
ABORT-DAG-ON job1 100 RETURN 4
PRIORITY job1 123
"""
        mockfh = io.StringIO()
        lssthtc._htc_write_job_commands(mockfh, "job1", dag_cmds)
        self.assertEqual(mockfh.getvalue(), truth)

    def testPartialCommands(self):
        # Trigger skipping the inner if clauses.
        dag_cmds = {
            "pre": {
                "executable": "exec1",
            },
            "post": {
                "executable": "exec2",
            },
            "vars": {"num": 8, "spaces": "a space"},
            "pre_skip": "1",
            "retry": 3,
        }

        truth = """SCRIPT PRE job1 exec1
SCRIPT POST job1 exec2
VARS job1 num="8"
VARS job1 spaces="a space"
PRE_SKIP job1 1
RETRY job1 3
"""
        mockfh = io.StringIO()
        lssthtc._htc_write_job_commands(mockfh, "job1", dag_cmds)
        self.assertEqual(mockfh.getvalue(), truth)

    def testNoCommands(self):
        # No commands at all: nothing should be written.
        dag_cmds = {}
        mockfh = io.StringIO()
        lssthtc._htc_write_job_commands(mockfh, "job2", dag_cmds)
        self.assertEqual(mockfh.getvalue(), "")

    def testFinal(self):
        # Same full command set as testAllCommands, but written for a FINAL
        # node: the expected output omits RETRY, ABORT-DAG-ON and PRIORITY.
        self.maxDiff = None
        dag_cmds = {
            "pre": {
                "defer": {"status": 1, "time": 120},
                "debug": {"filename": "debug_pre.txt", "type": "ALL"},
                "executable": "exec1",
                "arguments": "arg1 arg2",
            },
            "post": {
                "defer": {"status": 2, "time": 180},
                "debug": {"filename": "debug_post.txt", "type": "ALL"},
                "executable": "exec2",
                "arguments": "arg3 arg4",
            },
            "vars": {"num": 8, "spaces": "a space"},
            "pre_skip": "1",
            "retry": 3,
            "retry_unless_exit": 1,
            "abort_dag_on": {"node_exit": 100, "abort_exit": 4},
            "priority": 123,
        }

        truth = """SCRIPT DEFER 1 120 DEBUG debug_pre.txt ALL PRE finalJob exec1 arg1 arg2
SCRIPT DEFER 2 180 DEBUG debug_post.txt ALL POST finalJob exec2 arg3 arg4
VARS finalJob num="8"
VARS finalJob spaces="a space"
PRE_SKIP finalJob 1
"""
        mockfh = io.StringIO()
        lssthtc._htc_write_job_commands(mockfh, "finalJob", dag_cmds, "FINAL")
        self.assertEqual(mockfh.getvalue(), truth)
class HTCBackupFilesSinglePathTestCase(unittest.TestCase):
    """Test htc_backup_files_single_path function."""

    @staticmethod
    def _relative_files(top):
        """Collect every file under *top* as a path relative to it."""
        collected = []
        for root, _, files in os.walk(top):
            collected.extend(str(os.path.join(os.path.relpath(root, top), f)) for f in files)
        return collected

    def testSrcDestSame(self):
        with temporaryDirectory() as tmp_dir:
            with self.assertRaisesRegex(
                RuntimeError, "Destination directory is same as the source directory"
            ):
                lssthtc.htc_backup_files_single_path(tmp_dir, tmp_dir)

    def testSuccess(self):
        with temporaryDirectory() as tmp_dir:
            scratch = pathlib.Path(tmp_dir)
            submit_dir = scratch / "the_src_dir"
            copytree(f"{TESTDIR}/data/tiny_success", submit_dir, ignore=ignore_patterns("*~", ".???*"))
            backup_dir = scratch / "the_dest_dir"
            backup_dir.mkdir()
            lssthtc.htc_backup_files_single_path(submit_dir, backup_dir)
            # Files that must stay in the submit directory.
            self.assertEqual(
                set(self._relative_files(submit_dir)),
                {
                    "./tiny_success.dag.dagman.log",
                    "./tiny_success.dag.dagman.out",
                    "./tiny_success.dag",
                },
            )
            # Files that must have been moved to the backup directory.
            self.assertEqual(
                set(self._relative_files(backup_dir)),
                {
                    "./tiny_success.info.json",
                    "./tiny_success.dag.metrics",
                    "./tiny_success.dag.nodes.log",
                    "./tiny_success.node_status",
                },
            )
class HTCBackupFilesTestCase(unittest.TestCase):
    """Test htc_backup_files function."""

    @staticmethod
    def _relative_files(top):
        """Return the set of file paths under ``top`` relative to ``top``.

        Parameters
        ----------
        top : `str` or `pathlib.Path`
            Directory to walk.

        Returns
        -------
        files : `set` [`str`]
            Paths of all files found, relative to ``top``.
        """
        return {
            str(os.path.join(os.path.relpath(root, top), name))
            for root, _, names in os.walk(top)
            for name in names
        }

    def testDirectoryNotFound(self):
        """A missing submit directory must raise FileNotFoundError."""
        with temporaryDirectory() as tmp_dir:
            test_tmp_dir = pathlib.Path(tmp_dir)
            submit_dir = test_tmp_dir / "submit"
            with self.assertRaises(FileNotFoundError):
                _ = lssthtc.htc_backup_files(submit_dir)

    def testSuccess(self):
        """Backup of a successful run: no rescue file, files moved to 000."""
        with temporaryDirectory() as tmp_dir:
            test_tmp_dir = pathlib.Path(tmp_dir)
            submit_dir = test_tmp_dir / "submit"
            copytree(f"{TESTDIR}/data/tiny_success", submit_dir, ignore=ignore_patterns("*~", ".???*"))
            result_rescue = lssthtc.htc_backup_files(submit_dir)
            # A successful run produces no rescue DAG.
            self.assertIsNone(result_rescue)
            self.assertEqual(
                self._relative_files(submit_dir),
                {
                    "./tiny_success.dag.dagman.log",
                    "./tiny_success.dag.dagman.out",
                    "./tiny_success.dag",
                    "000/tiny_success.info.json",
                    "000/tiny_success.dag.metrics",
                    "000/tiny_success.dag.nodes.log",
                    "000/tiny_success.node_status",
                },
            )

    def testDestNotInSubmitDir(self):
        """A backup location outside the submit directory is rejected with
        a warning; a subsequent default backup must still work.
        """
        with temporaryDirectory() as tmp_dir:
            test_tmp_dir = pathlib.Path(tmp_dir)
            submit_dir = test_tmp_dir / "submit"
            copytree(f"{TESTDIR}/data/tiny_problems", submit_dir, ignore=ignore_patterns("*~", ".???*"))
            with self.assertLogs("lsst.ctrl.bps.htcondor", level="WARNING") as cm:
                result_rescue = lssthtc.htc_backup_files(submit_dir, test_tmp_dir / "backup")
            self.assertIn("Invalid backup location:", cm.output[-1])
            result_rescue = lssthtc.htc_backup_files(submit_dir)
            self.assertTrue((submit_dir / "tiny_problems.dag.rescue001").samefile(result_rescue))
            self.assertEqual(
                self._relative_files(submit_dir),
                {
                    "./tiny_problems.dag.dagman.log",
                    "./tiny_problems.dag.dagman.out",
                    "./tiny_problems.dag",
                    "./tiny_problems.dag.rescue001",
                    "001/tiny_problems.info.json",
                    "001/tiny_problems.dag.metrics",
                    "001/tiny_problems.dag.nodes.log",
                    "001/tiny_problems.node_status",
                },
            )

    def testDestInSubmitDir(self):
        """Backup destination given as an absolute path inside submit dir."""
        with temporaryDirectory() as tmp_dir:
            test_tmp_dir = pathlib.Path(tmp_dir)
            submit_dir = test_tmp_dir / "submit"
            backup_dir = submit_dir / "subdir"
            copytree(f"{TESTDIR}/data/tiny_problems", submit_dir, ignore=ignore_patterns("*~", ".???*"))
            result_rescue = lssthtc.htc_backup_files(submit_dir, backup_dir)
            self.assertTrue((submit_dir / "tiny_problems.dag.rescue001").samefile(result_rescue))
            self.assertEqual(
                self._relative_files(submit_dir),
                {
                    "./tiny_problems.dag.dagman.log",
                    "./tiny_problems.dag.dagman.out",
                    "./tiny_problems.dag",
                    "./tiny_problems.dag.rescue001",
                    "subdir/001/tiny_problems.info.json",
                    "subdir/001/tiny_problems.dag.metrics",
                    "subdir/001/tiny_problems.dag.nodes.log",
                    "subdir/001/tiny_problems.node_status",
                },
            )

    def testRelativeSubdir(self):
        """Backup destination given as a path relative to the submit dir."""
        with temporaryDirectory() as tmp_dir:
            test_tmp_dir = pathlib.Path(tmp_dir)
            submit_dir = test_tmp_dir / "submit"
            copytree(f"{TESTDIR}/data/tiny_problems", submit_dir, ignore=ignore_patterns("*~", ".???*"))
            result_rescue = lssthtc.htc_backup_files(submit_dir, "reldir")
            self.assertTrue((submit_dir / "tiny_problems.dag.rescue001").samefile(result_rescue))
            self.assertEqual(
                self._relative_files(submit_dir),
                {
                    "./tiny_problems.dag.dagman.log",
                    "./tiny_problems.dag.dagman.out",
                    "./tiny_problems.dag",
                    "./tiny_problems.dag.rescue001",
                    "reldir/001/tiny_problems.info.json",
                    "reldir/001/tiny_problems.dag.metrics",
                    "reldir/001/tiny_problems.dag.nodes.log",
                    "reldir/001/tiny_problems.node_status",
                },
            )

    def testSubdags(self):
        """Backup of a workflow with subdags: subdag outputs are backed up
        too, preserving the subdags directory layout.
        """
        with temporaryDirectory() as tmp_dir:
            test_tmp_dir = pathlib.Path(tmp_dir)
            submit_dir = test_tmp_dir / "submit"
            copytree(f"{TESTDIR}/data/group_failed_1", submit_dir, ignore=ignore_patterns("*~", ".???*"))
            result_rescue = lssthtc.htc_backup_files(submit_dir)
            self.assertTrue(result_rescue.samefile(submit_dir / "group_failed_1.dag.rescue001"))
            self.assertEqual(
                self._relative_files(submit_dir),
                {
                    "./group_failed_1.dag",
                    "./group_failed_1.dag.dagman.log",
                    "./group_failed_1.dag.dagman.out",
                    "./group_failed_1.dag.rescue001",
                    "subdags/wms_group_order1_val1a/group_order1_val1a.dag",
                    "subdags/wms_group_order1_val1a/group_order1_val1a.dag.dagman.log",
                    "subdags/wms_group_order1_val1a/group_order1_val1a.dag.dagman.out",
                    "subdags/wms_group_order1_val1b/group_order1_val1b.dag",
                    "subdags/wms_group_order1_val1b/group_order1_val1b.dag.dagman.log",
                    "subdags/wms_group_order1_val1b/group_order1_val1b.dag.dagman.out",
                    "subdags/wms_group_order1_val1b/group_order1_val1b.dag.rescue001",
                    "subdags/wms_group_order1_val1c/group_order1_val1c.dag",
                    "subdags/wms_group_order1_val1c/group_order1_val1c.dag.dagman.log",
                    "subdags/wms_group_order1_val1c/group_order1_val1c.dag.dagman.out",
                    "001/group_failed_1.dag.nodes.log",
                    "001/group_failed_1.info.json",
                    "001/group_failed_1.node_status",
                    "001/subdags/wms_group_order1_val1a/group_order1_val1a.dag.nodes.log",
                    "001/subdags/wms_group_order1_val1a/group_order1_val1a.node_status",
                    "001/subdags/wms_group_order1_val1a/wms_group_order1_val1a.dag.post.out",
                    "001/subdags/wms_group_order1_val1a/wms_group_order1_val1a.status.txt",
                    "001/subdags/wms_group_order1_val1b/group_order1_val1b.dag.nodes.log",
                    "001/subdags/wms_group_order1_val1b/group_order1_val1b.node_status",
                    "001/subdags/wms_group_order1_val1b/wms_group_order1_val1b.status.txt",
                    "001/subdags/wms_group_order1_val1b/wms_group_order1_val1b.dag.post.out",
                    "001/subdags/wms_group_order1_val1c/group_order1_val1c.dag.nodes.log",
                    "001/subdags/wms_group_order1_val1c/group_order1_val1c.node_status",
                    "001/subdags/wms_group_order1_val1c/wms_group_order1_val1c.dag.post.out",
                    "001/subdags/wms_group_order1_val1c/wms_group_order1_val1c.status.txt",
                },
            )
class UpdateRescueFileTestCase(unittest.TestCase):
    """Test _update_rescue_file function."""

    def testSuccess(self):
        """Verify the rescue file is rewritten with the expected contents."""
        self.maxDiff = None
        with temporaryDirectory() as tmp_dir:
            test_tmp_dir = pathlib.Path(tmp_dir)
            submit_dir = test_tmp_dir / "submit"
            copytree(f"{TESTDIR}/data/group_failed_1", submit_dir, ignore=ignore_patterns("*~", ".???*"))
            rescue_file = submit_dir / "group_failed_1.dag.rescue001"
            lssthtc._update_rescue_file(rescue_file)

            with open(rescue_file) as fh:
                lines = fh.readlines()
            results = "".join(lines)

            truth = """# Rescue DAG file, created after running
# the u_testuser_DM-46294_group_fail_20250310T160455Z.dag DAG file
# Created 3/10/2025 16:08:56 UTC
# Rescue DAG version: 2.0.1 (partial)
#
# Total number of Nodes: 26
# Nodes premarked DONE: 21
# Nodes that failed: 2
# wms_group_order1_val1b,finalJob,<ENDLIST>

DONE pipetaskInit
DONE label1_val1c_val2a
DONE label1_val1b_val2b
DONE label1_val1b_val2a
DONE label1_val1c_val2b
DONE label1_val1a_val2a
DONE label1_val1a_val2b
DONE label3_val1c_val2a
DONE label3_val1b_val2b
DONE label3_val1b_val2a
DONE label3_val1c_val2b
DONE label3_val1a_val2a
DONE label3_val1a_val2b
DONE wms_group_order1_val1a
DONE label5_val1a_val2a
DONE label5_val1a_val2b
DONE wms_group_order1_val1c
DONE label5_val1c_val2a
DONE label5_val1c_val2b
DONE wms_check_status_wms_group_order1_val1a
DONE wms_check_status_wms_group_order1_val1c
"""
            # NOTE(review): removed leftover debugging prints to stderr;
            # on failure assertEqual already shows the full diff (maxDiff=None).
            self.assertEqual(results, truth)
class ReadDagStatusTestCase(unittest.TestCase):
    """Test read_dag_status function and read_single_dag_status."""

    def testFileMissing(self):
        """A directory without a node status file must raise."""
        with temporaryDirectory() as tmp_dir:
            with self.assertRaisesRegex(FileNotFoundError, "DAGMan node status not found"):
                _ = lssthtc.read_dag_status(tmp_dir)

    def testRegular(self):
        """Verify the status read from an ordinary (subdag-less) run."""
        with temporaryDirectory() as tmp_dir:
            submit_dir = os.path.join(tmp_dir, "tiny_problems")
            copytree(f"{TESTDIR}/data/tiny_problems", submit_dir, ignore=ignore_patterns("*~", ".???*"))
            status = lssthtc.read_dag_status(submit_dir)
            expected = {
                "JobProcsHeld": 0,
                "NodesPost": 0,
                "JobProcsIdle": 0,
                "NodesTotal": 6,
                "NodesFailed": 2,
                "NodesDone": 3,
                "NodesQueued": 0,
                "NodesPre": 0,
                "NodesFutile": 1,
                "NodesUnready": 0,
            }
            # Check only the keys in `expected`; extra keys are allowed.
            self.assertEqual(status, {**status, **expected})

    def testSubdags(self):
        """Making sure it gets data from subdag dirs and doesn't
        fail if some subdags haven't started running yet.
        """
        self.maxDiff = None
        with temporaryDirectory() as tmp_dir:
            submit_dir = os.path.join(tmp_dir, "submit")
            copytree(f"{TESTDIR}/data/group_running_1", submit_dir, ignore=ignore_patterns("*~", ".???*"))
            status = lssthtc.read_dag_status(submit_dir)
            expected = {
                "JobProcsHeld": 0,
                "NodesPost": 0,
                "JobProcsIdle": 0,
                "NodesTotal": 34,
                "NodesFailed": 0,
                "NodesDone": 17,
                "NodesQueued": 3,
                "NodesPre": 0,
                "NodesFutile": 0,
                "NodesUnready": 14,
            }
            # Check only the keys in `expected`; extra keys are allowed.
            self.assertEqual(status, {**status, **expected})
class ReadDagInfoTestCase(unittest.TestCase):
    """Test read_dag_info function."""

    def testFileMissing(self):
        """A directory without an info file must raise."""
        with temporaryDirectory() as tmp_dir:
            with self.assertRaisesRegex(FileNotFoundError, "File with DAGMan job information not found in "):
                _ = lssthtc.read_dag_info(tmp_dir)

    def testSuccess(self):
        """Verify the DAGMan job information parsed from the info file."""
        with temporaryDirectory() as tmp_dir:
            copy2(f"{TESTDIR}/data/tiny_success/tiny_success.info.json", tmp_dir)
            results = lssthtc.read_dag_info(tmp_dir)

            expected_job = {
                "ClusterId": 9208,
                "GlobalJobId": "test02#9208.0#1739465078",
                "bps_wms_service": "lsst.ctrl.bps.htcondor.htcondor_service.HTCondorService",
                "bps_project": "dev",
                "bps_payload": "tiny",
                "bps_operator": "testuser",
                "bps_wms_workflow": "lsst.ctrl.bps.htcondor.htcondor_service.HTCondorWorkflow",
                "bps_provisioning_job": "provisioningJob",
                "bps_run_quanta": "label1:1;label2:1",
                "bps_campaign": "quick",
                "bps_runsite": "testpool",
                "bps_job_summary": "pipetaskInit:1;label1:1;label2:1;finalJob:1",
                "bps_run": "u_testuser_tiny_20250213T164427Z",
                "bps_isjob": "True",
            }
            self.assertEqual(results, {"test02": {"9208.0": expected_job}})

    def testPermissionError(self):
        """An unreadable info file is logged and yields an empty result."""
        with temporaryDirectory() as tmp_dir:
            copy2(f"{TESTDIR}/data/tiny_success/tiny_success.info.json", tmp_dir)
            with unittest.mock.patch("lsst.ctrl.bps.htcondor.lssthtc.open") as mocked_open:
                mocked_open.side_effect = PermissionError
                with self.assertLogs("lsst.ctrl.bps.htcondor", level="DEBUG") as cm:
                    results = lssthtc.read_dag_info(tmp_dir)
                    self.assertIn("Retrieving DAGMan job information failed:", cm.output[-1])
                    self.assertEqual({}, results)
class HtcWriteCondorFileTestCase(unittest.TestCase):
    """Test htc_write_condor_file function."""

    def testSuccess(self):
        """Verify the submit file written for a job, including quoting of
        the environment value and ClassAd attribute lines.
        """
        with temporaryDirectory() as tmp_dir:
            job_name = "job1"
            subfile = pathlib.Path(tmp_dir) / f"label1/{job_name}.sub"
            commands = {
                "executable": "$(CTRL_MPEXEC_DIR)/bin/pipetask",
                "arguments": "-a -b 2 -c",
                "request_memory": "2000",
                "environment": "one=1 two=\"2\" three='spacey 'quoted' value'",
                "log": f"{job_name}.log",
            }
            attributes = {
                "bps_job_name": job_name,
                "bps_job_label": "label1",
                "bps_job_quanta": "task1:8;task2:8",
            }

            lssthtc.htc_write_condor_file(subfile, job_name, commands, attributes)

            expected = [
                "executable=$(CTRL_MPEXEC_DIR)/bin/pipetask\n",
                "arguments=-a -b 2 -c\n",
                "request_memory=2000\n",
                "environment=\"one=1 two=\"2\" three='spacey 'quoted' value'\"\n",
                f"output={job_name}.$(Cluster).out\n",
                f"error={job_name}.$(Cluster).out\n",
                f"log={job_name}.log\n",
                f'+bps_job_name = "{job_name}"\n',
                '+bps_job_label = "label1"\n',
                '+bps_job_quanta = "task1:8;task2:8"\n',
                "queue\n",
            ]
            with open(subfile, encoding="utf-8") as fh:
                actual = fh.readlines()
            self.assertEqual(set(actual), set(expected))
            self.assertTrue(subfile.exists())
            # Try to make Submit object from file to find any syntax issues
            _ = lssthtc.htc_create_submit_from_file(subfile)
class HtcCreateSubmitFromDagTestCase(unittest.TestCase):
    """Test htc_create_submit_from_dag function."""

    @classmethod
    def setUpClass(cls):
        cls.bindir = None
        # htcondor.Submit.from_dag requires condor_dagman executable in path.
        if not which("condor_dagman"):  # pragma: no cover
            # Provide a stand-in executable in a temporary bin directory.
            cls.bindir = tempfile.TemporaryDirectory()
            fake_dagman_exec = pathlib.Path(cls.bindir.name) / "condor_dagman"
            with open(fake_dagman_exec, "w") as fh:
                fh.write("#!/bin/bash\n")
                fh.write("echo fake_condor_dagman $@\n")
                fh.write("exit 0\n")
            fake_dagman_exec.chmod(fake_dagman_exec.stat().st_mode | stat.S_IEXEC)
            os.environ["PATH"] = f"{os.environ['PATH']}:{cls.bindir.name}"

    @classmethod
    def tearDownClass(cls):
        if cls.bindir:
            cls.bindir.cleanup()

    @unittest.mock.patch.dict(os.environ, {"_CONDOR_DAGMAN_MAX_JOBS_IDLE": "42"})
    def testMaxIdleEnvVar(self):
        """MaxIdle should be picked up from the HTCondor environment."""
        with temporaryDirectory() as tmp_dir:
            copy2(f"{TESTDIR}/data/tiny_success/tiny_success.dag", tmp_dir)
            dag_filename = pathlib.Path(tmp_dir) / "tiny_success.dag"
            submit = lssthtc.htc_create_submit_from_dag(str(dag_filename), {})
            self.assertIn("-MaxIdle 42", submit["arguments"])

    @unittest.mock.patch.dict(os.environ, {})
    def testMaxIdleGiven(self):
        """An explicitly given MaxIdle must end up in the arguments."""
        with temporaryDirectory() as tmp_dir:
            copy2(f"{TESTDIR}/data/tiny_success/tiny_success.dag", tmp_dir)
            dag_filename = pathlib.Path(tmp_dir) / "tiny_success.dag"
            submit = lssthtc.htc_create_submit_from_dag(str(dag_filename), {"MaxIdle": 37})
            self.assertIn("-MaxIdle 37", submit["arguments"])

    @unittest.mock.patch.dict(os.environ, {})
    def testNoMaxJobsIdle(self):
        """Note: Since the produced arguments differ depending on
        HTCondor version when no MaxIdle passed to from_dag, not
        checking arguments string here. Instead just making sure
        lssthtc code doesn't pass MaxIdle value to from_dag.
        """
        with temporaryDirectory() as tmp_dir:
            copy2(f"{TESTDIR}/data/tiny_success/tiny_success.dag", tmp_dir)
            dag_filename = pathlib.Path(tmp_dir) / "tiny_success.dag"
            with unittest.mock.patch("htcondor.Submit.from_dag") as submit_mock:
                with unittest.mock.patch("htcondor.param") as mock_param:
                    mock_param.__contains__.return_value = False
                    _ = lssthtc.htc_create_submit_from_dag(str(dag_filename), {})
            submit_mock.assert_called_once_with(str(dag_filename), {})
class HtcDagTestCase(unittest.TestCase):
    """Test for HTCDag class."""

    def setUp(self):
        """Build a one-job DAG shared by the write tests."""
        job = lssthtc.HTCJob(name="test_job")
        job.add_job_cmds(
            {
                "executable": "/usr/bin/echo",
                "arguments": "foo",
                "output": "test_job.$(Cluster).out",
                "error": "test_job.$(Cluster).out",
                "log": "test_job.$(Cluster).log",
            }
        )
        job.subfile = f"{job.name}.sub"

        self.dag = lssthtc.HTCDag(name="test_workflow")
        self.dag.add_job(job)

        # Submit file contents expected for the job above.
        self.subfile_expected = [
            "executable=/usr/bin/echo\n",
            "arguments=foo\n",
            "output=test_job.$(Cluster).out\n",
            "error=test_job.$(Cluster).out\n",
            "log=test_job.$(Cluster).log\n",
            "queue\n",
        ]

    def _check_written_files(self, tmp_dir, job, dagfile_expected):
        """Verify graph metadata and the DAG/submit files written to disk."""
        self.assertIn("submit_path", self.dag.graph)
        self.assertEqual(self.dag.graph["submit_path"], tmp_dir)
        self.assertIn("dag_filename", self.dag.graph)
        self.assertEqual(self.dag.graph["dag_filename"], f"{self.dag.graph['name']}.dag")
        with open(os.path.join(tmp_dir, self.dag.graph["dag_filename"]), encoding="utf-8") as f:
            dagfile_actual = f.readlines()
        self.assertEqual(dagfile_actual, dagfile_expected)
        with open(os.path.join(tmp_dir, job.subfile), encoding="utf-8") as f:
            subfile_actual = f.readlines()
        self.assertEqual(subfile_actual, self.subfile_expected)

    def testWriteWithDagConfig(self):
        """Verify DAG file contents when a DAGMan config is attached."""
        with temporaryDirectory() as tmp_dir:
            config = BpsConfig(Config(htcondor_config.HTC_DEFAULTS_URI))
            job = self.dag.nodes["test_job"]["data"]
            wms_config_filename = "dagman.conf"
            wms_configurator = dagman_configurator.DagmanConfigurator(config)
            wms_configurator.prepare(wms_config_filename, prefix=tmp_dir)
            wms_configurator.configure(self.dag)
            dagfile_expected = [
                f"CONFIG {wms_config_filename}\n",
                f'JOB {job.name} "{job.subfile}"\n',
                f"DOT {self.dag.name}.dot\n",
                f"NODE_STATUS_FILE {self.dag.name}.node_status\n",
                f'SET_JOB_ATTR bps_wms_config_path= "{wms_config_filename}"\n',
            ]

            self.dag.write(tmp_dir, "", "")

            self._check_written_files(tmp_dir, job, dagfile_expected)

    def testWriteWithoutDagConfig(self):
        """Verify DAG file contents without a DAGMan config."""
        with temporaryDirectory() as tmp_dir:
            job = self.dag.nodes["test_job"]["data"]
            dagfile_expected = [
                f'JOB {job.name} "{job.subfile}"\n',
                f"DOT {self.dag.name}.dot\n",
                f"NODE_STATUS_FILE {self.dag.name}.node_status\n",
            ]

            self.dag.write(tmp_dir, "", "")

            self._check_written_files(tmp_dir, job, dagfile_expected)
# Allow running this test module directly (e.g. ``python test_lssthtc.py``).
if __name__ == "__main__":
    unittest.main()