Coverage for python/lsst/daf/butler/transfers/_yaml.py: 12% (298 statements)
coverage.py v7.13.5, created at 2026-04-14 23:37 +0000
# This file is part of daf_butler.
#
# Developed for the LSST Data Management System.
# This product includes software developed by the LSST Project
# (http://www.lsst.org).
# See the COPYRIGHT file at the top-level directory of this distribution
# for details of code ownership.
#
# This software is dual licensed under the GNU General Public License and also
# under a 3-clause BSD license. Recipients may choose which of these licenses
# to use; please see the files gpl-3.0.txt and/or bsd_license.txt,
# respectively. If you choose the GPL option then the following text applies
# (but note that there is still no warranty even if you opt for BSD instead):
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.

from __future__ import annotations

__all__ = ["YamlRepoExportBackend", "YamlRepoImportBackend"]

import logging
import uuid
import warnings
from collections import UserDict, defaultdict
from collections.abc import Iterable, Mapping
from datetime import datetime
from typing import IO, TYPE_CHECKING, Any

import astropy.time
import yaml

from lsst.resources import ResourcePath
from lsst.utils import doImportType
from lsst.utils.introspection import find_outside_stacklevel
from lsst.utils.iteration import ensure_iterable

from .._collection_type import CollectionType
from .._dataset_association import DatasetAssociation
from .._dataset_ref import DatasetId, DatasetRef
from .._dataset_type import DatasetType
from .._file_dataset import FileDataset
from .._named import NamedValueSet
from .._timespan import Timespan
from ..datastore import Datastore
from ..dimensions import DimensionElement, DimensionRecord, DimensionUniverse
from ..registry.interfaces import ChainedCollectionRecord, CollectionRecord, RunRecord, VersionTuple
from ..registry.versions import IncompatibleVersionError
from ._interfaces import RepoExportBackend, RepoImportBackend

if TYPE_CHECKING:
    from lsst.daf.butler import Butler
    from lsst.resources import ResourcePathExpression

_LOG = logging.getLogger(__name__)

EXPORT_FORMAT_VERSION = VersionTuple(1, 0, 2)
"""Export format version.

Files with a different major version or a newer minor version cannot be read by
this version of the code.
"""


class _RefMapper(UserDict[int, uuid.UUID]):
    """A local dict subclass that creates a new deterministic UUID for each
    missing key.
    """

    _namespace = uuid.UUID("4d4851f4-2890-4d41-8779-5f38a3f5062b")

    def __missing__(self, key: int) -> uuid.UUID:
        newUUID = uuid.uuid3(namespace=self._namespace, name=str(key))
        self[key] = newUUID
        return newUUID
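

# Module-level mapper shared by the import code below: export files old enough
# to identify datasets by integer ID get stable UUIDs (uuid3 with a fixed
# namespace), so repeated imports of the same file map an integer to the same
# UUID every time.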
_refIntId2UUID = _RefMapper()


def _uuid_representer(dumper: yaml.Dumper, data: uuid.UUID) -> yaml.Node:
    """Generate YAML representation for UUID.

    This produces a scalar node with a tag "!uuid" and value being a regular
    string representation of UUID.
    """
    return dumper.represent_scalar("!uuid", str(data))


def _uuid_constructor(loader: yaml.Loader, node: yaml.Node) -> uuid.UUID | None:
    if node.value is not None:
        return uuid.UUID(hex=node.value)
    return None
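

# Register the representer and constructor globally so that uuid.UUID values
# round-trip through yaml.dump and yaml.safe_load via the "!uuid" tag.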
yaml.Dumper.add_representer(uuid.UUID, _uuid_representer)
yaml.SafeLoader.add_constructor("!uuid", _uuid_constructor)


class YamlRepoExportBackend(RepoExportBackend):
    """A repository export implementation that saves to a YAML file.

    Parameters
    ----------
    stream : `typing.IO`
        A writeable file-like object.
    universe : `DimensionUniverse`
        The dimension universe to use for the export.
    """

    def __init__(self, stream: IO, universe: DimensionUniverse):
        self.stream = stream
        self.universe = universe
        self.data: list[dict[str, Any]] = []

    def saveDimensionData(self, element: DimensionElement, *data: DimensionRecord) -> None:
        # Docstring inherited from RepoExportBackend.saveDimensionData.
        data_dicts = [record.toDict(splitTimespan=True) for record in data]
        self.data.append(
            {
                "type": "dimension",
                "element": element.name,
                "records": data_dicts,
            }
        )

    def saveCollection(self, record: CollectionRecord, doc: str | None) -> None:
        # Docstring inherited from RepoExportBackend.saveCollections.
        data: dict[str, Any] = {
            "type": "collection",
            "collection_type": record.type.name,
            "name": record.name,
        }
        if doc is not None:
            data["doc"] = doc
        if isinstance(record, RunRecord):
            data["host"] = record.host
            data["timespan_begin"] = record.timespan.begin
            data["timespan_end"] = record.timespan.end
        elif isinstance(record, ChainedCollectionRecord):
            data["children"] = list(record.children)
        self.data.append(data)

    def saveDatasets(self, datasetType: DatasetType, run: str, *datasets: FileDataset) -> None:
        # Docstring inherited from RepoExportBackend.saveDatasets.
        self.data.append(
            {
                "type": "dataset_type",
                "name": datasetType.name,
                "dimensions": list(datasetType.dimensions.names),
                "storage_class": datasetType.storageClass_name,
                "is_calibration": datasetType.isCalibration(),
            }
        )
        self.data.append(
            {
                "type": "dataset",
                "dataset_type": datasetType.name,
                "run": run,
                "records": [
                    {
                        "dataset_id": [ref.id for ref in sorted(dataset.refs)],
                        "data_id": [dict(ref.dataId.required) for ref in sorted(dataset.refs)],
                        "path": dataset.path,
                        "formatter": dataset.formatter,
                        # TODO: look up and save other collections
                    }
                    for dataset in datasets
                ],
            }
        )

    def saveDatasetAssociations(
        self, collection: str, collectionType: CollectionType, associations: Iterable[DatasetAssociation]
    ) -> None:
        # Docstring inherited from RepoExportBackend.saveDatasetAssociations.
        if collectionType is CollectionType.TAGGED:
            self.data.append(
                {
                    "type": "associations",
                    "collection": collection,
                    "collection_type": collectionType.name,
                    "dataset_ids": [assoc.ref.id for assoc in associations],
                }
            )
        elif collectionType is CollectionType.CALIBRATION:
            idsByTimespan: dict[Timespan, list[DatasetId]] = defaultdict(list)
            for association in associations:
                assert association.timespan is not None
                idsByTimespan[association.timespan].append(association.ref.id)
            self.data.append(
                {
                    "type": "associations",
                    "collection": collection,
                    "collection_type": collectionType.name,
                    "validity_ranges": [
                        {
                            "timespan": timespan,
                            "dataset_ids": dataset_ids,
                        }
                        for timespan, dataset_ids in idsByTimespan.items()
                    ],
                }
            )

    def finish(self) -> None:
        # Docstring inherited from RepoExportBackend.
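        # The resulting document has this top-level layout (illustrative):
        #
        #   description: Butler Data Repository Export
        #   version: "1.0.2"
        #   universe_version: <int>
        #   universe_namespace: <str>
        #   data:
        #     - {type: dimension, ...}
        #     - {type: collection, ...}
        #     - ...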
        yaml.dump(
            {
                "description": "Butler Data Repository Export",
                "version": str(EXPORT_FORMAT_VERSION),
                "universe_version": self.universe.version,
                "universe_namespace": self.universe.namespace,
                "data": self.data,
            },
            stream=self.stream,
            sort_keys=False,
        )


class _DayObsOffsetCalculator:
    """Interface to allow the day_obs offset to be calculated from an
    instrument class name and cached.
    """

    name_to_class_name: dict[str, str]
    name_to_offset: dict[str, int | None]

    def __init__(self) -> None:
        self.name_to_class_name = {}
        self.name_to_offset = {}

    def __setitem__(self, name: str, class_name: str) -> None:
        """Store the instrument class name.

        Parameters
        ----------
        name : `str`
            Name of the instrument.
        class_name : `str`
            Full name of the instrument class.
        """
        self.name_to_class_name[name] = class_name

    def get_offset(self, name: str, date: astropy.time.Time) -> int | None:
        """Return the offset to use when calculating day_obs.

        Parameters
        ----------
        name : `str`
            The instrument name.
        date : `astropy.time.Time`
            Time for which the offset is required.

        Returns
        -------
        offset : `int` or `None`
            The offset in seconds, or `None` if it could not be determined.
        """
        if name in self.name_to_offset:
            return self.name_to_offset[name]

        try:
            instrument_class = doImportType(self.name_to_class_name[name])
        except Exception:
            # Any error at all, store None and do not try again.
            self.name_to_offset[name] = None
            return None

        # Assume this is a `lsst.pipe.base.Instrument` and that it has
        # a translatorClass property pointing to an
        # astro_metadata_translator.MetadataTranslator class. If this is not
        # true give up and store None.
        try:
            offset_delta = instrument_class.translatorClass.observing_date_to_offset(date)  # type: ignore
        except Exception:
            offset_delta = None

        if offset_delta is None:
            self.name_to_offset[name] = None
            return None

        self.name_to_offset[name] = round(offset_delta.to_value("s"))
        return self.name_to_offset[name]


class YamlRepoImportBackend(RepoImportBackend):
    """A repository import implementation that reads from a YAML file.

    Parameters
    ----------
    stream : `typing.IO`
        A readable file-like object.
    butler : `Butler`
        The butler that datasets will be imported into. Only used to retrieve
        dataset types during construction; all writes happen in `register`
        and `load`.
    """

    def __init__(self, stream: IO, butler: Butler):
        # We read the file fully and convert its contents to Python objects
        # instead of loading incrementally so we can spot some problems early;
        # because `register` can't be put inside a transaction, we'd rather not
        # run that at all if there's going to be a problem later in `load`.
        wrapper = yaml.safe_load(stream)
        if wrapper["version"] == 0:
            # Grandfather-in 'version: 0' -> 1.0.0, which is what we wrote
            # before we really tried to do versioning here.
            fileVersion = VersionTuple(1, 0, 0)
        else:
            fileVersion = VersionTuple.fromString(wrapper["version"])
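        # Compatibility rule (see EXPORT_FORMAT_VERSION): the major version
        # must match exactly, and the file's minor version must not be newer
        # than what this code supports.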
        if fileVersion.major != EXPORT_FORMAT_VERSION.major:
            raise IncompatibleVersionError(
                f"Cannot read repository export file with version={fileVersion} "
                f"({EXPORT_FORMAT_VERSION.major}.x.x required)."
            )
        if fileVersion.minor > EXPORT_FORMAT_VERSION.minor:
            raise IncompatibleVersionError(
                f"Cannot read repository export file with version={fileVersion}; "
                f"{EXPORT_FORMAT_VERSION.major}.{EXPORT_FORMAT_VERSION.minor}.x or older is required."
            )
        self.runs: dict[str, tuple[str | None, Timespan]] = {}
        self.chains: dict[str, list[str]] = {}
        self.collections: dict[str, CollectionType] = {}
        self.collectionDocs: dict[str, str] = {}
        self.datasetTypes: NamedValueSet[DatasetType] = NamedValueSet()
        self.dimensions: Mapping[DimensionElement, list[DimensionRecord]] = defaultdict(list)
        self.tagAssociations: dict[str, list[DatasetId]] = defaultdict(list)
        self.calibAssociations: dict[str, dict[Timespan, list[DatasetId]]] = defaultdict(dict)
        self.refsByFileId: dict[DatasetId, DatasetRef] = {}
        self.butler: Butler = butler

        universe_version = wrapper.get("universe_version", 0)
        universe_namespace = wrapper.get("universe_namespace", "daf_butler")

        # If this is data exported before the reorganization of visits
        # and visit systems, and that new schema is in use, some filtering
        # will be needed. The entry in the visit dimension record will be
        # silently dropped when visit is created but the
        # visit_system_membership must be constructed.
        migrate_visit_system = False
        if (
            universe_version < 2
            and universe_namespace == "daf_butler"
            and "visit_system_membership" in self.butler.dimensions
        ):
            migrate_visit_system = True

        # Drop "seeing" from visits in files older than version 1.
        migrate_visit_seeing = False
        if (
            universe_version < 1
            and universe_namespace == "daf_butler"
            and "visit" in self.butler.dimensions
            and "seeing" not in self.butler.dimensions["visit"].metadata
        ):
            migrate_visit_seeing = True

        # If this data was exported before group was a first-class dimension,
        # we'll need to modify some exposure columns and add group records.
        migrate_group = False
        if (
            universe_version < 6
            and universe_namespace == "daf_butler"
            and "exposure" in self.butler.dimensions
            and "group" in self.butler.dimensions["exposure"].implied
        ):
            migrate_group = True

        # If this data was exported before day_obs was a first-class dimension,
        # we'll need to modify some exposure and visit columns and add day_obs
        # records. This is especially tricky because some files even predate
        # the existence of data ID values.
        migrate_exposure_day_obs = False
        migrate_visit_day_obs = False
        day_obs_ids: set[tuple[str, int]] = set()
        if universe_version < 6 and universe_namespace == "daf_butler":
            if (
                "exposure" in self.butler.dimensions
                and "day_obs" in self.butler.dimensions["exposure"].implied
            ):
                migrate_exposure_day_obs = True
            if "visit" in self.butler.dimensions and "day_obs" in self.butler.dimensions["visit"].implied:
                migrate_visit_day_obs = True

        # If this is a pre-v1 universe we may need to fill in a missing
        # visit.day_obs field.
        migrate_add_visit_day_obs = False
        if (
            universe_version < 1
            and universe_namespace == "daf_butler"
            and (
                "day_obs" in self.butler.dimensions["visit"].implied
                or "day_obs" in self.butler.dimensions["visit"].metadata
            )
        ):
            migrate_add_visit_day_obs = True

        # Some conversions may need to work out a day_obs timespan.
        # The only way this offset can be found is by querying the instrument
        # class. Read all the existing instrument classes indexed by name.
        instrument_classes: dict[str, str] = {}
        if migrate_exposure_day_obs or migrate_visit_day_obs or migrate_add_visit_day_obs:
            day_obs_offset_calculator = _DayObsOffsetCalculator()
            for rec in self.butler.registry.queryDimensionRecords("instrument"):
                day_obs_offset_calculator[rec.name] = rec.class_name

        datasetData = []
        RecordClass: type[DimensionRecord]
        for data in wrapper["data"]:
            if data["type"] == "dimension":
                # convert all datetime values to astropy
                for record in data["records"]:
                    for key in record:
                        # Some older YAML files were produced with native
                        # YAML support for datetime, we support reading that
                        # data back. Newer conversion uses _AstropyTimeToYAML
                        # class with special YAML tag.
                        if isinstance(record[key], datetime):
                            record[key] = astropy.time.Time(record[key], scale="utc")

                if data["element"] == "instrument":
                    if migrate_exposure_day_obs or migrate_visit_day_obs:
                        # Might want the instrument class name for later.
                        for record in data["records"]:
                            if record["name"] not in instrument_classes:
                                instrument_classes[record["name"]] = record["class_name"]

                if data["element"] == "visit":
                    if migrate_visit_system:
                        # Must create the visit_system_membership records.
                        # But first create empty list for visits since other
                        # logic in this file depends on self.dimensions being
                        # populated in an order consistent with primary keys.
                        self.dimensions[self.butler.dimensions["visit"]] = []
                        element = self.butler.dimensions["visit_system_membership"]
                        RecordClass = element.RecordClass
                        self.dimensions[element].extend(
                            RecordClass(
                                instrument=r["instrument"], visit_system=r.pop("visit_system"), visit=r["id"]
                            )
                            for r in data["records"]
                        )
                    if migrate_visit_seeing:
                        for record in data["records"]:
                            record.pop("seeing", None)
                    if migrate_add_visit_day_obs:
                        # The day_obs field is missing. It can be derived from
                        # the datetime_begin field.
                        for record in data["records"]:
                            date = record["datetime_begin"].tai
                            offset = day_obs_offset_calculator.get_offset(record["instrument"], date)
                            # This field is required so we have to calculate
                            # it even if the offset is not defined.
                            if offset:
                                date = date - astropy.time.TimeDelta(offset, format="sec", scale="tai")
                            record["day_obs"] = int(date.strftime("%Y%m%d"))
                    if migrate_visit_day_obs:
                        # Poke the entry for this dimension to make sure it
                        # appears in the right order, even though we'll
                        # populate it later.
                        self.dimensions[self.butler.dimensions["day_obs"]]
                        for record in data["records"]:
                            day_obs_ids.add((record["instrument"], record["day_obs"]))

                if data["element"] == "exposure":
                    if migrate_group:
                        element = self.butler.dimensions["group"]
                        RecordClass = element.RecordClass
                        group_records = self.dimensions[element]
                        for exposure_record in data["records"]:
                            exposure_record["group"] = exposure_record.pop("group_name")
                            del exposure_record["group_id"]
                            group_records.append(
                                RecordClass(
                                    instrument=exposure_record["instrument"], name=exposure_record["group"]
                                )
                            )
                    if migrate_exposure_day_obs:
                        # Poke the entry for this dimension to make sure it
                        # appears in the right order, even though we'll
                        # populate it later.
                        for record in data["records"]:
                            day_obs_ids.add((record["instrument"], record["day_obs"]))

                element = self.butler.dimensions[data["element"]]
                RecordClass = element.RecordClass
                self.dimensions[element].extend(RecordClass(**r) for r in data["records"])

            elif data["type"] == "collection":
                collectionType = CollectionType.from_name(data["collection_type"])
                if collectionType is CollectionType.RUN:
                    self.runs[data["name"]] = (
                        data["host"],
                        Timespan(begin=data["timespan_begin"], end=data["timespan_end"]),
                    )
                elif collectionType is CollectionType.CHAINED:
                    children = []
                    for child in data["children"]:
                        if not isinstance(child, str):
                            warnings.warn(
                                f"CHAINED collection {data['name']} includes restrictions on child "
                                "collection searches, which are no longer supported and will be ignored.",
                                stacklevel=find_outside_stacklevel("lsst.daf.butler"),
                            )
                            # Old form with dataset type restrictions only,
                            # supported for backwards compatibility.
                            child, _ = child
                        children.append(child)
                    self.chains[data["name"]] = children
                else:
                    self.collections[data["name"]] = collectionType
                doc = data.get("doc")
                if doc is not None:
                    self.collectionDocs[data["name"]] = doc
            elif data["type"] == "run":
                # Also support old form of saving a run with no extra info.
                self.runs[data["name"]] = (None, Timespan(None, None))
            elif data["type"] == "dataset_type":
                dimensions = data["dimensions"]
                if migrate_visit_system and "visit" in dimensions and "visit_system" in dimensions:
                    dimensions.remove("visit_system")
                self.datasetTypes.add(
                    DatasetType(
                        data["name"],
                        dimensions=dimensions,
                        storageClass=data["storage_class"],
                        universe=self.butler.dimensions,
                        isCalibration=data.get("is_calibration", False),
                    )
                )
            elif data["type"] == "dataset":
                # Save raw dataset data for a second loop, so we can ensure we
                # know about all dataset types first.
                datasetData.append(data)
            elif data["type"] == "associations":
                collectionType = CollectionType.from_name(data["collection_type"])
                if collectionType is CollectionType.TAGGED:
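                    # Very old export files identified datasets by integer ID;
                    # map those to deterministic UUIDs via _refIntId2UUID.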
                    self.tagAssociations[data["collection"]].extend(
                        [x if not isinstance(x, int) else _refIntId2UUID[x] for x in data["dataset_ids"]]
                    )
                elif collectionType is CollectionType.CALIBRATION:
                    assocsByTimespan = self.calibAssociations[data["collection"]]
                    for d in data["validity_ranges"]:
                        if "timespan" in d:
                            assocsByTimespan[d["timespan"]] = [
                                x if not isinstance(x, int) else _refIntId2UUID[x] for x in d["dataset_ids"]
                            ]
                        else:
                            # TODO: this is for backward compatibility, should
                            # be removed at some point.
                            assocsByTimespan[Timespan(begin=d["begin"], end=d["end"])] = [
                                x if not isinstance(x, int) else _refIntId2UUID[x] for x in d["dataset_ids"]
                            ]
                else:
                    raise ValueError(f"Unexpected collection type for association: {collectionType.name}.")
            else:
                raise ValueError(f"Unexpected dictionary type: {data['type']}.")

        if day_obs_ids:
            element = self.butler.dimensions["day_obs"]
            RecordClass = element.RecordClass
            missing_offsets = set()
            for instrument, day_obs in day_obs_ids:
                # To get the offset we need the astropy time. Since we are
                # going from a day_obs to a time, it's possible that in some
                # scenario the offset will be wrong.
                ymd = str(day_obs)
                t = astropy.time.Time(
                    f"{ymd[0:4]}-{ymd[4:6]}-{ymd[6:8]}T00:00:00", format="isot", scale="tai"
                )
                offset = day_obs_offset_calculator.get_offset(instrument, t)

                # This should always return an offset but as a fallback
                # allow None here in case something has gone wrong above.
                # In particular, not being able to load an instrument class.
                if offset is not None:
                    timespan = Timespan.from_day_obs(day_obs, offset=offset)
                else:
                    timespan = None
                    missing_offsets.add(instrument)
                self.dimensions[element].append(
                    RecordClass(instrument=instrument, id=day_obs, timespan=timespan)
                )

            if missing_offsets:
                plural = "" if len(missing_offsets) == 1 else "s"
                warnings.warn(
                    "Constructing day_obs records with no timespans for "
                    "visit/exposure records that were exported before day_obs was a dimension. "
                    f"(instrument{plural}: {missing_offsets})",
                    stacklevel=find_outside_stacklevel("lsst.daf.butler"),
                )

        # key is (dataset type name, run)
        self.datasets: Mapping[tuple[str, str], list[FileDataset]] = defaultdict(list)
        for data in datasetData:
            datasetType = self.datasetTypes.get(data["dataset_type"])
            if datasetType is None:
                datasetType = self.butler.get_dataset_type(data["dataset_type"])
            self.datasets[data["dataset_type"], data["run"]].extend(
                FileDataset(
                    d.get("path"),
                    [
                        DatasetRef(
                            datasetType,
                            dataId,
                            run=data["run"],
                            id=refid if not isinstance(refid, int) else _refIntId2UUID[refid],
                        )
                        for dataId, refid in zip(
                            ensure_iterable(d["data_id"]), ensure_iterable(d["dataset_id"]), strict=True
                        )
                    ],
                    formatter=doImportType(d.get("formatter")) if "formatter" in d else None,
                )
                for d in data["records"]
            )

    def register(self) -> None:
        # Docstring inherited from RepoImportBackend.register.
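        # Registration cannot run inside a transaction (see the note in
        # __init__), so dataset types and collections are created up front
        # here before `load` performs the bulk writes.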
        for datasetType in self.datasetTypes:
            self.butler.registry.registerDatasetType(datasetType)
        for run in self.runs:
            self.butler.collections.register(run, doc=self.collectionDocs.get(run))
            # No way to add extra run info to registry yet.
        for collection, collection_type in self.collections.items():
            self.butler.collections.register(
                collection, collection_type, doc=self.collectionDocs.get(collection)
            )
        for chain, children in self.chains.items():
            self.butler.collections.register(
                chain, CollectionType.CHAINED, doc=self.collectionDocs.get(chain)
            )
            self.butler.registry.setCollectionChain(chain, children)

    def load(
        self,
        datastore: Datastore | None,
        *,
        directory: ResourcePathExpression | None = None,
        transfer: str | None = None,
        skip_dimensions: set | None = None,
        record_validation_info: bool = True,
    ) -> None:
        # Docstring inherited from RepoImportBackend.load.
        # Must ensure we insert in order supported by the universe.
        for element in self.butler.dimensions.sorted(self.dimensions.keys()):
            dimensionRecords = self.dimensions[element]
            if skip_dimensions and element in skip_dimensions:
                continue
            # Using skip_existing=True here assumes that the records in the
            # database are either equivalent or at least preferable to the ones
            # being imported. It'd be ideal to check that, but that would mean
            # using syncDimensionData, which is not vectorized and is hence
            # unacceptably slow.
            self.butler.registry.insertDimensionData(element, *dimensionRecords, skip_existing=True)
        # FileDatasets to ingest into the datastore (in bulk):
        fileDatasets = []
        for records in self.datasets.values():
            # Make a big flattened list of all data IDs and dataset_ids, while
            # remembering slices that associate them with the FileDataset
            # instances they came from.
            datasets: list[DatasetRef] = []
            dataset_ids: list[DatasetId] = []
            slices = []
            for fileDataset in records:
                start = len(datasets)
                datasets.extend(fileDataset.refs)
                dataset_ids.extend(ref.id for ref in fileDataset.refs)
                stop = len(datasets)
                slices.append(slice(start, stop))
            # Insert all of those DatasetRefs at once.
            # For now, we ignore the dataset_id we pulled from the file
            # and just insert without one to get a new autoincrement value.
            # Eventually (once we have origin in IDs) we'll preserve them.
            resolvedRefs = self.butler.registry._importDatasets(datasets)
            # Populate our dictionary that maps int dataset_id values from the
            # export file to the new DatasetRefs.
            for fileId, ref in zip(dataset_ids, resolvedRefs, strict=True):
                self.refsByFileId[fileId] = ref
            # Now iterate over the original records, and install the new
            # resolved DatasetRefs to replace the unresolved ones as we
            # reorganize the collection information.
            for sliceForFileDataset, fileDataset in zip(slices, records, strict=True):
                fileDataset.refs = resolvedRefs[sliceForFileDataset]
                if directory is not None:
                    fileDataset.path = ResourcePath(directory, forceDirectory=True).join(fileDataset.path)
                fileDatasets.append(fileDataset)
        # Ingest everything into the datastore at once.
        if datastore is not None and fileDatasets:
            datastore.ingest(*fileDatasets, transfer=transfer, record_validation_info=record_validation_info)
        # Associate datasets with tagged collections.
        for collection, dataset_ids in self.tagAssociations.items():
            self.butler.registry.associate(collection, [self.refsByFileId[i] for i in dataset_ids])
        # Associate datasets with calibration collections.
        for collection, idsByTimespan in self.calibAssociations.items():
            for timespan, dataset_ids in idsByTimespan.items():
                self.butler.registry.certify(
                    collection, [self.refsByFileId[i] for i in dataset_ids], timespan
                )