Coverage for python / lsst / dax / apdb / apdbSchema.py: 35%

80 statements  

« prev     ^ index     » next       coverage.py v7.13.5, created at 2026-04-21 10:35 +0000

1# This file is part of dax_apdb. 

2# 

3# Developed for the LSST Data Management System. 

4# This product includes software developed by the LSST Project 

5# (http://www.lsst.org). 

6# See the COPYRIGHT file at the top-level directory of this distribution 

7# for details of code ownership. 

8# 

9# This program is free software: you can redistribute it and/or modify 

10# it under the terms of the GNU General Public License as published by 

11# the Free Software Foundation, either version 3 of the License, or 

12# (at your option) any later version. 

13# 

14# This program is distributed in the hope that it will be useful, 

15# but WITHOUT ANY WARRANTY; without even the implied warranty of 

16# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the 

17# GNU General Public License for more details. 

18# 

19# You should have received a copy of the GNU General Public License 

20# along with this program. If not, see <http://www.gnu.org/licenses/>. 

21 

22"""Module containing methods and classes for generic APDB schema operations. 

23 

24The code in this module is independent of the specific technology used to 

25implement APDB. 

26""" 

27 

28from __future__ import annotations 

29 

30__all__ = ["ApdbSchema", "ApdbTables"] 

31 

32import enum 

33import logging 

34from collections.abc import Mapping, MutableMapping 

35from functools import cached_property 

36 

37import felis.datamodel 

38 

39from .schema_model import Schema, Table 

40from .versionTuple import VersionTuple 

41 

42_LOG = logging.getLogger(__name__) 

43 

44 

@enum.unique
class ApdbTables(enum.Enum):
    """Enumeration of the table names used by the APDB schema.

    The value of each member is the table name as a string.
    """

    DiaObject = "DiaObject"
    """Table holding DIAObject records."""

    DiaSource = "DiaSource"
    """Table holding DIASource records."""

    DiaForcedSource = "DiaForcedSource"
    """Table holding DIAForcedSource records."""

    DiaObjectLast = "DiaObjectLast"
    """Table holding the last version of DIAObject records.

    This table may be optional for some implementations.
    """

    SSObject = "SSObject"
    """Table holding SSObject records."""

    SSSource = "SSSource"
    """Table holding SSSource records."""

    DiaObject_To_Object_Match = "DiaObject_To_Object_Match"
    """Table holding DiaObject_To_Object_Match records."""

    metadata = "metadata"
    """Metadata table, this table may not always exist."""

    def table_name(self, prefix: str = "", time_partition: int | None = None) -> str:
        """Build the full name of the table.

        Parameters
        ----------
        prefix : `str`, optional
            Optional string placed in front of the table name.
        time_partition : `int`, optional
            Optional time partition, should only be used for tables that
            support time partitioning.

        Returns
        -------
        name : `str`
            ``prefix`` followed by the enum value; when ``time_partition``
            is not `None`, an underscore and the partition number are
            appended.
        """
        base = f"{prefix}{self.value}"
        if time_partition is None:
            return base
        return f"{base}_{time_partition}"

91 

92 

class ApdbSchema:
    """Class for management of APDB schema.

    Attributes
    ----------
    tableSchemas : `~collections.abc.Mapping`
        Maps `ApdbTables` members to the `.schema_model.Table` instances
        loaded from the schema file(s).

    Parameters
    ----------
    schema_file : `str`
        Location of the YAML file with APDB schema.
    ss_schema_file : `str`
        Location of the YAML file with SSO schema. File will be loaded if APDB
        schema file does not contain SSObject/SSSource tables. Can be set to
        empty string to skip loading of SSObject/SSSource schema.

    Raises
    ------
    LookupError
        Raised if the SSO schema file had to be loaded but does not define
        both SSObject and SSSource tables.
    """

    def __init__(
        self,
        schema_file: str,
        ss_schema_file: str,
    ):
        # Main schema: table definitions plus (optional) schema version.
        self.tableSchemas, self._schemaVersion = self._buildSchemas(schema_file)

        if not ss_schema_file:
            # Caller asked to skip SSObject/SSSource schema altogether.
            return

        sso_tables = (ApdbTables.SSObject, ApdbTables.SSSource)
        if all(sso_table in self.tableSchemas for sso_table in sso_tables):
            # Main schema already provides both tables, nothing to add.
            return

        # Read additional SSP schema; its version is ignored.
        ssp_tables, _ = self._buildSchemas(ss_schema_file)
        if not all(sso_table in ssp_tables for sso_table in sso_tables):
            raise LookupError(f"Cannot locate SSObject/SSSource table in {ss_schema_file}")

        # Both SSO tables are taken from the SSP schema, replacing any
        # definition that the main schema may have had for one of them.
        self.tableSchemas = dict(self.tableSchemas) | {
            sso_table: ssp_tables[sso_table] for sso_table in sso_tables
        }

    def schemaVersion(self) -> VersionTuple:
        """Return schema version as defined in YAML schema file.

        Returns
        -------
        version : `VersionTuple`
            Version number read from YAML file, if YAML file does not define
            schema version then "0.1.0" is returned.
        """
        if self._schemaVersion is not None:
            return self._schemaVersion
        return VersionTuple(0, 1, 0)

    @classmethod
    def _buildSchemas(cls, schema_file: str) -> tuple[Mapping[ApdbTables, Table], VersionTuple | None]:
        """Create schema definitions for tables from felis schema.

        Reads YAML schema and builds a dictionary containing
        `.schema_model.Table` instances for each APDB table appearing in that
        schema.

        Parameters
        ----------
        schema_file : `str`
            Name of YAML file with ``felis`` schema.

        Returns
        -------
        tables : `dict` [`ApdbTables`, `schema_model.Table`]
            Mapping of table names to `.schema_model.Table` instances.
        version : `VersionTuple` or `None`
            Schema version defined in schema file, `None` if version is not
            defined.
        """
        _LOG.debug("Loading felis schema from %s", schema_file)
        felis_schema = felis.datamodel.Schema.from_uri(schema_file, context={"id_generation": True})
        schema = Schema.from_felis(felis_schema)

        # Keep only the tables whose names are known to ApdbTables.
        tables: MutableMapping[ApdbTables, Table] = {}
        for table in schema.tables:
            try:
                key = ApdbTables(table.name)
            except ValueError:
                # There may be other tables in the schema that do not belong
                # to APDB.
                continue
            tables[key] = table

        version: VersionTuple | None
        if schema.version is None:
            version = None
        else:
            version = VersionTuple.fromString(schema.version.current)

        _LOG.debug("Loaded schema for tables %s", list(tables))
        return tables, version

    @cached_property
    def has_mjd_timestamps(self) -> bool:
        """True if timestamps columns are in MJD (`bool`).

        Decided by whichever of the ``validityStartMjdTai`` or
        ``validityStart`` columns appears first in the DiaObject table;
        `LookupError` is raised if neither is present.
        """
        dia_object = self.tableSchemas[ApdbTables.DiaObject]
        for col in dia_object.columns:
            col_name = col.name
            if col_name == "validityStartMjdTai":
                return True
            if col_name == "validityStart":
                return False
        raise LookupError(
            "Could not find validityStart or validityStartMjdTai column in DiaObject table schema."
        )

    def timestamp_column_name(self, column: str) -> str:
        """Return column name before/after schema migration to MJD TAI.

        Parameters
        ----------
        column : `str`
            Column name before MJD TAI schema migration.

        Returns
        -------
        column : `str`
            Column name in current schema.
        """
        if not self.has_mjd_timestamps:
            # Schema has not been migrated, names are unchanged.
            return column
        # These two columns also switch from snake_case to camelCase.
        if column == "time_processed":
            return "timeProcessedMjdTai"
        if column == "time_withdrawn":
            return "timeWithdrawnMjdTai"
        # Every other timestamp column just gets the MjdTai suffix.
        return f"{column}MjdTai"