Coverage for tests / test_isolatedStarAssociation.py: 12%
213 statements
« prev ^ index » next coverage.py v7.13.5, created at 2026-05-06 08:52 +0000
« prev ^ index » next coverage.py v7.13.5, created at 2026-05-06 08:52 +0000
1# This file is part of pipe_tasks.
2#
3# LSST Data Management System
4# This product includes software developed by the
5# LSST Project (http://www.lsst.org/).
6# See COPYRIGHT file at the top of the source tree.
7#
8# This program is free software: you can redistribute it and/or modify
9# it under the terms of the GNU General Public License as published by
10# the Free Software Foundation, either version 3 of the License, or
11# (at your option) any later version.
12#
13# This program is distributed in the hope that it will be useful,
14# but WITHOUT ANY WARRANTY; without even the implied warranty of
15# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
16# GNU General Public License for more details.
17#
18# You should have received a copy of the LSST License Statement and
19# the GNU General Public License along with this program. If not,
20# see <https://www.lsstcorp.org/LegalNotices/>.
21#
22"""Test IsolatedStarAssociationTask.
23"""
24import unittest
26import astropy.table
27import numpy as np
29import lsst.utils.tests
30import lsst.pipe.base
31import lsst.skymap
33from lsst.pipe.tasks.isolatedStarAssociation import (IsolatedStarAssociationConfig,
34 IsolatedStarAssociationTask)
35from smatch.matcher import Matcher
38class IsolatedStarAssociationTestCase(lsst.utils.tests.TestCase):
39 """Tests of IsolatedStarAssociationTask.
41 These tests bypass the middleware used for accessing data and
42 managing Task execution.
43 """
44 def setUp(self):
45 self.skymap = self._make_skymap()
46 self.tract = 9813
47 self.handles = self._make_simdata(self.tract)
48 self.visits = np.arange(len(self.handles)) + 1
50 self.handle_dict = {visit: handle for visit, handle in zip(self.visits, self.handles)}
52 config = IsolatedStarAssociationConfig()
53 config.band_order = ['i', 'r']
54 config.extra_columns = ['extra_column']
55 config.source_selector['science'].doFlags = False
56 config.source_selector['science'].doIsolated = False
58 self.isolatedStarAssociationTask = IsolatedStarAssociationTask(config=config)
60 def _make_skymap(self):
61 """Make a testing skymap."""
62 skymap_config = lsst.skymap.ringsSkyMap.RingsSkyMapConfig()
63 skymap_config.numRings = 120
64 skymap_config.projection = "TAN"
65 skymap_config.tractOverlap = 1.0/60
66 skymap_config.pixelScale = 0.168
67 return lsst.skymap.ringsSkyMap.RingsSkyMap(skymap_config)
69 def _make_simdata(self,
70 tract,
71 only_neighbors=False,
72 only_out_of_tract=False,
73 only_out_of_inner_tract=False,
74 no_secondary_overlap=False):
75 """Make simulated data tables and references.
77 Parameters
78 ----------
79 only_neighbors : `bool`, optional
80 Only put in neighbors.
81 only_out_of_tract : `bool`, optional
82 All stars are out of the tract.
83 only_out_of_inner_tract : `bool`, optional
84 All stars are out of the inner tract.
85 no_secondary_overlap : `bool`, optional
86 Secondary band has no overlap with the primary band.
88 Returns
89 -------
90 handles : `list` [`InMemoryDatasetHandle`]
91 List of mock references.
92 """
93 np.random.seed(12345)
95 n_visit_per_band = 5
96 n_star_both = 50
97 n_star_just_one = 5
99 tract_info = self.skymap[tract]
100 ctr = tract_info.ctr_coord
101 ctr_ra = ctr.getRa().asDegrees()
102 ctr_dec = ctr.getDec().asDegrees()
104 ra_both = np.linspace(ctr_ra - 1.5, ctr_ra + 1.5, n_star_both)
105 dec_both = np.linspace(ctr_dec - 1.5, ctr_dec + 1.5, n_star_both)
107 ra_just_r = np.linspace(ctr_ra - 0.5, ctr_ra + 0.5, n_star_just_one)
108 dec_just_r = np.linspace(ctr_dec + 0.2, ctr_dec + 0.2, n_star_just_one)
109 ra_just_i = np.linspace(ctr_ra - 0.5, ctr_ra + 0.5, n_star_just_one)
110 dec_just_i = np.linspace(ctr_dec - 0.2, ctr_dec - 0.2, n_star_just_one)
112 ra_neighbor = np.array([ra_both[n_star_both//2] + 1./3600.])
113 dec_neighbor = np.array([dec_both[n_star_both//2] + 1./3600.])
115 # Create the r-band datarefs
116 dtype = [('sourceId', 'i8'),
117 ('ra', 'f8'),
118 ('dec', 'f8'),
119 ('normCompTophatFlux_instFlux', 'f4'),
120 ('normCompTophatFlux_instFluxErr', 'f4'),
121 ('normCompTophatFlux_instFlux_flag', '?'),
122 ('extendedness', 'f4'),
123 ('sizeExtendedness', 'f4'),
124 ('detect_isPrimary', bool),
125 ('visit', 'i4'),
126 ('detector', 'i4'),
127 ('physical_filter', 'U10'),
128 ('band', 'U2'),
129 ('extra_column', 'f4')]
131 id_counter = 0
132 visit_counter = 1
134 handles = []
135 for band in ['r', 'i']:
136 if band == 'r':
137 filtername = 'R FILTER'
138 ra_just = ra_just_r
139 dec_just = dec_just_r
140 else:
141 filtername = 'I FILTER'
142 ra_just = ra_just_i
143 dec_just = dec_just_i
145 if only_neighbors:
146 star_ra = np.concatenate(([ra_both[n_star_both//2]], ra_neighbor))
147 star_dec = np.concatenate(([dec_both[n_star_both//2]], dec_neighbor))
148 elif no_secondary_overlap:
149 star_ra = np.concatenate((ra_just,))
150 star_dec = np.concatenate((dec_just,))
151 else:
152 star_ra = np.concatenate((ra_both, ra_neighbor, ra_just))
153 star_dec = np.concatenate((dec_both, dec_neighbor, dec_just))
155 if only_out_of_tract:
156 poly = self.skymap[self.tract].outer_sky_polygon
157 use = ~poly.contains(np.deg2rad(star_ra), np.deg2rad(star_dec))
158 star_ra = star_ra[use]
159 star_dec = star_dec[use]
160 elif only_out_of_inner_tract:
161 inner_tract_ids = self.skymap.findTractIdArray(star_ra, star_dec, degrees=True)
162 use = (inner_tract_ids != self.tract)
163 star_ra = star_ra[use]
164 star_dec = star_dec[use]
166 nstar = len(star_ra)
168 for i in range(n_visit_per_band):
169 ras = np.random.normal(loc=star_ra, scale=0.2/3600.)
170 decs = np.random.normal(loc=star_dec, scale=0.2/3600.)
172 table = np.zeros(nstar, dtype=dtype)
173 table['sourceId'] = np.arange(nstar) + id_counter
174 table['ra'] = ras
175 table['dec'] = decs
176 table['normCompTophatFlux_instFlux'] = 100.0
177 table['normCompTophatFlux_instFluxErr'] = 1.0
178 table['physical_filter'] = filtername
179 table['band'] = band
180 table['extra_column'] = np.ones(nstar)
181 table['visit'] = visit_counter
182 table['detector'] = 1
183 table['detect_isPrimary'] = True
185 if i == 0:
186 # Make one star have low s/n
187 table['normCompTophatFlux_instFlux'][0] = 1.0
189 tbl = astropy.table.Table(table)
190 handles.append(lsst.pipe.base.InMemoryDatasetHandle(tbl, storageClass="ArrowAstropy"))
192 id_counter += nstar
193 visit_counter += 1
195 self.n_visit_per_band = n_visit_per_band
196 self.n_star_both = n_star_both
197 self.n_star_just_one = n_star_just_one
198 if only_neighbors:
199 self.star_ras = np.concatenate(([ra_both[n_star_both//2]], ra_neighbor))
200 self.star_decs = np.concatenate(([dec_both[n_star_both//2]], dec_neighbor))
201 else:
202 self.star_ras = np.concatenate((ra_both, ra_just_r, ra_just_i, ra_neighbor))
203 self.star_decs = np.concatenate((dec_both, dec_just_r, dec_just_i, dec_neighbor))
205 return handles
207 def test_compute_unique_ids(self):
208 """Test computation of unique ids."""
209 ids1 = self.isolatedStarAssociationTask._compute_unique_ids(self.skymap,
210 9813,
211 10000)
212 ids2 = self.isolatedStarAssociationTask._compute_unique_ids(self.skymap,
213 9814,
214 5000)
215 ids = np.concatenate((ids1, ids2))
216 self.assertEqual(len(np.unique(ids)), len(ids))
218 def test_remove_neighbors(self):
219 """Test removing close neighbors."""
220 primary_star_cat = np.zeros(3, dtype=[('ra', 'f8'),
221 ('dec', 'f8')])
223 # Put two stars < 2" apart, and across the 0/360 boundary.
224 primary_star_cat['ra'] = [0.7/3600., 360.0 - 0.7/3600., 1.0]
225 primary_star_cat['dec'] = [5.0, 5.0, 5.0]
227 cut_cat = self.isolatedStarAssociationTask._remove_neighbors(primary_star_cat)
229 self.assertEqual(len(cut_cat), 1)
230 np.testing.assert_almost_equal(1.0, cut_cat['ra'][0])
232 def test_match_primary_stars(self):
233 """Test matching primary stars."""
234 # Stack all the sources; we do not want any cutting here.
235 tables = []
236 for handle in self.handles:
237 tbl = handle.get()
238 tables.append(np.asarray(tbl))
239 source_cat = np.concatenate(tables)
241 primary_star_cat = self.isolatedStarAssociationTask._match_primary_stars(['i', 'r'],
242 source_cat)
244 # Ensure we found the right number of stars in each p
245 test_i = (primary_star_cat['primary_band'] == 'i')
246 self.assertEqual(test_i.sum(), self.n_star_both + self.n_star_just_one + 1)
247 test_r = (primary_star_cat['primary_band'] == 'r')
248 self.assertEqual(test_r.sum(), self.n_star_just_one)
250 # Ensure that these stars all match to input stars within 1 arcsec.
251 with Matcher(self.star_ras, self.star_decs) as matcher:
252 idx, i1, i2, d = matcher.query_radius(primary_star_cat['ra'],
253 primary_star_cat['dec'],
254 1./3600.,
255 return_indices=True)
256 self.assertEqual(i1.size, self.star_ras.size)
258 def test_get_source_table_visit_columns(self):
259 """Test making of source table visit columns."""
260 all_columns, persist_columns = self.isolatedStarAssociationTask._get_source_table_visit_column_names()
262 # Make sure all persisted columns are in all columns.
263 for col in persist_columns:
264 self.assertTrue(col in all_columns)
266 # And make sure extendedness is not in persisted columns.
267 self.assertTrue('extendedness' not in persist_columns)
269 def test_match_sources(self):
270 """Test _match_sources source to primary matching."""
271 # Stack all the sources; we do not want any cutting here.
272 tables = []
273 for handle in self.handles:
274 tbl = handle.get()
275 tables.append(np.asarray(tbl))
276 source_cat = np.concatenate(tables)
278 source_cat = np.lib.recfunctions.append_fields(source_cat,
279 ['obj_index'],
280 [np.zeros(source_cat.size, dtype=np.int32)],
281 dtypes=['i4'],
282 usemask=False)
284 primary_bands = ['i', 'r']
286 primary_cat = np.zeros(self.star_ras.size,
287 dtype=self.isolatedStarAssociationTask._get_primary_dtype(primary_bands))
288 primary_cat['ra'] = self.star_ras
289 primary_cat['dec'] = self.star_decs
291 source_cat_sorted, primary_star_cat = self.isolatedStarAssociationTask._match_sources(['i', 'r'],
292 source_cat,
293 primary_cat)
294 # All the star sources should be matched
295 self.assertEqual(source_cat_sorted.size, source_cat.size)
297 # Full index tests are performed in test_run_isolated_star_association_task
299 def test_make_all_star_sources(self):
300 """Test appending all the star sources."""
301 source_cat = self.isolatedStarAssociationTask._make_all_star_sources(self.skymap[self.tract],
302 self.handle_dict)
304 # Make sure we don't have any low s/n sources.
305 sn_min = np.min(source_cat['normCompTophatFlux_instFlux']
306 / source_cat['normCompTophatFlux_instFluxErr'])
307 self.assertGreater(sn_min, 8.0)
309 # And make sure they are all within the tract outer boundary.
310 poly = self.skymap[self.tract].outer_sky_polygon
311 use = poly.contains(np.deg2rad(source_cat['ra']), np.deg2rad(source_cat['dec']))
312 self.assertEqual(use.sum(), len(source_cat))
314 def test_run_isolated_star_association_task(self):
315 """Test running the full task."""
316 struct = self.isolatedStarAssociationTask.run(self.skymap,
317 self.tract,
318 self.handle_dict)
320 star_source_cat = struct.star_source_cat
321 star_cat = struct.star_cat
323 # Check that sources are all unique ids
324 self.assertEqual(np.unique(star_source_cat['sourceId']).size, star_source_cat.size)
326 inner_tract_ids = self.skymap.findTractIdArray(self.star_ras,
327 self.star_decs,
328 degrees=True)
329 inner_stars = (inner_tract_ids == self.tract)
331 # There should be the same number of stars in the inner tract region,
332 # taking away the 2 close neighbors.
333 self.assertEqual(star_cat.size, np.sum(inner_stars) - 2)
335 # Check the star indices
336 for i in range(len(star_cat)):
337 all_source_star = star_source_cat[star_cat['source_cat_index'][i]:
338 star_cat['source_cat_index'][i] + star_cat['nsource'][i]]
340 # Check that these all point to the correct object
341 np.testing.assert_array_equal(all_source_star['obj_index'], i)
343 # Check these are all pointing to the same star position
344 with Matcher(np.atleast_1d(star_cat['ra'][i]),
345 np.atleast_1d(star_cat['dec'][i])) as matcher:
346 idx = matcher.query_radius(all_source_star['ra'],
347 all_source_star['dec'],
348 1./3600.)
349 self.assertEqual(len(idx[0]), star_cat['nsource'][i])
351 # Check per band indices
352 for band in ['r', 'i']:
353 band_source_star = star_source_cat[star_cat[f'source_cat_index_{band}'][i]:
354 star_cat[f'source_cat_index_{band}'][i]
355 + star_cat[f'nsource_{band}'][i]]
356 with Matcher(np.atleast_1d(star_cat['ra'][i]),
357 np.atleast_1d(star_cat['dec'][i])) as matcher:
358 idx = matcher.query_radius(band_source_star['ra'],
359 band_source_star['dec'],
360 1./3600.)
361 self.assertEqual(len(idx[0]), star_cat[f'nsource_{band}'][i])
363 def test_run_task_all_neighbors(self):
364 """Test running the task when all the stars are rejected as neighbors."""
365 handles = self._make_simdata(self.tract, only_neighbors=True)
366 handle_dict = {visit: handle for visit, handle in zip(self.visits, handles)}
368 struct = self.isolatedStarAssociationTask.run(self.skymap,
369 self.tract,
370 handle_dict)
372 # These should ber zero length.
373 self.assertEqual(len(struct.star_source_cat), 0)
374 self.assertEqual(len(struct.star_cat), 0)
375 # And spot-check a couple of expected fields to make sure they have the right type.
376 self.assertTrue('physical_filter' in struct.star_source_cat.dtype.names)
377 self.assertTrue('nsource_i' in struct.star_cat.dtype.names)
379 def test_run_task_all_out_of_tract(self):
380 """Test running the task when all the sources are out of the tract."""
381 handles = self._make_simdata(self.tract, only_out_of_tract=True)
382 handle_dict = {visit: handle for visit, handle in zip(self.visits, handles)}
384 struct = self.isolatedStarAssociationTask.run(self.skymap,
385 self.tract,
386 handle_dict)
388 # These should ber zero length.
389 self.assertEqual(len(struct.star_source_cat), 0)
390 self.assertEqual(len(struct.star_cat), 0)
391 # And spot-check a couple of expected fields to make sure they have the right type.
392 self.assertTrue('physical_filter' in struct.star_source_cat.dtype.names)
393 self.assertTrue('nsource_i' in struct.star_cat.dtype.names)
395 def test_run_task_all_out_of_inner_tract(self):
396 """Test running the task when all the sources are out of the inner tract."""
397 handles = self._make_simdata(self.tract, only_out_of_inner_tract=True)
398 handle_dict = {visit: handle for visit, handle in zip(self.visits, handles)}
400 struct = self.isolatedStarAssociationTask.run(self.skymap,
401 self.tract,
402 handle_dict)
404 # These should ber zero length.
405 self.assertEqual(len(struct.star_source_cat), 0)
406 self.assertEqual(len(struct.star_cat), 0)
407 # And spot-check a couple of expected fields to make sure they have the right type.
408 self.assertTrue('physical_filter' in struct.star_source_cat.dtype.names)
409 self.assertTrue('nsource_i' in struct.star_cat.dtype.names)
411 def test_run_task_secondary_no_overlap(self):
412 """Test running the task when the secondary band has no overlaps.
414 This tests DM-34834.
415 """
416 handles = self._make_simdata(self.tract, no_secondary_overlap=True)
417 handle_dict = {visit: handle for visit, handle in zip(self.visits, handles)}
419 struct = self.isolatedStarAssociationTask.run(self.skymap,
420 self.tract,
421 handle_dict)
423 # Add a sanity check that we got a catalog out.
424 self.assertGreater(len(struct.star_source_cat), 0)
425 self.assertGreater(len(struct.star_cat), 0)
428class MyMemoryTestCase(lsst.utils.tests.MemoryTestCase):
429 pass
432def setup_module(module):
433 lsst.utils.tests.init()
436if __name__ == "__main__": 436 ↛ 437line 436 didn't jump to line 437 because the condition on line 436 was never true
437 lsst.utils.tests.init()
438 unittest.main()