Coverage for python / lsst / drp / tasks / compute_object_epochs.py: 32%
41 statements
« prev ^ index » next coverage.py v7.13.5, created at 2026-04-21 10:50 +0000
« prev ^ index » next coverage.py v7.13.5, created at 2026-04-21 10:50 +0000
1# This file is part of pipe_tasks.
2#
3# Developed for the LSST Data Management System.
4# This product includes software developed by the LSST Project
5# (https://www.lsst.org).
6# See the COPYRIGHT file at the top-level directory of this distribution
7# for details of code ownership.
8#
9# This program is free software: you can redistribute it and/or modify
10# it under the terms of the GNU General Public License as published by
11# the Free Software Foundation, either version 3 of the License, or
12# (at your option) any later version.
13#
14# This program is distributed in the hope that it will be useful,
15# but WITHOUT ANY WARRANTY; without even the implied warranty of
16# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
17# GNU General Public License for more details.
18#
19# You should have received a copy of the GNU General Public License
20# along with this program. If not, see <https://www.gnu.org/licenses/>.
22import numpy as np
23from astropy.table import Table
25import lsst.pex.config as pexConfig
26import lsst.pipe.base as pipeBase
29class ComputeObjectEpochsConnections(
30 pipeBase.PipelineTaskConnections,
31 dimensions=("tract", "skymap", "patch"),
32):
33 objectCat = pipeBase.connectionTypes.Input(
34 doc="Multiband catalog of positions in each patch.",
35 name="deepCoadd_obj",
36 storageClass="ArrowAstropy",
37 dimensions=["skymap", "tract", "patch"],
38 deferLoad=True,
39 )
41 epochMap = pipeBase.connectionTypes.Input(
42 doc="Healsparse map of mean epoch of objectCat in each band.",
43 name="deepCoadd_epoch_map_mean",
44 storageClass="HealSparseMap",
45 dimensions=("skymap", "tract", "band"),
46 deferLoad=True,
47 multiple=True,
48 )
50 objectEpochs = pipeBase.connectionTypes.Output(
51 doc="Catalog of epochs for objectCat objects.",
52 name="object_epoch",
53 storageClass="ArrowAstropy",
54 dimensions=["skymap", "tract", "patch"],
55 )
58class ComputeObjectEpochsConfig(
59 pipeBase.PipelineTaskConfig,
60 pipelineConnections=ComputeObjectEpochsConnections,
61):
62 bands = pexConfig.ListField(
63 doc="Bands to create mean epoch columns for",
64 dtype=str,
65 default=["u", "g", "r", "i", "z", "y"],
66 )
69class ComputeObjectEpochsTask(pipeBase.PipelineTask):
70 """Collect mean epochs for the observations that went into each object.
72 TODO: DM-46202, Remove this task once the object epochs are available
73 elsewhere.
74 """
76 ConfigClass = ComputeObjectEpochsConfig
77 _DefaultName = "computeObjectEpochs"
79 def computeEpochs(self, cat, epochMapDict):
80 """Compute the mean epoch of the visits at each object centroid.
82 Parameters
83 ----------
84 cat : `astropy.table.Table`
85 Catalog containing object positions.
86 epochMapDict: `dict` [`str`, `DeferredDatasetHandle`]
87 Dictionary of handles per band for healsparse maps containing
88 the mean epoch for positions in the reference catalog.
90 Returns
91 -------
92 epochTable = `astropy.table.Table`
93 Catalog with mean epoch of visits at each object position.
94 """
95 # The primary key should probably stay id and be standardized in the
96 # object table later, but this key was used originally and it would
97 # be too disruptive to change now.
98 allEpochs = {"objectId": cat["id"]}
99 for band in self.config.bands:
100 epochs = np.ones(len(cat)) * np.nan
101 col_ra, col_dec = (str(("meas", band, f"coord_{coord}")) for coord in ("ra", "dec"))
102 if col_ra in cat.columns and col_dec in cat.columns:
103 ra, dec = cat[col_ra], cat[col_dec]
104 validPositions = np.isfinite(ra) & np.isfinite(dec)
105 if validPositions.any():
106 ra, dec = (x[validPositions] * (180.0 / np.pi) for x in (ra, dec))
107 epochMap = epochMapDict[band].get()
108 bandEpochs = epochMap.get_values_pos(ra, dec)
109 epochsValid = epochMap.get_values_pos(ra, dec, valid_mask=True)
110 bandEpochs[~epochsValid] = np.nan
111 epochs[validPositions] = bandEpochs
112 del epochMap
113 allEpochs[f"{band}_epoch"] = epochs
115 epochTable = Table(allEpochs)
116 return epochTable
118 def runQuantum(self, butlerQC, inputRefs, outputRefs):
119 inputs = butlerQC.get(inputRefs)
121 inputs["epochMap"] = {ref.dataId["band"]: ref for ref in inputs["epochMap"]}
123 objectCatRef = inputs["objectCat"]
124 columns_avail = objectCatRef.get(component="columns")
125 columns = [
126 column
127 for band in self.config.bands
128 for coord in ["ra", "dec"]
129 if str(column := ("meas", band, f"coord_{coord}")) in columns_avail
130 ]
131 objectCat = objectCatRef.get(parameters={"columns": columns})
132 epochs = self.computeEpochs(objectCat, inputs["epochMap"])
133 butlerQC.put(epochs, outputRefs.objectEpochs)