Coverage for python/lsst/daf/butler/transfers/_yaml.py: 12%

298 statements  

coverage.py v7.13.5, created at 2026-04-26 08:49 +0000

1# This file is part of daf_butler. 

2# 

3# Developed for the LSST Data Management System. 

4# This product includes software developed by the LSST Project 

5# (http://www.lsst.org). 

6# See the COPYRIGHT file at the top-level directory of this distribution 

7# for details of code ownership. 

8# 

9# This software is dual licensed under the GNU General Public License and also 

10# under a 3-clause BSD license. Recipients may choose which of these licenses 

11# to use; please see the files gpl-3.0.txt and/or bsd_license.txt, 

12# respectively. If you choose the GPL option then the following text applies 

13# (but note that there is still no warranty even if you opt for BSD instead): 

14# 

15# This program is free software: you can redistribute it and/or modify 

16# it under the terms of the GNU General Public License as published by 

17# the Free Software Foundation, either version 3 of the License, or 

18# (at your option) any later version. 

19# 

20# This program is distributed in the hope that it will be useful, 

21# but WITHOUT ANY WARRANTY; without even the implied warranty of 

22# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the 

23# GNU General Public License for more details. 

24# 

25# You should have received a copy of the GNU General Public License 

26# along with this program. If not, see <http://www.gnu.org/licenses/>. 

27 

28from __future__ import annotations 

29 

30__all__ = ["YamlRepoExportBackend", "YamlRepoImportBackend"] 

31 

32import logging 

33import uuid 

34import warnings 

35from collections import UserDict, defaultdict 

36from collections.abc import Iterable, Mapping 

37from datetime import datetime 

38from typing import IO, TYPE_CHECKING, Any 

39 

40import astropy.time 

41import yaml 

42 

43from lsst.resources import ResourcePath 

44from lsst.utils import doImportType 

45from lsst.utils.introspection import find_outside_stacklevel 

46from lsst.utils.iteration import ensure_iterable 

47 

48from .._collection_type import CollectionType 

49from .._dataset_association import DatasetAssociation 

50from .._dataset_ref import DatasetId, DatasetRef 

51from .._dataset_type import DatasetType 

52from .._file_dataset import FileDataset 

53from .._named import NamedValueSet 

54from .._timespan import Timespan 

55from ..datastore import Datastore 

56from ..dimensions import DimensionElement, DimensionRecord, DimensionUniverse 

57from ..registry.interfaces import ChainedCollectionRecord, CollectionRecord, RunRecord, VersionTuple 

58from ..registry.versions import IncompatibleVersionError 

59from ._interfaces import RepoExportBackend, RepoImportBackend 

60 

61if TYPE_CHECKING: 

62 from lsst.daf.butler import Butler 

63 from lsst.resources import ResourcePathExpression 

64 

65_LOG = logging.getLogger(__name__) 

66 

67EXPORT_FORMAT_VERSION = VersionTuple(1, 0, 2) 

68"""Export format version. 

69 

70Files with a different major version or a newer minor version cannot be read by 

71this version of the code. 

72""" 
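As an aside (not part of the module), the compatibility rule this version tuple encodes can be sketched in a few lines: the file's major version must match exactly, and its minor version must not be newer than the one supported here. `VersionTuple` is the class imported above; everything else in this snippet is illustrative.

    from lsst.daf.butler.registry.interfaces import VersionTuple

    SUPPORTED = VersionTuple(1, 0, 2)

    def is_readable(file_version: VersionTuple) -> bool:
        # Same major version required; a newer minor cannot be read, older ones can.
        return file_version.major == SUPPORTED.major and file_version.minor <= SUPPORTED.minor

    assert is_readable(VersionTuple(1, 0, 0))       # older file: readable
    assert not is_readable(VersionTuple(1, 1, 0))   # newer minor: rejected
    assert not is_readable(VersionTuple(2, 0, 0))   # different major: rejected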

73 

74 

75class _RefMapper(UserDict[int, uuid.UUID]): 

76 """Create a local dict subclass which creates a new deterministic UUID 

77 for each missing key. 

78 """ 

79 

80 _namespace = uuid.UUID("4d4851f4-2890-4d41-8779-5f38a3f5062b") 

81 

82 def __missing__(self, key: int) -> uuid.UUID: 

83 newUUID = uuid.uuid3(namespace=self._namespace, name=str(key)) 

84 self[key] = newUUID 

85 return newUUID 

86 

87 

88_refIntId2UUID = _RefMapper() 
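For orientation only: older export files identified datasets with integers, and this mapper turns each integer into a stable UUID via `uuid.uuid3`, so the same file always imports with the same IDs. A minimal standalone sketch of that idea, using the namespace value defined above:

    import uuid

    NAMESPACE = uuid.UUID("4d4851f4-2890-4d41-8779-5f38a3f5062b")

    def legacy_id_to_uuid(dataset_id: int) -> uuid.UUID:
        # uuid3 is a pure function of (namespace, name), so the mapping is
        # deterministic across runs and processes.
        return uuid.uuid3(namespace=NAMESPACE, name=str(dataset_id))

    assert legacy_id_to_uuid(42) == legacy_id_to_uuid(42)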

89 

90 

91def _uuid_representer(dumper: yaml.Dumper, data: uuid.UUID) -> yaml.Node: 

92 """Generate YAML representation for UUID. 

93 

94 This produces a scalar node with a tag "!uuid" and value being a regular 

95 string representation of UUID. 

96 """ 

97 return dumper.represent_scalar("!uuid", str(data)) 

98 

99 

100def _uuid_constructor(loader: yaml.Loader, node: yaml.Node) -> uuid.UUID | None: 

101 if node.value is not None: 

102 return uuid.UUID(hex=node.value) 

103 return None 

104 

105 

106yaml.Dumper.add_representer(uuid.UUID, _uuid_representer) 

107yaml.SafeLoader.add_constructor("!uuid", _uuid_constructor) 
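With the representer and constructor registered above, a `uuid.UUID` round-trips through YAML as a `!uuid`-tagged scalar. A small sketch (importing this module only to trigger the registrations):

    import uuid
    import yaml

    import lsst.daf.butler.transfers._yaml  # noqa: F401  (registers the !uuid handlers)

    value = uuid.uuid4()
    text = yaml.dump({"dataset_id": value})   # the UUID is written as a !uuid-tagged scalar
    assert yaml.safe_load(text)["dataset_id"] == value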

108 

109 

110class YamlRepoExportBackend(RepoExportBackend): 

111 """A repository export implementation that saves to a YAML file. 

112 

113 Parameters 

114 ---------- 

115 stream : `typing.IO` 

116 A writeable file-like object. 

117 universe : `DimensionUniverse` 

118 The dimension universe to use for the export. 

119 """ 

120 

121 def __init__(self, stream: IO, universe: DimensionUniverse): 

122 self.stream = stream 

123 self.universe = universe 

124 self.data: list[dict[str, Any]] = [] 

125 

126 def saveDimensionData(self, element: DimensionElement, *data: DimensionRecord) -> None: 

127 # Docstring inherited from RepoExportBackend.saveDimensionData. 

128 data_dicts = [record.toDict(splitTimespan=True) for record in data] 

129 self.data.append( 

130 { 

131 "type": "dimension", 

132 "element": element.name, 

133 "records": data_dicts, 

134 } 

135 ) 

136 

137 def saveCollection(self, record: CollectionRecord, doc: str | None) -> None: 

138 # Docstring inherited from RepoExportBackend.saveCollections. 

139 data: dict[str, Any] = { 

140 "type": "collection", 

141 "collection_type": record.type.name, 

142 "name": record.name, 

143 } 

144 if doc is not None: 

145 data["doc"] = doc 

146 if isinstance(record, RunRecord): 

147 data["host"] = record.host 

148 data["timespan_begin"] = record.timespan.begin 

149 data["timespan_end"] = record.timespan.end 

150 elif isinstance(record, ChainedCollectionRecord): 

151 data["children"] = list(record.children) 

152 self.data.append(data) 

153 

154 def saveDatasets(self, datasetType: DatasetType, run: str, *datasets: FileDataset) -> None: 

155 # Docstring inherited from RepoExportBackend.saveDatasets. 

156 self.data.append( 

157 { 

158 "type": "dataset_type", 

159 "name": datasetType.name, 

160 "dimensions": list(datasetType.dimensions.names), 

161 "storage_class": datasetType.storageClass_name, 

162 "is_calibration": datasetType.isCalibration(), 

163 } 

164 ) 

165 self.data.append( 

166 { 

167 "type": "dataset", 

168 "dataset_type": datasetType.name, 

169 "run": run, 

170 "records": [ 

171 { 

172 "dataset_id": [ref.id for ref in sorted(dataset.refs)], 

173 "data_id": [dict(ref.dataId.required) for ref in sorted(dataset.refs)], 

174 "path": dataset.path, 

175 "formatter": dataset.formatter, 

176 # TODO: look up and save other collections 

177 } 

178 for dataset in datasets 

179 ], 

180 } 

181 ) 
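To make the shape of these entries concrete, here is roughly what a single `dataset` entry built by this method looks like; every value below is an invented placeholder, not data from any real repository.

    import uuid

    example_dataset_entry = {
        "type": "dataset",
        "dataset_type": "calexp",          # hypothetical dataset type
        "run": "HSC/runs/example",         # hypothetical RUN collection
        "records": [
            {
                "dataset_id": [uuid.uuid4()],   # one ID per DatasetRef in the FileDataset
                "data_id": [{"instrument": "HSC", "visit": 903334, "detector": 16}],
                "path": "calexp/v903334/calexp_16.fits",
                "formatter": "lsst.obs.base.formatters.fitsExposure.FitsExposureFormatter",
            }
        ],
    }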

182 

183 def saveDatasetAssociations( 

184 self, collection: str, collectionType: CollectionType, associations: Iterable[DatasetAssociation] 

185 ) -> None: 

186 # Docstring inherited from RepoExportBackend.saveDatasetAssociations. 

187 if collectionType is CollectionType.TAGGED: 

188 self.data.append( 

189 { 

190 "type": "associations", 

191 "collection": collection, 

192 "collection_type": collectionType.name, 

193 "dataset_ids": [assoc.ref.id for assoc in associations], 

194 } 

195 ) 

196 elif collectionType is CollectionType.CALIBRATION: 

197 idsByTimespan: dict[Timespan, list[DatasetId]] = defaultdict(list) 

198 for association in associations: 

199 assert association.timespan is not None 

200 idsByTimespan[association.timespan].append(association.ref.id) 

201 self.data.append( 

202 { 

203 "type": "associations", 

204 "collection": collection, 

205 "collection_type": collectionType.name, 

206 "validity_ranges": [ 

207 { 

208 "timespan": timespan, 

209 "dataset_ids": dataset_ids, 

210 } 

211 for timespan, dataset_ids in idsByTimespan.items() 

212 ], 

213 } 

214 ) 

215 

216 def finish(self) -> None: 

217 # Docstring inherited from RepoExportBackend. 

218 yaml.dump( 

219 { 

220 "description": "Butler Data Repository Export", 

221 "version": str(EXPORT_FORMAT_VERSION), 

222 "universe_version": self.universe.version, 

223 "universe_namespace": self.universe.namespace, 

224 "data": self.data, 

225 }, 

226 stream=self.stream, 

227 sort_keys=False, 

228 ) 
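A hedged usage sketch (not from the package docs; `Butler.export` is the usual entry point): driving this backend directly to dump one dimension element's records to an in-memory stream. The repository path is hypothetical.

    import io
    from lsst.daf.butler import Butler
    from lsst.daf.butler.transfers._yaml import YamlRepoExportBackend

    butler = Butler("some/repo")                    # hypothetical repository
    stream = io.StringIO()
    backend = YamlRepoExportBackend(stream, butler.dimensions)

    # Export every instrument dimension record, then write the YAML document.
    records = list(butler.registry.queryDimensionRecords("instrument"))
    backend.saveDimensionData(butler.dimensions["instrument"], *records)
    backend.finish()

    print(stream.getvalue())   # description/version/universe metadata plus the data list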

229 

230 

231class _DayObsOffsetCalculator: 

232 """Interface to allow the day_obs offset to be calculated from an 

233 instrument class name and cached. 

234 """ 

235 

236 name_to_class_name: dict[str, str] 

237 name_to_offset: dict[str, int | None] 

238 

239 def __init__(self) -> None: 

240 self.name_to_class_name = {} 

241 self.name_to_offset = {} 

242 

243 def __setitem__(self, name: str, class_name: str) -> None: 

244 """Store the instrument class name. 

245 

246 Parameters 

247 ---------- 

248 name : `str` 

249 Name of the instrument. 

250 class_name : `str` 

251 Full name of the instrument class. 

252 """ 

253 self.name_to_class_name[name] = class_name 

254 

255 def get_offset(self, name: str, date: astropy.time.Time) -> int | None: 

256 """Return the offset to use when calculating day_obs. 

257 

258 Parameters 

259 ---------- 

260 name : `str` 

261 The instrument name. 

262 date : `astropy.time.Time` 

263 Time for which the offset is required. 

264 

265 Returns 

266 ------- 

267 offset : `int` or `None` 

268 The offset in seconds, or `None` if it could not be determined. 

269 """ 

270 if name in self.name_to_offset: 

271 return self.name_to_offset[name] 

272 

273 try: 

274 instrument_class = doImportType(self.name_to_class_name[name]) 

275 except Exception: 

276 # Any error at all, store None and do not try again. 

277 self.name_to_offset[name] = None 

278 return None 

279 

280 # Assume this is a `lsst.pipe.base.Instrument` and that it has 

281 # a translatorClass property pointing to an 

282 # astro_metadata_translator.MetadataTranslator class. If this is not 

283 # true give up and store None. 

284 try: 

285 offset_delta = instrument_class.translatorClass.observing_date_to_offset(date) # type: ignore 

286 except Exception: 

287 offset_delta = None 

288 

289 if offset_delta is None: 

290 self.name_to_offset[name] = None 

291 return None 

292 

293 self.name_to_offset[name] = round(offset_delta.to_value("s")) 

294 return self.name_to_offset[name] 
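A standalone sketch of how the offset returned here is applied (mirroring the migration code further down): shift a TAI timestamp by the instrument's offset and format it as a YYYYMMDD integer. The example offset is made up.

    import astropy.time

    def derive_day_obs(date: astropy.time.Time, offset: int | None) -> int:
        # Shift by the instrument-specific offset in seconds (when known) and
        # take the calendar date of the shifted TAI time.
        date = date.tai
        if offset:
            date = date - astropy.time.TimeDelta(offset, format="sec", scale="tai")
        return int(date.strftime("%Y%m%d"))

    t = astropy.time.Time("2024-03-15T03:00:00", scale="tai")
    print(derive_day_obs(t, 12 * 3600))   # 20240314 with a 12-hour offset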

295 

296 

297class YamlRepoImportBackend(RepoImportBackend): 

298 """A repository import implementation that reads from a YAML file. 

299 

300 Parameters 

301 ---------- 

302 stream : `typing.IO` 

303 A readable file-like object. 

304 butler : `Butler` 

305 The butler that datasets will be imported into. Only used to retrieve 

306 dataset types during construction; all writes happen in `register` 

307 and `load`. 

308 """ 

309 

310 def __init__(self, stream: IO, butler: Butler): 

311 # We read the file fully and convert its contents to Python objects 

312 # instead of loading incrementally so we can spot some problems early; 

313 # because `register` can't be put inside a transaction, we'd rather not 

314 # run that at all if there's going to be problem later in `load`. 

315 wrapper = yaml.safe_load(stream) 

316 if wrapper["version"] == 0: 

317 # Grandfather-in 'version: 0' -> 1.0.0, which is what we wrote 

318 # before we really tried to do versioning here. 

319 fileVersion = VersionTuple(1, 0, 0) 

320 else: 

321 fileVersion = VersionTuple.fromString(wrapper["version"]) 

322 if fileVersion.major != EXPORT_FORMAT_VERSION.major: 

323 raise IncompatibleVersionError( 

324 f"Cannot read repository export file with version={fileVersion} " 

325 f"({EXPORT_FORMAT_VERSION.major}.x.x required)." 

326 ) 

327 if fileVersion.minor > EXPORT_FORMAT_VERSION.minor: 

328 raise IncompatibleVersionError( 

329 f"Cannot read repository export file with version={fileVersion} " 

330 f"({EXPORT_FORMAT_VERSION.major}.{EXPORT_FORMAT_VERSION.minor}.x or earlier required)." 

331 ) 

332 self.runs: dict[str, tuple[str | None, Timespan]] = {} 

333 self.chains: dict[str, list[str]] = {} 

334 self.collections: dict[str, CollectionType] = {} 

335 self.collectionDocs: dict[str, str] = {} 

336 self.datasetTypes: NamedValueSet[DatasetType] = NamedValueSet() 

337 self.dimensions: Mapping[DimensionElement, list[DimensionRecord]] = defaultdict(list) 

338 self.tagAssociations: dict[str, list[DatasetId]] = defaultdict(list) 

339 self.calibAssociations: dict[str, dict[Timespan, list[DatasetId]]] = defaultdict(dict) 

340 self.refsByFileId: dict[DatasetId, DatasetRef] = {} 

341 self.butler: Butler = butler 

342 

343 universe_version = wrapper.get("universe_version", 0) 

344 universe_namespace = wrapper.get("universe_namespace", "daf_butler") 

345 

346 # If this is data exported before the reorganization of visits 

347 # and visit systems and that new schema is in use, some filtering 

348 # will be needed. The entry in the visit dimension record will be 

349 # silently dropped when visit is created but the 

350 # visit_system_membership must be constructed. 

351 migrate_visit_system = False 

352 if ( 

353 universe_version < 2 

354 and universe_namespace == "daf_butler" 

355 and "visit_system_membership" in self.butler.dimensions 

356 ): 

357 migrate_visit_system = True 

358 

359 # Drop "seeing" from visits in files older than version 1. 

360 migrate_visit_seeing = False 

361 if ( 

362 universe_version < 1 

363 and universe_namespace == "daf_butler" 

364 and "visit" in self.butler.dimensions 

365 and "seeing" not in self.butler.dimensions["visit"].metadata 

366 ): 

367 migrate_visit_seeing = True 

368 

369 # If this data was exported before group was a first-class dimension, 

370 # we'll need to modify some exposure columns and add group records. 

371 migrate_group = False 

372 if ( 

373 universe_version < 6 

374 and universe_namespace == "daf_butler" 

375 and "exposure" in self.butler.dimensions 

376 and "group" in self.butler.dimensions["exposure"].implied 

377 ): 

378 migrate_group = True 

379 

380 # If this data was exported before day_obs was a first-class dimension, 

381 # we'll need to modify some exposure and visit columns and add day_obs 

382 # records. This is especially tricky because some files even predate 

383 # the existence of day_obs data ID values. 

384 migrate_exposure_day_obs = False 

385 migrate_visit_day_obs = False 

386 day_obs_ids: set[tuple[str, int]] = set() 

387 if universe_version < 6 and universe_namespace == "daf_butler": 

388 if ( 

389 "exposure" in self.butler.dimensions 

390 and "day_obs" in self.butler.dimensions["exposure"].implied 

391 ): 

392 migrate_exposure_day_obs = True 

393 if "visit" in self.butler.dimensions and "day_obs" in self.butler.dimensions["visit"].implied: 

394 migrate_visit_day_obs = True 

395 

396 # If this is a pre-v1 universe we may need to fill in a missing 

397 # visit.day_obs field. 

398 migrate_add_visit_day_obs = False 

399 if ( 

400 universe_version < 1 

401 and universe_namespace == "daf_butler" 

402 and ( 

403 "day_obs" in self.butler.dimensions["visit"].implied 

404 or "day_obs" in self.butler.dimensions["visit"].metadata 

405 ) 

406 ): 

407 migrate_add_visit_day_obs = True 

408 

409 # Some conversions may need to work out a day_obs timespan. 

410 # The only way this offset can be found is by querying the instrument 

411 # class. Read all the existing instrument classes indexed by name. 

412 instrument_classes: dict[str, str] = {} 

413 if migrate_exposure_day_obs or migrate_visit_day_obs or migrate_add_visit_day_obs: 

414 day_obs_offset_calculator = _DayObsOffsetCalculator() 

415 for rec in self.butler.registry.queryDimensionRecords("instrument"): 

416 day_obs_offset_calculator[rec.name] = rec.class_name 

417 

418 datasetData = [] 

419 RecordClass: type[DimensionRecord] 

420 for data in wrapper["data"]: 

421 if data["type"] == "dimension": 

422 # convert all datetime values to astropy 

423 for record in data["records"]: 

424 for key in record: 

425 # Some older YAML files were produced with native 

426 # YAML support for datetime, we support reading that 

427 # data back. Newer conversion uses _AstropyTimeToYAML 

428 # class with special YAML tag. 

429 if isinstance(record[key], datetime): 

430 record[key] = astropy.time.Time(record[key], scale="utc") 

431 

432 if data["element"] == "instrument": 

433 if migrate_exposure_day_obs or migrate_visit_day_obs: 

434 # Might want the instrument class name for later. 

435 for record in data["records"]: 

436 if record["name"] not in instrument_classes: 

437 instrument_classes[record["name"]] = record["class_name"] 

438 

439 if data["element"] == "visit": 

440 if migrate_visit_system: 

441 # Must create the visit_system_membership records. 

442 # But first create empty list for visits since other 

443 # logic in this file depends on self.dimensions being 

444 # populated in an order consistent with primary keys. 

445 self.dimensions[self.butler.dimensions["visit"]] = [] 

446 element = self.butler.dimensions["visit_system_membership"] 

447 RecordClass = element.RecordClass 

448 self.dimensions[element].extend( 

449 RecordClass( 

450 instrument=r["instrument"], visit_system=r.pop("visit_system"), visit=r["id"] 

451 ) 

452 for r in data["records"] 

453 ) 

454 if migrate_visit_seeing: 

455 for record in data["records"]: 

456 record.pop("seeing", None) 

457 if migrate_add_visit_day_obs: 

458 # The day_obs field is missing. It can be derived from 

459 # the datetime_begin field. 

460 for record in data["records"]: 

461 date = record["datetime_begin"].tai 

462 offset = day_obs_offset_calculator.get_offset(record["instrument"], date) 

463 # This field is required so we have to calculate 

464 # it even if the offset is not defined. 

465 if offset: 

466 date = date - astropy.time.TimeDelta(offset, format="sec", scale="tai") 

467 record["day_obs"] = int(date.strftime("%Y%m%d")) 

468 if migrate_visit_day_obs: 

469 # Poke the entry for this dimension to make sure it 

470 # appears in the right order, even though we'll 

471 # populate it later. 

472 self.dimensions[self.butler.dimensions["day_obs"]] 

473 for record in data["records"]: 

474 day_obs_ids.add((record["instrument"], record["day_obs"])) 

475 

476 if data["element"] == "exposure": 

477 if migrate_group: 

478 element = self.butler.dimensions["group"] 

479 RecordClass = element.RecordClass 

480 group_records = self.dimensions[element] 

481 for exposure_record in data["records"]: 

482 exposure_record["group"] = exposure_record.pop("group_name") 

483 del exposure_record["group_id"] 

484 group_records.append( 

485 RecordClass( 

486 instrument=exposure_record["instrument"], name=exposure_record["group"] 

487 ) 

488 ) 

489 if migrate_exposure_day_obs: 

490 # Poke the entry for this dimension to make sure it 

491 # appears in the right order, even though we'll 

492 # populate it later. 

493 for record in data["records"]: 

494 day_obs_ids.add((record["instrument"], record["day_obs"])) 

495 

496 element = self.butler.dimensions[data["element"]] 

497 RecordClass = element.RecordClass 

498 self.dimensions[element].extend(RecordClass(**r) for r in data["records"]) 

499 

500 elif data["type"] == "collection": 

501 collectionType = CollectionType.from_name(data["collection_type"]) 

502 if collectionType is CollectionType.RUN: 

503 self.runs[data["name"]] = ( 

504 data["host"], 

505 Timespan(begin=data["timespan_begin"], end=data["timespan_end"]), 

506 ) 

507 elif collectionType is CollectionType.CHAINED: 

508 children = [] 

509 for child in data["children"]: 

510 if not isinstance(child, str): 

511 warnings.warn( 

512 f"CHAINED collection {data['name']} includes restrictions on child " 

513 "collection searches, which are no longer supported and will be ignored.", 

514 stacklevel=find_outside_stacklevel("lsst.daf.butler"), 

515 ) 

516 # Old form with dataset type restrictions only, 

517 # supported for backwards compatibility. 

518 child, _ = child 

519 children.append(child) 

520 self.chains[data["name"]] = children 

521 else: 

522 self.collections[data["name"]] = collectionType 

523 doc = data.get("doc") 

524 if doc is not None: 

525 self.collectionDocs[data["name"]] = doc 

526 elif data["type"] == "run": 

527 # Also support old form of saving a run with no extra info. 

528 self.runs[data["name"]] = (None, Timespan(None, None)) 

529 elif data["type"] == "dataset_type": 

530 dimensions = data["dimensions"] 

531 if migrate_visit_system and "visit" in dimensions and "visit_system" in dimensions: 

532 dimensions.remove("visit_system") 

533 self.datasetTypes.add( 

534 DatasetType( 

535 data["name"], 

536 dimensions=dimensions, 

537 storageClass=data["storage_class"], 

538 universe=self.butler.dimensions, 

539 isCalibration=data.get("is_calibration", False), 

540 ) 

541 ) 

542 elif data["type"] == "dataset": 

543 # Save raw dataset data for a second loop, so we can ensure we 

544 # know about all dataset types first. 

545 datasetData.append(data) 

546 elif data["type"] == "associations": 

547 collectionType = CollectionType.from_name(data["collection_type"]) 

548 if collectionType is CollectionType.TAGGED: 

549 self.tagAssociations[data["collection"]].extend( 

550 [x if not isinstance(x, int) else _refIntId2UUID[x] for x in data["dataset_ids"]] 

551 ) 

552 elif collectionType is CollectionType.CALIBRATION: 

553 assocsByTimespan = self.calibAssociations[data["collection"]] 

554 for d in data["validity_ranges"]: 

555 if "timespan" in d: 

556 assocsByTimespan[d["timespan"]] = [ 

557 x if not isinstance(x, int) else _refIntId2UUID[x] for x in d["dataset_ids"] 

558 ] 

559 else: 

560 # TODO: this is for backward compatibility, should 

561 # be removed at some point. 

562 assocsByTimespan[Timespan(begin=d["begin"], end=d["end"])] = [ 

563 x if not isinstance(x, int) else _refIntId2UUID[x] for x in d["dataset_ids"] 

564 ] 

565 else: 

566 raise ValueError(f"Unexpected calibration type for association: {collectionType.name}.") 

567 else: 

568 raise ValueError(f"Unexpected dictionary type: {data['type']}.") 

569 

570 if day_obs_ids: 

571 element = self.butler.dimensions["day_obs"] 

572 RecordClass = element.RecordClass 

573 missing_offsets = set() 

574 for instrument, day_obs in day_obs_ids: 

575 # To get the offset we need the astropy time. Since we are 

576 # going from a day_obs to a time, it's possible that in some 

577 # scenario the offset will be wrong. 

578 ymd = str(day_obs) 

579 t = astropy.time.Time( 

580 f"{ymd[0:4]}-{ymd[4:6]}-{ymd[6:8]}T00:00:00", format="isot", scale="tai" 

581 ) 

582 offset = day_obs_offset_calculator.get_offset(instrument, t) 

583 

584 # This should always return an offset but as a fallback 

585 # allow None here in case something has gone wrong above. 

586 # In particular, not being able to load an instrument class. 

587 if offset is not None: 

588 timespan = Timespan.from_day_obs(day_obs, offset=offset) 

589 else: 

590 timespan = None 

591 missing_offsets.add(instrument) 

592 self.dimensions[element].append( 

593 RecordClass(instrument=instrument, id=day_obs, timespan=timespan) 

594 ) 

595 

596 if missing_offsets: 

597 plural = "" if len(missing_offsets) == 1 else "s" 

598 warnings.warn( 

599 "Constructing day_obs records with no timespans for " 

600 "visit/exposure records that were exported before day_obs was a dimension. " 

601 f"(instrument{plural}: {missing_offsets})", 

602 stacklevel=find_outside_stacklevel("lsst.daf.butler"), 

603 ) 

604 

605 # key is (dataset type name, run) 

606 self.datasets: Mapping[tuple[str, str], list[FileDataset]] = defaultdict(list) 

607 for data in datasetData: 

608 datasetType = self.datasetTypes.get(data["dataset_type"]) 

609 if datasetType is None: 

610 datasetType = self.butler.get_dataset_type(data["dataset_type"]) 

611 self.datasets[data["dataset_type"], data["run"]].extend( 

612 FileDataset( 

613 d.get("path"), 

614 [ 

615 DatasetRef( 

616 datasetType, 

617 dataId, 

618 run=data["run"], 

619 id=refid if not isinstance(refid, int) else _refIntId2UUID[refid], 

620 ) 

621 for dataId, refid in zip( 

622 ensure_iterable(d["data_id"]), ensure_iterable(d["dataset_id"]), strict=True 

623 ) 

624 ], 

625 formatter=doImportType(d.get("formatter")) if "formatter" in d else None, 

626 ) 

627 for d in data["records"] 

628 ) 

629 

630 def register(self) -> None: 

631 # Docstring inherited from RepoImportBackend.register. 

632 for datasetType in self.datasetTypes: 

633 self.butler.registry.registerDatasetType(datasetType) 

634 for run in self.runs: 

635 self.butler.collections.register(run, doc=self.collectionDocs.get(run)) 

636 # No way to add extra run info to registry yet. 

637 for collection, collection_type in self.collections.items(): 

638 self.butler.collections.register( 

639 collection, collection_type, doc=self.collectionDocs.get(collection) 

640 ) 

641 for chain, children in self.chains.items(): 

642 self.butler.collections.register( 

643 chain, CollectionType.CHAINED, doc=self.collectionDocs.get(chain) 

644 ) 

645 self.butler.registry.setCollectionChain(chain, children) 

646 

647 def load( 

648 self, 

649 datastore: Datastore | None, 

650 *, 

651 directory: ResourcePathExpression | None = None, 

652 transfer: str | None = None, 

653 skip_dimensions: set | None = None, 

654 record_validation_info: bool = True, 

655 ) -> None: 

656 # Docstring inherited from RepoImportBackend.load. 

657 # Must ensure we insert in order supported by the universe. 

658 for element in self.butler.dimensions.sorted(self.dimensions.keys()): 

659 dimensionRecords = self.dimensions[element] 

660 if skip_dimensions and element in skip_dimensions: 

661 continue 

662 # Using skip_existing=True here assumes that the records in the 

663 # database are either equivalent or at least preferable to the ones 

664 # being imported. It'd be ideal to check that, but that would mean 

665 # using syncDimensionData, which is not vectorized and is hence 

666 # unacceptably slow. 

667 self.butler.registry.insertDimensionData(element, *dimensionRecords, skip_existing=True) 

668 # FileDatasets to ingest into the datastore (in bulk): 

669 fileDatasets = [] 

670 for records in self.datasets.values(): 

671 # Make a big flattened list of all data IDs and dataset_ids, while 

672 # remembering slices that associate them with the FileDataset 

673 # instances they came from. 

674 datasets: list[DatasetRef] = [] 

675 dataset_ids: list[DatasetId] = [] 

676 slices = [] 

677 for fileDataset in records: 

678 start = len(datasets) 

679 datasets.extend(fileDataset.refs) 

680 dataset_ids.extend(ref.id for ref in fileDataset.refs) 

681 stop = len(datasets) 

682 slices.append(slice(start, stop)) 

683 # Insert all of those DatasetRefs at once. 

684 # For now, we ignore the dataset_id we pulled from the file 

685 # and just insert without one to get a new autoincrement value. 

686 # Eventually (once we have origin in IDs) we'll preserve them. 

687 resolvedRefs = self.butler.registry._importDatasets(datasets) 

688 # Populate our dictionary that maps int dataset_id values from the 

689 # export file to the new DatasetRefs 

690 for fileId, ref in zip(dataset_ids, resolvedRefs, strict=True): 

691 self.refsByFileId[fileId] = ref 

692 # Now iterate over the original records, and install the new 

693 # resolved DatasetRefs to replace the unresolved ones as we 

694 # reorganize the collection information. 

695 for sliceForFileDataset, fileDataset in zip(slices, records, strict=True): 

696 fileDataset.refs = resolvedRefs[sliceForFileDataset] 

697 if directory is not None: 

698 fileDataset.path = ResourcePath(directory, forceDirectory=True).join(fileDataset.path) 

699 fileDatasets.append(fileDataset) 

700 # Ingest everything into the datastore at once. 

701 if datastore is not None and fileDatasets: 

702 datastore.ingest(*fileDatasets, transfer=transfer, record_validation_info=record_validation_info) 

703 # Associate datasets with tagged collections. 

704 for collection, dataset_ids in self.tagAssociations.items(): 

705 self.butler.registry.associate(collection, [self.refsByFileId[i] for i in dataset_ids]) 

706 # Associate datasets with calibration collections. 

707 for collection, idsByTimespan in self.calibAssociations.items(): 

708 for timespan, dataset_ids in idsByTimespan.items(): 

709 self.butler.registry.certify( 

710 collection, [self.refsByFileId[i] for i in dataset_ids], timespan 

711 )
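Finally, a usage sketch for the import side (normally driven by `Butler.import_` rather than called directly); the repository and file paths are hypothetical, and passing `datastore=None` skips the ingest step, which `load` explicitly allows.

    from lsst.daf.butler import Butler
    from lsst.daf.butler.transfers._yaml import YamlRepoImportBackend

    butler = Butler("some/repo", writeable=True)    # hypothetical repository
    with open("export.yaml") as stream:             # hypothetical export file
        backend = YamlRepoImportBackend(stream, butler)

    backend.register()   # dataset types, runs, collections, and chains
    # Dimension records, datasets, and associations are written here; with
    # datastore=None the bulk datastore ingest is skipped.
    backend.load(None, directory=None, transfer=None)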