Coverage for python / lsst / pipe / tasks / finalizeCharacterization.py: 14%

450 statements  

« prev     ^ index     » next       coverage.py v7.13.5, created at 2026-04-26 09:21 +0000

1# This file is part of pipe_tasks. 

2# 

3# Developed for the LSST Data Management System. 

4# This product includes software developed by the LSST Project 

5# (https://www.lsst.org). 

6# See the COPYRIGHT file at the top-level directory of this distribution 

7# for details of code ownership. 

8# 

9# This program is free software: you can redistribute it and/or modify 

10# it under the terms of the GNU General Public License as published by 

11# the Free Software Foundation, either version 3 of the License, or 

12# (at your option) any later version. 

13# 

14# This program is distributed in the hope that it will be useful, 

15# but WITHOUT ANY WARRANTY; without even the implied warranty of 

16# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the 

17# GNU General Public License for more details. 

18# 

19# You should have received a copy of the GNU General Public License 

20# along with this program. If not, see <https://www.gnu.org/licenses/>. 

21 

22"""Task to run a finalized image characterization, using additional data. 

23""" 

24 

25__all__ = [ 

26 'FinalizeCharacterizationConnections', 

27 'FinalizeCharacterizationConfig', 

28 'FinalizeCharacterizationTask', 

29 'FinalizeCharacterizationDetectorConnections', 

30 'FinalizeCharacterizationDetectorConfig', 

31 'FinalizeCharacterizationDetectorTask', 

32 'ConsolidateFinalizeCharacterizationDetectorConnections', 

33 'ConsolidateFinalizeCharacterizationDetectorConfig', 

34 'ConsolidateFinalizeCharacterizationDetectorTask', 

35] 

36 

37import astropy.table 

38import astropy.units as u 

39import numpy as np 

40import esutil 

41from smatch.matcher import Matcher 

42 

43 

44import lsst.pex.config as pexConfig 

45import lsst.pipe.base as pipeBase 

46import lsst.daf.base as dafBase 

47import lsst.afw.table as afwTable 

48import lsst.meas.algorithms as measAlg 

49import lsst.meas.extensions.piff.piffPsfDeterminer # noqa: F401 

50from lsst.meas.algorithms import MeasureApCorrTask 

51from lsst.meas.base import SingleFrameMeasurementTask, ApplyApCorrTask 

52from lsst.meas.algorithms.sourceSelector import sourceSelectorRegistry 

53 

54from .reserveIsolatedStars import ReserveIsolatedStarsTask 

55from lsst.obs.base.utils import TableVStack 

56 

57 

58class FinalizeCharacterizationConnectionsBase( 

59 pipeBase.PipelineTaskConnections, 

60 dimensions=('instrument', 'visit',), 

61 defaultTemplates={}, 

62): 

63 src_schema = pipeBase.connectionTypes.InitInput( 

64 doc='Input schema used for src catalogs.', 

65 name='src_schema', 

66 storageClass='SourceCatalog', 

67 ) 

68 isolated_star_cats = pipeBase.connectionTypes.Input( 

69 doc=('Catalog of isolated stars with average positions, number of associated ' 

70 'sources, and indexes to the isolated_star_sources catalogs.'), 

71 name='isolated_star_presource_associations', 

72 storageClass='ArrowAstropy', 

73 dimensions=('instrument', 'tract', 'skymap'), 

74 deferLoad=True, 

75 multiple=True, 

76 ) 

77 isolated_star_sources = pipeBase.connectionTypes.Input( 

78 doc=('Catalog of isolated star sources with sourceIds, and indexes to the ' 

79 'isolated_star_cats catalogs.'), 

80 name='isolated_star_presources', 

81 storageClass='ArrowAstropy', 

82 dimensions=('instrument', 'tract', 'skymap'), 

83 deferLoad=True, 

84 multiple=True, 

85 ) 

86 fgcm_standard_star = pipeBase.connectionTypes.Input( 

87 doc=('Catalog of fgcm for color corrections, and indexes to the ' 

88 'isolated_star_cats catalogs.'), 

89 name='fgcm_standard_star', 

90 storageClass='ArrowAstropy', 

91 dimensions=('instrument', 'tract', 'skymap'), 

92 deferLoad=True, 

93 multiple=True, 

94 ) 

95 

96 def __init__(self, *, config=None): 

97 super().__init__(config=config) 

98 if config is None: 

99 return None 

100 if not self.config.psf_determiner['piff'].useColor: 

101 self.inputs.remove("fgcm_standard_star") 

102 

103 

104class FinalizeCharacterizationConnections( 

105 FinalizeCharacterizationConnectionsBase, 

106 dimensions=('instrument', 'visit',), 

107 defaultTemplates={}, 

108): 

109 srcs = pipeBase.connectionTypes.Input( 

110 doc='Source catalogs for the visit', 

111 name='src', 

112 storageClass='SourceCatalog', 

113 dimensions=('instrument', 'visit', 'detector'), 

114 deferLoad=True, 

115 multiple=True, 

116 deferGraphConstraint=True, 

117 ) 

118 calexps = pipeBase.connectionTypes.Input( 

119 doc='Calexps for the visit', 

120 name='calexp', 

121 storageClass='ExposureF', 

122 dimensions=('instrument', 'visit', 'detector'), 

123 deferLoad=True, 

124 multiple=True, 

125 ) 

126 finalized_psf_ap_corr_cat = pipeBase.connectionTypes.Output( 

127 doc=('Per-visit finalized psf models and aperture corrections. This ' 

128 'catalog uses detector id for the id and are sorted for fast ' 

129 'lookups of a detector.'), 

130 name='finalized_psf_ap_corr_catalog', 

131 storageClass='ExposureCatalog', 

132 dimensions=('instrument', 'visit'), 

133 ) 

134 finalized_src_table = pipeBase.connectionTypes.Output( 

135 doc=('Per-visit catalog of measurements for psf/flag/etc.'), 

136 name='finalized_src_table', 

137 storageClass='ArrowAstropy', 

138 dimensions=('instrument', 'visit'), 

139 ) 

140 

141 

142class FinalizeCharacterizationDetectorConnections( 

143 FinalizeCharacterizationConnectionsBase, 

144 dimensions=('instrument', 'visit', 'detector',), 

145 defaultTemplates={}, 

146): 

147 src = pipeBase.connectionTypes.Input( 

148 doc='Source catalog for the visit/detector.', 

149 name='src', 

150 storageClass='SourceCatalog', 

151 dimensions=('instrument', 'visit', 'detector'), 

152 ) 

153 calexp = pipeBase.connectionTypes.Input( 

154 doc='Calibrated exposure for the visit/detector.', 

155 name='calexp', 

156 storageClass='ExposureF', 

157 dimensions=('instrument', 'visit', 'detector'), 

158 ) 

159 finalized_psf_ap_corr_detector_cat = pipeBase.connectionTypes.Output( 

160 doc=('Per-visit/per-detector finalized psf models and aperture corrections. This ' 

161 'catalog uses detector id for the id.'), 

162 name='finalized_psf_ap_corr_detector_catalog', 

163 storageClass='ExposureCatalog', 

164 dimensions=('instrument', 'visit', 'detector'), 

165 ) 

166 finalized_src_detector_table = pipeBase.connectionTypes.Output( 

167 doc=('Per-visit/per-detector catalog of measurements for psf/flag/etc.'), 

168 name='finalized_src_detector_table', 

169 storageClass='ArrowAstropy', 

170 dimensions=('instrument', 'visit', 'detector'), 

171 ) 

172 

173 

174class FinalizeCharacterizationConfigBase( 

175 pipeBase.PipelineTaskConfig, 

176 pipelineConnections=FinalizeCharacterizationConnectionsBase, 

177): 

178 """Configuration for FinalizeCharacterizationBaseTask.""" 

179 source_selector = sourceSelectorRegistry.makeField( 

180 doc="How to select sources", 

181 default="science" 

182 ) 

183 id_column = pexConfig.Field( 

184 doc='Name of column in isolated_star_sources with source id.', 

185 dtype=str, 

186 default='sourceId', 

187 ) 

188 reserve_selection = pexConfig.ConfigurableField( 

189 target=ReserveIsolatedStarsTask, 

190 doc='Task to select reserved stars', 

191 ) 

192 make_psf_candidates = pexConfig.ConfigurableField( 

193 target=measAlg.MakePsfCandidatesTask, 

194 doc='Task to make psf candidates from selected stars.', 

195 ) 

196 psf_determiner = measAlg.psfDeterminerRegistry.makeField( 

197 'PSF Determination algorithm', 

198 default='piff' 

199 ) 

200 measurement = pexConfig.ConfigurableField( 

201 target=SingleFrameMeasurementTask, 

202 doc='Measure sources for aperture corrections' 

203 ) 

204 measure_ap_corr = pexConfig.ConfigurableField( 

205 target=MeasureApCorrTask, 

206 doc="Subtask to measure aperture corrections" 

207 ) 

208 apply_ap_corr = pexConfig.ConfigurableField( 

209 target=ApplyApCorrTask, 

210 doc="Subtask to apply aperture corrections" 

211 ) 

212 

213 def setDefaults(self): 

214 super().setDefaults() 

215 

216 source_selector = self.source_selector['science'] 

217 source_selector.setDefaults() 

218 

219 # We use the source selector only to select out flagged objects 

220 # and signal-to-noise. Isolated, unresolved sources are handled 

221 # by the isolated star catalog. 

222 

223 source_selector.doFlags = True 

224 source_selector.doSignalToNoise = True 

225 source_selector.doFluxLimit = False 

226 source_selector.doUnresolved = False 

227 source_selector.doIsolated = False 

228 

229 source_selector.signalToNoise.minimum = 50.0 

230 source_selector.signalToNoise.maximum = 1000.0 

231 

232 source_selector.signalToNoise.fluxField = 'base_GaussianFlux_instFlux' 

233 source_selector.signalToNoise.errField = 'base_GaussianFlux_instFluxErr' 

234 

235 source_selector.flags.bad = ['base_PixelFlags_flag_edge', 

236 'base_PixelFlags_flag_nodata', 

237 'base_PixelFlags_flag_interpolatedCenter', 

238 'base_PixelFlags_flag_saturatedCenter', 

239 'base_PixelFlags_flag_crCenter', 

240 'base_PixelFlags_flag_bad', 

241 'base_PixelFlags_flag_interpolated', 

242 'base_PixelFlags_flag_saturated', 

243 'slot_Centroid_flag', 

244 'base_GaussianFlux_flag'] 

245 

246 # Configure aperture correction to select only high s/n sources (that 

247 # were used in the psf modeling) to avoid background problems when 

248 # computing the aperture correction map. 

249 self.measure_ap_corr.sourceSelector = 'science' 

250 

251 ap_selector = self.measure_ap_corr.sourceSelector['science'] 

252 # We do not need to filter flags or unresolved because we have used 

253 # the filtered isolated stars as an input 

254 ap_selector.doFlags = False 

255 ap_selector.doUnresolved = False 

256 

257 import lsst.meas.modelfit # noqa: F401 

258 import lsst.meas.extensions.photometryKron # noqa: F401 

259 import lsst.meas.extensions.convolved # noqa: F401 

260 import lsst.meas.extensions.gaap # noqa: F401 

261 import lsst.meas.extensions.shapeHSM # noqa: F401 

262 

263 # Set up measurement defaults 

264 self.measurement.plugins.names = [ 

265 'base_FPPosition', 

266 'base_PsfFlux', 

267 'base_GaussianFlux', 

268 'modelfit_DoubleShapeletPsfApprox', 

269 'modelfit_CModel', 

270 'ext_photometryKron_KronFlux', 

271 'ext_convolved_ConvolvedFlux', 

272 'ext_gaap_GaapFlux', 

273 'ext_shapeHSM_HsmShapeRegauss', 

274 'ext_shapeHSM_HsmSourceMoments', 

275 'ext_shapeHSM_HsmPsfMoments', 

276 'ext_shapeHSM_HsmSourceMomentsRound', 

277 'ext_shapeHSM_HigherOrderMomentsSource', 

278 'ext_shapeHSM_HigherOrderMomentsPSF', 

279 ] 

280 self.measurement.slots.modelFlux = 'modelfit_CModel' 

281 self.measurement.plugins['ext_convolved_ConvolvedFlux'].seeing.append(8.0) 

282 self.measurement.plugins['ext_gaap_GaapFlux'].sigmas = [ 

283 0.5, 

284 0.7, 

285 1.0, 

286 1.5, 

287 2.5, 

288 3.0 

289 ] 

290 self.measurement.plugins['ext_gaap_GaapFlux'].doPsfPhotometry = True 

291 self.measurement.slots.shape = 'ext_shapeHSM_HsmSourceMoments' 

292 self.measurement.slots.psfShape = 'ext_shapeHSM_HsmPsfMoments' 

293 self.measurement.plugins['ext_shapeHSM_HsmShapeRegauss'].deblendNChild = "" 

294 

295 # TODO: Remove in DM-44658, streak masking to happen only in ip_diffim 

296 # Keep track of which footprints contain streaks 

297 self.measurement.plugins['base_PixelFlags'].masksFpAnywhere = ['STREAK'] 

298 self.measurement.plugins['base_PixelFlags'].masksFpCenter = ['STREAK'] 

299 

300 # Turn off slot setting for measurement for centroid and shape 

301 # (for which we use the input src catalog measurements) 

302 self.measurement.slots.centroid = None 

303 self.measurement.slots.apFlux = None 

304 self.measurement.slots.calibFlux = None 

305 

306 names = self.measurement.plugins['ext_convolved_ConvolvedFlux'].getAllResultNames() 

307 self.measure_ap_corr.allowFailure += names 

308 names = self.measurement.plugins["ext_gaap_GaapFlux"].getAllGaapResultNames() 

309 self.measure_ap_corr.allowFailure += names 

310 

311 

312class FinalizeCharacterizationConfig( 

313 FinalizeCharacterizationConfigBase, 

314 pipelineConnections=FinalizeCharacterizationConnections, 

315): 

316 pass 

317 

318 

319class FinalizeCharacterizationDetectorConfig( 

320 FinalizeCharacterizationConfigBase, 

321 pipelineConnections=FinalizeCharacterizationDetectorConnections, 

322): 

323 pass 

324 

325 

326class FinalizeCharacterizationTaskBase(pipeBase.PipelineTask): 

327 """Run final characterization on exposures.""" 

328 ConfigClass = FinalizeCharacterizationConfigBase 

329 _DefaultName = 'finalize_characterization_base' 

330 

331 def __init__(self, initInputs=None, **kwargs): 

332 super().__init__(initInputs=initInputs, **kwargs) 

333 

334 self.schema_mapper, self.schema = self._make_output_schema_mapper( 

335 initInputs['src_schema'].schema 

336 ) 

337 

338 self.makeSubtask('reserve_selection') 

339 self.makeSubtask('source_selector') 

340 self.makeSubtask('make_psf_candidates') 

341 self.makeSubtask('psf_determiner') 

342 self.makeSubtask('measurement', schema=self.schema) 

343 self.makeSubtask('measure_ap_corr', schema=self.schema) 

344 self.makeSubtask('apply_ap_corr', schema=self.schema) 

345 

346 # Only log warning and fatal errors from the source_selector 

347 self.source_selector.log.setLevel(self.source_selector.log.WARN) 

348 self.isPsfDeterminerPiff = False 

349 if isinstance(self.psf_determiner, lsst.meas.extensions.piff.piffPsfDeterminer.PiffPsfDeterminerTask): 

350 self.isPsfDeterminerPiff = True 

351 

352 def _make_output_schema_mapper(self, input_schema): 

353 """Make the schema mapper from the input schema to the output schema. 

354 

355 Parameters 

356 ---------- 

357 input_schema : `lsst.afw.table.Schema` 

358 Input schema. 

359 

360 Returns 

361 ------- 

362 mapper : `lsst.afw.table.SchemaMapper` 

363 Schema mapper 

364 output_schema : `lsst.afw.table.Schema` 

365 Output schema (with alias map) 

366 """ 

367 mapper = afwTable.SchemaMapper(input_schema) 

368 mapper.addMinimalSchema(afwTable.SourceTable.makeMinimalSchema()) 

369 mapper.addMapping(input_schema['slot_Centroid_x'].asKey()) 

370 mapper.addMapping(input_schema['slot_Centroid_y'].asKey()) 

371 

372 # The aperture fields may be used by the psf determiner. 

373 aper_fields = input_schema.extract('base_CircularApertureFlux_*') 

374 for field, item in aper_fields.items(): 

375 mapper.addMapping(item.key) 

376 

377 # The following two may be redundant, but then the mapping is a no-op. 

378 # Note that the slot_CalibFlux mapping will copy over any 

379 # normalized compensated fluxes that are used for calibration. 

380 apflux_fields = input_schema.extract('slot_ApFlux_*') 

381 for field, item in apflux_fields.items(): 

382 mapper.addMapping(item.key) 

383 

384 calibflux_fields = input_schema.extract('slot_CalibFlux_*') 

385 for field, item in calibflux_fields.items(): 

386 mapper.addMapping(item.key) 

387 

388 mapper.addMapping( 

389 input_schema[self.config.source_selector.active.signalToNoise.fluxField].asKey(), 

390 'calib_psf_selection_flux') 

391 mapper.addMapping( 

392 input_schema[self.config.source_selector.active.signalToNoise.errField].asKey(), 

393 'calib_psf_selection_flux_err') 

394 

395 output_schema = mapper.getOutputSchema() 

396 

397 output_schema.addField( 

398 'calib_psf_candidate', 

399 type='Flag', 

400 doc=('set if the source was a candidate for PSF determination, ' 

401 'as determined from FinalizeCharacterizationTask.'), 

402 ) 

403 output_schema.addField( 

404 'calib_psf_reserved', 

405 type='Flag', 

406 doc=('set if source was reserved from PSF determination by ' 

407 'FinalizeCharacterizationTask.'), 

408 ) 

409 output_schema.addField( 

410 'calib_psf_used', 

411 type='Flag', 

412 doc=('set if source was used in the PSF determination by ' 

413 'FinalizeCharacterizationTask.'), 

414 ) 

415 output_schema.addField( 

416 'visit', 

417 type=np.int64, 

418 doc='Visit number for the sources.', 

419 ) 

420 output_schema.addField( 

421 'detector', 

422 type=np.int32, 

423 doc='Detector number for the sources.', 

424 ) 

425 output_schema.addField( 

426 'psf_color_value', 

427 type=np.float32, 

428 doc="Color used in PSF fit." 

429 ) 

430 output_schema.addField( 

431 'psf_color_type', 

432 type=str, 

433 size=10, 

434 doc="Color used in PSF fit." 

435 ) 

436 output_schema.addField( 

437 'psf_max_value', 

438 type=np.float32, 

439 doc="Maximum value in the star image used to train PSF.", 

440 doReplace=True, 

441 ) 

442 

443 alias_map = input_schema.getAliasMap() 

444 alias_map_output = afwTable.AliasMap() 

445 alias_map_output.set('slot_Centroid', alias_map.get('slot_Centroid')) 

446 alias_map_output.set('slot_ApFlux', alias_map.get('slot_ApFlux')) 

447 alias_map_output.set('slot_CalibFlux', alias_map.get('slot_CalibFlux')) 

448 

449 output_schema.setAliasMap(alias_map_output) 

450 

451 return mapper, output_schema 

452 

453 def _make_selection_schema_mapper(self, input_schema): 

454 """Make the schema mapper from the input schema to the selection schema. 

455 

456 Parameters 

457 ---------- 

458 input_schema : `lsst.afw.table.Schema` 

459 Input schema. 

460 

461 Returns 

462 ------- 

463 mapper : `lsst.afw.table.SchemaMapper` 

464 Schema mapper 

465 selection_schema : `lsst.afw.table.Schema` 

466 Selection schema (with alias map) 

467 """ 

468 mapper = afwTable.SchemaMapper(input_schema) 

469 mapper.addMinimalSchema(input_schema) 

470 

471 selection_schema = mapper.getOutputSchema() 

472 

473 selection_schema.setAliasMap(input_schema.getAliasMap()) 

474 

475 selection_schema.addField( 

476 'psf_color_value', 

477 type=np.float32, 

478 doc="Color used in PSF fit." 

479 ) 

480 selection_schema.addField( 

481 'psf_color_type', 

482 type=str, 

483 size=10, 

484 doc="Color used in PSF fit." 

485 ) 

486 selection_schema.addField( 

487 'psf_max_value', 

488 type=np.float32, 

489 doc="Maximum value in the star image used to train PSF.", 

490 doReplace=True, 

491 ) 

492 

493 return mapper, selection_schema 

494 

495 def concat_isolated_star_cats( 

496 self, 

497 band, 

498 isolated_star_cat_dict, 

499 isolated_star_source_dict, 

500 visit=None, 

501 detector=None, 

502 ): 

503 """ 

504 Concatenate isolated star catalogs and make reserve selection. 

505 

506 Parameters 

507 ---------- 

508 band : `str` 

509 Band name. Used to select reserved stars. 

510 isolated_star_cat_dict : `dict` 

511 Per-tract dict of isolated star catalog handles. 

512 isolated_star_source_dict : `dict` 

513 Per-tract dict of isolated star source catalog handles. 

514 visit : `int`, optional 

515 Visit to down-select sources. 

516 detector : `int`, optional 

517 Detector to down-select sources. Will only be used if visit 

518 is also set. 

519 

520 Returns 

521 ------- 

522 isolated_table : `astropy.table.Table` (N,) 

523 Table of isolated stars, with indexes to isolated sources. 

524 Returns None if there are no usable isolated catalogs. 

525 isolated_source_table : `astropy.table.Table` (M,) 

526 Table of isolated sources, with indexes to isolated stars. 

527 Returns None if there are no usable isolated catalogs. 

528 """ 

529 isolated_tables = [] 

530 isolated_sources = [] 

531 merge_cat_counter = 0 

532 merge_source_counter = 0 

533 

534 handle = isolated_star_cat_dict[list(isolated_star_cat_dict.keys())[0]] 

535 all_source_columns = handle.get(component='columns') 

536 source_columns = [self.config.id_column, 'obj_index'] 

537 # visit can be used if it is in the input catalog. 

538 if visit is not None and visit in all_source_columns: 

539 source_columns.append('visit') 

540 if detector is not None: 

541 source_columns.append('detector') 

542 else: 

543 visit = None 

544 detector = None 

545 

546 for tract in isolated_star_cat_dict: 

547 astropy_cat = isolated_star_cat_dict[tract].get() 

548 astropy_source = isolated_star_source_dict[tract].get( 

549 parameters={'columns': source_columns} 

550 ) 

551 

552 # Cut the isolated star sources to this visit/detector. 

553 sources_downselected = False 

554 if visit is not None: 

555 sources_downselected = True 

556 these_sources = (astropy_source['visit'] == visit) 

557 

558 if these_sources.sum() == 0: 

559 self.log.info('No sources found for visit %d in tract %d.', visit, tract) 

560 continue 

561 

562 if detector is not None: 

563 these_sources &= (astropy_source['detector'] == detector) 

564 if these_sources.sum() == 0: 

565 self.log.info( 

566 'No sources found for visit %d, detector %d in tract %d.', 

567 visit, 

568 detector, 

569 tract, 

570 ) 

571 continue 

572 

573 astropy_source = astropy_source[these_sources] 

574 

575 # Cut isolated star table to those observed in this band, and adjust indexes 

576 # We must use all the stars in the table in the band to ensure consistent 

577 # reserve star selection below. 

578 (use_band,) = (astropy_cat[f'nsource_{band}'] > 0).nonzero() 

579 

580 if len(use_band) == 0: 

581 # There are no sources in this band in this tract. 

582 self.log.info("No sources found in %s band in tract %d.", band, tract) 

583 continue 

584 

585 # With the following matching: 

586 # table_source[b] <-> table_cat[use_band[a]] 

587 a, b = esutil.numpy_util.match(use_band, np.asarray(astropy_source['obj_index'])) 

588 

589 # Update indexes and cut to band-selected stars/sources 

590 astropy_source['obj_index'][b] = a 

591 if sources_downselected: 

592 # The isolated star catalog is built such that each star 

593 # in the catalog is matched to multiple sources. But due 

594 # to the way it is constructed of isolated stars there 

595 # is at most one unique source on any visit associated 

596 # with a single isolated star in the catalog. Therefore, 

597 # everything is guaranteed to already be uniquely 

598 # matched here because we did visit (and detector) 

599 # down-selection above. 

600 astropy_cat[f'source_cat_index_{band}'][use_band][a] = b 

601 else: 

602 # If there was no down-selection then each star has 

603 # multiple sources. 

604 _, index_new = np.unique(a, return_index=True) 

605 astropy_cat[f'source_cat_index_{band}'][use_band] = index_new 

606 

607 # After the following cuts, the catalogs have the following properties: 

608 # - table_cat only contains isolated stars that have at least one source 

609 # in ``band``. 

610 # - table_source only contains ``band`` sources. 

611 # - The slice table_cat["source_cat_index_{band}"]: table_cat["source_cat_index_{band}"] 

612 # + table_cat["nsource_{band}] 

613 # applied to table_source will give all the sources associated with the star. 

614 # - For each source, table_source["obj_index"] points to the index of the associated 

615 # isolated star. 

616 astropy_source = astropy_source[b] 

617 astropy_cat = astropy_cat[use_band] 

618 

619 # Add reserved flag column to tables 

620 astropy_cat['reserved'] = False 

621 astropy_source['reserved'] = False 

622 

623 # Get reserve star flags 

624 astropy_cat['reserved'][:] = self.reserve_selection.run( 

625 len(astropy_cat), 

626 extra=f'{band}_{tract}', 

627 ) 

628 astropy_source['reserved'][:] = astropy_cat['reserved'][astropy_source['obj_index']] 

629 

630 # Offset indexes to account for tract merging 

631 astropy_cat[f'source_cat_index_{band}'] += merge_source_counter 

632 astropy_source['obj_index'] += merge_cat_counter 

633 

634 isolated_tables.append(astropy_cat) 

635 isolated_sources.append(astropy_source) 

636 

637 merge_cat_counter += len(astropy_cat) 

638 merge_source_counter += len(astropy_source) 

639 

640 if len(isolated_tables) > 0: 

641 isolated_table = astropy.table.vstack(isolated_tables, metadata_conflicts='silent') 

642 isolated_source_table = astropy.table.vstack(isolated_sources, metadata_conflicts='silent') 

643 else: 

644 isolated_table = None 

645 isolated_source_table = None 

646 

647 return isolated_table, isolated_source_table 

648 

649 def add_src_colors(self, srcCat, fgcmCat, band): 

650 

651 if self.isPsfDeterminerPiff and fgcmCat is not None: 

652 

653 raSrc = (srcCat['coord_ra'] * u.radian).to(u.degree).value 

654 decSrc = (srcCat['coord_dec'] * u.radian).to(u.degree).value 

655 

656 with Matcher(raSrc, decSrc) as matcher: 

657 idx, idxSrcCat, idxColorCat, d = matcher.query_radius( 

658 fgcmCat["ra"], 

659 fgcmCat["dec"], 

660 1. / 3600.0, 

661 return_indices=True, 

662 ) 

663 

664 magStr1 = self.psf_determiner.config.color[band][0] 

665 magStr2 = self.psf_determiner.config.color[band][2] 

666 colors = fgcmCat[f'mag_{magStr1}'] - fgcmCat[f'mag_{magStr2}'] 

667 

668 for idSrc, idColor in zip(idxSrcCat, idxColorCat): 

669 srcCat[idSrc]['psf_color_value'] = colors[idColor] 

670 srcCat[idSrc]['psf_color_type'] = f"{magStr1}-{magStr2}" 

671 

672 def compute_psf_and_ap_corr_map(self, visit, detector, exposure, src, 

673 isolated_source_table, fgcm_standard_star_cat): 

674 """Compute psf model and aperture correction map for a single exposure. 

675 

676 Parameters 

677 ---------- 

678 visit : `int` 

679 Visit number (for logging). 

680 detector : `int` 

681 Detector number (for logging). 

682 exposure : `lsst.afw.image.ExposureF` 

683 src : `lsst.afw.table.SourceCatalog` 

684 isolated_source_table : `np.ndarray` or `astropy.table.Table` 

685 fgcm_standard_star_cat : `np.ndarray` 

686 

687 Returns 

688 ------- 

689 psf : `lsst.meas.algorithms.ImagePsf` 

690 PSF Model 

691 ap_corr_map : `lsst.afw.image.ApCorrMap` 

692 Aperture correction map. 

693 measured_src : `lsst.afw.table.SourceCatalog` 

694 Updated source catalog with measurements, flags and aperture corrections. 

695 """ 

696 # Extract footprints from the input src catalog for noise replacement. 

697 footprints = SingleFrameMeasurementTask.getFootprintsFromCatalog(src) 

698 

699 # Apply source selector (s/n, flags, etc.) 

700 good_src = self.source_selector.selectSources(src) 

701 if sum(good_src.selected) == 0: 

702 self.log.warning('No good sources remain after cuts for visit %d, detector %d', 

703 visit, detector) 

704 return None, None, None 

705 

706 # Cut down input src to the selected sources 

707 # We use a separate schema/mapper here than for the output/measurement catalog because of 

708 # clashes between fields that were previously run and those that need to be rerun with 

709 # the new psf model. This may be slightly inefficient but keeps input 

710 # and output values cleanly separated. 

711 selection_mapper, selection_schema = self._make_selection_schema_mapper(src.schema) 

712 

713 selected_src = afwTable.SourceCatalog(selection_schema) 

714 selected_src.reserve(good_src.selected.sum()) 

715 selected_src.extend(src[good_src.selected], mapper=selection_mapper) 

716 

717 # The calib flags have been copied from the input table, 

718 # and we reset them here just to ensure they aren't propagated. 

719 selected_src['calib_psf_candidate'] = np.zeros(len(selected_src), dtype=bool) 

720 selected_src['calib_psf_used'] = np.zeros(len(selected_src), dtype=bool) 

721 selected_src['calib_psf_reserved'] = np.zeros(len(selected_src), dtype=bool) 

722 

723 # Find the isolated sources and set flags 

724 matched_src, matched_iso = esutil.numpy_util.match( 

725 selected_src['id'], 

726 np.asarray(isolated_source_table[self.config.id_column]), 

727 ) 

728 if len(matched_src) == 0: 

729 self.log.warning( 

730 "No candidates from matched isolate stars for visit=%s, detector=%s " 

731 "(this is probably the result of an earlier astrometry failure).", 

732 visit, detector, 

733 ) 

734 return None, None, None 

735 

736 matched_arr = np.zeros(len(selected_src), dtype=bool) 

737 matched_arr[matched_src] = True 

738 selected_src['calib_psf_candidate'] = matched_arr 

739 

740 reserved_arr = np.zeros(len(selected_src), dtype=bool) 

741 reserved_arr[matched_src] = np.asarray(isolated_source_table['reserved'][matched_iso]) 

742 selected_src['calib_psf_reserved'] = reserved_arr 

743 

744 selected_src = selected_src[selected_src['calib_psf_candidate']].copy(deep=True) 

745 

746 # Make the measured source catalog as well, based on the selected catalog. 

747 measured_src = afwTable.SourceCatalog(self.schema) 

748 measured_src.reserve(len(selected_src)) 

749 measured_src.extend(selected_src, mapper=self.schema_mapper) 

750 

751 # We need to copy over the calib_psf flags because they were not in the mapper 

752 measured_src['calib_psf_candidate'] = selected_src['calib_psf_candidate'] 

753 measured_src['calib_psf_reserved'] = selected_src['calib_psf_reserved'] 

754 if exposure.filter.hasBandLabel(): 

755 band = exposure.filter.bandLabel 

756 else: 

757 band = None 

758 self.add_src_colors(selected_src, fgcm_standard_star_cat, band) 

759 self.add_src_colors(measured_src, fgcm_standard_star_cat, band) 

760 

761 # Select the psf candidates from the selection catalog 

762 try: 

763 psf_selection_result = self.make_psf_candidates.run(selected_src, exposure=exposure) 

764 _ = self.make_psf_candidates.run(measured_src, exposure=exposure) 

765 except Exception as e: 

766 self.log.exception('Failed to make PSF candidates for visit %d, detector %d: %s', 

767 visit, detector, e) 

768 return None, None, measured_src 

769 

770 psf_cand_cat = psf_selection_result.goodStarCat 

771 

772 # Make list of psf candidates to send to the determiner 

773 # (omitting those marked as reserved) 

774 psf_determiner_list = [cand for cand, use 

775 in zip(psf_selection_result.psfCandidates, 

776 ~psf_cand_cat['calib_psf_reserved']) if use] 

777 flag_key = psf_cand_cat.schema['calib_psf_used'].asKey() 

778 

779 try: 

780 psf, cell_set = self.psf_determiner.determinePsf(exposure, 

781 psf_determiner_list, 

782 self.metadata, 

783 flagKey=flag_key) 

784 except Exception as e: 

785 self.log.exception('Failed to determine PSF for visit %d, detector %d: %s', 

786 visit, detector, e) 

787 return None, None, measured_src 

788 # Verify that the PSF is usable by downstream tasks 

789 sigma = psf.computeShape(psf.getAveragePosition(), psf.getAverageColor()).getDeterminantRadius() 

790 if np.isnan(sigma): 

791 self.log.warning('Failed to determine psf for visit %d, detector %d: ' 

792 'Computed final PSF size is NAN.', 

793 visit, detector) 

794 return None, None, measured_src 

795 

796 # Set the psf in the exposure for measurement/aperture corrections. 

797 exposure.setPsf(psf) 

798 

799 # At this point, we need to transfer the psf used flag from the selection 

800 # catalog to the measurement catalog. 

801 matched_selected, matched_measured = esutil.numpy_util.match( 

802 selected_src['id'], 

803 measured_src['id'] 

804 ) 

805 measured_used = np.zeros(len(measured_src), dtype=bool) 

806 measured_used[matched_measured] = selected_src['calib_psf_used'][matched_selected] 

807 measured_src['calib_psf_used'] = measured_used 

808 

809 # Next, we do the measurement on all the psf candidate, used, and reserved stars. 

810 # We use the full footprint list from the input src catalog for noise replacement. 

811 try: 

812 self.measurement.run(measCat=measured_src, exposure=exposure, footprints=footprints) 

813 except Exception as e: 

814 self.log.warning('Failed to make measurements for visit %d, detector %d: %s', 

815 visit, detector, e) 

816 return psf, None, measured_src 

817 

818 # And finally the ap corr map. 

819 try: 

820 ap_corr_map = self.measure_ap_corr.run(exposure=exposure, 

821 catalog=measured_src).apCorrMap 

822 except Exception as e: 

823 self.log.warning('Failed to compute aperture corrections for visit %d, detector %d: %s', 

824 visit, detector, e) 

825 return psf, None, measured_src 

826 

827 # Need to merge the original normalization aperture correction map. 

828 ap_corr_map_input = exposure.apCorrMap 

829 for key in ap_corr_map_input: 

830 if key not in ap_corr_map: 

831 ap_corr_map[key] = ap_corr_map_input[key] 

832 

833 self.apply_ap_corr.run(catalog=measured_src, apCorrMap=ap_corr_map) 

834 

835 return psf, ap_corr_map, measured_src 

836 

837 

838class FinalizeCharacterizationTask(FinalizeCharacterizationTaskBase): 

839 """Run final characterization on full visits.""" 

840 ConfigClass = FinalizeCharacterizationConfig 

841 _DefaultName = 'finalize_characterization' 

842 

843 def runQuantum(self, butlerQC, inputRefs, outputRefs): 

844 input_handle_dict = butlerQC.get(inputRefs) 

845 

846 band = butlerQC.quantum.dataId['band'] 

847 visit = butlerQC.quantum.dataId['visit'] 

848 

849 src_dict_temp = {handle.dataId['detector']: handle 

850 for handle in input_handle_dict['srcs']} 

851 calexp_dict_temp = {handle.dataId['detector']: handle 

852 for handle in input_handle_dict['calexps']} 

853 isolated_star_cat_dict_temp = {handle.dataId['tract']: handle 

854 for handle in input_handle_dict['isolated_star_cats']} 

855 isolated_star_source_dict_temp = {handle.dataId['tract']: handle 

856 for handle in input_handle_dict['isolated_star_sources']} 

857 

858 src_dict = {detector: src_dict_temp[detector] for 

859 detector in sorted(src_dict_temp.keys())} 

860 calexp_dict = {detector: calexp_dict_temp[detector] for 

861 detector in sorted(calexp_dict_temp.keys())} 

862 isolated_star_cat_dict = {tract: isolated_star_cat_dict_temp[tract] for 

863 tract in sorted(isolated_star_cat_dict_temp.keys())} 

864 isolated_star_source_dict = {tract: isolated_star_source_dict_temp[tract] for 

865 tract in sorted(isolated_star_source_dict_temp.keys())} 

866 

867 if self.config.psf_determiner['piff'].useColor: 

868 fgcm_standard_star_dict_temp = {handle.dataId['tract']: handle 

869 for handle in input_handle_dict['fgcm_standard_star']} 

870 fgcm_standard_star_dict = {tract: fgcm_standard_star_dict_temp[tract] for 

871 tract in sorted(fgcm_standard_star_dict_temp.keys())} 

872 else: 

873 fgcm_standard_star_dict = None 

874 

875 struct = self.run( 

876 visit, 

877 band, 

878 isolated_star_cat_dict, 

879 isolated_star_source_dict, 

880 src_dict, 

881 calexp_dict, 

882 fgcm_standard_star_dict=fgcm_standard_star_dict, 

883 ) 

884 

885 butlerQC.put(struct.psf_ap_corr_cat, outputRefs.finalized_psf_ap_corr_cat) 

886 butlerQC.put(struct.output_table, outputRefs.finalized_src_table) 

887 

888 def run(self, visit, band, isolated_star_cat_dict, isolated_star_source_dict, 

889 src_dict, calexp_dict, fgcm_standard_star_dict=None): 

890 """ 

891 Run the FinalizeCharacterizationTask. 

892 

893 Parameters 

894 ---------- 

895 visit : `int` 

896 Visit number. Used in the output catalogs. 

897 band : `str` 

898 Band name. Used to select reserved stars. 

899 isolated_star_cat_dict : `dict` 

900 Per-tract dict of isolated star catalog handles. 

901 isolated_star_source_dict : `dict` 

902 Per-tract dict of isolated star source catalog handles. 

903 src_dict : `dict` 

904 Per-detector dict of src catalog handles. 

905 calexp_dict : `dict` 

906 Per-detector dict of calibrated exposure handles. 

907 fgcm_standard_star_dict : `dict` 

908 Per tract dict of fgcm isolated stars. 

909 

910 Returns 

911 ------- 

912 struct : `lsst.pipe.base.struct` 

913 Struct with outputs for persistence. 

914 

915 Raises 

916 ------ 

917 NoWorkFound 

918 Raised if the selector returns no good sources. 

919 """ 

920 # Check if we have the same inputs for each of the 

921 # src_dict and calexp_dict. 

922 src_detectors = set(src_dict.keys()) 

923 calexp_detectors = set(calexp_dict.keys()) 

924 

925 if src_detectors != calexp_detectors: 

926 detector_keys = sorted(src_detectors.intersection(calexp_detectors)) 

927 self.log.warning( 

928 "Input src and calexp have mismatched detectors; " 

929 "running intersection of %d detectors.", 

930 len(detector_keys), 

931 ) 

932 else: 

933 detector_keys = sorted(src_detectors) 

934 

935 # We do not need the isolated star table in this task. 

936 # However, it is used in tests to confirm consistency of indexes. 

937 _, isolated_source_table = self.concat_isolated_star_cats( 

938 band, 

939 isolated_star_cat_dict, 

940 isolated_star_source_dict, 

941 visit=visit, 

942 ) 

943 

944 if isolated_source_table is None: 

945 raise pipeBase.NoWorkFound(f'No good isolated sources found for any detectors in visit {visit}') 

946 

947 exposure_cat_schema = afwTable.ExposureTable.makeMinimalSchema() 

948 exposure_cat_schema.addField('visit', type='L', doc='Visit number') 

949 

950 metadata = dafBase.PropertyList() 

951 metadata.add("COMMENT", "Catalog id is detector id, sorted.") 

952 metadata.add("COMMENT", "Only detectors with data have entries.") 

953 

954 psf_ap_corr_cat = afwTable.ExposureCatalog(exposure_cat_schema) 

955 psf_ap_corr_cat.setMetadata(metadata) 

956 

957 measured_src_tables = [] 

958 measured_src_table = None 

959 

960 if fgcm_standard_star_dict is not None: 

961 

962 fgcm_standard_star_cat = [] 

963 

964 for tract in fgcm_standard_star_dict: 

965 astropy_fgcm = fgcm_standard_star_dict[tract].get() 

966 table_fgcm = np.asarray(astropy_fgcm) 

967 fgcm_standard_star_cat.append(table_fgcm) 

968 

969 fgcm_standard_star_cat = np.concatenate(fgcm_standard_star_cat) 

970 else: 

971 fgcm_standard_star_cat = None 

972 

973 self.log.info("Running finalizeCharacterization on %d detectors.", len(detector_keys)) 

974 for detector in detector_keys: 

975 self.log.info("Starting finalizeCharacterization on detector ID %d.", detector) 

976 src = src_dict[detector].get() 

977 exposure = calexp_dict[detector].get() 

978 

979 psf, ap_corr_map, measured_src = self.compute_psf_and_ap_corr_map( 

980 visit, 

981 detector, 

982 exposure, 

983 src, 

984 isolated_source_table, 

985 fgcm_standard_star_cat, 

986 ) 

987 

988 # And now we package it together... 

989 if measured_src is not None: 

990 record = psf_ap_corr_cat.addNew() 

991 record['id'] = int(detector) 

992 record['visit'] = visit 

993 if psf is not None: 

994 record.setPsf(psf) 

995 if ap_corr_map is not None: 

996 record.setApCorrMap(ap_corr_map) 

997 

998 measured_src['visit'][:] = visit 

999 measured_src['detector'][:] = detector 

1000 

1001 measured_src_tables.append(measured_src.asAstropy()) 

1002 

1003 if len(measured_src_tables) > 0: 

1004 measured_src_table = astropy.table.vstack(measured_src_tables, join_type='exact') 

1005 

1006 if measured_src_table is None: 

1007 raise pipeBase.NoWorkFound(f'No good sources found for any detectors in visit {visit}') 

1008 

1009 return pipeBase.Struct( 

1010 psf_ap_corr_cat=psf_ap_corr_cat, 

1011 output_table=measured_src_table, 

1012 ) 

1013 

1014 

1015class FinalizeCharacterizationDetectorTask(FinalizeCharacterizationTaskBase): 

1016 """Run final characterization per detector.""" 

1017 ConfigClass = FinalizeCharacterizationDetectorConfig 

1018 _DefaultName = 'finalize_characterization_detector' 

1019 

1020 def runQuantum(self, butlerQC, inputRefs, outputRefs): 

1021 input_handle_dict = butlerQC.get(inputRefs) 

1022 

1023 band = butlerQC.quantum.dataId['band'] 

1024 visit = butlerQC.quantum.dataId['visit'] 

1025 detector = butlerQC.quantum.dataId['detector'] 

1026 

1027 isolated_star_cat_dict_temp = {handle.dataId['tract']: handle 

1028 for handle in input_handle_dict['isolated_star_cats']} 

1029 isolated_star_source_dict_temp = {handle.dataId['tract']: handle 

1030 for handle in input_handle_dict['isolated_star_sources']} 

1031 

1032 isolated_star_cat_dict = {tract: isolated_star_cat_dict_temp[tract] for 

1033 tract in sorted(isolated_star_cat_dict_temp.keys())} 

1034 isolated_star_source_dict = {tract: isolated_star_source_dict_temp[tract] for 

1035 tract in sorted(isolated_star_source_dict_temp.keys())} 

1036 

1037 if self.config.psf_determiner['piff'].useColor: 

1038 fgcm_standard_star_dict_temp = {handle.dataId['tract']: handle 

1039 for handle in input_handle_dict['fgcm_standard_star']} 

1040 fgcm_standard_star_dict = {tract: fgcm_standard_star_dict_temp[tract] for 

1041 tract in sorted(fgcm_standard_star_dict_temp.keys())} 

1042 else: 

1043 fgcm_standard_star_dict = None 

1044 

1045 struct = self.run( 

1046 visit, 

1047 band, 

1048 detector, 

1049 isolated_star_cat_dict, 

1050 isolated_star_source_dict, 

1051 input_handle_dict['src'], 

1052 input_handle_dict['calexp'], 

1053 fgcm_standard_star_dict=fgcm_standard_star_dict, 

1054 ) 

1055 

1056 butlerQC.put( 

1057 struct.psf_ap_corr_cat, 

1058 outputRefs.finalized_psf_ap_corr_detector_cat, 

1059 ) 

1060 butlerQC.put( 

1061 struct.output_table, 

1062 outputRefs.finalized_src_detector_table, 

1063 ) 

1064 

1065 def run(self, visit, band, detector, isolated_star_cat_dict, isolated_star_source_dict, 

1066 src, exposure, fgcm_standard_star_dict=None): 

1067 """ 

1068 Run the FinalizeCharacterizationDetectorTask. 

1069 

1070 Parameters 

1071 ---------- 

1072 visit : `int` 

1073 Visit number. Used in the output catalogs. 

1074 band : `str` 

1075 Band name. Used to select reserved stars. 

1076 detector : `int` 

1077 Detector number. 

1078 isolated_star_cat_dict : `dict` 

1079 Per-tract dict of isolated star catalog handles. 

1080 isolated_star_source_dict : `dict` 

1081 Per-tract dict of isolated star source catalog handles. 

1082 src : `lsst.afw.table.SourceCatalog` 

1083 Src catalog. 

1084 exposure : `lsst.afw.image.Exposure` 

1085 Calexp exposure. 

1086 fgcm_standard_star_dict : `dict` 

1087 Per tract dict of fgcm isolated stars. 

1088 

1089 Returns 

1090 ------- 

1091 struct : `lsst.pipe.base.struct` 

1092 Struct with outputs for persistence. 

1093 

1094 Raises 

1095 ------ 

1096 NoWorkFound 

1097 Raised if the selector returns no good sources. 

1098 """ 

1099 # We do not need the isolated star table in this task. 

1100 # However, it is used in tests to confirm consistency of indexes. 

1101 _, isolated_source_table = self.concat_isolated_star_cats( 

1102 band, 

1103 isolated_star_cat_dict, 

1104 isolated_star_source_dict, 

1105 visit=visit, 

1106 detector=detector, 

1107 ) 

1108 

1109 if isolated_source_table is None: 

1110 raise pipeBase.NoWorkFound(f'No good isolated sources found for any detectors in visit {visit}') 

1111 

1112 exposure_cat_schema = afwTable.ExposureTable.makeMinimalSchema() 

1113 exposure_cat_schema.addField('visit', type='L', doc='Visit number') 

1114 

1115 metadata = dafBase.PropertyList() 

1116 metadata.add("COMMENT", "Catalog id is detector id, sorted.") 

1117 metadata.add("COMMENT", "Only one detector with data has an entry.") 

1118 

1119 psf_ap_corr_cat = afwTable.ExposureCatalog(exposure_cat_schema) 

1120 psf_ap_corr_cat.setMetadata(metadata) 

1121 

1122 self.log.info("Starting finalizeCharacterization on detector ID %d.", detector) 

1123 

1124 if fgcm_standard_star_dict is not None: 

1125 fgcm_standard_star_cat = [] 

1126 

1127 for tract in fgcm_standard_star_dict: 

1128 astropy_fgcm = fgcm_standard_star_dict[tract].get() 

1129 table_fgcm = np.asarray(astropy_fgcm) 

1130 fgcm_standard_star_cat.append(table_fgcm) 

1131 

1132 fgcm_standard_star_cat = np.concatenate(fgcm_standard_star_cat) 

1133 else: 

1134 fgcm_standard_star_cat = None 

1135 

1136 psf, ap_corr_map, measured_src = self.compute_psf_and_ap_corr_map( 

1137 visit, 

1138 detector, 

1139 exposure, 

1140 src, 

1141 isolated_source_table, 

1142 fgcm_standard_star_cat, 

1143 ) 

1144 

1145 # And now we package it together... 

1146 measured_src_table = None 

1147 if measured_src is not None: 

1148 record = psf_ap_corr_cat.addNew() 

1149 record['id'] = int(detector) 

1150 record['visit'] = visit 

1151 if psf is not None: 

1152 record.setPsf(psf) 

1153 if ap_corr_map is not None: 

1154 record.setApCorrMap(ap_corr_map) 

1155 

1156 measured_src['visit'][:] = visit 

1157 measured_src['detector'][:] = detector 

1158 

1159 measured_src_table = measured_src.asAstropy() 

1160 

1161 if measured_src_table is None: 

1162 raise pipeBase.NoWorkFound(f'No good sources found for visit {visit} / detector {detector}') 

1163 

1164 return pipeBase.Struct( 

1165 psf_ap_corr_cat=psf_ap_corr_cat, 

1166 output_table=measured_src_table, 

1167 ) 

1168 

1169 

1170class ConsolidateFinalizeCharacterizationDetectorConnections( 

1171 pipeBase.PipelineTaskConnections, 

1172 dimensions=('instrument', 'visit',), 

1173): 

1174 finalized_psf_ap_corr_detector_cats = pipeBase.connectionTypes.Input( 

1175 doc='Per-visit/per-detector finalized psf models and aperture corrections.', 

1176 name='finalized_psf_ap_corr_detector_catalog', 

1177 storageClass='ExposureCatalog', 

1178 dimensions=('instrument', 'visit', 'detector'), 

1179 multiple=True, 

1180 deferLoad=True, 

1181 ) 

1182 finalized_src_detector_tables = pipeBase.connectionTypes.Input( 

1183 doc=('Per-visit/per-detector catalog of measurements for psf/flag/etc.'), 

1184 name='finalized_src_detector_table', 

1185 storageClass='ArrowAstropy', 

1186 dimensions=('instrument', 'visit', 'detector'), 

1187 multiple=True, 

1188 deferLoad=True, 

1189 ) 

1190 finalized_psf_ap_corr_cat = pipeBase.connectionTypes.Output( 

1191 doc=('Per-visit finalized psf models and aperture corrections. This ' 

1192 'catalog uses detector id for the id and are sorted for fast ' 

1193 'lookups of a detector.'), 

1194 name='finalized_psf_ap_corr_catalog', 

1195 storageClass='ExposureCatalog', 

1196 dimensions=('instrument', 'visit'), 

1197 ) 

1198 finalized_src_table = pipeBase.connectionTypes.Output( 

1199 doc=('Per-visit catalog of measurements for psf/flag/etc.'), 

1200 name='finalized_src_table', 

1201 storageClass='ArrowAstropy', 

1202 dimensions=('instrument', 'visit'), 

1203 ) 

1204 

1205 

1206class ConsolidateFinalizeCharacterizationDetectorConfig( 

1207 pipeBase.PipelineTaskConfig, 

1208 pipelineConnections=ConsolidateFinalizeCharacterizationDetectorConnections, 

1209): 

1210 pass 

1211 

1212 

1213class ConsolidateFinalizeCharacterizationDetectorTask(pipeBase.PipelineTask): 

1214 """Consolidate per-detector finalize characterization catalogs.""" 

1215 ConfigClass = ConsolidateFinalizeCharacterizationDetectorConfig 

1216 _DefaultName = 'consolidate_finalize_characterization_detector' 

1217 

1218 def runQuantum(self, butlerQC, inputRefs, outputRefs): 

1219 input_handle_dict = butlerQC.get(inputRefs) 

1220 

1221 psf_ap_corr_detector_dict_temp = { 

1222 handle.dataId['detector']: handle 

1223 for handle in input_handle_dict['finalized_psf_ap_corr_detector_cats'] 

1224 } 

1225 src_detector_table_dict_temp = { 

1226 handle.dataId['detector']: handle 

1227 for handle in input_handle_dict['finalized_src_detector_tables'] 

1228 } 

1229 

1230 psf_ap_corr_detector_dict = { 

1231 detector: psf_ap_corr_detector_dict_temp[detector] 

1232 for detector in sorted(psf_ap_corr_detector_dict_temp.keys()) 

1233 } 

1234 src_detector_table_dict = { 

1235 detector: src_detector_table_dict_temp[detector] 

1236 for detector in sorted(src_detector_table_dict_temp.keys()) 

1237 } 

1238 

1239 result = self.run( 

1240 psf_ap_corr_detector_dict=psf_ap_corr_detector_dict, 

1241 src_detector_table_dict=src_detector_table_dict, 

1242 ) 

1243 

1244 butlerQC.put(result.psf_ap_corr_cat, outputRefs.finalized_psf_ap_corr_cat) 

1245 butlerQC.put(result.output_table, outputRefs.finalized_src_table) 

1246 

1247 def run(self, *, psf_ap_corr_detector_dict, src_detector_table_dict): 

1248 """ 

1249 Run the ConsolidateFinalizeCharacterizationDetectorTask. 

1250 

1251 Parameters 

1252 ---------- 

1253 psf_ap_corr_detector_dict : `dict` [`int`, `lsst.daf.butler.DeferredDatasetHandle`] 

1254 Dictionary of input exposure catalogs, keyed by detector id. 

1255 src_detector_table_dict : `dict` [`int`, `lsst.daf.butler.DeferredDatasetHandle`] 

1256 Dictionary of input source tables, keyed by detector id. 

1257 

1258 Returns 

1259 ------- 

1260 result : `lsst.pipe.base.struct` 

1261 Struct with the following outputs: 

1262 ``psf_ap_corr_cat``: Consolidated exposure catalog 

1263 ``src_table``: Consolidated source table. 

1264 """ 

1265 if not len(psf_ap_corr_detector_dict): 

1266 raise pipeBase.NoWorkFound("No inputs found.") 

1267 

1268 if not np.all( 

1269 np.asarray(psf_ap_corr_detector_dict.keys()) 

1270 == np.asarray(src_detector_table_dict.keys()) 

1271 ): 

1272 raise ValueError( 

1273 "Input psf_ap_corr_detector_dict and src_detector_table_dict must have the same keys", 

1274 ) 

1275 

1276 psf_ap_corr_cat = None 

1277 for detector_id, handle in psf_ap_corr_detector_dict.items(): 

1278 if psf_ap_corr_cat is None: 

1279 psf_ap_corr_cat = handle.get() 

1280 else: 

1281 psf_ap_corr_cat.append(handle.get().find(detector_id)) 

1282 

1283 # Make sure it is a contiguous catalog. 

1284 psf_ap_corr_cat = psf_ap_corr_cat.copy(deep=True) 

1285 

1286 src_table = TableVStack.vstack_handles(src_detector_table_dict.values()) 

1287 

1288 return pipeBase.Struct( 

1289 psf_ap_corr_cat=psf_ap_corr_cat, 

1290 output_table=src_table, 

1291 )