Coverage for python / lsst / daf / butler / direct_butler / _direct_butler_collections.py: 24%

79 statements  

« prev     ^ index     » next       coverage.py v7.13.5, created at 2026-04-17 08:49 +0000

1# This file is part of daf_butler. 

2# 

3# Developed for the LSST Data Management System. 

4# This product includes software developed by the LSST Project 

5# (http://www.lsst.org). 

6# See the COPYRIGHT file at the top-level directory of this distribution 

7# for details of code ownership. 

8# 

9# This software is dual licensed under the GNU General Public License and also 

10# under a 3-clause BSD license. Recipients may choose which of these licenses 

11# to use; please see the files gpl-3.0.txt and/or bsd_license.txt, 

12# respectively. If you choose the GPL option then the following text applies 

13# (but note that there is still no warranty even if you opt for BSD instead): 

14# 

15# This program is free software: you can redistribute it and/or modify 

16# it under the terms of the GNU General Public License as published by 

17# the Free Software Foundation, either version 3 of the License, or 

18# (at your option) any later version. 

19# 

20# This program is distributed in the hope that it will be useful, 

21# but WITHOUT ANY WARRANTY; without even the implied warranty of 

22# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the 

23# GNU General Public License for more details. 

24# 

25# You should have received a copy of the GNU General Public License 

26# along with this program. If not, see <http://www.gnu.org/licenses/>. 

27 

28from __future__ import annotations 

29 

30__all__ = ("DirectButlerCollections",) 

31 

32from collections.abc import Iterable, Mapping, Sequence, Set 

33from typing import TYPE_CHECKING, Any 

34 

35import sqlalchemy 

36 

37from lsst.utils.iteration import ensure_iterable 

38 

39from .._butler_collections import ButlerCollections, CollectionInfo 

40from .._collection_type import CollectionType 

41from ..registry._exceptions import OrphanedRecordError 

42from ..registry.interfaces import ChainedCollectionRecord 

43from ..registry.sql_registry import SqlRegistry 

44from ..registry.wildcards import CollectionWildcard 

45 

46if TYPE_CHECKING: 

47 from .._dataset_type import DatasetType 

48 from ..registry._collection_summary import CollectionSummary 

49 

50 

class DirectButlerCollections(ButlerCollections):
    """Implementation of ButlerCollections for DirectButler.

    Every operation delegates to the `SqlRegistry` given at construction,
    either through its public methods or through its internal collection and
    dataset managers.

    Parameters
    ----------
    registry : `~lsst.daf.butler.registry.sql_registry.SqlRegistry`
        Registry object used to work with the collections database.
    """

    def __init__(self, registry: SqlRegistry) -> None:
        self._registry = registry

    @staticmethod
    def _as_name_list(names: str | Iterable[str]) -> list[str]:
        """Normalize a single collection name or an iterable of names into a
        concrete list for the collection manager's chain-editing methods.

        Parameters
        ----------
        names : `str` or `~collections.abc.Iterable` [`str`]
            One collection name, or several.

        Returns
        -------
        names : `list` [`str`]
            The names as a new list.
        """
        return list(ensure_iterable(names))

    @property
    def defaults(self) -> Sequence[str]:
        # Docstring inherited.
        # The default collection search path held by the registry.
        return self._registry.defaults.collections

    def extend_chain(self, parent_collection_name: str, child_collection_names: str | Iterable[str]) -> None:
        # Docstring inherited.
        self._registry._managers.collections.extend_collection_chain(
            parent_collection_name, self._as_name_list(child_collection_names)
        )

    def prepend_chain(self, parent_collection_name: str, child_collection_names: str | Iterable[str]) -> None:
        # Docstring inherited.
        self._registry._managers.collections.prepend_collection_chain(
            parent_collection_name, self._as_name_list(child_collection_names)
        )

    def redefine_chain(
        self, parent_collection_name: str, child_collection_names: str | Iterable[str]
    ) -> None:
        # Docstring inherited.
        self._registry._managers.collections.update_chain(
            parent_collection_name, self._as_name_list(child_collection_names)
        )

    def remove_from_chain(
        self, parent_collection_name: str, child_collection_names: str | Iterable[str]
    ) -> None:
        # Docstring inherited.
        self._registry._managers.collections.remove_from_collection_chain(
            parent_collection_name, self._as_name_list(child_collection_names)
        )

    def query(
        self,
        expression: str | Iterable[str],
        collection_types: Set[CollectionType] | CollectionType | None = None,
        flatten_chains: bool = False,
        include_chains: bool | None = None,
    ) -> Sequence[str]:
        # Docstring inherited.
        # ``None`` means "no restriction on collection type".
        if collection_types is None:
            collection_types = CollectionType.all()
        # Do not use base implementation for now to avoid the additional
        # unused queries.
        return self._registry.queryCollections(
            expression,
            collectionTypes=collection_types,
            flattenChains=flatten_chains,
            includeChains=include_chains,
        )

    def query_info(
        self,
        expression: str | Iterable[str],
        collection_types: Set[CollectionType] | CollectionType | None = None,
        flatten_chains: bool = False,
        include_chains: bool | None = None,
        include_parents: bool = False,
        include_summary: bool = False,
        include_doc: bool = False,
        summary_datasets: Iterable[DatasetType] | Iterable[str] | None = None,
    ) -> Sequence[CollectionInfo]:
        # Docstring inherited.
        # Normalize the type filter to a set: None -> every type, a single
        # CollectionType -> a one-element set.
        if collection_types is None:
            collection_types = CollectionType.all()
        elif isinstance(collection_types, CollectionType):
            collection_types = {collection_types}

        collections_manager = self._registry._managers.collections
        records = collections_manager.resolve_wildcard(
            CollectionWildcard.from_expression(expression),
            collection_types=collection_types,
            flatten_chains=flatten_chains,
            include_chains=include_chains,
        )

        # Summaries and docs are fetched in bulk, and only when requested;
        # both mappings are keyed by the record key.
        summaries: Mapping[Any, CollectionSummary] = {}
        if include_summary:
            summaries = self._registry._managers.datasets.fetch_summaries(records, summary_datasets)

        docs: Mapping[Any, str] = {}
        if include_doc:
            docs = collections_manager.get_docs(record.key for record in records)

        info: list[CollectionInfo] = []
        for record in records:
            children: tuple[str, ...] = ()
            if record.type == CollectionType.CHAINED:
                assert isinstance(record, ChainedCollectionRecord)
                children = tuple(record.children)

            parents: frozenset[str] | None = None
            if include_parents:
                # TODO: This is non-vectorized, so expensive to do in a
                # loop.
                parents = frozenset(self._registry.getCollectionParentChains(record.name))

            # NOTE(review): a record with no (or a falsy) summary entry
            # reports ``dataset_types=None`` rather than an empty set.
            dataset_types: frozenset[str] | None = None
            if summary := summaries.get(record.key):
                dataset_types = frozenset(dt.name for dt in summary.dataset_types)

            info.append(
                CollectionInfo(
                    name=record.name,
                    type=record.type,
                    doc=docs.get(record.key, ""),
                    parents=parents,
                    children=children,
                    dataset_types=dataset_types,
                )
            )

        return info

    def get_info(
        self, name: str, include_parents: bool = False, include_summary: bool = False
    ) -> CollectionInfo:
        # Docstring inherited.
        record = self._registry.get_collection_record(name)
        # A missing documentation string is reported as "".
        doc = self._registry.getCollectionDocumentation(name) or ""

        children: tuple[str, ...] = ()
        if record.type == CollectionType.CHAINED:
            assert isinstance(record, ChainedCollectionRecord)
            children = tuple(record.children)

        parents: frozenset[str] | None = None
        if include_parents:
            # Freeze the result so this method hands CollectionInfo the same
            # immutable type that query_info does.
            parents = frozenset(self._registry.getCollectionParentChains(name))

        dataset_types: frozenset[str] | None = None
        if include_summary:
            summary = self._registry.getCollectionSummary(name)
            dataset_types = frozenset(dt.name for dt in summary.dataset_types)

        return CollectionInfo(
            name=name,
            type=record.type,
            doc=doc,
            parents=parents,
            children=children,
            dataset_types=dataset_types,
        )

    def register(self, name: str, type: CollectionType = CollectionType.RUN, doc: str | None = None) -> bool:
        # Docstring inherited.
        # ``type`` shadows the builtin but is part of the public signature.
        return self._registry.registerCollection(name, type, doc)

    def x_remove(self, name: str) -> None:
        # Docstring inherited.
        try:
            self._registry.removeCollection(name)
        except sqlalchemy.exc.IntegrityError as e:
            # The database refused the delete because other rows still
            # reference this collection. NOTE(review): this is not specific to
            # datasets in a RUN (e.g. a collection that is still a child of a
            # CHAINED collection may also fail here -- confirm against
            # SqlRegistry.removeCollection), so the message does not assume
            # the collection type.
            raise OrphanedRecordError(
                f"Collection {name} or datasets in it are still referenced elsewhere."
            ) from e