# Coverage for python/lsst/analysis/tools/tasks/metadataAnalysis.py: 20% (78 statements)
# Generated by coverage.py v7.13.5, created at 2026-04-22 09:08 +0000
1# This file is part of analysis_tools.
2#
3# Developed for the LSST Data Management System.
4# This product includes software developed by the LSST Project
5# (https://www.lsst.org).
6# See the COPYRIGHT file at the top-level directory of this distribution
7# for details of code ownership.
8#
9# This program is free software: you can redistribute it and/or modify
10# it under the terms of the GNU General Public License as published by
11# the Free Software Foundation, either version 3 of the License, or
12# (at your option) any later version.
13#
14# This program is distributed in the hope that it will be useful,
15# but WITHOUT ANY WARRANTY; without even the implied warranty of
16# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
17# GNU General Public License for more details.
18#
19# You should have received a copy of the GNU General Public License
20# along with this program. If not, see <https://www.gnu.org/licenses/>.
21from __future__ import annotations
# Public API of this module; note the connections and the dataset-specific
# config subclass are intentionally not exported.
__all__ = (
    "MetadataAnalysisConfig",
    "DatasetMetadataAnalysisTask",
    "TaskMetadataAnalysisTask",
)
29from lsst.pex.config import Field, ListField
30from lsst.pipe.base import UpstreamFailureNoWorkFound, connectionTypes
32from ..interfaces import AnalysisBaseConfig, AnalysisBaseConnections, AnalysisPipelineTask
class MetadataAnalysisConnections(
    AnalysisBaseConnections,
    dimensions={},
    defaultTemplates={"inputName": "", "outputName": "", "datasetType": "", "storageClass": ""},
):
    """Connections for analyzing the metadata of a task or a dataset.

    The class-level ``dimensions={}`` declaration is a placeholder: the real
    dimensions are supplied per-config via ``config.inputDimensions`` in
    ``__init__`` below.
    """

    def __init__(self, *, config=None):
        """Customize the connections for a specific task's or dataset's
        metadata.

        Parameters
        ----------
        config : `MetadataAnalysisConfig`
            A config for analyzing task or dataset metadata with this
            connection.
        """
        # The following must come before super().__init__ so the input
        # dimensions are propagated into the output metric bundle by
        # the base class __init__
        self.dimensions = frozenset(config.inputDimensions)
        super().__init__(config=config)

        # Register the single input connection; its dataset type name and
        # storage class come from the connection templates on the config.
        self.data = connectionTypes.Input(
            doc="Input dataset to extract metadata from.",
            name=config.connections.inputName,
            storageClass=config.connections.storageClass,
            deferLoad=True,  # loaded lazily; runQuantum calls .get() itself
            dimensions=self.dimensions,
        )
class MetadataAnalysisConfig(
    AnalysisBaseConfig,
    pipelineConnections=MetadataAnalysisConnections,
):
    """Config for metadata analysis, with configurable input dimensions and
    two mutually exclusive NoWorkFound policies.
    """

    inputDimensions = ListField(
        dtype=str,
        doc="The dimensions of the input dataset.",
        # Sort to ensure default order is consistent between runs
        default=sorted(MetadataAnalysisConnections.dimensions),
    )
    raiseNoWorkFoundOnEmptyMetadata = Field(
        dtype=bool,
        doc="Raise a NoWorkFound error if none of the configured metrics are in the task metadata.",
        default=False,
    )
    raiseNoWorkFoundOnIncompleteMetadata = Field(
        dtype=bool,
        doc="Raise NoWorkFound if any of the configured metrics are not in the task metadata.",
        default=False,
    )

    def validate(self):
        """Check that at most one NoWorkFound policy flag is enabled."""
        super().validate()
        policies = (
            self.raiseNoWorkFoundOnEmptyMetadata,
            self.raiseNoWorkFoundOnIncompleteMetadata,
        )
        if all(policies):
            raise ValueError("At most one 'raiseNoWorkFound' option may be True at a time.")
class DatasetMetadataAnalysisConfig(MetadataAnalysisConfig):
    """Config for dataset metadata analysis.

    Identical to `MetadataAnalysisConfig` except that incomplete metadata
    raises NoWorkFound by default.
    """

    def setDefaults(self):
        """Enable ``raiseNoWorkFoundOnIncompleteMetadata`` by default."""
        super().setDefaults()
        self.raiseNoWorkFoundOnIncompleteMetadata = True
class DatasetMetadataAnalysisTask(AnalysisPipelineTask):
    """Task that extracts configured metadata entries from an arbitrary
    dataset and passes them to the analysis tools as metrics.
    """

    ConfigClass = DatasetMetadataAnalysisConfig
    _DefaultName = "datasetMetadataAnalysis"

    @staticmethod
    def _isMissing(value):
        """Return `True` if a retrieved metadata value should be treated as
        absent.

        ``None`` means the key was not present at all; an empty container
        means a base-key prefix matched nothing.  Legitimate falsy scalars
        such as ``0``, ``0.0`` or ``False`` are real measurements and are
        NOT treated as missing.  (The previous ``not value`` test wrongly
        raised NoWorkFound for such values.)
        """
        return value is None or (isinstance(value, (dict, list, tuple, str)) and len(value) == 0)

    def runQuantum(self, butlerQC, inputRefs, outputRefs):
        """Load the input dataset, harvest the configured metadata entries,
        and run the analysis tools on them.

        Raises
        ------
        UpstreamFailureNoWorkFound
            If metadata required by the configured NoWorkFound policy is
            missing from the input dataset.
        """
        dataId = butlerQC.quantum.dataId
        inputs = butlerQC.get(inputRefs)
        plotInfo = self.parsePlotInfo(inputs, dataId)
        data = inputs["data"].get()

        # Collect all the metrics that are configured to run.
        metadata = {}
        for name in self.config.atools.fieldNames:
            atool = getattr(self.config.atools, name)
            if not hasattr(atool, "metrics"):
                continue
            for metric in atool.metrics:
                # Check if the metric uses base key prefixes.
                if atool.metricsPrefixedWithBaseKeys.get(metric):
                    # Find all keys prefixed with the base key and collect
                    # them in a dictionary.
                    metadata[metric] = {
                        k: v for k, v in data.metadata.items() if k.startswith(metric)
                    }
                else:
                    # Retrieve the metric directly if it's not prefixed.
                    metadata[metric] = data.metadata.get(metric)
                # Fail fast on a missing entry if configured to do so.
                if self.config.raiseNoWorkFoundOnIncompleteMetadata and self._isMissing(
                    metadata[metric]
                ):
                    raise UpstreamFailureNoWorkFound(
                        f"Metadata entry '{metric}' is empty for {inputRefs.data.datasetType.name}, "
                        f"or it is not one of {data.metadata.getOrderedNames()}."
                    )
        # In "empty" mode, only raise when every configured entry is missing
        # (falsy-but-present values such as 0 count as present).
        if self.config.raiseNoWorkFoundOnEmptyMetadata and all(
            self._isMissing(v) for v in metadata.values()
        ):
            raise UpstreamFailureNoWorkFound(
                "All configured metadata entries are missing from "
                f"{inputRefs.data.datasetType.name}."
            )

        outputs = self.run(data={"metadata_metrics": metadata}, plotInfo=plotInfo)
        butlerQC.put(outputs, outputRefs)
class TaskMetadataAnalysisTask(AnalysisPipelineTask):
    """Task that extracts metrics from another task's metadata dataset."""

    ConfigClass = MetadataAnalysisConfig
    _DefaultName = "taskMetadataAnalysis"

    def runQuantum(self, butlerQC, inputRefs, outputRefs):
        """Load the task metadata, optionally validate the configured
        metrics against it, and run the analysis tools.

        Raises
        ------
        UpstreamFailureNoWorkFound
            If the metadata is empty, or if validation fails under the
            configured NoWorkFound policy.
        """
        dataId = butlerQC.quantum.dataId
        inputs = butlerQC.get(inputRefs)
        plotInfo = self.parsePlotInfo(inputs, dataId)
        metadata = inputs["data"].get().to_dict()
        # Metadata dataset types are named "<task>_metadata"; strip the
        # suffix to recover the task label.  str.partition leaves the name
        # unchanged when there is no underscore, whereas the previous
        # ``name[: name.find("_")]`` silently dropped the last character
        # (find returns -1 on no match).
        taskName = inputRefs.data.datasetType.name.partition("_")[0]
        if not metadata:
            raise UpstreamFailureNoWorkFound(f"No metadata entries for {taskName}.")
        if self.config.raiseNoWorkFoundOnEmptyMetadata or self.config.raiseNoWorkFoundOnIncompleteMetadata:
            self.validateMetrics(metadata, taskName)
        outputs = self.run(data=metadata, plotInfo=plotInfo)
        butlerQC.put(outputs, outputRefs)

    def validateMetrics(self, metadata, taskName):
        """Raise NoWorkFound if there are insufficient metrics in the task
        metadata.

        Parameters
        ----------
        metadata : `dict`
            The task metadata converted to a dict.
        taskName : `str`
            The name of the task to extract metadata from.

        Raises
        ------
        UpstreamFailureNoWorkFound
            If ``raiseNoWorkFoundOnEmptyMetadata`` is set and none of the
            configured metrics are in the metadata, or if
            ``raiseNoWorkFoundOnIncompleteMetadata`` is set and any metric
            (or the subtask expected to provide it) is missing.
        """
        # Tolerate a metadata dict without a top-level entry for the task
        # itself (the previous direct indexing raised KeyError instead of
        # the intended NoWorkFound diagnostics).
        taskMetadata = metadata.get(taskName, {})
        for fieldName in self.config.atools.fieldNames:
            atool = getattr(self.config.atools, fieldName)
            subTaskNames = getattr(atool, "subTaskNames", None) or {}
            for key in atool.metrics.keys():
                if key in taskMetadata:
                    if self.config.raiseNoWorkFoundOnEmptyMetadata:
                        # One present metric is enough to prove there is
                        # work to do.
                        return
                elif subtaskName := subTaskNames.get(key):
                    if f"{taskName}:{subtaskName}" not in metadata:
                        raise UpstreamFailureNoWorkFound(
                            f"Subtask {subtaskName!r} was not found in {taskName} metadata"
                        )
                    if key in metadata[f"{taskName}:{subtaskName}"]:
                        if self.config.raiseNoWorkFoundOnEmptyMetadata:
                            return
                    elif self.config.raiseNoWorkFoundOnIncompleteMetadata:
                        # BUGFIX: a metric missing from an existing
                        # subtask's metadata previously slipped through
                        # silently in incomplete-metadata mode.
                        raise UpstreamFailureNoWorkFound(
                            f"Metric {key!r} was not found in {taskName} metadata"
                        )
                else:
                    if self.config.raiseNoWorkFoundOnIncompleteMetadata:
                        raise UpstreamFailureNoWorkFound(
                            f"Metric {key!r} was not found in {taskName} metadata"
                        )
        if self.config.raiseNoWorkFoundOnEmptyMetadata:
            raise UpstreamFailureNoWorkFound(
                f"None of the specified metrics were found in the {taskName} metadata"
            )