Coverage for python / lsst / ctrl / bps / report.py: 12%

62 statements  

« prev     ^ index     » next       coverage.py v7.13.5, created at 2026-04-24 08:21 +0000

1# This file is part of ctrl_bps. 

2# 

3# Developed for the LSST Data Management System. 

4# This product includes software developed by the LSST Project 

5# (https://www.lsst.org). 

6# See the COPYRIGHT file at the top-level directory of this distribution 

7# for details of code ownership. 

8# 

9# This software is dual licensed under the GNU General Public License and also 

10# under a 3-clause BSD license. Recipients may choose which of these licenses 

11# to use; please see the files gpl-3.0.txt and/or bsd_license.txt, 

12# respectively. If you choose the GPL option then the following text applies 

13# (but note that there is still no warranty even if you opt for BSD instead): 

14# 

15# This program is free software: you can redistribute it and/or modify 

16# it under the terms of the GNU General Public License as published by 

17# the Free Software Foundation, either version 3 of the License, or 

18# (at your option) any later version. 

19# 

20# This program is distributed in the hope that it will be useful, 

21# but WITHOUT ANY WARRANTY; without even the implied warranty of 

22# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the 

23# GNU General Public License for more details. 

24# 

25# You should have received a copy of the GNU General Public License 

26# along with this program. If not, see <https://www.gnu.org/licenses/>. 

27 

28"""Supporting functions for reporting on runs submitted to a WMS. 

29 

30Note: Expectations are that future reporting effort will revolve around LSST 

31oriented database tables. 

32""" 

33 

34__all__ = ["display_report", "retrieve_report"] 

35 

36import logging 

37import sys 

38from collections.abc import Callable, Sequence 

39from typing import TextIO 

40 

41from lsst.utils import doImportType 

42 

43from .bps_reports import DetailedRunReport, ExitCodesReport, SummaryRunReport 

44from .wms_service import BaseWmsService, WmsRunReport, WmsStates 

45 

46_LOG = logging.getLogger(__name__) 

47 

48 

49def display_report( 

50 runs: list[WmsRunReport], 

51 messages: list[str], 

52 is_detailed: bool = False, 

53 is_global: bool = False, 

54 return_exit_codes: bool = False, 

55 file: TextIO = sys.stdout, 

56) -> None: 

57 """Print out summary of jobs submitted for execution. 

58 

59 Parameters 

60 ---------- 

61 runs : `list` [`str`] 

62 Runs to include in the summary. 

63 messages : `list` [`str`] 

64 Errors that happened during report and/or processing. Empty if 

65 no issues were encountered. 

66 is_detailed : `bool`, optional 

67 If set, the function prints out a detailed report including statuses 

68 of each task in the workflow grouped by task labels. By default, only 

69 a brief summary of each run is displayed. 

70 is_global : `bool`, optional 

71 If set, a global run id(s) will be used when displaying the report. 

72 By default, the report will use local run id(s). 

73 

74 Only applicable in the context of a WMS using distributed job queues 

75 (e.g., HTCondor). 

76 return_exit_codes : `bool`, optional 

77 If set, return exit codes related to jobs with a 

78 non-success status. Defaults to False, which means that only 

79 the summary state is returned. 

80 

81 Only applicable in the context of a WMS with associated 

82 handlers to return exit codes from jobs. 

83 file : TextIO 

84 File or file-like object to write the output to. 

85 """ 

86 run_brief = SummaryRunReport( 

87 [ 

88 ("X", "S"), 

89 ("STATE", "S"), 

90 ("%S", "S"), 

91 ("ID", "S"), 

92 ("OPERATOR", "S"), 

93 ("PROJECT", "S"), 

94 ("CAMPAIGN", "S"), 

95 ("PAYLOAD", "S"), 

96 ("RUN", "S"), 

97 ] 

98 ) 

99 

100 if is_detailed: 

101 fields = [(" ", "S")] + [(state.name, "i") for state in WmsStates] + [("EXPECTED", "i")] 

102 run_report = DetailedRunReport(fields) 

103 

104 for run in runs: 

105 run_brief.add(run, use_global_id=is_global) 

106 

107 run_report.add(run, use_global_id=is_global) 

108 if run_report.message: 

109 messages.append(run_report.message) 

110 

111 print(run_brief, file=file) 

112 print("\n", file=file) 

113 print(f"Path: {run.path}", file=file) 

114 print(f"Global job id: {run.global_wms_id}", file=file) 

115 if run.specific_info: 

116 print(run.specific_info, file=file) 

117 print("\n", file=file) 

118 print(run_report, file=file) 

119 

120 if return_exit_codes: 

121 fields = [ 

122 (" ", "S"), 

123 ("PAYLOAD ERROR COUNT", "i"), 

124 ("PAYLOAD ERROR CODES", "S"), 

125 ("INFRASTRUCTURE ERROR COUNT", "i"), 

126 ("INFRASTRUCTURE ERROR CODES", "S"), 

127 ] 

128 run_exits_report = ExitCodesReport(fields) 

129 run_exits_report.add(run, use_global_id=is_global) 

130 if run_exits_report.message: 

131 messages.append(run_exits_report.message) 

132 print("\n", file=file) 

133 print(run_exits_report, file=file) 

134 run_exits_report.clear() 

135 

136 run_brief.clear() 

137 run_report.clear() 

138 else: 

139 for run in runs: 

140 run_brief.add(run, use_global_id=is_global) 

141 run_brief.sort("ID") 

142 print(run_brief, file=file) 

143 

144 if messages: 

145 uniques = list(dict.fromkeys(messages)) 

146 print("\n".join(uniques), file=file) 

147 print("\n", file=file) 

148 

149 

150def retrieve_report( 

151 wms_service_fqn: str, 

152 *, 

153 run_id: str | None = None, 

154 user: str | None = None, 

155 hist: float | None = None, 

156 pass_thru: str | None = None, 

157 is_global: bool = False, 

158 return_exit_codes: bool = False, 

159 postprocessors: Sequence[Callable[[WmsRunReport], list[str]]] | None = None, 

160) -> tuple[list[WmsRunReport], list[str]]: 

161 """Retrieve summary of jobs submitted for execution. 

162 

163 Parameters 

164 ---------- 

165 wms_service_fqn : `str` 

166 Name of the WMS service class. 

167 run_id : `str`, optional 

168 A run id the report will be restricted to. 

169 user : `str`, optional 

170 A username the report will be restricted to. 

171 hist : `float`, optional 

172 Include runs from the given number of past days. 

173 pass_thru : `str`, optional 

174 A string to pass directly to the WMS service class. 

175 is_global : `bool`, optional 

176 If set, all available job queues will be queried for job information. 

177 Defaults to False which means that only a local job queue will be 

178 queried for information. 

179 

180 Only applicable in the context of a WMS using distributed job queues 

181 (e.g., HTCondor). 

182 return_exit_codes : `bool`, optional 

183 If set, return exit codes related to jobs with a 

184 non-success status. Defaults to False, which means that only 

185 the summary state is returned. 

186 

187 Only applicable in the context of a WMS with associated 

188 handlers to return exit codes from jobs. 

189 postprocessors : `collections.abc.Sequence` [callable], optional 

190 List of functions for "massaging" reports returned by the plugin. Each 

191 function must take one positional argument: 

192 

193 - ``report``: run report (`lsst.ctrl.bps.WmsRunReport`) 

194 

195 If None (default), each run report returned by the plugin (if any) 

196 will be returned as is. 

197 

198 Returns 

199 ------- 

200 reports : `list` [`WmsRunReport`] 

201 Run reports satisfying the search criteria. 

202 messages : `list` [`str`] 

203 Errors that happened during report retrieval and/or processing. 

204 Empty if no issues were encountered. 

205 

206 Raises 

207 ------ 

208 TypeError 

209 Raised if the WMS service class is not a subclass of BaseWmsService. 

210 """ 

211 messages: list[str] = [] 

212 

213 wms_service_class = doImportType(wms_service_fqn) 

214 if not issubclass(wms_service_class, BaseWmsService): 

215 raise TypeError( 

216 f"Invalid WMS service class '{wms_service_fqn}'; must be a subclass of BaseWmsService" 

217 ) 

218 wms_service = wms_service_class({}) 

219 

220 reports, message = wms_service.report( 

221 wms_workflow_id=run_id, 

222 user=user, 

223 hist=hist, 

224 pass_thru=pass_thru, 

225 is_global=is_global, 

226 return_exit_codes=return_exit_codes, 

227 ) 

228 if message: 

229 messages.append(message) 

230 

231 if postprocessors: 

232 for report in reports: 

233 for postprocessor in postprocessors: 

234 if warnings := postprocessor(report): 

235 for warning in warnings: 

236 messages.append( 

237 f"WARNING: Report may be incomplete. " 

238 f"There was an issue with report postprocessing for '{report.wms_id}': " 

239 f"{warning} (origin: {postprocessor.__name__})" 

240 ) 

241 

242 return reports, messages