Coverage for tests / test_metrics.py: 34%
97 statements
« prev ^ index » next coverage.py v7.13.5, created at 2026-04-21 10:58 +0000
« prev ^ index » next coverage.py v7.13.5, created at 2026-04-21 10:58 +0000
1# This file is part of ap_pipe.
2#
3# Developed for the LSST Data Management System.
4# This product includes software developed by the LSST Project
5# (http://www.lsst.org).
6# See the COPYRIGHT file at the top-level directory of this distribution
7# for details of code ownership.
8#
9# This program is free software: you can redistribute it and/or modify
10# it under the terms of the GNU General Public License as published by
11# the Free Software Foundation, either version 3 of the License, or
12# (at your option) any later version.
13#
14# This program is distributed in the hope that it will be useful,
15# but WITHOUT ANY WARRANTY; without even the implied warranty of
16# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
17# GNU General Public License for more details.
18#
19# You should have received a copy of the GNU General Public License
20# along with this program. If not, see <http://www.gnu.org/licenses/>.
21#
23import astropy.units as u
24import time
25import unittest
27import lsst.pex.config
28from lsst.pipe.base import testUtils
29from lsst.utils.timer import timeMethod
30import lsst.utils.tests
31from lsst.verify import Name
32import lsst.verify.tasks
33from lsst.verify.tasks.testUtils import MetricTaskTestCase
35from lsst.ap.pipe.metrics import PipelineTimingMetricTask
class DummyTask(lsst.pipe.base.Task):
    """Minimal task whose only job is to spend a known amount of time in ``run``.

    ``run`` is wrapped in `lsst.utils.timer.timeMethod`, so the tests below can
    read timing information back from the task's metadata.
    """

    # Seconds that ``run`` sleeps for.
    taskLength = 0.1
    _DefaultName = "NotARealTask"
    ConfigClass = lsst.pex.config.Config

    @timeMethod
    def run(self):
        """Sleep for ``taskLength`` seconds."""
        time.sleep(self.taskLength)
48# Can't test against MetadataMetricTestCase, because this class is not a MetadataMetricTask
class TestPipelineTimingMetricTask(MetricTaskTestCase):
    """Tests for `PipelineTimingMetricTask`, driven by the metadata of two
    `DummyTask` runs labelled ``"first"`` and ``"last"``.
    """

    @staticmethod
    def _makeConfig(nameStart=DummyTask._DefaultName, nameEnd=DummyTask._DefaultName):
        """Build a `PipelineTimingMetricTask` config for the ``ap_pipe.DummyTime`` metric.

        Parameters
        ----------
        nameStart : `str`, optional
            Label of the task that starts the timed interval; its ``run``
            method is used as the start target.
        nameEnd : `str`, optional
            Label of the task that ends the timed interval; its ``run``
            method is used as the end target.

        Returns
        -------
        config : ``PipelineTimingMetricTask.ConfigClass``
            The populated configuration.
        """
        config = PipelineTimingMetricTask.ConfigClass()
        config.connections.labelStart = nameStart
        config.connections.labelEnd = nameEnd
        config.targetStart = nameStart + ".run"
        config.targetEnd = nameEnd + ".run"
        config.connections.package = "ap_pipe"
        config.connections.metric = "DummyTime"
        return config

    @classmethod
    def makeTask(cls):
        """Return the default task under test, timing from ``first.run`` to ``last.run``."""
        return PipelineTimingMetricTask(config=cls._makeConfig(nameStart="first", nameEnd="last"))

    def setUp(self):
        """Run the two dummy tasks so that their metadata are available to the tests."""
        super().setUp()
        self.metric = Name("ap_pipe.DummyTime")

        self.startTask = DummyTask(name="first")
        self.startTask.run()
        self.endTask = DummyTask(name="last")
        self.endTask.run()

    def _assertNoMeasurement(self, task):
        """Check that ``task`` produces no measurement from the dummy metadata.

        Both raising `lsst.pipe.base.NoWorkFound` and returning a valid
        result whose measurement is `None` are accepted.

        Parameters
        ----------
        task : `PipelineTimingMetricTask`
            Task configured with a target that is absent from the metadata.
        """
        try:
            result = task.run(self.startTask.getFullMetadata(), self.endTask.getFullMetadata())
        except lsst.pipe.base.NoWorkFound:
            # Correct behavior for MetricTask
            pass
        else:
            # Alternative correct behavior for MetricTask
            testUtils.assertValidOutput(task, result)
            self.assertIsNone(result.measurement)

    def testSingleTask(self):
        """Timing from a task to itself must agree with `TimingMetricTask` on that task."""
        task = PipelineTimingMetricTask(config=self._makeConfig(nameStart="first", nameEnd="first"))

        # Reference implementation: the single-task timing metric on "first.run".
        altConfig = lsst.verify.tasks.TimingMetricConfig()
        altConfig.connections.labelName = "first"
        altConfig.target = "first.run"
        altConfig.connections.package = "verify"
        altConfig.connections.metric = "DummyTime"
        altTask = lsst.verify.tasks.TimingMetricTask(config=altConfig)

        result = task.run(self.startTask.getFullMetadata(), self.startTask.getFullMetadata())
        oracle = altTask.run(self.startTask.getFullMetadata())

        self.assertEqual(result.measurement.metric_name, self.metric)
        self.assertAlmostEqual(result.measurement.quantity.to_value(u.s),
                               oracle.measurement.quantity.to_value(u.s))

    def testTwoTasks(self):
        """Timing across two tasks must be at least the sum of their individual timings."""
        firstTask = PipelineTimingMetricTask(config=self._makeConfig(nameStart="first", nameEnd="first"))
        secondTask = PipelineTimingMetricTask(config=self._makeConfig(nameStart="last", nameEnd="last"))

        # self.task is not set in this class; presumably the base class's
        # setUp creates it from makeTask() -- confirm against MetricTaskTestCase.
        result = self.task.run(self.startTask.getFullMetadata(), self.endTask.getFullMetadata())
        firstResult = firstTask.run(self.startTask.getFullMetadata(), self.startTask.getFullMetadata())
        secondResult = secondTask.run(self.endTask.getFullMetadata(), self.endTask.getFullMetadata())

        self.assertEqual(result.measurement.metric_name, self.metric)
        self.assertGreater(result.measurement.quantity, 0.0 * u.s)
        self.assertGreaterEqual(result.measurement.quantity,
                                firstResult.measurement.quantity + secondResult.measurement.quantity)

    def testRunDifferentMethodFirst(self):
        """A start target that was never run must yield no measurement."""
        config = self._makeConfig(nameStart="first", nameEnd="last")
        config.targetStart = "first.doProcess"
        self._assertNoMeasurement(PipelineTimingMetricTask(config=config))

    def testRunDifferentMethodLast(self):
        """An end target that was never run must yield no measurement."""
        config = self._makeConfig(nameStart="first", nameEnd="last")
        # Override the *end* target; the start target stays valid so that this
        # test exercises a different code path from testRunDifferentMethodFirst.
        config.targetEnd = "last.doProcess"
        self._assertNoMeasurement(PipelineTimingMetricTask(config=config))

    def testBadlyTypedKeys(self):
        """Non-timestamp values under the end-time keys must raise `MetricComputationError`."""
        metadata = self.endTask.getFullMetadata()
        endKeys = [key
                   for key in metadata.paramNames(topLevelOnly=False)
                   if "EndUtc" in key]
        # Corrupt every end-time entry with an integer.
        for key in endKeys:
            metadata[key] = 42

        with self.assertRaises(lsst.verify.tasks.MetricComputationError):
            self.task.run(self.startTask.getFullMetadata(), metadata)
# Hack around unittest's hacky test setup system: remove the imported base
# class from this module's namespace, presumably so that the test loader does
# not also collect MetricTaskTestCase itself as a test case of this module.
del MetricTaskTestCase
class MemoryTester(lsst.utils.tests.MemoryTestCase):
    """Pull the checks defined by `lsst.utils.tests.MemoryTestCase` into this module."""
def setup_module(module):
    """Initialize the LSST test utilities for this module.

    Parameters
    ----------
    module : module
        The module being set up; not used here.
    """
    lsst.utils.tests.init()
# Script entry point: initialize the LSST test utilities, then hand control
# to unittest's command-line runner.
if __name__ == "__main__":
    lsst.utils.tests.init()
    unittest.main()