Coverage for tests / qg_test_utils.py: 59%

96 statements  

« prev     ^ index     » next       coverage.py v7.13.5, created at 2026-04-24 08:21 +0000

1# This file is part of ctrl_bps. 

2# 

3# Developed for the LSST Data Management System. 

4# This product includes software developed by the LSST Project 

5# (https://www.lsst.org). 

6# See the COPYRIGHT file at the top-level directory of this distribution 

7# for details of code ownership. 

8# 

9# This software is dual licensed under the GNU General Public License and also 

10# under a 3-clause BSD license. Recipients may choose which of these licenses 

11# to use; please see the files gpl-3.0.txt and/or bsd_license.txt, 

12# respectively. If you choose the GPL option then the following text applies 

13# (but note that there is still no warranty even if you opt for BSD instead): 

14# 

15# This program is free software: you can redistribute it and/or modify 

16# it under the terms of the GNU General Public License as published by 

17# the Free Software Foundation, either version 3 of the License, or 

18# (at your option) any later version. 

19# 

20# This program is distributed in the hope that it will be useful, 

21# but WITHOUT ANY WARRANTY; without even the implied warranty of 

22# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the 

23# GNU General Public License for more details. 

24# 

25# You should have received a copy of the GNU General Public License 

26# along with this program. If not, see <https://www.gnu.org/licenses/>. 

27"""QuantumGraph-related utilities to support ctrl_bps testing.""" 

28 

29# Not actually running Quantum so do not need to override 'run' Method 

30# pylint: disable=abstract-method 

31 

32# Many dummy classes for testing. 

33# pylint: disable=missing-class-docstring 

34 

35import lsst.pipe.base.connectionTypes as cT 

36from lsst.daf.butler import DataCoordinate, DatasetRef, DatasetType, DimensionConfig, Quantum 

37from lsst.pex.config import Field 

38from lsst.pipe.base import PipelineTask, PipelineTaskConfig, PipelineTaskConnections 

39from lsst.pipe.base.tests.mocks import DynamicConnectionConfig, InMemoryRepo 

40 

# Sample metadata mapping (dimension name -> values); not referenced elsewhere
# in this file — presumably consumed by importing tests. TODO confirm usage.
METADATA = {"D1": [1, 2, 3]}

42 

43 

44# For each dummy task, create a Connections, Config, and PipelineTask 

45 

46 

class Dummy1Connections(PipelineTaskConnections, dimensions=("D1", "D2")):
    """Connections for the first dummy task: one init-output plus a
    per-quantum input/output pair.
    """

    initOutput = cT.InitOutput(doc="n/a", name="Dummy1InitOutput", storageClass="ExposureF")
    input = cT.Input(doc="n/a", name="Dummy1Input", storageClass="ExposureF", dimensions=("D1", "D2"))
    output = cT.Output(doc="n/a", name="Dummy1Output", storageClass="ExposureF", dimensions=("D1", "D2"))

53 

54 

class Dummy1Config(PipelineTaskConfig, pipelineConnections=Dummy1Connections):
    """Configuration for Dummy1PipelineTask."""

    conf1 = Field(doc="dummy config", dtype=int, default=1)

59 

60 

class Dummy1PipelineTask(PipelineTask):
    """First dummy task; quanta are never executed, so ``run`` is not
    overridden.
    """

    ConfigClass = Dummy1Config

65 

66 

class Dummy2Connections(PipelineTaskConnections, dimensions=("D1", "D2")):
    """Connections for the second dummy task, chained to Dummy1 through
    both its init-output and its per-quantum output.
    """

    initInput = cT.InitInput(doc="n/a", name="Dummy1InitOutput", storageClass="ExposureF")
    initOutput = cT.InitOutput(doc="n/a", name="Dummy2InitOutput", storageClass="ExposureF")
    input = cT.Input(doc="n/a", name="Dummy1Output", storageClass="ExposureF", dimensions=("D1", "D2"))
    output = cT.Output(doc="n/a", name="Dummy2Output", storageClass="ExposureF", dimensions=("D1", "D2"))

74 

75 

class Dummy2Config(PipelineTaskConfig, pipelineConnections=Dummy2Connections):
    """Configuration for Dummy2PipelineTask."""

    conf1 = Field(doc="dummy config", dtype=int, default=1)

80 

81 

class Dummy2PipelineTask(PipelineTask):
    """Second dummy task; quanta are never executed, so ``run`` is not
    overridden.
    """

    ConfigClass = Dummy2Config

86 

87 

class Dummy2bConnections(PipelineTaskConnections, dimensions=("D1", "D2")):
    """Connections for a task that branches off Dummy2's outputs, forming a
    mid-pipeline leaf node.
    """

    initInput = cT.InitInput(doc="n/a", name="Dummy2InitOutput", storageClass="ExposureF")
    initOutput = cT.InitOutput(doc="n/a", name="Dummy2bInitOutput", storageClass="ExposureF")
    input = cT.Input(doc="n/a", name="Dummy2Output", storageClass="ExposureF", dimensions=("D1", "D2"))
    output = cT.Output(doc="n/a", name="Dummy2bOutput", storageClass="ExposureF", dimensions=("D1", "D2"))

95 

96 

class Dummy2bConfig(PipelineTaskConfig, pipelineConnections=Dummy2bConnections):
    """Configuration for Dummy2bPipelineTask."""

    conf1 = Field(doc="dummy config", dtype=int, default=1)

101 

102 

class Dummy2bPipelineTask(PipelineTask):
    """Mid-pipeline leaf dummy task; quanta are never executed, so ``run``
    is not overridden.
    """

    ConfigClass = Dummy2bConfig

107 

108 

class Dummy3Connections(PipelineTaskConnections, dimensions=("D1", "D2")):
    """Connections for the third dummy task, consuming Dummy2's outputs."""

    initInput = cT.InitInput(doc="n/a", name="Dummy2InitOutput", storageClass="ExposureF")
    initOutput = cT.InitOutput(doc="n/a", name="Dummy3InitOutput", storageClass="ExposureF")
    input = cT.Input(doc="n/a", name="Dummy2Output", storageClass="ExposureF", dimensions=("D1", "D2"))
    output = cT.Output(doc="n/a", name="Dummy3Output", storageClass="ExposureF", dimensions=("D1", "D2"))

116 

117 

class Dummy3Config(PipelineTaskConfig, pipelineConnections=Dummy3Connections):
    """Configuration for Dummy3PipelineTask."""

    conf1 = Field(doc="dummy config", dtype=int, default=1)

122 

123 

class Dummy3PipelineTask(PipelineTask):
    """Third dummy task; quanta are never executed, so ``run`` is not
    overridden.
    """

    ConfigClass = Dummy3Config

128 

129 

class Dummy4Connections(PipelineTaskConnections, dimensions=("D1", "D2")):
    """Connections for the fourth dummy task, consuming Dummy3's outputs."""

    initInput = cT.InitInput(doc="n/a", name="Dummy3InitOutput", storageClass="ExposureF")
    initOutput = cT.InitOutput(doc="n/a", name="Dummy4InitOutput", storageClass="ExposureF")
    input = cT.Input(doc="n/a", name="Dummy3Output", storageClass="ExposureF", dimensions=("D1", "D2"))
    output = cT.Output(doc="n/a", name="Dummy4Output", storageClass="ExposureF", dimensions=("D1", "D2"))

137 

138 

class Dummy4Config(PipelineTaskConfig, pipelineConnections=Dummy4Connections):
    """Configuration for Dummy4PipelineTask."""

    conf1 = Field(doc="dummy config", dtype=int, default=1)

143 

144 

class Dummy4PipelineTask(PipelineTask):
    """Fourth dummy task; quanta are never executed, so ``run`` is not
    overridden.
    """

    ConfigClass = Dummy4Config

149 

150 

# Test if a Task that does not interact with the other Tasks works fine in
# the graph.
class Dummy5Connections(PipelineTaskConnections, dimensions=("D1", "D2")):
    """Connections for a task that is fully independent of the others."""

    input = cT.Input(doc="n/a", name="Dummy5Input", storageClass="ExposureF", dimensions=("D1", "D2"))
    output = cT.Output(doc="n/a", name="Dummy5Output", storageClass="ExposureF", dimensions=("D1", "D2"))

158 

159 

class Dummy5Config(PipelineTaskConfig, pipelineConnections=Dummy5Connections):
    """Configuration for Dummy5PipelineTask."""

    conf1 = Field(doc="dummy config", dtype=int, default=1)

164 

165 

class Dummy5PipelineTask(PipelineTask):
    """Independent dummy task; quanta are never executed, so ``run`` is not
    overridden.
    """

    ConfigClass = Dummy5Config

170 

171 

def _make_quantum(run, universe, task, task_def, dim1, dim2, intermediate_refs):
    """Build a single test Quantum for ``task`` at data ID ``(dim1, dim2)``.

    Output refs are recorded in ``intermediate_refs`` keyed by
    ``(dataset_type, data_id)`` so a downstream task's input resolves to the
    same `~lsst.daf.butler.DatasetRef` instance its producer created.
    """
    connections = task_def.connections
    data_id = DataCoordinate.standardize({"D1": dim1, "D2": dim2}, universe=universe)

    # Only tasks that consume another task's init-output have init-inputs;
    # init datasets are dimensionless, hence the empty data ID.
    init_refs = None
    if connections.initInputs:
        init_type = DatasetType(
            connections.initInput.name,
            (),
            storageClass=connections.initInput.storageClass,
            universe=universe,
        )
        init_refs = [DatasetRef(init_type, DataCoordinate.make_empty(universe), run=run)]

    input_type = DatasetType(
        connections.input.name,
        connections.input.dimensions,
        storageClass=connections.input.storageClass,
        universe=universe,
    )
    # Prefer the ref produced by an upstream quantum; otherwise treat the
    # input as a pre-existing (overall-input) dataset in the same run.
    cached = intermediate_refs.get((input_type, data_id))
    input_refs = [cached] if cached else [DatasetRef(input_type, data_id, run=run)]

    output_type = DatasetType(
        connections.output.name,
        connections.output.dimensions,
        storageClass=connections.output.storageClass,
        universe=universe,
    )
    output_ref = DatasetRef(output_type, data_id, run=run)
    # Remember the output so the next task in the chain reuses this ref.
    intermediate_refs[(output_type, data_id)] = output_ref

    return Quantum(
        taskName=task.__qualname__,
        dataId=data_id,
        taskClass=task,
        initInputs=init_refs,
        inputs={input_type: input_refs},
        outputs={output_type: [output_ref]},
    )

212 

213 

def make_test_helper() -> InMemoryRepo:
    """Make a test helper that can produce a quantum graph useful for
    clustering tests.

    See `make_test_quantum_graph` for a more complete description of this
    graph.
    """

    def int_keyed_element():
        # D1 and D2 are identical simple dimensions keyed by a single int.
        return {
            "keys": [
                {
                    "name": "id",
                    "type": "int",
                }
            ],
            "storage": {
                "cls": "lsst.daf.butler.registry.dimensions.table.TableDimensionRecordStorage",
            },
        }

    dimension_config = DimensionConfig(
        {
            "version": 1,
            "skypix": {
                "common": "htm7",
                "htm": {
                    "class": "lsst.sphgeom.HtmPixelization",
                    "max_level": 24,
                },
            },
            "elements": {
                "D1": int_keyed_element(),
                "D2": int_keyed_element(),
            },
            "packers": {},
        }
    )
    helper = InMemoryRepo(dimension_config=dimension_config)
    registry = helper.butler.registry
    registry.insertDimensionData("D1", {"id": 1}, {"id": 3})
    registry.insertDimensionData("D2", {"id": 2}, {"id": 4})
    # Note that automatic inputs and outputs work for most of the tasks below
    # to create a chain from each task to the next.
    helper.add_task(
        "T1",
        dimensions=("D1", "D2"),
        init_outputs={"initOutput": DynamicConnectionConfig(dataset_type_name="Dummy1InitOutput")},
    )
    # T2 -> T3 -> T4 each consume the previous task's init-output.
    for label, upstream_init, produced_init in (
        ("T2", "Dummy1InitOutput", "Dummy2InitOutput"),
        ("T3", "Dummy2InitOutput", "Dummy3InitOutput"),
        ("T4", "Dummy3InitOutput", "Dummy4InitOutput"),
    ):
        helper.add_task(
            label,
            dimensions=("D1", "D2"),
            init_inputs={"initInput": DynamicConnectionConfig(dataset_type_name=upstream_init)},
            init_outputs={"initOutput": DynamicConnectionConfig(dataset_type_name=produced_init)},
        )
    # NOTE(review): T5's per-quantum connections are keyed "initInput" and
    # "initOutput", which looks like a copy-paste of the init-connection keys;
    # preserved as-is since tests may depend on those connection names.
    helper.add_task(
        "T5",
        dimensions=("D1", "D2"),
        inputs={
            "initInput": DynamicConnectionConfig(dataset_type_name="Dummy5Input", dimensions=("D1", "D2"))
        },
        outputs={
            "initOutput": DynamicConnectionConfig(dataset_type_name="Dummy5Output", dimensions=("D1", "D2"))
        },
    )
    helper.add_task(
        "T2b",
        dimensions=("D1", "D2"),
        init_inputs={"initInput": DynamicConnectionConfig(dataset_type_name="Dummy2InitOutput")},
        init_outputs={"initOutput": DynamicConnectionConfig(dataset_type_name="Dummy2bInitOutput")},
        inputs={"input": DynamicConnectionConfig(dataset_type_name="dataset_auto2", dimensions=("D1", "D2"))},
    )
    # Overall inputs exist only where D1 < D2, giving three quanta per task.
    helper.insert_datasets("dataset_auto0", where="D1 < D2")
    helper.insert_datasets("Dummy5Input", where="D1 < D2")
    return helper

305 

306 

def make_test_quantum_graph(run: str = "run", uneven=False):
    """Create a quantum graph for unit tests.

    Parameters
    ----------
    run : `str`, optional
        Name of the RUN collection for output datasets.
    uneven : `bool`, optional
        Whether some of the quanta for initial tasks are
        not included as if finished in previous run.

    Returns
    -------
    qgraph : `lsst.pipe.base.quantum_graph.PredictedQuantumGraph`
        A test QuantumGraph looking like the following:
        (Task T5 is completely independent).

        Numbers in parens are the values for the two dimensions (D1, D2).

        .. code-block::
            T1(1,2)   T1(1,4)   T1(3,4)   T5(1,2)  T5(1,4)  T5(3,4)
              |         |         |
            T2(1,2)   T2(1,4)   T2(3,4)
             |  |      |  |      |  |
             | T2b(1,2)| T2b(1,4)| T2b(3,4)
             |         |         |
            T3(1,2)   T3(1,4)   T3(3,4)
              |         |         |
            T4(1,2)   T4(1,4)   T4(3,4)
    """
    with make_test_helper() as helper:
        builder = helper.make_quantum_graph_builder(output_run=run)
        qgc = builder.finish(attach_datastore_records=False)
        if uneven:
            # Drop selected early quanta as if they finished in a prior run,
            # then rebuild the derived graph structures to match.
            skip = {("T1", 1, 2), ("T1", 1, 4), ("T2", 1, 2)}
            surviving = {}
            for quantum_datasets in qgc.quantum_datasets.values():
                key = (quantum_datasets.task_label, *quantum_datasets.data_coordinate)
                if key not in skip:
                    surviving[quantum_datasets.quantum_id] = quantum_datasets
            qgc.quantum_datasets = surviving
            qgc.set_thin_graph()
            qgc.set_header_counts()
        return qgc.assemble()