Coverage for python / lsst / daf / butler / registry / obscore / _records.py: 16%

182 statements  

« prev     ^ index     » next       coverage.py v7.13.5, created at 2026-04-22 08:55 +0000

1# This file is part of daf_butler. 

2# 

3# Developed for the LSST Data Management System. 

4# This product includes software developed by the LSST Project 

5# (http://www.lsst.org). 

6# See the COPYRIGHT file at the top-level directory of this distribution 

7# for details of code ownership. 

8# 

9# This software is dual licensed under the GNU General Public License and also 

10# under a 3-clause BSD license. Recipients may choose which of these licenses 

11# to use; please see the files gpl-3.0.txt and/or bsd_license.txt, 

12# respectively. If you choose the GPL option then the following text applies 

13# (but note that there is still no warranty even if you opt for BSD instead): 

14# 

15# This program is free software: you can redistribute it and/or modify 

16# it under the terms of the GNU General Public License as published by 

17# the Free Software Foundation, either version 3 of the License, or 

18# (at your option) any later version. 

19# 

20# This program is distributed in the hope that it will be useful, 

21# but WITHOUT ANY WARRANTY; without even the implied warranty of 

22# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the 

23# GNU General Public License for more details. 

24# 

25# You should have received a copy of the GNU General Public License 

26# along with this program. If not, see <http://www.gnu.org/licenses/>. 

27 

28from __future__ import annotations 

29 

# Names exported by `from ... import *`; DafButlerRecordFactory is reached
# indirectly via RecordFactory.get_record_type_from_universe instead.
__all__ = ["DerivedRegionFactory", "Record", "RecordFactory"]

31 

32import logging 

33import warnings 

34from abc import abstractmethod 

35from collections.abc import Callable, Collection, Mapping 

36from importlib.metadata import entry_points 

37from typing import TYPE_CHECKING, Any, cast 

38from uuid import UUID 

39 

40import astropy.time 

41 

42from lsst.daf.butler import DataCoordinate, DatasetRef, Dimension, DimensionRecord, DimensionUniverse 

43from lsst.utils.introspection import find_outside_stacklevel, get_full_type_name 

44 

45from ._config import ExtraColumnConfig, ExtraColumnType, ObsCoreConfig 

46from ._spatial import RegionTypeError, RegionTypeWarning 

47 

48if TYPE_CHECKING: 

49 from lsst.daf.butler import DimensionGroup 

50 from lsst.sphgeom import Region 

51 

52 from ._config import DatasetTypeConfig 

53 from ._schema import ObsCoreSchema 

54 from ._spatial import SpatialObsCorePlugin 

55 

# Module-level logger.
_LOG = logging.getLogger(__name__)

# Converters used to coerce a formatted template string into the Python
# type declared for an extra column.
_TYPE_CONVERSION: Mapping[str, Callable[[str], Any]] = {
    # Booleans arrive as integer-like numbers/strings ("0"/"1"), so go via int().
    ExtraColumnType.bool: lambda text: bool(int(text)),
    ExtraColumnType.int: int,
    ExtraColumnType.float: float,
    ExtraColumnType.string: str,
}

65 

66 

class DerivedRegionFactory:
    """Abstract interface for a class that returns a Region for a data ID."""

    @abstractmethod
    def derived_region(self, dataId: DataCoordinate) -> Region | None:
        """Return a region for a given DataId that may have been derived.

        Parameters
        ----------
        dataId : `DataCoordinate`
            Data ID for the relevant dataset.

        Returns
        -------
        region : `lsst.sphgeom.Region`
            `None` is returned if region cannot be determined.
        """
        # Subclasses must supply the actual lookup/derivation logic.
        raise NotImplementedError

85 

86 

# Type alias for a single obscore record: maps column name to column value.
Record = dict[str, Any]

88 

89 

class RecordFactory:
    """Class that implements conversion of dataset information to ObsCore.

    Parameters
    ----------
    config : `ObsCoreConfig`
        Complete configuration specifying conversion options.
    schema : `ObsCoreSchema`
        Description of obscore schema.
    universe : `DimensionUniverse`
        Registry dimensions universe.
    spatial_plugins : `~collections.abc.Collection` of `SpatialObsCorePlugin`
        Spatial plugins.
    derived_region_factory : `DerivedRegionFactory`, optional
        Factory for handling derived regions that are not directly
        available from the data ID.

    Notes
    -----
    This base class fills the universe-independent part of a record
    (`make_generic_records`); subclasses implement `make_universe_records`
    and `region_dimension` for a specific dimension universe.
    """

    def __init__(
        self,
        config: ObsCoreConfig,
        schema: ObsCoreSchema,
        universe: DimensionUniverse,
        spatial_plugins: Collection[SpatialObsCorePlugin],
        derived_region_factory: DerivedRegionFactory | None = None,
    ):
        self.config = config
        self.schema = schema
        self.universe = universe
        self.derived_region_factory = derived_region_factory
        self.spatial_plugins = spatial_plugins

        # All dimension elements used below.
        self.band = cast(Dimension, universe["band"])
        self.exposure = universe["exposure"]
        self.visit = universe["visit"]
        self.physical_filter = cast(Dimension, universe["physical_filter"])

    @abstractmethod
    def region_dimension(self, dimensions: DimensionGroup) -> tuple[str, str] | tuple[None, None]:
        """Return the dimension to use to obtain a region.

        Parameters
        ----------
        dimensions : `DimensionGroup`
            The dimensions to be examined.

        Returns
        -------
        region_dim : `str` or `None`
            The dimension to use to get the region information. Can be `None`
            if there is no relevant dimension for a region.
        region_metadata : `str` or `None`
            The metadata field for the ``region_dim`` that specifies the
            region itself. Will be `None` if ``region_dim`` is `None`.

        Notes
        -----
        This is universe specific. For example, in the ``daf_butler`` namespace
        the region for an ``exposure`` is obtained by looking for the relevant
        ``visit`` or ``visit_detector_region``.
        """
        raise NotImplementedError()

    def make_generic_records(self, ref: DatasetRef, dataset_config: DatasetTypeConfig) -> Record:
        """Fill record content that is not associated with a specific universe.

        Parameters
        ----------
        ref : `DatasetRef`
            Dataset ref, its DataId must be in expanded form.
        dataset_config : `DatasetTypeConfig`
            The configuration for the dataset type.

        Returns
        -------
        record : `dict` [ `str`, `typing.Any` ]
            Record content with generic content filled in that does not
            depend on a specific universe.
        """
        record: dict[str, str | int | float | UUID | None] = {}

        # Static per-dataset-type columns taken directly from configuration.
        record["dataproduct_type"] = dataset_config.dataproduct_type
        record["dataproduct_subtype"] = dataset_config.dataproduct_subtype
        record["o_ucd"] = dataset_config.o_ucd
        record["calib_level"] = dataset_config.calib_level
        # Per-dataset-type obs_collection overrides the global default.
        if dataset_config.obs_collection is not None:
            record["obs_collection"] = dataset_config.obs_collection
        else:
            record["obs_collection"] = self.config.obs_collection
        record["access_format"] = dataset_config.access_format

        if dataset_config.s_xel is not None:
            record["s_xel1"] = dataset_config.s_xel[0]
            record["s_xel2"] = dataset_config.s_xel[1]

        dataId = ref.dataId
        dataset_type_name = ref.datasetType.name

        # Dictionary to use for substitutions when formatting various
        # strings from configuration. Later update() calls override earlier
        # keys on collision (record values win over raw data ID values).
        fmt_kws: dict[str, Any] = dict(records=dataId.records)
        fmt_kws.update(dataId.mapping)
        fmt_kws.update(id=ref.id)
        fmt_kws.update(run=ref.run)
        fmt_kws.update(dataset_type=dataset_type_name)
        fmt_kws.update(record)

        # In some cases we would like some keys to be duplicated
        # with different names. For example making an exposure be a visit.
        self.finalize_format_keywords(fmt_kws)

        if dataset_config.obs_id_fmt:
            record["obs_id"] = dataset_config.obs_id_fmt.format(**fmt_kws)
            # Make obs_id available to the templates expanded below.
            fmt_kws["obs_id"] = record["obs_id"]

        if dataset_config.datalink_url_fmt:
            record["access_url"] = dataset_config.datalink_url_fmt.format(**fmt_kws)

        if self.config.obs_publisher_did_fmt:
            record["obs_publisher_did"] = self.config.obs_publisher_did_fmt.format(**fmt_kws)

        # Merge extra columns; dataset-type-specific entries override the
        # global ones with the same name.
        extra_columns = {}
        if self.config.extra_columns:
            extra_columns.update(self.config.extra_columns)
        if dataset_config.extra_columns:
            extra_columns.update(dataset_config.extra_columns)
        for key, column_value in extra_columns.items():
            # Try to expand the template with known keys, if expansion
            # fails due to a missing key name then store None.
            if isinstance(column_value, ExtraColumnConfig):
                try:
                    value = column_value.template.format(**fmt_kws)
                    record[key] = _TYPE_CONVERSION[column_value.type](value)
                except KeyError:
                    # Missing template key: deliberately leave the column
                    # unset rather than failing the whole record.
                    pass
            else:
                # Just a static value.
                record[key] = column_value

        return record

    @abstractmethod
    def make_universe_records(self, ref: DatasetRef) -> Record:
        """Create universe-specific record content.

        Parameters
        ----------
        ref : `DatasetRef`
            Dataset ref, its DataId must be in expanded form.

        Returns
        -------
        record : `dict` [ `str`, `typing.Any` ]
            Record content populated using algorithms specific to this
            dimension universe.
        """
        raise NotImplementedError()

    def __call__(self, ref: DatasetRef) -> Record | None:
        """Make an ObsCore record from a dataset.

        Parameters
        ----------
        ref : `DatasetRef`
            Dataset ref, its DataId must be in expanded form.

        Returns
        -------
        record : `dict` [ `str`, `typing.Any` ] or `None`
            ObsCore record represented as a dictionary. `None` is returned if
            dataset does not need to be stored in the obscore table, e.g. when
            dataset type is not in obscore configuration.

        Notes
        -----
        This method filters records by dataset type and returns `None` if
        reference dataset type is not configured. It does not check reference
        run name against configured collections, all runs are acceptable by
        this method.
        """
        # Quick check for dataset type.
        dataset_type_name = ref.datasetType.name
        dataset_config = self.config.dataset_types.get(dataset_type_name)
        if dataset_config is None:
            return None

        # _LOG.debug("New record, dataId=%s", dataId.full)
        # _LOG.debug("New record, records=%s", dataId.records)

        record: dict[str, str | int | float | UUID | None]

        # We need all columns filled, to simplify logic below just pre-fill
        # everything with None.
        record = {field.name: None for field in self.schema.table_spec.fields}

        # Universe-specific values are applied last so they can override
        # generic ones with the same column name.
        record.update(self.make_generic_records(ref, dataset_config))
        record.update(self.make_universe_records(ref))

        return record

    def make_spatial_records(self, region: Region | None, warn: bool = False, msg: str = "") -> Record:
        """Make spatial records for a given region.

        Parameters
        ----------
        region : `~lsst.sphgeom.Region` or `None`
            Spatial region to convert to record.
        warn : `bool`, optional
            If `False`, an exception will be raised if the type of region
            is not supported. If `True` a warning will be issued and an
            empty `dict` returned.
        msg : `str`, optional
            Message to use in warning. Generic message will be used if not
            given. This message will be used to annotate any `RegionTypeError`
            exception raised if defined.

        Returns
        -------
        record : `dict`
            Record items.

        Raises
        ------
        RegionTypeError
            Raised if type of the region is not supported and ``warn`` is
            `False`.
        """
        record = Record()
        try:
            # Ask each plugin for its values to add to a record.
            for plugin in self.spatial_plugins:
                plugin_record = plugin.make_records(region)
                if plugin_record is not None:
                    record.update(plugin_record)
        except RegionTypeError as exc:
            if warn:
                if not msg:
                    msg = "Failed to convert obscore region"
                warnings.warn(
                    f"{msg}: {exc}",
                    category=RegionTypeWarning,
                    # Point the warning at the caller outside the butler
                    # package rather than at this helper.
                    stacklevel=find_outside_stacklevel("lsst.daf.butler"),
                )
                # Clear the record.
                record = Record()
            else:
                if msg:
                    exc.add_note(msg)
                raise
        return record

    def finalize_format_keywords(self, format_kws: dict[str, Any]) -> None:
        """Modify the formatting dict as required after it has been populated
        with generic content.

        Parameters
        ----------
        format_kws : `dict`
            Dictionary that might be modified in place.
        """
        # Default implementation: no extra keywords; subclasses may override.
        pass

    @classmethod
    def get_record_type_from_universe(cls, universe: DimensionUniverse) -> type[RecordFactory]:
        """Return the `RecordFactory` subclass to use for a dimension universe.

        Parameters
        ----------
        universe : `DimensionUniverse`
            The universe whose namespace selects the factory implementation.

        Returns
        -------
        factory : `type` [ `RecordFactory` ]
            The factory class for this universe.

        Raises
        ------
        TypeError
            Raised if an entry point for this namespace returns something
            that is not a `RecordFactory` subclass.
        ValueError
            Raised if no factory is known for this namespace.
        """
        namespace = universe.namespace
        if namespace == "daf_butler":
            return DafButlerRecordFactory
        # Check for entry points.
        plugins = {p.name: p for p in entry_points(group="butler.obscore_factory")}
        if namespace in plugins:
            func = plugins[namespace].load()
            # The entry point function is required to return the class that
            # should be used for the RecordFactory for this universe.
            record_factory_type = func()
            if not issubclass(record_factory_type, RecordFactory):
                raise TypeError(
                    f"Entry point for universe {namespace} did not return RecordFactory. "
                    f"Returned {get_full_type_name(record_factory_type)}"
                )
            return record_factory_type
        raise ValueError(f"Unable to load record factory dynamically for universe namespace {namespace}")

372 

373 

class DafButlerRecordFactory(RecordFactory):
    """Class that implements conversion of dataset information to ObsCore
    using the daf_butler dimension universe namespace.

    Parameters
    ----------
    config : `ObsCoreConfig`
        Complete configuration specifying conversion options.
    schema : `ObsCoreSchema`
        Description of obscore schema.
    universe : `DimensionUniverse`
        Registry dimensions universe.
    spatial_plugins : `~collections.abc.Collection` of `SpatialObsCorePlugin`
        Spatial plugins.
    derived_region_factory : `DerivedRegionFactory`, optional
        Factory for handling derived regions that are not directly
        available from the data ID.
    """

    def __init__(
        self,
        config: ObsCoreConfig,
        schema: ObsCoreSchema,
        universe: DimensionUniverse,
        spatial_plugins: Collection[SpatialObsCorePlugin],
        derived_region_factory: DerivedRegionFactory | None = None,
    ):
        # The base class constructor already caches the band, exposure,
        # visit, and physical_filter dimension elements used below, so the
        # previously duplicated assignments are not repeated here.
        super().__init__(
            config=config,
            schema=schema,
            universe=universe,
            spatial_plugins=spatial_plugins,
            derived_region_factory=derived_region_factory,
        )

    def make_universe_records(self, ref: DatasetRef) -> Record:
        # Docstring inherited from base class.
        # Construct records using the daf_butler dimension universe.
        dataId = ref.dataId
        record: dict[str, str | int | float | UUID | None] = {}

        # Instrument may be absent from the data ID; fall back to the
        # configured default instrument in that case.
        instrument_name = cast(str | None, dataId.get("instrument", self.config.fallback_instrument))
        record["instrument_name"] = instrument_name
        if self.schema.dataset_fk is not None:
            record[self.schema.dataset_fk.name] = ref.id

        # Facility is looked up per instrument with a global default.
        record["facility_name"] = self.config.facility_map.get(
            instrument_name or "", self.config.facility_name
        )

        # Convert the data ID timespan endpoints to MJD in the UTC scale.
        timespan = dataId.timespan
        if timespan is not None:
            if timespan.begin is not None:
                t_min = cast(astropy.time.Time, timespan.begin)
                record["t_min"] = float(t_min.utc.mjd)
            if timespan.end is not None:
                t_max = cast(astropy.time.Time, timespan.end)
                record["t_max"] = float(t_max.utc.mjd)

        region = dataId.region
        if self.exposure.name in dataId:
            if (dimension_record := dataId.records[self.exposure.name]) is not None:
                self._exposure_records(dimension_record, record)
            # Exposures have no region of their own; ask the derived-region
            # factory for one when available.
            if self.derived_region_factory is not None:
                region = self.derived_region_factory.derived_region(dataId)
        elif self.visit.name in dataId and (dimension_record := dataId.records[self.visit.name]) is not None:
            self._visit_records(dimension_record, record)

        # Create spatial records.
        record.update(
            self.make_spatial_records(
                region, warn=True, msg=f"Failed to convert region for obscore dataset {ref.id}"
            )
        )

        if self.band.name in dataId:
            # Prefer the physical filter's spectral range; fall back to the
            # range configured for the abstract band.
            em_range = None
            if (label := dataId.get(self.physical_filter.name)) is not None:
                em_range = self.config.spectral_ranges.get(cast(str, label))
            if not em_range:
                band_name = dataId[self.band.name]
                assert isinstance(band_name, str), "Band name must be string"
                em_range = self.config.spectral_ranges.get(band_name)
            if em_range:
                record["em_min"], record["em_max"] = em_range
            else:
                _LOG.warning("could not find spectral range for dataId=%s", dataId)
            record["em_filter_name"] = dataId["band"]

        return record

    def _exposure_records(self, dimension_record: DimensionRecord, record: dict[str, Any]) -> None:
        """Extract all needed info from an exposure dimension record."""
        record["t_exptime"] = dimension_record.exposure_time
        record["target_name"] = dimension_record.target_name

    def _visit_records(self, dimension_record: DimensionRecord, record: dict[str, Any]) -> None:
        """Extract all needed info from a visit dimension record."""
        record["t_exptime"] = dimension_record.exposure_time
        record["target_name"] = dimension_record.target_name

    def region_dimension(self, dimensions: DimensionGroup) -> tuple[str, str] | tuple[None, None]:
        # Inherited doc string.
        region_dim = dimensions.region_dimension
        if not region_dim:
            # Exposures carry no region; use the matching visit-based
            # element instead.
            if "exposure" in dimensions:
                if "detector" in dimensions:
                    region_dim = "visit_detector_region"
                else:
                    region_dim = "visit"
            else:
                return None, None
        return region_dim, "region"

    def finalize_format_keywords(self, format_kws: dict[str, Any]) -> None:
        # Inherited doc string.
        # If we have an exposure but do not have a visit, copy the exposure
        # into the visit field.
        if "exposure" in format_kws and "visit" not in format_kws:
            format_kws["visit"] = format_kws["exposure"]