Coverage for tests / test_obscore.py: 16%

314 statements  

« prev     ^ index     » next       coverage.py v7.13.5, created at 2026-04-22 08:55 +0000

1# This file is part of daf_butler. 

2# 

3# Developed for the LSST Data Management System. 

4# This product includes software developed by the LSST Project 

5# (http://www.lsst.org). 

6# See the COPYRIGHT file at the top-level directory of this distribution 

7# for details of code ownership. 

8# 

9# This software is dual licensed under the GNU General Public License and also 

10# under a 3-clause BSD license. Recipients may choose which of these licenses 

11# to use; please see the files gpl-3.0.txt and/or bsd_license.txt, 

12# respectively. If you choose the GPL option then the following text applies 

13# (but note that there is still no warranty even if you opt for BSD instead): 

14# 

15# This program is free software: you can redistribute it and/or modify 

16# it under the terms of the GNU General Public License as published by 

17# the Free Software Foundation, either version 3 of the License, or 

18# (at your option) any later version. 

19# 

20# This program is distributed in the hope that it will be useful, 

21# but WITHOUT ANY WARRANTY; without even the implied warranty of 

22# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the 

23# GNU General Public License for more details. 

24# 

25# You should have received a copy of the GNU General Public License 

26# along with this program. If not, see <http://www.gnu.org/licenses/>. 

27 

28import os 

29import tempfile 

30import unittest 

31from abc import abstractmethod 

32from typing import cast 

33 

34import astropy.time 

35import sqlalchemy 

36 

37from lsst.daf.butler import ( 

38 Butler, 

39 CollectionType, 

40 Config, 

41 DataCoordinate, 

42 DatasetRef, 

43 DatasetType, 

44 StorageClassFactory, 

45) 

46from lsst.daf.butler.registry import RegistryConfig, _RegistryFactory 

47from lsst.daf.butler.registry.obscore import ( 

48 DatasetTypeConfig, 

49 ObsCoreConfig, 

50 ObsCoreLiveTableManager, 

51 ObsCoreSchema, 

52 RegionTypeWarning, 

53) 

54from lsst.daf.butler.registry.obscore._schema import _STATIC_COLUMNS 

55from lsst.daf.butler.registry.sql_registry import SqlRegistry 

56from lsst.daf.butler.tests.postgresql import setup_postgres_test_db 

57from lsst.daf.butler.tests.utils import TestCaseMixin, makeTestTempDir, removeTestTempDir 

58from lsst.sphgeom import Box, ConvexPolygon, LonLat, UnitVector3d 

59 

60TESTDIR = os.path.abspath(os.path.dirname(__file__)) 

61 

62 

63class ObsCoreTests(TestCaseMixin): 

64 """Base class for testing obscore manager functionality.""" 

65 

66 root: str 

67 

68 def make_registry( 

69 self, collections: list[str] | None = None, collection_type: str | None = None 

70 ) -> SqlRegistry: 

71 """Create new empty Registry.""" 

72 config = self.make_registry_config(collections, collection_type) 

73 registry = _RegistryFactory(config).create_from_config(butlerRoot=self.root) 

74 self.addCleanup(registry.close) 

75 self.initialize_registry(registry) 

76 return registry 

77 

78 @abstractmethod 

79 def make_registry_config( 

80 self, collections: list[str] | None = None, collection_type: str | None = None 

81 ) -> RegistryConfig: 

82 """Make Registry configuration.""" 

83 raise NotImplementedError() 

84 

85 def initialize_registry(self, registry: SqlRegistry) -> None: 

86 """Populate Registry with the things that we need for tests.""" 

87 registry.insertDimensionData("instrument", {"name": "DummyCam"}) 

88 registry.insertDimensionData( 

89 "physical_filter", {"instrument": "DummyCam", "name": "d-r", "band": "r"} 

90 ) 

91 registry.insertDimensionData("day_obs", {"instrument": "DummyCam", "id": 20200101}) 

92 for detector in (1, 2, 3, 4): 

93 registry.insertDimensionData( 

94 "detector", {"instrument": "DummyCam", "id": detector, "full_name": f"detector{detector}"} 

95 ) 

96 

97 for exposure in (1, 2, 3, 4): 

98 registry.insertDimensionData("group", {"instrument": "DummyCam", "name": f"group{exposure}"}) 

99 registry.insertDimensionData( 

100 "exposure", 

101 { 

102 "instrument": "DummyCam", 

103 "id": exposure, 

104 "obs_id": f"exposure{exposure}", 

105 "physical_filter": "d-r", 

106 "group": f"group{exposure}", 

107 "day_obs": 20200101, 

108 }, 

109 ) 

110 

111 registry.insertDimensionData("visit_system", {"instrument": "DummyCam", "id": 1, "name": "default"}) 

112 

113 for visit in (1, 2, 3, 4, 9): 

114 visit_start = astropy.time.Time(f"2020-01-01 08:0{visit}:00", scale="tai") 

115 visit_end = astropy.time.Time(f"2020-01-01 08:0{visit}:45", scale="tai") 

116 registry.insertDimensionData( 

117 "visit", 

118 { 

119 "instrument": "DummyCam", 

120 "id": visit, 

121 "name": f"visit{visit}", 

122 "physical_filter": "d-r", 

123 "datetime_begin": visit_start, 

124 "datetime_end": visit_end, 

125 "day_obs": 20200101, 

126 }, 

127 ) 

128 registry.insertDimensionData( 

129 "visit_system_membership", 

130 {"instrument": "DummyCam", "visit": visit, "visit_system": 1}, 

131 ) 

132 

133 # Only couple of exposures are linked to visits. 

134 for visit in (1, 2): 

135 registry.insertDimensionData( 

136 "visit_definition", 

137 { 

138 "instrument": "DummyCam", 

139 "exposure": visit, 

140 "visit": visit, 

141 }, 

142 ) 

143 

144 # map visit and detector to region 

145 self.regions: dict[tuple[int, int], ConvexPolygon] = {} 

146 for visit in (1, 2, 3, 4): 

147 for detector in (1, 2, 3, 4): 

148 lon = visit * 90 - 88 

149 lat = detector * 2 - 5 

150 region = ConvexPolygon( 

151 [ 

152 UnitVector3d(LonLat.fromDegrees(lon - 1.0, lat - 1.0)), 

153 UnitVector3d(LonLat.fromDegrees(lon + 1.0, lat - 1.0)), 

154 UnitVector3d(LonLat.fromDegrees(lon + 1.0, lat + 1.0)), 

155 UnitVector3d(LonLat.fromDegrees(lon - 1.0, lat + 1.0)), 

156 ] 

157 ) 

158 registry.insertDimensionData( 

159 "visit_detector_region", 

160 { 

161 "instrument": "DummyCam", 

162 "visit": visit, 

163 "detector": detector, 

164 "region": region, 

165 }, 

166 ) 

167 self.regions[(visit, detector)] = region 

168 

169 # Visit 9 has non-polygon region 

170 for detector in (1, 2, 3, 4): 

171 lat = detector * 2 - 5 

172 region = Box.fromDegrees(17.0, lat - 1.0, 19.0, lat + 1.0) 

173 registry.insertDimensionData( 

174 "visit_detector_region", 

175 { 

176 "instrument": "DummyCam", 

177 "visit": 9, 

178 "detector": detector, 

179 "region": region, 

180 }, 

181 ) 

182 

183 # Add few dataset types 

184 storage_class_factory = StorageClassFactory() 

185 storage_class = storage_class_factory.getStorageClass("StructuredDataDict") 

186 

187 self.dataset_types: dict[str, DatasetType] = {} 

188 

189 dimensions = registry.dimensions.conform(["instrument", "physical_filter", "detector", "exposure"]) 

190 self.dataset_types["raw"] = DatasetType("raw", dimensions, storage_class) 

191 

192 dimensions = registry.dimensions.conform(["instrument", "physical_filter", "detector", "visit"]) 

193 self.dataset_types["calexp"] = DatasetType("calexp", dimensions, storage_class) 

194 

195 dimensions = registry.dimensions.conform(["instrument", "physical_filter", "detector", "visit"]) 

196 self.dataset_types["no_obscore"] = DatasetType("no_obscore", dimensions, storage_class) 

197 

198 dimensions = registry.dimensions.conform(["instrument", "physical_filter", "detector"]) 

199 self.dataset_types["calib"] = DatasetType("calib", dimensions, storage_class, isCalibration=True) 

200 

201 for dataset_type in self.dataset_types.values(): 

202 registry.registerDatasetType(dataset_type) 

203 

204 # Add few run collections. 

205 for run in (1, 2, 3, 4, 5, 6): 

206 registry.registerRun(f"run{run}") 

207 

208 # Add few chained collections, run6 is not in any chained collections. 

209 registry.registerCollection("chain12", CollectionType.CHAINED) 

210 registry.setCollectionChain("chain12", ("run1", "run2")) 

211 registry.registerCollection("chain34", CollectionType.CHAINED) 

212 registry.setCollectionChain("chain34", ("run3", "run4")) 

213 registry.registerCollection("chain-all", CollectionType.CHAINED) 

214 registry.setCollectionChain("chain-all", ("chain12", "chain34", "run5")) 

215 

216 # And a tagged collection 

217 registry.registerCollection("tagged", CollectionType.TAGGED) 

218 

219 def make_obscore_config( 

220 self, collections: list[str] | None = None, collection_type: str | None = None 

221 ) -> Config: 

222 """Make configuration for obscore manager.""" 

223 obscore_config = Config(os.path.join(TESTDIR, "config", "basic", "obscore.yaml")) 

224 if collections is not None: 

225 obscore_config["collections"] = collections 

226 if collection_type is not None: 

227 obscore_config["collection_type"] = collection_type 

228 return obscore_config 

229 

230 def _insert_dataset( 

231 self, registry: SqlRegistry, run: str, dataset_type: str, do_import: bool = False, **kwargs 

232 ) -> DatasetRef: 

233 """Insert or import one dataset into a specified run collection.""" 

234 data_id = {"instrument": "DummyCam", "physical_filter": "d-r"} 

235 data_id.update(kwargs) 

236 coordinate = DataCoordinate.standardize(data_id, universe=registry.dimensions) 

237 if do_import: 

238 ds_type = self.dataset_types[dataset_type] 

239 ref = DatasetRef(ds_type, coordinate, run=run) 

240 [ref] = registry._importDatasets([ref]) 

241 else: 

242 [ref] = registry.insertDatasets(dataset_type, [data_id], run=run) 

243 return ref 

244 

245 def _insert_datasets(self, registry: SqlRegistry, do_import: bool = False) -> list[DatasetRef]: 

246 """Inset a small bunch of datasets into every run collection.""" 

247 return [ 

248 self._insert_dataset(registry, "run1", "raw", detector=1, exposure=1, do_import=do_import), 

249 self._insert_dataset(registry, "run2", "calexp", detector=2, visit=2, do_import=do_import), 

250 self._insert_dataset(registry, "run3", "raw", detector=3, exposure=3, do_import=do_import), 

251 self._insert_dataset(registry, "run4", "calexp", detector=4, visit=4, do_import=do_import), 

252 self._insert_dataset(registry, "run5", "calexp", detector=4, visit=4, do_import=do_import), 

253 # This dataset type is not configured, will not be in obscore. 

254 self._insert_dataset(registry, "run5", "no_obscore", detector=1, visit=1, do_import=do_import), 

255 self._insert_dataset(registry, "run6", "raw", detector=1, exposure=4, do_import=do_import), 

256 ] 

257 

258 def test_config_errors(self): 

259 """Test for handling various configuration problems.""" 

260 # This raises pydantic ValidationError, which wraps ValueError 

261 exception_re = "'collections' must have one element" 

262 with self.assertRaisesRegex(ValueError, exception_re): 

263 self.make_registry(None, "TAGGED") 

264 

265 with self.assertRaisesRegex(ValueError, exception_re): 

266 self.make_registry([], "TAGGED") 

267 

268 with self.assertRaisesRegex(ValueError, exception_re): 

269 self.make_registry(["run1", "run2"], "TAGGED") 

270 

271 # Invalid regex. 

272 with self.assertRaisesRegex(ValueError, "Failed to compile regex"): 

273 self.make_registry(["+run"], "RUN") 

274 

275 def test_schema(self): 

276 """Check how obscore schema is constructed""" 

277 config = ObsCoreConfig(obs_collection="", dataset_types={}, facility_name="FACILITY") 

278 schema = ObsCoreSchema(config, []) 

279 table_spec = schema.table_spec 

280 self.assertEqual(list(table_spec.fields.names), [col.name for col in _STATIC_COLUMNS]) 

281 

282 # extra columns from top-level config 

283 config = ObsCoreConfig( 

284 obs_collection="", 

285 extra_columns={"c1": 1, "c2": "string", "c3": {"template": "{calib_level}", "type": "float"}}, 

286 dataset_types={}, 

287 facility_name="FACILITY", 

288 ) 

289 schema = ObsCoreSchema(config, []) 

290 table_spec = schema.table_spec 

291 self.assertEqual( 

292 list(table_spec.fields.names), 

293 [col.name for col in _STATIC_COLUMNS] + ["c1", "c2", "c3"], 

294 ) 

295 self.assertEqual(table_spec.fields["c1"].dtype, sqlalchemy.BigInteger) 

296 self.assertEqual(table_spec.fields["c2"].dtype, sqlalchemy.String) 

297 self.assertEqual(table_spec.fields["c3"].dtype, sqlalchemy.Float) 

298 

299 # extra columns from per-dataset type configs 

300 config = ObsCoreConfig( 

301 obs_collection="", 

302 extra_columns={"c1": 1}, 

303 dataset_types={ 

304 "raw": DatasetTypeConfig( 

305 name="raw", 

306 dataproduct_type="image", 

307 calib_level=1, 

308 extra_columns={"c2": "string"}, 

309 ), 

310 "calexp": DatasetTypeConfig( 

311 dataproduct_type="image", 

312 calib_level=2, 

313 extra_columns={"c3": 1e10}, 

314 ), 

315 }, 

316 facility_name="FACILITY", 

317 ) 

318 schema = ObsCoreSchema(config, []) 

319 table_spec = schema.table_spec 

320 self.assertEqual( 

321 list(table_spec.fields.names), 

322 [col.name for col in _STATIC_COLUMNS] + ["c1", "c2", "c3"], 

323 ) 

324 self.assertEqual(table_spec.fields["c1"].dtype, sqlalchemy.BigInteger) 

325 self.assertEqual(table_spec.fields["c2"].dtype, sqlalchemy.String) 

326 self.assertEqual(table_spec.fields["c3"].dtype, sqlalchemy.Float) 

327 

328 # Columns with the same names as in static list in configs, types 

329 # are not overriden. 

330 config = ObsCoreConfig( 

331 version=0, 

332 obs_collection="", 

333 extra_columns={"t_xel": 1e10}, 

334 dataset_types={ 

335 "raw": DatasetTypeConfig( 

336 dataproduct_type="image", 

337 calib_level=1, 

338 extra_columns={"target_name": 1}, 

339 ), 

340 "calexp": DatasetTypeConfig( 

341 dataproduct_type="image", 

342 calib_level=2, 

343 extra_columns={"em_xel": "string"}, 

344 ), 

345 }, 

346 facility_name="FACILITY", 

347 ) 

348 schema = ObsCoreSchema(config, []) 

349 table_spec = schema.table_spec 

350 self.assertEqual(list(table_spec.fields.names), [col.name for col in _STATIC_COLUMNS]) 

351 self.assertEqual(table_spec.fields["t_xel"].dtype, sqlalchemy.Integer) 

352 self.assertEqual(table_spec.fields["target_name"].dtype, sqlalchemy.String) 

353 self.assertEqual(table_spec.fields["em_xel"].dtype, sqlalchemy.Integer) 

354 

355 def test_insert_existing_collection(self): 

356 """Test insert and import registry methods, with various restrictions 

357 on collection names. 

358 """ 

359 # First item is collections, second item is expected record count. 

360 test_data = ( 

361 (None, 6), 

362 (["run1", "run2"], 2), 

363 (["run[34]"], 2), 

364 (["[rR]un[^6]"], 5), 

365 ) 

366 

367 for collections, count in test_data: 

368 for do_import in (False, True): 

369 registry = self.make_registry(collections) 

370 obscore = registry.obsCoreTableManager 

371 assert obscore is not None 

372 self._insert_datasets(registry, do_import) 

373 

374 with obscore.query() as result: 

375 rows = list(result) 

376 self.assertEqual(len(rows), count) 

377 

378 # Also check `query` method with COUNT(*) 

379 with obscore.query([sqlalchemy.sql.func.count()]) as result: 

380 scalar = result.scalar_one() 

381 self.assertEqual(scalar, count) 

382 

383 def test_drop_datasets(self): 

384 """Test for dropping datasets after obscore insert.""" 

385 collections = None 

386 registry = self.make_registry(collections) 

387 obscore = registry.obsCoreTableManager 

388 assert obscore is not None 

389 refs = self._insert_datasets(registry) 

390 

391 with obscore.query() as result: 

392 rows = list(result) 

393 self.assertEqual(len(rows), 6) 

394 

395 # drop single dataset 

396 registry.removeDatasets(ref for ref in refs if ref.run == "run1") 

397 with obscore.query() as result: 

398 rows = list(result) 

399 self.assertEqual(len(rows), 5) 

400 

401 # drop whole run collection 

402 registry.removeCollection("run6") 

403 with obscore.query() as result: 

404 rows = list(result) 

405 self.assertEqual(len(rows), 4) 

406 

407 def test_associate(self): 

408 """Test for associating datasets to TAGGED collection.""" 

409 collections = ["tagged"] 

410 registry = self.make_registry(collections, "TAGGED") 

411 obscore = registry.obsCoreTableManager 

412 assert obscore is not None 

413 refs = self._insert_datasets(registry) 

414 

415 with obscore.query() as result: 

416 rows = list(result) 

417 self.assertEqual(len(rows), 0) 

418 

419 # Associate datasets that are already in obscore, changes nothing. 

420 registry.associate("tagged", (ref for ref in refs if ref.run == "run1")) 

421 with obscore.query() as result: 

422 rows = list(result) 

423 self.assertEqual(len(rows), 1) 

424 

425 # Associate datasets that are not in obscore 

426 registry.associate("tagged", (ref for ref in refs if ref.run == "run3")) 

427 with obscore.query() as result: 

428 rows = list(result) 

429 self.assertEqual(len(rows), 2) 

430 

431 # Disassociate them 

432 registry.disassociate("tagged", (ref for ref in refs if ref.run == "run3")) 

433 with obscore.query() as result: 

434 rows = list(result) 

435 self.assertEqual(len(rows), 1) 

436 

437 # Non-associated dataset, should be OK and not throw. 

438 registry.disassociate("tagged", (ref for ref in refs if ref.run == "run2")) 

439 with obscore.query() as result: 

440 rows = list(result) 

441 self.assertEqual(len(rows), 1) 

442 

443 registry.disassociate("tagged", (ref for ref in refs if ref.run == "run1")) 

444 with obscore.query() as result: 

445 rows = list(result) 

446 self.assertEqual(len(rows), 0) 

447 

448 def test_region_type_warning(self) -> None: 

449 """Test that non-polygon region generates one or more warnings.""" 

450 collections = None 

451 registry = self.make_registry(collections) 

452 

453 with self.assertWarns(RegionTypeWarning) as cm: 

454 self._insert_dataset(registry, "run2", "calexp", detector=2, visit=9) 

455 self.assertRegex( 

456 str(cm.warning), 

457 "Unexpected region type: .*lsst.sphgeom._sphgeom.Box.*", 

458 ) 

459 

460 def test_update_exposure_region(self) -> None: 

461 """Test for update_exposure_regions method.""" 

462 registry = self.make_registry(["run1"]) 

463 obscore = registry.obsCoreTableManager 

464 assert obscore is not None 

465 

466 # Exposure 4 is not associated with any visit. 

467 for detector in (1, 2, 3, 4): 

468 self._insert_dataset(registry, "run1", "raw", detector=detector, exposure=4) 

469 

470 # All spatial columns should be None. 

471 with obscore.query() as result: 

472 rows = list(result) 

473 self.assertEqual(len(rows), 4) 

474 for row in rows: 

475 self.assertIsNone(row.s_ra) 

476 self.assertIsNone(row.s_dec) 

477 self.assertIsNone(row.s_region) 

478 

479 # Assign Region from visit 4. 

480 count = obscore.update_exposure_regions( 

481 "DummyCam", [(4, 1, self.regions[(4, 1)]), (4, 2, self.regions[(4, 2)])] 

482 ) 

483 self.assertEqual(count, 2) 

484 

485 with obscore.query(["s_ra", "s_dec", "s_region", "lsst_detector", "facility_name"]) as result: 

486 rows = list(result) 

487 self.assertEqual(len(rows), 4) 

488 for row in rows: 

489 if row.lsst_detector in (1, 2): 

490 self.assertIsNotNone(row.s_ra) 

491 self.assertIsNotNone(row.s_dec) 

492 self.assertIsNotNone(row.s_region) 

493 else: 

494 self.assertIsNone(row.s_ra) 

495 self.assertIsNone(row.s_dec) 

496 self.assertIsNone(row.s_region) 

497 self.assertEqual(row.facility_name, "derived_facility") 

498 

499 

500class SQLiteObsCoreTest(ObsCoreTests, unittest.TestCase): 

501 """Unit test for obscore with SQLite backend.""" 

502 

503 def setUp(self): 

504 self.root = makeTestTempDir(TESTDIR) 

505 

506 def tearDown(self): 

507 removeTestTempDir(self.root) 

508 

509 def make_registry_config( 

510 self, collections: list[str] | None = None, collection_type: str | None = None 

511 ) -> RegistryConfig: 

512 # docstring inherited from a base class 

513 _, filename = tempfile.mkstemp(dir=self.root, suffix=".sqlite3") 

514 config = RegistryConfig() 

515 config["db"] = f"sqlite:///{filename}" 

516 config["managers", "obscore"] = { 

517 "cls": "lsst.daf.butler.registry.obscore.ObsCoreLiveTableManager", 

518 "config": self.make_obscore_config(collections, collection_type), 

519 } 

520 return config 

521 

522 

523class ClonedSqliteObscoreTest(SQLiteObsCoreTest, unittest.TestCase): 

524 """Unit test for obscore manager created via clone()""" 

525 

526 def make_registry( 

527 self, collections: list[str] | None = None, collection_type: str | None = None 

528 ) -> SqlRegistry: 

529 """Create new empty Registry.""" 

530 original = super().make_registry(collections, collection_type) 

531 copy = original.copy() 

532 self.addCleanup(copy.close) 

533 return copy 

534 

535 

536class PostgresObsCoreTest(ObsCoreTests, unittest.TestCase): 

537 """Unit test for obscore with PostgreSQL backend.""" 

538 

539 @classmethod 

540 def setUpClass(cls): 

541 # Create the postgres test server. 

542 cls.postgresql = cls.enterClassContext(setup_postgres_test_db()) 

543 super().setUpClass() 

544 

545 def setUp(self): 

546 self.root = makeTestTempDir(TESTDIR) 

547 

548 def tearDown(self): 

549 removeTestTempDir(self.root) 

550 

551 def make_registry_config( 

552 self, collections: list[str] | None = None, collection_type: str | None = None 

553 ) -> RegistryConfig: 

554 # docstring inherited from a base class 

555 config = RegistryConfig() 

556 self.postgresql.patch_registry_config(config) 

557 

558 config["managers", "obscore"] = { 

559 "cls": "lsst.daf.butler.registry.obscore.ObsCoreLiveTableManager", 

560 "config": self.make_obscore_config(collections, collection_type), 

561 } 

562 return config 

563 

564 

565class PostgresPgSphereObsCoreTest(PostgresObsCoreTest): 

566 """Unit test for obscore with PostgreSQL backend and pgsphere plugin.""" 

567 

568 @classmethod 

569 def setUpClass(cls): 

570 super().setUpClass() 

571 with cls.postgresql.begin() as connection: 

572 try: 

573 connection.execute(sqlalchemy.text("CREATE EXTENSION pg_sphere")) 

574 except sqlalchemy.exc.DatabaseError as exc: 

575 raise unittest.SkipTest(f"pg_sphere extension does not exist: {exc}") from None 

576 

577 def make_obscore_config( 

578 self, collections: list[str] | None = None, collection_type: str | None = None 

579 ) -> Config: 

580 """Make configuration for obscore manager.""" 

581 obscore_config = super().make_obscore_config(collections, collection_type) 

582 obscore_config["spatial_plugins"] = { 

583 "pgsphere": { 

584 "cls": "lsst.daf.butler.registry.obscore.pgsphere.PgSphereObsCorePlugin", 

585 "config": { 

586 "region_column": "pgs_region", 

587 "position_column": "pgs_center", 

588 }, 

589 } 

590 } 

591 return obscore_config 

592 

593 def test_spatial(self): 

594 """Test that pgsphere plugin fills spatial columns.""" 

595 collections = None 

596 registry = self.make_registry(collections) 

597 obscore = registry.obsCoreTableManager 

598 assert obscore is not None 

599 self._insert_datasets(registry) 

600 

601 # select everything 

602 with obscore.query() as result: 

603 rows = list(result) 

604 self.assertEqual(len(rows), 6) 

605 

606 db = registry._db 

607 assert registry.obsCoreTableManager is not None 

608 table = cast(ObsCoreLiveTableManager, registry.obsCoreTableManager).table 

609 

610 # It's not easy to generate spatial queries in sqlalchemy, use plain 

611 # text queries for testing. 

612 

613 # position matching visit=1, there is a single dataset 

614 query = f"SELECT * FROM {table.key} WHERE pgs_center <-> '(2d,0d)'::spoint < .1" 

615 with db.query(sqlalchemy.text(query)) as results: 

616 self.assertEqual(len(list(results)), 1) 

617 

618 # position matching visit=4, there are two datasets 

619 query = f"SELECT * FROM {table.key} WHERE pgs_center <-> '(272d,0d)'::spoint < .1" 

620 with db.query(sqlalchemy.text(query)) as results: 

621 self.assertEqual(len(list(results)), 2) 

622 

623 # position matching visit=1, there is a single dataset 

624 query = f"SELECT * FROM {table.key} WHERE '(2d,-3d)'::spoint @ pgs_region" 

625 with db.query(sqlalchemy.text(query)) as results: 

626 self.assertEqual(len(list(results)), 1) 

627 

628 # position matching visit=4, there are two datasets 

629 query = f"SELECT * FROM {table.key} WHERE '(272d,3d)'::spoint @ pgs_region" 

630 with db.query(sqlalchemy.text(query)) as results: 

631 self.assertEqual(len(list(results)), 2) 

632 

633 

634class TestMissingObscoreConfig(unittest.TestCase): 

635 """Test case for making butler instance with obscore manager but missing 

636 configuration. 

637 """ 

638 

639 root: str 

640 

641 def setUp(self): 

642 self.root = makeTestTempDir(TESTDIR) 

643 

644 def tearDown(self): 

645 removeTestTempDir(self.root) 

646 

647 def make_butler_config(self, obscore_config: str | dict[str, str | dict]) -> Config: 

648 registry_config = RegistryConfig() 

649 registry_config["db"] = f"sqlite:///{self.root}/butler.sqlite3" 

650 registry_config["managers", "obscore"] = obscore_config 

651 return Config({"registry": registry_config}) 

652 

653 def test_missing_config_one(self) -> None: 

654 """Test setup when obscore key defines manager class.""" 

655 config = self.make_butler_config("lsst.daf.butler.registry.obscore.ObsCoreLiveTableManager") 

656 

657 with self.assertWarnsRegex(UserWarning, "configuration is missing"): 

658 Butler.makeRepo(self.root, config) 

659 

660 # Now instanciate Butler from the same repo. 

661 Butler.from_config(self.root) 

662 

663 def test_missing_config_two(self) -> None: 

664 """Test setup when obscore key defines manager class and empty 

665 config. 

666 """ 

667 config = self.make_butler_config( 

668 {"cls": "lsst.daf.butler.registry.obscore.ObsCoreLiveTableManager", "config": {}} 

669 ) 

670 

671 with self.assertWarnsRegex(UserWarning, "configuration is missing"): 

672 Butler.makeRepo(self.root, config) 

673 

674 # Now instanciate Butler from the same repo. 

675 Butler.from_config(self.root) 

676 

677 

678if __name__ == "__main__": 

679 unittest.main()