Coverage for python / lsst / ctrl / bps / bps_utils.py: 18%
131 statements
« prev ^ index » next coverage.py v7.13.5, created at 2026-04-17 09:04 +0000
« prev ^ index » next coverage.py v7.13.5, created at 2026-04-17 09:04 +0000
1# This file is part of ctrl_bps.
2#
3# Developed for the LSST Data Management System.
4# This product includes software developed by the LSST Project
5# (https://www.lsst.org).
6# See the COPYRIGHT file at the top-level directory of this distribution
7# for details of code ownership.
8#
9# This software is dual licensed under the GNU General Public License and also
10# under a 3-clause BSD license. Recipients may choose which of these licenses
11# to use; please see the files gpl-3.0.txt and/or bsd_license.txt,
12# respectively. If you choose the GPL option then the following text applies
13# (but note that there is still no warranty even if you opt for BSD instead):
14#
15# This program is free software: you can redistribute it and/or modify
16# it under the terms of the GNU General Public License as published by
17# the Free Software Foundation, either version 3 of the License, or
18# (at your option) any later version.
19#
20# This program is distributed in the hope that it will be useful,
21# but WITHOUT ANY WARRANTY; without even the implied warranty of
22# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
23# GNU General Public License for more details.
24#
25# You should have received a copy of the GNU General Public License
26# along with this program. If not, see <https://www.gnu.org/licenses/>.
"""Misc supporting classes and functions for BPS."""

# Names exported by ``from ... import *``, kept in alphabetical order.
# NOTE(review): WhenToSaveQuantumGraphs is defined in this module but is not
# listed here; confirm whether that omission is intentional.
__all__ = [
    # Underscore-prefixed helpers that are nevertheless exported.
    "_dump_env_info",
    "_dump_pkg_info",
    "_make_id_link",
    # Public helpers.
    "bps_eval",
    "chdir",
    "create_count_summary",
    "create_job_quantum_graph_filename",
    "mkdir",
    "parse_count_summary",
    "save_qg_subgraph",
    "subset_dimension_values",
]
44import contextlib
45import dataclasses
46import errno
47import logging
48import os
49from collections import Counter
50from enum import Enum
51from pathlib import Path
52from typing import Any
54import yaml
56from lsst.utils import doImport
57from lsst.utils.packages import Packages
# Module-level logger, named after this module (``__name__``).
_LOG = logging.getLogger(__name__)
class WhenToSaveQuantumGraphs(Enum):
    """Choices for the stage at which the per-job quantum graphs are saved.

    Members are numbered in the order the stages are listed; ``NEVER``
    means no per-job graphs are written.
    """

    # Saving at QuantumGraph time requires the single_quantum_clustering
    # algorithm.
    QGRAPH = 1
    TRANSFORM = 2
    PREPARE = 3
    SUBMIT = 4
    # Per-job graphs are never written; the full QuantumGraph is always used.
    NEVER = 5
@contextlib.contextmanager
def chdir(path):
    """Temporarily switch the process working directory.

    On entry the working directory becomes ``path``. On exit, whether
    normally or via an exception, the directory that was current before
    entry is restored.

    Parameters
    ----------
    path : `str` or `pathlib.Path`
        Directory to make the current working directory for the duration
        of the ``with`` block.
    """
    previous_dir = os.getcwd()
    # Change directory before entering ``try``: if this fails there is
    # nothing to restore.
    os.chdir(path)
    try:
        yield
    finally:
        os.chdir(previous_dir)
def mkdir(path: str) -> Path:
    """Create a new directory at this given path.

    Any missing parent directories are created as well. It is an error for
    the directory itself to already exist.

    Parameters
    ----------
    path : `str` or `pathlib.Path`
        The path of the directory to create.

    Returns
    -------
    path: `pathlib.Path`
        The object representing the created directory.

    Raises
    ------
    OSError
        Raised if any issues were encountered during the attempt to create the
        directory. Depending on the system error code a specific subclass of
        ``OSError`` will get raised. For example, the function will raise
        ``PermissionError`` when trying to create a directory at the location
        it has no adequate access rights, and ``FileExistsError`` when the
        directory already exists. The original ``errno`` is preserved on the
        raised exception.
    """
    path = Path(path)
    try:
        path.mkdir(parents=True, exist_ok=False)
    except OSError as exc:
        if exc.errno == errno.EEXIST:
            reason = "directory already exists"
        else:
            # strerror can be None for OSErrors raised without an error code.
            reason = exc.strerror or str(exc)
        new_exc = type(exc)(f"cannot create directory '{path}': {reason}")
        # Building the exception from the message alone leaves errno unset.
        # Copy it over so callers can still branch on the system error code.
        # Setting only errno (not strerror) leaves str(new_exc) unchanged.
        new_exc.errno = exc.errno
        raise new_exc from None
    return path
def create_job_quantum_graph_filename(config, job, out_prefix=None):
    """Create a filename to be used when storing the QuantumGraph for a job.

    Parameters
    ----------
    config : `lsst.ctrl.bps.BpsConfig`
        BPS configuration. Its ``subDirTemplate`` value, searched with the
        job's fields (plus any job tags) as current values, gives the
        subdirectory. If it is not found, the job's label is used.
    job : `lsst.ctrl.bps.GenericWorkflowJob`
        Job for which the QuantumGraph file is being saved.
    out_prefix : `str`, optional
        Path prefix for the QuantumGraph filename. If no out_prefix is given,
        the returned path is relative, i.e. resolved against the current
        working directory.

    Returns
    -------
    full_filename : `str`
        The filename for the job's QuantumGraph, of the form
        ``[out_prefix/]inputs/<subdir>/quantum_<job.name>.qg``.
    """
    # Values for the subDirTemplate: every job field, with tags overriding
    # any field of the same name.
    curvals = dataclasses.asdict(job)
    if job.tags:
        curvals.update(job.tags)
    found, subdir = config.search("subDirTemplate", opt={"curvals": curvals})
    if not found:
        # Fall back to one subdirectory per job label. This must interpolate
        # the label; a plain "{job.label}" string would create a directory
        # literally named "{job.label}".
        subdir = f"{job.label}"
    full_filename = Path("inputs") / subdir / f"quantum_{job.name}.qg"

    if out_prefix is not None:
        full_filename = Path(out_prefix) / full_filename

    return str(full_filename)
def save_qg_subgraph(qgraph, out_filename, node_ids=None):
    """Save subgraph to file.

    If ``out_filename`` already exists, nothing is written (a debug message
    is logged instead).

    Parameters
    ----------
    qgraph : `lsst.pipe.base.QuantumGraph`
        QuantumGraph to save.
    out_filename : `str`
        Name of the output file.
    node_ids : `list` [`lsst.pipe.base.NodeId`], optional
        NodeIds for the subgraph to save to file. If `None`, the whole
        ``qgraph`` is saved.
    """
    if os.path.exists(out_filename):
        _LOG.debug("Skipping saving QuantumGraph to %s because already exists.", out_filename)
        return

    if node_ids is None:
        graph_to_save = qgraph
    else:
        graph_to_save = qgraph.subset(qgraph.getQuantumNodeByNodeId(nid) for nid in node_ids)
    # Report the size of the graph actually written, not the full input graph.
    _LOG.debug("Saving QuantumGraph with %d nodes to %s", len(graph_to_save), out_filename)
    graph_to_save.saveUri(out_filename)
def create_count_summary(counts):
    """Create summary from count mapping.

    Parameters
    ----------
    counts : `collections.Counter` or `dict` [`str`, `int`]
        Mapping of counts to keys.

    Returns
    -------
    summary : `str`
        Semi-colon delimited string of key:count pairs
        (e.g. "key1:cnt1;key2:cnt2"), in the mapping's iteration order.
        Parsable by parse_count_summary(). An empty string is returned
        for an empty mapping or for any argument that is not a `dict`.
    """
    if not isinstance(counts, dict):
        return ""
    return ";".join(f"{key}:{count}" for key, count in counts.items())
def parse_count_summary(summary):
    """Parse summary into count mapping.

    Accepts strings in the format produced by `create_count_summary`.

    Parameters
    ----------
    summary : `str`
        Semi-colon delimited string of key:count pairs
        (e.g. "key1:cnt1;key2:cnt2"). An empty string yields an empty
        mapping.

    Returns
    -------
    counts : `collections.Counter` [`str`, `int`]
        Mapping representation of given summary for easier
        individual count lookup.

    Raises
    ------
    ValueError
        If a non-empty entry has no ``:`` or its count is not an integer.
    """
    counts = Counter()
    for part in summary.split(";"):
        if not part:
            # Skip the empty entry produced by an empty summary (what
            # create_count_summary returns for an empty mapping) and by
            # stray separators.
            continue
        # Split on the last colon so a label containing ':' stays intact.
        label, count = part.rsplit(":", 1)
        # Store counts as ints so Counter arithmetic and comparisons work.
        counts[label] = int(count)
    return counts
def _dump_pkg_info(filename):
    """Record the versions of the packages currently in use.

    The information is written in YAML. If ``filename`` does not already
    end in ``.yaml`` or ``.yml`` (case-insensitive), ``.yaml`` is appended
    after any existing suffix (e.g. ``info.txt`` becomes ``info.txt.yaml``).

    Parameters
    ----------
    filename : `str`
        Destination file for the package version information.
    """
    target = Path(filename)
    current_suffix = target.suffix
    if current_suffix.lower() not in {".yaml", ".yml"}:
        target = target.with_suffix(current_suffix + ".yaml")
    Packages.fromSystem().write(str(target))
def _dump_env_info(filename):
    """Write a snapshot of the process environment variables as YAML.

    If ``filename`` does not already end in ``.yaml`` or ``.yml``
    (case-insensitive), ``.yaml`` is appended after any existing suffix.
    An existing file at the final path is overwritten.

    Parameters
    ----------
    filename : `str`
        Destination file for the environment information.
    """
    requested = Path(filename)
    if requested.suffix.lower() in {".yaml", ".yml"}:
        out_path = requested
    else:
        out_path = requested.with_suffix(f"{requested.suffix}.yaml")
    env_snapshot = dict(os.environ)
    with open(out_path, "w", encoding="utf-8") as stream:
        yaml.dump(env_snapshot, stream)
def _make_id_link(config, run_id):
    """Make id softlink to the submit run directory if makeIdLink
    is true.

    The link is created at ``<idLinkPath>/<run_id>`` and points to
    ``submitPath``. This is best-effort: failures to create the link are
    logged as warnings rather than raised.

    Parameters
    ----------
    config : `lsst.ctrl.bps.BpsConfig`
        BPS configuration. The values ``makeIdLink``, ``submitPath`` and
        ``idLinkPath`` are read from it.
    run_id : `str` or `None`
        WMS run ID. If `None`, no link is made.
    """
    _, make_id_link = config.search("makeIdLink")
    if not make_id_link:
        _LOG.debug("Not asked to make id link (makeIdLink=%s)", make_id_link)
        return
    if run_id is None:
        _LOG.info("Run ID is None. Skipping making id link.")
        return

    found, submit_path = config.search("submitPath")
    # pathlib.Path.symlink_to() does not care if target exists
    # so we check it ourselves.
    if not (found and Path(submit_path).exists()):
        _LOG.warning("Could not make id softlink: submitPath does not exist (%s)", submit_path)
        return

    _, id_link_path = config.search("idLinkPath")
    id_link_path = Path(id_link_path) / f"{run_id}"
    _LOG.debug("submit_path=%s, id_link_path=%s", submit_path, id_link_path)

    # Compare as strings on both sides so a path-like submitPath still
    # matches the text returned by readlink().
    if (
        id_link_path.is_symlink()
        and id_link_path.exists()
        and str(id_link_path.readlink()) == str(submit_path)
    ):
        _LOG.debug("Correct softlink already exists (%s)", id_link_path)
        return

    _LOG.debug("Softlink doesn't already exist (%s)", id_link_path)
    try:
        id_link_path.parent.mkdir(parents=True, exist_ok=True)
        id_link_path.symlink_to(submit_path)
    except OSError as exc:
        # Covers FileExistsError (e.g. a link pointing elsewhere) and
        # PermissionError, both subclasses of OSError.
        _LOG.warning("Could not make id softlink: %s", exc)
    else:
        _LOG.info("Made id softlink: %s", id_link_path)
def subset_dimension_values(
    desc_what: str,
    desc_for: str,
    subset_dim_names: str,
    dimension_values: dict[str, Any],
    equal_dims: str | None,
) -> dict[str, Any]:
    """Return subset of given dimension_values and handle any equal dimensions.

    Parameters
    ----------
    desc_what : `str`
        Description of what has the dimensions values to be used in debugging
        or error messages.
    desc_for : `str`
        Description of what the subset is for to be used in debugging or
        error messages.
    subset_dim_names : `str`
        Comma-separated list of dimension names used to make subset.
        Whitespace around names and empty entries are ignored.
    dimension_values : `dict` [`str`, `~typing.Any`]
        Superset of dimension values from which to make subset.
    equal_dims : `str`, optional
        Description of any dimensions to be considered equal,
        e.g., "dim1:dim2,dim3:dim4". An equality is only used to fill in a
        wanted dimension that is missing from ``dimension_values``; a value
        that is present directly is never overridden.

    Returns
    -------
    dim_values_subset : `dict` [`str`, `~typing.Any`]
        Subset of given dimension values.

    Raises
    ------
    KeyError
        If any wanted dimensions aren't in given values.
    ValueError
        If an ``equal_dims`` entry is not of the form ``dim1:dim2``.
    """
    dim_names = [name.strip() for name in subset_dim_names.split(",") if name.strip()]

    missing_dims = set()
    dim_values = {}
    for dim_name in dim_names:
        _LOG.debug("%s, %s: dim_name = %s", desc_what, desc_for, dim_name)
        if dim_name in dimension_values:
            dim_values[dim_name] = dimension_values[dim_name]
        else:
            missing_dims.add(dim_name)

    if equal_dims:
        for pair in equal_dims.split(","):
            pair = pair.strip()
            if not pair:
                continue
            dim1, dim2 = (dim.strip() for dim in pair.split(":"))
            # Only substitute for dimensions that are actually missing.
            # Checking membership in dim_names instead would overwrite values
            # already found directly and then fail on missing_dims.remove().
            if dim1 in missing_dims and dim2 in dimension_values:
                dim_values[dim1] = dimension_values[dim2]
                missing_dims.remove(dim1)
            elif dim2 in missing_dims and dim1 in dimension_values:
                dim_values[dim2] = dimension_values[dim1]
                missing_dims.remove(dim2)

    if missing_dims:
        raise KeyError(
            f"{desc_what} missing dimensions ({', '.join(sorted(missing_dims))}) required for {desc_for}"
        )
    return dim_values
def bps_eval(func: str, args: str) -> Any:
    """Evaluate user provided expression/function.

    Builds the string ``"<func>(<args>)"`` and passes it to the built-in
    `eval`. If ``func`` contains a ``.``, it is first imported with
    ``doImport`` and called through the local name ``genfunc``. Otherwise
    ``func`` is used verbatim, e.g. as a built-in function name.

    Parameters
    ----------
    func : `str`
        Importable string or built-in function name.
    args : `str`
        Parameters to pass to the function, as source text spliced verbatim
        between the call parentheses.

    Returns
    -------
    results : `~typing.Any`
        Results of running eval.

    Raises
    ------
    ImportError
        If problems importing.
    Exception
        Any exception raised while evaluating the expression (e.g.
        `SyntaxError`, `NameError`, or one raised by the called function) is
        propagated unchanged.
    """
    if "." in func:
        # ``genfunc`` looks unused, but it is referenced by name inside
        # eval_str below, hence the noqa.
        genfunc = doImport(func)  # noqa: F841
        func_reference = "genfunc"
    else:
        func_reference = func
    eval_str = f"{func_reference}({args})"
    _LOG.debug("String passed to eval: '%s'", eval_str)
    # eval() with no explicit namespaces sees this function's globals and
    # locals, which is how "genfunc" resolves.
    # SECURITY NOTE(review): func and args are executed as Python code, so
    # they must come only from trusted configuration. Never route untrusted
    # input here.
    results = eval(eval_str)

    return results