Coverage for python / lsst / analysis / tools / interfaces / datastore / _dispatcher.py: 12%

298 statements  

« prev     ^ index     » next       coverage.py v7.13.5, created at 2026-05-05 18:53 +0000

1# This file is part of analysis_tools. 

2# 

3# Developed for the LSST Data Management System. 

4# This product includes software developed by the LSST Project 

5# (http://www.lsst.org). 

6# See the COPYRIGHT file at the top-level directory of this distribution 

7# for details of code ownership. 

8# 

9# This program is free software: you can redistribute it and/or modify 

10# it under the terms of the GNU General Public License as published by 

11# the Free Software Foundation, either version 3 of the License, or 

12# (at your option) any later version. 

13# 

14# This program is distributed in the hope that it will be useful, 

15# but WITHOUT ANY WARRANTY; without even the implied warranty of 

16# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the 

17# GNU General Public License for more details. 

18# 

19# You should have received a copy of the GNU General Public License 

20# along with this program. If not, see <http://www.gnu.org/licenses/>. 

21 

22from __future__ import annotations 

23 

24__all__ = ("SasquatchDispatchPartialFailure", "SasquatchDispatchFailure", "SasquatchDispatcher") 

25 

26"""Sasquatch datastore""" 

27import calendar 

28import datetime 

29import json 

30import logging 

31import math 

32import re 

33from collections.abc import Mapping, MutableMapping, Sequence 

34from dataclasses import dataclass 

35from typing import TYPE_CHECKING, Any, cast 

36from uuid import UUID, uuid4 

37 

38import numpy as np 

39import requests 

40from lsst.daf.butler import DatasetRef 

41from lsst.resources import ResourcePath 

42from lsst.utils.packages import getEnvironmentPackages 

43 

44from ...utils import http_client 

45 

46if TYPE_CHECKING: 

47 from .. import MetricMeasurementBundle 

48 

49 

# Module-level logger for dispatch diagnostics.
log = logging.getLogger(__name__)

# Constants associated with SasquatchDispatcher.
# Topic-creation parameters sent to the Kafka REST proxy when a new topic
# is registered (see SasquatchDispatcher._create_topic).
PARTITIONS = 1
REPLICATION_FACTOR = 3

# Butler dimension names copied into every dispatched record; any key not
# present in the supplied identifier fields is sent as an empty string
# (see SasquatchDispatcher._handleIdentifier).
IDENTIFIER_KEYS = [
    "detector",
    "patch",
    "skymap",
    "visit",
    "tract",
    "physical_filter",
    "instrument",
    "band",
    "exposure",
    "group",
    "day_obs",
]

69 

70 

class SasquatchDispatchPartialFailure(RuntimeError):
    """Raised when a Sasquatch dispatch succeeded for only some records."""

75 

76 

class SasquatchDispatchFailure(RuntimeError):
    """Raised when dispatching a
    `~lsst.analysis.tool.interface.MetricMeasurementBundle` fails entirely.
    """

83 

84 

85def _tag2VersionTime(productStr: str) -> tuple[str, float]: 

86 """Determine versions and dates from the string returned from 

87 getEnvironmentPackages. 

88 

89 The `~lsst.utils.packages.genEnvironmentPackages` function returns the 

90 setup version associated with a product, along with a list of tags that 

91 have been added to it. 

92 

93 This method splits up that return string, and determines the earliest date 

94 associated with the setup package version. 

95 

96 Parameters 

97 ---------- 

98 productStr : `str` 

99 The product string returned from a lookup on the result of a call to 

100 `~lsst.utils.packages.getEnvironmentPackages`. 

101 

102 Returns 

103 ------- 

104 result : `tuple` of `str`, `datetime.datetime` 

105 The first `str` is the version of the package, and the second is the 

106 datetime object associated with that released version. 

107 

108 Raises 

109 ------ 

110 ValueError 

111 Raised if there are no tags which correspond to dates. 

112 """ 

113 times: list[datetime.datetime] = [] 

114 version = productStr.split()[0] 

115 tags: str = re.findall("[(](.*)[)]", productStr)[0] 

116 for tag in tags.split(): 

117 numDots = tag.count(".") 

118 numUnder = tag.count("_") 

119 separator = "_" 

120 if numDots > numUnder: 

121 separator = "." 

122 match tag.split(separator): 

123 # Daily tag branch. 

124 case ("d", year, month, day): 

125 dt = datetime.datetime(year=int(year), month=int(month), day=int(day)) 

126 # Weekly tag branch. 

127 case ("w", year, week): 

128 iyear = int(year) 

129 iweek = int(week) 

130 # Use 4 as the day because releases are available starting 

131 # on Thursday 

132 dayOfWeek = 4 

133 

134 # Find the first week to contain a thursday in it 

135 cal = calendar.Calendar() 

136 cal.setfirstweekday(6) 

137 i = 0 

138 for i, iterWeek in enumerate(cal.monthdatescalendar(iyear, 1)): 

139 if iterWeek[dayOfWeek].month == 1: 

140 break 

141 # Handle fromisocalendar not being able to handle week 53 

142 # in the case were the date was going to subtract 7 days anyway 

143 if i and iweek == 53: 

144 i = 0 

145 iweek = 52 

146 delta = datetime.timedelta(days=7 * i) 

147 

148 # Correct for a weekly being issued in the last week of the 

149 # previous year, as Thursdays don't always line up evenly in 

150 # a week / year split. 

151 dt = datetime.datetime.fromisocalendar(iyear, iweek, dayOfWeek) - delta 

152 # Skip tags that can't be understood. 

153 case _: 

154 continue 

155 times.append(dt) 

156 if len(times) == 0: 

157 raise ValueError("Could not find any tags corresponding to dates") 

158 minTime = min(times) 

159 minTime.replace(tzinfo=datetime.timezone.utc) 

160 return version, minTime.timestamp() 

161 

162 

163@dataclass 

164class SasquatchDispatcher: 

165 """This class mediates the transfer of MetricMeasurementBundles to a 

166 Sasquatch http kafka proxy server. 

167 """ 

168 

169 url: str 

170 """Url of the Sasquatch proxy server""" 

171 

172 token: str 

173 """Authentication token used in communicating with the proxy server""" 

174 

175 namespace: str = "lsst.dm" 

176 """The namespace in Sasquatch in which to write the uploaded metrics""" 

177 

178 def __post_init__(self) -> None: 

179 match ResourcePath(self.url).scheme: 

180 case "http" | "https": 

181 pass 

182 case _: 

183 raise ValueError("Proxy server must be locatable with either http or https") 

184 

185 self._cluster_id: str | None = None 

186 

187 @property 

188 def clusterId(self) -> str: 

189 """ClusterId of the Kafka proxy 

190 

191 Notes 

192 ----- 

193 The cluster Id will be fetched with a network call if it is not 

194 already cached. 

195 """ 

196 if self._cluster_id is None: 

197 self._populateClusterId() 

198 return cast(str, self._cluster_id) 

199 

200 def _populateClusterId(self) -> None: 

201 """Get Sasquatch kafka cluster ID.""" 

202 

203 headers = {"content-type": "application/json"} 

204 

205 try: 

206 with http_client() as session: 

207 r = session.get(f"{self.url}/v3/clusters", headers=headers) 

208 cluster_id = r.json()["data"][0]["cluster_id"] 

209 self._cluster_id = str(cluster_id) 

210 r.raise_for_status() 

211 except requests.RequestException as e: 

212 raise SasquatchDispatchFailure("Could not retrieve the cluster id for the specified url") from e 

213 

214 def _create_topic(self, topic_name: str) -> bool: 

215 """Create a kafka topic in Sasquatch. 

216 

217 Parameters 

218 ---------- 

219 topic_name : `str` 

220 The name of the kafka topic to create 

221 

222 Returns 

223 ------- 

224 status : `bool` 

225 If this does not encounter an error it will return a True success 

226 code, else it will return a False code. 

227 

228 """ 

229 

230 headers = {"content-type": "application/json"} 

231 

232 topic_config = { 

233 "topic_name": f"{self.namespace}.{topic_name}", 

234 "partitions_count": PARTITIONS, 

235 "replication_factor": REPLICATION_FACTOR, 

236 } 

237 

238 try: 

239 with http_client() as session: 

240 r = session.post( 

241 f"{self.url}/v3/clusters/{self.clusterId}/topics", json=topic_config, headers=headers 

242 ) 

243 r.raise_for_status() 

244 log.debug("Created topic %s.%s", self.namespace, topic_name) 

245 return True 

246 except requests.HTTPError as e: 

247 if e.response.status_code == requests.codes.bad_request: 

248 log.debug("Topic %s.%s already exists.", self.namespace, topic_name) 

249 return True 

250 else: 

251 log.error( 

252 "Unknown error occurred creating kafka topic %s %s", 

253 e.response.status_code, 

254 e.response.reason, 

255 ) 

256 return False 

257 except requests.RequestException: 

258 log.exception("Unknown error occurred creating kafka topic") 

259 return False 

260 

261 def _generateAvroSchema(self, metric: str, record: MutableMapping[str, Any]) -> tuple[str, bool]: 

262 """Infer the Avro schema from the record payload. 

263 

264 Parameters 

265 ---------- 

266 metric : `str` 

267 The name of the metric 

268 record : `MutableMapping` 

269 The prepared record for which a schema is to be generated 

270 

271 Returns 

272 ------- 

273 resultSchema : `str` 

274 A json encoded string of the resulting avro schema 

275 errorCode : bool 

276 A boolean indicating if any record fields had to be trimmed because 

277 a suitable schema could not be generated. True if records were 

278 removed, False otherwise. 

279 """ 

280 schema: dict[str, Any] = {"type": "record", "namespace": self.namespace, "name": metric} 

281 

282 # Record if any records needed to be trimmed 

283 resultsTrimmed = False 

284 

285 fields = list() 

286 # If avro schemas cant be generated for values, they should be removed 

287 # from the records. 

288 keysToRemove: list[str] = [] 

289 for key in record: 

290 value = record[key] 

291 avroType: Mapping[str, Any] 

292 if "timestamp" in key: 

293 avroType = {"type": "double"} 

294 else: 

295 avroType = self._python2Avro(value) 

296 if len(avroType) == 0: 

297 continue 

298 if avroType.get("error_in_conversion"): 

299 keysToRemove.append(key) 

300 resultsTrimmed = True 

301 continue 

302 fields.append({"name": key, **avroType}) 

303 

304 # remove any key that failed to have schema generated 

305 for key in keysToRemove: 

306 record.pop(key) 

307 

308 schema["fields"] = fields 

309 

310 return json.dumps(schema), resultsTrimmed 

311 

312 def _python2Avro(self, value: Any) -> Mapping: 

313 """Map python type to avro schema 

314 

315 Parameters 

316 ---------- 

317 value : `Any` 

318 Any python parameter. 

319 

320 Returns 

321 ------- 

322 result : `Mapping` 

323 Return a mapping that represents an entry in an avro schema. 

324 """ 

325 match value: 

326 case float() | np.float32() | None: 

327 return {"type": "float", "default": 0.0} 

328 case str(): 

329 return {"type": "string", "default": ""} 

330 case int(): 

331 return {"type": "int", "default": 0} 

332 case Sequence(): 

333 tmp = {self._python2Avro(item)["type"] for item in value} 

334 if len(tmp) == 0: 

335 return {} 

336 if len(tmp) > 1: 

337 log.error( 

338 "Sequence contains mixed types: %s, must be homogeneous for avro conversion " 

339 "skipping record", 

340 tmp, 

341 ) 

342 return {"error_in_conversion": True} 

343 return {"type": "array", "items": tmp.pop()} 

344 case _: 

345 log.error("Unsupported type %s, skipping record", type(value)) 

346 return {} 

347 

348 def _handleReferencePackage(self, meta: MutableMapping, bundle: MetricMeasurementBundle) -> None: 

349 """Check to see if there is a reference package. 

350 

351 if there is a reference package, determine the datetime associated with 

352 this reference package. Save the package, the version, and the date to 

353 the common metric fields. 

354 

355 Parameters 

356 ---------- 

357 meta : `MutableMapping` 

358 A mapping which corresponds to fields which should be encoded in 

359 all records. 

360 bundle : `MetricMeasurementBundle` 

361 The bundled metrics 

362 """ 

363 package_version, package_timestamp = "", 0.0 

364 if ref_package := getattr(bundle, "reference_package", ""): 

365 ref_package = bundle.reference_package 

366 packages = getEnvironmentPackages(True) 

367 if package_info := packages.get(ref_package): 

368 try: 

369 package_version, package_timestamp = _tag2VersionTime(package_info) 

370 except ValueError: 

371 # Could not extract package timestamp leaving empty 

372 pass 

373 # explicit handle if None was set in the bundle for the package 

374 meta["reference_package"] = ref_package or "" 

375 meta["reference_package_version"] = package_version 

376 meta["reference_package_timestamp"] = package_timestamp 

377 

378 def _handleTimes(self, meta: MutableMapping, bundle: MetricMeasurementBundle, run: str) -> None: 

379 """Add times to the meta fields mapping. 

380 

381 Add all appropriate timestamp fields to the meta field mapping. These 

382 will be added to all records. 

383 

384 This method will also look at the bundle to see if it defines a 

385 preferred time. It so it sets that time as the main time stamp to be 

386 used for this record. 

387 

388 Parameters 

389 ---------- 

390 meta : `MutableMapping` 

391 A mapping which corresponds to fields which should be encoded in 

392 all records. 

393 bundle : `MetricMeasurementBundle` 

394 The bundled metrics 

395 run : `str` 

396 The `~lsst.daf.butler.Butler` collection where the 

397 `MetricMeasurementBundle` is stored. 

398 """ 

399 # Determine the timestamp associated with the run, if someone abused 

400 # the run collection, use the current timestamp 

401 if re.match(r"\d{8}T\d{6}Z", stamp := run.split("/")[-1]): 

402 run_timestamp = datetime.datetime.strptime(stamp, r"%Y%m%dT%H%M%S%z") 

403 else: 

404 run_timestamp = datetime.datetime.now() 

405 meta["run_timestamp"] = run_timestamp.timestamp() 

406 

407 # If the bundle supports supplying timestamps, dispatch on the type 

408 # specified. 

409 if hasattr(bundle, "timestamp_version") and bundle.timestamp_version: 

410 match bundle.timestamp_version: 

411 case "reference_package_timestamp": 

412 if not meta["reference_package_timestamp"]: 

413 log.error("Reference package timestamp is empty, using run_timestamp") 

414 meta["timestamp"] = meta["run_timestamp"] 

415 else: 

416 meta["timestamp"] = meta["reference_package_timestamp"] 

417 case "run_timestamp": 

418 meta["timestamp"] = meta["run_timestamp"] 

419 case "current_timestamp": 

420 timeStamp = datetime.datetime.now() 

421 meta["timestamp"] = timeStamp.timestamp() 

422 case "dataset_timestamp": 

423 log.error("dataset timestamps are not yet supported, run_timestamp will be used") 

424 meta["timestamp"] = meta["run_timestamp"] 

425 case str(value) if "explicit_timestamp" in value: 

426 try: 

427 _, splitTime = value.split(":") 

428 except ValueError as excpt: 

429 raise ValueError( 

430 "Explicit timestamp must be given in the format 'explicit_timestamp:datetime', " 

431 "where datetime is given in the form '%Y%m%dT%H%M%S%z" 

432 ) from excpt 

433 meta["timestamp"] = datetime.datetime.strptime(splitTime, r"%Y%m%dT%H%M%S%z").timestamp() 

434 case _: 

435 log.error( 

436 "Timestamp version %s is not supported, run_timestamp will be used", 

437 bundle.timestamp_version, 

438 ) 

439 meta["timestamp"] = meta["run_timestamp"] 

440 # Default to using the run_timestamp. 

441 else: 

442 meta["timestamp"] = meta["run_timestamp"] 

443 

444 def _handleIdentifier( 

445 self, 

446 meta: MutableMapping, 

447 identifierFields: Mapping[str, Any] | None, 

448 datasetIdentifier: str | None, 

449 bundle: MetricMeasurementBundle, 

450 ) -> None: 

451 """Add an identifier to the meta record mapping. 

452 

453 If the bundle declares a dataset identifier to use add that to the 

454 record, otherwise use 'Generic' as the identifier. If the 

455 datasetIdentifier parameter is specified, that is used instead of 

456 anything specified by the bundle. 

457 

458 This will also add any identifier fields supplied to the meta record 

459 mapping. 

460 

461 Together these values (in addition to the timestamp and topic) should 

462 uniquely identify an upload to the Sasquatch system. 

463 

464 Parameters 

465 ---------- 

466 meta : `MutableMapping` 

467 A mapping which corresponds to fields which should be encoded in 

468 all records. 

469 identifierFields: `Mapping` or `None` 

470 The keys and values in this mapping will be both added as fields 

471 in the record, and used in creating a unique tag for the uploaded 

472 dataset type. I.e. the timestamp, and the tag will be unique, and 

473 each record will belong to one combination of such. 

474 datasetIdentifier : `str` or `None` 

475 A string which will be used in creating unique identifier tags. 

476 bundle : `MetricMeasurementBundle` 

477 The bundle containing metric values to upload. 

478 """ 

479 identifier: str 

480 if datasetIdentifier is not None: 

481 identifier = datasetIdentifier 

482 elif hasattr(bundle, "dataset_identifier") and bundle.dataset_identifier is not None: 

483 identifier = bundle.dataset_identifier 

484 else: 

485 identifier = "Generic" 

486 

487 meta["dataset_tag"] = identifier 

488 

489 if identifierFields is None: 

490 identifierFields = {} 

491 for key in IDENTIFIER_KEYS: 

492 value = identifierFields.get(key, "") 

493 meta[key] = f"{value}" 

494 

495 def _prepareBundle( 

496 self, 

497 bundle: MetricMeasurementBundle, 

498 run: str, 

499 datasetType: str, 

500 timestamp: datetime.datetime | None = None, 

501 id: UUID | None = None, 

502 identifierFields: Mapping | None = None, 

503 datasetIdentifier: str | None = None, 

504 extraFields: Mapping | None = None, 

505 ) -> tuple[Mapping[str, list[Any]], bool]: 

506 """Encode all of the inputs into a format that can be sent to the 

507 kafka proxy server. 

508 

509 Parameters 

510 ---------- 

511 bundle : `MetricMeasurementBundle` 

512 The bundle containing metric values to upload. 

513 run : `str` 

514 The run name to associate with these metric values. If this bundle 

515 is also stored in the butler, this should be the butler run 

516 collection the bundle is stored in the butler. 

517 datasetType : `str` 

518 The dataset type name associated with this 

519 `MetricMeasurementBundle` 

520 timestamp : `datetime.datetime`, optional 

521 The timestamp to be associated with the measurements in the ingress 

522 database. If this value is None, timestamp will be set by the run 

523 time or current time. 

524 id : `UUID`, optional 

525 The UUID of the `MetricMeasurementBundle` within the butler. If 

526 `None`, a new random UUID will be generated so that each record in 

527 Sasquatch will have a unique value. 

528 identifierFields: `Mapping`, optional 

529 The keys and values in this mapping will be both added as fields 

530 in the record, and used in creating a unique tag for the uploaded 

531 dataset type. I.e. the timestamp, and the tag will be unique, and 

532 each record will belong to one combination of such. 

533 datasetIdentifier : `str`, optional 

534 A string which will be used in creating unique identifier tags. 

535 extraFields: `Mapping`, optional 

536 Extra mapping keys and values that will be added as fields to the 

537 dispatched record. 

538 

539 Returns 

540 ------- 

541 result : `Mapping` of `str` to `list` 

542 A mapping of metric name of list of metric measurement records. 

543 status : `bool` 

544 A status boolean indicating if some records had to be skipped due 

545 to a problem parsing the bundle. 

546 """ 

547 if id is None: 

548 id = uuid4() 

549 sid = str(id) 

550 meta: dict[str, Any] = dict() 

551 

552 # Add other associated common fields 

553 meta["id"] = sid 

554 meta["run"] = run 

555 meta["dataset_type"] = datasetType 

556 

557 # Check to see if the bundle declares a reference package 

558 self._handleReferencePackage(meta, bundle) 

559 

560 # Handle the various timestamps that could be associated with a record 

561 self._handleTimes(meta, bundle, run) 

562 

563 # Always use the supplied timestamp if one was passed to use. 

564 if timestamp is not None: 

565 meta["timestamp"] = timestamp.timestamp() 

566 

567 self._handleIdentifier(meta, identifierFields, datasetIdentifier, bundle) 

568 

569 # Add in any other fields that were supplied to the function call. 

570 if extraFields is not None: 

571 meta.update(extraFields) 

572 

573 metricRecords: dict[str, list[Any]] = dict() 

574 

575 # Record if any records needed skipped 

576 resultsTrimmed = False 

577 

578 # Look at each of the metrics in the bundle (name, values) 

579 for metric, measurements in bundle.items(): 

580 # Create a list which will contain the records for each measurement 

581 # associated with metric. 

582 metricRecordList = metricRecords.setdefault(f"{bundle.metricNamePrefix}{metric}", list()) 

583 

584 record: dict[str, Any] = meta.copy() 

585 

586 # loop over each metric measurement within the metric 

587 for measurement in measurements: 

588 # need to extract any tags, package info, etc 

589 note_key = f"{measurement.metric_name.metric}.metric_tags" 

590 record["tags"] = dict(measurement.notes.items()).get(note_key, list()) 

591 

592 # Missing values are replaced by 0 in sasquatch, see RFC-763. 

593 name = "" 

594 value = 0.0 

595 match measurement.json: 

596 case {"metric": name, "value": None}: 

597 pass 

598 case {"metric": name, "value": value}: 

599 if math.isnan(value): 

600 log.error( 

601 "Measurement %s had a value that is a NaN, dispatch will be skipped", 

602 measurement, 

603 ) 

604 resultsTrimmed = True 

605 continue 

606 # JSON will not serialize np.float32, must cast. 

607 if isinstance(value, np.float32): 

608 value = float(value) 

609 pass 

610 case {"value": _}: 

611 log.error("Measurement %s does not contain the key 'metric'", measurement) 

612 resultsTrimmed = True 

613 continue 

614 case {"metric": _}: 

615 log.error("Measurement %s does not contain the key 'value'", measurement) 

616 resultsTrimmed = True 

617 continue 

618 record[name] = value 

619 

620 metricRecordList.append({"value": record}) 

621 return metricRecords, resultsTrimmed 

622 

623 def dispatch( 

624 self, 

625 bundle: MetricMeasurementBundle, 

626 run: str, 

627 datasetType: str, 

628 timestamp: datetime.datetime | None = None, 

629 id: UUID | None = None, 

630 datasetIdentifier: str | None = None, 

631 identifierFields: Mapping | None = None, 

632 extraFields: Mapping | None = None, 

633 ) -> None: 

634 """Dispatch a `MetricMeasurementBundle` to Sasquatch. 

635 

636 Parameters 

637 ---------- 

638 bundle : `MetricMeasurementBundle` 

639 The bundle containing metric values to upload. 

640 run : `str` 

641 The run name to associate with these metric values. If this bundle 

642 is also stored in the butler, this should be the butler run 

643 collection the bundle is stored in the butler. This will be used 

644 in generating uniqueness constraints in Sasquatch. 

645 datasetType : `str` 

646 The dataset type name associated with this 

647 `MetricMeasurementBundle`. 

648 timestamp : `datetime.datetime`, optional 

649 The timestamp to be associated with the measurements in the ingress 

650 database. If this value is None, timestamp will be set by the run 

651 time or current time. 

652 id : `UUID`, optional 

653 The UUID of the `MetricMeasurementBundle` within the Butler. If 

654 `None`, a new random UUID will be generated so that each record in 

655 Sasquatch will have a unique value. 

656 datasetIdentifier : `str`, optional 

657 A string which will be used in creating unique identifier tags. If 

658 `None`, a default value will be inserted. 

659 identifierFields: `Mapping`, optional 

660 The keys and values in this mapping will be both added as fields 

661 in the record, and used in creating a unique tag for the uploaded 

662 dataset type. I.e. the timestamp, and the tag will be unique, and 

663 each record will belong to one combination of such. Examples of 

664 entries would be things like visit or tract. 

665 extraFields: `Mapping`, optional 

666 Extra mapping keys and values that will be added as fields to the 

667 dispatched record. 

668 

669 Raises 

670 ------ 

671 SasquatchDispatchPartialFailure, SasquatchDispatchFailure 

672 Raised if there were any errors in dispatching a bundle. 

673 """ 

674 if id is None: 

675 id = uuid4() 

676 

677 # Prepare the bundle by transforming it to a list of metric records 

678 metricRecords, recordsTrimmed = self._prepareBundle( 

679 bundle=bundle, 

680 run=run, 

681 datasetType=datasetType, 

682 timestamp=timestamp, 

683 id=id, 

684 datasetIdentifier=datasetIdentifier, 

685 identifierFields=identifierFields, 

686 extraFields=extraFields, 

687 ) 

688 

689 headers = {"content-type": "application/vnd.kafka.avro.v2+json"} 

690 data: dict[str, Any] = dict() 

691 partialUpload = False 

692 uploadFailed = [] 

693 

694 with http_client() as session: 

695 for metric, record in metricRecords.items(): 

696 # create the kafka topic if it does not already exist 

697 if not self._create_topic(metric): 

698 log.error("Topic not created, skipping dispatch of %s", metric) 

699 continue 

700 recordValue = record[0]["value"] 

701 # Generate schemas for each record 

702 data["value_schema"], schemaTrimmed = self._generateAvroSchema(metric, recordValue) 

703 data["records"] = record 

704 

705 if schemaTrimmed: 

706 partialUpload = True 

707 

708 try: 

709 r = session.post( 

710 f"{self.url}/topics/{self.namespace}.{metric}", json=data, headers=headers 

711 ) 

712 r.raise_for_status() 

713 log.debug("Succesfully sent data for metric %s", metric) 

714 uploadFailed.append(False) 

715 except requests.HTTPError as e: 

716 log.error( 

717 "There was a problem submitting the metric %s: %s, %s", 

718 metric, 

719 e.response.status_code, 

720 e.response.reason, 

721 ) 

722 uploadFailed.append(True) 

723 partialUpload = True 

724 except requests.RequestException as e: 

725 # Don't log full stack trace because there may be lots 

726 # of these. 

727 log.error("There was a problem submitting the metric %s: %s", metric, e) 

728 uploadFailed.append(True) 

729 partialUpload = True 

730 

731 # There may be no metrics to try to upload, and thus the uploadFailed 

732 # list may be empty, check before issuing failure 

733 if len(uploadFailed) > 0 and all(uploadFailed): 

734 raise SasquatchDispatchFailure("All records were unable to be uploaded.") 

735 

736 if partialUpload or recordsTrimmed: 

737 raise SasquatchDispatchPartialFailure("One or more records may not have been uploaded entirely") 

738 

739 def dispatchRef( 

740 self, 

741 bundle: MetricMeasurementBundle, 

742 ref: DatasetRef, 

743 timestamp: datetime.datetime | None = None, 

744 extraFields: Mapping | None = None, 

745 datasetIdentifier: str | None = None, 

746 ) -> None: 

747 """Dispatch a `MetricMeasurementBundle` to Sasquatch with a known 

748 `DatasetRef`. 

749 

750 Parameters 

751 ---------- 

752 bundle : `MetricMeasurementBundle` 

753 The bundle containing metric values to upload. 

754 ref : `DatasetRef` 

755 The `Butler` dataset ref corresponding to the input 

756 `MetricMeasurementBundle`. 

757 timestamp : `datetime.datetime`, optional 

758 The timestamp to be associated with the measurements in the ingress 

759 database. If this value is None, timestamp will be set by the run 

760 time or current time. 

761 extraFields: `Mapping`, optional 

762 Extra mapping keys and values that will be added as fields to the 

763 dispatched record if not None. 

764 datasetIdentifier : `str`, optional 

765 A string which will be used in creating unique identifier tags. If 

766 None, a default value will be inserted. 

767 

768 Raises 

769 ------ 

770 SasquatchDispatchPartialFailure, SasquatchDispatchFailure 

771 Raised if there were any errors in dispatching a bundle. 

772 """ 

773 # Parse the relevant info out of the dataset ref. 

774 serializedRef = ref.to_simple() 

775 id = serializedRef.id 

776 if serializedRef.run is None: 

777 run = "<unknown>" 

778 else: 

779 run = serializedRef.run 

780 dstype = serializedRef.datasetType 

781 datasetType = dstype.name if dstype is not None else "" 

782 dataRefMapping = serializedRef.dataId.dataId if serializedRef.dataId else None 

783 

784 self.dispatch( 

785 bundle, 

786 run=run, 

787 timestamp=timestamp, 

788 datasetType=datasetType, 

789 id=id, 

790 identifierFields=dataRefMapping, 

791 extraFields=extraFields, 

792 datasetIdentifier=datasetIdentifier, 

793 )