Coverage for python / lsst / ctrl / bps / panda / edgenode / cmd_line_decoder.py: 7%
158 statements
« prev ^ index » next coverage.py v7.13.5, created at 2026-04-24 08:23 +0000
« prev ^ index » next coverage.py v7.13.5, created at 2026-04-24 08:23 +0000
1#!/usr/bin/python
3# This file is part of ctrl_bps_panda.
4#
5# Developed for the LSST Data Management System.
6# This product includes software developed by the LSST Project
7# (https://www.lsst.org).
8# See the COPYRIGHT file at the top-level directory of this distribution
9# for details of code ownership.
10#
11# This software is dual licensed under the GNU General Public License and also
12# under a 3-clause BSD license. Recipients may choose which of these licenses
13# to use; please see the files gpl-3.0.txt and/or bsd_license.txt,
14# respectively. If you choose the GPL option then the following text applies
15# (but note that there is still no warranty even if you opt for BSD instead):
16#
17# This program is free software: you can redistribute it and/or modify
18# it under the terms of the GNU General Public License as published by
19# the Free Software Foundation, either version 3 of the License, or
20# (at your option) any later version.
21#
22# This program is distributed in the hope that it will be useful,
23# but WITHOUT ANY WARRANTY; without even the implied warranty of
24# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
25# GNU General Public License for more details.
26#
27# You should have received a copy of the GNU General Public License
28# along with this program. If not, see <https://www.gnu.org/licenses/>.
30"""
31Decode the command line string sent from the BPS
32plugin -> PanDA -> Edge node cluster management
33-> Edge node -> Container. This file is not a part
34of the BPS but a part of the payload wrapper.
35It decodes the hexified command line.
36"""
38import binascii
39import json
40import os
41import re
42import sys
44from lsst.ctrl.bps.panda.utils import download_extract_archive
45from lsst.resources import ResourcePath
def replace_placeholders(cmd_line: str, tag: str, replacements: dict[str, str]) -> str:
    """Replace ``<tag:name>`` placeholders in a command line.

    Parameters
    ----------
    cmd_line : `str`
        Command line.
    tag : `str`
        Tag to use for finding placeholders. The tag is matched literally.
    replacements : `dict` [`str`, `str`]
        Replacements indexed by placeholder string.

    Returns
    -------
    modified : `str`
        Processed command line.

    Raises
    ------
    ValueError
        Raised if a placeholder found in the command line has no entry
        in ``replacements``.
    """
    # Escape the tag so regex metacharacters in it are matched literally,
    # which keeps the search consistent with the literal str.replace()
    # performed below.
    pattern = f"<{re.escape(tag)}:(.*?)>"
    # dict.fromkeys() drops repeated placeholders while keeping their
    # first-seen order; one str.replace() already handles every occurrence.
    for placeholder in dict.fromkeys(re.findall(pattern, cmd_line)):
        if placeholder not in replacements:
            raise ValueError(
                "ValueError exception thrown, because "
                f"{placeholder} is not found in the "
                "replacement values and could "
                "not be passed to the command line"
            )
        cmd_line = cmd_line.replace(f"<{tag}:{placeholder}>", replacements[placeholder])
    return cmd_line
def replace_environment_vars(cmd_line):
    """Substitute ``<ENV:NAME>`` placeholders with environment values.

    Parameters
    ----------
    cmd_line : `str`
        Command line.

    Returns
    -------
    cmdline: `str`
        Processed command line.
    """
    # The process environment is used directly as the replacement map.
    return replace_placeholders(cmd_line, "ENV", os.environ)
def replace_files_placeholders(cmd_line, files):
    """Substitute ``<FILE:key>`` placeholders with file names.

    Parameters
    ----------
    cmd_line : `str`
        Command line.
    files : `str`
        String with key:value pairs separated by the '+' sign.
        Keys contain the file type (runQgraphFile,
        butlerConfig, ...).
        Values contain file names.

    Returns
    -------
    cmd_line: `str`
        Processed command line.
    """
    # Each '+'-separated entry must split into exactly two parts on ':';
    # the tuple unpacking raises ValueError otherwise.
    key_value_pairs = (entry.split(":") for entry in files.split("+"))
    name_by_placeholder = {placeholder: name for placeholder, name in key_value_pairs}
    return replace_placeholders(cmd_line, "FILE", name_by_placeholder)
def deliver_input_files(src_path, files, skip_copy):
    """Deliver input files needed for a job.

    Parameters
    ----------
    src_path : `str`
        URI for folder where the input files placed.
    files : `str`
        String with ``placeholder:file_name`` pairs separated by the
        '+' sign.
    skip_copy : `str`
        String with placeholder names separated by the '+' sign indicating
        which entries in ``files`` should not be copied. If the string
        contains ``jobO`` it is handed to ``download_extract_archive``
        instead and no per-file copy is done.

    Notes
    -----
    The function returns nothing; files are copied into the current
    working directory as a side effect and each copy is reported on
    standard error.
    """
    file_entries = files.split("+")
    src_uri = ResourcePath(src_path, forceDirectory=True)

    if "jobO" in skip_copy:
        # The files come from an archive in this case, so only the
        # permissions of the listed files are adjusted afterwards.
        download_extract_archive(skip_copy)
        for entry in file_entries:
            _, file_pfn = entry.split(":")
            os.chmod(file_pfn, 0o755)
        return

    # Split once instead of on every loop iteration.
    skipped_placeholders = skip_copy.split("+")

    for entry in file_entries:
        file_name_placeholder, file_pfn = entry.split(":")
        if file_name_placeholder in skipped_placeholders:
            continue

        src = src_uri.join(file_pfn)
        base_dir = None
        if src.isdir():
            # A directory is copied file by file into a directory of the
            # same name below the current working directory.
            files_to_copy = ResourcePath.findFileResources([src])
            base_dir = file_pfn
        else:
            files_to_copy = [src]

        dest_base = ResourcePath("", forceAbsolute=True, forceDirectory=True)
        if base_dir:
            dest_base = dest_base.join(base_dir)

        for file_to_copy in files_to_copy:
            dest = dest_base.join(file_to_copy.basename())
            if file_name_placeholder == "orderIdMapFilename":
                # An order id map that is already present locally is
                # kept rather than overwritten.
                if not dest.exists():
                    dest.transfer_from(file_to_copy, transfer="copy")
            else:
                dest.transfer_from(file_to_copy, transfer="copy")
            print(f"copied {file_to_copy.path} to {dest.path}", file=sys.stderr)
            if file_name_placeholder == "job_executable":
                # Applied per delivered file so that no loop variable is
                # read after the loop has finished.
                os.chmod(dest.path, 0o777)
def _order_id_is_mapped(feature, id_name, label, order_id, order_id_map):
    """Check that ``order_id`` can be looked up for ``label``.

    Parameters
    ----------
    feature : `str`
        Feature name used as prefix of the diagnostic message.
    id_name : `str`
        Name of the identifier used in the diagnostic message.
    label : `str`
        Label the parameter belongs to.
    order_id : `str`
        Key expected in ``order_id_map[label]``.
    order_id_map : `dict` [`str`, `dict`]
        Content of the order id map file; empty if it was not loaded.

    Returns
    -------
    is_mapped : `bool`
        `True` if ``order_id_map[label][order_id]`` exists. Otherwise the
        reason is printed and `False` is returned.
    """
    if not order_id_map:
        print(f"{feature} is enabled but order_id_map file doesn't exist.")
        return False
    if label not in order_id_map:
        print(
            f"{feature} is enabled but label {label} doesn't in the keys"
            f" of order_id_map {order_id_map.keys()}"
        )
        return False
    if order_id not in order_id_map[label]:
        print(
            f"{feature} is enabled but {id_name} {order_id} is not"
            f" in order_id_map[{label}] {order_id_map[label].keys()}"
        )
        return False
    return True


def replace_event_file(params, files):
    """Map EventService/orderIdMap parameters to order id map entries.

    Parameters
    ----------
    params : `str`
        String with parameters separated by the '+' sign.
        Example params:
        isr:eventservice_90^10+somethingelse. This part
        'isr:eventservice_90^10' is the EventService parameter.
        isr:orderIdMap_10. This part is using order_id map file. But it
        is not EventService.
        The format for the EventService parameter for LSST is
        'label:eventservice_<baseid>^<localid>'. The '<localid>' should
        start from 1, which means the first event of the file
        'label:eventservice_<baseid>'. In EventService, all pseudo files
        for a label are recorded in the 'orderIdMapFilename' file, with
        a dict {'label0':{"0":"pseudo_file0", "1":..},'label1':..}.
        For example, for a workflow with 100 pseudo files for the 'isr'
        label, the dict will be {'isr': {"0": "pseudo_file0",
        "1": "pseudo_file1", ..., "99": "pseudo_file99"}}. If we split the
        100 pseudo files into 5 PanDA jobs with 20 files per PanDA job,
        the 5 eventservice group names will be 'isr:eventservice_0' for
        events ["0"~"19"], 'isr:eventservice_20' for events
        ["20"~"39"], ..., and 'isr:eventservice_80' for events
        ["80"~"99"]. The EventService param 'isr:eventservice_80^5' means
        the 5th event in the group 'isr:eventservice_80', which is
        '80 + 5 - 1 = 84' and will be mapped to file 'pseudo_file84'.
    files : `str`
        String with ``placeholder:file_name`` pairs separated by the '+'
        sign.
        Example:
        orderIdMapFilename:panda_order_id_map.json+runQgraphFile:a.qgraph

    Returns
    -------
    ret_status: `bool`
        Status of this function. `False` if an EventService or orderIdMap
        parameter is present but cannot be parsed or cannot be found in
        the order id map; `True` otherwise.
    with_events: `bool`
        Whether there are event parameters.
    with_order_id_map: `bool`
        Whether there are orderIdMap parameters.
    params_map: `dict` [`str`, `dict`]
        Parameter map for event information. Each value holds the keys
        ``order_id`` and ``order_id_map``.
    """
    ret_status = True
    with_events = False
    with_order_id_map = False

    file_map = {}
    for file_entry in files.split("+"):
        file_name_placeholder, file_pfn = file_entry.split(":")
        file_map[file_name_placeholder] = file_pfn

    order_id_map_file = file_map.get("orderIdMapFilename")
    order_id_map = {}
    try:
        # The orderIdMapFilename should exist locally or copied to current
        # directory by deliver_input_files
        if order_id_map_file and os.path.exists(order_id_map_file):
            with open(order_id_map_file, encoding="utf-8") as f:
                order_id_map = json.load(f)
    except Exception as ex:
        # Best effort: a missing map is reported below only if a parameter
        # actually needs it.
        print(f"failed to load orderIdMapFilename: {ex}")

    params_map = {}
    for param in params.split("+"):
        if "eventservice_" in param:
            with_events = True
            try:
                label, event = param.split(":")
                # The original format for EventService parameter is
                # 'label:eventservice_<baseid>^<localid>^<numberOfEvents>',
                # which can have multiple events per EventService job.
                # However, for LSST, the '<numberOfEvents>' is always 1.
                # When <numberOfEvents> is 1, it will not show. So for
                # LSST, we will see 'label:eventservice_<baseid>^<localid>'.
                # Only the first two '^'-separated fields are used, which
                # leaves room for the optional third field.
                event_base_id, event_order = event.split("_")[1].split("^")[:2]
                order_id = str(int(event_base_id) + int(event_order) - 1)
            except (ValueError, IndexError) as ex:
                # A malformed parameter is reported through ret_status
                # rather than by an uncaught exception.
                print(f"failed to parse EventService parameter {param}: {ex}")
                ret_status = False
                break
            feature, id_name = "EventSerice", "event_index"
        elif "orderIdMap_" in param:
            with_order_id_map = True
            try:
                label, event = param.split(":")
                order_id = event.split("_")[1]
            except (ValueError, IndexError) as ex:
                print(f"failed to parse orderIdMap parameter {param}: {ex}")
                ret_status = False
                break
            feature, id_name = "orderIdMap", "order_id"
        else:
            continue

        if not _order_id_is_mapped(feature, id_name, label, order_id, order_id_map):
            ret_status = False
            break
        params_map[param] = {"order_id": order_id, "order_id_map": order_id_map[label]}

    return ret_status, with_events, with_order_id_map, params_map
def use_map_file(input_file):
    """Tell whether the input must be resolved through the qnode map.

    Parameters
    ----------
    input_file : `str`
        Input file either a pseudo file or job name.

    Returns
    -------
    use_qnode_map: `bool`
        `True` only when ``input_file`` consists of exactly two
        ':'-separated parts and the first one is the placeholder 'PH'.
    """
    # Exactly one ':' gives two parts; the 'PH:' prefix pins the first.
    return input_file.count(":") == 1 and input_file.startswith("PH:")
if __name__ == "__main__":
    # Arguments read below: argv[1] hex-encoded command line, argv[2] data
    # parameters, argv[3] source location of the input files, argv[4] file
    # placeholder map, argv[5] placeholders that must not be copied.
    deliver_input_files(sys.argv[3], sys.argv[4], sys.argv[5])
    cmd_line = binascii.unhexlify(sys.argv[1]).decode()
    data_params = sys.argv[2]
    cmd_line = replace_environment_vars(cmd_line)

    print(f"cmd_line: {cmd_line}")
    print(f"data_params: {data_params}")

    # If EventService is enabled, data_params will only contain
    # event information. So we need to convert the event information
    # to LSST pseudo file names. If EventService is not enabled,
    # this part will not change data_params.
    ret_event_status, with_events, with_order_id_map, event_params_map = replace_event_file(
        data_params, sys.argv[4]
    )
    print(
        f"ret_event_status: {ret_event_status}, "
        f"with_events: {with_events} "
        f"with_order_id_map: {with_order_id_map}"
    )
    if not ret_event_status:
        print("failed to map EventService/orderIdMap parameters to original LSST pseudo file names")
        sys.exit(1)

    for event_param, event_info in event_params_map.items():
        order_id = event_info["order_id"]
        pseudo_file_name = event_info["order_id_map"][order_id]
        print(f"replacing event {event_param} with order_id {order_id} to: {pseudo_file_name}")
        cmd_line = cmd_line.replace(event_param, pseudo_file_name)
        data_params = data_params.replace(event_param, pseudo_file_name)

    # If job name map is enabled, data_params will only contain order_id
    # information. Here we will convert order_id information to LSST pseudo
    # file names.

    data_params = data_params.split("+")

    # Replace the pipetask command line placeholders
    # with actual data provided in the script call
    # in form placeholder1:file1+placeholder2:file2:...
    cmd_line = replace_files_placeholders(cmd_line, sys.argv[4])

    jobname = data_params[0]
    if use_map_file(jobname):
        # The job name is only a key here; the real parameters are read
        # from the qnode map in the current directory.
        with open("qnode_map.json", encoding="utf-8") as f:
            qnode_map = json.load(f)
        data_params = qnode_map[jobname].split("+")

    # The first entry is the job name; the rest are key:value pairs that
    # fill the "{key}" slots of the command line.
    for key_value_pair in data_params[1:]:
        key, value = key_value_pair.split(":")
        cmd_line = cmd_line.replace(f"{{{key}}}", value)

    print("executable command line:")
    print(cmd_line)

    exit_status = os.system(cmd_line)
    if os.WIFSIGNALED(exit_status):
        # Report termination by signal as 128 + signal number.
        exit_code = os.WTERMSIG(exit_status) + 128
    elif os.WIFEXITED(exit_status):
        exit_code = os.WEXITSTATUS(exit_status)
    else:
        exit_code = 1
    sys.exit(exit_code)