Coverage for python/lsst/daf/butler/_query_all_datasets.py: 37%

60 statements  


# This file is part of daf_butler.
#
# Developed for the LSST Data Management System.
# This product includes software developed by the LSST Project
# (http://www.lsst.org).
# See the COPYRIGHT file at the top-level directory of this distribution
# for details of code ownership.
#
# This software is dual licensed under the GNU General Public License and also
# under a 3-clause BSD license. Recipients may choose which of these licenses
# to use; please see the files gpl-3.0.txt and/or bsd_license.txt,
# respectively. If you choose the GPL option then the following text applies
# (but note that there is still no warranty even if you opt for BSD instead):
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.

from __future__ import annotations

import dataclasses
import logging
from collections.abc import Iterator, Mapping, Sequence
from typing import TYPE_CHECKING, Any, NamedTuple

from lsst.utils.iteration import ensure_iterable

from ._dataset_ref import DatasetRef
from ._exceptions import InvalidQueryError, MissingDatasetTypeError
from .dimensions import DataId, DataIdValue
from .queries import Query
from .utils import has_globs

if TYPE_CHECKING:
    from ._butler import Butler


_LOG = logging.getLogger(__name__)


class DatasetsPage(NamedTuple):
    """A single page of results from ``query_all_datasets``."""

    dataset_type: str
    data: list[DatasetRef]


@dataclasses.dataclass(frozen=True)
class QueryAllDatasetsParameters:
    """Parameters passed to `Butler.query_all_datasets`; they have the same
    meaning as in that function unless noted below.
    """

    collections: Sequence[str]
    name: Sequence[str]
    find_first: bool
    data_id: DataId
    where: str
    bind: Mapping[str, Any]
    limit: int | None
    """
    Upper limit on the number of returned records. `None` can be used
    if no limit is wanted. A limit of ``0`` means that the query will
    be executed and validated but no results will be returned.

    (This cannot be negative, contrary to the `Butler.query_all_datasets`
    equivalent.)
    """
    with_dimension_records: bool
    kwargs: dict[str, DataIdValue] = dataclasses.field(default_factory=dict)
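# Example (illustrative, not part of the original module): a minimal sketch of
# filling in the frozen parameter bundle above. The collection, dataset type,
# and data ID values are hypothetical.
#
#     params = QueryAllDatasetsParameters(
#         collections=["HSC/runs/example"],
#         name=["calexp", "src"],
#         find_first=True,
#         data_id={"instrument": "HSC"},
#         where="",
#         bind={},
#         limit=1000,
#         with_dimension_records=False,
#     )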

def query_all_datasets(
    butler: Butler, query: Query, args: QueryAllDatasetsParameters
) -> Iterator[DatasetsPage]:
    """Query for dataset refs from multiple dataset types simultaneously.

    Parameters
    ----------
    butler : `Butler`
        Butler instance to use for executing queries.
    query : `Query`
        Query context object to use for executing queries.
    args : `QueryAllDatasetsParameters`
        Arguments describing the query to be performed.

    Raises
    ------
    MissingDatasetTypeError
        When no dataset types match ``name``, or an explicit (non-glob)
        dataset type in ``name`` does not exist.
    InvalidQueryError
        If the parameters to the query are inconsistent or malformed.
    MissingCollectionError
        If a given collection is not found.

    Returns
    -------
    pages : `~collections.abc.Iterator` [ `DatasetsPage` ]
        `DatasetRef` results matching the given query criteria, grouped by
        dataset type.
    """
    if args.find_first and has_globs(args.collections):
        raise InvalidQueryError("Can not use wildcards in collections when find_first=True")

    dataset_type_query = list(ensure_iterable(args.name))

    with butler.registry.caching_context():
        dataset_type_collections = _filter_collections_and_dataset_types(
            butler, args.collections, dataset_type_query
        )

        limit = args.limit
        for dt, filtered_collections in sorted(dataset_type_collections.items()):
            _LOG.debug("Querying dataset type %s", dt)
            results = (
                query.datasets(dt, filtered_collections, find_first=args.find_first)
                .where(args.data_id, args.where, args.kwargs, bind=args.bind)
                .limit(limit)
            )
            if args.with_dimension_records:
                results = results.with_dimension_records()

            for page in results._iter_pages():
                if limit is not None:
                    # Track how much of the limit has been used up by each
                    # query.
                    limit -= len(page)

                yield DatasetsPage(dataset_type=dt, data=page)

            if limit is not None and limit <= 0:
                break
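# Example (illustrative, not part of the original module): one way the
# generator above might be driven. It assumes ``Butler.query()`` provides a
# ``Query`` context manager (as in current daf_butler) and that ``params`` is
# a ``QueryAllDatasetsParameters`` instance like the sketch above; the repo
# path is hypothetical.
#
#     butler = Butler("my_repo")
#     with butler.query() as query:
#         for dataset_type, refs in query_all_datasets(butler, query, params):
#             print(dataset_type, len(refs))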

def _filter_collections_and_dataset_types(
    butler: Butler, collections: Sequence[str], dataset_type_query: Sequence[str]
) -> Mapping[str, list[str]]:
    """For each dataset type matching the query, filter the given collections
    down to only those that might actually contain datasets of that type.

    Parameters
    ----------
    butler
        Butler repository to use.
    collections
        List of collection names or collection search globs.
    dataset_type_query
        List of dataset type names or search globs.

    Returns
    -------
    mapping
        Mapping from dataset type name to the list of collections that
        contain that dataset type.

    Notes
    -----
    Because collection summaries are an approximation, some of the returned
    collections may not actually contain datasets of the expected type.
    """
    missing_types: list[str] = []
    dataset_types = set(butler.registry.queryDatasetTypes(dataset_type_query, missing=missing_types))
    if len(dataset_types) == 0:
        raise MissingDatasetTypeError(f"No dataset types found for query {dataset_type_query}")
    if len(missing_types) > 0:
        raise MissingDatasetTypeError(f"Dataset types not found: {missing_types}")

    # Expand the collections query and include summary information.
    query_collections_info = butler.collections.query_info(
        collections,
        include_summary=True,
        flatten_chains=True,
        include_chains=False,
        summary_datasets=dataset_types,
    )

    # Only iterate over dataset types that are relevant for the query.
    dataset_type_names = {dataset_type.name for dataset_type in dataset_types}
    dataset_type_collections = butler.collections._group_by_dataset_type(
        dataset_type_names, query_collections_info
    )
    n_dataset_types = len(dataset_types)
    if (n_filtered := len(dataset_type_collections)) != n_dataset_types:
        _LOG.debug("Filtered %d dataset types down to %d", n_dataset_types, n_filtered)
    else:
        _LOG.debug("Processing %d dataset type%s", n_dataset_types, "" if n_dataset_types == 1 else "s")

    return dataset_type_collections
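# Illustrative only: the mapping returned above has a plain shape such as
#
#     {
#         "calexp": ["HSC/runs/a", "HSC/runs/b"],
#         "src": ["HSC/runs/a"],
#     }
#
# where the collection and dataset type names are hypothetical. Because
# collection summaries are approximate, a listed collection may still turn
# out to hold no datasets of that type; the per-type query in
# ``query_all_datasets`` simply finds no datasets for it in that case.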