Coverage for python / lsst / pipe / base / log_capture.py: 41%
101 statements
« prev ^ index » next — coverage.py v7.13.5, created at 2026-05-01 08:20 +0000
# This file is part of pipe_base.
2#
3# Developed for the LSST Data Management System.
4# This product includes software developed by the LSST Project
5# (http://www.lsst.org).
6# See the COPYRIGHT file at the top-level directory of this distribution
7# for details of code ownership.
8#
9# This software is dual licensed under the GNU General Public License and also
10# under a 3-clause BSD license. Recipients may choose which of these licenses
11# to use; please see the files gpl-3.0.txt and/or bsd_license.txt,
12# respectively. If you choose the GPL option then the following text applies
13# (but note that there is still no warranty even if you opt for BSD instead):
14#
15# This program is free software: you can redistribute it and/or modify
16# it under the terms of the GNU General Public License as published by
17# the Free Software Foundation, either version 3 of the License, or
18# (at your option) any later version.
19#
20# This program is distributed in the hope that it will be useful,
21# but WITHOUT ANY WARRANTY; without even the implied warranty of
22# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
23# GNU General Public License for more details.
24#
25# You should have received a copy of the GNU General Public License
26# along with this program. If not, see <http://www.gnu.org/licenses/>.
28from __future__ import annotations
30__all__ = ["LogCapture"]
32import dataclasses
33import logging
34import uuid
35from collections.abc import Iterator
36from contextlib import contextmanager
37from logging import FileHandler
39import pydantic
41from lsst.daf.butler import Butler, LimitedButler, Quantum
42from lsst.daf.butler._rubin.temporary_for_ingest import TemporaryForIngest
43from lsst.daf.butler.logging import (
44 ButlerLogRecord,
45 ButlerLogRecordHandler,
46 ButlerLogRecords,
47 ButlerMDC,
48 JsonLogFormatter,
49)
51from ._status import ExceptionInfo, InvalidQuantumError
52from ._task_metadata import TaskMetadata
53from .automatic_connection_constants import METADATA_OUTPUT_TEMPLATE
54from .pipeline_graph import TaskNode
56_LOG = logging.getLogger(__name__)
59class _ExecutionLogRecordsExtra(pydantic.BaseModel):
60 """Extra information about a quantum's execution stored with logs.
62 This middleware-private model includes information that is not directly
63 available via any public interface, as it is used exclusively for
64 provenance extraction and then made available through the provenance
65 quantum graph.
66 """
68 exception: ExceptionInfo | None = None
69 """Exception information for this quantum, if it failed.
70 """
72 metadata: TaskMetadata | None = None
73 """Metadata for this quantum, if it failed.
75 Metadata datasets are written if and only if a quantum succeeds, but we
76 still want to capture metadata from failed attempts, so we store it in the
77 log dataset. This field is always `None` when the quantum succeeds,
78 because in that case the metadata is already stored separately.
79 """
81 previous_process_quanta: list[uuid.UUID] = pydantic.Field(default_factory=list)
82 """The IDs of other quanta previously executed in the same process as this
83 one.
84 """
86 logs: list[ButlerLogRecord] = pydantic.Field(default_factory=list)
87 """Logs for this attempt.
89 This is always empty for the most recent attempt, because that stores logs
90 in the main section of the butler log records.
91 """
93 previous_attempts: list[_ExecutionLogRecordsExtra] = pydantic.Field(default_factory=list)
94 """Information about previous attempts to run this task within the same
95 `~lsst.daf.butler.CollectionType.RUN` collection.
97 This is always empty for any attempt other than the most recent one,
98 as all previous attempts are flattened into one list.
99 """
101 def attach_previous_attempt(self, log_records: ButlerLogRecords) -> None:
102 """Attach logs from a previous attempt to this struct.
104 Parameters
105 ----------
106 log_records : `lsst.daf.butler.ButlerLogRecords`
107 Logs from a past attempt to run a quantum.
108 """
109 previous = self.model_validate(log_records.extra)
110 previous.logs.extend(log_records)
111 self.previous_attempts.extend(previous.previous_attempts)
112 self.previous_attempts.append(previous)
113 previous.previous_attempts.clear()
116@dataclasses.dataclass
117class _LogCaptureContext:
118 """Controls for log capture returned by the `LogCapture.capture_logging`
119 context manager.
120 """
122 store: bool = True
123 """Whether to store logs at all."""
125 extra: _ExecutionLogRecordsExtra = dataclasses.field(default_factory=_ExecutionLogRecordsExtra)
126 """Extra information about the quantum's execution to store for provenance
127 extraction.
128 """
131class LogCapture:
132 """Class handling capture of logging messages and their export to butler.
134 Parameters
135 ----------
136 butler : `~lsst.daf.butler.LimitedButler`
137 Data butler with limited API.
138 full_butler : `~lsst.daf.butler.Butler` or `None`
139 Data butler with full API, or `None` if full Butler is not available.
140 If not none, then this must be the same instance as ``butler``.
141 """
143 stream_json_logs = True
144 """If True each log record is written to a temporary file and ingested
145 when quantum completes. If False the records are accumulated in memory
146 and stored in butler on quantum completion. If full butler is not available
147 then temporary file is not used."""
149 def __init__(
150 self,
151 butler: LimitedButler,
152 full_butler: Butler | None,
153 ):
154 self.butler = butler
155 self.full_butler = full_butler
157 @classmethod
158 def from_limited(cls, butler: LimitedButler) -> LogCapture:
159 return cls(butler, None)
161 @classmethod
162 def from_full(cls, butler: Butler) -> LogCapture:
163 return cls(butler, butler)
165 @contextmanager
166 def capture_logging(
167 self, task_node: TaskNode, /, quantum: Quantum, records: ButlerLogRecords | None = None
168 ) -> Iterator[_LogCaptureContext]:
169 """Configure logging system to capture logs for execution of this task.
171 Parameters
172 ----------
173 task_node : `~lsst.pipe.base.pipeline_graph.TaskNode`
174 The task definition.
175 quantum : `~lsst.daf.butler.Quantum`
176 Single Quantum instance.
177 records : `lsst.daf.butler.logging.ButlerLogRecords`, optional
178 Log record container to append to and save. If provided, streaming
179 mode is disabled (since we'll be saving logs in memory anyway).
181 Notes
182 -----
183 Expected to be used as a context manager to ensure that logging
184 records are inserted into the butler once the quantum has been
185 executed:
187 .. code-block:: py
189 with self.capture_logging(task_node, quantum):
190 # Run quantum and capture logs.
192 Ths method can also setup logging to attach task- or
193 quantum-specific information to log messages. Potentially this can
194 take into account some info from task configuration as well.
195 """
196 # include quantum dataId and task label into MDC
197 mdc = {"LABEL": task_node.label, "RUN": ""}
198 if quantum.dataId:
199 mdc["LABEL"] += f":{quantum.dataId}"
201 metadata_ref = quantum.outputs[METADATA_OUTPUT_TEMPLATE.format(label=task_node.label)][0]
202 mdc["RUN"] = metadata_ref.run
204 ctx = _LogCaptureContext()
205 log_dataset_name = (
206 task_node.log_output.dataset_type_name if task_node.log_output is not None else None
207 )
209 # Add a handler to the root logger to capture execution log output.
210 if log_dataset_name is not None:
211 try:
212 [ref] = quantum.outputs[log_dataset_name]
213 except LookupError as exc:
214 raise InvalidQuantumError(
215 f"Quantum outputs is missing log output dataset type {log_dataset_name};"
216 " this could happen due to inconsistent options between QuantumGraph generation"
217 " and execution"
218 ) from exc
219 # Either accumulate into ButlerLogRecords or stream JSON records to
220 # file and ingest that (ingest is possible only with full butler).
221 if self.stream_json_logs and self.full_butler is not None and records is None:
222 with TemporaryForIngest(self.full_butler, ref) as temporary:
223 log_handler_file = FileHandler(temporary.ospath)
224 log_handler_file.setFormatter(JsonLogFormatter())
225 logging.getLogger().addHandler(log_handler_file)
227 try:
228 with ButlerMDC.set_mdc(mdc):
229 yield ctx
230 finally:
231 # Ensure that the logs are stored in butler.
232 logging.getLogger().removeHandler(log_handler_file)
233 log_handler_file.close()
234 if ctx.extra:
235 with open(temporary.ospath, "a") as log_stream:
236 ButlerLogRecords.write_streaming_extra(
237 log_stream,
238 ctx.extra.model_dump_json(exclude_unset=True, exclude_defaults=True),
239 )
240 if ctx.store:
241 temporary.ingest()
243 else:
244 log_handler_memory = ButlerLogRecordHandler(records)
245 logging.getLogger().addHandler(log_handler_memory)
247 try:
248 with ButlerMDC.set_mdc(mdc):
249 yield ctx
250 except:
251 raise
252 else:
253 # If the quantum succeeded, we don't need to save the
254 # metadata in the logs, because we'll have saved them in
255 # the metadata.
256 ctx.extra.metadata = None
257 finally:
258 log_handler_memory.records.extra = ctx.extra.model_dump()
259 # Ensure that the logs are stored in butler.
260 logging.getLogger().removeHandler(log_handler_memory)
261 if ctx.store:
262 self._store_log_records(quantum, log_dataset_name, log_handler_memory)
264 else:
265 with ButlerMDC.set_mdc(mdc):
266 yield ctx
268 def _store_log_records(
269 self, quantum: Quantum, dataset_type: str, log_handler: ButlerLogRecordHandler
270 ) -> None:
271 # DatasetRef has to be in the Quantum outputs, can lookup by name.
272 try:
273 [ref] = quantum.outputs[dataset_type]
274 except LookupError as exc:
275 raise InvalidQuantumError(
276 f"Quantum outputs is missing log output dataset type {dataset_type};"
277 " this could happen due to inconsistent options between QuantumGraph generation"
278 " and execution"
279 ) from exc
281 self.butler.put(log_handler.records, ref)