# Coverage for python/lsst/analysis/tools/interfaces/datastore/_dispatcher.py: 12% (298 statements)
# coverage.py v7.13.5, created at 2026-04-22 09:09 +0000

# This file is part of analysis_tools.
#
# Developed for the LSST Data Management System.
# This product includes software developed by the LSST Project
# (http://www.lsst.org).
# See the COPYRIGHT file at the top-level directory of this distribution
# for details of code ownership.
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.

from __future__ import annotations
__all__ = ("SasquatchDispatchPartialFailure", "SasquatchDispatchFailure", "SasquatchDispatcher")

"""Sasquatch datastore"""
import calendar
import datetime
import json
import logging
import math
import re
from collections.abc import Mapping, MutableMapping, Sequence
from dataclasses import dataclass
from typing import TYPE_CHECKING, Any, cast
from uuid import UUID, uuid4

import numpy as np
import requests
from lsst.daf.butler import DatasetRef
from lsst.resources import ResourcePath
from lsst.utils.packages import getEnvironmentPackages

from ...utils import http_client

if TYPE_CHECKING:
    from .. import MetricMeasurementBundle

log = logging.getLogger(__name__)

# Constants associated with SasquatchDispatcher
PARTITIONS = 1
REPLICATION_FACTOR = 3

IDENTIFIER_KEYS = [
    "detector",
    "patch",
    "skymap",
    "visit",
    "tract",
    "physical_filter",
    "instrument",
    "band",
    "exposure",
    "group",
    "day_obs",
]

class SasquatchDispatchPartialFailure(RuntimeError):
    """This indicates that a Sasquatch dispatch was partially successful."""

    pass


class SasquatchDispatchFailure(RuntimeError):
    """This indicates that dispatching a
    `~lsst.analysis.tools.interfaces.MetricMeasurementBundle` failed.
    """

    pass

def _tag2VersionTime(productStr: str) -> tuple[str, float]:
    """Determine versions and dates from the string returned from
    getEnvironmentPackages.

    The `~lsst.utils.packages.getEnvironmentPackages` function returns the
    setup version associated with a product, along with a list of tags that
    have been added to it.

    This method splits up that return string, and determines the earliest
    date associated with the setup package version.

    Parameters
    ----------
    productStr : `str`
        The product string returned from a lookup on the result of a call to
        `~lsst.utils.packages.getEnvironmentPackages`.

    Returns
    -------
    result : `tuple` of `str`, `float`
        The first element is the version of the package, and the second is
        the POSIX timestamp associated with that released version.

    Raises
    ------
    ValueError
        Raised if there are no tags which correspond to dates.
    """
    times: list[datetime.datetime] = []
    version = productStr.split()[0]
    tags: str = re.findall("[(](.*)[)]", productStr)[0]
    for tag in tags.split():
        numDots = tag.count(".")
        numUnder = tag.count("_")
        separator = "_"
        if numDots > numUnder:
            separator = "."
        match tag.split(separator):
            # Daily tag branch.
            case ("d", year, month, day):
                dt = datetime.datetime(year=int(year), month=int(month), day=int(day))
            # Weekly tag branch.
            case ("w", year, week):
                iyear = int(year)
                iweek = int(week)
                # Use 4 as the day because releases are available starting
                # on Thursday.
                dayOfWeek = 4

                # Find the first calendar week of January whose Thursday
                # falls in January.
                cal = calendar.Calendar()
                cal.setfirstweekday(6)
                i = 0
                for i, iterWeek in enumerate(cal.monthdatescalendar(iyear, 1)):
                    if iterWeek[dayOfWeek].month == 1:
                        break
                # Handle fromisocalendar not being able to handle week 53
                # in the case where the date was going to subtract 7 days
                # anyway.
                if i and iweek == 53:
                    i = 0
                    iweek = 52
                delta = datetime.timedelta(days=7 * i)

                # Correct for a weekly being issued in the last week of the
                # previous year, as Thursdays don't always line up evenly in
                # a week / year split.
                dt = datetime.datetime.fromisocalendar(iyear, iweek, dayOfWeek) - delta
            # Skip tags that can't be understood.
            case _:
                continue
        times.append(dt)
    if len(times) == 0:
        raise ValueError("Could not find any tags corresponding to dates")
    minTime = min(times)
    # datetime.replace returns a new object; assign it so the UTC timezone
    # actually takes effect.
    minTime = minTime.replace(tzinfo=datetime.timezone.utc)
    return version, minTime.timestamp()
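
# A minimal sketch of what `_tag2VersionTime` consumes and produces, using a
# made-up product string of the form returned by `getEnvironmentPackages`
# (the version hash and tags here are hypothetical):
#
#     version, ts = _tag2VersionTime("g0123456789+abcdef (w_2024_10 d_2024_03_07)")
#     # version == "g0123456789+abcdef"
#     # ts is the UTC timestamp of 2024-03-07, the earliest tag date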

@dataclass
class SasquatchDispatcher:
    """This class mediates the transfer of MetricMeasurementBundles to a
    Sasquatch HTTP Kafka proxy server.
    """

    url: str
    """URL of the Sasquatch proxy server."""

    token: str
    """Authentication token used in communicating with the proxy server."""

    namespace: str = "lsst.dm"
    """The namespace in Sasquatch in which to write the uploaded metrics."""

    def __post_init__(self) -> None:
        match ResourcePath(self.url).scheme:
            case "http" | "https":
                pass
            case _:
                raise ValueError("Proxy server must be locatable with either http or https")

        self._cluster_id: str | None = None

    @property
    def clusterId(self) -> str:
        """Cluster ID of the Kafka proxy.

        Notes
        -----
        The cluster ID will be fetched with a network call if it is not
        already cached.
        """
        if self._cluster_id is None:
            self._populateClusterId()
        return cast(str, self._cluster_id)

    def _populateClusterId(self) -> None:
        """Get the Sasquatch Kafka cluster ID."""

        headers = {"content-type": "application/json"}

        try:
            with http_client() as session:
                r = session.get(f"{self.url}/v3/clusters", headers=headers)
                # Check the HTTP status before parsing the body so that a
                # failed request raises an HTTPError rather than a JSON or
                # key lookup error.
                r.raise_for_status()
                cluster_id = r.json()["data"][0]["cluster_id"]
                self._cluster_id = str(cluster_id)
        except requests.RequestException as e:
            raise SasquatchDispatchFailure("Could not retrieve the cluster id for the specified url") from e
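
    # For reference, the GET /v3/clusters response parsed above has (at
    # least) the following shape; only the first cluster's id is used:
    #
    #     {"data": [{"cluster_id": "<id>"}, ...]}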

    def _create_topic(self, topic_name: str) -> bool:
        """Create a Kafka topic in Sasquatch.

        Parameters
        ----------
        topic_name : `str`
            The name of the Kafka topic to create.

        Returns
        -------
        status : `bool`
            `True` if the topic was created (or already exists), `False` if
            an error was encountered.
        """

        headers = {"content-type": "application/json"}

        topic_config = {
            "topic_name": f"{self.namespace}.{topic_name}",
            "partitions_count": PARTITIONS,
            "replication_factor": REPLICATION_FACTOR,
        }

        try:
            with http_client() as session:
                r = session.post(
                    f"{self.url}/v3/clusters/{self.clusterId}/topics", json=topic_config, headers=headers
                )
                r.raise_for_status()
                log.debug("Created topic %s.%s", self.namespace, topic_name)
                return True
        except requests.HTTPError as e:
            if e.response.status_code == requests.codes.bad_request:
                log.debug("Topic %s.%s already exists.", self.namespace, topic_name)
                return True
            else:
                log.error(
                    "Unknown error occurred creating Kafka topic: %s %s",
                    e.response.status_code,
                    e.response.reason,
                )
                return False
        except requests.RequestException:
            log.exception("Unknown error occurred creating Kafka topic")
            return False
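
    # For reference, the topic-creation request above posts a JSON body of
    # the following shape to the cluster's /topics endpoint (values shown
    # for the defaults in this module):
    #
    #     {"topic_name": "lsst.dm.<topic>",
    #      "partitions_count": 1,
    #      "replication_factor": 3}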

    def _generateAvroSchema(self, metric: str, record: MutableMapping[str, Any]) -> tuple[str, bool]:
        """Infer the Avro schema from the record payload.

        Parameters
        ----------
        metric : `str`
            The name of the metric.
        record : `MutableMapping`
            The prepared record for which a schema is to be generated.

        Returns
        -------
        resultSchema : `str`
            A JSON-encoded string of the resulting Avro schema.
        errorCode : `bool`
            A boolean indicating if any record fields had to be trimmed
            because a suitable schema could not be generated. `True` if
            records were removed, `False` otherwise.
        """
        schema: dict[str, Any] = {"type": "record", "namespace": self.namespace, "name": metric}

        # Record if any fields needed to be trimmed.
        resultsTrimmed = False

        fields = list()
        # If Avro schemas can't be generated for values, they should be
        # removed from the records.
        keysToRemove: list[str] = []
        for key in record:
            value = record[key]
            avroType: Mapping[str, Any]
            if "timestamp" in key:
                avroType = {"type": "double"}
            else:
                avroType = self._python2Avro(value)
            if len(avroType) == 0:
                continue
            if avroType.get("error_in_conversion"):
                keysToRemove.append(key)
                resultsTrimmed = True
                continue
            fields.append({"name": key, **avroType})

        # Remove any key that failed to have a schema generated.
        for key in keysToRemove:
            record.pop(key)

        schema["fields"] = fields

        return json.dumps(schema), resultsTrimmed
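
    # As an illustration (field names hypothetical), a record such as
    # {"run": "test/run", "timestamp": 1.0, "someMetric": 0.5} would yield
    # a schema string like:
    #
    #     {"type": "record", "namespace": "lsst.dm", "name": "<metric>",
    #      "fields": [{"name": "run", "type": "string", "default": ""},
    #                 {"name": "timestamp", "type": "double"},
    #                 {"name": "someMetric", "type": "float", "default": 0.0}]}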

    def _python2Avro(self, value: Any) -> Mapping:
        """Map a Python value to an Avro schema entry.

        Parameters
        ----------
        value : `Any`
            The Python value to convert.

        Returns
        -------
        result : `Mapping`
            A mapping that represents an entry in an Avro schema.
        """
        match value:
            case float() | np.float32() | None:
                return {"type": "float", "default": 0.0}
            case str():
                return {"type": "string", "default": ""}
            case int():
                return {"type": "int", "default": 0}
            case Sequence():
                tmp = {self._python2Avro(item)["type"] for item in value}
                if len(tmp) == 0:
                    return {}
                if len(tmp) > 1:
                    log.error(
                        "Sequence contains mixed types: %s, must be homogeneous for avro conversion, "
                        "skipping record",
                        tmp,
                    )
                    return {"error_in_conversion": True}
                return {"type": "array", "items": tmp.pop()}
            case _:
                log.error("Unsupported type %s, skipping record", type(value))
                return {}
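
    # The resulting type mapping, sketched on a few sample values:
    #
    #     _python2Avro(1.5)        -> {"type": "float", "default": 0.0}
    #     _python2Avro("a")        -> {"type": "string", "default": ""}
    #     _python2Avro([1, 2, 3])  -> {"type": "array", "items": "int"}
    #     _python2Avro([1, "a"])   -> {"error_in_conversion": True}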

    def _handleReferencePackage(self, meta: MutableMapping, bundle: MetricMeasurementBundle) -> None:
        """Check to see if there is a reference package.

        If there is a reference package, determine the datetime associated
        with this reference package. Save the package, the version, and the
        date to the common metric fields.

        Parameters
        ----------
        meta : `MutableMapping`
            A mapping which corresponds to fields which should be encoded in
            all records.
        bundle : `MetricMeasurementBundle`
            The bundled metrics.
        """
        package_version, package_timestamp = "", 0.0
        if ref_package := getattr(bundle, "reference_package", ""):
            packages = getEnvironmentPackages(True)
            if package_info := packages.get(ref_package):
                try:
                    package_version, package_timestamp = _tag2VersionTime(package_info)
                except ValueError:
                    # Could not extract the package timestamp; leave it
                    # empty.
                    pass
        # Explicitly handle the case where None was set in the bundle for
        # the package.
        meta["reference_package"] = ref_package or ""
        meta["reference_package_version"] = package_version
        meta["reference_package_timestamp"] = package_timestamp
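
    # For example (hypothetical values), a bundle with
    # reference_package="lsst_distrib" whose setup string is
    # "g123+4 (w_2024_10)" would add to meta:
    #
    #     {"reference_package": "lsst_distrib",
    #      "reference_package_version": "g123+4",
    #      "reference_package_timestamp": <UTC timestamp of that weekly>}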

    def _handleTimes(self, meta: MutableMapping, bundle: MetricMeasurementBundle, run: str) -> None:
        """Add times to the meta fields mapping.

        Add all appropriate timestamp fields to the meta field mapping. These
        will be added to all records.

        This method will also look at the bundle to see if it defines a
        preferred time. If so, it sets that time as the main timestamp to be
        used for this record.

        Parameters
        ----------
        meta : `MutableMapping`
            A mapping which corresponds to fields which should be encoded in
            all records.
        bundle : `MetricMeasurementBundle`
            The bundled metrics.
        run : `str`
            The `~lsst.daf.butler.Butler` collection where the
            `MetricMeasurementBundle` is stored.
        """
        # Determine the timestamp associated with the run; if the run
        # collection does not end in a standard timestamp, use the current
        # timestamp.
        if re.match(r"\d{8}T\d{6}Z", stamp := run.split("/")[-1]):
            run_timestamp = datetime.datetime.strptime(stamp, r"%Y%m%dT%H%M%S%z")
        else:
            run_timestamp = datetime.datetime.now()
        meta["run_timestamp"] = run_timestamp.timestamp()

        # If the bundle supports supplying timestamps, dispatch on the type
        # specified.
        if hasattr(bundle, "timestamp_version") and bundle.timestamp_version:
            match bundle.timestamp_version:
                case "reference_package_timestamp":
                    if not meta["reference_package_timestamp"]:
                        log.error("Reference package timestamp is empty, using run_timestamp")
                        meta["timestamp"] = meta["run_timestamp"]
                    else:
                        meta["timestamp"] = meta["reference_package_timestamp"]
                case "run_timestamp":
                    meta["timestamp"] = meta["run_timestamp"]
                case "current_timestamp":
                    timeStamp = datetime.datetime.now()
                    meta["timestamp"] = timeStamp.timestamp()
                case "dataset_timestamp":
                    log.error("dataset timestamps are not yet supported, run_timestamp will be used")
                    meta["timestamp"] = meta["run_timestamp"]
                case str(value) if "explicit_timestamp" in value:
                    try:
                        _, splitTime = value.split(":")
                    except ValueError as excpt:
                        raise ValueError(
                            "Explicit timestamp must be given in the format "
                            "'explicit_timestamp:datetime', where datetime is given in the form "
                            "'%Y%m%dT%H%M%S%z'"
                        ) from excpt
                    meta["timestamp"] = datetime.datetime.strptime(splitTime, r"%Y%m%dT%H%M%S%z").timestamp()
                case _:
                    log.error(
                        "Timestamp version %s is not supported, run_timestamp will be used",
                        bundle.timestamp_version,
                    )
                    meta["timestamp"] = meta["run_timestamp"]
        # Default to using the run_timestamp.
        else:
            meta["timestamp"] = meta["run_timestamp"]
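
    # A sketch of the `timestamp_version` values handled above (these are
    # bundle attribute values, not arguments to this method):
    #
    #     "run_timestamp"                -> parsed from the run collection
    #     "current_timestamp"            -> time of dispatch
    #     "reference_package_timestamp"  -> from _handleReferencePackage
    #     "explicit_timestamp:20240307T120000Z"
    #                                    -> a fixed, caller-chosen time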

    def _handleIdentifier(
        self,
        meta: MutableMapping,
        identifierFields: Mapping[str, Any] | None,
        datasetIdentifier: str | None,
        bundle: MetricMeasurementBundle,
    ) -> None:
        """Add an identifier to the meta record mapping.

        If the bundle declares a dataset identifier to use, add that to the
        record; otherwise use 'Generic' as the identifier. If the
        datasetIdentifier parameter is specified, that is used instead of
        anything specified by the bundle.

        This will also add any identifier fields supplied to the meta record
        mapping.

        Together these values (in addition to the timestamp and topic) should
        uniquely identify an upload to the Sasquatch system.

        Parameters
        ----------
        meta : `MutableMapping`
            A mapping which corresponds to fields which should be encoded in
            all records.
        identifierFields : `Mapping` or `None`
            The keys and values in this mapping will both be added as fields
            in the record and used in creating a unique tag for the uploaded
            dataset type. That is, the combination of timestamp and tag will
            be unique, and each record will belong to one such combination.
        datasetIdentifier : `str` or `None`
            A string which will be used in creating unique identifier tags.
        bundle : `MetricMeasurementBundle`
            The bundle containing metric values to upload.
        """
        identifier: str
        if datasetIdentifier is not None:
            identifier = datasetIdentifier
        elif hasattr(bundle, "dataset_identifier") and bundle.dataset_identifier is not None:
            identifier = bundle.dataset_identifier
        else:
            identifier = "Generic"

        meta["dataset_tag"] = identifier

        if identifierFields is None:
            identifierFields = {}
        for key in IDENTIFIER_KEYS:
            value = identifierFields.get(key, "")
            meta[key] = f"{value}"
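
    # For example (hypothetical values), with datasetIdentifier=None, a
    # bundle declaring dataset_identifier="DM-12345", and
    # identifierFields={"tract": 9813}, the meta mapping gains:
    #
    #     {"dataset_tag": "DM-12345", "tract": "9813", "detector": "",
    #      "patch": "", ..., "day_obs": ""}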

    def _prepareBundle(
        self,
        bundle: MetricMeasurementBundle,
        run: str,
        datasetType: str,
        timestamp: datetime.datetime | None = None,
        id: UUID | None = None,
        identifierFields: Mapping | None = None,
        datasetIdentifier: str | None = None,
        extraFields: Mapping | None = None,
    ) -> tuple[Mapping[str, list[Any]], bool]:
        """Encode all of the inputs into a format that can be sent to the
        Kafka proxy server.

        Parameters
        ----------
        bundle : `MetricMeasurementBundle`
            The bundle containing metric values to upload.
        run : `str`
            The run name to associate with these metric values. If this
            bundle is also stored in the butler, this should be the butler
            run collection in which the bundle is stored.
        datasetType : `str`
            The dataset type name associated with this
            `MetricMeasurementBundle`.
        timestamp : `datetime.datetime`, optional
            The timestamp to be associated with the measurements in the
            ingress database. If this value is `None`, the timestamp will be
            set from the run time or the current time.
        id : `UUID`, optional
            The UUID of the `MetricMeasurementBundle` within the butler. If
            `None`, a new random UUID will be generated so that each record
            in Sasquatch will have a unique value.
        identifierFields : `Mapping`, optional
            The keys and values in this mapping will both be added as fields
            in the record and used in creating a unique tag for the uploaded
            dataset type. That is, the combination of timestamp and tag will
            be unique, and each record will belong to one such combination.
        datasetIdentifier : `str`, optional
            A string which will be used in creating unique identifier tags.
        extraFields : `Mapping`, optional
            Extra mapping keys and values that will be added as fields to the
            dispatched record.

        Returns
        -------
        result : `Mapping` of `str` to `list`
            A mapping of metric name to a list of metric measurement records.
        status : `bool`
            A status boolean indicating if some records had to be skipped due
            to a problem parsing the bundle.
        """
        if id is None:
            id = uuid4()
        sid = str(id)
        meta: dict[str, Any] = dict()

        # Add other associated common fields.
        meta["id"] = sid
        meta["run"] = run
        meta["dataset_type"] = datasetType

        # Check to see if the bundle declares a reference package.
        self._handleReferencePackage(meta, bundle)

        # Handle the various timestamps that could be associated with a
        # record.
        self._handleTimes(meta, bundle, run)

        # Always use the supplied timestamp if one was passed in.
        if timestamp is not None:
            meta["timestamp"] = timestamp.timestamp()

        self._handleIdentifier(meta, identifierFields, datasetIdentifier, bundle)

        # Add in any other fields that were supplied to the function call.
        if extraFields is not None:
            meta.update(extraFields)

        metricRecords: dict[str, list[Any]] = dict()

        # Record if any records needed to be skipped.
        resultsTrimmed = False

        # Look at each of the metrics in the bundle (name, values).
        for metric, measurements in bundle.items():
            # Create a list which will contain the records for each
            # measurement associated with the metric.
            metricRecordList = metricRecords.setdefault(f"{bundle.metricNamePrefix}{metric}", list())

            record: dict[str, Any] = meta.copy()

            # Loop over each metric measurement within the metric.
            for measurement in measurements:
                # Extract any tags, package info, etc.
                note_key = f"{measurement.metric_name.metric}.metric_tags"
                record["tags"] = dict(measurement.notes.items()).get(note_key, list())

                # Missing values are replaced by 0 in Sasquatch, see RFC-763.
                name = ""
                value = 0.0
                match measurement.json:
                    case {"metric": name, "value": None}:
                        pass
                    case {"metric": name, "value": value}:
                        if math.isnan(value):
                            log.error(
                                "Measurement %s had a value that is a NaN, dispatch will be skipped",
                                measurement,
                            )
                            resultsTrimmed = True
                            continue
                        # JSON will not serialize np.float32, must cast.
                        if isinstance(value, np.float32):
                            value = float(value)
                    case {"value": _}:
                        log.error("Measurement %s does not contain the key 'metric'", measurement)
                        resultsTrimmed = True
                        continue
                    case {"metric": _}:
                        log.error("Measurement %s does not contain the key 'value'", measurement)
                        resultsTrimmed = True
                        continue
                record[name] = value

            metricRecordList.append({"value": record})
        return metricRecords, resultsTrimmed
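
    # An abridged sketch of the structure returned by `_prepareBundle`
    # (metric and measurement names hypothetical; the inner mapping also
    # carries the identifier and reference-package fields):
    #
    #     {"someMetric": [{"value": {"id": "...", "run": "...",
    #                                "dataset_type": "...",
    #                                "timestamp": 1.7e9, "tags": [],
    #                                "someMeasurement": 0.5}}]}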

    def dispatch(
        self,
        bundle: MetricMeasurementBundle,
        run: str,
        datasetType: str,
        timestamp: datetime.datetime | None = None,
        id: UUID | None = None,
        datasetIdentifier: str | None = None,
        identifierFields: Mapping | None = None,
        extraFields: Mapping | None = None,
    ) -> None:
        """Dispatch a `MetricMeasurementBundle` to Sasquatch.

        Parameters
        ----------
        bundle : `MetricMeasurementBundle`
            The bundle containing metric values to upload.
        run : `str`
            The run name to associate with these metric values. If this
            bundle is also stored in the butler, this should be the butler
            run collection in which the bundle is stored. This will be used
            in generating uniqueness constraints in Sasquatch.
        datasetType : `str`
            The dataset type name associated with this
            `MetricMeasurementBundle`.
        timestamp : `datetime.datetime`, optional
            The timestamp to be associated with the measurements in the
            ingress database. If this value is `None`, the timestamp will be
            set from the run time or the current time.
        id : `UUID`, optional
            The UUID of the `MetricMeasurementBundle` within the Butler. If
            `None`, a new random UUID will be generated so that each record
            in Sasquatch will have a unique value.
        datasetIdentifier : `str`, optional
            A string which will be used in creating unique identifier tags.
            If `None`, a default value will be inserted.
        identifierFields : `Mapping`, optional
            The keys and values in this mapping will both be added as fields
            in the record and used in creating a unique tag for the uploaded
            dataset type. That is, the combination of timestamp and tag will
            be unique, and each record will belong to one such combination.
            Examples of entries would be things like visit or tract.
        extraFields : `Mapping`, optional
            Extra mapping keys and values that will be added as fields to the
            dispatched record.

        Raises
        ------
        SasquatchDispatchPartialFailure, SasquatchDispatchFailure
            Raised if there were any errors in dispatching a bundle.
        """
        if id is None:
            id = uuid4()

        # Prepare the bundle by transforming it to a list of metric records.
        metricRecords, recordsTrimmed = self._prepareBundle(
            bundle=bundle,
            run=run,
            datasetType=datasetType,
            timestamp=timestamp,
            id=id,
            datasetIdentifier=datasetIdentifier,
            identifierFields=identifierFields,
            extraFields=extraFields,
        )

        headers = {"content-type": "application/vnd.kafka.avro.v2+json"}
        data: dict[str, Any] = dict()
        partialUpload = False
        uploadFailed = []

        with http_client() as session:
            for metric, record in metricRecords.items():
                # Create the Kafka topic if it does not already exist.
                if not self._create_topic(metric):
                    log.error("Topic not created, skipping dispatch of %s", metric)
                    continue
                recordValue = record[0]["value"]
                # Generate schemas for each record.
                data["value_schema"], schemaTrimmed = self._generateAvroSchema(metric, recordValue)
                data["records"] = record

                if schemaTrimmed:
                    partialUpload = True

                try:
                    r = session.post(
                        f"{self.url}/topics/{self.namespace}.{metric}", json=data, headers=headers
                    )
                    r.raise_for_status()
                    log.debug("Successfully sent data for metric %s", metric)
                    uploadFailed.append(False)
                except requests.HTTPError as e:
                    log.error(
                        "There was a problem submitting the metric %s: %s, %s",
                        metric,
                        e.response.status_code,
                        e.response.reason,
                    )
                    uploadFailed.append(True)
                    partialUpload = True
                except requests.RequestException as e:
                    # Don't log the full stack trace because there may be
                    # lots of these.
                    log.error("There was a problem submitting the metric %s: %s", metric, e)
                    uploadFailed.append(True)
                    partialUpload = True

        # There may be no metrics to upload, and thus the uploadFailed list
        # may be empty; check before issuing a failure.
        if len(uploadFailed) > 0 and all(uploadFailed):
            raise SasquatchDispatchFailure("No records could be uploaded.")

        if partialUpload or recordsTrimmed:
            raise SasquatchDispatchPartialFailure("One or more records may not have been uploaded entirely")
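
    # A minimal usage sketch of `dispatch` (URL, token, and names are
    # placeholders, not real endpoints or dataset types):
    #
    #     dispatcher = SasquatchDispatcher(
    #         url="https://example.org/sasquatch-rest-proxy",
    #         token="<auth token>",
    #     )
    #     dispatcher.dispatch(
    #         bundle,
    #         run="u/user/metrics/20240307T120000Z",
    #         datasetType="objectTableCore_metrics",
    #         identifierFields={"tract": 9813, "skymap": "hsc_rings_v1"},
    #     )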

    def dispatchRef(
        self,
        bundle: MetricMeasurementBundle,
        ref: DatasetRef,
        timestamp: datetime.datetime | None = None,
        extraFields: Mapping | None = None,
        datasetIdentifier: str | None = None,
    ) -> None:
        """Dispatch a `MetricMeasurementBundle` to Sasquatch with a known
        `DatasetRef`.

        Parameters
        ----------
        bundle : `MetricMeasurementBundle`
            The bundle containing metric values to upload.
        ref : `DatasetRef`
            The `Butler` dataset ref corresponding to the input
            `MetricMeasurementBundle`.
        timestamp : `datetime.datetime`, optional
            The timestamp to be associated with the measurements in the
            ingress database. If this value is `None`, the timestamp will be
            set from the run time or the current time.
        extraFields : `Mapping`, optional
            Extra mapping keys and values that will be added as fields to the
            dispatched record if not `None`.
        datasetIdentifier : `str`, optional
            A string which will be used in creating unique identifier tags.
            If `None`, a default value will be inserted.

        Raises
        ------
        SasquatchDispatchPartialFailure, SasquatchDispatchFailure
            Raised if there were any errors in dispatching a bundle.
        """
        # Parse the relevant info out of the dataset ref.
        serializedRef = ref.to_simple()
        id = serializedRef.id
        if serializedRef.run is None:
            run = "<unknown>"
        else:
            run = serializedRef.run
        dstype = serializedRef.datasetType
        datasetType = dstype.name if dstype is not None else ""
        dataRefMapping = serializedRef.dataId.dataId if serializedRef.dataId else None

        self.dispatch(
            bundle,
            run=run,
            timestamp=timestamp,
            datasetType=datasetType,
            id=id,
            identifierFields=dataRefMapping,
            extraFields=extraFields,
            datasetIdentifier=datasetIdentifier,
        )
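
# A minimal usage sketch of `dispatchRef` with a butler-resolved dataset
# (collection and dataset type names are placeholders; `butler` is assumed
# to be an initialized `lsst.daf.butler.Butler`):
#
#     ref = butler.registry.findDataset(
#         "objectTableCore_metrics",
#         collections="u/user/metrics",
#         tract=9813,
#         skymap="hsc_rings_v1",
#     )
#     bundle = butler.get(ref)
#     dispatcher.dispatchRef(bundle, ref)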