Coverage for tests / test_isolatedStarAssociation.py: 12%

213 statements  

« prev     ^ index     » next       coverage.py v7.13.5, created at 2026-04-30 09:11 +0000

1# This file is part of pipe_tasks. 

2# 

3# LSST Data Management System 

4# This product includes software developed by the 

5# LSST Project (http://www.lsst.org/). 

6# See COPYRIGHT file at the top of the source tree. 

7# 

8# This program is free software: you can redistribute it and/or modify 

9# it under the terms of the GNU General Public License as published by 

10# the Free Software Foundation, either version 3 of the License, or 

11# (at your option) any later version. 

12# 

13# This program is distributed in the hope that it will be useful, 

14# but WITHOUT ANY WARRANTY; without even the implied warranty of 

15# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the 

16# GNU General Public License for more details. 

17# 

18# You should have received a copy of the LSST License Statement and 

19# the GNU General Public License along with this program. If not, 

20# see <https://www.lsstcorp.org/LegalNotices/>. 

21# 

22"""Test IsolatedStarAssociationTask. 

23""" 

24import unittest 

25 

26import astropy.table 

27import numpy as np 

28 

29import lsst.utils.tests 

30import lsst.pipe.base 

31import lsst.skymap 

32 

33from lsst.pipe.tasks.isolatedStarAssociation import (IsolatedStarAssociationConfig, 

34 IsolatedStarAssociationTask) 

35from smatch.matcher import Matcher 

36 

37 

38class IsolatedStarAssociationTestCase(lsst.utils.tests.TestCase): 

39 """Tests of IsolatedStarAssociationTask. 

40 

41 These tests bypass the middleware used for accessing data and 

42 managing Task execution. 

43 """ 

44 def setUp(self): 

45 self.skymap = self._make_skymap() 

46 self.tract = 9813 

47 self.handles = self._make_simdata(self.tract) 

48 self.visits = np.arange(len(self.handles)) + 1 

49 

50 self.handle_dict = {visit: handle for visit, handle in zip(self.visits, self.handles)} 

51 

52 config = IsolatedStarAssociationConfig() 

53 config.band_order = ['i', 'r'] 

54 config.extra_columns = ['extra_column'] 

55 config.source_selector['science'].doFlags = False 

56 config.source_selector['science'].doIsolated = False 

57 

58 self.isolatedStarAssociationTask = IsolatedStarAssociationTask(config=config) 

59 

60 def _make_skymap(self): 

61 """Make a testing skymap.""" 

62 skymap_config = lsst.skymap.ringsSkyMap.RingsSkyMapConfig() 

63 skymap_config.numRings = 120 

64 skymap_config.projection = "TAN" 

65 skymap_config.tractOverlap = 1.0/60 

66 skymap_config.pixelScale = 0.168 

67 return lsst.skymap.ringsSkyMap.RingsSkyMap(skymap_config) 

68 

69 def _make_simdata(self, 

70 tract, 

71 only_neighbors=False, 

72 only_out_of_tract=False, 

73 only_out_of_inner_tract=False, 

74 no_secondary_overlap=False): 

75 """Make simulated data tables and references. 

76 

77 Parameters 

78 ---------- 

79 only_neighbors : `bool`, optional 

80 Only put in neighbors. 

81 only_out_of_tract : `bool`, optional 

82 All stars are out of the tract. 

83 only_out_of_inner_tract : `bool`, optional 

84 All stars are out of the inner tract. 

85 no_secondary_overlap : `bool`, optional 

86 Secondary band has no overlap with the primary band. 

87 

88 Returns 

89 ------- 

90 handles : `list` [`InMemoryDatasetHandle`] 

91 List of mock references. 

92 """ 

93 np.random.seed(12345) 

94 

95 n_visit_per_band = 5 

96 n_star_both = 50 

97 n_star_just_one = 5 

98 

99 tract_info = self.skymap[tract] 

100 ctr = tract_info.ctr_coord 

101 ctr_ra = ctr.getRa().asDegrees() 

102 ctr_dec = ctr.getDec().asDegrees() 

103 

104 ra_both = np.linspace(ctr_ra - 1.5, ctr_ra + 1.5, n_star_both) 

105 dec_both = np.linspace(ctr_dec - 1.5, ctr_dec + 1.5, n_star_both) 

106 

107 ra_just_r = np.linspace(ctr_ra - 0.5, ctr_ra + 0.5, n_star_just_one) 

108 dec_just_r = np.linspace(ctr_dec + 0.2, ctr_dec + 0.2, n_star_just_one) 

109 ra_just_i = np.linspace(ctr_ra - 0.5, ctr_ra + 0.5, n_star_just_one) 

110 dec_just_i = np.linspace(ctr_dec - 0.2, ctr_dec - 0.2, n_star_just_one) 

111 

112 ra_neighbor = np.array([ra_both[n_star_both//2] + 1./3600.]) 

113 dec_neighbor = np.array([dec_both[n_star_both//2] + 1./3600.]) 

114 

115 # Create the r-band datarefs 

116 dtype = [('sourceId', 'i8'), 

117 ('ra', 'f8'), 

118 ('dec', 'f8'), 

119 ('normCompTophatFlux_instFlux', 'f4'), 

120 ('normCompTophatFlux_instFluxErr', 'f4'), 

121 ('normCompTophatFlux_instFlux_flag', '?'), 

122 ('extendedness', 'f4'), 

123 ('sizeExtendedness', 'f4'), 

124 ('detect_isPrimary', bool), 

125 ('visit', 'i4'), 

126 ('detector', 'i4'), 

127 ('physical_filter', 'U10'), 

128 ('band', 'U2'), 

129 ('extra_column', 'f4')] 

130 

131 id_counter = 0 

132 visit_counter = 1 

133 

134 handles = [] 

135 for band in ['r', 'i']: 

136 if band == 'r': 

137 filtername = 'R FILTER' 

138 ra_just = ra_just_r 

139 dec_just = dec_just_r 

140 else: 

141 filtername = 'I FILTER' 

142 ra_just = ra_just_i 

143 dec_just = dec_just_i 

144 

145 if only_neighbors: 

146 star_ra = np.concatenate(([ra_both[n_star_both//2]], ra_neighbor)) 

147 star_dec = np.concatenate(([dec_both[n_star_both//2]], dec_neighbor)) 

148 elif no_secondary_overlap: 

149 star_ra = np.concatenate((ra_just,)) 

150 star_dec = np.concatenate((dec_just,)) 

151 else: 

152 star_ra = np.concatenate((ra_both, ra_neighbor, ra_just)) 

153 star_dec = np.concatenate((dec_both, dec_neighbor, dec_just)) 

154 

155 if only_out_of_tract: 

156 poly = self.skymap[self.tract].outer_sky_polygon 

157 use = ~poly.contains(np.deg2rad(star_ra), np.deg2rad(star_dec)) 

158 star_ra = star_ra[use] 

159 star_dec = star_dec[use] 

160 elif only_out_of_inner_tract: 

161 inner_tract_ids = self.skymap.findTractIdArray(star_ra, star_dec, degrees=True) 

162 use = (inner_tract_ids != self.tract) 

163 star_ra = star_ra[use] 

164 star_dec = star_dec[use] 

165 

166 nstar = len(star_ra) 

167 

168 for i in range(n_visit_per_band): 

169 ras = np.random.normal(loc=star_ra, scale=0.2/3600.) 

170 decs = np.random.normal(loc=star_dec, scale=0.2/3600.) 

171 

172 table = np.zeros(nstar, dtype=dtype) 

173 table['sourceId'] = np.arange(nstar) + id_counter 

174 table['ra'] = ras 

175 table['dec'] = decs 

176 table['normCompTophatFlux_instFlux'] = 100.0 

177 table['normCompTophatFlux_instFluxErr'] = 1.0 

178 table['physical_filter'] = filtername 

179 table['band'] = band 

180 table['extra_column'] = np.ones(nstar) 

181 table['visit'] = visit_counter 

182 table['detector'] = 1 

183 table['detect_isPrimary'] = True 

184 

185 if i == 0: 

186 # Make one star have low s/n 

187 table['normCompTophatFlux_instFlux'][0] = 1.0 

188 

189 tbl = astropy.table.Table(table) 

190 handles.append(lsst.pipe.base.InMemoryDatasetHandle(tbl, storageClass="ArrowAstropy")) 

191 

192 id_counter += nstar 

193 visit_counter += 1 

194 

195 self.n_visit_per_band = n_visit_per_band 

196 self.n_star_both = n_star_both 

197 self.n_star_just_one = n_star_just_one 

198 if only_neighbors: 

199 self.star_ras = np.concatenate(([ra_both[n_star_both//2]], ra_neighbor)) 

200 self.star_decs = np.concatenate(([dec_both[n_star_both//2]], dec_neighbor)) 

201 else: 

202 self.star_ras = np.concatenate((ra_both, ra_just_r, ra_just_i, ra_neighbor)) 

203 self.star_decs = np.concatenate((dec_both, dec_just_r, dec_just_i, dec_neighbor)) 

204 

205 return handles 

206 

207 def test_compute_unique_ids(self): 

208 """Test computation of unique ids.""" 

209 ids1 = self.isolatedStarAssociationTask._compute_unique_ids(self.skymap, 

210 9813, 

211 10000) 

212 ids2 = self.isolatedStarAssociationTask._compute_unique_ids(self.skymap, 

213 9814, 

214 5000) 

215 ids = np.concatenate((ids1, ids2)) 

216 self.assertEqual(len(np.unique(ids)), len(ids)) 

217 

218 def test_remove_neighbors(self): 

219 """Test removing close neighbors.""" 

220 primary_star_cat = np.zeros(3, dtype=[('ra', 'f8'), 

221 ('dec', 'f8')]) 

222 

223 # Put two stars < 2" apart, and across the 0/360 boundary. 

224 primary_star_cat['ra'] = [0.7/3600., 360.0 - 0.7/3600., 1.0] 

225 primary_star_cat['dec'] = [5.0, 5.0, 5.0] 

226 

227 cut_cat = self.isolatedStarAssociationTask._remove_neighbors(primary_star_cat) 

228 

229 self.assertEqual(len(cut_cat), 1) 

230 np.testing.assert_almost_equal(1.0, cut_cat['ra'][0]) 

231 

232 def test_match_primary_stars(self): 

233 """Test matching primary stars.""" 

234 # Stack all the sources; we do not want any cutting here. 

235 tables = [] 

236 for handle in self.handles: 

237 tbl = handle.get() 

238 tables.append(np.asarray(tbl)) 

239 source_cat = np.concatenate(tables) 

240 

241 primary_star_cat = self.isolatedStarAssociationTask._match_primary_stars(['i', 'r'], 

242 source_cat) 

243 

244 # Ensure we found the right number of stars in each p 

245 test_i = (primary_star_cat['primary_band'] == 'i') 

246 self.assertEqual(test_i.sum(), self.n_star_both + self.n_star_just_one + 1) 

247 test_r = (primary_star_cat['primary_band'] == 'r') 

248 self.assertEqual(test_r.sum(), self.n_star_just_one) 

249 

250 # Ensure that these stars all match to input stars within 1 arcsec. 

251 with Matcher(self.star_ras, self.star_decs) as matcher: 

252 idx, i1, i2, d = matcher.query_radius(primary_star_cat['ra'], 

253 primary_star_cat['dec'], 

254 1./3600., 

255 return_indices=True) 

256 self.assertEqual(i1.size, self.star_ras.size) 

257 

258 def test_get_source_table_visit_columns(self): 

259 """Test making of source table visit columns.""" 

260 all_columns, persist_columns = self.isolatedStarAssociationTask._get_source_table_visit_column_names() 

261 

262 # Make sure all persisted columns are in all columns. 

263 for col in persist_columns: 

264 self.assertTrue(col in all_columns) 

265 

266 # And make sure extendedness is not in persisted columns. 

267 self.assertTrue('extendedness' not in persist_columns) 

268 

269 def test_match_sources(self): 

270 """Test _match_sources source to primary matching.""" 

271 # Stack all the sources; we do not want any cutting here. 

272 tables = [] 

273 for handle in self.handles: 

274 tbl = handle.get() 

275 tables.append(np.asarray(tbl)) 

276 source_cat = np.concatenate(tables) 

277 

278 source_cat = np.lib.recfunctions.append_fields(source_cat, 

279 ['obj_index'], 

280 [np.zeros(source_cat.size, dtype=np.int32)], 

281 dtypes=['i4'], 

282 usemask=False) 

283 

284 primary_bands = ['i', 'r'] 

285 

286 primary_cat = np.zeros(self.star_ras.size, 

287 dtype=self.isolatedStarAssociationTask._get_primary_dtype(primary_bands)) 

288 primary_cat['ra'] = self.star_ras 

289 primary_cat['dec'] = self.star_decs 

290 

291 source_cat_sorted, primary_star_cat = self.isolatedStarAssociationTask._match_sources(['i', 'r'], 

292 source_cat, 

293 primary_cat) 

294 # All the star sources should be matched 

295 self.assertEqual(source_cat_sorted.size, source_cat.size) 

296 

297 # Full index tests are performed in test_run_isolated_star_association_task 

298 

299 def test_make_all_star_sources(self): 

300 """Test appending all the star sources.""" 

301 source_cat = self.isolatedStarAssociationTask._make_all_star_sources(self.skymap[self.tract], 

302 self.handle_dict) 

303 

304 # Make sure we don't have any low s/n sources. 

305 sn_min = np.min(source_cat['normCompTophatFlux_instFlux'] 

306 / source_cat['normCompTophatFlux_instFluxErr']) 

307 self.assertGreater(sn_min, 8.0) 

308 

309 # And make sure they are all within the tract outer boundary. 

310 poly = self.skymap[self.tract].outer_sky_polygon 

311 use = poly.contains(np.deg2rad(source_cat['ra']), np.deg2rad(source_cat['dec'])) 

312 self.assertEqual(use.sum(), len(source_cat)) 

313 

314 def test_run_isolated_star_association_task(self): 

315 """Test running the full task.""" 

316 struct = self.isolatedStarAssociationTask.run(self.skymap, 

317 self.tract, 

318 self.handle_dict) 

319 

320 star_source_cat = struct.star_source_cat 

321 star_cat = struct.star_cat 

322 

323 # Check that sources are all unique ids 

324 self.assertEqual(np.unique(star_source_cat['sourceId']).size, star_source_cat.size) 

325 

326 inner_tract_ids = self.skymap.findTractIdArray(self.star_ras, 

327 self.star_decs, 

328 degrees=True) 

329 inner_stars = (inner_tract_ids == self.tract) 

330 

331 # There should be the same number of stars in the inner tract region, 

332 # taking away the 2 close neighbors. 

333 self.assertEqual(star_cat.size, np.sum(inner_stars) - 2) 

334 

335 # Check the star indices 

336 for i in range(len(star_cat)): 

337 all_source_star = star_source_cat[star_cat['source_cat_index'][i]: 

338 star_cat['source_cat_index'][i] + star_cat['nsource'][i]] 

339 

340 # Check that these all point to the correct object 

341 np.testing.assert_array_equal(all_source_star['obj_index'], i) 

342 

343 # Check these are all pointing to the same star position 

344 with Matcher(np.atleast_1d(star_cat['ra'][i]), 

345 np.atleast_1d(star_cat['dec'][i])) as matcher: 

346 idx = matcher.query_radius(all_source_star['ra'], 

347 all_source_star['dec'], 

348 1./3600.) 

349 self.assertEqual(len(idx[0]), star_cat['nsource'][i]) 

350 

351 # Check per band indices 

352 for band in ['r', 'i']: 

353 band_source_star = star_source_cat[star_cat[f'source_cat_index_{band}'][i]: 

354 star_cat[f'source_cat_index_{band}'][i] 

355 + star_cat[f'nsource_{band}'][i]] 

356 with Matcher(np.atleast_1d(star_cat['ra'][i]), 

357 np.atleast_1d(star_cat['dec'][i])) as matcher: 

358 idx = matcher.query_radius(band_source_star['ra'], 

359 band_source_star['dec'], 

360 1./3600.) 

361 self.assertEqual(len(idx[0]), star_cat[f'nsource_{band}'][i]) 

362 

363 def test_run_task_all_neighbors(self): 

364 """Test running the task when all the stars are rejected as neighbors.""" 

365 handles = self._make_simdata(self.tract, only_neighbors=True) 

366 handle_dict = {visit: handle for visit, handle in zip(self.visits, handles)} 

367 

368 struct = self.isolatedStarAssociationTask.run(self.skymap, 

369 self.tract, 

370 handle_dict) 

371 

372 # These should ber zero length. 

373 self.assertEqual(len(struct.star_source_cat), 0) 

374 self.assertEqual(len(struct.star_cat), 0) 

375 # And spot-check a couple of expected fields to make sure they have the right type. 

376 self.assertTrue('physical_filter' in struct.star_source_cat.dtype.names) 

377 self.assertTrue('nsource_i' in struct.star_cat.dtype.names) 

378 

379 def test_run_task_all_out_of_tract(self): 

380 """Test running the task when all the sources are out of the tract.""" 

381 handles = self._make_simdata(self.tract, only_out_of_tract=True) 

382 handle_dict = {visit: handle for visit, handle in zip(self.visits, handles)} 

383 

384 struct = self.isolatedStarAssociationTask.run(self.skymap, 

385 self.tract, 

386 handle_dict) 

387 

388 # These should ber zero length. 

389 self.assertEqual(len(struct.star_source_cat), 0) 

390 self.assertEqual(len(struct.star_cat), 0) 

391 # And spot-check a couple of expected fields to make sure they have the right type. 

392 self.assertTrue('physical_filter' in struct.star_source_cat.dtype.names) 

393 self.assertTrue('nsource_i' in struct.star_cat.dtype.names) 

394 

395 def test_run_task_all_out_of_inner_tract(self): 

396 """Test running the task when all the sources are out of the inner tract.""" 

397 handles = self._make_simdata(self.tract, only_out_of_inner_tract=True) 

398 handle_dict = {visit: handle for visit, handle in zip(self.visits, handles)} 

399 

400 struct = self.isolatedStarAssociationTask.run(self.skymap, 

401 self.tract, 

402 handle_dict) 

403 

404 # These should ber zero length. 

405 self.assertEqual(len(struct.star_source_cat), 0) 

406 self.assertEqual(len(struct.star_cat), 0) 

407 # And spot-check a couple of expected fields to make sure they have the right type. 

408 self.assertTrue('physical_filter' in struct.star_source_cat.dtype.names) 

409 self.assertTrue('nsource_i' in struct.star_cat.dtype.names) 

410 

411 def test_run_task_secondary_no_overlap(self): 

412 """Test running the task when the secondary band has no overlaps. 

413 

414 This tests DM-34834. 

415 """ 

416 handles = self._make_simdata(self.tract, no_secondary_overlap=True) 

417 handle_dict = {visit: handle for visit, handle in zip(self.visits, handles)} 

418 

419 struct = self.isolatedStarAssociationTask.run(self.skymap, 

420 self.tract, 

421 handle_dict) 

422 

423 # Add a sanity check that we got a catalog out. 

424 self.assertGreater(len(struct.star_source_cat), 0) 

425 self.assertGreater(len(struct.star_cat), 0) 

426 

427 

428class MyMemoryTestCase(lsst.utils.tests.MemoryTestCase): 

429 pass 

430 

431 

432def setup_module(module): 

433 lsst.utils.tests.init() 

434 

435 

436if __name__ == "__main__": 436 ↛ 437line 436 didn't jump to line 437 because the condition on line 436 was never true

437 lsst.utils.tests.init() 

438 unittest.main()