Coverage for python / lsst / ctrl / bps / htcondor / lssthtc.py: 11%

924 statements  

« prev     ^ index     » next       coverage.py v7.13.5, created at 2026-05-01 08:24 +0000

1# This file is part of ctrl_bps_htcondor. 

2# 

3# Developed for the LSST Data Management System. 

4# This product includes software developed by the LSST Project 

5# (https://www.lsst.org). 

6# See the COPYRIGHT file at the top-level directory of this distribution 

7# for details of code ownership. 

8# 

9# This software is dual licensed under the GNU General Public License and also 

10# under a 3-clause BSD license. Recipients may choose which of these licenses 

11# to use; please see the files gpl-3.0.txt and/or bsd_license.txt, 

12# respectively. If you choose the GPL option then the following text applies 

13# (but note that there is still no warranty even if you opt for BSD instead): 

14# 

15# This program is free software: you can redistribute it and/or modify 

16# it under the terms of the GNU General Public License as published by 

17# the Free Software Foundation, either version 3 of the License, or 

18# (at your option) any later version. 

19# 

20# This program is distributed in the hope that it will be useful, 

21# but WITHOUT ANY WARRANTY; without even the implied warranty of 

22# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the 

23# GNU General Public License for more details. 

24# 

25# You should have received a copy of the GNU General Public License 

26# along with this program. If not, see <https://www.gnu.org/licenses/>. 

27 

28"""Placeholder HTCondor DAGMan API. 

29 

30There is new work on a python DAGMan API from HTCondor. However, at this 

31time, it tries to make things easier by assuming DAG is easily broken into 

32levels where there are 1-1 or all-to-all relationships to nodes in next 

33level. LSST workflows are more complicated. 

34""" 

35 

36__all__ = [ 

37 "MISSING_ID", 

38 "DagStatus", 

39 "HTCDag", 

40 "HTCJob", 

41 "NodeStatus", 

42 "RestrictedDict", 

43 "WmsNodeType", 

44 "condor_history", 

45 "condor_q", 

46 "condor_search", 

47 "condor_status", 

48 "htc_backup_files", 

49 "htc_check_dagman_output", 

50 "htc_create_submit_from_cmd", 

51 "htc_create_submit_from_dag", 

52 "htc_create_submit_from_file", 

53 "htc_escape", 

54 "htc_query_history", 

55 "htc_query_present", 

56 "htc_submit_dag", 

57 "htc_tweak_log_info", 

58 "htc_version", 

59 "htc_write_attribs", 

60 "htc_write_condor_file", 

61 "pegasus_name_to_label", 

62 "read_dag_info", 

63 "read_dag_log", 

64 "read_dag_nodes_log", 

65 "read_dag_status", 

66 "read_node_status", 

67 "summarize_dag", 

68 "update_job_info", 

69 "write_dag_info", 

70] 

71 

72 

73import itertools 

74import json 

75import logging 

76import os 

77import pprint 

78import re 

79import subprocess 

80from collections import Counter, defaultdict 

81from collections.abc import MutableMapping 

82from datetime import datetime, timedelta 

83from enum import IntEnum, auto 

84from pathlib import Path 

85from typing import Any, TextIO 

86 

87import classad 

88import htcondor 

89import networkx 

90from deprecated.sphinx import deprecated 

91from packaging import version 

92 

93from .handlers import HTC_JOB_AD_HANDLERS 

94 

# Module-level logger, named after this module per stdlib convention.
_LOG = logging.getLogger(__name__)

# Sentinel used where an HTCondor job id could not be determined.
MISSING_ID = "-99999"

98 

99 

class DagStatus(IntEnum):
    """HTCondor DAGMan's statuses for a DAG.

    The integer values mirror the codes DAGMan itself reports; do not
    renumber them.
    """

    OK = 0
    ERROR = 1  # an error condition different than those listed here
    FAILED = 2  # one or more nodes in the DAG have failed
    ABORTED = 3  # the DAG has been aborted by an ABORT-DAG-ON specification
    REMOVED = 4  # the DAG has been removed by condor_rm
    CYCLE = 5  # a cycle was found in the DAG
    SUSPENDED = 6  # the DAG has been suspended (see section 2.10.8)

110 

111 

@deprecated(
    reason="The JobStatus is internally replaced by htcondor.JobStatus. "
    "External reporting code should be using ctrl_bps.WmsStates. "
    "This class will be removed after v30.",
    version="v30.0",
    category=FutureWarning,
)
class JobStatus(IntEnum):
    """HTCondor's statuses for jobs.

    Deprecated; values match the standard HTCondor ``JobStatus`` ClassAd
    attribute codes.
    """

    UNEXPANDED = 0  # Unexpanded
    IDLE = 1  # Idle
    RUNNING = 2  # Running
    REMOVED = 3  # Removed
    COMPLETED = 4  # Completed
    HELD = 5  # Held
    TRANSFERRING_OUTPUT = 6  # Transferring_Output
    SUSPENDED = 7  # Suspended

130 

131 

class NodeStatus(IntEnum):
    """HTCondor's statuses for DAGMan nodes."""

    # (STATUS_NOT_READY): At least one parent has not yet finished or the node
    # is a FINAL node.
    NOT_READY = 0

    # (STATUS_READY): All parents have finished, but the node is not yet
    # running.
    READY = 1

    # (STATUS_PRERUN): The node's PRE script is running.
    PRERUN = 2

    # (STATUS_SUBMITTED): The node's HTCondor job(s) are in the queue.
    # StatusDetails = "not_idle" -> running.
    # JobProcsHeld = 1 -> hold.
    # JobProcsQueued = 1 -> idle.
    SUBMITTED = 3

    # (STATUS_POSTRUN): The node's POST script is running.
    POSTRUN = 4

    # (STATUS_DONE): The node has completed successfully.
    DONE = 5

    # (STATUS_ERROR): The node has failed. StatusDetails has info (e.g.,
    # ULOG_JOB_ABORTED for deleted job).
    ERROR = 6

    # (STATUS_FUTILE): The node will never run because an ancestor node
    # failed.
    FUTILE = 7

164 

165 

class WmsNodeType(IntEnum):
    """HTCondor plugin node types to help with payload reporting."""

    UNKNOWN = auto()
    """Dummy value when missing."""

    PAYLOAD = auto()
    """Payload job."""

    FINAL = auto()
    """Final job."""

    SERVICE = auto()
    """Service job."""

    NOOP = auto()
    """NOOP job used for ordering jobs."""

    SUBDAG = auto()
    """SUBDAG job used for ordering jobs."""

    SUBDAG_CHECK = auto()
    """Job used to correctly prune jobs after a subdag."""

189 

190 

# Submit-file keys whose values must be written wrapped in double quotes
# (see htc_write_condor_file).
HTC_QUOTE_KEYS = {"environment"}

# Submit description commands accepted by an HTCJob's RestrictedDict of
# submit-file commands.
HTC_VALID_JOB_KEYS = {
    "universe",
    "executable",
    "arguments",
    "environment",
    "log",
    "error",
    "output",
    "should_transfer_files",
    "when_to_transfer_output",
    "getenv",
    "notification",
    "notify_user",
    "concurrency_limit",
    "transfer_executable",
    "transfer_input_files",
    "transfer_output_files",
    "transfer_output_remaps",
    "request_cpus",
    "request_memory",
    "request_disk",
    "priority",
    "category",
    "requirements",
    "on_exit_hold",
    "on_exit_hold_reason",
    "on_exit_hold_subcode",
    "max_retries",
    "retry_until",
    "periodic_release",
    "periodic_remove",
    "accounting_group",
    "accounting_group_user",
}

# DAG-level commands accepted by an HTCJob's RestrictedDict of DAG commands.
HTC_VALID_JOB_DAG_KEYS = {
    "dir",
    "noop",
    "done",
    "vars",
    "pre",
    "post",
    "retry",
    "retry_unless_exit",
    "abort_dag_on",
    "abort_exit",
    "priority",
}

# Parsed HTCondor version, used below to select version-appropriate API calls.
HTC_VERSION = version.parse(htcondor.__version__)

240 

241 

class RestrictedDict(MutableMapping):
    """A dictionary that only allows certain keys.

    Parameters
    ----------
    valid_keys : `~collections.abc.Container`
        Strings that are valid keys.
    init_data : `dict` or `RestrictedDict`, optional
        Initial data.

    Raises
    ------
    KeyError
        If invalid key(s) in init_data.
    """

    def __init__(self, valid_keys, init_data=()):
        self.valid_keys = valid_keys
        self.data = {}
        # MutableMapping.update() funnels through __setitem__, so initial
        # data is validated against valid_keys as well.
        self.update(init_data)

    def __getitem__(self, key):
        """Return the value stored under ``key``.

        Parameters
        ----------
        key : `str`
            Identifier for value to return.

        Returns
        -------
        value : `~typing.Any`
            Value associated with given key.

        Raises
        ------
        KeyError
            If key doesn't exist.
        """
        return self.data[key]

    def __delitem__(self, key):
        """Remove the entry stored under ``key``.

        Parameters
        ----------
        key : `str`
            Identifier for value to delete.

        Raises
        ------
        KeyError
            If key doesn't exist.
        """
        del self.data[key]

    def __setitem__(self, key, value):
        """Store ``value`` under ``key``, rejecting keys not in valid_keys.

        Parameters
        ----------
        key : `str`
            Identifier to associate with given value.
        value : `~typing.Any`
            Value to store.

        Raises
        ------
        KeyError
            If key is invalid.
        """
        if key in self.valid_keys:
            self.data[key] = value
        else:
            raise KeyError(f"Invalid key {key}")

    def __iter__(self):
        return iter(self.data)

    def __len__(self):
        return len(self.data)

    def __str__(self):
        return str(self.data)

325 

326 

def htc_backup_files(
    wms_path: str | os.PathLike, subdir: str | os.PathLike | None = None, limit: int = 100
) -> Path | None:
    """Backup select HTCondor files in the submit directory.

    Files will be saved in separate subdirectories which will be created in
    the submit directory where the files are located. These subdirectories
    will be consecutive, zero-padded integers. Their values will correspond to
    the number of HTCondor rescue DAGs in the submit directory.

    Hence, with the default settings, copies after the initial failed run will
    be placed in '001' subdirectory, '002' after the first restart, and so on
    until the limit of backups is reached. If there's no rescue DAG yet, files
    will be copied to '000' subdirectory.

    Parameters
    ----------
    wms_path : `str` or `os.PathLike`
        Path to the submit directory either absolute or relative.
    subdir : `str` or `os.PathLike`, optional
        A path, relative to the submit directory, where all subdirectories
        with backup files will be kept. Defaults to None which means that the
        backup subdirectories will be placed directly in the submit directory.
    limit : `int`, optional
        Maximal number of backups. If the number of backups reaches the limit,
        the last backup files will be overwritten. The default value is 100
        to match the default value of HTCondor's DAGMAN_MAX_RESCUE_NUM in
        version 8.8+.

    Returns
    -------
    last_rescue_file : `pathlib.Path` or None
        Path to the latest rescue file or None if doesn't exist.

    Raises
    ------
    FileNotFoundError
        If the submit directory or the file that needs to be backed up does
        not exist.
    OSError
        If the submit directory cannot be accessed or backing up a file failed
        either due to permission or filesystem related issues.

    Notes
    -----
    This is not a generic function for making backups. It is intended to be
    used once, just before a restart, to make snapshots of files which will be
    overwritten by HTCondor during the next run.
    """
    # Zero-pad backup directory names to the width of the limit (e.g. '007').
    width = len(str(limit))

    path = Path(wms_path).resolve()
    if not path.is_dir():
        raise FileNotFoundError(f"Directory {path} not found")

    # Initialize the backup counter from the number of existing rescue DAGs,
    # capped at the limit so the last backup slot gets reused.
    rescue_dags = list(path.glob("*.rescue[0-9][0-9][0-9]"))
    counter = min(len(rescue_dags), limit)

    # Create the backup directory and move select files there.
    dest = path
    if subdir:
        # PurePath.is_relative_to() is not available before Python 3.9. Hence
        # we need to check if 'subdir' is in the submit directory in some
        # other way if it is an absolute path.
        subdir = Path(subdir)
        if subdir.is_absolute():
            subdir = subdir.resolve()  # Since resolve was run on path, must run it here
            if dest not in subdir.parents:
                _LOG.warning(
                    "Invalid backup location: '%s' not in the submit directory, will use '%s' instead.",
                    subdir,
                    wms_path,
                )
            else:
                dest /= subdir
        else:
            dest /= subdir
    dest /= f"{counter:0{width}}"
    _LOG.debug("dest = %s", dest)
    try:
        # Once at the limit, exist_ok=True allows overwriting the final slot.
        dest.mkdir(parents=True, exist_ok=False if counter < limit else True)
    except FileExistsError:
        _LOG.warning("Refusing to do backups: target directory '%s' already exists", dest)
    else:
        htc_backup_files_single_path(path, dest)

        # also back up any subdag info
        for subdag_dir in path.glob("subdags/*"):
            subdag_dest = dest / subdag_dir.relative_to(path)
            subdag_dest.mkdir(parents=True, exist_ok=False)
            htc_backup_files_single_path(subdag_dir, subdag_dest)

    last_rescue_file = rescue_dags[-1] if rescue_dags else None
    _LOG.debug("last_rescue_file = %s", last_rescue_file)
    return last_rescue_file

423 

424 

425def htc_backup_files_single_path(src: str | os.PathLike, dest: str | os.PathLike) -> None: 

426 """Move particular htc files to a different directory for later debugging. 

427 

428 Parameters 

429 ---------- 

430 src : `str` or `os.PathLike` 

431 Directory from which to backup particular files. 

432 dest : `str` or `os.PathLike` 

433 Directory to which particular files are moved. 

434 

435 Raises 

436 ------ 

437 RuntimeError 

438 If given dest directory matches given src directory. 

439 OSError 

440 If problems moving file. 

441 FileNotFoundError 

442 Item matching pattern in src directory isn't a file. 

443 """ 

444 src = Path(src) 

445 dest = Path(dest) 

446 if dest.samefile(src): 

447 raise RuntimeError(f"Destination directory is same as the source directory ({src})") 

448 

449 for patt in [ 

450 "*.info.*", 

451 "*.dag.metrics", 

452 "*.dag.nodes.log", 

453 "*.node_status", 

454 "wms_*.dag.post.out", 

455 "wms_*.status.txt", 

456 ]: 

457 for source in src.glob(patt): 

458 if source.is_file(): 

459 target = dest / source.relative_to(src) 

460 try: 

461 source.rename(target) 

462 except OSError as exc: 

463 raise type(exc)(f"Backing up '{source}' failed: {exc.strerror}") from None 

464 else: 

465 raise FileNotFoundError(f"Backing up '{source}' failed: not a file") 

466 

467 

def htc_escape(value):
    """Escape characters in given value based upon HTCondor syntax.

    Parameters
    ----------
    value : `~typing.Any`
        Value that needs to have characters escaped if string.

    Returns
    -------
    new_value : `~typing.Any`
        Given value with characters escaped appropriate for HTCondor if
        string.
    """
    # Non-strings pass through untouched.
    if not isinstance(value, str):
        return value
    # Double up quote characters, then restore any HTML-entity quotes.
    return value.replace('"', '""').replace("'", "''").replace("&quot;", '"')

487 

488 

def htc_write_attribs(stream, attrs):
    """Write job attributes in HTCondor format to writeable stream.

    Parameters
    ----------
    stream : `~typing.TextIO`
        Output text stream (typically an open file).
    attrs : `dict`
        HTCondor job attributes (dictionary of attribute key, value).
    """
    for name, raw_value in attrs.items():
        # Strings must be escaped and double-quoted to be valid ClassAd
        # expressions; other types are written verbatim.
        formatted = f'"{htc_escape(raw_value)}"' if isinstance(raw_value, str) else raw_value
        print(f"+{name} = {formatted}", file=stream)

507 

508 

def htc_write_condor_file(
    filename: str | os.PathLike, job_name: str, job: RestrictedDict, job_attrs: dict[str, Any]
) -> None:
    """Write an HTCondor submit file.

    Parameters
    ----------
    filename : `str` or `os.PathLike`
        Filename for the HTCondor submit file.
    job_name : `str`
        Job name to use in submit file.
    job : `RestrictedDict`
        Submit script information.
    job_attrs : `dict`
        Job attributes.
    """
    # Guard against a bare filename (empty dirname would make makedirs fail).
    dirname = os.path.dirname(filename)
    if dirname:
        os.makedirs(dirname, exist_ok=True)
    with open(filename, "w") as fh:
        for key, value in job.items():
            if value is not None:
                if key in HTC_QUOTE_KEYS:  # Assumes internal quotes are already escaped correctly
                    print(f'{key}="{value}"', file=fh)
                else:
                    print(f"{key}={value}", file=fh)

        # Provide default locations for output/error/log when not given.
        # Bug fix: the computed default was previously discarded and the
        # literal "(unknown)" written instead; also use a new local name to
        # avoid shadowing the 'filename' parameter.
        for key in ["output", "error", "log"]:
            if key not in job:
                default_file = f"{job_name}.$(Cluster).{'out' if key != 'log' else key}"
                print(f"{key}={default_file}", file=fh)

        if job_attrs is not None:
            htc_write_attribs(fh, job_attrs)
        print("queue", file=fh)

541 

542 

# To avoid doing the version check during every function call select
# appropriate conversion function at the import time.
#
# Make sure that *each* version specific variant of the conversion function(s)
# has the same signature after applying any changes!
if HTC_VERSION < version.parse("8.9.8"):

    def htc_tune_schedd_args(**kwargs):
        """Ensure that arguments for Schedd are version appropriate.

        The old arguments: 'requirements' and 'attr_list' of
        'Schedd.history()', 'Schedd.query()', and 'Schedd.xquery()' were
        deprecated in favor of 'constraint' and 'projection', respectively,
        starting from version 8.9.8. The function will convert "new" keyword
        arguments to "old" ones.

        Parameters
        ----------
        **kwargs
            Any keyword arguments that Schedd.history(), Schedd.query(), and
            Schedd.xquery() accepts.

        Returns
        -------
        kwargs : `dict` [`str`, `~typing.Any`]
            Keywords arguments that are guaranteed to work with the Python
            HTCondor API.

        Notes
        -----
        Function doesn't validate provided keyword arguments beyond converting
        selected arguments to their version specific form. For example,
        it won't remove keywords that are not supported by the methods
        mentioned earlier.
        """
        # Map new-style keyword names to their pre-8.9.8 equivalents.
        translation_table = {
            "constraint": "requirements",
            "projection": "attr_list",
        }
        for new, old in translation_table.items():
            try:
                kwargs[old] = kwargs.pop(new)
            except KeyError:
                pass
        return kwargs

else:

    def htc_tune_schedd_args(**kwargs):
        """Ensure that arguments for Schedd are version appropriate.

        This is the fallback function if no version specific alteration are
        necessary. Effectively, a no-op.

        Parameters
        ----------
        **kwargs
            Any keyword arguments that Schedd.history(), Schedd.query(), and
            Schedd.xquery() accepts.

        Returns
        -------
        kwargs : `dict` [`str`, `~typing.Any`]
            Keywords arguments that were passed to the function.
        """
        return kwargs

609 

610 

def htc_query_history(schedds, **kwargs):
    """Fetch history records from the condor_schedd daemon.

    Parameters
    ----------
    schedds : `dict` [`str`, `htcondor.Schedd`]
        HTCondor schedulers which to query for job information, keyed by
        scheduler name.
    **kwargs
        Any keyword arguments that Schedd.history() accepts.

    Yields
    ------
    schedd_name : `str`
        Name of the HTCondor scheduler managing the job queue.
    job_ad : `dict` [`str`, `~typing.Any`]
        A dictionary representing HTCondor ClassAd describing a job. It maps
        job attributes names to values of the ClassAd expressions they
        represent.
    """
    # Supply defaults for the arguments Schedd.history() treats positionally.
    kwargs.setdefault("constraint", None)
    kwargs.setdefault("projection", [])
    tuned_kwargs = htc_tune_schedd_args(**kwargs)
    for schedd_name, schedd in schedds.items():
        yield from ((schedd_name, dict(job_ad)) for job_ad in schedd.history(**tuned_kwargs))

637 

638 

def htc_query_present(schedds, **kwargs):
    """Query the condor_schedd daemon for job ads.

    Parameters
    ----------
    schedds : `dict` [`str`, `htcondor.Schedd`]
        HTCondor schedulers which to query for job information, keyed by
        scheduler name.
    **kwargs
        Any keyword arguments that Schedd.xquery() accepts.

    Yields
    ------
    schedd_name : `str`
        Name of the HTCondor scheduler managing the job queue.
    job_ad : `dict` [`str`, `~typing.Any`]
        A dictionary representing HTCondor ClassAd describing a job. It maps
        job attributes names to values of the ClassAd expressions they
        represent.
    """
    tuned_kwargs = htc_tune_schedd_args(**kwargs)
    for schedd_name, schedd in schedds.items():
        yield from ((schedd_name, dict(job_ad)) for job_ad in schedd.query(**tuned_kwargs))

662 

663 

def htc_version():
    """Return the version given by the HTCondor API.

    Returns
    -------
    version : `str`
        HTCondor version as easily comparable string.
    """
    # HTC_VERSION is parsed once at import time; render it back to a string.
    return f"{HTC_VERSION}"

673 

674 

def htc_submit_dag(sub):
    """Submit job for execution.

    Parameters
    ----------
    sub : `htcondor.Submit`
        An object representing a job submit description.

    Returns
    -------
    schedd_job_info : `dict` [`str`, `dict` [`str`, \
            `dict` [`str`, `~typing.Any`]]]
        Information about jobs satisfying the search criteria where for each
        Scheduler, local HTCondor job ids are mapped to their respective
        classads.
    """
    # Locate the local schedd via the collector and submit to it.
    coll = htcondor.Collector()
    schedd_ad = coll.locate(htcondor.DaemonTypes.Schedd)
    schedd = htcondor.Schedd(schedd_ad)

    # If Schedd.submit() fails, the method will raise an exception. Usually,
    # that implies issues with the HTCondor pool which BPS can't address.
    # Hence, no effort is made to handle the exception.
    submit_result = schedd.submit(sub)

    # Sadly, the ClassAd from Schedd.submit() (see above) does not have
    # 'GlobalJobId' so we need to run a regular query to get it anyway.
    schedd_name = schedd_ad["Name"]
    schedd_dag_info = condor_q(
        constraint=f"ClusterId == {submit_result.cluster()}", schedds={schedd_name: schedd}
    )
    return schedd_dag_info

707 

708 

def htc_create_submit_from_dag(dag_filename: str, submit_options: dict[str, Any]) -> htcondor.Submit:
    """Create a DAGMan job submit description.

    Parameters
    ----------
    dag_filename : `str`
        Name of file containing HTCondor DAG commands.
    submit_options : `dict` [`str`, `~typing.Any`], optional
        Contains extra options for command line (Value of None means flag).

    Returns
    -------
    sub : `htcondor.Submit`
        An object representing a job submit description.

    Notes
    -----
    Use with HTCondor versions which support htcondor.Submit.from_dag(),
    i.e., 8.9.3 or newer.
    """
    # Config and environment variables do not seem to override -MaxIdle
    # on the .dag.condor.sub's command line (broken in some 24.0.x versions).
    # Explicitly forward them as a submit_option if either exists.
    # Note: auto generated subdag submit files still get the -MaxIdle=1000
    # in the broken versions.
    if "MaxIdle" not in submit_options:
        config_var_name = "DAGMAN_MAX_JOBS_IDLE"
        env_var_name = f"_CONDOR_{config_var_name}"
        max_jobs_idle: int | None
        if env_var_name in os.environ:
            # Environment takes precedence over the config knob.
            max_jobs_idle = int(os.environ[env_var_name])
        elif config_var_name in htcondor.param:
            max_jobs_idle = htcondor.param[config_var_name]
        else:
            max_jobs_idle = None
        if max_jobs_idle:
            submit_options["MaxIdle"] = max_jobs_idle

    return htcondor.Submit.from_dag(dag_filename, submit_options)

745 

746 

def htc_create_submit_from_cmd(dag_filename, submit_options=None):
    """Create a DAGMan job submit description.

    Create a DAGMan job submit description by calling ``condor_submit_dag``
    on given DAG description file.

    Parameters
    ----------
    dag_filename : `str`
        Name of file containing HTCondor DAG commands.
    submit_options : `dict` [`str`, `~typing.Any`], optional
        Contains extra options for command line (Value of None means flag).

    Returns
    -------
    sub : `htcondor.Submit`
        An object representing a job submit description.

    Raises
    ------
    RuntimeError
        If condor_submit_dag exits with a non-zero status.

    Notes
    -----
    Use with HTCondor versions which do not support
    htcondor.Submit.from_dag(), i.e., older than 8.9.3.
    """
    # Run command line condor_submit_dag command.
    cmd = "condor_submit_dag -f -no_submit -notification never -autorescue 1 -UseDagDir -no_recurse "

    if submit_options is not None:
        for opt, val in submit_options.items():
            cmd += f" -{opt} {val or ''}"
    # Bug fix: a leading space is required here; without it the DAG filename
    # was fused with the last option value into a single argv token whenever
    # submit_options was non-empty (cmd is tokenized with split() below).
    cmd += f" {dag_filename}"

    process = subprocess.Popen(
        cmd.split(), shell=False, stdout=subprocess.PIPE, stderr=subprocess.STDOUT, encoding="utf-8"
    )
    process.wait()

    if process.returncode != 0:
        print(f"Exit code: {process.returncode}")
        print(process.communicate()[0])
        raise RuntimeError("Problems running condor_submit_dag")

    # Parse the submit file condor_submit_dag just generated.
    return htc_create_submit_from_file(f"{dag_filename}.condor.sub")

789 

790 

def htc_create_submit_from_file(submit_file):
    """Parse a submission file.

    Parameters
    ----------
    submit_file : `str`
        Name of the HTCondor submit file.

    Returns
    -------
    sub : `htcondor.Submit`
        An object representing a job submit description.
    """
    descriptors = {}
    with open(submit_file) as fh:
        for raw_line in fh:
            stripped = raw_line.strip()
            # Skip comments and the terminating 'queue' statement.
            if stripped.startswith("#") or stripped == "queue":
                continue
            key, val = re.split(r"\s*=\s*", stripped, maxsplit=1)
            descriptors[key] = val

    # Avoid UserWarning: the line 'copy_to_spool = False' was
    # unused by Submit object. Is it a typo?
    descriptors.pop("copy_to_spool", None)

    return htcondor.Submit(descriptors)

820 

821 

def _htc_write_job_commands(stream, name, commands, node_type="JOB"):
    """Output the DAGMan job lines for single job in DAG.

    Parameters
    ----------
    stream : `~typing.TextIO`
        Writeable text stream (typically an opened file).
    name : `str`
        Job name.
    commands : `RestrictedDict`
        DAG commands for a job.
    node_type : `str`, optional
        Type of DAGMan node (JOB, FINAL, SERVICE). Defaults to "JOB".
    """
    # Note: optional pieces of commands include a space at the beginning.
    # also making sure values aren't empty strings as placeholders.

    # SCRIPT [DEFER status time] [DEBUG file type] PRE <name> <exe> [args]
    if "pre" in commands and commands["pre"]:
        defer = ""
        if "defer" in commands["pre"] and commands["pre"]["defer"]:
            defer = f" DEFER {commands['pre']['defer']['status']} {commands['pre']['defer']['time']}"

        debug = ""
        if "debug" in commands["pre"] and commands["pre"]["debug"]:
            debug = f" DEBUG {commands['pre']['debug']['filename']} {commands['pre']['debug']['type']}"

        arguments = ""
        if "arguments" in commands["pre"] and commands["pre"]["arguments"]:
            arguments = f" {commands['pre']['arguments']}"

        executable = commands["pre"]["executable"]
        print(f"SCRIPT{defer}{debug} PRE {name} {executable}{arguments}", file=stream)

    # SCRIPT [DEFER status time] [DEBUG file type] POST <name> <exe> [args]
    if "post" in commands and commands["post"]:
        defer = ""
        if "defer" in commands["post"] and commands["post"]["defer"]:
            defer = f" DEFER {commands['post']['defer']['status']} {commands['post']['defer']['time']}"

        debug = ""
        if "debug" in commands["post"] and commands["post"]["debug"]:
            debug = f" DEBUG {commands['post']['debug']['filename']} {commands['post']['debug']['type']}"

        arguments = ""
        if "arguments" in commands["post"] and commands["post"]["arguments"]:
            arguments = f" {commands['post']['arguments']}"

        executable = commands["post"]["executable"]
        print(f"SCRIPT{defer}{debug} POST {name} {executable}{arguments}", file=stream)

    # VARS <name> key="value" (values escaped for HTCondor syntax)
    if "vars" in commands and commands["vars"]:
        for key, value in commands["vars"].items():
            print(f'VARS {name} {key}="{htc_escape(value)}"', file=stream)

    if "pre_skip" in commands and commands["pre_skip"]:
        print(f"PRE_SKIP {name} {commands['pre_skip']}", file=stream)

    # FINAL node cannot have a DAGMan retry, abort-dag-on, priority, category
    if node_type != "FINAL":
        if "retry" in commands and commands["retry"]:
            print(f"RETRY {name} {commands['retry']}", end="", file=stream)
            if "retry_unless_exit" in commands and commands["retry_unless_exit"]:
                print(f" UNLESS-EXIT {commands['retry_unless_exit']}", end="", file=stream)
            print("", file=stream)  # Since previous prints don't include new line

        if "abort_dag_on" in commands and commands["abort_dag_on"]:
            print(
                f"ABORT-DAG-ON {name} {commands['abort_dag_on']['node_exit']}"
                f" RETURN {commands['abort_dag_on']['abort_exit']}",
                file=stream,
            )

        if "priority" in commands and commands["priority"]:
            print(
                f"PRIORITY {name} {commands['priority']}",
                file=stream,
            )

897 

898 

class HTCJob:
    """HTCondor job for use in building DAG.

    Parameters
    ----------
    name : `str`
        Name of the job.
    label : `str`
        Label that can used for grouping or lookup.
    initcmds : `RestrictedDict`
        Initial job commands for submit file.
    initdagcmds : `RestrictedDict`
        Initial commands for job inside DAG.
    initattrs : `dict`
        Initial dictionary of job attributes.
    """

    def __init__(self, name, label=None, initcmds=(), initdagcmds=(), initattrs=None):
        self.name = name
        self.label = label
        # Submit-file commands, restricted to keys HTCondor understands.
        self.cmds = RestrictedDict(HTC_VALID_JOB_KEYS, initcmds)
        # DAG-file commands for this job, restricted to valid DAG keys.
        self.dagcmds = RestrictedDict(HTC_VALID_JOB_DAG_KEYS, initdagcmds)
        self.attrs = initattrs
        self.subfile = None  # Submit file name; defaulted lazily in write_submit_file().
        self.subdir = None  # Optional directory prefix for the submit file.
        self.subdag = None  # Placeholder for an associated sub-DAG; not set by this class.

    def __str__(self):
        return self.name

    def add_job_cmds(self, new_commands):
        """Add commands to Job (overwrite existing).

        Parameters
        ----------
        new_commands : `dict`
            Submit file commands to be added to Job.
        """
        self.cmds.update(new_commands)

    def add_dag_cmds(self, new_commands):
        """Add DAG commands to Job (overwrite existing).

        Parameters
        ----------
        new_commands : `dict`
            DAG file commands to be added to Job.
        """
        self.dagcmds.update(new_commands)

    def add_job_attrs(self, new_attrs):
        """Add attributes to Job (overwrite existing).

        Parameters
        ----------
        new_attrs : `dict`
            Attributes to be added to Job.
        """
        if self.attrs is None:
            self.attrs = {}
        if new_attrs:
            self.attrs.update(new_attrs)

    def write_submit_file(self, submit_path: str | os.PathLike) -> None:
        """Write job description to submit file.

        Parameters
        ----------
        submit_path : `str` or `os.PathLike`
            Prefix path for the submit file.
        """
        if not self.subfile:
            self.subfile = f"{self.name}.sub"

        subfile = self.subfile
        if self.subdir:
            subfile = Path(self.subdir) / subfile

        subfile = Path(os.path.expandvars(subfile))
        if not subfile.is_absolute():
            subfile = Path(submit_path) / subfile
        # Do not clobber a submit file that already exists on disk.
        if not subfile.exists():
            htc_write_condor_file(subfile, self.name, self.cmds, self.attrs)

    def write_dag_commands(self, stream, dag_rel_path, command_name="JOB"):
        """Write DAG commands for single job to output stream.

        Parameters
        ----------
        stream : `~typing.TextIO`
            Output Stream.
        dag_rel_path : `str`
            Relative path of dag to submit directory.
        command_name : `str`
            Name of the DAG command (e.g., JOB, FINAL).
        """
        subfile = os.path.expandvars(self.subfile)

        # JOB NodeName SubmitDescription [DIR directory] [NOOP] [DONE]
        job_line = f'{command_name} {self.name} "{subfile}"'
        if "dir" in self.dagcmds:
            dir_val = self.dagcmds["dir"]
            if dag_rel_path:
                dir_val = os.path.join(dag_rel_path, dir_val)
            job_line += f' DIR "{dir_val}"'
        if self.dagcmds.get("noop", False):
            job_line += " NOOP"

        print(job_line, file=stream)
        if self.dagcmds:
            _htc_write_job_commands(stream, self.name, self.dagcmds, command_name)

    def dump(self, fh):
        """Dump job information to output stream.

        Parameters
        ----------
        fh : `~typing.TextIO`
            Output stream.
        """
        printer = pprint.PrettyPrinter(indent=4, stream=fh)
        printer.pprint(self.name)
        printer.pprint(self.cmds)
        printer.pprint(self.attrs)

1024 

class HTCDag(networkx.DiGraph):
    """HTCondor DAG.

    Parameters
    ----------
    data : `~typing.Any`
        Initial graph data of any format that is supported
        by the to_network_graph() function.
    name : `str`
        Name for DAG.
    """

    def __init__(self, data=None, name=""):
        super().__init__(data=data, name=name)

        # Per-graph bookkeeping filled in as the DAG is built/written.
        self.graph["attr"] = {}
        self.graph["run_id"] = None
        self.graph["submit_path"] = None
        self.graph["final_job"] = None
        self.graph["service_job"] = None
        self.graph["submit_options"] = {}

    def __str__(self):
        """Represent basic DAG info as string.

        Returns
        -------
        info : `str`
            String containing basic DAG info (name and node count).
        """
        return f"{self.graph['name']} {len(self)}"

    def add_attribs(self, attribs=None):
        """Add attributes to the DAG.

        Parameters
        ----------
        attribs : `dict`, optional
            DAG attributes.
        """
        if attribs is not None:
            self.graph["attr"].update(attribs)

    def add_job(self, job, parent_names=None, child_names=None):
        """Add an HTCJob to the HTCDag.

        Parameters
        ----------
        job : `HTCJob`
            HTCJob to add to the HTCDag.
        parent_names : `~collections.abc.Iterable` [`str`], optional
            Names of parent jobs.
        child_names : `~collections.abc.Iterable` [`str`], optional
            Names of child jobs.
        """
        assert isinstance(job, HTCJob)
        _LOG.debug("Adding job %s to dag", job.name)

        # Add dag level attributes to each job
        job.add_job_attrs(self.graph["attr"])

        self.add_node(job.name, data=job)

        if parent_names is not None:
            self.add_job_relationships(parent_names, [job.name])

        if child_names is not None:
            # Bug fix: the new job is the *parent* of the given children.
            # The arguments were previously swapped, inverting the edges.
            self.add_job_relationships([job.name], child_names)

    def add_job_relationships(self, parents, children):
        """Add DAG edge between parents and children jobs.

        Parameters
        ----------
        parents : `list` [`str`]
            Contains parent job name(s).
        children : `list` [`str`]
            Contains children job name(s).
        """
        self.add_edges_from(itertools.product(parents, children))

    def add_final_job(self, job):
        """Add an HTCJob for the FINAL job in HTCDag.

        Parameters
        ----------
        job : `HTCJob`
            HTCJob to add to the HTCDag as a FINAL job.
        """
        # Add dag level attributes to each job
        job.add_job_attrs(self.graph["attr"])

        self.graph["final_job"] = job

    def add_service_job(self, job):
        """Add an HTCJob for the SERVICE job in HTCDag.

        Parameters
        ----------
        job : `HTCJob`
            HTCJob to add to the HTCDag as a SERVICE job.
        """
        # Add dag level attributes to each job
        job.add_job_attrs(self.graph["attr"])

        self.graph["service_job"] = job

    def del_job(self, job_name):
        """Delete the job from the DAG.

        Parameters
        ----------
        job_name : `str`
            Name of job in DAG to delete.
        """
        # Reconnect edges around node to delete
        parents = self.predecessors(job_name)
        children = self.successors(job_name)
        self.add_edges_from(itertools.product(parents, children))

        # Delete job node (which deletes its edges).
        self.remove_node(job_name)

    def write(self, submit_path, job_subdir="", dag_subdir="", dag_rel_path=""):
        """Write DAG to a file.

        Parameters
        ----------
        submit_path : `str`
            Prefix path for all outputs.
        job_subdir : `str`, optional
            Template for job subdir (submit_path + job_subdir).
        dag_subdir : `str`, optional
            DAG subdir (submit_path + dag_subdir).
        dag_rel_path : `str`, optional
            Prefix to job_subdir for jobs inside subdag.
        """
        self.graph["submit_path"] = submit_path
        self.graph["dag_filename"] = os.path.join(dag_subdir, f"{self.graph['name']}.dag")
        full_filename = os.path.join(submit_path, self.graph["dag_filename"])
        os.makedirs(os.path.dirname(full_filename), exist_ok=True)

        try:
            dagman_config_path = Path(self.graph["attr"]["bps_wms_config_path"])
        except KeyError:
            dagman_config_path = None
        with open(full_filename, "w") as fh:
            if dagman_config_path is not None:
                # str / Path uses Path.__rtruediv__, producing a joined path.
                fh.write(f"CONFIG {dag_rel_path / dagman_config_path}\n")

            for name, nodeval in self.nodes().items():
                try:
                    job = nodeval["data"]
                except KeyError:
                    _LOG.error("Job %s doesn't have data (keys: %s).", name, nodeval.keys())
                    raise
                if job.subdag:
                    # Recursively write the subdag in its own subdirectory.
                    dag_subdir = f"subdags/{job.name}"
                    if "dir" in job.dagcmds:
                        subdir = job.dagcmds["dir"]
                    else:
                        subdir = job_subdir
                    if dagman_config_path is not None:
                        job.subdag.add_attribs({"bps_wms_config_path": str(dagman_config_path)})
                    job.subdag.write(submit_path, subdir, dag_subdir, "../..")
                    fh.write(
                        f"SUBDAG EXTERNAL {job.name} {Path(job.subdag.graph['dag_filename']).name} "
                        f"DIR {dag_subdir}\n"
                    )
                    if job.dagcmds:
                        _htc_write_job_commands(fh, job.name, job.dagcmds)
                else:
                    job.write_submit_file(submit_path)
                    job.write_dag_commands(fh, dag_rel_path)

            for edge in self.edges():
                print(f"PARENT {edge[0]} CHILD {edge[1]}", file=fh)
            print(f"DOT {self.name}.dot", file=fh)
            print(f"NODE_STATUS_FILE {self.name}.node_status", file=fh)

            # Add bps attributes to dag submission
            for key, value in self.graph["attr"].items():
                print(f'SET_JOB_ATTR {key}= "{htc_escape(value)}"', file=fh)

            # Add special nodes if any.
            special_jobs = {
                "FINAL": self.graph["final_job"],
                "SERVICE": self.graph["service_job"],
            }
            for dagcmd, job in special_jobs.items():
                if job is not None:
                    job.write_submit_file(submit_path)
                    job.write_dag_commands(fh, dag_rel_path, dagcmd)

    def dump(self, fh):
        """Dump DAG info to output stream.

        Parameters
        ----------
        fh : `typing.IO`
            Where to dump DAG info as text.
        """
        # Bug fix: iterating a dict directly yields only keys; use items()
        # to get (key, value) pairs.
        for key, value in self.graph.items():
            print(f"{key}={value}", file=fh)
        for name, data in self.nodes().items():
            print(f"{name}:", file=fh)
            # Bug fix: the node attribute dict has no dump(); the HTCJob is
            # stored under its "data" key.
            data["data"].dump(fh)
        for edge in self.edges():
            print(f"PARENT {edge[0]} CHILD {edge[1]}", file=fh)
        if self.graph["final_job"]:
            print(f"FINAL {self.graph['final_job'].name}:", file=fh)
            self.graph["final_job"].dump(fh)

    def write_dot(self, filename):
        """Write a dot version of the DAG.

        Parameters
        ----------
        filename : `str`
            Name of the dot file.
        """
        pos = networkx.nx_agraph.graphviz_layout(self)
        networkx.draw(self, pos=pos)
        networkx.drawing.nx_pydot.write_dot(self, filename)

1249 

1250 

def condor_q(constraint=None, schedds=None, **kwargs):
    """Get information about the jobs in the HTCondor job queue(s).

    Parameters
    ----------
    constraint : `str`, optional
        Constraints to be passed to job query.
    schedds : `dict` [`str`, `htcondor.Schedd`], optional
        HTCondor schedulers which to query for job information. If None
        (default), the query will be run against local scheduler only.
    **kwargs : `~typing.Any`
        Additional keyword arguments that need to be passed to the internal
        query method.

    Returns
    -------
    job_info : `dict` [`str`, `dict` [`str`, `dict` [`str`, `~typing.Any`]]]
        Information about jobs satisfying the search criteria where for each
        Scheduler, local HTCondor job ids are mapped to their respective
        classads.
    """
    # Thin wrapper: a queue query is just a condor_query with the
    # "present jobs" query function.
    return condor_query(constraint=constraint, schedds=schedds, query_func=htc_query_present, **kwargs)

1273 

1274 

def condor_history(constraint=None, schedds=None, **kwargs):
    """Get information about the jobs from HTCondor history records.

    Parameters
    ----------
    constraint : `str`, optional
        Constraints to be passed to job query.
    schedds : `dict` [`str`, `htcondor.Schedd`], optional
        HTCondor schedulers which to query for job information. If None
        (default), the query will be run against the history file of
        the local scheduler only.
    **kwargs : `~typing.Any`
        Additional keyword arguments that need to be passed to the internal
        query method.

    Returns
    -------
    job_info : `dict` [`str`, `dict` [`str`, `dict` [`str`, `~typing.Any`]]]
        Information about jobs satisfying the search criteria where for each
        Scheduler, local HTCondor job ids are mapped to their respective
        classads.
    """
    # Thin wrapper: a history query is just a condor_query with the
    # "history" query function.
    return condor_query(constraint=constraint, schedds=schedds, query_func=htc_query_history, **kwargs)

1298 

1299 

def condor_query(constraint=None, schedds=None, query_func=htc_query_present, **kwargs):
    """Get information about HTCondor jobs.

    Parameters
    ----------
    constraint : `str`, optional
        Constraints to be passed to job query.
    schedds : `dict` [`str`, `htcondor.Schedd`], optional
        HTCondor schedulers which to query for job information. If None
        (default), the query will be run against the history file of
        the local scheduler only.
    query_func : `~collections.abc.Callable`
        An query function which takes following arguments:

        - ``schedds``: Schedulers to query (`list` [`htcondor.Schedd`]).
        - ``**kwargs``: Keyword arguments that will be passed to the query
          function.
    **kwargs : `~typing.Any`
        Additional keyword arguments that need to be passed to the query
        method.

    Returns
    -------
    job_info : `dict` [`str`, `dict` [`str`, `dict` [`str`, `~typing.Any`]]]
        Information about jobs satisfying the search criteria where for each
        Scheduler, local HTCondor job ids are mapped to their respective
        classads.
    """
    # Default to the local scheduler when none were given.
    if not schedds:
        coll = htcondor.Collector()
        schedd_ad = coll.locate(htcondor.DaemonTypes.Schedd)
        schedds = {schedd_ad["Name"]: htcondor.Schedd(schedd_ad)}

    # 'ClusterId' and 'ProcId' are needed to construct the job id, so
    # temporarily extend any user-supplied projection with them.
    extra_attrs = set()
    projection = kwargs.get("projection")
    if projection:
        extra_attrs = {"ClusterId", "ProcId"} - set(projection)
        projection.extend(extra_attrs)

    # Strip bulky environment attributes plus anything we only added
    # to satisfy the id construction above.
    drop_attrs = {"Env", "Environment"} | extra_attrs
    results = defaultdict(dict)
    for schedd_name, job_ad in query_func(schedds, constraint=constraint, **kwargs):
        job_id = f"{job_ad['ClusterId']}.{job_ad['ProcId']}"
        for attr in set(job_ad) & drop_attrs:
            del job_ad[attr]
        results[schedd_name][job_id] = job_ad
    _LOG.debug("query returned %d jobs", sum(len(val) for val in results.values()))

    # Restore the caller's projection list to its original contents.
    for attr in extra_attrs:
        projection.remove(attr)

    # When returning the results filter out entries for schedulers with no
    # jobs matching the search criteria.
    return {key: val for key, val in results.items() if val}

1361 

1362 

def condor_search(constraint=None, hist=None, schedds=None):
    """Search for running and finished jobs satisfying given criteria.

    Parameters
    ----------
    constraint : `str`, optional
        Constraints to be passed to job query.
    hist : `float`, optional
        Limit history search to this many days.  If None, history is
        not searched at all.
    schedds : `dict` [`str`, `htcondor.Schedd`], optional
        The list of the HTCondor schedulers which to query for job information.
        If None (default), only the local scheduler will be queried.

    Returns
    -------
    job_info : `dict` [`str`, `dict` [`str`, `dict` [`str` `~typing.Any`]]]
        Information about jobs satisfying the search criteria where for each
        Scheduler, local HTCondor job ids are mapped to their respective
        classads.
    """
    if not schedds:
        coll = htcondor.Collector()
        schedd_ad = coll.locate(htcondor.DaemonTypes.Schedd)
        schedds = {schedd_ad["Name"]: htcondor.Schedd(locate_ad=schedd_ad)}

    job_info = condor_q(constraint=constraint, schedds=schedds)
    if hist is not None:
        _LOG.debug("Searching history going back %s days", hist)
        epoch = (datetime.now() - timedelta(days=hist)).timestamp()
        hist_constraint = f"(CompletionDate >= {epoch} || JobFinishedHookDone >= {epoch})"
        # Bug fix: guard against constraint being None; the old code did
        # `constraint += " && ..."`, which raised TypeError when no
        # constraint was given.
        constraint = f"{constraint} && {hist_constraint}" if constraint else hist_constraint
        hist_info = condor_history(constraint, schedds=schedds)
        update_job_info(job_info, hist_info)
    return job_info

1396 

1397 

def condor_status(constraint=None, coll=None):
    """Get information about HTCondor pool.

    Parameters
    ----------
    constraint : `str`, optional
        Constraints to be passed to the query.
    coll : `htcondor.Collector`, optional
        Object representing HTCondor collector daemon.

    Returns
    -------
    pool_info : `dict` [`str`, `dict` [`str`, `~typing.Any`]]
        Mapping between HTCondor slot names and slot information (classAds).

    Raises
    ------
    RuntimeError
        If the Collector query fails.
    """
    collector = coll if coll is not None else htcondor.Collector()
    try:
        pool_ads = collector.query(constraint=constraint)
    except OSError as ex:
        raise RuntimeError(f"Problem querying the Collector. (Constraint='{constraint}')") from ex

    pool_info = {slot["name"]: dict(slot) for slot in pool_ads}
    _LOG.debug("condor_status returned %d ads", len(pool_info))
    return pool_info

1425 

1426 

def update_job_info(job_info, other_info):
    """Update results of a job query with results from another query.

    Parameters
    ----------
    job_info : `dict` [`str`, `dict` [`str`, `~typing.Any`]]
        Results of the job query that needs to be updated (modified in
        place).
    other_info : `dict` [`str`, `dict` [`str`, `~typing.Any`]]
        Results of the other job query.

    Returns
    -------
    job_info : `dict` [`str`, `dict` [`str`, `~typing.Any`]]
        The updated results (same object that was passed in).
    """
    for schedd_name, others in other_info.items():
        if schedd_name not in job_info:
            # No entry for this scheduler yet; adopt the other results.
            job_info[schedd_name] = others
        else:
            # Merge per-job classads, creating entries as needed.
            jobs = job_info[schedd_name]
            for id_, ad in others.items():
                jobs.setdefault(id_, {}).update(ad)
    return job_info

1451 

1452 

1453def count_jobs_in_single_dag( 

1454 filename: str | os.PathLike, 

1455) -> tuple[Counter[str], dict[str, str], dict[str, WmsNodeType]]: 

1456 """Build bps_run_summary string from dag file. 

1457 

1458 Parameters 

1459 ---------- 

1460 filename : `str` 

1461 Path that includes dag file for a run. 

1462 

1463 Returns 

1464 ------- 

1465 counts : `Counter` [`str`] 

1466 Semi-colon separated list of job labels and counts. 

1467 (Same format as saved in dag classad). 

1468 job_name_to_label : `dict` [`str`, `str`] 

1469 Mapping of job names to job labels. 

1470 job_name_to_type : `dict` [`str`, `lsst.ctrl.bps.htcondor.WmsNodeType`] 

1471 Mapping of job names to job types 

1472 (e.g., payload, final, service). 

1473 """ 

1474 # Later code depends upon insertion order 

1475 counts: Counter = Counter() # counts of payload jobs per label 

1476 job_name_to_label: dict[str, str] = {} 

1477 job_name_to_type: dict[str, WmsNodeType] = {} 

1478 with open(filename) as fh: 

1479 for line in fh: 

1480 # Skip any line that contains commands irrelevant to job counting. 

1481 if not line.startswith( 

1482 ( 

1483 "JOB", 

1484 "FINAL", 

1485 "SERVICE", 

1486 "SUBDAG EXTERNAL", 

1487 ) 

1488 ): 

1489 continue 

1490 

1491 m = re.match( 

1492 r"(?P<command>JOB|FINAL|SERVICE|SUBDAG EXTERNAL)\s+" 

1493 r'(?P<jobname>(?P<wms>wms_)?\S+)\s+"?(?P<subfile>\S+)"?\s*' 

1494 r'(DIR "?(?P<dir>[^\s"]+)"?)?\s*(?P<noop>NOOP)?', 

1495 line, 

1496 ) 

1497 if m: 

1498 job_name = m.group("jobname") 

1499 name_parts = job_name.split("_") 

1500 

1501 label = "" 

1502 if m.group("dir"): 

1503 dir_match = re.search(r"jobs/([^\s/]+)", m.group("dir")) 

1504 if dir_match: 

1505 label = dir_match.group(1) 

1506 else: 

1507 _LOG.debug("Parse DAG: unparsed dir = %s", line) 

1508 elif m.group("subfile"): 

1509 subfile_match = re.search(r"jobs/([^\s/]+)", m.group("subfile")) 

1510 if subfile_match: 

1511 label = m.group("subfile").split("/")[1] 

1512 else: 

1513 label = pegasus_name_to_label(job_name) 

1514 

1515 match m.group("command"): 

1516 case "JOB": 

1517 if m.group("noop"): 

1518 job_type = WmsNodeType.NOOP 

1519 # wms_noop_label 

1520 label = name_parts[2] 

1521 elif m.group("wms"): 

1522 if name_parts[1] == "check": 

1523 job_type = WmsNodeType.SUBDAG_CHECK 

1524 # wms_check_status_wms_group_label 

1525 label = name_parts[5] 

1526 else: 

1527 _LOG.warning( 

1528 "Unexpected skipping of dag line due to unknown wms job: %s", line 

1529 ) 

1530 else: 

1531 job_type = WmsNodeType.PAYLOAD 

1532 if label == "init": 

1533 label = "pipetaskInit" 

1534 counts[label] += 1 

1535 case "FINAL": 

1536 job_type = WmsNodeType.FINAL 

1537 counts[label] += 1 # final counts a payload job. 

1538 case "SERVICE": 

1539 job_type = WmsNodeType.SERVICE 

1540 case "SUBDAG EXTERNAL": 

1541 job_type = WmsNodeType.SUBDAG 

1542 label = name_parts[2] 

1543 

1544 job_name_to_label[job_name] = label 

1545 job_name_to_type[job_name] = job_type 

1546 else: 

1547 # The line should, but didn't match the pattern above. Probably 

1548 # problems with regex. 

1549 _LOG.warning("Unexpected skipping of dag line: %s", line) 

1550 

1551 return counts, job_name_to_label, job_name_to_type 

1552 

1553 

def summarize_dag(dir_name: str) -> tuple[str, dict[str, str], dict[str, WmsNodeType]]:
    """Build bps_run_summary string from dag file.

    Parameters
    ----------
    dir_name : `str`
        Path that includes dag file for a run.

    Returns
    -------
    summary : `str`
        Semi-colon separated list of job labels and counts
        (Same format as saved in dag classad).
    job_name_to_label : `dict` [`str`, `str`]
        Mapping of job names to job labels.
    job_name_to_type : `dict` [`str`, `lsst.ctrl.bps.htcondor.WmsNodeType`]
        Mapping of job names to job types
        (e.g., payload, final, service).
    """
    # Later code depends upon insertion order
    counts: Counter[str] = Counter()  # counts of payload jobs per label
    job_name_to_label: dict[str, str] = {}
    job_name_to_type: dict[str, WmsNodeType] = {}

    path = Path(dir_name)
    # Main dag file(s) first, then any subdag files.
    for filename in itertools.chain(path.glob("*.dag"), path.glob("subdags/*/*.dag")):
        one_counts, one_labels, one_types = count_jobs_in_single_dag(filename)
        counts += one_counts
        _update_dicts(job_name_to_label, one_labels)
        _update_dicts(job_name_to_type, one_types)

    summary = ";".join(f"{name}:{count}" for name, count in counts.items())
    _LOG.debug("summarize_dag: %s %s %s", summary, job_name_to_label, job_name_to_type)
    return summary, job_name_to_label, job_name_to_type

1592 

1593 

def pegasus_name_to_label(name):
    """Convert pegasus job name to a label for the report.

    Parameters
    ----------
    name : `str`
        Name of job.

    Returns
    -------
    label : `str`
        Label for job ("UNK" when the name matches no known pattern).
    """
    # Pegasus bookkeeping jobs all share one label.
    if name.startswith(("create_dir", "stage_in", "stage_out")):
        return "pegasus"

    # Payload job names look like "pipetask_<n>_<label>..." with an
    # optional numeric component.
    match = re.match(r"pipetask_(\d+_)?([^_]+)", name)
    if not match:
        return "UNK"
    label = match.group(2)
    return "pipetaskInit" if label == "init" else label

1618 

1619 

def read_single_dag_status(filename: str | os.PathLike) -> dict[str, Any]:
    """Read the node status file for DAG summary information.

    Parameters
    ----------
    filename : `str` or `os.PathLike`
        Node status filename.

    Returns
    -------
    dag_ad : `dict` [`str`, `~typing.Any`]
        DAG summary information (empty if the file could not be read).
    """
    dag_ad: dict[str, Any] = {}

    # While this is probably more up to date than dag classad, only read from
    # file if need to.
    try:
        node_stat_file = Path(filename)
        _LOG.debug("Reading Node Status File %s", node_stat_file)
        with open(node_stat_file) as infh:
            dag_ad = dict(classad.parseNext(infh))  # pylint: disable=E1101

        if not dag_ad:
            # No classad found; fall back to Pegasus metrics files.
            metrics_file = node_stat_file.with_suffix(".dag.metrics")
            if metrics_file.exists():
                metrics = json.loads(metrics_file.read_text())
                dag_ad["NodesTotal"] = metrics.get("jobs", 0)
                dag_ad["NodesFailed"] = metrics.get("jobs_failed", 0)
                dag_ad["NodesDone"] = metrics.get("jobs_succeeded", 0)
                metrics = json.loads(node_stat_file.with_suffix(".metrics").read_text())
                dag_ad["NodesTotal"] = metrics["wf_metrics"]["total_jobs"]
    except (OSError, PermissionError):
        pass

    _LOG.debug("read_dag_status: %s", dag_ad)
    return dag_ad

1661 

1662 

def read_dag_status(wms_path: str | os.PathLike) -> dict[str, Any]:
    """Read the node status file(s) for DAG summary information.

    Parameters
    ----------
    wms_path : `str` or `os.PathLike`
        Path that includes node status file for a run.

    Returns
    -------
    dag_ad : `dict` [`str`, `~typing.Any`]
        DAG summary information, counts summed across any subdags.

    Raises
    ------
    FileNotFoundError
        If no DAGMan node status file is found in ``wms_path``.
    """
    path = Path(wms_path)
    try:
        node_stat_file = next(path.glob("*.node_status"))
    except StopIteration as exc:
        raise FileNotFoundError(f"DAGMan node status not found in {wms_path}") from exc

    dag_ads: dict[str, Any] = read_single_dag_status(node_stat_file)

    for node_stat_file in path.glob("subdags/*/*.node_status"):
        dag_ad = read_single_dag_status(node_stat_file)
        # Bug fix: also use dict.get on the accumulator; the main DAG's
        # status ad may be missing some of these counters (e.g., when its
        # status file could not be parsed), which previously raised
        # KeyError on `+=`.
        for key in (
            "JobProcsHeld",
            "NodesPost",
            "JobProcsIdle",
            "NodesTotal",
            "NodesFailed",
            "NodesDone",
            "NodesQueued",
            "NodesFutile",
            "NodesUnready",
        ):
            dag_ads[key] = dag_ads.get(key, 0) + dag_ad.get(key, 0)
        # NOTE(review): the original code added the subdag's "NodesReady"
        # count into "NodesPre"; that mapping is preserved here, but
        # confirm it is intended.
        dag_ads["NodesPre"] = dag_ads.get("NodesPre", 0) + dag_ad.get("NodesReady", 0)

    return dag_ads

1699 

1700 

def read_single_node_status(filename: str | os.PathLike, init_fake_id: int) -> dict[str, Any]:
    """Read entire node status file.

    Parameters
    ----------
    filename : `str` or `pathlib.Path`
        Node status filename.
    init_fake_id : `int`
        Initial fake id value.  Negative ids are handed out (counting
        down) to nodes that do not yet have a real HTCondor job id.

    Returns
    -------
    jobs : `dict` [`str`, `~typing.Any`]
        DAG summary information compiled from the node status file combined
        with the information found in the node event log.

        Currently, if the same job attribute is found in both files, its value
        from the event log takes precedence over the value from the node status
        file.
    """
    filename = Path(filename)

    # Get jobid info from other places to fill in gaps in info from node_status
    _, job_name_to_label, job_name_to_type = count_jobs_in_single_dag(filename.with_suffix(".dag"))
    loginfo: dict[str, dict[str, Any]] = {}
    try:
        # NOTE(review): if this first call raises, wms_workflow_id stays
        # unbound and the later `job["DAGManJobID"] = wms_workflow_id`
        # assignments raise NameError — confirm whether that path can occur.
        wms_workflow_id, loginfo = read_single_dag_log(filename.with_suffix(".dag.dagman.log"))
        # The nodes.log info (per-node events) replaces the dagman.log info.
        loginfo = read_single_dag_nodes_log(filename.with_suffix(".dag.nodes.log"))
    except (OSError, PermissionError):
        pass

    job_name_to_id: dict[str, str] = {}
    _LOG.debug("loginfo = %s", loginfo)
    # Map DAG node names back to the HTCondor job ids found in the log;
    # the node name is embedded in the event's "LogNotes" field.
    log_job_name_to_id: dict[str, str] = {}
    for job_id, job_info in loginfo.items():
        if "LogNotes" in job_info:
            m = re.match(r"DAG Node: (\S+)", job_info["LogNotes"])
            if m:
                job_name = m.group(1)
                log_job_name_to_id[job_name] = job_id
                job_info["DAGNodeName"] = job_name
                job_info["wms_node_type"] = job_name_to_type[job_name]
                job_info["bps_job_label"] = job_name_to_label[job_name]

    jobs = {}
    fake_id = init_fake_id  # For nodes that do not yet have a job id, give fake one
    try:
        with open(filename) as fh:
            for ad in classad.parseAds(fh):
                match ad["Type"]:
                    case "DagStatus":
                        # Skip DAG summary.
                        pass
                    case "NodeStatus":
                        job_name = ad["Node"]
                        # Prefer the label parsed from the dag file; otherwise
                        # fall back to splitting the node name.
                        if job_name in job_name_to_label:
                            job_label = job_name_to_label[job_name]
                        elif "_" in job_name:
                            job_label = job_name.split("_")[1]
                        else:
                            job_label = job_name

                        job = dict(ad)
                        if job_name in log_job_name_to_id:
                            # Merge event-log info; log values take precedence.
                            job_id = str(log_job_name_to_id[job_name])
                            _update_dicts(job, loginfo[job_id])
                        else:
                            # No real job id yet; hand out a fake (negative) one.
                            job_id = str(fake_id)
                            job = dict(ad)
                            fake_id -= 1
                        jobs[job_id] = job
                        job_name_to_id[job_name] = job_id

                        # Make job info as if came from condor_q.
                        job["ClusterId"] = int(float(job_id))
                        job["DAGManJobID"] = wms_workflow_id
                        job["DAGNodeName"] = job_name
                        job["bps_job_label"] = job_label
                        job["wms_node_type"] = job_name_to_type[job_name]

                    case "StatusEnd":
                        # Skip node status file "epilog".
                        pass
                    case _:
                        _LOG.debug(
                            "Ignoring unknown classad type '%s' in the node status file '%s'",
                            ad["Type"],
                            filename,
                        )
    except (OSError, PermissionError):
        pass

    # Check for missing jobs (e.g., submission failure or not submitted yet)
    # Use dag info to create job placeholders
    for name in set(job_name_to_label) - set(job_name_to_id):
        if name in log_job_name_to_id:  # job was in nodes.log, but not node_status
            job_id = str(log_job_name_to_id[name])
            job = dict(loginfo[job_id])
        else:
            job_id = str(fake_id)
            fake_id -= 1
            job = {}
            job["NodeStatus"] = NodeStatus.NOT_READY

        # Make the placeholder look like a condor_q result.
        job["ClusterId"] = int(float(job_id))
        job["ProcId"] = 0
        job["DAGManJobID"] = wms_workflow_id
        job["DAGNodeName"] = name
        job["bps_job_label"] = job_name_to_label[name]
        job["wms_node_type"] = job_name_to_type[name]
        jobs[f"{job['ClusterId']}.{job['ProcId']}"] = job

    # Record which DAG (main or subdag) every job came from so that
    # subdag-level status can later be propagated to its jobs.
    for job_info in jobs.values():
        job_info["from_dag_job"] = f"wms_{filename.stem}"

    return jobs

1817 

1818 

def read_node_status(wms_path: str | os.PathLike) -> dict[str, dict[str, Any]]:
    """Read entire node status file.

    Parameters
    ----------
    wms_path : `str` or `os.PathLike`
        Path that includes node status file for a run.

    Returns
    -------
    jobs : `dict` [`str`, `dict` [`str`, `~typing.Any`]]
        DAG summary information compiled from the node status file combined
        with the information found in the node event log.

        Currently, if the same job attribute is found in both files, its value
        from the event log takes precedence over the value from the node status
        file.
    """
    jobs: dict[str, dict[str, Any]] = {}
    next_fake_id = -1

    # Subdags may not have run so wouldn't have a node_status file; glob the
    # dag files instead and let read_single_node_status handle a missing
    # node_status file.  Main dag(s) are processed before subdags.
    path = Path(wms_path)
    for dag_filename in itertools.chain(path.glob("*.dag"), path.glob("subdags/*/*.dag")):
        info = read_single_node_status(dag_filename.with_suffix(".node_status"), next_fake_id)
        next_fake_id -= len(info)
        _update_dicts(jobs, info)

    # Propagate pruned/futile status from subdag nodes down to their jobs.
    dag_name_to_id: dict[str, str] = {}
    pending: dict[str, list[str]] = {}
    for job_id, job in jobs.items():
        if job["DAGNodeName"].startswith("wms_"):
            dag_name_to_id[job["DAGNodeName"]] = job_id
        if "NodeStatus" not in job or job["NodeStatus"] == NodeStatus.NOT_READY:
            pending.setdefault(job["from_dag_job"], []).append(job_id)

    for dag_name, dag_id in dag_name_to_id.items():
        dag_status = jobs[dag_id].get("NodeStatus", NodeStatus.NOT_READY)
        if dag_status in {NodeStatus.NOT_READY, NodeStatus.FUTILE}:
            for job_id in pending.get(dag_name, []):
                jobs[job_id]["NodeStatus"] = dag_status

    return jobs

1871 

1872 

def read_single_dag_log(log_filename: str | os.PathLike) -> tuple[str, dict[str, dict[str, Any]]]:
    """Read job information from the DAGMan log file.

    Parameters
    ----------
    log_filename : `str` or `os.PathLike`
        DAGMan log filename.

    Returns
    -------
    wms_workflow_id : `str`
        HTCondor job id (i.e., <ClusterId>.<ProcId>) of the DAGMan job.
        "0" if the log file does not exist or contains no events.
    dag_info : `dict` [`str`, `dict` [`str`, `~typing.Any`]]
        HTCondor job information read from the log file mapped to HTCondor
        job id.  Empty if the log file does not exist or contains no events.
    """
    wms_workflow_id = "0"
    dag_info: dict[str, dict[str, Any]] = {}

    filename = Path(log_filename)
    if filename.exists():
        _LOG.debug("dag node log filename: %s", filename)

        info: dict[str, Any] = {}
        job_event_log = htcondor.JobEventLog(str(filename))
        for event in job_event_log.events(stop_after=0):
            id_ = f"{event['Cluster']}.{event['Proc']}"
            if id_ not in info:
                info[id_] = {}
                wms_workflow_id = id_  # taking last job id in case of restarts
            _update_dicts(info[id_], event)
            # Record when each event type last occurred (e.g. submit_time).
            info[id_][f"{event.type.name.lower()}_time"] = event["EventTime"]

        # Bug fix: guard against an empty event log; previously this raised
        # KeyError because "0" was never a key of ``info``.
        if info:
            # only save latest DAG job
            dag_info = {wms_workflow_id: info[wms_workflow_id]}

    return wms_workflow_id, dag_info

1915 

1916 

def read_dag_log(wms_path: str | os.PathLike) -> tuple[str, dict[str, Any]]:
    """Locate and read the DAGMan log file in a submit directory.

    Parameters
    ----------
    wms_path : `str` or `os.PathLike`
        Path containing the DAGMan log file.

    Returns
    -------
    wms_workflow_id : `str`
        HTCondor job id (i.e., <ClusterId>.<ProcId>) of the DAGMan job.
    dag_info : `dict` [`str`, `dict` [`str`, `~typing.Any`]]
        HTCondor job information read from the log file mapped to HTCondor
        job id.

    Raises
    ------
    FileNotFoundError
        If the directory exists but no DAGMan log is found inside it.
    """
    path = Path(wms_path)
    if not path.exists():
        # Nothing to read; report the sentinel id and no job info.
        return MISSING_ID, {}

    log_file = next(path.glob("*.dag.dagman.log"), None)
    if log_file is None:
        raise FileNotFoundError(f"DAGMan log not found in {wms_path}")
    _LOG.debug("dag node log filename: %s", log_file)
    return read_single_dag_log(log_file)

1951 

1952 

def read_single_dag_nodes_log(filename: str | os.PathLike) -> dict[str, dict[str, Any]]:
    """Read job information from the DAGMan nodes log file.

    Parameters
    ----------
    filename : `str` or `os.PathLike`
        Path of the DAGMan nodes log file.

    Returns
    -------
    info : `dict` [`str`, `dict` [`str`, `~typing.Any`]]
        HTCondor job information read from the log file mapped to HTCondor
        job id.

    Raises
    ------
    FileNotFoundError
        If the given DAGMan nodes log file does not exist.
    """
    _LOG.debug("dag node log filename: %s", filename)
    filename = Path(filename)

    info: dict[str, dict[str, Any]] = {}
    if not filename.exists():
        # Include the missing path in the message; the previous f-string had
        # no placeholder so callers could not tell which file was missing.
        raise FileNotFoundError(f"{filename} does not exist")

    try:
        job_event_log = htcondor.JobEventLog(str(filename))
    except htcondor.HTCondorIOError as ex:
        _LOG.error("Problem reading nodes log file (%s): %s", filename, ex)
        import traceback

        traceback.print_stack()
        raise
    for event in job_event_log.events(stop_after=0):
        _LOG.debug("log event type = %s, keys = %s", event["EventTypeNumber"], event.keys())

        try:
            id_ = f"{event['Cluster']}.{event['Proc']}"
        except KeyError:
            # Logger.warn is a deprecated alias; use warning() instead.
            _LOG.warning(
                "Log event missing ids (DAGNodeName=%s, EventTime=%s, EventTypeNumber=%s)",
                event.get("DAGNodeName", "UNK"),
                event.get("EventTime", "UNK"),
                event.get("EventTypeNumber", "UNK"),
            )
        else:
            if id_ not in info:
                info[id_] = {}
            # Workaround: Please check to see if still problem in
            # future HTCondor versions.  Sometimes get a
            # JobAbortedEvent (EventTypeNumber 9) for a subdag job after
            # it already terminated normally (EventTypeNumber 5).  Seems
            # to happen when using job plus subdags.
            if event["EventTypeNumber"] == 9 and info[id_].get("EventTypeNumber", -1) == 5:
                _LOG.debug("Skipping spurious JobAbortedEvent: %s", dict(event))
            else:
                _update_dicts(info[id_], event)
                info[id_][f"{event.type.name.lower()}_time"] = event["EventTime"]

    return info

2014 

2015 

2016def read_dag_nodes_log(wms_path: str | os.PathLike) -> dict[str, dict[str, Any]]: 

2017 """Read job information from the DAGMan nodes log file. 

2018 

2019 Parameters 

2020 ---------- 

2021 wms_path : `str` or `os.PathLike` 

2022 Path containing the DAGMan nodes log file. 

2023 

2024 Returns 

2025 ------- 

2026 info : `dict` [`str`, `dict` [`str`, `~typing.Any`]] 

2027 HTCondor job information read from the log file mapped to HTCondor 

2028 job id. 

2029 

2030 Raises 

2031 ------ 

2032 FileNotFoundError 

2033 If cannot find DAGMan node log in given wms_path. 

2034 """ 

2035 info: dict[str, dict[str, Any]] = {} 

2036 for filename in Path(wms_path).glob("*.dag.nodes.log"): 

2037 _LOG.debug("dag node log filename: %s", filename) 

2038 _update_dicts(info, read_single_dag_nodes_log(filename)) 

2039 

2040 # If submitted, the main nodes log file should exist 

2041 if not info: 

2042 raise FileNotFoundError(f"DAGMan node log not found in {wms_path}") 

2043 

2044 # Subdags will not have dag nodes log files if they haven't 

2045 # started running yet (so missing is not an error). 

2046 for filename in Path(wms_path).glob("subdags/*/*.dag.nodes.log"): 

2047 _LOG.debug("dag node log filename: %s", filename) 

2048 _update_dicts(info, read_single_dag_nodes_log(filename)) 

2049 

2050 return info 

2051 

2052 

2053def read_dag_info(wms_path: str | os.PathLike) -> dict[str, dict[str, Any]]: 

2054 """Read custom DAGMan job information from the file. 

2055 

2056 Parameters 

2057 ---------- 

2058 wms_path : `str` or `os.PathLike` 

2059 Path containing the file with the DAGMan job info. 

2060 

2061 Returns 

2062 ------- 

2063 dag_info : `dict` [`str`, `dict` [`str`, `~typing.Any`]] 

2064 HTCondor job information. 

2065 

2066 Raises 

2067 ------ 

2068 FileNotFoundError 

2069 If cannot find DAGMan job info file in the given location. 

2070 """ 

2071 dag_info: dict[str, dict[str, Any]] = {} 

2072 try: 

2073 filename = next(Path(wms_path).glob("*.info.json")) 

2074 except StopIteration as exc: 

2075 raise FileNotFoundError(f"File with DAGMan job information not found in {wms_path}") from exc 

2076 _LOG.debug("DAGMan job information filename: %s", filename) 

2077 try: 

2078 with open(filename) as fh: 

2079 dag_info = json.load(fh) 

2080 except (OSError, PermissionError) as exc: 

2081 _LOG.debug("Retrieving DAGMan job information failed: %s", exc) 

2082 return dag_info 

2083 

2084 

def write_dag_info(filename, dag_info):
    """Write custom job information about DAGMan job.

    Only the cluster id, global job id, and ``bps*`` attributes of the
    first job ad are persisted.

    Parameters
    ----------
    filename : `str`
        Name of the file where the information will be stored.
    dag_info : `dict` [`str` `dict` [`str`, `~typing.Any`]]
        Information about the DAGMan job.
    """
    schedd_name, per_schedd = next(iter(dag_info.items()))
    dag_id, dag_ad = next(iter(per_schedd.items()))

    # Keep only the identifiers and the bps-specific attributes.
    trimmed_ad = {"ClusterId": dag_ad["ClusterId"], "GlobalJobId": dag_ad["GlobalJobId"]}
    for key, val in dag_ad.items():
        if key.startswith("bps"):
            trimmed_ad[key] = val

    try:
        with open(filename, "w") as fh:
            json.dump({schedd_name: {dag_id: trimmed_ad}}, fh)
    except (KeyError, OSError, PermissionError) as exc:
        # Best effort: failure to persist is logged, not raised.
        _LOG.debug("Persisting DAGMan job information failed: %s", exc)

2106 

2107 

def htc_tweak_log_info(wms_path: str | Path, job: dict[str, Any]) -> None:
    """Massage the given job info so it has the same structure as if it
    came from condor_q.

    The job ad is updated in place: ids are normalized to
    ClusterId/ProcId, Iwd/Owner are filled in from ``wms_path``, the DAG
    node name is extracted from the log notes, and a JobStatus is derived
    from the last-seen event type.

    Parameters
    ----------
    wms_path : `str` | `os.PathLike`
        Path containing an HTCondor event log file.
    job : `dict` [ `str`, `~typing.Any` ]
        A mapping between HTCondor job id and job information read from
        the log.
    """
    _LOG.debug("htc_tweak_log_info: %s %s", wms_path, job)

    # Use the presence of 'MyType' key as a proxy to determine if the job ad
    # contains the info extracted from the event log. Exit early if it doesn't
    # (e.g. it is a job ad for a pruned job).
    if "MyType" not in job:
        return

    # Event-log ads carry Cluster/Proc; condor_q ads carry ClusterId/ProcId.
    try:
        job["ClusterId"] = job["Cluster"]
        job["ProcId"] = job["Proc"]
    except KeyError as e:
        _LOG.error("Missing key %s in job: %s", str(e), job)
        raise
    job["Iwd"] = str(wms_path)
    job["Owner"] = Path(wms_path).owner()

    # DAGMan records the node name in the submit event's log notes.
    if "LogNotes" in job:
        m = re.match(r"DAG Node: (\S+)", job["LogNotes"])
        if m:
            job["DAGNodeName"] = m.group(1)

    # Map the type of the last event seen for this job to a condor_q-style
    # JobStatus value.
    match job["MyType"]:
        case "ExecuteEvent":
            job["JobStatus"] = htcondor.JobStatus.RUNNING
        case "JobTerminatedEvent" | "PostScriptTerminatedEvent":
            job["JobStatus"] = htcondor.JobStatus.COMPLETED
        case "SubmitEvent":
            job["JobStatus"] = htcondor.JobStatus.IDLE
        case "JobAbortedEvent":
            job["JobStatus"] = htcondor.JobStatus.REMOVED
        case "JobHeldEvent":
            job["JobStatus"] = htcondor.JobStatus.HELD
        case "JobReleaseEvent":
            # If the job managing the execution of the root DAG is held and
            # released this will be the last event showing up in its
            # job event log even if the job is still running. If this is
            # the last event for a job corresponding to the workflow node
            # (either a normal payload job or the job managing the execution
            # of an inner DAG), its final status will be determined later
            # using node status log (see _htc_status_to_wms_state()).
            job["JobStatus"] = htcondor.JobStatus.RUNNING if "DAGNodeName" not in job else None
        case _:
            _LOG.debug("Unknown log event type: %s", job["MyType"])
            job["JobStatus"] = None

    # Use available information to add either "ExitCode" or "ExitSignal"
    # attribute that captures respectively job's exit status (if it finished
    # on its own accord) or its exit signal (if it was terminated by
    # a signal). Also, include a flag "ExitBySignal" to make distinguishing
    # between these two cases easy later on.
    if job["JobStatus"] in {
        htcondor.JobStatus.COMPLETED,
        htcondor.JobStatus.HELD,
        htcondor.JobStatus.REMOVED,
    }:
        new_job = HTC_JOB_AD_HANDLERS.handle(job)
        if new_job is not None:
            # NOTE(review): rebinding the local name only works if handle()
            # mutates the ad it was given (or returns it); if it ever returns
            # a fresh dict, the caller would not see it -- confirm handler
            # semantics.
            job = new_job
        else:
            _LOG.error("Could not determine exit status for job '%s.%s'", job["ClusterId"], job["ProcId"])

2180 

2181 

def htc_check_dagman_output(wms_path: str | os.PathLike) -> str:
    """Check the DAGMan output for error messages.

    Parameters
    ----------
    wms_path : `str` or `os.PathLike`
        Directory containing the DAGman output file.

    Returns
    -------
    message : `str`
        Message containing error messages from the DAGMan output. Empty
        string if no messages.

    Raises
    ------
    FileNotFoundError
        If cannot find DAGMan standard output file in given wms_path.
    """
    try:
        filename = next(Path(wms_path).glob("*.dag.dagman.out"))
    except StopIteration as exc:
        raise FileNotFoundError(f"DAGMan standard output file not found in {wms_path}") from exc
    _LOG.debug("dag output filename: %s", filename)

    # Matches timestamped submit-failure, Warning, and ERROR lines.
    p = re.compile(r"^(\d\d/\d\d/\d\d \d\d:\d\d:\d\d) (Job submit try \d+/\d+ failed|Warning:.*$|ERROR:.*$)")

    message = ""
    try:
        with open(filename) as fh:
            last_submit_failed = ""  # Since submit retries multiple times only report last one
            # Bug fix: initialize so the DAGMAN_USE_STRICT branch below cannot
            # hit UnboundLocalError if it fires before any Warning line.
            last_warning = ""
            for line in fh:
                m = p.match(line)
                if m:
                    if m.group(2).startswith("Job submit try"):
                        last_submit_failed = m.group(1)
                    elif m.group(2).startswith("ERROR: submit attempt failed"):
                        pass  # Should be handled by Job submit try
                    elif m.group(2).startswith("Warning"):
                        if ".dag.nodes.log is in /tmp" in m.group(2):
                            last_warning = "Cannot submit from /tmp."
                        else:
                            last_warning = m.group(2)
                    elif m.group(2) == "ERROR: Warning is fatal error because of DAGMAN_USE_STRICT setting":
                        message += "ERROR: "
                        message += last_warning
                        message += "\n"
                    elif m.group(2) in [
                        "ERROR: the following job(s) failed:",
                        "ERROR: the following Node(s) failed:",
                    ]:
                        pass  # Failed nodes are reported elsewhere.
                    else:
                        message += m.group(2)
                        message += "\n"

            if last_submit_failed:
                message += f"Warn: Job submission issues (last: {last_submit_failed})"
    except (OSError, PermissionError):
        message = f"Warn: Could not read dagman output file from {wms_path}."
    _LOG.debug("dag output file message: %s", message)
    return message

2244 

2245 

2246def _read_rescue_headers(infh: TextIO) -> tuple[list[str], list[str]]: 

2247 """Read header lines from a rescue file. 

2248 

2249 Parameters 

2250 ---------- 

2251 infh : `TextIO` 

2252 The rescue file from which to read the header lines. 

2253 

2254 Returns 

2255 ------- 

2256 header_lines : `list` [`str`] 

2257 Header lines read from the rescue file. 

2258 failed_subdags : `list` [`str`] 

2259 Names of failed subdag jobs. 

2260 """ 

2261 header_lines: list[str] = [] 

2262 failed = False 

2263 failed_subdags: list[str] = [] 

2264 

2265 for line in infh: 

2266 line = line.strip() 

2267 if line.startswith("#"): 

2268 if line.startswith("# Nodes that failed:"): 

2269 failed = True 

2270 header_lines.append(line) 

2271 elif failed: 

2272 orig_failed_nodes = line[1:].strip().split(",") 

2273 new_failed_nodes = [] 

2274 for node in orig_failed_nodes: 

2275 if node.startswith("wms_check_status"): 

2276 group_node = node[17:] 

2277 failed_subdags.append(group_node) 

2278 new_failed_nodes.append(group_node) 

2279 else: 

2280 new_failed_nodes.append(node) 

2281 header_lines.append(f"# {','.join(new_failed_nodes)}") 

2282 if orig_failed_nodes[-1] == "<ENDLIST>": 

2283 failed = False 

2284 else: 

2285 header_lines.append(line) 

2286 elif line.strip() == "": # end of headers 

2287 break 

2288 return header_lines, failed_subdags 

2289 

2290 

2291def _write_rescue_headers(header_lines: list[str], failed_subdags: list[str], outfh: TextIO) -> None: 

2292 """Write the header lines to the new rescue file. 

2293 

2294 Parameters 

2295 ---------- 

2296 header_lines : `list` [`str`] 

2297 Header lines to write to the new rescue file. 

2298 failed_subdags : `list` [`str`] 

2299 Job names of the failed subdags. 

2300 outfh : `TextIO` 

2301 New rescue file. 

2302 """ 

2303 done_str = "# Nodes premarked DONE" 

2304 pattern = f"^{done_str}:\\s+(\\d+)" 

2305 for header_line in header_lines: 

2306 m = re.match(pattern, header_line) 

2307 if m: 

2308 print(f"{done_str}: {int(m.group(1)) - len(failed_subdags)}", file=outfh) 

2309 else: 

2310 print(header_line, file=outfh) 

2311 

2312 print("", file=outfh) 

2313 

2314 

2315def _copy_done_lines(failed_subdags: list[str], infh: TextIO, outfh: TextIO) -> None: 

2316 """Copy the DONE lines from the original rescue file skipping 

2317 the failed group jobs. 

2318 

2319 Parameters 

2320 ---------- 

2321 failed_subdags : `list` [`str`] 

2322 List of job names for the failed subdags 

2323 infh : `TextIO` 

2324 Original rescue file to copy from. 

2325 outfh : `TextIO` 

2326 New rescue file to copy to. 

2327 """ 

2328 for line in infh: 

2329 line = line.strip() 

2330 try: 

2331 _, node_name = line.split() 

2332 except ValueError: 

2333 _LOG.error(f"Unexpected line in rescue file = '{line}'") 

2334 raise 

2335 if node_name not in failed_subdags: 

2336 print(line, file=outfh) 

2337 

2338 

def _update_rescue_file(rescue_file: Path) -> None:
    """Update the subdag failures in the main rescue file
    and backup the failed subdag dirs.

    The file is rewritten via a ``.tmp`` sibling which then replaces the
    original, so a partially written rescue file never has the original's
    name.

    Parameters
    ----------
    rescue_file : `pathlib.Path`
        The main rescue file that needs to be updated.
    """
    # To reduce memory requirements, not reading entire file into memory.
    rescue_tmp = rescue_file.with_suffix(rescue_file.suffix + ".tmp")
    with open(rescue_file) as infh:
        # _read_rescue_headers consumes the header section and leaves infh
        # positioned at the DONE lines, which _copy_done_lines then streams
        # from the same handle -- the call order is load-bearing.
        header_lines, failed_subdags = _read_rescue_headers(infh)
        with open(rescue_tmp, "w") as outfh:
            _write_rescue_headers(header_lines, failed_subdags, outfh)
            _copy_done_lines(failed_subdags, infh, outfh)
    # Swap the rewritten file into place.
    rescue_file.unlink()
    rescue_tmp.rename(rescue_file)
    # Move each failed subdag's directory aside so a rerun starts clean.
    for failed_subdag in failed_subdags:
        htc_backup_files(
            rescue_file.parent / "subdags" / failed_subdag, subdir=f"backups/subdags/{failed_subdag}"
        )

2361 

2362 

2363def _update_dicts(dict1, dict2): 

2364 """Update dict1 with info in dict2. 

2365 

2366 (Basically an update for nested dictionaries.) 

2367 

2368 Parameters 

2369 ---------- 

2370 dict1 : `dict` [`str`, `dict` [`str`, `~typing.Any`]] 

2371 HTCondor job information to be updated. 

2372 dict2 : `dict` [`str`, `dict` [`str`, `~typing.Any`]] 

2373 Additional HTCondor job information. 

2374 """ 

2375 for key, value in dict2.items(): 

2376 if key in dict1 and isinstance(dict1[key], dict) and isinstance(value, dict): 

2377 _update_dicts(dict1[key], value) 

2378 else: 

2379 dict1[key] = value 

2380 

2381 

def _locate_schedds(locate_all=False):
    """Find out Scheduler daemons in an HTCondor pool.

    Parameters
    ----------
    locate_all : `bool`, optional
        If True, all available schedulers in the HTCondor pool will be
        located.  False by default which means that the search will be
        limited to looking for the Scheduler running on a local host.

    Returns
    -------
    schedds : `dict` [`str`, `htcondor.Schedd`]
        A mapping between Scheduler names and Python objects allowing for
        interacting with them.
    """
    collector = htcondor.Collector()

    if locate_all:
        ads = list(collector.locateAll(htcondor.DaemonTypes.Schedd))
    else:
        # Only the schedd on the local host.
        ads = [collector.locate(htcondor.DaemonTypes.Schedd)]

    return {ad["Name"]: htcondor.Schedd(ad) for ad in ads}