Coverage for python / lsst / analysis / tools / interfaces / datastore / _sasquatchDatastore.py: 50%
109 statements
« prev ^ index » next coverage.py v7.13.5, created at 2026-04-18 09:19 +0000
1# This file is part of analysis_tools.
2#
3# Developed for the LSST Data Management System.
4# This product includes software developed by the LSST Project
5# (http://www.lsst.org).
6# See the COPYRIGHT file at the top-level directory of this distribution
7# for details of code ownership.
8#
9# This program is free software: you can redistribute it and/or modify
10# it under the terms of the GNU General Public License as published by
11# the Free Software Foundation, either version 3 of the License, or
12# (at your option) any later version.
13#
14# This program is distributed in the hope that it will be useful,
15# but WITHOUT ANY WARRANTY; without even the implied warranty of
16# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
17# GNU General Public License for more details.
18#
19# You should have received a copy of the GNU General Public License
20# along with this program. If not, see <http://www.gnu.org/licenses/>.
22from __future__ import annotations
24__all__ = ("SasquatchDatastore",)
26import logging
27import os
28from collections.abc import Collection, Iterable, Mapping, Sequence
29from typing import TYPE_CHECKING, Any, ClassVar
31from lsst.daf.butler import DatasetRef, DatasetTypeNotSupportedError, StorageClass
32from lsst.daf.butler.datastore import DatasetRefURIs, DatastoreConfig, DatastoreOpaqueTable
33from lsst.daf.butler.datastore.generic_base import GenericBaseDatastore
34from lsst.daf.butler.datastore.record_data import DatastoreRecordData
35from lsst.daf.butler.registry.interfaces import DatastoreRegistryBridge
36from lsst.resources import ResourcePath, ResourcePathExpression
38from . import SasquatchDispatcher, SasquatchDispatchFailure, SasquatchDispatchPartialFailure
40if TYPE_CHECKING:
41 from lsst.daf.butler import Config, DatasetProvenance, DatasetType, LookupKey
42 from lsst.daf.butler.registry.interfaces import DatasetIdRef, DatastoreRegistryBridgeManager
"""Sasquatch datastore"""

# Module-level logger shared by all SasquatchDatastore instances.
log = logging.getLogger(__name__)
class SasquatchDatastore(GenericBaseDatastore):
    """Basic Datastore for writing to an in Sasquatch instance.

    This Datastore is currently write only, meaning that it can dispatch data
    to a Sasquatch instance, but at the present can not be used to retrieve
    values.

    Parameters
    ----------
    config : `DatastoreConfig` or `str`
        Configuration.
    bridgeManager : `DatastoreRegistryBridgeManager`
        Object that manages the interface between `Registry` and datastores.
    butlerRoot : `str`, optional
        Unused parameter.
    """

    defaultConfigFile: ClassVar[str | None] = "sasquatchDatastore.yaml"
    """Path to configuration defaults. Accessed within the ``configs`` resource
    or relative to a search path. Can be None if no defaults specified.
    """

    restProxyUrl: str
    """Url which points to the http rest proxy. This is where datasets will be
    dispatched to.
    """

    accessToken: str
    """Access token which is used to authenticate to the restProxy.
    """

    namespace: str
    """The namespace in Sasquatch where the uploaded metrics will be
    dispatched.

    The namespace can be read from the environment using the
    ``$DAF_BUTLER_SASQUATCH_NAMESPACE`` environment variable. If that is not
    set the datastore config ``"namespace"`` field will be checked. A default
    value of "lsst.dm" is used if no other value can be obtained.
    """

    extra_fields: dict[str, str | int | float] | None
    """Extra key/value pairs that should be passed along with the metric
    when storing in Sasquatch.

    Extra fields can be obtained both from the ``$SASQUATCH_EXTRAS``
    environment variable and the `"extra_fields"` entry in the datastore
    config. The two sources of information are merged with the environment
    variable taking priority. The environment variable must have the form of
    "k1=v1;k2=v2".
    """

    def __init__(
        self,
        config: DatastoreConfig,
        bridgeManager: DatastoreRegistryBridgeManager,
        butlerRoot: str | None = None,
    ):
        super().__init__(config, bridgeManager)

        # Name ourselves either using an explicit name or a name
        # derived from the (unexpanded) root.
        self.name = self.config.get("name", "{}@{}".format(type(self).__name__, self.config["restProxyUrl"]))
        log.debug("Creating datastore %s", self.name)

        self._bridge = bridgeManager.register(self.name, ephemeral=False)

        self.restProxyUrl = self.config["restProxyUrl"]

        self.accessToken = self.config.get("accessToken", "na")

        self.namespace = os.environ.get(
            "DAF_BUTLER_SASQUATCH_NAMESPACE",  # Prioritize the environment
            self.config.get(
                "namespace",  # Fallback to datastore config
                "lsst.dm",
            ),
        )

        # Copy into a fresh dict so merging in environment overrides never
        # mutates the shared config mapping; ``or {}`` also guards against an
        # explicit null ``extra_fields`` entry in the config.
        extra_fields: dict[str, str | int | float] = dict(self.config.get("extra_fields") or {})
        if extras_str := os.environ.get("SASQUATCH_EXTRAS"):
            for item in extras_str.split(";"):
                # Split only on the first "=" so that values may themselves
                # contain "=" characters (e.g. base64-encoded tokens).
                k, v = item.split("=", 1)
                extra_fields[k] = v
        # Normalize an empty mapping to None so downstream callers can use a
        # simple truthiness/None check.
        self.extra_fields = extra_fields if extra_fields else None

        self._dispatcher = SasquatchDispatcher(self.restProxyUrl, self.accessToken, self.namespace)

    @classmethod
    def _create_from_config(
        cls,
        config: DatastoreConfig,
        bridgeManager: DatastoreRegistryBridgeManager,
        butlerRoot: ResourcePathExpression | None,
    ) -> SasquatchDatastore:
        # Docstring inherited from the base class. ``butlerRoot`` is unused.
        return SasquatchDatastore(config, bridgeManager)

    def clone(self, bridgeManager: DatastoreRegistryBridgeManager) -> SasquatchDatastore:
        # Docstring inherited from the base class.
        return SasquatchDatastore(self.config, bridgeManager)

    @property
    def bridge(self) -> DatastoreRegistryBridge:
        # Docstring inherited from the base class.
        return self._bridge

    def put(
        self, inMemoryDataset: Any, ref: DatasetRef, *, provenance: DatasetProvenance | None = None
    ) -> None:
        """Dispatch a dataset to Sasquatch if its ref passes the datastore
        constraints.

        Parameters
        ----------
        inMemoryDataset : `object`
            The dataset to dispatch.
        ref : `DatasetRef`
            Reference describing the dataset.
        provenance : `DatasetProvenance`, optional
            Unused by this datastore.

        Raises
        ------
        DatasetTypeNotSupportedError
            Raised if the dataset type is rejected by the constraints.
        """
        if self.constraints.isAcceptable(ref):
            try:
                self._dispatcher.dispatchRef(inMemoryDataset, ref, extraFields=self.extra_fields)
            except SasquatchDispatchFailure:
                # Dispatch is best effort; a failed upload is logged, not
                # raised, so pipeline execution can continue.
                log.warning("Failed to dispatch metric bundle to Sasquatch.")
            except SasquatchDispatchPartialFailure:
                log.warning("Only some of the metrics were successfully dispatched to Sasquatch.")
        else:
            log.debug("Could not put dataset type %s with Sasquatch datastore", ref.datasetType)
            raise DatasetTypeNotSupportedError(
                f"Could not put dataset type {ref.datasetType} with Sasquatch datastore"
            )

    def put_new(self, in_memory_dataset: Any, dataset_ref: DatasetRef) -> Mapping[str, DatasetRef]:
        # Docstring inherited from the base class.
        self.put(in_memory_dataset, dataset_ref)
        # Sasquatch is a sort of ephemeral, because we do not store its
        # datastore records in registry, so return empty dict.
        return {}

    def export_predicted_records(self, refs: Iterable[DatasetRef]) -> dict[str, DatastoreRecordData]:
        # Docstring inherited from the base class. No records are kept.
        return {}

    def addStoredItemInfo(self, refs: Iterable[DatasetRef], infos: Iterable[Any]) -> None:
        # Stored-item bookkeeping is not supported by this write-only store.
        raise NotImplementedError()

    def getStoredItemsInfo(self, ref: DatasetRef) -> Sequence[Any]:
        # Stored-item bookkeeping is not supported by this write-only store.
        raise NotImplementedError()

    def removeStoredItemInfo(self, ref: DatasetRef) -> None:
        # Stored-item bookkeeping is not supported by this write-only store.
        raise NotImplementedError()

    def trash(self, ref: DatasetRef | Iterable[DatasetRef], ignore_errors: bool = True) -> None:
        # Nothing can be deleted from Sasquatch through this interface.
        log.debug("Sasquatch datastore does not support trashing skipping %s", ref)
        raise FileNotFoundError()

    def emptyTrash(
        self, ignore_errors: bool = True, refs: Collection[DatasetRef] | None = None, dry_run: bool = False
    ) -> set[ResourcePath]:
        # Docstring inherited from the base class.
        log.debug("Sasquatch datastore does not support trash, nothing to empty")
        return set()

    def forget(self, ref: Iterable[DatasetRef]) -> None:
        # No registry records are kept, so there is nothing to forget.
        pass

    def exists(self, datasetRef: DatasetRef) -> bool:
        # sasquatch is not currently searchable
        return False

    def knows(self, ref: DatasetRef) -> bool:
        # Docstring inherited from the base class; nothing is tracked.
        return False

    def get(
        self,
        datasetRef: DatasetRef,
        parameters: Mapping[str, Any] | None = None,
        storageClass: StorageClass | str | None = None,
    ) -> Any:
        # This datastore is write-only; retrieval is never possible.
        raise FileNotFoundError()

    def validateConfiguration(
        self, entities: Iterable[DatasetRef | DatasetType | StorageClass], logFailures: bool = False
    ) -> None:
        """Validate some of the configuration for this datastore.

        Parameters
        ----------
        entities : iterable of `DatasetRef`, `DatasetType`, or `StorageClass`
            Entities to test against this configuration. Can be differing
            types.
        logFailures : `bool`, optional
            If `True`, output a log message for every validation error
            detected.

        Raises
        ------
        DatastoreValidationError
            Raised if there is a validation problem with a configuration.
            All the problems are reported in a single exception.

        Notes
        -----
        This method is a no-op.
        """
        return

    def validateKey(self, lookupKey: LookupKey, entity: DatasetRef | DatasetType | StorageClass) -> None:
        # Docstring is inherited from base class.
        return

    def getLookupKeys(self) -> set[LookupKey]:
        # Docstring is inherited from base class.
        return self.constraints.getLookupKeys()

    def needs_expanded_data_ids(
        self,
        transfer: str | None,
        entity: DatasetRef | DatasetType | StorageClass | None = None,
    ) -> bool:
        # Docstring inherited.
        return False

    def import_records(self, data: Mapping[str, DatastoreRecordData]) -> None:
        # Docstring inherited from the base class.
        return

    def export_records(self, refs: Iterable[DatasetIdRef]) -> Mapping[str, DatastoreRecordData]:
        # Docstring inherited from the base class.

        # Sasquatch Datastore records cannot be exported or imported.
        return {}

    def getURI(self, datasetRef: DatasetRef, predict: bool = False) -> ResourcePath:
        # Stored datasets have no retrievable URI.
        raise NotImplementedError()

    def getURIs(self, datasetRef: DatasetRef, predict: bool = False) -> DatasetRefURIs:
        # Stored datasets have no retrievable URIs.
        raise NotImplementedError()

    def ingest_zip(self, zip_path: ResourcePath, transfer: str | None) -> None:
        # Ingest of external artifacts is not supported.
        raise NotImplementedError()

    def retrieveArtifacts(
        self,
        refs: Iterable[DatasetRef],
        destination: ResourcePath,
        transfer: str = "auto",
        preserve_path: bool = True,
        overwrite: bool = False,
    ) -> list[ResourcePath]:
        # No artifacts exist to retrieve from a write-only store.
        raise NotImplementedError()

    @classmethod
    def setConfigRoot(cls, root: str, config: Config, full: Config, overwrite: bool = True) -> None:
        # Docstring inherited from the base class; no root-dependent config.
        pass

    def get_opaque_table_definitions(self) -> Mapping[str, DatastoreOpaqueTable]:
        # Docstring inherited from the base class.
        return {}