Coverage for python / lsst / daf / butler / remote_butler / _http_connection.py: 0%
122 statements
« prev ^ index » next coverage.py v7.13.5, created at 2026-04-28 08:36 +0000
« prev ^ index » next coverage.py v7.13.5, created at 2026-04-28 08:36 +0000
1# This file is part of daf_butler.
2#
3# Developed for the LSST Data Management System.
4# This product includes software developed by the LSST Project
5# (http://www.lsst.org).
6# See the COPYRIGHT file at the top-level directory of this distribution
7# for details of code ownership.
8#
9# This software is dual licensed under the GNU General Public License and also
10# under a 3-clause BSD license. Recipients may choose which of these licenses
11# to use; please see the files gpl-3.0.txt and/or bsd_license.txt,
12# respectively. If you choose the GPL option then the following text applies
13# (but note that there is still no warranty even if you opt for BSD instead):
14#
15# This program is free software: you can redistribute it and/or modify
16# it under the terms of the GNU General Public License as published by
17# the Free Software Foundation, either version 3 of the License, or
18# (at your option) any later version.
19#
20# This program is distributed in the hope that it will be useful,
21# but WITHOUT ANY WARRANTY; without even the implied warranty of
22# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
23# GNU General Public License for more details.
24#
25# You should have received a copy of the GNU General Public License
26# along with this program. If not, see <http://www.gnu.org/licenses/>.
28from __future__ import annotations
30__all__ = ("RemoteButlerHttpConnection", "parse_model")
32import time
33import urllib.parse
34from collections.abc import Iterator, Mapping
35from contextlib import contextmanager
36from dataclasses import dataclass
37from typing import TypeVar
38from uuid import uuid4
40import httpx
41from pydantic import BaseModel, ValidationError
43from lsst.daf.butler import __version__
45from ._errors import deserialize_butler_user_error
46from .authentication.interface import RemoteButlerAuthenticationProvider
47from .server_models import CLIENT_REQUEST_ID_HEADER_NAME, ERROR_STATUS_CODE, ErrorResponseModel
_AnyPydanticModel = TypeVar("_AnyPydanticModel", bound=BaseModel)
"""Generic type variable that accepts any Pydantic model class."""

# Value of the ``user-agent`` header attached to requests built by
# `RemoteButlerHttpConnection`; it embeds the daf_butler package version.
_USER_AGENT = f"RemoteButler/{__version__}"
class RemoteButlerHttpConnection:
    """HTTP connection to a Butler server.

    Parameters
    ----------
    http_client : `httpx.Client`
        HTTP connection pool we will use to connect to the server.
    server_url : `str`
        URL of the Butler server we will connect to.
    auth : `RemoteButlerAuthenticationProvider`
        Provides headers that will be used to authenticate with the server.
    """

    def __init__(
        self, http_client: httpx.Client, server_url: str, auth: RemoteButlerAuthenticationProvider
    ) -> None:
        self._client = http_client
        self.server_url = server_url
        self.auth = auth

    def post(self, path: str, model: BaseModel) -> httpx.Response:
        """Send a POST request to the Butler server.

        Parameters
        ----------
        path : `str`
            A relative path to an endpoint.
        model : `pydantic.BaseModel`
            Pydantic model containing the request body to be sent to the
            server.

        Returns
        -------
        response: `~httpx.Response`
            The response from the server.

        Raises
        ------
        ButlerUserError
            If the server explicitly returned a user-facing error response.
        ButlerServerError
            If there is an issue communicating with the server.
        """
        request = self._build_post_request(path, model)
        return self._send_request(request)

    @contextmanager
    def post_with_stream_response(self, path: str, model: BaseModel) -> Iterator[httpx.Response]:
        """Send a POST request to the Butler server and stream the response.

        This is a context manager: the response is requested in streaming
        mode and is closed when the ``with`` block exits.

        Parameters
        ----------
        path : `str`
            A relative path to an endpoint.
        model : `pydantic.BaseModel`
            Pydantic model containing the request body to be sent to the
            server.

        Returns
        -------
        response: `~httpx.Response`
            The streaming response from the server, yielded by the context
            manager.

        Raises
        ------
        ButlerUserError
            If the server explicitly returned a user-facing error response.
        ButlerServerError
            If there is an issue communicating with the server.
        """
        request = self._build_post_request(path, model)
        with self._send_request_with_stream_response(request) as response:
            yield response

    def _build_post_request(self, path: str, model: BaseModel) -> _Request:
        """Build a POST request whose body is the JSON serialization of
        ``model``.
        """
        json = model.model_dump_json().encode("utf-8")
        return self._build_request(
            "POST",
            path,
            content=json,
            headers={"content-type": "application/json"},
        )

    def get(self, path: str, params: Mapping[str, str | bool] | None = None) -> httpx.Response:
        """Send a GET request to the Butler server.

        Parameters
        ----------
        path : `str`
            A relative path to an endpoint.
        params : `~collections.abc.Mapping` [ `str` , `str` | `bool` ]
            Query parameters included in the request URL.

        Returns
        -------
        response: `~httpx.Response`
            The response from the server.

        Raises
        ------
        ButlerUserError
            If the server explicitly returned a user-facing error response.
        ButlerServerError
            If there is an issue communicating with the server.
        """
        request = self._build_request("GET", path, params=params)
        return self._send_request(request)

    def _get_url(self, path: str, version: str = "v1") -> str:
        """Form the complete path to an endpoint on the server.

        Parameters
        ----------
        path : `str`
            The relative path to the server endpoint.
        version : `str`, optional
            Version string to prepend to path. Defaults to "v1".

        Returns
        -------
        path : `str`
            The full path to the endpoint.
        """
        # Avoid a doubled slash when the configured server URL already ends
        # with one.
        slash = "" if self.server_url.endswith("/") else "/"
        return f"{self.server_url}{slash}{version}/{path}"

    def _build_request(
        self,
        method: str,
        path: str,
        *,
        content: bytes | None = None,
        params: Mapping[str, str | bool] | None = None,
        headers: Mapping[str, str] | None = None,
    ) -> _Request:
        """Build an HTTP request tagged with a freshly generated request ID.

        Headers are merged in increasing order of precedence: the request ID
        and user agent, then the authentication headers, then any
        caller-supplied ``headers``.
        """
        url = self._get_url(path)

        request_id = str(uuid4())
        request_headers = {CLIENT_REQUEST_ID_HEADER_NAME: request_id, "user-agent": _USER_AGENT}
        request_headers.update(self.auth.get_server_headers())
        if headers is not None:
            request_headers.update(headers)

        return _Request(
            request=self._client.build_request(
                method, url, content=content, params=params, headers=request_headers
            ),
            request_id=request_id,
        )

    def _send_request(self, request: _Request) -> httpx.Response:
        """Send an HTTP request to the Butler server with authentication
        headers and a request ID.

        If the server returns a user-facing error detail, raises an exception
        with the message as a subclass of ButlerUserError.  Any `httpx` error
        is re-raised as `ButlerServerError` carrying the client request ID
        (and the HTTP status code, when one is available).
        """
        try:
            response = self._send_with_retries(request, stream=False)
            self._handle_http_status(response, request.request_id)
            return response
        except httpx.HTTPStatusError as e:
            raise ButlerServerError(
                client_request_id=request.request_id, status_code=e.response.status_code
            ) from e
        except httpx.HTTPError as e:
            raise ButlerServerError(client_request_id=request.request_id) from e

    @contextmanager
    def _send_request_with_stream_response(self, request: _Request) -> Iterator[httpx.Response]:
        """Send a request in streaming mode and yield the open response.

        The response is always closed on exit.  Because the ``yield`` sits
        inside the ``try`` block, `httpx` errors raised while the caller is
        consuming the stream are also converted to `ButlerServerError`.
        """
        try:
            response = self._send_with_retries(request, stream=True)
            try:
                self._handle_http_status(response, request.request_id)
                yield response
            finally:
                response.close()
        except httpx.HTTPStatusError as e:
            raise ButlerServerError(
                client_request_id=request.request_id, status_code=e.response.status_code
            ) from e
        except httpx.HTTPError as e:
            raise ButlerServerError(client_request_id=request.request_id) from e

    def _send_with_retries(self, request: _Request, stream: bool) -> httpx.Response:
        """Send a request, re-sending it while `_needs_retry` asks for a
        retry and the retry time budget has not been used up.

        Returns the first response that does not call for a retry, or the
        last response received once the budget is exhausted.
        """
        max_retry_time_seconds = 120
        # Use a monotonic clock so that adjustments to the system wall clock
        # cannot stretch or shorten the retry budget.
        start_time = time.monotonic()
        while True:
            response = self._client.send(request.request, stream=stream)
            retry = _needs_retry(response)
            time_remaining = max_retry_time_seconds - (time.monotonic() - start_time)
            if retry.retry and time_remaining > 0:
                if stream:
                    # The streamed response is being discarded, so close it
                    # before sending the request again.
                    response.close()
                # Never sleep past the end of the budget, and never pass a
                # negative duration to time.sleep (which raises ValueError).
                sleep_time = max(0, min(time_remaining, retry.delay_seconds))
                time.sleep(sleep_time)
            else:
                return response

    def _handle_http_status(self, response: httpx.Response, request_id: str) -> None:
        """Raise an exception if the response reports an error.

        A response with status ``ERROR_STATUS_CODE`` whose body parses as an
        `ErrorResponseModel` is re-raised as the exception produced by
        `deserialize_butler_user_error`, annotated with the client request
        ID.  Anything else is left to ``response.raise_for_status()``.
        """
        if response.status_code == ERROR_STATUS_CODE:
            # Raise an exception that the server has forwarded to the
            # client.
            model = _try_to_parse_model(response, ErrorResponseModel)
            if model is not None:
                exc = deserialize_butler_user_error(model)
                exc.add_note(f"Client request ID: {request_id}")
                raise exc
            # If model is None, server sent an expected error code, but
            # the body wasn't in the expected JSON format. This likely
            # means some HTTP thing between us and the server is
            # misbehaving.

        response.raise_for_status()
@dataclass(frozen=True)
class _Retry:
    """Outcome of deciding whether a request should be sent again."""

    # True if the request should be retried.
    retry: bool
    # Number of seconds to wait before retrying; 0 when ``retry`` is False.
    delay_seconds: int


def _needs_retry(response: httpx.Response) -> _Retry:
    """Decide whether a response asks the client to retry the request.

    Parameters
    ----------
    response : `~httpx.Response`
        Response received from the server.

    Returns
    -------
    retry : `_Retry`
        ``_Retry(True, n)`` if the status code is 503 or 429 and the
        ``Retry-After`` header holds a non-negative integer ``n``;
        ``_Retry(False, 0)`` otherwise.
    """
    no_retry = _Retry(False, 0)

    # Handle a 503 Service Unavailable, sent by the server if it is
    # overloaded, or a 429, sent by the server if the client
    # triggers a rate limit.
    if response.status_code not in (503, 429):
        return no_retry

    # Only retry if the server has instructed us to do so by sending a
    # Retry-After header.
    retry_after = response.headers.get("retry-after")
    if retry_after is None:
        return no_retry

    try:
        # The HTTP standard also allows a date string here, but the
        # Butler server only sends integer seconds.
        delay_seconds = int(retry_after)
    except ValueError:
        return no_retry

    # int() accepts a leading minus sign, but a negative delay is not a
    # valid Retry-After value and would make time.sleep() raise ValueError.
    # Treat it like any other malformed header.
    if delay_seconds < 0:
        return no_retry

    return _Retry(True, delay_seconds)
def parse_model(response: httpx.Response, model: type[_AnyPydanticModel]) -> _AnyPydanticModel:
    """Build a Pydantic model instance from the body of an HTTP response.

    Parameters
    ----------
    response : `~httpx.Response`
        HTTP response whose body contains the JSON to parse.
    model : `type` [ ``pydantic.BaseModel`` ]
        Pydantic model class used to validate and load the body.

    Returns
    -------
    response_model : ``pydantic.BaseModel``
        Instance of ``model`` populated from the response body.
    """
    # read() returns the complete body, fetching it first if necessary.
    body = response.read()
    parsed = model.model_validate_json(body)
    return parsed
def _try_to_parse_model(response: httpx.Response, model: type[_AnyPydanticModel]) -> _AnyPydanticModel | None:
    """Best-effort variant of `parse_model`.

    Returns the parsed model instance, or `None` when the response body
    is not valid JSON or does not validate against ``model``.
    """
    try:
        parsed = parse_model(response, model)
    except (ValueError, ValidationError):
        parsed = None
    return parsed
class ButlerServerError(RuntimeError):
    """Exception raised when communication with the Butler server fails.

    Parameters
    ----------
    client_request_id : `str`
        Request ID to include in the exception message.
    status_code : `int`, optional
        HTTP status code of the response that triggered the error, if one
        is available.
    """

    def __init__(self, *, client_request_id: str, status_code: int | None = None):
        # Keep the identifying details on the instance so callers can
        # inspect them without parsing the message.
        self.client_request_id = client_request_id
        self.status_code = status_code
        message = f"Error while communicating with Butler server. Request ID: {client_request_id}"
        super().__init__(message)
def quote_path_variable(path: str) -> str:  # numpydoc ignore=PR01
    """Percent-encode a string so that it can be used as a single segment
    of the path part of a URL.
    """
    # An empty ``safe`` set means "/" is escaped too, so the value cannot
    # introduce extra path segments.
    no_safe_characters = ""
    encoded = urllib.parse.quote(path, safe=no_safe_characters)
    return encoded
@dataclass(frozen=True)
class _Request:
    """An HTTP request paired with the client request ID generated for it."""

    # The prepared httpx request that will be sent to the server.
    request: httpx.Request
    # Client-generated ID for this request; it is included in the
    # ButlerServerError raised when sending the request fails.
    request_id: str