Coverage for tests / test_estimate_photoz_pipeline.py: 39%

44 statements  

« prev     ^ index     » next       coverage.py v7.13.5, created at 2026-04-24 08:19 +0000

1# This file is part of meas_photoz_base 

2# 

3# Developed for the LSST Data Management System. 

4# This product includes software developed by the LSST Project 

5# (http://www.lsst.org). 

6# See the COPYRIGHT file at the top-level directory of this distribution 

7# for details of code ownership. 

8# 

9# This program is free software: you can redistribute it and/or modify 

10# it under the terms of the GNU General Public License as published by 

11# the Free Software Foundation, either version 3 of the License, or 

12# (at your option) any later version. 

13# 

14# This program is distributed in the hope that it will be useful, 

15# but WITHOUT ANY WARRANTY; without even the implied warranty of 

16# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the 

17# GNU General Public License for more details. 

18# 

19# You should have received a copy of the GNU General Public License 

20# along with this program. If not, see <http://www.gnu.org/licenses/>. 

21 

22import os 

23import tempfile 

24import unittest 

25from typing import Any 

26 

27import lsst.meas.photoz.base.all_algos as all_algos 

28from lsst.daf.butler import Butler, Config 

29from lsst.daf.butler.tests import DatastoreMock 

30from lsst.daf.butler.tests.utils import makeTestTempDir, removeTestTempDir 

31from lsst.meas.photoz.base.estimate_photoz_task import EstimatePhotozConnections, photozAlgoRegistry 

32from lsst.pipe.base.tests.pipelineStepTester import PipelineStepTester 

33 

# Absolute path of the directory that holds this test module.
TEST_DIR = os.path.abspath(os.path.dirname(__file__))
# Test fixtures live next to the tests; per-test temporary repos are
# created underneath this directory.
TEST_DATA_DIR = os.path.join(TEST_DIR, "data")
# Pipeline definitions live in the sibling "pipelines" directory, one level
# above the tests.
PIPELINES_DIR = os.path.join(os.path.dirname(__file__), "..", "pipelines")

37 

38 

class MeasPzPipelineTestCase(unittest.TestCase):
    """Test the PZ pipeline plumbing for fully supported algorithms.

    This uses the `PipelineStepTester` to test the ``#photoz_all`` step of
    the pipeline defined in ``photoz.yaml`` under ``PIPELINES_DIR``.

    This should include any algorithms that are
    included in the rubin-env environment.

    For now that is knn and trainz.
    """

    def setUp(self) -> None:
        """Create a scratch directory for this test under ``TEST_DATA_DIR``."""
        self.root = makeTestTempDir(TEST_DATA_DIR)
        # Show complete diffs when a set/list comparison fails.
        self.maxDiff = None

    def tearDown(self) -> None:
        """Remove the scratch directory created in `setUp`."""
        removeTestTempDir(self.root)

    def makeButler(self, **kwargs: Any) -> Butler:
        """Return new Butler instance on each call.

        Parameters
        ----------
        **kwargs : `typing.Any`
            Forwarded unchanged to `Butler.from_config`.

        Returns
        -------
        butler : `lsst.daf.butler.Butler`
            Butler for a repository rooted at ``self.root``, with
            `DatastoreMock` applied to it.
        """
        repo_config = Config()

        # make separate temporary directory for registry of this instance
        registry_dir = tempfile.mkdtemp(dir=self.root)
        repo_config["registry", "db"] = f"sqlite:///{registry_dir}/gen3.sqlite3"
        repo_config = Butler.makeRepo(self.root, repo_config)
        butler = Butler.from_config(repo_config, **kwargs)
        DatastoreMock.apply(butler)
        return butler

    def test_photoz_pipeline(self) -> None:
        """Check the registry contents and the pipeline's dataset types.

        First verifies that `photozAlgoRegistry` holds exactly the
        algorithms exported by ``all_algos``, then runs the
        `PipelineStepTester` with one model input and one ensemble output
        per registered algorithm.
        """
        butler = self.makeButler(writeable=True)

        # Everything exported by all_algos, apart from the registry itself,
        # must be present in the registry.
        exported = [attr for attr in all_algos.__all__ if attr != "photozAlgoRegistry"]
        algo_names = list(photozAlgoRegistry.keys())
        # Use unittest assertions rather than bare ``assert``: they are not
        # stripped under ``python -O`` and report both sides on failure.
        self.assertEqual(len(algo_names), len(exported))
        self.assertEqual(
            set(photozAlgoRegistry.values()),
            {getattr(all_algos, attr) for attr in exported},
        )

        expected_inputs: set[str] = {"object"}
        expected_outputs: set[str] = set()
        # NOTE(review): each tuple looks like (dataset type name, dimensions,
        # storage class, is-calibration flag) -- confirm against the
        # PipelineStepTester documentation.
        inputs: list[tuple[str, set[str], str, bool]] = [
            ("object", {"skymap", "tract"}, "ArrowAstropy", False),
        ]

        for algo in algo_names:
            model_dataset = EstimatePhotozConnections.photoz_model.name.format(algo=algo)
            ensemble_dataset = EstimatePhotozConnections.photoz_ensemble.name.format(algo=algo)
            expected_inputs.add(model_dataset)
            expected_outputs.add(ensemble_dataset)
            inputs.append((model_dataset, {"instrument"}, "PhotozModel", True))

        tester = PipelineStepTester(
            os.path.join(PIPELINES_DIR, "photoz.yaml"),
            ["#photoz_all"],
            inputs,
            expected_inputs=expected_inputs,
            expected_outputs=expected_outputs,
        )
        tester.run(butler, self)