Coverage for tests / test_execution_storage_class_conversion.py: 24%

113 statements  

« prev     ^ index     » next       coverage.py v7.13.5, created at 2026-04-17 08:59 +0000

1# This file is part of pipe_base. 

2# 

3# Developed for the LSST Data Management System. 

4# This product includes software developed by the LSST Project 

5# (https://www.lsst.org). 

6# See the COPYRIGHT file at the top-level directory of this distribution 

7# for details of code ownership. 

8# 

9# This software is dual licensed under the GNU General Public License and also 

10# under a 3-clause BSD license. Recipients may choose which of these licenses 

11# to use; please see the files gpl-3.0.txt and/or bsd_license.txt, 

12# respectively. If you choose the GPL option then the following text applies 

13# (but note that there is still no warranty even if you opt for BSD instead): 

14# 

15# This program is free software: you can redistribute it and/or modify 

16# it under the terms of the GNU General Public License as published by 

17# the Free Software Foundation, either version 3 of the License, or 

18# (at your option) any later version. 

19# 

20# This program is distributed in the hope that it will be useful, 

21# but WITHOUT ANY WARRANTY; without even the implied warranty of 

22# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the 

23# GNU General Public License for more details. 

24# 

25# You should have received a copy of the GNU General Public License 

26# along with this program. If not, see <https://www.gnu.org/licenses/>. 

27 

28from __future__ import annotations 

29 

30import os 

31import shutil 

32import tempfile 

33import unittest 

34 

35import lsst.daf.butler 

36import lsst.utils.tests 

37from lsst.pipe.base import PipelineGraph 

38from lsst.pipe.base.pipeline_graph import IncompatibleDatasetTypeError 

39from lsst.pipe.base.simple_pipeline_executor import SimplePipelineExecutor 

40from lsst.pipe.base.tests.mocks import ( 

41 DynamicConnectionConfig, 

42 DynamicTestPipelineTask, 

43 DynamicTestPipelineTaskConfig, 

44 MockStorageClass, 

45 get_mock_name, 

46) 

47 

48TESTDIR = os.path.abspath(os.path.dirname(__file__)) 

49 

50 

class TestExecutionStorageClassConversion(lsst.utils.tests.TestCase):
    """Test storage class conversions during execution.

    Task connection declarations should always define which storage class they
    see, while data repository registrations should always define what is
    stored.

    This test uses mock storage classes for intermediate and output datasets,
    which let us load the dataset to see what storage class the task saw when
    it was running. These storage class names need to be wrapped in
    get_mock_name calls to get what the butler actually sees. Overall input
    datasets are not declared with mock datasets, so we can `put` them directly
    in test code.
    """

    def setUp(self) -> None:
        # Throwaway repository root; removed again in tearDown.
        self.path = tempfile.mkdtemp()
        # standalone parameter forces the returned config to also include
        # the information from the search paths.
        config = lsst.daf.butler.Butler.makeRepo(
            self.path, standalone=True, searchPaths=[os.path.join(TESTDIR, "config")]
        )
        self.butler = SimplePipelineExecutor.prep_butler(config, [], "fake")
        self.enterContext(self.butler)
        # The overall-input dataset type uses a real (non-mock) storage class
        # so test code can `put` a plain dict directly.
        self.butler.registry.registerDatasetType(
            lsst.daf.butler.DatasetType(
                "input",
                dimensions=self.butler.dimensions.empty,
                storageClass="StructuredDataDict",
            )
        )
        self.butler.put({"zero": 0}, "input")
        # Register the mock storage classes the test tasks will read/write.
        MockStorageClass.get_or_register_mock("StructuredDataDict")
        MockStorageClass.get_or_register_mock("TaskMetadataLike")

    def tearDown(self) -> None:
        # ignore_errors: the directory may already be partially removed.
        shutil.rmtree(self.path, ignore_errors=True)

    def _make_config(
        self,
        input_storage_class: str = "StructuredDataDict",
        output_storage_class: str = "StructuredDataDict",
        input_name: str = "input",
        output_name: str = "output",
    ) -> DynamicTestPipelineTaskConfig:
        """Create configuration for a test task with a single input and single
        output of the given storage classes and dataset type names.
        """
        config = DynamicTestPipelineTaskConfig()
        config.inputs["i"] = DynamicConnectionConfig(
            dataset_type_name=input_name,
            storage_class=input_storage_class,
            # Since the overall input is special, we only use a mock storage
            # class for it when there's a storage class conversion.
            mock_storage_class=(input_name != "input" or (input_storage_class != "StructuredDataDict")),
        )
        config.outputs["o"] = DynamicConnectionConfig(
            dataset_type_name=output_name, storage_class=output_storage_class
        )
        return config

    def _make_executor(
        self,
        a_i_storage_class: str = "StructuredDataDict",
        a_o_storage_class: str = "StructuredDataDict",
        b_i_storage_class: str = "StructuredDataDict",
        b_o_storage_class: str = "StructuredDataDict",
    ) -> SimplePipelineExecutor:
        """Configure a SimplePipelineExecutor with tasks with the given
        storage classes as inputs and outputs.

        This sets up a simple pipeline with two tasks ('a' and 'b') where the
        second task's only input is the first task's only output.
        """
        config_a = self._make_config(a_i_storage_class, a_o_storage_class, output_name="intermediate")
        config_b = self._make_config(b_i_storage_class, b_o_storage_class, input_name="intermediate")
        pipeline_graph = PipelineGraph()
        pipeline_graph.add_task("a", DynamicTestPipelineTask, config_a)
        pipeline_graph.add_task("b", DynamicTestPipelineTask, config_b)
        executor = SimplePipelineExecutor.from_pipeline_graph(pipeline_graph, butler=self.butler)
        return executor

    def _assert_datasets(
        self,
        a_i_storage_class: str = "StructuredDataDict",
        a_o_storage_class: str = "StructuredDataDict",
        b_i_storage_class: str = "StructuredDataDict",
        b_o_storage_class: str = "StructuredDataDict",
        stored_intermediate_storage_class: str = "StructuredDataDict",
        stored_output_storage_class: str = "StructuredDataDict",
        butler: lsst.daf.butler.Butler | None = None,
    ) -> None:
        """Check that a butler repository's contents are consistent with
        running a pipeline created by _make_executor.
        """
        if butler is None:
            butler = self.butler
        # Read input and output datasets from butler, inspect their storage
        # classes directly.
        stored_intermediate = butler.get("intermediate")
        stored_output = butler.get("output")
        self.assertEqual(
            butler.get_dataset_type("intermediate").storageClass_name,
            get_mock_name(stored_intermediate_storage_class),
        )
        self.assertEqual(stored_output.storage_class, get_mock_name(stored_output_storage_class))
        self.assertEqual(
            butler.get_dataset_type("output").storageClass_name,
            get_mock_name(stored_output_storage_class),
        )
        # Since we didn't tell the butler to convert storage classes on read,
        # they'll remember their last conversion (on write).
        if a_o_storage_class != stored_intermediate_storage_class:
            self.assertEqual(
                stored_intermediate.converted_from.storage_class,
                get_mock_name(a_o_storage_class),
            )
        else:
            self.assertIsNone(stored_intermediate.converted_from)
        if b_o_storage_class != stored_output_storage_class:
            self.assertEqual(
                stored_output.converted_from.storage_class,
                get_mock_name(b_o_storage_class),
            )
        else:
            self.assertIsNone(stored_output.converted_from)
        # Extract the inputs as seen by the tasks from those stored outputs.
        quantum_a = stored_intermediate.quantum
        quantum_b = stored_output.quantum
        b_input = quantum_b.inputs["i"][0]
        a_input = quantum_a.inputs["i"][0]
        if a_i_storage_class == "StructuredDataDict":
            # Fixed: the original passed an extra `None` here, which was
            # silently consumed as the `msg` argument of assertIsNone.
            self.assertIsNone(a_input.converted_from)
        else:
            self.assertEqual(a_input.original_type, "dict")
        self.assertEqual(b_input.storage_class, get_mock_name(b_i_storage_class))

    def test_no_conversions(self) -> None:
        """Test execution with no storage class conversions as a baseline."""
        executor = self._make_executor()
        quanta = executor.run(register_dataset_types=True, save_versions=False)
        self.assertEqual(len(quanta), 2)
        self._assert_datasets()

    def test_intermediate_registration_differs(self) -> None:
        """Test execution where an intermediate is registered to be different
        from both the producing and consuming task.
        """
        self.butler.registry.registerDatasetType(
            lsst.daf.butler.DatasetType(
                "intermediate",
                dimensions=self.butler.dimensions.empty,
                storageClass=get_mock_name("TaskMetadataLike"),
            )
        )
        executor = self._make_executor()
        quanta = executor.run(register_dataset_types=True, save_versions=False)
        self.assertEqual(len(quanta), 2)
        self._assert_datasets(stored_intermediate_storage_class="TaskMetadataLike")

    def test_intermediate_producer_differs(self) -> None:
        """Test execution where an intermediate is registered to be consistent
        with the consumer but different from its producer.
        """
        self.butler.registry.registerDatasetType(
            lsst.daf.butler.DatasetType(
                "intermediate",
                dimensions=self.butler.dimensions.empty,
                storageClass=get_mock_name("TaskMetadataLike"),
            )
        )
        executor = self._make_executor(b_i_storage_class="TaskMetadataLike")
        quanta = executor.run(register_dataset_types=True, save_versions=False)
        self.assertEqual(len(quanta), 2)
        self._assert_datasets(
            stored_intermediate_storage_class="TaskMetadataLike", b_i_storage_class="TaskMetadataLike"
        )

    def test_intermediate_consumer_differs(self) -> None:
        """Test execution where an intermediate is registered to be consistent
        with its producer but different from its consumer.
        """
        executor = self._make_executor(a_o_storage_class="TaskMetadataLike")
        quanta = executor.run(register_dataset_types=True, save_versions=False)
        self.assertEqual(len(quanta), 2)
        self._assert_datasets(
            stored_intermediate_storage_class="TaskMetadataLike", a_o_storage_class="TaskMetadataLike"
        )

    def test_output_differs(self) -> None:
        """Test execution where an overall output is registered to be
        different from the producing task.
        """
        self.butler.registry.registerDatasetType(
            lsst.daf.butler.DatasetType(
                "output",
                dimensions=self.butler.dimensions.empty,
                storageClass=get_mock_name("TaskMetadataLike"),
            )
        )
        executor = self._make_executor()
        quanta = executor.run(register_dataset_types=True, save_versions=False)
        self.assertEqual(len(quanta), 2)
        self._assert_datasets(stored_output_storage_class="TaskMetadataLike")

    def test_input_differs(self) -> None:
        """Test execution where an overall input's storage class is different
        from the consuming task.
        """
        executor = self._make_executor(a_i_storage_class="TaskMetadataLike")
        quanta = executor.run(register_dataset_types=True, save_versions=False)
        self.assertEqual(len(quanta), 2)
        self._assert_datasets(a_i_storage_class="TaskMetadataLike")

    def test_input_differs_use_local_butler(self) -> None:
        """Test execution where an overall input's storage class is different
        from the consuming task, and we use a local butler.
        """
        executor = self._make_executor(a_i_storage_class="TaskMetadataLike")
        with tempfile.TemporaryDirectory() as tempdir:
            root = os.path.join(tempdir, "butler_root")
            local_butler = executor.use_local_butler(root)
            self.enterContext(local_butler)
            quanta = executor.run(register_dataset_types=True, save_versions=False)
            self.assertEqual(len(quanta), 2)
            self._assert_datasets(a_i_storage_class="TaskMetadataLike", butler=executor.butler)

    def test_incompatible(self) -> None:
        """Test that we cannot make a QG if the registry and pipeline have
        incompatible storage classes for a dataset type.
        """
        # Incompatible output dataset type.
        self.butler.registry.registerDatasetType(
            lsst.daf.butler.DatasetType(
                "output",
                dimensions=self.butler.dimensions.empty,
                storageClass="StructuredDataList",
            )
        )
        with self.assertRaisesRegex(
            IncompatibleDatasetTypeError, "Incompatible definition.*StructuredDataDict.*StructuredDataList.*"
        ):
            self._make_executor()

    def test_registry_changed(self) -> None:
        """Run pipeline, but change registry dataset types between making the
        QG and executing it.

        This only fails with full-butler execution; we don't have a way to
        prevent it with QBB.
        """
        executor = self._make_executor()
        self.butler.registry.registerDatasetType(
            lsst.daf.butler.DatasetType(
                "output",
                dimensions=self.butler.dimensions.empty,
                storageClass="TaskMetadataLike",  # even compatible is not okay
            )
        )
        with self.assertRaisesRegex(
            lsst.daf.butler.registry.ConflictingDefinitionError,
            ".*_mock_StructuredDataDict.*is inconsistent with.*TaskMetadataLike.*",
        ):
            executor.run(register_dataset_types=True, save_versions=False)

315 

316 

class MemoryTester(lsst.utils.tests.MemoryTestCase):
    """Generic tests for file leaks.

    All behavior is inherited from `lsst.utils.tests.MemoryTestCase`, which
    presumably scans for leaked resources after the other tests in this
    module have run; no additional test methods are needed here.
    """

319 

320 

def setup_module(module):
    """Set up the module for pytest.

    Parameters
    ----------
    module : `~types.ModuleType`
        Module to set up.
    """
    # Delegates all initialization to the LSST test utilities.
    lsst.utils.tests.init()

330 

331 

# Allow this test module to be run directly (outside pytest).
if __name__ == "__main__":
    lsst.utils.tests.init()
    unittest.main()