Coverage for python / lsst / analysis / tools / tasks / reconstructor.py: 15%

40 statements  

« prev     ^ index     » next       coverage.py v7.13.5, created at 2026-04-21 10:55 +0000

1# This file is part of analysis_tools. 

2# 

3# Developed for the LSST Data Management System. 

4# This product includes software developed by the LSST Project 

5# (https://www.lsst.org). 

6# See the COPYRIGHT file at the top-level directory of this distribution 

7# for details of code ownership. 

8# 

9# This program is free software: you can redistribute it and/or modify 

10# it under the terms of the GNU General Public License as published by 

11# the Free Software Foundation, either version 3 of the License, or 

12# (at your option) any later version. 

13# 

14# This program is distributed in the hope that it will be useful, 

15# but WITHOUT ANY WARRANTY; without even the implied warranty of 

16# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the 

17# GNU General Public License for more details. 

18# 

19# You should have received a copy of the GNU General Public License 

20# along with this program. If not, see <https://www.gnu.org/licenses/>. 

21from __future__ import annotations 

22 

23__all__ = ("reconstructAnalysisTools", "getPlotDatasetTypeNames") 

24 

25from collections.abc import Callable, Iterable 

26from typing import TYPE_CHECKING, Any 

27 

28from lsst.pipe.base.connections import PipelineTaskConnections, iterConnections 

29from lsst.pipe.base.connectionTypes import BaseConnection 

30 

31from ..interfaces import AnalysisBaseConfig 

32 

33if TYPE_CHECKING: 

34 from lsst.daf.butler import Butler, DataId 

35 

36 

def reconstructAnalysisTools(
    butler: Butler,
    collection: str,
    label: str,
    dataId: DataId,
    callback: Callable[[dict[str, Any], DataId], dict[str, Any]] | None = None,
) -> tuple[AnalysisBaseConfig, dict[str, Any]]:
    """Reconstructs the analysis tools used to produce metrics and plots in a
    task and all input data required.

    Parameters
    ----------
    butler : `~lsst.daf.butler.Butler`
        The butler where the data is stored.
    collection : `str`
        Collection within the butler associated with desired data.
    label : `str`
        The label from the `~lsst.pipe.base.Pipeline` associated with the task
        whose tools are to be reconstructed.
    dataId : `~lsst.daf.butler.DataId`
        Identifier for which data to retrieve.
    callback : `~typing.Callable`, optional
        An optional function which can transform the data after it has been
        loaded from the butler. The function must take a dict of strings to
        data products, and the DataId. The function must return a dict of
        string to data products. The returned dict is what will be returned
        by `reconstructAnalysisTools`.

    Returns
    -------
    config : `AnalysisBaseConfig`
        The configuration of the task run to produce metrics and plots. This
        config contains all the `AnalysisTools` as configured when the task
        produced the data.
    data : `dict` of `str` to `Any`
        The data that went into producing metrics and plots.
    """
    # The task's persisted config dataset is conventionally "<label>_config".
    configDSType = f"{label}_config"
    config = butler.get(configDSType, collections=(collection,))

    # Rebuild the connections object from the stored config so we know
    # exactly which inputs the task consumed when it ran.
    connections: PipelineTaskConnections = config.connections.ConnectionsClass(config=config)
    inputs: dict[str, Any] = {}

    for name in connections.inputs:
        connection: BaseConnection = getattr(connections, name)
        dsName = connection.name
        if connection.multiple:
            # A multiple connection maps to many datasets; query the butler
            # for every matching ref (deduplicated via set) and load each.
            refs = set(
                butler.registry.queryDatasets(
                    dsName, dataId=dataId, findFirst=True, collections=(collection,)
                )
            )
            inputs[name] = [butler.get(ref) for ref in refs]
        else:
            # Single connection: load the one dataset for this dataId.
            inputs[name] = butler.get(dsName, dataId=dataId, collections=(collection,))

    # Give the caller a chance to post-process the loaded inputs.
    if callback is not None:
        inputs = callback(inputs, dataId)

    return (config, inputs)

101 

102 

def getPlotDatasetTypeNames(
    butler: Butler,
    collections: str | Iterable[str],
    label: str | None = None,
) -> Iterable[str]:
    """Get the dataset type names for plots (anything with StorageClass="Plot")
    from butler collections.

    Parameters
    ----------
    butler : `~lsst.daf.butler.Butler`
        The butler where the data is stored.
    collections : `str` or `list` [`str`]
        Collections within the butler to query for datasets containing plots.
    label : `str`, optional
        The label from the `~lsst.pipe.base.Pipeline` associated with the task
        whose plots are to be queried. If no label is given, all requested
        collections will be queried.

    Returns
    -------
    plotNames : `list` [`str`]
        Plot dataset type names.
    """
    if label is not None:
        # A specific task was requested; only its persisted config matters.
        configs = [butler.get(f"{label}_config", collections=collections)]
    else:
        # No label: gather every analysis-tools task config in the
        # collections, skipping configs from unrelated task types.
        configs = [
            cfg
            for ref in butler.registry.queryDatasets("*_config", collections=collections)
            if isinstance(cfg := butler.get(ref), AnalysisBaseConfig)
        ]

    plotNames: list[str] = []
    for cfg in configs:
        # Rebuild the connections from each stored config and keep the
        # names of all outputs persisted with the "Plot" storage class.
        conns: PipelineTaskConnections = cfg.connections.ConnectionsClass(config=cfg)
        plotNames.extend(
            conn.name for conn in iterConnections(conns, "outputs") if conn.storageClass == "Plot"
        )
    return plotNames