Coverage for python / lsst / pipe / base / quantum_graph / aggregator / _scanner.py: 32%
125 statements
« prev ^ index » next coverage.py v7.13.5, created at 2026-04-18 08:44 +0000
1# This file is part of pipe_base.
2#
3# Developed for the LSST Data Management System.
4# This product includes software developed by the LSST Project
5# (http://www.lsst.org).
6# See the COPYRIGHT file at the top-level directory of this distribution
7# for details of code ownership.
8#
9# This software is dual licensed under the GNU General Public License and also
10# under a 3-clause BSD license. Recipients may choose which of these licenses
11# to use; please see the files gpl-3.0.txt and/or bsd_license.txt,
12# respectively. If you choose the GPL option then the following text applies
13# (but note that there is still no warranty even if you opt for BSD instead):
14#
15# This program is free software: you can redistribute it and/or modify
16# it under the terms of the GNU General Public License as published by
17# the Free Software Foundation, either version 3 of the License, or
18# (at your option) any later version.
19#
20# This program is distributed in the hope that it will be useful,
21# but WITHOUT ANY WARRANTY; without even the implied warranty of
22# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
23# GNU General Public License for more details.
24#
25# You should have received a copy of the GNU General Public License
26# along with this program. If not, see <http://www.gnu.org/licenses/>.
28from __future__ import annotations
30__all__ = ("Scanner",)
32import dataclasses
33import itertools
34import uuid
35from contextlib import AbstractContextManager
36from typing import Any, Literal, Self
38import zstandard
40from lsst.daf.butler import ButlerLogRecords, DatasetRef, QuantumBackedButler
42from ... import automatic_connection_constants as acc
43from ..._task_metadata import TaskMetadata
44from ...pipeline_graph import PipelineGraph, TaskImportMode
45from .._multiblock import Compressor
46from .._predicted import (
47 PredictedDatasetModel,
48 PredictedQuantumDatasetsModel,
49 PredictedQuantumGraphReader,
50)
51from .._provenance import ProvenanceQuantumScanModels, ProvenanceQuantumScanStatus
52from ._communicators import ScannerCommunicator
53from ._structs import IngestRequest, ScanReport
56@dataclasses.dataclass
57class Scanner(AbstractContextManager):
58 """A helper class for the provenance aggregator that reads metadata and log
59 files and scans for which outputs exist.
60 """
62 predicted_path: str
63 """Path to the predicted quantum graph."""
65 butler_path: str
66 """Path or alias to the central butler repository."""
68 comms: ScannerCommunicator
69 """Communicator object for this worker."""
71 reader: PredictedQuantumGraphReader = dataclasses.field(init=False)
72 """Reader for the predicted quantum graph."""
74 qbb: QuantumBackedButler = dataclasses.field(init=False)
75 """A quantum-backed butler used for log and metadata reads and existence
76 checks for other outputs (when necessary).
77 """
79 compressor: Compressor | None = None
80 """Object used to compress JSON blocks.
82 This is `None` until a compression dictionary is received from the writer
83 process.
84 """
86 init_quanta: dict[uuid.UUID, PredictedQuantumDatasetsModel] = dataclasses.field(init=False)
87 """Dictionary mapping init quantum IDs to their predicted models."""
89 def __post_init__(self) -> None:
90 if self.comms.config.mock_storage_classes:
91 import lsst.pipe.base.tests.mocks # noqa: F401
92 self.comms.log.verbose("Reading from predicted quantum graph.")
93 self.reader = self.comms.exit_stack.enter_context(
94 PredictedQuantumGraphReader.open(self.predicted_path, import_mode=TaskImportMode.DO_NOT_IMPORT)
95 )
96 self.reader.read_dimension_data()
97 self.reader.read_init_quanta()
98 self.comms.log.verbose("Initializing quantum-backed butler.")
99 self.qbb = self.make_qbb(self.butler_path, self.reader.pipeline_graph)
100 self.init_quanta = {q.quantum_id: q for q in self.reader.components.init_quanta.root}
102 def __enter__(self) -> Self:
103 return self
105 def __exit__(self, exc_type: Any, exc_val: Any, exc_tb: Any) -> Literal[False]:
106 try:
107 self.qbb.close()
108 except Exception:
109 self.comms.log.exception("An exception occurred during Ingester exit")
110 return False
112 @staticmethod
113 def make_qbb(butler_config: str, pipeline_graph: PipelineGraph) -> QuantumBackedButler:
114 """Make quantum-backed butler that can operate on the outputs of the
115 quantum graph.
117 Parameters
118 ----------
119 butler_config : `str`
120 Path or alias for the central butler repository that shares storage
121 with the quantum-backed butler.
122 pipeline_graph : `..pipeline_graph.PipelineGraph`
123 Graph of tasks and dataset types.
125 Returns
126 -------
127 qbb : `lsst.daf.butler.QuantumBackedButler`
128 Quantum-backed butler. This does not have the datastore records
129 needed to read overall-inputs.
130 """
131 return QuantumBackedButler.from_predicted(
132 butler_config,
133 predicted_inputs=[],
134 predicted_outputs=[],
135 dimensions=pipeline_graph.universe,
136 # We don't need the datastore records in the QG because we're
137 # only going to read metadata and logs, and those are never
138 # overall inputs.
139 datastore_records={},
140 dataset_types={node.name: node.dataset_type for node in pipeline_graph.dataset_types.values()},
141 )
143 @property
144 def pipeline_graph(self) -> PipelineGraph:
145 """Graph of tasks and dataset types."""
146 return self.reader.pipeline_graph
148 @staticmethod
149 def run(predicted_path: str, butler_path: str, comms: ScannerCommunicator) -> None:
150 """Run the scanner.
152 Parameters
153 ----------
154 predicted_path : `str`
155 Path to the predicted quantum graph.
156 butler_path : `str`
157 Path or alias to the central butler repository.
158 comms : `ScannerCommunicator`
159 Communicator for the scanner.
161 Notes
162 -----
163 This method is designed to run as the ``target`` in
164 `WorkerFactory.make_worker`.
165 """
166 with comms, Scanner(predicted_path, butler_path, comms) as scanner:
167 scanner.loop()
169 def loop(self) -> None:
170 """Run the main loop for the scanner."""
171 self.comms.log.info("Scan request loop beginning.")
172 for quantum_id in self.comms.poll():
173 if self.compressor is None and (cdict_data := self.comms.get_compression_dict()) is not None:
174 self.compressor = zstandard.ZstdCompressor(
175 self.comms.config.zstd_level, zstandard.ZstdCompressionDict(cdict_data)
176 )
177 self.scan_quantum(quantum_id)
179 def scan_dataset(self, predicted: PredictedDatasetModel) -> bool:
180 """Scan for a dataset's existence.
182 Parameters
183 ----------
184 predicted : `.PredictedDatasetModel`
185 Information about the dataset from the predicted graph.
187 Returns
188 -------
189 exists : `bool``
190 Whether the dataset exists.
191 """
192 ref = self.reader.components.make_dataset_ref(predicted)
193 return self.qbb.stored(ref)
195 def scan_quantum(self, quantum_id: uuid.UUID) -> ProvenanceQuantumScanModels:
196 """Scan for a quantum's completion and error status, and its output
197 datasets' existence.
199 Parameters
200 ----------
201 quantum_id : `uuid.UUID`
202 Unique ID for the quantum.
204 Returns
205 -------
206 result : `ProvenanceQuantumScanModels`
207 Scan result struct.
208 """
209 if (predicted_quantum := self.init_quanta.get(quantum_id)) is not None:
210 result = ProvenanceQuantumScanModels(
211 predicted_quantum.quantum_id, status=ProvenanceQuantumScanStatus.INIT
212 )
213 self.comms.log.debug("Created init scan for %s (%s)", quantum_id, predicted_quantum.task_label)
214 else:
215 self.reader.read_quantum_datasets([quantum_id])
216 predicted_quantum = self.reader.components.quantum_datasets.pop(quantum_id)
217 self.comms.log.debug(
218 "Scanning %s (%s@%s)",
219 quantum_id,
220 predicted_quantum.task_label,
221 predicted_quantum.data_coordinate,
222 )
223 logs = self._read_log(predicted_quantum)
224 metadata = self._read_metadata(predicted_quantum)
225 result = ProvenanceQuantumScanModels.from_metadata_and_logs(
226 predicted_quantum, metadata, logs, incomplete=self.comms.config.incomplete
227 )
228 if result.status is ProvenanceQuantumScanStatus.ABANDONED:
229 self.comms.log.debug("Abandoning scan for failed quantum %s.", quantum_id)
230 self.comms.report_scan(ScanReport(result.quantum_id, result.status))
231 return result
232 for predicted_output in itertools.chain.from_iterable(predicted_quantum.outputs.values()):
233 if predicted_output.dataset_id not in result.output_existence:
234 result.output_existence[predicted_output.dataset_id] = self.scan_dataset(predicted_output)
235 to_ingest = self._make_ingest_request(predicted_quantum, result)
236 if self.comms.config.is_writing_provenance:
237 to_write = result.to_scan_data(predicted_quantum, compressor=self.compressor)
238 self.comms.request_write(to_write)
239 self.comms.request_ingest(to_ingest)
240 self.comms.report_scan(ScanReport(result.quantum_id, result.status))
241 self.comms.log.debug("Finished scan for %s.", quantum_id)
242 return result
244 def _make_ingest_request(
245 self, predicted_quantum: PredictedQuantumDatasetsModel, result: ProvenanceQuantumScanModels
246 ) -> IngestRequest:
247 """Make an ingest request from a quantum scan.
249 Parameters
250 ----------
251 predicted_quantum : `PredictedQuantumDatasetsModel`
252 Information about the predicted quantum.
253 result : `ProvenanceQuantumScanModels`
254 Result of a quantum scan.
256 Returns
257 -------
258 ingest_request : `IngestRequest`
259 A request to be sent to the ingester.
260 """
261 predicted_outputs_by_id = {
262 d.dataset_id: d for d in itertools.chain.from_iterable(predicted_quantum.outputs.values())
263 }
264 to_ingest_refs: list[DatasetRef] = []
265 to_ignore: set[uuid.UUID] = set()
266 if self.comms.config.promise_ingest_graph:
267 if result.status is ProvenanceQuantumScanStatus.INIT:
268 if predicted_quantum.task_label: # i.e. not the 'packages' producer
269 to_ignore.add(
270 predicted_quantum.outputs[acc.CONFIG_INIT_OUTPUT_CONNECTION_NAME][0].dataset_id
271 )
272 else:
273 to_ignore.add(predicted_quantum.outputs[acc.METADATA_OUTPUT_CONNECTION_NAME][0].dataset_id)
274 to_ignore.add(predicted_quantum.outputs[acc.LOG_OUTPUT_CONNECTION_NAME][0].dataset_id)
275 for dataset_id, was_produced in result.output_existence.items():
276 if was_produced and dataset_id not in to_ignore:
277 predicted_output = predicted_outputs_by_id[dataset_id]
278 to_ingest_refs.append(self.reader.components.make_dataset_ref(predicted_output))
279 to_ingest_records = self.qbb._datastore.export_predicted_records(to_ingest_refs)
280 return IngestRequest(result.quantum_id, to_ingest_refs, to_ingest_records)
282 def _read_metadata(self, predicted_quantum: PredictedQuantumDatasetsModel) -> TaskMetadata | None:
283 """Attempt to read the metadata dataset for a quantum.
285 Parameters
286 ----------
287 predicted_quantum : `PredictedQuantumDatasetsModel`
288 Information about the predicted quantum.
290 Returns
291 -------
292 metadata : `...TaskMetadata` or `None`
293 Task metadata.
294 """
295 (predicted_dataset,) = predicted_quantum.outputs[acc.METADATA_OUTPUT_CONNECTION_NAME]
296 ref = self.reader.components.make_dataset_ref(predicted_dataset)
297 try:
298 # This assumes QBB metadata writes are atomic, which should be the
299 # case. If it's not we'll probably get pydantic validation errors
300 # here.
301 return self.qbb.get(ref, storageClass="TaskMetadata")
302 except FileNotFoundError:
303 return None
305 def _read_log(self, predicted_quantum: PredictedQuantumDatasetsModel) -> ButlerLogRecords | None:
306 """Attempt to read the log dataset for a quantum.
308 Parameters
309 ----------
310 predicted_quantum : `PredictedQuantumDatasetsModel`
311 Information about the predicted quantum.
313 Returns
314 -------
315 logs : `lsst.daf.butler.logging.ButlerLogRecords` or `None`
316 Task logs.
317 """
318 (predicted_dataset,) = predicted_quantum.outputs[acc.LOG_OUTPUT_CONNECTION_NAME]
319 ref = self.reader.components.make_dataset_ref(predicted_dataset)
320 try:
321 # This assumes QBB log writes are atomic, which should be the case.
322 # If it's not we'll probably get pydantic validation errors here.
323 return self.qbb.get(ref)
324 except FileNotFoundError:
325 return None