Coverage for python / lsst / ap / association / packageAlerts.py: 15%

299 statements  

« prev     ^ index     » next       coverage.py v7.13.5, created at 2026-04-24 08:39 +0000

1# This file is part of ap_association. 

2# 

3# Developed for the LSST Data Management System. 

4# This product includes software developed by the LSST Project 

5# (https://www.lsst.org). 

6# See the COPYRIGHT file at the top-level directory of this distribution 

7# for details of code ownership. 

8# 

9# This program is free software: you can redistribute it and/or modify 

10# it under the terms of the GNU General Public License as published by 

11# the Free Software Foundation, either version 3 of the License, or 

12# (at your option) any later version. 

13# 

14# This program is distributed in the hope that it will be useful, 

15# but WITHOUT ANY WARRANTY; without even the implied warranty of 

16# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the 

17# GNU General Public License for more details. 

18# 

19# You should have received a copy of the GNU General Public License 

20# along with this program. If not, see <https://www.gnu.org/licenses/>. 

21 

22__all__ = ("PackageAlertsConfig", "PackageAlertsTask") 

23 

24import io 

25import os 

26import sys 

27import time 

28 

29from astropy import wcs 

30import astropy.units as u 

31from astropy.nddata import CCDData, VarianceUncertainty 

32import pandas as pd 

33import struct 

34import fastavro 

35from confluent_kafka.cimpl import KafkaError 

36 

37# confluent_kafka is not in the standard Rubin environment as it is a third 

38# party package and is only needed when producing alerts. 

39try: 

40 import confluent_kafka 

41 from confluent_kafka import KafkaException 

42 from confluent_kafka.admin import AdminClient 

43except ImportError: 

44 confluent_kafka = None 

45 

46import lsst.alert.packet as alertPack 

47from lsst.afw.detection import InvalidPsfError 

48import lsst.afw.geom as afwGeom 

49import lsst.geom as geom 

50import lsst.pex.config as pexConfig 

51from lsst.pex.exceptions import InvalidParameterError 

52import lsst.pipe.base as pipeBase 

53from lsst.pipe.tasks.associationUtils import ss_object_id_to_obj_id 

54import lsst.utils.logging 

55from lsst.utils.timer import timeMethod 

56 

57 

class PackageAlertsConfig(pexConfig.Config):
    """Config class for PackageAlertsTask.
    """
    schemaFile = pexConfig.Field(
        dtype=str,
        doc="Schema definition file URI for the avro alerts.",
        default=str(alertPack.get_uri_to_latest_schema())
    )
    minCutoutSize = pexConfig.RangeField(
        dtype=int,
        min=0,
        max=1000,
        default=30,
        doc="Dimension of the square image cutouts to package in the alert."
    )
    maxCutoutSize = pexConfig.RangeField(
        dtype=int,
        min=0,
        max=1000,
        default=102,
        doc="Dimension of the square image cutouts to package in the alert. The"
            "default size comes from the max trail length in arcseconds "
            "(10 deg/day) for a 30 second observation divided by the"
            "arcseconds per pixel (0.2), which is 62.5 pixels. The effective"
            "size of the psf (40 pixels) is then added for a total of 102 pixels. "
    )
    alertWriteLocation = pexConfig.Field(
        dtype=str,
        doc="Location to write alerts to.",
        default=os.path.join(os.getcwd(), "alerts"),
    )

    doProduceAlerts = pexConfig.Field(
        dtype=bool,
        doc="Turn on alert production to kafka if true and if confluent_kafka is in the environment.",
        default=False,
    )

    doWriteAlerts = pexConfig.Field(
        dtype=bool,
        doc="Write alerts to disk if true.",
        default=False,
    )

    doWriteFailedAlerts = pexConfig.Field(
        dtype=bool,
        doc="If an alert cannot be sent when doProduceAlerts is set, "
            "write it to disk for debugging purposes.",
        default=False,
    )

    maxTimeout = pexConfig.Field(
        dtype=float,
        doc="Sets the maximum time in seconds to wait for the alert stream "
            "broker to respond to a query before timing out.",
        default=15.0,
    )

    deliveryTimeout = pexConfig.Field(
        dtype=float,
        doc="Sets the time to wait for the producer to wait to deliver an "
            "alert in milliseconds.",
        default=1200.0,
    )

    useAveragePsf = pexConfig.Field(
        dtype=bool,
        doc="Use the average PSF for the image, instead of the PSF for each cutout. "
            "This option is much less accurate, but much faster.",
        default=False,
    )

129 

130 

class PackageAlertsTask(pipeBase.Task):
    """Tasks for packaging Dia and Pipelines data into Avro alert packages.
    """
    ConfigClass = PackageAlertsConfig
    _DefaultName = "packageAlerts"

    # Degrees per arcsecond; used as the pixel scale of the local gnomonic
    # WCS built in makeLocalTransformMatrix.
    _scale = (1.0 * geom.arcseconds).asDegrees()

    def __init__(self, **kwargs):
        """Load the alert schema and, when ``doProduceAlerts`` is set,
        configure and connect the Kafka producer.

        Raises
        ------
        ValueError
            Raised if ``doProduceAlerts`` is set and any of the Kafka
            environment variables (``AP_KAFKA_PRODUCER_PASSWORD``,
            ``AP_KAFKA_PRODUCER_USERNAME``, ``AP_KAFKA_SERVER``,
            ``AP_KAFKA_TOPIC``) is unset or empty.
        RuntimeError
            Raised if ``doProduceAlerts`` is set but the optional
            confluent_kafka package could not be imported.
        """
        super().__init__(**kwargs)
        self.alertSchema = alertPack.Schema.from_uri(self.config.schemaFile)
        # Ensure the on-disk destination exists even when only producing to
        # Kafka, since failed alerts may also be written here.
        os.makedirs(self.config.alertWriteLocation, exist_ok=True)

        if self.config.doProduceAlerts:
            if confluent_kafka is not None:
                # Credentials and endpoints are taken from the environment so
                # that secrets are never committed to source code.
                self.password = os.getenv("AP_KAFKA_PRODUCER_PASSWORD")
                if not self.password:
                    raise ValueError("Kafka password environment variable was not set.")
                self.username = os.getenv("AP_KAFKA_PRODUCER_USERNAME")
                if not self.username:
                    raise ValueError("Kafka username environment variable was not set.")
                self.server = os.getenv("AP_KAFKA_SERVER")
                if not self.server:
                    raise ValueError("Kafka server environment variable was not set.")
                self.kafkaTopic = os.getenv("AP_KAFKA_TOPIC")
                if not self.kafkaTopic:
                    raise ValueError("Kafka topic environment variable was not set.")

                # confluent_kafka configures all of its classes with dictionaries. This one
                # sets up the bare minimum that is needed.
                self.kafkaConfig = {
                    # This is the URL to use to connect to the Kafka cluster.
                    "bootstrap.servers": self.server,
                    # These next two properties tell the Kafka client about the specific
                    # authentication and authorization protocols that should be used when
                    # connecting.
                    "security.protocol": "SASL_PLAINTEXT",
                    "sasl.mechanisms": "SCRAM-SHA-512",
                    # The sasl.username and sasl.password are passed through over
                    # SCRAM-SHA-512 auth to connect to the cluster. The username is not
                    # sensitive, but the password is (of course) a secret value which
                    # should never be committed to source code.
                    "sasl.username": self.username,
                    "sasl.password": self.password,
                    # Batch size limits the largest size of a kafka alert that can be sent.
                    # We set the batch size to 2 Mb.
                    "batch.size": 2097152,
                    "linger.ms": 5,
                    "delivery.timeout.ms": self.config.deliveryTimeout,
                    # Compression options are snappy, lz4, zstd, and gzip.
                    "compression.type": 'snappy'
                }
                self.kafkaAdminConfig = {
                    # This is the URL to use to connect to the Kafka cluster.
                    "bootstrap.servers": self.server,
                    # These next two properties tell the Kafka client about the specific
                    # authentication and authorization protocols that should be used when
                    # connecting.
                    "security.protocol": "SASL_PLAINTEXT",
                    "sasl.mechanisms": "SCRAM-SHA-512",
                    # The sasl.username and sasl.password are passed through over
                    # SCRAM-SHA-512 auth to connect to the cluster. The username is not
                    # sensitive, but the password is (of course) a secret value which
                    # should never be committed to source code.
                    "sasl.username": self.username,
                    "sasl.password": self.password,
                }

                # Verify credentials/topic before constructing the producer so
                # misconfiguration fails fast at task construction time.
                self._server_check()
                self.producer = confluent_kafka.Producer(**self.kafkaConfig)

            else:
                raise RuntimeError("Produce alerts is set but confluent_kafka is not present in "
                                   "the environment. Alerts will not be sent to the alert stream.")

205 

    @timeMethod
    def run(self,
            diaSourceCat,
            diaObjectCat,
            diaSrcHistory,
            diaForcedSources,
            diffIm,
            calexp,
            template,
            ssSrc=None,
            doRunForcedMeasurement=True,
            forcedSourceHistoryThreshold=0,
            ):
        """Package DiaSources/Object and exposure data into Avro alerts.

        Alerts can be sent to the alert stream if ``doProduceAlerts`` is set
        and written to disk if ``doWriteAlerts`` is set. Both can be set at the
        same time, and are independent of one another.

        Writes Avro alerts to a location determined by the
        ``alertWriteLocation`` configurable.

        Parameters
        ----------
        diaSourceCat : `pandas.DataFrame`
            New DiaSources to package. DataFrame should be indexed on
            ``["diaObjectId", "band", "diaSourceId"]``
        diaObjectCat : `pandas.DataFrame`
            New and updated DiaObjects matched to the new DiaSources. DataFrame
            is indexed on ``["diaObjectId"]``
        diaSrcHistory : `pandas.DataFrame`
            12 month history of DiaSources matched to the DiaObjects. Excludes
            the newest DiaSource and is indexed on
            ``["diaObjectId", "band", "diaSourceId"]``
        diaForcedSources : `pandas.DataFrame`
            12 month history of DiaForcedSources matched to the DiaObjects.
            ``["diaObjectId"]``
        diffIm : `lsst.afw.image.ExposureF`
            Difference image the sources in ``diaSourceCat`` were detected in.
        calexp : `lsst.afw.image.ExposureF`
            Calexp used to create the ``diffIm``.
        template : `lsst.afw.image.ExposureF` or `None`
            Template image used to create the ``diffIm``.
        ssSrc : `astropy.table.Table`, optional
            Solar system specific information for diaSources associated to ssObjects.
        doRunForcedMeasurement : `bool`, optional
            Flag to indicate whether forced measurement was run.
            This should only be turned off for debugging purposes.
            Added to allow disabling forced sources for performance
            reasons during the ops rehearsal.
        forcedSourceHistoryThreshold : `int`, optional
            Minimum number of detections of a diaObject required
            to run forced photometry. Set to 1 to include all diaObjects.
        """

        alerts = []
        self._patchDiaSources(diaSourceCat)
        self._patchDiaSources(diaSrcHistory)
        detector = diffIm.detector.getId()
        visit = diffIm.visitInfo.id
        observation_reason = diffIm.visitInfo.getObservationReason()
        target_name = diffIm.visitInfo.getObject()
        midpoint_unix = diffIm.visitInfo.date.toAstropy().tai.unix
        exposure_time = diffIm.visitInfo.exposureTime
        diffImPhotoCalib = diffIm.getPhotoCalib()
        calexpPhotoCalib = calexp.getPhotoCalib()
        # NOTE(review): the docstring allows ``template`` to be `None`, but
        # these two calls dereference it unconditionally — confirm callers
        # never actually pass None.
        templatePhotoCalib = template.getPhotoCalib()
        # Precompute per-image average PSFs once; reused for every cutout when
        # ``useAveragePsf`` is configured.
        diffImPsf = self._computePsf(diffIm, diffIm.psf.getAveragePosition())
        sciencePsf = self._computePsf(calexp, calexp.psf.getAveragePosition())
        templatePsf = self._computePsf(template, template.psf.getAveragePosition())

        # cast to object; works around pandas coercing Series to nullable
        # Float64 type when selecting single rows below
        diaSourceCat = diaSourceCat.astype('object')
        diaObjectCat = diaObjectCat.astype('object')
        diaSrcHistory = diaSrcHistory.astype('object')
        diaForcedSources = diaForcedSources.astype('object')

        if ssSrc is not None:
            # Re-index by diaSourceId so each solar system row can be looked
            # up by the alert's triggering source below.
            ssSrc = ssSrc.set_index('diaSourceId')
            ssSrc = ssSrc.astype('object')

        n_sources = len(diaSourceCat)
        self.log.info("Packaging alerts for %d DiaSources.", n_sources)
        # Log every 10 seconds as proof of liveness.
        loop_logger = lsst.utils.logging.PeriodicLogger(self.log, interval=10.0)

        for srcIndex, diaSource in diaSourceCat.iterrows():
            loop_logger.log("%s/%s sources have been packaged.", len(alerts), n_sources)

            alertId = diaSource["diaSourceId"]
            # srcIndex[0] is the diaObjectId; 0 appears to mark a solar-system
            # association with no diaObject — TODO confirm against the
            # association pipeline's conventions.
            if srcIndex[0] == 0:
                diaObject = None
                objSourceHistory = None
                objDiaForcedSources = None
                ssSource = ssSrc.loc[[alertId]].to_dict("records")[0]
            else:
                ssSource = None
                diaObject = diaObjectCat.loc[srcIndex[0]]
                if diaObject["nDiaSources"] > 1:
                    objSourceHistory = diaSrcHistory.loc[srcIndex[0]]
                else:
                    objSourceHistory = None
                if doRunForcedMeasurement and diaObject["nDiaSources"] >= forcedSourceHistoryThreshold:
                    # NOTE(review): ``in diaForcedSources.diaObjectId`` tests
                    # membership in the Series *index*, not its values —
                    # confirm the index carries diaObjectId here.
                    if srcIndex[0] in diaForcedSources.diaObjectId:
                        objDiaForcedSources = diaForcedSources.loc[srcIndex[0]]
                    else:
                        # Send empty table with correct columns
                        objDiaForcedSources = diaForcedSources.loc[[]]
                else:
                    # Send empty table with correct columns
                    objDiaForcedSources = diaForcedSources.loc[[]]

            sphPoint = geom.SpherePoint(diaSource["ra"],
                                        diaSource["dec"],
                                        geom.degrees)
            pixelPoint = geom.Point2D(diaSource["x"],
                                      diaSource["y"])

            # One extent is shared by all three cutouts so they align.
            cutoutExtent = self.createDiaSourceExtent(diaSource["bboxSize"])
            diffImCutout = self.createCcdDataCutout(
                diffIm,
                sphPoint,
                pixelPoint,
                cutoutExtent,
                diffImPhotoCalib,
                diaSource["diaSourceId"],
                averagePsf=diffImPsf,
                rotPa=diffIm.visitInfo.boresightRotAngle.asDegrees())
            calexpCutout = self.createCcdDataCutout(
                calexp,
                sphPoint,
                pixelPoint,
                cutoutExtent,
                calexpPhotoCalib,
                diaSource["diaSourceId"],
                averagePsf=sciencePsf,
                rotPa=calexp.visitInfo.boresightRotAngle.asDegrees())
            # NOTE(review): the template cutout uses the *calexp* rotation
            # angle rather than the template's — presumably intentional since
            # the template is warped to the calexp frame; confirm.
            templateCutout = self.createCcdDataCutout(
                template,
                sphPoint,
                pixelPoint,
                cutoutExtent,
                templatePhotoCalib,
                diaSource["diaSourceId"],
                averagePsf=templatePsf,
                rotPa=calexp.visitInfo.boresightRotAngle.asDegrees())

            alerts.append(
                self.makeAlertDict(diaSource["diaSourceId"],
                                   observation_reason,
                                   target_name,
                                   diaSource,
                                   diaObject,
                                   objSourceHistory,
                                   objDiaForcedSources,
                                   diffImCutout,
                                   calexpCutout,
                                   templateCutout,
                                   ssSource))

        if self.config.doProduceAlerts:
            self.log.info("Producing alerts to %s.", self.kafkaTopic)
            self.produceAlerts(alerts, visit, detector, midpoint_unix, exposure_time)
        else:
            # Fill values for the metadata so that downstream metrics don't crash
            self.metadata['visit_midpoint'] = midpoint_unix
            self.metadata['produce_end_timestamp'] = -1
            self.metadata['produce_start_timestamp'] = -1
            self.metadata['alert_timing_since_shutter_close'] = -1
            self.metadata['total_alerts'] = -1

        if self.config.doWriteAlerts:
            avro_path = os.path.join(self.config.alertWriteLocation, f"{visit}_{detector}.avro")
            self.log.info("Writing alerts to %s.", avro_path)
            with open(avro_path, "wb") as f:
                self.alertSchema.store_alerts(f, alerts)

383 

    def _patchDiaSources(self, diaSources):
        """Add the ``programId`` column to the data.

        The column is added (or overwritten) on the caller's DataFrame
        in place.

        Parameters
        ----------
        diaSources : `pandas.DataFrame`
            DataFrame of DiaSources to patch. Modified in place.
        """
        # Always 0 for now — presumably a placeholder required by the alert
        # schema; confirm against the avro schema definition.
        diaSources["programId"] = 0

393 

394 def createDiaSourceExtent(self, bboxSize): 

395 """Create an extent for a box for the cutouts given the size of the 

396 square BBox that covers the source footprint. 

397 

398 Parameters 

399 ---------- 

400 bboxSize : `int` 

401 Size of a side of the square bounding box in pixels. 

402 

403 Returns 

404 ------- 

405 extent : `lsst.geom.Extent2I` 

406 Geom object representing the size of the bounding box. 

407 """ 

408 if bboxSize < self.config.minCutoutSize: 

409 extent = geom.Extent2I(self.config.minCutoutSize, 

410 self.config.minCutoutSize) 

411 elif bboxSize > self.config.maxCutoutSize: 

412 extent = geom.Extent2I(self.config.maxCutoutSize, 

413 self.config.maxCutoutSize) 

414 else: 

415 extent = geom.Extent2I(bboxSize, bboxSize) 

416 return extent 

417 

    @timeMethod
    def produceAlerts(self, alerts, visit, detector, midpoint_unix, exposure_time):
        """Serialize alerts and send them to the alert stream using
        confluent_kafka's producer.

        Parameters
        ----------
        alerts : `list` of `dict`
            Alerts to be sent to the alert stream.
        visit, detector : `int`
            Visit and detector ids of these alerts. Used to write out alerts
            which fail to be sent to the alert stream.
        midpoint_unix : `float`
            Exposure midpoint as a unix timestamp (TAI); used, together with
            ``exposure_time``, to report timing since shutter close.
        exposure_time : `float`
            Exposure time in seconds.
        """
        schema_id = self.alertSchema.get_schema_id()
        # Serialize and send alerts with the producer timestamp.
        total_alerts = 0
        produce_start_timestamp = time.time()
        try:
            for alert in alerts:
                alertBytes = self._serializeAlert(alert,
                                                  schema=self.alertSchema.definition, schema_id=schema_id)
                try:
                    timestamp = time.time()*1000  # Current time in milliseconds
                    headers = [("producer_timestamp", str(timestamp).encode('utf-8'))]
                    self.producer.produce(self.kafkaTopic, alertBytes, callback=self._delivery_callback,
                                          headers=headers)
                    # Flushing per alert forces synchronous delivery so the
                    # KafkaException below is raised for the failing alert.
                    self.producer.flush()
                    total_alerts += 1

                except KafkaException as e:
                    self.log.warning('Kafka error: %s, message was %s bytes',
                                     e, sys.getsizeof(alertBytes))

                    if self.config.doWriteFailedAlerts:
                        with open(os.path.join(self.config.alertWriteLocation,
                                               f"{visit}_{detector}_{alert['diaSourceId']}.avro"), "wb") as f:
                            f.write(alertBytes)

            self.producer.flush()
        finally:
            # Record timing metadata even when production raised, so
            # downstream metrics always find these keys.
            produce_end_timestamp = time.time()  # Current time in seconds
            total_time = produce_end_timestamp - (midpoint_unix + exposure_time/2.0)
            self.metadata['visit_midpoint'] = midpoint_unix
            self.metadata['produce_end_timestamp'] = produce_end_timestamp
            self.metadata['produce_start_timestamp'] = produce_start_timestamp
            self.metadata['alert_timing_since_shutter_close'] = total_time
            self.metadata['total_alerts'] = total_alerts
            # A single log message is easier for Loki to parse than timeMethod's start+end pairs.
            self.log.verbose("Producing alerts: Took %.4f seconds",
                             produce_end_timestamp - produce_start_timestamp)

            self.log.info(f"Total time since shutter close to produce alerts for"
                          f" visit {visit} detector {detector}: {total_time} seconds")

471 

472 def createCcdDataCutout(self, image, skyCenter, pixelCenter, extent, photoCalib, srcId, averagePsf=None, 

473 rotPa=None): 

474 """Grab an image as a cutout and return a calibrated CCDData image. 

475 

476 Parameters 

477 ---------- 

478 image : `lsst.afw.image.ExposureF` 

479 Image to pull cutout from. 

480 skyCenter : `lsst.geom.SpherePoint` 

481 Center point of DiaSource on the sky. 

482 pixelCenter : `lsst.geom.Point2D` 

483 Pixel center of DiaSource on the sky. 

484 extent : `lsst.geom.Extent2I` 

485 Bounding box to cutout from the image. 

486 photoCalib : `lsst.afw.image.PhotoCalib` 

487 Calibrate object of the image the cutout is cut from. 

488 srcId : `int` 

489 Unique id of DiaSource. Used for when an error occurs extracting 

490 a cutout. 

491 averagePsf : `numpy.array`, optional 

492 Average PSF to attach to the cutout. 

493 Used if ``self.config.useAveragePsf`` is set. 

494 cutoutType : str, optional 

495 Type of cutout being created ('difference', 'template', or 'science') 

496 

497 Returns 

498 ------- 

499 ccdData : `astropy.nddata.CCDData` or `None` 

500 CCDData object storing the calibrate information from the input 

501 difference or template image. 

502 """ 

503 imBBox = image.getBBox() 

504 if not geom.Box2D(image.getBBox()).contains(pixelCenter): 

505 self.log.warning( 

506 "DiaSource id=%i centroid lies at pixel (%.2f, %.2f) " 

507 "which is outside the Exposure with bounding box " 

508 "((%i, %i), (%i, %i)). Returning None for cutout...", 

509 srcId, pixelCenter.x, pixelCenter.y, 

510 imBBox.minX, imBBox.maxX, imBBox.minY, imBBox.maxY) 

511 return None 

512 # Catch errors in retrieving the cutout. 

513 try: 

514 cutout = image.getCutout(pixelCenter, extent) 

515 except InvalidParameterError: 

516 self.log.warning( 

517 "Failed to retrieve cutout from image for DiaSource with " 

518 "id=%i. InvalidParameterError thrown during cutout " 

519 "creation. Returning None for cutout..." 

520 % srcId) 

521 if self.config.useAveragePsf: 

522 if averagePsf is None: 

523 self.log.info("Using source id=%i to compute the average PSF.", srcId) 

524 averagePsf = self._computePsf(image, pixelCenter, srcId=srcId) 

525 cutoutPsf = averagePsf 

526 else: 

527 cutoutPsf = self._computePsf(image, pixelCenter, srcId=srcId) 

528 

529 # Find the value of the bottom corner of our cutout's BBox and 

530 # subtract 1 so that the CCDData cutout position value will be 

531 # [1, 1]. 

532 cutOutMinX = cutout.getBBox().minX - 1 

533 cutOutMinY = cutout.getBBox().minY - 1 

534 center = cutout.getWcs().skyToPixel(skyCenter) 

535 calibCutout = photoCalib.calibrateImage(cutout.getMaskedImage()) 

536 

537 cutoutWcs = wcs.WCS(naxis=2) 

538 cutoutWcs.array_shape = (cutout.getBBox().getWidth(), 

539 cutout.getBBox().getWidth()) 

540 cutoutWcs.wcs.crpix = [center.x - cutOutMinX, center.y - cutOutMinY] 

541 cutoutWcs.wcs.crval = [skyCenter.getRa().asDegrees(), 

542 skyCenter.getDec().asDegrees()] 

543 cutoutWcs.wcs.cd = self.makeLocalTransformMatrix(cutout.getWcs(), 

544 center, 

545 skyCenter) 

546 ccdData = CCDData( 

547 data=calibCutout.getImage().array, 

548 uncertainty=VarianceUncertainty(calibCutout.getVariance().array), 

549 flags=calibCutout.getMask().array, 

550 wcs=cutoutWcs, 

551 psf=cutoutPsf, 

552 meta={"cutMinX": cutOutMinX, 

553 "cutMinY": cutOutMinY}, 

554 unit=u.nJy) 

555 

556 ccdData.header['ROTPA'] = (rotPa, 'Pos angle in deg of focal plane +Y wrt North') 

557 

558 return ccdData 

559 

560 def makeLocalTransformMatrix(self, wcs, center, skyCenter): 

561 """Create a local, linear approximation of the wcs transformation 

562 matrix. 

563 

564 The approximation is created as if the center is at RA=0, DEC=0. All 

565 comparing x,y coordinate are relative to the position of center. Matrix 

566 is initially calculated with units arcseconds and then converted to 

567 degrees. This yields higher precision results due to quirks in AST. 

568 

569 Parameters 

570 ---------- 

571 wcs : `lsst.afw.geom.SkyWcs` 

572 Wcs to approximate 

573 center : `lsst.geom.Point2D` 

574 Point at which to evaluate the LocalWcs. 

575 skyCenter : `lsst.geom.SpherePoint` 

576 Point on sky to approximate the Wcs. 

577 

578 Returns 

579 ------- 

580 localMatrix : `numpy.ndarray` 

581 Matrix representation the local wcs approximation with units 

582 degrees. 

583 """ 

584 blankCDMatrix = [[self._scale, 0], [0, self._scale]] 

585 localGnomonicWcs = afwGeom.makeSkyWcs( 

586 center, skyCenter, blankCDMatrix) 

587 measurementToLocalGnomonic = wcs.getTransform().then( 

588 localGnomonicWcs.getTransform().inverted() 

589 ) 

590 localMatrix = measurementToLocalGnomonic.getJacobian(center) 

591 return localMatrix / 3600 

592 

    def makeAlertDict(self,
                      diaSourceId,
                      observationReason,
                      targetName,
                      diaSource,
                      diaObject,
                      objDiaSrcHistory,
                      objDiaForcedSources,
                      diffImCutout,
                      calexpCutout,
                      templateCutout,
                      ssSource=None):
        """Convert data and package into a dictionary alert.

        Parameters
        ----------
        diaSourceId : `int`
            Unique identifier of the triggering diaSource
        observationReason : `str`
            Scheduler reason for the image containing this diaSource.
        targetName : `str`
            Scheduler target for the image containing this diaSource.
        diaSource : `pandas.DataFrame`
            New single DiaSource to package.
        diaObject : `pandas.DataFrame` or `None`
            DiaObject that ``diaSource`` is matched to.
        objDiaSrcHistory : `pandas.DataFrame` or `None`
            12 month history of ``diaObject`` excluding the latest DiaSource.
        objDiaForcedSources : `pandas.DataFrame` or `None`
            12 month history of ``diaObject`` forced measurements.
        diffImCutout : `astropy.nddata.CCDData` or `None`
            Cutout of the difference image around the location of ``diaSource``
            with a min size set by the ``cutoutSize`` configurable.
        calexpCutout : `astropy.nddata.CCDData` or `None`
            Cutout of the calexp around the location of ``diaSource``
            with a min size set by the ``cutoutSize`` configurable.
        templateCutout : `astropy.nddata.CCDData` or `None`
            Cutout of the template image around the location of ``diaSource``
            with a min size set by the ``cutoutSize`` configurable.
        ssSource : `dict`, optional
            Solar system information for a diaSource associated to an
            ssObject, including ``MPCORB_``-prefixed orbit columns.

        Returns
        -------
        alert : `dict`
            Alert payload ready for avro serialization.
        """
        alert = dict()
        alert['diaSourceId'] = diaSourceId
        # alerts should have NULL if scheduler fields are not populated
        if not len(observationReason):
            observationReason = None
        if not len(targetName):
            targetName = None
        alert['observation_reason'] = observationReason
        alert['target_name'] = targetName
        alert['diaSource'] = diaSource.to_dict()

        if objDiaSrcHistory is None:
            alert['prvDiaSources'] = None
        else:
            alert['prvDiaSources'] = objDiaSrcHistory.to_dict("records")

        # A single forced source comes through as a Series; wrap it so the
        # alert field is always a list of records.
        if isinstance(objDiaForcedSources, pd.Series):
            alert['prvDiaForcedSources'] = [objDiaForcedSources.to_dict()]
        elif objDiaForcedSources is None:
            alert['prvDiaForcedSources'] = None
        else:
            alert['prvDiaForcedSources'] = objDiaForcedSources.to_dict("records")

        alert['prvDiaNondetectionLimits'] = None

        if diaObject is None:
            alert['diaObject'] = None
        else:
            alert['diaObject'] = diaObject.to_dict()

        if ssSource is None:
            alert['ssSource'] = None
            alert['mpc_orbits'] = None
        else:
            # Split the MPCORB_-prefixed orbit columns out of ssSource into
            # their own dict, stripping the prefix.
            mpcorbColumns = [col for col in ssSource if col[:7] == 'MPCORB_']
            mpcOrbit = {key[7:]: ssSource[key] for key in mpcorbColumns}

            # Coerce these orbit quantities to float — presumably because the
            # avro schema declares them as doubles; confirm against the schema.
            fields_to_cast = [
                "arc_length_total", "arc_length_sel", "a", "mean_anomaly", "period", "mean_motion", "a_unc",
                "mean_anomaly_unc", "period_unc", "mean_motion_unc", "yarkovsky", "srp", "a1", "a2", "a3",
                "dt", "yarkovsky_unc", "srp_unc", "a1_unc", "a2_unc", "a3_unc", "dt_unc",
                "not_normalized_rms", "earth_moid"
            ]
            for key in fields_to_cast:
                if key in mpcOrbit and mpcOrbit[key] is not None:
                    mpcOrbit[key] = float(mpcOrbit[key])

            # Backfill designations when the MPC orbit data lacks them,
            # deriving the packed form from the ssObjectId.
            if 'packed_primary_provisional_designation' not in mpcOrbit:
                unpacked_desig = ssSource['designation']
                packed_desig = ss_object_id_to_obj_id(ssSource['ssObjectId'], packed=True)
                mpcOrbit['designation'] = unpacked_desig
                mpcOrbit['packed_primary_provisional_designation'] = packed_desig
                mpcOrbit['unpacked_primary_provisional_designation'] = unpacked_desig

            # id is a placeholder value — presumably required by the schema;
            # confirm.
            mpcOrbit['id'] = 0
            # Remove the orbit columns from ssSource now that they live in
            # mpcOrbit.
            ssSource = {key: ssSource[key] for key in ssSource if key not in mpcorbColumns}
            ssSource['diaSourceId'] = diaSourceId
            alert['ssSource'] = ssSource
            alert['mpc_orbits'] = mpcOrbit

        if diffImCutout is None:
            alert['cutoutDifference'] = None
        else:
            alert['cutoutDifference'] = self.streamCcdDataToBytes(diffImCutout)

        if calexpCutout is None:
            alert['cutoutScience'] = None
        else:
            alert['cutoutScience'] = self.streamCcdDataToBytes(calexpCutout)

        if templateCutout is None:
            alert["cutoutTemplate"] = None
        else:
            alert["cutoutTemplate"] = self.streamCcdDataToBytes(templateCutout)

        return alert

709 

710 def streamCcdDataToBytes(self, cutout): 

711 """Serialize a cutout into bytes. 

712 

713 Parameters 

714 ---------- 

715 cutout : `astropy.nddata.CCDData` 

716 Cutout to serialize. 

717 

718 Returns 

719 ------- 

720 coutputBytes : `bytes` 

721 Input cutout serialized into byte data. 

722 """ 

723 with io.BytesIO() as streamer: 

724 cutout.write(streamer, format="fits") 

725 cutoutBytes = streamer.getvalue() 

726 return cutoutBytes 

727 

728 def _serializeAlert(self, alert, schema=None, schema_id=0): 

729 """Serialize an alert to a byte sequence for sending to Kafka. 

730 

731 Parameters 

732 ---------- 

733 alert : `dict` 

734 An alert payload to be serialized. 

735 schema : `dict`, optional 

736 An Avro schema definition describing how to encode `alert`. By default, 

737 the schema is None, which sets it to the latest schema available. 

738 schema_id : `int`, optional 

739 The Confluent Schema Registry ID of the schema. By default, 0 (an 

740 invalid ID) is used, indicating that the schema is not registered. 

741 

742 Returns 

743 ------- 

744 serialized : `bytes` 

745 The byte sequence describing the alert, including the Confluent Wire 

746 Format prefix. 

747 """ 

748 if schema is None: 

749 schema = self.alertSchema.definition 

750 

751 buf = io.BytesIO() 

752 buf.write(self._serializeConfluentWireHeader(schema_id)) 

753 fastavro.schemaless_writer(buf, schema, alert) 

754 return buf.getvalue() 

755 

756 @staticmethod 

757 def _serializeConfluentWireHeader(schema_version): 

758 """Returns the byte prefix for Confluent Wire Format-style Kafka messages. 

759 

760 Parameters 

761 ---------- 

762 schema_version : `int` 

763 A version number which indicates the Confluent Schema Registry ID 

764 number of the Avro schema used to encode the message that follows this 

765 header. 

766 

767 Returns 

768 ------- 

769 header : `bytes` 

770 The 5-byte encoded message prefix. 

771 

772 Notes 

773 ----- 

774 The Confluent Wire Format is described more fully here: 

775 https://docs.confluent.io/current/schema-registry/serdes-develop/index.html#wire-format 

776 """ 

777 ConfluentWireFormatHeader = struct.Struct(">bi") 

778 return ConfluentWireFormatHeader.pack(0, schema_version) 

779 

780 def _delivery_callback(self, err, msg): 

781 if err: 

782 if err.code() == KafkaError.UNKNOWN_TOPIC_OR_PART: 

783 topic = msg.topic() if msg and hasattr(msg, 

784 'topic') else 'unknown' 

785 self.log.warning( 

786 'Message failed delivery. Topic %s unknown: %s', 

787 topic, err) 

788 else: 

789 self.log.debug('Message delivered to %s [%d] @ %d', msg.topic(), msg.partition(), msg.offset()) 

790 

791 def _server_check(self): 

792 """Checks if the alert stream credentials are still valid and the 

793 server is contactable. 

794 

795 Raises 

796 ------- 

797 KafkaException 

798 Raised if the server us not contactable. 

799 RuntimeError 

800 Raised if the server is contactable but there are no topics 

801 present. 

802 """ 

803 admin_client = AdminClient(self.kafkaAdminConfig) 

804 topics = admin_client.list_topics(timeout=self.config.maxTimeout).topics 

805 

806 if not topics: 

807 raise RuntimeError() 

808 if self.kafkaTopic not in topics: 

809 raise RuntimeError( 

810 f"Topic '{self.kafkaTopic}' not found on the Kafka server. " 

811 f"Check that the correct topic has been provided and exists " 

812 f"on the Kafka server.") 

813 

814 def _computePsf(self, exposure, pixelCenter, srcId=None): 

815 """Compute the PSF at a location and catch errors. 

816 

817 Parameters 

818 ---------- 

819 exposure : `lsst.afw.image.Exposure` 

820 The image to compute the PSF for. 

821 pixelCenter : `lsst.geom.Point2D` 

822 The location on the image to compute the PSF. 

823 srcId : `int`, optional 

824 Unique id of DiaSource. Used for when an error occurs extracting 

825 a cutout. 

826 

827 Returns 

828 ------- 

829 cutoutPsf : `numpy.array` 

830 Array of the PSF values. 

831 """ 

832 try: 

833 # use exposure.psf.computeKernelImage to provide PSF centered in the array 

834 cutoutPsf = exposure.psf.computeKernelImage(pixelCenter).array 

835 except InvalidParameterError: 

836 if srcId is not None: 

837 msg = "Could not calculate PSF for DiaSource with "\ 

838 "id=%i. InvalidParameterError encountered. Exiting."\ 

839 % srcId 

840 else: 

841 msg = "Could not calculate average PSF for the image" 

842 self.log.warning(msg) 

843 cutoutPsf = None 

844 except InvalidPsfError: 

845 if srcId is not None: 

846 msg = "Could not calculate PSF for DiaSource with "\ 

847 "id=%i. InvalidPsfError encountered. Exiting."\ 

848 % srcId 

849 else: 

850 msg = "Could not calculate average PSF for the image" 

851 self.log.warning(msg) 

852 cutoutPsf = None 

853 return cutoutPsf