Coverage for python / lsst / ctrl / bps / panda / edgenode / cmd_line_decoder.py: 7%

158 statements  

« prev     ^ index     » next       coverage.py v7.13.5, created at 2026-04-14 23:53 +0000

1#!/usr/bin/python 

2 

3# This file is part of ctrl_bps_panda. 

4# 

5# Developed for the LSST Data Management System. 

6# This product includes software developed by the LSST Project 

7# (https://www.lsst.org). 

8# See the COPYRIGHT file at the top-level directory of this distribution 

9# for details of code ownership. 

10# 

11# This software is dual licensed under the GNU General Public License and also 

12# under a 3-clause BSD license. Recipients may choose which of these licenses 

13# to use; please see the files gpl-3.0.txt and/or bsd_license.txt, 

14# respectively. If you choose the GPL option then the following text applies 

15# (but note that there is still no warranty even if you opt for BSD instead): 

16# 

17# This program is free software: you can redistribute it and/or modify 

18# it under the terms of the GNU General Public License as published by 

19# the Free Software Foundation, either version 3 of the License, or 

20# (at your option) any later version. 

21# 

22# This program is distributed in the hope that it will be useful, 

23# but WITHOUT ANY WARRANTY; without even the implied warranty of 

24# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the 

25# GNU General Public License for more details. 

26# 

27# You should have received a copy of the GNU General Public License 

28# along with this program. If not, see <https://www.gnu.org/licenses/>. 

29 

30""" 

31Decode the command line string sent from the BPS 

32plugin -> PanDA -> Edge node cluster management 

33-> Edge node -> Container. This file is not a part 

34of the BPS but a part of the payload wrapper. 

35It decodes the hexified command line. 

36""" 

37 

38import binascii 

39import json 

40import os 

41import re 

42import sys 

43 

44from lsst.ctrl.bps.panda.utils import download_extract_archive 

45from lsst.resources import ResourcePath 

46 

47 

def replace_placeholders(cmd_line: str, tag: str, replacements: dict[str, str]) -> str:
    """Replace ``<tag:name>`` placeholders in a command line.

    Parameters
    ----------
    cmd_line : `str`
        Command line.
    tag : `str`
        Tag to use for finding placeholders. It is matched literally.
    replacements : `dict` [`str`, `str`]
        Replacements indexed by placeholder string.

    Returns
    -------
    modified : `str`
        Processed command line.

    Raises
    ------
    ValueError
        Raised if a placeholder found in ``cmd_line`` has no entry in
        ``replacements``.
    """

    def _substitute(match: re.Match) -> str:
        # Look up the captured placeholder name; fail loudly when unknown.
        placeholder = match.group(1)
        if placeholder not in replacements:
            raise ValueError(
                "ValueError exception thrown, because "
                f"{placeholder} is not found in the "
                "replacement values and could "
                "not be passed to the command line"
            )
        return replacements[placeholder]

    # Single left-to-right pass: each placeholder is resolved exactly once
    # and text inserted by a replacement is never scanned again, so a value
    # that happens to look like a placeholder is passed through verbatim.
    # The tag is escaped so that it is always treated as literal text.
    return re.sub(f"<{re.escape(tag)}:(.*?)>", _substitute, cmd_line)

77 

78 

def replace_environment_vars(cmd_line):
    """Substitute ``<ENV:name>`` placeholders with environment values.

    Parameters
    ----------
    cmd_line : `str`
        Command line.

    Returns
    -------
    cmdline: `str`
        Command line with every ``ENV`` placeholder replaced by the value
        of the environment variable of the same name.
    """
    # os.environ is used directly as the replacement mapping.
    return replace_placeholders(cmd_line, "ENV", os.environ)

95 

96 

def replace_files_placeholders(cmd_line, files):
    """Substitute ``<FILE:name>`` placeholders with file names.

    Parameters
    ----------
    cmd_line : `str`
        Command line.
    files : `str`
        String with key:value pairs separated by the '+' sign.
        Keys contain the file type (runQgraphFile,
        butlerConfig, ...).
        Values contain file names.

    Returns
    -------
    cmd_line: `str`
        Processed command line.
    """
    # Each '+'-separated entry must split into exactly two ':' fields;
    # anything else fails the tuple unpacking with ValueError.
    file_names = {
        placeholder: file_name
        for placeholder, file_name in (pair.split(":") for pair in files.split("+"))
    }
    return replace_placeholders(cmd_line, "FILE", file_names)

122 

123 

def deliver_input_files(src_path: str, files: str, skip_copy: str) -> None:
    """Deliver input files needed for a job.

    Parameters
    ----------
    src_path : `str`
        URI for folder where the input files placed.
    files : `str`
        String with ``placeholder:file_name`` pairs separated by the '+'
        sign.
    skip_copy : `str`
        String with placeholder names separated by the '+' sign indicating
        which entries in ``files`` should not be copied. If the string
        contains ``jobO`` it is instead handed as a whole to
        `download_extract_archive` and nothing is copied from ``src_path``.

    Notes
    -----
    Nothing is returned; the function works through side effects only
    (file transfers, permission changes, and a log line on stderr for each
    file handled).
    """
    files = files.split("+")
    src_uri = ResourcePath(src_path, forceDirectory=True)

    # Archive mode: this is a substring test on the raw skip_copy string.
    # NOTE(review): presumably the archive already contains the listed
    # files, so they only need to be made executable - confirm against
    # download_extract_archive.
    if "jobO" in skip_copy:
        download_extract_archive(skip_copy)
        for script in files:
            file_name_placeholder, file_pfn = script.split(":")
            os.chmod(file_pfn, 0o755)
        return

    for file in files:
        file_name_placeholder, file_pfn = file.split(":")
        # Entries whose placeholder is listed in skip_copy are left alone.
        if file_name_placeholder not in skip_copy.split("+"):
            src = src_uri.join(file_pfn)
            base_dir = None
            if src.isdir():
                # A directory entry expands to the file resources found
                # under it; they are placed below a directory of the same
                # name at the destination.
                files_to_copy = ResourcePath.findFileResources([src])
                base_dir = file_pfn
            else:
                files_to_copy = [src]
            # NOTE(review): an empty path forced absolute presumably
            # resolves to the current working directory - confirm.
            dest_base = ResourcePath("", forceAbsolute=True, forceDirectory=True)
            if base_dir:
                dest_base = dest_base.join(base_dir)
            for file_to_copy in files_to_copy:
                dest = dest_base.join(file_to_copy.basename())
                if file_name_placeholder == "orderIdMapFilename":
                    # The order-id map is only fetched when it is not
                    # already present at the destination.
                    if not dest.exists():
                        dest.transfer_from(file_to_copy, transfer="copy")
                else:
                    dest.transfer_from(file_to_copy, transfer="copy")
                # Logged for every file handled, including an order-id map
                # that was already present and therefore not transferred.
                print(f"copied {file_to_copy.path} to {dest.path}", file=sys.stderr)
                if file_name_placeholder == "job_executable":
                    os.chmod(dest.path, 0o777)

175 

176 

def _order_id_is_known(feature, id_name, label, order_id, order_id_map):
    """Report whether ``order_id`` can be looked up for ``label``.

    Prints the reason to stdout and returns `False` when the map is empty,
    the label is missing, or the id is missing under that label.
    ``feature`` and ``id_name`` only affect the wording of the message.
    """
    if not order_id_map:
        print(f"{feature} is enabled but order_id_map file doesn't exist.")
        return False
    if label not in order_id_map:
        print(
            f"{feature} is enabled but label {label} doesn't in the keys"
            f" of order_id_map {order_id_map.keys()}"
        )
        return False
    if order_id not in order_id_map[label]:
        print(
            f"{feature} is enabled but {id_name} {order_id} is not"
            f" in order_id_map[{label}] {order_id_map[label].keys()}"
        )
        return False
    return True


def replace_event_file(params, files):
    """Map EventService / orderIdMap parameters to order-id map entries.

    Parameters
    ----------
    params : `str`
        String with parameters separated by the '+' sign.
        Example params:
        isr:eventservice_90^10+somethingelse. This part
        'isr:eventservice_90^10' is the EventService parameter.
        isr:orderIdMap_10. This part is using order_id map file. But it
        is not EventService.
        The format for the EventService parameter for LSST is
        'label:eventservice_<baseid>^<localid>'. The '<localid>' should
        start from 1, which means the first event of the file
        'label:eventservice_<baseid>'. In EventService, all pseudo files
        for a label are recorded in the 'orderIdMapFilename' file, with
        a dict {'label0':{"0":"pseudo_file0", "1":..},'label1':..}.
        For example, for a workflow with 100 pseudo files for the 'isr'
        label, the dict will be {'isr': {"0": "pseudo_file0",
        "1": "pseudo_file1", ..., "99": "pseudo_file99"}}. If we split the
        100 pseudo files into 5 PanDA jobs with 20 files per PanDA job, the
        5 eventservice group names will be 'isr:eventservice_0' for events
        ["0"~"19"], 'isr:eventservice_20' for events ["20"~"39"], ..., and
        'isr:eventservice_80' for events ["80"~"99"]. The EventService
        param 'isr:eventservice_80^5' means the 5th event in the group
        'isr:eventservice_80', which is '80 + 5 - 1 = 84' and will be
        mapped to file 'pseudo_file84'.
    files : `str`
        String with ``placeholder:file_name`` pairs separated by the '+'
        sign.
        Example:
        orderIdMapFilename:panda_order_id_map.json+runQgraphFile:a.qgraph

    Returns
    -------
    ret_status: `bool`
        Status of this function. If eventservice or orderIdMap is enabled
        but this function cannot handle it, it returns False. Otherwise
        it returns True.
    with_events: `bool`
        Whether there are EventService parameters.
    with_order_id_map: `bool`
        Whether there are orderIdMap parameters.
    params_map: `dict` [`str`, `dict`]
        For every successfully resolved parameter, a dict with the keys
        ``order_id`` (the resolved id as a string) and ``order_id_map``
        (the map entry of the parameter's label). Processing stops at the
        first parameter that cannot be resolved, so on failure this holds
        only the parameters resolved before it.
    """
    ret_status = True
    with_events = False
    with_order_id_map = False

    file_map = {}
    for file in files.split("+"):
        file_name_placeholder, file_pfn = file.split(":")
        file_map[file_name_placeholder] = file_pfn
    order_id_map_file = file_map.get("orderIdMapFilename")

    order_id_map = {}
    try:
        # The orderIdMapFilename should exist locally or copied to current
        # directory by deliver_input_files
        if order_id_map_file and os.path.exists(order_id_map_file):
            with open(order_id_map_file, encoding="utf-8") as f:
                order_id_map = json.load(f)
    except (OSError, ValueError) as ex:
        # Best effort: an unreadable or malformed map is reported here and
        # then handled below exactly like a missing one.
        print(f"failed to load orderIdMapFilename: {ex}")

    params_map = {}
    for param in params.split("+"):
        if "eventservice_" in param:
            with_events = True
            label, event = param.split(":")
            # The original format for EventService parameter is
            # 'label:eventservice_<baseid>^<localid>^<numberOfEvents>',
            # which can have multiple events per EventService job.
            # However, for LSST, the '<numberOfEvents>' is always 1.
            # When <numberOfEvents> is 1, it will not show. So for LSST,
            # we will see 'label:eventservice_<baseid>^<localid>'.
            # Only the first two '^' fields are used, so a trailing
            # '<numberOfEvents>' field is tolerated and ignored.
            id_fields = event.split("_")[1].split("^")
            event_base_id, event_order = id_fields[0], id_fields[1]
            # <localid> is 1-based, hence the "- 1".
            order_id = str(int(event_base_id) + int(event_order) - 1)
            feature, id_name = "EventSerice", "event_index"
        elif "orderIdMap_" in param:
            with_order_id_map = True
            label, event = param.split(":")
            order_id = event.split("_")[1]
            feature, id_name = "orderIdMap", "order_id"
        else:
            # Ordinary parameter: nothing to map.
            continue

        if not _order_id_is_known(feature, id_name, label, order_id, order_id_map):
            ret_status = False
            break

        params_map[param] = {"order_id": order_id, "order_id_map": order_id_map[label]}
    return ret_status, with_events, with_order_id_map, params_map

305 

306 

def use_map_file(input_file):
    """Tell whether ``input_file`` is a qnode-map placeholder.

    Parameters
    ----------
    input_file : `str`
        Input file either a pseudo file or job name.

    Returns
    -------
    use_qnode_map: `bool`
        `True` only when ``input_file`` has exactly two ':'-separated
        fields and the first one is the placeholder 'PH', which is present
        when enableQnodeMap is true.
    """
    # Exactly one ':' means exactly two fields; a leading "PH:" pins the
    # first field to "PH".
    return input_file.count(":") == 1 and input_file.startswith("PH:")

325 

326 

# Script entry point. Positional arguments, as used below:
#   argv[1]  hex-encoded command line
#   argv[2]  data parameters separated by '+'
#   argv[3]  source URI of the input files
#   argv[4]  'placeholder:file_name' pairs separated by '+'
#   argv[5]  skip-copy specification handed to deliver_input_files
if __name__ == "__main__":
    # Files are delivered first because replace_event_file below reads the
    # order-id map from the local file system.
    deliver_input_files(sys.argv[3], sys.argv[4], sys.argv[5])
    cmd_line = str(binascii.unhexlify(sys.argv[1]).decode())
    data_params = sys.argv[2]
    cmd_line = replace_environment_vars(cmd_line)

    print(f"cmd_line: {cmd_line}")
    print(f"data_params: {data_params}")

    # If EventService is enabled, data_params will only contain
    # event information. So we need to convert the event information
    # to LSST pseudo file names. If EventService is not enabled,
    # this part will not change data_params.
    ret_rep = replace_event_file(data_params, sys.argv[4])
    ret_event_status, with_events, with_order_id_map, event_params_map = ret_rep
    print(
        f"ret_event_status: {ret_event_status}, "
        f"with_events: {with_events} "
        f"with_order_id_map: {with_order_id_map}"
    )
    if not ret_event_status:
        print("failed to map EventService/orderIdMap parameters to original LSST pseudo file names")
        exit_code = 1
        sys.exit(exit_code)

    # Substitute every resolved event/order-id parameter by its pseudo file
    # name, both in the command line and in the data parameters.
    for event_param in event_params_map:
        order_id = event_params_map[event_param]["order_id"]
        pseudo_file_name = event_params_map[event_param]["order_id_map"][order_id]
        print(f"replacing event {event_param} with order_id {order_id} to: {pseudo_file_name}")
        cmd_line = cmd_line.replace(event_param, pseudo_file_name)
        data_params = data_params.replace(event_param, pseudo_file_name)

    # If job name map is enabled, data_params will only contain order_id
    # information. Here we will convert order_id information to LSST pseudo
    # file names.

    data_params = data_params.split("+")

    """Replace the pipetask command line placeholders
    with actual data provided in the script call
    in form placeholder1:file1+placeholder2:file2:...
    """
    cmd_line = replace_files_placeholders(cmd_line, sys.argv[4])

    # The first data parameter is the job name. When it is a 'PH:...'
    # placeholder, the real parameter list is read from qnode_map.json in
    # the current directory.
    jobname = data_params[0]
    if use_map_file(jobname):
        with open("qnode_map.json", encoding="utf-8") as f:
            qnode_map = json.load(f)
        data_params = qnode_map[jobname].split("+")

    # The remaining parameters are 'key:value' pairs that fill '{key}'
    # slots in the command line.
    for key_value_pair in data_params[1:]:
        (key, value) = key_value_pair.split(":")
        cmd_line = cmd_line.replace("{" + key + "}", value)

    print("executable command line:")
    print(cmd_line)

    # NOTE(review): the assembled string is executed by a shell; this
    # presumably relies on every argument coming from a trusted submission
    # - confirm.
    exit_status = os.system(cmd_line)
    # Translate the wait status into an exit code: 128 + signal number when
    # the command was killed by a signal, its own exit status when it
    # exited, and 1 in any other case.
    exit_code = 1
    if os.WIFSIGNALED(exit_status):
        exit_code = os.WTERMSIG(exit_status) + 128
    elif os.WIFEXITED(exit_status):
        exit_code = os.WEXITSTATUS(exit_status)
    sys.exit(exit_code)