Coverage for python / lsst / pipe / base / script / utils.py: 31%

29 statements  

« prev     ^ index     » next       coverage.py v7.13.5, created at 2026-04-24 08:19 +0000

1# This file is part of pipe_base. 

2# 

3# Developed for the LSST Data Management System. 

4# This product includes software developed by the LSST Project 

5# (http://www.lsst.org). 

6# See the COPYRIGHT file at the top-level directory of this distribution 

7# for details of code ownership. 

8# 

9# This software is dual licensed under the GNU General Public License and also 

10# under a 3-clause BSD license. Recipients may choose which of these licenses 

11# to use; please see the files gpl-3.0.txt and/or bsd_license.txt, 

12# respectively. If you choose the GPL option then the following text applies 

13# (but note that there is still no warranty even if you opt for BSD instead): 

14# 

15# This program is free software: you can redistribute it and/or modify 

16# it under the terms of the GNU General Public License as published by 

17# the Free Software Foundation, either version 3 of the License, or 

18# (at your option) any later version. 

19# 

20# This program is distributed in the hope that it will be useful, 

21# but WITHOUT ANY WARRANTY; without even the implied warranty of 

22# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the 

23# GNU General Public License for more details. 

24# 

25# You should have received a copy of the GNU General Public License 

26# along with this program. If not, see <http://www.gnu.org/licenses/>. 

27 

28from __future__ import annotations 

29 

30__all__ = ["filter_by_dataset_type_glob", "filter_by_existence"] 

31 

32import re 

33from collections.abc import Collection 

34 

35from lsst.daf.butler import Butler, DatasetRef 

36from lsst.daf.butler.utils import globToRegex 

37from lsst.utils.logging import getLogger 

38from lsst.utils.timer import time_this 

39 

40_LOG = getLogger(__name__) 

41 

42 

43def _matches_dataset_type(dataset_type_name: str, regexes: list[str | re.Pattern]) -> bool: 

44 for regex in regexes: 

45 if isinstance(regex, str): 

46 if dataset_type_name == regex: 

47 return True 

48 elif regex.search(dataset_type_name): 

49 return True 

50 return False 

51 

52 

def filter_by_dataset_type_glob(
    refs: Collection[DatasetRef], dataset_types: tuple[str, ...]
) -> Collection[DatasetRef]:
    """Select the refs whose dataset type matches the requested names.

    Parameters
    ----------
    refs : `collections.abc.Collection` [ `lsst.daf.butler.DatasetRef` ]
        Datasets to be filtered.
    dataset_types : `tuple` [ `str`, ...]
        Dataset type names or globs to use for filtering. Empty tuple implies
        no filtering.

    Returns
    -------
    filtered : `collections.abc.Collection` [ `lsst.daf.butler.DatasetRef` ]
        The datasets that passed the filter. This is ``refs`` itself when no
        filtering is needed, otherwise a new `set` of the matching refs.
    """
    patterns = globToRegex(dataset_types)
    if patterns is not ...:
        selected = set()
        for candidate in refs:
            if _matches_dataset_type(candidate.datasetType.name, patterns):
                selected.add(candidate)
        return selected

    # An Ellipsis result means there is nothing to filter on, so hand the
    # caller's collection back untouched.
    return refs

77 

78 

def filter_by_existence(butler: Butler, refs: Collection[DatasetRef]) -> Collection[DatasetRef]:
    """Drop the refs that the butler's datastore already knows about.

    Parameters
    ----------
    butler : `lsst.daf.butler.Butler`
        Butler in which to check existence of given datarefs.
    refs : `collections.abc.Collection` [ `lsst.daf.butler.DatasetRef` ]
        Datasets to be filtered.

    Returns
    -------
    filtered : `collections.abc.Collection` [ `lsst.daf.butler.DatasetRef` ]
        List of the datasets that the datastore did not report as known.
    """
    _LOG.verbose("Filtering out datasets already known to the target butler...")
    with time_this(log=_LOG, msg="Completed checking existence"):
        # NOTE(review): relies on the private ``_datastore`` attribute and
        # assumes ``knows_these`` returns a mapping of ref -> bool; confirm.
        known_by_ref = butler._datastore.knows_these(refs)
        unknown_refs = [ref for ref, is_known in known_by_ref.items() if not is_known]
    _LOG.verbose(
        "After filtering out those already in the target butler, number of datasets to transfer: %d",
        len(unknown_refs),
    )

    return unknown_refs