Coverage for tests / test_server.py: 14%
362 statements
« prev ^ index » next coverage.py v7.13.5, created at 2026-04-26 08:48 +0000
« prev ^ index » next coverage.py v7.13.5, created at 2026-04-26 08:48 +0000
1# This file is part of daf_butler.
2#
3# Developed for the LSST Data Management System.
4# This product includes software developed by the LSST Project
5# (http://www.lsst.org).
6# See the COPYRIGHT file at the top-level directory of this distribution
7# for details of code ownership.
8#
9# This software is dual licensed under the GNU General Public License and also
10# under a 3-clause BSD license. Recipients may choose which of these licenses
11# to use; please see the files gpl-3.0.txt and/or bsd_license.txt,
12# respectively. If you choose the GPL option then the following text applies
13# (but note that there is still no warranty even if you opt for BSD instead):
14#
15# This program is free software: you can redistribute it and/or modify
16# it under the terms of the GNU General Public License as published by
17# the Free Software Foundation, either version 3 of the License, or
18# (at your option) any later version.
19#
20# This program is distributed in the hope that it will be useful,
21# but WITHOUT ANY WARRANTY; without even the implied warranty of
22# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
23# GNU General Public License for more details.
24#
25# You should have received a copy of the GNU General Public License
26# along with this program. If not, see <http://www.gnu.org/licenses/>.
28import asyncio
29import os.path
30import tempfile
31import threading
32import unittest
33import unittest.mock
34import uuid
35from concurrent.futures import ThreadPoolExecutor
36from unittest.mock import DEFAULT, AsyncMock, NonCallableMock, patch
38from lsst.daf.butler import (
39 Butler,
40 DataCoordinate,
41 DatasetId,
42 DatasetNotFoundError,
43 DatasetRef,
44 DatasetType,
45 FileDataset,
46 InvalidQueryError,
47 LabeledButlerFactory,
48 MissingDatasetTypeError,
49 NoDefaultCollectionError,
50 StorageClassFactory,
51)
52from lsst.daf.butler.datastore import DatasetRefURIs
53from lsst.daf.butler.registry import RegistryDefaults
54from lsst.daf.butler.tests import DatastoreMock, addDatasetType
55from lsst.daf.butler.tests.dict_convertible_model import DictConvertibleModel
56from lsst.daf.butler.tests.server_available import butler_server_import_error, butler_server_is_available
57from lsst.daf.butler.tests.utils import MetricsExample, MetricTestRepo, mock_env
58from lsst.resources import ResourcePath
59from lsst.resources.http import HttpResourcePath
61if butler_server_is_available: 61 ↛ 62line 61 didn't jump to line 62 because the condition on line 61 was never true
62 import fastapi
63 import httpx
64 import safir.dependencies.logger
65 from fastapi.testclient import TestClient
67 import lsst.daf.butler.remote_butler._query_results
68 import lsst.daf.butler.remote_butler.server.handlers._query_limits
69 import lsst.daf.butler.remote_butler.server.handlers._query_streaming
70 from lsst.daf.butler.remote_butler import ButlerServerError, RemoteButler
71 from lsst.daf.butler.remote_butler.authentication.cadc import CadcAuthenticationProvider
72 from lsst.daf.butler.remote_butler.authentication.rubin import (
73 _EXPLICIT_BUTLER_ACCESS_TOKEN_ENVIRONMENT_KEY,
74 RubinAuthenticationProvider,
75 )
76 from lsst.daf.butler.remote_butler.server import create_app
77 from lsst.daf.butler.remote_butler.server._config import mock_config
78 from lsst.daf.butler.remote_butler.server._dependencies import (
79 authorizer_dependency,
80 butler_factory_dependency,
81 )
82 from lsst.daf.butler.remote_butler.server._gafaelfawr import MockGafaelfawrGroupAuthorizer
83 from lsst.daf.butler.remote_butler.server.handlers._utils import generate_file_download_uri
84 from lsst.daf.butler.remote_butler.server_models import QueryCollectionsRequestModel
85 from lsst.daf.butler.tests.server import TEST_REPOSITORY_NAME, UnhandledServerError, create_test_server
# Directory containing this test module; passed to create_test_server() to
# locate test server configuration files.
TESTDIR = os.path.abspath(os.path.dirname(__file__))
@unittest.skipIf(not butler_server_is_available, butler_server_import_error)
class ButlerClientServerTestCase(unittest.TestCase):
    """Test for Butler client/server."""

    @classmethod
    def setUpClass(cls):
        # Stand up an in-process Butler server for the whole class; the
        # context entered via enterClassContext is torn down automatically
        # at class teardown.
        server_instance = cls.enterClassContext(create_test_server(TESTDIR))
        cls.server_instance = server_instance
        cls.client = server_instance.client
        cls.butler = server_instance.remote_butler
        cls.butler_without_error_propagation = server_instance.remote_butler_without_error_propagation

        cls.storageClassFactory = StorageClassFactory()

        cls.repo = MetricTestRepo.create_from_butler(
            server_instance.direct_butler, server_instance.config_file_path
        )
        # Add a file with corrupted data for testing error conditions
        cls.dataset_with_corrupted_data = _create_corrupted_dataset(cls.repo)
        # All of the datasets that come with MetricTestRepo are disassembled
        # composites. Add a simple dataset for testing the common case.
        cls.simple_dataset_ref = _create_simple_dataset(server_instance.direct_butler)

        # Populate the test server.
        # The DatastoreMock is required because the datasets referenced in
        # these imports do not point at real files.
        direct_butler = server_instance.direct_butler
        DatastoreMock.apply(direct_butler)
        direct_butler.import_(filename="resource://lsst.daf.butler/tests/registry_data/base.yaml")
        direct_butler.import_(filename="resource://lsst.daf.butler/tests/registry_data/datasets.yaml")

    def test_health_check(self):
        """Check that the server's root endpoint reports the service name."""
        try:
            import importlib.metadata

            importlib.metadata.metadata("lsst.daf.butler")
        except ModuleNotFoundError:
            raise self.skipTest("Standard python package metadata not available. Butler not pip installed.")
        response = self.client.get("/")
        self.assertEqual(response.status_code, 200)
        self.assertEqual(response.json()["name"], "butler")

    def test_static_files(self):
        """Check that files in the configured static-files directory are
        served by the configs endpoint.
        """
        with tempfile.TemporaryDirectory() as tmpdir:
            with open(os.path.join(tmpdir, "temp.txt"), "w") as fh:
                fh.write("test data 123")

            with mock_config() as server_config:
                server_config.static_files_path = tmpdir
                with create_test_server(TESTDIR, server_config=server_config) as server:
                    response = server.client.get("/api/butler/configs/temp.txt")
                    self.assertEqual(response.status_code, 200)
                    self.assertEqual(response.text, "test data 123")

    def test_dimension_universe(self):
        """Check that the remote butler reports the expected dimension
        universe namespace.
        """
        universe = self.butler.dimensions
        self.assertEqual(universe.namespace, "daf_butler")

    def test_get_dataset_type(self):
        """Check dataset type lookup by name, including the unknown-type
        error case.
        """
        bias_type = self.butler.get_dataset_type("bias")
        self.assertEqual(bias_type.name, "bias")

        with self.assertRaises(MissingDatasetTypeError):
            self.butler_without_error_propagation.get_dataset_type("not_bias")

    def test_find_dataset(self):
        """Exercise find_dataset/get_dataset with various data ID formats,
        dimension-record expansion, and storage class overrides.
        """
        storage_class = self.storageClassFactory.getStorageClass("Exposure")

        ref = self.butler.find_dataset("bias", collections="imported_g", detector=1, instrument="Cam1")
        self.assertIsInstance(ref, DatasetRef)
        self.assertEqual(ref.id, uuid.UUID("e15ab039-bc8b-4135-87c5-90902a7c0b22"))
        self.assertFalse(ref.dataId.hasRecords())

        # Try again with variation of parameters.
        ref_new = self.butler.find_dataset(
            "bias",
            {"detector": 1},
            collections="imported_g",
            instrument="Cam1",
            dimension_records=True,
        )
        self.assertEqual(ref_new, ref)
        self.assertTrue(ref_new.dataId.hasRecords())

        ref_new = self.butler.find_dataset(
            ref.datasetType,
            DataCoordinate.standardize(detector=1, instrument="Cam1", universe=self.butler.dimensions),
            collections="imported_g",
            storage_class=storage_class,
        )
        self.assertEqual(ref_new, ref)

        ref2 = self.butler.get_dataset(ref.id)
        self.assertEqual(ref2, ref)

        # Use detector name to find it.
        ref3 = self.butler.find_dataset(
            ref.datasetType,
            collections="imported_g",
            instrument="Cam1",
            full_name="Aa",
        )
        self.assertEqual(ref2, ref3)

        # Try expanded refs.
        self.assertFalse(ref.dataId.hasRecords())
        expanded = self.butler.get_dataset(ref.id, dimension_records=True)
        self.assertTrue(expanded.dataId.hasRecords())

        # The test datasets are all Exposure so storage class conversion
        # can not be tested until we fix that. For now at least test the
        # code paths.
        bias = self.butler.get_dataset(ref.id, storage_class=storage_class)
        self.assertEqual(bias.datasetType.storageClass, storage_class)

        # Unknown dataset should not fail.
        self.assertIsNone(self.butler.get_dataset(uuid.uuid4()))
        self.assertIsNone(self.butler.get_dataset(uuid.uuid4(), storage_class="NumpyArray"))

    def test_instantiate_via_butler_http_search(self):
        """Ensure that the primary Butler constructor's automatic search logic
        correctly locates and reads the configuration file and ends up with a
        RemoteButler pointing to the correct URL
        """

        # This is kind of a fragile test. Butler's search logic does a lot of
        # manipulations involving creating new ResourcePaths, and ResourcePath
        # doesn't use httpx so we can't easily inject the TestClient in there.
        # We don't have an actual valid HTTP URL to give to the constructor
        # because the test instance of the server is accessed via ASGI.
        #
        # Instead we just monkeypatch the HTTPResourcePath 'read' method and
        # hope that all ResourcePath HTTP reads during construction are going
        # to the server under test.
        def override_read(http_resource_path):
            return self.client.get(http_resource_path.geturl()).content

        server_url = f"https://test.example/api/butler/repo/{TEST_REPOSITORY_NAME}/"

        with patch.object(HttpResourcePath, "read", override_read):
            # RegistryDefaults.finish() needs to download the dimension
            # universe from the server, which will fail because there is no
            # server here. So mock it out.
            with patch.object(RegistryDefaults, "finish"):
                # Add access key to environment variables. RemoteButler
                # instantiation will throw an error if access key is not
                # available.
                with mock_env({_EXPLICIT_BUTLER_ACCESS_TOKEN_ENVIRONMENT_KEY: "fake-access-token"}):
                    butler = Butler(
                        server_url,
                        collections=["collection1", "collection2"],
                        run="collection2",
                    )
                    self.enterContext(butler)
                    self.assertIsInstance(butler, RemoteButler)
                    self.assertEqual(butler._connection.server_url, server_url)
                    self.assertEqual(butler.collections.defaults, ("collection1", "collection2"))
                    self.assertEqual(butler.run, "collection2")
                    # A butler created this way uses the default cache config.
                    self.assertFalse(butler._use_disabled_datastore_cache)

                    butler_factory = LabeledButlerFactory({"server": server_url})
                    factory_created_butler = butler_factory.create_butler(label="server", access_token="token")
                    self.assertIsInstance(factory_created_butler, RemoteButler)
                    self.assertTrue(factory_created_butler._use_disabled_datastore_cache)
                    self.assertEqual(factory_created_butler._connection.server_url, server_url)

    def test_get(self):
        """Exercise get() with refs, data IDs, kwargs, default collections,
        and storage class / component overrides.
        """
        dataset_type = "test_metric_comp"
        data_id = {"instrument": "DummyCamComp", "visit": 423}
        collections = "ingest/run"
        # Test get() of a DatasetRef.
        ref = self.butler.find_dataset(dataset_type, data_id, collections=collections)
        metric = self.butler.get(ref)
        self.assertIsInstance(metric, MetricsExample)
        self.assertEqual(metric.summary, MetricTestRepo.METRICS_EXAMPLE_SUMMARY)

        # Test get() by DataId.
        data_id_metric = self.butler.get(dataset_type, dataId=data_id, collections=collections)
        self.assertEqual(metric, data_id_metric)
        # Test get() by DataId dict augmented with kwargs.
        kwarg_metric = self.butler.get(
            dataset_type, dataId={"instrument": "DummyCamComp"}, collections=collections, visit=423
        )
        self.assertEqual(metric, kwarg_metric)
        # Test get() by DataId DataCoordinate augmented with kwargs.
        coordinate = DataCoordinate.make_empty(self.butler.dimensions)
        kwarg_data_coordinate_metric = self.butler.get(
            dataset_type, dataId=coordinate, collections=collections, instrument="DummyCamComp", visit=423
        )
        self.assertEqual(metric, kwarg_data_coordinate_metric)
        # Test get() of a non-existent DataId.
        invalid_data_id = {"instrument": "NotAValidlInstrument", "visit": 423}
        with self.assertRaises(DatasetNotFoundError):
            self.butler_without_error_propagation.get(
                dataset_type, dataId=invalid_data_id, collections=collections
            )

        # Test get() by DataId with default collections.
        butler_with_default_collection = self.butler.clone(collections="ingest/run")
        default_collection_metric = butler_with_default_collection.get(dataset_type, dataId=data_id)
        self.assertEqual(metric, default_collection_metric)

        # Test get() by DataId with no collections specified.
        with self.assertRaises(NoDefaultCollectionError):
            self.butler_without_error_propagation.get(dataset_type, dataId=data_id)

        # Test looking up a non-existent ref
        invalid_ref = ref.replace(id=uuid.uuid4())
        with self.assertRaises(DatasetNotFoundError):
            self.butler_without_error_propagation.get(invalid_ref)

        # Reading the deliberately-corrupted dataset should surface an error.
        with self.assertRaises(RuntimeError):
            self.butler_without_error_propagation.get(self.dataset_with_corrupted_data)

        # Test storage class override
        new_sc = self.storageClassFactory.getStorageClass("MetricsConversion")

        def check_sc_override(converted):
            self.assertNotEqual(type(metric), type(converted))
            self.assertIsInstance(converted, new_sc.pytype)
            self.assertEqual(metric, converted)

        check_sc_override(self.butler.get(ref, storageClass=new_sc))

        # Test storage class override via DatasetRef.
        check_sc_override(self.butler.get(ref.overrideStorageClass("MetricsConversion")))
        # Test storage class override via DatasetType.
        check_sc_override(
            self.butler.get(
                ref.datasetType.overrideStorageClass(new_sc), dataId=data_id, collections=collections
            )
        )

        # Test component override via DatasetRef.
        component_ref = ref.makeComponentRef("summary")
        component_data = self.butler.get(component_ref)
        self.assertEqual(component_data, MetricTestRepo.METRICS_EXAMPLE_SUMMARY)

        # Test overriding both storage class and component via DatasetRef.
        converted_component_data = self.butler.get(component_ref, storageClass="DictConvertibleModel")
        self.assertIsInstance(converted_component_data, DictConvertibleModel)
        self.assertEqual(converted_component_data.content, MetricTestRepo.METRICS_EXAMPLE_SUMMARY)

        # Test component override via DatasetType.
        dataset_type_component_data = self.butler.get(
            component_ref.datasetType, component_ref.dataId, collections=collections
        )
        self.assertEqual(dataset_type_component_data, MetricTestRepo.METRICS_EXAMPLE_SUMMARY)

    def test_getURIs_no_components(self):
        # This dataset does not have components, and should return one URI.
        # NOTE(review): check_uri ignores its ``uri`` parameter and always
        # inspects the closed-over ``uris`` variable, so the final call below
        # does not actually verify the value returned by getURI().
        def check_uri(uri: ResourcePath):
            self.assertIsNotNone(uris.primaryURI)
            self.assertEqual(uris.primaryURI.scheme, "https")
            self.assertEqual(uris.primaryURI.read(), b"123")

        uris = self.butler.getURIs(self.simple_dataset_ref)
        self.assertEqual(len(uris.componentURIs), 0)
        check_uri(uris.primaryURI)

        check_uri(self.butler.getURI(self.simple_dataset_ref))

    def test_getURIs_multiple_components(self):
        # This dataset has multiple components, so we should get back multiple
        # URIs.
        dataset_type = "test_metric_comp"
        data_id = {"instrument": "DummyCamComp", "visit": 423}
        collections = "ingest/run"

        def check_uris(uris: DatasetRefURIs):
            self.assertIsNone(uris.primaryURI)
            self.assertEqual(len(uris.componentURIs), 3)
            path = uris.componentURIs["summary"]
            self.assertEqual(path.scheme, "https")
            data = path.read()
            self.assertEqual(data, b"AM1: 5.2\nAM2: 30.6\n")

        uris = self.butler.getURIs(dataset_type, dataId=data_id, collections=collections)
        check_uris(uris)

        # Calling getURI on a multi-file dataset raises an exception
        with self.assertRaises(RuntimeError):
            self.butler.getURI(dataset_type, dataId=data_id, collections=collections)

        # getURIs does NOT respect component overrides on the DatasetRef,
        # instead returning the parent's URIs. Unclear if this is "correct"
        # from a conceptual point of view, but this matches DirectButler
        # behavior.
        ref = self.butler.find_dataset(dataset_type, data_id=data_id, collections=collections)
        componentRef = ref.makeComponentRef("summary")
        componentUris = self.butler.getURIs(componentRef)
        check_uris(componentUris)

    def test_file_download_redirect(self):
        """Check the endpoint that redirects dataset file download requests
        to the underlying artifact URLs.
        """

        def get_download_redirect(id: DatasetId, component: str | None = None) -> httpx.Response:
            uri = generate_file_download_uri("http://unittest.test/", TEST_REPOSITORY_NAME, id, component)
            return self.client.get(
                uri,
                follow_redirects=False,
                headers=RubinAuthenticationProvider("mock-token").get_server_headers(),
            )

        # Test behavior of a single-file dataset.
        response = get_download_redirect(self.simple_dataset_ref.id)
        self.assertEqual(response.status_code, 307)
        self.assertTrue(response.has_redirect_location)
        assert response.next_request is not None
        self.assertEqual(response.next_request.url.scheme, "https")
        self.assertIn("test_int_DummyCamComp_R_d-r_423_ingest_run.json", response.next_request.url.path)

        # Asking for a component of a single-file dataset is an error.
        response = get_download_redirect(self.simple_dataset_ref.id, "somecomponent")
        self.assertEqual(response.status_code, 404)

        # This dataset is a "disassembled composite" with multiple files.
        dataset_type = "test_metric_comp"
        data_id = {"instrument": "DummyCamComp", "visit": 423}
        collections = "ingest/run"
        ref = self.butler.find_dataset(dataset_type, data_id, collections=collections)

        # Getting single component of a multi-file "disassembled composite".
        response = get_download_redirect(ref.id, "summary")
        self.assertEqual(response.status_code, 307)
        self.assertTrue(response.has_redirect_location)
        assert response.next_request is not None
        self.assertEqual(response.next_request.url.scheme, "https")
        self.assertIn("test_metric_comp.summary", response.next_request.url.path)

        # Unknown component.
        response = get_download_redirect(ref.id, "badcomponent")
        self.assertEqual(response.status_code, 404)

        # Not specifying the component for a multi-file "disassembled
        # composite".
        response = get_download_redirect(ref.id, None)
        self.assertEqual(response.status_code, 422)

        # Unknown dataset.
        response = get_download_redirect(uuid.UUID("59467c1b-fa13-4f7a-8ff8-cd83e092e563"))
        self.assertEqual(response.status_code, 404)

    def test_auth_check(self):
        # This is checking that the unit-test middleware for validating the
        # authentication headers is working. It doesn't test actual server
        # functionality -- in a real deployment, the authentication headers are
        # handled by GafaelfawrIngress, not our app.
        with self.assertRaises(UnhandledServerError):
            self.client.get("/v1/dataset_type/int")

    def test_exception_logging(self):
        """Check that unhandled exceptions raised while serving a request are
        reported through the structured logger, tagged with the requesting
        user and the client-supplied request id.
        """
        app = create_app()

        def raise_error():
            raise RuntimeError("An unhandled error")

        app.dependency_overrides[butler_factory_dependency] = raise_error
        client = TestClient(app, raise_server_exceptions=False)

        with patch.object(safir.dependencies.logger, "logger_dependency") as mock_logger_dep:
            mock_logger = NonCallableMock(["aerror"])

            async def noop():
                pass

            # aerror is awaited by the server, so it must return an awaitable.
            mock_logger.aerror.return_value = noop()

            async def get_logger():
                return mock_logger

            mock_logger_dep.return_value = get_logger()
            client.get(
                "/api/butler/repo/something/v1/dataset_type/int",
                headers={"X-Auth-Request-User": "user-name", "X-Butler-Client-Request-Id": "request-id"},
            )
            mock_logger_dep.assert_called_once()

            mock_logger.aerror.assert_called_once()
            args, kwargs = mock_logger.aerror.call_args
            self.assertIsInstance(kwargs["exc_info"], RuntimeError)
            self.assertEqual(kwargs["clientRequestId"], "request-id")
            self.assertEqual(kwargs["user"], "user-name")

    def test_query_keepalive(self):
        """Test that long-running queries stream keep-alive messages to stop
        the HTTP connection from closing before they are able to return
        results.
        """
        # Normally it takes 15 seconds for a timeout -- mock it to trigger
        # immediately instead.
        with patch.object(
            lsst.daf.butler.remote_butler.server.handlers._query_streaming, "_timeout"
        ) as mock_timeout:
            # Hook into QueryDriver to track the number of keep-alives we have
            # seen.
            with patch.object(
                lsst.daf.butler.remote_butler._query_results, "_received_keep_alive"
            ) as mock_keep_alive:
                mock_timeout.side_effect = _timeout_twice()
                with self.butler.query() as query:
                    datasets = list(query.datasets("bias", "imported_g"))
                self.assertEqual(len(datasets), 3)
                self.assertGreaterEqual(mock_timeout.call_count, 3)
                self.assertGreaterEqual(mock_keep_alive.call_count, 2)

    def test_query_retries(self):
        """Test that the server will send HTTP status 503 to put backpressure
        on clients if it is overloaded, and that the client will retry if this
        happens.
        """
        query_event = threading.Event()
        retry_event = asyncio.Event()

        async def block_first_request() -> None:
            # Signal the unit tests that we have reached the critical section
            # in the server, where the first client has reserved the query
            # slot.
            query_event.set()
            # Block inside the query, until the 2nd client has been forced to
            # retry.
            await retry_event.wait()

        async def block_second_request() -> None:
            # Release the first client, so it can finish its query and prevent
            # this client from being blocked on the next go-round.
            retry_event.set()

        def do_query(butler: Butler) -> list[DatasetRef]:
            return butler.query_datasets("bias", "imported_g")

        with (
            patch.object(
                lsst.daf.butler.remote_butler.server.handlers._query_limits,
                "_MAXIMUM_CONCURRENT_STREAMING_QUERIES",
                new=1,
            ),
            patch.object(
                lsst.daf.butler.remote_butler.server.handlers._query_limits, "_QUERY_RETRY_SECONDS", new=1
            ),
            patch.object(
                lsst.daf.butler.remote_butler.server.handlers._query_limits,
                "_block_query_for_unit_test",
                new=AsyncMock(wraps=block_first_request),
            ) as mock_first_client,
            patch.object(
                lsst.daf.butler.remote_butler.server.handlers._query_limits,
                "_block_retry_for_unit_test",
                new=AsyncMock(wraps=block_second_request),
            ) as mock_second_client,
            ThreadPoolExecutor(max_workers=1) as exec1,
            ThreadPoolExecutor(max_workers=1) as exec2,
        ):
            first_butler = self.butler
            second_butler = self.butler.clone()

            # Run the first client up until the server starts executing its
            # query.
            future1 = exec1.submit(do_query, first_butler)
            event_reached = query_event.wait(60)
            if not event_reached:
                raise TimeoutError("Server did not execute query logic as expected.")

            # Start the second client, which will trigger the retry logic and
            # release the first client to finish its query.
            future2 = exec2.submit(do_query, second_butler)

            result1 = future1.result(60)
            result2 = future2.result(60)
            self.assertEqual(len(result1), 3)
            self.assertEqual(len(result2), 3)
            # The original thread should have gone through this section, and
            # then the 2nd thread after it retries.
            self.assertEqual(mock_first_client.await_count, 2)
            # We should have triggered the retry logic at least once, but it
            # might occur multiple times depending how long the first client
            # takes to finish.
            self.assertGreaterEqual(mock_second_client.await_count, 1)

    # TODO DM-46204: This can be removed once the RSP recommended image has
    # been upgraded to a version that contains DM-46129.
    def test_deprecated_collection_endpoints(self):
        # These REST endpoints are no longer used by Butler client so they need
        # to be checked separately until they can be removed.
        json = self.butler._connection.get(
            "collection_info",
            params={"name": "imported_g", "include_doc": True, "include_parents": True},
        ).json()
        self.assertEqual(json["name"], "imported_g")
        self.assertEqual(json["type"], 1)

        json = self.butler._connection.post(
            "query_collections",
            QueryCollectionsRequestModel(
                search=["imported_*"], collection_types=[1], flatten_chains=False, include_chains=False
            ),
        ).json()
        self.assertCountEqual(json["collections"], ["imported_g", "imported_r"])

    def test_oversized_data_coordinate_upload(self):
        """Check that uploading more data coordinate rows than the server
        accepts raises InvalidQueryError.
        """
        with self.butler.query() as query:
            ref = self.simple_dataset_ref
            data_id = ref.dataId
            data_coordinates = [DataCoordinate.standardize(data_id, visit=x) for x in range(100_001)]
            with self.assertRaisesRegex(InvalidQueryError, "data coordinate rows"):
                list(query.join_data_coordinates(data_coordinates).datasets(ref.datasetType, ref.run))
@unittest.skipIf(not butler_server_is_available, butler_server_import_error)
class ButlerClientServerAuthorizationTestCase(unittest.TestCase):
    """Test authentication/authorization functionality."""

    def test_group_authorization(self):
        """Test that group membership repository authorization is checked when
        repository is accessed.
        """
        with create_test_server(TESTDIR) as server_instance:
            # Replace the real authorizer with a mock whose allow/deny
            # response we can flip at will.
            mock = MockGafaelfawrGroupAuthorizer()
            server_instance.app.dependency_overrides[authorizer_dependency] = lambda: mock
            server_instance.direct_butler.registry.registerDatasetType(
                DatasetType("bias", [], "int", universe=server_instance.direct_butler.dimensions)
            )
            server_instance.direct_butler.collections.register("collection")
            butler = server_instance.remote_butler
            # Denied: both endpoint styles should surface HTTP 403.
            mock.set_response(False)
            with self.assertRaises(ButlerServerError) as e:
                butler.get_dataset_type("bias")
            self.assertEqual(e.exception.status_code, 403)
            with self.assertRaises(ButlerServerError) as e:
                butler.query_datasets("bias", collections="*", find_first=False)
            self.assertEqual(e.exception.status_code, 403)

            # Allowed: the same calls now succeed.
            mock.set_response(True)
            self.assertEqual(butler.get_dataset_type("bias").name, "bias")
            self.assertEqual(butler.query_datasets("bias", collections="collection", explain=False), [])

    def test_cadc_auth(self) -> None:
        """Test server running in CADC auth mode."""
        with mock_config() as config:
            config.authentication = "cadc"
            config.gafaelfawr_url = "DISABLED"
            with create_test_server(TESTDIR, server_config=config) as instance:
                self.assertIsInstance(instance.remote_butler._connection.auth, CadcAuthenticationProvider)

                # Set up a dataset backed by an HTTP URL.
                # CADC uses a plain HTTP service, not S3, for hosting Butler
                # artifacts.
                dataset_type = DatasetType("test", [], "int", universe=instance.direct_butler.dimensions)
                ref = DatasetRef(
                    datasetType=dataset_type,
                    dataId=DataCoordinate.makeEmpty(instance.direct_butler.dimensions),
                    run="ingest/run",
                )
                path = ResourcePath("https://fake-server.example/some-directory/file.json")
                dataset = FileDataset(path, ref)
                # ingest() insists on doing file existence checks, and we don't
                # have an HTTP server to point it at.
                with unittest.mock.patch(
                    "lsst.daf.butler.datastores.fileDatastore.FileDatastore._standardizeIngestPath"
                ) as mock:
                    mock.return_value = path
                    instance.direct_butler.ingest(dataset, transfer="direct", record_validation_info=False)

                # At the CADC, paths used for file download should NOT be a
                # signed URL, and should have authentication headers attached.
                def check_path(path_to_check: ResourcePath):
                    self.assertEqual(str(path_to_check), str(path))
                    assert isinstance(path_to_check, HttpResourcePath)
                    self.assertIsNotNone(path_to_check._extra_headers)
                    self.assertIsNotNone(path_to_check._extra_headers.get("Authorization"))

                check_path(instance.remote_butler.getURI(ref))
                transfer_map = instance.remote_butler._file_transfer_source.get_file_info_for_transfer(
                    [ref.id]
                )
                check_path(transfer_map[ref.id][0].location.pathInStore)
def _create_corrupted_dataset(repo: MetricTestRepo) -> DatasetRef:
    """Add a dataset to ``repo`` and overwrite one of its component files
    with invalid content, returning the new ref.
    """
    corrupted_ref = repo.addDataset({"instrument": "DummyCamComp", "visit": 423}, run="corrupted-run")
    # Pick an arbitrary component file and clobber its contents so that
    # reading the dataset back will fail.
    component_uris = repo.butler.getURIs(corrupted_ref).componentURIs
    first_component = next(iter(component_uris.values()))
    first_component.write("corrupted data")
    return corrupted_ref
def _create_simple_dataset(butler: Butler) -> DatasetRef:
    """Register a single-file integer dataset type and store one dataset,
    returning its ref.
    """
    # Register a plain "int" dataset type (no components) for the common,
    # non-composite case.
    dataset_type = addDatasetType(butler, "test_int", {"instrument", "visit"}, "int")
    return butler.put(
        123, dataset_type, dataId={"instrument": "DummyCamComp", "visit": 423}, run="ingest/run"
    )
682def _timeout_twice():
683 """Return a mock side-effect function that raises a timeout error the first
684 two times it is called.
685 """
686 count = 0
688 def timeout(*args):
689 nonlocal count
690 count += 1
691 if count <= 2:
692 raise TimeoutError()
693 return DEFAULT
695 return timeout
@unittest.skipIf(not butler_server_is_available, butler_server_import_error)
class QueryLimitsTestCase(unittest.IsolatedAsyncioTestCase):
    """Test details of the code that limits the maximum number of concurrent
    queries in the server.
    """

    async def test_query_limits(self):
        limits = lsst.daf.butler.remote_butler.server.handlers._query_limits.QueryLimits()

        await limits.enforce_query_limits("user1")  # under limit, doesn't raise
        async with limits.track_query("user1"):
            await limits.enforce_query_limits("user1")  # under limit, doesn't raise
            async with limits.track_query("user1"):
                # With two tracked queries for the same user, the limit check
                # rejects with HTTP 429 (Too Many Requests).
                with self.assertRaises(fastapi.HTTPException) as exc:
                    await limits.enforce_query_limits("user1")
                self.assertEqual(exc.exception.status_code, 429)
if __name__ == "__main__":
    # Allow running this test module directly as a script.
    unittest.main()