Coverage for python/lsst/pipe/base/quantum_graph/aggregator/__init__.py: 100%
5 statements
coverage.py v7.13.5, created at 2026-04-28 08:44 +0000
# This file is part of pipe_base.
#
# Developed for the LSST Data Management System.
# This product includes software developed by the LSST Project
# (http://www.lsst.org).
# See the COPYRIGHT file at the top-level directory of this distribution
# for details of code ownership.
#
# This software is dual licensed under the GNU General Public License and also
# under a 3-clause BSD license. Recipients may choose which of these licenses
# to use; please see the files gpl-3.0.txt and/or bsd_license.txt,
# respectively. If you choose the GPL option then the following text applies
# (but note that there is still no warranty even if you opt for BSD instead):
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
28"""A tool for aggregating provenance information, logs, and metadata from a
29processing run into a `ProvenanceQuantumGraph`.
31The aggregator uses multiple processes (typically) or threads to gather
32information about a processing run that has already completed, ingest the
33output datasets into a butler repository, and write provenance information to
34disk.
36The only public entry point is the `aggregate_graph` function, with all options
37controlled by an `AggregatorConfig` instance.
38Usually the tool is invoked from the command line as
39``butler aggregate-graph``.
41Most log messages are sent to the ``aggregate-graph`` log, which users may want
42to set to ``VERBOSE`` to get additional detail, including periodic logging.
43See `AggregatorConfig` for how to enable additional logging by each process.
44"""

from __future__ import annotations

__all__ = ("AggregatorConfig", "FatalWorkerError", "aggregate_graph")

from ._communicators import FatalWorkerError
from ._config import AggregatorConfig
from ._supervisor import aggregate_graph

#
# Aggregator Design Notes and Package Layout
# ==========================================
#
# The aggregator tool is designed to run in multiple processes. Using threads
# instead is supported, but mainly as a way to leverage the GIL so the tool
# (mostly) occupies only a single core, without having to maintain two largely
# distinct implementations. Throughout the rest of these notes I'll just
# assume they're processes.
#
# There are four types of processes, each represented in code as a class in
# its own module:
#
# - The Supervisor runs in the main process and is responsible for interacting
#   with the user (reporting progress, handling KeyboardInterrupt) and
#   traversing the predicted quantum graph.
#
# - Scanner workers are responsible for reading log and metadata files using a
#   QBB, occasionally checking for the existence of other outputs (we can
#   usually learn which outputs exist from the metadata), and sending the
#   information we extract to other workers. Scanners are the only kind of
#   worker we can add more of when we are given more cores, so we try to do as
#   much pre-processing as possible on them (e.g. compression, datastore
#   record extraction).
#
# - The ingester worker is responsible for ingesting output datasets into the
#   central butler repository. It batches up ingest requests from the scanners
#   until the batch size exceeds a configurable limit (e.g. 10k datasets), and
#   then ingests the pending requests in a single transaction (a sketch of
#   this batching pattern follows the list).
#
# - The writer worker is responsible for actually writing the
#   `ProvenanceQuantumGraph` file. Unlike the other workers, it needs to read
#   almost the entire predicted quantum graph file up front, so it can take a
#   few minutes before it begins its main loop of accepting write requests
#   from the scanners (most of the time) and the supervisor (only for blocked
#   quanta, which we don't bother to scan). For each of the big multi-block
#   files (datasets, quanta, logs, metadata) the writer opens and writes to a
#   temporary file, which is moved into the provenance QG zip archive at the
#   end. In addition, to keep expensive operations on the scanners, the writer
#   builds a ZStandard compression dictionary from the first write requests it
#   sees, and then sends that dictionary to the scanners so they can take over
#   doing the compression from then on (see the dictionary-training sketch
#   after this list). Since writing the provenance graph is (at present)
#   optional, sometimes there is no writer worker.
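#
# A minimal sketch of the batching pattern described above for the ingester.
# The ``requests`` queue, ``DONE`` sentinel, request payload, and config field
# name are illustrative, not the real aggregator API; ``ingest`` and
# ``transaction`` are standard `lsst.daf.butler.Butler` methods:
#
#     batch = []
#     while (request := requests.get()) is not DONE:
#         batch.extend(request.datasets)   # hypothetical request payload
#         if len(batch) >= config.ingest_batch_size:
#             with butler.transaction():   # one transaction per batch
#                 butler.ingest(*batch)
#             batch.clear()
#     if batch:                            # flush the final partial batch
#         with butler.transaction():
#             butler.ingest(*batch)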
#
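# A sketch of the writer's dictionary-training handoff, using the public
# ``zstandard`` package API; the sample collection and queue plumbing are
# simplified stand-ins for the real worker code:
#
#     import zstandard
#
#     samples = []                      # payloads from the first requests
#     while len(samples) < 1000:        # training threshold is illustrative
#         samples.append(write_requests.get().payload)
#     dict_data = zstandard.train_dictionary(112640, samples)
#     for queue in scanner_queues:      # ship the dictionary to the scanners
#         queue.put(dict_data.as_bytes())
#     # From here on the scanners compress with their own
#     # zstandard.ZstdCompressor(dict_data=...) and the writer receives
#     # already-compressed payloads.
#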
# Each process is also associated with its own "communicator" class; these
# are all defined together in the _communicators.py file. The communicators
# manage the queues that are used to send information between processes, as
# well as providing loggers and some other common utility functionality.
# Communicators are context managers that take care of cleanly shutting down
# each process when it completes or is interrupted; `multiprocessing.Queue`
# objects need to be cleared out carefully or you get deadlocks that prevent
# processes from actually exiting, and so a lot of the work of the
# communicators involves passing sentinel objects over those queues so we can
# identify when they really are empty.
#
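# A sketch of the sentinel pattern described above; ``handle`` is a stand-in
# for real work, and the real communicators layer logging and multi-queue
# coordination on top of this:
#
#     DONE = None   # sentinel; each producer sends exactly one copy
#
#     def consume(queue, n_producers):
#         remaining = n_producers
#         while remaining:
#             item = queue.get()    # blocks until something arrives
#             if item is DONE:
#                 remaining -= 1    # one more producer has finished
#             else:
#                 handle(item)
#         # Only now is it safe to exit: the queue is known to be empty, so
#         # its feeder thread can flush and join without deadlocking.
#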
# The other modules in the package are:
#
# - _config.py: the `AggregatorConfig` model, which is essentially the public
#   interface for controlling the aggregator.
#
# - _progress.py: helper objects for periodic logging and tqdm progress bars.
#   It's not clear the tqdm stuff should continue to exist once development of
#   the tool is complete and we stop wanting to run it from an interactive
#   terminal, but it's certainly been useful for development so far.
#
# - _structs.py: simple dataclasses used by multiple workers (most of what we
#   put on the queues to request scans/ingests/writes); a representative
#   example follows.
#
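# A hypothetical illustration of the kind of dataclass _structs.py holds; the
# class and field names here are invented, not the real definitions:
#
#     import dataclasses
#     import uuid
#
#     @dataclasses.dataclass
#     class ScanRequest:
#         """Ask a scanner to read the logs and metadata for one quantum."""
#
#         quantum_id: uuid.UUID
#         task_label: str
#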
# The main logger used by the aggregator is called
# ``lsst.pipe.base.quantum_graph.aggregator``, and it emits at ``VERBOSE`` and
# above. At ``VERBOSE``, it's also used for periodic reporting of status
# (unless the tqdm progress bars are enabled instead). Each worker (including
# the supervisor) also has its own log, with a name like
# ``lsst.pipe.base.quantum_graph.aggregator.scanner-012``, that includes
# additional information at ``DEBUG`` and above, but these logs are *not*
# propagated up; instead there's a config option to write per-worker logs to
# separate files in a directory. It would probably be more idiomatic to set up
# a QueueHandler to feed all the LogRecords to the supervisor and use Python
# logging configuration to send them wherever desired, but:
#
# - that seemed to require wading into `lsst.daf.butler.cli.cliLog` more than
#   I cared to (especially considering that we want this to work with both
#   `threading` and `multiprocessing`, and logging configuration is very
#   different in those two contexts);
#
# - having the worker logs go to separate files is actually very nice; it's
#   more efficient if the workers just do that themselves, and that's not
#   something our logging CLI can do, AFAICT. A sketch of this per-worker
#   setup follows.
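#
# A sketch of the per-worker file logging described above, using only the
# standard library (the directory argument stands in for the real config
# option, whose name is not shown here):
#
#     import logging
#     import os
#
#     def make_worker_logger(name: str, log_dir: str) -> logging.Logger:
#         # One logger per worker, named under the aggregator's hierarchy.
#         logger = logging.getLogger(
#             f"lsst.pipe.base.quantum_graph.aggregator.{name}"
#         )
#         logger.setLevel(logging.DEBUG)
#         logger.propagate = False  # keep DEBUG detail out of the main log
#         handler = logging.FileHandler(os.path.join(log_dir, f"{name}.log"))
#         handler.setFormatter(
#             logging.Formatter("%(asctime)s %(levelname)s %(message)s")
#         )
#         logger.addHandler(handler)
#         return logger
#
#     log = make_worker_logger("scanner-012", "/tmp/aggregator-logs")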