Coverage for python / lsst / daf / butler / _rubin / temporary_for_ingest.py: 49%

57 statements  

« prev     ^ index     » next       coverage.py v7.13.5, created at 2026-04-30 08:41 +0000

1# This file is part of daf_butler. 

2# 

3# Developed for the LSST Data Management System. 

4# This product includes software developed by the LSST Project 

5# (http://www.lsst.org). 

6# See the COPYRIGHT file at the top-level directory of this distribution 

7# for details of code ownership. 

8# 

9# This software is dual licensed under the GNU General Public License and also 

10# under a 3-clause BSD license. Recipients may choose which of these licenses 

11# to use; please see the files gpl-3.0.txt and/or bsd_license.txt, 

12# respectively. If you choose the GPL option then the following text applies 

13# (but note that there is still no warranty even if you opt for BSD instead): 

14# 

15# This program is free software: you can redistribute it and/or modify 

16# it under the terms of the GNU General Public License as published by 

17# the Free Software Foundation, either version 3 of the License, or 

18# (at your option) any later version. 

19# 

20# This program is distributed in the hope that it will be useful, 

21# but WITHOUT ANY WARRANTY; without even the implied warranty of 

22# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the 

23# GNU General Public License for more details. 

24# 

25# You should have received a copy of the GNU General Public License 

26# along with this program. If not, see <http://www.gnu.org/licenses/>. 

27 

28from __future__ import annotations 

29 

30__all__ = ("TemporaryForIngest",) 

31 

32import dataclasses 

33import glob 

34from contextlib import contextmanager 

35from typing import TYPE_CHECKING, Self, cast 

36 

37from lsst.resources import ResourcePath 

38 

39if TYPE_CHECKING: 

40 from collections.abc import Iterator 

41 from types import TracebackType 

42 

43 from .._butler import Butler 

44 from .._dataset_ref import DatasetRef 

45 from .._file_dataset import FileDataset 

46 from .._limited_butler import LimitedButler 

47 

48 

@dataclasses.dataclass
class TemporaryForIngest:
    """A context manager for generating temporary paths that will be ingested
    as butler datasets.

    Notes
    -----
    Neither this class nor its `make_path` method runs ingest automatically
    when its context manager is exited; the `ingest` method must always be
    called explicitly.
    """

    butler: Butler
    """Full butler to obtain a predicted path from and ingest into."""

    ref: DatasetRef
    """Description of the dataset to ingest."""

    dataset: FileDataset = dataclasses.field(init=False)
    """The dataset that will be passed to `Butler.ingest`.

    Only set once the context manager has been entered.
    """

    @property
    def path(self) -> ResourcePath:
        """The temporary path.

        Guaranteed to be a local POSIX path.
        """
        return cast(ResourcePath, self.dataset.path)

    @property
    def ospath(self) -> str:
        """The temporary path as a complete filename."""
        return self.path.ospath

    @classmethod
    @contextmanager
    def make_path(cls, final_path: ResourcePath) -> Iterator[ResourcePath]:
        """Return a temporary path context manager given the predicted final
        path.

        Parameters
        ----------
        final_path : `lsst.resources.ResourcePath`
            Predicted final path.

        Returns
        -------
        context : `contextlib.AbstractContextManager`
            A context manager that yields the temporary
            `~lsst.resources.ResourcePath` when entered and deletes that file
            when exited.
        """
        # Always write to a temporary even if using a local file system -- that
        # gives us atomic writes. If a process is killed as the file is being
        # written we do not want it to remain in the correct place but in
        # corrupt state. For local files write to the output directory (which
        # is created here if needed) rather than the temporary dir.
        prefix = final_path.dirname() if final_path.isLocal else None
        if prefix is not None:
            prefix.mkdir()
        # With prefix=None, `temporary_uri` chooses its own default location.
        with ResourcePath.temporary_uri(
            suffix=cls._get_temporary_suffix(final_path), prefix=prefix
        ) as temporary_path:
            yield temporary_path

    def ingest(self, record_validation_info: bool = True) -> None:
        """Ingest the file into the butler.

        Parameters
        ----------
        record_validation_info : `bool`, optional
            Whether to record the file size and checksum upon ingest.

        Notes
        -----
        The file is ingested with ``transfer="move"``, so the temporary file
        is moved (not copied) to its final location.
        """
        self.butler.ingest(self.dataset, transfer="move", record_validation_info=record_validation_info)

    def __enter__(self) -> Self:
        """Create the temporary path for ``ref`` and populate `dataset`.

        Returns
        -------
        self : `TemporaryForIngest`
            This object, with `dataset` (and hence `path` / `ospath`) set.
        """
        from .._file_dataset import FileDataset

        # Strip any fragment from the predicted URI so only the plain path is
        # used to derive the temporary location and name.
        final_path = self.butler.getURI(self.ref, predict=True).replace(fragment="")
        # `make_path` already creates the output directory for local paths,
        # so there is no need to do that here as well.
        self._temporary_path_context = self.make_path(final_path)
        temporary_path = self._temporary_path_context.__enter__()
        self.dataset = FileDataset(temporary_path, [self.ref], formatter=None)
        return self

    def __exit__(
        self,
        exc_type: type[BaseException] | None,
        exc_value: BaseException | None,
        traceback: TracebackType | None,
    ) -> bool | None:
        """Exit the temporary-path context entered by `__enter__`.

        This does not ingest anything; it only delegates to the context
        manager returned by `make_path`, whose return value is propagated.
        """
        return self._temporary_path_context.__exit__(exc_type, exc_value, traceback)

    @classmethod
    def find_orphaned_temporaries_by_path(cls, final_path: ResourcePath) -> list[ResourcePath]:
        """Search for temporary files that were not successfully ingested.

        Parameters
        ----------
        final_path : `lsst.resources.ResourcePath`
            Final path a successfully-ingested file would have.

        Returns
        -------
        paths : `list` [ `lsst.resources.ResourcePath` ]
            Files that look like temporaries that might have been created while
            trying to write the target dataset.

        Notes
        -----
        Orphaned files are only possible when a context manager is interrupted
        by a hard error that prevents any cleanup code from running (e.g.
        sudden loss of power).
        """
        if not final_path.isLocal:
            # We return true tempfile for non-local predicted paths, so orphans
            # are not our problem (the OS etc. will take care of them).
            return []
        # Temporaries live next to the final file and end with the same
        # "<basename>.tmp<ext>" suffix that `make_path` asks for; escape both
        # pieces so glob metacharacters in real paths are matched literally.
        directory_pattern = glob.escape(final_path.dirname().ospath)
        suffix_pattern = glob.escape(cls._get_temporary_suffix(final_path))
        final_ospath = final_path.ospath
        return [
            ResourcePath(filename)
            for filename in glob.glob(f"{directory_pattern}*{suffix_pattern}")
            if filename != final_ospath
        ]

    @classmethod
    def find_orphaned_temporaries_by_ref(cls, ref: DatasetRef, butler: LimitedButler) -> list[ResourcePath]:
        """Search for temporary files that were not successfully ingested.

        Parameters
        ----------
        ref : `~lsst.daf.butler.DatasetRef`
            A dataset reference the temporaries correspond to.
        butler : `~lsst.daf.butler.LimitedButler`
            Butler that can be used to obtain a predicted URI for a dataset.

        Returns
        -------
        paths : `list` [ `lsst.resources.ResourcePath` ]
            Files that look like temporaries that might have been created while
            trying to write the target dataset.

        Notes
        -----
        Orphaned files are only possible when a context manager is interrupted
        by a hard error that prevents any cleanup code from running (e.g.
        sudden loss of power).
        """
        # Derive the final path exactly as `__enter__` does, so the search
        # looks where `make_path` would have put the temporaries.
        final_path = butler.getURI(ref, predict=True).replace(fragment="")
        return cls.find_orphaned_temporaries_by_path(final_path)

    @staticmethod
    def _get_temporary_suffix(path: ResourcePath) -> str:
        """Return the filename suffix used for temporaries of ``path``.

        A ``.tmp`` marker is inserted between the basename and the extension,
        so the original extension is kept at the end (e.g. ``a.fits`` gives
        ``a.tmp.fits``; a path without an extension just gains ``.tmp``).
        """
        ext = path.getExtension()
        basename = path.basename().removesuffix(ext)
        return f"{basename}.tmp{ext}"