# This file is part of pipe_base.
#
# Developed for the LSST Data Management System.
# This product includes software developed by the LSST Project
# (http://www.lsst.org).
# See the COPYRIGHT file at the top-level directory of this distribution
# for details of code ownership.
#
# This software is dual licensed under the GNU General Public License and also
# under a 3-clause BSD license. Recipients may choose which of these licenses
# to use; please see the files gpl-3.0.txt and/or bsd_license.txt,
# respectively. If you choose the GPL option then the following text applies
# (but note that there is still no warranty even if you opt for BSD instead):
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.

28"""Module defining variants for valid values used to constrain datasets in a 

29graph building query. 

30""" 

31 

32from __future__ import annotations 

33 

34__all__ = ("ExecutionResources", "QuantumContext") 

35 

36import numbers 

37import uuid 

38from collections.abc import Callable, Sequence 

39from dataclasses import dataclass 

40from typing import Any 

41 

42import astropy.units as u 

43 

44from lsst.daf.butler import ( 

45 DataCoordinate, 

46 DatasetProvenance, 

47 DatasetRef, 

48 DatasetType, 

49 DimensionUniverse, 

50 LimitedButler, 

51 Quantum, 

52) 

53from lsst.utils.introspection import get_full_type_name 

54from lsst.utils.logging import PeriodicLogger, getLogger 

55 

56from .automatic_connection_constants import LOG_OUTPUT_CONNECTION_NAME, METADATA_OUTPUT_CONNECTION_NAME 

57from .connections import DeferredDatasetRef, InputQuantizedConnection, OutputQuantizedConnection 

58from .struct import Struct 

59 

60_LOG = getLogger(__name__) 

61 

62 

@dataclass(init=False, frozen=True)
class ExecutionResources:
    """A description of the resources available to a running quantum.

    Parameters
    ----------
    num_cores : `int`, optional
        The number of cores allocated to the task.
    max_mem : `~astropy.units.Quantity`, `numbers.Real`, `str`, or `None`,\
            optional
        The amount of memory allocated to the task. Can be specified
        as a byte-compatible `~astropy.units.Quantity`, a plain number,
        a string containing a plain number, or a string representing a
        quantity. If `None`, no limit is specified.
    default_mem_units : `astropy.units.Unit`, optional
        The default unit to apply when the ``max_mem`` value is given
        as a plain number.
    """

    num_cores: int = 1
    """The maximum number of cores that the task can use."""

    max_mem: u.Quantity | None = None
    """If defined, the amount of memory allocated to the task."""

    def __init__(
        self,
        *,
        num_cores: int = 1,
        max_mem: u.Quantity | numbers.Real | str | None = None,
        default_mem_units: u.Unit = u.B,
    ):
        # Define our own __init__ to allow more flexible input parameters
        # while keeping a constrained dataclass definition.
        if num_cores < 1:
            raise ValueError("The number of cores must be a positive integer")

        object.__setattr__(self, "num_cores", num_cores)

        mem: u.Quantity | None = None

        if max_mem is None or isinstance(max_mem, u.Quantity):
            mem = max_mem
        elif max_mem == "":
            # Some command-line tooling can represent no value as an empty
            # string.
            pass
        else:
            parsed_mem = None
            try:
                parsed_mem = float(max_mem)
            except ValueError:
                pass
            else:
                mem = parsed_mem * default_mem_units

            if mem is None:
                mem = u.Quantity(max_mem)

        if mem is not None:
            # Force to bytes. This also checks that we can convert to bytes.
            mem = mem.to(u.B)

        object.__setattr__(self, "max_mem", mem)

    def __deepcopy__(self, memo: Any) -> ExecutionResources:
        """Deep copy returns itself because the class is frozen."""
        return self

    def _reduce_kwargs(self) -> dict[str, Any]:
        """Return a dict of the keyword arguments that should be used
        by `__reduce__`.

        This is necessary because the dataclass is defined to be keyword
        only and we wish the default pickling to store only a plain number
        for the memory allocation and not a full
        `~astropy.units.Quantity`.

        Returns
        -------
        kwargs : `dict`
            Keyword arguments to be used when pickling.
        """
        kwargs: dict[str, Any] = {"num_cores": self.num_cores}
        if self.max_mem is not None:
            # .value is a numpy float. Cast it to a Python int since we
            # do not want fractional bytes. The constructor ensures that this
            # uses units of bytes, so we do not have to convert.
            kwargs["max_mem"] = int(self.max_mem.value)
        return kwargs

    @classmethod
    def _unpickle_via_factory(
        cls: type[ExecutionResources], args: Sequence[Any], kwargs: dict[str, Any]
    ) -> ExecutionResources:
        """Unpickle something by calling a factory.

        Allows unpickling via `__reduce__` with keyword
        arguments as well as positional arguments.
        """
        return cls(**kwargs)

    def __reduce__(
        self,
    ) -> tuple[
        Callable[[Sequence[Any], dict[str, Any]], ExecutionResources],
        tuple[Sequence[Any], dict[str, Any]],
    ]:
        """Pickler."""
        return self._unpickle_via_factory, ([], self._reduce_kwargs())
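
# Illustrative sketch (not part of the upstream module; values are
# hypothetical): the constructor accepts several equivalent memory
# specifications, all normalized to bytes, e.g.
#
#     ExecutionResources(num_cores=4, max_mem=1_000_000)    # plain number
#     ExecutionResources(num_cores=4, max_mem="1000000")    # numeric string
#     ExecutionResources(num_cores=4, max_mem="1 MB")       # quantity string
#     ExecutionResources(num_cores=4, max_mem=1 * u.Mbyte)  # astropy Quantity
#
# all of which should yield ``max_mem == 1_000_000 * u.B`` given the default
# byte units.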

class QuantumContext:
    """A Butler-like class specialized for a single quantum along with
    context information that can influence how the task is executed.

    Parameters
    ----------
    butler : `lsst.daf.butler.LimitedButler`
        Butler object from/to which datasets will be get/put.
    quantum : `lsst.daf.butler.Quantum`
        Quantum object that describes the datasets which will be get/put by a
        single execution of this node in the pipeline graph.
    resources : `ExecutionResources`, optional
        The resources allocated for executing quanta.
    quantum_id : `uuid.UUID` or `None`, optional
        The ID of the quantum being executed. Used for provenance.

    Notes
    -----
    A `QuantumContext` class wraps a standard butler interface and
    specializes it to the context of a given quantum. In practice this
    means that the only gets and puts this class allows are of
    `~lsst.daf.butler.DatasetRef` objects that are contained in the quantum.

    In the future this class will also be used to record provenance on
    what was actually retrieved and stored, in contrast to what preflight
    expects to be retrieved and stored based on the graph before execution.
    """

    resources: ExecutionResources

    def __init__(
        self,
        butler: LimitedButler,
        quantum: Quantum,
        *,
        resources: ExecutionResources | None = None,
        quantum_id: uuid.UUID | None = None,
    ):
        self.quantum = quantum
        if resources is None:
            resources = ExecutionResources()
        self.resources = resources

        self.allInputs = set()
        self.allOutputs = set()
        for refs in quantum.inputs.values():
            for ref in refs:
                self.allInputs.add((ref.datasetType, ref.dataId, ref.id))
        for dataset_type, refs in quantum.outputs.items():
            if dataset_type.name.endswith(METADATA_OUTPUT_CONNECTION_NAME) or dataset_type.name.endswith(
                LOG_OUTPUT_CONNECTION_NAME
            ):
                # Don't consider log and metadata datasets to be outputs in
                # this context, because we don't want the task to be able to
                # write them itself; that's for the execution system to do.
                continue
            for ref in refs:
                self.allOutputs.add((ref.datasetType, ref.dataId, ref.id))
        self.outputsPut: set[tuple[DatasetType, DataCoordinate, uuid.UUID]] = set()
        self.__butler = butler
        self.dataset_provenance = DatasetProvenance(quantum_id=quantum_id)

    def _get(self, ref: DeferredDatasetRef | DatasetRef | None) -> Any:
        # Butler methods below will check for unresolved DatasetRefs and
        # raise appropriately, so no need for us to do that here.
        if isinstance(ref, DeferredDatasetRef):
            self._checkMembership(ref.datasetRef, self.allInputs)
            self.dataset_provenance.add_input(ref.datasetRef)
            return self.__butler.getDeferred(ref.datasetRef)
        elif ref is None:
            return None
        else:
            self._checkMembership(ref, self.allInputs)
            self.dataset_provenance.add_input(ref)
            return self.__butler.get(ref)

    def _put(self, value: Any, ref: DatasetRef) -> None:
        """Store data in the butler."""
        self._checkMembership(ref, self.allOutputs)
        self.__butler.put(value, ref, provenance=self.dataset_provenance)
        self.outputsPut.add((ref.datasetType, ref.dataId, ref.id))

    def get(
        self,
        dataset: (
            InputQuantizedConnection
            | list[DatasetRef | None]
            | list[DeferredDatasetRef | None]
            | DatasetRef
            | DeferredDatasetRef
            | None
        ),
    ) -> Any:
        """Fetch data from the butler.

        Parameters
        ----------
        dataset : see description
            This argument may either be an `InputQuantizedConnection`, which
            describes all the inputs of a quantum, a list of
            `~lsst.daf.butler.DatasetRef`, or a single
            `~lsst.daf.butler.DatasetRef`. The function will get and return
            the corresponding datasets from the butler. If `None` is passed in
            place of a `~lsst.daf.butler.DatasetRef`, then the corresponding
            returned object will be `None`.

        Returns
        -------
        return : `object`
            This function returns arbitrary objects fetched from the butler.
            The structure in which these objects are returned depends on the
            type of the input argument. If the input dataset argument is an
            `InputQuantizedConnection`, then the return type will be a
            dictionary with keys corresponding to the attributes of the
            `InputQuantizedConnection` (which in turn are the attribute
            identifiers of the connections). If the input argument is of type
            `list` of `~lsst.daf.butler.DatasetRef`, then the return type will
            be a list of objects. If the input argument is a single
            `~lsst.daf.butler.DatasetRef`, then a single object will be
            returned.

        Raises
        ------
        ValueError
            Raised if a `~lsst.daf.butler.DatasetRef` is passed to get that is
            not defined in the quantum object.
        """
        # Set up a periodic logger so log messages can be issued if things
        # are taking too long.
        periodic = PeriodicLogger(_LOG)

        if isinstance(dataset, InputQuantizedConnection):
            retVal = {}
            n_connections = len(dataset)
            n_retrieved = 0
            for i, (name, ref) in enumerate(dataset):
                if isinstance(ref, list | tuple):
                    val = []
                    n_refs = len(ref)
                    for j, r in enumerate(ref):
                        val.append(self._get(r))
                        n_retrieved += 1
                        periodic.log(
                            "Retrieved %d out of %d datasets for connection '%s' (%d out of %d)",
                            j + 1,
                            n_refs,
                            name,
                            i + 1,
                            n_connections,
                        )
                else:
                    val = self._get(ref)
                    periodic.log(
                        "Retrieved dataset for connection '%s' (%d out of %d)",
                        name,
                        i + 1,
                        n_connections,
                    )
                    n_retrieved += 1
                retVal[name] = val
            if periodic.num_issued > 0:
                # This took long enough that we issued some periodic log
                # messages, so issue a final confirmation message as well.
                _LOG.verbose(
                    "Completed retrieval of %d datasets from %d connections", n_retrieved, n_connections
                )
            return retVal
        elif isinstance(dataset, list | tuple):
            n_datasets = len(dataset)
            retrieved = []
            for i, x in enumerate(dataset):
                # Mypy is not sure of the type of x because of the union
                # of lists, so it complains. Ignoring it is more efficient
                # than adding an isinstance assert.
                retrieved.append(self._get(x))
                periodic.log("Retrieved %d out of %d datasets", i + 1, n_datasets)
            if periodic.num_issued > 0:
                _LOG.verbose("Completed retrieval of %d datasets", n_datasets)
            return retrieved
        elif isinstance(dataset, DatasetRef | DeferredDatasetRef) or dataset is None:
            return self._get(dataset)
        else:
            raise TypeError(
                f"Dataset argument ({get_full_type_name(dataset)}) is not a type that can be used to get"
            )
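
    # Illustrative sketch (hypothetical task code, not part of this module):
    # in ``PipelineTask.runQuantum`` all inputs of a quantum are typically
    # fetched with a single call on the `InputQuantizedConnection`, e.g.
    #
    #     inputs = butlerQC.get(inputRefs)  # dict keyed by connection name
    #
    # while a single `~lsst.daf.butler.DatasetRef` or a list of refs returns
    # a single object or a list of objects, respectively. Passing a ref that
    # is not part of the quantum raises `ValueError`.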

    def put(
        self,
        values: Struct | list[Any] | Any,
        dataset: OutputQuantizedConnection | list[DatasetRef] | DatasetRef,
    ) -> None:
        """Put data into the butler.

        Parameters
        ----------
        values : `Struct` or `list` of `object` or `object`
            The data that should be put with the butler. If the type of the
            dataset is `OutputQuantizedConnection` then this argument should
            be a `Struct` with corresponding attribute names. Each attribute
            should then correspond to either a list of objects or a single
            object, depending on the type of the corresponding attribute on
            dataset. I.e. if ``dataset.calexp`` is
            ``[datasetRef1, datasetRef2]`` then ``values.calexp`` should be
            ``[calexp1, calexp2]``. Likewise if there is a single ref, then
            only a single object need be passed. The same restriction applies
            if dataset is directly a `list` of `~lsst.daf.butler.DatasetRef`
            or a single `~lsst.daf.butler.DatasetRef`. If ``values.NAME`` is
            `None`, no output is written.
        dataset : `OutputQuantizedConnection` or `list` \
                [`lsst.daf.butler.DatasetRef`] or `lsst.daf.butler.DatasetRef`
            This argument may either be an `OutputQuantizedConnection`, which
            describes all the outputs of a quantum, a list of
            `lsst.daf.butler.DatasetRef`, or a single
            `lsst.daf.butler.DatasetRef`. The function will put the
            corresponding datasets into the butler.

        Raises
        ------
        ValueError
            Raised if a `~lsst.daf.butler.DatasetRef` is passed to put that is
            not defined in the `~lsst.daf.butler.Quantum` object, or the type
            of values does not match what is expected from the type of
            dataset.
        """
        if isinstance(dataset, OutputQuantizedConnection):
            if not isinstance(values, Struct):
                raise ValueError(
                    "dataset is an OutputQuantizedConnection, so a Struct with corresponding"
                    " attributes must be passed as the values to put"
                )
            for name, refs in dataset:
                if (valuesAttribute := getattr(values, name, None)) is None:
                    continue
                if isinstance(refs, list | tuple):
                    if len(refs) != len(valuesAttribute):
                        raise ValueError(f"There must be an object to put for every dataset ref in {name}")
                    for i, ref in enumerate(refs):
                        self._put(valuesAttribute[i], ref)
                else:
                    self._put(valuesAttribute, refs)
        elif isinstance(dataset, list | tuple):
            if not isinstance(values, Sequence):
                raise ValueError("Values to put must be a sequence")
            if len(dataset) != len(values):
                raise ValueError("There must be an equal number of references and values to put")
            for i, ref in enumerate(dataset):
                self._put(values[i], ref)
        elif isinstance(dataset, DatasetRef):
            self._put(values, dataset)
        else:
            raise TypeError("Dataset argument is not a type that can be used to put")
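
    # Illustrative sketch (hypothetical values, not part of this module):
    # the ``values``/``dataset`` pairing mirrors the connection structure.
    # Given an output connection ``calexp`` holding two refs, each of these
    # writes some or all of the outputs:
    #
    #     butlerQC.put(Struct(calexp=[calexp1, calexp2]), outputRefs)
    #     butlerQC.put([calexp1, calexp2], outputRefs.calexp)
    #     butlerQC.put(calexp1, outputRefs.calexp[0])
    #
    # Setting ``Struct(calexp=None)`` (or omitting the attribute) skips that
    # output entirely.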

    def _checkMembership(self, ref: list[DatasetRef] | DatasetRef, inout: set) -> None:
        """Check if a `~lsst.daf.butler.DatasetRef` is part of the input
        `~lsst.daf.butler.Quantum`.

        This function will raise an exception if the `QuantumContext` is
        used to get/put a `~lsst.daf.butler.DatasetRef` which is not defined
        in the quantum.

        Parameters
        ----------
        ref : `list` [ `~lsst.daf.butler.DatasetRef` ] or \
                `~lsst.daf.butler.DatasetRef`
            Either a `list` of or a single `~lsst.daf.butler.DatasetRef` to
            check.
        inout : `set`
            The connection type to check, e.g. either an input or an output.
            This prevents both types needing to be checked for every
            operation, which may be important for quanta with many
            `~lsst.daf.butler.DatasetRef` objects.
        """
        if not isinstance(ref, list | tuple):
            ref = [ref]
        for r in ref:
            if (r.datasetType, r.dataId, r.id) not in inout:
                raise ValueError("DatasetRef is not part of the Quantum being processed")

    @property
    def dimensions(self) -> DimensionUniverse:
        """Structure managing all dimensions recognized by this data
        repository (`~lsst.daf.butler.DimensionUniverse`).
        """
        return self.__butler.dimensions

    def add_additional_provenance(self, ref: DatasetRef, extra: dict[str, int | float | str | bool]) -> None:
        """Add additional provenance information to the dataset provenance.

        Parameters
        ----------
        ref : `lsst.daf.butler.DatasetRef`
            The dataset to attach provenance to. This dataset must have been
            retrieved by this quantum context.
        extra : `dict` [ `str`, `int` | `float` | `str` | `bool` ]
            Additional information to attach as provenance information. Keys
            must be strings and values must be simple scalars.
        """
        self.dataset_provenance.add_extra_provenance(ref.id, extra)
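
# A minimal self-check sketch (not part of the upstream module; the specific
# values are illustrative). It exercises the documented ``max_mem``
# normalization and the pickling behaviour of `ExecutionResources`.
if __name__ == "__main__":
    import pickle

    for spec in (None, "", 1024, "1024", "1 MB"):
        res = ExecutionResources(num_cores=2, max_mem=spec)
        # Every non-empty specification is normalized to bytes.
        print(f"{spec!r} -> {res.max_mem}")
        # __reduce__ stores the limit as a plain integer number of bytes,
        # so a pickle round trip preserves equality.
        assert pickle.loads(pickle.dumps(res)) == res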