Coverage for python / lsst / meas / transiNet / rbTransiNetInterface.py: 28%

59 statements  

« prev     ^ index     » next       coverage.py v7.13.5, created at 2026-04-22 09:05 +0000

1# This file is part of meas_transiNet. 

2# 

3# Developed for the LSST Data Management System. 

4# This product includes software developed by the LSST Project 

5# (https://www.lsst.org). 

6# See the COPYRIGHT file at the top-level directory of this distribution 

7# for details of code ownership. 

8# 

9# This program is free software: you can redistribute it and/or modify 

10# it under the terms of the GNU General Public License as published by 

11# the Free Software Foundation, either version 3 of the License, or 

12# (at your option) any later version. 

13# 

14# This program is distributed in the hope that it will be useful, 

15# but WITHOUT ANY WARRANTY; without even the implied warranty of 

16# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the 

17# GNU General Public License for more details. 

18# 

19# You should have received a copy of the GNU General Public License 

20# along with this program. If not, see <https://www.gnu.org/licenses/>. 

21 

22__all__ = ["RBTransiNetInterface", "CutoutInputs"] 

23 

24import dataclasses 

25import math 

26 

27import numpy as np 

28import torch 

29 

30import lsst.utils.logging 

31 

32from .modelPackages.nnModelPackage import NNModelPackage 

33 

34 

35@dataclasses.dataclass(frozen=True, kw_only=True) 

36class CutoutInputs: 

37 """Science/template/difference cutouts of a single object plus other 

38 metadata. 

39 """ 

40 science: np.ndarray 

41 template: np.ndarray 

42 difference: np.ndarray 

43 

44 label: bool = None 

45 """Known truth of whether this is a real or bogus object.""" 

46 

47 

class RBTransiNetInterface:
    """The interface between the LSST AP pipeline and a trained pytorch-based
    RBTransiNet neural network model.

    Parameters
    ----------
    task : `lsst.meas.transiNet.RBTransiNetTask`
        The task that is using this interface: the 'left side'. The model
        package name and storage mode are read from
        ``task.config.modelPackageName`` and
        ``task.config.modelPackageStorageMode``; ``task.butler_loaded_package``
        is handed to the model package, and ``task.log`` is used for progress
        logging.
    device : `str`
        Device to load and run the neural network on, e.g. 'cpu' or 'cuda:0'
    """

    def __init__(self, task, device='cpu'):
        self.task = task

        # in case the model package name is not set at this stage, it is not
        # needed (e.g. in butler mode).
        self.model_package_name = task.config.modelPackageName or 'N/A'

        self.package_storage_mode = task.config.modelPackageStorageMode
        self.device = device
        self.init_model()

    def init_model(self):
        """Create and initialize an NN model.

        The loaded model is stored in ``self.model`` and switched to
        evaluation mode.

        Raises
        ------
        RuntimeError
            Raised if the storage mode is 'butler' but the task carries no
            preloaded model package.
        """
        if self.package_storage_mode == 'butler' and self.task.butler_loaded_package is None:
            raise RuntimeError("RBTransiNetInterface is trying to load a butler-mode NN model package, "
                               "but the RBTransiNetTask has not passed down a preloaded payload.")

        model_package = NNModelPackage(model_package_name=self.model_package_name,
                                       package_storage_mode=self.package_storage_mode,
                                       butler_loaded_package=self.task.butler_loaded_package)
        self.model = model_package.load(self.device)

        # Put the model in evaluation mode instead of training model.
        self.model.eval()

    def input_to_batches(self, inputs, batchSize):
        """Convert a list of inputs to a generator of batches.

        Parameters
        ----------
        inputs : `list` [`CutoutInputs`]
            Inputs to be scored.
        batchSize : `int`
            Maximum number of inputs per batch; the last batch may be
            shorter.

        Returns
        -------
        batches : `generator`
            Generator of batches of inputs.
        """
        for i in range(0, len(inputs), batchSize):
            yield inputs[i:i + batchSize]

    def prepare_input(self, inputs):
        """Convert inputs from numpy arrays, etc. to a torch.tensor blob.

        Parameters
        ----------
        inputs : `list` [`CutoutInputs`]
            Inputs to be scored.

        Returns
        -------
        blob
            Prepared torch tensor blob to run the model on. Each input
            contributes one entry made of its difference, science and
            template cutouts stacked in that order.
        labels
            Truth labels, concatenated into a single list.
        """
        cutoutsList = []
        labelsList = []
        for inp in inputs:
            # Convert each cutout to a torch tensor
            template = torch.from_numpy(inp.template)
            science = torch.from_numpy(inp.science)
            difference = torch.from_numpy(inp.difference)

            # Stack the components to create a single blob
            # dimensions should be 3 x width x height
            singleBlob = torch.stack([difference, science, template], dim=0)

            # And append them to the temporary list
            cutoutsList.append(singleBlob)

            labelsList.append(inp.label)

        blob = torch.stack(cutoutsList)

        return blob, labelsList

    def infer(self, inputs):
        """Return the scores of the given cutouts.

        Parameters
        ----------
        inputs : `list` [`CutoutInputs`]
            Inputs to be scored.

        Returns
        -------
        scores : `numpy.array`
            Float scores for each element of ``inputs``.
        """
        # Handle empty inputs gracefully.
        if not inputs:
            return np.array([])

        # Convert the inputs to batches.
        # TODO: The batch size is set to 64 for now. Later when
        # deploying parallel instances of the task, memory limits
        # should be taken into account, if necessary.
        batch_size = 64
        batches = self.input_to_batches(inputs, batchSize=batch_size)

        # Log every 10 seconds as proof of liveness.
        logger = lsst.utils.logging.PeriodicLogger(self.task.log, interval=10.0)
        n_batches = math.ceil(len(inputs) / batch_size)

        # Loop over the batches, keeping every batch's output on the CPU.
        outputs = []
        for i, batch in enumerate(batches):
            logger.log("%s/%s batches have been scored.", i, n_batches)
            torchBlob, _ = self.prepare_input(batch)

            # Run the model. The blob is built from numpy arrays and so
            # starts on the CPU; move it to the device the model was asked
            # to load on so both live on the same device.
            with torch.no_grad():
                output = self.model(torchBlob.to(self.device))

            # Bring every output (including the first) back to the CPU so
            # the results can be concatenated and converted to numpy.
            outputs.append(output.cpu())

        # Concatenate once instead of re-copying the scores on every batch.
        scores = torch.cat(outputs, dim=0)

        npyScores = scores.detach().numpy().ravel()
        return npyScores

188 return npyScores