Coverage for tests / test_estimate_photoz_pipeline.py: 39%
44 statements
« prev ^ index » next coverage.py v7.13.5, created at 2026-04-14 23:49 +0000
« prev ^ index » next coverage.py v7.13.5, created at 2026-04-14 23:49 +0000
1# This file is part of meas_photoz_base
2#
3# Developed for the LSST Data Management System.
4# This product includes software developed by the LSST Project
5# (http://www.lsst.org).
6# See the COPYRIGHT file at the top-level directory of this distribution
7# for details of code ownership.
8#
9# This program is free software: you can redistribute it and/or modify
10# it under the terms of the GNU General Public License as published by
11# the Free Software Foundation, either version 3 of the License, or
12# (at your option) any later version.
13#
14# This program is distributed in the hope that it will be useful,
15# but WITHOUT ANY WARRANTY; without even the implied warranty of
16# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
17# GNU General Public License for more details.
18#
19# You should have received a copy of the GNU General Public License
20# along with this program. If not, see <http://www.gnu.org/licenses/>.
22import os
23import tempfile
24import unittest
25from typing import Any
27import lsst.meas.photoz.base.all_algos as all_algos
28from lsst.daf.butler import Butler, Config
29from lsst.daf.butler.tests import DatastoreMock
30from lsst.daf.butler.tests.utils import makeTestTempDir, removeTestTempDir
31from lsst.meas.photoz.base.estimate_photoz_task import EstimatePhotozConnections, photozAlgoRegistry
32from lsst.pipe.base.tests.pipelineStepTester import PipelineStepTester
34PIPELINES_DIR = os.path.join(os.path.dirname(__file__), "..", "pipelines")
35TEST_DIR = os.path.abspath(os.path.dirname(__file__))
36TEST_DATA_DIR = os.path.join(TEST_DIR, "data")
39class MeasPzPipelineTestCase(unittest.TestCase):
40 """Test the PZ pipeline plumbing for fully supported algorithms.
42 This uses the `PipelineStepTester` to test
43 a test pipeline define in tests/data/photoz_pipeline_hsc.yaml
45 This should include any algorithms that are
46 including in the rubin-env environment.
48 For now that is knn and trainz.
49 """
51 def setUp(self) -> None:
52 self.root = makeTestTempDir(TEST_DATA_DIR)
53 self.maxDiff = None
55 def tearDown(self) -> None:
56 removeTestTempDir(self.root)
58 def makeButler(self, **kwargs: Any) -> Butler:
59 """Return new Butler instance on each call."""
60 config = Config()
62 # make separate temporary directory for registry of this instance
63 tmpdir = tempfile.mkdtemp(dir=self.root)
64 config["registry", "db"] = f"sqlite:///{tmpdir}/gen3.sqlite3"
65 config = Butler.makeRepo(self.root, config)
66 butler = Butler.from_config(config, **kwargs)
67 DatastoreMock.apply(butler)
68 return butler
70 def test_photoz_pipeline(self) -> None:
71 butler = self.makeButler(writeable=True)
73 expected_inputs = ["object"]
74 expected_outputs = []
75 inputs = [("object", {"skymap", "tract"}, "ArrowAstropy", False)]
76 names = list(photozAlgoRegistry.keys())
77 tasks = list(photozAlgoRegistry.values())
78 all_tasks = [x for x in all_algos.__all__ if x != "photozAlgoRegistry"]
79 assert len(names) == len(all_tasks)
80 assert set(tasks) == set([getattr(all_algos, attr) for attr in all_tasks])
82 for algo in names:
83 dataset = EstimatePhotozConnections.photoz_model.name.format(algo=algo)
84 expected_inputs.append(dataset)
85 expected_outputs.append(EstimatePhotozConnections.photoz_ensemble.name.format(algo=algo))
86 inputs.append((dataset, {"instrument"}, "PhotozModel", True))
88 tester = PipelineStepTester(
89 os.path.join(PIPELINES_DIR, "photoz.yaml"),
90 ["#photoz_all"],
91 inputs,
92 expected_inputs=set(expected_inputs),
93 expected_outputs=set(expected_outputs),
94 )
95 tester.run(butler, self)