Coverage for python / lsst / daf / butler / registry / interfaces / _obscore.py: 94%

32 statements  

« prev     ^ index     » next       coverage.py v7.13.5, created at 2026-04-22 08:55 +0000

1# This file is part of daf_butler. 

2# 

3# Developed for the LSST Data Management System. 

4# This product includes software developed by the LSST Project 

5# (http://www.lsst.org). 

6# See the COPYRIGHT file at the top-level directory of this distribution 

7# for details of code ownership. 

8# 

9# This software is dual licensed under the GNU General Public License and also 

10# under a 3-clause BSD license. Recipients may choose which of these licenses 

11# to use; please see the files gpl-3.0.txt and/or bsd_license.txt, 

12# respectively. If you choose the GPL option then the following text applies 

13# (but note that there is still no warranty even if you opt for BSD instead): 

14# 

15# This program is free software: you can redistribute it and/or modify 

16# it under the terms of the GNU General Public License as published by 

17# the Free Software Foundation, either version 3 of the License, or 

18# (at your option) any later version. 

19# 

20# This program is distributed in the hope that it will be useful, 

21# but WITHOUT ANY WARRANTY; without even the implied warranty of 

22# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the 

23# GNU General Public License for more details. 

24# 

25# You should have received a copy of the GNU General Public License 

26# along with this program. If not, see <http://www.gnu.org/licenses/>. 

27 

28"""Interfaces for classes that manage obscore table(s) in a `Registry`.""" 

29 

30from __future__ import annotations 

31 

32__all__ = ["ObsCoreTableManager"] 

33 

34from abc import abstractmethod 

35from collections.abc import Iterable, Iterator, Mapping 

36from contextlib import contextmanager 

37from typing import TYPE_CHECKING, Any 

38 

39import sqlalchemy 

40 

41from ._versioning import VersionedExtension, VersionTuple 

42 

43if TYPE_CHECKING: 

44 from lsst.sphgeom import Region 

45 

46 from ..._dataset_ref import DatasetRef 

47 from ...dimensions import DimensionUniverse 

48 from ...queries import QueryFactoryFunction 

49 from ._collections import CollectionRecord 

50 from ._database import Database, StaticTablesContext 

51 from ._datasets import DatasetRecordStorageManager 

52 from ._dimensions import DimensionRecordStorageManager 

53 

54 

class ObsCoreTableManager(VersionedExtension):
    """An interface for populating ObsCore table(s).

    Parameters
    ----------
    registry_schema_version : `VersionTuple` or `None`, optional
        Version of registry schema.
    """

    def __init__(self, *, registry_schema_version: VersionTuple | None = None):
        super().__init__(registry_schema_version=registry_schema_version)

    @abstractmethod
    def clone(
        self,
        *,
        db: Database,
        dimensions: DimensionRecordStorageManager,
    ) -> ObsCoreTableManager:
        """Make an independent copy of this manager instance bound to new
        instances of `Database` and other managers.

        Parameters
        ----------
        db : `Database`
            New `Database` object to use when instantiating the manager.
        dimensions : `DimensionRecordStorageManager`
            New `DimensionRecordStorageManager` object to use when
            instantiating the manager.

        Returns
        -------
        instance : `ObsCoreTableManager`
            New manager instance with the same configuration as this instance,
            but bound to a new Database object.
        """
        raise NotImplementedError()

    @classmethod
    @abstractmethod
    def initialize(
        cls,
        db: Database,
        context: StaticTablesContext,
        *,
        universe: DimensionUniverse,
        config: Mapping,
        datasets: type[DatasetRecordStorageManager],
        dimensions: DimensionRecordStorageManager,
        registry_schema_version: VersionTuple | None = None,
    ) -> ObsCoreTableManager:
        """Construct an instance of the manager.

        Parameters
        ----------
        db : `Database`
            Interface to the underlying database engine and namespace.
        context : `StaticTablesContext`
            Context object obtained from `Database.declareStaticTables`; used
            to declare any tables that should always be present in a layer
            implemented with this manager.
        universe : `DimensionUniverse`
            All dimensions known to the registry.
        config : `~collections.abc.Mapping` [ `str`, `typing.Any` ]
            Configuration of the obscore manager.
        datasets : `type` [ `DatasetRecordStorageManager` ]
            Type of dataset manager.
        dimensions : `DimensionRecordStorageManager`
            Manager for Registry dimensions.
        registry_schema_version : `VersionTuple` or `None`, optional
            Schema version of this extension as defined in registry.

        Returns
        -------
        manager : `ObsCoreTableManager`
            An instance of a concrete `ObsCoreTableManager` subclass.
        """
        raise NotImplementedError()

    @abstractmethod
    def set_query_function(self, query_func: QueryFactoryFunction) -> None:
        """Set up a function to be used for querying the database. This must
        be called before attempting to insert datasets.

        Parameters
        ----------
        query_func : `QueryFactoryFunction`
            Function returning a context manager that sets up a `Query` object
            for querying the registry. (That is, a function equivalent to
            ``Butler.query()``).
        """
        # Raise like every other abstract method in this interface, so that a
        # subclass delegating here by mistake fails loudly instead of leaving
        # the query function silently unset.
        raise NotImplementedError()

    @abstractmethod
    def config_json(self) -> str:
        """Dump configuration in JSON format.

        Returns
        -------
        json : `str`
            Configuration serialized in JSON format.
        """
        raise NotImplementedError()

    @abstractmethod
    def add_datasets(self, refs: Iterable[DatasetRef]) -> int:
        """Possibly add datasets to the obscore table.

        This method should be called when new datasets are added to a RUN
        collection.

        Parameters
        ----------
        refs : `~collections.abc.Iterable` [ `DatasetRef` ]
            Dataset refs to add. Dataset refs have to be completely expanded.
            If a record with the same dataset ID is already in obscore table,
            the dataset is ignored.

        Returns
        -------
        count : `int`
            Actual number of records inserted into obscore table.

        Notes
        -----
        Dataset data types and collection names are checked against configured
        list of collections and dataset types, non-matching datasets are
        ignored and not added to the obscore table.

        When configuration parameter ``collection_type`` is not "RUN", this
        method should return immediately.

        Note that there is no matching method to remove datasets from obscore
        table, we assume that removal happens via foreign key constraint to
        dataset table with "ON DELETE CASCADE" option.
        """
        raise NotImplementedError()

    @abstractmethod
    def associate(self, refs: Iterable[DatasetRef], collection: CollectionRecord) -> int:
        """Possibly add datasets to the obscore table.

        This method should be called when existing datasets are associated with
        a TAGGED collection.

        Parameters
        ----------
        refs : `~collections.abc.Iterable` [ `DatasetRef` ]
            Dataset refs to add. Dataset refs have to be completely expanded.
            If a record with the same dataset ID is already in obscore table,
            the dataset is ignored.
        collection : `CollectionRecord`
            Collection record for a TAGGED collection.

        Returns
        -------
        count : `int`
            Actual number of records inserted into obscore table.

        Notes
        -----
        Dataset data types and collection names are checked against configured
        list of collections and dataset types, non-matching datasets are
        ignored and not added to the obscore table.

        When configuration parameter ``collection_type`` is not "TAGGED", this
        method should return immediately.
        """
        raise NotImplementedError()

    @abstractmethod
    def disassociate(self, refs: Iterable[DatasetRef], collection: CollectionRecord) -> int:
        """Possibly remove datasets from the obscore table.

        This method should be called when datasets are disassociated from a
        TAGGED collection.

        Parameters
        ----------
        refs : `~collections.abc.Iterable` [ `DatasetRef` ]
            Dataset refs to remove. Dataset refs have to be resolved.
        collection : `CollectionRecord`
            Collection record for a TAGGED collection.

        Returns
        -------
        count : `int`
            Actual number of records removed from obscore table.

        Notes
        -----
        Dataset data types and collection names are checked against configured
        list of collections and dataset types, non-matching datasets are
        ignored.

        When configuration parameter ``collection_type`` is not "TAGGED", this
        method should return immediately.
        """
        raise NotImplementedError()

    @abstractmethod
    def update_exposure_regions(self, instrument: str, region_data: Iterable[tuple[int, int, Region]]) -> int:
        """Update existing exposure records with spatial region data.

        Parameters
        ----------
        instrument : `str`
            Instrument name.
        region_data : `~collections.abc.Iterable` [`tuple` [`int`, `int`, \
                `~lsst.sphgeom.Region` ]]
            Sequence of tuples, each tuple contains three values - exposure ID,
            detector ID, and corresponding region.

        Returns
        -------
        count : `int`
            Actual number of records updated.

        Notes
        -----
        This method is needed to update obscore records for raw exposures which
        are ingested before their corresponding visits are defined. Exposure
        records added when visit is already defined will get their regions
        from their matching visits automatically.
        """
        raise NotImplementedError()

    # ``abstractmethod`` is applied last so that the abstract marker lands on
    # the function object produced by ``contextmanager``.
    @abstractmethod
    @contextmanager
    def query(
        self, columns: Iterable[str | sqlalchemy.sql.expression.ColumnElement] | None = None, /, **kwargs: Any
    ) -> Iterator[sqlalchemy.engine.CursorResult]:
        """Run a SELECT query against obscore table and return result rows.

        Parameters
        ----------
        columns : `~collections.abc.Iterable` [ `str` or \
                `sqlalchemy.sql.expression.ColumnElement` ] or `None`, optional
            Columns to return from query. It is a sequence which can include
            column names or any other column elements (e.g.
            `sqlalchemy.sql.functions.count` function). This argument is
            positional-only.
        **kwargs
            Restriction on values of individual obscore columns. Key is the
            column name, value is the required value of the column. Multiple
            restrictions are ANDed together.

        Returns
        -------
        result_context : `sqlalchemy.engine.CursorResult`
            Context manager that returns the query result object when entered.
            These results are invalidated when the context is exited.
        """
        raise NotImplementedError()