Coverage for python / lsst / utils / introspection.py: 12%

123 statements  

« prev     ^ index     » next       coverage.py v7.13.5, created at 2026-04-30 08:37 +0000

1# This file is part of utils. 

2# 

3# Developed for the LSST Data Management System. 

4# This product includes software developed by the LSST Project 

5# (https://www.lsst.org). 

6# See the COPYRIGHT file at the top-level directory of this distribution 

7# for details of code ownership. 

8# 

9# Use of this source code is governed by a 3-clause BSD-style 

10# license that can be found in the LICENSE file. 

11# 

12"""Utilities relating to introspection in python.""" 

13 

14from __future__ import annotations 

15 

16__all__ = [ 

17 "find_outside_stacklevel", 

18 "get_caller_name", 

19 "get_class_of", 

20 "get_full_type_name", 

21 "get_instance_of", 

22 "take_object_census", 

23 "trace_object_references", 

24] 

25 

26import builtins 

27import collections 

28import gc 

29import inspect 

30import itertools 

31import sys 

32import types 

33import warnings 

34from collections.abc import Set 

35from typing import Any 

36 

37from .doImport import doImport, doImportType 

38 

39 

def get_full_type_name(cls_: Any) -> str:
    """Return full type name of the supplied entity.

    Parameters
    ----------
    cls_ : `type` or `object`
        Entity from which to obtain the full name. Can be an instance
        or a `type`.

    Returns
    -------
    name : `str`
        Full name of type.

    Notes
    -----
    Builtins are returned without the ``builtins`` specifier included. This
    allows `str` to be returned as "str" rather than "builtins.str". An
    entity is only treated as a builtin when the object stored under its
    name in `builtins` is the entity itself; a class from another module
    that merely shares its name with a builtin keeps its module prefix.

    Any parts of the path that start with a leading underscore are removed
    on the assumption that they are an implementation detail and the
    entity will be hoisted into the parent namespace. The shortened name is
    only returned if importing it yields the original entity.
    """
    # A module is converted directly to its name.
    if isinstance(cls_, types.ModuleType):
        return cls_.__name__

    # Anything without a qualified name is treated as an instance and
    # replaced by its type.
    if not hasattr(cls_, "__qualname__"):
        cls_ = type(cls_)

    qualname = cls_.__qualname__

    # Special case builtins such as str and dict. Compare by identity rather
    # than by name so that e.g. ``some.package.TimeoutError`` is not reported
    # as the unrelated builtin ``TimeoutError``.
    if getattr(builtins, qualname, None) is cls_:
        return qualname

    real_name = cls_.__module__ + "." + qualname

    # Remove components with leading underscores.
    cleaned_name = ".".join(c for c in real_name.split(".") if not c.startswith("_"))

    # Consistency check: the cleaned name is only usable if it can be
    # imported and resolves to the very same object.
    if real_name != cleaned_name:
        try:
            test = doImport(cleaned_name)
        except Exception:
            # Could not import anything so return the real name. This is a
            # deliberate best-effort fallback.
            return real_name

        # The thing we imported should match the class we started with
        # despite the clean up. If it does not we return the real name.
        if test is not cls_:
            return real_name

    return cleaned_name

92 

93 

94def get_class_of(typeOrName: type | str | types.ModuleType) -> type: 

95 """Given the type name or a type, return the python type. 

96 

97 If a type name is given, an attempt will be made to import the type. 

98 

99 Parameters 

100 ---------- 

101 typeOrName : `str` or Python class 

102 A string describing the Python class to load or a Python type. 

103 

104 Returns 

105 ------- 

106 type_ : `type` 

107 Directly returns the Python type if a type was provided, else 

108 tries to import the given string and returns the resulting type. 

109 

110 Notes 

111 ----- 

112 This is a thin wrapper around `~lsst.utils.doImport`. 

113 

114 Raises 

115 ------ 

116 TypeError 

117 Raised if a module is imported rather than a type. 

118 """ 

119 if isinstance(typeOrName, str): 

120 cls = doImportType(typeOrName) 

121 else: 

122 if isinstance(typeOrName, types.ModuleType): 

123 raise TypeError(f"Can not get class of module {get_full_type_name(typeOrName)}") 

124 cls = typeOrName 

125 return cls 

126 

127 

def get_instance_of(typeOrName: type | str, *args: Any, **kwargs: Any) -> Any:
    """Construct an object from a type or from the name of a type.

    The type is first resolved with `get_class_of` (importing it when a
    string is given) and then called with the supplied arguments.

    Parameters
    ----------
    typeOrName : `str` or Python class
        Either the Python type, or a string naming the Python class to
        import.
    *args : `tuple`
        Positional arguments to pass to the object constructor.
    **kwargs
        Keyword arguments to pass to the object constructor.

    Returns
    -------
    instance : `object`
        Instance of the requested type, constructed with the provided
        arguments.

    Raises
    ------
    TypeError
        Raised if a module is supplied or imported rather than a type.
    """
    return get_class_of(typeOrName)(*args, **kwargs)

155 

156 

def get_caller_name(stacklevel: int = 2) -> str:
    """Get the name of the caller method.

    Any item that cannot be determined (or is not relevant, e.g. a free
    function has no class) is silently omitted, along with an
    associated separator.

    Parameters
    ----------
    stacklevel : `int`
        How many levels of stack to skip while getting caller name;
        1 means "who calls me", 2 means "who calls my caller", etc.

    Returns
    -------
    name : `str`
        Name of the caller as a string in the form ``module.class.method``.
        An empty string is returned if ``stacklevel`` exceeds the stack height.

    Notes
    -----
    Adapted from http://stackoverflow.com/a/9812105
    by adding support to get the class from ``parentframe.f_locals['cls']``.

    The class component is inferred from a local variable named ``self`` or
    ``cls`` in the caller's frame. A ``cls`` local that has no string
    ``__name__`` (i.e. is not actually a class) is ignored rather than
    raising.
    """
    # Source context lines are never used here, so do not ask for them.
    stack = inspect.stack(context=0)
    try:
        if len(stack) < stacklevel + 1:
            return ""
        parentframe = stack[stacklevel].frame
        frame_locals = parentframe.f_locals

        name = []
        module = inspect.getmodule(parentframe)
        if module:
            name.append(module.__name__)

        # Add class name, if any.
        if "self" in frame_locals:
            name.append(type(frame_locals["self"]).__name__)
        elif "cls" in frame_locals:
            # ``cls`` is only a naming convention; the local may be anything.
            cls_name = getattr(frame_locals["cls"], "__name__", None)
            if isinstance(cls_name, str):
                name.append(cls_name)

        codename = parentframe.f_code.co_name
        if codename != "<module>":  # top level usually
            name.append(codename)  # function or a method
        return ".".join(name)
    finally:
        # ``stack`` contains this function's own frame, whose locals contain
        # ``stack``: drop it explicitly to break the reference cycle.
        del stack

200 

201 

202def find_outside_stacklevel( 

203 *module_names: str, 

204 allow_modules: Set[str] = frozenset(), 

205 allow_methods: Set[str] = frozenset(), 

206 stack_info: dict[str, Any] | None = None, 

207) -> int: 

208 """Find the stacklevel for outside of the given module. 

209 

210 This can be used to determine the stacklevel parameter that should be 

211 passed to log messages or warnings in order to make them appear to 

212 come from external code and not this package. 

213 

214 Parameters 

215 ---------- 

216 *module_names : `str` 

217 The names of the modules to skip when calculating the relevant stack 

218 level. 

219 allow_modules : `set` [`str`] 

220 Names that should not be skipped when calculating the stacklevel. 

221 If the module name starts with any of the names in this set the 

222 corresponding stacklevel is used. 

223 allow_methods : `set` [`str`] 

224 Method names that are allowed to be treated as "outside". Fully 

225 qualified method names must match exactly. Method names without 

226 path components will match solely the method name itself. On Python 

227 3.10 fully qualified names are not supported. 

228 stack_info : `dict` or `None`, optional 

229 If given, the dictionary is filled with information from 

230 the relevant stack frame. This can be used to form your own warning 

231 message without having to call :func:`inspect.stack` yourself with 

232 the stack level. 

233 

234 Returns 

235 ------- 

236 stacklevel : `int` 

237 The stacklevel to use matching the first stack frame outside of the 

238 given module. 

239 

240 Examples 

241 -------- 

242 .. code-block:: python 

243 

244 warnings.warn( 

245 "A warning message", stacklevel=find_outside_stacklevel("lsst.daf") 

246 ) 

247 """ 

248 if sys.version_info < (3, 11, 0): 

249 short_names = {m for m in allow_methods if "." not in m} 

250 if len(short_names) != len(allow_methods): 

251 warnings.warn( 

252 "Python 3.10 does not support fully qualified names in allow_methods. Dropping them.", 

253 stacklevel=2, 

254 ) 

255 allow_methods = short_names 

256 

257 need_full_names = any("." in m for m in allow_methods) 

258 

259 if stack_info is not None: 

260 # Ensure it is empty when we start. 

261 stack_info.clear() 

262 

263 stacklevel = -1 

264 for i, s in enumerate(inspect.stack()): 

265 # This function is never going to be the right answer. 

266 if i == 0: 

267 continue 

268 module = inspect.getmodule(s.frame) 

269 if module is None: 

270 continue 

271 

272 if stack_info is not None: 

273 stack_info["filename"] = s.filename 

274 stack_info["lineno"] = s.lineno 

275 stack_info["name"] = s.frame.f_code.co_name 

276 

277 if allow_methods: 

278 code = s.frame.f_code 

279 names = {code.co_name} # The name of the function itself. 

280 if need_full_names: 

281 full_name = f"{module.__name__}.{code.co_qualname}" 

282 names.add(full_name) 

283 if names & allow_methods: 

284 # Method name is allowed so we stop here. 

285 del s 

286 stacklevel = i 

287 break 

288 

289 # Stack frames sometimes hang around so explicitly delete. 

290 del s 

291 

292 if ( 

293 # The module does not match any of the skipped names. 

294 not any(module.__name__.startswith(name) for name in module_names) 

295 # This match is explicitly allowed to be treated as "outside". 

296 or any(module.__name__.startswith(name) for name in allow_modules) 

297 ): 

298 # 0 will be this function. 

299 # 1 will be the caller 

300 # and so does not need adjustment. 

301 stacklevel = i 

302 break 

303 else: 

304 # The top can't be inside the module. 

305 stacklevel = i 

306 

307 return stacklevel 

308 

309 

def take_object_census() -> collections.Counter[type]:
    """Tally the objects currently known to the garbage collector, by type.

    The tally is returned as a `~collections.Counter`. The intended usage
    is to take two censuses at different times and subtract one from the
    other to see which types have grown or shrunk.

    Returns
    -------
    census : `collections.Counter` [`type`]
        The number of objects found of each type.

    Notes
    -----
    Every object reported by `gc.get_objects` is counted, which can include
    objects that are no longer reachable but have not yet been collected.
    To count only reachable objects, run `gc.collect` first.
    """
    # Create the counter before asking for the object list, so that the
    # counter itself is part of the population being tallied.
    census: collections.Counter[type] = collections.Counter()
    census.update(map(type, gc.get_objects()))
    return census

331 

332 

def trace_object_references(
    target_class: type,
    count: int = 5,
    max_level: int = 10,
) -> tuple[list[list], bool]:
    """Find the chain(s) of references that keep objects of a class
    reachable.

    Parameters
    ----------
    target_class : `type`
        The class whose objects need to be traced. This is typically a class
        that is known to be leaking.
    count : `int`, optional
        The maximum number of example objects to trace; fewer are used if
        fewer exist.
    max_level : `int`, optional
        The number of levels of references to trace. ``max_level=1`` means
        finding only objects that directly refer to the examples.

    Returns
    -------
    traces : `list` [`list`]
        A sequence whose first element (index 0) is the list of example
        objects of type ``target_class``, whose second element (index 1) is
        the list of objects that refer to the examples, and so on. Contains
        at most ``max_level + 1`` elements.
    trace_complete : `bool`
        `True` if the trace for all objects terminated in at most
        ``max_level`` references, and `False` if more references exist.
        Also `True` when no object of ``target_class`` exists.

    Examples
    --------
    An example with two levels of references:

    >>> from collections import namedtuple
    >>> class Foo:
    ...     pass
    >>> holder = namedtuple("Holder", ["bar", "baz"])
    >>> myholder = holder(bar={"object": Foo()}, baz=42)
    >>> # In doctest, the trace extends up to the whole global dict
    >>> # if you let it.
    >>> trace_object_references(Foo, max_level=2)  # doctest: +ELLIPSIS
    ... # doctest: +NORMALIZE_WHITESPACE
    ([[<lsst.utils.introspection.Foo object at ...>],
      [{'object': <lsst.utils.introspection.Foo object at ...>}],
      [Holder(bar={'object': <lsst.utils.introspection.Foo object at ...>},
              baz=42)]], False)
    """

    def is_target(candidate: Any) -> bool:
        return isinstance(candidate, target_class)

    # A list rather than a set because the examples may not be hashable.
    # Everything is built in one expression so that no iterator over the
    # examples outlives this statement and shows up as a referrer.
    examples = list(itertools.islice(filter(is_target, gc.get_objects()), count))

    # Nothing to trace: the (empty) trace is trivially complete.
    if not examples:
        return [examples], True

    return _recurse_trace(examples, remaining=max_level)

391 

392 

def _recurse_trace(objs: list, remaining: int) -> tuple[list[list], bool]:
    """Recursively collect the referrers of a list of objects.

    Parameters
    ----------
    objs : `list`
        The objects to trace.
    remaining : `int`
        The number of levels of references still to trace.

    Returns
    -------
    traces : `list` [`list`]
        A sequence whose first element (index 0) is ``objs``, whose second
        element (index 1) is the list of objects that refer to those, and so
        on. Contains at most ``remaining + 1`` elements.
    trace_complete : `bool`
        `True` if the trace for all objects terminated in at most
        ``remaining`` references, and `False` if more references exist.
    """
    # Our own references to the objects are filtered out by the helper;
    # without that the recursion would chase its own bookkeeping.
    referrers = _get_clean_refs(objs)

    # Nothing refers to these objects: the chain ends here.
    if not referrers:
        return [objs], True

    # Last permitted level: report it, and peek one level further only to
    # learn whether the trace was cut short.
    if remaining <= 1:
        beyond = _get_clean_refs(referrers)
        return [objs, referrers], (not beyond)

    # Otherwise descend, then put this level in front of the deeper ones.
    deeper, complete = _recurse_trace(referrers, remaining=remaining - 1)
    return [objs, *deeper], complete

427 

428 

def _get_clean_refs(objects: list) -> list:
    """Return the referrers of a list of objects, minus the references
    created by the query itself.

    Parameters
    ----------
    objects : `list`
        The objects to find references for.

    Returns
    -------
    refs : `list`
        The objects that refer to the elements of ``objects``. The
        ``objects`` list itself, the temporary tuple used for the query and
        any object whose type name ends in ``_iterator`` are excluded.
    """
    # Build the argument tuple up front so that its id() is known and it can
    # be filtered out. This copes with the difference in behavior between
    # python 3.12 and 3.13 when calling gc.get_referrers with multiple
    # arguments.
    query_args = tuple(objects)
    own_ids = {id(objects), id(query_args)}
    return [
        referrer
        for referrer in gc.get_referrers(*query_args)
        if id(referrer) not in own_ids and not type(referrer).__name__.endswith("_iterator")
    ]