Coverage for python / lsst / pipe / tasks / split_primary.py: 48%

40 statements  

« prev     ^ index     » next       coverage.py v7.13.5, created at 2026-04-21 10:40 +0000

1# This file is part of pipe_tasks. 

2# 

3# Developed for the LSST Data Management System. 

4# This product includes software developed by the LSST Project 

5# (https://www.lsst.org). 

6# See the COPYRIGHT file at the top-level directory of this distribution 

7# for details of code ownership. 

8# 

9# This program is free software: you can redistribute it and/or modify 

10# it under the terms of the GNU General Public License as published by 

11# the Free Software Foundation, either version 3 of the License, or 

12# (at your option) any later version. 

13# 

14# This program is distributed in the hope that it will be useful, 

15# but WITHOUT ANY WARRANTY; without even the implied warranty of 

16# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the 

17# GNU General Public License for more details. 

18# 

19# You should have received a copy of the GNU General Public License 

20# along with this program. If not, see <https://www.gnu.org/licenses/>. 

21 

22from __future__ import annotations 

23 

24__all__ = ("SplitPrimaryTask",) 

25 

26import dataclasses 

27from typing import ClassVar 

28 

29import numpy as np 

30import astropy.table 

31 

32from lsst.pex.config import Field, ListField 

33from lsst.pipe.base import ( 

34 PipelineTask, 

35 PipelineTaskConnections, 

36 PipelineTaskConfig, 

37 Struct, 

38) 

39import lsst.pipe.base.connectionTypes as cT 

40 

41 

class SplitPrimaryConnections(PipelineTaskConnections, dimensions=[]):
    """Connections for SplitPrimaryTask.

    Notes
    -----
    Task and connection dimensions are set by the
    `SplitPrimaryConfig.dimensions` field.

    As this task is expected to be configured to run in several different
    pipeline contexts, all connection dataset type names should be explicitly
    configured in each; the default values are just placeholders.
    """

    full = cT.Input(
        "full",
        storageClass="ArrowAstropy",
        dimensions=[],
        doc=(
            "Input table with both primary and non-primary objects/sources "
            "and a column that distinguishes between them."
        ),
    )

    primary = cT.Output(
        "primary",
        storageClass="ArrowAstropy",
        dimensions=[],
        doc="Output table holding only primary objects/sources.",
    )

    nonprimary = cT.Output(
        "nonprimary",
        storageClass="ArrowAstropy",
        dimensions=[],
        doc="Output table holding only nonprimary objects/sources.",
    )

    def __init__(self, *, config: SplitPrimaryConfig | None = None):
        """Apply the configured dimensions to the task and its connections.

        Parameters
        ----------
        config : `SplitPrimaryConfig`, optional
            Task configuration; its ``dimensions`` field supplies the
            dimensions for the task and all three connections.  The code
            below dereferences it unconditionally, so in practice it must not
            be `None` (NOTE(review): presumably the base class guarantees a
            config is supplied — confirm).
        """
        # The connections are declared above with empty placeholder
        # dimensions because the real dimensions are only known from the
        # config.  Each target gets its own freshly built set so that no two
        # of them share one mutable container.
        self.dimensions = set(config.dimensions)
        self.full = dataclasses.replace(self.full, dimensions=set(config.dimensions))
        self.primary = dataclasses.replace(self.primary, dimensions=set(config.dimensions))
        self.nonprimary = dataclasses.replace(self.nonprimary, dimensions=set(config.dimensions))

84 

85 

class SplitPrimaryConfig(PipelineTaskConfig, pipelineConnections=SplitPrimaryConnections):
    """Configuration for SplitPrimaryTask.

    Controls the task/connection dimensions, which boolean column splits the
    input rows, and which extra columns are removed from each output table.
    """

    # Shared by the task and all of its connections; the connections class
    # copies this list into each connection's dimensions.
    dimensions = ListField[str](
        doc="Dimensions of this task and its inputs and outputs.",
        default=[],
        dtype=str,
    )

    # Rows with True in this column go to the "primary" output, rows with
    # False go to the "nonprimary" output; the column itself is removed from
    # both outputs.
    primary_flag_column = Field[str](
        doc=(
            "Name of the column that distinguishes between primary (True) "
            "and non-primary (False) in the input catalog."
        ),
        default="detect_isPrimary",
        dtype=str,
    )

    # Extra columns removed only from the primary-row output; names missing
    # from the input are skipped.
    discard_primary_columns = ListField[str](
        doc=(
            "Additional columns to discard from the primary-only table "
            "(in addition to primary_flag_column). "
            "Configured columns that are not present in the input table are ignored."
        ),
        default=[
            "detect_isPatchInner",
            "detect_isTractInner",
            "detect_isDeblendedSource",
            "sky_object",
            "merge_peak_sky",
        ],
        dtype=str,
    )

    # Extra columns removed only from the nonprimary-row output; names
    # missing from the input are skipped.
    discard_nonprimary_columns = ListField[str](
        doc=(
            "Additional columns to drop from the nonprimary-only table "
            "(in addition to primary_flag_column). "
            "Configured columns that are not present in the input table are ignored."
        ),
        default=[],
        dtype=str,
    )

118 

119 

class SplitPrimaryTask(PipelineTask):
    """A task that splits its input table into "primary" and "nonprimary"
    row-subset tables based on the value of a boolean column, dropping that
    column and optionally others from the two outputs.
    """

    ConfigClass: ClassVar[type[PipelineTaskConfig]] = SplitPrimaryConfig

    _DefaultName: ClassVar[str] = "splitPrimary"

    def run(self, *, full: astropy.table.Table) -> Struct:  # type: ignore
        """Run the task.

        Parameters
        ----------
        full : `astropy.table.Table`
            Table to split into row subsets.

        Returns
        -------
        result : `lsst.pipe.base.Struct`
            Structure with two attributes:

            - ``primary`` (`astropy.table.Table`) table with rows where the
              `SplitPrimaryConfig.primary_flag_column` is `True`.

            - ``nonprimary`` (`astropy.table.Table`) table with rows where the
              `SplitPrimaryConfig.primary_flag_column` is `False`.

        Raises
        ------
        TypeError
            Raised if `SplitPrimaryConfig.primary_flag_column` does not have a
            boolean dtype.
        """
        flag_column = self.config.primary_flag_column
        primary_mask = full[flag_column]
        # Indexing a table with a non-boolean array (e.g. 0/1 integers) is
        # interpreted as a list of row indices, not a mask, which would
        # silently produce a wrong "primary" table.  Refuse such columns.
        if primary_mask.dtype.kind != "b":
            raise TypeError(
                f"Column {flag_column!r} must be boolean to split primary rows; "
                f"got dtype {primary_mask.dtype}."
            )
        primary = self._subset(
            full, primary_mask, flag_column, self.config.discard_primary_columns
        )
        nonprimary = self._subset(
            full, np.logical_not(primary_mask), flag_column, self.config.discard_nonprimary_columns
        )
        self.log.info(
            "Split %s rows into %s primary rows and %s nonprimary rows.",
            len(full),
            len(primary),
            len(nonprimary),
        )
        return Struct(primary=primary, nonprimary=nonprimary)

    @staticmethod
    def _subset(full, mask, flag_column, discard):
        """Return the rows of ``full`` selected by the boolean ``mask``.

        The flag column is always removed from the result, followed by every
        name in ``discard`` that is present in the result; absent names are
        skipped.  Boolean-mask indexing returns a new table, so removing
        columns here leaves ``full`` untouched.
        """
        subset = full[mask]
        del subset[flag_column]
        for name in discard:
            if name in subset.colnames:
                del subset[name]
        return subset