Coverage for metadetect / tests / test_fitting_joint.py: 6%

464 statements  

« prev     ^ index     » next       coverage.py v7.13.5, created at 2026-04-17 09:20 +0000

1import numpy as np 

2import ngmix 

3 

4import pytest 

5 

6from .sim import make_mbobs_sim 

7from metadetect.fitting import ( 

8 fit_mbobs_admom, 

9 fit_mbobs_list_joint, 

10 make_coadd_obs, 

11 get_admom_runner, 

12 symmetrize_obs_weights, 

13 fit_mbobs_gauss, 

14) 

15from metadetect import procflags 

16 

17 

18@pytest.mark.parametrize("case", [ 

19 "shear_bands_bad_oneband", 

20 "shear_bands_bad", 

21 "shear_bands_outofrange", 

22 "bad_img_jacob", 

23 "bad_img_shape", 

24 "missing_psf", 

25 "bad_psf_jacob", 

26 "bad_psf_img_shape", 

27 "zero_weights", 

28 "missing_attrs", 

29 "disjoint_weights", 

30]) 

31def test_make_coadd_obs_errors(case): 

32 ran_one = False 

33 

34 if case == "shear_bands_bad_oneband": 

35 mbobs = make_mbobs_sim(45, 1, wcs_var_scale=0) 

36 coadd_obs, flags = make_coadd_obs( 

37 mbobs, shear_bands=[10] 

38 ) 

39 assert coadd_obs is None 

40 assert flags == procflags.INCONSISTENT_BANDS 

41 ran_one = True 

42 elif case == "shear_bands_bad": 

43 mbobs = make_mbobs_sim(45, 3, wcs_var_scale=0) 

44 coadd_obs, flags = make_coadd_obs( 

45 mbobs, shear_bands=[10] 

46 ) 

47 assert coadd_obs is None 

48 assert flags == procflags.INCONSISTENT_BANDS 

49 ran_one = True 

50 elif case == "shear_bands_outofrange": 

51 mbobs = make_mbobs_sim(45, 3, wcs_var_scale=0) 

52 coadd_obs, flags = make_coadd_obs( 

53 mbobs, shear_bands=[10, 13, 0] 

54 ) 

55 assert coadd_obs is None 

56 assert flags == procflags.INCONSISTENT_BANDS 

57 ran_one = True 

58 elif case == "bad_img_jacob": 

59 mbobs = make_mbobs_sim(45, 4, wcs_var_scale=0) 

60 mbobs[1][0] = ngmix.Observation( 

61 image=mbobs[0][0].image, 

62 jacobian=ngmix.DiagonalJacobian( 

63 scale=0.25, 

64 row=10, 

65 col=9, 

66 ), 

67 psf=mbobs[1][0].psf, 

68 ) 

69 coadd_obs, flags = make_coadd_obs(mbobs, shear_bands=[0, 1]) 

70 assert coadd_obs is None 

71 assert flags == procflags.INCONSISTENT_BANDS 

72 coadd_obs, flags = make_coadd_obs(mbobs, shear_bands=[1, 0]) 

73 assert coadd_obs is None 

74 assert flags == procflags.INCONSISTENT_BANDS 

75 coadd_obs, flags = make_coadd_obs(mbobs, shear_bands=[0, 2, 3]) 

76 assert coadd_obs is not None 

77 assert flags == 0 

78 ran_one = True 

79 elif case == "bad_img_shape": 

80 mbobs = make_mbobs_sim(45, 4, wcs_var_scale=0) 

81 mbobs[1][0] = ngmix.Observation( 

82 image=np.zeros((5, 5)), 

83 jacobian=mbobs[1][0].jacobian, 

84 psf=mbobs[1][0].psf, 

85 ) 

86 coadd_obs, flags = make_coadd_obs(mbobs, shear_bands=[0, 1]) 

87 assert coadd_obs is None 

88 assert flags == procflags.INCONSISTENT_BANDS 

89 coadd_obs, flags = make_coadd_obs(mbobs, shear_bands=[1, 0]) 

90 assert coadd_obs is None 

91 assert flags == procflags.INCONSISTENT_BANDS 

92 coadd_obs, flags = make_coadd_obs(mbobs, shear_bands=[0, 2, 3]) 

93 assert coadd_obs is not None 

94 assert flags == 0 

95 ran_one = True 

96 elif case == "missing_psf": 

97 mbobs = make_mbobs_sim(45, 4, wcs_var_scale=0) 

98 mbobs[1][0] = ngmix.Observation( 

99 image=mbobs[1][0].image, 

100 jacobian=mbobs[1][0].jacobian, 

101 ) 

102 coadd_obs, flags = make_coadd_obs(mbobs, shear_bands=[0, 1]) 

103 assert coadd_obs is None 

104 assert flags == procflags.INCONSISTENT_BANDS 

105 coadd_obs, flags = make_coadd_obs(mbobs, shear_bands=[1, 0]) 

106 assert coadd_obs is None 

107 assert flags == procflags.INCONSISTENT_BANDS 

108 coadd_obs, flags = make_coadd_obs(mbobs, shear_bands=[0, 2, 3]) 

109 assert coadd_obs is not None 

110 assert flags == 0 

111 ran_one = True 

112 elif case == "bad_psf_jacob": 

113 mbobs = make_mbobs_sim(45, 4, wcs_var_scale=0) 

114 mbobs[1][0].psf = ngmix.Observation( 

115 image=mbobs[1][0].psf.image, 

116 jacobian=ngmix.DiagonalJacobian( 

117 scale=0.25, 

118 row=10, 

119 col=9, 

120 ), 

121 ) 

122 coadd_obs, flags = make_coadd_obs(mbobs, shear_bands=[0, 1]) 

123 assert coadd_obs is None 

124 assert flags == procflags.INCONSISTENT_BANDS 

125 coadd_obs, flags = make_coadd_obs(mbobs, shear_bands=[1, 0]) 

126 assert coadd_obs is None 

127 assert flags == procflags.INCONSISTENT_BANDS 

128 coadd_obs, flags = make_coadd_obs(mbobs, shear_bands=[0, 2, 3]) 

129 assert coadd_obs is not None 

130 assert flags == 0 

131 ran_one = True 

132 elif case == "bad_psf_img_shape": 

133 mbobs = make_mbobs_sim(45, 4, wcs_var_scale=0) 

134 mbobs[1][0].psf = ngmix.Observation( 

135 image=np.zeros((5, 5)), 

136 jacobian=mbobs[1][0].psf.jacobian, 

137 ) 

138 coadd_obs, flags = make_coadd_obs(mbobs, shear_bands=[0, 1]) 

139 assert coadd_obs is None 

140 assert flags == procflags.INCONSISTENT_BANDS 

141 coadd_obs, flags = make_coadd_obs(mbobs, shear_bands=[1, 0]) 

142 assert coadd_obs is None 

143 assert flags == procflags.INCONSISTENT_BANDS 

144 coadd_obs, flags = make_coadd_obs(mbobs, shear_bands=[0, 2, 3]) 

145 assert coadd_obs is not None 

146 assert flags == 0 

147 ran_one = True 

148 elif case == "zero_weights": 

149 mbobs = make_mbobs_sim(45, 4, wcs_var_scale=0) 

150 mbobs[1][0] = ngmix.Observation( 

151 image=mbobs[1][0].image, 

152 jacobian=mbobs[1][0].jacobian, 

153 psf=mbobs[1][0].psf, 

154 weight=np.zeros_like(mbobs[1][0].image), 

155 ignore_zero_weight=False, 

156 ) 

157 coadd_obs, flags = make_coadd_obs(mbobs, shear_bands=[0, 1]) 

158 assert coadd_obs is None 

159 assert flags == procflags.ZERO_WEIGHTS 

160 coadd_obs, flags = make_coadd_obs(mbobs, shear_bands=[1, 0]) 

161 assert coadd_obs is None 

162 assert flags == procflags.ZERO_WEIGHTS 

163 coadd_obs, flags = make_coadd_obs(mbobs, shear_bands=[0, 2, 3]) 

164 assert coadd_obs is not None 

165 assert flags == 0 

166 ran_one = True 

167 elif case == "missing_attrs": 

168 for attr in ["mfrac", "noise", "bmask", "ormask"]: 

169 mbobs = make_mbobs_sim(45, 4, wcs_var_scale=0) 

170 kwargs = { 

171 "mfrac": mbobs[1][0].mfrac, 

172 "noise": mbobs[1][0].noise, 

173 "bmask": mbobs[1][0].bmask, 

174 "ormask": mbobs[1][0].ormask, 

175 } 

176 kwargs.pop(attr) 

177 mbobs[1][0] = ngmix.Observation( 

178 image=mbobs[1][0].image, 

179 jacobian=mbobs[1][0].jacobian, 

180 psf=mbobs[1][0].psf, 

181 weight=mbobs[1][0].weight, 

182 **kwargs, 

183 ) 

184 coadd_obs, flags = make_coadd_obs(mbobs, shear_bands=[0, 1]) 

185 assert coadd_obs is None 

186 assert flags == procflags.INCONSISTENT_BANDS 

187 coadd_obs, flags = make_coadd_obs(mbobs, shear_bands=[1, 0]) 

188 assert coadd_obs is None 

189 assert flags == procflags.INCONSISTENT_BANDS 

190 coadd_obs, flags = make_coadd_obs(mbobs, shear_bands=[0, 2, 3]) 

191 assert coadd_obs is not None 

192 assert flags == 0 

193 ran_one = True 

194 elif case == "disjoint_weights": 

195 # if two images have disjoint weight maps where no non-zero 

196 # areas overlap, then we cannot coadd 

197 mbobs = make_mbobs_sim(45, 4, wcs_var_scale=0) 

198 wmsk = np.zeros_like(mbobs[1][0].image) 

199 wmsk[:10, :] = 1 

200 mbobs[0][0] = ngmix.Observation( 

201 image=mbobs[0][0].image, 

202 jacobian=mbobs[0][0].jacobian, 

203 psf=mbobs[0][0].psf, 

204 weight=mbobs[0][0].weight * (1.0 - wmsk), 

205 mfrac=mbobs[0][0].mfrac, 

206 ormask=mbobs[0][0].ormask, 

207 bmask=mbobs[0][0].bmask, 

208 noise=mbobs[0][0].noise, 

209 ) 

210 mbobs[1][0] = ngmix.Observation( 

211 image=mbobs[1][0].image, 

212 jacobian=mbobs[1][0].jacobian, 

213 psf=mbobs[1][0].psf, 

214 weight=mbobs[1][0].weight * wmsk, 

215 mfrac=mbobs[1][0].mfrac, 

216 ormask=mbobs[1][0].ormask, 

217 bmask=mbobs[1][0].bmask, 

218 noise=mbobs[1][0].noise, 

219 ) 

220 coadd_obs, flags = make_coadd_obs(mbobs, shear_bands=[0, 1]) 

221 assert coadd_obs is None 

222 assert flags == procflags.ZERO_WEIGHTS 

223 coadd_obs, flags = make_coadd_obs(mbobs, shear_bands=[1, 0]) 

224 assert coadd_obs is None 

225 assert flags == procflags.ZERO_WEIGHTS 

226 coadd_obs, flags = make_coadd_obs(mbobs, shear_bands=[0, 2, 3]) 

227 assert coadd_obs is not None 

228 assert flags == 0 

229 ran_one = True 

230 else: 

231 assert False, f"case {case} not found!" 

232 

233 assert ran_one, "No tests ran!" 

234 

235 

236def test_make_coadd_obs_single(): 

237 mbobs = make_mbobs_sim(45, 4, wcs_var_scale=0) 

238 

239 coadd_obs, flags = make_coadd_obs(mbobs[:1]) 

240 assert coadd_obs is mbobs[0][0] 

241 assert flags == 0 

242 

243 coadd_obs, flags = make_coadd_obs(mbobs, shear_bands=[2]) 

244 assert coadd_obs is mbobs[2][0] 

245 assert flags == 0 

246 

247 

248def test_make_coadd_obs_psf_weight(): 

249 mbobs = make_mbobs_sim(45, 4, wcs_var_scale=0) 

250 

251 coadd_obs, flags = make_coadd_obs(mbobs) 

252 assert np.allclose( 

253 coadd_obs.psf.weight, 

254 1.0/(np.sqrt(np.sum(coadd_obs.psf.image**2))/1000.0)**2 

255 ) 

256 

257 

258def test_make_coadd_obs_symmetric(): 

259 mbobs = make_mbobs_sim(45, 4, wcs_var_scale=0) 

260 

261 coadd_obs01, flags = make_coadd_obs(mbobs, shear_bands=[0, 1]) 

262 assert flags == 0 

263 

264 coadd_obs10, flags = make_coadd_obs(mbobs, shear_bands=[1, 0]) 

265 assert flags == 0 

266 

267 assert repr(coadd_obs01.jacobian) == repr(coadd_obs10.jacobian) 

268 assert coadd_obs01.meta == mbobs[1][0].meta 

269 assert coadd_obs10.meta == mbobs[0][0].meta 

270 for attr in [ 

271 "image", "weight", 

272 "bmask", "ormask", "noise", "mfrac", 

273 ]: 

274 assert np.array_equal( 

275 getattr(coadd_obs01, attr), 

276 getattr(coadd_obs10, attr), 

277 ) 

278 

279 assert repr(coadd_obs01.psf.jacobian) == repr(coadd_obs10.psf.jacobian) 

280 assert coadd_obs01.psf.meta == mbobs[1][0].psf.meta 

281 assert coadd_obs10.psf.meta == mbobs[0][0].psf.meta 

282 for attr in ["image", "weight"]: 

283 assert np.array_equal( 

284 getattr(coadd_obs01.psf, attr), 

285 getattr(coadd_obs10.psf, attr), 

286 ) 

287 

288 

289def _or_arrays(arrs): 

290 res = arrs[0].copy() 

291 for arr in arrs[1:]: 

292 res |= arr 

293 

294 return res 

295 

296 

297@pytest.mark.parametrize("shear_bands", [None, [0, 1], [3, 1, 2]]) 

298def test_make_coadd_obs_shear_bands(shear_bands): 

299 mbobs = make_mbobs_sim(45, 4, wcs_var_scale=0) 

300 

301 coadd_obs, flags = make_coadd_obs(mbobs, shear_bands=shear_bands) 

302 assert flags == 0 

303 

304 # make sure different shear bands give different answers 

305 coadd_obs_none, flags = make_coadd_obs(mbobs, shear_bands=None) 

306 assert flags == 0 

307 

308 if shear_bands is not None: 

309 assert not np.array_equal( 

310 coadd_obs.image, 

311 coadd_obs_none.image 

312 ) 

313 else: 

314 assert np.array_equal( 

315 coadd_obs.image, 

316 coadd_obs_none.image 

317 ) 

318 

319 if shear_bands is None: 

320 shear_bands = list(range(len(mbobs))) 

321 

322 # the coadded image should have higher s/n 

323 assert all( 

324 coadd_obs.get_s2n() 

325 > mbobs[sb][0].get_s2n() for sb in shear_bands 

326 ) 

327 

328 # total flux should be close 

329 fluxes = list( 

330 mbobs[sb][0].image.sum() 

331 for sb in shear_bands 

332 ) 

333 assert np.allclose( 

334 coadd_obs.image.sum(), 

335 fluxes, 

336 atol=0, 

337 rtol=0.2, 

338 ), (coadd_obs.image.sum(), fluxes) 

339 

340 # psf should sum to unity 

341 assert np.allclose(coadd_obs.psf.image.sum(), 1) 

342 

343 # jacobians and image shapes should be the same 

344 assert repr(coadd_obs.jacobian) == repr(mbobs[0][0].jacobian) 

345 assert coadd_obs.image.shape == mbobs[0][0].image.shape 

346 assert repr(coadd_obs.psf.jacobian) == repr(mbobs[0][0].psf.jacobian) 

347 assert coadd_obs.psf.image.shape == mbobs[0][0].psf.image.shape 

348 assert coadd_obs.meta == mbobs[shear_bands[-1]][0].meta 

349 

350 assert np.array_equal( 

351 coadd_obs.bmask, 

352 _or_arrays([mbobs[sb][0].bmask for sb in shear_bands]), 

353 ) 

354 assert np.array_equal( 

355 coadd_obs.ormask, 

356 _or_arrays([mbobs[sb][0].ormask for sb in shear_bands]), 

357 ) 

358 

359 wgts = [np.median(mbobs[sb][0].weight) for sb in shear_bands] 

360 wgts = np.array(wgts) 

361 wgts /= np.sum(wgts) 

362 # weights should be different 

363 assert not np.allclose(wgts[0], wgts) 

364 

365 # check the actual values in coadded attributes 

366 image = np.zeros_like(mbobs[0][0].image) 

367 weight = np.zeros_like(mbobs[0][0].image) 

368 mfrac = np.zeros_like(mbobs[0][0].image) 

369 noise = np.zeros_like(mbobs[0][0].image) 

370 psf_image = np.zeros_like(mbobs[0][0].psf.image) 

371 for i, sb in enumerate(shear_bands): 

372 image += mbobs[sb][0].image * wgts[i] 

373 mfrac += mbobs[sb][0].mfrac * wgts[i] 

374 noise += mbobs[sb][0].noise * wgts[i] 

375 psf_image += mbobs[sb][0].psf.image * wgts[i] 

376 weight += wgts[i]**2 / mbobs[sb][0].weight 

377 weight = 1.0 / weight 

378 assert np.allclose(image, coadd_obs.image) 

379 assert np.allclose(weight, coadd_obs.weight) 

380 assert np.allclose(mfrac, coadd_obs.mfrac) 

381 assert np.allclose(noise, coadd_obs.noise) 

382 assert np.allclose(psf_image, coadd_obs.psf.image) 

383 

384 

385def test_fit_mbobs_list_joint_errors(): 

386 mbobs_list = [ 

387 make_mbobs_sim(45, 3, wcs_var_scale=0), 

388 make_mbobs_sim(46, 3, wcs_var_scale=0), 

389 ] 

390 rng = np.random.RandomState(seed=4235) 

391 with pytest.raises(RuntimeError) as err: 

392 fit_mbobs_list_joint( 

393 mbobs_list=mbobs_list, 

394 fitter_name="blah", 

395 bmask_flags=0, 

396 rng=rng, 

397 shear_bands=None, 

398 ) 

399 assert "fitter 'blah'" in str(err.value) 

400 

401 

402def test_fit_mbobs_list_joint_empty(): 

403 mbobs_list = [] 

404 rng = np.random.RandomState(seed=4235) 

405 res = fit_mbobs_list_joint( 

406 mbobs_list=mbobs_list, 

407 fitter_name="am", 

408 bmask_flags=0, 

409 rng=rng, 

410 shear_bands=None, 

411 ) 

412 assert res is None 

413 

414 

415@pytest.mark.parametrize("shear_bands", [None, [0, 1], [2, 3, 1]]) 

416@pytest.mark.parametrize("fname", ["am", "admom"]) 

417def test_fit_mbobs_list_joint_fits_all(shear_bands, fname): 

418 mbobs_list = [ 

419 make_mbobs_sim(45, 4, wcs_var_scale=0), 

420 make_mbobs_sim(46, 4, wcs_var_scale=0), 

421 make_mbobs_sim(47, 4, wcs_var_scale=0), 

422 ] 

423 rng = np.random.RandomState(seed=4235) 

424 res = fit_mbobs_list_joint( 

425 mbobs_list=mbobs_list, 

426 fitter_name=fname, 

427 bmask_flags=0, 

428 rng=rng, 

429 shear_bands=shear_bands, 

430 ) 

431 assert res.shape == (3,) 

432 

433 if shear_bands is None: 

434 shear_bands = list(range(4)) 

435 assert np.all( 

436 res["shear_bands"] 

437 == "".join(str(sb) for sb in sorted(shear_bands)) 

438 ) 

439 

440 rng = np.random.RandomState(seed=4235) 

441 for i in range(3): 

442 res1 = fit_mbobs_admom( 

443 mbobs=mbobs_list[i], 

444 bmask_flags=0, 

445 rng=rng, 

446 shear_bands=shear_bands, 

447 ) 

448 

449 for col in res.dtype.names: 

450 np.testing.assert_array_equal(res[i:i+1][col], res1[col]) 

451 

452 

453@pytest.mark.parametrize("coadd", [True, False]) 

454@pytest.mark.parametrize("symmetrize", [True, False]) 

455@pytest.mark.parametrize("shear_bands", [None, [0], [0, 1], [2, 3, 1]]) 

456@pytest.mark.parametrize("fname", [ 

457 "am", 

458 "admom", 

459 pytest.param("gauss", marks=pytest.mark.xfail), 

460]) 

461def test_fit_mbobs_list_joint_seeding(shear_bands, fname, coadd, symmetrize): 

462 mbobs_list = [ 

463 make_mbobs_sim(45, 4, wcs_var_scale=0), 

464 make_mbobs_sim(46, 4, wcs_var_scale=0), 

465 make_mbobs_sim(47, 4, wcs_var_scale=0), 

466 ] 

467 rng = np.random.RandomState(seed=4235) 

468 res = fit_mbobs_list_joint( 

469 mbobs_list=mbobs_list, 

470 fitter_name=fname, 

471 bmask_flags=0, 

472 rng=rng, 

473 shear_bands=shear_bands, 

474 coadd=coadd, 

475 symmetrize=symmetrize, 

476 ) 

477 

478 mbobs_list = [ 

479 make_mbobs_sim(45, 4, wcs_var_scale=0), 

480 make_mbobs_sim(46, 4, wcs_var_scale=0), 

481 make_mbobs_sim(47, 4, wcs_var_scale=0), 

482 ] 

483 rng1 = np.random.RandomState(seed=4235) 

484 res1 = fit_mbobs_list_joint( 

485 mbobs_list=mbobs_list, 

486 fitter_name=fname, 

487 bmask_flags=0, 

488 rng=rng1, 

489 shear_bands=shear_bands, 

490 coadd=coadd, 

491 symmetrize=symmetrize, 

492 ) 

493 

494 for col in res.dtype.names: 

495 np.testing.assert_array_equal( 

496 res[col], 

497 res1[col], 

498 err_msg=col, 

499 ) 

500 

501 

502@pytest.mark.parametrize("case", [ 

503 "missing_band", 

504 "too_many_bands", 

505 "zero_weights", 

506 "edge_hit", 

507 "coadd_flags", 

508]) 

509def test_fit_mbobs_admom_input_errors(case): 

510 rng = np.random.RandomState(seed=211324) 

511 ran_one = False 

512 

513 if case == "missing_band": 

514 mbobs = make_mbobs_sim(45, 4, wcs_var_scale=0) 

515 mbobs[1] = ngmix.ObsList() 

516 res = fit_mbobs_admom( 

517 mbobs=mbobs, 

518 bmask_flags=0, 

519 rng=rng, 

520 shear_bands=None, 

521 ) 

522 ran_one = True 

523 assert res["am_flags"] == (procflags.NO_ATTEMPT | procflags.MISSING_BAND), ( 

524 procflags.get_procflags_str(res["am_flags"][0]) 

525 ) 

526 elif case == "too_many_bands": 

527 mbobs = make_mbobs_sim(45, 4, wcs_var_scale=0) 

528 ol = ngmix.ObsList() 

529 ol.append(mbobs[1][0]) 

530 ol.append(mbobs[1][0]) 

531 mbobs[1] = ol 

532 res = fit_mbobs_admom( 

533 mbobs=mbobs, 

534 bmask_flags=0, 

535 rng=rng, 

536 shear_bands=None, 

537 ) 

538 ran_one = True 

539 assert res["am_flags"] == ( 

540 procflags.NO_ATTEMPT | procflags.INCONSISTENT_BANDS 

541 ), ( 

542 procflags.get_procflags_str(res["am_flags"][0]) 

543 ) 

544 elif case == "zero_weights": 

545 mbobs = make_mbobs_sim(45, 4, wcs_var_scale=0) 

546 mbobs[1][0] = ngmix.Observation( 

547 image=mbobs[1][0].image, 

548 jacobian=mbobs[1][0].jacobian, 

549 psf=mbobs[1][0].psf, 

550 weight=np.zeros_like(mbobs[1][0].image), 

551 ignore_zero_weight=False, 

552 bmask=mbobs[1][0].bmask, 

553 ) 

554 res = fit_mbobs_admom( 

555 mbobs=mbobs, 

556 bmask_flags=0, 

557 rng=rng, 

558 shear_bands=None, 

559 ) 

560 ran_one = True 

561 assert res["am_flags"] == (procflags.NO_ATTEMPT | procflags.ZERO_WEIGHTS), ( 

562 procflags.get_procflags_str(res["am_flags"][0]) 

563 ) 

564 elif case == "edge_hit": 

565 mbobs = make_mbobs_sim(45, 4, wcs_var_scale=0) 

566 mbobs[1][0] = ngmix.Observation( 

567 image=mbobs[1][0].image, 

568 jacobian=mbobs[1][0].jacobian, 

569 psf=mbobs[1][0].psf, 

570 weight=mbobs[1][0].weight, 

571 bmask=mbobs[1][0].bmask + 10, 

572 ) 

573 res = fit_mbobs_admom( 

574 mbobs=mbobs, 

575 bmask_flags=10, 

576 rng=rng, 

577 shear_bands=None, 

578 ) 

579 ran_one = True 

580 assert res["am_flags"] == (procflags.NO_ATTEMPT | procflags.EDGE_HIT), ( 

581 procflags.get_procflags_str(res["am_flags"][0]) 

582 ) 

583 elif case == "coadd_flags": 

584 mbobs = make_mbobs_sim(45, 4, wcs_var_scale=0) 

585 mbobs[1][0] = ngmix.Observation( 

586 image=mbobs[1][0].image, 

587 jacobian=mbobs[1][0].jacobian, 

588 psf=mbobs[1][0].psf, 

589 weight=mbobs[1][0].weight, 

590 bmask=mbobs[1][0].bmask, 

591 ormask=mbobs[1][0].ormask, 

592 noise=mbobs[1][0].noise, 

593 # missing mfrac so coadd fails 

594 ) 

595 res = fit_mbobs_admom( 

596 mbobs=mbobs, 

597 bmask_flags=0, 

598 rng=rng, 

599 shear_bands=None, 

600 ) 

601 ran_one = True 

602 assert res["am_flags"] == ( 

603 procflags.NO_ATTEMPT | procflags.INCONSISTENT_BANDS 

604 ), ( 

605 procflags.get_procflags_str(res["am_flags"][0]) 

606 ) 

607 else: 

608 assert False, f"case {case} not found!" 

609 

610 assert ran_one, "No tests ran!" 

611 

612 

613@pytest.mark.parametrize("shear_bands", [[0], [2]]) 

614def test_fit_mbobs_admom_oneband(shear_bands): 

615 rng = np.random.RandomState(seed=211324) 

616 mbobs = make_mbobs_sim(45, 4, wcs_var_scale=0) 

617 res = fit_mbobs_admom( 

618 mbobs=mbobs, 

619 bmask_flags=0, 

620 rng=rng, 

621 shear_bands=shear_bands, 

622 ) 

623 

624 # make sure things are seeded and everything is copied out 

625 # correctly 

626 assert res["am_flags"] == ( 

627 res["am_psf_flags"] | res["am_obj_flags"] 

628 ) 

629 

630 rng = np.random.RandomState(seed=211324) 

631 fitter = get_admom_runner(rng) 

632 pres = fitter.go(mbobs[shear_bands[0]][0].psf) 

633 np.testing.assert_allclose( 

634 res["am_psf_T"][0], pres["T"] 

635 ) 

636 np.testing.assert_allclose( 

637 res["am_psf_g"][0], pres["e"] 

638 ) 

639 

640 sobs = symmetrize_obs_weights(mbobs[shear_bands[0]][0]) 

641 gres = fitter.go(sobs) 

642 np.testing.assert_array_equal(res["am_T_flags"][0], gres["T_flags"]) 

643 np.testing.assert_array_equal(res["am_T"][0], gres["T"]) 

644 np.testing.assert_array_equal(res["am_T_err"][0], gres["T_err"]) 

645 np.testing.assert_array_equal( 

646 res["am_T_ratio"][0], 

647 gres["T"]/pres["T"], 

648 ) 

649 

650 np.testing.assert_array_equal(res["am_obj_flags"][0], gres["flags"]) 

651 np.testing.assert_array_equal(res["am_s2n"][0], gres["s2n"]) 

652 np.testing.assert_array_equal(res["am_g"][0], gres["e"]) 

653 np.testing.assert_array_equal(res["am_g_cov"][0], gres["e_cov"]) 

654 

655 

656@pytest.mark.parametrize("shear_bands", [[0], [2], None, [1, 3, 2]]) 

657def test_fit_mbobs_admom_smoke(shear_bands): 

658 rng = np.random.RandomState(seed=211324) 

659 mbobs = make_mbobs_sim(45, 4, wcs_var_scale=0) 

660 res = fit_mbobs_admom( 

661 mbobs=mbobs, 

662 bmask_flags=0, 

663 rng=rng, 

664 shear_bands=shear_bands, 

665 ) 

666 

667 # none of these fits should fail 

668 if shear_bands is None: 

669 shear_bands = list(range(len(mbobs))) 

670 

671 for col in res.dtype.names: 

672 if col.endswith("band_flux_flags"): 

673 assert np.all(res[col] == procflags.NO_ATTEMPT), (col, res[col]) 

674 elif "band_flux" in col: 

675 assert np.all(np.isnan(res[col])), (col, res[col]) 

676 elif col == "shear_bands": 

677 assert np.all( 

678 res[col] 

679 == "".join(f"{sb}" for sb in sorted(shear_bands)) 

680 ), (col, res[col]) 

681 elif not col.endswith("flags"): 

682 assert np.all(np.isfinite(res[col])), (col, res[col]) 

683 else: 

684 assert np.all(res[col] == 0), (col, res[col]) 

685 

686 

687def test_fit_mbobs_admom_symmetrize(): 

688 rng = np.random.RandomState(seed=211324) 

689 mbobs = make_mbobs_sim(45, 1, wcs_var_scale=0) 

690 wgt = mbobs[0][0].weight.copy() 

691 msk = rng.uniform(size=wgt.shape) < 0.2 

692 wgt[msk] = 0 

693 wgt[:, 21:] = 0 

694 mbobs[0][0].weight = wgt 

695 assert np.any(mbobs[0][0].weight == 0) 

696 res_nosym = fit_mbobs_admom( 

697 mbobs=mbobs, 

698 bmask_flags=0, 

699 rng=rng, 

700 symmetrize=False, 

701 ) 

702 

703 rng = np.random.RandomState(seed=211324) 

704 mbobs = make_mbobs_sim(45, 1, wcs_var_scale=0) 

705 wgt = mbobs[0][0].weight.copy() 

706 msk = rng.uniform(size=wgt.shape) < 0.2 

707 wgt[msk] = 0 

708 wgt[:, 21:] = 0 

709 mbobs[0][0].weight = wgt 

710 assert np.any(mbobs[0][0].weight == 0) 

711 res = fit_mbobs_admom( 

712 mbobs=mbobs, 

713 bmask_flags=0, 

714 rng=rng, 

715 symmetrize=True, 

716 ) 

717 

718 shear_bands = list(range(len(mbobs))) 

719 for col in res.dtype.names: 

720 if col.endswith("band_flux_flags"): 

721 assert np.all(res[col] == procflags.NO_ATTEMPT), (col, res[col]) 

722 assert np.all(res_nosym[col] == procflags.NO_ATTEMPT), (col, res_nosym[col]) 

723 elif "band_flux" in col: 

724 assert np.all(np.isnan(res[col])), (col, res[col]) 

725 assert np.all(np.isnan(res_nosym[col])), (col, res_nosym[col]) 

726 elif col == "shear_bands": 

727 assert np.all( 

728 res[col] 

729 == "".join(f"{sb}" for sb in sorted(shear_bands)) 

730 ), (col, res[col]) 

731 assert np.all( 

732 res_nosym[col] 

733 == "".join(f"{sb}" for sb in sorted(shear_bands)) 

734 ), (col, res_nosym[col]) 

735 elif not col.endswith("flags"): 

736 assert np.all(np.isfinite(res[col])), (col, res[col]) 

737 assert np.all(np.isfinite(res_nosym[col])), (col, res_nosym[col]) 

738 if "psf" not in col: 

739 assert not np.allclose(res[col], res_nosym[col]), ( 

740 col, res[col], res_nosym[col] 

741 ) 

742 else: 

743 assert np.all(res[col] == 0), ( 

744 col, res[col], procflags.get_procflags_str(res[col]) 

745 ) 

746 assert np.all(res_nosym[col] == 0), ( 

747 col, res_nosym[col], procflags.get_procflags_str(res_nosym[col]) 

748 ) 

749 

750 

751@pytest.mark.parametrize("shear_bands", [[0], [2], None, [1, 3, 2]]) 

752def test_fit_mbobs_gauss_smoke(shear_bands): 

753 rng = np.random.RandomState(seed=211324) 

754 mbobs = make_mbobs_sim(45, 4, wcs_var_scale=0) 

755 res = fit_mbobs_gauss( 

756 mbobs=mbobs, 

757 bmask_flags=0, 

758 rng=rng, 

759 shear_bands=shear_bands, 

760 ) 

761 

762 # none of these fits should fail 

763 if shear_bands is None: 

764 shear_bands = list(range(len(mbobs))) 

765 

766 for col in res.dtype.names: 

767 if col.endswith("band_flux_flags"): 

768 assert np.all(res[col][0, shear_bands] == 0), (col, res[col]) 

769 if len(shear_bands) != len(mbobs): 

770 tocheck = [i for i in range(len(mbobs)) if i not in shear_bands] 

771 assert np.all( 

772 res[col][:, tocheck] == procflags.NO_ATTEMPT 

773 ), (col, res[col]) 

774 

775 elif "band_flux" in col: 

776 assert np.all(np.isfinite(res[col][0, shear_bands])), (col, res[col]) 

777 elif col == "shear_bands": 

778 assert np.all( 

779 res[col] 

780 == "".join(f"{sb}" for sb in sorted(shear_bands)) 

781 ), (col, res[col]) 

782 elif not col.endswith("flags"): 

783 assert np.all(np.isfinite(res[col])), (col, res[col]) 

784 else: 

785 assert np.all(res[col] == 0), (col, res[col]) 

786 

787 

788def test_fit_mbobs_gauss_coadd(): 

789 rng = np.random.RandomState(seed=211324) 

790 mbobs = make_mbobs_sim(45, 4, wcs_var_scale=0) 

791 res = fit_mbobs_gauss( 

792 mbobs=mbobs, 

793 bmask_flags=0, 

794 rng=rng, 

795 coadd=False, 

796 ) 

797 

798 rng = np.random.RandomState(seed=211324) 

799 mbobs = make_mbobs_sim(45, 4, wcs_var_scale=0) 

800 res_coadd = fit_mbobs_gauss( 

801 mbobs=mbobs, 

802 bmask_flags=0, 

803 rng=rng, 

804 coadd=True, 

805 ) 

806 

807 shear_bands = list(range(len(mbobs))) 

808 

809 for col in res.dtype.names: 

810 if col.endswith("band_flux_flags"): 

811 assert np.all(res[col][0, shear_bands] == 0), (col, res[col]) 

812 assert np.all(res_coadd[col][0, shear_bands] == 0), (col, res_coadd[col]) 

813 if len(shear_bands) != len(mbobs): 

814 assert np.any( 

815 res[col][0, shear_bands] == procflags.NO_ATTEMPT 

816 ), (col, res[col]) 

817 

818 tocheck = [i for i in range(len(mbobs)) if i not in shear_bands] 

819 assert np.all( 

820 res_coadd[col][0, tocheck] == procflags.NO_ATTEMPT 

821 ), (col, res_coadd[col]) 

822 elif col.endswith("band_flux"): 

823 assert np.all(np.isfinite(res[col][0, shear_bands])), (col, res[col]) 

824 assert np.all( 

825 np.isfinite(res_coadd[col][0, shear_bands]) 

826 ), (col, res_coadd[col]) 

827 elif col == "shear_bands": 

828 assert np.all( 

829 res[col] 

830 == "".join(f"{sb}" for sb in sorted(shear_bands)) 

831 ), (col, res[col]) 

832 assert np.all( 

833 res_coadd[col] 

834 == "".join(f"{sb}" for sb in sorted(shear_bands)) 

835 ), (col, res_coadd[col]) 

836 elif not col.endswith("flags"): 

837 assert np.all(np.isfinite(res[col])), (col, res[col]) 

838 assert np.all(np.isfinite(res_coadd[col])), (col, res_coadd[col]) 

839 assert not np.allclose(res[col], res_coadd[col]), ( 

840 col, res[col], res_coadd[col] 

841 ) 

842 else: 

843 assert np.all(res[col] == 0), (col, res[col]) 

844 assert np.all(res_coadd[col] == 0), (col, res_coadd[col]) 

845 

846 

847@pytest.mark.parametrize("coadd", [True, False]) 

848@pytest.mark.parametrize("case", [ 

849 "missing_band", 

850 "too_many_bands", 

851 "zero_weights", 

852 "edge_hit", 

853 "shear_bands", 

854 "coadd_flags", 

855]) 

856def test_fit_mbobs_gauss_input_errors(case, coadd): 

857 rng = np.random.RandomState(seed=211324) 

858 ran_one = False 

859 

860 if case == "missing_band": 

861 mbobs = make_mbobs_sim(45, 4, wcs_var_scale=0) 

862 mbobs[1] = ngmix.ObsList() 

863 res = fit_mbobs_gauss( 

864 mbobs=mbobs, 

865 bmask_flags=0, 

866 rng=rng, 

867 shear_bands=None, 

868 coadd=coadd, 

869 ) 

870 ran_one = True 

871 assert res["gauss_flags"] == (procflags.NO_ATTEMPT | procflags.MISSING_BAND), ( 

872 procflags.get_procflags_str(res["gauss_flags"][0]) 

873 ) 

874 elif case == "too_many_bands": 

875 mbobs = make_mbobs_sim(45, 4, wcs_var_scale=0) 

876 ol = ngmix.ObsList() 

877 ol.append(mbobs[1][0]) 

878 ol.append(mbobs[1][0]) 

879 mbobs[1] = ol 

880 res = fit_mbobs_gauss( 

881 mbobs=mbobs, 

882 bmask_flags=0, 

883 rng=rng, 

884 shear_bands=None, 

885 coadd=coadd, 

886 ) 

887 ran_one = True 

888 assert res["gauss_flags"] == ( 

889 procflags.NO_ATTEMPT | procflags.INCONSISTENT_BANDS 

890 ), ( 

891 procflags.get_procflags_str(res["gauss_flags"][0]) 

892 ) 

893 elif case == "zero_weights": 

894 mbobs = make_mbobs_sim(45, 4, wcs_var_scale=0) 

895 mbobs[1][0] = ngmix.Observation( 

896 image=mbobs[1][0].image, 

897 jacobian=mbobs[1][0].jacobian, 

898 psf=mbobs[1][0].psf, 

899 weight=np.zeros_like(mbobs[1][0].image), 

900 ignore_zero_weight=False, 

901 bmask=mbobs[1][0].bmask, 

902 ) 

903 res = fit_mbobs_gauss( 

904 mbobs=mbobs, 

905 bmask_flags=0, 

906 rng=rng, 

907 shear_bands=None, 

908 coadd=coadd, 

909 ) 

910 ran_one = True 

911 assert res["gauss_flags"] == (procflags.NO_ATTEMPT | procflags.ZERO_WEIGHTS), ( 

912 procflags.get_procflags_str(res["gauss_flags"][0]) 

913 ) 

914 elif case == "edge_hit": 

915 mbobs = make_mbobs_sim(45, 4, wcs_var_scale=0) 

916 mbobs[1][0] = ngmix.Observation( 

917 image=mbobs[1][0].image, 

918 jacobian=mbobs[1][0].jacobian, 

919 psf=mbobs[1][0].psf, 

920 weight=mbobs[1][0].weight, 

921 bmask=mbobs[1][0].bmask + 10, 

922 ) 

923 res = fit_mbobs_gauss( 

924 mbobs=mbobs, 

925 bmask_flags=10, 

926 rng=rng, 

927 shear_bands=None, 

928 coadd=coadd, 

929 ) 

930 ran_one = True 

931 assert res["gauss_flags"] == (procflags.NO_ATTEMPT | procflags.EDGE_HIT), ( 

932 procflags.get_procflags_str(res["gauss_flags"][0]) 

933 ) 

934 elif case == "shear_bands": 

935 mbobs = make_mbobs_sim(45, 4, wcs_var_scale=0) 

936 res = fit_mbobs_gauss( 

937 mbobs=mbobs, 

938 bmask_flags=0, 

939 rng=rng, 

940 shear_bands=[1, 2, 10], 

941 coadd=coadd, 

942 ) 

943 ran_one = True 

944 assert res["gauss_flags"] == ( 

945 procflags.NO_ATTEMPT | procflags.INCONSISTENT_BANDS 

946 ), ( 

947 procflags.get_procflags_str(res["gauss_flags"][0]) 

948 ) 

949 elif case == "coadd_flags": 

950 mbobs = make_mbobs_sim(45, 4, wcs_var_scale=0) 

951 mbobs[1][0] = ngmix.Observation( 

952 image=mbobs[1][0].image, 

953 jacobian=mbobs[1][0].jacobian, 

954 psf=mbobs[1][0].psf, 

955 weight=mbobs[1][0].weight, 

956 bmask=mbobs[1][0].bmask, 

957 ormask=mbobs[1][0].ormask, 

958 noise=mbobs[1][0].noise, 

959 # missing mfrac so coadd fails 

960 ) 

961 res = fit_mbobs_gauss( 

962 mbobs=mbobs, 

963 bmask_flags=0, 

964 rng=rng, 

965 shear_bands=None, 

966 coadd=coadd, 

967 ) 

968 ran_one = True 

969 if coadd: 

970 assert res["gauss_flags"] == ( 

971 procflags.NO_ATTEMPT | procflags.INCONSISTENT_BANDS 

972 ), ( 

973 procflags.get_procflags_str(res["gauss_flags"][0]) 

974 ) 

975 else: 

976 assert res["gauss_flags"] == 0, ( 

977 procflags.get_procflags_str(res["gauss_flags"][0]) 

978 ) 

979 else: 

980 assert False, f"case {case} not found!" 

981 

982 assert ran_one, "No tests ran!"