Coverage for tests / test_task.py: 25%

212 statements  

« prev     ^ index     » next       coverage.py v7.13.5, created at 2026-05-06 08:32 +0000

1# 

2# LSST Data Management System 

3# Copyright 2008, 2009, 2010 LSST Corporation. 

4# 

5# This product includes software developed by the 

6# LSST Project (http://www.lsst.org/). 

7# 

8# This program is free software: you can redistribute it and/or modify 

9# it under the terms of the GNU General Public License as published by 

10# the Free Software Foundation, either version 3 of the License, or 

11# (at your option) any later version. 

12# 

13# This program is distributed in the hope that it will be useful, 

14# but WITHOUT ANY WARRANTY; without even the implied warranty of 

15# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the 

16# GNU General Public License for more details. 

17# 

18# You should have received a copy of the LSST License Statement and 

19# the GNU General Public License along with this program. If not, 

20# see <http://www.lsstcorp.org/LegalNotices/>. 

21# 

22import json 

23import logging 

24import numbers 

25import time 

26import unittest 

27 

28import yaml 

29 

30import lsst.pex.config as pexConfig 

31import lsst.pipe.base as pipeBase 

32import lsst.utils.tests 

33 

34# Whilst in transition the test can't tell which type is 

35# going to be used for metadata. 

36from lsst.pipe.base.task import _TASK_METADATA_TYPE 

37from lsst.utils.timer import timeMethod 

38 

39 

class AddConfig(pexConfig.Config):
    """Configuration for `AddTask`.

    Holds the single floating-point ``addend`` that the task adds to its
    input value.
    """

    addend = pexConfig.Field(
        dtype=float,
        default=3.1,
        doc="amount to add",
    )

44 

45 

class AddTask(pipeBase.Task):
    """Example task that adds the configured ``addend`` to a value."""

    ConfigClass = AddConfig

    @timeMethod
    def run(self, val):
        """Add the configured addend to ``val``.

        Parameters
        ----------
        val
            Value to which ``self.config.addend`` is added.

        Returns
        -------
        result : `lsst.pipe.base.Struct`
            Struct whose ``val`` attribute is ``val + addend``.
        """
        addend = self.config.addend
        # Record the addend under the "add" key before computing the result.
        self.metadata.add("add", addend)
        total = val + addend
        return pipeBase.Struct(val=total)

57 

58 

class MultConfig(pexConfig.Config):
    """Configuration for `MultTask`.

    Holds the single floating-point ``multiplicand`` by which the task
    multiplies its input value.
    """

    multiplicand = pexConfig.Field(
        dtype=float,
        default=2.5,
        doc="amount by which to multiply",
    )

63 

64 

class MultTask(pipeBase.Task):
    """Example task that multiplies a value by the configured factor."""

    ConfigClass = MultConfig

    @timeMethod
    def run(self, val):
        """Multiply ``val`` by the configured multiplicand.

        Parameters
        ----------
        val
            Value that is multiplied by ``self.config.multiplicand``.

        Returns
        -------
        result : `lsst.pipe.base.Struct`
            Struct whose ``val`` attribute is ``val * multiplicand``.
        """
        factor = self.config.multiplicand
        # Record the multiplicand under the "mult" key before computing.
        self.metadata.add("mult", factor)
        product = val * factor
        return pipeBase.Struct(val=product)

76 

77 

# Demonstrate that a registry field can hold a subtask as well: MultTask is
# registered here under the name "stdMult" so that a config can select it
# through the registry.
multRegistry = pexConfig.makeRegistry(doc="Registry for Mult-like tasks")
multRegistry.register(name="stdMult", target=MultTask)

82 

83 

class AddMultConfig(pexConfig.Config):
    """Configuration for `AddMultTask`.

    ``add`` is a field created directly from `AddTask`; ``mult`` is a
    registry field created from ``multRegistry`` that selects ``"stdMult"``
    by default.
    """

    add = AddTask.makeField(doc="add task")
    mult = multRegistry.makeField(doc="mult task", default="stdMult")

89 

90 

class AddMultTask(pipeBase.Task):
    """Test Task with subtasks: first add, then multiply.

    The ``add`` and ``mult`` subtasks are created from the configuration in
    the constructor. ``_add_module_logger_prefix`` is disabled here; the
    tests in this module expect the logger to be named ``addMult`` rather
    than carrying the module name as a prefix.
    """

    ConfigClass = AddMultConfig
    _DefaultName = "addMult"
    _add_module_logger_prefix = False

    def __init__(self, **keyArgs):
        """Construct the task and its ``add`` and ``mult`` subtasks.

        Parameters
        ----------
        **keyArgs
            Keyword arguments forwarded unchanged to the base-class
            constructor.
        """
        super().__init__(**keyArgs)
        self.makeSubtask("add")
        self.makeSubtask("mult")

    @timeMethod
    def run(self, val):
        """Run the ``add`` subtask, then the ``mult`` subtask on its output.

        Parameters
        ----------
        val
            Input value handed to the ``add`` subtask.

        Returns
        -------
        result : `lsst.pipe.base.Struct`
            Struct whose ``val`` attribute is the output of the ``mult``
            subtask.
        """
        # The explicit "context" timer runs in addition to the @timeMethod
        # decorator, so both "run*" and "context*" metadata keys are written.
        with self.timer("context"):
            addRet = self.add.run(val)
            multRet = self.mult.run(addRet.val)
            self.metadata.add("addmult", multRet.val)
            return pipeBase.Struct(
                val=multRet.val,
            )

    @timeMethod
    def failDec(self):
        """Fail with a decorator.

        Raises
        ------
        RuntimeError
            Always raised, to exercise the timing decorator on failure.
        """
        raise RuntimeError("failDec intentional error")

    def failCtx(self):
        """Fail inside a context manager.

        Raises
        ------
        RuntimeError
            Always raised, to exercise the timer context manager on failure.
        """
        with self.timer("failCtx"):
            raise RuntimeError("failCtx intentional error")

124 

125 

class AddMultTask2(AddMultTask):
    """Variant of `AddMultTask` with the module logger prefix switched on.

    Everything else is inherited; only ``_add_module_logger_prefix`` differs
    from the parent class.
    """

    _add_module_logger_prefix = True

130 

131 

class AddTwiceTask(AddTask):
    """`AddTask` replacement that adds the configured addend twice over."""

    def run(self, val):
        """Add twice the configured addend to ``val``.

        Parameters
        ----------
        val
            Value to which ``2 * self.config.addend`` is added.

        Returns
        -------
        result : `lsst.pipe.base.Struct`
            Struct whose ``val`` attribute is ``val + 2 * addend``.
        """
        doubled = 2 * self.config.addend
        return pipeBase.Struct(val=val + doubled)

138 

139 

class TaskTestCase(unittest.TestCase):
    """A test case for Task.

    The tests drive `AddMultTask` (and its subtasks and variants defined in
    this module) to check naming, logging, metadata, timing, subtask
    replacement and failure annotation.
    """

    def testBasics(self):
        """Test basic construction and use of a task."""
        for addend in (1.1, -3.5):
            for multiplicand in (0.9, -45.0):
                config = AddMultTask.ConfigClass()
                config.add.addend = addend
                config.mult["stdMult"].multiplicand = multiplicand
                # make sure both ways of accessing the registry work and give
                # the same result
                self.assertEqual(config.mult.active.multiplicand, multiplicand)
                addMultTask = AddMultTask(config=config)
                for val in (-1.0, 0.0, 17.5):
                    # subTest makes a failure report the exact combination.
                    with self.subTest(addend=addend, multiplicand=multiplicand, val=val):
                        ret = addMultTask.run(val=val)
                        self.assertAlmostEqual(ret.val, (val + addend) * multiplicand)

    def testNames(self):
        """Test getName() and getFullName()."""
        addMultTask = AddMultTask()
        self.assertEqual(addMultTask.getName(), "addMult")
        self.assertEqual(addMultTask.add.getName(), "add")
        self.assertEqual(addMultTask.mult.getName(), "mult")

        self.assertEqual(addMultTask._name, "addMult")
        self.assertEqual(addMultTask.add._name, "add")
        self.assertEqual(addMultTask.mult._name, "mult")

        self.assertEqual(addMultTask.getFullName(), "addMult")
        self.assertEqual(addMultTask.add.getFullName(), "addMult.add")
        self.assertEqual(addMultTask.mult.getFullName(), "addMult.mult")

        self.assertEqual(addMultTask._fullName, "addMult")
        self.assertEqual(addMultTask.add._fullName, "addMult.add")
        self.assertEqual(addMultTask.mult._fullName, "addMult.mult")

    def testLog(self):
        """Test the Task's logger."""
        addMultTask = AddMultTask()
        self.assertEqual(addMultTask.log.name, "addMult")
        self.assertEqual(addMultTask.add.log.name, "addMult.add")

        # A logger passed in explicitly becomes the parent of the task logger.
        log = logging.getLogger("tester")
        addMultTask = AddMultTask(log=log)
        self.assertEqual(addMultTask.log.name, "tester.addMult")
        self.assertEqual(addMultTask.add.log.name, "tester.addMult.add")

        # AddMultTask2 enables the module prefix, so the name of this module
        # is expected in front of the task name.
        addMultTask2 = AddMultTask2()
        self.assertEqual(addMultTask2.log.name, f"{__name__}.addMult")

    def testGetFullMetadata(self):
        """Test getFullMetadata()."""
        addMultTask = AddMultTask()
        addMultTask.run(val=1.234)  # Add some metadata
        fullMetadata = addMultTask.getFullMetadata()
        self.assertIsInstance(fullMetadata["addMult"], _TASK_METADATA_TYPE)
        self.assertIsInstance(fullMetadata["addMult:add"], _TASK_METADATA_TYPE)
        self.assertIsInstance(fullMetadata["addMult:mult"], _TASK_METADATA_TYPE)
        self.assertEqual(set(fullMetadata), {"addMult", "addMult:add", "addMult:mult"})

        all_names = fullMetadata.names()
        self.assertIn("addMult", all_names)
        self.assertIn("addMult.runStartUtc", all_names)

        param_names = fullMetadata.paramNames(topLevelOnly=True)
        # No top level keys without hierarchy
        self.assertEqual(set(param_names), set())

        param_names = fullMetadata.paramNames(topLevelOnly=False)
        self.assertNotIn("addMult", param_names)
        self.assertIn("addMult.runStartUtc", param_names)
        self.assertIn("addMult:add.runStartCpuTime", param_names)

    def testEmptyMetadata(self):
        """Test that emptyMetadata() clears the task and subtask metadata."""
        task = AddMultTask()
        task.run(val=1.2345)
        task.emptyMetadata()
        fullMetadata = task.getFullMetadata()
        self.assertEqual(len(fullMetadata["addMult"]), 0)
        self.assertEqual(len(fullMetadata["addMult:add"]), 0)
        self.assertEqual(len(fullMetadata["addMult:mult"]), 0)

    def testReplace(self):
        """Test replacing one subtask with another."""
        for addend in (1.1, -3.5):
            for multiplicand in (0.9, -45.0):
                config = AddMultTask.ConfigClass()
                config.add.retarget(AddTwiceTask)
                config.add.addend = addend
                config.mult["stdMult"].multiplicand = multiplicand
                addMultTask = AddMultTask(config=config)
                for val in (-1.0, 0.0, 17.5):
                    # subTest makes a failure report the exact combination.
                    with self.subTest(addend=addend, multiplicand=multiplicand, val=val):
                        ret = addMultTask.run(val=val)
                        self.assertAlmostEqual(ret.val, (val + (2 * addend)) * multiplicand)

    def testFail(self):
        """Test timers when the code they are timing fails."""
        addMultTask = AddMultTask()

        # The end-of-timing key must be present even though the timed
        # method raised.
        with self.assertRaises(RuntimeError):
            addMultTask.failDec()
        self.assertIn("failDecEndCpuTime", addMultTask.metadata)

        with self.assertRaises(RuntimeError):
            addMultTask.failCtx()
        self.assertIn("failCtxEndCpuTime", addMultTask.metadata)

    def testTimeMethod(self):
        """Test that the timer is adding the right metadata."""
        addMultTask = AddMultTask()

        # Run twice to ensure we are additive.
        addMultTask.run(val=1.1)
        addMultTask.run(val=2.0)
        # Check existence and type
        for key, keyType in (
            ("Utc", str),
            ("CpuTime", float),
            ("UserTime", float),
            ("SystemTime", float),
            ("MaxResidentSetSize", numbers.Integral),
            ("MinorPageFaults", numbers.Integral),
            ("MajorPageFaults", numbers.Integral),
            ("BlockInputs", numbers.Integral),
            ("BlockOutputs", numbers.Integral),
            ("VoluntaryContextSwitches", numbers.Integral),
            ("InvoluntaryContextSwitches", numbers.Integral),
        ):
            for when in ("Start", "End"):
                for method in ("run", "context"):
                    name = method + when + key
                    self.assertIn(name, addMultTask.metadata, name + " is missing from task metadata")
                    value = addMultTask.metadata.getScalar(name)
                    self.assertIsInstance(
                        value,
                        keyType,
                        f"{name} is not of the right type ({keyType} vs {type(value)})",
                    )
        # Some basic sanity checks
        currCpuTime = time.process_time()
        self.assertLessEqual(
            addMultTask.metadata.getScalar("runStartCpuTime"),
            addMultTask.metadata.getScalar("runEndCpuTime"),
        )
        self.assertLessEqual(addMultTask.metadata.getScalar("runEndCpuTime"), currCpuTime)
        self.assertLessEqual(
            addMultTask.metadata.getScalar("contextStartCpuTime"),
            addMultTask.metadata.getScalar("contextEndCpuTime"),
        )
        self.assertLessEqual(addMultTask.metadata.getScalar("contextEndCpuTime"), currCpuTime)
        # The subtask starts inside the parent's run, so its start time is
        # compared with the parent's end time.
        self.assertLessEqual(
            addMultTask.add.metadata.getScalar("runStartCpuTime"),
            addMultTask.metadata.getScalar("runEndCpuTime"),
        )
        self.assertLessEqual(addMultTask.add.metadata.getScalar("runEndCpuTime"), currCpuTime)

        # Add some explicit values for serialization test.
        addMultTask.metadata["comment"] = "A comment"
        addMultTask.metadata["integer"] = 5
        addMultTask.metadata["float"] = 3.14
        addMultTask.metadata["bool"] = False
        addMultTask.metadata.add("commentList", "comment1")
        addMultTask.metadata.add("commentList", "comment1")
        addMultTask.metadata.add("intList", 6)
        addMultTask.metadata.add("intList", 7)
        addMultTask.metadata.add("boolList", False)
        addMultTask.metadata.add("boolList", True)
        addMultTask.metadata.add("floatList", 6.6)
        addMultTask.metadata.add("floatList", 7.8)

        # TaskMetadata can serialize to JSON but not YAML
        # and PropertySet can serialize to YAML and not JSON.
        # Probe for the method that is actually called below rather than for
        # a differently named attribute.
        if hasattr(addMultTask.metadata, "model_dump_json"):
            j = addMultTask.metadata.model_dump_json()
            new_meta = pipeBase.TaskMetadata.model_validate(json.loads(j))
        else:
            y = yaml.dump(addMultTask.metadata)
            new_meta = yaml.safe_load(y)
        self.assertEqual(new_meta, addMultTask.metadata)

    def test_annotate_exception(self):
        """Test annotating failures in the task metadata when a non-Task
        exception is raised (when there is no `metadata` on the exception).
        """
        task = AddMultTask()
        msg = "something failed!"
        error = ValueError(msg)
        with self.assertLogs("addMult", level="DEBUG") as cm:
            pipeBase.AnnotatedPartialOutputsError.annotate(error, task, log=task.log)
        self.assertIn(msg, "\n".join(cm.output))
        self.assertEqual(task.metadata["failure"]["message"], msg)
        self.assertEqual(task.metadata["failure"]["type"], "ValueError")
        self.assertNotIn("metadata", task.metadata["failure"])

    def test_annotate_task_exception(self):
        """Test annotating failures in the task metadata when a Task-specific
        exception is raised.
        """

        class TestError(pipeBase.AlgorithmError):
            @property
            def metadata(self):
                return {"something": 12345}

        task = AddMultTask()
        msg = "something failed!"
        error = TestError(msg)
        with self.assertLogs("addMult", level="DEBUG") as cm:
            pipeBase.AnnotatedPartialOutputsError.annotate(error, task, log=task.log)
        self.assertIn(msg, "\n".join(cm.output))
        self.assertEqual(task.metadata["failure"]["message"], msg)
        # Build the expected name from __name__ instead of hard-coding the
        # module name, so the check does not depend on how this file was
        # imported (same approach as in testLog).
        result = f"{__name__}.TaskTestCase.test_annotate_task_exception.<locals>.TestError"
        self.assertEqual(task.metadata["failure"]["type"], result)
        self.assertEqual(task.metadata["failure"]["metadata"]["something"], 12345)

    def test_AlgorithmError(self):
        """Test that AlgorithmError checks for abstractness;
        see https://github.com/python/cpython/issues/50246
        """

        class StillAbstractError(pipeBase.AlgorithmError):
            pass

        with self.assertRaisesRegex(TypeError, "with abstract methods: metadata"):
            StillAbstractError()

368 

369 

class MyMemoryTestCase(lsst.utils.tests.MemoryTestCase):
    """Run the file leak tests inherited from
    `lsst.utils.tests.MemoryTestCase`.
    """

372 

373 

def setup_module(module):
    """Configure pytest.

    Parameters
    ----------
    module
        Module object supplied by the test runner; not used here.
    """
    lsst.utils.tests.init()

377 

378 

# Direct execution: perform the same initialisation as setup_module() and
# then hand over to the unittest runner.
if __name__ == "__main__":
    lsst.utils.tests.init()
    unittest.main()