Coverage for python / lsst / daf / butler / registry / managers.py: 31%

162 statements  

« prev     ^ index     » next       coverage.py v7.13.5, created at 2026-05-06 08:30 +0000

1# This file is part of daf_butler. 

2# 

3# Developed for the LSST Data Management System. 

4# This product includes software developed by the LSST Project 

5# (http://www.lsst.org). 

6# See the COPYRIGHT file at the top-level directory of this distribution 

7# for details of code ownership. 

8# 

9# This software is dual licensed under the GNU General Public License and also 

10# under a 3-clause BSD license. Recipients may choose which of these licenses 

11# to use; please see the files gpl-3.0.txt and/or bsd_license.txt, 

12# respectively. If you choose the GPL option then the following text applies 

13# (but note that there is still no warranty even if you opt for BSD instead): 

14# 

15# This program is free software: you can redistribute it and/or modify 

16# it under the terms of the GNU General Public License as published by 

17# the Free Software Foundation, either version 3 of the License, or 

18# (at your option) any later version. 

19# 

20# This program is distributed in the hope that it will be useful, 

21# but WITHOUT ANY WARRANTY; without even the implied warranty of 

22# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the 

23# GNU General Public License for more details. 

24# 

25# You should have received a copy of the GNU General Public License 

26# along with this program. If not, see <http://www.gnu.org/licenses/>. 

27 

28from __future__ import annotations 

29 

30__all__ = ( 

31 "RegistryManagerInstances", 

32 "RegistryManagerTypes", 

33) 

34 

35import dataclasses 

36import logging 

37from collections.abc import Iterator, Mapping 

38from contextlib import contextmanager 

39from typing import Any, Generic, TypeVar 

40 

41from lsst.utils import doImportType 

42 

43from .._config import Config 

44from ..dimensions import DimensionConfig, DimensionUniverse 

45from ._caching_context import CachingContext 

46from ._config import RegistryConfig 

47from .interfaces import ( 

48 ButlerAttributeManager, 

49 CollectionManager, 

50 Database, 

51 DatasetRecordStorageManager, 

52 DatastoreRegistryBridgeManager, 

53 DimensionRecordStorageManager, 

54 ObsCoreTableManager, 

55 OpaqueTableStorageManager, 

56 StaticTablesContext, 

57 VersionedExtension, 

58 VersionTuple, 

59) 

60from .versions import ButlerVersionsManager 

61 

# Type variables used to parameterize `_GenericRegistryManagers`, one per
# manager slot.  The concrete subclasses in this module bind them either to
# manager classes (``type[...]``) or to manager instances.
_Attributes = TypeVar("_Attributes")
_Dimensions = TypeVar("_Dimensions")
_Collections = TypeVar("_Collections")
_Datasets = TypeVar("_Datasets")
_Opaque = TypeVar("_Opaque")
_Datastores = TypeVar("_Datastores")
_ObsCore = TypeVar("_ObsCore")


# Module-level logger, named after this module.
_LOG = logging.getLogger(__name__)

# key for dimensions configuration in attributes table
_DIMENSIONS_ATTR = "config:dimensions.json"

# key for obscore configuration in attributes table
_OBSCORE_ATTR = "config:obscore.json"

78 

79 

@dataclasses.dataclass(frozen=True, eq=False)
class _GenericRegistryManagers(
    Generic[_Attributes, _Dimensions, _Collections, _Datasets, _Opaque, _Datastores, _ObsCore]
):
    """Generic base struct bundling the managers that back a `Registry`.

    Depending on how the type parameters are bound, each field holds either
    a manager type or a manager instance.  This class is not meant to be used
    directly; use its non-generic subclasses `RegistryManagerInstances` and
    `RegistryManagerTypes` instead.
    """

    attributes: _Attributes
    """Manager for flat key-value pairs, including versions."""

    dimensions: _Dimensions
    """Manager for dimensions."""

    collections: _Collections
    """Manager for collections."""

    datasets: _Datasets
    """Manager for datasets, dataset types, and collection summaries."""

    opaque: _Opaque
    """Manager for opaque (to the Registry) tables."""

    datastores: _Datastores
    """Manager for the interface between `Registry` and `Datastore`."""

    obscore: _ObsCore | None
    """Manager for `ObsCore` table(s); may be `None`."""

118 

119 

@dataclasses.dataclass(frozen=True, eq=False)
class RegistryManagerTypes(
    _GenericRegistryManagers[
        type[ButlerAttributeManager],
        type[DimensionRecordStorageManager],
        type[CollectionManager],
        type[DatasetRecordStorageManager],
        type[OpaqueTableStorageManager],
        type[DatastoreRegistryBridgeManager],
        type[ObsCoreTableManager],
    ]
):
    """Struct holding the manager classes (not instances) that back a
    `Registry`.
    """

    manager_configs: dict[str, Mapping] = dataclasses.field(default_factory=dict)
    """Per-manager configuration options passed to their initialize methods."""

    schema_versions: dict[str, VersionTuple] = dataclasses.field(default_factory=dict)
    """Per-manager schema versions defined by configuration, optional."""

    @classmethod
    def fromConfig(cls, config: RegistryConfig) -> RegistryManagerTypes:
        """Build the struct by importing the manager classes named in a
        configuration object.

        Parameters
        ----------
        config : `RegistryConfig`
            Configuration object with a "managers" section that contains all
            fully-qualified class names for all manager types.

        Returns
        -------
        types : `RegistryManagerTypes`
            A new struct containing type objects.

        Raises
        ------
        KeyError
            Raised if a manager entry is a `Config` without a "cls" key, or
            if a manager entry is neither a `Config`, a `str`, nor `None`.
        ValueError
            Raised if a manager entry has keys other than "cls", "config"
            and "schema_version".
        """
        # We only check for manager names defined in class attributes.
        # TODO: Maybe we need to check keys for unknown names/typos?
        non_manager_fields = {"manager_configs", "schema_versions"}
        manager_names = {f.name for f in dataclasses.fields(cls)} - non_manager_fields

        # Values of "config" sub-key, if any, indexed by manager name.
        configs: dict[str, Mapping] = {}
        schema_versions: dict[str, VersionTuple] = {}
        manager_types: dict[str, type] = {}

        for name in manager_names:
            entry = config["managers"].get(name)
            if entry is None:
                # Some managers may be optional.
                continue
            if isinstance(entry, str):
                # Plain string is just the fully-qualified class name.
                class_name = entry
            elif isinstance(entry, Config):
                # Expect "cls" and optional "config" and "schema_version"
                # sub-keys.
                options = entry.toDict()
                try:
                    class_name = options.pop("cls")
                except KeyError:
                    raise KeyError(f"'cls' key is not defined in {name!r} manager configuration") from None
                extra_config = options.pop("config", None)
                if extra_config is not None:
                    configs[name] = extra_config
                version_string = options.pop("schema_version", None)
                if version_string is not None:
                    # Versions that come from config are not checked for
                    # compatibility here, they may be overridden later by
                    # versions from registry.
                    schema_versions[name] = VersionTuple.fromString(version_string)
                if options:
                    # Anything still left over is a key we do not know about.
                    raise ValueError(f"{name!r} manager configuration has unexpected keys: {set(options)}")
            else:
                raise KeyError(f"Unexpected type of {name!r} manager configuration: {entry!r}")
            manager_types[name] = doImportType(class_name)

        # obscore needs special care because it is the only manager which can
        # be None, and we cannot define a default value for it; start from an
        # explicit None and let a configured class replace it.
        init_kwargs: dict[str, Any] = {"obscore": None, **manager_types}
        return cls(**init_kwargs, manager_configs=configs, schema_versions=schema_versions)

    def makeRepo(self, database: Database, dimensionConfig: DimensionConfig) -> RegistryManagerInstances:
        """Create all persistent `Registry` state for a new, empty data
        repository and return the corresponding manager instances.

        Parameters
        ----------
        database : `Database`
            Object that represents a connection to the SQL database that will
            back the data repository.  Must point to an empty namespace, or at
            least one with no tables or other entities whose names might clash
            with those used by butler.
        dimensionConfig : `DimensionConfig`
            Configuration that defines a `DimensionUniverse`, to be written
            into the data repository and used to define aspects of the schema.

        Returns
        -------
        instances : `RegistryManagerInstances`
            Struct containing instances of the types contained by ``self``,
            pointing to the new repository and backed by ``database``.

        Raises
        ------
        RuntimeError
            Raised if ``dimensionConfig`` could not be serialized to JSON.
        """
        # If schema versions were specified in the config, check that they are
        # compatible with their managers.
        manager_classes = self.as_dict()
        for name, version in self.schema_versions.items():
            manager_classes[name].checkNewSchemaVersion(version)

        universe = DimensionUniverse(dimensionConfig)
        with database.declareStaticTables(create=True) as context:
            instances = RegistryManagerInstances.initialize(
                database, context, types=self, universe=universe, manager_configs=self.manager_configs
            )

        # Store managers and their versions in attributes table.
        version_manager = ButlerVersionsManager(instances.attributes)
        version_manager.storeManagersConfig(instances.as_dict())

        # Dump universe config as JSON into attributes (faster than YAML).
        dimensions_json = dimensionConfig.dump(format="json")
        if dimensions_json is None:
            raise RuntimeError("Unexpectedly failed to serialize DimensionConfig to JSON")
        instances.attributes.set(_DIMENSIONS_ATTR, dimensions_json)

        if instances.obscore is not None:
            obscore_json = instances.obscore.config_json()
            instances.attributes.set(_OBSCORE_ATTR, obscore_json)
        return instances

    def loadRepo(self, database: Database) -> RegistryManagerInstances:
        """Construct manager instances that point to an existing data
        repository.

        Parameters
        ----------
        database : `Database`
            Object that represents a connection to the SQL database that backs
            the data repository.  Must point to a namespace that already holds
            all tables and other persistent entities used by butler.

        Returns
        -------
        instances : `RegistryManagerInstances`
            Struct containing instances of the types contained by ``self``,
            pointing to the existing repository and backed by ``database``.

        Raises
        ------
        LookupError
            Raised if the dimensions configuration is missing from the
            attributes table.

        Notes
        -----
        This method updates ``self.schema_versions`` in place with the
        versions stored in the registry, and may set
        ``self.manager_configs["obscore"]`` from the attributes table.
        """
        # Create attributes manager only first, so we can use it to load the
        # embedded dimensions configuration.  Note that we do not check this
        # manager version before initializing it, it is supposed to be
        # completely backward- and forward-compatible.
        with database.declareStaticTables(create=False) as context:
            attributes = self.attributes.initialize(database, context)

        # Verify that configured classes are compatible with the ones stored
        # in registry.
        version_manager = ButlerVersionsManager(attributes)
        version_manager.checkManagersConfig(self.as_dict())

        # Read schema versions from registry and validate them.
        self.schema_versions.update(version_manager.managerVersions())
        for name, manager_class in self.as_dict().items():
            stored_version = self.schema_versions.get(name)
            if stored_version is None:
                continue
            manager_class.checkCompatibility(stored_version, database.isWriteable())

        # Dimensions configuration is stored in the database as a JSON string.
        dimensions_json = attributes.get(_DIMENSIONS_ATTR)
        if dimensions_json is None:
            raise LookupError(f"Registry attribute {_DIMENSIONS_ATTR} is missing from database")
        dimension_config = DimensionConfig(Config.fromString(dimensions_json, format="json"))
        universe = DimensionUniverse(dimension_config)

        if self.obscore is not None:
            # Get ObsCore configuration from attributes table, this silently
            # overrides whatever may come from config file.  Idea is that we
            # do not want to carry around the whole thing, and butler config
            # will have empty obscore configuration after initialization.
            # When configuration is missing from attributes table, the obscore
            # table does not exist, and we do not instantiate obscore manager.
            obscore_json = attributes.get(_OBSCORE_ATTR)
            if obscore_json is not None:
                self.manager_configs["obscore"] = Config.fromString(obscore_json, format="json")

        with database.declareStaticTables(create=False) as context:
            instances = RegistryManagerInstances.initialize(
                database, context, types=self, universe=universe, manager_configs=self.manager_configs
            )

        # Load content from database that we try to keep in-memory.
        instances.refresh()
        return instances

    def as_dict(self) -> Mapping[str, type[VersionedExtension]]:
        """Return contained managers as a dictionary with manager type name as
        a key.

        Returns
        -------
        extensions : `~collections.abc.Mapping` [`str`, `type`]
            Maps manager type name (e.g. "datasets") to its corresponding
            manager class.  Only existing managers are returned, i.e. fields
            whose value is `None` are left out.
        """
        # These fields hold auxiliary data rather than manager classes.
        skipped = {"manager_configs", "schema_versions"}
        result: dict[str, type[VersionedExtension]] = {}
        for field in dataclasses.fields(self):
            if field.name in skipped:
                continue
            manager_class = getattr(self, field.name)
            if manager_class is not None:
                result[field.name] = manager_class
        return result

330 

331 

@dataclasses.dataclass(frozen=True, eq=False)
class RegistryManagerInstances(
    _GenericRegistryManagers[
        ButlerAttributeManager,
        DimensionRecordStorageManager,
        CollectionManager,
        DatasetRecordStorageManager,
        OpaqueTableStorageManager,
        DatastoreRegistryBridgeManager,
        ObsCoreTableManager,
    ]
):
    """Struct holding the manager instances that back a `Registry`."""

    caching_context: CachingContext
    """Object containing caches for various information generated by
    managers.
    """

    @contextmanager
    def caching_context_manager(self) -> Iterator[None]:
        """Context manager that enables caching.

        Calls to this method may be nested and the returned context managers
        may even be closed out of order, with only the first context manager
        entered and the last context manager exited having any effect.
        """
        with self.caching_context.enable_collection_record_cache():
            with self.caching_context.enable_collection_summary_cache():
                yield

    @classmethod
    def initialize(
        cls,
        database: Database,
        context: StaticTablesContext,
        *,
        types: RegistryManagerTypes,
        universe: DimensionUniverse,
        manager_configs: dict[str, Mapping],
        caching_context: CachingContext | None = None,
    ) -> RegistryManagerInstances:
        """Construct manager instances from their types and an existing
        database connection.

        Parameters
        ----------
        database : `Database`
            Object that represents a connection to the SQL database that backs
            the data repository.
        context : `StaticTablesContext`
            Object used to create tables in ``database``.
        types : `RegistryManagerTypes`
            Struct containing type objects for the manager instances to
            construct.
        universe : `DimensionUniverse`
            Object that describes all dimensions in this data repository.
        manager_configs : `dict` [`str`, `~collections.abc.Mapping`]
            Additional configuration for individual managers.
        caching_context : `CachingContext` or `None`, optional
            Caching context to use.  A new one is created if `None`.

        Returns
        -------
        instances : `RegistryManagerInstances`
            Struct containing manager instances.
        """
        if caching_context is None:
            caching_context = CachingContext()
        versions = types.schema_versions

        # Managers are initialized in a fixed order; later ones receive the
        # earlier ones as arguments.
        attributes = types.attributes.initialize(
            database, context, registry_schema_version=versions.get("attributes")
        )
        dimensions = types.dimensions.initialize(
            database, context, universe=universe, registry_schema_version=versions.get("dimensions")
        )
        collections = types.collections.initialize(
            database,
            context,
            caching_context=caching_context,
            registry_schema_version=versions.get("collections"),
        )
        datasets = types.datasets.initialize(
            database,
            context,
            collections=collections,
            dimensions=dimensions,
            registry_schema_version=versions.get("datasets"),
            caching_context=caching_context,
            config=manager_configs.get("datasets", {}),
        )
        opaque = types.opaque.initialize(
            database, context, registry_schema_version=versions.get("opaque")
        )
        # Note that the datasets manager *type*, not the instance, is passed
        # to the datastores and obscore managers.
        datastores = types.datastores.initialize(
            database,
            context,
            opaque=opaque,
            datasets=types.datasets,
            universe=universe,
            registry_schema_version=versions.get("datastores"),
        )

        # NOTE(review): obscore configuration is read from
        # ``types.manager_configs`` rather than from the ``manager_configs``
        # argument -- confirm that this is intended.
        obscore: ObsCoreTableManager | None = None
        if types.obscore is not None and "obscore" in types.manager_configs:
            obscore = types.obscore.initialize(
                database,
                context,
                universe=universe,
                config=types.manager_configs["obscore"],
                datasets=types.datasets,
                dimensions=dimensions,
                registry_schema_version=versions.get("obscore"),
            )

        return cls(
            attributes=attributes,
            dimensions=dimensions,
            collections=collections,
            datasets=datasets,
            opaque=opaque,
            datastores=datastores,
            obscore=obscore,
            caching_context=caching_context,
        )

    def clone(
        self,
        db: Database,
    ) -> RegistryManagerInstances:
        """Make an independent copy of the manager instances with a new
        `Database` instance.

        Parameters
        ----------
        db : `Database`
            New `Database` object to use when instantiating managers.

        Returns
        -------
        instances : `RegistryManagerInstances`
            New manager instances with the same configuration as this instance,
            but bound to a new Database object.
        """
        # The copy gets its own caching context instead of sharing ours.
        new_caching_context = CachingContext()
        new_dimensions = self.dimensions.clone(db)
        new_collections = self.collections.clone(db, new_caching_context)
        new_opaque = self.opaque.clone(db)
        new_datasets = self.datasets.clone(
            db=db,
            collections=new_collections,
            dimensions=new_dimensions,
            caching_context=new_caching_context,
        )
        new_obscore = (
            self.obscore.clone(db=db, dimensions=new_dimensions) if self.obscore is not None else None
        )
        new_attributes = self.attributes.clone(db)
        new_datastores = self.datastores.clone(db=db, opaque=new_opaque)
        return RegistryManagerInstances(
            attributes=new_attributes,
            dimensions=new_dimensions,
            collections=new_collections,
            datasets=new_datasets,
            opaque=new_opaque,
            datastores=new_datastores,
            obscore=new_obscore,
            caching_context=new_caching_context,
        )

    def as_dict(self) -> Mapping[str, VersionedExtension]:
        """Return contained managers as a dictionary with manager type name as
        a key.

        Returns
        -------
        extensions : `~collections.abc.Mapping` [`str`, `VersionedExtension`]
            Maps manager type name (e.g. "datasets") to its corresponding
            manager instance.  Only existing managers are returned, i.e.
            fields whose value is `None` are left out.
        """
        # Field names that are never reported as managers.
        skipped = ("column_types", "caching_context")
        result: dict[str, VersionedExtension] = {}
        for field in dataclasses.fields(self):
            if field.name in skipped:
                continue
            manager = getattr(self, field.name)
            if manager is not None:
                result[field.name] = manager
        return result

    def refresh(self) -> None:
        """Refresh all in-memory state by querying the database or clearing
        caches.
        """
        # Only the collections and datasets managers are refreshed here.
        self.collections.refresh()
        self.datasets.refresh()

516 self.datasets.refresh()