Coverage for tests / test_task.py: 25%
212 statements
« prev ^ index » next coverage.py v7.13.5, created at 2026-04-14 23:47 +0000
« prev ^ index » next coverage.py v7.13.5, created at 2026-04-14 23:47 +0000
1#
2# LSST Data Management System
3# Copyright 2008, 2009, 2010 LSST Corporation.
4#
5# This product includes software developed by the
6# LSST Project (http://www.lsst.org/).
7#
8# This program is free software: you can redistribute it and/or modify
9# it under the terms of the GNU General Public License as published by
10# the Free Software Foundation, either version 3 of the License, or
11# (at your option) any later version.
12#
13# This program is distributed in the hope that it will be useful,
14# but WITHOUT ANY WARRANTY; without even the implied warranty of
15# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
16# GNU General Public License for more details.
17#
18# You should have received a copy of the LSST License Statement and
19# the GNU General Public License along with this program. If not,
20# see <http://www.lsstcorp.org/LegalNotices/>.
21#
22import json
23import logging
24import numbers
25import time
26import unittest
28import yaml
30import lsst.pex.config as pexConfig
31import lsst.pipe.base as pipeBase
32import lsst.utils.tests
34# Whilst in transition the test can't tell which type is
35# going to be used for metadata.
36from lsst.pipe.base.task import _TASK_METADATA_TYPE
37from lsst.utils.timer import timeMethod
40class AddConfig(pexConfig.Config):
41 """Config for AddTask."""
43 addend = pexConfig.Field(doc="amount to add", dtype=float, default=3.1)
46class AddTask(pipeBase.Task):
47 """Example task to add two values."""
49 ConfigClass = AddConfig
51 @timeMethod
52 def run(self, val):
53 self.metadata.add("add", self.config.addend)
54 return pipeBase.Struct(
55 val=val + self.config.addend,
56 )
59class MultConfig(pexConfig.Config):
60 """Config for MultTask."""
62 multiplicand = pexConfig.Field(doc="amount by which to multiply", dtype=float, default=2.5)
65class MultTask(pipeBase.Task):
66 """Task to multiply."""
68 ConfigClass = MultConfig
70 @timeMethod
71 def run(self, val):
72 self.metadata.add("mult", self.config.multiplicand)
73 return pipeBase.Struct(
74 val=val * self.config.multiplicand,
75 )
78# prove that registry fields can also be used to hold subtasks
79# by using a registry to hold MultTask
80multRegistry = pexConfig.makeRegistry("Registry for Mult-like tasks")
81multRegistry.register("stdMult", MultTask)
84class AddMultConfig(pexConfig.Config):
85 """Config for AddMult."""
87 add = AddTask.makeField("add task")
88 mult = multRegistry.makeField("mult task", default="stdMult")
91class AddMultTask(pipeBase.Task):
92 """Test Task with subtasks."""
94 ConfigClass = AddMultConfig
95 _DefaultName = "addMult"
96 _add_module_logger_prefix = False
98 """First add, then multiply."""
100 def __init__(self, **keyArgs):
101 pipeBase.Task.__init__(self, **keyArgs)
102 self.makeSubtask("add")
103 self.makeSubtask("mult")
105 @timeMethod
106 def run(self, val):
107 with self.timer("context"):
108 addRet = self.add.run(val)
109 multRet = self.mult.run(addRet.val)
110 self.metadata.add("addmult", multRet.val)
111 return pipeBase.Struct(
112 val=multRet.val,
113 )
115 @timeMethod
116 def failDec(self):
117 """Fail with a decorator."""
118 raise RuntimeError("failDec intentional error")
120 def failCtx(self):
121 """Fail inside a context manager."""
122 with self.timer("failCtx"):
123 raise RuntimeError("failCtx intentional error")
126class AddMultTask2(AddMultTask):
127 """Subclass that gets an automatic logger prefix."""
129 _add_module_logger_prefix = True
132class AddTwiceTask(AddTask):
133 """Variant of AddTask that adds twice the addend."""
135 def run(self, val):
136 addend = self.config.addend
137 return pipeBase.Struct(val=val + (2 * addend))
140class TaskTestCase(unittest.TestCase):
141 """A test case for Task."""
143 def testBasics(self):
144 """Test basic construction and use of a task."""
145 for addend in (1.1, -3.5):
146 for multiplicand in (0.9, -45.0):
147 config = AddMultTask.ConfigClass()
148 config.add.addend = addend
149 config.mult["stdMult"].multiplicand = multiplicand
150 # make sure both ways of accessing the registry work and give
151 # the same result
152 self.assertEqual(config.mult.active.multiplicand, multiplicand)
153 addMultTask = AddMultTask(config=config)
154 for val in (-1.0, 0.0, 17.5):
155 ret = addMultTask.run(val=val)
156 self.assertAlmostEqual(ret.val, (val + addend) * multiplicand)
158 def testNames(self):
159 """Test getName() and getFullName()."""
160 addMultTask = AddMultTask()
161 self.assertEqual(addMultTask.getName(), "addMult")
162 self.assertEqual(addMultTask.add.getName(), "add")
163 self.assertEqual(addMultTask.mult.getName(), "mult")
165 self.assertEqual(addMultTask._name, "addMult")
166 self.assertEqual(addMultTask.add._name, "add")
167 self.assertEqual(addMultTask.mult._name, "mult")
169 self.assertEqual(addMultTask.getFullName(), "addMult")
170 self.assertEqual(addMultTask.add.getFullName(), "addMult.add")
171 self.assertEqual(addMultTask.mult.getFullName(), "addMult.mult")
173 self.assertEqual(addMultTask._fullName, "addMult")
174 self.assertEqual(addMultTask.add._fullName, "addMult.add")
175 self.assertEqual(addMultTask.mult._fullName, "addMult.mult")
177 def testLog(self):
178 """Test the Task's logger."""
179 addMultTask = AddMultTask()
180 self.assertEqual(addMultTask.log.name, "addMult")
181 self.assertEqual(addMultTask.add.log.name, "addMult.add")
183 log = logging.getLogger("tester")
184 addMultTask = AddMultTask(log=log)
185 self.assertEqual(addMultTask.log.name, "tester.addMult")
186 self.assertEqual(addMultTask.add.log.name, "tester.addMult.add")
188 addMultTask2 = AddMultTask2()
189 self.assertEqual(addMultTask2.log.name, f"{__name__}.addMult")
191 def testGetFullMetadata(self):
192 """Test getFullMetadata()."""
193 addMultTask = AddMultTask()
194 addMultTask.run(val=1.234) # Add some metadata
195 fullMetadata = addMultTask.getFullMetadata()
196 self.assertIsInstance(fullMetadata["addMult"], _TASK_METADATA_TYPE)
197 self.assertIsInstance(fullMetadata["addMult:add"], _TASK_METADATA_TYPE)
198 self.assertIsInstance(fullMetadata["addMult:mult"], _TASK_METADATA_TYPE)
199 self.assertEqual(set(fullMetadata), {"addMult", "addMult:add", "addMult:mult"})
201 all_names = fullMetadata.names()
202 self.assertIn("addMult", all_names)
203 self.assertIn("addMult.runStartUtc", all_names)
205 param_names = fullMetadata.paramNames(topLevelOnly=True)
206 # No top level keys without hierarchy
207 self.assertEqual(set(param_names), set())
209 param_names = fullMetadata.paramNames(topLevelOnly=False)
210 self.assertNotIn("addMult", param_names)
211 self.assertIn("addMult.runStartUtc", param_names)
212 self.assertIn("addMult:add.runStartCpuTime", param_names)
214 def testEmptyMetadata(self):
215 task = AddMultTask()
216 task.run(val=1.2345)
217 task.emptyMetadata()
218 fullMetadata = task.getFullMetadata()
219 self.assertEqual(len(fullMetadata["addMult"]), 0)
220 self.assertEqual(len(fullMetadata["addMult:add"]), 0)
221 self.assertEqual(len(fullMetadata["addMult:mult"]), 0)
223 def testReplace(self):
224 """Test replacing one subtask with another."""
225 for addend in (1.1, -3.5):
226 for multiplicand in (0.9, -45.0):
227 config = AddMultTask.ConfigClass()
228 config.add.retarget(AddTwiceTask)
229 config.add.addend = addend
230 config.mult["stdMult"].multiplicand = multiplicand
231 addMultTask = AddMultTask(config=config)
232 for val in (-1.0, 0.0, 17.5):
233 ret = addMultTask.run(val=val)
234 self.assertAlmostEqual(ret.val, (val + (2 * addend)) * multiplicand)
236 def testFail(self):
237 """Test timers when the code they are timing fails."""
238 addMultTask = AddMultTask()
239 try:
240 addMultTask.failDec()
241 self.fail("Expected RuntimeError")
242 except RuntimeError:
243 self.assertIn("failDecEndCpuTime", addMultTask.metadata)
244 try:
245 addMultTask.failCtx()
246 self.fail("Expected RuntimeError")
247 except RuntimeError:
248 self.assertIn("failCtxEndCpuTime", addMultTask.metadata)
250 def testTimeMethod(self):
251 """Test that the timer is adding the right metadata."""
252 addMultTask = AddMultTask()
254 # Run twice to ensure we are additive.
255 addMultTask.run(val=1.1)
256 addMultTask.run(val=2.0)
257 # Check existence and type
258 for key, keyType in (
259 ("Utc", str),
260 ("CpuTime", float),
261 ("UserTime", float),
262 ("SystemTime", float),
263 ("MaxResidentSetSize", numbers.Integral),
264 ("MinorPageFaults", numbers.Integral),
265 ("MajorPageFaults", numbers.Integral),
266 ("BlockInputs", numbers.Integral),
267 ("BlockOutputs", numbers.Integral),
268 ("VoluntaryContextSwitches", numbers.Integral),
269 ("InvoluntaryContextSwitches", numbers.Integral),
270 ):
271 for when in ("Start", "End"):
272 for method in ("run", "context"):
273 name = method + when + key
274 self.assertIn(name, addMultTask.metadata, name + " is missing from task metadata")
275 self.assertIsInstance(
276 addMultTask.metadata.getScalar(name),
277 keyType,
278 f"{name} is not of the right type "
279 f"({keyType} vs {type(addMultTask.metadata.getScalar(name))})",
280 )
281 # Some basic sanity checks
282 currCpuTime = time.process_time()
283 self.assertLessEqual(
284 addMultTask.metadata.getScalar("runStartCpuTime"),
285 addMultTask.metadata.getScalar("runEndCpuTime"),
286 )
287 self.assertLessEqual(addMultTask.metadata.getScalar("runEndCpuTime"), currCpuTime)
288 self.assertLessEqual(
289 addMultTask.metadata.getScalar("contextStartCpuTime"),
290 addMultTask.metadata.getScalar("contextEndCpuTime"),
291 )
292 self.assertLessEqual(addMultTask.metadata.getScalar("contextEndCpuTime"), currCpuTime)
293 self.assertLessEqual(
294 addMultTask.add.metadata.getScalar("runStartCpuTime"),
295 addMultTask.metadata.getScalar("runEndCpuTime"),
296 )
297 self.assertLessEqual(addMultTask.add.metadata.getScalar("runEndCpuTime"), currCpuTime)
299 # Add some explicit values for serialization test.
300 addMultTask.metadata["comment"] = "A comment"
301 addMultTask.metadata["integer"] = 5
302 addMultTask.metadata["float"] = 3.14
303 addMultTask.metadata["bool"] = False
304 addMultTask.metadata.add("commentList", "comment1")
305 addMultTask.metadata.add("commentList", "comment1")
306 addMultTask.metadata.add("intList", 6)
307 addMultTask.metadata.add("intList", 7)
308 addMultTask.metadata.add("boolList", False)
309 addMultTask.metadata.add("boolList", True)
310 addMultTask.metadata.add("floatList", 6.6)
311 addMultTask.metadata.add("floatList", 7.8)
313 # TaskMetadata can serialize to JSON but not YAML
314 # and PropertySet can serialize to YAML and not JSON.
315 if hasattr(addMultTask.metadata, "json"):
316 j = addMultTask.metadata.model_dump_json()
317 new_meta = pipeBase.TaskMetadata.model_validate(json.loads(j))
318 else:
319 y = yaml.dump(addMultTask.metadata)
320 new_meta = yaml.safe_load(y)
321 self.assertEqual(new_meta, addMultTask.metadata)
323 def test_annotate_exception(self):
324 """Test annotating failures in the task metadata when a non-Task
325 exception is raised (when there is no `metadata` on the exception).
326 """
327 task = AddMultTask()
328 msg = "something failed!"
329 error = ValueError(msg)
330 with self.assertLogs("addMult", level="DEBUG") as cm:
331 pipeBase.AnnotatedPartialOutputsError.annotate(error, task, log=task.log)
332 self.assertIn(msg, "\n".join(cm.output))
333 self.assertEqual(task.metadata["failure"]["message"], msg)
334 self.assertEqual(task.metadata["failure"]["type"], "ValueError")
335 self.assertNotIn("metadata", task.metadata["failure"])
337 def test_annotate_task_exception(self):
338 """Test annotating failures in the task metadata when a Task-specific
339 exception is raised.
340 """
342 class TestError(pipeBase.AlgorithmError):
343 @property
344 def metadata(self):
345 return {"something": 12345}
347 task = AddMultTask()
348 msg = "something failed!"
349 error = TestError(msg)
350 with self.assertLogs("addMult", level="DEBUG") as cm:
351 pipeBase.AnnotatedPartialOutputsError.annotate(error, task, log=task.log)
352 self.assertIn(msg, "\n".join(cm.output))
353 self.assertEqual(task.metadata["failure"]["message"], msg)
354 result = "test_task.TaskTestCase.test_annotate_task_exception.<locals>.TestError"
355 self.assertEqual(task.metadata["failure"]["type"], result)
356 self.assertEqual(task.metadata["failure"]["metadata"]["something"], 12345)
358 def test_AlgorithmError(self):
359 """Test that AlgorithmError checks for abstractness;
360 see https://github.com/python/cpython/issues/50246
361 """
363 class StillAbstractError(pipeBase.AlgorithmError):
364 pass
366 with self.assertRaisesRegex(TypeError, "with abstract methods: metadata"):
367 StillAbstractError()
370class MyMemoryTestCase(lsst.utils.tests.MemoryTestCase):
371 """Run file leak tests."""
374def setup_module(module):
375 """Configure pytest."""
376 lsst.utils.tests.init()
379if __name__ == "__main__":
380 lsst.utils.tests.init()
381 unittest.main()