Coverage for python / lsst / ap / association / diaForcedSource.py: 30%
77 statements
« prev ^ index » next coverage.py v7.13.5, created at 2026-04-26 09:21 +0000
« prev ^ index » next coverage.py v7.13.5, created at 2026-04-26 09:21 +0000
1# This file is part of ap_association.
2#
3# Developed for the LSST Data Management System.
4# This product includes software developed by the LSST Project
5# (https://www.lsst.org).
6# See the COPYRIGHT file at the top-level directory of this distribution
7# for details of code ownership.
8#
9# This program is free software: you can redistribute it and/or modify
10# it under the terms of the GNU General Public License as published by
11# the Free Software Foundation, either version 3 of the License, or
12# (at your option) any later version.
13#
14# This program is distributed in the hope that it will be useful,
15# but WITHOUT ANY WARRANTY; without even the implied warranty of
16# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
17# GNU General Public License for more details.
18#
19# You should have received a copy of the GNU General Public License
20# along with this program. If not, see <https://www.gnu.org/licenses/>.
22"""Methods for force photometering direct and difference images at DiaObject
23locations.
24"""
26__all__ = ["DiaForcedSourceTask", "DiaForcedSourcedConfig"]
28import numpy as np
29import pandas as pd
31import lsst.afw.table as afwTable
32from lsst.daf.base import DateTime
33import lsst.geom as geom
34from lsst.meas.base import ForcedMeasurementTask
35import lsst.pex.config as pexConfig
36import lsst.pipe.base as pipeBase
37from lsst.utils.timer import timeMethod
40class DiaForcedSourcedConfig(pexConfig.Config):
41 """Configuration for the generic DiaForcedSourcedTask class.
42 """
43 forcedMeasurement = pexConfig.ConfigurableField(
44 target=ForcedMeasurementTask,
45 doc="Subtask to force photometer DiaObjects in the direct and "
46 "difference images.",
47 )
48 dropColumns = pexConfig.ListField(
49 dtype=str,
50 doc="Columns produced in forced measurement that can be dropped upon "
51 "creation and storage of the final pandas data.",
52 )
53 historyThreshold = pexConfig.Field(
54 dtype=int,
55 doc="Minimum number of detections of a diaObject required "
56 "to run forced photometry. Set to 1 to include all diaObjects.",
57 default=2,
58 )
60 def setDefaults(self):
61 self.forcedMeasurement.plugins = ["base_TransformedCentroidFromCoord",
62 "base_PsfFlux"]
63 self.forcedMeasurement.doReplaceWithNoise = False
64 self.forcedMeasurement.copyColumns = {
65 "id": "diaObjectId",
66 "coord_ra": "ra",
67 "coord_dec": "dec"}
68 self.forcedMeasurement.slots.centroid = "base_TransformedCentroidFromCoord"
69 self.forcedMeasurement.slots.psfFlux = "base_PsfFlux"
70 self.forcedMeasurement.slots.shape = None
71 self.dropColumns = ['coord_ra', 'coord_dec', 'x', 'y', 'parent',
72 'base_TransformedCentroidFromCoord_x',
73 'base_TransformedCentroidFromCoord_y',
74 'base_PsfFlux_instFlux',
75 'base_PsfFlux_instFluxErr', 'base_PsfFlux_area',
76 'slot_PsfFlux_area', 'base_PsfFlux_flag',
77 'slot_PsfFlux_flag',
78 'base_PsfFlux_flag_noGoodPixels',
79 'slot_PsfFlux_flag_noGoodPixels',
80 'base_PsfFlux_flag_edge', 'slot_PsfFlux_flag_edge',
81 'base_PsfFlux_chi2', 'slot_PsfFlux_chi2',
82 'base_PsfFlux_npixels', 'slot_PsfFlux_npixels',
83 ]
86class DiaForcedSourceTask(pipeBase.Task):
87 """Task for measuring and storing forced sources at DiaObject locations
88 in both difference and direct images.
89 """
90 ConfigClass = DiaForcedSourcedConfig
91 _DefaultName = "diaForcedSource"
93 def __init__(self, **kwargs):
94 pipeBase.Task.__init__(self, **kwargs)
95 self.makeSubtask("forcedMeasurement",
96 refSchema=afwTable.SourceTable.makeMinimalSchema())
98 @timeMethod
99 def run(self,
100 dia_objects,
101 updatedDiaObjectIds,
102 exposure,
103 diffim,
104 idGenerator):
105 """Measure forced sources on the direct and difference images.
107 Parameters
108 ----------
109 dia_objects : `pandas.DataFrame`
110 Catalog of previously observed and newly created DiaObjects
111 contained within the difference and direct images. DiaObjects
112 must be indexed on the ``diaObjectId`` column.
113 updatedDiaObjectIds : `numpy.ndarray`
114 Array of diaObjectIds that were updated during this dia processing.
115 Used to assure that the pipeline includes all diaObjects that were
116 updated in case one falls on the edge of the CCD.
117 exposure : `lsst.afw.image.Exposure`
118 Direct image exposure.
119 diffim : `lsst.afw.image.Exposure`
120 Difference image.
121 idGenerator : `lsst.meas.base.IdGenerator`
122 Object that generates source IDs and random number generator seeds.
124 Returns
125 -------
126 output_forced_sources : `pandas.DataFrame`
127 Catalog of calibrated forced photometered fluxes on both the
128 difference and direct images at DiaObject locations.
129 """
130 # Restrict forced source measurement to objects with sufficient history to be reliable.
131 objectTable = dia_objects.query(f'nDiaSources >= {self.config.historyThreshold}')
132 if objectTable.empty:
133 # The dataframe will be coerced to the correct (empty) format in diaPipe.
134 return pd.DataFrame()
136 afw_dia_objects = self._convert_from_pandas(objectTable)
138 idFactoryDiff = idGenerator.make_table_id_factory()
140 diffForcedSources = self.forcedMeasurement.generateMeasCat(
141 diffim,
142 afw_dia_objects,
143 diffim.getWcs(),
144 idFactory=idFactoryDiff)
145 self.forcedMeasurement.run(
146 diffForcedSources, diffim, afw_dia_objects, diffim.getWcs())
148 directForcedSources = self.forcedMeasurement.generateMeasCat(
149 exposure,
150 afw_dia_objects,
151 exposure.getWcs(),
152 idFactory=idFactoryDiff)
153 self.forcedMeasurement.run(
154 directForcedSources, exposure, afw_dia_objects, exposure.getWcs())
156 output_forced_sources = self._calibrate_and_merge(diffForcedSources,
157 directForcedSources,
158 diffim,
159 exposure)
161 output_forced_sources = self._trim_to_exposure(output_forced_sources,
162 updatedDiaObjectIds,
163 exposure)
164 # Drop superfluous columns from output DataFrame.
165 output_forced_sources.drop(columns=self.config.dropColumns, inplace=True)
166 return output_forced_sources.set_index(
167 ["diaObjectId", "diaForcedSourceId"],
168 drop=False)
170 def _convert_from_pandas(self, input_objects):
171 """Create minimal schema SourceCatalog from a pandas DataFrame.
173 We need a catalog of this type to run within the forced measurement
174 subtask.
176 Parameters
177 ----------
178 input_objects : `pandas.DataFrame`
179 DiaObjects with locations and ids. ``
181 Returns
182 -------
183 outputCatalog : `lsst.afw.table.SourceTable`
184 Output catalog with minimal schema.
185 """
186 schema = afwTable.SourceTable.makeMinimalSchema()
188 outputCatalog = afwTable.SourceCatalog(schema)
189 outputCatalog.reserve(len(input_objects))
191 for obj_id, df_row in input_objects.iterrows():
192 outputRecord = outputCatalog.addNew()
193 outputRecord.setId(obj_id)
194 outputRecord.setCoord(
195 geom.SpherePoint(df_row["ra"],
196 df_row["dec"],
197 geom.degrees))
198 return outputCatalog
200 def _calibrate_and_merge(self,
201 diff_sources,
202 direct_sources,
203 diff_exp,
204 direct_exp):
205 """Take the two output catalogs from the ForcedMeasurementTasks and
206 calibrate, combine, and convert them to Pandas.
208 Parameters
209 ----------
210 diff_sources : `lsst.afw.table.SourceTable`
211 Catalog with PsFluxes measured on the difference image.
212 direct_sources : `lsst.afw.table.SourceTable`
213 Catalog with PsfFluxes measured on the direct (calexp) image.
214 diff_exp : `lsst.afw.image.Exposure`
215 Difference exposure ``diff_sources`` were measured on.
216 direct_exp : `lsst.afw.image.Exposure`
217 Direct (calexp) exposure ``direct_sources`` were measured on.
219 Returns
220 -------
221 output_catalog : `pandas.DataFrame`
222 Catalog calibrated diaForcedSources.
223 """
224 diff_calib = diff_exp.getPhotoCalib()
225 direct_calib = direct_exp.getPhotoCalib()
227 diff_fluxes = diff_calib.instFluxToNanojansky(diff_sources,
228 "slot_PsfFlux")
229 direct_fluxes = direct_calib.instFluxToNanojansky(direct_sources,
230 "slot_PsfFlux")
232 output_catalog = diff_sources.asAstropy().to_pandas()
233 # afwTable source catalogs store coordinates as radians, but the
234 # output must be in degrees
235 output_catalog.loc[:, "ra"] = np.rad2deg(output_catalog.loc[:, "ra"])
236 output_catalog.loc[:, "dec"] = np.rad2deg(output_catalog.loc[:, "dec"])
237 output_catalog.rename(columns={"id": "diaForcedSourceId",
238 "slot_PsfFlux_instFlux": "psfFlux",
239 "slot_PsfFlux_instFluxErr": "psfFluxErr",
240 "slot_Centroid_x": "x",
241 "slot_Centroid_y": "y"},
242 inplace=True)
243 output_catalog.loc[:, "psfFlux"] = diff_fluxes[:, 0]
244 output_catalog.loc[:, "psfFluxErr"] = diff_fluxes[:, 1]
246 output_catalog["scienceFlux"] = direct_fluxes[:, 0]
247 output_catalog["scienceFluxErr"] = direct_fluxes[:, 1]
249 midpointMjdTai = direct_exp.visitInfo.date.get(system=DateTime.MJD)
250 output_catalog["visit"] = direct_exp.visitInfo.id
251 output_catalog["detector"] = direct_exp.detector.getId()
252 output_catalog["midpointMjdTai"] = midpointMjdTai
253 output_catalog["band"] = diff_exp.getFilter().bandLabel
254 # Pandas requires a naive UTC datetime.
255 output_catalog["timeProcessedMjdTai"] = DateTime.now().get(system=DateTime.MJD, scale=DateTime.TAI)
256 # TODO: propagate actual flags (DM-42355)
258 return output_catalog
260 def _trim_to_exposure(self, catalog, updatedDiaObjectIds, exposure):
261 """Remove DiaForcedSources that are outside of the bounding box region.
263 Paramters
264 ---------
265 catalog : `pandas.DataFrame`
266 DiaForcedSources to check against the exposure bounding box.
267 updatedDiaObjectIds : `numpy.ndarray`
268 Array of diaObjectIds that were updated during this dia processing.
269 Used to assure that the pipeline includes all diaObjects that were
270 updated in case one falls on the edge of the CCD.
271 exposure : `lsst.afw.image.Exposure`
272 Exposure to check against.
274 Returns
275 -------
276 output : `pandas.DataFrame`
277 DataFrame trimmed to only the objects within the exposure bounding
278 box.
279 """
280 bbox = geom.Box2D(exposure.getBBox())
282 xS = catalog.loc[:, "x"]
283 yS = catalog.loc[:, "y"]
285 return catalog[
286 np.logical_or(bbox.contains(xS, yS),
287 np.isin(catalog.loc[:, "diaObjectId"],
288 updatedDiaObjectIds))]