Coverage for python/lsst/analysis/tools/interfaces/datastore/_sasquatchDatastore.py: 52%

104 statements  

« prev     ^ index     » next       coverage.py v7.13.5, created at 2026-04-22 09:09 +0000

1# This file is part of analysis_tools. 

2# 

3# Developed for the LSST Data Management System. 

4# This product includes software developed by the LSST Project 

5# (http://www.lsst.org). 

6# See the COPYRIGHT file at the top-level directory of this distribution 

7# for details of code ownership. 

8# 

9# This program is free software: you can redistribute it and/or modify 

10# it under the terms of the GNU General Public License as published by 

11# the Free Software Foundation, either version 3 of the License, or 

12# (at your option) any later version. 

13# 

14# This program is distributed in the hope that it will be useful, 

15# but WITHOUT ANY WARRANTY; without even the implied warranty of 

16# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the 

17# GNU General Public License for more details. 

18# 

19# You should have received a copy of the GNU General Public License 

20# along with this program. If not, see <http://www.gnu.org/licenses/>. 

21 

22from __future__ import annotations 

23 

24__all__ = ("SasquatchDatastore",) 

25 

26"""Sasquatch datastore""" 

27import logging 

28import os 

29from collections.abc import Collection, Iterable, Mapping, Sequence 

30from typing import TYPE_CHECKING, Any, ClassVar 

31 

32from lsst.daf.butler import DatasetRef, DatasetTypeNotSupportedError, StorageClass 

33from lsst.daf.butler.datastore import DatasetRefURIs, DatastoreConfig, DatastoreOpaqueTable 

34from lsst.daf.butler.datastore.generic_base import GenericBaseDatastore 

35from lsst.daf.butler.datastore.record_data import DatastoreRecordData 

36from lsst.daf.butler.registry.interfaces import DatastoreRegistryBridge 

37from lsst.resources import ResourcePath, ResourcePathExpression 

38 

39from . import SasquatchDispatcher 

40 

41if TYPE_CHECKING: 

42 from lsst.daf.butler import Config, DatasetProvenance, DatasetType, LookupKey 

43 from lsst.daf.butler.registry.interfaces import DatasetIdRef, DatastoreRegistryBridgeManager 

44 

45 

46log = logging.getLogger(__name__) 

47 

48 

class SasquatchDatastore(GenericBaseDatastore):
    """Basic Datastore for writing to an in Sasquatch instance.

    This Datastore is currently write only, meaning that it can dispatch data
    to a Sasquatch instance, but at the present can not be used to retrieve
    values.

    Parameters
    ----------
    config : `DatastoreConfig` or `str`
        Configuration.
    bridgeManager : `DatastoreRegistryBridgeManager`
        Object that manages the interface between `Registry` and datastores.
    butlerRoot : `str`, optional
        Unused parameter.
    """

    defaultConfigFile: ClassVar[str | None] = "sasquatchDatastore.yaml"
    """Path to configuration defaults. Accessed within the ``configs`` resource
    or relative to a search path. Can be None if no defaults specified.
    """

    restProxyUrl: str
    """Url which points to the http rest proxy. This is where datasets will be
    dispatched to.
    """

    accessToken: str
    """Access token which is used to authenticate to the restProxy.
    """

    namespace: str
    """The namespace in Sasquatch where the uploaded metrics will be
    dispatched.

    The namespace can be read from the environment using the
    ``$DAF_BUTLER_SASQUATCH_NAMESPACE`` environment variable. If that is not
    set the datastore config ``"namespace"`` field will be checked. A default
    value of "lsst.dm" is used if no other value can be obtained.
    """

    extra_fields: dict[str, str | int | float] | None
    """Extra key/value pairs that should be passed along with the metric
    when storing in Sasquatch.

    Extra fields can be obtained both from the ``$SASQUATCH_EXTRAS``
    environment variable and the `"extra_fields"` entry in the datastore
    config. The two sources of information are merged with the environment
    variable taking priority. The environment variable must have the form of
    "k1=v1;k2=v2".
    """

    def __init__(
        self,
        config: DatastoreConfig,
        bridgeManager: DatastoreRegistryBridgeManager,
        butlerRoot: str | None = None,
    ):
        super().__init__(config, bridgeManager)

        # Name ourselves either using an explicit name or a name
        # derived from the (unexpanded) root.
        self.name = self.config.get("name", "{}@{}".format(type(self).__name__, self.config["restProxyUrl"]))
        log.debug("Creating datastore %s", self.name)

        self._bridge = bridgeManager.register(self.name, ephemeral=False)

        self.restProxyUrl = self.config["restProxyUrl"]

        self.accessToken = self.config.get("accessToken", "na")

        self.namespace = os.environ.get(
            "DAF_BUTLER_SASQUATCH_NAMESPACE",  # Prioritize the environment
            self.config.get(
                "namespace",  # Fallback to datastore config
                "lsst.dm",
            ),
        )

        # Coerce an explicit null config entry to an empty dict so the
        # environment merge below cannot fail with a TypeError.
        extra_fields: dict[str, str | int | float] = self.config.get("extra_fields", {}) or {}
        if extras_str := os.environ.get("SASQUATCH_EXTRAS"):
            for item in extras_str.split(";"):
                # Split only on the first "=" so values may themselves
                # contain "=" characters (e.g. base64-encoded strings).
                k, v = item.split("=", 1)
                extra_fields[k] = v
        self.extra_fields = extra_fields if extra_fields else None

        self._dispatcher = SasquatchDispatcher(self.restProxyUrl, self.accessToken, self.namespace)

    @classmethod
    def _create_from_config(
        cls,
        config: DatastoreConfig,
        bridgeManager: DatastoreRegistryBridgeManager,
        butlerRoot: ResourcePathExpression | None,
    ) -> SasquatchDatastore:
        # Docstring inherited from the base class; butlerRoot is unused.
        return SasquatchDatastore(config, bridgeManager)

    def clone(self, bridgeManager: DatastoreRegistryBridgeManager) -> SasquatchDatastore:
        """Return a copy of this datastore bound to a new bridge manager."""
        return SasquatchDatastore(self.config, bridgeManager)

    @property
    def bridge(self) -> DatastoreRegistryBridge:
        # Docstring inherited from the base class.
        return self._bridge

    def put(
        self, inMemoryDataset: Any, ref: DatasetRef, *, provenance: DatasetProvenance | None = None
    ) -> None:
        """Dispatch a dataset to Sasquatch.

        Raises
        ------
        DatasetTypeNotSupportedError
            Raised if the dataset type is rejected by this datastore's
            constraints.
        """
        if self.constraints.isAcceptable(ref):
            self._dispatcher.dispatchRef(inMemoryDataset, ref, extraFields=self.extra_fields)
        else:
            log.debug("Could not put dataset type %s with Sasquatch datastore", ref.datasetType)
            raise DatasetTypeNotSupportedError(
                f"Could not put dataset type {ref.datasetType} with Sasquatch datastore"
            )

    def put_new(self, in_memory_dataset: Any, dataset_ref: DatasetRef) -> Mapping[str, DatasetRef]:
        # Docstring inherited from the base class.
        self.put(in_memory_dataset, dataset_ref)
        # Sasquatch is a sort of ephemeral, because we do not store its
        # datastore records in registry, so return empty dict.
        return {}

    def export_predicted_records(self, refs: Iterable[DatasetRef]) -> dict[str, DatastoreRecordData]:
        # No datastore records are kept, so there is nothing to predict.
        return {}

    def addStoredItemInfo(self, refs: Iterable[DatasetRef], infos: Iterable[Any]) -> None:
        # Stored-item records are not supported by this datastore.
        raise NotImplementedError()

    def getStoredItemsInfo(self, ref: DatasetRef) -> Sequence[Any]:
        # Stored-item records are not supported by this datastore.
        raise NotImplementedError()

    def removeStoredItemInfo(self, ref: DatasetRef) -> None:
        # Stored-item records are not supported by this datastore.
        raise NotImplementedError()

    def trash(self, ref: DatasetRef | Iterable[DatasetRef], ignore_errors: bool = True) -> None:
        # Datasets dispatched to Sasquatch cannot be deleted afterwards.
        log.debug("Sasquatch datastore does not support trashing skipping %s", ref)
        raise FileNotFoundError()

    def emptyTrash(
        self, ignore_errors: bool = True, refs: Collection[DatasetRef] | None = None, dry_run: bool = False
    ) -> set[ResourcePath]:
        # Nothing is ever trashed (see `trash`), so there is nothing to empty.
        log.debug("Sasquatch datastore does not support trash, nothing to empty")
        return set()

    def forget(self, ref: Iterable[DatasetRef]) -> None:
        # No records are kept, so forgetting is a no-op.
        pass

    def exists(self, datasetRef: DatasetRef) -> bool:
        # sasquatch is not currently searchable
        return False

    def knows(self, ref: DatasetRef) -> bool:
        # This datastore keeps no records, so it never "knows" a dataset.
        return False

    def get(
        self,
        datasetRef: DatasetRef,
        parameters: Mapping[str, Any] | None = None,
        storageClass: StorageClass | str | None = None,
    ) -> Any:
        # This datastore is write-only; retrieval always fails.
        raise FileNotFoundError()

    def validateConfiguration(
        self, entities: Iterable[DatasetRef | DatasetType | StorageClass], logFailures: bool = False
    ) -> None:
        """Validate some of the configuration for this datastore.

        Parameters
        ----------
        entities : iterable of `DatasetRef`, `DatasetType`, or `StorageClass`
            Entities to test against this configuration. Can be differing
            types.
        logFailures : `bool`, optional
            If `True`, output a log message for every validation error
            detected.

        Raises
        ------
        DatastoreValidationError
            Raised if there is a validation problem with a configuration.
            All the problems are reported in a single exception.

        Notes
        -----
        This method is a no-op.
        """
        return

    def validateKey(self, lookupKey: LookupKey, entity: DatasetRef | DatasetType | StorageClass) -> None:
        # Docstring is inherited from base class.
        return

    def getLookupKeys(self) -> set[LookupKey]:
        # Docstring is inherited from base class.
        return self.constraints.getLookupKeys()

    def needs_expanded_data_ids(
        self,
        transfer: str | None,
        entity: DatasetRef | DatasetType | StorageClass | None = None,
    ) -> bool:
        # Docstring inherited.
        return False

    def import_records(self, data: Mapping[str, DatastoreRecordData]) -> None:
        # Docstring inherited from the base class.
        return

    def export_records(self, refs: Iterable[DatasetIdRef]) -> Mapping[str, DatastoreRecordData]:
        # Docstring inherited from the base class.

        # Sasquatch Datastore records cannot be exported or imported.
        return {}

    def getURI(self, datasetRef: DatasetRef, predict: bool = False) -> ResourcePath:
        # Dispatched datasets have no addressable URI.
        raise NotImplementedError()

    def getURIs(self, datasetRef: DatasetRef, predict: bool = False) -> DatasetRefURIs:
        # Dispatched datasets have no addressable URIs.
        raise NotImplementedError()

    def ingest_zip(self, zip_path: ResourcePath, transfer: str | None) -> None:
        # Ingest is not supported by a write-only dispatch datastore.
        raise NotImplementedError()

    def retrieveArtifacts(
        self,
        refs: Iterable[DatasetRef],
        destination: ResourcePath,
        transfer: str = "auto",
        preserve_path: bool = True,
        overwrite: bool = False,
    ) -> list[ResourcePath]:
        # There are no file artifacts to retrieve.
        raise NotImplementedError()

    @classmethod
    def setConfigRoot(cls, root: str, config: Config, full: Config, overwrite: bool = True) -> None:
        # This datastore has no filesystem root to configure.
        pass

    def get_opaque_table_definitions(self) -> Mapping[str, DatastoreOpaqueTable]:
        # Docstring inherited from the base class.
        return {}