Coverage for tests / test_loadDiaCatalogs.py: 30%

79 statements  

« prev     ^ index     » next       coverage.py v7.13.5, created at 2026-05-07 08:39 +0000

1# This file is part of ap_association. 

2# 

3# Developed for the LSST Data Management System. 

4# This product includes software developed by the LSST Project 

5# (https://www.lsst.org). 

6# See the COPYRIGHT file at the top-level directory of this distribution 

7# for details of code ownership. 

8# 

9# This program is free software: you can redistribute it and/or modify 

10# it under the terms of the GNU General Public License as published by 

11# the Free Software Foundation, either version 3 of the License, or 

12# (at your option) any later version. 

13# 

14# This program is distributed in the hope that it will be useful, 

15# but WITHOUT ANY WARRANTY; without even the implied warranty of 

16# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the 

17# GNU General Public License for more details. 

18# 

19# You should have received a copy of the GNU General Public License 

20# along with this program. If not, see <https://www.gnu.org/licenses/>. 

21 

22import os 

23import astropy.units 

24import numpy as np 

25import tempfile 

26import unittest 

27import yaml 

28 

29from lsst.ap.association import LoadDiaCatalogsTask 

30from lsst.ap.association.utils import getMidpointFromTimespan, readSchemaFromApdb 

31from lsst.dax.apdb import Apdb, ApdbSql, ApdbTables 

32from lsst.resources import ResourcePath 

33import lsst.utils.tests 

34from utils_tests import makeExposure, makeDiaObjects, makeDiaSources, makeDiaForcedSources, makeRegionTime, \ 

35 getRegion 

36 

37 

class TestLoadDiaCatalogs(unittest.TestCase):
    """Exercise ``LoadDiaCatalogsTask`` against a temporary SQLite-backed
    APDB that ``setUp`` fills with generated DiaObjects, DiaSources and
    DiaForcedSources.
    """

    def setUp(self):
        """Create the temporary APDB, save its config to a temporary file
        and store the generated test catalogs in it.
        """
        # Seeded generator, so the generated catalogs are the same every run.
        rng = np.random.default_rng(1234)

        testDir = os.path.dirname(__file__)
        self.db_file_fd, self.db_file = tempfile.mkstemp(dir=testDir)
        # unittest runs cleanups in reverse order of registration, so the
        # descriptor is closed before the file is removed.
        self.addCleanup(os.remove, self.db_file)
        self.addCleanup(os.close, self.db_file_fd)

        dbUrl = "sqlite:///" + self.db_file
        self.apdbConfig = ApdbSql.init_database(db_url=dbUrl)
        # The task config refers to the APDB through a config file, so the
        # APDB config is written to a temporary file that lives for the test.
        self.config_file = tempfile.NamedTemporaryFile()
        self.addCleanup(self.config_file.close)
        self.apdbConfig.save(self.config_file.name)
        self.apdb = Apdb.from_config(self.apdbConfig)
        self.schema = readSchemaFromApdb(self.apdb)

        self.exposure = makeExposure(False, False)
        self.regionTime = makeRegionTime(exposure=self.exposure)
        self.dateTime = getMidpointFromTimespan(self.regionTime.timespan)

        self.diaObjects = makeDiaObjects(20, self.exposure, rng)
        self.diaSources = makeDiaSources(
            100,
            self.diaObjects["diaObjectId"].to_numpy(),
            self.exposure,
            rng,
        )
        self.diaForcedSources = makeDiaForcedSources(
            200,
            self.diaObjects["diaObjectId"].to_numpy(),
            self.exposure,
            rng,
        )

        # Store the test catalogs as though they were observed a month
        # before the current exposure.
        oneMonth = 30 * astropy.units.day
        storeTime = self.regionTime.timespan.begin.tai - oneMonth
        self.apdb.store(storeTime, self.diaObjects, self.diaSources, self.diaForcedSources)

        # These columns are not in the DPDD, yet do appear in DiaSource.yaml,
        # so they are excluded from the comparison with the default APDB
        # schema.
        self.ignoreColumns = ["band", "bboxSize", "isDipole", "flags"]

    def _makeConfig(self, **kwargs):
        """Return a ``LoadDiaCatalogsTask`` config that points at the APDB
        config file saved in ``setUp``.

        Parameters
        ----------
        **kwargs
            Overrides forwarded to ``config.update``.

        Returns
        -------
        config
            The populated ``LoadDiaCatalogsTask.ConfigClass`` instance.
        """
        taskConfig = LoadDiaCatalogsTask.ConfigClass()
        taskConfig.apdb_config_url = self.config_file.name
        taskConfig.update(**kwargs)
        return taskConfig

    def testRun(self):
        """Check that ``run`` returns as many rows in each catalog as were
        generated in ``setUp``.
        """
        task = LoadDiaCatalogsTask(config=self._makeConfig())
        loaded = task.run(self.regionTime)

        # The result attributes share their names with the fixtures on self.
        for catalogName in ("diaObjects", "diaSources", "diaForcedSources"):
            self.assertEqual(len(getattr(loaded, catalogName)),
                             len(getattr(self, catalogName)))

    def testLoadDiaObjects(self):
        """Check that ``loadDiaObjects`` returns as many diaObjects as were
        generated in ``setUp``.
        """
        task = LoadDiaCatalogsTask(config=self._makeConfig())
        footprint = getRegion(self.exposure)
        loaded = task.loadDiaObjects(footprint, self.schema)
        self.assertEqual(len(loaded), len(self.diaObjects))

    def testLoadDiaForcedSources(self):
        """Check that ``loadDiaForcedSources`` returns as many
        diaForcedSources as were generated in ``setUp``.
        """
        task = LoadDiaCatalogsTask(config=self._makeConfig())
        footprint = getRegion(self.exposure)
        loaded = task.loadDiaForcedSources(self.diaObjects, footprint, self.dateTime, self.schema)
        self.assertEqual(len(loaded), len(self.diaForcedSources))

    def testLoadDiaSources(self):
        """Check that ``loadDiaSources`` returns as many diaSources as were
        generated in ``setUp``.

        The loader is given both the diaObjects and the exposure region.
        """
        task = LoadDiaCatalogsTask(config=self._makeConfig())

        footprint = getRegion(self.exposure)
        loaded = task.loadDiaSources(self.diaObjects, footprint, self.dateTime, self.schema)
        self.assertEqual(len(loaded), len(self.diaSources))

    def test_apdbSchema(self):
        """Check that the column names defined in ap_association's
        DiaSource.yaml (minus ``self.ignoreColumns``) form a strict subset of
        the default DiaSource table columns from dax_apdb.
        """
        diaSourceTable = self.apdb.tableDef(ApdbTables.DiaSource)
        apdbColumnNames = {column.name for column in diaSourceTable.columns}

        functorFile = ResourcePath("resource://lsst.ap.association/resources/data/DiaSource.yaml")
        with functorFile.open("r") as yaml_stream:
            # safe_load_all parses lazily, so the documents have to be
            # consumed while the stream is still open.
            for functor in yaml.safe_load_all(yaml_stream):
                diaSourceColumns = {name for name in functor['funcs']
                                    if name not in self.ignoreColumns}
            self.assertLess(diaSourceColumns, apdbColumnNames)

149 

150 

class MemoryTester(lsst.utils.tests.MemoryTestCase):
    """Test case that only inherits from ``lsst.utils.tests.MemoryTestCase``;
    it defines no tests of its own.
    """

153 

154 

def setup_module(module):
    """Module-level setup hook that initialises the LSST test utilities.

    The name follows pytest's xunit-style ``setup_module`` convention.

    Parameters
    ----------
    module
        The module being set up; accepted for the hook signature but unused.
    """
    lsst.utils.tests.init()

157 

158 

# Direct execution: initialise the LSST test utilities, then hand over to the
# unittest runner.
if __name__ == "__main__":
    lsst.utils.tests.init()
    unittest.main()