Coverage for python / lsst / ap / association / diaPipe.py: 19%
369 statements
« prev ^ index » next coverage.py v7.13.5, created at 2026-05-05 18:39 +0000
« prev ^ index » next coverage.py v7.13.5, created at 2026-05-05 18:39 +0000
1#
2# LSST Data Management System
3# Copyright 2008-2016 AURA/LSST.
4#
5# This product includes software developed by the
6# LSST Project (http://www.lsst.org/).
7#
8# This program is free software: you can redistribute it and/or modify
9# it under the terms of the GNU General Public License as published by
10# the Free Software Foundation, either version 3 of the License, or
11# (at your option) any later version.
12#
13# This program is distributed in the hope that it will be useful,
14# but WITHOUT ANY WARRANTY; without even the implied warranty of
15# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
16# GNU General Public License for more details.
17#
18# You should have received a copy of the LSST License Statement and
19# the GNU General Public License along with this program. If not,
20# see <https://www.lsstcorp.org/LegalNotices/>.
21#
23"""PipelineTask for associating DiaSources with previous DiaObjects.
25Additionally performs forced photometry on the calibrated and difference
26images at the updated locations of DiaObjects.
27"""
29__all__ = ("DiaPipelineConfig",
30 "DiaPipelineTask",
31 "DiaPipelineConnections")
34import lsst.dax.apdb as daxApdb
35import lsst.pex.config as pexConfig
36import lsst.pipe.base as pipeBase
37import lsst.pipe.base.connectionTypes as connTypes
38import lsst.sphgeom
40from astropy.table import Table
41import numpy as np
42import pandas as pd
43from lsst.ap.association import (
44 AssociationTask,
45 DiaForcedSourceTask,
46 PackageAlertsTask)
48from lsst.ap.association.utils import makeEmptyForcedSourceTable, getRegion, paddedRegion, readSchemaFromApdb
49from lsst.daf.base import DateTime
50from lsst.meas.base import DetectorVisitIdGeneratorConfig, \
51 DiaObjectCalculationTask
52from lsst.pipe.tasks.schemaUtils import convertDataFrameToSdmSchema, checkSdmSchemaColumns, \
53 dropEmptyColumns, make_empty_catalog
54from lsst.pipe.tasks.ssoAssociation import SolarSystemAssociationTask
55from lsst.utils.timer import timeMethod, duration_from_timeMethod
class TooManyDiaObjectsError(pipeBase.AlgorithmError):
    """Raised if there are an unusually large number of unassociated DiaSources.

    This is usually indicative of an image subtraction error, and needs to be
    caught before updating the APDB with a large number of spurious entries.
    """

    def __init__(self, *, nNewDiaObjects, threshold):
        # Record the counts first so they are available even if message
        # construction ever changes.
        self.nNewDiaObjects = nNewDiaObjects
        self.threshold = threshold
        message = (
            f"Aborting processing before writing to the APDB. {nNewDiaObjects} new"
            f" DiaObjects would be created, which exceeds the configured maximum of {threshold}"
        )
        super().__init__(message)

    @property
    def metadata(self):
        # Machine-readable payload consumed by AlgorithmError handling.
        return {
            "nNewDiaObjects": self.nNewDiaObjects,
            "threshold": self.threshold,
        }
class DiaPipelineConnections(
        pipeBase.PipelineTaskConnections,
        dimensions=("instrument", "visit", "detector"),
        defaultTemplates={"coaddName": "deep", "fakesType": ""}):
    """Butler connections for DiaPipelineTask.
    """
    diaSourceTable = connTypes.Input(
        doc="Catalog of calibrated DiaSources.",
        name="{fakesType}{coaddName}Diff_diaSrcTable",
        storageClass="DataFrame",
        dimensions=("instrument", "visit", "detector"),
    )
    solarSystemObjectTable = connTypes.Input(
        # Fixed typo: was "SolarSolarSystem".
        doc="Catalog of SolarSystem objects expected to be observable in "
            "this detectorVisit.",
        name="preloaded_SsObjects",
        storageClass="ArrowAstropy",
        dimensions=("instrument", "group", "detector"),
        minimum=0,
    )
    diffIm = connTypes.Input(
        doc="Difference image on which the DiaSources were detected.",
        name="{fakesType}{coaddName}Diff_differenceExp",
        storageClass="ExposureF",
        dimensions=("instrument", "visit", "detector"),
    )
    exposure = connTypes.Input(
        doc="Calibrated exposure differenced with a template image during "
            "image differencing.",
        name="{fakesType}calexp",
        storageClass="ExposureF",
        dimensions=("instrument", "visit", "detector"),
    )
    template = connTypes.Input(
        doc="Warped template used to create `subtractedExposure`. Not PSF "
            "matched.",
        dimensions=("instrument", "visit", "detector"),
        storageClass="ExposureF",
        name="{fakesType}{coaddName}Diff_templateExp",
    )
    preloadedDiaObjects = connTypes.Input(
        doc="DiaObjects preloaded from the APDB.",
        name="preloaded_diaObjects",
        storageClass="DataFrame",
        dimensions=("instrument", "group", "detector"),
    )
    preloadedDiaSources = connTypes.Input(
        doc="DiaSources preloaded from the APDB.",
        name="preloaded_diaSources",
        storageClass="DataFrame",
        dimensions=("instrument", "group", "detector"),
    )
    preloadedDiaForcedSources = connTypes.Input(
        doc="DiaForcedSources preloaded from the APDB.",
        name="preloaded_diaForcedSources",
        storageClass="DataFrame",
        dimensions=("instrument", "group", "detector"),
    )
    apdbMarker = connTypes.Output(
        doc="Marker dataset storing the configuration of the Apdb for each "
            "visit/detector. Used to signal the completion of the pipeline.",
        name="apdb_marker",
        storageClass="Config",
        dimensions=("instrument", "visit", "detector"),
    )
    associatedDiaSources = connTypes.Output(
        doc="Optional output storing the DiaSource catalog after matching, "
            "calibration, and standardization for insertion into the Apdb.",
        name="{fakesType}{coaddName}Diff_assocDiaSrc",
        storageClass="ArrowAstropy",
        dimensions=("instrument", "visit", "detector"),
    )
    associatedSsSources = connTypes.Output(
        doc="Optional output storing ssSource data computed during association.",
        name="{fakesType}{coaddName}Diff_associatedSsSources",
        storageClass="ArrowAstropy",
        dimensions=("instrument", "visit", "detector"),
    )
    unassociatedSsObjects = connTypes.Output(
        doc="Expected locations of an ssObject with no source",
        name="ssUnassociatedObjects",
        storageClass="ArrowAstropy",
        dimensions=("instrument", "visit", "detector"),
    )
    diaForcedSources = connTypes.Output(
        doc="Optional output storing the forced sources computed at the diaObject positions.",
        name="{fakesType}{coaddName}Diff_diaForcedSrc",
        storageClass="ArrowAstropy",
        dimensions=("instrument", "visit", "detector"),
    )
    diaObjects = connTypes.Output(
        doc="Optional output storing the updated diaObjects associated to these sources.",
        name="{fakesType}{coaddName}Diff_diaObject",
        storageClass="ArrowAstropy",
        dimensions=("instrument", "visit", "detector"),
    )
    newDiaSources = connTypes.Output(
        doc="New diaSources not associated with an existing diaObject that"
            " were used to create a new diaObject",
        name="{fakesType}new_dia_source",
        storageClass="ArrowAstropy",
        dimensions=("instrument", "visit", "detector"),
    )
    marginalDiaSources = connTypes.Output(
        doc="Low SNR diaSources not associated with an existing diaObject that"
            " were rejected instead of creating a new diaObject",
        name="{fakesType}marginal_new_dia_source",
        storageClass="ArrowAstropy",
        dimensions=("instrument", "visit", "detector"),
    )

    def __init__(self, *, config=None):
        """Prune optional connections according to the task configuration.

        Outputs are removed (rather than left empty) so that the quantum
        graph does not expect datasets the task will never produce.
        """
        super().__init__(config=config)

        if not config.doWriteAssociatedSources:
            self.outputs.remove("associatedDiaSources")
            self.outputs.remove("diaForcedSources")
            self.outputs.remove("diaObjects")
            self.outputs.remove("newDiaSources")
            self.outputs.remove("marginalDiaSources")
        else:
            if not config.doRunForcedMeasurement:
                self.outputs.remove("diaForcedSources")
            if not config.filterUnAssociatedSources:
                self.outputs.remove("newDiaSources")
                self.outputs.remove("marginalDiaSources")
        if not config.doSolarSystemAssociation:
            self.inputs.remove("solarSystemObjectTable")
        if (not config.doWriteAssociatedSources) or (not config.doSolarSystemAssociation):
            self.outputs.remove("associatedSsSources")
            self.outputs.remove("unassociatedSsObjects")

    def adjustQuantum(self, inputs, outputs, label, dataId):
        """Override to make adjustments to `lsst.daf.butler.DatasetRef` objects
        in the `lsst.daf.butler.core.Quantum` during the graph generation stage
        of the activator.

        This implementation checks to make sure that the filters in the dataset
        are compatible with AP processing as set by the Apdb/DPDD schema.

        Parameters
        ----------
        inputs : `dict`
            Dictionary whose keys are an input (regular or prerequisite)
            connection name and whose values are a tuple of the connection
            instance and a collection of associated `DatasetRef` objects.
            The exact type of the nested collections is unspecified; it can be
            assumed to be multi-pass iterable and support `len` and ``in``, but
            it should not be mutated in place. In contrast, the outer
            dictionaries are guaranteed to be temporary copies that are true
            `dict` instances, and hence may be modified and even returned; this
            is especially useful for delegating to `super` (see notes below).
        outputs : `dict`
            Dict of output datasets, with the same structure as ``inputs``.
        label : `str`
            Label for this task in the pipeline (should be used in all
            diagnostic messages).
        dataId : `lsst.daf.butler.DataCoordinate`
            Data ID for this quantum in the pipeline (should be used in all
            diagnostic messages).

        Returns
        -------
        adjusted_inputs : `dict`
            Dict of the same form as ``inputs`` with updated containers of
            input `DatasetRef` objects. Connections that are not changed
            should not be returned at all. Datasets may only be removed, not
            added. Nested collections may be of any multi-pass iterable type,
            and the order of iteration will set the order of iteration within
            `PipelineTask.runQuantum`.
        adjusted_outputs : `dict`
            Dict of updated output datasets, with the same structure and
            interpretation as ``adjusted_inputs``.

        Raises
        ------
        ScalarError
            Raised if any `Input` or `PrerequisiteInput` connection has
            ``multiple`` set to `False`, but multiple datasets.
        NoWorkFound
            Raised to indicate that this quantum should not be run; not enough
            datasets were found for a regular `Input` connection, and the
            quantum should be pruned or skipped.
        FileNotFoundError
            Raised to cause QuantumGraph generation to fail (with the message
            included in this exception); not enough datasets were found for a
            `PrerequisiteInput` connection.
        """
        _, refs = inputs["diffIm"]
        for ref in refs:
            if ref.dataId["band"] not in self.config.validBands:
                raise ValueError(
                    f"Requested '{ref.dataId['band']}' not in "
                    "DiaPipelineConfig.validBands. To process bands not in "
                    "the standard Rubin set (ugrizy) you must add the band to "
                    "the validBands list in DiaPipelineConfig and add the "
                    "appropriate columns to the Apdb schema.")
        return super().adjustQuantum(inputs, outputs, label, dataId)
class DiaPipelineConfig(pipeBase.PipelineTaskConfig,
                        pipelineConnections=DiaPipelineConnections):
    """Config for DiaPipelineTask.
    """
    coaddName = pexConfig.Field(
        doc="coadd name: typically one of deep, goodSeeing, or dcr",
        dtype=str,
        default="deep",
    )
    apdb_config_url = pexConfig.Field(
        dtype=str,
        default=None,
        optional=False,
        doc="A config file specifying the APDB and its connection parameters, "
            "typically written by the apdb-cli command-line utility. "
            "The database must already be initialized.",
    )
    validBands = pexConfig.ListField(
        dtype=str,
        default=["u", "g", "r", "i", "z", "y"],
        doc="List of bands that are valid for AP processing. To process a "
            "band not on this list, the appropriate band specific columns "
            "must be added to the Apdb schema in dax_apdb.",
    )
    associator = pexConfig.ConfigurableField(
        target=AssociationTask,
        doc="Task used to associate DiaSources with DiaObjects.",
    )
    doSolarSystemAssociation = pexConfig.Field(
        dtype=bool,
        default=True,
        doc="Process SolarSystem objects through the pipeline.",
    )
    solarSystemAssociator = pexConfig.ConfigurableField(
        target=SolarSystemAssociationTask,
        doc="Task used to associate DiaSources with SolarSystemObjects.",
    )
    diaCalculation = pexConfig.ConfigurableField(
        target=DiaObjectCalculationTask,
        doc="Task to compute summary statistics for DiaObjects.",
    )
    doReloadDiaObjects = pexConfig.Field(
        dtype=bool,
        default=True,
        # Fixed missing space after "?" in the concatenated help text.
        doc="Drop preloaded DiaObjects and reload them from the APDB? "
            "Used in production when the very latest objects from the APDB "
            "are needed.",
    )
    angleMargin = pexConfig.RangeField(
        # NOTE(review): units of this padding are not stated here — presumably
        # arcseconds; confirm against loadRefreshedDiaObjects/paddedRegion.
        doc="Padding to add when loading diaObjects if `doReloadDiaObjects=True`",
        dtype=float,
        default=2,
        min=0,
    )
    doRunForcedMeasurement = pexConfig.Field(
        dtype=bool,
        default=True,
        deprecated="Added to allow disabling forced sources for performance "
                   "reasons during the ops rehearsal. "
                   "It is expected to be removed.",
        doc="Run forced measurement on all of the diaObjects? "
            "This should only be turned off for debugging purposes.",
    )
    diaForcedSource = pexConfig.ConfigurableField(
        target=DiaForcedSourceTask,
        doc="Task used for force photometer DiaObject locations in direct and "
            "difference images.",
    )
    alertPackager = pexConfig.ConfigurableField(
        target=PackageAlertsTask,
        doc="Subtask for packaging Ap data into alerts.",
    )
    doPackageAlerts = pexConfig.Field(
        dtype=bool,
        default=False,
        doc="Package Dia-data into serialized alerts for distribution and "
            "write them to disk.",
    )
    doWriteAssociatedSources = pexConfig.Field(
        dtype=bool,
        default=True,
        doc="Write out associated DiaSources, DiaForcedSources, and DiaObjects, "
            "formatted following the Science Data Model.",
    )
    imagePixelMargin = pexConfig.RangeField(
        dtype=int,
        default=10,
        min=0,
        doc="Pad the image by this many pixels before removing off-image "
            "diaObjects for association.",
    )
    filterUnAssociatedSources = pexConfig.Field(
        dtype=bool,
        default=True,
        # Fixed stray period that rendered as "creating new .diaObjects".
        doc="Check unassociated diaSources for quality before creating new "
            "diaObjects. DiaSources that are associated with existing diaObjects "
            "will not be affected.",
    )
    newObjectBadFlags = pexConfig.ListField(
        dtype=str,
        default=("centroid_flag",
                 "pixelFlags_crCenter",
                 "pixelFlags_nodataCenter",
                 "pixelFlags_interpolatedCenter",
                 "pixelFlags_saturatedCenter",
                 "pixelFlags_suspectCenter",
                 "pixelFlags_streakCenter",
                 "glint_trail"),
        doc="If `filterUnAssociatedSources` is set, exclude diaSources with "
            "these flags set before creating new diaObjects.",
    )
    maxNewDiaObjects = pexConfig.RangeField(
        # NOTE(review): this is a count but is declared as float; kept as-is
        # for backward compatibility with existing pipeline configs.
        dtype=float,
        default=0,
        min=0,
        doc="Maximum number of new DiaObjects to create before raising an error. "
            "Set to zero to disable.",
    )
    newObjectSnrThreshold = pexConfig.RangeField(
        dtype=float,
        default=0,
        min=0,
        doc="If `filterUnAssociatedSources` is set, exclude diaSources with "
            "Abs(flux/fluxErr) less than this threshold before creating new "
            "diaObjects. "
            "Set to zero to disable.",
    )
    newObjectLowReliabilitySnrThreshold = pexConfig.RangeField(
        dtype=float,
        default=0,
        min=0,
        doc="If `filterUnAssociatedSources` is set, exclude diaSources with "
            "signal-to-noise ratios less than this threshold if they have"
            " low reliability scores before creating new diaObjects. "
            "Set to zero to disable.",
    )
    newObjectReliabilityThreshold = pexConfig.RangeField(
        dtype=float,
        default=0.1,
        min=0,
        max=1,
        doc="If `filterUnAssociatedSources` is set, exclude diaSources with "
            "reliability scores less than this threshold before creating new "
            "diaObjects. "
            "Set to zero to disable.",
    )
    newObjectLowSnrReliabilityThreshold = pexConfig.RangeField(
        dtype=float,
        default=0.1,
        min=0,
        max=1,
        doc="If `filterUnAssociatedSources` is set, exclude diaSources with "
            "low signal-to-noise and reliability scores below this threshold "
            "before creating new diaObjects. "
            "Set to zero to disable.",
    )
    newObjectFluxField = pexConfig.Field(
        dtype=str,
        default="apFlux",
        doc="Name of the flux field in the standardized diaSource catalog to "
            "use for checking the signal-to-noise before creating new diaObjects.",
    )
    idGenerator = DetectorVisitIdGeneratorConfig.make_field()

    def setDefaults(self):
        # Restrict the DiaObject summary-statistic plugins to the standard
        # AP set.
        self.diaCalculation.plugins = ["ap_meanPosition",
                                       "ap_nDiaSources",
                                       "ap_meanFlux",
                                       "ap_sigmaFlux",
                                       "ap_minMaxFlux",
                                       "ap_maxSlopeFlux",
                                       "ap_meanErrFlux",
                                       "ap_meanTotFlux"]
454class DiaPipelineTask(pipeBase.PipelineTask):
455 """Task for loading, associating and storing Difference Image Analysis
456 (DIA) Objects and Sources.
457 """
458 ConfigClass = DiaPipelineConfig
459 _DefaultName = "diaPipe"
461 def __init__(self, initInputs=None, **kwargs):
462 super().__init__(**kwargs)
463 self.apdb = daxApdb.Apdb.from_uri(self.config.apdb_config_url)
464 self.schema = readSchemaFromApdb(self.apdb)
465 self.makeSubtask("associator")
466 self.makeSubtask("diaCalculation")
467 if self.config.doRunForcedMeasurement:
468 self.makeSubtask("diaForcedSource")
469 if self.config.doPackageAlerts:
470 self.makeSubtask("alertPackager")
471 if self.config.doSolarSystemAssociation:
472 self.makeSubtask("solarSystemAssociator")
473 if self.config.filterUnAssociatedSources:
474 columns = [self.config.newObjectFluxField,
475 self.config.newObjectFluxField + "Err",
476 "reliability"]
477 columns += self.config.newObjectBadFlags
479 missing = checkSdmSchemaColumns(self.schema, columns, "DiaSource")
480 if missing:
481 raise pipeBase.InvalidQuantumError("Field %s not in the DiaSource schema" % missing)
483 def runQuantum(self, butlerQC, inputRefs, outputRefs):
484 inputs = butlerQC.get(inputRefs)
485 inputs["idGenerator"] = self.config.idGenerator.apply(butlerQC.quantum.dataId)
486 inputs["band"] = butlerQC.quantum.dataId["band"]
487 inputs["legacySolarSystemTable"] = None
488 if not self.config.doSolarSystemAssociation:
489 inputs["solarSystemObjectTable"] = None
491 outputs = self.run(**inputs)
493 butlerQC.put(outputs, outputRefs)
495 @timeMethod
496 def run(self,
497 diaSourceTable,
498 legacySolarSystemTable,
499 diffIm,
500 exposure,
501 template,
502 preloadedDiaObjects,
503 preloadedDiaSources,
504 preloadedDiaForcedSources,
505 band,
506 idGenerator,
507 solarSystemObjectTable=None):
508 """Process DiaSources and DiaObjects.
510 Load previous DiaObjects and their DiaSource history. Calibrate the
511 values in the diaSourceCat. Associate new DiaSources with previous
512 DiaObjects. Run forced photometry at the updated DiaObject locations.
513 Store the results in the Alert Production Database (Apdb).
515 Parameters
516 ----------
517 diaSourceTable : `pandas.DataFrame`
518 Newly detected DiaSources.
519 legacySolarSystemTable : `pandas.DataFrame`
520 Not used
521 diffIm : `lsst.afw.image.ExposureF`
522 Difference image exposure in which the sources in ``diaSourceCat``
523 were detected.
524 exposure : `lsst.afw.image.ExposureF`
525 Calibrated exposure differenced with a template to create
526 ``diffIm``.
527 template : `lsst.afw.image.ExposureF`
528 Template exposure used to create diffIm.
529 preloadedDiaObjects : `pandas.DataFrame`
530 Previously detected DiaObjects, loaded from the APDB.
531 preloadedDiaSources : `pandas.DataFrame`
532 Previously detected DiaSources, loaded from the APDB.
533 preloadedDiaForcedSources : `pandas.DataFrame`
534 Catalog of previously detected forced DiaSources, from the APDB
535 band : `str`
536 The band in which the new DiaSources were detected.
537 idGenerator : `lsst.meas.base.IdGenerator`
538 Object that generates source IDs and random number generator seeds.
539 solarSystemObjectTable : `astropy.table.Table`
540 Preloaded Solar System objects expected to be visible in the image.
542 Returns
543 -------
544 results : `lsst.pipe.base.Struct`
545 Results struct with components.
547 - ``apdbMarker`` : Marker dataset to store in the Butler indicating
548 that this ccdVisit has completed successfully.
549 (`lsst.dax.apdb.ApdbConfig`)
550 - ``associatedDiaSources`` : Catalog of newly associated
551 DiaSources. (`pandas.DataFrame`)
552 - ``diaForcedSources`` : Catalog of new and previously detected
553 forced DiaSources. (`pandas.DataFrame`)
554 - ``diaObjects`` : Updated table of DiaObjects. (`pandas.DataFrame`)
555 - ``associatedSsSources`` : Catalog of ssSource records.
556 (`pandas.DataFrame`)
558 Raises
559 ------
560 RuntimeError
561 Raised if duplicate DiaObjects or duplicate DiaSources are found.
562 """
563 # Accept either legacySolarSystemTable or optional solarSystemObjectTable.
564 if legacySolarSystemTable is not None and solarSystemObjectTable is None:
565 solarSystemObjectTable = Table.from_pandas(legacySolarSystemTable)
566 if self.config.doReloadDiaObjects:
567 try:
568 preloadedDiaObjects = self.loadRefreshedDiaObjects(getRegion(exposure), preloadedDiaObjects)
569 except Exception as e:
570 self.log.warning("Error encountered while attempting to load "
571 "the latest diaObjects from the APDB. Processing "
572 "will continue with only the diaObjects from "
573 "preload. Error:", e)
574 finally:
575 self.metadata["loadDiaObjectsDuration"] = duration_from_timeMethod(
576 self.metadata, "loadRefreshedDiaObjects", clock="Utc"
577 )
578 self.log.verbose("Re-loading DiaObjects: Took %.4f seconds",
579 self.metadata["loadDiaObjectsDuration"])
581 else:
582 self.metadata["loadDiaObjectsDuration"] = -1
584 self.checkTableIndex(preloadedDiaSources, index=["diaObjectId", "band", "diaSourceId"])
585 self.checkTableIndex(preloadedDiaObjects, index="diaObjectId")
586 self.checkTableIndex(preloadedDiaForcedSources, index=["diaObjectId", "diaForcedSourceId"])
588 if not preloadedDiaObjects.empty:
589 # Include a small buffer outside the image so that we can associate sources near the edge
590 diaObjects, _ = self.purgeDiaObjects(diffIm.getBBox(), diffIm.getWcs(), preloadedDiaObjects,
591 buffer=self.config.imagePixelMargin)
592 else:
593 self.log.info("Preloaded DiaObject table is empty.")
594 diaObjects = preloadedDiaObjects
596 # Associate DiaSources with DiaObjects
597 assocResults = self.associateDiaSources(diaSourceTable, solarSystemObjectTable, diffIm, diaObjects)
599 # Set unassociated diaObjectIds and ssObjectIds to NULL, and convert to SDM schema format
600 standardizedAssociatedDiaSources = self.standardizeDataFrame(
601 assocResults.associatedDiaSources,
602 "DiaSource",
603 nullColumns=["diaObjectId", "ssObjectId"]
604 )
606 # Merge associated diaSources
607 mergedDiaSourceHistory, mergedDiaObjects, updatedDiaObjectIds = self.mergeAssociatedCatalogs(
608 preloadedDiaSources,
609 assocResults.associatedDiaSources,
610 diaObjects,
611 assocResults.newDiaObjects,
612 diffIm
613 )
615 # Compute DiaObject Summary statistics from their full DiaSource
616 # history.
617 diaCalResult = self.diaCalculation.run(
618 mergedDiaObjects,
619 mergedDiaSourceHistory,
620 updatedDiaObjectIds,
621 [band])
622 updatedDiaObjects = convertDataFrameToSdmSchema(self.schema, diaCalResult.updatedDiaObjects,
623 tableName="DiaObject", skipIndex=True)
625 # Test for duplication in the updated DiaObjects.
626 if self.testDataFrameIndex(diaCalResult.diaObjectCat):
627 raise RuntimeError(
628 "Duplicate DiaObjects (loaded + updated) created after "
629 "DiaCalculation. This is unexpected behavior and should be "
630 "reported. Exiting.")
631 if self.testDataFrameIndex(updatedDiaObjects):
632 raise RuntimeError(
633 "Duplicate DiaObjects (updated) created after "
634 "DiaCalculation. This is unexpected behavior and should be "
635 "reported. Exiting.")
637 # Forced source measurement
638 if self.config.doRunForcedMeasurement:
639 diaForcedSources = self.runForcedMeasurement(
640 diaCalResult.diaObjectCat, updatedDiaObjects, exposure, diffIm, idGenerator
641 )
642 forcedSourceHistoryThreshold = self.diaForcedSource.config.historyThreshold
643 else:
644 # alertPackager needs correct columns
645 diaForcedSources = makeEmptyForcedSourceTable(self.schema)
646 forcedSourceHistoryThreshold = 0
648 # Write results to Alert Production Database (APDB)
649 try:
650 validityStart = self.writeToApdb(updatedDiaObjects, standardizedAssociatedDiaSources,
651 diaForcedSources)
652 finally:
653 self.metadata["writeToApdbDuration"] = duration_from_timeMethod(self.metadata, "writeToApdb",
654 clock="Utc")
655 # A single log message is easier for Loki to parse than timeMethod's start+end pairs.
656 self.log.verbose("writeToApdb: Took %.4f seconds", self.metadata["writeToApdbDuration"])
658 associatedSsSources = assocResults.associatedSsSources
659 associatedSsSourcesPlusMpcorb = None
660 if associatedSsSources is not None:
661 mpcorbColumns = [col for col in associatedSsSources.columns if col[:7] == 'MPCORB_']
662 associatedSsSourceMpcorb = associatedSsSources[mpcorbColumns].copy()
663 associatedSsSources = self.standardizeTable(associatedSsSources, "SSSource", nullColumns=[])
664 associatedSsSourcesPlusMpcorb = associatedSsSources.copy()
665 for mpcorbColumn in mpcorbColumns:
666 associatedSsSourcesPlusMpcorb[mpcorbColumn] = associatedSsSourceMpcorb[mpcorbColumn]
668 # patch the otherwise-empty validityStart field for the alerts
669 updatedDiaObjects['validityStartMjdTai'] = validityStart.get(system=DateTime.MJD, scale=DateTime.TAI)
671 # Package alerts
672 if self.config.doPackageAlerts:
673 # Append new forced sources to the full history
674 diaForcedSourcesFull = self.mergeCatalogs(preloadedDiaForcedSources, diaForcedSources,
675 tableName="DiaForcedSource")
676 if self.testDataFrameIndex(diaForcedSources):
677 self.log.warning(
678 "Duplicate DiaForcedSources created after merge with "
679 "history and new sources. This may cause downstream "
680 "problems. Dropping duplicates.")
681 # Drop duplicates via index and keep the first appearance.
682 # Reset due to the index shape being slight different than
683 # expected.
684 diaForcedSourcesFull = diaForcedSourcesFull.groupby(
685 diaForcedSourcesFull.index).first()
686 diaForcedSourcesFull.reset_index(drop=True, inplace=True)
687 diaForcedSourcesFull.set_index(
688 ["diaObjectId", "diaForcedSourceId"],
689 drop=False,
690 inplace=True)
692 self.alertPackager.run(assocResults.associatedDiaSources,
693 updatedDiaObjects,
694 preloadedDiaSources,
695 diaForcedSourcesFull,
696 diffIm,
697 exposure,
698 template,
699 ssSrc=associatedSsSourcesPlusMpcorb,
700 doRunForcedMeasurement=self.config.doRunForcedMeasurement,
701 forcedSourceHistoryThreshold=forcedSourceHistoryThreshold,
702 )
704 # For historical reasons, apdbMarker is a Config even if it's not meant to be read.
705 # A default Config is the cheapest way to satisfy the storage class.
706 marker = pexConfig.Config()
707 return pipeBase.Struct(apdbMarker=marker,
708 associatedDiaSources=assocResults.associatedDiaSources,
709 diaForcedSources=diaForcedSources,
710 diaObjects=diaCalResult.diaObjectCat,
711 associatedSsSources=associatedSsSources,
712 unassociatedSsObjects=assocResults.unassociatedSsObjects,
713 newDiaSources=assocResults.newDiaSources,
714 marginalDiaSources=assocResults.marginalDiaSources
715 )
717 def createNewDiaObjects(self, unassociatedDiaSources):
718 """Loop through the set of DiaSources and create new DiaObjects
719 for unassociated DiaSources.
721 Parameters
722 ----------
723 unassociatedDiaSources : `pandas.DataFrame`
724 Set of DiaSources to create new DiaObjects from.
726 Returns
727 -------
728 results : `lsst.pipe.base.Struct`
729 Results struct containing:
731 - diaSources : `pandas.DataFrame`
732 DiaSource catalog with updated DiaObject ids.
733 - newDiaObjects : `pandas.DataFrame`
734 Newly created DiaObjects from the unassociated DiaSources.
735 - nNewDiaObjects : `int`
736 Number of newly created diaObjects.
737 - marginalDiaSources : `pandas.DataFrame`
738 Unassociated diaSources with low signal-to-noise and/or
739 reliability, which are excluded from the new DiaObjects.
740 """
741 marginalDiaSources = make_empty_catalog(self.schema, tableName="DiaSource")
742 if len(unassociatedDiaSources) == 0:
743 newDiaObjects = make_empty_catalog(self.schema, tableName="DiaObject")
744 else:
745 if self.config.filterUnAssociatedSources:
746 results = self.filterSources(unassociatedDiaSources)
747 unassociatedDiaSources = results.goodSources
748 marginalDiaSources = results.badSources
749 unassociatedDiaSources["diaObjectId"] = unassociatedDiaSources["diaSourceId"]
750 newDiaObjects = convertDataFrameToSdmSchema(self.schema, unassociatedDiaSources,
751 tableName="DiaObject", skipIndex=True)
752 self.metadata["nRejectedNewDiaObjects"] = len(marginalDiaSources)
753 return pipeBase.Struct(diaSources=unassociatedDiaSources,
754 newDiaObjects=newDiaObjects,
755 nNewDiaObjects=len(newDiaObjects),
756 marginalDiaSources=marginalDiaSources)
758 def filterSources(self, sources, snrThreshold=None, lowReliabilitySnrThreshold=None,
759 reliabilityThreshold=None, lowSnrReliabilityThreshold=None, badFlags=None):
760 """Select good sources out of a catalog.
762 Parameters
763 ----------
764 sources : `pandas.DataFrame`
765 Set of DiaSources to check.
766 snrThreshold : `float`, optional
767 The minimum signal to noise diaSource to make a new diaObject.
768 Included for unit tests. Uses the task config value if not set.
769 lowReliabilitySnrThreshold : `float`, optional
770 Use ``lowSnrReliabilityThreshold`` as the reliability threshold for
771 diaSources with ``snrThreshold`` < SNR < ``lowReliabilitySnrThreshold``
772 Included for unit tests. Uses the task config value if not set.
773 reliabilityThreshold : `float`, optional
774 The minimum reliability score diaSource to make a new diaObject
775 Included for unit tests. Uses the task config value if not set.
776 lowSnrReliabilityThreshold : `float`, optional
777 Use ``lowSnrReliabilityThreshold`` as the reliability threshold for
778 diaSources with ``snrThreshold`` < SNR < ``lowReliabilitySnrThreshold``
779 Included for unit tests. Uses the task config value if not set.
780 badFlags : `list` of `str`, optional
781 Do not create new diaObjects for any diaSource with any of these
782 flags set.
783 Included for unit tests. Uses the task config value if not set.
785 Returns
786 -------
787 results : `lsst.pipe.base.Struct`
788 Results struct containing:
790 - goodSources : `pandas.DataFrame`
791 Subset of the input sources that pass all checks.
792 - badSources : `pandas.DataFrame`
793 Subset of the input sources that fail any check.
794 """
795 if snrThreshold is None:
796 snrThreshold = self.config.newObjectSnrThreshold
797 if lowReliabilitySnrThreshold is None:
798 lowReliabilitySnrThreshold = self.config.newObjectLowReliabilitySnrThreshold
799 if reliabilityThreshold is None:
800 reliabilityThreshold = self.config.newObjectReliabilityThreshold
801 if lowSnrReliabilityThreshold is None:
802 lowSnrReliabilityThreshold = self.config.newObjectLowSnrReliabilityThreshold
803 if badFlags is None:
804 badFlags = self.config.newObjectBadFlags
805 flagged = sources[badFlags].fillna(False).any(axis=1)
806 fluxField = self.config.newObjectFluxField
807 fluxErrField = fluxField + "Err"
808 signalToNoise = np.abs(np.array(sources[fluxField]/sources[fluxErrField]))
809 reliability = np.array(sources['reliability'])
810 nFlagged = np.count_nonzero(flagged)
811 if nFlagged > 0:
812 self.log.info("Not creating new diaObjects for %i unassociated diaSources due to flags", nFlagged)
814 if snrThreshold > 0:
815 snr_flag = signalToNoise < snrThreshold
816 self.log.info("Not creating new diaObjects for %i unassociated diaSources due to %sFlux"
817 " signal to noise < %f",
818 np.sum(snr_flag), fluxField, snrThreshold)
819 flagged += snr_flag
820 if reliabilityThreshold > 0:
821 reliability_flag = reliability < reliabilityThreshold
822 self.log.info("Not creating new diaObjects for %i unassociated diaSources due to "
823 "reliability<%f",
824 np.sum(reliability_flag), reliabilityThreshold)
825 flagged += reliability_flag
826 if min(lowReliabilitySnrThreshold, lowSnrReliabilityThreshold) > 0:
827 # Only run the combined test if both thresholds are greater than zero
828 lowSnrReliability_flag = ((signalToNoise < lowReliabilitySnrThreshold)
829 & (reliability < lowSnrReliabilityThreshold))
830 self.log.info("Not creating new diaObjects for %i unassociated diaSources due to %sFlux"
831 " signal to noise < %f combined with reliability< %f",
832 np.sum(lowSnrReliability_flag),
833 fluxField,
834 lowReliabilitySnrThreshold,
835 lowSnrReliabilityThreshold)
836 flagged += lowSnrReliability_flag
838 if np.count_nonzero(~flagged) > 0:
839 goodSources = sources[~flagged].copy(deep=True)
840 else:
841 goodSources = make_empty_catalog(self.schema, tableName="DiaSource")
842 if np.count_nonzero(flagged) > 0:
843 badSources = sources[flagged].copy(deep=True)
844 else:
845 badSources = make_empty_catalog(self.schema, tableName="DiaSource")
846 return pipeBase.Struct(goodSources=goodSources,
847 badSources=badSources
848 )
    @timeMethod
    def associateDiaSources(self, diaSourceTable, solarSystemObjectTable, diffIm, diaObjects):
        """Associate DiaSources with DiaObjects.

        Associate new DiaSources with existing DiaObjects. Create new
        DiaObjects from unassociated DiaSources. Index DiaSource catalogue
        after associations. Append new DiaObjects and DiaSources to their
        previous history. Test for DiaSource and DiaObject duplications.
        Compute DiaObject Summary statistics from their full DiaSource
        history. Test for duplication in the updated DiaObjects.

        Parameters
        ----------
        diaSourceTable : `pandas.DataFrame`
            Newly detected DiaSources.
        solarSystemObjectTable : `astropy.table.Table`
            Preloaded Solar System objects expected to be visible in the image.
        diffIm : `lsst.afw.image.ExposureF`
            Difference image exposure in which the sources in ``diaSourceCat``
            were detected.
        diaObjects : `pandas.DataFrame`
            Table of DiaObjects from preloaded DiaObjects.

        Returns
        -------
        results : `lsst.pipe.base.Struct`
            Results struct containing:

            - associatedDiaSources : `pandas.DataFrame`
                Associated DiaSources with DiaObjects.
            - newDiaObjects : `pandas.DataFrame`
                Table of new DiaObjects after association.
            - associatedSsSources : `astropy.table.Table`
                Table of new ssSources after association.
            - unassociatedSsObjects : `astropy.table.Table`
                Table of expected ssSources that were not associated with a
                diaSource.
            - newDiaSources : `pandas.DataFrame`
                Subset of `associatedDiaSources` consisting of only the
                unassociated diaSources that were added as new diaObjects.
            - marginalDiaSources : `pandas.DataFrame`
                Unassociated diaSources with marginal detections, which were
                removed from `associatedDiaSources` and were not added as new
                diaObjects.

        Raises
        ------
        TooManyDiaObjectsError
            Raised if the number of new DiaObjects exceeds the (positive)
            configured ``maxNewDiaObjects`` threshold.
        """
        # Associate new DiaSources with existing DiaObjects.
        assocResults = self.associator.run(diaSourceTable, diaObjects)
        # TODO: work out how to mark asteroid-diaObj overlaps as contaminated
        toAssociate = []
        if self.config.doSolarSystemAssociation and solarSystemObjectTable is not None:
            # Give the solar system associator a chance to claim the sources
            # that diaObject association left unmatched.
            ssoAssocResult = self.solarSystemAssociator.run(
                Table.from_pandas(assocResults.unAssocDiaSources),
                solarSystemObjectTable,
                diffIm.visitInfo,
                diffIm.getBBox(),
                diffIm.wcs
            )
            # Create new DiaObjects from the still-unassociated diaSources.
            if len(ssoAssocResult.unAssocDiaSources) > 0:
                # If the table is empty then converting time fields to pandas
                # will raise an error. Pass in an empty Dataframe in that case.
                createResults = self.createNewDiaObjects(ssoAssocResult.unAssocDiaSources.to_pandas())
            else:
                createResults = self.createNewDiaObjects(pd.DataFrame())
            if len(ssoAssocResult.ssoAssocDiaSources) > 0:
                toAssociate.append(ssoAssocResult.ssoAssocDiaSources.to_pandas())
            nTotalSsObjects = ssoAssocResult.nTotalSsObjects
            nAssociatedSsObjects = ssoAssocResult.nAssociatedSsObjects
            associatedSsSources = ssoAssocResult.associatedSsSources
            unassociatedSsObjects = ssoAssocResult.unassociatedSsObjects

        else:
            # Create new DiaObjects from unassociated diaSources.
            createResults = self.createNewDiaObjects(assocResults.unAssocDiaSources)
            nTotalSsObjects = 0
            nAssociatedSsObjects = 0
            associatedSsSources = None
            unassociatedSsObjects = None
        if not assocResults.matchedDiaSources.empty:
            toAssociate.append(assocResults.matchedDiaSources)
        if not createResults.diaSources.empty:
            toAssociate.append(createResults.diaSources)
        if len(toAssociate) == 0:
            # Nothing was matched or created: return an empty catalog with the
            # correct DiaSource schema rather than an empty concat.
            associatedDiaSources = make_empty_catalog(self.schema, tableName="DiaSource")
        else:
            associatedDiaSources = pd.concat(toAssociate)

        self._add_association_meta_data(assocResults.nUpdatedDiaObjects,
                                        assocResults.nUnassociatedDiaObjects,
                                        createResults.nNewDiaObjects,
                                        nTotalSsObjects,
                                        nAssociatedSsObjects)
        self.log.info("%i updated and %i unassociated diaObjects. Creating %i new diaObjects"
                      " and dropping %i marginal diaSources.",
                      assocResults.nUpdatedDiaObjects,
                      assocResults.nUnassociatedDiaObjects,
                      createResults.nNewDiaObjects,
                      len(createResults.marginalDiaSources),
                      )
        if createResults.nNewDiaObjects > self.config.maxNewDiaObjects > 0:
            raise TooManyDiaObjectsError(nNewDiaObjects=createResults.nNewDiaObjects,
                                         threshold=self.config.maxNewDiaObjects)
        return pipeBase.Struct(associatedDiaSources=associatedDiaSources,
                               newDiaObjects=createResults.newDiaObjects,
                               associatedSsSources=associatedSsSources,
                               unassociatedSsObjects=unassociatedSsObjects,
                               newDiaSources=createResults.diaSources,
                               marginalDiaSources=createResults.marginalDiaSources
                               )
960 def standardizeDataFrame(self, dataFrame, tableName, nullColumns=None):
961 """Convert a catalog to SDM schema format with NULL ID values
963 Parameters
964 ----------
965 dataFrame : `pandas.DataFrame`
966 The catalog to standardize/convert
967 tableName : `str`
968 Schema name of table to which this dataFrame should be standardized
969 nullColumns : `list` of `str`, optional
970 List of column names to check for default values of 0 that should be
971 replaced with `NULL`.
974 Returns
975 -------
976 standardizedAssociatedDiaSources : pandas.DataFrame
977 The standardized DiaSource catalog
978 """
979 standardizedDataFrame = convertDataFrameToSdmSchema(self.schema,
980 dataFrame,
981 tableName=tableName,
982 skipIndex=True)
984 def _setNullColumn(dataframe, colName):
985 """Set specified columns with default values of 0 to NULL."""
986 dataframe.loc[dataframe[colName] == 0, colName] = pd.NA
988 if nullColumns is not None:
989 for colName in nullColumns:
990 _setNullColumn(standardizedDataFrame, colName)
991 return standardizedDataFrame
993 def standardizeTable(self, table, tableName, nullColumns=None):
994 """Convert a catalog to SDM schema format with NULL ID values
996 Parameters
997 ----------
998 table : `astropy.table.Table`
999 The catalog to standardize/convert
1000 tableName : `str`
1001 Schema name of table to which this dataFrame should be standardized
1002 nullColumns : `list` of `str`, optional
1003 List of column names to check for default values of 0 that should be
1004 replaced with `NULL`.
1007 Returns
1008 -------
1009 standardizedAssociatedDiaSources : pandas.DataFrame
1010 The standardized DiaSource catalog
1011 """
1012 dataFrame = table.to_pandas()
1013 standardizedDataFrame = self.standardizeDataFrame(dataFrame, tableName, nullColumns=nullColumns)
1014 return standardizedDataFrame
    @timeMethod
    def mergeAssociatedCatalogs(self, preloadedDiaSources, associatedDiaSources, diaObjects, newDiaObjects,
                                diffIm):
        """Merge the associated diaSource and diaObjects to their previous history.

        Note that ``associatedDiaSources`` is re-indexed in place by
        (diaObjectId, band, diaSourceId).

        Parameters
        ----------
        preloadedDiaSources : `pandas.DataFrame`
            Previously detected DiaSources, loaded from the APDB.
        associatedDiaSources : `pandas.DataFrame`
            Associated DiaSources with DiaObjects.
        diaObjects : `pandas.DataFrame`
            Table of DiaObjects from preloaded DiaObjects.
        newDiaObjects : `pandas.DataFrame`
            Table of new DiaObjects after association.
        diffIm : `lsst.afw.image.ExposureF`
            Difference image exposure; its bounding box and WCS are used to
            purge diaObjects that fall outside the image after association.

        Raises
        ------
        RuntimeError
            Raised if duplicate DiaObjects or duplicate DiaSources are found.

        Returns
        -------
        mergedDiaSourceHistory : `pandas.DataFrame`
            The combined catalog, with all of the rows from preloadedDiaSources
            catalog ordered before the rows of associatedDiaSources catalog.
        mergedDiaObjects : `pandas.DataFrame`
            Table of new DiaObjects merged with their history.
        updatedDiaObjectIds : `numpy.Array`
            Object Id's from associated diaSources.
        """
        # Index the DiaSource catalog for this visit after all associations
        # have been made. IDs of 0 mark sources without an associated object
        # and are excluded from the updated-ID list.
        updatedDiaObjectIds = associatedDiaSources["diaObjectId"][
            associatedDiaSources["diaObjectId"] != 0].to_numpy()
        associatedDiaSources.set_index(["diaObjectId",
                                        "band",
                                        "diaSourceId"],
                                       drop=False,
                                       inplace=True)

        # Append new DiaObjects and DiaSources to their previous history.
        if diaObjects.empty:
            mergedDiaObjects = newDiaObjects.set_index("diaObjectId", drop=False)
        elif not newDiaObjects.empty:
            mergedDiaObjects = pd.concat(
                [diaObjects,
                 newDiaObjects.set_index("diaObjectId", drop=False)],
                sort=True)
        else:
            mergedDiaObjects = diaObjects

        # Exclude any objects that are off the image after association.
        # NOTE(review): buffer=-1 shrinks the bbox by one pixel, so objects on
        # the exact boundary are also dropped — confirm this is intended.
        mergedDiaObjects, updatedDiaObjectIds = self.purgeDiaObjects(diffIm.getBBox(), diffIm.getWcs(),
                                                                     mergedDiaObjects,
                                                                     diaObjectIds=updatedDiaObjectIds,
                                                                     buffer=-1)
        if self.testDataFrameIndex(mergedDiaObjects):
            raise RuntimeError(
                "Duplicate DiaObjects created after association. This is "
                "likely due to re-running data with an already populated "
                "Apdb. If this was not the case then there was an unexpected "
                "failure in Association while matching and creating new "
                "DiaObjects and should be reported. Exiting.")

        mergedDiaSourceHistory = self.mergeCatalogs(preloadedDiaSources, associatedDiaSources,
                                                    tableName="DiaSource")

        # Test for DiaSource duplication first. If duplicates are found,
        # this likely means this is duplicate data being processed and sent
        # to the Apdb.
        if self.testDataFrameIndex(mergedDiaSourceHistory):
            raise RuntimeError(
                "Duplicate DiaSources found after association and merging "
                "with history. This is likely due to re-running data with an "
                "already populated Apdb. If this was not the case then there "
                "was an unexpected failure in Association while matching "
                "sources to objects, and should be reported. Exiting.")
        # Finally, update the diaObject table with the number of associated diaSources
        mergedUpdatedDiaObjects = self.updateObjectTable(mergedDiaObjects, mergedDiaSourceHistory)
        return (mergedDiaSourceHistory, mergedUpdatedDiaObjects, updatedDiaObjectIds)
1098 @timeMethod
1099 def runForcedMeasurement(self, diaObjects, updatedDiaObjects, exposure, diffIm, idGenerator):
1100 """Forced Source Measurement
1102 Forced photometry on the difference and calibrated exposures using the
1103 new and updated DiaObject locations.
1105 Parameters
1106 ----------
1107 diaObjects : `pandas.DataFrame`
1108 Catalog of DiaObjects.
1109 updatedDiaObjects : `pandas.DataFrame`
1110 Catalog of updated DiaObjects.
1111 exposure : `lsst.afw.image.ExposureF`
1112 Calibrated exposure differenced with a template to create
1113 ``diffIm``.
1114 diffIm : `lsst.afw.image.ExposureF`
1115 Difference image exposure in which the sources in ``diaSourceCat``
1116 were detected.
1117 idGenerator : `lsst.meas.base.IdGenerator`
1118 Object that generates source IDs and random number generator seeds.
1120 Returns
1121 -------
1122 diaForcedSources : `pandas.DataFrame`
1123 Catalog of calibrated forced photometered fluxes on both the
1124 difference and direct images at DiaObject locations.
1125 """
1126 # Force photometer on the Difference and Calibrated exposures using
1127 # the new and updated DiaObject locations.
1128 diaForcedSources = self.diaForcedSource.run(
1129 diaObjects,
1130 updatedDiaObjects.loc[:, "diaObjectId"].to_numpy(),
1131 exposure,
1132 diffIm,
1133 idGenerator=idGenerator)
1134 self.log.info(f"Updating {len(diaForcedSources)} diaForcedSources in the APDB")
1135 diaForcedSources = convertDataFrameToSdmSchema(self.schema, diaForcedSources,
1136 tableName="DiaForcedSource", skipIndex=True)
1137 return diaForcedSources
    @timeMethod
    def loadRefreshedDiaObjects(self, region, preloadedDiaObjects):
        """Load DiaObjects from the Apdb based on their HTM location.

        Parameters
        ----------
        region : `sphgeom.Region`
            Region containing the current exposure to load diaObjects from the
            APDB.
        preloadedDiaObjects : `pandas.DataFrame`
            Previously detected DiaObjects, loaded from the APDB.

        Returns
        -------
        diaObjects : `pandas.DataFrame`
            DiaObjects loaded from the Apdb that are within the area defined
            by ``region``, padded by the configured ``angleMargin``, possibly
            combined with ``preloadedDiaObjects`` (see below).
        """
        # angleMargin is divided by 3600 before conversion, i.e. the config
        # value is in arcseconds and the padding is applied in degrees.
        angleMargin = lsst.sphgeom.Angle.fromDegrees(self.config.angleMargin/3600.)
        diaObjects = self.apdb.getDiaObjects(paddedRegion(region, angleMargin))

        diaObjects.set_index("diaObjectId", drop=False, inplace=True)
        if diaObjects.index.has_duplicates:
            self.log.warning(
                "Duplicate DiaObjects loaded from the Apdb. This may cause "
                "downstream pipeline issues. Dropping duplicated rows")
            # Drop duplicates via index and keep the first appearance.
            diaObjects = diaObjects.groupby(diaObjects.index).first()
        self.log.info("Loaded %d DiaObjects", len(diaObjects))
        refreshedDiaObjects = convertDataFrameToSdmSchema(self.schema, diaObjects, tableName="DiaObject",
                                                          skipIndex=True)

        # Compare the freshly loaded catalog against the preloaded one so we
        # can report and reconcile differences between the two.
        refreshedIsInPreloaded = refreshedDiaObjects.index.isin(preloadedDiaObjects.index)
        preloadedIsInRefreshed = preloadedDiaObjects.index.isin(refreshedDiaObjects.index)
        nUniqueRefreshed = (~refreshedIsInPreloaded).sum()
        nUniquePreloaded = (~preloadedIsInRefreshed).sum()
        if nUniqueRefreshed > 0:
            self.log.info("Reloading the diaObject table during association yielded "
                          "an additional %d objects over the %d preloaded diaObjects.",
                          nUniqueRefreshed, len(preloadedDiaObjects))
        if nUniquePreloaded == 0:
            return refreshedDiaObjects
        elif nUniqueRefreshed == 0:
            return preloadedDiaObjects
        else:
            # We can get "preloaded" diaObjects that don't appear in the APDB with
            # the CI datasets in ap_verify, since those are stored as a dataset and
            # an empty APDB is created. In that case, combine the two catalogs.
            # Precedence is given to the refreshed diaObject catalog.
            return pd.concat([refreshedDiaObjects, preloadedDiaObjects.loc[~preloadedIsInRefreshed]])
1190 @timeMethod
1191 def writeToApdb(self, updatedDiaObjects, associatedDiaSources, diaForcedSources):
1192 """Write to the Alert Production Database (Apdb).
1194 Store DiaSources, updated DiaObjects, and DiaForcedSources in the
1195 Alert Production Database (Apdb).
1197 Parameters
1198 ----------
1199 updatedDiaObjects : `pandas.DataFrame`
1200 Catalog of updated DiaObjects.
1201 associatedDiaSources : `pandas.DataFrame`
1202 Associated DiaSources with DiaObjects.
1203 diaForcedSources : `pandas.DataFrame`
1204 Catalog of calibrated forced photometered fluxes on both the
1205 difference and direct images at DiaObject locations.
1207 Returns
1208 -------
1209 validityStart : `lsst.daf.base.DateTime`
1210 Time at which the APDB was updated.
1211 """
1212 # Store DiaSources, updated DiaObjects, and DiaForcedSources in the
1213 # Apdb.
1214 # Drop empty columns that are nullable in the APDB.
1215 diaObjectStore = dropEmptyColumns(self.schema, updatedDiaObjects, tableName="DiaObject")
1216 diaSourceStore = dropEmptyColumns(self.schema, associatedDiaSources, tableName="DiaSource")
1217 diaForcedSourceStore = dropEmptyColumns(self.schema, diaForcedSources, tableName="DiaForcedSource")
1219 validityStart = DateTime.now()
1220 self.apdb.store(
1221 validityStart.toAstropy(),
1222 diaObjectStore,
1223 diaSourceStore,
1224 diaForcedSourceStore)
1225 self.log.info("APDB updated.")
1227 return validityStart
1229 def testDataFrameIndex(self, df):
1230 """Test the sorted DataFrame index for duplicates.
1232 Wrapped as a separate function to allow for mocking of the this task
1233 in unittesting. Default of a mock return for this test is True.
1235 Parameters
1236 ----------
1237 df : `pandas.DataFrame`
1238 DataFrame to text.
1240 Returns
1241 -------
1242 `bool`
1243 True if DataFrame contains duplicate rows.
1244 """
1245 return df.index.has_duplicates
1247 def _add_association_meta_data(self,
1248 nUpdatedDiaObjects,
1249 nUnassociatedDiaObjects,
1250 nNewDiaObjects,
1251 nTotalSsObjects,
1252 nAssociatedSsObjects):
1253 """Store summaries of the association step in the task metadata.
1255 Parameters
1256 ----------
1257 nUpdatedDiaObjects : `int`
1258 Number of previous DiaObjects associated and updated in this
1259 ccdVisit.
1260 nUnassociatedDiaObjects : `int`
1261 Number of previous DiaObjects that were not associated or updated
1262 in this ccdVisit.
1263 nNewDiaObjects : `int`
1264 Number of newly created DiaObjects for this ccdVisit.
1265 nTotalSsObjects : `int`
1266 Number of SolarSystemObjects within the observable detector
1267 area.
1268 nAssociatedSsObjects : `int`
1269 Number of successfully associated SolarSystemObjects.
1270 """
1271 self.metadata['numUpdatedDiaObjects'] = nUpdatedDiaObjects
1272 self.metadata['numUnassociatedDiaObjects'] = nUnassociatedDiaObjects
1273 self.metadata['numNewDiaObjects'] = nNewDiaObjects
1274 self.metadata['numTotalSolarSystemObjects'] = nTotalSsObjects
1275 self.metadata['numAssociatedSsObjects'] = nAssociatedSsObjects
1277 def purgeDiaObjects(self, bbox, wcs, diaObjCat, diaObjectIds=None, buffer=0):
1278 """Drop diaObjects that are outside the exposure bounding box.
1280 Parameters
1281 ----------
1282 bbox : `lsst.geom.Box2I`
1283 Bounding box of the exposure.
1284 wcs : `lsst.afw.geom.SkyWcs`
1285 Coordinate system definition (wcs) for the exposure.
1286 diaObjCat : `pandas.DataFrame`
1287 DiaObjects loaded from the Apdb.
1288 buffer : `int`, optional
1289 Width, in pixels, to pad the exposure bounding box.
1291 Returns
1292 -------
1293 diaObjCat : `pandas.DataFrame`
1294 DiaObjects loaded from the Apdb, restricted to the exposure
1295 bounding box.
1296 """
1297 try:
1298 bbox.grow(buffer)
1299 raVals = diaObjCat.ra.to_numpy()
1300 decVals = diaObjCat.dec.to_numpy()
1301 xVals, yVals = wcs.skyToPixelArray(raVals, decVals, degrees=True)
1302 selector = bbox.contains(xVals, yVals)
1303 nPurged = np.sum(~selector)
1304 if nPurged > 0:
1305 if diaObjectIds is not None:
1306 # We also need to drop any of the associated IDs if this runs after association
1307 purgedIds = diaObjCat[~selector].diaObjectId
1308 diaObjectIds = diaObjectIds[~np.isin(diaObjectIds, purgedIds)]
1309 self.log.info("Dropped %i diaObjects that were outside the bbox "
1310 "after association, leaving %i in the catalog",
1311 nPurged, len(diaObjCat) - nPurged)
1312 else:
1313 self.log.info("Dropped %i diaObjects that were outside the padded bbox "
1314 "before association, leaving %i in the catalog",
1315 nPurged, len(diaObjCat) - nPurged)
1316 diaObjCat = diaObjCat[selector].copy()
1317 except Exception as e:
1318 self.log.warning("Error attempting to check diaObject history: %s", e)
1319 return diaObjCat, diaObjectIds
1321 def mergeCatalogs(self, originalCatalog, newCatalog, tableName):
1322 """Combine two catalogs, ensuring that the new catalog conforms to the schema.
1324 Parameters
1325 ----------
1326 originalCatalog : `pandas.DataFrame`
1327 The original catalog to be added to.
1328 newCatalog : `pandas.DataFrame`
1329 The new catalog to append to `originalCatalog`
1330 tableName : `str`
1331 Name of the table in the schema to use.
1333 Returns
1334 -------
1335 mergedCatalog : `pandas.DataFrame`
1336 The combined catalog, with all of the rows from ``originalCatalog``
1337 ordered before the rows of ``newCatalog``
1338 """
1339 if len(newCatalog) > 0:
1340 catalog = convertDataFrameToSdmSchema(self.schema, newCatalog,
1341 tableName=tableName, skipIndex=True)
1343 mergedCatalog = pd.concat([originalCatalog, catalog], sort=True)
1344 else:
1345 mergedCatalog = pd.concat([originalCatalog], sort=True)
1346 return mergedCatalog.loc[:, originalCatalog.columns]
1348 def updateObjectTable(self, diaObjects, diaSources):
1349 """Update the diaObject table with the new diaSource records.
1351 Parameters
1352 ----------
1353 diaObjects : `pandas.DataFrame`
1354 Table of new DiaObjects merged with their history.
1355 diaSources : `pandas.DataFrame`
1356 The combined preloaded and associated diaSource catalog.
1358 Returns
1359 -------
1360 updatedDiaObjects : `pandas.DataFrame`
1361 Table of DiaObjects updated with the number of associated DiaSources
1362 """
1363 nDiaSources = diaSources[["diaSourceId"]].groupby("diaObjectId").agg(len)
1364 nDiaSources.rename({"diaSourceId": "nDiaSources"}, errors="raise", axis="columns", inplace=True)
1365 del diaObjects["nDiaSources"]
1366 updatedDiaObjects = diaObjects.join(nDiaSources, how="left")
1367 return updatedDiaObjects
    @staticmethod
    def checkTableIndex(dataFrame, index):
        """Ensure ``dataFrame`` carries a named index, setting one if absent.

        Parameters
        ----------
        dataFrame : `pandas.DataFrame`
            Table to check; re-indexed in place if no named index is set.
        index : `str` or `list` of `str`
            Column name(s) to use as the index when one must be set.
        """
        if dataFrame.index.name is None:
            # The expected index may or may not be set, depending on whether
            # the table was written originally as a DataFrame or something else
            # Parquet-friendly.
            dataFrame.set_index(index, drop=False, inplace=True)