Coverage for python / lsst / ctrl / bps / parsl / site.py: 45%
52 statements
« prev ^ index » next coverage.py v7.13.5, created at 2026-04-14 23:54 +0000
« prev ^ index » next coverage.py v7.13.5, created at 2026-04-14 23:54 +0000
1# This file is part of ctrl_bps_parsl.
2#
3# Developed for the LSST Data Management System.
4# This product includes software developed by the LSST Project
5# (https://www.lsst.org) and the LSST DESC (https://www.lsstdesc.org/).
6# See the COPYRIGHT file at the top-level directory of this distribution
7# for details of code ownership.
8#
9# This software is dual licensed under the GNU General Public License and also
10# under a 3-clause BSD license. Recipients may choose which of these licenses
11# to use; please see the files gpl-3.0.txt and/or bsd_license.txt,
12# respectively. If you choose the GPL option then the following text applies
13# (but note that there is still no warranty even if you opt for BSD instead):
14#
15# This program is free software: you can redistribute it and/or modify
16# it under the terms of the GNU General Public License as published by
17# the Free Software Foundation, either version 3 of the License, or
18# (at your option) any later version.
19#
20# This program is distributed in the hope that it will be useful,
21# but WITHOUT ANY WARRANTY; without even the implied warranty of
22# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
23# GNU General Public License for more details.
24#
25# You should have received a copy of the GNU General Public License
26# along with this program. If not, see <https://www.gnu.org/licenses/>.
28from abc import ABC, abstractmethod
29from types import ModuleType
30from typing import TYPE_CHECKING
32import parsl.config
33from parsl.addresses import address_by_hostname
34from parsl.executors.base import ParslExecutor
35from parsl.monitoring import MonitoringHub
37from lsst.ctrl.bps import BpsConfig
38from lsst.utils import doImport
40from .configuration import get_bps_config_value, get_workflow_name
41from .environment import export_environment
43if TYPE_CHECKING:
44 from .job import ParslJob
46__all__ = ("SiteConfig",)
class SiteConfig(ABC):
    """Base class for site configuration.

    Subclasses need to override at least the ``get_executors`` and
    ``select_executor`` methods.

    Parameters
    ----------
    config : `lsst.ctrl.bps.BpsConfig`
        BPS configuration.
    resource_list : `list`, optional
        List of parsl resource specifications to pass to the executor.
    """

    def __init__(self, config: BpsConfig, resource_list: list | None = None):
        self.config = config
        # Sub-configuration for the site named by ``computeSite``.
        self.site = self.get_site_subconfig(config)
        self.resource_list = resource_list

    @staticmethod
    def get_site_subconfig(config: BpsConfig) -> BpsConfig:
        """Get BPS configuration for the site of interest.

        We return the BPS sub-configuration for the site indicated by the
        ``computeSite`` value, which is ``site.<computeSite>``.

        Parameters
        ----------
        config : `lsst.ctrl.bps.BpsConfig`
            BPS configuration.

        Returns
        -------
        site : `lsst.ctrl.bps.BpsConfig`
            Site sub-configuration.
        """
        computeSite = get_bps_config_value(config, "computeSite", str, required=True)
        return get_bps_config_value(config, f".site.{computeSite}", BpsConfig, required=True)

    @classmethod
    def from_config(cls, config: BpsConfig) -> "SiteConfig":
        """Get the site configuration nominated in the BPS config.

        The ``computeSite`` (`str`) value in the BPS configuration is used to
        select a site configuration. The site configuration class to use is
        specified by the BPS configuration as ``site.<computeSite>.class``
        (`str`), which should be the fully-qualified name of a python class
        that inherits from `SiteConfig`.

        Parameters
        ----------
        config : `lsst.ctrl.bps.BpsConfig`
            BPS configuration.

        Returns
        -------
        site_config : subclass of `SiteConfig`
            Site configuration.

        Raises
        ------
        RuntimeError
            Raised if the imported object is not a class that inherits from
            `SiteConfig`.
        """
        site = cls.get_site_subconfig(config)
        name = get_bps_config_value(site, "class", str, required=True)
        site_config = doImport(name)
        # The import may yield something that is not a class at all (e.g. a
        # module); ``issubclass`` raises TypeError for any non-class, so make
        # sure we have a class before asking about its ancestry.
        if not isinstance(site_config, type) or not issubclass(site_config, SiteConfig):
            raise RuntimeError(f"Site class={name} is not a SiteConfig subclass")
        return site_config(config)

    @abstractmethod
    def get_executors(self) -> list[ParslExecutor]:
        """Get a list of executors to be used in processing.

        Each executor should have a unique ``label``.

        Returns
        -------
        executors : `list` [`parsl.executors.base.ParslExecutor`]
            Executors to be used in processing.
        """
        raise NotImplementedError("Subclasses must define")

    @abstractmethod
    def select_executor(self, job: "ParslJob") -> str:
        """Get the ``label`` of the executor to use to execute a job.

        Parameters
        ----------
        job : `lsst.ctrl.bps.parsl.ParslJob`
            Job to be executed.

        Returns
        -------
        label : `str`
            Label of executor to use to execute ``job``.
        """
        raise NotImplementedError("Subclasses must define")

    def get_address(self) -> str:
        """Return the IP address of the machine hosting the driver/submission.

        This address should be accessible from the workers. This should
        generally be the return value of one of the functions in
        ``parsl.addresses``.

        This is used by the default implementation of ``get_monitor``, but will
        generally be used by ``get_executors`` too.

        This default implementation gets the address from the hostname, but
        that will not work if the workers don't access the driver/submission
        node by that address.

        Returns
        -------
        address : `str`
            Address of the driver/submission machine.
        """
        return address_by_hostname()

    def get_command_prefix(self) -> str:
        """Return command(s) to add before each job command.

        These may be used to configure the environment for the job.

        This default implementation respects the BPS configuration elements:

        - ``site.<computeSite>.commandPrefix`` (`str`): command(s) to use as a
          prefix to executing a job command on a worker.
        - ``site.<computeSite>.environment`` (`bool`): add bash commands that
          replicate the environment on the driver/submit machine?

        Returns
        -------
        prefix : `str`
            Command(s) to run before each job command.
        """
        prefix = get_bps_config_value(self.site, "commandPrefix", str, "")
        if get_bps_config_value(self.site, "environment", bool, False):
            # The environment export goes after any configured prefix, on a
            # new line.
            prefix += "\n" + export_environment()
        return prefix

    def get_monitor(self) -> MonitoringHub | None:
        """Get parsl monitor.

        The parsl monitor provides a database that tracks the progress of the
        workflow and the use of resources on the workers.

        This implementation respects the BPS configuration elements:

        - ``site.<computeSite>.monitorEnable`` (`bool`): enable monitor?
        - ``site.<computeSite>.monitorInterval`` (`float`): time interval (sec)
          between logging of resource usage.
        - ``site.<computeSite>.monitorFilename`` (`str`): name of file to use
          for the monitor sqlite database.

        Returns
        -------
        monitor : `parsl.monitoring.MonitoringHub` or `None`
            Parsl monitor, or `None` for no monitor.
        """
        if not get_bps_config_value(self.site, "monitorEnable", bool, False):
            return None
        return MonitoringHub(
            workflow_name=get_workflow_name(self.config),
            hub_address=self.get_address(),
            resource_monitoring_interval=get_bps_config_value(self.site, "monitorInterval", float, 30.0),
            logging_endpoint="sqlite:///"
            + get_bps_config_value(self.site, "monitorFilename", str, "monitor.sqlite"),
        )

    def get_parsl_config(self) -> parsl.config.Config:
        """Get Parsl configuration for this site.

        Subclasses can overwrite this method to build a more specific Parsl
        configuration, if required.

        The retries are set from the ``site.<computeSite>.retries`` value
        found in the BPS configuration file, and the Parsl run directory
        from the ``site.<computeSite>.run_dir`` value. The configuration is
        always built with ``checkpoint_mode="task_exit"``.

        Returns
        -------
        config : `parsl.config.Config`
            The configuration to be used for Parsl.
        """
        executors = self.get_executors()
        monitor = self.get_monitor()
        retries = get_bps_config_value(self.site, "retries", int, 1)
        # Path to Parsl run directory. The default set by Parsl is
        # 'runinfo' which is not explicit enough for end users given that
        # we are using BPS + Parsl + Slurm to execute a workflow.
        run_dir = get_bps_config_value(self.site, "run_dir", str, "runinfo")
        return parsl.config.Config(
            executors=executors,
            monitoring=monitor,
            retries=retries,
            run_dir=run_dir,
            checkpoint_mode="task_exit",
        )