Coverage for python / lsst / drp / tasks / compute_object_epochs.py: 32%

41 statements  

« prev     ^ index     » next       coverage.py v7.13.5, created at 2026-04-15 00:19 +0000

1# This file is part of drp_tasks. 

2# 

3# Developed for the LSST Data Management System. 

4# This product includes software developed by the LSST Project 

5# (https://www.lsst.org). 

6# See the COPYRIGHT file at the top-level directory of this distribution 

7# for details of code ownership. 

8# 

9# This program is free software: you can redistribute it and/or modify 

10# it under the terms of the GNU General Public License as published by 

11# the Free Software Foundation, either version 3 of the License, or 

12# (at your option) any later version. 

13# 

14# This program is distributed in the hope that it will be useful, 

15# but WITHOUT ANY WARRANTY; without even the implied warranty of 

16# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the 

17# GNU General Public License for more details. 

18# 

19# You should have received a copy of the GNU General Public License 

20# along with this program. If not, see <https://www.gnu.org/licenses/>. 

21 

22import numpy as np 

23from astropy.table import Table 

24 

25import lsst.pex.config as pexConfig 

26import lsst.pipe.base as pipeBase 

27 

28 

class ComputeObjectEpochsConnections(
    pipeBase.PipelineTaskConnections,
    dimensions=("tract", "skymap", "patch"),
):
    """Connections for the object-epoch computation.

    Declares two inputs (a per-patch object catalog and a set of per-band
    mean-epoch maps) and one per-patch output table of object epochs.
    """

    # Per-patch object catalog. Declared with deferLoad so the consumer
    # receives a handle rather than the loaded table.
    objectCat = pipeBase.connectionTypes.Input(
        name="deepCoadd_obj",
        storageClass="ArrowAstropy",
        dimensions=["skymap", "tract", "patch"],
        deferLoad=True,
        doc="Multiband catalog of positions in each patch.",
    )

    # Mean-epoch maps. ``multiple=True`` together with the "band" dimension
    # means several of these (one per band) are delivered; they are also
    # deferred so that each map can be loaded only when needed.
    epochMap = pipeBase.connectionTypes.Input(
        name="deepCoadd_epoch_map_mean",
        storageClass="HealSparseMap",
        dimensions=("skymap", "tract", "band"),
        multiple=True,
        deferLoad=True,
        doc="Healsparse map of mean epoch of objectCat in each band.",
    )

    # Per-patch table of epochs for the objects in ``objectCat``.
    objectEpochs = pipeBase.connectionTypes.Output(
        name="object_epoch",
        storageClass="ArrowAstropy",
        dimensions=["skymap", "tract", "patch"],
        doc="Catalog of epochs for objectCat objects.",
    )

56 

57 

class ComputeObjectEpochsConfig(
    pipeBase.PipelineTaskConfig,
    pipelineConnections=ComputeObjectEpochsConnections,
):
    """Configuration for the object-epoch computation."""

    # Each listed band yields one ``{band}_epoch`` column in the output.
    bands = pexConfig.ListField(
        dtype=str,
        default=["u", "g", "r", "i", "z", "y"],
        doc="Bands to create mean epoch columns for",
    )

67 

68 

class ComputeObjectEpochsTask(pipeBase.PipelineTask):
    """Look up the mean observation epoch, per band, at each object position.

    TODO: DM-46202, Remove this task once the object epochs are available
    elsewhere.
    """

    ConfigClass = ComputeObjectEpochsConfig
    _DefaultName = "computeObjectEpochs"

    def computeEpochs(self, cat, epochMapDict):
        """Build a table of per-band mean epochs sampled at object positions.

        Parameters
        ----------
        cat : `astropy.table.Table`
            Object catalog. An ``id`` column is required. Positions for a
            band are read from the columns named
            ``str(("meas", band, "coord_ra"))`` and
            ``str(("meas", band, "coord_dec"))`` when both are present.
        epochMapDict : `dict` [`str`, `DeferredDatasetHandle`]
            Handles, keyed by band, for the healsparse maps holding the
            mean epoch. A map is only loaded for a band that has at least
            one finite position in ``cat``.

        Returns
        -------
        epochTable : `astropy.table.Table`
            Table with an ``objectId`` column followed by one
            ``{band}_epoch`` column for every band in ``config.bands``.
            An entry is NaN when the band's position columns are absent,
            the position is not finite, or the map reports no valid value
            at that position.
        """
        # Positions are scaled by this factor (radians -> degrees) before
        # being handed to the map lookup.
        toDegrees = 180.0 / np.pi

        # The primary key should probably stay id and be standardized in the
        # object table later, but this key was used originally and it would
        # be too disruptive to change now.
        allEpochs = {"objectId": cat["id"]}

        for band in self.config.bands:
            # Every band gets a column; it stays all-NaN unless filled below.
            bandColumn = np.full(len(cat), np.nan)
            allEpochs[f"{band}_epoch"] = bandColumn

            raName, decName = [str(("meas", band, f"coord_{axis}")) for axis in ("ra", "dec")]
            if raName not in cat.columns or decName not in cat.columns:
                continue

            raAll = cat[raName]
            decAll = cat[decName]
            finite = np.isfinite(raAll) & np.isfinite(decAll)
            if not finite.any():
                continue

            raDeg = raAll[finite] * toDegrees
            decDeg = decAll[finite] * toDegrees

            epochMap = epochMapDict[band].get()
            sampled = epochMap.get_values_pos(raDeg, decDeg)
            hasValue = epochMap.get_values_pos(raDeg, decDeg, valid_mask=True)
            # Drop the reference to the loaded map before moving on to the
            # next band.
            del epochMap

            # Blank out positions the map flags as not valid.
            sampled[~hasValue] = np.nan
            bandColumn[finite] = sampled

        return Table(allEpochs)

    def runQuantum(self, butlerQC, inputRefs, outputRefs):
        """Load the inputs, compute the epoch table and store it.

        Only the per-band coordinate columns that actually exist in the
        object catalog are requested from its handle.

        Parameters
        ----------
        butlerQC
            Quantum-context object used to ``get`` the inputs and ``put``
            the output.
        inputRefs
            References to the input datasets (``objectCat``, ``epochMap``).
        outputRefs
            References to the output datasets; ``objectEpochs`` is written.
        """
        inputs = butlerQC.get(inputRefs)

        # Index the epoch-map handles by the band in their data ID.
        epochMapsByBand = {handle.dataId["band"]: handle for handle in inputs["epochMap"]}

        catHandle = inputs["objectCat"]
        available = catHandle.get(component="columns")

        # Columns are requested as tuples, but matched against the available
        # columns via their string form.
        wanted = []
        for band in self.config.bands:
            for coord in ["ra", "dec"]:
                key = ("meas", band, f"coord_{coord}")
                if str(key) in available:
                    wanted.append(key)

        objectCat = catHandle.get(parameters={"columns": wanted})
        epochTable = self.computeEpochs(objectCat, epochMapsByBand)
        butlerQC.put(epochTable, outputRefs.objectEpochs)