Coverage for python / lsst / resources / utils.py: 43%

88 statements  

« prev     ^ index     » next       coverage.py v7.13.5, created at 2026-04-14 23:32 +0000

1# This file is part of lsst-resources. 

2# 

3# Developed for the LSST Data Management System. 

4# This product includes software developed by the LSST Project 

5# (https://www.lsst.org). 

6# See the COPYRIGHT file at the top-level directory of this distribution 

7# for details of code ownership. 

8# 

9# Use of this source code is governed by a 3-clause BSD-style 

10# license that can be found in the LICENSE file. 

11 

12from __future__ import annotations 

13 

14__all__ = ("NoTransaction", "TransactionProtocol", "get_tempdir", "os2posix", "posix2os") 

15 

16import contextlib 

17import logging 

18import multiprocessing 

19import os 

20import posixpath 

21import shutil 

22import stat 

23import tempfile 

24from collections.abc import Callable, Iterator 

25from functools import cache 

26from pathlib import Path, PurePath, PurePosixPath 

27from typing import Any, Protocol 

28 

# True when the local OS path separator is the same as the POSIX one.
IS_POSIX = posixpath.sep == os.sep

# Root path for this operating system. Resolving a Path can call getcwd,
# which can fail in some situations, so on POSIX simply assume the root is
# the POSIX separator and only resolve explicitly in the non-POSIX case.
if IS_POSIX:
    OS_ROOT_PATH = posixpath.sep
else:
    OS_ROOT_PATH = Path().resolve().root

# Upper bound on worker threads for parallelized operations.
# If this is raised above 10, be aware that the number has to stay
# consistent with connection pool sizing (for example in urllib3).
MAX_WORKERS = 10

log = logging.getLogger(__name__)

43 

44 

def os2posix(ospath: str) -> str:
    """Translate a path written with the local separator into POSIX form.

    Parameters
    ----------
    ospath : `str`
        Path using the local path separator.

    Returns
    -------
    posix : `str`
        Path using POSIX path separator. On a POSIX system the input is
        returned unchanged.
    """
    if IS_POSIX:
        return ospath

    converted = PurePath(ospath).as_posix()

    # PurePath drops a trailing separator, which loses the hint that the
    # path refers to a directory. Put it back if the input had one.
    lost_trailing_sep = ospath.endswith(os.sep) and not converted.endswith(posixpath.sep)
    return converted + posixpath.sep if lost_trailing_sep else converted

70 

71 

def posix2os(posix: PurePath | str) -> str:
    """Convert a POSIX path description to a local path description.

    Parameters
    ----------
    posix : `str`, `~pathlib.PurePath`
        Path using the POSIX path separator.

    Returns
    -------
    ospath : `str`
        Path using OS path separator. On a POSIX system this is simply
        ``str(posix)``.

    Notes
    -----
    A path with no components (for example ``""`` or ``"."``) is returned
    as its string form with any POSIX separator swapped for the OS
    separator, rather than raising `IndexError`.
    """
    posix_text = str(posix)
    if IS_POSIX:
        return posix_text

    posix_path = PurePosixPath(posix)
    components = list(posix_path.parts)

    # "" and "." split into zero parts; there is nothing to index or join,
    # so hand back the text form using the local separator.
    if not components:
        return posix_text.replace(posixpath.sep, os.sep)

    # Have to convert the root directory after splitting.
    if components[0] == posix_path.root:
        components[0] = OS_ROOT_PATH

    # Trailing "/" is stripped by PurePosixPath, so add back an empty
    # component to make os.path.join emit a trailing separator.
    if posix_text.endswith(posixpath.sep):
        components.append("")

    return os.path.join(*components)

101 

102 

@cache
def get_tempdir() -> str:
    """Return the directory to use for temporary files.

    Returns
    -------
    tmpdir : `str`
        Path to the default temporary directory location.

    Notes
    -----
    The environment variables ``LSST_RESOURCES_TMPDIR`` and ``TMPDIR`` are
    consulted in that order and the first one naming an existing directory
    wins. If neither qualifies, the result of `tempfile.gettempdir` is
    used. The answer is cached, so later changes to the environment are
    not seen unless the cache is cleared.
    """
    # $TMPDIR is also checked by tempfile.gettempdir() in the fallback.
    env_values = (os.getenv(name) for name in ("LSST_RESOURCES_TMPDIR", "TMPDIR"))
    chosen = next((path for path in env_values if path and os.path.isdir(path)), None)
    return tempfile.gettempdir() if chosen is None else chosen

130 

131 

class NoTransaction:
    """Stand-in for the
    `~lsst.daf.butler.core.datastore.DatastoreTransaction` class.

    Notes
    -----
    Performs no work at all. It is intended as a fallback when no explicit
    transaction class has been supplied.
    """

    def __init__(self) -> None:
        # Nothing to initialise; there is no state to track.
        pass

    @contextlib.contextmanager
    def undoWith(self, name: str, undoFunc: Callable, *args: Any, **kwargs: Any) -> Iterator[None]:
        """Context manager that registers nothing and undoes nothing.

        Parameters
        ----------
        name : `str`
            The name of this undo request. Ignored.
        undoFunc : `~collections.abc.Callable`
            Function that a real transaction would call on failure. It is
            never called here.
        *args : `~typing.Any`
            Positional parameters for ``undoFunc``. Ignored.
        **kwargs : `~typing.Any`
            Keyword parameters for ``undoFunc``. Ignored.

        Yields
        ------
        `None`
            Nothing is provided to the ``with`` body because transactions
            are disabled by definition.
        """
        yield

168 

169 

class TransactionProtocol(Protocol):
    """Structural type describing the transaction interface.

    Any object providing a compatible ``undoWith`` context manager
    satisfies this protocol for type checking purposes.
    """

    @contextlib.contextmanager
    def undoWith(self, name: str, undoFunc: Callable, *args: Any, **kwargs: Any) -> Iterator[None]:
        """Register ``undoFunc`` under ``name`` for the enclosed block."""

175 

176 

177def makeTestTempDir(default_base: str | None = None) -> str: 

178 """Create a temporary directory for test usage. 

179 

180 The directory will be created within ``LSST_RESOURCES_TEST_TMP`` if that 

181 environment variable is set, falling back to ``LSST_RESOURCES_TMPDIR`` 

182 amd then ``default_base`` if none are set. 

183 

184 Parameters 

185 ---------- 

186 default_base : `str`, optional 

187 Default parent directory. Will use system default if no environment 

188 variables are set and base is set to `None`. 

189 

190 Returns 

191 ------- 

192 dir : `str` 

193 Name of the new temporary directory. 

194 """ 

195 base = default_base 

196 for envvar in ("LSST_RESOURCES_TEST_TMP", "LSST_RESOURCES_TMPDIR"): 

197 if envvar in os.environ and os.environ[envvar]: 

198 base = os.environ[envvar] 

199 break 

200 return tempfile.mkdtemp(dir=base) 

201 

202 

203def removeTestTempDir(root: str | None) -> None: 

204 """Attempt to remove a temporary test directory, but do not raise if 

205 unable to. 

206 

207 Unlike `tempfile.TemporaryDirectory`, this passes ``ignore_errors=True`` 

208 to ``shutil.rmtree`` at close, making it safe to use on NFS. 

209 

210 Parameters 

211 ---------- 

212 root : `str`, optional 

213 Name of the directory to be removed. If `None`, nothing will be done. 

214 """ 

215 if root is not None and os.path.exists(root): 

216 shutil.rmtree(root, ignore_errors=True) 

217 

218 

219def ensure_directory_is_writeable(directory_path: str | bytes) -> None: 

220 """Given the path to a directory, ensures that we are able to write it and 

221 access files in it. 

222 

223 Alters the directory permissions by adding the owner-write and 

224 owner-traverse permission bits if they aren't already set 

225 

226 Parameters 

227 ---------- 

228 directory_path : `str` or `bytes` 

229 Path to the directory that will be made writeable. 

230 """ 

231 current_mode = os.stat(directory_path).st_mode 

232 desired_mode = current_mode | stat.S_IWUSR | stat.S_IXUSR 

233 if current_mode != desired_mode: 

234 os.chmod(directory_path, desired_mode) 

235 

236 

237def _get_int_env_var(env_var: str) -> int | None: 

238 int_value = None 

239 env_value = os.getenv(env_var) 

240 if env_value is not None: 240 ↛ 241line 240 didn't jump to line 241 because the condition on line 240 was never true

241 with contextlib.suppress(TypeError): 

242 int_value = int(env_value) 

243 return int_value 

244 

245 

@cache
def _get_num_workers() -> int:
    """Calculate the number of workers to use.

    Returns
    -------
    num : `int`
        The number of workers to use. Will use the value of the
        ``LSST_RESOURCES_NUM_WORKERS`` environment variable if set. Will fall
        back to using the CPU count (plus 2) but capped at ``MAX_WORKERS``.

    Notes
    -----
    The result is cached, so the environment is only consulted on the
    first call. This docstring is a plain string literal on purpose: an
    f-string in this position is an ordinary expression, not a docstring.
    """
    # If someone is explicitly specifying a number, let them use that number
    # (it is deliberately not capped).
    requested = _get_int_env_var("LSST_RESOURCES_NUM_WORKERS")
    if requested is not None:
        return requested

    # CPU_LIMIT is used on nublado. An unset (or zero) limit falls back to
    # the machine's CPU count.
    cpu_limit = _get_int_env_var("CPU_LIMIT") or multiprocessing.cpu_count()

    # But don't ever return more than the maximum allowed.
    return min(cpu_limit + 2, MAX_WORKERS)