Coverage for python / lsst / ap / association / packageAlerts.py: 15%
299 statements
« prev ^ index » next coverage.py v7.13.5, created at 2026-05-07 08:39 +0000
« prev ^ index » next coverage.py v7.13.5, created at 2026-05-07 08:39 +0000
1# This file is part of ap_association.
2#
3# Developed for the LSST Data Management System.
4# This product includes software developed by the LSST Project
5# (https://www.lsst.org).
6# See the COPYRIGHT file at the top-level directory of this distribution
7# for details of code ownership.
8#
9# This program is free software: you can redistribute it and/or modify
10# it under the terms of the GNU General Public License as published by
11# the Free Software Foundation, either version 3 of the License, or
12# (at your option) any later version.
13#
14# This program is distributed in the hope that it will be useful,
15# but WITHOUT ANY WARRANTY; without even the implied warranty of
16# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
17# GNU General Public License for more details.
18#
19# You should have received a copy of the GNU General Public License
20# along with this program. If not, see <https://www.gnu.org/licenses/>.
22__all__ = ("PackageAlertsConfig", "PackageAlertsTask")
24import io
25import os
26import sys
27import time
29from astropy import wcs
30import astropy.units as u
31from astropy.nddata import CCDData, VarianceUncertainty
32import pandas as pd
33import struct
34import fastavro
35from confluent_kafka.cimpl import KafkaError
37# confluent_kafka is not in the standard Rubin environment as it is a third
38# party package and is only needed when producing alerts.
39try:
40 import confluent_kafka
41 from confluent_kafka import KafkaException
42 from confluent_kafka.admin import AdminClient
43except ImportError:
44 confluent_kafka = None
46import lsst.alert.packet as alertPack
47from lsst.afw.detection import InvalidPsfError
48import lsst.afw.geom as afwGeom
49import lsst.geom as geom
50import lsst.pex.config as pexConfig
51from lsst.pex.exceptions import InvalidParameterError
52import lsst.pipe.base as pipeBase
53from lsst.pipe.tasks.associationUtils import ss_object_id_to_obj_id
54import lsst.utils.logging
55from lsst.utils.timer import timeMethod
class PackageAlertsConfig(pexConfig.Config):
    """Config class for PackageAlertsTask.
    """
    # URI of the Avro schema used to serialize alert packets; defaults to
    # the latest schema shipped with lsst.alert.packet.
    schemaFile = pexConfig.Field(
        dtype=str,
        doc="Schema definition file URI for the avro alerts.",
        default=str(alertPack.get_uri_to_latest_schema())
    )
    # Lower bound (pixels) on the side length of the square cutouts.
    minCutoutSize = pexConfig.RangeField(
        dtype=int,
        min=0,
        max=1000,
        default=30,
        doc="Dimension of the square image cutouts to package in the alert."
    )
    # Upper bound (pixels) on the side length of the square cutouts.
    maxCutoutSize = pexConfig.RangeField(
        dtype=int,
        min=0,
        max=1000,
        default=102,
        doc="Dimension of the square image cutouts to package in the alert. The"
        "default size comes from the max trail length in arcseconds "
        "(10 deg/day) for a 30 second observation divided by the"
        "arcseconds per pixel (0.2), which is 62.5 pixels. The effective"
        "size of the psf (40 pixels) is then added for a total of 102 pixels. "
    )
    # Directory for alert files written by doWriteAlerts/doWriteFailedAlerts;
    # created at task construction if missing.
    alertWriteLocation = pexConfig.Field(
        dtype=str,
        doc="Location to write alerts to.",
        default=os.path.join(os.getcwd(), "alerts"),
    )

    # When set, a Kafka producer is configured from environment variables at
    # task construction.
    doProduceAlerts = pexConfig.Field(
        dtype=bool,
        doc="Turn on alert production to kafka if true and if confluent_kafka is in the environment.",
        default=False,
    )

    doWriteAlerts = pexConfig.Field(
        dtype=bool,
        doc="Write alerts to disk if true.",
        default=False,
    )

    doWriteFailedAlerts = pexConfig.Field(
        dtype=bool,
        doc="If an alert cannot be sent when doProduceAlerts is set, "
            "write it to disk for debugging purposes.",
        default=False,
    )

    # Timeout (seconds) for the broker metadata query in _server_check.
    maxTimeout = pexConfig.Field(
        dtype=float,
        doc="Sets the maximum time in seconds to wait for the alert stream "
            "broker to respond to a query before timing out.",
        default=15.0,
    )

    # Passed through to confluent_kafka as "delivery.timeout.ms".
    deliveryTimeout = pexConfig.Field(
        dtype=float,
        doc="Sets the time to wait for the producer to wait to deliver an "
            "alert in milliseconds.",
        default=1200.0,
    )

    useAveragePsf = pexConfig.Field(
        dtype=bool,
        doc="Use the average PSF for the image, instead of the PSF for each cutout. "
            "This option is much less accurate, but much faster.",
        default=False,
    )
class PackageAlertsTask(pipeBase.Task):
    """Tasks for packaging Dia and Pipelines data into Avro alert packages.
    """
    ConfigClass = PackageAlertsConfig
    _DefaultName = "packageAlerts"

    # One arcsecond expressed in degrees; used as the pixel scale of the
    # identity CD matrix in makeLocalTransformMatrix.
    _scale = (1.0 * geom.arcseconds).asDegrees()

    def __init__(self, **kwargs):
        """Initialize the task and, if ``doProduceAlerts`` is set, configure
        and validate the Kafka producer.

        Raises
        ------
        ValueError
            Raised if ``doProduceAlerts`` is set and any of the required
            Kafka environment variables (password, username, server, or
            topic) is unset.
        RuntimeError
            Raised if ``doProduceAlerts`` is set but confluent_kafka could
            not be imported.
        """
        super().__init__(**kwargs)
        self.alertSchema = alertPack.Schema.from_uri(self.config.schemaFile)
        # Create the output directory up front so later writes cannot fail
        # on a missing path.
        os.makedirs(self.config.alertWriteLocation, exist_ok=True)

        if self.config.doProduceAlerts:
            if confluent_kafka is not None:
                # All connection settings come from the environment so that
                # credentials stay out of task configuration.
                self.password = os.getenv("AP_KAFKA_PRODUCER_PASSWORD")
                if not self.password:
                    raise ValueError("Kafka password environment variable was not set.")
                self.username = os.getenv("AP_KAFKA_PRODUCER_USERNAME")
                if not self.username:
                    raise ValueError("Kafka username environment variable was not set.")
                self.server = os.getenv("AP_KAFKA_SERVER")
                if not self.server:
                    raise ValueError("Kafka server environment variable was not set.")
                self.kafkaTopic = os.getenv("AP_KAFKA_TOPIC")
                if not self.kafkaTopic:
                    raise ValueError("Kafka topic environment variable was not set.")

                # confluent_kafka configures all of its classes with dictionaries. This one
                # sets up the bare minimum that is needed.
                self.kafkaConfig = {
                    # This is the URL to use to connect to the Kafka cluster.
                    "bootstrap.servers": self.server,
                    # These next two properties tell the Kafka client about the specific
                    # authentication and authorization protocols that should be used when
                    # connecting.
                    "security.protocol": "SASL_PLAINTEXT",
                    "sasl.mechanisms": "SCRAM-SHA-512",
                    # The sasl.username and sasl.password are passed through over
                    # SCRAM-SHA-512 auth to connect to the cluster. The username is not
                    # sensitive, but the password is (of course) a secret value which
                    # should never be committed to source code.
                    "sasl.username": self.username,
                    "sasl.password": self.password,
                    # Batch size limits the largest size of a kafka alert that can be sent.
                    # We set the batch size to 2 Mb.
                    "batch.size": 2097152,
                    "linger.ms": 5,
                    "delivery.timeout.ms": self.config.deliveryTimeout,
                    # Compression options are snappy, lz4, zstd, and gzip.
                    "compression.type": 'snappy'
                }
                self.kafkaAdminConfig = {
                    # This is the URL to use to connect to the Kafka cluster.
                    "bootstrap.servers": self.server,
                    # These next two properties tell the Kafka client about the specific
                    # authentication and authorization protocols that should be used when
                    # connecting.
                    "security.protocol": "SASL_PLAINTEXT",
                    "sasl.mechanisms": "SCRAM-SHA-512",
                    # The sasl.username and sasl.password are passed through over
                    # SCRAM-SHA-512 auth to connect to the cluster. The username is not
                    # sensitive, but the password is (of course) a secret value which
                    # should never be committed to source code.
                    "sasl.username": self.username,
                    "sasl.password": self.password,
                }

                # Fail fast at construction if the broker or topic is
                # unreachable, before any science processing happens.
                self._server_check()
                self.producer = confluent_kafka.Producer(**self.kafkaConfig)

            else:
                raise RuntimeError("Produce alerts is set but confluent_kafka is not present in "
                                   "the environment. Alerts will not be sent to the alert stream.")
    @timeMethod
    def run(self,
            diaSourceCat,
            diaObjectCat,
            diaSrcHistory,
            diaForcedSources,
            diffIm,
            calexp,
            template,
            ssSrc=None,
            doRunForcedMeasurement=True,
            forcedSourceHistoryThreshold=0,
            ):
        """Package DiaSources/Object and exposure data into Avro alerts.

        Alerts can be sent to the alert stream if ``doProduceAlerts`` is set
        and written to disk if ``doWriteAlerts`` is set. Both can be set at the
        same time, and are independent of one another.

        Writes Avro alerts to a location determined by the
        ``alertWriteLocation`` configurable.

        Parameters
        ----------
        diaSourceCat : `pandas.DataFrame`
            New DiaSources to package. DataFrame should be indexed on
            ``["diaObjectId", "band", "diaSourceId"]``
        diaObjectCat : `pandas.DataFrame`
            New and updated DiaObjects matched to the new DiaSources. DataFrame
            is indexed on ``["diaObjectId"]``
        diaSrcHistory : `pandas.DataFrame`
            12 month history of DiaSources matched to the DiaObjects. Excludes
            the newest DiaSource and is indexed on
            ``["diaObjectId", "band", "diaSourceId"]``
        diaForcedSources : `pandas.DataFrame`
            12 month history of DiaForcedSources matched to the DiaObjects.
            Indexed on ``["diaObjectId"]``
        diffIm : `lsst.afw.image.ExposureF`
            Difference image the sources in ``diaSourceCat`` were detected in.
        calexp : `lsst.afw.image.ExposureF`
            Calexp used to create the ``diffIm``.
        template : `lsst.afw.image.ExposureF` or `None`
            Template image used to create the ``diffIm``.
        ssSrc : `astropy.table.Table`, optional
            Solar system specific information for diaSources associated to ssObjects.
        doRunForcedMeasurement : `bool`, optional
            Flag to indicate whether forced measurement was run.
            This should only be turned off for debugging purposes.
            Added to allow disabling forced sources for performance
            reasons during the ops rehearsal.
        forcedSourceHistoryThreshold : `int`, optional
            Minimum number of detections of a diaObject required
            to run forced photometry. Set to 1 to include all diaObjects.
        """
        alerts = []
        self._patchDiaSources(diaSourceCat)
        self._patchDiaSources(diaSrcHistory)
        detector = diffIm.detector.getId()
        visit = diffIm.visitInfo.id
        observation_reason = diffIm.visitInfo.getObservationReason()
        target_name = diffIm.visitInfo.getObject()
        midpoint_unix = diffIm.visitInfo.date.toAstropy().tai.unix
        exposure_time = diffIm.visitInfo.exposureTime
        diffImPhotoCalib = diffIm.getPhotoCalib()
        calexpPhotoCalib = calexp.getPhotoCalib()
        templatePhotoCalib = template.getPhotoCalib()
        # Per-image average PSFs, computed once and reused for every cutout
        # when ``useAveragePsf`` is configured.
        diffImPsf = self._computePsf(diffIm, diffIm.psf.getAveragePosition())
        sciencePsf = self._computePsf(calexp, calexp.psf.getAveragePosition())
        templatePsf = self._computePsf(template, template.psf.getAveragePosition())

        # cast to object; works around pandas coercing Series to nullable
        # Float64 type when selecting single rows below
        diaSourceCat = diaSourceCat.astype('object')
        diaObjectCat = diaObjectCat.astype('object')
        diaSrcHistory = diaSrcHistory.astype('object')
        diaForcedSources = diaForcedSources.astype('object')

        if ssSrc is not None:
            ssSrc = ssSrc.set_index('diaSourceId')
            ssSrc = ssSrc.astype('object')

        n_sources = len(diaSourceCat)
        self.log.info("Packaging alerts for %d DiaSources.", n_sources)
        # Log every 10 seconds as proof of liveness.
        loop_logger = lsst.utils.logging.PeriodicLogger(self.log, interval=10.0)

        for srcIndex, diaSource in diaSourceCat.iterrows():
            loop_logger.log("%s/%s sources have been packaged.", len(alerts), n_sources)

            alertId = diaSource["diaSourceId"]
            # srcIndex[0] is the diaObjectId. An id of 0 marks a source with
            # no DiaObject; such sources are looked up in ``ssSrc`` instead
            # (presumably solar-system associations -- they must have an
            # entry there or this raises).
            if srcIndex[0] == 0:
                diaObject = None
                objSourceHistory = None
                objDiaForcedSources = None
                ssSource = ssSrc.loc[[alertId]].to_dict("records")[0]
            else:
                ssSource = None
                diaObject = diaObjectCat.loc[srcIndex[0]]
                if diaObject["nDiaSources"] > 1:
                    objSourceHistory = diaSrcHistory.loc[srcIndex[0]]
                else:
                    objSourceHistory = None
                if doRunForcedMeasurement and diaObject["nDiaSources"] >= forcedSourceHistoryThreshold:
                    # NOTE(review): ``in`` on a pandas Series tests its index
                    # labels, not its values -- confirm diaForcedSources'
                    # diaObjectId Series is indexed by diaObjectId.
                    if srcIndex[0] in diaForcedSources.diaObjectId:
                        objDiaForcedSources = diaForcedSources.loc[srcIndex[0]]
                    else:
                        # Send empty table with correct columns
                        objDiaForcedSources = diaForcedSources.loc[[]]
                else:
                    # Send empty table with correct columns
                    objDiaForcedSources = diaForcedSources.loc[[]]

            sphPoint = geom.SpherePoint(diaSource["ra"],
                                        diaSource["dec"],
                                        geom.degrees)
            pixelPoint = geom.Point2D(diaSource["x"],
                                      diaSource["y"])

            cutoutExtent = self.createDiaSourceExtent(diaSource["bboxSize"])
            diffImCutout = self.createCcdDataCutout(
                diffIm,
                sphPoint,
                pixelPoint,
                cutoutExtent,
                diffImPhotoCalib,
                diaSource["diaSourceId"],
                averagePsf=diffImPsf,
                rotPa=diffIm.visitInfo.boresightRotAngle.asDegrees())
            calexpCutout = self.createCcdDataCutout(
                calexp,
                sphPoint,
                pixelPoint,
                cutoutExtent,
                calexpPhotoCalib,
                diaSource["diaSourceId"],
                averagePsf=sciencePsf,
                rotPa=calexp.visitInfo.boresightRotAngle.asDegrees())
            # NOTE(review): the template cutout reuses the calexp rotation
            # angle -- presumably because the coadded template has no
            # visitInfo of its own; confirm.
            templateCutout = self.createCcdDataCutout(
                template,
                sphPoint,
                pixelPoint,
                cutoutExtent,
                templatePhotoCalib,
                diaSource["diaSourceId"],
                averagePsf=templatePsf,
                rotPa=calexp.visitInfo.boresightRotAngle.asDegrees())

            alerts.append(
                self.makeAlertDict(diaSource["diaSourceId"],
                                   observation_reason,
                                   target_name,
                                   diaSource,
                                   diaObject,
                                   objSourceHistory,
                                   objDiaForcedSources,
                                   diffImCutout,
                                   calexpCutout,
                                   templateCutout,
                                   ssSource))

        if self.config.doProduceAlerts:
            self.log.info("Producing alerts to %s.", self.kafkaTopic)
            self.produceAlerts(alerts, visit, detector, midpoint_unix, exposure_time)
        else:
            # Fill values for the metadata so that downstream metrics don't crash
            self.metadata['visit_midpoint'] = midpoint_unix
            self.metadata['produce_end_timestamp'] = -1
            self.metadata['produce_start_timestamp'] = -1
            self.metadata['alert_timing_since_shutter_close'] = -1
            self.metadata['total_alerts'] = -1

        if self.config.doWriteAlerts:
            avro_path = os.path.join(self.config.alertWriteLocation, f"{visit}_{detector}.avro")
            self.log.info("Writing alerts to %s.", avro_path)
            with open(avro_path, "wb") as f:
                self.alertSchema.store_alerts(f, alerts)
384 def _patchDiaSources(self, diaSources):
385 """Add the ``programId`` column to the data.
387 Parameters
388 ----------
389 diaSources : `pandas.DataFrame`
390 DataFrame of DiaSources to patch.
391 """
392 diaSources["programId"] = 0
394 def createDiaSourceExtent(self, bboxSize):
395 """Create an extent for a box for the cutouts given the size of the
396 square BBox that covers the source footprint.
398 Parameters
399 ----------
400 bboxSize : `int`
401 Size of a side of the square bounding box in pixels.
403 Returns
404 -------
405 extent : `lsst.geom.Extent2I`
406 Geom object representing the size of the bounding box.
407 """
408 if bboxSize < self.config.minCutoutSize:
409 extent = geom.Extent2I(self.config.minCutoutSize,
410 self.config.minCutoutSize)
411 elif bboxSize > self.config.maxCutoutSize:
412 extent = geom.Extent2I(self.config.maxCutoutSize,
413 self.config.maxCutoutSize)
414 else:
415 extent = geom.Extent2I(bboxSize, bboxSize)
416 return extent
    @timeMethod
    def produceAlerts(self, alerts, visit, detector, midpoint_unix, exposure_time):
        """Serialize alerts and send them to the alert stream using
        confluent_kafka's producer.

        Timing metadata is recorded unconditionally (in the ``finally``
        block) so downstream metrics are present even if production fails.

        Parameters
        ----------
        alerts : `dict`
            Dictionary of alerts to be sent to the alert stream.
        visit, detector : `int`
            Visit and detector ids of these alerts. Used to write out alerts
            which fail to be sent to the alert stream.
        midpoint_unix : `float`
            Exposure midpoint as a unix timestamp in seconds; used with
            ``exposure_time`` to compute the time since shutter close.
        exposure_time : `float`
            Exposure duration in seconds.
        """
        schema_id = self.alertSchema.get_schema_id()
        # Serialize and send alerts with the producer timestamp.
        total_alerts = 0
        produce_start_timestamp = time.time()
        try:
            for alert in alerts:
                alertBytes = self._serializeAlert(alert,
                                                  schema=self.alertSchema.definition, schema_id=schema_id)
                try:
                    timestamp = time.time()*1000  # Current time in milliseconds
                    headers = [("producer_timestamp", str(timestamp).encode('utf-8'))]
                    self.producer.produce(self.kafkaTopic, alertBytes, callback=self._delivery_callback,
                                          headers=headers)
                    # Flush per message so a failure is attributable to this
                    # specific alert.
                    self.producer.flush()
                    total_alerts += 1

                except KafkaException as e:
                    self.log.warning('Kafka error: %s, message was %s bytes',
                                     e, sys.getsizeof(alertBytes))

                    if self.config.doWriteFailedAlerts:
                        # Persist the serialized alert so it can be inspected
                        # or re-sent later.
                        with open(os.path.join(self.config.alertWriteLocation,
                                               f"{visit}_{detector}_{alert['diaSourceId']}.avro"), "wb") as f:
                            f.write(alertBytes)

            self.producer.flush()
        finally:
            produce_end_timestamp = time.time()  # Current time in seconds
            total_time = produce_end_timestamp - (midpoint_unix + exposure_time/2.0)
            self.metadata['visit_midpoint'] = midpoint_unix
            self.metadata['produce_end_timestamp'] = produce_end_timestamp
            self.metadata['produce_start_timestamp'] = produce_start_timestamp
            self.metadata['alert_timing_since_shutter_close'] = total_time
            self.metadata['total_alerts'] = total_alerts
            # A single log message is easier for Loki to parse than timeMethod's start+end pairs.
            self.log.verbose("Producing alerts: Took %.4f seconds",
                             produce_end_timestamp - produce_start_timestamp)

        self.log.info(f"Total time since shutter close to produce alerts for"
                      f" visit {visit} detector {detector}: {total_time} seconds")
472 def createCcdDataCutout(self, image, skyCenter, pixelCenter, extent, photoCalib, srcId, averagePsf=None,
473 rotPa=None):
474 """Grab an image as a cutout and return a calibrated CCDData image.
476 Parameters
477 ----------
478 image : `lsst.afw.image.ExposureF`
479 Image to pull cutout from.
480 skyCenter : `lsst.geom.SpherePoint`
481 Center point of DiaSource on the sky.
482 pixelCenter : `lsst.geom.Point2D`
483 Pixel center of DiaSource on the sky.
484 extent : `lsst.geom.Extent2I`
485 Bounding box to cutout from the image.
486 photoCalib : `lsst.afw.image.PhotoCalib`
487 Calibrate object of the image the cutout is cut from.
488 srcId : `int`
489 Unique id of DiaSource. Used for when an error occurs extracting
490 a cutout.
491 averagePsf : `numpy.array`, optional
492 Average PSF to attach to the cutout.
493 Used if ``self.config.useAveragePsf`` is set.
494 cutoutType : str, optional
495 Type of cutout being created ('difference', 'template', or 'science')
497 Returns
498 -------
499 ccdData : `astropy.nddata.CCDData` or `None`
500 CCDData object storing the calibrate information from the input
501 difference or template image.
502 """
503 imBBox = image.getBBox()
504 if not geom.Box2D(image.getBBox()).contains(pixelCenter):
505 self.log.warning(
506 "DiaSource id=%i centroid lies at pixel (%.2f, %.2f) "
507 "which is outside the Exposure with bounding box "
508 "((%i, %i), (%i, %i)). Returning None for cutout...",
509 srcId, pixelCenter.x, pixelCenter.y,
510 imBBox.minX, imBBox.maxX, imBBox.minY, imBBox.maxY)
511 return None
512 # Catch errors in retrieving the cutout.
513 try:
514 cutout = image.getCutout(pixelCenter, extent)
515 except InvalidParameterError:
516 self.log.warning(
517 "Failed to retrieve cutout from image for DiaSource with "
518 "id=%i. InvalidParameterError thrown during cutout "
519 "creation. Returning None for cutout..."
520 % srcId)
521 if self.config.useAveragePsf:
522 if averagePsf is None:
523 self.log.info("Using source id=%i to compute the average PSF.", srcId)
524 averagePsf = self._computePsf(image, pixelCenter, srcId=srcId)
525 cutoutPsf = averagePsf
526 else:
527 cutoutPsf = self._computePsf(image, pixelCenter, srcId=srcId)
529 # Find the value of the bottom corner of our cutout's BBox and
530 # subtract 1 so that the CCDData cutout position value will be
531 # [1, 1].
532 cutOutMinX = cutout.getBBox().minX - 1
533 cutOutMinY = cutout.getBBox().minY - 1
534 center = cutout.getWcs().skyToPixel(skyCenter)
535 calibCutout = photoCalib.calibrateImage(cutout.getMaskedImage())
537 cutoutWcs = wcs.WCS(naxis=2)
538 cutoutWcs.array_shape = (cutout.getBBox().getWidth(),
539 cutout.getBBox().getWidth())
540 cutoutWcs.wcs.crpix = [center.x - cutOutMinX, center.y - cutOutMinY]
541 cutoutWcs.wcs.crval = [skyCenter.getRa().asDegrees(),
542 skyCenter.getDec().asDegrees()]
543 cutoutWcs.wcs.cd = self.makeLocalTransformMatrix(cutout.getWcs(),
544 center,
545 skyCenter)
546 ccdData = CCDData(
547 data=calibCutout.getImage().array,
548 uncertainty=VarianceUncertainty(calibCutout.getVariance().array),
549 flags=calibCutout.getMask().array,
550 wcs=cutoutWcs,
551 psf=cutoutPsf,
552 meta={"cutMinX": cutOutMinX,
553 "cutMinY": cutOutMinY},
554 unit=u.nJy)
556 ccdData.header['ROTPA'] = (rotPa, 'Pos angle in deg of focal plane +Y wrt North')
558 return ccdData
560 def makeLocalTransformMatrix(self, wcs, center, skyCenter):
561 """Create a local, linear approximation of the wcs transformation
562 matrix.
564 The approximation is created as if the center is at RA=0, DEC=0. All
565 comparing x,y coordinate are relative to the position of center. Matrix
566 is initially calculated with units arcseconds and then converted to
567 degrees. This yields higher precision results due to quirks in AST.
569 Parameters
570 ----------
571 wcs : `lsst.afw.geom.SkyWcs`
572 Wcs to approximate
573 center : `lsst.geom.Point2D`
574 Point at which to evaluate the LocalWcs.
575 skyCenter : `lsst.geom.SpherePoint`
576 Point on sky to approximate the Wcs.
578 Returns
579 -------
580 localMatrix : `numpy.ndarray`
581 Matrix representation the local wcs approximation with units
582 degrees.
583 """
584 blankCDMatrix = [[self._scale, 0], [0, self._scale]]
585 localGnomonicWcs = afwGeom.makeSkyWcs(
586 center, skyCenter, blankCDMatrix)
587 measurementToLocalGnomonic = wcs.getTransform().then(
588 localGnomonicWcs.getTransform().inverted()
589 )
590 localMatrix = measurementToLocalGnomonic.getJacobian(center)
591 return localMatrix / 3600
    def makeAlertDict(self,
                      diaSourceId,
                      observationReason,
                      targetName,
                      diaSource,
                      diaObject,
                      objDiaSrcHistory,
                      objDiaForcedSources,
                      diffImCutout,
                      calexpCutout,
                      templateCutout,
                      ssSource=None):
        """Convert data and package into a dictionary alert.

        Parameters
        ----------
        diaSourceId : `int`
            Unique identifier of the triggering diaSource
        observationReason : `str`
            Scheduler reason for the image containing this diaSource.
        targetName : `str`
            Scheduler target for the image containing this diaSource.
        diaSource : `pandas.DataFrame`
            New single DiaSource to package.
        diaObject : `pandas.DataFrame`
            DiaObject that ``diaSource`` is matched to.
        objDiaSrcHistory : `pandas.DataFrame`
            12 month history of ``diaObject`` excluding the latest DiaSource.
        objDiaForcedSources : `pandas.DataFrame`
            12 month history of ``diaObject`` forced measurements.
        diffImCutout : `astropy.nddata.CCDData` or `None`
            Cutout of the difference image around the location of ``diaSource``
            with a min size set by the ``cutoutSize`` configurable.
        calexpCutout : `astropy.nddata.CCDData` or `None`
            Cutout of the calexp around the location of ``diaSource``
            with a min size set by the ``cutoutSize`` configurable.
        templateCutout : `astropy.nddata.CCDData` or `None`
            Cutout of the template image around the location of ``diaSource``
            with a min size set by the ``cutoutSize`` configurable.
        ssSource : `dict`, optional
            Solar system record for this diaSource, including any
            ``MPCORB_``-prefixed orbit columns; `None` if the source is not
            associated with a solar system object.

        Returns
        -------
        alert : `dict`
            Alert payload ready to be serialized to Avro.
        """
        alert = dict()
        alert['diaSourceId'] = diaSourceId
        # alerts should have NULL if scheduler fields are not populated
        if not len(observationReason):
            observationReason = None
        if not len(targetName):
            targetName = None
        alert['observation_reason'] = observationReason
        alert['target_name'] = targetName
        alert['diaSource'] = diaSource.to_dict()

        if objDiaSrcHistory is None:
            alert['prvDiaSources'] = None
        else:
            alert['prvDiaSources'] = objDiaSrcHistory.to_dict("records")

        # A single forced source selected from a DataFrame arrives as a
        # Series; wrap it in a one-element list.
        if isinstance(objDiaForcedSources, pd.Series):
            alert['prvDiaForcedSources'] = [objDiaForcedSources.to_dict()]
        elif objDiaForcedSources is None:
            alert['prvDiaForcedSources'] = None
        else:
            alert['prvDiaForcedSources'] = objDiaForcedSources.to_dict("records")

        alert['prvDiaNondetectionLimits'] = None

        if diaObject is None:
            alert['diaObject'] = None
        else:
            alert['diaObject'] = diaObject.to_dict()

        if ssSource is None:
            alert['ssSource'] = None
            alert['mpc_orbits'] = None
        else:
            # Split the MPCORB_-prefixed columns out of ssSource into a
            # separate orbit record, stripping the prefix from each key.
            mpcorbColumns = [col for col in ssSource if col[:7] == 'MPCORB_']
            mpcOrbit = {key[7:]: ssSource[key] for key in mpcorbColumns}

            # Coerce numeric orbit fields to float -- presumably to match
            # the alert schema's double fields; confirm against the schema.
            fields_to_cast = [
                "arc_length_total", "arc_length_sel", "a", "mean_anomaly", "period", "mean_motion", "a_unc",
                "mean_anomaly_unc", "period_unc", "mean_motion_unc", "yarkovsky", "srp", "a1", "a2", "a3",
                "dt", "yarkovsky_unc", "srp_unc", "a1_unc", "a2_unc", "a3_unc", "dt_unc",
                "not_normalized_rms", "earth_moid"
            ]
            for key in fields_to_cast:
                if key in mpcOrbit and mpcOrbit[key] is not None:
                    mpcOrbit[key] = float(mpcOrbit[key])

            # Backfill designation fields when the orbit record lacks them,
            # deriving the packed form from the ssObjectId.
            if 'packed_primary_provisional_designation' not in mpcOrbit:
                unpacked_desig = ssSource['designation']
                packed_desig = ss_object_id_to_obj_id(ssSource['ssObjectId'], packed=True)
                mpcOrbit['designation'] = unpacked_desig
                mpcOrbit['packed_primary_provisional_designation'] = packed_desig
                mpcOrbit['unpacked_primary_provisional_designation'] = unpacked_desig

            mpcOrbit['id'] = 0
            # Remove the orbit columns from the ssSource record itself.
            ssSource = {key: ssSource[key] for key in ssSource if key not in mpcorbColumns}
            ssSource['diaSourceId'] = diaSourceId
            alert['ssSource'] = ssSource
            alert['mpc_orbits'] = mpcOrbit

        if diffImCutout is None:
            alert['cutoutDifference'] = None
        else:
            alert['cutoutDifference'] = self.streamCcdDataToBytes(diffImCutout)

        if calexpCutout is None:
            alert['cutoutScience'] = None
        else:
            alert['cutoutScience'] = self.streamCcdDataToBytes(calexpCutout)

        if templateCutout is None:
            alert["cutoutTemplate"] = None
        else:
            alert["cutoutTemplate"] = self.streamCcdDataToBytes(templateCutout)

        return alert
710 def streamCcdDataToBytes(self, cutout):
711 """Serialize a cutout into bytes.
713 Parameters
714 ----------
715 cutout : `astropy.nddata.CCDData`
716 Cutout to serialize.
718 Returns
719 -------
720 coutputBytes : `bytes`
721 Input cutout serialized into byte data.
722 """
723 with io.BytesIO() as streamer:
724 cutout.write(streamer, format="fits")
725 cutoutBytes = streamer.getvalue()
726 return cutoutBytes
728 def _serializeAlert(self, alert, schema=None, schema_id=0):
729 """Serialize an alert to a byte sequence for sending to Kafka.
731 Parameters
732 ----------
733 alert : `dict`
734 An alert payload to be serialized.
735 schema : `dict`, optional
736 An Avro schema definition describing how to encode `alert`. By default,
737 the schema is None, which sets it to the latest schema available.
738 schema_id : `int`, optional
739 The Confluent Schema Registry ID of the schema. By default, 0 (an
740 invalid ID) is used, indicating that the schema is not registered.
742 Returns
743 -------
744 serialized : `bytes`
745 The byte sequence describing the alert, including the Confluent Wire
746 Format prefix.
747 """
748 if schema is None:
749 schema = self.alertSchema.definition
751 buf = io.BytesIO()
752 buf.write(self._serializeConfluentWireHeader(schema_id))
753 fastavro.schemaless_writer(buf, schema, alert)
754 return buf.getvalue()
756 @staticmethod
757 def _serializeConfluentWireHeader(schema_version):
758 """Returns the byte prefix for Confluent Wire Format-style Kafka messages.
760 Parameters
761 ----------
762 schema_version : `int`
763 A version number which indicates the Confluent Schema Registry ID
764 number of the Avro schema used to encode the message that follows this
765 header.
767 Returns
768 -------
769 header : `bytes`
770 The 5-byte encoded message prefix.
772 Notes
773 -----
774 The Confluent Wire Format is described more fully here:
775 https://docs.confluent.io/current/schema-registry/serdes-develop/index.html#wire-format
776 """
777 ConfluentWireFormatHeader = struct.Struct(">bi")
778 return ConfluentWireFormatHeader.pack(0, schema_version)
780 def _delivery_callback(self, err, msg):
781 if err:
782 if err.code() == KafkaError.UNKNOWN_TOPIC_OR_PART:
783 topic = msg.topic() if msg and hasattr(msg,
784 'topic') else 'unknown'
785 self.log.warning(
786 'Message failed delivery. Topic %s unknown: %s',
787 topic, err)
788 else:
789 self.log.debug('Message delivered to %s [%d] @ %d', msg.topic(), msg.partition(), msg.offset())
791 def _server_check(self):
792 """Checks if the alert stream credentials are still valid and the
793 server is contactable.
795 Raises
796 -------
797 KafkaException
798 Raised if the server us not contactable.
799 RuntimeError
800 Raised if the server is contactable but there are no topics
801 present.
802 """
803 admin_client = AdminClient(self.kafkaAdminConfig)
804 topics = admin_client.list_topics(timeout=self.config.maxTimeout).topics
806 if not topics:
807 raise RuntimeError()
808 if self.kafkaTopic not in topics:
809 raise RuntimeError(
810 f"Topic '{self.kafkaTopic}' not found on the Kafka server. "
811 f"Check that the correct topic has been provided and exists "
812 f"on the Kafka server.")
814 def _computePsf(self, exposure, pixelCenter, srcId=None):
815 """Compute the PSF at a location and catch errors.
817 Parameters
818 ----------
819 exposure : `lsst.afw.image.Exposure`
820 The image to compute the PSF for.
821 pixelCenter : `lsst.geom.Point2D`
822 The location on the image to compute the PSF.
823 srcId : `int`, optional
824 Unique id of DiaSource. Used for when an error occurs extracting
825 a cutout.
827 Returns
828 -------
829 cutoutPsf : `numpy.array`
830 Array of the PSF values.
831 """
832 try:
833 # use exposure.psf.computeKernelImage to provide PSF centered in the array
834 cutoutPsf = exposure.psf.computeKernelImage(pixelCenter).array
835 except InvalidParameterError:
836 if srcId is not None:
837 msg = "Could not calculate PSF for DiaSource with "\
838 "id=%i. InvalidParameterError encountered. Exiting."\
839 % srcId
840 else:
841 msg = "Could not calculate average PSF for the image"
842 self.log.warning(msg)
843 cutoutPsf = None
844 except InvalidPsfError:
845 if srcId is not None:
846 msg = "Could not calculate PSF for DiaSource with "\
847 "id=%i. InvalidPsfError encountered. Exiting."\
848 % srcId
849 else:
850 msg = "Could not calculate average PSF for the image"
851 self.log.warning(msg)
852 cutoutPsf = None
853 return cutoutPsf