Coverage for tests / utils_tests.py: 14%
114 statements
« prev ^ index » next coverage.py v7.13.5, created at 2026-04-21 10:41 +0000
« prev ^ index » next coverage.py v7.13.5, created at 2026-04-21 10:41 +0000
1# This file is part of ap_association.
2#
3# Developed for the LSST Data Management System.
4# This product includes software developed by the LSST Project
5# (https://www.lsst.org).
6# See the COPYRIGHT file at the top-level directory of this distribution
7# for details of code ownership.
8#
9# This program is free software: you can redistribute it and/or modify
10# it under the terms of the GNU General Public License as published by
11# the Free Software Foundation, either version 3 of the License, or
12# (at your option) any later version.
13#
14# This program is distributed in the hope that it will be useful,
15# but WITHOUT ANY WARRANTY; without even the implied warranty of
16# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
17# GNU General Public License for more details.
18#
19# You should have received a copy of the GNU General Public License
20# along with this program. If not, see <https://www.gnu.org/licenses/>.
22"""Helper functions for tests of DIA catalogs, including generating mock
23catalogs for simulated APDB access.
24"""
25import astropy.units
26import pandas as pd
27import numpy as np
29from lsst.afw.cameraGeom.testUtils import DetectorWrapper
30from lsst.afw.coord import Observatory
31import lsst.afw.geom as afwGeom
32import lsst.afw.image as afwImage
33from lsst.daf.base import DateTime, PropertySet
34import lsst.daf.butler as dafButler
35import lsst.geom
36import lsst.meas.algorithms as measAlg
37from lsst.pipe.base.utils import RegionTimeInfo
38import lsst.sphgeom
40from lsst.ap.association.utils import getRegion
43def makeDiaObjects(nObjects, exposure, rng, startId=1):
44 """Make a test set of DiaObjects.
46 Parameters
47 ----------
48 nObjects : `int`
49 Number of objects to create.
50 exposure : `lsst.afw.image.Exposure`
51 Exposure to create objects over.
52 rng : `numpy.random.Generator`
53 A NumPy random number generator initialized with a fixed seed for reproducibility.
55 Returns
56 -------
57 diaObjects : `pandas.DataFrame`
58 DiaObjects generated across the exposure.
59 """
60 bbox = lsst.geom.Box2D(exposure.getBBox())
61 rand_x = rng.uniform(bbox.getMinX(), bbox.getMaxX(), size=nObjects)
62 rand_y = rng.uniform(bbox.getMinY(), bbox.getMaxY(), size=nObjects)
64 data = []
65 for idx, (x, y) in enumerate(zip(rand_x, rand_y)):
66 coord = exposure.wcs.pixelToSky(x, y)
67 newObject = {"ra": coord.getRa().asDegrees(),
68 "dec": coord.getDec().asDegrees(),
69 "diaObjectId": idx + startId,
70 "nDiaSources": 5}
71 for f in ["u", "g", "r", "i", "z", "y"]:
72 newObject["%s_psfFluxNdata" % f] = 0
73 data.append(newObject)
75 return pd.DataFrame(data=data).set_index("diaObjectId", drop=False)
78def makeDiaSources(nSources, diaObjectIds, exposure, rng, randomizeObjects=False, flagList=None, startId=1):
79 """Make a test set of DiaSources.
81 Parameters
82 ----------
83 nSources : `int`
84 Number of sources to create.
85 diaObjectIds : `numpy.ndarray`
86 Integer Ids of diaobjects to "associate" with the DiaSources.
87 exposure : `lsst.afw.image.Exposure`
88 Exposure to create sources over.
89 rng : `numpy.random.Generator`
90 A NumPy random number generator initialized with a fixed seed for reproducibility.
91 randomizeObjects : `bool`, optional
92 If True, randomly draw from `diaObjectIds` to generate the ids in the
93 output catalog, otherwise just iterate through them, repeating as
94 necessary to get nSources objectIds.
95 flagList : `list` of `str`, optional
96 Optional list of names of flag columns to add to the DiaSource table.
98 Returns
99 -------
100 diaSources : `pandas.DataFrame`
101 DiaSources generated across the exposure.
102 """
103 bbox = lsst.geom.Box2D(exposure.getBBox())
104 rand_x = rng.uniform(bbox.getMinX(), bbox.getMaxX(), size=nSources)
105 rand_y = rng.uniform(bbox.getMinY(), bbox.getMaxY(), size=nSources)
106 if randomizeObjects:
107 objectIds = diaObjectIds[rng.integers(len(diaObjectIds), size=nSources)]
108 else:
109 objectIds = diaObjectIds[[i % len(diaObjectIds) for i in range(nSources)]]
111 midpointMjdTai = exposure.visitInfo.date.get(system=DateTime.MJD)
113 data = []
114 flags = {}
115 if flagList is not None:
116 for flag in flagList:
117 flags[flag] = False
118 for idx, (x, y, objId) in enumerate(zip(rand_x, rand_y, objectIds)):
119 coord = exposure.wcs.pixelToSky(x, y)
120 # Put together the minimum values for the alert.
121 diaSource = {"ra": coord.getRa().asDegrees(),
122 "dec": coord.getDec().asDegrees(),
123 "x": x,
124 "y": y,
125 "visit": exposure.visitInfo.id,
126 "detector": exposure.detector.getId(),
127 "timeProcessedMjdTai": DateTime.now().get(system=DateTime.MJD,
128 scale=DateTime.TAI),
129 "diaObjectId": objId,
130 "ssObjectId": 0,
131 "parentDiaSourceId": 0,
132 "diaSourceId": idx + startId,
133 "midpointMjdTai": midpointMjdTai + 1.0 * idx,
134 "band": exposure.getFilter().bandLabel,
135 "psfNdata": 0,
136 "bboxSize": 23,
137 "trailNdata": 0,
138 "dipoleNdata": 0}
139 data.append(diaSource | flags)
141 return pd.DataFrame(data=data).set_index(["diaObjectId", "band", "diaSourceId"], drop=False)
144def makeSolarSystemSources(nSources, diaObjectIds, exposure, rng, randomizeObjects=False, startId=1):
145 """Make a test set of solar system sources.
147 Parameters
148 ----------
149 nSources : `int`
150 Number of sources to create.
151 diaObjectIds : `numpy.ndarray`
152 Integer Ids of diaobjects to "associate" with the DiaSources.
153 exposure : `lsst.afw.image.Exposure`
154 Exposure to create sources over.
155 rng : `numpy.random.Generator`
156 A NumPy random number generator initialized with a fixed seed for reproducibility.
157 randomizeObjects : `bool`, optional
158 If True, randomly draw from `diaObjectIds` to generate the ids in the
159 output catalog, otherwise just iterate through them, repeating as
160 necessary to get nSources objectIds.
162 Returns
163 -------
164 solarSystemSources : `pandas.DataFrame`
165 Solar system sources generated across the exposure.
166 """
167 solarSystemSources = makeDiaSources(nSources, diaObjectIds, exposure, rng,
168 randomizeObjects=False,
169 startId=startId)
170 solarSystemSources["ssObjectId"] = rng.integers(0, high=2**63-1, size=nSources)
171 solarSystemSources["Err(arcsec)"] = rng.uniform(0.2, 0.4, size=nSources)
173 return solarSystemSources
176def makeDiaForcedSources(nForcedSources, diaObjectIds, exposure, rng, randomizeObjects=False, startId=1):
177 """Make a test set of DiaSources.
179 Parameters
180 ----------
181 nForcedSources : `int`
182 Number of sources to create.
183 diaObjectIds : `numpy.ndarray`
184 Integer Ids of diaobjects to "associate" with the DiaSources.
185 exposure : `lsst.afw.image.Exposure`
186 Exposure to create sources over.
187 rng : `numpy.random.Generator`
188 A NumPy random number generator initialized with a fixed seed for reproducibility.
189 randomizeObjects : `bool`, optional
190 If True, randomly draw from `diaObjectIds` to generate the ids in the
191 output catalog, otherwise just iterate through them.
193 Returns
194 -------
195 diaForcedSources : `pandas.DataFrame`
196 DiaForcedSources generated across the exposure.
197 """
198 midpointMjdTai = exposure.visitInfo.date.get(system=DateTime.MJD)
199 visit = exposure.visitInfo.id
200 detector = exposure.detector.getId()
201 if randomizeObjects:
202 objectIds = diaObjectIds[rng.randint(len(diaObjectIds), size=nForcedSources)]
203 else:
204 objectIds = diaObjectIds[[i % len(diaObjectIds) for i in range(nForcedSources)]]
206 data = []
207 bbox = exposure.getBBox()
209 for i, objId in enumerate(objectIds):
210 # Put together the minimum values for the alert.
211 x = rng.uniform(bbox.minX, bbox.maxX)
212 y = rng.uniform(bbox.minY, bbox.maxY)
213 coord = exposure.wcs.pixelToSky(x, y)
214 data.append({"diaForcedSourceId": i + startId,
215 "visit": visit + i,
216 "detector": detector,
217 "diaObjectId": objId,
218 "ra": coord.getRa().asDegrees(),
219 "dec": coord.getDec().asDegrees(),
220 "midpointMjdTai": midpointMjdTai + 1.0 * i,
221 "timeProcessedMjdTai": DateTime.now().get(system=DateTime.MJD,
222 scale=DateTime.TAI),
223 "band": exposure.getFilter().bandLabel})
225 return pd.DataFrame(data=data).set_index(["diaObjectId", "diaForcedSourceId"], drop=False)
228def makeExposure(flipX=False, flipY=False):
229 """Create an exposure and flip the x or y (or both) coordinates.
231 Returns bounding boxes that are right or left handed around the bounding
232 polygon.
234 Parameters
235 ----------
236 flipX : `bool`
237 Flip the x coordinate in the WCS.
238 flipY : `bool`
239 Flip the y coordinate in the WCS.
241 Returns
242 -------
243 exposure : `lsst.afw.image.Exposure`
244 Exposure with a valid bounding box and wcs.
245 """
246 metadata = PropertySet()
248 metadata.set("SIMPLE", "T")
249 metadata.set("BITPIX", -32)
250 metadata.set("NAXIS", 2)
251 metadata.set("NAXIS1", 1024)
252 metadata.set("NAXIS2", 1153)
253 metadata.set("RADECSYS", 'FK5')
254 metadata.set("EQUINOX", 2000.)
255 ra = 215.604025685476
256 dec = 53.1595451514076
258 metadata.setDouble("CRVAL1", ra)
259 metadata.setDouble("CRVAL2", dec)
260 metadata.setDouble("CRPIX1", 1109.99981456774)
261 metadata.setDouble("CRPIX2", 560.018167811613)
262 metadata.set("CTYPE1", 'RA---SIN')
263 metadata.set("CTYPE2", 'DEC--SIN')
265 xFlip = 1
266 if flipX:
267 xFlip = -1
268 yFlip = 1
269 if flipY:
270 yFlip = -1
271 metadata.setDouble("CD1_1", xFlip * 5.10808596133527E-05)
272 metadata.setDouble("CD1_2", yFlip * 1.85579539217196E-07)
273 metadata.setDouble("CD2_2", yFlip * -5.10281493481982E-05)
274 metadata.setDouble("CD2_1", xFlip * -8.27440751733828E-07)
276 wcs = afwGeom.makeSkyWcs(metadata)
277 exposure = afwImage.makeExposure(
278 afwImage.makeMaskedImageFromArrays(np.ones((1024, 1153))), wcs)
279 detector = DetectorWrapper(id=23, bbox=exposure.getBBox()).detector
280 visit = afwImage.VisitInfo(
281 exposureTime=200.,
282 date=DateTime("2014-05-13T17:00:00.000000000",
283 DateTime.Timescale.TAI),
284 boresightRaDec=lsst.geom.SpherePoint(ra, dec, lsst.geom.degrees),
285 boresightRotAngle=73.2*lsst.geom.degrees,
286 rotType=afwImage.RotType.SKY,
287 observatory=Observatory(11.1*lsst.geom.degrees, 22.2*lsst.geom.degrees, 0.333),
288 )
289 kernel = measAlg.DoubleGaussianPsf(7, 7, 2.0).getKernel()
290 exposure.setPsf(measAlg.KernelPsf(kernel, exposure.getBBox().getCenter()))
291 exposure.info.id = 1234
292 exposure.setDetector(detector)
293 exposure.info.setVisitInfo(visit)
294 exposure.setFilter(afwImage.FilterLabel(band='g'))
295 exposure.setPhotoCalib(afwImage.PhotoCalib(1., 0., exposure.getBBox()))
297 return exposure
300def makeRegionTime(exposure=None, begin=None, end=None):
301 """Make a `RegionTimeInfo` for testing
303 Parameters
304 ----------
305 exposure : `lsst.afw.image.Exposure`, optional
306 Exposure to construct a ``RegionTimeInfo`` for.
307 If None, a default exposure will be created.
308 begin : `astropy.time.Time`, optional
309 The start time of the interval.
310 If `None`, calculate the start time from the midpoint of the exposure
311 and the exposure length.
312 end : `astropy.time.Time`, optional
313 The end time of the interval.
314 If `None`, calculate the end time from the midpoint of the exposure
315 and the exposure length.
317 Returns
318 -------
319 regionTime : `lsst.pipe.base.utils.RegionTimeInfo`
320 Object containing the spatial region and temporal timespan for an exposure.
321 """
322 if exposure is None:
323 exposure = makeExposure()
324 region = getRegion(exposure)
325 expTime = exposure.visitInfo.exposureTime*astropy.units.second
326 # visitInfo time is the midpoint of the exposure.
327 if begin is None:
328 begin = exposure.visitInfo.date.toAstropy() - expTime/2
329 if end is None:
330 end = exposure.visitInfo.date.toAstropy() + expTime/2
331 timespan = dafButler.Timespan(begin=begin, end=end)
332 return RegionTimeInfo(region=region, timespan=timespan)