Coverage for python / lsst / source / injection / utils / _generate_injection_catalog.py: 14%

68 statements  

« prev     ^ index     » next       coverage.py v7.13.5, created at 2026-04-21 10:57 +0000

1# This file is part of source_injection. 

2# 

3# Developed for the LSST Data Management System. 

4# This product includes software developed by the LSST Project 

5# (https://www.lsst.org). 

6# See the COPYRIGHT file at the top-level directory of this distribution 

7# for details of code ownership. 

8# 

9# This program is free software: you can redistribute it and/or modify 

10# it under the terms of the GNU General Public License as published by 

11# the Free Software Foundation, either version 3 of the License, or 

12# (at your option) any later version. 

13# 

14# This program is distributed in the hope that it will be useful, 

15# but WITHOUT ANY WARRANTY; without even the implied warranty of 

16# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the 

17# GNU General Public License for more details. 

18# 

19# You should have received a copy of the GNU General Public License 

20# along with this program. If not, see <https://www.gnu.org/licenses/>. 

21 

22from __future__ import annotations 

23 

24__all__ = ["generate_injection_catalog"] 

25 

26import hashlib 

27import itertools 

28import logging 

29from collections.abc import Sequence 

30from typing import Any 

31 

32import numpy as np 

33from astropy.table import Table, hstack 

34from scipy.stats import qmc 

35 

36from lsst.afw.geom import SkyWcs 

37 

38 

39def generate_injection_catalog( 

40 ra_lim: Sequence[float], 

41 dec_lim: Sequence[float], 

42 mag_lim: Sequence[float] | None = None, 

43 wcs: SkyWcs = None, 

44 number: int = 1, 

45 density: int | None = None, 

46 seed: Any = None, 

47 log_level: int = logging.INFO, 

48 **kwargs: Any, 

49) -> Table: 

50 """Generate a synthetic source injection catalog. 

51 

52 This function generates a synthetic source injection catalog from user 

53 supplied input parameters. The catalog is returned as an astropy Table. 

54 

55 On-sky source positions are generated using the quasi-random Halton 

56 sequence. Optional magnitudes may also be generated using the same 

57 sequence. By default, the Halton sequence is seeded using the product of 

58 the right ascension and declination limit ranges. This ensures that the 

59 same sequence is always generated for the same limits. This seed may be 

60 overridden by specifying the ``seed`` parameter. 

61 

62 A unique injection ID is generated for each source. The injection ID 

63 encodes two pieces of information: the unique source identification number 

64 and the version number of the source as specified by the ``number`` 

65 parameter. To achieve this, the unique source ID number is multiplied by 

66 `10**n` such that the sum of the multiplied source ID number with the 

67 unique repeated version number will always be unique. For example, an 

68 injection catalog with `number = 3` versions of each source will have 

69 injection IDs: 0, 1, 2, 10, 11, 12, 20, 21, 22, etc. If `number = 20`, then 

70 the injection IDs will be: 0, 1, 2, ..., 17, 18, 19, 100, 101, 102, etc. 

71 If `number = 1` (default) then the injection ID will be a simple sequential 

72 list of integers. 

73 

74 Parameters 

75 ---------- 

76 ra_lim : `Sequence` [`float`] 

77 The right ascension limits of the catalog in degrees. 

78 dec_lim : `Sequence` [`float`] 

79 The declination limits of the catalog in degrees. 

80 mag_lim : `Sequence` [`float`], optional 

81 The magnitude limits of the catalog in magnitudes. 

82 wcs : `lsst.afw.geom.SkyWcs`, optional 

83 The WCS associated with these data. If not given or ``None`` (default), 

84 the catalog will be generated using Cartesian geometry. 

85 number : `int`, optional 

86 The number of times to generate each unique combination of input 

87 parameters. The default is 1 (i.e., no repeats). This will be ignored 

88 if ``density`` is specified. 

89 density : `int` | None, optional 

90 The desired source density in sources per square degree. If given, the 

91 ``number`` parameter will be ignored. Instead, the number of unique 

92 parameter combination generations will be calculated to achieve the 

93 desired density. The default is `None` (i.e., no density calculation). 

94 seed : `Any`, optional 

95 The seed to use for the Halton sequence. If not given or ``None`` 

96 (default), the seed will be set using the product of the right 

97 ascension and declination limit ranges. 

98 log_level : `int`, optional 

99 The log level to use for logging. 

100 **kwargs : `Any` 

101 The input parameters used to generate the catalog. Each parameter key 

102 will be used as a column name in the catalog. The values are the unique 

103 values for that parameter. The output catalog will contain a row for 

104 each unique combination of input parameters and be generated the number 

105 of times specified by ``number``. 

106 

107 Returns 

108 ------- 

109 table : `astropy.table.Table` 

110 The fully populated synthetic source injection catalog. The catalog 

111 will contain an automatically generated ``injection_id`` column that 

112 is unique for each source. The injection ID encodes two pieces of 

113 information: the unique source identification number and the repeated 

114 version number of the source as defined by the ``number`` parameter. 

115 """ 

116 # Instantiate logger. 

117 logger = logging.getLogger(__name__) 

118 logger.setLevel(log_level) 

119 

120 # Parse optional keyword input parameters. 

121 values: list[Any] = [np.atleast_1d(x) for x in kwargs.values()] 

122 

123 # Determine the BBox limits and pixel scale. 

124 if wcs: 

125 sky_corners = list(itertools.product(ra_lim, dec_lim)) 

126 ra_corners, dec_corners = np.array(sky_corners).T 

127 x_corners, y_corners = wcs.skyToPixelArray(ra_corners, dec_corners, degrees=True) 

128 xlim: Any = np.percentile(x_corners, [0, 100]) 

129 ylim: Any = np.percentile(y_corners, [0, 100]) 

130 else: 

131 xlim, ylim = ra_lim, dec_lim 

132 

133 # Automatically calculate the number of generations if density is given. 

134 if density: 

135 dec_lim_rad = np.deg2rad(dec_lim) 

136 area = ((180 / np.pi) * np.diff(ra_lim) * (np.sin(dec_lim_rad[1]) - np.sin(dec_lim_rad[0])))[0] 

137 rows = list(itertools.product(*values)) 

138 native_density = len(rows) / area 

139 number = np.round(density / native_density).astype(int) 

140 if number > 0: 

141 logger.info( 

142 "Setting number of generations to %s, equivalent to %.1f sources per square degree.", 

143 number, 

144 number * native_density, 

145 ) 

146 else: 

147 logger.warning("Requested source density would require number < 1; setting number = 1.") 

148 number = 1 

149 

150 # Generate the fully expanded parameter table. 

151 values.append(range(number)) 

152 keys = list(kwargs.keys()) 

153 keys.append("version_id") 

154 param_table = Table(rows=list(itertools.product(*values)), names=keys) 

155 

156 # Generate on-sky coordinate pairs. 

157 if not seed: 

158 seed = str(np.diff(ra_lim)[0] * np.diff(dec_lim)[0]) 

159 # Random seed is the lower 32 bits of the hashed name. 

160 # We use hashlib.sha256 for guaranteed repeatability. 

161 hex_hash = hashlib.sha256(seed.encode("UTF-8")).hexdigest() 

162 hashed_seed = int("0x" + hex_hash, 0) & 0xFFFFFFFF 

163 sampler = qmc.Halton(d=2, seed=hashed_seed) 

164 sample = sampler.random(n=len(param_table)) 

165 # Flip RA values if no WCS given. 

166 if not wcs: 

167 sample[:, 0] = 1 - sample[:, 0] 

168 xy_coords = Table(qmc.scale(sample, [xlim[0], ylim[0]], [xlim[1], ylim[1]]), names=("x", "y")) 

169 if wcs: 

170 ra_coords, dec_coords = wcs.pixelToSkyArray(xy_coords["x"], xy_coords["y"], degrees=True) 

171 sky_coords = Table([ra_coords, dec_coords], names=("ra", "dec")) 

172 else: 

173 sky_coords = Table(xy_coords, names=("ra", "dec")) 

174 # Perform an additional random permutation of the sky coordinate pairs to 

175 # minimize the potential for on-sky parameter correlations. 

176 rng = np.random.default_rng(hashed_seed) 

177 sky_coords = Table(rng.permutation(sky_coords)) 

178 

179 # Generate random magnitudes if limits are specified 

180 if mag_lim: 

181 mag_sampler = qmc.Halton(d=1, seed=hashed_seed) 

182 mag_sample = mag_sampler.random(n=len(param_table)) 

183 mags = Table(qmc.scale(mag_sample, mag_lim[0], mag_lim[1]), names=("mag",)) 

184 sky_coords = hstack([sky_coords, mags]) 

185 

186 # Generate the unique injection ID and construct the final table. 

187 source_id = np.concatenate([([i] * number) for i in range(int(len(param_table) / number))]) 

188 injection_id = param_table["version_id"] + source_id * int(10 ** np.ceil(np.log10(number))) 

189 injection_id.name = "injection_id" 

190 table = hstack([injection_id, sky_coords, param_table]) 

191 table.remove_column("version_id") 

192 

193 # Final logger report and return. 

194 if number == 1: 

195 extra_info = f"{len(table)} unique sources." 

196 else: 

197 num_combinations = int(len(table) / number) 

198 grammar = "combination" if num_combinations == 1 else "combinations" 

199 extra_info = f"{len(table)} sources: {num_combinations} {grammar} repeated {number} times." 

200 logger.info("Generated an injection catalog containing %s", extra_info) 

201 return table