Coverage for python / lsst / daf / butler / datastores / file_datastore / retrieve_artifacts.py: 28%

136 statements  

« prev     ^ index     » next       coverage.py v7.13.5, created at 2026-04-26 08:49 +0000

1# This file is part of daf_butler. 

2# 

3# Developed for the LSST Data Management System. 

4# This product includes software developed by the LSST Project 

5# (http://www.lsst.org). 

6# See the COPYRIGHT file at the top-level directory of this distribution 

7# for details of code ownership. 

8# 

9# This software is dual licensed under the GNU General Public License and also 

10# under a 3-clause BSD license. Recipients may choose which of these licenses 

11# to use; please see the files gpl-3.0.txt and/or bsd_license.txt, 

12# respectively. If you choose the GPL option then the following text applies 

13# (but note that there is still no warranty even if you opt for BSD instead): 

14# 

15# This program is free software: you can redistribute it and/or modify 

16# it under the terms of the GNU General Public License as published by 

17# the Free Software Foundation, either version 3 of the License, or 

18# (at your option) any later version. 

19# 

20# This program is distributed in the hope that it will be useful, 

21# but WITHOUT ANY WARRANTY; without even the implied warranty of 

22# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the 

23# GNU General Public License for more details. 

24# 

25# You should have received a copy of the GNU General Public License 

26# along with this program. If not, see <http://www.gnu.org/licenses/>. 

27 

28from __future__ import annotations 

29 

30__all__ = ( 

31 "ArtifactIndexInfo", 

32 "ZipIndex", 

33 "determine_destination_for_retrieved_artifact", 

34 "retrieve_and_zip", 

35 "unpack_zips", 

36) 

37 

38import logging 

39import tempfile 

40import uuid 

41import zipfile 

42from collections.abc import Iterable 

43from typing import ClassVar, Literal, Protocol, Self 

44 

45from pydantic import BaseModel 

46 

47from lsst.daf.butler import ( 

48 DatasetIdFactory, 

49 DatasetRef, 

50 SerializedDatasetRefContainers, 

51 SerializedDatasetRefContainerV1, 

52) 

53from lsst.daf.butler.datastore.stored_file_info import SerializedStoredFileInfo 

54from lsst.resources import ResourcePath, ResourcePathExpression 

55 

56_LOG = logging.getLogger(__name__) 

57 

58 

class ArtifactIndexInfo(BaseModel):
    """Information related to an artifact in an index."""

    info: SerializedStoredFileInfo
    """Datastore record information for this file artifact."""

    ids: set[uuid.UUID]
    """Dataset IDs for this artifact."""

    def append(self, id_: uuid.UUID) -> None:
        """Add an additional dataset ID.

        Parameters
        ----------
        id_ : `uuid.UUID`
            Additional dataset ID to associate with this artifact.
        """
        self.ids.add(id_)

    @classmethod
    def from_single(cls, info: SerializedStoredFileInfo, id_: uuid.UUID) -> Self:
        """Create a mapping from a single ID and info.

        Parameters
        ----------
        info : `SerializedStoredFileInfo`
            Datastore record for this artifact.
        id_ : `uuid.UUID`
            First dataset ID to associate with this artifact.

        Returns
        -------
        index_info : `ArtifactIndexInfo`
            New instance associating the record with the single dataset ID.
        """
        # Use a set literal to match the declared ``set[uuid.UUID]`` field
        # type directly rather than relying on pydantic list-to-set coercion.
        return cls(info=info, ids={id_})

    def subset(self, ids: Iterable[uuid.UUID]) -> Self:
        """Replace the IDs with a subset of the IDs and return a new instance.

        Parameters
        ----------
        ids : `~collections.abc.Iterable` [ `uuid.UUID` ]
            Subset of IDs to keep.

        Returns
        -------
        subsetted : `ArtifactIndexInfo`
            New instance with the requested subset.

        Raises
        ------
        ValueError
            Raised if the given IDs is not a subset of the current IDs.
        """
        subset = set(ids)
        if subset - self.ids:
            raise ValueError(f"Given subset of {subset} is not a subset of {self.ids}")
        # Copy the record so the new instance does not share mutable state.
        return type(self)(ids=subset, info=self.info.model_copy())

113 

114 

class ZipIndex(BaseModel):
    """Index of a Zip file of Butler datasets.

    A file can be associated with multiple butler datasets and a single
    butler dataset can be associated with multiple files. This model
    provides the necessary information for ingesting back into a Butler
    file datastore.
    """

    index_version: Literal["V1"] = "V1"

    refs: SerializedDatasetRefContainers
    """Deduplicated information for all the `DatasetRef` in the index."""

    artifact_map: dict[str, ArtifactIndexInfo]
    """Mapping of each Zip member to associated lookup information."""

    index_name: ClassVar[str] = "_butler_zip_index.json"
    """Name to use when saving the index to a file."""

    def generate_uuid5(self) -> uuid.UUID:
        """Create a UUID based on the Zip index.

        Returns
        -------
        id_ : `uuid.UUID`
            A UUID5 created from the paths inside the Zip file. Guarantees
            that if the Zip file is regenerated with exactly the same file
            paths the same answer will be returned.
        """
        # Options are:
        # - uuid5 based on file paths in zip
        # - uuid5 based on ref uuids.
        # - checksum derived from the above.
        # - uuid5 from file paths and dataset refs.
        # Do not attempt to include file contents in UUID.
        # Start with uuid5 from file paths.
        data = ",".join(sorted(self.artifact_map.keys()))
        # No need to come up with a different namespace.
        return uuid.uuid5(DatasetIdFactory.NS_UUID, data)

    def __len__(self) -> int:
        """Return the number of files in the Zip."""
        return len(self.artifact_map)

    def calculate_zip_file_name(self) -> str:
        """Calculate the default name for the Zip file based on the index
        contents.

        Returns
        -------
        name : `str`
            Name of the zip file based on index.
        """
        return f"{self.generate_uuid5()}.zip"

    def calculate_zip_file_path_in_store(self) -> str:
        """Calculate the relative path inside a datastore that should be
        used for this Zip file.

        Returns
        -------
        path_in_store : `str`
            Relative path to use for Zip file in datastore.
        """
        zip_name = self.calculate_zip_file_name()
        # Use a leading-characters subdirectory to avoid placing every Zip
        # in one flat directory.
        return f"zips/{zip_name[:4]}/{zip_name}"

    def write_index(self, dir: ResourcePath) -> ResourcePath:
        """Write the index to the specified directory.

        Parameters
        ----------
        dir : `~lsst.resources.ResourcePath`
            Directory to write the index file to.

        Returns
        -------
        index_path : `~lsst.resources.ResourcePath`
            Path to the index file that was written.
        """
        index_path = dir.join(self.index_name, forceDirectory=False)
        with index_path.open("w") as fd:
            # Need to include unset/default values so that the version
            # discriminator field for refs container appears in the
            # serialization.
            print(self.model_dump_json(), file=fd)
        return index_path

    @classmethod
    def calc_relative_paths(
        cls, root: ResourcePath, paths: Iterable[ResourcePath]
    ) -> dict[ResourcePath, str]:
        """Calculate the path to use inside the Zip file from the full path.

        Parameters
        ----------
        root : `lsst.resources.ResourcePath`
            The reference root directory.
        paths : `~collections.abc.Iterable` [ `lsst.resources.ResourcePath` ]
            The paths to the files that should be included in the Zip file.

        Returns
        -------
        abs_to_rel : `dict` [ `~lsst.resources.ResourcePath`, `str` ]
            Mapping of the original file path to the relative path to use
            in Zip file.
        """
        file_to_relative: dict[ResourcePath, str] = {}
        for p in paths:
            # It is an error if there is no relative path.
            rel = p.relative_to(root)
            if rel is None:
                raise RuntimeError(f"Unexpectedly unable to calculate relative path of {p} to {root}.")
            file_to_relative[p] = rel
        return file_to_relative

    @classmethod
    def from_artifact_map(
        cls,
        refs: Iterable[DatasetRef],
        artifact_map: dict[ResourcePath, ArtifactIndexInfo],
        root: ResourcePath,
    ) -> Self:
        """Create an index from the mappings returned from
        `Datastore.retrieveArtifacts`.

        Parameters
        ----------
        refs : `~collections.abc.Iterable` [ `DatasetRef` ]
            Datasets present in the index.
        artifact_map : `dict` [ `lsst.resources.ResourcePath`, \
                `ArtifactIndexInfo` ]
            Mapping of artifact path to information linking it to the
            associated refs and datastore information.
        root : `lsst.resources.ResourcePath`
            Root path to be removed from all the paths before creating the
            index.

        Returns
        -------
        index : `ZipIndex`
            New index describing the given artifacts.
        """
        if not refs:
            return cls(
                refs=SerializedDatasetRefContainerV1.from_refs(refs),
                artifact_map={},
            )

        # Calculate the paths relative to the given root since the Zip file
        # uses relative paths.
        file_to_relative = cls.calc_relative_paths(root, artifact_map.keys())

        simplified_refs = SerializedDatasetRefContainerV1.from_refs(refs)

        # Convert the artifact mapping to relative path.
        relative_artifact_map = {file_to_relative[path]: info for path, info in artifact_map.items()}

        return cls(
            refs=simplified_refs,
            artifact_map=relative_artifact_map,
        )

    @classmethod
    def from_zip_file(cls, zip_path: ResourcePath) -> Self:
        """Given a path to a Zip file return the index.

        Parameters
        ----------
        zip_path : `lsst.resources.ResourcePath`
            Path to the Zip file.

        Returns
        -------
        index : `ZipIndex`
            The index read from the Zip file.
        """
        with zip_path.open("rb") as fd, zipfile.ZipFile(fd) as zf:
            return cls.from_open_zip(zf)

    @classmethod
    def from_open_zip(cls, zf: zipfile.ZipFile) -> Self:
        """Read the index from an already-open Zip file.

        Parameters
        ----------
        zf : `zipfile.ZipFile`
            Open Zip file containing an index member named
            ``ZipIndex.index_name``.

        Returns
        -------
        index : `ZipIndex`
            The index read from the Zip file.
        """
        json_data = zf.read(cls.index_name)
        return cls.model_validate_json(json_data)

290 

291 

def determine_destination_for_retrieved_artifact(
    destination_directory: ResourcePath, source_path: ResourcePath, preserve_path: bool, prefix: str = ""
) -> ResourcePath:
    """Determine destination path for an artifact retrieved from a datastore.

    Parameters
    ----------
    destination_directory : `lsst.resources.ResourcePath`
        Path to the output directory where file will be stored.
    source_path : `lsst.resources.ResourcePath`
        Path to the source file to be transferred. This may be relative to the
        datastore root, or an absolute path.
    preserve_path : `bool`
        If `True` the full path of the artifact within the datastore
        is preserved. If `False` the final file component of the path
        is used.
    prefix : `str`, optional
        Prefix to add to the file name if ``preserve_path`` is `False`.

    Returns
    -------
    destination_uri : `~lsst.resources.ResourcePath`
        Absolute path to the output destination.

    Raises
    ------
    ValueError
        Raised if the resulting path would fall outside the destination
        directory.
    """
    base_dir = destination_directory.abspath()

    relative: ResourcePathExpression
    if not preserve_path:
        # Keep only the final file component, optionally prefixed.
        name = source_path.basename()
        relative = prefix + name if prefix else name
    else:
        relative = source_path
        if relative.isabs():
            # An absolute path to an external file: re-root the full path
            # underneath the destination directory.
            relative = relative.relativeToPathRoot

    target_uri = base_dir.join(relative).abspath()
    # Guard against path components (e.g. "..") escaping the destination.
    if target_uri.relative_to(base_dir) is None:
        raise ValueError(f"File path attempts to escape destination directory: '{source_path}'")
    return target_uri

334 

335 

class RetrievalCallable(Protocol):
    """Structural interface for callables that retrieve dataset artifacts
    and return the per-artifact information needed to build a Zip index,
    for example `Datastore.retrieveArtifacts`.
    """

    def __call__(
        self,
        refs: Iterable[DatasetRef],
        destination: ResourcePath,
        transfer: str,
        preserve_path: bool,
        overwrite: bool,
        write_index: bool,
        add_prefix: bool,
    ) -> dict[ResourcePath, ArtifactIndexInfo]: ...

347 

348 

def retrieve_and_zip(
    refs: Iterable[DatasetRef],
    destination: ResourcePathExpression,
    retrieval_callback: RetrievalCallable,
    overwrite: bool = True,
) -> ResourcePath:
    """Retrieve artifacts from a Butler and place in ZIP file.

    Parameters
    ----------
    refs : `collections.abc.Iterable` [ `DatasetRef` ]
        The datasets to be included in the zip file. Must all be from
        the same dataset type.
    destination : `lsst.resources.ResourcePath`
        Directory to write the new ZIP file. This directory will
        also be used as a staging area for the datasets being downloaded
        from the datastore.
    retrieval_callback : `~collections.abc.Callable`
        Bound method for a function that can retrieve the artifacts and
        return the metadata necessary for creating the zip index. For example
        `lsst.daf.butler.datastore.Datastore.retrieveArtifacts`.
    overwrite : `bool`, optional
        If `False` the output Zip will not be written if a file of the
        same name is already present in ``destination``.

    Returns
    -------
    zip_file : `lsst.resources.ResourcePath`
        The path to the new ZIP file.

    Raises
    ------
    ValueError
        Raised if there are no refs to retrieve, or if the destination is
        not a directory.
    FileExistsError
        Raised if the output Zip file already exists and ``overwrite`` is
        `False`.
    """
    if not refs:
        raise ValueError("Requested Zip file with no contents.")

    outdir = ResourcePath(destination, forceDirectory=True)
    if not outdir.isdir():
        raise ValueError(f"Destination location must refer to a directory. Given {destination}")

    if not outdir.exists():
        outdir.mkdir()

    # Simplest approach:
    # - create temp dir in destination
    # - Run retrieveArtifacts to that temp dir
    # - Create zip file with unique name.
    # - Delete temp dir
    # - Add index file to ZIP.
    # - Return name of zip file.
    with tempfile.TemporaryDirectory(dir=outdir.ospath, ignore_cleanup_errors=True) as tmpdir:
        tmpdir_path = ResourcePath(tmpdir, forceDirectory=True)
        # Retrieve the artifacts and write the index file. Strip paths.
        artifact_map = retrieval_callback(
            refs=refs,
            destination=tmpdir_path,
            transfer="auto",
            preserve_path=False,
            overwrite=False,
            write_index=True,
            add_prefix=True,
        )

        # Read the index to construct file name.
        index_path = tmpdir_path.join(ZipIndex.index_name, forceDirectory=False)
        index_json = index_path.read()
        index = ZipIndex.model_validate_json(index_json)

        # Use unique name based on files in Zip.
        zip_file_name = index.calculate_zip_file_name()
        zip_path = outdir.join(zip_file_name, forceDirectory=False)
        if not overwrite and zip_path.exists():
            raise FileExistsError(f"Output Zip at {zip_path} already exists but cannot overwrite.")
        # Named ``zipf`` (not ``zip``) to avoid shadowing the builtin.
        with zipfile.ZipFile(zip_path.ospath, "w") as zipf:
            # Compress the (text) index; artifact members are stored as-is.
            zipf.write(index_path.ospath, index_path.basename(), compress_type=zipfile.ZIP_DEFLATED)
            for path, name in index.calc_relative_paths(tmpdir_path, list(artifact_map)).items():
                zipf.write(path.ospath, name)

    return zip_path

430 

431 

def unpack_zips(
    zips_to_transfer: Iterable[ResourcePath],
    allowed_ids: set[uuid.UUID],
    destination: ResourcePath,
    preserve_path: bool,
    overwrite: bool,
) -> dict[ResourcePath, ArtifactIndexInfo]:
    """Transfer the Zip files and unpack them in the destination directory.

    Parameters
    ----------
    zips_to_transfer : `~collections.abc.Iterable` \
            [ `~lsst.resources.ResourcePath` ]
        Paths to Zip files to unpack. These must be Zip files that include
        the index information and were created by the Butler.
    allowed_ids : `set` [ `uuid.UUID` ]
        All the possible dataset IDs for which artifacts should be extracted
        from the Zip file. If an ID in the Zip file is not present in this
        list the artifact will not be extracted from the Zip.
    destination : `~lsst.resources.ResourcePath`
        Output destination for the Zip contents.
    preserve_path : `bool`
        Whether to include subdirectories during extraction. If `True` a
        directory will be made per Zip.
    overwrite : `bool`, optional
        If `True` allow transfers to overwrite existing files at the
        destination.

    Returns
    -------
    artifact_map : `dict` \
            [ `~lsst.resources.ResourcePath`, `ArtifactIndexInfo` ]
        Path linking Zip contents location to associated artifact information.
    """
    extracted: dict[ResourcePath, ArtifactIndexInfo] = {}
    for source_uri in zips_to_transfer:
        _LOG.debug("Unpacking zip file %s", source_uri)
        # Assume that downloading to temporary location is more efficient
        # than trying to read the contents remotely.
        with ResourcePath.temporary_uri(suffix=".zip") as local_zip:
            local_zip.transfer_from(source_uri, transfer="auto")
            index = ZipIndex.from_zip_file(local_zip)

            if preserve_path:
                # Derive a per-Zip subdirectory from the in-store path.
                rel_dir = ResourcePath(
                    index.calculate_zip_file_path_in_store(), forceDirectory=False, forceAbsolute=False
                ).dirname()
                outdir = destination.join(rel_dir)
            else:
                outdir = destination
            outdir.mkdir()

            with local_zip.open("rb") as fd, zipfile.ZipFile(fd) as zf:
                for member, member_info in index.artifact_map.items():
                    wanted_ids = member_info.ids & allowed_ids
                    if not wanted_ids:
                        # No dataset in this member was requested.
                        continue
                    # Do not apply a new prefix since the zip file
                    # should already have a prefix.
                    output_path = outdir.join(member, forceDirectory=False)
                    if not overwrite and output_path.exists():
                        raise FileExistsError(
                            f"Destination path '{output_path}' already exists. "
                            "Extraction from Zip cannot be completed."
                        )
                    zf.extract(member, path=outdir.ospath)
                    extracted[output_path] = member_info.subset(wanted_ids)
    return extracted