# This file is part of ctrl_bps_htcondor.
#
# Developed for the LSST Data Management System.
# This product includes software developed by the LSST Project
# (https://www.lsst.org).
# See the COPYRIGHT file at the top-level directory of this distribution
# for details of code ownership.
#
# This software is dual licensed under the GNU General Public License and also
# under a 3-clause BSD license. Recipients may choose which of these licenses
# to use; please see the files gpl-3.0.txt and/or bsd_license.txt,
# respectively. If you choose the GPL option then the following text applies
# (but note that there is still no warranty even if you opt for BSD instead):
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <https://www.gnu.org/licenses/>.

"""Utility functions used for reporting."""

import logging
import os
import re
from pathlib import Path
from typing import Any

import htcondor

from lsst.ctrl.bps import (
    WmsJobReport,
    WmsRunReport,
    WmsSpecificInfo,
    WmsStates,
)

from .common_utils import _htc_status_to_wms_state
from .lssthtc import (
    MISSING_ID,
    WmsNodeType,
    condor_search,
    htc_check_dagman_output,
    htc_tweak_log_info,
    pegasus_name_to_label,
    read_dag_info,
    read_dag_log,
    read_dag_status,
    read_node_status,
    summarize_dag,
)

_LOG = logging.getLogger(__name__)


def _get_status_from_id(
    wms_workflow_id: str, hist: float, schedds: dict[str, htcondor.Schedd]
) -> tuple[WmsStates, str]:
    """Gather run information using workflow id.

    Parameters
    ----------
    wms_workflow_id : `str`
        Limit to specific run based on id.
    hist : `float`
        Limit history search to this many days.
    schedds : `dict` [ `str`, `htcondor.Schedd` ]
        HTCondor schedulers to query for job information. If the dictionary
        is empty, all queries will be run against the local scheduler only.

    Returns
    -------
    state : `lsst.ctrl.bps.WmsStates`
        Status for the corresponding run.
    message : `str`
        Message with extra error information.
    """
    _LOG.debug("_get_status_from_id: id=%s, hist=%s, schedds=%s", wms_workflow_id, hist, schedds)

    message = ""

    # Collect information about the job by querying HTCondor schedd and
    # HTCondor history.
    schedd_dag_info = _get_info_from_schedd(wms_workflow_id, hist, schedds)
    if len(schedd_dag_info) == 1:
        schedd_name = next(iter(schedd_dag_info))
        dag_id = next(iter(schedd_dag_info[schedd_name]))
        dag_ad = schedd_dag_info[schedd_name][dag_id]
        state = _htc_status_to_wms_state(dag_ad)
    else:
        state = WmsStates.UNKNOWN
        message = f"DAGMan job {wms_workflow_id} not found in queue or history. Check id or try path."
    return state, message
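

# A minimal usage sketch for the helper above; the id "1234.0" and querying
# only the local scheduler (an empty `schedds` dict) are assumptions for
# illustration:
#
#     state, message = _get_status_from_id("1234.0", hist=1, schedds={})
#     if state == WmsStates.UNKNOWN:
#         print(message)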


def _get_status_from_path(wms_path: str | os.PathLike) -> tuple[WmsStates, str]:
    """Gather run status from a given run directory.

    Parameters
    ----------
    wms_path : `str` | `os.PathLike`
        The directory containing the submit side files (e.g., HTCondor files).

    Returns
    -------
    state : `lsst.ctrl.bps.WmsStates`
        Status for the run.
    message : `str`
        Message to be printed.
    """
    wms_path = Path(wms_path).resolve()
    message = ""
    try:
        wms_workflow_id, dag_ad = read_dag_log(wms_path)
    except FileNotFoundError:
        wms_workflow_id = MISSING_ID
        message = f"DAGMan log not found in {wms_path}. Check path."

    if wms_workflow_id == MISSING_ID:
        state = WmsStates.UNKNOWN
    else:
        htc_tweak_log_info(wms_path, dag_ad[wms_workflow_id])
        state = _htc_status_to_wms_state(dag_ad[wms_workflow_id])

    return state, message
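

# The path-based variant needs no scheduler access, only the submit-side
# files; a usage sketch (the submit directory is hypothetical):
#
#     state, message = _get_status_from_path("/scratch/submit/u/jdoe/run1")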


def _report_from_path(wms_path):
    """Gather run information from a given run directory.

    Parameters
    ----------
    wms_path : `str`
        The directory containing the submit side files (e.g., HTCondor files).

    Returns
    -------
    run_reports : `dict` [`str`, `lsst.ctrl.bps.WmsRunReport`]
        Run information for the detailed report. The key is the HTCondor id
        and the value is a collection of report information for that run.
    message : `str`
        Message to be printed with the summary report.
    """
    wms_workflow_id, jobs, message = _get_info_from_path(wms_path)
    if wms_workflow_id == MISSING_ID:
        run_reports = {}
    else:
        run_reports = _create_detailed_report_from_jobs(wms_workflow_id, jobs)
    return run_reports, message


def _report_from_id(wms_workflow_id, hist, schedds=None):
    """Gather run information using workflow id.

    Parameters
    ----------
    wms_workflow_id : `str`
        Limit to specific run based on id.
    hist : `float`
        Limit history search to this many days.
    schedds : `dict` [ `str`, `htcondor.Schedd` ], optional
        HTCondor schedulers to query for job information. If None
        (default), all queries will be run against the local scheduler only.

    Returns
    -------
    run_reports : `dict` [`str`, `lsst.ctrl.bps.WmsRunReport`]
        Run information for the detailed report. The key is the HTCondor id
        and the value is a collection of report information for that run.
    message : `str`
        Message to be printed with the summary report.
    """
    messages = []

    # Collect information about the job by querying HTCondor schedd and
    # HTCondor history.
    schedd_dag_info = _get_info_from_schedd(wms_workflow_id, hist, schedds)
    if len(schedd_dag_info) == 1:
        # Extract the DAG info without altering the results of the query.
        schedd_name = next(iter(schedd_dag_info))
        dag_id = next(iter(schedd_dag_info[schedd_name]))
        dag_ad = schedd_dag_info[schedd_name][dag_id]

        # If the provided workflow id does not correspond to the one extracted
        # from the DAGMan log file in the submit directory, rerun the query
        # with the id found in the file.
        #
        # This is to cover the situation in which the user provided the old
        # job id of a restarted run.
        try:
            path_dag_id, _ = read_dag_log(dag_ad["Iwd"])
        except FileNotFoundError as exc:
            # At the moment a missing DAGMan log is pretty much a fatal error.
            # So empty the DAG info to finish early (see the if statement
            # below).
            schedd_dag_info.clear()
            messages.append(f"Cannot create the report for '{dag_id}': {exc}")
        else:
            if path_dag_id != dag_id:
                schedd_dag_info = _get_info_from_schedd(path_dag_id, hist, schedds)
                messages.append(
                    f"WARNING: Found newer workflow executions in same submit directory as id '{dag_id}'. "
                    "This normally occurs when a run is restarted. The report shown is for the most "
                    f"recent status with run id '{path_dag_id}'"
                )

    if len(schedd_dag_info) == 0:
        run_reports = {}
    elif len(schedd_dag_info) == 1:
        _, dag_info = schedd_dag_info.popitem()
        dag_id, dag_ad = dag_info.popitem()

        # Create a mapping between jobs and their classads. The keys will
        # be of format 'ClusterId.ProcId'.
        job_info = {dag_id: dag_ad}

        # Find jobs (nodes) belonging to that DAGMan job.
        job_constraint = f"DAGManJobId == {int(float(dag_id))}"
        schedd_job_info = condor_search(constraint=job_constraint, hist=hist, schedds=schedds)
        if schedd_job_info:
            _, node_info = schedd_job_info.popitem()
            job_info.update(node_info)

        # Collect additional pieces of information about jobs using HTCondor
        # files in the submission directory.
        _, path_jobs, message = _get_info_from_path(dag_ad["Iwd"])
        _update_jobs(job_info, path_jobs)
        if message:
            messages.append(message)
        run_reports = _create_detailed_report_from_jobs(dag_id, job_info)
    else:
        ids = [ad["GlobalJobId"] for dag_info in schedd_dag_info.values() for ad in dag_info.values()]
        message = (
            f"More than one job matches id '{wms_workflow_id}', "
            f"their global ids are: {', '.join(ids)}. Rerun with one of the global ids"
        )
        messages.append(message)
        run_reports = {}

    message = "\n".join(messages)
    return run_reports, message
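

# A condensed sketch of the restart handling above (ids are hypothetical):
# if the user passes the pre-restart id "100.0" but the DAGMan log in the
# submit directory belongs to a newer run, the query is redone with the
# newer id and a WARNING line is appended to the report message:
#
#     dag_id = "100.0"
#     path_dag_id, _ = read_dag_log(dag_ad["Iwd"])   # e.g., "105.0"
#     if path_dag_id != dag_id:
#         schedd_dag_info = _get_info_from_schedd(path_dag_id, hist, schedds)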


def _get_info_from_schedd(
    wms_workflow_id: str, hist: float, schedds: dict[str, htcondor.Schedd]
) -> dict[str, dict[str, dict[str, Any]]]:
    """Gather run information from HTCondor.

    Parameters
    ----------
    wms_workflow_id : `str`
        Limit to specific run based on id.
    hist : `float`
        Limit history search to this many days.
    schedds : `dict` [ `str`, `htcondor.Schedd` ]
        HTCondor schedulers to query for job information. If the dictionary
        is empty, all queries will be run against the local scheduler only.

    Returns
    -------
    schedd_dag_info : `dict` [`str`, `dict` [`str`, `dict` [`str`, Any]]]
        Information about jobs satisfying the search criteria where, for each
        Scheduler, local HTCondor job ids are mapped to their respective
        classads.
    """
    _LOG.debug("_get_info_from_schedd: id=%s, hist=%s, schedds=%s", wms_workflow_id, hist, schedds)

    dag_constraint = 'regexp("dagman$", Cmd)'
    try:
        cluster_id = int(float(wms_workflow_id))
    except ValueError:
        dag_constraint += f' && GlobalJobId == "{wms_workflow_id}"'
    else:
        dag_constraint += f" && ClusterId == {cluster_id}"

    # With the current implementation of the condor_* functions the query
    # will always return only one match per Scheduler.
    #
    # Even in the highly unlikely situation where HTCondor history (which
    # condor_search queries too) is long enough to have jobs from before
    # the cluster ids were rolled over (and as a result there is more than
    # one job with the same cluster id) they will not show up in
    # the results.
    schedd_dag_info = condor_search(constraint=dag_constraint, hist=hist, schedds=schedds)
    return schedd_dag_info
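

# How the constraint above comes out for the two kinds of ids (both ids
# hypothetical). A numeric id selects on ClusterId; anything that cannot be
# parsed as a number is treated as a GlobalJobId:
#
#     "1234.0" -> 'regexp("dagman$", Cmd) && ClusterId == 1234'
#     "sched1#1234.0#1700000000"
#         -> 'regexp("dagman$", Cmd) && GlobalJobId == "sched1#1234.0#1700000000"'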


def _get_info_from_path(wms_path: str | os.PathLike) -> tuple[str, dict[str, dict[str, Any]], str]:
    """Gather run information from a given run directory.

    Parameters
    ----------
    wms_path : `str` or `os.PathLike`
        Directory containing HTCondor files.

    Returns
    -------
    wms_workflow_id : `str`
        The run id which is a DAGMan job id.
    jobs : `dict` [`str`, `dict` [`str`, `~typing.Any`]]
        Information about jobs read from files in the given directory.
        The key is the HTCondor id and the value is a dictionary of HTCondor
        keys and values.
    message : `str`
        Message to be printed with the summary report.
    """
    # Ensure path is absolute, in particular for folks helping debug
    # failures who need to dig around submit files.
    wms_path = Path(wms_path).resolve()

    messages = []
    try:
        wms_workflow_id, jobs = read_dag_log(wms_path)
        _LOG.debug("_get_info_from_path: from dag log %s = %s", wms_workflow_id, jobs)
        _update_jobs(jobs, read_node_status(wms_path))
        _LOG.debug("_get_info_from_path: after node status %s = %s", wms_workflow_id, jobs)

        # Add more info for the DAGMan job.
        job = jobs[wms_workflow_id]
        job.update(read_dag_status(wms_path))

        job["total_jobs"], job["state_counts"] = _get_state_counts_from_jobs(wms_workflow_id, jobs)
        if "bps_run" not in job:
            _add_run_info(wms_path, job)

        message = htc_check_dagman_output(wms_path)
        if message:
            messages.append(message)
        _LOG.debug(
            "_get_info: id = %s, total_jobs = %s", wms_workflow_id, jobs[wms_workflow_id]["total_jobs"]
        )

        # Add extra pieces of information which cannot be found in HTCondor
        # generated files like 'GlobalJobId'.
        #
        # Do not treat the absence of this file as a serious error. At the
        # moment neither runs submitted with earlier versions of the plugin
        # nor runs submitted with the Pegasus plugin will have it. However,
        # once enough time passes and the Pegasus plugin has its own report()
        # method (instead of sneakily using HTCondor's one), the lack of that
        # file should be treated as seriously as the lack of any other file.
        try:
            job_info = read_dag_info(wms_path)
        except FileNotFoundError as exc:
            message = f"Warn: Some information may not be available: {exc}"
            messages.append(message)
        else:
            schedd_name = next(iter(job_info))
            job_ad = next(iter(job_info[schedd_name].values()))
            job.update(job_ad)
    except FileNotFoundError as err:
        message = f"Could not find HTCondor files in '{wms_path}' ({err})"
        _LOG.debug(message)
        messages.append(message)
        message = htc_check_dagman_output(wms_path)
        if message:
            messages.append(message)
        wms_workflow_id = MISSING_ID
        jobs = {}

    # Add more condor_q-like info.
    for job in jobs.values():
        htc_tweak_log_info(wms_path, job)

    message = "\n".join([msg for msg in messages if msg])
    _LOG.debug("wms_workflow_id = %s, jobs = %s", wms_workflow_id, jobs.keys())
    _LOG.debug("message = %s", message)
    return wms_workflow_id, jobs, message


def _create_detailed_report_from_jobs(
    wms_workflow_id: str, jobs: dict[str, dict[str, Any]]
) -> dict[str, WmsRunReport]:
    """Gather run information to be used in generating summary reports.

    Parameters
    ----------
    wms_workflow_id : `str`
        The run id to create the report for.
    jobs : `dict` [`str`, `dict` [`str`, Any]]
        Mapping HTCondor job id to job information.

    Returns
    -------
    run_reports : `dict` [`str`, `lsst.ctrl.bps.WmsRunReport`]
        Run information for the detailed report. The key is the given HTCondor
        id and the value is a collection of report information for that run.
    """
    _LOG.debug("_create_detailed_report: id = %s, job = %s", wms_workflow_id, jobs[wms_workflow_id])

    dag_ad = jobs[wms_workflow_id]

    report = WmsRunReport(
        wms_id=f"{dag_ad['ClusterId']}.{dag_ad['ProcId']}",
        global_wms_id=dag_ad.get("GlobalJobId", "MISS"),
        path=dag_ad["Iwd"],
        label=dag_ad.get("bps_job_label", "MISS"),
        run=dag_ad.get("bps_run", "MISS"),
        project=dag_ad.get("bps_project", "MISS"),
        campaign=dag_ad.get("bps_campaign", "MISS"),
        payload=dag_ad.get("bps_payload", "MISS"),
        operator=_get_owner(dag_ad),
        run_summary=_get_run_summary(dag_ad),
        state=_htc_status_to_wms_state(dag_ad),
        total_number_jobs=0,
        jobs=[],
        job_state_counts=dict.fromkeys(WmsStates, 0),
        exit_code_summary={},
    )

    payload_jobs = {}  # Keep track for later processing.
    specific_info = WmsSpecificInfo()
    for job_id, job_ad in jobs.items():
        if job_ad.get("wms_node_type", WmsNodeType.UNKNOWN) in [WmsNodeType.PAYLOAD, WmsNodeType.FINAL]:
            try:
                name = job_ad.get("DAGNodeName", job_id)
                wms_state = _htc_status_to_wms_state(job_ad)
                job_report = WmsJobReport(
                    wms_id=job_id,
                    name=name,
                    label=job_ad.get("bps_job_label", pegasus_name_to_label(name)),
                    state=wms_state,
                )
                if job_report.label == "init":
                    job_report.label = "pipetaskInit"
                report.job_state_counts[wms_state] += 1
                report.jobs.append(job_report)
                payload_jobs[job_id] = job_ad
            except KeyError as ex:
                _LOG.error("Job missing key '%s': %s", str(ex), job_ad)
                raise
        elif is_service_job(job_ad):
            _LOG.debug(
                "Found service job: id='%s', name='%s', label='%s', NodeStatus='%s', JobStatus='%s'",
                job_id,
                job_ad["DAGNodeName"],
                job_ad.get("bps_job_label", "MISS"),
                job_ad.get("NodeStatus", "MISS"),
                job_ad.get("JobStatus", "MISS"),
            )
            _add_service_job_specific_info(job_ad, specific_info)

    report.total_number_jobs = len(payload_jobs)
    report.exit_code_summary = _get_exit_code_summary(payload_jobs)
    if specific_info:
        report.specific_info = specific_info

    # The workflow will exit with a non-zero DAG_STATUS if there is a problem
    # with any of the wms jobs. So change FAILED to SUCCEEDED if all payload
    # jobs SUCCEEDED.
    if report.total_number_jobs == report.job_state_counts[WmsStates.SUCCEEDED]:
        report.state = WmsStates.SUCCEEDED

    run_reports = {report.wms_id: report}
    _LOG.debug("_create_detailed_report: run_reports = %s", run_reports)
    return run_reports


def _add_service_job_specific_info(job_ad: dict[str, Any], specific_info: WmsSpecificInfo) -> None:
    """Generate report information for service job.

    Parameters
    ----------
    job_ad : `dict` [`str`, `~typing.Any`]
        Provisioning job information.
    specific_info : `lsst.ctrl.bps.WmsSpecificInfo`
        Where to add message.
    """
    status_details = ""
    job_status = _htc_status_to_wms_state(job_ad)

    # Service jobs in the queue are deleted when the DAG is done. To get
    # an accurate status, need to check other info.
    if (
        job_status == WmsStates.DELETED
        and "Reason" in job_ad
        and (
            "Removed by DAGMan" in job_ad["Reason"]
            or "removed because <OtherJobRemoveRequirements = DAGManJobId =?=" in job_ad["Reason"]
            or "DAG is exiting and writing rescue file." in job_ad["Reason"]
        )
    ):
        if "HoldReason" in job_ad:
            # HoldReason exists even if released, so check.
            if "job_released_time" in job_ad and job_ad["job_held_time"] < job_ad["job_released_time"]:
                # If released, assume running until deleted.
                job_status = WmsStates.SUCCEEDED
                status_details = ""
            else:
                # If the job was held when deleted by DAGMan, still want
                # to report the hold reason.
                status_details = f"(Job was held for the following reason: {job_ad['HoldReason']})"

        else:
            job_status = WmsStates.SUCCEEDED
    elif job_status == WmsStates.SUCCEEDED:
        status_details = "(Note: Finished before workflow.)"
    elif job_status == WmsStates.HELD:
        status_details = f"({job_ad['HoldReason']})"

    template = "Status of {job_name}: {status} {status_details}"
    context = {
        "job_name": job_ad["DAGNodeName"],
        "status": job_status.name,
        "status_details": status_details,
    }
    specific_info.add_message(template=template, context=context)
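

# The template and context above are rendered by WmsSpecificInfo.add_message;
# for a hypothetical service node named "provisioningJob" that finished
# before the workflow, the resulting report line would read roughly:
#
#     Status of provisioningJob: SUCCEEDED (Note: Finished before workflow.)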


def _summary_report(user, hist, pass_thru, schedds=None):
    """Gather run information to be used in generating summary reports.

    Parameters
    ----------
    user : `str`
        Run lookup restricted to given user.
    hist : `float`
        How many previous days to search for run information.
    pass_thru : `str`
        Advanced users can define the HTCondor constraint to be used
        when searching queue and history.
    schedds : `dict` [ `str`, `htcondor.Schedd` ], optional
        HTCondor schedulers to query for job information. If None
        (default), all queries will be run against the local scheduler only.

    Returns
    -------
    run_reports : `dict` [`str`, `lsst.ctrl.bps.WmsRunReport`]
        Run information for the summary report. The keys are HTCondor ids and
        the values are collections of report information for each run.
    message : `str`
        Message to be printed with the summary report.
    """
    # Only doing the summary report, so only look for DAGMan jobs.
    if pass_thru:
        constraint = pass_thru
    else:
        # Notes:
        # * bps_isjob == 'True' isn't getting set for DAG jobs that are
        #   manually restarted.
        # * Any job with DAGManJobID isn't a DAG job.
        constraint = 'bps_isjob == "True" && JobUniverse == 7'
        if user:
            constraint += f' && (Owner == "{user}" || bps_operator == "{user}")'

    job_info = condor_search(constraint=constraint, hist=hist, schedds=schedds)

    # Have a list of DAGMan jobs; need to get run_report info.
    run_reports = {}
    msg = ""
    for jobs in job_info.values():
        for job_id, job in jobs.items():
            total_jobs, state_counts = _get_state_counts_from_dag_job(job)
            # If this didn't come from the queue information (e.g., Kerberos
            # bug), try reading from file.
            if total_jobs == 0:
                try:
                    job.update(read_dag_status(job["Iwd"]))
                    total_jobs, state_counts = _get_state_counts_from_dag_job(job)
                except StopIteration:
                    pass  # Don't kill the report if HTCondor files can't be found.

            if "bps_run" not in job:
                _add_run_info(job["Iwd"], job)
            report = WmsRunReport(
                wms_id=job_id,
                global_wms_id=job["GlobalJobId"],
                path=job["Iwd"],
                label=job.get("bps_job_label", "MISS"),
                run=job.get("bps_run", "MISS"),
                project=job.get("bps_project", "MISS"),
                campaign=job.get("bps_campaign", "MISS"),
                payload=job.get("bps_payload", "MISS"),
                operator=_get_owner(job),
                run_summary=_get_run_summary(job),
                state=_htc_status_to_wms_state(job),
                jobs=[],
                total_number_jobs=total_jobs,
                job_state_counts=state_counts,
            )
            run_reports[report.global_wms_id] = report

    return run_reports, msg
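

# A sketch of the default constraint built above for a hypothetical user
# "jdoe" (i.e., no pass_thru given):
#
#     bps_isjob == "True" && JobUniverse == 7
#         && (Owner == "jdoe" || bps_operator == "jdoe")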


def _add_run_info(wms_path, job):
    """Find BPS run information elsewhere for runs without bps attributes.

    Parameters
    ----------
    wms_path : `str`
        Path to submit files for the run.
    job : `dict` [`str`, `~typing.Any`]
        HTCondor dag job information.

    Raises
    ------
    StopIteration
        If the file it is looking for cannot be found. Permission errors
        are caught and the job's run is marked with an error.
    """
    path = Path(wms_path) / "jobs"
    try:
        subfile = next(path.glob("**/*.sub"))
    except (StopIteration, PermissionError):
        job["bps_run"] = "Unavailable"
    else:
        _LOG.debug("_add_run_info: subfile = %s", subfile)
        try:
            with open(subfile, encoding="utf-8") as fh:
                for line in fh:
                    if line.startswith("+bps_"):
                        m = re.match(r"\+(bps_[^\s]+)\s*=\s*(.+)$", line)
                        if m:
                            _LOG.debug("Matching line: %s", line)
                            job[m.group(1)] = m.group(2).replace('"', "")
                        else:
                            _LOG.debug("Could not parse attribute: %s", line)
        except PermissionError:
            job["bps_run"] = "PermissionError"
    _LOG.debug("After adding job = %s", job)
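

# A sketch of the attribute parsing above on a hypothetical submit-file line:
#
#     line = '+bps_run = "u/jdoe/test_run"\n'
#     m = re.match(r"\+(bps_[^\s]+)\s*=\s*(.+)$", line)
#     m.group(1)                    # 'bps_run'
#     m.group(2).replace('"', "")   # 'u/jdoe/test_run'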


def _get_owner(job):
    """Get the owner of a dag job.

    Parameters
    ----------
    job : `dict` [`str`, `~typing.Any`]
        HTCondor dag job information.

    Returns
    -------
    owner : `str`
        Owner of the dag job.
    """
    owner = job.get("bps_operator", None)
    if not owner:
        owner = job.get("Owner", None)
        if not owner:
            _LOG.warning("Could not get Owner from htcondor job: %s", job)
            owner = "MISS"
    return owner
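

# The fallback order above is bps_operator, then Owner, then "MISS"; for
# example, with hypothetical classads:
#
#     _get_owner({"bps_operator": "ops", "Owner": "jdoe"})   # 'ops'
#     _get_owner({"Owner": "jdoe"})                          # 'jdoe'
#     _get_owner({})                                         # 'MISS' (logs a warning)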


def _get_run_summary(job):
    """Get the run summary for a job.

    Parameters
    ----------
    job : `dict` [`str`, `~typing.Any`]
        HTCondor dag job information.

    Returns
    -------
    summary : `str`
        Number of jobs per PipelineTask label in approximate pipeline order.
        Format: <label>:<count>[;<label>:<count>]+
    """
    summary = job.get("bps_job_summary", job.get("bps_run_summary", None))
    if not summary:
        summary, _, _ = summarize_dag(job["Iwd"])
        if not summary:
            _LOG.warning("Could not get run summary for htcondor job: %s", job)
    _LOG.debug("_get_run_summary: summary=%s", summary)

    # Work around the summary sometimes using init instead of pipetaskInit.
    summary = summary.replace("init:", "pipetaskInit:")

    if "pegasus_version" in job and "pegasus" not in summary:
        summary += ";pegasus:0"

    return summary
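

# A hypothetical summary string in the documented format, after the
# init -> pipetaskInit rewrite (the task labels are illustrative only):
#
#     "pipetaskInit:1;isr:100;finalJob:1"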


def _get_exit_code_summary(jobs):
    """Get the exit code summary for a run.

    Parameters
    ----------
    jobs : `dict` [`str`, `dict` [`str`, Any]]
        Mapping HTCondor job id to job information.

    Returns
    -------
    summary : `dict` [`str`, `list` [`int`]]
        Jobs' exit codes per job label.
    """
    summary = {}
    for job_id, job_ad in jobs.items():
        job_label = job_ad["bps_job_label"]
        summary.setdefault(job_label, [])
        try:
            exit_code = 0
            job_status = job_ad["JobStatus"]
            match job_status:
                case htcondor.JobStatus.COMPLETED | htcondor.JobStatus.HELD | htcondor.JobStatus.REMOVED:
                    exit_code = job_ad["ExitSignal"] if job_ad["ExitBySignal"] else job_ad["ExitCode"]
                case (
                    htcondor.JobStatus.IDLE
                    | htcondor.JobStatus.RUNNING
                    | htcondor.JobStatus.TRANSFERRING_OUTPUT
                    | htcondor.JobStatus.SUSPENDED
                ):
                    pass
                case _:
                    _LOG.debug("Unknown 'JobStatus' value ('%d') in classad for job '%s'", job_status, job_id)
            if exit_code != 0:
                summary[job_label].append(exit_code)
        except KeyError as ex:
            _LOG.debug("Attribute '%s' not found in the classad for job '%s'", ex, job_id)
    return summary
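

# A sketch of the mapping returned above for two hypothetical labels; an
# empty list means every counted job for that label exited with code 0:
#
#     {"isr": [1, 139], "calibrate": []}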


def _get_state_counts_from_jobs(
    wms_workflow_id: str, jobs: dict[str, dict[str, Any]]
) -> tuple[int, dict[WmsStates, int]]:
    """Count number of jobs per WMS state.

    The workflow job and the service jobs are excluded from the count.

    Parameters
    ----------
    wms_workflow_id : `str`
        HTCondor job id.
    jobs : `dict` [`str`, `dict` [`str`, `~typing.Any`]]
        HTCondor dag job information.

    Returns
    -------
    total_count : `int`
        Total number of dag nodes.
    state_counts : `dict` [`lsst.ctrl.bps.WmsStates`, `int`]
        Keys are the different WMS states and values are counts of jobs
        that are in that WMS state.
    """
    state_counts = dict.fromkeys(WmsStates, 0)
    for job_id, job_ad in jobs.items():
        if job_id != wms_workflow_id and job_ad.get("wms_node_type", WmsNodeType.UNKNOWN) in [
            WmsNodeType.PAYLOAD,
            WmsNodeType.FINAL,
        ]:
            state_counts[_htc_status_to_wms_state(job_ad)] += 1
    total_count = sum(state_counts.values())

    return total_count, state_counts


def _get_state_counts_from_dag_job(job):
    """Count number of jobs per WMS state.

    Parameters
    ----------
    job : `dict` [`str`, `~typing.Any`]
        HTCondor dag job information.

    Returns
    -------
    total_count : `int`
        Total number of dag nodes.
    state_counts : `dict` [`lsst.ctrl.bps.WmsStates`, `int`]
        Keys are the different WMS states and values are counts of jobs
        that are in that WMS state.
    """
    _LOG.debug("_get_state_counts_from_dag_job: job = %s %s", type(job), len(job))
    state_counts = dict.fromkeys(WmsStates, 0)
    if "DAG_NodesReady" in job:
        state_counts = {
            WmsStates.UNREADY: job.get("DAG_NodesUnready", 0),
            WmsStates.READY: job.get("DAG_NodesReady", 0),
            WmsStates.HELD: job.get("DAG_JobsHeld", 0),
            WmsStates.SUCCEEDED: job.get("DAG_NodesDone", 0),
            WmsStates.FAILED: job.get("DAG_NodesFailed", 0),
            WmsStates.PRUNED: job.get("DAG_NodesFutile", 0),
            WmsStates.MISFIT: job.get("DAG_NodesPre", 0) + job.get("DAG_NodesPost", 0),
        }
        total_jobs = job.get("DAG_NodesTotal")
        _LOG.debug("_get_state_counts_from_dag_job: from DAG_* keys, total_jobs = %s", total_jobs)
    elif "NodesFailed" in job:
        state_counts = {
            WmsStates.UNREADY: job.get("NodesUnready", 0),
            WmsStates.READY: job.get("NodesReady", 0),
            WmsStates.HELD: job.get("JobProcsHeld", 0),
            WmsStates.SUCCEEDED: job.get("NodesDone", 0),
            WmsStates.FAILED: job.get("NodesFailed", 0),
            WmsStates.PRUNED: job.get("NodesFutile", 0),
            WmsStates.MISFIT: job.get("NodesPre", 0) + job.get("NodesPost", 0),
        }
        try:
            # Item access (not .get) so a missing NodesTotal actually raises
            # the KeyError handled below.
            total_jobs = job["NodesTotal"]
        except KeyError as ex:
            _LOG.error("Job missing %s. job = %s", str(ex), job)
            raise
        _LOG.debug("_get_state_counts_from_dag_job: from NODES* keys, total_jobs = %s", total_jobs)
    else:
        # With Kerberos job auth and the Kerberos bug, a warning here would
        # be printed for every DAG, so log at debug level only.
        _LOG.debug("Can't get job state counts %s", job["Iwd"])
        total_jobs = 0

    _LOG.debug("total_jobs = %s, state_counts: %s", total_jobs, state_counts)
    return total_jobs, state_counts


def _update_jobs(jobs1, jobs2):
    """Update jobs1 with info in jobs2.

    (Basically an update for nested dictionaries.)

    Parameters
    ----------
    jobs1 : `dict` [`str`, `dict` [`str`, `~typing.Any`]]
        HTCondor job information to be updated.
    jobs2 : `dict` [`str`, `dict` [`str`, `~typing.Any`]]
        Additional HTCondor job information.
    """
    for job_id, job_ad in jobs2.items():
        if job_id in jobs1:
            jobs1[job_id].update(job_ad)
        else:
            jobs1[job_id] = job_ad
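

# A minimal sketch of the nested update above (ids and classad keys are
# hypothetical):
#
#     jobs1 = {"1.0": {"JobStatus": 2}}
#     jobs2 = {"1.0": {"HoldReason": "spot"}, "2.0": {"JobStatus": 1}}
#     _update_jobs(jobs1, jobs2)
#     # jobs1 == {"1.0": {"JobStatus": 2, "HoldReason": "spot"},
#     #           "2.0": {"JobStatus": 1}}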


def is_service_job(job_ad: dict[str, Any]) -> bool:
    """Determine if a job is a service one.

    Parameters
    ----------
    job_ad : `dict` [`str`, Any]
        Information about an HTCondor job.

    Returns
    -------
    is_service_job : `bool`
        `True` if the job is a service one, `False` otherwise.

    Notes
    -----
    At the moment, HTCondor does not provide a native way to distinguish
    between payload and service jobs in the workflow. This code depends
    on `read_node_status` adding `wms_node_type`.
    """
    return job_ad.get("wms_node_type", WmsNodeType.UNKNOWN) == WmsNodeType.SERVICE
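

# A quick sketch with hypothetical classads:
#
#     is_service_job({"wms_node_type": WmsNodeType.SERVICE})   # True
#     is_service_job({"wms_node_type": WmsNodeType.PAYLOAD})   # False
#     is_service_job({})                                       # False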