Coverage for python/lsst/meas/transiNet/rbTransiNetInterface.py: 28%
59 statements
« prev ^ index » next coverage.py v7.13.5, created at 2026-05-07 08:27 +0000
# This file is part of meas_transiNet.
#
# Developed for the LSST Data Management System.
# This product includes software developed by the LSST Project
# (https://www.lsst.org).
# See the COPYRIGHT file at the top-level directory of this distribution
# for details of code ownership.
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <https://www.gnu.org/licenses/>.
__all__ = ["RBTransiNetInterface", "CutoutInputs"]

import dataclasses
import math

import numpy as np
import torch

import lsst.utils.logging

from .modelPackages.nnModelPackage import NNModelPackage
35@dataclasses.dataclass(frozen=True, kw_only=True)
36class CutoutInputs:
37 """Science/template/difference cutouts of a single object plus other
38 metadata.
39 """
40 science: np.ndarray
41 template: np.ndarray
42 difference: np.ndarray
44 label: bool = None
45 """Known truth of whether this is a real or bogus object."""
class RBTransiNetInterface:
    """The interface between the LSST AP pipeline and a trained pytorch-based
    RBTransiNet neural network model.

    Parameters
    ----------
    task : `lsst.meas.transiNet.RBTransiNetTask`
        The task that is using this interface: the 'left side'. The model
        package name and storage mode are read from ``task.config``.
    device : `str`
        Device to load and run the neural network on, e.g. 'cpu' or 'cuda:0'
    """

    def __init__(self, task, device='cpu'):
        self.task = task

        # In case the model package name is not set at this stage, it is not
        # needed (e.g. in butler mode).
        self.model_package_name = task.config.modelPackageName or 'N/A'

        self.package_storage_mode = task.config.modelPackageStorageMode
        self.device = device
        self.init_model()

    def init_model(self):
        """Create and initialize an NN model.

        Raises
        ------
        RuntimeError
            Raised in 'butler' storage mode if the task has not passed down
            a preloaded model package payload.
        """
        if self.package_storage_mode == 'butler' and self.task.butler_loaded_package is None:
            raise RuntimeError("RBTransiNetInterface is trying to load a butler-mode NN model package, "
                               "but the RBTransiNetTask has not passed down a preloaded payload.")

        model_package = NNModelPackage(model_package_name=self.model_package_name,
                                       package_storage_mode=self.package_storage_mode,
                                       butler_loaded_package=self.task.butler_loaded_package)
        self.model = model_package.load(self.device)

        # Put the model in evaluation mode instead of training mode.
        self.model.eval()

    def input_to_batches(self, inputs, batchSize):
        """Convert a list of inputs to a generator of batches.

        Parameters
        ----------
        inputs : `list` [`CutoutInputs`]
            Inputs to be scored.
        batchSize : `int`
            Maximum number of elements per yielded batch; the final batch
            may be smaller.

        Returns
        -------
        batches : `generator`
            Generator of batches of inputs.
        """
        for i in range(0, len(inputs), batchSize):
            yield inputs[i:i + batchSize]

    def prepare_input(self, inputs):
        """Convert inputs from numpy arrays, etc. to a torch.tensor blob.

        Parameters
        ----------
        inputs : `list` [`CutoutInputs`]
            Inputs to be scored.

        Returns
        -------
        blob
            Prepared torch tensor blob to run the model on.
        labels
            Truth labels, concatenated into a single list.
        """
        cutoutsList = []
        labelsList = []
        for inp in inputs:
            # Convert each cutout to a torch tensor.
            template = torch.from_numpy(inp.template)
            science = torch.from_numpy(inp.science)
            difference = torch.from_numpy(inp.difference)

            # Stack the components to create a single blob;
            # dimensions should be 3 x width x height.
            singleBlob = torch.stack([difference, science, template], dim=0)

            # And append them to the temporary lists.
            cutoutsList.append(singleBlob)
            labelsList.append(inp.label)

        blob = torch.stack(cutoutsList)

        return blob, labelsList

    def infer(self, inputs):
        """Return the score of this cutout.

        Parameters
        ----------
        inputs : `list` [`CutoutInputs`]
            Inputs to be scored.

        Returns
        -------
        scores : `numpy.array`
            Float scores for each element of ``inputs``.
        """
        # Handle empty inputs gracefully.
        if not inputs:
            return np.array([])

        # Convert the inputs to batches.
        # TODO: The batch size is set to 64 for now. Later when
        # deploying parallel instances of the task, memory limits
        # should be taken into account, if necessary.
        batch_size = 64
        batches = self.input_to_batches(inputs, batchSize=batch_size)

        # Log every 10 seconds as proof of liveness.
        logger = lsst.utils.logging.PeriodicLogger(self.task.log, interval=10.0)
        n_batches = math.ceil(len(inputs) / batch_size)

        # Loop over the batches.
        for i, batch in enumerate(batches):
            logger.log("%s/%s batches have been scored.", i, n_batches)
            torchBlob, labelsList = self.prepare_input(batch)

            # Run the model without building a gradient graph.
            with torch.no_grad():
                output = self.model(torchBlob)

            # Accumulate results on the CPU for every batch. (Previously the
            # first batch skipped .cpu(), which would make torch.cat mix
            # devices and the final .numpy() fail when running on CUDA.)
            if i == 0:
                scores = output.cpu()
            else:
                scores = torch.cat((scores, output.cpu()), dim=0)

        npyScores = scores.detach().numpy().ravel()
        return npyScores