Coverage for python / lsst / ctrl / bps / status.py: 57%

14 statements  

« prev     ^ index     » next       coverage.py v7.13.5, created at 2026-04-14 23:52 +0000

1# This file is part of ctrl_bps. 

2# 

3# Developed for the LSST Data Management System. 

4# This product includes software developed by the LSST Project 

5# (https://www.lsst.org). 

6# See the COPYRIGHT file at the top-level directory of this distribution 

7# for details of code ownership. 

8# 

9# This software is dual licensed under the GNU General Public License and also 

10# under a 3-clause BSD license. Recipients may choose which of these licenses 

11# to use; please see the files gpl-3.0.txt and/or bsd_license.txt, 

12# respectively. If you choose the GPL option then the following text applies 

13# (but note that there is still no warranty even if you opt for BSD instead): 

14# 

15# This program is free software: you can redistribute it and/or modify 

16# it under the terms of the GNU General Public License as published by 

17# the Free Software Foundation, either version 3 of the License, or 

18# (at your option) any later version. 

19# 

20# This program is distributed in the hope that it will be useful, 

21# but WITHOUT ANY WARRANTY; without even the implied warranty of 

22# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the 

23# GNU General Public License for more details. 

24# 

25# You should have received a copy of the GNU General Public License 

26# along with this program. If not, see <https://www.gnu.org/licenses/>. 

27 

28"""Supporting functions for reporting basic status of a single run 

29submitted to a WMS. 

30""" 

31 

32__all__ = ["status"] 

33 

34import logging 

35 

36from lsst.utils import doImportType 

37from lsst.utils.timer import time_this 

38 

39from . import DEFAULT_MEM_FMT, DEFAULT_MEM_UNIT 

40from .wms_service import BaseWmsService, WmsStates 

41 

42_LOG = logging.getLogger(__name__) 

43 

44 

def status(
    wms_service_class_name: str,
    run_id: str,
    hist: float = 0,
    is_global: bool = False,
) -> tuple[WmsStates, str]:
    """Look up the current state of a single run submitted to a WMS.

    The WMS service class is imported by name, instantiated, and asked for
    the status of the requested workflow. Both the import and the status
    query are timed and reported at DEBUG level on the module logger.

    Parameters
    ----------
    wms_service_class_name : `str`
        Name of the WMS service class to import and instantiate.
    run_id : `str`
        Id of the run whose status is requested.
    hist : `float`, optional
        Number of past days from which runs are included. Defaults to 0.
    is_global : `bool`, optional
        If set, all available job queues will be queried for job
        information. Defaults to False, meaning only a local job queue is
        queried.

        Only applicable in the context of a WMS using distributed job
        queues (e.g., HTCondor).

    Returns
    -------
    state : `WmsStates`
        Status of the run.
    message : `str`
        Errors that happened during status retrieval. Empty if no issues
        were encountered.
    """
    # Timer settings shared by both timed sections below; only the message
    # and the memory-usage flag differ between them.
    timer_opts = {
        "log": _LOG,
        "level": logging.DEBUG,
        "prefix": None,
        "mem_unit": DEFAULT_MEM_UNIT,
        "mem_fmt": DEFAULT_MEM_FMT,
    }

    with time_this(msg="WMS service class imported.", mem_usage=False, **timer_opts):
        service_cls = doImportType(wms_service_class_name)

    # The service is constructed with an empty dict, outside of either timer.
    service: BaseWmsService = service_cls({})

    with time_this(msg=f"Got status for workflow {run_id}", mem_usage=True, **timer_opts):
        run_state, errors = service.get_status(
            wms_workflow_id=run_id,
            hist=hist,
            is_global=is_global,
        )

    return run_state, errors