Coverage for python / lsst / ap / association / diaPipe.py: 19%

369 statements  

« prev     ^ index     » next       coverage.py v7.13.5, created at 2026-04-22 08:54 +0000

1# 

2# LSST Data Management System 

3# Copyright 2008-2016 AURA/LSST. 

4# 

5# This product includes software developed by the 

6# LSST Project (http://www.lsst.org/). 

7# 

8# This program is free software: you can redistribute it and/or modify 

9# it under the terms of the GNU General Public License as published by 

10# the Free Software Foundation, either version 3 of the License, or 

11# (at your option) any later version. 

12# 

13# This program is distributed in the hope that it will be useful, 

14# but WITHOUT ANY WARRANTY; without even the implied warranty of 

15# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the 

16# GNU General Public License for more details. 

17# 

18# You should have received a copy of the LSST License Statement and 

19# the GNU General Public License along with this program. If not, 

20# see <https://www.lsstcorp.org/LegalNotices/>. 

21# 

22 

23"""PipelineTask for associating DiaSources with previous DiaObjects. 

24 

25Additionally performs forced photometry on the calibrated and difference 

26images at the updated locations of DiaObjects. 

27""" 

28 

# Public API of this module.
__all__ = ("DiaPipelineConfig",
           "DiaPipelineTask",
           "DiaPipelineConnections")

32 

33 

34import lsst.dax.apdb as daxApdb 

35import lsst.pex.config as pexConfig 

36import lsst.pipe.base as pipeBase 

37import lsst.pipe.base.connectionTypes as connTypes 

38import lsst.sphgeom 

39 

40from astropy.table import Table 

41import numpy as np 

42import pandas as pd 

43from lsst.ap.association import ( 

44 AssociationTask, 

45 DiaForcedSourceTask, 

46 PackageAlertsTask) 

47 

48from lsst.ap.association.utils import makeEmptyForcedSourceTable, getRegion, paddedRegion, readSchemaFromApdb 

49from lsst.daf.base import DateTime 

50from lsst.meas.base import DetectorVisitIdGeneratorConfig, \ 

51 DiaObjectCalculationTask 

52from lsst.pipe.tasks.schemaUtils import convertDataFrameToSdmSchema, checkSdmSchemaColumns, \ 

53 dropEmptyColumns, make_empty_catalog 

54from lsst.pipe.tasks.ssoAssociation import SolarSystemAssociationTask 

55from lsst.utils.timer import timeMethod, duration_from_timeMethod 

56 

57 

class TooManyDiaObjectsError(pipeBase.AlgorithmError):
    """Raised if there are an unusually large number of unassociated DiaSources.

    This is usually indicative of an image subtraction error, and needs to be
    caught before updating the APDB with a large number of spurious entries.
    """

    def __init__(self, *, nNewDiaObjects, threshold):
        super().__init__(
            "Aborting processing before writing to the APDB."
            f" {nNewDiaObjects} new DiaObjects would be created, which exceeds the"
            f" configured maximum of {threshold}"
        )
        # Keep the counts around so they can be reported via ``metadata``.
        self.nNewDiaObjects = nNewDiaObjects
        self.threshold = threshold

    @property
    def metadata(self):
        # Structured payload describing why the quantum was aborted.
        return {
            "nNewDiaObjects": self.nNewDiaObjects,
            "threshold": self.threshold,
        }

76 

77 

class DiaPipelineConnections(
        pipeBase.PipelineTaskConnections,
        dimensions=("instrument", "visit", "detector"),
        defaultTemplates={"coaddName": "deep", "fakesType": ""}):
    """Butler connections for DiaPipelineTask.

    Inputs are the per-visit difference-imaging products plus catalogs
    preloaded from the APDB; outputs are the association products and a
    marker dataset signaling APDB completion.
    """
    diaSourceTable = connTypes.Input(
        doc="Catalog of calibrated DiaSources.",
        name="{fakesType}{coaddName}Diff_diaSrcTable",
        storageClass="DataFrame",
        dimensions=("instrument", "visit", "detector"),
    )
    solarSystemObjectTable = connTypes.Input(
        # Fixed typo: "SolarSolarSystem" -> "SolarSystem".
        doc="Catalog of SolarSystem objects expected to be observable in "
            "this detectorVisit.",
        name="preloaded_SsObjects",
        storageClass="ArrowAstropy",
        dimensions=("instrument", "group", "detector"),
        # minimum=0: this input is optional at graph-generation time.
        minimum=0,
    )
    diffIm = connTypes.Input(
        doc="Difference image on which the DiaSources were detected.",
        name="{fakesType}{coaddName}Diff_differenceExp",
        storageClass="ExposureF",
        dimensions=("instrument", "visit", "detector"),
    )
    exposure = connTypes.Input(
        doc="Calibrated exposure differenced with a template image during "
            "image differencing.",
        name="{fakesType}calexp",
        storageClass="ExposureF",
        dimensions=("instrument", "visit", "detector"),
    )
    template = connTypes.Input(
        doc="Warped template used to create `subtractedExposure`. Not PSF "
            "matched.",
        dimensions=("instrument", "visit", "detector"),
        storageClass="ExposureF",
        name="{fakesType}{coaddName}Diff_templateExp",
    )
    preloadedDiaObjects = connTypes.Input(
        doc="DiaObjects preloaded from the APDB.",
        name="preloaded_diaObjects",
        storageClass="DataFrame",
        dimensions=("instrument", "group", "detector"),
    )
    preloadedDiaSources = connTypes.Input(
        doc="DiaSources preloaded from the APDB.",
        name="preloaded_diaSources",
        storageClass="DataFrame",
        dimensions=("instrument", "group", "detector"),
    )
    preloadedDiaForcedSources = connTypes.Input(
        doc="DiaForcedSources preloaded from the APDB.",
        name="preloaded_diaForcedSources",
        storageClass="DataFrame",
        dimensions=("instrument", "group", "detector"),
    )
    apdbMarker = connTypes.Output(
        doc="Marker dataset storing the configuration of the Apdb for each "
            "visit/detector. Used to signal the completion of the pipeline.",
        name="apdb_marker",
        storageClass="Config",
        dimensions=("instrument", "visit", "detector"),
    )
    associatedDiaSources = connTypes.Output(
        doc="Optional output storing the DiaSource catalog after matching, "
            "calibration, and standardization for insertion into the Apdb.",
        name="{fakesType}{coaddName}Diff_assocDiaSrc",
        storageClass="ArrowAstropy",
        dimensions=("instrument", "visit", "detector"),
    )
    associatedSsSources = connTypes.Output(
        doc="Optional output storing ssSource data computed during association.",
        name="{fakesType}{coaddName}Diff_associatedSsSources",
        storageClass="ArrowAstropy",
        dimensions=("instrument", "visit", "detector"),
    )
    unassociatedSsObjects = connTypes.Output(
        doc="Expected locations of an ssObject with no source",
        name="ssUnassociatedObjects",
        storageClass="ArrowAstropy",
        dimensions=("instrument", "visit", "detector"),
    )

    diaForcedSources = connTypes.Output(
        doc="Optional output storing the forced sources computed at the diaObject positions.",
        name="{fakesType}{coaddName}Diff_diaForcedSrc",
        storageClass="ArrowAstropy",
        dimensions=("instrument", "visit", "detector"),
    )
    diaObjects = connTypes.Output(
        doc="Optional output storing the updated diaObjects associated to these sources.",
        name="{fakesType}{coaddName}Diff_diaObject",
        storageClass="ArrowAstropy",
        dimensions=("instrument", "visit", "detector"),
    )
    newDiaSources = connTypes.Output(
        doc="New diaSources not associated with an existing diaObject that"
            " were used to create a new diaObject",
        name="{fakesType}new_dia_source",
        storageClass="ArrowAstropy",
        dimensions=("instrument", "visit", "detector"),
    )
    marginalDiaSources = connTypes.Output(
        doc="Low SNR diaSources not associated with an existing diaObject that"
            " were rejected instead of creating a new diaObject",
        name="{fakesType}marginal_new_dia_source",
        storageClass="ArrowAstropy",
        dimensions=("instrument", "visit", "detector"),
    )

    def __init__(self, *, config=None):
        super().__init__(config=config)

        # Prune optional connections according to the task configuration so
        # the quantum graph only carries datasets that will actually exist.
        if not config.doWriteAssociatedSources:
            self.outputs.remove("associatedDiaSources")
            self.outputs.remove("diaForcedSources")
            self.outputs.remove("diaObjects")
            self.outputs.remove("newDiaSources")
            self.outputs.remove("marginalDiaSources")
        else:
            if not config.doRunForcedMeasurement:
                self.outputs.remove("diaForcedSources")
            if not config.filterUnAssociatedSources:
                self.outputs.remove("newDiaSources")
                self.outputs.remove("marginalDiaSources")
        if not config.doSolarSystemAssociation:
            self.inputs.remove("solarSystemObjectTable")
        # ssSource outputs require both source writing and SSO association.
        if (not config.doWriteAssociatedSources) or (not config.doSolarSystemAssociation):
            self.outputs.remove("associatedSsSources")
            self.outputs.remove("unassociatedSsObjects")

    def adjustQuantum(self, inputs, outputs, label, dataId):
        """Override to make adjustments to `lsst.daf.butler.DatasetRef` objects
        in the `lsst.daf.butler.core.Quantum` during the graph generation stage
        of the activator.

        This implementation checks to make sure that the filters in the dataset
        are compatible with AP processing as set by the Apdb/DPDD schema.

        Parameters
        ----------
        inputs : `dict`
            Dictionary whose keys are an input (regular or prerequisite)
            connection name and whose values are a tuple of the connection
            instance and a collection of associated `DatasetRef` objects.
            The exact type of the nested collections is unspecified; it can be
            assumed to be multi-pass iterable and support `len` and ``in``, but
            it should not be mutated in place. In contrast, the outer
            dictionaries are guaranteed to be temporary copies that are true
            `dict` instances, and hence may be modified and even returned; this
            is especially useful for delegating to `super` (see notes below).
        outputs : `dict`
            Dict of output datasets, with the same structure as ``inputs``.
        label : `str`
            Label for this task in the pipeline (should be used in all
            diagnostic messages).
        dataId : `lsst.daf.butler.DataCoordinate`
            Data ID for this quantum in the pipeline (should be used in all
            diagnostic messages).

        Returns
        -------
        adjusted_inputs : `dict`
            Dict of the same form as ``inputs`` with updated containers of
            input `DatasetRef` objects. Connections that are not changed
            should not be returned at all. Datasets may only be removed, not
            added. Nested collections may be of any multi-pass iterable type,
            and the order of iteration will set the order of iteration within
            `PipelineTask.runQuantum`.
        adjusted_outputs : `dict`
            Dict of updated output datasets, with the same structure and
            interpretation as ``adjusted_inputs``.

        Raises
        ------
        ValueError
            Raised if any ``diffIm`` input has a band that is not listed in
            ``DiaPipelineConfig.validBands``.
        ScalarError
            Raised if any `Input` or `PrerequisiteInput` connection has
            ``multiple`` set to `False`, but multiple datasets.
        NoWorkFound
            Raised to indicate that this quantum should not be run; not enough
            datasets were found for a regular `Input` connection, and the
            quantum should be pruned or skipped.
        FileNotFoundError
            Raised to cause QuantumGraph generation to fail (with the message
            included in this exception); not enough datasets were found for a
            `PrerequisiteInput` connection.
        """
        # Reject bands the APDB schema has no columns for, before any
        # processing is attempted.
        _, refs = inputs["diffIm"]
        for ref in refs:
            if ref.dataId["band"] not in self.config.validBands:
                raise ValueError(
                    f"Requested '{ref.dataId['band']}' not in "
                    "DiaPipelineConfig.validBands. To process bands not in "
                    "the standard Rubin set (ugrizy) you must add the band to "
                    "the validBands list in DiaPipelineConfig and add the "
                    "appropriate columns to the Apdb schema.")
        return super().adjustQuantum(inputs, outputs, label, dataId)

277 

278 

class DiaPipelineConfig(pipeBase.PipelineTaskConfig,
                        pipelineConnections=DiaPipelineConnections):
    """Config for DiaPipelineTask.
    """
    coaddName = pexConfig.Field(
        doc="coadd name: typically one of deep, goodSeeing, or dcr",
        dtype=str,
        default="deep",
    )
    apdb_config_url = pexConfig.Field(
        dtype=str,
        default=None,
        # optional=False: this field must be supplied; there is no usable default.
        optional=False,
        doc="A config file specifying the APDB and its connection parameters, "
            "typically written by the apdb-cli command-line utility. "
            "The database must already be initialized.",
    )
    validBands = pexConfig.ListField(
        dtype=str,
        default=["u", "g", "r", "i", "z", "y"],
        doc="List of bands that are valid for AP processing. To process a "
            "band not on this list, the appropriate band specific columns "
            "must be added to the Apdb schema in dax_apdb.",
    )
    associator = pexConfig.ConfigurableField(
        target=AssociationTask,
        doc="Task used to associate DiaSources with DiaObjects.",
    )
    doSolarSystemAssociation = pexConfig.Field(
        dtype=bool,
        default=True,
        doc="Process SolarSystem objects through the pipeline.",
    )
    solarSystemAssociator = pexConfig.ConfigurableField(
        target=SolarSystemAssociationTask,
        doc="Task used to associate DiaSources with SolarSystemObjects.",
    )
    diaCalculation = pexConfig.ConfigurableField(
        target=DiaObjectCalculationTask,
        doc="Task to compute summary statistics for DiaObjects.",
    )
    doReloadDiaObjects = pexConfig.Field(
        dtype=bool,
        default=True,
        # Fixed missing space after "APDB?" in the concatenated doc string.
        doc="Drop preloaded DiaObjects and reload them from the APDB? "
            "Used in production when the very latest objects from the APDB "
            "are needed.",
    )
    angleMargin = pexConfig.RangeField(
        # NOTE(review): units are not stated here — presumably the same
        # angular units paddedRegion expects; confirm before changing.
        doc="Padding to add when loading diaObjects if `doReloadDiaObjects=True`",
        dtype=float,
        default=2,
        min=0,
    )
    doRunForcedMeasurement = pexConfig.Field(
        dtype=bool,
        default=True,
        deprecated="Added to allow disabling forced sources for performance "
                   "reasons during the ops rehearsal. "
                   "It is expected to be removed.",
        doc="Run forced measurement on all of the diaObjects? "
            "This should only be turned off for debugging purposes.",
    )
    diaForcedSource = pexConfig.ConfigurableField(
        target=DiaForcedSourceTask,
        doc="Task used for force photometer DiaObject locations in direct and "
            "difference images.",
    )
    alertPackager = pexConfig.ConfigurableField(
        target=PackageAlertsTask,
        doc="Subtask for packaging Ap data into alerts.",
    )
    doPackageAlerts = pexConfig.Field(
        dtype=bool,
        default=False,
        doc="Package Dia-data into serialized alerts for distribution and "
            "write them to disk.",
    )
    doWriteAssociatedSources = pexConfig.Field(
        dtype=bool,
        default=True,
        doc="Write out associated DiaSources, DiaForcedSources, and DiaObjects, "
            "formatted following the Science Data Model.",
    )
    imagePixelMargin = pexConfig.RangeField(
        dtype=int,
        default=10,
        min=0,
        doc="Pad the image by this many pixels before removing off-image "
            "diaObjects for association.",
    )
    filterUnAssociatedSources = pexConfig.Field(
        dtype=bool,
        default=True,
        # Fixed misplaced period: "creating new .diaObjects" -> "creating new diaObjects."
        doc="Check unassociated diaSources for quality before creating new "
            "diaObjects. DiaSources that are associated with existing diaObjects "
            "will not be affected."
    )
    newObjectBadFlags = pexConfig.ListField(
        dtype=str,
        default=("centroid_flag",
                 "pixelFlags_crCenter",
                 "pixelFlags_nodataCenter",
                 "pixelFlags_interpolatedCenter",
                 "pixelFlags_saturatedCenter",
                 "pixelFlags_suspectCenter",
                 "pixelFlags_streakCenter",
                 "glint_trail"),
        doc="If `filterUnAssociatedSources` is set, exclude diaSources with "
            "these flags set before creating new diaObjects.",
    )
    maxNewDiaObjects = pexConfig.RangeField(
        # NOTE(review): dtype=float for what is logically a count; kept as-is
        # for backward compatibility with persisted configs.
        dtype=float,
        default=0,
        min=0,
        # Fixed missing space before "Set to zero to disable."
        doc="Maximum number of new DiaObjects to create before raising an error. "
            "Set to zero to disable.",
    )
    newObjectSnrThreshold = pexConfig.RangeField(
        dtype=float,
        default=0,
        min=0,
        doc="If `filterUnAssociatedSources` is set, exclude diaSources with "
            "Abs(flux/fluxErr) less than this threshold before creating new "
            "diaObjects. "
            "Set to zero to disable.",
    )
    newObjectLowReliabilitySnrThreshold = pexConfig.RangeField(
        dtype=float,
        default=0,
        min=0,
        doc="If `filterUnAssociatedSources` is set, exclude diaSources with "
            "signal-to-noise ratios less than this threshold if they have"
            " low reliability scores before creating new diaObjects. "
            "Set to zero to disable.",
    )
    newObjectReliabilityThreshold = pexConfig.RangeField(
        dtype=float,
        default=0.1,
        min=0,
        max=1,
        doc="If `filterUnAssociatedSources` is set, exclude diaSources with "
            "reliability scores less than this threshold before creating new "
            "diaObjects. "
            "Set to zero to disable.",
    )
    newObjectLowSnrReliabilityThreshold = pexConfig.RangeField(
        dtype=float,
        default=0.1,
        min=0,
        max=1,
        doc="If `filterUnAssociatedSources` is set, exclude diaSources with "
            "low signal-to-noise and reliability scores below this threshold "
            "before creating new diaObjects. "
            "Set to zero to disable.",
    )
    newObjectFluxField = pexConfig.Field(
        dtype=str,
        default="apFlux",
        doc="Name of the flux field in the standardized diaSource catalog to "
            "use for checking the signal-to-noise before creating new diaObjects.",
    )
    idGenerator = DetectorVisitIdGeneratorConfig.make_field()

    def setDefaults(self):
        # Restrict DiaObject summary statistics to the plugins AP needs.
        self.diaCalculation.plugins = ["ap_meanPosition",
                                       "ap_nDiaSources",
                                       "ap_meanFlux",
                                       "ap_sigmaFlux",
                                       "ap_minMaxFlux",
                                       "ap_maxSlopeFlux",
                                       "ap_meanErrFlux",
                                       "ap_meanTotFlux"]

452 

453 

454class DiaPipelineTask(pipeBase.PipelineTask): 

455 """Task for loading, associating and storing Difference Image Analysis 

456 (DIA) Objects and Sources. 

457 """ 

458 ConfigClass = DiaPipelineConfig 

459 _DefaultName = "diaPipe" 

460 

461 def __init__(self, initInputs=None, **kwargs): 

462 super().__init__(**kwargs) 

463 self.apdb = daxApdb.Apdb.from_uri(self.config.apdb_config_url) 

464 self.schema = readSchemaFromApdb(self.apdb) 

465 self.makeSubtask("associator") 

466 self.makeSubtask("diaCalculation") 

467 if self.config.doRunForcedMeasurement: 

468 self.makeSubtask("diaForcedSource") 

469 if self.config.doPackageAlerts: 

470 self.makeSubtask("alertPackager") 

471 if self.config.doSolarSystemAssociation: 

472 self.makeSubtask("solarSystemAssociator") 

473 if self.config.filterUnAssociatedSources: 

474 columns = [self.config.newObjectFluxField, 

475 self.config.newObjectFluxField + "Err", 

476 "reliability"] 

477 columns += self.config.newObjectBadFlags 

478 

479 missing = checkSdmSchemaColumns(self.schema, columns, "DiaSource") 

480 if missing: 

481 raise pipeBase.InvalidQuantumError("Field %s not in the DiaSource schema" % missing) 

482 

483 def runQuantum(self, butlerQC, inputRefs, outputRefs): 

484 inputs = butlerQC.get(inputRefs) 

485 inputs["idGenerator"] = self.config.idGenerator.apply(butlerQC.quantum.dataId) 

486 inputs["band"] = butlerQC.quantum.dataId["band"] 

487 inputs["legacySolarSystemTable"] = None 

488 if not self.config.doSolarSystemAssociation: 

489 inputs["solarSystemObjectTable"] = None 

490 

491 outputs = self.run(**inputs) 

492 

493 butlerQC.put(outputs, outputRefs) 

494 

495 @timeMethod 

496 def run(self, 

497 diaSourceTable, 

498 legacySolarSystemTable, 

499 diffIm, 

500 exposure, 

501 template, 

502 preloadedDiaObjects, 

503 preloadedDiaSources, 

504 preloadedDiaForcedSources, 

505 band, 

506 idGenerator, 

507 solarSystemObjectTable=None): 

508 """Process DiaSources and DiaObjects. 

509 

510 Load previous DiaObjects and their DiaSource history. Calibrate the 

511 values in the diaSourceCat. Associate new DiaSources with previous 

512 DiaObjects. Run forced photometry at the updated DiaObject locations. 

513 Store the results in the Alert Production Database (Apdb). 

514 

515 Parameters 

516 ---------- 

517 diaSourceTable : `pandas.DataFrame` 

518 Newly detected DiaSources. 

519 legacySolarSystemTable : `pandas.DataFrame` 

520 Not used 

521 diffIm : `lsst.afw.image.ExposureF` 

522 Difference image exposure in which the sources in ``diaSourceCat`` 

523 were detected. 

524 exposure : `lsst.afw.image.ExposureF` 

525 Calibrated exposure differenced with a template to create 

526 ``diffIm``. 

527 template : `lsst.afw.image.ExposureF` 

528 Template exposure used to create diffIm. 

529 preloadedDiaObjects : `pandas.DataFrame` 

530 Previously detected DiaObjects, loaded from the APDB. 

531 preloadedDiaSources : `pandas.DataFrame` 

532 Previously detected DiaSources, loaded from the APDB. 

533 preloadedDiaForcedSources : `pandas.DataFrame` 

534 Catalog of previously detected forced DiaSources, from the APDB 

535 band : `str` 

536 The band in which the new DiaSources were detected. 

537 idGenerator : `lsst.meas.base.IdGenerator` 

538 Object that generates source IDs and random number generator seeds. 

539 solarSystemObjectTable : `astropy.table.Table` 

540 Preloaded Solar System objects expected to be visible in the image. 

541 

542 Returns 

543 ------- 

544 results : `lsst.pipe.base.Struct` 

545 Results struct with components. 

546 

547 - ``apdbMarker`` : Marker dataset to store in the Butler indicating 

548 that this ccdVisit has completed successfully. 

549 (`lsst.dax.apdb.ApdbConfig`) 

550 - ``associatedDiaSources`` : Catalog of newly associated 

551 DiaSources. (`pandas.DataFrame`) 

552 - ``diaForcedSources`` : Catalog of new and previously detected 

553 forced DiaSources. (`pandas.DataFrame`) 

554 - ``diaObjects`` : Updated table of DiaObjects. (`pandas.DataFrame`) 

555 - ``associatedSsSources`` : Catalog of ssSource records. 

556 (`pandas.DataFrame`) 

557 

558 Raises 

559 ------ 

560 RuntimeError 

561 Raised if duplicate DiaObjects or duplicate DiaSources are found. 

562 """ 

563 # Accept either legacySolarSystemTable or optional solarSystemObjectTable. 

564 if legacySolarSystemTable is not None and solarSystemObjectTable is None: 

565 solarSystemObjectTable = Table.from_pandas(legacySolarSystemTable) 

566 if self.config.doReloadDiaObjects: 

567 try: 

568 preloadedDiaObjects = self.loadRefreshedDiaObjects(getRegion(exposure), preloadedDiaObjects) 

569 except Exception as e: 

570 self.log.warning("Error encountered while attempting to load " 

571 "the latest diaObjects from the APDB. Processing " 

572 "will continue with only the diaObjects from " 

573 "preload. Error:", e) 

574 finally: 

575 self.metadata["loadDiaObjectsDuration"] = duration_from_timeMethod( 

576 self.metadata, "loadRefreshedDiaObjects", clock="Utc" 

577 ) 

578 self.log.verbose("Re-loading DiaObjects: Took %.4f seconds", 

579 self.metadata["loadDiaObjectsDuration"]) 

580 

581 else: 

582 self.metadata["loadDiaObjectsDuration"] = -1 

583 

584 self.checkTableIndex(preloadedDiaSources, index=["diaObjectId", "band", "diaSourceId"]) 

585 self.checkTableIndex(preloadedDiaObjects, index="diaObjectId") 

586 self.checkTableIndex(preloadedDiaForcedSources, index=["diaObjectId", "diaForcedSourceId"]) 

587 

588 if not preloadedDiaObjects.empty: 

589 # Include a small buffer outside the image so that we can associate sources near the edge 

590 diaObjects, _ = self.purgeDiaObjects(diffIm.getBBox(), diffIm.getWcs(), preloadedDiaObjects, 

591 buffer=self.config.imagePixelMargin) 

592 else: 

593 self.log.info("Preloaded DiaObject table is empty.") 

594 diaObjects = preloadedDiaObjects 

595 

596 # Associate DiaSources with DiaObjects 

597 assocResults = self.associateDiaSources(diaSourceTable, solarSystemObjectTable, diffIm, diaObjects) 

598 

599 # Set unassociated diaObjectIds and ssObjectIds to NULL, and convert to SDM schema format 

600 standardizedAssociatedDiaSources = self.standardizeDataFrame( 

601 assocResults.associatedDiaSources, 

602 "DiaSource", 

603 nullColumns=["diaObjectId", "ssObjectId"] 

604 ) 

605 

606 # Merge associated diaSources 

607 mergedDiaSourceHistory, mergedDiaObjects, updatedDiaObjectIds = self.mergeAssociatedCatalogs( 

608 preloadedDiaSources, 

609 assocResults.associatedDiaSources, 

610 diaObjects, 

611 assocResults.newDiaObjects, 

612 diffIm 

613 ) 

614 

615 # Compute DiaObject Summary statistics from their full DiaSource 

616 # history. 

617 diaCalResult = self.diaCalculation.run( 

618 mergedDiaObjects, 

619 mergedDiaSourceHistory, 

620 updatedDiaObjectIds, 

621 [band]) 

622 updatedDiaObjects = convertDataFrameToSdmSchema(self.schema, diaCalResult.updatedDiaObjects, 

623 tableName="DiaObject", skipIndex=True) 

624 

625 # Test for duplication in the updated DiaObjects. 

626 if self.testDataFrameIndex(diaCalResult.diaObjectCat): 

627 raise RuntimeError( 

628 "Duplicate DiaObjects (loaded + updated) created after " 

629 "DiaCalculation. This is unexpected behavior and should be " 

630 "reported. Exiting.") 

631 if self.testDataFrameIndex(updatedDiaObjects): 

632 raise RuntimeError( 

633 "Duplicate DiaObjects (updated) created after " 

634 "DiaCalculation. This is unexpected behavior and should be " 

635 "reported. Exiting.") 

636 

637 # Forced source measurement 

638 if self.config.doRunForcedMeasurement: 

639 diaForcedSources = self.runForcedMeasurement( 

640 diaCalResult.diaObjectCat, updatedDiaObjects, exposure, diffIm, idGenerator 

641 ) 

642 forcedSourceHistoryThreshold = self.diaForcedSource.config.historyThreshold 

643 else: 

644 # alertPackager needs correct columns 

645 diaForcedSources = makeEmptyForcedSourceTable(self.schema) 

646 forcedSourceHistoryThreshold = 0 

647 

648 # Write results to Alert Production Database (APDB) 

649 try: 

650 validityStart = self.writeToApdb(updatedDiaObjects, standardizedAssociatedDiaSources, 

651 diaForcedSources) 

652 finally: 

653 self.metadata["writeToApdbDuration"] = duration_from_timeMethod(self.metadata, "writeToApdb", 

654 clock="Utc") 

655 # A single log message is easier for Loki to parse than timeMethod's start+end pairs. 

656 self.log.verbose("writeToApdb: Took %.4f seconds", self.metadata["writeToApdbDuration"]) 

657 

658 associatedSsSources = assocResults.associatedSsSources 

659 associatedSsSourcesPlusMpcorb = None 

660 if associatedSsSources is not None: 

661 mpcorbColumns = [col for col in associatedSsSources.columns if col[:7] == 'MPCORB_'] 

662 associatedSsSourceMpcorb = associatedSsSources[mpcorbColumns].copy() 

663 associatedSsSources = self.standardizeTable(associatedSsSources, "SSSource", nullColumns=[]) 

664 associatedSsSourcesPlusMpcorb = associatedSsSources.copy() 

665 for mpcorbColumn in mpcorbColumns: 

666 associatedSsSourcesPlusMpcorb[mpcorbColumn] = associatedSsSourceMpcorb[mpcorbColumn] 

667 

668 # patch the otherwise-empty validityStart field for the alerts 

669 updatedDiaObjects['validityStartMjdTai'] = validityStart.get(system=DateTime.MJD, scale=DateTime.TAI) 

670 

671 # Package alerts 

672 if self.config.doPackageAlerts: 

673 # Append new forced sources to the full history 

674 diaForcedSourcesFull = self.mergeCatalogs(preloadedDiaForcedSources, diaForcedSources, 

675 tableName="DiaForcedSource") 

676 if self.testDataFrameIndex(diaForcedSources): 

677 self.log.warning( 

678 "Duplicate DiaForcedSources created after merge with " 

679 "history and new sources. This may cause downstream " 

680 "problems. Dropping duplicates.") 

681 # Drop duplicates via index and keep the first appearance. 

682 # Reset due to the index shape being slight different than 

683 # expected. 

684 diaForcedSourcesFull = diaForcedSourcesFull.groupby( 

685 diaForcedSourcesFull.index).first() 

686 diaForcedSourcesFull.reset_index(drop=True, inplace=True) 

687 diaForcedSourcesFull.set_index( 

688 ["diaObjectId", "diaForcedSourceId"], 

689 drop=False, 

690 inplace=True) 

691 

692 self.alertPackager.run(assocResults.associatedDiaSources, 

693 updatedDiaObjects, 

694 preloadedDiaSources, 

695 diaForcedSourcesFull, 

696 diffIm, 

697 exposure, 

698 template, 

699 ssSrc=associatedSsSourcesPlusMpcorb, 

700 doRunForcedMeasurement=self.config.doRunForcedMeasurement, 

701 forcedSourceHistoryThreshold=forcedSourceHistoryThreshold, 

702 ) 

703 

704 # For historical reasons, apdbMarker is a Config even if it's not meant to be read. 

705 # A default Config is the cheapest way to satisfy the storage class. 

706 marker = pexConfig.Config() 

707 return pipeBase.Struct(apdbMarker=marker, 

708 associatedDiaSources=assocResults.associatedDiaSources, 

709 diaForcedSources=diaForcedSources, 

710 diaObjects=diaCalResult.diaObjectCat, 

711 associatedSsSources=associatedSsSources, 

712 unassociatedSsObjects=assocResults.unassociatedSsObjects, 

713 newDiaSources=assocResults.newDiaSources, 

714 marginalDiaSources=assocResults.marginalDiaSources 

715 ) 

716 

717 def createNewDiaObjects(self, unassociatedDiaSources): 

718 """Loop through the set of DiaSources and create new DiaObjects 

719 for unassociated DiaSources. 

720 

721 Parameters 

722 ---------- 

723 unassociatedDiaSources : `pandas.DataFrame` 

724 Set of DiaSources to create new DiaObjects from. 

725 

726 Returns 

727 ------- 

728 results : `lsst.pipe.base.Struct` 

729 Results struct containing: 

730 

731 - diaSources : `pandas.DataFrame` 

732 DiaSource catalog with updated DiaObject ids. 

733 - newDiaObjects : `pandas.DataFrame` 

734 Newly created DiaObjects from the unassociated DiaSources. 

735 - nNewDiaObjects : `int` 

736 Number of newly created diaObjects. 

737 - marginalDiaSources : `pandas.DataFrame` 

738 Unassociated diaSources with low signal-to-noise and/or 

739 reliability, which are excluded from the new DiaObjects. 

740 """ 

741 marginalDiaSources = make_empty_catalog(self.schema, tableName="DiaSource") 

742 if len(unassociatedDiaSources) == 0: 

743 newDiaObjects = make_empty_catalog(self.schema, tableName="DiaObject") 

744 else: 

745 if self.config.filterUnAssociatedSources: 

746 results = self.filterSources(unassociatedDiaSources) 

747 unassociatedDiaSources = results.goodSources 

748 marginalDiaSources = results.badSources 

749 unassociatedDiaSources["diaObjectId"] = unassociatedDiaSources["diaSourceId"] 

750 newDiaObjects = convertDataFrameToSdmSchema(self.schema, unassociatedDiaSources, 

751 tableName="DiaObject", skipIndex=True) 

752 self.metadata["nRejectedNewDiaObjects"] = len(marginalDiaSources) 

753 return pipeBase.Struct(diaSources=unassociatedDiaSources, 

754 newDiaObjects=newDiaObjects, 

755 nNewDiaObjects=len(newDiaObjects), 

756 marginalDiaSources=marginalDiaSources) 

757 

758 def filterSources(self, sources, snrThreshold=None, lowReliabilitySnrThreshold=None, 

759 reliabilityThreshold=None, lowSnrReliabilityThreshold=None, badFlags=None): 

760 """Select good sources out of a catalog. 

761 

762 Parameters 

763 ---------- 

764 sources : `pandas.DataFrame` 

765 Set of DiaSources to check. 

766 snrThreshold : `float`, optional 

767 The minimum signal to noise diaSource to make a new diaObject. 

768 Included for unit tests. Uses the task config value if not set. 

769 lowReliabilitySnrThreshold : `float`, optional 

770 Use ``lowSnrReliabilityThreshold`` as the reliability threshold for 

771 diaSources with ``snrThreshold`` < SNR < ``lowReliabilitySnrThreshold`` 

772 Included for unit tests. Uses the task config value if not set. 

773 reliabilityThreshold : `float`, optional 

774 The minimum reliability score diaSource to make a new diaObject 

775 Included for unit tests. Uses the task config value if not set. 

776 lowSnrReliabilityThreshold : `float`, optional 

777 Use ``lowSnrReliabilityThreshold`` as the reliability threshold for 

778 diaSources with ``snrThreshold`` < SNR < ``lowReliabilitySnrThreshold`` 

779 Included for unit tests. Uses the task config value if not set. 

780 badFlags : `list` of `str`, optional 

781 Do not create new diaObjects for any diaSource with any of these 

782 flags set. 

783 Included for unit tests. Uses the task config value if not set. 

784 

785 Returns 

786 ------- 

787 results : `lsst.pipe.base.Struct` 

788 Results struct containing: 

789 

790 - goodSources : `pandas.DataFrame` 

791 Subset of the input sources that pass all checks. 

792 - badSources : `pandas.DataFrame` 

793 Subset of the input sources that fail any check. 

794 """ 

795 if snrThreshold is None: 

796 snrThreshold = self.config.newObjectSnrThreshold 

797 if lowReliabilitySnrThreshold is None: 

798 lowReliabilitySnrThreshold = self.config.newObjectLowReliabilitySnrThreshold 

799 if reliabilityThreshold is None: 

800 reliabilityThreshold = self.config.newObjectReliabilityThreshold 

801 if lowSnrReliabilityThreshold is None: 

802 lowSnrReliabilityThreshold = self.config.newObjectLowSnrReliabilityThreshold 

803 if badFlags is None: 

804 badFlags = self.config.newObjectBadFlags 

805 flagged = sources[badFlags].fillna(False).any(axis=1) 

806 fluxField = self.config.newObjectFluxField 

807 fluxErrField = fluxField + "Err" 

808 signalToNoise = np.abs(np.array(sources[fluxField]/sources[fluxErrField])) 

809 reliability = np.array(sources['reliability']) 

810 nFlagged = np.count_nonzero(flagged) 

811 if nFlagged > 0: 

812 self.log.info("Not creating new diaObjects for %i unassociated diaSources due to flags", nFlagged) 

813 

814 if snrThreshold > 0: 

815 snr_flag = signalToNoise < snrThreshold 

816 self.log.info("Not creating new diaObjects for %i unassociated diaSources due to %sFlux" 

817 " signal to noise < %f", 

818 np.sum(snr_flag), fluxField, snrThreshold) 

819 flagged += snr_flag 

820 if reliabilityThreshold > 0: 

821 reliability_flag = reliability < reliabilityThreshold 

822 self.log.info("Not creating new diaObjects for %i unassociated diaSources due to " 

823 "reliability<%f", 

824 np.sum(reliability_flag), reliabilityThreshold) 

825 flagged += reliability_flag 

826 if min(lowReliabilitySnrThreshold, lowSnrReliabilityThreshold) > 0: 

827 # Only run the combined test if both thresholds are greater than zero 

828 lowSnrReliability_flag = ((signalToNoise < lowReliabilitySnrThreshold) 

829 & (reliability < lowSnrReliabilityThreshold)) 

830 self.log.info("Not creating new diaObjects for %i unassociated diaSources due to %sFlux" 

831 " signal to noise < %f combined with reliability< %f", 

832 np.sum(lowSnrReliability_flag), 

833 fluxField, 

834 lowReliabilitySnrThreshold, 

835 lowSnrReliabilityThreshold) 

836 flagged += lowSnrReliability_flag 

837 

838 if np.count_nonzero(~flagged) > 0: 

839 goodSources = sources[~flagged].copy(deep=True) 

840 else: 

841 goodSources = make_empty_catalog(self.schema, tableName="DiaSource") 

842 if np.count_nonzero(flagged) > 0: 

843 badSources = sources[flagged].copy(deep=True) 

844 else: 

845 badSources = make_empty_catalog(self.schema, tableName="DiaSource") 

846 return pipeBase.Struct(goodSources=goodSources, 

847 badSources=badSources 

848 ) 

849 

    @timeMethod
    def associateDiaSources(self, diaSourceTable, solarSystemObjectTable, diffIm, diaObjects):
        """Associate DiaSources with DiaObjects.

        Associate new DiaSources with existing DiaObjects. Create new
        DiaObjects from unassociated DiaSources. Index DiaSource catalogue
        after associations. Append new DiaObjects and DiaSources to their
        previous history. Test for DiaSource and DiaObject duplications.
        Compute DiaObject Summary statistics from their full DiaSource
        history. Test for duplication in the updated DiaObjects.

        Parameters
        ----------
        diaSourceTable : `pandas.DataFrame`
            Newly detected DiaSources.
        solarSystemObjectTable : `astropy.table.Table`
            Preloaded Solar System objects expected to be visible in the image.
        diffIm : `lsst.afw.image.ExposureF`
            Difference image exposure in which the sources in ``diaSourceCat``
            were detected.
        diaObjects : `pandas.DataFrame`
            Table of DiaObjects from preloaded DiaObjects.

        Returns
        -------
        results : `lsst.pipe.base.Struct`
            Results struct containing:

            - associatedDiaSources : `pandas.DataFrame`
                Associated DiaSources with DiaObjects.
            - newDiaObjects : `pandas.DataFrame`
                Table of new DiaObjects after association.
            - associatedSsSources : `astropy.table.Table`
                Table of new ssSources after association.
            - unassociatedSsObjects : `astropy.table.Table`
                Table of expected ssSources that were not associated with a
                diaSource.
            - newDiaSources : `pandas.DataFrame`
                Subset of `associatedDiaSources` consisting of only the
                unassociated diaSources that were added as new diaObjects.
            - marginalDiaSources : `pandas.DataFrame`
                Unassociated diaSources with marginal detections, which were
                removed from `associatedDiaSources` and were not added as new
                diaObjects.

        Raises
        ------
        TooManyDiaObjectsError
            Raised if the number of new DiaObjects exceeds the configured
            ``maxNewDiaObjects`` limit (only enforced when that limit is
            positive).
        """
        # Associate new DiaSources with existing DiaObjects.
        assocResults = self.associator.run(diaSourceTable, diaObjects)
        # TODO: work out how to mark asteroid-diaObj overlaps as contaminated
        toAssociate = []
        if self.config.doSolarSystemAssociation and solarSystemObjectTable is not None:
            # Attempt solar-system association only on the diaSources that
            # did not match an existing diaObject.
            ssoAssocResult = self.solarSystemAssociator.run(
                Table.from_pandas(assocResults.unAssocDiaSources),
                solarSystemObjectTable,
                diffIm.visitInfo,
                diffIm.getBBox(),
                diffIm.wcs
            )
            # Create new DiaObjects from unassociated diaSources.
            if len(ssoAssocResult.unAssocDiaSources) > 0:
                # If the table is empty then converting time fields to pandas
                # will raise an error. Pass in an empty Dataframe in that case.
                createResults = self.createNewDiaObjects(ssoAssocResult.unAssocDiaSources.to_pandas())
            else:
                createResults = self.createNewDiaObjects(pd.DataFrame())
            if len(ssoAssocResult.ssoAssocDiaSources) > 0:
                toAssociate.append(ssoAssocResult.ssoAssocDiaSources.to_pandas())
            nTotalSsObjects = ssoAssocResult.nTotalSsObjects
            nAssociatedSsObjects = ssoAssocResult.nAssociatedSsObjects
            associatedSsSources = ssoAssocResult.associatedSsSources
            unassociatedSsObjects = ssoAssocResult.unassociatedSsObjects

        else:
            # Create new DiaObjects from unassociated diaSources.
            createResults = self.createNewDiaObjects(assocResults.unAssocDiaSources)
            # No solar-system association ran, so the SSO outputs are empty.
            nTotalSsObjects = 0
            nAssociatedSsObjects = 0
            associatedSsSources = None
            unassociatedSsObjects = None
        # Gather the surviving diaSources: matches to existing objects plus
        # the sources that seeded new objects (marginal sources are dropped).
        if not assocResults.matchedDiaSources.empty:
            toAssociate.append(assocResults.matchedDiaSources)
        if not createResults.diaSources.empty:
            toAssociate.append(createResults.diaSources)
        if len(toAssociate) == 0:
            associatedDiaSources = make_empty_catalog(self.schema, tableName="DiaSource")
        else:
            associatedDiaSources = pd.concat(toAssociate)

        self._add_association_meta_data(assocResults.nUpdatedDiaObjects,
                                        assocResults.nUnassociatedDiaObjects,
                                        createResults.nNewDiaObjects,
                                        nTotalSsObjects,
                                        nAssociatedSsObjects)
        self.log.info("%i updated and %i unassociated diaObjects. Creating %i new diaObjects"
                      " and dropping %i marginal diaSources.",
                      assocResults.nUpdatedDiaObjects,
                      assocResults.nUnassociatedDiaObjects,
                      createResults.nNewDiaObjects,
                      len(createResults.marginalDiaSources),
                      )
        # Chained comparison: the cap is only enforced when maxNewDiaObjects
        # is positive (<= 0 disables the limit).
        if createResults.nNewDiaObjects > self.config.maxNewDiaObjects > 0:
            raise TooManyDiaObjectsError(nNewDiaObjects=createResults.nNewDiaObjects,
                                         threshold=self.config.maxNewDiaObjects)
        return pipeBase.Struct(associatedDiaSources=associatedDiaSources,
                               newDiaObjects=createResults.newDiaObjects,
                               associatedSsSources=associatedSsSources,
                               unassociatedSsObjects=unassociatedSsObjects,
                               newDiaSources=createResults.diaSources,
                               marginalDiaSources=createResults.marginalDiaSources
                               )

959 

960 def standardizeDataFrame(self, dataFrame, tableName, nullColumns=None): 

961 """Convert a catalog to SDM schema format with NULL ID values 

962 

963 Parameters 

964 ---------- 

965 dataFrame : `pandas.DataFrame` 

966 The catalog to standardize/convert 

967 tableName : `str` 

968 Schema name of table to which this dataFrame should be standardized 

969 nullColumns : `list` of `str`, optional 

970 List of column names to check for default values of 0 that should be 

971 replaced with `NULL`. 

972 

973 

974 Returns 

975 ------- 

976 standardizedAssociatedDiaSources : pandas.DataFrame 

977 The standardized DiaSource catalog 

978 """ 

979 standardizedDataFrame = convertDataFrameToSdmSchema(self.schema, 

980 dataFrame, 

981 tableName=tableName, 

982 skipIndex=True) 

983 

984 def _setNullColumn(dataframe, colName): 

985 """Set specified columns with default values of 0 to NULL.""" 

986 dataframe.loc[dataframe[colName] == 0, colName] = pd.NA 

987 

988 if nullColumns is not None: 

989 for colName in nullColumns: 

990 _setNullColumn(standardizedDataFrame, colName) 

991 return standardizedDataFrame 

992 

993 def standardizeTable(self, table, tableName, nullColumns=None): 

994 """Convert a catalog to SDM schema format with NULL ID values 

995 

996 Parameters 

997 ---------- 

998 table : `astropy.table.Table` 

999 The catalog to standardize/convert 

1000 tableName : `str` 

1001 Schema name of table to which this dataFrame should be standardized 

1002 nullColumns : `list` of `str`, optional 

1003 List of column names to check for default values of 0 that should be 

1004 replaced with `NULL`. 

1005 

1006 

1007 Returns 

1008 ------- 

1009 standardizedAssociatedDiaSources : pandas.DataFrame 

1010 The standardized DiaSource catalog 

1011 """ 

1012 dataFrame = table.to_pandas() 

1013 standardizedDataFrame = self.standardizeDataFrame(dataFrame, tableName, nullColumns=nullColumns) 

1014 return standardizedDataFrame 

1015 

    @timeMethod
    def mergeAssociatedCatalogs(self, preloadedDiaSources, associatedDiaSources, diaObjects, newDiaObjects,
                                diffIm):
        """Merge the associated diaSource and diaObjects to their previous history.

        Parameters
        ----------
        preloadedDiaSources : `pandas.DataFrame`
            Previously detected DiaSources, loaded from the APDB.
        associatedDiaSources : `pandas.DataFrame`
            Associated DiaSources with DiaObjects. Re-indexed in place by
            (``diaObjectId``, ``band``, ``diaSourceId``).
        diaObjects : `pandas.DataFrame`
            Table of DiaObjects from preloaded DiaObjects.
        newDiaObjects : `pandas.DataFrame`
            Table of new DiaObjects after association.
        diffIm : `lsst.afw.image.ExposureF`
            Difference image; its bounding box and WCS define the region
            used to purge off-image diaObjects.

        Raises
        ------
        RuntimeError
            Raised if duplicate DiaObjects or duplicate DiaSources are found.

        Returns
        -------
        mergedDiaSourceHistory : `pandas.DataFrame`
            The combined catalog, with all of the rows from preloadedDiaSources
            catalog ordered before the rows of associatedDiaSources catalog.
        mergedDiaObjects : `pandas.DataFrame`
            Table of new DiaObjects merged with their history.
        updatedDiaObjectIds : `numpy.Array`
            Object Id's from associated diaSources.
        """
        # Index the DiaSource catalog for this visit after all associations
        # have been made.
        # A diaObjectId of 0 marks a source with no associated object, so
        # exclude those ids from the "updated" set.
        updatedDiaObjectIds = associatedDiaSources["diaObjectId"][
            associatedDiaSources["diaObjectId"] != 0].to_numpy()
        associatedDiaSources.set_index(["diaObjectId",
                                        "band",
                                        "diaSourceId"],
                                       drop=False,
                                       inplace=True)

        # Append new DiaObjects and DiaSources to their previous history.
        if diaObjects.empty:
            mergedDiaObjects = newDiaObjects.set_index("diaObjectId", drop=False)
        elif not newDiaObjects.empty:
            mergedDiaObjects = pd.concat(
                [diaObjects,
                 newDiaObjects.set_index("diaObjectId", drop=False)],
                sort=True)
        else:
            mergedDiaObjects = diaObjects

        # Exclude any objects that are off the image after association.
        # NOTE(review): buffer=-1 shrinks the bbox by one pixel; presumably
        # to drop objects sitting exactly on the image edge — confirm.
        mergedDiaObjects, updatedDiaObjectIds = self.purgeDiaObjects(diffIm.getBBox(), diffIm.getWcs(),
                                                                     mergedDiaObjects,
                                                                     diaObjectIds=updatedDiaObjectIds,
                                                                     buffer=-1)
        if self.testDataFrameIndex(mergedDiaObjects):
            raise RuntimeError(
                "Duplicate DiaObjects created after association. This is "
                "likely due to re-running data with an already populated "
                "Apdb. If this was not the case then there was an unexpected "
                "failure in Association while matching and creating new "
                "DiaObjects and should be reported. Exiting.")

        mergedDiaSourceHistory = self.mergeCatalogs(preloadedDiaSources, associatedDiaSources,
                                                    tableName="DiaSource")

        # Test for DiaSource duplication first. If duplicates are found,
        # this likely means this is duplicate data being processed and sent
        # to the Apdb.
        if self.testDataFrameIndex(mergedDiaSourceHistory):
            raise RuntimeError(
                "Duplicate DiaSources found after association and merging "
                "with history. This is likely due to re-running data with an "
                "already populated Apdb. If this was not the case then there "
                "was an unexpected failure in Association while matching "
                "sources to objects, and should be reported. Exiting.")
        # Finally, update the diaObject table with the number of associated diaSources
        mergedUpdatedDiaObjects = self.updateObjectTable(mergedDiaObjects, mergedDiaSourceHistory)
        return (mergedDiaSourceHistory, mergedUpdatedDiaObjects, updatedDiaObjectIds)

1097 

1098 @timeMethod 

1099 def runForcedMeasurement(self, diaObjects, updatedDiaObjects, exposure, diffIm, idGenerator): 

1100 """Forced Source Measurement 

1101 

1102 Forced photometry on the difference and calibrated exposures using the 

1103 new and updated DiaObject locations. 

1104 

1105 Parameters 

1106 ---------- 

1107 diaObjects : `pandas.DataFrame` 

1108 Catalog of DiaObjects. 

1109 updatedDiaObjects : `pandas.DataFrame` 

1110 Catalog of updated DiaObjects. 

1111 exposure : `lsst.afw.image.ExposureF` 

1112 Calibrated exposure differenced with a template to create 

1113 ``diffIm``. 

1114 diffIm : `lsst.afw.image.ExposureF` 

1115 Difference image exposure in which the sources in ``diaSourceCat`` 

1116 were detected. 

1117 idGenerator : `lsst.meas.base.IdGenerator` 

1118 Object that generates source IDs and random number generator seeds. 

1119 

1120 Returns 

1121 ------- 

1122 diaForcedSources : `pandas.DataFrame` 

1123 Catalog of calibrated forced photometered fluxes on both the 

1124 difference and direct images at DiaObject locations. 

1125 """ 

1126 # Force photometer on the Difference and Calibrated exposures using 

1127 # the new and updated DiaObject locations. 

1128 diaForcedSources = self.diaForcedSource.run( 

1129 diaObjects, 

1130 updatedDiaObjects.loc[:, "diaObjectId"].to_numpy(), 

1131 exposure, 

1132 diffIm, 

1133 idGenerator=idGenerator) 

1134 self.log.info(f"Updating {len(diaForcedSources)} diaForcedSources in the APDB") 

1135 diaForcedSources = convertDataFrameToSdmSchema(self.schema, diaForcedSources, 

1136 tableName="DiaForcedSource", skipIndex=True) 

1137 return diaForcedSources 

1138 

    @timeMethod
    def loadRefreshedDiaObjects(self, region, preloadedDiaObjects):
        """Load DiaObjects from the Apdb based on their HTM location.

        Parameters
        ----------
        region : `sphgeom.Region`
            Region containing the current exposure to load diaObjects from the
            APDB.
        preloadedDiaObjects : `pandas.DataFrame`
            Previously detected DiaObjects, loaded from the APDB.

        Returns
        -------
        diaObjects : `pandas.DataFrame`
            DiaObjects loaded from the Apdb that are within ``region``
            (padded by the configured ``angleMargin``), combined with any
            preloaded diaObjects not present in the fresh query.
        """
        # The /3600 converts the configured margin from arcseconds to degrees.
        angleMargin = lsst.sphgeom.Angle.fromDegrees(self.config.angleMargin/3600.)
        diaObjects = self.apdb.getDiaObjects(paddedRegion(region, angleMargin))

        diaObjects.set_index("diaObjectId", drop=False, inplace=True)
        if diaObjects.index.has_duplicates:
            self.log.warning(
                "Duplicate DiaObjects loaded from the Apdb. This may cause "
                "downstream pipeline issues. Dropping duplicated rows")
            # Drop duplicates via index and keep the first appearance.
            diaObjects = diaObjects.groupby(diaObjects.index).first()
        self.log.info("Loaded %d DiaObjects", len(diaObjects))
        refreshedDiaObjects = convertDataFrameToSdmSchema(self.schema, diaObjects, tableName="DiaObject",
                                                          skipIndex=True)

        # Compare the two catalogs by diaObjectId index membership.
        refreshedIsInPreloaded = refreshedDiaObjects.index.isin(preloadedDiaObjects.index)
        preloadedIsInRefreshed = preloadedDiaObjects.index.isin(refreshedDiaObjects.index)
        nUniqueRefreshed = (~refreshedIsInPreloaded).sum()
        nUniquePreloaded = (~preloadedIsInRefreshed).sum()
        if nUniqueRefreshed > 0:
            self.log.info("Reloading the diaObject table during association yielded "
                          "an additional %d objects over the %d preloaded diaObjects.",
                          nUniqueRefreshed, len(preloadedDiaObjects))
        if nUniquePreloaded == 0:
            # The refreshed catalog is a superset of the preloaded one.
            return refreshedDiaObjects
        elif nUniqueRefreshed == 0:
            # The preloaded catalog is a superset of the refreshed one.
            return preloadedDiaObjects
        else:
            # We can get "preloaded" diaObjects that don't appear in the APDB with
            # the CI datasets in ap_verify, since those are stored as a dataset and
            # an empty APDB is created. In that case, combine the two catalogs.
            # Precedence is given to the refreshed diaObject catalog.
            return pd.concat([refreshedDiaObjects, preloadedDiaObjects.loc[~preloadedIsInRefreshed]])

1189 

1190 @timeMethod 

1191 def writeToApdb(self, updatedDiaObjects, associatedDiaSources, diaForcedSources): 

1192 """Write to the Alert Production Database (Apdb). 

1193 

1194 Store DiaSources, updated DiaObjects, and DiaForcedSources in the 

1195 Alert Production Database (Apdb). 

1196 

1197 Parameters 

1198 ---------- 

1199 updatedDiaObjects : `pandas.DataFrame` 

1200 Catalog of updated DiaObjects. 

1201 associatedDiaSources : `pandas.DataFrame` 

1202 Associated DiaSources with DiaObjects. 

1203 diaForcedSources : `pandas.DataFrame` 

1204 Catalog of calibrated forced photometered fluxes on both the 

1205 difference and direct images at DiaObject locations. 

1206 

1207 Returns 

1208 ------- 

1209 validityStart : `lsst.daf.base.DateTime` 

1210 Time at which the APDB was updated. 

1211 """ 

1212 # Store DiaSources, updated DiaObjects, and DiaForcedSources in the 

1213 # Apdb. 

1214 # Drop empty columns that are nullable in the APDB. 

1215 diaObjectStore = dropEmptyColumns(self.schema, updatedDiaObjects, tableName="DiaObject") 

1216 diaSourceStore = dropEmptyColumns(self.schema, associatedDiaSources, tableName="DiaSource") 

1217 diaForcedSourceStore = dropEmptyColumns(self.schema, diaForcedSources, tableName="DiaForcedSource") 

1218 

1219 validityStart = DateTime.now() 

1220 self.apdb.store( 

1221 validityStart.toAstropy(), 

1222 diaObjectStore, 

1223 diaSourceStore, 

1224 diaForcedSourceStore) 

1225 self.log.info("APDB updated.") 

1226 

1227 return validityStart 

1228 

1229 def testDataFrameIndex(self, df): 

1230 """Test the sorted DataFrame index for duplicates. 

1231 

1232 Wrapped as a separate function to allow for mocking of the this task 

1233 in unittesting. Default of a mock return for this test is True. 

1234 

1235 Parameters 

1236 ---------- 

1237 df : `pandas.DataFrame` 

1238 DataFrame to text. 

1239 

1240 Returns 

1241 ------- 

1242 `bool` 

1243 True if DataFrame contains duplicate rows. 

1244 """ 

1245 return df.index.has_duplicates 

1246 

1247 def _add_association_meta_data(self, 

1248 nUpdatedDiaObjects, 

1249 nUnassociatedDiaObjects, 

1250 nNewDiaObjects, 

1251 nTotalSsObjects, 

1252 nAssociatedSsObjects): 

1253 """Store summaries of the association step in the task metadata. 

1254 

1255 Parameters 

1256 ---------- 

1257 nUpdatedDiaObjects : `int` 

1258 Number of previous DiaObjects associated and updated in this 

1259 ccdVisit. 

1260 nUnassociatedDiaObjects : `int` 

1261 Number of previous DiaObjects that were not associated or updated 

1262 in this ccdVisit. 

1263 nNewDiaObjects : `int` 

1264 Number of newly created DiaObjects for this ccdVisit. 

1265 nTotalSsObjects : `int` 

1266 Number of SolarSystemObjects within the observable detector 

1267 area. 

1268 nAssociatedSsObjects : `int` 

1269 Number of successfully associated SolarSystemObjects. 

1270 """ 

1271 self.metadata['numUpdatedDiaObjects'] = nUpdatedDiaObjects 

1272 self.metadata['numUnassociatedDiaObjects'] = nUnassociatedDiaObjects 

1273 self.metadata['numNewDiaObjects'] = nNewDiaObjects 

1274 self.metadata['numTotalSolarSystemObjects'] = nTotalSsObjects 

1275 self.metadata['numAssociatedSsObjects'] = nAssociatedSsObjects 

1276 

    def purgeDiaObjects(self, bbox, wcs, diaObjCat, diaObjectIds=None, buffer=0):
        """Drop diaObjects that are outside the exposure bounding box.

        Parameters
        ----------
        bbox : `lsst.geom.Box2I`
            Bounding box of the exposure. Grown in place by ``buffer``.
        wcs : `lsst.afw.geom.SkyWcs`
            Coordinate system definition (wcs) for the exposure.
        diaObjCat : `pandas.DataFrame`
            DiaObjects loaded from the Apdb.
        diaObjectIds : `numpy.ndarray`, optional
            DiaObject ids associated in this visit; ids belonging to purged
            diaObjects are removed from this array. If not supplied, only the
            catalog is filtered.
        buffer : `int`, optional
            Width, in pixels, to pad the exposure bounding box. May be
            negative to shrink the box instead.

        Returns
        -------
        diaObjCat : `pandas.DataFrame`
            DiaObjects loaded from the Apdb, restricted to the exposure
            bounding box.
        diaObjectIds : `numpy.ndarray` or `None`
            The input ``diaObjectIds`` with ids of purged diaObjects removed;
            returned unchanged if no purging occurred or it was not supplied.
        """
        try:
            # NOTE(review): grow() mutates the caller's bbox in place —
            # presumably callers always pass a fresh copy (e.g. from
            # getBBox()); confirm.
            bbox.grow(buffer)
            raVals = diaObjCat.ra.to_numpy()
            decVals = diaObjCat.dec.to_numpy()
            xVals, yVals = wcs.skyToPixelArray(raVals, decVals, degrees=True)
            selector = bbox.contains(xVals, yVals)
            nPurged = np.sum(~selector)
            if nPurged > 0:
                if diaObjectIds is not None:
                    # We also need to drop any of the associated IDs if this runs after association
                    purgedIds = diaObjCat[~selector].diaObjectId
                    diaObjectIds = diaObjectIds[~np.isin(diaObjectIds, purgedIds)]
                    self.log.info("Dropped %i diaObjects that were outside the bbox "
                                  "after association, leaving %i in the catalog",
                                  nPurged, len(diaObjCat) - nPurged)
                else:
                    self.log.info("Dropped %i diaObjects that were outside the padded bbox "
                                  "before association, leaving %i in the catalog",
                                  nPurged, len(diaObjCat) - nPurged)
                diaObjCat = diaObjCat[selector].copy()
        except Exception as e:
            # Deliberately broad: on any failure the catalog is returned
            # unfiltered, with only a warning logged.
            self.log.warning("Error attempting to check diaObject history: %s", e)
        return diaObjCat, diaObjectIds

1320 

1321 def mergeCatalogs(self, originalCatalog, newCatalog, tableName): 

1322 """Combine two catalogs, ensuring that the new catalog conforms to the schema. 

1323 

1324 Parameters 

1325 ---------- 

1326 originalCatalog : `pandas.DataFrame` 

1327 The original catalog to be added to. 

1328 newCatalog : `pandas.DataFrame` 

1329 The new catalog to append to `originalCatalog` 

1330 tableName : `str` 

1331 Name of the table in the schema to use. 

1332 

1333 Returns 

1334 ------- 

1335 mergedCatalog : `pandas.DataFrame` 

1336 The combined catalog, with all of the rows from ``originalCatalog`` 

1337 ordered before the rows of ``newCatalog`` 

1338 """ 

1339 if len(newCatalog) > 0: 

1340 catalog = convertDataFrameToSdmSchema(self.schema, newCatalog, 

1341 tableName=tableName, skipIndex=True) 

1342 

1343 mergedCatalog = pd.concat([originalCatalog, catalog], sort=True) 

1344 else: 

1345 mergedCatalog = pd.concat([originalCatalog], sort=True) 

1346 return mergedCatalog.loc[:, originalCatalog.columns] 

1347 

1348 def updateObjectTable(self, diaObjects, diaSources): 

1349 """Update the diaObject table with the new diaSource records. 

1350 

1351 Parameters 

1352 ---------- 

1353 diaObjects : `pandas.DataFrame` 

1354 Table of new DiaObjects merged with their history. 

1355 diaSources : `pandas.DataFrame` 

1356 The combined preloaded and associated diaSource catalog. 

1357 

1358 Returns 

1359 ------- 

1360 updatedDiaObjects : `pandas.DataFrame` 

1361 Table of DiaObjects updated with the number of associated DiaSources 

1362 """ 

1363 nDiaSources = diaSources[["diaSourceId"]].groupby("diaObjectId").agg(len) 

1364 nDiaSources.rename({"diaSourceId": "nDiaSources"}, errors="raise", axis="columns", inplace=True) 

1365 del diaObjects["nDiaSources"] 

1366 updatedDiaObjects = diaObjects.join(nDiaSources, how="left") 

1367 return updatedDiaObjects 

1368 

1369 @staticmethod 

1370 def checkTableIndex(dataFrame, index): 

1371 if dataFrame.index.name is None: 

1372 # The expected index may or may not be set, depending on whether 

1373 # the table was written originally as a DataFrame or something else 

1374 # Parquet-friendly. 

1375 dataFrame.set_index(index, drop=False, inplace=True)