Coverage for python / lsst / daf / butler / registry / datasets / byDimensions / summaries.py: 0%
119 statements
« prev ^ index » next coverage.py v7.13.5, created at 2026-05-06 08:30 +0000
« prev ^ index » next coverage.py v7.13.5, created at 2026-05-06 08:30 +0000
1# This file is part of daf_butler.
2#
3# Developed for the LSST Data Management System.
4# This product includes software developed by the LSST Project
5# (http://www.lsst.org).
6# See the COPYRIGHT file at the top-level directory of this distribution
7# for details of code ownership.
8#
9# This software is dual licensed under the GNU General Public License and also
10# under a 3-clause BSD license. Recipients may choose which of these licenses
11# to use; please see the files gpl-3.0.txt and/or bsd_license.txt,
12# respectively. If you choose the GPL option then the following text applies
13# (but note that there is still no warranty even if you opt for BSD instead):
14#
15# This program is free software: you can redistribute it and/or modify
16# it under the terms of the GNU General Public License as published by
17# the Free Software Foundation, either version 3 of the License, or
18# (at your option) any later version.
19#
20# This program is distributed in the hope that it will be useful,
21# but WITHOUT ANY WARRANTY; without even the implied warranty of
22# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
23# GNU General Public License for more details.
24#
25# You should have received a copy of the GNU General Public License
26# along with this program. If not, see <http://www.gnu.org/licenses/>.
28from __future__ import annotations
30from .... import ddl
32__all__ = ("CollectionSummaryManager",)
34import logging
35from collections.abc import Callable, Iterable, Mapping
36from typing import Any, Generic, TypeVar
38import sqlalchemy
40from lsst.utils.iteration import chunk_iterable
42from ...._collection_type import CollectionType
43from ...._dataset_type import DatasetType
44from ...._named import NamedKeyDict, NamedKeyMapping
45from ....dimensions import GovernorDimension, addDimensionForeignKey
46from ..._caching_context import CachingContext
47from ..._collection_summary import CollectionSummary
48from ...interfaces import (
49 CollectionManager,
50 CollectionRecord,
51 Database,
52 DimensionRecordStorageManager,
53 StaticTablesContext,
54)
55from ...wildcards import CollectionWildcard
57_T = TypeVar("_T")
60_LOG = logging.getLogger(__name__)
class CollectionSummaryTables(Generic[_T]):
    """Struct of the table (or table-specification) objects used to record
    summaries of collection contents.

    Parameters
    ----------
    datasetType : _T
        Table [specification] that summarizes which dataset types are in each
        collection.
    dimensions : `NamedKeyMapping`
        Mapping of table [specifications] that summarize which governor
        dimension values are present in the data IDs of each collection.
    """

    def __init__(
        self,
        datasetType: _T,
        dimensions: NamedKeyMapping[GovernorDimension, _T],
    ):
        # Table [spec] linking collections to dataset types.
        self.datasetType = datasetType
        # One table [spec] per governor dimension, linking collections to
        # the dimension values present in their data IDs.
        self.dimensions = dimensions

    @classmethod
    def makeTableSpecs(
        cls,
        collections: CollectionManager,
        dimensions: DimensionRecordStorageManager,
    ) -> CollectionSummaryTables[ddl.TableSpec]:
        """Create specifications for all summary tables.

        Parameters
        ----------
        collections : `CollectionManager`
            Manager object for the collections in this `Registry`.
        dimensions : `DimensionRecordStorageManager`
            Manager object for the dimensions in this `Registry`.

        Returns
        -------
        tables : `CollectionSummaryTables` [ `ddl.TableSpec` ]
            Structure containing table specifications.
        """
        # collection_summary_dataset_type: (collection, dataset type) pairs,
        # with both columns in the primary key and cascading deletes.
        dataset_type_spec = ddl.TableSpec(fields=[])
        collections.addCollectionForeignKey(dataset_type_spec, primaryKey=True, onDelete="CASCADE")
        dataset_type_spec.fields.add(
            ddl.FieldSpec("dataset_type_id", dtype=sqlalchemy.BigInteger, primaryKey=True)
        )
        dataset_type_spec.foreignKeys.append(
            ddl.ForeignKeySpec(
                "dataset_type", source=("dataset_type_id",), target=("id",), onDelete="CASCADE"
            )
        )
        # collection_summary_<dimension>: one table per governor dimension,
        # each holding (collection, dimension value) pairs.
        governor_specs = NamedKeyDict[GovernorDimension, ddl.TableSpec]()
        for governor in dimensions.universe.governor_dimensions:
            spec = ddl.TableSpec(fields=[])
            collections.addCollectionForeignKey(spec, primaryKey=True, onDelete="CASCADE")
            addDimensionForeignKey(spec, governor, primaryKey=True)
            governor_specs[governor] = spec
        return CollectionSummaryTables(
            datasetType=dataset_type_spec,
            dimensions=governor_specs.freeze(),
        )
class CollectionSummaryManager:
    """Object manages the summaries of what dataset types and governor
    dimension values are present in a collection.

    Parameters
    ----------
    db : `Database`
        Interface to the underlying database engine and namespace.
    collections : `CollectionManager`
        Manager object for the collections in this `Registry`.
    tables : `CollectionSummaryTables`
        Struct containing the tables that hold collection summaries.
    dataset_type_table : `sqlalchemy.schema.Table`
        Table containing dataset type definitions.
    caching_context : `CachingContext`
        Object controlling caching of information returned by managers.
    reversed_renames : `~collections.abc.Mapping` [ `str`, `str` ]
        Mapping from the new name for a dataset in a configured override to
        the original name still used in the database.
    """

    def __init__(
        self,
        db: Database,
        *,
        collections: CollectionManager,
        tables: CollectionSummaryTables[sqlalchemy.schema.Table],
        dataset_type_table: sqlalchemy.schema.Table,
        caching_context: CachingContext,
        reversed_renames: Mapping[str, str],
    ):
        self._db = db
        self._collections = collections
        # Cache the name of the collection FK column; it is used as both a
        # column name and a row-dict key throughout this class.
        self._collectionKeyName = collections.getCollectionForeignKeyName()
        self._tables = tables
        self._dataset_type_table = dataset_type_table
        self._caching_context = caching_context
        self._reversed_renames = reversed_renames

    def clone(
        self,
        *,
        db: Database,
        collections: CollectionManager,
        caching_context: CachingContext,
    ) -> CollectionSummaryManager:
        """Make an independent copy of this manager instance bound to new
        instances of `Database` and other managers.

        Parameters
        ----------
        db : `Database`
            New `Database` object to use when instantiating the manager.
        collections : `CollectionManager`
            New `CollectionManager` object to use when instantiating the
            manager.
        caching_context : `CachingContext`
            New `CachingContext` object to use when instantiating the manager.

        Returns
        -------
        instance : `CollectionSummaryManager`
            New manager instance with the same configuration as this instance,
            but bound to a new Database object.
        """
        # Tables, dataset-type table, and renames are configuration shared
        # with the clone; only db/collections/caching context are replaced.
        return CollectionSummaryManager(
            db=db,
            collections=collections,
            tables=self._tables,
            dataset_type_table=self._dataset_type_table,
            caching_context=caching_context,
            reversed_renames=self._reversed_renames,
        )

    @classmethod
    def initialize(
        cls,
        db: Database,
        context: StaticTablesContext,
        *,
        collections: CollectionManager,
        dimensions: DimensionRecordStorageManager,
        dataset_type_table: sqlalchemy.schema.Table,
        caching_context: CachingContext,
        reversed_renames: dict[str, str],
    ) -> CollectionSummaryManager:
        """Create all summary tables (or check that they have been created),
        returning an object to manage them.

        Parameters
        ----------
        db : `Database`
            Interface to the underlying database engine and namespace.
        context : `StaticTablesContext`
            Context object obtained from `Database.declareStaticTables`; used
            to declare any tables that should always be present.
        collections : `CollectionManager`
            Manager object for the collections in this `Registry`.
        dimensions : `DimensionRecordStorageManager`
            Manager object for the dimensions in this `Registry`.
        dataset_type_table : `sqlalchemy.schema.Table`
            Table containing dataset type definitions.
        caching_context : `CachingContext`
            Object controlling caching of information returned by managers.
        reversed_renames : `dict` [ `str`, `str` ]
            Mapping from the new name for a dataset in a configured override
            to the original name still used in the database.

        Returns
        -------
        manager : `CollectionSummaryManager`
            New manager object for collection summaries.
        """
        # Turn the table specs into declared (or verified) tables, then
        # construct the manager around them.
        specs = CollectionSummaryTables.makeTableSpecs(collections, dimensions)
        tables = CollectionSummaryTables(
            datasetType=context.addTable("collection_summary_dataset_type", specs.datasetType),
            dimensions=NamedKeyDict[GovernorDimension, sqlalchemy.schema.Table](
                {
                    dimension: context.addTable(f"collection_summary_{dimension.name}", spec)
                    for dimension, spec in specs.dimensions.items()
                }
            ).freeze(),
        )
        return cls(
            db=db,
            collections=collections,
            tables=tables,
            dataset_type_table=dataset_type_table,
            caching_context=caching_context,
            reversed_renames=reversed_renames,
        )

    def update(
        self,
        collection: CollectionRecord,
        dataset_type_ids: Iterable[int],
        summary: CollectionSummary,
    ) -> None:
        """Update the summary tables to associate the given collection with
        a dataset type and governor dimension values.

        Parameters
        ----------
        collection : `CollectionRecord`
            Collection whose summary should be updated.
        dataset_type_ids : `~collections.abc.Iterable` [ `int` ]
            Integer IDs for the dataset types to associate with this
            collection.
        summary : `CollectionSummary`
            Summary to store. Dataset types must correspond to
            ``dataset_type_ids``.

        Notes
        -----
        This method should only be called inside the transaction context of
        another operation that inserts or associates datasets.
        """
        # Idempotent insert (ensure) of the (collection, dataset type) rows;
        # sorting gives a deterministic insert order.
        self._db.ensure(
            self._tables.datasetType,
            *[
                {
                    "dataset_type_id": dataset_type_id,
                    self._collectionKeyName: collection.key,
                }
                for dataset_type_id in sorted(dataset_type_ids)
            ],
        )
        # Same for each governor dimension that has any values in the summary.
        for dimension in sorted(summary.governors):
            if values := summary.governors[dimension]:
                self._db.ensure(
                    self._tables.dimensions[dimension],
                    *[{self._collectionKeyName: collection.key, dimension: v} for v in sorted(values)],
                )

    def fetch_summaries(
        self,
        collections: Iterable[CollectionRecord],
        dataset_type_names: Iterable[str] | None,
        dataset_type_factory: Callable[[sqlalchemy.engine.RowMapping], DatasetType],
    ) -> Mapping[Any, CollectionSummary]:
        """Fetch collection summaries given their names and dataset types.

        Parameters
        ----------
        collections : `~collections.abc.Iterable` [`CollectionRecord`]
            Collection records to query.
        dataset_type_names : `~collections.abc.Iterable` [`str`]
            Names of dataset types to include into returned summaries. If
            `None` then all dataset types will be included.
        dataset_type_factory : `~collections.abc.Callable`
            Method that takes a table row and make `DatasetType` instance out
            of it.

        Returns
        -------
        summaries : `~collections.abc.Mapping` [`typing.Any`, \
                `CollectionSummary`]
            Collection summaries indexed by collection record key. This mapping
            will also contain all nested non-chained collections of the chained
            collections.
        """
        summaries: dict[Any, CollectionSummary] = {}
        # Check what we have in cache first.
        # NOTE(review): ``collections`` is iterated once here and again below;
        # a one-shot generator argument would be exhausted after the first
        # pass — confirm callers always pass a re-iterable container.
        if self._caching_context.collection_summaries is not None:
            summaries, missing_keys = self._caching_context.collection_summaries.find_summaries(
                [record.key for record in collections]
            )
            if not missing_keys:
                return summaries
            else:
                # Only query the database for collections the cache lacks.
                collections = [record for record in collections if record.key in missing_keys]

        # Need to expand all chained collections first.
        non_chains: list[CollectionRecord] = []
        chains: dict[CollectionRecord, list[CollectionRecord]] = {}
        for collection in collections:
            if collection.type is CollectionType.CHAINED:
                # Chains have no rows of their own; summarize their flattened
                # children and merge afterwards.
                children = self._collections.resolve_wildcard(
                    CollectionWildcard.from_names([collection.name]),
                    flatten_chains=True,
                    include_chains=False,
                )
                non_chains += children
                chains[collection] = children
            else:
                non_chains.append(collection)

        _LOG.debug("Fetching summaries for collections %s.", [record.name for record in non_chains])

        # Set up the SQL query we'll use to fetch all of the summary
        # information at once.
        coll_col = self._tables.datasetType.columns[self._collectionKeyName].label(self._collectionKeyName)
        dataset_type_id_col = self._tables.datasetType.columns.dataset_type_id.label("dataset_type_id")
        columns = [coll_col, dataset_type_id_col] + list(self._dataset_type_table.columns)
        fromClause: sqlalchemy.sql.expression.FromClause = self._tables.datasetType.join(
            self._dataset_type_table
        )
        # LEFT OUTER JOIN each governor-dimension summary table on the
        # collection key, so collections without values for a dimension still
        # appear (with NULL in that dimension's column).
        for dimension, table in self._tables.dimensions.items():
            columns.append(table.columns[dimension.name].label(dimension.name))
            fromClause = fromClause.join(
                table,
                onclause=(
                    self._tables.datasetType.columns[self._collectionKeyName]
                    == table.columns[self._collectionKeyName]
                ),
                isouter=True,
            )

        sql = sqlalchemy.sql.select(*columns).select_from(fromClause)
        sql = sql.where(coll_col.in_([coll.key for coll in non_chains]))
        # For caching we need to fetch complete summaries, so the dataset-type
        # name filter is applied only when there is no summary cache.
        if self._caching_context.collection_summaries is None:
            if dataset_type_names is not None:
                # Translate configured override names back to the names
                # actually stored in the database.
                db_dataset_type_names = [
                    self._reversed_renames.get(name, name) for name in dataset_type_names
                ]
                sql = sql.where(self._dataset_type_table.columns["name"].in_(db_dataset_type_names))

        # Run the query and construct CollectionSummary objects from the result
        # rows. This will never include CHAINED collections or collections
        # with no datasets.
        with self._db.query(sql) as sql_result:
            sql_rows = sql_result.mappings().fetchall()
        # Cache of DatasetType instances keyed by dataset_type_id, so each
        # dataset type is constructed from a row only once.
        dataset_type_ids: dict[int, DatasetType] = {}
        for row in sql_rows:
            # Collection key should never be None/NULL; it's what we join on.
            # Extract that and then turn it into a collection name.
            collectionKey = row[self._collectionKeyName]
            # dataset_type_id should also never be None/NULL; it's in the first
            # table we joined.
            dataset_type_id = row["dataset_type_id"]
            if (dataset_type := dataset_type_ids.get(dataset_type_id)) is None:
                dataset_type_ids[dataset_type_id] = dataset_type = dataset_type_factory(row)
            # See if we have a summary already for this collection; if not,
            # make one.
            summary = summaries.get(collectionKey)
            if summary is None:
                summary = CollectionSummary()
                summaries[collectionKey] = summary
            # Update the dimensions with the values in this row that
            # aren't None/NULL (many will be in general, because these
            # enter the query via LEFT OUTER JOIN).
            summary.dataset_types.add(dataset_type)
            for dimension in self._tables.dimensions:
                value = row[dimension.name]
                if value is not None:
                    summary.governors.setdefault(dimension.name, set()).add(value)

        # Add empty summary for any missing collection.
        for collection in non_chains:
            if collection.key not in summaries:
                summaries[collection.key] = CollectionSummary()

        # Merge children into their chains summaries.
        for chain, children in chains.items():
            summaries[chain.key] = CollectionSummary.union(*(summaries[child.key] for child in children))

        if self._caching_context.collection_summaries is not None:
            self._caching_context.collection_summaries.update(summaries)

        return summaries

    def get_collection_ids(self, dataset_type_id: int) -> Iterable[str] | Iterable[int]:
        """Get collection IDs for a given dataset type ID.

        Parameters
        ----------
        dataset_type_id : `int`
            Integer ID for the dataset type.

        Returns
        -------
        collection_ids : `~collections.abc.Iterable`
            Collection IDs (ints or strings) associated with the dataset type.
        """
        query = sqlalchemy.select(self._tables.datasetType.columns[self._collectionKeyName])
        query = query.where(self._tables.datasetType.columns.dataset_type_id == dataset_type_id)
        with self._db.query(query) as result:
            # Materialize before the cursor context closes.
            return list(result.scalars())

    def delete_collections(self, dataset_type_id: int, collection_ids: Iterable) -> None:
        """Delete collection from summaries for a given dataset type.

        Parameters
        ----------
        dataset_type_id : `int`
            Integer ID for the dataset type.
        collection_ids : `~collections.abc.Iterable`
            Collection IDs (integer or string) to remove from summaries for
            this dataset type.

        Notes
        -----
        This method should only be called inside the transaction context of
        another operation that selects collection information.
        """
        # Chunk deletions to keep each DELETE statement a bounded size.
        for collections_chunk in chunk_iterable(collection_ids, 1000):
            to_delete = [
                {"dataset_type_id": dataset_type_id, self._collectionKeyName: collection_id}
                for collection_id in collections_chunk
            ]
            self._db.delete(
                self._tables.datasetType, ["dataset_type_id", self._collectionKeyName], *to_delete
            )