Coverage for python / lsst / ctrl / bps / wms_service.py: 74%
137 statements
« prev ^ index » next coverage.py v7.13.5, created at 2026-04-26 09:03 +0000
« prev ^ index » next coverage.py v7.13.5, created at 2026-04-26 09:03 +0000
1# This file is part of ctrl_bps.
2#
3# Developed for the LSST Data Management System.
4# This product includes software developed by the LSST Project
5# (https://www.lsst.org).
6# See the COPYRIGHT file at the top-level directory of this distribution
7# for details of code ownership.
8#
9# This software is dual licensed under the GNU General Public License and also
10# under a 3-clause BSD license. Recipients may choose which of these licenses
11# to use; please see the files gpl-3.0.txt and/or bsd_license.txt,
12# respectively. If you choose the GPL option then the following text applies
13# (but note that there is still no warranty even if you opt for BSD instead):
14#
15# This program is free software: you can redistribute it and/or modify
16# it under the terms of the GNU General Public License as published by
17# the Free Software Foundation, either version 3 of the License, or
18# (at your option) any later version.
19#
20# This program is distributed in the hope that it will be useful,
21# but WITHOUT ANY WARRANTY; without even the implied warranty of
22# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
23# GNU General Public License for more details.
24#
25# You should have received a copy of the GNU General Public License
26# along with this program. If not, see <https://www.gnu.org/licenses/>.
28"""Base classes for working with a specific WMS."""
# Names exported by ``from <module> import *``; every entry is a class
# defined in this module.
__all__ = [
    "BaseWmsService",
    "BaseWmsWorkflow",
    "WmsJobReport",
    "WmsRunReport",
    "WmsSpecificInfo",
    "WmsStates",
]
40import dataclasses
41import logging
42from abc import ABCMeta, abstractmethod
43from enum import Enum
44from typing import Any
# Module-level logger named after this module (``__name__``).
_LOG = logging.getLogger(__name__)
class WmsStates(Enum):
    """Run and job states.

    Notes
    -----
    Member values are plain integers. According to the comment below they
    are chosen so that a state's value can be used directly as an exit code
    of ``bps status``.
    """

    # Offset values so can use as exit codes to bps status
    # without colliding with click exit codes (e.g., 2 for
    # bad command line)

    UNKNOWN = 10
    """Can't determine state."""

    MISFIT = 11
    """Determined state, but doesn't fit other states."""

    UNREADY = 12
    """Still waiting for parents to finish."""

    READY = 13
    """All of its parents have finished successfully."""

    PENDING = 14
    """Ready to run, visible in batch queue."""

    RUNNING = 15
    """Currently running."""

    DELETED = 16
    """In the process of being deleted or already deleted."""

    HELD = 17
    """In a hold state."""

    # NOTE(review): the only member outside the 10+ range; presumably kept at
    # 0 so that success maps onto the conventional "success" exit code --
    # confirm. As a consequence the value 18 is unused.
    SUCCEEDED = 0
    """Have completed with success status."""

    FAILED = 19
    """Have completed with non-success status."""

    PRUNED = 20
    """At least one of the parents failed or can't be run."""
90class WmsSpecificInfo:
91 """Class representing WMS specific information.
93 Each piece of information is split into two parts: a template and
94 a context. The template is a string that can contain literal text and/or
95 *named* replacement fields delimited by braces ``{}``. The context is
96 a mapping between the names, corresponding to the replacement fields
97 in the template, and their values.
99 To produce a human-readable representation of the information, e.g., for
100 logging purposes, it needs to be rendered first to combine these two parts.
101 On the other hand, the context alone might be sufficient if the provided
102 information is being ingested to a database.
103 """
105 def __init__(self) -> None:
106 self._context: dict[str, Any] = {}
107 self._templates: list[str] = []
109 def __bool__(self) -> bool:
110 return bool(self._templates)
112 def __str__(self) -> str:
113 lines = []
114 for template in self._templates:
115 lines.append(template.format_map(self._context))
116 return "\n".join(lines)
118 @property
119 def context(self) -> dict[str, Any]:
120 """The context that will be used to render the information.
122 Returns
123 -------
124 context : `dict` [`str`, `~typing.Any`]
125 A copy of the dictionary representing the mapping between
126 *every* template variable and its value.
128 Notes
129 -----
130 The property returns a *shallow* copy of the dictionary representing
131 the context as the intended purpose of the `WmsSpecificInfo` is to
132 pass a small number of brief messages from WMS to BPS reporting
133 subsystem. Hence, it is assumed that the dictionary will only contain
134 immutable objects (e.g. strings, numbers).
135 """
136 return self._context.copy()
138 @property
139 def templates(self) -> list[str]:
140 """The list of templates that will be used to render the information.
142 Returns
143 -------
144 templates : `list` [`str`]
145 A copy of the complete list of the message templates in order
146 in which the messages were added.
147 """
148 return self._templates.copy()
150 def add_message(self, template: str, context: dict[str, Any] | None = None, **kwargs) -> None:
151 """Add a message to the WMS information.
153 If keyword arguments are specified, the passed context is then updated
154 with those key/value pairs.
156 Parameters
157 ----------
158 template : `str`
159 A message template.
160 context : `dict` [`str`, `~typing.Any`], optional
161 A mapping between template variables and their values.
162 **kwargs
163 Additional keyword arguments.
165 Raises
166 ------
167 ValueError
168 Raised if the message can't be rendered due to errors in either
169 the template, the context, or both.
170 """
171 ctx: dict[str, Any] = {}
172 if context is not None:
173 ctx |= context
174 ctx.update(kwargs)
176 # Test that given context has all of the values needed for the given
177 # template.
178 try:
179 template.format_map(ctx)
180 except Exception as exc:
181 raise ValueError(f"Adding template '{template}' with context '{ctx}' failed") from exc
183 # Check if the given context does not change values of the already
184 # existing fields.
185 common_fields = set(self._context) & set(ctx)
186 conflicts = [field for field in common_fields if self._context[field] != ctx[field]]
187 if conflicts:
188 raise ValueError(
189 f"Adding template '{template}' with context '{ctx}' failed:"
190 f"change of value detected for field(s): {', '.join(conflicts)}"
191 )
193 self._context.update(ctx)
194 self._templates.append(template)
@dataclasses.dataclass(slots=True)
class WmsJobReport:
    """WMS job information to be included in detailed report output.

    Notes
    -----
    None of the fields declares a default value, so all four must be
    provided when creating an instance. The class is declared with
    ``slots=True``, hence attributes other than the declared fields cannot
    be set on its instances.
    """

    wms_id: str
    """Job id assigned by the workflow management system."""

    name: str
    """A name assigned automatically by BPS."""

    label: str
    """A user-facing label for a job. Multiple jobs can have the same label."""

    state: WmsStates
    """Job's current execution state."""
@dataclasses.dataclass(slots=True)
class WmsRunReport:
    """WMS run information to be included in detailed report output.

    Notes
    -----
    Every field defaults to `None`, so an instance can be created empty and
    only the fields for which the information is available need to be set.
    The class is declared with ``slots=True``, hence attributes other than
    the declared fields cannot be set on its instances.
    """

    wms_id: str | None = None
    """Id assigned to the run by the WMS.
    """

    global_wms_id: str | None = None
    """Global run identification number.

    Only applicable in the context of a WMS using distributed job queues
    (e.g., HTCondor).
    """

    path: str | None = None
    """Path to the submit directory."""

    label: str | None = None
    """Run's label."""

    run: str | None = None
    """Run's name."""

    project: str | None = None
    """Name of the project run belongs to."""

    campaign: str | None = None
    """Name of the campaign the run belongs to."""

    payload: str | None = None
    """Name of the payload."""

    operator: str | None = None
    """Username of the operator who submitted the run."""

    run_summary: str | None = None
    """Job counts per label."""

    state: WmsStates | None = None
    """Run's execution state."""

    jobs: list[WmsJobReport] | None = None
    """Information about individual jobs in the run."""

    total_number_jobs: int | None = None
    """Total number of jobs in the run."""

    job_state_counts: dict[WmsStates, int] | None = None
    """Job counts per state."""

    job_summary: dict[str, dict[WmsStates, int]] | None = None
    """Job counts per label and per state."""

    exit_code_summary: dict[str, list[int]] | None = None
    """Summary of non-zero exit codes per job label available through the WMS.

    Currently behavior for jobs that were canceled, held, etc. are plugin
    dependent.
    """

    specific_info: WmsSpecificInfo | None = None
    """Any additional WMS specific information."""
class BaseWmsService:
    """Interface for interactions with a specific WMS.

    Parameters
    ----------
    config : `lsst.ctrl.bps.BpsConfig`
        Configuration needed by the WMS service.

    Notes
    -----
    This base class only stores the configuration. Apart from the
    ``defaults`` and ``defaults_uri`` properties, which return `None`, every
    method raises `NotImplementedError` and is meant to be overridden by
    a WMS-specific subclass.
    """

    def __init__(self, config):
        self.config = config

    @property
    def defaults(self):
        """Service default settings (`lsst.daf.butler.Config`).

        Notes
        -----
        This property is currently being used in ``BpsConfig.__init__()``.
        As long as that's the case it cannot be changed to return
        a `BpsConfig` instance.

        The base implementation returns `None`.
        """
        return None

    @property
    def defaults_uri(self):
        """URI to WMS default settings (`lsst.resources.ResourcePath`).

        The base implementation returns `None`.
        """
        return None

    def prepare(self, config, generic_workflow, out_prefix=None):
        """Create submission for a generic workflow for a specific WMS.

        Parameters
        ----------
        config : `lsst.ctrl.bps.BpsConfig`
            BPS configuration.
        generic_workflow : `lsst.ctrl.bps.GenericWorkflow`
            Generic representation of a single workflow.
        out_prefix : `str`, optional
            Prefix for all WMS output files.

        Returns
        -------
        wms_workflow : `lsst.ctrl.bps.BaseWmsWorkflow`
            Prepared WMS Workflow to submit for execution.
        """
        raise NotImplementedError

    def submit(self, workflow, **kwargs):
        """Submit a single WMS workflow.

        Parameters
        ----------
        workflow : `lsst.ctrl.bps.BaseWmsWorkflow`
            Prepared WMS Workflow to submit for execution.
        **kwargs : `~typing.Any`
            Additional modifiers to the configuration.
        """
        raise NotImplementedError

    def restart(self, wms_workflow_id):
        """Restart a workflow from the point of failure.

        Parameters
        ----------
        wms_workflow_id : `str`
            Id that can be used by WMS service to identify workflow that
            need to be restarted.

        Returns
        -------
        wms_id : `str` or `None`
            Id of the restarted workflow. If restart failed, it will be set
            to `None`.
        run_name : `str` or `None`
            Name of the restarted workflow. If restart failed, it will be set
            to `None`.
        message : `str`
            A message describing any issues encountered during the restart.
            If there were no issue, an empty string is returned.
        """
        raise NotImplementedError

    def list_submitted_jobs(self, wms_id=None, user=None, require_bps=True, pass_thru=None, is_global=False):
        """Query WMS for list of submitted WMS workflows/jobs.

        This should be a quick lookup function to create list of jobs for
        other functions.

        Parameters
        ----------
        wms_id : `int` or `str`, optional
            Id or path that can be used by WMS service to look up job.
        user : `str`, optional
            User whose submitted jobs should be listed.
        require_bps : `bool`, optional
            Whether to require jobs returned in list to be bps-submitted jobs.
        pass_thru : `str`, optional
            Information to pass through to WMS.
        is_global : `bool`, optional
            If set, all available job queues will be queried for job
            information. Defaults to False which means that only a local job
            queue will be queried for information.

            Only applicable in the context of a WMS using distributed job
            queues (e.g., HTCondor). A WMS with a centralized job queue
            (e.g. PanDA) can safely ignore it.

        Returns
        -------
        job_ids : `list` [`~typing.Any`]
            Only job ids to be used by cancel and other functions. Typically
            this means top-level jobs (i.e., not children jobs).
        """
        raise NotImplementedError

    def report(
        self,
        wms_workflow_id=None,
        user=None,
        hist=0,
        pass_thru=None,
        is_global=False,
        return_exit_codes=False,
    ):
        """Query WMS for status of submitted WMS workflows.

        Parameters
        ----------
        wms_workflow_id : `int` or `str`, optional
            Id that can be used by WMS service to look up status.
        user : `str`, optional
            Limit report to submissions by this particular user.
        hist : `float`, optional
            Number of days to expand report to include finished WMS workflows.
            Defaults to 0.
        pass_thru : `str`, optional
            Additional arguments to pass through to the specific WMS service.
        is_global : `bool`, optional
            If set, all available job queues will be queried for job
            information. Defaults to False which means that only a local job
            queue will be queried for information.

            Only applicable in the context of a WMS using distributed job
            queues (e.g., HTCondor). A WMS with a centralized job queue
            (e.g. PanDA) can safely ignore it.
        return_exit_codes : `bool`, optional
            If set, return exit codes related to jobs with a
            non-success status. Defaults to False, which means that only
            the summary state is returned.

            Only applicable in the context of a WMS with associated
            handlers to return exit codes from jobs.

        Returns
        -------
        run_reports : `list` [`lsst.ctrl.bps.WmsRunReport`]
            Status information for submitted WMS workflows.
        message : `str`
            Message to user on how to find more status information specific to
            this particular WMS.
        """
        raise NotImplementedError

    def get_status(
        self,
        wms_workflow_id: str,
        hist: float = 1,
        is_global: bool = False,
    ) -> tuple[WmsStates, str]:
        """Query WMS for quick status of single submitted WMS workflow.

        Parameters
        ----------
        wms_workflow_id : `str`
            ID that can be used by WMS service to look up status.
        hist : `float`, optional
            Number of days to expand query to include finished WMS workflows.
            Defaults to 1.
        is_global : `bool`, optional
            If set, all available job queues will be queried for run
            information. Defaults to False which means that only a local run
            queue will be queried for information.

            Only applicable in the context of a WMS using distributed job
            queues (e.g., HTCondor). A WMS with a centralized job queue
            (e.g. PanDA) can safely ignore it.

        Returns
        -------
        status : `lsst.ctrl.bps.WmsStates`
            Status of single run from given information.
        message : `str`
            Extra message for status command to print. This could be pointers
            to documentation or to WMS specific commands.
        """
        raise NotImplementedError

    def cancel(self, wms_id, pass_thru=None):
        """Cancel submitted workflows/jobs.

        Parameters
        ----------
        wms_id : `str`
            ID or path of job that should be canceled.
        pass_thru : `str`, optional
            Information to pass through to WMS.

        Returns
        -------
        deleted : `bool`
            Whether successful deletion or not. Currently, if any doubt or any
            individual jobs not deleted, return False.
        message : `str`
            Any message from WMS (e.g., error details).
        """
        raise NotImplementedError

    def run_submission_checks(self):
        """Check to run at start if running WMS specific submission steps.

        Any exception other than NotImplementedError will halt submission.
        Submit directory may not yet exist when this is called.

        Raises
        ------
        NotImplementedError
            Raised by this base implementation, i.e. when the WMS service
            does not provide any submission checks.
        """
        raise NotImplementedError

    # ``pass_thru`` defaults to None so the signature matches both its
    # "optional" documentation and the other methods accepting ``pass_thru``.
    def ping(self, pass_thru=None):
        """Check whether WMS services are up, reachable, and can authenticate
        if authentication is required.

        The services to be checked are those needed for submit, report, cancel,
        restart, but ping cannot guarantee whether jobs would actually run
        successfully.

        Parameters
        ----------
        pass_thru : `str`, optional
            Information to pass through to WMS.

        Returns
        -------
        status : `int`
            0 for success, non-zero for failure.
        message : `str`
            Any message from WMS (e.g., error details).
        """
        raise NotImplementedError
class BaseWmsWorkflow(metaclass=ABCMeta):
    """Abstract base for a single workflow expressed in WMS-specific form.

    Parameters
    ----------
    name : `str`
        Unique name of workflow.
    config : `lsst.ctrl.bps.BpsConfig`
        Generic workflow config.

    Notes
    -----
    Besides ``name`` and ``config``, a new instance carries three more
    attributes, ``service_class``, ``run_id``, and ``submit_path``, all
    initialized to `None`.

    The class cannot be instantiated directly as ``write`` is abstract.
    """

    def __init__(self, name, config):
        self.name, self.config = name, config
        # Not known at construction time, so start unset.
        self.service_class = self.run_id = self.submit_path = None

    @classmethod
    def from_generic_workflow(cls, config, generic_workflow, out_prefix, service_class):
        """Build a WMS-specific workflow out of a generic one.

        Parameters
        ----------
        config : `lsst.ctrl.bps.BpsConfig`
            Configuration values needed for generating a WMS specific workflow.
        generic_workflow : `lsst.ctrl.bps.GenericWorkflow`
            Generic workflow from which to create the WMS-specific one.
        out_prefix : `str`
            Root directory to be used for WMS workflow inputs and outputs
            as well as internal WMS files.
        service_class : `str`
            Full module name of WMS service class that created this workflow.

        Returns
        -------
        wms_workflow : `lsst.ctrl.bps.BaseWmsWorkflow`
            A WMS specific workflow.

        Raises
        ------
        NotImplementedError
            Always raised by this base implementation.
        """
        raise NotImplementedError

    @abstractmethod
    def write(self, out_prefix):
        """Output the WMS files belonging to this workflow.

        Parameters
        ----------
        out_prefix : `str`
            Root directory to be used for WMS workflow inputs and outputs
            as well as internal WMS files.

        Raises
        ------
        NotImplementedError
            Always raised by this base implementation.
        """
        raise NotImplementedError