Coverage for python / lsst / ap / association / diaForcedSource.py: 30%

77 statements  

« prev     ^ index     » next       coverage.py v7.13.5, created at 2026-04-22 08:54 +0000

1# This file is part of ap_association. 

2# 

3# Developed for the LSST Data Management System. 

4# This product includes software developed by the LSST Project 

5# (https://www.lsst.org). 

6# See the COPYRIGHT file at the top-level directory of this distribution 

7# for details of code ownership. 

8# 

9# This program is free software: you can redistribute it and/or modify 

10# it under the terms of the GNU General Public License as published by 

11# the Free Software Foundation, either version 3 of the License, or 

12# (at your option) any later version. 

13# 

14# This program is distributed in the hope that it will be useful, 

15# but WITHOUT ANY WARRANTY; without even the implied warranty of 

16# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the 

17# GNU General Public License for more details. 

18# 

19# You should have received a copy of the GNU General Public License 

20# along with this program. If not, see <https://www.gnu.org/licenses/>. 

21 

22"""Methods for force photometering direct and difference images at DiaObject 

23locations. 

24""" 

25 

26__all__ = ["DiaForcedSourceTask", "DiaForcedSourcedConfig"] 

27 

28import numpy as np 

29import pandas as pd 

30 

31import lsst.afw.table as afwTable 

32from lsst.daf.base import DateTime 

33import lsst.geom as geom 

34from lsst.meas.base import ForcedMeasurementTask 

35import lsst.pex.config as pexConfig 

36import lsst.pipe.base as pipeBase 

37from lsst.utils.timer import timeMethod 

38 

39 

40class DiaForcedSourcedConfig(pexConfig.Config): 

41 """Configuration for the generic DiaForcedSourcedTask class. 

42 """ 

43 forcedMeasurement = pexConfig.ConfigurableField( 

44 target=ForcedMeasurementTask, 

45 doc="Subtask to force photometer DiaObjects in the direct and " 

46 "difference images.", 

47 ) 

48 dropColumns = pexConfig.ListField( 

49 dtype=str, 

50 doc="Columns produced in forced measurement that can be dropped upon " 

51 "creation and storage of the final pandas data.", 

52 ) 

53 historyThreshold = pexConfig.Field( 

54 dtype=int, 

55 doc="Minimum number of detections of a diaObject required " 

56 "to run forced photometry. Set to 1 to include all diaObjects.", 

57 default=2, 

58 ) 

59 

60 def setDefaults(self): 

61 self.forcedMeasurement.plugins = ["base_TransformedCentroidFromCoord", 

62 "base_PsfFlux"] 

63 self.forcedMeasurement.doReplaceWithNoise = False 

64 self.forcedMeasurement.copyColumns = { 

65 "id": "diaObjectId", 

66 "coord_ra": "ra", 

67 "coord_dec": "dec"} 

68 self.forcedMeasurement.slots.centroid = "base_TransformedCentroidFromCoord" 

69 self.forcedMeasurement.slots.psfFlux = "base_PsfFlux" 

70 self.forcedMeasurement.slots.shape = None 

71 self.dropColumns = ['coord_ra', 'coord_dec', 'x', 'y', 'parent', 

72 'base_TransformedCentroidFromCoord_x', 

73 'base_TransformedCentroidFromCoord_y', 

74 'base_PsfFlux_instFlux', 

75 'base_PsfFlux_instFluxErr', 'base_PsfFlux_area', 

76 'slot_PsfFlux_area', 'base_PsfFlux_flag', 

77 'slot_PsfFlux_flag', 

78 'base_PsfFlux_flag_noGoodPixels', 

79 'slot_PsfFlux_flag_noGoodPixels', 

80 'base_PsfFlux_flag_edge', 'slot_PsfFlux_flag_edge', 

81 'base_PsfFlux_chi2', 'slot_PsfFlux_chi2', 

82 'base_PsfFlux_npixels', 'slot_PsfFlux_npixels', 

83 ] 

84 

85 

86class DiaForcedSourceTask(pipeBase.Task): 

87 """Task for measuring and storing forced sources at DiaObject locations 

88 in both difference and direct images. 

89 """ 

90 ConfigClass = DiaForcedSourcedConfig 

91 _DefaultName = "diaForcedSource" 

92 

93 def __init__(self, **kwargs): 

94 pipeBase.Task.__init__(self, **kwargs) 

95 self.makeSubtask("forcedMeasurement", 

96 refSchema=afwTable.SourceTable.makeMinimalSchema()) 

97 

98 @timeMethod 

99 def run(self, 

100 dia_objects, 

101 updatedDiaObjectIds, 

102 exposure, 

103 diffim, 

104 idGenerator): 

105 """Measure forced sources on the direct and difference images. 

106 

107 Parameters 

108 ---------- 

109 dia_objects : `pandas.DataFrame` 

110 Catalog of previously observed and newly created DiaObjects 

111 contained within the difference and direct images. DiaObjects 

112 must be indexed on the ``diaObjectId`` column. 

113 updatedDiaObjectIds : `numpy.ndarray` 

114 Array of diaObjectIds that were updated during this dia processing. 

115 Used to assure that the pipeline includes all diaObjects that were 

116 updated in case one falls on the edge of the CCD. 

117 exposure : `lsst.afw.image.Exposure` 

118 Direct image exposure. 

119 diffim : `lsst.afw.image.Exposure` 

120 Difference image. 

121 idGenerator : `lsst.meas.base.IdGenerator` 

122 Object that generates source IDs and random number generator seeds. 

123 

124 Returns 

125 ------- 

126 output_forced_sources : `pandas.DataFrame` 

127 Catalog of calibrated forced photometered fluxes on both the 

128 difference and direct images at DiaObject locations. 

129 """ 

130 # Restrict forced source measurement to objects with sufficient history to be reliable. 

131 objectTable = dia_objects.query(f'nDiaSources >= {self.config.historyThreshold}') 

132 if objectTable.empty: 

133 # The dataframe will be coerced to the correct (empty) format in diaPipe. 

134 return pd.DataFrame() 

135 

136 afw_dia_objects = self._convert_from_pandas(objectTable) 

137 

138 idFactoryDiff = idGenerator.make_table_id_factory() 

139 

140 diffForcedSources = self.forcedMeasurement.generateMeasCat( 

141 diffim, 

142 afw_dia_objects, 

143 diffim.getWcs(), 

144 idFactory=idFactoryDiff) 

145 self.forcedMeasurement.run( 

146 diffForcedSources, diffim, afw_dia_objects, diffim.getWcs()) 

147 

148 directForcedSources = self.forcedMeasurement.generateMeasCat( 

149 exposure, 

150 afw_dia_objects, 

151 exposure.getWcs(), 

152 idFactory=idFactoryDiff) 

153 self.forcedMeasurement.run( 

154 directForcedSources, exposure, afw_dia_objects, exposure.getWcs()) 

155 

156 output_forced_sources = self._calibrate_and_merge(diffForcedSources, 

157 directForcedSources, 

158 diffim, 

159 exposure) 

160 

161 output_forced_sources = self._trim_to_exposure(output_forced_sources, 

162 updatedDiaObjectIds, 

163 exposure) 

164 # Drop superfluous columns from output DataFrame. 

165 output_forced_sources.drop(columns=self.config.dropColumns, inplace=True) 

166 return output_forced_sources.set_index( 

167 ["diaObjectId", "diaForcedSourceId"], 

168 drop=False) 

169 

170 def _convert_from_pandas(self, input_objects): 

171 """Create minimal schema SourceCatalog from a pandas DataFrame. 

172 

173 We need a catalog of this type to run within the forced measurement 

174 subtask. 

175 

176 Parameters 

177 ---------- 

178 input_objects : `pandas.DataFrame` 

179 DiaObjects with locations and ids. `` 

180 

181 Returns 

182 ------- 

183 outputCatalog : `lsst.afw.table.SourceTable` 

184 Output catalog with minimal schema. 

185 """ 

186 schema = afwTable.SourceTable.makeMinimalSchema() 

187 

188 outputCatalog = afwTable.SourceCatalog(schema) 

189 outputCatalog.reserve(len(input_objects)) 

190 

191 for obj_id, df_row in input_objects.iterrows(): 

192 outputRecord = outputCatalog.addNew() 

193 outputRecord.setId(obj_id) 

194 outputRecord.setCoord( 

195 geom.SpherePoint(df_row["ra"], 

196 df_row["dec"], 

197 geom.degrees)) 

198 return outputCatalog 

199 

200 def _calibrate_and_merge(self, 

201 diff_sources, 

202 direct_sources, 

203 diff_exp, 

204 direct_exp): 

205 """Take the two output catalogs from the ForcedMeasurementTasks and 

206 calibrate, combine, and convert them to Pandas. 

207 

208 Parameters 

209 ---------- 

210 diff_sources : `lsst.afw.table.SourceTable` 

211 Catalog with PsFluxes measured on the difference image. 

212 direct_sources : `lsst.afw.table.SourceTable` 

213 Catalog with PsfFluxes measured on the direct (calexp) image. 

214 diff_exp : `lsst.afw.image.Exposure` 

215 Difference exposure ``diff_sources`` were measured on. 

216 direct_exp : `lsst.afw.image.Exposure` 

217 Direct (calexp) exposure ``direct_sources`` were measured on. 

218 

219 Returns 

220 ------- 

221 output_catalog : `pandas.DataFrame` 

222 Catalog calibrated diaForcedSources. 

223 """ 

224 diff_calib = diff_exp.getPhotoCalib() 

225 direct_calib = direct_exp.getPhotoCalib() 

226 

227 diff_fluxes = diff_calib.instFluxToNanojansky(diff_sources, 

228 "slot_PsfFlux") 

229 direct_fluxes = direct_calib.instFluxToNanojansky(direct_sources, 

230 "slot_PsfFlux") 

231 

232 output_catalog = diff_sources.asAstropy().to_pandas() 

233 # afwTable source catalogs store coordinates as radians, but the 

234 # output must be in degrees 

235 output_catalog.loc[:, "ra"] = np.rad2deg(output_catalog.loc[:, "ra"]) 

236 output_catalog.loc[:, "dec"] = np.rad2deg(output_catalog.loc[:, "dec"]) 

237 output_catalog.rename(columns={"id": "diaForcedSourceId", 

238 "slot_PsfFlux_instFlux": "psfFlux", 

239 "slot_PsfFlux_instFluxErr": "psfFluxErr", 

240 "slot_Centroid_x": "x", 

241 "slot_Centroid_y": "y"}, 

242 inplace=True) 

243 output_catalog.loc[:, "psfFlux"] = diff_fluxes[:, 0] 

244 output_catalog.loc[:, "psfFluxErr"] = diff_fluxes[:, 1] 

245 

246 output_catalog["scienceFlux"] = direct_fluxes[:, 0] 

247 output_catalog["scienceFluxErr"] = direct_fluxes[:, 1] 

248 

249 midpointMjdTai = direct_exp.visitInfo.date.get(system=DateTime.MJD) 

250 output_catalog["visit"] = direct_exp.visitInfo.id 

251 output_catalog["detector"] = direct_exp.detector.getId() 

252 output_catalog["midpointMjdTai"] = midpointMjdTai 

253 output_catalog["band"] = diff_exp.getFilter().bandLabel 

254 # Pandas requires a naive UTC datetime. 

255 output_catalog["timeProcessedMjdTai"] = DateTime.now().get(system=DateTime.MJD, scale=DateTime.TAI) 

256 # TODO: propagate actual flags (DM-42355) 

257 

258 return output_catalog 

259 

260 def _trim_to_exposure(self, catalog, updatedDiaObjectIds, exposure): 

261 """Remove DiaForcedSources that are outside of the bounding box region. 

262 

263 Paramters 

264 --------- 

265 catalog : `pandas.DataFrame` 

266 DiaForcedSources to check against the exposure bounding box. 

267 updatedDiaObjectIds : `numpy.ndarray` 

268 Array of diaObjectIds that were updated during this dia processing. 

269 Used to assure that the pipeline includes all diaObjects that were 

270 updated in case one falls on the edge of the CCD. 

271 exposure : `lsst.afw.image.Exposure` 

272 Exposure to check against. 

273 

274 Returns 

275 ------- 

276 output : `pandas.DataFrame` 

277 DataFrame trimmed to only the objects within the exposure bounding 

278 box. 

279 """ 

280 bbox = geom.Box2D(exposure.getBBox()) 

281 

282 xS = catalog.loc[:, "x"] 

283 yS = catalog.loc[:, "y"] 

284 

285 return catalog[ 

286 np.logical_or(bbox.contains(xS, yS), 

287 np.isin(catalog.loc[:, "diaObjectId"], 

288 updatedDiaObjectIds))]