Coverage for tests / test_getRegionTimeFromVisit.py: 30%

61 statements  

« prev     ^ index     » next       coverage.py v7.13.5, created at 2026-04-18 09:04 +0000

1# This file is part of pipe_tasks. 

2# 

3# Developed for the LSST Data Management System. 

4# This product includes software developed by the LSST Project 

5# (https://www.lsst.org). 

6# See the COPYRIGHT file at the top-level directory of this distribution 

7# for details of code ownership. 

8# 

9# This program is free software: you can redistribute it and/or modify 

10# it under the terms of the GNU General Public License as published by 

11# the Free Software Foundation, either version 3 of the License, or 

12# (at your option) any later version. 

13# 

14# This program is distributed in the hope that it will be useful, 

15# but WITHOUT ANY WARRANTY; without even the implied warranty of 

16# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the 

17# GNU General Public License for more details. 

18# 

19# You should have received a copy of the GNU General Public License 

20# along with this program. If not, see <https://www.gnu.org/licenses/>. 

21 

22import unittest 

23import tempfile 

24 

25import astropy.time 

26 

27from lsst.afw.table import SourceCatalog 

28import lsst.daf.butler 

29import lsst.daf.butler.tests as butlerTests 

30import lsst.pipe.base.testUtils as pipeTests 

31from lsst.pipe.tasks.getRegionTimeFromVisit import GetRegionTimeFromVisitTask 

32from lsst.sphgeom import ConvexPolygon, UnitVector3d 

33import lsst.utils.tests 

34 

35 

36class GetRegionTimeFromVisitTests(lsst.utils.tests.TestCase): 

37 def setUp(self): 

38 instrument = "NotACam" 

39 detector = 42 

40 group = "groupy" 

41 exposure = 1011 

42 filter = "k2024" 

43 # Coordinates taken from LATISS exposure 2024040800445 

44 day_obs = 20240408 

45 ra = 122.47171635551595 

46 dec = -36.20378247543336 

47 rot = 359.99623587800414 

48 self.region = ConvexPolygon( 

49 [UnitVector3d(-0.43197476135299717, 0.6808244361827491, -0.5915030791555212), 

50 UnitVector3d(-0.4337643437542999, 0.6796857349601265, -0.5915029972697637), 

51 UnitVector3d(-0.4344262837761736, 0.6807261608742085, -0.5898183600617098), 

52 UnitVector3d(-0.43263670132940496, 0.6818648620502468, -0.5898184420345037), 

53 ]) 

54 self.times = lsst.daf.butler.Timespan(astropy.time.Time("2024-04-09T03:50:00", scale="tai"), 

55 astropy.time.Time("2024-04-09T03:50:30", scale="tai")) 

56 

57 repo_dir = tempfile.TemporaryDirectory(ignore_cleanup_errors=True) 

58 self.addCleanup(tempfile.TemporaryDirectory.cleanup, repo_dir) 

59 self.repo = butlerTests.makeTestRepo(repo_dir.name) 

60 self.enterContext(self.repo) 

61 

62 butlerTests.addDataIdValue(self.repo, "instrument", instrument) 

63 butlerTests.addDataIdValue(self.repo, "day_obs", day_obs) 

64 butlerTests.addDataIdValue(self.repo, "physical_filter", filter) 

65 butlerTests.addDataIdValue(self.repo, "detector", detector) 

66 butlerTests.addDataIdValue(self.repo, "group", group) 

67 # addDataIdValue can't handle metadata, or tables that don't have an ID column 

68 self.repo.registry.insertDimensionData("exposure", {"id": exposure, 

69 "instrument": instrument, 

70 "group": group, 

71 "day_obs": day_obs, 

72 "physical_filter": filter, 

73 "tracking_ra": ra, 

74 "tracking_dec": dec, 

75 "sky_angle": rot, 

76 "timespan": self.times, 

77 }) 

78 self.repo.registry.insertDimensionData("visit", {"id": exposure, 

79 "instrument": instrument, 

80 "day_obs": day_obs, 

81 "physical_filter": filter, 

82 "timespan": self.times, 

83 }) 

84 self.repo.registry.insertDimensionData("visit_definition", {"instrument": instrument, 

85 "exposure": exposure, 

86 "visit": exposure, 

87 }) 

88 self.repo.registry.insertDimensionData("visit_detector_region", {"instrument": instrument, 

89 "visit": exposure, 

90 "detector": detector, 

91 "region": self.region, 

92 }) 

93 

94 butlerTests.addDatasetType(self.repo, "regionTimeInfo", {"instrument", "group", "detector"}, 

95 "RegionTimeInfo") 

96 butlerTests.addDatasetType(self.repo, "initial_stars_footprints_detector", 

97 {"instrument", "visit", "detector"}, "SourceCatalog") 

98 # pipeTests.makeQuantum needs outputs registered even if graph generation does not. 

99 butlerTests.addDatasetType(self.repo, "getRegionTimeFromVisit_dummy2", 

100 {"instrument", "exposure", "detector"}, "int") 

101 

102 self.group_id = self.repo.registry.expandDataId( 

103 {"instrument": instrument, "group": group, "detector": detector}) 

104 self.exposure_id = self.repo.registry.expandDataId( 

105 {"instrument": instrument, "exposure": exposure, "detector": detector}) 

106 self.visit_id = self.repo.registry.expandDataId( 

107 {"instrument": instrument, "visit": exposure, "detector": detector}) 

108 

109 def test_runQuantum(self): 

110 butler = butlerTests.makeTestCollection(self.repo, uniqueId=self.id()) 

111 minimal_source_catalog = SourceCatalog() 

112 butler.put(minimal_source_catalog, "initial_stars_footprints_detector", self.visit_id) 

113 

114 task = GetRegionTimeFromVisitTask() 

115 quantum = pipeTests.makeQuantum( 

116 task, butler, self.group_id, 

117 {"output": self.group_id, 

118 "dummy_visit": self.visit_id, 

119 "dummy_exposure": [self.exposure_id], 

120 }) 

121 

122 pipeTests.runTestQuantum(task, butler, quantum, mockRun=False) 

123 

124 # Not exactly round-tripping, because these objects came from the dimension records. 

125 info = butler.get("regionTimeInfo", self.group_id) 

126 self.assertEqual(info.region, self.region) 

127 self.assertEqual(info.timespan, self.times) 

128 

129 def test_connections(self): 

130 pipeTests.lintConnections(GetRegionTimeFromVisitTask.ConfigClass.ConnectionsClass) 

131 

132 

133def setup_module(module): 

134 lsst.utils.tests.init() 

135 

136 

137class MemoryTestCase(lsst.utils.tests.MemoryTestCase): 

138 pass 

139 

140 

141if __name__ == "__main__": 141 ↛ 142line 141 didn't jump to line 142 because the condition on line 141 was never true

142 lsst.utils.tests.init() 

143 unittest.main()