Coverage for python / lsst / daf / butler / _rubin / temporary_for_ingest.py: 49%
57 statements
« prev ^ index » next coverage.py v7.13.5, created at 2026-04-26 08:49 +0000
« prev ^ index » next coverage.py v7.13.5, created at 2026-04-26 08:49 +0000
1# This file is part of daf_butler.
2#
3# Developed for the LSST Data Management System.
4# This product includes software developed by the LSST Project
5# (http://www.lsst.org).
6# See the COPYRIGHT file at the top-level directory of this distribution
7# for details of code ownership.
8#
9# This software is dual licensed under the GNU General Public License and also
10# under a 3-clause BSD license. Recipients may choose which of these licenses
11# to use; please see the files gpl-3.0.txt and/or bsd_license.txt,
12# respectively. If you choose the GPL option then the following text applies
13# (but note that there is still no warranty even if you opt for BSD instead):
14#
15# This program is free software: you can redistribute it and/or modify
16# it under the terms of the GNU General Public License as published by
17# the Free Software Foundation, either version 3 of the License, or
18# (at your option) any later version.
19#
20# This program is distributed in the hope that it will be useful,
21# but WITHOUT ANY WARRANTY; without even the implied warranty of
22# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
23# GNU General Public License for more details.
24#
25# You should have received a copy of the GNU General Public License
26# along with this program. If not, see <http://www.gnu.org/licenses/>.
28from __future__ import annotations
30__all__ = ("TemporaryForIngest",)
32import dataclasses
33import glob
34from contextlib import contextmanager
35from typing import TYPE_CHECKING, Self, cast
37from lsst.resources import ResourcePath
39if TYPE_CHECKING:
40 from collections.abc import Iterator
41 from types import TracebackType
43 from .._butler import Butler
44 from .._dataset_ref import DatasetRef
45 from .._file_dataset import FileDataset
46 from .._limited_butler import LimitedButler
49@dataclasses.dataclass
50class TemporaryForIngest:
51 """A context manager for generating temporary paths that will be ingested
52 as butler datasets.
54 Notes
55 -----
56 Neither this class nor its `make_path` method run ingest automatically when
57 their context manager is exited; the `ingest` method must always be called
58 explicitly.
59 """
61 butler: Butler
62 """Full butler to obtain a predicted path from and ingest into."""
64 ref: DatasetRef
65 """Description of the dataset to ingest."""
67 dataset: FileDataset = dataclasses.field(init=False)
68 """The dataset that will be passed to `Butler.ingest`."""
70 @property
71 def path(self) -> ResourcePath:
72 """The temporary path.
74 Guaranteed to be a local POSIX path.
75 """
76 return cast(ResourcePath, self.dataset.path)
78 @property
79 def ospath(self) -> str:
80 """The temporary path as a complete filename."""
81 return self.path.ospath
83 @classmethod
84 @contextmanager
85 def make_path(cls, final_path: ResourcePath) -> Iterator[ResourcePath]:
86 """Return a temporary path context manager given the predicted final
87 path.
89 Parameters
90 ----------
91 final_path : `lsst.resources.ResourcePath`
92 Predicted final path.
94 Returns
95 -------
96 context : `contextlib.AbstractContextManager`
97 A context manager that yields the temporary
98 `~lsst.resources.ResourcePath` when entered and deletes that file
99 when exited.
100 """
101 # Always write to a temporary even if using a local file system -- that
102 # gives us atomic writes. If a process is killed as the file is being
103 # written we do not want it to remain in the correct place but in
104 # corrupt state. For local files write to the output directory not
105 # temporary dir.
106 prefix = final_path.dirname() if final_path.isLocal else None
107 if prefix is not None:
108 prefix.mkdir()
109 with ResourcePath.temporary_uri(
110 suffix=cls._get_temporary_suffix(final_path), prefix=prefix
111 ) as temporary_path:
112 yield temporary_path
114 def ingest(self, record_validation_info: bool = True) -> None:
115 """Ingest the file into the butler.
117 Parameters
118 ----------
119 record_validation_info : `bool`, optional
120 Whether to- record the file size and checksum upon ingest.
121 """
122 self.butler.ingest(self.dataset, transfer="move", record_validation_info=record_validation_info)
124 def __enter__(self) -> Self:
125 from .._file_dataset import FileDataset
127 final_path = self.butler.getURI(self.ref, predict=True).replace(fragment="")
128 prefix = final_path.dirname() if final_path.isLocal else None
129 if prefix is not None:
130 prefix.mkdir()
131 self._temporary_path_context = self.make_path(final_path)
132 temporary_path = self._temporary_path_context.__enter__()
133 self.dataset = FileDataset(temporary_path, [self.ref], formatter=None)
134 return self
136 def __exit__(
137 self,
138 exc_type: type[BaseException] | None,
139 exc_value: BaseException | None,
140 traceback: TracebackType | None,
141 ) -> bool | None:
142 return self._temporary_path_context.__exit__(exc_type, exc_value, traceback)
144 @classmethod
145 def find_orphaned_temporaries_by_path(cls, final_path: ResourcePath) -> list[ResourcePath]:
146 """Search for temporary files that were not successfully ingested.
148 Parameters
149 ----------
150 final_path : `lsst.resources.ResourcePath`
151 Final path a successfully-ingested file would have.
153 Returns
154 -------
155 paths : `list` [ `lsst.resources.ResourcePath` ]
156 Files that look like temporaries that might have been created while
157 trying to write the target dataset.
159 Notes
160 -----
161 Orphaned files are only possible when a context manager is interrupted
162 by a hard error that prevents any cleanup code from running (e.g.
163 sudden loss of power).
164 """
165 if not final_path.isLocal:
166 # We return true tempfile for non-local predicted paths, so orphans
167 # are not our problem (the OS etc. will take care of them).
168 return []
169 return [
170 ResourcePath(filename)
171 for filename in glob.glob(
172 f"{glob.escape(final_path.dirname().ospath)}*{glob.escape(cls._get_temporary_suffix(final_path))}"
173 )
174 if filename != final_path.ospath
175 ]
177 @classmethod
178 def find_orphaned_temporaries_by_ref(cls, ref: DatasetRef, butler: LimitedButler) -> list[ResourcePath]:
179 """Search for temporary files that were not successfully ingested.
181 Parameters
182 ----------
183 ref : `~lsst.daf.butler.DatasetRef`
184 A dataset reference the temporaries correspond to.
185 butler : `~lsst.daf.butler.LimitedButler`
186 Butler that can be used to obtain a predicted URI for a dataset.
188 Returns
189 -------
190 paths : `list` [ `lsst.resources.ResourcePath` ]
191 Files that look like temporaries that might have been created while
192 trying to write the target dataset.
194 Notes
195 -----
196 Orphaned files are only possible when a context manager is interrupted
197 by a hard error that prevents any cleanup code from running (e.g.
198 sudden loss of power).
199 """
200 final_path = butler.getURI(ref, predict=True).replace(fragment="")
201 return cls.find_orphaned_temporaries_by_path(final_path)
203 @staticmethod
204 def _get_temporary_suffix(path: ResourcePath) -> str:
205 ext = path.getExtension()
206 basename = path.basename().removesuffix(ext)
207 return f"{basename}.tmp{ext}"