Coverage for python / lsst / ctrl / bps / htcondor / lssthtc.py: 11%
924 statements
« prev ^ index » next coverage.py v7.13.5, created at 2026-04-18 08:49 +0000
1# This file is part of ctrl_bps_htcondor.
2#
3# Developed for the LSST Data Management System.
4# This product includes software developed by the LSST Project
5# (https://www.lsst.org).
6# See the COPYRIGHT file at the top-level directory of this distribution
7# for details of code ownership.
8#
9# This software is dual licensed under the GNU General Public License and also
10# under a 3-clause BSD license. Recipients may choose which of these licenses
11# to use; please see the files gpl-3.0.txt and/or bsd_license.txt,
12# respectively. If you choose the GPL option then the following text applies
13# (but note that there is still no warranty even if you opt for BSD instead):
14#
15# This program is free software: you can redistribute it and/or modify
16# it under the terms of the GNU General Public License as published by
17# the Free Software Foundation, either version 3 of the License, or
18# (at your option) any later version.
19#
20# This program is distributed in the hope that it will be useful,
21# but WITHOUT ANY WARRANTY; without even the implied warranty of
22# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
23# GNU General Public License for more details.
24#
25# You should have received a copy of the GNU General Public License
26# along with this program. If not, see <https://www.gnu.org/licenses/>.
28"""Placeholder HTCondor DAGMan API.
30There is new work on a python DAGMan API from HTCondor. However, at this
31time, it tries to make things easier by assuming DAG is easily broken into
32levels where there are 1-1 or all-to-all relationships to nodes in next
33level. LSST workflows are more complicated.
34"""
# Public API of this module. Note: deprecated names (e.g. JobStatus) are
# deliberately omitted.
__all__ = [
    "MISSING_ID",
    "DagStatus",
    "HTCDag",
    "HTCJob",
    "NodeStatus",
    "RestrictedDict",
    "WmsNodeType",
    "condor_history",
    "condor_q",
    "condor_search",
    "condor_status",
    "htc_backup_files",
    "htc_check_dagman_output",
    "htc_create_submit_from_cmd",
    "htc_create_submit_from_dag",
    "htc_create_submit_from_file",
    "htc_escape",
    "htc_query_history",
    "htc_query_present",
    "htc_submit_dag",
    "htc_tweak_log_info",
    "htc_version",
    "htc_write_attribs",
    "htc_write_condor_file",
    "pegasus_name_to_label",
    "read_dag_info",
    "read_dag_log",
    "read_dag_nodes_log",
    "read_dag_status",
    "read_node_status",
    "summarize_dag",
    "update_job_info",
    "write_dag_info",
]
73import itertools
74import json
75import logging
76import os
77import pprint
78import re
79import subprocess
80from collections import Counter, defaultdict
81from collections.abc import MutableMapping
82from datetime import datetime, timedelta
83from enum import IntEnum, auto
84from pathlib import Path
85from typing import Any, TextIO
87import classad
88import htcondor
89import networkx
90from deprecated.sphinx import deprecated
91from packaging import version
93from .handlers import HTC_JOB_AD_HANDLERS
95_LOG = logging.getLogger(__name__)
# Sentinel standing in for a missing HTCondor job id (kept as a string so it
# can be used wherever string ids are expected).
MISSING_ID = "-99999"
class DagStatus(IntEnum):
    """HTCondor DAGMan's statuses for a DAG.

    Integer values match the DAG status codes reported by DAGMan.
    """

    OK = 0
    ERROR = 1  # an error condition different than those listed here
    FAILED = 2  # one or more nodes in the DAG have failed
    ABORTED = 3  # the DAG has been aborted by an ABORT-DAG-ON specification
    REMOVED = 4  # the DAG has been removed by condor_rm
    CYCLE = 5  # a cycle was found in the DAG
    SUSPENDED = 6  # the DAG has been suspended (see section 2.10.8)
@deprecated(
    reason="The JobStatus is internally replaced by htcondor.JobStatus. "
    "External reporting code should be using ctrl_bps.WmsStates. "
    "This class will be removed after v30.",
    version="v30.0",
    category=FutureWarning,
)
class JobStatus(IntEnum):
    """HTCondor's statuses for jobs.

    Deprecated in favor of ``htcondor.JobStatus`` (see decorator above);
    values mirror the HTCondor JobStatus integer codes.
    """

    UNEXPANDED = 0  # Unexpanded
    IDLE = 1  # Idle
    RUNNING = 2  # Running
    REMOVED = 3  # Removed
    COMPLETED = 4  # Completed
    HELD = 5  # Held
    TRANSFERRING_OUTPUT = 6  # Transferring_Output
    SUSPENDED = 7  # Suspended
class NodeStatus(IntEnum):
    """HTCondor's statuses for DAGMan nodes.

    Integer values match the node status codes DAGMan writes to the
    node status file.
    """

    # (STATUS_NOT_READY): At least one parent has not yet finished or the node
    # is a FINAL node.
    NOT_READY = 0

    # (STATUS_READY): All parents have finished, but the node is not yet
    # running.
    READY = 1

    # (STATUS_PRERUN): The node's PRE script is running.
    PRERUN = 2

    # (STATUS_SUBMITTED): The node's HTCondor job(s) are in the queue.
    # StatusDetails = "not_idle" -> running.
    # JobProcsHeld = 1 -> hold.
    # JobProcsQueued = 1 -> idle.
    SUBMITTED = 3

    # (STATUS_POSTRUN): The node's POST script is running.
    POSTRUN = 4

    # (STATUS_DONE): The node has completed successfully.
    DONE = 5

    # (STATUS_ERROR): The node has failed. StatusDetails has info (e.g.,
    # ULOG_JOB_ABORTED for deleted job).
    ERROR = 6

    # (STATUS_FUTILE): The node will never run because an ancestor node
    # failed.
    FUTILE = 7
class WmsNodeType(IntEnum):
    """HTCondor plugin node types to help with payload reporting.

    Values are assigned via `auto()`; only the names are meaningful.
    """

    UNKNOWN = auto()
    # Dummy value when missing.

    PAYLOAD = auto()
    # Payload job.

    FINAL = auto()
    # Final job.

    SERVICE = auto()
    # Service job.

    NOOP = auto()
    # NOOP job used for ordering jobs.

    SUBDAG = auto()
    # SUBDAG job used for ordering jobs.

    SUBDAG_CHECK = auto()
    # Job used to correctly prune jobs after a subdag.
# Submit-file keys whose values must be written wrapped in double quotes
# (see htc_write_condor_file).
HTC_QUOTE_KEYS = {"environment"}
# Submit description keys accepted by HTCJob.cmds (enforced by
# RestrictedDict).
HTC_VALID_JOB_KEYS = {
    "universe",
    "executable",
    "arguments",
    "environment",
    "log",
    "error",
    "output",
    "should_transfer_files",
    "when_to_transfer_output",
    "getenv",
    "notification",
    "notify_user",
    "concurrency_limit",
    "transfer_executable",
    "transfer_input_files",
    "transfer_output_files",
    "transfer_output_remaps",
    "request_cpus",
    "request_memory",
    "request_disk",
    "priority",
    "category",
    "requirements",
    "on_exit_hold",
    "on_exit_hold_reason",
    "on_exit_hold_subcode",
    "max_retries",
    "retry_until",
    "periodic_release",
    "periodic_remove",
    "accounting_group",
    "accounting_group_user",
}
# DAG-level command keys accepted by HTCJob.dagcmds (enforced by
# RestrictedDict).
HTC_VALID_JOB_DAG_KEYS = {
    "dir",
    "noop",
    "done",
    "vars",
    "pre",
    "post",
    "retry",
    "retry_unless_exit",
    "abort_dag_on",
    "abort_exit",
    "priority",
}
# Version of the installed HTCondor Python bindings, parsed once at import
# time so per-call version checks are avoided.
HTC_VERSION = version.parse(htcondor.__version__)
class RestrictedDict(MutableMapping):
    """A dictionary that only allows certain keys.

    Parameters
    ----------
    valid_keys : `~collections.abc.Container`
        Strings that are valid keys.
    init_data : `dict` or `RestrictedDict`, optional
        Initial data.

    Raises
    ------
    KeyError
        If invalid key(s) in init_data.
    """

    def __init__(self, valid_keys, init_data=()):
        self.valid_keys = valid_keys
        self.data = {}
        # MutableMapping.update() funnels through __setitem__, so the
        # initial data is validated as well.
        self.update(init_data)

    def __setitem__(self, key, value):
        """Store ``value`` under ``key`` if the key is allowed.

        Parameters
        ----------
        key : `str`
            Identifier to associate with given value.
        value : `~typing.Any`
            Value to store.

        Raises
        ------
        KeyError
            If key is invalid.
        """
        if key not in self.valid_keys:
            raise KeyError(f"Invalid key {key}")
        self.data[key] = value

    def __getitem__(self, key):
        """Return the value stored under ``key``.

        Parameters
        ----------
        key : `str`
            Identifier for value to return.

        Returns
        -------
        value : `~typing.Any`
            Value associated with given key.

        Raises
        ------
        KeyError
            If key doesn't exist.
        """
        return self.data[key]

    def __delitem__(self, key):
        """Remove ``key`` and its value.

        Parameters
        ----------
        key : `str`
            Identifier for value to delete.

        Raises
        ------
        KeyError
            If key doesn't exist.
        """
        del self.data[key]

    def __iter__(self):
        return iter(self.data)

    def __len__(self):
        return len(self.data)

    def __str__(self):
        return str(self.data)
def htc_backup_files(
    wms_path: str | os.PathLike, subdir: str | os.PathLike | None = None, limit: int = 100
) -> Path | None:
    """Backup select HTCondor files in the submit directory.

    Files will be saved in separate subdirectories which will be created in
    the submit directory where the files are located. These subdirectories
    will be consecutive, zero-padded integers. Their values will correspond to
    the number of HTCondor rescue DAGs in the submit directory.

    Hence, with the default settings, copies after the initial failed run will
    be placed in '001' subdirectory, '002' after the first restart, and so on
    until the limit of backups is reached. If there's no rescue DAG yet, files
    will be copied to '000' subdirectory.

    Parameters
    ----------
    wms_path : `str` or `os.PathLike`
        Path to the submit directory either absolute or relative.
    subdir : `str` or `os.PathLike`, optional
        A path, relative to the submit directory, where all subdirectories with
        backup files will be kept. Defaults to None which means that the backup
        subdirectories will be placed directly in the submit directory.
    limit : `int`, optional
        Maximal number of backups. If the number of backups reaches the limit,
        the last backup files will be overwritten. The default value is 100
        to match the default value of HTCondor's DAGMAN_MAX_RESCUE_NUM in
        version 8.8+.

    Returns
    -------
    last_rescue_file : `pathlib.Path` or None
        Path to the latest rescue file or None if doesn't exist.

    Raises
    ------
    FileNotFoundError
        If the submit directory or the file that needs to be backed up does not
        exist.
    OSError
        If the submit directory cannot be accessed or backing up a file failed
        either due to permission or filesystem related issues.

    Notes
    -----
    This is not a generic function for making backups. It is intended to be
    used once, just before a restart, to make snapshots of files which will be
    overwritten by HTCondor after during the next run.
    """
    # Width of the zero-padded backup directory names (3 for the default
    # limit of 100).
    width = len(str(limit))

    path = Path(wms_path).resolve()
    if not path.is_dir():
        raise FileNotFoundError(f"Directory {path} not found")

    # Initialize the backup counter.
    rescue_dags = list(path.glob("*.rescue[0-9][0-9][0-9]"))
    counter = min(len(rescue_dags), limit)

    # Create the backup directory and move select files there.
    dest = path
    if subdir:
        # PurePath.is_relative_to() is not available before Python 3.9. Hence
        # we need to check is 'subdir' is in the submit directory in some other
        # way if it is an absolute path.
        subdir = Path(subdir)
        if subdir.is_absolute():
            subdir = subdir.resolve()  # Since resolve was run on path, must run it here
            if dest not in subdir.parents:
                # Fall back to the submit directory itself rather than
                # writing outside of it.
                _LOG.warning(
                    "Invalid backup location: '%s' not in the submit directory, will use '%s' instead.",
                    subdir,
                    wms_path,
                )
            else:
                dest /= subdir
        else:
            dest /= subdir
    dest /= f"{counter:0{width}}"
    _LOG.debug("dest = %s", dest)
    try:
        # Once the limit is reached, the final backup directory is reused
        # (exist_ok=True), so its previous contents get overwritten.
        dest.mkdir(parents=True, exist_ok=False if counter < limit else True)
    except FileExistsError:
        _LOG.warning("Refusing to do backups: target directory '%s' already exists", dest)
    else:
        htc_backup_files_single_path(path, dest)

        # also back up any subdag info
        for subdag_dir in path.glob("subdags/*"):
            subdag_dest = dest / subdag_dir.relative_to(path)
            subdag_dest.mkdir(parents=True, exist_ok=False)
            htc_backup_files_single_path(subdag_dir, subdag_dest)

    # NOTE(review): Path.glob() does not guarantee ordering; taking the last
    # element assumes sorted rescue-file names — confirm this is intended.
    last_rescue_file = rescue_dags[-1] if rescue_dags else None
    _LOG.debug("last_rescue_file = %s", last_rescue_file)
    return last_rescue_file
425def htc_backup_files_single_path(src: str | os.PathLike, dest: str | os.PathLike) -> None:
426 """Move particular htc files to a different directory for later debugging.
428 Parameters
429 ----------
430 src : `str` or `os.PathLike`
431 Directory from which to backup particular files.
432 dest : `str` or `os.PathLike`
433 Directory to which particular files are moved.
435 Raises
436 ------
437 RuntimeError
438 If given dest directory matches given src directory.
439 OSError
440 If problems moving file.
441 FileNotFoundError
442 Item matching pattern in src directory isn't a file.
443 """
444 src = Path(src)
445 dest = Path(dest)
446 if dest.samefile(src):
447 raise RuntimeError(f"Destination directory is same as the source directory ({src})")
449 for patt in [
450 "*.info.*",
451 "*.dag.metrics",
452 "*.dag.nodes.log",
453 "*.node_status",
454 "wms_*.dag.post.out",
455 "wms_*.status.txt",
456 ]:
457 for source in src.glob(patt):
458 if source.is_file():
459 target = dest / source.relative_to(src)
460 try:
461 source.rename(target)
462 except OSError as exc:
463 raise type(exc)(f"Backing up '{source}' failed: {exc.strerror}") from None
464 else:
465 raise FileNotFoundError(f"Backing up '{source}' failed: not a file")
def htc_escape(value):
    """Escape characters in given value based upon HTCondor syntax.

    Parameters
    ----------
    value : `~typing.Any`
        Value that needs to have characters escaped if string.

    Returns
    -------
    new_value : `~typing.Any`
        Given value with characters escaped appropriate for HTCondor if string.
    """
    if isinstance(value, str):
        # Double internal quote characters per HTCondor quoting rules, then
        # turn any literal HTML entity '&quot;' back into a plain double
        # quote (done last so it is not itself doubled).
        newval = value.replace('"', '""').replace("'", "''").replace("&quot;", '"')
    else:
        newval = value

    return newval
def htc_write_attribs(stream, attrs):
    """Write job attributes in HTCondor format to writeable stream.

    Parameters
    ----------
    stream : `~typing.TextIO`
        Output text stream (typically an open file).
    attrs : `dict`
        HTCondor job attributes (dictionary of attribute key, value).
    """
    for key, value in attrs.items():
        # String values must be quoted (with internal quotes escaped) to be
        # syntactically correct for HTCondor; other types pass through as-is.
        pval = f'"{htc_escape(value)}"' if isinstance(value, str) else value
        print(f"+{key} = {pval}", file=stream)
def htc_write_condor_file(
    filename: str | os.PathLike, job_name: str, job: RestrictedDict, job_attrs: dict[str, Any]
) -> None:
    """Write an HTCondor submit file.

    Parameters
    ----------
    filename : `str` or `os.PathLike`
        Filename for the HTCondor submit file.
    job_name : `str`
        Job name to use in submit file.
    job : `RestrictedDict`
        Submit script information.
    job_attrs : `dict`
        Job attributes.
    """
    os.makedirs(os.path.dirname(filename), exist_ok=True)
    with open(filename, "w") as fh:
        for key, value in job.items():
            if value is not None:
                if key in HTC_QUOTE_KEYS:  # Assumes internal quotes are already escaped correctly
                    print(f'{key}="{value}"', file=fh)
                else:
                    print(f"{key}={value}", file=fh)
        # Provide per-cluster defaults for any stream/log file not given
        # explicitly. (Bug fix: the computed default name was being dropped;
        # also use a new local instead of clobbering the 'filename' param.)
        for key in ["output", "error", "log"]:
            if key not in job:
                default_name = f"{job_name}.$(Cluster).{'out' if key != 'log' else key}"
                print(f"{key}={default_name}", file=fh)

        if job_attrs is not None:
            htc_write_attribs(fh, job_attrs)
        print("queue", file=fh)
# To avoid doing the version check during every function call select
# appropriate conversion function at the import time.
#
# Make sure that *each* version specific variant of the conversion function(s)
# has the same signature after applying any changes!
if HTC_VERSION < version.parse("8.9.8"):

    def htc_tune_schedd_args(**kwargs):
        """Ensure that arguments for Schedd are version appropriate.

        The old arguments: 'requirements' and 'attr_list' of
        'Schedd.history()', 'Schedd.query()', and 'Schedd.xquery()' were
        deprecated in favor of 'constraint' and 'projection', respectively,
        starting from version 8.9.8. The function will convert "new" keyword
        arguments to "old" ones.

        Parameters
        ----------
        **kwargs
            Any keyword arguments that Schedd.history(), Schedd.query(), and
            Schedd.xquery() accepts.

        Returns
        -------
        kwargs : `dict` [`str`, `~typing.Any`]
            Keywords arguments that are guaranteed to work with the Python
            HTCondor API.

        Notes
        -----
        Function doesn't validate provided keyword arguments beyond converting
        selected arguments to their version specific form. For example,
        it won't remove keywords that are not supported by the methods
        mentioned earlier.
        """
        translation_table = {
            "constraint": "requirements",
            "projection": "attr_list",
        }
        for new, old in translation_table.items():
            try:
                kwargs[old] = kwargs.pop(new)
            except KeyError:
                pass
        return kwargs

else:

    def htc_tune_schedd_args(**kwargs):
        """Ensure that arguments for Schedd are version appropriate.

        This is the fallback function if no version specific alteration are
        necessary. Effectively, a no-op.

        Parameters
        ----------
        **kwargs
            Any keyword arguments that Schedd.history(), Schedd.query(), and
            Schedd.xquery() accepts.

        Returns
        -------
        kwargs : `dict` [`str`, `~typing.Any`]
            Keywords arguments that were passed to the function.
        """
        return kwargs
def htc_query_history(schedds, **kwargs):
    """Fetch history records from the condor_schedd daemon.

    Parameters
    ----------
    schedds : `dict` [`str`, `htcondor.Schedd`]
        HTCondor schedulers which to query for job information, keyed by
        scheduler name.
    **kwargs
        Any keyword arguments that Schedd.history() accepts.

    Yields
    ------
    schedd_name : `str`
        Name of the HTCondor scheduler managing the job queue.
    job_ad : `dict` [`str`, `~typing.Any`]
        A dictionary representing HTCondor ClassAd describing a job. It maps
        job attributes names to values of the ClassAd expressions they
        represent.
    """
    # Supply defaults for arguments that Schedd.history() treats as
    # positional before translating names for the installed version.
    kwargs.setdefault("constraint", None)
    kwargs.setdefault("projection", [])
    tuned = htc_tune_schedd_args(**kwargs)
    for schedd_name, schedd in schedds.items():
        yield from ((schedd_name, dict(job_ad)) for job_ad in schedd.history(**tuned))
def htc_query_present(schedds, **kwargs):
    """Query the condor_schedd daemon for job ads.

    Parameters
    ----------
    schedds : `dict` [`str`, `htcondor.Schedd`]
        HTCondor schedulers which to query for job information, keyed by
        scheduler name.
    **kwargs
        Any keyword arguments that Schedd.xquery() accepts.

    Yields
    ------
    schedd_name : `str`
        Name of the HTCondor scheduler managing the job queue.
    job_ad : `dict` [`str`, `~typing.Any`]
        A dictionary representing HTCondor ClassAd describing a job. It maps
        job attributes names to values of the ClassAd expressions they
        represent.
    """
    tuned = htc_tune_schedd_args(**kwargs)
    for schedd_name, schedd in schedds.items():
        yield from ((schedd_name, dict(job_ad)) for job_ad in schedd.query(**tuned))
def htc_version():
    """Return the version given by the HTCondor API.

    Returns
    -------
    version : `str`
        HTCondor version as easily comparable string.
    """
    # HTC_VERSION is a parsed version object; render it back to its
    # canonical string form.
    return f"{HTC_VERSION}"
def htc_submit_dag(sub):
    """Submit job for execution.

    Parameters
    ----------
    sub : `htcondor.Submit`
        An object representing a job submit description.

    Returns
    -------
    schedd_job_info : `dict` [`str`, `dict` [`str`, \
                      `dict` [`str`, `~typing.Any`]]]
        Information about jobs satisfying the search criteria where for each
        Scheduler, local HTCondor job ids are mapped to their respective
        classads.
    """
    collector = htcondor.Collector()
    schedd_ad = collector.locate(htcondor.DaemonTypes.Schedd)
    schedd = htcondor.Schedd(schedd_ad)

    # Schedd.submit() raises on failure. That usually indicates problems with
    # the HTCondor pool that BPS can't address, so the exception is simply
    # allowed to propagate.
    submit_result = schedd.submit(sub)

    # The ClassAd returned by Schedd.submit() lacks 'GlobalJobId', so a
    # regular queue query is needed to obtain it anyway.
    schedd_name = schedd_ad["Name"]
    return condor_q(
        constraint=f"ClusterId == {submit_result.cluster()}", schedds={schedd_name: schedd}
    )
def htc_create_submit_from_dag(dag_filename: str, submit_options: dict[str, Any]) -> htcondor.Submit:
    """Create a DAGMan job submit description.

    Parameters
    ----------
    dag_filename : `str`
        Name of file containing HTCondor DAG commands.
    submit_options : `dict` [`str`, `~typing.Any`], optional
        Contains extra options for command line (Value of None means flag).

    Returns
    -------
    sub : `htcondor.Submit`
        An object representing a job submit description.

    Notes
    -----
    Use with HTCondor versions which support htcondor.Submit.from_dag(),
    i.e., 8.9.3 or newer.
    """
    # Config and environment variables do not seem to override -MaxIdle
    # on the .dag.condor.sub's command line (broken in some 24.0.x versions).
    # Explicitly forward them as a submit_option if either exists.
    # Note: auto generated subdag submit files are still the -MaxIdle=1000
    # in the broken versions.
    if "MaxIdle" not in submit_options:
        config_var_name = "DAGMAN_MAX_JOBS_IDLE"
        env_var_name = f"_CONDOR_{config_var_name}"
        max_jobs_idle: int | None = None
        if env_var_name in os.environ:
            max_jobs_idle = int(os.environ[env_var_name])
        elif config_var_name in htcondor.param:
            max_jobs_idle = htcondor.param[config_var_name]
        if max_jobs_idle:
            submit_options["MaxIdle"] = max_jobs_idle

    return htcondor.Submit.from_dag(dag_filename, submit_options)
def htc_create_submit_from_cmd(dag_filename, submit_options=None):
    """Create a DAGMan job submit description.

    Create a DAGMan job submit description by calling ``condor_submit_dag``
    on given DAG description file.

    Parameters
    ----------
    dag_filename : `str`
        Name of file containing HTCondor DAG commands.
    submit_options : `dict` [`str`, `~typing.Any`], optional
        Contains extra options for command line (Value of None means flag).

    Returns
    -------
    sub : `htcondor.Submit`
        An object representing a job submit description.

    Notes
    -----
    Use with HTCondor versions which do not support htcondor.Submit.from_dag(),
    i.e., older than 8.9.3.
    """
    # Build the command as an argv list (instead of concatenating a string
    # and splitting it) so the DAG filename cannot fuse with the preceding
    # option value and option values containing spaces survive intact.
    cmd = [
        "condor_submit_dag",
        "-f",
        "-no_submit",
        "-notification",
        "never",
        "-autorescue",
        "1",
        "-UseDagDir",
        "-no_recurse",
    ]
    if submit_options is not None:
        for opt, val in submit_options.items():
            cmd.append(f"-{opt}")
            if val:  # Falsy value means the option is a bare flag.
                cmd.append(str(val))
    cmd.append(str(dag_filename))

    process = subprocess.Popen(
        cmd, shell=False, stdout=subprocess.PIPE, stderr=subprocess.STDOUT, encoding="utf-8"
    )
    process.wait()

    if process.returncode != 0:
        print(f"Exit code: {process.returncode}")
        print(process.communicate()[0])
        raise RuntimeError("Problems running condor_submit_dag")

    return htc_create_submit_from_file(f"{dag_filename}.condor.sub")
def htc_create_submit_from_file(submit_file):
    """Parse a submission file.

    Parameters
    ----------
    submit_file : `str`
        Name of the HTCondor submit file.

    Returns
    -------
    sub : `htcondor.Submit`
        An object representing a job submit description.
    """
    descriptors = {}
    with open(submit_file) as fh:
        for raw_line in fh:
            stripped = raw_line.strip()
            # Skip comment lines and the final 'queue' command.
            if stripped.startswith("#") or stripped == "queue":
                continue
            key, val = re.split(r"\s*=\s*", stripped, maxsplit=1)
            descriptors[key] = val

    # Avoid UserWarning: the line 'copy_to_spool = False' was
    # unused by Submit object. Is it a typo?
    descriptors.pop("copy_to_spool", None)

    return htcondor.Submit(descriptors)
def _htc_write_script_command(stream, name, when, script):
    """Output a single DAGMan SCRIPT line (PRE or POST) for a job.

    Parameters
    ----------
    stream : `~typing.TextIO`
        Writeable text stream (typically an opened file).
    name : `str`
        Job name.
    when : `str`
        When the script runs: "PRE" or "POST".
    script : `dict`
        Script settings (executable, arguments, defer, debug).
    """
    # Note: optional pieces of commands include a space at the beginning.
    defer = ""
    if script.get("defer"):
        defer = f" DEFER {script['defer']['status']} {script['defer']['time']}"

    debug = ""
    if script.get("debug"):
        debug = f" DEBUG {script['debug']['filename']} {script['debug']['type']}"

    arguments = ""
    if script.get("arguments"):
        arguments = f" {script['arguments']}"

    executable = script["executable"]
    print(f"SCRIPT{defer}{debug} {when} {name} {executable}{arguments}", file=stream)


def _htc_write_job_commands(stream, name, commands, node_type="JOB"):
    """Output the DAGMan job lines for single job in DAG.

    Parameters
    ----------
    stream : `~typing.TextIO`
        Writeable text stream (typically an opened file).
    name : `str`
        Job name.
    commands : `RestrictedDict`
        DAG commands for a job.
    node_type : `str`, optional
        Type of DAGMan node (JOB, FINAL, SERVICE). Defaults to "JOB".
    """
    # Making sure values aren't empty strings used as placeholders
    # (get() returns None/falsy for missing or empty entries).
    if commands.get("pre"):
        _htc_write_script_command(stream, name, "PRE", commands["pre"])

    if commands.get("post"):
        _htc_write_script_command(stream, name, "POST", commands["post"])

    if commands.get("vars"):
        for key, value in commands["vars"].items():
            print(f'VARS {name} {key}="{htc_escape(value)}"', file=stream)

    if commands.get("pre_skip"):
        print(f"PRE_SKIP {name} {commands['pre_skip']}", file=stream)

    # FINAL node cannot have a DAGMan retry, abort-dag-on, priority, category
    if node_type != "FINAL":
        if commands.get("retry"):
            print(f"RETRY {name} {commands['retry']}", end="", file=stream)
            if commands.get("retry_unless_exit"):
                print(f" UNLESS-EXIT {commands['retry_unless_exit']}", end="", file=stream)
            print("", file=stream)  # Since previous prints don't include new line

        if commands.get("abort_dag_on"):
            print(
                f"ABORT-DAG-ON {name} {commands['abort_dag_on']['node_exit']}"
                f" RETURN {commands['abort_dag_on']['abort_exit']}",
                file=stream,
            )

        if commands.get("priority"):
            print(
                f"PRIORITY {name} {commands['priority']}",
                file=stream,
            )
class HTCJob:
    """HTCondor job for use in building DAG.

    Parameters
    ----------
    name : `str`
        Name of the job.
    label : `str`, optional
        Label that can be used for grouping or lookup.
    initcmds : `RestrictedDict`, optional
        Initial job commands for submit file.
    initdagcmds : `RestrictedDict`, optional
        Initial commands for job inside DAG.
    initattrs : `dict`, optional
        Initial dictionary of job attributes.
    """

    def __init__(self, name, label=None, initcmds=(), initdagcmds=(), initattrs=None):
        self.name = name
        self.label = label
        # Both command sets are key-restricted so typos are caught early.
        self.cmds = RestrictedDict(HTC_VALID_JOB_KEYS, initcmds)
        self.dagcmds = RestrictedDict(HTC_VALID_JOB_DAG_KEYS, initdagcmds)
        self.attrs = initattrs
        self.subfile = None
        self.subdir = None
        self.subdag = None

    def __str__(self):
        return self.name

    def add_job_cmds(self, new_commands):
        """Add commands to Job (overwrite existing).

        Parameters
        ----------
        new_commands : `dict`
            Submit file commands to be added to Job.
        """
        self.cmds.update(new_commands)

    def add_dag_cmds(self, new_commands):
        """Add DAG commands to Job (overwrite existing).

        Parameters
        ----------
        new_commands : `dict`
            DAG file commands to be added to Job.
        """
        self.dagcmds.update(new_commands)

    def add_job_attrs(self, new_attrs):
        """Add attributes to Job (overwrite existing).

        Parameters
        ----------
        new_attrs : `dict`
            Attributes to be added to Job.
        """
        if self.attrs is None:
            self.attrs = {}
        if new_attrs:
            self.attrs.update(new_attrs)

    def write_submit_file(self, submit_path: str | os.PathLike) -> None:
        """Write job description to submit file.

        Parameters
        ----------
        submit_path : `str` or `os.PathLike`
            Prefix path for the submit file.
        """
        if not self.subfile:
            self.subfile = f"{self.name}.sub"

        location = Path(self.subdir) / self.subfile if self.subdir else self.subfile
        location = Path(os.path.expandvars(location))
        if not location.is_absolute():
            location = Path(submit_path) / location

        # Leave an existing submit file untouched.
        if not location.exists():
            htc_write_condor_file(location, self.name, self.cmds, self.attrs)

    def write_dag_commands(self, stream, dag_rel_path, command_name="JOB"):
        """Write DAG commands for single job to output stream.

        Parameters
        ----------
        stream : `~typing.TextIO`
            Output Stream.
        dag_rel_path : `str`
            Relative path of dag to submit directory.
        command_name : `str`, optional
            Name of the DAG command (e.g., JOB, FINAL).
        """
        # JOB NodeName SubmitDescription [DIR directory] [NOOP] [DONE]
        pieces = [f'{command_name} {self.name} "{os.path.expandvars(self.subfile)}"']
        if "dir" in self.dagcmds:
            dir_val = self.dagcmds["dir"]
            if dag_rel_path:
                dir_val = os.path.join(dag_rel_path, dir_val)
            pieces.append(f'DIR "{dir_val}"')
        if self.dagcmds.get("noop", False):
            pieces.append("NOOP")
        print(" ".join(pieces), file=stream)

        if self.dagcmds:
            _htc_write_job_commands(stream, self.name, self.dagcmds, command_name)

    def dump(self, fh):
        """Dump job information to output stream.

        Parameters
        ----------
        fh : `~typing.TextIO`
            Output stream.
        """
        printer = pprint.PrettyPrinter(indent=4, stream=fh)
        for item in (self.name, self.cmds, self.attrs):
            printer.pprint(item)
1025class HTCDag(networkx.DiGraph):
1026 """HTCondor DAG.
1028 Parameters
1029 ----------
1030 data : `~typing.Any`
1031 Initial graph data of any format that is supported
1032 by the to_network_graph() function.
1033 name : `str`
1034 Name for DAG.
1035 """
    def __init__(self, data=None, name=""):
        super().__init__(data=data, name=name)

        # Graph-level bookkeeping; filled in while the DAG is built,
        # written, and submitted.
        self.graph["attr"] = {}
        self.graph["run_id"] = None
        self.graph["submit_path"] = None
        self.graph["final_job"] = None
        self.graph["service_job"] = None
        self.graph["submit_options"] = {}
    def __str__(self):
        """Represent basic DAG info as string.

        Returns
        -------
        info : `str`
            String containing basic DAG info (DAG name and node count).
        """
        return f"{self.graph['name']} {len(self)}"
1057 def add_attribs(self, attribs=None):
1058 """Add attributes to the DAG.
1060 Parameters
1061 ----------
1062 attribs : `dict`
1063 DAG attributes.
1064 """
1065 if attribs is not None:
1066 self.graph["attr"].update(attribs)
1068 def add_job(self, job, parent_names=None, child_names=None):
1069 """Add an HTCJob to the HTCDag.
1071 Parameters
1072 ----------
1073 job : `HTCJob`
1074 HTCJob to add to the HTCDag.
1075 parent_names : `~collections.abc.Iterable` [`str`], optional
1076 Names of parent jobs.
1077 child_names : `~collections.abc.Iterable` [`str`], optional
1078 Names of child jobs.
1079 """
1080 assert isinstance(job, HTCJob)
1081 _LOG.debug("Adding job %s to dag", job.name)
1083 # Add dag level attributes to each job
1084 job.add_job_attrs(self.graph["attr"])
1086 self.add_node(job.name, data=job)
1088 if parent_names is not None:
1089 self.add_job_relationships(parent_names, [job.name])
1091 if child_names is not None:
1092 self.add_job_relationships(child_names, [job.name])
1094 def add_job_relationships(self, parents, children):
1095 """Add DAG edge between parents and children jobs.
1097 Parameters
1098 ----------
1099 parents : `list` [`str`]
1100 Contains parent job name(s).
1101 children : `list` [`str`]
1102 Contains children job name(s).
1103 """
1104 self.add_edges_from(itertools.product(parents, children))
1106 def add_final_job(self, job):
1107 """Add an HTCJob for the FINAL job in HTCDag.
1109 Parameters
1110 ----------
1111 job : `HTCJob`
1112 HTCJob to add to the HTCDag as a FINAL job.
1113 """
1114 # Add dag level attributes to each job
1115 job.add_job_attrs(self.graph["attr"])
1117 self.graph["final_job"] = job
1119 def add_service_job(self, job):
1120 """Add an HTCJob for the SERVICE job in HTCDag.
1122 Parameters
1123 ----------
1124 job : `HTCJob`
1125 HTCJob to add to the HTCDag as a FINAL job.
1126 """
1127 # Add dag level attributes to each job
1128 job.add_job_attrs(self.graph["attr"])
1130 self.graph["service_job"] = job
1132 def del_job(self, job_name):
1133 """Delete the job from the DAG.
1135 Parameters
1136 ----------
1137 job_name : `str`
1138 Name of job in DAG to delete.
1139 """
1140 # Reconnect edges around node to delete
1141 parents = self.predecessors(job_name)
1142 children = self.successors(job_name)
1143 self.add_edges_from(itertools.product(parents, children))
1145 # Delete job node (which deletes its edges).
1146 self.remove_node(job_name)
1148 def write(self, submit_path, job_subdir="", dag_subdir="", dag_rel_path=""):
1149 """Write DAG to a file.
1151 Parameters
1152 ----------
1153 submit_path : `str`
1154 Prefix path for all outputs.
1155 job_subdir : `str`, optional
1156 Template for job subdir (submit_path + job_subdir).
1157 dag_subdir : `str`, optional
1158 DAG subdir (submit_path + dag_subdir).
1159 dag_rel_path : `str`, optional
1160 Prefix to job_subdir for jobs inside subdag.
1161 """
1162 self.graph["submit_path"] = submit_path
1163 self.graph["dag_filename"] = os.path.join(dag_subdir, f"{self.graph['name']}.dag")
1164 full_filename = os.path.join(submit_path, self.graph["dag_filename"])
1165 os.makedirs(os.path.dirname(full_filename), exist_ok=True)
1167 try:
1168 dagman_config_path = Path(self.graph["attr"]["bps_wms_config_path"])
1169 except KeyError:
1170 dagman_config_path = None
1171 with open(full_filename, "w") as fh:
1172 if dagman_config_path is not None:
1173 fh.write(f"CONFIG {dag_rel_path / dagman_config_path}\n")
1175 for name, nodeval in self.nodes().items():
1176 try:
1177 job = nodeval["data"]
1178 except KeyError:
1179 _LOG.error("Job %s doesn't have data (keys: %s).", name, nodeval.keys())
1180 raise
1181 if job.subdag:
1182 dag_subdir = f"subdags/{job.name}"
1183 if "dir" in job.dagcmds:
1184 subdir = job.dagcmds["dir"]
1185 else:
1186 subdir = job_subdir
1187 if dagman_config_path is not None:
1188 job.subdag.add_attribs({"bps_wms_config_path": str(dagman_config_path)})
1189 job.subdag.write(submit_path, subdir, dag_subdir, "../..")
1190 fh.write(
1191 f"SUBDAG EXTERNAL {job.name} {Path(job.subdag.graph['dag_filename']).name} "
1192 f"DIR {dag_subdir}\n"
1193 )
1194 if job.dagcmds:
1195 _htc_write_job_commands(fh, job.name, job.dagcmds)
1196 else:
1197 job.write_submit_file(submit_path)
1198 job.write_dag_commands(fh, dag_rel_path)
1200 for edge in self.edges():
1201 print(f"PARENT {edge[0]} CHILD {edge[1]}", file=fh)
1202 print(f"DOT {self.name}.dot", file=fh)
1203 print(f"NODE_STATUS_FILE {self.name}.node_status", file=fh)
1205 # Add bps attributes to dag submission
1206 for key, value in self.graph["attr"].items():
1207 print(f'SET_JOB_ATTR {key}= "{htc_escape(value)}"', file=fh)
1209 # Add special nodes if any.
1210 special_jobs = {
1211 "FINAL": self.graph["final_job"],
1212 "SERVICE": self.graph["service_job"],
1213 }
1214 for dagcmd, job in special_jobs.items():
1215 if job is not None:
1216 job.write_submit_file(submit_path)
1217 job.write_dag_commands(fh, dag_rel_path, dagcmd)
1219 def dump(self, fh):
1220 """Dump DAG info to output stream.
1222 Parameters
1223 ----------
1224 fh : `typing.IO`
1225 Where to dump DAG info as text.
1226 """
1227 for key, value in self.graph:
1228 print(f"{key}={value}", file=fh)
1229 for name, data in self.nodes().items():
1230 print(f"{name}:", file=fh)
1231 data.dump(fh)
1232 for edge in self.edges():
1233 print(f"PARENT {edge[0]} CHILD {edge[1]}", file=fh)
1234 if self.graph["final_job"]:
1235 print(f"FINAL {self.graph['final_job'].name}:", file=fh)
1236 self.graph["final_job"].dump(fh)
1238 def write_dot(self, filename):
1239 """Write a dot version of the DAG.
1241 Parameters
1242 ----------
1243 filename : `str`
1244 Name of the dot file.
1245 """
1246 pos = networkx.nx_agraph.graphviz_layout(self)
1247 networkx.draw(self, pos=pos)
1248 networkx.drawing.nx_pydot.write_dot(self, filename)
def condor_q(constraint=None, schedds=None, **kwargs):
    """Query the HTCondor job queue(s) for jobs matching given criteria.

    Parameters
    ----------
    constraint : `str`, optional
        Constraints to be passed to job query.
    schedds : `dict` [`str`, `htcondor.Schedd`], optional
        HTCondor schedulers which to query for job information. If None
        (default), the query will be run against local scheduler only.
    **kwargs : `~typing.Any`
        Additional keyword arguments that need to be passed to the internal
        query method.

    Returns
    -------
    job_info : `dict` [`str`, `dict` [`str`, `dict` [`str`, `~typing.Any`]]]
        Information about jobs satisfying the search criteria where for each
        Scheduler, local HTCondor job ids are mapped to their respective
        classads.
    """
    # Thin wrapper around condor_query() pinned to the "present queue"
    # query backend.
    return condor_query(constraint=constraint, schedds=schedds, query_func=htc_query_present, **kwargs)
def condor_history(constraint=None, schedds=None, **kwargs):
    """Query HTCondor history records for jobs matching given criteria.

    Parameters
    ----------
    constraint : `str`, optional
        Constraints to be passed to job query.
    schedds : `dict` [`str`, `htcondor.Schedd`], optional
        HTCondor schedulers which to query for job information. If None
        (default), the query will be run against the history file of
        the local scheduler only.
    **kwargs : `~typing.Any`
        Additional keyword arguments that need to be passed to the internal
        query method.

    Returns
    -------
    job_info : `dict` [`str`, `dict` [`str`, `dict` [`str`, `~typing.Any`]]]
        Information about jobs satisfying the search criteria where for each
        Scheduler, local HTCondor job ids are mapped to their respective
        classads.
    """
    # Thin wrapper around condor_query() pinned to the history query backend.
    return condor_query(constraint=constraint, schedds=schedds, query_func=htc_query_history, **kwargs)
def condor_query(constraint=None, schedds=None, query_func=htc_query_present, **kwargs):
    """Get information about HTCondor jobs.

    Parameters
    ----------
    constraint : `str`, optional
        Constraints to be passed to job query.
    schedds : `dict` [`str`, `htcondor.Schedd`], optional
        HTCondor schedulers which to query for job information. If None
        (default), the query will be run against the history file of
        the local scheduler only.
    query_func : `~collections.abc.Callable`
        An query function which takes following arguments:

        - ``schedds``: Schedulers to query (`list` [`htcondor.Schedd`]).
        - ``**kwargs``: Keyword arguments that will be passed to the query
          function.
    **kwargs : `~typing.Any`
        Additional keyword arguments that need to be passed to the query
        method.

    Returns
    -------
    job_info : `dict` [`str`, `dict` [`str`, `dict` [`str`, `~typing.Any`]]]
        Information about jobs satisfying the search criteria where for each
        Scheduler, local HTCondor job ids are mapped to their respective
        classads.
    """
    if not schedds:
        coll = htcondor.Collector()
        schedd_ad = coll.locate(htcondor.DaemonTypes.Schedd)
        schedds = {schedd_ad["Name"]: htcondor.Schedd(schedd_ad)}

    # Make sure that 'ClusterId' and 'ProcId' attributes are always included
    # in the job classad. They are needed to construct the job id.
    # Work on a copy of the caller's projection list: the previous in-place
    # append/remove approach left the caller's list polluted if the query
    # raised before the removal step.
    added_attrs = set()
    if kwargs.get("projection"):
        requested_attrs = set(kwargs["projection"])
        added_attrs = {"ClusterId", "ProcId"} - requested_attrs
        if added_attrs:
            kwargs["projection"] = list(kwargs["projection"]) + sorted(added_attrs)

    # Strip attributes the caller did not ask for plus the environment ones
    # (they are large and never needed downstream).
    unwanted_attrs = {"Env", "Environment"} | added_attrs
    job_info = defaultdict(dict)
    for schedd_name, job_ad in query_func(schedds, constraint=constraint, **kwargs):
        id_ = f"{job_ad['ClusterId']}.{job_ad['ProcId']}"
        for attr in set(job_ad) & unwanted_attrs:
            del job_ad[attr]
        job_info[schedd_name][id_] = job_ad
    _LOG.debug("query returned %d jobs", sum(len(val) for val in job_info.values()))

    # When returning the results filter out entries for schedulers with no jobs
    # matching the search criteria.
    return {key: val for key, val in job_info.items() if val}
def condor_search(constraint=None, hist=None, schedds=None):
    """Search for running and finished jobs satisfying given criteria.

    Parameters
    ----------
    constraint : `str`, optional
        Constraints to be passed to job query.
    hist : `float`
        Limit history search to this many days.
    schedds : `dict` [`str`, `htcondor.Schedd`], optional
        The list of the HTCondor schedulers which to query for job information.
        If None (default), only the local scheduler will be queried.

    Returns
    -------
    job_info : `dict` [`str`, `dict` [`str`, `dict` [`str` `~typing.Any`]]]
        Information about jobs satisfying the search criteria where for each
        Scheduler, local HTCondor job ids are mapped to their respective
        classads.
    """
    if not schedds:
        coll = htcondor.Collector()
        schedd_ad = coll.locate(htcondor.DaemonTypes.Schedd)
        schedds = {schedd_ad["Name"]: htcondor.Schedd(locate_ad=schedd_ad)}

    job_info = condor_q(constraint=constraint, schedds=schedds)
    if hist is not None:
        _LOG.debug("Searching history going back %s days", hist)
        epoch = (datetime.now() - timedelta(days=hist)).timestamp()
        # Guard against constraint being None (its default), which would make
        # string concatenation fail with a TypeError.
        hist_clause = f"(CompletionDate >= {epoch} || JobFinishedHookDone >= {epoch})"
        constraint = hist_clause if constraint is None else f"{constraint} && {hist_clause}"
        hist_info = condor_history(constraint, schedds=schedds)
        update_job_info(job_info, hist_info)
    return job_info
def condor_status(constraint=None, coll=None):
    """Get information about HTCondor pool.

    Parameters
    ----------
    constraint : `str`, optional
        Constraints to be passed to the query.
    coll : `htcondor.Collector`, optional
        Object representing HTCondor collector daemon.

    Returns
    -------
    pool_info : `dict` [`str`, `dict` [`str`, `~typing.Any`]]
        Mapping between HTCondor slot names and slot information (classAds).
    """
    if coll is None:
        coll = htcondor.Collector()
    try:
        pool_ads = coll.query(constraint=constraint)
    except OSError as ex:
        raise RuntimeError(f"Problem querying the Collector. (Constraint='{constraint}')") from ex

    # Index the slot ads by slot name.
    pool_info = {slot["name"]: dict(slot) for slot in pool_ads}
    _LOG.debug("condor_status returned %d ads", len(pool_info))
    return pool_info
def update_job_info(job_info, other_info):
    """Update results of a job query with results from another query.

    Parameters
    ----------
    job_info : `dict` [`str`, `dict` [`str`, `~typing.Any`]]
        Results of the job query that needs to be updated.
    other_info : `dict` [`str`, `dict` [`str`, `~typing.Any`]]
        Results of the other job query.

    Returns
    -------
    job_info : `dict` [`str`, `dict` [`str`, `~typing.Any`]]
        The updated results.
    """
    for schedd_name, other_jobs in other_info.items():
        try:
            known_jobs = job_info[schedd_name]
        except KeyError:
            # Scheduler not seen before: adopt its results wholesale.
            job_info[schedd_name] = other_jobs
            continue
        # Merge per-job classads; values from 'other_info' win on conflict.
        for job_id, job_ad in other_jobs.items():
            known_jobs.setdefault(job_id, {}).update(job_ad)
    return job_info
1453def count_jobs_in_single_dag(
1454 filename: str | os.PathLike,
1455) -> tuple[Counter[str], dict[str, str], dict[str, WmsNodeType]]:
1456 """Build bps_run_summary string from dag file.
1458 Parameters
1459 ----------
1460 filename : `str`
1461 Path that includes dag file for a run.
1463 Returns
1464 -------
1465 counts : `Counter` [`str`]
1466 Semi-colon separated list of job labels and counts.
1467 (Same format as saved in dag classad).
1468 job_name_to_label : `dict` [`str`, `str`]
1469 Mapping of job names to job labels.
1470 job_name_to_type : `dict` [`str`, `lsst.ctrl.bps.htcondor.WmsNodeType`]
1471 Mapping of job names to job types
1472 (e.g., payload, final, service).
1473 """
1474 # Later code depends upon insertion order
1475 counts: Counter = Counter() # counts of payload jobs per label
1476 job_name_to_label: dict[str, str] = {}
1477 job_name_to_type: dict[str, WmsNodeType] = {}
1478 with open(filename) as fh:
1479 for line in fh:
1480 # Skip any line that contains commands irrelevant to job counting.
1481 if not line.startswith(
1482 (
1483 "JOB",
1484 "FINAL",
1485 "SERVICE",
1486 "SUBDAG EXTERNAL",
1487 )
1488 ):
1489 continue
1491 m = re.match(
1492 r"(?P<command>JOB|FINAL|SERVICE|SUBDAG EXTERNAL)\s+"
1493 r'(?P<jobname>(?P<wms>wms_)?\S+)\s+"?(?P<subfile>\S+)"?\s*'
1494 r'(DIR "?(?P<dir>[^\s"]+)"?)?\s*(?P<noop>NOOP)?',
1495 line,
1496 )
1497 if m:
1498 job_name = m.group("jobname")
1499 name_parts = job_name.split("_")
1501 label = ""
1502 if m.group("dir"):
1503 dir_match = re.search(r"jobs/([^\s/]+)", m.group("dir"))
1504 if dir_match:
1505 label = dir_match.group(1)
1506 else:
1507 _LOG.debug("Parse DAG: unparsed dir = %s", line)
1508 elif m.group("subfile"):
1509 subfile_match = re.search(r"jobs/([^\s/]+)", m.group("subfile"))
1510 if subfile_match:
1511 label = m.group("subfile").split("/")[1]
1512 else:
1513 label = pegasus_name_to_label(job_name)
1515 match m.group("command"):
1516 case "JOB":
1517 if m.group("noop"):
1518 job_type = WmsNodeType.NOOP
1519 # wms_noop_label
1520 label = name_parts[2]
1521 elif m.group("wms"):
1522 if name_parts[1] == "check":
1523 job_type = WmsNodeType.SUBDAG_CHECK
1524 # wms_check_status_wms_group_label
1525 label = name_parts[5]
1526 else:
1527 _LOG.warning(
1528 "Unexpected skipping of dag line due to unknown wms job: %s", line
1529 )
1530 else:
1531 job_type = WmsNodeType.PAYLOAD
1532 if label == "init":
1533 label = "pipetaskInit"
1534 counts[label] += 1
1535 case "FINAL":
1536 job_type = WmsNodeType.FINAL
1537 counts[label] += 1 # final counts a payload job.
1538 case "SERVICE":
1539 job_type = WmsNodeType.SERVICE
1540 case "SUBDAG EXTERNAL":
1541 job_type = WmsNodeType.SUBDAG
1542 label = name_parts[2]
1544 job_name_to_label[job_name] = label
1545 job_name_to_type[job_name] = job_type
1546 else:
1547 # The line should, but didn't match the pattern above. Probably
1548 # problems with regex.
1549 _LOG.warning("Unexpected skipping of dag line: %s", line)
1551 return counts, job_name_to_label, job_name_to_type
def summarize_dag(dir_name: str) -> tuple[str, dict[str, str], dict[str, WmsNodeType]]:
    """Build bps_run_summary string from dag file.

    Parameters
    ----------
    dir_name : `str`
        Path that includes dag file for a run.

    Returns
    -------
    summary : `str`
        Semi-colon separated list of job labels and counts
        (Same format as saved in dag classad).
    job_name_to_label : `dict` [`str`, `str`]
        Mapping of job names to job labels.
    job_name_to_type : `dict` [`str`, `lsst.ctrl.bps.htcondor.WmsNodeType`]
        Mapping of job names to job types
        (e.g., payload, final, service).
    """
    # Later code depends upon insertion order
    total_counts: Counter[str] = Counter()  # counts of payload jobs per label
    job_name_to_label: dict[str, str] = {}
    job_name_to_type: dict[str, WmsNodeType] = {}

    # Aggregate over the top-level dag file(s) first, then any subdags.
    top_level = Path(dir_name).glob("*.dag")
    subdags = Path(dir_name).glob("subdags/*/*.dag")
    for dag_file in itertools.chain(top_level, subdags):
        file_counts, file_labels, file_types = count_jobs_in_single_dag(dag_file)
        total_counts += file_counts
        _update_dicts(job_name_to_label, file_labels)
        _update_dicts(job_name_to_type, file_types)

    summary = ";".join(f"{label}:{total_counts[label]}" for label in total_counts)
    _LOG.debug("summarize_dag: %s %s %s", summary, job_name_to_label, job_name_to_type)
    return summary, job_name_to_label, job_name_to_type
def pegasus_name_to_label(name):
    """Convert pegasus job name to a label for the report.

    Parameters
    ----------
    name : `str`
        Name of job.

    Returns
    -------
    label : `str`
        Label for job.
    """
    # Pegasus bookkeeping jobs all share one label.
    if name.startswith(("create_dir", "stage_in", "stage_out")):
        return "pegasus"

    match = re.match(r"pipetask_(\d+_)?([^_]+)", name)
    if not match:
        return "UNK"
    label = match.group(2)
    return "pipetaskInit" if label == "init" else label
def read_single_dag_status(filename: str | os.PathLike) -> dict[str, Any]:
    """Read the node status file for DAG summary information.

    Parameters
    ----------
    filename : `str` or `Path.pathlib`
        Node status filename.

    Returns
    -------
    dag_ad : `dict` [`str`, `~typing.Any`]
        DAG summary information.  Empty if the file could not be read.
    """
    dag_ad: dict[str, Any] = {}

    # While this is probably more up to date than dag classad, only read from
    # file if need to.
    try:
        node_stat_file = Path(filename)
        _LOG.debug("Reading Node Status File %s", node_stat_file)
        with open(node_stat_file) as infh:
            # First classad in the node status file is the DAG summary.
            dag_ad = dict(classad.parseNext(infh))  # pylint: disable=E1101

        if not dag_ad:
            # Pegasus check here
            # Fall back to the Pegasus metrics files; the second read
            # deliberately overwrites NodesTotal with the workflow total.
            metrics_file = node_stat_file.with_suffix(".dag.metrics")
            if metrics_file.exists():
                with open(metrics_file) as infh:
                    metrics = json.load(infh)
                dag_ad["NodesTotal"] = metrics.get("jobs", 0)
                dag_ad["NodesFailed"] = metrics.get("jobs_failed", 0)
                dag_ad["NodesDone"] = metrics.get("jobs_succeeded", 0)
                metrics_file = node_stat_file.with_suffix(".metrics")
                with open(metrics_file) as infh:
                    metrics = json.load(infh)
                dag_ad["NodesTotal"] = metrics["wf_metrics"]["total_jobs"]
    except (OSError, PermissionError):
        # Missing/unreadable status file is not an error; return what we have.
        pass

    _LOG.debug("read_dag_status: %s", dag_ad)
    return dag_ad
def read_dag_status(wms_path: str | os.PathLike) -> dict[str, Any]:
    """Read the node status file for DAG summary information.

    Parameters
    ----------
    wms_path : `str` or `os.PathLike`
        Path that includes node status file for a run.

    Returns
    -------
    dag_ad : `dict` [`str`, `~typing.Any`]
        DAG summary information, counts summed across any subdags.

    Raises
    ------
    FileNotFoundError
        If no node status file is found in ``wms_path``.
    """
    dag_ads: dict[str, Any] = {}
    path = Path(wms_path)
    try:
        node_stat_file = next(path.glob("*.node_status"))
    except StopIteration as exc:
        raise FileNotFoundError(f"DAGMan node status not found in {wms_path}") from exc

    dag_ads = read_single_dag_status(node_stat_file)

    # Counters to sum across the main DAG and all subdags.
    count_keys = (
        "JobProcsHeld",
        "NodesPost",
        "JobProcsIdle",
        "NodesTotal",
        "NodesFailed",
        "NodesDone",
        "NodesQueued",
        "NodesPre",
        "NodesFutile",
        "NodesUnready",
    )
    for node_stat_file in path.glob("subdags/*/*.node_status"):
        dag_ad = read_single_dag_status(node_stat_file)
        for key in count_keys:
            # Use get() on both sides: the main DAG status may be missing
            # keys (e.g., unreadable file yields an empty ad) and plain +=
            # would raise KeyError.  Also sum NodesPre with NodesPre (the
            # previous code mistakenly added the subdag's NodesReady).
            dag_ads[key] = dag_ads.get(key, 0) + dag_ad.get(key, 0)

    return dag_ads
1701def read_single_node_status(filename: str | os.PathLike, init_fake_id: int) -> dict[str, Any]:
1702 """Read entire node status file.
1704 Parameters
1705 ----------
1706 filename : `str` or `pathlib.Path`
1707 Node status filename.
1708 init_fake_id : `int`
1709 Initial fake id value.
1711 Returns
1712 -------
1713 jobs : `dict` [`str`, `~typing.Any`]
1714 DAG summary information compiled from the node status file combined
1715 with the information found in the node event log.
1717 Currently, if the same job attribute is found in both files, its value
1718 from the event log takes precedence over the value from the node status
1719 file.
1720 """
1721 filename = Path(filename)
1723 # Get jobid info from other places to fill in gaps in info from node_status
1724 _, job_name_to_label, job_name_to_type = count_jobs_in_single_dag(filename.with_suffix(".dag"))
1725 loginfo: dict[str, dict[str, Any]] = {}
1726 try:
1727 wms_workflow_id, loginfo = read_single_dag_log(filename.with_suffix(".dag.dagman.log"))
1728 loginfo = read_single_dag_nodes_log(filename.with_suffix(".dag.nodes.log"))
1729 except (OSError, PermissionError):
1730 pass
1732 job_name_to_id: dict[str, str] = {}
1733 _LOG.debug("loginfo = %s", loginfo)
1734 log_job_name_to_id: dict[str, str] = {}
1735 for job_id, job_info in loginfo.items():
1736 if "LogNotes" in job_info:
1737 m = re.match(r"DAG Node: (\S+)", job_info["LogNotes"])
1738 if m:
1739 job_name = m.group(1)
1740 log_job_name_to_id[job_name] = job_id
1741 job_info["DAGNodeName"] = job_name
1742 job_info["wms_node_type"] = job_name_to_type[job_name]
1743 job_info["bps_job_label"] = job_name_to_label[job_name]
1745 jobs = {}
1746 fake_id = init_fake_id # For nodes that do not yet have a job id, give fake one
1747 try:
1748 with open(filename) as fh:
1749 for ad in classad.parseAds(fh):
1750 match ad["Type"]:
1751 case "DagStatus":
1752 # Skip DAG summary.
1753 pass
1754 case "NodeStatus":
1755 job_name = ad["Node"]
1756 if job_name in job_name_to_label:
1757 job_label = job_name_to_label[job_name]
1758 elif "_" in job_name:
1759 job_label = job_name.split("_")[1]
1760 else:
1761 job_label = job_name
1763 job = dict(ad)
1764 if job_name in log_job_name_to_id:
1765 job_id = str(log_job_name_to_id[job_name])
1766 _update_dicts(job, loginfo[job_id])
1767 else:
1768 job_id = str(fake_id)
1769 job = dict(ad)
1770 fake_id -= 1
1771 jobs[job_id] = job
1772 job_name_to_id[job_name] = job_id
1774 # Make job info as if came from condor_q.
1775 job["ClusterId"] = int(float(job_id))
1776 job["DAGManJobID"] = wms_workflow_id
1777 job["DAGNodeName"] = job_name
1778 job["bps_job_label"] = job_label
1779 job["wms_node_type"] = job_name_to_type[job_name]
1781 case "StatusEnd":
1782 # Skip node status file "epilog".
1783 pass
1784 case _:
1785 _LOG.debug(
1786 "Ignoring unknown classad type '%s' in the node status file '%s'",
1787 ad["Type"],
1788 filename,
1789 )
1790 except (OSError, PermissionError):
1791 pass
1793 # Check for missing jobs (e.g., submission failure or not submitted yet)
1794 # Use dag info to create job placeholders
1795 for name in set(job_name_to_label) - set(job_name_to_id):
1796 if name in log_job_name_to_id: # job was in nodes.log, but not node_status
1797 job_id = str(log_job_name_to_id[name])
1798 job = dict(loginfo[job_id])
1799 else:
1800 job_id = str(fake_id)
1801 fake_id -= 1
1802 job = {}
1803 job["NodeStatus"] = NodeStatus.NOT_READY
1805 job["ClusterId"] = int(float(job_id))
1806 job["ProcId"] = 0
1807 job["DAGManJobID"] = wms_workflow_id
1808 job["DAGNodeName"] = name
1809 job["bps_job_label"] = job_name_to_label[name]
1810 job["wms_node_type"] = job_name_to_type[name]
1811 jobs[f"{job['ClusterId']}.{job['ProcId']}"] = job
1813 for job_info in jobs.values():
1814 job_info["from_dag_job"] = f"wms_{filename.stem}"
1816 return jobs
def read_node_status(wms_path: str | os.PathLike) -> dict[str, dict[str, Any]]:
    """Read entire node status file.

    Parameters
    ----------
    wms_path : `str` or `os.PathLike`
        Path that includes node status file for a run.

    Returns
    -------
    jobs : `dict` [`str`, `dict` [`str`, `~typing.Any`]]
        DAG summary information compiled from the node status file combined
        with the information found in the node event log.

        Currently, if the same job attribute is found in both files, its value
        from the event log takes precedence over the value from the node status
        file.
    """
    jobs: dict[str, dict[str, Any]] = {}
    next_fake_id = -1

    # Subdags may not have run yet, so a node_status file might be missing.
    # Drive the scan off the dag files and let read_single_node_status cope
    # with a missing node_status file.
    top_dags = Path(wms_path).glob("*.dag")
    sub_dags = Path(wms_path).glob("subdags/*/*.dag")
    for dag_file in itertools.chain(top_dags, sub_dags):
        single_info = read_single_node_status(dag_file.with_suffix(".node_status"), next_fake_id)
        next_fake_id -= len(single_info)
        _update_dicts(jobs, single_info)

    # Propagate pruned status from subdag jobs to the jobs inside them.
    subdag_ids: dict[str, str] = {}
    unset_by_dag: dict[str, list[str]] = {}
    for job_id, job in jobs.items():
        if job["DAGNodeName"].startswith("wms_"):
            subdag_ids[job["DAGNodeName"]] = job_id
        if "NodeStatus" not in job or job["NodeStatus"] == NodeStatus.NOT_READY:
            unset_by_dag.setdefault(job["from_dag_job"], []).append(job_id)

    for dag_name, dag_id in subdag_ids.items():
        dag_status = jobs[dag_id].get("NodeStatus", NodeStatus.NOT_READY)
        if dag_status in {NodeStatus.NOT_READY, NodeStatus.FUTILE}:
            for job_id in unset_by_dag.get(dag_name, []):
                jobs[job_id]["NodeStatus"] = dag_status

    return jobs
def read_single_dag_log(log_filename: str | os.PathLike) -> tuple[str, dict[str, dict[str, Any]]]:
    """Read job information from the DAGMan log file.

    Parameters
    ----------
    log_filename : `str` or `os.PathLike`
        DAGMan log filename.

    Returns
    -------
    wms_workflow_id : `str`
        HTCondor job id (i.e., <ClusterId>.<ProcId>) of the DAGMan job.
        ``"0"`` if the log file does not exist or contains no events.
    dag_info : `dict` [`str`, `dict` [`str`, `~typing.Any`]]
        HTCondor job information read from the log file mapped to HTCondor
        job id.  Empty if the log file does not exist or contains no events.
    """
    wms_workflow_id = "0"
    dag_info: dict[str, dict[str, Any]] = {}

    filename = Path(log_filename)
    if filename.exists():
        _LOG.debug("dag node log filename: %s", filename)

        info: dict[str, Any] = {}
        job_event_log = htcondor.JobEventLog(str(filename))
        for event in job_event_log.events(stop_after=0):
            id_ = f"{event['Cluster']}.{event['Proc']}"
            if id_ not in info:
                info[id_] = {}
            wms_workflow_id = id_  # taking last job id in case of restarts
            _update_dicts(info[id_], event)
            info[id_][f"{event.type.name.lower()}_time"] = event["EventTime"]

        # Only save latest DAG job.  Guard against an existing but eventless
        # log (indexing info unconditionally raised KeyError).
        if info:
            dag_info = {wms_workflow_id: info[wms_workflow_id]}

    return wms_workflow_id, dag_info
def read_dag_log(wms_path: str | os.PathLike) -> tuple[str, dict[str, Any]]:
    """Read job information from the DAGMan log file.

    Parameters
    ----------
    wms_path : `str` or `os.PathLike`
        Path containing the DAGMan log file.

    Returns
    -------
    wms_workflow_id : `str`
        HTCondor job id (i.e., <ClusterId>.<ProcId>) of the DAGMan job.
    dag_info : `dict` [`str`, `dict` [`str`, `~typing.Any`]]
        HTCondor job information read from the log file mapped to HTCondor
        job id.

    Raises
    ------
    FileNotFoundError
        If cannot find DAGMan log in given wms_path.
    """
    wms_workflow_id, dag_info = MISSING_ID, {}

    run_path = Path(wms_path)
    if run_path.exists():
        try:
            dagman_log = next(run_path.glob("*.dag.dagman.log"))
        except StopIteration as exc:
            raise FileNotFoundError(f"DAGMan log not found in {wms_path}") from exc
        _LOG.debug("dag node log filename: %s", dagman_log)
        wms_workflow_id, dag_info = read_single_dag_log(dagman_log)

    return wms_workflow_id, dag_info
def read_single_dag_nodes_log(filename: str | os.PathLike) -> dict[str, dict[str, Any]]:
    """Read job information from the DAGMan nodes log file.

    Parameters
    ----------
    filename : `str` or `os.PathLike`
        Path containing the DAGMan nodes log file.

    Returns
    -------
    info : `dict` [`str`, `dict` [`str`, `~typing.Any`]]
        HTCondor job information read from the log file mapped to HTCondor
        job id.

    Raises
    ------
    FileNotFoundError
        If cannot find DAGMan node log in given wms_path.
    """
    _LOG.debug("dag node log filename: %s", filename)
    filename = Path(filename)

    info: dict[str, dict[str, Any]] = {}
    if not filename.exists():
        # Include the actual filename in the message (it was previously lost).
        raise FileNotFoundError(f"{filename} does not exist")

    try:
        job_event_log = htcondor.JobEventLog(str(filename))
    except htcondor.HTCondorIOError as ex:
        _LOG.error("Problem reading nodes log file (%s): %s", filename, ex)
        raise
    for event in job_event_log.events(stop_after=0):
        _LOG.debug("log event type = %s, keys = %s", event["EventTypeNumber"], event.keys())

        try:
            id_ = f"{event['Cluster']}.{event['Proc']}"
        except KeyError:
            # Logger.warn is deprecated; use warning().
            _LOG.warning(
                "Log event missing ids (DAGNodeName=%s, EventTime=%s, EventTypeNumber=%s)",
                event.get("DAGNodeName", "UNK"),
                event.get("EventTime", "UNK"),
                event.get("EventTypeNumber", "UNK"),
            )
        else:
            if id_ not in info:
                info[id_] = {}
            # Workaround: Please check to see if still problem in
            # future HTCondor versions. Sometimes get a
            # JobAbortedEvent for a subdag job after it already
            # terminated normally. Seems to happen when using job
            # plus subdags.
            if event["EventTypeNumber"] == 9 and info[id_].get("EventTypeNumber", -1) == 5:
                _LOG.debug("Skipping spurious JobAbortedEvent: %s", dict(event))
            else:
                _update_dicts(info[id_], event)
                info[id_][f"{event.type.name.lower()}_time"] = event["EventTime"]

    return info
def read_dag_nodes_log(wms_path: str | os.PathLike) -> dict[str, dict[str, Any]]:
    """Read job information from the DAGMan nodes log file.

    Parameters
    ----------
    wms_path : `str` or `os.PathLike`
        Path containing the DAGMan nodes log file.

    Returns
    -------
    info : `dict` [`str`, `dict` [`str`, `~typing.Any`]]
        HTCondor job information read from the log file mapped to HTCondor
        job id.

    Raises
    ------
    FileNotFoundError
        If cannot find DAGMan node log in given wms_path.
    """
    info: dict[str, dict[str, Any]] = {}
    run_path = Path(wms_path)
    for log_file in run_path.glob("*.dag.nodes.log"):
        _LOG.debug("dag node log filename: %s", log_file)
        _update_dicts(info, read_single_dag_nodes_log(log_file))

    # If submitted, the main nodes log file should exist.
    if not info:
        raise FileNotFoundError(f"DAGMan node log not found in {wms_path}")

    # Subdags that have not started running yet simply have no nodes log
    # files, so a missing file here is not an error.
    for log_file in run_path.glob("subdags/*/*.dag.nodes.log"):
        _LOG.debug("dag node log filename: %s", log_file)
        _update_dicts(info, read_single_dag_nodes_log(log_file))

    return info
2053def read_dag_info(wms_path: str | os.PathLike) -> dict[str, dict[str, Any]]:
2054 """Read custom DAGMan job information from the file.
2056 Parameters
2057 ----------
2058 wms_path : `str` or `os.PathLike`
2059 Path containing the file with the DAGMan job info.
2061 Returns
2062 -------
2063 dag_info : `dict` [`str`, `dict` [`str`, `~typing.Any`]]
2064 HTCondor job information.
2066 Raises
2067 ------
2068 FileNotFoundError
2069 If cannot find DAGMan job info file in the given location.
2070 """
2071 dag_info: dict[str, dict[str, Any]] = {}
2072 try:
2073 filename = next(Path(wms_path).glob("*.info.json"))
2074 except StopIteration as exc:
2075 raise FileNotFoundError(f"File with DAGMan job information not found in {wms_path}") from exc
2076 _LOG.debug("DAGMan job information filename: %s", filename)
2077 try:
2078 with open(filename) as fh:
2079 dag_info = json.load(fh)
2080 except (OSError, PermissionError) as exc:
2081 _LOG.debug("Retrieving DAGMan job information failed: %s", exc)
2082 return dag_info
def write_dag_info(filename, dag_info):
    """Write custom job information about DAGMan job.

    Persisting is best effort: failures to assemble or write the
    information are logged at debug level rather than raised.

    Parameters
    ----------
    filename : `str`
        Name of the file where the information will be stored.
    dag_info : `dict` [`str`, `dict` [`str`, `~typing.Any`]]
        Information about the DAGMan job.
    """
    schedd_name = next(iter(dag_info))
    dag_id = next(iter(dag_info[schedd_name]))
    dag_ad = dag_info[schedd_name][dag_id]
    try:
        # Persist only the job ids plus the bps-specific attributes.
        # Build the trimmed ad inside the try block so missing
        # ClusterId/GlobalJobId keys are caught by the KeyError handler
        # below (previously these lookups happened before the try and a
        # KeyError would propagate despite being listed in the except).
        ad = {"ClusterId": dag_ad["ClusterId"], "GlobalJobId": dag_ad["GlobalJobId"]}
        ad.update({key: val for key, val in dag_ad.items() if key.startswith("bps")})
        with open(filename, "w") as fh:
            info = {schedd_name: {dag_id: ad}}
            json.dump(info, fh)
    # PermissionError is a subclass of OSError; keeping just OSError here.
    except (KeyError, OSError) as exc:
        _LOG.debug("Persisting DAGMan job information failed: %s", exc)
def htc_tweak_log_info(wms_path: str | Path, job: dict[str, Any]) -> None:
    """Massage the given job info has same structure as if came from condor_q.

    The job ad is modified in place: id, path, owner, DAG node name, and
    a JobStatus derived from the last seen event type are added.

    Parameters
    ----------
    wms_path : `str` | `os.PathLike`
        Path containing an HTCondor event log file.
    job : `dict` [ `str`, `~typing.Any` ]
        A mapping between HTCondor job id and job information read from
        the log.
    """
    _LOG.debug("htc_tweak_log_info: %s %s", wms_path, job)

    # Use the presence of 'MyType' key as a proxy to determine if the job ad
    # contains the info extracted from the event log. Exit early if it doesn't
    # (e.g. it is a job ad for a pruned job).
    if "MyType" not in job:
        return

    # Mirror the attribute names condor_q reports for the job id.
    try:
        job["ClusterId"] = job["Cluster"]
        job["ProcId"] = job["Proc"]
    except KeyError as e:
        _LOG.error("Missing key %s in job: %s", str(e), job)
        raise
    job["Iwd"] = str(wms_path)
    job["Owner"] = Path(wms_path).owner()

    # The event log records the DAG node name only inside the submit log
    # notes; recover it so the job can be mapped back to its DAG node.
    if "LogNotes" in job:
        m = re.match(r"DAG Node: (\S+)", job["LogNotes"])
        if m:
            job["DAGNodeName"] = m.group(1)

    # Translate the last seen event type into the JobStatus value that
    # condor_q would have reported for the job.
    match job["MyType"]:
        case "ExecuteEvent":
            job["JobStatus"] = htcondor.JobStatus.RUNNING
        case "JobTerminatedEvent" | "PostScriptTerminatedEvent":
            job["JobStatus"] = htcondor.JobStatus.COMPLETED
        case "SubmitEvent":
            job["JobStatus"] = htcondor.JobStatus.IDLE
        case "JobAbortedEvent":
            job["JobStatus"] = htcondor.JobStatus.REMOVED
        case "JobHeldEvent":
            job["JobStatus"] = htcondor.JobStatus.HELD
        case "JobReleaseEvent":
            # If the job managing the execution of the root DAG is held and
            # released this will be the last event showing up in its
            # job event log even if the job is still running. If this is
            # the last event for a job corresponding to the workflow node
            # (either a normal payload job or the job managing the execution
            # of an inner DAG), its final status will be determined later
            # using node status log (see _htc_status_to_wms_state()).
            job["JobStatus"] = htcondor.JobStatus.RUNNING if "DAGNodeName" not in job else None
        case _:
            _LOG.debug("Unknown log event type: %s", job["MyType"])
            job["JobStatus"] = None

    # Use available information to add either "ExitCode" or "ExitSignal"
    # attribute that captures respectively job's exit status (if it finished
    # on its own accord) or its exit signal (if it was terminated by
    # a signal). Also, include a flag "ExitBySignal" to make distinguishing
    # between these two cases easy later on.
    if job["JobStatus"] in {
        htcondor.JobStatus.COMPLETED,
        htcondor.JobStatus.HELD,
        htcondor.JobStatus.REMOVED,
    }:
        new_job = HTC_JOB_AD_HANDLERS.handle(job)
        if new_job is not None:
            # NOTE(review): rebinding the local name 'job' does not
            # propagate to the caller's dict; this assignment only has an
            # effect if the handler mutates the ad in place (or returns the
            # same object) -- confirm against HTC_JOB_AD_HANDLERS.handle's
            # contract.
            job = new_job
        else:
            _LOG.error("Could not determine exit status for job '%s.%s'", job["ClusterId"], job["ProcId"])
2182def htc_check_dagman_output(wms_path: str | os.PathLike) -> str:
2183 """Check the DAGMan output for error messages.
2185 Parameters
2186 ----------
2187 wms_path : `str` or `os.PathLike`
2188 Directory containing the DAGman output file.
2190 Returns
2191 -------
2192 message : `str`
2193 Message containing error messages from the DAGMan output. Empty
2194 string if no messages.
2196 Raises
2197 ------
2198 FileNotFoundError
2199 If cannot find DAGMan standard output file in given wms_path.
2200 """
2201 try:
2202 filename = next(Path(wms_path).glob("*.dag.dagman.out"))
2203 except StopIteration as exc:
2204 raise FileNotFoundError(f"DAGMan standard output file not found in {wms_path}") from exc
2205 _LOG.debug("dag output filename: %s", filename)
2207 p = re.compile(r"^(\d\d/\d\d/\d\d \d\d:\d\d:\d\d) (Job submit try \d+/\d+ failed|Warning:.*$|ERROR:.*$)")
2209 message = ""
2210 try:
2211 with open(filename) as fh:
2212 last_submit_failed = "" # Since submit retries multiple times only report last one
2213 for line in fh:
2214 m = p.match(line)
2215 if m:
2216 if m.group(2).startswith("Job submit try"):
2217 last_submit_failed = m.group(1)
2218 elif m.group(2).startswith("ERROR: submit attempt failed"):
2219 pass # Should be handled by Job submit try
2220 elif m.group(2).startswith("Warning"):
2221 if ".dag.nodes.log is in /tmp" in m.group(2):
2222 last_warning = "Cannot submit from /tmp."
2223 else:
2224 last_warning = m.group(2)
2225 elif m.group(2) == "ERROR: Warning is fatal error because of DAGMAN_USE_STRICT setting":
2226 message += "ERROR: "
2227 message += last_warning
2228 message += "\n"
2229 elif m.group(2) in [
2230 "ERROR: the following job(s) failed:",
2231 "ERROR: the following Node(s) failed:",
2232 ]:
2233 pass
2234 else:
2235 message += m.group(2)
2236 message += "\n"
2238 if last_submit_failed:
2239 message += f"Warn: Job submission issues (last: {last_submit_failed})"
2240 except (OSError, PermissionError):
2241 message = f"Warn: Could not read dagman output file from {wms_path}."
2242 _LOG.debug("dag output file message: %s", message)
2243 return message
2246def _read_rescue_headers(infh: TextIO) -> tuple[list[str], list[str]]:
2247 """Read header lines from a rescue file.
2249 Parameters
2250 ----------
2251 infh : `TextIO`
2252 The rescue file from which to read the header lines.
2254 Returns
2255 -------
2256 header_lines : `list` [`str`]
2257 Header lines read from the rescue file.
2258 failed_subdags : `list` [`str`]
2259 Names of failed subdag jobs.
2260 """
2261 header_lines: list[str] = []
2262 failed = False
2263 failed_subdags: list[str] = []
2265 for line in infh:
2266 line = line.strip()
2267 if line.startswith("#"):
2268 if line.startswith("# Nodes that failed:"):
2269 failed = True
2270 header_lines.append(line)
2271 elif failed:
2272 orig_failed_nodes = line[1:].strip().split(",")
2273 new_failed_nodes = []
2274 for node in orig_failed_nodes:
2275 if node.startswith("wms_check_status"):
2276 group_node = node[17:]
2277 failed_subdags.append(group_node)
2278 new_failed_nodes.append(group_node)
2279 else:
2280 new_failed_nodes.append(node)
2281 header_lines.append(f"# {','.join(new_failed_nodes)}")
2282 if orig_failed_nodes[-1] == "<ENDLIST>":
2283 failed = False
2284 else:
2285 header_lines.append(line)
2286 elif line.strip() == "": # end of headers
2287 break
2288 return header_lines, failed_subdags
2291def _write_rescue_headers(header_lines: list[str], failed_subdags: list[str], outfh: TextIO) -> None:
2292 """Write the header lines to the new rescue file.
2294 Parameters
2295 ----------
2296 header_lines : `list` [`str`]
2297 Header lines to write to the new rescue file.
2298 failed_subdags : `list` [`str`]
2299 Job names of the failed subdags.
2300 outfh : `TextIO`
2301 New rescue file.
2302 """
2303 done_str = "# Nodes premarked DONE"
2304 pattern = f"^{done_str}:\\s+(\\d+)"
2305 for header_line in header_lines:
2306 m = re.match(pattern, header_line)
2307 if m:
2308 print(f"{done_str}: {int(m.group(1)) - len(failed_subdags)}", file=outfh)
2309 else:
2310 print(header_line, file=outfh)
2312 print("", file=outfh)
2315def _copy_done_lines(failed_subdags: list[str], infh: TextIO, outfh: TextIO) -> None:
2316 """Copy the DONE lines from the original rescue file skipping
2317 the failed group jobs.
2319 Parameters
2320 ----------
2321 failed_subdags : `list` [`str`]
2322 List of job names for the failed subdags
2323 infh : `TextIO`
2324 Original rescue file to copy from.
2325 outfh : `TextIO`
2326 New rescue file to copy to.
2327 """
2328 for line in infh:
2329 line = line.strip()
2330 try:
2331 _, node_name = line.split()
2332 except ValueError:
2333 _LOG.error(f"Unexpected line in rescue file = '{line}'")
2334 raise
2335 if node_name not in failed_subdags:
2336 print(line, file=outfh)
def _update_rescue_file(rescue_file: Path) -> None:
    """Update the subdag failures in the main rescue file
    and backup the failed subdag dirs.

    Parameters
    ----------
    rescue_file : `pathlib.Path`
        The main rescue file that needs to be updated.
    """
    # Stream the file line by line rather than loading it all into memory.
    tmp_file = rescue_file.with_suffix(rescue_file.suffix + ".tmp")
    with open(rescue_file) as src:
        headers, failed_subdags = _read_rescue_headers(src)
        with open(tmp_file, "w") as dest:
            _write_rescue_headers(headers, failed_subdags, dest)
            _copy_done_lines(failed_subdags, src, dest)
    # Swap the rewritten file into place.
    rescue_file.unlink()
    tmp_file.rename(rescue_file)
    # Move each failed subdag's directory out of the way so a rerun
    # starts from a clean slate.
    for subdag in failed_subdags:
        htc_backup_files(rescue_file.parent / "subdags" / subdag, subdir=f"backups/subdags/{subdag}")
2363def _update_dicts(dict1, dict2):
2364 """Update dict1 with info in dict2.
2366 (Basically an update for nested dictionaries.)
2368 Parameters
2369 ----------
2370 dict1 : `dict` [`str`, `dict` [`str`, `~typing.Any`]]
2371 HTCondor job information to be updated.
2372 dict2 : `dict` [`str`, `dict` [`str`, `~typing.Any`]]
2373 Additional HTCondor job information.
2374 """
2375 for key, value in dict2.items():
2376 if key in dict1 and isinstance(dict1[key], dict) and isinstance(value, dict):
2377 _update_dicts(dict1[key], value)
2378 else:
2379 dict1[key] = value
def _locate_schedds(locate_all=False):
    """Find out Scheduler daemons in an HTCondor pool.

    Parameters
    ----------
    locate_all : `bool`, optional
        If True, all available schedulers in the HTCondor pool will be located.
        False by default which means that the search will be limited to looking
        for the Scheduler running on a local host.

    Returns
    -------
    schedds : `dict` [`str`, `htcondor.Schedd`]
        A mapping between Scheduler names and Python objects allowing for
        interacting with them.
    """
    collector = htcondor.Collector()
    if locate_all:
        ads = list(collector.locateAll(htcondor.DaemonTypes.Schedd))
    else:
        ads = [collector.locate(htcondor.DaemonTypes.Schedd)]
    return {ad["Name"]: htcondor.Schedd(ad) for ad in ads}