# NOTE: coverage.py report header (not part of the original source file):
# Coverage for python/lsst/dax/apdb/apdbSchema.py: 39% (79 statements),
# coverage.py v7.13.5, created at 2026-04-22 08:49 +0000
1# This file is part of dax_apdb.
2#
3# Developed for the LSST Data Management System.
4# This product includes software developed by the LSST Project
5# (http://www.lsst.org).
6# See the COPYRIGHT file at the top-level directory of this distribution
7# for details of code ownership.
8#
9# This program is free software: you can redistribute it and/or modify
10# it under the terms of the GNU General Public License as published by
11# the Free Software Foundation, either version 3 of the License, or
12# (at your option) any later version.
13#
14# This program is distributed in the hope that it will be useful,
15# but WITHOUT ANY WARRANTY; without even the implied warranty of
16# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
17# GNU General Public License for more details.
18#
19# You should have received a copy of the GNU General Public License
20# along with this program. If not, see <http://www.gnu.org/licenses/>.
22"""Module containing methods and classes for generic APDB schema operations.
24The code in this module is independent of the specific technology used to
25implement APDB.
26"""
28from __future__ import annotations
30__all__ = ["ApdbSchema", "ApdbTables"]
32import enum
33import logging
34from collections.abc import Mapping, MutableMapping
35from functools import cached_property
37import felis.datamodel
38import numpy
40from .schema_model import ExtraDataTypes, Schema, Table
41from .versionTuple import VersionTuple
_LOG = logging.getLogger(__name__)

# In most cases column types are determined by Cassandra driver, but in some
# cases we need to create Pandas Dataframe ourselves and we use this map to
# infer types of columns from their YAML schema. Note that Cassandra saves
# timestamps with millisecond precision, but pandas maps datetime type to
# "datetime64[ns]". All textual/binary Felis types map to the generic
# ``object`` dtype.
_dtype_map: Mapping[felis.datamodel.DataType | ExtraDataTypes, type | str] = {
    felis.datamodel.DataType.double: numpy.float64,
    felis.datamodel.DataType.float: numpy.float32,
    felis.datamodel.DataType.timestamp: "datetime64[ns]",
    felis.datamodel.DataType.long: numpy.int64,
    felis.datamodel.DataType.int: numpy.int32,
    felis.datamodel.DataType.short: numpy.int16,
    felis.datamodel.DataType.byte: numpy.int8,
    **dict.fromkeys(
        (
            felis.datamodel.DataType.binary,
            felis.datamodel.DataType.char,
            felis.datamodel.DataType.text,
            felis.datamodel.DataType.string,
            felis.datamodel.DataType.unicode,
        ),
        object,
    ),
    felis.datamodel.DataType.boolean: bool,
}
67@enum.unique
68class ApdbTables(enum.Enum):
69 """Names of the tables in APDB schema."""
71 DiaObject = "DiaObject"
72 """Name of the table for DIAObject records."""
74 DiaSource = "DiaSource"
75 """Name of the table for DIASource records."""
77 DiaForcedSource = "DiaForcedSource"
78 """Name of the table for DIAForcedSource records."""
80 DiaObjectLast = "DiaObjectLast"
81 """Name of the table for the last version of DIAObject records.
83 This table may be optional for some implementations.
84 """
86 SSObject = "SSObject"
87 """Name of the table for SSObject records."""
89 SSSource = "SSSource"
90 """Name of the table for SSSource records."""
92 DiaObject_To_Object_Match = "DiaObject_To_Object_Match"
93 """Name of the table for DiaObject_To_Object_Match records."""
95 metadata = "metadata"
96 """Name of the metadata table, this table may not always exist."""
98 def table_name(self, prefix: str = "", time_partition: int | None = None) -> str:
99 """Return full table name.
101 Parameters
102 ----------
103 prefix : `str`, optional
104 Optional prefix for table name.
105 time_partition : `int`, optional
106 Optional time partition, should only be used for tables that
107 support time partitioning.
108 """
109 name = f"{prefix}{self.value}"
110 if time_partition is not None:
111 name = f"{name}_{time_partition}"
112 return name
class ApdbSchema:
    """Class for management of APDB schema.

    Attributes
    ----------
    tableSchemas : `dict`
        Maps table name to `TableDef` instance.

    Parameters
    ----------
    schema_file : `str`
        Location of the YAML file with APDB schema.
    ss_schema_file : `str`
        Location of the YAML file with SSO schema. File will be loaded if APDB
        schema file does not contain SSObject/SSSource tables. Can be set to
        empty string to skip loading of SSObject/SSSource schema.

    Raises
    ------
    LookupError
        Raised if ``ss_schema_file`` is given but does not define both
        SSObject and SSSource tables.
    """

    def __init__(
        self,
        schema_file: str,
        ss_schema_file: str,
    ):
        # Build complete table schema from the main APDB schema file.
        self.tableSchemas, self._schemaVersion = self._buildSchemas(schema_file)
        if ss_schema_file:
            if ApdbTables.SSObject not in self.tableSchemas or ApdbTables.SSSource not in self.tableSchemas:
                # Read additional SSP schema.
                ssp_tables, _ = self._buildSchemas(ss_schema_file)
                if ApdbTables.SSObject not in ssp_tables or ApdbTables.SSSource not in ssp_tables:
                    raise LookupError(f"Cannot locate SSObject/SSSource table in {ss_schema_file}")
                # Merge into a new dict so the mapping returned by
                # _buildSchemas is not mutated.
                self.tableSchemas = dict(self.tableSchemas) | {
                    ApdbTables.SSObject: ssp_tables[ApdbTables.SSObject],
                    ApdbTables.SSSource: ssp_tables[ApdbTables.SSSource],
                }

    def column_dtype(self, felis_type: felis.datamodel.DataType | ExtraDataTypes) -> type | str:
        """Return Pandas data type for a given Felis column type.

        Parameters
        ----------
        felis_type : `felis.datamodel.DataType`
            Felis type, one of the enums defined in `felis.datamodel` module.

        Returns
        -------
        column_dtype : `type` or `str`
            Type that can be used for columns in Pandas.

        Raises
        ------
        TypeError
            Raised if type cannot be handled.
        """
        try:
            return _dtype_map[felis_type]
        except KeyError as exc:
            # Chain the original KeyError so the failing lookup is visible in
            # the traceback instead of being silently replaced.
            raise TypeError(f"Unexpected Felis type: {felis_type}") from exc

    def schemaVersion(self) -> VersionTuple:
        """Return schema version as defined in YAML schema file.

        Returns
        -------
        version : `VersionTuple`
            Version number read from YAML file, if YAML file does not define
            schema version then "0.1.0" is returned.
        """
        if self._schemaVersion is None:
            # Schema files predating explicit versioning default to 0.1.0.
            return VersionTuple(0, 1, 0)
        else:
            return self._schemaVersion

    @classmethod
    def _buildSchemas(cls, schema_file: str) -> tuple[Mapping[ApdbTables, Table], VersionTuple | None]:
        """Create schema definitions for tables from felis schema.

        Reads YAML schema and builds a dictionary containing
        `.schema_model.Table` instances for each APDB table appearing in that
        schema.

        Parameters
        ----------
        schema_file : `str`
            Name of YAML file with ``felis`` schema.

        Returns
        -------
        tables : `dict` [`ApdbTables`, `schema_model.Table`]
            Mapping of table names to `.schema_model.Table` instances.
        version : `VersionTuple` or `None`
            Schema version defined in schema file, `None` if version is not
            defined.
        """
        _LOG.debug("Loading felis schema from %s", schema_file)
        felis_schema = felis.datamodel.Schema.from_uri(schema_file, context={"id_generation": True})
        schema = Schema.from_felis(felis_schema)

        # Convert all dicts into classes, keeping only APDB tables.
        tables: MutableMapping[ApdbTables, Table] = {}
        for table in schema.tables:
            try:
                table_enum = ApdbTables(table.name)
            except ValueError:
                # There may be other tables in the schema that do not belong
                # to APDB.
                continue
            else:
                tables[table_enum] = table

        version: VersionTuple | None = None
        if schema.version is not None:
            version = VersionTuple.fromString(schema.version.current)

        _LOG.debug("Loaded schema for tables %s", list(tables))
        return tables, version

    @cached_property
    def has_mjd_timestamps(self) -> bool:
        """True if timestamps columns are in MJD (`bool`)."""
        table = self.tableSchemas[ApdbTables.DiaObject]
        # Look for validityStartMjdTai or validityStart; whichever appears
        # first decides the timestamp convention for the whole schema.
        for column in table.columns:
            if column.name == "validityStartMjdTai":
                return True
            elif column.name == "validityStart":
                return False
        raise LookupError(
            "Could not find validityStart or validityStartMjdTai column in DiaObject table schema."
        )