Coverage for python / lsst / daf / butler / registry / obscore / _schema.py: 24%

59 statements  

« prev     ^ index     » next       coverage.py v7.13.5, created at 2026-04-30 08:41 +0000

1# This file is part of daf_butler. 

2# 

3# Developed for the LSST Data Management System. 

4# This product includes software developed by the LSST Project 

5# (http://www.lsst.org). 

6# See the COPYRIGHT file at the top-level directory of this distribution 

7# for details of code ownership. 

8# 

9# This software is dual licensed under the GNU General Public License and also 

10# under a 3-clause BSD license. Recipients may choose which of these licenses 

11# to use; please see the files gpl-3.0.txt and/or bsd_license.txt, 

12# respectively. If you choose the GPL option then the following text applies 

13# (but note that there is still no warranty even if you opt for BSD instead): 

14# 

15# This program is free software: you can redistribute it and/or modify 

16# it under the terms of the GNU General Public License as published by 

17# the Free Software Foundation, either version 3 of the License, or 

18# (at your option) any later version. 

19# 

20# This program is distributed in the hope that it will be useful, 

21# but WITHOUT ANY WARRANTY; without even the implied warranty of 

22# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the 

23# GNU General Public License for more details. 

24# 

25# You should have received a copy of the GNU General Public License 

26# along with this program. If not, see <http://www.gnu.org/licenses/>. 

27 

28from __future__ import annotations 

29 

30__all__ = ["ObsCoreSchema"] 

31 

32import re 

33from collections.abc import Sequence 

34from typing import TYPE_CHECKING 

35 

36import sqlalchemy 

37 

38from lsst.daf.butler import ddl 

39from lsst.utils.iteration import ensure_iterable 

40 

41from ._config import DatasetTypeConfig, ExtraColumnConfig, ObsCoreConfig 

42from ._spatial import SpatialObsCorePlugin 

43 

44if TYPE_CHECKING: 

45 from ..interfaces import DatasetRecordStorageManager 

46 

47 

# Regular expression to match templates in extra_columns that consist of
# exactly one simple dimension placeholder, e.g. "{exposure}". The single
# capture group holds the dimension name (the text between the braces).
_DIMENSION_TEMPLATE_RE = re.compile(r"^[{](\w+)[}]$")

51 

# List of standard columns in output file. This should include at least all
# mandatory columns defined in ObsCore note (revision 1.1, Appendix B). Extra
# columns can be added via `extra_columns` parameters in configuration.
# `ObsCoreSchema` copies this tuple into the list of fields for its table
# specification, so the order here is the order of the leading fields.
_STATIC_COLUMNS = (
    ddl.FieldSpec(
        name="dataproduct_type", dtype=sqlalchemy.String, length=255, doc="Logical data product type"
    ),
    ddl.FieldSpec(
        name="dataproduct_subtype", dtype=sqlalchemy.String, length=255, doc="Data product specific type"
    ),
    ddl.FieldSpec(
        name="facility_name",
        dtype=sqlalchemy.String,
        length=255,
        doc="The name of the facility used for the observation",
    ),
    ddl.FieldSpec(name="calib_level", dtype=sqlalchemy.SmallInteger, doc="Calibration level {0, 1, 2, 3, 4}"),
    ddl.FieldSpec(name="target_name", dtype=sqlalchemy.String, length=255, doc="Object of interest"),
    ddl.FieldSpec(name="obs_id", dtype=sqlalchemy.String, length=255, doc="Observation ID"),
    ddl.FieldSpec(
        name="obs_collection", dtype=sqlalchemy.String, length=255, doc="Name of the data collection"
    ),
    ddl.FieldSpec(
        name="obs_publisher_did",
        dtype=sqlalchemy.String,
        length=255,
        doc="Dataset identifier given by the publisher",
    ),
    ddl.FieldSpec(
        name="access_url", dtype=sqlalchemy.String, length=65535, doc="URL used to access (download) dataset"
    ),
    ddl.FieldSpec(name="access_format", dtype=sqlalchemy.String, length=255, doc="File content format"),
    # Spatial columns s_ra, s_dec, s_fov, s_region are managed by a default
    # spatial plugin, which is why they are not listed here.
    ddl.FieldSpec(
        name="s_resolution", dtype=sqlalchemy.Float, doc="Spatial resolution of data as FWHM (arcsec)"
    ),
    ddl.FieldSpec(
        name="s_xel1", dtype=sqlalchemy.Integer, doc="Number of elements along the first spatial axis"
    ),
    ddl.FieldSpec(
        name="s_xel2", dtype=sqlalchemy.Integer, doc="Number of elements along the second spatial axis"
    ),
    ddl.FieldSpec(name="t_xel", dtype=sqlalchemy.Integer, doc="Number of elements along the time axis"),
    ddl.FieldSpec(name="t_min", dtype=sqlalchemy.Float, doc="Start time in MJD"),
    ddl.FieldSpec(name="t_max", dtype=sqlalchemy.Float, doc="Stop time in MJD"),
    ddl.FieldSpec(name="t_exptime", dtype=sqlalchemy.Float, doc="Total exposure time (sec)"),
    ddl.FieldSpec(name="t_resolution", dtype=sqlalchemy.Float, doc="Temporal resolution (sec)"),
    ddl.FieldSpec(name="em_xel", dtype=sqlalchemy.Integer, doc="Number of elements along the spectral axis"),
    ddl.FieldSpec(name="em_min", dtype=sqlalchemy.Float, doc="Start in spectral coordinates (m)"),
    ddl.FieldSpec(name="em_max", dtype=sqlalchemy.Float, doc="Stop in spectral coordinates (m)"),
    ddl.FieldSpec(name="em_res_power", dtype=sqlalchemy.Float, doc="Spectral resolving power"),
    ddl.FieldSpec(
        name="em_filter_name", dtype=sqlalchemy.String, length=255, doc="Filter name (non-standard column)"
    ),
    ddl.FieldSpec(name="o_ucd", dtype=sqlalchemy.String, length=255, doc="UCD of observable"),
    ddl.FieldSpec(name="pol_xel", dtype=sqlalchemy.Integer, doc="Number of polarization samples"),
    # "instrument_name" is also the column that `ObsCoreSchema` associates
    # with the "instrument" dimension by default.
    ddl.FieldSpec(
        name="instrument_name",
        dtype=sqlalchemy.String,
        length=255,
        doc="Name of the instrument used for this observation",
    ),
)

116 

# Mapping from the Python type of a literal value given in `extra_columns`
# to the SQLAlchemy type used for the corresponding column. It is consulted
# with the exact type of the value (``type(value)``), so `bool` values map to
# Boolean and not to BigInteger even though `bool` is a subclass of `int`.
# Types that are missing from this mapping are rejected by `ObsCoreSchema`.
_TYPE_MAP = {
    int: sqlalchemy.BigInteger,
    float: sqlalchemy.Float,
    bool: sqlalchemy.Boolean,
    str: sqlalchemy.String,
}

123 

124 

class ObsCoreSchema:
    """Build the table specification of an ObsCore table from configuration.

    Parameters
    ----------
    config : `ObsCoreConfig`
        ObsCore configuration instance.
    spatial_plugins : `~collections.abc.Sequence` of `SpatialObsCorePlugin`
        Spatial plugins; each of them is given a chance to extend the table
        specification.
    datasets : `type`, optional
        Type of dataset records manager. If specified, the ObsCore table will
        define a foreign key to ``datasets`` table with "ON DELETE CASCADE"
        constraint.

    Raises
    ------
    TypeError
        Raised if no column type can be determined for an entry in
        ``extra_columns``.

    Notes
    -----
    Two kinds of obscore table are supported by this class: a "live" table
    located in the same database as the Registry, and a standalone table in a
    completely separate database. A live table relies on a foreign key
    constraint with "ON DELETE CASCADE" option to remove obscore records when
    their original datasets are removed.
    """

    def __init__(
        self,
        config: ObsCoreConfig,
        spatial_plugins: Sequence[SpatialObsCorePlugin],
        datasets: type[DatasetRecordStorageManager] | None = None,
    ):
        # Instrument is always stored in one of the static columns.
        self._dimension_columns: dict[str, str] = {"instrument": "instrument_name"}

        field_specs = [*_STATIC_COLUMNS]
        known_names = {spec.name for spec in field_specs}

        # Top-level configuration comes first, followed by the configuration
        # of each dataset type; the first definition of a column wins.
        config_chain: list[ObsCoreConfig | DatasetTypeConfig] = [config]
        if config.dataset_types:
            config_chain.extend(config.dataset_types.values())

        for one_config in config_chain:
            if not one_config.extra_columns:
                continue
            for col_name, col_value in one_config.extra_columns.items():
                if col_name in known_names:
                    # Already defined, either statically or by an earlier
                    # configuration.
                    continue
                description: str | None = None
                if not isinstance(col_value, ExtraColumnConfig):
                    # A bare value: derive column type from its Python type,
                    # strings get a fixed length of 255.
                    sql_type = _TYPE_MAP.get(type(col_value))
                    max_length = 255 if isinstance(col_value, str) else None
                else:
                    sql_type = ddl.VALID_CONFIG_COLUMN_TYPES.get(col_value.type.name)
                    max_length = col_value.length
                    description = col_value.doc
                    # Keep track of columns whose template is just a single
                    # dimension.
                    found = _DIMENSION_TEMPLATE_RE.match(col_value.template)
                    if found is not None:
                        self._dimension_columns[found.group(1)] = col_name
                if sql_type is None:
                    raise TypeError(
                        f"Unexpected type in extra_columns: column={col_name}, value={col_value}"
                    )
                field_specs.append(
                    ddl.FieldSpec(name=col_name, dtype=sql_type, length=max_length, doc=description)
                )
                known_names.add(col_name)

        # Each configured index is either a single column or a collection of
        # columns.
        index_specs: list[ddl.IndexSpec] = (
            [ddl.IndexSpec(*ensure_iterable(index_columns)) for index_columns in config.indices.values()]
            if config.indices
            else []
        )

        self._table_spec = ddl.TableSpec(fields=field_specs, indexes=index_specs)

        # Let plugins add whatever they need to the table specification.
        for spatial_plugin in spatial_plugins:
            spatial_plugin.extend_table_spec(self._table_spec)

        self._dataset_fk: ddl.FieldSpec | None = None
        if datasets is not None:
            # Foreign key to datasets doubles as a primary key of this table.
            foreign_key = datasets.addDatasetForeignKey(
                self._table_spec, name="registry_dataset", onDelete="CASCADE", doc="Registry dataset ID"
            )
            self._dataset_fk = foreign_key
            foreign_key.primaryKey = True

    @property
    def table_spec(self) -> ddl.TableSpec:
        """Specification for obscore table (`ddl.TableSpec`)."""
        return self._table_spec

    @property
    def dataset_fk(self) -> ddl.FieldSpec | None:
        """Specification for the field which is both a foreign key to
        ``datasets`` table and a primary key for obscore table
        (`ddl.FieldSpec` or `None`).
        """
        return self._dataset_fk

    def dimension_column(self, dimension: str) -> str | None:
        """Look up the obscore column that stores a given dimension.

        Parameters
        ----------
        dimension : `str`
            Dimension name, e.g. "exposure".

        Returns
        -------
        column_name : `str` or `None`
            Name of the column in obscore table or `None` if there is no
            configured column for this dimension.
        """
        return self._dimension_columns.get(dimension)