Coverage for python / lsst / pipe / base / quantum_graph / ingest_graph.py: 18%
181 statements
« prev ^ index » next coverage.py v7.13.5, created at 2026-04-17 08:59 +0000
« prev ^ index » next coverage.py v7.13.5, created at 2026-04-17 08:59 +0000
1# This file is part of pipe_base.
2#
3# Developed for the LSST Data Management System.
4# This product includes software developed by the LSST Project
5# (http://www.lsst.org).
6# See the COPYRIGHT file at the top-level directory of this distribution
7# for details of code ownership.
8#
9# This software is dual licensed under the GNU General Public License and also
10# under a 3-clause BSD license. Recipients may choose which of these licenses
11# to use; please see the files gpl-3.0.txt and/or bsd_license.txt,
12# respectively. If you choose the GPL option then the following text applies
13# (but note that there is still no warranty even if you opt for BSD instead):
14#
15# This program is free software: you can redistribute it and/or modify
16# it under the terms of the GNU General Public License as published by
17# the Free Software Foundation, either version 3 of the License, or
18# (at your option) any later version.
19#
20# This program is distributed in the hope that it will be useful,
21# but WITHOUT ANY WARRANTY; without even the implied warranty of
22# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
23# GNU General Public License for more details.
24#
25# You should have received a copy of the GNU General Public License
26# along with this program. If not, see <http://www.gnu.org/licenses/>.
28"""A tool for ingesting provenance quantum graphs (written by the `aggregator`
29module) and [re-]ingesting other datasets (metadata/logs/configs) backed by the
30same file. This "finalizes" the RUN collection, prohibiting (at least
31conceptually) further processing.
33This always proceeds in three steps, so we can resume efficiently:
351. First we ask the butler to "forget" any metadata/log/config datasets that
36 exist in the output RUN collection, removing any record of them from the
37 butler database while preserving their files.
392. Next we ingest the ``run_provenance`` graph dataset itself.
413. Finally, in batches of quanta, we use a
42 `~lsst.daf.butler.QuantumBackedButler` to delete the original
43 metadata/log/config files and ingest new versions of those datasets into the
44 butler.
46Thus, at any point, if the ``run_provenance`` dataset has not been ingested,
47we know any metadata/log/config datasets that have been ingested are backed by
48the original files.
50Moreover, if the ``run_provenance`` dataset has been ingested, any existing
51metadata/log/config datasets must be backed by the graph file, and the original
52files for those datasets will have been deleted.
54We also know that at all times the metadata/log/config *content* is safely
55present in either the original files in the butler storage or in an
56already-ingested ``run_provenance`` dataset.
57"""
59from __future__ import annotations
61__all__ = ("ingest_graph",)
63import dataclasses
64import itertools
65import os
66import uuid
67from collections.abc import Iterator
68from contextlib import contextmanager
70from lsst.daf.butler import (
71 Butler,
72 Config,
73 DataCoordinate,
74 DatasetRef,
75 DatasetType,
76 FileDataset,
77 QuantumBackedButler,
78)
79from lsst.daf.butler.registry.sql_registry import SqlRegistry
80from lsst.resources import ResourcePath, ResourcePathExpression
81from lsst.utils.logging import getLogger
83from ..automatic_connection_constants import PROVENANCE_DATASET_TYPE_NAME, PROVENANCE_STORAGE_CLASS
84from ._provenance import (
85 ProvenanceDatasetInfo,
86 ProvenanceInitQuantumInfo,
87 ProvenanceQuantumGraph,
88 ProvenanceQuantumGraphReader,
89 ProvenanceQuantumInfo,
90)
91from .formatter import ProvenanceFormatter
93_LOG = getLogger(__name__)
def ingest_graph(
    butler_config: str | Config,
    uri: ResourcePathExpression | None = None,
    *,
    transfer: str | None = "move",
    batch_size: int = 10000,
    output_run: str | None = None,
) -> None:
    """Ingest a provenance graph into a butler repository.

    Parameters
    ----------
    butler_config : `str`
        Path or alias for the butler repository, or a butler repository config
        object.
    uri : `lsst.resources.ResourcePathExpression` or `None`, optional
        Location of the provenance quantum graph to ingest. `None` indicates
        that the quantum graph has already been ingested, but other ingests
        and/or deletions failed and need to be resumed.
    transfer : `str` or `None`, optional
        Transfer mode to use when ingesting graph. Matches those supported
        by `lsst.resources.ResourcePath.transfer_from`.
    batch_size : `int`, optional
        Number of datasets to process in each transaction.
    output_run : `str`, optional
        Output `~lsst.daf.butler.CollectionType.RUN` collection name. Only
        needs to be provided if ``uri`` is `None`. If it is provided the
        output run in the graph is checked against it.

    Notes
    -----
    After this operation, any further processing done in the
    `~lsst.daf.butler.CollectionType.RUN` collection will not be included in
    the provenance.

    If this process is interrupted, it can pick up where it left off if run
    again (at the cost of some duplicate work to figure out how much progress
    it had made).
    """
    with _GraphIngester.open(butler_config, uri, output_run) as helper:
        helper.fetch_already_ingested_datasets()
        if not helper.graph_already_ingested:
            # A uri must have been given, or read_graph would have found the
            # already-ingested graph dataset.
            assert uri is not None
            helper.forget_ingested_datasets(batch_size=batch_size)
            helper.ingest_graph_dataset(uri, transfer=transfer)
        helper.clean_and_reingest_datasets(batch_size=batch_size)
        if helper.directories_to_delete:
            _LOG.info(
                "Deleting %d directories after checking that they are empty.",
                len(helper.directories_to_delete),
            )
            # Sorting makes the deletion order deterministic for resumption
            # and logging purposes.
            n_deleted = sum(
                _remove_empty_directory_tree(top) for top in sorted(helper.directories_to_delete)
            )
            _LOG.info("Deleted %d directories.", n_deleted)


def _remove_empty_directory_tree(top: str) -> int:
    """Delete the directory tree rooted at ``top`` if it holds no files.

    Parameters
    ----------
    top : `str`
        Local filesystem path to the directory to (conditionally) remove.

    Returns
    -------
    n_deleted : `int`
        ``1`` if ``top`` was removed, ``0`` if it was kept because it (or a
        subdirectory) still held files, or if it no longer exists.
    """
    # Directories known to (transitively) contain files; these are kept.
    nonempty: set[str] = set()
    # Walk bottom-up so each child is classified before its parent.
    for root, dirnames, filenames in os.walk(top, topdown=False):
        if filenames:
            nonempty.add(root)
        for dirname in dirnames:
            dirpath = os.path.join(root, dirname)
            if dirpath in nonempty:
                # A nonempty child makes this directory nonempty, too.
                nonempty.add(root)
            else:
                os.rmdir(dirpath)
    if nonempty:
        _LOG.warning(
            "Directory %r was not deleted because it unexpectedly still had files in it.",
            top,
        )
        return 0
    # Previously this removed the leaked ``root`` loop variable, which was
    # unbound (NameError) or stale when the walk yielded nothing (e.g. ``top``
    # already gone); act on ``top`` directly and tolerate its absence.
    if os.path.isdir(top):
        os.rmdir(top)
        return 1
    return 0
@dataclasses.dataclass
class _GraphIngester:
    """Stateful helper for `ingest_graph`.

    Instances should be constructed via the `open` context manager, which
    creates the butler, reads the provenance graph, and checks the output
    run name.
    """

    # Path/alias/config for the butler repository; also reused to build a
    # `QuantumBackedButler` in `make_qbb`.
    butler_config: str | Config
    # Writeable full butler for the repository, with the output run as its
    # default collection.
    butler: Butler
    # The provenance quantum graph being ingested.
    graph: ProvenanceQuantumGraph
    # Whether the ``run_provenance`` graph dataset itself already exists in
    # the output RUN collection (i.e. a previous attempt got that far).
    graph_already_ingested: bool
    # Total number of metadata/log/config datasets described by the graph
    # (see `open`); used only for progress reporting.
    n_datasets: int
    # IDs of datasets currently registered in the output run; populated by
    # `fetch_already_ingested_datasets` and shrunk by
    # `forget_ingested_datasets`.
    datasets_already_ingested: set[uuid.UUID] = dataclasses.field(default_factory=set)
    # Local directories that may be left empty once original files are
    # deleted; `ingest_graph` attempts to remove these afterwards.
    directories_to_delete: set[str] = dataclasses.field(default_factory=set)

    @property
    def output_run(self) -> str:
        """The output RUN collection name, as recorded in the graph header."""
        return self.graph.header.output_run

    @classmethod
    @contextmanager
    def open(
        cls,
        butler_config: str | Config,
        uri: ResourcePathExpression | None,
        output_run: str | None,
    ) -> Iterator[_GraphIngester]:
        """Construct an ingester as a context manager.

        Parameters
        ----------
        butler_config : `str` or `~lsst.daf.butler.Config`
            Path or alias for the butler repository, or a repository config.
        uri : `lsst.resources.ResourcePathExpression` or `None`
            Location of the provenance graph file, or `None` if the graph
            dataset has already been ingested (resume mode).
        output_run : `str` or `None`
            Expected output RUN collection name; checked against the graph
            header if not `None`.

        Yields
        ------
        ingester : `_GraphIngester`
            Helper holding the butler and the loaded graph.

        Raises
        ------
        ValueError
            Raised if ``output_run`` disagrees with the graph header.
        """
        with Butler.from_config(butler_config, collections=output_run, writeable=True) as butler:
            # Make sure the provenance dataset type exists before we query
            # for or ingest the graph dataset.
            butler.registry.registerDatasetType(
                DatasetType(PROVENANCE_DATASET_TYPE_NAME, butler.dimensions.empty, PROVENANCE_STORAGE_CLASS)
            )
            graph, graph_already_ingested = cls.read_graph(butler, uri)
            if output_run is not None and graph.header.output_run != output_run:
                raise ValueError(
                    f"Given output run {output_run!r} does not match the graph "
                    f"header {graph.header.output_run!r}."
                )
            # One metadata + one log dataset per quantum, plus one config
            # dataset per task init-quantum.
            n_datasets = 2 * len(graph.quantum_only_xgraph) + len(graph.init_quanta)
            yield cls(
                butler_config=butler_config,
                butler=butler,
                graph=graph,
                graph_already_ingested=graph_already_ingested,
                n_datasets=n_datasets,
            )

    @staticmethod
    def read_graph(
        butler: Butler,
        uri: ResourcePathExpression | None,
    ) -> tuple[ProvenanceQuantumGraph, bool]:
        """Read the provenance graph from a file or from the butler.

        Parameters
        ----------
        butler : `~lsst.daf.butler.Butler`
            Butler to query and (in resume mode) read the graph from.
        uri : `lsst.resources.ResourcePathExpression` or `None`
            Location of the graph file, or `None` to read the
            already-ingested graph dataset from the butler.

        Returns
        -------
        graph : `ProvenanceQuantumGraph`
            The loaded graph (quanta and init-quanta only).
        already_ingested : `bool`
            Whether the graph dataset already exists in the output run.
        """
        if uri is not None:
            _LOG.info("Reading the pre-ingest provenance graph.")
            with ProvenanceQuantumGraphReader.open(uri) as reader:
                reader.read_quanta()
                reader.read_init_quanta()
                graph = reader.graph
            # Even with a file in hand, a previous attempt may have already
            # ingested the graph dataset; check so we can resume correctly.
            already_ingested = (
                butler.find_dataset(PROVENANCE_DATASET_TYPE_NAME, collections=[graph.header.output_run])
                is not None
            )
            return graph, already_ingested
        else:
            _LOG.info("Reading the already-ingested provenance graph.")
            # Skip per-dataset content; we only need the graph structure and
            # init-quanta here.
            parameters = {"datasets": [], "read_init_quanta": True}
            return butler.get(PROVENANCE_DATASET_TYPE_NAME, parameters=parameters), True

    def fetch_already_ingested_datasets(self) -> None:
        """Populate `datasets_already_ingested` with the IDs of all datasets
        currently registered in the output run.
        """
        _LOG.info("Querying for existing datasets in %r.", self.output_run)
        # NOTE(review): relies on a private SqlRegistry bulk-ID query; there
        # appears to be no public equivalent — confirm when the API changes.
        self.datasets_already_ingested.update(self.butler.registry._fetch_run_dataset_ids(self.output_run))

    def iter_datasets(self) -> Iterator[tuple[uuid.UUID, ProvenanceDatasetInfo]]:
        """Iterate over all metadata/log/config datasets in the graph.

        Yields
        ------
        dataset_id : `uuid.UUID`
            Unique ID of the dataset.
        dataset_info : `ProvenanceDatasetInfo`
            Node attributes for the dataset in the bipartite graph.
        """
        xgraph = self.graph.bipartite_xgraph
        # Metadata and log datasets, one of each per quantum.
        for task_label, quanta_for_task in self.graph.quanta_by_task.items():
            _LOG.verbose(
                "Batching up metadata and log datasets from %d %s quanta.", len(quanta_for_task), task_label
            )
            for quantum_id in quanta_for_task.values():
                quantum_info: ProvenanceQuantumInfo = xgraph.nodes[quantum_id]
                metadata_id = quantum_info["metadata_id"]
                yield metadata_id, xgraph.nodes[metadata_id]
                log_id = quantum_info["log_id"]
                yield log_id, xgraph.nodes[log_id]
        # Config datasets, one per task init-quantum.
        _LOG.verbose("Batching up config datasets from %d tasks.", len(self.graph.init_quanta))
        for task_label, quantum_id in self.graph.init_quanta.items():
            init_quantum_info: ProvenanceInitQuantumInfo = xgraph.nodes[quantum_id]
            config_id = init_quantum_info["config_id"]
            yield config_id, xgraph.nodes[config_id]

    def forget_ingested_datasets(self, batch_size: int) -> None:
        """Drop database records (but not files) for metadata/log/config
        datasets that are still backed by their original files.

        Parameters
        ----------
        batch_size : `int`
            Maximum number of datasets to forget in a single transaction.
        """
        _LOG.info(
            "Dropping database records for metadata/log/config datasets backed by their original files."
        )
        to_forget: list[DatasetRef] = []
        n_forgotten: int = 0
        n_skipped: int = 0
        for dataset_id, dataset_info in self.iter_datasets():
            if dataset_info["produced"] and dataset_id in self.datasets_already_ingested:
                to_forget.append(self._make_ref_from_info(dataset_id, dataset_info))
                # Keep the in-memory view consistent so the later re-ingest
                # step sees these datasets as absent.
                self.datasets_already_ingested.remove(dataset_id)
                if len(to_forget) >= batch_size:
                    n_forgotten += self._run_forget(to_forget, n_forgotten + n_skipped)
            else:
                n_skipped += 1
        # Flush any final partial batch.
        n_forgotten += self._run_forget(to_forget, n_forgotten + n_skipped)
        _LOG.info(
            "Removed database records for %d metadata/log/config datasets, while %d were already absent.",
            n_forgotten,
            n_skipped,
        )

    def _run_forget(self, to_forget: list[DatasetRef], n_current: int) -> int:
        """Forget one batch of datasets and clear the batch list.

        Parameters
        ----------
        to_forget : `list` [ `~lsst.daf.butler.DatasetRef` ]
            Batch of datasets to forget; emptied in place.
        n_current : `int`
            Number of datasets already handled, for progress logging.

        Returns
        -------
        n : `int`
            Number of datasets forgotten by this call.
        """
        if to_forget:
            _LOG.verbose(
                "Forgetting a %d-dataset batch; %d/%d forgotten so far or already absent.",
                len(to_forget),
                n_current,
                self.n_datasets,
            )
            # Forget from the datastore and remove registry records in one
            # transaction so an interruption cannot leave them inconsistent.
            with self.butler.registry.transaction():
                self.butler._datastore.forget(to_forget)
                self.butler.registry.removeDatasets(to_forget)
        n = len(to_forget)
        to_forget.clear()
        return n

    def ingest_graph_dataset(self, uri: ResourcePathExpression, transfer: str | None) -> None:
        """Ingest the provenance quantum graph file as a dataset.

        Parameters
        ----------
        uri : `lsst.resources.ResourcePathExpression`
            Location of the graph file.
        transfer : `str` or `None`
            Transfer mode for `~lsst.daf.butler.Butler.ingest`.
        """
        _LOG.info("Ingesting the provenance quantum graph.")
        dataset_type = DatasetType(
            PROVENANCE_DATASET_TYPE_NAME, self.butler.dimensions.empty, PROVENANCE_STORAGE_CLASS
        )
        # Registration was already done in `open`; repeating it here keeps
        # this method safe to call on its own.
        self.butler.registry.registerDatasetType(dataset_type)
        ref = DatasetRef(dataset_type, DataCoordinate.make_empty(self.butler.dimensions), run=self.output_run)
        uri = ResourcePath(uri)
        self.butler.ingest(
            # We use .abspath() since butler assumes paths are relative to the
            # repo root, while users expects them to be relative to the CWD in
            # this context.
            FileDataset(refs=[ref], path=uri.abspath(), formatter=ProvenanceFormatter),
            transfer=transfer,
        )

    def clean_and_reingest_datasets(self, batch_size: int) -> None:
        """Delete the original metadata/log/config files and re-ingest those
        datasets backed by the already-ingested provenance graph file.

        Parameters
        ----------
        batch_size : `int`
            Maximum number of datasets to process in a single batch.
        """
        _LOG.info(
            "Deleting original metadata/log/config files and re-ingesting them with provenance graph backing."
        )
        # All re-ingested datasets share the graph dataset's file as backing.
        direct_uri = self.butler.getURI(PROVENANCE_DATASET_TYPE_NAME, collections=[self.output_run])
        qbb = self.make_qbb()
        to_process: list[DatasetRef] = []
        n_processed: int = 0
        n_skipped: int = 0
        n_not_produced: int = 0
        for dataset_id, dataset_info in self.iter_datasets():
            if not dataset_info["produced"]:
                n_not_produced += 1
            elif dataset_id not in self.datasets_already_ingested:
                to_process.append(self._make_ref_from_info(dataset_id, dataset_info))
                if len(to_process) >= batch_size:
                    n_processed += self._run_clean_and_ingest(
                        qbb, direct_uri, to_process, n_processed + n_skipped
                    )
            else:
                # Already re-ingested by a previous (interrupted) attempt.
                n_skipped += 1
        # Flush any final partial batch.
        n_processed += self._run_clean_and_ingest(qbb, direct_uri, to_process, n_processed + n_skipped)
        _LOG.info(
            "Deleted and re-ingested %d metadata/log/config datasets "
            "(%d had already been processed, %d were not produced).",
            n_processed,
            n_skipped,
            n_not_produced,
        )

    def _run_clean_and_ingest(
        self, qbb: QuantumBackedButler, direct_uri: ResourcePath, to_process: list[DatasetRef], n_current: int
    ) -> int:
        """Delete original files and re-ingest one batch of datasets.

        Parameters
        ----------
        qbb : `~lsst.daf.butler.QuantumBackedButler`
            Butler used only to predict the original file locations.
        direct_uri : `lsst.resources.ResourcePath`
            Location of the ingested provenance graph file, used as the
            backing file for the re-ingested datasets.
        to_process : `list` [ `~lsst.daf.butler.DatasetRef` ]
            Batch of datasets to process; emptied in place.
        n_current : `int`
            Number of datasets already handled, for progress logging.

        Returns
        -------
        n : `int`
            Number of datasets processed by this call.
        """
        if not to_process:
            return 0
        _LOG.verbose(
            "Deleting and re-ingesting a %d-dataset batch; %d/%d complete.",
            len(to_process),
            n_current,
            self.n_datasets,
        )
        # Data IDs must be expanded before ingest; use the private SqlRegistry
        # API to do it in bulk.
        sql_registry: SqlRegistry = self.butler._registry  # type: ignore[attr-defined]
        expanded_refs = sql_registry.expand_refs(to_process)
        # We need to pass predict=True to keep QBB/FileDatastore from wasting
        # time doing existence checks, since ResourcePath.mremove will ignore
        # nonexistent files anyway.
        original_uris = list(
            itertools.chain.from_iterable(
                ref_uris.iter_all() for ref_uris in qbb.get_many_uris(expanded_refs, predict=True).values()
            )
        )
        removal_status = ResourcePath.mremove(original_uris, do_raise=False)
        for path, status in removal_status.items():
            # Missing files are expected when resuming; anything else is
            # re-raised with context about which file it was.
            if not status.success and not isinstance(status.exception, FileNotFoundError):
                assert status.exception is not None, "Exception should be set if success=False."
                status.exception.add_note(f"Attempting to delete original file at {path}.")
                raise status.exception
        # transfer=None: the backing file (the graph dataset) is already in
        # the datastore.
        file_dataset = FileDataset(refs=expanded_refs, path=direct_uri, formatter=ProvenanceFormatter)
        self.butler.ingest(file_dataset, transfer=None)
        if len(original_uris) == len(expanded_refs):
            # With a one-to-one pairing we can guess per-dataset-type parent
            # directories that may now be empty and schedule them for removal.
            for uri, ref in zip(original_uris, expanded_refs):
                if uri.isLocal:
                    if (
                        parent_dir := self.find_dataset_type_directory(uri.ospath, ref.datasetType.name)
                    ) is not None:
                        self.directories_to_delete.add(parent_dir)
        elif any(uri.isLocal for uri in original_uris):
            _LOG.warning(
                "Not attempting to delete empty metadata/log/config directories because the number "
                "of paths (%s) did not match the number of datasets (%s).",
                len(original_uris),
                len(expanded_refs),
            )
        n = len(to_process)
        to_process.clear()
        return n

    @staticmethod
    def _make_ref_from_info(dataset_id: uuid.UUID, dataset_info: ProvenanceDatasetInfo) -> DatasetRef:
        """Build a `~lsst.daf.butler.DatasetRef` from a graph node's
        attributes, preserving the original dataset ID.
        """
        return DatasetRef(
            dataset_info["pipeline_node"].dataset_type,
            dataset_info["data_id"],
            run=dataset_info["run"],
            id=dataset_id,
        )

    def make_qbb(self) -> QuantumBackedButler:
        """Construct a `~lsst.daf.butler.QuantumBackedButler` that can
        predict the original file locations of the graph's datasets.
        """
        dataset_types = {d.name: d.dataset_type for d in self.graph.pipeline_graph.dataset_types.values()}
        return QuantumBackedButler.from_predicted(
            config=self.butler_config,
            predicted_inputs=(),
            predicted_outputs=(),
            dimensions=self.butler.dimensions,
            datastore_records={},
            dataset_types=dataset_types,
        )

    def find_dataset_type_directory(self, ospath: str, dataset_type: str) -> str | None:
        """Find the shallowest ancestor directory of ``ospath`` whose name
        contains ``dataset_type``, or `None` if there is none.

        Parameters
        ----------
        ospath : `str`
            Local filesystem path to a dataset's file.
        dataset_type : `str`
            Dataset type name to look for in the path components.

        Returns
        -------
        directory : `str` or `None`
            Candidate directory assumed to hold only datasets of this type.
        """
        dir_components: list[str] = []
        for component in os.path.dirname(ospath).split(os.path.sep):
            dir_components.append(component)
            # If the full dataset type name is in a single directory path
            # component, we guess that directory can only have datasets of
            # that type.
            if dataset_type in component:
                return os.path.sep.join(dir_components)
        return None