# This file is part of daf_butler.
#
# Developed for the LSST Data Management System.
# This product includes software developed by the LSST Project
# (http://www.lsst.org).
# See the COPYRIGHT file at the top-level directory of this distribution
# for details of code ownership.
#
# This software is dual licensed under the GNU General Public License and also
# under a 3-clause BSD license. Recipients may choose which of these licenses
# to use; please see the files gpl-3.0.txt and/or bsd_license.txt,
# respectively. If you choose the GPL option then the following text applies
# (but note that there is still no warranty even if you opt for BSD instead):
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.

from __future__ import annotations

__all__ = (
    "ArtifactIndexInfo",
    "ZipIndex",
    "determine_destination_for_retrieved_artifact",
    "retrieve_and_zip",
    "unpack_zips",
)

import logging
import tempfile
import uuid
import zipfile
from collections.abc import Iterable
from typing import ClassVar, Literal, Protocol, Self

from pydantic import BaseModel

from lsst.daf.butler import (
    DatasetIdFactory,
    DatasetRef,
    SerializedDatasetRefContainers,
    SerializedDatasetRefContainerV1,
)
from lsst.daf.butler.datastore.stored_file_info import SerializedStoredFileInfo
from lsst.resources import ResourcePath, ResourcePathExpression

_LOG = logging.getLogger(__name__)


class ArtifactIndexInfo(BaseModel):
    """Information related to an artifact in an index."""

    info: SerializedStoredFileInfo
    """Datastore record information for this file artifact."""

    ids: set[uuid.UUID]
    """Dataset IDs for this artifact."""

    def append(self, id_: uuid.UUID) -> None:
        """Add an additional dataset ID.

        Parameters
        ----------
        id_ : `uuid.UUID`
            Additional dataset ID to associate with this artifact.
        """
        self.ids.add(id_)

    @classmethod
    def from_single(cls, info: SerializedStoredFileInfo, id_: uuid.UUID) -> Self:
        """Create a mapping from a single ID and info.

        Parameters
        ----------
        info : `SerializedStoredFileInfo`
            Datastore record for this artifact.
        id_ : `uuid.UUID`
            First dataset ID to associate with this artifact.
        """
        return cls(info=info, ids=[id_])

    def subset(self, ids: Iterable[uuid.UUID]) -> Self:
        """Replace the IDs with a subset of the IDs and return a new instance.

        Parameters
        ----------
        ids : `~collections.abc.Iterable` [ `uuid.UUID` ]
            Subset of IDs to keep.

        Returns
        -------
        subsetted : `ArtifactIndexInfo`
            New instance with the requested subset.

        Raises
        ------
        ValueError
            Raised if the given IDs are not a subset of the current IDs.
        """
        subset = set(ids)
        if subset - self.ids:
            raise ValueError(f"Given subset of {subset} is not a subset of {self.ids}")
        return type(self)(ids=subset, info=self.info.model_copy())
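
    # A minimal usage sketch (hypothetical values; ``record`` stands in for a
    # ``SerializedStoredFileInfo`` obtained from a datastore):
    #
    #     a, b = uuid.uuid4(), uuid.uuid4()
    #     info = ArtifactIndexInfo.from_single(record, a)
    #     info.append(b)             # artifact now associated with two datasets
    #     only_a = info.subset({a})  # new instance keeping just ``a``
    #     info.subset({uuid.uuid4()})  # would raise ValueError: not a subset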


class ZipIndex(BaseModel):
    """Index of a Zip file of Butler datasets.

    A file can be associated with multiple butler datasets and a single
    butler dataset can be associated with multiple files. This model
    provides the necessary information for ingesting back into a Butler
    file datastore.
    """

    index_version: Literal["V1"] = "V1"

    refs: SerializedDatasetRefContainers
    """Deduplicated information for all the `DatasetRef` in the index."""

    artifact_map: dict[str, ArtifactIndexInfo]
    """Mapping of each Zip member to associated lookup information."""

    index_name: ClassVar[str] = "_butler_zip_index.json"
    """Name to use when saving the index to a file."""

    def generate_uuid5(self) -> uuid.UUID:
        """Create a UUID based on the Zip index.

        Returns
        -------
        id_ : `uuid.UUID`
            A UUID5 created from the paths inside the Zip file. Guarantees
            that if the Zip file is regenerated with exactly the same file
            paths the same answer will be returned.
        """
        # Options are:
        # - uuid5 based on file paths in zip.
        # - uuid5 based on ref uuids.
        # - checksum derived from the above.
        # - uuid5 from file paths and dataset refs.
        # Do not attempt to include file contents in UUID.
        # Start with uuid5 from file paths.
        data = ",".join(sorted(self.artifact_map.keys()))
        # No need to come up with a different namespace.
        return uuid.uuid5(DatasetIdFactory.NS_UUID, data)
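
    # Determinism sketch (member names illustrative): because the member
    # paths are sorted before hashing, two indexes listing the same files
    # yield the same UUID5 irrespective of insertion order:
    #
    #     set(idx1.artifact_map) == {"a.fits", "b.json"}
    #     set(idx2.artifact_map) == {"b.json", "a.fits"}
    #     assert idx1.generate_uuid5() == idx2.generate_uuid5()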

    def __len__(self) -> int:
        """Return the number of files in the Zip."""
        return len(self.artifact_map)

    def calculate_zip_file_name(self) -> str:
        """Calculate the default name for the Zip file based on the index
        contents.

        Returns
        -------
        name : `str`
            Name of the zip file based on index.
        """
        return f"{self.generate_uuid5()}.zip"

    def calculate_zip_file_path_in_store(self) -> str:
        """Calculate the relative path inside a datastore that should be
        used for this Zip file.

        Returns
        -------
        path_in_store : `str`
            Relative path to use for Zip file in datastore.
        """
        zip_name = self.calculate_zip_file_name()
        return f"zips/{zip_name[:4]}/{zip_name}"
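
    # Layout sketch: the first four characters of the UUID-based name shard
    # the datastore's ``zips/`` tree so no single directory grows too large.
    # For a hypothetical UUID the result would be:
    #
    #     "1a2b3c4d-....zip" -> "zips/1a2b/1a2b3c4d-....zip"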

    def write_index(self, dir: ResourcePath) -> ResourcePath:
        """Write the index to the specified directory.

        Parameters
        ----------
        dir : `~lsst.resources.ResourcePath`
            Directory to write the index file to.

        Returns
        -------
        index_path : `~lsst.resources.ResourcePath`
            Path to the index file that was written.
        """
        index_path = dir.join(self.index_name, forceDirectory=False)
        with index_path.open("w") as fd:
            # Need to include unset/default values so that the version
            # discriminator field for refs container appears in the
            # serialization.
            print(self.model_dump_json(), file=fd)
        return index_path

    @classmethod
    def calc_relative_paths(
        cls, root: ResourcePath, paths: Iterable[ResourcePath]
    ) -> dict[ResourcePath, str]:
        """Calculate the path to use inside the Zip file from the full path.

        Parameters
        ----------
        root : `lsst.resources.ResourcePath`
            The reference root directory.
        paths : `~collections.abc.Iterable` [ `lsst.resources.ResourcePath` ]
            The paths to the files that should be included in the Zip file.

        Returns
        -------
        abs_to_rel : `dict` [ `~lsst.resources.ResourcePath`, `str` ]
            Mapping of the original file path to the relative path to use
            in Zip file.
        """
        file_to_relative: dict[ResourcePath, str] = {}
        for p in paths:
            # It is an error if there is no relative path.
            rel = p.relative_to(root)
            if rel is None:
                raise RuntimeError(f"Unexpectedly unable to calculate relative path of {p} to {root}.")
            file_to_relative[p] = rel
        return file_to_relative
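
    # Behavior sketch (illustrative paths): every path must be inside
    # ``root``, otherwise a RuntimeError is raised:
    #
    #     root = ResourcePath("/staging/", forceDirectory=True)
    #     ZipIndex.calc_relative_paths(root, [ResourcePath("/staging/a/b.fits")])
    #     # -> {ResourcePath("/staging/a/b.fits"): "a/b.fits"}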

    @classmethod
    def from_artifact_map(
        cls,
        refs: Iterable[DatasetRef],
        artifact_map: dict[ResourcePath, ArtifactIndexInfo],
        root: ResourcePath,
    ) -> Self:
        """Create an index from the mappings returned from
        `Datastore.retrieveArtifacts`.

        Parameters
        ----------
        refs : `~collections.abc.Iterable` [ `DatasetRef` ]
            Datasets present in the index.
        artifact_map : `dict` [ `lsst.resources.ResourcePath`, \
                `ArtifactIndexInfo` ]
            Mapping of artifact path to information linking it to the
            associated refs and datastore information.
        root : `lsst.resources.ResourcePath`
            Root path to be removed from all the paths before creating the
            index.
        """
        if not refs:
            return cls(
                refs=SerializedDatasetRefContainerV1.from_refs(refs),
                artifact_map={},
            )

        # Calculate the paths relative to the given root since the Zip file
        # uses relative paths.
        file_to_relative = cls.calc_relative_paths(root, artifact_map.keys())

        simplified_refs = SerializedDatasetRefContainerV1.from_refs(refs)

        # Convert the artifact mapping to relative path.
        relative_artifact_map = {file_to_relative[path]: info for path, info in artifact_map.items()}

        return cls(
            refs=simplified_refs,
            artifact_map=relative_artifact_map,
        )

    @classmethod
    def from_zip_file(cls, zip_path: ResourcePath) -> Self:
        """Given a path to a Zip file return the index.

        Parameters
        ----------
        zip_path : `lsst.resources.ResourcePath`
            Path to the Zip file.
        """
        with zip_path.open("rb") as fd, zipfile.ZipFile(fd) as zf:
            return cls.from_open_zip(zf)

    @classmethod
    def from_open_zip(cls, zf: zipfile.ZipFile) -> Self:
        """Read the index from an already-open Zip file.

        Parameters
        ----------
        zf : `zipfile.ZipFile`
            Open Zip file containing the index member.
        """
        json_data = zf.read(cls.index_name)
        return cls.model_validate_json(json_data)
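
    # Round-trip sketch (illustrative path): the index can be recovered from
    # a Butler-created Zip without extracting its members:
    #
    #     index = ZipIndex.from_zip_file(ResourcePath("/data/1a2b.zip"))
    #     n_artifacts = len(index)           # number of file artifacts
    #     members = set(index.artifact_map)  # member paths inside the Zip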


def determine_destination_for_retrieved_artifact(
    destination_directory: ResourcePath, source_path: ResourcePath, preserve_path: bool, prefix: str = ""
) -> ResourcePath:
    """Determine destination path for an artifact retrieved from a datastore.

    Parameters
    ----------
    destination_directory : `lsst.resources.ResourcePath`
        Path to the output directory where file will be stored.
    source_path : `lsst.resources.ResourcePath`
        Path to the source file to be transferred. This may be relative to the
        datastore root, or an absolute path.
    preserve_path : `bool`
        If `True` the full path of the artifact within the datastore
        is preserved. If `False` the final file component of the path
        is used.
    prefix : `str`, optional
        Prefix to add to the file name if ``preserve_path`` is `False`.

    Returns
    -------
    destination_uri : `~lsst.resources.ResourcePath`
        Absolute path to the output destination.
    """
    destination_directory = destination_directory.abspath()

    target_path: ResourcePathExpression
    if preserve_path:
        target_path = source_path
        if target_path.isabs():
            # This is an absolute path to an external file.
            # Use the full path.
            target_path = target_path.relativeToPathRoot
    else:
        target_path = source_path.basename()
        if prefix:
            target_path = prefix + target_path

    target_uri = destination_directory.join(target_path).abspath()
    if target_uri.relative_to(destination_directory) is None:
        raise ValueError(f"File path attempts to escape destination directory: '{source_path}'")
    return target_uri
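
# Behavior sketch (illustrative paths):
#
#     dest = ResourcePath("/out/", forceDirectory=True)
#     rel = ResourcePath("a/b/file.fits", forceAbsolute=False)
#     determine_destination_for_retrieved_artifact(dest, rel, preserve_path=True)
#     # -> /out/a/b/file.fits (datastore layout preserved)
#     determine_destination_for_retrieved_artifact(dest, rel, preserve_path=False, prefix="x-")
#     # -> /out/x-file.fits (flattened, with prefixed file name)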


class RetrievalCallable(Protocol):
    def __call__(
        self,
        refs: Iterable[DatasetRef],
        destination: ResourcePath,
        transfer: str,
        preserve_path: bool,
        overwrite: bool,
        write_index: bool,
        add_prefix: bool,
    ) -> dict[ResourcePath, ArtifactIndexInfo]: ...


def retrieve_and_zip(
    refs: Iterable[DatasetRef],
    destination: ResourcePathExpression,
    retrieval_callback: RetrievalCallable,
    overwrite: bool = True,
) -> ResourcePath:
    """Retrieve artifacts from a Butler and place in ZIP file.

    Parameters
    ----------
    refs : `~collections.abc.Iterable` [ `DatasetRef` ]
        The datasets to be included in the zip file. Must all be from
        the same dataset type.
    destination : `lsst.resources.ResourcePathExpression`
        Directory to write the new ZIP file. This directory will
        also be used as a staging area for the datasets being downloaded
        from the datastore.
    retrieval_callback : `~collections.abc.Callable`
        Bound method for a function that can retrieve the artifacts and
        return the metadata necessary for creating the zip index. For example
        `lsst.daf.butler.datastore.Datastore.retrieveArtifacts`.
    overwrite : `bool`, optional
        If `False` the output Zip will not be written if a file of the
        same name is already present in ``destination``.

    Returns
    -------
    zip_file : `lsst.resources.ResourcePath`
        The path to the new ZIP file.

    Raises
    ------
    ValueError
        Raised if there are no refs to retrieve.
    FileExistsError
        Raised if the output Zip file already exists in ``destination``
        and ``overwrite`` is `False`.
    """
    if not refs:
        raise ValueError("Requested Zip file with no contents.")

    outdir = ResourcePath(destination, forceDirectory=True)
    if not outdir.isdir():
        raise ValueError(f"Destination location must refer to a directory. Given {destination}")

    if not outdir.exists():
        outdir.mkdir()

    # Simplest approach:
    # - Create temp dir in destination.
    # - Run retrieveArtifacts to that temp dir.
    # - Create zip file with unique name.
    # - Delete temp dir.
    # - Add index file to ZIP.
    # - Return name of zip file.
    with tempfile.TemporaryDirectory(dir=outdir.ospath, ignore_cleanup_errors=True) as tmpdir:
        tmpdir_path = ResourcePath(tmpdir, forceDirectory=True)
        # Retrieve the artifacts and write the index file. Strip paths.
        artifact_map = retrieval_callback(
            refs=refs,
            destination=tmpdir_path,
            transfer="auto",
            preserve_path=False,
            overwrite=False,
            write_index=True,
            add_prefix=True,
        )

        # Read the index to construct file name.
        index_path = tmpdir_path.join(ZipIndex.index_name, forceDirectory=False)
        index_json = index_path.read()
        index = ZipIndex.model_validate_json(index_json)

        # Use unique name based on files in Zip.
        zip_file_name = index.calculate_zip_file_name()
        zip_path = outdir.join(zip_file_name, forceDirectory=False)
        if not overwrite and zip_path.exists():
            raise FileExistsError(f"Output Zip at {zip_path} already exists but cannot overwrite.")
        with zipfile.ZipFile(zip_path.ospath, "w") as zip:
            zip.write(index_path.ospath, index_path.basename(), compress_type=zipfile.ZIP_DEFLATED)
            for path, name in index.calc_relative_paths(tmpdir_path, list(artifact_map)).items():
                zip.write(path.ospath, name)

    return zip_path
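
# Usage sketch (assumes an initialized ``butler``; obtaining the bound
# ``retrieveArtifacts`` method via the private ``_datastore`` attribute is
# illustrative only). Any callable satisfying ``RetrievalCallable`` works:
#
#     refs = butler.query_datasets("raw", collections="HSC/raw/all")
#     zip_path = retrieve_and_zip(refs, "/tmp/export", butler._datastore.retrieveArtifacts)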


def unpack_zips(
    zips_to_transfer: Iterable[ResourcePath],
    allowed_ids: set[uuid.UUID],
    destination: ResourcePath,
    preserve_path: bool,
    overwrite: bool,
) -> dict[ResourcePath, ArtifactIndexInfo]:
    """Transfer the Zip files and unpack them in the destination directory.

    Parameters
    ----------
    zips_to_transfer : `~collections.abc.Iterable` \
            [ `~lsst.resources.ResourcePath` ]
        Paths to Zip files to unpack. These must be Zip files that include
        the index information and were created by the Butler.
    allowed_ids : `set` [ `uuid.UUID` ]
        All the possible dataset IDs for which artifacts should be extracted
        from the Zip file. If an ID in the Zip file is not present in this
        set the artifact will not be extracted from the Zip.
    destination : `~lsst.resources.ResourcePath`
        Output destination for the Zip contents.
    preserve_path : `bool`
        Whether to include subdirectories during extraction. If `True` a
        directory will be made per Zip.
    overwrite : `bool`
        If `True` allow transfers to overwrite existing files at the
        destination.

    Returns
    -------
    artifact_map : `dict` \
            [ `~lsst.resources.ResourcePath`, `ArtifactIndexInfo` ]
        Path linking Zip contents location to associated artifact information.

    Raises
    ------
    FileExistsError
        Raised if a destination file already exists and ``overwrite`` is
        `False`.
    """
    artifact_map: dict[ResourcePath, ArtifactIndexInfo] = {}
    for source_uri in zips_to_transfer:
        _LOG.debug("Unpacking zip file %s", source_uri)
        # Assume that downloading to a temporary location is more efficient
        # than trying to read the contents remotely.
        with ResourcePath.temporary_uri(suffix=".zip") as temp:
            temp.transfer_from(source_uri, transfer="auto")
            index = ZipIndex.from_zip_file(temp)

            if preserve_path:
                subdir = ResourcePath(
                    index.calculate_zip_file_path_in_store(), forceDirectory=False, forceAbsolute=False
                ).dirname()
                outdir = destination.join(subdir)
            else:
                outdir = destination
            outdir.mkdir()
            with temp.open("rb") as fd, zipfile.ZipFile(fd) as zf:
                for path_in_zip, artifact_info in index.artifact_map.items():
                    # Skip if this specific dataset ref is not requested.
                    included_ids = artifact_info.ids & allowed_ids
                    if included_ids:
                        # Do not apply a new prefix since the zip file
                        # should already have a prefix.
                        output_path = outdir.join(path_in_zip, forceDirectory=False)
                        if not overwrite and output_path.exists():
                            raise FileExistsError(
                                f"Destination path '{output_path}' already exists. "
                                "Extraction from Zip cannot be completed."
                            )
                        zf.extract(path_in_zip, path=outdir.ospath)
                        artifact_map[output_path] = artifact_info.subset(included_ids)
    return artifact_map
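
# Usage sketch (illustrative paths; ``refs`` as in the previous sketch):
# extract only the artifacts belonging to the requested datasets from
# Butler-created Zips:
#
#     zips = [ResourcePath("/incoming/1a2b.zip")]
#     wanted = {ref.id for ref in refs}
#     dest = ResourcePath("/unpacked/", forceDirectory=True)
#     unpack_zips(zips, wanted, dest, preserve_path=False, overwrite=False)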