Coverage for python / lsst / utils / introspection.py: 12%
123 statements
« prev ^ index » next coverage.py v7.13.5, created at 2026-04-26 08:43 +0000
« prev ^ index » next coverage.py v7.13.5, created at 2026-04-26 08:43 +0000
1# This file is part of utils.
2#
3# Developed for the LSST Data Management System.
4# This product includes software developed by the LSST Project
5# (https://www.lsst.org).
6# See the COPYRIGHT file at the top-level directory of this distribution
7# for details of code ownership.
8#
9# Use of this source code is governed by a 3-clause BSD-style
10# license that can be found in the LICENSE file.
11#
12"""Utilities relating to introspection in python."""
14from __future__ import annotations
16__all__ = [
17 "find_outside_stacklevel",
18 "get_caller_name",
19 "get_class_of",
20 "get_full_type_name",
21 "get_instance_of",
22 "take_object_census",
23 "trace_object_references",
24]
26import builtins
27import collections
28import gc
29import inspect
30import itertools
31import sys
32import types
33import warnings
34from collections.abc import Set
35from typing import Any
37from .doImport import doImport, doImportType
40def get_full_type_name(cls_: Any) -> str:
41 """Return full type name of the supplied entity.
43 Parameters
44 ----------
45 cls_ : `type` or `object`
46 Entity from which to obtain the full name. Can be an instance
47 or a `type`.
49 Returns
50 -------
51 name : `str`
52 Full name of type.
54 Notes
55 -----
56 Builtins are returned without the ``builtins`` specifier included. This
57 allows `str` to be returned as "str" rather than "builtins.str". Any
58 parts of the path that start with a leading underscore are removed
59 on the assumption that they are an implementation detail and the
60 entity will be hoisted into the parent namespace.
61 """
62 # If we have a module that needs to be converted directly
63 # to a name.
64 if isinstance(cls_, types.ModuleType):
65 return cls_.__name__
66 # If we have an instance we need to convert to a type
67 if not hasattr(cls_, "__qualname__"):
68 cls_ = type(cls_)
69 if hasattr(builtins, cls_.__qualname__):
70 # Special case builtins such as str and dict
71 return cls_.__qualname__
73 real_name = cls_.__module__ + "." + cls_.__qualname__
75 # Remove components with leading underscores
76 cleaned_name = ".".join(c for c in real_name.split(".") if not c.startswith("_"))
78 # Consistency check
79 if real_name != cleaned_name:
80 try:
81 test = doImport(cleaned_name)
82 except Exception:
83 # Could not import anything so return the real name
84 return real_name
86 # The thing we imported should match the class we started with
87 # despite the clean up. If it does not we return the real name
88 if test is not cls_:
89 return real_name
91 return cleaned_name
94def get_class_of(typeOrName: type | str | types.ModuleType) -> type:
95 """Given the type name or a type, return the python type.
97 If a type name is given, an attempt will be made to import the type.
99 Parameters
100 ----------
101 typeOrName : `str` or Python class
102 A string describing the Python class to load or a Python type.
104 Returns
105 -------
106 type_ : `type`
107 Directly returns the Python type if a type was provided, else
108 tries to import the given string and returns the resulting type.
110 Notes
111 -----
112 This is a thin wrapper around `~lsst.utils.doImport`.
114 Raises
115 ------
116 TypeError
117 Raised if a module is imported rather than a type.
118 """
119 if isinstance(typeOrName, str):
120 cls = doImportType(typeOrName)
121 else:
122 if isinstance(typeOrName, types.ModuleType):
123 raise TypeError(f"Can not get class of module {get_full_type_name(typeOrName)}")
124 cls = typeOrName
125 return cls
128def get_instance_of(typeOrName: type | str, *args: Any, **kwargs: Any) -> Any:
129 """Given the type name or a type, instantiate an object of that type.
131 If a type name is given, an attempt will be made to import the type.
133 Parameters
134 ----------
135 typeOrName : `str` or Python class
136 A string describing the Python class to load or a Python type.
137 *args : `tuple`
138 Positional arguments to use pass to the object constructor.
139 **kwargs
140 Keyword arguments to pass to object constructor.
142 Returns
143 -------
144 instance : `object`
145 Instance of the requested type, instantiated with the provided
146 parameters.
148 Raises
149 ------
150 TypeError
151 Raised if a module is imported rather than a type.
152 """
153 cls = get_class_of(typeOrName)
154 return cls(*args, **kwargs)
157def get_caller_name(stacklevel: int = 2) -> str:
158 """Get the name of the caller method.
160 Any item that cannot be determined (or is not relevant, e.g. a free
161 function has no class) is silently omitted, along with an
162 associated separator.
164 Parameters
165 ----------
166 stacklevel : `int`
167 How many levels of stack to skip while getting caller name;
168 1 means "who calls me", 2 means "who calls my caller", etc.
170 Returns
171 -------
172 name : `str`
173 Name of the caller as a string in the form ``module.class.method``.
174 An empty string is returned if ``stacklevel`` exceeds the stack height.
176 Notes
177 -----
178 Adapted from http://stackoverflow.com/a/9812105
179 by adding support to get the class from ``parentframe.f_locals['cls']``
180 """
181 stack = inspect.stack()
182 start = 0 + stacklevel
183 if len(stack) < start + 1:
184 return ""
185 parentframe = stack[start][0]
187 name = []
188 module = inspect.getmodule(parentframe)
189 if module:
190 name.append(module.__name__)
191 # add class name, if any
192 if "self" in parentframe.f_locals:
193 name.append(type(parentframe.f_locals["self"]).__name__)
194 elif "cls" in parentframe.f_locals:
195 name.append(parentframe.f_locals["cls"].__name__)
196 codename = parentframe.f_code.co_name
197 if codename != "<module>": # top level usually
198 name.append(codename) # function or a method
199 return ".".join(name)
202def find_outside_stacklevel(
203 *module_names: str,
204 allow_modules: Set[str] = frozenset(),
205 allow_methods: Set[str] = frozenset(),
206 stack_info: dict[str, Any] | None = None,
207) -> int:
208 """Find the stacklevel for outside of the given module.
210 This can be used to determine the stacklevel parameter that should be
211 passed to log messages or warnings in order to make them appear to
212 come from external code and not this package.
214 Parameters
215 ----------
216 *module_names : `str`
217 The names of the modules to skip when calculating the relevant stack
218 level.
219 allow_modules : `set` [`str`]
220 Names that should not be skipped when calculating the stacklevel.
221 If the module name starts with any of the names in this set the
222 corresponding stacklevel is used.
223 allow_methods : `set` [`str`]
224 Method names that are allowed to be treated as "outside". Fully
225 qualified method names must match exactly. Method names without
226 path components will match solely the method name itself. On Python
227 3.10 fully qualified names are not supported.
228 stack_info : `dict` or `None`, optional
229 If given, the dictionary is filled with information from
230 the relevant stack frame. This can be used to form your own warning
231 message without having to call :func:`inspect.stack` yourself with
232 the stack level.
234 Returns
235 -------
236 stacklevel : `int`
237 The stacklevel to use matching the first stack frame outside of the
238 given module.
240 Examples
241 --------
242 .. code-block:: python
244 warnings.warn(
245 "A warning message", stacklevel=find_outside_stacklevel("lsst.daf")
246 )
247 """
248 if sys.version_info < (3, 11, 0):
249 short_names = {m for m in allow_methods if "." not in m}
250 if len(short_names) != len(allow_methods):
251 warnings.warn(
252 "Python 3.10 does not support fully qualified names in allow_methods. Dropping them.",
253 stacklevel=2,
254 )
255 allow_methods = short_names
257 need_full_names = any("." in m for m in allow_methods)
259 if stack_info is not None:
260 # Ensure it is empty when we start.
261 stack_info.clear()
263 stacklevel = -1
264 for i, s in enumerate(inspect.stack()):
265 # This function is never going to be the right answer.
266 if i == 0:
267 continue
268 module = inspect.getmodule(s.frame)
269 if module is None:
270 continue
272 if stack_info is not None:
273 stack_info["filename"] = s.filename
274 stack_info["lineno"] = s.lineno
275 stack_info["name"] = s.frame.f_code.co_name
277 if allow_methods:
278 code = s.frame.f_code
279 names = {code.co_name} # The name of the function itself.
280 if need_full_names:
281 full_name = f"{module.__name__}.{code.co_qualname}"
282 names.add(full_name)
283 if names & allow_methods:
284 # Method name is allowed so we stop here.
285 del s
286 stacklevel = i
287 break
289 # Stack frames sometimes hang around so explicitly delete.
290 del s
292 if (
293 # The module does not match any of the skipped names.
294 not any(module.__name__.startswith(name) for name in module_names)
295 # This match is explicitly allowed to be treated as "outside".
296 or any(module.__name__.startswith(name) for name in allow_modules)
297 ):
298 # 0 will be this function.
299 # 1 will be the caller
300 # and so does not need adjustment.
301 stacklevel = i
302 break
303 else:
304 # The top can't be inside the module.
305 stacklevel = i
307 return stacklevel
310def take_object_census() -> collections.Counter[type]:
311 """Count the number of existing objects, by type.
313 The census is returned as a `~collections.Counter` object. Expected usage
314 involves taking the difference with a different `~collections.Counter` and
315 examining any changes.
317 Returns
318 -------
319 census : `collections.Counter` [`type`]
320 The number of objects found of each type.
322 Notes
323 -----
324 This function counts *all* Python objects in memory. To count only
325 reachable objects, run `gc.collect` first.
326 """
327 counts: collections.Counter[type] = collections.Counter()
328 for obj in gc.get_objects():
329 counts[type(obj)] += 1
330 return counts
333def trace_object_references(
334 target_class: type,
335 count: int = 5,
336 max_level: int = 10,
337) -> tuple[list[list], bool]:
338 """Find the chain(s) of references that make(s) objects of a class
339 reachable.
341 Parameters
342 ----------
343 target_class : `type`
344 The class whose objects need to be traced. This is typically a class
345 that is known to be leaking.
346 count : `int`, optional
347 The number of example objects to trace, if that many exist.
348 max_level : `int`, optional
349 The number of levels of references to trace. ``max_level=1`` means
350 finding only objects that directly refer to the examples.
352 Returns
353 -------
354 traces : `list` [`list`]
355 A sequence whose first element (index 0) is the set of example objects
356 of type ``target_class``, whose second element (index 1) is the set of
357 objects that refer to the examples, and so on. Contains at most
358 ``max_level + 1`` elements.
359 trace_complete : `bool`
360 `True` if the trace for all objects terminated in at most
361 ``max_level`` references, and `False` if more references exist.
363 Examples
364 --------
365 An example with two levels of references:
367 >>> from collections import namedtuple
368 >>> class Foo:
369 ... pass
370 >>> holder = namedtuple("Holder", ["bar", "baz"])
371 >>> myholder = holder(bar={"object": Foo()}, baz=42)
372 >>> # In doctest, the trace extends up to the whole global dict
373 >>> # if you let it.
374 >>> trace_object_references(Foo, max_level=2) # doctest: +ELLIPSIS
375 ... # doctest: +NORMALIZE_WHITESPACE
376 ([[<lsst.utils.introspection.Foo object at ...>],
377 [{'object': <lsst.utils.introspection.Foo object at ...>}],
378 [Holder(bar={'object': <lsst.utils.introspection.Foo object at ...>},
379 baz=42)]], False)
380 """
382 def class_filter(o: Any) -> bool:
383 return isinstance(o, target_class)
385 # set() would be more appropriate, but objects may not be hashable.
386 objs = list(itertools.islice(filter(class_filter, gc.get_objects()), count))
387 if objs:
388 return _recurse_trace(objs, remaining=max_level)
389 else:
390 return [objs], True
393def _recurse_trace(objs: list, remaining: int) -> tuple[list[list], bool]:
394 """Recursively find references to a set of objects.
396 Parameters
397 ----------
398 objs : `list`
399 The objects to trace.
400 remaining : `int`
401 The number of levels of references to trace.
403 Returns
404 -------
405 traces : `list` [`list`]
406 A sequence whose first element (index 0) is ``objs``, whose second
407 element (index 1) is the set of objects that refer to those, and so on.
408 Contains at most ``remaining + 1``.
409 trace_complete : `bool`
410 `True` if the trace for all objects terminated in at most
411 ``remaining`` references, and `False` if more references exist.
412 """
413 # Filter out our own references to the objects. This is needed to avoid
414 # circular recursion.
415 refs = _get_clean_refs(objs)
417 if refs:
418 if remaining > 1:
419 more_refs, complete = _recurse_trace(refs, remaining=remaining - 1)
420 more_refs.insert(0, objs)
421 return more_refs, complete
422 else:
423 more_refs = _get_clean_refs(refs)
424 return [objs, refs], (not more_refs)
425 else:
426 return [objs], True
429def _get_clean_refs(objects: list) -> list:
430 """Find references to a set of objects, excluding those needed to query
431 for references.
433 Parameters
434 ----------
435 objects : `list`
436 The objects to find references for.
438 Returns
439 -------
440 refs : `list`
441 The objects that refer to the elements of ``objects``, not counting
442 ``objects`` itself.
443 """
444 # Pre-create the tuple so we know its id() and can filter it out.
445 # This allows for difference in behavior between python 3.12 and 3.13
446 # when calling gc.get_referrers with multiple arguments.
447 objects_tuple = tuple(objects)
448 refs = gc.get_referrers(*objects_tuple)
449 ids_to_drop = {id(objects), id(objects_tuple)}
450 refs = [ref for ref in refs if id(ref) not in ids_to_drop]
451 refs = [ref for ref in refs if not type(ref).__name__.endswith("_iterator")]
452 return refs