Coverage for python/lsst/daf/butler/registry/queries/_query_data_coordinates.py: 33%

69 statements  

« prev     ^ index     » next       coverage.py v7.13.5, created at 2026-04-17 08:49 +0000

1# This file is part of daf_butler. 

2# 

3# Developed for the LSST Data Management System. 

4# This product includes software developed by the LSST Project 

5# (http://www.lsst.org). 

6# See the COPYRIGHT file at the top-level directory of this distribution 

7# for details of code ownership. 

8# 

9# This software is dual licensed under the GNU General Public License and also 

10# under a 3-clause BSD license. Recipients may choose which of these licenses 

11# to use; please see the files gpl-3.0.txt and/or bsd_license.txt, 

12# respectively. If you choose the GPL option then the following text applies 

13# (but note that there is still no warranty even if you opt for BSD instead): 

14# 

15# This program is free software: you can redistribute it and/or modify 

16# it under the terms of the GNU General Public License as published by 

17# the Free Software Foundation, either version 3 of the License, or 

18# (at your option) any later version. 

19# 

20# This program is distributed in the hope that it will be useful, 

21# but WITHOUT ANY WARRANTY; without even the implied warranty of 

22# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the 

23# GNU General Public License for more details. 

24# 

25# You should have received a copy of the GNU General Public License 

26# along with this program. If not, see <http://www.gnu.org/licenses/>. 

27 

28from __future__ import annotations 

29 

30from collections.abc import Iterable, Iterator 

31from contextlib import contextmanager 

32from typing import Any 

33 

34from ..._butler import Butler 

35from ..._dataset_ref import DatasetRef 

36from ..._dataset_type import DatasetType 

37from ..._exceptions import DatasetTypeError 

38from ...dimensions import DataCoordinate, DimensionGroup 

39from ...queries import DataCoordinateQueryResults, Query 

40from ._query_common import CommonQueryArguments, LegacyQueryResultsMixin, resolve_collections 

41from ._query_datasets import QueryDriverDatasetRefQueryResults 

42from ._results import DataCoordinateQueryResults as LegacyDataCoordinateQueryResults 

43from ._results import ParentDatasetQueryResults 

44 

45 

class QueryDriverDataCoordinateQueryResults(
    LegacyQueryResultsMixin[DataCoordinateQueryResults],
    LegacyDataCoordinateQueryResults,
):
    """Implementation of the legacy ``DataCoordinateQueryResults`` interface
    using the new query system.

    Parameters
    ----------
    butler : `Butler`
        Butler object used to execute queries.
    dimensions : `DimensionGroup`
        Dimensions of the data IDs to yield from the query.
    expanded : `bool`
        `True` if the query will also fetch dimension records associated with
        the data IDs.
    args : `CommonQueryArguments`
        User-facing arguments forwarded from ``registry.queryDataIds``.
    """

    def __init__(
        self,
        butler: Butler,
        dimensions: DimensionGroup,
        expanded: bool,
        args: CommonQueryArguments,
    ) -> None:
        LegacyQueryResultsMixin.__init__(self, butler, args)
        LegacyDataCoordinateQueryResults.__init__(self)
        self._dimensions = dimensions
        self._expanded = expanded

    def _build_result(self, query: Query) -> DataCoordinateQueryResults:
        # Attach dimension records only when the caller asked for expanded
        # data IDs; otherwise return the bare data-ID results.
        results = query.data_ids(self._dimensions)
        if self._expanded:
            return results.with_dimension_records()
        else:
            return results

    @property
    def dimensions(self) -> DimensionGroup:
        # Dimensions of the data IDs yielded by this query.
        return self._dimensions

    def hasFull(self) -> bool:
        # Data IDs produced by the new query system always carry full values.
        return True

    def hasRecords(self) -> bool:
        return self._expanded

    def __iter__(self) -> Iterator[DataCoordinate]:
        with self._build_query() as result:
            # We have to eagerly fetch the results to prevent
            # leaking the resources associated with QueryDriver.
            records = list(result)
            return iter(records)

    @contextmanager
    def materialize(self) -> Iterator[LegacyDataCoordinateQueryResults]:
        # Materialization is a no-op in the new query system; yield self so
        # legacy callers' ``with results.materialize() as r:`` still works.
        yield self

    def expanded(self) -> LegacyDataCoordinateQueryResults:
        # Return a copy of this results object configured to fetch dimension
        # records along with the data IDs.
        return QueryDriverDataCoordinateQueryResults(self._butler, self._dimensions, True, self._args)

    def subset(
        self,
        dimensions: DimensionGroup | Iterable[str] | None = None,
        *,
        unique: bool = False,
    ) -> LegacyDataCoordinateQueryResults:
        # 'unique' parameter is intentionally ignored -- all data ID queries
        # using the new query system are automatically de-duplicated.

        if dimensions is None:
            return self

        dimensions = self.dimensions.universe.conform(dimensions)
        if not dimensions.issubset(self.dimensions):
            raise ValueError(f"{dimensions} is not a subset of {self.dimensions}")
        return QueryDriverDataCoordinateQueryResults(self._butler, dimensions, self._expanded, self._args)

    def findDatasets(
        self,
        datasetType: DatasetType | str,
        collections: Any,
        *,
        findFirst: bool = True,
        components: bool = False,
    ) -> ParentDatasetQueryResults:
        # Component dataset queries were removed from Registry; reject any
        # truthy value (not just True) to catch accidental misuse.
        if components is not False:
            raise DatasetTypeError(
                "Dataset component queries are no longer supported by Registry. Use "
                "DatasetType methods to obtain components from parent dataset types instead."
            )

        if not isinstance(datasetType, DatasetType):
            datasetType = self._butler.get_dataset_type(datasetType)

        # resolve_collections appends diagnostic messages explaining why the
        # query may be doomed to return no results.
        doomed_by: list[str] = []
        collections = resolve_collections(self._butler, collections, doomed_by)
        args = self._args.replaceCollections(collections)
        return QueryDriverDatasetRefQueryResults(
            self._butler,
            args,
            dataset_type=datasetType,
            find_first=findFirst,
            extra_dimensions=self.dimensions,
            doomed_by=doomed_by,
            expanded=False,
        )

    def findRelatedDatasets(
        self,
        datasetType: DatasetType | str,
        collections: Any,
        *,
        findFirst: bool = True,
        dimensions: DimensionGroup | Iterable[str] | None = None,
    ) -> Iterable[tuple[DataCoordinate, DatasetRef]]:
        # This is a generator, so the query context stays open while the
        # caller iterates and is closed when iteration finishes.
        with self._butler.query() as query:
            if not isinstance(datasetType, DatasetType):
                datasetType = self._butler.get_dataset_type(datasetType)

            if dimensions is None:
                dimensions = self.dimensions
            dimensions = DimensionGroup(self._butler.dimensions, dimensions)

            # Doom diagnostics are discarded here -- this method has no
            # legacy mechanism for reporting them.
            collections = resolve_collections(self._butler, collections, [])
            result = query.join_dataset_search(datasetType, collections).general(
                dimensions,
                dataset_fields={datasetType.name: {"dataset_id", "run"}},
                find_first=findFirst,
            )
            result = self._apply_result_modifiers(result)
            for row in result.iter_tuples(datasetType):
                # Project each row's data ID down to the requested dimensions
                # and pair it with the (single) matching dataset reference.
                yield row.data_id.subset(dimensions), row.refs[0]