Coverage for tests / test_gatherResourceUsage.py: 35%
48 statements
« prev ^ index » next coverage.py v7.13.5, created at 2026-05-01 08:55 +0000
« prev ^ index » next coverage.py v7.13.5, created at 2026-05-01 08:55 +0000
1# This file is part of analysis_tools.
2#
3# Developed for the LSST Data Management System.
4# This product includes software developed by the LSST Project
5# (https://www.lsst.org).
6# See the COPYRIGHT file at the top-level directory of this distribution
7# for details of code ownership.
8#
9# This program is free software: you can redistribute it and/or modify
10# it under the terms of the GNU General Public License as published by
11# the Free Software Foundation, either version 3 of the License, or
12# (at your option) any later version.
13#
14# This program is distributed in the hope that it will be useful,
15# but WITHOUT ANY WARRANTY; without even the implied warranty of
16# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
17# GNU General Public License for more details.
18#
19# You should have received a copy of the GNU General Public License
20# along with this program. If not, see <https://www.gnu.org/licenses/>.
22from __future__ import annotations
24import os
25from unittest import main
27import numpy as np
28import pandas as pd
30import lsst.utils.tests
31from lsst.analysis.tools.tasks import (
32 ConsolidateResourceUsageConfig,
33 ConsolidateResourceUsageTask,
34 GatherResourceUsageConfig,
35 GatherResourceUsageTask,
36)
37from lsst.daf.butler import Butler, DatasetType, DeferredDatasetHandle
38from lsst.daf.butler.tests.utils import makeTestTempDir, removeTestTempDir
39from lsst.pipe.base import TaskMetadata
41TESTDIR = os.path.abspath(os.path.dirname(__file__))
43test_metadata_json = """{"metadata": {"quantum":
44 {"scalars":{"__version__":1},
45 "arrays": {"endMaxResidentSetSize":[1234567890],
46 "prepCpuTime":[200.0],
47 "initCpuTime":[201.0],
48 "startCpuTime":[203.0],
49 "endCpuTime":[228.0],
50 "prepUtc":["2025-01-08T22:20:00+00:00"],
51 "endUtc":["2025-01-08T22:20:45+00:00"]}}}}
52 """
55class TestGatherResourceUsage(lsst.utils.tests.TestCase):
56 """Tests for the GatherResourceUsage class and methods."""
58 def setUp(self):
59 self.root = makeTestTempDir(TESTDIR)
60 Butler.makeRepo(self.root)
61 self.butler = Butler(self.root, run="test_run")
63 def tearDown(self):
64 removeTestTempDir(self.root)
66 def test_gatherResourceUsage(self):
67 """Test extraction of resource usage from a TaskMetadata object."""
68 test_dataset_type = DatasetType(
69 "test_dataset_type", dimensions=[], universe=self.butler.dimensions, storageClass="TaskMetadata"
70 )
71 self.butler.registry.registerDatasetType(test_dataset_type)
72 task_metadata = TaskMetadata.model_validate_json(test_metadata_json)
73 ref = self.butler.put(task_metadata, test_dataset_type)
74 config = GatherResourceUsageConfig(dimensions=[])
75 gather_resource_usage_task = GatherResourceUsageTask(config=config)
76 input_metadata = [
77 DeferredDatasetHandle(butler=self.butler, ref=ref, storageClass="TaskMetadata", parameters=None)
78 ]
79 ru_output = gather_resource_usage_task.run(
80 universe=self.butler.dimensions, input_metadata=input_metadata
81 ).getDict()["output_table"]
82 self.assertFloatsAlmostEqual(ru_output["memory"].values[0], 1.23456789e09, atol=1e-3, rtol=1e-4)
83 self.assertFloatsAlmostEqual(ru_output["init_time"].values[0], 2.0, atol=1e-3, rtol=1e-4)
84 self.assertFloatsAlmostEqual(ru_output["run_time"].values[0], 25.0, atol=1e-3, rtol=1e-4)
85 self.assertFloatsAlmostEqual(ru_output["wall_time"].values[0], 45.0, atol=1e-3, rtol=1e-4)
87 def test_consolidateResourceUsage(self):
88 """Test that the expected columns and values are generated by the
89 ConsolidateResourceUsageTask"""
90 nrows = 101
91 ru_table = pd.DataFrame(
92 {
93 "memory": np.arange(nrows) * 1024.0**3,
94 "init_time": np.ones(nrows),
95 "run_time": np.arange(nrows),
96 "wall_time": np.arange(nrows),
97 }
98 )
99 config = ConsolidateResourceUsageConfig()
100 consolidate_resource_usage_task = ConsolidateResourceUsageTask(config=config)
101 consolidate_ru_output = consolidate_resource_usage_task.run(testTask_resource_usage=ru_table)
102 df = consolidate_ru_output.getDict()["output_table"]
103 self.assertEqual(df["quanta"][0], nrows)
104 self.assertAlmostEqual(df["integrated_runtime_hrs"][0], nrows * (nrows - 1) / 2 / 3600.0)
105 for prefix in ("mem_GB", "runtime_s", "walltime_s"):
106 for quantile in (1, 5, 32, 50, 68, 95, 99, 100):
107 column = f"{prefix}_p{quantile:03d}"
108 self.assertAlmostEqual(df[column][0], quantile)
111if __name__ == "__main__": 111 ↛ 112line 111 didn't jump to line 112 because the condition on line 111 was never true
112 lsst.utils.tests.init()
113 main()