Coverage for python / felis / metadata.py: 15%

147 statements  

« prev     ^ index     » next       coverage.py v7.13.5, created at 2026-05-07 08:14 +0000

1"""Build SQLAlchemy metadata from a Felis schema.""" 

2 

3# This file is part of felis. 

4# 

5# Developed for the LSST Data Management System. 

6# This product includes software developed by the LSST Project 

7# (https://www.lsst.org). 

8# See the COPYRIGHT file at the top-level directory of this distribution 

9# for details of code ownership. 

10# 

11# This program is free software: you can redistribute it and/or modify 

12# it under the terms of the GNU General Public License as published by 

13# the Free Software Foundation, either version 3 of the License, or 

14# (at your option) any later version. 

15# 

16# This program is distributed in the hope that it will be useful, 

17# but WITHOUT ANY WARRANTY; without even the implied warranty of 

18# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the 

19# GNU General Public License for more details. 

20# 

21# You should have received a copy of the GNU General Public License 

22# along with this program. If not, see <https://www.gnu.org/licenses/>. 

23 

24from __future__ import annotations 

25 

26import logging 

27from typing import IO, Any, Literal 

28 

29from lsst.utils.iteration import ensure_iterable 

30from sqlalchemy import ( 

31 CheckConstraint, 

32 Column, 

33 Constraint, 

34 ForeignKeyConstraint, 

35 Index, 

36 MetaData, 

37 PrimaryKeyConstraint, 

38 Table, 

39 TextClause, 

40 UniqueConstraint, 

41 text, 

42) 

43from sqlalchemy.dialects import mysql, postgresql 

44from sqlalchemy.types import TypeEngine 

45 

46from . import datamodel 

47from .datamodel import Schema 

48from .db import _sqltypes as sqltypes 

49from .db._variants import make_variant_dict 

50from .db.database_context import is_sqlite_url 

51from .types import FelisType 

52 

# Public API of this module.
__all__ = ("MetaDataBuilder", "get_datatype_with_variants")

# Module-level logger following the package-wide convention.
logger = logging.getLogger(__name__)

56 

57 

def _handle_timestamp_column(column_obj: datamodel.Column, variant_dict: dict[str, TypeEngine[Any]]) -> None:
    """Install dialect-specific overrides for a timestamp column.

    Parameters
    ----------
    column_obj
        The timestamp column being processed.
    variant_dict
        Mapping of dialect name to datatype override, updated in place.

    Notes
    -----
    Dialect-specific timestamp types are only installed when the column
    defines an explicit precision; otherwise the default timestamp objects
    from the Felis type system remain in effect.
    """
    precision = column_obj.precision
    if precision is None:
        return
    # The first positional argument turns off timezone support in both
    # dialect-specific types.
    variant_dict["postgresql"] = postgresql.TIMESTAMP(False, precision)
    variant_dict["mysql"] = mysql.DATETIME(False, precision)

78 

79 

def get_datatype_with_variants(column_obj: datamodel.Column) -> TypeEngine:
    """Build a SQLAlchemy datatype, including any dialect variant
    overrides, from the information in a Felis column object.

    Parameters
    ----------
    column_obj
        The Felis column describing the datatype.

    Returns
    -------
    `~sqlalchemy.types.TypeEngine`
        The resulting SQLAlchemy datatype object.

    Raises
    ------
    ValueError
        Raised if the datatype name is not recognized, or if the column
        has a sized type but does not define a length.
    """
    variants = make_variant_dict(column_obj)
    type_name = column_obj.datatype.value
    felis_type = FelisType.felis_type(type_name)
    factory = getattr(sqltypes, type_name, None)
    if factory is None:
        raise ValueError(f"Unknown datatype: {type_name}")
    positional_args: list[Any] = []
    if felis_type.is_sized:
        # Sized types (e.g. char/varchar) require an explicit length.
        if not column_obj.length:
            raise ValueError(f"Column {column_obj.name} has sized type '{column_obj.datatype}' but no length")
        positional_args.append(column_obj.length)
    if felis_type.is_timestamp:
        _handle_timestamp_column(column_obj, variants)
    return factory(*positional_args, **variants)

114 

115 

# Server-default keywords that are passed through as SQL expressions (wrapped
# in text()) rather than quoted string literals when building columns.
_VALID_SERVER_DEFAULTS = ("CURRENT_TIMESTAMP", "NOW()", "LOCALTIMESTAMP", "NULL")

117 

118 

class MetaDataBuilder:
    """Build a SQLAlchemy metadata object from a Felis schema.

    Parameters
    ----------
    schema
        The Felis schema to translate into SQLAlchemy metadata.
    apply_schema_to_metadata
        Whether the schema name should be set on the metadata object.
    ignore_constraints
        Whether constraints should be skipped when building the metadata.
    table_name_postfix
        Suffix appended to every table name when building the metadata.
    skip_indexes
        Whether indexes should be skipped when building the metadata.
    """

    def __init__(
        self,
        schema: Schema,
        apply_schema_to_metadata: bool = True,
        ignore_constraints: bool = False,
        table_name_postfix: str = "",
        skip_indexes: bool = False,
    ) -> None:
        """Initialize the metadata builder."""
        if not apply_schema_to_metadata:
            logger.debug("Schema name will not be applied to metadata")
        self.schema = schema
        self.metadata = MetaData(schema=schema.name if apply_schema_to_metadata else None)
        self.ignore_constraints = ignore_constraints
        self.skip_indexes = skip_indexes
        self.table_name_postfix = table_name_postfix
        # Maps Felis object IDs to the SQLAlchemy objects built from them.
        self._objects: dict[str, Any] = {}

    def build(self) -> MetaData:
        """Build the SQLAlchemy tables and constraints from the schema.

        Returns
        -------
        `~sqlalchemy.sql.schema.MetaData`
            The SQLAlchemy metadata object.

        Notes
        -----
        Tables are built first; indexes and constraints follow in separate
        passes, because they may reference objects that only exist once all
        tables have been created.
        """
        self.build_tables()
        if self.skip_indexes:
            logger.warning("Ignoring indexes")
        else:
            self.build_indexes()
        if self.ignore_constraints:
            logger.warning("Ignoring constraints")
        else:
            self.build_constraints()
        return self.metadata

    def build_tables(self) -> None:
        """Build the SQLAlchemy tables from the schema."""
        for table_obj in self.schema.tables:
            self.build_table(table_obj)
            if not table_obj.primary_key:
                continue
            pk = self.build_primary_key(table_obj.primary_key)
            self._objects[table_obj.id].append_constraint(pk)

    def build_primary_key(self, primary_key_columns: str | list[str]) -> PrimaryKeyConstraint:
        """Build a SQLAlchemy ``PrimaryKeyConstraint`` from one column ID or
        a list of column IDs.

        Parameters
        ----------
        primary_key_columns
            A column ID, or list of column IDs, naming the primary key
            columns. Each ID is resolved through the builder's internal
            ID map to its column object.

        Returns
        -------
        `~sqlalchemy.sql.schema.PrimaryKeyConstraint`
            The SQLAlchemy primary key constraint object.
        """
        pk_columns = [self._objects[col_id] for col_id in ensure_iterable(primary_key_columns)]
        return PrimaryKeyConstraint(*pk_columns)

    def build_table(self, table_obj: datamodel.Table) -> None:
        """Build a SQLAlchemy ``Table`` from a Felis table and add it to the
        metadata.

        Parameters
        ----------
        table_obj
            The Felis table object from which to build the SQLAlchemy table.

        Notes
        -----
        MySQL table options such as the engine and charset are passed as
        table keyword arguments; Felis supports no table options for
        Postgres, so nothing dialect-specific is needed there.
        """
        # Collect MySQL-only table options.
        table_kwargs = {}
        if table_obj.mysql_engine:
            table_kwargs["mysql_engine"] = table_obj.mysql_engine
        if table_obj.mysql_charset:
            table_kwargs["mysql_charset"] = table_obj.mysql_charset

        # Build all columns first, then the table that owns them.
        built_columns = [self.build_column(col) for col in table_obj.columns]
        table = Table(
            table_obj.name + self.table_name_postfix,
            self.metadata,
            *built_columns,
            comment=table_obj.description,
            **table_kwargs,  # type: ignore[arg-type]
        )
        self._objects[table_obj.id] = table

    def build_column(self, column_obj: datamodel.Column) -> Column:
        """Build a SQLAlchemy ``Column`` from a Felis column object.

        Parameters
        ----------
        column_obj
            The column object from which to build the SQLAlchemy column.

        Returns
        -------
        `~sqlalchemy.sql.schema.Column`
            The SQLAlchemy column object.
        """
        column_id = column_obj.id
        value = column_obj.value

        # Resolve the datatype, honoring variant overrides such as
        # "mysql:datatype".
        datatype = get_datatype_with_variants(column_obj)

        # Use the explicit autoincrement setting when given, else "auto".
        autoincrement: Literal["auto"] | bool = "auto"
        if column_obj.autoincrement is not None:
            autoincrement = column_obj.autoincrement

        server_default: str | TextClause | None = None
        if value is not None:
            server_default = str(value)
            if server_default in _VALID_SERVER_DEFAULTS or not isinstance(value, str):
                # Recognized SQL keywords and non-string values are emitted
                # as raw SQL expressions rather than quoted literals.
                server_default = text(server_default)

        if server_default is not None:
            logger.debug(f"Column '{column_id}' has default value: {server_default}")

        column: Column = Column(
            column_obj.name,
            datatype,
            comment=column_obj.description,
            autoincrement=autoincrement,
            nullable=column_obj.nullable,
            server_default=server_default,
        )
        self._objects[column_id] = column
        return column

    def build_constraints(self) -> None:
        """Build the SQLAlchemy constraints from the Felis schema and append
        them to their tables in the metadata.

        Notes
        -----
        Runs as a separate pass after ``build_tables`` so that every object
        a constraint references already exists in the ID map.
        """
        for table_obj in self.schema.tables:
            md_table = self._objects[table_obj.id]
            for constraint_obj in table_obj.constraints:
                md_table.append_constraint(self.build_constraint(constraint_obj))

    def build_constraint(self, constraint_obj: datamodel.Constraint) -> Constraint:
        """Build a SQLAlchemy ``Constraint`` from a Felis constraint.

        Parameters
        ----------
        constraint_obj
            The Felis object from which to build the constraint.

        Returns
        -------
        `~sqlalchemy.sql.schema.Constraint`
            The SQLAlchemy constraint object.

        Raises
        ------
        ValueError
            Raised if the constraint type is not recognized.
        """
        kwargs: dict[str, Any] = {
            "name": constraint_obj.name or None,
            "comment": constraint_obj.description or None,
            "deferrable": constraint_obj.deferrable or None,
            "initially": constraint_obj.initially or None,
        }

        constraint: Constraint
        if isinstance(constraint_obj, datamodel.ForeignKeyConstraint):
            # Referential actions apply only to foreign keys.
            if constraint_obj.on_delete is not None:
                kwargs["ondelete"] = constraint_obj.on_delete
            if constraint_obj.on_update is not None:
                kwargs["onupdate"] = constraint_obj.on_update
            local_cols = [self._objects[col_id] for col_id in constraint_obj.columns]
            remote_cols = [self._objects[col_id] for col_id in constraint_obj.referenced_columns]
            constraint = ForeignKeyConstraint(local_cols, remote_cols, **kwargs)
        elif isinstance(constraint_obj, datamodel.CheckConstraint):
            constraint = CheckConstraint(constraint_obj.expression, **kwargs)
        elif isinstance(constraint_obj, datamodel.UniqueConstraint):
            unique_cols = [self._objects[col_id] for col_id in constraint_obj.columns]
            constraint = UniqueConstraint(*unique_cols, **kwargs)
        else:
            raise ValueError(f"Unknown constraint type: {type(constraint_obj)}")

        self._objects[constraint_obj.id] = constraint
        return constraint

    def build_index(self, index_obj: datamodel.Index) -> Index:
        """Build a SQLAlchemy ``Index`` from a Felis `~felis.datamodel.Index`.

        Parameters
        ----------
        index_obj
            The Felis object from which to build the SQLAlchemy index.

        Returns
        -------
        `~sqlalchemy.sql.schema.Index`
            The SQLAlchemy index object.
        """
        index_columns = [self._objects[col_id] for col_id in (index_obj.columns or [])]
        index_expressions = index_obj.expressions or []
        index = Index(index_obj.name, *index_columns, *index_expressions)
        self._objects[index_obj.id] = index
        return index

    def build_indexes(self) -> None:
        """Build the SQLAlchemy indexes from the Felis schema and attach
        them to their tables in the metadata.

        Raises
        ------
        KeyError
            Raised if a schema table has no entry in the ID map.
        TypeError
            Raised if the mapped object is not a SQLAlchemy ``Table``.
        """
        for table_obj in self.schema.tables:
            md_table = self._objects.get(table_obj.id, None)
            if md_table is None:
                raise KeyError(f"Table with ID '{table_obj.id}' not found in objects map")
            if not isinstance(md_table, Table):
                raise TypeError(f"Expected Table object, got {type(md_table)}")
            for index_obj in table_obj.indexes:
                index = self.build_index(index_obj)
                # NOTE(review): _set_parent is a private SQLAlchemy API used
                # here to associate the index with its table explicitly.
                index._set_parent(md_table)
                md_table.indexes.add(index)

403 

404 

def create_metadata(
    felis_file: IO[str],
    schema_name: str | None = None,
    id_generation: bool = True,
    ignore_constraints: bool = False,
    skip_indexes: bool = False,
    engine_url: str | None = None,
) -> MetaData:
    """Create SQLAlchemy metadata from a Felis schema file.

    Parameters
    ----------
    felis_file
        The Felis schema file to read.
    schema_name
        Optional schema name overriding the one in the file.
    id_generation
        Whether to generate IDs for schema objects that lack them.
    ignore_constraints
        Whether to ignore constraints when building metadata.
    skip_indexes
        Whether to skip creating indexes when building metadata.
    engine_url
        Engine URL used to decide whether SQLite-specific handling applies.

    Returns
    -------
    MetaData
        The SQLAlchemy metadata object with proper schema handling.
    """
    schema = Schema.from_stream(felis_file, context={"id_generation": id_generation})
    if schema_name:
        logger.info(f"Overriding schema name with: {schema_name}")
        schema.name = schema_name

    # SQLite has no native schema support, so suppress the schema name there.
    apply_schema = not (engine_url and is_sqlite_url(engine_url))
    if not apply_schema:
        logger.debug("SQLite detected: schema name will not be applied to metadata")

    builder = MetaDataBuilder(
        schema,
        ignore_constraints=ignore_constraints,
        skip_indexes=skip_indexes,
        apply_schema_to_metadata=apply_schema,
    )
    return builder.build()