Coverage for python / lsst / pipe / base / quantum_graph / aggregator / _scanner.py: 32%

125 statements  

« prev     ^ index     » next       coverage.py v7.13.5, created at 2026-04-26 08:59 +0000

1# This file is part of pipe_base. 

2# 

3# Developed for the LSST Data Management System. 

4# This product includes software developed by the LSST Project 

5# (http://www.lsst.org). 

6# See the COPYRIGHT file at the top-level directory of this distribution 

7# for details of code ownership. 

8# 

9# This software is dual licensed under the GNU General Public License and also 

10# under a 3-clause BSD license. Recipients may choose which of these licenses 

11# to use; please see the files gpl-3.0.txt and/or bsd_license.txt, 

12# respectively. If you choose the GPL option then the following text applies 

13# (but note that there is still no warranty even if you opt for BSD instead): 

14# 

15# This program is free software: you can redistribute it and/or modify 

16# it under the terms of the GNU General Public License as published by 

17# the Free Software Foundation, either version 3 of the License, or 

18# (at your option) any later version. 

19# 

20# This program is distributed in the hope that it will be useful, 

21# but WITHOUT ANY WARRANTY; without even the implied warranty of 

22# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the 

23# GNU General Public License for more details. 

24# 

25# You should have received a copy of the GNU General Public License 

26# along with this program. If not, see <http://www.gnu.org/licenses/>. 

27 

28from __future__ import annotations 

29 

# Public API of this module: only the Scanner worker class is exported.
__all__ = ("Scanner",)

31 

32import dataclasses 

33import itertools 

34import uuid 

35from contextlib import AbstractContextManager 

36from typing import Any, Literal, Self 

37 

38import zstandard 

39 

40from lsst.daf.butler import ButlerLogRecords, DatasetRef, QuantumBackedButler 

41 

42from ... import automatic_connection_constants as acc 

43from ..._task_metadata import TaskMetadata 

44from ...pipeline_graph import PipelineGraph, TaskImportMode 

45from .._multiblock import Compressor 

46from .._predicted import ( 

47 PredictedDatasetModel, 

48 PredictedQuantumDatasetsModel, 

49 PredictedQuantumGraphReader, 

50) 

51from .._provenance import ProvenanceQuantumScanModels, ProvenanceQuantumScanStatus 

52from ._communicators import ScannerCommunicator 

53from ._structs import IngestRequest, ScanReport 

54 

55 

56@dataclasses.dataclass 

57class Scanner(AbstractContextManager): 

58 """A helper class for the provenance aggregator that reads metadata and log 

59 files and scans for which outputs exist. 

60 """ 

61 

62 predicted_path: str 

63 """Path to the predicted quantum graph.""" 

64 

65 butler_path: str 

66 """Path or alias to the central butler repository.""" 

67 

68 comms: ScannerCommunicator 

69 """Communicator object for this worker.""" 

70 

71 reader: PredictedQuantumGraphReader = dataclasses.field(init=False) 

72 """Reader for the predicted quantum graph.""" 

73 

74 qbb: QuantumBackedButler = dataclasses.field(init=False) 

75 """A quantum-backed butler used for log and metadata reads and existence 

76 checks for other outputs (when necessary). 

77 """ 

78 

79 compressor: Compressor | None = None 

80 """Object used to compress JSON blocks. 

81 

82 This is `None` until a compression dictionary is received from the writer 

83 process. 

84 """ 

85 

86 init_quanta: dict[uuid.UUID, PredictedQuantumDatasetsModel] = dataclasses.field(init=False) 

87 """Dictionary mapping init quantum IDs to their predicted models.""" 

88 

89 def __post_init__(self) -> None: 

90 if self.comms.config.mock_storage_classes: 

91 import lsst.pipe.base.tests.mocks # noqa: F401 

92 self.comms.log.verbose("Reading from predicted quantum graph.") 

93 self.reader = self.comms.exit_stack.enter_context( 

94 PredictedQuantumGraphReader.open(self.predicted_path, import_mode=TaskImportMode.DO_NOT_IMPORT) 

95 ) 

96 self.reader.read_dimension_data() 

97 self.reader.read_init_quanta() 

98 self.comms.log.verbose("Initializing quantum-backed butler.") 

99 self.qbb = self.make_qbb(self.butler_path, self.reader.pipeline_graph) 

100 self.init_quanta = {q.quantum_id: q for q in self.reader.components.init_quanta.root} 

101 

102 def __enter__(self) -> Self: 

103 return self 

104 

105 def __exit__(self, exc_type: Any, exc_val: Any, exc_tb: Any) -> Literal[False]: 

106 try: 

107 self.qbb.close() 

108 except Exception: 

109 self.comms.log.exception("An exception occurred during Ingester exit") 

110 return False 

111 

112 @staticmethod 

113 def make_qbb(butler_config: str, pipeline_graph: PipelineGraph) -> QuantumBackedButler: 

114 """Make quantum-backed butler that can operate on the outputs of the 

115 quantum graph. 

116 

117 Parameters 

118 ---------- 

119 butler_config : `str` 

120 Path or alias for the central butler repository that shares storage 

121 with the quantum-backed butler. 

122 pipeline_graph : `..pipeline_graph.PipelineGraph` 

123 Graph of tasks and dataset types. 

124 

125 Returns 

126 ------- 

127 qbb : `lsst.daf.butler.QuantumBackedButler` 

128 Quantum-backed butler. This does not have the datastore records 

129 needed to read overall-inputs. 

130 """ 

131 return QuantumBackedButler.from_predicted( 

132 butler_config, 

133 predicted_inputs=[], 

134 predicted_outputs=[], 

135 dimensions=pipeline_graph.universe, 

136 # We don't need the datastore records in the QG because we're 

137 # only going to read metadata and logs, and those are never 

138 # overall inputs. 

139 datastore_records={}, 

140 dataset_types={node.name: node.dataset_type for node in pipeline_graph.dataset_types.values()}, 

141 ) 

142 

143 @property 

144 def pipeline_graph(self) -> PipelineGraph: 

145 """Graph of tasks and dataset types.""" 

146 return self.reader.pipeline_graph 

147 

148 @staticmethod 

149 def run(predicted_path: str, butler_path: str, comms: ScannerCommunicator) -> None: 

150 """Run the scanner. 

151 

152 Parameters 

153 ---------- 

154 predicted_path : `str` 

155 Path to the predicted quantum graph. 

156 butler_path : `str` 

157 Path or alias to the central butler repository. 

158 comms : `ScannerCommunicator` 

159 Communicator for the scanner. 

160 

161 Notes 

162 ----- 

163 This method is designed to run as the ``target`` in 

164 `WorkerFactory.make_worker`. 

165 """ 

166 with comms, Scanner(predicted_path, butler_path, comms) as scanner: 

167 scanner.loop() 

168 

169 def loop(self) -> None: 

170 """Run the main loop for the scanner.""" 

171 self.comms.log.info("Scan request loop beginning.") 

172 for quantum_id in self.comms.poll(): 

173 if self.compressor is None and (cdict_data := self.comms.get_compression_dict()) is not None: 

174 self.compressor = zstandard.ZstdCompressor( 

175 self.comms.config.zstd_level, zstandard.ZstdCompressionDict(cdict_data) 

176 ) 

177 self.scan_quantum(quantum_id) 

178 

179 def scan_dataset(self, predicted: PredictedDatasetModel) -> bool: 

180 """Scan for a dataset's existence. 

181 

182 Parameters 

183 ---------- 

184 predicted : `.PredictedDatasetModel` 

185 Information about the dataset from the predicted graph. 

186 

187 Returns 

188 ------- 

189 exists : `bool`` 

190 Whether the dataset exists. 

191 """ 

192 ref = self.reader.components.make_dataset_ref(predicted) 

193 return self.qbb.stored(ref) 

194 

195 def scan_quantum(self, quantum_id: uuid.UUID) -> ProvenanceQuantumScanModels: 

196 """Scan for a quantum's completion and error status, and its output 

197 datasets' existence. 

198 

199 Parameters 

200 ---------- 

201 quantum_id : `uuid.UUID` 

202 Unique ID for the quantum. 

203 

204 Returns 

205 ------- 

206 result : `ProvenanceQuantumScanModels` 

207 Scan result struct. 

208 """ 

209 if (predicted_quantum := self.init_quanta.get(quantum_id)) is not None: 

210 result = ProvenanceQuantumScanModels( 

211 predicted_quantum.quantum_id, status=ProvenanceQuantumScanStatus.INIT 

212 ) 

213 self.comms.log.debug("Created init scan for %s (%s)", quantum_id, predicted_quantum.task_label) 

214 else: 

215 self.reader.read_quantum_datasets([quantum_id]) 

216 predicted_quantum = self.reader.components.quantum_datasets.pop(quantum_id) 

217 self.comms.log.debug( 

218 "Scanning %s (%s@%s)", 

219 quantum_id, 

220 predicted_quantum.task_label, 

221 predicted_quantum.data_coordinate, 

222 ) 

223 logs = self._read_log(predicted_quantum) 

224 metadata = self._read_metadata(predicted_quantum) 

225 result = ProvenanceQuantumScanModels.from_metadata_and_logs( 

226 predicted_quantum, metadata, logs, incomplete=self.comms.config.incomplete 

227 ) 

228 if result.status is ProvenanceQuantumScanStatus.ABANDONED: 

229 self.comms.log.debug("Abandoning scan for failed quantum %s.", quantum_id) 

230 self.comms.report_scan(ScanReport(result.quantum_id, result.status)) 

231 return result 

232 for predicted_output in itertools.chain.from_iterable(predicted_quantum.outputs.values()): 

233 if predicted_output.dataset_id not in result.output_existence: 

234 result.output_existence[predicted_output.dataset_id] = self.scan_dataset(predicted_output) 

235 to_ingest = self._make_ingest_request(predicted_quantum, result) 

236 if self.comms.config.is_writing_provenance: 

237 to_write = result.to_scan_data(predicted_quantum, compressor=self.compressor) 

238 self.comms.request_write(to_write) 

239 self.comms.request_ingest(to_ingest) 

240 self.comms.report_scan(ScanReport(result.quantum_id, result.status)) 

241 self.comms.log.debug("Finished scan for %s.", quantum_id) 

242 return result 

243 

244 def _make_ingest_request( 

245 self, predicted_quantum: PredictedQuantumDatasetsModel, result: ProvenanceQuantumScanModels 

246 ) -> IngestRequest: 

247 """Make an ingest request from a quantum scan. 

248 

249 Parameters 

250 ---------- 

251 predicted_quantum : `PredictedQuantumDatasetsModel` 

252 Information about the predicted quantum. 

253 result : `ProvenanceQuantumScanModels` 

254 Result of a quantum scan. 

255 

256 Returns 

257 ------- 

258 ingest_request : `IngestRequest` 

259 A request to be sent to the ingester. 

260 """ 

261 predicted_outputs_by_id = { 

262 d.dataset_id: d for d in itertools.chain.from_iterable(predicted_quantum.outputs.values()) 

263 } 

264 to_ingest_refs: list[DatasetRef] = [] 

265 to_ignore: set[uuid.UUID] = set() 

266 if self.comms.config.promise_ingest_graph: 

267 if result.status is ProvenanceQuantumScanStatus.INIT: 

268 if predicted_quantum.task_label: # i.e. not the 'packages' producer 

269 to_ignore.add( 

270 predicted_quantum.outputs[acc.CONFIG_INIT_OUTPUT_CONNECTION_NAME][0].dataset_id 

271 ) 

272 else: 

273 to_ignore.add(predicted_quantum.outputs[acc.METADATA_OUTPUT_CONNECTION_NAME][0].dataset_id) 

274 to_ignore.add(predicted_quantum.outputs[acc.LOG_OUTPUT_CONNECTION_NAME][0].dataset_id) 

275 for dataset_id, was_produced in result.output_existence.items(): 

276 if was_produced and dataset_id not in to_ignore: 

277 predicted_output = predicted_outputs_by_id[dataset_id] 

278 to_ingest_refs.append(self.reader.components.make_dataset_ref(predicted_output)) 

279 to_ingest_records = self.qbb._datastore.export_predicted_records(to_ingest_refs) 

280 return IngestRequest(result.quantum_id, to_ingest_refs, to_ingest_records) 

281 

282 def _read_metadata(self, predicted_quantum: PredictedQuantumDatasetsModel) -> TaskMetadata | None: 

283 """Attempt to read the metadata dataset for a quantum. 

284 

285 Parameters 

286 ---------- 

287 predicted_quantum : `PredictedQuantumDatasetsModel` 

288 Information about the predicted quantum. 

289 

290 Returns 

291 ------- 

292 metadata : `...TaskMetadata` or `None` 

293 Task metadata. 

294 """ 

295 (predicted_dataset,) = predicted_quantum.outputs[acc.METADATA_OUTPUT_CONNECTION_NAME] 

296 ref = self.reader.components.make_dataset_ref(predicted_dataset) 

297 try: 

298 # This assumes QBB metadata writes are atomic, which should be the 

299 # case. If it's not we'll probably get pydantic validation errors 

300 # here. 

301 return self.qbb.get(ref, storageClass="TaskMetadata") 

302 except FileNotFoundError: 

303 return None 

304 

305 def _read_log(self, predicted_quantum: PredictedQuantumDatasetsModel) -> ButlerLogRecords | None: 

306 """Attempt to read the log dataset for a quantum. 

307 

308 Parameters 

309 ---------- 

310 predicted_quantum : `PredictedQuantumDatasetsModel` 

311 Information about the predicted quantum. 

312 

313 Returns 

314 ------- 

315 logs : `lsst.daf.butler.logging.ButlerLogRecords` or `None` 

316 Task logs. 

317 """ 

318 (predicted_dataset,) = predicted_quantum.outputs[acc.LOG_OUTPUT_CONNECTION_NAME] 

319 ref = self.reader.components.make_dataset_ref(predicted_dataset) 

320 try: 

321 # This assumes QBB log writes are atomic, which should be the case. 

322 # If it's not we'll probably get pydantic validation errors here. 

323 return self.qbb.get(ref) 

324 except FileNotFoundError: 

325 return None