Coverage for tests/test_finalizeCharacterization.py: 13%
201 statements
« prev ^ index » next coverage.py v7.13.5, created at 2026-04-25 08:39 +0000
« prev ^ index » next coverage.py v7.13.5, created at 2026-04-25 08:39 +0000
1# This file is part of pipe_tasks.
2#
3# LSST Data Management System
4# This product includes software developed by the
5# LSST Project (http://www.lsst.org/).
6# See COPYRIGHT file at the top of the source tree.
7#
8# This program is free software: you can redistribute it and/or modify
9# it under the terms of the GNU General Public License as published by
10# the Free Software Foundation, either version 3 of the License, or
11# (at your option) any later version.
12#
13# This program is distributed in the hope that it will be useful,
14# but WITHOUT ANY WARRANTY; without even the implied warranty of
15# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
16# GNU General Public License for more details.
17#
18# You should have received a copy of the LSST License Statement and
19# the GNU General Public License along with this program. If not,
20# see <https://www.lsstcorp.org/LegalNotices/>.
21#
22"""Test FinalizeCharacterizationTask.
23"""
24import logging
25import unittest
27import astropy.table.table
28import numpy as np
30import lsst.utils.tests
31import lsst.afw.detection as afwDetection
32import lsst.afw.image as afwImage
33import lsst.afw.table as afwTable
34import lsst.pipe.base as pipeBase
36from lsst.pipe.tasks.finalizeCharacterization import (
37 FinalizeCharacterizationConfig,
38 FinalizeCharacterizationTask,
39 FinalizeCharacterizationDetectorConfig,
40 FinalizeCharacterizationDetectorTask,
41 ConsolidateFinalizeCharacterizationDetectorConfig,
42 ConsolidateFinalizeCharacterizationDetectorTask,
43)
def _make_dummy_psf_and_ap_corr_map():
    """Build placeholder PSF, aperture correction map and source catalog.

    Returns
    -------
    psf : `lsst.afw.detection.GaussianPsf`
        PSF constructed with the arguments ``(15, 15, 2.0)``.
    ap_corr_map : `lsst.afw.image.ApCorrMap`
        Default-constructed aperture correction map.
    measured_src : `lsst.afw.table.SourceCatalog`
        Ten-row catalog on the minimal source schema extended with
        ``visit`` and ``detector`` fields; ``id`` is filled with 0..9.
    """
    n_rows = 10

    # The minimal schema is extended with the extra required fields.
    src_schema = afwTable.SourceTable.makeMinimalSchema()
    src_schema.addField("visit", type=np.int64, doc="Visit number for the sources.")
    src_schema.addField("detector", type=np.int32, doc="Detector number for the sources.")

    catalog = afwTable.SourceCatalog(src_schema)
    catalog.resize(n_rows)
    catalog["id"] = np.arange(n_rows)

    return afwDetection.GaussianPsf(15, 15, 2.0), afwImage.ApCorrMap(), catalog
class MockFinalizeCharacterizationTask(FinalizeCharacterizationTask):
    """Test double for `FinalizeCharacterizationTask`.

    The constructor calls ``pipeBase.PipelineTask.__init__`` directly instead
    of the parent class initializer, then creates only the
    ``reserve_selection`` and ``source_selector`` subtasks.
    """
    def __init__(self, **kwargs):
        pipeBase.PipelineTask.__init__(self, **kwargs)

        for subtask_name in ('reserve_selection', 'source_selector'):
            self.makeSubtask(subtask_name)

    def compute_psf_and_ap_corr_map(
        self,
        visit,
        detector,
        exposure,
        src,
        isolated_src_table,
        fgcm_standard_star_cat,
        use_super=False,
    ):
        """Return dummy products, or delegate to the real implementation.

        Parameters
        ----------
        visit, detector, exposure, src, isolated_src_table, \
                fgcm_standard_star_cat
            Forwarded unchanged to the parent method when ``use_super`` is
            true; otherwise unused.
        use_super : `bool`, optional
            If true, call the parent class implementation.

        Returns
        -------
        psf, ap_corr_map, measured_src
            The parent method's return value when ``use_super`` is true,
            otherwise the output of ``_make_dummy_psf_and_ap_corr_map``.
        """
        if not use_super:
            return _make_dummy_psf_and_ap_corr_map()

        return super().compute_psf_and_ap_corr_map(
            visit,
            detector,
            exposure,
            src,
            isolated_src_table,
            fgcm_standard_star_cat,
        )
class MockFinalizeCharacterizationDetectorTask(FinalizeCharacterizationDetectorTask):
    """Test double for `FinalizeCharacterizationDetectorTask`.

    The constructor calls ``pipeBase.PipelineTask.__init__`` directly instead
    of the parent class initializer, then creates only the
    ``reserve_selection`` and ``source_selector`` subtasks.
    """
    def __init__(self, **kwargs):
        pipeBase.PipelineTask.__init__(self, **kwargs)

        self.makeSubtask('reserve_selection')
        self.makeSubtask('source_selector')

    def compute_psf_and_ap_corr_map(
        self,
        visit,
        detector,
        exposure,
        src,
        isolated_src_table,
        fgcm_standard_star_cat,
        use_super=False,
    ):
        """Return dummy products, or delegate to the real implementation.

        Parameters
        ----------
        visit, detector, exposure, src, isolated_src_table, \
                fgcm_standard_star_cat
            Forwarded unchanged to the parent method when ``use_super`` is
            true; otherwise unused.
        use_super : `bool`, optional
            If true, call the parent class implementation. The default
            (`False`) returns the dummy products.

        Returns
        -------
        psf, ap_corr_map, measured_src
            The parent method's return value when ``use_super`` is true,
            otherwise the output of ``_make_dummy_psf_and_ap_corr_map``.
        """
        # Honour ``use_super`` so that the flag behaves the same way as in
        # MockFinalizeCharacterizationTask rather than being silently ignored.
        if use_super:
            return super().compute_psf_and_ap_corr_map(visit, detector, exposure, src,
                                                       isolated_src_table, fgcm_standard_star_cat)

        return _make_dummy_psf_and_ap_corr_map()
class FinalizeCharacterizationTestCase(lsst.utils.tests.TestCase):
    """Tests of some functionality of FinalizeCharacterizationTask.

    Full testing comes from integration tests such as ci_hsc and ci_imsim.

    These tests bypass the middleware used for accessing data and
    managing Task execution.
    """
    def setUp(self):
        """Create the mocked tasks and the synthetic input catalogs."""
        config = FinalizeCharacterizationConfig()

        self.finalizeCharacterizationTask = MockFinalizeCharacterizationTask(
            config=config,
        )

        config_det = FinalizeCharacterizationDetectorConfig()

        self.finalizeCharacterizationDetectorTask = MockFinalizeCharacterizationDetectorTask(
            config=config_det,
        )

        self.isolated_star_cat_dict, self.isolated_star_source_dict, self.fgcm_cat = self._make_isocats()

    def _make_isocats(self):
        """Make test isolated star catalogs.

        As a side effect this sets ``self.nstar_total`` (stars per tract) and
        ``self.nstar_per_band`` (stars per tract observed in any one of the
        r or i bands).

        Returns
        -------
        isolated_star_cat_dict : `dict`
            Per-"tract" dict of isolated star catalogs.
        isolated_star_source_dict : `dict`
            Per-"tract" dict of isolated source catalogs.
        fgcm_cat_dict : `dict`
            Per-"tract" dict of catalogs with ``ra``, ``dec``, ``mag_g`` and
            ``mag_i`` columns.

        Notes
        -----
        Every dict value is a `lsst.pipe.base.InMemoryDatasetHandle` wrapping
        an `astropy.table.Table` with storage class ``ArrowAstropy``.
        """
        dtype_cat = [('isolated_star_id', 'i8'),
                     ('ra', 'f8'),
                     ('dec', 'f8'),
                     ('primary_band', 'U2'),
                     ('source_cat_index', 'i4'),
                     ('nsource', 'i4'),
                     ('source_cat_index_i', 'i4'),
                     ('nsource_i', 'i4'),
                     ('source_cat_index_r', 'i4'),
                     ('nsource_r', 'i4'),
                     ('source_cat_index_z', 'i4'),
                     ('nsource_z', 'i4')]

        dtype_source = [('sourceId', 'i8'),
                        ('obj_index', 'i4')]

        dtype_fgcm = [
            ('ra', 'f8'),
            ('dec', 'f8'),
            ('mag_g', 'f8'),
            ('mag_i', 'f8'),
        ]

        isolated_star_cat_dict = {}
        isolated_star_source_dict = {}
        fgcm_cat_dict = {}

        # Fixed seed so that the generated catalogs are reproducible.
        np.random.seed(12345)

        # There are 90 stars in both r, i. 10 individually in each.
        nstar = 110
        nsource_per_band_per_star = 2
        self.nstar_total = nstar
        self.nstar_per_band = nstar - 10

        # This is a brute-force assembly of a star catalog and matched sources.
        for tract in [0, 1, 2]:
            ra = np.random.uniform(low=tract, high=tract + 1.0, size=nstar)
            dec = np.random.uniform(low=0.0, high=1.0, size=nstar)

            cat = np.zeros(nstar, dtype=dtype_cat)
            cat['isolated_star_id'] = tract*nstar + np.arange(nstar)
            cat['ra'] = ra
            cat['dec'] = dec
            if tract < 2:
                cat['primary_band'][0: 100] = 'i'
                cat['primary_band'][100:] = 'r'
            else:
                # Tract 2 only has z band.
                cat['primary_band'][:] = 'z'

            source_cats = []
            # Running row offset into the concatenated source catalog.
            counter = 0
            for i in range(cat.size):
                cat['source_cat_index'][i] = counter
                if tract < 2:
                    if i < 90:
                        # Observed in both bands.
                        cat['nsource'][i] = 2*nsource_per_band_per_star
                        bands = ['r', 'i']
                    else:
                        # Observed in a single band only.
                        cat['nsource'][i] = nsource_per_band_per_star
                        if i < 100:
                            bands = ['i']
                        else:
                            bands = ['r']
                else:
                    cat['nsource'][i] = nsource_per_band_per_star
                    bands = ['z']

                for band in bands:
                    cat[f'source_cat_index_{band}'][i] = counter
                    cat[f'nsource_{band}'][i] = nsource_per_band_per_star
                    source_cat = np.zeros(nsource_per_band_per_star, dtype=dtype_source)
                    source_cat['sourceId'] = np.arange(
                        tract*nstar + counter,
                        tract*nstar + counter + nsource_per_band_per_star
                    )
                    source_cat['obj_index'] = i

                    source_cats.append(source_cat)

                    counter += nsource_per_band_per_star

            fgcm_cat = np.zeros(nstar, dtype=dtype_fgcm)
            fgcm_cat['ra'] = ra
            fgcm_cat['dec'] = dec
            fgcm_cat['mag_g'] = np.random.uniform(low=16, high=20, size=nstar)
            fgcm_cat['mag_i'] = np.random.uniform(low=16, high=20, size=nstar)

            source_cat = np.concatenate(source_cats)

            isolated_star_cat_dict[tract] = pipeBase.InMemoryDatasetHandle(astropy.table.Table(cat),
                                                                           storageClass="ArrowAstropy")
            isolated_star_source_dict[tract] = pipeBase.InMemoryDatasetHandle(astropy.table.Table(source_cat),
                                                                              storageClass="ArrowAstropy")
            fgcm_cat_dict[tract] = pipeBase.InMemoryDatasetHandle(astropy.table.Table(fgcm_cat),
                                                                  storageClass="ArrowAstropy")

        return isolated_star_cat_dict, isolated_star_source_dict, fgcm_cat_dict

    def test_concat_isolated_star_cats(self):
        """Test concatenation and reservation of the isolated star catalogs.
        """

        for band in ['r', 'i']:
            iso, iso_src = self.finalizeCharacterizationTask.concat_isolated_star_cats(
                band,
                self.isolated_star_cat_dict,
                self.isolated_star_source_dict
            )

            # There are two tracts, so double everything.
            self.assertEqual(len(iso), 2*self.nstar_per_band)

            reserve_fraction = self.finalizeCharacterizationTask.config.reserve_selection.reserve_fraction
            self.assertEqual(np.sum(iso['reserved']),
                             int(reserve_fraction*len(iso)))

            # 2 tracts, 4 observations per tract per star, minus 2*10 not in the given band.
            self.assertEqual(len(iso_src), 2*(4*len(iso)//2 - 20))

            # Check that every star is properly matched to the sources.
            for i in range(len(iso)):
                start = iso[f'source_cat_index_{band}'][i]
                stop = start + iso[f'nsource_{band}'][i]
                np.testing.assert_array_equal(iso_src['obj_index'][start:stop], i)

            # Check that every reserved star is marked as a reserved source.
            res_star, = np.where(iso['reserved'])
            for i in res_star:
                start = iso[f'source_cat_index_{band}'][i]
                stop = start + iso[f'nsource_{band}'][i]
                np.testing.assert_array_equal(iso_src['reserved'][start:stop], True)

            # Check that every reserved source is marked as a reserved star.
            res_src, = np.where(iso_src['reserved'])
            np.testing.assert_array_equal(
                iso['reserved'][iso_src['obj_index'][res_src]],
                True
            )

    def test_concat_isolate_star_cats_no_sources(self):
        """Test concatenation when there are no sources in a tract."""
        # Only the star catalog is inspected here; the source table is unused.
        iso, _ = self.finalizeCharacterizationTask.concat_isolated_star_cats(
            'z',
            self.isolated_star_cat_dict,
            self.isolated_star_source_dict
        )

        self.assertGreater(len(iso), 0)

    def test_compute_psf_and_ap_corr_map_no_sources(self):
        """Test log message when there are no good sources after selection."""
        # Create an empty source catalog.
        src_schema = afwTable.SourceTable.makeMinimalSchema()
        src_schema.addField('base_GaussianFlux_instFlux', type='F', doc='Flux field')
        src_schema.addField('base_GaussianFlux_instFluxErr', type='F', doc='Flux field')
        src = afwTable.SourceCatalog(src_schema)

        # Set defaults and placeholders for required positional arguments.
        self.finalizeCharacterizationTask.config.source_selector['science'].flags.bad = []
        visit = 0
        detector = 0
        exposure = None
        isolated_source_table = None
        fgcm_cat = None
        with self.assertLogs(level=logging.WARNING) as cm:
            # Unpacking into three names also checks that three values are
            # returned; the values themselves are not inspected.
            psf, ap_corr_map, measured_src = self.finalizeCharacterizationTask.compute_psf_and_ap_corr_map(
                visit,
                detector,
                exposure,
                src,
                isolated_source_table,
                fgcm_cat,
                use_super=True,
            )
        self.assertIn(
            f"No good sources remain after cuts for visit {visit}, detector {detector}",
            cm.output[0]
        )

    def test_run_visit(self):
        """Test the run method on a full visit."""
        visit = 100
        detector0 = 0
        detector1 = 1
        band = 'r'
        # src_dict should be a dictionary keyed by detector, with src handles.
        # calexp_dict should be a dictionary keyed by detector, with calexp handles.
        # These can be dummy objects.

        src0 = afwTable.SourceCatalog(afwTable.SourceTable.makeMinimalSchema())
        src1 = afwTable.SourceCatalog(afwTable.SourceTable.makeMinimalSchema())
        calexp0 = afwImage.ExposureF()
        calexp1 = afwImage.ExposureF()

        src_dict = {
            detector0: pipeBase.InMemoryDatasetHandle(src0),
            detector1: pipeBase.InMemoryDatasetHandle(src1),
        }
        calexp_dict = {
            detector0: pipeBase.InMemoryDatasetHandle(calexp0),
            detector1: pipeBase.InMemoryDatasetHandle(calexp1),
        }

        results = self.finalizeCharacterizationTask.run(
            visit,
            band,
            self.isolated_star_cat_dict,
            self.isolated_star_source_dict,
            src_dict,
            calexp_dict,
            fgcm_standard_star_dict=self.fgcm_cat,
        )

        # Get the dummy values.
        psf, ap_corr_map, _ = _make_dummy_psf_and_ap_corr_map()

        self.assertEqual(len(results.psf_ap_corr_cat), 2)
        np.testing.assert_array_equal(results.psf_ap_corr_cat["id"], [detector0, detector1])
        np.testing.assert_array_equal(results.psf_ap_corr_cat["visit"], visit)
        row = results.psf_ap_corr_cat.find(detector0)
        self.assertEqual(row.getPsf().getSigma(), psf.getSigma())
        self.assertEqual(list(row.getApCorrMap()), list(ap_corr_map))
        np.testing.assert_array_equal(results.output_table["visit"], visit)
        # The first half of the rows is expected from detector0, the rest
        # from detector1.
        table_len = len(results.output_table)
        np.testing.assert_array_equal(results.output_table["detector"][0: table_len // 2], detector0)
        np.testing.assert_array_equal(results.output_table["detector"][table_len // 2:], detector1)

    def test_run_detectors(self):
        """Test the run method on individual detectors."""
        visit = 100
        detector0 = 0
        detector1 = 1
        band = 'r'

        src0 = afwTable.SourceCatalog(afwTable.SourceTable.makeMinimalSchema())
        src1 = afwTable.SourceCatalog(afwTable.SourceTable.makeMinimalSchema())
        calexp0 = afwImage.ExposureF()
        calexp1 = afwImage.ExposureF()

        results0 = self.finalizeCharacterizationDetectorTask.run(
            visit,
            band,
            detector0,
            self.isolated_star_cat_dict,
            self.isolated_star_source_dict,
            src0,
            calexp0,
            fgcm_standard_star_dict=self.fgcm_cat,
        )

        results1 = self.finalizeCharacterizationDetectorTask.run(
            visit,
            band,
            detector1,
            self.isolated_star_cat_dict,
            self.isolated_star_source_dict,
            src1,
            calexp1,
            fgcm_standard_star_dict=self.fgcm_cat,
        )

        # Get the dummy values.
        psf, ap_corr_map, _ = _make_dummy_psf_and_ap_corr_map()

        # Compare to expected values.
        self.assertEqual(len(results0.psf_ap_corr_cat), 1)
        self.assertEqual(len(results1.psf_ap_corr_cat), 1)
        np.testing.assert_array_equal(results0.psf_ap_corr_cat["id"], detector0)
        np.testing.assert_array_equal(results1.psf_ap_corr_cat["id"], detector1)
        np.testing.assert_array_equal(results0.psf_ap_corr_cat["visit"], visit)
        np.testing.assert_array_equal(results1.psf_ap_corr_cat["visit"], visit)
        row = results0.psf_ap_corr_cat.find(detector0)
        self.assertEqual(row.getPsf().getSigma(), psf.getSigma())
        self.assertEqual(list(row.getApCorrMap()), list(ap_corr_map))
        row = results1.psf_ap_corr_cat.find(detector1)
        self.assertEqual(row.getPsf().getSigma(), psf.getSigma())
        self.assertEqual(list(row.getApCorrMap()), list(ap_corr_map))
        np.testing.assert_array_equal(results0.output_table["visit"], visit)
        np.testing.assert_array_equal(results1.output_table["visit"], visit)
        np.testing.assert_array_equal(results0.output_table["detector"], detector0)
        np.testing.assert_array_equal(results1.output_table["detector"], detector1)

        # Now test the task to concatenate these together.
        consolidate_task = ConsolidateFinalizeCharacterizationDetectorTask(
            config=ConsolidateFinalizeCharacterizationDetectorConfig(),
        )

        psf_ap_corr_detector_dict = {
            detector0: pipeBase.InMemoryDatasetHandle(results0.psf_ap_corr_cat),
            detector1: pipeBase.InMemoryDatasetHandle(results1.psf_ap_corr_cat),
        }
        src_detector_table_dict = {
            detector0: pipeBase.InMemoryDatasetHandle(results0.output_table, storageClass="ArrowAstropy"),
            detector1: pipeBase.InMemoryDatasetHandle(results1.output_table, storageClass="ArrowAstropy"),
        }

        results = consolidate_task.run(
            psf_ap_corr_detector_dict=psf_ap_corr_detector_dict,
            src_detector_table_dict=src_detector_table_dict,
        )

        self.assertEqual(len(results.psf_ap_corr_cat), 2)
        np.testing.assert_array_equal(results.psf_ap_corr_cat["id"], [detector0, detector1])
        np.testing.assert_array_equal(results.psf_ap_corr_cat["visit"], visit)
        row = results.psf_ap_corr_cat.find(detector0)
        self.assertEqual(row.getPsf().getSigma(), psf.getSigma())
        self.assertEqual(list(row.getApCorrMap()), list(ap_corr_map))
        np.testing.assert_array_equal(results.output_table["visit"], visit)
        # The first half of the rows is expected from detector0, the rest
        # from detector1.
        table_len = len(results.output_table)
        np.testing.assert_array_equal(results.output_table["detector"][0: table_len // 2], detector0)
        np.testing.assert_array_equal(results.output_table["detector"][table_len // 2:], detector1)
class MyMemoryTestCase(lsst.utils.tests.MemoryTestCase):
    """Include the `lsst.utils.tests.MemoryTestCase` checks in this module.

    No behavior is added or overridden here.
    """
def setup_module(module):
    """Initialize the LSST test utilities.

    Parameters
    ----------
    module
        Accepted for the caller's benefit; not used here.
    """
    lsst.utils.tests.init()
if __name__ == "__main__":
    # Direct execution: initialize the LSST test utilities, then hand over
    # to the unittest command-line runner.
    lsst.utils.tests.init()
    unittest.main()