Coverage for python / lsst / daf / butler / queries / tree / _column_set.py: 27%

131 statements  

« prev     ^ index     » next       coverage.py v7.13.5, created at 2026-04-28 08:36 +0000

1# This file is part of daf_butler. 

2# 

3# Developed for the LSST Data Management System. 

4# This product includes software developed by the LSST Project 

5# (http://www.lsst.org). 

6# See the COPYRIGHT file at the top-level directory of this distribution 

7# for details of code ownership. 

8# 

9# This software is dual licensed under the GNU General Public License and also 

10# under a 3-clause BSD license. Recipients may choose which of these licenses 

11# to use; please see the files gpl-3.0.txt and/or bsd_license.txt, 

12# respectively. If you choose the GPL option then the following text applies 

13# (but note that there is still no warranty even if you opt for BSD instead): 

14# 

15# This program is free software: you can redistribute it and/or modify 

16# it under the terms of the GNU General Public License as published by 

17# the Free Software Foundation, either version 3 of the License, or 

18# (at your option) any later version. 

19# 

20# This program is distributed in the hope that it will be useful, 

21# but WITHOUT ANY WARRANTY; without even the implied warranty of 

22# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the 

23# GNU General Public License for more details. 

24# 

25# You should have received a copy of the GNU General Public License 

26# along with this program. If not, see <http://www.gnu.org/licenses/>. 

27 

28from __future__ import annotations 

29 

30__all__ = ("ColumnOrder", "ColumnSet", "ResultColumn") 

31 

32from collections.abc import Iterable, Iterator, Mapping, Sequence, Set 

33from typing import NamedTuple, cast 

34 

35from ... import column_spec 

36from ...dimensions import DataIdValue, DimensionGroup 

37from ...nonempty_mapping import NonemptyMapping 

38from ._base import ANY_DATASET, AnyDatasetFieldName, AnyDatasetType 

39 

40 

41class ColumnSet: 

42 """A set-like hierarchical container for the columns in a query. 

43 

44 Parameters 

45 ---------- 

46 dimensions : `DimensionGroup` 

47 The dimensions that bound the set of columns, and by default specify 

48 the set of dimension key columns present. 

49 

50 Notes 

51 ----- 

52 This class does not inherit from `collections.abc.Set` because that brings 

53 in a lot of requirements we don't need (particularly interoperability with 

54 other set-like objects). 

55 

56 This class is iterable over tuples of ``(logical_table, field)``, where 

57 ``logical_table`` is a dimension element name or dataset type name, and 

58 ``field`` is a column associated with one of those, or `None` for dimension 

59 key columns. Iteration order is guaranteed to be deterministic and to 

60 start with all included dimension keys in 

61 `DimensionGroup.data_coordinate_keys`. 

62 """ 

63 

64 def __init__(self, dimensions: DimensionGroup) -> None: 

65 self._dimensions = dimensions 

66 self._removed_dimension_keys: set[str] = set() 

67 self._dimension_fields: dict[str, set[str]] = {name: set() for name in dimensions.elements} 

68 self._dataset_fields = NonemptyMapping[str | AnyDatasetType, set[AnyDatasetFieldName]](set) 

69 

70 @property 

71 def dimensions(self) -> DimensionGroup: 

72 """The dimensions that bound all columns in the set.""" 

73 return self._dimensions 

74 

75 @property 

76 def dimension_fields(self) -> Mapping[str, set[str]]: 

77 """Dimension record fields included in the set, grouped by dimension 

78 element name. 

79 

80 The keys of this mapping are always ``self.dimensions.elements``, and 

81 nested sets may be empty. 

82 """ 

83 return self._dimension_fields 

84 

85 @property 

86 def dataset_fields(self) -> NonemptyMapping[str | AnyDatasetType, set[AnyDatasetFieldName]]: 

87 """Dataset fields included in the set, grouped by dataset type name. 

88 

89 The keys of this mapping are just those that actually have nonempty 

90 nested sets. 

91 """ 

92 return self._dataset_fields 

93 

94 def __bool__(self) -> bool: 

95 return bool(self._dimensions) or any(self._dataset_fields.values()) 

96 

97 def __eq__(self, other: object) -> bool: 

98 if not isinstance(other, ColumnSet): 

99 return False 

100 return ( 

101 self._dimensions == other._dimensions 

102 and self._removed_dimension_keys == other._removed_dimension_keys 

103 and self._dimension_fields == other._dimension_fields 

104 and self._dataset_fields == other._dataset_fields 

105 ) 

106 

107 def __str__(self) -> str: 

108 return f"{{{', '.join(self.get_qualified_name(k, v) for k, v in self)}}}" 

109 

110 def issubset(self, other: ColumnSet) -> bool: 

111 """Test whether all columns in this set are also in another. 

112 

113 Parameters 

114 ---------- 

115 other : `ColumnSet` 

116 Set of columns to compare to. 

117 

118 Returns 

119 ------- 

120 issubset : `bool` 

121 Whether all columns in ``self`` are also in ``other``. 

122 """ 

123 return ( 

124 (self._get_dimension_keys() <= other._get_dimension_keys()) 

125 and all( 

126 fields.issubset(other._dimension_fields.get(element_name, frozenset())) 

127 for element_name, fields in self._dimension_fields.items() 

128 ) 

129 and all( 

130 fields.issubset(other._dataset_fields.get(dataset_type, frozenset())) 

131 for dataset_type, fields in self._dataset_fields.items() 

132 ) 

133 ) 

134 

135 def issuperset(self, other: ColumnSet) -> bool: 

136 """Test whether all columns another set are also in this one. 

137 

138 Parameters 

139 ---------- 

140 other : `ColumnSet` 

141 Set of columns to compare to. 

142 

143 Returns 

144 ------- 

145 issuperset : `bool` 

146 Whether all columns in ``other`` are also in ``self``. 

147 """ 

148 return other.issubset(self) 

149 

150 def isdisjoint(self, other: ColumnSet) -> bool: 

151 """Test whether there are no columns in both this set and another. 

152 

153 Parameters 

154 ---------- 

155 other : `ColumnSet` 

156 Set of columns to compare to. 

157 

158 Returns 

159 ------- 

160 isdisjoint : `bool` 

161 Whether there are any columns in both ``self`` and ``other``. 

162 """ 

163 return ( 

164 self._get_dimension_keys().isdisjoint(other._get_dimension_keys()) 

165 and all( 

166 fields.isdisjoint(other._dimension_fields.get(element, frozenset())) 

167 for element, fields in self._dimension_fields.items() 

168 ) 

169 and all( 

170 fields.isdisjoint(other._dataset_fields.get(dataset_type, frozenset())) 

171 for dataset_type, fields in self._dataset_fields.items() 

172 ) 

173 ) 

174 

175 def copy(self) -> ColumnSet: 

176 """Return a copy of this set. 

177 

178 Returns 

179 ------- 

180 copy : `ColumnSet` 

181 New column set that can be modified without changing the original. 

182 """ 

183 result = ColumnSet(self._dimensions) 

184 for element_name, element_fields in self._dimension_fields.items(): 

185 result._dimension_fields[element_name].update(element_fields) 

186 for dataset_type, dataset_fields in self._dataset_fields.items(): 

187 result._dataset_fields[dataset_type].update(dataset_fields) 

188 return result 

189 

190 def update_dimensions(self, dimensions: DimensionGroup) -> None: 

191 """Add new dimensions to the set. 

192 

193 Parameters 

194 ---------- 

195 dimensions : `DimensionGroup` 

196 Dimensions to be included. 

197 """ 

198 if not dimensions.issubset(self._dimensions): 

199 self._dimensions = dimensions.union(self._dimensions) 

200 self._dimension_fields = { 

201 name: self._dimension_fields.get(name, set()) for name in self._dimensions.elements 

202 } 

203 self._removed_dimension_keys.intersection_update(dimensions.names) 

204 

205 def update(self, other: ColumnSet) -> None: 

206 """Add columns from another set to this one. 

207 

208 Parameters 

209 ---------- 

210 other : `ColumnSet` 

211 Column set whose columns should be included in this one. 

212 """ 

213 self.update_dimensions(other.dimensions) 

214 self._removed_dimension_keys.intersection_update(other._removed_dimension_keys) 

215 for element_name, element_fields in other._dimension_fields.items(): 

216 self._dimension_fields[element_name].update(element_fields) 

217 for dataset_type, dataset_fields in other._dataset_fields.items(): 

218 self._dataset_fields[dataset_type].update(dataset_fields) 

219 

220 def drop_dimension_keys(self, names: Iterable[str]) -> ColumnSet: 

221 """Remove the given dimension key columns from the set. 

222 

223 Parameters 

224 ---------- 

225 names : `~collections.abc.Iterable` [ `str` ] 

226 Names of the dimensions to remove. 

227 

228 Returns 

229 ------- 

230 self : `ColumnSet` 

231 This column set, modified in place. 

232 """ 

233 self._removed_dimension_keys.update(names) 

234 return self 

235 

236 def drop_implied_dimension_keys(self) -> ColumnSet: 

237 """Remove dimension key columns that are implied by others. 

238 

239 Returns 

240 ------- 

241 self : `ColumnSet` 

242 This column set, modified in place. 

243 """ 

244 return self.drop_dimension_keys(self._dimensions.implied) 

245 

246 def restore_dimension_keys(self) -> None: 

247 """Restore all removed dimension key columns.""" 

248 self._removed_dimension_keys.clear() 

249 

250 def __iter__(self) -> Iterator[ResultColumn]: 

251 yield from self.get_column_order().columns() 

252 

253 def get_column_order(self) -> ColumnOrder: 

254 dimension_names: list[ResultColumn] = [] 

255 for dimension_name in self._dimensions.data_coordinate_keys: 

256 if dimension_name not in self._removed_dimension_keys: 

257 dimension_names.append(ResultColumn(dimension_name, None)) 

258 

259 # We iterate over DimensionElements and their DimensionRecord columns 

260 # in order to make sure that's predictable. We might want to extract 

261 # these query results positionally in some contexts. 

262 dimension_elements: list[ResultColumn] = [] 

263 for element_name in self._dimensions.elements: 

264 element = self._dimensions.universe[element_name] 

265 fields_for_element = self._dimension_fields[element_name] 

266 for spec in element.schema.remainder: 

267 if spec.name in fields_for_element: 

268 dimension_elements.append(ResultColumn(element_name, spec.name)) 

269 

270 # We sort dataset types and their fields lexicographically just to keep 

271 # our queries from having any dependence on set-iteration order. 

272 dataset_fields: list[ResultColumn] = [] 

273 for dataset_type in sorted(self._dataset_fields, key=str): # transform ANY_DATASET to str for sort 

274 for field in sorted(self._dataset_fields[dataset_type]): 

275 dataset_fields.append(ResultColumn(dataset_type, field)) 

276 

277 return ColumnOrder(dimension_names, dimension_elements, dataset_fields) 

278 

279 def is_timespan(self, logical_table: AnyDatasetType | str, field: str | None) -> bool: 

280 """Test whether the given column is a timespan. 

281 

282 Parameters 

283 ---------- 

284 logical_table : `str` or ``ANY_DATASET`` 

285 Name of the dimension element or dataset type the column belongs 

286 to. ``ANY_DATASET`` is used to represent any dataset type. 

287 field : `str` or `None` 

288 Column within the logical table, or `None` for dimension key 

289 columns. 

290 

291 Returns 

292 ------- 

293 is_timespan : `bool` 

294 Whether this column is a timespan. 

295 """ 

296 return field == "timespan" 

297 

298 @staticmethod 

299 def get_qualified_name(logical_table: AnyDatasetType | str, field: str | None) -> str: 

300 """Return string that should be used to fully identify a column. 

301 

302 Parameters 

303 ---------- 

304 logical_table : `str` or ``ANY_DATASET`` 

305 Name of the dimension element or dataset type the column belongs 

306 to. ``ANY_DATASET`` is used to represent any dataset type. 

307 field : `str` or `None` 

308 Column within the logical table, or `None` for dimension key 

309 columns. 

310 

311 Returns 

312 ------- 

313 name : `str` 

314 Fully-qualified name. 

315 """ 

316 return str(logical_table) if field is None else f"{logical_table}:{field}" 

317 

318 def get_column_spec( 

319 self, logical_table: AnyDatasetType | str, field: str | None 

320 ) -> column_spec.ColumnSpec: 

321 """Return a complete description of a column. 

322 

323 Parameters 

324 ---------- 

325 logical_table : `str` or ``ANY_DATASET`` 

326 Name of the dimension element or dataset type the column belongs 

327 to. ``ANY_DATASET`` is used to represent any dataset type. 

328 field : `str` or `None` 

329 Column within the logical table, or `None` for dimension key 

330 columns. 

331 

332 Returns 

333 ------- 

334 spec : `.column_spec.ColumnSpec` 

335 Description of the column. 

336 """ 

337 qualified_name = self.get_qualified_name(logical_table, field) 

338 if field is None: 

339 assert logical_table is not ANY_DATASET 

340 return self._dimensions.universe.dimensions[logical_table].primary_key.model_copy( 

341 update=dict(name=qualified_name) 

342 ) 

343 if logical_table in self._dimension_fields: 

344 assert logical_table is not ANY_DATASET # type: ignore[comparison-overlap] 

345 return ( 

346 self._dimensions.universe[logical_table] 

347 .schema.all[field] 

348 .model_copy(update=dict(name=qualified_name)) 

349 ) 

350 match field: 

351 case "dataset_id": 

352 return column_spec.UUIDColumnSpec.model_construct(name=qualified_name, nullable=False) 

353 case "ingest_date": 

354 return column_spec.DateTimeColumnSpec.model_construct(name=qualified_name) 

355 case "run": 

356 return column_spec.StringColumnSpec.model_construct( 

357 name=qualified_name, nullable=False, length=column_spec.COLLECTION_NAME_MAX_LENGTH 

358 ) 

359 case "collection": 

360 return column_spec.StringColumnSpec.model_construct( 

361 name=qualified_name, nullable=False, length=column_spec.COLLECTION_NAME_MAX_LENGTH 

362 ) 

363 case "timespan": 

364 return column_spec.TimespanColumnSpec.model_construct(name=qualified_name, nullable=True) 

365 raise AssertionError(f"Unrecognized column identifiers: {logical_table}, {field}.") 

366 

367 def _get_dimension_keys(self) -> Set[str]: 

368 if not self._removed_dimension_keys: 

369 return self._dimensions.names 

370 else: 

371 return self._dimensions.names - self._removed_dimension_keys 

372 

373 

class ResultColumn(NamedTuple):
    """Defines a column that can be output from a query."""

    logical_table: AnyDatasetType | str
    """Dimension element name or dataset type name."""

    field: str | None
    """Column associated with the dimension element or dataset type, or `None`
    if it is a dimension key column."""

    def __str__(self) -> str:
        # Dimension key columns render as just the table name; everything
        # else as "table.field".
        if self.field is None:
            return str(self.logical_table)
        return f"{self.logical_table}.{self.field}"

386 

387 

388class ColumnOrder: 

389 """Defines the position of columns within a result row and provides helper 

390 methods for accessing subsets of columns in a row. 

391 

392 Parameters 

393 ---------- 

394 dimension_keys : `~collections.abc.Iterable` [ `ResultColumn` ] 

395 Columns corresponding to dimension primary keys. 

396 dimension_elements : `~collections.abc.Iterable` [ `ResultColumn` ] 

397 Columns corresponding to DimensionElements and their DimensionRecord 

398 columns. 

399 dataset_fields : `~collections.abc.Iterable` [ `ResultColumn` ] 

400 Columns corresponding to dataset types and their fields. 

401 """ 

402 

403 def __init__( 

404 self, 

405 dimension_keys: Iterable[ResultColumn], 

406 dimension_elements: Iterable[ResultColumn], 

407 dataset_fields: Iterable[ResultColumn], 

408 ): 

409 self._dimension_keys = tuple(dimension_keys) 

410 self._dimension_elements = tuple(dimension_elements) 

411 self._dataset_fields = tuple(dataset_fields) 

412 

413 def columns(self) -> Iterator[ResultColumn]: 

414 # When editing this method, take care to update the other methods on 

415 # this object to correspond to the new order. 

416 yield from self._dimension_keys 

417 yield from self._dimension_elements 

418 yield from self._dataset_fields 

419 

420 @property 

421 def dimension_key_names(self) -> list[str]: 

422 """Return the names of the dimension key columns included in result 

423 rows, in the order they appear in the row. 

424 """ 

425 return [cast(str, column.logical_table) for column in self._dimension_keys] 

426 

427 def extract_dimension_key_columns(self, row: Sequence[DataIdValue]) -> Sequence[DataIdValue]: 

428 """Given a full result row, return just the dimension key columns. 

429 

430 Parameters 

431 ---------- 

432 row : `~collections.abc.Sequence` [ `DataIdValue` ] 

433 A row output by the SQL query associated with these columns. 

434 """ 

435 return row[: len(self._dimension_keys)]