Coverage for python / lsst / utils / db_auth.py: 15%
93 statements
« prev ^ index » next coverage.py v7.13.5, created at 2026-04-26 08:43 +0000
« prev ^ index » next coverage.py v7.13.5, created at 2026-04-26 08:43 +0000
1# This file is part of utils.
2#
3# Developed for the LSST Data Management System.
4# This product includes software developed by the LSST Project
5# (http://www.lsst.org).
6# See the COPYRIGHT file at the top-level directory of this distribution
7# for details of code ownership.
8#
9# This program is free software: you can redistribute it and/or modify
10# it under the terms of the GNU General Public License as published by
11# the Free Software Foundation, either version 3 of the License, or
12# (at your option) any later version.
13#
14# This program is distributed in the hope that it will be useful,
15# but WITHOUT ANY WARRANTY; without even the implied warranty of
16# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
17# GNU General Public License for more details.
18#
19# You should have received a copy of the GNU General Public License
20# along with this program. If not, see <http://www.gnu.org/licenses/>.
22from __future__ import annotations
24import fnmatch
25import json
26import os
27import stat
28import urllib.parse
30import yaml
32__all__ = ["DbAuth", "DbAuthError", "DbAuthPermissionsError"]
34_DEFAULT_PATH = "~/.lsst/db-auth.yaml"
35_DEFAULT_ENVVAR = "LSST_DB_AUTH"
36_DEFAULT_CREDS_ENVVAR = "LSST_DB_AUTH_CREDENTIALS"
39class DbAuthError(RuntimeError):
40 """Exception raised when a problem has occurred retrieving database
41 authentication information.
42 """
44 pass
47class DbAuthNotFoundError(DbAuthError):
48 """Credentials file does not exist or no match was found in it."""
51class DbAuthPermissionsError(DbAuthError):
52 """Credentials file has incorrect permissions."""
55class DbAuth:
56 """Retrieves authentication information for database connections.
58 The authorization configuration is taken from the ``authList`` parameter
59 or a (group- and world-inaccessible) YAML file located at a path specified
60 by the given environment variable or at a default path location.
62 Parameters
63 ----------
64 path : `str` or `None`, optional
65 Path to configuration file, default path is ``~/.lsst/db-auth.yaml``.
66 The default is used if `None`.
67 envVar : `str` or `None`, optional
68 Name of environment variable pointing to configuration file, default is
69 ``LSST_DB_AUTH``. This is used if the environment variable is available
70 even if the path is specified. The default is used if `None`.
71 authList : `list` [`dict`] or `None`, optional
72 Authentication configuration. Used directly if given without referring
73 to the environment.
74 credsEnvVar : `str` or `None`, optional
75 Name of environment variable containing the authentication
76 configuration in JSON format. Default is ``LSST_DB_AUTH_CREDENTIALS``.
77 If the environment variable is defined, this takes priority over
78 reading credentials from file. The default is used if `None`.
80 Notes
81 -----
82 Defaults will be used if no option is provided. If provided ``authList``
83 will be used directly. The JSON credentials environment variable will
84 be used if defined, in preference to reading from a file even if overrides
85 are given for ``path`` and ``envVar``.
86 """
88 def __init__(
89 self,
90 path: str | None = _DEFAULT_PATH,
91 envVar: str | None = _DEFAULT_ENVVAR,
92 authList: list[dict[str, str]] | None = None,
93 credsEnvVar: str | None = _DEFAULT_CREDS_ENVVAR,
94 ):
95 if authList is not None:
96 self._db_auth_path = "<auth-list>"
97 self.authList = authList
98 return
100 # Credentials in JSON takes priority.
101 if jsonCreds := os.getenv(credsEnvVar or _DEFAULT_CREDS_ENVVAR):
102 try:
103 self.authList = json.loads(jsonCreds)
104 except json.JSONDecodeError as exc:
105 raise DbAuthError(
106 f"Unable to load DbAuth configuration using JSON environment variable {credsEnvVar}"
107 ) from exc
108 self._db_auth_path = "<json-env-var>"
109 return
111 secretPath = os.getenv(envVar or _DEFAULT_ENVVAR, path or _DEFAULT_PATH)
112 secretPath = os.path.expanduser(secretPath)
113 if not os.path.isfile(secretPath):
114 raise DbAuthNotFoundError(f"No DbAuth configuration file: {secretPath}")
115 mode = os.stat(secretPath).st_mode
116 if mode & (stat.S_IRWXG | stat.S_IRWXO) != 0:
117 raise DbAuthPermissionsError(
118 f"DbAuth configuration file {secretPath} has incorrect permissions: {mode:o}"
119 )
121 try:
122 with open(secretPath) as secretFile:
123 self.authList = yaml.safe_load(secretFile)
124 except Exception as exc:
125 raise DbAuthError(f"Unable to load DbAuth configuration file: {secretPath}.") from exc
126 self._db_auth_path = secretPath
128 @property
129 def db_auth_path(self) -> str:
130 """The path to the secrets file used to load credentials (`str`)."""
131 return self._db_auth_path
133 # dialectname, host, and database are tagged as Optional only because other
134 # routines delegate to this one in order to raise a consistent exception
135 # for that condition.
136 def getAuth(
137 self,
138 dialectname: str | None,
139 username: str | None,
140 host: str | None,
141 port: int | str | None,
142 database: str | None,
143 ) -> tuple[str | None, str]:
144 """Retrieve a username and password for a database connection.
146 This function matches elements from the database connection URL with
147 glob-like URL patterns in a list of configuration dictionaries.
149 Parameters
150 ----------
151 dialectname : `str`
152 Database dialect, for example sqlite, mysql, postgresql, oracle,
153 or mssql.
154 username : `str` or None
155 Username from connection URL if present.
156 host : `str`
157 Host name from connection URL if present.
158 port : `str` or `int` or None
159 Port from connection URL if present.
160 database : `str`
161 Database name from connection URL.
163 Returns
164 -------
165 username: `str`
166 Username to use for database connection; same as parameter if
167 present.
168 password: `str`
169 Password to use for database connection.
171 Raises
172 ------
173 DbAuthError
174 Raised if the input is missing elements, an authorization
175 dictionary is missing elements, the authorization file is
176 misconfigured, or no matching authorization is found.
178 Notes
179 -----
180 The list of authorization configuration dictionaries is tested in
181 order, with the first matching dictionary used. Each dictionary must
182 contain a ``url`` item with a pattern to match against the database
183 connection URL and a ``password`` item. If no username is provided in
184 the database connection URL, the dictionary must also contain a
185 ``username`` item.
187 The ``url`` item must begin with a dialect and is not allowed to
188 specify dialect+driver.
190 Glob-style patterns (using "``*``" and "``?``" as wildcards) can be
191 used to match the host and database name portions of the connection
192 URL. For the username, port, and database name portions, omitting them
193 from the pattern matches against any value in the connection URL.
195 Examples
196 --------
197 The connection URL
198 ``postgresql://user@host.example.com:5432/my_database`` matches against
199 the identical string as a pattern. Other patterns that would match
200 include:
202 * ``postgresql://*``
203 * ``postgresql://*.example.com``
204 * ``postgresql://*.example.com/my_*``
205 * ``postgresql://host.example.com/my_database``
206 * ``postgresql://host.example.com:5432/my_database``
207 * ``postgresql://user@host.example.com/my_database``
209 Note that the connection URL
210 ``postgresql://host.example.com/my_database`` would not match against
211 the pattern ``postgresql://host.example.com:5432``, even if the default
212 port for the connection is 5432.
213 """
214 # Check inputs, squashing MyPy warnings that they're unnecessary
215 # (since they're only unnecessary if everyone else runs MyPy).
216 if dialectname is None or dialectname == "":
217 raise DbAuthError("Missing dialectname parameter")
218 if host is None or host == "":
219 raise DbAuthError("Missing host parameter")
220 if database is None or database == "":
221 raise DbAuthError("Missing database parameter")
223 for authDict in self.authList:
224 # Check for mandatory entries
225 if "url" not in authDict:
226 raise DbAuthError("Missing URL in DbAuth configuration")
228 # Parse pseudo-URL from db-auth.yaml
229 components = urllib.parse.urlparse(authDict["url"])
231 # Check for same database backend type/dialect
232 if components.scheme == "":
233 raise DbAuthError("Missing database dialect in URL: " + authDict["url"])
235 if "+" in components.scheme:
236 raise DbAuthError(
237 "Authorization dictionary URLs should only specify "
238 f"dialects, got: {components.scheme}. instead."
239 )
241 # dialect and driver are allowed in db string, since functionality
242 # could change. Connecting to a DB using different driver does not
243 # change dbname/user/pass and other auth info so we ignore it.
244 # https://docs.sqlalchemy.org/en/13/core/engines.html#database-urls
245 dialect = dialectname.split("+")[0]
246 if dialect != components.scheme:
247 continue
249 # Check for same database name
250 if (
251 components.path != ""
252 and components.path != "/"
253 and not fnmatch.fnmatch(database, components.path.lstrip("/"))
254 ):
255 continue
257 # Check username
258 if components.username is not None:
259 if username is None or username == "":
260 continue
261 if username != components.username:
262 continue
264 # Check hostname
265 if components.hostname is None:
266 raise DbAuthError("Missing host in URL: " + authDict["url"])
267 if not fnmatch.fnmatch(host, components.hostname):
268 continue
270 # Check port
271 if components.port is not None and (port is None or str(port) != str(components.port)):
272 continue
274 # Don't override username from connection string
275 if username is not None and username != "":
276 return (username, authDict["password"])
277 else:
278 if "username" not in authDict:
279 return (None, authDict["password"])
280 return (authDict["username"], authDict["password"])
282 raise DbAuthNotFoundError(
283 f"No matching DbAuth configuration for: ({dialectname}, {username}, {host}, {port}, {database})"
284 )
286 def getUrl(self, url: str) -> str:
287 """Fill in a username and password in a database connection URL.
289 This function parses the URL and calls `getAuth`.
291 Parameters
292 ----------
293 url : `str`
294 Database connection URL.
296 Returns
297 -------
298 url : `str`
299 Database connection URL with username and password.
301 Raises
302 ------
303 DbAuthError
304 Raised if the input is missing elements, an authorization
305 dictionary is missing elements, the authorization file is
306 misconfigured, or no matching authorization is found.
308 See Also
309 --------
310 getAuth : Retrieve authentication credentials.
311 """
312 components = urllib.parse.urlparse(url)
313 username, password = self.getAuth(
314 components.scheme,
315 components.username,
316 components.hostname,
317 components.port,
318 components.path.lstrip("/"),
319 )
320 hostname = components.hostname
321 assert hostname is not None
322 if ":" in hostname: # ipv6
323 hostname = f"[{hostname}]"
324 assert username is not None
325 netloc = "{}:{}@{}".format(
326 urllib.parse.quote(username, safe=""), urllib.parse.quote(password, safe=""), hostname
327 )
328 if components.port is not None:
329 netloc += ":" + str(components.port)
330 return urllib.parse.urlunparse(
331 (
332 components.scheme,
333 netloc,
334 components.path,
335 components.params,
336 components.query,
337 components.fragment,
338 )
339 )