Coverage for metadetect / tests / sim.py: 14%

109 statements  

« prev     ^ index     » next       coverage.py v7.13.5, created at 2026-04-30 09:10 +0000

1import numpy as np 

2import ngmix 

3import galsim 

4 

5MAX_NBAND = 4 

6DEFAULT_SIM_CONFIG = { 

7 'nobj': 4, 

8 'nband': 3, 

9 'noises': (0.0005, 0.001, 0.0015, 0.002), 

10 'scale': 0.263, 

11 'psf_fwhm': 0.9, 

12 'dims': (225, 225), 

13 'flux_low': 0.5, 

14 'flux_high': 1.5, 

15 'r50_low': 0.1, 

16 'r50_high': 2.0, 

17 'g_std': 0.2, 

18 'fracdev_low': 0.001, 

19 'fracdev_high': 0.99, 

20 'bulge_colors': np.array([0.5, 1.0, 1.5, 2.5]), 

21 'disk_colors': np.array([1.25, 1.0, 0.75, 0.5]), 

22} 

23 

24 

25class Sim(dict): 

26 def __init__(self, rng, config=None): 

27 self.update(DEFAULT_SIM_CONFIG) 

28 

29 if config is not None: 

30 self.update(config) 

31 

32 self['pos_width'] = self['dims'][0]/2.0*0.5 * self['scale'] 

33 self.rng = rng 

34 

35 self._set_wcs() 

36 self._make_psf() 

37 self._gpdf = ngmix.priors.GPriorBA( 

38 self['g_std'], 

39 rng=self.rng, 

40 ) 

41 

42 def get_mbobs(self): 

43 """ 

44 get a simulated MultiBandObsList 

45 """ 

46 all_band_obj = self._get_band_objects() 

47 

48 mbobs = ngmix.MultiBandObsList() 

49 for _band in range(self['nband']): 

50 band = _band % MAX_NBAND 

51 band_objects = [o[band] for o in all_band_obj] 

52 obj = galsim.Sum(band_objects) 

53 

54 im = obj.drawImage( 

55 nx=self['dims'][1], 

56 ny=self['dims'][0], 

57 scale=self['scale'] 

58 ).array 

59 

60 im += self.rng.normal(scale=self['noises'][band], size=im.shape) 

61 wt = im*0 + 1.0/self['noises'][band]**2 

62 bmask = np.zeros(im.shape, dtype='i4') 

63 ormask = np.zeros(im.shape, dtype='i4') 

64 nse = self.rng.normal(scale=self['noises'][band], size=im.shape) 

65 

66 obs = ngmix.Observation( 

67 im, 

68 weight=wt, 

69 bmask=bmask, 

70 ormask=ormask, 

71 jacobian=self._jacobian, 

72 psf=self._psf_obs.copy(), 

73 noise=nse, 

74 ignore_zero_weight=False, 

75 ) 

76 

77 obslist = ngmix.ObsList() 

78 obslist.append(obs) 

79 mbobs.append(obslist) 

80 

81 return mbobs 

82 

83 def _get_r50(self): 

84 return self.rng.uniform( 

85 low=self['r50_low'], 

86 high=self['r50_high'], 

87 ) 

88 

89 def _get_flux(self): 

90 return self.rng.uniform( 

91 low=self['flux_low'], 

92 high=self['flux_high'], 

93 ) 

94 

95 def _get_fracdev(self): 

96 return self.rng.uniform( 

97 low=self['fracdev_low'], 

98 high=self['fracdev_high'], 

99 ) 

100 

101 def _get_g(self): 

102 return self._gpdf.sample2d() 

103 

104 def _get_dxdy(self): 

105 return self.rng.uniform( 

106 low=-self['pos_width'], 

107 high=self['pos_width'], 

108 size=2, 

109 ) 

110 

111 def _get_band_objects(self): 

112 

113 all_band_obj = [] 

114 for i in range(self['nobj']): 

115 r50 = self._get_r50() 

116 flux = self._get_flux() 

117 fracdev = self._get_fracdev() 

118 dx, dy = self._get_dxdy() 

119 

120 g1d, g2d = self._get_g() 

121 g1b = 0.5*g1d 

122 g2b = 0.5*g2d 

123 

124 flux_bulge = fracdev*flux 

125 flux_disk = (1-fracdev)*flux 

126 

127 bulge_obj = galsim.DeVaucouleurs( 

128 half_light_radius=r50 

129 ).shear(g1=g1b, g2=g2b) 

130 

131 disk_obj = galsim.Exponential( 

132 half_light_radius=r50 

133 ).shear(g1=g1d, g2=g2d) 

134 

135 band_objs = [] 

136 for _band in range(self['nband']): 

137 band = _band % MAX_NBAND 

138 band_disk = \ 

139 disk_obj.withFlux(flux_disk*self['disk_colors'][band]) 

140 band_bulge = \ 

141 bulge_obj.withFlux(flux_bulge*self['bulge_colors'][band]) 

142 

143 obj = galsim.Sum(band_disk, band_bulge).shift(dx=dx, dy=dy) 

144 obj = galsim.Convolve(obj, self._psf) 

145 band_objs.append(obj) 

146 

147 all_band_obj.append(band_objs) 

148 

149 return all_band_obj 

150 

151 def _set_wcs(self): 

152 self._jacobian = ngmix.DiagonalJacobian( 

153 row=0, 

154 col=0, 

155 scale=self['scale'], 

156 ) 

157 

158 def _make_psf(self): 

159 

160 self._psf = galsim.Gaussian(fwhm=self['psf_fwhm']) 

161 

162 psf_im = self._psf.drawImage(scale=self['scale']).array 

163 noise = psf_im.max()/1000.0 

164 weight = psf_im + 1.0/noise**2 

165 psf_im += self.rng.normal( 

166 scale=noise, 

167 size=psf_im.shape 

168 ) 

169 

170 cen = (np.array(psf_im.shape)-1.0)/2.0 

171 j = self._jacobian.copy() 

172 j.set_cen(row=cen[0], col=cen[1]) 

173 

174 psf_obs = ngmix.Observation( 

175 psf_im, 

176 weight=weight, 

177 jacobian=j 

178 ) 

179 self._psf_obs = psf_obs 

180 

181 

182def make_mbobs_sim( 

183 seed, nband, simulate_star=False, noise_scale=1, band_flux_factors=None, 

184 band_image_sizes=None, wcs_var_scale=1, 

185): 

186 rng = np.random.RandomState(seed=seed) 

187 

188 if simulate_star: 

189 gal = galsim.Gaussian( 

190 fwhm=1e-6, 

191 ).shear( 

192 g1=rng.uniform(low=-0.1, high=0.1), 

193 g2=rng.uniform(low=-0.1, high=0.1), 

194 ).withFlux( 

195 400 

196 ) 

197 else: 

198 gal = galsim.Exponential( 

199 half_light_radius=rng.uniform(low=0.5, high=0.7), 

200 ).shear( 

201 g1=rng.uniform(low=-0.1, high=0.1), 

202 g2=rng.uniform(low=-0.1, high=0.1), 

203 ).withFlux( 

204 400 

205 ) 

206 mbobs = ngmix.MultiBandObsList() 

207 

208 for band in range(nband): 

209 if band_image_sizes is not None: 

210 dim = band_image_sizes[band] 

211 else: 

212 dim = 35 

213 cen = (dim-1)/2 

214 psf_dim = dim + 17 

215 psf_cen = (psf_dim-1)/2 

216 

217 psf = galsim.Gaussian( 

218 fwhm=rng.uniform(low=0.8, high=0.9), 

219 ).shear( 

220 g1=rng.uniform(low=-0.1, high=0.1), 

221 g2=rng.uniform(low=-0.1, high=0.1), 

222 ) 

223 

224 gs_wcs = galsim.ShearWCS( 

225 0.25, 

226 galsim.Shear( 

227 g1=rng.uniform(low=-0.1, high=0.1) * wcs_var_scale, 

228 g2=rng.uniform(low=-0.1, high=0.1) * wcs_var_scale, 

229 ) 

230 ).jacobian() 

231 offset = rng.uniform(low=-0.5, high=0.5, size=2) * wcs_var_scale 

232 

233 if band_flux_factors is not None: 

234 flux_factor = band_flux_factors[band] 

235 else: 

236 flux_factor = 1.0 

237 obj = galsim.Convolve([gal * flux_factor, psf]) 

238 

239 im = obj.drawImage(nx=dim, ny=dim, wcs=gs_wcs, offset=offset).array 

240 nse = np.sqrt(np.sum(im**2)) / rng.uniform(low=10, high=100) * noise_scale 

241 im += rng.normal(size=im.shape, scale=nse) 

242 

243 psf_im = psf.drawImage(nx=psf_dim, ny=psf_dim, wcs=gs_wcs).array 

244 psf_obs = ngmix.Observation( 

245 image=psf_im, 

246 jacobian=ngmix.Jacobian( 

247 row=psf_cen, 

248 col=psf_cen, 

249 wcs=gs_wcs, 

250 ) 

251 ) 

252 

253 obslist = ngmix.ObsList() 

254 obslist.append( 

255 ngmix.Observation( 

256 image=im, 

257 weight=np.ones_like(im) / nse**2, 

258 jacobian=ngmix.Jacobian( 

259 row=cen+offset[1], 

260 col=cen+offset[0], 

261 wcs=gs_wcs, 

262 ), 

263 bmask=np.zeros_like(im, dtype=np.int32), 

264 ormask=(rng.uniform(size=im.shape) * 2**29).astype(int), 

265 mfrac=rng.uniform(size=im.shape), 

266 psf=psf_obs, 

267 meta={"wgt": 1.0/nse**2}, 

268 noise=rng.normal(size=im.shape, scale=nse), 

269 ) 

270 ) 

271 mbobs.append(obslist) 

272 

273 return mbobs