Coverage for tests / test_prepare_utils.py: 24%
308 statements
« prev ^ index » next coverage.py v7.13.5, created at 2026-04-28 08:50 +0000
« prev ^ index » next coverage.py v7.13.5, created at 2026-04-28 08:50 +0000
1# This file is part of ctrl_bps_htcondor.
2#
3# Developed for the LSST Data Management System.
4# This product includes software developed by the LSST Project
5# (https://www.lsst.org).
6# See the COPYRIGHT file at the top-level directory of this distribution
7# for details of code ownership.
8#
9# This software is dual licensed under the GNU General Public License and also
10# under a 3-clause BSD license. Recipients may choose which of these licenses
11# to use; please see the files gpl-3.0.txt and/or bsd_license.txt,
12# respectively. If you choose the GPL option then the following text applies
13# (but note that there is still no warranty even if you opt for BSD instead):
14#
15# This program is free software: you can redistribute it and/or modify
16# it under the terms of the GNU General Public License as published by
17# the Free Software Foundation, either version 3 of the License, or
18# (at your option) any later version.
19#
20# This program is distributed in the hope that it will be useful,
21# but WITHOUT ANY WARRANTY; without even the implied warranty of
22# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
23# GNU General Public License for more details.
24#
25# You should have received a copy of the GNU General Public License
26# along with this program. If not, see <https://www.gnu.org/licenses/>.
28"""Unit tests for prepare utility functions."""
import logging
import os
import unittest
import unittest.mock
34from lsst.ctrl.bps import (
35 BPS_DEFAULTS,
36 BPS_SEARCH_ORDER,
37 BpsConfig,
38 GenericWorkflow,
39 GenericWorkflowExec,
40 GenericWorkflowFile,
41 GenericWorkflowJob,
42)
43from lsst.ctrl.bps.htcondor import prepare_utils
44from lsst.ctrl.bps.tests.gw_test_utils import make_3_label_workflow, make_3_label_workflow_groups_sort
# Module-level logger named after the lsst.ctrl.bps.htcondor package.
logger = logging.getLogger("lsst.ctrl.bps.htcondor")

# Absolute path of the directory that contains this test module.
TESTDIR = os.path.abspath(os.path.dirname(__file__))
class TranslateJobCmdsTestCase(unittest.TestCase):
    """Exercise ``prepare_utils._translate_job_cmds``.

    Every test builds a ``GenericWorkflowJob``, sets the attributes under
    test on it, and checks the dictionary of commands returned for it.
    """

    def setUp(self):
        self.gw_exec = GenericWorkflowExec("test_exec", "/dummy/dir/pipetask")
        self.cached_vals = {"profile": {}, "bpsUseShared": True, "memoryLimit": 32768}

    def _new_job(self, name, **attrs):
        """Create a job labelled ``label1`` and assign ``attrs`` on it.

        Attributes are assigned after construction, in keyword order.
        """
        job = GenericWorkflowJob(name, "label1", executable=self.gw_exec)
        for attr_name, attr_value in attrs.items():
            setattr(job, attr_name, attr_value)
        return job

    def _translate(self, job, cached_vals=None, generic_workflow=None):
        """Call the function under test, defaulting to the fixture values."""
        if cached_vals is None:
            cached_vals = self.cached_vals
        return prepare_utils._translate_job_cmds(cached_vals, generic_workflow, job)

    def testRetryUnlessNone(self):
        """No ``retry_until`` command is expected when the exit code is None."""
        job = self._new_job("retryUnless", retry_unless_exit=None)
        self.assertNotIn("retry_until", self._translate(job))

    def testRetryUnlessInt(self):
        """A single integer exit code is expected back as ``retry_until``."""
        job = self._new_job("retryUnlessInt", retry_unless_exit=3)
        cmds = self._translate(job)
        self.assertEqual(int(cmds["retry_until"]), job.retry_unless_exit)

    def testRetryUnlessList(self):
        """A list of exit codes is expected as a ``member()`` expression."""
        job = self._new_job("retryUnlessList", retry_unless_exit=[1, 2])
        cmds = self._translate(job)
        self.assertEqual(cmds["retry_until"], "member(ExitCode, {1,2})")

    def testRetryUnlessBad(self):
        """A string exit-code value is expected to raise ``ValueError``."""
        job = self._new_job("retryUnlessBad", retry_unless_exit="1,2,3")
        with self.assertRaises(ValueError) as caught:
            self._translate(job)
        self.assertIn("retryUnlessExit", str(caught.exception))

    def testEnvironmentBasic(self):
        """Integer values stay bare; string values are single-quoted."""
        job = self._new_job("jobEnvironment", environment={"TEST_INT": 1, "TEST_STR": "TWO"})
        cmds = self._translate(job)
        self.assertEqual(cmds["environment"], "TEST_INT=1 TEST_STR='TWO'")

    def testEnvironmentSpaces(self):
        """A value containing a space is kept inside single quotes."""
        job = self._new_job("jobEnvironment", environment={"TEST_SPACES": "spacey value"})
        cmds = self._translate(job)
        self.assertEqual(cmds["environment"], "TEST_SPACES='spacey value'")

    def testEnvironmentSingleQuotes(self):
        """Single quotes inside a value are expected to be doubled."""
        job = self._new_job("jobEnvironment", environment={"TEST_SINGLE_QUOTES": "spacey 'quoted' value"})
        cmds = self._translate(job)
        self.assertEqual(cmds["environment"], "TEST_SINGLE_QUOTES='spacey ''quoted'' value'")

    def testEnvironmentDoubleQuotes(self):
        """Double quotes inside a value are expected to be doubled."""
        job = self._new_job("jobEnvironment", environment={"TEST_DOUBLE_QUOTES": 'spacey "double" value'})
        cmds = self._translate(job)
        self.assertEqual(cmds["environment"], """TEST_DOUBLE_QUOTES='spacey ""double"" value'""")

    def testEnvironmentWithEnvVars(self):
        """An ``<ENV:NAME>`` placeholder is expected to become ``$ENV(NAME)``."""
        job = self._new_job("jobEnvironment", environment={"TEST_ENV_VAR": "<ENV:CTRL_BPS_DIR>/tests"})
        cmds = self._translate(job)
        self.assertEqual(cmds["environment"], "TEST_ENV_VAR='$ENV(CTRL_BPS_DIR)/tests'")

    def testPeriodicRelease(self):
        """Check ``periodic_release`` for a memory multiplier of 2 and 3 retries."""
        job = self._new_job(
            "periodicRelease",
            request_memory=2048,
            memory_multiplier=2,
            number_of_retries=3,
        )
        cmds = self._translate(job)
        expected = (
            "JobStatus == 5 && NumJobStarts <= JobMaxRetries && "
            "(HoldReasonCode =?= 34 && HoldReasonSubCode =?= 0 || "
            "HoldReasonCode =?= 3 && HoldReasonSubCode =?= 34) && "
            "min({int(2048 * pow(2, NumJobStarts - 1)), 32768}) < 32768"
        )
        self.assertEqual(cmds["periodic_release"], expected)

    def testPeriodicRemoveNoRetries(self):
        """Check ``periodic_remove`` and ``max_retries`` when retries are 0."""
        job = self._new_job(
            "periodicRelease",
            request_memory=2048,
            memory_multiplier=1,
            number_of_retries=0,
        )
        cmds = self._translate(job)
        self.assertEqual(cmds["periodic_remove"], "JobStatus == 5 && (NumJobStarts > JobMaxRetries)")
        self.assertEqual(cmds["max_retries"], 0)

    def testProfileJobCommands(self):
        """A ``requirements`` entry in the job profile is passed through."""
        requirement = 'Machine == "node01.cluster.local"'
        job = self._new_job(
            "requirements",
            request_memory=2048,
            memory_multiplier=1,
            number_of_retries=0,
            profile={"requirements": requirement},
        )
        cmds = self._translate(job)
        self.assertEqual(cmds["requirements"], requirement)

    def testProfileCached(self):
        """A ``requirements`` entry in the cached profile is passed through."""
        requirement = 'Machine == "node01.cluster.local"'
        job = self._new_job(
            "requirements",
            request_memory=2048,
            memory_multiplier=1,
            number_of_retries=0,
        )
        # Copy the fixture so the profile override stays local to this test.
        cached = dict(self.cached_vals)
        cached["profile"] = {"requirements": requirement}
        cmds = self._translate(job, cached_vals=cached)
        self.assertEqual(cmds["requirements"], requirement)

    def testArgumentsReplaceWmsVars(self):
        """``<WMS:attemptNum>`` in the arguments becomes ``$$([NumJobStarts])``."""
        job = GenericWorkflowJob("job1", "label1", executable=self.gw_exec)
        workflow = GenericWorkflow("test1")
        workflow.add_job(job)
        job.request_cpus = 1
        job.request_memory = 2048
        job.arguments = "run-qbb repo test.qg --summary /a/b/t/jobs/c/d/job-<WMS:attemptNum>-summary.json"
        expected = "run-qbb repo test.qg --summary /a/b/t/jobs/c/d/job-$$([NumJobStarts])-summary.json"
        cmds = self._translate(job, generic_workflow=workflow)
        self.assertEqual(cmds["arguments"], expected)
class TranslateDagCmdsTestCase(unittest.TestCase):
    """Exercise ``prepare_utils._translate_dag_cmds``."""

    def setUp(self):
        self.gw_exec = GenericWorkflowExec("test_exec", "/dummy/dir/pipetask")

    def testPriority(self):
        """The job's priority is expected under the ``priority`` key."""
        job = GenericWorkflowJob("priority", "label1", executable=self.gw_exec)
        job.priority = 100
        cmds = prepare_utils._translate_dag_cmds(job)
        self.assertEqual(cmds["priority"], 100)
class GroupToSubdagTestCase(unittest.TestCase):
    """Exercise ``prepare_utils._group_to_subdag``."""

    def testBlocking(self):
        """The resulting subdag is expected to be as long as the group job."""
        workflow = make_3_label_workflow_groups_sort("test1", True)
        group_job = workflow.get_job("group_order1_10001")
        config = BpsConfig({}, search_order=BPS_SEARCH_ORDER, defaults=BPS_DEFAULTS)

        htc_job = prepare_utils._group_to_subdag(config, group_job, "the_prefix")

        self.assertEqual(len(htc_job.subdag), len(group_job))
class GatherSiteValuesTestCase(unittest.TestCase):
    """Exercise ``prepare_utils._gather_site_values``."""

    @staticmethod
    def _htcondor_config(values):
        """Build a ``BpsConfig`` from ``values`` naming the HTCondor service class."""
        return BpsConfig(
            values,
            search_order=BPS_SEARCH_ORDER,
            defaults=BPS_DEFAULTS,
            wms_service_class_fqn="lsst.ctrl.bps.htcondor.HTCondorService",
        )

    def testAllThere(self):
        """``memoryLimit`` matches ``BPS_DEFAULTS`` with no service class given."""
        # Deliberately built without wms_service_class_fqn, unlike the other tests.
        config = BpsConfig({}, search_order=BPS_SEARCH_ORDER, defaults=BPS_DEFAULTS)
        site_values = prepare_utils._gather_site_values(config, "notThere")
        self.assertEqual(site_values["memoryLimit"], BPS_DEFAULTS["memoryLimit"])

    def testNotSpecified(self):
        """``memoryLimit`` matches ``BPS_DEFAULTS`` for a site absent from the config."""
        config = self._htcondor_config({})
        site_values = prepare_utils._gather_site_values(config, "notThere")
        self.assertEqual(site_values["memoryLimit"], BPS_DEFAULTS["memoryLimit"])

    def testGlobalNodeset(self):
        """A top-level ``nodeset`` is used, with ``{campaign}`` filled in."""
        config = self._htcondor_config({"nodeset": "global_node_set_{campaign}", "campaign": "DRP"})
        site_values = prepare_utils._gather_site_values(config, "fr")
        self.assertEqual(site_values["nodeset"], "global_node_set_DRP")

    def testSiteNodeset(self):
        """A site-level ``nodeset`` takes precedence over the top-level one."""
        config = self._htcondor_config(
            {
                "nodeset": "global_node_set_{campaign}",
                "campaign": "DRP",
                "site": {"fr": {"nodeset": "fr_node_set_{campaign}"}},
            }
        )
        site_values = prepare_utils._gather_site_values(config, "fr")
        self.assertEqual(site_values["nodeset"], "fr_node_set_DRP")

    def testAttrsProfile(self):
        """``+``-prefixed condor profile keys land in ``attrs``, others in ``profile``."""
        condor_profile = {
            "requirements": '( TARGET.Nodeset == "{bpsNodeset}" )',
            "+JobNodeset": "{bpsNodeset}",
        }
        config = self._htcondor_config(
            {
                "bpsNodeset": "DEVSET",
                "site": {"mycomputer": {"profile": {"condor": condor_profile}}},
            }
        )
        site_values = prepare_utils._gather_site_values(config, "mycomputer")
        self.assertEqual(site_values["profile"], {"requirements": '( TARGET.Nodeset == "DEVSET" )'})
        self.assertEqual(site_values["attrs"], {"JobNodeset": "DEVSET"})
274class GatherLabelValuesTestCase(unittest.TestCase):
275 """Test _gather_labels_values function."""
277 def testClusterLabel(self):
278 # Test cluster value overrides pipetask.
279 label = "label1"
280 config = BpsConfig(
281 {
282 "cluster": {
283 "label1": {
284 "releaseExpr": "cluster_val",
285 "overwriteJobFiles": False,
286 "profile": {"condor": {"prof_val1": 3}},
287 }
288 },
289 "pipetask": {"label1": {"releaseExpr": "pipetask_val"}},
290 },
291 search_order=BPS_SEARCH_ORDER,
292 defaults=BPS_DEFAULTS,
293 wms_service_class_fqn="lsst.ctrl.bps.htcondor.HTCondorService",
294 )
295 results = prepare_utils._gather_label_values(config, label)
296 self.assertEqual(
297 results,
298 {
299 "attrs": {},
300 "profile": {"prof_val1": 3},
301 "releaseExpr": "cluster_val",
302 "overwriteJobFiles": False,
303 },
304 )
306 def testPipetaskLabel(self):
307 label = "label1"
308 config = BpsConfig(
309 {
310 "pipetask": {
311 "label1": {
312 "releaseExpr": "pipetask_val",
313 "overwriteJobFiles": False,
314 "profile": {"condor": {"prof_val1": 3}},
315 }
316 }
317 },
318 search_order=BPS_SEARCH_ORDER,
319 defaults=BPS_DEFAULTS,
320 wms_service_class_fqn="lsst.ctrl.bps.htcondor.HTCondorService",
321 )
322 results = prepare_utils._gather_label_values(config, label)
323 self.assertEqual(
324 results,
325 {
326 "attrs": {},
327 "profile": {"prof_val1": 3},
328 "releaseExpr": "pipetask_val",
329 "overwriteJobFiles": False,
330 },
331 )
333 def testNoSection(self):
334 label = "notThere"
335 config = BpsConfig(
336 {},
337 search_order=BPS_SEARCH_ORDER,
338 defaults=BPS_DEFAULTS,
339 wms_service_class_fqn="lsst.ctrl.bps.htcondor.HTCondorService",
340 )
341 results = prepare_utils._gather_label_values(config, label)
342 self.assertEqual(results, {"attrs": {}, "profile": {}, "overwriteJobFiles": True})
344 def testNoOverwriteSpecified(self):
345 label = "notthere"
346 config = BpsConfig(
347 {},
348 search_order=BPS_SEARCH_ORDER,
349 defaults={},
350 wms_service_class_fqn="lsst.ctrl.bps.htcondor.HTCondorService",
351 )
352 results = prepare_utils._gather_label_values(config, label)
353 self.assertEqual(results, {"attrs": {}, "profile": {}, "overwriteJobFiles": True})
355 def testFinalJob(self):
356 label = "finalJob"
357 config = BpsConfig(
358 {"finalJob": {"profile": {"condor": {"prof_val2": 6, "+attr_val1": 5}}}},
359 search_order=BPS_SEARCH_ORDER,
360 defaults=BPS_DEFAULTS,
361 wms_service_class_fqn="lsst.ctrl.bps.htcondor.HTCondorService",
362 )
363 results = prepare_utils._gather_label_values(config, label)
364 self.assertEqual(
365 results, {"attrs": {"attr_val1": 5}, "profile": {"prof_val2": 6}, "overwriteJobFiles": False}
366 )
class CreateCheckJobTestCase(unittest.TestCase):
    """Exercise ``prepare_utils._create_check_job``."""

    def testSuccess(self):
        """Check the name, label and submit file of the created job."""
        group_name, label = "group_order1_val1a", "order1"

        check_job = prepare_utils._create_check_job(group_name, label)

        self.assertIn(group_name, check_job.name)
        self.assertEqual(check_job.label, label)
        self.assertIn("check_group_status.sub", check_job.subfile)
class CreatePeriodicReleaseExprTestCase(unittest.TestCase):
    """Exercise ``prepare_utils._create_periodic_release_expr``.

    The positional arguments used throughout are 2048, a multiplier, 32768
    and a user release expression, in that order.
    """

    def testNoReleaseExpr(self):
        """Multiplier 1 and no user expression: expect an empty string."""
        expr = prepare_utils._create_periodic_release_expr(2048, 1, 32768, "")
        self.assertEqual(expr, "")

    def testMultiplierNone(self):
        """Multiplier None and no user expression: expect an empty string."""
        expr = prepare_utils._create_periodic_release_expr(2048, None, 32768, "")
        self.assertEqual(expr, "")

    def testJustMemoryReleaseExpr(self):
        """Multiplier 2 and no user expression: expect the memory clause only."""
        self.maxDiff = None  # so test error shows entire strings
        expected = (
            "JobStatus == 5 && NumJobStarts <= JobMaxRetries && "
            "(HoldReasonCode =?= 34 && HoldReasonSubCode =?= 0 || "
            "HoldReasonCode =?= 3 && HoldReasonSubCode =?= 34) && "
            "min({int(2048 * pow(2, NumJobStarts - 1)), 32768}) < 32768"
        )
        expr = prepare_utils._create_periodic_release_expr(2048, 2, 32768, "")
        self.assertEqual(expr, expected)

    def testJustUserReleaseExpr(self):
        """Multiplier 1 with a user expression: expect the user clause only."""
        expected = "JobStatus == 5 && NumJobStarts <= JobMaxRetries && HoldReasonCode =!= 1 && True"
        expr = prepare_utils._create_periodic_release_expr(2048, 1, 32768, "True")
        self.assertEqual(expr, expected)

    def testJustUserReleaseExprMultiplierNone(self):
        """Multiplier None with a user expression: expect the user clause only."""
        expected = "JobStatus == 5 && NumJobStarts <= JobMaxRetries && HoldReasonCode =!= 1 && True"
        expr = prepare_utils._create_periodic_release_expr(2048, None, 32768, "True")
        self.assertEqual(expr, expected)

    def testMemoryAndUserReleaseExpr(self):
        """Multiplier 2 with a user expression: expect both clauses joined by ``||``."""
        self.maxDiff = None  # so test error shows entire strings
        expected = (
            "JobStatus == 5 && NumJobStarts <= JobMaxRetries && "
            "((HoldReasonCode =?= 34 && HoldReasonSubCode =?= 0 || "
            "HoldReasonCode =?= 3 && HoldReasonSubCode =?= 34) && "
            "min({int(2048 * pow(2, NumJobStarts - 1)), 32768}) < 32768 || "
            "HoldReasonCode =!= 1 && True)"
        )
        expr = prepare_utils._create_periodic_release_expr(2048, 2, 32768, "True")
        self.assertEqual(expr, expected)
426class CreatePeriodicRemoveExprTestCase(unittest.TestCase):
427 """Test _create_periodic_release_expr function."""
429 def testBasicRemoveExpr(self):
430 """Function assumes only called if max_retries >= 0."""
431 results = prepare_utils._create_periodic_remove_expr(2048, 1, 32768)
432 truth = "JobStatus == 5 && (NumJobStarts > JobMaxRetries)"
433 self.assertEqual(results, truth)
435 def testBasicRemoveExprMultiplierNone(self):
436 """Function assumes only called if max_retries >= 0."""
437 results = prepare_utils._create_periodic_remove_expr(2048, None, 32768)
438 truth = "JobStatus == 5 && (NumJobStarts > JobMaxRetries)"
439 self.assertEqual(results, truth)
441 def testMemoryRemoveExpr(self):
442 self.maxDiff = None # so test error shows entire strings
443 results = prepare_utils._create_periodic_remove_expr(2048, 2, 32768)
444 truth = (
445 "JobStatus == 5 && (NumJobStarts > JobMaxRetries || "
446 "((HoldReasonCode =?= 34 && HoldReasonSubCode =?= 0 || "
447 "HoldReasonCode =?= 3 && HoldReasonSubCode =?= 34) && "
448 "min({int(2048 * pow(2, NumJobStarts - 1)), 32768}) == 32768))"
449 )
450 self.assertEqual(results, truth)
453class HandleJobOutputsTestCase(unittest.TestCase):
454 """Test _handle_job_outputs function."""
456 def setUp(self):
457 self.job_name = "test_job"
458 self.out_prefix = "/test/prefix"
460 def tearDown(self):
461 pass
463 def testNoOutputsSharedFilesystem(self):
464 """Test with shared filesystem and no outputs."""
465 mock_workflow = unittest.mock.Mock()
466 mock_workflow.get_job_outputs.return_value = []
468 result = prepare_utils._handle_job_outputs(mock_workflow, self.job_name, True, self.out_prefix)
470 self.assertEqual(result, {"transfer_output_files": '""'})
472 def testWithOutputsSharedFilesystem(self):
473 """Test with shared filesystem and outputs present (still empty)."""
474 mock_workflow = unittest.mock.Mock()
475 mock_workflow.get_job_outputs.return_value = [
476 GenericWorkflowFile(name="output.txt", src_uri="/path/to/output.txt")
477 ]
479 result = prepare_utils._handle_job_outputs(mock_workflow, self.job_name, True, self.out_prefix)
481 self.assertEqual(result, {"transfer_output_files": '""'})
483 def testNoOutputsNoSharedFilesystem(self):
484 """Test without shared filesystem and no outputs."""
485 mock_workflow = unittest.mock.Mock()
486 mock_workflow.get_job_outputs.return_value = []
488 result = prepare_utils._handle_job_outputs(mock_workflow, self.job_name, False, self.out_prefix)
490 self.assertEqual(result, {"transfer_output_files": '""'})
492 def testWithAnOutputNoSharedFilesystem(self):
493 """Test without shared filesystem and single output file."""
494 mock_workflow = unittest.mock.Mock()
495 mock_workflow.get_job_outputs.return_value = [
496 GenericWorkflowFile(name="output.txt", src_uri="/path/to/output.txt")
497 ]
499 result = prepare_utils._handle_job_outputs(mock_workflow, self.job_name, False, self.out_prefix)
501 expected = {
502 "transfer_output_files": "output.txt",
503 "transfer_output_remaps": '"output.txt=/path/to/output.txt"',
504 }
505 self.assertEqual(result, expected)
507 def testWithOutputsNoSharedFilesystem(self):
508 """Test without shared filesystem and multiple output files."""
509 mock_workflow = unittest.mock.Mock()
510 mock_workflow.get_job_outputs.return_value = [
511 GenericWorkflowFile(name="output1.txt", src_uri="/path/output1.txt"),
512 GenericWorkflowFile(name="output2.txt", src_uri="/another/path/output2.txt"),
513 ]
515 result = prepare_utils._handle_job_outputs(mock_workflow, self.job_name, False, self.out_prefix)
517 expected = {
518 "transfer_output_files": "output1.txt,output2.txt",
519 "transfer_output_remaps": '"output1.txt=/path/output1.txt;output2.txt=/another/path/output2.txt"',
520 }
521 self.assertEqual(result, expected)
523 @unittest.mock.patch("lsst.ctrl.bps.htcondor.prepare_utils._LOG")
524 def testLogging(self, mock_log):
525 mock_workflow = unittest.mock.Mock()
526 mock_workflow.get_job_outputs.return_value = [
527 GenericWorkflowFile(name="output.txt", src_uri="/path/to/output.txt")
528 ]
530 prepare_utils._handle_job_outputs(mock_workflow, self.job_name, False, self.out_prefix)
532 self.assertTrue(mock_log.debug.called)
533 debug_calls = mock_log.debug.call_args_list
534 self.assertTrue(any("src_uri=" in str(call) for call in debug_calls))
535 self.assertTrue(any("transfer_output_files=" in str(call) for call in debug_calls))
536 self.assertTrue(any("transfer_output_remaps=" in str(call) for call in debug_calls))
class CreateJobTestCase(unittest.TestCase):
    """Exercise ``prepare_utils._create_job``."""

    def setUp(self):
        self.generic_workflow = make_3_label_workflow("test1", True)
        self.template = "{label}/{tract}/{patch}/{band}/{subfilter}/{physical_filter}/{visit}/{exposure}"

    @staticmethod
    def _cached_values(**overrides):
        """Return a fresh cached-values dict with ``overrides`` applied last."""
        values = {
            "bpsUseShared": True,
            "overwriteJobFiles": False,
            "memoryLimit": 491520,
            "profile": {},
            "attrs": {},
        }
        values.update(overrides)
        return values

    def _create(self, cached_values, gwjob, out_prefix):
        """Call the function under test with the fixture template and workflow."""
        return prepare_utils._create_job(
            self.template, cached_values, self.generic_workflow, gwjob, out_prefix
        )

    def testNoOverwrite(self):
        """With overwriteJobFiles False, output/error names include NumJobStarts."""
        final_job = self.generic_workflow.get_final()

        htc_job = self._create(self._cached_values(), final_job, "submit")

        self.assertEqual(htc_job.name, final_job.name)
        self.assertEqual(htc_job.label, final_job.label)
        cmds = htc_job.cmds
        self.assertIn("NumJobStarts", cmds["output"])
        self.assertIn("NumJobStarts", cmds["error"])
        self.assertNotIn("NumJobStarts", cmds["log"])
        self.assertTrue(cmds["error"].endswith(".out"))
        self.assertTrue(cmds["output"].endswith(".out"))
        self.assertTrue(cmds["log"].endswith(".log"))

    def testNodesetWithNoRequirements(self):
        """A nodeset alone produces the whole ``requirements`` expression."""
        gwjob = self.generic_workflow.get_job("label1_10002_11")

        htc_job = self._create(self._cached_values(nodeset="set1"), gwjob, "temp")

        self.assertEqual(htc_job.cmds["requirements"], '( Target.Nodeset == "set1" )')
        self.assertEqual(htc_job.attrs["JobNodeset"], "set1")

    def testNodesetWithRequirements(self):
        """A nodeset is and-ed onto requirements already in the profile."""
        gwjob = self.generic_workflow.get_job("label1_10002_11")
        cached = self._cached_values(profile={"requirements": "dummy_val == 3"}, nodeset="set1")

        htc_job = self._create(cached, gwjob, "temp")

        self.assertEqual(htc_job.cmds["requirements"], '(dummy_val == 3) && ( Target.Nodeset == "set1" )')
        self.assertEqual(htc_job.attrs["JobNodeset"], "set1")
class ReplaceWmsVarsTestCase(unittest.TestCase):
    """Exercise ``prepare_utils._replace_wms_vars``."""

    def testNoWmsVar(self):
        """A string with no ``<WMS:...>`` placeholder comes back unchanged."""
        text = "whatever <Other:notThere> whatnot"
        self.assertEqual(text, prepare_utils._replace_wms_vars(text))

    def testAttemptNum(self):
        """``<WMS:attemptNum>`` is replaced with ``$$([NumJobStarts])``."""
        replaced = prepare_utils._replace_wms_vars("whatever <WMS:attemptNum> whatnot")
        self.assertEqual("whatever $$([NumJobStarts]) whatnot", replaced)

    def testUnrecognized(self):
        """An unknown WMS placeholder is logged and raises ``KeyError``."""
        text = "whatever <WMS:notThere> whatnot"
        # The KeyError context exits first, then the log capture is checked.
        with self.assertLogs(level="INFO") as captured, self.assertRaises(KeyError):
            prepare_utils._replace_wms_vars(text)
        self.assertRegex(captured.output[0], "Unrecognized WMS placeholder: notThere")
if __name__ == "__main__":
    # Run the test cases when this module is executed as a script.
    unittest.main()