Coverage for python / lsst / daf / butler / registry / queries / _query_common.py: 39%

82 statements  

« prev     ^ index     » next       coverage.py v7.13.5, created at 2026-04-28 08:36 +0000

1# This file is part of daf_butler. 

2# 

3# Developed for the LSST Data Management System. 

4# This product includes software developed by the LSST Project 

5# (http://www.lsst.org). 

6# See the COPYRIGHT file at the top-level directory of this distribution 

7# for details of code ownership. 

8# 

9# This software is dual licensed under the GNU General Public License and also 

10# under a 3-clause BSD license. Recipients may choose which of these licenses 

11# to use; please see the files gpl-3.0.txt and/or bsd_license.txt, 

12# respectively. If you choose the GPL option then the following text applies 

13# (but note that there is still no warranty even if you opt for BSD instead): 

14# 

15# This program is free software: you can redistribute it and/or modify 

16# it under the terms of the GNU General Public License as published by 

17# the Free Software Foundation, either version 3 of the License, or 

18# (at your option) any later version. 

19# 

20# This program is distributed in the hope that it will be useful, 

21# but WITHOUT ANY WARRANTY; without even the implied warranty of 

22# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the 

23# GNU General Public License for more details. 

24# 

25# You should have received a copy of the GNU General Public License 

26# along with this program. If not, see <http://www.gnu.org/licenses/>. 

27 

28from __future__ import annotations 

29 

30import dataclasses 

31from abc import abstractmethod 

32from collections.abc import Iterable, Iterator 

33from contextlib import contextmanager 

34from typing import Any, Generic, Self, TypeVar 

35 

36from ..._butler import Butler 

37from ...dimensions import DataId 

38from ...queries import Query, QueryResultsBase 

39from .._registry import CollectionArgType 

40from ..wildcards import CollectionWildcard 

41from ._results import QueryResultsBase as LegacyQueryResultsBase 

42 

43 

@dataclasses.dataclass(frozen=True)
class CommonQueryArguments:
    """Simplified version of the arguments passed to many ``Registry.query*``
    methods.

    Instances are frozen; use `replaceCollections` or `replaceDatasetTypes`
    to obtain a modified copy.
    """

    # Data ID constraint, or `None` if no data ID was given.
    dataId: DataId | None
    # Names of the dataset types involved in the query.
    dataset_types: list[str]
    # Names of the collections to search, or `None`.
    collections: list[str] | None
    # Query expression string; an empty string means no expression.
    where: str
    # Values bound to identifiers used in ``where``, or `None`.
    bind: dict[str, Any] | None
    # Additional keyword constraints given alongside ``dataId``.
    kwargs: dict[str, int | str]
    # Flag forwarded from the original ``check`` argument.
    check: bool

    def replaceCollections(self, collections: list[str]) -> CommonQueryArguments:
        """Return a copy of these arguments with different collections.

        Parameters
        ----------
        collections : `list` [ `str` ]
            Collection names to use in the copy.

        Returns
        -------
        args : `CommonQueryArguments`
            New instance identical to this one except for ``collections``.
        """
        return dataclasses.replace(self, collections=collections)

    def replaceDatasetTypes(self, dataset_types: list[str]) -> CommonQueryArguments:
        """Return a copy of these arguments with different dataset types.

        Parameters
        ----------
        dataset_types : `list` [ `str` ]
            Dataset type names to use in the copy.

        Returns
        -------
        args : `CommonQueryArguments`
            New instance identical to this one except for ``dataset_types``.
        """
        return dataclasses.replace(self, dataset_types=dataset_types)

63 

64 

# ``_T`` is the result type a `LegacyQueryResultsMixin` subclass builds;
# ``_U`` lets ``_apply_result_modifiers`` return the same type it was given.
_T = TypeVar("_T", bound=QueryResultsBase)
_U = TypeVar("_U", bound=QueryResultsBase)

67 

68 

class LegacyQueryResultsMixin(Generic[_T], LegacyQueryResultsBase):
    """Implements common methods for the various ``QueryResults`` classes in
    the legacy query system by forwarding to the new query system.

    Parameters
    ----------
    butler : `Butler`
        Butler object used to execute queries.
    args : `CommonQueryArguments`
        User-facing arguments forwarded from the original ``registry.query*``
        method.
    """

    def __init__(
        self,
        butler: Butler,
        args: CommonQueryArguments,
    ) -> None:
        self._butler = butler
        self._args = args
        # Result modifiers accumulated by ``limit()`` and ``order_by()``;
        # they are applied each time a query is built.
        self._limit: int | None = None
        self._order_by: list[str] = []

    def count(self, *, exact: bool = True, discard: bool = False) -> int:
        """Build the query and forward to the result object's ``count``.

        Parameters
        ----------
        exact : `bool`, optional
            Forwarded unchanged to the underlying ``count`` call.
        discard : `bool`, optional
            Forwarded unchanged to the underlying ``count`` call.

        Returns
        -------
        count : `int`
            Value returned by the underlying ``count`` call.
        """
        with self._build_query() as result:
            return result.count(exact=exact, discard=discard)

    def any(self, *, execute: bool = True, exact: bool = True) -> bool:
        """Build the query and forward to the result object's ``any``.

        Parameters
        ----------
        execute : `bool`, optional
            Forwarded unchanged to the underlying ``any`` call.
        exact : `bool`, optional
            Forwarded unchanged to the underlying ``any`` call.

        Returns
        -------
        any : `bool`
            Value returned by the underlying ``any`` call.
        """
        with self._build_query() as result:
            return result.any(execute=execute, exact=exact)

    def order_by(self, *args: str) -> Self:
        """Record additional sort keys to apply when the query is built.

        Parameters
        ----------
        *args : `str`
            Sort keys; they are appended to any keys recorded earlier.

        Returns
        -------
        self : `Self`
            This object, modified in place.
        """
        self._order_by.extend(args)
        return self

    def limit(self, limit: int) -> Self:
        """Record the row limit to apply when the query is built.

        Parameters
        ----------
        limit : `int`
            Limit to apply; replaces any previously recorded limit.

        Returns
        -------
        self : `Self`
            This object, modified in place.
        """
        self._limit = limit
        return self

    def explain_no_results(self, execute: bool = True) -> Iterable[str]:
        """Build the query and forward to the result object's
        ``explain_no_results``.

        Parameters
        ----------
        execute : `bool`, optional
            Forwarded unchanged to the underlying call.

        Returns
        -------
        messages : `~collections.abc.Iterable` [ `str` ]
            Value returned by the underlying ``explain_no_results`` call.
        """
        with self._build_query() as result:
            return result.explain_no_results(execute=execute)

    @contextmanager
    def _build_query(self) -> Iterator[_T]:
        """Open a new-style query and yield a fully configured result object.

        The result is yielded from inside the ``butler.query()`` context, so
        that context stays open only while the caller's ``with`` block runs.

        Yields
        ------
        result : ``_T``
            Result object produced by `_build_result` with the stored
            limit, ordering, and constraints applied.
        """
        with self._butler.query() as query:
            a = self._args
            for dataset_type in a.dataset_types:
                query = query.join_dataset_search(dataset_type, a.collections)
            if not a.check:
                query = query._skip_governor_validation()

            result = self._build_result(query)
            result = self._apply_result_modifiers(result)
            yield result

    def _apply_result_modifiers(self, result: _U) -> _U:
        """Apply the stored limit, ordering, and constraints to a result.

        Parameters
        ----------
        result : ``_U``
            Result object to modify.

        Returns
        -------
        result : ``_U``
            Result object with all modifiers applied.
        """
        a = self._args

        # The limit is forwarded even when it is `None`.
        result = result.limit(self._limit)
        if self._order_by:
            result = result.order_by(*self._order_by)

        if a.where:
            result = result.where(a.where, bind=a.bind)
        if a.dataId or a.kwargs:
            id_list = [a.dataId] if a.dataId else []
            # dataId and kwargs have to be sent together as part of the
            # same call to where() so that the kwargs can override values
            # in the data ID.
            result = result.where(*id_list, **a.kwargs, bind=None)

        return result

    @abstractmethod
    def _build_result(self, query: Query) -> _T:
        """Create the result object for this query type.

        Parameters
        ----------
        query : `Query`
            Query with dataset searches already joined.

        Returns
        -------
        result : ``_T``
            Result object, before limit, ordering, and constraints are
            applied.
        """
        raise NotImplementedError("Subclasses must implement _build_result")

146 

147 

def resolve_collections(
    butler: Butler, collections: CollectionArgType | None, doomed_by: list[str] | None = None
) -> list[str]:
    """Convert the collection argument used throughout the registry query
    methods to a concrete list of collections.

    Parameters
    ----------
    butler : `Butler`
        Butler object used to execute queries.
    collections : Any
        Any of the values that can be passed as the collections argument to
        Registry query methods.  If `None`, the butler's default collections
        are used.
    doomed_by : `list` [ `str` ] or `None`, optional
        If not `None`, a diagnostic message is appended to this list when
        the expression contains patterns and no collection matches them.

    Returns
    -------
    collections : `list` [ `str` ]
        Concrete list of collection names to be used by the query.
    """
    if collections is None:
        return list(butler.collections.defaults)

    wildcard = CollectionWildcard.from_expression(collections)
    if not wildcard.patterns:
        # Only explicit names were given, so no registry lookup is needed.
        return list(wildcard.strings)

    # The expression contains patterns; ask the registry to expand it.
    result = list(butler.registry.queryCollections(collections))
    if not result and doomed_by is not None:
        doomed_by.append(f"No collections found matching expression {wildcard}")
    return result