Coverage for python / lsst / analysis / tools / tasks / metadataAnalysis.py: 20%

78 statements  

« prev     ^ index     » next       coverage.py v7.13.5, created at 2026-04-22 09:32 +0000

1# This file is part of analysis_tools. 

2# 

3# Developed for the LSST Data Management System. 

4# This product includes software developed by the LSST Project 

5# (https://www.lsst.org). 

6# See the COPYRIGHT file at the top-level directory of this distribution 

7# for details of code ownership. 

8# 

9# This program is free software: you can redistribute it and/or modify 

10# it under the terms of the GNU General Public License as published by 

11# the Free Software Foundation, either version 3 of the License, or 

12# (at your option) any later version. 

13# 

14# This program is distributed in the hope that it will be useful, 

15# but WITHOUT ANY WARRANTY; without even the implied warranty of 

16# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the 

17# GNU General Public License for more details. 

18# 

19# You should have received a copy of the GNU General Public License 

20# along with this program. If not, see <https://www.gnu.org/licenses/>. 

21from __future__ import annotations 

22 

23__all__ = ( 

24 "MetadataAnalysisConfig", 

25 "DatasetMetadataAnalysisTask", 

26 "TaskMetadataAnalysisTask", 

27) 

28 

29from lsst.pex.config import Field, ListField 

30from lsst.pipe.base import UpstreamFailureNoWorkFound, connectionTypes 

31 

32from ..interfaces import AnalysisBaseConfig, AnalysisBaseConnections, AnalysisPipelineTask 

33 

34 

class MetadataAnalysisConnections(
    AnalysisBaseConnections,
    dimensions={},
    defaultTemplates={"inputName": "", "outputName": "", "datasetType": "", "storageClass": ""},
):
    def __init__(self, *, config=None):
        """Customize the connections for a specific task's or dataset's
        metadata.

        Parameters
        ----------
        config : `MetadataAnalysisConfig`
            A config for analyzing task or dataset metadata with this
            connection.
        """
        # Freeze the configured input dimensions up front: they must be
        # assigned to ``self.dimensions`` before super().__init__ runs so
        # that the base class propagates them into the output metric bundle.
        dims = frozenset(config.inputDimensions)
        self.dimensions = dims
        super().__init__(config=config)

        # Deferred-load input so only the metadata component needs to be
        # fetched, not the (potentially large) dataset payload.
        self.data = connectionTypes.Input(
            doc="Input dataset to extract metadata from.",
            name=config.connections.inputName,
            storageClass=config.connections.storageClass,
            deferLoad=True,
            dimensions=dims,
        )

63 

64 

class MetadataAnalysisConfig(
    AnalysisBaseConfig,
    pipelineConnections=MetadataAnalysisConnections,
):
    # Sorted so the default ordering is stable from run to run.
    inputDimensions = ListField(
        default=sorted(MetadataAnalysisConnections.dimensions),
        dtype=str,
        doc="The dimensions of the input dataset.",
    )
    # If True, raise NoWorkFound when every configured metric is absent.
    raiseNoWorkFoundOnEmptyMetadata = Field(
        dtype=bool,
        default=False,
        doc="Raise a NoWorkFound error if none of the configured metrics are in the task metadata.",
    )
    # If True, raise NoWorkFound when any single configured metric is absent.
    raiseNoWorkFoundOnIncompleteMetadata = Field(
        dtype=bool,
        default=False,
        doc="Raise NoWorkFound if any of the configured metrics are not in the task metadata.",
    )

    def validate(self):
        """Validate the config, rejecting mutually exclusive options."""
        super().validate()
        # The two raise-NoWorkFound behaviors conflict; permit at most one.
        exclusiveFlags = (
            self.raiseNoWorkFoundOnEmptyMetadata,
            self.raiseNoWorkFoundOnIncompleteMetadata,
        )
        if all(exclusiveFlags):
            raise ValueError("At most one 'raiseNoWorkFound' option may be True at a time.")

90 

91 

class DatasetMetadataAnalysisConfig(MetadataAnalysisConfig):
    """Config for dataset-metadata analysis; by default treats any missing
    configured metric as a reason to raise NoWorkFound.
    """

    def setDefaults(self):
        super().setDefaults()
        # Unlike the base config, incomplete metadata is an error by default.
        self.raiseNoWorkFoundOnIncompleteMetadata = True

97 

98 

class DatasetMetadataAnalysisTask(AnalysisPipelineTask):
    """Extract configured metric values from an arbitrary dataset's metadata
    and run the configured analysis tools over them.
    """

    ConfigClass = DatasetMetadataAnalysisConfig
    _DefaultName = "datasetMetadataAnalysis"

    def runQuantum(self, butlerQC, inputRefs, outputRefs):
        """Collect metadata metrics from the input dataset and run analysis.

        Parameters
        ----------
        butlerQC : `lsst.pipe.base.QuantumContext`
            The butler quantum context.
        inputRefs : `lsst.pipe.base.InputQuantizedConnection`
            The input dataset references.
        outputRefs : `lsst.pipe.base.OutputQuantizedConnection`
            The output dataset references.

        Raises
        ------
        UpstreamFailureNoWorkFound
            If a configured metric is missing/empty (when
            ``raiseNoWorkFoundOnIncompleteMetadata`` is set), or if all
            configured metrics are missing (when
            ``raiseNoWorkFoundOnEmptyMetadata`` is set).
        """
        dataId = butlerQC.quantum.dataId
        inputs = butlerQC.get(inputRefs)
        plotInfo = self.parsePlotInfo(inputs, dataId)
        # Deferred handle: .get() actually loads the dataset here.
        data = inputs["data"].get()

        # Collect all the metrics that are configured to run.
        metadata = {}
        for name in self.config.atools.fieldNames:
            atool = getattr(self.config.atools, name)
            if hasattr(atool, "metrics"):
                for metric in atool.metrics:
                    # Check if the metric uses base key prefixes.
                    if atool.metricsPrefixedWithBaseKeys.get(metric):
                        # Find all keys prefixed with the base key and collect
                        # them in a dictionary.
                        metadata[metric] = {
                            k: v for k, v in data.metadata.items() if k.startswith(metric)
                        }
                    else:
                        # Retrieve the metric directly if it's not prefixed.
                        metadata[metric] = data.metadata.get(metric)
                    # Empty or missing entries are an error when incomplete
                    # metadata is disallowed by configuration.
                    if not metadata[metric] and self.config.raiseNoWorkFoundOnIncompleteMetadata:
                        raise UpstreamFailureNoWorkFound(
                            f"Metadata entry '{metric}' is empty for {inputRefs.data.datasetType.name}, "
                            f"or it is not one of {data.metadata.getOrderedNames()}."
                        )
        # any() consumes the values view directly; no generator wrapper needed.
        if self.config.raiseNoWorkFoundOnEmptyMetadata and not any(metadata.values()):
            raise UpstreamFailureNoWorkFound(
                "All configured metadata entries are missing from "
                f"{inputRefs.data.datasetType.name}."
            )

        outputs = self.run(data={"metadata_metrics": metadata}, plotInfo=plotInfo)
        butlerQC.put(outputs, outputRefs)

137 

138 

class TaskMetadataAnalysisTask(AnalysisPipelineTask):
    """Run the configured analysis tools over another task's metadata."""

    ConfigClass = MetadataAnalysisConfig
    _DefaultName = "taskMetadataAnalysis"

    def runQuantum(self, butlerQC, inputRefs, outputRefs):
        """Load the upstream task's metadata and run analysis over it.

        Parameters
        ----------
        butlerQC : `lsst.pipe.base.QuantumContext`
            The butler quantum context.
        inputRefs : `lsst.pipe.base.InputQuantizedConnection`
            The input dataset references.
        outputRefs : `lsst.pipe.base.OutputQuantizedConnection`
            The output dataset references.

        Raises
        ------
        UpstreamFailureNoWorkFound
            If the metadata is empty, or if `validateMetrics` determines
            that the configured metrics are absent or incomplete.
        """
        dataId = butlerQC.quantum.dataId
        inputs = butlerQC.get(inputRefs)
        plotInfo = self.parsePlotInfo(inputs, dataId)
        metadata = inputs["data"].get().to_dict()
        # The dataset type name is "<taskName>_<suffix>"; keep the part
        # before the first underscore.  partition() leaves the name intact
        # when no underscore is present, whereas the previous
        # taskName[: taskName.find("_")] would have silently dropped the
        # last character (find() returns -1 when there is no match).
        taskName = inputRefs.data.datasetType.name
        taskName = taskName.partition("_")[0]
        if not metadata:
            raise UpstreamFailureNoWorkFound(f"No metadata entries for {taskName}.")
        if self.config.raiseNoWorkFoundOnEmptyMetadata or self.config.raiseNoWorkFoundOnIncompleteMetadata:
            self.validateMetrics(metadata, taskName)
        outputs = self.run(data=metadata, plotInfo=plotInfo)
        butlerQC.put(outputs, outputRefs)

    def validateMetrics(self, metadata, taskName):
        """Raise NoWorkFound if there are insufficient metrics in the task
        metadata.

        Parameters
        ----------
        metadata : `dict`
            The task metadata converted to a dict.
        taskName : `str`
            The name of the task to extract metadata from

        Raises
        ------
        NoWorkFound
            If none of the metrics are in the metadata.
        """
        for fieldName in self.config.atools.fieldNames:
            atool = getattr(self.config.atools, fieldName)
            subTaskNames = getattr(atool, "subTaskNames", None) or {}
            for key in atool.metrics.keys():
                if key in metadata[taskName].keys():
                    # One present metric is enough to satisfy the
                    # "not empty" requirement.
                    if self.config.raiseNoWorkFoundOnEmptyMetadata:
                        return
                elif subtaskName := subTaskNames.get(key):
                    # Metric may live under the subtask's metadata section,
                    # keyed as "<taskName>:<subtaskName>".
                    if f"{taskName}:{subtaskName}" not in metadata:
                        raise UpstreamFailureNoWorkFound(
                            f"Subtask {subtaskName!r} was not found in {taskName} metadata"
                        )
                    if key in metadata[f"{taskName}:{subtaskName}"]:
                        if self.config.raiseNoWorkFoundOnEmptyMetadata:
                            return
                else:
                    # Metric missing entirely: fatal only when incomplete
                    # metadata is disallowed.
                    if self.config.raiseNoWorkFoundOnIncompleteMetadata:
                        raise UpstreamFailureNoWorkFound(
                            f"Metric {key!r} was not found in {taskName} metadata"
                        )
        # Reaching here without an early return means no metric was found.
        if self.config.raiseNoWorkFoundOnEmptyMetadata:
            raise UpstreamFailureNoWorkFound(
                f"None of the specified metrics were found in the {taskName} metadata"
            )