Coverage for python/lsst/daf/butler/queries/result_specs.py: 39%

141 statements  

coverage.py v7.13.5, created at 2026-05-01 08:18 +0000

1# This file is part of daf_butler. 

2# 

3# Developed for the LSST Data Management System. 

4# This product includes software developed by the LSST Project 

5# (http://www.lsst.org). 

6# See the COPYRIGHT file at the top-level directory of this distribution 

7# for details of code ownership. 

8# 

9# This software is dual licensed under the GNU General Public License and also 

10# under a 3-clause BSD license. Recipients may choose which of these licenses 

11# to use; please see the files gpl-3.0.txt and/or bsd_license.txt, 

12# respectively. If you choose the GPL option then the following text applies 

13# (but note that there is still no warranty even if you opt for BSD instead): 

14# 

15# This program is free software: you can redistribute it and/or modify 

16# it under the terms of the GNU General Public License as published by 

17# the Free Software Foundation, either version 3 of the License, or 

18# (at your option) any later version. 

19# 

20# This program is distributed in the hope that it will be useful, 

21# but WITHOUT ANY WARRANTY; without even the implied warranty of 

22# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the 

23# GNU General Public License for more details. 

24# 

25# You should have received a copy of the GNU General Public License 

26# along with this program. If not, see <http://www.gnu.org/licenses/>. 

27 

28from __future__ import annotations 

29 

30__all__ = ( 

31 "DataCoordinateResultSpec", 

32 "DatasetRefResultSpec", 

33 "DimensionRecordResultSpec", 

34 "ResultSpecBase", 

35) 

36 

37from abc import ABC, abstractmethod 

38from collections.abc import Mapping 

39from typing import Annotated, Literal, TypeAlias, cast 

40 

41import pydantic 

42 

43from .._exceptions import InvalidQueryError 

44from ..dimensions import DimensionElement, DimensionGroup, DimensionUniverse 

45from ..pydantic_utils import DeferredValidation 

46from .tree import AnyDatasetFieldName, AnyDatasetType, ColumnSet, OrderExpression, QueryTree 

47 

48 

49class ResultSpecBase(pydantic.BaseModel, ABC): 

50 """Base class for all query-result specification objects. 

51 

52 A result specification is a struct that is combined with a `QueryTree` to 

53 represent a serializable query-results object. 

54 """ 

55 

56 result_type: str 

57 """String literal that corresponds to a concrete derived type.""" 

58 

59 order_by: tuple[OrderExpression, ...] = () 

60 """Expressions to sort the rows by.""" 

61 

62 limit: int | None = None 

63 """Maximum number of rows to return, or `None` for no bound.""" 

64 

65 allow_duplicate_overlaps: bool = False 

66 """If set to True the queries are allowed to returnd duplicate rows for 

67 spatial overlaps. 

68 """ 

69 

70 def validate_tree(self, tree: QueryTree) -> None: 

71 """Check that this result object is consistent with a query tree. 

72 

73 Parameters 

74 ---------- 

75 tree : `QueryTree` 

76 Query tree that defines the joins and row-filtering that these 

77 results will come from. 

78 """ 

79 spec = cast(ResultSpec, self) 

80 if not spec.dimensions <= tree.dimensions: 

81 raise InvalidQueryError( 

82 f"Query result specification has dimensions {spec.dimensions} that are not a subset of the " 

83 f"query's dimensions {tree.dimensions}." 

84 ) 

85 result_columns = spec.get_result_columns() 

86 assert result_columns.dimensions == spec.dimensions, "enforced by ResultSpec implementations" 

87 for dataset_type in result_columns.dataset_fields: 

88 if dataset_type not in tree.datasets: 

89 raise InvalidQueryError(f"Dataset {dataset_type!r} is not available from this query.") 

90 order_by_columns = ColumnSet(spec.dimensions) 

91 for term in spec.order_by: 

92 term.gather_required_columns(order_by_columns) 

93 if not (order_by_columns.dimensions <= spec.dimensions): 

94 allowed_columns = spec.dimensions.names 

95 invalid_columns = order_by_columns.dimensions.names - allowed_columns 

96 raise InvalidQueryError( 

97 "Order-by expression may not reference columns that are not in the result dimensions.\n" 

98 f"Invalid columns: {invalid_columns}\n" 

99 f"Available columns: {allowed_columns}" 

100 ) 

101 for dataset_type in order_by_columns.dataset_fields.keys(): 

102 if dataset_type not in tree.datasets: 

103 raise InvalidQueryError( 

104 f"Dataset type {dataset_type!r} in order-by expression is not part of the query." 

105 ) 

106 

107 @property 

108 def find_first_dataset(self) -> str | AnyDatasetType | None: 

109 """The dataset type for which find-first resolution is required, if 

110 any. 

111 """ 

112 return None 

113 

114 @abstractmethod 

115 def get_result_columns(self) -> ColumnSet: 

116 """Return the columns included in the actual result rows. 

117 

118 This does not necessarily include all columns required by the 

119 `order_by` terms that are also a part of this spec. 

120 """ 

121 raise NotImplementedError() 

122 

123 
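# --- Illustrative sketch (not part of the original module) ---
# validate_tree() is the contract between a result spec and the QueryTree it
# will be paired with: the spec's dimensions must be a subset of the tree's,
# any dataset types it references must be joined into the tree, and order_by
# terms may only use columns available in the results.  Building a real
# QueryTree happens elsewhere in the queries subpackage, so this helper only
# shows the call pattern.
def _example_check_spec_against_tree(spec: ResultSpecBase, tree: QueryTree) -> None:
    # Raises InvalidQueryError if the spec asks for anything the tree cannot
    # provide; returns None otherwise.
    spec.validate_tree(tree)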

124class DataCoordinateResultSpec(ResultSpecBase): 

125 """Specification for a query that yields `DataCoordinate` objects.""" 

126 

127 result_type: Literal["data_coordinate"] = "data_coordinate" 

128 

129 dimensions: DimensionGroup 

130 """The dimensions of the data IDs returned by this query.""" 

131 

132 include_dimension_records: bool = False 

133 """Whether the returned data IDs include dimension records.""" 

134 

135 def get_result_columns(self) -> ColumnSet: 

136 # Docstring inherited. 

137 result = ColumnSet(self.dimensions) 

138 if self.include_dimension_records: 

139 _add_dimension_records_to_column_set(self.dimensions, result) 

140 return result 

141 

142 
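# --- Illustrative sketch (not part of the original module) ---
# How a DataCoordinateResultSpec might be used on its own: the DimensionGroup
# would normally come from a Butler's DimensionUniverse, so treat this as a
# hypothetical usage example rather than real query-system code.
def _example_data_coordinate_columns(dimensions: DimensionGroup) -> ColumnSet:
    spec = DataCoordinateResultSpec(dimensions=dimensions, include_dimension_records=True)
    # The result columns are the dimension key columns plus, because
    # include_dimension_records is True, the record fields of every
    # non-cached, non-skypix element in the group.
    return spec.get_result_columns()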

143class DimensionRecordResultSpec(ResultSpecBase): 

144 """Specification for a query that yields `DimensionRecord` objects.""" 

145 

146 result_type: Literal["dimension_record"] = "dimension_record" 

147 

148 element: DimensionElement 

149 """The name and definition of the dimension records returned by this query. 

150 """ 

151 

152 @property 

153 def dimensions(self) -> DimensionGroup: 

154 """The dimensions that are required or implied (directly or indirectly) 

155 by this dimension element. 

156 """ 

157 return self.element.minimal_group 

158 

159 def get_result_columns(self) -> ColumnSet: 

160 # Docstring inherited. 

161 result = ColumnSet(self.element.minimal_group) 

162 if self.element not in self.dimensions.universe.skypix_dimensions: 

163 result.dimension_fields[self.element.name].update(self.element.schema.remainder.names) 

164 result.drop_dimension_keys(self.element.minimal_group.names - self.element.dimensions.names) 

165 return result 

166 

167 
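# --- Illustrative sketch (not part of the original module) ---
# A DimensionRecordResultSpec names a single dimension element and derives its
# dimensions from that element.  The "detector" lookup below is a hypothetical
# example; it assumes the element exists in the given universe.
def _example_dimension_record_columns(universe: DimensionUniverse) -> ColumnSet:
    spec = DimensionRecordResultSpec(element=universe["detector"])
    # spec.dimensions is element.minimal_group, and the result columns hold the
    # element's key columns plus its non-key record fields.
    return spec.get_result_columns()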

168class DatasetRefResultSpec(ResultSpecBase): 

169 """Specification for a query that yields `DatasetRef` objects.""" 

170 

171 result_type: Literal["dataset_ref"] = "dataset_ref" 

172 

173 dataset_type_name: str 

174 """The dataset type name of the datasets returned by this query.""" 

175 

176 dimensions: DimensionGroup 

177 """The dimensions of the datasets returned by this query.""" 

178 

179 storage_class_name: str 

180 """The name of the storage class of the datasets returned by this query.""" 

181 

182 include_dimension_records: bool = False 

183 """Whether the data IDs returned by this query include dimension records. 

184 """ 

185 

186 find_first: bool 

187 """Whether this query should resolve data ID duplicates according to the 

188 order of the collections to be searched. 

189 """ 

190 

191 @property 

192 def find_first_dataset(self) -> str | None: 

193 # Docstring inherited. 

194 return self.dataset_type_name if self.find_first else None 

195 

196 def get_result_columns(self) -> ColumnSet: 

197 # Docstring inherited. 

198 result = ColumnSet(self.dimensions) 

199 result.dataset_fields[self.dataset_type_name].update({"dataset_id", "run"}) 

200 if self.include_dimension_records: 

201 _add_dimension_records_to_column_set(self.dimensions, result) 

202 return result 

203 

204 
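# --- Illustrative sketch (not part of the original module) ---
# A DatasetRefResultSpec pins down one dataset type.  The names used here
# ("raw", "Exposure") are hypothetical; real values come from the registered
# dataset type being queried.
def _example_dataset_ref_spec(dimensions: DimensionGroup) -> DatasetRefResultSpec:
    spec = DatasetRefResultSpec(
        dataset_type_name="raw",
        dimensions=dimensions,
        storage_class_name="Exposure",
        find_first=True,
    )
    # With find_first=True the spec reports which dataset type needs
    # find-first resolution against the collection search order.
    assert spec.find_first_dataset == "raw"
    return spec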

205class GeneralResultSpec(ResultSpecBase): 

206 """Specification for a query that yields a table with 

207 an explicit list of columns. 

208 """ 

209 

210 result_type: Literal["general"] = "general" 

211 

212 dimensions: DimensionGroup 

213 """The dimensions that span all fields returned by this query.""" 

214 

215 dimension_fields: Mapping[str, set[str]] 

216 """Dimension record fields included in this query.""" 

217 

218 dataset_fields: Mapping[str, set[AnyDatasetFieldName]] 

219 """Dataset fields included in this query.""" 

220 

221 include_dimension_records: bool = False 

222 """Whether to include fields for all dimension records, in addition to 

223 explicitly specified in `dimension_fields`. 

224 """ 

225 

226 find_first: bool 

227 """Whether this query requires find-first resolution for a dataset. 

228 

229 This can only be `True` if exactly one dataset type's fields are included 

230 in the results and the ``collection`` and ``timespan`` fields for that 

231 dataset are not included. 

232 """ 

233 

234 @property 

235 def find_first_dataset(self) -> str | None: 

236 # Docstring inherited. 

237 if self.find_first: 

238 if len(self.dataset_fields) != 1: 

239 raise InvalidQueryError( 

240 "General query with find_first=True cannot have results from multiple dataset searches." 

241 ) 

242 (dataset_type,) = self.dataset_fields.keys() 

243 return dataset_type 

244 return None 

245 

246 def get_result_columns(self) -> ColumnSet: 

247 # Docstring inherited. 

248 result = ColumnSet(self.dimensions) 

249 for element_name, fields_for_element in self.dimension_fields.items(): 

250 result.dimension_fields[element_name].update(fields_for_element) 

251 for dataset_type, fields_for_dataset in self.dataset_fields.items(): 

252 result.dataset_fields[dataset_type].update(fields_for_dataset) 

253 if self.include_dimension_records: 

254 # This only adds record fields for non-cached and non-skypix

255 # elements, which is what we want when generating the query. When

256 # `include_dimension_records` is True, dimension records for cached

257 # and skypix elements are added to result pages by the page converter.

258 _add_dimension_records_to_column_set(self.dimensions, result) 

259 return result 

260 

261 @pydantic.model_validator(mode="after") 

262 def _validate(self) -> GeneralResultSpec: 

263 if self.find_first and len(self.dataset_fields) != 1: 

264 raise InvalidQueryError("find_first=True requires exactly one result dataset type.") 

265 for element_name, fields_for_element in self.dimension_fields.items(): 

266 if element_name not in self.dimensions.elements: 

267 raise InvalidQueryError(f"Dimension element {element_name} is not in {self.dimensions}.") 

268 if not fields_for_element: 

269 raise InvalidQueryError( 

270 f"Empty dimension element field set for {element_name!r} is not permitted." 

271 ) 

272 elif element_name in self.dimensions.universe.skypix_dimensions.names: 

273 raise InvalidQueryError( 

274 f"Regions for skypix dimension {element_name!r} are not stored; compute them via " 

275 f"{element_name}.pixelization.pixel(id) instead." 

276 ) 

277 for dataset_type, fields_for_dataset in self.dataset_fields.items(): 

278 if not fields_for_dataset: 

279 raise InvalidQueryError(f"Empty dataset field set for {dataset_type!r} is not permitted.") 

280 return self 

281 

282 
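# --- Illustrative sketch (not part of the original module) ---
# A GeneralResultSpec mixes dimension-record fields and dataset fields in one
# tabular result.  The names below ("visit", "exposure_time", "raw") are
# hypothetical and assume the given DimensionGroup includes the visit
# dimension; the model validator above enforces the find_first and field-set
# constraints at construction time.
def _example_general_spec(dimensions: DimensionGroup) -> GeneralResultSpec:
    return GeneralResultSpec(
        dimensions=dimensions,
        dimension_fields={"visit": {"exposure_time"}},
        dataset_fields={"raw": {"dataset_id", "run"}},
        find_first=True,  # allowed because exactly one dataset type contributes fields
    )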

283ResultSpec: TypeAlias = Annotated[ 

284 DataCoordinateResultSpec | DimensionRecordResultSpec | DatasetRefResultSpec | GeneralResultSpec, 

285 pydantic.Field(discriminator="result_type"), 

286] 

287 

288 

289class SerializedResultSpec(DeferredValidation[ResultSpec]): 

290 def to_result_spec(self, universe: DimensionUniverse) -> ResultSpec: 

291 return self.validated(universe=universe) 

292 

293 
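# --- Illustrative sketch (not part of the original module) ---
# ResultSpec is a discriminated union keyed on result_type, and
# SerializedResultSpec defers full validation until a DimensionUniverse is
# available.  `from_validated` is assumed to be part of the DeferredValidation
# interface; treat this as a usage sketch rather than a guaranteed API.
def _example_round_trip(spec: ResultSpec, universe: DimensionUniverse) -> ResultSpec:
    serialized = SerializedResultSpec.from_validated(spec)  # assumed API
    return serialized.to_result_spec(universe)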

294def _add_dimension_records_to_column_set(dimensions: DimensionGroup, column_set: ColumnSet) -> None: 

295 """Add extra columns for generating 'expanded' data IDs that include 

296 dimension records. 

297 """ 

298 for element_name in dimensions.elements: 

299 element = dimensions.universe[element_name] 

300 if not element.is_cached and element not in dimensions.universe.skypix_dimensions: 

301 column_set.dimension_fields[element_name].update(element.schema.remainder.names)