Coverage for python / lsst / daf / butler / column_spec.py: 69%

150 statements  

« prev     ^ index     » next       coverage.py v7.13.5, created at 2026-04-30 08:41 +0000

1# This file is part of butler4. 

2# 

3# Developed for the LSST Data Management System. 

4# This product includes software developed by the LSST Project 

5# (http://www.lsst.org). 

6# See the COPYRIGHT file at the top-level directory of this distribution 

7# for details of code ownership. 

8# 

9# This software is dual licensed under the GNU General Public License and also 

10# under a 3-clause BSD license. Recipients may choose which of these licenses 

11# to use; please see the files gpl-3.0.txt and/or bsd_license.txt, 

12# respectively. If you choose the GPL option then the following text applies 

13# (but note that there is still no warranty even if you opt for BSD instead): 

14# 

15# This program is free software: you can redistribute it and/or modify 

16# it under the terms of the GNU General Public License as published by 

17# the Free Software Foundation, either version 3 of the License, or 

18# (at your option) any later version. 

19# 

20# This program is distributed in the hope that it will be useful, 

21# but WITHOUT ANY WARRANTY; without even the implied warranty of 

22# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the 

23# GNU General Public License for more details. 

24# 

25# You should have received a copy of the GNU General Public License 

26# along with this program. If not, see <http://www.gnu.org/licenses/>. 

27 

28from __future__ import annotations 

29 

30__all__ = ( 

31 "COLLECTION_NAME_MAX_LENGTH", 

32 "BoolColumnSpec", 

33 "ColumnSpec", 

34 "ColumnType", 

35 "FloatColumnSpec", 

36 "HashColumnSpec", 

37 "IntColumnSpec", 

38 "RegionColumnSpec", 

39 "StringColumnSpec", 

40 "TimespanColumnSpec", 

41 "UUIDColumnSpec", 

42 "make_tuple_type_adapter", 

43) 

44 

45import textwrap 

46import uuid 

47from abc import ABC, abstractmethod 

48from collections.abc import Iterable 

49from typing import ( 

50 TYPE_CHECKING, 

51 Annotated, 

52 Any, 

53 ClassVar, 

54 Literal, 

55 TypeAlias, 

56 Union, 

57 final, 

58) 

59 

60import astropy.time 

61import pyarrow as pa 

62import pydantic 

63 

64from lsst.sphgeom import Region 

65 

66from . import arrow_utils, ddl 

67from ._timespan import Timespan 

68from .pydantic_utils import SerializableBytesHex, SerializableRegion, SerializableTime 

69 

70if TYPE_CHECKING: 

71 from .name_shrinker import NameShrinker 

72 

# Logical column types understood by butler table/column specifications; each
# concrete *ColumnSpec subclass below declares exactly one of these as its
# discriminator value.
ColumnType: TypeAlias = Literal[
    "int",
    "string",
    "hash",
    "float",
    "datetime",
    "bool",
    "uuid",
    "timespan",
    "region",
    # The ingest_date column in the datasets table can be one of two column
    # types:
    # 1. TIMESTAMP column (which is not used anywhere else in the DB)
    # 2. Integer nanoseconds TAI (same as "datetime" column type)
    # Which it is depends on the database schema in use for the "datasets"
    # manager. (v1 is TIMESTAMP, v2 is integer). See makeStaticTableSpecs in
    # lsst.daf.butler.registry.datasets.byDimensions.tables.
    #
    # We don't know which it is until we go to resolve the query against
    # a database, so it has to be its own data type.
    "ingest_date",
]

95 

96 

# Maximum number of characters permitted in a collection name; changing this
# value is a (minor) schema change.
COLLECTION_NAME_MAX_LENGTH = 64
# TODO: DM-42541 would be a good opportunity to move this constant to a
# better home; this file is the least-bad home I can think of for now. Note
# that actually changing the value is a (minor) schema change.

101 

102 

class ColumnValueSerializer(ABC):
    """Abstract interface for converting column values to and from a
    serializable representation.
    """

    @abstractmethod
    def serialize(self, value: Any) -> Any:
        """Transform a column value into a form that can be serialized.

        Parameters
        ----------
        value : `typing.Any`
            The in-memory column value to convert.

        Returns
        -------
        value : `typing.Any`
            A representation of the value suitable for serialization.
        """
        raise NotImplementedError

    @abstractmethod
    def deserialize(self, value: Any) -> Any:
        """Transform a serialized representation back into a column value.

        Parameters
        ----------
        value : `typing.Any`
            The serialized form of a column value.

        Returns
        -------
        value : `typing.Any`
            The reconstructed in-memory column value.
        """
        raise NotImplementedError

137 

138 

class _TypeAdapterColumnValueSerializer(ColumnValueSerializer):
    """Serializer implementation backed by a `pydantic.TypeAdapter`."""

    def __init__(self, type_adapter: pydantic.TypeAdapter):
        # Docstring inherited.
        self._type_adapter = type_adapter

    def serialize(self, value: Any) -> Any:
        # Docstring inherited.
        # None passes through untouched; only real values go through pydantic.
        if value is None:
            return None
        return self._type_adapter.dump_python(value)

    def deserialize(self, value: Any) -> Any:
        # Docstring inherited.
        if value is None:
            return None
        return self._type_adapter.validate_python(value)

153 

154 

class _BaseColumnSpec(pydantic.BaseModel, ABC):
    """Base class for descriptions of table columns."""

    pytype: ClassVar[type]

    name: str = pydantic.Field(description="""Name of the column.""")

    doc: str = pydantic.Field(default="", description="Documentation for the column.")

    type: ColumnType

    nullable: bool = pydantic.Field(
        default=True,
        description="Whether the column may be ``NULL``.",
    )

    def to_sql_spec(self, name_shrinker: NameShrinker | None = None, **kwargs: Any) -> ddl.FieldSpec:
        """Build the SQL-specific equivalent of this column specification.

        Parameters
        ----------
        name_shrinker : `NameShrinker`, optional
            Object used to shrink the field name so it fits within
            database-specific limits.
        **kwargs
            Forwarded to `ddl.FieldSpec`.

        Returns
        -------
        sql_spec : `ddl.FieldSpec`
            A SQL-specific version of this specification.
        """
        field_name = self.name if name_shrinker is None else name_shrinker.shrink(self.name)
        return ddl.FieldSpec(name=field_name, dtype=ddl.VALID_CONFIG_COLUMN_TYPES[self.type], **kwargs)

    @abstractmethod
    def to_arrow(self) -> arrow_utils.ToArrow:
        """Return an object that converts values of this column to a column in
        an Arrow table.

        Returns
        -------
        converter : `.arrow_utils.ToArrow`
            A converter object with schema information in Arrow form.
        """
        raise NotImplementedError()

    def serializer(self) -> ColumnValueSerializer:
        """Build an object that converts values of this column to or from a
        serializable format.

        Returns
        -------
        serializer : `ColumnValueSerializer`
            A converter instance.
        """
        adapter = pydantic.TypeAdapter(self.annotated_type)
        return _TypeAdapterColumnValueSerializer(adapter)

    def display(self, level: int = 0, tab: str = " ") -> list[str]:
        """Format this column for human readers as a list of display lines.

        Parameters
        ----------
        level : `int`
            Number of indentation tabs for the first line.
        tab : `str`
            Characters to duplicate ``level`` times to form the actual indent.

        Returns
        -------
        lines : `list` [ `str` ]
            Display lines.
        """
        out = [f"{tab * level}{self.name}: {self.type}"]
        if self.doc:
            # Documentation is wrapped and indented one level deeper than the
            # "name: type" header line.
            doc_indent = tab * (level + 1)
            out.extend(
                textwrap.wrap(
                    self.doc,
                    initial_indent=doc_indent,
                    subsequent_indent=doc_indent,
                )
            )
        return out

    def __str__(self) -> str:
        return "\n".join(self.display())

    @property
    def annotated_type(self) -> Any:
        """Return a Pydantic-friendly type annotation for this column type.

        Since this is a runtime object and most type annotations must be
        static, this is really only useful for `pydantic.TypeAdapter`
        construction and dynamic `pydantic.create_model` construction.
        """
        base = self._get_base_annotated_type()
        return (base | None) if self.nullable else base

    @abstractmethod
    def _get_base_annotated_type(self) -> Any:
        """Return the base annotated type (not taking into account `nullable`)
        for this column type.
        """
        raise NotImplementedError()

265 

266 

267def make_tuple_type_adapter( 

268 columns: Iterable[ColumnSpec], 

269) -> pydantic.TypeAdapter[tuple[Any, ...]]: 

270 """Return a `pydantic.TypeAdapter` for a `tuple` with types defined by an 

271 iterable of `ColumnSpec` objects. 

272 

273 Parameters 

274 ---------- 

275 columns : `~collections.abc.Iterable` [ `ColumnSpec` ] 

276 Iterable of column specifications. 

277 

278 Returns 

279 ------- 

280 adapter : `pydantic.TypeAdapter` 

281 A Pydantic type adapter for the `tuple` representation of a row with 

282 the given columns. 

283 """ 

284 # Static type-checkers don't like this runtime use of static-typing 

285 # constructs, but that's how Pydantic works. 

286 return pydantic.TypeAdapter(tuple[*[spec.annotated_type for spec in columns]]) # type: ignore 

287 

288 

@final
class IntColumnSpec(_BaseColumnSpec):
    """Description of an integer column."""

    pytype: ClassVar[type] = int

    type: Literal["int"] = "int"

    def to_arrow(self) -> arrow_utils.ToArrow:
        # Docstring inherited.
        # NOTE(review): uint64 cannot represent negative Python ints —
        # confirm that unsigned storage is intentional for this column type.
        arrow_type = pa.uint64()
        return arrow_utils.ToArrow.for_primitive(self.name, arrow_type, nullable=self.nullable)

    def _get_base_annotated_type(self) -> Any:
        # Docstring inherited.
        return pydantic.StrictInt

304 

305 

@final
class StringColumnSpec(_BaseColumnSpec):
    """Description of a string column."""

    pytype: ClassVar[type] = str

    type: Literal["string"] = "string"

    length: int
    """Maximum length of strings."""

    def to_sql_spec(self, name_shrinker: NameShrinker | None = None, **kwargs: Any) -> ddl.FieldSpec:
        # Docstring inherited.
        # The maximum string length must be forwarded so the SQL column is
        # sized correctly.
        return super().to_sql_spec(length=self.length, name_shrinker=name_shrinker, **kwargs)

    def to_arrow(self) -> arrow_utils.ToArrow:
        # Docstring inherited.
        arrow_type = pa.string()
        return arrow_utils.ToArrow.for_primitive(self.name, arrow_type, nullable=self.nullable)

    def _get_base_annotated_type(self) -> Any:
        # Docstring inherited.
        return pydantic.StrictStr

328 

329 

@final
class HashColumnSpec(_BaseColumnSpec):
    """Description of a hash digest."""

    pytype: ClassVar[type] = bytes

    type: Literal["hash"] = "hash"

    nbytes: int
    """Number of bytes for the hash."""

    def to_sql_spec(self, name_shrinker: NameShrinker | None = None, **kwargs: Any) -> ddl.FieldSpec:
        # Docstring inherited.
        # The digest size must be forwarded so the SQL column is sized
        # correctly.
        return super().to_sql_spec(nbytes=self.nbytes, name_shrinker=name_shrinker, **kwargs)

    def to_arrow(self) -> arrow_utils.ToArrow:
        # Docstring inherited.
        # The size for Arrow binary columns is a fixed size, not a maximum
        # as in SQL, so we use a variable-size column.
        arrow_type = pa.binary()
        return arrow_utils.ToArrow.for_primitive(self.name, arrow_type, nullable=self.nullable)

    def _get_base_annotated_type(self) -> Any:
        # Docstring inherited.
        return SerializableBytesHex

358 

359 

@final
class FloatColumnSpec(_BaseColumnSpec):
    """Description of a float column."""

    pytype: ClassVar[type] = float

    type: Literal["float"] = "float"

    def to_arrow(self) -> arrow_utils.ToArrow:
        # Docstring inherited.
        # A vestigial ``assert self.nullable is not None`` was removed here:
        # ``nullable`` is declared as a plain ``bool`` with a default, so
        # validation can never leave it ``None`` (matching the other simple
        # column specs, which have no such assert).
        return arrow_utils.ToArrow.for_primitive(self.name, pa.float64(), nullable=self.nullable)

    def _get_base_annotated_type(self) -> Any:
        # Docstring inherited.
        return pydantic.StrictFloat

376 

377 

@final
class BoolColumnSpec(_BaseColumnSpec):
    """Description of a bool column."""

    pytype: ClassVar[type] = bool

    type: Literal["bool"] = "bool"

    def to_arrow(self) -> arrow_utils.ToArrow:
        # Docstring inherited.
        arrow_type = pa.bool_()
        return arrow_utils.ToArrow.for_primitive(self.name, arrow_type, nullable=self.nullable)

    def _get_base_annotated_type(self) -> Any:
        # Docstring inherited.
        return pydantic.StrictBool

393 

394 

@final
class UUIDColumnSpec(_BaseColumnSpec):
    """Description of a UUID column."""

    pytype: ClassVar[type] = uuid.UUID

    type: Literal["uuid"] = "uuid"

    def to_arrow(self) -> arrow_utils.ToArrow:
        # Docstring inherited.
        # A vestigial ``assert self.nullable is not None`` was removed here:
        # ``nullable`` is declared as a plain ``bool`` with a default, so
        # validation can never leave it ``None``.
        return arrow_utils.ToArrow.for_uuid(self.name, nullable=self.nullable)

    def _get_base_annotated_type(self) -> Any:
        # Docstring inherited.
        return uuid.UUID

411 

412 

@final
class RegionColumnSpec(_BaseColumnSpec):
    """Description of a region column."""

    name: str = "region"

    pytype: ClassVar[type] = Region

    type: Literal["region"] = "region"

    nbytes: int = 2048
    """Number of bytes for the encoded region."""

    def to_arrow(self) -> arrow_utils.ToArrow:
        # Docstring inherited.
        # A vestigial ``assert self.nullable is not None`` was removed here:
        # ``nullable`` is declared as a plain ``bool`` with a default, so
        # validation can never leave it ``None``.
        return arrow_utils.ToArrow.for_region(self.name, nullable=self.nullable)

    def _get_base_annotated_type(self) -> Any:
        # Docstring inherited.
        return SerializableRegion

434 

435 

@final
class TimespanColumnSpec(_BaseColumnSpec):
    """Description of a timespan column."""

    name: str = "timespan"

    pytype: ClassVar[type] = Timespan

    type: Literal["timespan"] = "timespan"

    def to_arrow(self) -> arrow_utils.ToArrow:
        # Docstring inherited.
        converter = arrow_utils.ToArrow.for_timespan(self.name, nullable=self.nullable)
        return converter

    def _get_base_annotated_type(self) -> Any:
        # Docstring inherited.
        return Timespan

453 

454 

@final
class DateTimeColumnSpec(_BaseColumnSpec):
    """Description of a time column, stored as integer TAI nanoseconds since
    1970-01-01 and represented in Python via `astropy.time.Time`.
    """

    pytype: ClassVar[type] = astropy.time.Time

    type: Literal["datetime"] = "datetime"

    def to_arrow(self) -> arrow_utils.ToArrow:
        # Docstring inherited.
        # A vestigial ``assert self.nullable is not None`` was removed here:
        # ``nullable`` is declared as a plain ``bool`` with a default, so
        # validation can never leave it ``None``.
        return arrow_utils.ToArrow.for_datetime(self.name, nullable=self.nullable)

    def _get_base_annotated_type(self) -> Any:
        # Docstring inherited.
        return SerializableTime

473 

474 

# Discriminated union of all concrete column specifications; Pydantic selects
# the concrete class during validation using the ``type`` field declared by
# each member.
ColumnSpec = Annotated[
    Union[
        IntColumnSpec,
        StringColumnSpec,
        HashColumnSpec,
        FloatColumnSpec,
        BoolColumnSpec,
        UUIDColumnSpec,
        RegionColumnSpec,
        TimespanColumnSpec,
        DateTimeColumnSpec,
    ],
    pydantic.Field(discriminator="type"),
]