Coverage for python / lsst / daf / butler / queries / tree / _query_tree.py: 40%

104 statements  

« prev     ^ index     » next       coverage.py v7.13.5, created at 2026-04-28 08:36 +0000

1# This file is part of daf_butler. 

2# 

3# Developed for the LSST Data Management System. 

4# This product includes software developed by the LSST Project 

5# (http://www.lsst.org). 

6# See the COPYRIGHT file at the top-level directory of this distribution 

7# for details of code ownership. 

8# 

9# This software is dual licensed under the GNU General Public License and also 

10# under a 3-clause BSD license. Recipients may choose which of these licenses 

11# to use; please see the files gpl-3.0.txt and/or bsd_license.txt, 

12# respectively. If you choose the GPL option then the following text applies 

13# (but note that there is still no warranty even if you opt for BSD instead): 

14# 

15# This program is free software: you can redistribute it and/or modify 

16# it under the terms of the GNU General Public License as published by 

17# the Free Software Foundation, either version 3 of the License, or 

18# (at your option) any later version. 

19# 

20# This program is distributed in the hope that it will be useful, 

21# but WITHOUT ANY WARRANTY; without even the implied warranty of 

22# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the 

23# GNU General Public License for more details. 

24# 

25# You should have received a copy of the GNU General Public License 

26# along with this program. If not, see <http://www.gnu.org/licenses/>. 

27 

28from __future__ import annotations 

29 

# Public API of this module; names not listed here are internal.
__all__ = (
    "DataCoordinateUploadKey",
    "DatasetSearch",
    "MaterializationKey",
    "QueryTree",
    "SerializedQueryTree",
    "make_identity_query_tree",
)

38 

39import uuid 

40from collections.abc import Iterator, Mapping 

41from typing import TypeAlias, final 

42 

43import pydantic 

44 

45from ..._exceptions import InvalidQueryError 

46from ...dimensions import DimensionGroup, DimensionUniverse 

47from ...pydantic_utils import DeferredValidation 

48from ._base import ANY_DATASET, AnyDatasetType, QueryTreeBase 

49from ._column_set import ColumnSet 

50from ._predicate import Predicate 

51 

# Unique identifier (assigned by a `QueryDriver`) for an uploaded table of
# data ID values joined into a query tree.
DataCoordinateUploadKey: TypeAlias = uuid.UUID

# Unique identifier (assigned by a `QueryDriver`) for temporarily-stored
# query results ("materializations") joined into a query tree.
MaterializationKey: TypeAlias = uuid.UUID

55 

56 

def make_identity_query_tree(universe: DimensionUniverse) -> QueryTree:
    """Construct the initial query tree: empty dimensions and a single
    logical row.

    This is the tree `Butler._query` starts from.  It is a useful initial
    state because it acts as the identity element for joins: joining any
    other query tree to it yields that other tree unchanged.

    Parameters
    ----------
    universe : `~lsst.daf.butler.DimensionUniverse`
        Definitions for all dimensions.

    Returns
    -------
    tree : `QueryTree`
        A tree with empty dimensions.
    """
    empty_dimensions = universe.empty
    return QueryTree(dimensions=empty_dimensions)

77 

78 

@final
class DatasetSearch(QueryTreeBase):
    """Description of a dataset search that has been joined into a query
    tree.

    The dataset type name is not stored here; it is the key of the
    `QueryTree` mapping in which this object appears as a value.
    """

    collections: tuple[str, ...]
    """The collections to search.

    Order matters if this dataset type is later referenced by a `FindFirst`
    operation.  Collection wildcards are always resolved before being
    included in a dataset search.
    """

    dimensions: DimensionGroup
    """The dimensions of the dataset type.

    This must match the dimensions of the dataset type as already defined in
    the butler database, but that cannot generally be verified when a
    relation tree is validated (it requires a database query), so it must be
    checked later.
    """

103 

104 

@final
class QueryTree(QueryTreeBase):
    """A declarative, serializable description of the row constraints and joins
    in a butler query.

    Notes
    -----
    A `QueryTree` is the struct that represents the serializable form of a
    `Query` object, or one piece (with `ResultSpec` the other) of the
    serializable form of a query results object.

    This class's attributes describe the columns that are "available" to be
    returned or used in ``where`` or ``order_by`` expressions, but it does not
    carry information about the columns that are actually included in result
    rows, or what kind of butler primitive (e.g. `DataCoordinate` or
    `DatasetRef`) those rows might be transformed into.
    """

    dimensions: DimensionGroup
    """The dimensions whose keys are joined into the query.
    """

    datasets: Mapping[str, DatasetSearch] = pydantic.Field(default_factory=dict)
    """Dataset searches that have been joined into the query."""

    any_dataset: DatasetSearch | None = pydantic.Field(default=None)
    """A special optional dataset search for all dataset types with a
    particular set of dimensions.
    """

    data_coordinate_uploads: Mapping[DataCoordinateUploadKey, DimensionGroup] = pydantic.Field(
        default_factory=dict
    )
    """Uploaded tables of data ID values that have been joined into the query.
    """

    materializations: Mapping[MaterializationKey, DimensionGroup] = pydantic.Field(default_factory=dict)
    """Tables of result rows from other queries that have been stored
    temporarily on the server.
    """

    predicate: Predicate = Predicate.from_bool(True)
    """Boolean expression trees whose logical AND defines a row filter."""

    validateGovernorConstraints: bool = True
    """If True, enforce the requirement that governor dimensions must be
    constrained if any dimensions that depend on them have constraints.
    """

    def iter_all_dataset_searches(self) -> Iterator[tuple[str | AnyDatasetType, DatasetSearch]]:
        """Iterate over all dataset searches, including the special
        ``any_dataset`` search (keyed by `ANY_DATASET`) if present.
        """
        yield from self.datasets.items()
        if self.any_dataset is not None:
            yield (ANY_DATASET, self.any_dataset)

    def get_joined_dimension_groups(self) -> frozenset[DimensionGroup]:
        """Return a set of the dimension groups of all data coordinate uploads,
        dataset searches, and materializations.
        """
        result: set[DimensionGroup] = set(self.data_coordinate_uploads.values())
        result.update(self.materializations.values())
        for dataset_spec in self.datasets.values():
            result.add(dataset_spec.dimensions)
        if self.any_dataset is not None:
            result.add(self.any_dataset.dimensions)
        return frozenset(result)

    def join_dimensions(self, dimensions: DimensionGroup) -> QueryTree:
        """Return a new tree that includes additional dimensions.

        Parameters
        ----------
        dimensions : `DimensionGroup`
            Dimensions to include.

        Returns
        -------
        result : `QueryTree`
            A new tree with the additional dimensions.
        """
        return self.model_copy(update=dict(dimensions=self.dimensions | dimensions))

    def join_data_coordinate_upload(
        self, key: DataCoordinateUploadKey, dimensions: DimensionGroup
    ) -> QueryTree:
        """Return a new tree that joins in an uploaded table of data ID values.

        Parameters
        ----------
        key : `DataCoordinateUploadKey`
            Unique identifier for this upload, as assigned by a `QueryDriver`.
        dimensions : `DimensionGroup`
            Dimensions of the data IDs.

        Returns
        -------
        result : `QueryTree`
            A new tree that joins in the data ID table.
        """
        assert key not in self.data_coordinate_uploads, "Query should prevent doing the same upload twice."
        data_coordinate_uploads = dict(self.data_coordinate_uploads)
        data_coordinate_uploads[key] = dimensions
        return self.model_copy(
            update=dict(
                dimensions=self.dimensions | dimensions, data_coordinate_uploads=data_coordinate_uploads
            )
        )

    def join_materialization(self, key: MaterializationKey, dimensions: DimensionGroup) -> QueryTree:
        """Return a new tree that joins in temporarily stored results from
        another query.

        Parameters
        ----------
        key : `MaterializationKey`
            Unique identifier for this materialization, as assigned by a
            `QueryDriver`.
        dimensions : `DimensionGroup`
            The dimensions stored in the materialization.

        Returns
        -------
        result : `QueryTree`
            A new tree that joins in the materialization.
        """
        # Fixed: the duplicate check must look at the materializations
        # mapping we are about to insert into, not data_coordinate_uploads.
        assert key not in self.materializations, "Query should prevent duplicate materialization."
        materializations = dict(self.materializations)
        materializations[key] = dimensions
        return self.model_copy(
            update=dict(dimensions=self.dimensions | dimensions, materializations=materializations)
        )

    def join_dataset(self, dataset_type: str, search: DatasetSearch) -> QueryTree:
        """Return a new tree that joins in a search for a dataset.

        Parameters
        ----------
        dataset_type : `str`
            Name of dataset type to join in.
        search : `DatasetSearch`
            Struct containing the collection search path and dataset type
            dimensions.

        Returns
        -------
        result : `QueryTree`
            A new tree that joins in the dataset search.
        """
        if existing := self.datasets.get(dataset_type):
            assert existing == search, "Dataset search should be new or the same."
            return self
        else:
            datasets = dict(self.datasets)
            datasets[dataset_type] = search
            return self.model_copy(
                update=dict(dimensions=self.dimensions | search.dimensions, datasets=datasets)
            )

    def join_any_dataset(self, search: DatasetSearch) -> QueryTree:
        """Return a new tree that joins in a search for any dataset type with
        the given dimensions.

        Parameters
        ----------
        search : `DatasetSearch`
            Struct containing the collection search path and dimensions.

        Returns
        -------
        result : `QueryTree`
            A new tree that joins in the dataset search.
        """
        if self.any_dataset is not None:
            assert self.any_dataset == search, "Dataset search should be new or the same."
            return self
        else:
            return self.model_copy(
                update=dict(dimensions=self.dimensions | search.dimensions, any_dataset=search)
            )

    def where(self, *terms: Predicate) -> QueryTree:
        """Return a new tree that adds row filtering via a boolean column
        expression.

        Parameters
        ----------
        *terms : `Predicate`
            Boolean column expressions that filter rows.  Arguments are
            combined with logical AND.

        Returns
        -------
        result : `QueryTree`
            A new tree with row filtering.

        Raises
        ------
        InvalidQueryError
            Raised if a column expression requires a dataset column that is
            not already present in the query tree.

        Notes
        -----
        If an expression references a dimension or dimension element that is
        not already present in the query tree, it will be joined in, but
        datasets must already be joined into a query tree in order to
        reference their fields in expressions.
        """
        predicate = self.predicate
        columns = ColumnSet(self.dimensions)
        for where_term in terms:
            where_term.gather_required_columns(columns)
            predicate = predicate.logical_and(where_term)
        missing_dataset_types = columns.dataset_fields.keys() - self.datasets.keys()
        if self.any_dataset is not None:
            # ANY_DATASET fields are satisfied by the any_dataset search.
            missing_dataset_types.discard(ANY_DATASET)
        if missing_dataset_types:
            raise InvalidQueryError(
                f"Cannot reference dataset type(s) {missing_dataset_types} that have not been joined."
            )
        return self.model_copy(update=dict(dimensions=columns.dimensions, predicate=predicate))

    @pydantic.model_validator(mode="after")
    def _validate_join_operands(self) -> QueryTree:
        # Every join operand's dimensions must already be in the tree's
        # dimensions; `join_*` methods maintain this invariant, so this
        # guards (de)serialized trees.
        for dimensions in self.get_joined_dimension_groups():
            if not dimensions.issubset(self.dimensions):
                raise InvalidQueryError(
                    f"Dimensions {dimensions} of join operand are not a "
                    f"subset of the query tree's dimensions {self.dimensions}."
                )
        return self

    @pydantic.model_validator(mode="after")
    def _validate_required_columns(self) -> QueryTree:
        # The predicate may only reference dimensions and dataset fields that
        # are joined into the tree.
        columns = ColumnSet(self.dimensions)
        self.predicate.gather_required_columns(columns)
        if not columns.dimensions.issubset(self.dimensions):
            raise InvalidQueryError("Predicate requires dimensions beyond those in the query tree.")
        # Mirror the check in `where`: ANY_DATASET field references are
        # satisfied by `any_dataset` when it is joined, so trees accepted by
        # `where` also survive a serialization round-trip.
        required_dataset_types = set(columns.dataset_fields.keys())
        if self.any_dataset is not None:
            required_dataset_types.discard(ANY_DATASET)
        if not required_dataset_types <= self.datasets.keys():
            raise InvalidQueryError("Predicate requires dataset columns that are not in the query tree.")
        return self

345 

346 

class SerializedQueryTree(DeferredValidation[QueryTree]):
    """A Pydantic-serializable wrapper for `QueryTree` that postpones
    validation until `validated` is called, so a `.DimensionUniverse` can be
    supplied at that point.
    """

    def to_query_tree(self, universe: DimensionUniverse) -> QueryTree:
        """Validate the wrapped data and return it as a `QueryTree`."""
        return self.validated(universe=universe)