Coverage for python / lsst / resources / utils.py: 43%
88 statements
« prev ^ index » next coverage.py v7.13.5, created at 2026-04-26 08:44 +0000
« prev ^ index » next coverage.py v7.13.5, created at 2026-04-26 08:44 +0000
1# This file is part of lsst-resources.
2#
3# Developed for the LSST Data Management System.
4# This product includes software developed by the LSST Project
5# (https://www.lsst.org).
6# See the COPYRIGHT file at the top-level directory of this distribution
7# for details of code ownership.
8#
9# Use of this source code is governed by a 3-clause BSD-style
10# license that can be found in the LICENSE file.
12from __future__ import annotations
14__all__ = ("NoTransaction", "TransactionProtocol", "get_tempdir", "os2posix", "posix2os")
16import contextlib
17import logging
18import multiprocessing
19import os
20import posixpath
21import shutil
22import stat
23import tempfile
24from collections.abc import Callable, Iterator
25from functools import cache
26from pathlib import Path, PurePath, PurePosixPath
27from typing import Any, Protocol
29# Determine if the path separator for the OS looks like POSIX
30IS_POSIX = os.sep == posixpath.sep
32# Root path for this operating system. This can use getcwd which
33# can fail in some situations so in the default case assume that
34# posix means posix and only determine explicitly in the non-posix case.
35OS_ROOT_PATH = posixpath.sep if IS_POSIX else Path().resolve().root
37# Maximum number of worker threads for parallelized operations.
38# If greater than 10, be aware that this number has to be consistent
39# with connection pool sizing (for example in urllib3).
40MAX_WORKERS = 10
42log = logging.getLogger(__name__)
45def os2posix(ospath: str) -> str:
46 """Convert a local path description to a POSIX path description.
48 Parameters
49 ----------
50 ospath : `str`
51 Path using the local path separator.
53 Returns
54 -------
55 posix : `str`
56 Path using POSIX path separator.
57 """
58 if IS_POSIX:
59 return ospath
61 posix = PurePath(ospath).as_posix()
63 # PurePath strips trailing "/" from paths such that you can no
64 # longer tell if a path is meant to be referring to a directory
65 # Try to fix this.
66 if ospath.endswith(os.sep) and not posix.endswith(posixpath.sep):
67 posix += posixpath.sep
69 return posix
72def posix2os(posix: PurePath | str) -> str:
73 """Convert a POSIX path description to a local path description.
75 Parameters
76 ----------
77 posix : `str`, `~pathlib.PurePath`
78 Path using the POSIX path separator.
80 Returns
81 -------
82 ospath : `str`
83 Path using OS path separator.
84 """
85 if IS_POSIX:
86 return str(posix)
88 posixPath = PurePosixPath(posix)
89 paths = list(posixPath.parts)
91 # Have to convert the root directory after splitting
92 if paths[0] == posixPath.root:
93 paths[0] = OS_ROOT_PATH
95 # Trailing "/" is stripped so we need to add back an empty path
96 # for consistency
97 if str(posix).endswith(posixpath.sep):
98 paths.append("")
100 return os.path.join(*paths)
103@cache
104def get_tempdir() -> str:
105 """Get POSIX path to temporary directory.
107 Returns
108 -------
109 tmpdir : `str`
110 Path to the default temporary directory location.
112 Notes
113 -----
114 Uses the value of environment variables ``LSST_RESOURCES_TMPDIR`` or
115 ``TMPDIR``, if defined. Otherwise use the system temporary directory,
116 with a last-resort fallback to the current working directory if
117 nothing else is available.
118 """
119 tmpdir = None
120 # $TMPDIR is also checked with getttempdir() below.
121 for dir in (os.getenv(v) for v in ("LSST_RESOURCES_TMPDIR", "TMPDIR")):
122 if dir and os.path.isdir(dir):
123 tmpdir = dir
124 break
126 if tmpdir is None:
127 tmpdir = tempfile.gettempdir()
129 return tmpdir
132class NoTransaction:
133 """A simple emulation of the
134 `~lsst.daf.butler.core.datastore.DatastoreTransaction` class.
136 Notes
137 -----
138 Does nothing. Used as a fallback in the absence of an explicit transaction
139 class.
140 """
142 def __init__(self) -> None:
143 return
145 @contextlib.contextmanager
146 def undoWith(self, name: str, undoFunc: Callable, *args: Any, **kwargs: Any) -> Iterator[None]:
147 """No-op context manager to replace
148 `~lsst.daf.butler.core.datastore.DatastoreTransaction`.
150 Parameters
151 ----------
152 name : `str`
153 The name of this undo request.
154 undoFunc : `~collections.abc.Callable`
155 Function to call if there is an exception. Not used.
156 *args : `~typing.Any`
157 Parameters to pass to ``undoFunc``.
158 **kwargs : `~typing.Any`
159 Keyword parameters to pass to ``undoFunc``.
161 Yields
162 ------
163 `None`
164 Context manager returns nothing since transactions are disabled
165 by definition.
166 """
167 yield None
170class TransactionProtocol(Protocol):
171 """Protocol for type checking transaction interface."""
173 @contextlib.contextmanager
174 def undoWith(self, name: str, undoFunc: Callable, *args: Any, **kwargs: Any) -> Iterator[None]: ... 174 ↛ exitline 174 didn't return from function 'undoWith' because
177def makeTestTempDir(default_base: str | None = None) -> str:
178 """Create a temporary directory for test usage.
180 The directory will be created within ``LSST_RESOURCES_TEST_TMP`` if that
181 environment variable is set, falling back to ``LSST_RESOURCES_TMPDIR``
182 amd then ``default_base`` if none are set.
184 Parameters
185 ----------
186 default_base : `str`, optional
187 Default parent directory. Will use system default if no environment
188 variables are set and base is set to `None`.
190 Returns
191 -------
192 dir : `str`
193 Name of the new temporary directory.
194 """
195 base = default_base
196 for envvar in ("LSST_RESOURCES_TEST_TMP", "LSST_RESOURCES_TMPDIR"):
197 if envvar in os.environ and os.environ[envvar]:
198 base = os.environ[envvar]
199 break
200 return tempfile.mkdtemp(dir=base)
203def removeTestTempDir(root: str | None) -> None:
204 """Attempt to remove a temporary test directory, but do not raise if
205 unable to.
207 Unlike `tempfile.TemporaryDirectory`, this passes ``ignore_errors=True``
208 to ``shutil.rmtree`` at close, making it safe to use on NFS.
210 Parameters
211 ----------
212 root : `str`, optional
213 Name of the directory to be removed. If `None`, nothing will be done.
214 """
215 if root is not None and os.path.exists(root):
216 shutil.rmtree(root, ignore_errors=True)
219def ensure_directory_is_writeable(directory_path: str | bytes) -> None:
220 """Given the path to a directory, ensures that we are able to write it and
221 access files in it.
223 Alters the directory permissions by adding the owner-write and
224 owner-traverse permission bits if they aren't already set
226 Parameters
227 ----------
228 directory_path : `str` or `bytes`
229 Path to the directory that will be made writeable.
230 """
231 current_mode = os.stat(directory_path).st_mode
232 desired_mode = current_mode | stat.S_IWUSR | stat.S_IXUSR
233 if current_mode != desired_mode:
234 os.chmod(directory_path, desired_mode)
237def _get_int_env_var(env_var: str) -> int | None:
238 int_value = None
239 env_value = os.getenv(env_var)
240 if env_value is not None: 240 ↛ 241line 240 didn't jump to line 241 because the condition on line 240 was never true
241 with contextlib.suppress(TypeError):
242 int_value = int(env_value)
243 return int_value
246@cache
247def _get_num_workers() -> int:
248 f"""Calculate the number of workers to use.
250 Returns
251 -------
252 num : `int`
253 The number of workers to use. Will use the value of the
254 ``LSST_RESOURCES_NUM_WORKERS`` environment variable if set. Will fall
255 back to using the CPU count (plus 2) but capped at {MAX_WORKERS}.
256 """
257 num_workers: int | None = None
258 num_workers = _get_int_env_var("LSST_RESOURCES_NUM_WORKERS")
260 # If someone is explicitly specifying a number, let them use that number.
261 if num_workers is not None: 261 ↛ 262line 261 didn't jump to line 262 because the condition on line 261 was never true
262 return num_workers
264 if num_workers is None: 264 ↛ 271line 264 didn't jump to line 271 because the condition on line 264 was always true
265 # CPU_LIMIT is used on nublado.
266 cpu_limit = _get_int_env_var("CPU_LIMIT") or multiprocessing.cpu_count()
267 if cpu_limit is not None: 267 ↛ 271line 267 didn't jump to line 271 because the condition on line 267 was always true
268 num_workers = cpu_limit + 2
270 # But don't ever return more than the maximum allowed.
271 return min([num_workers, MAX_WORKERS])