Coverage for python / lsst / ctrl / bps / report.py: 12%
62 statements
« prev ^ index » next coverage.py v7.13.5, created at 2026-04-28 08:49 +0000
« prev ^ index » next coverage.py v7.13.5, created at 2026-04-28 08:49 +0000
1# This file is part of ctrl_bps.
2#
3# Developed for the LSST Data Management System.
4# This product includes software developed by the LSST Project
5# (https://www.lsst.org).
6# See the COPYRIGHT file at the top-level directory of this distribution
7# for details of code ownership.
8#
9# This software is dual licensed under the GNU General Public License and also
10# under a 3-clause BSD license. Recipients may choose which of these licenses
11# to use; please see the files gpl-3.0.txt and/or bsd_license.txt,
12# respectively. If you choose the GPL option then the following text applies
13# (but note that there is still no warranty even if you opt for BSD instead):
14#
15# This program is free software: you can redistribute it and/or modify
16# it under the terms of the GNU General Public License as published by
17# the Free Software Foundation, either version 3 of the License, or
18# (at your option) any later version.
19#
20# This program is distributed in the hope that it will be useful,
21# but WITHOUT ANY WARRANTY; without even the implied warranty of
22# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
23# GNU General Public License for more details.
24#
25# You should have received a copy of the GNU General Public License
26# along with this program. If not, see <https://www.gnu.org/licenses/>.
28"""Supporting functions for reporting on runs submitted to a WMS.
30Note: Expectations are that future reporting effort will revolve around LSST
31oriented database tables.
32"""
34__all__ = ["display_report", "retrieve_report"]
36import logging
37import sys
38from collections.abc import Callable, Sequence
39from typing import TextIO
41from lsst.utils import doImportType
43from .bps_reports import DetailedRunReport, ExitCodesReport, SummaryRunReport
44from .wms_service import BaseWmsService, WmsRunReport, WmsStates
46_LOG = logging.getLogger(__name__)
def display_report(
    runs: list[WmsRunReport],
    messages: list[str],
    is_detailed: bool = False,
    is_global: bool = False,
    return_exit_codes: bool = False,
    file: TextIO | None = None,
) -> None:
    """Print out summary of jobs submitted for execution.

    Parameters
    ----------
    runs : `list` [`WmsRunReport`]
        Run reports to include in the summary.
    messages : `list` [`str`]
        Errors that happened during report retrieval and/or processing.
        Empty if no issues were encountered.

        When ``is_detailed`` is set, this list is extended in place with
        any messages produced while building the per-run reports.
    is_detailed : `bool`, optional
        If set, the function prints out a detailed report including statuses
        of each task in the workflow grouped by task labels. By default, only
        a brief summary of each run is displayed.
    is_global : `bool`, optional
        If set, a global run id(s) will be used when displaying the report.
        By default, the report will use local run id(s).

        Only applicable in the context of a WMS using distributed job queues
        (e.g., HTCondor).
    return_exit_codes : `bool`, optional
        If set, also print exit codes related to jobs with a non-success
        status. Defaults to False, which means that only the summary state
        is displayed. It is only consulted when ``is_detailed`` is set.

        Only applicable in the context of a WMS with associated
        handlers to return exit codes from jobs.
    file : `typing.TextIO`, optional
        File or file-like object to write the output to. If None (default),
        the output is written to whatever ``sys.stdout`` refers to at the
        time of the call.
    """
    # Resolve the default stream at call time rather than at import time so
    # that later rebinding of ``sys.stdout`` (e.g. output redirection) is
    # honored.
    if file is None:
        file = sys.stdout

    run_brief = SummaryRunReport(
        [
            ("X", "S"),
            ("STATE", "S"),
            ("%S", "S"),
            ("ID", "S"),
            ("OPERATOR", "S"),
            ("PROJECT", "S"),
            ("CAMPAIGN", "S"),
            ("PAYLOAD", "S"),
            ("RUN", "S"),
        ]
    )

    if is_detailed:
        # One integer column per WMS state plus the expected job count.
        detail_fields = [(" ", "S")] + [(state.name, "i") for state in WmsStates] + [("EXPECTED", "i")]
        run_report = DetailedRunReport(detail_fields)

        for run in runs:
            run_brief.add(run, use_global_id=is_global)

            run_report.add(run, use_global_id=is_global)
            if run_report.message:
                messages.append(run_report.message)

            print(run_brief, file=file)
            print("\n", file=file)
            print(f"Path: {run.path}", file=file)
            print(f"Global job id: {run.global_wms_id}", file=file)
            if run.specific_info:
                print(run.specific_info, file=file)
            print("\n", file=file)
            print(run_report, file=file)

            if return_exit_codes:
                exit_code_fields = [
                    (" ", "S"),
                    ("PAYLOAD ERROR COUNT", "i"),
                    ("PAYLOAD ERROR CODES", "S"),
                    ("INFRASTRUCTURE ERROR COUNT", "i"),
                    ("INFRASTRUCTURE ERROR CODES", "S"),
                ]
                run_exits_report = ExitCodesReport(exit_code_fields)
                run_exits_report.add(run, use_global_id=is_global)
                if run_exits_report.message:
                    messages.append(run_exits_report.message)
                print("\n", file=file)
                print(run_exits_report, file=file)
                run_exits_report.clear()

            # Each run is displayed on its own, so empty the reports before
            # moving on to the next one.
            run_brief.clear()
            run_report.clear()
    else:
        for run in runs:
            run_brief.add(run, use_global_id=is_global)
        run_brief.sort("ID")
        print(run_brief, file=file)

    if messages:
        # Drop repeated messages while keeping their first-seen order.
        uniques = list(dict.fromkeys(messages))
        print("\n".join(uniques), file=file)
        print("\n", file=file)
def retrieve_report(
    wms_service_fqn: str,
    *,
    run_id: str | None = None,
    user: str | None = None,
    hist: float | None = None,
    pass_thru: str | None = None,
    is_global: bool = False,
    return_exit_codes: bool = False,
    postprocessors: Sequence[Callable[[WmsRunReport], list[str]]] | None = None,
) -> tuple[list[WmsRunReport], list[str]]:
    """Retrieve summary of jobs submitted for execution.

    Parameters
    ----------
    wms_service_fqn : `str`
        Name of the WMS service class.
    run_id : `str`, optional
        A run id the report will be restricted to.
    user : `str`, optional
        A username the report will be restricted to.
    hist : `float`, optional
        Include runs from the given number of past days.
    pass_thru : `str`, optional
        A string to pass directly to the WMS service class.
    is_global : `bool`, optional
        If set, all available job queues will be queried for job information.
        Defaults to False which means that only a local job queue will be
        queried for information.

        Only applicable in the context of a WMS using distributed job queues
        (e.g., HTCondor).
    return_exit_codes : `bool`, optional
        If set, return exit codes related to jobs with a
        non-success status. Defaults to False, which means that only
        the summary state is returned.

        Only applicable in the context of a WMS with associated
        handlers to return exit codes from jobs.
    postprocessors : `collections.abc.Sequence` [callable], optional
        List of functions for "massaging" reports returned by the plugin. Each
        function must take one positional argument:

        - ``report``: run report (`lsst.ctrl.bps.WmsRunReport`)

        and return a list of warning messages (empty if there were none).
        Any callable is accepted; if it has no ``__name__`` attribute
        (e.g., a `functools.partial` object), its ``repr()`` is used to
        identify it in the warning messages.

        If None (default), each run report returned by the plugin (if any)
        will be returned as is.

    Returns
    -------
    reports : `list` [`WmsRunReport`]
        Run reports satisfying the search criteria.
    messages : `list` [`str`]
        Errors that happened during report retrieval and/or processing.
        Empty if no issues were encountered.

    Raises
    ------
    TypeError
        Raised if the WMS service class is not a subclass of BaseWmsService.
    """
    messages: list[str] = []

    wms_service_class = doImportType(wms_service_fqn)
    if not issubclass(wms_service_class, BaseWmsService):
        raise TypeError(
            f"Invalid WMS service class '{wms_service_fqn}'; must be a subclass of BaseWmsService"
        )
    # The service is instantiated with an empty dict as its sole argument.
    wms_service = wms_service_class({})

    reports, message = wms_service.report(
        wms_workflow_id=run_id,
        user=user,
        hist=hist,
        pass_thru=pass_thru,
        is_global=is_global,
        return_exit_codes=return_exit_codes,
    )
    if message:
        messages.append(message)

    if postprocessors:
        for report in reports:
            for postprocessor in postprocessors:
                warnings = postprocessor(report)
                if not warnings:
                    continue

                # Not every callable has ``__name__`` (e.g. functools.partial
                # objects or instances defining ``__call__``); fall back to
                # repr() so that reporting a warning can never fail itself.
                origin = getattr(postprocessor, "__name__", None)
                if origin is None:
                    origin = repr(postprocessor)

                messages.extend(
                    f"WARNING: Report may be incomplete. "
                    f"There was an issue with report postprocessing for '{report.wms_id}': "
                    f"{warning} (origin: {origin})"
                    for warning in warnings
                )

    return reports, messages