Coverage for python / lsst / images / cells / _provenance.py: 25%
141 statements
« prev ^ index » next — coverage.py v7.13.5, created at 2026-05-07 08:34 +0000
1# This file is part of lsst-images.
2#
3# Developed for the LSST Data Management System.
4# This product includes software developed by the LSST Project
5# (https://www.lsst.org).
6# See the COPYRIGHT file at the top-level directory of this distribution
7# for details of code ownership.
8#
9# Use of this source code is governed by a 3-clause BSD-style
10# license that can be found in the LICENSE file.
12from __future__ import annotations
14__all__ = ("CoaddProvenance", "CoaddProvenanceSerializationModel")
16from collections.abc import Iterable
17from typing import TYPE_CHECKING, Any, ClassVar
19import astropy.table
20import astropy.units as u
21import numpy as np
22import pydantic
24from .._cell_grid import CellIJ
25from .._polygon import Polygon
26from ..serialization import ArchiveTree, InputArchive, OutputArchive, TableModel
28if TYPE_CHECKING:
29 try:
30 from lsst.cell_coadds import CoaddInputs as LegacyCoaddInputs
31 from lsst.cell_coadds import MultipleCellCoadd, ObservationIdentifiers
32 from lsst.skymap import Index2D
33 except ImportError:
34 type Index2D = Any # type: ignore[no-redef]
35 type LegacyCoaddInputs = Any # type: ignore[no-redef]
36 type MultipleCellCoadd = Any # type: ignore[no-redef]
37 type ObservationIdentifiers = Any # type: ignore[no-redef]
class CoaddProvenance:
    """A pair of tables that record the inputs to a cell-based coadd.

    Parameters
    ----------
    inputs
        A table of {visit, detector} combinations that contribute to any cell
        in the coadd.
    contributions
        A table of {visit, detector, cell} combinations that describe how an
        observation contributed to a cell.

    Notes
    -----
    This object can represent the provenance of a whole patch, a single cell,
    or anything in between. In the single-cell case, the ``inputs`` and
    ``contributions`` tables have the same number of rows (but may not be
    ordered the same way!).
    """

    def __init__(self, inputs: astropy.table.Table, contributions: astropy.table.Table):
        self._inputs = inputs
        self._contributions = contributions

    # Schema for the ``inputs`` table: (name, dtype, description).
    _INPUT_TABLE_COLUMNS: ClassVar[list[tuple[str, type, str]]] = [
        ("instrument", np.object_, "Name of the instrument."),
        ("visit", np.uint64, "ID of the visit."),
        ("detector", np.uint16, "ID of the detector."),
        ("physical_filter", np.object_, "Full name of the bandpass filter."),
        ("day_obs", np.uint32, "Observation night as a YYYYMMDD integer."),
        (
            "polygon",
            np.object_,
            (
                "Polygon that approximates the overlap of the observation and the coadd patch, "
                "in coadd coordinates."
            ),
        ),
    ]

    # Schema for the ``contributions`` table: (name, dtype, description, unit).
    _CONTRIBUTION_TABLE_COLUMNS: ClassVar[list[tuple[str, type, str, u.UnitBase | None]]] = [
        ("cell_i", np.uint16, "Y-axis index of the cell within the patch.", None),
        ("cell_j", np.uint16, "X-axis index of the cell within the patch.", None),
        ("instrument", np.object_, "Name of the instrument.", None),
        ("visit", np.uint64, "ID of the visit.", None),
        ("detector", np.uint16, "ID of the detector.", None),
        # Typo fix: was "Whether a this observation overlaps ...".
        ("overlaps_center", np.bool_, "Whether this observation overlaps the center of the cell.", None),
        ("overlap_fraction", np.float64, "Fraction of the cell that is covered by the overlap region.", None),
        ("weight", np.float64, "Weight to be used for this input in this cell.", None),
        ("psf_shape_xx", np.float64, "Second order moments of the PSF.", u.pix**2),
        ("psf_shape_yy", np.float64, "Second order moments of the PSF.", u.pix**2),
        ("psf_shape_xy", np.float64, "Second order moments of the PSF.", u.pix**2),
        (
            "psf_shape_flag",
            np.bool_,
            "Flag indicating whether the PSF shape measurement was successful.",
            None,
        ),
    ]

    @classmethod
    def make_empty_input_table(cls, n_rows: int) -> astropy.table.Table:
        """Make an empty `inputs` table with a set number of rows."""
        return astropy.table.Table(
            [
                astropy.table.Column(name=name, length=n_rows, dtype=dtype, description=description)
                for name, dtype, description in cls._INPUT_TABLE_COLUMNS
            ]
        )

    @classmethod
    def make_empty_contribution_table(cls, n_rows: int) -> astropy.table.Table:
        """Make an empty `contributions` table with a set number of rows."""
        return astropy.table.Table(
            [
                astropy.table.Column(
                    name=name, length=n_rows, dtype=dtype, description=description, unit=unit
                )
                for name, dtype, description, unit in cls._CONTRIBUTION_TABLE_COLUMNS
            ]
        )

    @property
    def inputs(self) -> astropy.table.Table:
        """A table of {visit, detector} combinations that contribute to any
        cell in the coadd.
        """
        return self._inputs

    @property
    def contributions(self) -> astropy.table.Table:
        """A table of {visit, detector, cell} combinations that describe how an
        observation contributed to a cell.
        """
        return self._contributions

    def __getitem__(self, cell: CellIJ) -> CoaddProvenance:
        return self.subset([cell])

    def subset(self, cells: Iterable[CellIJ]) -> CoaddProvenance:
        """Return a new provenance object with just the given cells."""
        cells_to_keep = astropy.table.Table(
            rows=[(index.i, index.j) for index in cells],
            names=["cell_i", "cell_j"],
            dtype=[np.uint16, np.uint16],
        )
        # Inner join on (cell_i, cell_j) keeps only contributions in the
        # requested cells.
        contributions = astropy.table.join(self._contributions, cells_to_keep)
        assert contributions.columns.keys() == {name for name, _, _, _ in self._CONTRIBUTION_TABLE_COLUMNS}
        # Bug fix: an observation that contributes to several of the requested
        # cells appears once per cell in ``contributions``, and joining those
        # keys directly against ``self._inputs`` would duplicate its row in
        # the inputs table.  Deduplicate the join keys first so each input
        # appears exactly once.
        input_keys = astropy.table.unique(contributions["instrument", "visit", "detector"])
        inputs = astropy.table.join(input_keys, self._inputs)
        assert inputs.columns.keys() == {name for name, _, _ in self._INPUT_TABLE_COLUMNS}
        return CoaddProvenance(inputs=inputs, contributions=contributions)

    def serialize(self, archive: OutputArchive[Any]) -> CoaddProvenanceSerializationModel:
        """Serialize the provenance to an output archive.

        Parameters
        ----------
        archive
            Archive to write to.
        """
        # Shallow copies: the fix-up helpers below replace/delete columns, and
        # we must not modify the tables held by this object.
        inputs = self._inputs.copy(copy_data=False)
        contributions = self._contributions.copy(copy_data=False)
        instrument = CoaddProvenanceSerializationModel._fix_str_for_serialization(
            "instrument", inputs, contributions
        )
        physical_filter = CoaddProvenanceSerializationModel._fix_str_for_serialization(
            "physical_filter", inputs
        )
        CoaddProvenanceSerializationModel._fix_polygon_for_serialization(inputs)
        inputs_model = archive.add_table(inputs, name="inputs")
        contributions_model = archive.add_table(contributions, name="contributions")
        return CoaddProvenanceSerializationModel(
            instrument=instrument,
            physical_filter=physical_filter,
            inputs=inputs_model,
            contributions=contributions_model,
        )

    @classmethod
    def deserialize(
        cls,
        model: CoaddProvenanceSerializationModel,
        archive: InputArchive[Any],
    ) -> CoaddProvenance:
        """Deserialize a provenance from an input archive.

        Parameters
        ----------
        model
            A Pydantic model representation of the image, holding references
            to data stored in the archive.
        archive
            Archive to read from.

        Notes
        -----
        While `CoaddProvenance.subset` can be used to filter provenance
        information down to just certain cells, there is no advantage to be
        had from doing this during deserialization (the table data is not
        ordered by cell, and hence there's no read-slicing we can do).
        """
        inputs = archive.get_table(model.inputs)
        contributions = archive.get_table(model.contributions)
        CoaddProvenanceSerializationModel._fix_str_for_deserialization(
            "instrument", model.instrument, inputs, contributions
        )
        CoaddProvenanceSerializationModel._fix_str_for_deserialization(
            "physical_filter", model.physical_filter, inputs
        )
        CoaddProvenanceSerializationModel._fix_polygon_for_deserialization(inputs)
        # Restore column metadata that may not survive serialization.
        for name, _, description in cls._INPUT_TABLE_COLUMNS:
            inputs.columns[name].description = description
        for name, _, description, unit in cls._CONTRIBUTION_TABLE_COLUMNS:
            contributions.columns[name].description = description
            contributions.columns[name].unit = unit
        return cls(inputs=inputs, contributions=contributions)

    @staticmethod
    def from_legacy(legacy_cell_coadd: MultipleCellCoadd) -> CoaddProvenance:
        """Extract provenance from a legacy
        `lsst.cell_coadds.MultipleCellCoadd` object.
        """
        inputs = CoaddProvenance.make_empty_input_table(len(legacy_cell_coadd.common.visit_polygons))
        for n, (legacy_identifiers, legacy_polygon) in enumerate(
            legacy_cell_coadd.common.visit_polygons.items()
        ):
            inputs["instrument"][n] = legacy_identifiers.instrument
            inputs["visit"][n] = legacy_identifiers.visit
            inputs["detector"][n] = legacy_identifiers.detector
            inputs["physical_filter"][n] = legacy_identifiers.physical_filter
            inputs["day_obs"][n] = legacy_identifiers.day_obs
            inputs["polygon"][n] = Polygon.from_legacy(legacy_polygon)
        # Size the contributions table up front so we can fill it by index.
        n_contributions = sum(len(legacy_cell.inputs) for legacy_cell in legacy_cell_coadd.cells.values())
        contributions = CoaddProvenance.make_empty_contribution_table(n_contributions)
        n = 0
        for legacy_cell in legacy_cell_coadd.cells.values():
            for legacy_identifiers, legacy_inputs in legacy_cell.inputs.items():
                # Note the axis convention: cell_i is the Y index, cell_j the X
                # index (matching the column descriptions above).
                contributions["cell_i"][n] = legacy_cell.identifiers.cell.y
                contributions["cell_j"][n] = legacy_cell.identifiers.cell.x
                contributions["instrument"][n] = legacy_identifiers.instrument
                contributions["visit"][n] = legacy_identifiers.visit
                contributions["detector"][n] = legacy_identifiers.detector
                contributions["overlaps_center"][n] = legacy_inputs.overlaps_center
                contributions["overlap_fraction"][n] = legacy_inputs.overlap_fraction
                contributions["weight"][n] = legacy_inputs.weight
                contributions["psf_shape_xx"][n] = legacy_inputs.psf_shape.getIxx()
                contributions["psf_shape_yy"][n] = legacy_inputs.psf_shape.getIyy()
                contributions["psf_shape_xy"][n] = legacy_inputs.psf_shape.getIxy()
                contributions["psf_shape_flag"][n] = legacy_inputs.psf_shape_flag
                n += 1
        return CoaddProvenance(inputs=inputs, contributions=contributions)
255class CoaddProvenanceSerializationModel(ArchiveTree):
256 """A Pydantic model used to represent a serialized `CoaddProvenance`.
258 Notes
259 -----
260 We can't rewrite the Astropy tables directly into the archive (e.g. as
261 FITS binary tables for a FITS archive), because:
263 - `str` columns are a huge pain in both Numpy and FITS;
264 - the polygon columns need to be rewritten as array-valued columns.
266 To deal with the string columns (``instrument`` and ``physical_filter``)
267 we do dictionary compression: we map each distinct value of those columns
268 to an integer, and then we save that mapping to the model while saving
269 an integer version of that column in the table. But if there is actually
270 only one value in that column (the most common case by far) we just drop
271 the column and store that value directly in the model.
272 """
274 instrument: str | dict[str, int] = pydantic.Field(
275 description=(
276 "Instrument name for all inputs to this coadd, or a mapping from "
277 "instrument name to the integer used in its place in the tables."
278 )
279 )
280 physical_filter: str | dict[str, int] = pydantic.Field(
281 description="Physical filter name for all inputs to this coadd."
282 )
283 inputs: TableModel = pydantic.Field(description="Table of all inputs to the coadd.")
284 contributions: TableModel = pydantic.Field(description="Table of per-cell contributions to the coadd.")
286 @staticmethod
287 def _fix_str_for_serialization(column: str, *tables: astropy.table.Table) -> str | dict[str, int]:
288 """Rewrite a string column as an integer column or drop it.
290 Parameters
291 ----------
292 column
293 Name of the column to rewrite.
294 *tables
295 One or more astropy tables to rewrite. The first table is assumed
296 to have all values for this column that might appear in any other
297 tables.
299 Returns
300 -------
301 `str` | `dict` [`str`, `int`]
302 If there is only one unique value for this column in the first
303 table, that value (and the column will have been dropped from
304 all givne tables). If the tables are empty, the column is
305 dropped and an empty `dict` is returned. In all other cases the
306 given column is replaced with an integer column in all given
307 tables and the mapping from strings to integers is returned.
308 """
309 result: str | dict[str, int] = {name: n for n, name in enumerate(sorted(set(tables[0][column])))}
310 match len(result):
311 case 0:
312 pass
313 case 1:
314 (result,) = result.keys() # type: ignore[union-attr]
315 case _:
316 for table in tables:
317 table.columns[column] = astropy.table.Column(
318 data=[result[k] for k in table.columns[column]],
319 name=column,
320 dtype=np.uint8,
321 description=f"Integer mapped to {column} name.",
322 )
323 return result
324 # If we didn't remap to an integer (case 0 and 1 above), delete the
325 # column.
326 for table in tables:
327 del table.columns[column]
328 return result
330 @staticmethod
331 def _fix_str_for_deserialization(
332 column: str, value: str | dict[str, int], *tables: astropy.table.Table
333 ) -> None:
334 """Rewrite an integer column back to a string one.
336 Parameters
337 ----------
338 column
339 Name of the column to rewrite.
340 value
341 Value or mapping of values returned by
342 `_fix_str_for_serialization`.
343 tables
344 Tables to rewrite this column in.
345 """
346 match value:
347 case str():
348 for table in tables:
349 table.columns[column] = astropy.table.Column([value] * len(table), dtype=object)
350 case dict():
351 mapping = {v: k for k, v in value.items()}
352 for table in tables:
353 table.columns[column] = astropy.table.Column(
354 [mapping[k] for k in table[column]], dtype=object
355 )
357 @staticmethod
358 def _fix_polygon_for_serialization(inputs: astropy.table.Table) -> None:
359 """Rewrite a polygon `object` column as a pair of array-valued columns
360 and an array-size column.
362 Parameters
363 ----------
364 inputs
365 A copy of the in-memory coadd inputs table to modify in-place into
366 its serialization form.
367 """
368 max_n_vertices = max(p.n_vertices for p in inputs["polygon"])
369 inputs["n_vertices"] = astropy.table.Column(
370 [p.n_vertices for p in inputs["polygon"]],
371 name="n_vertices",
372 dtype=np.uint8,
373 description="Number of polygon vertices.",
374 )
375 inputs["x_vertices"] = astropy.table.Column(
376 name="x_vertices",
377 dtype=np.float64,
378 length=len(inputs),
379 shape=(max_n_vertices,),
380 description="X coordinates of polygon vertices, in tract coordinates.",
381 )
382 inputs["x_vertices"][:, :] = np.nan
383 inputs["y_vertices"] = astropy.table.Column(
384 name="y_vertices",
385 dtype=np.float64,
386 length=len(inputs),
387 shape=(max_n_vertices,),
388 description="Y coordinates of polygon vertices, in tract coordinates.",
389 )
390 inputs["y_vertices"][:, :] = np.nan
391 for i, polygon in enumerate(inputs["polygon"]):
392 inputs["n_vertices"][i] = polygon.n_vertices
393 inputs["x_vertices"][i][: polygon.n_vertices] = polygon.x_vertices
394 inputs["y_vertices"][i][: polygon.n_vertices] = polygon.y_vertices
395 del inputs["polygon"]
397 @staticmethod
398 def _fix_polygon_for_deserialization(inputs: astropy.table.Table) -> None:
399 """Rewrite a a pair of array-valued columns and an array-size column
400 into a polygon `object` column.
402 Parameters
403 ----------
404 inputs
405 The serialized version of the coadd inputs table, to be modified
406 in-place into its in-memory form.
407 """
408 polygons = [
409 Polygon(x_vertices=x_vertices[:n_vertices], y_vertices=y_vertices[:n_vertices])
410 for n_vertices, x_vertices, y_vertices in zip(
411 inputs["n_vertices"], inputs["x_vertices"], inputs["y_vertices"]
412 )
413 ]
414 del inputs["n_vertices"]
415 del inputs["x_vertices"]
416 del inputs["y_vertices"]
417 inputs["polygon"] = astropy.table.Column(polygons, name="polygon", dtype=np.object_)