Coverage for python / lsst / pipe / base / log_capture.py: 41%

101 statements  

« prev     ^ index     » next       coverage.py v7.13.5, created at 2026-04-28 08:44 +0000

1# This file is part of pipe_base. 

2# 

3# Developed for the LSST Data Management System. 

4# This product includes software developed by the LSST Project 

5# (http://www.lsst.org). 

6# See the COPYRIGHT file at the top-level directory of this distribution 

7# for details of code ownership. 

8# 

9# This software is dual licensed under the GNU General Public License and also 

10# under a 3-clause BSD license. Recipients may choose which of these licenses 

11# to use; please see the files gpl-3.0.txt and/or bsd_license.txt, 

12# respectively. If you choose the GPL option then the following text applies 

13# (but note that there is still no warranty even if you opt for BSD instead): 

14# 

15# This program is free software: you can redistribute it and/or modify 

16# it under the terms of the GNU General Public License as published by 

17# the Free Software Foundation, either version 3 of the License, or 

18# (at your option) any later version. 

19# 

20# This program is distributed in the hope that it will be useful, 

21# but WITHOUT ANY WARRANTY; without even the implied warranty of 

22# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the 

23# GNU General Public License for more details. 

24# 

25# You should have received a copy of the GNU General Public License 

26# along with this program. If not, see <http://www.gnu.org/licenses/>. 

27 

28from __future__ import annotations 

29 

30__all__ = ["LogCapture"] 

31 

32import dataclasses 

33import logging 

34import uuid 

35from collections.abc import Iterator 

36from contextlib import contextmanager 

37from logging import FileHandler 

38 

39import pydantic 

40 

41from lsst.daf.butler import Butler, LimitedButler, Quantum 

42from lsst.daf.butler._rubin.temporary_for_ingest import TemporaryForIngest 

43from lsst.daf.butler.logging import ( 

44 ButlerLogRecord, 

45 ButlerLogRecordHandler, 

46 ButlerLogRecords, 

47 ButlerMDC, 

48 JsonLogFormatter, 

49) 

50 

51from ._status import ExceptionInfo, InvalidQuantumError 

52from ._task_metadata import TaskMetadata 

53from .automatic_connection_constants import METADATA_OUTPUT_TEMPLATE 

54from .pipeline_graph import TaskNode 

55 

56_LOG = logging.getLogger(__name__) 

57 

58 

class _ExecutionLogRecordsExtra(pydantic.BaseModel):
    """Middleware-private sidecar model stored alongside a quantum's log
    records.

    The information held here is not available through any public interface;
    it exists solely so that provenance extraction can later surface it via
    the provenance quantum graph.
    """

    exception: ExceptionInfo | None = None
    """Details of the exception raised by this quantum, or `None` if it did
    not fail.
    """

    metadata: TaskMetadata | None = None
    """Task metadata captured from a failed attempt, or `None` on success.

    A metadata dataset is written if and only if a quantum succeeds, but
    metadata from failed attempts is still worth keeping, so it is stashed in
    the log dataset instead.  On success this field is always `None`, since
    the metadata is already stored separately.
    """

    previous_process_quanta: list[uuid.UUID] = pydantic.Field(default_factory=list)
    """IDs of other quanta that were executed earlier in the same process as
    this one.
    """

    logs: list[ButlerLogRecord] = pydantic.Field(default_factory=list)
    """Log records for this attempt.

    Always empty for the most recent attempt, whose records live in the main
    section of the butler log records instead.
    """

    previous_attempts: list[_ExecutionLogRecordsExtra] = pydantic.Field(default_factory=list)
    """Flattened history of earlier attempts to run this task within the same
    `~lsst.daf.butler.CollectionType.RUN` collection.

    Only the most recent attempt carries a non-empty list; every older
    attempt has had its own history hoisted up into this one.
    """

    def attach_previous_attempt(self, log_records: ButlerLogRecords) -> None:
        """Fold the logs of an earlier attempt into this struct.

        Parameters
        ----------
        log_records : `lsst.daf.butler.ButlerLogRecords`
            Logs from a past attempt to run a quantum.
        """
        prior = self.model_validate(log_records.extra)
        prior.logs.extend(log_records)
        # Keep the attempt history flat: hoist the older attempt's own
        # history into ours, then record the older attempt itself with an
        # emptied history list (clearing after append mutates the same
        # object we just stored).
        self.previous_attempts.extend(prior.previous_attempts)
        self.previous_attempts.append(prior)
        prior.previous_attempts.clear()

114 

115 

@dataclasses.dataclass
class _LogCaptureContext:
    """Mutable knobs handed back to the caller by the
    `LogCapture.capture_logging` context manager.
    """

    store: bool = True
    """If `False`, skip storing the captured logs entirely."""

    extra: _ExecutionLogRecordsExtra = dataclasses.field(default_factory=_ExecutionLogRecordsExtra)
    """Extra execution information recorded with the logs for later
    provenance extraction.
    """

129 

130 

class LogCapture:
    """Class handling capture of logging messages and their export to butler.

    Parameters
    ----------
    butler : `~lsst.daf.butler.LimitedButler`
        Data butler with limited API.
    full_butler : `~lsst.daf.butler.Butler` or `None`
        Data butler with full API, or `None` if full Butler is not available.
        If not `None`, then this must be the same instance as ``butler``.
    """

    stream_json_logs: bool = True
    """If True each log record is written to a temporary file and ingested
    when quantum completes. If False the records are accumulated in memory
    and stored in butler on quantum completion. If full butler is not available
    then temporary file is not used."""

    def __init__(
        self,
        butler: LimitedButler,
        full_butler: Butler | None,
    ):
        self.butler = butler
        self.full_butler = full_butler

    @classmethod
    def from_limited(cls, butler: LimitedButler) -> LogCapture:
        """Construct with only a `~lsst.daf.butler.LimitedButler`.

        With no full butler, `capture_logging` always accumulates records in
        memory rather than streaming them to a file for ingest.
        """
        return cls(butler, None)

    @classmethod
    def from_full(cls, butler: Butler) -> LogCapture:
        """Construct with a full `~lsst.daf.butler.Butler`.

        The same instance serves as both the limited and the full butler,
        enabling the streaming-to-file/ingest path in `capture_logging`.
        """
        return cls(butler, butler)

    @contextmanager
    def capture_logging(
        self, task_node: TaskNode, /, quantum: Quantum, records: ButlerLogRecords | None = None
    ) -> Iterator[_LogCaptureContext]:
        """Configure logging system to capture logs for execution of this task.

        Parameters
        ----------
        task_node : `~lsst.pipe.base.pipeline_graph.TaskNode`
            The task definition.
        quantum : `~lsst.daf.butler.Quantum`
            Single Quantum instance.
        records : `lsst.daf.butler.logging.ButlerLogRecords`, optional
            Log record container to append to and save. If provided, streaming
            mode is disabled (since we'll be saving logs in memory anyway).

        Yields
        ------
        ctx : `_LogCaptureContext`
            Controls letting the caller disable log storage (``ctx.store``)
            and attach extra provenance information (``ctx.extra``).

        Raises
        ------
        InvalidQuantumError
            Raised if the quantum is missing the log output dataset type
            declared by the task.

        Notes
        -----
        Expected to be used as a context manager to ensure that logging
        records are inserted into the butler once the quantum has been
        executed:

        .. code-block:: py

            with self.capture_logging(task_node, quantum):
                # Run quantum and capture logs.

        This method can also setup logging to attach task- or
        quantum-specific information to log messages. Potentially this can
        take into account some info from task configuration as well.
        """
        # include quantum dataId and task label into MDC
        mdc = {"LABEL": task_node.label, "RUN": ""}
        if quantum.dataId:
            mdc["LABEL"] += f":{quantum.dataId}"

        # Expose the RUN collection of the quantum's metadata output in the
        # MDC as well.
        metadata_ref = quantum.outputs[METADATA_OUTPUT_TEMPLATE.format(label=task_node.label)][0]
        mdc["RUN"] = metadata_ref.run

        ctx = _LogCaptureContext()
        log_dataset_name = (
            task_node.log_output.dataset_type_name if task_node.log_output is not None else None
        )

        # Add a handler to the root logger to capture execution log output.
        if log_dataset_name is not None:
            try:
                [ref] = quantum.outputs[log_dataset_name]
            except LookupError as exc:
                raise InvalidQuantumError(
                    f"Quantum outputs is missing log output dataset type {log_dataset_name};"
                    " this could happen due to inconsistent options between QuantumGraph generation"
                    " and execution"
                ) from exc
            # Either accumulate into ButlerLogRecords or stream JSON records to
            # file and ingest that (ingest is possible only with full butler).
            if self.stream_json_logs and self.full_butler is not None and records is None:
                # Streaming mode: each record is formatted as JSON into a
                # temporary file, which is ingested as the log dataset when
                # the quantum finishes.
                with TemporaryForIngest(self.full_butler, ref) as temporary:
                    log_handler_file = FileHandler(temporary.ospath)
                    log_handler_file.setFormatter(JsonLogFormatter())
                    logging.getLogger().addHandler(log_handler_file)

                    try:
                        with ButlerMDC.set_mdc(mdc):
                            # The caller's code executes here (this is a
                            # generator-based context manager).
                            yield ctx
                    finally:
                        # Ensure that the logs are stored in butler.
                        logging.getLogger().removeHandler(log_handler_file)
                        log_handler_file.close()
                        # NOTE(review): ctx.extra is always a model instance
                        # and pydantic models are truthy, so this branch
                        # always runs — confirm whether a check for
                        # non-default content was intended.
                        if ctx.extra:
                            # Append the extra provenance information to the
                            # already-written stream of JSON records.
                            with open(temporary.ospath, "a") as log_stream:
                                ButlerLogRecords.write_streaming_extra(
                                    log_stream,
                                    ctx.extra.model_dump_json(exclude_unset=True, exclude_defaults=True),
                                )
                        if ctx.store:
                            temporary.ingest()

            else:
                # In-memory mode: accumulate records (appending to ``records``
                # when provided) and ``put`` them into the butler on exit.
                log_handler_memory = ButlerLogRecordHandler(records)
                logging.getLogger().addHandler(log_handler_memory)

                try:
                    with ButlerMDC.set_mdc(mdc):
                        # The caller's code executes here.
                        yield ctx
                except:
                    # This bare re-raise exists only so the ``else`` clause
                    # below is syntactically valid; nothing is swallowed.
                    raise
                else:
                    # If the quantum succeeded, we don't need to save the
                    # metadata in the logs, because we'll have saved them in
                    # the metadata.
                    ctx.extra.metadata = None
                finally:
                    log_handler_memory.records.extra = ctx.extra.model_dump()
                    # Ensure that the logs are stored in butler.
                    logging.getLogger().removeHandler(log_handler_memory)
                    if ctx.store:
                        self._store_log_records(quantum, log_dataset_name, log_handler_memory)

        else:
            # The task declares no log output dataset: just set the MDC for
            # the duration of execution without capturing anything.
            with ButlerMDC.set_mdc(mdc):
                yield ctx

    def _store_log_records(
        self, quantum: Quantum, dataset_type: str, log_handler: ButlerLogRecordHandler
    ) -> None:
        """Store accumulated log records as the quantum's log output dataset.

        Parameters
        ----------
        quantum : `~lsst.daf.butler.Quantum`
            Quantum whose outputs include the log dataset.
        dataset_type : `str`
            Name of the log output dataset type.
        log_handler : `~lsst.daf.butler.logging.ButlerLogRecordHandler`
            Handler holding the accumulated log records.

        Raises
        ------
        InvalidQuantumError
            Raised if the quantum has no output of the given dataset type.
        """
        # DatasetRef has to be in the Quantum outputs, can lookup by name.
        try:
            [ref] = quantum.outputs[dataset_type]
        except LookupError as exc:
            raise InvalidQuantumError(
                f"Quantum outputs is missing log output dataset type {dataset_type};"
                " this could happen due to inconsistent options between QuantumGraph generation"
                " and execution"
            ) from exc

        self.butler.put(log_handler.records, ref)