Coverage for python / lsst / daf / butler / datastores / file_datastore / get.py: 20%

117 statements  

« prev     ^ index     » next       coverage.py v7.13.5, created at 2026-05-06 08:30 +0000

1# This file is part of daf_butler. 

2# 

3# Developed for the LSST Data Management System. 

4# This product includes software developed by the LSST Project 

5# (http://www.lsst.org). 

6# See the COPYRIGHT file at the top-level directory of this distribution 

7# for details of code ownership. 

8# 

9# This software is dual licensed under the GNU General Public License and also 

10# under a 3-clause BSD license. Recipients may choose which of these licenses 

11# to use; please see the files gpl-3.0.txt and/or bsd_license.txt, 

12# respectively. If you choose the GPL option then the following text applies 

13# (but note that there is still no warranty even if you opt for BSD instead): 

14# 

15# This program is free software: you can redistribute it and/or modify 

16# it under the terms of the GNU General Public License as published by 

17# the Free Software Foundation, either version 3 of the License, or 

18# (at your option) any later version. 

19# 

20# This program is distributed in the hope that it will be useful, 

21# but WITHOUT ANY WARRANTY; without even the implied warranty of 

22# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the 

23# GNU General Public License for more details. 

24# 

25# You should have received a copy of the GNU General Public License 

26# along with this program. If not, see <http://www.gnu.org/licenses/>. 

27 

# Public API of this module.
__all__ = (
    "DatasetLocationInformation",
    "DatastoreFileGetInformation",
    "generate_datastore_get_information",
    "get_dataset_as_python_object_from_get_info",
)

34 

35from collections.abc import Mapping 

36from dataclasses import dataclass 

37from typing import Any, TypeAlias 

38 

39from lsst.daf.butler import ( 

40 DatasetRef, 

41 FileDescriptor, 

42 FileIntegrityError, 

43 Formatter, 

44 FormatterV1inV2, 

45 FormatterV2, 

46 Location, 

47 StorageClass, 

48) 

49from lsst.daf.butler.datastore.cache_manager import AbstractDatastoreCacheManager 

50from lsst.daf.butler.datastore.generic_base import post_process_get 

51from lsst.daf.butler.datastore.stored_file_info import StoredFileInfo 

52from lsst.utils.introspection import get_instance_of 

53from lsst.utils.logging import getLogger 

54 

log = getLogger(__name__)

# A file location paired with the datastore record describing that file.
DatasetLocationInformation: TypeAlias = tuple[Location, StoredFileInfo]

58 

59 

@dataclass(frozen=True)
class DatastoreFileGetInformation:
    """Collection of useful parameters needed to retrieve a file from
    a Datastore.

    Instances are immutable (``frozen=True``); they are constructed by
    `generate_datastore_get_information`.
    """

    location: Location
    """The location from which to read the dataset."""

    formatter: Formatter | FormatterV2
    """The `Formatter` to use to deserialize the dataset."""

    info: StoredFileInfo
    """Stored information about this file and its formatter."""

    assemblerParams: Mapping[str, Any]
    """Parameters to use for post-processing the retrieved dataset."""

    formatterParams: Mapping[str, Any]
    """Parameters that were understood by the associated formatter."""

    component: str | None
    """The component to be retrieved (can be `None`)."""

    readStorageClass: StorageClass
    """The `StorageClass` of the dataset being read."""

86 

87 

def generate_datastore_get_information(
    fileLocations: list[DatasetLocationInformation],
    *,
    ref: DatasetRef,
    parameters: Mapping[str, Any] | None,
    readStorageClass: StorageClass | None = None,
) -> list[DatastoreFileGetInformation]:
    """Process parameters and instantiate formatters in preparation for
    retrieving an artifact and converting it to a Python object.

    Parameters
    ----------
    fileLocations : `list` [`DatasetLocationInformation`]
        File locations for this artifact together with their associated
        datastore records.
    ref : `DatasetRef`
        The registry information associated with this artifact.
    parameters : `~collections.abc.Mapping` [`str`, `typing.Any`]
        `StorageClass` and `Formatter` parameters.
    readStorageClass : `StorageClass` | `None`, optional
        The StorageClass to use when ultimately returning the resulting object
        from the get. Defaults to the `StorageClass` specified by ``ref``.

    Returns
    -------
    getInfo : `list` [`DatastoreFileGetInformation`]
        The parameters needed to retrieve each file.
    """
    if readStorageClass is None:
        readStorageClass = ref.datasetType.storageClass

    # A component request is indicated by the dataset type itself.
    refComponent = ref.datasetType.component()

    # More than one file for the same artifact means the composite was
    # disassembled on write.
    disassembled = len(fileLocations) > 1

    results: list[DatastoreFileGetInformation] = []
    for location, storedFileInfo in fileLocations:
        # The storage class used when this file was written.
        writeStorageClass = storedFileInfo.storageClass

        # For a disassembled composite, any file that does not correspond
        # to the requested component must be read back with its write
        # storage class (no override applies to it).
        if disassembled and storedFileInfo.component != refComponent:
            thisReadStorageClass = writeStorageClass
        else:
            thisReadStorageClass = readStorageClass

        formatter = get_instance_of(
            storedFileInfo.formatter,
            FileDescriptor(
                location,
                readStorageClass=thisReadStorageClass,
                storageClass=writeStorageClass,
                parameters=parameters,
                component=storedFileInfo.component,
            ),
            dataId=ref.dataId,
            ref=ref,
        )

        # Split the parameters into those the formatter understands and the
        # rest; of the rest, keep only those supported by the read storage
        # class (for components not all will be handled).
        formatterParams, otherParams = formatter.segregate_parameters()
        assemblerParams = thisReadStorageClass.filterParameters(otherParams)

        # The component can come either from the datastore record (when the
        # dataset was disassembled by butler or datastore) or from the
        # dataset type itself.
        component = storedFileInfo.component or refComponent

        results.append(
            DatastoreFileGetInformation(
                location,
                formatter,
                storedFileInfo,
                assemblerParams,
                formatterParams,
                component,
                thisReadStorageClass,
            )
        )

    return results

171 

172 

def _read_artifact_into_memory(
    getInfo: DatastoreFileGetInformation,
    ref: DatasetRef,
    cache_manager: AbstractDatastoreCacheManager,
    isComponent: bool = False,
) -> Any:
    """Read the artifact from datastore into an in-memory object.

    Parameters
    ----------
    getInfo : `DatastoreFileGetInformation`
        Information about the artifact within the datastore.
    ref : `DatasetRef`
        The registry information associated with this artifact.
    cache_manager : `AbstractDatastoreCacheManager`
        The cache manager to use for caching retrieved files.
    isComponent : `bool`
        Flag to indicate if a component is being read from this artifact.

    Returns
    -------
    inMemoryDataset : `object`
        The artifact as a python object.
    """
    uri = getInfo.location.uri
    log.debug("Accessing data from %s", uri)

    # A checksum cannot be recalculated here, but the recorded size gives
    # the formatter a quick integrity check. A negative size means the size
    # is unknown and no check is performed.
    expected_size = getInfo.info.file_size

    formatter = getInfo.formatter
    if isinstance(formatter, Formatter):
        # Wrap a V1 formatter so it can be driven through the V2 interface.
        formatter = FormatterV1inV2(
            formatter.file_descriptor,
            ref=ref,
            formatter=formatter,
            write_parameters=formatter.write_parameters,
            write_recipes=formatter.write_recipes,
        )

    assert isinstance(formatter, FormatterV2)

    try:
        result = formatter.read(
            component=getInfo.component if isComponent else None,
            expected_size=expected_size,
            cache_manager=cache_manager,
        )
    except (FileNotFoundError, FileIntegrityError):
        # Expected when the resource is missing or the supplied file size
        # was wrong; let callers handle these directly.
        raise
    except Exception as e:
        # Fold any notes the formatter attached to the exception into the
        # message for clarity.
        notes = "\n".join(getattr(e, "__notes__", []))
        if notes:
            notes = "\n" + notes
        raise ValueError(
            f"Failure from formatter '{formatter.name()}' for dataset {ref.id}"
            f" ({ref.datasetType.name} from {uri}): {e}{notes}"
        ) from e

    return post_process_get(
        result, ref.datasetType.storageClass, getInfo.assemblerParams, isComponent=isComponent
    )

245 

246 

247def get_dataset_as_python_object_from_get_info( 

248 allGetInfo: list[DatastoreFileGetInformation], 

249 *, 

250 ref: DatasetRef, 

251 parameters: Mapping[str, Any] | None, 

252 cache_manager: AbstractDatastoreCacheManager, 

253) -> Any: 

254 """Retrieve an artifact from storage and return it as a Python object. 

255 

256 Parameters 

257 ---------- 

258 allGetInfo : `list` [`DatastoreFileGetInformation`] 

259 Pre-processed information about each file associated with this 

260 artifact. 

261 ref : `DatasetRef` 

262 The registry information associated with this artifact. 

263 parameters : `~collections.abc.Mapping` [`str`, `typing.Any`] 

264 `StorageClass` and `Formatter` parameters. 

265 cache_manager : `AbstractDatastoreCacheManager` 

266 The cache manager to use for caching retrieved files. 

267 

268 Returns 

269 ------- 

270 python_object : `typing.Any` 

271 The retrieved artifact, converted to a Python object according to the 

272 `StorageClass` specified in ``ref``. 

273 """ 

274 refStorageClass = ref.datasetType.storageClass 

275 refComponent = ref.datasetType.component() 

276 # Create mapping from component name to related info 

277 allComponents = {i.component: i for i in allGetInfo} 

278 

279 # By definition the dataset is disassembled if we have more 

280 # than one record for it. 

281 isDisassembled = len(allGetInfo) > 1 

282 

283 # Look for the special case where we are disassembled but the 

284 # component is a derived component that was not written during 

285 # disassembly. For this scenario we need to check that the 

286 # component requested is listed as a derived component for the 

287 # composite storage class 

288 isDisassembledReadOnlyComponent = False 

289 if isDisassembled and refComponent: 

290 # The composite storage class should be accessible through 

291 # the component dataset type 

292 compositeStorageClass = ref.datasetType.parentStorageClass 

293 

294 # In the unlikely scenario where the composite storage 

295 # class is not known, we can only assume that this is a 

296 # normal component. If that assumption is wrong then the 

297 # branch below that reads a persisted component will fail 

298 # so there is no need to complain here. 

299 if compositeStorageClass is not None: 

300 isDisassembledReadOnlyComponent = refComponent in compositeStorageClass.derivedComponents 

301 

302 if isDisassembled and not refComponent: 

303 # This was a disassembled dataset spread over multiple files 

304 # and we need to put them all back together again. 

305 # Read into memory and then assemble 

306 

307 # Check that the supplied parameters are suitable for the type read 

308 refStorageClass.validateParameters(parameters) 

309 

310 # We want to keep track of all the parameters that were not used 

311 # by formatters. We assume that if any of the component formatters 

312 # use a parameter that we do not need to apply it again in the 

313 # assembler. 

314 usedParams = set() 

315 

316 components: dict[str, Any] = {} 

317 for getInfo in allGetInfo: 

318 # assemblerParams are parameters not understood by the 

319 # associated formatter. 

320 usedParams.update(set(getInfo.formatterParams)) 

321 

322 component = getInfo.component 

323 

324 if component is None: 

325 raise RuntimeError(f"Internal error in datastore assembly of {ref}") 

326 

327 # We do not want the formatter to think it's reading 

328 # a component though because it is really reading a 

329 # standalone dataset -- always tell reader it is not a 

330 # component. 

331 components[component] = _read_artifact_into_memory( 

332 getInfo, ref.makeComponentRef(component), cache_manager, isComponent=False 

333 ) 

334 

335 inMemoryDataset = ref.datasetType.storageClass.delegate().assemble(components) 

336 

337 # Any unused parameters will have to be passed to the assembler 

338 if parameters: 

339 unusedParams = {k: v for k, v in parameters.items() if k not in usedParams} 

340 else: 

341 unusedParams = {} 

342 

343 # Process parameters 

344 return ref.datasetType.storageClass.delegate().handleParameters( 

345 inMemoryDataset, parameters=unusedParams 

346 ) 

347 

348 elif isDisassembledReadOnlyComponent: 

349 compositeStorageClass = ref.datasetType.parentStorageClass 

350 if compositeStorageClass is None: 

351 raise RuntimeError( 

352 f"Unable to retrieve derived component '{refComponent}' since" 

353 "no composite storage class is available." 

354 ) 

355 

356 if refComponent is None: 

357 # Mainly for mypy 

358 raise RuntimeError("Internal error in datastore: component can not be None here") 

359 

360 # Assume that every derived component can be calculated by 

361 # forwarding the request to a single read/write component. 

362 # Rather than guessing which rw component is the right one by 

363 # scanning each for a derived component of the same name, 

364 # we ask the storage class delegate directly which one is best to 

365 # use. 

366 compositeDelegate = compositeStorageClass.delegate() 

367 forwardedComponent = compositeDelegate.selectResponsibleComponent(refComponent, set(allComponents)) 

368 

369 # Select the relevant component 

370 rwInfo = allComponents[forwardedComponent] 

371 

372 # For now assume that read parameters are validated against 

373 # the real component and not the requested component 

374 forwardedStorageClass = rwInfo.formatter.file_descriptor.readStorageClass 

375 forwardedStorageClass.validateParameters(parameters) 

376 

377 # Unfortunately the FileDescriptor inside the formatter will have 

378 # the wrong write storage class so we need to create a new one 

379 # given the immutability constraint. 

380 writeStorageClass = rwInfo.info.storageClass 

381 

382 # We may need to put some thought into parameters for read 

383 # components but for now forward them on as is 

384 readFormatter = type(rwInfo.formatter)( 

385 FileDescriptor( 

386 rwInfo.location, 

387 readStorageClass=refStorageClass, 

388 storageClass=writeStorageClass, 

389 parameters=parameters, 

390 component=forwardedComponent, 

391 ), 

392 dataId=ref.dataId, 

393 ref=ref, 

394 ) 

395 

396 # The assembler can not receive any parameter requests for a 

397 # derived component at this time since the assembler will 

398 # see the storage class of the derived component and those 

399 # parameters will have to be handled by the formatter on the 

400 # forwarded storage class. 

401 assemblerParams: dict[str, Any] = {} 

402 

403 # Need to created a new info that specifies the derived 

404 # component and associated storage class 

405 readInfo = DatastoreFileGetInformation( 

406 rwInfo.location, 

407 readFormatter, 

408 rwInfo.info, 

409 assemblerParams, 

410 {}, 

411 refComponent, 

412 refStorageClass, 

413 ) 

414 

415 return _read_artifact_into_memory(readInfo, ref, cache_manager, isComponent=True) 

416 

417 else: 

418 # Single file request or component from that composite file 

419 for lookup in (refComponent, None): 

420 if lookup in allComponents: 

421 getInfo = allComponents[lookup] 

422 break 

423 else: 

424 raise FileNotFoundError(f"Component {refComponent} not found for ref {ref} in datastore") 

425 

426 # Do not need the component itself if already disassembled 

427 if isDisassembled: 

428 isComponent = False 

429 else: 

430 isComponent = getInfo.component is not None 

431 

432 # For a disassembled component we can validate parameters against 

433 # the component storage class directly 

434 if isDisassembled: 

435 refStorageClass.validateParameters(parameters) 

436 else: 

437 # For an assembled composite this could be a derived 

438 # component derived from a real component. The validity 

439 # of the parameters is not clear. For now validate against 

440 # the composite storage class 

441 getInfo.formatter.file_descriptor.storageClass.validateParameters(parameters) 

442 

443 return _read_artifact_into_memory(getInfo, ref, cache_manager, isComponent=isComponent)