lsst.pipe.tasks gcf790cdeb6+0604939b8f
Loading...
Searching...
No Matches
_all_sky_hips.py
Go to the documentation of this file.
1# This file is part of pipe_tasks.
2#
3# Developed for the LSST Data Management System.
4# This product includes software developed by the LSST Project
5# (https://www.lsst.org).
6# See the COPYRIGHT file at the top-level directory of this distribution
7# for details of code ownership.
8#
9# This program is free software: you can redistribute it and/or modify
10# it under the terms of the GNU General Public License as published by
11# the Free Software Foundation, either version 3 of the License, or
12# (at your option) any later version.
13#
14# This program is distributed in the hope that it will be useful,
15# but WITHOUT ANY WARRANTY; without even the implied warranty of
16# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
17# GNU General Public License for more details.
18#
19# You should have received a copy of the GNU General Public License
20# along with this program. If not, see <https://www.gnu.org/licenses/>.
21from __future__ import annotations
22
23__all__ = ("AllSkyHipsTaskConnections", "AllSkyHipsTaskConfig", "AllSkyHipsTask")
24
25from PIL import Image
26import io
27import math
28import re
29import hpgeom as hpg
30import numpy as np
31import healsparse as hsp
32from dataclasses import replace
33from datetime import datetime
34from collections.abc import Iterable
35
36
37from lsst.resources import ResourcePath
38from lsst.pex.config import Field, ConfigField, ChoiceField
39from lsst.sphgeom import RangeSet
40
41from lsst.pipe.base import (
42 PipelineTask,
43 PipelineTaskConfig,
44 PipelineTaskConnections,
45 Struct,
46 QuantumContext,
47 InputQuantizedConnection,
48 OutputQuantizedConnection,
49 TaskMetadata,
50)
51from lsst.pipe.base.connectionTypes import Input
52
53
54from ._properties import HipsPropertiesConfig, _write_property
55from ..healSparseMapping import _is_power_of_two
56
57
59 PipelineTaskConnections,
60 dimensions=tuple(),
61 defaultTemplates={"input_task_label": "generateLowOrderHips"},
62):
63 low_order_metadata = Input(
64 doc="Metadata produced by the LowOrderHipsTask",
65 name="{input_task_label}_metadata",
66 storageClass="TaskMetadata",
67 multiple=True,
68 deferLoad=True,
69 dimensions=tuple(),
70 )
71 input_hips = Input(
72 doc="Hips pixels at level 8 used to build higher orders",
73 name="rgb_picture_hips8",
74 storageClass="NumpyArray",
75 multiple=True,
76 deferLoad=True,
77 dimensions=("healpix8",),
78 )
79
80 def __init__(self, *, config: AllSkyHipsTaskConfig):
81 # Set the input dimensions to whatever the minimum order healpix
82 # to produce is.
83 self.low_order_metadata = replace(
84 self.low_order_metadata, dimensions=set((f"healpix{config.min_order}",))
85 )
86
87
88class AllSkyHipsTaskConfig(PipelineTaskConfig, pipelineConnections=AllSkyHipsTaskConnections):
89 hips_base_uri = Field[str](
90 doc="URI to HiPS base for output.",
91 optional=False,
92 )
93 color_ordering = Field[str](
94 doc=(
95 "A string of the astrophysical bands that correspond to the RGB channels in the color image "
96 "inputs to high_order_hips task. This is in making the hips metadata"
97 ),
98 optional=False,
99 )
100 file_extension = ChoiceField[str](
101 doc="Extension for the presisted image.",
102 allowed={"png": "Use the png image extension", "webp": "Use the webp image extension"},
103 default="png",
104 )
105 properties = ConfigField[HipsPropertiesConfig](
106 doc="Configuration for properties file.",
107 )
108 allsky_tilesize = Field[int](
109 dtype=int,
110 doc="Allsky tile size; must be power of 2. HiPS standard recommends 64x64 tiles.",
111 default=64,
112 check=_is_power_of_two,
113 )
114 bluest_band = Field[str](
115 doc="Band corresponding to bluest color in image",
116 default="g",
117 )
118 reddest_band = Field[str](
119 doc="Band corresponding to reddest color in image",
120 default="i",
121 )
122 max_order = Field[int](doc="The maximum order hips that was produced", default=11)
123 data_bitpix = ChoiceField[str](
124 doc="The dtype of the original input data",
125 allowed={
126 "float64": "double precision",
127 "float32": "single precision",
128 "uint8": "8 bit uint",
129 "uint16": "16 bit uint",
130 "uint32": "32 bit uint",
131 "uint64": "64 bit uint",
132 },
133 default="float32",
134 )
135 hips_bitpix = ChoiceField[str](
136 doc="The dtype of the hips images",
137 allowed={
138 "float64": "double precision",
139 "float32": "single precision",
140 "uint8": "8 bit uint",
141 "uint16": "16 bit uint",
142 "uint32": "32 bit uint",
143 "uint64": "64 bit uint",
144 },
145 default="uint8",
146 )
147 shift_order = Field[int](
148 doc="Shift order of hips, right now must be 9 configuration for future options", default=9
149 )
150 min_order = Field[int](
151 doc="Minimum healpix order for HiPS tree.",
152 default=3,
153 )
154
155 def validate(self):
156 if self.shift_order != 9:
157 raise ValueError("Shift order must be 9.")
158 return super().validate()
159
160
161class AllSkyHipsTask(PipelineTask):
162 """Pipeline task for generating all-sky HealPix (HiPS) tiles and associated metadata."""
163
164 _DefaultName = "allSkyHipsTask"
165 ConfigClass = AllSkyHipsTaskConfig
166 config: ConfigClass
167
168 def __init__(self, **kwargs):
169 super().__init__(**kwargs)
170 self.hips_base_path = ResourcePath(self.config.hips_base_uri, forceDirectory=True)
171 self.hips_base_path = self.hips_base_path.join(
172 f"color_{self.config.color_ordering}", forceDirectory=True
173 )
174
175 def runQuantum(
176 self,
177 butlerQC: QuantumContext,
178 inputRefs: InputQuantizedConnection,
179 outputRefs: OutputQuantizedConnection,
180 ) -> None:
181 # Extract the healpix8 pixel ids.
182 hpx8_pixels = []
183 for ref in inputRefs.input_hips:
184 hpx8_pixels.append((ref.dataId["healpix8"]))
185
186 # Scale pixel IDS to higher order (hpx11) that were already produced
187 hpx8_rangeset = RangeSet(hpx8_pixels)
188 hpx11_rangeset = hpx8_rangeset.scaled(4**3)
189 hpx11_pixels = set()
190 for begin, end in hpx11_rangeset:
191 hpx11_pixels.update(range(begin, end))
192 hpx11_pixels = np.array([s for s in hpx11_pixels])
193
194 low_order_metadata = butlerQC.get(inputRefs.low_order_metadata)
195
196 outputs = self.run(low_order_metadata, hpx11_pixels)
197 butlerQC.put(outputs, outputRefs)
198
199 def run(self, low_order_metadata: Iterable[TaskMetadata], hpx11_pixels) -> Struct:
200 """Generate all-sky HealPix tiles and metadata.
201
202 Parameters
203 ----------
204 low_order_metadata : Iterable[TaskMetadata]
205 Low-order metadata from previous processing steps.
206 hpx11_pixels : array-like
207 Array of HPX11 pixel IDs to process.
208
209 Returns
210 -------
211 Struct
212 This task produces no outputs so an empty strct is returned
213 """
214 self._write_properties_and_moc(
215 self.config.max_order, hpx11_pixels, self.config.shift_order, self.config.color_ordering
216 )
217 self._write_allsky_file(self.config.min_order)
218 return Struct()
219
220 def _write_properties_and_moc(self, max_order, pixels, shift_order, band):
221 """Write HiPS properties file and MOC.
222
223 Parameters
224 ----------
225 max_order : `int`
226 Maximum HEALPix order.
227 pixels : `np.ndarray` (N,)
228 Array of pixels used.
229 shift_order : `int`
230 HPX shift order.
231 band : `str`
232 Band (or color).
233 """
234 area = hpg.nside_to_pixel_area(2**max_order, degrees=True) * len(pixels)
235
236 initial_ra = self.config.properties.initial_ra
237 initial_dec = self.config.properties.initial_dec
238 initial_fov = self.config.properties.initial_fov
239
240 if initial_ra is None or initial_dec is None or initial_fov is None:
241 # We want to point to an arbitrary pixel in the footprint.
242 # Just take the median pixel value for simplicity.
243 temp_pixels = pixels.copy()
244 if temp_pixels.size % 2 == 0:
245 temp_pixels = np.append(temp_pixels, [temp_pixels[0]])
246 medpix = int(np.median(temp_pixels))
247 _initial_ra, _initial_dec = hpg.pixel_to_angle(2**max_order, medpix)
248 _initial_fov = hpg.nside_to_resolution(2**max_order, units="arcminutes") / 60.0
249
250 if initial_ra is None or initial_dec is None:
251 initial_ra = _initial_ra
252 initial_dec = _initial_dec
253 if initial_fov is None:
254 initial_fov = _initial_fov
255
256 self._write_hips_properties_file(
257 self.config.properties,
258 band,
259 max_order,
260 shift_order,
261 area,
262 initial_ra,
263 initial_dec,
264 initial_fov,
265 )
266
267 # Write the MOC coverage
268 self._write_hips_moc_file(
269 max_order,
270 pixels,
271 )
272
273 def _write_hips_properties_file(
274 self,
275 properties_config,
276 band,
277 max_order,
278 shift_order,
279 area,
280 initial_ra,
281 initial_dec,
282 initial_fov,
283 ):
284 """Write HiPS properties file.
285
286 Parameters
287 ----------
288 properties_config : `lsst.pipe.tasks.hips.HipsPropertiesConfig`
289 Configuration for properties values.
290 band : `str`
291 Name of band(s) for HiPS tree.
292 max_order : `int`
293 Maximum HEALPix order.
294 shift_order : `int`
295 HPX shift order.
296 area : `float`
297 Coverage area in square degrees.
298 initial_ra : `float`
299 Initial HiPS RA position (degrees).
300 initial_dec : `float`
301 Initial HiPS Dec position (degrees).
302 initial_fov : `float`
303 Initial HiPS display size (degrees).
304 """
305
306 match self.config.data_bitpix:
307 case "float64":
308 bitpix = -64
309 case "float32":
310 bitpix = -32
311 case "uint8":
312 bitpix = 8
313 case "uint16":
314 bitpix = 16
315 case "uint32":
316 bitpix = 32
317 case "uint64":
318 bitpix = 64
319
320 match self.config.hips_bitpix:
321 case "float64":
322 hbitpix = -64
323 case "float32":
324 hbitpix = -32
325 case "uint8":
326 hbitpix = 8
327 case "uint16":
328 hbitpix = 16
329 case "uint32":
330 hbitpix = 32
331 case "uint64":
332 hbitpix = 64
333
334 date_iso8601 = datetime.utcnow().isoformat(timespec="seconds") + "Z"
335 pixel_scale = hpg.nside_to_resolution(2 ** (max_order + shift_order), units="degrees")
336
337 uri = self.hips_base_path.join("properties")
338 with ResourcePath.temporary_uri(suffix=uri.getExtension()) as temporary_uri:
339 with open(temporary_uri.ospath, "w") as fh:
340 _write_property(
341 fh,
342 "creator_did",
343 properties_config.creator_did_template.format(band=band),
344 )
345 if properties_config.obs_collection is not None:
346 _write_property(fh, "obs_collection", properties_config.obs_collection)
347 _write_property(
348 fh,
349 "obs_title",
350 properties_config.obs_title_template.format(band=band),
351 )
352 if properties_config.obs_description_template is not None:
353 _write_property(
354 fh,
355 "obs_description",
356 properties_config.obs_description_template.format(band=band),
357 )
358 if len(properties_config.prov_progenitor) > 0:
359 for prov_progenitor in properties_config.prov_progenitor:
360 _write_property(fh, "prov_progenitor", prov_progenitor)
361 if properties_config.obs_ack is not None:
362 _write_property(fh, "obs_ack", properties_config.obs_ack)
363 _write_property(fh, "obs_regime", "Optical")
364 _write_property(fh, "data_pixel_bitpix", str(bitpix))
365 _write_property(fh, "dataproduct_type", "image")
366 _write_property(fh, "moc_sky_fraction", str(area / 41253.0))
367 _write_property(fh, "data_ucd", "phot.flux")
368 _write_property(fh, "hips_creation_date", date_iso8601)
369 _write_property(fh, "hips_builder", "lsst.pipe.tasks.hips.GenerateHipsTask")
370 _write_property(fh, "hips_creator", "Vera C. Rubin Observatory")
371 _write_property(fh, "hips_version", "1.4")
372 _write_property(fh, "hips_release_date", date_iso8601)
373 _write_property(fh, "hips_frame", "equatorial")
374 _write_property(fh, "hips_order", str(max_order))
375 _write_property(fh, "hips_tile_width", str(2**shift_order))
376 _write_property(fh, "hips_status", "private master clonableOnce")
377 _write_property(fh, "hips_tile_format", self.config.file_extension)
378 _write_property(fh, "dataproduct_subtype", "color")
379 _write_property(fh, "hips_pixel_bitpix", str(hbitpix))
380 _write_property(fh, "hips_pixel_scale", str(pixel_scale))
381 _write_property(fh, "hips_initial_ra", str(initial_ra))
382 _write_property(fh, "hips_initial_dec", str(initial_dec))
383 _write_property(fh, "hips_initial_fov", str(initial_fov))
384 if self.config.bluest_band in properties_config.spectral_ranges:
385 em_min = properties_config.spectral_ranges[self.config.bluest_band].lambda_min / 1e9
386 else:
387 self.log.warning("blue band %s not in self.config.spectral_ranges.", band)
388 em_min = 3e-7
389 if self.config.reddest_band in properties_config.spectral_ranges:
390 em_max = properties_config.spectral_ranges[self.config.reddest_band].lambda_max / 1e9
391 else:
392 self.log.warning("red band %s not in self.config.spectral_ranges.", band)
393 em_max = 1e-6
394 _write_property(fh, "em_min", str(em_min))
395 _write_property(fh, "em_max", str(em_max))
396 if properties_config.t_min is not None:
397 _write_property(fh, "t_min", properties_config.t_min)
398 if properties_config.t_max is not None:
399 _write_property(fh, "t_max", properties_config.t_max)
400
401 uri.transfer_from(temporary_uri, transfer="copy", overwrite=True)
402
403 def _write_hips_moc_file(self, max_order, pixels, min_uniq_order=1):
404 """Write HiPS MOC file.
405
406 Parameters
407 ----------
408 max_order : `int`
409 Maximum HEALPix order.
410 pixels : `np.ndarray`
411 Array of pixels covered.
412 min_uniq_order : `int`, optional
413 Minimum HEALPix order for looking for fully covered pixels.
414 """
415 # WARNING: In general PipelineTasks are not allowed to do any outputs
416 # outside of the butler. This task has been given (temporary)
417 # Special Dispensation because of the nature of HiPS outputs until
418 # a more controlled solution can be found.
419
420 # Make a healsparse map which provides easy degrade/comparisons.
421 hspmap = hsp.HealSparseMap.make_empty(2**min_uniq_order, 2**max_order, dtype=np.int8)
422 hspmap[pixels] = 1
423
424 uri = self.hips_base_path.join("Moc.fits")
425 with ResourcePath.temporary_uri(suffix=uri.getExtension()) as temporary_uri:
426 hspmap.write_moc(temporary_uri.ospath)
427 uri.transfer_from(temporary_uri, transfer="copy", overwrite=True)
428
429 def _write_allsky_file(self, allsky_order):
430 """Write an Allsky.png file.
431
432 Parameters
433 ----------
434 allsky_order : `int`
435 HEALPix order of the minimum order to make allsky file.
436 """
437 tile_size = self.config.allsky_tilesize
438
439 # The Allsky file format is described in
440 # https://www.ivoa.net/documents/HiPS/20170519/REC-HIPS-1.0-20170519.pdf
441 # From S4.3.2:
442 # The Allsky file is built as an array of tiles, stored side by side in
443 # the left-to-right order. The width of this array must be the square
444 # root of the number of the tiles of the order. For instance, the width
445 # of this array at order 3 is 27 ( (int)sqrt(768) ). To avoid having a
446 # too large Allsky file, the resolution of each tile may be reduced but
447 # must stay a power of two (typically 64x64 pixels rather than 512x512).
448
449 n_tiles = hpg.nside_to_npixel(hpg.order_to_nside(allsky_order))
450 n_tiles_wide = int(np.floor(np.sqrt(n_tiles)))
451 n_tiles_high = int(np.ceil(n_tiles / n_tiles_wide))
452
453 allsky_image = None
454
455 allsky_order_uri = self.hips_base_path.join(f"Norder{allsky_order}", forceDirectory=True)
456 if self.config.file_extension == "png":
457 pixel_regex = re.compile(r"Npix([0-9]+)\.png$")
458 elif self.config.file_extension == "webp":
459 pixel_regex = re.compile(r"Npix([0-9]+)\.webp$")
460 else:
461 raise RuntimeError("Unknown file extension")
462
463 image_uris = list(
464 ResourcePath.findFileResources(
465 candidates=[allsky_order_uri],
466 file_filter=pixel_regex,
467 )
468 )
469
470 for image_uri in image_uris:
471 matches = re.match(pixel_regex, image_uri.basename())
472 pix_num = int(matches.group(1))
473 tile_image = Image.open(io.BytesIO(image_uri.read()))
474 row = math.floor(pix_num // n_tiles_wide)
475 column = pix_num % n_tiles_wide
476 box = (column * tile_size, row * tile_size, (column + 1) * tile_size, (row + 1) * tile_size)
477 tile_image_shrunk = tile_image.resize((tile_size, tile_size))
478
479 if allsky_image is None:
480 allsky_image = Image.new(
481 tile_image.mode,
482 (n_tiles_wide * tile_size, n_tiles_high * tile_size),
483 )
484 allsky_image.paste(tile_image_shrunk, box)
485
486 uri = allsky_order_uri.join(f"Allsky.{self.config.file_extension}")
487
488 with ResourcePath.temporary_uri(suffix=uri.getExtension()) as temporary_uri:
489 allsky_image.save(temporary_uri.ospath)
490
491 uri.transfer_from(temporary_uri, transfer="copy", overwrite=True)