Coverage for tests/test_server.py: 14%

362 statements  

« prev     ^ index     » next       coverage.py v7.13.5, created at 2026-04-14 23:36 +0000

1# This file is part of daf_butler. 

2# 

3# Developed for the LSST Data Management System. 

4# This product includes software developed by the LSST Project 

5# (http://www.lsst.org). 

6# See the COPYRIGHT file at the top-level directory of this distribution 

7# for details of code ownership. 

8# 

9# This software is dual licensed under the GNU General Public License and also 

10# under a 3-clause BSD license. Recipients may choose which of these licenses 

11# to use; please see the files gpl-3.0.txt and/or bsd_license.txt, 

12# respectively. If you choose the GPL option then the following text applies 

13# (but note that there is still no warranty even if you opt for BSD instead): 

14# 

15# This program is free software: you can redistribute it and/or modify 

16# it under the terms of the GNU General Public License as published by 

17# the Free Software Foundation, either version 3 of the License, or 

18# (at your option) any later version. 

19# 

20# This program is distributed in the hope that it will be useful, 

21# but WITHOUT ANY WARRANTY; without even the implied warranty of 

22# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the 

23# GNU General Public License for more details. 

24# 

25# You should have received a copy of the GNU General Public License 

26# along with this program. If not, see <http://www.gnu.org/licenses/>. 

27 

28import asyncio 

29import os.path 

30import tempfile 

31import threading 

32import unittest 

33import unittest.mock 

34import uuid 

35from concurrent.futures import ThreadPoolExecutor 

36from unittest.mock import DEFAULT, AsyncMock, NonCallableMock, patch 

37 

38from lsst.daf.butler import ( 

39 Butler, 

40 DataCoordinate, 

41 DatasetId, 

42 DatasetNotFoundError, 

43 DatasetRef, 

44 DatasetType, 

45 FileDataset, 

46 InvalidQueryError, 

47 LabeledButlerFactory, 

48 MissingDatasetTypeError, 

49 NoDefaultCollectionError, 

50 StorageClassFactory, 

51) 

52from lsst.daf.butler.datastore import DatasetRefURIs 

53from lsst.daf.butler.registry import RegistryDefaults 

54from lsst.daf.butler.tests import DatastoreMock, addDatasetType 

55from lsst.daf.butler.tests.dict_convertible_model import DictConvertibleModel 

56from lsst.daf.butler.tests.server_available import butler_server_import_error, butler_server_is_available 

57from lsst.daf.butler.tests.utils import MetricsExample, MetricTestRepo, mock_env 

58from lsst.resources import ResourcePath 

59from lsst.resources.http import HttpResourcePath 

60 

61if butler_server_is_available: 

62 import fastapi 

63 import httpx 

64 import safir.dependencies.logger 

65 from fastapi.testclient import TestClient 

66 

67 import lsst.daf.butler.remote_butler._query_results 

68 import lsst.daf.butler.remote_butler.server.handlers._query_limits 

69 import lsst.daf.butler.remote_butler.server.handlers._query_streaming 

70 from lsst.daf.butler.remote_butler import ButlerServerError, RemoteButler 

71 from lsst.daf.butler.remote_butler.authentication.cadc import CadcAuthenticationProvider 

72 from lsst.daf.butler.remote_butler.authentication.rubin import ( 

73 _EXPLICIT_BUTLER_ACCESS_TOKEN_ENVIRONMENT_KEY, 

74 RubinAuthenticationProvider, 

75 ) 

76 from lsst.daf.butler.remote_butler.server import create_app 

77 from lsst.daf.butler.remote_butler.server._config import mock_config 

78 from lsst.daf.butler.remote_butler.server._dependencies import ( 

79 authorizer_dependency, 

80 butler_factory_dependency, 

81 ) 

82 from lsst.daf.butler.remote_butler.server._gafaelfawr import MockGafaelfawrGroupAuthorizer 

83 from lsst.daf.butler.remote_butler.server.handlers._utils import generate_file_download_uri 

84 from lsst.daf.butler.remote_butler.server_models import QueryCollectionsRequestModel 

85 from lsst.daf.butler.tests.server import TEST_REPOSITORY_NAME, UnhandledServerError, create_test_server 

86 

87 

# Absolute path of the directory holding this test module; used to locate
# test data and server configuration files.
TESTDIR = os.path.abspath(os.path.dirname(__file__))

90 

@unittest.skipIf(not butler_server_is_available, butler_server_import_error)
class ButlerClientServerTestCase(unittest.TestCase):
    """Test for Butler client/server."""

    @classmethod
    def setUpClass(cls):
        """Start a shared test server and seed it with test datasets."""
        # The server context is registered with the class so it is torn down
        # automatically after all tests in this class have run.
        server = cls.enterClassContext(create_test_server(TESTDIR))
        cls.server_instance = server
        cls.client = server.client
        cls.butler = server.remote_butler
        cls.butler_without_error_propagation = server.remote_butler_without_error_propagation

        cls.storageClassFactory = StorageClassFactory()

        cls.repo = MetricTestRepo.create_from_butler(server.direct_butler, server.config_file_path)
        # Add a file with corrupted data for testing error conditions.
        cls.dataset_with_corrupted_data = _create_corrupted_dataset(cls.repo)
        # All of the datasets that come with MetricTestRepo are disassembled
        # composites.  Add a simple dataset for testing the common case.
        cls.simple_dataset_ref = _create_simple_dataset(server.direct_butler)

        # Populate the test server.  The DatastoreMock is required because
        # the datasets referenced in these imports do not point at real
        # files.
        direct_butler = server.direct_butler
        DatastoreMock.apply(direct_butler)
        direct_butler.import_(filename="resource://lsst.daf.butler/tests/registry_data/base.yaml")
        direct_butler.import_(filename="resource://lsst.daf.butler/tests/registry_data/datasets.yaml")

122 def test_health_check(self): 

123 try: 

124 import importlib.metadata 

125 

126 importlib.metadata.metadata("lsst.daf.butler") 

127 except ModuleNotFoundError: 

128 raise self.skipTest("Standard python package metadata not available. Butler not pip installed.") 

129 response = self.client.get("/") 

130 self.assertEqual(response.status_code, 200) 

131 self.assertEqual(response.json()["name"], "butler") 

132 

133 def test_static_files(self): 

134 with tempfile.TemporaryDirectory() as tmpdir: 

135 with open(os.path.join(tmpdir, "temp.txt"), "w") as fh: 

136 fh.write("test data 123") 

137 

138 with mock_config() as server_config: 

139 server_config.static_files_path = tmpdir 

140 with create_test_server(TESTDIR, server_config=server_config) as server: 

141 response = server.client.get("/api/butler/configs/temp.txt") 

142 self.assertEqual(response.status_code, 200) 

143 self.assertEqual(response.text, "test data 123") 

144 

145 def test_dimension_universe(self): 

146 universe = self.butler.dimensions 

147 self.assertEqual(universe.namespace, "daf_butler") 

148 

149 def test_get_dataset_type(self): 

150 bias_type = self.butler.get_dataset_type("bias") 

151 self.assertEqual(bias_type.name, "bias") 

152 

153 with self.assertRaises(MissingDatasetTypeError): 

154 self.butler_without_error_propagation.get_dataset_type("not_bias") 

155 

156 def test_find_dataset(self): 

157 storage_class = self.storageClassFactory.getStorageClass("Exposure") 

158 

159 ref = self.butler.find_dataset("bias", collections="imported_g", detector=1, instrument="Cam1") 

160 self.assertIsInstance(ref, DatasetRef) 

161 self.assertEqual(ref.id, uuid.UUID("e15ab039-bc8b-4135-87c5-90902a7c0b22")) 

162 self.assertFalse(ref.dataId.hasRecords()) 

163 

164 # Try again with variation of parameters. 

165 ref_new = self.butler.find_dataset( 

166 "bias", 

167 {"detector": 1}, 

168 collections="imported_g", 

169 instrument="Cam1", 

170 dimension_records=True, 

171 ) 

172 self.assertEqual(ref_new, ref) 

173 self.assertTrue(ref_new.dataId.hasRecords()) 

174 

175 ref_new = self.butler.find_dataset( 

176 ref.datasetType, 

177 DataCoordinate.standardize(detector=1, instrument="Cam1", universe=self.butler.dimensions), 

178 collections="imported_g", 

179 storage_class=storage_class, 

180 ) 

181 self.assertEqual(ref_new, ref) 

182 

183 ref2 = self.butler.get_dataset(ref.id) 

184 self.assertEqual(ref2, ref) 

185 

186 # Use detector name to find it. 

187 ref3 = self.butler.find_dataset( 

188 ref.datasetType, 

189 collections="imported_g", 

190 instrument="Cam1", 

191 full_name="Aa", 

192 ) 

193 self.assertEqual(ref2, ref3) 

194 

195 # Try expanded refs. 

196 self.assertFalse(ref.dataId.hasRecords()) 

197 expanded = self.butler.get_dataset(ref.id, dimension_records=True) 

198 self.assertTrue(expanded.dataId.hasRecords()) 

199 

200 # The test datasets are all Exposure so storage class conversion 

201 # can not be tested until we fix that. For now at least test the 

202 # code paths. 

203 bias = self.butler.get_dataset(ref.id, storage_class=storage_class) 

204 self.assertEqual(bias.datasetType.storageClass, storage_class) 

205 

206 # Unknown dataset should not fail. 

207 self.assertIsNone(self.butler.get_dataset(uuid.uuid4())) 

208 self.assertIsNone(self.butler.get_dataset(uuid.uuid4(), storage_class="NumpyArray")) 

209 

210 def test_instantiate_via_butler_http_search(self): 

211 """Ensure that the primary Butler constructor's automatic search logic 

212 correctly locates and reads the configuration file and ends up with a 

213 RemoteButler pointing to the correct URL 

214 """ 

215 

216 # This is kind of a fragile test. Butler's search logic does a lot of 

217 # manipulations involving creating new ResourcePaths, and ResourcePath 

218 # doesn't use httpx so we can't easily inject the TestClient in there. 

219 # We don't have an actual valid HTTP URL to give to the constructor 

220 # because the test instance of the server is accessed via ASGI. 

221 # 

222 # Instead we just monkeypatch the HTTPResourcePath 'read' method and 

223 # hope that all ResourcePath HTTP reads during construction are going 

224 # to the server under test. 

225 def override_read(http_resource_path): 

226 return self.client.get(http_resource_path.geturl()).content 

227 

228 server_url = f"https://test.example/api/butler/repo/{TEST_REPOSITORY_NAME}/" 

229 

230 with patch.object(HttpResourcePath, "read", override_read): 

231 # RegistryDefaults.finish() needs to download the dimension 

232 # universe from the server, which will fail because there is no 

233 # server here. So mock it out. 

234 with patch.object(RegistryDefaults, "finish"): 

235 # Add access key to environment variables. RemoteButler 

236 # instantiation will throw an error if access key is not 

237 # available. 

238 with mock_env({_EXPLICIT_BUTLER_ACCESS_TOKEN_ENVIRONMENT_KEY: "fake-access-token"}): 

239 butler = Butler( 

240 server_url, 

241 collections=["collection1", "collection2"], 

242 run="collection2", 

243 ) 

244 self.enterContext(butler) 

245 self.assertIsInstance(butler, RemoteButler) 

246 self.assertEqual(butler._connection.server_url, server_url) 

247 self.assertEqual(butler.collections.defaults, ("collection1", "collection2")) 

248 self.assertEqual(butler.run, "collection2") 

249 # A butler created this way uses the default cache config. 

250 self.assertFalse(butler._use_disabled_datastore_cache) 

251 

252 butler_factory = LabeledButlerFactory({"server": server_url}) 

253 factory_created_butler = butler_factory.create_butler(label="server", access_token="token") 

254 self.assertIsInstance(factory_created_butler, RemoteButler) 

255 self.assertTrue(factory_created_butler._use_disabled_datastore_cache) 

256 self.assertEqual(factory_created_butler._connection.server_url, server_url) 

257 

258 def test_get(self): 

259 dataset_type = "test_metric_comp" 

260 data_id = {"instrument": "DummyCamComp", "visit": 423} 

261 collections = "ingest/run" 

262 # Test get() of a DatasetRef. 

263 ref = self.butler.find_dataset(dataset_type, data_id, collections=collections) 

264 metric = self.butler.get(ref) 

265 self.assertIsInstance(metric, MetricsExample) 

266 self.assertEqual(metric.summary, MetricTestRepo.METRICS_EXAMPLE_SUMMARY) 

267 

268 # Test get() by DataId. 

269 data_id_metric = self.butler.get(dataset_type, dataId=data_id, collections=collections) 

270 self.assertEqual(metric, data_id_metric) 

271 # Test get() by DataId dict augmented with kwargs. 

272 kwarg_metric = self.butler.get( 

273 dataset_type, dataId={"instrument": "DummyCamComp"}, collections=collections, visit=423 

274 ) 

275 self.assertEqual(metric, kwarg_metric) 

276 # Test get() by DataId DataCoordinate augmented with kwargs. 

277 coordinate = DataCoordinate.make_empty(self.butler.dimensions) 

278 kwarg_data_coordinate_metric = self.butler.get( 

279 dataset_type, dataId=coordinate, collections=collections, instrument="DummyCamComp", visit=423 

280 ) 

281 self.assertEqual(metric, kwarg_data_coordinate_metric) 

282 # Test get() of a non-existent DataId. 

283 invalid_data_id = {"instrument": "NotAValidlInstrument", "visit": 423} 

284 with self.assertRaises(DatasetNotFoundError): 

285 self.butler_without_error_propagation.get( 

286 dataset_type, dataId=invalid_data_id, collections=collections 

287 ) 

288 

289 # Test get() by DataId with default collections. 

290 butler_with_default_collection = self.butler.clone(collections="ingest/run") 

291 default_collection_metric = butler_with_default_collection.get(dataset_type, dataId=data_id) 

292 self.assertEqual(metric, default_collection_metric) 

293 

294 # Test get() by DataId with no collections specified. 

295 with self.assertRaises(NoDefaultCollectionError): 

296 self.butler_without_error_propagation.get(dataset_type, dataId=data_id) 

297 

298 # Test looking up a non-existent ref 

299 invalid_ref = ref.replace(id=uuid.uuid4()) 

300 with self.assertRaises(DatasetNotFoundError): 

301 self.butler_without_error_propagation.get(invalid_ref) 

302 

303 with self.assertRaises(RuntimeError): 

304 self.butler_without_error_propagation.get(self.dataset_with_corrupted_data) 

305 

306 # Test storage class override 

307 new_sc = self.storageClassFactory.getStorageClass("MetricsConversion") 

308 

309 def check_sc_override(converted): 

310 self.assertNotEqual(type(metric), type(converted)) 

311 self.assertIsInstance(converted, new_sc.pytype) 

312 self.assertEqual(metric, converted) 

313 

314 check_sc_override(self.butler.get(ref, storageClass=new_sc)) 

315 

316 # Test storage class override via DatasetRef. 

317 check_sc_override(self.butler.get(ref.overrideStorageClass("MetricsConversion"))) 

318 # Test storage class override via DatasetType. 

319 check_sc_override( 

320 self.butler.get( 

321 ref.datasetType.overrideStorageClass(new_sc), dataId=data_id, collections=collections 

322 ) 

323 ) 

324 

325 # Test component override via DatasetRef. 

326 component_ref = ref.makeComponentRef("summary") 

327 component_data = self.butler.get(component_ref) 

328 self.assertEqual(component_data, MetricTestRepo.METRICS_EXAMPLE_SUMMARY) 

329 

330 # Test overriding both storage class and component via DatasetRef. 

331 converted_component_data = self.butler.get(component_ref, storageClass="DictConvertibleModel") 

332 self.assertIsInstance(converted_component_data, DictConvertibleModel) 

333 self.assertEqual(converted_component_data.content, MetricTestRepo.METRICS_EXAMPLE_SUMMARY) 

334 

335 # Test component override via DatasetType. 

336 dataset_type_component_data = self.butler.get( 

337 component_ref.datasetType, component_ref.dataId, collections=collections 

338 ) 

339 self.assertEqual(dataset_type_component_data, MetricTestRepo.METRICS_EXAMPLE_SUMMARY) 

340 

341 def test_getURIs_no_components(self): 

342 # This dataset does not have components, and should return one URI. 

343 def check_uri(uri: ResourcePath): 

344 self.assertIsNotNone(uris.primaryURI) 

345 self.assertEqual(uris.primaryURI.scheme, "https") 

346 self.assertEqual(uris.primaryURI.read(), b"123") 

347 

348 uris = self.butler.getURIs(self.simple_dataset_ref) 

349 self.assertEqual(len(uris.componentURIs), 0) 

350 check_uri(uris.primaryURI) 

351 

352 check_uri(self.butler.getURI(self.simple_dataset_ref)) 

353 

354 def test_getURIs_multiple_components(self): 

355 # This dataset has multiple components, so we should get back multiple 

356 # URIs. 

357 dataset_type = "test_metric_comp" 

358 data_id = {"instrument": "DummyCamComp", "visit": 423} 

359 collections = "ingest/run" 

360 

361 def check_uris(uris: DatasetRefURIs): 

362 self.assertIsNone(uris.primaryURI) 

363 self.assertEqual(len(uris.componentURIs), 3) 

364 path = uris.componentURIs["summary"] 

365 self.assertEqual(path.scheme, "https") 

366 data = path.read() 

367 self.assertEqual(data, b"AM1: 5.2\nAM2: 30.6\n") 

368 

369 uris = self.butler.getURIs(dataset_type, dataId=data_id, collections=collections) 

370 check_uris(uris) 

371 

372 # Calling getURI on a multi-file dataset raises an exception 

373 with self.assertRaises(RuntimeError): 

374 self.butler.getURI(dataset_type, dataId=data_id, collections=collections) 

375 

376 # getURIs does NOT respect component overrides on the DatasetRef, 

377 # instead returning the parent's URIs. Unclear if this is "correct" 

378 # from a conceptual point of view, but this matches DirectButler 

379 # behavior. 

380 ref = self.butler.find_dataset(dataset_type, data_id=data_id, collections=collections) 

381 componentRef = ref.makeComponentRef("summary") 

382 componentUris = self.butler.getURIs(componentRef) 

383 check_uris(componentUris) 

384 

385 def test_file_download_redirect(self): 

386 def get_download_redirect(id: DatasetId, component: str | None = None) -> httpx.Response: 

387 uri = generate_file_download_uri("http://unittest.test/", TEST_REPOSITORY_NAME, id, component) 

388 return self.client.get( 

389 uri, 

390 follow_redirects=False, 

391 headers=RubinAuthenticationProvider("mock-token").get_server_headers(), 

392 ) 

393 

394 # Test behavior of a single-file dataset. 

395 response = get_download_redirect(self.simple_dataset_ref.id) 

396 self.assertEqual(response.status_code, 307) 

397 self.assertTrue(response.has_redirect_location) 

398 assert response.next_request is not None 

399 self.assertEqual(response.next_request.url.scheme, "https") 

400 self.assertIn("test_int_DummyCamComp_R_d-r_423_ingest_run.json", response.next_request.url.path) 

401 

402 response = get_download_redirect(self.simple_dataset_ref.id, "somecomponent") 

403 self.assertEqual(response.status_code, 404) 

404 

405 # This dataset is a "disassembled composite" with multiple files. 

406 dataset_type = "test_metric_comp" 

407 data_id = {"instrument": "DummyCamComp", "visit": 423} 

408 collections = "ingest/run" 

409 ref = self.butler.find_dataset(dataset_type, data_id, collections=collections) 

410 

411 # Getting single component of a multi-file "disassembled composite". 

412 response = get_download_redirect(ref.id, "summary") 

413 self.assertEqual(response.status_code, 307) 

414 self.assertTrue(response.has_redirect_location) 

415 assert response.next_request is not None 

416 self.assertEqual(response.next_request.url.scheme, "https") 

417 self.assertIn("test_metric_comp.summary", response.next_request.url.path) 

418 

419 # Unknown component. 

420 response = get_download_redirect(ref.id, "badcomponent") 

421 self.assertEqual(response.status_code, 404) 

422 

423 # Not specifying the component for a multi-file "disassembled 

424 # composite". 

425 response = get_download_redirect(ref.id, None) 

426 self.assertEqual(response.status_code, 422) 

427 

428 # Unknown dataset. 

429 response = get_download_redirect(uuid.UUID("59467c1b-fa13-4f7a-8ff8-cd83e092e563")) 

430 self.assertEqual(response.status_code, 404) 

431 

432 def test_auth_check(self): 

433 # This is checking that the unit-test middleware for validating the 

434 # authentication headers is working. It doesn't test actual server 

435 # functionality -- in a real deployment, the authentication headers are 

436 # handled by GafaelfawrIngress, not our app. 

437 with self.assertRaises(UnhandledServerError): 

438 self.client.get("/v1/dataset_type/int") 

439 

440 def test_exception_logging(self): 

441 app = create_app() 

442 

443 def raise_error(): 

444 raise RuntimeError("An unhandled error") 

445 

446 app.dependency_overrides[butler_factory_dependency] = raise_error 

447 client = TestClient(app, raise_server_exceptions=False) 

448 

449 with patch.object(safir.dependencies.logger, "logger_dependency") as mock_logger_dep: 

450 mock_logger = NonCallableMock(["aerror"]) 

451 

452 async def noop(): 

453 pass 

454 

455 mock_logger.aerror.return_value = noop() 

456 

457 async def get_logger(): 

458 return mock_logger 

459 

460 mock_logger_dep.return_value = get_logger() 

461 client.get( 

462 "/api/butler/repo/something/v1/dataset_type/int", 

463 headers={"X-Auth-Request-User": "user-name", "X-Butler-Client-Request-Id": "request-id"}, 

464 ) 

465 mock_logger_dep.assert_called_once() 

466 

467 mock_logger.aerror.assert_called_once() 

468 args, kwargs = mock_logger.aerror.call_args 

469 self.assertIsInstance(kwargs["exc_info"], RuntimeError) 

470 self.assertEqual(kwargs["clientRequestId"], "request-id") 

471 self.assertEqual(kwargs["user"], "user-name") 

472 

473 def test_query_keepalive(self): 

474 """Test that long-running queries stream keep-alive messages to stop 

475 the HTTP connection from closing before they are able to return 

476 results. 

477 """ 

478 # Normally it takes 15 seconds for a timeout -- mock it to trigger 

479 # immediately instead. 

480 with patch.object( 

481 lsst.daf.butler.remote_butler.server.handlers._query_streaming, "_timeout" 

482 ) as mock_timeout: 

483 # Hook into QueryDriver to track the number of keep-alives we have 

484 # seen. 

485 with patch.object( 

486 lsst.daf.butler.remote_butler._query_results, "_received_keep_alive" 

487 ) as mock_keep_alive: 

488 mock_timeout.side_effect = _timeout_twice() 

489 with self.butler.query() as query: 

490 datasets = list(query.datasets("bias", "imported_g")) 

491 self.assertEqual(len(datasets), 3) 

492 self.assertGreaterEqual(mock_timeout.call_count, 3) 

493 self.assertGreaterEqual(mock_keep_alive.call_count, 2) 

494 

495 def test_query_retries(self): 

496 """Test that the server will send HTTP status 503 to put backpressure 

497 on clients if it is overloaded, and that the client will retry if this 

498 happens. 

499 """ 

500 query_event = threading.Event() 

501 retry_event = asyncio.Event() 

502 

503 async def block_first_request() -> None: 

504 # Signal the unit tests that we have reached the critical section 

505 # in the server, where the first client has reserved the query 

506 # slot. 

507 query_event.set() 

508 # Block inside the query, until the 2nd client has been forced to 

509 # retry. 

510 await retry_event.wait() 

511 

512 async def block_second_request() -> None: 

513 # Release the first client, so it can finish its query and prevent 

514 # this client from being blocked on the next go-round. 

515 retry_event.set() 

516 

517 def do_query(butler: Butler) -> list[DatasetRef]: 

518 return butler.query_datasets("bias", "imported_g") 

519 

520 with ( 

521 patch.object( 

522 lsst.daf.butler.remote_butler.server.handlers._query_limits, 

523 "_MAXIMUM_CONCURRENT_STREAMING_QUERIES", 

524 new=1, 

525 ), 

526 patch.object( 

527 lsst.daf.butler.remote_butler.server.handlers._query_limits, "_QUERY_RETRY_SECONDS", new=1 

528 ), 

529 patch.object( 

530 lsst.daf.butler.remote_butler.server.handlers._query_limits, 

531 "_block_query_for_unit_test", 

532 new=AsyncMock(wraps=block_first_request), 

533 ) as mock_first_client, 

534 patch.object( 

535 lsst.daf.butler.remote_butler.server.handlers._query_limits, 

536 "_block_retry_for_unit_test", 

537 new=AsyncMock(wraps=block_second_request), 

538 ) as mock_second_client, 

539 ThreadPoolExecutor(max_workers=1) as exec1, 

540 ThreadPoolExecutor(max_workers=1) as exec2, 

541 ): 

542 first_butler = self.butler 

543 second_butler = self.butler.clone() 

544 

545 # Run the first client up until the server starts executing its 

546 # query. 

547 future1 = exec1.submit(do_query, first_butler) 

548 event_reached = query_event.wait(60) 

549 if not event_reached: 

550 raise TimeoutError("Server did not execute query logic as expected.") 

551 

552 # Start the second client, which will trigger the retry logic and 

553 # release the first client to finish its query. 

554 future2 = exec2.submit(do_query, second_butler) 

555 

556 result1 = future1.result(60) 

557 result2 = future2.result(60) 

558 self.assertEqual(len(result1), 3) 

559 self.assertEqual(len(result2), 3) 

560 # The original thread should have gone through this section, and 

561 # then the 2nd thread after it retries. 

562 self.assertEqual(mock_first_client.await_count, 2) 

563 # We should have triggered the retry logic at least once, but it 

564 # might occur multiple times depending how long the first client 

565 # takes to finish. 

566 self.assertGreaterEqual(mock_second_client.await_count, 1) 

567 

568 # TODO DM-46204: This can be removed once the RSP recommended image has 

569 # been upgraded to a version that contains DM-46129. 

570 def test_deprecated_collection_endpoints(self): 

571 # These REST endpoints are no longer used by Butler client so they need 

572 # to be checked separately until they can be removed. 

573 json = self.butler._connection.get( 

574 "collection_info", 

575 params={"name": "imported_g", "include_doc": True, "include_parents": True}, 

576 ).json() 

577 self.assertEqual(json["name"], "imported_g") 

578 self.assertEqual(json["type"], 1) 

579 

580 json = self.butler._connection.post( 

581 "query_collections", 

582 QueryCollectionsRequestModel( 

583 search=["imported_*"], collection_types=[1], flatten_chains=False, include_chains=False 

584 ), 

585 ).json() 

586 self.assertCountEqual(json["collections"], ["imported_g", "imported_r"]) 

587 

588 def test_oversized_data_coordinate_upload(self): 

589 with self.butler.query() as query: 

590 ref = self.simple_dataset_ref 

591 data_id = ref.dataId 

592 data_coordinates = [DataCoordinate.standardize(data_id, visit=x) for x in range(100_001)] 

593 with self.assertRaisesRegex(InvalidQueryError, "data coordinate rows"): 

594 list(query.join_data_coordinates(data_coordinates).datasets(ref.datasetType, ref.run)) 

595 

596 

@unittest.skipIf(not butler_server_is_available, butler_server_import_error)
class ButlerClientServerAuthorizationTestCase(unittest.TestCase):
    """Test authentication/authorization functionality."""

    def test_group_authorization(self):
        """Test that group membership repository authorization is checked when
        repository is accessed.
        """
        with create_test_server(TESTDIR) as server_instance:
            mock = MockGafaelfawrGroupAuthorizer()
            server_instance.app.dependency_overrides[authorizer_dependency] = lambda: mock
            server_instance.direct_butler.registry.registerDatasetType(
                DatasetType("bias", [], "int", universe=server_instance.direct_butler.dimensions)
            )
            server_instance.direct_butler.collections.register("collection")
            butler = server_instance.remote_butler

            # While authorization is denied, repository accesses return 403.
            mock.set_response(False)
            with self.assertRaises(ButlerServerError) as e:
                butler.get_dataset_type("bias")
            self.assertEqual(e.exception.status_code, 403)
            with self.assertRaises(ButlerServerError) as e:
                butler.query_datasets("bias", collections="*", find_first=False)
            self.assertEqual(e.exception.status_code, 403)

            # Once authorization is granted, the same calls succeed.
            mock.set_response(True)
            self.assertEqual(butler.get_dataset_type("bias").name, "bias")
            self.assertEqual(butler.query_datasets("bias", collections="collection", explain=False), [])

    def test_cadc_auth(self) -> None:
        """Test server running in CADC auth mode."""
        with mock_config() as config:
            config.authentication = "cadc"
            config.gafaelfawr_url = "DISABLED"
            with create_test_server(TESTDIR, server_config=config) as instance:
                self.assertIsInstance(instance.remote_butler._connection.auth, CadcAuthenticationProvider)

                # Set up a dataset backed by an HTTP URL.  CADC uses a plain
                # HTTP service, not S3, for hosting Butler artifacts.
                dataset_type = DatasetType("test", [], "int", universe=instance.direct_butler.dimensions)
                ref = DatasetRef(
                    datasetType=dataset_type,
                    dataId=DataCoordinate.makeEmpty(instance.direct_butler.dimensions),
                    run="ingest/run",
                )
                path = ResourcePath("https://fake-server.example/some-directory/file.json")
                dataset = FileDataset(path, ref)
                # ingest() insists on doing file existence checks, and we
                # don't have an HTTP server to point it at.
                with unittest.mock.patch(
                    "lsst.daf.butler.datastores.fileDatastore.FileDatastore._standardizeIngestPath"
                ) as mock:
                    mock.return_value = path
                    instance.direct_butler.ingest(dataset, transfer="direct", record_validation_info=False)

                # At the CADC, paths used for file download should NOT be a
                # signed URL, and should have authentication headers attached.
                def check_path(path_to_check: ResourcePath):
                    self.assertEqual(str(path_to_check), str(path))
                    assert isinstance(path_to_check, HttpResourcePath)
                    self.assertIsNotNone(path_to_check._extra_headers)
                    self.assertIsNotNone(path_to_check._extra_headers.get("Authorization"))

                check_path(instance.remote_butler.getURI(ref))
                transfer_map = instance.remote_butler._file_transfer_source.get_file_info_for_transfer(
                    [ref.id]
                )
                check_path(transfer_map[ref.id][0].location.pathInStore)

666 

def _create_corrupted_dataset(repo: MetricTestRepo) -> DatasetRef:
    """Add a dataset to ``repo`` and overwrite one of its component files
    with junk, returning its ref for error-path testing.
    """
    ref = repo.addDataset({"instrument": "DummyCamComp", "visit": 423}, run="corrupted-run")
    uris = repo.butler.getURIs(ref)
    # Clobber an arbitrary component so reads of this dataset fail.
    first_component = next(iter(uris.componentURIs.values()))
    first_component.write("corrupted data")
    return ref

675 

def _create_simple_dataset(butler: Butler) -> DatasetRef:
    """Register a plain (non-composite) int dataset type and store one
    value, returning the resulting ref.
    """
    dataset_type = addDatasetType(butler, "test_int", {"instrument", "visit"}, "int")
    return butler.put(123, dataset_type, dataId={"instrument": "DummyCamComp", "visit": 423}, run="ingest/run")

681 

682def _timeout_twice(): 

683 """Return a mock side-effect function that raises a timeout error the first 

684 two times it is called. 

685 """ 

686 count = 0 

687 

688 def timeout(*args): 

689 nonlocal count 

690 count += 1 

691 if count <= 2: 

692 raise TimeoutError() 

693 return DEFAULT 

694 

695 return timeout 

696 

697 

@unittest.skipIf(not butler_server_is_available, butler_server_import_error)
class QueryLimitsTestCase(unittest.IsolatedAsyncioTestCase):
    """Test details of the code that limits the maximum number of concurrent
    queries in the server.
    """

    async def test_query_limits(self):
        limits = lsst.daf.butler.remote_butler.server.handlers._query_limits.QueryLimits()

        await limits.enforce_query_limits("user1")  # under limit, doesn't raise
        async with limits.track_query("user1"):
            await limits.enforce_query_limits("user1")  # under limit, doesn't raise
            async with limits.track_query("user1"):
                # With two queries in flight for the same user, the limit is
                # exceeded and the server responds 429 Too Many Requests.
                with self.assertRaises(fastapi.HTTPException) as exc:
                    await limits.enforce_query_limits("user1")
                self.assertEqual(exc.exception.status_code, 429)

# Allow running this test module directly.
if __name__ == "__main__":
    unittest.main()