Coverage for python / lsst / daf / butler / script / removeRuns.py: 39%
45 statements
« prev ^ index » next coverage.py v7.13.5, created at 2026-04-22 08:55 +0000
« prev ^ index » next coverage.py v7.13.5, created at 2026-04-22 08:55 +0000
1# This file is part of daf_butler.
2#
3# Developed for the LSST Data Management System.
4# This product includes software developed by the LSST Project
5# (http://www.lsst.org).
6# See the COPYRIGHT file at the top-level directory of this distribution
7# for details of code ownership.
8#
9# This software is dual licensed under the GNU General Public License and also
10# under a 3-clause BSD license. Recipients may choose which of these licenses
11# to use; please see the files gpl-3.0.txt and/or bsd_license.txt,
12# respectively. If you choose the GPL option then the following text applies
13# (but note that there is still no warranty even if you opt for BSD instead):
14#
15# This program is free software: you can redistribute it and/or modify
16# it under the terms of the GNU General Public License as published by
17# the Free Software Foundation, either version 3 of the License, or
18# (at your option) any later version.
19#
20# This program is distributed in the hope that it will be useful,
21# but WITHOUT ANY WARRANTY; without even the implied warranty of
22# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
23# GNU General Public License for more details.
24#
25# You should have received a copy of the GNU General Public License
26# along with this program. If not, see <http://www.gnu.org/licenses/>.
27from __future__ import annotations
29__all__ = ["removeRuns"]
31from collections import defaultdict
32from collections.abc import Callable, Mapping, Sequence
33from dataclasses import dataclass
34from functools import partial
36from .._butler import Butler
37from .._collection_type import CollectionType
38from ..registry import MissingCollectionError
@dataclass
class RemoveRun:
    """Describe a single RUN collection that is a candidate for removal."""

    # Name of the RUN collection.
    name: str
    # Names of the parent CHAINED collections that the RUN belongs to.
    parents: list[str]
@dataclass
class RemoveRunsResult:
    """Bundle of information handed back to the cli command.

    Holds the runs that will be deleted, a per-dataset-type count of the
    datasets that will be deleted, and the callback that carries out the
    removal once the user has confirmed.
    """

    # Callback that performs the removal; takes no arguments.
    onConfirmation: Callable[[], None]
    # The run collections that will be removed.
    runs: Sequence[RemoveRun]
    # Dataset type name -> number of datasets that will be removed.
    datasets: Mapping[str, int]
def _getCollectionInfo(
    repo: str,
    collection: str,
) -> tuple[list[RemoveRun], Mapping[str, int]]:
    """Find the RUN collections matching an expression and count their
    datasets.

    Parameters
    ----------
    repo : `str`
        The URI to the repository.
    collection : `str`
        The collection expression to search for. Passed as the expression
        to ``butler.collections.query_info``, restricted to RUN
        collections.

    Returns
    -------
    runs : `list` of `RemoveRun`
        Describes the runs that will be removed, one entry per matching
        RUN collection, each with its parent chains.
    datasets : `~collections.abc.Mapping` [`str`, `int`]
        Dataset type name mapped to how many datasets will be removed,
        summed over all matching runs and ordered by dataset type name.
        Counts are obtained with ``exact=False`` and dataset types with a
        zero count are omitted.
    """
    with Butler.from_config(repo) as butler, butler.registry.caching_context():
        try:
            matched = butler.collections.query_info(
                collection,
                CollectionType.RUN,
                include_chains=False,
                include_parents=True,
                include_summary=True,
            )
        except MissingCollectionError:
            # A missing collection is treated the same as an expression
            # that matched nothing.
            matched = []
        all_type_names = [dataset_type.name for dataset_type in butler.registry.queryDatasetTypes(...)]
        # Narrow the dataset types using the matched collections before
        # issuing any per-run queries.
        candidate_types = list(butler.collections._filter_dataset_types(all_type_names, matched))

        runs: list[RemoveRun] = []
        counts: dict[str, int] = defaultdict(int)
        for info in matched:
            assert info.type == CollectionType.RUN and info.parents is not None
            runs.append(RemoveRun(name=info.name, parents=list(info.parents)))
            with butler.query() as query:
                for type_name in candidate_types:
                    found = query.datasets(type_name, collections=info.name).count(exact=False)
                    if found:
                        counts[type_name] += found

    # Hand back a plain dict in dataset type name order.
    return runs, dict(sorted(counts.items()))
def removeRuns(
    repo: str,
    collection: str,
) -> RemoveRunsResult:
    """Describe the RUN collections to remove and provide the callback that
    removes them.

    Nothing is removed by this call itself; the removal happens only when
    the ``onConfirmation`` callback of the returned object is invoked.

    Parameters
    ----------
    repo : `str`
        The repository to operate on; passed to ``Butler.from_config``.
    collection : `str`
        The collection expression used to select the RUN collections to
        remove; passed to ``_getCollectionInfo``.

    Returns
    -------
    collections : `RemoveRunsResult`
        Contains information describing what will be removed, along with
        the callback that performs the removal.
    """
    runs, datasets = _getCollectionInfo(repo, collection)

    def _doRemove(runs: Sequence[RemoveRun]) -> None:
        """Remove the given runs using a writeable butler."""
        run_names = [run.name for run in runs]
        with Butler.from_config(repo, writeable=True) as butler:
            butler.removeRuns(run_names, unlink_from_chains=True)

    return RemoveRunsResult(
        onConfirmation=partial(_doRemove, runs),
        runs=runs,
        datasets=datasets,
    )