Coverage for python / lsst / ap / association / diaPipe.py: 17%

458 statements  

« prev     ^ index     » next       coverage.py v7.13.5, created at 2026-05-07 08:39 +0000

1# 

2# LSST Data Management System 

3# Copyright 2008-2016 AURA/LSST. 

4# 

5# This product includes software developed by the 

6# LSST Project (http://www.lsst.org/). 

7# 

8# This program is free software: you can redistribute it and/or modify 

9# it under the terms of the GNU General Public License as published by 

10# the Free Software Foundation, either version 3 of the License, or 

11# (at your option) any later version. 

12# 

13# This program is distributed in the hope that it will be useful, 

14# but WITHOUT ANY WARRANTY; without even the implied warranty of 

15# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the 

16# GNU General Public License for more details. 

17# 

18# You should have received a copy of the LSST License Statement and 

19# the GNU General Public License along with this program. If not, 

20# see <https://www.lsstcorp.org/LegalNotices/>. 

21# 

22 

23"""PipelineTask for associating DiaSources with previous DiaObjects. 

24 

25Additionally performs forced photometry on the calibrated and difference 

26images at the updated locations of DiaObjects. 

27""" 

28 

# Public API of this module.
__all__ = ("DiaPipelineConfig",
           "DiaPipelineTask",
           "DiaPipelineConnections")

32 

33 

34import lsst.dax.apdb as daxApdb 

35import lsst.pex.config as pexConfig 

36import lsst.pipe.base as pipeBase 

37import lsst.pipe.base.connectionTypes as connTypes 

38import lsst.sphgeom 

39 

40from astropy.table import Table 

41import numpy as np 

42import pandas as pd 

43from lsst.ap.association import ( 

44 AssociationTask, 

45 DiaForcedSourceTask, 

46 PackageAlertsTask) 

47 

48from lsst.ap.association.utils import makeEmptyForcedSourceTable, getRegion, paddedRegion, readSchemaFromApdb 

49from lsst.daf.base import DateTime 

50from lsst.meas.base import DetectorVisitIdGeneratorConfig, \ 

51 DiaObjectCalculationTask 

52from lsst.pipe.tasks.schemaUtils import convertDataFrameToSdmSchema, checkSdmSchemaColumns, \ 

53 dropEmptyColumns, make_empty_catalog 

54from lsst.pipe.tasks.ssoAssociation import SolarSystemAssociationTask 

55from lsst.utils.timer import timeMethod, duration_from_timeMethod 

56 

57 

class TooManyDiaObjectsError(pipeBase.AlgorithmError):
    """Raised if there are an unusually large number of unassociated DiaSources.
    This is usually indicative of an image subtraction error, and needs to be
    caught before updating the APDB with a large number of spurious entries.

    Parameters
    ----------
    nNewDiaObjects : `int`
        Number of new DiaObjects that would have been created.
    threshold : `int`
        Configured maximum number of new DiaObjects allowed.
    """

    def __init__(self, *, nNewDiaObjects, threshold):
        # Keep the counts on the instance so they can be reported via
        # the ``metadata`` property.
        self.nNewDiaObjects = nNewDiaObjects
        self.threshold = threshold
        super().__init__(
            f"Aborting processing before writing to the APDB. {nNewDiaObjects}"
            " new DiaObjects would be created, which exceeds the configured"
            f" maximum of {threshold}"
        )

    @property
    def metadata(self):
        """Counts describing the failure, for task metadata (`dict`)."""
        return dict(nNewDiaObjects=self.nNewDiaObjects,
                    threshold=self.threshold)

76 

77 

class PostApdbUpdateError(pipeBase.AlgorithmError):
    """Raised for any error that occurs after the APDB has been updated.
    This allows partial outputs to be written, and signals that processing
    can't be retried.

    Parameters
    ----------
    errorMsg : `str`
        Representation of the original error, embedded in this message.
    """

    def __init__(self, *, errorMsg):
        detail = ("Aborting processing after writing to the APDB. "
                  "This image cannot be retried! "
                  f"The original error was:\n{errorMsg}")
        super().__init__(detail)

    @property
    def metadata(self):
        """No extra metadata is attached to this error (`dict`)."""
        return {}

93 

94 

class DiaPipelineConnections(
        pipeBase.PipelineTaskConnections,
        dimensions=("instrument", "visit", "detector"),
        defaultTemplates={"coaddName": "deep", "fakesType": ""}):
    """Butler connections for DiaPipelineTask.

    Inputs and outputs are pruned in ``__init__`` according to the config
    flags ``doWriteAssociatedSources``, ``doRunForcedMeasurement``,
    ``filterUnAssociatedSources``, and ``doSolarSystemAssociation``.
    """
    diaSourceTable = connTypes.Input(
        doc="Catalog of calibrated DiaSources.",
        name="{fakesType}{coaddName}Diff_diaSrcTable",
        storageClass="DataFrame",
        dimensions=("instrument", "visit", "detector"),
    )
    # NOTE(review): "SolarSolarSystem" below looks like a typo for
    # "SolarSystem"; left unchanged because the doc string is part of the
    # connection definition.
    solarSystemObjectTable = connTypes.Input(
        doc="Catalog of SolarSolarSystem objects expected to be observable in "
            "this detectorVisit.",
        name="preloaded_SsObjects",
        storageClass="ArrowAstropy",
        dimensions=("instrument", "group", "detector"),
        # Optional input: the quantum may run without it.
        minimum=0,
    )
    diffIm = connTypes.Input(
        doc="Difference image on which the DiaSources were detected.",
        name="{fakesType}{coaddName}Diff_differenceExp",
        storageClass="ExposureF",
        dimensions=("instrument", "visit", "detector"),
    )
    exposure = connTypes.Input(
        doc="Calibrated exposure differenced with a template image during "
            "image differencing.",
        name="{fakesType}calexp",
        storageClass="ExposureF",
        dimensions=("instrument", "visit", "detector"),
    )
    template = connTypes.Input(
        doc="Warped template used to create `subtractedExposure`. Not PSF "
            "matched.",
        dimensions=("instrument", "visit", "detector"),
        storageClass="ExposureF",
        name="{fakesType}{coaddName}Diff_templateExp",
    )
    # The three "preloaded" inputs use "group" rather than "visit" dimensions
    # because they are produced before the visit is defined.
    preloadedDiaObjects = connTypes.Input(
        doc="DiaObjects preloaded from the APDB.",
        name="preloaded_diaObjects",
        storageClass="DataFrame",
        dimensions=("instrument", "group", "detector"),
    )
    preloadedDiaSources = connTypes.Input(
        doc="DiaSources preloaded from the APDB.",
        name="preloaded_diaSources",
        storageClass="DataFrame",
        dimensions=("instrument", "group", "detector"),
    )
    preloadedDiaForcedSources = connTypes.Input(
        doc="DiaForcedSources preloaded from the APDB.",
        name="preloaded_diaForcedSources",
        storageClass="DataFrame",
        dimensions=("instrument", "group", "detector"),
    )
    apdbMarker = connTypes.Output(
        doc="Marker dataset storing the configuration of the Apdb for each "
            "visit/detector. Used to signal the completion of the pipeline.",
        name="apdb_marker",
        storageClass="Config",
        dimensions=("instrument", "visit", "detector"),
    )
    associatedDiaSources = connTypes.Output(
        doc="Optional output storing the DiaSource catalog after matching, "
            "calibration, and standardization for insertion into the Apdb.",
        name="{fakesType}{coaddName}Diff_assocDiaSrc",
        storageClass="ArrowAstropy",
        dimensions=("instrument", "visit", "detector"),
    )
    associatedSsSources = connTypes.Output(
        doc="Optional output storing ssSource data computed during association.",
        name="{fakesType}{coaddName}Diff_associatedSsSources",
        storageClass="ArrowAstropy",
        dimensions=("instrument", "visit", "detector"),
    )
    unassociatedSsObjects = connTypes.Output(
        doc="Expected locations of an ssObject with no source",
        name="ssUnassociatedObjects",
        storageClass="ArrowAstropy",
        dimensions=("instrument", "visit", "detector"),
    )

    diaForcedSources = connTypes.Output(
        doc="Optional output storing the forced sources computed at the diaObject positions.",
        name="{fakesType}{coaddName}Diff_diaForcedSrc",
        storageClass="ArrowAstropy",
        dimensions=("instrument", "visit", "detector"),
    )
    diaObjects = connTypes.Output(
        doc="Optional output storing the updated diaObjects associated to these sources.",
        name="{fakesType}{coaddName}Diff_diaObject",
        storageClass="ArrowAstropy",
        dimensions=("instrument", "visit", "detector"),
    )
    newDiaSources = connTypes.Output(
        doc="New diaSources not associated with an existing diaObject that"
            " were used to create a new diaObject",
        name="{fakesType}new_dia_source",
        storageClass="ArrowAstropy",
        dimensions=("instrument", "visit", "detector"),
    )
    marginalDiaSources = connTypes.Output(
        doc="Low SNR diaSources not associated with an existing diaObject that"
            " were rejected instead of creating a new diaObject",
        name="{fakesType}marginal_new_dia_source",
        storageClass="ArrowAstropy",
        dimensions=("instrument", "visit", "detector"),
    )

    def __init__(self, *, config=None):
        """Prune optional connections according to the task config."""
        super().__init__(config=config)

        # With no associated-source outputs at all, drop every optional
        # catalog output in one go.
        if not config.doWriteAssociatedSources:
            self.outputs.remove("associatedDiaSources")
            self.outputs.remove("diaForcedSources")
            self.outputs.remove("diaObjects")
            self.outputs.remove("newDiaSources")
            self.outputs.remove("marginalDiaSources")
        else:
            # Otherwise remove only the outputs whose producing step is
            # disabled.
            if not config.doRunForcedMeasurement:
                self.outputs.remove("diaForcedSources")
            if not config.filterUnAssociatedSources:
                self.outputs.remove("newDiaSources")
                self.outputs.remove("marginalDiaSources")
        if not config.doSolarSystemAssociation:
            self.inputs.remove("solarSystemObjectTable")
        # Solar-system outputs need both flags enabled.
        if (not config.doWriteAssociatedSources) or (not config.doSolarSystemAssociation):
            self.outputs.remove("associatedSsSources")
            self.outputs.remove("unassociatedSsObjects")

    def adjustQuantum(self, inputs, outputs, label, dataId):
        """Override to make adjustments to `lsst.daf.butler.DatasetRef` objects
        in the `lsst.daf.butler.core.Quantum` during the graph generation stage
        of the activator.

        This implementation checks to make sure that the filters in the dataset
        are compatible with AP processing as set by the Apdb/DPDD schema.

        Parameters
        ----------
        inputs : `dict`
            Dictionary whose keys are an input (regular or prerequisite)
            connection name and whose values are a tuple of the connection
            instance and a collection of associated `DatasetRef` objects.
            The exact type of the nested collections is unspecified; it can be
            assumed to be multi-pass iterable and support `len` and ``in``, but
            it should not be mutated in place. In contrast, the outer
            dictionaries are guaranteed to be temporary copies that are true
            `dict` instances, and hence may be modified and even returned; this
            is especially useful for delegating to `super` (see notes below).
        outputs : `dict`
            Dict of output datasets, with the same structure as ``inputs``.
        label : `str`
            Label for this task in the pipeline (should be used in all
            diagnostic messages).
        dataId : `lsst.daf.butler.DataCoordinate`
            Data ID for this quantum in the pipeline (should be used in all
            diagnostic messages).

        Returns
        -------
        adjusted_inputs : `dict`
            Dict of the same form as ``inputs`` with updated containers of
            input `DatasetRef` objects. Connections that are not changed
            should not be returned at all. Datasets may only be removed, not
            added. Nested collections may be of any multi-pass iterable type,
            and the order of iteration will set the order of iteration within
            `PipelineTask.runQuantum`.
        adjusted_outputs : `dict`
            Dict of updated output datasets, with the same structure and
            interpretation as ``adjusted_inputs``.

        Raises
        ------
        ValueError
            Raised if a ``diffIm`` input's band is not in
            ``DiaPipelineConfig.validBands``.
        ScalarError
            Raised if any `Input` or `PrerequisiteInput` connection has
            ``multiple`` set to `False`, but multiple datasets.
        NoWorkFound
            Raised to indicate that this quantum should not be run; not enough
            datasets were found for a regular `Input` connection, and the
            quantum should be pruned or skipped.
        FileNotFoundError
            Raised to cause QuantumGraph generation to fail (with the message
            included in this exception); not enough datasets were found for a
            `PrerequisiteInput` connection.
        """
        # Reject bands with no corresponding columns in the Apdb schema
        # before any processing happens.
        _, refs = inputs["diffIm"]
        for ref in refs:
            if ref.dataId["band"] not in self.config.validBands:
                raise ValueError(
                    f"Requested '{ref.dataId['band']}' not in "
                    "DiaPipelineConfig.validBands. To process bands not in "
                    "the standard Rubin set (ugrizy) you must add the band to "
                    "the validBands list in DiaPipelineConfig and add the "
                    "appropriate columns to the Apdb schema.")
        return super().adjustQuantum(inputs, outputs, label, dataId)

294 

295 

class DiaPipelineConfig(pipeBase.PipelineTaskConfig,
                        pipelineConnections=DiaPipelineConnections):
    """Config for DiaPipelineTask.
    """
    coaddName = pexConfig.Field(
        doc="coadd name: typically one of deep, goodSeeing, or dcr",
        dtype=str,
        default="deep",
    )
    apdb_config_url = pexConfig.Field(
        dtype=str,
        default=None,
        # Required: the task cannot run without an APDB connection.
        optional=False,
        doc="A config file specifying the APDB and its connection parameters, "
            "typically written by the apdb-cli command-line utility. "
            "The database must already be initialized.",
    )
    validBands = pexConfig.ListField(
        dtype=str,
        default=["u", "g", "r", "i", "z", "y"],
        doc="List of bands that are valid for AP processing. To process a "
            "band not on this list, the appropriate band specific columns "
            "must be added to the Apdb schema in dax_apdb.",
    )
    associator = pexConfig.ConfigurableField(
        target=AssociationTask,
        doc="Task used to associate DiaSources with DiaObjects.",
    )
    doSolarSystemAssociation = pexConfig.Field(
        dtype=bool,
        default=True,
        doc="Process SolarSystem objects through the pipeline.",
    )
    solarSystemAssociator = pexConfig.ConfigurableField(
        target=SolarSystemAssociationTask,
        doc="Task used to associate DiaSources with SolarSystemObjects.",
    )
    diaCalculation = pexConfig.ConfigurableField(
        target=DiaObjectCalculationTask,
        doc="Task to compute summary statistics for DiaObjects.",
    )
    doReloadDiaObjects = pexConfig.Field(
        dtype=bool,
        default=True,
        doc="Drop preloaded DiaObjects and reload them from the APDB?"
            "Used in production when the very latest objects from the APDB "
            "are needed.",
    )
    # NOTE(review): the doc says "Padding"; presumably this is in degrees
    # given the name — confirm against loadRefreshedDiaObjects.
    angleMargin = pexConfig.RangeField(
        doc="Padding to add when loading diaObjects if `doReloadDiaObjects=True`",
        dtype=float,
        default=2,
        min=0,
    )
    doRunForcedMeasurement = pexConfig.Field(
        dtype=bool,
        default=True,
        deprecated="Added to allow disabling forced sources for performance "
                   "reasons during the ops rehearsal. "
                   "It is expected to be removed.",
        doc="Run forced measurement on all of the diaObjects? "
            "This should only be turned off for debugging purposes.",
    )
    diaForcedSource = pexConfig.ConfigurableField(
        target=DiaForcedSourceTask,
        doc="Task used for force photometer DiaObject locations in direct and "
            "difference images.",
    )
    forcedReliabilityThreshold = pexConfig.Field(
        dtype=float,
        default=0.5,
        doc="Minimum reliability score of constituent diaSources to use when "
            "determining whether to calculate forced photometry for a "
            "diaObject.",
    )
    # NOTE(review): "arseconds" in the doc below looks like a typo for
    # "arcseconds"; left unchanged because doc strings are part of the
    # config definition.
    forcedTrailLengthThreshold = pexConfig.Field(
        dtype=float,
        default=1.4,
        doc="Maximum trail length (arseconds) of constituent diaSources to use "
            "when determining whether to calculate forced photometry for a "
            "diaObject.",
    )
    forcedBadFlags = pexConfig.ListField(
        dtype=str,
        default=["glint_trail", "isDipole"],
        doc="Flags of constituent diaSources to exclude when determining "
            "whether to calculate forced photometry for a diaObject.",
    )
    alertPackager = pexConfig.ConfigurableField(
        target=PackageAlertsTask,
        doc="Subtask for packaging Ap data into alerts.",
    )
    doPackageAlerts = pexConfig.Field(
        dtype=bool,
        default=False,
        doc="Package Dia-data into serialized alerts for distribution and "
            "write them to disk.",
    )
    doWriteAssociatedSources = pexConfig.Field(
        dtype=bool,
        default=True,
        doc="Write out associated DiaSources, DiaForcedSources, and DiaObjects, "
            "formatted following the Science Data Model.",
    )
    imagePixelMargin = pexConfig.RangeField(
        dtype=int,
        default=10,
        min=0,
        doc="Pad the image by this many pixels before removing off-image "
            "diaObjects for association.",
    )
    # NOTE(review): "creating new ." in the doc below looks like a stray
    # period; left unchanged because doc strings are part of the config
    # definition.
    filterUnAssociatedSources = pexConfig.Field(
        dtype=bool,
        default=True,
        doc="Check unassociated diaSources for quality before creating new ."
            "diaObjects. DiaSources that are associated with existing diaObjects "
            "will not be affected."
    )
    newObjectBadFlags = pexConfig.ListField(
        dtype=str,
        default=("centroid_flag",
                 "pixelFlags_crCenter",
                 "pixelFlags_nodataCenter",
                 "pixelFlags_interpolatedCenter",
                 "pixelFlags_saturatedCenter",
                 "pixelFlags_suspectCenter",
                 "pixelFlags_streakCenter",
                 "glint_trail"),
        doc="If `filterUnAssociatedSources` is set, exclude diaSources with "
            "these flags set before creating new diaObjects.",
    )
    # NOTE(review): this is a count but is declared dtype=float — possibly
    # deliberate (RangeField semantics); confirm before changing.
    maxNewDiaObjects = pexConfig.RangeField(
        dtype=float,
        default=0,
        min=0,
        doc="Maximum number of new DiaObjects to create before raising an error."
            "Set to zero to disable.",
    )
    newObjectSnrThreshold = pexConfig.RangeField(
        dtype=float,
        default=0,
        min=0,
        doc="If `filterUnAssociatedSources` is set, exclude diaSources with "
            "Abs(flux/fluxErr) less than this threshold before creating new "
            "diaObjects."
            "Set to zero to disable.",
    )
    newObjectLowReliabilitySnrThreshold = pexConfig.RangeField(
        dtype=float,
        default=0,
        min=0,
        doc="If `filterUnAssociatedSources` is set, exclude diaSources with "
            "signal-to-noise ratios less than this threshold if they have"
            " low reliability scores before creating new diaObjects."
            "Set to zero to disable.",
    )
    newObjectReliabilityThreshold = pexConfig.RangeField(
        dtype=float,
        default=0.1,
        min=0,
        max=1,
        doc="If `filterUnAssociatedSources` is set, exclude diaSources with "
            "reliability scores less than this threshold before creating new "
            "diaObjects."
            "Set to zero to disable.",
    )
    newObjectLowSnrReliabilityThreshold = pexConfig.RangeField(
        dtype=float,
        default=0.1,
        min=0,
        max=1,
        doc="If `filterUnAssociatedSources` is set, exclude diaSources with "
            "low signal-to-noise and reliability scores below this threshold "
            "before creating new diaObjects."
            "Set to zero to disable.",
    )
    newObjectFluxField = pexConfig.Field(
        dtype=str,
        default="apFlux",
        doc="Name of the flux field in the standardized diaSource catalog to "
            "use for checking the signal-to-noise before creating new diaObjects.",
    )
    idGenerator = DetectorVisitIdGeneratorConfig.make_field()

    def setDefaults(self):
        # Restrict DiaObject summary statistics to the plugins needed by AP.
        self.diaCalculation.plugins = ["ap_meanPosition",
                                       "ap_nDiaSources",
                                       "ap_meanFlux",
                                       "ap_sigmaFlux",
                                       "ap_minMaxFlux",
                                       "ap_maxSlopeFlux",
                                       "ap_meanErrFlux",
                                       "ap_meanTotFlux"]

489 

490 

491class DiaPipelineTask(pipeBase.PipelineTask): 

492 """Task for loading, associating and storing Difference Image Analysis 

493 (DIA) Objects and Sources. 

494 """ 

495 ConfigClass = DiaPipelineConfig 

496 _DefaultName = "diaPipe" 

497 

498 def __init__(self, initInputs=None, **kwargs): 

499 super().__init__(**kwargs) 

500 self.apdb = daxApdb.Apdb.from_uri(self.config.apdb_config_url) 

501 self.schema = readSchemaFromApdb(self.apdb) 

502 self.makeSubtask("associator") 

503 self.makeSubtask("diaCalculation") 

504 if self.config.doRunForcedMeasurement: 

505 self.makeSubtask("diaForcedSource") 

506 if self.config.doPackageAlerts: 

507 self.makeSubtask("alertPackager") 

508 if self.config.doSolarSystemAssociation: 

509 self.makeSubtask("solarSystemAssociator") 

510 if self.config.filterUnAssociatedSources: 

511 columns = [self.config.newObjectFluxField, 

512 self.config.newObjectFluxField + "Err", 

513 "reliability"] 

514 columns += self.config.newObjectBadFlags 

515 

516 missing = checkSdmSchemaColumns(self.schema, columns, "DiaSource") 

517 if missing: 

518 raise pipeBase.InvalidQuantumError("Field %s not in the DiaSource schema" % missing) 

519 

520 def runQuantum(self, butlerQC, inputRefs, outputRefs): 

521 inputs = butlerQC.get(inputRefs) 

522 inputs["idGenerator"] = self.config.idGenerator.apply(butlerQC.quantum.dataId) 

523 inputs["band"] = butlerQC.quantum.dataId["band"] 

524 inputs["legacySolarSystemTable"] = None 

525 if not self.config.doSolarSystemAssociation: 

526 inputs["solarSystemObjectTable"] = None 

527 

528 associationResults = pipeBase.Struct( 

529 apdbMarker=None, 

530 associatedDiaSources=None, 

531 diaForcedSources=None, 

532 diaObjects=None, 

533 associatedSsSources=None, 

534 unassociatedSsObjects=None, 

535 newDiaSources=None, 

536 marginalDiaSources=None, 

537 ) 

538 try: 

539 self.run(**inputs, associationResults=associationResults) 

540 except pipeBase.AlgorithmError as e: 

541 error = pipeBase.AnnotatedPartialOutputsError.annotate( 

542 e, 

543 self, 

544 log=self.log 

545 ) 

546 butlerQC.put(associationResults, outputRefs) 

547 raise error from e 

548 butlerQC.put(associationResults, outputRefs) 

549 

    @timeMethod
    def run(self,
            diaSourceTable,
            legacySolarSystemTable,
            diffIm,
            exposure,
            template,
            preloadedDiaObjects,
            preloadedDiaSources,
            preloadedDiaForcedSources,
            band,
            idGenerator,
            solarSystemObjectTable=None,
            associationResults=None):
        """Process DiaSources and DiaObjects.

        Load previous DiaObjects and their DiaSource history. Calibrate the
        values in the diaSourceCat. Associate new DiaSources with previous
        DiaObjects. Run forced photometry at the updated DiaObject locations.
        Store the results in the Alert Production Database (Apdb).

        Parameters
        ----------
        diaSourceTable : `pandas.DataFrame`
            Newly detected DiaSources.
        legacySolarSystemTable : `pandas.DataFrame`
            Not used
        diffIm : `lsst.afw.image.ExposureF`
            Difference image exposure in which the sources in ``diaSourceCat``
            were detected.
        exposure : `lsst.afw.image.ExposureF`
            Calibrated exposure differenced with a template to create
            ``diffIm``.
        template : `lsst.afw.image.ExposureF`
            Template exposure used to create diffIm.
        preloadedDiaObjects : `pandas.DataFrame`
            Previously detected DiaObjects, loaded from the APDB.
        preloadedDiaSources : `pandas.DataFrame`
            Previously detected DiaSources, loaded from the APDB.
        preloadedDiaForcedSources : `pandas.DataFrame`
            Catalog of previously detected forced DiaSources, from the APDB
        band : `str`
            The band in which the new DiaSources were detected.
        idGenerator : `lsst.meas.base.IdGenerator`
            Object that generates source IDs and random number generator seeds.
        solarSystemObjectTable : `astropy.table.Table`
            Preloaded Solar System objects expected to be visible in the image.
        associationResults : `lsst.pipe.base.Struct`, optional
            Result struct that is modified to allow saving of partial outputs
            for some failure conditions. If the task completes successfully,
            this is also returned.

        Returns
        -------
        associationResults : `lsst.pipe.base.Struct`
            Results struct with components.

            - ``apdbMarker`` : Marker dataset to store in the Butler indicating
              that this ccdVisit has completed successfully.
              (`lsst.dax.apdb.ApdbConfig`)
            - ``associatedDiaSources`` : Catalog of newly associated
              DiaSources. (`pandas.DataFrame`)
            - ``diaForcedSources`` : Catalog of new and previously detected
              forced DiaSources. (`pandas.DataFrame`)
            - ``diaObjects`` : Updated table of DiaObjects. (`pandas.DataFrame`)
            - ``associatedSsSources`` : Catalog of ssSource records.
              (`pandas.DataFrame`)
            - ``unassociatedSsObjects`` : Expected ssObject locations with no
              associated source.
            - ``newDiaSources`` : DiaSources used to create new DiaObjects.
            - ``marginalDiaSources`` : Low-quality DiaSources rejected from
              new-DiaObject creation.

        Raises
        ------
        RuntimeError
            Raised if duplicate DiaObjects or duplicate DiaSources are found.
        PostApdbUpdateError
            Raised if any error occurs after the APDB has been updated;
            such quanta must not be retried.
        """
        if associationResults is None:
            associationResults = pipeBase.Struct()
        # Validate exposure metadata first, so bad inputs fail before the
        # APDB is touched.
        self._validateExposure(exposure, "Science")
        self._validateExposure(diffIm, "Difference")
        self._validateExposure(template, "Template")

        # Accept either legacySolarSystemTable or optional solarSystemObjectTable.
        if legacySolarSystemTable is not None and solarSystemObjectTable is None:
            solarSystemObjectTable = Table.from_pandas(legacySolarSystemTable)
        if self.config.doReloadDiaObjects:
            try:
                preloadedDiaObjects = self.loadRefreshedDiaObjects(getRegion(exposure), preloadedDiaObjects)
            except Exception as e:
                # Best-effort refresh: fall back to the preloaded objects.
                self.log.warning("Error encountered while attempting to load "
                                 "the latest diaObjects from the APDB. Processing "
                                 "will continue with only the diaObjects from "
                                 "preload.", exc_info=e)
            finally:
                self.metadata["loadDiaObjectsDuration"] = duration_from_timeMethod(
                    self.metadata, "loadRefreshedDiaObjects", clock="Utc"
                )
                self.log.verbose("Re-loading DiaObjects: Took %.4f seconds",
                                 self.metadata["loadDiaObjectsDuration"])

        else:
            # Sentinel meaning "reload disabled".
            self.metadata["loadDiaObjectsDuration"] = -1

        # Ensure the preloaded tables have the indexes later merges rely on.
        self.checkTableIndex(preloadedDiaSources, index=["diaObjectId", "band", "diaSourceId"])
        self.checkTableIndex(preloadedDiaObjects, index="diaObjectId")
        self.checkTableIndex(preloadedDiaForcedSources, index=["diaObjectId", "diaForcedSourceId"])

        if not preloadedDiaObjects.empty:
            # Include a small buffer outside the image so that we can associate sources near the edge
            diaObjects, _ = self.purgeDiaObjects(diffIm.getBBox(), diffIm.getWcs(), preloadedDiaObjects,
                                                 buffer=self.config.imagePixelMargin)
        else:
            self.log.info("Preloaded DiaObject table is empty.")
            diaObjects = preloadedDiaObjects

        # Associate DiaSources with DiaObjects
        assocResults = self.associateDiaSources(diaSourceTable, solarSystemObjectTable, diffIm, diaObjects)

        # Set unassociated diaObjectIds and ssObjectIds to NULL, and convert to SDM schema format
        standardizedAssociatedDiaSources = self.standardizeDataFrame(
            assocResults.associatedDiaSources,
            "DiaSource",
            nullColumns=["diaObjectId", "ssObjectId"]
        )

        # Merge associated diaSources
        mergedDiaSourceHistory, mergedDiaObjects, updatedDiaObjectIds = self.mergeAssociatedCatalogs(
            preloadedDiaSources,
            assocResults.associatedDiaSources,
            diaObjects,
            assocResults.newDiaObjects,
            diffIm
        )

        # Compute DiaObject Summary statistics from their full DiaSource
        # history.
        diaCalResult = self.diaCalculation.run(
            mergedDiaObjects,
            mergedDiaSourceHistory,
            updatedDiaObjectIds,
            [band])
        updatedDiaObjects = convertDataFrameToSdmSchema(self.schema, diaCalResult.updatedDiaObjects,
                                                        tableName="DiaObject", skipIndex=True)

        # Test for duplication in the updated DiaObjects.
        if self.testDataFrameIndex(diaCalResult.diaObjectCat):
            raise RuntimeError(
                "Duplicate DiaObjects (loaded + updated) created after "
                "DiaCalculation. This is unexpected behavior and should be "
                "reported. Exiting.")
        if self.testDataFrameIndex(updatedDiaObjects):
            raise RuntimeError(
                "Duplicate DiaObjects (updated) created after "
                "DiaCalculation. This is unexpected behavior and should be "
                "reported. Exiting.")

        # Forced source measurement
        if self.config.doRunForcedMeasurement:
            diaObjectsForced = self._selectGoodDiaObjects(diaCalResult.diaObjectCat, mergedDiaSourceHistory)
            diaForcedSources = self.runForcedMeasurement(
                diaObjectsForced, updatedDiaObjects, exposure, diffIm, idGenerator
            )
            forcedSourceHistoryThreshold = self.diaForcedSource.config.historyThreshold
        else:
            # alertPackager needs correct columns
            diaForcedSources = makeEmptyForcedSourceTable(self.schema)
            forcedSourceHistoryThreshold = 0

        # Write results to Alert Production Database (APDB)
        try:
            validityStart = self.writeToApdb(updatedDiaObjects, standardizedAssociatedDiaSources,
                                             diaForcedSources)
        finally:
            self.metadata["writeToApdbDuration"] = duration_from_timeMethod(self.metadata, "writeToApdb",
                                                                            clock="Utc")
            # A single log message is easier for Loki to parse than timeMethod's start+end pairs.
            self.log.verbose("writeToApdb: Took %.4f seconds", self.metadata["writeToApdbDuration"])

        # For historical reasons, apdbMarker is a Config even if it's not meant to be read.
        # A default Config is the cheapest way to satisfy the storage class.
        associationResults.apdbMarker = pexConfig.Config()
        associationResults.associatedDiaSources = assocResults.associatedDiaSources
        associationResults.diaForcedSources = diaForcedSources
        associationResults.diaObjects = diaCalResult.diaObjectCat
        associationResults.unassociatedSsObjects = assocResults.unassociatedSsObjects
        associationResults.newDiaSources = assocResults.newDiaSources
        associationResults.marginalDiaSources = assocResults.marginalDiaSources

        # Catch *any* error after we have updated the APDB
        try:
            associatedSsSources = assocResults.associatedSsSources
            associatedSsSourcesPlusMpcorb = None
            if associatedSsSources is not None:
                # Preserve the MPCORB_* columns that standardizeTable would
                # otherwise drop, and re-attach them afterwards.
                mpcorbColumns = [col for col in associatedSsSources.columns if col[:7] == 'MPCORB_']
                associatedSsSourceMpcorb = associatedSsSources[mpcorbColumns].copy()
                associatedSsSources = self.standardizeTable(associatedSsSources, "SSSource", nullColumns=[])
                associatedSsSourcesPlusMpcorb = associatedSsSources.copy()
                for mpcorbColumn in mpcorbColumns:
                    associatedSsSourcesPlusMpcorb[mpcorbColumn] = associatedSsSourceMpcorb[mpcorbColumn]
            associationResults.associatedSsSources = associatedSsSources

            # patch the otherwise-empty validityStart field for the alerts
            updatedDiaObjects['validityStartMjdTai'] = validityStart.get(system=DateTime.MJD,
                                                                        scale=DateTime.TAI)

            # Package alerts
            if self.config.doPackageAlerts:
                # Append new forced sources to the full history
                diaForcedSourcesFull = self.mergeCatalogs(preloadedDiaForcedSources, diaForcedSources,
                                                          tableName="DiaForcedSource")
                if self.testDataFrameIndex(diaForcedSources):
                    self.log.warning(
                        "Duplicate DiaForcedSources created after merge with "
                        "history and new sources. This may cause downstream "
                        "problems. Dropping duplicates.")
                    # Drop duplicates via index and keep the first appearance.
                    # Reset due to the index shape being slightly different than
                    # expected.
                    diaForcedSourcesFull = diaForcedSourcesFull.groupby(
                        diaForcedSourcesFull.index).first()
                    diaForcedSourcesFull.reset_index(drop=True, inplace=True)
                    diaForcedSourcesFull.set_index(
                        ["diaObjectId", "diaForcedSourceId"],
                        drop=False,
                        inplace=True)

                self.alertPackager.run(assocResults.associatedDiaSources,
                                       updatedDiaObjects,
                                       preloadedDiaSources,
                                       diaForcedSourcesFull,
                                       diffIm,
                                       exposure,
                                       template,
                                       ssSrc=associatedSsSourcesPlusMpcorb,
                                       doRunForcedMeasurement=self.config.doRunForcedMeasurement,
                                       forcedSourceHistoryThreshold=forcedSourceHistoryThreshold,
                                       )
        except Exception as e:
            # Catch *any* error after we have updated the APDB
            raise PostApdbUpdateError(errorMsg=repr(e))

        return associationResults

789 

790 def _validateExposure(self, exposure, label): 

791 """Validate exposure metadata early to avoid failures after updating 

792 the APDB. 

793 

794 Parameters 

795 ---------- 

796 exposure : `lsst.afw.image.ExposureF` 

797 Calibrated science image that was subtracted. Corresponds to the 

798 ``exposure`` Butler connection (``{fakesType}calexp``). 

799 label : `str` 

800 Specification of the image being validated, for logging messages. 

801 

802 Raises 

803 ------ 

804 ValueError 

805 Raised when any required metadata field is missing or invalid. 

806 """ 

807 errors = [] 

808 

809 photo_calib = exposure.getPhotoCalib() 

810 if photo_calib is None: 

811 errors.append( 

812 f"{label}: PhotoCalib is None; flux-to-nJy calibration " 

813 "in alert packaging will fail." 

814 ) 

815 else: 

816 mean = photo_calib.getCalibrationMean() 

817 if not np.isfinite(mean) or mean <= 0.0: 

818 errors.append( 

819 f"{label}: PhotoCalib calibration mean is {mean!r}; " 

820 "must be a finite, positive value." 

821 ) 

822 

823 if exposure.getWcs() is None: 

824 errors.append( 

825 f"{label}: WCS is None; sky-to-pixel projection and " 

826 "DIA Object association will fail." 

827 ) 

828 

829 # Skip visitInfo checks for the template 

830 if label != "Template": 

831 visit_info = exposure.getInfo().getVisitInfo() 

832 if visit_info is None: 

833 errors.append( 

834 f"{label}: VisitInfo is None; observation time, pointing, " 

835 "rotation angle, and exposure duration are unavailable." 

836 ) 

837 else: 

838 obs_date = visit_info.getDate() 

839 if not obs_date.isValid(): 

840 errors.append( 

841 f"{label}: VisitInfo.date is invalid." 

842 ) 

843 

844 # boresightRaDec — written to APDB rows and alert payloads. 

845 boresight = visit_info.getBoresightRaDec() 

846 ra_deg = boresight.getRa().asDegrees() 

847 dec_deg = boresight.getDec().asDegrees() 

848 if not np.isfinite(ra_deg) or not np.isfinite(dec_deg): 

849 errors.append( 

850 f"{label}: VisitInfo.boresightRaDec contains NaN " 

851 f"(RA={ra_deg}, Dec={dec_deg}); pointing metadata " 

852 "is missing or corrupt." 

853 ) 

854 

855 rot_deg = visit_info.getBoresightRotAngle().asDegrees() 

856 if not np.isfinite(rot_deg): 

857 errors.append( 

858 f"{label}: VisitInfo.boresightRotAngle is NaN; the " 

859 "ROTPA keyword will be corrupt in all alert cutout " 

860 "FITS headers." 

861 ) 

862 

863 filter_label = exposure.getFilter() 

864 if filter_label is None: 

865 errors.append( 

866 f"{label}: filter label is None." 

867 ) 

868 else: 

869 band = filter_label.bandLabel if filter_label.hasBandLabel() else "" 

870 if not band: 

871 errors.append( 

872 f"{label}: filter label carries no band name." 

873 ) 

874 

875 bbox = exposure.getBBox() 

876 if bbox.getWidth() <= 0 or bbox.getHeight() <= 0: 

877 errors.append( 

878 f"{label}: bounding box is empty " 

879 f"({bbox.getWidth()}×{bbox.getHeight()} px); the " 

880 "readout or ISR step may have failed." 

881 ) 

882 

883 if exposure.getPsf() is None: 

884 errors.append( 

885 f"{label}: PSF model is None." 

886 ) 

887 

888 if errors: 

889 detail = "\n ".join(errors) 

890 raise ValueError( 

891 "Input exposure metadata is invalid; aborting before any " 

892 "APDB writes to prevent non-recoverable data corruption:\n " 

893 + detail 

894 ) 

895 

896 def _selectGoodDiaObjects(self, diaObjects, diaSources): 

897 """ 

898 Extract a subset of diaObjects with multiple associated diaSources that 

899 pass selection requirements. 

900 """ 

901 n_history = self.diaForcedSource.config.historyThreshold 

902 required_columns = ["reliability", "trailLength"] 

903 for col in required_columns: 

904 if col not in diaSources.columns: 

905 raise RuntimeError("Required column '%s' missing from diaSource table." % col) 

906 

907 rel_ok = diaSources["reliability"] >= self.config.forcedReliabilityThreshold 

908 trailed_ok = diaSources["trailLength"] <= self.config.forcedTrailLengthThreshold 

909 badFlags = [f for f in self.config.forcedBadFlags if f in diaSources.columns] 

910 if badFlags: 

911 self.log.info("Excluding diaSources with %s flags set when calculating" 

912 " diaObject history for forced photometry." % badFlags) 

913 # Exclude rows where ANY bad flag is True. 

914 # If bad flag columns can contain NA, treat NA as True (i.e., bad) via fillna(True). 

915 bad_any = diaSources[badFlags].fillna(True).any(axis=1) 

916 mask = rel_ok & trailed_ok & ~bad_any 

917 ids = diaSources.index.get_level_values("diaObjectId")[mask] 

918 

919 # Count occurrences in diaSource 

920 counts = pd.Series(ids, copy=False).value_counts() 

921 

922 # IDs that exceed threshold 

923 keep_ids = counts[counts >= n_history].index 

924 

925 # Since diaObject index is unique, direct index filtering is safe and efficient 

926 return diaObjects.loc[diaObjects.index.intersection(keep_ids)] 

927 

928 def createNewDiaObjects(self, unassociatedDiaSources): 

929 """Loop through the set of DiaSources and create new DiaObjects 

930 for unassociated DiaSources. 

931 

932 Parameters 

933 ---------- 

934 unassociatedDiaSources : `pandas.DataFrame` 

935 Set of DiaSources to create new DiaObjects from. 

936 

937 Returns 

938 ------- 

939 results : `lsst.pipe.base.Struct` 

940 Results struct containing: 

941 

942 - diaSources : `pandas.DataFrame` 

943 DiaSource catalog with updated DiaObject ids. 

944 - newDiaObjects : `pandas.DataFrame` 

945 Newly created DiaObjects from the unassociated DiaSources. 

946 - nNewDiaObjects : `int` 

947 Number of newly created diaObjects. 

948 - marginalDiaSources : `pandas.DataFrame` 

949 Unassociated diaSources with low signal-to-noise and/or 

950 reliability, which are excluded from the new DiaObjects. 

951 """ 

952 marginalDiaSources = make_empty_catalog(self.schema, tableName="DiaSource") 

953 if len(unassociatedDiaSources) == 0: 

954 newDiaObjects = make_empty_catalog(self.schema, tableName="DiaObject") 

955 else: 

956 if self.config.filterUnAssociatedSources: 

957 results = self.filterSources(unassociatedDiaSources) 

958 unassociatedDiaSources = results.goodSources 

959 marginalDiaSources = results.badSources 

960 unassociatedDiaSources["diaObjectId"] = unassociatedDiaSources["diaSourceId"] 

961 newDiaObjects = convertDataFrameToSdmSchema(self.schema, unassociatedDiaSources, 

962 tableName="DiaObject", skipIndex=True) 

963 self.metadata["nRejectedNewDiaObjects"] = len(marginalDiaSources) 

964 return pipeBase.Struct(diaSources=unassociatedDiaSources, 

965 newDiaObjects=newDiaObjects, 

966 nNewDiaObjects=len(newDiaObjects), 

967 marginalDiaSources=marginalDiaSources) 

968 

969 def filterSources(self, sources, snrThreshold=None, lowReliabilitySnrThreshold=None, 

970 reliabilityThreshold=None, lowSnrReliabilityThreshold=None, badFlags=None): 

971 """Select good sources out of a catalog. 

972 

973 Parameters 

974 ---------- 

975 sources : `pandas.DataFrame` 

976 Set of DiaSources to check. 

977 snrThreshold : `float`, optional 

978 The minimum signal to noise diaSource to make a new diaObject. 

979 Included for unit tests. Uses the task config value if not set. 

980 lowReliabilitySnrThreshold : `float`, optional 

981 Use ``lowSnrReliabilityThreshold`` as the reliability threshold for 

982 diaSources with ``snrThreshold`` < SNR < ``lowReliabilitySnrThreshold`` 

983 Included for unit tests. Uses the task config value if not set. 

984 reliabilityThreshold : `float`, optional 

985 The minimum reliability score diaSource to make a new diaObject 

986 Included for unit tests. Uses the task config value if not set. 

987 lowSnrReliabilityThreshold : `float`, optional 

988 Use ``lowSnrReliabilityThreshold`` as the reliability threshold for 

989 diaSources with ``snrThreshold`` < SNR < ``lowReliabilitySnrThreshold`` 

990 Included for unit tests. Uses the task config value if not set. 

991 badFlags : `list` of `str`, optional 

992 Do not create new diaObjects for any diaSource with any of these 

993 flags set. 

994 Included for unit tests. Uses the task config value if not set. 

995 

996 Returns 

997 ------- 

998 results : `lsst.pipe.base.Struct` 

999 Results struct containing: 

1000 

1001 - goodSources : `pandas.DataFrame` 

1002 Subset of the input sources that pass all checks. 

1003 - badSources : `pandas.DataFrame` 

1004 Subset of the input sources that fail any check. 

1005 """ 

1006 if snrThreshold is None: 

1007 snrThreshold = self.config.newObjectSnrThreshold 

1008 if lowReliabilitySnrThreshold is None: 

1009 lowReliabilitySnrThreshold = self.config.newObjectLowReliabilitySnrThreshold 

1010 if reliabilityThreshold is None: 

1011 reliabilityThreshold = self.config.newObjectReliabilityThreshold 

1012 if lowSnrReliabilityThreshold is None: 

1013 lowSnrReliabilityThreshold = self.config.newObjectLowSnrReliabilityThreshold 

1014 if badFlags is None: 

1015 badFlags = self.config.newObjectBadFlags 

1016 flagged = sources[badFlags].fillna(False).any(axis=1) 

1017 fluxField = self.config.newObjectFluxField 

1018 fluxErrField = fluxField + "Err" 

1019 signalToNoise = np.abs(np.array(sources[fluxField]/sources[fluxErrField])) 

1020 reliability = np.array(sources['reliability']) 

1021 nFlagged = np.count_nonzero(flagged) 

1022 if nFlagged > 0: 

1023 self.log.info("Not creating new diaObjects for %i unassociated diaSources due to flags", nFlagged) 

1024 

1025 if snrThreshold > 0: 

1026 snr_flag = signalToNoise < snrThreshold 

1027 self.log.info("Not creating new diaObjects for %i unassociated diaSources due to %sFlux" 

1028 " signal to noise < %f", 

1029 np.sum(snr_flag), fluxField, snrThreshold) 

1030 flagged += snr_flag 

1031 if reliabilityThreshold > 0: 

1032 reliability_flag = reliability < reliabilityThreshold 

1033 self.log.info("Not creating new diaObjects for %i unassociated diaSources due to " 

1034 "reliability<%f", 

1035 np.sum(reliability_flag), reliabilityThreshold) 

1036 flagged += reliability_flag 

1037 if min(lowReliabilitySnrThreshold, lowSnrReliabilityThreshold) > 0: 

1038 # Only run the combined test if both thresholds are greater than zero 

1039 lowSnrReliability_flag = ((signalToNoise < lowReliabilitySnrThreshold) 

1040 & (reliability < lowSnrReliabilityThreshold)) 

1041 self.log.info("Not creating new diaObjects for %i unassociated diaSources due to %sFlux" 

1042 " signal to noise < %f combined with reliability< %f", 

1043 np.sum(lowSnrReliability_flag), 

1044 fluxField, 

1045 lowReliabilitySnrThreshold, 

1046 lowSnrReliabilityThreshold) 

1047 flagged += lowSnrReliability_flag 

1048 

1049 if np.count_nonzero(~flagged) > 0: 

1050 goodSources = sources[~flagged].copy(deep=True) 

1051 else: 

1052 goodSources = make_empty_catalog(self.schema, tableName="DiaSource") 

1053 if np.count_nonzero(flagged) > 0: 

1054 badSources = sources[flagged].copy(deep=True) 

1055 else: 

1056 badSources = make_empty_catalog(self.schema, tableName="DiaSource") 

1057 return pipeBase.Struct(goodSources=goodSources, 

1058 badSources=badSources 

1059 ) 

1060 

    @timeMethod
    def associateDiaSources(self, diaSourceTable, solarSystemObjectTable, diffIm, diaObjects):
        """Associate DiaSources with DiaObjects.

        Associate new DiaSources with existing DiaObjects. Create new
        DiaObjects from unassociated DiaSources. Index DiaSource catalogue
        after associations. Append new DiaObjects and DiaSources to their
        previous history. Test for DiaSource and DiaObject duplications.
        Compute DiaObject Summary statistics from their full DiaSource
        history. Test for duplication in the updated DiaObjects.

        Parameters
        ----------
        diaSourceTable : `pandas.DataFrame`
            Newly detected DiaSources.
        solarSystemObjectTable : `astropy.table.Table`
            Preloaded Solar System objects expected to be visible in the image.
        diffIm : `lsst.afw.image.ExposureF`
            Difference image exposure in which the sources in ``diaSourceCat``
            were detected.
        diaObjects : `pandas.DataFrame`
            Table of DiaObjects from preloaded DiaObjects.

        Returns
        -------
        results : `lsst.pipe.base.Struct`
            Results struct containing:

            - associatedDiaSources : `pandas.DataFrame`
                Associated DiaSources with DiaObjects.
            - newDiaObjects : `pandas.DataFrame`
                Table of new DiaObjects after association.
            - associatedSsSources : `astropy.table.Table`
                Table of new ssSources after association.
            - unassociatedSsObjects : `astropy.table.Table`
                Table of expected ssSources that were not associated with a
                diaSource.
            - newDiaSources : `pandas.DataFrame`
                Subset of `associatedDiaSources` consisting of only the
                unassociated diaSources that were added as new diaObjects.
            - marginalDiaSources : `pandas.DataFrame`
                Unassociated diaSources with marginal detections, which were
                removed from `associatedDiaSources` and were not added as new
                diaObjects.

        Raises
        ------
        TooManyDiaObjectsError
            Raised if more new diaObjects were created than allowed by
            ``config.maxNewDiaObjects`` (when that limit is positive).
        """
        associatedCatalogs = []
        # First associate diaSources with known asteroids
        if self.config.doSolarSystemAssociation and solarSystemObjectTable is not None:
            ssoAssocResult = self.solarSystemAssociator.run(
                Table.from_pandas(diaSourceTable),
                solarSystemObjectTable,
                diffIm.visitInfo,
                diffIm.getBBox(),
                diffIm.wcs
            )
            nTotalSsObjects = ssoAssocResult.nTotalSsObjects
            nAssociatedSsObjects = ssoAssocResult.nAssociatedSsObjects
            associatedSsSources = ssoAssocResult.associatedSsSources
            unassociatedSsObjects = ssoAssocResult.unassociatedSsObjects
            if len(ssoAssocResult.ssoAssocDiaSources) > 0:
                associatedCatalogs.append(ssoAssocResult.ssoAssocDiaSources.to_pandas())
            if len(ssoAssocResult.unAssocDiaSources) > 0:
                # If the table is empty then converting time fields to pandas
                # will raise an error. Pass in an empty Dataframe in that case.
                unAssocSSDiaSources = ssoAssocResult.unAssocDiaSources.to_pandas()
            else:
                unAssocSSDiaSources = make_empty_catalog(self.schema, tableName="DiaSource")
        else:
            # No SSO association: every diaSource goes to regular association.
            unAssocSSDiaSources = diaSourceTable
            nTotalSsObjects = 0
            nAssociatedSsObjects = 0
            associatedSsSources = None
            unassociatedSsObjects = None
        # Associate new DiaSources with existing DiaObjects.
        assocResults = self.associator.run(unAssocSSDiaSources, diaObjects)
        createResults = self.createNewDiaObjects(assocResults.unAssocDiaSources)

        # Collect the matched and newly-created diaSources, skipping empty
        # pieces so pd.concat is only given non-empty frames.
        if not assocResults.matchedDiaSources.empty:
            associatedCatalogs.append(assocResults.matchedDiaSources)
        if not createResults.diaSources.empty:
            associatedCatalogs.append(createResults.diaSources)
        if len(associatedCatalogs) == 0:
            associatedDiaSources = make_empty_catalog(self.schema, tableName="DiaSource")
        else:
            associatedDiaSources = pd.concat(associatedCatalogs)

        self._add_association_meta_data(assocResults.nUpdatedDiaObjects,
                                        assocResults.nUnassociatedDiaObjects,
                                        createResults.nNewDiaObjects,
                                        nTotalSsObjects,
                                        nAssociatedSsObjects)
        self.log.info("%i updated and %i unassociated diaObjects. Creating %i new diaObjects"
                      " and dropping %i marginal diaSources.",
                      assocResults.nUpdatedDiaObjects,
                      assocResults.nUnassociatedDiaObjects,
                      createResults.nNewDiaObjects,
                      len(createResults.marginalDiaSources),
                      )
        # Guard against pathological visits; the chained comparison also
        # disables the check when maxNewDiaObjects <= 0.
        if createResults.nNewDiaObjects > self.config.maxNewDiaObjects > 0:
            raise TooManyDiaObjectsError(nNewDiaObjects=createResults.nNewDiaObjects,
                                         threshold=self.config.maxNewDiaObjects)
        return pipeBase.Struct(associatedDiaSources=associatedDiaSources,
                               newDiaObjects=createResults.newDiaObjects,
                               associatedSsSources=associatedSsSources,
                               unassociatedSsObjects=unassociatedSsObjects,
                               newDiaSources=createResults.diaSources,
                               marginalDiaSources=createResults.marginalDiaSources
                               )

1169 

1170 def standardizeDataFrame(self, dataFrame, tableName, nullColumns=None): 

1171 """Convert a catalog to SDM schema format with NULL ID values 

1172 

1173 Parameters 

1174 ---------- 

1175 dataFrame : `pandas.DataFrame` 

1176 The catalog to standardize/convert 

1177 tableName : `str` 

1178 Schema name of table to which this dataFrame should be standardized 

1179 nullColumns : `list` of `str`, optional 

1180 List of column names to check for default values of 0 that should be 

1181 replaced with `NULL`. 

1182 

1183 

1184 Returns 

1185 ------- 

1186 standardizedAssociatedDiaSources : pandas.DataFrame 

1187 The standardized DiaSource catalog 

1188 """ 

1189 standardizedDataFrame = convertDataFrameToSdmSchema(self.schema, 

1190 dataFrame, 

1191 tableName=tableName, 

1192 skipIndex=True) 

1193 

1194 def _setNullColumn(dataframe, colName): 

1195 """Set specified columns with default values of 0 to NULL.""" 

1196 dataframe.loc[dataframe[colName] == 0, colName] = pd.NA 

1197 

1198 if nullColumns is not None: 

1199 for colName in nullColumns: 

1200 _setNullColumn(standardizedDataFrame, colName) 

1201 return standardizedDataFrame 

1202 

1203 def standardizeTable(self, table, tableName, nullColumns=None): 

1204 """Convert a catalog to SDM schema format with NULL ID values 

1205 

1206 Parameters 

1207 ---------- 

1208 table : `astropy.table.Table` 

1209 The catalog to standardize/convert 

1210 tableName : `str` 

1211 Schema name of table to which this dataFrame should be standardized 

1212 nullColumns : `list` of `str`, optional 

1213 List of column names to check for default values of 0 that should be 

1214 replaced with `NULL`. 

1215 

1216 

1217 Returns 

1218 ------- 

1219 standardizedAssociatedDiaSources : pandas.DataFrame 

1220 The standardized DiaSource catalog 

1221 """ 

1222 dataFrame = table.to_pandas() 

1223 standardizedDataFrame = self.standardizeDataFrame(dataFrame, tableName, nullColumns=nullColumns) 

1224 return standardizedDataFrame 

1225 

    @timeMethod
    def mergeAssociatedCatalogs(self, preloadedDiaSources, associatedDiaSources, diaObjects, newDiaObjects,
                                diffIm):
        """Merge the associated diaSource and diaObjects to their previous history.

        Note that ``associatedDiaSources`` is re-indexed in place by
        (diaObjectId, band, diaSourceId); the caller's object is modified.

        Parameters
        ----------
        preloadedDiaSources : `pandas.DataFrame`
            Previously detected DiaSources, loaded from the APDB.
        associatedDiaSources : `pandas.DataFrame`
            Associated DiaSources with DiaObjects.
        diaObjects : `pandas.DataFrame`
            Table of DiaObjects from preloaded DiaObjects.
        newDiaObjects : `pandas.DataFrame`
            Table of new DiaObjects after association.
        diffIm : `lsst.afw.image.ExposureF`
            Difference image whose bounding box and WCS are used to purge
            diaObjects that fall off the image after association.

        Raises
        ------
        RuntimeError
            Raised if duplicate DiaObjects or duplicate DiaSources are found.

        Returns
        -------
        mergedDiaSourceHistory : `pandas.DataFrame`
            The combined catalog, with all of the rows from preloadedDiaSources
            catalog ordered before the rows of associatedDiaSources catalog.
        mergedDiaObjects : `pandas.DataFrame`
            Table of new DiaObjects merged with their history.
        updatedDiaObjectIds : `numpy.Array`
            Object Id's from associated diaSources.
        """
        # Index the DiaSource catalog for this visit after all associations
        # have been made.
        updatedDiaObjectIds = associatedDiaSources["diaObjectId"][
            associatedDiaSources["diaObjectId"] != 0].to_numpy()
        associatedDiaSources.set_index(["diaObjectId",
                                        "band",
                                        "diaSourceId"],
                                       drop=False,
                                       inplace=True)

        # Append new DiaObjects and DiaSources to their previous history.
        if diaObjects.empty:
            mergedDiaObjects = newDiaObjects.set_index("diaObjectId", drop=False)
        elif not newDiaObjects.empty:
            mergedDiaObjects = pd.concat(
                [diaObjects,
                 newDiaObjects.set_index("diaObjectId", drop=False)],
                sort=True)
        else:
            mergedDiaObjects = diaObjects

        # Exclude any objects that are off the image after association.
        # buffer=-1 shrinks the bbox by one pixel so border objects are
        # treated as off-image.
        mergedDiaObjects, updatedDiaObjectIds = self.purgeDiaObjects(diffIm.getBBox(), diffIm.getWcs(),
                                                                     mergedDiaObjects,
                                                                     diaObjectIds=updatedDiaObjectIds,
                                                                     buffer=-1)
        if self.testDataFrameIndex(mergedDiaObjects):
            raise RuntimeError(
                "Duplicate DiaObjects created after association. This is "
                "likely due to re-running data with an already populated "
                "Apdb. If this was not the case then there was an unexpected "
                "failure in Association while matching and creating new "
                "DiaObjects and should be reported. Exiting.")

        mergedDiaSourceHistory = self.mergeCatalogs(preloadedDiaSources, associatedDiaSources,
                                                    tableName="DiaSource")

        # Test for DiaSource duplication first. If duplicates are found,
        # this likely means this is duplicate data being processed and sent
        # to the Apdb.
        if self.testDataFrameIndex(mergedDiaSourceHistory):
            raise RuntimeError(
                "Duplicate DiaSources found after association and merging "
                "with history. This is likely due to re-running data with an "
                "already populated Apdb. If this was not the case then there "
                "was an unexpected failure in Association while matching "
                "sources to objects, and should be reported. Exiting.")
        # Finally, update the diaObject table with the number of associated diaSources
        mergedUpdatedDiaObjects = self.updateObjectTable(mergedDiaObjects, mergedDiaSourceHistory)
        return (mergedDiaSourceHistory, mergedUpdatedDiaObjects, updatedDiaObjectIds)

1307 

1308 @timeMethod 

1309 def runForcedMeasurement(self, diaObjects, updatedDiaObjects, exposure, diffIm, idGenerator): 

1310 """Forced Source Measurement 

1311 

1312 Forced photometry on the difference and calibrated exposures using the 

1313 new and updated DiaObject locations. 

1314 

1315 Parameters 

1316 ---------- 

1317 diaObjects : `pandas.DataFrame` 

1318 Catalog of DiaObjects. 

1319 updatedDiaObjects : `pandas.DataFrame` 

1320 Catalog of updated DiaObjects. 

1321 exposure : `lsst.afw.image.ExposureF` 

1322 Calibrated exposure differenced with a template to create 

1323 ``diffIm``. 

1324 diffIm : `lsst.afw.image.ExposureF` 

1325 Difference image exposure in which the sources in ``diaSourceCat`` 

1326 were detected. 

1327 idGenerator : `lsst.meas.base.IdGenerator` 

1328 Object that generates source IDs and random number generator seeds. 

1329 

1330 Returns 

1331 ------- 

1332 diaForcedSources : `pandas.DataFrame` 

1333 Catalog of calibrated forced photometered fluxes on both the 

1334 difference and direct images at DiaObject locations. 

1335 """ 

1336 # Force photometer on the Difference and Calibrated exposures using 

1337 # the new and updated DiaObject locations. 

1338 diaForcedSources = self.diaForcedSource.run( 

1339 diaObjects, 

1340 updatedDiaObjects.loc[:, "diaObjectId"].to_numpy(), 

1341 exposure, 

1342 diffIm, 

1343 idGenerator=idGenerator) 

1344 self.log.info(f"Updating {len(diaForcedSources)} diaForcedSources in the APDB") 

1345 diaForcedSources = convertDataFrameToSdmSchema(self.schema, diaForcedSources, 

1346 tableName="DiaForcedSource", skipIndex=True) 

1347 return diaForcedSources 

1348 

1349 @timeMethod 

1350 def loadRefreshedDiaObjects(self, region, preloadedDiaObjects): 

1351 """Load DiaObjects from the Apdb based on their HTM location. 

1352 

1353 Parameters 

1354 ---------- 

1355 region : `sphgeom.Region` 

1356 Region containing the current exposure to load diaObjects from the 

1357 APDB. 

1358 preloadedDiaObjects : `pandas.DataFrame` 

1359 Previously detected DiaObjects, loaded from the APDB. 

1360 

1361 Returns 

1362 ------- 

1363 diaObjects : `pandas.DataFrame` 

1364 DiaObjects loaded from the Apdb that are within the area defined 

1365 by ``pixelRanges``. 

1366 """ 

1367 angleMargin = lsst.sphgeom.Angle.fromDegrees(self.config.angleMargin/3600.) 

1368 diaObjects = self.apdb.getDiaObjects(paddedRegion(region, angleMargin)) 

1369 

1370 diaObjects.set_index("diaObjectId", drop=False, inplace=True) 

1371 if diaObjects.index.has_duplicates: 

1372 self.log.warning( 

1373 "Duplicate DiaObjects loaded from the Apdb. This may cause " 

1374 "downstream pipeline issues. Dropping duplicated rows") 

1375 # Drop duplicates via index and keep the first appearance. 

1376 diaObjects = diaObjects.groupby(diaObjects.index).first() 

1377 self.log.info("Loaded %d DiaObjects", len(diaObjects)) 

1378 refreshedDiaObjects = convertDataFrameToSdmSchema(self.schema, diaObjects, tableName="DiaObject", 

1379 skipIndex=True) 

1380 

1381 refreshedIsInPreloaded = refreshedDiaObjects.index.isin(preloadedDiaObjects.index) 

1382 preloadedIsInRefreshed = preloadedDiaObjects.index.isin(refreshedDiaObjects.index) 

1383 nUniqueRefreshed = (~refreshedIsInPreloaded).sum() 

1384 nUniquePreloaded = (~preloadedIsInRefreshed).sum() 

1385 if nUniqueRefreshed > 0: 

1386 self.log.info("Reloading the diaObject table during association yielded " 

1387 "an additional %d objects over the %d preloaded diaObjects.", 

1388 nUniqueRefreshed, len(preloadedDiaObjects)) 

1389 if nUniquePreloaded == 0: 

1390 return refreshedDiaObjects 

1391 elif nUniqueRefreshed == 0: 

1392 return preloadedDiaObjects 

1393 else: 

1394 # We can get "preloaded" diaObjects that don't appear in the APDB with 

1395 # the CI datasets in ap_verify, since those are stored as a dataset and 

1396 # an empty APDB is created. In that case, combine the two catalogs. 

1397 # Precedence is given to the refreshed diaObject catalog. 

1398 return pd.concat([refreshedDiaObjects, preloadedDiaObjects.loc[~preloadedIsInRefreshed]]) 

1399 

1400 @timeMethod 

1401 def writeToApdb(self, updatedDiaObjects, associatedDiaSources, diaForcedSources): 

1402 """Write to the Alert Production Database (Apdb). 

1403 

1404 Store DiaSources, updated DiaObjects, and DiaForcedSources in the 

1405 Alert Production Database (Apdb). 

1406 

1407 Parameters 

1408 ---------- 

1409 updatedDiaObjects : `pandas.DataFrame` 

1410 Catalog of updated DiaObjects. 

1411 associatedDiaSources : `pandas.DataFrame` 

1412 Associated DiaSources with DiaObjects. 

1413 diaForcedSources : `pandas.DataFrame` 

1414 Catalog of calibrated forced photometered fluxes on both the 

1415 difference and direct images at DiaObject locations. 

1416 

1417 Returns 

1418 ------- 

1419 validityStart : `lsst.daf.base.DateTime` 

1420 Time at which the APDB was updated. 

1421 """ 

1422 # Store DiaSources, updated DiaObjects, and DiaForcedSources in the 

1423 # Apdb. 

1424 # Drop empty columns that are nullable in the APDB. 

1425 diaObjectStore = dropEmptyColumns(self.schema, updatedDiaObjects, tableName="DiaObject") 

1426 diaSourceStore = dropEmptyColumns(self.schema, associatedDiaSources, tableName="DiaSource") 

1427 diaForcedSourceStore = dropEmptyColumns(self.schema, diaForcedSources, tableName="DiaForcedSource") 

1428 

1429 validityStart = DateTime.now() 

1430 self.apdb.store( 

1431 validityStart.toAstropy(), 

1432 diaObjectStore, 

1433 diaSourceStore, 

1434 diaForcedSourceStore) 

1435 self.log.info("APDB updated.") 

1436 

1437 return validityStart 

1438 

1439 def testDataFrameIndex(self, df): 

1440 """Test the sorted DataFrame index for duplicates. 

1441 

1442 Wrapped as a separate function to allow for mocking of the this task 

1443 in unittesting. Default of a mock return for this test is True. 

1444 

1445 Parameters 

1446 ---------- 

1447 df : `pandas.DataFrame` 

1448 DataFrame to text. 

1449 

1450 Returns 

1451 ------- 

1452 `bool` 

1453 True if DataFrame contains duplicate rows. 

1454 """ 

1455 return df.index.has_duplicates 

1456 

1457 def _add_association_meta_data(self, 

1458 nUpdatedDiaObjects, 

1459 nUnassociatedDiaObjects, 

1460 nNewDiaObjects, 

1461 nTotalSsObjects, 

1462 nAssociatedSsObjects): 

1463 """Store summaries of the association step in the task metadata. 

1464 

1465 Parameters 

1466 ---------- 

1467 nUpdatedDiaObjects : `int` 

1468 Number of previous DiaObjects associated and updated in this 

1469 ccdVisit. 

1470 nUnassociatedDiaObjects : `int` 

1471 Number of previous DiaObjects that were not associated or updated 

1472 in this ccdVisit. 

1473 nNewDiaObjects : `int` 

1474 Number of newly created DiaObjects for this ccdVisit. 

1475 nTotalSsObjects : `int` 

1476 Number of SolarSystemObjects within the observable detector 

1477 area. 

1478 nAssociatedSsObjects : `int` 

1479 Number of successfully associated SolarSystemObjects. 

1480 """ 

1481 self.metadata['numUpdatedDiaObjects'] = nUpdatedDiaObjects 

1482 self.metadata['numUnassociatedDiaObjects'] = nUnassociatedDiaObjects 

1483 self.metadata['numNewDiaObjects'] = nNewDiaObjects 

1484 self.metadata['numTotalSolarSystemObjects'] = nTotalSsObjects 

1485 self.metadata['numAssociatedSsObjects'] = nAssociatedSsObjects 

1486 

    def purgeDiaObjects(self, bbox, wcs, diaObjCat, diaObjectIds=None, buffer=0):
        """Drop diaObjects that are outside the exposure bounding box.

        Best-effort: any failure during the check is logged as a warning and
        the inputs are returned unchanged.

        Parameters
        ----------
        bbox : `lsst.geom.Box2I`
            Bounding box of the exposure. NOTE(review): ``bbox.grow(buffer)``
            mutates this argument in place — callers appear to pass a fresh
            ``getBBox()`` result; confirm no caller reuses the box afterwards.
        wcs : `lsst.afw.geom.SkyWcs`
            Coordinate system definition (wcs) for the exposure.
        diaObjCat : `pandas.DataFrame`
            DiaObjects loaded from the Apdb.
        diaObjectIds : `numpy.ndarray`, optional
            Object ids from associated diaSources; ids of purged diaObjects
            are removed from this array as well. If `None`, the purge is
            assumed to run before association (only the log message differs).
        buffer : `int`, optional
            Width, in pixels, to pad the exposure bounding box (negative
            values shrink it).

        Returns
        -------
        diaObjCat : `pandas.DataFrame`
            DiaObjects loaded from the Apdb, restricted to the exposure
            bounding box.
        diaObjectIds : `numpy.ndarray` or `None`
            Input ids with those of purged diaObjects removed, or the
            unmodified input if it was `None` or nothing was purged.
        """
        try:
            bbox.grow(buffer)
            raVals = diaObjCat.ra.to_numpy()
            decVals = diaObjCat.dec.to_numpy()
            xVals, yVals = wcs.skyToPixelArray(raVals, decVals, degrees=True)
            selector = bbox.contains(xVals, yVals)
            nPurged = np.sum(~selector)
            if nPurged > 0:
                if diaObjectIds is not None:
                    # We also need to drop any of the associated IDs if this runs after association
                    purgedIds = diaObjCat[~selector].diaObjectId
                    diaObjectIds = diaObjectIds[~np.isin(diaObjectIds, purgedIds)]
                    self.log.info("Dropped %i diaObjects that were outside the bbox "
                                  "after association, leaving %i in the catalog",
                                  nPurged, len(diaObjCat) - nPurged)
                else:
                    self.log.info("Dropped %i diaObjects that were outside the padded bbox "
                                  "before association, leaving %i in the catalog",
                                  nPurged, len(diaObjCat) - nPurged)
                diaObjCat = diaObjCat[selector].copy()
        except Exception as e:
            # Deliberately broad: this purge is an optimization, so any
            # failure is logged and the unpurged catalog is returned.
            self.log.warning("Error attempting to check diaObject history: %s", e)
        return diaObjCat, diaObjectIds

1530 

1531 def mergeCatalogs(self, originalCatalog, newCatalog, tableName): 

1532 """Combine two catalogs, ensuring that the new catalog conforms to the schema. 

1533 

1534 Parameters 

1535 ---------- 

1536 originalCatalog : `pandas.DataFrame` 

1537 The original catalog to be added to. 

1538 newCatalog : `pandas.DataFrame` 

1539 The new catalog to append to `originalCatalog` 

1540 tableName : `str` 

1541 Name of the table in the schema to use. 

1542 

1543 Returns 

1544 ------- 

1545 mergedCatalog : `pandas.DataFrame` 

1546 The combined catalog, with all of the rows from ``originalCatalog`` 

1547 ordered before the rows of ``newCatalog`` 

1548 """ 

1549 if len(newCatalog) > 0: 

1550 catalog = convertDataFrameToSdmSchema(self.schema, newCatalog, 

1551 tableName=tableName, skipIndex=True) 

1552 

1553 mergedCatalog = pd.concat([originalCatalog, catalog], sort=True) 

1554 else: 

1555 mergedCatalog = pd.concat([originalCatalog], sort=True) 

1556 return mergedCatalog.loc[:, originalCatalog.columns] 

1557 

1558 def updateObjectTable(self, diaObjects, diaSources): 

1559 """Update the diaObject table with the new diaSource records. 

1560 

1561 Parameters 

1562 ---------- 

1563 diaObjects : `pandas.DataFrame` 

1564 Table of new DiaObjects merged with their history. 

1565 diaSources : `pandas.DataFrame` 

1566 The combined preloaded and associated diaSource catalog. 

1567 

1568 Returns 

1569 ------- 

1570 updatedDiaObjects : `pandas.DataFrame` 

1571 Table of DiaObjects updated with the number of associated DiaSources 

1572 """ 

1573 nDiaSources = diaSources[["diaSourceId"]].groupby("diaObjectId").agg(len) 

1574 nDiaSources.rename({"diaSourceId": "nDiaSources"}, errors="raise", axis="columns", inplace=True) 

1575 del diaObjects["nDiaSources"] 

1576 updatedDiaObjects = diaObjects.join(nDiaSources, how="left") 

1577 return updatedDiaObjects 

1578 

    @staticmethod
    def checkTableIndex(dataFrame, index):
        """Ensure ``dataFrame`` is indexed by ``index``, setting it in place
        if no named index is present.

        Parameters
        ----------
        dataFrame : `pandas.DataFrame`
            Table whose index is checked and, if unset, assigned in place.
        index : `str` or `list` of `str`
            Column name(s) to use as the index when none is set.
        """
        if dataFrame.index.name is None:
            # The expected index may or may not be set, depending on whether
            # the table was written originally as a DataFrame or something else
            # Parquet-friendly.
            # NOTE(review): a MultiIndex also reports ``.name`` as None —
            # confirm multi-indexed tables never reach this check.
            dataFrame.set_index(index, drop=False, inplace=True)