Coverage for python / lsst / daf / butler / registry / datasets / byDimensions / summaries.py: 0%

117 statements  

« prev     ^ index     » next       coverage.py v7.13.5, created at 2026-04-17 08:49 +0000

1# This file is part of daf_butler. 

2# 

3# Developed for the LSST Data Management System. 

4# This product includes software developed by the LSST Project 

5# (http://www.lsst.org). 

6# See the COPYRIGHT file at the top-level directory of this distribution 

7# for details of code ownership. 

8# 

9# This software is dual licensed under the GNU General Public License and also 

10# under a 3-clause BSD license. Recipients may choose which of these licenses 

11# to use; please see the files gpl-3.0.txt and/or bsd_license.txt, 

12# respectively. If you choose the GPL option then the following text applies 

13# (but note that there is still no warranty even if you opt for BSD instead): 

14# 

15# This program is free software: you can redistribute it and/or modify 

16# it under the terms of the GNU General Public License as published by 

17# the Free Software Foundation, either version 3 of the License, or 

18# (at your option) any later version. 

19# 

20# This program is distributed in the hope that it will be useful, 

21# but WITHOUT ANY WARRANTY; without even the implied warranty of 

22# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the 

23# GNU General Public License for more details. 

24# 

25# You should have received a copy of the GNU General Public License 

26# along with this program. If not, see <http://www.gnu.org/licenses/>. 

27 

28from __future__ import annotations 

29 

30from .... import ddl 

31 

32__all__ = ("CollectionSummaryManager",) 

33 

34import logging 

35from collections.abc import Callable, Iterable, Mapping 

36from typing import Any, Generic, TypeVar 

37 

38import sqlalchemy 

39 

40from lsst.utils.iteration import chunk_iterable 

41 

42from ...._collection_type import CollectionType 

43from ...._dataset_type import DatasetType 

44from ...._named import NamedKeyDict, NamedKeyMapping 

45from ....dimensions import GovernorDimension, addDimensionForeignKey 

46from ..._caching_context import CachingContext 

47from ..._collection_summary import CollectionSummary 

48from ...interfaces import ( 

49 CollectionManager, 

50 CollectionRecord, 

51 Database, 

52 DimensionRecordStorageManager, 

53 StaticTablesContext, 

54) 

55from ...wildcards import CollectionWildcard 

56 

# Generic type parameter for CollectionSummaryTables: instantiated either with
# `ddl.TableSpec` (table specifications) or `sqlalchemy.schema.Table`
# (declared tables).
_T = TypeVar("_T")


# Module-level logger, named after this module per the standard convention.
_LOG = logging.getLogger(__name__)

61 

62 

class CollectionSummaryTables(Generic[_T]):
    """Structure that holds the table or table specification objects that
    summarize the contents of collections.

    Parameters
    ----------
    datasetType : _T
        Table [specification] that summarizes which dataset types are in each
        collection.
    dimensions : `NamedKeyMapping`
        Mapping of table [specifications] that summarize which governor
        dimension values are present in the data IDs of each collection.
    """

    def __init__(
        self,
        datasetType: _T,
        dimensions: NamedKeyMapping[GovernorDimension, _T],
    ):
        # Table [spec] relating collections to dataset type IDs.
        self.datasetType = datasetType
        # One table [spec] per governor dimension, keyed by dimension.
        self.dimensions = dimensions

    @classmethod
    def makeTableSpecs(
        cls,
        collections: CollectionManager,
        dimensions: DimensionRecordStorageManager,
    ) -> CollectionSummaryTables[ddl.TableSpec]:
        """Create specifications for all summary tables.

        Parameters
        ----------
        collections : `CollectionManager`
            Manager object for the collections in this `Registry`.
        dimensions : `DimensionRecordStorageManager`
            Manager object for the dimensions in this `Registry`.

        Returns
        -------
        tables : `CollectionSummaryTables` [ `ddl.TableSpec` ]
            Structure containing table specifications.
        """

        def _governor_table_spec(dimension: GovernorDimension) -> ddl.TableSpec:
            # Spec for one collection_summary_<dimension> table: compound
            # primary key of (collection, governor dimension value), with
            # CASCADE delete following the collection.
            spec = ddl.TableSpec(fields=[])
            collections.addCollectionForeignKey(spec, primaryKey=True, onDelete="CASCADE")
            addDimensionForeignKey(spec, dimension, primaryKey=True)
            return spec

        # Spec for collection_summary_dataset_type: compound primary key of
        # (collection, dataset_type_id), with foreign keys into both the
        # collection table and the dataset_type table.
        dataset_type_spec = ddl.TableSpec(fields=[])
        collections.addCollectionForeignKey(dataset_type_spec, primaryKey=True, onDelete="CASCADE")
        dataset_type_spec.fields.add(
            ddl.FieldSpec("dataset_type_id", dtype=sqlalchemy.BigInteger, primaryKey=True)
        )
        dataset_type_spec.foreignKeys.append(
            ddl.ForeignKeySpec(
                "dataset_type", source=("dataset_type_id",), target=("id",), onDelete="CASCADE"
            )
        )
        # Specs for collection_summary_<dimension>, one per governor
        # dimension in the universe.
        governor_specs = NamedKeyDict[GovernorDimension, ddl.TableSpec](
            {
                dimension: _governor_table_spec(dimension)
                for dimension in dimensions.universe.governor_dimensions
            }
        )
        return CollectionSummaryTables(
            datasetType=dataset_type_spec,
            dimensions=governor_specs.freeze(),
        )

127 

128 

class CollectionSummaryManager:
    """Object manages the summaries of what dataset types and governor
    dimension values are present in a collection.

    Parameters
    ----------
    db : `Database`
        Interface to the underlying database engine and namespace.
    collections : `CollectionManager`
        Manager object for the collections in this `Registry`.
    tables : `CollectionSummaryTables`
        Struct containing the tables that hold collection summaries.
    dataset_type_table : `sqlalchemy.schema.Table`
        Table containing dataset type definitions.
    caching_context : `CachingContext`
        Object controlling caching of information returned by managers.
    """

    def __init__(
        self,
        db: Database,
        *,
        collections: CollectionManager,
        tables: CollectionSummaryTables[sqlalchemy.schema.Table],
        dataset_type_table: sqlalchemy.schema.Table,
        caching_context: CachingContext,
    ):
        self._db = db
        self._collections = collections
        # Cache the collection foreign-key column name once; it is used as a
        # row-dict key and column name throughout the query-building code.
        self._collectionKeyName = collections.getCollectionForeignKeyName()
        self._tables = tables
        self._dataset_type_table = dataset_type_table
        self._caching_context = caching_context

    def clone(
        self,
        *,
        db: Database,
        collections: CollectionManager,
        caching_context: CachingContext,
    ) -> CollectionSummaryManager:
        """Make an independent copy of this manager instance bound to new
        instances of `Database` and other managers.

        Parameters
        ----------
        db : `Database`
            New `Database` object to use when instantiating the manager.
        collections : `CollectionManager`
            New `CollectionManager` object to use when instantiating the
            manager.
        caching_context : `CachingContext`
            New `CachingContext` object to use when instantiating the manager.

        Returns
        -------
        instance : `CollectionSummaryManager`
            New manager instance with the same configuration as this instance,
            but bound to a new Database object.
        """
        # Table objects are shared with the clone; only the database handle,
        # collection manager, and caching context are replaced.
        return CollectionSummaryManager(
            db=db,
            collections=collections,
            tables=self._tables,
            dataset_type_table=self._dataset_type_table,
            caching_context=caching_context,
        )

    @classmethod
    def initialize(
        cls,
        db: Database,
        context: StaticTablesContext,
        *,
        collections: CollectionManager,
        dimensions: DimensionRecordStorageManager,
        dataset_type_table: sqlalchemy.schema.Table,
        caching_context: CachingContext,
    ) -> CollectionSummaryManager:
        """Create all summary tables (or check that they have been created),
        returning an object to manage them.

        Parameters
        ----------
        db : `Database`
            Interface to the underlying database engine and namespace.
        context : `StaticTablesContext`
            Context object obtained from `Database.declareStaticTables`; used
            to declare any tables that should always be present.
        collections : `CollectionManager`
            Manager object for the collections in this `Registry`.
        dimensions : `DimensionRecordStorageManager`
            Manager object for the dimensions in this `Registry`.
        dataset_type_table : `sqlalchemy.schema.Table`
            Table containing dataset type definitions.
        caching_context : `CachingContext`
            Object controlling caching of information returned by managers.

        Returns
        -------
        manager : `CollectionSummaryManager`
            New manager object for collection summaries.
        """
        # Build table specifications, then declare each one via the static
        # tables context (which creates the table or checks it exists).
        specs = CollectionSummaryTables.makeTableSpecs(collections, dimensions)
        tables = CollectionSummaryTables(
            datasetType=context.addTable("collection_summary_dataset_type", specs.datasetType),
            dimensions=NamedKeyDict[GovernorDimension, sqlalchemy.schema.Table](
                {
                    dimension: context.addTable(f"collection_summary_{dimension.name}", spec)
                    for dimension, spec in specs.dimensions.items()
                }
            ).freeze(),
        )
        return cls(
            db=db,
            collections=collections,
            tables=tables,
            dataset_type_table=dataset_type_table,
            caching_context=caching_context,
        )

    def update(
        self,
        collection: CollectionRecord,
        dataset_type_ids: Iterable[int],
        summary: CollectionSummary,
    ) -> None:
        """Update the summary tables to associate the given collection with
        a dataset type and governor dimension values.

        Parameters
        ----------
        collection : `CollectionRecord`
            Collection whose summary should be updated.
        dataset_type_ids : `~collections.abc.Iterable` [ `int` ]
            Integer IDs for the dataset types to associate with this
            collection.
        summary : `CollectionSummary`
            Summary to store.  Dataset types must correspond to
            ``dataset_type_ids``.

        Notes
        -----
        This method should only be called inside the transaction context of
        another operation that inserts or associates datasets.
        """
        # ensure() is an idempotent insert-if-not-exists.  Rows are sorted
        # before insertion (NOTE(review): presumably for deterministic insert
        # order, e.g. to reduce lock contention between concurrent
        # transactions — confirm).
        self._db.ensure(
            self._tables.datasetType,
            *[
                {
                    "dataset_type_id": dataset_type_id,
                    self._collectionKeyName: collection.key,
                }
                for dataset_type_id in sorted(dataset_type_ids)
            ],
        )
        # One summary table per governor dimension; skip dimensions with no
        # values recorded in this summary.
        for dimension in sorted(summary.governors):
            if values := summary.governors[dimension]:
                self._db.ensure(
                    self._tables.dimensions[dimension],
                    *[{self._collectionKeyName: collection.key, dimension: v} for v in sorted(values)],
                )

    def fetch_summaries(
        self,
        collections: Iterable[CollectionRecord],
        dataset_type_names: Iterable[str] | None,
        dataset_type_factory: Callable[[sqlalchemy.engine.RowMapping], DatasetType],
    ) -> Mapping[Any, CollectionSummary]:
        """Fetch collection summaries given their names and dataset types.

        Parameters
        ----------
        collections : `~collections.abc.Iterable` [`CollectionRecord`]
            Collection records to query.
        dataset_type_names : `~collections.abc.Iterable` [`str`]
            Names of dataset types to include into returned summaries. If
            `None` then all dataset types will be included.
        dataset_type_factory : `~collections.abc.Callable`
            Method that takes a table row and make `DatasetType` instance out
            of it.

        Returns
        -------
        summaries : `~collections.abc.Mapping` [`typing.Any`, \
                `CollectionSummary`]
            Collection summaries indexed by collection record key. This mapping
            will also contain all nested non-chained collections of the chained
            collections.
        """
        summaries: dict[Any, CollectionSummary] = {}
        # Check what we have in cache first.
        if self._caching_context.collection_summaries is not None:
            summaries, missing_keys = self._caching_context.collection_summaries.find_summaries(
                [record.key for record in collections]
            )
            if not missing_keys:
                # Everything was cached; no database work needed.
                return summaries
            else:
                # Only query for the collections the cache didn't have.
                collections = [record for record in collections if record.key in missing_keys]

        # Need to expand all chained collections first.  Summary rows exist
        # only for non-chained collections; chains are summarized below by
        # taking the union over their flattened children.
        non_chains: list[CollectionRecord] = []
        chains: dict[CollectionRecord, list[CollectionRecord]] = {}
        for collection in collections:
            if collection.type is CollectionType.CHAINED:
                children = self._collections.resolve_wildcard(
                    CollectionWildcard.from_names([collection.name]),
                    flatten_chains=True,
                    include_chains=False,
                )
                non_chains += children
                chains[collection] = children
            else:
                non_chains.append(collection)

        _LOG.debug("Fetching summaries for collections %s.", [record.name for record in non_chains])

        # Set up the SQL query we'll use to fetch all of the summary
        # information at once.
        coll_col = self._tables.datasetType.columns[self._collectionKeyName].label(self._collectionKeyName)
        dataset_type_id_col = self._tables.datasetType.columns.dataset_type_id.label("dataset_type_id")
        columns = [coll_col, dataset_type_id_col] + list(self._dataset_type_table.columns)
        fromClause: sqlalchemy.sql.expression.FromClause = self._tables.datasetType.join(
            self._dataset_type_table
        )
        for dimension, table in self._tables.dimensions.items():
            columns.append(table.columns[dimension.name].label(dimension.name))
            # LEFT OUTER JOIN so collections that have no rows in a given
            # dimension summary table still appear in the result.
            fromClause = fromClause.join(
                table,
                onclause=(
                    self._tables.datasetType.columns[self._collectionKeyName]
                    == table.columns[self._collectionKeyName]
                ),
                isouter=True,
            )

        sql = sqlalchemy.sql.select(*columns).select_from(fromClause)
        sql = sql.where(coll_col.in_([coll.key for coll in non_chains]))
        # For caching we need to fetch complete summaries.  The dataset-type
        # filter is therefore applied only when caching is disabled; otherwise
        # the full summary is fetched and stored.
        if self._caching_context.collection_summaries is None:
            if dataset_type_names is not None:
                sql = sql.where(self._dataset_type_table.columns["name"].in_(dataset_type_names))

        # Run the query and construct CollectionSummary objects from the result
        # rows.  This will never include CHAINED collections or collections
        # with no datasets.
        with self._db.query(sql) as sql_result:
            sql_rows = sql_result.mappings().fetchall()
        # Memoize DatasetType construction per dataset_type_id, since many
        # rows share the same dataset type.
        dataset_type_ids: dict[int, DatasetType] = {}
        for row in sql_rows:
            # Collection key should never be None/NULL; it's what we join on.
            # Extract that and then turn it into a collection name.
            collectionKey = row[self._collectionKeyName]
            # dataset_type_id should also never be None/NULL; it's in the first
            # table we joined.
            dataset_type_id = row["dataset_type_id"]
            if (dataset_type := dataset_type_ids.get(dataset_type_id)) is None:
                dataset_type_ids[dataset_type_id] = dataset_type = dataset_type_factory(row)
            # See if we have a summary already for this collection; if not,
            # make one.
            summary = summaries.get(collectionKey)
            if summary is None:
                summary = CollectionSummary()
                summaries[collectionKey] = summary
            # Update the dimensions with the values in this row that
            # aren't None/NULL (many will be in general, because these
            # enter the query via LEFT OUTER JOIN).
            summary.dataset_types.add(dataset_type)
            for dimension in self._tables.dimensions:
                value = row[dimension.name]
                if value is not None:
                    summary.governors.setdefault(dimension.name, set()).add(value)

        # Add empty summary for any missing collection.
        for collection in non_chains:
            if collection.key not in summaries:
                summaries[collection.key] = CollectionSummary()

        # Merge children into their chains summaries.
        for chain, children in chains.items():
            summaries[chain.key] = CollectionSummary.union(*(summaries[child.key] for child in children))

        # Populate the cache with everything we computed, including the
        # expanded children of chained collections.
        if self._caching_context.collection_summaries is not None:
            self._caching_context.collection_summaries.update(summaries)

        return summaries

    def get_collection_ids(self, dataset_type_id: int) -> Iterable[str] | Iterable[int]:
        """Get collection IDs for a given dataset type ID.

        Parameters
        ----------
        dataset_type_id : `int`
            Integer ID for the dataset type.

        Returns
        -------
        collection_ids : `~collections.abc.Iterable`
            Collection IDs (ints or strings) associated with the dataset type.
        """
        query = sqlalchemy.select(self._tables.datasetType.columns[self._collectionKeyName])
        query = query.where(self._tables.datasetType.columns.dataset_type_id == dataset_type_id)
        with self._db.query(query) as result:
            # Materialize inside the context so the cursor can be closed.
            return list(result.scalars())

    def delete_collections(self, dataset_type_id: int, collection_ids: Iterable) -> None:
        """Delete collection from summaries for a given dataset type.

        Parameters
        ----------
        dataset_type_id : `int`
            Integer ID for the dataset type.
        collection_ids : `~collections.abc.Iterable`
            Collection IDs (integer or string) to remove from summaries for
            this dataset type.

        Notes
        -----
        This method should only be called inside the transaction context of
        another operation that selects collection information.
        """
        # Chunk the deletes to keep each statement's parameter list bounded.
        for collections_chunk in chunk_iterable(collection_ids, 1000):
            to_delete = [
                {"dataset_type_id": dataset_type_id, self._collectionKeyName: collection_id}
                for collection_id in collections_chunk
            ]
            self._db.delete(
                self._tables.datasetType, ["dataset_type_id", self._collectionKeyName], *to_delete
            )