Coverage for tests / utils_tests.py: 13%

108 statements  

« prev     ^ index     » next       coverage.py v7.13.5, created at 2026-04-15 00:09 +0000

1# This file is part of ap_association. 

2# 

3# Developed for the LSST Data Management System. 

4# This product includes software developed by the LSST Project 

5# (https://www.lsst.org). 

6# See the COPYRIGHT file at the top-level directory of this distribution 

7# for details of code ownership. 

8# 

9# This program is free software: you can redistribute it and/or modify 

10# it under the terms of the GNU General Public License as published by 

11# the Free Software Foundation, either version 3 of the License, or 

12# (at your option) any later version. 

13# 

14# This program is distributed in the hope that it will be useful, 

15# but WITHOUT ANY WARRANTY; without even the implied warranty of 

16# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the 

17# GNU General Public License for more details. 

18# 

19# You should have received a copy of the GNU General Public License 

20# along with this program. If not, see <https://www.gnu.org/licenses/>. 

21 

22"""Helper functions for tests of DIA catalogs, including generating mock 

23catalogs for simulated APDB access. 

24""" 

25import astropy.units 

26import pandas as pd 

27import numpy as np 

28 

29from lsst.afw.cameraGeom.testUtils import DetectorWrapper 

30import lsst.afw.geom as afwGeom 

31import lsst.afw.image as afwImage 

32from lsst.daf.base import DateTime, PropertySet 

33import lsst.daf.butler as dafButler 

34import lsst.geom 

35from lsst.pipe.base.utils import RegionTimeInfo 

36import lsst.sphgeom 

37 

38from lsst.ap.association.utils import getRegion 

39 

40 

41def makeDiaObjects(nObjects, exposure, rng): 

42 """Make a test set of DiaObjects. 

43 

44 Parameters 

45 ---------- 

46 nObjects : `int` 

47 Number of objects to create. 

48 exposure : `lsst.afw.image.Exposure` 

49 Exposure to create objects over. 

50 rng : `numpy.random.Generator` 

51 A NumPy random number generator initialized with a fixed seed for reproducibility. 

52 

53 Returns 

54 ------- 

55 diaObjects : `pandas.DataFrame` 

56 DiaObjects generated across the exposure. 

57 """ 

58 bbox = lsst.geom.Box2D(exposure.getBBox()) 

59 rand_x = rng.uniform(bbox.getMinX(), bbox.getMaxX(), size=nObjects) 

60 rand_y = rng.uniform(bbox.getMinY(), bbox.getMaxY(), size=nObjects) 

61 

62 data = [] 

63 for idx, (x, y) in enumerate(zip(rand_x, rand_y)): 

64 coord = exposure.wcs.pixelToSky(x, y) 

65 newObject = {"ra": coord.getRa().asDegrees(), 

66 "dec": coord.getDec().asDegrees(), 

67 "diaObjectId": idx + 1, 

68 "nDiaSources": 5} 

69 for f in ["u", "g", "r", "i", "z", "y"]: 

70 newObject["%s_psfFluxNdata" % f] = 0 

71 data.append(newObject) 

72 

73 return pd.DataFrame(data=data).set_index("diaObjectId", drop=False) 

74 

75 

76def makeDiaSources(nSources, diaObjectIds, exposure, rng, randomizeObjects=False, flagList=None): 

77 """Make a test set of DiaSources. 

78 

79 Parameters 

80 ---------- 

81 nSources : `int` 

82 Number of sources to create. 

83 diaObjectIds : `numpy.ndarray` 

84 Integer Ids of diaobjects to "associate" with the DiaSources. 

85 exposure : `lsst.afw.image.Exposure` 

86 Exposure to create sources over. 

87 rng : `numpy.random.Generator` 

88 A NumPy random number generator initialized with a fixed seed for reproducibility. 

89 randomizeObjects : `bool`, optional 

90 If True, randomly draw from `diaObjectIds` to generate the ids in the 

91 output catalog, otherwise just iterate through them, repeating as 

92 necessary to get nSources objectIds. 

93 flagList : `list` of `str`, optional 

94 Optional list of names of flag columns to add to the DiaSource table. 

95 

96 Returns 

97 ------- 

98 diaSources : `pandas.DataFrame` 

99 DiaSources generated across the exposure. 

100 """ 

101 bbox = lsst.geom.Box2D(exposure.getBBox()) 

102 rand_x = rng.uniform(bbox.getMinX(), bbox.getMaxX(), size=nSources) 

103 rand_y = rng.uniform(bbox.getMinY(), bbox.getMaxY(), size=nSources) 

104 if randomizeObjects: 

105 objectIds = diaObjectIds[rng.integers(len(diaObjectIds), size=nSources)] 

106 else: 

107 objectIds = diaObjectIds[[i % len(diaObjectIds) for i in range(nSources)]] 

108 

109 midpointMjdTai = exposure.visitInfo.date.get(system=DateTime.MJD) 

110 

111 data = [] 

112 flags = {} 

113 if flagList is not None: 

114 for flag in flagList: 

115 flags[flag] = False 

116 for idx, (x, y, objId) in enumerate(zip(rand_x, rand_y, objectIds)): 

117 coord = exposure.wcs.pixelToSky(x, y) 

118 # Put together the minimum values for the alert. 

119 diaSource = {"ra": coord.getRa().asDegrees(), 

120 "dec": coord.getDec().asDegrees(), 

121 "x": x, 

122 "y": y, 

123 "visit": exposure.visitInfo.id, 

124 "detector": exposure.detector.getId(), 

125 "timeProcessedMjdTai": DateTime.now().get(system=DateTime.MJD, 

126 scale=DateTime.TAI), 

127 "diaObjectId": objId, 

128 "ssObjectId": 0, 

129 "parentDiaSourceId": 0, 

130 "diaSourceId": idx + 1, 

131 "midpointMjdTai": midpointMjdTai + 1.0 * idx, 

132 "band": exposure.getFilter().bandLabel, 

133 "psfNdata": 0, 

134 "bboxSize": 23, 

135 "trailNdata": 0, 

136 "dipoleNdata": 0} 

137 data.append(diaSource | flags) 

138 

139 return pd.DataFrame(data=data).set_index(["diaObjectId", "band", "diaSourceId"], drop=False) 

140 

141 

142def makeSolarSystemSources(nSources, diaObjectIds, exposure, rng, randomizeObjects=False): 

143 """Make a test set of solar system sources. 

144 

145 Parameters 

146 ---------- 

147 nSources : `int` 

148 Number of sources to create. 

149 diaObjectIds : `numpy.ndarray` 

150 Integer Ids of diaobjects to "associate" with the DiaSources. 

151 exposure : `lsst.afw.image.Exposure` 

152 Exposure to create sources over. 

153 rng : `numpy.random.Generator` 

154 A NumPy random number generator initialized with a fixed seed for reproducibility. 

155 randomizeObjects : `bool`, optional 

156 If True, randomly draw from `diaObjectIds` to generate the ids in the 

157 output catalog, otherwise just iterate through them, repeating as 

158 necessary to get nSources objectIds. 

159 

160 Returns 

161 ------- 

162 solarSystemSources : `pandas.DataFrame` 

163 Solar system sources generated across the exposure. 

164 """ 

165 solarSystemSources = makeDiaSources(nSources, diaObjectIds, exposure, rng, randomizeObjects=False) 

166 solarSystemSources["ssObjectId"] = rng.integers(0, high=2**63-1, size=nSources) 

167 solarSystemSources["Err(arcsec)"] = rng.uniform(0.2, 0.4, size=nSources) 

168 

169 return solarSystemSources 

170 

171 

172def makeDiaForcedSources(nForcedSources, diaObjectIds, exposure, rng, randomizeObjects=False): 

173 """Make a test set of DiaSources. 

174 

175 Parameters 

176 ---------- 

177 nForcedSources : `int` 

178 Number of sources to create. 

179 diaObjectIds : `numpy.ndarray` 

180 Integer Ids of diaobjects to "associate" with the DiaSources. 

181 exposure : `lsst.afw.image.Exposure` 

182 Exposure to create sources over. 

183 rng : `numpy.random.Generator` 

184 A NumPy random number generator initialized with a fixed seed for reproducibility. 

185 randomizeObjects : `bool`, optional 

186 If True, randomly draw from `diaObjectIds` to generate the ids in the 

187 output catalog, otherwise just iterate through them. 

188 

189 Returns 

190 ------- 

191 diaForcedSources : `pandas.DataFrame` 

192 DiaForcedSources generated across the exposure. 

193 """ 

194 midpointMjdTai = exposure.visitInfo.date.get(system=DateTime.MJD) 

195 visit = exposure.visitInfo.id 

196 detector = exposure.detector.getId() 

197 if randomizeObjects: 

198 objectIds = diaObjectIds[rng.randint(len(diaObjectIds), size=nForcedSources)] 

199 else: 

200 objectIds = diaObjectIds[[i % len(diaObjectIds) for i in range(nForcedSources)]] 

201 

202 data = [] 

203 bbox = exposure.getBBox() 

204 

205 for i, objId in enumerate(objectIds): 

206 # Put together the minimum values for the alert. 

207 x = rng.uniform(bbox.minX, bbox.maxX) 

208 y = rng.uniform(bbox.minY, bbox.maxY) 

209 coord = exposure.wcs.pixelToSky(x, y) 

210 data.append({"diaForcedSourceId": i + 1, 

211 "visit": visit + i, 

212 "detector": detector, 

213 "diaObjectId": objId, 

214 "ra": coord.getRa().asDegrees(), 

215 "dec": coord.getDec().asDegrees(), 

216 "midpointMjdTai": midpointMjdTai + 1.0 * i, 

217 "timeProcessedMjdTai": DateTime.now().get(system=DateTime.MJD, 

218 scale=DateTime.TAI), 

219 "band": exposure.getFilter().bandLabel}) 

220 

221 return pd.DataFrame(data=data).set_index(["diaObjectId", "diaForcedSourceId"], drop=False) 

222 

223 

224def makeExposure(flipX=False, flipY=False): 

225 """Create an exposure and flip the x or y (or both) coordinates. 

226 

227 Returns bounding boxes that are right or left handed around the bounding 

228 polygon. 

229 

230 Parameters 

231 ---------- 

232 flipX : `bool` 

233 Flip the x coordinate in the WCS. 

234 flipY : `bool` 

235 Flip the y coordinate in the WCS. 

236 

237 Returns 

238 ------- 

239 exposure : `lsst.afw.image.Exposure` 

240 Exposure with a valid bounding box and wcs. 

241 """ 

242 metadata = PropertySet() 

243 

244 metadata.set("SIMPLE", "T") 

245 metadata.set("BITPIX", -32) 

246 metadata.set("NAXIS", 2) 

247 metadata.set("NAXIS1", 1024) 

248 metadata.set("NAXIS2", 1153) 

249 metadata.set("RADECSYS", 'FK5') 

250 metadata.set("EQUINOX", 2000.) 

251 

252 metadata.setDouble("CRVAL1", 215.604025685476) 

253 metadata.setDouble("CRVAL2", 53.1595451514076) 

254 metadata.setDouble("CRPIX1", 1109.99981456774) 

255 metadata.setDouble("CRPIX2", 560.018167811613) 

256 metadata.set("CTYPE1", 'RA---SIN') 

257 metadata.set("CTYPE2", 'DEC--SIN') 

258 

259 xFlip = 1 

260 if flipX: 

261 xFlip = -1 

262 yFlip = 1 

263 if flipY: 

264 yFlip = -1 

265 metadata.setDouble("CD1_1", xFlip * 5.10808596133527E-05) 

266 metadata.setDouble("CD1_2", yFlip * 1.85579539217196E-07) 

267 metadata.setDouble("CD2_2", yFlip * -5.10281493481982E-05) 

268 metadata.setDouble("CD2_1", xFlip * -8.27440751733828E-07) 

269 

270 wcs = afwGeom.makeSkyWcs(metadata) 

271 exposure = afwImage.makeExposure( 

272 afwImage.makeMaskedImageFromArrays(np.ones((1024, 1153))), wcs) 

273 detector = DetectorWrapper(id=23, bbox=exposure.getBBox()).detector 

274 visit = afwImage.VisitInfo( 

275 exposureTime=200., 

276 date=DateTime("2014-05-13T17:00:00.000000000", 

277 DateTime.Timescale.TAI)) 

278 exposure.info.id = 1234 

279 exposure.setDetector(detector) 

280 exposure.info.setVisitInfo(visit) 

281 exposure.setFilter(afwImage.FilterLabel(band='g')) 

282 exposure.setPhotoCalib(afwImage.PhotoCalib(1., 0., exposure.getBBox())) 

283 

284 return exposure 

285 

286 

287def makeRegionTime(exposure=None, begin=None, end=None): 

288 """Make a `RegionTimeInfo` for testing 

289 

290 Parameters 

291 ---------- 

292 exposure : `lsst.afw.image.Exposure`, optional 

293 Exposure to construct a ``RegionTimeInfo`` for. 

294 If None, a default exposure will be created. 

295 begin : `astropy.time.Time`, optional 

296 The start time of the interval. 

297 If `None`, calculate the start time from the midpoint of the exposure 

298 and the exposure length. 

299 end : `astropy.time.Time`, optional 

300 The end time of the interval. 

301 If `None`, calculate the end time from the midpoint of the exposure 

302 and the exposure length. 

303 

304 Returns 

305 ------- 

306 regionTime : `lsst.pipe.base.utils.RegionTimeInfo` 

307 Object containing the spatial region and temporal timespan for an exposure. 

308 """ 

309 if exposure is None: 

310 exposure = makeExposure() 

311 region = getRegion(exposure) 

312 expTime = exposure.visitInfo.exposureTime*astropy.units.second 

313 # visitInfo time is the midpoint of the exposure. 

314 if begin is None: 

315 begin = exposure.visitInfo.date.toAstropy() - expTime/2 

316 if end is None: 

317 end = exposure.visitInfo.date.toAstropy() + expTime/2 

318 timespan = dafButler.Timespan(begin=begin, end=end) 

319 return RegionTimeInfo(region=region, timespan=timespan)