Coverage for python / lsst / pipe / base / resource_usage.py: 56%
54 statements
« prev ^ index » next coverage.py v7.13.5, created at 2026-05-06 08:32 +0000
1# This file is part of pipe_base.
2#
3# Developed for the LSST Data Management System.
4# This product includes software developed by the LSST Project
5# (http://www.lsst.org).
6# See the COPYRIGHT file at the top-level directory of this distribution
7# for details of code ownership.
8#
9# This software is dual licensed under the GNU General Public License and also
10# under a 3-clause BSD license. Recipients may choose which of these licenses
11# to use; please see the files gpl-3.0.txt and/or bsd_license.txt,
12# respectively. If you choose the GPL option then the following text applies
13# (but note that there is still no warranty even if you opt for BSD instead):
14#
15# This program is free software: you can redistribute it and/or modify
16# it under the terms of the GNU General Public License as published by
17# the Free Software Foundation, either version 3 of the License, or
18# (at your option) any later version.
19#
20# This program is distributed in the hope that it will be useful,
21# but WITHOUT ANY WARRANTY; without even the implied warranty of
22# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
23# GNU General Public License for more details.
24#
25# You should have received a copy of the GNU General Public License
26# along with this program. If not, see <http://www.gnu.org/licenses/>.
28from __future__ import annotations
30__all__ = ("QuantumResourceUsage",)
32import datetime
34import numpy as np
35import pydantic
37from ._task_metadata import TaskMetadata
class QuantumResourceUsage(pydantic.BaseModel):
    """A struct holding the most frequently used resource usage metrics for
    executed quanta.
    """

    memory: float = pydantic.Field(json_schema_extra={"unit": "byte"})
    """Peak memory usage (bytes) of the process that executed this quantum.

    Derived from the maximum resident set size measured at the end of the
    quantum's execution.  Because that high-water mark only grows as more
    quanta run in the same process, this is a lower bound on the memory any
    single quantum actually used.
    """

    start: datetime.datetime
    """Start time (UTC), corresponding to the beginning of the "prep" period.
    """

    prep_time: float = pydantic.Field(json_schema_extra={"unit": "s"})
    """Wall-clock seconds spent preparing to execute this quantum.

    Covers checks for existing inputs and outputs, any clobbering, and the
    `..PipelineTaskConnections.adjustQuantum` hook; excludes process startup,
    import time, and butler initialization overheads.
    """

    init_time: float = pydantic.Field(json_schema_extra={"unit": "s"})
    """Wall-clock seconds spent constructing the task that ran this quantum,
    including the time spent reading init-inputs.
    """

    run_time: float = pydantic.Field(json_schema_extra={"unit": "s"})
    """Wall-clock seconds spent actually executing this quantum.

    Includes reading inputs and writing outputs; excludes writing the special
    log and metadata datasets.
    """

    run_time_cpu: float = pydantic.Field(json_schema_extra={"unit": "s"})
    """CPU seconds spent executing this quantum.

    Includes whatever CPU time input reads and output writes actually consume;
    excludes the writes of the special log and metadata datasets.
    """

    @property
    def total_time(self) -> float:
        """Total wall-clock time spent in "prep", "init", and "run"."""
        return sum((self.prep_time, self.init_time, self.run_time))

    @property
    def end(self) -> datetime.datetime:
        """End time (UTC), corresponding to the end of the "run" period."""
        elapsed = datetime.timedelta(seconds=self.total_time)
        return self.start + elapsed

    @classmethod
    def from_task_metadata(cls, metadata: TaskMetadata) -> QuantumResourceUsage | None:
        """Extract resource usage information from task metadata.

        Parameters
        ----------
        metadata : `TaskMetadata`
            Metadata written by
            `.single_quantum_executor.SingleQuantumExecutor`.

        Returns
        -------
        resource_usage : `QuantumResourceUsage` or `None`
            Resource usage information for this quantum, or `None` if the
            expected fields were not found.
        """
        try:
            quantum_metadata = metadata["quantum"]
        except KeyError:
            return None

        def _timestamp(key: str, fallback: datetime.datetime) -> datetime.datetime:
            # Intermediate timestamps may be absent (presumably older
            # metadata); fall back to the overall end time so the
            # corresponding interval collapses toward zero.
            try:
                return datetime.datetime.fromisoformat(quantum_metadata[key])
            except KeyError:
                return fallback

        end = datetime.datetime.fromisoformat(quantum_metadata["endUtc"])
        start = datetime.datetime.fromisoformat(quantum_metadata["prepUtc"])
        init_start = _timestamp("initUtc", end)
        run_start = _timestamp("startUtc", end)
        try:
            cpu_time = quantum_metadata["endCpuTime"] - quantum_metadata["startCpuTime"]
        except KeyError:
            # CPU counters missing entirely; report zero rather than failing.
            cpu_time = 0.0
        return cls(
            memory=quantum_metadata["endMaxResidentSetSize"],
            start=start,
            prep_time=(init_start - start).total_seconds(),
            init_time=(run_start - init_start).total_seconds(),
            run_time=(end - run_start).total_seconds(),
            run_time_cpu=cpu_time,
        )

    @staticmethod
    def get_numpy_fields() -> dict[str, np.dtype]:
        """Return a mapping from field name to the `numpy.dtype` for that
        field.
        """
        # "start" is stored as a POSIX timestamp and needs float64 precision;
        # the other metrics fit in float32.
        return {
            name: np.dtype(np.float64 if name == "start" else np.float32)
            for name in ("memory", "start", "prep_time", "init_time", "run_time", "run_time_cpu")
        }

    @staticmethod
    def get_units() -> dict[str, str | None]:
        """Return a mapping from field name to units.

        Units are astropy-compatible strings.
        """
        time_fields = ("start", "prep_time", "init_time", "run_time", "run_time_cpu")
        return {"memory": "byte", **dict.fromkeys(time_fields, "s")}

    def get_numpy_row(self) -> tuple[object, ...]:
        """Convert this object to a `tuple` that can be used to initialize a
        NumPy structured array with the fields from `get_numpy_fields`.
        """
        # NOTE(review): if `start` is timezone-naive, `timestamp()` interprets
        # it in local time rather than UTC — confirm that the metadata
        # timestamps carry tzinfo.
        values = {name: getattr(self, name) for name in self.get_numpy_fields()}
        values["start"] = self.start.timestamp()
        return tuple(values.values())