Coverage for python / lsst / analysis / tools / interfaces / datastore / _sasquatchDatastore.py: 50%

109 statements  

« prev     ^ index     » next       coverage.py v7.13.5, created at 2026-04-30 09:27 +0000

1# This file is part of analysis_tools. 

2# 

3# Developed for the LSST Data Management System. 

4# This product includes software developed by the LSST Project 

5# (http://www.lsst.org). 

6# See the COPYRIGHT file at the top-level directory of this distribution 

7# for details of code ownership. 

8# 

9# This program is free software: you can redistribute it and/or modify 

10# it under the terms of the GNU General Public License as published by 

11# the Free Software Foundation, either version 3 of the License, or 

12# (at your option) any later version. 

13# 

14# This program is distributed in the hope that it will be useful, 

15# but WITHOUT ANY WARRANTY; without even the implied warranty of 

16# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the 

17# GNU General Public License for more details. 

18# 

19# You should have received a copy of the GNU General Public License 

20# along with this program. If not, see <http://www.gnu.org/licenses/>. 

21 

from __future__ import annotations

__all__ = ("SasquatchDatastore",)

import logging
import os
from collections.abc import Collection, Iterable, Mapping, Sequence
from typing import TYPE_CHECKING, Any, ClassVar

from lsst.daf.butler import DatasetRef, DatasetTypeNotSupportedError, StorageClass
from lsst.daf.butler.datastore import DatasetRefURIs, DatastoreConfig, DatastoreOpaqueTable
from lsst.daf.butler.datastore.generic_base import GenericBaseDatastore
from lsst.daf.butler.datastore.record_data import DatastoreRecordData
from lsst.daf.butler.registry.interfaces import DatastoreRegistryBridge
from lsst.resources import ResourcePath, ResourcePathExpression

from . import SasquatchDispatcher, SasquatchDispatchFailure, SasquatchDispatchPartialFailure

if TYPE_CHECKING:
    # Imported only for annotations to avoid import cycles at runtime.
    from lsst.daf.butler import Config, DatasetProvenance, DatasetType, LookupKey
    from lsst.daf.butler.registry.interfaces import DatasetIdRef, DatastoreRegistryBridgeManager

# NOTE(review): the string below is NOT a module docstring — it appears after
# the imports, so it is an inert expression statement with no runtime effect.
# Left in place to avoid a code change; consider moving it to the top of the
# file (above ``from __future__``) to make it a real docstring.
"""Sasquatch datastore"""

# Module-level logger following the stdlib convention of one logger per module.
log = logging.getLogger(__name__)

47 

48 

class SasquatchDatastore(GenericBaseDatastore):
    """Basic Datastore for writing to a Sasquatch instance.

    This Datastore is currently write only, meaning that it can dispatch data
    to a Sasquatch instance, but at the present can not be used to retrieve
    values.

    Parameters
    ----------
    config : `DatastoreConfig` or `str`
        Configuration.
    bridgeManager : `DatastoreRegistryBridgeManager`
        Object that manages the interface between `Registry` and datastores.
    butlerRoot : `str`, optional
        Unused parameter.
    """

    defaultConfigFile: ClassVar[str | None] = "sasquatchDatastore.yaml"
    """Path to configuration defaults. Accessed within the ``configs`` resource
    or relative to a search path. Can be None if no defaults specified.
    """

    restProxyUrl: str
    """Url which points to the http rest proxy. This is where datasets will be
    dispatched to.
    """

    accessToken: str
    """Access token which is used to authenticate to the restProxy.
    """

    namespace: str
    """The namespace in Sasquatch where the uploaded metrics will be
    dispatched.

    The namespace can be read from the environment using the
    ``$DAF_BUTLER_SASQUATCH_NAMESPACE`` environment variable. If that is not
    set the datastore config ``"namespace"`` field will be checked. A default
    value of "lsst.dm" is used if no other value can be obtained.
    """

    extra_fields: dict[str, str | int | float] | None
    """Extra key/value pairs that should be passed along with the metric
    when storing in Sasquatch.

    Extra fields can be obtained both from the ``$SASQUATCH_EXTRAS``
    environment variable and the ``"extra_fields"`` entry in the datastore
    config. The two sources of information are merged with the environment
    variable taking priority. The environment variable must have the form of
    "k1=v1;k2=v2".
    """

    def __init__(
        self,
        config: DatastoreConfig,
        bridgeManager: DatastoreRegistryBridgeManager,
        butlerRoot: str | None = None,
    ):
        super().__init__(config, bridgeManager)

        # Name ourselves either using an explicit name or a name
        # derived from the (unexpanded) root.
        self.name = self.config.get("name", "{}@{}".format(type(self).__name__, self.config["restProxyUrl"]))
        log.debug("Creating datastore %s", self.name)

        self._bridge = bridgeManager.register(self.name, ephemeral=False)

        self.restProxyUrl = self.config["restProxyUrl"]

        self.accessToken = self.config.get("accessToken", "na")

        self.namespace = os.environ.get(
            "DAF_BUTLER_SASQUATCH_NAMESPACE",  # Prioritize the environment
            self.config.get(
                "namespace",  # Fallback to datastore config
                "lsst.dm",
            ),
        )

        # ``or {}`` guards against an explicit ``extra_fields: null`` in the
        # config; ``dict(...)`` copies so the config's own mapping is never
        # mutated in place when environment extras are merged on top.
        extra_fields: dict[str, str | int | float] = dict(self.config.get("extra_fields") or {})
        if extras_str := os.environ.get("SASQUATCH_EXTRAS"):
            for item in extras_str.split(";"):
                # Split on the first "=" only, so that values themselves may
                # contain "=" characters (e.g. base64 tokens or URL queries).
                k, v = item.split("=", 1)
                extra_fields[k] = v
        self.extra_fields = extra_fields or None

        self._dispatcher = SasquatchDispatcher(self.restProxyUrl, self.accessToken, self.namespace)

    @classmethod
    def _create_from_config(
        cls,
        config: DatastoreConfig,
        bridgeManager: DatastoreRegistryBridgeManager,
        butlerRoot: ResourcePathExpression | None,
    ) -> SasquatchDatastore:
        # Docstring inherited from the base class. ``butlerRoot`` is unused.
        return SasquatchDatastore(config, bridgeManager)

    def clone(self, bridgeManager: DatastoreRegistryBridgeManager) -> SasquatchDatastore:
        # Docstring inherited from the base class.
        return SasquatchDatastore(self.config, bridgeManager)

    @property
    def bridge(self) -> DatastoreRegistryBridge:
        # Docstring inherited from the base class.
        return self._bridge

    def put(
        self, inMemoryDataset: Any, ref: DatasetRef, *, provenance: DatasetProvenance | None = None
    ) -> None:
        """Dispatch a dataset to Sasquatch.

        Parameters
        ----------
        inMemoryDataset : `object`
            The dataset to be dispatched.
        ref : `DatasetRef`
            Reference describing the dataset.
        provenance : `DatasetProvenance`, optional
            Provenance information; currently unused by this datastore.

        Raises
        ------
        DatasetTypeNotSupportedError
            Raised if this datastore's constraints reject the dataset type.

        Notes
        -----
        Dispatch failures are logged as warnings rather than raised, so a
        Sasquatch outage does not fail the calling pipeline.
        """
        if self.constraints.isAcceptable(ref):
            try:
                self._dispatcher.dispatchRef(inMemoryDataset, ref, extraFields=self.extra_fields)
            except SasquatchDispatchFailure:
                log.warning("Failed to dispatch metric bundle to Sasquatch.")
            except SasquatchDispatchPartialFailure:
                log.warning("Only some of the metrics were successfully dispatched to Sasquatch.")
        else:
            log.debug("Could not put dataset type %s with Sasquatch datastore", ref.datasetType)
            raise DatasetTypeNotSupportedError(
                f"Could not put dataset type {ref.datasetType} with Sasquatch datastore"
            )

    def put_new(self, in_memory_dataset: Any, dataset_ref: DatasetRef) -> Mapping[str, DatasetRef]:
        # Docstring inherited from the base class.
        self.put(in_memory_dataset, dataset_ref)
        # Sasquatch is a sort of ephemeral, because we do not store its
        # datastore records in registry, so return empty dict.
        return {}

    def export_predicted_records(self, refs: Iterable[DatasetRef]) -> dict[str, DatastoreRecordData]:
        # No datastore records are kept, so there is nothing to predict.
        return {}

    def addStoredItemInfo(self, refs: Iterable[DatasetRef], infos: Iterable[Any]) -> None:
        # This datastore keeps no per-dataset records.
        raise NotImplementedError()

    def getStoredItemsInfo(self, ref: DatasetRef) -> Sequence[Any]:
        # This datastore keeps no per-dataset records.
        raise NotImplementedError()

    def removeStoredItemInfo(self, ref: DatasetRef) -> None:
        # This datastore keeps no per-dataset records.
        raise NotImplementedError()

    def trash(self, ref: DatasetRef | Iterable[DatasetRef], ignore_errors: bool = True) -> None:
        # Dispatched metrics cannot be recalled, so trashing is unsupported.
        log.debug("Sasquatch datastore does not support trashing skipping %s", ref)
        raise FileNotFoundError()

    def emptyTrash(
        self, ignore_errors: bool = True, refs: Collection[DatasetRef] | None = None, dry_run: bool = False
    ) -> set[ResourcePath]:
        log.debug("Sasquatch datastore does not support trash, nothing to empty")
        return set()

    def forget(self, ref: Iterable[DatasetRef]) -> None:
        # Nothing is recorded, so there is nothing to forget.
        pass

    def exists(self, datasetRef: DatasetRef) -> bool:
        # sasquatch is not currently searchable
        return False

    def knows(self, ref: DatasetRef) -> bool:
        # Write-only: nothing dispatched can be looked up again.
        return False

    def get(
        self,
        datasetRef: DatasetRef,
        parameters: Mapping[str, Any] | None = None,
        storageClass: StorageClass | str | None = None,
    ) -> Any:
        # Retrieval is not supported by this write-only datastore.
        raise FileNotFoundError()

    def validateConfiguration(
        self, entities: Iterable[DatasetRef | DatasetType | StorageClass], logFailures: bool = False
    ) -> None:
        """Validate some of the configuration for this datastore.

        Parameters
        ----------
        entities : iterable of `DatasetRef`, `DatasetType`, or `StorageClass`
            Entities to test against this configuration. Can be differing
            types.
        logFailures : `bool`, optional
            If `True`, output a log message for every validation error
            detected.

        Raises
        ------
        DatastoreValidationError
            Raised if there is a validation problem with a configuration.
            All the problems are reported in a single exception.

        Notes
        -----
        This method is a no-op.
        """
        return

    def validateKey(self, lookupKey: LookupKey, entity: DatasetRef | DatasetType | StorageClass) -> None:
        # Docstring is inherited from base class.
        return

    def getLookupKeys(self) -> set[LookupKey]:
        # Docstring is inherited from base class.
        return self.constraints.getLookupKeys()

    def needs_expanded_data_ids(
        self,
        transfer: str | None,
        entity: DatasetRef | DatasetType | StorageClass | None = None,
    ) -> bool:
        # Docstring inherited.
        return False

    def import_records(self, data: Mapping[str, DatastoreRecordData]) -> None:
        # Docstring inherited from the base class.
        return

    def export_records(self, refs: Iterable[DatasetIdRef]) -> Mapping[str, DatastoreRecordData]:
        # Docstring inherited from the base class.

        # Sasquatch Datastore records cannot be exported or imported.
        return {}

    def getURI(self, datasetRef: DatasetRef, predict: bool = False) -> ResourcePath:
        # Dispatched datasets have no retrievable URI.
        raise NotImplementedError()

    def getURIs(self, datasetRef: DatasetRef, predict: bool = False) -> DatasetRefURIs:
        # Dispatched datasets have no retrievable URIs.
        raise NotImplementedError()

    def ingest_zip(self, zip_path: ResourcePath, transfer: str | None) -> None:
        # Ingest of external artifacts is not supported.
        raise NotImplementedError()

    def retrieveArtifacts(
        self,
        refs: Iterable[DatasetRef],
        destination: ResourcePath,
        transfer: str = "auto",
        preserve_path: bool = True,
        overwrite: bool = False,
    ) -> list[ResourcePath]:
        # Retrieval is not supported by this write-only datastore.
        raise NotImplementedError()

    @classmethod
    def setConfigRoot(cls, root: str, config: Config, full: Config, overwrite: bool = True) -> None:
        # This datastore has no filesystem root to configure.
        pass

    def get_opaque_table_definitions(self) -> Mapping[str, DatastoreOpaqueTable]:
        # Docstring inherited from the base class.
        return {}