Coverage for python / lsst / pipe / base / quantum_graph / aggregator / _ingester.py: 27%

155 statements  

« prev     ^ index     » next       coverage.py v7.13.5, created at 2026-05-01 08:20 +0000

1# This file is part of pipe_base. 

2# 

3# Developed for the LSST Data Management System. 

4# This product includes software developed by the LSST Project 

5# (http://www.lsst.org). 

6# See the COPYRIGHT file at the top-level directory of this distribution 

7# for details of code ownership. 

8# 

9# This software is dual licensed under the GNU General Public License and also 

10# under a 3-clause BSD license. Recipients may choose which of these licenses 

11# to use; please see the files gpl-3.0.txt and/or bsd_license.txt, 

12# respectively. If you choose the GPL option then the following text applies 

13# (but note that there is still no warranty even if you opt for BSD instead): 

14# 

15# This program is free software: you can redistribute it and/or modify 

16# it under the terms of the GNU General Public License as published by 

17# the Free Software Foundation, either version 3 of the License, or 

18# (at your option) any later version. 

19# 

20# This program is distributed in the hope that it will be useful, 

21# but WITHOUT ANY WARRANTY; without even the implied warranty of 

22# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the 

23# GNU General Public License for more details. 

24# 

25# You should have received a copy of the GNU General Public License 

26# along with this program. If not, see <http://www.gnu.org/licenses/>. 

27 

28from __future__ import annotations 

29 

30__all__ = ("Ingester",) 

31 

32import dataclasses 

33import logging 

34import time 

35import uuid 

36from collections import defaultdict 

37from contextlib import AbstractContextManager 

38from typing import Any, Literal, Self 

39 

40from lsst.daf.butler import Butler, CollectionType, DatasetRef, DimensionGroup 

41from lsst.daf.butler.datastore.record_data import DatastoreRecordData 

42from lsst.daf.butler.registry import ConflictingDefinitionError 

43 

44from ...pipeline_graph import TaskImportMode 

45from .._common import DatastoreName 

46from .._predicted import PredictedQuantumGraphComponents, PredictedQuantumGraphReader 

47from ._communicators import IngesterCommunicator 

48 

49 

@dataclasses.dataclass
class Ingester(AbstractContextManager):
    """A helper class for the provenance aggregator that handles ingestion into
    the central butler repository.

    Notes
    -----
    Instances are context managers: entering is a no-op and exiting closes the
    butler client.  The ingester accumulates dataset refs and datastore records
    from ingest requests and flushes them to the central repository in batches.
    """

    predicted_path: str
    """Path to the predicted quantum graph."""

    butler_path: str
    """Path or alias to the central butler repository."""

    comms: IngesterCommunicator
    """Communicator object for this worker."""

    predicted: PredictedQuantumGraphComponents = dataclasses.field(init=False)
    """Components of the predicted graph."""

    butler: Butler = dataclasses.field(init=False)
    """Client for the central butler repository."""

    n_datasets_ingested: int = 0
    """Total number of datasets ingested by this invocation."""

    n_datasets_skipped: int = 0
    """Total number of datasets skipped because they were already present."""

    n_producers_pending: int = 0
    """Number of quanta whose outputs are currently pending ingest."""

    refs_pending: defaultdict[DimensionGroup, list[DatasetRef]] = dataclasses.field(
        default_factory=lambda: defaultdict(list)
    )
    """Dataset references pending ingest, grouped by their dimensions."""

    records_pending: dict[DatastoreName, DatastoreRecordData] = dataclasses.field(default_factory=dict)
    """Datastore records pending ingest, grouped by datastore name."""

    already_ingested: set[uuid.UUID] | None = None
    """A set of all dataset IDs already present in the output RUN
    collection.

    If this is not `None`, the ingester is in defensive ingest mode, either
    because it was configured to query for these dataset IDs up front, or
    because a transaction failed due to a dataset already being present.
    """

    last_ingest_time: float = dataclasses.field(default_factory=time.time)
    """POSIX timestamp since the last ingest transaction concluded."""

99 

    def __post_init__(self) -> None:
        # Load just enough of the predicted quantum graph for ingestion; tasks
        # themselves are never imported (DO_NOT_IMPORT).
        self.comms.log.verbose("Reading from predicted quantum graph.")
        with PredictedQuantumGraphReader.open(
            self.predicted_path, import_mode=TaskImportMode.DO_NOT_IMPORT
        ) as reader:
            # We only need the header and pipeline graph.
            self.predicted = reader.components
        if self.comms.config.mock_storage_classes:
            # Side-effect import; presumably registers mock storage classes so
            # the butler below can resolve them — TODO confirm ordering matters.
            import lsst.pipe.base.tests.mocks  # noqa: F401
        self.comms.log.verbose("Initializing butler.")
        # A dry run never writes, so the butler is opened read-only in that case.
        self.butler = Butler.from_config(self.butler_path, writeable=not self.comms.config.dry_run)

111 

112 def __enter__(self) -> Self: 

113 return self 

114 

115 def __exit__(self, exc_type: Any, exc_val: Any, exc_tb: Any) -> Literal[False]: 

116 try: 

117 self.butler.close() 

118 except Exception: 

119 self.comms.log.exception("An exception occurred during Ingester exit") 

120 return False 

121 

122 @property 

123 def n_datasets_pending(self) -> int: 

124 """The number of butler datasets currently pending.""" 

125 return sum(len(v) for v in self.refs_pending.values()) 

126 

127 @staticmethod 

128 def run(predicted_path: str, butler_path: str, comms: IngesterCommunicator) -> None: 

129 """Run the ingester. 

130 

131 Parameters 

132 ---------- 

133 predicted_path : `str` 

134 Path to the predicted quantum graph. 

135 butler_path : `str` 

136 Path or alias to the central butler repository. 

137 comms : `IngesterCommunicator` 

138 Communicator for the ingester. 

139 

140 Notes 

141 ----- 

142 This method is designed to run as the ``target`` in 

143 `WorkerFactory.make_worker`. 

144 """ 

145 with comms, Ingester(predicted_path, butler_path, comms) as ingester: 

146 ingester.loop() 

147 

    def loop(self) -> None:
        """Run the main loop for the ingester.

        Performs one-time registration (dataset types, output RUN collection,
        optional output chain update), then polls the communicator for ingest
        requests, batching them and flushing whenever the pending count exceeds
        the configured batch size, and finally drains anything left over.
        """
        self.comms.log.verbose("Registering collections and dataset types.")
        if not self.comms.config.dry_run:
            if self.comms.config.register_dataset_types:
                self.predicted.pipeline_graph.register_dataset_types(
                    self.butler,
                    include_inputs=False,
                    include_packages=True,
                    include_configs=True,
                    include_logs=True,
                )
            self.butler.collections.register(self.predicted.header.output_run)
            # Updating the output chain cannot happen inside the caching
            # context.
            if self.comms.config.update_output_chain:
                self.update_output_chain()
        with self.butler.registry.caching_context():
            if self.comms.config.defensive_ingest:
                # Pre-fetch existing dataset IDs so every request is filtered
                # up front (defensive ingest mode from the start).
                self.fetch_already_ingested()
            self.comms.log.info("Startup completed in %ss.", time.time() - self.last_ingest_time)
            self.last_ingest_time = time.time()
            for ingest_request in self.comms.poll():
                self.n_producers_pending += 1
                self.comms.log.debug(f"Got ingest request for producer {ingest_request.producer_id}.")
                self.update_outputs_pending(refs=ingest_request.refs, records=ingest_request.records)
                # Flush a batch once enough datasets have accumulated.
                if self.n_datasets_pending > self.comms.config.ingest_batch_size:
                    self.ingest()
            self.comms.log.info("All ingest requests received.")
            # We use 'while' in case this fails with a conflict and we switch
            # to defensive mode (should be at most two iterations).
            ingest_start_time = time.time()
            while self.n_datasets_pending:
                n_datasets = self.n_datasets_pending
                self.ingest()
                self.comms.log.verbose(
                    "Gathered %d final datasets in %ss and ingested them in %ss.",
                    n_datasets,
                    ingest_start_time - self.last_ingest_time,
                    time.time() - ingest_start_time,
                )
            if self.n_producers_pending:
                # We can finish with returns pending if we filtered out all of
                # the datasets we started with as already existing.
                self.report()
            self.comms.log_progress(
                logging.INFO,
                f"Ingested {self.n_datasets_ingested} dataset(s); "
                f"skipped {self.n_datasets_skipped} already present.",
            )

198 

    def ingest(self) -> None:
        """Ingest all pending datasets and report success to the supervisor.

        On a `ConflictingDefinitionError` the whole transaction rolls back; if
        we were not already in defensive mode we query for the existing dataset
        IDs (which also filters the pending structures) and return without
        retrying — the caller's loop will try again.  In defensive mode a
        conflict is unexpected and is re-raised.
        """
        ingest_start_time = time.time()
        self.comms.log.verbose(
            "Gathered %d datasets from %d quanta in %ss.",
            self.n_datasets_pending,
            self.n_producers_pending,
            ingest_start_time - self.last_ingest_time,
        )
        try:
            if not self.comms.config.dry_run:
                # Registry import and datastore-record import must succeed or
                # fail together, hence the single transaction.
                with self.butler.registry.transaction():
                    for refs in self.refs_pending.values():
                        self.butler.registry._importDatasets(refs, expand=False, assume_new=True)
                    self.butler._datastore.import_records(self.records_pending)
            # NOTE(review): in dry-run mode the timers, counters, and report
            # below still advance so accounting mirrors a real run — presumably
            # intentional; confirm.
            self.last_ingest_time = time.time()
            self.comms.log.verbose(
                "Ingested %d datasets from %d quanta in %ss.",
                self.n_datasets_pending,
                self.n_producers_pending,
                self.last_ingest_time - ingest_start_time,
            )
            self.n_datasets_ingested += self.n_datasets_pending
        except ConflictingDefinitionError:
            if self.already_ingested is None:
                self.comms.log_progress(
                    logging.INFO,
                    "Some outputs seem to have already been ingested; querying for existing datasets and "
                    "switching to defensive ingest mode.",
                )
                self.fetch_already_ingested()
                # We just return instead of trying again immediately because we
                # might have just shrunk the number of pending datasets below
                # the batch threshold.
                return
            else:
                raise
        # Only on success: acknowledge the producers and reset the batch.
        self.report()
        self.records_pending.clear()
        self.refs_pending.clear()

239 

240 def report(self) -> None: 

241 """Report a successful ingest to the supervisor.""" 

242 self.comms.report_ingest(self.n_producers_pending) 

243 self.n_producers_pending = 0 

244 

245 def fetch_already_ingested(self) -> None: 

246 """Query for the UUIDs of all dataset already present in the output 

247 RUN collection, and filter and pending datasets accordingly. 

248 """ 

249 self.comms.log.info("Fetching all UUIDs in output collection %r.", self.predicted.header.output_run) 

250 self.already_ingested = set( 

251 self.butler.registry._fetch_run_dataset_ids(self.predicted.header.output_run) 

252 ) 

253 kept: set[uuid.UUID] = set() 

254 for dimensions, refs in self.refs_pending.items(): 

255 filtered_refs: list[DatasetRef] = [] 

256 for ref in refs: 

257 if ref.id not in self.already_ingested: 

258 kept.add(ref.id) 

259 filtered_refs.append(ref) 

260 else: 

261 self.n_datasets_skipped += 1 

262 self.refs_pending[dimensions] = filtered_refs 

263 for datastore_name, datastore_records in list(self.records_pending.items()): 

264 if (filtered_records := datastore_records.subset(kept)) is not None: 

265 self.records_pending[datastore_name] = filtered_records 

266 else: 

267 del self.records_pending[datastore_name] 

268 

269 def update_outputs_pending( 

270 self, 

271 refs: list[DatasetRef], 

272 records: dict[DatastoreName, DatastoreRecordData], 

273 ) -> None: 

274 """Add an ingest request to the pending-ingest data structures. 

275 

276 Parameters 

277 ---------- 

278 refs : `list` [ `lsst.daf.butler.DatasetRef` ] 

279 Registry information about regular quantum-output datasets. 

280 records : `dict` [ `str`, \ 

281 `lsst.daf.butler.datastore.record_data.DatastoreRecordData` ] 

282 Datastore information about the datasets. 

283 """ 

284 n_given = len(refs) 

285 if self.already_ingested is not None: 

286 refs = [ref for ref in refs if ref.id not in self.already_ingested] 

287 kept = {ref.id for ref in refs} 

288 self.n_datasets_skipped += n_given - len(kept) 

289 records = { 

290 datastore_name: filtered_records 

291 for datastore_name, original_records in records.items() 

292 if (filtered_records := original_records.subset(kept)) is not None 

293 } 

294 for ref in refs: 

295 self.refs_pending[ref.datasetType.dimensions].append(ref) 

296 for datastore_name, datastore_records in records.items(): 

297 if (existing_records := self.records_pending.get(datastore_name)) is not None: 

298 existing_records.update(datastore_records) 

299 else: 

300 self.records_pending[datastore_name] = datastore_records 

301 

302 def update_output_chain(self) -> None: 

303 """Update the output CHAINED collection to include the output RUN 

304 collection (and the inputs, if the output CHAINED collection does not 

305 exist). 

306 

307 Notes 

308 ----- 

309 This method cannot be called inside the registry caching context. 

310 """ 

311 if self.predicted.header.output is None: 

312 return 

313 self.comms.log.info( 

314 "Updating output collection %s to include %s.", 

315 self.predicted.header.output, 

316 self.predicted.header.output_run, 

317 ) 

318 if self.butler.collections.register(self.predicted.header.output, CollectionType.CHAINED): 

319 # Chain is new; need to add inputs, but we want to flatten them 

320 # first. 

321 if self.predicted.header.inputs: 

322 flattened = self.butler.collections.query(self.predicted.header.inputs, flatten_chains=True) 

323 self.butler.collections.extend_chain(self.predicted.header.output, flattened) 

324 self.butler.collections.prepend_chain(self.predicted.header.output, self.predicted.header.output_run)