Coverage for python / lsst / pipe / tasks / forcedPhotDetector.py: 21%

155 statements  

« prev     ^ index     » next       coverage.py v7.13.5, created at 2026-04-23 08:45 +0000

1# This file is part of meas_base. 

2# 

3# Developed for the LSST Data Management System. 

4# This product includes software developed by the LSST Project 

5# (https://www.lsst.org). 

6# See the COPYRIGHT file at the top-level directory of this distribution 

7# for details of code ownership. 

8# 

9# This program is free software: you can redistribute it and/or modify 

10# it under the terms of the GNU General Public License as published by 

11# the Free Software Foundation, either version 3 of the License, or 

12# (at your option) any later version. 

13# 

14# This program is distributed in the hope that it will be useful, 

15# but WITHOUT ANY WARRANTY; without even the implied warranty of 

16# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the 

17# GNU General Public License for more details. 

18# 

19# You should have received a copy of the GNU General Public License 

20# along with this program. If not, see <https://www.gnu.org/licenses/>. 

21 

22import functools 

23 

24import astropy.table 

25import numpy as np 

26import pandas as pd 

27 

28import lsst.afw.geom 

29import lsst.afw.image 

30import lsst.daf.butler 

31import lsst.pex.config 

32from lsst.meas.base import DetectorVisitIdGeneratorConfig 

33from lsst.meas.base.simple_forced_measurement import SimpleForcedMeasurementTask 

34from lsst.meas.base.applyApCorr import ApplyApCorrTask 

35import lsst.pipe.base as pipeBase 

36from lsst.pipe.base import PipelineTaskConnections, NoWorkFound 

37import lsst.pipe.base.connectionTypes as cT 

38from lsst.skymap import BaseSkyMap 

39 

40__all__ = ("ForcedPhotDetectorConfig", "ForcedPhotDetectorTask") 

41 

42 

43class ForcedPhotDetectorConnections(PipelineTaskConnections, 

44 dimensions=("visit", "detector", "tract")): 

45 exposure = cT.Input( 

46 doc="Input exposure to perform photometry on.", 

47 name="visit_image", 

48 storageClass="ExposureF", 

49 dimensions=["visit", "detector"], 

50 ) 

51 diaExposure = cT.Input( 

52 doc="Input difference image to perform photometry on.", 

53 name="difference_image", 

54 storageClass="ExposureF", 

55 dimensions=["visit", "detector"], 

56 ) 

57 refCat = cT.Input( 

58 doc="Catalog of shapes and positions at which to force photometry.", 

59 name="object_patch", 

60 storageClass="ArrowAstropy", 

61 dimensions=["tract", "patch"], 

62 multiple=True, 

63 deferLoad=True, 

64 ) 

65 skyMap = cT.Input( 

66 doc="SkyMap dataset that defines the coordinate system of the reference catalog.", 

67 name=BaseSkyMap.SKYMAP_DATASET_TYPE_NAME, 

68 storageClass="SkyMap", 

69 dimensions=["skymap"], 

70 ) 

71 outputCatalog = cT.Output( 

72 doc="Output forced photometry catalog.", 

73 name="object_forced_source_unstandardized", 

74 storageClass="DataFrame", 

75 dimensions=("visit", "detector", "skymap", "tract") 

76 ) 

77 

78 def __init__(self, *, config=None): 

79 super().__init__(config=config) 

80 assert isinstance(config, ForcedPhotDetectorConfig) 

81 

82 if not config.doDirectPhotometry: 

83 del self.exposure 

84 if not config.doDifferencePhotometry: 

85 del self.diaExposure 

86 

87 

88class ForcedPhotDetectorConfig(pipeBase.PipelineTaskConfig, 

89 pipelineConnections=ForcedPhotDetectorConnections): 

90 """Configuration for the ForcedPhotDetectorTask.""" 

91 measurement = lsst.pex.config.ConfigurableField( 

92 target=SimpleForcedMeasurementTask, 

93 doc="subtask to do forced measurement" 

94 ) 

95 doApCorr = lsst.pex.config.Field( 

96 dtype=bool, 

97 default=True, 

98 doc="Run subtask to apply aperture corrections" 

99 ) 

100 applyApCorr = lsst.pex.config.ConfigurableField( 

101 target=ApplyApCorrTask, 

102 doc="Subtask to apply aperture corrections" 

103 ) 

104 doDirectPhotometry = lsst.pex.config.Field( 

105 doc="Perform direct photometry on the input exposure.", 

106 dtype=bool, 

107 default=True, 

108 ) 

109 doDifferencePhotometry = lsst.pex.config.Field( 

110 doc="Perform photometry on the difference image.", 

111 dtype=bool, 

112 default=True, 

113 ) 

114 refCatIdColumn = lsst.pex.config.Field( 

115 dtype=str, 

116 default="objectId", 

117 doc=( 

118 "Name of the column that provides the object ID from the refCat connection. " 

119 "measurement.copyColumns['id'] must be set to this value as well." 

120 "Ignored if refCatStorageClass='SourceCatalog'." 

121 ) 

122 ) 

123 refCatRaColumn = lsst.pex.config.Field( 

124 dtype=str, 

125 default="coord_ra", 

126 doc=( 

127 "Name of the column that provides the right ascension (in floating-point degrees) from the " 

128 "refCat connection. " 

129 "Ignored if refCatStorageClass='SourceCatalog'." 

130 ) 

131 ) 

132 refCatDecColumn = lsst.pex.config.Field( 

133 dtype=str, 

134 default="coord_dec", 

135 doc=( 

136 "Name of the column that provides the declination (in floating-point degrees) from the " 

137 "refCat connection. " 

138 "Ignored if refCatStorageClass='SourceCatalog'." 

139 ) 

140 ) 

141 idGenerator = DetectorVisitIdGeneratorConfig.make_field() 

142 

143 

144class ForcedPhotDetectorTask(pipeBase.PipelineTask): 

145 """A pipeline task for performing forced photometry on CCD images.""" 

146 ConfigClass = ForcedPhotDetectorConfig 

147 _DefaultName = "forcedPhotDetector" 

148 

149 def __init__(self, initInputs=None, **kwargs): 

150 super().__init__(**kwargs) 

151 

152 refSchema = lsst.afw.table.SourceTable.makeMinimalSchema() 

153 

154 self.makeSubtask("measurement", refSchema=refSchema) 

155 if self.config.doApCorr: 

156 self.makeSubtask("applyApCorr", schema=self.measurement.schema) 

157 

158 def runQuantum(self, butlerQC, inputRefs, outputRefs): 

159 inputs = butlerQC.get(inputRefs) 

160 

161 if "exposure" in inputs: 

162 exposure = inputs["exposure"] 

163 bbox = exposure.getBBox() 

164 wcs = exposure.getWcs() 

165 if wcs is None: 

166 raise NoWorkFound("Exposure has no valid WCS.") 

167 else: 

168 exposure = None 

169 

170 if "diaExposure" in inputs: 

171 diaExposure = inputs["diaExposure"] 

172 if exposure is None: 

173 bbox = diaExposure.getBBox() 

174 wcs = diaExposure.getWcs() 

175 if wcs is None: 

176 raise NoWorkFound("Difference exposure has no valid WCS.") 

177 else: 

178 if exposure is None: 

179 # If neither exposure nor diaExposure is provided, we cannot proceed. 

180 raise NoWorkFound("No valid exposure or difference exposure provided.") 

181 diaExposure = None 

182 

183 tract = butlerQC.quantum.dataId["tract"] 

184 skyMap = inputs.pop("skyMap") 

185 refWcs = skyMap[tract].getWcs() 

186 

187 self.log.info("Filtering ref cats: %s", ','.join([str(i.dataId) for i in inputs["refCat"]])) 

188 

189 refTable = self._filterRefTable( 

190 inputs["refCat"], 

191 bbox, 

192 wcs, 

193 ) 

194 # Convert the table into a SourceCatalog. 

195 # This is necessary because the forced measurement subtask expects a 

196 # SourceCatalog as input. 

197 refCat = self._makeMinimalSourceCatalogFromAstropy(refTable) 

198 

199 # Create the catalogs to hold measurements 

200 if self.config.doDirectPhotometry: 

201 id_generator = self.config.idGenerator.apply(inputRefs.exposure.dataId) 

202 directCat = self._generateMeasCat(refCat, idFactory=id_generator.make_table_id_factory()) 

203 else: 

204 directCat = None 

205 if self.config.doDifferencePhotometry: 

206 id_generator = self.config.idGenerator.apply(inputRefs.diaExposure.dataId) 

207 diffCat = self._generateMeasCat(refCat, idFactory=id_generator.make_table_id_factory()) 

208 else: 

209 diffCat = None 

210 

211 outputs = self.run( 

212 refCat=refCat, 

213 objectIds=refTable[self.config.refCatIdColumn], 

214 visit=butlerQC.quantum.dataId["visit"], 

215 detector=butlerQC.quantum.dataId["detector"], 

216 refWcs=refWcs, 

217 directCat=directCat, 

218 diffCat=diffCat, 

219 exposure=exposure, 

220 diaExposure=diaExposure, 

221 band=butlerQC.quantum.dataId["band"], 

222 ) 

223 

224 butlerQC.put(outputs, outputRefs) 

225 

226 def run( 

227 self, 

228 refCat: lsst.afw.table.SourceCatalog, 

229 objectIds: np.ndarray, 

230 visit: int, 

231 detector: int, 

232 refWcs: lsst.afw.geom.SkyWcs, 

233 directCat: lsst.afw.table.SourceCatalog | None = None, 

234 diffCat: lsst.afw.table.SourceCatalog | None = None, 

235 exposure: lsst.afw.image.Exposure | None = None, 

236 diaExposure: lsst.afw.image.Exposure | None = None, 

237 band: str | None = None, 

238 ) -> pipeBase.Struct: 

239 """Run forced photometry on a single detector. 

240 

241 There is a lot of prep work in the `runQuantum` method and it is 

242 expected that this taks is usually run as a pipeline task, not 

243 executed as a stand alone function. 

244 

245 Parameters 

246 ---------- 

247 refCat : 

248 Reference catalog for forced photometry. 

249 objectIds : 

250 Array of object IDs corresponding to the reference catalog. 

251 visit : 

252 Visit ID for the observation. 

253 detector : 

254 Detector ID for the observation. 

255 refWcs : 

256 Reference WCS for the observation. 

257 directCat : 

258 Catalog for direct photometry. 

259 Only required when `config.doDirectPhotometry` is True. 

260 diffCat : 

261 Catalog for difference photometry. 

262 Only required when `config.doDifferencePhotometry` is True. 

263 exposure : 

264 Exposure for direct photometry. 

265 Only required when `config.doDirectPhotometry` is True. 

266 diaExposure : 

267 Exposure for difference photometry. 

268 Only required when `config.doDifferencePhotometry` is True. 

269 band : 

270 Band for the observation. 

271 """ 

272 results: dict[str, lsst.afw.table.SourceCatalog] = {} 

273 if self.config.doDirectPhotometry: 

274 if exposure is None: 

275 raise ValueError("`exposure` must be provided for direct photometry.") 

276 if directCat is None: 

277 raise ValueError("`directCat` must be provided for direct photometry.") 

278 self.log.info("Running forced measurement on %s objects", len(refCat)) 

279 self._runForcedPhotometry(refCat, directCat, exposure, refWcs) 

280 results["calexp"] = directCat 

281 

282 if self.config.doDifferencePhotometry: 

283 if diaExposure is None: 

284 raise ValueError("`diaExposure` must be provided for difference photometry.") 

285 if diffCat is None: 

286 raise ValueError("`diffCat` must be provided for difference photometry.") 

287 self.log.info("Running forced measurement on %s objects on difference image", len(refCat)) 

288 self._runForcedPhotometry(refCat, diffCat, diaExposure, refWcs) 

289 results["diff"] = diffCat 

290 

291 # Convert the astropy tables to pandas DataFrames and reindex them 

292 dfs = [] 

293 for dataset, catalog in results.items(): 

294 measTbl = catalog.asAstropy() 

295 measTbl[self.config.refCatIdColumn] = objectIds 

296 df = measTbl.to_pandas().set_index(self.config.refCatIdColumn, drop=False) 

297 df = df.reindex(sorted(df.columns), axis=1) 

298 df["visit"] = visit 

299 # int16 instead of uint8 because databases don't like unsigned bytes. 

300 df["detector"] = np.int16(detector) 

301 df["band"] = band if band else pd.NA 

302 df.columns = pd.MultiIndex.from_tuples([(dataset, c) for c in df.columns], 

303 names=("dataset", "column")) 

304 dfs.append(df) 

305 

306 # Join the DataFrames on the index (which is the object ID) 

307 outputCatalog = functools.reduce(lambda d1, d2: d1.join(d2), dfs) 

308 return pipeBase.Struct(outputCatalog=outputCatalog) 

309 

310 def _runForcedPhotometry( 

311 self, 

312 refCat: lsst.afw.table.SourceCatalog, 

313 measCat: lsst.afw.table.SourceCatalog, 

314 exposure: lsst.afw.image.Exposure, 

315 refWcs: lsst.afw.geom.SkyWcs, 

316 ) -> None: 

317 """Perform forced measurement on a single exposure. 

318 

319 Parameters 

320 ---------- 

321 refCat : `lsst.afw.table.SourceCatalog` 

322 Catalog containing the reference catalog data, with columns 

323 for the object ID, right ascension, and declination. 

324 measCat : `lsst.afw.table.SourceCatalog` 

325 Catalog containing the measurement data, with columns for the 

326 object ID and measured quantities. 

327 This catalog is updated in-place. 

328 exposure : `lsst.afw.image.exposure.Exposure` 

329 Input exposure to adjust calibrations. 

330 refWcs : `lsst.afw.geom.SkyWcs` 

331 Defines the X,Y coordinate system of ``refCat``. 

332 """ 

333 self.measurement.run( 

334 refCat=refCat, 

335 measCat=measCat, 

336 exposure=exposure, 

337 refWcs=refWcs, 

338 ) 

339 if self.config.doApCorr: 

340 apCorrMap = exposure.getInfo().getApCorrMap() 

341 if apCorrMap is None: 

342 self.log.warning("Forced exposure image does not have valid aperture correction; skipping.") 

343 else: 

344 self.applyApCorr.run( 

345 catalog=measCat, 

346 apCorrMap=apCorrMap, 

347 ) 

348 

349 def _makeMinimalSourceCatalogFromAstropy(self, table): 

350 """Create minimal schema SourceCatalog from an Astropy Table. 

351 

352 The forced measurement subtask expects this as input. 

353 

354 Parameters 

355 ---------- 

356 table : `astropy.table.Table` 

357 Table with locations and ids. 

358 

359 Returns 

360 ------- 

361 outputCatalog : `lsst.afw.table.SourceTable` 

362 Output catalog with minimal schema. 

363 """ 

364 schema = lsst.afw.table.SourceTable.makeMinimalSchema() 

365 outputCatalog = lsst.afw.table.SourceCatalog(schema) 

366 outputCatalog.resize(len(table)) 

367 outputCatalog["id"] = table[self.config.refCatIdColumn] 

368 outputCatalog[outputCatalog.getCoordKey().getRa()] = np.deg2rad(table[self.config.refCatRaColumn]) 

369 outputCatalog[outputCatalog.getCoordKey().getDec()] = np.deg2rad(table[self.config.refCatDecColumn]) 

370 return outputCatalog 

371 

372 def _filterRefTable(self, refTableHandles, exposureBBox, exposureWcs): 

373 """Prepare a merged, filtered reference catalog from ArrowAstropy 

374 inputs. 

375 

376 Parameters 

377 ---------- 

378 refTableHandles : sequence of `lsst.daf.butler.DeferredDatasetHandle` 

379 Handles for catalogs of shapes and positions at which to force 

380 photometry. 

381 exposureBBox : `lsst.geom.Box2I` 

382 Bounding box on which to select rows that overlap 

383 exposureWcs : `lsst.afw.geom.SkyWcs` 

384 World coordinate system to convert sky coords in ref cat to 

385 pixel coords with which to compare with exposureBBox 

386 

387 Returns 

388 ------- 

389 refTable : `astropy.table.Table` 

390 Astropy Table with only rows from the reference 

391 catalogs that overlap the exposure bounding box. 

392 """ 

393 table_list = [] 

394 for i in refTableHandles: 

395 try: 

396 table_list.append(i.get( 

397 parameters={ 

398 "columns": [ 

399 self.config.refCatIdColumn, 

400 self.config.refCatRaColumn, 

401 self.config.refCatDecColumn, 

402 ] 

403 } 

404 )) 

405 except ValueError: 

406 self.log.info("Skipping %s due to empty object table.", i.dataId) 

407 continue 

408 if not table_list: 

409 raise NoWorkFound("All overlapping object catalogs are empty.") 

410 full_table = astropy.table.vstack(table_list) 

411 # translate ra/dec coords in table to detector pixel coords 

412 # to down-select rows that overlap the detector bbox 

413 x, y = exposureWcs.skyToPixelArray( 

414 full_table[self.config.refCatRaColumn], 

415 full_table[self.config.refCatDecColumn], 

416 degrees=True, 

417 ) 

418 inBBox = lsst.geom.Box2D(exposureBBox).contains(x, y) 

419 return full_table[inBBox] 

420 

421 def _generateMeasCat( 

422 self, 

423 refCat: lsst.afw.table.SourceCatalog, 

424 idFactory: lsst.afw.table.IdFactory | None = None, 

425 ) -> lsst.afw.table.SourceCatalog: 

426 r"""Initialize an output catalog from the reference catalog. 

427 

428 Parameters 

429 ---------- 

430 refCat : `lsst.afw.table.SourceCatalog,` 

431 Catalog of reference sources. 

432 idFactory : `lsst.afw.table.IdFactory`, optional 

433 Factory for creating IDs for sources. 

434 

435 Returns 

436 ------- 

437 meascat : `lsst.afw.table.SourceCatalog` 

438 Source catalog ready for measurement. 

439 

440 Notes 

441 ----- 

442 This generates a new blank `~lsst.afw.table.SourceRecord` for each 

443 record in ``refCat``. Note that this method does not attach any 

444 `~lsst.afw.detection.Footprint`\ s, which is done in 

445 ``config.measure``. 

446 """ 

447 if idFactory is None: 

448 idFactory = lsst.afw.table.IdFactory.makeSimple() 

449 table = lsst.afw.table.SourceTable.make(self.measurement.schema, idFactory) 

450 measCat = lsst.afw.table.SourceCatalog(table) 

451 table = measCat.table 

452 table.setMetadata(self.measurement.algMetadata) 

453 table.preallocate(len(refCat)) 

454 for ref in refCat: 

455 newSource = measCat.addNew() 

456 newSource.assign(ref, self.measurement.mapper) 

457 return measCat