# This file is part of ctrl_bps_panda.
#
# Developed for the LSST Data Management System.
# This product includes software developed by the LSST Project
# (https://www.lsst.org).
# See the COPYRIGHT file at the top-level directory of this distribution
# for details of code ownership.
#
# This software is dual licensed under the GNU General Public License and also
# under a 3-clause BSD license. Recipients may choose which of these licenses
# to use; please see the files gpl-3.0.txt and/or bsd_license.txt,
# respectively. If you choose the GPL option then the following text applies
# (but note that there is still no warranty even if you opt for BSD instead):
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <https://www.gnu.org/licenses/>.

"""Utilities for bps PanDA plugin."""

__all__ = [
    "add_decoder_prefix",
    "aggregate_by_basename",
    "convert_exec_string_to_hex",
    "copy_files_for_distribution",
    "extract_taskname",
    "get_idds_client",
    "get_idds_result",
    "idds_call_with_check",
]

import binascii
import concurrent.futures
import json
import logging
import os
import random
import re
import tarfile
import time
import uuid

import idds.common.utils as idds_utils
import pandaclient.idds_api
from idds.doma.workflowv2.domapandawork import DomaPanDAWork
from idds.workflowv2.workflow import AndCondition
from idds.workflowv2.workflow import Workflow as IDDS_client_workflow

from lsst.ctrl.bps import BpsConfig, GenericWorkflow, GenericWorkflowJob, WmsStates
from lsst.ctrl.bps.panda.cmd_line_embedder import CommandLineEmbedder
from lsst.ctrl.bps.panda.constants import (
    PANDA_DEFAULT_CLOUD,
    PANDA_DEFAULT_CORE_COUNT,
    PANDA_DEFAULT_MAX_ATTEMPTS,
    PANDA_DEFAULT_MAX_JOBS_PER_TASK,
    PANDA_DEFAULT_MAX_PAYLOADS_PER_PANDA_JOB,
    PANDA_DEFAULT_MAX_WALLTIME,
    PANDA_DEFAULT_NAME_LENGTH,
    PANDA_DEFAULT_ORDER_ID_MAP_FILE,
    PANDA_DEFAULT_PRIORITY,
    PANDA_DEFAULT_PROCESSING_TYPE,
    PANDA_DEFAULT_PROD_SOURCE_LABEL,
    PANDA_DEFAULT_RSS,
    PANDA_DEFAULT_RSS_MAX,
    PANDA_DEFAULT_TASK_TYPE,
    PANDA_DEFAULT_VO,
)
from lsst.resources import ResourcePath

_LOG = logging.getLogger(__name__)


def extract_taskname(s: str) -> str:
    """Extract the task name from a string that follows the pattern
    CampaignName_timestamp_TaskNumber_TaskLabel_ChunkNumber.

    Parameters
    ----------
    s : `str`
        The input string from which to extract the task name.

    Returns
    -------
    taskname : `str`
        The portion of the string following the last ``_<digits>_``
        pattern, or the full input string if no such pattern exists.
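
    Examples
    --------
    A minimal sketch with a made-up task name (the timestamp, label,
    and chunk number are illustrative only):

    >>> extract_taskname("shared_panda_20250101T120000Z_03_calibrate_01")
    'calibrate_01'
    >>> extract_taskname("name-without-pattern")
    'name-without-pattern'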
    """
    # Remove surrounding quotes/spaces if present.
    s = s.strip().strip("'\"")

    # Find all occurrences of underscore + digits + underscore;
    # take the last one.
    matches = re.findall(r"_(\d+)_", s)
    if matches:
        last_number = matches[-1]
        last_pos = s.rfind(f"_{last_number}_") + len(f"_{last_number}_")
        taskname = s[last_pos:]
        return taskname

    # Fallback: if no such pattern, return everything.
    taskname = s
    return taskname


def aggregate_by_basename(job_summary, exit_code_summary, run_summary):
    """Aggregate the job, exit-code, and run summaries by
    their base label (basename).

    Parameters
    ----------
    job_summary : `dict` [`str`, `dict` [`str`, `int`]]
        A mapping of job labels to state-count mappings.
    exit_code_summary : `dict` [`str`, `list` [`int`]]
        A mapping of job labels to lists of exit codes.
    run_summary : `str`
        A semicolon-separated string of job summaries
        where each entry has the format "<label>:<count>".

    Returns
    -------
    aggregated_jobs : `dict` [`str`, `dict` [`str`, `int`]]
        A dictionary mapping each base label to the summed job state counts
        across all matching labels.
    aggregated_exits : `dict` [`str`, `list` [`int`]]
        A dictionary mapping each base label to a combined list of exit codes
        from all matching labels.
    aggregated_run : `str`
        A semicolon-separated string with aggregated job counts by base label.
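
    Examples
    --------
    A minimal sketch with made-up labels, counts, and exit codes:

    >>> _, exits, run = aggregate_by_basename(
    ...     {}, {"isr_1": [1], "isr_2": [0, 2]}, "isr_1:10;isr_2:5"
    ... )
    >>> exits
    {'isr': [1, 0, 2]}
    >>> run
    'isr:15'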
    """

    def base_label(label):
        return re.sub(r"_\d+$", "", label)

    aggregated_jobs = {}
    aggregated_exits = {}

    for label, states in job_summary.items():
        base = base_label(label)
        if base not in aggregated_jobs:
            aggregated_jobs[base] = dict.fromkeys(WmsStates, 0)
        for state, count in states.items():
            aggregated_jobs[base][state] += count

    for label, codes in exit_code_summary.items():
        base = base_label(label)
        aggregated_exits.setdefault(base, []).extend(codes)

    aggregated = {}
    for entry in run_summary.split(";"):
        entry = entry.strip()
        if not entry:
            continue
        try:
            label, num = entry.split(":")
            num = int(num)
        except ValueError:
            continue

        base = base_label(label)
        aggregated[base] = aggregated.get(base, 0) + num

    aggregated_run = ";".join(f"{base}:{count}" for base, count in aggregated.items())
    return aggregated_jobs, aggregated_exits, aggregated_run


def copy_files_for_distribution(files_to_stage, file_distribution_uri, max_copy_workers):
    """Bring locally generated files into the cloud for later
    use on the edge nodes.

    Parameters
    ----------
    files_to_stage : `dict` [`str`, `str`]
        Files which need to be copied to a workflow staging area.
    file_distribution_uri : `ResourcePath`
        Path on the storage accessible from the edge nodes, including the
        access protocol and bucket name, where files are placed.
    max_copy_workers : `int`
        Maximum number of workers for copying files.

    Raises
    ------
    RuntimeError
        Raised when there is an error copying files to the distribution
        point.
    """
    files_to_copy = {}

    # In case there are folders, we iterate over their contents.
    for local_pfn in files_to_stage.values():
        folder_name = os.path.basename(os.path.normpath(local_pfn))
        if os.path.isdir(local_pfn):
            folder_uri = file_distribution_uri.join(folder_name, forceDirectory=True)
            files_in_folder = ResourcePath.findFileResources([local_pfn])
            for file in files_in_folder:
                file_name = file.basename()
                files_to_copy[file] = folder_uri.join(file_name, forceDirectory=False)
        else:
            folder_uri = file_distribution_uri.join(folder_name, forceDirectory=False)
            files_to_copy[ResourcePath(local_pfn, forceDirectory=False)] = folder_uri

    copy_executor = concurrent.futures.ThreadPoolExecutor(max_workers=max_copy_workers)
    future_file_copy = []
    for src, trgt in files_to_copy.items():
        _LOG.debug("Staging %s to %s", src, trgt)
        # Instantiate the S3 client explicitly here (via exists) to work
        # around boto3 client thread-safety issues; see
        # https://stackoverflow.com/questions/52820971/is-boto3-client-thread-safe
        trgt.exists()
        future_file_copy.append(copy_executor.submit(trgt.transfer_from, src, transfer="copy"))

    for future in concurrent.futures.as_completed(future_file_copy):
        if future.result() is not None:
            raise RuntimeError("Error placing files at the distribution point")


def get_idds_client(config):
    """Get the iDDS client.

    Parameters
    ----------
    config : `lsst.ctrl.bps.BpsConfig`
        BPS configuration.

    Returns
    -------
    idds_client : `idds.client.clientmanager.ClientManager`
        The iDDS ClientManager object.
    """
    idds_server = None
    if isinstance(config, BpsConfig):
        _, idds_server = config.search("iddsServer", opt={"default": None})
    elif isinstance(config, dict) and "iddsServer" in config:
        idds_server = config["iddsServer"]
    # If idds_server is None, a default value on the PanDA relay service
    # will be used.
    idds_client = pandaclient.idds_api.get_api(
        idds_utils.json_dumps, idds_host=idds_server, compress=True, manager=True
    )
    return idds_client


def get_idds_result(ret):
    """Parse the results returned from iDDS.

    Parameters
    ----------
    ret : `tuple` [`int`, `tuple` [`bool`, payload]]
        ``ret[0]`` is the status of the PanDA relay service.
        ``ret[1][0]`` is the status of the iDDS service.
        ``ret[1][1]`` is the returned payload.
        If ``ret[1][0]`` is `False`, ``ret[1][1]`` can be error messages.

    Returns
    -------
    status : `bool`
        The status of the iDDS call.
    result : `int` or `list` or `dict` or `None`
        The result returned from iDDS. `None` if error state.
    error : `str` or `None`
        Error messages. `None` if no error state.
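
    Examples
    --------
    A minimal sketch with a made-up payload:

    >>> get_idds_result((0, (True, {"requests": 1})))
    (True, {'requests': 1}, None)
    >>> status, result, error = get_idds_result((-1, None))
    >>> status
    False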
    """
    # https://panda-wms.readthedocs.io/en/latest/client/rest_idds.html
    if not isinstance(ret, list | tuple) or ret[0] != 0:
        # Something wrong with the PanDA relay service.
        # The call may not have been delivered to iDDS.
        status = False
        result = None
        error = f"PanDA relay service returns errors: {ret}"
    else:
        if ret[1][0]:
            status = True
            result = ret[1][1]
            error = None
            if isinstance(result, str) and "Authentication no permission" in result:
                status = False
                error = result
                result = None
        else:
            # iDDS returns errors
            status = False
            result = None
            error = f"iDDS returns errors: {ret[1][1]}"
    return status, result, error


def idds_call_with_check(func, *, func_name: str, request_id: int, **kwargs):
    """Call an iDDS client function, log, and check the return code.

    Parameters
    ----------
    func : `~collections.abc.Callable`
        The iDDS client function to call.
    func_name : `str`
        Name used for logging.
    request_id : `int`
        The request or workflow ID.
    **kwargs
        Additional keyword arguments passed to the function.

    Returns
    -------
    ret : `~typing.Any`
        The return value from the iDDS client function.
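
    Examples
    --------
    A minimal sketch using a stand-in callable in place of a real iDDS
    client method:

    >>> idds_call_with_check(lambda request_id: (0, "ok"), func_name="demo", request_id=42)
    (0, 'ok')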
    """
    call_kwargs = dict(kwargs)
    if request_id is not None:
        call_kwargs["request_id"] = request_id

    ret = func(**call_kwargs)

    _LOG.debug("PanDA %s returned = %s", func_name, str(ret))

    request_status = ret[0]
    if request_status != 0:
        raise RuntimeError(f"Error calling {func_name}: {ret} for id: {request_id}")

    return ret


def _make_pseudo_filename(config, gwjob):
    """Make the job pseudo filename.

    Parameters
    ----------
    config : `lsst.ctrl.bps.BpsConfig`
        BPS configuration.
    gwjob : `lsst.ctrl.bps.GenericWorkflowJob`
        Job for which to create the pseudo filename.

    Returns
    -------
    pseudo_filename : `str`
        The pseudo filename for the given job.
    """
    cmd_line_embedder = CommandLineEmbedder(config)
    _, pseudo_filename = cmd_line_embedder.substitute_command_line(
        gwjob.executable.src_uri + " " + gwjob.arguments, gwjob.cmdvals, gwjob.name, []
    )
    return pseudo_filename


def _make_doma_work(
    config,
    generic_workflow,
    gwjob,
    task_count,
    task_chunk,
    enable_event_service=False,
    enable_job_name_map=False,
    order_id_map_files=None,
    es_label=None,
    max_payloads_per_panda_job=PANDA_DEFAULT_MAX_PAYLOADS_PER_PANDA_JOB,
    max_wms_job_wall_time=None,
    remote_filename=None,
    qnode_map_filename=None,
):
    """Make the DOMA Work object for a PanDA task.

    Parameters
    ----------
    config : `lsst.ctrl.bps.BpsConfig`
        BPS configuration.
    generic_workflow : `lsst.ctrl.bps.GenericWorkflow`
        The workflow.
    gwjob : `lsst.ctrl.bps.GenericWorkflowJob`
        Job representing the jobs for the PanDA task.
    task_count : `int`
        Count of PanDA tasks used when making unique names.
    task_chunk : `int`
        Chunk number within a PanDA task used when making unique names.
    enable_event_service : `bool`, optional
        Whether to run the task with PanDA Event Service, bundling
        multiple payloads into a single PanDA job.
    enable_job_name_map : `bool`, optional
        Whether to map long job names to short order-id based names.
    order_id_map_files : `dict` [`str`, `str`], optional
        Order-id map files to stage, keyed by file label.
    es_label : `str`, optional
        Job label used for the Event Service task.
    max_payloads_per_panda_job : `int`, optional
        Maximum number of payloads bundled into a single PanDA job when
        Event Service is enabled.
    max_wms_job_wall_time : `int`, optional
        Maximum wall time for a WMS job; used together with the job's
        requested walltime to compute the number of payloads per job.
    remote_filename : `str`, optional
        Filename of the remote archive in pandacache used for custom job
        submission.
    qnode_map_filename : `dict` [`str`, `str`], optional
        Quantum-node map file to stage, keyed by file label.

    Returns
    -------
    work : `idds.doma.workflowv2.domapandawork.DomaPanDAWork`
        The client representation of a PanDA task.
    local_pfns : `dict` [`str`, `str`]
        Files which need to be copied to a workflow staging area.
    """
    if order_id_map_files is None:
        order_id_map_files = {}
    _LOG.debug("Using gwjob %s to create new PanDA task (gwjob=%s)", gwjob.name, gwjob)
    cvals = {"curr_cluster": gwjob.label}
    _, site = config.search("computeSite", opt={"curvals": cvals, "required": True})
    cvals["curr_site"] = site
    cvals["curr_pipetask"] = gwjob.label
    _, processing_type = config.search(
        "processingType", opt={"curvals": cvals, "default": PANDA_DEFAULT_PROCESSING_TYPE}
    )
    if gwjob.label in ["finalJob", "customJob"]:
        _, nonpipetask = config.search(gwjob.label)
        default_type = "Rubin_Merge"
        if gwjob.label == "customJob":
            default_type = PANDA_DEFAULT_PROCESSING_TYPE
        processing_type = nonpipetask["processingType"] if nonpipetask["processingType"] else default_type
    _, task_type = config.search("taskType", opt={"curvals": cvals, "default": PANDA_DEFAULT_TASK_TYPE})
    _, prod_source_label = config.search(
        "prodSourceLabel", opt={"curvals": cvals, "default": PANDA_DEFAULT_PROD_SOURCE_LABEL}
    )
    _, vo = config.search("vo", opt={"curvals": cvals, "default": PANDA_DEFAULT_VO})

    _, file_distribution_end_point = config.search(
        "fileDistributionEndPoint", opt={"curvals": cvals, "default": None}
    )

    _, file_distribution_end_point_default = config.search(
        "fileDistributionEndPointDefault", opt={"curvals": cvals, "default": None}
    )

    task_rss = gwjob.request_memory if gwjob.request_memory else PANDA_DEFAULT_RSS
    task_rss_retry_step = task_rss * gwjob.memory_multiplier if gwjob.memory_multiplier else 0
    task_rss_retry_offset = 0 if task_rss_retry_step else task_rss

    # Assume input files are the same across the task.
    local_pfns = {}
    direct_io_files = set()

    if gwjob.executable.transfer_executable:
        local_pfns["job_executable"] = gwjob.executable.src_uri
        job_executable = f"./{os.path.basename(gwjob.executable.src_uri)}"
    else:
        job_executable = gwjob.executable.src_uri
    cmd_line_embedder = CommandLineEmbedder(config)
    _LOG.debug(
        "job %s inputs = %s, outputs = %s",
        gwjob.name,
        generic_workflow.get_job_inputs(gwjob.name),
        generic_workflow.get_job_outputs(gwjob.name),
    )

    job_env = ""
    if gwjob.environment:
        for key, value in gwjob.environment.items():
            try:
                sub_value = value.format_map(gwjob.cmdvals)
            except (KeyError, TypeError) as exc:
                _LOG.error("Could not replace command variables: replacement for %s not provided", str(exc))
                raise
            job_env += f"export {key}={sub_value}; "

    cmd_line, _ = cmd_line_embedder.substitute_command_line(
        job_env + job_executable + " " + gwjob.arguments,
        gwjob.cmdvals,
        gwjob.name,
        generic_workflow.get_job_inputs(gwjob.name) + generic_workflow.get_job_outputs(gwjob.name),
    )

    my_log = f"enable_event_service {enable_event_service} for {gwjob.label}"
    _LOG.info(my_log)
    if enable_event_service:
        if gwjob.request_walltime and max_wms_job_wall_time:
            my_log = (
                f"requestWalltime({gwjob.request_walltime}) "
                f"and maxWmsJobWalltime({max_wms_job_wall_time}) are set; "
                "max_payloads_per_panda_job is int(max_wms_job_wall_time / gwjob.request_walltime), "
                "ignoring maxPayloadsPerPandaJob."
            )
            _LOG.info(my_log)
            max_payloads_per_panda_job = int(max_wms_job_wall_time / gwjob.request_walltime)
        if max_payloads_per_panda_job < 2:
            my_log = (
                f"max_payloads_per_panda_job ({max_payloads_per_panda_job}) is too small; "
                "disabling EventService"
            )
            _LOG.info(my_log)
            enable_event_service = False

    maxwalltime = gwjob.request_walltime if gwjob.request_walltime else PANDA_DEFAULT_MAX_WALLTIME
    if enable_event_service:
        if gwjob.request_walltime and max_payloads_per_panda_job:
            maxwalltime = gwjob.request_walltime * max_payloads_per_panda_job
        elif max_wms_job_wall_time:
            maxwalltime = max_wms_job_wall_time

    if enable_event_service or enable_job_name_map:
        for es_name in order_id_map_files:
            local_pfns[es_name] = order_id_map_files[es_name]

    for gwfile in generic_workflow.get_job_inputs(gwjob.name, transfer_only=True):
        local_pfns[gwfile.name] = gwfile.src_uri
        if os.path.isdir(gwfile.src_uri):
            # This is needed to make the isdir function work
            # properly in the ButlerURL instance on the edge node.
            local_pfns[gwfile.name] += "/"

        if gwfile.job_access_remote:
            direct_io_files.add(gwfile.name)

    if qnode_map_filename:
        local_pfns.update(qnode_map_filename)

    submit_cmd = generic_workflow.run_attrs.get("bps_iscustom", False)

    if not direct_io_files:
        if submit_cmd:
            direct_io_files.add(remote_filename)
        else:
            direct_io_files.add("cmdlineplaceholder")

    lsst_temp = "LSST_RUN_TEMP_SPACE"
    if lsst_temp in file_distribution_end_point and lsst_temp not in os.environ:
        file_distribution_end_point = file_distribution_end_point_default
    if submit_cmd and not file_distribution_end_point:
        file_distribution_end_point = "FileDistribution"

    executable = add_decoder_prefix(
        config, cmd_line, file_distribution_end_point, (local_pfns, direct_io_files)
    )
    work = DomaPanDAWork(
        executable=executable,
        primary_input_collection={
            "scope": "pseudo_dataset",
            "name": f"pseudo_input_collection#{task_count}",
        },
        output_collections=[{"scope": "pseudo_dataset", "name": f"pseudo_output_collection#{task_count}"}],
        log_collections=[],
        dependency_map=[],
        task_name=f"{generic_workflow.name}_{task_count:02d}_{gwjob.label}_{task_chunk:02d}",
        task_queue=gwjob.queue,
        task_log={
            "destination": "local",
            "value": "log.tgz",
            "dataset": "PandaJob_#{pandaid}/",
            "token": "local",
            "param_type": "log",
            "type": "template",
        },
        encode_command_line=True,
        task_rss=task_rss,
        task_rss_retry_offset=task_rss_retry_offset,
        task_rss_retry_step=task_rss_retry_step,
        task_rss_max=gwjob.request_memory_max if gwjob.request_memory_max else PANDA_DEFAULT_RSS_MAX,
        task_cloud=gwjob.compute_cloud if gwjob.compute_cloud else PANDA_DEFAULT_CLOUD,
        task_site=site,
        task_priority=int(gwjob.priority) if gwjob.priority else PANDA_DEFAULT_PRIORITY,
        core_count=gwjob.request_cpus if gwjob.request_cpus else PANDA_DEFAULT_CORE_COUNT,
        working_group=gwjob.accounting_group,
        processing_type=processing_type,
        task_type=task_type,
        prodSourceLabel=prod_source_label,
        vo=vo,
        es=enable_event_service,
        es_label=es_label,
        max_events_per_job=max_payloads_per_panda_job,
        maxattempt=gwjob.number_of_retries if gwjob.number_of_retries else PANDA_DEFAULT_MAX_ATTEMPTS,
        maxwalltime=maxwalltime,
    )
    return work, local_pfns


def add_final_idds_work(
    config, generic_workflow, idds_client_workflow, dag_sink_work, task_count, task_chunk
):
    """Add the special final PanDA task to the client workflow.

    Parameters
    ----------
    config : `lsst.ctrl.bps.BpsConfig`
        BPS configuration.
    generic_workflow : `lsst.ctrl.bps.GenericWorkflow`
        Generic workflow in which to find the final job.
    idds_client_workflow : `idds.workflowv2.workflow.Workflow`
        The iDDS client representation of the workflow to which the final task
        is added.
    dag_sink_work : `list` [`idds.doma.workflowv2.domapandawork.DomaPanDAWork`]
        The work nodes in the client workflow which have no successors.
    task_count : `int`
        Count of PanDA tasks used when making unique names.
    task_chunk : `int`
        Chunk number within a PanDA task used when making unique names.

    Returns
    -------
    files : `dict` [`str`, `str`]
        Files which need to be copied to a workflow staging area.

    Raises
    ------
    NotImplementedError
        Raised if the final job in the GenericWorkflow is itself a workflow.
    TypeError
        Raised if the final job in the GenericWorkflow is an invalid type.
    """
    files = {}

    # If final job exists in generic workflow, create DAG final job.
    final = generic_workflow.get_final()
    if final:
        if isinstance(final, GenericWorkflow):
            raise NotImplementedError("PanDA plugin does not support a workflow as the final job")

        if not isinstance(final, GenericWorkflowJob):
            raise TypeError(f"Invalid type for GenericWorkflow.get_final() results ({type(final)})")

        dag_final_work, files = _make_doma_work(
            config,
            generic_workflow,
            final,
            task_count,
            task_chunk,
        )
        pseudo_filename = "pure_pseudoinput+qgraphNodeId:+qgraphId:"
        dag_final_work.dependency_map.append(
            {"name": pseudo_filename, "submitted": False, "dependencies": []}
        )
        idds_client_workflow.add_work(dag_final_work)
        conditions = []
        for work in dag_sink_work:
            conditions.append(work.is_terminated)
        and_cond = AndCondition(conditions=conditions, true_works=[dag_final_work])
        idds_client_workflow.add_condition(and_cond)
    else:
        _LOG.debug("No final job in GenericWorkflow")
    return files


def convert_exec_string_to_hex(cmdline):
    """Convert the command line into its hex representation.

    This step is currently needed because large blocks of command line
    text, including special symbols, are passed to the pilot/container.
    Hex encoding guarantees a one-to-one mapping and bypasses the
    special-symbol stripping performed by the pilot.

    Parameters
    ----------
    cmdline : `str`
        UTF-8 command line string.

    Returns
    -------
    hex : `str`
        Hex representation of the string.
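
    Examples
    --------
    >>> convert_exec_string_to_hex("echo hi")
    '6563686f206869'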
    """
    return binascii.hexlify(cmdline.encode()).decode("utf-8")


def add_decoder_prefix(config, cmd_line, distribution_path, files):
    """Compose the command line sent to the pilot from the functional part
    (the actual software being run) and the middleware part (the container
    invocation).

    Parameters
    ----------
    config : `lsst.ctrl.bps.BpsConfig`
        Configuration information.
    cmd_line : `str`
        UTF-8 based functional part of the command line.
    distribution_path : `str`
        URI of the path where all files are located for distribution.
    files : `tuple` [`dict` [`str`, `str`], `list` [`str`]]
        File names needed for a task (copied local, direct access).

    Returns
    -------
    decoder_prefix : `str`
        Full command line to be executed on the edge node.
    """
    # Manipulate file paths for placement on the command line.
    files_plc_hldr = {}
    for key, pfn in files[0].items():
        if pfn.endswith("/"):
            files_plc_hldr[key] = os.path.basename(pfn[:-1])
            isdir = True
        else:
            files_plc_hldr[key] = os.path.basename(pfn)
            _, extension = os.path.splitext(pfn)
            isdir = os.path.isdir(pfn) or (key == "butlerConfig" and extension != "yaml")
        if isdir:
            # This is needed to make the isdir function work
            # properly in the ButlerURL instance on the edge node.
            files_plc_hldr[key] += "/"
        _LOG.debug("files_plc_hldr[%s] = %s", key, files_plc_hldr[key])

    cmdline_hex = convert_exec_string_to_hex(cmd_line)
    _, runner_command = config.search("runnerCommand", opt={"replaceEnvVars": False, "expandEnvVars": False})
    order_id_map_filename = files[0].get("orderIdMapFilename", None)
    if order_id_map_filename:
        order_id_map_filename = os.path.basename(order_id_map_filename)
        order_id_map_filename = os.path.join(distribution_path, order_id_map_filename)
        runner_command = runner_command.replace("orderIdMapFilename", order_id_map_filename)
    runner_command = runner_command.replace("\n", " ")
    decoder_prefix = runner_command.replace(
        "_cmd_line_",
        str(cmdline_hex)
        + " ${IN/L} "
        + distribution_path
        + " "
        + "+".join(f"{k}:{v}" for k, v in files_plc_hldr.items())
        + " "
        + "+".join(files[1]),
    )
    return decoder_prefix


def add_idds_work(config, generic_workflow, idds_workflow):
    """Convert GenericWorkflowJobs to iDDS work and add them to the iDDS
    workflow.

    Parameters
    ----------
    config : `lsst.ctrl.bps.BpsConfig`
        BPS configuration.
    generic_workflow : `lsst.ctrl.bps.GenericWorkflow`
        Generic workflow containing jobs to convert.
    idds_workflow : `idds.workflowv2.workflow.Workflow`
        The iDDS workflow to which the converted jobs should be added.

    Returns
    -------
    files_to_pre_stage : `dict` [`str`, `str`]
        Files that need to be copied to the staging area before submission.
    dag_sink_work : `list` [`idds.doma.workflowv2.domapandawork.DomaPanDAWork`]
        The work nodes in the client workflow which have no successors.
    task_count : `int`
        Number of tasks in the iDDS workflow, used for unique task names.

    Raises
    ------
    RuntimeError
        Raised if dependency issues cannot be resolved after a second pass
        through the workflow.
    """
    # Event service
    _, enable_event_service = config.search("enableEventService", opt={"default": None})
    _, enable_qnode_map = config.search("enableQnodeMap", opt={"default": None})
    _, max_payloads_per_panda_job = config.search(
        "maxPayloadsPerPandaJob", opt={"default": PANDA_DEFAULT_MAX_PAYLOADS_PER_PANDA_JOB}
    )
    _, max_wms_job_wall_time = config.search("maxWmsJobWalltime", opt={"default": None})
    my_log = (
        f"enableEventService: {enable_event_service}, maxPayloadsPerPandaJob: {max_payloads_per_panda_job}"
    )
    _LOG.info(my_log)

    # Job name map: use a short job name to map to the long job name.
    _, enable_job_name_map = config.search("enableJobNameMap", opt={"default": None})
    _LOG.info(f"enable_job_name_map: {enable_job_name_map}, {type(enable_job_name_map)}")
    if enable_event_service and not enable_job_name_map:
        enable_job_name_map = True
        my_log = "enable_event_service is set; setting enable_job_name_map to True."
        _LOG.info(my_log)

    # Limit number of jobs in a single PanDA task.
    _, max_jobs_per_task = config.search("maxJobsPerTask", opt={"default": PANDA_DEFAULT_MAX_JOBS_PER_TASK})

    files_to_pre_stage = {}
    dag_sink_work = []  # Workflow sink nodes that need to be connected to final task
    job_to_task = {}
    job_to_pseudo_filename = {}
    task_count = 0  # Task number/ID in idds workflow used for unique name
    remote_archive_filename = None

    submit_path = config["submitPath"]

    submit_cmd = generic_workflow.run_attrs.get("bps_iscustom", False)
    if submit_cmd:
        files = generic_workflow.get_executables(data=False, transfer_only=True)
        archive_filename = f"jobO.{uuid.uuid4()}.tar.gz"
        archive_filename = create_archive_file(submit_path, archive_filename, files)
        remote_archive_filename = copy_files_to_pandacache(archive_filename)

    order_id_map_files = {}
    name_works = {}
    order_id_map = {}
    job_name_to_order_id_map = {}
    order_id_map_file = None
    max_payloads_per_panda_job_by_label = {}
    if enable_event_service:
        enable_event_service = enable_event_service.split(",")
        enable_event_service_tmp = []
        for es_def in enable_event_service:
            if ":" in es_def:
                es_label, m_payloads = es_def.split(":")
            else:
                es_label, m_payloads = es_def, max_payloads_per_panda_job
            es_label = es_label.strip()
            enable_event_service_tmp.append(es_label)
            max_payloads_per_panda_job_by_label[es_label] = int(m_payloads)
        enable_event_service = enable_event_service_tmp
    if enable_job_name_map:
        _, order_id_map_filename = config.search(
            "orderIdMapFilename", opt={"default": PANDA_DEFAULT_ORDER_ID_MAP_FILE}
        )
        order_id_map_file = os.path.join(submit_path, order_id_map_filename)
        order_id_map_files = {"orderIdMapFilename": order_id_map_file}
        files_to_pre_stage.update(order_id_map_files)

    # To avoid dying due to optimizing the number of passes through the
    # workflow, catch dependency issues here and loop through them again
    # later.
    jobs_with_dependency_issues = {}

    # Initialize quantum node map.
    qnode_map = {}
    qnode_map_filename = None
    if enable_qnode_map:
        qnode_map_file = os.path.join(submit_path, "qnode_map.json")
        qnode_map_filename = {"qnodemap": qnode_map_file}
        files_to_pre_stage.update(qnode_map_filename)
    # Assume jobs with the same label share config values.
    for job_label in generic_workflow.labels:
        _LOG.debug("job_label = %s", job_label)

        if enable_job_name_map:
            order_id_map[job_label] = {}
            job_name_to_order_id_map[job_label] = {}

        # Add each job with a particular label to a corresponding PanDA task.
        # A PanDA task has a limit on number of jobs, so break into multiple
        # PanDA tasks if needed.
        job_count = 0  # Number of jobs in idds task used for task chunking
        task_chunk = 1  # Task chunk number within job label used for unique name
        work = None
        order_id = -1

        # Instead of changing code to make chunks up front and round-robin
        # assign jobs to chunks, for now keeping chunk creation in loop
        # but using knowledge of how many chunks there will be to set better
        # maximum number of jobs in a chunk for more even distribution.
        jobs_by_label = generic_workflow.get_jobs_by_label(job_label)
        num_chunks = -(-len(jobs_by_label) // max_jobs_per_task)  # ceil
        max_jobs_per_task_this_label = -(-len(jobs_by_label) // num_chunks)
        _LOG.debug(
            "For job_label = %s, num jobs = %s, num_chunks = %s, max_jobs = %s",
            job_label,
            len(jobs_by_label),
            num_chunks,
            max_jobs_per_task_this_label,
        )
        for gwjob in jobs_by_label:
            order_id += 1
            pseudo_filename = _make_pseudo_filename(config, gwjob)
            job_to_pseudo_filename[gwjob.name] = pseudo_filename
            if enable_job_name_map:
                order_id_map[job_label][str(order_id)] = pseudo_filename
                job_name_to_order_id_map[job_label][gwjob.name] = str(order_id)

            job_count += 1
            if job_count > max_jobs_per_task_this_label:
                job_count = 1
                task_chunk += 1

            if job_count == 1:
                # Create new PanDA task object.
                task_count += 1
                work_enable_event_service = False
                if enable_event_service and job_label in enable_event_service:
                    work_enable_event_service = True
                max_payloads_per_panda_job_current = max_payloads_per_panda_job_by_label.get(
                    job_label, max_payloads_per_panda_job
                )
                work, files = _make_doma_work(
                    config,
                    generic_workflow,
                    gwjob,
                    task_count,
                    task_chunk,
                    enable_event_service=work_enable_event_service,
                    enable_job_name_map=enable_job_name_map,
                    order_id_map_files=order_id_map_files,
                    es_label=job_label,
                    max_payloads_per_panda_job=max_payloads_per_panda_job_current,
                    max_wms_job_wall_time=max_wms_job_wall_time,
                    remote_filename=remote_archive_filename,
                    qnode_map_filename=qnode_map_filename,
                )
                work.dependency_tasks = []
                name_works[work.task_name] = work
                files_to_pre_stage.update(files)
                idds_workflow.add_work(work)
                if generic_workflow.out_degree(gwjob.name) == 0:
                    dag_sink_work.append(work)

            if enable_qnode_map:
                job_name_PH = "PH:" + gwjob.name
                job_to_pseudo_filename[gwjob.name] = job_name_PH
                qnode_map[job_name_PH] = pseudo_filename

            job_to_task[gwjob.name] = work.get_work_name()
            deps = []
            missing_deps = False
            for parent_job_name in generic_workflow.predecessors(gwjob.name):
                if parent_job_name not in job_to_task:
                    _LOG.debug("job_to_task.keys() = %s", job_to_task.keys())
                    missing_deps = True
                    break
                else:
                    if enable_job_name_map:
                        parent_job = generic_workflow.get_job(parent_job_name)
                        parent_job_label = parent_job.label
                        parent_order_id = job_name_to_order_id_map[parent_job_label][parent_job_name]
                        inputname = f"{parent_job_label}:orderIdMap_{parent_order_id}"
                    else:
                        inputname = job_to_pseudo_filename[parent_job_name]

                    parent_task_name = job_to_task[parent_job_name]
                    deps.append(
                        {
                            "task": parent_task_name,
                            "inputname": inputname,
                        }
                    )
                    if parent_task_name not in work.dependency_tasks:
                        work.dependency_tasks.append(parent_task_name)
            if not missing_deps:
                j_name = job_to_pseudo_filename[gwjob.name]
                f_name = f"{job_label}:orderIdMap_{order_id}" if enable_job_name_map else j_name
                work.dependency_map.append(
                    {
                        "name": f_name,
                        "order_id": order_id,
                        "dependencies": deps,
                    }
                )
            else:
                jobs_with_dependency_issues[gwjob.name] = {
                    "work": work,
                    "order_id": order_id,
                    "label": job_label,
                }

    if enable_qnode_map:
        with open(qnode_map_file, "w", encoding="utf-8") as f:
            json.dump(qnode_map, f, indent=2)
    # If there were any issues figuring out dependencies in the earlier loop.
    if jobs_with_dependency_issues:
        _LOG.warning("Could not prepare workflow in single pass. Please notify developers.")
        _LOG.info("Trying to recover...")
        for job_name, work_item in jobs_with_dependency_issues.items():
            deps = []
            work = work_item["work"]
            order_id = work_item["order_id"]
            job_label = work_item["label"]

            for parent_job_name in generic_workflow.predecessors(job_name):
                if parent_job_name not in job_to_task:
                    _LOG.debug("job_to_task.keys() = %s", job_to_task.keys())
                    raise RuntimeError(
                        f"Could not recover from dependency issues ({job_name} missing {parent_job_name})."
                    )
                if enable_job_name_map:
                    parent_job = generic_workflow.get_job(parent_job_name)
                    parent_job_label = parent_job.label
                    parent_order_id = job_name_to_order_id_map[parent_job_label][parent_job_name]
                    inputname = f"{parent_job_label}:orderIdMap_{parent_order_id}"
                else:
                    inputname = job_to_pseudo_filename[parent_job_name]

                parent_task_name = job_to_task[parent_job_name]
                deps.append(
                    {
                        "task": parent_task_name,
                        "inputname": inputname,
                    }
                )
                if parent_task_name not in work.dependency_tasks:
                    work.dependency_tasks.append(parent_task_name)

            work.dependency_map.append(
                {
                    "name": f"{job_label}:orderIdMap_{order_id}" if enable_job_name_map else job_name,
                    "order_id": order_id,
                    "dependencies": deps,
                }
            )

        _LOG.info("Successfully recovered.")

    for task_name in name_works:
        work = name_works[task_name]
        # Trigger the setter function, which will validate the dependency_map:
        # 1) check the name length to avoid names that are too long,
        # 2) check to avoid duplicated items.
        sorted_dep_map = sorted(work.dependency_map, key=lambda x: x["order_id"])
        work.dependency_map = sorted_dep_map

    if enable_job_name_map:
        with open(order_id_map_file, "w") as f:
            json.dump(order_id_map, f)

    return files_to_pre_stage, dag_sink_work, task_count


def create_archive_file(submit_path, archive_filename, files):
    """Create a gzipped tar archive containing the given files.

    Parameters
    ----------
    submit_path : `str`
        Directory in which to create the archive if ``archive_filename``
        is not an absolute path.
    archive_filename : `str`
        Name of the archive file to create.
    files : `list` [`str`]
        Paths of the files to add to the archive (stored by basename).

    Returns
    -------
    archive_filename : `str`
        Full path of the created archive file.
    """
    if not archive_filename.startswith("/"):
        archive_filename = os.path.join(submit_path, archive_filename)

    with tarfile.open(archive_filename, "w:gz", dereference=True) as tar:
        for local_file in files:
            base_name = os.path.basename(local_file)
            tar.add(local_file, arcname=base_name)
    return archive_filename


def copy_files_to_pandacache(filename):
    """Copy a file to the PanDA cache, retrying on failure.

    Parameters
    ----------
    filename : `str`
        Path of the local file to upload.

    Returns
    -------
    filename : `str` or `None`
        URL of the file in the PanDA cache, or `None` if the upload failed.
    """
    from pandaclient import Client

    attempt = 0
    max_attempts = 3
    done = False
    while attempt < max_attempts and not done:
        attempt += 1
        status, out = Client.putFile(filename, True)
        if status == 0:
            done = True
    print(f"copy_files_to_pandacache: status: {status}, out: {out}")
    if out.startswith("NewFileName:"):
        # Found the same input sandbox to reuse.
        filename = out.split(":")[-1]
    elif out != "True":
        print(out)
        return None

    filename = os.path.basename(filename)
    cache_path = os.path.join(os.environ["PANDACACHE_URL"], "cache")
    filename = os.path.join(cache_path, filename)
    return filename


def download_extract_archive(filename, prefix=None):
    """Download and extract the tarball from pandacache.

    Parameters
    ----------
    filename : `str`
        The filename to download.
    prefix : `str`, optional
        The target directory the tarball will be downloaded and extracted to.
        If None (default), the current directory will be used.
    """
    archive_basename = os.path.basename(filename)
    target_dir = prefix if prefix is not None else os.getcwd()
    full_output_filename = os.path.join(target_dir, archive_basename)

    if filename.startswith("https:"):
        panda_cache_url = os.path.dirname(os.path.dirname(filename))
        os.environ["PANDACACHE_URL"] = panda_cache_url
    elif "PANDACACHE_URL" not in os.environ and "PANDA_URL_SSL" in os.environ:
        os.environ["PANDACACHE_URL"] = os.environ["PANDA_URL_SSL"]
    panda_cache_url = os.environ.get("PANDACACHE_URL", None)
    print(f"PANDACACHE_URL: {panda_cache_url}")

    # The import of the PanDA client must happen *after* PANDACACHE_URL is
    # set. Otherwise, the PanDA client will not pick up the environment
    # setting.
    from pandaclient import Client

    attempt = 0
    max_attempts = 3
    while attempt < max_attempts:
        status, output = Client.getFile(archive_basename, output_path=full_output_filename)
        if status == 0:
            break
        attempt += 1
        if attempt <= 1:
            secs = random.randint(1, 10)
        elif attempt <= 2:
            secs = random.randint(1, 60)
        else:
            secs = random.randint(1, 120)
        time.sleep(secs)
    print(f"Download archive file from pandacache status: {status}, output: {output}")
    if status != 0:
        raise RuntimeError("Failed to download archive file from pandacache")
    with tarfile.open(full_output_filename, "r:gz") as f:
        f.extractall(target_dir)
    print(f"Extracted {full_output_filename} to {target_dir}")
    os.remove(full_output_filename)
    print(f"Removed {full_output_filename}")


def get_task_parameter(config, remote_build, key):
    """Get a task parameter, preferring the remote build configuration.

    Parameters
    ----------
    config : `lsst.ctrl.bps.BpsConfig`
        BPS configuration.
    remote_build : `lsst.ctrl.bps.BpsConfig`
        Remote build configuration, searched first.
    key : `str`
        Name of the parameter to look up.

    Returns
    -------
    value : `~typing.Any`
        The value found in ``remote_build`` or, failing that, in ``config``.
    """
    search_opt = {"replaceVars": True, "expandEnvVars": False, "replaceEnvVars": False, "required": False}
    _, value = remote_build.search(key, search_opt)
    if not value:
        _, value = config.search(key, search_opt)
    return value


def create_idds_build_workflow(**kwargs):
    """Create an iDDS workflow containing a single remote build task.

    Parameters
    ----------
    **kwargs
        Keyword arguments; the keys used are ``config``
        (`lsst.ctrl.bps.BpsConfig`), ``remote_build``
        (`lsst.ctrl.bps.BpsConfig`), ``config_file`` (`str`), and
        ``compute_site`` (`str`).

    Returns
    -------
    workflow : `idds.workflowv2.workflow.Workflow`
        The iDDS client workflow with the build task added.
    """
    config = kwargs.get("config")
    remote_build = kwargs.get("remote_build")
    config_file = kwargs.get("config_file")
    config_file_base = os.path.basename(config_file) if config_file else None
    compute_site = kwargs.get("compute_site")
    _, files = remote_build.search("files", opt={"default": []})
    submit_path = config["submitPath"]
    files.append(config_file)
    archive_filename = f"jobO.{uuid.uuid4()}.tar.gz"
    archive_filename = create_archive_file(submit_path, archive_filename, files)
    _LOG.info(f"archive file name: {archive_filename}")
    remote_filename = copy_files_to_pandacache(archive_filename)
    _LOG.info(f"pandacache file: {remote_filename}")

    _LOG.info(type(remote_build))
    search_opt = {"replaceVars": True, "expandEnvVars": False, "replaceEnvVars": False, "required": False}
    cvals = {"LSST_VERSION": get_task_parameter(config, remote_build, "LSST_VERSION")}
    cvals["custom_lsst_setup"] = get_task_parameter(config, remote_build, "custom_lsst_setup")
    max_name_length = PANDA_DEFAULT_NAME_LENGTH
    if "IDDS_MAX_NAME_LENGTH" in os.environ:
        max_name_length = int(os.environ["IDDS_MAX_NAME_LENGTH"])
    cvals["IDDS_MAX_NAME_LENGTH"] = max_name_length
    search_opt["curvals"] = cvals
    _, executable = remote_build.search("runnerCommand", opt=search_opt)
    executable = executable.replace("_download_cmd_line_", remote_filename)
    executable = executable.replace("_build_cmd_line_", config_file_base)
    executable = executable.replace("_compute_site_", compute_site or "")

    task_cloud = get_task_parameter(config, remote_build, "computeCloud")
    task_site = get_task_parameter(config, remote_build, "computeSite")
    task_queue = get_task_parameter(config, remote_build, "queue")
    task_rss = get_task_parameter(config, remote_build, "requestMemory")
    task_rss_max = get_task_parameter(config, remote_build, "requestMemoryMax")
    memory_multiplier = get_task_parameter(config, remote_build, "memoryMultiplier")
    task_rss_retry_step = task_rss * memory_multiplier if memory_multiplier else 0
    task_rss_retry_offset = 0 if task_rss_retry_step else task_rss
    nretries = get_task_parameter(config, remote_build, "numberOfRetries")
    processing_type = get_task_parameter(config, remote_build, "processingType")
    priority = get_task_parameter(config, remote_build, "priority")
    _LOG.info("requestMemory: %s", task_rss)
    _LOG.info("Site: %s", task_site)
    # _LOG.info("executable: %s", executable)
    # TODO: fill other parameters based on config
    build_work = DomaPanDAWork(
        executable=executable,
        task_type="lsst_build",
        primary_input_collection={"scope": "pseudo_dataset", "name": "pseudo_input_collection#1"},
        output_collections=[{"scope": "pseudo_dataset", "name": "pseudo_output_collection#1"}],
        log_collections=[],
        dependency_map=None,
        task_name="build_task",
        task_queue=task_queue,
        encode_command_line=True,
        prodSourceLabel="managed",
        processing_type=processing_type,
        task_log={
            "dataset": "PandaJob_#{pandaid}/",
            "destination": "local",
            "param_type": "log",
            "token": "local",
            "type": "template",
            "value": "log.tgz",
        },
        task_rss=task_rss if task_rss else PANDA_DEFAULT_RSS,
        task_rss_max=task_rss_max if task_rss_max else PANDA_DEFAULT_RSS_MAX,
        task_rss_retry_offset=task_rss_retry_offset,
        task_rss_retry_step=task_rss_retry_step,
        task_cloud=task_cloud,
        task_site=task_site,
        task_priority=int(priority) if priority else PANDA_DEFAULT_PRIORITY,
        maxattempt=nretries if nretries > 0 else PANDA_DEFAULT_MAX_ATTEMPTS,
    )

    workflow = IDDS_client_workflow()

    workflow.add_work(build_work)
    workflow.name = config["bps_defined"]["uniqProcName"]
    return workflow