21from __future__
import annotations
23__all__ = (
"AllSkyHipsTaskConnections",
"AllSkyHipsTaskConfig",
"AllSkyHipsTask")
31import healsparse
as hsp
32from dataclasses
import replace
33from datetime
import datetime
34from collections.abc
import Iterable
37from lsst.resources
import ResourcePath
41from lsst.pipe.base
import (
44 PipelineTaskConnections,
47 InputQuantizedConnection,
48 OutputQuantizedConnection,
51from lsst.pipe.base.connectionTypes
import Input
54from ._properties
import HipsPropertiesConfig, _write_property
55from ..healSparseMapping
import _is_power_of_two
59 PipelineTaskConnections,
61 defaultTemplates={
"input_task_label":
"generateLowOrderHips"},
63 low_order_metadata = Input(
64 doc=
"Metadata produced by the LowOrderHipsTask",
65 name=
"{input_task_label}_metadata",
66 storageClass=
"TaskMetadata",
72 doc=
"Hips pixels at level 8 used to build higher orders",
73 name=
"rgb_picture_hips8",
74 storageClass=
"NumpyArray",
77 dimensions=(
"healpix8",),
80 def __init__(self, *, config: AllSkyHipsTaskConfig):
83 self.low_order_metadata = replace(
84 self.low_order_metadata, dimensions=set((f
"healpix{config.min_order}",))
88class AllSkyHipsTaskConfig(PipelineTaskConfig, pipelineConnections=AllSkyHipsTaskConnections):
89 hips_base_uri = Field[str](
90 doc=
"URI to HiPS base for output.",
93 color_ordering = Field[str](
95 "A string of the astrophysical bands that correspond to the RGB channels in the color image "
96 "inputs to high_order_hips task. This is in making the hips metadata"
100 file_extension = ChoiceField[str](
101 doc=
"Extension for the presisted image.",
102 allowed={
"png":
"Use the png image extension",
"webp":
"Use the webp image extension"},
105 properties = ConfigField[HipsPropertiesConfig](
106 doc=
"Configuration for properties file.",
108 allsky_tilesize = Field[int](
110 doc=
"Allsky tile size; must be power of 2. HiPS standard recommends 64x64 tiles.",
112 check=_is_power_of_two,
114 bluest_band = Field[str](
115 doc=
"Band corresponding to bluest color in image",
118 reddest_band = Field[str](
119 doc=
"Band corresponding to reddest color in image",
122 max_order = Field[int](doc=
"The maximum order hips that was produced", default=11)
123 data_bitpix = ChoiceField[str](
124 doc=
"The dtype of the original input data",
126 "float64":
"double precision",
127 "float32":
"single precision",
128 "uint8":
"8 bit uint",
129 "uint16":
"16 bit uint",
130 "uint32":
"32 bit uint",
131 "uint64":
"64 bit uint",
135 hips_bitpix = ChoiceField[str](
136 doc=
"The dtype of the hips images",
138 "float64":
"double precision",
139 "float32":
"single precision",
140 "uint8":
"8 bit uint",
141 "uint16":
"16 bit uint",
142 "uint32":
"32 bit uint",
143 "uint64":
"64 bit uint",
147 shift_order = Field[int](
148 doc=
"Shift order of hips, right now must be 9 configuration for future options", default=9
150 min_order = Field[int](
151 doc=
"Minimum healpix order for HiPS tree.",
156 if self.shift_order != 9:
157 raise ValueError(
"Shift order must be 9.")
158 return super().validate()
161class AllSkyHipsTask(PipelineTask):
162 """Pipeline task for generating all-sky HealPix (HiPS) tiles and associated metadata."""
164 _DefaultName =
"allSkyHipsTask"
165 ConfigClass = AllSkyHipsTaskConfig
168 def __init__(self, **kwargs):
169 super().__init__(**kwargs)
170 self.hips_base_path = ResourcePath(self.config.hips_base_uri, forceDirectory=
True)
171 self.hips_base_path = self.hips_base_path.join(
172 f
"color_{self.config.color_ordering}", forceDirectory=
True
177 butlerQC: QuantumContext,
178 inputRefs: InputQuantizedConnection,
179 outputRefs: OutputQuantizedConnection,
183 for ref
in inputRefs.input_hips:
184 hpx8_pixels.append((ref.dataId[
"healpix8"]))
187 hpx8_rangeset = RangeSet(hpx8_pixels)
188 hpx11_rangeset = hpx8_rangeset.scaled(4**3)
190 for begin, end
in hpx11_rangeset:
191 hpx11_pixels.update(range(begin, end))
192 hpx11_pixels = np.array([s
for s
in hpx11_pixels])
194 low_order_metadata = butlerQC.get(inputRefs.low_order_metadata)
196 outputs = self.run(low_order_metadata, hpx11_pixels)
197 butlerQC.put(outputs, outputRefs)
199 def run(self, low_order_metadata: Iterable[TaskMetadata], hpx11_pixels) -> Struct:
200 """Generate all-sky HealPix tiles and metadata.
204 low_order_metadata : Iterable[TaskMetadata]
205 Low-order metadata from previous processing steps.
206 hpx11_pixels : array-like
207 Array of HPX11 pixel IDs to process.
212 This task produces no outputs so an empty strct is returned
214 self._write_properties_and_moc(
215 self.config.max_order, hpx11_pixels, self.config.shift_order, self.config.color_ordering
217 self._write_allsky_file(self.config.min_order)
220 def _write_properties_and_moc(self, max_order, pixels, shift_order, band):
221 """Write HiPS properties file and MOC.
226 Maximum HEALPix order.
227 pixels : `np.ndarray` (N,)
228 Array of pixels used.
234 area = hpg.nside_to_pixel_area(2**max_order, degrees=
True) * len(pixels)
236 initial_ra = self.config.properties.initial_ra
237 initial_dec = self.config.properties.initial_dec
238 initial_fov = self.config.properties.initial_fov
240 if initial_ra
is None or initial_dec
is None or initial_fov
is None:
243 temp_pixels = pixels.copy()
244 if temp_pixels.size % 2 == 0:
245 temp_pixels = np.append(temp_pixels, [temp_pixels[0]])
246 medpix = int(np.median(temp_pixels))
247 _initial_ra, _initial_dec = hpg.pixel_to_angle(2**max_order, medpix)
248 _initial_fov = hpg.nside_to_resolution(2**max_order, units=
"arcminutes") / 60.0
250 if initial_ra
is None or initial_dec
is None:
251 initial_ra = _initial_ra
252 initial_dec = _initial_dec
253 if initial_fov
is None:
254 initial_fov = _initial_fov
256 self._write_hips_properties_file(
257 self.config.properties,
268 self._write_hips_moc_file(
273 def _write_hips_properties_file(
284 """Write HiPS properties file.
288 properties_config : `lsst.pipe.tasks.hips.HipsPropertiesConfig`
289 Configuration for properties values.
291 Name of band(s) for HiPS tree.
293 Maximum HEALPix order.
297 Coverage area in square degrees.
299 Initial HiPS RA position (degrees).
300 initial_dec : `float`
301 Initial HiPS Dec position (degrees).
302 initial_fov : `float`
303 Initial HiPS display size (degrees).
306 match self.config.data_bitpix:
320 match self.config.hips_bitpix:
334 date_iso8601 = datetime.utcnow().isoformat(timespec=
"seconds") +
"Z"
335 pixel_scale = hpg.nside_to_resolution(2 ** (max_order + shift_order), units=
"degrees")
337 uri = self.hips_base_path.join(
"properties")
338 with ResourcePath.temporary_uri(suffix=uri.getExtension())
as temporary_uri:
339 with open(temporary_uri.ospath,
"w")
as fh:
343 properties_config.creator_did_template.format(band=band),
345 if properties_config.obs_collection
is not None:
346 _write_property(fh,
"obs_collection", properties_config.obs_collection)
350 properties_config.obs_title_template.format(band=band),
352 if properties_config.obs_description_template
is not None:
356 properties_config.obs_description_template.format(band=band),
358 if len(properties_config.prov_progenitor) > 0:
359 for prov_progenitor
in properties_config.prov_progenitor:
360 _write_property(fh,
"prov_progenitor", prov_progenitor)
361 if properties_config.obs_ack
is not None:
362 _write_property(fh,
"obs_ack", properties_config.obs_ack)
363 _write_property(fh,
"obs_regime",
"Optical")
364 _write_property(fh,
"data_pixel_bitpix", str(bitpix))
365 _write_property(fh,
"dataproduct_type",
"image")
366 _write_property(fh,
"moc_sky_fraction", str(area / 41253.0))
367 _write_property(fh,
"data_ucd",
"phot.flux")
368 _write_property(fh,
"hips_creation_date", date_iso8601)
369 _write_property(fh,
"hips_builder",
"lsst.pipe.tasks.hips.GenerateHipsTask")
370 _write_property(fh,
"hips_creator",
"Vera C. Rubin Observatory")
371 _write_property(fh,
"hips_version",
"1.4")
372 _write_property(fh,
"hips_release_date", date_iso8601)
373 _write_property(fh,
"hips_frame",
"equatorial")
374 _write_property(fh,
"hips_order", str(max_order))
375 _write_property(fh,
"hips_tile_width", str(2**shift_order))
376 _write_property(fh,
"hips_status",
"private master clonableOnce")
377 _write_property(fh,
"hips_tile_format", self.config.file_extension)
378 _write_property(fh,
"dataproduct_subtype",
"color")
379 _write_property(fh,
"hips_pixel_bitpix", str(hbitpix))
380 _write_property(fh,
"hips_pixel_scale", str(pixel_scale))
381 _write_property(fh,
"hips_initial_ra", str(initial_ra))
382 _write_property(fh,
"hips_initial_dec", str(initial_dec))
383 _write_property(fh,
"hips_initial_fov", str(initial_fov))
384 if self.config.bluest_band
in properties_config.spectral_ranges:
385 em_min = properties_config.spectral_ranges[self.config.bluest_band].lambda_min / 1e9
387 self.log.warning(
"blue band %s not in self.config.spectral_ranges.", band)
389 if self.config.reddest_band
in properties_config.spectral_ranges:
390 em_max = properties_config.spectral_ranges[self.config.reddest_band].lambda_max / 1e9
392 self.log.warning(
"red band %s not in self.config.spectral_ranges.", band)
394 _write_property(fh,
"em_min", str(em_min))
395 _write_property(fh,
"em_max", str(em_max))
396 if properties_config.t_min
is not None:
397 _write_property(fh,
"t_min", properties_config.t_min)
398 if properties_config.t_max
is not None:
399 _write_property(fh,
"t_max", properties_config.t_max)
401 uri.transfer_from(temporary_uri, transfer=
"copy", overwrite=
True)
403 def _write_hips_moc_file(self, max_order, pixels, min_uniq_order=1):
404 """Write HiPS MOC file.
409 Maximum HEALPix order.
410 pixels : `np.ndarray`
411 Array of pixels covered.
412 min_uniq_order : `int`, optional
413 Minimum HEALPix order for looking for fully covered pixels.
421 hspmap = hsp.HealSparseMap.make_empty(2**min_uniq_order, 2**max_order, dtype=np.int8)
424 uri = self.hips_base_path.join(
"Moc.fits")
425 with ResourcePath.temporary_uri(suffix=uri.getExtension())
as temporary_uri:
426 hspmap.write_moc(temporary_uri.ospath)
427 uri.transfer_from(temporary_uri, transfer=
"copy", overwrite=
True)
429 def _write_allsky_file(self, allsky_order):
430 """Write an Allsky.png file.
435 HEALPix order of the minimum order to make allsky file.
437 tile_size = self.config.allsky_tilesize
449 n_tiles = hpg.nside_to_npixel(hpg.order_to_nside(allsky_order))
450 n_tiles_wide = int(np.floor(np.sqrt(n_tiles)))
451 n_tiles_high = int(np.ceil(n_tiles / n_tiles_wide))
455 allsky_order_uri = self.hips_base_path.join(f
"Norder{allsky_order}", forceDirectory=
True)
456 if self.config.file_extension ==
"png":
457 pixel_regex = re.compile(
r"Npix([0-9]+)\.png$")
458 elif self.config.file_extension ==
"webp":
459 pixel_regex = re.compile(
r"Npix([0-9]+)\.webp$")
461 raise RuntimeError(
"Unknown file extension")
464 ResourcePath.findFileResources(
465 candidates=[allsky_order_uri],
466 file_filter=pixel_regex,
470 for image_uri
in image_uris:
471 matches = re.match(pixel_regex, image_uri.basename())
472 pix_num = int(matches.group(1))
473 tile_image = Image.open(io.BytesIO(image_uri.read()))
474 row = math.floor(pix_num // n_tiles_wide)
475 column = pix_num % n_tiles_wide
476 box = (column * tile_size, row * tile_size, (column + 1) * tile_size, (row + 1) * tile_size)
477 tile_image_shrunk = tile_image.resize((tile_size, tile_size))
479 if allsky_image
is None:
480 allsky_image = Image.new(
482 (n_tiles_wide * tile_size, n_tiles_high * tile_size),
484 allsky_image.paste(tile_image_shrunk, box)
486 uri = allsky_order_uri.join(f
"Allsky.{self.config.file_extension}")
488 with ResourcePath.temporary_uri(suffix=uri.getExtension())
as temporary_uri:
489 allsky_image.save(temporary_uri.ospath)
491 uri.transfer_from(temporary_uri, transfer=
"copy", overwrite=
True)