Coverage for python / lsst / pipe / base / quantum_graph / aggregator / _config.py: 98%

49 statements  

coverage.py v7.13.5, created at 2026-04-18 08:44 +0000

# This file is part of pipe_base.
#
# Developed for the LSST Data Management System.
# This product includes software developed by the LSST Project
# (http://www.lsst.org).
# See the COPYRIGHT file at the top-level directory of this distribution
# for details of code ownership.
#
# This software is dual licensed under the GNU General Public License and also
# under a 3-clause BSD license. Recipients may choose which of these licenses
# to use; please see the files gpl-3.0.txt and/or bsd_license.txt,
# respectively. If you choose the GPL option then the following text applies
# (but note that there is still no warranty even if you opt for BSD instead):
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.

from __future__ import annotations

__all__ = ("AggregatorConfig",)

import sys
from typing import TYPE_CHECKING, Any

import pydantic


class AggregatorConfig(pydantic.BaseModel):
    """Configuration for the provenance aggregator."""

    output_path: str | None = None
    """Path for the output provenance quantum graph file.

    At present this option is intended only for debugging.
    """

    worker_log_dir: str | None = None
    """Path to a directory (POSIX only) for parallel worker logs."""

    worker_log_level: str = "VERBOSE"
    """Log level for worker processes/threads.

    Per-quantum messages only appear at ``DEBUG`` level.
    """

    worker_profile_dir: str | None = None
    """Path to a directory (POSIX only) for parallel worker profiling dumps.

    This option is ignored when `n_processes` is `1`.
    """

    n_processes: int = 1
    """Number of processes the scanner should use."""

    incomplete: bool = False
    """If `True`, do not expect the graph to have been executed to completion
    yet, and only ingest the outputs of successful quanta.

    This disables writing the provenance quantum graph, since this is likely to
    be wasted effort that just complicates a follow-up run with
    ``incomplete=False`` later.
    """

    defensive_ingest: bool = False
    """If `True`, guard against datasets having already been ingested into the
    central butler repository.

    Defensive ingest mode is automatically turned on (with a warning emitted)
    if an ingest attempt fails due to a database constraint violation. Enabling
    defensive mode up-front avoids this warning and is slightly more efficient
    when some datasets are already known to have been ingested.

    Defensive mode does not guard against race conditions from multiple ingest
    processes running simultaneously, as it relies on a one-time query to
    determine what is already present in the central repository.
    """

    ingest_batch_size: int = 10000
    """Number of butler datasets that must accumulate to trigger an ingest."""

    register_dataset_types: bool = True
    """Whether to register output dataset types in the central butler
    repository before starting ingest.
    """

    update_output_chain: bool = True
    """Whether to prepend the output `~lsst.daf.butler.CollectionType.RUN` to
    the output `~lsst.daf.butler.CollectionType.CHAINED` collection.
    """

    dry_run: bool = False
    """If `True`, do not actually perform any central butler ingests.

    Most log messages concerning ingests will still be emitted in order to
    provide a better emulation of a real run.
    """

    interactive_status: bool = False
    """Whether to use an interactive status display with progress bars.

    If this is `True`, the `tqdm` module must be available. If this is
    `False`, a periodic logger will be used to display status at a fixed
    interval instead (see `log_status_interval`).
    """

    log_status_interval: float | None = None
    """Interval (in seconds) between periodic logger status updates."""

    worker_sleep: float = 0.01
    """Time (in seconds) a worker should wait when there are no requests from
    the main aggregator process.
    """

    zstd_level: int = 10
    """ZStandard compression level to use for all compressed-JSON blocks."""

    zstd_dict_size: int = 32768
    """Size (in bytes) of the ZStandard compression dictionary."""

    zstd_dict_n_inputs: int = 512
    """Number of samples of each type (see below) to include in ZStandard
    compression dictionary training.

    Training is run on a random subset of the `PredictedQuantumDatasetsModel`
    objects in the predicted graph, as well as the first provenance quanta,
    logs, and metadata blocks encountered.
    """

    mock_storage_classes: bool = False
    """Enable support for storage classes created by the
    lsst.pipe.base.tests.mocks package.
    """

    promise_ingest_graph: bool = False
    """If `True`, the aggregator will assume that `~.ingest_graph.ingest_graph`
    will be run later to ingest metadata/log/config datasets, and will not
    ingest them itself, which speeds up both processes. If
    `~.ingest_graph.ingest_graph` is never run, however, those files will be
    abandoned in the butler storage root without being present in the butler
    database.

    It is *usually* safe to build a quantum graph for downstream processing
    before or while running `~.ingest_graph.ingest_graph`, because
    metadata/log/config datasets are rarely used as inputs. To check, use
    ``pipetask build ... --show inputs`` to show the overall-inputs to the
    graph and scan for these dataset types.
    """

    worker_check_timeout: float = 5.0
    """Time to wait (in seconds) for reports from subprocesses before running
    process-alive checks.

    These checks are designed to kill the main aggregator process when a
    subprocess has been unexpectedly killed (e.g. for using too much memory).
    """

    @property
    def is_writing_provenance(self) -> bool:
        """Whether the aggregator is configured to write the provenance quantum
        graph.
        """
        return self.output_path is not None and not self.incomplete

    # Work around the fact that Sphinx chokes on Pydantic docstring formatting
    # when we inherit those docstrings in our public classes.
    if "sphinx" in sys.modules and not TYPE_CHECKING:

        def copy(self, *args: Any, **kwargs: Any) -> Any:
            """See `pydantic.BaseModel.copy`."""
            return super().copy(*args, **kwargs)

        def model_dump(self, *args: Any, **kwargs: Any) -> Any:
            """See `pydantic.BaseModel.model_dump`."""
            return super().model_dump(*args, **kwargs)

        def model_dump_json(self, *args: Any, **kwargs: Any) -> Any:
            """See `pydantic.BaseModel.model_dump_json`."""
            return super().model_dump_json(*args, **kwargs)

        def model_copy(self, *args: Any, **kwargs: Any) -> Any:
            """See `pydantic.BaseModel.model_copy`."""
            return super().model_copy(*args, **kwargs)

        @classmethod
        def model_construct(cls, *args: Any, **kwargs: Any) -> Any:  # type: ignore[misc, override]
            """See `pydantic.BaseModel.model_construct`."""
            return super().model_construct(*args, **kwargs)

        @classmethod
        def model_json_schema(cls, *args: Any, **kwargs: Any) -> Any:
            """See `pydantic.BaseModel.model_json_schema`."""
            return super().model_json_schema(*args, **kwargs)

        @classmethod
        def model_validate(cls, *args: Any, **kwargs: Any) -> Any:
            """See `pydantic.BaseModel.model_validate`."""
            return super().model_validate(*args, **kwargs)

        @classmethod
        def model_validate_json(cls, *args: Any, **kwargs: Any) -> Any:
            """See `pydantic.BaseModel.model_validate_json`."""
            return super().model_validate_json(*args, **kwargs)

        @classmethod
        def model_validate_strings(cls, *args: Any, **kwargs: Any) -> Any:
            """See `pydantic.BaseModel.model_validate_strings`."""
            return super().model_validate_strings(*args, **kwargs)
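
A short usage sketch (not part of the listing above): because `AggregatorConfig` is an ordinary `pydantic.BaseModel`, it can be constructed from keyword arguments or validated from a plain mapping, and `is_writing_provenance` combines `output_path` and `incomplete` exactly as in the property above. The direct import from the private `_config` module mirrors the file path in this report; a re-export from the `aggregator` package is likely but is an assumption here, as are the example values.

# Usage sketch only; the import path and all values are illustrative assumptions.
from lsst.pipe.base.quantum_graph.aggregator._config import AggregatorConfig

config = AggregatorConfig(
    output_path="provenance.qg",  # hypothetical output file
    n_processes=4,
    worker_log_level="INFO",
)
assert config.is_writing_provenance  # output_path is set and incomplete is False

# Validating from a plain dict (e.g. parsed from a JSON/YAML config file).
other = AggregatorConfig.model_validate({"incomplete": True, "dry_run": True})
assert not other.is_writing_provenance  # no output_path, and incomplete is True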