Coverage for python / lsst / daf / butler / registry / managers.py: 31%
162 statements
« prev ^ index » next coverage.py v7.13.5, created at 2026-04-14 23:37 +0000
« prev ^ index » next coverage.py v7.13.5, created at 2026-04-14 23:37 +0000
1# This file is part of daf_butler.
2#
3# Developed for the LSST Data Management System.
4# This product includes software developed by the LSST Project
5# (http://www.lsst.org).
6# See the COPYRIGHT file at the top-level directory of this distribution
7# for details of code ownership.
8#
9# This software is dual licensed under the GNU General Public License and also
10# under a 3-clause BSD license. Recipients may choose which of these licenses
11# to use; please see the files gpl-3.0.txt and/or bsd_license.txt,
12# respectively. If you choose the GPL option then the following text applies
13# (but note that there is still no warranty even if you opt for BSD instead):
14#
15# This program is free software: you can redistribute it and/or modify
16# it under the terms of the GNU General Public License as published by
17# the Free Software Foundation, either version 3 of the License, or
18# (at your option) any later version.
19#
20# This program is distributed in the hope that it will be useful,
21# but WITHOUT ANY WARRANTY; without even the implied warranty of
22# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
23# GNU General Public License for more details.
24#
25# You should have received a copy of the GNU General Public License
26# along with this program. If not, see <http://www.gnu.org/licenses/>.
28from __future__ import annotations
# Public names exported by this module: the concrete manager-type and
# manager-instance structs.
__all__ = ("RegistryManagerInstances", "RegistryManagerTypes")
35import dataclasses
36import logging
37from collections.abc import Iterator, Mapping
38from contextlib import contextmanager
39from typing import Any, Generic, TypeVar
41from lsst.utils import doImportType
43from .._config import Config
44from ..dimensions import DimensionConfig, DimensionUniverse
45from ._caching_context import CachingContext
46from ._config import RegistryConfig
47from .interfaces import (
48 ButlerAttributeManager,
49 CollectionManager,
50 Database,
51 DatasetRecordStorageManager,
52 DatastoreRegistryBridgeManager,
53 DimensionRecordStorageManager,
54 ObsCoreTableManager,
55 OpaqueTableStorageManager,
56 StaticTablesContext,
57 VersionedExtension,
58 VersionTuple,
59)
60from .versions import ButlerVersionsManager
62_Attributes = TypeVar("_Attributes")
63_Dimensions = TypeVar("_Dimensions")
64_Collections = TypeVar("_Collections")
65_Datasets = TypeVar("_Datasets")
66_Opaque = TypeVar("_Opaque")
67_Datastores = TypeVar("_Datastores")
68_ObsCore = TypeVar("_ObsCore")
71_LOG = logging.getLogger(__name__)
73# key for dimensions configuration in attributes table
74_DIMENSIONS_ATTR = "config:dimensions.json"
76# key for obscore configuration in attributes table
77_OBSCORE_ATTR = "config:obscore.json"
80@dataclasses.dataclass(frozen=True, eq=False)
81class _GenericRegistryManagers(
82 Generic[_Attributes, _Dimensions, _Collections, _Datasets, _Opaque, _Datastores, _ObsCore]
83):
84 """Base struct used to pass around the manager instances or types that back
85 a `Registry`.
87 This class should only be used via its non-generic subclasses,
88 `RegistryManagerInstances` and `RegistryManagerTypes`.
89 """
91 attributes: _Attributes
92 """Manager for flat key-value pairs, including versions.
93 """
95 dimensions: _Dimensions
96 """Manager for dimensions.
97 """
99 collections: _Collections
100 """Manager for collections.
101 """
103 datasets: _Datasets
104 """Manager for datasets, dataset types, and collection summaries.
105 """
107 opaque: _Opaque
108 """Manager for opaque (to the Registry) tables.
109 """
111 datastores: _Datastores
112 """Manager for the interface between `Registry` and `Datastore`.
113 """
115 obscore: _ObsCore | None
116 """Manager for `ObsCore` table(s).
117 """
@dataclasses.dataclass(frozen=True, eq=False)
class RegistryManagerTypes(
    _GenericRegistryManagers[
        type[ButlerAttributeManager],
        type[DimensionRecordStorageManager],
        type[CollectionManager],
        type[DatasetRecordStorageManager],
        type[OpaqueTableStorageManager],
        type[DatastoreRegistryBridgeManager],
        type[ObsCoreTableManager],
    ]
):
    """A struct used to pass around the types of the manager objects that back
    a `Registry`.

    In addition to one manager type per inherited field, this struct carries
    per-manager configuration (``manager_configs``) and optional per-manager
    schema versions (``schema_versions``).
    """

    @classmethod
    def fromConfig(cls, config: RegistryConfig) -> RegistryManagerTypes:
        """Construct by extracting class names from configuration and importing
        them.

        Parameters
        ----------
        config : `RegistryConfig`
            Configuration object with a "managers" section. Each entry is
            either a fully-qualified class name string, or a nested
            configuration with a required "cls" key and optional "config" and
            "schema_version" keys.

        Returns
        -------
        types : `RegistryManagerTypes`
            A new struct containing type objects.

        Raises
        ------
        KeyError
            Raised if a nested manager configuration has no "cls" key, or if
            a manager entry is neither a string nor a nested configuration.
        ValueError
            Raised if a nested manager configuration has keys other than
            "cls", "config" and "schema_version".
        """
        # Only manager names defined as fields of this class are looked up.
        # They are visited in field-declaration order (not the hash-dependent
        # iteration order of a set) so that classes are imported, and
        # configuration errors reported, deterministically.
        # TODO: Maybe we need to check keys for unknown names/typos?
        extras = {"manager_configs", "schema_versions"}
        managers = [field.name for field in dataclasses.fields(cls) if field.name not in extras]
        # Values of "config" sub-key, if any, indexed by manager name.
        configs: dict[str, Mapping] = {}
        schema_versions: dict[str, VersionTuple] = {}
        manager_types: dict[str, type | None] = {}
        for manager in managers:
            manager_config = config["managers"].get(manager)
            if isinstance(manager_config, Config):
                # Expect "cls" and optional "config" and "schema_version"
                # sub-keys.
                manager_config_dict = manager_config.toDict()
                try:
                    class_name = manager_config_dict.pop("cls")
                except KeyError:
                    raise KeyError(f"'cls' key is not defined in {manager!r} manager configuration") from None
                if (mgr_config := manager_config_dict.pop("config", None)) is not None:
                    configs[manager] = mgr_config
                if (mgr_version := manager_config_dict.pop("schema_version", None)) is not None:
                    # Versions that come from config are not checked for
                    # compatibility here; they may be overridden later by
                    # versions stored in the registry (see `loadRepo`).
                    schema_versions[manager] = VersionTuple.fromString(mgr_version)
                if manager_config_dict:
                    raise ValueError(
                        f"{manager!r} manager configuration has unexpected keys: {set(manager_config_dict)}"
                    )
            elif isinstance(manager_config, str):
                class_name = manager_config
            elif manager_config is None:
                # Some managers may be optional.
                continue
            else:
                raise KeyError(f"Unexpected type of {manager!r} manager configuration: {manager_config!r}")
            manager_types[manager] = doImportType(class_name)

        # obscore is the only manager field that may be None, and the
        # dataclass field has no default value, so supply None explicitly
        # when it is not configured.
        manager_types.setdefault("obscore", None)
        return cls(**manager_types, manager_configs=configs, schema_versions=schema_versions)

    def makeRepo(self, database: Database, dimensionConfig: DimensionConfig) -> RegistryManagerInstances:
        """Create all persistent `Registry` state for a new, empty data
        repository, and return a new struct containing manager instances.

        Parameters
        ----------
        database : `Database`
            Object that represents a connection to the SQL database that will
            back the data repository. Must point to an empty namespace, or at
            least one with no tables or other entities whose names might clash
            with those used by butler.
        dimensionConfig : `DimensionConfig`
            Configuration that defines a `DimensionUniverse`, to be written
            into the data repository and used to define aspects of the schema.

        Returns
        -------
        instances : `RegistryManagerInstances`
            Struct containing instances of the types contained by ``self``,
            pointing to the new repository and backed by ``database``.

        Raises
        ------
        RuntimeError
            Raised if ``dimensionConfig`` cannot be serialized to JSON.
        """
        # If schema versions were specified in the config, check that they are
        # compatible with their managers.
        managers = self.as_dict()
        for manager_type, schema_version in self.schema_versions.items():
            managers[manager_type].checkNewSchemaVersion(schema_version)

        universe = DimensionUniverse(dimensionConfig)
        with database.declareStaticTables(create=True) as context:
            instances = RegistryManagerInstances.initialize(database, context, types=self, universe=universe)

        # Store managers and their versions in the attributes table.
        versions = ButlerVersionsManager(instances.attributes)
        versions.storeManagersConfig(instances.as_dict())

        # Dump the universe config as JSON into attributes (faster than YAML).
        dimensions_json = dimensionConfig.dump(format="json")
        if dimensions_json is None:
            raise RuntimeError("Unexpectedly failed to serialize DimensionConfig to JSON")
        instances.attributes.set(_DIMENSIONS_ATTR, dimensions_json)
        if instances.obscore is not None:
            instances.attributes.set(_OBSCORE_ATTR, instances.obscore.config_json())
        return instances

    def loadRepo(self, database: Database) -> RegistryManagerInstances:
        """Construct manager instances that point to an existing data
        repository.

        Parameters
        ----------
        database : `Database`
            Object that represents a connection to the SQL database that backs
            the data repository. Must point to a namespace that already holds
            all tables and other persistent entities used by butler.

        Returns
        -------
        instances : `RegistryManagerInstances`
            Struct containing instances of the types contained by ``self``,
            pointing to the existing repository and backed by ``database``.

        Raises
        ------
        LookupError
            Raised if the dimensions configuration is missing from the
            registry attributes table.

        Notes
        -----
        This method updates ``self.schema_versions`` in place with the
        versions recorded in the registry. When an ObsCore manager type is
        configured and the registry holds an ObsCore configuration, it also
        replaces ``self.manager_configs["obscore"]`` with that configuration.
        """
        # Create attributes manager only first, so we can use it to load the
        # embedded dimensions configuration. Note that we do not check this
        # manager version before initializing it, it is supposed to be
        # completely backward- and forward-compatible.
        with database.declareStaticTables(create=False) as context:
            attributes = self.attributes.initialize(database, context)

        # Verify that configured classes are compatible with the ones stored
        # in registry.
        versions = ButlerVersionsManager(attributes)
        versions.checkManagersConfig(self.as_dict())

        # Read schema versions from registry and validate them.
        self.schema_versions.update(versions.managerVersions())
        for manager_type, manager_class in self.as_dict().items():
            schema_version = self.schema_versions.get(manager_type)
            if schema_version is not None:
                manager_class.checkCompatibility(schema_version, database.isWriteable())

        # The dimensions configuration is stored as a JSON string.
        dimensionsString = attributes.get(_DIMENSIONS_ATTR)
        if dimensionsString is None:
            raise LookupError(f"Registry attribute {_DIMENSIONS_ATTR} is missing from database")
        dimensionConfig = DimensionConfig(Config.fromString(dimensionsString, format="json"))
        universe = DimensionUniverse(dimensionConfig)
        if self.obscore is not None:
            # Get ObsCore configuration from attributes table, this silently
            # overrides whatever may come from config file. Idea is that we do
            # not want to carry around the whole thing, and butler config will
            # have empty obscore configuration after initialization. When
            # configuration is missing from attributes table, the obscore table
            # does not exist, and we do not instantiate obscore manager.
            obscoreString = attributes.get(_OBSCORE_ATTR)
            if obscoreString is not None:
                self.manager_configs["obscore"] = Config.fromString(obscoreString, format="json")

        with database.declareStaticTables(create=False) as context:
            instances = RegistryManagerInstances.initialize(database, context, types=self, universe=universe)

        # Load content from database that we try to keep in-memory.
        instances.refresh()
        return instances

    def as_dict(self) -> Mapping[str, type[VersionedExtension]]:
        """Return contained managers as a dictionary with manager type name as
        a key.

        Returns
        -------
        extensions : `~collections.abc.Mapping` [`str`, `type`]
            Maps manager type name (e.g. "datasets") to its corresponding
            manager class. Managers that are `None` (e.g. an unconfigured
            ``obscore``) are omitted.
        """
        extras = {"manager_configs", "schema_versions"}
        managers = {f.name: getattr(self, f.name) for f in dataclasses.fields(self) if f.name not in extras}
        return {key: value for key, value in managers.items() if value is not None}

    manager_configs: dict[str, Mapping] = dataclasses.field(default_factory=dict)
    """Per-manager configuration options passed to their initialize methods.
    """

    schema_versions: dict[str, VersionTuple] = dataclasses.field(default_factory=dict)
    """Per-manager schema versions defined by configuration, optional."""
@dataclasses.dataclass(frozen=True, eq=False)
class RegistryManagerInstances(
    _GenericRegistryManagers[
        ButlerAttributeManager,
        DimensionRecordStorageManager,
        CollectionManager,
        DatasetRecordStorageManager,
        OpaqueTableStorageManager,
        DatastoreRegistryBridgeManager,
        ObsCoreTableManager,
    ]
):
    """A struct used to pass around the manager instances that back a
    `Registry`.
    """

    caching_context: CachingContext
    """Object containing caches for various information generated by
    managers.
    """

    @contextmanager
    def caching_context_manager(self) -> Iterator[None]:
        """Context manager that enables the collection record and collection
        summary caches of ``caching_context``.

        Calls to this method may be nested and the returned context managers
        may even be closed out of order, with only the first context manager
        entered and the last context manager exited having any effect.
        """
        with (
            self.caching_context.enable_collection_record_cache(),
            self.caching_context.enable_collection_summary_cache(),
        ):
            yield

    @classmethod
    def initialize(
        cls,
        database: Database,
        context: StaticTablesContext,
        *,
        types: RegistryManagerTypes,
        universe: DimensionUniverse,
        caching_context: CachingContext | None = None,
    ) -> RegistryManagerInstances:
        """Construct manager instances from their types and an existing
        database connection.

        Parameters
        ----------
        database : `Database`
            Object that represents a connection to the SQL database that backs
            the data repository.
        context : `StaticTablesContext`
            Object used to create tables in ``database``.
        types : `RegistryManagerTypes`
            Struct containing type objects for the manager instances to
            construct.
        universe : `DimensionUniverse`
            Object that describes all dimensions in this data repository.
        caching_context : `CachingContext` or `None`, optional
            Caching context to use. A new `CachingContext` is created if not
            given.

        Returns
        -------
        instances : `RegistryManagerInstances`
            Struct containing manager instances.

        Notes
        -----
        The ObsCore manager is only initialized when ``types.obscore`` is set
        and ``types.manager_configs`` has an "obscore" entry; otherwise the
        ``obscore`` field of the result is `None`.
        """
        if caching_context is None:
            caching_context = CachingContext()
        kwargs: dict[str, Any] = {}
        schema_versions = types.schema_versions
        kwargs["attributes"] = types.attributes.initialize(
            database, context, registry_schema_version=schema_versions.get("attributes")
        )
        kwargs["dimensions"] = types.dimensions.initialize(
            database, context, universe=universe, registry_schema_version=schema_versions.get("dimensions")
        )
        kwargs["collections"] = types.collections.initialize(
            database,
            context,
            caching_context=caching_context,
            registry_schema_version=schema_versions.get("collections"),
        )
        kwargs["datasets"] = types.datasets.initialize(
            database,
            context,
            collections=kwargs["collections"],
            dimensions=kwargs["dimensions"],
            registry_schema_version=schema_versions.get("datasets"),
            caching_context=caching_context,
        )
        kwargs["opaque"] = types.opaque.initialize(
            database, context, registry_schema_version=schema_versions.get("opaque")
        )
        # Note: the datastore bridge and obscore managers receive the datasets
        # manager *type* (types.datasets), not the instance created above.
        kwargs["datastores"] = types.datastores.initialize(
            database,
            context,
            opaque=kwargs["opaque"],
            datasets=types.datasets,
            universe=universe,
            registry_schema_version=schema_versions.get("datastores"),
        )
        if types.obscore is not None and "obscore" in types.manager_configs:
            kwargs["obscore"] = types.obscore.initialize(
                database,
                context,
                universe=universe,
                config=types.manager_configs["obscore"],
                datasets=types.datasets,
                dimensions=kwargs["dimensions"],
                registry_schema_version=schema_versions.get("obscore"),
            )
        else:
            kwargs["obscore"] = None
        kwargs["caching_context"] = caching_context
        return cls(**kwargs)

    def clone(
        self,
        db: Database,
    ) -> RegistryManagerInstances:
        """Make an independent copy of the manager instances with a new
        `Database` instance.

        Parameters
        ----------
        db : `Database`
            New `Database` object to use when instantiating managers.

        Returns
        -------
        instances : `RegistryManagerInstances`
            New manager instances with the same configuration as this instance,
            but bound to a new Database object. The copy gets a fresh
            `CachingContext` rather than sharing this instance's caches.
        """
        caching_context = CachingContext()
        dimensions = self.dimensions.clone(db)
        collections = self.collections.clone(db, caching_context)
        opaque = self.opaque.clone(db)
        datasets = self.datasets.clone(
            db=db, collections=collections, dimensions=dimensions, caching_context=caching_context
        )
        obscore = None
        if self.obscore is not None:
            obscore = self.obscore.clone(db=db, dimensions=dimensions)
        return RegistryManagerInstances(
            attributes=self.attributes.clone(db),
            dimensions=dimensions,
            collections=collections,
            datasets=datasets,
            opaque=opaque,
            datastores=self.datastores.clone(db=db, opaque=opaque),
            obscore=obscore,
            caching_context=caching_context,
        )

    def as_dict(self) -> Mapping[str, VersionedExtension]:
        """Return contained managers as a dictionary with manager type name as
        a key.

        Returns
        -------
        extensions : `~collections.abc.Mapping` [`str`, `VersionedExtension`]
            Maps manager type name (e.g. "datasets") to its corresponding
            manager instance. Managers that are `None` (e.g. an absent
            ``obscore``) are omitted.
        """
        # ``caching_context`` is the only field that is not a manager.
        instances = {
            f.name: getattr(self, f.name) for f in dataclasses.fields(self) if f.name != "caching_context"
        }
        return {key: value for key, value in instances.items() if value is not None}

    def refresh(self) -> None:
        """Refresh all in-memory state by querying the database or clearing
        caches.

        This delegates to the ``refresh`` methods of the collections and
        datasets managers.
        """
        self.collections.refresh()
        self.datasets.refresh()