Coverage for python / lsst / ctrl / bps / panda / panda_service.py: 9%
239 statements
« prev ^ index » next coverage.py v7.13.5, created at 2026-04-18 08:49 +0000
« prev ^ index » next coverage.py v7.13.5, created at 2026-04-18 08:49 +0000
1# This file is part of ctrl_bps_panda.
2#
3# Developed for the LSST Data Management System.
4# This product includes software developed by the LSST Project
5# (https://www.lsst.org).
6# See the COPYRIGHT file at the top-level directory of this distribution
7# for details of code ownership.
8#
9# This software is dual licensed under the GNU General Public License and also
10# under a 3-clause BSD license. Recipients may choose which of these licenses
11# to use; please see the files gpl-3.0.txt and/or bsd_license.txt,
12# respectively. If you choose the GPL option then the following text applies
13# (but note that there is still no warranty even if you opt for BSD instead):
14#
15# This program is free software: you can redistribute it and/or modify
16# it under the terms of the GNU General Public License as published by
17# the Free Software Foundation, either version 3 of the License, or
18# (at your option) any later version.
19#
20# This program is distributed in the hope that it will be useful,
21# but WITHOUT ANY WARRANTY; without even the implied warranty of
22# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
23# GNU General Public License for more details.
24#
25# You should have received a copy of the GNU General Public License
26# along with this program. If not, see <https://www.gnu.org/licenses/>.
"""Interface between the generic workflow and the PanDA/iDDS workflow system."""

__all__ = [
    "PanDAService",
    "PandaBpsWmsWorkflow",
]
32import json
33import logging
34import os
35import pickle
36import re
38from idds.workflowv2.workflow import Workflow as IDDS_client_workflow
40from lsst.ctrl.bps import (
41 DEFAULT_MEM_FMT,
42 DEFAULT_MEM_UNIT,
43 BaseWmsService,
44 BaseWmsWorkflow,
45 WmsRunReport,
46 WmsStates,
47)
48from lsst.ctrl.bps.panda.constants import (
49 PANDA_DEFAULT_MAX_COPY_WORKERS,
50 PANDA_DEFAULT_MAX_REQUEST_LENGTH,
51)
52from lsst.ctrl.bps.panda.utils import (
53 add_final_idds_work,
54 add_idds_work,
55 aggregate_by_basename,
56 copy_files_for_distribution,
57 create_idds_build_workflow,
58 extract_taskname,
59 get_idds_client,
60 get_idds_result,
61 idds_call_with_check,
62)
63from lsst.resources import ResourcePath
64from lsst.utils.timer import time_this
# Module-level logger named after this module.
_LOG = logging.getLogger(name=__name__)
class PanDAService(BaseWmsService):
    """PanDA version of WMS service."""

    def prepare(self, config, generic_workflow, out_prefix=None):
        # Docstring inherited from BaseWmsService.prepare.
        _LOG.debug("out_prefix = '%s'", out_prefix)

        _LOG.info("Starting PanDA prepare stage (creating specific implementation of workflow)")

        with time_this(
            log=_LOG,
            level=logging.INFO,
            prefix=None,
            msg="PanDA prepare stage completed",
            mem_usage=True,
            mem_unit=DEFAULT_MEM_UNIT,
            mem_fmt=DEFAULT_MEM_FMT,
        ):
            workflow = PandaBpsWmsWorkflow.from_generic_workflow(
                config, generic_workflow, out_prefix, f"{self.__class__.__module__}.{self.__class__.__name__}"
            )
            workflow.write(out_prefix)
        return workflow

    @staticmethod
    def _extract_request_id(ret):
        """Return the iDDS request id from the result of an iDDS submit call.

        Parameters
        ----------
        ret : `object`
            Raw value returned by an iDDS client submit call; it is decoded
            with ``get_idds_result``.

        Returns
        -------
        request_id : `int`
            The request id reported by iDDS.

        Raises
        ------
        RuntimeError
            Raised if iDDS reports that the submission failed.
        """
        _LOG.debug("iDDS client manager submit returned = %s", ret)

        # Check submission success
        status, result, error = get_idds_result(ret)
        if not status:
            raise RuntimeError(f"Error submitting to PanDA service: {error}")
        return int(result)

    def submit(self, workflow, **kwargs):
        # Docstring inherited from BaseWmsService.submit.
        config = kwargs.get("config")
        remote_build = kwargs.get("remote_build")

        if config and remote_build:
            # Remote build: submit a build workflow created from the submit
            # keyword arguments via ``submit_build``; no files are pre-staged
            # from here in this branch.
            _LOG.info("remote build")

            idds_build_workflow = create_idds_build_workflow(**kwargs)
            idds_client = get_idds_client(self.config)
            ret = idds_client.submit_build(idds_build_workflow, username=None, use_dataset_name=False)
            request_id = self._extract_request_id(ret)

            _LOG.info("Submitted into iDDs with request id=%s", request_id)
            idds_build_workflow.run_id = request_id
            return idds_build_workflow

        _, max_request_length = self.config.search(
            "maxRequestLength", opt={"default": PANDA_DEFAULT_MAX_REQUEST_LENGTH}
        )
        _, max_copy_workers = self.config.search(
            "maxCopyWorkers", opt={"default": PANDA_DEFAULT_MAX_COPY_WORKERS}
        )

        file_distribution_uri = self.config["fileDistributionEndPoint"]
        # Fall back to the default endpoint when the configured one refers to
        # LSST_RUN_TEMP_SPACE but that variable is not set in the environment.
        lsst_temp = "LSST_RUN_TEMP_SPACE"
        if lsst_temp in file_distribution_uri and lsst_temp not in os.environ:
            file_distribution_uri = self.config["fileDistributionEndPointDefault"]
        # A value without a URI scheme prefix ("<scheme>://") is treated as a
        # local path.
        if not re.match(r"^[a-zA-Z][a-zA-Z\d+\-.]*://", file_distribution_uri):
            file_distribution_uri = "file://" + file_distribution_uri

        idds_client = get_idds_client(self.config)
        # Workflows flagged with the "bps_iscustom" run attribute skip the
        # copy of files to the distribution endpoint.
        submit_cmd = workflow.run_attrs.get("bps_iscustom", False)
        if not submit_cmd:
            copy_files_for_distribution(
                workflow.files_to_pre_stage,
                ResourcePath(file_distribution_uri, forceDirectory=True),
                max_copy_workers,
            )

        # Upload the workflow to iDDS step by step (presumably so that each
        # request stays within max_request_length), then submit the workflow.
        idds_wf = workflow.idds_client_workflow
        workflow_steps = idds_wf.split_workflow_to_steps(
            request_cache=self.config["submitPath"], max_request_length=max_request_length
        )
        for wf_step in workflow_steps:
            ret_step = idds_client.submit(wf_step, username=None, use_dataset_name=False)
            status, result_step, _ = get_idds_result(ret_step)
            if not (status and result_step == 0):
                raise RuntimeError(
                    f"iDDS client manager failed to submit workflow step {wf_step.step_name}: "
                    f"{ret_step}"
                )
            _LOG.info("iDDS client manager successfully uploaded workflow step: %s", wf_step.step_name)

        ret = idds_client.submit(workflow.idds_client_workflow, username=None, use_dataset_name=False)
        request_id = self._extract_request_id(ret)

        _LOG.info("Submitted into iDDs with request id=%s", request_id)
        workflow.run_id = request_id

    def restart(self, wms_workflow_id):
        # Docstring inherited from BaseWmsService.restart.
        idds_client = get_idds_client(self.config)
        ret = idds_client.retry(request_id=wms_workflow_id)
        _LOG.debug("Restart PanDA workflow returned = %s", ret)

        status, result, error = get_idds_result(ret)
        if status:
            _LOG.info("Restarting PanDA workflow %s", result)
            return wms_workflow_id, None, json.dumps(result)

        return None, None, f"Error retry PanDA workflow: {error}"

    def report(
        self,
        wms_workflow_id=None,
        user=None,
        hist=0,
        pass_thru=None,
        is_global=False,
        return_exit_codes=False,
    ):
        # Docstring inherited from BaseWmsService.report.
        message = ""
        run_reports = []

        if not wms_workflow_id:
            message = "Run summary not implemented yet, use 'bps report --id <workflow_id>' instead"
            return run_reports, message

        idds_client = get_idds_client(self.config)
        ret = idds_call_with_check(
            idds_client.get_requests,
            func_name="get workflow status",
            request_id=wms_workflow_id,
            with_detail=True,
        )

        tasks = ret[1][1]
        if not tasks:
            message = f"No records found for workflow id '{wms_workflow_id}'. Hint: double check the id"
            return run_reports, message

        # Create initial WmsRunReport; request-level fields (id, user, name,
        # status) are read from the first record.
        head = tasks[0]
        wms_report = WmsRunReport(
            wms_id=str(head["request_id"]),
            operator=head["username"],
            project="",
            campaign="",
            payload="",
            run=head["name"],
            state=WmsStates.UNKNOWN,
            total_number_jobs=0,
            job_state_counts=dict.fromkeys(WmsStates, 0),
            job_summary={},
            run_summary="",
            exit_code_summary={},
        )

        # Map the iDDS request status onto a WmsStates value.
        # NOTE(review): get_status() maps "SubFinished" to FAILED while this
        # maps it to SUCCEEDED; confirm the difference is intended.
        workflow_status = head["status"]["attributes"]["_name_"]
        if workflow_status in ("Finished", "SubFinished"):
            wms_report.state = WmsStates.SUCCEEDED
        elif workflow_status in ("Failed", "Expired"):
            wms_report.state = WmsStates.FAILED
        elif workflow_status == "Cancelled":
            wms_report.state = WmsStates.DELETED
        elif workflow_status == "Suspended":
            wms_report.state = WmsStates.HELD
        else:
            wms_report.state = WmsStates.RUNNING

        # iDDS task status -> WmsStates whose job counts are read from the
        # task's file counters (see file_map). Task statuses not listed here
        # contribute no counts; UNREADY is always derived as the remainder.
        # For SubFinished tasks the jobs are split as
        #   output_processed_files: Finished
        #   output_failed_files: Failed
        #   output_missing_files: Missing
        state_map = {
            "Finished": [WmsStates.SUCCEEDED],
            "SubFinished": [WmsStates.SUCCEEDED, WmsStates.FAILED, WmsStates.PRUNED],
            "Transforming": [
                WmsStates.RUNNING,
                WmsStates.SUCCEEDED,
                WmsStates.FAILED,
                # WmsStates.READY,
                WmsStates.UNREADY,
                WmsStates.PRUNED,
            ],
            "Failed": [WmsStates.FAILED, WmsStates.PRUNED],
        }

        # WmsStates -> name of the task field holding its job count.
        file_map = {
            WmsStates.SUCCEEDED: "output_processed_files",
            WmsStates.RUNNING: "output_processing_files",
            WmsStates.FAILED: "output_failed_files",
            # WmsStates.READY: "output_activated_files",
            WmsStates.UNREADY: "input_new_files",
            WmsStates.PRUNED: "output_missing_files",
        }

        # Rows without a transform_id come from an outer join between the
        # requests and transforms tables and are not real tasks. Drop them
        # before sorting so their missing/None ids cannot break the sort.
        created_tasks = [task for task in tasks if task.get("transform_id") is not None]

        # Sort tasks by workload id, falling back to transform id when the
        # workload id is missing or not comparable (e.g. None).
        try:
            created_tasks.sort(key=lambda x: x["transform_workload_id"])
        except (KeyError, TypeError):
            created_tasks.sort(key=lambda x: x["transform_id"])

        exit_codes_all = {}

        # --- Process each task sequentially ---
        for task in created_tasks:
            task_name = task.get("transform_name", "")
            tasklabel = extract_taskname(task_name)
            status = task["transform_status"]["attributes"]["_name_"]
            totaljobs = task.get("output_total_files", 0)
            wms_report.total_number_jobs += totaljobs

            # --- If a payload task failed/subfinished, collect exit codes ---
            if status in ("SubFinished", "Failed") and not task_name.startswith("build_task"):
                transform_workload_id = task.get("transform_workload_id")
                if transform_workload_id:
                    # When there are failed jobs, ctrl_bps checks the number
                    # of exit codes, so record one placeholder code (1) per
                    # failed job unless real codes are requested below.
                    nfailed = task.get("output_failed_files", 0)
                    exit_codes_all[tasklabel] = [1] * nfailed
                    if return_exit_codes:
                        new_ret = idds_call_with_check(
                            idds_client.get_contents_output_ext,
                            func_name=f"get task {transform_workload_id} detail",
                            request_id=wms_workflow_id,
                            workload_id=transform_workload_id,
                        )
                        # task_info is expected to be a dictionary of len 1
                        # whose value is a list of dicts with PanDA job info.
                        task_info = new_ret[1][1]
                        if len(task_info) != 1:
                            raise RuntimeError(
                                f"Unexpected iDDS task info for workload {transform_workload_id}: {task_info}"
                            )
                        wmsjobs = next(iter(task_info.values()))
                        exit_codes_all[tasklabel] = [
                            job["trans_exit_code"]
                            for job in wmsjobs
                            if job.get("trans_exit_code") not in (None, 0, "0")
                        ]
                        if nfailed > 0 and not exit_codes_all[tasklabel]:
                            _LOG.debug(
                                "No exit codes in iDDS task info for workload %s", transform_workload_id
                            )

            # --- Aggregate job states ---
            taskstatus = {}
            mapped_states = state_map.get(status, [])
            for state in WmsStates:
                njobs = 0
                if state in mapped_states and state in file_map:
                    val = task.get(file_map[state])
                    if val:
                        njobs = val
                    if state == WmsStates.RUNNING:
                        njobs += task.get("output_new_files", 0) - task.get("input_new_files", 0)
                    if state != WmsStates.UNREADY:
                        wms_report.job_state_counts[state] += njobs
                taskstatus[state] = njobs

            # UNREADY is whatever remains after all other states are counted.
            unready = WmsStates.UNREADY
            taskstatus[unready] = totaljobs - sum(
                taskstatus[state] for state in WmsStates if state != unready
            )
            wms_report.job_state_counts[unready] += taskstatus[unready]

            # Store task summary
            wms_report.job_summary[tasklabel] = taskstatus
            summary_part = f"{tasklabel}:{totaljobs}"
            if wms_report.run_summary:
                summary_part = f";{summary_part}"
            wms_report.run_summary += summary_part

        # Store all exit codes
        wms_report.exit_code_summary = exit_codes_all

        (
            wms_report.job_summary,
            wms_report.exit_code_summary,
            wms_report.run_summary,
        ) = aggregate_by_basename(
            wms_report.job_summary,
            wms_report.exit_code_summary,
            wms_report.run_summary,
        )

        run_reports.append(wms_report)
        return run_reports, message

    def list_submitted_jobs(self, wms_id=None, user=None, require_bps=True, pass_thru=None, is_global=False):
        # Docstring inherited from BaseWmsService.list_submitted_jobs.
        # Filtering by user alone is not supported; a workflow id is required.
        if wms_id is None and user is not None:
            raise RuntimeError(
                "Error to get workflow status report: wms_id is required"
                " and filtering workflows with 'user' is not supported."
            )

        idds_client = get_idds_client(self.config)
        ret = idds_client.get_requests(request_id=wms_id)
        _LOG.debug("PanDA get workflows returned = %s", ret)

        status, result, error = get_idds_result(ret)
        if status:
            return [req["request_id"] for req in result]

        raise RuntimeError(f"Error list PanDA workflow requests: {error}")

    def cancel(self, wms_id, pass_thru=None):
        # Docstring inherited from BaseWmsService.cancel.
        idds_client = get_idds_client(self.config)
        ret = idds_client.abort(request_id=wms_id)
        _LOG.debug("Abort PanDA workflow returned = %s", ret)

        status, result, error = get_idds_result(ret)
        if status:
            _LOG.info("Aborting PanDA workflow %s", result)
            return True, json.dumps(result)

        return False, f"Error abort PanDA workflow: {error}"

    def ping(self, pass_thru=None):
        # Docstring inherited from BaseWmsService.ping.
        idds_client = get_idds_client(self.config)
        ret = idds_client.ping()
        _LOG.debug("Ping PanDA service returned = %s", ret)

        status, result, error = get_idds_result(ret)
        if status:
            # The call itself succeeded; the service must also report OK.
            if "Status" in result and result["Status"] == "OK":
                return 0, None

            return -1, f"Error ping PanDA service: {result}"

        return -1, f"Error ping PanDA service: {error}"

    def run_submission_checks(self):
        # Docstring inherited from BaseWmsService.run_submission_checks.
        for key in ["PANDA_URL"]:
            if key not in os.environ:
                raise OSError(f"Missing environment variable {key}")

        status, message = self.ping()
        if status != 0:
            raise RuntimeError(message)

    def get_status(
        self,
        wms_workflow_id=None,
        hist=0,
        is_global=False,
    ):
        # Docstring inherited from BaseWmsService.get_status.

        idds_client = get_idds_client(self.config)
        ret = idds_client.get_requests(request_id=wms_workflow_id, with_detail=False)
        _LOG.debug("PanDA get workflow status returned = %s", str(ret))

        request_status = ret[0]
        if request_status != 0:
            state = WmsStates.UNKNOWN
            message = f"Error getting workflow status for id {wms_workflow_id}: ret = {ret}"
        else:
            tasks = ret[1][1]
            if not tasks:
                state = WmsStates.UNKNOWN
                message = f"No records found for workflow id '{wms_workflow_id}'. Hint: double check the id"
            elif not isinstance(tasks[0], dict):
                # Anything other than a record dict is treated as an error
                # payload rather than a list of tasks.
                state = WmsStates.UNKNOWN
                message = f"Error getting workflow status for id {wms_workflow_id}: ret = {ret}"
            else:
                message = ""
                head = tasks[0]
                workflow_status = head["status"]["attributes"]["_name_"]
                # NOTE(review): report() maps "SubFinished" to SUCCEEDED while
                # this maps it to FAILED; confirm the difference is intended.
                if workflow_status in ["Finished"]:
                    state = WmsStates.SUCCEEDED
                elif workflow_status in ["Failed", "Expired", "SubFinished"]:
                    state = WmsStates.FAILED
                elif workflow_status in ["Cancelled"]:
                    state = WmsStates.DELETED
                elif workflow_status in ["Suspended"]:
                    state = WmsStates.HELD
                else:
                    state = WmsStates.RUNNING

        return state, message
class PandaBpsWmsWorkflow(BaseWmsWorkflow):
    """A single Panda based workflow.

    Parameters
    ----------
    name : `str`
        Unique name for Workflow.
    config : `lsst.ctrl.bps.BpsConfig`
        BPS configuration that includes necessary submit/runtime information.
    """

    def __init__(self, name, config=None):
        super().__init__(name, config)
        # Files to copy before submission, keyed src -> dest.
        self.files_to_pre_stage = {}
        # iDDS workflow object (named after this workflow) that the works are
        # added to.
        self.idds_client_workflow = IDDS_client_workflow(name=name)
        # Run attributes copied from the generic workflow.
        self.run_attrs = {}

    @classmethod
    def from_generic_workflow(cls, config, generic_workflow, out_prefix, service_class):
        # Docstring inherited from BaseWmsWorkflow.from_generic_workflow.
        wms_workflow = cls(generic_workflow.name, config)

        if generic_workflow.run_attrs:
            wms_workflow.run_attrs.update(generic_workflow.run_attrs)

        idds_wf = wms_workflow.idds_client_workflow
        pre_stage = wms_workflow.files_to_pre_stage

        # Regular works first; their sink and task count feed the final work.
        job_files, dag_sink_work, task_count = add_idds_work(config, generic_workflow, idds_wf)
        pre_stage.update(job_files)

        final_files = add_final_idds_work(
            config, generic_workflow, idds_wf, dag_sink_work, task_count + 1, 1
        )
        pre_stage.update(final_files)

        return wms_workflow

    def write(self, out_prefix):
        # Docstring inherited from BaseWmsWorkflow.write.
        pickle_path = os.path.join(out_prefix, "panda_workflow.pickle")
        with open(pickle_path, "wb") as fh:
            pickle.dump(self, fh)