Coverage for tests / utils_tests.py: 13%
108 statements
« prev ^ index » next coverage.py v7.13.5, created at 2026-04-15 00:09 +0000
« prev ^ index » next coverage.py v7.13.5, created at 2026-04-15 00:09 +0000
1# This file is part of ap_association.
2#
3# Developed for the LSST Data Management System.
4# This product includes software developed by the LSST Project
5# (https://www.lsst.org).
6# See the COPYRIGHT file at the top-level directory of this distribution
7# for details of code ownership.
8#
9# This program is free software: you can redistribute it and/or modify
10# it under the terms of the GNU General Public License as published by
11# the Free Software Foundation, either version 3 of the License, or
12# (at your option) any later version.
13#
14# This program is distributed in the hope that it will be useful,
15# but WITHOUT ANY WARRANTY; without even the implied warranty of
16# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
17# GNU General Public License for more details.
18#
19# You should have received a copy of the GNU General Public License
20# along with this program. If not, see <https://www.gnu.org/licenses/>.
22"""Helper functions for tests of DIA catalogs, including generating mock
23catalogs for simulated APDB access.
24"""
25import astropy.units
26import pandas as pd
27import numpy as np
29from lsst.afw.cameraGeom.testUtils import DetectorWrapper
30import lsst.afw.geom as afwGeom
31import lsst.afw.image as afwImage
32from lsst.daf.base import DateTime, PropertySet
33import lsst.daf.butler as dafButler
34import lsst.geom
35from lsst.pipe.base.utils import RegionTimeInfo
36import lsst.sphgeom
38from lsst.ap.association.utils import getRegion
41def makeDiaObjects(nObjects, exposure, rng):
42 """Make a test set of DiaObjects.
44 Parameters
45 ----------
46 nObjects : `int`
47 Number of objects to create.
48 exposure : `lsst.afw.image.Exposure`
49 Exposure to create objects over.
50 rng : `numpy.random.Generator`
51 A NumPy random number generator initialized with a fixed seed for reproducibility.
53 Returns
54 -------
55 diaObjects : `pandas.DataFrame`
56 DiaObjects generated across the exposure.
57 """
58 bbox = lsst.geom.Box2D(exposure.getBBox())
59 rand_x = rng.uniform(bbox.getMinX(), bbox.getMaxX(), size=nObjects)
60 rand_y = rng.uniform(bbox.getMinY(), bbox.getMaxY(), size=nObjects)
62 data = []
63 for idx, (x, y) in enumerate(zip(rand_x, rand_y)):
64 coord = exposure.wcs.pixelToSky(x, y)
65 newObject = {"ra": coord.getRa().asDegrees(),
66 "dec": coord.getDec().asDegrees(),
67 "diaObjectId": idx + 1,
68 "nDiaSources": 5}
69 for f in ["u", "g", "r", "i", "z", "y"]:
70 newObject["%s_psfFluxNdata" % f] = 0
71 data.append(newObject)
73 return pd.DataFrame(data=data).set_index("diaObjectId", drop=False)
76def makeDiaSources(nSources, diaObjectIds, exposure, rng, randomizeObjects=False, flagList=None):
77 """Make a test set of DiaSources.
79 Parameters
80 ----------
81 nSources : `int`
82 Number of sources to create.
83 diaObjectIds : `numpy.ndarray`
84 Integer Ids of diaobjects to "associate" with the DiaSources.
85 exposure : `lsst.afw.image.Exposure`
86 Exposure to create sources over.
87 rng : `numpy.random.Generator`
88 A NumPy random number generator initialized with a fixed seed for reproducibility.
89 randomizeObjects : `bool`, optional
90 If True, randomly draw from `diaObjectIds` to generate the ids in the
91 output catalog, otherwise just iterate through them, repeating as
92 necessary to get nSources objectIds.
93 flagList : `list` of `str`, optional
94 Optional list of names of flag columns to add to the DiaSource table.
96 Returns
97 -------
98 diaSources : `pandas.DataFrame`
99 DiaSources generated across the exposure.
100 """
101 bbox = lsst.geom.Box2D(exposure.getBBox())
102 rand_x = rng.uniform(bbox.getMinX(), bbox.getMaxX(), size=nSources)
103 rand_y = rng.uniform(bbox.getMinY(), bbox.getMaxY(), size=nSources)
104 if randomizeObjects:
105 objectIds = diaObjectIds[rng.integers(len(diaObjectIds), size=nSources)]
106 else:
107 objectIds = diaObjectIds[[i % len(diaObjectIds) for i in range(nSources)]]
109 midpointMjdTai = exposure.visitInfo.date.get(system=DateTime.MJD)
111 data = []
112 flags = {}
113 if flagList is not None:
114 for flag in flagList:
115 flags[flag] = False
116 for idx, (x, y, objId) in enumerate(zip(rand_x, rand_y, objectIds)):
117 coord = exposure.wcs.pixelToSky(x, y)
118 # Put together the minimum values for the alert.
119 diaSource = {"ra": coord.getRa().asDegrees(),
120 "dec": coord.getDec().asDegrees(),
121 "x": x,
122 "y": y,
123 "visit": exposure.visitInfo.id,
124 "detector": exposure.detector.getId(),
125 "timeProcessedMjdTai": DateTime.now().get(system=DateTime.MJD,
126 scale=DateTime.TAI),
127 "diaObjectId": objId,
128 "ssObjectId": 0,
129 "parentDiaSourceId": 0,
130 "diaSourceId": idx + 1,
131 "midpointMjdTai": midpointMjdTai + 1.0 * idx,
132 "band": exposure.getFilter().bandLabel,
133 "psfNdata": 0,
134 "bboxSize": 23,
135 "trailNdata": 0,
136 "dipoleNdata": 0}
137 data.append(diaSource | flags)
139 return pd.DataFrame(data=data).set_index(["diaObjectId", "band", "diaSourceId"], drop=False)
142def makeSolarSystemSources(nSources, diaObjectIds, exposure, rng, randomizeObjects=False):
143 """Make a test set of solar system sources.
145 Parameters
146 ----------
147 nSources : `int`
148 Number of sources to create.
149 diaObjectIds : `numpy.ndarray`
150 Integer Ids of diaobjects to "associate" with the DiaSources.
151 exposure : `lsst.afw.image.Exposure`
152 Exposure to create sources over.
153 rng : `numpy.random.Generator`
154 A NumPy random number generator initialized with a fixed seed for reproducibility.
155 randomizeObjects : `bool`, optional
156 If True, randomly draw from `diaObjectIds` to generate the ids in the
157 output catalog, otherwise just iterate through them, repeating as
158 necessary to get nSources objectIds.
160 Returns
161 -------
162 solarSystemSources : `pandas.DataFrame`
163 Solar system sources generated across the exposure.
164 """
165 solarSystemSources = makeDiaSources(nSources, diaObjectIds, exposure, rng, randomizeObjects=False)
166 solarSystemSources["ssObjectId"] = rng.integers(0, high=2**63-1, size=nSources)
167 solarSystemSources["Err(arcsec)"] = rng.uniform(0.2, 0.4, size=nSources)
169 return solarSystemSources
172def makeDiaForcedSources(nForcedSources, diaObjectIds, exposure, rng, randomizeObjects=False):
173 """Make a test set of DiaSources.
175 Parameters
176 ----------
177 nForcedSources : `int`
178 Number of sources to create.
179 diaObjectIds : `numpy.ndarray`
180 Integer Ids of diaobjects to "associate" with the DiaSources.
181 exposure : `lsst.afw.image.Exposure`
182 Exposure to create sources over.
183 rng : `numpy.random.Generator`
184 A NumPy random number generator initialized with a fixed seed for reproducibility.
185 randomizeObjects : `bool`, optional
186 If True, randomly draw from `diaObjectIds` to generate the ids in the
187 output catalog, otherwise just iterate through them.
189 Returns
190 -------
191 diaForcedSources : `pandas.DataFrame`
192 DiaForcedSources generated across the exposure.
193 """
194 midpointMjdTai = exposure.visitInfo.date.get(system=DateTime.MJD)
195 visit = exposure.visitInfo.id
196 detector = exposure.detector.getId()
197 if randomizeObjects:
198 objectIds = diaObjectIds[rng.randint(len(diaObjectIds), size=nForcedSources)]
199 else:
200 objectIds = diaObjectIds[[i % len(diaObjectIds) for i in range(nForcedSources)]]
202 data = []
203 bbox = exposure.getBBox()
205 for i, objId in enumerate(objectIds):
206 # Put together the minimum values for the alert.
207 x = rng.uniform(bbox.minX, bbox.maxX)
208 y = rng.uniform(bbox.minY, bbox.maxY)
209 coord = exposure.wcs.pixelToSky(x, y)
210 data.append({"diaForcedSourceId": i + 1,
211 "visit": visit + i,
212 "detector": detector,
213 "diaObjectId": objId,
214 "ra": coord.getRa().asDegrees(),
215 "dec": coord.getDec().asDegrees(),
216 "midpointMjdTai": midpointMjdTai + 1.0 * i,
217 "timeProcessedMjdTai": DateTime.now().get(system=DateTime.MJD,
218 scale=DateTime.TAI),
219 "band": exposure.getFilter().bandLabel})
221 return pd.DataFrame(data=data).set_index(["diaObjectId", "diaForcedSourceId"], drop=False)
224def makeExposure(flipX=False, flipY=False):
225 """Create an exposure and flip the x or y (or both) coordinates.
227 Returns bounding boxes that are right or left handed around the bounding
228 polygon.
230 Parameters
231 ----------
232 flipX : `bool`
233 Flip the x coordinate in the WCS.
234 flipY : `bool`
235 Flip the y coordinate in the WCS.
237 Returns
238 -------
239 exposure : `lsst.afw.image.Exposure`
240 Exposure with a valid bounding box and wcs.
241 """
242 metadata = PropertySet()
244 metadata.set("SIMPLE", "T")
245 metadata.set("BITPIX", -32)
246 metadata.set("NAXIS", 2)
247 metadata.set("NAXIS1", 1024)
248 metadata.set("NAXIS2", 1153)
249 metadata.set("RADECSYS", 'FK5')
250 metadata.set("EQUINOX", 2000.)
252 metadata.setDouble("CRVAL1", 215.604025685476)
253 metadata.setDouble("CRVAL2", 53.1595451514076)
254 metadata.setDouble("CRPIX1", 1109.99981456774)
255 metadata.setDouble("CRPIX2", 560.018167811613)
256 metadata.set("CTYPE1", 'RA---SIN')
257 metadata.set("CTYPE2", 'DEC--SIN')
259 xFlip = 1
260 if flipX:
261 xFlip = -1
262 yFlip = 1
263 if flipY:
264 yFlip = -1
265 metadata.setDouble("CD1_1", xFlip * 5.10808596133527E-05)
266 metadata.setDouble("CD1_2", yFlip * 1.85579539217196E-07)
267 metadata.setDouble("CD2_2", yFlip * -5.10281493481982E-05)
268 metadata.setDouble("CD2_1", xFlip * -8.27440751733828E-07)
270 wcs = afwGeom.makeSkyWcs(metadata)
271 exposure = afwImage.makeExposure(
272 afwImage.makeMaskedImageFromArrays(np.ones((1024, 1153))), wcs)
273 detector = DetectorWrapper(id=23, bbox=exposure.getBBox()).detector
274 visit = afwImage.VisitInfo(
275 exposureTime=200.,
276 date=DateTime("2014-05-13T17:00:00.000000000",
277 DateTime.Timescale.TAI))
278 exposure.info.id = 1234
279 exposure.setDetector(detector)
280 exposure.info.setVisitInfo(visit)
281 exposure.setFilter(afwImage.FilterLabel(band='g'))
282 exposure.setPhotoCalib(afwImage.PhotoCalib(1., 0., exposure.getBBox()))
284 return exposure
287def makeRegionTime(exposure=None, begin=None, end=None):
288 """Make a `RegionTimeInfo` for testing
290 Parameters
291 ----------
292 exposure : `lsst.afw.image.Exposure`, optional
293 Exposure to construct a ``RegionTimeInfo`` for.
294 If None, a default exposure will be created.
295 begin : `astropy.time.Time`, optional
296 The start time of the interval.
297 If `None`, calculate the start time from the midpoint of the exposure
298 and the exposure length.
299 end : `astropy.time.Time`, optional
300 The end time of the interval.
301 If `None`, calculate the end time from the midpoint of the exposure
302 and the exposure length.
304 Returns
305 -------
306 regionTime : `lsst.pipe.base.utils.RegionTimeInfo`
307 Object containing the spatial region and temporal timespan for an exposure.
308 """
309 if exposure is None:
310 exposure = makeExposure()
311 region = getRegion(exposure)
312 expTime = exposure.visitInfo.exposureTime*astropy.units.second
313 # visitInfo time is the midpoint of the exposure.
314 if begin is None:
315 begin = exposure.visitInfo.date.toAstropy() - expTime/2
316 if end is None:
317 end = exposure.visitInfo.date.toAstropy() + expTime/2
318 timespan = dafButler.Timespan(begin=begin, end=end)
319 return RegionTimeInfo(region=region, timespan=timespan)