Coverage for python / lsst / ctrl / bps / wms_service.py: 74%

137 statements  

« prev     ^ index     » next       coverage.py v7.13.5, created at 2026-04-30 08:53 +0000

1# This file is part of ctrl_bps. 

2# 

3# Developed for the LSST Data Management System. 

4# This product includes software developed by the LSST Project 

5# (https://www.lsst.org). 

6# See the COPYRIGHT file at the top-level directory of this distribution 

7# for details of code ownership. 

8# 

9# This software is dual licensed under the GNU General Public License and also 

10# under a 3-clause BSD license. Recipients may choose which of these licenses 

11# to use; please see the files gpl-3.0.txt and/or bsd_license.txt, 

12# respectively. If you choose the GPL option then the following text applies 

13# (but note that there is still no warranty even if you opt for BSD instead): 

14# 

15# This program is free software: you can redistribute it and/or modify 

16# it under the terms of the GNU General Public License as published by 

17# the Free Software Foundation, either version 3 of the License, or 

18# (at your option) any later version. 

19# 

20# This program is distributed in the hope that it will be useful, 

21# but WITHOUT ANY WARRANTY; without even the implied warranty of 

22# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the 

23# GNU General Public License for more details. 

24# 

25# You should have received a copy of the GNU General Public License 

26# along with this program. If not, see <https://www.gnu.org/licenses/>. 

27 

"""Base classes and report containers for working with a specific WMS."""

# Public names of this module, i.e. what ``from <module> import *`` exposes.
__all__ = [
    "BaseWmsService",
    "BaseWmsWorkflow",
    "WmsJobReport",
    "WmsRunReport",
    "WmsSpecificInfo",
    "WmsStates",
]

38 

39 

40import dataclasses 

41import logging 

42from abc import ABCMeta, abstractmethod 

43from enum import Enum 

44from typing import Any 

45 

# Module-level logger, named after the module it is defined in.
_LOG = logging.getLogger(__name__)

47 

48 

class WmsStates(Enum):
    """Execution states a run or a job can be in."""

    # The numeric values double as exit codes of ``bps status``. Apart from
    # SUCCEEDED (which is 0) they start at 10 in order to stay clear of
    # the exit codes used by click itself (e.g., 2 for a bad command line).

    UNKNOWN = 10
    """The state could not be determined."""

    MISFIT = 11
    """The state was determined, but it matches none of the other states."""

    UNREADY = 12
    """Parents have not finished yet."""

    READY = 13
    """Every parent has finished successfully."""

    PENDING = 14
    """Ready to run and visible in the batch queue."""

    RUNNING = 15
    """Running right now."""

    DELETED = 16
    """Already deleted or in the process of being deleted."""

    HELD = 17
    """Being held."""

    SUCCEEDED = 0
    """Completed with a success status."""

    FAILED = 19
    """Completed with a non-success status."""

    PRUNED = 20
    """One or more parents failed or can't be run."""

88 

89 

90class WmsSpecificInfo: 

91 """Class representing WMS specific information. 

92 

93 Each piece of information is split into two parts: a template and 

94 a context. The template is a string that can contain literal text and/or 

95 *named* replacement fields delimited by braces ``{}``. The context is 

96 a mapping between the names, corresponding to the replacement fields 

97 in the template, and their values. 

98 

99 To produce a human-readable representation of the information, e.g., for 

100 logging purposes, it needs to be rendered first to combine these two parts. 

101 On the other hand, the context alone might be sufficient if the provided 

102 information is being ingested to a database. 

103 """ 

104 

105 def __init__(self) -> None: 

106 self._context: dict[str, Any] = {} 

107 self._templates: list[str] = [] 

108 

109 def __bool__(self) -> bool: 

110 return bool(self._templates) 

111 

112 def __str__(self) -> str: 

113 lines = [] 

114 for template in self._templates: 

115 lines.append(template.format_map(self._context)) 

116 return "\n".join(lines) 

117 

118 @property 

119 def context(self) -> dict[str, Any]: 

120 """The context that will be used to render the information. 

121 

122 Returns 

123 ------- 

124 context : `dict` [`str`, `~typing.Any`] 

125 A copy of the dictionary representing the mapping between 

126 *every* template variable and its value. 

127 

128 Notes 

129 ----- 

130 The property returns a *shallow* copy of the dictionary representing 

131 the context as the intended purpose of the `WmsSpecificInfo` is to 

132 pass a small number of brief messages from WMS to BPS reporting 

133 subsystem. Hence, it is assumed that the dictionary will only contain 

134 immutable objects (e.g. strings, numbers). 

135 """ 

136 return self._context.copy() 

137 

138 @property 

139 def templates(self) -> list[str]: 

140 """The list of templates that will be used to render the information. 

141 

142 Returns 

143 ------- 

144 templates : `list` [`str`] 

145 A copy of the complete list of the message templates in order 

146 in which the messages were added. 

147 """ 

148 return self._templates.copy() 

149 

150 def add_message(self, template: str, context: dict[str, Any] | None = None, **kwargs) -> None: 

151 """Add a message to the WMS information. 

152 

153 If keyword arguments are specified, the passed context is then updated 

154 with those key/value pairs. 

155 

156 Parameters 

157 ---------- 

158 template : `str` 

159 A message template. 

160 context : `dict` [`str`, `~typing.Any`], optional 

161 A mapping between template variables and their values. 

162 **kwargs 

163 Additional keyword arguments. 

164 

165 Raises 

166 ------ 

167 ValueError 

168 Raised if the message can't be rendered due to errors in either 

169 the template, the context, or both. 

170 """ 

171 ctx: dict[str, Any] = {} 

172 if context is not None: 

173 ctx |= context 

174 ctx.update(kwargs) 

175 

176 # Test that given context has all of the values needed for the given 

177 # template. 

178 try: 

179 template.format_map(ctx) 

180 except Exception as exc: 

181 raise ValueError(f"Adding template '{template}' with context '{ctx}' failed") from exc 

182 

183 # Check if the given context does not change values of the already 

184 # existing fields. 

185 common_fields = set(self._context) & set(ctx) 

186 conflicts = [field for field in common_fields if self._context[field] != ctx[field]] 

187 if conflicts: 

188 raise ValueError( 

189 f"Adding template '{template}' with context '{ctx}' failed:" 

190 f"change of value detected for field(s): {', '.join(conflicts)}" 

191 ) 

192 

193 self._context.update(ctx) 

194 self._templates.append(template) 

195 

196 

@dataclasses.dataclass(slots=True)
class WmsJobReport:
    """Information about a single WMS job shown in the detailed report."""

    wms_id: str
    """Id given to the job by the workflow management system."""

    name: str
    """Job name; assigned automatically by BPS."""

    label: str
    """User-facing job label. It may be shared by multiple jobs."""

    state: WmsStates
    """Current execution state of the job."""

212 

213 

@dataclasses.dataclass(slots=True)
class WmsRunReport:
    """Information about a WMS run shown in the detailed report.

    Every field is optional and defaults to `None`.
    """

    wms_id: str | None = None
    """Id the WMS assigned to the run."""

    global_wms_id: str | None = None
    """Global identification number of the run.

    It is applicable only for a WMS which uses distributed job queues
    (e.g., HTCondor).
    """

    path: str | None = None
    """Submit directory path."""

    label: str | None = None
    """Label of the run."""

    run: str | None = None
    """Name of the run."""

    project: str | None = None
    """Name of the project the run belongs to."""

    campaign: str | None = None
    """Name of the campaign the run belongs to."""

    payload: str | None = None
    """Payload name."""

    operator: str | None = None
    """Username of the operator that submitted the run."""

    run_summary: str | None = None
    """Counts of jobs per label."""

    state: WmsStates | None = None
    """Execution state of the run."""

    jobs: list[WmsJobReport] | None = None
    """Information about the individual jobs of the run."""

    total_number_jobs: int | None = None
    """Total number of jobs the run consists of."""

    job_state_counts: dict[WmsStates, int] | None = None
    """Counts of jobs per state."""

    job_summary: dict[str, dict[WmsStates, int]] | None = None
    """Counts of jobs per label and per state."""

    exit_code_summary: dict[str, list[int]] | None = None
    """Summary of non-zero exit codes, per job label, available through the WMS.

    Currently, the behavior for jobs that were canceled, held, etc. depends
    on the plugin.
    """

    specific_info: WmsSpecificInfo | None = None
    """Additional WMS specific information, if any."""

277 

278 

class BaseWmsService:
    """Interface for interactions with a specific WMS.

    Apart from the ``defaults`` and ``defaults_uri`` properties, which
    return `None`, every method of this base class raises
    `NotImplementedError`; concrete services are expected to override
    the operations they support.

    Parameters
    ----------
    config : `lsst.ctrl.bps.BpsConfig`
        Configuration needed by the WMS service.
    """

    def __init__(self, config):
        self.config = config

    @property
    def defaults(self):
        """Service default settings (`lsst.daf.butler.Config`).

        Notes
        -----
        This property is currently being used in ``BpsConfig.__init__()``.
        As long as that's the case it cannot be changed to return
        a `BpsConfig` instance.
        """
        return None

    @property
    def defaults_uri(self):
        """URI to WMS default settings (`lsst.resources.ResourcePath`)."""
        return None

    def prepare(self, config, generic_workflow, out_prefix=None):
        """Create submission for a generic workflow for a specific WMS.

        Parameters
        ----------
        config : `lsst.ctrl.bps.BpsConfig`
            BPS configuration.
        generic_workflow : `lsst.ctrl.bps.GenericWorkflow`
            Generic representation of a single workflow.
        out_prefix : `str`, optional
            Prefix for all WMS output files.

        Returns
        -------
        wms_workflow : `lsst.ctrl.bps.BaseWmsWorkflow`
            Prepared WMS Workflow to submit for execution.
        """
        raise NotImplementedError

    def submit(self, workflow, **kwargs):
        """Submit a single WMS workflow.

        Parameters
        ----------
        workflow : `lsst.ctrl.bps.BaseWmsWorkflow`
            Prepared WMS Workflow to submit for execution.
        **kwargs : `~typing.Any`
            Additional modifiers to the configuration.
        """
        raise NotImplementedError

    def restart(self, wms_workflow_id):
        """Restart a workflow from the point of failure.

        Parameters
        ----------
        wms_workflow_id : `str`
            Id that can be used by WMS service to identify workflow that
            need to be restarted.

        Returns
        -------
        wms_id : `str` or `None`
            Id of the restarted workflow. If restart failed, it will be set
            to `None`.
        run_name : `str` or `None`
            Name of the restarted workflow. If restart failed, it will be set
            to `None`.
        message : `str`
            A message describing any issues encountered during the restart.
            If there were no issue, an empty string is returned.
        """
        raise NotImplementedError

    def list_submitted_jobs(self, wms_id=None, user=None, require_bps=True, pass_thru=None, is_global=False):
        """Query WMS for list of submitted WMS workflows/jobs.

        This should be a quick lookup function to create list of jobs for
        other functions.

        Parameters
        ----------
        wms_id : `int` or `str`, optional
            Id or path that can be used by WMS service to look up job.
        user : `str`, optional
            User whose submitted jobs should be listed.
        require_bps : `bool`, optional
            Whether to require jobs returned in list to be bps-submitted jobs.
        pass_thru : `str`, optional
            Information to pass through to WMS.
        is_global : `bool`, optional
            If set, all available job queues will be queried for job
            information. Defaults to False which means that only a local job
            queue will be queried for information.

            Only applicable in the context of a WMS using distributed job
            queues (e.g., HTCondor). A WMS with a centralized job queue
            (e.g. PanDA) can safely ignore it.

        Returns
        -------
        job_ids : `list` [`~typing.Any`]
            Only job ids to be used by cancel and other functions. Typically
            this means top-level jobs (i.e., not children jobs).
        """
        raise NotImplementedError

    def report(
        self,
        wms_workflow_id=None,
        user=None,
        hist=0,
        pass_thru=None,
        is_global=False,
        return_exit_codes=False,
    ):
        """Query WMS for status of submitted WMS workflows.

        Parameters
        ----------
        wms_workflow_id : `int` or `str`, optional
            Id that can be used by WMS service to look up status.
        user : `str`, optional
            Limit report to submissions by this particular user.
        hist : `float`, optional
            Number of days to expand report to include finished WMS workflows.
        pass_thru : `str`, optional
            Additional arguments to pass through to the specific WMS service.
        is_global : `bool`, optional
            If set, all available job queues will be queried for job
            information. Defaults to False which means that only a local job
            queue will be queried for information.

            Only applicable in the context of a WMS using distributed job
            queues (e.g., HTCondor). A WMS with a centralized job queue
            (e.g. PanDA) can safely ignore it.
        return_exit_codes : `bool`, optional
            If set, return exit codes related to jobs with a
            non-success status. Defaults to False, which means that only
            the summary state is returned.

            Only applicable in the context of a WMS with associated
            handlers to return exit codes from jobs.

        Returns
        -------
        run_reports : `list` [`lsst.ctrl.bps.WmsRunReport`]
            Status information for submitted WMS workflows.
        message : `str`
            Message to user on how to find more status information specific to
            this particular WMS.
        """
        raise NotImplementedError

    def get_status(
        self,
        wms_workflow_id: str,
        hist: float = 1,
        is_global: bool = False,
    ) -> tuple[WmsStates, str]:
        """Query WMS for quick status of single submitted WMS workflow.

        Parameters
        ----------
        wms_workflow_id : `str`
            ID that can be used by WMS service to look up status.
        hist : `float`, optional
            Number of days to expand query to include finished WMS workflows.
            Defaults to 1.
        is_global : `bool`, optional
            If set, all available job queues will be queried for run
            information. Defaults to False which means that only a local run
            queue will be queried for information.

            Only applicable in the context of a WMS using distributed job
            queues (e.g., HTCondor). A WMS with a centralized job queue
            (e.g. PanDA) can safely ignore it.

        Returns
        -------
        status : `lsst.ctrl.bps.WmsStates`
            Status of single run from given information.
        message : `str`
            Extra message for status command to print. This could be pointers
            to documentation or to WMS specific commands.
        """
        raise NotImplementedError

    def cancel(self, wms_id, pass_thru=None):
        """Cancel submitted workflows/jobs.

        Parameters
        ----------
        wms_id : `str`
            ID or path of job that should be canceled.
        pass_thru : `str`, optional
            Information to pass through to WMS.

        Returns
        -------
        deleted : `bool`
            Whether successful deletion or not. Currently, if any doubt or any
            individual jobs not deleted, return False.
        message : `str`
            Any message from WMS (e.g., error details).
        """
        raise NotImplementedError

    def run_submission_checks(self):
        """Check to run at start if running WMS specific submission steps.

        Any exception other than NotImplementedError will halt submission.
        Submit directory may not yet exist when this is called.
        """
        raise NotImplementedError

    def ping(self, pass_thru=None):
        """Check whether WMS services are up, reachable, and can authenticate
        if authentication is required.

        The services to be checked are those needed for submit, report, cancel,
        restart, but ping cannot guarantee whether jobs would actually run
        successfully.

        Parameters
        ----------
        pass_thru : `str`, optional
            Information to pass through to WMS.

        Returns
        -------
        status : `int`
            0 for success, non-zero for failure.
        message : `str`
            Any message from WMS (e.g., error details).
        """
        raise NotImplementedError

525 

526 

class BaseWmsWorkflow(metaclass=ABCMeta):
    """Interface for a single workflow that is specific to a WMS.

    The class is abstract: subclasses have to implement ``write`` before
    they can be instantiated.

    Parameters
    ----------
    name : `str`
        Unique name of the workflow.
    config : `lsst.ctrl.bps.BpsConfig`
        Generic workflow config.
    """

    def __init__(self, name, config):
        self.name, self.config = name, config
        # Not known at construction time; all three start out as None.
        self.service_class = self.run_id = self.submit_path = None

    @classmethod
    def from_generic_workflow(cls, config, generic_workflow, out_prefix, service_class):
        """Create a WMS-specific workflow from a GenericWorkflow.

        Parameters
        ----------
        config : `lsst.ctrl.bps.BpsConfig`
            Configuration values needed for generating a WMS specific workflow.
        generic_workflow : `lsst.ctrl.bps.GenericWorkflow`
            Generic workflow the WMS-specific one is created from.
        out_prefix : `str`
            Root directory to be used for WMS workflow inputs and outputs
            as well as internal WMS files.
        service_class : `str`
            Full module name of the WMS service class that created this
            workflow.

        Returns
        -------
        wms_workflow : `lsst.ctrl.bps.BaseWmsWorkflow`
            A WMS specific workflow.

        Raises
        ------
        NotImplementedError
            Always raised by this base implementation.
        """
        raise NotImplementedError

    @abstractmethod
    def write(self, out_prefix):
        """Write WMS files for this particular workflow.

        Parameters
        ----------
        out_prefix : `str`
            Root directory to be used for WMS workflow inputs and outputs
            as well as internal WMS files.

        Raises
        ------
        NotImplementedError
            Always raised by this base implementation.
        """
        raise NotImplementedError