Coverage for python / lsst / pipe / base / quantum_graph / aggregator / _config.py: 98%
49 statements
« prev ^ index » next coverage.py v7.13.5, created at 2026-04-26 08:59 +0000
1# This file is part of pipe_base.
2#
3# Developed for the LSST Data Management System.
4# This product includes software developed by the LSST Project
5# (http://www.lsst.org).
6# See the COPYRIGHT file at the top-level directory of this distribution
7# for details of code ownership.
8#
9# This software is dual licensed under the GNU General Public License and also
10# under a 3-clause BSD license. Recipients may choose which of these licenses
11# to use; please see the files gpl-3.0.txt and/or bsd_license.txt,
12# respectively. If you choose the GPL option then the following text applies
13# (but note that there is still no warranty even if you opt for BSD instead):
14#
15# This program is free software: you can redistribute it and/or modify
16# it under the terms of the GNU General Public License as published by
17# the Free Software Foundation, either version 3 of the License, or
18# (at your option) any later version.
19#
20# This program is distributed in the hope that it will be useful,
21# but WITHOUT ANY WARRANTY; without even the implied warranty of
22# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
23# GNU General Public License for more details.
24#
25# You should have received a copy of the GNU General Public License
26# along with this program. If not, see <http://www.gnu.org/licenses/>.
28from __future__ import annotations
30__all__ = ("AggregatorConfig",)
32import sys
33from typing import TYPE_CHECKING, Any
35import pydantic
class AggregatorConfig(pydantic.BaseModel):
    """Configuration for the provenance aggregator."""

    output_path: str | None = None
    """Path for the output provenance quantum graph file.

    At present this option is intended only for debugging.
    """

    worker_log_dir: str | None = None
    """Path to a directory (POSIX only) for parallel worker logs."""

    worker_log_level: str = "VERBOSE"
    """Log level for worker processes/threads.

    Per-quantum messages only appear at ``DEBUG`` level.
    """

    worker_profile_dir: str | None = None
    """Path to a directory (POSIX only) for parallel worker profiling dumps.

    This option is ignored when `n_processes` is `1`.
    """

    n_processes: int = 1
    """Number of processes the scanner should use."""

    incomplete: bool = False
    """If `True`, do not expect the graph to have been executed to completion
    yet, and only ingest the outputs of successful quanta.

    This disables writing the provenance quantum graph, since this is likely to
    be wasted effort that just complicates a follow-up run with
    ``incomplete=False`` later.
    """

    defensive_ingest: bool = False
    """If `True`, guard against datasets having already been ingested into the
    central butler repository.

    Defensive ingest mode is automatically turned on (with a warning emitted)
    if an ingest attempt fails due to a database constraint violation. Enabling
    defensive mode up-front avoids this warning and is slightly more efficient
    when it is already known that some datasets have already been ingested.

    Defensive mode does not guard against race conditions from multiple ingest
    processes running simultaneously, as it relies on a one-time query to
    determine what is already present in the central repository.
    """

    ingest_batch_size: int = 10000
    """Number of butler datasets that must accumulate to trigger an ingest."""

    register_dataset_types: bool = True
    """Whether to register output dataset types in the central butler
    repository before starting ingest.
    """

    update_output_chain: bool = True
    """Whether to prepend the output `~lsst.daf.butler.CollectionType.RUN` to
    the output `~lsst.daf.butler.CollectionType.CHAINED` collection.
    """

    dry_run: bool = False
    """If `True`, do not actually perform any central butler ingests.

    Most log messages concerning ingests will still be emitted in order to
    provide a better emulation of a real run.
    """

    interactive_status: bool = False
    """Whether to use an interactive status display with progress bars.

    If this is `True`, the `tqdm` module must be available. If this is
    `False`, a periodic logger will be used to display status at a fixed
    interval instead (see `log_status_interval`).
    """

    log_status_interval: float | None = None
    """Interval (in seconds) between periodic logger status updates."""

    worker_sleep: float = 0.01
    """Time (in seconds) a worker should wait when there are no requests from
    the main aggregator process.
    """

    zstd_level: int = 10
    """ZStandard compression level to use for all compressed-JSON blocks."""

    zstd_dict_size: int = 32768
    """Size (in bytes) of the ZStandard compression dictionary."""

    zstd_dict_n_inputs: int = 512
    """Number of samples of each type (see below) to include in ZStandard
    compression dictionary training.

    Training is run on a random subset of the `PredictedQuantumDatasetsModel`
    objects in the predicted graph, as well as the first provenance quanta,
    logs, and metadata blocks encountered.
    """

    mock_storage_classes: bool = False
    """Enable support for storage classes by created by the
    lsst.pipe.base.tests.mocks package.
    """

    promise_ingest_graph: bool = False
    """If `True`, the aggregator will assume that `~.ingest_graph.ingest_graph`
    will be run later to ingest metadata/log/config datasets, and will not
    ingest them itself. This means that if `~.ingest_graph.ingest_graph` is
    not run, those files will be abandoned in the butler storage root without
    being present in the butler database, but it will speed up both processes.

    It is *usually* safe to build a quantum graph for downstream processing
    before or while running `~.ingest_graph.ingest_graph`, because
    metadata/log/config datasets are rarely used as inputs. To check, use
    ``pipetask build ... --show inputs`` to show the overall-inputs to the
    graph and scan for these dataset types.
    """

    worker_check_timeout: float = 5.0
    """Time to wait (s) for reports from subprocesses before running
    process-alive checks.

    These checks are designed to kill the main aggregator process when a
    subprocess has been unexpectedly killed (e.g. for for using too much
    memory).
    """

    @property
    def is_writing_provenance(self) -> bool:
        """Whether the aggregator is configured to write the provenance quantum
        graph.
        """
        # Writing is only enabled when an output path is configured and the
        # graph is expected to be complete (see `incomplete`).
        return self.output_path is not None and not self.incomplete

    # Work around the fact that Sphinx chokes on Pydantic docstring formatting,
    # when we inherit those docstrings in our public classes.
    if "sphinx" in sys.modules and not TYPE_CHECKING:

        def copy(self, *args: Any, **kwargs: Any) -> Any:
            """See `pydantic.BaseModel.copy`."""
            return super().copy(*args, **kwargs)

        def model_dump(self, *args: Any, **kwargs: Any) -> Any:
            """See `pydantic.BaseModel.model_dump`."""
            return super().model_dump(*args, **kwargs)

        def model_dump_json(self, *args: Any, **kwargs: Any) -> Any:
            """See `pydantic.BaseModel.model_dump_json`."""
            # Fixed: this previously delegated to model_dump, which returns a
            # dict rather than the JSON string callers expect.
            return super().model_dump_json(*args, **kwargs)

        def model_copy(self, *args: Any, **kwargs: Any) -> Any:
            """See `pydantic.BaseModel.model_copy`."""
            return super().model_copy(*args, **kwargs)

        @classmethod
        def model_construct(cls, *args: Any, **kwargs: Any) -> Any:  # type: ignore[misc, override]
            """See `pydantic.BaseModel.model_construct`."""
            return super().model_construct(*args, **kwargs)

        @classmethod
        def model_json_schema(cls, *args: Any, **kwargs: Any) -> Any:
            """See `pydantic.BaseModel.model_json_schema`."""
            return super().model_json_schema(*args, **kwargs)

        @classmethod
        def model_validate(cls, *args: Any, **kwargs: Any) -> Any:
            """See `pydantic.BaseModel.model_validate`."""
            return super().model_validate(*args, **kwargs)

        @classmethod
        def model_validate_json(cls, *args: Any, **kwargs: Any) -> Any:
            """See `pydantic.BaseModel.model_validate_json`."""
            return super().model_validate_json(*args, **kwargs)

        @classmethod
        def model_validate_strings(cls, *args: Any, **kwargs: Any) -> Any:
            """See `pydantic.BaseModel.model_validate_strings`."""
            return super().model_validate_strings(*args, **kwargs)