Coverage for python / lsst / ctrl / bps / bps_utils.py: 18%

131 statements  

« prev     ^ index     » next       coverage.py v7.13.5, created at 2026-04-30 08:53 +0000

1# This file is part of ctrl_bps. 

2# 

3# Developed for the LSST Data Management System. 

4# This product includes software developed by the LSST Project 

5# (https://www.lsst.org). 

6# See the COPYRIGHT file at the top-level directory of this distribution 

7# for details of code ownership. 

8# 

9# This software is dual licensed under the GNU General Public License and also 

10# under a 3-clause BSD license. Recipients may choose which of these licenses 

11# to use; please see the files gpl-3.0.txt and/or bsd_license.txt, 

12# respectively. If you choose the GPL option then the following text applies 

13# (but note that there is still no warranty even if you opt for BSD instead): 

14# 

15# This program is free software: you can redistribute it and/or modify 

16# it under the terms of the GNU General Public License as published by 

17# the Free Software Foundation, either version 3 of the License, or 

18# (at your option) any later version. 

19# 

20# This program is distributed in the hope that it will be useful, 

21# but WITHOUT ANY WARRANTY; without even the implied warranty of 

22# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the 

23# GNU General Public License for more details. 

24# 

25# You should have received a copy of the GNU General Public License 

26# along with this program. If not, see <https://www.gnu.org/licenses/>. 

27 

28"""Misc supporting classes and functions for BPS.""" 

29 

# Names exported by ``from ... import *``.  Several underscore-prefixed
# helpers are listed explicitly so that they are exported as well.
__all__ = [
    # Underscore-prefixed helpers.
    "_dump_env_info",
    "_dump_pkg_info",
    "_make_id_link",
    # Public functions.
    "bps_eval",
    "chdir",
    "create_count_summary",
    "create_job_quantum_graph_filename",
    "mkdir",
    "parse_count_summary",
    "save_qg_subgraph",
    "subset_dimension_values",
]

43 

44import contextlib 

45import dataclasses 

46import errno 

47import logging 

48import os 

49from collections import Counter 

50from enum import Enum 

51from pathlib import Path 

52from typing import Any 

53 

54import yaml 

55 

56from lsst.utils import doImport 

57from lsst.utils.packages import Packages 

58 

# Module-level logger, named after this module.
_LOG = logging.getLogger(name=__name__)

60 

61 

class WhenToSaveQuantumGraphs(Enum):
    """Options controlling when the per-job quantum graphs get saved."""

    # Requires the single_quantum_clustering algorithm.
    QGRAPH = 1
    TRANSFORM = 2
    PREPARE = 3
    SUBMIT = 4
    # Per-job graphs are never written; the full QuantumGraph is always used.
    NEVER = 5

70 

71 

@contextlib.contextmanager
def chdir(path):
    """Temporarily switch the current working directory.

    On entering the ``with`` block the process changes into ``path``; on
    leaving it (normally or because of an exception) the directory that was
    current beforehand is restored.  `os.chdir` affects the whole process.

    Parameters
    ----------
    path : `str` or `pathlib.Path`
        Directory to use as the working directory inside the ``with`` block.
    """
    previous_dir = os.getcwd()
    os.chdir(path)
    try:
        yield
    finally:
        # Restore unconditionally so an exception in the body cannot leave
        # the process stranded in ``path``.
        os.chdir(previous_dir)

89 

90 

def mkdir(path: str) -> Path:
    """Create a new directory, including any missing parents.

    Parameters
    ----------
    path : `str`
        Location of the directory to create.

    Returns
    -------
    path: `pathlib.Path`
        The newly created directory.

    Raises
    ------
    OSError
        Raised if the directory could not be created, including when it
        already exists.  The raised exception has the same concrete type as
        the underlying error (e.g. ``FileExistsError``, ``PermissionError``)
        but carries a message naming the offending path.
    """
    new_dir = Path(path)
    try:
        new_dir.mkdir(parents=True, exist_ok=False)
    except OSError as err:
        reason = "directory already exists" if err.errno == errno.EEXIST else err.strerror
        # Re-raise the same exception type with a friendlier message; the
        # original traceback context is deliberately suppressed.
        raise type(err)(f"cannot create directory '{new_dir}': {reason}") from None
    return new_dir

123 

124 

def create_job_quantum_graph_filename(config, job, out_prefix=None):
    """Create a filename to be used when storing the QuantumGraph for a job.

    The filename has the form ``[out_prefix/]inputs/<subdir>/quantum_<job
    name>.qg``.  ``<subdir>`` comes from the ``subDirTemplate`` config value
    (evaluated with the job's fields and tags as current values) or, if that
    value is not found, the job's label.

    Parameters
    ----------
    config : `lsst.ctrl.bps.BpsConfig`
        BPS configuration.
    job : `lsst.ctrl.bps.GenericWorkflowJob`
        Job for which the QuantumGraph file is being saved.
    out_prefix : `str`, optional
        Path prefix for the QuantumGraph filename. If no out_prefix is given,
        the returned path is relative (i.e., to the current working
        directory).

    Returns
    -------
    full_filename : `str`
        The filename for the job's QuantumGraph.
    """
    # Job fields plus any tags are made available to the template lookup.
    curvals = dataclasses.asdict(job)
    if job.tags:
        curvals.update(job.tags)
    found, subdir = config.search("subDirTemplate", opt={"curvals": curvals})
    if not found:
        # Fall back to one subdirectory per job label.  This must be the
        # label's *value*; the plain string "{job.label}" would otherwise be
        # used verbatim as a directory name.
        subdir = f"{job.label}"
    full_filename = Path("inputs") / subdir / f"quantum_{job.name}.qg"

    if out_prefix is not None:
        full_filename = Path(out_prefix) / full_filename

    return str(full_filename)

155 

156 

def save_qg_subgraph(qgraph, out_filename, node_ids=None):
    """Save subgraph to file.

    Nothing is written if ``out_filename`` already exists.

    Parameters
    ----------
    qgraph : `lsst.pipe.base.QuantumGraph`
        QuantumGraph to save.
    out_filename : `str`
        Name of the output file.
    node_ids : `list` [`lsst.pipe.base.NodeId`], optional
        NodeIds for the subgraph to save to file.  If `None`, the whole
        ``qgraph`` is saved.
    """
    if os.path.exists(out_filename):
        _LOG.debug("Skipping saving QuantumGraph to %s because already exists.", out_filename)
        return

    if node_ids is None:
        graph_to_save = qgraph
    else:
        graph_to_save = qgraph.subset(qgraph.getQuantumNodeByNodeId(nid) for nid in node_ids)
    # Report the size of the graph actually written, not of the full graph.
    _LOG.debug("Saving QuantumGraph with %d nodes to %s", len(graph_to_save), out_filename)
    graph_to_save.saveUri(out_filename)

177 

178 

def create_count_summary(counts):
    """Create summary from count mapping.

    Parameters
    ----------
    counts : `collections.Counter` or `dict` [`str`, `int`]
        Mapping of keys to counts.

    Returns
    -------
    summary : `str`
        Semicolon-delimited string of key:count pairs in the mapping's
        iteration order (e.g., "key1:cnt1;key2:cnt2").  Parsable by
        parse_count_summary().  Empty if ``counts`` is empty or is not a
        `dict` (or subclass such as `collections.Counter`).
    """
    if not isinstance(counts, dict):
        return ""
    return ";".join(f"{key}:{count}" for key, count in counts.items())

198 

199 

def parse_count_summary(summary):
    """Parse summary into count mapping.

    Parameters
    ----------
    summary : `str`
        Semicolon-delimited string of key:count pairs, as produced by
        create_count_summary() (e.g., "key1:1;key2:5").  An empty string
        (or empty entries between separators) contributes nothing.

    Returns
    -------
    counts : `collections.Counter` [`str`, `int`]
        Mapping representation of given summary for easier
        individual count lookup.  Counts are converted to `int`.

    Raises
    ------
    ValueError
        Raised if a non-empty entry has no ':' separator or its count is not
        an integer.
    """
    counts = Counter()
    for part in summary.split(";"):
        if not part:
            # Tolerate an empty summary and stray/trailing separators.
            continue
        # Split on the last ':' so labels that themselves contain ':' survive
        # a round trip through create_count_summary().
        label, count = part.rsplit(":", 1)
        counts[label] = int(count)
    return counts

219 

220 

def _dump_pkg_info(filename):
    """Record the versions of the packages currently in use.

    If ``filename`` does not end in ``.yaml`` or ``.yml`` (case-insensitive),
    ``.yaml`` is appended to it (e.g., ``info.txt`` becomes
    ``info.txt.yaml``).

    Parameters
    ----------
    filename : `str`
        Name of the file in which to store the package version information.
    """
    out_path = Path(filename)
    # NOTE(review): presumably Packages.write chooses its output format from
    # the file extension, hence forcing a YAML suffix -- confirm.
    if out_path.suffix.lower() not in (".yaml", ".yml"):
        out_path = out_path.with_suffix(out_path.suffix + ".yaml")
    Packages.fromSystem().write(str(out_path))

235 

236 

def _dump_env_info(filename):
    """Write the current process environment variables to a YAML file.

    If ``filename`` does not end in ``.yaml`` or ``.yml`` (case-insensitive),
    ``.yaml`` is appended to it (e.g., ``env.txt`` becomes ``env.txt.yaml``).

    Parameters
    ----------
    filename : `str`
        Name of the file in which to store the runtime environment.
    """
    out_path = Path(filename)
    if out_path.suffix.lower() not in (".yaml", ".yml"):
        out_path = out_path.with_suffix(out_path.suffix + ".yaml")
    environment = dict(os.environ)
    # NOTE(review): every environment variable is written verbatim, which may
    # include credentials or tokens; confirm that is acceptable here.
    with out_path.open("w", encoding="utf-8") as stream:
        yaml.dump(environment, stream)

251 

252 

def _make_id_link(config, run_id):
    """Make id softlink to the submit run directory if makeIdLink is true.

    The link is created at ``<idLinkPath>/<run_id>`` and points at
    ``submitPath``.  Problems creating the link are logged as warnings rather
    than raised.

    Parameters
    ----------
    config : `lsst.ctrl.bps.BpsConfig`
        BPS configuration.  The values ``makeIdLink``, ``submitPath`` and
        ``idLinkPath`` are read from it.
    run_id : `str`
        WMS run ID.  If `None`, no link is made.
    """
    _, make_id_link = config.search("makeIdLink")
    if not make_id_link:
        _LOG.debug("Not asked to make id link (makeIdLink=%s)", make_id_link)
        return
    if run_id is None:
        _LOG.info("Run ID is None. Skipping making id link.")
        return

    found, submit_path = config.search("submitPath")
    # pathlib.Path.symlink_to() does not care if target exists
    # so we check it ourselves.
    if not found or not Path(submit_path).exists():
        _LOG.warning("Could not make id softlink: submitPath does not exist (%s)", submit_path)
        return

    _, id_link_root = config.search("idLinkPath")
    id_link_path = Path(id_link_root) / f"{run_id}"
    _LOG.debug("submit_path=%s, id_link_path=%s", submit_path, id_link_path)

    # Compare as paths, not strings: str(readlink()) never has a trailing
    # slash or doubled separators, so a string comparison against a
    # submitPath written with them would never match an existing good link.
    if (
        id_link_path.is_symlink()
        and id_link_path.exists()
        and id_link_path.readlink() == Path(submit_path)
    ):
        _LOG.debug("Correct softlink already exists (%s)", id_link_path)
        return

    _LOG.debug("Softlink doesn't already exist (%s)", id_link_path)
    try:
        id_link_path.parent.mkdir(parents=True, exist_ok=True)
        id_link_path.symlink_to(submit_path)
    except OSError as exc:
        # Covers FileExistsError and PermissionError (both OSError subclasses).
        # Making the link is best-effort, so only warn.
        _LOG.warning("Could not make id softlink: %s", exc)
    else:
        _LOG.info("Made id softlink: %s", id_link_path)

296 

297 

298def subset_dimension_values( 

299 desc_what: str, 

300 desc_for: str, 

301 subset_dim_names: str, 

302 dimension_values: dict[str, Any], 

303 equal_dims: str | None, 

304) -> dict[str, Any]: 

305 """Return subset of given dimension_values and handle any equal dimensions. 

306 

307 Parameters 

308 ---------- 

309 desc_what : `str` 

310 Description of what has the dimensions values to be used in debugging 

311 or error messages. 

312 desc_for : `str` 

313 Description of what the subset is for to be used in debugging or 

314 error messages. 

315 subset_dim_names : `str` 

316 Comma-separated list of dimension names used to make subset. 

317 dimension_values : `dict` [`str`, `~typing.Any`] 

318 Superset of dimension values from which to make subset. 

319 equal_dims : `str`, optional 

320 Description of any dimensions to be considered equal, 

321 e.g., "dim1:dim2,dim3:dim4". 

322 

323 Returns 

324 ------- 

325 dim_values_subset : `dict` [`str`, `~typing.Any`] 

326 Subset of given dimension values. 

327 

328 Raises 

329 ------ 

330 KeyError 

331 If any wanted dimensions aren't in given values. 

332 """ 

333 dim_names = [d.strip() for d in subset_dim_names.split(",")] 

334 

335 missing_dims = set() 

336 dim_values = {} 

337 for dim_name in dim_names: 

338 _LOG.debug("%s, %s: dim_name = %s", desc_what, desc_for, dim_name) 

339 if dim_name in dimension_values: 

340 dim_values[dim_name] = dimension_values[dim_name] 

341 else: 

342 missing_dims.add(dim_name) 

343 if equal_dims: 

344 for pair in [pt.strip() for pt in equal_dims.split(",")]: 

345 dim1, dim2 = pair.strip().split(":") 

346 if dim1 in dim_names and dim2 in dimension_values: 

347 dim_values[dim1] = dimension_values[dim2] 

348 missing_dims.remove(dim1) 

349 elif dim2 in dim_names and dim1 in dimension_values: 

350 dim_values[dim2] = dimension_values[dim1] 

351 missing_dims.remove(dim2) 

352 

353 if missing_dims: 

354 raise KeyError( 

355 f"{desc_what} missing dimensions ({', '.join(sorted(missing_dims))}) required for {desc_for}" 

356 ) 

357 return dim_values 

358 

359 

360def bps_eval(func: str, args: str) -> Any: 

361 """Evaluate user provided expression/function. 

362 

363 Parameters 

364 ---------- 

365 func : `str` 

366 Importable string or built-in function name. 

367 args : `str` 

368 Parameters to pass to the function. 

369 

370 Returns 

371 ------- 

372 results : `~typing.Any` 

373 Results of running eval. 

374 

375 Raises 

376 ------ 

377 ImportError 

378 If problems importing. 

379 """ 

380 if "." in func: 

381 genfunc = doImport(func) # noqa: F841 

382 func_reference = "genfunc" 

383 else: 

384 func_reference = func 

385 eval_str = f"{func_reference}({args})" 

386 _LOG.debug("String passed to eval: '%s'", eval_str) 

387 results = eval(eval_str) 

388 

389 return results