Coverage for python / lsst / daf / butler / tests / utils.py: 33%
121 statements
« prev ^ index » next coverage.py v7.13.5, created at 2026-04-14 23:37 +0000
1# This file is part of daf_butler.
2#
3# Developed for the LSST Data Management System.
4# This product includes software developed by the LSST Project
5# (http://www.lsst.org).
6# See the COPYRIGHT file at the top-level directory of this distribution
7# for details of code ownership.
8#
9# This software is dual licensed under the GNU General Public License and also
10# under a 3-clause BSD license. Recipients may choose which of these licenses
11# to use; please see the files gpl-3.0.txt and/or bsd_license.txt,
12# respectively. If you choose the GPL option then the following text applies
13# (but note that there is still no warranty even if you opt for BSD instead):
14#
15# This program is free software: you can redistribute it and/or modify
16# it under the terms of the GNU General Public License as published by
17# the Free Software Foundation, either version 3 of the License, or
18# (at your option) any later version.
19#
20# This program is distributed in the hope that it will be useful,
21# but WITHOUT ANY WARRANTY; without even the implied warranty of
22# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
23# GNU General Public License for more details.
24#
25# You should have received a copy of the GNU General Public License
26# along with this program. If not, see <http://www.gnu.org/licenses/>.
28from __future__ import annotations
30from unittest.mock import patch
32__all__ = ()
34import os
35import shutil
36import tempfile
37from collections.abc import Callable, Iterator, Sequence
38from contextlib import contextmanager
39from typing import TYPE_CHECKING, Any
41import astropy
42from astropy.table import Table as AstropyTable
44from lsst.resources import ResourcePathExpression
46from .. import Butler, ButlerConfig, Config, DatasetRef, StorageClassFactory, Timespan
47from .._collection_type import CollectionType
48from ..datastore import NullDatastore
49from ..dimensions import DimensionConfig
50from ..direct_butler import DirectButler
51from ..registry.sql_registry import RegistryConfig, SqlRegistry
52from ..tests import MetricsExample, addDatasetType
if TYPE_CHECKING:
    import unittest

    from lsst.daf.butler import DatasetType

    # For static analysis only: declaring the mixin base as a TestCase
    # subclass lets type checkers resolve the ``assert*`` methods that
    # mixin classes call on ``self``.
    class TestCaseMixin(unittest.TestCase):
        """Base class for mixin test classes that use TestCase methods."""

        pass

else:
    # At runtime the mixin base is an empty class, so mixins do not
    # actually inherit `unittest.TestCase` behavior during execution.
    class TestCaseMixin:
        """Do-nothing definition of mixin base class for regular execution."""

        pass
def makeTestTempDir(default_base: str) -> str:
    """Create a temporary directory for test usage.

    The directory will be created within ``DAF_BUTLER_TEST_TMP`` if that
    environment variable is set, falling back to ``default_base`` if it is
    not.

    Parameters
    ----------
    default_base : `str`
        Default parent directory.

    Returns
    -------
    dir : `str`
        Name of the new temporary directory.
    """
    # The environment variable, when set, overrides the requested parent.
    parent = os.environ.get("DAF_BUTLER_TEST_TMP", default_base)
    return tempfile.mkdtemp(dir=parent)
93def removeTestTempDir(root: str | None) -> None:
94 """Attempt to remove a temporary test directory, but do not raise if
95 unable to.
97 Unlike `tempfile.TemporaryDirectory`, this passes ``ignore_errors=True``
98 to ``shutil.rmtree`` at close, making it safe to use on NFS.
100 Parameters
101 ----------
102 root : `str`, optional
103 Name of the directory to be removed. If `None`, nothing will be done.
104 """
105 if root is not None and os.path.exists(root):
106 shutil.rmtree(root, ignore_errors=True)
@contextmanager
def safeTestTempDir(default_base: str) -> Iterator[str]:
    """Return a context manager that creates a temporary directory and then
    attempts to remove it.

    Parameters
    ----------
    default_base : `str`
        Default parent directory, forwarded to `makeTestTempDir`.

    Returns
    -------
    context : `contextlib.ContextManager`
        A context manager that returns the new directory name on ``__enter__``
        and removes the temporary directory (via `removeTestTempDir`) on
        ``__exit__``.
    """
    temp_dir = makeTestTempDir(default_base)
    try:
        yield temp_dir
    finally:
        # Cleanup is best-effort; removeTestTempDir never raises.
        removeTestTempDir(temp_dir)
def create_populated_sqlite_registry(
    *args: ResourcePathExpression,
    registry_config: RegistryConfig | None = None,
    dimension_config: DimensionConfig | None = None,
) -> Butler:
    """Create an in-memory registry-only sqlite butler and populate it.

    Parameters
    ----------
    *args : convertible to `lsst.resources.ResourcePath`
        Paths to export YAML files that should be imported.
    registry_config : ``RegistryConfig``, optional
        Registry configuration to use as the basis for the Butler
        configuration.
    dimension_config : ``DimensionConfig``, optional
        Dimension universe configuration.

    Returns
    -------
    butler : `Butler`
        New butler populated with the specified import files.
    """
    butler_config = ButlerConfig()
    if registry_config is not None:
        butler_config["registry"] = registry_config
    # Force an in-memory SQLite database regardless of what the base or
    # supplied registry configuration specified.
    butler_config[".registry.db"] = "sqlite://"
    sql_registry = SqlRegistry.createFromConfig(butler_config["registry"], dimension_config)
    # Registry-only butler: the datastore is a no-op placeholder.
    butler = DirectButler(
        config=butler_config,
        registry=sql_registry,
        datastore=NullDatastore(None, None),
        storageClasses=StorageClassFactory(),
    )
    for export_file in args:
        butler.import_(filename=export_file, without_datastore=True)
    return butler
class ButlerTestHelper:
    """Mixin with helpers for unit tests."""

    assertEqual: Callable
    assertIsInstance: Callable
    maxDiff: int | None

    def assertAstropyTablesEqual(
        self,
        tables: AstropyTable | Sequence[AstropyTable],
        expectedTables: AstropyTable | Sequence[AstropyTable],
        filterColumns: bool = False,
        unorderedRows: bool = False,
    ) -> None:
        """Verify that a list of astropy tables matches a list of expected
        astropy tables.

        Parameters
        ----------
        tables : `astropy.table.Table` or iterable [`astropy.table.Table`]
            The table or tables that should match the expected tables.
        expectedTables : `astropy.table.Table`
            or iterable [`astropy.table.Table`]
            The tables with expected values to which the tables under test will
            be compared.
        filterColumns : `bool`
            If `True` then only compare columns that exist in
            ``expectedTables``.
        unorderedRows : `bool`, optional
            If `True` (`False` is default), don't require tables to have their
            rows in the same order.
        """
        # Accept a bare table for either argument by wrapping it in a list.
        if isinstance(tables, AstropyTable):
            tables = [tables]
        if isinstance(expectedTables, AstropyTable):
            expectedTables = [expectedTables]
        self.assertEqual(len(tables), len(expectedTables))
        for actual, wanted in zip(tables, expectedTables, strict=True):
            # Sanity-check the inputs really are astropy tables.
            self.assertIsInstance(actual, AstropyTable)
            self.assertIsInstance(wanted, AstropyTable)
            if filterColumns:
                # Work on a copy so the caller's table is not modified.
                actual = actual.copy()
                actual.keep_columns(wanted.colnames)
            if unorderedRows:
                # Sort both copies so row order does not affect the result.
                actual = actual.copy()
                actual.sort(actual.colnames)
                wanted = wanted.copy()
                wanted.sort(wanted.colnames)
            # Compare the formatted text lines rather than the tables
            # themselves (recommendation from Astropy Slack); this does not
            # compare column data types.
            actual_lines = actual.pformat()
            wanted_lines = wanted.pformat()
            saved_max_diff = self.maxDiff
            self.maxDiff = None  # This is required to get the full diff.
            try:
                self.assertEqual(actual_lines, wanted_lines, f"Table:\n{actual}\n\nvs Expected:\n{wanted}")
            finally:
                self.maxDiff = saved_max_diff
def readTable(textTable: str) -> AstropyTable:
    """Read an astropy table from formatted text.

    Contains formatting that causes the astropy table to print an empty string
    instead of "--" for missing/unpopulated values in the text table.

    Parameters
    ----------
    textTable : `str`
        The text version of the table to read.

    Returns
    -------
    table : `astropy.table.Table`
        The table as an astropy table.
    """
    table = AstropyTable.read(
        textTable,
        format="ascii",
        # The first two rows are the header line and its underline row.
        data_start=2,
        # Render missing values as empty strings rather than "--".
        fill_values=[("", 0, "")],
    )
    return table
class MetricTestRepo:
    """Create and manage a test repository on disk with datasets that
    may be queried and modified for unit tests.

    Parameters
    ----------
    root : `str`
        The location of the repository, to pass to ``Butler.makeRepo``.
    configFile : `str`
        The path to the config file, to pass to ``Butler.makeRepo``.
    forceConfigRoot : `bool`, optional
        If `False`, any values present in the supplied ``config`` that
        would normally be reset are not overridden and will appear
        directly in the output config. Passed to ``Butler.makeRepo``.
    storageClassName : `str` or `None`, optional
        Name of storage class to use for datasets added to the test repository.
        A default will be used if none is specified.
    """

    METRICS_EXAMPLE_SUMMARY = {"AM1": 5.2, "AM2": 30.6}
    """The summary data included in ``MetricsExample`` objects stored in the
    test repo.
    """

    # Default collection names and storage class used when callers do not
    # supply their own.
    _DEFAULT_RUN = "ingest/run"
    _DEFAULT_TAG = "ingest"
    _DEFAULT_STORAGE_CLASS = "StructuredCompositeReadComp"

    @staticmethod
    def _makeExampleMetrics() -> MetricsExample:
        """Make an object to put into the repository."""
        return MetricsExample(
            MetricTestRepo.METRICS_EXAMPLE_SUMMARY,
            {"a": [1, 2, 3], "b": {"blue": 5, "red": "green"}},
            [563, 234, 456.7, 752, 8, 9, 27],
        )

    def __init__(
        self,
        root: str,
        configFile: str,
        forceConfigRoot: bool = True,
        storageClassName: str | None = None,
    ) -> None:
        self.root = root
        # Create a new repository on disk, then open a butler on it that
        # writes to the default run and reads from the default tag.
        butlerConfigFile = Butler.makeRepo(
            self.root, config=Config(configFile), forceConfigRoot=forceConfigRoot
        )
        butler = Butler.from_config(butlerConfigFile, run=self._DEFAULT_RUN, collections=[self._DEFAULT_TAG])
        self._do_init(butler, butlerConfigFile, storageClassName)

    @classmethod
    def create_from_butler(
        cls, butler: Butler, butler_config_file: str | Config, storageClassName: str | None = None
    ) -> MetricTestRepo:
        """Create a MetricTestRepo from an existing Butler instance.

        Parameters
        ----------
        butler : `Butler`
            `Butler` instance used for setting up the repository.
        butler_config_file : `str` or `Config`
            Path to the config file or the `Config` instance used to set up
            that Butler instance.
        storageClassName : `str`, optional
            Name of storage class to use for datasets added to the test
            repository. A default will be used if none is specified.

        Returns
        -------
        repo : `MetricTestRepo`
            New instance of `MetricTestRepo` using the provided `Butler`
            instance.
        """
        # Bypass __init__ (which would create a new repo on disk) and
        # instead clone the supplied butler with our default collections.
        self = cls.__new__(cls)
        butler = butler.clone(run=self._DEFAULT_RUN, collections=[self._DEFAULT_TAG])
        self._do_init(butler, butler_config_file, storageClassName)
        return self

    def _do_init(
        self, butler: Butler, butlerConfigFile: str | Config, storageClassName: str | None = None
    ) -> None:
        # Shared initialization used by both __init__ and
        # create_from_butler: registers collections, the dataset type, the
        # dimension records, and two example datasets.
        self.butler = butler
        self.storageClassFactory = StorageClassFactory()
        self.storageClassFactory.addFromConfig(butlerConfigFile)

        # New datasets will be added to run and tag, but we will only look in
        # tag when looking up datasets.
        self.butler.collections.register(self._DEFAULT_TAG, CollectionType.TAGGED)

        if storageClassName is None:
            storageClassName = self._DEFAULT_STORAGE_CLASS

        # Create and register a DatasetType
        self.datasetType = addDatasetType(
            self.butler, "test_metric_comp", {"instrument", "visit"}, storageClassName
        )

        # Add needed Dimensions: one instrument, one physical filter, one
        # day_obs, one visit_system, and two visits (423 with an explicit
        # timespan, 424 without).
        self.butler.registry.insertDimensionData("instrument", {"name": "DummyCamComp"})
        self.butler.registry.insertDimensionData(
            "physical_filter", {"instrument": "DummyCamComp", "name": "d-r", "band": "R"}
        )
        self.butler.registry.insertDimensionData("day_obs", {"instrument": "DummyCamComp", "id": 20200101})
        self.butler.registry.insertDimensionData(
            "visit_system", {"instrument": "DummyCamComp", "id": 1, "name": "default"}
        )
        visitStart = astropy.time.Time("2020-01-01 08:00:00.123456789", scale="tai")
        visitEnd = astropy.time.Time("2020-01-01 08:00:36.66", scale="tai")
        self.butler.registry.insertDimensionData(
            "visit",
            dict(
                instrument="DummyCamComp",
                id=423,
                name="fourtwentythree",
                physical_filter="d-r",
                timespan=Timespan(visitStart, visitEnd),
                day_obs=20200101,
            ),
        )
        self.butler.registry.insertDimensionData(
            "visit",
            dict(
                instrument="DummyCamComp",
                id=424,
                name="fourtwentyfour",
                physical_filter="d-r",
                day_obs=20200101,
            ),
        )

        # Store one example metric per visit for tests to query.
        self.ref1 = self.addDataset({"instrument": "DummyCamComp", "visit": 423})
        self.ref2 = self.addDataset({"instrument": "DummyCamComp", "visit": 424})

    def addDataset(
        self, dataId: dict[str, Any], run: str | None = None, datasetType: DatasetType | None = None
    ) -> DatasetRef:
        """Create a new example metric and add it to the named run with the
        given dataId.

        Overwrites tags, so this does not try to associate the new dataset with
        existing tags. (If/when tags are needed this can be added to the
        arguments of this function.)

        Parameters
        ----------
        dataId : `dict`
            The dataId for the new metric.
        run : `str`, optional
            The name of the run to create and add a dataset to. If `None`, the
            dataset will be added to the root butler.
        datasetType : ``DatasetType``, optional
            The dataset type of the added dataset. If `None`, will use the
            default dataset type.

        Returns
        -------
        datasetRef : `DatasetRef`
            A reference to the added dataset.
        """
        # A caller-supplied run is registered first; otherwise fall back to
        # the default run created at init time.
        if run:
            self.butler.collections.register(run)
        else:
            run = self._DEFAULT_RUN
        metric = self._makeExampleMetrics()
        return self.butler.put(
            metric, self.datasetType if datasetType is None else datasetType, dataId, run=run
        )
@contextmanager
def mock_env(new_environment: dict[str, str]) -> Iterator[None]:
    """Context manager to clear the process environment variables, replace them
    with new values, and restore them at the end of the test.

    Parameters
    ----------
    new_environment : `dict` [`str`, `str`]
        New environment variable values.
    """
    # patch.dict with clear=True empties os.environ, installs the supplied
    # values, and restores the original mapping when stopped.
    patcher = patch.dict(os.environ, new_environment, clear=True)
    patcher.start()
    try:
        yield
    finally:
        patcher.stop()