Coverage for python / lsst / daf / butler / dimensions / _record_set.py: 22%

283 statements  

« prev     ^ index     » next       coverage.py v7.13.5, created at 2026-04-30 08:41 +0000

1# This file is part of daf_butler. 

2# 

3# Developed for the LSST Data Management System. 

4# This product includes software developed by the LSST Project 

5# (http://www.lsst.org). 

6# See the COPYRIGHT file at the top-level directory of this distribution 

7# for details of code ownership. 

8# 

9# This software is dual licensed under the GNU General Public License and also 

10# under a 3-clause BSD license. Recipients may choose which of these licenses 

11# to use; please see the files gpl-3.0.txt and/or bsd_license.txt, 

12# respectively. If you choose the GPL option then the following text applies 

13# (but note that there is still no warranty even if you opt for BSD instead): 

14# 

15# This program is free software: you can redistribute it and/or modify 

16# it under the terms of the GNU General Public License as published by 

17# the Free Software Foundation, either version 3 of the License, or 

18# (at your option) any later version. 

19# 

20# This program is distributed in the hope that it will be useful, 

21# but WITHOUT ANY WARRANTY; without even the implied warranty of 

22# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the 

23# GNU General Public License for more details. 

24# 

25# You should have received a copy of the GNU General Public License 

26# along with this program. If not, see <http://www.gnu.org/licenses/>. 

27 

28from __future__ import annotations 

29 

30__all__ = ( 

31 "DimensionDataAttacher", 

32 "DimensionDataExtractor", 

33 "DimensionRecordFactory", 

34 "DimensionRecordSet", 

35 "DimensionRecordSetDeserializer", 

36 "SerializableDimensionData", 

37) 

38 

39import dataclasses 

40from collections.abc import Collection, Iterable, Iterator 

41from typing import TYPE_CHECKING, Any, Protocol, Self, TypeAlias, final 

42 

43import pydantic 

44 

45from ._coordinate import DataCoordinate, DataIdValue 

46from ._records import DimensionRecord, SerializedKeyValueDimensionRecord 

47 

48if TYPE_CHECKING: 

49 from ..queries import Query 

50 from ._elements import DimensionElement 

51 from ._group import DimensionGroup 

52 from ._skypix import SkyPixDimension 

53 from ._universe import DimensionUniverse 

54 from .record_cache import DimensionRecordCache 

55 

56 

# Serialized dimension records, grouped in a dictionary keyed by the name of
# the dimension element they all correspond to.
SerializedDimensionRecordSetMapping: TypeAlias = dict[str, list[SerializedKeyValueDimensionRecord]]

58 

59 

class DimensionRecordFactory(Protocol):
    """Protocol for a callback that can be used to create a dimension record
    to add to a `DimensionRecordSet` when a search for an existing one fails.
    """

    def __call__(
        self, record_class: type[DimensionRecord], required_values: tuple[DataIdValue, ...]
    ) -> DimensionRecord:
        """Make a new `DimensionRecord` instance.

        Parameters
        ----------
        record_class : `type` [ `DimensionRecord` ]
            A concrete `DimensionRecord` subclass.
        required_values : `tuple`
            Tuple of data ID values, corresponding to
            ``record_class.definition.required``.

        Returns
        -------
        record : `DimensionRecord`
            The new record; callers (e.g. `DimensionRecordSet.find`) do not
            check that it has the right element and data ID.
        """
        ...  # pragma: no cover

79 

80 

def fail_record_lookup(
    record_class: type[DimensionRecord], required_values: tuple[DataIdValue, ...]
) -> DimensionRecord:
    """Raise `LookupError` to report that a `DimensionRecord` could not be
    found or created.

    This is intended to serve as the default value for arguments that accept
    a `DimensionRecordFactory` callback.

    Parameters
    ----------
    record_class : `type` [ `DimensionRecord` ]
        Type of record to create.
    required_values : `tuple`
        Tuple of data ID required values that are sufficient to identify a
        record that exists in the data repository.

    Returns
    -------
    record : `DimensionRecord`
        Never returned; this function always raises `LookupError`.
    """
    element = record_class.definition
    data_id = DataCoordinate.from_required_values(element.minimal_group, required_values)
    raise LookupError(f"No {element.name!r} record with data ID {data_id}.")

107 

108 

@final
class DimensionRecordSet(Collection[DimensionRecord]):  # numpydoc ignore=PR01
    """A mutable set-like container specialized for `DimensionRecord` objects.

    Parameters
    ----------
    element : `DimensionElement` or `str`, optional
        The dimension element that defines the records held by this set. If
        not a `DimensionElement` instance, ``universe`` must be provided.
    records : `~collections.abc.Iterable` [ `DimensionRecord` ], optional
        Dimension records to add to the set.
    universe : `DimensionUniverse`, optional
        Object that defines all dimensions. Ignored if ``element`` is a
        `DimensionElement` instance.

    Notes
    -----
    `DimensionRecordSet` maintains its insertion order (like `dict`, and unlike
    `set`).

    `DimensionRecordSet` implements `collections.abc.Collection` but not
    `collections.abc.Set` because the latter would require interoperability
    with all other `~collections.abc.Set` implementations rather than just
    `DimensionRecordSet`, and that adds a lot of complexity without much clear
    value. To help make this clear to type checkers it implements only the
    named-method versions of these operations (e.g. `issubset`) rather than the
    operator special methods (e.g. ``__le__``).

    `DimensionRecord` equality is defined in terms of a record's data ID fields
    only, and `DimensionRecordSet` does not generally specify which record
    "wins" when two records with the same data ID interact (e.g. in
    `intersection`). The `add` and `update` methods are notable exceptions:
    they always replace the existing record with the new one.

    Dimension records can also be held by `DimensionRecordTable`, which
    provides column-oriented access and Arrow interoperability.
    """

    def __init__(
        self,
        element: DimensionElement | str,
        records: Iterable[DimensionRecord] = (),
        universe: DimensionUniverse | None = None,
        *,
        _by_required_values: dict[tuple[DataIdValue, ...], DimensionRecord] | None = None,
    ):
        if isinstance(element, str):
            if universe is None:
                raise TypeError("'universe' must be provided if 'element' is not a DimensionElement.")
            element = universe[element]
        else:
            universe = element.universe
        if _by_required_values is None:
            _by_required_values = {}
        self._record_type = element.RecordClass
        # Backing storage: records keyed by their data ID's required-values
        # tuple. A plain dict, so insertion order is preserved.
        self._by_required_values = _by_required_values
        self._dimensions = element.minimal_group
        self.update(records)

    @property
    def element(self) -> DimensionElement:
        """The dimension element these records correspond to."""
        return self._record_type.definition

    def __contains__(self, key: object) -> bool:
        # Accept either a record for this element or a data ID with exactly
        # this element's dimensions; anything else is simply not a member.
        match key:
            case DimensionRecord() if key.definition == self.element:
                required_values = key.dataId.required_values
            case DataCoordinate() if key.dimensions == self.element.minimal_group:
                required_values = key.required_values
            case _:
                return False
        return required_values in self._by_required_values

    def __len__(self) -> int:
        return len(self._by_required_values)

    def __iter__(self) -> Iterator[DimensionRecord]:
        return iter(self._by_required_values.values())

    def __eq__(self, other: object) -> bool:
        if not isinstance(other, DimensionRecordSet):
            return False
        # Consistent with the class notes: equality considers only the record
        # type and the set of data IDs, not the records' non-key fields.
        return (
            self._record_type is other._record_type
            and self._by_required_values.keys() == other._by_required_values.keys()
        )

    def __repr__(self) -> str:
        lines = [f"DimensionRecordSet({self.element.name}, {{"]
        for record in self:
            lines.append(f"    {record!r},")
        lines.append("})")
        return "\n".join(lines)

    def issubset(self, other: DimensionRecordSet) -> bool:
        """Test whether all elements in ``self`` are in ``other``.

        Parameters
        ----------
        other : `DimensionRecordSet`
            Another record set with the same record type.

        Returns
        -------
        issubset : `bool`
            Whether all elements in ``self`` are in ``other``.

        Raises
        ------
        ValueError
            Raised if the two sets are for different dimension elements.
        """
        if self._record_type is not other._record_type:
            raise ValueError(
                "Invalid comparison between dimension record sets for elements "
                f"{self.element.name!r} and {other.element.name!r}."
            )
        return self._by_required_values.keys() <= other._by_required_values.keys()

    def issuperset(self, other: DimensionRecordSet) -> bool:
        """Test whether all elements in ``other`` are in ``self``.

        Parameters
        ----------
        other : `DimensionRecordSet`
            Another record set with the same record type.

        Returns
        -------
        issuperset : `bool`
            Whether all elements in ``other`` are in ``self``.

        Raises
        ------
        ValueError
            Raised if the two sets are for different dimension elements.
        """
        if self._record_type is not other._record_type:
            raise ValueError(
                "Invalid comparison between dimension record sets for elements "
                f"{self.element.name!r} and {other.element.name!r}."
            )
        return self._by_required_values.keys() >= other._by_required_values.keys()

    def isdisjoint(self, other: DimensionRecordSet) -> bool:
        """Test whether the intersection of ``self`` and ``other`` is empty.

        Parameters
        ----------
        other : `DimensionRecordSet`
            Another record set with the same record type.

        Returns
        -------
        isdisjoint : `bool`
            Whether the intersection of ``self`` and ``other`` is empty.

        Raises
        ------
        ValueError
            Raised if the two sets are for different dimension elements.
        """
        if self._record_type is not other._record_type:
            raise ValueError(
                "Invalid comparison between dimension record sets for elements "
                f"{self.element.name!r} and {other.element.name!r}."
            )
        return self._by_required_values.keys().isdisjoint(other._by_required_values.keys())

    def intersection(self, other: DimensionRecordSet) -> DimensionRecordSet:
        """Return a new set with only records that are in both ``self`` and
        ``other``.

        Parameters
        ----------
        other : `DimensionRecordSet`
            Another record set with the same record type.

        Returns
        -------
        intersection : `DimensionRecordSet`
            A new record set with all elements in both sets.

        Raises
        ------
        ValueError
            Raised if the two sets are for different dimension elements.
        """
        if self._record_type is not other._record_type:
            raise ValueError(
                "Invalid intersection between dimension record sets for elements "
                f"{self.element.name!r} and {other.element.name!r}."
            )
        # For duplicate data IDs the record from ``self`` is kept.
        return DimensionRecordSet(
            self.element,
            _by_required_values={
                k: v for k, v in self._by_required_values.items() if k in other._by_required_values
            },
        )

    def difference(self, other: DimensionRecordSet) -> DimensionRecordSet:
        """Return a new set with only records that are in ``self`` and not in
        ``other``.

        Parameters
        ----------
        other : `DimensionRecordSet`
            Another record set with the same record type.

        Returns
        -------
        difference : `DimensionRecordSet`
            A new record set with all elements ``self`` that are not in
            ``other``.

        Raises
        ------
        ValueError
            Raised if the two sets are for different dimension elements.
        """
        if self._record_type is not other._record_type:
            raise ValueError(
                "Invalid difference between dimension record sets for elements "
                f"{self.element.name!r} and {other.element.name!r}."
            )
        return DimensionRecordSet(
            self.element,
            _by_required_values={
                k: v for k, v in self._by_required_values.items() if k not in other._by_required_values
            },
        )

    def union(self, other: DimensionRecordSet) -> DimensionRecordSet:
        """Return a new set with all records that are either in ``self`` or
        ``other``.

        Parameters
        ----------
        other : `DimensionRecordSet`
            Another record set with the same record type.

        Returns
        -------
        union : `DimensionRecordSet`
            A new record set with all elements in either set.

        Raises
        ------
        ValueError
            Raised if the two sets are for different dimension elements.
        """
        if self._record_type is not other._record_type:
            raise ValueError(
                "Invalid union between dimension record sets for elements "
                f"{self.element.name!r} and {other.element.name!r}."
            )
        # Dict-union semantics: for duplicate data IDs the record from
        # ``other`` wins.
        return DimensionRecordSet(
            self.element,
            _by_required_values=self._by_required_values | other._by_required_values,
        )

    def find(
        self,
        data_id: DataCoordinate,
        or_add: DimensionRecordFactory = fail_record_lookup,
    ) -> DimensionRecord:
        """Return the record with the given data ID.

        Parameters
        ----------
        data_id : `DataCoordinate`
            Data ID to match.
        or_add : `DimensionRecordFactory`
            Callback that is invoked if no existing record is found, to create
            a new record that is added to the set and returned. The return
            value of this callback is *not* checked to see if it is a valid
            dimension record with the right element and data ID.

        Returns
        -------
        record : `DimensionRecord`
            Matching record.

        Raises
        ------
        LookupError
            Raised by the default ``or_add`` callback if no record with this
            data ID was found.
        ValueError
            Raised if the data ID did not have the right dimensions.
        """
        if data_id.dimensions != self._dimensions:
            raise ValueError(
                f"data ID {data_id} has incorrect dimensions for dimension records for {self.element!r}."
            )
        return self.find_with_required_values(data_id.required_values, or_add)

    def find_with_required_values(
        self, required_values: tuple[DataIdValue, ...], or_add: DimensionRecordFactory = fail_record_lookup
    ) -> DimensionRecord:
        """Return the record whose data ID has the given required values.

        Parameters
        ----------
        required_values : `tuple` [ `int` or `str` ]
            Data ID values to match.
        or_add : `DimensionRecordFactory`
            Callback that is invoked if no existing record is found, to create
            a new record that is added to the set and returned. The return
            value of this callback is *not* checked to see if it is a valid
            dimension record with the right element and data ID.

        Returns
        -------
        record : `DimensionRecord`
            Matching record.

        Raises
        ------
        LookupError
            Raised by the default ``or_add`` callback if no record with these
            required values was found.
        """
        if (result := self._by_required_values.get(required_values)) is None:
            # Cache whatever the factory returns so later lookups hit.
            result = or_add(self._record_type, required_values)
            self._by_required_values[required_values] = result
        return result

    def add(self, value: DimensionRecord, replace: bool = True) -> None:
        """Add a new record to the set.

        Parameters
        ----------
        value : `DimensionRecord`
            Record to add.
        replace : `bool`, optional
            If `True` (default) replace any existing record with the same data
            ID. If `False` the existing record will be kept.

        Raises
        ------
        ValueError
            Raised if ``value.element != self.element``.
        """
        # NOTE(review): this compares a `str` name to a `DimensionElement`;
        # it relies on `DimensionElement` comparing equal to its name —
        # confirm against `DimensionElement.__eq__`.
        if value.definition.name != self.element:
            raise ValueError(
                f"Cannot add record {value} for {value.definition.name!r} to set for {self.element!r}."
            )
        if replace:
            self._by_required_values[value.dataId.required_values] = value
        else:
            self._by_required_values.setdefault(value.dataId.required_values, value)

    def update(self, values: Iterable[DimensionRecord], replace: bool = True) -> None:
        """Add new records to the set.

        Parameters
        ----------
        values : `~collections.abc.Iterable` [ `DimensionRecord` ]
            Records to add.
        replace : `bool`, optional
            If `True` (default) replace any existing records with the same data
            IDs. If `False` the existing records will be kept.

        Raises
        ------
        ValueError
            Raised if ``value.element != self.element``.
        """
        for value in values:
            self.add(value, replace=replace)

    def update_from_data_coordinates(self, data_coordinates: Iterable[DataCoordinate]) -> None:
        """Add records to the set by extracting and deduplicating them from
        data coordinates.

        Parameters
        ----------
        data_coordinates : `~collections.abc.Iterable` [ `DataCoordinate` ]
            Data coordinates to extract from. `DataCoordinate.hasRecords` must
            be `True`.
        """
        for data_coordinate in data_coordinates:
            # Falsy (e.g. missing/`None`) records are skipped.
            if record := data_coordinate._record(self.element.name):
                self._by_required_values[record.dataId.required_values] = record

    def discard(self, value: DimensionRecord | DataCoordinate) -> None:
        """Remove a record if it exists.

        Parameters
        ----------
        value : `DimensionRecord` or `DataCoordinate`
            Record to remove, or its data ID.

        Raises
        ------
        ValueError
            Raised if the data ID did not have the right dimensions.
        """
        if isinstance(value, DimensionRecord):
            value = value.dataId
        if value.dimensions != self._dimensions:
            raise ValueError(f"{value} has incorrect dimensions for dimension records for {self.element!r}.")
        self._by_required_values.pop(value.required_values, None)

    def remove(self, value: DimensionRecord | DataCoordinate) -> None:
        """Remove a record.

        Parameters
        ----------
        value : `DimensionRecord` or `DataCoordinate`
            Record to remove, or its data ID.

        Raises
        ------
        KeyError
            Raised if there is no matching record.
        ValueError
            Raised if the data ID did not have the right dimensions.
        """
        if isinstance(value, DimensionRecord):
            value = value.dataId
        if value.dimensions != self._dimensions:
            raise ValueError(f"{value} has incorrect dimensions for dimension records for {self.element!r}.")
        del self._by_required_values[value.required_values]

    def pop(self) -> DimensionRecord:
        """Remove and return an arbitrary record."""
        # `dict.popitem` is LIFO: this removes the most recently added record.
        return self._by_required_values.popitem()[1]

    def __deepcopy__(self, memo: dict[str, Any]) -> DimensionRecordSet:
        # Only the backing dict is copied (``memo`` is deliberately unused);
        # the records themselves are presumably immutable and hence safe to
        # share — TODO confirm.
        return DimensionRecordSet(self.element, _by_required_values=self._by_required_values.copy())

    def serialize_records(self) -> list[SerializedKeyValueDimensionRecord]:
        """Serialize the records to a list.

        Returns
        -------
        raw_records : `list` [ `list` ]
            Serialized records, in the form returned by
            `DimensionRecord.serialize_key_value`.

        Notes
        -----
        This does not include the dimension element shared by all of the
        records, on the assumption that this is usually more conveniently saved
        separately (e.g. as the key of a dictionary of which the list of
        records is a value).
        """
        return [record.serialize_key_value() for record in self]

    def deserialize_records(self, raw_records: Iterable[SerializedKeyValueDimensionRecord]) -> None:
        """Deserialize records and add them to this set.

        Parameters
        ----------
        raw_records : `~collections.abc.Iterable` [ `list` ]
            Serialized records, as returned by `serialize_records` or repeated
            calls to `DimensionRecord.serialize_key_value`.

        Notes
        -----
        The caller is responsible for ensuring that the serialized records have
        the same dimension element as this set, as this cannot be checked.
        Mismatches will probably result in a (confusing) type-validation error,
        but are not guaranteed to.
        """
        deserializer = DimensionRecordSetDeserializer.from_raw(self.element, raw_records)
        self.update(deserializer)

541 

542class DimensionRecordSetDeserializer: 

543 """A helper class for deserializing sets of dimension records, with support 

544 for only fully deserializing certain records. 

545 

546 The `from_raw` factory method should generally be used instead of calling 

547 the constructor directly. 

548 

549 Parameters 

550 ---------- 

551 element : `DimensionElement` 

552 Dimension element that defines all records. 

553 mapping : `dict` [ `tuple`, `list` ] 

554 A dictionary that maps the data ID required-values `tuple` for reach 

555 record to the remainder of its raw serialization (i.e. an item in this 

556 `dict` is a pair returned by `DimensionRecord.deserialize_key`). This 

557 `dict` will be used directly to back the deserializer, not copied. 

558 

559 Notes 

560 ----- 

561 The keys (data ID required-values tuples) of all rows are deserialized 

562 immediately, but the remaining fields are deserialized only on demand; use 

563 `__iter__` to deserialize all records or `__getitem__` to deserialize only 

564 a few. An instance should really only be used for a single iteration or 

565 multiple `__getitem__` calls, as each call will re-deserialize the records 

566 in play; deserialized records are not cached. 

567 

568 The caller is responsible for ensuring that the serialized records are for 

569 the given dimension element, as this cannot be checked. Mismatches will 

570 probably result in a (confusing) type-validation error, but are not 

571 guaranteed to. 

572 """ 

573 

574 def __init__( 

575 self, 

576 element: DimensionElement, 

577 mapping: dict[tuple[DataIdValue, ...], SerializedKeyValueDimensionRecord], 

578 ): 

579 self.element = element 

580 self._mapping = mapping 

581 

582 @classmethod 

583 def from_raw( 

584 cls, element: DimensionElement, raw_records: Iterable[SerializedKeyValueDimensionRecord] 

585 ) -> Self: 

586 """Construct from raw serialized records. 

587 

588 Parameters 

589 ---------- 

590 element : `DimensionElement` 

591 Dimension element that defines all records. 

592 raw_records : `~collections.abc.Iterable` [ `list` ] 

593 Serialized records, as returned by 

594 `DimensionRecordSet.serialize_records` or repeated calls to 

595 `DimensionRecord.serialize_key_value`. 

596 

597 Returns 

598 ------- 

599 deserializer : `DimensionRecordSetDeserializer` 

600 New deserializer instance. 

601 """ 

602 return cls(element=element, mapping=dict(map(element.RecordClass.deserialize_key, raw_records))) 

603 

604 def __len__(self) -> int: 

605 return len(self._mapping) 

606 

607 def __iter__(self) -> Iterator[DimensionRecord]: 

608 deserialize = self.element.RecordClass.deserialize_value 

609 return (deserialize(k, v) for k, v in self._mapping.items()) 

610 

611 def __getitem__(self, key: tuple[DataIdValue, ...]) -> DimensionRecord: 

612 return self.element.RecordClass.deserialize_value(key, self._mapping[key]) 

613 

614 

@dataclasses.dataclass
class DimensionDataExtractor:
    """A helper class for extracting dimension records from expanded data IDs
    (e.g. for normalized serialization).

    Instances of this class must be initialized with empty record sets
    (usually by one of the class-method factories) for all of the dimension
    elements that should be extracted from the data IDs passed to `update`.
    Dimension elements not included will not be extracted (which may be
    useful).
    """

    # Record sets to fill, keyed by dimension element name.
    records: dict[str, DimensionRecordSet] = dataclasses.field(default_factory=dict)

    @classmethod
    def from_element_names(
        cls, element_names: Iterable[str], universe: DimensionUniverse
    ) -> DimensionDataExtractor:
        """Construct from an iterable of dimension element names.

        Parameters
        ----------
        element_names : `~collections.abc.Iterable` [ `str` ]
            Names of dimension elements to include.
        universe : `DimensionUniverse`
            Definitions of all dimensions.

        Returns
        -------
        extractor : `DimensionDataExtractor`
            New extractor.
        """
        record_sets: dict[str, DimensionRecordSet] = {}
        for name in element_names:
            record_sets[name] = DimensionRecordSet(name, universe=universe)
        return cls(records=record_sets)

    @classmethod
    def from_dimension_group(
        cls,
        dimensions: DimensionGroup,
        *,
        ignore: Iterable[str] = (),
        ignore_cached: bool = False,
        include_skypix: bool = False,
    ) -> DimensionDataExtractor:
        """Construct from a `DimensionGroup` and a set of dimension element
        names to ignore.

        Parameters
        ----------
        dimensions : `DimensionGroup`
            Dimensions that span the set of elements whose elements are to be
            extracted.
        ignore : `~collections.abc.Iterable` [ `str` ], optional
            Names of dimension elements that should not be extracted.
        ignore_cached : `bool`, optional
            If `True`, ignore all dimension elements for which
            `DimensionElement.is_cached` is `True`.
        include_skypix : `bool`, optional
            If `True`, include skypix dimensions. These are ignored by default
            because they can always be recomputed from their IDs on-the-fly.

        Returns
        -------
        extractor : `DimensionDataExtractor`
            New extractor.
        """
        universe = dimensions.universe
        names = set(dimensions.elements)
        names.difference_update(ignore)
        if ignore_cached:
            names = {name for name in names if not universe[name].is_cached}
        if not include_skypix:
            names.difference_update(dimensions.skypix)
        return cls.from_element_names(names, universe=universe)

    def update(self, data_ids: Iterable[DataCoordinate]) -> None:
        """Extract dimension records from an iterable of data IDs.

        Parameters
        ----------
        data_ids : `~collections.abc.Iterable` [ `DataCoordinate` ]
            Data IDs to extract dimension records from.
        """
        wanted = self.records.keys()
        for data_id in data_ids:
            # Only elements both present in the data ID and requested at
            # construction are extracted; missing (None) records are skipped.
            for element_name in data_id.dimensions.elements & wanted:
                record = data_id.records[element_name]
                if record is not None:
                    self.records[element_name].add(record)

706 

class SerializableDimensionData(pydantic.RootModel):
    """A pydantic model for normalized serialization of dimension records.

    While dimension records are serialized directly via this model, they are
    deserialized by constructing a `DimensionRecordSetDeserializer` from this
    model, which allows full validation to be performed only on the records
    that are actually loaded.
    """

    # Mapping from dimension element name to that element's serialized
    # records, in the form returned by `DimensionRecord.serialize_key_value`.
    root: dict[str, list[SerializedKeyValueDimensionRecord]] = pydantic.Field(default_factory=dict)

    @classmethod
    def from_record_sets(cls, record_sets: Iterable[DimensionRecordSet]) -> SerializableDimensionData:
        """Construct from an iterable of `DimensionRecordSet` objects.

        Parameters
        ----------
        record_sets : `~collections.abc.Iterable` [ `DimensionRecordSet` ]
            Sets of dimension records, each for a different dimension element.

        Returns
        -------
        model : `SerializableDimensionData`
            New model instance.
        """
        # model_construct skips pydantic validation; the records here come
        # from serialize_records and are already in the right form.
        return cls.model_construct(
            root={record_set.element.name: record_set.serialize_records() for record_set in record_sets}
        )

    def make_deserializers(self, universe: DimensionUniverse) -> list[DimensionRecordSetDeserializer]:
        """Make objects from this model that handle the second phase of
        deserialization.

        Parameters
        ----------
        universe : `DimensionUniverse`
            Definitions of all dimensions.

        Returns
        -------
        deserializers : `list` [ `DimensionRecordSetDeserializer` ]
            A list of deserializer objects, one for each dimension element.
        """
        return [
            DimensionRecordSetDeserializer.from_raw(universe[element_name], raw_records)
            for element_name, raw_records in self.root.items()
        ]

755 

756class DimensionDataAttacher: 

757 """A helper class for attaching dimension records to data IDs. 

758 

759 Parameters 

760 ---------- 

761 records : `dict` [`str`, `DimensionRecordSet`], optional 

762 Regular dimension record sets, keyed by dimension element name. Not 

763 copied, and may be modified in-place. 

764 deserializers : `dict` [`str`, `DimensionRecordSetDeserializer`], optional 

765 Partially-deserialized dimension records, keyed by dimension element 

766 name. Records will be fully deserialized on demand and then cached. 

767 cache : `DimensionRecordCache`, optional 

768 A cache of dimension records from a butler instance. If present, this 

769 is assumed to have records for elements that are not in ``records`` and 

770 ``deserializers``. 

771 dimensions : `DimensionGroup`, optional 

772 Dimensions for which empty record sets should be added when no other 

773 source of records is given. This allows data IDs with these dimensions 

774 to have records attached by fetching them via the ``query`` argument 

775 to the ``attach`` method, or by computing regions on the skypix 

776 dimensions. 

777 """ 

778 

    def __init__(
        self,
        *,
        records: Iterable[DimensionRecordSet] = (),
        deserializers: Iterable[DimensionRecordSetDeserializer] = (),
        cache: DimensionRecordCache | None = None,
        dimensions: DimensionGroup | None = None,
    ):
        # Fully-deserialized record sets, keyed by dimension element name.
        self.records = {record_set.element.name: record_set for record_set in records}
        self.deserializers: dict[str, DimensionRecordSetDeserializer] = {}
        for deserializer in deserializers:
            self.deserializers[deserializer.element.name] = deserializer
            # Ensure every deserializer has a companion record set in which
            # records can be cached as they are fully deserialized on demand.
            if deserializer.element.name not in self.records:
                self.records[deserializer.element.name] = DimensionRecordSet(deserializer.element)
        self.cache = cache
        if dimensions is not None:
            # Add empty sets for any remaining elements (not already covered
            # by records, deserializers, or the cache) so `attach` can fetch
            # or compute their records later.
            for element in dimensions.elements:
                if element not in self.records and (self.cache is None or element not in self.cache):
                    self.records[element] = DimensionRecordSet(element, universe=dimensions.universe)

798 

    def attach(
        self, dimensions: DimensionGroup, data_ids: Iterable[DataCoordinate], query: Query | None = None
    ) -> list[DataCoordinate]:
        """Attach dimension records to data IDs.

        Parameters
        ----------
        dimensions : `DimensionGroup`
            Dimensions of all given data IDs. All dimension elements must have
            been referenced in at least one of the constructor arguments.
        data_ids : `~collections.abc.Iterable` [ `DataCoordinate` ]
            Data IDs to attach dimension records to (not in place; data
            coordinates are immutable).
        query : `.queries.Query`, optional
            A butler query that can be used to look up missing dimension
            records. Records fetched via query are cached in the ``records``
            attribute.

        Returns
        -------
        expanded : `list` [ `DataCoordinate` ]
            Data IDs with dimension records attached, in the same order as the
            original iterable.

        Raises
        ------
        LookupError
            Raised if a record is missing for some data ID and no ``query``
            was provided to fetch it.
        """
        # One lookup helper per element, visited in ``lookup_order`` so each
        # element's lookup can rely on earlier elements' results.
        lookup_helpers = [
            _DimensionRecordLookupHelper.build(dimensions, element_name, self)
            for element_name in dimensions.lookup_order
        ]
        # Per-data-ID accumulators for the records found so far.
        records = [_InProgressRecordDicts(data_id) for data_id in data_ids]
        for lookup_helper in lookup_helpers:
            for r in records:
                lookup_helper.lookup(r)
            incomplete = lookup_helper.incomplete_records
            if incomplete:
                if query is not None:
                    lookup_helper.fetch_missing(query)
                    # We may still be missing records at this point, if they
                    # were not available in the database.
                    # This is intentional, because in existing Butler
                    # repositories dimension records are not always fully
                    # populated. (For example, it is common for a visit to
                    # exist without corresponding visit_detector_region
                    # records, since these are populated at different times
                    # by different processes.)
                else:
                    raise LookupError(
                        f"No dimension record for element '{lookup_helper.element}' "
                        f"for data ID {incomplete[0].data_id}. "
                        f"{len(incomplete)} data ID{' was' if len(incomplete) == 1 else 's were'} "
                        "missing at least one record."
                    )

        return [r.data_id.expanded(r.done) for r in records]

852 

853 def serialized( 

854 self, *, ignore: Iterable[str] = (), ignore_cached: bool = False, include_skypix: bool = False 

855 ) -> SerializableDimensionData: 

856 """Serialize all dimension data in this attacher, with deduplication 

857 across fully- and partially-deserialized records. 

858 

859 Parameters 

860 ---------- 

861 ignore : `~collections.abc.Iterable` [ `str` ], optional 

862 Names of dimension elements that should not be serialized. 

863 ignore_cached : `bool`, optional 

864 If `True`, ignore all dimension elements for which 

865 `DimensionElement.is_cached` is `True`. 

866 include_skypix : `bool`, optional 

867 If `True`, include skypix dimensions. These are ignored by default 

868 because they can always be recomputed from their IDs on-the-fly. 

869 

870 Returns 

871 ------- 

872 serialized : `SerializedDimensionData` 

873 Serialized dimension records. 

874 """ 

875 from ._skypix import SkyPixDimension 

876 

877 ignore = set(ignore) 

878 result = SerializableDimensionData() 

879 for record_set in self.records.values(): 

880 if record_set.element.name in ignore: 

881 continue 

882 if not include_skypix and isinstance(record_set.element, SkyPixDimension): 

883 continue 

884 if ignore_cached and record_set.element.is_cached: 

885 continue 

886 serialized_records: dict[tuple[DataIdValue, ...], SerializedKeyValueDimensionRecord] = {} 

887 if (deserializer := self.deserializers.get(record_set.element.name)) is not None: 

888 for key, value in deserializer._mapping.items(): 

889 serialized_record = list(key) 

890 serialized_record.extend(value) 

891 serialized_records[key] = serialized_record 

892 for key, record in record_set._by_required_values.items(): 

893 if key not in serialized_records: 

894 serialized_records[key] = record.serialize_key_value() 

895 result.root[record_set.element.name] = list(serialized_records.values()) 

896 if self.cache is not None and not ignore_cached: 

897 for record_set in self.cache.values(): 

898 result.root[record_set.element.name] = record_set.serialize_records() 

899 return result 

900 

901 

@dataclasses.dataclass
class _InProgressRecordDicts:
    """Scratch pairing of a data ID with the dimension records found for it
    so far while records are being attached.
    """

    # The original (unexpanded) data ID whose records are being gathered.
    data_id: DataCoordinate

    # Records found so far, keyed by dimension element name.
    done: dict[str, DimensionRecord] = dataclasses.field(default_factory=dict)

906 

907 

@dataclasses.dataclass
class _DimensionRecordLookupHelper:
    """Helper that looks up the record of one dimension element for a series
    of in-progress data IDs, tracking any that could not be resolved.
    """

    # These are the indices of the dimension record's data ID's
    # required_values tuple in the to-be-expanded data ID's full-values
    # tuple.
    indices: list[int]
    record_set: DimensionRecordSet
    incomplete_records: list[_InProgressRecordDicts] = dataclasses.field(default_factory=list)

    @property
    def element(self) -> str:
        # Name of the dimension element this helper resolves.
        return self.record_set.element.name

    @staticmethod
    def build(
        dimensions: DimensionGroup, element: str, attacher: DimensionDataAttacher
    ) -> _DimensionRecordLookupHelper:
        """Construct the appropriate helper subclass for ``element``."""
        universe = dimensions.universe
        indices = [
            dimensions._data_coordinate_indices[dimension_name]
            for dimension_name in universe.elements[element].minimal_group.required
        ]
        # Cached record sets take precedence over every other source.
        if attacher.cache is not None and element in attacher.cache:
            return _DimensionRecordLookupHelper(indices, attacher.cache[element])
        if element in dimensions.skypix:
            return _SkyPixDimensionRecordLookupHelper(
                indices,
                attacher.records[element],
                dimension=universe.skypix_dimensions[element],
            )
        if element in attacher.deserializers:
            return _DeserializingDimensionRecordLookupHelper(
                indices, attacher.records[element], deserializer=attacher.deserializers[element]
            )
        return _DimensionRecordLookupHelper(indices, attacher.records[element])

    def lookup(self, records: _InProgressRecordDicts) -> None:
        """Try to resolve this element's record for one data ID, recording
        it in ``records.done`` on success or queueing the data ID in
        ``incomplete_records`` on failure.
        """
        key = self._get_required_values(records)
        record = self.record_set._by_required_values.get(key)
        if record is not None:
            records.done[self.element] = record
            return
        record = self.fallback(key)
        if record is None:
            self.incomplete_records.append(records)
            return
        # Remember fallback results so repeated lookups hit the fast path.
        self.record_set.add(record)
        records.done[self.element] = record

    def _get_required_values(self, records: _InProgressRecordDicts) -> tuple[DataIdValue, ...]:
        # Extract this element's required data-ID values from the target
        # data ID, consulting already-resolved records for implied values
        # when the data ID is not "full".
        data_id = records.data_id
        if data_id.hasFull():
            full_values = data_id.full_values
            return tuple(full_values[i] for i in self.indices)
        values: list[DataIdValue] = []
        for dimension in self.record_set.element.minimal_group.required:
            value = data_id.get(dimension)
            if value is None:
                value = self._find_implied_value(dimension, records)
            values.append(value)
        return tuple(values)

    def _find_implied_value(self, implied_dimension: str, records: _InProgressRecordDicts) -> DataIdValue:
        # Search the records resolved so far for one whose element implies
        # the given dimension, and pull the value from it.
        for record in records.done.values():
            if implied_dimension in record.definition.implied:
                return record.get(implied_dimension)

        raise LookupError(
            f"Implied value for dimension '{implied_dimension}' not found in records for"
            f" {list(records.done.keys())}"
        )

    def fallback(self, required_values: tuple[DataIdValue, ...]) -> DimensionRecord | None:
        # Subclasses may synthesize or deserialize a record here; the base
        # class has no alternate source.
        return None

    def fetch_missing(self, query: Query) -> None:
        """Fetch records for all queued-but-unresolved data IDs via a butler
        query, then retry their lookups.
        """
        if not self.incomplete_records:
            return
        missing_values = {self._get_required_values(r) for r in self.incomplete_records}
        minimal_group = self.record_set.element.minimal_group
        coordinates = [
            DataCoordinate.from_required_values(minimal_group, values) for values in missing_values
        ]
        self.record_set.update(
            query.join_data_coordinates(coordinates).dimension_records(self.record_set.element.name)
        )

        # Retry; anything the database also lacked lands back in
        # ``incomplete_records``.
        retry, self.incomplete_records = self.incomplete_records, []
        for item in retry:
            self.lookup(item)

998 

999 

@dataclasses.dataclass
class _DeserializingDimensionRecordLookupHelper(_DimensionRecordLookupHelper):
    """Lookup helper backed by a `DimensionRecordSetDeserializer`, turning
    serialized records into `DimensionRecord` objects on demand.
    """

    deserializer: DimensionRecordSetDeserializer = dataclasses.field(kw_only=True)

    def fallback(self, required_values: tuple[DataIdValue, ...]) -> DimensionRecord | None:
        # A KeyError just means the deserializer has no record for this key;
        # report that as "not found" rather than propagating.
        try:
            record = self.deserializer[required_values]
        except KeyError:
            record = None
        return record

1009 

1010 

@dataclasses.dataclass
class _SkyPixDimensionRecordLookupHelper(_DimensionRecordLookupHelper):
    """Lookup helper for skypix dimensions, whose records are constructed
    on-the-fly from the pixel ID rather than fetched from storage.
    """

    dimension: SkyPixDimension = dataclasses.field(kw_only=True)

    def fallback(self, required_values: tuple[DataIdValue, ...]) -> DimensionRecord:
        # A skypix record is fully determined by its single required value
        # (the pixel index), so the fallback can always succeed.
        # Renamed the local from ``id`` to ``pixel_id`` to stop shadowing the
        # ``id`` builtin.
        pixel_id = required_values[0]
        return self.dimension.RecordClass(id=pixel_id, region=self.dimension.pixelization.pixel(pixel_id))