Coverage for python / lsst / daf / butler / datastore / stored_file_info.py: 40%

137 statements  

« prev     ^ index     » next       coverage.py v7.13.5, created at 2026-04-30 08:41 +0000

1# This file is part of daf_butler. 

2# 

3# Developed for the LSST Data Management System. 

4# This product includes software developed by the LSST Project 

5# (http://www.lsst.org). 

6# See the COPYRIGHT file at the top-level directory of this distribution 

7# for details of code ownership. 

8# 

9# This software is dual licensed under the GNU General Public License and also 

10# under a 3-clause BSD license. Recipients may choose which of these licenses 

11# to use; please see the files gpl-3.0.txt and/or bsd_license.txt, 

12# respectively. If you choose the GPL option then the following text applies 

13# (but note that there is still no warranty even if you opt for BSD instead): 

14# 

15# This program is free software: you can redistribute it and/or modify 

16# it under the terms of the GNU General Public License as published by 

17# the Free Software Foundation, either version 3 of the License, or 

18# (at your option) any later version. 

19# 

20# This program is distributed in the hope that it will be useful, 

21# but WITHOUT ANY WARRANTY; without even the implied warranty of 

22# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the 

23# GNU General Public License for more details. 

24# 

25# You should have received a copy of the GNU General Public License 

26# along with this program. If not, see <http://www.gnu.org/licenses/>. 

27 

28from __future__ import annotations 

29 

30__all__ = ("SerializedStoredFileInfo", "StoredDatastoreItemInfo", "StoredFileInfo") 

31 

32import inspect 

33from collections.abc import Iterable, Mapping 

34from dataclasses import dataclass 

35from typing import TYPE_CHECKING, Any 

36 

37import pydantic 

38 

39from lsst.resources import ResourcePath 

40from lsst.utils import doImportType 

41from lsst.utils.introspection import get_full_type_name 

42 

43from .._formatter import Formatter, FormatterParameter, FormatterV2 

44from .._location import Location, LocationFactory 

45from .._storage_class import StorageClass, StorageClassFactory 

46 

47if TYPE_CHECKING: 

48 from .._dataset_ref import DatasetRef 

49 

50# String to use when a Python None is encountered 

51NULLSTR = "__NULL_STRING__" 

52 

53 

class StoredDatastoreItemInfo:
    """Internal information associated with a stored dataset in a `Datastore`.

    This is an empty base class. Datastore implementations are expected to
    write their own subclasses.
    """

    __slots__ = ()

    def file_location(self, factory: LocationFactory) -> Location:
        """Return the location of artifact.

        Parameters
        ----------
        factory : `LocationFactory`
            Factory relevant to the datastore represented by this item.

        Returns
        -------
        location : `Location`
            The location of the item within this datastore.
        """
        raise NotImplementedError("The base class does not know how to locate an item in a datastore.")

    @classmethod
    def from_record(cls: type[StoredDatastoreItemInfo], record: Mapping[str, Any]) -> StoredDatastoreItemInfo:
        """Create instance from database record.

        Parameters
        ----------
        record : `dict`
            The record associated with this item.

        Returns
        -------
        info : `StoredDatastoreItemInfo`
            The newly-constructed item corresponding to the record.
        """
        raise NotImplementedError()

    def to_record(self, **kwargs: Any) -> dict[str, Any]:
        """Convert record contents to a dictionary.

        Parameters
        ----------
        **kwargs
            Additional items to add to returned record.
        """
        raise NotImplementedError()

    def update(self, **kwargs: Any) -> StoredDatastoreItemInfo:
        """Create a new class with everything retained apart from the
        specified values.

        Parameters
        ----------
        **kwargs : `~collections.abc.Mapping`
            Values to override.

        Returns
        -------
        updated : `StoredDatastoreItemInfo`
            A new instance of the object with updated values.
        """
        raise NotImplementedError()

    @classmethod
    def to_records(
        cls, records: Iterable[StoredDatastoreItemInfo], **kwargs: Any
    ) -> tuple[str, Iterable[Mapping[str, Any]]]:
        """Convert a collection of records to dictionaries.

        Parameters
        ----------
        records : `~collections.abc.Iterable` [ `StoredDatastoreItemInfo` ]
            A collection of records, all records must be of the same type.
        **kwargs
            Additional items to add to each returned record.

        Returns
        -------
        class_name : `str`
            Name of the record class, or the empty string if ``records`` is
            empty.
        records : `list` [ `dict` ]
            Records in their dictionary representation.
        """
        # Materialize up front: the signature accepts any iterable, but the
        # emptiness test below is always "non-empty" for a generator, and
        # the body iterates ``records`` twice (class check + conversion),
        # which would silently exhaust a one-shot iterator.
        records = list(records)
        if not records:
            return "", []
        classes = {record.__class__ for record in records}
        assert len(classes) == 1, f"Records have to be of the same class: {classes}"
        return get_full_type_name(classes.pop()), [record.to_record(**kwargs) for record in records]

    @classmethod
    def from_records(
        cls, class_name: str, records: Iterable[Mapping[str, Any]]
    ) -> list[StoredDatastoreItemInfo]:
        """Convert collection of dictionaries to records.

        Parameters
        ----------
        class_name : `str`
            Name of the record class.
        records : `~collections.abc.Iterable` [ `dict` ]
            Records in their dictionary representation.

        Returns
        -------
        infos : `list` [`StoredDatastoreItemInfo`]
            Sequence of records converted to typed representation.

        Raises
        ------
        TypeError
            Raised if ``class_name`` is not a sub-class of
            `StoredDatastoreItemInfo`.
        """
        try:
            klass = doImportType(class_name)
        except ImportError:
            # Prior to DM-41043 we were embedding a lsst.daf.butler.core
            # path in the serialized form, which we never wanted; fix this
            # one case.
            if class_name == "lsst.daf.butler.core.storedFileInfo.StoredFileInfo":
                klass = StoredFileInfo
            else:
                raise
        if not issubclass(klass, StoredDatastoreItemInfo):
            raise TypeError(f"Class {class_name} is not a subclass of StoredDatastoreItemInfo")
        return [klass.from_record(record) for record in records]

183 

184 

@dataclass(frozen=True, slots=True)
class StoredFileInfo(StoredDatastoreItemInfo):
    """Datastore-private metadata associated with a Datastore file.

    Parameters
    ----------
    formatter : `Formatter` or `FormatterV2` or `str`
        The formatter to use for this dataset.
    path : `str`
        Path to the artifact associated with this dataset.
    storageClass : `StorageClass` or `None`
        The storage class associated with this dataset. If `None`,
        ``storage_class_name`` must be provided as a keyword argument.
    component : `str` or `None`, optional
        The component if disassembled.
    checksum : `str` or `None`, optional
        The checksum of the artifact.
    file_size : `int`
        The size of the file in bytes. -1 indicates the size is not known.
    storage_class_name : `str`, optional
        Name of the storage class. This may be passed instead of
        ``storageClass`` to defer loading storage class definitions (e.g. if a
        butler configuration may not have been loaded yet). Note that
        ``storageClass=None`` must be passed explicitly (for backward
        compatibility, it remains a positional argument with no default).
    """

    # NOTE: @dataclass leaves a user-defined __init__ in place (it only adds
    # one if the class does not already define it), so this hand-written
    # constructor is the one callers get.
    def __init__(
        self,
        formatter: FormatterParameter,
        path: str,
        storageClass: StorageClass | None,
        component: str | None,
        checksum: str | None,
        file_size: int,
        *,
        storage_class_name: str | None = None,
    ):
        # Use these shenanigans to allow us to use a frozen dataclass:
        # frozen=True blocks normal attribute assignment, so bypass it with
        # object.__setattr__ for every field.
        object.__setattr__(self, "path", path)
        if storageClass is not None:
            # Only the name is stored; the instance is recovered lazily via
            # the storageClass property below.
            object.__setattr__(self, "storage_class_name", storageClass.name)
        else:
            if storage_class_name is None:
                raise TypeError("At least one of 'storageClass' and 'storage_class_name' must be provided.")
            object.__setattr__(self, "storage_class_name", storage_class_name)
        object.__setattr__(self, "component", component)
        object.__setattr__(self, "checksum", checksum)
        object.__setattr__(self, "file_size", file_size)

        if isinstance(formatter, str):
            # We trust that this string refers to a Formatter
            formatterStr = formatter
        elif isinstance(formatter, Formatter | FormatterV2) or (
            inspect.isclass(formatter) and issubclass(formatter, Formatter | FormatterV2)
        ):
            # Accept a Formatter/FormatterV2 instance or class and store its
            # fully-qualified name instead of the object itself.
            formatterStr = formatter.name()
        else:
            raise TypeError(f"Supplied formatter '{formatter}' is not a Formatter")
        object.__setattr__(self, "formatter", formatterStr)

    formatter: str
    """Fully-qualified name of Formatter. If a Formatter class or instance
    is given the name will be extracted."""

    path: str
    """Path to dataset within Datastore."""

    storage_class_name: str
    """Name of the storage class associated with this dataset."""

    component: str | None
    """Component associated with this file. Can be `None` if the file does
    not refer to a component of a composite."""

    checksum: str | None
    """Checksum of the serialized dataset."""

    file_size: int
    """Size of the serialized dataset in bytes."""

    @property
    def storageClass(self) -> StorageClass:
        """Storage class associated with this dataset."""
        # Resolved on demand from the stored name; presumably
        # StorageClassFactory is cheap to construct here (singleton-like) —
        # TODO confirm against lsst.daf.butler docs.
        return StorageClassFactory().getStorageClass(self.storage_class_name)

    def rebase(self, ref: DatasetRef) -> StoredFileInfo:
        """Return a copy of the record suitable for a specified reference.

        Parameters
        ----------
        ref : `DatasetRef`
            DatasetRef which provides component name and dataset ID for the
            new returned record.

        Returns
        -------
        record : `StoredFileInfo`
            New record instance.
        """
        # take component from the ref, rest comes from self
        component = ref.datasetType.component()
        if component is None:
            component = self.component
        return self.update(component=component)

    def to_record(self, **kwargs: Any) -> dict[str, Any]:
        """Convert the supplied ref to a database record.

        Parameters
        ----------
        **kwargs : `typing.Any`
            Additional information to be added to the record.
        """
        component = self.component
        if component is None:
            # Use empty string since we want this to be part of the
            # primary key.
            component = NULLSTR
        return dict(
            formatter=self.formatter,
            path=self.path,
            storage_class=self.storage_class_name,
            component=component,
            checksum=self.checksum,
            file_size=self.file_size,
            **kwargs,
        )

    def to_simple(self) -> SerializedStoredFileInfo:
        """Return the serializable pydantic form of this record."""
        record = self.to_record()
        # We allow None on the model but the record contains a "null string"
        # instead
        record["component"] = self.component
        return SerializedStoredFileInfo.model_validate(record)

    def file_location(self, factory: LocationFactory) -> Location:
        """Return the location of artifact.

        Parameters
        ----------
        factory : `LocationFactory`
            Factory relevant to the datastore represented by this item.

        Returns
        -------
        location : `Location`
            The location of the item within this datastore.
        """
        uriInStore = ResourcePath(self.path, forceAbsolute=False, forceDirectory=False)
        if uriInStore.isabs():
            # Absolute URI: the artifact lives outside the datastore root.
            location = Location(None, uriInStore)
        else:
            location = factory.from_uri(uriInStore, trusted_path=True)
        return location

    @classmethod
    def from_record(cls: type[StoredFileInfo], record: Mapping[str, Any]) -> StoredFileInfo:
        """Create instance from database record.

        Parameters
        ----------
        record : `dict`
            The record associated with this item.

        Returns
        -------
        info : `StoredFileInfo`
            The newly-constructed item corresponding to the record.
        """
        # The component column stores NULLSTR (or may be empty/None from the
        # serialized model) when there is no component; map those back to
        # a Python None.
        component = record["component"] if (record["component"] and record["component"] != NULLSTR) else None
        # Pass the storage class by name (storageClass=None) so no storage
        # class definitions need to be loaded here.
        info = cls(
            formatter=record["formatter"],
            path=record["path"],
            storageClass=None,
            storage_class_name=record["storage_class"],
            component=component,
            checksum=record["checksum"],
            file_size=record["file_size"],
        )
        return info

    @classmethod
    def from_simple(cls: type[StoredFileInfo], model: SerializedStoredFileInfo) -> StoredFileInfo:
        """Create an instance from its serialized pydantic form."""
        return cls.from_record(dict(model))

    def update(self, **kwargs: Any) -> StoredFileInfo:
        """Return a copy with the specified field values replaced.

        Parameters
        ----------
        **kwargs : `~collections.abc.Mapping`
            Values to override; keys must be field names of this class.

        Returns
        -------
        updated : `StoredFileInfo`
            A new instance of the object with updated values.

        Raises
        ------
        ValueError
            Raised if a keyword does not correspond to a field.
        """
        new_args: dict[str, Any] = {"storageClass": None}  # so `storage_class_name` can be passed.
        # With slots=True, __slots__ holds exactly the dataclass field names.
        for k in self.__slots__:
            if k in kwargs:
                new_args[k] = kwargs.pop(k)
            else:
                new_args[k] = getattr(self, k)
        if kwargs:
            raise ValueError(f"Unexpected keyword arguments for update: {', '.join(kwargs)}")
        return type(self)(**new_args)

    def __reduce__(self) -> str | tuple[Any, ...]:
        # Pickle support: round-trip through the plain-record dictionary and
        # from_record rather than the default instance-state mechanism.
        return (self.from_record, (self.to_record(),))

    @property
    def artifact_path(self) -> str:
        """Path to dataset as stored in Datastore with fragments removed."""
        if "#" in self.path:
            return self.path[: self.path.rfind("#")]
        return self.path

392 

393 

class SerializedStoredFileInfo(pydantic.BaseModel):
    """Serialized representation of `StoredFileInfo` properties.

    Field names match the keys produced by `StoredFileInfo.to_record`,
    except that ``component`` is carried here as `None` rather than the
    null-string sentinel used in database records (see
    `StoredFileInfo.to_simple`).
    """

    formatter: str
    """Fully-qualified name of Formatter."""

    path: str
    """Path to dataset within Datastore."""

    storage_class: str
    """Name of the StorageClass associated with Dataset."""

    component: str | None = None
    """Component associated with this file. Can be `None` if the file does
    not refer to a component of a composite."""

    checksum: str | None = None
    """Checksum of the serialized dataset."""

    file_size: int
    """Size of the serialized dataset in bytes."""

415 

416 

def make_datastore_path_relative(path: str) -> str:
    """Normalize a path from a `StoredFileInfo` object so
    that it is always relative.

    Parameters
    ----------
    path : `str`
        The file path from a `StoredFileInfo`.

    Returns
    -------
    normalized_path : `str`
        The original path, if it was relative. Otherwise, a version of it that
        was converted to a relative path, stripping URI scheme and netloc from
        it.
    """
    # Force the datastore file path sent to the client to be relative, since
    # absolute URLs in the server will generally not be reachable by the
    # client. If an absolute URL is sent, it (or a portion of it) can end up
    # baked into the FileDatastore that is the target of the transfer in some
    # cases.
    resource = ResourcePath(path, forceAbsolute=False, forceDirectory=False)
    if not resource.isabs():
        # Already relative: nothing to strip.
        return path
    stripped = resource.relativeToPathRoot
    fragment = resource.fragment
    # Preserve the fragment, since this used to indicate special
    # processing like zip extraction.
    return f"{stripped}#{fragment}" if fragment else stripped