Coverage for python / lsst / ap / association / utils.py: 24%
42 statements
« prev ^ index » next coverage.py v7.13.5, created at 2026-04-17 09:22 +0000
« prev ^ index » next coverage.py v7.13.5, created at 2026-04-17 09:22 +0000
1# This file is part of ap_association.
2#
3# Developed for the LSST Data Management System.
4# This product includes software developed by the LSST Project
5# (https://www.lsst.org).
6# See the COPYRIGHT file at the top-level directory of this distribution
7# for details of code ownership.
8#
9# This program is free software: you can redistribute it and/or modify
10# it under the terms of the GNU General Public License as published by
11# the Free Software Foundation, either version 3 of the License, or
12# (at your option) any later version.
13#
14# This program is distributed in the hope that it will be useful,
15# but WITHOUT ANY WARRANTY; without even the implied warranty of
16# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
17# GNU General Public License for more details.
18#
19# You should have received a copy of the GNU General Public License
20# along with this program. If not, see <https://www.gnu.org/licenses/>.
22"""Utilities for working with the APDB.
23"""
24__all__ = ("getMidpointFromTimespan", "makeEmptyForcedSourceTable", "getRegion", "paddedRegion",
25 "readSchemaFromApdb")
28import math
29import pandas as pd
31from lsst.daf.butler import Timespan
32from lsst.dax.apdb import Apdb, ApdbTables, schema_model
33import lsst.geom
34import lsst.sphgeom
36from lsst.pipe.tasks.schemaUtils import convertDataFrameToSdmSchema
39def readSchemaFromApdb(apdb: Apdb) -> dict[str, schema_model.Table | None]:
40 """Extract the schema from an APDB instance.
42 Parameters
43 ----------
44 apdb : `lsst.dax.apdb.Apdb`
45 AP database connection object.
47 Returns
48 -------
49 schemaTable : dict[str, schema_model.Table | None]
50 A dict of the schemas in the given table defined in the specified file.
51 """
52 return {table.table_name(): apdb.tableDef(table) for table in ApdbTables}
55def getMidpointFromTimespan(timespan, allowUnbounded=True):
56 """Safely retrieve the midpoint in TAI from a Timespan.
58 Parameters
59 ----------
60 timespan : `lsst.daf.butler.Timespan` of `astropy.time.Time`
61 A Timespan centered on the midpoint of a visit.
62 allowUnbounded : `bool`, optional
63 If set, return the start or end of an unbounded timespan.
65 Returns
66 -------
67 midpoint : `astropy.time.Time`
68 The midpoint of the timespan.
70 Raises
71 ------
72 ValueError
73 Raised if either the start or end of the timespan is None, and
74 ``allowUnbounded`` is not set.
75 ValueError
76 Raised if the timespan is empty.
77 """
78 if (timespan.begin == Timespan.EMPTY) or (timespan.begin == Timespan.EMPTY):
79 raise ValueError("Cannot compute midpoint: EMPTY Timespan.")
81 try:
82 interval = timespan.end - timespan.begin
83 return (timespan.begin + interval/2).tai
84 except TypeError as e:
85 if allowUnbounded:
86 if timespan.end is not None:
87 return timespan.end.tai
88 elif timespan.begin is not None:
89 return timespan.begin.tai
90 else:
91 raise ValueError("Cannot compute midpoint: unbounded timespan.") from e
92 else:
93 raise ValueError("Cannot compute midpoint: unbounded timespan.") from e
96def makeEmptyForcedSourceTable(schema):
97 """Return a dataframe with the correct columns for diaForcedSources table.
99 Returns
100 -------
101 diaForcedSources : `pandas.DataFrame`
102 Empty dataframe.
103 """
104 diaForcedSources = convertDataFrameToSdmSchema(schema, pd.DataFrame(), tableName="DiaForcedSource")
105 return diaForcedSources
108def getRegion(exposure):
109 """Calculate an enveloping region for an exposure.
111 Parameters
112 ----------
113 exposure : `lsst.afw.image.Exposure`
114 Exposure object with calibrated WCS.
116 Returns
117 -------
118 region : `lsst.sphgeom.Region`
119 Region enveloping an exposure.
120 """
121 # Bounding box needs to be a `Box2D` not a `Box2I` for `wcs.pixelToSky()`
122 bbox = lsst.geom.Box2D(exposure.getBBox())
123 wcs = exposure.getWcs()
125 region = lsst.sphgeom.ConvexPolygon([pp.getVector() for pp in wcs.pixelToSky(bbox.getCorners())])
127 return region
130def paddedRegion(region, margin):
131 """Return a region that has been expanded by a buffer.
133 Parameters
134 ----------
135 region : `lsst.sphgeom.Region`
136 The region to pad.
137 margin : `lsst.sphgeom.Angle`
138 The amount by which to increase the region.
140 Returns
141 -------
142 padded : `lsst.sphgeom.Region`
143 An enlarged copy of ``region``.
144 """
145 # region is almost certainly a (padded) detector bounding box.
146 if isinstance(region, lsst.sphgeom.ConvexPolygon):
147 # This is an ad-hoc, approximate implementation. It should be good
148 # enough for catalog loading, but is not a general-purpose solution.
149 center = lsst.geom.SpherePoint(region.getCentroid())
150 corners = [lsst.geom.SpherePoint(c) for c in region.getVertices()]
151 # Approximate the region as a Euclidian square
152 # geom.Angle(sphgeom.Angle) converter not pybind-wrapped???
153 diagonal_margin = lsst.geom.Angle(margin.asRadians() * math.sqrt(2.0))
154 padded = [c.offset(center.bearingTo(c), diagonal_margin) for c in corners]
155 return lsst.sphgeom.ConvexPolygon.convexHull([c.getVector() for c in padded])
156 elif hasattr(region, "dilatedBy"):
157 return region.dilatedBy(margin)
158 else:
159 return region.getBoundingCircle().dilatedBy(margin)