Coverage for python / lsst / pipe / tasks / rgb2hips / _low_order_hips.py: 0%

80 statements  

« prev     ^ index     » next       coverage.py v7.13.5, created at 2026-04-23 08:45 +0000

1# This file is part of pipe_tasks. 

2# 

3# Developed for the LSST Data Management System. 

4# This product includes software developed by the LSST Project 

5# (https://www.lsst.org). 

6# See the COPYRIGHT file at the top-level directory of this distribution 

7# for details of code ownership. 

8# 

9# This program is free software: you can redistribute it and/or modify 

10# it under the terms of the GNU General Public License as published by 

11# the Free Software Foundation, either version 3 of the License, or 

12# (at your option) any later version. 

13# 

14# This program is distributed in the hope that it will be useful, 

15# but WITHOUT ANY WARRANTY; without even the implied warranty of 

16# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the 

17# GNU General Public License for more details. 

18# 

19# You should have received a copy of the GNU General Public License 

20# along with this program. If not, see <https://www.gnu.org/licenses/>. 

21from __future__ import annotations 

22 

23__all__ = ("LowOrderHipsTaskConnections", "LowOrderHipsTaskConfig", "LowOrderHipsTask") 

24 

25import numpy as np 

26import cv2 

27 

28from lsst.daf.butler import DeferredDatasetHandle 

29from lsst.pipe.base import ( 

30 PipelineTask, 

31 PipelineTaskConfig, 

32 PipelineTaskConnections, 

33 Struct, 

34 QuantumContext, 

35 InputQuantizedConnection, 

36 OutputQuantizedConnection, 

37) 

38 

39from lsst.pex.config import Field, ChoiceField 

40from lsst.pipe.base.connectionTypes import Input 

41from lsst.resources import ResourcePath 

42 

43from collections.abc import Iterable 

44 

45from numpy.typing import NDArray 

46 

47from ._utils import _write_hips_image 

48 

49 

class LowOrderHipsTaskConnections(PipelineTaskConnections, dimensions=()):
    """Connections for `LowOrderHipsTask`.

    The only connection is a deferred-load, multiple-valued input of
    ``NumpyArray`` datasets with the ``healpix8`` dimension. The quantum
    dimensions are not fixed at class definition time; they are assigned in
    ``__init__`` from the ``min_order`` value of the supplied config.
    """

    input_hips = Input(
        name="rgb_picture_hips8",
        doc="Hips pixels at level 8 used to build higher orders",
        storageClass="NumpyArray",
        dimensions=("healpix8",),
        deferLoad=True,
        multiple=True,
    )

    def __init__(self, *, config: LowOrderHipsTaskConfig):
        # The quantum dimension is the healpix dimension of the lowest order
        # this task is configured to produce.
        self.dimensions = {f"healpix{config.min_order}"}

66 

67 

class LowOrderHipsTaskConfig(PipelineTaskConfig, pipelineConnections=LowOrderHipsTaskConnections):
    """Configuration for `LowOrderHipsTask`.

    Controls the lowest healpix order to generate, the base URI the tiles are
    written under, the band-to-channel ordering string, and the image file
    extension and array type of the written tiles.
    """

    min_order = Field[int](
        doc="Minimum healpix order for HiPS tree.",
        default=3,
    )
    hips_base_uri = Field[str](
        doc="URI to HiPS base for output.",
        optional=False,
    )
    color_ordering = Field[str](
        doc=(
            "A string of the astrophysical bands that correspond to the RGB channels in the color image "
            "inputs to high_order_hips task. This is in making the hips metadata"
        ),
        optional=False,
    )
    file_extension = ChoiceField[str](
        doc="Extension for the presisted image",
        allowed={"png": "Use the png image extension", "webp": "Use the webp image extension"},
        default="png",
    )
    array_type = ChoiceField[str](
        doc="The dataset type for the output image array",
        default="uint8",
        allowed={
            "uint8": "Use 8 bit arrays, 255 max",
            "uint16": "Use 16 bit arrays, 65535 max",
            "half": "Use 16 bit float arrays, 1 max",
            "float": "Use 32 bit float arrays, 1 max",
        },
    )

    def validate(self):
        """Validate the configuration.

        Raises
        ------
        ValueError
            Raised if ``min_order`` is negative, or is not strictly less than
            8 (the order of the input tiles).
        """
        # Run the base-class validation first so that the standard per-field
        # checks (required fields being set, allowed choices) are not
        # bypassed by this override.
        super().validate()
        if self.min_order < 0:
            raise ValueError("The minimum order must not be negative.")
        if self.min_order >= 8:
            raise ValueError("The minimum order must be less than 8.")

103 

104 

class LowOrderHipsTask(PipelineTask):
    """`PipelineTask` that builds the low order tiles of a HiPS tree.

    Already down-sampled healpix order 8 tiles are read in and assembled,
    order by order, into tiles of progressively lower HiPS order, down to the
    configured ``min_order``.

    This task has special permission to write to locations outside the butler.
    Don't emulate this in other tasks.
    """

    _DefaultName = "lowOrderHipsTask"
    ConfigClass = LowOrderHipsTaskConfig

    config: ConfigClass

    def __init__(self, **kwargs):
        super().__init__(**kwargs)
        # Tiles are written beneath <hips_base_uri>/color_<color_ordering>/.
        base_dir = ResourcePath(self.config.hips_base_uri, forceDirectory=True)
        color_dir = f"color_{self.config.color_ordering}"
        self.hips_base_path = base_dir.join(color_dir, forceDirectory=True)

    def run(self, hpx_container: Iterable[tuple[DeferredDatasetHandle, int]]) -> Struct:
        """Produce HiPS images from order 7 down to the configured min_order.

        Parameters
        ----------
        hpx_container : `Iterable` of `tuple` of `DeferredDatasetHandle`, `int`
            Handles to already down-sampled healpix order 8 arrays, each
            paired with its order 8 pixel id.

        Returns
        -------
        result : `Struct`
            This task does not produce a butler output, so an empty `Struct`
            is returned.
        """
        tile_size = 512
        half = tile_size // 2
        low, high = slice(0, half), slice(half, None)
        # (row, column) slices of the assembled tile, keyed by the offset of a
        # child pixel id from four times its parent pixel id.
        quadrant_slices = {
            0: (low, low),
            1: (high, low),
            2: (low, high),
            3: (high, high),
        }

        children = hpx_container
        # Start at 7 because the inputs (outputs of HighOrderHipsTask) are
        # healpix order 8 pixels; each pass consumes the tiles of the order
        # above and emits the down-sampled tiles for the pass below.
        for order in range(7, self.config.min_order - 1, -1):
            self.log.info("Processing order %d", order)
            # Group the children by the pixel of this order that contains them.
            parents = self._create_sorted_container(children)

            log_every = len(parents) // 10
            since_last_log = 0
            percent_done = 0
            downsampled = []
            for parent_id, members in parents.items():
                # Periodic log message as a liveness check.
                if since_last_log > log_every:
                    percent_done += 10
                    self.log.info("Done %d percent", percent_done)
                    since_last_log = 0

                # Blank canvas for the tile being assembled.
                mosaic = np.zeros((tile_size, tile_size, 3), dtype=np.float32)
                for member, member_id in members:
                    # Only the first pass receives deferred handles (noted
                    # upstream as float32 arrays); later passes already hold
                    # in-memory arrays.
                    pixels = member.get() if order == 7 else member
                    offset = int(member_id - np.left_shift(parent_id, 2))
                    quadrant = quadrant_slices.get(offset)
                    if quadrant is not None:
                        mosaic[quadrant] = pixels

                # Write out the hips image.
                _write_hips_image(
                    mosaic,
                    parent_id,
                    order,
                    self.hips_base_path,
                    self.config.file_extension,
                    self.config.array_type,
                )
                since_last_log += 1

                # Shrink the tile so it can serve as one quadrant of the next
                # (lower) order.
                shrunk = cv2.resize(mosaic, (256, 256), interpolation=cv2.INTER_LANCZOS4)
                downsampled.append((shrunk, parent_id))
            children = downsampled
        return Struct()

    def _create_sorted_container(
        self,
        hpx_container: Iterable[tuple[NDArray | DeferredDatasetHandle, int]],
    ) -> dict[int, Iterable[tuple[NDArray | DeferredDatasetHandle, int]]]:
        """Group (image or handle, hpx_id) pairs by ``hpx_id >> 2``.

        Returns a dict mapping each shifted pixel id to the list of input
        pairs that share it, in input order.
        """
        grouped = {}
        for entry in hpx_container:
            grouped.setdefault(np.right_shift(entry[1], 2), []).append(entry)
        return grouped

    def runQuantum(
        self,
        butlerQC: QuantumContext,
        inputRefs: InputQuantizedConnection,
        outputRefs: OutputQuantizedConnection,
    ) -> None:
        # Pair every input handle with the healpix8 pixel id from its data ID.
        handles_with_ids = [(butlerQC.get(ref), ref.dataId["healpix8"]) for ref in inputRefs.input_hips]

        outputs = self.run(handles_with_ids)
        butlerQC.put(outputs, outputRefs)