Coverage for python / lsst / daf / butler / registry / obscore / _records.py: 16%
182 statements
« prev ^ index » next coverage.py v7.13.5, created at 2026-05-06 08:30 +0000
1# This file is part of daf_butler.
2#
3# Developed for the LSST Data Management System.
4# This product includes software developed by the LSST Project
5# (http://www.lsst.org).
6# See the COPYRIGHT file at the top-level directory of this distribution
7# for details of code ownership.
8#
9# This software is dual licensed under the GNU General Public License and also
10# under a 3-clause BSD license. Recipients may choose which of these licenses
11# to use; please see the files gpl-3.0.txt and/or bsd_license.txt,
12# respectively. If you choose the GPL option then the following text applies
13# (but note that there is still no warranty even if you opt for BSD instead):
14#
15# This program is free software: you can redistribute it and/or modify
16# it under the terms of the GNU General Public License as published by
17# the Free Software Foundation, either version 3 of the License, or
18# (at your option) any later version.
19#
20# This program is distributed in the hope that it will be useful,
21# but WITHOUT ANY WARRANTY; without even the implied warranty of
22# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
23# GNU General Public License for more details.
24#
25# You should have received a copy of the GNU General Public License
26# along with this program. If not, see <http://www.gnu.org/licenses/>.
28from __future__ import annotations
30__all__ = ["DerivedRegionFactory", "Record", "RecordFactory"]
32import logging
33import warnings
34from abc import abstractmethod
35from collections.abc import Callable, Collection, Mapping
36from importlib.metadata import entry_points
37from typing import TYPE_CHECKING, Any, cast
38from uuid import UUID
40import astropy.time
42from lsst.daf.butler import DataCoordinate, DatasetRef, Dimension, DimensionRecord, DimensionUniverse
43from lsst.utils.introspection import find_outside_stacklevel, get_full_type_name
45from ._config import ExtraColumnConfig, ExtraColumnType, ObsCoreConfig
46from ._spatial import RegionTypeError, RegionTypeWarning
48if TYPE_CHECKING:
49 from lsst.daf.butler import DimensionGroup
50 from lsst.sphgeom import Region
52 from ._config import DatasetTypeConfig
53 from ._schema import ObsCoreSchema
54 from ._spatial import SpatialObsCorePlugin
# Module-level logger for obscore record construction.
_LOG = logging.getLogger(__name__)

# Map extra column type to a conversion method that takes string.
# Used when expanding `ExtraColumnConfig` templates into typed column values.
_TYPE_CONVERSION: Mapping[str, Callable[[str], Any]] = {
    ExtraColumnType.bool: lambda x: bool(int(x)),  # expect integer number/string as input.
    ExtraColumnType.int: int,
    ExtraColumnType.float: float,
    ExtraColumnType.string: str,
}
class DerivedRegionFactory:
    """Abstract interface for a class that returns a Region for a data ID."""

    @abstractmethod
    def derived_region(self, dataId: DataCoordinate) -> Region | None:
        """Compute a region for the given data ID.

        The region is one that may have been derived rather than read
        directly from the data ID.

        Parameters
        ----------
        dataId : `DataCoordinate`
            Data ID for the relevant dataset.

        Returns
        -------
        region : `lsst.sphgeom.Region`
            The derived region, or `None` if no region can be determined.
        """
        raise NotImplementedError()
# A single ObsCore record: mapping of column name to column value.
Record = dict[str, Any]
class RecordFactory:
    """Class that implements conversion of dataset information to ObsCore.

    Parameters
    ----------
    config : `ObsCoreConfig`
        Complete configuration specifying conversion options.
    schema : `ObsCoreSchema`
        Description of obscore schema.
    universe : `DimensionUniverse`
        Registry dimensions universe.
    spatial_plugins : `~collections.abc.Collection` of `SpatialObsCorePlugin`
        Spatial plugins.
    derived_region_factory : `DerivedRegionFactory`, optional
        Factory for handling derived regions that are not directly
        available from the data ID.
    """

    def __init__(
        self,
        config: ObsCoreConfig,
        schema: ObsCoreSchema,
        universe: DimensionUniverse,
        spatial_plugins: Collection[SpatialObsCorePlugin],
        derived_region_factory: DerivedRegionFactory | None = None,
    ):
        self.config = config
        self.schema = schema
        self.universe = universe
        self.derived_region_factory = derived_region_factory
        self.spatial_plugins = spatial_plugins

        # All dimension elements used below.
        self.band = cast(Dimension, universe["band"])
        self.exposure = universe["exposure"]
        self.visit = universe["visit"]
        self.physical_filter = cast(Dimension, universe["physical_filter"])

    @abstractmethod
    def region_dimension(self, dimensions: DimensionGroup) -> tuple[str, str] | tuple[None, None]:
        """Return the dimension to use to obtain a region.

        Parameters
        ----------
        dimensions : `DimensionGroup`
            The dimensions to be examined.

        Returns
        -------
        region_dim : `str` or `None`
            The dimension to use to get the region information. Can be `None`
            if there is no relevant dimension for a region.
        region_metadata : `str` or `None`
            The metadata field for the ``region_dim`` that specifies the
            region itself. Will be `None` if ``region_dim`` is `None`.

        Notes
        -----
        This is universe specific. For example, in the ``daf_butler`` namespace
        the region for an ``exposure`` is obtained by looking for the relevant
        ``visit`` or ``visit_detector_region``.
        """
        raise NotImplementedError()

    def make_generic_records(self, ref: DatasetRef, dataset_config: DatasetTypeConfig) -> Record:
        """Fill record content that is not associated with a specific universe.

        Parameters
        ----------
        ref : `DatasetRef`
            Dataset ref, its DataId must be in expanded form.
        dataset_config : `DatasetTypeConfig`
            The configuration for the dataset type.

        Returns
        -------
        record : `dict` [ `str`, `typing.Any` ]
            Record content with generic content filled in that does not
            depend on a specific universe.
        """
        record: dict[str, str | int | float | UUID | None] = {}

        record["dataproduct_type"] = dataset_config.dataproduct_type
        record["dataproduct_subtype"] = dataset_config.dataproduct_subtype
        record["o_ucd"] = dataset_config.o_ucd
        record["calib_level"] = dataset_config.calib_level
        # A per-dataset-type obs_collection overrides the global setting.
        if dataset_config.obs_collection is not None:
            record["obs_collection"] = dataset_config.obs_collection
        else:
            record["obs_collection"] = self.config.obs_collection
        record["access_format"] = dataset_config.access_format

        if dataset_config.s_xel is not None:
            record["s_xel1"] = dataset_config.s_xel[0]
            record["s_xel2"] = dataset_config.s_xel[1]

        dataId = ref.dataId
        dataset_type_name = ref.datasetType.name

        # Dictionary to use for substitutions when formatting various
        # strings from configuration.
        fmt_kws: dict[str, Any] = dict(records=dataId.records)
        fmt_kws.update(dataId.mapping)
        fmt_kws.update(id=ref.id)
        fmt_kws.update(run=ref.run)
        fmt_kws.update(dataset_type=dataset_type_name)
        fmt_kws.update(record)

        # In some cases we would like some keys to be duplicated
        # with different names. For example making an exposure be a visit.
        self.finalize_format_keywords(fmt_kws)

        if dataset_config.obs_id_fmt:
            record["obs_id"] = dataset_config.obs_id_fmt.format(**fmt_kws)
            # obs_id becomes available as a substitution for later templates.
            fmt_kws["obs_id"] = record["obs_id"]

        if dataset_config.datalink_url_fmt:
            record["access_url"] = dataset_config.datalink_url_fmt.format(**fmt_kws)

        if self.config.obs_publisher_did_fmt:
            record["obs_publisher_did"] = self.config.obs_publisher_did_fmt.format(**fmt_kws)

        # Dataset-type-specific extra columns take precedence over the
        # globally configured ones.
        extra_columns = {}
        if self.config.extra_columns:
            extra_columns.update(self.config.extra_columns)
        if dataset_config.extra_columns:
            extra_columns.update(dataset_config.extra_columns)
        for key, column_value in extra_columns.items():
            # Try to expand the template with known keys, if expansion
            # fails due to a missing key name then store None.
            if isinstance(column_value, ExtraColumnConfig):
                try:
                    value = column_value.template.format(**fmt_kws)
                    record[key] = _TYPE_CONVERSION[column_value.type](value)
                except KeyError:
                    pass
            else:
                # Just a static value.
                record[key] = column_value

        return record

    @abstractmethod
    def make_universe_records(self, ref: DatasetRef) -> Record:
        """Create universe-specific record content.

        Parameters
        ----------
        ref : `DatasetRef`
            Dataset ref, its DataId must be in expanded form.

        Returns
        -------
        record : `dict` [ `str`, `typing.Any` ]
            Record content populated using algorithms specific to this
            dimension universe.
        """
        raise NotImplementedError()

    def __call__(self, ref: DatasetRef) -> Record | None:
        """Make an ObsCore record from a dataset.

        Parameters
        ----------
        ref : `DatasetRef`
            Dataset ref, its DataId must be in expanded form.

        Returns
        -------
        record : `dict` [ `str`, `typing.Any` ] or `None`
            ObsCore record represented as a dictionary. `None` is returned if
            dataset does not need to be stored in the obscore table, e.g. when
            dataset type is not in obscore configuration.

        Notes
        -----
        This method filters records by dataset type and returns `None` if
        reference dataset type is not configured. It does not check reference
        run name against configured collections, all runs are acceptable by
        this method.
        """
        # Quick check for dataset type.
        dataset_type_name = ref.datasetType.name
        dataset_config = self.config.dataset_types.get(dataset_type_name)
        if dataset_config is None:
            return None

        record: dict[str, str | int | float | UUID | None]

        # We need all columns filled, to simplify logic below just pre-fill
        # everything with None.
        record = {field.name: None for field in self.schema.table_spec.fields}

        record.update(self.make_generic_records(ref, dataset_config))
        record.update(self.make_universe_records(ref))

        return record

    def make_spatial_records(self, region: Region | None, warn: bool = False, msg: str = "") -> Record:
        """Make spatial records for a given region.

        Parameters
        ----------
        region : `~lsst.sphgeom.Region` or `None`
            Spatial region to convert to record.
        warn : `bool`, optional
            If `False`, an exception will be raised if the type of region
            is not supported. If `True` a warning will be issued and an
            empty `dict` returned.
        msg : `str`, optional
            Message to use in warning. Generic message will be used if not
            given. This message will be used to annotate any `RegionTypeError`
            exception raised if defined.

        Returns
        -------
        record : `dict`
            Record items.

        Raises
        ------
        RegionTypeError
            Raised if type of the region is not supported and ``warn`` is
            `False`.
        """
        record = Record()
        try:
            # Ask each plugin for its values to add to a record.
            for plugin in self.spatial_plugins:
                plugin_record = plugin.make_records(region)
                if plugin_record is not None:
                    record.update(plugin_record)
        except RegionTypeError as exc:
            if warn:
                if not msg:
                    msg = "Failed to convert obscore region"
                warnings.warn(
                    f"{msg}: {exc}",
                    category=RegionTypeWarning,
                    stacklevel=find_outside_stacklevel("lsst.daf.butler"),
                )
                # Clear the record.
                record = Record()
            else:
                if msg:
                    exc.add_note(msg)
                raise
        return record

    def finalize_format_keywords(self, format_kws: dict[str, Any]) -> None:
        """Modify the formatting dict as required after it has been populated
        with generic content.

        Parameters
        ----------
        format_kws : `dict`
            Dictionary that might be modified in place.
        """
        pass

    @classmethod
    def get_record_type_from_universe(cls, universe: DimensionUniverse) -> type[RecordFactory]:
        """Return the `RecordFactory` subclass appropriate for a universe.

        Parameters
        ----------
        universe : `DimensionUniverse`
            The dimension universe whose namespace selects the factory.

        Returns
        -------
        factory_type : `type` [ `RecordFactory` ]
            The class to use to construct records for this universe. The
            ``daf_butler`` namespace is handled directly; other namespaces
            are looked up via the ``butler.obscore_factory`` entry point
            group.

        Raises
        ------
        TypeError
            Raised if an entry point for the namespace does not return a
            `RecordFactory` subclass.
        ValueError
            Raised if no factory can be found for the namespace.
        """
        namespace = universe.namespace
        if namespace == "daf_butler":
            return DafButlerRecordFactory
        # Check for entry points.
        plugins = {p.name: p for p in entry_points(group="butler.obscore_factory")}
        if namespace in plugins:
            func = plugins[namespace].load()
            # The entry point function is required to return the class that
            # should be used for the RecordFactory for this universe.
            record_factory_type = func()
            if not issubclass(record_factory_type, RecordFactory):
                raise TypeError(
                    f"Entry point for universe {namespace} did not return RecordFactory. "
                    f"Returned {get_full_type_name(record_factory_type)}"
                )
            return record_factory_type
        raise ValueError(f"Unable to load record factory dynamically for universe namespace {namespace}")
class DafButlerRecordFactory(RecordFactory):
    """Class that implements conversion of dataset information to ObsCore
    using the daf_butler dimension universe namespace.

    Parameters
    ----------
    config : `ObsCoreConfig`
        Complete configuration specifying conversion options.
    schema : `ObsCoreSchema`
        Description of obscore schema.
    universe : `DimensionUniverse`
        Registry dimensions universe.
    spatial_plugins : `~collections.abc.Collection` of `SpatialObsCorePlugin`
        Spatial plugins.
    derived_region_factory : `DerivedRegionFactory`, optional
        Factory for handling derived regions that are not directly
        available from the data ID.

    Notes
    -----
    Construction is handled entirely by `RecordFactory`, which also sets
    the ``band``, ``exposure``, ``visit`` and ``physical_filter`` dimension
    attributes used below, so no ``__init__`` override is defined here.
    """

    def make_universe_records(self, ref: DatasetRef) -> Record:
        """Create record content specific to the daf_butler universe.

        Parameters
        ----------
        ref : `DatasetRef`
            Dataset ref, its DataId must be in expanded form.

        Returns
        -------
        record : `dict` [ `str`, `typing.Any` ]
            Record content populated from the instrument, timespan, region,
            and spectral-range information in the data ID.
        """
        dataId = ref.dataId
        record: dict[str, str | int | float | UUID | None] = {}

        # Fall back to the configured instrument when the data ID has none.
        instrument_name = cast(str | None, dataId.get("instrument", self.config.fallback_instrument))
        record["instrument_name"] = instrument_name
        if self.schema.dataset_fk is not None:
            record[self.schema.dataset_fk.name] = ref.id

        record["facility_name"] = self.config.facility_map.get(
            instrument_name or "", self.config.facility_name
        )

        # Convert timespan bounds to MJD in the UTC scale.
        timespan = dataId.timespan
        if timespan is not None:
            if timespan.begin is not None:
                t_min = cast(astropy.time.Time, timespan.begin)
                record["t_min"] = float(t_min.utc.mjd)
            if timespan.end is not None:
                t_max = cast(astropy.time.Time, timespan.end)
                record["t_max"] = float(t_max.utc.mjd)

        region = dataId.region
        if self.exposure.name in dataId:
            if (dimension_record := dataId.records[self.exposure.name]) is not None:
                self._exposure_records(dimension_record, record)
            # Exposures do not carry their own region; ask the derived
            # region factory when one has been configured.
            if self.derived_region_factory is not None:
                region = self.derived_region_factory.derived_region(dataId)
        elif self.visit.name in dataId and (dimension_record := dataId.records[self.visit.name]) is not None:
            self._visit_records(dimension_record, record)

        # Create spatial records.
        record.update(
            self.make_spatial_records(
                region, warn=True, msg=f"Failed to convert region for obscore dataset {ref.id}"
            )
        )

        if self.band.name in dataId:
            em_range = None
            # Prefer the physical filter for the spectral-range lookup,
            # falling back to the abstract band name.
            if (label := dataId.get(self.physical_filter.name)) is not None:
                em_range = self.config.spectral_ranges.get(cast(str, label))
            if not em_range:
                band_name = dataId[self.band.name]
                assert isinstance(band_name, str), "Band name must be string"
                em_range = self.config.spectral_ranges.get(band_name)
            if em_range:
                record["em_min"], record["em_max"] = em_range
            else:
                _LOG.warning("could not find spectral range for dataId=%s", dataId)
            record["em_filter_name"] = dataId["band"]

        return record

    def _exposure_records(self, dimension_record: DimensionRecord, record: dict[str, Any]) -> None:
        """Extract all needed info from an exposure dimension record."""
        record["t_exptime"] = dimension_record.exposure_time
        record["target_name"] = dimension_record.target_name

    def _visit_records(self, dimension_record: DimensionRecord, record: dict[str, Any]) -> None:
        """Extract all needed info from a visit dimension record."""
        record["t_exptime"] = dimension_record.exposure_time
        record["target_name"] = dimension_record.target_name

    def region_dimension(self, dimensions: DimensionGroup) -> tuple[str, str] | tuple[None, None]:
        # Inherited doc string.
        region_dim = dimensions.region_dimension
        if not region_dim:
            # In the daf_butler universe an exposure has no region of its
            # own; use the matching visit-based dimension instead.
            if "exposure" in dimensions:
                if "detector" in dimensions:
                    region_dim = "visit_detector_region"
                else:
                    region_dim = "visit"
            else:
                return None, None
        return region_dim, "region"

    def finalize_format_keywords(self, format_kws: dict[str, Any]) -> None:
        # If we have an exposure but do not have a visit, copy the exposure
        # into the visit field.
        if "exposure" in format_kws and "visit" not in format_kws:
            format_kws["visit"] = format_kws["exposure"]