# This file is part of pipe_base.
#
# Developed for the LSST Data Management System.
# This product includes software developed by the LSST Project
# (http://www.lsst.org).
# See the COPYRIGHT file at the top-level directory of this distribution
# for details of code ownership.
#
# This software is dual licensed under the GNU General Public License and also
# under a 3-clause BSD license. Recipients may choose which of these licenses
# to use; please see the files gpl-3.0.txt and/or bsd_license.txt,
# respectively. If you choose the GPL option then the following text applies
# (but note that there is still no warranty even if you opt for BSD instead):
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.

"""A tool for aggregating provenance information, logs, and metadata from a
processing run into a `ProvenanceQuantumGraph`.

The aggregator uses multiple processes (typically) or threads to gather
information about a processing run that has already completed, ingest the
output datasets into a butler repository, and write provenance information to
disk.

The only public entry point is the `aggregate_graph` function, with all
options controlled by an `AggregatorConfig` instance. Usually the tool is
invoked from the command line as ``butler aggregate-graph``.

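For example (an illustrative sketch only; see the `aggregate_graph` docstring
for its actual arguments)::

    from lsst.pipe.base.quantum_graph.aggregator import (
        AggregatorConfig,
        aggregate_graph,
    )

    config = AggregatorConfig()
    # Set config attributes as desired, then pass the predicted quantum
    # graph and the config to aggregate_graph(...).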

Most log messages are sent to the ``aggregate-graph`` log, which users may want
to set to ``VERBOSE`` to get additional detail, including periodic logging.
See `AggregatorConfig` for how to enable additional logging by each process.
"""

from __future__ import annotations

__all__ = ("AggregatorConfig", "FatalWorkerError", "aggregate_graph")

from ._config import AggregatorConfig
from ._supervisor import aggregate_graph
from ._communicators import FatalWorkerError

#
# Aggregator Design Notes and Package Layout
# ==========================================
#
# The aggregator tool is designed to run in multiple processes. Using threads
# instead is supported, but mostly as a way to leverage the GIL to (mostly)
# occupy only a single core without having to maintain two largely distinct
# implementations. Throughout the rest of these notes I'll just assume they're
# processes.
#
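# As a minimal illustration of why one implementation can serve both modes
# (a hypothetical helper, not the real code): `threading.Thread` and
# `multiprocessing.Process` accept the same ``target``/``args`` arguments, so
# a single factory suffices.
#
#     import multiprocessing
#     import threading
#
#     def start_worker(target, *args, use_processes: bool = True):
#         """Start ``target(*args)`` in a new worker process or thread."""
#         if use_processes:
#             factory = multiprocessing.Process
#         else:
#             factory = threading.Thread
#         worker = factory(target=target, args=args)
#         worker.start()
#         return worker
#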
# There are four types of processes, each represented in code as a class in
# its own module:
#
# - The Supervisor runs in the main process and is responsible for interacting
#   with the user (reporting progress, handling KeyboardInterrupt) and
#   traversing the predicted quantum graph.
#
# - Scanner workers are responsible for reading log and metadata files using a
#   QBB, occasionally checking for the existence of other outputs (we can
#   usually get the information about which outputs exist from the metadata),
#   and sending the information we extract from them to other workers.
#   Scanners are the only kind of worker we can add more of when we are given
#   more cores, so we try to do as much pre-processing as possible on them
#   (e.g. compression, datastore record extraction).
#
# - The ingester worker is responsible for ingesting output datasets into the
#   central butler repository. It batches up ingest requests from the scanners
#   until the batch size exceeds a configurable limit (e.g. 10k datasets), and
#   then ingests those pending requests in a single transaction (see the
#   batching sketch after this list).
#
# - The writer worker is responsible for actually writing the
#   `ProvenanceQuantumGraph` file. Unlike the other workers, it needs to read
#   almost the entire predicted quantum graph file before it starts, so it can
#   take a few minutes before it enters its main loop of accepting write
#   requests from the scanners (most of the time) and the supervisor (only for
#   blocked quanta, which we don't bother to scan). For each of the big
#   multi-block files (datasets, quanta, logs, metadata) the writer opens and
#   writes to a temporary file, which is moved into the provenance QG zip
#   archive at the end. In addition, to try to keep expensive operations on
#   the scanners, the writer builds a ZStandard compression dictionary from
#   the first write requests it sees, and then sends that to the scanners so
#   they can take over doing the compression once training is complete (see
#   the dictionary-handoff sketch after this list). Since writing the
#   provenance graph is (at present) optional, sometimes there is no writer
#   worker.
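#
# The batching sketch referenced in the ingester bullet above. All names here
# are hypothetical, and `Butler.transaction` / `Butler.ingest` stand in for
# whatever the real ingester calls:
#
#     def run_ingester(queue, butler, batch_size):
#         """Drain ingest requests, ingesting one batch per transaction."""
#         pending = []
#         while (request := queue.get()) is not None:  # None = stop sentinel
#             pending.extend(request.datasets)
#             if len(pending) >= batch_size:
#                 with butler.transaction():  # one transaction per batch
#                     butler.ingest(*pending)
#                 pending.clear()
#         if pending:  # flush the final partial batch at shutdown
#             with butler.transaction():
#                 butler.ingest(*pending)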
#
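# The dictionary-handoff sketch referenced in the writer bullet above.
# ``zstandard`` is the real third-party package; everything else here is
# hypothetical:
#
#     import zstandard
#
#     # Writer side: train a dictionary on the first uncompressed payloads,
#     # then ship its bytes to the scanners over a queue.
#     samples = [request.payload for request in first_requests]
#     dict_data = zstandard.train_dictionary(64 * 1024, samples)
#     send_to_scanners(dict_data.as_bytes())
#
#     # Scanner side: rebuild the dictionary and compress later payloads
#     # locally, so the writer only receives already-compressed blocks.
#     zstd_dict = zstandard.ZstdCompressionDict(dict_bytes)
#     compressor = zstandard.ZstdCompressor(dict_data=zstd_dict)
#     compressed = compressor.compress(payload)
#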
# Each process is also associated with its own "communicator" class; these
# are all defined together in the _communicators.py file. The communicators
# manage the queues that are used to send information between processes, as
# well as providing loggers and some other common utility functionality.
# Communicators are context managers that take care of cleanly shutting down
# each process when it completes or is interrupted; `multiprocessing.Queue`
# objects need to be cleared out carefully or you get deadlocks that prevent
# processes from actually exiting, and so a lot of the work of the
# communicators involves passing sentinel objects over those queues so we can
# identify when they really are empty (see the sketch below).
#
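# The essential shape of that sentinel pattern (hypothetical names, with None
# as the sentinel for simplicity; the real logic lives in the communicator
# classes):
#
#     def drain(queue, n_producers):
#         """Consume items until every producer has signalled completion."""
#         remaining = n_producers
#         while remaining:
#             item = queue.get()
#             if item is None:  # one None per finished producer
#                 remaining -= 1
#             else:
#                 handle(item)  # hypothetical per-item processing
#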
# The other modules in the package include:
#
# - _config.py: the `AggregatorConfig` model, which is essentially the public
#   interface for controlling the aggregator.
#
# - _progress.py: helper objects for [periodic] logging and tqdm progress
#   bars. It's not clear the tqdm stuff should continue to exist once
#   development of the tool is complete and we stop wanting to run it from an
#   interactive terminal, but it's certainly been useful for development so
#   far.
#
# - _structs.py: simple dataclasses used by multiple workers (most of what we
#   put on the queues to request scans/ingests/writes); see the sketch after
#   this list.
#
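# A hypothetical example of the kind of dataclass _structs.py holds (not the
# real definitions):
#
#     import dataclasses
#     import uuid
#
#     @dataclasses.dataclass
#     class ScanRequest:
#         """Ask a scanner to read the log/metadata files for one quantum."""
#
#         quantum_id: uuid.UUID
#         task_label: str
#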
# The main logger used by the aggregator is called
# ``lsst.pipe.base.quantum_graph.aggregator``, and it emits at ``VERBOSE`` and
# above. At ``VERBOSE``, it's also used for periodic reporting of status
# (unless the tqdm progress bars are enabled instead). Each worker (including
# the supervisor) also has its own log, with a name like
# ``lsst.pipe.base.quantum_graph.aggregator.scanner-012``, that includes
# additional information at ``DEBUG`` and above, but these are *not*
# propagated up; instead there's a config option to write per-worker logs to
# separate files in a directory (see the sketch at the end of these notes).
# It would probably be more idiomatic to set up a QueueHandler to feed all
# the LogRecords to the supervisor and use Python logging configuration to
# send them wherever desired, but:
#
# - that seemed to require wading into `lsst.daf.butler.cli.cliLog` a lot more
#   than I cared to (especially considering that we want this to work with
#   both `threading` and `multiprocessing`, and logging configuration is very
#   different in those two contexts);
#
# - having the worker logs go to separate files is actually very nice; it's
#   more efficient if the workers just write those files themselves, and
#   that's not something our logging CLI can do, AFAICT.
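#
# A minimal sketch of that per-worker file logging using only the stdlib
# (paths and the helper name are illustrative, not the real implementation):
#
#     import logging
#
#     def make_worker_logger(name: str, log_dir: str) -> logging.Logger:
#         logger = logging.getLogger(
#             f"lsst.pipe.base.quantum_graph.aggregator.{name}"
#         )
#         logger.setLevel(logging.DEBUG)
#         logger.propagate = False  # keep DEBUG records out of the main log
#         handler = logging.FileHandler(f"{log_dir}/{name}.log")
#         handler.setFormatter(
#             logging.Formatter("%(asctime)s %(levelname)s %(message)s")
#         )
#         logger.addHandler(handler)
#         return logger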