Coverage for python / lsst / ap / association / packageAlerts.py: 15%
297 statements
« prev ^ index » next coverage.py v7.13.5, created at 2026-04-22 08:54 +0000
« prev ^ index » next coverage.py v7.13.5, created at 2026-04-22 08:54 +0000
1# This file is part of ap_association.
2#
3# Developed for the LSST Data Management System.
4# This product includes software developed by the LSST Project
5# (https://www.lsst.org).
6# See the COPYRIGHT file at the top-level directory of this distribution
7# for details of code ownership.
8#
9# This program is free software: you can redistribute it and/or modify
10# it under the terms of the GNU General Public License as published by
11# the Free Software Foundation, either version 3 of the License, or
12# (at your option) any later version.
13#
14# This program is distributed in the hope that it will be useful,
15# but WITHOUT ANY WARRANTY; without even the implied warranty of
16# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
17# GNU General Public License for more details.
18#
19# You should have received a copy of the GNU General Public License
20# along with this program. If not, see <https://www.gnu.org/licenses/>.
22__all__ = ("PackageAlertsConfig", "PackageAlertsTask")
24import io
25import os
26import sys
27import time
29from astropy import wcs
30import astropy.units as u
31from astropy.nddata import CCDData, VarianceUncertainty
32import pandas as pd
33import struct
34import fastavro
35from confluent_kafka.cimpl import KafkaError
37# confluent_kafka is not in the standard Rubin environment as it is a third
38# party package and is only needed when producing alerts.
39try:
40 import confluent_kafka
41 from confluent_kafka import KafkaException
42 from confluent_kafka.admin import AdminClient
43except ImportError:
44 confluent_kafka = None
46import lsst.alert.packet as alertPack
47from lsst.afw.detection import InvalidPsfError
48import lsst.afw.geom as afwGeom
49import lsst.geom as geom
50import lsst.pex.config as pexConfig
51from lsst.pex.exceptions import InvalidParameterError
52import lsst.pipe.base as pipeBase
53from lsst.pipe.tasks.associationUtils import ss_object_id_to_obj_id
54import lsst.utils.logging
55from lsst.utils.timer import timeMethod
class PackageAlertsConfig(pexConfig.Config):
    """Config class for PackageAlertsTask.
    """
    schemaFile = pexConfig.Field(
        dtype=str,
        doc="Schema definition file URI for the avro alerts.",
        default=str(alertPack.get_uri_to_latest_schema())
    )
    minCutoutSize = pexConfig.RangeField(
        dtype=int,
        min=0,
        max=1000,
        default=30,
        doc="Dimension of the square image cutouts to package in the alert."
    )
    maxCutoutSize = pexConfig.RangeField(
        dtype=int,
        min=0,
        max=1000,
        default=102,
        # Spaces restored at the string-concatenation boundaries; the
        # original rendered "Thedefault", "thearcseconds", "effectivesize".
        doc="Dimension of the square image cutouts to package in the alert. "
            "The default size comes from the max trail length in arcseconds "
            "(10 deg/day) for a 30 second observation divided by the "
            "arcseconds per pixel (0.2), which is 62.5 pixels. The effective "
            "size of the psf (40 pixels) is then added for a total of 102 pixels. "
    )
    alertWriteLocation = pexConfig.Field(
        dtype=str,
        doc="Location to write alerts to.",
        default=os.path.join(os.getcwd(), "alerts"),
    )

    doProduceAlerts = pexConfig.Field(
        dtype=bool,
        doc="Turn on alert production to kafka if true and if confluent_kafka is in the environment.",
        default=False,
    )

    doWriteAlerts = pexConfig.Field(
        dtype=bool,
        doc="Write alerts to disk if true.",
        default=False,
    )

    doWriteFailedAlerts = pexConfig.Field(
        dtype=bool,
        doc="If an alert cannot be sent when doProduceAlerts is set, "
            "write it to disk for debugging purposes.",
        default=False,
    )

    maxTimeout = pexConfig.Field(
        dtype=float,
        doc="Sets the maximum time in seconds to wait for the alert stream "
            "broker to respond to a query before timing out.",
        default=15.0,
    )

    deliveryTimeout = pexConfig.Field(
        dtype=float,
        doc="Sets the time in milliseconds to wait for the producer to "
            "deliver an alert.",
        default=1200.0,
    )

    useAveragePsf = pexConfig.Field(
        dtype=bool,
        doc="Use the average PSF for the image, instead of the PSF for each cutout. "
            "This option is much less accurate, but much faster.",
        default=False,
    )
class PackageAlertsTask(pipeBase.Task):
    """Tasks for packaging Dia and Pipelines data into Avro alert packages.

    Alerts can be produced to a Kafka alert stream and/or written to disk,
    controlled by the ``doProduceAlerts`` and ``doWriteAlerts`` configs.
    """
    ConfigClass = PackageAlertsConfig
    _DefaultName = "packageAlerts"

    # One arcsecond expressed in degrees; seeds the identity CD matrix used
    # by makeLocalTransformMatrix.
    _scale = (1.0 * geom.arcseconds).asDegrees()
    def __init__(self, **kwargs):
        """Load the alert schema and, if configured, set up the Kafka producer.

        Raises
        ------
        ValueError
            Raised if ``doProduceAlerts`` is set and any of the Kafka
            environment variables (``AP_KAFKA_PRODUCER_PASSWORD``,
            ``AP_KAFKA_PRODUCER_USERNAME``, ``AP_KAFKA_SERVER``,
            ``AP_KAFKA_TOPIC``) is unset or empty.
        RuntimeError
            Raised if ``doProduceAlerts`` is set but confluent_kafka could
            not be imported.
        """
        super().__init__(**kwargs)
        self.alertSchema = alertPack.Schema.from_uri(self.config.schemaFile)
        # Created unconditionally: used by doWriteAlerts and also by
        # doWriteFailedAlerts when producing to Kafka fails.
        os.makedirs(self.config.alertWriteLocation, exist_ok=True)

        if self.config.doProduceAlerts:
            if confluent_kafka is not None:
                # Credentials come only from the environment so they are
                # never committed to configuration or source code.
                self.password = os.getenv("AP_KAFKA_PRODUCER_PASSWORD")
                if not self.password:
                    raise ValueError("Kafka password environment variable was not set.")
                self.username = os.getenv("AP_KAFKA_PRODUCER_USERNAME")
                if not self.username:
                    raise ValueError("Kafka username environment variable was not set.")
                self.server = os.getenv("AP_KAFKA_SERVER")
                if not self.server:
                    raise ValueError("Kafka server environment variable was not set.")
                self.kafkaTopic = os.getenv("AP_KAFKA_TOPIC")
                if not self.kafkaTopic:
                    raise ValueError("Kafka topic environment variable was not set.")

                # confluent_kafka configures all of its classes with dictionaries. This one
                # sets up the bare minimum that is needed.
                self.kafkaConfig = {
                    # This is the URL to use to connect to the Kafka cluster.
                    "bootstrap.servers": self.server,
                    # These next two properties tell the Kafka client about the specific
                    # authentication and authorization protocols that should be used when
                    # connecting.
                    "security.protocol": "SASL_PLAINTEXT",
                    "sasl.mechanisms": "SCRAM-SHA-512",
                    # The sasl.username and sasl.password are passed through over
                    # SCRAM-SHA-512 auth to connect to the cluster. The username is not
                    # sensitive, but the password is (of course) a secret value which
                    # should never be committed to source code.
                    "sasl.username": self.username,
                    "sasl.password": self.password,
                    # Batch size limits the largest size of a kafka alert that can be sent.
                    # We set the batch size to 2 Mb.
                    "batch.size": 2097152,
                    "linger.ms": 5,
                    "delivery.timeout.ms": self.config.deliveryTimeout,
                    # Compression options are snappy, lz4, zstd, and gzip.
                    "compression.type": 'snappy'
                }
                self.kafkaAdminConfig = {
                    # This is the URL to use to connect to the Kafka cluster.
                    "bootstrap.servers": self.server,
                    # These next two properties tell the Kafka client about the specific
                    # authentication and authorization protocols that should be used when
                    # connecting.
                    "security.protocol": "SASL_PLAINTEXT",
                    "sasl.mechanisms": "SCRAM-SHA-512",
                    # The sasl.username and sasl.password are passed through over
                    # SCRAM-SHA-512 auth to connect to the cluster. The username is not
                    # sensitive, but the password is (of course) a secret value which
                    # should never be committed to source code.
                    "sasl.username": self.username,
                    "sasl.password": self.password,
                }

                # Fail fast at construction if the broker is unreachable or
                # the configured topic does not exist.
                self._server_check()
                self.producer = confluent_kafka.Producer(**self.kafkaConfig)

            else:
                raise RuntimeError("Produce alerts is set but confluent_kafka is not present in "
                                   "the environment. Alerts will not be sent to the alert stream.")
    @timeMethod
    def run(self,
            diaSourceCat,
            diaObjectCat,
            diaSrcHistory,
            diaForcedSources,
            diffIm,
            calexp,
            template,
            ssSrc=None,
            doRunForcedMeasurement=True,
            forcedSourceHistoryThreshold=0,
            ):
        """Package DiaSources/Object and exposure data into Avro alerts.

        Alerts can be sent to the alert stream if ``doProduceAlerts`` is set
        and written to disk if ``doWriteAlerts`` is set. Both can be set at the
        same time, and are independent of one another.

        Writes Avro alerts to a location determined by the
        ``alertWriteLocation`` configurable.

        Parameters
        ----------
        diaSourceCat : `pandas.DataFrame`
            New DiaSources to package. DataFrame should be indexed on
            ``["diaObjectId", "band", "diaSourceId"]``
        diaObjectCat : `pandas.DataFrame`
            New and updated DiaObjects matched to the new DiaSources. DataFrame
            is indexed on ``["diaObjectId"]``
        diaSrcHistory : `pandas.DataFrame`
            12 month history of DiaSources matched to the DiaObjects. Excludes
            the newest DiaSource and is indexed on
            ``["diaObjectId", "band", "diaSourceId"]``
        diaForcedSources : `pandas.DataFrame`
            12 month history of DiaForcedSources matched to the DiaObjects.
            Indexed on ``["diaObjectId"]``
        diffIm : `lsst.afw.image.ExposureF`
            Difference image the sources in ``diaSourceCat`` were detected in.
        calexp : `lsst.afw.image.ExposureF`
            Calexp used to create the ``diffIm``.
        template : `lsst.afw.image.ExposureF` or `None`
            Template image used to create the ``diffIm``.
        ssSrc : `astropy.table.Table`, optional
            Solar system specific information for diaSources associated to ssObjects.
        doRunForcedMeasurement : `bool`, optional
            Flag to indicate whether forced measurement was run.
            This should only be turned off for debugging purposes.
            Added to allow disabling forced sources for performance
            reasons during the ops rehearsal.
        forcedSourceHistoryThreshold : `int`, optional
            Minimum number of detections of a diaObject required
            to run forced photometry. Set to 1 to include all diaObjects.
        """
        alerts = []
        self._patchDiaSources(diaSourceCat)
        self._patchDiaSources(diaSrcHistory)
        detector = diffIm.detector.getId()
        visit = diffIm.visitInfo.id
        observation_reason = diffIm.visitInfo.getObservationReason()
        target_name = diffIm.visitInfo.getObject()
        midpoint_unix = diffIm.visitInfo.date.toAstropy().tai.unix
        exposure_time = diffIm.visitInfo.exposureTime
        diffImPhotoCalib = diffIm.getPhotoCalib()
        calexpPhotoCalib = calexp.getPhotoCalib()
        # NOTE(review): template is dereferenced unconditionally here and
        # below although the docstring allows `None` — confirm callers
        # always pass an image.
        templatePhotoCalib = template.getPhotoCalib()
        # Precompute per-image average PSFs once, reused for every cutout
        # when config.useAveragePsf is set.
        diffImPsf = self._computePsf(diffIm, diffIm.psf.getAveragePosition())
        sciencePsf = self._computePsf(calexp, calexp.psf.getAveragePosition())
        templatePsf = self._computePsf(template, template.psf.getAveragePosition())

        # cast to object; works around pandas coercing Series to nullable
        # Float64 type when selecting single rows below
        diaSourceCat = diaSourceCat.astype('object')
        diaObjectCat = diaObjectCat.astype('object')
        diaSrcHistory = diaSrcHistory.astype('object')
        diaForcedSources = diaForcedSources.astype('object')

        if ssSrc is not None:
            ssSrc = ssSrc.set_index('diaSourceId')
            ssSrc = ssSrc.astype('object')

        n_sources = len(diaSourceCat)
        self.log.info("Packaging alerts for %d DiaSources.", n_sources)
        # Log every 10 seconds as proof of liveness.
        loop_logger = lsst.utils.logging.PeriodicLogger(self.log, interval=10.0)

        for srcIndex, diaSource in diaSourceCat.iterrows():
            loop_logger.log("%s/%s sources have been packaged.", len(alerts), n_sources)

            alertId = diaSource["diaSourceId"]
            # srcIndex[0] is the diaObjectId; 0 marks a source associated to
            # a solar system object, which has ssSource info instead of a
            # DiaObject record.
            if srcIndex[0] == 0:
                diaObject = None
                objSourceHistory = None
                objDiaForcedSources = None
                ssSource = ssSrc.loc[[alertId]].to_dict("records")[0]
            else:
                ssSource = None
                diaObject = diaObjectCat.loc[srcIndex[0]]
                if diaObject["nDiaSources"] > 1:
                    objSourceHistory = diaSrcHistory.loc[srcIndex[0]]
                else:
                    objSourceHistory = None
                if doRunForcedMeasurement and diaObject["nDiaSources"] >= forcedSourceHistoryThreshold:
                    objDiaForcedSources = diaForcedSources.loc[srcIndex[0]]
                else:
                    # Send empty table with correct columns
                    objDiaForcedSources = diaForcedSources.loc[[]]

            sphPoint = geom.SpherePoint(diaSource["ra"],
                                        diaSource["dec"],
                                        geom.degrees)
            pixelPoint = geom.Point2D(diaSource["x"],
                                      diaSource["y"])

            cutoutExtent = self.createDiaSourceExtent(diaSource["bboxSize"])
            diffImCutout = self.createCcdDataCutout(
                diffIm,
                sphPoint,
                pixelPoint,
                cutoutExtent,
                diffImPhotoCalib,
                diaSource["diaSourceId"],
                averagePsf=diffImPsf,
                rotPa=diffIm.visitInfo.boresightRotAngle.asDegrees())
            calexpCutout = self.createCcdDataCutout(
                calexp,
                sphPoint,
                pixelPoint,
                cutoutExtent,
                calexpPhotoCalib,
                diaSource["diaSourceId"],
                averagePsf=sciencePsf,
                rotPa=calexp.visitInfo.boresightRotAngle.asDegrees())
            templateCutout = self.createCcdDataCutout(
                template,
                sphPoint,
                pixelPoint,
                cutoutExtent,
                templatePhotoCalib,
                diaSource["diaSourceId"],
                averagePsf=templatePsf,
                # NOTE(review): rotPa comes from calexp here, not template —
                # confirm this is intentional.
                rotPa=calexp.visitInfo.boresightRotAngle.asDegrees())

            alerts.append(
                self.makeAlertDict(diaSource["diaSourceId"],
                                   observation_reason,
                                   target_name,
                                   diaSource,
                                   diaObject,
                                   objSourceHistory,
                                   objDiaForcedSources,
                                   diffImCutout,
                                   calexpCutout,
                                   templateCutout,
                                   ssSource))

        if self.config.doProduceAlerts:
            self.log.info("Producing alerts to %s.", self.kafkaTopic)
            self.produceAlerts(alerts, visit, detector, midpoint_unix, exposure_time)
        else:
            # Fill values for the metadata so that downstream metrics don't crash
            self.metadata['visit_midpoint'] = midpoint_unix
            self.metadata['produce_end_timestamp'] = -1
            self.metadata['produce_start_timestamp'] = -1
            self.metadata['alert_timing_since_shutter_close'] = -1
            self.metadata['total_alerts'] = -1

        if self.config.doWriteAlerts:
            avro_path = os.path.join(self.config.alertWriteLocation, f"{visit}_{detector}.avro")
            self.log.info("Writing alerts to %s.", avro_path)
            with open(avro_path, "wb") as f:
                self.alertSchema.store_alerts(f, alerts)
380 def _patchDiaSources(self, diaSources):
381 """Add the ``programId`` column to the data.
383 Parameters
384 ----------
385 diaSources : `pandas.DataFrame`
386 DataFrame of DiaSources to patch.
387 """
388 diaSources["programId"] = 0
390 def createDiaSourceExtent(self, bboxSize):
391 """Create an extent for a box for the cutouts given the size of the
392 square BBox that covers the source footprint.
394 Parameters
395 ----------
396 bboxSize : `int`
397 Size of a side of the square bounding box in pixels.
399 Returns
400 -------
401 extent : `lsst.geom.Extent2I`
402 Geom object representing the size of the bounding box.
403 """
404 if bboxSize < self.config.minCutoutSize:
405 extent = geom.Extent2I(self.config.minCutoutSize,
406 self.config.minCutoutSize)
407 elif bboxSize > self.config.maxCutoutSize:
408 extent = geom.Extent2I(self.config.maxCutoutSize,
409 self.config.maxCutoutSize)
410 else:
411 extent = geom.Extent2I(bboxSize, bboxSize)
412 return extent
    @timeMethod
    def produceAlerts(self, alerts, visit, detector, midpoint_unix, exposure_time):
        """Serialize alerts and send them to the alert stream using
        confluent_kafka's producer.

        Parameters
        ----------
        alerts : `dict`
            Dictionary of alerts to be sent to the alert stream.
        visit, detector : `int`
            Visit and detector ids of these alerts. Used to write out alerts
            which fail to be sent to the alert stream.
        midpoint_unix : `float`
            Unix timestamp of the exposure midpoint; used to compute alert
            timing relative to shutter close.
        exposure_time : `float`
            Exposure time in seconds; used with ``midpoint_unix`` to locate
            the shutter-close time.
        """
        schema_id = self.alertSchema.get_schema_id()
        # Serialize and send alerts with the producer timestamp.
        total_alerts = 0
        produce_start_timestamp = time.time()
        try:
            for alert in alerts:
                alertBytes = self._serializeAlert(alert,
                                                  schema=self.alertSchema.definition, schema_id=schema_id)
                try:
                    timestamp = time.time()*1000  # Current time in milliseconds
                    headers = [("producer_timestamp", str(timestamp).encode('utf-8'))]
                    self.producer.produce(self.kafkaTopic, alertBytes, callback=self._delivery_callback,
                                          headers=headers)
                    # NOTE(review): flushing after every produce() forces
                    # per-alert delivery and disables producer batching —
                    # confirm this is intentional.
                    self.producer.flush()
                    total_alerts += 1

                except KafkaException as e:
                    self.log.warning('Kafka error: %s, message was %s bytes',
                                     e, sys.getsizeof(alertBytes))

                    if self.config.doWriteFailedAlerts:
                        with open(os.path.join(self.config.alertWriteLocation,
                                               f"{visit}_{detector}_{alert['diaSourceId']}.avro"), "wb") as f:
                            f.write(alertBytes)

            self.producer.flush()
        finally:
            # Always record timing metadata, even if serialization raised,
            # so downstream metrics find these keys populated.
            produce_end_timestamp = time.time()  # Current time in seconds
            # Shutter close is the exposure midpoint plus half the exposure.
            total_time = produce_end_timestamp - (midpoint_unix + exposure_time/2.0)
            self.metadata['visit_midpoint'] = midpoint_unix
            self.metadata['produce_end_timestamp'] = produce_end_timestamp
            self.metadata['produce_start_timestamp'] = produce_start_timestamp
            self.metadata['alert_timing_since_shutter_close'] = total_time
            self.metadata['total_alerts'] = total_alerts
            # A single log message is easier for Loki to parse than timeMethod's start+end pairs.
            self.log.verbose("Producing alerts: Took %.4f seconds",
                             produce_end_timestamp - produce_start_timestamp)

        self.log.info(f"Total time since shutter close to produce alerts for"
                      f" visit {visit} detector {detector}: {total_time} seconds")
468 def createCcdDataCutout(self, image, skyCenter, pixelCenter, extent, photoCalib, srcId, averagePsf=None,
469 rotPa=None):
470 """Grab an image as a cutout and return a calibrated CCDData image.
472 Parameters
473 ----------
474 image : `lsst.afw.image.ExposureF`
475 Image to pull cutout from.
476 skyCenter : `lsst.geom.SpherePoint`
477 Center point of DiaSource on the sky.
478 pixelCenter : `lsst.geom.Point2D`
479 Pixel center of DiaSource on the sky.
480 extent : `lsst.geom.Extent2I`
481 Bounding box to cutout from the image.
482 photoCalib : `lsst.afw.image.PhotoCalib`
483 Calibrate object of the image the cutout is cut from.
484 srcId : `int`
485 Unique id of DiaSource. Used for when an error occurs extracting
486 a cutout.
487 averagePsf : `numpy.array`, optional
488 Average PSF to attach to the cutout.
489 Used if ``self.config.useAveragePsf`` is set.
490 cutoutType : str, optional
491 Type of cutout being created ('difference', 'template', or 'science')
493 Returns
494 -------
495 ccdData : `astropy.nddata.CCDData` or `None`
496 CCDData object storing the calibrate information from the input
497 difference or template image.
498 """
499 imBBox = image.getBBox()
500 if not geom.Box2D(image.getBBox()).contains(pixelCenter):
501 self.log.warning(
502 "DiaSource id=%i centroid lies at pixel (%.2f, %.2f) "
503 "which is outside the Exposure with bounding box "
504 "((%i, %i), (%i, %i)). Returning None for cutout...",
505 srcId, pixelCenter.x, pixelCenter.y,
506 imBBox.minX, imBBox.maxX, imBBox.minY, imBBox.maxY)
507 return None
508 # Catch errors in retrieving the cutout.
509 try:
510 cutout = image.getCutout(pixelCenter, extent)
511 except InvalidParameterError:
512 self.log.warning(
513 "Failed to retrieve cutout from image for DiaSource with "
514 "id=%i. InvalidParameterError thrown during cutout "
515 "creation. Returning None for cutout..."
516 % srcId)
517 if self.config.useAveragePsf:
518 if averagePsf is None:
519 self.log.info("Using source id=%i to compute the average PSF.", srcId)
520 averagePsf = self._computePsf(image, pixelCenter, srcId=srcId)
521 cutoutPsf = averagePsf
522 else:
523 cutoutPsf = self._computePsf(image, pixelCenter, srcId=srcId)
525 # Find the value of the bottom corner of our cutout's BBox and
526 # subtract 1 so that the CCDData cutout position value will be
527 # [1, 1].
528 cutOutMinX = cutout.getBBox().minX - 1
529 cutOutMinY = cutout.getBBox().minY - 1
530 center = cutout.getWcs().skyToPixel(skyCenter)
531 calibCutout = photoCalib.calibrateImage(cutout.getMaskedImage())
533 cutoutWcs = wcs.WCS(naxis=2)
534 cutoutWcs.array_shape = (cutout.getBBox().getWidth(),
535 cutout.getBBox().getWidth())
536 cutoutWcs.wcs.crpix = [center.x - cutOutMinX, center.y - cutOutMinY]
537 cutoutWcs.wcs.crval = [skyCenter.getRa().asDegrees(),
538 skyCenter.getDec().asDegrees()]
539 cutoutWcs.wcs.cd = self.makeLocalTransformMatrix(cutout.getWcs(),
540 center,
541 skyCenter)
542 ccdData = CCDData(
543 data=calibCutout.getImage().array,
544 uncertainty=VarianceUncertainty(calibCutout.getVariance().array),
545 flags=calibCutout.getMask().array,
546 wcs=cutoutWcs,
547 psf=cutoutPsf,
548 meta={"cutMinX": cutOutMinX,
549 "cutMinY": cutOutMinY},
550 unit=u.nJy)
552 ccdData.header['ROTPA'] = (rotPa, 'Pos angle in deg of focal plane +Y wrt North')
554 return ccdData
556 def makeLocalTransformMatrix(self, wcs, center, skyCenter):
557 """Create a local, linear approximation of the wcs transformation
558 matrix.
560 The approximation is created as if the center is at RA=0, DEC=0. All
561 comparing x,y coordinate are relative to the position of center. Matrix
562 is initially calculated with units arcseconds and then converted to
563 degrees. This yields higher precision results due to quirks in AST.
565 Parameters
566 ----------
567 wcs : `lsst.afw.geom.SkyWcs`
568 Wcs to approximate
569 center : `lsst.geom.Point2D`
570 Point at which to evaluate the LocalWcs.
571 skyCenter : `lsst.geom.SpherePoint`
572 Point on sky to approximate the Wcs.
574 Returns
575 -------
576 localMatrix : `numpy.ndarray`
577 Matrix representation the local wcs approximation with units
578 degrees.
579 """
580 blankCDMatrix = [[self._scale, 0], [0, self._scale]]
581 localGnomonicWcs = afwGeom.makeSkyWcs(
582 center, skyCenter, blankCDMatrix)
583 measurementToLocalGnomonic = wcs.getTransform().then(
584 localGnomonicWcs.getTransform().inverted()
585 )
586 localMatrix = measurementToLocalGnomonic.getJacobian(center)
587 return localMatrix / 3600
    def makeAlertDict(self,
                      diaSourceId,
                      observationReason,
                      targetName,
                      diaSource,
                      diaObject,
                      objDiaSrcHistory,
                      objDiaForcedSources,
                      diffImCutout,
                      calexpCutout,
                      templateCutout,
                      ssSource=None):
        """Convert data and package into a dictionary alert.

        Parameters
        ----------
        diaSourceId : `int`
            Unique identifier of the triggering diaSource
        diaSource : `pandas.DataFrame`
            New single DiaSource to package.
        observationReason : `str`
            Scheduler reason for the image containing this diaSource.
        targetName : `str`
            Scheduler target for the image containing this diaSource.
        diaObject : `pandas.DataFrame` or `None`
            DiaObject that ``diaSource`` is matched to.
        objDiaSrcHistory : `pandas.DataFrame` or `None`
            12 month history of ``diaObject`` excluding the latest DiaSource.
        objDiaForcedSources : `pandas.DataFrame`
            12 month history of ``diaObject`` forced measurements.
        diffImCutout : `astropy.nddata.CCDData` or `None`
            Cutout of the difference image around the location of ``diaSource``
            with a min size set by the ``cutoutSize`` configurable.
        calexpCutout : `astropy.nddata.CCDData` or `None`
            Cutout of the calexp around the location of ``diaSource``
            with a min size set by the ``cutoutSize`` configurable.
        templateCutout : `astropy.nddata.CCDData` or `None`
            Cutout of the template image around the location of ``diaSource``
            with a min size set by the ``cutoutSize`` configurable.
        ssSource : `dict`, optional
            Solar system source record for this diaSource, including
            ``MPCORB_``-prefixed orbit columns. `None` for sources not
            associated to a solar system object.

        Returns
        -------
        alert : `dict`
            The assembled alert payload, ready for Avro serialization.
        """
        alert = dict()
        alert['diaSourceId'] = diaSourceId
        # alerts should have NULL if scheduler fields are not populated
        if not len(observationReason):
            observationReason = None
        if not len(targetName):
            targetName = None
        alert['observation_reason'] = observationReason
        alert['target_name'] = targetName
        alert['diaSource'] = diaSource.to_dict()

        if objDiaSrcHistory is None:
            alert['prvDiaSources'] = None
        else:
            alert['prvDiaSources'] = objDiaSrcHistory.to_dict("records")

        # A single-row forced-source history arrives as a Series rather than
        # a DataFrame; wrap it so the alert always carries a list of records.
        if isinstance(objDiaForcedSources, pd.Series):
            alert['prvDiaForcedSources'] = [objDiaForcedSources.to_dict()]
        elif objDiaForcedSources is None:
            alert['prvDiaForcedSources'] = None
        else:
            alert['prvDiaForcedSources'] = objDiaForcedSources.to_dict("records")

        alert['prvDiaNondetectionLimits'] = None

        if diaObject is None:
            alert['diaObject'] = None
        else:
            alert['diaObject'] = diaObject.to_dict()

        if ssSource is None:
            alert['ssSource'] = None
            alert['MPCORB'] = None
        else:
            # Split MPCORB_-prefixed columns into a separate orbit record,
            # stripping the prefix from the keys.
            mpcorbColumns = [col for col in ssSource if col[:7] == 'MPCORB_']
            mpcOrbit = {key[7:]: ssSource[key] for key in mpcorbColumns}

            # Cast numeric orbit fields to float, skipping missing values.
            fields_to_cast = [
                "arc_length_total", "arc_length_sel", "a", "mean_anomaly", "period", "mean_motion", "a_unc",
                "mean_anomaly_unc", "period_unc", "mean_motion_unc", "yarkovsky", "srp", "a1", "a2", "a3",
                "dt", "yarkovsky_unc", "srp_unc", "a1_unc", "a2_unc", "a3_unc", "dt_unc",
                "not_normalized_rms", "earth_moid"
            ]
            for key in fields_to_cast:
                if key in mpcOrbit and mpcOrbit[key] is not None:
                    mpcOrbit[key] = float(mpcOrbit[key])

            # Backfill designation fields when the orbit record lacks them,
            # deriving the packed form from the ssObjectId.
            if 'packed_primary_provisional_designation' not in mpcOrbit:
                unpacked_desig = ssSource['designation']
                packed_desig = ss_object_id_to_obj_id(ssSource['ssObjectId'], packed=True)
                mpcOrbit['designation'] = unpacked_desig
                mpcOrbit['packed_primary_provisional_designation'] = packed_desig
                mpcOrbit['unpacked_primary_provisional_designation'] = unpacked_desig

            mpcOrbit['id'] = 0
            # Drop the MPCORB_ columns from the ssSource record itself.
            ssSource = {key: ssSource[key] for key in ssSource if key not in mpcorbColumns}
            ssSource['diaSourceId'] = diaSourceId
            alert['ssSource'] = ssSource
            alert['MPCORB'] = mpcOrbit

        if diffImCutout is None:
            alert['cutoutDifference'] = None
        else:
            alert['cutoutDifference'] = self.streamCcdDataToBytes(diffImCutout)

        if calexpCutout is None:
            alert['cutoutScience'] = None
        else:
            alert['cutoutScience'] = self.streamCcdDataToBytes(calexpCutout)

        if templateCutout is None:
            alert["cutoutTemplate"] = None
        else:
            alert["cutoutTemplate"] = self.streamCcdDataToBytes(templateCutout)

        return alert
706 def streamCcdDataToBytes(self, cutout):
707 """Serialize a cutout into bytes.
709 Parameters
710 ----------
711 cutout : `astropy.nddata.CCDData`
712 Cutout to serialize.
714 Returns
715 -------
716 coutputBytes : `bytes`
717 Input cutout serialized into byte data.
718 """
719 with io.BytesIO() as streamer:
720 cutout.write(streamer, format="fits")
721 cutoutBytes = streamer.getvalue()
722 return cutoutBytes
724 def _serializeAlert(self, alert, schema=None, schema_id=0):
725 """Serialize an alert to a byte sequence for sending to Kafka.
727 Parameters
728 ----------
729 alert : `dict`
730 An alert payload to be serialized.
731 schema : `dict`, optional
732 An Avro schema definition describing how to encode `alert`. By default,
733 the schema is None, which sets it to the latest schema available.
734 schema_id : `int`, optional
735 The Confluent Schema Registry ID of the schema. By default, 0 (an
736 invalid ID) is used, indicating that the schema is not registered.
738 Returns
739 -------
740 serialized : `bytes`
741 The byte sequence describing the alert, including the Confluent Wire
742 Format prefix.
743 """
744 if schema is None:
745 schema = self.alertSchema.definition
747 buf = io.BytesIO()
748 buf.write(self._serializeConfluentWireHeader(schema_id))
749 fastavro.schemaless_writer(buf, schema, alert)
750 return buf.getvalue()
752 @staticmethod
753 def _serializeConfluentWireHeader(schema_version):
754 """Returns the byte prefix for Confluent Wire Format-style Kafka messages.
756 Parameters
757 ----------
758 schema_version : `int`
759 A version number which indicates the Confluent Schema Registry ID
760 number of the Avro schema used to encode the message that follows this
761 header.
763 Returns
764 -------
765 header : `bytes`
766 The 5-byte encoded message prefix.
768 Notes
769 -----
770 The Confluent Wire Format is described more fully here:
771 https://docs.confluent.io/current/schema-registry/serdes-develop/index.html#wire-format
772 """
773 ConfluentWireFormatHeader = struct.Struct(">bi")
774 return ConfluentWireFormatHeader.pack(0, schema_version)
776 def _delivery_callback(self, err, msg):
777 if err:
778 if err.code() == KafkaError.UNKNOWN_TOPIC_OR_PART:
779 topic = msg.topic() if msg and hasattr(msg,
780 'topic') else 'unknown'
781 self.log.warning(
782 'Message failed delivery. Topic %s unknown: %s',
783 topic, err)
784 else:
785 self.log.debug('Message delivered to %s [%d] @ %d', msg.topic(), msg.partition(), msg.offset())
787 def _server_check(self):
788 """Checks if the alert stream credentials are still valid and the
789 server is contactable.
791 Raises
792 -------
793 KafkaException
794 Raised if the server us not contactable.
795 RuntimeError
796 Raised if the server is contactable but there are no topics
797 present.
798 """
799 admin_client = AdminClient(self.kafkaAdminConfig)
800 topics = admin_client.list_topics(timeout=self.config.maxTimeout).topics
802 if not topics:
803 raise RuntimeError()
804 if self.kafkaTopic not in topics:
805 raise RuntimeError(
806 f"Topic '{self.kafkaTopic}' not found on the Kafka server. "
807 f"Check that the correct topic has been provided and exists "
808 f"on the Kafka server.")
810 def _computePsf(self, exposure, pixelCenter, srcId=None):
811 """Compute the PSF at a location and catch errors.
813 Parameters
814 ----------
815 exposure : `lsst.afw.image.Exposure`
816 The image to compute the PSF for.
817 pixelCenter : `lsst.geom.Point2D`
818 The location on the image to compute the PSF.
819 srcId : `int`, optional
820 Unique id of DiaSource. Used for when an error occurs extracting
821 a cutout.
823 Returns
824 -------
825 cutoutPsf : `numpy.array`
826 Array of the PSF values.
827 """
828 try:
829 # use exposure.psf.computeKernelImage to provide PSF centered in the array
830 cutoutPsf = exposure.psf.computeKernelImage(pixelCenter).array
831 except InvalidParameterError:
832 if srcId is not None:
833 msg = "Could not calculate PSF for DiaSource with "\
834 "id=%i. InvalidParameterError encountered. Exiting."\
835 % srcId
836 else:
837 msg = "Could not calculate average PSF for the image"
838 self.log.warning(msg)
839 cutoutPsf = None
840 except InvalidPsfError:
841 if srcId is not None:
842 msg = "Could not calculate PSF for DiaSource with "\
843 "id=%i. InvalidPsfError encountered. Exiting."\
844 % srcId
845 else:
846 msg = "Could not calculate average PSF for the image"
847 self.log.warning(msg)
848 cutoutPsf = None
849 return cutoutPsf