Coverage for tests / test_obscore.py: 16%
314 statements
« prev ^ index » next coverage.py v7.13.5, created at 2026-04-24 08:17 +0000
1# This file is part of daf_butler.
2#
3# Developed for the LSST Data Management System.
4# This product includes software developed by the LSST Project
5# (http://www.lsst.org).
6# See the COPYRIGHT file at the top-level directory of this distribution
7# for details of code ownership.
8#
9# This software is dual licensed under the GNU General Public License and also
10# under a 3-clause BSD license. Recipients may choose which of these licenses
11# to use; please see the files gpl-3.0.txt and/or bsd_license.txt,
12# respectively. If you choose the GPL option then the following text applies
13# (but note that there is still no warranty even if you opt for BSD instead):
14#
15# This program is free software: you can redistribute it and/or modify
16# it under the terms of the GNU General Public License as published by
17# the Free Software Foundation, either version 3 of the License, or
18# (at your option) any later version.
19#
20# This program is distributed in the hope that it will be useful,
21# but WITHOUT ANY WARRANTY; without even the implied warranty of
22# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
23# GNU General Public License for more details.
24#
25# You should have received a copy of the GNU General Public License
26# along with this program. If not, see <http://www.gnu.org/licenses/>.
28import os
29import tempfile
30import unittest
31from abc import abstractmethod
32from typing import cast
34import astropy.time
35import sqlalchemy
37from lsst.daf.butler import (
38 Butler,
39 CollectionType,
40 Config,
41 DataCoordinate,
42 DatasetRef,
43 DatasetType,
44 StorageClassFactory,
45)
46from lsst.daf.butler.registry import RegistryConfig, _RegistryFactory
47from lsst.daf.butler.registry.obscore import (
48 DatasetTypeConfig,
49 ObsCoreConfig,
50 ObsCoreLiveTableManager,
51 ObsCoreSchema,
52 RegionTypeWarning,
53)
54from lsst.daf.butler.registry.obscore._schema import _STATIC_COLUMNS
55from lsst.daf.butler.registry.sql_registry import SqlRegistry
56from lsst.daf.butler.tests.postgresql import setup_postgres_test_db
57from lsst.daf.butler.tests.utils import TestCaseMixin, makeTestTempDir, removeTestTempDir
58from lsst.sphgeom import Box, ConvexPolygon, LonLat, UnitVector3d
# Absolute path of the directory holding this test module; used to locate
# the per-test config files (see make_obscore_config).
TESTDIR = os.path.abspath(os.path.dirname(__file__))
class ObsCoreTests(TestCaseMixin):
    """Base class for testing obscore manager functionality.

    Concrete subclasses provide the database backend by implementing
    `make_registry_config` and setting `root` in their ``setUp``.
    """

    # Temporary directory for the registry database; set by subclasses.
    root: str

    def make_registry(
        self, collections: list[str] | None = None, collection_type: str | None = None
    ) -> SqlRegistry:
        """Create new empty Registry."""
        config = self.make_registry_config(collections, collection_type)
        registry = _RegistryFactory(config).create_from_config(butlerRoot=self.root)
        self.addCleanup(registry.close)
        self.initialize_registry(registry)
        return registry

    @abstractmethod
    def make_registry_config(
        self, collections: list[str] | None = None, collection_type: str | None = None
    ) -> RegistryConfig:
        """Make Registry configuration."""
        raise NotImplementedError()

    def initialize_registry(self, registry: SqlRegistry) -> None:
        """Populate Registry with the things that we need for tests."""
        registry.insertDimensionData("instrument", {"name": "DummyCam"})
        registry.insertDimensionData(
            "physical_filter", {"instrument": "DummyCam", "name": "d-r", "band": "r"}
        )
        registry.insertDimensionData("day_obs", {"instrument": "DummyCam", "id": 20200101})
        for detector in (1, 2, 3, 4):
            registry.insertDimensionData(
                "detector", {"instrument": "DummyCam", "id": detector, "full_name": f"detector{detector}"}
            )

        for exposure in (1, 2, 3, 4):
            registry.insertDimensionData("group", {"instrument": "DummyCam", "name": f"group{exposure}"})
            registry.insertDimensionData(
                "exposure",
                {
                    "instrument": "DummyCam",
                    "id": exposure,
                    "obs_id": f"exposure{exposure}",
                    "physical_filter": "d-r",
                    "group": f"group{exposure}",
                    "day_obs": 20200101,
                },
            )

        registry.insertDimensionData("visit_system", {"instrument": "DummyCam", "id": 1, "name": "default"})

        # Visit 9 is created here but gets a non-polygon region below, used
        # by test_region_type_warning.
        for visit in (1, 2, 3, 4, 9):
            visit_start = astropy.time.Time(f"2020-01-01 08:0{visit}:00", scale="tai")
            visit_end = astropy.time.Time(f"2020-01-01 08:0{visit}:45", scale="tai")
            registry.insertDimensionData(
                "visit",
                {
                    "instrument": "DummyCam",
                    "id": visit,
                    "name": f"visit{visit}",
                    "physical_filter": "d-r",
                    "datetime_begin": visit_start,
                    "datetime_end": visit_end,
                    "day_obs": 20200101,
                },
            )
            registry.insertDimensionData(
                "visit_system_membership",
                {"instrument": "DummyCam", "visit": visit, "visit_system": 1},
            )

        # Only a couple of exposures are linked to visits.
        for visit in (1, 2):
            registry.insertDimensionData(
                "visit_definition",
                {
                    "instrument": "DummyCam",
                    "exposure": visit,
                    "visit": visit,
                },
            )

        # Map (visit, detector) to a small square polygon region; visits are
        # spread in longitude and detectors in latitude.
        self.regions: dict[tuple[int, int], ConvexPolygon] = {}
        for visit in (1, 2, 3, 4):
            for detector in (1, 2, 3, 4):
                lon = visit * 90 - 88
                lat = detector * 2 - 5
                region = ConvexPolygon(
                    [
                        UnitVector3d(LonLat.fromDegrees(lon - 1.0, lat - 1.0)),
                        UnitVector3d(LonLat.fromDegrees(lon + 1.0, lat - 1.0)),
                        UnitVector3d(LonLat.fromDegrees(lon + 1.0, lat + 1.0)),
                        UnitVector3d(LonLat.fromDegrees(lon - 1.0, lat + 1.0)),
                    ]
                )
                registry.insertDimensionData(
                    "visit_detector_region",
                    {
                        "instrument": "DummyCam",
                        "visit": visit,
                        "detector": detector,
                        "region": region,
                    },
                )
                self.regions[(visit, detector)] = region

        # Visit 9 has non-polygon region (a Box), which the obscore manager
        # does not handle and should warn about.
        for detector in (1, 2, 3, 4):
            lat = detector * 2 - 5
            region = Box.fromDegrees(17.0, lat - 1.0, 19.0, lat + 1.0)
            registry.insertDimensionData(
                "visit_detector_region",
                {
                    "instrument": "DummyCam",
                    "visit": 9,
                    "detector": detector,
                    "region": region,
                },
            )

        # Add a few dataset types.
        storage_class_factory = StorageClassFactory()
        storage_class = storage_class_factory.getStorageClass("StructuredDataDict")

        self.dataset_types: dict[str, DatasetType] = {}

        dimensions = registry.dimensions.conform(["instrument", "physical_filter", "detector", "exposure"])
        self.dataset_types["raw"] = DatasetType("raw", dimensions, storage_class)

        dimensions = registry.dimensions.conform(["instrument", "physical_filter", "detector", "visit"])
        self.dataset_types["calexp"] = DatasetType("calexp", dimensions, storage_class)

        # "no_obscore" is intentionally absent from the obscore config so
        # that its datasets never appear in the obscore table.
        dimensions = registry.dimensions.conform(["instrument", "physical_filter", "detector", "visit"])
        self.dataset_types["no_obscore"] = DatasetType("no_obscore", dimensions, storage_class)

        dimensions = registry.dimensions.conform(["instrument", "physical_filter", "detector"])
        self.dataset_types["calib"] = DatasetType("calib", dimensions, storage_class, isCalibration=True)

        for dataset_type in self.dataset_types.values():
            registry.registerDatasetType(dataset_type)

        # Add a few run collections.
        for run in (1, 2, 3, 4, 5, 6):
            registry.registerRun(f"run{run}")

        # Add a few chained collections; run6 is not in any chained
        # collections.
        registry.registerCollection("chain12", CollectionType.CHAINED)
        registry.setCollectionChain("chain12", ("run1", "run2"))
        registry.registerCollection("chain34", CollectionType.CHAINED)
        registry.setCollectionChain("chain34", ("run3", "run4"))
        registry.registerCollection("chain-all", CollectionType.CHAINED)
        registry.setCollectionChain("chain-all", ("chain12", "chain34", "run5"))

        # And a tagged collection.
        registry.registerCollection("tagged", CollectionType.TAGGED)

    def make_obscore_config(
        self, collections: list[str] | None = None, collection_type: str | None = None
    ) -> Config:
        """Make configuration for obscore manager."""
        obscore_config = Config(os.path.join(TESTDIR, "config", "basic", "obscore.yaml"))
        if collections is not None:
            obscore_config["collections"] = collections
        if collection_type is not None:
            obscore_config["collection_type"] = collection_type
        return obscore_config

    def _insert_dataset(
        self, registry: SqlRegistry, run: str, dataset_type: str, do_import: bool = False, **kwargs
    ) -> DatasetRef:
        """Insert or import one dataset into a specified run collection.

        Extra keyword arguments (e.g. ``detector``, ``visit``, ``exposure``)
        extend the data ID.
        """
        data_id = {"instrument": "DummyCam", "physical_filter": "d-r"}
        data_id.update(kwargs)
        coordinate = DataCoordinate.standardize(data_id, universe=registry.dimensions)
        if do_import:
            ds_type = self.dataset_types[dataset_type]
            ref = DatasetRef(ds_type, coordinate, run=run)
            [ref] = registry._importDatasets([ref])
        else:
            [ref] = registry.insertDatasets(dataset_type, [data_id], run=run)
        return ref

    def _insert_datasets(self, registry: SqlRegistry, do_import: bool = False) -> list[DatasetRef]:
        """Insert a small bunch of datasets into every run collection."""
        return [
            self._insert_dataset(registry, "run1", "raw", detector=1, exposure=1, do_import=do_import),
            self._insert_dataset(registry, "run2", "calexp", detector=2, visit=2, do_import=do_import),
            self._insert_dataset(registry, "run3", "raw", detector=3, exposure=3, do_import=do_import),
            self._insert_dataset(registry, "run4", "calexp", detector=4, visit=4, do_import=do_import),
            self._insert_dataset(registry, "run5", "calexp", detector=4, visit=4, do_import=do_import),
            # This dataset type is not configured, will not be in obscore.
            self._insert_dataset(registry, "run5", "no_obscore", detector=1, visit=1, do_import=do_import),
            self._insert_dataset(registry, "run6", "raw", detector=1, exposure=4, do_import=do_import),
        ]

    def test_config_errors(self) -> None:
        """Test for handling various configuration problems."""
        # This raises pydantic ValidationError, which wraps ValueError.
        exception_re = "'collections' must have one element"
        with self.assertRaisesRegex(ValueError, exception_re):
            self.make_registry(None, "TAGGED")

        with self.assertRaisesRegex(ValueError, exception_re):
            self.make_registry([], "TAGGED")

        with self.assertRaisesRegex(ValueError, exception_re):
            self.make_registry(["run1", "run2"], "TAGGED")

        # Invalid regex.
        with self.assertRaisesRegex(ValueError, "Failed to compile regex"):
            self.make_registry(["+run"], "RUN")

    def test_schema(self) -> None:
        """Check how obscore schema is constructed"""
        config = ObsCoreConfig(obs_collection="", dataset_types={}, facility_name="FACILITY")
        schema = ObsCoreSchema(config, [])
        table_spec = schema.table_spec
        self.assertEqual(list(table_spec.fields.names), [col.name for col in _STATIC_COLUMNS])

        # extra columns from top-level config
        config = ObsCoreConfig(
            obs_collection="",
            extra_columns={"c1": 1, "c2": "string", "c3": {"template": "{calib_level}", "type": "float"}},
            dataset_types={},
            facility_name="FACILITY",
        )
        schema = ObsCoreSchema(config, [])
        table_spec = schema.table_spec
        self.assertEqual(
            list(table_spec.fields.names),
            [col.name for col in _STATIC_COLUMNS] + ["c1", "c2", "c3"],
        )
        self.assertEqual(table_spec.fields["c1"].dtype, sqlalchemy.BigInteger)
        self.assertEqual(table_spec.fields["c2"].dtype, sqlalchemy.String)
        self.assertEqual(table_spec.fields["c3"].dtype, sqlalchemy.Float)

        # extra columns from per-dataset type configs
        config = ObsCoreConfig(
            obs_collection="",
            extra_columns={"c1": 1},
            dataset_types={
                "raw": DatasetTypeConfig(
                    name="raw",
                    dataproduct_type="image",
                    calib_level=1,
                    extra_columns={"c2": "string"},
                ),
                "calexp": DatasetTypeConfig(
                    dataproduct_type="image",
                    calib_level=2,
                    extra_columns={"c3": 1e10},
                ),
            },
            facility_name="FACILITY",
        )
        schema = ObsCoreSchema(config, [])
        table_spec = schema.table_spec
        self.assertEqual(
            list(table_spec.fields.names),
            [col.name for col in _STATIC_COLUMNS] + ["c1", "c2", "c3"],
        )
        self.assertEqual(table_spec.fields["c1"].dtype, sqlalchemy.BigInteger)
        self.assertEqual(table_spec.fields["c2"].dtype, sqlalchemy.String)
        self.assertEqual(table_spec.fields["c3"].dtype, sqlalchemy.Float)

        # Columns with the same names as in static list in configs, types
        # are not overridden.
        config = ObsCoreConfig(
            version=0,
            obs_collection="",
            extra_columns={"t_xel": 1e10},
            dataset_types={
                "raw": DatasetTypeConfig(
                    dataproduct_type="image",
                    calib_level=1,
                    extra_columns={"target_name": 1},
                ),
                "calexp": DatasetTypeConfig(
                    dataproduct_type="image",
                    calib_level=2,
                    extra_columns={"em_xel": "string"},
                ),
            },
            facility_name="FACILITY",
        )
        schema = ObsCoreSchema(config, [])
        table_spec = schema.table_spec
        self.assertEqual(list(table_spec.fields.names), [col.name for col in _STATIC_COLUMNS])
        self.assertEqual(table_spec.fields["t_xel"].dtype, sqlalchemy.Integer)
        self.assertEqual(table_spec.fields["target_name"].dtype, sqlalchemy.String)
        self.assertEqual(table_spec.fields["em_xel"].dtype, sqlalchemy.Integer)

    def test_insert_existing_collection(self) -> None:
        """Test insert and import registry methods, with various restrictions
        on collection names.
        """
        # First item is collections, second item is expected record count.
        test_data = (
            (None, 6),
            (["run1", "run2"], 2),
            (["run[34]"], 2),
            (["[rR]un[^6]"], 5),
        )

        for collections, count in test_data:
            for do_import in (False, True):
                registry = self.make_registry(collections)
                obscore = registry.obsCoreTableManager
                assert obscore is not None
                self._insert_datasets(registry, do_import)

                with obscore.query() as result:
                    rows = list(result)
                    self.assertEqual(len(rows), count)

                # Also check `query` method with COUNT(*)
                with obscore.query([sqlalchemy.sql.func.count()]) as result:
                    scalar = result.scalar_one()
                    self.assertEqual(scalar, count)

    def test_drop_datasets(self) -> None:
        """Test for dropping datasets after obscore insert."""
        collections = None
        registry = self.make_registry(collections)
        obscore = registry.obsCoreTableManager
        assert obscore is not None
        refs = self._insert_datasets(registry)

        with obscore.query() as result:
            rows = list(result)
            self.assertEqual(len(rows), 6)

        # drop single dataset
        registry.removeDatasets(ref for ref in refs if ref.run == "run1")
        with obscore.query() as result:
            rows = list(result)
            self.assertEqual(len(rows), 5)

        # drop whole run collection
        registry.removeCollection("run6")
        with obscore.query() as result:
            rows = list(result)
            self.assertEqual(len(rows), 4)

    def test_associate(self) -> None:
        """Test for associating datasets to TAGGED collection."""
        collections = ["tagged"]
        registry = self.make_registry(collections, "TAGGED")
        obscore = registry.obsCoreTableManager
        assert obscore is not None
        refs = self._insert_datasets(registry)

        # Nothing is tagged yet, so obscore table is empty.
        with obscore.query() as result:
            rows = list(result)
            self.assertEqual(len(rows), 0)

        # Associate datasets that are already in obscore, changes nothing.
        registry.associate("tagged", (ref for ref in refs if ref.run == "run1"))
        with obscore.query() as result:
            rows = list(result)
            self.assertEqual(len(rows), 1)

        # Associate datasets that are not in obscore
        registry.associate("tagged", (ref for ref in refs if ref.run == "run3"))
        with obscore.query() as result:
            rows = list(result)
            self.assertEqual(len(rows), 2)

        # Disassociate them
        registry.disassociate("tagged", (ref for ref in refs if ref.run == "run3"))
        with obscore.query() as result:
            rows = list(result)
            self.assertEqual(len(rows), 1)

        # Non-associated dataset, should be OK and not throw.
        registry.disassociate("tagged", (ref for ref in refs if ref.run == "run2"))
        with obscore.query() as result:
            rows = list(result)
            self.assertEqual(len(rows), 1)

        registry.disassociate("tagged", (ref for ref in refs if ref.run == "run1"))
        with obscore.query() as result:
            rows = list(result)
            self.assertEqual(len(rows), 0)

    def test_region_type_warning(self) -> None:
        """Test that non-polygon region generates one or more warnings."""
        collections = None
        registry = self.make_registry(collections)

        # Visit 9 was given a Box region in initialize_registry.
        with self.assertWarns(RegionTypeWarning) as cm:
            self._insert_dataset(registry, "run2", "calexp", detector=2, visit=9)
        self.assertRegex(
            str(cm.warning),
            "Unexpected region type: .*lsst.sphgeom._sphgeom.Box.*",
        )

    def test_update_exposure_region(self) -> None:
        """Test for update_exposure_regions method."""
        registry = self.make_registry(["run1"])
        obscore = registry.obsCoreTableManager
        assert obscore is not None

        # Exposure 4 is not associated with any visit.
        for detector in (1, 2, 3, 4):
            self._insert_dataset(registry, "run1", "raw", detector=detector, exposure=4)

        # All spatial columns should be None.
        with obscore.query() as result:
            rows = list(result)
            self.assertEqual(len(rows), 4)
            for row in rows:
                self.assertIsNone(row.s_ra)
                self.assertIsNone(row.s_dec)
                self.assertIsNone(row.s_region)

        # Assign Region from visit 4.
        count = obscore.update_exposure_regions(
            "DummyCam", [(4, 1, self.regions[(4, 1)]), (4, 2, self.regions[(4, 2)])]
        )
        self.assertEqual(count, 2)

        with obscore.query(["s_ra", "s_dec", "s_region", "lsst_detector", "facility_name"]) as result:
            rows = list(result)
            self.assertEqual(len(rows), 4)
            for row in rows:
                if row.lsst_detector in (1, 2):
                    self.assertIsNotNone(row.s_ra)
                    self.assertIsNotNone(row.s_dec)
                    self.assertIsNotNone(row.s_region)
                else:
                    self.assertIsNone(row.s_ra)
                    self.assertIsNone(row.s_dec)
                    self.assertIsNone(row.s_region)
                self.assertEqual(row.facility_name, "derived_facility")
class SQLiteObsCoreTest(ObsCoreTests, unittest.TestCase):
    """Unit test for obscore with SQLite backend."""

    def setUp(self) -> None:
        self.root = makeTestTempDir(TESTDIR)

    def tearDown(self) -> None:
        removeTestTempDir(self.root)

    def make_registry_config(
        self, collections: list[str] | None = None, collection_type: str | None = None
    ) -> RegistryConfig:
        # docstring inherited from a base class
        _, filename = tempfile.mkstemp(dir=self.root, suffix=".sqlite3")
        config = RegistryConfig()
        # Point the registry at the temporary SQLite file created above.
        # (Bug fix: the URL previously dropped the filename entirely,
        # leaving `filename` unused and the f-string without a placeholder.)
        config["db"] = f"sqlite:///{filename}"
        config["managers", "obscore"] = {
            "cls": "lsst.daf.butler.registry.obscore.ObsCoreLiveTableManager",
            "config": self.make_obscore_config(collections, collection_type),
        }
        return config
class ClonedSqliteObscoreTest(SQLiteObsCoreTest, unittest.TestCase):
    """Unit test for obscore manager created via clone()"""

    def make_registry(
        self, collections: list[str] | None = None, collection_type: str | None = None
    ) -> SqlRegistry:
        """Create new empty Registry."""
        # Build the registry as usual, then exercise the clone path by
        # returning a copy instead of the original instance.
        base_registry = super().make_registry(collections, collection_type)
        cloned = base_registry.copy()
        self.addCleanup(cloned.close)
        return cloned
class PostgresObsCoreTest(ObsCoreTests, unittest.TestCase):
    """Unit test for obscore with PostgreSQL backend."""

    @classmethod
    def setUpClass(cls):
        # Spin up a throwaway PostgreSQL server shared by the whole class;
        # enterClassContext arranges automatic teardown.
        cls.postgresql = cls.enterClassContext(setup_postgres_test_db())
        super().setUpClass()

    def setUp(self):
        self.root = makeTestTempDir(TESTDIR)

    def tearDown(self):
        removeTestTempDir(self.root)

    def make_registry_config(
        self, collections: list[str] | None = None, collection_type: str | None = None
    ) -> RegistryConfig:
        # docstring inherited from a base class
        registry_config = RegistryConfig()
        # Let the test server fill in its connection details.
        self.postgresql.patch_registry_config(registry_config)
        registry_config["managers", "obscore"] = {
            "cls": "lsst.daf.butler.registry.obscore.ObsCoreLiveTableManager",
            "config": self.make_obscore_config(collections, collection_type),
        }
        return registry_config
class PostgresPgSphereObsCoreTest(PostgresObsCoreTest):
    """Unit test for obscore with PostgreSQL backend and pgsphere plugin."""

    @classmethod
    def setUpClass(cls):
        super().setUpClass()
        # Skip the whole class when the server has no pg_sphere extension.
        with cls.postgresql.begin() as connection:
            try:
                connection.execute(sqlalchemy.text("CREATE EXTENSION pg_sphere"))
            except sqlalchemy.exc.DatabaseError as exc:
                raise unittest.SkipTest(f"pg_sphere extension does not exist: {exc}") from None

    def make_obscore_config(
        self, collections: list[str] | None = None, collection_type: str | None = None
    ) -> Config:
        """Make configuration for obscore manager."""
        # Extend the base config with the pgsphere spatial plugin so the
        # manager also fills the pgs_region/pgs_center columns.
        obscore_config = super().make_obscore_config(collections, collection_type)
        obscore_config["spatial_plugins"] = {
            "pgsphere": {
                "cls": "lsst.daf.butler.registry.obscore.pgsphere.PgSphereObsCorePlugin",
                "config": {
                    "region_column": "pgs_region",
                    "position_column": "pgs_center",
                },
            }
        }
        return obscore_config

    def test_spatial(self):
        """Test that pgsphere plugin fills spatial columns."""
        collections = None
        registry = self.make_registry(collections)
        obscore = registry.obsCoreTableManager
        assert obscore is not None
        self._insert_datasets(registry)

        # select everything
        with obscore.query() as result:
            rows = list(result)
            self.assertEqual(len(rows), 6)

        db = registry._db
        assert registry.obsCoreTableManager is not None
        table = cast(ObsCoreLiveTableManager, registry.obsCoreTableManager).table

        # It's not easy to generate spatial queries in sqlalchemy, use plain
        # text queries for testing.

        # position matching visit=1, there is a single dataset
        query = f"SELECT * FROM {table.key} WHERE pgs_center <-> '(2d,0d)'::spoint < .1"
        with db.query(sqlalchemy.text(query)) as results:
            self.assertEqual(len(list(results)), 1)

        # position matching visit=4, there are two datasets
        query = f"SELECT * FROM {table.key} WHERE pgs_center <-> '(272d,0d)'::spoint < .1"
        with db.query(sqlalchemy.text(query)) as results:
            self.assertEqual(len(list(results)), 2)

        # position matching visit=1, there is a single dataset
        query = f"SELECT * FROM {table.key} WHERE '(2d,-3d)'::spoint @ pgs_region"
        with db.query(sqlalchemy.text(query)) as results:
            self.assertEqual(len(list(results)), 1)

        # position matching visit=4, there are two datasets
        query = f"SELECT * FROM {table.key} WHERE '(272d,3d)'::spoint @ pgs_region"
        with db.query(sqlalchemy.text(query)) as results:
            self.assertEqual(len(list(results)), 2)
class TestMissingObscoreConfig(unittest.TestCase):
    """Test case for making butler instance with obscore manager but missing
    configuration.
    """

    # Temporary repository root, created per test.
    root: str

    def setUp(self):
        self.root = makeTestTempDir(TESTDIR)

    def tearDown(self):
        removeTestTempDir(self.root)

    def make_butler_config(self, obscore_config: str | dict[str, str | dict]) -> Config:
        """Build a butler Config using the given obscore manager setting."""
        reg_cfg = RegistryConfig()
        reg_cfg["db"] = f"sqlite:///{self.root}/butler.sqlite3"
        reg_cfg["managers", "obscore"] = obscore_config
        return Config({"registry": reg_cfg})

    def test_missing_config_one(self) -> None:
        """Test setup when obscore key defines manager class."""
        butler_config = self.make_butler_config(
            "lsst.daf.butler.registry.obscore.ObsCoreLiveTableManager"
        )

        with self.assertWarnsRegex(UserWarning, "configuration is missing"):
            Butler.makeRepo(self.root, butler_config)

        # Now instantiate Butler from the same repo.
        Butler.from_config(self.root)

    def test_missing_config_two(self) -> None:
        """Test setup when obscore key defines manager class and empty
        config.
        """
        butler_config = self.make_butler_config(
            {"cls": "lsst.daf.butler.registry.obscore.ObsCoreLiveTableManager", "config": {}}
        )

        with self.assertWarnsRegex(UserWarning, "configuration is missing"):
            Butler.makeRepo(self.root, butler_config)

        # Now instantiate Butler from the same repo.
        Butler.from_config(self.root)
# Allow running this test module directly.
if __name__ == "__main__":
    unittest.main()