# This file is part of ctrl_bps_panda.
#
# Developed for the LSST Data Management System.
# This product includes software developed by the LSST Project
# (https://www.lsst.org).
# See the COPYRIGHT file at the top-level directory of this distribution
# for details of code ownership.
#
# This software is dual licensed under the GNU General Public License and also
# under a 3-clause BSD license. Recipients may choose which of these licenses
# to use; please see the files gpl-3.0.txt and/or bsd_license.txt,
# respectively. If you choose the GPL option then the following text applies
# (but note that there is still no warranty even if you opt for BSD instead):
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <https://www.gnu.org/licenses/>.

"""Utilities for bps PanDA plugin."""

__all__ = [
    "add_decoder_prefix",
    "aggregate_by_basename",
    "convert_exec_string_to_hex",
    "copy_files_for_distribution",
    "extract_taskname",
    "get_idds_client",
    "get_idds_result",
    "idds_call_with_check",
]

import binascii
import concurrent.futures
import json
import logging
import os
import random
import re
import tarfile
import time
import uuid

import idds.common.utils as idds_utils
import pandaclient.idds_api
from idds.doma.workflowv2.domapandawork import DomaPanDAWork
from idds.workflowv2.workflow import AndCondition
from idds.workflowv2.workflow import Workflow as IDDS_client_workflow

from lsst.ctrl.bps import BpsConfig, GenericWorkflow, GenericWorkflowJob, WmsStates
from lsst.ctrl.bps.panda.cmd_line_embedder import CommandLineEmbedder
from lsst.ctrl.bps.panda.constants import (
    PANDA_DEFAULT_CLOUD,
    PANDA_DEFAULT_CORE_COUNT,
    PANDA_DEFAULT_MAX_ATTEMPTS,
    PANDA_DEFAULT_MAX_JOBS_PER_TASK,
    PANDA_DEFAULT_MAX_PAYLOADS_PER_PANDA_JOB,
    PANDA_DEFAULT_MAX_WALLTIME,
    PANDA_DEFAULT_NAME_LENGTH,
    PANDA_DEFAULT_ORDER_ID_MAP_FILE,
    PANDA_DEFAULT_PRIORITY,
    PANDA_DEFAULT_PROCESSING_TYPE,
    PANDA_DEFAULT_PROD_SOURCE_LABEL,
    PANDA_DEFAULT_RSS,
    PANDA_DEFAULT_RSS_MAX,
    PANDA_DEFAULT_TASK_TYPE,
    PANDA_DEFAULT_VO,
)
from lsst.resources import ResourcePath

_LOG = logging.getLogger(__name__)

def extract_taskname(s: str) -> str:
    """Extract the task name from a string that follows the pattern
    CampaignName_timestamp_TaskNumber_TaskLabel_ChunkNumber.

    Parameters
    ----------
    s : `str`
        The input string from which to extract the task name.

    Returns
    -------
    taskname : `str`
        The extracted task name as per the rules above.
    """
    # Remove surrounding quotes/spaces if present.
    s = s.strip().strip("'\"")

    # Find all occurrences of underscore + digits + underscore,
    # and take the last one.
    matches = re.findall(r"_(\d+)_", s)
    if matches:
        last_number = matches[-1]
        last_pos = s.rfind(f"_{last_number}_") + len(f"_{last_number}_")
        taskname = s[last_pos:]
        return taskname

    # Fallback: if no such pattern, return everything.
    taskname = s
    return taskname

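# A minimal doctest-style sketch of the extraction rule (hypothetical values,
# not taken from a real campaign): the text after the last "_<digits>_" group
# is returned; otherwise the whole string comes back unchanged.
#
#     >>> extract_taskname("campaign_20240101T000000Z_31_isr_05")
#     'isr_05'
#     >>> extract_taskname("no-numeric-groups-here")
#     'no-numeric-groups-here'
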

def aggregate_by_basename(job_summary, exit_code_summary, run_summary):
    """Aggregate job, exit code, and run summaries by
    their base label (basename).

    Parameters
    ----------
    job_summary : `dict` [`str`, `dict` [`str`, `int`]]
        A mapping of job labels to state-count mappings.
    exit_code_summary : `dict` [`str`, `list` [`int`]]
        A mapping of job labels to lists of exit codes.
    run_summary : `str`
        A semicolon-separated string of job summaries
        where each entry has the format "<label>:<count>".

    Returns
    -------
    aggregated_jobs : `dict` [`str`, `dict` [`str`, `int`]]
        A dictionary mapping each base label to the summed job state counts
        across all matching labels.
    aggregated_exits : `dict` [`str`, `list` [`int`]]
        A dictionary mapping each base label to a combined list of exit codes
        from all matching labels.
    aggregated_run : `str`
        A semicolon-separated string with aggregated job counts by base label.
    """

    def base_label(label):
        return re.sub(r"_\d+$", "", label)

    aggregated_jobs = {}
    aggregated_exits = {}

    for label, states in job_summary.items():
        base = base_label(label)
        if base not in aggregated_jobs:
            aggregated_jobs[base] = dict.fromkeys(WmsStates, 0)
        for state, count in states.items():
            aggregated_jobs[base][state] += count

    for label, codes in exit_code_summary.items():
        base = base_label(label)
        aggregated_exits.setdefault(base, []).extend(codes)

    aggregated = {}
    for entry in run_summary.split(";"):
        entry = entry.strip()
        if not entry:
            continue
        try:
            label, num = entry.split(":")
            num = int(num)
        except ValueError:
            continue

        base = base_label(label)
        aggregated[base] = aggregated.get(base, 0) + num

    aggregated_run = ";".join(f"{base}:{count}" for base, count in aggregated.items())
    return aggregated_jobs, aggregated_exits, aggregated_run

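# A minimal sketch of the chunk-suffix aggregation (hypothetical labels):
# "isr_1" and "isr_2" collapse to the base label "isr".
#
#     >>> aggregate_by_basename({}, {"isr_1": [1], "isr_2": [0, 1]}, "isr_1:10;isr_2:5")
#     ({}, {'isr': [1, 0, 1]}, 'isr:15')
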

def copy_files_for_distribution(files_to_stage, file_distribution_uri, max_copy_workers):
    """Bring locally generated files into the cloud for further
    use on the edge nodes.

    Parameters
    ----------
    files_to_stage : `dict` [`str`, `str`]
        Files which need to be copied to a workflow staging area.
    file_distribution_uri : `ResourcePath`
        Path on the edge node accessed storage,
        including access protocol and bucket name to place files.
    max_copy_workers : `int`
        Maximum number of workers for copying files.

    Raises
    ------
    RuntimeError
        Raised when there is an error copying files to the distribution
        point.
    """
    files_to_copy = {}

    # In case there are folders, we iterate over their contents.
    for local_pfn in files_to_stage.values():
        folder_name = os.path.basename(os.path.normpath(local_pfn))
        if os.path.isdir(local_pfn):
            folder_uri = file_distribution_uri.join(folder_name, forceDirectory=True)
            files_in_folder = ResourcePath.findFileResources([local_pfn])
            for file in files_in_folder:
                file_name = file.basename()
                files_to_copy[file] = folder_uri.join(file_name, forceDirectory=False)
        else:
            folder_uri = file_distribution_uri.join(folder_name, forceDirectory=False)
            files_to_copy[ResourcePath(local_pfn, forceDirectory=False)] = folder_uri

    copy_executor = concurrent.futures.ThreadPoolExecutor(max_workers=max_copy_workers)
    future_file_copy = []
    for src, trgt in files_to_copy.items():
        _LOG.debug("Staging %s to %s", src, trgt)
        # S3 clients are explicitly instantiated here to work around
        # https://stackoverflow.com/questions/52820971/is-boto3-client-thread-safe
        trgt.exists()
        future_file_copy.append(copy_executor.submit(trgt.transfer_from, src, transfer="copy"))

    for future in concurrent.futures.as_completed(future_file_copy):
        if future.result() is not None:
            raise RuntimeError("Error placing files at the distribution point")

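# A minimal usage sketch (hypothetical paths; "s3://bucket/stage/" stands in
# for a real distribution endpoint):
#
#     files = {"jobO": "/tmp/submit/jobO.tar.gz", "config": "/tmp/submit/conf"}
#     copy_files_for_distribution(
#         files, ResourcePath("s3://bucket/stage/", forceDirectory=True), max_copy_workers=10
#     )
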

def get_idds_client(config):
    """Get the iDDS client.

    Parameters
    ----------
    config : `lsst.ctrl.bps.BpsConfig`
        BPS configuration.

    Returns
    -------
    idds_client : `idds.client.clientmanager.ClientManager`
        The iDDS ClientManager object.
    """
    idds_server = None
    if isinstance(config, BpsConfig):
        _, idds_server = config.search("iddsServer", opt={"default": None})
    elif isinstance(config, dict) and "iddsServer" in config:
        idds_server = config["iddsServer"]
    # If idds_server is None, a default value on the PanDA relay service
    # will be used.
    idds_client = pandaclient.idds_api.get_api(
        idds_utils.json_dumps, idds_host=idds_server, compress=True, manager=True
    )
    return idds_client

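# A minimal usage sketch (the request ID and the get_requests call are
# illustrative assumptions about the ClientManager API, not requirements of
# this module):
#
#     idds_client = get_idds_client(config)
#     ret = idds_client.get_requests(request_id=1234)
#     status, result, error = get_idds_result(ret)
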

def get_idds_result(ret):
    """Parse the results returned from iDDS.

    Parameters
    ----------
    ret : `tuple` [`int`, `tuple` [`bool`, payload]]
        The first part ``ret[0]`` is the status of the PanDA relay service.
        The part ``ret[1][0]`` is the status of the iDDS service.
        The part ``ret[1][1]`` is the returned payload.
        If ``ret[1][0]`` is `False`, ``ret[1][1]`` can be error messages.

    Returns
    -------
    status : `bool`
        The status of iDDS calls.
    result : `int` or `list` or `dict` or `None`
        The result returned from iDDS. `None` if error state.
    error : `str` or `None`
        Error messages. `None` if no error state.
    """
    # https://panda-wms.readthedocs.io/en/latest/client/rest_idds.html
    if not isinstance(ret, list | tuple) or ret[0] != 0:
        # Something wrong with the PanDA relay service.
        # The call may not be delivered to iDDS.
        status = False
        result = None
        error = f"PanDA relay service returns errors: {ret}"
    else:
        if ret[1][0]:
            status = True
            result = ret[1][1]
            error = None
            if isinstance(result, str) and "Authentication no permission" in result:
                status = False
                error = result
                result = None
        else:
            # iDDS returns errors.
            status = False
            result = None
            error = f"iDDS returns errors: {ret[1][1]}"
    return status, result, error

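# A doctest-style sketch of the three return shapes, using literal payloads
# rather than live service responses:
#
#     >>> get_idds_result((0, (True, {"request_id": 1234})))
#     (True, {'request_id': 1234}, None)
#     >>> get_idds_result((0, (False, "some iDDS error")))
#     (False, None, 'iDDS returns errors: some iDDS error')
#     >>> get_idds_result((255, "relay failure"))
#     (False, None, "PanDA relay service returns errors: (255, 'relay failure')")
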

def idds_call_with_check(func, *, func_name: str, request_id: int, **kwargs):
    """Call an iDDS client function, log, and check the return code.

    Parameters
    ----------
    func : `~collections.abc.Callable`
        The iDDS client function to call.
    func_name : `str`
        Name used for logging.
    request_id : `int`
        The request or workflow ID.
    **kwargs
        Additional keyword arguments passed to the function.

    Returns
    -------
    ret : `~typing.Any`
        The return value from the iDDS client function.

    Raises
    ------
    RuntimeError
        Raised if the call returns a non-zero relay service status.
    """
    call_kwargs = dict(kwargs)
    if request_id is not None:
        call_kwargs["request_id"] = request_id

    ret = func(**call_kwargs)

    _LOG.debug("PanDA %s returned = %s", func_name, str(ret))

    request_status = ret[0]
    if request_status != 0:
        raise RuntimeError(f"Error calling {func_name}: {ret} for id: {request_id}")

    return ret

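# A minimal usage sketch (hypothetical request ID; any ClientManager method
# with this calling convention works):
#
#     idds_client = get_idds_client(config)
#     ret = idds_call_with_check(
#         idds_client.get_requests, func_name="get_requests", request_id=1234
#     )
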

def _make_pseudo_filename(config, gwjob):
    """Make the job pseudo filename.

    Parameters
    ----------
    config : `lsst.ctrl.bps.BpsConfig`
        BPS configuration.
    gwjob : `lsst.ctrl.bps.GenericWorkflowJob`
        Job for which to create the pseudo filename.

    Returns
    -------
    pseudo_filename : `str`
        The pseudo filename for the given job.
    """
    cmd_line_embedder = CommandLineEmbedder(config)
    _, pseudo_filename = cmd_line_embedder.substitute_command_line(
        gwjob.executable.src_uri + " " + gwjob.arguments, gwjob.cmdvals, gwjob.name, []
    )
    return pseudo_filename

def _make_doma_work(
    config,
    generic_workflow,
    gwjob,
    task_count,
    task_chunk,
    enable_event_service=False,
    enable_job_name_map=False,
    order_id_map_files=None,
    es_label=None,
    max_payloads_per_panda_job=PANDA_DEFAULT_MAX_PAYLOADS_PER_PANDA_JOB,
    max_wms_job_wall_time=None,
    remote_filename=None,
    qnode_map_filename=None,
):
    """Make the DOMA Work object for a PanDA task.

    Parameters
    ----------
    config : `lsst.ctrl.bps.BpsConfig`
        BPS configuration.
    generic_workflow : `lsst.ctrl.bps.GenericWorkflow`
        The workflow.
    gwjob : `lsst.ctrl.bps.GenericWorkflowJob`
        Job representing the jobs for the PanDA task.
    task_count : `int`
        Count of PanDA tasks used when making unique names.
    task_chunk : `int`
        Count of chunk of a PanDA task used when making unique names.
    enable_event_service : `bool`, optional
        Whether to run this task with the PanDA Event Service.
    enable_job_name_map : `bool`, optional
        Whether to map long job names to short order IDs.
    order_id_map_files : `dict` [`str`, `str`], optional
        Order-ID map files which need to be added to the staging area.
    es_label : `str`, optional
        Job label used for the Event Service task.
    max_payloads_per_panda_job : `int`, optional
        Maximum number of payloads bundled into a single PanDA job.
    max_wms_job_wall_time : `int`, optional
        Maximum wall time for a single WMS job (same units as
        ``gwjob.request_walltime``).
    remote_filename : `str`, optional
        Filename of the job archive placed in pandacache.
    qnode_map_filename : `dict` [`str`, `str`], optional
        Mapping entry for the quantum node map file to pre-stage.

    Returns
    -------
    work : `idds.doma.workflowv2.domapandawork.DomaPanDAWork`
        The client representation of a PanDA task.
    local_pfns : `dict` [`str`, `str`]
        Files which need to be copied to a workflow staging area.
    """
    if order_id_map_files is None:
        order_id_map_files = {}
    _LOG.debug("Using gwjob %s to create new PanDA task (gwjob=%s)", gwjob.name, gwjob)
    cvals = {"curr_cluster": gwjob.label}
    _, site = config.search("computeSite", opt={"curvals": cvals, "required": True})
    cvals["curr_site"] = site
    cvals["curr_pipetask"] = gwjob.label
    _, processing_type = config.search(
        "processingType", opt={"curvals": cvals, "default": PANDA_DEFAULT_PROCESSING_TYPE}
    )
    if gwjob.label in ["finalJob", "customJob"]:
        _, nonpipetask = config.search(gwjob.label)
        default_type = "Rubin_Merge"
        if gwjob.label == "customJob":
            default_type = PANDA_DEFAULT_PROCESSING_TYPE
        processing_type = nonpipetask["processingType"] if nonpipetask["processingType"] else default_type
    _, task_type = config.search("taskType", opt={"curvals": cvals, "default": PANDA_DEFAULT_TASK_TYPE})
    _, prod_source_label = config.search(
        "prodSourceLabel", opt={"curvals": cvals, "default": PANDA_DEFAULT_PROD_SOURCE_LABEL}
    )
    _, vo = config.search("vo", opt={"curvals": cvals, "default": PANDA_DEFAULT_VO})

    _, file_distribution_end_point = config.search(
        "fileDistributionEndPoint", opt={"curvals": cvals, "default": None}
    )

    _, file_distribution_end_point_default = config.search(
        "fileDistributionEndPointDefault", opt={"curvals": cvals, "default": None}
    )

    task_rss = gwjob.request_memory if gwjob.request_memory else PANDA_DEFAULT_RSS
    task_rss_retry_step = task_rss * gwjob.memory_multiplier if gwjob.memory_multiplier else 0
    task_rss_retry_offset = 0 if task_rss_retry_step else task_rss

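    # A worked example of the retry-memory arithmetic above (hypothetical
    # numbers): with request_memory=4096 and memory_multiplier=2, each retry
    # grows the RSS by task_rss_retry_step=8192 from an offset of 0; with no
    # multiplier, the step is 0 and the offset falls back to task_rss=4096.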

    # Assume input files are the same across the task.
    local_pfns = {}
    direct_io_files = set()

    if gwjob.executable.transfer_executable:
        local_pfns["job_executable"] = gwjob.executable.src_uri
        job_executable = f"./{os.path.basename(gwjob.executable.src_uri)}"
    else:
        job_executable = gwjob.executable.src_uri
    cmd_line_embedder = CommandLineEmbedder(config)
    _LOG.debug(
        "job %s inputs = %s, outputs = %s",
        gwjob.name,
        generic_workflow.get_job_inputs(gwjob.name),
        generic_workflow.get_job_outputs(gwjob.name),
    )

    job_env = ""
    if gwjob.environment:
        for key, value in gwjob.environment.items():
            try:
                sub_value = value.format_map(gwjob.cmdvals)
            except (KeyError, TypeError) as exc:
                _LOG.error("Could not replace command variables: replacement for %s not provided", str(exc))
                raise
            job_env += f"export {key}={sub_value}; "

    cmd_line, _ = cmd_line_embedder.substitute_command_line(
        job_env + job_executable + " " + gwjob.arguments,
        gwjob.cmdvals,
        gwjob.name,
        generic_workflow.get_job_inputs(gwjob.name) + generic_workflow.get_job_outputs(gwjob.name),
    )

    my_log = f"enable_event_service {enable_event_service} for {gwjob.label}"
    _LOG.info(my_log)
    if enable_event_service:
        if gwjob.request_walltime and max_wms_job_wall_time:
            my_log = (
                f"requestWalltime({gwjob.request_walltime}) "
                f"and maxWmsJobWalltime({max_wms_job_wall_time}) are set, "
                "max_payloads_per_panda_job is int(max_wms_job_wall_time / gwjob.request_walltime), "
                "ignore maxPayloadsPerPandaJob."
            )
            _LOG.info(my_log)
            max_payloads_per_panda_job = int(max_wms_job_wall_time / gwjob.request_walltime)
            if max_payloads_per_panda_job < 2:
                my_log = (
                    f"max_payloads_per_panda_job ({max_payloads_per_panda_job}) is too small, "
                    "disable EventService"
                )
                _LOG.info(my_log)
                enable_event_service = False

    maxwalltime = gwjob.request_walltime if gwjob.request_walltime else PANDA_DEFAULT_MAX_WALLTIME
    if enable_event_service:
        if gwjob.request_walltime and max_payloads_per_panda_job:
            maxwalltime = gwjob.request_walltime * max_payloads_per_panda_job
        elif max_wms_job_wall_time:
            maxwalltime = max_wms_job_wall_time

    if enable_event_service or enable_job_name_map:
        for es_name in order_id_map_files:
            local_pfns[es_name] = order_id_map_files[es_name]

    for gwfile in generic_workflow.get_job_inputs(gwjob.name, transfer_only=True):
        local_pfns[gwfile.name] = gwfile.src_uri
        if os.path.isdir(gwfile.src_uri):
            # This is needed to make the isdir function work
            # properly in the ButlerURL instance on the edge node.
            local_pfns[gwfile.name] += "/"

        if gwfile.job_access_remote:
            direct_io_files.add(gwfile.name)

    if qnode_map_filename:
        local_pfns.update(qnode_map_filename)

    submit_cmd = generic_workflow.run_attrs.get("bps_iscustom", False)

    if not direct_io_files:
        if submit_cmd:
            direct_io_files.add(remote_filename)
        else:
            direct_io_files.add("cmdlineplaceholder")

    lsst_temp = "LSST_RUN_TEMP_SPACE"
    if (
        file_distribution_end_point
        and lsst_temp in file_distribution_end_point
        and lsst_temp not in os.environ
    ):
        file_distribution_end_point = file_distribution_end_point_default
    if submit_cmd and not file_distribution_end_point:
        file_distribution_end_point = "FileDistribution"

    executable = add_decoder_prefix(
        config, cmd_line, file_distribution_end_point, (local_pfns, direct_io_files)
    )
    work = DomaPanDAWork(
        executable=executable,
        primary_input_collection={
            "scope": "pseudo_dataset",
            "name": f"pseudo_input_collection#{task_count}",
        },
        output_collections=[{"scope": "pseudo_dataset", "name": f"pseudo_output_collection#{task_count}"}],
        log_collections=[],
        dependency_map=[],
        task_name=f"{generic_workflow.name}_{task_count:02d}_{gwjob.label}_{task_chunk:02d}",
        task_queue=gwjob.queue,
        task_log={
            "destination": "local",
            "value": "log.tgz",
            "dataset": "PandaJob_#{pandaid}/",
            "token": "local",
            "param_type": "log",
            "type": "template",
        },
        encode_command_line=True,
        task_rss=task_rss,
        task_rss_retry_offset=task_rss_retry_offset,
        task_rss_retry_step=task_rss_retry_step,
        task_rss_max=gwjob.request_memory_max if gwjob.request_memory_max else PANDA_DEFAULT_RSS_MAX,
        task_cloud=gwjob.compute_cloud if gwjob.compute_cloud else PANDA_DEFAULT_CLOUD,
        task_site=site,
        task_priority=int(gwjob.priority) if gwjob.priority else PANDA_DEFAULT_PRIORITY,
        core_count=gwjob.request_cpus if gwjob.request_cpus else PANDA_DEFAULT_CORE_COUNT,
        working_group=gwjob.accounting_group,
        processing_type=processing_type,
        task_type=task_type,
        prodSourceLabel=prod_source_label,
        vo=vo,
        es=enable_event_service,
        es_label=es_label,
        max_events_per_job=max_payloads_per_panda_job,
        maxattempt=gwjob.number_of_retries if gwjob.number_of_retries else PANDA_DEFAULT_MAX_ATTEMPTS,
        maxwalltime=maxwalltime,
    )
    return work, local_pfns


def add_final_idds_work(
    config, generic_workflow, idds_client_workflow, dag_sink_work, task_count, task_chunk
):
    """Add the special final PanDA task to the client workflow.

    Parameters
    ----------
    config : `lsst.ctrl.bps.BpsConfig`
        BPS configuration.
    generic_workflow : `lsst.ctrl.bps.GenericWorkflow`
        Generic workflow in which to find the final job.
    idds_client_workflow : `idds.workflowv2.workflow.Workflow`
        The iDDS client representation of the workflow to which the final task
        is added.
    dag_sink_work : `list` [`idds.doma.workflowv2.domapandawork.DomaPanDAWork`]
        The work nodes in the client workflow which have no successors.
    task_count : `int`
        Count of PanDA tasks used when making unique names.
    task_chunk : `int`
        Count of chunk of a PanDA task used when making unique names.

    Returns
    -------
    files : `dict` [`str`, `str`]
        Files which need to be copied to a workflow staging area.

    Raises
    ------
    NotImplementedError
        Raised if the final job in the GenericWorkflow is itself a workflow.
    TypeError
        Raised if the final job in the GenericWorkflow is of an invalid type.
    """
    files = {}

    # If a final job exists in the generic workflow, create a DAG final job.
    final = generic_workflow.get_final()
    if final:
        if isinstance(final, GenericWorkflow):
            raise NotImplementedError("PanDA plugin does not support a workflow as the final job")

        if not isinstance(final, GenericWorkflowJob):
            raise TypeError(f"Invalid type for GenericWorkflow.get_final() results ({type(final)})")

        dag_final_work, files = _make_doma_work(
            config,
            generic_workflow,
            final,
            task_count,
            task_chunk,
        )
        pseudo_filename = "pure_pseudoinput+qgraphNodeId:+qgraphId:"
        dag_final_work.dependency_map.append(
            {"name": pseudo_filename, "submitted": False, "dependencies": []}
        )
        idds_client_workflow.add_work(dag_final_work)
        conditions = []
        for work in dag_sink_work:
            conditions.append(work.is_terminated)
        and_cond = AndCondition(conditions=conditions, true_works=[dag_final_work])
        idds_client_workflow.add_condition(and_cond)
    else:
        _LOG.debug("No final job in GenericWorkflow")
    return files


def convert_exec_string_to_hex(cmdline):
    """Convert the command line into its hex representation.

    This step is currently needed because large blocks of command lines,
    including special symbols, are passed to the pilot/container. Hex
    encoding guarantees a one-to-one match and lets the command line
    survive the special-symbol stripping performed by the Pilot.

    Parameters
    ----------
    cmdline : `str`
        UTF-8 command line string.

    Returns
    -------
    hex : `str`
        Hex representation of the string.
    """
    return binascii.hexlify(cmdline.encode()).decode("utf-8")

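# A doctest-style sketch of the encoding (round trip shown for clarity):
#
#     >>> convert_exec_string_to_hex("ls -l")
#     '6c73202d6c'
#     >>> import binascii
#     >>> binascii.unhexlify("6c73202d6c").decode()
#     'ls -l'
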

def add_decoder_prefix(config, cmd_line, distribution_path, files):
    """Compose the command line sent to the pilot from the functional part
    (the actual SW running) and the middleware part (containers invocation).

    Parameters
    ----------
    config : `lsst.ctrl.bps.BpsConfig`
        Configuration information.
    cmd_line : `str`
        UTF-8 based functional part of the command line.
    distribution_path : `str`
        URI of the path where all files are located for distribution.
    files : `tuple` [`dict` [`str`, `str`], `list` [`str`]]
        File names needed for a task (copied local, direct access).

    Returns
    -------
    decoder_prefix : `str`
        Full command line to be executed on the edge node.
    """
    # Manipulate file paths for placement on the cmdline.
    files_plc_hldr = {}
    for key, pfn in files[0].items():
        if pfn.endswith("/"):
            files_plc_hldr[key] = os.path.basename(pfn[:-1])
            isdir = True
        else:
            files_plc_hldr[key] = os.path.basename(pfn)
            _, extension = os.path.splitext(pfn)
            isdir = os.path.isdir(pfn) or (key == "butlerConfig" and extension != "yaml")
        if isdir:
            # This is needed to make the isdir function work
            # properly in the ButlerURL instance on the edge node.
            files_plc_hldr[key] += "/"
        _LOG.debug("files_plc_hldr[%s] = %s", key, files_plc_hldr[key])

    cmdline_hex = convert_exec_string_to_hex(cmd_line)
    _, runner_command = config.search("runnerCommand", opt={"replaceEnvVars": False, "expandEnvVars": False})
    order_id_map_filename = files[0].get("orderIdMapFilename", None)
    if order_id_map_filename:
        order_id_map_filename = os.path.basename(order_id_map_filename)
        order_id_map_filename = os.path.join(distribution_path, order_id_map_filename)
        runner_command = runner_command.replace("orderIdMapFilename", order_id_map_filename)
    runner_command = runner_command.replace("\n", " ")
    decoder_prefix = runner_command.replace(
        "_cmd_line_",
        str(cmdline_hex)
        + " ${IN/L} "
        + distribution_path
        + " "
        + "+".join(f"{k}:{v}" for k, v in files_plc_hldr.items())
        + " "
        + "+".join(files[1]),
    )
    return decoder_prefix

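# A sketch of the substitution performed above, under the assumption of a
# hypothetical runnerCommand (the real one comes from the BPS config):
#
#     runnerCommand: "bash decoder.sh _cmd_line_"
#     decoder_prefix: "bash decoder.sh <hex cmd_line> ${IN/L} <distribution_path>
#                      key1:file1+key2:dir2/ <direct-io names joined by '+'>"
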

def add_idds_work(config, generic_workflow, idds_workflow):
    """Convert GenericWorkflowJobs to iDDS work and add them to the iDDS
    workflow.

    Parameters
    ----------
    config : `lsst.ctrl.bps.BpsConfig`
        BPS configuration.
    generic_workflow : `lsst.ctrl.bps.GenericWorkflow`
        Generic workflow containing jobs to convert.
    idds_workflow : `idds.workflowv2.workflow.Workflow`
        The iDDS workflow to which the converted jobs should be added.

    Returns
    -------
    files_to_pre_stage : `dict` [`str`, `str`]
        Files that need to be copied to the staging area before submission.
    dag_sink_work : `list` [`idds.doma.workflowv2.domapandawork.DomaPanDAWork`]
        The work nodes in the client workflow which have no successors.
    task_count : `int`
        Number of tasks in the iDDS workflow used for unique task names.

    Raises
    ------
    RuntimeError
        Raised if dependency issues cannot be resolved after a second pass
        through the workflow.
    """
    # Event service.
    _, enable_event_service = config.search("enableEventService", opt={"default": None})
    _, enable_qnode_map = config.search("enableQnodeMap", opt={"default": None})
    _, max_payloads_per_panda_job = config.search(
        "maxPayloadsPerPandaJob", opt={"default": PANDA_DEFAULT_MAX_PAYLOADS_PER_PANDA_JOB}
    )
    _, max_wms_job_wall_time = config.search("maxWmsJobWalltime", opt={"default": None})
    my_log = (
        f"enableEventService: {enable_event_service}, maxPayloadsPerPandaJob: {max_payloads_per_panda_job}"
    )
    _LOG.info(my_log)

    # Job name map: use a short job name to map the long job name.
    _, enable_job_name_map = config.search("enableJobNameMap", opt={"default": None})
    _LOG.info(f"enable_job_name_map: {enable_job_name_map}, {type(enable_job_name_map)}")
    if enable_event_service and not enable_job_name_map:
        enable_job_name_map = True
        my_log = "enable_event_service is set, set enable_job_name_map True."
        _LOG.info(my_log)

    # Limit the number of jobs in a single PanDA task.
    _, max_jobs_per_task = config.search("maxJobsPerTask", opt={"default": PANDA_DEFAULT_MAX_JOBS_PER_TASK})

    files_to_pre_stage = {}
    dag_sink_work = []  # Workflow sink nodes that need to be connected to the final task.
    job_to_task = {}
    job_to_pseudo_filename = {}
    task_count = 0  # Task number/ID in the idds workflow used for unique names.
    remote_archive_filename = None

    submit_path = config["submitPath"]

    submit_cmd = generic_workflow.run_attrs.get("bps_iscustom", False)
    if submit_cmd:
        files = generic_workflow.get_executables(data=False, transfer_only=True)
        archive_filename = f"jobO.{uuid.uuid4()}.tar.gz"
        archive_filename = create_archive_file(submit_path, archive_filename, files)
        remote_archive_filename = copy_files_to_pandacache(archive_filename)

    order_id_map_files = {}
    name_works = {}
    order_id_map = {}
    job_name_to_order_id_map = {}
    order_id_map_file = None
    max_payloads_per_panda_job_by_label = {}
    if enable_event_service:
        enable_event_service = enable_event_service.split(",")
        enable_event_service_tmp = []
        for es_def in enable_event_service:
            if ":" in es_def:
                es_label, m_payloads = es_def.split(":")
            else:
                es_label, m_payloads = es_def, max_payloads_per_panda_job
            es_label = es_label.strip()
            enable_event_service_tmp.append(es_label)
            max_payloads_per_panda_job_by_label[es_label] = int(m_payloads)
        enable_event_service = enable_event_service_tmp
    if enable_job_name_map:
        _, order_id_map_filename = config.search(
            "orderIdMapFilename", opt={"default": PANDA_DEFAULT_ORDER_ID_MAP_FILE}
        )
        order_id_map_file = os.path.join(submit_path, order_id_map_filename)
        order_id_map_files = {"orderIdMapFilename": order_id_map_file}
        files_to_pre_stage.update(order_id_map_files)

    # To avoid dying due to optimizing the number of times through the
    # workflow, catch dependency issues to loop through again later.
    jobs_with_dependency_issues = {}

    # Initialize the quantum node map.
    qnode_map = {}
    qnode_map_filename = None
    if enable_qnode_map:
        qnode_map_file = os.path.join(submit_path, "qnode_map.json")
        qnode_map_filename = {"qnodemap": qnode_map_file}
        files_to_pre_stage.update(qnode_map_filename)

    # Assume jobs with the same label share config values.
    for job_label in generic_workflow.labels:
        _LOG.debug("job_label = %s", job_label)

        if enable_job_name_map:
            order_id_map[job_label] = {}
            job_name_to_order_id_map[job_label] = {}

        # Add each job with a particular label to a corresponding PanDA task.
        # A PanDA task has a limit on the number of jobs, so break into
        # multiple PanDA tasks if needed.
        job_count = 0  # Number of jobs in the idds task, used for task chunking.
        task_chunk = 1  # Task chunk number within job label used for unique name.
        work = None
        order_id = -1

        # Instead of changing code to make chunks up front and round-robin
        # assign jobs to chunks, for now keep chunk creation in the loop
        # but use knowledge of how many chunks there will be to set a better
        # maximum number of jobs in a chunk for more even distribution
        # (worked example below).
        jobs_by_label = generic_workflow.get_jobs_by_label(job_label)
        num_chunks = -(-len(jobs_by_label) // max_jobs_per_task)  # ceil
        max_jobs_per_task_this_label = -(-len(jobs_by_label) // num_chunks)
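        # A worked example of the even-chunking arithmetic above
        # (hypothetical numbers): with 250 jobs and max_jobs_per_task=100,
        # num_chunks = -(-250 // 100) = 3 and
        # max_jobs_per_task_this_label = -(-250 // 3) = 84,
        # so the chunks hold 84 + 84 + 82 jobs instead of 100 + 100 + 50.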

        _LOG.debug(
            "For job_label = %s, num jobs = %s, num_chunks = %s, max_jobs = %s",
            job_label,
            len(jobs_by_label),
            num_chunks,
            max_jobs_per_task_this_label,
        )
        for gwjob in jobs_by_label:
            order_id += 1
            pseudo_filename = _make_pseudo_filename(config, gwjob)
            job_to_pseudo_filename[gwjob.name] = pseudo_filename
            if enable_job_name_map:
                order_id_map[job_label][str(order_id)] = pseudo_filename
                job_name_to_order_id_map[job_label][gwjob.name] = str(order_id)

            job_count += 1
            if job_count > max_jobs_per_task_this_label:
                job_count = 1
                task_chunk += 1

            if job_count == 1:
                # Create a new PanDA task object.
                task_count += 1
                work_enable_event_service = False
                if enable_event_service and job_label in enable_event_service:
                    work_enable_event_service = True
                max_payloads_per_panda_job_current = max_payloads_per_panda_job_by_label.get(
                    job_label, max_payloads_per_panda_job
                )
                work, files = _make_doma_work(
                    config,
                    generic_workflow,
                    gwjob,
                    task_count,
                    task_chunk,
                    enable_event_service=work_enable_event_service,
                    enable_job_name_map=enable_job_name_map,
                    order_id_map_files=order_id_map_files,
                    es_label=job_label,
                    max_payloads_per_panda_job=max_payloads_per_panda_job_current,
                    max_wms_job_wall_time=max_wms_job_wall_time,
                    remote_filename=remote_archive_filename,
                    qnode_map_filename=qnode_map_filename,
                )
                work.dependency_tasks = []
                name_works[work.task_name] = work
                files_to_pre_stage.update(files)
                idds_workflow.add_work(work)
                if generic_workflow.out_degree(gwjob.name) == 0:
                    dag_sink_work.append(work)

            if enable_qnode_map:
                job_name_PH = "PH:" + gwjob.name
                job_to_pseudo_filename[gwjob.name] = job_name_PH
                qnode_map[job_name_PH] = pseudo_filename

            job_to_task[gwjob.name] = work.get_work_name()
            deps = []
            missing_deps = False
            for parent_job_name in generic_workflow.predecessors(gwjob.name):
                if parent_job_name not in job_to_task:
                    _LOG.debug("job_to_task.keys() = %s", job_to_task.keys())
                    missing_deps = True
                    break
                else:
                    if enable_job_name_map:
                        parent_job = generic_workflow.get_job(parent_job_name)
                        parent_job_label = parent_job.label
                        parent_order_id = job_name_to_order_id_map[parent_job_label][parent_job_name]
                        inputname = f"{parent_job_label}:orderIdMap_{parent_order_id}"
                    else:
                        inputname = job_to_pseudo_filename[parent_job_name]

                    parent_task_name = job_to_task[parent_job_name]
                    deps.append(
                        {
                            "task": parent_task_name,
                            "inputname": inputname,
                        }
                    )
                    if parent_task_name not in work.dependency_tasks:
                        work.dependency_tasks.append(parent_task_name)
            if not missing_deps:
                j_name = job_to_pseudo_filename[gwjob.name]
                f_name = f"{job_label}:orderIdMap_{order_id}" if enable_job_name_map else j_name
                work.dependency_map.append(
                    {
                        "name": f_name,
                        "order_id": order_id,
                        "dependencies": deps,
                    }
                )
            else:
                jobs_with_dependency_issues[gwjob.name] = {
                    "work": work,
                    "order_id": order_id,
                    "label": job_label,
                }

    if enable_qnode_map:
        with open(qnode_map_file, "w", encoding="utf-8") as f:
            json.dump(qnode_map, f, indent=2)

    # If there were any issues figuring out dependencies in the earlier loop,
    # try a second pass now that all tasks are known.
    if jobs_with_dependency_issues:
        _LOG.warning("Could not prepare workflow in single pass. Please notify developers.")
        _LOG.info("Trying to recover...")
        for job_name, work_item in jobs_with_dependency_issues.items():
            deps = []
            work = work_item["work"]
            order_id = work_item["order_id"]
            job_label = work_item["label"]

            for parent_job_name in generic_workflow.predecessors(job_name):
                if parent_job_name not in job_to_task:
                    _LOG.debug("job_to_task.keys() = %s", job_to_task.keys())
                    raise RuntimeError(
                        f"Could not recover from dependency issues ({job_name} missing {parent_job_name})."
                    )
                if enable_job_name_map:
                    parent_job = generic_workflow.get_job(parent_job_name)
                    parent_job_label = parent_job.label
                    parent_order_id = job_name_to_order_id_map[parent_job_label][parent_job_name]
                    inputname = f"{parent_job_label}:orderIdMap_{parent_order_id}"
                else:
                    inputname = job_to_pseudo_filename[parent_job_name]

                parent_task_name = job_to_task[parent_job_name]
                deps.append(
                    {
                        "task": parent_task_name,
                        "inputname": inputname,
                    }
                )
                if parent_task_name not in work.dependency_tasks:
                    work.dependency_tasks.append(parent_task_name)

            work.dependency_map.append(
                {
                    "name": f"{job_label}:orderIdMap_{order_id}" if enable_job_name_map else job_name,
                    "order_id": order_id,
                    "dependencies": deps,
                }
            )

        _LOG.info("Successfully recovered.")

    for task_name in name_works:
        work = name_works[task_name]
        # Trigger the setter function which will validate the dependency_map:
        # 1) check the name length to avoid names that are too long,
        # 2) check to avoid duplicated items.
        sorted_dep_map = sorted(work.dependency_map, key=lambda x: x["order_id"])
        work.dependency_map = sorted_dep_map

    if enable_job_name_map:
        with open(order_id_map_file, "w") as f:
            json.dump(order_id_map, f)

    return files_to_pre_stage, dag_sink_work, task_count


def create_archive_file(submit_path, archive_filename, files):
    """Create a gzipped tar archive of the given files.

    Parameters
    ----------
    submit_path : `str`
        Directory in which to create the archive when ``archive_filename``
        is not an absolute path.
    archive_filename : `str`
        Name of the archive file to create.
    files : `list` [`str`]
        Local files to add to the archive (stored by basename).

    Returns
    -------
    archive_filename : `str`
        Full path of the created archive file.
    """
    if not archive_filename.startswith("/"):
        archive_filename = os.path.join(submit_path, archive_filename)

    with tarfile.open(archive_filename, "w:gz", dereference=True) as tar:
        for local_file in files:
            base_name = os.path.basename(local_file)
            tar.add(local_file, arcname=base_name)
    return archive_filename


def copy_files_to_pandacache(filename):
    """Upload a file to the PanDA cache, retrying on failure.

    Parameters
    ----------
    filename : `str`
        Full path of the local file to upload.

    Returns
    -------
    filename : `str` or `None`
        URL of the file in the PanDA cache, or `None` if the upload failed.
    """
    from pandaclient import Client

    attempt = 0
    max_attempts = 3
    done = False
    while attempt < max_attempts and not done:
        attempt += 1
        status, out = Client.putFile(filename, True)
        if status == 0:
            done = True
    print(f"copy_files_to_pandacache: status: {status}, out: {out}")
    if out.startswith("NewFileName:"):
        # Found the same input sandbox to reuse.
        filename = out.split(":")[-1]
    elif out != "True":
        print(out)
        return None

    filename = os.path.basename(filename)
    cache_path = os.path.join(os.environ["PANDACACHE_URL"], "cache")
    filename = os.path.join(cache_path, filename)
    return filename


def download_extract_archive(filename, prefix=None):
    """Download and extract the tarball from pandacache.

    Parameters
    ----------
    filename : `str`
        The filename to download.
    prefix : `str`, optional
        The target directory the tarball will be downloaded and extracted to.
        If None (default), the current directory will be used.
    """
    archive_basename = os.path.basename(filename)
    target_dir = prefix if prefix is not None else os.getcwd()
    full_output_filename = os.path.join(target_dir, archive_basename)

    if filename.startswith("https:"):
        panda_cache_url = os.path.dirname(os.path.dirname(filename))
        os.environ["PANDACACHE_URL"] = panda_cache_url
    elif "PANDACACHE_URL" not in os.environ and "PANDA_URL_SSL" in os.environ:
        os.environ["PANDACACHE_URL"] = os.environ["PANDA_URL_SSL"]
    panda_cache_url = os.environ.get("PANDACACHE_URL", None)
    print(f"PANDACACHE_URL: {panda_cache_url}")

    # The import of the PanDA client must happen *after* PANDACACHE_URL is
    # set. Otherwise, the PanDA client will not parse the environment setting.
    from pandaclient import Client

    attempt = 0
    max_attempts = 3
    while attempt < max_attempts:
        status, output = Client.getFile(archive_basename, output_path=full_output_filename)
        if status == 0:
            break
        attempt += 1
        if attempt <= 1:
            secs = random.randint(1, 10)
        elif attempt <= 2:
            secs = random.randint(1, 60)
        else:
            secs = random.randint(1, 120)
        time.sleep(secs)
    print(f"Download archive file from pandacache status: {status}, output: {output}")
    if status != 0:
        raise RuntimeError("Failed to download archive file from pandacache")
    with tarfile.open(full_output_filename, "r:gz") as f:
        f.extractall(target_dir)
    print(f"Extracted {full_output_filename} to {target_dir}")
    os.remove(full_output_filename)
    print(f"Removed {full_output_filename}")


def get_task_parameter(config, remote_build, key):
    """Get a task parameter, preferring the remote build section over the
    general BPS configuration.

    Parameters
    ----------
    config : `lsst.ctrl.bps.BpsConfig`
        BPS configuration.
    remote_build : `lsst.ctrl.bps.BpsConfig`
        Remote build section of the BPS configuration.
    key : `str`
        Name of the parameter to look up.

    Returns
    -------
    value : `~typing.Any`
        The parameter value, or an empty value if not found.
    """
    search_opt = {"replaceVars": True, "expandEnvVars": False, "replaceEnvVars": False, "required": False}
    _, value = remote_build.search(key, search_opt)
    if not value:
        _, value = config.search(key, search_opt)
    return value


def create_idds_build_workflow(**kwargs):
    """Create an iDDS workflow with a single task that builds the BPS
    workflow remotely.

    Parameters
    ----------
    **kwargs
        Keyword arguments, in particular ``config``
        (`lsst.ctrl.bps.BpsConfig`), ``remote_build`` (the remote build
        section of the configuration), ``config_file`` (`str`), and
        ``compute_site`` (`str`).

    Returns
    -------
    workflow : `idds.workflowv2.workflow.Workflow`
        The iDDS client workflow containing the build task.
    """
    config = kwargs.get("config")
    remote_build = kwargs.get("remote_build")
    config_file = kwargs.get("config_file")
    config_file_base = os.path.basename(config_file) if config_file else None
    compute_site = kwargs.get("compute_site")
    _, files = remote_build.search("files", opt={"default": []})
    submit_path = config["submitPath"]
    files.append(config_file)
    archive_filename = f"jobO.{uuid.uuid4()}.tar.gz"
    archive_filename = create_archive_file(submit_path, archive_filename, files)
    _LOG.info(f"archive file name: {archive_filename}")
    remote_filename = copy_files_to_pandacache(archive_filename)
    _LOG.info(f"pandacache file: {remote_filename}")

    _LOG.info(type(remote_build))
    search_opt = {"replaceVars": True, "expandEnvVars": False, "replaceEnvVars": False, "required": False}
    cvals = {"LSST_VERSION": get_task_parameter(config, remote_build, "LSST_VERSION")}
    cvals["custom_lsst_setup"] = get_task_parameter(config, remote_build, "custom_lsst_setup")
    max_name_length = PANDA_DEFAULT_NAME_LENGTH
    if "IDDS_MAX_NAME_LENGTH" in os.environ:
        max_name_length = int(os.environ["IDDS_MAX_NAME_LENGTH"])
    cvals["IDDS_MAX_NAME_LENGTH"] = max_name_length
    search_opt["curvals"] = cvals
    _, executable = remote_build.search("runnerCommand", opt=search_opt)
    executable = executable.replace("_download_cmd_line_", remote_filename)
    executable = executable.replace("_build_cmd_line_", config_file_base)
    executable = executable.replace("_compute_site_", compute_site or "")

    task_cloud = get_task_parameter(config, remote_build, "computeCloud")
    task_site = get_task_parameter(config, remote_build, "computeSite")
    task_queue = get_task_parameter(config, remote_build, "queue")
    task_rss = get_task_parameter(config, remote_build, "requestMemory")
    task_rss_max = get_task_parameter(config, remote_build, "requestMemoryMax")
    memory_multiplier = get_task_parameter(config, remote_build, "memoryMultiplier")
    task_rss_retry_step = task_rss * memory_multiplier if memory_multiplier else 0
    task_rss_retry_offset = 0 if task_rss_retry_step else task_rss
    nretries = get_task_parameter(config, remote_build, "numberOfRetries")
    processing_type = get_task_parameter(config, remote_build, "processingType")
    priority = get_task_parameter(config, remote_build, "priority")
    _LOG.info("requestMemory: %s", task_rss)
    _LOG.info("Site: %s", task_site)
    # _LOG.info("executable: %s", executable)
    # TODO: fill other parameters based on config.
    build_work = DomaPanDAWork(
        executable=executable,
        task_type="lsst_build",
        primary_input_collection={"scope": "pseudo_dataset", "name": "pseudo_input_collection#1"},
        output_collections=[{"scope": "pseudo_dataset", "name": "pseudo_output_collection#1"}],
        log_collections=[],
        dependency_map=None,
        task_name="build_task",
        task_queue=task_queue,
        encode_command_line=True,
        prodSourceLabel="managed",
        processing_type=processing_type,
        task_log={
            "dataset": "PandaJob_#{pandaid}/",
            "destination": "local",
            "param_type": "log",
            "token": "local",
            "type": "template",
            "value": "log.tgz",
        },
        task_rss=task_rss if task_rss else PANDA_DEFAULT_RSS,
        task_rss_max=task_rss_max if task_rss_max else PANDA_DEFAULT_RSS_MAX,
        task_rss_retry_offset=task_rss_retry_offset,
        task_rss_retry_step=task_rss_retry_step,
        task_cloud=task_cloud,
        task_site=task_site,
        task_priority=int(priority) if priority else PANDA_DEFAULT_PRIORITY,
        maxattempt=nretries if nretries and nretries > 0 else PANDA_DEFAULT_MAX_ATTEMPTS,
    )

    workflow = IDDS_client_workflow()

    workflow.add_work(build_work)
    workflow.name = config["bps_defined"]["uniqProcName"]
    return workflow