Coverage for python / lsst / daf / butler / remote_butler / _http_connection.py: 0%

122 statements  

« prev     ^ index     » next       coverage.py v7.13.5, created at 2026-05-06 08:30 +0000

1# This file is part of daf_butler. 

2# 

3# Developed for the LSST Data Management System. 

4# This product includes software developed by the LSST Project 

5# (http://www.lsst.org). 

6# See the COPYRIGHT file at the top-level directory of this distribution 

7# for details of code ownership. 

8# 

9# This software is dual licensed under the GNU General Public License and also 

10# under a 3-clause BSD license. Recipients may choose which of these licenses 

11# to use; please see the files gpl-3.0.txt and/or bsd_license.txt, 

12# respectively. If you choose the GPL option then the following text applies 

13# (but note that there is still no warranty even if you opt for BSD instead): 

14# 

15# This program is free software: you can redistribute it and/or modify 

16# it under the terms of the GNU General Public License as published by 

17# the Free Software Foundation, either version 3 of the License, or 

18# (at your option) any later version. 

19# 

20# This program is distributed in the hope that it will be useful, 

21# but WITHOUT ANY WARRANTY; without even the implied warranty of 

22# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the 

23# GNU General Public License for more details. 

24# 

25# You should have received a copy of the GNU General Public License 

26# along with this program. If not, see <http://www.gnu.org/licenses/>. 

27 

28from __future__ import annotations 

29 

30__all__ = ("RemoteButlerHttpConnection", "parse_model") 

31 

32import time 

33import urllib.parse 

34from collections.abc import Iterator, Mapping 

35from contextlib import contextmanager 

36from dataclasses import dataclass 

37from typing import TypeVar 

38from uuid import uuid4 

39 

40import httpx 

41from pydantic import BaseModel, ValidationError 

42 

43from lsst.daf.butler import __version__ 

44 

45from ._errors import deserialize_butler_user_error 

46from .authentication.interface import RemoteButlerAuthenticationProvider 

47from .server_models import CLIENT_REQUEST_ID_HEADER_NAME, ERROR_STATUS_CODE, ErrorResponseModel 

48 

49_AnyPydanticModel = TypeVar("_AnyPydanticModel", bound=BaseModel) 

50"""Generic type variable that accepts any Pydantic model class.""" 

51 

52_USER_AGENT = f"RemoteButler/{__version__}" 

53 

54 

55class RemoteButlerHttpConnection: 

56 """HTTP connection to a Butler server. 

57 

58 Parameters 

59 ---------- 

60 http_client : `httpx.Client` 

61 HTTP connection pool we will use to connect to the server. 

62 server_url : `str` 

63 URL of the Butler server we will connect to. 

64 auth : `RemoteButlerAuthenticationProvider` 

65 Provides headers that will be used to authenticate with the server. 

66 """ 

67 

68 def __init__( 

69 self, http_client: httpx.Client, server_url: str, auth: RemoteButlerAuthenticationProvider 

70 ) -> None: 

71 self._client = http_client 

72 self.server_url = server_url 

73 self.auth = auth 

74 

75 def post(self, path: str, model: BaseModel) -> httpx.Response: 

76 """Send a POST request to the Butler server. 

77 

78 Parameters 

79 ---------- 

80 path : `str` 

81 A relative path to an endpoint. 

82 model : `pydantic.BaseModel` 

83 Pydantic model containing the request body to be sent to the 

84 server. 

85 

86 Returns 

87 ------- 

88 response: `~httpx.Response` 

89 The response from the server. 

90 

91 Raises 

92 ------ 

93 ButlerUserError 

94 If the server explicitly returned a user-facing error response. 

95 ButlerServerError 

96 If there is an issue communicating with the server. 

97 """ 

98 request = self._build_post_request(path, model) 

99 return self._send_request(request) 

100 

101 @contextmanager 

102 def post_with_stream_response(self, path: str, model: BaseModel) -> Iterator[httpx.Response]: 

103 """Send a POST request to the Butler server. 

104 

105 Parameters 

106 ---------- 

107 path : `str` 

108 A relative path to an endpoint. 

109 model : `pydantic.BaseModel` 

110 Pydantic model containing the request body to be sent to the 

111 server. 

112 

113 Returns 

114 ------- 

115 response: `~httpx.Response` 

116 The response from the server. 

117 

118 Raises 

119 ------ 

120 ButlerUserError 

121 If the server explicitly returned a user-facing error response. 

122 ButlerServerError 

123 If there is an issue communicating with the server. 

124 """ 

125 request = self._build_post_request(path, model) 

126 with self._send_request_with_stream_response(request) as response: 

127 yield response 

128 

129 def _build_post_request(self, path: str, model: BaseModel) -> _Request: 

130 json = model.model_dump_json().encode("utf-8") 

131 return self._build_request( 

132 "POST", 

133 path, 

134 content=json, 

135 headers={"content-type": "application/json"}, 

136 ) 

137 

138 def get(self, path: str, params: Mapping[str, str | bool] | None = None) -> httpx.Response: 

139 """Send a GET request to the Butler server. 

140 

141 Parameters 

142 ---------- 

143 path : `str` 

144 A relative path to an endpoint. 

145 params : `~collections.abc.Mapping` [ `str` , `str` | `bool` ] 

146 Query parameters included in the request URL. 

147 

148 Returns 

149 ------- 

150 response: `~httpx.Response` 

151 The response from the server. 

152 

153 Raises 

154 ------ 

155 ButlerUserError 

156 If the server explicitly returned a user-facing error response. 

157 ButlerServerError 

158 If there is an issue communicating with the server. 

159 """ 

160 request = self._build_request("GET", path, params=params) 

161 return self._send_request(request) 

162 

163 def _get_url(self, path: str, version: str = "v1") -> str: 

164 """Form the complete path to an endpoint on the server. 

165 

166 Parameters 

167 ---------- 

168 path : `str` 

169 The relative path to the server endpoint. 

170 version : `str`, optional 

171 Version string to prepend to path. Defaults to "v1". 

172 

173 Returns 

174 ------- 

175 path : `str` 

176 The full path to the endpoint. 

177 """ 

178 slash = "" if self.server_url.endswith("/") else "/" 

179 return f"{self.server_url}{slash}{version}/{path}" 

180 

181 def _build_request( 

182 self, 

183 method: str, 

184 path: str, 

185 *, 

186 content: bytes | None = None, 

187 params: Mapping[str, str | bool] | None = None, 

188 headers: Mapping[str, str] | None = None, 

189 ) -> _Request: 

190 url = self._get_url(path) 

191 

192 request_id = str(uuid4()) 

193 request_headers = {CLIENT_REQUEST_ID_HEADER_NAME: request_id, "user-agent": _USER_AGENT} 

194 request_headers.update(self.auth.get_server_headers()) 

195 if headers is not None: 

196 request_headers.update(headers) 

197 

198 return _Request( 

199 request=self._client.build_request( 

200 method, url, content=content, params=params, headers=request_headers 

201 ), 

202 request_id=request_id, 

203 ) 

204 

205 def _send_request(self, request: _Request) -> httpx.Response: 

206 """Send an HTTP request to the Butler server with authentication 

207 headers and a request ID. 

208 

209 If the server returns a user-facing error detail, raises an exception 

210 with the message as a subclass of ButlerUserError. 

211 """ 

212 try: 

213 response = self._send_with_retries(request, stream=False) 

214 self._handle_http_status(response, request.request_id) 

215 return response 

216 except httpx.HTTPStatusError as e: 

217 raise ButlerServerError( 

218 client_request_id=request.request_id, status_code=e.response.status_code 

219 ) from e 

220 except httpx.HTTPError as e: 

221 raise ButlerServerError(client_request_id=request.request_id) from e 

222 

223 @contextmanager 

224 def _send_request_with_stream_response(self, request: _Request) -> Iterator[httpx.Response]: 

225 try: 

226 response = self._send_with_retries(request, stream=True) 

227 try: 

228 self._handle_http_status(response, request.request_id) 

229 yield response 

230 finally: 

231 response.close() 

232 except httpx.HTTPStatusError as e: 

233 raise ButlerServerError( 

234 client_request_id=request.request_id, status_code=e.response.status_code 

235 ) from e 

236 except httpx.HTTPError as e: 

237 raise ButlerServerError(client_request_id=request.request_id) from e 

238 

239 def _send_with_retries(self, request: _Request, stream: bool) -> httpx.Response: 

240 max_retry_time_seconds = 120 

241 start_time = time.time() 

242 while True: 

243 response = self._client.send(request.request, stream=stream) 

244 retry = _needs_retry(response) 

245 time_remaining = max_retry_time_seconds - (time.time() - start_time) 

246 if retry.retry and time_remaining > 0: 

247 if stream: 

248 response.close() 

249 sleep_time = min(time_remaining, retry.delay_seconds) 

250 time.sleep(sleep_time) 

251 else: 

252 return response 

253 

254 def _handle_http_status(self, response: httpx.Response, request_id: str) -> None: 

255 if response.status_code == ERROR_STATUS_CODE: 

256 # Raise an exception that the server has forwarded to the 

257 # client. 

258 model = _try_to_parse_model(response, ErrorResponseModel) 

259 if model is not None: 

260 exc = deserialize_butler_user_error(model) 

261 exc.add_note(f"Client request ID: {request_id}") 

262 raise exc 

263 # If model is None, server sent an expected error code, but 

264 # the body wasn't in the expected JSON format. This likely 

265 # means some HTTP thing between us and the server is 

266 # misbehaving. 

267 

268 response.raise_for_status() 

269 

270 

271@dataclass(frozen=True) 

272class _Retry: 

273 retry: bool 

274 delay_seconds: int 

275 

276 

277def _needs_retry(response: httpx.Response) -> _Retry: 

278 # Handle a 503 Service Unavailable, sent by the server if it is 

279 # overloaded, or a 429, sent by the server if the client 

280 # triggers a rate limit. 

281 if response.status_code == 503 or response.status_code == 429: 

282 # Only retry if the server has instructed us to do so by sending a 

283 # Retry-After header. 

284 retry_after = response.headers.get("retry-after") 

285 if retry_after is not None: 

286 try: 

287 # The HTTP standard also allows a date string here, but the 

288 # Butler server only sends integer seconds. 

289 delay_seconds = int(retry_after) 

290 return _Retry(True, delay_seconds) 

291 except ValueError: 

292 pass 

293 

294 return _Retry(False, 0) 

295 

296 

297def parse_model(response: httpx.Response, model: type[_AnyPydanticModel]) -> _AnyPydanticModel: 

298 """Deserialize a Pydantic model from the body of an HTTP response. 

299 

300 Parameters 

301 ---------- 

302 response : `~httpx.Response` 

303 An HTTP response object. 

304 model : `type` [ ``pydantic.BaseModel`` ] 

305 A Pydantic model class that will be used to parse the response body. 

306 

307 Returns 

308 ------- 

309 response_model : ``pydantic.BaseModel`` 

310 An instance of the Pydantic model class loaded from the response body. 

311 """ 

312 return model.model_validate_json(response.read()) 

313 

314 

315def _try_to_parse_model(response: httpx.Response, model: type[_AnyPydanticModel]) -> _AnyPydanticModel | None: 

316 """Attempt to deserialize a Pydantic model from the body of an HTTP 

317 response. Returns `None` if the content could not be parsed as JSON or 

318 failed validation against the model. 

319 """ 

320 try: 

321 return parse_model(response, model) 

322 except (ValueError, ValidationError): 

323 return None 

324 

325 

326class ButlerServerError(RuntimeError): 

327 """Exception returned when there is an error communicating with the Butler 

328 server. 

329 

330 Parameters 

331 ---------- 

332 client_request_id : `str` 

333 Request ID to include in the exception message. 

334 status_code : `int`, optional 

335 HTTP status code for response that triggered the error, if available. 

336 """ 

337 

338 def __init__(self, *, client_request_id: str, status_code: int | None = None): 

339 super().__init__(f"Error while communicating with Butler server. Request ID: {client_request_id}") 

340 self.client_request_id = client_request_id 

341 self.status_code = status_code 

342 

343 

344def quote_path_variable(path: str) -> str: # numpydoc ignore=PR01 

345 """URL encode a string to make it suitable for inserting as a segment of 

346 the path part of a URL. 

347 """ 

348 return urllib.parse.quote(path, safe="") 

349 

350 

351@dataclass(frozen=True) 

352class _Request: 

353 request: httpx.Request 

354 request_id: str