Coverage for python/lsst/daf/butler/registry/datasets/byDimensions/summaries.py: 0% (117 statements)

# This file is part of daf_butler.
#
# Developed for the LSST Data Management System.
# This product includes software developed by the LSST Project
# (http://www.lsst.org).
# See the COPYRIGHT file at the top-level directory of this distribution
# for details of code ownership.
#
# This software is dual licensed under the GNU General Public License and also
# under a 3-clause BSD license. Recipients may choose which of these licenses
# to use; please see the files gpl-3.0.txt and/or bsd_license.txt,
# respectively. If you choose the GPL option then the following text applies
# (but note that there is still no warranty even if you opt for BSD instead):
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.

from __future__ import annotations

from .... import ddl

__all__ = ("CollectionSummaryManager",)

import logging
from collections.abc import Callable, Iterable, Mapping
from typing import Any, Generic, TypeVar

import sqlalchemy

from lsst.utils.iteration import chunk_iterable

from ...._collection_type import CollectionType
from ...._dataset_type import DatasetType
from ...._named import NamedKeyDict, NamedKeyMapping
from ....dimensions import GovernorDimension, addDimensionForeignKey
from ..._caching_context import CachingContext
from ..._collection_summary import CollectionSummary
from ...interfaces import (
    CollectionManager,
    CollectionRecord,
    Database,
    DimensionRecordStorageManager,
    StaticTablesContext,
)
from ...wildcards import CollectionWildcard

_T = TypeVar("_T")

_LOG = logging.getLogger(__name__)


class CollectionSummaryTables(Generic[_T]):
    """Structure that holds the table or table specification objects that
    summarize the contents of collections.

    Parameters
    ----------
    datasetType : _T
        Table [specification] that summarizes which dataset types are in each
        collection.
    dimensions : `NamedKeyMapping`
        Mapping of table [specifications] that summarize which governor
        dimension values are present in the data IDs of each collection.
    """

    def __init__(
        self,
        datasetType: _T,
        dimensions: NamedKeyMapping[GovernorDimension, _T],
    ):
        self.datasetType = datasetType
        self.dimensions = dimensions

    @classmethod
    def makeTableSpecs(
        cls,
        collections: CollectionManager,
        dimensions: DimensionRecordStorageManager,
    ) -> CollectionSummaryTables[ddl.TableSpec]:
        """Create specifications for all summary tables.

        Parameters
        ----------
        collections : `CollectionManager`
            Manager object for the collections in this `Registry`.
        dimensions : `DimensionRecordStorageManager`
            Manager object for the dimensions in this `Registry`.

        Returns
        -------
        tables : `CollectionSummaryTables` [ `ddl.TableSpec` ]
            Structure containing table specifications.
        """
        # Spec for collection_summary_dataset_type.
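        # Each row is one (collection, dataset type) pair; both columns form
        # the composite primary key, so re-inserting an existing pair is a
        # no-op, and the CASCADE deletes keep the summary consistent when a
        # collection or dataset type is removed.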
        datasetTypeTableSpec = ddl.TableSpec(fields=[])
        collections.addCollectionForeignKey(datasetTypeTableSpec, primaryKey=True, onDelete="CASCADE")
        datasetTypeTableSpec.fields.add(
            ddl.FieldSpec("dataset_type_id", dtype=sqlalchemy.BigInteger, primaryKey=True)
        )
        datasetTypeTableSpec.foreignKeys.append(
            ddl.ForeignKeySpec(
                "dataset_type", source=("dataset_type_id",), target=("id",), onDelete="CASCADE"
            )
        )
        # Specs for collection_summary_<dimension>.
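        # One table per governor dimension (e.g. ``instrument`` or
        # ``skymap``), each mapping a collection to the governor values
        # present in the data IDs of its datasets.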
        dimensionTableSpecs = NamedKeyDict[GovernorDimension, ddl.TableSpec]()
        for dimension in dimensions.universe.governor_dimensions:
            tableSpec = ddl.TableSpec(fields=[])
            collections.addCollectionForeignKey(tableSpec, primaryKey=True, onDelete="CASCADE")
            addDimensionForeignKey(tableSpec, dimension, primaryKey=True)
            dimensionTableSpecs[dimension] = tableSpec
        return CollectionSummaryTables(
            datasetType=datasetTypeTableSpec,
            dimensions=dimensionTableSpecs.freeze(),
        )


class CollectionSummaryManager:
    """Object that manages the summaries of which dataset types and governor
    dimension values are present in each collection.

    Parameters
    ----------
    db : `Database`
        Interface to the underlying database engine and namespace.
    collections : `CollectionManager`
        Manager object for the collections in this `Registry`.
    tables : `CollectionSummaryTables`
        Struct containing the tables that hold collection summaries.
    dataset_type_table : `sqlalchemy.schema.Table`
        Table containing dataset type definitions.
    caching_context : `CachingContext`
        Object controlling caching of information returned by managers.
    """

    def __init__(
        self,
        db: Database,
        *,
        collections: CollectionManager,
        tables: CollectionSummaryTables[sqlalchemy.schema.Table],
        dataset_type_table: sqlalchemy.schema.Table,
        caching_context: CachingContext,
    ):
        self._db = db
        self._collections = collections
        self._collectionKeyName = collections.getCollectionForeignKeyName()
        self._tables = tables
        self._dataset_type_table = dataset_type_table
        self._caching_context = caching_context

    def clone(
        self,
        *,
        db: Database,
        collections: CollectionManager,
        caching_context: CachingContext,
    ) -> CollectionSummaryManager:
        """Make an independent copy of this manager instance bound to new
        instances of `Database` and other managers.

        Parameters
        ----------
        db : `Database`
            New `Database` object to use when instantiating the manager.
        collections : `CollectionManager`
            New `CollectionManager` object to use when instantiating the
            manager.
        caching_context : `CachingContext`
            New `CachingContext` object to use when instantiating the manager.

        Returns
        -------
        instance : `CollectionSummaryManager`
            New manager instance with the same configuration as this instance,
            but bound to a new `Database` object.
        """
        return CollectionSummaryManager(
            db=db,
            collections=collections,
            tables=self._tables,
            dataset_type_table=self._dataset_type_table,
            caching_context=caching_context,
        )

    @classmethod
    def initialize(
        cls,
        db: Database,
        context: StaticTablesContext,
        *,
        collections: CollectionManager,
        dimensions: DimensionRecordStorageManager,
        dataset_type_table: sqlalchemy.schema.Table,
        caching_context: CachingContext,
    ) -> CollectionSummaryManager:
        """Create all summary tables (or check that they have been created),
        returning an object to manage them.

        Parameters
        ----------
        db : `Database`
            Interface to the underlying database engine and namespace.
        context : `StaticTablesContext`
            Context object obtained from `Database.declareStaticTables`; used
            to declare any tables that should always be present.
        collections : `CollectionManager`
            Manager object for the collections in this `Registry`.
        dimensions : `DimensionRecordStorageManager`
            Manager object for the dimensions in this `Registry`.
        dataset_type_table : `sqlalchemy.schema.Table`
            Table containing dataset type definitions.
        caching_context : `CachingContext`
            Object controlling caching of information returned by managers.

        Returns
        -------
        manager : `CollectionSummaryManager`
            New manager object for collection summaries.
        """
        specs = CollectionSummaryTables.makeTableSpecs(collections, dimensions)
        tables = CollectionSummaryTables(
            datasetType=context.addTable("collection_summary_dataset_type", specs.datasetType),
            dimensions=NamedKeyDict[GovernorDimension, sqlalchemy.schema.Table](
                {
                    dimension: context.addTable(f"collection_summary_{dimension.name}", spec)
                    for dimension, spec in specs.dimensions.items()
                }
            ).freeze(),
        )
        return cls(
            db=db,
            collections=collections,
            tables=tables,
            dataset_type_table=dataset_type_table,
            caching_context=caching_context,
        )

    def update(
        self,
        collection: CollectionRecord,
        dataset_type_ids: Iterable[int],
        summary: CollectionSummary,
    ) -> None:
        """Update the summary tables to associate the given collection with
        the given dataset types and governor dimension values.

        Parameters
        ----------
        collection : `CollectionRecord`
            Collection whose summary should be updated.
        dataset_type_ids : `~collections.abc.Iterable` [ `int` ]
            Integer IDs for the dataset types to associate with this
            collection.
        summary : `CollectionSummary`
            Summary to store.  Dataset types must correspond to
            ``dataset_type_ids``.

        Notes
        -----
        This method should only be called inside the transaction context of
        another operation that inserts or associates datasets.
        """
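        # ``Database.ensure`` inserts rows while skipping any that would
        # violate a unique constraint, so repeating an update for the same
        # collection is harmless; sorting the IDs gives a deterministic
        # insert order.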
        self._db.ensure(
            self._tables.datasetType,
            *[
                {
                    "dataset_type_id": dataset_type_id,
                    self._collectionKeyName: collection.key,
                }
                for dataset_type_id in sorted(dataset_type_ids)
            ],
        )
        for dimension in sorted(summary.governors):
            if values := summary.governors[dimension]:
                self._db.ensure(
                    self._tables.dimensions[dimension],
                    *[{self._collectionKeyName: collection.key, dimension: v} for v in sorted(values)],
                )

    def fetch_summaries(
        self,
        collections: Iterable[CollectionRecord],
        dataset_type_names: Iterable[str] | None,
        dataset_type_factory: Callable[[sqlalchemy.engine.RowMapping], DatasetType],
    ) -> Mapping[Any, CollectionSummary]:
        """Fetch collection summaries given their names and dataset types.

        Parameters
        ----------
        collections : `~collections.abc.Iterable` [ `CollectionRecord` ]
            Collection records to query.
        dataset_type_names : `~collections.abc.Iterable` [ `str` ] or `None`
            Names of dataset types to include in the returned summaries.  If
            `None`, all dataset types are included.
        dataset_type_factory : `~collections.abc.Callable`
            Callable that takes a table row and makes a `DatasetType`
            instance out of it.

        Returns
        -------
        summaries : `~collections.abc.Mapping` [ `typing.Any`, \
                `CollectionSummary` ]
            Collection summaries indexed by collection record key.  The
            mapping also contains entries for all non-chained children of any
            chained collections that were passed in.
        """
        summaries: dict[Any, CollectionSummary] = {}
        # Check what we have in cache first.
        if self._caching_context.collection_summaries is not None:
            summaries, missing_keys = self._caching_context.collection_summaries.find_summaries(
                [record.key for record in collections]
            )
            if not missing_keys:
                return summaries
            else:
                collections = [record for record in collections if record.key in missing_keys]

        # Need to expand all chained collections first.
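        # Summary rows are stored only for non-chained collections; a chain's
        # summary is computed below as the union of its children's summaries.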
        non_chains: list[CollectionRecord] = []
        chains: dict[CollectionRecord, list[CollectionRecord]] = {}
        for collection in collections:
            if collection.type is CollectionType.CHAINED:
                children = self._collections.resolve_wildcard(
                    CollectionWildcard.from_names([collection.name]),
                    flatten_chains=True,
                    include_chains=False,
                )
                non_chains += children
                chains[collection] = children
            else:
                non_chains.append(collection)

        _LOG.debug("Fetching summaries for collections %s.", [record.name for record in non_chains])

        # Set up the SQL query we'll use to fetch all of the summary
        # information at once.
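        # The dataset-type summary table is joined to the dataset_type table
        # to pull in full dataset type definitions, and each per-dimension
        # summary table is LEFT OUTER JOINed on the collection key, so a
        # governor column is NULL in rows where that dimension has no entry.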
        coll_col = self._tables.datasetType.columns[self._collectionKeyName].label(self._collectionKeyName)
        dataset_type_id_col = self._tables.datasetType.columns.dataset_type_id.label("dataset_type_id")
        columns = [coll_col, dataset_type_id_col] + list(self._dataset_type_table.columns)
        fromClause: sqlalchemy.sql.expression.FromClause = self._tables.datasetType.join(
            self._dataset_type_table
        )
        for dimension, table in self._tables.dimensions.items():
            columns.append(table.columns[dimension.name].label(dimension.name))
            fromClause = fromClause.join(
                table,
                onclause=(
                    self._tables.datasetType.columns[self._collectionKeyName]
                    == table.columns[self._collectionKeyName]
                ),
                isouter=True,
            )

        sql = sqlalchemy.sql.select(*columns).select_from(fromClause)
        sql = sql.where(coll_col.in_([coll.key for coll in non_chains]))
        # For caching we need to fetch complete summaries, so only apply the
        # dataset type name filter when caching is disabled.
        if self._caching_context.collection_summaries is None:
            if dataset_type_names is not None:
                sql = sql.where(self._dataset_type_table.columns["name"].in_(dataset_type_names))

        # Run the query and construct CollectionSummary objects from the
        # result rows.  This will never include CHAINED collections or
        # collections with no datasets.
        with self._db.query(sql) as sql_result:
            sql_rows = sql_result.mappings().fetchall()
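        # Construct each DatasetType only once, keyed by its integer ID;
        # many rows share the same dataset type because of the outer joins.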
        dataset_type_ids: dict[int, DatasetType] = {}
        for row in sql_rows:
            # Collection key should never be None/NULL; it's what we join on.
            # Extract that and then turn it into a collection name.
            collectionKey = row[self._collectionKeyName]
            # dataset_type_id should also never be None/NULL; it's in the
            # first table we joined.
            dataset_type_id = row["dataset_type_id"]
            if (dataset_type := dataset_type_ids.get(dataset_type_id)) is None:
                dataset_type_ids[dataset_type_id] = dataset_type = dataset_type_factory(row)
            # See if we have a summary already for this collection; if not,
            # make one.
            summary = summaries.get(collectionKey)
            if summary is None:
                summary = CollectionSummary()
                summaries[collectionKey] = summary
            # Update the dimensions with the values in this row that
            # aren't None/NULL (many will be NULL in general, because these
            # columns enter the query via LEFT OUTER JOIN).
            summary.dataset_types.add(dataset_type)
            for dimension in self._tables.dimensions:
                value = row[dimension.name]
                if value is not None:
                    summary.governors.setdefault(dimension.name, set()).add(value)

        # Add empty summary for any missing collection.
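        # Collections with no datasets produce no rows above, so they would
        # otherwise be absent from the result.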
        for collection in non_chains:
            if collection.key not in summaries:
                summaries[collection.key] = CollectionSummary()

        # Merge the children's summaries into those of their parent chains.
        for chain, children in chains.items():
            summaries[chain.key] = CollectionSummary.union(*(summaries[child.key] for child in children))

        if self._caching_context.collection_summaries is not None:
            self._caching_context.collection_summaries.update(summaries)

        return summaries

    def get_collection_ids(self, dataset_type_id: int) -> Iterable[str] | Iterable[int]:
        """Get collection IDs for a given dataset type ID.

        Parameters
        ----------
        dataset_type_id : `int`
            Integer ID for the dataset type.

        Returns
        -------
        collection_ids : `~collections.abc.Iterable`
            Collection IDs (ints or strings) associated with the dataset type.
        """
        query = sqlalchemy.select(self._tables.datasetType.columns[self._collectionKeyName])
        query = query.where(self._tables.datasetType.columns.dataset_type_id == dataset_type_id)
        with self._db.query(query) as result:
            return list(result.scalars())

    def delete_collections(self, dataset_type_id: int, collection_ids: Iterable) -> None:
        """Delete collections from the summaries for a given dataset type.

        Parameters
        ----------
        dataset_type_id : `int`
            Integer ID for the dataset type.
        collection_ids : `~collections.abc.Iterable`
            Collection IDs (integer or string) to remove from the summaries
            for this dataset type.

        Notes
        -----
        This method should only be called inside the transaction context of
        another operation that selects collection information.
        """
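        # Delete in chunks, presumably to keep the number of rows (and hence
        # bind parameters) per DELETE statement within database limits.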
        for collections_chunk in chunk_iterable(collection_ids, 1000):
            to_delete = [
                {"dataset_type_id": dataset_type_id, self._collectionKeyName: collection_id}
                for collection_id in collections_chunk
            ]
            self._db.delete(
                self._tables.datasetType, ["dataset_type_id", self._collectionKeyName], *to_delete
            )