Coverage for python / lsst / daf / butler / script / exportCalibs.py: 18%

57 statements  

« prev     ^ index     » next       coverage.py v7.13.5, created at 2026-04-28 08:36 +0000

1# This file is part of daf_butler. 

2# 

3# Developed for the LSST Data Management System. 

4# This product includes software developed by the LSST Project 

5# (http://www.lsst.org). 

6# See the COPYRIGHT file at the top-level directory of this distribution 

7# for details of code ownership. 

8# 

9# This software is dual licensed under the GNU General Public License and also 

10# under a 3-clause BSD license. Recipients may choose which of these licenses 

11# to use; please see the files gpl-3.0.txt and/or bsd_license.txt, 

12# respectively. If you choose the GPL option then the following text applies 

13# (but note that there is still no warranty even if you opt for BSD instead): 

14# 

15# This program is free software: you can redistribute it and/or modify 

16# it under the terms of the GNU General Public License as published by 

17# the Free Software Foundation, either version 3 of the License, or 

18# (at your option) any later version. 

19# 

20# This program is distributed in the hope that it will be useful, 

21# but WITHOUT ANY WARRANTY; without even the implied warranty of 

22# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the 

23# GNU General Public License for more details. 

24# 

25# You should have received a copy of the GNU General Public License 

26# along with this program. If not, see <http://www.gnu.org/licenses/>. 

27from __future__ import annotations 

28 

29import logging 

30import os 

31from collections.abc import Iterable 

32from operator import attrgetter 

33from typing import TYPE_CHECKING 

34 

35from astropy.table import Table 

36 

37from .._butler import Butler 

38from .._butler_collections import CollectionInfo 

39from .._collection_type import CollectionType 

40 

41if TYPE_CHECKING: 

42 from lsst.daf.butler import DatasetRef, DatasetType 

43 

44log = logging.getLogger(__name__) 

45 

46 

def find_calibration_datasets(
    butler: Butler, collection: CollectionInfo, datasetTypes: Iterable[DatasetType]
) -> list[DatasetRef]:
    """Search a calibration collection for calibration datasets.

    Parameters
    ----------
    butler : `lsst.daf.butler.Butler`
        Butler to use.
    collection : `CollectionInfo`
        Collection to search. This should be a CALIBRATION
        collection.
    datasetTypes : `list` [`lsst.daf.Butler.DatasetType`]
        List of calibration dataset types.

    Returns
    -------
    exportDatasets : `list` [`lsst.daf.butler.DatasetRef`]
        Datasets to save on export.

    Raises
    ------
    RuntimeError
        Raised if the collection to search is not a CALIBRATION collection.
    """
    # Only CALIBRATION collections associate datasets with validity ranges;
    # anything else is a caller error.
    if collection.type != CollectionType.CALIBRATION:
        raise RuntimeError(f"Collection {collection.name} is not a CALIBRATION collection.")

    found: list[DatasetRef] = []
    for calibType in datasetTypes:
        with butler.query() as query:
            # find_first=False: keep every certified dataset, not just the
            # first match per data ID.
            result = query.datasets(calibType, collections=collection.name, find_first=False)

            try:
                # Expand dimension records so exported refs are self-describing.
                refs = list(result.with_dimension_records())
            except Exception as e:
                # Annotate with the failing type/collection before propagating.
                e.add_note(f"Error from querying dataset type {calibType} and collection {collection.name}")
                raise
            found.extend(refs)

    return found

88 

89 

def exportCalibs(
    repo: str, directory: str, collections: Iterable[str], dataset_type: Iterable[str], transfer: str
) -> Table:
    """Export calibration datasets and their collections from a repository.

    Parameters
    ----------
    repo : `str`
        URI to the location of the repo or URI to a config file
        describing the repo and its location.
    directory : `str`
        URI string of the directory to write the exported
        calibrations.
    collections : `list` [`str`]
        Data collections to pull calibrations from. Must be an
        existing `~CollectionType.CHAINED` or
        `~CollectionType.CALIBRATION` collection.
    dataset_type : `tuple` [`str`]
        The dataset types to export. Default is to export all.
    transfer : `str`
        The transfer mode to use for exporting.

    Returns
    -------
    datasetTable : `astropy.table.Table`
        A table containing relevant information about the calibrations
        exported.

    Raises
    ------
    RuntimeError
        Raised if the output directory already exists.
    """
    # Fail fast: refuse to proceed if the destination exists, before spending
    # time on any registry or collection queries.
    if os.path.exists(directory):
        raise RuntimeError(f"Export directory exists: {directory}")

    with Butler.from_config(repo, writeable=False) as butler:
        # Empty selections mean "everything": `...` matches all dataset types,
        # "*" matches all collections.
        dataset_type_query = dataset_type or ...
        collections_query = collections or "*"

        # Restrict to dataset types that are actually calibrations.
        calibTypes = [
            datasetType
            for datasetType in butler.registry.queryDatasetTypes(dataset_type_query)
            if datasetType.isCalibration()
        ]

        collectionsToExport = []
        datasetsToExport = []

        # Flatten chains so CALIBRATION members of CHAINED collections are
        # visited; chains themselves are also included so they get exported.
        for collection in butler.collections.query_info(
            collections_query,
            flatten_chains=True,
            include_chains=True,
            include_doc=True,
            collection_types={CollectionType.CALIBRATION, CollectionType.CHAINED},
        ):
            log.info("Checking collection: %s", collection.name)

            # Get collection information.
            collectionsToExport.append(collection.name)
            if collection.type == CollectionType.CALIBRATION:
                exportDatasets = find_calibration_datasets(butler, collection, calibTypes)
                datasetsToExport.extend(exportDatasets)

        os.makedirs(directory)
        with butler.export(directory=directory, format="yaml", transfer=transfer) as export:
            # Deduplicate: a chain and its members can surface the same
            # names/refs more than once.
            collectionsToExport = list(set(collectionsToExport))
            datasetsToExport = list(set(datasetsToExport))

            for exportable in collectionsToExport:
                try:
                    export.saveCollection(exportable)
                except Exception as e:
                    # Best-effort: an unexportable collection should not abort
                    # the whole export.
                    log.warning("Did not save collection %s due to %s.", exportable, e)

            log.info("Saving %d dataset(s)", len(datasetsToExport))
            export.saveDatasets(datasetsToExport)

        # Build a summary table, sorted by dataset type name then data ID.
        sortedDatasets = sorted(datasetsToExport, key=attrgetter("datasetType.name", "dataId"))

        # Union of all dimensions used by any exported ref; refs lacking a
        # dimension get an empty-string placeholder in that column.
        requiredDimensions: set[str] = set()
        for ref in sortedDatasets:
            requiredDimensions.update(ref.dimensions.names)
        dimensionColumns = {
            dimensionName: [ref.dataId.get(dimensionName, "") for ref in sortedDatasets]
            for dimensionName in sorted(requiredDimensions)
        }

        return Table(
            {
                "calibrationType": [ref.datasetType.name for ref in sortedDatasets],
                "run": [ref.run for ref in sortedDatasets],
                **dimensionColumns,
            }
        )