Coverage for tests / utils_tests.py: 14%

114 statements  

« prev     ^ index     » next       coverage.py v7.13.5, created at 2026-04-26 09:21 +0000

1# This file is part of ap_association. 

2# 

3# Developed for the LSST Data Management System. 

4# This product includes software developed by the LSST Project 

5# (https://www.lsst.org). 

6# See the COPYRIGHT file at the top-level directory of this distribution 

7# for details of code ownership. 

8# 

9# This program is free software: you can redistribute it and/or modify 

10# it under the terms of the GNU General Public License as published by 

11# the Free Software Foundation, either version 3 of the License, or 

12# (at your option) any later version. 

13# 

14# This program is distributed in the hope that it will be useful, 

15# but WITHOUT ANY WARRANTY; without even the implied warranty of 

16# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the 

17# GNU General Public License for more details. 

18# 

19# You should have received a copy of the GNU General Public License 

20# along with this program. If not, see <https://www.gnu.org/licenses/>. 

21 

22"""Helper functions for tests of DIA catalogs, including generating mock 

23catalogs for simulated APDB access. 

24""" 

25import astropy.units 

26import pandas as pd 

27import numpy as np 

28 

29from lsst.afw.cameraGeom.testUtils import DetectorWrapper 

30from lsst.afw.coord import Observatory 

31import lsst.afw.geom as afwGeom 

32import lsst.afw.image as afwImage 

33from lsst.daf.base import DateTime, PropertySet 

34import lsst.daf.butler as dafButler 

35import lsst.geom 

36import lsst.meas.algorithms as measAlg 

37from lsst.pipe.base.utils import RegionTimeInfo 

38import lsst.sphgeom 

39 

40from lsst.ap.association.utils import getRegion 

41 

42 

43def makeDiaObjects(nObjects, exposure, rng, startId=1): 

44 """Make a test set of DiaObjects. 

45 

46 Parameters 

47 ---------- 

48 nObjects : `int` 

49 Number of objects to create. 

50 exposure : `lsst.afw.image.Exposure` 

51 Exposure to create objects over. 

52 rng : `numpy.random.Generator` 

53 A NumPy random number generator initialized with a fixed seed for reproducibility. 

54 

55 Returns 

56 ------- 

57 diaObjects : `pandas.DataFrame` 

58 DiaObjects generated across the exposure. 

59 """ 

60 bbox = lsst.geom.Box2D(exposure.getBBox()) 

61 rand_x = rng.uniform(bbox.getMinX(), bbox.getMaxX(), size=nObjects) 

62 rand_y = rng.uniform(bbox.getMinY(), bbox.getMaxY(), size=nObjects) 

63 

64 data = [] 

65 for idx, (x, y) in enumerate(zip(rand_x, rand_y)): 

66 coord = exposure.wcs.pixelToSky(x, y) 

67 newObject = {"ra": coord.getRa().asDegrees(), 

68 "dec": coord.getDec().asDegrees(), 

69 "diaObjectId": idx + startId, 

70 "nDiaSources": 5} 

71 for f in ["u", "g", "r", "i", "z", "y"]: 

72 newObject["%s_psfFluxNdata" % f] = 0 

73 data.append(newObject) 

74 

75 return pd.DataFrame(data=data).set_index("diaObjectId", drop=False) 

76 

77 

78def makeDiaSources(nSources, diaObjectIds, exposure, rng, randomizeObjects=False, flagList=None, startId=1): 

79 """Make a test set of DiaSources. 

80 

81 Parameters 

82 ---------- 

83 nSources : `int` 

84 Number of sources to create. 

85 diaObjectIds : `numpy.ndarray` 

86 Integer Ids of diaobjects to "associate" with the DiaSources. 

87 exposure : `lsst.afw.image.Exposure` 

88 Exposure to create sources over. 

89 rng : `numpy.random.Generator` 

90 A NumPy random number generator initialized with a fixed seed for reproducibility. 

91 randomizeObjects : `bool`, optional 

92 If True, randomly draw from `diaObjectIds` to generate the ids in the 

93 output catalog, otherwise just iterate through them, repeating as 

94 necessary to get nSources objectIds. 

95 flagList : `list` of `str`, optional 

96 Optional list of names of flag columns to add to the DiaSource table. 

97 

98 Returns 

99 ------- 

100 diaSources : `pandas.DataFrame` 

101 DiaSources generated across the exposure. 

102 """ 

103 bbox = lsst.geom.Box2D(exposure.getBBox()) 

104 rand_x = rng.uniform(bbox.getMinX(), bbox.getMaxX(), size=nSources) 

105 rand_y = rng.uniform(bbox.getMinY(), bbox.getMaxY(), size=nSources) 

106 if randomizeObjects: 

107 objectIds = diaObjectIds[rng.integers(len(diaObjectIds), size=nSources)] 

108 else: 

109 objectIds = diaObjectIds[[i % len(diaObjectIds) for i in range(nSources)]] 

110 

111 midpointMjdTai = exposure.visitInfo.date.get(system=DateTime.MJD) 

112 

113 data = [] 

114 flags = {} 

115 if flagList is not None: 

116 for flag in flagList: 

117 flags[flag] = False 

118 for idx, (x, y, objId) in enumerate(zip(rand_x, rand_y, objectIds)): 

119 coord = exposure.wcs.pixelToSky(x, y) 

120 # Put together the minimum values for the alert. 

121 diaSource = {"ra": coord.getRa().asDegrees(), 

122 "dec": coord.getDec().asDegrees(), 

123 "x": x, 

124 "y": y, 

125 "visit": exposure.visitInfo.id, 

126 "detector": exposure.detector.getId(), 

127 "timeProcessedMjdTai": DateTime.now().get(system=DateTime.MJD, 

128 scale=DateTime.TAI), 

129 "diaObjectId": objId, 

130 "ssObjectId": 0, 

131 "parentDiaSourceId": 0, 

132 "diaSourceId": idx + startId, 

133 "midpointMjdTai": midpointMjdTai + 1.0 * idx, 

134 "band": exposure.getFilter().bandLabel, 

135 "psfNdata": 0, 

136 "bboxSize": 23, 

137 "trailNdata": 0, 

138 "dipoleNdata": 0} 

139 data.append(diaSource | flags) 

140 

141 return pd.DataFrame(data=data).set_index(["diaObjectId", "band", "diaSourceId"], drop=False) 

142 

143 

144def makeSolarSystemSources(nSources, diaObjectIds, exposure, rng, randomizeObjects=False, startId=1): 

145 """Make a test set of solar system sources. 

146 

147 Parameters 

148 ---------- 

149 nSources : `int` 

150 Number of sources to create. 

151 diaObjectIds : `numpy.ndarray` 

152 Integer Ids of diaobjects to "associate" with the DiaSources. 

153 exposure : `lsst.afw.image.Exposure` 

154 Exposure to create sources over. 

155 rng : `numpy.random.Generator` 

156 A NumPy random number generator initialized with a fixed seed for reproducibility. 

157 randomizeObjects : `bool`, optional 

158 If True, randomly draw from `diaObjectIds` to generate the ids in the 

159 output catalog, otherwise just iterate through them, repeating as 

160 necessary to get nSources objectIds. 

161 

162 Returns 

163 ------- 

164 solarSystemSources : `pandas.DataFrame` 

165 Solar system sources generated across the exposure. 

166 """ 

167 solarSystemSources = makeDiaSources(nSources, diaObjectIds, exposure, rng, 

168 randomizeObjects=False, 

169 startId=startId) 

170 solarSystemSources["ssObjectId"] = rng.integers(0, high=2**63-1, size=nSources) 

171 solarSystemSources["Err(arcsec)"] = rng.uniform(0.2, 0.4, size=nSources) 

172 

173 return solarSystemSources 

174 

175 

176def makeDiaForcedSources(nForcedSources, diaObjectIds, exposure, rng, randomizeObjects=False, startId=1): 

177 """Make a test set of DiaSources. 

178 

179 Parameters 

180 ---------- 

181 nForcedSources : `int` 

182 Number of sources to create. 

183 diaObjectIds : `numpy.ndarray` 

184 Integer Ids of diaobjects to "associate" with the DiaSources. 

185 exposure : `lsst.afw.image.Exposure` 

186 Exposure to create sources over. 

187 rng : `numpy.random.Generator` 

188 A NumPy random number generator initialized with a fixed seed for reproducibility. 

189 randomizeObjects : `bool`, optional 

190 If True, randomly draw from `diaObjectIds` to generate the ids in the 

191 output catalog, otherwise just iterate through them. 

192 

193 Returns 

194 ------- 

195 diaForcedSources : `pandas.DataFrame` 

196 DiaForcedSources generated across the exposure. 

197 """ 

198 midpointMjdTai = exposure.visitInfo.date.get(system=DateTime.MJD) 

199 visit = exposure.visitInfo.id 

200 detector = exposure.detector.getId() 

201 if randomizeObjects: 

202 objectIds = diaObjectIds[rng.randint(len(diaObjectIds), size=nForcedSources)] 

203 else: 

204 objectIds = diaObjectIds[[i % len(diaObjectIds) for i in range(nForcedSources)]] 

205 

206 data = [] 

207 bbox = exposure.getBBox() 

208 

209 for i, objId in enumerate(objectIds): 

210 # Put together the minimum values for the alert. 

211 x = rng.uniform(bbox.minX, bbox.maxX) 

212 y = rng.uniform(bbox.minY, bbox.maxY) 

213 coord = exposure.wcs.pixelToSky(x, y) 

214 data.append({"diaForcedSourceId": i + startId, 

215 "visit": visit + i, 

216 "detector": detector, 

217 "diaObjectId": objId, 

218 "ra": coord.getRa().asDegrees(), 

219 "dec": coord.getDec().asDegrees(), 

220 "midpointMjdTai": midpointMjdTai + 1.0 * i, 

221 "timeProcessedMjdTai": DateTime.now().get(system=DateTime.MJD, 

222 scale=DateTime.TAI), 

223 "band": exposure.getFilter().bandLabel}) 

224 

225 return pd.DataFrame(data=data).set_index(["diaObjectId", "diaForcedSourceId"], drop=False) 

226 

227 

228def makeExposure(flipX=False, flipY=False): 

229 """Create an exposure and flip the x or y (or both) coordinates. 

230 

231 Returns bounding boxes that are right or left handed around the bounding 

232 polygon. 

233 

234 Parameters 

235 ---------- 

236 flipX : `bool` 

237 Flip the x coordinate in the WCS. 

238 flipY : `bool` 

239 Flip the y coordinate in the WCS. 

240 

241 Returns 

242 ------- 

243 exposure : `lsst.afw.image.Exposure` 

244 Exposure with a valid bounding box and wcs. 

245 """ 

246 metadata = PropertySet() 

247 

248 metadata.set("SIMPLE", "T") 

249 metadata.set("BITPIX", -32) 

250 metadata.set("NAXIS", 2) 

251 metadata.set("NAXIS1", 1024) 

252 metadata.set("NAXIS2", 1153) 

253 metadata.set("RADECSYS", 'FK5') 

254 metadata.set("EQUINOX", 2000.) 

255 ra = 215.604025685476 

256 dec = 53.1595451514076 

257 

258 metadata.setDouble("CRVAL1", ra) 

259 metadata.setDouble("CRVAL2", dec) 

260 metadata.setDouble("CRPIX1", 1109.99981456774) 

261 metadata.setDouble("CRPIX2", 560.018167811613) 

262 metadata.set("CTYPE1", 'RA---SIN') 

263 metadata.set("CTYPE2", 'DEC--SIN') 

264 

265 xFlip = 1 

266 if flipX: 

267 xFlip = -1 

268 yFlip = 1 

269 if flipY: 

270 yFlip = -1 

271 metadata.setDouble("CD1_1", xFlip * 5.10808596133527E-05) 

272 metadata.setDouble("CD1_2", yFlip * 1.85579539217196E-07) 

273 metadata.setDouble("CD2_2", yFlip * -5.10281493481982E-05) 

274 metadata.setDouble("CD2_1", xFlip * -8.27440751733828E-07) 

275 

276 wcs = afwGeom.makeSkyWcs(metadata) 

277 exposure = afwImage.makeExposure( 

278 afwImage.makeMaskedImageFromArrays(np.ones((1024, 1153))), wcs) 

279 detector = DetectorWrapper(id=23, bbox=exposure.getBBox()).detector 

280 visit = afwImage.VisitInfo( 

281 exposureTime=200., 

282 date=DateTime("2014-05-13T17:00:00.000000000", 

283 DateTime.Timescale.TAI), 

284 boresightRaDec=lsst.geom.SpherePoint(ra, dec, lsst.geom.degrees), 

285 boresightRotAngle=73.2*lsst.geom.degrees, 

286 rotType=afwImage.RotType.SKY, 

287 observatory=Observatory(11.1*lsst.geom.degrees, 22.2*lsst.geom.degrees, 0.333), 

288 ) 

289 kernel = measAlg.DoubleGaussianPsf(7, 7, 2.0).getKernel() 

290 exposure.setPsf(measAlg.KernelPsf(kernel, exposure.getBBox().getCenter())) 

291 exposure.info.id = 1234 

292 exposure.setDetector(detector) 

293 exposure.info.setVisitInfo(visit) 

294 exposure.setFilter(afwImage.FilterLabel(band='g')) 

295 exposure.setPhotoCalib(afwImage.PhotoCalib(1., 0., exposure.getBBox())) 

296 

297 return exposure 

298 

299 

300def makeRegionTime(exposure=None, begin=None, end=None): 

301 """Make a `RegionTimeInfo` for testing 

302 

303 Parameters 

304 ---------- 

305 exposure : `lsst.afw.image.Exposure`, optional 

306 Exposure to construct a ``RegionTimeInfo`` for. 

307 If None, a default exposure will be created. 

308 begin : `astropy.time.Time`, optional 

309 The start time of the interval. 

310 If `None`, calculate the start time from the midpoint of the exposure 

311 and the exposure length. 

312 end : `astropy.time.Time`, optional 

313 The end time of the interval. 

314 If `None`, calculate the end time from the midpoint of the exposure 

315 and the exposure length. 

316 

317 Returns 

318 ------- 

319 regionTime : `lsst.pipe.base.utils.RegionTimeInfo` 

320 Object containing the spatial region and temporal timespan for an exposure. 

321 """ 

322 if exposure is None: 

323 exposure = makeExposure() 

324 region = getRegion(exposure) 

325 expTime = exposure.visitInfo.exposureTime*astropy.units.second 

326 # visitInfo time is the midpoint of the exposure. 

327 if begin is None: 

328 begin = exposure.visitInfo.date.toAstropy() - expTime/2 

329 if end is None: 

330 end = exposure.visitInfo.date.toAstropy() + expTime/2 

331 timespan = dafButler.Timespan(begin=begin, end=end) 

332 return RegionTimeInfo(region=region, timespan=timespan)