Coverage for python / lsst / dax / apdb / apdbSchema.py: 39%

79 statements  

« prev     ^ index     » next       coverage.py v7.13.5, created at 2026-04-14 23:46 +0000

1# This file is part of dax_apdb. 

2# 

3# Developed for the LSST Data Management System. 

4# This product includes software developed by the LSST Project 

5# (http://www.lsst.org). 

6# See the COPYRIGHT file at the top-level directory of this distribution 

7# for details of code ownership. 

8# 

9# This program is free software: you can redistribute it and/or modify 

10# it under the terms of the GNU General Public License as published by 

11# the Free Software Foundation, either version 3 of the License, or 

12# (at your option) any later version. 

13# 

14# This program is distributed in the hope that it will be useful, 

15# but WITHOUT ANY WARRANTY; without even the implied warranty of 

16# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the 

17# GNU General Public License for more details. 

18# 

19# You should have received a copy of the GNU General Public License 

20# along with this program. If not, see <http://www.gnu.org/licenses/>. 

21 

22"""Module containing methods and classes for generic APDB schema operations. 

23 

24The code in this module is independent of the specific technology used to 

25implement APDB. 

26""" 

27 

28from __future__ import annotations 

29 

30__all__ = ["ApdbSchema", "ApdbTables"] 

31 

32import enum 

33import logging 

34from collections.abc import Mapping, MutableMapping 

35from functools import cached_property 

36 

37import felis.datamodel 

38import numpy 

39 

40from .schema_model import ExtraDataTypes, Schema, Table 

41from .versionTuple import VersionTuple 

42 

43_LOG = logging.getLogger(__name__) 

44 

# Usually the Cassandra driver decides column types, but sometimes we have to
# build a Pandas DataFrame ourselves; this table is then used to derive the
# column types from the YAML schema. Cassandra stores timestamps with
# millisecond precision, whereas pandas maps the datetime type to
# "datetime64[ns]".
_dtype_map: Mapping[felis.datamodel.DataType | ExtraDataTypes, type | str] = {
    felis.datamodel.DataType.double: numpy.float64,
    felis.datamodel.DataType.float: numpy.float32,
    felis.datamodel.DataType.timestamp: "datetime64[ns]",
    felis.datamodel.DataType.long: numpy.int64,
    felis.datamodel.DataType.int: numpy.int32,
    felis.datamodel.DataType.short: numpy.int16,
    felis.datamodel.DataType.byte: numpy.int8,
    # Binary and all textual types share the generic ``object`` dtype.
    **dict.fromkeys(
        (
            felis.datamodel.DataType.binary,
            felis.datamodel.DataType.char,
            felis.datamodel.DataType.text,
            felis.datamodel.DataType.string,
            felis.datamodel.DataType.unicode,
        ),
        object,
    ),
    felis.datamodel.DataType.boolean: bool,
}

65 

66 

@enum.unique
class ApdbTables(enum.Enum):
    """Enumeration of the table names used in the APDB schema.

    The value of each member is the name of the corresponding table.
    """

    DiaObject = "DiaObject"
    """Table holding DIAObject records."""

    DiaSource = "DiaSource"
    """Table holding DIASource records."""

    DiaForcedSource = "DiaForcedSource"
    """Table holding DIAForcedSource records."""

    DiaObjectLast = "DiaObjectLast"
    """Table holding the last version of DIAObject records.

    Some implementations may treat this table as optional.
    """

    SSObject = "SSObject"
    """Table holding SSObject records."""

    SSSource = "SSSource"
    """Table holding SSSource records."""

    DiaObject_To_Object_Match = "DiaObject_To_Object_Match"
    """Table holding DiaObject_To_Object_Match records."""

    metadata = "metadata"
    """Metadata table, this table may not always exist."""

    def table_name(self, prefix: str = "", time_partition: int | None = None) -> str:
        """Build the full name of this table.

        Parameters
        ----------
        prefix : `str`, optional
            Text placed in front of the table name, empty by default.
        time_partition : `int`, optional
            Time partition appended to the name after an underscore. Should
            only be given for tables that support time partitioning.

        Returns
        -------
        name : `str`
            Prefix followed by the enum value and, if ``time_partition`` is
            not `None`, an underscore and the partition number.
        """
        base_name = f"{prefix}{self.value}"
        if time_partition is None:
            return base_name
        return f"{base_name}_{time_partition}"

113 

114 

class ApdbSchema:
    """Class for management of APDB schema.

    Attributes
    ----------
    tableSchemas : `~collections.abc.Mapping` [`ApdbTables`, `Table`]
        Maps table enum to the `.schema_model.Table` instance describing it.

    Parameters
    ----------
    schema_file : `str`
        Location of the YAML file with APDB schema.
    ss_schema_file : `str`
        Location of the YAML file with SSO schema. File will be loaded if APDB
        schema file does not contain SSObject/SSSource tables. Can be set to
        empty string to skip loading of SSObject/SSSource schema.

    Raises
    ------
    LookupError
        Raised if ``ss_schema_file`` had to be loaded but does not define
        both SSObject and SSSource tables.
    """

    def __init__(
        self,
        schema_file: str,
        ss_schema_file: str,
    ) -> None:
        # Build complete table schema; the schema version always comes from
        # the main APDB schema file.
        self.tableSchemas, self._schemaVersion = self._buildSchemas(schema_file)
        if ss_schema_file:
            if ApdbTables.SSObject not in self.tableSchemas or ApdbTables.SSSource not in self.tableSchemas:
                # Read additional SSP schema, its version is ignored.
                ssp_tables, _ = self._buildSchemas(ss_schema_file)
                if ApdbTables.SSObject not in ssp_tables or ApdbTables.SSSource not in ssp_tables:
                    raise LookupError(f"Cannot locate SSObject/SSSource table in {ss_schema_file}")
                # Both SS tables are taken from the SSP schema, replacing any
                # one of them that the main schema may have defined.
                self.tableSchemas = dict(self.tableSchemas) | {
                    ApdbTables.SSObject: ssp_tables[ApdbTables.SSObject],
                    ApdbTables.SSSource: ssp_tables[ApdbTables.SSSource],
                }

    def column_dtype(self, felis_type: felis.datamodel.DataType | ExtraDataTypes) -> type | str:
        """Return Pandas data type for a given Felis column type.

        Parameters
        ----------
        felis_type : `felis.datamodel.DataType`
            Felis type, one of the enums defined in `felis.datamodel` module.

        Returns
        -------
        column_dtype : `type` or `str`
            Type that can be used for columns in Pandas.

        Raises
        ------
        TypeError
            Raised if type cannot be handled.
        """
        try:
            return _dtype_map[felis_type]
        except KeyError:
            # The failed dictionary lookup is an implementation detail; drop
            # it from the traceback so that only the TypeError is reported.
            raise TypeError(f"Unexpected Felis type: {felis_type}") from None

    def schemaVersion(self) -> VersionTuple:
        """Return schema version as defined in YAML schema file.

        Returns
        -------
        version : `VersionTuple`
            Version number read from YAML file, if YAML file does not define
            schema version then "0.1.0" is returned.
        """
        if self._schemaVersion is None:
            return VersionTuple(0, 1, 0)
        return self._schemaVersion

    @classmethod
    def _buildSchemas(cls, schema_file: str) -> tuple[Mapping[ApdbTables, Table], VersionTuple | None]:
        """Create schema definitions for tables from felis schema.

        Reads YAML schema and builds a dictionary containing
        `.schema_model.Table` instances for each APDB table appearing in that
        schema.

        Parameters
        ----------
        schema_file : `str`
            Name of YAML file with ``felis`` schema.

        Returns
        -------
        tables : `dict` [`ApdbTables`, `schema_model.Table`]
            Mapping of table names to `.schema_model.Table` instances.
        version : `VersionTuple` or `None`
            Schema version defined in schema file, `None` if version is not
            defined.
        """
        _LOG.debug("Loading felis schema from %s", schema_file)
        felis_schema = felis.datamodel.Schema.from_uri(schema_file, context={"id_generation": True})
        schema = Schema.from_felis(felis_schema)

        # Keep only the tables whose names match one of the ApdbTables enums.
        tables: MutableMapping[ApdbTables, Table] = {}
        for table in schema.tables:
            try:
                table_enum = ApdbTables(table.name)
            except ValueError:
                # There may be other tables in the schema that do not belong
                # to APDB.
                continue
            tables[table_enum] = table

        version: VersionTuple | None = None
        if schema.version is not None:
            version = VersionTuple.fromString(schema.version.current)

        _LOG.debug("Loaded schema for tables %s", list(tables))
        return tables, version

    @cached_property
    def has_mjd_timestamps(self) -> bool:
        """True if timestamps columns are in MJD (`bool`).

        The answer is determined by the first DiaObject column whose name is
        either ``validityStartMjdTai`` (MJD) or ``validityStart`` (not MJD).

        Raises
        ------
        LookupError
            Raised if DiaObject table has neither of those columns.
        """
        table = self.tableSchemas[ApdbTables.DiaObject]
        # Look for validityStartMjdTai or validityStart
        for column in table.columns:
            if column.name == "validityStartMjdTai":
                return True
            if column.name == "validityStart":
                return False
        raise LookupError(
            "Could not find validityStart or validityStartMjdTai column in DiaObject table schema."
        )