Coverage for python / lsst / daf / butler / dimensions / _group.py: 39%

211 statements  

« prev     ^ index     » next       coverage.py v7.13.5, created at 2026-04-14 23:37 +0000

1# This file is part of daf_butler. 

2# 

3# Developed for the LSST Data Management System. 

4# This product includes software developed by the LSST Project 

5# (http://www.lsst.org). 

6# See the COPYRIGHT file at the top-level directory of this distribution 

7# for details of code ownership. 

8# 

9# This software is dual licensed under the GNU General Public License and also 

10# under a 3-clause BSD license. Recipients may choose which of these licenses 

11# to use; please see the files gpl-3.0.txt and/or bsd_license.txt, 

12# respectively. If you choose the GPL option then the following text applies 

13# (but note that there is still no warranty even if you opt for BSD instead): 

14# 

15# This program is free software: you can redistribute it and/or modify 

16# it under the terms of the GNU General Public License as published by 

17# the Free Software Foundation, either version 3 of the License, or 

18# (at your option) any later version. 

19# 

20# This program is distributed in the hope that it will be useful, 

21# but WITHOUT ANY WARRANTY; without even the implied warranty of 

22# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the 

23# GNU General Public License for more details. 

24# 

25# You should have received a copy of the GNU General Public License 

26# along with this program. If not, see <http://www.gnu.org/licenses/>. 

27 

28from __future__ import annotations 

29 

30__all__ = ("DimensionGroup", "SerializedDimensionGroup") 

31 

32import itertools 

33from collections.abc import Iterable, Iterator, Mapping, Set 

34from types import MappingProxyType 

35from typing import TYPE_CHECKING, Any, TypeAlias 

36 

37import pydantic 

38from deprecated.sphinx import deprecated 

39from pydantic_core import core_schema 

40 

41from lsst.utils.classes import cached_getter, immutable 

42 

43from .. import pydantic_utils 

44from .._named import NamedValueAbstractSet, NamedValueSet 

45from .._topology import TopologicalFamily, TopologicalSpace 

46 

47if TYPE_CHECKING: # Imports needed only for type annotations; may be circular. 

48 from ._elements import DimensionElement 

49 from ._universe import DimensionUniverse 

50 

51 

class SortedSequenceSet(Set[str]):
    """A set-like interface wrapper around a tuple.

    This delegates directly to ``tuple.__contains__``, so there is an implicit
    assumption that `len` is small and hence O(N) lookups are not a problem, as
    is the case for sets of dimension names.

    Parameters
    ----------
    seq : `tuple` [`str`, ...]
        Strings to seed the set.

    Notes
    -----
    Iteration follows the order of the wrapped tuple.  Set operators
    inherited from `collections.abc.Set` (``&``, ``|``, ``-``, ``^``) return
    plain `set` objects rather than new instances of this class.
    """

    __slots__ = ("_seq",)

    def __init__(self, seq: tuple[str, ...]):
        self._seq = seq

    def __contains__(self, x: object) -> bool:
        return x in self._seq

    def __iter__(self) -> Iterator[str]:
        return iter(self._seq)

    def __len__(self) -> int:
        return len(self._seq)

    def __hash__(self) -> int:
        # Hash of the underlying tuple, so it depends on element order.
        return hash(self._seq)

    def __eq__(self, other: object) -> bool:
        # Fast path: compare the wrapped tuples directly when the other
        # operand exposes one.  Test against `None` explicitly so that an
        # empty tuple is not mistaken for a missing attribute.
        seq = getattr(other, "_seq", None)
        if seq is not None:
            return seq == self._seq
        # Anything else gets generic `collections.abc.Set` equality.
        return super().__eq__(other)

    @classmethod
    def _from_iterable(cls, iterable: Iterable[str]) -> set[str]:
        # This is used by collections.abc.Set mixin methods when they need
        # to return a new object (e.g. in `__and__`).
        return set(iterable)

    def __repr__(self) -> str:
        return f"{{{', '.join(str(k) for k in self._seq)}}}"

    def as_tuple(self) -> tuple[str, ...]:
        """Return the underlying tuple.

        Returns
        -------
        t : `tuple`
            A tuple of all the values.
        """
        return self._seq

    # TODO: remove on DM-45185
    @property
    @deprecated(
        "Deprecated in favor of direct iteration over the parent set.  Will be removed after v28.",
        version="v28",
        category=FutureWarning,
    )
    def names(self) -> Set[str]:
        """An alias to ``self``.

        This is a backwards-compatibility API that allows `DimensionGroup` to
        mimic the old ``DimensionGraph`` object it replaced, by permitting
        expressions like ``x.required.names`` when ``x`` can be an object of
        either type.
        """
        return self

122 

123 

@immutable
class DimensionGroup:  # numpydoc ignore=PR02
    """An immutable, dependency-complete collection of dimensions.

    `DimensionGroup` behaves in many respects like a set of `str` dimension
    names that maintains several special subsets and supersets of related
    dimension elements.  It does not fully implement the `collections.abc.Set`
    interface, because it defines a few different iteration orders and does not
    privilege any one of them by implementing ``__iter__``.

    Parameters
    ----------
    universe : `DimensionUniverse`
        Object that manages all known dimensions.
    names : `~collections.abc.Iterable` of `str` or `DimensionGroup`, optional
        An iterable of the names of dimensions that must be included in the
        group.  All (recursive) dependencies of these dimensions will also be
        included.  An existing `DimensionGroup` may be passed instead; it is
        returned unchanged when it already belongs to ``universe``.
    _conform : `bool`, optional
        If `True` (default), expand to include dependencies.  `False` should
        only be used for callers that can guarantee that other arguments are
        already correctly expanded, and is for internal use only.

    Notes
    -----
    `DimensionGroup` should be used instead of other collections in most
    contexts where a collection of dimensions is required and a
    `DimensionUniverse` is available.  Exceptions include cases where order
    matters (and is different from the consistent ordering defined by the
    `DimensionUniverse`), or complete `~collections.abc.Set` semantics are
    required.

    Instances are cached on the universe, keyed by their expanded set of
    dimension names, so constructing a group whose names are already known
    returns the existing instance.

    This class is not a Pydantic model, but it implements the
    `__get_pydantic_core_schema__` special method and hence can be used as a
    field in Pydantic models or [de]serialized directly via
    `pydantic.TypeAdapter`, but validation requires a `DimensionUniverse` to be
    passed as the "universe" key in the Pydantic validation context.  The
    `.pydantic_utils.DeferredValidation` class can be used to defer validation
    of this object or other types that use it until that context is available.
    """

    def __new__(
        cls,
        universe: DimensionUniverse,
        names: Iterable[str] | DimensionGroup = frozenset(),
        _conform: bool = True,
    ) -> DimensionGroup:
        if isinstance(names, DimensionGroup):
            if names.universe is universe:
                return names
            else:
                names = names.names
        if _conform:
            # Expand dimension names to include all required and implied
            # dependencies.
            to_expand = set(names)
            names = set()
            while to_expand:
                dimension = universe[to_expand.pop()]
                names.add(dimension.name)
                to_expand.update(dimension.required.names)
                to_expand.update(dimension.implied.names)
                to_expand.difference_update(names)
        else:
            # Materialize once: ``names`` is read again below, so a one-shot
            # iterable must not be consumed by building the cache key.
            names = frozenset(names)
        # Look in the cache of existing groups, with the expanded set of names.
        cache_key = frozenset(names)
        self = universe._cached_groups.get(cache_key)
        if self is not None:
            return self
        # This is apparently a new group.  Create it, and add it to the cache.
        self = super().__new__(cls)
        self.universe = universe
        # Reorder dimensions by iterating over the universe (which is
        # ordered already) and extracting the ones in the set.
        self.names = SortedSequenceSet(tuple(d.name for d in universe.sorted(names)))
        # Make a set that includes both the dimensions and any
        # DimensionElements whose dependencies are in self.names.
        self.elements = SortedSequenceSet(
            tuple(e.name for e in universe.elements if e.required.names <= self.names)
        )
        self.governors = SortedSequenceSet(
            tuple(d for d in self.names if d in universe.governor_dimensions.names)
        )
        self.skypix = SortedSequenceSet(tuple(d for d in self.names if d in universe.skypix_dimensions.names))
        # Split dependencies up into "required" and "implied" subsets.
        # Note that a dimension may be required in one group and implied in
        # another.
        required: list[str] = []
        implied: list[str] = []
        for dim1 in self.names:
            for dim2 in self.names:
                if dim1 in universe[dim2].implied.names:
                    implied.append(dim1)
                    break
            else:
                # If no other dimension implies dim1, it's required.
                required.append(dim1)
        self.required = SortedSequenceSet(tuple(required))
        self.implied = SortedSequenceSet(tuple(implied))

        # For each topological space, collect the families of every element
        # in this group that participates in that space.
        self._space_families = MappingProxyType(
            {
                space: NamedValueSet(
                    universe[e].topology[space] for e in self.elements if space in universe[e].topology
                ).freeze()
                for space in TopologicalSpace.__members__.values()
            }
        )

        # Build mappings from dimension to index; this is really for
        # DataCoordinate, but we put it in DimensionGroup because many (many!)
        # DataCoordinates will share the same DimensionGroup, and we want them
        # to be lightweight.  The order here is what's convenient for
        # DataCoordinate: all required dimensions before all implied
        # dimensions.
        self._data_coordinate_indices = {
            name: i for i, name in enumerate(itertools.chain(self.required, self.implied))
        }
        # Return whatever `set_or_get` hands back rather than ``self``
        # directly.
        return universe._cached_groups.set_or_get(cache_key, self)

    def __getnewargs__(self) -> tuple:
        # The stored names are already expanded, so pass ``_conform=False``.
        return (self.universe, self.names._seq, False)

    def __deepcopy__(self, memo: dict) -> DimensionGroup:
        # DimensionGroup is recursively immutable; see note in @immutable
        # decorator.
        return self

    def __len__(self) -> int:
        return len(self.names)

    def __contains__(self, element: str) -> bool:
        if element in self.elements:
            return True
        else:
            # Imported here rather than at module scope; the module-level
            # import is guarded by TYPE_CHECKING because it may be circular.
            from ._elements import DimensionElement

            if isinstance(element, DimensionElement):  # type: ignore[unreachable]
                raise TypeError(
                    "DimensionGroup does not support membership tests using DimensionElement "
                    "instances; use their names instead."
                )
            return False

    def __str__(self) -> str:
        return str(self.names)

    def __repr__(self) -> str:
        return f"DimensionGroup({self.names})"

    # TODO: remove on DM-45185
    @deprecated(
        "Deprecated as no longer necessary (this method always returns 'self').  Will be removed after v28.",
        version="v28",
        category=FutureWarning,
    )
    def as_group(self) -> DimensionGroup:
        """Return ``self``.

        Returns
        -------
        group : `DimensionGroup`
            Returns itself.

        Notes
        -----
        This is a backwards-compatibility API that allowed both the old
        ``DimensionGraph`` class and `DimensionGroup` to be coerced to the
        latter.
        """
        return self

    def isdisjoint(self, other: DimensionGroup) -> bool:
        """Test whether the intersection of two groups is empty.

        Parameters
        ----------
        other : `DimensionGroup`
            Other group to compare with.

        Returns
        -------
        is_disjoint : `bool`
            `True` if the groups have no dimension names in common.  This is
            always the case if either operand is empty.
        """
        return self.names.isdisjoint(other.names)

    def issubset(self, other: DimensionGroup) -> bool:
        """Test whether all dimensions in ``self`` are also in ``other``.

        Parameters
        ----------
        other : `DimensionGroup`
            Other group to compare with.

        Returns
        -------
        is_subset : `bool`
            `True` if every dimension name in ``self`` is also in ``other``.
            This is always the case if ``self`` is empty.
        """
        return self.names <= other.names

    def issuperset(self, other: DimensionGroup) -> bool:
        """Test whether all dimensions in ``other`` are also in ``self``.

        Parameters
        ----------
        other : `DimensionGroup`
            Other group to compare with.

        Returns
        -------
        is_superset : `bool`
            `True` if every dimension name in ``other`` is also in ``self``.
            This is always the case if ``other`` is empty.
        """
        return self.names >= other.names

    def __eq__(self, other: Any) -> bool:
        if isinstance(other, DimensionGroup):
            return self.names == other.names
        else:
            return False

    def __hash__(self) -> int:
        # NOTE(review): hashes only ``required`` while ``__eq__`` compares
        # ``names``; this relies on equal ``names`` yielding equal
        # ``required`` - confirm for groups from different universes.
        return hash(self.required._seq)

    def __le__(self, other: DimensionGroup) -> bool:
        return self.names <= other.names

    def __ge__(self, other: DimensionGroup) -> bool:
        return self.names >= other.names

    def __lt__(self, other: DimensionGroup) -> bool:
        return self.names < other.names

    def __gt__(self, other: DimensionGroup) -> bool:
        return self.names > other.names

    def union(*operands: DimensionGroup, universe: DimensionUniverse | None = None) -> DimensionGroup:
        """Construct a new group with all dimensions in any of the operands.

        Parameters
        ----------
        *operands : `DimensionGroup`
            Groups to union.
        universe : `DimensionUniverse`, optional
            Universe to use to create an empty group when no operands are
            provided (i.e. when this method is called on the class).

        Returns
        -------
        union : `DimensionGroup`
            Union of all the groups.

        Raises
        ------
        TypeError
            Raised if there are no operands and ``universe`` is not provided.

        Notes
        -----
        The elements of the returned group may exceed the naive union of their
        elements, as some dimension elements are included in groups whenever
        multiple dimensions are present, and those dependency dimensions could
        have been provided by different operands.
        """
        # There is deliberately no ``self`` parameter: when called on an
        # instance, that instance is simply the first operand.
        names = set().union(*[operand.names for operand in operands])
        if universe is None:
            try:
                universe = operands[0].universe
            except IndexError:
                raise TypeError(
                    "'universe' must be provided when 'union' is called with an empty iterable."
                ) from None
        return DimensionGroup(universe, names)

    def intersection(self, *others: DimensionGroup) -> DimensionGroup:
        """Construct a new group with only dimensions in all of the operands.

        Parameters
        ----------
        *others : `DimensionGroup`
            Other groups to compare with.

        Returns
        -------
        inter : `DimensionGroup`
            Intersection of all the groups.

        Notes
        -----
        See also `union`.
        """
        names = set(self.names).intersection(*[other.names for other in others])
        return DimensionGroup(self.universe, names=names)

    def difference(self, other: DimensionGroup) -> DimensionGroup:
        """Construct a new group with dimensions that are in ``self`` but not
        ``other`` OR dependencies of those in ``self`` but not in ``other``.

        Parameters
        ----------
        other : `DimensionGroup`
            Other group to compare with.

        Returns
        -------
        diff : `DimensionGroup`
            Difference of the two groups.

        Notes
        -----
        This is not exactly equivalent to a true `set` difference, because the
        result must be expanded to include required and implied dependencies,
        and those may be common to ``self`` and ``other``.
        """
        return DimensionGroup(self.universe, names=self.names - other.names)

    def __or__(self, other: DimensionGroup) -> DimensionGroup:
        return self.union(other)

    def __and__(self, other: DimensionGroup) -> DimensionGroup:
        return self.intersection(other)

    def __sub__(self, other: DimensionGroup) -> DimensionGroup:
        return self.difference(other)

    @property
    def data_coordinate_keys(self) -> Set[str]:
        """A set of dimensions ordered like `DataCoordinate.mapping`.

        This order is defined as all required dimensions followed by all
        implied dimensions.
        """
        return self._data_coordinate_indices.keys()

    @property
    @cached_getter
    def lookup_order(self) -> tuple[str, ...]:
        """A tuple of all elements in the order needed to find their records.

        Unlike the table definition/topological order (which is what
        `DimensionUniverse.sorted` gives you), when dimension A implies
        dimension B, dimension A appears first.
        """
        done: set[str] = set()
        order: list[str] = []

        def add_to_order(element: DimensionElement) -> None:
            # Append ``element`` once all of its required dimensions (other
            # than itself) are placed, then recurse into what it implies.
            if element.name in done:
                return
            predecessors = set(element.required.names)
            predecessors.discard(element.name)
            if not done.issuperset(predecessors):
                return
            order.append(element.name)
            done.add(element.name)
            for other in element.implied:
                add_to_order(other)

        # Sweep repeatedly: a dimension skipped because its prerequisites were
        # not yet placed is picked up on a later pass.
        while not done.issuperset(self.required):
            for dimension in self.required:
                add_to_order(self.universe[dimension])

        # Everything not reached above goes last, in ``self.elements`` order.
        order.extend(element for element in self.elements if element not in done)
        return tuple(order)

    def _choose_dimension(self, families: NamedValueAbstractSet[TopologicalFamily]) -> str | None:
        # Only unambiguous when exactly one family is present.
        if len(families) != 1:
            return None
        return next(iter(families)).choose(self).name

    @property
    def region_dimension(self) -> str | None:
        """Return the most appropriate spatial dimension to use when looking
        up a region.

        Returns `None` if there are no appropriate dimensions or more than one
        spatial family.
        """
        return self._choose_dimension(self.spatial)

    @property
    def timespan_dimension(self) -> str | None:
        """Return the most appropriate temporal dimension to use when looking
        up a time span.

        Returns `None` if there are no appropriate dimensions or more than one
        temporal family.
        """
        return self._choose_dimension(self.temporal)

    @property
    def spatial(self) -> NamedValueAbstractSet[TopologicalFamily]:
        """Families represented by the spatial elements in this group."""
        return self._space_families[TopologicalSpace.SPATIAL]

    @property
    def temporal(self) -> NamedValueAbstractSet[TopologicalFamily]:
        """Families represented by the temporal elements in this group."""
        return self._space_families[TopologicalSpace.TEMPORAL]

    # Class attributes below are shadowed by instance attributes, and are
    # present just to hold the docstrings for those instance attributes.

    universe: DimensionUniverse
    """The set of all known dimensions, of which this group is a subset
    (`DimensionUniverse`).
    """

    names: SortedSequenceSet
    """A true `~collections.abc.Set` of the dimension names.

    Iteration order is consistent with `DimensionUniverse.sorted`: each
    dimension is preceded by its required and implied dependencies.
    """

    elements: SortedSequenceSet
    """A true `~collections.abc.Set` of all dimension element names in the
    group; a superset of `names`.
    """

    governors: SortedSequenceSet
    """A true `~collections.abc.Set` of all governor dimension names in the
    group.
    """

    skypix: SortedSequenceSet
    """A true `~collections.abc.Set` of all skypix dimension names in the
    group.
    """

    required: SortedSequenceSet
    """The dimensions that must be directly identified via their primary keys
    in a data ID in order to identify the rest of the elements in the group.
    """

    implied: SortedSequenceSet
    """The dimensions that need not be directly identified via their primary
    keys in a data ID.
    """

    _space_families: Mapping[TopologicalSpace, NamedValueAbstractSet[TopologicalFamily]]
    """Families of elements in this group that exist in topological spaces
    relationships (`~collections.abc.Mapping` from `TopologicalSpace` to
    `NamedValueAbstractSet` of `TopologicalFamily`).
    """

    _data_coordinate_indices: dict[str, int]

    @classmethod
    def _validate(cls, data: Any, info: pydantic.ValidationInfo) -> DimensionGroup:
        """Pydantic validator (deserializer) for `DimensionGroup`.

        This satisfies the `pydantic.WithInfoPlainValidatorFunction` signature.
        """
        universe = pydantic_utils.get_universe_from_context(info.context)
        return cls.from_simple(data, universe)

    @classmethod
    def from_simple(cls, data: SerializedDimensionGroup, universe: DimensionUniverse) -> DimensionGroup:
        """Create an instance of this class from serialized data.

        Parameters
        ----------
        data : `SerializedDimensionGroup`
            Serialized data from a previous call to ``to_simple``.
        universe : `DimensionUniverse`
            Dimension universe in which this dimension group will be defined.

        Returns
        -------
        group : `DimensionGroup`
            The result of ``universe.conform(data)``.
        """
        return universe.conform(data)

    def to_simple(self) -> SerializedDimensionGroup:
        """Convert this class to a simple data format suitable for
        serialization.

        Returns
        -------
        data : `SerializedDimensionGroup`
            The dimension names as a `list`, in the iteration order of
            ``names``.
        """
        return list(self.names)

    @classmethod
    def __get_pydantic_core_schema__(
        cls, source_type: Any, handler: pydantic.GetCoreSchemaHandler
    ) -> core_schema.CoreSchema:
        # This is the Pydantic hook for overriding serialization, validation,
        # and JSON schema generation.
        list_of_str_schema = core_schema.list_schema(core_schema.str_schema())
        from_list_of_str_schema = core_schema.chain_schema(
            [list_of_str_schema, core_schema.with_info_plain_validator_function(cls._validate)]
        )
        return core_schema.json_or_python_schema(
            # When deserializing from JSON, expect it to look like list[str].
            json_schema=from_list_of_str_schema,
            # When deserializing from Python, first see if it's already a
            # DimensionGroup and then try conversion from list[str].
            python_schema=core_schema.union_schema(
                [core_schema.is_instance_schema(DimensionGroup), from_list_of_str_schema]
            ),
            # When serializing convert it to a `list[str]`.
            serialization=core_schema.plain_serializer_function_ser_schema(
                cls.to_simple, return_schema=list_of_str_schema
            ),
        )

622 

623 

# Serialized form of a `DimensionGroup`: a plain list of dimension names, as
# produced by `DimensionGroup.to_simple` and accepted by
# `DimensionGroup.from_simple`.
SerializedDimensionGroup: TypeAlias = list[str]