Coverage for python/lsst/daf/butler/remote_butler/_registry.py: 0%

125 statements  

coverage.py v7.13.5, created at 2026-05-01 08:17 +0000

# This file is part of daf_butler.
#
# Developed for the LSST Data Management System.
# This product includes software developed by the LSST Project
# (http://www.lsst.org).
# See the COPYRIGHT file at the top-level directory of this distribution
# for details of code ownership.
#
# This software is dual licensed under the GNU General Public License and also
# under a 3-clause BSD license. Recipients may choose which of these licenses
# to use; please see the files gpl-3.0.txt and/or bsd_license.txt,
# respectively. If you choose the GPL option then the following text applies
# (but note that there is still no warranty even if you opt for BSD instead):
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.

from __future__ import annotations

import contextlib
from collections.abc import Iterable, Iterator, Mapping, Sequence
from typing import Any

from lsst.daf.butler import Butler
from lsst.utils.iteration import ensure_iterable

from .._collection_type import CollectionType
from .._dataset_ref import DatasetId, DatasetIdGenEnum, DatasetRef
from .._dataset_type import DatasetType
from .._storage_class import StorageClassFactory
from .._timespan import Timespan
from ..dimensions import (
    DataCoordinate,
    DataId,
    DimensionElement,
    DimensionGroup,
    DimensionRecord,
    DimensionUniverse,
)
from ..registry import (
    CollectionArgType,
    CollectionSummary,
    CollectionTypeError,
    DatasetTypeError,
    RegistryDefaults,
)
from ..registry._registry_base import RegistryBase
from ._collection_args import (
    convert_collection_arg_to_glob_string_list,
    convert_dataset_type_arg_to_glob_string_list,
)
from ._defaults import DefaultsHolder
from ._http_connection import RemoteButlerHttpConnection, parse_model
from .server_models import (
    ExpandDataIdRequestModel,
    ExpandDataIdResponseModel,
    GetCollectionSummaryResponseModel,
    QueryDatasetTypesRequestModel,
    QueryDatasetTypesResponseModel,
)


class RemoteButlerRegistry(RegistryBase):
    """Implementation of the `Registry` interface for `RemoteButler`.

    Parameters
    ----------
    butler : `Butler`
        Butler instance to which this registry delegates operations.
    defaults : `DefaultsHolder`
        Reference to object containing default collections and data ID.
    connection : `RemoteButlerHttpConnection`
        HTTP connection to Butler server for looking up data.
    """

    def __init__(self, butler: Butler, defaults: DefaultsHolder, connection: RemoteButlerHttpConnection):
        super().__init__(butler)
        self._connection = connection
        self._defaults = defaults

    def isWriteable(self) -> bool:
        return self._butler.isWriteable()

    @property
    def dimensions(self) -> DimensionUniverse:
        return self._butler.dimensions

    @property
    def defaults(self) -> RegistryDefaults:
        return self._defaults.get()

    @defaults.setter
    def defaults(self, value: RegistryDefaults) -> None:
        value.finish(self)
        self._defaults.set(value)

    def refresh(self) -> None:
        # In theory the server should manage all necessary invalidation of
        # state.
        pass

    def refresh_collection_summaries(self) -> None:
        # Docstring inherited from a base class.
        raise NotImplementedError()

    @contextlib.contextmanager
    def caching_context(self) -> Iterator[None]:
        yield

    @contextlib.contextmanager
    def transaction(self, *, savepoint: bool = False) -> Iterator[None]:
        # RemoteButler will never support transactions.
        raise NotImplementedError()

    def registerCollection(
        self, name: str, type: CollectionType = CollectionType.TAGGED, doc: str | None = None
    ) -> bool:
        raise NotImplementedError()

    def getCollectionType(self, name: str) -> CollectionType:
        return self._butler.collections.get_info(name).type

    def registerRun(self, name: str, doc: str | None = None) -> bool:
        raise NotImplementedError()

    def removeCollection(self, name: str) -> None:
        raise NotImplementedError()

    def getCollectionChain(self, parent: str) -> Sequence[str]:
        info = self._butler.collections.get_info(parent)
        if info.type is not CollectionType.CHAINED:
            raise CollectionTypeError(f"Collection '{parent}' has type {info.type.name}, not CHAINED.")
        return info.children

    def setCollectionChain(self, parent: str, children: Any, *, flatten: bool = False) -> None:
        raise NotImplementedError()

    def getCollectionParentChains(self, collection: str) -> set[str]:
        info = self._butler.collections.get_info(collection, include_parents=True)
        assert info.parents is not None, "Requested list of parents from server, but it did not send them."
        return set(info.parents)

    def getCollectionDocumentation(self, collection: str) -> str | None:
        doc = self._butler.collections.get_info(collection).doc
        if not doc:
            return None
        return doc

    def setCollectionDocumentation(self, collection: str, doc: str | None) -> None:
        raise NotImplementedError()

    def getCollectionSummary(self, collection: str) -> CollectionSummary:
        response = self._connection.get("collection_summary", {"name": collection})
        parsed = parse_model(response, GetCollectionSummaryResponseModel)
        return CollectionSummary.from_simple(parsed.summary, self.dimensions)

    def registerDatasetType(self, datasetType: DatasetType) -> bool:
        raise NotImplementedError()

    def removeDatasetType(self, name: str | tuple[str, ...]) -> None:
        raise NotImplementedError()

    def getDatasetType(self, name: str) -> DatasetType:
        return self._butler.get_dataset_type(name)

    def supportsIdGenerationMode(self, mode: DatasetIdGenEnum) -> bool:
        raise NotImplementedError()

    def findDataset(
        self,
        datasetType: DatasetType | str,
        dataId: DataId | None = None,
        *,
        collections: CollectionArgType | None = None,
        timespan: Timespan | None = None,
        datastore_records: bool = False,
        **kwargs: Any,
    ) -> DatasetRef | None:
        # Components are supported by Butler.find_dataset, but are required to
        # raise an exception in registry.findDataset (see
        # RegistryTests.testComponentLookups). Apparently the registry version
        # used to support components, but at some point a decision was made to
        # draw a hard architectural boundary between registry and datastore so
        # that registry is no longer allowed to know about components.
        if _is_component_dataset_type(datasetType):
            raise DatasetTypeError(
                "Component dataset types are not supported;"
                " use DatasetRef or DatasetType methods to obtain components from parents instead."
            )

        if collections is not None:
            collections = convert_collection_arg_to_glob_string_list(collections)

        return self._butler.find_dataset(
            datasetType,
            dataId,
            collections=collections,
            timespan=timespan,
            datastore_records=datastore_records,
            **kwargs,
        )
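
    # Illustrative note (not part of the original source): component dataset
    # type names take the form "<parent>.<component>", so, assuming a parent
    # dataset type named "calexp" with a "wcs" component (hypothetical names),
    # a lookup such as
    #
    #     registry.findDataset("calexp.wcs", ...)
    #
    # raises DatasetTypeError here, even though the underlying
    # Butler.find_dataset would accept it.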

    def insertDatasets(
        self,
        datasetType: DatasetType | str,
        dataIds: Iterable[DataId],
        run: str | None = None,
        expand: bool = True,
        idGenerationMode: DatasetIdGenEnum = DatasetIdGenEnum.UNIQUE,
    ) -> list[DatasetRef]:
        raise NotImplementedError()

    def _importDatasets(
        self,
        datasets: Iterable[DatasetRef],
        expand: bool = True,
        assume_new: bool = False,
    ) -> list[DatasetRef]:
        raise NotImplementedError()

    def getDataset(self, id: DatasetId) -> DatasetRef | None:
        return self._butler.get_dataset(id)

    def _fetch_run_dataset_ids(self, run: str) -> list[DatasetId]:
        raise NotImplementedError()

    def removeDatasets(self, refs: Iterable[DatasetRef]) -> None:
        raise NotImplementedError()

    def associate(self, collection: str, refs: Iterable[DatasetRef]) -> None:
        raise NotImplementedError()

    def disassociate(self, collection: str, refs: Iterable[DatasetRef]) -> None:
        raise NotImplementedError()

    def certify(self, collection: str, refs: Iterable[DatasetRef], timespan: Timespan) -> None:
        raise NotImplementedError()

    def decertify(
        self,
        collection: str,
        datasetType: str | DatasetType,
        timespan: Timespan,
        *,
        dataIds: Iterable[DataId] | None = None,
    ) -> None:
        raise NotImplementedError()

    def getDatasetLocations(self, ref: DatasetRef) -> Iterable[str]:
        raise NotImplementedError()

    def expandDataId(
        self,
        dataId: DataId | None = None,
        *,
        dimensions: Iterable[str] | DimensionGroup | None = None,
        records: Mapping[str, DimensionRecord | None] | None = None,
        withDefaults: bool = True,
        **kwargs: Any,
    ) -> DataCoordinate:
        standardized = DataCoordinate.standardize(
            dataId,
            dimensions=dimensions,
            universe=self.dimensions,
            defaults=self.defaults.dataId if withDefaults else None,
            **kwargs,
        )
        if standardized.hasRecords():
            return standardized

        request = ExpandDataIdRequestModel(data_id=standardized.to_simple().dataId)
        response = self._connection.post("expand_data_id", request)
        model = parse_model(response, ExpandDataIdResponseModel)
        return DataCoordinate.from_simple(model.data_coordinate, self.dimensions)
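
    # Illustrative note (not part of the original source): assuming standard
    # dimensions such as "instrument" and "exposure" exist in the repository's
    # dimension universe, a call like
    #
    #     registry.expandDataId(instrument="HSC", exposure=903334)
    #
    # standardizes the keyword arguments into a DataCoordinate locally and
    # returns immediately if records are already attached; otherwise it makes
    # a single POST to the server's "expand_data_id" endpoint to fetch the
    # dimension records.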

    def insertDimensionData(
        self,
        element: DimensionElement | str,
        *data: Mapping[str, Any] | DimensionRecord,
        conform: bool = True,
        replace: bool = False,
        skip_existing: bool = False,
    ) -> None:
        raise NotImplementedError()

    def syncDimensionData(
        self,
        element: DimensionElement | str,
        row: Mapping[str, Any] | DimensionRecord,
        conform: bool = True,
        update: bool = False,
    ) -> bool | dict[str, Any]:
        raise NotImplementedError()

    def queryDatasetTypes(
        self,
        expression: Any = ...,
        *,
        components: bool = False,
        missing: list[str] | None = None,
    ) -> Iterable[DatasetType]:
        query = convert_dataset_type_arg_to_glob_string_list(expression)
        request = QueryDatasetTypesRequestModel(search=query.search)
        response = self._connection.post("query_dataset_types", request)
        model = parse_model(response, QueryDatasetTypesResponseModel)
        if missing is not None:
            missing.extend(model.missing)

        result = []
        for dt in model.dataset_types:
            if dt.name in query.explicit_dataset_types:
                # Users are permitted to pass in already-existing DatasetType
                # instances, and we are supposed to preserve their overridden
                # storage class etc.
                result.append(query.explicit_dataset_types[dt.name])
            else:
                result.append(DatasetType.from_simple(dt, self.dimensions))

        return result
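
    # Illustrative note (not part of the original source): the default
    # expression ``...`` matches every dataset type. Passing an existing
    # DatasetType instance preserves any storage-class override it carries,
    # e.g. (with a hypothetical DatasetType instance ``dt``):
    #
    #     registry.queryDatasetTypes(dt)  # returns [dt] itself rather than
    #                                     # the server's definition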

    def queryCollections(
        self,
        expression: Any = ...,
        datasetType: DatasetType | None = None,
        collectionTypes: Iterable[CollectionType] | CollectionType = CollectionType.all(),
        flattenChains: bool = False,
        includeChains: bool | None = None,
    ) -> Sequence[str]:
        return self._butler.collections.query(
            expression,
            collection_types=set(ensure_iterable(collectionTypes)),
            flatten_chains=flattenChains,
            include_chains=includeChains,
        )

    @property
    def storageClasses(self) -> StorageClassFactory:
        return self._butler.storageClasses


def _is_component_dataset_type(dataset_type: DatasetType | str) -> bool:
    """Return true if the given dataset type refers to a component."""
    if isinstance(dataset_type, DatasetType):
        return dataset_type.component() is not None
    elif isinstance(dataset_type, str):
        parent, component = DatasetType.splitDatasetTypeName(dataset_type)
        return component is not None
    else:
        raise TypeError(f"Expected DatasetType or str, got {dataset_type!r}")
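
# Illustrative usage (not part of the original source), assuming a parent
# dataset type named "calexp" with a "wcs" component (hypothetical names):
#
#     _is_component_dataset_type("calexp.wcs")  # True
#     _is_component_dataset_type("calexp")      # False
#     _is_component_dataset_type(42)            # raises TypeError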