Coverage for tests / wms_test_utils.py: 63%

62 statements  

« prev     ^ index     » next       coverage.py v7.13.5, created at 2026-04-14 23:52 +0000

1# This file is part of ctrl_bps. 

2# 

3# Developed for the LSST Data Management System. 

4# This product includes software developed by the LSST Project 

5# (https://www.lsst.org). 

6# See the COPYRIGHT file at the top-level directory of this distribution 

7# for details of code ownership. 

8# 

9# This software is dual licensed under the GNU General Public License and also 

10# under a 3-clause BSD license. Recipients may choose which of these licenses 

11# to use; please see the files gpl-3.0.txt and/or bsd_license.txt, 

12# respectively. If you choose the GPL option then the following text applies 

13# (but note that there is still no warranty even if you opt for BSD instead): 

14# 

15# This program is free software: you can redistribute it and/or modify 

16# it under the terms of the GNU General Public License as published by 

17# the Free Software Foundation, either version 3 of the License, or 

18# (at your option) any later version. 

19# 

20# This program is distributed in the hope that it will be useful, 

21# but WITHOUT ANY WARRANTY; without even the implied warranty of 

22# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the 

23# GNU General Public License for more details. 

24# 

25# You should have received a copy of the GNU General Public License 

26# along with this program. If not, see <https://www.gnu.org/licenses/>. 

27import dataclasses 

28import logging 

29 

30from lsst.ctrl.bps.wms_service import BaseWmsService, WmsJobReport, WmsRunReport, WmsStates 

31 

# Module-level logger used by the stub WMS service classes in this file.
_LOG = logging.getLogger(__name__)


# Canned run report: a single job labelled "foo" that succeeded.  Both
# per-state count mappings hold 1 for SUCCEEDED and 0 for every other state;
# they are built separately so the two dictionaries are distinct objects.
TEST_REPORT = WmsRunReport(
    wms_id="1.0",
    global_wms_id="foo#1.0",
    path="/path/to/run",
    label="label",
    run="run",
    project="dev",
    campaign="testing",
    payload="test",
    operator="tester",
    run_summary="foo:1",
    state=WmsStates.SUCCEEDED,
    jobs=[
        WmsJobReport(
            wms_id="1.0",
            name="",
            label="foo",
            state=WmsStates.SUCCEEDED,
        ),
    ],
    total_number_jobs=1,
    job_state_counts={state: int(state == WmsStates.SUCCEEDED) for state in WmsStates},
    job_summary={
        "foo": {state: int(state == WmsStates.SUCCEEDED) for state in WmsStates},
    },
)

54 

55 

class WmsServiceSuccess(BaseWmsService):
    """WMS service class with working ping, report, and get_status.

    Every method ignores its arguments (apart from ``pass_thru`` in
    `ping`, which is only logged) and returns a fixed, successful result.
    """

    def ping(self, pass_thru):
        """Log the pass-through value and report success.

        Parameters
        ----------
        pass_thru : `~typing.Any`
            Value interpolated into the INFO log message.

        Returns
        -------
        status : `int`
            Always 0.
        message : `str`
            Always an empty string.
        """
        _LOG.info(f"Success {pass_thru}")
        return 0, ""

    def report(
        self,
        wms_workflow_id=None,
        user=None,
        hist=0,
        pass_thru=None,
        is_global=False,
        return_exit_codes=False,
    ):
        """Return an independent copy of ``TEST_REPORT``.

        All parameters are accepted for signature compatibility and are
        otherwise unused.

        Returns
        -------
        reports : `list`
            One-element list holding a deep copy of ``TEST_REPORT``.
        extra : `list`
            Always an empty list.
        """
        # Imported locally so this method does not rely on a module-level
        # import being present.
        import copy

        # ``dataclasses.replace`` with no changes only makes a shallow copy,
        # which would leave the ``jobs`` list and the per-state count
        # dictionaries shared with the module-level fixture.  A deep copy
        # lets callers mutate the returned report without affecting
        # ``TEST_REPORT`` (and therefore other tests).
        return [copy.deepcopy(TEST_REPORT)], []

    def get_status(
        self,
        wms_workflow_id,
        hist=0,
        pass_thru=None,
        is_global=False,
    ):
        """Return a fixed SUCCEEDED status with an empty message.

        All parameters are accepted for signature compatibility and are
        otherwise unused.
        """
        return WmsStates.SUCCEEDED, ""

83 

84 

class WmsServiceFailure(BaseWmsService):
    """WMS service stand-in whose ping does not work and whose
    get_status describes a failed run.
    """

    def ping(self, pass_thru):
        """Log a warning and return a non-zero status with an error text.

        ``pass_thru`` is not used.
        """
        _LOG.warning("service failure")
        status, message = 64, "Couldn't contact service X"
        return status, message

    def report(
        self,
        wms_workflow_id=None,
        user=None,
        hist=0,
        pass_thru=None,
        is_global=False,
        return_exit_codes=False,
    ):
        """Return one default-constructed run report and an empty list.

        None of the parameters are used.
        """
        return [WmsRunReport()], []

    def get_status(
        self,
        wms_workflow_id,
        hist=0,
        pass_thru=None,
        is_global=False,
    ):
        """Return a fixed FAILED state together with a dummy message.

        None of the parameters are used.
        """
        failed_state = WmsStates.FAILED
        return failed_state, "Dummy error message."

114 

115 

class WmsServicePassThru(BaseWmsService):
    """WMS service stand-in whose ping echoes its argument back."""

    def ping(self, pass_thru):
        """Log ``pass_thru`` at INFO level and return it as the message.

        Returns
        -------
        status : `int`
            Always 0.
        message
            The ``pass_thru`` argument, unchanged.
        """
        outcome = (0, pass_thru)
        _LOG.info(pass_thru)
        return outcome

122 

123 

class WmsServiceDefault(BaseWmsService):
    """WMS service stand-in providing a default ping and get_status."""

    def ping(self, pass_thru):
        """Log the pass-through value and return ``(0, "default")``."""
        log_text = f"DEFAULT {pass_thru}"
        _LOG.info(log_text)
        return 0, "default"

    def get_status(
        self,
        wms_workflow_id=None,
        hist=0,
        pass_thru=None,
        is_global=False,
    ):
        """Return a fixed RUNNING state with an empty message.

        None of the parameters are used.
        """
        running_state = WmsStates.RUNNING
        return running_state, ""

139 

140 

class WmsServiceFromCmdline(BaseWmsService):
    """WMS service stand-in whose default settings carry the value
    ``cmdline``.
    """

    @property
    def defaults(self):
        """Freshly built mapping of default settings."""
        settings = {"corge": "cmdline"}
        return settings

    @property
    def defaults_path(self):
        """Path string associated with this class's defaults."""
        location = "/wms/class/from/cmdline"
        return location

151 

152 

class WmsServiceFromConfig(BaseWmsService):
    """WMS service stand-in whose default settings carry the value
    ``config``.
    """

    @property
    def defaults(self):
        """Freshly built mapping of default settings."""
        settings = {"corge": "config"}
        return settings

    @property
    def defaults_path(self):
        """Path string associated with this class's defaults."""
        location = "/wms/class/from/config"
        return location

163 

164 

class WmsServiceFromEnv(BaseWmsService):
    """WMS service stand-in whose default settings carry the value
    ``env``.
    """

    @property
    def defaults(self):
        """Freshly built mapping of default settings."""
        settings = {"corge": "env"}
        return settings

    @property
    def defaults_path(self):
        """Path string associated with this class's defaults."""
        location = "/wms/class/from/env"
        return location

175 

176 

class WmsServiceFromDefaults(BaseWmsService):
    """WMS service stand-in whose default settings carry the value
    ``defaults``.
    """

    @property
    def defaults(self):
        """Freshly built mapping of default settings."""
        settings = {"corge": "defaults"}
        return settings

    @property
    def defaults_path(self):
        """Path string associated with this class's defaults."""
        location = "/wms/class/from/defaults"
        return location

187 

188 

class WmsServiceInvalid:
    """Plain class that does not derive from BaseWmsService."""