Coverage for python / lsst / ap / association / transformDiaSourceCatalog.py: 21%

162 statements  

« prev     ^ index     » next       coverage.py v7.13.5, created at 2026-04-15 00:09 +0000

1# This file is part of ap_association 

2# 

3# Developed for the LSST Data Management System. 

4# This product includes software developed by the LSST Project 

5# (https://www.lsst.org). 

6# See the COPYRIGHT file at the top-level directory of this distribution 

7# for details of code ownership. 

8# 

9# This program is free software: you can redistribute it and/or modify 

10# it under the terms of the GNU General Public License as published by 

11# the Free Software Foundation, either version 3 of the License, or 

12# (at your option) any later version. 

13# 

14# This program is distributed in the hope that it will be useful, 

15# but WITHOUT ANY WARRANTY; without even the implied warranty of 

16# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the 

17# GNU General Public License for more details. 

18# 

19# You should have received a copy of the GNU General Public License 

20# along with this program. If not, see <https://www.gnu.org/licenses/>. 

21 

# Public API of this module.
__all__ = (
    "TransformDiaSourceCatalogConnections",
    "TransformDiaSourceCatalogConfig",
    "TransformDiaSourceCatalogTask",
    "UnpackApdbFlags",
)

26 

27import os 

28import yaml 

29 

30import numpy as np 

31 

32from lsst.daf.base import DateTime 

33import lsst.pex.config as pexConfig 

34import lsst.pipe.base as pipeBase 

35import lsst.pipe.base.connectionTypes as connTypes 

36from lsst.pipe.tasks.postprocess import TransformCatalogBaseTask, TransformCatalogBaseConfig 

37from lsst.pipe.tasks.functors import Column 

38from lsst.utils.timer import timeMethod 

39 

40from lsst.pipe.tasks.schemaUtils import convertDataFrameToSdmSchema, readSdmSchemaFile 

41 

42 

class TransformDiaSourceCatalogConnections(pipeBase.PipelineTaskConnections,
                                           dimensions=("instrument", "visit", "detector"),
                                           defaultTemplates={"coaddName": "deep", "fakesType": ""}):
    """Butler connections for TransformDiaSourceCatalogTask.

    Inputs are the DiaSource catalog and the difference image it was
    measured on; the output is the transformed, standardized DiaSource
    table.
    """
    diaSourceSchema = connTypes.InitInput(
        doc="Schema for DIASource catalog output by ImageDifference.",
        storageClass="SourceCatalog",
        name="{fakesType}{coaddName}Diff_diaSrc_schema",
    )
    diaSourceCat = connTypes.Input(
        doc="Catalog of DiaSources produced during image differencing.",
        name="{fakesType}{coaddName}Diff_candidateDiaSrc",
        storageClass="SourceCatalog",
        dimensions=("instrument", "visit", "detector"),
    )
    diffIm = connTypes.Input(
        doc="Difference image on which the DiaSources were detected.",
        name="{fakesType}{coaddName}Diff_differenceExp",
        storageClass="ExposureF",
        dimensions=("instrument", "visit", "detector"),
    )
    diaSourceTable = connTypes.Output(
        # Was a placeholder doc string ("."); replaced with a real
        # description of the dataset.
        doc="Catalog of transformed and standardized DiaSources, with "
            "calibrated values and renamed columns.",
        name="{fakesType}{coaddName}Diff_diaSrcTable",
        storageClass="ArrowAstropy",
        dimensions=("instrument", "visit", "detector"),
    )

69 

70 

class TransformDiaSourceCatalogConfig(TransformCatalogBaseConfig,
                                      pipelineConnections=TransformDiaSourceCatalogConnections):
    """Configuration for TransformDiaSourceCatalogTask."""

    flagMap = pexConfig.Field(
        dtype=str,
        doc="Yaml file specifying SciencePipelines flag fields to bit packs.",
        default=os.path.join("${AP_ASSOCIATION_DIR}",
                             "data",
                             "association-flag-map.yaml"),
    )
    flagRenameMap = pexConfig.Field(
        dtype=str,
        # Fixed doubled word ("specifying specifying") in the doc string.
        doc="Yaml file specifying rules to rename flag names",
        default=os.path.join("${AP_ASSOCIATION_DIR}",
                             "data",
                             "flag-rename-rules.yaml"),
    )
    doRemoveSkySources = pexConfig.Field(
        dtype=bool,
        default=False,
        doc="Input DiaSource catalog contains SkySources that should be "
            "removed before storing the output DiaSource catalog.",
    )
    # TODO: remove on DM-41532
    doPackFlags = pexConfig.Field(
        dtype=bool,
        default=False,
        # Fixed missing space between the two concatenated doc fragments
        # ("...'flags'.If False..." -> "...'flags'. If False...").
        doc="Do pack the flags into one integer column named 'flags'. "
            "If False, instead produce one boolean column per flag.",
        deprecated="This field is no longer used. Will be removed after v28."
    )
    doUseApdbSchema = pexConfig.Field(
        dtype=bool,
        default=False,
        doc="Use the APDB schema to coerce the data types of the output columns.",
        deprecated="This field has been renamed to doUseSchema, and will be "
                   "removed after v30."
    )
    doUseSchema = pexConfig.Field(
        dtype=bool,
        default=False,
        doc="Use an existing schema to coerce the data types of the output columns."
    )
    schemaDir = pexConfig.Field(
        dtype=str,
        doc="Path to the directory containing schema definitions.",
        default=os.path.join("${SDM_SCHEMAS_DIR}",
                             "yml"),
    )
    schemaFile = pexConfig.Field(
        dtype=str,
        doc="Yaml file specifying the schema of the output catalog.",
        default="apdb.yaml",
    )
    schemaName = pexConfig.Field(
        dtype=str,
        doc="Name of the table in the schema file to read.",
        default="ApdbSchema",
        deprecated="This config is no longer used, and will be removed after v30"
    )

    def setDefaults(self):
        # Point the base-class functor file at this package's DiaSource
        # functor definitions by default.
        super().setDefaults()
        self.functorFile = os.path.join("${AP_ASSOCIATION_DIR}",
                                        "data",
                                        "DiaSource.yaml")

136 

137 

class TransformDiaSourceCatalogTask(TransformCatalogBaseTask):
    """Transform a DiaSource catalog by calibrating and renaming columns to
    produce a table ready to insert into the Apdb.

    Parameters
    ----------
    initInputs : `dict`
        Must contain ``diaSourceSchema`` as the schema for the input catalog.
    """
    ConfigClass = TransformDiaSourceCatalogConfig
    _DefaultName = "transformDiaSourceCatalog"
    # Needed to create a valid TransformCatalogBaseTask, but unused
    inputDataset = "deepDiff_diaSrc"
    outputDataset = "deepDiff_diaSrcTable"

    def __init__(self, initInputs, **kwargs):
        super().__init__(**kwargs)
        self.funcs = self.getFunctors()
        self.inputSchema = initInputs['diaSourceSchema'].schema
        self._create_bit_pack_mappings()
        # Optionally load an SDM schema; run() later uses it to coerce the
        # output column dtypes via convertDataFrameToSdmSchema.
        if self.config.doUseSchema:
            schemaFile = os.path.join(self.config.schemaDir, self.config.schemaFile)
            self.schema = readSdmSchemaFile(schemaFile)
        else:
            self.schema = None

        if not self.config.doPackFlags:
            # get the flag rename rules used by addUnpackedFlagFunctors()
            with open(os.path.expandvars(self.config.flagRenameMap)) as yaml_stream:
                self.rename_rules = list(yaml.safe_load_all(yaml_stream))

    def _create_bit_pack_mappings(self):
        """Setup all flag bit packings.

        Reads the yaml file named by ``config.flagMap`` and stores the
        column/bit definitions of its 'DiaSource' table in
        ``self.bit_pack_columns`` (empty list if no such table is found).

        Raises
        ------
        KeyError
            Raised if a flag listed in the flag map is missing from the
            input DiaSource schema.
        """
        self.bit_pack_columns = []
        flag_map_file = os.path.expandvars(self.config.flagMap)
        with open(flag_map_file) as yaml_stream:
            table_list = list(yaml.safe_load_all(yaml_stream))
            for table in table_list:
                if table['tableName'] == 'DiaSource':
                    self.bit_pack_columns = table['columns']
                    break

        # Test that all flags requested are present in the input schemas.
        # Output schemas are flexible, however if names are not specified in
        # the Apdb schema, flag columns will not be persisted.
        for outputFlag in self.bit_pack_columns:
            bitList = outputFlag['bitList']
            for bit in bitList:
                try:
                    self.inputSchema.find(bit['name'])
                except KeyError:
                    raise KeyError(
                        "Requested column %s not found in input DiaSource "
                        "schema. Please check that the requested input "
                        "column exists." % bit['name'])

    def runQuantum(self, butlerQC, inputRefs, outputRefs):
        # Butler I/O wrapper around run(): fetch inputs, add the band from
        # the quantum data ID, and persist the resulting Struct.
        inputs = butlerQC.get(inputRefs)
        inputs["band"] = butlerQC.quantum.dataId["band"]

        outputs = self.run(**inputs)

        butlerQC.put(outputs, outputRefs)

    @timeMethod
    def run(self,
            diaSourceCat,
            diffIm,
            band,
            reliability=None):
        """Convert input catalog to ParquetTable/Pandas and run functors.

        Additionally, add new columns for stripping information from the
        exposure and into the DiaSource catalog.

        Parameters
        ----------
        diaSourceCat : `lsst.afw.table.SourceCatalog`
            Catalog of sources measured on the difference image.
        diffIm : `lsst.afw.image.Exposure`
            Result of subtracting template and science images.
        band : `str`
            Filter band of the science image.
        reliability : `lsst.afw.table.SourceCatalog`
            Reliability (e.g. real/bogus) scores, row-matched to
            ``diaSourceCat``.
            NOTE(review): this argument is accepted but not used anywhere
            in this method body — confirm whether it should be applied.

        Returns
        -------
        results : `lsst.pipe.base.Struct`
            Results struct with components.

            - ``diaSourceTable`` : Catalog of DiaSources with calibrated values
              and renamed columns.
              (`lsst.pipe.tasks.ParquetTable` or `pandas.DataFrame`)
        """
        self.log.info(
            "Transforming/standardizing the DiaSource table for visit,detector: %i, %i",
            diffIm.visitInfo.id, diffIm.detector.getId())

        diaSourceDf = diaSourceCat.asAstropy().to_pandas()
        if self.config.doRemoveSkySources:
            # Filter both the DataFrame and the afw catalog so the derived
            # quantities below (snr, bboxSize) stay row-matched.
            diaSourceDf = diaSourceDf[~diaSourceDf["sky_source"]]
            diaSourceCat = diaSourceCat[~diaSourceCat["sky_source"]]

        # Record the processing time as a TAI-scale MJD float (not a
        # datetime object).
        diaSourceDf["timeProcessedMjdTai"] = DateTime.now().get(system=DateTime.MJD, scale=DateTime.TAI)
        diaSourceDf["snr"] = getSignificance(diaSourceCat)
        diaSourceDf["bboxSize"] = self.computeBBoxSizes(diaSourceCat)
        diaSourceDf["visit"] = diffIm.visitInfo.id
        # int16 instead of uint8 because databases don't like unsigned bytes.
        diaSourceDf["detector"] = np.int16(diffIm.detector.getId())
        diaSourceDf["band"] = band
        diaSourceDf["midpointMjdTai"] = diffIm.visitInfo.date.get(system=DateTime.MJD)
        # Association IDs are filled in downstream; initialize to 0 here.
        diaSourceDf["diaObjectId"] = 0
        diaSourceDf["ssObjectId"] = 0

        # TODO: this has been formally deprecated and should be removed too
        if self.config.doPackFlags:
            # either bitpack the flags
            self.bitPackFlags(diaSourceDf)
        else:
            # or add the individual flag functors
            self.addUnpackedFlagFunctors()
            # and remove the packed flag functor
            if 'flags' in self.funcs.funcDict:
                del self.funcs.funcDict['flags']

        df = self.transform(band,
                            diaSourceDf,
                            self.funcs,
                            dataId=None).df
        # Coerce output dtypes to the schema loaded in __init__, if enabled.
        if self.config.doUseSchema:
            df = convertDataFrameToSdmSchema(self.schema, df, tableName="DiaSource")

        return pipeBase.Struct(
            diaSourceTable=df,
        )

    def addUnpackedFlagFunctors(self):
        """Add Column functor for each of the flags to the internal functor
        dictionary.

        Flag names are taken from the first table in ``bit_pack_columns``
        and renamed per the rules loaded from ``config.flagRenameMap``.
        """
        for flag in self.bit_pack_columns[0]['bitList']:
            flagName = flag['name']
            targetName = self.funcs.renameCol(flagName, self.rename_rules[0]['flag_rename_rules'])
            self.funcs.update({targetName: Column(flagName)})

    def computeBBoxSizes(self, inputCatalog):
        """Compute the size of a square bbox that fully contains the detection
        footprint.

        Parameters
        ----------
        inputCatalog : `lsst.afw.table.SourceCatalog`
            Catalog containing detected footprints.

        Returns
        -------
        outputBBoxSizes : `np.ndarray`, (N,)
            Array of bbox sizes.
        """
        # Schema validation requires that this field is int.
        outputBBoxSizes = np.empty(len(inputCatalog), dtype=int)
        for i, record in enumerate(inputCatalog):
            footprintBBox = record.getFootprint().getBBox()
            # Compute twice the size of the largest dimension of the footprint
            # bounding box. This is the largest footprint we should need to cover
            # the complete DiaSource assuming the centroid is within the bounding
            # box.
            maxSize = 2 * np.max([footprintBBox.getWidth(),
                                  footprintBBox.getHeight()])
            recX = record.getCentroid().x
            recY = record.getCentroid().y
            # Twice the largest centroid-to-edge distance, capped at maxSize.
            bboxSize = int(
                np.ceil(2 * np.max(np.fabs([footprintBBox.maxX - recX,
                                            footprintBBox.minX - recX,
                                            footprintBBox.maxY - recY,
                                            footprintBBox.minY - recY]))))
            if bboxSize > maxSize:
                bboxSize = maxSize
            outputBBoxSizes[i] = bboxSize

        return outputBBoxSizes

    def bitPackFlags(self, df):
        """Pack requested flag columns in inputRecord into single columns in
        outputRecord.

        Parameters
        ----------
        df : `pandas.DataFrame`
            DataFrame to read bits from and pack them into. Modified in
            place: one new uint64 column is added per entry in
            ``bit_pack_columns``.
        """
        for outputFlag in self.bit_pack_columns:
            bitList = outputFlag['bitList']
            value = np.zeros(len(df), dtype=np.uint64)
            for bit in bitList:
                # Hard type the bit arrays.
                value += (df[bit['name']]*2**bit['bit']).to_numpy().astype(np.uint64)
            df[outputFlag['columnName']] = value

341 

342 

class UnpackApdbFlags:
    """Class for unpacking bits from integer flag fields stored in the Apdb.

    Attributes
    ----------
    flag_map_file : `str`
        Absolute or relative path to a yaml file specifying mappings of flags
        to integer bits.
    table_name : `str`
        Name of the Apdb table the integer bit data are coming from.
    """

    def __init__(self, flag_map_file, table_name):
        self.bit_pack_columns = []
        flag_map_file = os.path.expandvars(flag_map_file)
        with open(flag_map_file) as yaml_stream:
            table_list = list(yaml.safe_load_all(yaml_stream))
            for table in table_list:
                if table['tableName'] == table_name:
                    self.bit_pack_columns = table['columns']
                    break

        # Map each packed column name to a {flag name: bit position} dict
        # for fast lookups in unpack()/flagExists()/makeFlagBitMask().
        self.output_flag_columns = {}

        for column in self.bit_pack_columns:
            names = {}
            for bit in column["bitList"]:
                names[bit["name"]] = bit["bit"]
            self.output_flag_columns[column["columnName"]] = names

    def unpack(self, input_flag_values, flag_name):
        """Determine individual boolean flags from an input array of unsigned
        ints.

        Parameters
        ----------
        input_flag_values : array-like of type uint
            Array of integer packed bit flags to unpack.
        flag_name : `str`
            Apdb column name from the loaded file, e.g. "flags".

        Returns
        -------
        output_flags : `numpy.ndarray`
            Numpy structured array of booleans, one column per flag in the
            loaded file.
        """
        output_flags = np.zeros(len(input_flag_values),
                                dtype=[(name, bool) for name in self.output_flag_columns[flag_name]])

        for name in self.output_flag_columns[flag_name]:
            # Nonzero masked bits coerce to True on assignment to the bool
            # field.
            masked_bits = np.bitwise_and(input_flag_values,
                                         2**self.output_flag_columns[flag_name][name])
            output_flags[name] = masked_bits

        return output_flags

    def flagExists(self, flagName, columnName='flags'):
        """Check if named flag is in the bitpacked flag set.

        Parameters
        ----------
        flagName : `str`
            Flag name to search for.
        columnName : `str`, optional
            Name of bitpacked flag column to search in.

        Returns
        -------
        flagExists : `bool`
            `True` if `flagName` is present in `columnName`.

        Raises
        ------
        ValueError
            Raised if `columnName` is not defined.
        """
        if columnName not in self.output_flag_columns:
            raise ValueError(f'column {columnName} not in flag map: {self.output_flag_columns}')

        # Direct dict membership test; the original built a throwaway list.
        return flagName in self.output_flag_columns[columnName]

    def makeFlagBitMask(self, flagNames, columnName='flags'):
        """Return a bitmask corresponding to the supplied flag names.

        Parameters
        ----------
        flagNames : `list` [`str`]
            Flag names to include in the bitmask.
        columnName : `str`, optional
            Name of bitpacked flag column.

        Returns
        -------
        bitmask : `np.uint64`
            Bitmask corresponding to the supplied flag names given the loaded
            configuration.

        Raises
        ------
        ValueError
            Raised if a flag in `flagNames` is not included in `columnName`.
        """
        bitmask = np.uint64(0)

        for flag in flagNames:
            if not self.flagExists(flag, columnName=columnName):
                raise ValueError(f"flag '{flag}' not included in '{columnName}' flag column")
            # Look the bit position up directly in the prebuilt mapping
            # instead of re-scanning bit_pack_columns; OR is duplicate-safe.
            bitmask |= np.uint64(2**self.output_flag_columns[columnName][flag])

        return bitmask

459 

460 

def getSignificance(catalog):
    """Return the significance value of the first peak in each source
    footprint, or NaN for peaks without a significance field or for
    footprints containing no peaks at all.

    Parameters
    ----------
    catalog : `lsst.afw.table.SourceCatalog`
        Catalog to process.

    Returns
    -------
    significance : `np.ndarray`, (N,)
        Significance of the first peak in each source footprint.
    """
    result = np.full(len(catalog), np.nan)
    for i, record in enumerate(catalog):
        peaks = record.getFootprint().peaks
        # Guard against a footprint with zero peaks: the original indexed
        # peaks[0] unconditionally and would raise IndexError; leave NaN.
        if len(peaks) > 0 and "significance" in peaks.schema:
            result[i] = peaks[0]["significance"]
    return result