Coverage for tests / test_apdbSql.py: 47%

182 statements  

« prev     ^ index     » next       coverage.py v7.13.5, created at 2026-04-28 08:43 +0000

1# This file is part of dax_apdb. 

2# 

3# Developed for the LSST Data Management System. 

4# This product includes software developed by the LSST Project 

5# (http://www.lsst.org). 

6# See the COPYRIGHT file at the top-level directory of this distribution 

7# for details of code ownership. 

8# 

9# This program is free software: you can redistribute it and/or modify 

10# it under the terms of the GNU General Public License as published by 

11# the Free Software Foundation, either version 3 of the License, or 

12# (at your option) any later version. 

13# 

14# This program is distributed in the hope that it will be useful, 

15# but WITHOUT ANY WARRANTY; without even the implied warranty of 

16# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the 

17# GNU General Public License for more details. 

18# 

19# You should have received a copy of the GNU General Public License 

20# along with this program. If not, see <http://www.gnu.org/licenses/>. 

21 

22"""Unit test for Apdb class.""" 

23 

24import gc 

25import os 

26import shutil 

27import tempfile 

28import unittest 

29from typing import Any 

30from unittest.mock import patch 

31 

32import sqlalchemy 

33 

34import lsst.utils.tests 

35from lsst.dax.apdb import ( 

36 Apdb, 

37 ApdbConfig, 

38 ApdbReplica, 

39 ApdbTables, 

40 ApdbUpdateRecord, 

41 IncompatibleVersionError, 

42 ReplicaChunk, 

43) 

44from lsst.dax.apdb.pixelization import Pixelization 

45from lsst.dax.apdb.sql import ApdbSql, ApdbSqlConfig 

46from lsst.dax.apdb.tests import ApdbSchemaUpdateTest, ApdbTest 

47 

48try: 

49 import testing.postgresql 

50except ImportError: 

51 testing = None 

52 

53TEST_SCHEMA = os.path.join(os.path.abspath(os.path.dirname(__file__)), "config/schema-apdb.yaml") 

54TEST_SCHEMA_SSO = os.path.join(os.path.abspath(os.path.dirname(__file__)), "config/schema-sso.yaml") 

55# Schema that uses `datetime` for timestamps and combines APDB and SSP. 

56TEST_SCHEMA_DT = os.path.join(os.path.abspath(os.path.dirname(__file__)), "config/schema-datetime.yaml") 

57 

58 

59class ApdbSQLTest(ApdbTest): 

60 """A common base class for SQL APDB tests.""" 

61 

62 dia_object_index: str 

63 

64 def make_instance(self, **kwargs: Any) -> ApdbConfig: 

65 """Create database and return its config.""" 

66 kw = { 

67 "schema_file": TEST_SCHEMA, 

68 "ss_schema_file": TEST_SCHEMA_SSO, 

69 "dia_object_index": self.dia_object_index, 

70 "enable_replica": self.enable_replica, 

71 } 

72 kw.update(kwargs) 

73 return ApdbSql.init_database(**kw) # type: ignore[arg-type] 

74 

75 def getDiaObjects_table(self) -> ApdbTables: 

76 """Return type of table returned from getDiaObjects method.""" 

77 return ApdbTables.DiaObject 

78 

79 def pixelization(self, config: ApdbConfig) -> Pixelization: 

80 """Return pixelization used by implementation.""" 

81 assert isinstance(config, ApdbSqlConfig), "Only expect ApdbSqlConfig here" 

82 return Pixelization("htm", config.pixelization.htm_level, config.pixelization.htm_max_ranges) 

83 

84 def test_connection_timeout(self) -> None: 

85 """Test that setting connection timeout does not break things.""" 

86 config = self.make_instance() 

87 assert isinstance(config, ApdbSqlConfig), "Only expect ApdbSqlConfig here" 

88 config.connection_config.connection_timeout = 60.0 

89 Apdb.from_config(config) 

90 

91 def store_update_records(self, apdb: Apdb, records: list[ApdbUpdateRecord], chunk: ReplicaChunk) -> None: 

92 # Docstring inherited. 

93 assert isinstance(apdb, ApdbSql), "Expecting ApdbSql instance" 

94 apdb._storeUpdateRecords(records, chunk, store_chunk=True) 

95 

96 def _count_after_reset_dedup(self, count_before: int) -> int: 

97 return count_before 

98 

99 

100class ApdbSQLiteTestCase(ApdbSQLTest, unittest.TestCase): 

101 """A test case for ApdbSql class using SQLite backend.""" 

102 

103 fsrc_requires_id_list = True 

104 dia_object_index = "baseline" 

105 use_mjd = True 

106 

107 def setUp(self) -> None: 

108 self.tempdir = tempfile.mkdtemp() 

109 self.db_url = f"sqlite:///{self.tempdir}/apdb.sqlite3" 

110 

111 def tearDown(self) -> None: 

112 shutil.rmtree(self.tempdir, ignore_errors=True) 

113 

114 def make_instance(self, **kwargs: Any) -> ApdbConfig: 

115 return super().make_instance(db_url=self.db_url, **kwargs) 

116 

117 

118class ApdbSQLiteDatetimeTestCase(ApdbSQLTest, unittest.TestCase): 

119 """A test case for ApdbSql class using SQLite backend and schema with 

120 timestamps in TAI MJD. 

121 """ 

122 

123 fsrc_requires_id_list = True 

124 dia_object_index = "baseline" 

125 use_mjd = False 

126 

127 def setUp(self) -> None: 

128 self.tempdir = tempfile.mkdtemp() 

129 self.db_url = f"sqlite:///{self.tempdir}/apdb.sqlite3" 

130 

131 def tearDown(self) -> None: 

132 shutil.rmtree(self.tempdir, ignore_errors=True) 

133 

134 def make_instance(self, **kwargs: Any) -> ApdbConfig: 

135 if "schema_file" in kwargs: 

136 return super().make_instance(db_url=self.db_url, **kwargs) 

137 else: 

138 return super().make_instance(db_url=self.db_url, schema_file=TEST_SCHEMA_DT, **kwargs) 

139 

140 

141class ApdbSQLiteTestCaseLastObject(ApdbSQLiteTestCase): 

142 """A test case for ApdbSql class using SQLite backend and DiaObjectLast 

143 table. 

144 """ 

145 

146 dia_object_index = "last_object_table" 

147 

148 def getDiaObjects_table(self) -> ApdbTables: 

149 """Return type of table returned from getDiaObjects method.""" 

150 return ApdbTables.DiaObjectLast 

151 

152 

153class ApdbSQLiteTestCasePixIdIovIndex(ApdbSQLiteTestCase): 

154 """A test case for ApdbSql class using SQLite backend with pix_id_iov 

155 indexing. 

156 """ 

157 

158 dia_object_index = "pix_id_iov" 

159 

160 

161class ApdbSQLiteTestCaseReplica(ApdbSQLiteTestCase): 

162 """Test case for ApdbSql class using SQLite backend with replica tables.""" 

163 

164 enable_replica = True 

165 meta_row_count = 4 

166 

167 

168@unittest.skipUnless(testing is not None, "testing.postgresql module not found") 

169class ApdbPostgresTestCase(ApdbSQLTest, unittest.TestCase): 

170 """A test case for ApdbSql class using Postgres backend.""" 

171 

172 fsrc_requires_id_list = True 

173 dia_object_index = "last_object_table" 

174 postgresql: Any 

175 enable_replica = True 

176 meta_row_count = 4 

177 timestamp_type_name = "datetime64[ns]" 

178 

179 @classmethod 

180 def setUpClass(cls) -> None: 

181 # Create the postgres test server. 

182 cls.postgresql = testing.postgresql.PostgresqlFactory(cache_initialized_db=True) 

183 super().setUpClass() 

184 

185 @classmethod 

186 def tearDownClass(cls) -> None: 

187 # Clean up any lingering SQLAlchemy engines/connections 

188 # so they're closed before we shut down the server. 

189 gc.collect() 

190 cls.postgresql.clear_cache() 

191 super().tearDownClass() 

192 

193 def setUp(self) -> None: 

194 self.server = self.postgresql() 

195 

196 def tearDown(self) -> None: 

197 self.server = self.postgresql() 

198 

199 def make_instance(self, **kwargs: Any) -> ApdbConfig: 

200 return super().make_instance(db_url=self.server.url(), **kwargs) 

201 

202 def getDiaObjects_table(self) -> ApdbTables: 

203 """Return type of table returned from getDiaObjects method.""" 

204 return ApdbTables.DiaObjectLast 

205 

206 

207@unittest.skipUnless(testing is not None, "testing.postgresql module not found") 

208class ApdbPostgresNamespaceTestCase(ApdbPostgresTestCase): 

209 """A test case for ApdbSql class using Postgres backend with schema name""" 

210 

211 # use mixed case to trigger quoting 

212 namespace = "ApdbSchema" 

213 

214 def make_instance(self, **kwargs: Any) -> ApdbConfig: 

215 """Make config class instance used in all tests.""" 

216 return super().make_instance(namespace=self.namespace, **kwargs) 

217 

218 

219class ApdbSchemaUpdateSQLiteTestCase(ApdbSchemaUpdateTest, unittest.TestCase): 

220 """A test case for schema updates using SQLite backend.""" 

221 

222 def setUp(self) -> None: 

223 self.tempdir = tempfile.mkdtemp() 

224 self.db_url = f"sqlite:///{self.tempdir}/apdb.sqlite3" 

225 

226 def tearDown(self) -> None: 

227 shutil.rmtree(self.tempdir, ignore_errors=True) 

228 

229 def make_instance(self, **kwargs: Any) -> ApdbConfig: 

230 """Make config class instance used in all tests.""" 

231 kw = { 

232 "db_url": self.db_url, 

233 "schema_file": TEST_SCHEMA, 

234 "ss_schema_file": TEST_SCHEMA_SSO, 

235 } 

236 kw.update(kwargs) 

237 return ApdbSql.init_database(**kw) # type: ignore[arg-type] 

238 

239 

240class ApdbSQLiteFromUriTestCase(unittest.TestCase): 

241 """A test case for instantiating ApdbSql via URI.""" 

242 

243 def setUp(self) -> None: 

244 self.tempdir = tempfile.mkdtemp() 

245 self.addCleanup(shutil.rmtree, self.tempdir, ignore_errors=True) 

246 self.db_url = f"sqlite:///{self.tempdir}/apdb.sqlite3" 

247 config = ApdbSql.init_database( 

248 db_url=self.db_url, schema_file=TEST_SCHEMA, ss_schema_file=TEST_SCHEMA_SSO 

249 ) 

250 # TODO: This will need update when we switch to pydantic configs. 

251 self.config_path = os.path.join(self.tempdir, "apdb-config.yaml") 

252 config.save(self.config_path) 

253 self.bad_config_path = os.path.join(self.tempdir, "not-config.yaml") 

254 self.index_path = os.path.join(self.tempdir, "apdb-index.yaml") 

255 with open(self.index_path, "w") as index_file: 

256 print(f'label1: "{self.config_path}"', file=index_file) 

257 print(f'"label2/pex_config": "{self.config_path}"', file=index_file) 

258 print(f'bad-label: "{self.bad_config_path}"', file=index_file) 

259 # File with incorrect format. 

260 self.bad_index_path = os.path.join(self.tempdir, "apdb-index-bad.yaml") 

261 with open(self.bad_index_path, "w") as index_file: 

262 print(f'label1: ["{self.config_path}"]', file=index_file) 

263 self.missing_index_path = os.path.join(self.tempdir, "no-apdb-index.yaml") 

264 

265 def test_make_apdb_from_path(self) -> None: 

266 """Check that we can make APDB instance from config URI.""" 

267 Apdb.from_uri(self.config_path) 

268 with self.assertRaises(FileNotFoundError): 

269 Apdb.from_uri(self.bad_config_path) 

270 

271 def test_make_apdb_from_labels(self) -> None: 

272 """Check that we can make APDB instance from config URI.""" 

273 # Replace DAX_APDB_INDEX_URI value 

274 new_env = {"DAX_APDB_INDEX_URI": self.index_path} 

275 with patch.dict(os.environ, new_env, clear=True): 

276 Apdb.from_uri("label:label1") 

277 Apdb.from_uri("label:label2") 

278 # Label does not exist. 

279 with self.assertRaises(ValueError): 

280 Apdb.from_uri("label:not-a-label") 

281 # Label exists but points to a missing config. 

282 with self.assertRaises(FileNotFoundError): 

283 Apdb.from_uri("label:bad-label") 

284 

285 def test_make_apdb_bad_index(self) -> None: 

286 """Check what happens when DAX_APDB_INDEX_URI is broken.""" 

287 # envvar is set but empty. 

288 new_env = {"DAX_APDB_INDEX_URI": ""} 

289 with patch.dict(os.environ, new_env, clear=True): 

290 with self.assertRaises(RuntimeError): 

291 Apdb.from_uri("label:label") 

292 

293 # envvar is set to something non-existing. 

294 new_env = {"DAX_APDB_INDEX_URI": self.missing_index_path} 

295 with patch.dict(os.environ, new_env, clear=True): 

296 with self.assertRaises(FileNotFoundError): 

297 Apdb.from_uri("label:label") 

298 

299 # envvar points to an incorrect file. 

300 new_env = {"DAX_APDB_INDEX_URI": self.bad_index_path} 

301 with patch.dict(os.environ, new_env, clear=True): 

302 with self.assertRaises(TypeError): 

303 Apdb.from_uri("label:label") 

304 

305 def test_remove_database_file(self) -> None: 

306 """Check that SQLite does not try to recreate database after it was 

307 removed, but config persisted. 

308 """ 

309 os.unlink(f"{self.tempdir}/apdb.sqlite3") 

310 with self.assertRaisesRegex(sqlalchemy.exc.OperationalError, "unable to open database file"): 

311 Apdb.from_uri(self.config_path) 

312 

313 def test_make_apdb_replica(self) -> None: 

314 """Check that we can make ApdbReplica instance from config URI.""" 

315 ApdbReplica.from_uri(self.config_path) 

316 with self.assertRaises(FileNotFoundError): 

317 Apdb.from_uri(self.bad_config_path) 

318 

319 

320class ApdbSQLiteVersionCheck(unittest.TestCase): 

321 """A test case to verify that version check happens before reading 

322 frozen configuration. 

323 """ 

324 

325 def setUp(self) -> None: 

326 self.tempdir = tempfile.mkdtemp() 

327 self.addCleanup(shutil.rmtree, self.tempdir, ignore_errors=True) 

328 self.db_url = f"sqlite:///{self.tempdir}/apdb.sqlite3" 

329 self.config = ApdbSql.init_database( 

330 db_url=self.db_url, schema_file=TEST_SCHEMA, ss_schema_file=TEST_SCHEMA_SSO 

331 ) 

332 

333 def test_version_check(self) -> None: 

334 """Test that version check happens before reading config.""" 

335 apdb = Apdb.from_config(self.config) 

336 assert isinstance(apdb, ApdbSql) 

337 

338 # Store incompatible version. 

339 apdb.metadata.set(ApdbSql.metadataSchemaVersionKey, "99.0.0", force=True) 

340 

341 # Overwrite frozen config with something that will break. 

342 apdb.metadata.set(ApdbSql.metadataConfigKey, '{"not_a_config_key": 0}', force=True) 

343 

344 # Try again. 

345 with self.assertRaises(IncompatibleVersionError): 

346 Apdb.from_config(self.config) 

347 

348 

349class MyMemoryTestCase(lsst.utils.tests.MemoryTestCase): 

350 """Run file leak tests.""" 

351 

352 

353def setup_module(module: Any) -> None: 

354 """Configure pytest.""" 

355 lsst.utils.tests.init() 

356 

357 

358if __name__ == "__main__": 

359 lsst.utils.tests.init() 

360 unittest.main()