Coverage for python / lsst / daf / butler / script / queryDatasets.py: 17%

128 statements  

« prev     ^ index     » next       coverage.py v7.13.5, created at 2026-04-30 08:41 +0000

1# This file is part of daf_butler. 

2# 

3# Developed for the LSST Data Management System. 

4# This product includes software developed by the LSST Project 

5# (http://www.lsst.org). 

6# See the COPYRIGHT file at the top-level directory of this distribution 

7# for details of code ownership. 

8# 

9# This software is dual licensed under the GNU General Public License and also 

10# under a 3-clause BSD license. Recipients may choose which of these licenses 

11# to use; please see the files gpl-3.0.txt and/or bsd_license.txt, 

12# respectively. If you choose the GPL option then the following text applies 

13# (but note that there is still no warranty even if you opt for BSD instead): 

14# 

15# This program is free software: you can redistribute it and/or modify 

16# it under the terms of the GNU General Public License as published by 

17# the Free Software Foundation, either version 3 of the License, or 

18# (at your option) any later version. 

19# 

20# This program is distributed in the hope that it will be useful, 

21# but WITHOUT ANY WARRANTY; without even the implied warranty of 

22# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the 

23# GNU General Public License for more details. 

24# 

25# You should have received a copy of the GNU General Public License 

26# along with this program. If not, see <http://www.gnu.org/licenses/>. 

27from __future__ import annotations 

28 

29import logging 

30import warnings 

31from collections import defaultdict 

32from collections.abc import Iterable, Iterator 

33from contextlib import contextmanager 

34from itertools import chain, groupby 

35from typing import TYPE_CHECKING 

36 

37import numpy as np 

38from astropy.table import Table as AstropyTable 

39 

40from .._butler import Butler 

41from .._exceptions import MissingDatasetTypeError 

42from .._query_all_datasets import QueryAllDatasetsParameters 

43from ..cli.utils import sortAstropyTable 

44from ..utils import has_globs 

45 

46if TYPE_CHECKING: 

47 from lsst.daf.butler import DatasetRef 

48 from lsst.resources import ResourcePath 

49 

50 

51_LOG = logging.getLogger(__name__) 

52 

53 

54class _Table: 

55 """Aggregates rows for a single dataset type, and creates an astropy table 

56 with the aggregated data. Eliminates duplicate rows. 

57 """ 

58 

59 datasetRefs: dict[DatasetRef, str | None] 

60 

61 def __init__(self) -> None: 

62 self.datasetRefs = {} 

63 

64 def add(self, datasetRef: DatasetRef, uri: ResourcePath | None = None) -> None: 

65 """Add a row of information to the table. 

66 

67 ``uri`` is optional but must be the consistent; provided or not, for 

68 every call to a ``_Table`` instance. 

69 

70 Parameters 

71 ---------- 

72 datasetRef : `DatasetRef` 

73 A dataset ref that will be added as a row in the table. 

74 uri : `lsst.resources.ResourcePath`, optional 

75 The URI to show as a file location in the table, by default `None`. 

76 """ 

77 uri_str = str(uri) if uri else None 

78 # Use a dict to retain ordering. 

79 self.datasetRefs[datasetRef] = uri_str 

80 

81 def getAstropyTable(self, datasetTypeName: str, sort: bool = True) -> AstropyTable: 

82 """Get the table as an astropy table. 

83 

84 Parameters 

85 ---------- 

86 datasetTypeName : `str` 

87 The dataset type name to show in the ``type`` column of the table. 

88 sort : `bool`, optional 

89 If `True` the table will be sorted. 

90 

91 Returns 

92 ------- 

93 table : `astropy.table._Table` 

94 The table with the provided column names and rows. 

95 """ 

96 # Should never happen; adding a dataset should be the action that 

97 # causes a _Table to be created. 

98 if not self.datasetRefs: 

99 raise RuntimeError(f"No DatasetRefs were provided for dataset type {datasetTypeName}") 

100 

101 ref = next(iter(self.datasetRefs)) 

102 dimensions = [ref.dataId.universe.dimensions[k] for k in ref.dataId.dimensions.data_coordinate_keys] 

103 columnNames = ["type", "run", "id", *[str(item) for item in dimensions]] 

104 

105 # Need to hint the column types for numbers since the per-row 

106 # constructor of Table does not work this out on its own and sorting 

107 # will not work properly without. 

108 typeMap = {float: np.float64, int: np.int64} 

109 columnTypes = [ 

110 None, 

111 None, 

112 str, 

113 *[typeMap.get(type(value)) for value in ref.dataId.full_values], 

114 ] 

115 if self.datasetRefs[ref]: 

116 columnNames.append("URI") 

117 columnTypes.append(None) 

118 

119 rows = [] 

120 for ref, uri in self.datasetRefs.items(): 

121 row = [ 

122 datasetTypeName, 

123 ref.run, 

124 str(ref.id), 

125 *ref.dataId.full_values, 

126 ] 

127 if uri: 

128 row.append(uri) 

129 rows.append(row) 

130 

131 dataset_table = AstropyTable(np.array(rows), names=columnNames, dtype=columnTypes) 

132 if sort: 

133 return sortAstropyTable(dataset_table, dimensions, ["type", "run"]) 

134 else: 

135 return dataset_table 

136 

137 

class QueryDatasets:
    """Get dataset refs from a repository.

    Parameters
    ----------
    glob : `~collections.abc.Iterable` [`str`]
        A list of glob-style search strings that fully or partially identify
        the dataset type names to search for.
    collections : `~collections.abc.Iterable` [`str`]
        A list of glob-style search strings that fully or partially identify
        the collections to search for.
    where : `str`
        A string expression similar to a SQL WHERE clause. May involve any
        column of a dimension table or (as a shortcut for the primary key
        column of a dimension table) dimension name.
    find_first : `bool`
        For each result data ID, only yield one DatasetRef of each DatasetType,
        from the first collection in which a dataset of that dataset type
        appears (according to the order of `collections` passed in). If used,
        `collections` must specify at least one expression and must not contain
        wildcards.
    show_uri : `bool`
        If True, include the dataset URI in the output.
    butler : `lsst.daf.butler.Butler`
        The butler to use to query.
    limit : `int`, optional
        Limit the number of results to be returned. A value of 0 means
        unlimited. A negative value is used to specify a cap where a warning
        is issued if that cap is hit.
    order_by : `tuple` of `str`
        Dimensions to use for sorting results. If no ordering is given the
        results of ``limit`` are undefined and default sorting of the resulting
        datasets will be applied. It is an error if the requested ordering
        is inconsistent with the dimensions of the dataset type being queried.
    with_dimension_records : `bool`, optional
        If `True` (default is `False`) then returned data IDs will have
        dimension records.

    Raises
    ------
    NotImplementedError
        Raised if ``order_by`` is given together with dataset type patterns
        that may match more than one dataset type.
    """

    def __init__(
        self,
        glob: Iterable[str],
        collections: Iterable[str],
        where: str,
        find_first: bool,
        show_uri: bool,
        butler: Butler,
        limit: int = 0,
        order_by: tuple[str, ...] = (),
        with_dimension_records: bool = False,
    ):
        # Empty collections/dataset-type arguments are deprecated; warn but
        # keep the historical "search everything" behavior for now.
        collections = list(collections)
        if not collections:
            warnings.warn(
                "No --collections specified. The --collections argument will become mandatory after v28.",
                FutureWarning,
                stacklevel=2,
            )
        glob = list(glob)
        if not glob:
            warnings.warn(
                "No dataset types specified. Explicitly specifying dataset types will become mandatory"
                " after v28. Specify '*' to match the current behavior of querying all dataset types.",
                FutureWarning,
                stacklevel=2,
            )
            glob = ["*"]

        # More than one pattern, or any glob wildcard, may match several
        # dataset types; --order-by is only supported for a single type.
        searches_multiple_dataset_types = len(glob) > 1 or has_globs(glob)
        if order_by and searches_multiple_dataset_types:
            raise NotImplementedError("--order-by is only supported for queries with a single dataset type.")

        self.butler = butler
        self.showUri = show_uri
        self._dataset_type_glob = glob
        self._collections_wildcard = collections
        self._where = where
        self._find_first = find_first
        self._limit = limit
        self._order_by = order_by
        self._searches_multiple_dataset_types = searches_multiple_dataset_types
        self._with_dimension_records = with_dimension_records

    def getTables(self) -> Iterator[AstropyTable]:
        """Get the datasets as a list of astropy tables.

        Yields
        ------
        datasetTables : `collections.abc.Iterator` [``astropy.table.Table``]
            Astropy tables, one for each dataset type (and, when URIs are
            shown, one per component of a disassembled composite).
        """
        # Sort if we haven't been told to enforce an order.
        sort_table = not bool(self._order_by)

        if not self.showUri:
            for refs in self.getDatasets():
                table = _Table()
                for ref in refs:
                    table.add(ref)
                if refs:
                    yield table.getAstropyTable(refs[0].datasetType.name, sort=sort_table)
        else:
            for refs in self.getDatasets():
                if not refs:
                    continue
                # For URIs of disassembled composites we create a table per
                # component.
                tables: dict[str, _Table] = defaultdict(_Table)
                dataset_type_name = refs[0].datasetType.name
                # NOTE(review): predict=True presumably produces predicted
                # URIs for datasets that have no stored artifact yet --
                # confirm against Butler.get_many_uris documentation.
                ref_uris = self.butler.get_many_uris(refs, predict=True)
                for ref, uris in ref_uris.items():
                    if uris.primaryURI:
                        tables[dataset_type_name].add(ref, uris.primaryURI)
                    for name, uri in uris.componentURIs.items():
                        tables[ref.datasetType.componentTypeName(name)].add(ref, uri)
                for name in sorted(tables):
                    yield tables[name].getAstropyTable(name, sort=sort_table)
            return

    def getDatasets(self) -> Iterator[list[DatasetRef]]:
        """Get the datasets as a list of lists.

        Yields
        ------
        refs : `collections.abc.Iterator` [ `list` [ `DatasetRef` ] ]
            Dataset references matching the given query criteria grouped
            by dataset type.
        """
        query_collections = self._collections_wildcard or ["*"]

        # Decode the CLI limit convention: 0 = unlimited, negative = cap
        # whose breach should be reported with a warning.
        warn_limit = False
        limit_reached = False
        if self._limit < 0:
            # Negative limit means we should warn if the limit is exceeded.
            warn_limit = True
            limit = abs(self._limit) + 1  # +1 to tell us we hit the limit.
        elif self._limit == 0:
            # 0 means 'unlimited' in the CLI.
            limit = None
        else:
            limit = self._limit

        # We want to allow --order-by, but the query backend only supports
        # it when there is a single dataset type.
        # So we have to select different backends for single vs multiple
        # dataset type queries.
        if self._searches_multiple_dataset_types:
            query_func = self._query_multiple_dataset_types
        else:
            query_func = self._query_single_dataset_type

        try:
            with query_func(query_collections, limit) as ref_iter:
                datasets_found = 0
                # NOTE(review): itertools.groupby only merges *adjacent*
                # items, so this assumes the backend yields refs already
                # grouped by dataset type -- confirm against the backends.
                for dataset_type, chunk in groupby(ref_iter, lambda ref: ref.datasetType.name):
                    refs = list(chunk)
                    datasets_found += len(refs)
                    if warn_limit and limit is not None and datasets_found >= limit:
                        # We asked for one too many so must remove that from
                        # the list.
                        refs = refs[0:-1]
                        limit_reached = True

                    yield refs

                    _LOG.debug("Got %d results for dataset type %s", len(refs), dataset_type)
        except MissingDatasetTypeError as e:
            # Treat a missing dataset type as "no results": report it at
            # info level and stop iterating.
            _LOG.info(str(e))
            return

        if limit is not None and limit_reached:
            # Report the user-visible cap (the internal limit includes the
            # +1 sentinel, hence "limit - 1").
            _LOG.warning(
                "Requested limit of %d hit for number of datasets returned. "
                "Use --limit to increase this limit.",
                limit - 1,
            )

    @contextmanager
    def _query_multiple_dataset_types(
        self, collections: list[str], limit: int | None
    ) -> Iterator[Iterator[DatasetRef]]:
        # Backend for searches that may span several dataset types; yields a
        # single flattened iterator over all matching refs. Does not support
        # order_by (enforced in __init__).
        args = QueryAllDatasetsParameters(
            collections=collections,
            find_first=self._find_first,
            name=self._dataset_type_glob,
            where=self._where,
            limit=limit,
            data_id={},
            bind={},
            with_dimension_records=self._with_dimension_records,
        )
        with self.butler._query_all_datasets_by_page(args) as iterator:
            # Flatten the pages into one continuous stream of refs.
            yield chain.from_iterable(iterator)

    @contextmanager
    def _query_single_dataset_type(
        self, collections: list[str], limit: int | None
    ) -> Iterator[Iterator[DatasetRef]]:
        # Backend for exactly one dataset type; this path supports order_by,
        # unlike the multi-type path.
        assert len(self._dataset_type_glob) == 1
        dataset_type = self._dataset_type_glob[0]

        refs = self.butler.query_datasets(
            dataset_type,
            collections=collections,
            find_first=self._find_first,
            where=self._where,
            limit=limit,
            order_by=self._order_by,
            with_dimension_records=self._with_dimension_records,
        )

        yield iter(refs)