Coverage for tests/test_defineVisits.py: 22%

193 statements  

« prev     ^ index     » next       coverage.py v7.13.5, created at 2026-04-14 23:50 +0000

1# This file is part of obs_base. 

2# 

3# Developed for the LSST Data Management System. 

4# This product includes software developed by the LSST Project 

5# (https://www.lsst.org). 

6# See the COPYRIGHT file at the top-level directory of this distribution 

7# for details of code ownership. 

8# 

9# This program is free software: you can redistribute it and/or modify 

10# it under the terms of the GNU General Public License as published by 

11# the Free Software Foundation, either version 3 of the License, or 

12# (at your option) any later version. 

13# 

14# This program is distributed in the hope that it will be useful, 

15# but WITHOUT ANY WARRANTY; without even the implied warranty of 

16# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the 

17# GNU General Public License for more details. 

18# 

19# You should have received a copy of the GNU General Public License 

20# along with this program. If not, see <http://www.gnu.org/licenses/>. 

21 

22import logging 

23import os 

24import pickle 

25import shutil 

26import tempfile 

27import unittest 

28import warnings 

29from collections import defaultdict 

30 

31import lsst.daf.butler.tests as butlerTests 

32from lsst.daf.butler import DataCoordinate, DimensionRecord, SerializedDimensionRecord 

33from lsst.daf.butler.registry import ConflictingDefinitionError 

34from lsst.obs.base import DefineVisitsConfig, DefineVisitsTask 

35from lsst.obs.base.instrument_tests import DummyCam 

36from lsst.utils.iteration import ensure_iterable 

37 

# Directory that holds this test module.
TESTDIR = os.path.dirname(__file__)
# Directory of the serialized exposure records read by the tests below.
DATADIR = os.path.join(TESTDIR, "data", "visits")

40 

41 

class DefineVisitsBase:
    """General set up that can be shared.

    This is a mixin for `unittest.TestCase` subclasses: it calls
    ``self.id``, ``self.enterContext``, ``self.addCleanup``,
    ``self.assertLogs`` and ``self.assertIn``, and expects the concrete
    class to provide a ``get_config`` method.
    """

    use_data_ids = True
    """Use data IDs when calling defineVisits, else use dimension records."""

    def setUpExposures(self):
        """Create a new butler for each test since we are changing dimension
        records.

        Sets ``self.root``, ``self.creatorButler``, ``self.butler``,
        ``self.config``, ``self.task`` and ``self.records`` (exposure
        records keyed by the integers 347, 348 and 349).

        Raises
        ------
        unittest.SkipTest
            Raised if the dimension universe version is less than 2.
        """
        self.root = tempfile.mkdtemp(dir=TESTDIR)
        # Cleanups run even when this method raises (for example the
        # SkipTest below), whereas tearDown does not, so registering the
        # removal here prevents the temporary repo from being leaked.
        # Cleanups run last-in first-out, so the directory is removed only
        # after the butler contexts entered below have been exited.
        self.addCleanup(shutil.rmtree, self.root, ignore_errors=True)

        self.creatorButler = butlerTests.makeTestRepo(self.root, {})
        self.enterContext(self.creatorButler)
        self.butler = butlerTests.makeTestCollection(self.creatorButler, uniqueId=self.id())
        self.enterContext(self.butler)

        self.config = self.get_config()
        self.task = DefineVisitsTask(config=self.config, butler=self.butler)

        # Need to register the instrument.
        DummyCam().register(self.butler.registry)

        # Choose serializations based on universe.
        universe = self.butler.dimensions
        uversion = universe.version
        if uversion < 2:
            raise unittest.SkipTest(f"Universe {uversion} is not compatible with these test files.")

        # Not all universe changes result in visible changes, so several
        # universe versions share one set of test files.
        if uversion <= 5:
            # has_simulated, azimuth, seq_start, seq_end.
            v = 2
        elif uversion == 6:
            # group not group_name, group_id dropped.
            v = 6
        elif uversion == 7:
            # can_see_sky.
            v = 7
        else:
            # Might work.
            warnings.warn(f"Universe {uversion} has not been validated.")
            v = 7

        # Read the exposure records.
        self.records: dict[int, DimensionRecord] = {}
        for i in (347, 348, 349):
            path = os.path.join(DATADIR, f"exp_v{v}_{i}.json")
            with open(path, encoding="utf-8") as fh:
                simple = SerializedDimensionRecord.model_validate_json(fh.read())
            self.records[i] = DimensionRecord.from_simple(simple, registry=self.butler.registry)

    def define_visits(
        self,
        exposures: list[DimensionRecord | list[DimensionRecord]],
        incremental: bool,
    ) -> None:
        """Insert exposure records and run the visit definition task.

        Parameters
        ----------
        exposures : `list` [`DimensionRecord` | `list` [`DimensionRecord`]]
            Each element is one batch: its exposure records are inserted
            into the registry and the task is then run once for that batch.
        incremental : `bool`
            Forwarded to the task. When `False` the inputs are deliberately
            duplicated and the task's INFO log is checked for the expected
            number of grouped exposures.
        """
        # The universe is fixed for the lifetime of the butler, so this check
        # does not need repeating per batch.
        has_group_dimension = "group" in self.butler.dimensions["exposure"].implied

        for records in exposures:
            records = list(ensure_iterable(records))
            if has_group_dimension:
                # This is a group + day_obs universe.
                for rec in records:
                    self.butler.registry.syncDimensionData(
                        "group", dict(instrument=rec.instrument, name=rec.group)
                    )
                    self.butler.registry.syncDimensionData(
                        "day_obs", dict(instrument=rec.instrument, id=rec.day_obs)
                    )

            deduped_records = set(records)
            self.butler.registry.insertDimensionData("exposure", *deduped_records)
            # Include all records so far in definition.
            if self.use_data_ids:
                dataIds = list(self.butler.registry.queryDataIds("exposure", instrument="DummyCam"))
            else:
                dataIds = records

            if not incremental:
                # Force duplicate records in non-incremental mode to ensure
                # that the task can deduplicate.
                dataIds.extend(dataIds)
                n_exposures = len(self.records)
                with self.assertLogs(level=logging.INFO) as cm:
                    self.task.run(dataIds, incremental=incremental)
                self.assertIn(f"Grouping {n_exposures} exposure(s) into visits", "\n".join(cm.output))
            else:
                self.task.run(dataIds, incremental=incremental)

127 

128 

class DefineVisitsTestCase(unittest.TestCase, DefineVisitsBase):
    """Exercise visit definition using an unmodified task configuration."""

    def setUp(self):
        self.setUpExposures()

    def tearDown(self):
        if self.root is None:
            return
        shutil.rmtree(self.root, ignore_errors=True)

    def get_config(self) -> DefineVisitsConfig:
        return DefineVisitsTask.ConfigClass()

    def assertVisits(self):
        """Verify the registered visits and the exposures assigned to them."""
        registry = self.butler.registry

        visit_records = list(registry.queryDimensionRecords("visit"))
        self.assertEqual(len(visit_records), 4)
        found_ids = {record.id for record in visit_records}
        self.assertEqual(found_ids, {2022040500347, 2022040500348, 2022040500349, 92022040500348})

        # Collect the exposures of each visit into sets so that the
        # comparison does not depend on the order of the query results.
        exposures_by_visit = defaultdict(set)
        visit_definitions = list(registry.queryDimensionRecords("visit_definition"))
        for definition in visit_definitions:
            exposures_by_visit[definition.visit].add(definition.exposure)

        expected = {
            92022040500348: {2022040500348},
            2022040500347: {2022040500347},
            2022040500348: {2022040500348, 2022040500349},
            2022040500349: {2022040500349},
        }
        self.assertEqual(dict(exposures_by_visit), expected)

    def test_defineVisits(self):
        # A single batch holding every record (hence the nested list).
        all_records = list(self.records.values())
        self.define_visits([all_records], incremental=False)
        self.assertVisits()

    def test_incremental_cumulative(self):
        # One batch per exposure: visits are defined after each insert.
        one_at_a_time = list(self.records.values())
        self.define_visits(one_at_a_time, incremental=True)
        self.assertVisits()

    def test_incremental_cumulative_reverse(self):
        # Feeding the exposures in reverse order must still converge on the
        # same final answer.
        backwards = list(reversed(self.records.values()))
        with self.assertLogs("lsst.defineVisits.groupExposures", level="WARNING") as captured:
            self.define_visits(backwards, incremental=True)
        self.assertIn("Skipping the multi-snap definition", "\n".join(captured.output))
        self.assertVisits()

    def define_visits_incrementally(self, exposure: DimensionRecord) -> None:
        """Insert one exposure and run the task incrementally on it alone."""
        registry = self.butler.registry
        universe = self.butler.dimensions

        if "group" in universe["exposure"].implied:
            group_row = dict(instrument=exposure.instrument, name=exposure.group)
            registry.syncDimensionData("group", group_row)
            day_obs_row = dict(instrument=exposure.instrument, id=exposure.day_obs)
            registry.syncDimensionData("day_obs", day_obs_row)
        registry.insertDimensionData("exposure", exposure)

        data_id = DataCoordinate.standardize(instrument="DummyCam", exposure=exposure.id, universe=universe)
        self.task.run([data_id], incremental=True)

    def test_incremental(self):
        for exposure_record in self.records.values():
            self.define_visits_incrementally(exposure_record)
        self.assertVisits()

    def test_incremental_reverse(self):
        for exposure_record in reversed(self.records.values()):
            self.define_visits_incrementally(exposure_record)
        self.assertVisits()

    def testPickleTask(self):
        # Round-trip the task through pickle and compare it with the original.
        original = self.task
        restored = pickle.loads(pickle.dumps(original))
        self.enterContext(restored.butler)

        self.assertEqual(original.getFullName(), restored.getFullName())
        self.assertEqual(original.log.name, restored.log.name)
        self.assertEqual(original.config, restored.config)
        self.assertEqual(original.butler._config, restored.butler._config)
        self.assertEqual(
            list(original.butler.collections.defaults),
            list(restored.butler.collections.defaults),
        )
        self.assertEqual(original.butler.run, restored.butler.run)
        self.assertEqual(original.universe, restored.universe)

225 

226 

class DefineVisitsRecordsTestCase(DefineVisitsTestCase):
    """Repeat the visit definition tests, passing dimension records to the
    task instead of data IDs.
    """

    use_data_ids = False

231 

232 

class DefineVisitsGroupingTestCase(unittest.TestCase, DefineVisitsBase):
    """Exercise visit definition with the "by-group-metadata" grouping."""

    def setUp(self):
        self.setUpExposures()

    def tearDown(self):
        if self.root is None:
            return
        shutil.rmtree(self.root, ignore_errors=True)

    def get_config(self) -> DefineVisitsConfig:
        grouping_config = DefineVisitsTask.ConfigClass()
        grouping_config.groupExposures.name = "by-group-metadata"
        return grouping_config

    def assertVisits(self):
        """Verify the registered visits and the exposures assigned to them."""
        registry = self.butler.registry

        visit_records = list(registry.queryDimensionRecords("visit"))
        self.assertEqual(len(visit_records), 2)

        # The visit ID itself depends on which universe we are using.
        # It is either calculated or comes from the JSON record.
        if "group" in self.butler.dimensions["exposure"].implied:
            first_visit, second_visit = 20220406025653255, 20220406025807181
        else:
            first_visit, second_visit = 2291434132550000, 2291434871810000
        self.assertEqual({record.id for record in visit_records}, {first_visit, second_visit})

        # Collect the exposures of each visit into sets so that the
        # comparison does not depend on the order of the query results.
        exposures_by_visit = defaultdict(set)
        visit_definitions = list(registry.queryDimensionRecords("visit_definition"))
        for definition in visit_definitions:
            exposures_by_visit[definition.visit].add(definition.exposure)

        expected = {
            first_visit: {2022040500347},
            second_visit: {2022040500348, 2022040500349},
        }
        self.assertEqual(dict(exposures_by_visit), expected)

    def test_defineVisits(self):
        # A single batch holding every record (hence the nested list).
        all_records = list(self.records.values())
        self.define_visits([all_records], incremental=False)
        self.assertVisits()

279 

280 

class DefineVisitsGroupingRecordsTestCase(DefineVisitsGroupingTestCase):
    """Repeat the grouping tests, passing dimension records to the task
    instead of data IDs.
    """

    use_data_ids = False

285 

286 

class DefineVisitsOneToOneTestCase(unittest.TestCase, DefineVisitsBase):
    """Test visit definition with the "one-to-one" exposure grouping."""

    def setUp(self):
        self.setUpExposures()

    def tearDown(self):
        if self.root is not None:
            shutil.rmtree(self.root, ignore_errors=True)

    def get_config(self) -> DefineVisitsConfig:
        """Return a task config that selects the "one-to-one" grouping."""
        config = DefineVisitsTask.ConfigClass()
        config.groupExposures.name = "one-to-one"
        return config

    def test_defineVisits(self):
        # Test visit definition with all the records.
        self.define_visits([list(self.records.values())], incremental=False)  # list inside a list
        self.assertVisits()

    def assertVisits(self):
        """Check that the visits were registered as expected."""
        visits = list(self.butler.registry.queryDimensionRecords("visit"))
        self.assertEqual(len(visits), 3)

        # For one-to-one the visit ID is the exposure ID.
        visit_ids = [rec.id for rec in self.records.values()]
        self.assertEqual({visit.id for visit in visits}, set(visit_ids))

        # Ensure that the definitions are correct (ignoring order).
        defmap = defaultdict(set)
        definitions = list(self.butler.registry.queryDimensionRecords("visit_definition"))
        for defn in definitions:
            defmap[defn.visit].add(defn.exposure)

        # self.records is filled in the order 347, 348, 349, so visit_ids
        # follows that same order.
        self.assertEqual(
            dict(defmap),
            {
                visit_ids[0]: {2022040500347},
                visit_ids[1]: {2022040500348},
                visit_ids[2]: {2022040500349},
            },
        )

    def test_update_records(self):
        """Check conflict, skip and forced-update handling when an exposure
        record changes after its visit has been defined.
        """
        self.define_visits([list(self.records.values())], incremental=False)  # list inside a list
        self.assertVisits()

        # Modify one of the records.
        records = self.records
        simple = records[348].to_simple()
        simple.record["target_name"] = "new target"
        records[348] = DimensionRecord.from_simple(simple, universe=self.butler.dimensions)
        self.butler.registry.syncDimensionData("exposure", records[348], update=True)

        # Re-run without updates or skipping should fail.
        with self.assertRaises(ConflictingDefinitionError):
            self.task.run(records.values())

        result = self.task.run(records.values(), skip_conflicting=True)
        self.assertEqual(result.n_skipped, 3, str(result))

        # Check that the visit definition did not change.
        visit_348 = self.butler.query_dimension_records("visit", where="visit.seq_num = 348")[0]
        self.assertEqual(visit_348.target_name, "LATISS_E6A_00000040", visit_348)

        # Run with forced update.
        result = self.task.run(records.values(), skip_conflicting=True, update_records=True)

        # Every record reports it was updated if we are updating, even if
        # a record was not really changed.
        self.assertEqual(result.n_skipped, 0, str(result))
        self.assertEqual(result.n_fully_updated, 3, str(result))
        visit_348 = self.butler.query_dimension_records("visit", where="visit.seq_num = 348")[0]
        self.assertEqual(visit_348.target_name, "new target", visit_348)

362 

363 

# Run the test suite when this file is executed as a script.
if __name__ == "__main__":
    unittest.main()