Coverage for tests / test_qg_builder_dimensions.py: 19%

121 statements  

« prev     ^ index     » next       coverage.py v7.13.5, created at 2026-05-01 08:20 +0000

1# This file is part of pipe_base. 

2# 

3# Developed for the LSST Data Management System. 

4# This product includes software developed by the LSST Project 

5# (https://www.lsst.org). 

6# See the COPYRIGHT file at the top-level directory of this distribution 

7# for details of code ownership. 

8# 

9# This software is dual licensed under the GNU General Public License and also 

10# under a 3-clause BSD license. Recipients may choose which of these licenses 

11# to use; please see the files gpl-3.0.txt and/or bsd_license.txt, 

12# respectively. If you choose the GPL option then the following text applies 

13# (but note that there is still no warranty even if you opt for BSD instead): 

14# 

15# This program is free software: you can redistribute it and/or modify 

16# it under the terms of the GNU General Public License as published by 

17# the Free Software Foundation, either version 3 of the License, or 

18# (at your option) any later version. 

19# 

20# This program is distributed in the hope that it will be useful, 

21# but WITHOUT ANY WARRANTY; without even the implied warranty of 

22# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the 

23# GNU General Public License for more details. 

24# 

25# You should have received a copy of the GNU General Public License 

26# along with this program. If not, see <https://www.gnu.org/licenses/>. 

27 

28import logging 

29import unittest 

30 

31import astropy.table 

32 

33import lsst.utils.tests 

34from lsst.daf.butler import Butler, DataCoordinate 

35from lsst.daf.butler.tests.utils import create_populated_sqlite_registry 

36from lsst.pipe.base import PipelineGraph 

37from lsst.pipe.base.all_dimensions_quantum_graph_builder import ( 

38 AllDimensionsQuantumGraphBuilder, 

39 DatasetQueryConstraintVariant, 

40) 

41from lsst.pipe.base.tests.mocks import ( 

42 DynamicConnectionConfig, 

43 DynamicTestPipelineTask, 

44 DynamicTestPipelineTaskConfig, 

45) 

46from lsst.resources import ResourcePath 

47from lsst.sphgeom import RangeSet 

48 

# Module-level logger, named after this module via ``__name__``.
_LOG = logging.getLogger(__name__)

50 

51 

class AllDimensionsQuantumGraphBuilderTestCase(unittest.TestCase):
    """Tests for AllDimensionsQuantumGraphBuilder with various interesting
    combinations of dimensions.
    """

    # Collection names shared by `_insert_overall_inputs` (which populates the
    # input collection) and `_build_quantum_graph` (which reads from it), kept
    # in one place so the two cannot drift apart.
    INPUT_COLLECTION = "c1"
    OUTPUT_RUN = "c2"

    @staticmethod
    def make_butler() -> Butler:
        """Create the butler used by each test.

        Returns
        -------
        butler : `lsst.daf.butler.Butler`
            The object returned by `create_populated_sqlite_registry` when
            given the ``base.yaml`` and ``spatial.yaml`` files from the
            ``lsst.daf.butler`` test registry data directory.
        """
        data_root = ResourcePath("resource://lsst.daf.butler/tests/registry_data", forceDirectory=True)
        return create_populated_sqlite_registry(
            *(data_root.join(filename) for filename in ("base.yaml", "spatial.yaml"))
        )

    def setUp(self) -> None:
        """Create a fresh butler for the test and register it as a context
        manager so it is exited automatically during cleanup.
        """
        super().setUp()
        self.butler = self.make_butler()
        self.enterContext(self.butler)

    def tearDown(self) -> None:
        """Drop the reference to the per-test butler."""
        del self.butler
        return super().tearDown()

    def test_one_to_one(self) -> None:
        """Test building a QG with a single task whose inputs and outputs and
        quanta all have the same dimensions.
        """
        config = DynamicTestPipelineTaskConfig()
        config.dimensions = ["visit", "detector"]
        config.inputs["i1"] = DynamicConnectionConfig(
            dataset_type_name="d1", dimensions=["visit", "detector"]
        )
        config.outputs["o2"] = DynamicConnectionConfig(
            dataset_type_name="d2", dimensions=["visit", "detector"]
        )
        pipeline_graph = PipelineGraph(universe=self.butler.dimensions)
        pipeline_graph.add_task("t1", DynamicTestPipelineTask, config=config)
        pipeline_graph.resolve(self.butler.registry)
        qg = self._build_quantum_graph(pipeline_graph)
        quanta = qg.quanta_by_task["t1"]
        self.assertEqual(len(quanta), 8)  # 2 visits x 4 detectors

    def test_patch_to_hpx_to_global(self) -> None:
        """Test building a QG with patch inputs and a hierarchy of healpix
        outputs and a final global step.
        """
        pipeline_graph = self._make_hpx_pipeline_graph(
            patch_to_hpx8_and_hpx11=True,
            hpx8_to_hpx5=True,
            hpx5_to_global=True,
        )
        qg = self._build_quantum_graph(pipeline_graph)
        self.assertEqual(len(qg.quanta_by_task["patch_to_hpx8_and_hpx11"]), 22)
        self.assertEqual(len(qg.quanta_by_task["hpx8_to_hpx5"]), 2)
        self.assertEqual(len(qg.quanta_by_task["hpx5_to_global"]), 1)

    def test_patch_to_hpx_to_instrument(self) -> None:
        """Test building a QG with patch inputs and a hierarchy of healpix
        outputs and a final per-instrument step.
        """
        pipeline_graph = self._make_hpx_pipeline_graph(
            patch_to_hpx8_and_hpx11=True,
            hpx8_to_hpx5=True,
            hpx5_to_instrument=True,
        )
        qg = self._build_quantum_graph(pipeline_graph)
        self.assertEqual(len(qg.quanta_by_task["patch_to_hpx8_and_hpx11"]), 22)
        self.assertEqual(len(qg.quanta_by_task["hpx8_to_hpx5"]), 2)
        self.assertEqual(len(qg.quanta_by_task["hpx5_to_instrument"]), 1)

    def test_hpx_to_global_dataset_constraint(self) -> None:
        """Test building a QG with healpix inputs and a hierarchy of healpix
        outputs and a final global step, with the healpix input used as a
        dataset constraint (i.e. the default behavior).
        """
        pipeline_graph = self._make_hpx_pipeline_graph(
            hpx8_to_hpx5=True,
            hpx5_to_global=True,
        )
        qg = self._build_quantum_graph(pipeline_graph)
        self.assertEqual(len(qg.quanta_by_task["hpx8_to_hpx5"]), 2)
        self.assertEqual(len(qg.quanta_by_task["hpx5_to_global"]), 1)

    @unittest.expectedFailure
    def test_hpx_to_global_where_constraint(self) -> None:
        """Test building a QG with healpix inputs and a hierarchy of healpix
        outputs and a final global step, with a 'where' constraint instead of
        a dataset constraint.

        It would be nice for this to work, but we do not currently expect it to
        given the limitations of the butler query system.
        """
        pipeline_graph = self._make_hpx_pipeline_graph(
            hpx8_to_hpx5=True,
            hpx5_to_global=True,
        )
        qg = self._build_quantum_graph(
            pipeline_graph,
            dataset_query_constraint=DatasetQueryConstraintVariant.OFF,
            where="healpix5 IN (4864, 4522)",
        )
        self.assertEqual(len(qg.quanta_by_task["hpx8_to_hpx5"]), 2)
        self.assertEqual(len(qg.quanta_by_task["hpx5_to_global"]), 1)

    def test_hpx_to_global_data_id_table(self) -> None:
        """Test building a QG with healpix inputs and a hierarchy of healpix
        outputs and a final global step, with healpix IDs provided via a data
        ID table.
        """
        pipeline_graph = self._make_hpx_pipeline_graph(
            hpx8_to_hpx5=True,
            hpx5_to_global=True,
        )
        tbl = astropy.table.Table(rows=[(4864,), (4522,)], names=["healpix5"])
        qg = self._build_quantum_graph(
            pipeline_graph,
            dataset_query_constraint=DatasetQueryConstraintVariant.OFF,
            data_id_tables=[tbl],
        )
        self.assertEqual(len(qg.quanta_by_task["hpx8_to_hpx5"]), 2)
        self.assertEqual(len(qg.quanta_by_task["hpx5_to_global"]), 1)

    def _build_quantum_graph(self, pipeline_graph: PipelineGraph, **builder_kwargs):
        """Insert overall inputs for a pipeline graph and build its quantum
        graph.

        Parameters
        ----------
        pipeline_graph : `lsst.pipe.base.PipelineGraph`
            Resolved pipeline graph to build quanta for.
        **builder_kwargs
            Extra keyword arguments forwarded unchanged to
            `AllDimensionsQuantumGraphBuilder` (e.g. ``where``,
            ``data_id_tables``, ``dataset_query_constraint``).

        Returns
        -------
        qg
            Whatever ``builder.finish(attach_datastore_records=False)
            .assemble()`` returns; the tests only rely on its
            ``quanta_by_task`` mapping.
        """
        # Inputs must exist in the input collection before the builder
        # queries for them.
        self._insert_overall_inputs(pipeline_graph)
        builder = AllDimensionsQuantumGraphBuilder(
            pipeline_graph,
            self.butler,
            input_collections=[self.INPUT_COLLECTION],
            output_run=self.OUTPUT_RUN,
            **builder_kwargs,
        )
        return builder.finish(attach_datastore_records=False).assemble()

    def _make_hpx_pipeline_graph(
        self,
        *,
        patch_to_hpx8_and_hpx11: bool = False,
        hpx8_to_hpx5: bool = False,
        hpx5_to_global: bool = False,
        hpx5_to_instrument: bool = False,
    ) -> PipelineGraph:
        """Generate a pipeline graph with tasks that use healpix dimensions
        in various ways.

        Parameters
        ----------
        patch_to_hpx8_and_hpx11 : `bool`, optional
            Add a ``healpix8`` task that reads ``patch_dataset`` and writes
            ``hpx11_dataset`` and ``hpx8_dataset``.
        hpx8_to_hpx5 : `bool`, optional
            Add a ``healpix5`` task that reads ``hpx8_dataset`` and writes
            ``hpx5_dataset``.
        hpx5_to_global : `bool`, optional
            Add a task with empty dimensions that reads ``hpx5_dataset`` and
            writes ``global_dataset``.
        hpx5_to_instrument : `bool`, optional
            Add an ``instrument`` task that reads ``hpx5_dataset`` and writes
            ``instrument_dataset``.

        Returns
        -------
        pipeline_graph : `lsst.pipe.base.PipelineGraph`
            Pipeline graph containing the requested tasks, resolved against
            the test butler's registry.
        """
        pipeline_graph = PipelineGraph(universe=self.butler.dimensions)
        if patch_to_hpx8_and_hpx11:
            config = DynamicTestPipelineTaskConfig()
            config.dimensions = {"healpix8"}
            config.inputs["i1"] = DynamicConnectionConfig(
                dataset_type_name="patch_dataset",
                dimensions={"patch"},
                multiple=True,
            )
            config.outputs["o1"] = DynamicConnectionConfig(
                dataset_type_name="hpx11_dataset",
                dimensions={"healpix11"},
                multiple=True,
            )
            config.outputs["o2"] = DynamicConnectionConfig(
                dataset_type_name="hpx8_dataset",
                dimensions={"healpix8"},
            )
            pipeline_graph.add_task("patch_to_hpx8_and_hpx11", DynamicTestPipelineTask, config=config)
        if hpx8_to_hpx5:
            config = DynamicTestPipelineTaskConfig()
            config.dimensions = {"healpix5"}
            config.inputs["i1"] = DynamicConnectionConfig(
                dataset_type_name="hpx8_dataset",
                dimensions={"healpix8"},
                multiple=True,
            )
            config.outputs["o1"] = DynamicConnectionConfig(
                dataset_type_name="hpx5_dataset",
                dimensions={"healpix5"},
            )
            pipeline_graph.add_task("hpx8_to_hpx5", DynamicTestPipelineTask, config=config)
        if hpx5_to_global:
            config = DynamicTestPipelineTaskConfig()
            # Empty dimensions: this is the "global" step.
            config.dimensions = {}
            config.inputs["i1"] = DynamicConnectionConfig(
                dataset_type_name="hpx5_dataset",
                dimensions={"healpix5"},
                multiple=True,
            )
            config.outputs["o1"] = DynamicConnectionConfig(
                dataset_type_name="global_dataset",
                dimensions={},
            )
            pipeline_graph.add_task("hpx5_to_global", DynamicTestPipelineTask, config=config)
        if hpx5_to_instrument:
            config = DynamicTestPipelineTaskConfig()
            config.dimensions = {"instrument"}
            config.inputs["i1"] = DynamicConnectionConfig(
                dataset_type_name="hpx5_dataset",
                dimensions={"healpix5"},
                multiple=True,
            )
            config.outputs["o1"] = DynamicConnectionConfig(
                dataset_type_name="instrument_dataset",
                dimensions={"instrument"},
            )
            pipeline_graph.add_task("hpx5_to_instrument", DynamicTestPipelineTask, config=config)
        pipeline_graph.resolve(self.butler.registry)
        return pipeline_graph

    def _insert_overall_inputs(self, pipeline_graph: PipelineGraph) -> None:
        """Insert overall-input datasets for a pipeline graph.

        Registers the input collection, then for every overall-input dataset
        type registers it and inserts one dataset per generated data ID.

        Parameters
        ----------
        pipeline_graph : `lsst.pipe.base.PipelineGraph`
            Resolved pipeline graph whose overall inputs should be created.

        Raises
        ------
        NotImplementedError
            Raised if an overall input has a skypix dimension combined with
            any other dimension.
        """
        self.butler.collections.register(self.INPUT_COLLECTION)
        for _, node in pipeline_graph.iter_overall_inputs():
            self.butler.registry.registerDatasetType(node.dataset_type)
            if node.dimensions.skypix:
                if len(node.dimensions) != 1:
                    raise NotImplementedError(
                        "Can only generate data IDs for queryable dimensions and isolated skypix."
                    )
                (skypix_name,) = node.dimensions.skypix
                pixelization = node.dimensions.universe.skypix_dimensions[skypix_name].pixelization
                # Accumulate the pixel ranges enveloping every patch region;
                # data IDs are generated for each pixel index in the union.
                ranges = RangeSet()
                for patch_record in self.butler.query_dimension_records("patch"):
                    ranges |= pixelization.envelope(patch_record.region)
                data_ids = [
                    DataCoordinate.from_required_values(node.dimensions, (index,))
                    for begin, end in ranges
                    for index in range(begin, end)
                ]
            else:
                data_ids = self.butler.query_data_ids(node.dimensions, explain=False)
            self.butler.registry.insertDatasets(node.dataset_type, data_ids, run=self.INPUT_COLLECTION)

304 

305 

# When executed directly as a script: call the lsst.utils.tests init hook,
# then hand control to unittest's command-line runner.
if __name__ == "__main__":
    lsst.utils.tests.init()
    unittest.main()