Coverage for python / lsst / daf / butler / direct_butler / _direct_butler.py: 10%

956 statements  

« prev     ^ index     » next       coverage.py v7.13.5, created at 2026-04-22 08:55 +0000

1# This file is part of daf_butler. 

2# 

3# Developed for the LSST Data Management System. 

4# This product includes software developed by the LSST Project 

5# (http://www.lsst.org). 

6# See the COPYRIGHT file at the top-level directory of this distribution 

7# for details of code ownership. 

8# 

9# This software is dual licensed under the GNU General Public License and also 

10# under a 3-clause BSD license. Recipients may choose which of these licenses 

11# to use; please see the files gpl-3.0.txt and/or bsd_license.txt, 

12# respectively. If you choose the GPL option then the following text applies 

13# (but note that there is still no warranty even if you opt for BSD instead): 

14# 

15# This program is free software: you can redistribute it and/or modify 

16# it under the terms of the GNU General Public License as published by 

17# the Free Software Foundation, either version 3 of the License, or 

18# (at your option) any later version. 

19# 

20# This program is distributed in the hope that it will be useful, 

21# but WITHOUT ANY WARRANTY; without even the implied warranty of 

22# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the 

23# GNU General Public License for more details. 

24# 

25# You should have received a copy of the GNU General Public License 

26# along with this program. If not, see <http://www.gnu.org/licenses/>. 

27 

28"""Butler top level classes.""" 

29 

30from __future__ import annotations 

31 

32__all__ = ( 

33 "ButlerValidationError", 

34 "DirectButler", 

35) 

36 

37import collections.abc 

38import contextlib 

39import io 

40import itertools 

41import math 

42import numbers 

43import os 

44import uuid 

45import warnings 

46from collections import Counter, defaultdict 

47from collections.abc import Collection, Iterable, Iterator, MutableMapping, Sequence 

48from functools import partial 

49from types import EllipsisType 

50from typing import TYPE_CHECKING, Any, ClassVar, NamedTuple, TextIO, cast 

51 

52from deprecated.sphinx import deprecated 

53from sqlalchemy.exc import IntegrityError 

54 

55from lsst.resources import ResourcePath, ResourcePathExpression 

56from lsst.utils.introspection import find_outside_stacklevel, get_class_of 

57from lsst.utils.iteration import chunk_iterable 

58from lsst.utils.logging import VERBOSE, getLogger 

59from lsst.utils.timer import time_this 

60 

61from .._butler import Butler, _DeprecatedDefault 

62from .._butler_config import ButlerConfig 

63from .._butler_instance_options import ButlerInstanceOptions 

64from .._butler_metrics import ButlerMetrics 

65from .._collection_type import CollectionType 

66from .._dataset_existence import DatasetExistence 

67from .._dataset_ref import DatasetRef 

68from .._dataset_type import DatasetType 

69from .._deferredDatasetHandle import DeferredDatasetHandle 

70from .._exceptions import DatasetNotFoundError, DimensionValueError, EmptyQueryResultError, ValidationError 

71from .._file_dataset import FileDataset 

72from .._limited_butler import LimitedButler 

73from .._query_all_datasets import QueryAllDatasetsParameters, query_all_datasets 

74from .._registry_shim import RegistryShim 

75from .._storage_class import StorageClass, StorageClassFactory 

76from .._timespan import Timespan 

77from ..datastore import Datastore, NullDatastore 

78from ..datastores.file_datastore.retrieve_artifacts import ZipIndex, retrieve_and_zip 

79from ..datastores.file_datastore.transfer import retrieve_file_transfer_records 

80from ..dimensions import DataCoordinate, Dimension, DimensionGroup 

81from ..direct_query_driver import DirectQueryDriver 

82from ..progress import Progress 

83from ..queries import Query 

84from ..registry import ( 

85 ConflictingDefinitionError, 

86 DataIdError, 

87 MissingDatasetTypeError, 

88 RegistryDefaults, 

89 _RegistryFactory, 

90) 

91from ..registry.sql_registry import SqlRegistry 

92from ..transfers import RepoExportContext 

93from ..utils import transactional 

94from ._direct_butler_collections import DirectButlerCollections 

95 

96if TYPE_CHECKING: 

97 from lsst.resources import ResourceHandleProtocol 

98 

99 from .._dataset_provenance import DatasetProvenance 

100 from .._dataset_ref import DatasetId 

101 from ..datastore import DatasetRefURIs 

102 from ..dimensions import DataId, DataIdValue, DimensionElement, DimensionRecord, DimensionUniverse 

103 from ..registry import CollectionArgType, Registry 

104 from ..transfers import RepoImportBackend 

105 

106_LOG = getLogger(__name__) 

107 

108 

class ButlerValidationError(ValidationError):
    """Exception raised when there is a problem with the Butler
    configuration.
    """

113 

114 

115class DirectButler(Butler): # numpydoc ignore=PR02 

116 """Main entry point for the data access system. 

117 

118 Parameters 

119 ---------- 

120 config : `ButlerConfig` 

121 The configuration for this Butler instance. 

122 registry : `SqlRegistry` 

123 The object that manages dataset metadata and relationships. 

124 datastore : Datastore 

125 The object that manages actual dataset storage. 

126 storageClasses : StorageClassFactory 

127 An object that maps known storage class names to objects that fully 

128 describe them. 

129 

130 Notes 

131 ----- 

132 Most users should call the top-level `Butler`.``from_config`` instead of 

133 using this constructor directly. 

134 """ 

135 

136 # This is __new__ instead of __init__ because we have to support 

137 # instantiation via the legacy constructor Butler.__new__(), which 

138 # reads the configuration and selects which subclass to instantiate. The 

139 # interaction between __new__ and __init__ is kind of wacky in Python. If 

140 # we were using __init__ here, __init__ would be called twice (once when 

141 # the DirectButler instance is constructed inside Butler.from_config(), and 

142 # a second time with the original arguments to Butler() when the instance 

143 # is returned from Butler.__new__() 

144 def __new__( 

145 cls, 

146 *, 

147 config: ButlerConfig, 

148 registry: SqlRegistry, 

149 datastore: Datastore, 

150 storageClasses: StorageClassFactory, 

151 metrics: ButlerMetrics | None = None, 

152 ) -> DirectButler: 

153 self = cast(DirectButler, super().__new__(cls)) 

154 self._config = config 

155 self._registry = registry 

156 self._datastore = datastore 

157 self.storageClasses = storageClasses 

158 self._metrics = metrics if metrics is not None else ButlerMetrics() 

159 

160 # For execution butler the datastore needs a special 

161 # dependency-inversion trick. This is not used by regular butler, 

162 # but we do not have a way to distinguish regular butler from execution 

163 # butler. 

164 self._datastore.set_retrieve_dataset_type_method(partial(_retrieve_dataset_type, registry)) 

165 

166 self._closed = False 

167 

168 return self 

169 

170 @classmethod 

171 def create_from_config( 

172 cls, 

173 config: ButlerConfig, 

174 *, 

175 options: ButlerInstanceOptions, 

176 without_datastore: bool = False, 

177 ) -> DirectButler: 

178 """Construct a Butler instance from a configuration file. 

179 

180 Parameters 

181 ---------- 

182 config : `ButlerConfig` 

183 The configuration for this Butler instance. 

184 options : `ButlerInstanceOptions` 

185 Default values and other settings for the Butler instance. 

186 without_datastore : `bool`, optional 

187 If `True` do not attach a datastore to this butler. Any attempts 

188 to use a datastore will fail. 

189 

190 Notes 

191 ----- 

192 Most users should call the top-level `Butler`.``from_config`` 

193 instead of using this function directly. 

194 """ 

195 if "run" in config or "collection" in config: 

196 raise ValueError("Passing a run or collection via configuration is no longer supported.") 

197 

198 defaults = RegistryDefaults.from_butler_instance_options(options) 

199 try: 

200 butlerRoot = config.get("root", config.configDir) 

201 writeable = options.writeable 

202 if writeable is None: 

203 writeable = options.run is not None 

204 registry = _RegistryFactory(config).from_config( 

205 butlerRoot=butlerRoot, writeable=writeable, defaults=defaults 

206 ) 

207 if without_datastore: 

208 datastore: Datastore = NullDatastore(None, None) 

209 else: 

210 datastore = Datastore.fromConfig( 

211 config, registry.getDatastoreBridgeManager(), butlerRoot=butlerRoot 

212 ) 

213 # TODO: Once datastore drops dependency on registry we can 

214 # construct datastore first and pass opaque tables to registry 

215 # constructor. 

216 registry.make_datastore_tables(datastore.get_opaque_table_definitions()) 

217 storageClasses = StorageClassFactory() 

218 storageClasses.addFromConfig(config) 

219 

220 return DirectButler( 

221 config=config, 

222 registry=registry, 

223 datastore=datastore, 

224 storageClasses=storageClasses, 

225 metrics=options.metrics, 

226 ) 

227 except Exception: 

228 # Failures here usually mean that configuration is incomplete, 

229 # just issue an error message which includes config file URI. 

230 _LOG.error(f"Failed to instantiate Butler from config {config.configFile}.") 

231 raise 

232 

233 def clone( 

234 self, 

235 *, 

236 collections: CollectionArgType | None | EllipsisType = ..., 

237 run: str | None | EllipsisType = ..., 

238 inferDefaults: bool | EllipsisType = ..., 

239 dataId: dict[str, str] | EllipsisType = ..., 

240 metrics: ButlerMetrics | None = None, 

241 ) -> DirectButler: 

242 # Docstring inherited 

243 defaults = self._registry.defaults.clone(collections, run, inferDefaults, dataId) 

244 registry = self._registry.copy(defaults) 

245 

246 return DirectButler( 

247 registry=registry, 

248 config=self._config, 

249 datastore=self._datastore.clone(registry.getDatastoreBridgeManager()), 

250 storageClasses=self.storageClasses, 

251 metrics=metrics, 

252 ) 

253 

254 def close(self) -> None: 

255 if not self._closed: 

256 self._closed = True 

257 self._registry.close() 

258 # Cause exceptions to be raised if a user attempts to use the 

259 # instance after closing it. Without this, Butler would still 

260 # work after being closed because of implementation details 

261 # of SqlAlchemy, but this may not continue to be the case in the 

262 # future and we don't want users to get in the habit of doing this. 

263 self._registry = _BUTLER_CLOSED_INSTANCE 

264 self._datastore = _BUTLER_CLOSED_INSTANCE 

265 

    # Class-level marker consulted by transitional code that must
    # distinguish Generation 2 from Generation 3 Butler APIs.
    GENERATION: ClassVar[int] = 3
    """This is a Generation 3 Butler.

    This attribute may be removed in the future, once the Generation 2 Butler
    interface has been fully retired; it should only be used in transitional
    code.
    """

273 

274 @classmethod 

275 def _unpickle( 

276 cls, 

277 config: ButlerConfig, 

278 collections: tuple[str, ...] | None, 

279 run: str | None, 

280 defaultDataId: dict[str, str], 

281 writeable: bool, 

282 ) -> DirectButler: 

283 """Callable used to unpickle a Butler. 

284 

285 We prefer not to use ``Butler.__init__`` directly so we can force some 

286 of its many arguments to be keyword-only (note that ``__reduce__`` 

287 can only invoke callables with positional arguments). 

288 

289 Parameters 

290 ---------- 

291 config : `ButlerConfig` 

292 Butler configuration, already coerced into a true `ButlerConfig` 

293 instance (and hence after any search paths for overrides have been 

294 utilized). 

295 collections : `tuple` [ `str` ] 

296 Names of the default collections to read from. 

297 run : `str`, optional 

298 Name of the default `~CollectionType.RUN` collection to write to. 

299 defaultDataId : `dict` [ `str`, `str` ] 

300 Default data ID values. 

301 writeable : `bool` 

302 Whether the Butler should support write operations. 

303 

304 Returns 

305 ------- 

306 butler : `Butler` 

307 A new `Butler` instance. 

308 """ 

309 return cls.create_from_config( 

310 config=config, 

311 options=ButlerInstanceOptions( 

312 collections=collections, run=run, writeable=writeable, kwargs=defaultDataId 

313 ), 

314 ) 

315 

316 def __reduce__(self) -> tuple: 

317 """Support pickling.""" 

318 return ( 

319 DirectButler._unpickle, 

320 ( 

321 self._config, 

322 self.collections.defaults, 

323 self.run, 

324 dict(self._registry.defaults.dataId.required), 

325 self._registry.isWriteable(), 

326 ), 

327 ) 

328 

329 def __str__(self) -> str: 

330 return ( 

331 f"Butler(collections={self.collections}, run={self.run}, " 

332 f"datastore='{self._datastore}', registry='{self._registry}')" 

333 ) 

334 

335 def isWriteable(self) -> bool: 

336 # Docstring inherited. 

337 return self._registry.isWriteable() 

338 

339 def _caching_context(self) -> contextlib.AbstractContextManager[None]: 

340 """Context manager that enables caching.""" 

341 return self._registry.caching_context() 

342 

343 @contextlib.contextmanager 

344 def transaction(self) -> Iterator[None]: 

345 """Context manager supporting `Butler` transactions. 

346 

347 Transactions can be nested. 

348 """ 

349 with self._registry.transaction(), self._datastore.transaction(): 

350 yield 

351 

352 def _standardizeArgs( 

353 self, 

354 datasetRefOrType: DatasetRef | DatasetType | str, 

355 dataId: DataId | None = None, 

356 for_put: bool = True, 

357 **kwargs: Any, 

358 ) -> tuple[DatasetType, DataId | None]: 

359 """Standardize the arguments passed to several Butler APIs. 

360 

361 Parameters 

362 ---------- 

363 datasetRefOrType : `DatasetRef`, `DatasetType`, or `str` 

364 When `DatasetRef` the `dataId` should be `None`. 

365 Otherwise the `DatasetType` or name thereof. 

366 dataId : `dict` or `DataCoordinate` 

367 A `dict` of `Dimension` link name, value pairs that label the 

368 `DatasetRef` within a Collection. When `None`, a `DatasetRef` 

369 should be provided as the second argument. 

370 for_put : `bool`, optional 

371 If `True` this call is invoked as part of a `Butler.put`. 

372 Otherwise it is assumed to be part of a `Butler.get()`. This 

373 parameter is only relevant if there is dataset type 

374 inconsistency. 

375 **kwargs 

376 Additional keyword arguments used to augment or construct a 

377 `DataCoordinate`. See `DataCoordinate.standardize` 

378 parameters. 

379 

380 Returns 

381 ------- 

382 datasetType : `DatasetType` 

383 A `DatasetType` instance extracted from ``datasetRefOrType``. 

384 dataId : `dict` or `DataId`, optional 

385 Argument that can be used (along with ``kwargs``) to construct a 

386 `DataId`. 

387 

388 Notes 

389 ----- 

390 Butler APIs that conceptually need a DatasetRef also allow passing a 

391 `DatasetType` (or the name of one) and a `DataId` (or a dict and 

392 keyword arguments that can be used to construct one) separately. This 

393 method accepts those arguments and always returns a true `DatasetType` 

394 and a `DataId` or `dict`. 

395 

396 Standardization of `dict` vs `DataId` is best handled by passing the 

397 returned ``dataId`` (and ``kwargs``) to `Registry` APIs, which are 

398 generally similarly flexible. 

399 """ 

400 externalDatasetType: DatasetType | None = None 

401 internalDatasetType: DatasetType | None = None 

402 if isinstance(datasetRefOrType, DatasetRef): 

403 if dataId is not None or kwargs: 

404 raise ValueError("DatasetRef given, cannot use dataId as well") 

405 externalDatasetType = datasetRefOrType.datasetType 

406 dataId = datasetRefOrType.dataId 

407 else: 

408 # Don't check whether DataId is provided, because Registry APIs 

409 # can usually construct a better error message when it wasn't. 

410 if isinstance(datasetRefOrType, DatasetType): 

411 externalDatasetType = datasetRefOrType 

412 else: 

413 internalDatasetType = self.get_dataset_type(datasetRefOrType) 

414 

415 # Check that they are self-consistent 

416 if externalDatasetType is not None: 

417 internalDatasetType = self.get_dataset_type(externalDatasetType.name) 

418 if externalDatasetType != internalDatasetType: 

419 # We can allow differences if they are compatible, depending 

420 # on whether this is a get or a put. A get requires that 

421 # the python type associated with the datastore can be 

422 # converted to the user type. A put requires that the user 

423 # supplied python type can be converted to the internal 

424 # type expected by registry. 

425 relevantDatasetType = internalDatasetType 

426 if for_put: 

427 is_compatible = internalDatasetType.is_compatible_with(externalDatasetType) 

428 else: 

429 is_compatible = externalDatasetType.is_compatible_with(internalDatasetType) 

430 relevantDatasetType = externalDatasetType 

431 if not is_compatible: 

432 raise ValueError( 

433 f"Supplied dataset type ({externalDatasetType}) inconsistent with " 

434 f"registry definition ({internalDatasetType})" 

435 ) 

436 # Override the internal definition. 

437 internalDatasetType = relevantDatasetType 

438 

439 assert internalDatasetType is not None 

440 return internalDatasetType, dataId 

441 

    def _rewrite_data_id(
        self, dataId: DataId | None, datasetType: DatasetType, **kwargs: Any
    ) -> tuple[DataId | None, dict[str, Any]]:
        """Rewrite a data ID taking into account dimension records.

        Take a Data ID and keyword args and rewrite it if necessary to
        allow the user to specify dimension records rather than dimension
        primary values.

        This allows a user to include a dataId dict with keys of
        ``exposure.day_obs`` and ``exposure.seq_num`` instead of giving
        the integer exposure ID. It also allows a string to be given
        for a dimension value rather than the integer ID if that is more
        convenient. For example, rather than having to specify the
        detector with ``detector.full_name``, a string given for ``detector``
        will be interpreted as the full name and converted to the integer
        value.

        Keyword arguments can also use strings for dimensions like detector
        and exposure but python does not allow them to include ``.`` and
        so the ``exposure.day_obs`` syntax can not be used in a keyword
        argument.

        Parameters
        ----------
        dataId : `dict` or `DataCoordinate`
            A `dict` of `Dimension` link name, value pairs that will label the
            `DatasetRef` within a Collection.
        datasetType : `DatasetType`
            The dataset type associated with this dataId. Required to
            determine the relevant dimensions.
        **kwargs
            Additional keyword arguments used to augment or construct a
            `DataId`. See `DataId` parameters.

        Returns
        -------
        dataId : `dict` or `DataCoordinate`
            The, possibly rewritten, dataId. If given a `DataCoordinate` and
            no keyword arguments, the original dataId will be returned
            unchanged.
        **kwargs : `dict`
            Any unused keyword arguments (would normally be empty dict).
        """
        # Process dimension records that are using record information
        # rather than ids
        newDataId: dict[str, DataIdValue] = {}
        byRecord: dict[str, dict[str, Any]] = defaultdict(dict)

        if isinstance(dataId, DataCoordinate):
            # Do nothing if we have a DataCoordinate and no kwargs.
            if not kwargs:
                return dataId, kwargs
            # If we have a DataCoordinate with kwargs, we know the
            # DataCoordinate only has values for real dimensions.
            newDataId.update(dataId.mapping)
        elif dataId:
            # The data is mapping, which means it might have keys like
            # "exposure.obs_id" (unlike kwargs, because a "." is not allowed in
            # a keyword parameter).
            for k, v in dataId.items():
                if isinstance(k, str) and "." in k:
                    # Someone is using a more human-readable dataId
                    dimensionName, record = k.split(".", 1)
                    byRecord[dimensionName][record] = v
                else:
                    newDataId[k] = v

        # Go through the updated dataId and check the type in case someone is
        # using an alternate key. We have already filtered out the compound
        # keys dimensions.record format.
        not_dimensions = {}

        # Will need to look in the dataId and the keyword arguments
        # and will remove them if they need to be fixed or are unrecognized.
        for dataIdDict in (newDataId, kwargs):
            # Use a list so we can adjust the dict safely in the loop
            for dimensionName in list(dataIdDict):
                value = dataIdDict[dimensionName]
                try:
                    dimension = self.dimensions.dimensions[dimensionName]
                except KeyError:
                    # This is not a real dimension
                    not_dimensions[dimensionName] = value
                    del dataIdDict[dimensionName]
                    continue

                # Convert an integral type to an explicit int to simplify
                # comparisons here
                if isinstance(value, numbers.Integral):
                    value = int(value)

                if not isinstance(value, dimension.primaryKey.getPythonType()):
                    # The value does not match the primary key type; see if
                    # it matches one of the alternate keys instead (e.g. a
                    # string detector name rather than the integer ID).
                    for alternate in dimension.alternateKeys:
                        if isinstance(value, alternate.getPythonType()):
                            byRecord[dimensionName][alternate.name] = value
                            del dataIdDict[dimensionName]
                            _LOG.debug(
                                "Converting dimension %s to %s.%s=%s",
                                dimensionName,
                                dimensionName,
                                alternate.name,
                                value,
                            )
                            break
                    else:
                        _LOG.warning(
                            "Type mismatch found for value '%r' provided for dimension %s. "
                            "Could not find matching alternative (primary key has type %s) "
                            "so attempting to use as-is.",
                            value,
                            dimensionName,
                            dimension.primaryKey.getPythonType(),
                        )

        # By this point kwargs and newDataId should only include valid
        # dimensions. Merge kwargs in to the new dataId and log if there
        # are dimensions in both (rather than calling update).
        for k, v in kwargs.items():
            if k in newDataId and newDataId[k] != v:
                _LOG.debug(
                    "Keyword arg %s overriding explicit value in dataId of %s with %s", k, newDataId[k], v
                )
            newDataId[k] = v
        # No need to retain any values in kwargs now.
        kwargs = {}

        # If we have some unrecognized dimensions we have to try to connect
        # them to records in other dimensions. This is made more complicated
        # by some dimensions having records with clashing names. A mitigation
        # is that we can tell by this point which dimensions are missing
        # for the DatasetType but this does not work for calibrations
        # where additional dimensions can be used to constrain the temporal
        # axis.
        if not_dimensions:
            # Search for all dimensions even if we have been given a value
            # explicitly. In some cases records are given as well as the
            # actually dimension and this should not be an error if they
            # match.
            mandatoryDimensions = datasetType.dimensions.names  # - provided

            candidateDimensions: set[str] = set()
            candidateDimensions.update(mandatoryDimensions)

            # For calibrations we may well be needing temporal dimensions
            # so rather than always including all dimensions in the scan
            # restrict things a little. It is still possible for there
            # to be confusion over day_obs in visit vs exposure for example.
            # If we are not searching calibration collections things may
            # fail but they are going to fail anyway because of the
            # ambiguousness of the dataId...
            if datasetType.isCalibration():
                for dim in self.dimensions.dimensions:
                    if dim.temporal:
                        candidateDimensions.add(str(dim))

            # Look up table for the first association with a dimension
            guessedAssociation: dict[str, dict[str, Any]] = defaultdict(dict)

            # Keep track of whether an item is associated with multiple
            # dimensions.
            counter: Counter[str] = Counter()
            assigned: dict[str, set[str]] = defaultdict(set)

            # Go through the missing dimensions and associate the
            # given names with records within those dimensions
            matched_dims = set()
            for dimensionName in candidateDimensions:
                dimension = self.dimensions.dimensions[dimensionName]
                fields = dimension.metadata.names | dimension.uniqueKeys.names
                for field in not_dimensions:
                    if field in fields:
                        guessedAssociation[dimensionName][field] = not_dimensions[field]
                        counter[dimensionName] += 1
                        assigned[field].add(dimensionName)
                        matched_dims.add(field)

            # Calculate the fields that matched nothing.
            never_found = set(not_dimensions) - matched_dims

            if never_found:
                raise DimensionValueError(f"Unrecognized keyword args given: {never_found}")

            # There is a chance we have allocated a single dataId item
            # to multiple dimensions. Need to decide which should be retained.
            # For now assume that the most popular alternative wins.
            # This means that day_obs with seq_num will result in
            # exposure.day_obs and not visit.day_obs
            # Also prefer an explicitly missing dimension over an inferred
            # temporal dimension.
            for fieldName, assignedDimensions in assigned.items():
                if len(assignedDimensions) > 1:
                    # Pick the most popular (preferring mandatory dimensions)
                    requiredButMissing = assignedDimensions.intersection(mandatoryDimensions)
                    if requiredButMissing:
                        candidateDimensions = requiredButMissing
                    else:
                        candidateDimensions = assignedDimensions

                        # If this is a choice between visit and exposure and
                        # neither was a required part of the dataset type,
                        # (hence in this branch) always prefer exposure over
                        # visit since exposures are always defined and visits
                        # are defined from exposures.
                        if candidateDimensions == {"exposure", "visit"}:
                            candidateDimensions = {"exposure"}

                    # Select the relevant items and get a new restricted
                    # counter.
                    theseCounts = {k: v for k, v in counter.items() if k in candidateDimensions}
                    duplicatesCounter: Counter[str] = Counter()
                    duplicatesCounter.update(theseCounts)

                    # Choose the most common. If they are equally common
                    # we will pick the one that was found first.
                    # Returns a list of tuples
                    selected = duplicatesCounter.most_common(1)[0][0]

                    _LOG.debug(
                        "Ambiguous dataId entry '%s' associated with multiple dimensions: %s."
                        " Removed ambiguity by choosing dimension %s.",
                        fieldName,
                        ", ".join(assignedDimensions),
                        selected,
                    )

                    # Drop this field from every dimension that lost.
                    for candidateDimension in assignedDimensions:
                        if candidateDimension != selected:
                            del guessedAssociation[candidateDimension][fieldName]

            # Update the record look up dict with the new associations
            for dimensionName, values in guessedAssociation.items():
                if values:  # A dict might now be empty
                    _LOG.debug(
                        "Assigned non-dimension dataId keys to dimension %s: %s", dimensionName, values
                    )
                    byRecord[dimensionName].update(values)

        if byRecord:
            # Some record specifiers were found so we need to convert
            # them to the Id form
            for dimensionName, values in byRecord.items():
                if dimensionName in newDataId:
                    _LOG.debug(
                        "DataId specified explicit %s dimension value of %s in addition to"
                        " general record specifiers for it of %s. Checking for self-consistency.",
                        dimensionName,
                        newDataId[dimensionName],
                        str(values),
                    )
                    # Get the actual record and compare with these values.
                    # Only query with relevant data ID values.
                    filtered_data_id = {
                        k: v for k, v in newDataId.items() if k in self.dimensions[dimensionName].required
                    }
                    try:
                        recs = self.query_dimension_records(
                            dimensionName,
                            data_id=filtered_data_id,
                        )
                    except (DataIdError, EmptyQueryResultError):
                        raise DimensionValueError(
                            f"Could not find dimension '{dimensionName}'"
                            f" with dataId {filtered_data_id} as part of comparing with"
                            f" record values {byRecord[dimensionName]}"
                        ) from None
                    if len(recs) == 1:
                        errmsg: list[str] = []
                        for k, v in values.items():
                            if (recval := getattr(recs[0], k)) != v:
                                errmsg.append(f"{k} ({recval} != {v})")
                        if errmsg:
                            raise DimensionValueError(
                                f"Dimension {dimensionName} in dataId has explicit value"
                                f" {newDataId[dimensionName]} inconsistent with"
                                f" {dimensionName} dimension record: " + ", ".join(errmsg)
                            )
                    else:
                        # Multiple matches for an explicit dimension
                        # should never happen but let downstream complain.
                        pass
                    # Explicit value was consistent; nothing to rewrite.
                    continue

                # Do not use data ID keys in query that aren't relevant.
                # Otherwise we can have detector queries being constrained
                # by an exposure ID that doesn't exist and return no matches
                # for a detector even though it's a good detector name.
                filtered_data_id = {
                    k: v
                    for k, v in newDataId.items()
                    if k in self.dimensions[dimensionName].minimal_group.names
                }

                def _get_attr(obj: Any, attr: str) -> Any:
                    # Used to implement x.exposure.seq_num when given
                    # x and "exposure.seq_num".
                    for component in attr.split("."):
                        obj = getattr(obj, component)
                    return obj

                with self.query() as q:
                    x = q.expression_factory
                    # Build up a WHERE expression.
                    predicates = tuple(_get_attr(x, f"{dimensionName}.{k}") == v for k, v in values.items())
                    extra_args: dict[str, Any] = {}  # For mypy.
                    extra_args.update(filtered_data_id)
                    extra_args.update(kwargs)
                    q = q.where(x.all(*predicates), **extra_args)
                    records = set(q.dimension_records(dimensionName))

                if len(records) != 1:
                    if len(records) > 1:
                        # visit can have an ambiguous answer without involving
                        # visit_system. The default visit_system is defined
                        # by the instrument.
                        if (
                            dimensionName == "visit"
                            and "visit_system_membership" in self.dimensions
                            and "visit_system" in self.dimensions["instrument"].metadata
                        ):
                            instrument_records = self.query_dimension_records(
                                "instrument",
                                data_id=newDataId,
                                explain=False,
                                **kwargs,
                            )
                            if len(instrument_records) == 1:
                                visit_system = instrument_records[0].visit_system
                                if visit_system is None:
                                    # Set to a value that will never match.
                                    visit_system = -1

                                # Look up each visit in the
                                # visit_system_membership records.
                                for rec in records:
                                    membership = self.query_dimension_records(
                                        # Use bind to allow zero results.
                                        # This is a fully-specified query.
                                        "visit_system_membership",
                                        instrument=instrument_records[0].name,
                                        visit_system=visit_system,
                                        visit=rec.id,
                                        explain=False,
                                    )
                                    if membership:
                                        # This record is the right answer.
                                        records = {rec}
                                        break

                        # The ambiguity may have been resolved so check again.
                        if len(records) > 1:
                            _LOG.debug(
                                "Received %d records from constraints of %s", len(records), str(values)
                            )
                            for r in records:
                                _LOG.debug("- %s", str(r))
                            raise DimensionValueError(
                                f"DataId specification for dimension {dimensionName} is not"
                                f" uniquely constrained to a single dataset by {values}."
                                f" Got {len(records)} results."
                            )
                    else:
                        raise DimensionValueError(
                            f"DataId specification for dimension {dimensionName} matched no"
                            f" records when constrained by {values}"
                        )

                # Get the primary key from the real dimension object
                dimension = self.dimensions.dimensions[dimensionName]
                if not isinstance(dimension, Dimension):
                    raise RuntimeError(
                        f"{dimension.name} is not a true dimension, and cannot be used in data IDs."
                    )
                newDataId[dimensionName] = getattr(records.pop(), dimension.primaryKey.name)

        return newDataId, kwargs

818 

    def _findDatasetRef(
        self,
        datasetRefOrType: DatasetRef | DatasetType | str,
        dataId: DataId | None = None,
        *,
        collections: Any = None,
        predict: bool = False,
        run: str | None = None,
        datastore_records: bool = False,
        timespan: Timespan | None = None,
        **kwargs: Any,
    ) -> DatasetRef:
        """Shared logic for methods that start with a search for a dataset in
        the registry.

        Parameters
        ----------
        datasetRefOrType : `DatasetRef`, `DatasetType`, or `str`
            When `DatasetRef` the `dataId` should be `None`.
            Otherwise the `DatasetType` or name thereof.
        dataId : `dict` or `DataCoordinate`, optional
            A `dict` of `Dimension` link name, value pairs that label the
            `DatasetRef` within a Collection. When `None`, a `DatasetRef`
            should be provided as the first argument.
        collections : Any, optional
            Collections to be searched, overriding ``self.collections``.
            Can be any of the types supported by the ``collections`` argument
            to butler construction.
        predict : `bool`, optional
            If `True`, return a newly created `DatasetRef` with a unique
            dataset ID if finding a reference in the `Registry` fails.
            Defaults to `False`.
        run : `str`, optional
            Run collection name to use for creating `DatasetRef` for predicted
            datasets. Only used if ``predict`` is `True`.
        datastore_records : `bool`, optional
            If `True` add datastore records to returned `DatasetRef`.
        timespan : `Timespan` or `None`, optional
            A timespan that the validity range of the dataset must overlap.
            If not provided and this is a calibration dataset type, an attempt
            will be made to find the timespan from any temporal coordinate
            in the data ID.
        **kwargs
            Additional keyword arguments used to augment or construct a
            `DataId`. See `DataId` parameters.

        Returns
        -------
        ref : `DatasetRef`
            A reference to the dataset identified by the given arguments.
            This can be the same dataset reference as given if it was
            resolved.

        Raises
        ------
        LookupError
            Raised if no matching dataset exists in the `Registry` (and
            ``predict`` is `False`).
        ValueError
            Raised if a resolved `DatasetRef` was passed as an input, but it
            differs from the one found in the registry.
        TypeError
            Raised if no collections were provided.
        """
        datasetType, dataId = self._standardizeArgs(datasetRefOrType, dataId, for_put=False, **kwargs)
        if isinstance(datasetRefOrType, DatasetRef):
            # A resolved DatasetRef short-circuits the registry search
            # entirely; collections are irrelevant in that case.
            if collections is not None:
                warnings.warn("Collections should not be specified with DatasetRef", stacklevel=3)
            if predict and not datasetRefOrType.dataId.hasRecords():
                # Callers that predict expect an expanded data ID on the ref.
                return datasetRefOrType.expanded(self.registry.expandDataId(datasetRefOrType.dataId))
            # May need to retrieve datastore records if requested.
            if datastore_records and datasetRefOrType._datastore_records is None:
                datasetRefOrType = self._registry.get_datastore_records(datasetRefOrType)
            return datasetRefOrType

        # Convert any non-primary dimension keys in the data ID first.
        dataId, kwargs = self._rewrite_data_id(dataId, datasetType, **kwargs)

        if datasetType.isCalibration():
            # Because this is a calibration dataset, first try to make a
            # standardize the data ID without restricting the dimensions to
            # those of the dataset type requested, because there may be extra
            # dimensions that provide temporal information for a validity-range
            # lookup.
            dataId = DataCoordinate.standardize(
                dataId, universe=self.dimensions, defaults=self._registry.defaults.dataId, **kwargs
            )
            if timespan is None:
                if dataId.dimensions.temporal:
                    dataId = self._registry.expandDataId(dataId)
                    # Use the timespan from the data ID to constrain the
                    # calibration lookup, but only if the caller has not
                    # specified an explicit timespan.
                    timespan = dataId.timespan
                else:
                    # Try an arbitrary timespan. Downstream will fail if this
                    # results in more than one matching dataset.
                    timespan = Timespan(None, None)
        else:
            # Standardize the data ID to just the dimensions of the dataset
            # type instead of letting registry.findDataset do it, so we get the
            # result even if no dataset is found.
            dataId = DataCoordinate.standardize(
                dataId,
                dimensions=datasetType.dimensions,
                defaults=self._registry.defaults.dataId,
                **kwargs,
            )
        # Always lookup the DatasetRef, even if one is given, to ensure it is
        # present in the current collection.
        ref = self.find_dataset(
            datasetType,
            dataId,
            collections=collections,
            timespan=timespan,
            datastore_records=datastore_records,
        )
        if ref is None:
            if predict:
                if run is None:
                    run = self.run
                    if run is None:
                        raise TypeError("Cannot predict dataset ID/location with run=None.")
                dataId = self.registry.expandDataId(dataId)
                return DatasetRef(datasetType, dataId, run=run)
            else:
                if collections is None:
                    collections = self._registry.defaults.collections
                raise DatasetNotFoundError(
                    f"Dataset {datasetType.name} with data ID {dataId} "
                    f"could not be found in collections {collections}."
                )
        if datasetType != ref.datasetType:
            # If they differ it is because the user explicitly specified
            # a compatible dataset type to this call rather than using the
            # registry definition. The DatasetRef must therefore be recreated
            # using the user definition such that the expected type is
            # returned.
            ref = DatasetRef(
                datasetType, ref.dataId, run=ref.run, id=ref.id, datastore_records=ref._datastore_records
            )

        return ref

961 

    @transactional
    def put(
        self,
        obj: Any,
        datasetRefOrType: DatasetRef | DatasetType | str,
        /,
        dataId: DataId | None = None,
        *,
        run: str | None = None,
        provenance: DatasetProvenance | None = None,
        **kwargs: Any,
    ) -> DatasetRef:
        """Store and register a dataset.

        Parameters
        ----------
        obj : `object`
            The dataset.
        datasetRefOrType : `DatasetRef`, `DatasetType`, or `str`
            When `DatasetRef` is provided, ``dataId`` should be `None`.
            Otherwise the `DatasetType` or name thereof. If a fully resolved
            `DatasetRef` is given the run and ID are used directly.
        dataId : `dict` or `DataCoordinate`
            A `dict` of `Dimension` link name, value pairs that label the
            `DatasetRef` within a Collection. When `None`, a `DatasetRef`
            should be provided as the second argument.
        run : `str`, optional
            The name of the run the dataset should be added to, overriding
            ``self.run``. Not used if a resolved `DatasetRef` is provided.
        provenance : `DatasetProvenance` or `None`, optional
            Any provenance that should be attached to the serialized dataset.
            Not supported by all serialization mechanisms.
        **kwargs
            Additional keyword arguments used to augment or construct a
            `DataCoordinate`. See `DataCoordinate.standardize`
            parameters. Not used if a resolve `DatasetRef` is provided.

        Returns
        -------
        ref : `DatasetRef`
            A reference to the stored dataset, updated with the correct id if
            given.

        Raises
        ------
        TypeError
            Raised if the butler is read-only or if no run has been provided.
        """
        if isinstance(datasetRefOrType, DatasetRef):
            # This is a direct put of predefined DatasetRef.
            _LOG.debug("Butler put direct: %s", datasetRefOrType)
            if run is not None:
                warnings.warn("Run collection is not used for DatasetRef", stacklevel=3)

            with self._metrics.instrument_put(_LOG, msg="Dataset put direct"):
                # If registry already has a dataset with the same dataset ID,
                # dataset type and DataId, then _importDatasets will do
                # nothing and just return an original ref. We have to raise in
                # this case, there is a datastore check below for that.
                self._registry._importDatasets([datasetRefOrType], expand=True)
                # Before trying to write to the datastore check that it does
                # not know this dataset. This is prone to races, of course.
                if self._datastore.knows(datasetRefOrType):
                    raise ConflictingDefinitionError(
                        f"Datastore already contains dataset: {datasetRefOrType}"
                    )
                # Try to write dataset to the datastore, if it fails due to a
                # race with another write, the content of stored data may be
                # unpredictable.
                try:
                    self._datastore.put(obj, datasetRefOrType, provenance=provenance)
                except IntegrityError as e:
                    raise ConflictingDefinitionError(f"Datastore already contains dataset: {e}") from e

            return datasetRefOrType

        _LOG.debug("Butler put: %s, dataId=%s, run=%s", datasetRefOrType, dataId, run)
        if not self.isWriteable():
            raise TypeError("Butler is read-only.")

        with self._metrics.instrument_put(_LOG, msg="Dataset put with dataID"):
            datasetType, dataId = self._standardizeArgs(datasetRefOrType, dataId, **kwargs)

            # Handle dimension records in dataId
            dataId, kwargs = self._rewrite_data_id(dataId, datasetType, **kwargs)

            # Add Registry Dataset entry.
            dataId = self._registry.expandDataId(dataId, dimensions=datasetType.dimensions, **kwargs)
            # insertDatasets mints the dataset ID; write to the datastore
            # only after the registry entry exists (both roll back together
            # under @transactional).
            (ref,) = self._registry.insertDatasets(datasetType, run=run, dataIds=[dataId])
            self._datastore.put(obj, ref, provenance=provenance)

        return ref

1054 

1055 def getDeferred( 

1056 self, 

1057 datasetRefOrType: DatasetRef | DatasetType | str, 

1058 /, 

1059 dataId: DataId | None = None, 

1060 *, 

1061 parameters: dict | None = None, 

1062 collections: Any = None, 

1063 storageClass: str | StorageClass | None = None, 

1064 timespan: Timespan | None = None, 

1065 **kwargs: Any, 

1066 ) -> DeferredDatasetHandle: 

1067 """Create a `DeferredDatasetHandle` which can later retrieve a dataset, 

1068 after an immediate registry lookup. 

1069 

1070 Parameters 

1071 ---------- 

1072 datasetRefOrType : `DatasetRef`, `DatasetType`, or `str` 

1073 When `DatasetRef` the `dataId` should be `None`. 

1074 Otherwise the `DatasetType` or name thereof. 

1075 dataId : `dict` or `DataCoordinate`, optional 

1076 A `dict` of `Dimension` link name, value pairs that label the 

1077 `DatasetRef` within a Collection. When `None`, a `DatasetRef` 

1078 should be provided as the first argument. 

1079 parameters : `dict` 

1080 Additional StorageClass-defined options to control reading, 

1081 typically used to efficiently read only a subset of the dataset. 

1082 collections : Any, optional 

1083 Collections to be searched, overriding ``self.collections``. 

1084 Can be any of the types supported by the ``collections`` argument 

1085 to butler construction. 

1086 storageClass : `StorageClass` or `str`, optional 

1087 The storage class to be used to override the Python type 

1088 returned by this method. By default the returned type matches 

1089 the dataset type definition for this dataset. Specifying a 

1090 read `StorageClass` can force a different type to be returned. 

1091 This type must be compatible with the original type. 

1092 timespan : `Timespan` or `None`, optional 

1093 A timespan that the validity range of the dataset must overlap. 

1094 If not provided and this is a calibration dataset type, an attempt 

1095 will be made to find the timespan from any temporal coordinate 

1096 in the data ID. 

1097 **kwargs 

1098 Additional keyword arguments used to augment or construct a 

1099 `DataId`. See `DataId` parameters. 

1100 

1101 Returns 

1102 ------- 

1103 obj : `DeferredDatasetHandle` 

1104 A handle which can be used to retrieve a dataset at a later time. 

1105 

1106 Raises 

1107 ------ 

1108 LookupError 

1109 Raised if no matching dataset exists in the `Registry` or 

1110 datastore. 

1111 ValueError 

1112 Raised if a resolved `DatasetRef` was passed as an input, but it 

1113 differs from the one found in the registry. 

1114 TypeError 

1115 Raised if no collections were provided. 

1116 """ 

1117 if isinstance(datasetRefOrType, DatasetRef): 

1118 # Do the quick check first and if that fails, check for artifact 

1119 # existence. This is necessary for datastores that are configured 

1120 # in trust mode where there won't be a record but there will be 

1121 # a file. 

1122 if self._datastore.knows(datasetRefOrType) or self._datastore.exists(datasetRefOrType): 

1123 ref = datasetRefOrType 

1124 else: 

1125 raise LookupError(f"Dataset reference {datasetRefOrType} does not exist.") 

1126 else: 

1127 ref = self._findDatasetRef( 

1128 datasetRefOrType, dataId, collections=collections, timespan=timespan, **kwargs 

1129 ) 

1130 return DeferredDatasetHandle(butler=self, ref=ref, parameters=parameters, storageClass=storageClass) 

1131 

1132 def get( 

1133 self, 

1134 datasetRefOrType: DatasetRef | DatasetType | str, 

1135 /, 

1136 dataId: DataId | None = None, 

1137 *, 

1138 parameters: dict[str, Any] | None = None, 

1139 collections: Any = None, 

1140 storageClass: StorageClass | str | None = None, 

1141 timespan: Timespan | None = None, 

1142 **kwargs: Any, 

1143 ) -> Any: 

1144 """Retrieve a stored dataset. 

1145 

1146 Parameters 

1147 ---------- 

1148 datasetRefOrType : `DatasetRef`, `DatasetType`, or `str` 

1149 When `DatasetRef` the `dataId` should be `None`. 

1150 Otherwise the `DatasetType` or name thereof. 

1151 If a resolved `DatasetRef`, the associated dataset 

1152 is returned directly without additional querying. 

1153 dataId : `dict` or `DataCoordinate` 

1154 A `dict` of `Dimension` link name, value pairs that label the 

1155 `DatasetRef` within a Collection. When `None`, a `DatasetRef` 

1156 should be provided as the first argument. 

1157 parameters : `dict` 

1158 Additional StorageClass-defined options to control reading, 

1159 typically used to efficiently read only a subset of the dataset. 

1160 collections : Any, optional 

1161 Collections to be searched, overriding ``self.collections``. 

1162 Can be any of the types supported by the ``collections`` argument 

1163 to butler construction. 

1164 storageClass : `StorageClass` or `str`, optional 

1165 The storage class to be used to override the Python type 

1166 returned by this method. By default the returned type matches 

1167 the dataset type definition for this dataset. Specifying a 

1168 read `StorageClass` can force a different type to be returned. 

1169 This type must be compatible with the original type. 

1170 timespan : `Timespan` or `None`, optional 

1171 A timespan that the validity range of the dataset must overlap. 

1172 If not provided and this is a calibration dataset type, an attempt 

1173 will be made to find the timespan from any temporal coordinate 

1174 in the data ID. 

1175 **kwargs 

1176 Additional keyword arguments used to augment or construct a 

1177 `DataCoordinate`. See `DataCoordinate.standardize` 

1178 parameters. 

1179 

1180 Returns 

1181 ------- 

1182 obj : `object` 

1183 The dataset. 

1184 

1185 Raises 

1186 ------ 

1187 LookupError 

1188 Raised if no matching dataset exists in the `Registry`. 

1189 TypeError 

1190 Raised if no collections were provided. 

1191 

1192 Notes 

1193 ----- 

1194 When looking up datasets in a `~CollectionType.CALIBRATION` collection, 

1195 this method requires that the given data ID include temporal dimensions 

1196 beyond the dimensions of the dataset type itself, in order to find the 

1197 dataset with the appropriate validity range. For example, a "bias" 

1198 dataset with native dimensions ``{instrument, detector}`` could be 

1199 fetched with a ``{instrument, detector, exposure}`` data ID, because 

1200 ``exposure`` is a temporal dimension. 

1201 """ 

1202 _LOG.debug("Butler get: %s, dataId=%s, parameters=%s", datasetRefOrType, dataId, parameters) 

1203 with self._metrics.instrument_get(_LOG, msg="Retrieved dataset"): 

1204 ref = self._findDatasetRef( 

1205 datasetRefOrType, 

1206 dataId, 

1207 collections=collections, 

1208 datastore_records=True, 

1209 timespan=timespan, 

1210 **kwargs, 

1211 ) 

1212 return self._datastore.get(ref, parameters=parameters, storageClass=storageClass) 

1213 

1214 def getURIs( 

1215 self, 

1216 datasetRefOrType: DatasetRef | DatasetType | str, 

1217 /, 

1218 dataId: DataId | None = None, 

1219 *, 

1220 predict: bool = False, 

1221 collections: Any = None, 

1222 run: str | None = None, 

1223 **kwargs: Any, 

1224 ) -> DatasetRefURIs: 

1225 """Return the URIs associated with the dataset. 

1226 

1227 Parameters 

1228 ---------- 

1229 datasetRefOrType : `DatasetRef`, `DatasetType`, or `str` 

1230 When `DatasetRef` the `dataId` should be `None`. 

1231 Otherwise the `DatasetType` or name thereof. 

1232 dataId : `dict` or `DataCoordinate` 

1233 A `dict` of `Dimension` link name, value pairs that label the 

1234 `DatasetRef` within a Collection. When `None`, a `DatasetRef` 

1235 should be provided as the first argument. 

1236 predict : `bool` 

1237 If `True`, allow URIs to be returned of datasets that have not 

1238 been written. 

1239 collections : Any, optional 

1240 Collections to be searched, overriding ``self.collections``. 

1241 Can be any of the types supported by the ``collections`` argument 

1242 to butler construction. 

1243 run : `str`, optional 

1244 Run to use for predictions, overriding ``self.run``. 

1245 **kwargs 

1246 Additional keyword arguments used to augment or construct a 

1247 `DataCoordinate`. See `DataCoordinate.standardize` 

1248 parameters. 

1249 

1250 Returns 

1251 ------- 

1252 uris : `DatasetRefURIs` 

1253 The URI to the primary artifact associated with this dataset (if 

1254 the dataset was disassembled within the datastore this may be 

1255 `None`), and the URIs to any components associated with the dataset 

1256 artifact. (can be empty if there are no components). 

1257 """ 

1258 ref = self._findDatasetRef( 

1259 datasetRefOrType, dataId, predict=predict, run=run, collections=collections, **kwargs 

1260 ) 

1261 return self._datastore.getURIs(ref, predict) 

1262 

    def get_dataset_type(self, name: str) -> DatasetType:
        # Look up the named dataset type definition in the registry.
        return self._registry.getDatasetType(name)

1265 

1266 def get_dataset( 

1267 self, 

1268 id: DatasetId | str, 

1269 *, 

1270 storage_class: str | StorageClass | None = None, 

1271 dimension_records: bool = False, 

1272 datastore_records: bool = False, 

1273 ) -> DatasetRef | None: 

1274 id = _to_uuid(id) 

1275 ref = self._registry.getDataset(id) 

1276 if ref is not None: 

1277 if dimension_records: 

1278 ref = ref.expanded( 

1279 self._registry.expandDataId(ref.dataId, dimensions=ref.datasetType.dimensions) 

1280 ) 

1281 if storage_class: 

1282 ref = ref.overrideStorageClass(storage_class) 

1283 if datastore_records: 

1284 ref = self._registry.get_datastore_records(ref) 

1285 return ref 

1286 

1287 def get_many_datasets(self, ids: Iterable[DatasetId | str]) -> list[DatasetRef]: 

1288 uuids = [_to_uuid(id) for id in ids] 

1289 return self._registry._managers.datasets.get_dataset_refs(uuids) 

1290 

1291 def find_dataset( 

1292 self, 

1293 dataset_type: DatasetType | str, 

1294 data_id: DataId | None = None, 

1295 *, 

1296 collections: str | Sequence[str] | None = None, 

1297 timespan: Timespan | None = None, 

1298 storage_class: str | StorageClass | None = None, 

1299 dimension_records: bool = False, 

1300 datastore_records: bool = False, 

1301 **kwargs: Any, 

1302 ) -> DatasetRef | None: 

1303 # Handle any parts of the dataID that are not using primary dimension 

1304 # keys. 

1305 if isinstance(dataset_type, str): 

1306 actual_type = self.get_dataset_type(dataset_type) 

1307 else: 

1308 actual_type = dataset_type 

1309 

1310 # Store the component for later. 

1311 component_name = actual_type.component() 

1312 if actual_type.isComponent(): 

1313 parent_type = actual_type.makeCompositeDatasetType() 

1314 else: 

1315 parent_type = actual_type 

1316 

1317 data_id, kwargs = self._rewrite_data_id(data_id, parent_type, **kwargs) 

1318 

1319 ref = self.registry.findDataset( 

1320 parent_type, 

1321 data_id, 

1322 collections=collections, 

1323 timespan=timespan, 

1324 datastore_records=datastore_records, 

1325 **kwargs, 

1326 ) 

1327 if ref is not None and dimension_records: 

1328 ref = ref.expanded(self._registry.expandDataId(ref.dataId, dimensions=ref.datasetType.dimensions)) 

1329 if ref is not None and component_name: 

1330 ref = ref.makeComponentRef(component_name) 

1331 if ref is not None and storage_class is not None: 

1332 ref = ref.overrideStorageClass(storage_class) 

1333 

1334 return ref 

1335 

    def retrieve_artifacts_zip(
        self,
        refs: Iterable[DatasetRef],
        destination: ResourcePathExpression,
        overwrite: bool = True,
    ) -> ResourcePath:
        # Delegate artifact retrieval to the datastore and let the helper
        # bundle the retrieved files into a single Zip at the destination.
        return retrieve_and_zip(refs, destination, self._datastore.retrieveArtifacts, overwrite)

1343 

1344 def retrieveArtifacts( 

1345 self, 

1346 refs: Iterable[DatasetRef], 

1347 destination: ResourcePathExpression, 

1348 transfer: str = "auto", 

1349 preserve_path: bool = True, 

1350 overwrite: bool = False, 

1351 ) -> list[ResourcePath]: 

1352 # Docstring inherited. 

1353 outdir = ResourcePath(destination) 

1354 artifact_map = self._datastore.retrieveArtifacts( 

1355 refs, 

1356 outdir, 

1357 transfer=transfer, 

1358 preserve_path=preserve_path, 

1359 overwrite=overwrite, 

1360 write_index=True, 

1361 ) 

1362 return list(artifact_map) 

1363 

    def exists(
        self,
        dataset_ref_or_type: DatasetRef | DatasetType | str,
        /,
        data_id: DataId | None = None,
        *,
        full_check: bool = True,
        collections: Any = None,
        **kwargs: Any,
    ) -> DatasetExistence:
        # Docstring inherited.
        # Existence is accumulated as bit flags: RECORDED (known to
        # registry), DATASTORE (known to datastore), _ARTIFACT (artifact
        # verified to exist) or _ASSUMED (verification skipped because
        # full_check is disabled).
        existence = DatasetExistence.UNRECOGNIZED

        if isinstance(dataset_ref_or_type, DatasetRef):
            # A resolved ref: look it up by dataset ID directly instead of
            # searching collections.
            if collections is not None:
                warnings.warn("Collections should not be specified with DatasetRef", stacklevel=2)
            if data_id is not None:
                warnings.warn("A DataID should not be specified with DatasetRef", stacklevel=2)
            ref = dataset_ref_or_type
            registry_ref = self._registry.getDataset(dataset_ref_or_type.id)
            if registry_ref is not None:
                existence |= DatasetExistence.RECORDED

                if dataset_ref_or_type != registry_ref:
                    # This could mean that storage classes differ, so we should
                    # check for that but use the registry ref for the rest of
                    # the method.
                    if registry_ref.is_compatible_with(dataset_ref_or_type):
                        # Use the registry version from now on.
                        ref = registry_ref
                    else:
                        raise ValueError(
                            f"The ref given to exists() ({ref}) has the same dataset ID as one "
                            f"in registry but has different incompatible values ({registry_ref})."
                        )
        else:
            try:
                ref = self._findDatasetRef(dataset_ref_or_type, data_id, collections=collections, **kwargs)
            except (LookupError, TypeError):
                # Not found (or no collections): report what we know so far,
                # which is nothing.
                return existence
            existence |= DatasetExistence.RECORDED

        if self._datastore.knows(ref):
            existence |= DatasetExistence.DATASTORE

        if full_check:
            # Actually check the artifact (e.g. file) exists.
            if self._datastore.exists(ref):
                existence |= DatasetExistence._ARTIFACT
        elif existence.value != DatasetExistence.UNRECOGNIZED.value:
            # Do not add this flag if we have no other idea about a dataset.
            existence |= DatasetExistence(DatasetExistence._ASSUMED)

        return existence

1417 

1418 def _exists_many( 

1419 self, 

1420 refs: Iterable[DatasetRef], 

1421 /, 

1422 *, 

1423 full_check: bool = True, 

1424 ) -> dict[DatasetRef, DatasetExistence]: 

1425 # Docstring inherited. 

1426 existence = {ref: DatasetExistence.UNRECOGNIZED for ref in refs} 

1427 

1428 # Check which refs exist in the registry. 

1429 id_map = {ref.id: ref for ref in existence.keys()} 

1430 for registry_ref in self.get_many_datasets(id_map.keys()): 

1431 # Consistency between the given DatasetRef and the information 

1432 # recorded in the registry is not verified. 

1433 existence[id_map[registry_ref.id]] |= DatasetExistence.RECORDED 

1434 

1435 # Ask datastore if it knows about these refs. 

1436 knows = self._datastore.knows_these(refs) 

1437 for ref, known in knows.items(): 

1438 if known: 

1439 existence[ref] |= DatasetExistence.DATASTORE 

1440 

1441 if full_check: 

1442 mexists = self._datastore.mexists(refs) 

1443 for ref, exists in mexists.items(): 

1444 if exists: 

1445 existence[ref] |= DatasetExistence._ARTIFACT 

1446 else: 

1447 # Do not set this flag if nothing is known about the dataset. 

1448 for ref in existence: 

1449 if existence[ref] != DatasetExistence.UNRECOGNIZED: 

1450 existence[ref] |= DatasetExistence._ASSUMED 

1451 

1452 return existence 

1453 

    def removeRuns(
        self,
        names: Iterable[str],
        unstore: bool | type[_DeprecatedDefault] = _DeprecatedDefault,
        *,
        unlink_from_chains: bool = False,
    ) -> None:
        # Docstring inherited.
        if not self.isWriteable():
            raise TypeError("Butler is read-only.")

        # ``unstore`` is deprecated: its default is a sentinel type so we can
        # tell whether the caller passed an explicit value.
        if unstore is not _DeprecatedDefault:
            # The value was passed in by a user. Must report it is now
            # ignored.
            if unstore is True:
                msg = "The unstore parameter is deprecated and is now always treated as True. "
            else:
                msg = "The unstore parameter for removeRuns can no longer be False and is now ignored. "
            warnings.warn(
                msg + " The parameter will be removed after v30.",
                category=FutureWarning,
                stacklevel=find_outside_stacklevel("lsst.daf.butler"),
            )

        names = list(names)
        refs: list[DatasetRef] = []
        # Map of the chained collections to the RUN children.
        parents_to_children: dict[str, set[str]] = defaultdict(set)

        with self._caching_context():
            # Get information about these RUNs.
            collections_info = self.collections.query_info(names, include_parents=unlink_from_chains)
            for info in collections_info:
                # Only RUN collections may be removed by this method.
                if info.type is not CollectionType.RUN:
                    raise TypeError(f"The collection type of '{info.name}' is {info.type.name}, not RUN.")
                if unlink_from_chains:
                    if info.parents is None:  # For mypy.
                        raise AssertionError("Internal error: Collection parents required but not received")
                    for parent in info.parents:
                        parents_to_children[parent].add(info.name)

            # Update the names in case the query unexpectedly had a wildcard.
            names = [info.name for info in collections_info]

            # Get all the datasets from these runs.
            refs = self.query_all_datasets(names, find_first=False, limit=None)

            # Call pruneDatasets since we are deliberately removing
            # datasets in chunks from the RUN collections rather than
            # attempting to remove everything at once.
            with time_this(
                _LOG,
                msg="Removing %d dataset%s from %s",
                args=(len(refs), "s" if len(refs) != 1 else "", ", ".join(names)),
            ):
                self.pruneDatasets(refs, unstore=True, purge=True, disassociate=True)

        # Now can remove the actual RUN collection and unlink from chains.
        with self._registry.transaction():
            # This will fail if caller is not unlinking from chains but the
            # RUN is in a chain -- but we have already deleted all the datasets
            # by this point.
            if unlink_from_chains:
                # Use deterministic order for deletions to attempt to minimize
                # risk of deadlocks for parallel deletes.
                for parent in sorted(parents_to_children):
                    self.collections.remove_from_chain(parent, sorted(parents_to_children[parent]))
            # Sort to avoid potential deadlocks.
            for name in sorted(names):
                # This should be fast since the collection should be empty.
                with time_this(_LOG, msg="Removing RUN collection %s", args=(name,)):
                    self._registry.removeCollection(name)
        _LOG.info("Completely removed the following RUN collections: %s", ", ".join(names))

1527 

    def pruneDatasets(
        self,
        refs: Iterable[DatasetRef],
        *,
        disassociate: bool = True,
        unstore: bool = False,
        tags: Iterable[str] = (),
        purge: bool = False,
    ) -> None:
        # docstring inherited from LimitedButler

        if not self.isWriteable():
            raise TypeError("Butler is read-only.")
        # Validate the flag combination up front so nothing is deleted
        # before an inconsistent request is rejected.
        if purge:
            if not disassociate:
                raise TypeError("Cannot pass purge=True without disassociate=True.")
            if not unstore:
                raise TypeError("Cannot pass purge=True without unstore=True.")
        elif disassociate:
            tags = tuple(tags)
            if not tags:
                raise TypeError("No tags provided but disassociate=True.")
            for tag in tags:
                collectionType = self._registry.getCollectionType(tag)
                if collectionType is not CollectionType.TAGGED:
                    raise TypeError(
                        f"Cannot disassociate from collection '{tag}' "
                        f"of non-TAGGED type {collectionType.name}."
                    )
        # Transform possibly-single-pass iterable into something we can iterate
        # over multiple times.
        refs = list(refs)
        # Pruning a component of a DatasetRef makes no sense since registry
        # doesn't know about components and datastore might not store
        # components in a separate file
        for ref in refs:
            if ref.datasetType.component():
                raise ValueError(f"Can not prune a component of a dataset (ref={ref})")

        # Chunk the deletions using a size that is reasonably efficient whilst
        # also giving reasonable feedback to the user. Chunking also minimizes
        # what needs to rollback if there is a failure and should allow
        # incremental re-running of the pruning (assuming the query is
        # repeated). The only issue will be if the Ctrl-C comes during
        # emptyTrash since an admin command would need to run to finish the
        # emptying of that chunk.
        progress = Progress("lsst.daf.butler.Butler.pruneDatasets", level=_LOG.INFO)
        chunk_size = 50_000
        n_chunks = math.ceil(len(refs) / chunk_size)
        if n_chunks > 1:
            _LOG.verbose("Pruning a total of %d datasets", len(refs))
        chunk_num = 0
        for chunked_refs in progress.wrap(
            chunk_iterable(refs, chunk_size=chunk_size), desc="Deleting datasets", total=n_chunks
        ):
            chunk_num += 1
            _LOG.verbose(
                "Pruning %d dataset%s in chunk %d/%d",
                len(chunked_refs),
                "s" if len(chunked_refs) != 1 else "",
                chunk_num,
                n_chunks,
            )
            with time_this(
                _LOG,
                msg="Removing %d datasets for chunk %d/%d",
                args=(len(chunked_refs), chunk_num, n_chunks),
            ):
                # Each chunk is removed in its own transaction by the helper.
                self._prune_datasets(
                    chunked_refs, tags=tags, unstore=unstore, purge=purge, disassociate=disassociate
                )

1599 

1600 def _prune_datasets( 

1601 self, 

1602 refs: Collection[DatasetRef], 

1603 *, 

1604 disassociate: bool = True, 

1605 unstore: bool = False, 

1606 tags: Iterable[str] = (), 

1607 purge: bool = False, 

1608 ) -> None: 

1609 # We don't need an unreliable Datastore transaction for this, because 

1610 # we've been extra careful to ensure that Datastore.trash only involves 

1611 # mutating the Registry (it can _look_ at Datastore-specific things, 

1612 # but shouldn't change them), and hence all operations here are 

1613 # Registry operations. 

1614 with self.transaction(): 

1615 plural = "s" if len(refs) != 1 else "" 

1616 if unstore: 

1617 with time_this( 

1618 _LOG, 

1619 msg="Marking %d dataset%s for removal during pruneDatasets", 

1620 args=(len(refs), plural), 

1621 ): 

1622 self._datastore.trash(refs) 

1623 if purge: 

1624 with time_this( 

1625 _LOG, msg="Removing %d pruned dataset%s from registry", args=(len(refs), plural) 

1626 ): 

1627 self._registry.removeDatasets(refs) 

1628 elif disassociate: 

1629 assert tags, "Guaranteed by earlier logic in this function." 

1630 with time_this( 

1631 _LOG, msg="Disassociating %d dataset%ss from tagged collections", args=(len(refs), plural) 

1632 ): 

1633 for tag in tags: 

1634 self._registry.disassociate(tag, refs) 

1635 # We've exited the Registry transaction, and apparently committed. 

1636 # (if there was an exception, everything rolled back, and it's as if 

1637 # nothing happened - and we never get here). 

1638 # Datastore artifacts are not yet gone, but they're clearly marked 

1639 # as trash, so if we fail to delete now because of (e.g.) filesystem 

1640 # problems we can try again later, and if manual administrative 

1641 # intervention is required, it's pretty clear what that should entail: 

1642 # deleting everything on disk and in private Datastore tables that is 

1643 # in the dataset_location_trash table. 

1644 if unstore: 

1645 # Point of no return for removing artifacts. Restrict the trash 

1646 # emptying to the refs that this call trashed. 

1647 with time_this( 

1648 _LOG, 

1649 msg="Attempting to remove artifacts for %d dataset%s associated with pruning", 

1650 args=(len(refs), plural), 

1651 ): 

1652 self._datastore.emptyTrash(refs=refs) 

1653 

    def ingest_zip(
        self,
        zip_file: ResourcePathExpression,
        transfer: str = "auto",
        *,
        transfer_dimensions: bool = False,
        dry_run: bool = False,
        skip_existing: bool = False,
    ) -> None:
        # Docstring inherited.
        if not self.isWriteable():
            raise TypeError("Butler is read-only.")

        # Read the dataset/artifact index embedded in the zip without
        # extracting the payload files.
        zip_path = ResourcePath(zip_file)
        index = ZipIndex.from_zip_file(zip_path)
        _LOG.verbose(
            "Ingesting %s containing %d datasets and %d files.", zip_path, len(index.refs), len(index)
        )

        # Need to ingest the refs into registry. Re-use the standard ingest
        # code by reconstructing FileDataset from the index.
        refs = index.refs.to_refs(universe=self.dimensions)
        id_to_ref = {ref.id: ref for ref in refs}
        datasets: list[FileDataset] = []
        processed_ids: set[uuid.UUID] = set()
        for path_in_zip, index_info in index.artifact_map.items():
            # Disassembled composites need to check this ref isn't already
            # included.
            unprocessed = {id_ for id_ in index_info.ids if id_ not in processed_ids}
            if not unprocessed:
                continue
            dataset = FileDataset(refs=[id_to_ref[id_] for id_ in unprocessed], path=path_in_zip)
            datasets.append(dataset)
            processed_ids.update(unprocessed)

        # Three outcomes: everything new (full ingest below), everything
        # already known (skip or raise), or a mix (always an error).
        new_datasets, existing_datasets = self._partition_datasets_by_known(datasets)
        if existing_datasets:
            if skip_existing:
                _LOG.info(
                    "Skipping %d datasets from zip file %s which already exist in the repository.",
                    len(existing_datasets),
                    zip_file,
                )
            else:
                raise ConflictingDefinitionError(
                    f"Datastore already contains {len(existing_datasets)} of the given datasets."
                    f" Example: {existing_datasets[0]}"
                )
            if new_datasets:
                # Can not yet support partial zip ingests where a zip contains
                # some datasets that are already in another zip.
                raise ValueError(
                    f"The given zip file from {zip_file} contains {len(new_datasets)} datasets not known "
                    f"to this butler but also contains {len(existing_datasets)} datasets already known to "
                    "this butler. Currently butler can not ingest zip files with overlapping content."
                )
            # All datasets already present and skip_existing was requested:
            # nothing left to do.
            return

        # Ingest doesn't create the RUN collections so we have to do that
        # here.
        #
        # Sort by run collection name to ensure Postgres takes locks in the
        # same order between different processes, to mitigate an issue
        # where Postgres can deadlock due to the unique index on collection
        # name. (See DM-47543).
        runs = {ref.run for ref in refs}
        for run in sorted(runs):
            registered = self.collections.register(run)
            if registered:
                _LOG.verbose("Created RUN collection %s as part of zip ingest", run)

        progress = Progress("lsst.daf.butler.Butler.ingest", level=VERBOSE)
        import_info = self._prepare_ingest_file_datasets(
            datasets, progress, dry_run=dry_run, transfer_dimensions=transfer_dimensions
        )

        # Calculate some statistics based on the given list of datasets.
        n_datasets = 0
        for d in datasets:
            n_datasets += len(d.refs)
        srefs = "s" if n_datasets != 1 else ""

        with (
            self._metrics.instrument_ingest(
                n_datasets, _LOG, msg=f"Ingesting zip file {zip_file} with {n_datasets} dataset{srefs}"
            ),
            self.transaction(),
        ):
            # Do not need expanded dataset refs so can ignore the return value.
            self._ingest_file_datasets(datasets, import_info, progress, dry_run=dry_run)

            try:
                self._datastore.ingest_zip(zip_path, transfer=transfer, dry_run=dry_run)
            except IntegrityError as e:
                # A concurrent writer may have ingested the same artifacts;
                # surface it as the standard butler conflict error.
                raise ConflictingDefinitionError(
                    f"Datastore already contains one or more datasets: {e}"
                ) from e

1751 

1752 def _prepare_ingest_file_datasets( 

1753 self, 

1754 datasets: Sequence[FileDataset], 

1755 progress: Progress, 

1756 *, 

1757 transfer_dimensions: bool = False, 

1758 dry_run: bool = False, 

1759 ) -> _ImportDatasetsInfo: 

1760 # Track DataIDs that are being ingested so we can spot issues early 

1761 # with duplication. Retain previous FileDataset so we can report it. 

1762 groupedDataIds: MutableMapping[tuple[DatasetType, str], dict[DataCoordinate, FileDataset]] = ( 

1763 defaultdict(dict) 

1764 ) 

1765 

1766 # All the refs we need to import. 

1767 refs: list[DatasetRef] = [] 

1768 

1769 for dataset in progress.wrap(datasets, desc="Validating dataIDs"): 

1770 for ref in dataset.refs: 

1771 group_key = (ref.datasetType, ref.run) 

1772 

1773 if ref.dataId in groupedDataIds[group_key]: 

1774 raise ConflictingDefinitionError( 

1775 f"Ingest conflict. Dataset {dataset.path} has same" 

1776 " DataId as other ingest dataset" 

1777 f" {groupedDataIds[group_key][ref.dataId].path} " 

1778 f" ({ref.dataId})" 

1779 ) 

1780 

1781 groupedDataIds[group_key][ref.dataId] = dataset 

1782 refs.extend(dataset.refs) 

1783 

1784 # Ensure that dataset types are created and all ref information 

1785 # extracted. 

1786 import_info = self._prepare_for_import_refs( 

1787 self, 

1788 refs, 

1789 register_dataset_types=True, 

1790 dry_run=dry_run, 

1791 transfer_dimensions=transfer_dimensions, 

1792 ) 

1793 return import_info 

1794 

1795 def _ingest_file_datasets( 

1796 self, 

1797 datasets: Sequence[FileDataset], 

1798 import_info: _ImportDatasetsInfo, 

1799 progress: Progress, 

1800 *, 

1801 dry_run: bool = False, 

1802 ) -> None: 

1803 self._import_dimension_records(import_info.dimension_records, dry_run=dry_run) 

1804 imported_refs = self._import_grouped_refs( 

1805 import_info.grouped_refs, None, progress, dry_run=dry_run, expand_refs=True 

1806 ) 

1807 

1808 # The expanded refs need to be attached back to the original 

1809 # FileDatasets for datastore to use. 

1810 id_to_ref = {ref.id: ref for ref in imported_refs} 

1811 

1812 for dataset in progress.wrap(datasets, desc="Re-attaching expanded refs"): 

1813 dataset.refs = [id_to_ref[ref.id] for ref in dataset.refs] 

1814 

    def ingest(
        self,
        *datasets: FileDataset,
        transfer: str | None = "auto",
        record_validation_info: bool = True,
        skip_existing: bool = False,
    ) -> None:
        # Docstring inherited.
        if not datasets:
            # Nothing to ingest; avoid touching registry or datastore.
            return
        if not self.isWriteable():
            raise TypeError("Butler is read-only.")
        _LOG.verbose("Ingesting %d file dataset%s.", len(datasets), "" if len(datasets) == 1 else "s")
        progress = Progress("lsst.daf.butler.Butler.ingest", level=VERBOSE)

        # Split the batch into datasets the datastore does not yet know and
        # ones it does; the latter are either skipped or an error.
        new_datasets, existing_datasets = self._partition_datasets_by_known(datasets)
        if existing_datasets:
            if skip_existing:
                _LOG.info(
                    "Skipping %d datasets which already exist in the repository.", len(existing_datasets)
                )
            else:
                raise ConflictingDefinitionError(
                    f"Datastore already contains {len(existing_datasets)} of the given datasets."
                    f" Example: {existing_datasets[0]}"
                )

        # Calculate some statistics based on the given list of datasets.
        n_files = len(datasets)
        n_datasets = 0
        for d in datasets:
            n_datasets += len(d.refs)
        sfiles = "s" if n_files != 1 else ""
        srefs = "s" if n_datasets != 1 else ""

        # We use `datasets` rather than `new_datasets` for the Registry
        # portion of this, to let it confirm that everything matches the
        # existing datasets.
        import_info = self._prepare_ingest_file_datasets(datasets, progress)

        with (
            self._metrics.instrument_ingest(
                n_datasets, _LOG, msg=f"Ingesting {n_files} file{sfiles} with {n_datasets} dataset{srefs}"
            ),
            self.transaction(),
        ):
            self._ingest_file_datasets(datasets, import_info, progress)

            # Bulk-insert everything into Datastore.
            # We do not know if any of the registry entries already existed
            # (_importDatasets only complains if they exist but differ).
            # The _partition_datasets_by_known logic above should catch most
            # instances where we attempt to re-ingest files that were already
            # ingested, but a concurrent writer could cause a unique constraint
            # violation here.
            try:
                self._datastore.ingest(
                    *new_datasets, transfer=transfer, record_validation_info=record_validation_info
                )
            except IntegrityError as e:
                raise ConflictingDefinitionError(
                    f"Datastore already contains one or more datasets: {e}"
                ) from e

1878 

1879 def _partition_datasets_by_known( 

1880 self, datasets: Iterable[FileDataset] 

1881 ) -> tuple[list[FileDataset], list[FileDataset]]: 

1882 """Divides the given `FileDataset` objects into two groups: those for 

1883 which the Datastore already has an entry, and those for which it does 

1884 not. 

1885 """ 

1886 new_datasets = [] 

1887 existing_datasets = [] 

1888 

1889 refs = itertools.chain.from_iterable(dataset.refs for dataset in datasets) 

1890 known_refs = self._datastore.knows_these(refs) 

1891 

1892 for dataset in datasets: 

1893 if any(known_refs[ref] for ref in dataset.refs): 

1894 existing_datasets.append(dataset) 

1895 else: 

1896 new_datasets.append(dataset) 

1897 

1898 return new_datasets, existing_datasets 

1899 

    @contextlib.contextmanager
    def export(
        self,
        *,
        directory: str | None = None,
        filename: str | None = None,
        format: str | None = None,
        transfer: str | None = None,
    ) -> Iterator[RepoExportContext]:
        # Docstring inherited.
        if directory is None and transfer is not None:
            raise TypeError("Cannot transfer without providing a directory.")
        if transfer == "move":
            raise TypeError("Transfer may not be 'move': export is read-only")
        if format is None:
            if filename is None:
                raise TypeError("At least one of 'filename' or 'format' must be provided.")
            else:
                # Infer the export format from the filename extension.
                _, format = os.path.splitext(filename)
                if not format:
                    raise ValueError("Please specify a file extension to determine export format.")
                format = format[1:]  # Strip leading "."
        elif filename is None:
            # Derive a default filename from the requested format.
            filename = f"export.{format}"
        if directory is not None:
            filename = os.path.join(directory, filename)
        formats = self._config["repo_transfer_formats"]
        if format not in formats:
            raise ValueError(f"Unknown export format {format!r}, allowed: {','.join(formats.keys())}")
        BackendClass = get_class_of(formats[format, "export"])
        with open(filename, "w") as stream:
            backend = BackendClass(stream, universe=self.dimensions)
            try:
                helper = RepoExportContext(self, backend=backend, directory=directory, transfer=transfer)
                with self._caching_context():
                    yield helper
            except BaseException:
                # Re-raise unchanged; the except clause exists only so the
                # try/else form is legal and _finish() runs solely on a
                # clean exit from the caller's `with` block.
                raise
            else:
                helper._finish()

1940 

    def import_(
        self,
        *,
        directory: ResourcePathExpression | None = None,
        filename: ResourcePathExpression | TextIO | None = None,
        format: str | None = None,
        transfer: str | None = None,
        skip_dimensions: set | None = None,
        record_validation_info: bool = True,
        without_datastore: bool = False,
    ) -> None:
        # Docstring inherited.
        if not self.isWriteable():
            raise TypeError("Butler is read-only.")
        if filename is None and format is not None:
            # No filename given: synthesize a relative default from format.
            filename = ResourcePath(f"export.{format}", forceAbsolute=False)
        if directory is not None:
            directory = ResourcePath(directory, forceDirectory=True)
        # mypy doesn't think this will work but it does in python >= 3.10.
        if isinstance(filename, ResourcePathExpression):  # type: ignore
            filename = ResourcePath(filename, forceAbsolute=False)  # type: ignore
            if format is None:
                format = filename.getExtension()
            if not filename.isabs() and directory is not None:
                # A relative filename could resolve against the cwd or the
                # given directory; check both and disambiguate.
                potential = directory.join(filename)
                exists_in_cwd = filename.exists()
                exists_in_dir = potential.exists()
                if exists_in_cwd and exists_in_dir:
                    _LOG.warning(
                        "A relative path for filename was specified (%s) which exists relative to cwd. "
                        "Additionally, the file exists relative to the given search directory (%s). "
                        "Using the export file in the given directory.",
                        filename,
                        potential,
                    )
                    # Given they specified an explicit directory and that
                    # directory has the export file in it, assume that that
                    # is what was meant despite the file in cwd.
                    filename = potential
                elif exists_in_dir:
                    filename = potential
                elif not exists_in_cwd and not exists_in_dir:
                    # Raise early.
                    raise FileNotFoundError(
                        f"Export file could not be found in {filename.abspath()} or {potential.abspath()}."
                    )
        elif format is None:
            # ``filename`` is a stream here; default to the YAML backend.
            # NOTE(review): the leading "." makes this key differ from the
            # extension-derived "yaml" above -- confirm against the
            # repo_transfer_formats config keys.
            format = ".yaml"
        BackendClass: type[RepoImportBackend] = get_class_of(
            self._config["repo_transfer_formats"][format]["import"]
        )

        def doImport(importStream: TextIO | ResourceHandleProtocol) -> None:
            # Run the whole import inside the caching context and a single
            # registry transaction so a failure rolls everything back.
            with self._caching_context():
                backend = BackendClass(importStream, self)  # type: ignore[call-arg]
                backend.register()
                with self.transaction():
                    backend.load(
                        datastore=self._datastore if not without_datastore else None,
                        directory=directory,
                        transfer=transfer,
                        skip_dimensions=skip_dimensions,
                        record_validation_info=record_validation_info,
                    )

        if isinstance(filename, ResourcePath):
            # We can not use open() here at the moment because of
            # DM-38589 since yaml does stream.read(8192) in a loop.
            stream = io.StringIO(filename.read().decode())
            doImport(stream)
        else:
            doImport(filename)  # type: ignore

2013 

2014 def transfer_dimension_records_from( 

2015 self, source_butler: LimitedButler | Butler, source_refs: Iterable[DatasetRef | DataCoordinate] 

2016 ) -> None: 

2017 # Allowed dimensions in the target butler. 

2018 elements = frozenset(element for element in self.dimensions.elements if element.has_own_table) 

2019 

2020 data_ids = {ref.dataId for ref in source_refs} 

2021 

2022 dimension_records = self._extract_all_dimension_records_from_data_ids( 

2023 source_butler, data_ids, elements 

2024 ) 

2025 

2026 # Insert order is important. 

2027 for element in self.dimensions.sorted(dimension_records.keys()): 

2028 records = [r for r in dimension_records[element].values()] 

2029 # Assume that if the record is already present that we can 

2030 # use it without having to check that the record metadata 

2031 # is consistent. 

2032 self._registry.insertDimensionData(element, *records, skip_existing=True) 

2033 _LOG.debug("Dimension '%s' -- number of records transferred: %d", element.name, len(records)) 

2034 

    def _extract_all_dimension_records_from_data_ids(
        self,
        source_butler: LimitedButler | Butler,
        data_ids: set[DataCoordinate],
        allowed_elements: frozenset[DimensionElement],
    ) -> dict[DimensionElement, dict[DataCoordinate, DimensionRecord]]:
        """Collect the dimension records for the given data IDs, including
        dependent ("populated_by") records and any records those in turn
        require, restricted to ``allowed_elements``.
        """
        # Records directly attached to (or expanded from) the data IDs.
        primary_records = self._extract_dimension_records_from_data_ids(
            source_butler, data_ids, allowed_elements
        )

        additional_records: dict[DimensionElement, dict[DataCoordinate, DimensionRecord]] = defaultdict(dict)
        for original_element, record_mapping in primary_records.items():
            # Get dimensions that depend on this dimension.
            populated_by = self.dimensions.get_elements_populated_by(
                self.dimensions[original_element.name]  # type: ignore
            )
            if populated_by:
                for element in populated_by:
                    if element not in allowed_elements:
                        continue
                    if element.name == original_element.name:
                        continue

                    if element.name in primary_records:
                        # If this element has already been stored avoid
                        # re-finding records since that may lead to additional
                        # spurious records. e.g. visit is populated_by
                        # visit_detector_region but querying
                        # visit_detector_region by visit will return all the
                        # detectors for this visit -- the visit dataId does not
                        # constrain this.
                        # To constrain the query the original dataIds would
                        # have to be scanned.
                        continue

                    if record_mapping:
                        # Querying dependent records requires the full query
                        # system, which a LimitedButler does not have.
                        if not isinstance(source_butler, Butler):
                            raise RuntimeError(
                                f"Transferring populated_by records like {element.name}"
                                " requires a full Butler."
                            )

                        with source_butler.query() as query:
                            records = query.join_data_coordinates(record_mapping.keys()).dimension_records(
                                element.name
                            )
                            for record in records:
                                additional_records[record.definition].setdefault(record.dataId, record)

        # The next step is to walk back through the additional records to
        # pick up any missing content (such as visit_definition needing to
        # know the exposure). Want to ensure we do not request records we
        # already have.
        missing_data_ids = set()
        for record_mapping in additional_records.values():
            for data_id in record_mapping.keys():
                for dimension in data_id.dimensions.required:
                    element = source_butler.dimensions[dimension]
                    dimension_key = data_id.subset(dimension)
                    if dimension_key not in primary_records[element]:
                        missing_data_ids.add(dimension_key)

        # Fill out the new records. Assume that these new records do not
        # also need to carry over additional populated_by records.
        secondary_records = self._extract_dimension_records_from_data_ids(
            source_butler, missing_data_ids, allowed_elements
        )

        # Merge the extra sets of records in with the original.
        for name, record_mapping in itertools.chain(additional_records.items(), secondary_records.items()):
            primary_records[name].update(record_mapping)

        return primary_records

2108 

2109 def _extract_dimension_records_from_data_ids( 

2110 self, 

2111 source_butler: LimitedButler | Butler, 

2112 data_ids: Iterable[DataCoordinate], 

2113 allowed_elements: frozenset[DimensionElement], 

2114 ) -> dict[DimensionElement, dict[DataCoordinate, DimensionRecord]]: 

2115 dimension_records: dict[DimensionElement, dict[DataCoordinate, DimensionRecord]] = defaultdict(dict) 

2116 

2117 data_ids = set(data_ids) 

2118 if not all(data_id.hasRecords() for data_id in data_ids): 

2119 if isinstance(source_butler, Butler): 

2120 data_ids = source_butler._expand_data_ids(data_ids) 

2121 else: 

2122 raise TypeError("Input butler needs to be a full butler to expand DataId.") 

2123 

2124 for data_id in data_ids: 

2125 # If this butler doesn't know about a dimension in the source 

2126 # butler things will break later. 

2127 for element_name in data_id.dimensions.elements: 

2128 record = data_id.records[element_name] 

2129 if record is not None and record.definition in allowed_elements: 

2130 dimension_records[record.definition].setdefault(record.dataId, record) 

2131 

2132 return dimension_records 

2133 

    def _prepare_for_import_refs(
        self,
        source_butler: LimitedButler,
        source_refs: Iterable[DatasetRef],
        *,
        register_dataset_types: bool = False,
        transfer_dimensions: bool = False,
        dry_run: bool = False,
    ) -> _ImportDatasetsInfo:
        # Docstring inherited.
        if not self.isWriteable() and not dry_run:
            raise TypeError("Butler is read-only.")

        # Will iterate through the refs multiple times so need to convert
        # to a list if this isn't a collection.
        if not isinstance(source_refs, collections.abc.Collection):
            source_refs = list(source_refs)

        original_count = len(source_refs)
        # Multi-dataset imports are logged at INFO; single-dataset at VERBOSE.
        log_level = _LOG.INFO if original_count > 1 else _LOG.VERBOSE
        _LOG.log(
            log_level,
            "Importing %d dataset%s into %s",
            original_count,
            "s" if original_count != 1 else "",
            str(self),
        )

        # Importing requires that we group the refs by dimension group and run
        # before doing the import.
        source_dataset_types = set()
        grouped_refs: defaultdict[_RefGroup, list[DatasetRef]] = defaultdict(list)
        for ref in source_refs:
            grouped_refs[_RefGroup(ref.datasetType.dimensions, ref.run)].append(ref)
            source_dataset_types.add(ref.datasetType)

        # Check to see if the dataset type in the source butler has
        # the same definition in the target butler and register missing
        # ones if requested. Registration must happen outside a transaction.
        newly_registered_dataset_types = set()
        for datasetType in source_dataset_types:
            if register_dataset_types:
                # Let this raise immediately if inconsistent. Continuing
                # on to find additional inconsistent dataset types
                # might result in additional unwanted dataset types being
                # registered.
                try:
                    if not dry_run and self._registry.registerDatasetType(datasetType):
                        newly_registered_dataset_types.add(datasetType)
                except ConflictingDefinitionError as e:
                    # Be safe and require that conversions be bidirectional
                    # when there are storage class mismatches. This is because
                    # get() will have to support conversion from source to
                    # target python type (the source formatter will be
                    # returning source python type) but there also is an
                    # expectation that people will want to be able to get() in
                    # the target using the source python type, which will not
                    # require conversion for transferred datasets but might
                    # for target-native types. Additionally, butler.get does
                    # not know that the formatter will return the wrong
                    # python type and so will always check that the conversion
                    # works even though it won't need it.
                    target_dataset_type = self.get_dataset_type(datasetType.name)
                    target_compatible_with_source = target_dataset_type.is_compatible_with(datasetType)
                    source_compatible_with_target = datasetType.is_compatible_with(target_dataset_type)
                    if not (target_compatible_with_source and source_compatible_with_target):
                        if target_compatible_with_source:
                            e.add_note(
                                "Target dataset type storage class is compatible with source "
                                "but the reverse is not true."
                            )
                        elif source_compatible_with_target:
                            e.add_note(
                                "Source dataset type storage class is compatible with target "
                                "but the reverse is not true."
                            )
                        else:
                            e.add_note("If storage classes differ, please register converters.")
                    # Re-raise the conflict with any explanatory notes
                    # attached above.
                    raise
            else:
                # If the dataset type is missing, let it fail immediately.
                target_dataset_type = self.get_dataset_type(datasetType.name)
                if target_dataset_type != datasetType:
                    target_compatible_with_source = target_dataset_type.is_compatible_with(datasetType)
                    source_compatible_with_target = datasetType.is_compatible_with(target_dataset_type)
                    # Both conversion directions are currently required.
                    if not (target_compatible_with_source and source_compatible_with_target):
                        msg = ""
                        if target_compatible_with_source:
                            msg = (
                                "Target storage class is compatible with the source storage class "
                                "but the reverse is not true."
                            )
                        elif source_compatible_with_target:
                            msg = (
                                "Source storage class is compatible with the target storage class"
                                " but the reverse is not true."
                            )
                        else:
                            msg = "If storage classes differ register converters."
                        if msg:
                            msg = f"({msg})"
                        raise ConflictingDefinitionError(
                            "Source butler dataset type differs from definition"
                            f" in target butler: {datasetType} !="
                            f" {target_dataset_type} {msg}"
                        )
        if newly_registered_dataset_types:
            # We may have registered some even if there were inconsistencies
            # but should let people know (or else remove them again).
            _LOG.verbose(
                "Registered the following dataset types in the target Butler: %s",
                ", ".join(d.name for d in newly_registered_dataset_types),
            )
        else:
            _LOG.verbose("All required dataset types are known to the target Butler")

        dimension_records: dict[DimensionElement, dict[DataCoordinate, DimensionRecord]] = defaultdict(dict)
        if transfer_dimensions:
            # Collect all the dimension records for these refs.
            # All dimensions are to be copied but the list of valid dimensions
            # come from this butler's universe.
            elements = frozenset(element for element in self.dimensions.elements if element.has_own_table)
            dataIds = {ref.dataId for ref in source_refs}
            dimension_records = self._extract_all_dimension_records_from_data_ids(
                source_butler, dataIds, elements
            )
        return _ImportDatasetsInfo(grouped_refs, dimension_records)

2262 

2263 def _import_dimension_records( 

2264 self, 

2265 dimension_records: dict[DimensionElement, dict[DataCoordinate, DimensionRecord]], 

2266 *, 

2267 dry_run: bool, 

2268 ) -> None: 

2269 """Import dimension records collected during import pre-process.""" 

2270 if dimension_records and not dry_run: 

2271 _LOG.verbose("Ensuring that dimension records exist for transferred datasets.") 

2272 # Order matters. 

2273 for element in self.dimensions.sorted(dimension_records.keys()): 

2274 records = list(dimension_records[element].values()) 

2275 # Assume that if the record is already present that we can 

2276 # use it without having to check that the record metadata 

2277 # is consistent. 

2278 self._registry.insertDimensionData(element, *records, skip_existing=True) 

2279 

    def _import_grouped_refs(
        self,
        grouped_refs: defaultdict[_RefGroup, list[DatasetRef]],
        source_butler: LimitedButler | None,
        progress: Progress,
        *,
        dry_run: bool = False,
        expand_refs: bool = False,
    ) -> list[DatasetRef]:
        """Import refs grouped by (dimension group, run) into the registry,
        creating output RUN collections as needed, and return the imported
        refs (expanded if ``expand_refs`` is set).
        """
        handled_collections: set[str] = set()
        n_to_import = 0
        all_imported_refs: list[DatasetRef] = []
        # Sort by run collection name to ensure Postgres takes locks in the
        # same order between different processes, to mitigate an issue
        # where Postgres can deadlock due to the unique index on collection
        # name. (See DM-47543).
        groups = sorted(grouped_refs.items(), key=lambda item: item[0].run)
        for (dimension_group, run), refs_to_import in progress.iter_item_chunks(
            groups, desc="Importing to registry by run and dataset type"
        ):
            if run not in handled_collections:
                # May need to create output collection. If source butler
                # has a registry, ask for documentation string.
                run_doc = None
                if source_butler is not None and (registry := getattr(source_butler, "registry", None)):
                    run_doc = registry.getCollectionDocumentation(run)
                if not dry_run:
                    registered = self.collections.register(run, doc=run_doc)
                else:
                    registered = True
                handled_collections.add(run)
                if registered:
                    _LOG.verbose("Creating output run %s", run)

            n_refs = len(refs_to_import)
            n_to_import += n_refs
            _LOG.verbose(
                "Importing %d ref%s with dimensions %s into run %s",
                n_refs,
                "" if n_refs == 1 else "s",
                dimension_group.names,
                run,
            )

            # Assume we are using UUIDs and the source refs will match
            # those imported.
            if not dry_run:
                imported_refs = self._registry._importDatasets(refs_to_import, expand=expand_refs)
            else:
                imported_refs = refs_to_import

            all_imported_refs.extend(imported_refs)

        # Sanity check: every ref handed in should have come back out.
        assert n_to_import == len(all_imported_refs)
        _LOG.verbose("Imported %d datasets into destination butler", n_to_import)
        return all_imported_refs

2336 

    def transfer_from(
        self,
        source_butler: LimitedButler,
        source_refs: Iterable[DatasetRef],
        transfer: str = "auto",
        skip_missing: bool = True,
        register_dataset_types: bool = False,
        transfer_dimensions: bool = False,
        dry_run: bool = False,
    ) -> collections.abc.Collection[DatasetRef]:
        # Docstring inherited.
        source_refs = list(source_refs)
        if not self.isWriteable() and not dry_run:
            raise TypeError("Butler is read-only.")

        progress = Progress("lsst.daf.butler.Butler.transfer_from", level=VERBOSE)

        # Cache of artifact existence checks, shared with the datastore
        # transfer below so URIs are not probed twice.
        artifact_existence: dict[ResourcePath, bool] = {}
        file_transfer_source = source_butler._file_transfer_source
        transfer_records = retrieve_file_transfer_records(
            file_transfer_source, source_refs, artifact_existence
        )
        # In some situations the datastore artifact may be missing and we do
        # not want that registry entry to be imported. For example, this can
        # happen if a file was removed but the dataset was left in the registry
        # for provenance, or if a pipeline task didn't create all of the
        # possible files in a QuantumBackedButler.
        if skip_missing:
            original_ids = {ref.id for ref in source_refs}
            missing_ids = original_ids - transfer_records.keys()
            if missing_ids:
                original_count = len(source_refs)
                source_refs = [ref for ref in source_refs if ref.id not in missing_ids]
                filtered_count = len(source_refs)
                n_missing = original_count - filtered_count
                _LOG.verbose(
                    "%d dataset%s removed because the artifact does not exist. Now have %d.",
                    n_missing,
                    "" if n_missing == 1 else "s",
                    filtered_count,
                )

        # Register dataset types (if requested) and collect grouped refs
        # plus dimension records; this must happen outside the transaction.
        import_info = self._prepare_for_import_refs(
            source_butler,
            source_refs,
            register_dataset_types=register_dataset_types,
            dry_run=dry_run,
            transfer_dimensions=transfer_dimensions,
        )

        # Do all the importing in a single transaction.
        with self.transaction():
            self._import_dimension_records(import_info.dimension_records, dry_run=dry_run)
            imported_refs = self._import_grouped_refs(
                import_info.grouped_refs, source_butler, progress, dry_run=dry_run
            )

            # Ask the datastore to transfer. The datastore has to check that
            # the source datastore is compatible with the target datastore.
            _LOG.verbose("Transferring %d datasets from %s", len(transfer_records), file_transfer_source.name)
            accepted, rejected = self._datastore.transfer_from(
                transfer_records,
                imported_refs,
                transfer=transfer,
                artifact_existence=artifact_existence,
                dry_run=dry_run,
            )
            if rejected:
                # For now, accept the registry entries but not the files.
                _LOG.warning(
                    "%d datasets were rejected and %d accepted for transfer.",
                    len(rejected),
                    len(accepted),
                )

        return imported_refs

2413 

2414 def validateConfiguration( 

2415 self, 

2416 logFailures: bool = False, 

2417 datasetTypeNames: Iterable[str] | None = None, 

2418 ignore: Iterable[str] | None = None, 

2419 ) -> None: 

2420 # Docstring inherited. 

2421 if datasetTypeNames: 

2422 datasetTypes = [self.get_dataset_type(name) for name in datasetTypeNames] 

2423 else: 

2424 datasetTypes = list(self._registry.queryDatasetTypes()) 

2425 

2426 # filter out anything from the ignore list 

2427 if ignore: 

2428 ignore = set(ignore) 

2429 datasetTypes = [ 

2430 e for e in datasetTypes if e.name not in ignore and e.nameAndComponent()[0] not in ignore 

2431 ] 

2432 else: 

2433 ignore = set() 

2434 

2435 # For each datasetType that has an instrument dimension, create 

2436 # a DatasetRef for each defined instrument 

2437 datasetRefs = [] 

2438 

2439 # Find all the registered instruments (if "instrument" is in the 

2440 # universe). 

2441 instruments: set[str] = set() 

2442 if "instrument" in self.dimensions: 

2443 instruments = {rec.name for rec in self.query_dimension_records("instrument", explain=False)} 

2444 

2445 for datasetType in datasetTypes: 

2446 if "instrument" in datasetType.dimensions: 

2447 # In order to create a conforming dataset ref, create 

2448 # fake DataCoordinate values for the non-instrument 

2449 # dimensions. The type of the value does not matter here. 

2450 dataId = {dim: 1 for dim in datasetType.dimensions.names if dim != "instrument"} 

2451 

2452 for instrument in instruments: 

2453 datasetRef = DatasetRef( 

2454 datasetType, 

2455 DataCoordinate.standardize( 

2456 dataId, instrument=instrument, dimensions=datasetType.dimensions 

2457 ), 

2458 run="validate", 

2459 ) 

2460 datasetRefs.append(datasetRef) 

2461 

2462 entities: list[DatasetType | DatasetRef] = [] 

2463 entities.extend(datasetTypes) 

2464 entities.extend(datasetRefs) 

2465 

2466 datastoreErrorStr = None 

2467 try: 

2468 self._datastore.validateConfiguration(entities, logFailures=logFailures) 

2469 except ValidationError as e: 

2470 datastoreErrorStr = str(e) 

2471 

2472 # Also check that the LookupKeys used by the datastores match 

2473 # registry and storage class definitions 

2474 keys = self._datastore.getLookupKeys() 

2475 

2476 failedNames = set() 

2477 failedDataId = set() 

2478 for key in keys: 

2479 if key.name is not None: 

2480 if key.name in ignore: 

2481 continue 

2482 

2483 # skip if specific datasetType names were requested and this 

2484 # name does not match 

2485 if datasetTypeNames and key.name not in datasetTypeNames: 

2486 continue 

2487 

2488 # See if it is a StorageClass or a DatasetType 

2489 if key.name in self.storageClasses: 

2490 pass 

2491 else: 

2492 try: 

2493 self.get_dataset_type(key.name) 

2494 except KeyError: 

2495 if logFailures: 

2496 _LOG.critical( 

2497 "Key '%s' does not correspond to a DatasetType or StorageClass", key 

2498 ) 

2499 failedNames.add(key) 

2500 else: 

2501 # Dimensions are checked for consistency when the Butler 

2502 # is created and rendezvoused with a universe. 

2503 pass 

2504 

2505 # Check that the instrument is a valid instrument 

2506 # Currently only support instrument so check for that 

2507 if key.dataId: 

2508 dataIdKeys = set(key.dataId) 

2509 if {"instrument"} != dataIdKeys: 

2510 if logFailures: 

2511 _LOG.critical("Key '%s' has unsupported DataId override", key) 

2512 failedDataId.add(key) 

2513 elif key.dataId["instrument"] not in instruments: 

2514 if logFailures: 

2515 _LOG.critical("Key '%s' has unknown instrument", key) 

2516 failedDataId.add(key) 

2517 

2518 messages = [] 

2519 

2520 if datastoreErrorStr: 

2521 messages.append(datastoreErrorStr) 

2522 

2523 for failed, msg in ( 

2524 (failedNames, "Keys without corresponding DatasetType or StorageClass entry: "), 

2525 (failedDataId, "Keys with bad DataId entries: "), 

2526 ): 

2527 if failed: 

2528 msg += ", ".join(str(k) for k in failed) 

2529 messages.append(msg) 

2530 

2531 if messages: 

2532 raise ValidationError(";\n".join(messages)) 

2533 

2534 @property 

2535 @deprecated( 

2536 "Please use 'collections' instead. collection_chains will be removed after v28.", 

2537 version="v28", 

2538 category=FutureWarning, 

2539 ) 

2540 def collection_chains(self) -> DirectButlerCollections: 

2541 """Object with methods for modifying collection chains.""" 

2542 return DirectButlerCollections(self._registry) 

2543 

2544 @property 

2545 def collections(self) -> DirectButlerCollections: 

2546 """Object with methods for modifying and inspecting collections.""" 

2547 return DirectButlerCollections(self._registry) 

2548 

2549 @property 

2550 def run(self) -> str | None: 

2551 """Name of the run this butler writes outputs to by default (`str` or 

2552 `None`). 

2553 

2554 This is an alias for ``self.registry.defaults.run``. It cannot be set 

2555 directly in isolation, but all defaults may be changed together by 

2556 assigning a new `RegistryDefaults` instance to 

2557 ``self.registry.defaults``. 

2558 """ 

2559 return self._registry.defaults.run 

2560 

2561 @property 

2562 def registry(self) -> Registry: 

2563 """The object that manages dataset metadata and relationships 

2564 (`Registry`). 

2565 

2566 Many operations that don't involve reading or writing butler datasets 

2567 are accessible only via `Registry` methods. Eventually these methods 

2568 will be replaced by equivalent `Butler` methods. 

2569 """ 

2570 return RegistryShim(self) 

2571 

2572 @property 

2573 def dimensions(self) -> DimensionUniverse: 

2574 # Docstring inherited. 

2575 return self._registry.dimensions 

2576 

2577 def query(self) -> contextlib.AbstractContextManager[Query]: 

2578 # Docstring inherited. 

2579 return self._registry._query() 

2580 

2581 def _query_driver( 

2582 self, 

2583 default_collections: Iterable[str], 

2584 default_data_id: DataCoordinate, 

2585 ) -> contextlib.AbstractContextManager[DirectQueryDriver]: 

2586 """Set up a QueryDriver instance for use with this Butler. Although 

2587 this is marked as a private method, it is also used by Butler server. 

2588 """ 

2589 return self._registry._query_driver(default_collections, default_data_id) 

2590 

2591 @contextlib.contextmanager 

2592 def _query_all_datasets_by_page( 

2593 self, args: QueryAllDatasetsParameters 

2594 ) -> Iterator[Iterator[list[DatasetRef]]]: 

2595 with self.query() as query: 

2596 pages = query_all_datasets(self, query, args) 

2597 yield iter(page.data for page in pages) 

2598 

2599 def _preload_cache(self, *, load_dimension_record_cache: bool = True) -> None: 

2600 """Immediately load caches that are used for common operations.""" 

2601 self._registry.preload_cache(load_dimension_record_cache=load_dimension_record_cache) 

2602 

2603 def _expand_data_ids(self, data_ids: Iterable[DataCoordinate]) -> list[DataCoordinate]: 

2604 return self._registry.expand_data_ids(data_ids) 

2605 

    # Class-body annotations declaring per-instance attributes.

    _config: ButlerConfig
    """Configuration for this Butler instance."""

    _registry: SqlRegistry
    """The object that manages dataset metadata and relationships
    (`SqlRegistry`).

    Most operations that don't involve reading or writing butler datasets are
    accessible only via `SqlRegistry` methods.
    """

    storageClasses: StorageClassFactory
    """An object that maps known storage class names to objects that fully
    describe them (`StorageClassFactory`).
    """

    _closed: bool
    """`True` if close() has already been called on this instance; `False`
    otherwise.
    """

2626 

2627 

class _RefGroup(NamedTuple):
    """Key identifying a batch of DatasetRefs to be inserted in
    `Butler.transfer_from`.
    """

    # Dimension group shared by every ref in the batch.
    dimensions: DimensionGroup
    # Name of the run collection the batch belongs to.
    run: str

2635 

2636 

class _ImportDatasetsInfo(NamedTuple):
    """Information extracted from datasets to be imported."""

    # Refs batched by (dimensions, run) so each batch can be imported together.
    grouped_refs: defaultdict[_RefGroup, list[DatasetRef]]
    # Dimension records to import first, keyed by element then data ID.
    dimension_records: dict[DimensionElement, dict[DataCoordinate, DimensionRecord]]

2642 

2643 

def _to_uuid(id: DatasetId | str) -> uuid.UUID:
    """Coerce a dataset ID, possibly given as a string, to a `uuid.UUID`."""
    return id if isinstance(id, uuid.UUID) else uuid.UUID(id)

2649 

2650 

class _ButlerClosed:
    """Stand-in object whose every attribute access raises `RuntimeError`,
    signalling that the owning Butler has been closed.
    """

    def __getattr__(self, name: str) -> Any:
        raise RuntimeError("Attempted to use a Butler instance which has been closed.")

2654 

2655 

# Shared sentinel object; any attribute access on it raises RuntimeError
# (see `_ButlerClosed.__getattr__`).
_BUTLER_CLOSED_INSTANCE: Any = _ButlerClosed()

2657 

2658 

def _retrieve_dataset_type(registry: SqlRegistry, name: str) -> DatasetType | None:
    """Look up ``name`` in the registry, returning `None` when it is not a
    registered dataset type.
    """
    try:
        dataset_type = registry.getDatasetType(name)
    except MissingDatasetTypeError:
        return None
    return dataset_type