Coverage for python / lsst / source / injection / inject_base.py: 17%

214 statements  

« prev     ^ index     » next       coverage.py v7.13.5, created at 2026-04-21 10:57 +0000

1# This file is part of source_injection. 

2# 

3# Developed for the LSST Data Management System. 

4# This product includes software developed by the LSST Project 

5# (https://www.lsst.org). 

6# See the COPYRIGHT file at the top-level directory of this distribution 

7# for details of code ownership. 

8# 

9# This program is free software: you can redistribute it and/or modify 

10# it under the terms of the GNU General Public License as published by 

11# the Free Software Foundation, either version 3 of the License, or 

12# (at your option) any later version. 

13# 

14# This program is distributed in the hope that it will be useful, 

15# but WITHOUT ANY WARRANTY; without even the implied warranty of 

16# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the 

17# GNU General Public License for more details. 

18# 

19# You should have received a copy of the GNU General Public License 

20# along with this program. If not, see <https://www.gnu.org/licenses/>. 

21 

22from __future__ import annotations 

23 

24__all__ = ["_ALLOWED_SOURCE_TYPES", "BaseInjectConnections", "BaseInjectConfig", "BaseInjectTask"] 

25 

26from typing import cast 

27 

28import galsim 

29import numpy as np 

30import numpy.ma as ma 

31from astropy import units 

32from astropy.table import Table, hstack, vstack 

33from astropy.units import Quantity, UnitConversionError 

34 

35from lsst.afw.image.exposure.exposureUtils import bbox_contains_sky_coords 

36from lsst.geom import Point2D 

37from lsst.pex.config import ChoiceField, Field, ListField 

38from lsst.pipe.base import PipelineTask, PipelineTaskConfig, PipelineTaskConnections, Struct 

39from lsst.pipe.base.connectionTypes import PrerequisiteInput 

40 

41from .inject_engine import generate_galsim_objects, inject_galsim_objects_into_exposure 

42 

43_ALLOWED_SOURCE_TYPES = [ 

44 "Gaussian", 

45 "Box", 

46 "TopHat", 

47 "DeltaFunction", 

48 "Airy", 

49 "Moffat", 

50 "Kolmogorov", 

51 "VonKarman", 

52 "Exponential", 

53 "DeVaucouleurs", 

54 "Sersic", 

55 "InclinedExponential", 

56 "InclinedSersic", 

57 "Spergel", 

58 "RandomKnots", 

59 "Star", 

60 "Trail", 

61 "Stamp", 

62] 

63 

64 

65class BaseInjectConnections( 

66 PipelineTaskConnections, 

67 dimensions=("instrument",), 

68 defaultTemplates={ 

69 "injection_prefix": "injection_", 

70 "injected_prefix": "injected_", 

71 }, 

72): 

73 """Base connections for source injection tasks.""" 

74 

75 injection_catalogs = PrerequisiteInput( 

76 doc="Set of catalogs of sources to draw inputs from.", 

77 name="{injection_prefix}catalog", 

78 dimensions=("htm7", "band"), 

79 storageClass="ArrowAstropy", 

80 minimum=0, 

81 multiple=True, 

82 ) 

83 

84 

85class BaseInjectConfig(PipelineTaskConfig, pipelineConnections=BaseInjectConnections): 

86 """Base configuration for source injection tasks.""" 

87 

88 # Catalog manipulation options. 

89 process_all_data_ids = Field[bool]( 

90 doc="If True, all input data IDs will be processed, even those where no synthetic sources were " 

91 "identified for injection. In such an eventuality this returns a clone of the input image, renamed " 

92 "to the *output_exposure* connection name and with an empty *mask_plane_name* mask plane attached.", 

93 default=False, 

94 ) 

95 trim_padding = Field[int]( 

96 doc="Size of the pixel padding surrounding the image. Only those synthetic sources with a centroid " 

97 "falling within the ``image + trim_padding`` region will be considered for source injection.", 

98 default=100, 

99 optional=True, 

100 ) 

101 selection = Field[str]( 

102 doc="A string that can be evaluated as a boolean expression to select rows in the input injection " 

103 "catalog. To make use of this configuration option, the internal object name ``injection_catalog`` " 

104 "must be used. For example, to select all sources with a magnitude in the range 20.0 < mag < 25.0, " 

105 "set ``selection=\"(injection_catalog['mag'] > 20.0) & (injection_catalog['mag'] < 25.0)\"``. " 

106 "The ``{visit}`` field will be substituted for the current visit ID of the exposure being processed. " 

107 "For example, to select only visits that match a user-supplied visit column in the input injection " 

108 "catalog, set ``selection=\"np.isin(injection_catalog['visit'], {visit})\"``.", 

109 optional=True, 

110 ) 

111 

112 # General configuration options. 

113 mask_plane_name = Field[str]( 

114 doc="Name assigned to the injected mask plane which is attached to the output exposure.", 

115 default="INJECTED", 

116 ) 

117 calib_flux_radius = Field[float]( 

118 doc="Aperture radius (in pixels) that was used to define the calibration for this image+catalog. " 

119 "This will be used to produce the correct instrumental fluxes within the radius. " 

120 "This value should match that of the field defined in ``slot_CalibFlux_instFlux``.", 

121 default=12.0, 

122 ) 

123 fits_alignment = ChoiceField[str]( # type: ignore 

124 doc="How should injections from FITS files be aligned?", 

125 dtype=str, 

126 allowed={ 

127 "wcs": ( 

128 "Input image will be transformed such that the local WCS in the FITS header matches the " 

129 "local WCS in the target image. I.e., North, East, and angular distances in the input image " 

130 "will match North, East, and angular distances in the target image." 

131 ), 

132 "pixel": ( 

133 "Input image will **not** be transformed. Up, right, and pixel distances in the input image " 

134 "will match up, right and pixel distances in the target image." 

135 ), 

136 }, 

137 default="pixel", 

138 ) 

139 stamp_prefix = Field[str]( 

140 doc="String to prefix to the entries in the *col_stamp* column, for example, a directory path.", 

141 default="", 

142 ) 

143 inject_variance = Field[bool]( 

144 doc="Whether, when injecting flux into the image plane, to inject a corresponding amount of variance " 

145 "into the variance plane.", 

146 default=True, 

147 ) 

148 add_noise = Field[bool]( 

149 doc="Whether to randomly vary the injected flux in each pixel by an amount consistent with " 

150 "the injected variance.", 

151 default=True, 

152 ) 

153 noise_seed = Field[int]( 

154 doc="Initial seed for random noise generation. This value increments by 1 for each injected " 

155 "object, so each object has an independent noise realization.", 

156 default=0, 

157 ) 

158 bad_mask_names = ListField[str]( 

159 doc="List of mask plane names indicating pixels to ignore when fitting flux vs variance in " 

160 "preparation for variance plane modification.", 

161 default=["BAD", "CR", "CROSSTALK", "INTRP", "NO_DATA", "SAT", "SUSPECT", "UNMASKEDNAN"], 

162 ) 

163 

164 # Custom column names. 

165 col_ra = Field[str]( 

166 doc="Column name for right ascension (in degrees).", 

167 default="ra", 

168 ) 

169 col_dec = Field[str]( 

170 doc="Column name for declination (in degrees).", 

171 default="dec", 

172 ) 

173 col_source_type = Field[str]( 

174 doc="Column name for the source type used in the input catalog. Must match one of the surface " 

175 "brightness profiles defined by GalSim.", 

176 default="source_type", 

177 ) 

178 col_mag = Field[str]( 

179 doc="Column name for magnitude.", 

180 default="mag", 

181 ) 

182 col_stamp = Field[str]( 

183 doc="Column name to identify FITS file postage stamps for direct injection. The strings in this " 

184 "column will be prefixed with a string given in *stamp_prefix*, to assist in providing the full " 

185 "path to a FITS file.", 

186 default="stamp", 

187 ) 

188 col_draw_size = Field[str]( 

189 doc="Column name providing pixel size of the region into which the source profile will be drawn. If " 

190 "this column is not provided as an input, the GalSim method ``getGoodImageSize`` will be used " 

191 "instead.", 

192 default="draw_size", 

193 ) 

194 col_trail_length = Field[str]( 

195 doc="Column name for specifying a satellite trail length (in pixels).", 

196 default="trail_length", 

197 ) 

198 

199 def setDefaults(self): 

200 super().setDefaults() 

201 

202 

203class BaseInjectTask(PipelineTask): 

204 """Base class for injecting sources into images.""" 

205 

206 _DefaultName = "baseInjectTask" 

207 ConfigClass = BaseInjectConfig 

208 

209 def run(self, injection_catalogs, input_exposure, psf, photo_calib, wcs): 

210 """Inject sources into an image. 

211 

212 Parameters 

213 ---------- 

214 injection_catalogs : `list` [`astropy.table.Table`] 

215 Tract level injection catalogs that potentially cover the named 

216 input exposure. 

217 input_exposure : `lsst.afw.image.ExposureF` 

218 The exposure sources will be injected into. 

219 psf: `lsst.meas.algorithms.ImagePsf` 

220 PSF model. 

221 photo_calib : `lsst.afw.image.PhotoCalib` 

222 Photometric calibration used to calibrate injected sources. 

223 wcs : `lsst.afw.geom.SkyWcs` 

224 WCS used to calibrate injected sources. 

225 

226 Returns 

227 ------- 

228 output_struct : `lsst.pipe.base.Struct` 

229 contains : output_exposure : `lsst.afw.image.ExposureF` 

230 output_catalog : `lsst.afw.table.SourceCatalog` 

231 """ 

232 self.config = cast(BaseInjectConfig, self.config) 

233 

234 # Attach potential externally calibrated datasets to input_exposure. 

235 # Keep originals so we can reset at the end. 

236 original_psf = input_exposure.getPsf() 

237 original_photo_calib = input_exposure.getPhotoCalib() 

238 original_wcs = input_exposure.getWcs() 

239 input_exposure.setPsf(psf) 

240 input_exposure.setPhotoCalib(photo_calib) 

241 input_exposure.setWcs(wcs) 

242 

243 # Make empty table if none supplied to support process_all_data_ids. 

244 if len(injection_catalogs) == 0: 

245 if self.config.process_all_data_ids: 

246 injection_catalogs = [Table(names=["ra", "dec", "source_type"])] 

247 else: 

248 corners = [Point2D(x) for x in input_exposure.getBBox().getCorners()] 

249 coords = wcs.pixelToSky(corners) 

250 ras = [x.getRa().asDegrees() for x in coords] 

251 decs = [x.getDec().asDegrees() for x in coords] 

252 raise RuntimeError( 

253 "No injection sources overlap the region bounded by " 

254 f"{np.min(ras):.2f} <= RA < {np.max(ras):.2f}, " 

255 f"{np.min(decs):.2f} <= Dec < {np.max(decs):.2f} (degrees). " 

256 "Check injection catalog coverage." 

257 ) 

258 

259 # Consolidate injection catalogs and compose main injection catalog. 

260 injection_catalog = self._compose_injection_catalog(injection_catalogs) 

261 

262 # Mapping between standard column names and configured names/units. 

263 column_mapping = { 

264 "ra": (self.config.col_ra, units.deg), 

265 "dec": (self.config.col_dec, units.deg), 

266 "source_type": (self.config.col_source_type, None), 

267 "mag": (self.config.col_mag, units.mag), 

268 "stamp": (self.config.col_stamp, None), 

269 "draw_size": (self.config.col_draw_size, units.pix), 

270 "trail_length": (self.config.col_trail_length, units.pix), 

271 } 

272 

273 # Standardize injection catalog column names and units. 

274 injection_catalog = self._standardize_columns( 

275 injection_catalog, 

276 column_mapping, 

277 input_exposure.getWcs().getPixelScale(input_exposure.getBBox().getCenter()).asArcseconds(), 

278 ) 

279 

280 # Clean the injection catalog of sources which are not injectable. 

281 injection_catalog = self._clean_sources(injection_catalog, input_exposure) 

282 

283 # Injection binary flag lookup dictionary. 

284 binary_flags = { 

285 "MAG_BAD": 0, 

286 "TYPE_UNKNOWN": 1, 

287 "SERSIC_EXTREME": 2, 

288 "NO_OVERLAP": 3, 

289 "FFT_SIZE_ERROR": 4, 

290 "PSF_COMPUTE_ERROR": 5, 

291 } 

292 

293 # Check that sources in the injection catalog are able to be injected. 

294 injection_catalog = self._check_sources(injection_catalog, binary_flags) 

295 

296 # Inject sources into input_exposure. 

297 good_injections: list[bool] = injection_catalog["injection_flag"] == 0 

298 good_injections_index = [i for i, val in enumerate(good_injections) if val] 

299 num_injection_sources = np.sum(good_injections) 

300 if num_injection_sources > 0: 

301 object_generator = generate_galsim_objects( 

302 injection_catalog=injection_catalog[good_injections], 

303 photo_calib=photo_calib, 

304 wcs=wcs, 

305 fits_alignment=self.config.fits_alignment, 

306 stamp_prefix=self.config.stamp_prefix, 

307 logger=self.log, 

308 ) 

309 ( 

310 draw_sizes, 

311 common_bounds, 

312 fft_size_errors, 

313 psf_compute_errors, 

314 ) = inject_galsim_objects_into_exposure( 

315 input_exposure, 

316 object_generator, 

317 mask_plane_name=self.config.mask_plane_name, 

318 calib_flux_radius=self.config.calib_flux_radius, 

319 draw_size_max=10000, # TODO: replace draw_size logic with GS logic. 

320 inject_variance=self.config.inject_variance, 

321 add_noise=self.config.add_noise, 

322 noise_seed=self.config.noise_seed, 

323 bad_mask_names=list(self.config.bad_mask_names), 

324 logger=self.log, 

325 ) 

326 # Add inject_galsim_objects_into_exposure outputs into output cat. 

327 common_areas = [x.area() if x is not None else None for x in common_bounds] 

328 for i, (draw_size, common_area, fft_size_error, psf_compute_error) in enumerate( 

329 zip(draw_sizes, common_areas, fft_size_errors, psf_compute_errors) 

330 ): 

331 injection_catalog["injection_draw_size"][good_injections_index[i]] = draw_size 

332 if common_area == 0: 

333 injection_catalog["injection_flag"][good_injections_index[i]] += ( 

334 2 ** binary_flags["NO_OVERLAP"] 

335 ) 

336 if fft_size_error: 

337 injection_catalog["injection_flag"][good_injections_index[i]] += ( 

338 2 ** binary_flags["FFT_SIZE_ERROR"] 

339 ) 

340 if psf_compute_error: 

341 injection_catalog["injection_flag"][good_injections_index[i]] += ( 

342 2 ** binary_flags["PSF_COMPUTE_ERROR"] 

343 ) 

344 num_injected_sources = np.sum(injection_catalog["injection_flag"] == 0) 

345 num_skipped_sources = np.sum(injection_catalog["injection_flag"] != 0) 

346 grammar1 = "source" if num_injection_sources == 1 else "sources" 

347 grammar2 = "source" if num_skipped_sources == 1 else "sources" 

348 

349 injection_flags = np.array(injection_catalog["injection_flag"]) 

350 num_injection_flags = [np.sum((injection_flags & 2**x) > 0) for x in binary_flags.values()] 

351 if np.sum(num_injection_flags) > 0: 

352 injection_flag_report = ": " + ", ".join( 

353 [f"{x}({y})" for x, y in zip(binary_flags.keys(), num_injection_flags) if y > 0] 

354 ) 

355 else: 

356 injection_flag_report = "" 

357 self.log.info( 

358 "Injected %d of %d potential %s. %d %s flagged and skipped%s.", 

359 num_injected_sources, 

360 num_injection_sources, 

361 grammar1, 

362 num_skipped_sources, 

363 grammar2, 

364 injection_flag_report, 

365 ) 

366 elif num_injection_sources == 0 and self.config.process_all_data_ids: 

367 self.log.warning("No sources to be injected for this DatasetRef; processing anyway.") 

368 input_exposure.mask.addMaskPlane(self.config.mask_plane_name) 

369 mask_plane_core_name = self.config.mask_plane_name + "_CORE" 

370 input_exposure.mask.addMaskPlane(mask_plane_core_name) 

371 self.log.info( 

372 "Adding %s and %s mask planes to the exposure.", 

373 self.config.mask_plane_name, 

374 mask_plane_core_name, 

375 ) 

376 else: 

377 raise RuntimeError( 

378 "No sources to be injected for this DatasetRef, and process_all_data_ids is False." 

379 ) 

380 

381 # Restore original input_exposure calibrated data. 

382 input_exposure.setPsf(original_psf) 

383 input_exposure.setPhotoCalib(original_photo_calib) 

384 input_exposure.setWcs(original_wcs) 

385 

386 # Add injection provenance and injection flags metadata. 

387 metadata = input_exposure.getMetadata() 

388 input_dataset_type = self.config.connections.input_exposure.format(**self.config.connections.toDict()) 

389 metadata.set("INJECTED", input_dataset_type, "Initial source injection dataset type") 

390 for flag, value in sorted(binary_flags.items(), key=lambda item: item[1]): 

391 injection_catalog.meta[flag] = value 

392 

393 output_struct = Struct(output_exposure=input_exposure, output_catalog=injection_catalog) 

394 return output_struct 

395 

396 def _compose_injection_catalog(self, injection_catalogs): 

397 """Consolidate injection catalogs and compose main injection catalog. 

398 

399 If multiple injection catalogs are input, all catalogs are 

400 concatenated together. 

401 

402 A running injection_id, specific to this dataset ref, is assigned to 

403 each source in the output injection catalog if not provided. 

404 

405 Parameters 

406 ---------- 

407 injection_catalogs : `list` [`astropy.table.Table`] 

408 Set of synthetic source catalogs to concatenate. 

409 

410 Returns 

411 ------- 

412 injection_catalog : `astropy.table.Table` 

413 Catalog of sources to be injected. 

414 """ 

415 self.config = cast(BaseInjectConfig, self.config) 

416 

417 # Generate injection IDs (if not provided) and injection flag column. 

418 injection_data = vstack(injection_catalogs, metadata_conflicts="silent") 

419 if "injection_id" in injection_data.columns: 

420 injection_id = injection_data["injection_id"] 

421 injection_data.remove_column("injection_id") 

422 else: 

423 injection_id = range(len(injection_data)) 

424 injection_header = Table( 

425 { 

426 "injection_id": injection_id, 

427 "injection_flag": np.zeros(len(injection_data), dtype=int), 

428 "injection_draw_size": np.zeros(len(injection_data), dtype=int), 

429 } 

430 ) 

431 

432 # Construct final injection catalog. 

433 injection_catalog = hstack([injection_header, injection_data]) 

434 injection_catalog["source_type"] = injection_catalog["source_type"].astype(str) 

435 

436 # Log and return. 

437 num_injection_catalogs = np.sum([len(table) > 0 for table in injection_catalogs]) 

438 grammar1 = "source" if len(injection_catalog) == 1 else "sources" 

439 grammar2 = "trixel" if num_injection_catalogs == 1 else "trixels" 

440 self.log.info( 

441 "Retrieved %d injection %s from %d HTM %s.", 

442 len(injection_catalog), 

443 grammar1, 

444 num_injection_catalogs, 

445 grammar2, 

446 ) 

447 return injection_catalog 

448 

449 def _standardize_columns(self, injection_catalog, column_mapping, pixel_scale): 

450 """Standardize injection catalog column names and units. 

451 

452 Use config variables to standardize the expected columns and column 

453 names in the input injection catalog. This method replaces all core 

454 column names in the config with hard-coded internal names. 

455 

456 Only a core group of column names are standardized; additional column 

457 names will not be modified. If certain parameters are needed (i.e., 

458 by GalSim), these columns must be given exactly as required in the 

459 appropriate units. Refer to the configuration documentation for more 

460 details. 

461 

462 Parameters 

463 ---------- 

464 injection_catalog : `astropy.table.Table` 

465 A catalog of sources to be injected. 

466 column_mapping : `dict` [`str`, `tuple` [`str`, `astropy.units.Unit`]] 

467 A dictionary mapping standard column names to the configured column 

468 names and units. 

469 pixel_scale : `float` 

470 Pixel scale of the exposure in arcseconds per pixel. 

471 

472 Returns 

473 ------- 

474 injection_catalog : `astropy.table.Table` 

475 The standardized catalog of sources to be injected. 

476 """ 

477 self.config = cast(BaseInjectConfig, self.config) 

478 

479 pixel_scale_equivalency = units.pixel_scale( 

480 Quantity(pixel_scale, units.arcsec / units.pix) # type: ignore 

481 ) 

482 for standard_col, (configured_col, unit) in column_mapping.items(): 

483 # Rename columns if necessary. 

484 if configured_col in injection_catalog.colnames: 

485 injection_catalog.rename_column(configured_col, standard_col) 

486 # Attempt to convert to our desired units, then remove units. 

487 if standard_col in injection_catalog.columns and unit: 

488 try: 

489 injection_catalog[standard_col] = ( 

490 injection_catalog[standard_col].to(unit, pixel_scale_equivalency).value 

491 ) 

492 except UnitConversionError: 

493 pass 

494 return Table(injection_catalog) 

495 

496 def _clean_sources(self, injection_catalog, input_exposure): 

497 """Clean the injection catalog of sources which are not injectable. 

498 

499 This method will remove sources which are not injectable for a variety 

500 of reasons, namely: sources which fall outside the padded exposure 

501 bounding box or sources not selected by virtue of their evaluated 

502 selection criteria. 

503 

504 If the input injection catalog contains x/y inputs but does not contain 

505 RA/Dec inputs, WCS information will be used to generate RA/Dec sky 

506 coordinate information and appended to the injection catalog. 

507 

508 Parameters 

509 ---------- 

510 injection_catalog : `astropy.table.Table` 

511 The catalog of sources to be injected. 

512 input_exposure : `lsst.afw.image.ExposureF` 

513 The exposure to inject sources into. 

514 

515 Returns 

516 ------- 

517 injection_catalog : `astropy.table.Table` 

518 Updated injection catalog containing *x* and *y* pixel coordinates, 

519 and cleaned to only include injection sources which fall within the 

520 bounding box of the input exposure dilated by *trim_padding*. 

521 """ 

522 self.config = cast(BaseInjectConfig, self.config) 

523 

524 # Exit early if there are no sources to inject. 

525 if len(injection_catalog) == 0: 

526 self.log.info("Catalog cleaning not applied to empty injection catalog.") 

527 return injection_catalog 

528 

529 sources_to_keep = np.ones(len(injection_catalog), dtype=bool) 

530 

531 # Determine centroids and remove sources outside the padded bbox. 

532 wcs = input_exposure.getWcs() 

533 has_sky = {"ra", "dec"} <= set(injection_catalog.columns) 

534 has_pixel = {"x", "y"} <= set(injection_catalog.columns) 

535 # Input catalog must contain either RA/Dec OR x/y. 

536 # If only x/y given, RA/Dec will be calculated. 

537 if not has_sky and has_pixel: 

538 begin_x, begin_y = input_exposure.getBBox().getBegin() 

539 ras, decs = wcs.pixelToSkyArray( 

540 begin_x + injection_catalog["x"].astype(float), 

541 begin_y + injection_catalog["y"].astype(float), 

542 degrees=True, 

543 ) 

544 injection_catalog["ra"] = ras 

545 injection_catalog["dec"] = decs 

546 injection_catalog["x"] += begin_x 

547 injection_catalog["y"] += begin_y 

548 has_sky = True 

549 elif not has_sky and not has_pixel: 

550 self.log.warning("No spatial coordinates found in injection catalog; cannot inject any sources!") 

551 if has_sky: 

552 bbox = input_exposure.getBBox() 

553 if self.config.trim_padding: 

554 bbox.grow(int(self.config.trim_padding)) 

555 is_contained = bbox_contains_sky_coords( 

556 bbox, wcs, injection_catalog["ra"] * units.deg, injection_catalog["dec"] * units.deg 

557 ) 

558 sources_to_keep &= is_contained 

559 if (num_not_contained := np.sum(~is_contained)) > 0: 

560 grammar = ("source", "a centroid") if num_not_contained == 1 else ("sources", "centroids") 

561 self.log.info( 

562 "Identified %d injection %s with %s outside the padded image bounding box.", 

563 num_not_contained, 

564 grammar[0], 

565 grammar[1], 

566 ) 

567 

568 # Remove sources by boolean selection flag. 

569 if self.config.selection: 

570 visit = input_exposure.getInfo().getVisitInfo().getId() 

571 selected = eval(self.config.selection.format(visit=visit)) 

572 sources_to_keep &= selected 

573 if (num_not_selected := np.sum(~selected)) >= 0: 

574 grammar = ["source", "was"] if num_not_selected == 1 else ["sources", "were"] 

575 self.log.warning( 

576 "Identified %d injection %s that %s not selected.", 

577 num_not_selected, 

578 grammar[0], 

579 grammar[1], 

580 ) 

581 

582 # Print final cleaning report and return. 

583 num_cleaned_total = np.sum(~sources_to_keep) 

584 grammar = "source" if len(sources_to_keep) == 1 else "sources" 

585 self.log.info( 

586 "Catalog cleaning removed %d of %d %s; %d remaining for catalog checking.", 

587 num_cleaned_total, 

588 len(sources_to_keep), 

589 grammar, 

590 np.sum(sources_to_keep), 

591 ) 

592 injection_catalog = injection_catalog[sources_to_keep] 

593 return injection_catalog 

594 

595 def _check_sources(self, injection_catalog, binary_flags): 

596 """Check that sources in the injection catalog are able to be injected. 

597 

598 This method will check that sources in the injection catalog are able 

599 to be injected, and will flag them if not. Checks will be made on a 

600 number of parameters, including magnitude, source type and Sérsic index 

601 (where relevant). 

602 

603 Legacy profile types will be renamed to their standardized GalSim 

604 equivalents; any source profile types that are not GalSim classes will 

605 be flagged. 

606 

607 Note: Unlike the cleaning method, no sources are actually removed here. 

608 Instead, a binary flag is set in the *injection_flag* column for each 

609 source. Only unflagged sources will be generated for source injection. 

610 

611 Parameters 

612 ---------- 

613 injection_catalog : `astropy.table.Table` 

614 Catalog of sources to be injected. 

615 binary_flags : `dict` [`str`, `int`] 

616 Dictionary of binary flags to be used in the injection_flag column. 

617 

618 Returns 

619 ------- 

620 injection_catalog : `astropy.table.Table` 

621 The cleaned catalog of sources to be injected. 

622 """ 

623 self.config = cast(BaseInjectConfig, self.config) 

624 

625 # Exit early if there are no sources to inject. 

626 if len(injection_catalog) == 0: 

627 self.log.info("Catalog checking not applied to empty injection catalog.") 

628 return injection_catalog 

629 

630 # Flag erroneous magnitude values (missing mag data or NaN mag values). 

631 if "mag" not in injection_catalog.columns: 

632 # Check injection_catalog has a mag column. 

633 self.log.warning("No magnitude data found in injection catalog; cannot inject any sources!") 

634 injection_catalog["injection_flag"] += 2 ** binary_flags["MAG_BAD"] 

635 else: 

636 # Check that all input mag values are finite. 

637 mag_array = np.isfinite(ma.array(injection_catalog["mag"])) 

638 bad_mag = ~(mag_array.data * ~mag_array.mask) 

639 if (num_bad_mag := np.sum(bad_mag)) > 0: 

640 grammar = "source" if num_bad_mag == 1 else "sources" 

641 self.log.warning( 

642 "Flagging %d injection %s that do not have a finite magnitude.", num_bad_mag, grammar 

643 ) 

644 injection_catalog["injection_flag"][bad_mag] += 2 ** binary_flags["MAG_BAD"] 

645 

646 # Replace legacy source types with standardized profile names. 

647 injection_catalog["source_type"] = injection_catalog["source_type"].astype("O") 

648 replace_dict = {"Star": "DeltaFunction"} 

649 for legacy_type, standard_type in replace_dict.items(): 

650 legacy_matches = injection_catalog["source_type"] == legacy_type 

651 if np.any(legacy_matches): 

652 injection_catalog["source_type"][legacy_matches] = standard_type 

653 injection_catalog["source_type"] = injection_catalog["source_type"].astype(str) 

654 

655 # Flag source types not supported by GalSim. 

656 input_source_types = set(injection_catalog["source_type"]) 

657 for input_source_type in input_source_types: 

658 if input_source_type not in _ALLOWED_SOURCE_TYPES: 

659 unknown_source_types = injection_catalog["source_type"] == input_source_type 

660 grammar = "source" if np.sum(unknown_source_types) == 1 else "sources" 

661 self.log.warning( 

662 "Flagging %d injection %s with an unsupported source type: %s.", 

663 np.sum(unknown_source_types), 

664 grammar, 

665 input_source_type, 

666 ) 

667 injection_catalog["injection_flag"][unknown_source_types] += 2 ** binary_flags["TYPE_UNKNOWN"] 

668 

669 # Flag extreme Sersic index sources. 

670 if "n" in injection_catalog.columns: 

671 min_n = galsim.Sersic._minimum_n 

672 max_n = galsim.Sersic._maximum_n 

673 n_vals = injection_catalog["n"] 

674 extreme_sersics = (n_vals <= min_n) | (n_vals >= max_n) 

675 if (num_extreme_sersics := np.sum(extreme_sersics)) > 0: 

676 grammar = "source" if num_extreme_sersics == 1 else "sources" 

677 self.log.warning( 

678 "Flagging %d injection %s with a Sersic index outside the range %.1f <= n <= %.1f.", 

679 num_extreme_sersics, 

680 grammar, 

681 min_n, 

682 max_n, 

683 ) 

684 injection_catalog["injection_flag"][extreme_sersics] += 2 ** binary_flags["SERSIC_EXTREME"] 

685 

686 # Print final cleaning report. 

687 num_flagged_total = np.sum(injection_catalog["injection_flag"] != 0) 

688 grammar = "source" if len(injection_catalog) == 1 else "sources" 

689 self.log.info( 

690 "Catalog checking flagged %d of %d %s; %d remaining for source generation.", 

691 num_flagged_total, 

692 len(injection_catalog), 

693 grammar, 

694 np.sum(injection_catalog["injection_flag"] == 0), 

695 ) 

696 return injection_catalog