Coverage for python / lsst / daf / butler / tests / utils.py: 33%

121 statements  

« prev     ^ index     » next       coverage.py v7.13.5, created at 2026-04-26 08:49 +0000

1# This file is part of daf_butler. 

2# 

3# Developed for the LSST Data Management System. 

4# This product includes software developed by the LSST Project 

5# (http://www.lsst.org). 

6# See the COPYRIGHT file at the top-level directory of this distribution 

7# for details of code ownership. 

8# 

9# This software is dual licensed under the GNU General Public License and also 

10# under a 3-clause BSD license. Recipients may choose which of these licenses 

11# to use; please see the files gpl-3.0.txt and/or bsd_license.txt, 

12# respectively. If you choose the GPL option then the following text applies 

13# (but note that there is still no warranty even if you opt for BSD instead): 

14# 

15# This program is free software: you can redistribute it and/or modify 

16# it under the terms of the GNU General Public License as published by 

17# the Free Software Foundation, either version 3 of the License, or 

18# (at your option) any later version. 

19# 

20# This program is distributed in the hope that it will be useful, 

21# but WITHOUT ANY WARRANTY; without even the implied warranty of 

22# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the 

23# GNU General Public License for more details. 

24# 

25# You should have received a copy of the GNU General Public License 

26# along with this program. If not, see <http://www.gnu.org/licenses/>. 

27 

28from __future__ import annotations 

29 

30from unittest.mock import patch 

31 

32__all__ = () 

33 

34import os 

35import shutil 

36import tempfile 

37from collections.abc import Callable, Iterator, Sequence 

38from contextlib import contextmanager 

39from typing import TYPE_CHECKING, Any 

40 

41import astropy 

42from astropy.table import Table as AstropyTable 

43 

44from lsst.resources import ResourcePathExpression 

45 

46from .. import Butler, ButlerConfig, Config, DatasetRef, StorageClassFactory, Timespan 

47from .._collection_type import CollectionType 

48from ..datastore import NullDatastore 

49from ..dimensions import DimensionConfig 

50from ..direct_butler import DirectButler 

51from ..registry.sql_registry import RegistryConfig, SqlRegistry 

52from ..tests import MetricsExample, addDatasetType 

53 

if TYPE_CHECKING:
    import unittest

    from lsst.daf.butler import DatasetType

    class TestCaseMixin(unittest.TestCase):
        """Base class for mixin test classes that use TestCase methods.

        Seen only by static type checkers, so mixins get `unittest.TestCase`
        attribute types without imposing runtime inheritance.
        """

else:

    class TestCaseMixin:
        """Do-nothing definition of mixin base class for regular execution."""

70 

71 

def makeTestTempDir(default_base: str) -> str:
    """Create a temporary directory for test usage.

    The directory is created inside ``DAF_BUTLER_TEST_TMP`` when that
    environment variable is set; otherwise ``default_base`` is used as the
    parent.

    Parameters
    ----------
    default_base : `str`
        Default parent directory.

    Returns
    -------
    dir : `str`
        Name of the new temporary directory.
    """
    parent = os.environ.get("DAF_BUTLER_TEST_TMP")
    if parent is None:
        parent = default_base
    return tempfile.mkdtemp(dir=parent)

91 

92 

def removeTestTempDir(root: str | None) -> None:
    """Attempt to remove a temporary test directory, but do not raise if
    unable to.

    Unlike `tempfile.TemporaryDirectory`, this passes ``ignore_errors=True``
    to ``shutil.rmtree`` at close, making it safe to use on NFS.

    Parameters
    ----------
    root : `str`, optional
        Name of the directory to be removed. If `None`, nothing will be done.
    """
    if root is None:
        return
    if os.path.exists(root):
        shutil.rmtree(root, ignore_errors=True)

107 

108 

@contextmanager
def safeTestTempDir(default_base: str) -> Iterator[str]:
    """Return a context manager that creates a temporary directory and then
    attempts to remove it.

    Parameters
    ----------
    default_base : `str`
        Default parent directory, forwarded to `makeTestTempDir`.

    Returns
    -------
    context : `contextlib.ContextManager`
        A context manager that returns the new directory name on ``__enter__``
        and removes the temporary directory (via `removeTestTempDir`) on
        ``__exit__``.
    """
    temp_dir = makeTestTempDir(default_base)
    try:
        yield temp_dir
    finally:
        # Best-effort cleanup; removeTestTempDir never raises.
        removeTestTempDir(temp_dir)

131 

132 

def create_populated_sqlite_registry(
    *args: ResourcePathExpression,
    registry_config: RegistryConfig | None = None,
    dimension_config: DimensionConfig | None = None,
) -> Butler:
    """Create an in-memory registry-only sqlite butler and populate it.

    Parameters
    ----------
    *args : convertible to `lsst.resources.ResourcePath`
        Paths to export YAML files that should be imported.
    registry_config : ``RegistryConfig``, optional
        Registry configuration to use as the basis for the Butler
        configuration.
    dimension_config : ``DimensionConfig``, optional
        Dimension universe configuration.

    Returns
    -------
    butler : `Butler`
        New butler populated with the specified import files.
    """
    butler_config = ButlerConfig()
    if registry_config is not None:
        butler_config["registry"] = registry_config
    # Force a private in-memory database regardless of the supplied config.
    butler_config[".registry.db"] = "sqlite://"
    registry = SqlRegistry.createFromConfig(butler_config["registry"], dimension_config)
    butler = DirectButler(
        config=butler_config,
        registry=registry,
        datastore=NullDatastore(None, None),
        storageClasses=StorageClassFactory(),
    )
    # Datasets are registry-only here, so skip datastore records on import.
    for export_file in args:
        butler.import_(filename=export_file, without_datastore=True)
    return butler

169 

170 

class ButlerTestHelper:
    """Mixin with helpers for unit tests."""

    # Supplied by unittest.TestCase when this mixin is combined with it.
    assertEqual: Callable
    assertIsInstance: Callable
    maxDiff: int | None

    def assertAstropyTablesEqual(
        self,
        tables: AstropyTable | Sequence[AstropyTable],
        expectedTables: AstropyTable | Sequence[AstropyTable],
        filterColumns: bool = False,
        unorderedRows: bool = False,
    ) -> None:
        """Verify that a list of astropy tables matches a list of expected
        astropy tables.

        Parameters
        ----------
        tables : `astropy.table.Table` or iterable [`astropy.table.Table`]
            The table or tables that should match the expected tables.
        expectedTables : `astropy.table.Table`
            or iterable [`astropy.table.Table`]
            The tables with expected values to which the tables under test will
            be compared.
        filterColumns : `bool`
            If `True` then only compare columns that exist in
            ``expectedTables``.
        unorderedRows : `bool`, optional
            If `True` (`False` is default), don't require tables to have their
            rows in the same order.
        """
        # Accept a bare table on either side by wrapping it in a list.
        if isinstance(tables, AstropyTable):
            tables = [tables]
        if isinstance(expectedTables, AstropyTable):
            expectedTables = [expectedTables]
        self.assertEqual(len(tables), len(expectedTables))
        for actual, expected in zip(tables, expectedTables, strict=True):
            # Confirm we are comparing what we think we are comparing.
            self.assertIsInstance(actual, AstropyTable)
            self.assertIsInstance(expected, AstropyTable)
            if filterColumns:
                # Work on a copy so the caller's table is untouched.
                actual = actual.copy()
                actual.keep_columns(expected.colnames)
            if unorderedRows:
                # Sort copies of both tables so row order is irrelevant.
                actual = actual.copy()
                actual.sort(actual.colnames)
                expected = expected.copy()
                expected.sort(expected.colnames)
            # Compare the formatted line representations (per Astropy Slack
            # recommendation); column data types are deliberately ignored.
            actual_lines = actual.pformat()
            expected_lines = expected.pformat()
            saved_max_diff = self.maxDiff
            self.maxDiff = None  # Required to get the full diff on failure.
            try:
                self.assertEqual(
                    actual_lines, expected_lines, f"Table:\n{actual}\n\nvs Expected:\n{expected}"
                )
            finally:
                self.maxDiff = saved_max_diff

233 

234 

def readTable(textTable: str) -> AstropyTable:
    """Read an astropy table from formatted text.

    Contains formatting that causes the astropy table to print an empty string
    instead of "--" for missing/unpopulated values in the text table.

    Parameters
    ----------
    textTable : `str`
        The text version of the table to read.

    Returns
    -------
    table : `astropy.table.Table`
        The table as an astropy table.
    """
    # Render missing values as "" instead of the default "--" marker.
    fill = [("", 0, "")]
    # data_start=2 skips the header row and its underline row.
    return AstropyTable.read(textTable, format="ascii", data_start=2, fill_values=fill)

257 

258 

class MetricTestRepo:
    """Create and manage a test repository on disk with datasets that
    may be queried and modified for unit tests.

    Parameters
    ----------
    root : `str`
        The location of the repository, to pass to ``Butler.makeRepo``.
    configFile : `str`
        The path to the config file, to pass to ``Butler.makeRepo``.
    forceConfigRoot : `bool`, optional
        If `False`, any values present in the supplied ``config`` that
        would normally be reset are not overridden and will appear
        directly in the output config. Passed to ``Butler.makeRepo``.
    storageClassName : `str` or `None`, optional
        Name of storage class to use for datasets added to the test repository.
        A default will be used if none is specified.
    """

    METRICS_EXAMPLE_SUMMARY = {"AM1": 5.2, "AM2": 30.6}
    """The summary data included in ``MetricsExample`` objects stored in the
    test repo
    """

    _DEFAULT_RUN = "ingest/run"
    _DEFAULT_TAG = "ingest"
    _DEFAULT_STORAGE_CLASS = "StructuredCompositeReadComp"

    @staticmethod
    def _makeExampleMetrics() -> MetricsExample:
        """Make an object to put into the repository."""
        summary = MetricTestRepo.METRICS_EXAMPLE_SUMMARY
        output = {"a": [1, 2, 3], "b": {"blue": 5, "red": "green"}}
        data = [563, 234, 456.7, 752, 8, 9, 27]
        return MetricsExample(summary, output, data)

    def __init__(
        self,
        root: str,
        configFile: str,
        forceConfigRoot: bool = True,
        storageClassName: str | None = None,
    ) -> None:
        self.root = root
        # Create the on-disk repository, then a butler pointing at it.
        butlerConfigFile = Butler.makeRepo(
            self.root, config=Config(configFile), forceConfigRoot=forceConfigRoot
        )
        butler = Butler.from_config(
            butlerConfigFile, run=self._DEFAULT_RUN, collections=[self._DEFAULT_TAG]
        )
        self._do_init(butler, butlerConfigFile, storageClassName)

    @classmethod
    def create_from_butler(
        cls, butler: Butler, butler_config_file: str | Config, storageClassName: str | None = None
    ) -> MetricTestRepo:
        """Create a MetricTestRepo from an existing Butler instance.

        Parameters
        ----------
        butler
            `Butler` instance used for setting up the repository.
        butler_config_file
            Path to the config file or the `Config` instance used to set up
            that Butler instance.
        storageClassName
            Name of storage class to use for datasets added to the test
            repository. A default will be used if none is specified.

        Returns
        -------
        repo
            New instance of `MetricTestRepo` using the provided `Butler`
            instance.
        """
        # Bypass __init__ because the repository already exists on disk.
        instance = cls.__new__(cls)
        cloned = butler.clone(run=instance._DEFAULT_RUN, collections=[instance._DEFAULT_TAG])
        instance._do_init(cloned, butler_config_file, storageClassName)
        return instance

    def _do_init(
        self, butler: Butler, butlerConfigFile: str | Config, storageClassName: str | None = None
    ) -> None:
        """Shared setup: register collections, dataset type, dimension
        records, and two example datasets.
        """
        self.butler = butler
        self.storageClassFactory = StorageClassFactory()
        self.storageClassFactory.addFromConfig(butlerConfigFile)

        # New datasets will be added to run and tag, but we will only look in
        # tag when looking up datasets.
        self.butler.collections.register(self._DEFAULT_TAG, CollectionType.TAGGED)

        if storageClassName is None:
            storageClassName = self._DEFAULT_STORAGE_CLASS

        # Create and register a DatasetType.
        self.datasetType = addDatasetType(
            self.butler, "test_metric_comp", {"instrument", "visit"}, storageClassName
        )

        # Insert the dimension records that the example datasets reference.
        registry = self.butler.registry
        registry.insertDimensionData("instrument", {"name": "DummyCamComp"})
        registry.insertDimensionData(
            "physical_filter", {"instrument": "DummyCamComp", "name": "d-r", "band": "R"}
        )
        registry.insertDimensionData("day_obs", {"instrument": "DummyCamComp", "id": 20200101})
        registry.insertDimensionData(
            "visit_system", {"instrument": "DummyCamComp", "id": 1, "name": "default"}
        )
        visit_start = astropy.time.Time("2020-01-01 08:00:00.123456789", scale="tai")
        visit_end = astropy.time.Time("2020-01-01 08:00:36.66", scale="tai")
        registry.insertDimensionData(
            "visit",
            {
                "instrument": "DummyCamComp",
                "id": 423,
                "name": "fourtwentythree",
                "physical_filter": "d-r",
                "timespan": Timespan(visit_start, visit_end),
                "day_obs": 20200101,
            },
        )
        # Second visit deliberately has no timespan.
        registry.insertDimensionData(
            "visit",
            {
                "instrument": "DummyCamComp",
                "id": 424,
                "name": "fourtwentyfour",
                "physical_filter": "d-r",
                "day_obs": 20200101,
            },
        )

        self.ref1 = self.addDataset({"instrument": "DummyCamComp", "visit": 423})
        self.ref2 = self.addDataset({"instrument": "DummyCamComp", "visit": 424})

    def addDataset(
        self, dataId: dict[str, Any], run: str | None = None, datasetType: DatasetType | None = None
    ) -> DatasetRef:
        """Create a new example metric and add it to the named run with the
        given dataId.

        Overwrites tags, so this does not try to associate the new dataset with
        existing tags. (If/when tags are needed this can be added to the
        arguments of this function.)

        Parameters
        ----------
        dataId : `dict`
            The dataId for the new metric.
        run : `str`, optional
            The name of the run to create and add a dataset to. If `None`, the
            dataset will be added to the root butler.
        datasetType : ``DatasetType``, optional
            The dataset type of the added dataset. If `None`, will use the
            default dataset type.

        Returns
        -------
        datasetRef : `DatasetRef`
            A reference to the added dataset.
        """
        if run:
            # Ensure the requested run collection exists before the put.
            self.butler.collections.register(run)
        else:
            run = self._DEFAULT_RUN
        if datasetType is None:
            datasetType = self.datasetType
        metric = self._makeExampleMetrics()
        return self.butler.put(metric, datasetType, dataId, run=run)

427 

428 

@contextmanager
def mock_env(new_environment: dict[str, str]) -> Iterator[None]:
    """Context manager to clear the process environment variables, replace them
    with new values, and restore them at the end of the test.

    Parameters
    ----------
    new_environment : `dict` [`str`, `str`]
        New environment variable values.
    """
    # patch.dict snapshots os.environ, replaces it wholesale (clear=True),
    # and restores the snapshot on exit -- even if the body raises.
    environment_patcher = patch.dict(os.environ, new_environment, clear=True)
    with environment_patcher:
        yield