Coverage for python / lsst / images / fields / _sum.py: 35%

77 statements  

« prev     ^ index     » next       coverage.py v7.13.5, created at 2026-04-24 08:34 +0000

1# This file is part of lsst-images. 

2# 

3# Developed for the LSST Data Management System. 

4# This product includes software developed by the LSST Project 

5# (https://www.lsst.org). 

6# See the COPYRIGHT file at the top-level directory of this distribution 

7# for details of code ownership. 

8# 

9# Use of this source code is governed by a 3-clause BSD-style 

10# license that can be found in the LICENSE file. 

11 

12from __future__ import annotations 

13 

14__all__ = ("SumField", "SumFieldSerializationModel") 

15 

16from collections.abc import Iterable 

17from typing import TYPE_CHECKING, Any, Literal, final 

18 

19import astropy.units 

20import numpy as np 

21import pydantic 

22 

23from .._geom import Bounds, Box 

24from .._image import Image 

25from ..serialization import ArchiveTree, InputArchive, OutputArchive 

26from ._base import BaseField 

27 

28if TYPE_CHECKING: 

29 try: 

30 from lsst.afw.math import BackgroundList as LegacyBackgroundList 

31 except ImportError: 

32 type LegacyBackgroundList = Any # type: ignore[no-redef] 

33 

34 from ._concrete import Field, FieldSerializationModel 

35 

36 

@final
class SumField(BaseField):
    """A field that sums other fields lazily.

    The operands are stored as given and are only evaluated when the sum
    itself is evaluated or rendered.  The bounds of the sum are the
    intersection of the bounds of all operands, and the unit of the sum is
    the unit of the first operand.

    Parameters
    ----------
    operands : `~collections.abc.Iterable` [ `BaseField` ]
        The fields to sum together.

    Raises
    ------
    ValueError
        Raised if ``operands`` is empty.
    astropy.units.UnitConversionError
        Raised if some operands have units while others do not.  Errors
        raised by `astropy.units.UnitBase.to` when checking that the first
        operand's unit can be converted to each other operand's unit are
        propagated as well.
    """

    def __init__(self, operands: Iterable[Field]):
        self._operands = tuple(operands)
        if not self._operands:
            raise ValueError("At least one operand must be provided.")
        iterator = iter(self._operands)
        first = next(iterator)
        # The first operand seeds the bounds and fixes the unit of the sum.
        self._bounds = first.bounds
        self._unit = first.unit
        for operand in iterator:
            self._bounds = self._bounds.intersection(operand.bounds)
            if operand.unit is None:
                if self._unit is not None:
                    raise astropy.units.UnitConversionError(
                        "Cannot add a field with no units to a field with units."
                    )
            elif self._unit is None:
                raise astropy.units.UnitConversionError(
                    "Cannot add a field with units to a field with no units."
                )
            else:
                # Raise if these units are not sum-compatible.
                self._unit.to(operand.unit)

    @property
    def bounds(self) -> Bounds:
        """The intersection of the bounds of all operands."""
        return self._bounds

    @property
    def unit(self) -> astropy.units.UnitBase | None:
        """The unit of the first operand, or `None` if it has no unit."""
        return self._unit

    @property
    def operands(self) -> tuple[Field, ...]:
        """The fields that are summed together (`tuple` [`BaseField`, ...])."""
        return self._operands

    def evaluate(
        self, *, x: np.ndarray, y: np.ndarray, quantity: bool = False
    ) -> np.ndarray | astropy.units.Quantity:
        """Evaluate every operand at the given points and add the results.

        Parameters
        ----------
        x : `numpy.ndarray`
            X coordinates, forwarded unchanged to each operand.
        y : `numpy.ndarray`
            Y coordinates, forwarded unchanged to each operand.
        quantity : `bool`, optional
            If `True`, return an `astropy.units.Quantity`; otherwise return
            plain values (expressed in ``self.unit`` when the field has a
            unit).

        Returns
        -------
        result : `numpy.ndarray` or `astropy.units.Quantity`
            The sum of all operand evaluations.
        """
        # We have to add quantities if this is a unit-aware field, as the
        # terms in the sum might have different-but-compatible units.
        has_unit = self.unit is not None
        iterator = iter(self._operands)
        first = next(iterator)
        result = first(x=x, y=y, quantity=has_unit)
        for operand in iterator:
            # Accumulate out of place.  The array returned by the first
            # operand is not ours to modify: nothing visible here guarantees
            # it is a fresh, writeable buffer, and an in-place add would also
            # fail when a later term needs a wider dtype or a larger
            # broadcast shape than the first one.
            result = result + operand(x=x, y=y, quantity=has_unit)
        if has_unit and not quantity:
            # Caller doesn't want a Quantity back.
            assert isinstance(result, astropy.units.Quantity)
            return result.to_value(self.unit)
        if not has_unit and quantity:
            # Caller wants a Quantity back even though there's no units.
            return astropy.units.Quantity(result)
        return result

    def render(self, bbox: Box | None = None, *, dtype: np.typing.DTypeLike | None = None) -> Image:
        """Render the sum into an image by adding the rendered operands.

        Parameters
        ----------
        bbox : `Box`, optional
            Bounding box of the image to create.  Defaults to
            ``self.bounds.bbox``.
        dtype : `numpy.typing.DTypeLike`, optional
            Pixel type passed to the output image and to each operand's
            ``render`` call.

        Returns
        -------
        image : `Image`
            A new image, created filled with zeros and carrying
            ``self.unit``, to which every operand's rendering was added.
        """
        if bbox is None:
            bbox = self.bounds.bbox
        # Start from an all-zero image and accumulate each operand into it.
        result = Image(0.0, bbox=bbox, dtype=dtype, unit=self.unit)
        for operand in self._operands:
            result.quantity += operand.render(bbox, dtype=dtype).quantity
        return result

    def multiply_constant(self, factor: float | astropy.units.Quantity | astropy.units.UnitBase) -> SumField:
        """Return a new sum in which every operand is multiplied by ``factor``."""
        # Multiplication distributes over the sum, so scale each term.
        return SumField([operand * factor for operand in self._operands])

    def serialize(self, archive: OutputArchive[Any]) -> SumFieldSerializationModel:
        """Serialize the field to an output archive."""
        return SumFieldSerializationModel(operands=[operand.serialize(archive) for operand in self._operands])

    @staticmethod
    def deserialize(model: SumFieldSerializationModel, archive: InputArchive[Any]) -> SumField:
        """Deserialize the field from an input archive."""
        # Imported here rather than at module scope; ``_concrete`` is only
        # imported under TYPE_CHECKING at the top of this module.
        from ._concrete import deserialize_field

        return SumField([deserialize_field(operand, archive) for operand in model.operands])

    @staticmethod
    def _get_archive_tree_type(
        pointer_type: type[Any],
    ) -> type[SumFieldSerializationModel]:
        """Return the serialization model type for this object for an archive
        type that uses the given pointer type.
        """
        # The model type does not depend on the pointer type.
        return SumFieldSerializationModel

    @staticmethod
    def from_legacy_background(
        legacy_background: LegacyBackgroundList,
        unit: astropy.units.UnitBase | None = None,
    ) -> SumField:
        """Convert from a legacy `lsst.afw.math.BackgroundList` instance.

        Parameters
        ----------
        legacy_background : `lsst.afw.math.BackgroundList`
            The legacy background list.  Each entry is unpacked and only its
            first element is converted into a field.
        unit : `astropy.units.UnitBase`, optional
            Unit forwarded to the conversion of every entry.

        Returns
        -------
        field : `SumField`
            A sum with one operand per entry of ``legacy_background``.
        """
        from ._concrete import field_from_legacy_background

        return SumField([field_from_legacy_background(b, unit) for b, *_ in legacy_background])

142 

143 

class SumFieldSerializationModel(ArchiveTree):
    """Serialization model for `SumField`."""

    # Serialized forms of the summed fields, in the same order as the
    # operands of the `SumField` they were written from.
    operands: list[FieldSerializationModel] = pydantic.Field(default_factory=list)

    # Constant tag marking this model as a sum; presumably used to tell
    # field model types apart on read -- confirm against ``_concrete``.
    field_type: Literal["SUM"] = "SUM"

    def finish_deserialize(self, archive: InputArchive) -> SumField:
        """Rebuild the `SumField` described by this model.

        Parameters
        ----------
        archive : `InputArchive`
            Archive passed on to `SumField.deserialize`.

        Returns
        -------
        field : `SumField`
            The deserialized field.
        """
        restored = SumField.deserialize(self, archive)
        return restored