Coverage for tests / test_drp-v2.py: 19%

145 statements  

« prev     ^ index     » next       coverage.py v7.13.5, created at 2026-04-15 00:28 +0000

1# This file is part of drp_pipe. 

2# 

3# Developed for the LSST Data Management System. 

4# This product includes software developed by the LSST Project 

5# (http://www.lsst.org). 

6# See the COPYRIGHT file at the top-level directory of this distribution 

7# for details of code ownership. 

8# 

9# This program is free software: you can redistribute it and/or modify 

10# it under the terms of the GNU General Public License as published by 

11# the Free Software Foundation, either version 3 of the License, or 

12# (at your option) any later version. 

13# 

14# This program is distributed in the hope that it will be useful, 

15# but WITHOUT ANY WARRANTY; without even the implied warranty of 

16# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the 

17# GNU General Public License for more details. 

18# 

19# You should have received a copy of the GNU General Public License 

20# along with this program. If not, see <http://www.gnu.org/licenses/>. 

21 

22from __future__ import annotations 

23 

24import logging 

25import os 

26import re 

27import tempfile 

28import unittest 

29from collections.abc import Set 

30from typing import Any 

31 

32from lsst.daf.butler import Butler, Config, DatasetType 

33from lsst.daf.butler.tests.utils import makeTestTempDir, removeTestTempDir 

34from lsst.pipe.base import Pipeline, PipelineGraph 

35 

36from lsst.drp.pipe.tests.correspondence import Correspondence 

37 

38_LOG = logging.getLogger(__name__) 

39 

40 

# Absolute path of the directory containing this test module.
TEST_DIR = os.path.abspath(os.path.dirname(__file__))
# Pipeline definitions live in a "pipelines" directory that is a sibling of
# the test directory. Deriving it from the already-absolute TEST_DIR keeps it
# normalized (no ".." segment) and independent of any later working-directory
# change.
PIPELINES_DIR = os.path.join(os.path.dirname(TEST_DIR), "pipelines")

43 

44 

COMCAM_REFCAT = "the_monster_20250219"
# Dataset type names that the tests expect to be the overall inputs of the
# complete LSSTComCam/DRP-v2 pipeline (compared against the names yielded by
# ``PipelineGraph.iter_overall_inputs()``).
COMCAM_INPUTS = (
    {COMCAM_REFCAT}
    | {
        "raw",
        "flat",
        "dark",
        "bias",
        "bfk",
        "linearizer",
        "cti",
        "ptc",
        "gain_correction",
        "crosstalk",
        "camera",
        "defects",
        "skyMap",
        "fgcmLookUpTable",
        "pretrainedModelPackage",
        "illuminationCorrection",
    }
    | {
        # Solar system state required by generateEphemerides.
        "mpcorb",
        "de440s",
        "sb441_n16",
        "obscodes",
        "linux_p1550p2650",
        "pck00010",
        "earth_latest_high_prec",
        "earth_620120_250826",
        "earth_2025_250826_2125_predict",
        "naif0012",
    }
)

76 

77 

class DrpV2TestCase(unittest.TestCase):
    """Self-consistency and overall-input checks for the DRP-v2 pipeline
    variants.
    """

    def setUp(self):
        self.root = makeTestTempDir(TEST_DIR)
        # Always show complete diffs; the set comparisons below can be large.
        self.maxDiff = None

    def tearDown(self):
        removeTestTempDir(self.root)

    def make_butler(self, **kwargs: Any) -> Butler:
        """Construct a butler repository and return a `Butler` client.

        Parameters
        ----------
        **kwargs
            Forwarded to `Butler.from_config` (e.g. ``writeable=True``).

        Returns
        -------
        butler : `lsst.daf.butler.Butler`
            Client for a new repository rooted at ``self.root`` whose SQLite
            registry lives in its own temporary subdirectory.
        """
        config = Config()
        # Make a separate temporary directory for the registry of this
        # instance.
        tmpdir = tempfile.mkdtemp(dir=self.root)
        config["registry", "db"] = f"sqlite:///{tmpdir}/gen3.sqlite3"
        repo_config = Butler.makeRepo(self.root, config)
        return Butler.from_config(repo_config, **kwargs)

    def register_refcat(self, butler: Butler, name: str) -> None:
        """Register a reference-catalog dataset type.

        Parameters
        ----------
        butler : `lsst.daf.butler.Butler`
            Writeable butler whose registry receives the dataset type.
        name : `str`
            Dataset type name of the reference catalog.
        """
        # Refcats are registered with the htm7 dimension and the
        # SimpleCatalog storage class.
        self.register_dataset(butler, name, {"htm7"}, "SimpleCatalog")

    def register_dataset(
        self, butler: Butler, name: str, dimensions: Set[str], storageClass: str
    ) -> None:
        """Register a non-calibration dataset type.

        Parameters
        ----------
        butler : `lsst.daf.butler.Butler`
            Writeable butler whose registry receives the dataset type.
        name : `str`
            Dataset type name.
        dimensions : `~collections.abc.Set` [ `str` ]
            Names of the dataset type's dimensions.
        storageClass : `str`
            Name of the dataset type's storage class.
        """
        butler.registry.registerDatasetType(
            DatasetType(
                name,
                dimensions,
                storageClass,
                isCalibration=False,
                universe=butler.dimensions,
            )
        )

    def test_comcam_full_load(self) -> None:
        """Test the LSSTComCam/DRP-v2 pipeline after reading the full pipeline
        at once.
        """
        butler = self.make_butler(writeable=True)
        self.register_refcat(butler, COMCAM_REFCAT)
        pipeline = Pipeline.from_uri(
            os.path.join(PIPELINES_DIR, "LSSTComCam/DRP-v2.yaml")
        )
        # Just constructing and resolving the pipeline graph does a lot of
        # validation, since it checks that the step flow and dimensions are
        # valid (step flow is what the PipelineStepTester checks for the v1
        # pipelines).
        pipeline_graph = pipeline.to_graph(registry=butler.registry)
        # Each stage subset must be exactly the union of its "stepN*" steps.
        for stage_label, step_prefix in (
            ("stage1-single-visit", "step1"),
            ("stage2-recalibrate", "step2"),
            ("stage3-coadd", "step3"),
            ("stage4-measure-variability", "step4"),
        ):
            self.check_stage(
                pipeline_graph, pipeline_graph.task_subsets[stage_label], step_prefix
            )
        # Check that the overall inputs are only the ones we expect.
        overall_inputs = {name for name, _ in pipeline_graph.iter_overall_inputs()}
        self.assertEqual(overall_inputs, COMCAM_INPUTS)

    def _load_comcam_stage(self, butler: Butler, stage_label: str) -> PipelineGraph:
        """Load one labeled subset of LSSTComCam/DRP-v2, resolve it against
        ``butler``'s registry, and register its dataset types there.

        Parameters
        ----------
        butler : `lsst.daf.butler.Butler`
            Writeable butler used to resolve and register dataset types.
        stage_label : `str`
            Label of the subset to load (appended to the pipeline URI after
            ``#``).

        Returns
        -------
        pipeline_graph : `lsst.pipe.base.pipeline_graph.PipelineGraph`
            The resolved graph for that subset.
        """
        pipeline_graph = Pipeline.from_uri(
            os.path.join(PIPELINES_DIR, f"LSSTComCam/DRP-v2.yaml#{stage_label}")
        ).to_graph(registry=butler.registry)
        pipeline_graph.register_dataset_types(butler)
        return pipeline_graph

    def test_comcam_stage_load(self) -> None:
        """Test the LSSTComCam/DRP-v2 pipeline after reading the pipeline
        one stage at a time.
        """
        butler = self.make_butler(writeable=True)
        self.register_refcat(butler, COMCAM_REFCAT)
        # Pre-define visit_table.
        # It is stored as ArrowAstropy but read as a DataFrame.
        self.register_dataset(butler, "visit_table", {"instrument"}, "ArrowAstropy")
        # Stages are loaded in order, so each later stage is resolved with the
        # dataset types of the earlier stages already in the registry.
        pipeline_graph_1 = self._load_comcam_stage(butler, "stage1-single-visit")
        self.check_stage(pipeline_graph_1, pipeline_graph_1.tasks.keys(), "")
        pipeline_graph_2 = self._load_comcam_stage(butler, "stage2-recalibrate")
        self.check_stage(
            pipeline_graph_2, pipeline_graph_2.task_subsets["stage2-recalibrate"], ""
        )
        pipeline_graph_3 = self._load_comcam_stage(butler, "stage3-coadd")
        self.check_stage(
            pipeline_graph_3, pipeline_graph_3.task_subsets["stage3-coadd"], ""
        )
        pipeline_graph_4 = self._load_comcam_stage(butler, "stage4-measure-variability")
        self.check_stage(
            pipeline_graph_4,
            pipeline_graph_4.task_subsets["stage4-measure-variability"],
            "",
        )
        # Spot-check a few prominent outputs for each stage.
        spot_checks = (
            (
                pipeline_graph_1,
                (
                    "single_visit_star",
                    "preliminary_visit_image",
                    "preliminary_visit_summary",
                    "preliminary_visit_table",
                    "preliminary_visit_detector_table",
                    "single_visit_star_association_metrics",
                ),
            ),
            (
                pipeline_graph_2,
                (
                    "recalibrated_star",
                    "recalibrated_star_association_metrics",
                    "visit_summary",
                    "visit_table",
                    "visit_detector_table",
                ),
            ),
            (
                pipeline_graph_3,
                ("deep_coadd", "template_coadd", "object"),
            ),
            (
                pipeline_graph_4,
                (
                    "dia_source",
                    "dia_object",
                    "source",
                    "analysis_source_association_metrics",
                    "visit_image",
                    "object_forced_source",
                    "dia_object_forced_source",
                ),
            ),
        )
        for stage_graph, dataset_type_names in spot_checks:
            for dataset_type_name in dataset_type_names:
                self.assertIsNotNone(
                    stage_graph.producer_of(dataset_type_name),
                    msg=f"No task produces {dataset_type_name!r}.",
                )

    def check_stage(
        self, pipeline_graph: PipelineGraph, stage_members: Set[str], step_prefix: str
    ) -> None:
        """Check that the formal steps that should subdivide a stage subset
        actually do.

        Parameters
        ----------
        pipeline_graph : `lsst.pipe.base.pipeline_graph.PipelineGraph`
            A pipeline graph that has step definitions.
        stage_members : `Set` [ `str` ]
            The task labels that belong to the stage being tested.
        step_prefix : `str`
            String prefix for all steps that are part of this stage. May be
            an empty string to match all steps in the pipeline and check that
            only the desired steps are in the pipeline.
        """
        step_labels = [
            label for label in pipeline_graph.steps if label.startswith(step_prefix)
        ]
        # Steps should be sorted alphabetically.
        self.assertEqual(sorted(step_labels), step_labels)
        # Test that the checkpoint/stage subset is the union of its steps.
        step_member_union = set()
        for step_label in step_labels:
            step_member_union.update(pipeline_graph.task_subsets[step_label])
        self.assertSetEqual(step_member_union, set(stage_members))

    def test_comcam_correspondence(self) -> None:
        """Test that LSSTComCam/DRP-v2 matches the v1 LSSTComCam/DRP pipeline
        according to the checked-in correspondence file, and that
        `Correspondence.diff_task_configs` reports no differences for any
        pair of corresponding tasks.

        If the correspondence file is out of date, an attempted fix is
        written next to it (``*.new.json``) before the test fails.
        """
        butler = self.make_butler(writeable=True)
        self.register_refcat(butler, COMCAM_REFCAT)
        new_pipeline_filename = os.path.join(PIPELINES_DIR, "LSSTComCam/DRP-v2.yaml")
        new_pipeline_graph = Pipeline.from_uri(new_pipeline_filename).to_graph(
            registry=butler.registry
        )
        old_steps = [
            "step1",
            "step2a",
            "step2b",
            "step2c",
            "step2d",
            "step2e",
            "step3a",
            "step3b",
            "step4",
            "step5",
            "step6",
            "step7",
        ]
        old_pipeline_graph = Pipeline.from_uri(
            os.path.join(PIPELINES_DIR, f"LSSTComCam/DRP.yaml#{','.join(old_steps)}")
        ).to_graph(registry=butler.registry)
        ignore_edges = {
            # Use recalibrated_star instead of single_visit_star for calib
            # flag propagation.
            ("measureObjectUnforced", "sourceTableHandles"),
            # Use deep_coadd instead of deep_coadd_preliminary for WCS.
            ("measureObjectForced", "refWcs"),
        }
        correspondence_filename = os.path.join(
            TEST_DIR, "migration_data", "LSSTComCam", "DRP-v2.json"
        )
        correspondence = Correspondence.read(correspondence_filename)
        if problems := correspondence.check(
            new_pipeline_graph,
            old_pipeline_graph,
            "LSSTComCam/DRP-v2",
            "LSSTComCam/DRP",
            ignore_edges=ignore_edges,
        ):
            print(f"Problems found with {correspondence_filename!r}:")
            for message in problems:
                print(message)
            new_correspondence = correspondence.find_matches(
                new_pipeline_graph, old_pipeline_graph
            ).sorted(new_pipeline_graph, old_pipeline_graph)
            new_correspondence_filename = (
                f"{os.path.splitext(correspondence_filename)[0]}.new.json"
            )
            new_correspondence.write(new_correspondence_filename)
            print(
                "An attempt to fix the correspondence file has "
                f"been written to {new_correspondence_filename!r}."
            )
            if messages := new_correspondence.check(
                new_pipeline_graph,
                old_pipeline_graph,
                "LSSTComCam/DRP-v2",
                "LSSTComCam/DRP",
                ignore_edges=ignore_edges,
            ):
                print("Some problems remain that require manual edits:")
                for message in messages:
                    print(message)
            else:
                print("This file should be reviewed and moved to replace the original.")
            raise AssertionError(
                f"Pipeline correspondence file {correspondence_filename!r} is out of date."
            )
        # Compare every corresponding task pair before failing, so that one
        # run reports all config mismatches rather than just the first.
        mismatched_tasks = []
        for new_label, old_label in correspondence.tasks_new_to_old.items():
            new_node = new_pipeline_graph.tasks[new_label]
            old_node = old_pipeline_graph.tasks[old_label]
            if task_config_diff := correspondence.diff_task_configs(new_node, old_node):
                print(task_config_diff)
                mismatched_tasks.append(f"{new_label!r} (previously {old_label!r})")
        if mismatched_tasks:
            raise AssertionError(
                "Differences detected in configs for task(s): "
                f"{', '.join(mismatched_tasks)}."
            )

    def test_comcam_compat(self) -> None:
        """Test the LSSTComCam/DRP-v2-compat pipeline, which should be
        identical to DRP-v2 aside from replacing 'source' -> 'source2'
        throughout.
        """
        butler = self.make_butler(writeable=True)
        self.register_refcat(butler, COMCAM_REFCAT)
        pipeline = Pipeline.from_uri(
            os.path.join(PIPELINES_DIR, "LSSTComCam/DRP-v2.yaml")
        )
        pipeline_graph = pipeline.to_graph(registry=butler.registry)
        pipeline_compat = Pipeline.from_uri(
            os.path.join(PIPELINES_DIR, "LSSTComCam/DRP-v2-compat.yaml")
        )
        pipeline_graph_compat = pipeline_compat.to_graph(registry=butler.registry)
        # An empty (falsy) result from check() means no differences remain
        # once 'source2' is mapped back to 'source'.
        self.assertFalse(
            Correspondence(
                dataset_types_new_to_old={"source2": "source"},
            ).find_matches(pipeline_graph_compat, pipeline_graph).check(
                pipeline_graph_compat, pipeline_graph, "DRP-v2-compat", "DRP-v2"
            )
        )

    def test_comcam_release_id_parameter(self) -> None:
        """Test that changing the release_id parameter affects all appropriate
        config options in the pipeline.
        """
        pipeline = Pipeline.fromString(
            "description: test pipeline\n"
            "imports:\n"
            "  - $DRP_PIPE_DIR/pipelines/LSSTComCam/DRP-v2.yaml\n"
            "parameters:\n"
            "  release_id: 52\n"
        )
        pipeline_graph = pipeline.to_graph()
        # Matches "config.<path>.release_id=" in a task's config dump and
        # captures the dotted <path> to the object owning ``release_id``.
        regex = re.compile(r"config\.(.+)\.release_id=")
        release_id_options = []
        for task_node in pipeline_graph.tasks.values():
            for match_string in regex.findall(task_node.get_config_str()):
                attribute_path: list[str] = match_string.split(".")
                # Paths with indexing or other non-attribute syntax can't be
                # followed with getattr alone.
                if not all(term.isidentifier() for term in attribute_path):
                    _LOG.warning(
                        "Not checking config option %s:%s.release_id "
                        "because the test code is not sophisticated enough. "
                        "Please improve it.",
                        task_node.label,
                        match_string,
                    )
                    continue
                config_attribute = task_node.config
                for term in attribute_path:
                    config_attribute = getattr(config_attribute, term)
                self.assertEqual(config_attribute.release_id, 52)
                release_id_options.append((task_node.label, attribute_path))
        # Spot check a few expected entries to make sure a logic bug or bad
        # regex isn't preventing this test from doing anything.
        self.assertIn(("calibrateImage", ["id_generator"]), release_id_options)
        self.assertIn(("detectCoaddPeaks", ["idGenerator"]), release_id_options)

388 

389 

# Allow running this test module directly as a script.
if __name__ == "__main__":
    unittest.main()