Coverage for tests / test_remote_butler.py: 53%

88 statements  

« prev     ^ index     » next       coverage.py v7.13.5, created at 2026-04-26 08:49 +0000

1# This file is part of daf_butler. 

2# 

3# Developed for the LSST Data Management System. 

4# This product includes software developed by the LSST Project 

5# (http://www.lsst.org). 

6# See the COPYRIGHT file at the top-level directory of this distribution 

7# for details of code ownership. 

8# 

9# This software is dual licensed under the GNU General Public License and also 

10# under a 3-clause BSD license. Recipients may choose which of these licenses 

11# to use; please see the files gpl-3.0.txt and/or bsd_license.txt, 

12# respectively. If you choose the GPL option then the following text applies 

13# (but note that there is still no warranty even if you opt for BSD instead): 

14# 

15# This program is free software: you can redistribute it and/or modify 

16# it under the terms of the GNU General Public License as published by 

17# the Free Software Foundation, either version 3 of the License, or 

18# (at your option) any later version. 

19# 

20# This program is distributed in the hope that it will be useful, 

21# but WITHOUT ANY WARRANTY; without even the implied warranty of 

22# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the 

23# GNU General Public License for more details. 

24# 

25# You should have received a copy of the GNU General Public License 

26# along with this program. If not, see <http://www.gnu.org/licenses/>. 

27 

28import os 

29import unittest 

30from unittest.mock import patch 

31 

32from pydantic import ValidationError 

33 

34from lsst.daf.butler import Butler 

35from lsst.daf.butler._exceptions import UnknownButlerUserError 

36from lsst.daf.butler.datastores.file_datastore.retrieve_artifacts import ( 

37 determine_destination_for_retrieved_artifact, 

38) 

39from lsst.daf.butler.registry import RegistryConfig 

40from lsst.daf.butler.registry.tests import RegistryTests 

41from lsst.daf.butler.tests.postgresql import TemporaryPostgresInstance, setup_postgres_test_db 

42from lsst.daf.butler.tests.server_available import butler_server_import_error, butler_server_is_available 

43from lsst.resources import ResourcePath 

44 

45if butler_server_is_available: 45 ↛ 46line 45 didn't jump to line 46 because the condition on line 45 was never true

46 import httpx 

47 

48 from lsst.daf.butler.remote_butler import ButlerServerError 

49 from lsst.daf.butler.tests.server import create_test_server 

50 

51TESTDIR = os.path.abspath(os.path.dirname(__file__)) 

52 

53 

54@unittest.skipIf(not butler_server_is_available, butler_server_import_error) 

55class RemoteButlerConfigTests(unittest.TestCase): 

56 """Test construction of RemoteButler via Butler()""" 

57 

58 def test_bad_config(self): 

59 with self.assertRaises(ValidationError): 

60 Butler({"cls": "lsst.daf.butler.remote_butler.RemoteButler", "remote_butler": {"url": "!"}}) 

61 

62 

63@unittest.skipIf(not butler_server_is_available, butler_server_import_error) 

64class RemoteButlerErrorHandlingTests(unittest.TestCase): 

65 """Test RemoteButler error handling.""" 

66 

67 def setUp(self): 

68 server_instance = self.enterContext(create_test_server(TESTDIR)) 

69 self.butler = server_instance.remote_butler 

70 self.mock = self.enterContext(patch.object(self.butler._connection._client, "send")) 

71 

72 def _mock_error_response(self, content: str) -> None: 

73 self.mock.return_value = httpx.Response( 

74 status_code=422, content=content, request=httpx.Request("GET", "/") 

75 ) 

76 

77 def test_internal_server_error(self): 

78 self.mock.side_effect = httpx.HTTPError("unhandled error") 

79 with self.assertRaises(ButlerServerError): 

80 self.butler.get_dataset_type("int") 

81 

82 def test_unknown_error_type(self): 

83 self.mock.return_value = httpx.Response( 

84 status_code=422, json={"error_type": "not a known error type", "detail": "an error happened"} 

85 ) 

86 with self.assertRaises(UnknownButlerUserError): 

87 self.butler.get_dataset_type("int") 

88 

89 def test_non_json_error(self): 

90 # Server returns a non-JSON body with an error 

91 self._mock_error_response("notvalidjson") 

92 with self.assertRaises(ButlerServerError): 

93 self.butler.get_dataset_type("int") 

94 

95 def test_malformed_error(self): 

96 # Server returns JSON, but not in the expected format. 

97 self._mock_error_response("{}") 

98 with self.assertRaises(ButlerServerError): 

99 self.butler.get_dataset_type("int") 

100 

101 

102class RemoteButlerMiscTests(unittest.TestCase): 

103 """Test miscellaneous RemoteButler functionality.""" 

104 

105 def test_retrieve_artifacts_security(self): 

106 # Make sure that the function used to determine output file paths for 

107 # retrieveArtifacts throws if a malicious server tries to escape its 

108 # destination directory. 

109 with self.assertRaisesRegex(ValueError, "^File path attempts to escape destination directory"): 

110 determine_destination_for_retrieved_artifact( 

111 ResourcePath("output_directory/"), 

112 ResourcePath("../something.txt", forceAbsolute=False), 

113 preserve_path=True, 

114 ) 

115 

116 # Make sure all paths are forced to relative paths, even if the server 

117 # sends an absolute path. 

118 self.assertEqual( 

119 determine_destination_for_retrieved_artifact( 

120 ResourcePath("/tmp/output_directory/"), 

121 ResourcePath("file:///not/relative.txt"), 

122 preserve_path=True, 

123 ), 

124 ResourcePath("/tmp/output_directory/not/relative.txt"), 

125 ) 

126 

127 # Test prefixing. 

128 self.assertEqual( 

129 determine_destination_for_retrieved_artifact( 

130 ResourcePath("/tmp/output_directory/"), 

131 ResourcePath("file:///not/relative.txt"), 

132 preserve_path=False, 

133 prefix="prefix-", 

134 ), 

135 ResourcePath("/tmp/output_directory/prefix-relative.txt"), 

136 ) 

137 

138 

139class RemoteButlerRegistryTests(RegistryTests): 

140 """Tests for RemoteButler's `Registry` shim.""" 

141 

142 postgres: TemporaryPostgresInstance | None 

143 

144 def setUp(self): 

145 self.server_instance = self.enterContext(create_test_server(TESTDIR, postgres=self.postgres)) 

146 

147 @classmethod 

148 def getDataDir(cls) -> str: 

149 return os.path.join(TESTDIR, "data", "registry") 

150 

151 def make_butler(self, registry_config: RegistryConfig | None = None) -> Butler: 

152 return self.server_instance.hybrid_butler 

153 

154 def testBasicTransaction(self): 

155 # RemoteButler will never support transactions. 

156 pass 

157 

158 def testNestedTransaction(self): 

159 # RemoteButler will never support transactions. 

160 pass 

161 

162 def testOpaque(self): 

163 # This tests an internal implementation detail that isn't exposed to 

164 # the client side. 

165 pass 

166 

167 def testCollectionChainPrependConcurrency(self): 

168 # This tests an implementation detail that requires access to the 

169 # collection manager object. 

170 pass 

171 

172 def testCollectionChainReplaceConcurrency(self): 

173 # This tests an implementation detail that requires access to the 

174 # collection manager object. 

175 pass 

176 

177 def testCollectionChainRemoveConcurrency(self): 

178 # This tests an implementation detail that requires access to the 

179 # collection manager object. 

180 pass 

181 

182 def testAttributeManager(self): 

183 # Tests a non-public API that isn't relevant on the client side. 

184 pass 

185 

186 

187@unittest.skipIf(not butler_server_is_available, butler_server_import_error) 

188class RemoteButlerSqliteRegistryTests(RemoteButlerRegistryTests, unittest.TestCase): 

189 """Tests for RemoteButler's registry shim, with a SQLite DB backing the 

190 server. 

191 """ 

192 

193 postgres = None 

194 

195 

196@unittest.skipIf(not butler_server_is_available, butler_server_import_error) 

197class RemoteButlerPostgresRegistryTests(RemoteButlerRegistryTests, unittest.TestCase): 

198 """Tests for RemoteButler's registry shim, with a Postgres DB backing the 

199 server. 

200 """ 

201 

202 @classmethod 

203 def setUpClass(cls): 

204 cls.postgres = cls.enterClassContext(setup_postgres_test_db()) 

205 super().setUpClass() 

206 

207 def testSkipCalibs(self): 

208 if self.postgres.server_major_version() < 16: 

209 # TODO DM-44875: This test currently fails for older Postgres. 

210 self.skipTest("TODO DM-44875") 

211 return super().testSkipCalibs() 

212 

213 

214if __name__ == "__main__": 

215 unittest.main()