Coverage for python/lsst/daf/butler/registry/datasets/byDimensions/summaries.py: 0% (119 statements)

# This file is part of daf_butler.
#
# Developed for the LSST Data Management System.
# This product includes software developed by the LSST Project
# (http://www.lsst.org).
# See the COPYRIGHT file at the top-level directory of this distribution
# for details of code ownership.
#
# This software is dual licensed under the GNU General Public License and also
# under a 3-clause BSD license. Recipients may choose which of these licenses
# to use; please see the files gpl-3.0.txt and/or bsd_license.txt,
# respectively. If you choose the GPL option then the following text applies
# (but note that there is still no warranty even if you opt for BSD instead):
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.

from __future__ import annotations

from .... import ddl

__all__ = ("CollectionSummaryManager",)

import logging
from collections.abc import Callable, Iterable, Mapping
from typing import Any, Generic, TypeVar

import sqlalchemy

from lsst.utils.iteration import chunk_iterable

from ...._collection_type import CollectionType
from ...._dataset_type import DatasetType
from ...._named import NamedKeyDict, NamedKeyMapping
from ....dimensions import GovernorDimension, addDimensionForeignKey
from ..._caching_context import CachingContext
from ..._collection_summary import CollectionSummary
from ...interfaces import (
    CollectionManager,
    CollectionRecord,
    Database,
    DimensionRecordStorageManager,
    StaticTablesContext,
)
from ...wildcards import CollectionWildcard

_T = TypeVar("_T")


_LOG = logging.getLogger(__name__)


class CollectionSummaryTables(Generic[_T]):
    """Structure that holds the table or table specification objects that
    summarize the contents of collections.

    Parameters
    ----------
    datasetType : _T
        Table [specification] that summarizes which dataset types are in each
        collection.
    dimensions : `NamedKeyMapping`
        Mapping of table [specifications] that summarize which governor
        dimension values are present in the data IDs of each collection.
    """

    def __init__(
        self,
        datasetType: _T,
        dimensions: NamedKeyMapping[GovernorDimension, _T],
    ):
        self.datasetType = datasetType
        self.dimensions = dimensions

    @classmethod
    def makeTableSpecs(
        cls,
        collections: CollectionManager,
        dimensions: DimensionRecordStorageManager,
    ) -> CollectionSummaryTables[ddl.TableSpec]:
        """Create specifications for all summary tables.

        Parameters
        ----------
        collections : `CollectionManager`
            Manager object for the collections in this `Registry`.
        dimensions : `DimensionRecordStorageManager`
            Manager object for the dimensions in this `Registry`.

        Returns
        -------
        tables : `CollectionSummaryTables` [ `ddl.TableSpec` ]
            Structure containing table specifications.
        """

        # Spec for collection_summary_dataset_type.
        datasetTypeTableSpec = ddl.TableSpec(fields=[])
        collections.addCollectionForeignKey(datasetTypeTableSpec, primaryKey=True, onDelete="CASCADE")
        datasetTypeTableSpec.fields.add(
            ddl.FieldSpec("dataset_type_id", dtype=sqlalchemy.BigInteger, primaryKey=True)
        )
        datasetTypeTableSpec.foreignKeys.append(
            ddl.ForeignKeySpec(
                "dataset_type", source=("dataset_type_id",), target=("id",), onDelete="CASCADE"
            )
        )
        # Specs for collection_summary_<dimension>.
        dimensionTableSpecs = NamedKeyDict[GovernorDimension, ddl.TableSpec]()
        for dimension in dimensions.universe.governor_dimensions:
            tableSpec = ddl.TableSpec(fields=[])
            collections.addCollectionForeignKey(tableSpec, primaryKey=True, onDelete="CASCADE")
            addDimensionForeignKey(tableSpec, dimension, primaryKey=True)
            dimensionTableSpecs[dimension] = tableSpec
        return CollectionSummaryTables(
            datasetType=datasetTypeTableSpec,
            dimensions=dimensionTableSpecs.freeze(),
        )


class CollectionSummaryManager:
    """Object that manages the summaries of what dataset types and governor
    dimension values are present in a collection.

    Parameters
    ----------
    db : `Database`
        Interface to the underlying database engine and namespace.
    collections : `CollectionManager`
        Manager object for the collections in this `Registry`.
    tables : `CollectionSummaryTables`
        Struct containing the tables that hold collection summaries.
    dataset_type_table : `sqlalchemy.schema.Table`
        Table containing dataset type definitions.
    caching_context : `CachingContext`
        Object controlling caching of information returned by managers.
    reversed_renames : `~collections.abc.Mapping` [ `str`, `str` ]
        Mapping from the new name for a dataset type in a configured override
        to the original name still used in the database.
    """

    def __init__(
        self,
        db: Database,
        *,
        collections: CollectionManager,
        tables: CollectionSummaryTables[sqlalchemy.schema.Table],
        dataset_type_table: sqlalchemy.schema.Table,
        caching_context: CachingContext,
        reversed_renames: Mapping[str, str],
    ):
        self._db = db
        self._collections = collections
        self._collectionKeyName = collections.getCollectionForeignKeyName()
        self._tables = tables
        self._dataset_type_table = dataset_type_table
        self._caching_context = caching_context
        self._reversed_renames = reversed_renames

    def clone(
        self,
        *,
        db: Database,
        collections: CollectionManager,
        caching_context: CachingContext,
    ) -> CollectionSummaryManager:
        """Make an independent copy of this manager instance bound to new
        instances of `Database` and other managers.

        Parameters
        ----------
        db : `Database`
            New `Database` object to use when instantiating the manager.
        collections : `CollectionManager`
            New `CollectionManager` object to use when instantiating the
            manager.
        caching_context : `CachingContext`
            New `CachingContext` object to use when instantiating the manager.

        Returns
        -------
        instance : `CollectionSummaryManager`
            New manager instance with the same configuration as this instance,
            but bound to a new `Database` object.
        """
        return CollectionSummaryManager(
            db=db,
            collections=collections,
            tables=self._tables,
            dataset_type_table=self._dataset_type_table,
            caching_context=caching_context,
            reversed_renames=self._reversed_renames,
        )

    @classmethod
    def initialize(
        cls,
        db: Database,
        context: StaticTablesContext,
        *,
        collections: CollectionManager,
        dimensions: DimensionRecordStorageManager,
        dataset_type_table: sqlalchemy.schema.Table,
        caching_context: CachingContext,
        reversed_renames: dict[str, str],
    ) -> CollectionSummaryManager:
        """Create all summary tables (or check that they have been created),
        returning an object to manage them.

        Parameters
        ----------
        db : `Database`
            Interface to the underlying database engine and namespace.
        context : `StaticTablesContext`
            Context object obtained from `Database.declareStaticTables`; used
            to declare any tables that should always be present.
        collections : `CollectionManager`
            Manager object for the collections in this `Registry`.
        dimensions : `DimensionRecordStorageManager`
            Manager object for the dimensions in this `Registry`.
        dataset_type_table : `sqlalchemy.schema.Table`
            Table containing dataset type definitions.
        caching_context : `CachingContext`
            Object controlling caching of information returned by managers.
        reversed_renames : `dict` [ `str`, `str` ]
            Mapping from the new name for a dataset type in a configured
            override to the original name still used in the database.

        Returns
        -------
        manager : `CollectionSummaryManager`
            New manager object for collection summaries.
        """

        specs = CollectionSummaryTables.makeTableSpecs(collections, dimensions)
        tables = CollectionSummaryTables(
            datasetType=context.addTable("collection_summary_dataset_type", specs.datasetType),
            dimensions=NamedKeyDict[GovernorDimension, sqlalchemy.schema.Table](
                {
                    dimension: context.addTable(f"collection_summary_{dimension.name}", spec)
                    for dimension, spec in specs.dimensions.items()
                }
            ).freeze(),
        )
        return cls(
            db=db,
            collections=collections,
            tables=tables,
            dataset_type_table=dataset_type_table,
            caching_context=caching_context,
            reversed_renames=reversed_renames,
        )

    def update(
        self,
        collection: CollectionRecord,
        dataset_type_ids: Iterable[int],
        summary: CollectionSummary,
    ) -> None:
        """Update the summary tables to associate the given collection with
        the given dataset types and governor dimension values.

        Parameters
        ----------
        collection : `CollectionRecord`
            Collection whose summary should be updated.
        dataset_type_ids : `~collections.abc.Iterable` [ `int` ]
            Integer IDs for the dataset types to associate with this
            collection.
        summary : `CollectionSummary`
            Summary to store.  Dataset types must correspond to
            ``dataset_type_ids``.

        Notes
        -----
        This method should only be called inside the transaction context of
        another operation that inserts or associates datasets.
        """

        self._db.ensure(
            self._tables.datasetType,
            *[
                {
                    "dataset_type_id": dataset_type_id,
                    self._collectionKeyName: collection.key,
                }
                for dataset_type_id in sorted(dataset_type_ids)
            ],
        )
        for dimension in sorted(summary.governors):
            if values := summary.governors[dimension]:
                self._db.ensure(
                    self._tables.dimensions[dimension],
                    *[{self._collectionKeyName: collection.key, dimension: v} for v in sorted(values)],
                )

    def fetch_summaries(
        self,
        collections: Iterable[CollectionRecord],
        dataset_type_names: Iterable[str] | None,
        dataset_type_factory: Callable[[sqlalchemy.engine.RowMapping], DatasetType],
    ) -> Mapping[Any, CollectionSummary]:
        """Fetch collection summaries given their names and dataset types.

        Parameters
        ----------
        collections : `~collections.abc.Iterable` [`CollectionRecord`]
            Collection records to query.
        dataset_type_names : `~collections.abc.Iterable` [`str`]
            Names of dataset types to include in the returned summaries.  If
            `None` then all dataset types will be included.
        dataset_type_factory : `~collections.abc.Callable`
            Method that takes a table row and makes a `DatasetType` instance
            out of it.

        Returns
        -------
        summaries : `~collections.abc.Mapping` [`typing.Any`, \
                `CollectionSummary`]
            Collection summaries indexed by collection record key.  This
            mapping will also contain entries for all of the non-chained
            collections nested inside any chained collections.
        """
        summaries: dict[Any, CollectionSummary] = {}
        # Check what we have in cache first.
        if self._caching_context.collection_summaries is not None:
            summaries, missing_keys = self._caching_context.collection_summaries.find_summaries(
                [record.key for record in collections]
            )
            if not missing_keys:
                return summaries
            else:
                collections = [record for record in collections if record.key in missing_keys]

        # Need to expand all chained collections first.
        non_chains: list[CollectionRecord] = []
        chains: dict[CollectionRecord, list[CollectionRecord]] = {}
        for collection in collections:
            if collection.type is CollectionType.CHAINED:
                children = self._collections.resolve_wildcard(
                    CollectionWildcard.from_names([collection.name]),
                    flatten_chains=True,
                    include_chains=False,
                )
                non_chains += children
                chains[collection] = children
            else:
                non_chains.append(collection)

        _LOG.debug("Fetching summaries for collections %s.", [record.name for record in non_chains])

        # Set up the SQL query we'll use to fetch all of the summary
        # information at once.
        coll_col = self._tables.datasetType.columns[self._collectionKeyName].label(self._collectionKeyName)
        dataset_type_id_col = self._tables.datasetType.columns.dataset_type_id.label("dataset_type_id")
        columns = [coll_col, dataset_type_id_col] + list(self._dataset_type_table.columns)
        fromClause: sqlalchemy.sql.expression.FromClause = self._tables.datasetType.join(
            self._dataset_type_table
        )
        for dimension, table in self._tables.dimensions.items():
            columns.append(table.columns[dimension.name].label(dimension.name))
            fromClause = fromClause.join(
                table,
                onclause=(
                    self._tables.datasetType.columns[self._collectionKeyName]
                    == table.columns[self._collectionKeyName]
                ),
                isouter=True,
            )

        sql = sqlalchemy.sql.select(*columns).select_from(fromClause)
        sql = sql.where(coll_col.in_([coll.key for coll in non_chains]))
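        # Rough shape of the query built above (a sketch only; identifiers
        # depend on the collection manager and the configured governor
        # dimensions):
        #
        #   SELECT s.<collection key>, s.dataset_type_id, dt.*, g.<dimension>
        #   FROM collection_summary_dataset_type AS s
        #   JOIN dataset_type AS dt ON s.dataset_type_id = dt.id
        #   LEFT JOIN collection_summary_<dimension> AS g
        #       ON s.<collection key> = g.<collection key>
        #   WHERE s.<collection key> IN (...)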

        # For caching we need to fetch complete summaries.
        if self._caching_context.collection_summaries is None:
            if dataset_type_names is not None:
                db_dataset_type_names = [
                    self._reversed_renames.get(name, name) for name in dataset_type_names
                ]
                sql = sql.where(self._dataset_type_table.columns["name"].in_(db_dataset_type_names))

        # Run the query and construct CollectionSummary objects from the result
        # rows.  This will never include CHAINED collections or collections
        # with no datasets.
        with self._db.query(sql) as sql_result:
            sql_rows = sql_result.mappings().fetchall()
        dataset_type_ids: dict[int, DatasetType] = {}
        for row in sql_rows:
            # Collection key should never be None/NULL; it's what we join on.
            # Extract that and then turn it into a collection name.
            collectionKey = row[self._collectionKeyName]
            # dataset_type_id should also never be None/NULL; it's in the first
            # table we joined.
            dataset_type_id = row["dataset_type_id"]
            if (dataset_type := dataset_type_ids.get(dataset_type_id)) is None:
                dataset_type_ids[dataset_type_id] = dataset_type = dataset_type_factory(row)
            # See if we have a summary already for this collection; if not,
            # make one.
            summary = summaries.get(collectionKey)
            if summary is None:
                summary = CollectionSummary()
                summaries[collectionKey] = summary
            # Update the governor dimensions with the values in this row that
            # aren't None/NULL (many will be NULL in general, because these
            # columns enter the query via LEFT OUTER JOIN).
            summary.dataset_types.add(dataset_type)
            for dimension in self._tables.dimensions:
                value = row[dimension.name]
                if value is not None:
                    summary.governors.setdefault(dimension.name, set()).add(value)

        # Add an empty summary for any missing collection.
        for collection in non_chains:
            if collection.key not in summaries:
                summaries[collection.key] = CollectionSummary()

        # Merge the children's summaries into those of their parent chains.
        for chain, children in chains.items():
            summaries[chain.key] = CollectionSummary.union(*(summaries[child.key] for child in children))

        if self._caching_context.collection_summaries is not None:
            self._caching_context.collection_summaries.update(summaries)

        return summaries

    def get_collection_ids(self, dataset_type_id: int) -> Iterable[str] | Iterable[int]:
        """Get collection IDs for a given dataset type ID.

        Parameters
        ----------
        dataset_type_id : `int`
            Integer ID for the dataset type.

        Returns
        -------
        collection_ids : `~collections.abc.Iterable`
            Collection IDs (ints or strings) associated with the dataset type.
        """

        query = sqlalchemy.select(self._tables.datasetType.columns[self._collectionKeyName])
        query = query.where(self._tables.datasetType.columns.dataset_type_id == dataset_type_id)
        with self._db.query(query) as result:
            return list(result.scalars())

    def delete_collections(self, dataset_type_id: int, collection_ids: Iterable) -> None:
        """Delete collections from summaries for a given dataset type.

        Parameters
        ----------
        dataset_type_id : `int`
            Integer ID for the dataset type.
        collection_ids : `~collections.abc.Iterable`
            Collection IDs (integer or string) to remove from summaries for
            this dataset type.

        Notes
        -----
        This method should only be called inside the transaction context of
        another operation that selects collection information.
        """

        for collections_chunk in chunk_iterable(collection_ids, 1000):
            to_delete = [
                {"dataset_type_id": dataset_type_id, self._collectionKeyName: collection_id}
                for collection_id in collections_chunk
            ]
            self._db.delete(
                self._tables.datasetType, ["dataset_type_id", self._collectionKeyName], *to_delete
            )