Coverage for python / lsst / pipe / base / quantum_graph / aggregator / _ingester.py: 27%
155 statements
« prev ^ index » next coverage.py v7.13.5, created at 2026-04-28 08:44 +0000
1# This file is part of pipe_base.
2#
3# Developed for the LSST Data Management System.
4# This product includes software developed by the LSST Project
5# (http://www.lsst.org).
6# See the COPYRIGHT file at the top-level directory of this distribution
7# for details of code ownership.
8#
9# This software is dual licensed under the GNU General Public License and also
10# under a 3-clause BSD license. Recipients may choose which of these licenses
11# to use; please see the files gpl-3.0.txt and/or bsd_license.txt,
12# respectively. If you choose the GPL option then the following text applies
13# (but note that there is still no warranty even if you opt for BSD instead):
14#
15# This program is free software: you can redistribute it and/or modify
16# it under the terms of the GNU General Public License as published by
17# the Free Software Foundation, either version 3 of the License, or
18# (at your option) any later version.
19#
20# This program is distributed in the hope that it will be useful,
21# but WITHOUT ANY WARRANTY; without even the implied warranty of
22# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
23# GNU General Public License for more details.
24#
25# You should have received a copy of the GNU General Public License
26# along with this program. If not, see <http://www.gnu.org/licenses/>.
28from __future__ import annotations
30__all__ = ("Ingester",)
32import dataclasses
33import logging
34import time
35import uuid
36from collections import defaultdict
37from contextlib import AbstractContextManager
38from typing import Any, Literal, Self
40from lsst.daf.butler import Butler, CollectionType, DatasetRef, DimensionGroup
41from lsst.daf.butler.datastore.record_data import DatastoreRecordData
42from lsst.daf.butler.registry import ConflictingDefinitionError
44from ...pipeline_graph import TaskImportMode
45from .._common import DatastoreName
46from .._predicted import PredictedQuantumGraphComponents, PredictedQuantumGraphReader
47from ._communicators import IngesterCommunicator
50@dataclasses.dataclass
51class Ingester(AbstractContextManager):
52 """A helper class for the provenance aggregator that handles ingestion into
53 the central butler repository.
54 """
56 predicted_path: str
57 """Path to the predicted quantum graph."""
59 butler_path: str
60 """Path or alias to the central butler repository."""
62 comms: IngesterCommunicator
63 """Communicator object for this worker."""
65 predicted: PredictedQuantumGraphComponents = dataclasses.field(init=False)
66 """Components of the predicted graph."""
68 butler: Butler = dataclasses.field(init=False)
69 """Client for the central butler repository."""
71 n_datasets_ingested: int = 0
72 """Total number of datasets ingested by this invocation."""
74 n_datasets_skipped: int = 0
75 """Total number of datasets skipped because they were already present."""
77 n_producers_pending: int = 0
78 """Number of quanta whose outputs are currently pending ingest."""
80 refs_pending: defaultdict[DimensionGroup, list[DatasetRef]] = dataclasses.field(
81 default_factory=lambda: defaultdict(list)
82 )
83 """Dataset references pending ingest, grouped by their dimensions."""
85 records_pending: dict[DatastoreName, DatastoreRecordData] = dataclasses.field(default_factory=dict)
86 """Datastore records pending ingest, grouped by datastore name."""
88 already_ingested: set[uuid.UUID] | None = None
89 """A set of all dataset IDs already present in the output RUN
90 collection.
92 If this is not `None`, the ingester is in defensive ingest mode, either
93 because it was configured to query for these dataset IDs up front, or
94 because a transaction failed due to a dataset already being present.
95 """
97 last_ingest_time: float = dataclasses.field(default_factory=time.time)
98 """POSIX timestamp since the last ingest transaction concluded."""
100 def __post_init__(self) -> None:
101 self.comms.log.verbose("Reading from predicted quantum graph.")
102 with PredictedQuantumGraphReader.open(
103 self.predicted_path, import_mode=TaskImportMode.DO_NOT_IMPORT
104 ) as reader:
105 # We only need the header and pipeline graph.
106 self.predicted = reader.components
107 if self.comms.config.mock_storage_classes:
108 import lsst.pipe.base.tests.mocks # noqa: F401
109 self.comms.log.verbose("Initializing butler.")
110 self.butler = Butler.from_config(self.butler_path, writeable=not self.comms.config.dry_run)
112 def __enter__(self) -> Self:
113 return self
115 def __exit__(self, exc_type: Any, exc_val: Any, exc_tb: Any) -> Literal[False]:
116 try:
117 self.butler.close()
118 except Exception:
119 self.comms.log.exception("An exception occurred during Ingester exit")
120 return False
122 @property
123 def n_datasets_pending(self) -> int:
124 """The number of butler datasets currently pending."""
125 return sum(len(v) for v in self.refs_pending.values())
127 @staticmethod
128 def run(predicted_path: str, butler_path: str, comms: IngesterCommunicator) -> None:
129 """Run the ingester.
131 Parameters
132 ----------
133 predicted_path : `str`
134 Path to the predicted quantum graph.
135 butler_path : `str`
136 Path or alias to the central butler repository.
137 comms : `IngesterCommunicator`
138 Communicator for the ingester.
140 Notes
141 -----
142 This method is designed to run as the ``target`` in
143 `WorkerFactory.make_worker`.
144 """
145 with comms, Ingester(predicted_path, butler_path, comms) as ingester:
146 ingester.loop()
148 def loop(self) -> None:
149 """Run the main loop for the ingester."""
150 self.comms.log.verbose("Registering collections and dataset types.")
151 if not self.comms.config.dry_run:
152 if self.comms.config.register_dataset_types:
153 self.predicted.pipeline_graph.register_dataset_types(
154 self.butler,
155 include_inputs=False,
156 include_packages=True,
157 include_configs=True,
158 include_logs=True,
159 )
160 self.butler.collections.register(self.predicted.header.output_run)
161 # Updating the output chain cannot happen inside the caching
162 # context.
163 if self.comms.config.update_output_chain:
164 self.update_output_chain()
165 with self.butler.registry.caching_context():
166 if self.comms.config.defensive_ingest:
167 self.fetch_already_ingested()
168 self.comms.log.info("Startup completed in %ss.", time.time() - self.last_ingest_time)
169 self.last_ingest_time = time.time()
170 for ingest_request in self.comms.poll():
171 self.n_producers_pending += 1
172 self.comms.log.debug(f"Got ingest request for producer {ingest_request.producer_id}.")
173 self.update_outputs_pending(refs=ingest_request.refs, records=ingest_request.records)
174 if self.n_datasets_pending > self.comms.config.ingest_batch_size:
175 self.ingest()
176 self.comms.log.info("All ingest requests received.")
177 # We use 'while' in case this fails with a conflict and we switch
178 # to defensive mode (should be at most two iterations).
179 ingest_start_time = time.time()
180 while self.n_datasets_pending:
181 n_datasets = self.n_datasets_pending
182 self.ingest()
183 self.comms.log.verbose(
184 "Gathered %d final datasets in %ss and ingested them in %ss.",
185 n_datasets,
186 ingest_start_time - self.last_ingest_time,
187 time.time() - ingest_start_time,
188 )
189 if self.n_producers_pending:
190 # We can finish with returns pending if we filtered out all of
191 # the datasets we started with as already existing.
192 self.report()
193 self.comms.log_progress(
194 logging.INFO,
195 f"Ingested {self.n_datasets_ingested} dataset(s); "
196 f"skipped {self.n_datasets_skipped} already present.",
197 )
199 def ingest(self) -> None:
200 """Ingest all pending datasets and report success to the supervisor."""
201 ingest_start_time = time.time()
202 self.comms.log.verbose(
203 "Gathered %d datasets from %d quanta in %ss.",
204 self.n_datasets_pending,
205 self.n_producers_pending,
206 ingest_start_time - self.last_ingest_time,
207 )
208 try:
209 if not self.comms.config.dry_run:
210 with self.butler.registry.transaction():
211 for refs in self.refs_pending.values():
212 self.butler.registry._importDatasets(refs, expand=False, assume_new=True)
213 self.butler._datastore.import_records(self.records_pending)
214 self.last_ingest_time = time.time()
215 self.comms.log.verbose(
216 "Ingested %d datasets from %d quanta in %ss.",
217 self.n_datasets_pending,
218 self.n_producers_pending,
219 self.last_ingest_time - ingest_start_time,
220 )
221 self.n_datasets_ingested += self.n_datasets_pending
222 except ConflictingDefinitionError:
223 if self.already_ingested is None:
224 self.comms.log_progress(
225 logging.INFO,
226 "Some outputs seem to have already been ingested; querying for existing datasets and "
227 "switching to defensive ingest mode.",
228 )
229 self.fetch_already_ingested()
230 # We just return instead of trying again immediately because we
231 # might have just shrunk the number of pending datasets below
232 # the batch threshold.
233 return
234 else:
235 raise
236 self.report()
237 self.records_pending.clear()
238 self.refs_pending.clear()
240 def report(self) -> None:
241 """Report a successful ingest to the supervisor."""
242 self.comms.report_ingest(self.n_producers_pending)
243 self.n_producers_pending = 0
245 def fetch_already_ingested(self) -> None:
246 """Query for the UUIDs of all dataset already present in the output
247 RUN collection, and filter and pending datasets accordingly.
248 """
249 self.comms.log.info("Fetching all UUIDs in output collection %r.", self.predicted.header.output_run)
250 self.already_ingested = set(
251 self.butler.registry._fetch_run_dataset_ids(self.predicted.header.output_run)
252 )
253 kept: set[uuid.UUID] = set()
254 for dimensions, refs in self.refs_pending.items():
255 filtered_refs: list[DatasetRef] = []
256 for ref in refs:
257 if ref.id not in self.already_ingested:
258 kept.add(ref.id)
259 filtered_refs.append(ref)
260 else:
261 self.n_datasets_skipped += 1
262 self.refs_pending[dimensions] = filtered_refs
263 for datastore_name, datastore_records in list(self.records_pending.items()):
264 if (filtered_records := datastore_records.subset(kept)) is not None:
265 self.records_pending[datastore_name] = filtered_records
266 else:
267 del self.records_pending[datastore_name]
269 def update_outputs_pending(
270 self,
271 refs: list[DatasetRef],
272 records: dict[DatastoreName, DatastoreRecordData],
273 ) -> None:
274 """Add an ingest request to the pending-ingest data structures.
276 Parameters
277 ----------
278 refs : `list` [ `lsst.daf.butler.DatasetRef` ]
279 Registry information about regular quantum-output datasets.
280 records : `dict` [ `str`, \
281 `lsst.daf.butler.datastore.record_data.DatastoreRecordData` ]
282 Datastore information about the datasets.
283 """
284 n_given = len(refs)
285 if self.already_ingested is not None:
286 refs = [ref for ref in refs if ref.id not in self.already_ingested]
287 kept = {ref.id for ref in refs}
288 self.n_datasets_skipped += n_given - len(kept)
289 records = {
290 datastore_name: filtered_records
291 for datastore_name, original_records in records.items()
292 if (filtered_records := original_records.subset(kept)) is not None
293 }
294 for ref in refs:
295 self.refs_pending[ref.datasetType.dimensions].append(ref)
296 for datastore_name, datastore_records in records.items():
297 if (existing_records := self.records_pending.get(datastore_name)) is not None:
298 existing_records.update(datastore_records)
299 else:
300 self.records_pending[datastore_name] = datastore_records
302 def update_output_chain(self) -> None:
303 """Update the output CHAINED collection to include the output RUN
304 collection (and the inputs, if the output CHAINED collection does not
305 exist).
307 Notes
308 -----
309 This method cannot be called inside the registry caching context.
310 """
311 if self.predicted.header.output is None:
312 return
313 self.comms.log.info(
314 "Updating output collection %s to include %s.",
315 self.predicted.header.output,
316 self.predicted.header.output_run,
317 )
318 if self.butler.collections.register(self.predicted.header.output, CollectionType.CHAINED):
319 # Chain is new; need to add inputs, but we want to flatten them
320 # first.
321 if self.predicted.header.inputs:
322 flattened = self.butler.collections.query(self.predicted.header.inputs, flatten_chains=True)
323 self.butler.collections.extend_chain(self.predicted.header.output, flattened)
324 self.butler.collections.prepend_chain(self.predicted.header.output, self.predicted.header.output_run)