Coverage for metadetect / tests / test_fitting_joint.py: 6%
464 statements
« prev ^ index » next coverage.py v7.13.5, created at 2026-05-01 08:39 +0000
« prev ^ index » next coverage.py v7.13.5, created at 2026-05-01 08:39 +0000
1import numpy as np
2import ngmix
4import pytest
6from .sim import make_mbobs_sim
7from metadetect.fitting import (
8 fit_mbobs_admom,
9 fit_mbobs_list_joint,
10 make_coadd_obs,
11 get_admom_runner,
12 symmetrize_obs_weights,
13 fit_mbobs_gauss,
14)
15from metadetect import procflags
18@pytest.mark.parametrize("case", [
19 "shear_bands_bad_oneband",
20 "shear_bands_bad",
21 "shear_bands_outofrange",
22 "bad_img_jacob",
23 "bad_img_shape",
24 "missing_psf",
25 "bad_psf_jacob",
26 "bad_psf_img_shape",
27 "zero_weights",
28 "missing_attrs",
29 "disjoint_weights",
30])
31def test_make_coadd_obs_errors(case):
32 ran_one = False
34 if case == "shear_bands_bad_oneband":
35 mbobs = make_mbobs_sim(45, 1, wcs_var_scale=0)
36 coadd_obs, flags = make_coadd_obs(
37 mbobs, shear_bands=[10]
38 )
39 assert coadd_obs is None
40 assert flags == procflags.INCONSISTENT_BANDS
41 ran_one = True
42 elif case == "shear_bands_bad":
43 mbobs = make_mbobs_sim(45, 3, wcs_var_scale=0)
44 coadd_obs, flags = make_coadd_obs(
45 mbobs, shear_bands=[10]
46 )
47 assert coadd_obs is None
48 assert flags == procflags.INCONSISTENT_BANDS
49 ran_one = True
50 elif case == "shear_bands_outofrange":
51 mbobs = make_mbobs_sim(45, 3, wcs_var_scale=0)
52 coadd_obs, flags = make_coadd_obs(
53 mbobs, shear_bands=[10, 13, 0]
54 )
55 assert coadd_obs is None
56 assert flags == procflags.INCONSISTENT_BANDS
57 ran_one = True
58 elif case == "bad_img_jacob":
59 mbobs = make_mbobs_sim(45, 4, wcs_var_scale=0)
60 mbobs[1][0] = ngmix.Observation(
61 image=mbobs[0][0].image,
62 jacobian=ngmix.DiagonalJacobian(
63 scale=0.25,
64 row=10,
65 col=9,
66 ),
67 psf=mbobs[1][0].psf,
68 )
69 coadd_obs, flags = make_coadd_obs(mbobs, shear_bands=[0, 1])
70 assert coadd_obs is None
71 assert flags == procflags.INCONSISTENT_BANDS
72 coadd_obs, flags = make_coadd_obs(mbobs, shear_bands=[1, 0])
73 assert coadd_obs is None
74 assert flags == procflags.INCONSISTENT_BANDS
75 coadd_obs, flags = make_coadd_obs(mbobs, shear_bands=[0, 2, 3])
76 assert coadd_obs is not None
77 assert flags == 0
78 ran_one = True
79 elif case == "bad_img_shape":
80 mbobs = make_mbobs_sim(45, 4, wcs_var_scale=0)
81 mbobs[1][0] = ngmix.Observation(
82 image=np.zeros((5, 5)),
83 jacobian=mbobs[1][0].jacobian,
84 psf=mbobs[1][0].psf,
85 )
86 coadd_obs, flags = make_coadd_obs(mbobs, shear_bands=[0, 1])
87 assert coadd_obs is None
88 assert flags == procflags.INCONSISTENT_BANDS
89 coadd_obs, flags = make_coadd_obs(mbobs, shear_bands=[1, 0])
90 assert coadd_obs is None
91 assert flags == procflags.INCONSISTENT_BANDS
92 coadd_obs, flags = make_coadd_obs(mbobs, shear_bands=[0, 2, 3])
93 assert coadd_obs is not None
94 assert flags == 0
95 ran_one = True
96 elif case == "missing_psf":
97 mbobs = make_mbobs_sim(45, 4, wcs_var_scale=0)
98 mbobs[1][0] = ngmix.Observation(
99 image=mbobs[1][0].image,
100 jacobian=mbobs[1][0].jacobian,
101 )
102 coadd_obs, flags = make_coadd_obs(mbobs, shear_bands=[0, 1])
103 assert coadd_obs is None
104 assert flags == procflags.INCONSISTENT_BANDS
105 coadd_obs, flags = make_coadd_obs(mbobs, shear_bands=[1, 0])
106 assert coadd_obs is None
107 assert flags == procflags.INCONSISTENT_BANDS
108 coadd_obs, flags = make_coadd_obs(mbobs, shear_bands=[0, 2, 3])
109 assert coadd_obs is not None
110 assert flags == 0
111 ran_one = True
112 elif case == "bad_psf_jacob":
113 mbobs = make_mbobs_sim(45, 4, wcs_var_scale=0)
114 mbobs[1][0].psf = ngmix.Observation(
115 image=mbobs[1][0].psf.image,
116 jacobian=ngmix.DiagonalJacobian(
117 scale=0.25,
118 row=10,
119 col=9,
120 ),
121 )
122 coadd_obs, flags = make_coadd_obs(mbobs, shear_bands=[0, 1])
123 assert coadd_obs is None
124 assert flags == procflags.INCONSISTENT_BANDS
125 coadd_obs, flags = make_coadd_obs(mbobs, shear_bands=[1, 0])
126 assert coadd_obs is None
127 assert flags == procflags.INCONSISTENT_BANDS
128 coadd_obs, flags = make_coadd_obs(mbobs, shear_bands=[0, 2, 3])
129 assert coadd_obs is not None
130 assert flags == 0
131 ran_one = True
132 elif case == "bad_psf_img_shape":
133 mbobs = make_mbobs_sim(45, 4, wcs_var_scale=0)
134 mbobs[1][0].psf = ngmix.Observation(
135 image=np.zeros((5, 5)),
136 jacobian=mbobs[1][0].psf.jacobian,
137 )
138 coadd_obs, flags = make_coadd_obs(mbobs, shear_bands=[0, 1])
139 assert coadd_obs is None
140 assert flags == procflags.INCONSISTENT_BANDS
141 coadd_obs, flags = make_coadd_obs(mbobs, shear_bands=[1, 0])
142 assert coadd_obs is None
143 assert flags == procflags.INCONSISTENT_BANDS
144 coadd_obs, flags = make_coadd_obs(mbobs, shear_bands=[0, 2, 3])
145 assert coadd_obs is not None
146 assert flags == 0
147 ran_one = True
148 elif case == "zero_weights":
149 mbobs = make_mbobs_sim(45, 4, wcs_var_scale=0)
150 mbobs[1][0] = ngmix.Observation(
151 image=mbobs[1][0].image,
152 jacobian=mbobs[1][0].jacobian,
153 psf=mbobs[1][0].psf,
154 weight=np.zeros_like(mbobs[1][0].image),
155 ignore_zero_weight=False,
156 )
157 coadd_obs, flags = make_coadd_obs(mbobs, shear_bands=[0, 1])
158 assert coadd_obs is None
159 assert flags == procflags.ZERO_WEIGHTS
160 coadd_obs, flags = make_coadd_obs(mbobs, shear_bands=[1, 0])
161 assert coadd_obs is None
162 assert flags == procflags.ZERO_WEIGHTS
163 coadd_obs, flags = make_coadd_obs(mbobs, shear_bands=[0, 2, 3])
164 assert coadd_obs is not None
165 assert flags == 0
166 ran_one = True
167 elif case == "missing_attrs":
168 for attr in ["mfrac", "noise", "bmask", "ormask"]:
169 mbobs = make_mbobs_sim(45, 4, wcs_var_scale=0)
170 kwargs = {
171 "mfrac": mbobs[1][0].mfrac,
172 "noise": mbobs[1][0].noise,
173 "bmask": mbobs[1][0].bmask,
174 "ormask": mbobs[1][0].ormask,
175 }
176 kwargs.pop(attr)
177 mbobs[1][0] = ngmix.Observation(
178 image=mbobs[1][0].image,
179 jacobian=mbobs[1][0].jacobian,
180 psf=mbobs[1][0].psf,
181 weight=mbobs[1][0].weight,
182 **kwargs,
183 )
184 coadd_obs, flags = make_coadd_obs(mbobs, shear_bands=[0, 1])
185 assert coadd_obs is None
186 assert flags == procflags.INCONSISTENT_BANDS
187 coadd_obs, flags = make_coadd_obs(mbobs, shear_bands=[1, 0])
188 assert coadd_obs is None
189 assert flags == procflags.INCONSISTENT_BANDS
190 coadd_obs, flags = make_coadd_obs(mbobs, shear_bands=[0, 2, 3])
191 assert coadd_obs is not None
192 assert flags == 0
193 ran_one = True
194 elif case == "disjoint_weights":
195 # if two images have disjoint weight maps where no non-zero
196 # areas overlap, then we cannot coadd
197 mbobs = make_mbobs_sim(45, 4, wcs_var_scale=0)
198 wmsk = np.zeros_like(mbobs[1][0].image)
199 wmsk[:10, :] = 1
200 mbobs[0][0] = ngmix.Observation(
201 image=mbobs[0][0].image,
202 jacobian=mbobs[0][0].jacobian,
203 psf=mbobs[0][0].psf,
204 weight=mbobs[0][0].weight * (1.0 - wmsk),
205 mfrac=mbobs[0][0].mfrac,
206 ormask=mbobs[0][0].ormask,
207 bmask=mbobs[0][0].bmask,
208 noise=mbobs[0][0].noise,
209 )
210 mbobs[1][0] = ngmix.Observation(
211 image=mbobs[1][0].image,
212 jacobian=mbobs[1][0].jacobian,
213 psf=mbobs[1][0].psf,
214 weight=mbobs[1][0].weight * wmsk,
215 mfrac=mbobs[1][0].mfrac,
216 ormask=mbobs[1][0].ormask,
217 bmask=mbobs[1][0].bmask,
218 noise=mbobs[1][0].noise,
219 )
220 coadd_obs, flags = make_coadd_obs(mbobs, shear_bands=[0, 1])
221 assert coadd_obs is None
222 assert flags == procflags.ZERO_WEIGHTS
223 coadd_obs, flags = make_coadd_obs(mbobs, shear_bands=[1, 0])
224 assert coadd_obs is None
225 assert flags == procflags.ZERO_WEIGHTS
226 coadd_obs, flags = make_coadd_obs(mbobs, shear_bands=[0, 2, 3])
227 assert coadd_obs is not None
228 assert flags == 0
229 ran_one = True
230 else:
231 assert False, f"case {case} not found!"
233 assert ran_one, "No tests ran!"
236def test_make_coadd_obs_single():
237 mbobs = make_mbobs_sim(45, 4, wcs_var_scale=0)
239 coadd_obs, flags = make_coadd_obs(mbobs[:1])
240 assert coadd_obs is mbobs[0][0]
241 assert flags == 0
243 coadd_obs, flags = make_coadd_obs(mbobs, shear_bands=[2])
244 assert coadd_obs is mbobs[2][0]
245 assert flags == 0
248def test_make_coadd_obs_psf_weight():
249 mbobs = make_mbobs_sim(45, 4, wcs_var_scale=0)
251 coadd_obs, flags = make_coadd_obs(mbobs)
252 assert np.allclose(
253 coadd_obs.psf.weight,
254 1.0/(np.sqrt(np.sum(coadd_obs.psf.image**2))/1000.0)**2
255 )
258def test_make_coadd_obs_symmetric():
259 mbobs = make_mbobs_sim(45, 4, wcs_var_scale=0)
261 coadd_obs01, flags = make_coadd_obs(mbobs, shear_bands=[0, 1])
262 assert flags == 0
264 coadd_obs10, flags = make_coadd_obs(mbobs, shear_bands=[1, 0])
265 assert flags == 0
267 assert repr(coadd_obs01.jacobian) == repr(coadd_obs10.jacobian)
268 assert coadd_obs01.meta == mbobs[1][0].meta
269 assert coadd_obs10.meta == mbobs[0][0].meta
270 for attr in [
271 "image", "weight",
272 "bmask", "ormask", "noise", "mfrac",
273 ]:
274 assert np.array_equal(
275 getattr(coadd_obs01, attr),
276 getattr(coadd_obs10, attr),
277 )
279 assert repr(coadd_obs01.psf.jacobian) == repr(coadd_obs10.psf.jacobian)
280 assert coadd_obs01.psf.meta == mbobs[1][0].psf.meta
281 assert coadd_obs10.psf.meta == mbobs[0][0].psf.meta
282 for attr in ["image", "weight"]:
283 assert np.array_equal(
284 getattr(coadd_obs01.psf, attr),
285 getattr(coadd_obs10.psf, attr),
286 )
289def _or_arrays(arrs):
290 res = arrs[0].copy()
291 for arr in arrs[1:]:
292 res |= arr
294 return res
297@pytest.mark.parametrize("shear_bands", [None, [0, 1], [3, 1, 2]])
298def test_make_coadd_obs_shear_bands(shear_bands):
299 mbobs = make_mbobs_sim(45, 4, wcs_var_scale=0)
301 coadd_obs, flags = make_coadd_obs(mbobs, shear_bands=shear_bands)
302 assert flags == 0
304 # make sure different shear bands give different answers
305 coadd_obs_none, flags = make_coadd_obs(mbobs, shear_bands=None)
306 assert flags == 0
308 if shear_bands is not None:
309 assert not np.array_equal(
310 coadd_obs.image,
311 coadd_obs_none.image
312 )
313 else:
314 assert np.array_equal(
315 coadd_obs.image,
316 coadd_obs_none.image
317 )
319 if shear_bands is None:
320 shear_bands = list(range(len(mbobs)))
322 # the coadded image should have higher s/n
323 assert all(
324 coadd_obs.get_s2n()
325 > mbobs[sb][0].get_s2n() for sb in shear_bands
326 )
328 # total flux should be close
329 fluxes = list(
330 mbobs[sb][0].image.sum()
331 for sb in shear_bands
332 )
333 assert np.allclose(
334 coadd_obs.image.sum(),
335 fluxes,
336 atol=0,
337 rtol=0.2,
338 ), (coadd_obs.image.sum(), fluxes)
340 # psf should sum to unity
341 assert np.allclose(coadd_obs.psf.image.sum(), 1)
343 # jacobians and image shapes should be the same
344 assert repr(coadd_obs.jacobian) == repr(mbobs[0][0].jacobian)
345 assert coadd_obs.image.shape == mbobs[0][0].image.shape
346 assert repr(coadd_obs.psf.jacobian) == repr(mbobs[0][0].psf.jacobian)
347 assert coadd_obs.psf.image.shape == mbobs[0][0].psf.image.shape
348 assert coadd_obs.meta == mbobs[shear_bands[-1]][0].meta
350 assert np.array_equal(
351 coadd_obs.bmask,
352 _or_arrays([mbobs[sb][0].bmask for sb in shear_bands]),
353 )
354 assert np.array_equal(
355 coadd_obs.ormask,
356 _or_arrays([mbobs[sb][0].ormask for sb in shear_bands]),
357 )
359 wgts = [np.median(mbobs[sb][0].weight) for sb in shear_bands]
360 wgts = np.array(wgts)
361 wgts /= np.sum(wgts)
362 # weights should be different
363 assert not np.allclose(wgts[0], wgts)
365 # check the actual values in coadded attributes
366 image = np.zeros_like(mbobs[0][0].image)
367 weight = np.zeros_like(mbobs[0][0].image)
368 mfrac = np.zeros_like(mbobs[0][0].image)
369 noise = np.zeros_like(mbobs[0][0].image)
370 psf_image = np.zeros_like(mbobs[0][0].psf.image)
371 for i, sb in enumerate(shear_bands):
372 image += mbobs[sb][0].image * wgts[i]
373 mfrac += mbobs[sb][0].mfrac * wgts[i]
374 noise += mbobs[sb][0].noise * wgts[i]
375 psf_image += mbobs[sb][0].psf.image * wgts[i]
376 weight += wgts[i]**2 / mbobs[sb][0].weight
377 weight = 1.0 / weight
378 assert np.allclose(image, coadd_obs.image)
379 assert np.allclose(weight, coadd_obs.weight)
380 assert np.allclose(mfrac, coadd_obs.mfrac)
381 assert np.allclose(noise, coadd_obs.noise)
382 assert np.allclose(psf_image, coadd_obs.psf.image)
385def test_fit_mbobs_list_joint_errors():
386 mbobs_list = [
387 make_mbobs_sim(45, 3, wcs_var_scale=0),
388 make_mbobs_sim(46, 3, wcs_var_scale=0),
389 ]
390 rng = np.random.RandomState(seed=4235)
391 with pytest.raises(RuntimeError) as err:
392 fit_mbobs_list_joint(
393 mbobs_list=mbobs_list,
394 fitter_name="blah",
395 bmask_flags=0,
396 rng=rng,
397 shear_bands=None,
398 )
399 assert "fitter 'blah'" in str(err.value)
402def test_fit_mbobs_list_joint_empty():
403 mbobs_list = []
404 rng = np.random.RandomState(seed=4235)
405 res = fit_mbobs_list_joint(
406 mbobs_list=mbobs_list,
407 fitter_name="am",
408 bmask_flags=0,
409 rng=rng,
410 shear_bands=None,
411 )
412 assert res is None
415@pytest.mark.parametrize("shear_bands", [None, [0, 1], [2, 3, 1]])
416@pytest.mark.parametrize("fname", ["am", "admom"])
417def test_fit_mbobs_list_joint_fits_all(shear_bands, fname):
418 mbobs_list = [
419 make_mbobs_sim(45, 4, wcs_var_scale=0),
420 make_mbobs_sim(46, 4, wcs_var_scale=0),
421 make_mbobs_sim(47, 4, wcs_var_scale=0),
422 ]
423 rng = np.random.RandomState(seed=4235)
424 res = fit_mbobs_list_joint(
425 mbobs_list=mbobs_list,
426 fitter_name=fname,
427 bmask_flags=0,
428 rng=rng,
429 shear_bands=shear_bands,
430 )
431 assert res.shape == (3,)
433 if shear_bands is None:
434 shear_bands = list(range(4))
435 assert np.all(
436 res["shear_bands"]
437 == "".join(str(sb) for sb in sorted(shear_bands))
438 )
440 rng = np.random.RandomState(seed=4235)
441 for i in range(3):
442 res1 = fit_mbobs_admom(
443 mbobs=mbobs_list[i],
444 bmask_flags=0,
445 rng=rng,
446 shear_bands=shear_bands,
447 )
449 for col in res.dtype.names:
450 np.testing.assert_array_equal(res[i:i+1][col], res1[col])
453@pytest.mark.parametrize("coadd", [True, False])
454@pytest.mark.parametrize("symmetrize", [True, False])
455@pytest.mark.parametrize("shear_bands", [None, [0], [0, 1], [2, 3, 1]])
456@pytest.mark.parametrize("fname", [
457 "am",
458 "admom",
459 pytest.param("gauss", marks=pytest.mark.xfail),
460])
461def test_fit_mbobs_list_joint_seeding(shear_bands, fname, coadd, symmetrize):
462 mbobs_list = [
463 make_mbobs_sim(45, 4, wcs_var_scale=0),
464 make_mbobs_sim(46, 4, wcs_var_scale=0),
465 make_mbobs_sim(47, 4, wcs_var_scale=0),
466 ]
467 rng = np.random.RandomState(seed=4235)
468 res = fit_mbobs_list_joint(
469 mbobs_list=mbobs_list,
470 fitter_name=fname,
471 bmask_flags=0,
472 rng=rng,
473 shear_bands=shear_bands,
474 coadd=coadd,
475 symmetrize=symmetrize,
476 )
478 mbobs_list = [
479 make_mbobs_sim(45, 4, wcs_var_scale=0),
480 make_mbobs_sim(46, 4, wcs_var_scale=0),
481 make_mbobs_sim(47, 4, wcs_var_scale=0),
482 ]
483 rng1 = np.random.RandomState(seed=4235)
484 res1 = fit_mbobs_list_joint(
485 mbobs_list=mbobs_list,
486 fitter_name=fname,
487 bmask_flags=0,
488 rng=rng1,
489 shear_bands=shear_bands,
490 coadd=coadd,
491 symmetrize=symmetrize,
492 )
494 for col in res.dtype.names:
495 np.testing.assert_array_equal(
496 res[col],
497 res1[col],
498 err_msg=col,
499 )
502@pytest.mark.parametrize("case", [
503 "missing_band",
504 "too_many_bands",
505 "zero_weights",
506 "edge_hit",
507 "coadd_flags",
508])
509def test_fit_mbobs_admom_input_errors(case):
510 rng = np.random.RandomState(seed=211324)
511 ran_one = False
513 if case == "missing_band":
514 mbobs = make_mbobs_sim(45, 4, wcs_var_scale=0)
515 mbobs[1] = ngmix.ObsList()
516 res = fit_mbobs_admom(
517 mbobs=mbobs,
518 bmask_flags=0,
519 rng=rng,
520 shear_bands=None,
521 )
522 ran_one = True
523 assert res["am_flags"] == (procflags.NO_ATTEMPT | procflags.MISSING_BAND), (
524 procflags.get_procflags_str(res["am_flags"][0])
525 )
526 elif case == "too_many_bands":
527 mbobs = make_mbobs_sim(45, 4, wcs_var_scale=0)
528 ol = ngmix.ObsList()
529 ol.append(mbobs[1][0])
530 ol.append(mbobs[1][0])
531 mbobs[1] = ol
532 res = fit_mbobs_admom(
533 mbobs=mbobs,
534 bmask_flags=0,
535 rng=rng,
536 shear_bands=None,
537 )
538 ran_one = True
539 assert res["am_flags"] == (
540 procflags.NO_ATTEMPT | procflags.INCONSISTENT_BANDS
541 ), (
542 procflags.get_procflags_str(res["am_flags"][0])
543 )
544 elif case == "zero_weights":
545 mbobs = make_mbobs_sim(45, 4, wcs_var_scale=0)
546 mbobs[1][0] = ngmix.Observation(
547 image=mbobs[1][0].image,
548 jacobian=mbobs[1][0].jacobian,
549 psf=mbobs[1][0].psf,
550 weight=np.zeros_like(mbobs[1][0].image),
551 ignore_zero_weight=False,
552 bmask=mbobs[1][0].bmask,
553 )
554 res = fit_mbobs_admom(
555 mbobs=mbobs,
556 bmask_flags=0,
557 rng=rng,
558 shear_bands=None,
559 )
560 ran_one = True
561 assert res["am_flags"] == (procflags.NO_ATTEMPT | procflags.ZERO_WEIGHTS), (
562 procflags.get_procflags_str(res["am_flags"][0])
563 )
564 elif case == "edge_hit":
565 mbobs = make_mbobs_sim(45, 4, wcs_var_scale=0)
566 mbobs[1][0] = ngmix.Observation(
567 image=mbobs[1][0].image,
568 jacobian=mbobs[1][0].jacobian,
569 psf=mbobs[1][0].psf,
570 weight=mbobs[1][0].weight,
571 bmask=mbobs[1][0].bmask + 10,
572 )
573 res = fit_mbobs_admom(
574 mbobs=mbobs,
575 bmask_flags=10,
576 rng=rng,
577 shear_bands=None,
578 )
579 ran_one = True
580 assert res["am_flags"] == (procflags.NO_ATTEMPT | procflags.EDGE_HIT), (
581 procflags.get_procflags_str(res["am_flags"][0])
582 )
583 elif case == "coadd_flags":
584 mbobs = make_mbobs_sim(45, 4, wcs_var_scale=0)
585 mbobs[1][0] = ngmix.Observation(
586 image=mbobs[1][0].image,
587 jacobian=mbobs[1][0].jacobian,
588 psf=mbobs[1][0].psf,
589 weight=mbobs[1][0].weight,
590 bmask=mbobs[1][0].bmask,
591 ormask=mbobs[1][0].ormask,
592 noise=mbobs[1][0].noise,
593 # missing mfrac so coadd fails
594 )
595 res = fit_mbobs_admom(
596 mbobs=mbobs,
597 bmask_flags=0,
598 rng=rng,
599 shear_bands=None,
600 )
601 ran_one = True
602 assert res["am_flags"] == (
603 procflags.NO_ATTEMPT | procflags.INCONSISTENT_BANDS
604 ), (
605 procflags.get_procflags_str(res["am_flags"][0])
606 )
607 else:
608 assert False, f"case {case} not found!"
610 assert ran_one, "No tests ran!"
613@pytest.mark.parametrize("shear_bands", [[0], [2]])
614def test_fit_mbobs_admom_oneband(shear_bands):
615 rng = np.random.RandomState(seed=211324)
616 mbobs = make_mbobs_sim(45, 4, wcs_var_scale=0)
617 res = fit_mbobs_admom(
618 mbobs=mbobs,
619 bmask_flags=0,
620 rng=rng,
621 shear_bands=shear_bands,
622 )
624 # make sure things are seeded and everything is copied out
625 # correctly
626 assert res["am_flags"] == (
627 res["am_psf_flags"] | res["am_obj_flags"]
628 )
630 rng = np.random.RandomState(seed=211324)
631 fitter = get_admom_runner(rng)
632 pres = fitter.go(mbobs[shear_bands[0]][0].psf)
633 np.testing.assert_allclose(
634 res["am_psf_T"][0], pres["T"]
635 )
636 np.testing.assert_allclose(
637 res["am_psf_g"][0], pres["e"]
638 )
640 sobs = symmetrize_obs_weights(mbobs[shear_bands[0]][0])
641 gres = fitter.go(sobs)
642 np.testing.assert_array_equal(res["am_T_flags"][0], gres["T_flags"])
643 np.testing.assert_array_equal(res["am_T"][0], gres["T"])
644 np.testing.assert_array_equal(res["am_T_err"][0], gres["T_err"])
645 np.testing.assert_array_equal(
646 res["am_T_ratio"][0],
647 gres["T"]/pres["T"],
648 )
650 np.testing.assert_array_equal(res["am_obj_flags"][0], gres["flags"])
651 np.testing.assert_array_equal(res["am_s2n"][0], gres["s2n"])
652 np.testing.assert_array_equal(res["am_g"][0], gres["e"])
653 np.testing.assert_array_equal(res["am_g_cov"][0], gres["e_cov"])
656@pytest.mark.parametrize("shear_bands", [[0], [2], None, [1, 3, 2]])
657def test_fit_mbobs_admom_smoke(shear_bands):
658 rng = np.random.RandomState(seed=211324)
659 mbobs = make_mbobs_sim(45, 4, wcs_var_scale=0)
660 res = fit_mbobs_admom(
661 mbobs=mbobs,
662 bmask_flags=0,
663 rng=rng,
664 shear_bands=shear_bands,
665 )
667 # none of these fits should fail
668 if shear_bands is None:
669 shear_bands = list(range(len(mbobs)))
671 for col in res.dtype.names:
672 if col.endswith("band_flux_flags"):
673 assert np.all(res[col] == procflags.NO_ATTEMPT), (col, res[col])
674 elif "band_flux" in col:
675 assert np.all(np.isnan(res[col])), (col, res[col])
676 elif col == "shear_bands":
677 assert np.all(
678 res[col]
679 == "".join(f"{sb}" for sb in sorted(shear_bands))
680 ), (col, res[col])
681 elif not col.endswith("flags"):
682 assert np.all(np.isfinite(res[col])), (col, res[col])
683 else:
684 assert np.all(res[col] == 0), (col, res[col])
687def test_fit_mbobs_admom_symmetrize():
688 rng = np.random.RandomState(seed=211324)
689 mbobs = make_mbobs_sim(45, 1, wcs_var_scale=0)
690 wgt = mbobs[0][0].weight.copy()
691 msk = rng.uniform(size=wgt.shape) < 0.2
692 wgt[msk] = 0
693 wgt[:, 21:] = 0
694 mbobs[0][0].weight = wgt
695 assert np.any(mbobs[0][0].weight == 0)
696 res_nosym = fit_mbobs_admom(
697 mbobs=mbobs,
698 bmask_flags=0,
699 rng=rng,
700 symmetrize=False,
701 )
703 rng = np.random.RandomState(seed=211324)
704 mbobs = make_mbobs_sim(45, 1, wcs_var_scale=0)
705 wgt = mbobs[0][0].weight.copy()
706 msk = rng.uniform(size=wgt.shape) < 0.2
707 wgt[msk] = 0
708 wgt[:, 21:] = 0
709 mbobs[0][0].weight = wgt
710 assert np.any(mbobs[0][0].weight == 0)
711 res = fit_mbobs_admom(
712 mbobs=mbobs,
713 bmask_flags=0,
714 rng=rng,
715 symmetrize=True,
716 )
718 shear_bands = list(range(len(mbobs)))
719 for col in res.dtype.names:
720 if col.endswith("band_flux_flags"):
721 assert np.all(res[col] == procflags.NO_ATTEMPT), (col, res[col])
722 assert np.all(res_nosym[col] == procflags.NO_ATTEMPT), (col, res_nosym[col])
723 elif "band_flux" in col:
724 assert np.all(np.isnan(res[col])), (col, res[col])
725 assert np.all(np.isnan(res_nosym[col])), (col, res_nosym[col])
726 elif col == "shear_bands":
727 assert np.all(
728 res[col]
729 == "".join(f"{sb}" for sb in sorted(shear_bands))
730 ), (col, res[col])
731 assert np.all(
732 res_nosym[col]
733 == "".join(f"{sb}" for sb in sorted(shear_bands))
734 ), (col, res_nosym[col])
735 elif not col.endswith("flags"):
736 assert np.all(np.isfinite(res[col])), (col, res[col])
737 assert np.all(np.isfinite(res_nosym[col])), (col, res_nosym[col])
738 if "psf" not in col:
739 assert not np.allclose(res[col], res_nosym[col]), (
740 col, res[col], res_nosym[col]
741 )
742 else:
743 assert np.all(res[col] == 0), (
744 col, res[col], procflags.get_procflags_str(res[col])
745 )
746 assert np.all(res_nosym[col] == 0), (
747 col, res_nosym[col], procflags.get_procflags_str(res_nosym[col])
748 )
751@pytest.mark.parametrize("shear_bands", [[0], [2], None, [1, 3, 2]])
752def test_fit_mbobs_gauss_smoke(shear_bands):
753 rng = np.random.RandomState(seed=211324)
754 mbobs = make_mbobs_sim(45, 4, wcs_var_scale=0)
755 res = fit_mbobs_gauss(
756 mbobs=mbobs,
757 bmask_flags=0,
758 rng=rng,
759 shear_bands=shear_bands,
760 )
762 # none of these fits should fail
763 if shear_bands is None:
764 shear_bands = list(range(len(mbobs)))
766 for col in res.dtype.names:
767 if col.endswith("band_flux_flags"):
768 assert np.all(res[col][0, shear_bands] == 0), (col, res[col])
769 if len(shear_bands) != len(mbobs):
770 tocheck = [i for i in range(len(mbobs)) if i not in shear_bands]
771 assert np.all(
772 res[col][:, tocheck] == procflags.NO_ATTEMPT
773 ), (col, res[col])
775 elif "band_flux" in col:
776 assert np.all(np.isfinite(res[col][0, shear_bands])), (col, res[col])
777 elif col == "shear_bands":
778 assert np.all(
779 res[col]
780 == "".join(f"{sb}" for sb in sorted(shear_bands))
781 ), (col, res[col])
782 elif not col.endswith("flags"):
783 assert np.all(np.isfinite(res[col])), (col, res[col])
784 else:
785 assert np.all(res[col] == 0), (col, res[col])
788def test_fit_mbobs_gauss_coadd():
789 rng = np.random.RandomState(seed=211324)
790 mbobs = make_mbobs_sim(45, 4, wcs_var_scale=0)
791 res = fit_mbobs_gauss(
792 mbobs=mbobs,
793 bmask_flags=0,
794 rng=rng,
795 coadd=False,
796 )
798 rng = np.random.RandomState(seed=211324)
799 mbobs = make_mbobs_sim(45, 4, wcs_var_scale=0)
800 res_coadd = fit_mbobs_gauss(
801 mbobs=mbobs,
802 bmask_flags=0,
803 rng=rng,
804 coadd=True,
805 )
807 shear_bands = list(range(len(mbobs)))
809 for col in res.dtype.names:
810 if col.endswith("band_flux_flags"):
811 assert np.all(res[col][0, shear_bands] == 0), (col, res[col])
812 assert np.all(res_coadd[col][0, shear_bands] == 0), (col, res_coadd[col])
813 if len(shear_bands) != len(mbobs):
814 assert np.any(
815 res[col][0, shear_bands] == procflags.NO_ATTEMPT
816 ), (col, res[col])
818 tocheck = [i for i in range(len(mbobs)) if i not in shear_bands]
819 assert np.all(
820 res_coadd[col][0, tocheck] == procflags.NO_ATTEMPT
821 ), (col, res_coadd[col])
822 elif col.endswith("band_flux"):
823 assert np.all(np.isfinite(res[col][0, shear_bands])), (col, res[col])
824 assert np.all(
825 np.isfinite(res_coadd[col][0, shear_bands])
826 ), (col, res_coadd[col])
827 elif col == "shear_bands":
828 assert np.all(
829 res[col]
830 == "".join(f"{sb}" for sb in sorted(shear_bands))
831 ), (col, res[col])
832 assert np.all(
833 res_coadd[col]
834 == "".join(f"{sb}" for sb in sorted(shear_bands))
835 ), (col, res_coadd[col])
836 elif not col.endswith("flags"):
837 assert np.all(np.isfinite(res[col])), (col, res[col])
838 assert np.all(np.isfinite(res_coadd[col])), (col, res_coadd[col])
839 assert not np.allclose(res[col], res_coadd[col]), (
840 col, res[col], res_coadd[col]
841 )
842 else:
843 assert np.all(res[col] == 0), (col, res[col])
844 assert np.all(res_coadd[col] == 0), (col, res_coadd[col])
847@pytest.mark.parametrize("coadd", [True, False])
848@pytest.mark.parametrize("case", [
849 "missing_band",
850 "too_many_bands",
851 "zero_weights",
852 "edge_hit",
853 "shear_bands",
854 "coadd_flags",
855])
856def test_fit_mbobs_gauss_input_errors(case, coadd):
857 rng = np.random.RandomState(seed=211324)
858 ran_one = False
860 if case == "missing_band":
861 mbobs = make_mbobs_sim(45, 4, wcs_var_scale=0)
862 mbobs[1] = ngmix.ObsList()
863 res = fit_mbobs_gauss(
864 mbobs=mbobs,
865 bmask_flags=0,
866 rng=rng,
867 shear_bands=None,
868 coadd=coadd,
869 )
870 ran_one = True
871 assert res["gauss_flags"] == (procflags.NO_ATTEMPT | procflags.MISSING_BAND), (
872 procflags.get_procflags_str(res["gauss_flags"][0])
873 )
874 elif case == "too_many_bands":
875 mbobs = make_mbobs_sim(45, 4, wcs_var_scale=0)
876 ol = ngmix.ObsList()
877 ol.append(mbobs[1][0])
878 ol.append(mbobs[1][0])
879 mbobs[1] = ol
880 res = fit_mbobs_gauss(
881 mbobs=mbobs,
882 bmask_flags=0,
883 rng=rng,
884 shear_bands=None,
885 coadd=coadd,
886 )
887 ran_one = True
888 assert res["gauss_flags"] == (
889 procflags.NO_ATTEMPT | procflags.INCONSISTENT_BANDS
890 ), (
891 procflags.get_procflags_str(res["gauss_flags"][0])
892 )
893 elif case == "zero_weights":
894 mbobs = make_mbobs_sim(45, 4, wcs_var_scale=0)
895 mbobs[1][0] = ngmix.Observation(
896 image=mbobs[1][0].image,
897 jacobian=mbobs[1][0].jacobian,
898 psf=mbobs[1][0].psf,
899 weight=np.zeros_like(mbobs[1][0].image),
900 ignore_zero_weight=False,
901 bmask=mbobs[1][0].bmask,
902 )
903 res = fit_mbobs_gauss(
904 mbobs=mbobs,
905 bmask_flags=0,
906 rng=rng,
907 shear_bands=None,
908 coadd=coadd,
909 )
910 ran_one = True
911 assert res["gauss_flags"] == (procflags.NO_ATTEMPT | procflags.ZERO_WEIGHTS), (
912 procflags.get_procflags_str(res["gauss_flags"][0])
913 )
914 elif case == "edge_hit":
915 mbobs = make_mbobs_sim(45, 4, wcs_var_scale=0)
916 mbobs[1][0] = ngmix.Observation(
917 image=mbobs[1][0].image,
918 jacobian=mbobs[1][0].jacobian,
919 psf=mbobs[1][0].psf,
920 weight=mbobs[1][0].weight,
921 bmask=mbobs[1][0].bmask + 10,
922 )
923 res = fit_mbobs_gauss(
924 mbobs=mbobs,
925 bmask_flags=10,
926 rng=rng,
927 shear_bands=None,
928 coadd=coadd,
929 )
930 ran_one = True
931 assert res["gauss_flags"] == (procflags.NO_ATTEMPT | procflags.EDGE_HIT), (
932 procflags.get_procflags_str(res["gauss_flags"][0])
933 )
934 elif case == "shear_bands":
935 mbobs = make_mbobs_sim(45, 4, wcs_var_scale=0)
936 res = fit_mbobs_gauss(
937 mbobs=mbobs,
938 bmask_flags=0,
939 rng=rng,
940 shear_bands=[1, 2, 10],
941 coadd=coadd,
942 )
943 ran_one = True
944 assert res["gauss_flags"] == (
945 procflags.NO_ATTEMPT | procflags.INCONSISTENT_BANDS
946 ), (
947 procflags.get_procflags_str(res["gauss_flags"][0])
948 )
949 elif case == "coadd_flags":
950 mbobs = make_mbobs_sim(45, 4, wcs_var_scale=0)
951 mbobs[1][0] = ngmix.Observation(
952 image=mbobs[1][0].image,
953 jacobian=mbobs[1][0].jacobian,
954 psf=mbobs[1][0].psf,
955 weight=mbobs[1][0].weight,
956 bmask=mbobs[1][0].bmask,
957 ormask=mbobs[1][0].ormask,
958 noise=mbobs[1][0].noise,
959 # missing mfrac so coadd fails
960 )
961 res = fit_mbobs_gauss(
962 mbobs=mbobs,
963 bmask_flags=0,
964 rng=rng,
965 shear_bands=None,
966 coadd=coadd,
967 )
968 ran_one = True
969 if coadd:
970 assert res["gauss_flags"] == (
971 procflags.NO_ATTEMPT | procflags.INCONSISTENT_BANDS
972 ), (
973 procflags.get_procflags_str(res["gauss_flags"][0])
974 )
975 else:
976 assert res["gauss_flags"] == 0, (
977 procflags.get_procflags_str(res["gauss_flags"][0])
978 )
979 else:
980 assert False, f"case {case} not found!"
982 assert ran_one, "No tests ran!"