Coverage for python / lsst / dax / apdb / apdbSchema.py: 35%
80 statements
« prev ^ index » next coverage.py v7.13.5, created at 2026-04-30 08:48 +0000
« prev ^ index » next coverage.py v7.13.5, created at 2026-04-30 08:48 +0000
1# This file is part of dax_apdb.
2#
3# Developed for the LSST Data Management System.
4# This product includes software developed by the LSST Project
5# (http://www.lsst.org).
6# See the COPYRIGHT file at the top-level directory of this distribution
7# for details of code ownership.
8#
9# This program is free software: you can redistribute it and/or modify
10# it under the terms of the GNU General Public License as published by
11# the Free Software Foundation, either version 3 of the License, or
12# (at your option) any later version.
13#
14# This program is distributed in the hope that it will be useful,
15# but WITHOUT ANY WARRANTY; without even the implied warranty of
16# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
17# GNU General Public License for more details.
18#
19# You should have received a copy of the GNU General Public License
20# along with this program. If not, see <http://www.gnu.org/licenses/>.
22"""Module containing methods and classes for generic APDB schema operations.
24The code in this module is independent of the specific technology used to
25implement APDB.
26"""
28from __future__ import annotations
30__all__ = ["ApdbSchema", "ApdbTables"]
32import enum
33import logging
34from collections.abc import Mapping, MutableMapping
35from functools import cached_property
37import felis.datamodel
39from .schema_model import Schema, Table
40from .versionTuple import VersionTuple
42_LOG = logging.getLogger(__name__)
@enum.unique
class ApdbTables(enum.Enum):
    """Names of the tables in APDB schema."""

    DiaObject = "DiaObject"
    """Name of the table for DIAObject records."""

    DiaSource = "DiaSource"
    """Name of the table for DIASource records."""

    DiaForcedSource = "DiaForcedSource"
    """Name of the table for DIAForcedSource records."""

    DiaObjectLast = "DiaObjectLast"
    """Name of the table for the last version of DIAObject records.

    This table may be optional for some implementations.
    """

    SSObject = "SSObject"
    """Name of the table for SSObject records."""

    SSSource = "SSSource"
    """Name of the table for SSSource records."""

    DiaObject_To_Object_Match = "DiaObject_To_Object_Match"
    """Name of the table for DiaObject_To_Object_Match records."""

    metadata = "metadata"
    """Name of the metadata table, this table may not always exist."""

    def table_name(self, prefix: str = "", time_partition: int | None = None) -> str:
        """Build the full table name for this table.

        Parameters
        ----------
        prefix : `str`, optional
            String prepended to the table name; empty by default.
        time_partition : `int`, optional
            Time partition number appended as ``_<time_partition>``; should
            only be given for tables that support time partitioning. Note that
            ``0`` is a valid partition and still produces a suffix.

        Returns
        -------
        name : `str`
            ``<prefix><value>`` optionally followed by ``_<time_partition>``.
        """
        base_name = f"{prefix}{self.value}"
        # Compare against None explicitly so that partition 0 is honored.
        if time_partition is None:
            return base_name
        return f"{base_name}_{time_partition}"
class ApdbSchema:
    """Class for management of APDB schema.

    Attributes
    ----------
    tableSchemas : `~collections.abc.Mapping` [`ApdbTables`, `.schema_model.Table`]
        Maps each `ApdbTables` member to its `.schema_model.Table` definition.
        Only tables from the schema file whose names match an `ApdbTables`
        member are included.

    Parameters
    ----------
    schema_file : `str`
        Location of the YAML file with APDB schema.
    ss_schema_file : `str`
        Location of the YAML file with SSO schema. The file is loaded only if
        the APDB schema file is missing the SSObject or the SSSource table; in
        that case both tables are taken from this file, replacing any version
        found in ``schema_file``. Can be set to an empty string to skip loading
        of SSObject/SSSource schema.

    Raises
    ------
    LookupError
        Raised if ``ss_schema_file`` is loaded but does not define both
        SSObject and SSSource tables.
    """

    def __init__(
        self,
        schema_file: str,
        ss_schema_file: str,
    ):
        # Build complete table schema from the main APDB schema file.
        self.tableSchemas, self._schemaVersion = self._buildSchemas(schema_file)
        if ss_schema_file:
            if ApdbTables.SSObject not in self.tableSchemas or ApdbTables.SSSource not in self.tableSchemas:
                # Read additional SSP schema; the schema version of the SSP
                # file is discarded, only the main file's version is kept.
                ssp_tables, _ = self._buildSchemas(ss_schema_file)
                if ApdbTables.SSObject not in ssp_tables or ApdbTables.SSSource not in ssp_tables:
                    raise LookupError(f"Cannot locate SSObject/SSSource table in {ss_schema_file}")
                # Both SS tables come from the SSP file, even if one of them
                # was already present in the main schema.
                self.tableSchemas = dict(self.tableSchemas) | {
                    ApdbTables.SSObject: ssp_tables[ApdbTables.SSObject],
                    ApdbTables.SSSource: ssp_tables[ApdbTables.SSSource],
                }

    def schemaVersion(self) -> VersionTuple:
        """Return schema version as defined in YAML schema file.

        Returns
        -------
        version : `VersionTuple`
            Version number read from YAML file, if YAML file does not define
            schema version then "0.1.0" is returned.
        """
        if self._schemaVersion is None:
            return VersionTuple(0, 1, 0)
        else:
            return self._schemaVersion

    @classmethod
    def _buildSchemas(cls, schema_file: str) -> tuple[Mapping[ApdbTables, Table], VersionTuple | None]:
        """Create schema definitions for tables from felis schema.

        Reads YAML schema and builds a dictionary containing
        `.schema_model.Table` instances for each APDB table appearing in that
        schema.

        Parameters
        ----------
        schema_file : `str`
            Name of YAML file with ``felis`` schema.

        Returns
        -------
        tables : `dict` [`ApdbTables`, `schema_model.Table`]
            Mapping of table enums to `.schema_model.Table` instances. Tables
            whose names are not `ApdbTables` members are skipped.
        version : `VersionTuple` or `None`
            Schema version defined in schema file, `None` if version is not
            defined.
        """
        _LOG.debug("Loading felis schema from %s", schema_file)
        felis_schema = felis.datamodel.Schema.from_uri(schema_file, context={"id_generation": True})
        schema = Schema.from_felis(felis_schema)

        # Keep only the tables that belong to APDB, keyed by their enum.
        tables: MutableMapping[ApdbTables, Table] = {}
        for table in schema.tables:
            try:
                table_enum = ApdbTables(table.name)
            except ValueError:
                # There may be other tables in the schema that do not belong
                # to APDB.
                continue
            else:
                tables[table_enum] = table

        version: VersionTuple | None = None
        if schema.version is not None:
            version = VersionTuple.fromString(schema.version.current)

        _LOG.debug("Loaded schema for tables %s", list(tables))
        return tables, version

    @cached_property
    def has_mjd_timestamps(self) -> bool:
        """True if timestamps columns are in MJD (`bool`).

        Determined from the DiaObject table: whichever of
        ``validityStartMjdTai`` or ``validityStart`` appears first decides the
        answer.

        Raises
        ------
        LookupError
            Raised if neither column exists in the DiaObject table schema.
        """
        table = self.tableSchemas[ApdbTables.DiaObject]
        # Look for validityStartMjdTai or validityStart
        for column in table.columns:
            if column.name == "validityStartMjdTai":
                return True
            elif column.name == "validityStart":
                return False
        raise LookupError(
            "Could not find validityStart or validityStartMjdTai column in DiaObject table schema."
        )

    def timestamp_column_name(self, column: str) -> str:
        """Return column name before/after schema migration to MJD TAI.

        Parameters
        ----------
        column : `str`
            Column name before MJD TAI schema migration.

        Returns
        -------
        column : `str`
            Column name in current schema. Unchanged if the schema does not
            use MJD timestamps; otherwise ``time_processed`` and
            ``time_withdrawn`` map to their camel-case MJD names and any other
            column gets an ``MjdTai`` suffix.
        """
        if not self.has_mjd_timestamps:
            return column
        # These two columns were renamed to camel case, not just suffixed.
        renamed = {
            "time_processed": "timeProcessedMjdTai",
            "time_withdrawn": "timeWithdrawnMjdTai",
        }
        return renamed.get(column, f"{column}MjdTai")