# Coverage-report artifact (coverage.py v7.13.5, created 2026-04-30):
# tests/nopytest_convertReferenceCatalog.py — 127 statements, 0% covered.

# This file is part of meas_algorithms.
#
# Developed for the LSST Data Management System.
# This product includes software developed by the LSST Project
# (https://www.lsst.org).
# See the COPYRIGHT file at the top-level directory of this distribution
# for details of code ownership.
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <https://www.gnu.org/licenses/>.

# This file is excluded from running through pytest due to concerns about the
# interaction between multiprocessing as invoked by this code, and the process
# pool used by pytest.
#
# Note that it is invoked independently by SCons, so the tests are still run
# as part of the build.

import logging
import os.path
import tempfile
import unittest
import unittest.mock

import numpy as np

import lsst.afw.table
import lsst.daf.butler
from lsst.daf.butler import DatasetType, DeferredDatasetHandle
from lsst.daf.butler.script import ingest_files
from lsst.meas.algorithms import (ConvertReferenceCatalogTask, ReferenceObjectLoader)
from lsst.meas.algorithms.htmIndexer import HtmIndexer
from lsst.meas.algorithms.convertRefcatManager import ConvertRefcatManager
from lsst.meas.algorithms.readTextCatalogTask import ReadTextCatalogTask
from lsst.meas.algorithms.convertReferenceCatalog import addRefCatMetadata
import lsst.sphgeom
import lsst.utils

import convertReferenceCatalogTestBase

48 

49 

class TestConvertReferenceCatalogParallel(convertReferenceCatalogTestBase.ConvertReferenceCatalogTestBase,
                                          lsst.utils.tests.TestCase):
    """Exercise refcat conversion with multiprocessing enabled.

    Coverage here overlaps substantially with ``ReferenceObjectLoaderTestCase``
    in ``test_referenceObjectLoader.py``; the emphasis of this test is the
    conversion step itself, with the loader used only to verify the result.
    """
    def testIngestTwoFilesTwoCores(self):
        with (tempfile.TemporaryDirectory() as inPath1, tempfile.TemporaryDirectory() as inPath2,
              tempfile.TemporaryDirectory() as dataPath):
            # Two input catalogs with disjoint id ranges and different seeds.
            skyCatalogFile1, _, skyCatalog1 = self.makeSkyCatalog(inPath1, idStart=25, seed=123)
            skyCatalogFile2, _, skyCatalog2 = self.makeSkyCatalog(inPath2, idStart=5432, seed=11)

            # Conversion config: override some field names, run on two cores.
            config = convertReferenceCatalogTestBase.makeConvertConfig(withRaDecErr=True,
                                                                       withMagErr=True,
                                                                       withPm=True,
                                                                       withParallax=True,
                                                                       withFullPositionInformation=True)
            # A very small HTM pixelization depth guarantees pixel collisions
            # while the two files are being ingested in parallel.
            depth = 2
            config.dataset_config.indexer.active.depth = depth
            # np.savetxt prepends '# ' to header lines; pick a reader that copes.
            config.file_reader.format = 'ascii.commented_header'
            config.n_processes = 2  # multiple cores, for this test only
            config.id_name = 'id'  # take ids straight from the generated catalogs
            repoPath = os.path.join(self.outPath, "output_multifile_parallel")

            # Convert both input files to the HTM-indexed format.
            converter = ConvertReferenceCatalogTask(output_dir=dataPath, config=config)
            converter.run([skyCatalogFile1, skyCatalogFile2])

            # Stand up a temporary butler repo and register the refcat
            # dataset type against the htm{depth} dimension.
            butler = self.makeTemporaryRepo(repoPath, depth)
            datasetType = DatasetType(config.dataset_config.ref_dataset_name,
                                      [f"htm{depth}"],
                                      "SimpleCatalog",
                                      universe=butler.dimensions,
                                      isCalibration=False)
            butler.registry.registerDatasetType(datasetType)

            # Ingest the converted shards into the new repo.
            run = "testingRun"
            htmTableFile = os.path.join(dataPath, "filename_to_htm.ecsv")
            ingest_files(repoPath,
                         config.dataset_config.ref_dataset_name,
                         run,
                         htmTableFile,
                         transfer="auto")

            # Read everything back through a fresh butler and confirm every
            # input row made it into the converted refcat.
            with lsst.daf.butler.Butler.from_config(repoPath) as butler:
                datasetRefs = list(butler.registry.queryDatasets(config.dataset_config.ref_dataset_name,
                                                                 collections=[run]).expanded())
                handlers = [DeferredDatasetHandle(butler=butler, ref=ref, parameters=None)
                            for ref in datasetRefs]
                loader = ReferenceObjectLoader([ref.dataId for ref in datasetRefs],
                                               handlers,
                                               name="testRefCat",
                                               config=ReferenceObjectLoader.ConfigClass(),
                                               log=self.logger)
                self.checkAllRowsInRefcat(loader, skyCatalog1, config)
                self.checkAllRowsInRefcat(loader, skyCatalog2, config)

118 

119 

class TestConvertRefcatManager(convertReferenceCatalogTestBase.ConvertReferenceCatalogTestBase,
                               lsst.utils.tests.TestCase):
    """Unittests of various methods of ConvertRefcatManager.

    Uses mocks to force particular behavior regarding e.g. catalogs.
    """
    def setUp(self):
        # Fixed-seed RNG so the fake catalog contents are reproducible.
        self.rng = np.random.Generator(np.random.MT19937(5))

        # Scratch directory for the converter's own output files.
        self.tempDir = tempfile.TemporaryDirectory()
        tempPath = self.tempDir.name
        self.log = logging.getLogger("lsst.TestConvertRefcatManager")
        self.config = convertReferenceCatalogTestBase.makeConvertConfig(withRaDecErr=True)
        self.config.id_name = 'id'
        self.depth = 2  # very small depth, for as few pixels as possible.
        self.indexer = HtmIndexer(self.depth)
        self.htm = lsst.sphgeom.HtmPixelization(self.depth)
        converter = ConvertReferenceCatalogTask(output_dir=tempPath, config=self.config)
        # Input-file dtype, used only to derive the output schema and key map.
        dtype = [('id', '<f8'), ('ra', '<f8'), ('dec', '<f8'), ('ra_err', '<f8'), ('dec_err', '<f8'),
                 ('a', '<f8'), ('a_err', '<f8')]
        self.schema, self.key_map = converter.makeSchema(dtype)
        self.fileReader = ReadTextCatalogTask()

        # Five fake input rows, assigned (via matchedPixels) to pixels 1, 1, 2, 2, 3.
        self.fakeInput = self.makeSkyCatalog(outPath=None, size=5, idStart=6543)
        self.matchedPixels = np.array([1, 1, 2, 2, 3])
        # Separate scratch directory holding the per-pixel output FITS files.
        self.tempDir2 = tempfile.TemporaryDirectory()
        tempPath = self.tempDir2.name
        self.filenames = {x: os.path.join(tempPath, "%d.fits" % x) for x in set(self.matchedPixels)}

        # The manager under test, wired up with the fixtures above.
        self.worker = ConvertRefcatManager(self.filenames,
                                           self.config,
                                           self.fileReader,
                                           self.indexer,
                                           self.schema,
                                           self.key_map,
                                           self.htm.universe()[0],
                                           addRefCatMetadata,
                                           self.log)

    def _createFakeCatalog(self, nOld=5, nNew=0, idStart=42):
        """Create a fake output SimpleCatalog, populated with nOld+nNew elements.

        Parameters
        ----------
        nOld : `int`, optional
            The number of filled in sources to put in the catalog.
        nNew : `int`, optional
            The number of empty sources to put in the catalog.
        idStart : `int`, optional
            The start id of the ``nOld`` sources.

        Returns
        -------
        catalog : `lsst.afw.table.SimpleCatalog`
            A catalog populated with random data and contiguous ids.
        """
        catalog = lsst.afw.table.SimpleCatalog(self.schema)
        catalog.resize(nOld)
        # Fill every schema field with random values for the nOld rows.
        for x in self.schema:
            catalog[x.key] = self.rng.random(nOld)
        # do the ids separately, so there are no duplicates
        catalog['id'] = np.arange(idStart, idStart + nOld)
        catalog.resize(nOld + nNew)  # make space for the elements we will add
        # Deep copy so the returned catalog is independent of this buffer.
        return catalog.copy(deep=True)

    def test_doOnePixelNewData(self):
        """Test that we can add new data to an existing catalog."""
        pixelId = 1  # the pixel we are going to test

        nOld = 5
        nNew = sum(self.matchedPixels == pixelId)
        catalog = self._createFakeCatalog(nOld=nOld, nNew=nNew)
        # Force _doOnePixel to see our pre-built catalog as the existing one.
        self.worker.getCatalog = unittest.mock.Mock(self.worker.getCatalog, return_value=catalog)

        self.worker._doOnePixel(self.fakeInput, self.matchedPixels, pixelId, {}, {})
        newcat = lsst.afw.table.SimpleCatalog.readFits(self.filenames[pixelId])

        # check that the "pre" catalog is unchanged, exactly
        np.testing.assert_equal(newcat[:nOld]['id'], catalog[:nOld]['id'])
        self.assertFloatsEqual(newcat[:nOld]['coord_ra'], catalog[:nOld]['coord_ra'])
        self.assertFloatsEqual(newcat[:nOld]['coord_dec'], catalog[:nOld]['coord_dec'])

        # check that the new catalog elements are set correctly
        # (input ra/dec are compared after a degree->radian conversion)
        newElements = self.fakeInput[self.matchedPixels == pixelId]
        np.testing.assert_equal(newcat[nOld:]['id'], newElements['id'])
        self.assertFloatsAlmostEqual(newcat[nOld:]['coord_ra'], newElements['ra']*np.pi/180)
        self.assertFloatsAlmostEqual(newcat[nOld:]['coord_dec'], newElements['dec']*np.pi/180)

    def test_doOnePixelNoData(self):
        """Test that we can put new data into an empty catalog."""
        pixelId = 2

        nOld = 0  # start from a completely empty catalog
        nNew = sum(self.matchedPixels == pixelId)
        catalog = self._createFakeCatalog(nOld=nOld, nNew=nNew)
        self.worker.getCatalog = unittest.mock.Mock(self.worker.getCatalog, return_value=catalog)

        self.worker._doOnePixel(self.fakeInput, self.matchedPixels, pixelId, {}, {})
        newcat = lsst.afw.table.SimpleCatalog.readFits(self.filenames[pixelId])

        # check that the new catalog elements are set correctly
        # (input ra/dec are compared after a degree->radian conversion)
        newElements = self.fakeInput[self.matchedPixels == pixelId]
        np.testing.assert_equal(newcat['id'], newElements['id'])
        self.assertFloatsAlmostEqual(newcat['coord_ra'], newElements['ra']*np.pi/180)
        self.assertFloatsAlmostEqual(newcat['coord_dec'], newElements['dec']*np.pi/180)

    def test_getCatalog(self):
        """Test that getCatalog returns a properly expanded new catalog."""
        pixelId = 3
        nOld = 10
        nNewElements = 5
        # save a catalog to disk that we can check against the getCatalog()'s return
        catalog = self._createFakeCatalog(nOld=nOld, nNew=0)
        catalog.writeFits(self.filenames[pixelId])
        newcat = self.worker.getCatalog(pixelId, self.schema, nNewElements)

        # the result holds the on-disk rows plus room for the new elements
        self.assertEqual(len(newcat), nOld + nNewElements)

        # the pre-existing rows must come back unchanged, in order
        np.testing.assert_equal(newcat[:len(catalog)]['id'], catalog['id'])
        self.assertFloatsEqual(newcat[:len(catalog)]['coord_ra'], catalog['coord_ra'])
        self.assertFloatsEqual(newcat[:len(catalog)]['coord_dec'], catalog['coord_dec'])

    def test_setCoordinateCovariance(self):
        # The base-class _setCoordinateCovariance raises NotImplementedError;
        # presumably refcat-specific subclasses provide the implementation.
        catalog = self._createFakeCatalog(nOld=10, nNew=0)

        with self.assertRaises(NotImplementedError):
            self.worker._setCoordinateCovariance(catalog[0], self.fakeInput[0])

    def tearDown(self):
        # Remove both scratch directories created in setUp.
        self.tempDir.cleanup()
        self.tempDir2.cleanup()

251 

252 

class TestMemory(lsst.utils.tests.MemoryTestCase):
    """Standard LSST leak check; all tests are inherited from MemoryTestCase."""
    pass

255 

256 

def setup_module(module):
    """Module-level setup hook: initialize the LSST test machinery."""
    lsst.utils.tests.init()

259 

260 

if __name__ == "__main__":
    # Support running this file directly, since it is excluded from pytest.
    lsst.utils.tests.init()
    unittest.main()