Coverage for python / lsst / dax / apdb / schema_model.py: 66%

179 statements  

« prev     ^ index     » next       coverage.py v7.13.5, created at 2026-04-26 08:58 +0000

1# This file is part of dax_apdb. 

2# 

3# Developed for the LSST Data Management System. 

4# This product includes software developed by the LSST Project 

5# (https://www.lsst.org). 

6# See the COPYRIGHT file at the top-level directory of this distribution 

7# for details of code ownership. 

8# 

9# This program is free software: you can redistribute it and/or modify 

10# it under the terms of the GNU General Public License as published by 

11# the Free Software Foundation, either version 3 of the License, or 

12# (at your option) any later version. 

13# 

14# This program is distributed in the hope that it will be useful, 

15# but WITHOUT ANY WARRANTY; without even the implied warranty of 

16# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the 

17# GNU General Public License for more details. 

18# 

19# You should have received a copy of the GNU General Public License 

20# along with this program. If not, see <https://www.gnu.org/licenses/>. 

21 

22from __future__ import annotations 

23 

# Names exported by ``from <this module> import *``: the schema-model classes
# defined below, listed alphabetically.
__all__ = [
    "CheckConstraint",
    "Column",
    "Constraint",
    "ExtraDataTypes",
    "ForeignKeyConstraint",
    "Index",
    "Schema",
    "Table",
    "UniqueConstraint",
]

35 

36import dataclasses 

37from collections.abc import Iterable, Mapping, MutableMapping 

38from enum import Enum 

39from typing import Any 

40 

41import felis.datamodel 

42 

# Shorthand for a read-only mapping with string keys; used in the signature
# of `_strip_keys`.
_Mapping = Mapping[str, Any]

44 

45 

class ExtraDataTypes(Enum):
    """Additional column data types that we need in dax_apdb."""

    # UUID column type; `_data_type_size` counts it as 16 bytes and
    # `Column.pandas_type` does not support it.
    UUID = "uuid"

50 

51 

# Union of all types accepted for `Column.datatype`: either a Felis data type
# or one of the dax_apdb-specific `ExtraDataTypes`.
DataTypes = felis.datamodel.DataType | ExtraDataTypes

53 

54 

55def _strip_keys(map: _Mapping, keys: Iterable[str]) -> _Mapping: 

56 """Return a copy of a dictionary with some keys removed.""" 

57 keys = set(keys) 

58 return {key: value for key, value in map.items() if key not in keys} 

59 

60 

61def _make_iterable(obj: str | Iterable[str]) -> Iterable[str]: 

62 """Make an iterable out of string or list of strings.""" 

63 if isinstance(obj, str): 

64 yield obj 

65 else: 

66 yield from obj 

67 

68 

# Size in bytes attributed to one unit of each data type; this is the lookup
# table behind `Column.size`, which multiplies the value by the column length
# when a length is defined.
_data_type_size: Mapping[DataTypes, int] = {
    felis.datamodel.DataType.boolean: 1,
    felis.datamodel.DataType.byte: 1,
    felis.datamodel.DataType.short: 2,
    felis.datamodel.DataType.int: 4,
    felis.datamodel.DataType.long: 8,
    felis.datamodel.DataType.float: 4,
    felis.datamodel.DataType.double: 8,
    felis.datamodel.DataType.char: 1,
    felis.datamodel.DataType.string: 2,  # approximation, depends on character set
    felis.datamodel.DataType.unicode: 2,  # approximation, depends on character set
    felis.datamodel.DataType.text: 2,  # approximation, depends on character set
    felis.datamodel.DataType.binary: 1,
    felis.datamodel.DataType.timestamp: 8,  # May be different depending on backend
    ExtraDataTypes.UUID: 16,
}

85 

86 

# Mapping of Felis data type to the names of the corresponding pandas types,
# used by `Column.pandas_type`. The first entry in each tuple is for nullable
# columns, the second entry is for non-nullable columns. There is no entry
# for `ExtraDataTypes` members.
_dtype_map: Mapping[felis.datamodel.DataType, tuple[str, str]] = {
    felis.datamodel.DataType.double: ("float64", "float64"),
    felis.datamodel.DataType.float: ("float32", "float32"),
    felis.datamodel.DataType.timestamp: ("datetime64[ms]", "datetime64[ms]"),
    felis.datamodel.DataType.long: ("Int64", "int64"),
    felis.datamodel.DataType.int: ("Int32", "int32"),
    felis.datamodel.DataType.short: ("Int16", "int16"),
    felis.datamodel.DataType.byte: ("Int8", "int8"),
    felis.datamodel.DataType.binary: ("object", "object"),
    felis.datamodel.DataType.char: ("object", "object"),
    felis.datamodel.DataType.text: ("object", "object"),
    felis.datamodel.DataType.string: ("object", "object"),
    felis.datamodel.DataType.unicode: ("object", "object"),
    felis.datamodel.DataType.boolean: ("boolean", "bool"),
}

104 

105 

@dataclasses.dataclass
class Column:
    """Description of a single column of a table in the schema."""

    name: str
    """Name of the column."""

    id: str
    """Felis identifier of the column."""

    datatype: DataTypes
    """Type of the column data, a member of one of the `DataTypes` enums."""

    length: int | None = None
    """Length of string/binary columns, `None` if not specified."""

    nullable: bool = True
    """Whether the column accepts NULLs, `True` by default."""

    value: Any = None
    """Default value of the column, may be `None`."""

    autoincrement: bool | None = None
    """Auto-increment flag, `None` means that it was not specified."""

    description: str | None = None
    """Human-readable description of the column."""

    annotations: Mapping[str, Any] = dataclasses.field(default_factory=dict)
    """Extra annotations attached to the column."""

    table: Table | None = None
    """Table that owns the column, usually not `None`."""

    @classmethod
    def from_felis(cls, dm_column: felis.datamodel.Column) -> Column:
        """Build a column description from its Felis counterpart.

        Parameters
        ----------
        dm_column : `felis.datamodel.Column`
            Column definition in Felis data model.

        Returns
        -------
        column : `Column`
            New instance with attributes copied from ``dm_column``; its
            ``table`` attribute is left unset.
        """
        # Whatever is copied into a dedicated attribute is removed from the
        # annotations, everything else in the Felis object is kept there.
        copied_fields = [
            "name",
            "id",
            "datatype",
            "length",
            "nullable",
            "value",
            "autoincrement",
            "description",
        ]
        extras = _strip_keys(dict(dm_column), copied_fields)
        # Unspecified nullability means a nullable column.
        is_nullable = True if dm_column.nullable is None else dm_column.nullable
        return cls(
            name=dm_column.name,
            id=dm_column.id,
            datatype=dm_column.datatype,
            length=dm_column.length,
            nullable=is_nullable,
            value=dm_column.value,
            autoincrement=dm_column.autoincrement,
            description=dm_column.description,
            annotations=extras,
        )

    def clone(self) -> Column:
        """Return a shallow copy of this column detached from its table."""
        detached = dataclasses.replace(self, table=None)
        return detached

    def size(self) -> int:
        """Estimate the size of this column in bytes.

        Returns
        -------
        size : `int`
            Size in bytes for this column, typically the in-memory size of
            the corresponding data type. It is not guaranteed to match the
            storage size or the wire-level protocol size.
        """
        unit_size = _data_type_size[self.datatype]
        if self.length is None:
            return unit_size
        return unit_size * self.length

    @property
    def pandas_type(self) -> str:
        """Name of the pandas.DataFrame type used for this column (`str`)."""
        dtype = self.datatype
        # UUID columns are never converted to pandas.
        assert isinstance(dtype, felis.datamodel.DataType)
        # TODO: Existing data (in Cassandra) has NULLs in some non-nullable
        # columns. To avoid errors for such cases the nullable flavor of the
        # type is returned for every column. This should be revisited at some
        # later time.
        nullable_name, _ = _dtype_map[dtype]
        return nullable_name

198 

199 

@dataclasses.dataclass
class Index:
    """Description of a table index."""

    name: str
    """Name of the index, may be an empty string."""

    id: str
    """Felis identifier of the index."""

    columns: list[Column] = dataclasses.field(default_factory=list)
    """Indexed columns; at least one of ``columns`` and ``expressions`` must
    be non-empty.
    """

    expressions: list[str] = dataclasses.field(default_factory=list)
    """Indexed expressions; at least one of ``columns`` and ``expressions``
    must be non-empty.
    """

    description: str | None = None
    """Human-readable description of the index."""

    annotations: Mapping[str, Any] = dataclasses.field(default_factory=dict)
    """Extra annotations attached to the index."""

    @classmethod
    def from_felis(cls, dm_index: felis.datamodel.Index, columns: Mapping[str, Column]) -> Index:
        """Build an index description from its Felis counterpart.

        Parameters
        ----------
        dm_index : `felis.datamodel.Index`
            Index definition in Felis data model.
        columns : `~collections.abc.Mapping` [`str`, `Column`]
            Already converted columns, indexed by their Felis ID.

        Returns
        -------
        index : `Index`
            New instance whose ``columns`` are looked up in ``columns``.
        """
        # Both lists may be missing in the Felis object, treat that as empty.
        column_ids = dm_index.columns or []
        index_columns = [columns[column_id] for column_id in column_ids]
        index_expressions = dm_index.expressions or []
        extras = _strip_keys(dict(dm_index), ["name", "id", "columns", "expressions", "description"])
        return cls(
            name=dm_index.name,
            id=dm_index.id,
            columns=index_columns,
            expressions=index_expressions,
            description=dm_index.description,
            annotations=extras,
        )

250 

251 

@dataclasses.dataclass
class Constraint:
    """Base class for constraint descriptions; actual constraints are
    instances of one of its subclasses.
    """

    name: str | None
    """Name of the constraint."""

    id: str
    """Felis identifier of the constraint."""

    deferrable: bool = False
    """`True` if the constraint has to be declared as deferrable."""

    initially: str | None = None
    """Value for ``INITIALLY`` clause, only used if ``deferrable`` is True."""

    description: str | None = None
    """Human-readable description of the constraint."""

    annotations: Mapping[str, Any] = dataclasses.field(default_factory=dict)
    """Extra annotations attached to the constraint."""

    @classmethod
    def from_felis(cls, dm_constr: felis.datamodel.Constraint, columns: Mapping[str, Column]) -> Constraint:
        """Build a constraint description from its Felis counterpart.

        Parameters
        ----------
        dm_constr : `felis.datamodel.Constraint`
            Constraint definition in Felis data model.
        columns : `~collections.abc.Mapping` [`str`, `Column`]
            Already converted columns, indexed by their Felis ID.

        Returns
        -------
        constraint : `Constraint`
            Instance of `UniqueConstraint`, `ForeignKeyConstraint` or
            `CheckConstraint`, depending on the type of ``dm_constr``.

        Raises
        ------
        TypeError
            Raised if ``dm_constr`` is not one of the supported Felis
            constraint types.
        """
        # Select the class to instantiate and the attributes that only that
        # class has; attributes common to all constraints are filled below.
        target: type[Constraint]
        specific: dict[str, Any]
        if isinstance(dm_constr, felis.datamodel.UniqueConstraint):
            target = UniqueConstraint
            specific = {"columns": [columns[column_id] for column_id in dm_constr.columns]}
        elif isinstance(dm_constr, felis.datamodel.ForeignKeyConstraint):
            target = ForeignKeyConstraint
            specific = {
                "columns": [columns[column_id] for column_id in dm_constr.columns],
                "referenced_columns": [columns[column_id] for column_id in dm_constr.referenced_columns],
            }
        elif isinstance(dm_constr, felis.datamodel.CheckConstraint):
            target = CheckConstraint
            specific = {"expression": dm_constr.expression}
        else:
            raise TypeError(f"Unexpected constraint type: {dm_constr}")

        # Everything that gets a dedicated attribute (plus the type tag) is
        # removed from the annotations.
        consumed = ["name", "id", "type", "deferrable", "initially", "description", *specific]
        return target(
            name=dm_constr.name,
            id=dm_constr.id,
            deferrable=dm_constr.deferrable,
            initially=dm_constr.initially,
            description=dm_constr.description,
            annotations=_strip_keys(dict(dm_constr), consumed),
            **specific,
        )

343 

344 

@dataclasses.dataclass
class UniqueConstraint(Constraint):
    """Description of unique constraint.

    Extends the common `Constraint` attributes with the list of columns
    covered by the constraint.
    """

    # A default is required because the inherited `Constraint` fields already
    # declare defaults; a dataclass does not allow a non-default field after
    # them.
    columns: list[Column] = dataclasses.field(default_factory=list)
    """List of columns in this constraint, all columns belong to the same table
    as the constraint itself.
    """

353 

354 

@dataclasses.dataclass
class ForeignKeyConstraint(Constraint):
    """Description of a foreign key constraint."""

    columns: list[Column] = dataclasses.field(default_factory=list)
    """Constrained columns, all of them belong to the table that owns this
    constraint.
    """

    referenced_columns: list[Column] = dataclasses.field(default_factory=list)
    """Referenced columns, their number must match the number of ``columns``.
    All of them must belong to the same table, which is different from the
    table that owns this constraint.
    """

    onupdate: str | None = None
    """Action to take when parent table columns are updated. Typical values
    are CASCADE, DELETE and RESTRICT.
    """

    ondelete: str | None = None
    """Action to take when parent table columns are deleted. Typical values
    are CASCADE, DELETE and RESTRICT.
    """

    @property
    def referenced_table(self) -> Table:
        """Table that this constraint refers to (`Table`).

        It is taken from the first referenced column.
        """
        referenced = self.referenced_columns
        assert len(referenced) > 0, "column list cannot be empty"
        owner = referenced[0].table
        assert owner is not None, "foreign key column must have table defined"
        return owner

387 

388 

@dataclasses.dataclass
class CheckConstraint(Constraint):
    """Description of check constraint.

    Extends the common `Constraint` attributes with the expression to check.
    """

    # The empty-string default exists only because the inherited `Constraint`
    # fields already declare defaults, so this field must have one as well;
    # nothing in this class rejects an empty expression.
    expression: str = ""
    """Expression on one or more columns on the table, must be non-empty."""

395 

396 

@dataclasses.dataclass
class Table:
    """Description of a single table schema."""

    name: str
    """Table name."""

    id: str
    """Felis ID for this table."""

    columns: list[Column]
    """List of Column instances."""

    primary_key: list[Column]
    """List of Column that constitute a primary key, may be empty."""

    constraints: list[Constraint]
    """List of Constraint instances, can be empty."""

    indexes: list[Index]
    """List of Index instances, can be empty."""

    description: str | None = None
    """Table description."""

    annotations: Mapping[str, Any] = dataclasses.field(default_factory=dict)
    """Additional annotations for this table."""

    def __post_init__(self) -> None:
        """Update all columns to point to this table."""
        for column in self.columns:
            column.table = self

    @classmethod
    def from_felis(cls, dm_table: felis.datamodel.Table, columns: Mapping[str, Column]) -> Table:
        """Convert Felis table definition into instance of this class.

        Parameters
        ----------
        dm_table : `felis.datamodel.Table`
            Felis table definition.
        columns : `~collections.abc.Mapping` [`str`, `Column`]
            Mapping of column ID to `Column` instance.

        Returns
        -------
        table : `Table`
            Converted table definition. All its columns are updated to point
            to the new table.

        Raises
        ------
        KeyError
            Raised if a column ID used by the table, its primary key, its
            constraints or its indices is missing from ``columns``.
        """
        table_columns = [columns[c.id] for c in dm_table.columns]
        if dm_table.primary_key:
            # Primary key may be given as a single column ID or a list of IDs.
            pk_columns = [columns[c] for c in _make_iterable(dm_table.primary_key)]
        else:
            pk_columns = []
        constraints = [Constraint.from_felis(constr, columns) for constr in dm_table.constraints]
        indices = [Index.from_felis(dm_idx, columns) for dm_idx in dm_table.indexes]
        table = cls(
            name=dm_table.name,
            id=dm_table.id,
            columns=table_columns,
            primary_key=pk_columns,
            constraints=constraints,
            indexes=indices,
            description=dm_table.description,
            annotations=_strip_keys(
                dict(dm_table),
                [
                    "name",
                    "id",
                    "columns",
                    # The primary key is read above through the
                    # ``primary_key`` attribute, so that spelling has to be
                    # stripped for the key not to leak into annotations. The
                    # ``primaryKey`` spelling that was listed here before is
                    # kept for compatibility.
                    "primary_key",
                    "primaryKey",
                    "constraints",
                    "indexes",
                    "description",
                ],
            ),
        )
        return table

467 

468 

@dataclasses.dataclass
class Schema:
    """Description of a complete schema, which is a collection of tables."""

    name: str
    """Name of the schema."""

    id: str
    """Felis identifier of the schema."""

    tables: list[Table]
    """Definitions of all tables in the schema."""

    version: felis.datamodel.SchemaVersion | None = None
    """Description of the schema version."""

    description: str | None = None
    """Human-readable description of the schema."""

    annotations: Mapping[str, Any] = dataclasses.field(default_factory=dict)
    """Extra annotations attached to the schema."""

    @classmethod
    def from_felis(cls, dm_schema: felis.datamodel.Schema) -> Schema:
        """Build a schema description from its Felis counterpart.

        Parameters
        ----------
        dm_schema : `felis.datamodel.Schema`
            Schema definition in Felis data model.

        Returns
        -------
        schema : `Schema`
            New instance with all tables and their columns converted.
        """
        # Columns of all tables are converted before anything else, so that
        # tables can look up any column of the schema by its ID.
        converted = (
            Column.from_felis(dm_column) for dm_table in dm_schema.tables for dm_column in dm_table.columns
        )
        columns_by_id: MutableMapping[str, Column] = {column.id: column for column in converted}

        tables = [Table.from_felis(dm_table, columns_by_id) for dm_table in dm_schema.tables]

        # A version given as a plain string is wrapped into SchemaVersion,
        # anything else is passed through unchanged.
        raw_version = dm_schema.version
        version: felis.datamodel.SchemaVersion | None = (
            felis.datamodel.SchemaVersion(current=raw_version) if isinstance(raw_version, str) else raw_version
        )

        # NOTE(review): "version" is not in the list of stripped keys, so it
        # presumably stays in the annotations as well - confirm this is
        # intended.
        extras = _strip_keys(dict(dm_schema), ["name", "id", "tables", "description"])
        return cls(
            name=dm_schema.name,
            id=dm_schema.id,
            tables=tables,
            version=version,
            description=dm_schema.description,
            annotations=extras,
        )