Coverage for python / lsst / daf / butler / script / removeCollections.py: 32%
59 statements
« prev ^ index » next coverage.py v7.13.5, created at 2026-05-06 08:30 +0000
« prev ^ index » next coverage.py v7.13.5, created at 2026-05-06 08:30 +0000
1# This file is part of daf_butler.
2#
3# Developed for the LSST Data Management System.
4# This product includes software developed by the LSST Project
5# (http://www.lsst.org).
6# See the COPYRIGHT file at the top-level directory of this distribution
7# for details of code ownership.
8#
9# This software is dual licensed under the GNU General Public License and also
10# under a 3-clause BSD license. Recipients may choose which of these licenses
11# to use; please see the files gpl-3.0.txt and/or bsd_license.txt,
12# respectively. If you choose the GPL option then the following text applies
13# (but note that there is still no warranty even if you opt for BSD instead):
14#
15# This program is free software: you can redistribute it and/or modify
16# it under the terms of the GNU General Public License as published by
17# the Free Software Foundation, either version 3 of the License, or
18# (at your option) any later version.
19#
20# This program is distributed in the hope that it will be useful,
21# but WITHOUT ANY WARRANTY; without even the implied warranty of
22# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
23# GNU General Public License for more details.
24#
25# You should have received a copy of the GNU General Public License
26# along with this program. If not, see <http://www.gnu.org/licenses/>.
27from __future__ import annotations
29__all__ = ["removeCollections"]
31from collections.abc import Callable
32from dataclasses import dataclass
33from functools import partial
35from astropy.table import Table
37from .._butler import Butler
38from .._collection_type import CollectionType
39from ..registry import MissingCollectionError, OrphanedRecordError
@dataclass
class RemoveCollectionResult:
    """Result handed back to the command-line layer by `removeCollections`.

    It bundles the tables that describe what a removal would do together
    with a zero-argument callback that actually performs the removal, so
    the caller can display the tables and only invoke the callback after
    the user has confirmed.
    """

    # Zero-argument callable that carries out the removal when invoked.
    onConfirmation: Callable[[], None]

    # Astropy table listing the collections that will be removed.
    removeCollectionsTable: Table

    # Astropy table listing the child/parent chain memberships that will be
    # dropped as part of the removal.
    removeChainsTable: Table

    # Astropy table listing matched RUN collections; these are reported but
    # are NOT removed.
    runsTable: Table
@dataclass
class CollectionInfo:
    """Grouping of the collections matched by a search, split by whether
    they are RUN collections or not, plus chain-membership information.
    """

    # Table of the non-RUN collections that matched (name and type).
    nonRunCollections: Table

    # Table of the RUN collections that matched (name only).
    runCollections: Table

    parentCollections: dict[str, tuple[str, ...]]
    """Mapping keyed by child collection name; each value holds the names of
    the chained collections that contain that child.
    """
def _getCollectionInfo(repo: str, collection: str, include_parents: bool) -> CollectionInfo:
    """Find the collections matching an expression and sort them into RUN
    and non-RUN tables.

    Parameters
    ----------
    repo : `str`
        The URI to the repository.
    collection : `str`
        The collection expression to search for; it is forwarded to
        ``butler.collections.query_info``.
    include_parents : `bool`
        If `True`, the query is asked for parent information and the chained
        collections containing each matched non-RUN collection are recorded.

    Returns
    -------
    collectionInfo : `CollectionInfo`
        Tables of the matched non-RUN and RUN collections, and the mapping
        from child collection name to the names of its parent chains.
    """
    with Butler.from_config(repo, without_datastore=True) as butler:
        try:
            matches = butler.collections.query_info(
                collection, include_chains=True, include_parents=include_parents
            )
            ordered_matches = sorted(matches)
        except MissingCollectionError:
            # A missing collection is deliberately not an error here; behave
            # as though nothing matched so nothing will be removed.
            ordered_matches = []

        non_run_table = Table(names=("Collection", "Collection Type"), dtype=(str, str))
        run_table = Table(names=("Collection",), dtype=(str,))
        parents_by_child: dict[str, tuple[str, ...]] = {}

        for match in ordered_matches:
            if match.type == CollectionType.RUN:
                # RUN collections are only reported, so there is nothing
                # further to record for them.
                run_table.add_row((match.name,))
                continue

            non_run_table.add_row((match.name, match.type.name))

            if not include_parents or match.parents is None:
                continue
            if len(match.parents) > 0:
                parents_by_child[match.name] = tuple(match.parents)

        return CollectionInfo(non_run_table, run_table, parents_by_child)
def removeCollections(repo: str, collection: str, remove_from_parents: bool) -> RemoveCollectionResult:
    """Prepare the removal of the non-RUN collections matching an expression.

    Nothing is removed by this call itself; the removal happens only when
    the ``onConfirmation`` callback of the returned object is invoked.

    Parameters
    ----------
    repo : `str`
        Repository configuration, passed to ``Butler.from_config``.
    collection : `str`
        Expression selecting the collections to remove.
    remove_from_parents : `bool`
        If `True`, each collection is first taken out of every chained
        collection that contains it, and is then removed itself.

    Returns
    -------
    collections : `RemoveCollectionResult`
        Tables describing what will be removed and which RUN collections
        *will not* be removed, plus the callback that performs the removal.
    """
    found = _getCollectionInfo(repo, collection, remove_from_parents)
    parents_by_child = found.parentCollections

    def _doRemove(collections: Table) -> None:
        """Remove every collection named in the ``"Collection"`` column.

        An `OrphanedRecordError` raised by the removal is re-raised after a
        note suggesting the ``--remove-from-parents`` flag is attached to it.
        """
        with Butler.from_config(repo, writeable=True, without_datastore=True) as butler:
            for target in collections["Collection"]:
                # One transaction per collection, covering both the chain
                # edits and the removal of the collection itself.
                with butler.transaction():
                    for chain in parents_by_child.get(target, []):
                        butler.collections.remove_from_chain(chain, target)
                    try:
                        butler.collections.x_remove(target)
                    except OrphanedRecordError as err:
                        err.add_note(
                            "Add the --remove-from-parents flag to this command"
                            " if you are sure this collection is no longer needed."
                        )
                        raise

    chains_table = Table(names=("Child Collection", "Parent Collection"), dtype=(str, str))
    for child in sorted(parents_by_child):
        for position, parent in enumerate(sorted(parents_by_child[child])):
            # Only the first row of each child shows its name; later rows
            # leave the child column blank.
            label = child if position == 0 else ""
            chains_table.add_row((label, parent))

    return RemoveCollectionResult(
        onConfirmation=partial(_doRemove, found.nonRunCollections),
        removeCollectionsTable=found.nonRunCollections,
        runsTable=found.runCollections,
        removeChainsTable=chains_table,
    )