Coverage for python / lsst / ap / pipe / metrics.py: 47%

39 statements  

« prev     ^ index     » next       coverage.py v7.13.5, created at 2026-05-05 18:56 +0000

1# This file is part of ap_pipe. 

2# 

3# Developed for the LSST Data Management System. 

4# This product includes software developed by the LSST Project 

5# (http://www.lsst.org). 

6# See the COPYRIGHT file at the top-level directory of this distribution 

7# for details of code ownership. 

8# 

9# This program is free software: you can redistribute it and/or modify 

10# it under the terms of the GNU General Public License as published by 

11# the Free Software Foundation, either version 3 of the License, or 

12# (at your option) any later version. 

13# 

14# This program is distributed in the hope that it will be useful, 

15# but WITHOUT ANY WARRANTY; without even the implied warranty of 

16# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the 

17# GNU General Public License for more details. 

18# 

19# You should have received a copy of the GNU General Public License 

20# along with this program. If not, see <http://www.gnu.org/licenses/>. 

21# 

22 

23"""Metrics for ap_pipe tasks. 

24""" 

25 

26__all__ = [ 

27 "PipelineTimingMetricTask", "PipelineTimingMetricConfig", 

28] 

29 

30import astropy.units as u 

31from datetime import datetime 

32 

33import lsst.pex.config as pexConfig 

34from lsst.pipe.base import Struct, NoWorkFound 

35import lsst.pipe.base.connectionTypes as connTypes 

36from lsst.verify import Measurement, Datum 

37from lsst.verify.tasks import AbstractMetadataMetricTask, MetricTask, MetricComputationError 

38 

39 

class PipelineTimingMetricConnections(
    MetricTask.ConfigClass.ConnectionsClass,
    dimensions={"instrument", "visit", "detector"},
    defaultTemplates={
        "labelStart": "",
        "labelEnd": "",
        "package": "ap_pipe",
        "metric": "ApPipelineTime",
    },
):
    """Butler connections for a pipeline-wide timing metric.

    Two metadata datasets are read: one written by the task that starts the
    pipeline and one written by the task that finishes it. The dataset names
    are built from the ``labelStart`` and ``labelEnd`` templates, which
    default to empty strings and therefore need to be set to the labels of
    the relevant tasks.
    """

    # NOTE(review): the starting metadata is keyed by exposure while the
    # metric itself (and the ending metadata) is keyed by visit; presumably
    # the starting task runs per exposure -- confirm against the pipeline.
    metadataStart = connTypes.Input(
        doc="The starting task's metadata.",
        name="{labelStart}_metadata",
        storageClass="TaskMetadata",
        dimensions={"instrument", "exposure", "detector"},
        multiple=False,
    )
    metadataEnd = connTypes.Input(
        doc="The final task's metadata.",
        name="{labelEnd}_metadata",
        storageClass="TaskMetadata",
        dimensions={"instrument", "visit", "detector"},
        multiple=False,
    )

61 

62 

class PipelineTimingMetricConfig(
    MetricTask.ConfigClass,
    pipelineConnections=PipelineTimingMetricConnections,
):
    """Configuration for a metric timing a pipeline from start to finish.

    ``targetStart`` and ``targetEnd`` name the timed methods whose
    timestamps bracket the measured interval.
    """

    # The dimensions hack found in MetadataMetricConfig is deliberately left
    # out: unlike TimingMetricTask, this task is not designed to be run on
    # multiple pipelines.
    targetStart = pexConfig.Field(
        doc=(
            "The method to take as the starting point of the starting task, optionally "
            "prefixed by one or more task names in the format of "
            "`lsst.pipe.base.Task.getFullMetadata()`."
        ),
        dtype=str,
    )
    targetEnd = pexConfig.Field(
        doc=(
            "The method to take as the stopping point of the final task, optionally "
            "prefixed by one or more task names in the format of "
            "`lsst.pipe.base.Task.getFullMetadata()`."
        ),
        dtype=str,
    )

77 

78 

class PipelineTimingMetricTask(AbstractMetadataMetricTask):
    """A Task that computes a wall-clock time for an entire pipeline, using
    metadata produced by the `lsst.utils.timer.timeMethod` decorator.

    The measured interval runs from the start timestamp of
    ``config.targetStart`` (read from the starting task's metadata) to the
    end timestamp of ``config.targetEnd`` (read from the final task's
    metadata).

    Parameters
    ----------
    args
    kwargs
        Constructor parameters are the same as for
        `lsst.verify.tasks.MetricTask`.
    """

    _DefaultName = "pipelineTimingMetric"
    ConfigClass = PipelineTimingMetricConfig

    @classmethod
    def getInputMetadataKeys(cls, config):
        """Get search strings for the metadata.

        Parameters
        ----------
        config : ``cls.ConfigClass``
            Configuration for this task.

        Returns
        -------
        keys : `dict`
            A dictionary of keys, optionally prefixed by one or more tasks in
            the format of `lsst.pipe.base.Task.getFullMetadata()`.

            ``"StartTimestamp"``
                The key for an ISO 8601-compliant text string where the target
                pipeline started (`str`). Built as ``config.targetStart``
                followed by ``"StartUtc"``.
            ``"EndTimestamp"``
                The key for an ISO 8601-compliant text string where the target
                pipeline ended (`str`). Built as ``config.targetEnd``
                followed by ``"EndUtc"``.
        """
        return {
            "StartTimestamp": config.targetStart + "StartUtc",
            "EndTimestamp": config.targetEnd + "EndUtc",
        }

    def run(self, metadataStart, metadataEnd):
        """Compute the pipeline wall-clock time from science task metadata.

        Parameters
        ----------
        metadataStart : `lsst.pipe.base.TaskMetadata`
            A metadata object for the first quantum run by the pipeline.
        metadataEnd : `lsst.pipe.base.TaskMetadata`
            A metadata object for the last quantum run by the pipeline.

        Returns
        -------
        result : `lsst.pipe.base.Struct`
            A `~lsst.pipe.base.Struct` containing the following component:

            - ``measurement``: the value of the metric, in seconds
              (`lsst.verify.Measurement`). The raw start and end timestamp
              strings are attached as the ``start`` and ``end`` extras.

        Raises
        ------
        lsst.verify.tasks.MetricComputationError
            Raised if the strings returned by `getInputMetadataKeys` match
            more than one key in either metadata object, if either timestamp
            cannot be parsed as an ISO 8601 string, or if the two timestamps
            cannot be subtracted (e.g. one carries a UTC offset and the other
            does not).
        lsst.pipe.base.NoWorkFound
            Raised if the metric is ill-defined. Typically this means that at
            least one pipeline step was not run.
        """
        metadataKeys = self.getInputMetadataKeys(self.config)
        timingsStart = self.extractMetadata(metadataStart, metadataKeys)
        timingsEnd = self.extractMetadata(metadataEnd, metadataKeys)

        # Both lookups use the full key set; only the start stamp of the
        # first task and the end stamp of the last task are relevant here.
        startStamp = timingsStart["StartTimestamp"]
        endStamp = timingsEnd["EndTimestamp"]

        # A None value is treated as "the corresponding step never ran", which
        # is not an error: there is simply nothing to measure.
        if startStamp is None:
            raise NoWorkFound(f"Nothing to do: no timing information for {self.config.targetStart} found.")
        if endStamp is None:
            raise NoWorkFound(f"Nothing to do: no timing information for {self.config.targetEnd} found.")

        try:
            startTime = datetime.fromisoformat(startStamp)
            endTime = datetime.fromisoformat(endStamp)
            # Keep the subtraction inside the try block: mixing an
            # offset-naive and an offset-aware datetime raises TypeError, and
            # that is just another form of invalid metadata.
            totalTime = (endTime - startTime).total_seconds()
        except (TypeError, ValueError) as e:
            raise MetricComputationError("Invalid metadata") from e

        meas = Measurement(self.config.metricName, totalTime * u.second)
        meas.notes["estimator"] = "utils.timer.timeMethod"
        meas.extras["start"] = Datum(startStamp)
        meas.extras["end"] = Datum(endStamp)
        return Struct(measurement=meas)