Coverage for python / lsst / ap / association / packageAlerts.py: 15%

297 statements  

« prev     ^ index     » next       coverage.py v7.13.5, created at 2026-04-15 00:09 +0000

1# This file is part of ap_association. 

2# 

3# Developed for the LSST Data Management System. 

4# This product includes software developed by the LSST Project 

5# (https://www.lsst.org). 

6# See the COPYRIGHT file at the top-level directory of this distribution 

7# for details of code ownership. 

8# 

9# This program is free software: you can redistribute it and/or modify 

10# it under the terms of the GNU General Public License as published by 

11# the Free Software Foundation, either version 3 of the License, or 

12# (at your option) any later version. 

13# 

14# This program is distributed in the hope that it will be useful, 

15# but WITHOUT ANY WARRANTY; without even the implied warranty of 

16# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the 

17# GNU General Public License for more details. 

18# 

19# You should have received a copy of the GNU General Public License 

20# along with this program. If not, see <https://www.gnu.org/licenses/>. 

21 

22__all__ = ("PackageAlertsConfig", "PackageAlertsTask") 

23 

24import io 

25import os 

26import sys 

27import time 

28 

29from astropy import wcs 

30import astropy.units as u 

31from astropy.nddata import CCDData, VarianceUncertainty 

32import pandas as pd 

33import struct 

34import fastavro 

35from confluent_kafka.cimpl import KafkaError 

36 

37# confluent_kafka is not in the standard Rubin environment as it is a third 

38# party package and is only needed when producing alerts. 

39try: 

40 import confluent_kafka 

41 from confluent_kafka import KafkaException 

42 from confluent_kafka.admin import AdminClient 

43except ImportError: 

44 confluent_kafka = None 

45 

46import lsst.alert.packet as alertPack 

47from lsst.afw.detection import InvalidPsfError 

48import lsst.afw.geom as afwGeom 

49import lsst.geom as geom 

50import lsst.pex.config as pexConfig 

51from lsst.pex.exceptions import InvalidParameterError 

52import lsst.pipe.base as pipeBase 

53from lsst.pipe.tasks.associationUtils import ss_object_id_to_obj_id 

54import lsst.utils.logging 

55from lsst.utils.timer import timeMethod 

56 

57 

58class PackageAlertsConfig(pexConfig.Config): 

59 """Config class for AssociationTask. 

60 """ 

61 schemaFile = pexConfig.Field( 

62 dtype=str, 

63 doc="Schema definition file URI for the avro alerts.", 

64 default=str(alertPack.get_uri_to_latest_schema()) 

65 ) 

66 minCutoutSize = pexConfig.RangeField( 

67 dtype=int, 

68 min=0, 

69 max=1000, 

70 default=30, 

71 doc="Dimension of the square image cutouts to package in the alert." 

72 ) 

73 maxCutoutSize = pexConfig.RangeField( 

74 dtype=int, 

75 min=0, 

76 max=1000, 

77 default=102, 

78 doc="Dimension of the square image cutouts to package in the alert. The" 

79 "default size comes from the max trail length in arcseconds " 

80 "(10 deg/day) for a 30 second observation divided by the" 

81 "arcseconds per pixel (0.2), which is 62.5 pixels. The effective" 

82 "size of the psf (40 pixels) is then added for a total of 102 pixels. " 

83 ) 

84 alertWriteLocation = pexConfig.Field( 

85 dtype=str, 

86 doc="Location to write alerts to.", 

87 default=os.path.join(os.getcwd(), "alerts"), 

88 ) 

89 

90 doProduceAlerts = pexConfig.Field( 

91 dtype=bool, 

92 doc="Turn on alert production to kafka if true and if confluent_kafka is in the environment.", 

93 default=False, 

94 ) 

95 

96 doWriteAlerts = pexConfig.Field( 

97 dtype=bool, 

98 doc="Write alerts to disk if true.", 

99 default=False, 

100 ) 

101 

102 doWriteFailedAlerts = pexConfig.Field( 

103 dtype=bool, 

104 doc="If an alert cannot be sent when doProduceAlerts is set, " 

105 "write it to disk for debugging purposes.", 

106 default=False, 

107 ) 

108 

109 maxTimeout = pexConfig.Field( 

110 dtype=float, 

111 doc="Sets the maximum time in seconds to wait for the alert stream " 

112 "broker to respond to a query before timing out.", 

113 default=15.0, 

114 ) 

115 

116 deliveryTimeout = pexConfig.Field( 

117 dtype=float, 

118 doc="Sets the time to wait for the producer to wait to deliver an " 

119 "alert in milliseconds.", 

120 default=1200.0, 

121 ) 

122 

123 useAveragePsf = pexConfig.Field( 

124 dtype=bool, 

125 doc="Use the average PSF for the image, instead of the PSF for each cutout. " 

126 "This option is much less accurate, but much faster.", 

127 default=False, 

128 ) 

129 

130 

131class PackageAlertsTask(pipeBase.Task): 

132 """Tasks for packaging Dia and Pipelines data into Avro alert packages. 

133 """ 

134 ConfigClass = PackageAlertsConfig 

135 _DefaultName = "packageAlerts" 

136 

137 _scale = (1.0 * geom.arcseconds).asDegrees() 

138 

139 def __init__(self, **kwargs): 

140 super().__init__(**kwargs) 

141 self.alertSchema = alertPack.Schema.from_uri(self.config.schemaFile) 

142 os.makedirs(self.config.alertWriteLocation, exist_ok=True) 

143 

144 if self.config.doProduceAlerts: 

145 if confluent_kafka is not None: 

146 self.password = os.getenv("AP_KAFKA_PRODUCER_PASSWORD") 

147 if not self.password: 

148 raise ValueError("Kafka password environment variable was not set.") 

149 self.username = os.getenv("AP_KAFKA_PRODUCER_USERNAME") 

150 if not self.username: 

151 raise ValueError("Kafka username environment variable was not set.") 

152 self.server = os.getenv("AP_KAFKA_SERVER") 

153 if not self.server: 

154 raise ValueError("Kafka server environment variable was not set.") 

155 self.kafkaTopic = os.getenv("AP_KAFKA_TOPIC") 

156 if not self.kafkaTopic: 

157 raise ValueError("Kafka topic environment variable was not set.") 

158 

159 # confluent_kafka configures all of its classes with dictionaries. This one 

160 # sets up the bare minimum that is needed. 

161 self.kafkaConfig = { 

162 # This is the URL to use to connect to the Kafka cluster. 

163 "bootstrap.servers": self.server, 

164 # These next two properties tell the Kafka client about the specific 

165 # authentication and authorization protocols that should be used when 

166 # connecting. 

167 "security.protocol": "SASL_PLAINTEXT", 

168 "sasl.mechanisms": "SCRAM-SHA-512", 

169 # The sasl.username and sasl.password are passed through over 

170 # SCRAM-SHA-512 auth to connect to the cluster. The username is not 

171 # sensitive, but the password is (of course) a secret value which 

172 # should never be committed to source code. 

173 "sasl.username": self.username, 

174 "sasl.password": self.password, 

175 # Batch size limits the largest size of a kafka alert that can be sent. 

176 # We set the batch size to 2 Mb. 

177 "batch.size": 2097152, 

178 "linger.ms": 5, 

179 "delivery.timeout.ms": self.config.deliveryTimeout, 

180 # Compression options are snappy, lz4, zstd, and gzip. 

181 "compression.type": 'snappy' 

182 } 

183 self.kafkaAdminConfig = { 

184 # This is the URL to use to connect to the Kafka cluster. 

185 "bootstrap.servers": self.server, 

186 # These next two properties tell the Kafka client about the specific 

187 # authentication and authorization protocols that should be used when 

188 # connecting. 

189 "security.protocol": "SASL_PLAINTEXT", 

190 "sasl.mechanisms": "SCRAM-SHA-512", 

191 # The sasl.username and sasl.password are passed through over 

192 # SCRAM-SHA-512 auth to connect to the cluster. The username is not 

193 # sensitive, but the password is (of course) a secret value which 

194 # should never be committed to source code. 

195 "sasl.username": self.username, 

196 "sasl.password": self.password, 

197 } 

198 

199 self._server_check() 

200 self.producer = confluent_kafka.Producer(**self.kafkaConfig) 

201 

202 else: 

203 raise RuntimeError("Produce alerts is set but confluent_kafka is not present in " 

204 "the environment. Alerts will not be sent to the alert stream.") 

205 

206 @timeMethod 

207 def run(self, 

208 diaSourceCat, 

209 diaObjectCat, 

210 diaSrcHistory, 

211 diaForcedSources, 

212 diffIm, 

213 calexp, 

214 template, 

215 ssSrc=None, 

216 doRunForcedMeasurement=True, 

217 forcedSourceHistoryThreshold=0, 

218 ): 

219 """Package DiaSources/Object and exposure data into Avro alerts. 

220 

221 Alerts can be sent to the alert stream if ``doProduceAlerts`` is set 

222 and written to disk if ``doWriteAlerts`` is set. Both can be set at the 

223 same time, and are independent of one another. 

224 

225 Writes Avro alerts to a location determined by the 

226 ``alertWriteLocation`` configurable. 

227 

228 Parameters 

229 ---------- 

230 diaSourceCat : `pandas.DataFrame` 

231 New DiaSources to package. DataFrame should be indexed on 

232 ``["diaObjectId", "band", "diaSourceId"]`` 

233 diaObjectCat : `pandas.DataFrame` 

234 New and updated DiaObjects matched to the new DiaSources. DataFrame 

235 is indexed on ``["diaObjectId"]`` 

236 diaSrcHistory : `pandas.DataFrame` 

237 12 month history of DiaSources matched to the DiaObjects. Excludes 

238 the newest DiaSource and is indexed on 

239 ``["diaObjectId", "band", "diaSourceId"]`` 

240 diaForcedSources : `pandas.DataFrame` 

241 12 month history of DiaForcedSources matched to the DiaObjects. 

242 ``["diaObjectId"]`` 

243 diffIm : `lsst.afw.image.ExposureF` 

244 Difference image the sources in ``diaSourceCat`` were detected in. 

245 calexp : `lsst.afw.image.ExposureF` 

246 Calexp used to create the ``diffIm``. 

247 template : `lsst.afw.image.ExposureF` or `None` 

248 Template image used to create the ``diffIm``. 

249 ssSrc : `astropy.table.Table`, optional 

250 Solar system specific information for diaSources associated to ssObjects. 

251 doRunForcedMeasurement : `bool`, optional 

252 Flag to indicate whether forced measurement was run. 

253 This should only be turned off for debugging purposes. 

254 Added to allow disabling forced sources for performance 

255 reasons during the ops rehearsal. 

256 forcedSourceHistoryThreshold : `int`, optional 

257 Minimum number of detections of a diaObject required 

258 to run forced photometry. Set to 1 to include all diaObjects. 

259 """ 

260 

261 alerts = [] 

262 self._patchDiaSources(diaSourceCat) 

263 self._patchDiaSources(diaSrcHistory) 

264 detector = diffIm.detector.getId() 

265 visit = diffIm.visitInfo.id 

266 observation_reason = diffIm.visitInfo.getObservationReason() 

267 target_name = diffIm.visitInfo.getObject() 

268 midpoint_unix = diffIm.visitInfo.date.toAstropy().tai.unix 

269 exposure_time = diffIm.visitInfo.exposureTime 

270 diffImPhotoCalib = diffIm.getPhotoCalib() 

271 calexpPhotoCalib = calexp.getPhotoCalib() 

272 templatePhotoCalib = template.getPhotoCalib() 

273 diffImPsf = self._computePsf(diffIm, diffIm.psf.getAveragePosition()) 

274 sciencePsf = self._computePsf(calexp, calexp.psf.getAveragePosition()) 

275 templatePsf = self._computePsf(template, template.psf.getAveragePosition()) 

276 

277 # cast to object; works around pandas coercing Series to nullable 

278 # Float64 type when selecting single rows below 

279 diaSourceCat = diaSourceCat.astype('object') 

280 diaObjectCat = diaObjectCat.astype('object') 

281 diaSrcHistory = diaSrcHistory.astype('object') 

282 diaForcedSources = diaForcedSources.astype('object') 

283 

284 if ssSrc is not None: 

285 ssSrc = ssSrc.set_index('diaSourceId') 

286 ssSrc = ssSrc.astype('object') 

287 

288 n_sources = len(diaSourceCat) 

289 self.log.info("Packaging alerts for %d DiaSources.", n_sources) 

290 # Log every 10 seconds as proof of liveness. 

291 loop_logger = lsst.utils.logging.PeriodicLogger(self.log, interval=10.0) 

292 

293 for srcIndex, diaSource in diaSourceCat.iterrows(): 

294 loop_logger.log("%s/%s sources have been packaged.", len(alerts), n_sources) 

295 

296 alertId = diaSource["diaSourceId"] 

297 if srcIndex[0] == 0: 

298 diaObject = None 

299 objSourceHistory = None 

300 objDiaForcedSources = None 

301 ssSource = ssSrc.loc[[alertId]].to_dict("records")[0] 

302 else: 

303 ssSource = None 

304 diaObject = diaObjectCat.loc[srcIndex[0]] 

305 if diaObject["nDiaSources"] > 1: 

306 objSourceHistory = diaSrcHistory.loc[srcIndex[0]] 

307 else: 

308 objSourceHistory = None 

309 if doRunForcedMeasurement and diaObject["nDiaSources"] >= forcedSourceHistoryThreshold: 

310 objDiaForcedSources = diaForcedSources.loc[srcIndex[0]] 

311 else: 

312 # Send empty table with correct columns 

313 objDiaForcedSources = diaForcedSources.loc[[]] 

314 

315 sphPoint = geom.SpherePoint(diaSource["ra"], 

316 diaSource["dec"], 

317 geom.degrees) 

318 pixelPoint = geom.Point2D(diaSource["x"], 

319 diaSource["y"]) 

320 

321 cutoutExtent = self.createDiaSourceExtent(diaSource["bboxSize"]) 

322 diffImCutout = self.createCcdDataCutout( 

323 diffIm, 

324 sphPoint, 

325 pixelPoint, 

326 cutoutExtent, 

327 diffImPhotoCalib, 

328 diaSource["diaSourceId"], 

329 averagePsf=diffImPsf, 

330 rotPa=diffIm.visitInfo.boresightRotAngle.asDegrees()) 

331 calexpCutout = self.createCcdDataCutout( 

332 calexp, 

333 sphPoint, 

334 pixelPoint, 

335 cutoutExtent, 

336 calexpPhotoCalib, 

337 diaSource["diaSourceId"], 

338 averagePsf=sciencePsf, 

339 rotPa=calexp.visitInfo.boresightRotAngle.asDegrees()) 

340 templateCutout = self.createCcdDataCutout( 

341 template, 

342 sphPoint, 

343 pixelPoint, 

344 cutoutExtent, 

345 templatePhotoCalib, 

346 diaSource["diaSourceId"], 

347 averagePsf=templatePsf, 

348 rotPa=calexp.visitInfo.boresightRotAngle.asDegrees()) 

349 

350 alerts.append( 

351 self.makeAlertDict(diaSource["diaSourceId"], 

352 observation_reason, 

353 target_name, 

354 diaSource, 

355 diaObject, 

356 objSourceHistory, 

357 objDiaForcedSources, 

358 diffImCutout, 

359 calexpCutout, 

360 templateCutout, 

361 ssSource)) 

362 

363 if self.config.doProduceAlerts: 

364 self.log.info("Producing alerts to %s.", self.kafkaTopic) 

365 self.produceAlerts(alerts, visit, detector, midpoint_unix, exposure_time) 

366 else: 

367 # Fill values for the metadata so that downstream metrics don't crash 

368 self.metadata['visit_midpoint'] = midpoint_unix 

369 self.metadata['produce_end_timestamp'] = -1 

370 self.metadata['produce_start_timestamp'] = -1 

371 self.metadata['alert_timing_since_shutter_close'] = -1 

372 self.metadata['total_alerts'] = -1 

373 

374 if self.config.doWriteAlerts: 

375 avro_path = os.path.join(self.config.alertWriteLocation, f"{visit}_{detector}.avro") 

376 self.log.info("Writing alerts to %s.", avro_path) 

377 with open(avro_path, "wb") as f: 

378 self.alertSchema.store_alerts(f, alerts) 

379 

380 def _patchDiaSources(self, diaSources): 

381 """Add the ``programId`` column to the data. 

382 

383 Parameters 

384 ---------- 

385 diaSources : `pandas.DataFrame` 

386 DataFrame of DiaSources to patch. 

387 """ 

388 diaSources["programId"] = 0 

389 

390 def createDiaSourceExtent(self, bboxSize): 

391 """Create an extent for a box for the cutouts given the size of the 

392 square BBox that covers the source footprint. 

393 

394 Parameters 

395 ---------- 

396 bboxSize : `int` 

397 Size of a side of the square bounding box in pixels. 

398 

399 Returns 

400 ------- 

401 extent : `lsst.geom.Extent2I` 

402 Geom object representing the size of the bounding box. 

403 """ 

404 if bboxSize < self.config.minCutoutSize: 

405 extent = geom.Extent2I(self.config.minCutoutSize, 

406 self.config.minCutoutSize) 

407 elif bboxSize > self.config.maxCutoutSize: 

408 extent = geom.Extent2I(self.config.maxCutoutSize, 

409 self.config.maxCutoutSize) 

410 else: 

411 extent = geom.Extent2I(bboxSize, bboxSize) 

412 return extent 

413 

414 @timeMethod 

415 def produceAlerts(self, alerts, visit, detector, midpoint_unix, exposure_time): 

416 """Serialize alerts and send them to the alert stream using 

417 confluent_kafka's producer. 

418 

419 Parameters 

420 ---------- 

421 alerts : `dict` 

422 Dictionary of alerts to be sent to the alert stream. 

423 visit, detector : `int` 

424 Visit and detector ids of these alerts. Used to write out alerts 

425 which fail to be sent to the alert stream. 

426 """ 

427 schema_id = self.alertSchema.get_schema_id() 

428 # Serialize and send alerts with the producer timestamp. 

429 total_alerts = 0 

430 produce_start_timestamp = time.time() 

431 try: 

432 for alert in alerts: 

433 alertBytes = self._serializeAlert(alert, 

434 schema=self.alertSchema.definition, schema_id=schema_id) 

435 try: 

436 timestamp = time.time()*1000 # Current time in milliseconds 

437 headers = [("producer_timestamp", str(timestamp).encode('utf-8'))] 

438 self.producer.produce(self.kafkaTopic, alertBytes, callback=self._delivery_callback, 

439 headers=headers) 

440 self.producer.flush() 

441 total_alerts += 1 

442 

443 except KafkaException as e: 

444 self.log.warning('Kafka error: %s, message was %s bytes', 

445 e, sys.getsizeof(alertBytes)) 

446 

447 if self.config.doWriteFailedAlerts: 

448 with open(os.path.join(self.config.alertWriteLocation, 

449 f"{visit}_{detector}_{alert['diaSourceId']}.avro"), "wb") as f: 

450 f.write(alertBytes) 

451 

452 self.producer.flush() 

453 finally: 

454 produce_end_timestamp = time.time() # Current time in seconds 

455 total_time = produce_end_timestamp - (midpoint_unix + exposure_time/2.0) 

456 self.metadata['visit_midpoint'] = midpoint_unix 

457 self.metadata['produce_end_timestamp'] = produce_end_timestamp 

458 self.metadata['produce_start_timestamp'] = produce_start_timestamp 

459 self.metadata['alert_timing_since_shutter_close'] = total_time 

460 self.metadata['total_alerts'] = total_alerts 

461 # A single log message is easier for Loki to parse than timeMethod's start+end pairs. 

462 self.log.verbose("Producing alerts: Took %.4f seconds", 

463 produce_end_timestamp - produce_start_timestamp) 

464 

465 self.log.info(f"Total time since shutter close to produce alerts for" 

466 f" visit {visit} detector {detector}: {total_time} seconds") 

467 

468 def createCcdDataCutout(self, image, skyCenter, pixelCenter, extent, photoCalib, srcId, averagePsf=None, 

469 rotPa=None): 

470 """Grab an image as a cutout and return a calibrated CCDData image. 

471 

472 Parameters 

473 ---------- 

474 image : `lsst.afw.image.ExposureF` 

475 Image to pull cutout from. 

476 skyCenter : `lsst.geom.SpherePoint` 

477 Center point of DiaSource on the sky. 

478 pixelCenter : `lsst.geom.Point2D` 

479 Pixel center of DiaSource on the sky. 

480 extent : `lsst.geom.Extent2I` 

481 Bounding box to cutout from the image. 

482 photoCalib : `lsst.afw.image.PhotoCalib` 

483 Calibrate object of the image the cutout is cut from. 

484 srcId : `int` 

485 Unique id of DiaSource. Used for when an error occurs extracting 

486 a cutout. 

487 averagePsf : `numpy.array`, optional 

488 Average PSF to attach to the cutout. 

489 Used if ``self.config.useAveragePsf`` is set. 

490 cutoutType : str, optional 

491 Type of cutout being created ('difference', 'template', or 'science') 

492 

493 Returns 

494 ------- 

495 ccdData : `astropy.nddata.CCDData` or `None` 

496 CCDData object storing the calibrate information from the input 

497 difference or template image. 

498 """ 

499 imBBox = image.getBBox() 

500 if not geom.Box2D(image.getBBox()).contains(pixelCenter): 

501 self.log.warning( 

502 "DiaSource id=%i centroid lies at pixel (%.2f, %.2f) " 

503 "which is outside the Exposure with bounding box " 

504 "((%i, %i), (%i, %i)). Returning None for cutout...", 

505 srcId, pixelCenter.x, pixelCenter.y, 

506 imBBox.minX, imBBox.maxX, imBBox.minY, imBBox.maxY) 

507 return None 

508 # Catch errors in retrieving the cutout. 

509 try: 

510 cutout = image.getCutout(pixelCenter, extent) 

511 except InvalidParameterError: 

512 self.log.warning( 

513 "Failed to retrieve cutout from image for DiaSource with " 

514 "id=%i. InvalidParameterError thrown during cutout " 

515 "creation. Returning None for cutout..." 

516 % srcId) 

517 if self.config.useAveragePsf: 

518 if averagePsf is None: 

519 self.log.info("Using source id=%i to compute the average PSF.", srcId) 

520 averagePsf = self._computePsf(image, pixelCenter, srcId=srcId) 

521 cutoutPsf = averagePsf 

522 else: 

523 cutoutPsf = self._computePsf(image, pixelCenter, srcId=srcId) 

524 

525 # Find the value of the bottom corner of our cutout's BBox and 

526 # subtract 1 so that the CCDData cutout position value will be 

527 # [1, 1]. 

528 cutOutMinX = cutout.getBBox().minX - 1 

529 cutOutMinY = cutout.getBBox().minY - 1 

530 center = cutout.getWcs().skyToPixel(skyCenter) 

531 calibCutout = photoCalib.calibrateImage(cutout.getMaskedImage()) 

532 

533 cutoutWcs = wcs.WCS(naxis=2) 

534 cutoutWcs.array_shape = (cutout.getBBox().getWidth(), 

535 cutout.getBBox().getWidth()) 

536 cutoutWcs.wcs.crpix = [center.x - cutOutMinX, center.y - cutOutMinY] 

537 cutoutWcs.wcs.crval = [skyCenter.getRa().asDegrees(), 

538 skyCenter.getDec().asDegrees()] 

539 cutoutWcs.wcs.cd = self.makeLocalTransformMatrix(cutout.getWcs(), 

540 center, 

541 skyCenter) 

542 ccdData = CCDData( 

543 data=calibCutout.getImage().array, 

544 uncertainty=VarianceUncertainty(calibCutout.getVariance().array), 

545 flags=calibCutout.getMask().array, 

546 wcs=cutoutWcs, 

547 psf=cutoutPsf, 

548 meta={"cutMinX": cutOutMinX, 

549 "cutMinY": cutOutMinY}, 

550 unit=u.nJy) 

551 

552 ccdData.header['ROTPA'] = (rotPa, 'Pos angle in deg of focal plane +Y wrt North') 

553 

554 return ccdData 

555 

556 def makeLocalTransformMatrix(self, wcs, center, skyCenter): 

557 """Create a local, linear approximation of the wcs transformation 

558 matrix. 

559 

560 The approximation is created as if the center is at RA=0, DEC=0. All 

561 comparing x,y coordinate are relative to the position of center. Matrix 

562 is initially calculated with units arcseconds and then converted to 

563 degrees. This yields higher precision results due to quirks in AST. 

564 

565 Parameters 

566 ---------- 

567 wcs : `lsst.afw.geom.SkyWcs` 

568 Wcs to approximate 

569 center : `lsst.geom.Point2D` 

570 Point at which to evaluate the LocalWcs. 

571 skyCenter : `lsst.geom.SpherePoint` 

572 Point on sky to approximate the Wcs. 

573 

574 Returns 

575 ------- 

576 localMatrix : `numpy.ndarray` 

577 Matrix representation the local wcs approximation with units 

578 degrees. 

579 """ 

580 blankCDMatrix = [[self._scale, 0], [0, self._scale]] 

581 localGnomonicWcs = afwGeom.makeSkyWcs( 

582 center, skyCenter, blankCDMatrix) 

583 measurementToLocalGnomonic = wcs.getTransform().then( 

584 localGnomonicWcs.getTransform().inverted() 

585 ) 

586 localMatrix = measurementToLocalGnomonic.getJacobian(center) 

587 return localMatrix / 3600 

588 

589 def makeAlertDict(self, 

590 diaSourceId, 

591 observationReason, 

592 targetName, 

593 diaSource, 

594 diaObject, 

595 objDiaSrcHistory, 

596 objDiaForcedSources, 

597 diffImCutout, 

598 calexpCutout, 

599 templateCutout, 

600 ssSource=None): 

601 """Convert data and package into a dictionary alert. 

602 

603 Parameters 

604 ---------- 

605 diaSourceId : `int` 

606 Unique identifier of the triggering diaSource 

607 diaSource : `pandas.DataFrame` 

608 New single DiaSource to package. 

609 observationReason : `str` 

610 Scheduler reason for the image containing this diaSource. 

611 targetName : `str` 

612 Scheduler target for the image containing this diaSource. 

613 diaObject : `pandas.DataFrame` 

614 DiaObject that ``diaSource`` is matched to. 

615 objDiaSrcHistory : `pandas.DataFrame` 

616 12 month history of ``diaObject`` excluding the latest DiaSource. 

617 objDiaForcedSources : `pandas.DataFrame` 

618 12 month history of ``diaObject`` forced measurements. 

619 diffImCutout : `astropy.nddata.CCDData` or `None` 

620 Cutout of the difference image around the location of ``diaSource`` 

621 with a min size set by the ``cutoutSize`` configurable. 

622 calexpCutout : `astropy.nddata.CCDData` or `None` 

623 Cutout of the calexp around the location of ``diaSource`` 

624 with a min size set by the ``cutoutSize`` configurable. 

625 templateCutout : `astropy.nddata.CCDData` or `None` 

626 Cutout of the template image around the location of ``diaSource`` 

627 with a min size set by the ``cutoutSize`` configurable. 

628 """ 

629 alert = dict() 

630 alert['diaSourceId'] = diaSourceId 

631 # alerts should have NULL if scheduler fields are not populated 

632 if not len(observationReason): 

633 observationReason = None 

634 if not len(targetName): 

635 targetName = None 

636 alert['observation_reason'] = observationReason 

637 alert['target_name'] = targetName 

638 alert['diaSource'] = diaSource.to_dict() 

639 

640 if objDiaSrcHistory is None: 

641 alert['prvDiaSources'] = None 

642 else: 

643 alert['prvDiaSources'] = objDiaSrcHistory.to_dict("records") 

644 

645 if isinstance(objDiaForcedSources, pd.Series): 

646 alert['prvDiaForcedSources'] = [objDiaForcedSources.to_dict()] 

647 elif objDiaForcedSources is None: 

648 alert['prvDiaForcedSources'] = None 

649 else: 

650 alert['prvDiaForcedSources'] = objDiaForcedSources.to_dict("records") 

651 

652 alert['prvDiaNondetectionLimits'] = None 

653 

654 if diaObject is None: 

655 alert['diaObject'] = None 

656 else: 

657 alert['diaObject'] = diaObject.to_dict() 

658 

659 if ssSource is None: 

660 alert['ssSource'] = None 

661 alert['MPCORB'] = None 

662 else: 

663 mpcorbColumns = [col for col in ssSource if col[:7] == 'MPCORB_'] 

664 mpcOrbit = {key[7:]: ssSource[key] for key in mpcorbColumns} 

665 

666 fields_to_cast = [ 

667 "arc_length_total", "arc_length_sel", "a", "mean_anomaly", "period", "mean_motion", "a_unc", 

668 "mean_anomaly_unc", "period_unc", "mean_motion_unc", "yarkovsky", "srp", "a1", "a2", "a3", 

669 "dt", "yarkovsky_unc", "srp_unc", "a1_unc", "a2_unc", "a3_unc", "dt_unc", 

670 "not_normalized_rms", "earth_moid" 

671 ] 

672 for key in fields_to_cast: 

673 if key in mpcOrbit and mpcOrbit[key] is not None: 

674 mpcOrbit[key] = float(mpcOrbit[key]) 

675 

676 if 'packed_primary_provisional_designation' not in mpcOrbit: 

677 unpacked_desig = ssSource['designation'] 

678 packed_desig = ss_object_id_to_obj_id(ssSource['ssObjectId'], packed=True) 

679 mpcOrbit['designation'] = unpacked_desig 

680 mpcOrbit['packed_primary_provisional_designation'] = packed_desig 

681 mpcOrbit['unpacked_primary_provisional_designation'] = unpacked_desig 

682 

683 mpcOrbit['id'] = 0 

684 ssSource = {key: ssSource[key] for key in ssSource if key not in mpcorbColumns} 

685 ssSource['diaSourceId'] = diaSourceId 

686 alert['ssSource'] = ssSource 

687 alert['MPCORB'] = mpcOrbit 

688 

689 if diffImCutout is None: 

690 alert['cutoutDifference'] = None 

691 else: 

692 alert['cutoutDifference'] = self.streamCcdDataToBytes(diffImCutout) 

693 

694 if calexpCutout is None: 

695 alert['cutoutScience'] = None 

696 else: 

697 alert['cutoutScience'] = self.streamCcdDataToBytes(calexpCutout) 

698 

699 if templateCutout is None: 

700 alert["cutoutTemplate"] = None 

701 else: 

702 alert["cutoutTemplate"] = self.streamCcdDataToBytes(templateCutout) 

703 

704 return alert 

705 

706 def streamCcdDataToBytes(self, cutout): 

707 """Serialize a cutout into bytes. 

708 

709 Parameters 

710 ---------- 

711 cutout : `astropy.nddata.CCDData` 

712 Cutout to serialize. 

713 

714 Returns 

715 ------- 

716 coutputBytes : `bytes` 

717 Input cutout serialized into byte data. 

718 """ 

719 with io.BytesIO() as streamer: 

720 cutout.write(streamer, format="fits") 

721 cutoutBytes = streamer.getvalue() 

722 return cutoutBytes 

723 

724 def _serializeAlert(self, alert, schema=None, schema_id=0): 

725 """Serialize an alert to a byte sequence for sending to Kafka. 

726 

727 Parameters 

728 ---------- 

729 alert : `dict` 

730 An alert payload to be serialized. 

731 schema : `dict`, optional 

732 An Avro schema definition describing how to encode `alert`. By default, 

733 the schema is None, which sets it to the latest schema available. 

734 schema_id : `int`, optional 

735 The Confluent Schema Registry ID of the schema. By default, 0 (an 

736 invalid ID) is used, indicating that the schema is not registered. 

737 

738 Returns 

739 ------- 

740 serialized : `bytes` 

741 The byte sequence describing the alert, including the Confluent Wire 

742 Format prefix. 

743 """ 

744 if schema is None: 

745 schema = self.alertSchema.definition 

746 

747 buf = io.BytesIO() 

748 buf.write(self._serializeConfluentWireHeader(schema_id)) 

749 fastavro.schemaless_writer(buf, schema, alert) 

750 return buf.getvalue() 

751 

752 @staticmethod 

753 def _serializeConfluentWireHeader(schema_version): 

754 """Returns the byte prefix for Confluent Wire Format-style Kafka messages. 

755 

756 Parameters 

757 ---------- 

758 schema_version : `int` 

759 A version number which indicates the Confluent Schema Registry ID 

760 number of the Avro schema used to encode the message that follows this 

761 header. 

762 

763 Returns 

764 ------- 

765 header : `bytes` 

766 The 5-byte encoded message prefix. 

767 

768 Notes 

769 ----- 

770 The Confluent Wire Format is described more fully here: 

771 https://docs.confluent.io/current/schema-registry/serdes-develop/index.html#wire-format 

772 """ 

773 ConfluentWireFormatHeader = struct.Struct(">bi") 

774 return ConfluentWireFormatHeader.pack(0, schema_version) 

775 

776 def _delivery_callback(self, err, msg): 

777 if err: 

778 if err.code() == KafkaError.UNKNOWN_TOPIC_OR_PART: 

779 topic = msg.topic() if msg and hasattr(msg, 

780 'topic') else 'unknown' 

781 self.log.warning( 

782 'Message failed delivery. Topic %s unknown: %s', 

783 topic, err) 

784 else: 

785 self.log.debug('Message delivered to %s [%d] @ %d', msg.topic(), msg.partition(), msg.offset()) 

786 

787 def _server_check(self): 

788 """Checks if the alert stream credentials are still valid and the 

789 server is contactable. 

790 

791 Raises 

792 ------- 

793 KafkaException 

794 Raised if the server us not contactable. 

795 RuntimeError 

796 Raised if the server is contactable but there are no topics 

797 present. 

798 """ 

799 admin_client = AdminClient(self.kafkaAdminConfig) 

800 topics = admin_client.list_topics(timeout=self.config.maxTimeout).topics 

801 

802 if not topics: 

803 raise RuntimeError() 

804 if self.kafkaTopic not in topics: 

805 raise RuntimeError( 

806 f"Topic '{self.kafkaTopic}' not found on the Kafka server. " 

807 f"Check that the correct topic has been provided and exists " 

808 f"on the Kafka server.") 

809 

810 def _computePsf(self, exposure, pixelCenter, srcId=None): 

811 """Compute the PSF at a location and catch errors. 

812 

813 Parameters 

814 ---------- 

815 exposure : `lsst.afw.image.Exposure` 

816 The image to compute the PSF for. 

817 pixelCenter : `lsst.geom.Point2D` 

818 The location on the image to compute the PSF. 

819 srcId : `int`, optional 

820 Unique id of DiaSource. Used for when an error occurs extracting 

821 a cutout. 

822 

823 Returns 

824 ------- 

825 cutoutPsf : `numpy.array` 

826 Array of the PSF values. 

827 """ 

828 try: 

829 # use exposure.psf.computeKernelImage to provide PSF centered in the array 

830 cutoutPsf = exposure.psf.computeKernelImage(pixelCenter).array 

831 except InvalidParameterError: 

832 if srcId is not None: 

833 msg = "Could not calculate PSF for DiaSource with "\ 

834 "id=%i. InvalidParameterError encountered. Exiting."\ 

835 % srcId 

836 else: 

837 msg = "Could not calculate average PSF for the image" 

838 self.log.warning(msg) 

839 cutoutPsf = None 

840 except InvalidPsfError: 

841 if srcId is not None: 

842 msg = "Could not calculate PSF for DiaSource with "\ 

843 "id=%i. InvalidPsfError encountered. Exiting."\ 

844 % srcId 

845 else: 

846 msg = "Could not calculate average PSF for the image" 

847 self.log.warning(msg) 

848 cutoutPsf = None 

849 return cutoutPsf