Coverage for tests / test_db_auth.py: 10%

166 statements  

« prev     ^ index     » next       coverage.py v7.13.5, created at 2026-04-28 08:31 +0000

1# This file is part of utils. 

2# 

3# Developed for the LSST Data Management System. 

4# This product includes software developed by the LSST Project 

5# (https://www.lsst.org). 

6# See the COPYRIGHT file at the top-level directory of this distribution 

7# for details of code ownership. 

8# 

9# This program is free software: you can redistribute it and/or modify 

10# it under the terms of the GNU General Public License as published by 

11# the Free Software Foundation, either version 3 of the License, or 

12# (at your option) any later version. 

13# 

14# This program is distributed in the hope that it will be useful, 

15# but WITHOUT ANY WARRANTY; without even the implied warranty of 

16# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the 

17# GNU General Public License for more details. 

18# 

19# You should have received a copy of the GNU General Public License 

20# along with this program. If not, see <https://www.gnu.org/licenses/>. 

21 

22import json 

23import os 

24import unittest 

25import unittest.mock 

26 

27import yaml 

28 

29from lsst.utils.db_auth import DbAuth, DbAuthError, DbAuthNotFoundError 

30 

31TESTDIR = os.path.join(os.path.abspath(os.path.dirname(__file__)), "config", "dbAuth") 

32 

33 

34class DbAuthTestCase(unittest.TestCase): 

35 """Test DbAuth class.""" 

36 

37 def test_patterns(self): 

38 """Test various patterns against a fixed connection URL.""" 

39 for matchPattern in [ 

40 "postgresql://*", 

41 "postgresql://*.example.com", 

42 "postgresql://*.example.com/my_*", 

43 "postgresql://host.example.com/my_database", 

44 "postgresql://host.example.com:5432/my_database", 

45 "postgresql://user@host.example.com/my_database", 

46 ]: 

47 auth = DbAuth(authList=[{"url": matchPattern, "username": "foo", "password": "bar"}]) 

48 user, pwd = auth.getAuth("postgresql", "user", "host.example.com", "5432", "my_database") 

49 self.assertEqual(user, "user") 

50 self.assertEqual(pwd, "bar") 

51 

52 def test_connStrings(self): 

53 """Test various connection URLs against a fixed pattern.""" 

54 auth = DbAuth( 

55 authList=[ 

56 {"url": "postgresql://host.example.com/my_database", "username": "foo", "password": "bar"} 

57 ] 

58 ) 

59 for connComponents in [ 

60 ("postgresql", "user", "host.example.com", 5432, "my_database"), 

61 ("postgresql", "user", "host.example.com", None, "my_database"), 

62 ]: 

63 user, pwd = auth.getAuth(*connComponents) 

64 self.assertEqual(user, "user") 

65 self.assertEqual(pwd, "bar") 

66 for connComponents in [ 

67 ("postgresql", None, "host.example.com", None, "my_database"), 

68 ("postgresql", None, "host.example.com", "5432", "my_database"), 

69 ]: 

70 user, pwd = auth.getAuth(*connComponents) 

71 self.assertEqual(user, "foo") 

72 self.assertEqual(pwd, "bar") 

73 

74 def test_load(self): 

75 """Test loading from a YAML file and matching in order.""" 

76 filePath = os.path.join(TESTDIR, "db-auth.yaml") 

77 os.chmod(filePath, 0o600) 

78 auth = DbAuth(filePath) 

79 self.assertEqual(auth.db_auth_path, filePath) 

80 self.assertEqual( 

81 auth.getAuth("postgresql", "user", "host.example.com", "5432", "my_database"), ("user", "test1") 

82 ) 

83 self.assertEqual( 

84 auth.getAuth("postgresql", "user", "host.example.com", "3360", "my_database"), ("user", "test2") 

85 ) 

86 self.assertEqual( 

87 auth.getAuth("postgresql", "", "host.example.com", "5432", "my_database"), ("user", "test3") 

88 ) 

89 self.assertEqual( 

90 auth.getAuth("postgresql", None, "host.example.com", "", "my_database"), ("user", "test4") 

91 ) 

92 self.assertEqual( 

93 auth.getAuth("postgresql", "", "host2.example.com", None, "my_other"), ("user", "test5") 

94 ) 

95 self.assertEqual( 

96 auth.getAuth("postgresql", None, "host3.example.com", 3360, "some_database"), ("user", "test6") 

97 ) 

98 self.assertEqual( 

99 auth.getAuth("postgresql", "user", "host4.other.com", None, "other_database"), ("user", "test8") 

100 ) 

101 

102 def test_envvar_path(self): 

103 """Test loading from envvar path location.""" 

104 filePath = os.path.join(TESTDIR, "db-auth.yaml") 

105 os.chmod(filePath, 0o600) 

106 with unittest.mock.patch.dict(os.environ, {"LSST_DB_AUTH": filePath}): 

107 auth = DbAuth() 

108 self.assertEqual( 

109 auth.getAuth("postgresql", "user", "host4.other.com", None, "other_database"), 

110 ("user", "test8"), 

111 ) 

112 auth = DbAuth(envVar=None) # Should fallback to default 

113 self.assertEqual( 

114 auth.getAuth("postgresql", "user", "host4.other.com", None, "other_database"), 

115 ("user", "test8"), 

116 ) 

117 

118 def test_json_envvar(self): 

119 """Test loading from JSON string in environment variable.""" 

120 filepath = os.path.join(TESTDIR, "db-auth.yaml") 

121 with open(filepath) as fd: 

122 auth_list = yaml.safe_load(fd) 

123 json_content = json.dumps(auth_list) 

124 

125 with unittest.mock.patch.dict(os.environ, {"LSST_DB_AUTH_CREDENTIALS": json_content}): 

126 auth = DbAuth() 

127 self.assertEqual( 

128 auth.getAuth("postgresql", "user", "host.example.com", "5432", "my_database"), 

129 ("user", "test1"), 

130 ) 

131 self.assertEqual( 

132 auth.getAuth("postgresql", "user", "host.example.com", "3360", "my_database"), 

133 ("user", "test2"), 

134 ) 

135 

136 with unittest.mock.patch.dict(os.environ, {"LSST_DB_AUTH_CREDENTIALS": json_content}): 

137 auth = DbAuth(credsEnvVar=None) 

138 self.assertEqual( 

139 auth.getAuth("postgresql", "user", "host.example.com", "5432", "my_database"), 

140 ("user", "test1"), 

141 ) 

142 

143 with unittest.mock.patch.dict(os.environ, {"TEST_CREDS": json_content}): 

144 auth = DbAuth(credsEnvVar="TEST_CREDS") 

145 self.assertEqual( 

146 auth.getAuth("postgresql", "user", "host.example.com", "5432", "my_database"), 

147 ("user", "test1"), 

148 ) 

149 self.assertEqual( 

150 auth.getAuth("postgresql", "user", "host.example.com", "3360", "my_database"), 

151 ("user", "test2"), 

152 ) 

153 

154 with unittest.mock.patch.dict(os.environ, {"TEST_CREDS": "Bad ' : JSON"}): 

155 with self.assertRaises(DbAuthError): 

156 DbAuth(credsEnvVar="TEST_CREDS") 

157 

158 def test_ipv6(self): 

159 """Test IPv6 addresses as host names.""" 

160 auth = DbAuth( 

161 authList=[ 

162 {"url": "postgresql://user@[fe80::1ff:fe23:4567:890a]:5432/db", "password": "pwd"}, 

163 {"url": "postgresql://[fe80::1ff:fe23:4567:890a]:5432/db", "password": "pwd2"}, 

164 {"url": "postgresql://user3@[fe80::1ff:fe23:4567:890a]/db", "password": "pwd3"}, 

165 {"url": "postgresql://[fe80::1ff:fe23:4567:890a]/db", "password": "pwd4"}, 

166 ] 

167 ) 

168 self.assertEqual( 

169 auth.getAuth("postgresql", "user", "fe80::1ff:fe23:4567:890a", "5432", "db"), ("user", "pwd") 

170 ) 

171 self.assertEqual( 

172 auth.getAuth("postgresql", "user2", "fe80::1ff:fe23:4567:890a", "5432", "db"), ("user2", "pwd2") 

173 ) 

174 self.assertEqual( 

175 auth.getAuth("postgresql", "user3", "fe80::1ff:fe23:4567:890a", "3360", "db"), ("user3", "pwd3") 

176 ) 

177 self.assertEqual( 

178 auth.getAuth("postgresql", "user4", "fe80::1ff:fe23:4567:890a", "3360", "db"), ("user4", "pwd4") 

179 ) 

180 

181 def test_search(self): 

182 """Test ordered searching.""" 

183 auth = DbAuth( 

184 authList=[ 

185 {"url": "postgresql://user1@host2.example.com:3360/database3", "password": "first"}, 

186 { 

187 "url": "postgresql://host2.example.com:3360/database3", 

188 "username": "second_u", 

189 "password": "second", 

190 }, 

191 { 

192 "url": "postgresql://host2.example.com:5432/database3", 

193 "username": "wrong", 

194 "password": "port", 

195 }, 

196 {"url": "postgresql://host3.example.com/", "username": "third_u", "password": "third"}, 

197 { 

198 "url": "oracle://oracle.example.com/other_database", 

199 "username": "scott", 

200 "password": "tiger", 

201 }, 

202 {"url": "postgresql://*.example.com/database3", "username": "fourth_u", "password": "fourth"}, 

203 {"url": "postgresql://*.example.com", "username": "fifth_u", "password": "fifth"}, 

204 ] 

205 ) 

206 self.assertEqual( 

207 auth.getAuth("postgresql", "user1", "host2.example.com", "3360", "database3"), ("user1", "first") 

208 ) 

209 self.assertEqual( 

210 auth.getAuth("postgresql", "user2", "host2.example.com", "3360", "database3"), ("user2", "second") 

211 ) 

212 self.assertEqual( 

213 auth.getAuth("postgresql", "", "host2.example.com", "3360", "database3"), ("second_u", "second") 

214 ) 

215 self.assertEqual( 

216 auth.getAuth("postgresql", None, "host2.example.com", "3360", "database3"), ("second_u", "second") 

217 ) 

218 self.assertEqual( 

219 auth.getAuth("postgresql", None, "host3.example.com", "3360", "database3"), ("third_u", "third") 

220 ) 

221 self.assertEqual( 

222 auth.getAuth("postgresql", None, "host3.example.com", "3360", "database4"), ("third_u", "third") 

223 ) 

224 self.assertEqual( 

225 auth.getAuth("postgresql", None, "host4.example.com", "3360", "database3"), ("fourth_u", "fourth") 

226 ) 

227 self.assertEqual( 

228 auth.getAuth("postgresql", None, "host4.example.com", "3360", "database4"), ("fifth_u", "fifth") 

229 ) 

230 

231 def test_errors(self): 

232 """Test for error exceptions.""" 

233 with self.assertRaisesRegex(DbAuthError, r"^No DbAuth configuration file: .*this_does_not_exist$"): 

234 auth = DbAuth(os.path.join(TESTDIR, "this_does_not_exist")) 

235 with self.assertRaisesRegex(DbAuthError, r"^No DbAuth configuration file: .*this_does_not_exist$"): 

236 auth = DbAuth(os.path.join(TESTDIR, "this_does_not_exist"), envVar="LSST_NONEXISTENT") 

237 with self.assertRaisesRegex( 

238 DbAuthError, 

239 r"^DbAuth configuration file .*badDbAuth1.yaml has incorrect " r"permissions: \d*644$", 

240 ): 

241 filePath = os.path.join(TESTDIR, "badDbAuth1.yaml") 

242 os.chmod(filePath, 0o644) 

243 auth = DbAuth(filePath) 

244 with self.assertRaisesRegex( 

245 DbAuthError, r"^Unable to load DbAuth configuration file: " r".*badDbAuth2.yaml" 

246 ): 

247 filePath = os.path.join(TESTDIR, "badDbAuth2.yaml") 

248 os.chmod(filePath, 0o600) 

249 with unittest.mock.patch.dict(os.environ, {"LSST_DB_AUTH_TEST": filePath}): 

250 auth = DbAuth(envVar="LSST_DB_AUTH_TEST") 

251 

252 auth = DbAuth(authList=[]) 

253 with self.assertRaisesRegex(DbAuthError, r"^Missing dialectname parameter$"): 

254 auth.getAuth(None, None, None, None, None) 

255 with self.assertRaisesRegex(DbAuthError, r"^Missing dialectname parameter$"): 

256 auth.getAuth("", None, None, None, None) 

257 with self.assertRaisesRegex(DbAuthError, r"^Missing host parameter$"): 

258 auth.getAuth("postgresql", None, None, None, None) 

259 with self.assertRaisesRegex(DbAuthError, r"^Missing host parameter$"): 

260 auth.getAuth("postgresql", None, "", None, None) 

261 with self.assertRaisesRegex(DbAuthError, r"^Missing database parameter$"): 

262 auth.getAuth("postgresql", None, "example.com", None, None) 

263 with self.assertRaisesRegex(DbAuthError, r"^Missing database parameter$"): 

264 auth.getAuth("postgresql", None, "example.com", None, "") 

265 with self.assertRaisesRegex( 

266 DbAuthError, 

267 r"^No matching DbAuth configuration for: " r"\(postgresql, None, example\.com, None, foo\)$", 

268 ): 

269 auth.getAuth("postgresql", None, "example.com", None, "foo") 

270 

271 auth = DbAuth(authList=[{"password": "testing"}]) 

272 with self.assertRaisesRegex(DbAuthError, r"^Missing URL in DbAuth configuration$"): 

273 auth.getAuth("postgresql", None, "example.com", None, "foo") 

274 

275 auth = DbAuth(authList=[{"url": "testing", "password": "testing"}]) 

276 with self.assertRaisesRegex(DbAuthError, r"^Missing database dialect in URL: testing$"): 

277 auth.getAuth("postgresql", None, "example.com", None, "foo") 

278 

279 auth = DbAuth(authList=[{"url": "postgresql:///foo", "password": "testing"}]) 

280 with self.assertRaisesRegex(DbAuthError, r"^Missing host in URL: postgresql:///foo$"): 

281 auth.getAuth("postgresql", None, "example.com", None, "foo") 

282 

283 with unittest.mock.patch("lsst.utils.db_auth._DEFAULT_PATH", new="/tmp/non-existant-file.yaml"): 

284 with self.assertRaises(DbAuthNotFoundError): 

285 # The file we have made the default should never exist. 

286 DbAuth(None) 

287 

288 def test_getUrl(self): 

289 """Repeat relevant tests using getUrl interface.""" 

290 for matchPattern in [ 

291 "postgresql://*", 

292 "postgresql://*.example.com", 

293 "postgresql://*.example.com/my_*", 

294 "postgresql://host.example.com/my_database", 

295 "postgresql://host.example.com:5432/my_database", 

296 "postgresql://user@host.example.com/my_database", 

297 ]: 

298 auth = DbAuth(authList=[{"url": matchPattern, "username": "foo", "password": "bar"}]) 

299 self.assertEqual( 

300 auth.getUrl("postgresql://user@host.example.com:5432/my_database"), 

301 "postgresql://user:bar@host.example.com:5432/my_database", 

302 ) 

303 

304 auth = DbAuth( 

305 authList=[ 

306 {"url": "postgresql://host.example.com/my_database", "username": "foo", "password": "bar"} 

307 ] 

308 ) 

309 self.assertEqual( 

310 auth.getUrl("postgresql://user@host.example.com:5432/my_database"), 

311 "postgresql://user:bar@host.example.com:5432/my_database", 

312 ) 

313 self.assertEqual( 

314 auth.getUrl("postgresql://user@host.example.com/my_database"), 

315 "postgresql://user:bar@host.example.com/my_database", 

316 ) 

317 self.assertEqual( 

318 auth.getUrl("postgresql://host.example.com/my_database"), 

319 "postgresql://foo:bar@host.example.com/my_database", 

320 ) 

321 self.assertEqual( 

322 auth.getUrl("postgresql://host.example.com:5432/my_database"), 

323 "postgresql://foo:bar@host.example.com:5432/my_database", 

324 ) 

325 

326 filePath = os.path.join(TESTDIR, "db-auth.yaml") 

327 os.chmod(filePath, 0o600) 

328 auth = DbAuth(filePath) 

329 self.assertEqual( 

330 auth.getUrl("postgresql://user@host.example.com:5432/my_database"), 

331 "postgresql://user:test1@host.example.com:5432/my_database", 

332 ) 

333 self.assertEqual( 

334 auth.getUrl("postgresql://user@host.example.com:3360/my_database"), 

335 "postgresql://user:test2@host.example.com:3360/my_database", 

336 ) 

337 self.assertEqual( 

338 auth.getUrl("postgresql://host.example.com:5432/my_database"), 

339 "postgresql://user:test3@host.example.com:5432/my_database", 

340 ) 

341 self.assertEqual( 

342 auth.getUrl("postgresql://host.example.com/my_database"), 

343 "postgresql://user:test4@host.example.com/my_database", 

344 ) 

345 self.assertEqual( 

346 auth.getUrl("postgresql://host2.example.com/my_other"), 

347 "postgresql://user:test5@host2.example.com/my_other", 

348 ) 

349 self.assertEqual( 

350 auth.getUrl("postgresql://host3.example.com:3360/some_database"), 

351 "postgresql://user:test6@host3.example.com:3360/some_database", 

352 ) 

353 self.assertEqual( 

354 auth.getUrl("postgresql://user@host4.other.com/other_database"), 

355 "postgresql://user:test8@host4.other.com/other_database", 

356 ) 

357 

358 auth = DbAuth( 

359 authList=[ 

360 {"url": "postgresql://user@[fe80::1ff:fe23:4567:890a]:5432/db", "password": "pwd"}, 

361 {"url": "postgresql://[fe80::1ff:fe23:4567:890a]:5432/db", "password": "pwd2"}, 

362 {"url": "postgresql://user3@[fe80::1ff:fe23:4567:890a]/db", "password": "pwd3"}, 

363 {"url": "postgresql://[fe80::1ff:fe23:4567:890a]/db", "password": "pwd4"}, 

364 ] 

365 ) 

366 self.assertEqual( 

367 auth.getUrl("postgresql://user@[fe80::1ff:fe23:4567:890a]:5432/db"), 

368 "postgresql://user:pwd@[fe80::1ff:fe23:4567:890a]:5432/db", 

369 ) 

370 self.assertEqual( 

371 auth.getUrl("postgresql://user2@[fe80::1ff:fe23:4567:890a]:5432/db"), 

372 "postgresql://user2:pwd2@[fe80::1ff:fe23:4567:890a]:5432/db", 

373 ) 

374 self.assertEqual( 

375 auth.getUrl("postgresql://user3@[fe80::1ff:fe23:4567:890a]:3360/db"), 

376 "postgresql://user3:pwd3@[fe80::1ff:fe23:4567:890a]:3360/db", 

377 ) 

378 self.assertEqual( 

379 auth.getUrl("postgresql://user4@[fe80::1ff:fe23:4567:890a]:3360/db"), 

380 "postgresql://user4:pwd4@[fe80::1ff:fe23:4567:890a]:3360/db", 

381 ) 

382 

383 auth = DbAuth( 

384 authList=[ 

385 {"url": "postgresql://user1@host2.example.com:3360/database3", "password": "first"}, 

386 { 

387 "url": "postgresql://host2.example.com:3360/database3", 

388 "username": "second_u", 

389 "password": "second", 

390 }, 

391 { 

392 "url": "postgresql://host2.example.com:5432/database3", 

393 "username": "wrong", 

394 "password": "port", 

395 }, 

396 {"url": "postgresql://host3.example.com/", "username": "third_u", "password": "third"}, 

397 { 

398 "url": "oracle://oracle.example.com/other_database", 

399 "username": "scott", 

400 "password": "tiger", 

401 }, 

402 {"url": "postgresql://*.example.com/database3", "username": "fourth_u", "password": "fourth"}, 

403 {"url": "postgresql://*.example.com", "username": "fifth_u", "password": "fifth"}, 

404 ] 

405 ) 

406 self.assertEqual( 

407 auth.getUrl("postgresql://user1@host2.example.com:3360/database3"), 

408 "postgresql://user1:first@host2.example.com:3360/database3", 

409 ) 

410 self.assertEqual( 

411 auth.getUrl("postgresql://user2@host2.example.com:3360/database3"), 

412 "postgresql://user2:second@host2.example.com:3360/database3", 

413 ) 

414 self.assertEqual( 

415 auth.getUrl("postgresql://host2.example.com:3360/database3"), 

416 "postgresql://second_u:second@host2.example.com:3360/database3", 

417 ) 

418 self.assertEqual( 

419 auth.getUrl("postgresql://host3.example.com:3360/database3"), 

420 "postgresql://third_u:third@host3.example.com:3360/database3", 

421 ) 

422 self.assertEqual( 

423 auth.getUrl("postgresql://host3.example.com:3360/database4"), 

424 "postgresql://third_u:third@host3.example.com:3360/database4", 

425 ) 

426 self.assertEqual( 

427 auth.getUrl("postgresql://host4.example.com:3360/database3"), 

428 "postgresql://fourth_u:fourth@host4.example.com:3360/database3", 

429 ) 

430 self.assertEqual( 

431 auth.getUrl("postgresql://host4.example.com:3360/database4"), 

432 "postgresql://fifth_u:fifth@host4.example.com:3360/database4", 

433 ) 

434 

435 auth = DbAuth(authList=[]) 

436 with self.assertRaisesRegex(DbAuthError, r"^Missing dialectname parameter$"): 

437 auth.getUrl("/") 

438 with self.assertRaisesRegex(DbAuthError, r"^Missing host parameter$"): 

439 auth.getUrl("postgresql:///") 

440 with self.assertRaisesRegex(DbAuthError, r"^Missing database parameter$"): 

441 auth.getUrl("postgresql://example.com") 

442 with self.assertRaisesRegex(DbAuthError, r"^Missing database parameter$"): 

443 auth.getUrl("postgresql://example.com/") 

444 with self.assertRaisesRegex( 

445 DbAuthError, 

446 r"^No matching DbAuth configuration for: " r"\(postgresql, None, example\.com, None, foo\)$", 

447 ): 

448 auth.getUrl("postgresql://example.com/foo") 

449 

450 def test_urlEncoding(self): 

451 """Test URL encoding of username and password.""" 

452 auth = DbAuth( 

453 authList=[ 

454 { 

455 "url": "postgresql://host.example.com/my_database", 

456 "username": "foo:étude", 

457 "password": "bar,.@%&/*[]", 

458 } 

459 ] 

460 ) 

461 self.assertEqual( 

462 auth.getUrl("postgresql://host.example.com:5432/my_database"), 

463 "postgresql://foo%3A%C3%A9tude:bar%2C.%40%25%26%2F%2A%5B%5D@host.example.com:5432/my_database", 

464 ) 

465 

466 

467if __name__ == "__main__": 

468 unittest.main()