Coverage for python / lsst / analysis / tools / interfaces / datastore / _dispatcher.py: 12%

301 statements  

« prev     ^ index     » next       coverage.py v7.13.5, created at 2026-04-30 09:27 +0000

1# This file is part of analysis_tools. 

2# 

3# Developed for the LSST Data Management System. 

4# This product includes software developed by the LSST Project 

5# (http://www.lsst.org). 

6# See the COPYRIGHT file at the top-level directory of this distribution 

7# for details of code ownership. 

8# 

9# This program is free software: you can redistribute it and/or modify 

10# it under the terms of the GNU General Public License as published by 

11# the Free Software Foundation, either version 3 of the License, or 

12# (at your option) any later version. 

13# 

14# This program is distributed in the hope that it will be useful, 

15# but WITHOUT ANY WARRANTY; without even the implied warranty of 

16# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the 

17# GNU General Public License for more details. 

18# 

19# You should have received a copy of the GNU General Public License 

20# along with this program. If not, see <http://www.gnu.org/licenses/>. 

21 

22from __future__ import annotations 

23 

24__all__ = ("SasquatchDispatchPartialFailure", "SasquatchDispatchFailure", "SasquatchDispatcher") 

25 

26import calendar 

27import datetime 

28import json 

29import logging 

30import math 

31import re 

32from collections.abc import Mapping, MutableMapping, Sequence 

33from dataclasses import dataclass 

34from typing import TYPE_CHECKING, Any, cast 

35from uuid import UUID, uuid4 

36 

37import numpy as np 

38import requests 

39 

40from lsst.daf.butler import DatasetRef 

41from lsst.resources import ResourcePath 

42from lsst.utils.packages import getEnvironmentPackages 

43 

44from ...utils import http_client 

45 

46if TYPE_CHECKING: 

47 from .. import MetricMeasurementBundle 

48 

49"""Sasquatch datastore""" 

50 

51log = logging.getLogger(__name__) 

52 

53# Constants associated with SasquatchDispatcher 

54PARTITIONS = 1 

55REPLICATION_FACTOR = 3 

56 

57IDENTIFIER_KEYS = [ 

58 "detector", 

59 "patch", 

60 "skymap", 

61 "visit", 

62 "tract", 

63 "physical_filter", 

64 "instrument", 

65 "band", 

66 "exposure", 

67 "group", 

68 "day_obs", 

69] 

70 

71 

class SasquatchDispatchPartialFailure(RuntimeError):
    """Raised when a Sasquatch dispatch succeeded for only some records."""

76 

77 

class SasquatchDispatchFailure(RuntimeError):
    """Raised when dispatching a
    `~lsst.analysis.tool.interface.MetricMeasurementBundle` failed entirely.
    """

84 

85 

86def _tag2VersionTime(productStr: str) -> tuple[str, float]: 

87 """Determine versions and dates from the string returned from 

88 getEnvironmentPackages. 

89 

90 The `~lsst.utils.packages.genEnvironmentPackages` function returns the 

91 setup version associated with a product, along with a list of tags that 

92 have been added to it. 

93 

94 This method splits up that return string, and determines the earliest date 

95 associated with the setup package version. 

96 

97 Parameters 

98 ---------- 

99 productStr : `str` 

100 The product string returned from a lookup on the result of a call to 

101 `~lsst.utils.packages.getEnvironmentPackages`. 

102 

103 Returns 

104 ------- 

105 result : `tuple` of `str`, `datetime.datetime` 

106 The first `str` is the version of the package, and the second is the 

107 datetime object associated with that released version. 

108 

109 Raises 

110 ------ 

111 ValueError 

112 Raised if there are no tags which correspond to dates. 

113 """ 

114 times: list[datetime.datetime] = [] 

115 version = productStr.split()[0] 

116 pattern_match = re.findall("[(](.*)[)]", productStr) 

117 if len(pattern_match) == 0: 

118 raise ValueError(f"Could not find any tags in product string {productStr}") 

119 tags: str = pattern_match[0] 

120 for tag in tags.split(): 

121 numDots = tag.count(".") 

122 numUnder = tag.count("_") 

123 separator = "_" 

124 if numDots > numUnder: 

125 separator = "." 

126 match tag.split(separator): 

127 # Daily tag branch. 

128 case ("d", year, month, day): 

129 dt = datetime.datetime(year=int(year), month=int(month), day=int(day)) 

130 # Weekly tag branch. 

131 case ("w", year, week): 

132 iyear = int(year) 

133 iweek = int(week) 

134 # Use 4 as the day because releases are available starting 

135 # on Thursday 

136 dayOfWeek = 4 

137 

138 # Find the first week to contain a thursday in it 

139 cal = calendar.Calendar() 

140 cal.setfirstweekday(6) 

141 i = 0 

142 for i, iterWeek in enumerate(cal.monthdatescalendar(iyear, 1)): 

143 if iterWeek[dayOfWeek].month == 1: 

144 break 

145 # Handle fromisocalendar not being able to handle week 53 

146 # in the case were the date was going to subtract 7 days anyway 

147 if i and iweek == 53: 

148 i = 0 

149 iweek = 52 

150 delta = datetime.timedelta(days=7 * i) 

151 

152 # Correct for a weekly being issued in the last week of the 

153 # previous year, as Thursdays don't always line up evenly in 

154 # a week / year split. 

155 dt = datetime.datetime.fromisocalendar(iyear, iweek, dayOfWeek) - delta 

156 # Skip tags that can't be understood. 

157 case _: 

158 continue 

159 times.append(dt) 

160 if len(times) == 0: 

161 raise ValueError("Could not find any tags corresponding to dates") 

162 minTime = min(times) 

163 minTime.replace(tzinfo=datetime.UTC) 

164 return version, minTime.timestamp() 

165 

166 

167@dataclass 

168class SasquatchDispatcher: 

169 """This class mediates the transfer of MetricMeasurementBundles to a 

170 Sasquatch http kafka proxy server. 

171 """ 

172 

173 url: str 

174 """Url of the Sasquatch proxy server""" 

175 

176 token: str 

177 """Authentication token used in communicating with the proxy server""" 

178 

179 namespace: str = "lsst.dm" 

180 """The namespace in Sasquatch in which to write the uploaded metrics""" 

181 

182 def __post_init__(self) -> None: 

183 match ResourcePath(self.url).scheme: 

184 case "http" | "https": 

185 pass 

186 case _: 

187 raise ValueError("Proxy server must be locatable with either http or https") 

188 

189 self._cluster_id: str | None = None 

190 

191 @property 

192 def clusterId(self) -> str: 

193 """ClusterId of the Kafka proxy 

194 

195 Notes 

196 ----- 

197 The cluster Id will be fetched with a network call if it is not 

198 already cached. 

199 """ 

200 if self._cluster_id is None: 

201 self._populateClusterId() 

202 return cast(str, self._cluster_id) 

203 

204 def _populateClusterId(self) -> None: 

205 """Get Sasquatch kafka cluster ID.""" 

206 

207 headers = {"content-type": "application/json"} 

208 

209 try: 

210 with http_client() as session: 

211 r = session.get(f"{self.url}/v3/clusters", headers=headers) 

212 cluster_id = r.json()["data"][0]["cluster_id"] 

213 self._cluster_id = str(cluster_id) 

214 r.raise_for_status() 

215 except requests.RequestException as e: 

216 raise SasquatchDispatchFailure("Could not retrieve the cluster id for the specified url") from e 

217 

218 def _create_topic(self, topic_name: str) -> bool: 

219 """Create a kafka topic in Sasquatch. 

220 

221 Parameters 

222 ---------- 

223 topic_name : `str` 

224 The name of the kafka topic to create 

225 

226 Returns 

227 ------- 

228 status : `bool` 

229 If this does not encounter an error it will return a True success 

230 code, else it will return a False code. 

231 

232 """ 

233 

234 headers = {"content-type": "application/json"} 

235 

236 topic_config = { 

237 "topic_name": f"{self.namespace}.{topic_name}", 

238 "partitions_count": PARTITIONS, 

239 "replication_factor": REPLICATION_FACTOR, 

240 } 

241 

242 try: 

243 with http_client() as session: 

244 r = session.post( 

245 f"{self.url}/v3/clusters/{self.clusterId}/topics", json=topic_config, headers=headers 

246 ) 

247 r.raise_for_status() 

248 log.debug("Created topic %s.%s", self.namespace, topic_name) 

249 return True 

250 except requests.HTTPError as e: 

251 if e.response.status_code == requests.codes.bad_request: 

252 log.debug("Topic %s.%s already exists.", self.namespace, topic_name) 

253 return True 

254 else: 

255 log.error( 

256 "Unknown error occurred creating kafka topic %s %s", 

257 e.response.status_code, 

258 e.response.reason, 

259 ) 

260 return False 

261 except requests.RequestException: 

262 log.exception("Unknown error occurred creating kafka topic") 

263 return False 

264 

265 def _generateAvroSchema(self, metric: str, record: MutableMapping[str, Any]) -> tuple[str, bool]: 

266 """Infer the Avro schema from the record payload. 

267 

268 Parameters 

269 ---------- 

270 metric : `str` 

271 The name of the metric 

272 record : `MutableMapping` 

273 The prepared record for which a schema is to be generated 

274 

275 Returns 

276 ------- 

277 resultSchema : `str` 

278 A json encoded string of the resulting avro schema 

279 errorCode : bool 

280 A boolean indicating if any record fields had to be trimmed because 

281 a suitable schema could not be generated. True if records were 

282 removed, False otherwise. 

283 """ 

284 schema: dict[str, Any] = {"type": "record", "namespace": self.namespace, "name": metric} 

285 

286 # Record if any records needed to be trimmed 

287 resultsTrimmed = False 

288 

289 fields = list() 

290 # If avro schemas cant be generated for values, they should be removed 

291 # from the records. 

292 keysToRemove: list[str] = [] 

293 for key in record: 

294 value = record[key] 

295 avroType: Mapping[str, Any] 

296 if "timestamp" in key: 

297 avroType = {"type": "double"} 

298 else: 

299 avroType = self._python2Avro(value) 

300 if len(avroType) == 0: 

301 continue 

302 if avroType.get("error_in_conversion"): 

303 keysToRemove.append(key) 

304 resultsTrimmed = True 

305 continue 

306 fields.append({"name": key, **avroType}) 

307 

308 # remove any key that failed to have schema generated 

309 for key in keysToRemove: 

310 record.pop(key) 

311 

312 schema["fields"] = fields 

313 

314 return json.dumps(schema), resultsTrimmed 

315 

316 def _python2Avro(self, value: Any) -> Mapping: 

317 """Map python type to avro schema 

318 

319 Parameters 

320 ---------- 

321 value : `Any` 

322 Any python parameter. 

323 

324 Returns 

325 ------- 

326 result : `Mapping` 

327 Return a mapping that represents an entry in an avro schema. 

328 """ 

329 match value: 

330 case float() | np.float32() | None: 

331 return {"type": "float", "default": 0.0} 

332 case str(): 

333 return {"type": "string", "default": ""} 

334 case int(): 

335 return {"type": "int", "default": 0} 

336 case Sequence(): 

337 tmp = {self._python2Avro(item)["type"] for item in value} 

338 if len(tmp) == 0: 

339 return {} 

340 if len(tmp) > 1: 

341 log.error( 

342 "Sequence contains mixed types: %s, must be homogeneous for avro conversion " 

343 "skipping record", 

344 tmp, 

345 ) 

346 return {"error_in_conversion": True} 

347 return {"type": "array", "items": tmp.pop()} 

348 case _: 

349 log.error("Unsupported type %s, skipping record", type(value)) 

350 return {} 

351 

352 def _handleReferencePackage(self, meta: MutableMapping, bundle: MetricMeasurementBundle) -> None: 

353 """Check to see if there is a reference package. 

354 

355 if there is a reference package, determine the datetime associated with 

356 this reference package. Save the package, the version, and the date to 

357 the common metric fields. 

358 

359 Parameters 

360 ---------- 

361 meta : `MutableMapping` 

362 A mapping which corresponds to fields which should be encoded in 

363 all records. 

364 bundle : `MetricMeasurementBundle` 

365 The bundled metrics 

366 """ 

367 ref_package, package_version, package_timestamp = "", "", 0.0 

368 # This should only attempt to resolve a reference package if 

369 # it is to be used to determine the timestamp. 

370 if getattr(bundle, "timestamp_version", None) == "reference_package_timestamp": 

371 if ref_package := getattr(bundle, "reference_package", ""): 

372 packages = getEnvironmentPackages(True) 

373 if package_info := packages.get(ref_package): 

374 try: 

375 package_version, package_timestamp = _tag2VersionTime(package_info) 

376 except ValueError: 

377 # Could not extract package timestamp leaving empty 

378 pass 

379 # explicit handle if None was set in the bundle for the package 

380 meta["reference_package"] = ref_package or "" 

381 meta["reference_package_version"] = package_version 

382 meta["reference_package_timestamp"] = package_timestamp 

383 

384 def _handleTimes(self, meta: MutableMapping, bundle: MetricMeasurementBundle, run: str) -> None: 

385 """Add times to the meta fields mapping. 

386 

387 Add all appropriate timestamp fields to the meta field mapping. These 

388 will be added to all records. 

389 

390 This method will also look at the bundle to see if it defines a 

391 preferred time. It so it sets that time as the main time stamp to be 

392 used for this record. 

393 

394 Parameters 

395 ---------- 

396 meta : `MutableMapping` 

397 A mapping which corresponds to fields which should be encoded in 

398 all records. 

399 bundle : `MetricMeasurementBundle` 

400 The bundled metrics 

401 run : `str` 

402 The `~lsst.daf.butler.Butler` collection where the 

403 `MetricMeasurementBundle` is stored. 

404 """ 

405 # Determine the timestamp associated with the run, if someone abused 

406 # the run collection, use the current timestamp 

407 if re.match(r"\d{8}T\d{6}Z", stamp := run.split("/")[-1]): 

408 run_timestamp = datetime.datetime.strptime(stamp, r"%Y%m%dT%H%M%S%z") 

409 else: 

410 run_timestamp = datetime.datetime.now() 

411 meta["run_timestamp"] = run_timestamp.timestamp() 

412 

413 # If the bundle supports supplying timestamps, dispatch on the type 

414 # specified. 

415 if hasattr(bundle, "timestamp_version") and bundle.timestamp_version: 

416 match bundle.timestamp_version: 

417 case "reference_package_timestamp": 

418 if not meta["reference_package_timestamp"]: 

419 log.error("Reference package timestamp is empty, using run_timestamp") 

420 meta["timestamp"] = meta["run_timestamp"] 

421 else: 

422 meta["timestamp"] = meta["reference_package_timestamp"] 

423 case "run_timestamp": 

424 meta["timestamp"] = meta["run_timestamp"] 

425 case "current_timestamp": 

426 timeStamp = datetime.datetime.now() 

427 meta["timestamp"] = timeStamp.timestamp() 

428 case "dataset_timestamp": 

429 log.error("dataset timestamps are not yet supported, run_timestamp will be used") 

430 meta["timestamp"] = meta["run_timestamp"] 

431 case str(value) if "explicit_timestamp" in value: 

432 try: 

433 _, splitTime = value.split(":") 

434 except ValueError as excpt: 

435 raise ValueError( 

436 "Explicit timestamp must be given in the format 'explicit_timestamp:datetime', " 

437 "where datetime is given in the form '%Y%m%dT%H%M%S%z" 

438 ) from excpt 

439 meta["timestamp"] = datetime.datetime.strptime(splitTime, r"%Y%m%dT%H%M%S%z").timestamp() 

440 case _: 

441 log.error( 

442 "Timestamp version %s is not supported, run_timestamp will be used", 

443 bundle.timestamp_version, 

444 ) 

445 meta["timestamp"] = meta["run_timestamp"] 

446 # Default to using the run_timestamp. 

447 else: 

448 meta["timestamp"] = meta["run_timestamp"] 

449 

450 def _handleIdentifier( 

451 self, 

452 meta: MutableMapping, 

453 identifierFields: Mapping[str, Any] | None, 

454 datasetIdentifier: str | None, 

455 bundle: MetricMeasurementBundle, 

456 ) -> None: 

457 """Add an identifier to the meta record mapping. 

458 

459 If the bundle declares a dataset identifier to use add that to the 

460 record, otherwise use 'Generic' as the identifier. If the 

461 datasetIdentifier parameter is specified, that is used instead of 

462 anything specified by the bundle. 

463 

464 This will also add any identifier fields supplied to the meta record 

465 mapping. 

466 

467 Together these values (in addition to the timestamp and topic) should 

468 uniquely identify an upload to the Sasquatch system. 

469 

470 Parameters 

471 ---------- 

472 meta : `MutableMapping` 

473 A mapping which corresponds to fields which should be encoded in 

474 all records. 

475 identifierFields: `Mapping` or `None` 

476 The keys and values in this mapping will be both added as fields 

477 in the record, and used in creating a unique tag for the uploaded 

478 dataset type. I.e. the timestamp, and the tag will be unique, and 

479 each record will belong to one combination of such. 

480 datasetIdentifier : `str` or `None` 

481 A string which will be used in creating unique identifier tags. 

482 bundle : `MetricMeasurementBundle` 

483 The bundle containing metric values to upload. 

484 """ 

485 identifier: str 

486 if datasetIdentifier is not None: 

487 identifier = datasetIdentifier 

488 elif hasattr(bundle, "dataset_identifier") and bundle.dataset_identifier is not None: 

489 identifier = bundle.dataset_identifier 

490 else: 

491 identifier = "Generic" 

492 

493 meta["dataset_tag"] = identifier 

494 

495 if identifierFields is None: 

496 identifierFields = {} 

497 for key in IDENTIFIER_KEYS: 

498 value = identifierFields.get(key, "") 

499 meta[key] = f"{value}" 

500 

501 def _prepareBundle( 

502 self, 

503 bundle: MetricMeasurementBundle, 

504 run: str, 

505 datasetType: str, 

506 timestamp: datetime.datetime | None = None, 

507 id: UUID | None = None, 

508 identifierFields: Mapping | None = None, 

509 datasetIdentifier: str | None = None, 

510 extraFields: Mapping | None = None, 

511 ) -> tuple[Mapping[str, list[Any]], bool]: 

512 """Encode all of the inputs into a format that can be sent to the 

513 kafka proxy server. 

514 

515 Parameters 

516 ---------- 

517 bundle : `MetricMeasurementBundle` 

518 The bundle containing metric values to upload. 

519 run : `str` 

520 The run name to associate with these metric values. If this bundle 

521 is also stored in the butler, this should be the butler run 

522 collection the bundle is stored in the butler. 

523 datasetType : `str` 

524 The dataset type name associated with this 

525 `MetricMeasurementBundle` 

526 timestamp : `datetime.datetime`, optional 

527 The timestamp to be associated with the measurements in the ingress 

528 database. If this value is None, timestamp will be set by the run 

529 time or current time. 

530 id : `UUID`, optional 

531 The UUID of the `MetricMeasurementBundle` within the butler. If 

532 `None`, a new random UUID will be generated so that each record in 

533 Sasquatch will have a unique value. 

534 identifierFields: `Mapping`, optional 

535 The keys and values in this mapping will be both added as fields 

536 in the record, and used in creating a unique tag for the uploaded 

537 dataset type. I.e. the timestamp, and the tag will be unique, and 

538 each record will belong to one combination of such. 

539 datasetIdentifier : `str`, optional 

540 A string which will be used in creating unique identifier tags. 

541 extraFields: `Mapping`, optional 

542 Extra mapping keys and values that will be added as fields to the 

543 dispatched record. 

544 

545 Returns 

546 ------- 

547 result : `Mapping` of `str` to `list` 

548 A mapping of metric name of list of metric measurement records. 

549 status : `bool` 

550 A status boolean indicating if some records had to be skipped due 

551 to a problem parsing the bundle. 

552 """ 

553 if id is None: 

554 id = uuid4() 

555 sid = str(id) 

556 meta: dict[str, Any] = dict() 

557 

558 # Add other associated common fields 

559 meta["id"] = sid 

560 meta["run"] = run 

561 meta["dataset_type"] = datasetType 

562 

563 # Check to see if the bundle declares a reference package 

564 self._handleReferencePackage(meta, bundle) 

565 

566 # Handle the various timestamps that could be associated with a record 

567 self._handleTimes(meta, bundle, run) 

568 

569 # Always use the supplied timestamp if one was passed to use. 

570 if timestamp is not None: 

571 meta["timestamp"] = timestamp.timestamp() 

572 

573 self._handleIdentifier(meta, identifierFields, datasetIdentifier, bundle) 

574 

575 # Add in any other fields that were supplied to the function call. 

576 if extraFields is not None: 

577 meta.update(extraFields) 

578 

579 metricRecords: dict[str, list[Any]] = dict() 

580 

581 # Record if any records needed skipped 

582 resultsTrimmed = False 

583 

584 # Look at each of the metrics in the bundle (name, values) 

585 for metric, measurements in bundle.items(): 

586 # Create a list which will contain the records for each measurement 

587 # associated with metric. 

588 metricRecordList = metricRecords.setdefault(f"{bundle.metricNamePrefix}{metric}", list()) 

589 

590 record: dict[str, Any] = meta.copy() 

591 

592 # loop over each metric measurement within the metric 

593 for measurement in measurements: 

594 # need to extract any tags, package info, etc 

595 note_key = f"{measurement.metric_name.metric}.metric_tags" 

596 record["tags"] = dict(measurement.notes.items()).get(note_key, list()) 

597 

598 # Missing values are replaced by 0 in sasquatch, see RFC-763. 

599 name = "" 

600 value = 0.0 

601 match measurement.json: 

602 case {"metric": name, "value": None}: 

603 pass 

604 case {"metric": name, "value": value}: 

605 if math.isnan(value): 

606 log.error( 

607 "Measurement %s had a value that is a NaN, dispatch will be skipped", 

608 measurement, 

609 ) 

610 resultsTrimmed = True 

611 continue 

612 # JSON will not serialize np.float32, must cast. 

613 if isinstance(value, np.float32): 

614 value = float(value) 

615 pass 

616 case {"value": _}: 

617 log.error("Measurement %s does not contain the key 'metric'", measurement) 

618 resultsTrimmed = True 

619 continue 

620 case {"metric": _}: 

621 log.error("Measurement %s does not contain the key 'value'", measurement) 

622 resultsTrimmed = True 

623 continue 

624 record[name] = value 

625 

626 metricRecordList.append({"value": record}) 

627 return metricRecords, resultsTrimmed 

628 

629 def dispatch( 

630 self, 

631 bundle: MetricMeasurementBundle, 

632 run: str, 

633 datasetType: str, 

634 timestamp: datetime.datetime | None = None, 

635 id: UUID | None = None, 

636 datasetIdentifier: str | None = None, 

637 identifierFields: Mapping | None = None, 

638 extraFields: Mapping | None = None, 

639 ) -> None: 

640 """Dispatch a `MetricMeasurementBundle` to Sasquatch. 

641 

642 Parameters 

643 ---------- 

644 bundle : `MetricMeasurementBundle` 

645 The bundle containing metric values to upload. 

646 run : `str` 

647 The run name to associate with these metric values. If this bundle 

648 is also stored in the butler, this should be the butler run 

649 collection the bundle is stored in the butler. This will be used 

650 in generating uniqueness constraints in Sasquatch. 

651 datasetType : `str` 

652 The dataset type name associated with this 

653 `MetricMeasurementBundle`. 

654 timestamp : `datetime.datetime`, optional 

655 The timestamp to be associated with the measurements in the ingress 

656 database. If this value is None, timestamp will be set by the run 

657 time or current time. 

658 id : `UUID`, optional 

659 The UUID of the `MetricMeasurementBundle` within the Butler. If 

660 `None`, a new random UUID will be generated so that each record in 

661 Sasquatch will have a unique value. 

662 datasetIdentifier : `str`, optional 

663 A string which will be used in creating unique identifier tags. If 

664 `None`, a default value will be inserted. 

665 identifierFields: `Mapping`, optional 

666 The keys and values in this mapping will be both added as fields 

667 in the record, and used in creating a unique tag for the uploaded 

668 dataset type. I.e. the timestamp, and the tag will be unique, and 

669 each record will belong to one combination of such. Examples of 

670 entries would be things like visit or tract. 

671 extraFields: `Mapping`, optional 

672 Extra mapping keys and values that will be added as fields to the 

673 dispatched record. 

674 

675 Raises 

676 ------ 

677 SasquatchDispatchPartialFailure, SasquatchDispatchFailure 

678 Raised if there were any errors in dispatching a bundle. 

679 """ 

680 if id is None: 

681 id = uuid4() 

682 

683 # Prepare the bundle by transforming it to a list of metric records 

684 metricRecords, recordsTrimmed = self._prepareBundle( 

685 bundle=bundle, 

686 run=run, 

687 datasetType=datasetType, 

688 timestamp=timestamp, 

689 id=id, 

690 datasetIdentifier=datasetIdentifier, 

691 identifierFields=identifierFields, 

692 extraFields=extraFields, 

693 ) 

694 

695 headers = {"content-type": "application/vnd.kafka.avro.v2+json"} 

696 data: dict[str, Any] = dict() 

697 partialUpload = False 

698 uploadFailed = [] 

699 

700 with http_client() as session: 

701 for metric, record in metricRecords.items(): 

702 # create the kafka topic if it does not already exist 

703 if not self._create_topic(metric): 

704 log.error("Topic not created, skipping dispatch of %s", metric) 

705 continue 

706 recordValue = record[0]["value"] 

707 # Generate schemas for each record 

708 data["value_schema"], schemaTrimmed = self._generateAvroSchema(metric, recordValue) 

709 data["records"] = record 

710 

711 if schemaTrimmed: 

712 partialUpload = True 

713 

714 try: 

715 r = session.post( 

716 f"{self.url}/topics/{self.namespace}.{metric}", json=data, headers=headers 

717 ) 

718 r.raise_for_status() 

719 log.debug("Succesfully sent data for metric %s", metric) 

720 uploadFailed.append(False) 

721 except requests.HTTPError as e: 

722 log.error( 

723 "There was a problem submitting the metric %s: %s, %s", 

724 metric, 

725 e.response.status_code, 

726 e.response.reason, 

727 ) 

728 uploadFailed.append(True) 

729 partialUpload = True 

730 except requests.RequestException as e: 

731 # Don't log full stack trace because there may be lots 

732 # of these. 

733 log.error("There was a problem submitting the metric %s: %s", metric, e) 

734 uploadFailed.append(True) 

735 partialUpload = True 

736 

737 # There may be no metrics to try to upload, and thus the uploadFailed 

738 # list may be empty, check before issuing failure 

739 if len(uploadFailed) > 0 and all(uploadFailed): 

740 raise SasquatchDispatchFailure("All records were unable to be uploaded.") 

741 

742 if partialUpload or recordsTrimmed: 

743 raise SasquatchDispatchPartialFailure("One or more records may not have been uploaded entirely") 

744 

745 def dispatchRef( 

746 self, 

747 bundle: MetricMeasurementBundle, 

748 ref: DatasetRef, 

749 timestamp: datetime.datetime | None = None, 

750 extraFields: Mapping | None = None, 

751 datasetIdentifier: str | None = None, 

752 ) -> None: 

753 """Dispatch a `MetricMeasurementBundle` to Sasquatch with a known 

754 `DatasetRef`. 

755 

756 Parameters 

757 ---------- 

758 bundle : `MetricMeasurementBundle` 

759 The bundle containing metric values to upload. 

760 ref : `DatasetRef` 

761 The `Butler` dataset ref corresponding to the input 

762 `MetricMeasurementBundle`. 

763 timestamp : `datetime.datetime`, optional 

764 The timestamp to be associated with the measurements in the ingress 

765 database. If this value is None, timestamp will be set by the run 

766 time or current time. 

767 extraFields: `Mapping`, optional 

768 Extra mapping keys and values that will be added as fields to the 

769 dispatched record if not None. 

770 datasetIdentifier : `str`, optional 

771 A string which will be used in creating unique identifier tags. If 

772 None, a default value will be inserted. 

773 

774 Raises 

775 ------ 

776 SasquatchDispatchPartialFailure, SasquatchDispatchFailure 

777 Raised if there were any errors in dispatching a bundle. 

778 """ 

779 # Parse the relevant info out of the dataset ref. 

780 serializedRef = ref.to_simple() 

781 id = serializedRef.id 

782 if serializedRef.run is None: 

783 run = "<unknown>" 

784 else: 

785 run = serializedRef.run 

786 dstype = serializedRef.datasetType 

787 datasetType = dstype.name if dstype is not None else "" 

788 dataRefMapping = serializedRef.dataId.dataId if serializedRef.dataId else None 

789 

790 self.dispatch( 

791 bundle, 

792 run=run, 

793 timestamp=timestamp, 

794 datasetType=datasetType, 

795 id=id, 

796 identifierFields=dataRefMapping, 

797 extraFields=extraFields, 

798 datasetIdentifier=datasetIdentifier, 

799 )