Coverage for python / lsst / daf / butler / registry / obscore / pgsphere.py: 0%
70 statements
« prev ^ index » next coverage.py v7.13.5, created at 2026-04-24 08:16 +0000
« prev ^ index » next coverage.py v7.13.5, created at 2026-04-24 08:16 +0000
1# This file is part of daf_butler.
2#
3# Developed for the LSST Data Management System.
4# This product includes software developed by the LSST Project
5# (http://www.lsst.org).
6# See the COPYRIGHT file at the top-level directory of this distribution
7# for details of code ownership.
8#
9# This software is dual licensed under the GNU General Public License and also
10# under a 3-clause BSD license. Recipients may choose which of these licenses
11# to use; please see the files gpl-3.0.txt and/or bsd_license.txt,
12# respectively. If you choose the GPL option then the following text applies
13# (but note that there is still no warranty even if you opt for BSD instead):
14#
15# This program is free software: you can redistribute it and/or modify
16# it under the terms of the GNU General Public License as published by
17# the Free Software Foundation, either version 3 of the License, or
18# (at your option) any later version.
19#
20# This program is distributed in the hope that it will be useful,
21# but WITHOUT ANY WARRANTY; without even the implied warranty of
22# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
23# GNU General Public License for more details.
24#
25# You should have received a copy of the GNU General Public License
26# along with this program. If not, see <http://www.gnu.org/licenses/>.
28from __future__ import annotations
30__all__ = ["PgSphereObsCorePlugin"]
32from collections.abc import Callable, Mapping, Sequence
33from typing import TYPE_CHECKING, Any
35import sqlalchemy
36from sqlalchemy.dialects.postgresql.base import ischema_names
37from sqlalchemy.types import UserDefinedType
39from lsst.sphgeom import ConvexPolygon, LonLat, Region
41from ... import ddl
42from ._spatial import MissingDatabaseError, RegionTypeError, SpatialObsCorePlugin
44if TYPE_CHECKING:
45 from ..interfaces import Database
46 from ._records import Record
class PgSpherePoint(UserDefinedType):
    """SQLAlchemy column type for the pgSphere ``spoint`` type.

    The Python-side value is an `lsst.sphgeom.LonLat`. Only the pieces
    needed to write values to the database are provided: the column
    specification and a bind processor.
    """

    cache_ok = True

    def get_col_spec(self, **kw: Any) -> str:
        """Return the database name of this column type.

        Parameters
        ----------
        **kw
            Keyword parameters, not used.

        Returns
        -------
        spec : `str`
            The string "SPOINT", regardless of arguments.
        """
        return "SPOINT"

    def bind_processor(self, dialect: sqlalchemy.engine.Dialect) -> Callable:
        """Return a function converting bound values to their text form.

        Parameters
        ----------
        dialect : `sqlalchemy.engine.Dialect`
            The dialect in use, not consulted by this implementation.

        Returns
        -------
        processor : `~collections.abc.Callable`
            Function mapping a `~lsst.sphgeom.LonLat` to a string of the
            form ``(lon,lat)`` with both angles in radians; `None` is
            passed through unchanged.
        """

        def _to_spoint(point: LonLat | None) -> str | None:
            # None is passed through so that it is not formatted as a point.
            return (
                None
                if point is None
                else f"({point.getLon().asRadians()},{point.getLat().asRadians()})"
            )

        return _to_spoint
class PgSpherePolygon(UserDefinedType):
    """SQLAlchemy column type for the pgSphere ``spoly`` type.

    The Python-side value is a sequence of `lsst.sphgeom.LonLat` vertices
    (sphgeom polygons are convex, pgSphere polygons are not required to
    be). Only the pieces needed to write values to the database are
    provided: the column specification and a bind processor.
    """

    cache_ok = True

    def get_col_spec(self, **kw: Any) -> str:
        """Return the database name of this column type.

        Parameters
        ----------
        **kw
            Keyword parameters, not used.

        Returns
        -------
        spec : `str`
            The string "SPOLY", regardless of arguments.
        """
        return "SPOLY"

    def bind_processor(self, dialect: sqlalchemy.engine.Dialect) -> Callable:
        """Return a function converting bound values to their text form.

        Parameters
        ----------
        dialect : `sqlalchemy.engine.Dialect`
            The dialect in use, not consulted by this implementation.

        Returns
        -------
        processor : `~collections.abc.Callable`
            Function mapping a sequence of `~lsst.sphgeom.LonLat` to a
            string of the form ``{(lon,lat),(lon,lat),...}`` with all
            angles in radians; `None` is passed through unchanged.
        """

        def _to_spoly(vertices: Sequence[LonLat] | None) -> str | None:
            if vertices is None:
                return None
            # One "(lon,lat)" item per vertex, in the order given.
            vertex_texts = [
                f"({vertex.getLon().asRadians()},{vertex.getLat().asRadians()})" for vertex in vertices
            ]
            joined = ",".join(vertex_texts)
            return f"{{{joined}}}"

        return _to_spoly
# Register both pgSphere types under their database names so that SQLAlchemy
# treats them as known types and does not emit SAWarning about unknown types.
# This is not explicitly documented, but it is what other people do.
ischema_names.update({"spoint": PgSpherePoint, "spoly": PgSpherePolygon})
class PgSphereObsCorePlugin(SpatialObsCorePlugin):
    """Spatial ObsCore plugin that stores regions as pg_sphere geometries.

    Parameters
    ----------
    name : `str`
        The name.
    config : `~collections.abc.Mapping` [`str`, `~typing.Any`]
        The configuration. Optional keys ``region_column`` and
        ``position_column`` override the default column names
        ``pgsphere_region`` and ``pgsphere_position``.

    Notes
    -----
    Two columns are added to the obscore table and filled by this plugin:
    a polygon column holding the region and a point column holding the
    center of the region's bounding circle. Each column gets its own GIST
    index.
    """

    def __init__(self, *, name: str, config: Mapping[str, Any]):
        self._name = name
        self._region_column_name = config.get("region_column", "pgsphere_region")
        self._position_column_name = config.get("position_column", "pgsphere_position")

    @classmethod
    def initialize(cls, *, name: str, config: Mapping[str, Any], db: Database | None) -> SpatialObsCorePlugin:
        # docstring inherited.

        if db is None:
            raise MissingDatabaseError("Database access is required for pgSphere plugin")

        # The backend has to be PostgreSQL.
        if db.dialect.name != "postgresql":
            raise RuntimeError("PgSphere spatial plugin for obscore requires PostgreSQL database.")

        # The pg_sphere extension has to be installed in that database.
        statement = sqlalchemy.sql.text("SELECT COUNT(*) FROM pg_extension WHERE extname='pg_sphere'")
        with db.query(statement) as result:
            extension_count = result.scalar()
            if extension_count == 0:
                raise RuntimeError(
                    "PgSphere spatial plugin for obscore requires the pgSphere extension. "
                    "Please run `CREATE EXTENSION pg_sphere;` on a database containing obscore table "
                    "from a PostgreSQL superuser account."
                )

        return cls(name=name, config=config)

    def extend_table_spec(self, table_spec: ddl.TableSpec) -> None:
        # docstring inherited.
        region_field = ddl.FieldSpec(
            name=self._region_column_name,
            dtype=PgSpherePolygon,
            doc="pgSphere polygon for this record region.",
        )
        position_field = ddl.FieldSpec(
            name=self._position_column_name,
            dtype=PgSpherePoint,
            doc="pgSphere position for this record, center of bounding circle.",
        )
        table_spec.fields.update((region_field, position_field))

        # Spatial columns need GIST index type
        for column_name in (self._region_column_name, self._position_column_name):
            table_spec.indexes.add(ddl.IndexSpec(column_name, postgresql_using="gist"))

    def make_records(self, region: Region | None) -> Record | None:
        # docstring inherited.

        if region is None:
            return None

        # Position is the center of the region's bounding circle.
        center = region.getBoundingCircle().getCenter()
        record: Record = {self._position_column_name: LonLat(center)}

        # Presently we can only handle polygons
        if not isinstance(region, ConvexPolygon):
            raise RegionTypeError(f"Unexpected region type: {type(region)}")

        record[self._region_column_name] = [LonLat(vertex) for vertex in region.getVertices()]
        return record