Coverage for python/lsst/daf/butler/column_spec.py: 69%
150 statements
coverage.py v7.13.5, created at 2026-05-01 08:18 +0000
# This file is part of daf_butler.
#
# Developed for the LSST Data Management System.
# This product includes software developed by the LSST Project
# (http://www.lsst.org).
# See the COPYRIGHT file at the top-level directory of this distribution
# for details of code ownership.
#
# This software is dual licensed under the GNU General Public License and also
# under a 3-clause BSD license. Recipients may choose which of these licenses
# to use; please see the files gpl-3.0.txt and/or bsd_license.txt,
# respectively. If you choose the GPL option then the following text applies
# (but note that there is still no warranty even if you opt for BSD instead):
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.

from __future__ import annotations

__all__ = (
    "COLLECTION_NAME_MAX_LENGTH",
    "BoolColumnSpec",
    "ColumnSpec",
    "ColumnType",
    "FloatColumnSpec",
    "HashColumnSpec",
    "IntColumnSpec",
    "RegionColumnSpec",
    "StringColumnSpec",
    "TimespanColumnSpec",
    "UUIDColumnSpec",
    "make_tuple_type_adapter",
)

import textwrap
import uuid
from abc import ABC, abstractmethod
from collections.abc import Iterable
from typing import (
    TYPE_CHECKING,
    Annotated,
    Any,
    ClassVar,
    Literal,
    TypeAlias,
    Union,
    final,
)

import astropy.time
import pyarrow as pa
import pydantic

from lsst.sphgeom import Region

from . import arrow_utils, ddl
from ._timespan import Timespan
from .pydantic_utils import SerializableBytesHex, SerializableRegion, SerializableTime

if TYPE_CHECKING:
    from .name_shrinker import NameShrinker

ColumnType: TypeAlias = Literal[
    "int",
    "string",
    "hash",
    "float",
    "datetime",
    "bool",
    "uuid",
    "timespan",
    "region",
    # The ingest_date column in the datasets table can be one of two column
    # types:
    # 1. TIMESTAMP column (which is not used anywhere else in the DB)
    # 2. Integer nanoseconds TAI (same as "datetime" column type)
    # Which it is depends on the database schema in use for the "datasets"
    # manager. (v1 is TIMESTAMP, v2 is integer). See makeStaticTableSpecs in
    # lsst.daf.butler.registry.datasets.byDimensions.tables.
    #
    # We don't know which it is until we go to resolve the query against
    # a database, so it has to be its own data type.
    "ingest_date",
]

COLLECTION_NAME_MAX_LENGTH = 64
# TODO: DM-42541 would be a good opportunity to move this constant to a
# better home; this file is the least-bad home I can think of for now. Note
# that actually changing the value is a (minor) schema change.


class ColumnValueSerializer(ABC):
    """Class that knows how to serialize and deserialize column values."""

    @abstractmethod
    def serialize(self, value: Any) -> Any:
        """Convert column value to something that can be serialized.

        Parameters
        ----------
        value : `typing.Any`
            Column value to be serialized.

        Returns
        -------
        value : `typing.Any`
            Column value in serializable format.
        """
        raise NotImplementedError

    @abstractmethod
    def deserialize(self, value: Any) -> Any:
        """Convert serialized value to column value.

        Parameters
        ----------
        value : `typing.Any`
            Serialized column value.

        Returns
        -------
        value : `typing.Any`
            Deserialized column value.
        """
        raise NotImplementedError


class _TypeAdapterColumnValueSerializer(ColumnValueSerializer):
    """Implementation of serializer that uses pydantic type adapter."""

    def __init__(self, type_adapter: pydantic.TypeAdapter):
        # Docstring inherited.
        self._type_adapter = type_adapter

    def serialize(self, value: Any) -> Any:
        # Docstring inherited.
        return value if value is None else self._type_adapter.dump_python(value)

    def deserialize(self, value: Any) -> Any:
        # Docstring inherited.
        return value if value is None else self._type_adapter.validate_python(value)
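

# Illustrative sketch, not part of the original module: how a
# _TypeAdapterColumnValueSerializer round-trips a value through the
# pydantic.TypeAdapter it wraps. The timestamp value and the helper name below
# are hypothetical examples chosen only for demonstration.
def _example_serializer_round_trip() -> None:
    serializer = _TypeAdapterColumnValueSerializer(pydantic.TypeAdapter(SerializableTime))
    moment = astropy.time.Time("2026-05-01T08:18:00", scale="tai")
    raw = serializer.serialize(moment)          # plain, serializable representation
    restored = serializer.deserialize(raw)      # back to an astropy.time.Time
    assert isinstance(restored, astropy.time.Time)
    assert serializer.serialize(None) is None   # None passes through unchanged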


class _BaseColumnSpec(pydantic.BaseModel, ABC):
    """Base class for descriptions of table columns."""

    pytype: ClassVar[type]

    name: str = pydantic.Field(description="""Name of the column.""")

    doc: str = pydantic.Field(default="", description="Documentation for the column.")

    type: ColumnType

    nullable: bool = pydantic.Field(
        default=True,
        description="Whether the column may be ``NULL``.",
    )

    def to_sql_spec(self, name_shrinker: NameShrinker | None = None, **kwargs: Any) -> ddl.FieldSpec:
        """Convert this specification to a SQL-specific one.

        Parameters
        ----------
        name_shrinker : `NameShrinker`, optional
            Object that should be used to shrink the field name to ensure it
            fits within database-specific limits.
        **kwargs
            Forwarded to `ddl.FieldSpec`.

        Returns
        -------
        sql_spec : `ddl.FieldSpec`
            A SQL-specific version of this specification.
        """
        name = self.name
        if name_shrinker is not None:
            name = name_shrinker.shrink(name)
        return ddl.FieldSpec(name=name, dtype=ddl.VALID_CONFIG_COLUMN_TYPES[self.type], **kwargs)

    @abstractmethod
    def to_arrow(self) -> arrow_utils.ToArrow:
        """Return an object that converts values of this column to a column in
        an Arrow table.

        Returns
        -------
        converter : `.arrow_utils.ToArrow`
            A converter object with schema information in Arrow form.
        """
        raise NotImplementedError()

    def serializer(self) -> ColumnValueSerializer:
        """Return an object that converts values of this column to or from
        serializable format.

        Returns
        -------
        serializer : `ColumnValueSerializer`
            A converter instance.
        """
        return _TypeAdapterColumnValueSerializer(pydantic.TypeAdapter(self.annotated_type))

    def display(self, level: int = 0, tab: str = " ") -> list[str]:
        """Return a human-reader-focused string description of this column as
        a list of lines.

        Parameters
        ----------
        level : `int`
            Number of indentation tabs for the first line.
        tab : `str`
            Characters to duplicate ``level`` times to form the actual indent.

        Returns
        -------
        lines : `list` [ `str` ]
            Display lines.
        """
        lines = [f"{tab * level}{self.name}: {self.type}"]
        if self.doc:
            indent = tab * (level + 1)
            lines.extend(
                textwrap.wrap(
                    self.doc,
                    initial_indent=indent,
                    subsequent_indent=indent,
                )
            )
        return lines

    def __str__(self) -> str:
        return "\n".join(self.display())

    @property
    def annotated_type(self) -> Any:
        """Return a Pydantic-friendly type annotation for this column type.

        Since this is a runtime object and most type annotations must be
        static, this is really only useful for `pydantic.TypeAdapter`
        construction and dynamic `pydantic.create_model` construction.
        """
        base = self._get_base_annotated_type()
        if self.nullable:
            return base | None
        return base

    @abstractmethod
    def _get_base_annotated_type(self) -> Any:
        """Return the base annotated type (not taking into account `nullable`)
        for this column type.
        """
        raise NotImplementedError()
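

# Illustrative sketch, not part of the original module: the generic column-spec
# API provided by the base class, shown on a StringColumnSpec (defined below).
# The column name, doc text, and helper name are hypothetical examples.
def _example_column_spec_api() -> None:
    spec = StringColumnSpec(name="band", doc="Name of the physical filter band.", length=32)
    assert spec.display()[0] == "band: string"    # first display line is "<name>: <type>"
    adapter = pydantic.TypeAdapter(spec.annotated_type)
    assert adapter.validate_python("r") == "r"
    assert adapter.validate_python(None) is None  # nullable=True (the default) admits None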


def make_tuple_type_adapter(
    columns: Iterable[ColumnSpec],
) -> pydantic.TypeAdapter[tuple[Any, ...]]:
    """Return a `pydantic.TypeAdapter` for a `tuple` with types defined by an
    iterable of `ColumnSpec` objects.

    Parameters
    ----------
    columns : `~collections.abc.Iterable` [ `ColumnSpec` ]
        Iterable of column specifications.

    Returns
    -------
    adapter : `pydantic.TypeAdapter`
        A Pydantic type adapter for the `tuple` representation of a row with
        the given columns.
    """
    # Static type-checkers don't like this runtime use of static-typing
    # constructs, but that's how Pydantic works.
    return pydantic.TypeAdapter(tuple[*[spec.annotated_type for spec in columns]])  # type: ignore
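

# Illustrative sketch, not part of the original module: using
# make_tuple_type_adapter to validate a row expressed as a tuple. The column
# names, row values, and helper name are hypothetical examples.
def _example_tuple_adapter() -> None:
    columns = [
        IntColumnSpec(name="visit", nullable=False),
        StringColumnSpec(name="band", length=32),
    ]
    row_adapter = make_tuple_type_adapter(columns)
    assert row_adapter.validate_python((42, "r")) == (42, "r")
    assert row_adapter.validate_python((42, None)) == (42, None)  # "band" is nullable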


@final
class IntColumnSpec(_BaseColumnSpec):
    """Description of an integer column."""

    pytype: ClassVar[type] = int

    type: Literal["int"] = "int"

    def to_arrow(self) -> arrow_utils.ToArrow:
        # Docstring inherited.
        return arrow_utils.ToArrow.for_primitive(self.name, pa.uint64(), nullable=self.nullable)

    def _get_base_annotated_type(self) -> Any:
        # Docstring inherited.
        return pydantic.StrictInt


@final
class StringColumnSpec(_BaseColumnSpec):
    """Description of a string column."""

    pytype: ClassVar[type] = str

    type: Literal["string"] = "string"

    length: int
    """Maximum length of strings."""

    def to_sql_spec(self, name_shrinker: NameShrinker | None = None, **kwargs: Any) -> ddl.FieldSpec:
        # Docstring inherited.
        return super().to_sql_spec(length=self.length, name_shrinker=name_shrinker, **kwargs)

    def to_arrow(self) -> arrow_utils.ToArrow:
        # Docstring inherited.
        return arrow_utils.ToArrow.for_primitive(self.name, pa.string(), nullable=self.nullable)

    def _get_base_annotated_type(self) -> Any:
        # Docstring inherited.
        return pydantic.StrictStr


@final
class HashColumnSpec(_BaseColumnSpec):
    """Description of a hash digest."""

    pytype: ClassVar[type] = bytes

    type: Literal["hash"] = "hash"

    nbytes: int
    """Number of bytes for the hash."""

    def to_sql_spec(self, name_shrinker: NameShrinker | None = None, **kwargs: Any) -> ddl.FieldSpec:
        # Docstring inherited.
        return super().to_sql_spec(nbytes=self.nbytes, name_shrinker=name_shrinker, **kwargs)

    def to_arrow(self) -> arrow_utils.ToArrow:
        # Docstring inherited.
        return arrow_utils.ToArrow.for_primitive(
            self.name,
            # The size for Arrow binary columns is a fixed size, not a maximum
            # as in SQL, so we use a variable-size column.
            pa.binary(),
            nullable=self.nullable,
        )

    def _get_base_annotated_type(self) -> Any:
        # Docstring inherited.
        return SerializableBytesHex


@final
class FloatColumnSpec(_BaseColumnSpec):
    """Description of a float column."""

    pytype: ClassVar[type] = float

    type: Literal["float"] = "float"

    def to_arrow(self) -> arrow_utils.ToArrow:
        # Docstring inherited.
        assert self.nullable is not None, "nullable=None should be resolved by validators"
        return arrow_utils.ToArrow.for_primitive(self.name, pa.float64(), nullable=self.nullable)

    def _get_base_annotated_type(self) -> Any:
        # Docstring inherited.
        return pydantic.StrictFloat


@final
class BoolColumnSpec(_BaseColumnSpec):
    """Description of a bool column."""

    pytype: ClassVar[type] = bool

    type: Literal["bool"] = "bool"

    def to_arrow(self) -> arrow_utils.ToArrow:
        # Docstring inherited.
        return arrow_utils.ToArrow.for_primitive(self.name, pa.bool_(), nullable=self.nullable)

    def _get_base_annotated_type(self) -> Any:
        # Docstring inherited.
        return pydantic.StrictBool


@final
class UUIDColumnSpec(_BaseColumnSpec):
    """Description of a UUID column."""

    pytype: ClassVar[type] = uuid.UUID

    type: Literal["uuid"] = "uuid"

    def to_arrow(self) -> arrow_utils.ToArrow:
        # Docstring inherited.
        assert self.nullable is not None, "nullable=None should be resolved by validators"
        return arrow_utils.ToArrow.for_uuid(self.name, nullable=self.nullable)

    def _get_base_annotated_type(self) -> Any:
        # Docstring inherited.
        return uuid.UUID


@final
class RegionColumnSpec(_BaseColumnSpec):
    """Description of a region column."""

    name: str = "region"

    pytype: ClassVar[type] = Region

    type: Literal["region"] = "region"

    nbytes: int = 2048
    """Number of bytes for the encoded region."""

    def to_arrow(self) -> arrow_utils.ToArrow:
        # Docstring inherited.
        assert self.nullable is not None, "nullable=None should be resolved by validators"
        return arrow_utils.ToArrow.for_region(self.name, nullable=self.nullable)

    def _get_base_annotated_type(self) -> Any:
        # Docstring inherited.
        return SerializableRegion


@final
class TimespanColumnSpec(_BaseColumnSpec):
    """Description of a timespan column."""

    name: str = "timespan"

    pytype: ClassVar[type] = Timespan

    type: Literal["timespan"] = "timespan"

    def to_arrow(self) -> arrow_utils.ToArrow:
        # Docstring inherited.
        return arrow_utils.ToArrow.for_timespan(self.name, nullable=self.nullable)

    def _get_base_annotated_type(self) -> Any:
        # Docstring inherited.
        return Timespan


@final
class DateTimeColumnSpec(_BaseColumnSpec):
    """Description of a time column, stored as integer TAI nanoseconds since
    1970-01-01 and represented in Python via `astropy.time.Time`.
    """

    pytype: ClassVar[type] = astropy.time.Time

    type: Literal["datetime"] = "datetime"

    def to_arrow(self) -> arrow_utils.ToArrow:
        # Docstring inherited.
        assert self.nullable is not None, "nullable=None should be resolved by validators"
        return arrow_utils.ToArrow.for_datetime(self.name, nullable=self.nullable)

    def _get_base_annotated_type(self) -> Any:
        # Docstring inherited.
        return SerializableTime


ColumnSpec = Annotated[
    Union[
        IntColumnSpec,
        StringColumnSpec,
        HashColumnSpec,
        FloatColumnSpec,
        BoolColumnSpec,
        UUIDColumnSpec,
        RegionColumnSpec,
        TimespanColumnSpec,
        DateTimeColumnSpec,
    ],
    pydantic.Field(discriminator="type"),
]
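

# Illustrative sketch, not part of the original module: ColumnSpec is a
# discriminated union keyed on the "type" field, so a plain dict can be
# validated into the matching concrete spec class. The dict contents and the
# helper name are hypothetical examples.
def _example_discriminated_union() -> None:
    adapter = pydantic.TypeAdapter(ColumnSpec)
    spec = adapter.validate_python({"name": "band", "type": "string", "length": 32})
    assert isinstance(spec, StringColumnSpec)
    assert spec.length == 32 and spec.nullable  # nullable defaults to True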