Coverage for python / lsst / daf / butler / script / queryDatasets.py: 17%
128 statements
« prev ^ index » next coverage.py v7.13.5, created at 2026-04-22 08:55 +0000
1# This file is part of daf_butler.
2#
3# Developed for the LSST Data Management System.
4# This product includes software developed by the LSST Project
5# (http://www.lsst.org).
6# See the COPYRIGHT file at the top-level directory of this distribution
7# for details of code ownership.
8#
9# This software is dual licensed under the GNU General Public License and also
10# under a 3-clause BSD license. Recipients may choose which of these licenses
11# to use; please see the files gpl-3.0.txt and/or bsd_license.txt,
12# respectively. If you choose the GPL option then the following text applies
13# (but note that there is still no warranty even if you opt for BSD instead):
14#
15# This program is free software: you can redistribute it and/or modify
16# it under the terms of the GNU General Public License as published by
17# the Free Software Foundation, either version 3 of the License, or
18# (at your option) any later version.
19#
20# This program is distributed in the hope that it will be useful,
21# but WITHOUT ANY WARRANTY; without even the implied warranty of
22# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
23# GNU General Public License for more details.
24#
25# You should have received a copy of the GNU General Public License
26# along with this program. If not, see <http://www.gnu.org/licenses/>.
27from __future__ import annotations
29import logging
30import warnings
31from collections import defaultdict
32from collections.abc import Iterable, Iterator
33from contextlib import contextmanager
34from itertools import chain, groupby
35from typing import TYPE_CHECKING
37import numpy as np
38from astropy.table import Table as AstropyTable
40from .._butler import Butler
41from .._exceptions import MissingDatasetTypeError
42from .._query_all_datasets import QueryAllDatasetsParameters
43from ..cli.utils import sortAstropyTable
44from ..utils import has_globs
46if TYPE_CHECKING:
47 from lsst.daf.butler import DatasetRef
48 from lsst.resources import ResourcePath
# Module-level logger, named after this module's import path.
_LOG = logging.getLogger(__name__)
class _Table:
    """Aggregates rows for a single dataset type, and creates an astropy table
    with the aggregated data. Eliminates duplicate rows.
    """

    # Maps each dataset ref to its URI string (or `None` when no URI was
    # supplied). A dict is used so insertion order is kept and duplicate
    # refs collapse onto a single entry.
    datasetRefs: dict[DatasetRef, str | None]

    def __init__(self) -> None:
        self.datasetRefs = {}

    def add(self, datasetRef: DatasetRef, uri: ResourcePath | None = None) -> None:
        """Add a row of information to the table.

        ``uri`` is optional, but it must be used consistently: either provided
        for every call on a given ``_Table`` instance, or for none of them.

        Parameters
        ----------
        datasetRef : `DatasetRef`
            A dataset ref that will be added as a row in the table.
        uri : `lsst.resources.ResourcePath`, optional
            The URI to show as a file location in the table, by default `None`.
        """
        # Dict assignment both deduplicates refs and preserves first-seen
        # ordering.
        self.datasetRefs[datasetRef] = str(uri) if uri else None

    def getAstropyTable(self, datasetTypeName: str, sort: bool = True) -> AstropyTable:
        """Get the table as an astropy table.

        Parameters
        ----------
        datasetTypeName : `str`
            The dataset type name to show in the ``type`` column of the table.
        sort : `bool`, optional
            If `True` the table will be sorted.

        Returns
        -------
        table : `astropy.table.Table`
            The table with the provided column names and rows.

        Raises
        ------
        RuntimeError
            Raised if no refs were ever added; adding a dataset should be the
            action that causes a ``_Table`` to be created, so this should
            never happen.
        """
        if not self.datasetRefs:
            raise RuntimeError(f"No DatasetRefs were provided for dataset type {datasetTypeName}")

        # Use the first ref as the template for the dimension columns and
        # their types; all refs of one dataset type share the same data ID
        # keys.
        first = next(iter(self.datasetRefs))
        dims = [first.dataId.universe.dimensions[k] for k in first.dataId.dimensions.data_coordinate_keys]
        names = ["type", "run", "id"] + [str(d) for d in dims]

        # Hint the column types for numbers: the per-row Table constructor
        # cannot work them out on its own, and sorting misbehaves without
        # the hints.
        numeric_dtypes = {float: np.float64, int: np.int64}
        dtypes: list = [None, None, str]
        dtypes.extend(numeric_dtypes.get(type(v)) for v in first.dataId.full_values)
        if self.datasetRefs[first]:
            # URIs are used consistently, so the first entry having one means
            # every row gets a URI column.
            names.append("URI")
            dtypes.append(None)

        rows = []
        for dataset_ref, uri_str in self.datasetRefs.items():
            row = [datasetTypeName, dataset_ref.run, str(dataset_ref.id), *dataset_ref.dataId.full_values]
            if uri_str:
                row.append(uri_str)
            rows.append(row)

        table = AstropyTable(np.array(rows), names=names, dtype=dtypes)
        return sortAstropyTable(table, dims, ["type", "run"]) if sort else table
class QueryDatasets:
    """Get dataset refs from a repository.

    Parameters
    ----------
    glob : `~collections.abc.Iterable` [`str`]
        A list of glob-style search string that fully or partially identify
        the dataset type names to search for.
    collections : `~collections.abc.Iterable` [`str`]
        A list of glob-style search string that fully or partially identify
        the collections to search for.
    where : `str`
        A string expression similar to a SQL WHERE clause. May involve any
        column of a dimension table or (as a shortcut for the primary key
        column of a dimension table) dimension name.
    find_first : `bool`
        For each result data ID, only yield one DatasetRef of each DatasetType,
        from the first collection in which a dataset of that dataset type
        appears (according to the order of `collections` passed in). If used,
        `collections` must specify at least one expression and must not contain
        wildcards.
    show_uri : `bool`
        If True, include the dataset URI in the output.
    butler : `lsst.daf.butler.Butler`
        The butler to use to query.
    limit : `int`, optional
        Limit the number of results to be returned. A value of 0 means
        unlimited. A negative value is used to specify a cap where a warning
        is issued if that cap is hit.
    order_by : `tuple` of `str`
        Dimensions to use for sorting results. If no ordering is given the
        results of ``limit`` are undefined and default sorting of the resulting
        datasets will be applied. It is an error if the requested ordering
        is inconsistent with the dimensions of the dataset type being queried.
    with_dimension_records : `bool`, optional
        If `True` (default is `False`) then returned data IDs will have
        dimension records.
    """

    def __init__(
        self,
        glob: Iterable[str],
        collections: Iterable[str],
        where: str,
        find_first: bool,
        show_uri: bool,
        butler: Butler,
        limit: int = 0,
        order_by: tuple[str, ...] = (),
        with_dimension_records: bool = False,
    ):
        collections = list(collections)
        if not collections:
            # Transitional behavior: continue with an empty collection list
            # (getDatasets falls back to searching "*"), but warn callers
            # that this will become an error.
            warnings.warn(
                "No --collections specified. The --collections argument will become mandatory after v28.",
                FutureWarning,
                stacklevel=2,
            )
        glob = list(glob)
        if not glob:
            # Same transitional behavior for dataset types: default to the
            # match-everything glob for now, with a deprecation warning.
            warnings.warn(
                "No dataset types specified. Explicitly specifying dataset types will become mandatory"
                " after v28. Specify '*' to match the current behavior of querying all dataset types.",
                FutureWarning,
                stacklevel=2,
            )
            glob = ["*"]

        # A glob pattern (or more than one name) can match several dataset
        # types, which forces the multi-type query backend; that backend does
        # not support ordering (see getDatasets).
        searches_multiple_dataset_types = len(glob) > 1 or has_globs(glob)
        if order_by and searches_multiple_dataset_types:
            raise NotImplementedError("--order-by is only supported for queries with a single dataset type.")

        self.butler = butler
        self.showUri = show_uri
        self._dataset_type_glob = glob
        self._collections_wildcard = collections
        self._where = where
        self._find_first = find_first
        self._limit = limit
        self._order_by = order_by
        self._searches_multiple_dataset_types = searches_multiple_dataset_types
        self._with_dimension_records = with_dimension_records

    def getTables(self) -> Iterator[AstropyTable]:
        """Get the datasets as a list of astropy tables.

        Yields
        ------
        datasetTables : `collections.abc.Iterator` [``astropy.table.Table``]
            Astropy tables, one for each dataset type.
        """
        # Sort if we haven't been told to enforce an order.
        sort_table = not bool(self._order_by)

        if not self.showUri:
            # One table per dataset type, rows are the refs themselves.
            for refs in self.getDatasets():
                table = _Table()
                for ref in refs:
                    table.add(ref)
                if refs:
                    yield table.getAstropyTable(refs[0].datasetType.name, sort=sort_table)
        else:
            for refs in self.getDatasets():
                if not refs:
                    continue
                # For URIs of disassembled composites we create a table per
                # component.
                tables: dict[str, _Table] = defaultdict(_Table)
                dataset_type_name = refs[0].datasetType.name
                # predict=True so datasets that have not been written yet
                # still get a (predicted) location in the output.
                ref_uris = self.butler.get_many_uris(refs, predict=True)
                for ref, uris in ref_uris.items():
                    if uris.primaryURI:
                        tables[dataset_type_name].add(ref, uris.primaryURI)
                    for name, uri in uris.componentURIs.items():
                        tables[ref.datasetType.componentTypeName(name)].add(ref, uri)
                # Sorted so component tables come out in a deterministic
                # order.
                for name in sorted(tables):
                    yield tables[name].getAstropyTable(name, sort=sort_table)
            return

    def getDatasets(self) -> Iterator[list[DatasetRef]]:
        """Get the datasets as a list of lists.

        Yields
        ------
        refs : `collections.abc.Iterator` [ `list` [ `DatasetRef` ] ]
            Dataset references matching the given query criteria grouped
            by dataset type.
        """
        query_collections = self._collections_wildcard or ["*"]

        warn_limit = False
        limit_reached = False
        if self._limit < 0:
            # Negative limit means we should warn if the limit is exceeded.
            warn_limit = True
            limit = abs(self._limit) + 1  # +1 to tell us we hit the limit.
        elif self._limit == 0:
            # 0 means 'unlimited' in the CLI.
            limit = None
        else:
            limit = self._limit

        # We want to allow --order-by, but the query backend only supports
        # it when there is a single dataset type.
        # So we have to select different backends for single vs multiple
        # dataset type queries.
        if self._searches_multiple_dataset_types:
            query_func = self._query_multiple_dataset_types
        else:
            query_func = self._query_single_dataset_type

        try:
            with query_func(query_collections, limit) as ref_iter:
                datasets_found = 0
                # groupby relies on the backend returning refs already
                # grouped by dataset type; each group is materialized into a
                # list before yielding.
                for dataset_type, chunk in groupby(ref_iter, lambda ref: ref.datasetType.name):
                    refs = list(chunk)
                    datasets_found += len(refs)
                    if warn_limit and limit is not None and datasets_found >= limit:
                        # We asked for one too many so must remove that from
                        # the list.
                        refs = refs[0:-1]
                        limit_reached = True

                    yield refs

                    _LOG.debug("Got %d results for dataset type %s", len(refs), dataset_type)
        except MissingDatasetTypeError as e:
            # A nonexistent dataset type is not fatal for this CLI: report
            # it and produce no results.
            _LOG.info(str(e))
            return

        if limit is not None and limit_reached:
            _LOG.warning(
                "Requested limit of %d hit for number of datasets returned. "
                "Use --limit to increase this limit.",
                limit - 1,
            )

    @contextmanager
    def _query_multiple_dataset_types(
        self, collections: list[str], limit: int | None
    ) -> Iterator[Iterator[DatasetRef]]:
        # Backend for glob / multi-name dataset type searches. Pages of
        # per-type results are flattened into a single ref iterator; the
        # context manager keeps the underlying query open while the caller
        # consumes it.
        args = QueryAllDatasetsParameters(
            collections=collections,
            find_first=self._find_first,
            name=self._dataset_type_glob,
            where=self._where,
            limit=limit,
            data_id={},
            bind={},
            with_dimension_records=self._with_dimension_records,
        )
        with self.butler._query_all_datasets_by_page(args) as iterator:
            yield chain.from_iterable(iterator)

    @contextmanager
    def _query_single_dataset_type(
        self, collections: list[str], limit: int | None
    ) -> Iterator[Iterator[DatasetRef]]:
        # Backend for a single explicit dataset type; this is the only path
        # that supports --order-by. Written as a context manager so both
        # backends share the same calling convention in getDatasets.
        assert len(self._dataset_type_glob) == 1
        dataset_type = self._dataset_type_glob[0]

        refs = self.butler.query_datasets(
            dataset_type,
            collections=collections,
            find_first=self._find_first,
            where=self._where,
            limit=limit,
            order_by=self._order_by,
            with_dimension_records=self._with_dimension_records,
        )

        yield iter(refs)