Coverage for python / lsst / pipe / base / script / utils.py: 31%
29 statements
« prev ^ index » next coverage.py v7.13.5, created at 2026-04-26 08:59 +0000
« prev ^ index » next coverage.py v7.13.5, created at 2026-04-26 08:59 +0000
1# This file is part of pipe_base.
2#
3# Developed for the LSST Data Management System.
4# This product includes software developed by the LSST Project
5# (http://www.lsst.org).
6# See the COPYRIGHT file at the top-level directory of this distribution
7# for details of code ownership.
8#
9# This software is dual licensed under the GNU General Public License and also
10# under a 3-clause BSD license. Recipients may choose which of these licenses
11# to use; please see the files gpl-3.0.txt and/or bsd_license.txt,
12# respectively. If you choose the GPL option then the following text applies
13# (but note that there is still no warranty even if you opt for BSD instead):
14#
15# This program is free software: you can redistribute it and/or modify
16# it under the terms of the GNU General Public License as published by
17# the Free Software Foundation, either version 3 of the License, or
18# (at your option) any later version.
19#
20# This program is distributed in the hope that it will be useful,
21# but WITHOUT ANY WARRANTY; without even the implied warranty of
22# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
23# GNU General Public License for more details.
24#
25# You should have received a copy of the GNU General Public License
26# along with this program. If not, see <http://www.gnu.org/licenses/>.
28from __future__ import annotations
30__all__ = ["filter_by_dataset_type_glob", "filter_by_existence"]
32import re
33from collections.abc import Collection
35from lsst.daf.butler import Butler, DatasetRef
36from lsst.daf.butler.utils import globToRegex
37from lsst.utils.logging import getLogger
38from lsst.utils.timer import time_this
40_LOG = getLogger(__name__)
43def _matches_dataset_type(dataset_type_name: str, regexes: list[str | re.Pattern]) -> bool:
44 for regex in regexes:
45 if isinstance(regex, str):
46 if dataset_type_name == regex:
47 return True
48 elif regex.search(dataset_type_name):
49 return True
50 return False
def filter_by_dataset_type_glob(
    refs: Collection[DatasetRef], dataset_types: tuple[str, ...]
) -> Collection[DatasetRef]:
    """Select the refs whose dataset type matches the requested names.

    Parameters
    ----------
    refs : `collections.abc.Collection` [ `lsst.daf.butler.DatasetRef` ]
        Datasets to be filtered.
    dataset_types : `tuple` [ `str`, ...]
        Dataset type names or globs to use for filtering. Empty tuple implies
        no filtering.

    Returns
    -------
    filtered : `collections.abc.Collection` [ `lsst.daf.butler.DatasetRef` ]
        Filtered datasets. This is ``refs`` itself, unchanged, when the
        converted expressions are ``...`` (no filtering); otherwise it is a
        new `set` holding only the matching refs.
    """
    patterns = globToRegex(dataset_types)

    # An Ellipsis result means there is nothing to filter on, so the input
    # is handed back untouched.
    if patterns is ...:
        return refs

    selected = set()
    for candidate in refs:
        if _matches_dataset_type(candidate.datasetType.name, patterns):
            selected.add(candidate)
    return selected
def filter_by_existence(butler: Butler, refs: Collection[DatasetRef]) -> Collection[DatasetRef]:
    """Drop the refs that the butler's datastore already knows about.

    Parameters
    ----------
    butler : `lsst.daf.butler.Butler`
        Butler in which to check existence of given datarefs.
    refs : `collections.abc.Collection` [ `lsst.daf.butler.DatasetRef` ]
        Datasets to be filtered.

    Returns
    -------
    filtered : `collections.abc.Collection` [ `lsst.daf.butler.DatasetRef` ]
        Filtered datasets: a `list` of the refs that the datastore did not
        report as known.
    """
    _LOG.verbose("Filtering out datasets already known to the target butler...")

    # The existence lookup is timed; time_this reports under the given
    # message when the block finishes.
    with time_this(log=_LOG, msg="Completed checking existence"):
        known_by_ref = butler._datastore.knows_these(refs)
        # Keep only the refs whose lookup result is falsy.
        filtered = [ref for ref, is_known in known_by_ref.items() if not is_known]

    remaining = len(filtered)
    _LOG.verbose(
        "After filtering out those already in the target butler, number of datasets to transfer: %d",
        remaining,
    )
    return filtered