Coverage for python/lsst/analysis/tools/interfaces/datastore/_dispatcher.py: 12% (301 statements)
# This file is part of analysis_tools.
#
# Developed for the LSST Data Management System.
# This product includes software developed by the LSST Project
# (http://www.lsst.org).
# See the COPYRIGHT file at the top-level directory of this distribution
# for details of code ownership.
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.

from __future__ import annotations

__all__ = ("SasquatchDispatchPartialFailure", "SasquatchDispatchFailure", "SasquatchDispatcher")

import calendar
import datetime
import json
import logging
import math
import re
from collections.abc import Mapping, MutableMapping, Sequence
from dataclasses import dataclass
from typing import TYPE_CHECKING, Any, cast
from uuid import UUID, uuid4

import numpy as np
import requests

from lsst.daf.butler import DatasetRef
from lsst.resources import ResourcePath
from lsst.utils.packages import getEnvironmentPackages

from ...utils import http_client

if TYPE_CHECKING:
    from .. import MetricMeasurementBundle

"""Sasquatch datastore"""

log = logging.getLogger(__name__)

# Constants associated with SasquatchDispatcher
PARTITIONS = 1
REPLICATION_FACTOR = 3

IDENTIFIER_KEYS = [
    "detector",
    "patch",
    "skymap",
    "visit",
    "tract",
    "physical_filter",
    "instrument",
    "band",
    "exposure",
    "group",
    "day_obs",
]


class SasquatchDispatchPartialFailure(RuntimeError):
    """This indicates that a Sasquatch dispatch was partially successful."""

    pass


class SasquatchDispatchFailure(RuntimeError):
    """This indicates that dispatching a
    `~lsst.analysis.tools.interfaces.MetricMeasurementBundle` failed.
    """

    pass


def _tag2VersionTime(productStr: str) -> tuple[str, float]:
    """Determine versions and dates from the string returned from
    getEnvironmentPackages.

    The `~lsst.utils.packages.getEnvironmentPackages` function returns the
    setup version associated with a product, along with a list of tags that
    have been added to it.

    This method splits up that return string, and determines the earliest
    date associated with the setup package version.

    Parameters
    ----------
    productStr : `str`
        The product string returned from a lookup on the result of a call to
        `~lsst.utils.packages.getEnvironmentPackages`.

    Returns
    -------
    result : `tuple` of `str`, `float`
        The first element is the version of the package, and the second is
        the POSIX timestamp associated with that released version.

    Raises
    ------
    ValueError
        Raised if there are no tags which correspond to dates.
    """
    times: list[datetime.datetime] = []
    version = productStr.split()[0]
    pattern_match = re.findall("[(](.*)[)]", productStr)
    if len(pattern_match) == 0:
        raise ValueError(f"Could not find any tags in product string {productStr}")
    tags: str = pattern_match[0]
    for tag in tags.split():
        numDots = tag.count(".")
        numUnder = tag.count("_")
        separator = "_"
        if numDots > numUnder:
            separator = "."
        match tag.split(separator):
            # Daily tag branch.
            case ("d", year, month, day):
                dt = datetime.datetime(year=int(year), month=int(month), day=int(day))
            # Weekly tag branch.
            case ("w", year, week):
                iyear = int(year)
                iweek = int(week)
                # Use 4 as the day because releases are available starting
                # on Thursday.
                dayOfWeek = 4

                # Find the first week of the year that contains a Thursday.
                cal = calendar.Calendar()
                cal.setfirstweekday(6)
                i = 0
                for i, iterWeek in enumerate(cal.monthdatescalendar(iyear, 1)):
                    if iterWeek[dayOfWeek].month == 1:
                        break
                # Handle fromisocalendar not being able to handle week 53
                # in the case where the date was going to subtract 7 days
                # anyway.
                if i and iweek == 53:
                    i = 0
                    iweek = 52
                delta = datetime.timedelta(days=7 * i)

                # Correct for a weekly being issued in the last week of the
                # previous year, as Thursdays don't always line up evenly in
                # a week / year split.
                dt = datetime.datetime.fromisocalendar(iyear, iweek, dayOfWeek) - delta
            # Skip tags that can't be understood.
            case _:
                continue
        times.append(dt)
    if len(times) == 0:
        raise ValueError("Could not find any tags corresponding to dates")
    minTime = min(times)
    # `replace` returns a new datetime rather than mutating in place; assign
    # the result so the UTC timezone is actually applied before taking the
    # timestamp.
    minTime = minTime.replace(tzinfo=datetime.UTC)
    return version, minTime.timestamp()
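
# Illustrative sketch (hypothetical product string): given
#
#     version, stamp = _tag2VersionTime("g0123abc+1 (w_2024_10 current)")
#
# version would be "g0123abc+1" and stamp the POSIX timestamp for
# 2024-03-07 (the Thursday of ISO week 10 of 2024, UTC), the earliest
# date-like tag found. Daily tags such as "d_2024_03_07" resolve to that
# calendar date directly; tags like "current" are skipped.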


@dataclass
class SasquatchDispatcher:
    """This class mediates the transfer of MetricMeasurementBundles to a
    Sasquatch HTTP Kafka proxy server.
    """

    url: str
    """URL of the Sasquatch proxy server."""

    token: str
    """Authentication token used in communicating with the proxy server."""

    namespace: str = "lsst.dm"
    """The namespace in Sasquatch in which to write the uploaded metrics."""

    def __post_init__(self) -> None:
        match ResourcePath(self.url).scheme:
            case "http" | "https":
                pass
            case _:
                raise ValueError("Proxy server must be locatable with either http or https")

        self._cluster_id: str | None = None

    @property
    def clusterId(self) -> str:
        """ClusterId of the Kafka proxy.

        Notes
        -----
        The cluster id will be fetched with a network call if it is not
        already cached.
        """
        if self._cluster_id is None:
            self._populateClusterId()
        return cast(str, self._cluster_id)

    def _populateClusterId(self) -> None:
        """Get the Sasquatch Kafka cluster ID."""
        headers = {"content-type": "application/json"}

        try:
            with http_client() as session:
                r = session.get(f"{self.url}/v3/clusters", headers=headers)
                # Check the response status before parsing the body, so HTTP
                # errors are reported as such rather than as parse failures.
                r.raise_for_status()
                cluster_id = r.json()["data"][0]["cluster_id"]
                self._cluster_id = str(cluster_id)
        except requests.RequestException as e:
            raise SasquatchDispatchFailure("Could not retrieve the cluster id for the specified url") from e

    def _create_topic(self, topic_name: str) -> bool:
        """Create a kafka topic in Sasquatch.

        Parameters
        ----------
        topic_name : `str`
            The name of the kafka topic to create.

        Returns
        -------
        status : `bool`
            `True` if the topic was created (or already exists), `False` if
            an error was encountered.
        """
        headers = {"content-type": "application/json"}

        topic_config = {
            "topic_name": f"{self.namespace}.{topic_name}",
            "partitions_count": PARTITIONS,
            "replication_factor": REPLICATION_FACTOR,
        }

        try:
            with http_client() as session:
                r = session.post(
                    f"{self.url}/v3/clusters/{self.clusterId}/topics", json=topic_config, headers=headers
                )
                r.raise_for_status()
                log.debug("Created topic %s.%s", self.namespace, topic_name)
                return True
        except requests.HTTPError as e:
            if e.response.status_code == requests.codes.bad_request:
                log.debug("Topic %s.%s already exists.", self.namespace, topic_name)
                return True
            else:
                log.error(
                    "Unknown error occurred creating kafka topic %s %s",
                    e.response.status_code,
                    e.response.reason,
                )
                return False
        except requests.RequestException:
            log.exception("Unknown error occurred creating kafka topic")
            return False
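
    # Illustrative sketch: with the default namespace "lsst.dm" and
    # topic_name "demoMetric" (hypothetical), the call above POSTs to
    # {url}/v3/clusters/{clusterId}/topics with the body
    #
    #     {"topic_name": "lsst.dm.demoMetric",
    #      "partitions_count": 1,
    #      "replication_factor": 3}
    #
    # and treats a 400 (bad request) response as "topic already exists".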

    def _generateAvroSchema(self, metric: str, record: MutableMapping[str, Any]) -> tuple[str, bool]:
        """Infer the Avro schema from the record payload.

        Parameters
        ----------
        metric : `str`
            The name of the metric.
        record : `MutableMapping`
            The prepared record for which a schema is to be generated.

        Returns
        -------
        resultSchema : `str`
            A json encoded string of the resulting avro schema.
        errorCode : `bool`
            A boolean indicating if any record fields had to be trimmed
            because a suitable schema could not be generated. `True` if
            records were removed, `False` otherwise.
        """
        schema: dict[str, Any] = {"type": "record", "namespace": self.namespace, "name": metric}

        # Record if any records needed to be trimmed.
        resultsTrimmed = False

        fields = list()
        # If avro schemas can't be generated for values, they should be
        # removed from the records.
        keysToRemove: list[str] = []
        for key in record:
            value = record[key]
            avroType: Mapping[str, Any]
            if "timestamp" in key:
                avroType = {"type": "double"}
            else:
                avroType = self._python2Avro(value)
            if len(avroType) == 0:
                continue
            if avroType.get("error_in_conversion"):
                keysToRemove.append(key)
                resultsTrimmed = True
                continue
            fields.append({"name": key, **avroType})

        # Remove any key that failed to have a schema generated.
        for key in keysToRemove:
            record.pop(key)

        schema["fields"] = fields

        return json.dumps(schema), resultsTrimmed
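
    # Illustrative sketch (hypothetical record): for
    #
    #     {"timestamp": 1700000000.0, "run": "u/user/run", "myValue": 1.5}
    #
    # _generateAvroSchema("demoMetric", record) would produce a JSON string
    # equivalent to
    #
    #     {"type": "record", "namespace": "lsst.dm", "name": "demoMetric",
    #      "fields": [{"name": "timestamp", "type": "double"},
    #                 {"name": "run", "type": "string", "default": ""},
    #                 {"name": "myValue", "type": "float", "default": 0.0}]}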

    def _python2Avro(self, value: Any) -> Mapping:
        """Map a python type to an avro schema entry.

        Parameters
        ----------
        value : `Any`
            Any python parameter.

        Returns
        -------
        result : `Mapping`
            A mapping that represents an entry in an avro schema.
        """
        match value:
            case float() | np.float32() | None:
                return {"type": "float", "default": 0.0}
            case str():
                return {"type": "string", "default": ""}
            case int():
                return {"type": "int", "default": 0}
            case Sequence():
                tmp = {self._python2Avro(item)["type"] for item in value}
                if len(tmp) == 0:
                    return {}
                if len(tmp) > 1:
                    log.error(
                        "Sequence contains mixed types: %s, must be homogeneous for avro conversion, "
                        "skipping record",
                        tmp,
                    )
                    return {"error_in_conversion": True}
                return {"type": "array", "items": tmp.pop()}
            case _:
                log.error("Unsupported type %s, skipping record", type(value))
                return {}
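
    # Illustrative sketch of the mapping performed above:
    #
    #     _python2Avro(1.5)         -> {"type": "float", "default": 0.0}
    #     _python2Avro("a")         -> {"type": "string", "default": ""}
    #     _python2Avro(3)           -> {"type": "int", "default": 0}
    #     _python2Avro([1.0, 2.0])  -> {"type": "array", "items": "float"}
    #     _python2Avro([1.0, "a"])  -> {"error_in_conversion": True}
    #     _python2Avro(object())    -> {}  (unsupported, logged and skipped)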

    def _handleReferencePackage(self, meta: MutableMapping, bundle: MetricMeasurementBundle) -> None:
        """Check to see if there is a reference package.

        If there is a reference package, determine the datetime associated
        with this reference package. Save the package, the version, and the
        date to the common metric fields.

        Parameters
        ----------
        meta : `MutableMapping`
            A mapping which corresponds to fields which should be encoded in
            all records.
        bundle : `MetricMeasurementBundle`
            The bundled metrics.
        """
        ref_package, package_version, package_timestamp = "", "", 0.0
        # This should only attempt to resolve a reference package if
        # it is to be used to determine the timestamp.
        if getattr(bundle, "timestamp_version", None) == "reference_package_timestamp":
            if ref_package := getattr(bundle, "reference_package", ""):
                packages = getEnvironmentPackages(True)
                if package_info := packages.get(ref_package):
                    try:
                        package_version, package_timestamp = _tag2VersionTime(package_info)
                    except ValueError:
                        # Could not extract the package timestamp; leave the
                        # fields empty.
                        pass
        # Explicitly handle the case where None was set in the bundle for
        # the package.
        meta["reference_package"] = ref_package or ""
        meta["reference_package_version"] = package_version
        meta["reference_package_timestamp"] = package_timestamp
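
    # Illustrative sketch (hypothetical product string): a bundle with
    # timestamp_version="reference_package_timestamp" and
    # reference_package="analysis_tools", where getEnvironmentPackages
    # reports "g0123abc+1 (w_2024_10 current)" for that product, would set
    #
    #     meta["reference_package"] = "analysis_tools"
    #     meta["reference_package_version"] = "g0123abc+1"
    #     meta["reference_package_timestamp"] = <timestamp of 2024-03-07 UTC>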

    def _handleTimes(self, meta: MutableMapping, bundle: MetricMeasurementBundle, run: str) -> None:
        """Add times to the meta fields mapping.

        Add all appropriate timestamp fields to the meta field mapping. These
        will be added to all records.

        This method will also look at the bundle to see if it defines a
        preferred time. If so, it sets that time as the main timestamp to be
        used for this record.

        Parameters
        ----------
        meta : `MutableMapping`
            A mapping which corresponds to fields which should be encoded in
            all records.
        bundle : `MetricMeasurementBundle`
            The bundled metrics.
        run : `str`
            The `~lsst.daf.butler.Butler` collection where the
            `MetricMeasurementBundle` is stored.
        """
        # Determine the timestamp associated with the run; if the run
        # collection does not end in a timestamp, use the current time.
        if re.match(r"\d{8}T\d{6}Z", stamp := run.split("/")[-1]):
            run_timestamp = datetime.datetime.strptime(stamp, r"%Y%m%dT%H%M%S%z")
        else:
            run_timestamp = datetime.datetime.now()
        meta["run_timestamp"] = run_timestamp.timestamp()

        # If the bundle supports supplying timestamps, dispatch on the type
        # specified.
        if hasattr(bundle, "timestamp_version") and bundle.timestamp_version:
            match bundle.timestamp_version:
                case "reference_package_timestamp":
                    if not meta["reference_package_timestamp"]:
                        log.error("Reference package timestamp is empty, using run_timestamp")
                        meta["timestamp"] = meta["run_timestamp"]
                    else:
                        meta["timestamp"] = meta["reference_package_timestamp"]
                case "run_timestamp":
                    meta["timestamp"] = meta["run_timestamp"]
                case "current_timestamp":
                    timeStamp = datetime.datetime.now()
                    meta["timestamp"] = timeStamp.timestamp()
                case "dataset_timestamp":
                    log.error("dataset timestamps are not yet supported, run_timestamp will be used")
                    meta["timestamp"] = meta["run_timestamp"]
                case str(value) if "explicit_timestamp" in value:
                    try:
                        _, splitTime = value.split(":")
                    except ValueError as excpt:
                        raise ValueError(
                            "Explicit timestamp must be given in the format 'explicit_timestamp:datetime', "
                            "where datetime is given in the form '%Y%m%dT%H%M%S%z'"
                        ) from excpt
                    meta["timestamp"] = datetime.datetime.strptime(splitTime, r"%Y%m%dT%H%M%S%z").timestamp()
                case _:
                    log.error(
                        "Timestamp version %s is not supported, run_timestamp will be used",
                        bundle.timestamp_version,
                    )
                    meta["timestamp"] = meta["run_timestamp"]
        # Default to using the run_timestamp.
        else:
            meta["timestamp"] = meta["run_timestamp"]
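
    # Illustrative sketch: a bundle with
    #
    #     bundle.timestamp_version = "explicit_timestamp:20240101T120000Z"
    #
    # (hypothetical value) yields meta["timestamp"] equal to the POSIX
    # timestamp for 2024-01-01T12:00:00+00:00, while "run_timestamp" (or an
    # unset timestamp_version) uses the time parsed from a run collection
    # name such as "u/user/metrics/20240101T000000Z".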

    def _handleIdentifier(
        self,
        meta: MutableMapping,
        identifierFields: Mapping[str, Any] | None,
        datasetIdentifier: str | None,
        bundle: MetricMeasurementBundle,
    ) -> None:
        """Add an identifier to the meta record mapping.

        If the bundle declares a dataset identifier to use, add that to the
        record, otherwise use 'Generic' as the identifier. If the
        datasetIdentifier parameter is specified, that is used instead of
        anything specified by the bundle.

        This will also add any identifier fields supplied to the meta record
        mapping.

        Together these values (in addition to the timestamp and topic) should
        uniquely identify an upload to the Sasquatch system.

        Parameters
        ----------
        meta : `MutableMapping`
            A mapping which corresponds to fields which should be encoded in
            all records.
        identifierFields : `Mapping` or `None`
            The keys and values in this mapping will both be added as fields
            in the record, and used in creating a unique tag for the uploaded
            dataset type. I.e. the timestamp and the tag will be unique, and
            each record will belong to one combination of such.
        datasetIdentifier : `str` or `None`
            A string which will be used in creating unique identifier tags.
        bundle : `MetricMeasurementBundle`
            The bundle containing metric values to upload.
        """
        identifier: str
        if datasetIdentifier is not None:
            identifier = datasetIdentifier
        elif hasattr(bundle, "dataset_identifier") and bundle.dataset_identifier is not None:
            identifier = bundle.dataset_identifier
        else:
            identifier = "Generic"

        meta["dataset_tag"] = identifier

        if identifierFields is None:
            identifierFields = {}
        for key in IDENTIFIER_KEYS:
            value = identifierFields.get(key, "")
            meta[key] = f"{value}"
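
    # Illustrative sketch: with identifierFields={"visit": 1234,
    # "instrument": "LATISS"} (hypothetical values) and no declared dataset
    # identifier, meta gains the tag plus one stringified field for every
    # entry in IDENTIFIER_KEYS:
    #
    #     {"dataset_tag": "Generic", "detector": "", ..., "visit": "1234",
    #      "instrument": "LATISS", ...}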

    def _prepareBundle(
        self,
        bundle: MetricMeasurementBundle,
        run: str,
        datasetType: str,
        timestamp: datetime.datetime | None = None,
        id: UUID | None = None,
        identifierFields: Mapping | None = None,
        datasetIdentifier: str | None = None,
        extraFields: Mapping | None = None,
    ) -> tuple[Mapping[str, list[Any]], bool]:
        """Encode all of the inputs into a format that can be sent to the
        kafka proxy server.

        Parameters
        ----------
        bundle : `MetricMeasurementBundle`
            The bundle containing metric values to upload.
        run : `str`
            The run name to associate with these metric values. If this
            bundle is also stored in the butler, this should be the butler
            run collection in which the bundle is stored.
        datasetType : `str`
            The dataset type name associated with this
            `MetricMeasurementBundle`.
        timestamp : `datetime.datetime`, optional
            The timestamp to be associated with the measurements in the
            ingress database. If this value is `None`, the timestamp will be
            set from the run time or the current time.
        id : `UUID`, optional
            The UUID of the `MetricMeasurementBundle` within the butler. If
            `None`, a new random UUID will be generated so that each record
            in Sasquatch will have a unique value.
        identifierFields : `Mapping`, optional
            The keys and values in this mapping will both be added as fields
            in the record, and used in creating a unique tag for the uploaded
            dataset type. I.e. the timestamp and the tag will be unique, and
            each record will belong to one combination of such.
        datasetIdentifier : `str`, optional
            A string which will be used in creating unique identifier tags.
        extraFields : `Mapping`, optional
            Extra mapping keys and values that will be added as fields to the
            dispatched record.

        Returns
        -------
        result : `Mapping` of `str` to `list`
            A mapping from metric name to a list of metric measurement
            records.
        status : `bool`
            A status boolean indicating if some records had to be skipped due
            to a problem parsing the bundle.
        """
        if id is None:
            id = uuid4()
        sid = str(id)
        meta: dict[str, Any] = dict()

        # Add other associated common fields.
        meta["id"] = sid
        meta["run"] = run
        meta["dataset_type"] = datasetType

        # Check to see if the bundle declares a reference package.
        self._handleReferencePackage(meta, bundle)

        # Handle the various timestamps that could be associated with a
        # record.
        self._handleTimes(meta, bundle, run)

        # Always use the supplied timestamp if one was passed in.
        if timestamp is not None:
            meta["timestamp"] = timestamp.timestamp()

        self._handleIdentifier(meta, identifierFields, datasetIdentifier, bundle)

        # Add in any other fields that were supplied to the function call.
        if extraFields is not None:
            meta.update(extraFields)

        metricRecords: dict[str, list[Any]] = dict()

        # Record if any records needed to be skipped.
        resultsTrimmed = False

        # Look at each of the metrics in the bundle (name, values).
        for metric, measurements in bundle.items():
            # Create a list which will contain the records for each
            # measurement associated with the metric.
            metricRecordList = metricRecords.setdefault(f"{bundle.metricNamePrefix}{metric}", list())

            record: dict[str, Any] = meta.copy()

            # Loop over each metric measurement within the metric.
            for measurement in measurements:
                # Extract any tags, package info, etc.
                note_key = f"{measurement.metric_name.metric}.metric_tags"
                record["tags"] = dict(measurement.notes.items()).get(note_key, list())

                # Missing values are replaced by 0 in sasquatch, see RFC-763.
                name = ""
                value = 0.0
                match measurement.json:
                    case {"metric": name, "value": None}:
                        pass
                    case {"metric": name, "value": value}:
                        if math.isnan(value):
                            log.error(
                                "Measurement %s had a value that is a NaN, dispatch will be skipped",
                                measurement,
                            )
                            resultsTrimmed = True
                            continue
                        # JSON will not serialize np.float32, must cast.
                        if isinstance(value, np.float32):
                            value = float(value)
                    case {"value": _}:
                        log.error("Measurement %s does not contain the key 'metric'", measurement)
                        resultsTrimmed = True
                        continue
                    case {"metric": _}:
                        log.error("Measurement %s does not contain the key 'value'", measurement)
                        resultsTrimmed = True
                        continue
                record[name] = value

            metricRecordList.append({"value": record})
        return metricRecords, resultsTrimmed
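
    # Illustrative sketch: for a bundle entry "demoMetric" containing one
    # measurement whose json is {"metric": "medianValue", "value": 1.5}
    # (hypothetical), the returned mapping has the shape
    #
    #     {"demoMetric": [{"value": {"id": ..., "run": ..., "timestamp": ...,
    #                                "tags": [], "medianValue": 1.5, ...}}]}
    #
    # i.e. one list of Kafka REST proxy record envelopes per (prefixed)
    # metric name, each sharing the common meta fields.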

    def dispatch(
        self,
        bundle: MetricMeasurementBundle,
        run: str,
        datasetType: str,
        timestamp: datetime.datetime | None = None,
        id: UUID | None = None,
        datasetIdentifier: str | None = None,
        identifierFields: Mapping | None = None,
        extraFields: Mapping | None = None,
    ) -> None:
        """Dispatch a `MetricMeasurementBundle` to Sasquatch.

        Parameters
        ----------
        bundle : `MetricMeasurementBundle`
            The bundle containing metric values to upload.
        run : `str`
            The run name to associate with these metric values. If this
            bundle is also stored in the butler, this should be the butler
            run collection in which the bundle is stored. This will be used
            in generating uniqueness constraints in Sasquatch.
        datasetType : `str`
            The dataset type name associated with this
            `MetricMeasurementBundle`.
        timestamp : `datetime.datetime`, optional
            The timestamp to be associated with the measurements in the
            ingress database. If this value is `None`, the timestamp will be
            set from the run time or the current time.
        id : `UUID`, optional
            The UUID of the `MetricMeasurementBundle` within the Butler. If
            `None`, a new random UUID will be generated so that each record
            in Sasquatch will have a unique value.
        datasetIdentifier : `str`, optional
            A string which will be used in creating unique identifier tags.
            If `None`, a default value will be inserted.
        identifierFields : `Mapping`, optional
            The keys and values in this mapping will both be added as fields
            in the record, and used in creating a unique tag for the uploaded
            dataset type. I.e. the timestamp and the tag will be unique, and
            each record will belong to one combination of such. Examples of
            entries would be things like visit or tract.
        extraFields : `Mapping`, optional
            Extra mapping keys and values that will be added as fields to the
            dispatched record.

        Raises
        ------
        SasquatchDispatchPartialFailure, SasquatchDispatchFailure
            Raised if there were any errors in dispatching a bundle.
        """
        if id is None:
            id = uuid4()

        # Prepare the bundle by transforming it to a list of metric records.
        metricRecords, recordsTrimmed = self._prepareBundle(
            bundle=bundle,
            run=run,
            datasetType=datasetType,
            timestamp=timestamp,
            id=id,
            datasetIdentifier=datasetIdentifier,
            identifierFields=identifierFields,
            extraFields=extraFields,
        )

        headers = {"content-type": "application/vnd.kafka.avro.v2+json"}
        data: dict[str, Any] = dict()
        partialUpload = False
        uploadFailed = []

        with http_client() as session:
            for metric, record in metricRecords.items():
                # Create the kafka topic if it does not already exist.
                if not self._create_topic(metric):
                    log.error("Topic not created, skipping dispatch of %s", metric)
                    continue
                recordValue = record[0]["value"]
                # Generate schemas for each record.
                data["value_schema"], schemaTrimmed = self._generateAvroSchema(metric, recordValue)
                data["records"] = record

                if schemaTrimmed:
                    partialUpload = True

                try:
                    r = session.post(
                        f"{self.url}/topics/{self.namespace}.{metric}", json=data, headers=headers
                    )
                    r.raise_for_status()
                    log.debug("Successfully sent data for metric %s", metric)
                    uploadFailed.append(False)
                except requests.HTTPError as e:
                    log.error(
                        "There was a problem submitting the metric %s: %s, %s",
                        metric,
                        e.response.status_code,
                        e.response.reason,
                    )
                    uploadFailed.append(True)
                    partialUpload = True
                except requests.RequestException as e:
                    # Don't log the full stack trace because there may be
                    # lots of these.
                    log.error("There was a problem submitting the metric %s: %s", metric, e)
                    uploadFailed.append(True)
                    partialUpload = True

        # There may be no metrics to upload, and thus the uploadFailed list
        # may be empty; check before issuing a failure.
        if len(uploadFailed) > 0 and all(uploadFailed):
            raise SasquatchDispatchFailure("All records were unable to be uploaded.")

        if partialUpload or recordsTrimmed:
            raise SasquatchDispatchPartialFailure("One or more records may not have been uploaded entirely")

    def dispatchRef(
        self,
        bundle: MetricMeasurementBundle,
        ref: DatasetRef,
        timestamp: datetime.datetime | None = None,
        extraFields: Mapping | None = None,
        datasetIdentifier: str | None = None,
    ) -> None:
        """Dispatch a `MetricMeasurementBundle` to Sasquatch with a known
        `DatasetRef`.

        Parameters
        ----------
        bundle : `MetricMeasurementBundle`
            The bundle containing metric values to upload.
        ref : `DatasetRef`
            The `Butler` dataset ref corresponding to the input
            `MetricMeasurementBundle`.
        timestamp : `datetime.datetime`, optional
            The timestamp to be associated with the measurements in the
            ingress database. If this value is `None`, the timestamp will be
            set from the run time or the current time.
        extraFields : `Mapping`, optional
            Extra mapping keys and values that will be added as fields to the
            dispatched record if not `None`.
        datasetIdentifier : `str`, optional
            A string which will be used in creating unique identifier tags.
            If `None`, a default value will be inserted.

        Raises
        ------
        SasquatchDispatchPartialFailure, SasquatchDispatchFailure
            Raised if there were any errors in dispatching a bundle.
        """
        # Parse the relevant info out of the dataset ref.
        serializedRef = ref.to_simple()
        id = serializedRef.id
        if serializedRef.run is None:
            run = "<unknown>"
        else:
            run = serializedRef.run
        dstype = serializedRef.datasetType
        datasetType = dstype.name if dstype is not None else ""
        dataRefMapping = serializedRef.dataId.dataId if serializedRef.dataId else None

        self.dispatch(
            bundle,
            run=run,
            timestamp=timestamp,
            datasetType=datasetType,
            id=id,
            identifierFields=dataRefMapping,
            extraFields=extraFields,
            datasetIdentifier=datasetIdentifier,
        )
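
# Illustrative usage sketch (hypothetical URL, token, and names; dispatch
# performs real HTTP calls against a Sasquatch Kafka REST proxy):
#
#     dispatcher = SasquatchDispatcher(
#         url="https://sasquatch-rest-proxy.example.org",
#         token="dummy-token",
#         namespace="lsst.dm",
#     )
#     dispatcher.dispatch(
#         bundle,
#         run="u/user/metrics/20240101T000000Z",
#         datasetType="objectTable_metrics",
#         identifierFields={"tract": 1234, "skymap": "lsst_cells_v1"},
#     )
#
# or, with a butler `DatasetRef` for the stored bundle:
#
#     dispatcher.dispatchRef(bundle, ref)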