Coverage for tests / test_finalizeCharacterization.py: 13%

201 statements  

« prev     ^ index     » next       coverage.py v7.13.5, created at 2026-04-23 08:45 +0000

1# This file is part of pipe_tasks. 

2# 

3# LSST Data Management System 

4# This product includes software developed by the 

5# LSST Project (http://www.lsst.org/). 

6# See COPYRIGHT file at the top of the source tree. 

7# 

8# This program is free software: you can redistribute it and/or modify 

9# it under the terms of the GNU General Public License as published by 

10# the Free Software Foundation, either version 3 of the License, or 

11# (at your option) any later version. 

12# 

13# This program is distributed in the hope that it will be useful, 

14# but WITHOUT ANY WARRANTY; without even the implied warranty of 

15# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the 

16# GNU General Public License for more details. 

17# 

18# You should have received a copy of the LSST License Statement and 

19# the GNU General Public License along with this program. If not, 

20# see <https://www.lsstcorp.org/LegalNotices/>. 

21# 

22"""Test FinalizeCharacterizationTask. 

23""" 

24import logging 

25import unittest 

26 

27import astropy.table.table 

28import numpy as np 

29 

30import lsst.utils.tests 

31import lsst.afw.detection as afwDetection 

32import lsst.afw.image as afwImage 

33import lsst.afw.table as afwTable 

34import lsst.pipe.base as pipeBase 

35 

36from lsst.pipe.tasks.finalizeCharacterization import ( 

37 FinalizeCharacterizationConfig, 

38 FinalizeCharacterizationTask, 

39 FinalizeCharacterizationDetectorConfig, 

40 FinalizeCharacterizationDetectorTask, 

41 ConsolidateFinalizeCharacterizationDetectorConfig, 

42 ConsolidateFinalizeCharacterizationDetectorTask, 

43) 

44 

45 

46def _make_dummy_psf_and_ap_corr_map(): 

47 # Make dummy versions of these products, including required fields. 

48 psf = afwDetection.GaussianPsf(15, 15, 2.0) 

49 ap_corr_map = afwImage.ApCorrMap() 

50 schema = afwTable.SourceTable.makeMinimalSchema() 

51 schema.addField("visit", type=np.int64, doc="Visit number for the sources.") 

52 schema.addField("detector", type=np.int32, doc="Detector number for the sources.") 

53 measured_src = afwTable.SourceCatalog(schema) 

54 measured_src.resize(10) 

55 measured_src["id"] = np.arange(10) 

56 

57 return psf, ap_corr_map, measured_src 

58 

59 

60class MockFinalizeCharacterizationTask(FinalizeCharacterizationTask): 

61 """A derived class which skips the initialization routines. 

62 """ 

63 def __init__(self, **kwargs): 

64 pipeBase.PipelineTask.__init__(self, **kwargs) 

65 

66 self.makeSubtask('reserve_selection') 

67 self.makeSubtask('source_selector') 

68 

69 def compute_psf_and_ap_corr_map( 

70 self, 

71 visit, 

72 detector, 

73 exposure, 

74 src, 

75 isolated_src_table, 

76 fgcm_standard_star_cat, 

77 use_super=False, 

78 ): 

79 """A mocked version of this method.""" 

80 if use_super: 

81 return super().compute_psf_and_ap_corr_map(visit, detector, exposure, src, 

82 isolated_src_table, fgcm_standard_star_cat) 

83 

84 return _make_dummy_psf_and_ap_corr_map() 

85 

86 

87class MockFinalizeCharacterizationDetectorTask(FinalizeCharacterizationDetectorTask): 

88 """A derived class which skips the initialization routines. 

89 """ 

90 def __init__(self, **kwargs): 

91 pipeBase.PipelineTask.__init__(self, **kwargs) 

92 

93 self.makeSubtask('reserve_selection') 

94 self.makeSubtask('source_selector') 

95 

96 def compute_psf_and_ap_corr_map( 

97 self, 

98 visit, 

99 detector, 

100 exposure, 

101 src, 

102 isolated_src_table, 

103 fgcm_standard_star_cat, 

104 use_super=False, 

105 ): 

106 """A mocked version of this method.""" 

107 return _make_dummy_psf_and_ap_corr_map() 

108 

109 

110class FinalizeCharacterizationTestCase(lsst.utils.tests.TestCase): 

111 """Tests of some functionality of FinalizeCharacterizationTask. 

112 

113 Full testing comes from integration tests such as ci_hsc and ci_imsim. 

114 

115 These tests bypass the middleware used for accessing data and 

116 managing Task execution. 

117 """ 

118 def setUp(self): 

119 config = FinalizeCharacterizationConfig() 

120 

121 self.finalizeCharacterizationTask = MockFinalizeCharacterizationTask( 

122 config=config, 

123 ) 

124 

125 config_det = FinalizeCharacterizationDetectorConfig() 

126 

127 self.finalizeCharacterizationDetectorTask = MockFinalizeCharacterizationDetectorTask( 

128 config=config_det, 

129 ) 

130 

131 self.isolated_star_cat_dict, self.isolated_star_source_dict, self.fgcm_cat = self._make_isocats() 

132 

133 def _make_isocats(self): 

134 """Make test isolated star catalogs. 

135 

136 Returns 

137 ------- 

138 isolated_star_cat_dict : `dict` 

139 Per-"tract" dict of isolated star catalogs. 

140 isolate_star_source_dict : `dict` 

141 Per-"tract" dict of isolated source catalogs. 

142 """ 

143 dtype_cat = [('isolated_star_id', 'i8'), 

144 ('ra', 'f8'), 

145 ('dec', 'f8'), 

146 ('primary_band', 'U2'), 

147 ('source_cat_index', 'i4'), 

148 ('nsource', 'i4'), 

149 ('source_cat_index_i', 'i4'), 

150 ('nsource_i', 'i4'), 

151 ('source_cat_index_r', 'i4'), 

152 ('nsource_r', 'i4'), 

153 ('source_cat_index_z', 'i4'), 

154 ('nsource_z', 'i4')] 

155 

156 dtype_source = [('sourceId', 'i8'), 

157 ('obj_index', 'i4')] 

158 

159 dtype_fgcm = [ 

160 ('ra', 'f8'), 

161 ('dec', 'f8'), 

162 ('mag_g', 'f8'), 

163 ('mag_i', 'f8'), 

164 ] 

165 

166 isolated_star_cat_dict = {} 

167 isolated_star_source_dict = {} 

168 fgcm_cat_dict = {} 

169 

170 np.random.seed(12345) 

171 

172 # There are 90 stars in both r, i. 10 individually in each. 

173 nstar = 110 

174 nsource_per_band_per_star = 2 

175 self.nstar_total = nstar 

176 self.nstar_per_band = nstar - 10 

177 

178 # This is a brute-force assembly of a star catalog and matched sources. 

179 for tract in [0, 1, 2]: 

180 ra = np.random.uniform(low=tract, high=tract + 1.0, size=nstar) 

181 dec = np.random.uniform(low=0.0, high=1.0, size=nstar) 

182 

183 cat = np.zeros(nstar, dtype=dtype_cat) 

184 cat['isolated_star_id'] = tract*nstar + np.arange(nstar) 

185 cat['ra'] = ra 

186 cat['dec'] = dec 

187 if tract < 2: 

188 cat['primary_band'][0: 100] = 'i' 

189 cat['primary_band'][100:] = 'r' 

190 else: 

191 # Tract 2 only has z band. 

192 cat['primary_band'][:] = 'z' 

193 

194 source_cats = [] 

195 counter = 0 

196 for i in range(cat.size): 

197 cat['source_cat_index'][i] = counter 

198 if tract < 2: 

199 if i < 90: 

200 cat['nsource'][i] = 2*nsource_per_band_per_star 

201 bands = ['r', 'i'] 

202 else: 

203 cat['nsource'][i] = nsource_per_band_per_star 

204 if i < 100: 

205 bands = ['i'] 

206 else: 

207 bands = ['r'] 

208 else: 

209 cat['nsource'][i] = nsource_per_band_per_star 

210 bands = ['z'] 

211 

212 for band in bands: 

213 cat[f'source_cat_index_{band}'][i] = counter 

214 cat[f'nsource_{band}'][i] = nsource_per_band_per_star 

215 source_cat = np.zeros(nsource_per_band_per_star, dtype=dtype_source) 

216 source_cat['sourceId'] = np.arange( 

217 tract*nstar + counter, 

218 tract*nstar + counter + nsource_per_band_per_star 

219 ) 

220 source_cat['obj_index'] = i 

221 

222 source_cats.append(source_cat) 

223 

224 counter += nsource_per_band_per_star 

225 

226 fgcm_cat = np.zeros(nstar, dtype=dtype_fgcm) 

227 fgcm_cat['ra'] = ra 

228 fgcm_cat['dec'] = dec 

229 fgcm_cat['mag_g'] = np.random.uniform(low=16, high=20, size=nstar) 

230 fgcm_cat['mag_i'] = np.random.uniform(low=16, high=20, size=nstar) 

231 

232 source_cat = np.concatenate(source_cats) 

233 

234 isolated_star_cat_dict[tract] = pipeBase.InMemoryDatasetHandle(astropy.table.Table(cat), 

235 storageClass="ArrowAstropy") 

236 isolated_star_source_dict[tract] = pipeBase.InMemoryDatasetHandle(astropy.table.Table(source_cat), 

237 storageClass="ArrowAstropy") 

238 fgcm_cat_dict[tract] = pipeBase.InMemoryDatasetHandle(astropy.table.Table(fgcm_cat), 

239 storageClass="ArrowAstropy") 

240 

241 return isolated_star_cat_dict, isolated_star_source_dict, fgcm_cat_dict 

242 

243 def test_concat_isolated_star_cats(self): 

244 """Test concatenation and reservation of the isolated star catalogs. 

245 """ 

246 

247 for band in ['r', 'i']: 

248 iso, iso_src = self.finalizeCharacterizationTask.concat_isolated_star_cats( 

249 band, 

250 self.isolated_star_cat_dict, 

251 self.isolated_star_source_dict 

252 ) 

253 

254 # There are two tracts, so double everything. 

255 self.assertEqual(len(iso), 2*self.nstar_per_band) 

256 

257 reserve_fraction = self.finalizeCharacterizationTask.config.reserve_selection.reserve_fraction 

258 self.assertEqual(np.sum(iso['reserved']), 

259 int(reserve_fraction*len(iso))) 

260 

261 # 2 tracts, 4 observations per tract per star, minus 2*10 not in the given band. 

262 self.assertEqual(len(iso_src), 2*(4*len(iso)//2 - 20)) 

263 

264 # Check that every star is properly matched to the sources. 

265 for i in range(len(iso)): 

266 np.testing.assert_array_equal( 

267 iso_src['obj_index'][iso[f'source_cat_index_{band}'][i]: 

268 iso[f'source_cat_index_{band}'][i] + iso[f'nsource_{band}'][i]], 

269 i 

270 ) 

271 

272 # Check that every reserved star is marked as a reserved source. 

273 res_star, = np.where(iso['reserved']) 

274 for i in res_star: 

275 np.testing.assert_array_equal( 

276 iso_src['reserved'][iso[f'source_cat_index_{band}'][i]: 

277 iso[f'source_cat_index_{band}'][i] + iso[f'nsource_{band}'][i]], 

278 True 

279 ) 

280 

281 # Check that every reserved source is marked as a reserved star. 

282 res_src, = np.where(iso_src['reserved']) 

283 np.testing.assert_array_equal( 

284 iso['reserved'][iso_src['obj_index'][res_src]], 

285 True 

286 ) 

287 

288 def test_concat_isolate_star_cats_no_sources(self): 

289 """Test concatenation when there are no sources in a tract.""" 

290 iso, iso_src = self.finalizeCharacterizationTask.concat_isolated_star_cats( 

291 'z', 

292 self.isolated_star_cat_dict, 

293 self.isolated_star_source_dict 

294 ) 

295 

296 self.assertGreater(len(iso), 0) 

297 

298 def test_compute_psf_and_ap_corr_map_no_sources(self): 

299 """Test log message when there are no good sources after selection.""" 

300 # Create an empty source catalog. 

301 src_schema = afwTable.SourceTable.makeMinimalSchema() 

302 src_schema.addField('base_GaussianFlux_instFlux', type='F', doc='Flux field') 

303 src_schema.addField('base_GaussianFlux_instFluxErr', type='F', doc='Flux field') 

304 src = afwTable.SourceCatalog(src_schema) 

305 

306 # Set defaults and placeholders for required positional arguments. 

307 self.finalizeCharacterizationTask.config.source_selector['science'].flags.bad = [] 

308 visit = 0 

309 detector = 0 

310 exposure = None 

311 isolated_source_table = None 

312 fgcm_cat = None 

313 with self.assertLogs(level=logging.WARNING) as cm: 

314 psf, ap_corr_map, measured_src = self.finalizeCharacterizationTask.compute_psf_and_ap_corr_map( 

315 visit, 

316 detector, 

317 exposure, 

318 src, 

319 isolated_source_table, 

320 fgcm_cat, 

321 use_super=True, 

322 ) 

323 self.assertIn( 

324 "No good sources remain after cuts for visit {}, detector {}".format(visit, detector), 

325 cm.output[0] 

326 ) 

327 

328 def test_run_visit(self): 

329 """Test the run method on a full visit.""" 

330 visit = 100 

331 detector0 = 0 

332 detector1 = 1 

333 band = 'r' 

334 # src_dict should be a dictionary keyed by detector, with src handles. 

335 # calexp_dict should be a dictionary keyed by detector, with calexp handles. 

336 # These can be dummy objects. 

337 

338 src0 = afwTable.SourceCatalog(afwTable.SourceTable.makeMinimalSchema()) 

339 src1 = afwTable.SourceCatalog(afwTable.SourceTable.makeMinimalSchema()) 

340 calexp0 = afwImage.ExposureF() 

341 calexp1 = afwImage.ExposureF() 

342 

343 src_dict = { 

344 detector0: pipeBase.InMemoryDatasetHandle(src0), 

345 detector1: pipeBase.InMemoryDatasetHandle(src1), 

346 } 

347 calexp_dict = { 

348 detector0: pipeBase.InMemoryDatasetHandle(calexp0), 

349 detector1: pipeBase.InMemoryDatasetHandle(calexp1), 

350 } 

351 

352 results = self.finalizeCharacterizationTask.run( 

353 visit, 

354 band, 

355 self.isolated_star_cat_dict, 

356 self.isolated_star_source_dict, 

357 src_dict, 

358 calexp_dict, 

359 fgcm_standard_star_dict=self.fgcm_cat, 

360 ) 

361 

362 # Get the dummy values. 

363 psf, ap_corr_map, measured_src = _make_dummy_psf_and_ap_corr_map() 

364 

365 self.assertEqual(len(results.psf_ap_corr_cat), 2) 

366 np.testing.assert_array_equal(results.psf_ap_corr_cat["id"], [detector0, detector1]) 

367 np.testing.assert_array_equal(results.psf_ap_corr_cat["visit"], visit) 

368 row = results.psf_ap_corr_cat.find(detector0) 

369 self.assertEqual(row.getPsf().getSigma(), psf.getSigma()) 

370 self.assertEqual(list(row.getApCorrMap()), list(ap_corr_map)) 

371 np.testing.assert_array_equal(results.output_table["visit"], visit) 

372 table_len = len(results.output_table) 

373 np.testing.assert_array_equal(results.output_table["detector"][0: table_len // 2], detector0) 

374 np.testing.assert_array_equal(results.output_table["detector"][table_len // 2:], detector1) 

375 

376 def test_run_detectors(self): 

377 """Test the run method on individual detectors.""" 

378 visit = 100 

379 detector0 = 0 

380 detector1 = 1 

381 band = 'r' 

382 

383 src0 = afwTable.SourceCatalog(afwTable.SourceTable.makeMinimalSchema()) 

384 src1 = afwTable.SourceCatalog(afwTable.SourceTable.makeMinimalSchema()) 

385 calexp0 = afwImage.ExposureF() 

386 calexp1 = afwImage.ExposureF() 

387 

388 results0 = self.finalizeCharacterizationDetectorTask.run( 

389 visit, 

390 band, 

391 detector0, 

392 self.isolated_star_cat_dict, 

393 self.isolated_star_source_dict, 

394 src0, 

395 calexp0, 

396 fgcm_standard_star_dict=self.fgcm_cat, 

397 ) 

398 

399 results1 = self.finalizeCharacterizationDetectorTask.run( 

400 visit, 

401 band, 

402 detector1, 

403 self.isolated_star_cat_dict, 

404 self.isolated_star_source_dict, 

405 src1, 

406 calexp1, 

407 fgcm_standard_star_dict=self.fgcm_cat, 

408 ) 

409 

410 # Get the dummy values. 

411 psf, ap_corr_map, measured_src = _make_dummy_psf_and_ap_corr_map() 

412 

413 # Compare to expected values. 

414 self.assertEqual(len(results0.psf_ap_corr_cat), 1) 

415 self.assertEqual(len(results1.psf_ap_corr_cat), 1) 

416 np.testing.assert_array_equal(results0.psf_ap_corr_cat["id"], detector0) 

417 np.testing.assert_array_equal(results1.psf_ap_corr_cat["id"], detector1) 

418 np.testing.assert_array_equal(results0.psf_ap_corr_cat["visit"], visit) 

419 np.testing.assert_array_equal(results1.psf_ap_corr_cat["visit"], visit) 

420 row = results0.psf_ap_corr_cat.find(detector0) 

421 self.assertEqual(row.getPsf().getSigma(), psf.getSigma()) 

422 self.assertEqual(list(row.getApCorrMap()), list(ap_corr_map)) 

423 row = results1.psf_ap_corr_cat.find(detector1) 

424 self.assertEqual(row.getPsf().getSigma(), psf.getSigma()) 

425 self.assertEqual(list(row.getApCorrMap()), list(ap_corr_map)) 

426 np.testing.assert_array_equal(results0.output_table["visit"], visit) 

427 np.testing.assert_array_equal(results1.output_table["visit"], visit) 

428 np.testing.assert_array_equal(results0.output_table["detector"], detector0) 

429 np.testing.assert_array_equal(results1.output_table["detector"], detector1) 

430 

431 # Now test the task to concatenate these together. 

432 consolidate_task = ConsolidateFinalizeCharacterizationDetectorTask( 

433 config=ConsolidateFinalizeCharacterizationDetectorConfig(), 

434 ) 

435 

436 psf_ap_corr_detector_dict = { 

437 detector0: pipeBase.InMemoryDatasetHandle(results0.psf_ap_corr_cat), 

438 detector1: pipeBase.InMemoryDatasetHandle(results1.psf_ap_corr_cat), 

439 } 

440 src_detector_table_dict = { 

441 detector0: pipeBase.InMemoryDatasetHandle(results0.output_table, storageClass="ArrowAstropy"), 

442 detector1: pipeBase.InMemoryDatasetHandle(results1.output_table, storageClass="ArrowAstropy"), 

443 } 

444 

445 results = consolidate_task.run( 

446 psf_ap_corr_detector_dict=psf_ap_corr_detector_dict, 

447 src_detector_table_dict=src_detector_table_dict, 

448 ) 

449 

450 self.assertEqual(len(results.psf_ap_corr_cat), 2) 

451 np.testing.assert_array_equal(results.psf_ap_corr_cat["id"], [detector0, detector1]) 

452 np.testing.assert_array_equal(results.psf_ap_corr_cat["visit"], visit) 

453 row = results.psf_ap_corr_cat.find(detector0) 

454 self.assertEqual(row.getPsf().getSigma(), psf.getSigma()) 

455 self.assertEqual(list(row.getApCorrMap()), list(ap_corr_map)) 

456 np.testing.assert_array_equal(results.output_table["visit"], visit) 

457 table_len = len(results.output_table) 

458 np.testing.assert_array_equal(results.output_table["detector"][0: table_len // 2], detector0) 

459 np.testing.assert_array_equal(results.output_table["detector"][table_len // 2:], detector1) 

460 

461 

462class MyMemoryTestCase(lsst.utils.tests.MemoryTestCase): 

463 pass 

464 

465 

466def setup_module(module): 

467 lsst.utils.tests.init() 

468 

469 

470if __name__ == "__main__": 470 ↛ 471line 470 didn't jump to line 471 because the condition on line 470 was never true

471 lsst.utils.tests.init() 

472 unittest.main()