Coverage for python / lsst / source / injection / bin / make_injection_pipeline.py: 20%

39 statements  

« prev     ^ index     » next       coverage.py v7.13.5, created at 2026-04-17 09:38 +0000

1# This file is part of source_injection. 

2# 

3# Developed for the LSST Data Management System. 

4# This product includes software developed by the LSST Project 

5# (https://www.lsst.org). 

6# See the COPYRIGHT file at the top-level directory of this distribution 

7# for details of code ownership. 

8# 

9# This program is free software: you can redistribute it and/or modify 

10# it under the terms of the GNU General Public License as published by 

11# the Free Software Foundation, either version 3 of the License, or 

12# (at your option) any later version. 

13# 

14# This program is distributed in the hope that it will be useful, 

15# but WITHOUT ANY WARRANTY; without even the implied warranty of 

16# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the 

17# GNU General Public License for more details. 

18# 

19# You should have received a copy of the GNU General Public License 

20# along with this program. If not, see <https://www.gnu.org/licenses/>. 

21 

22from __future__ import annotations 

23 

24import logging 

25import os 

26import time 

27from argparse import SUPPRESS, ArgumentParser 

28 

29from ..utils import make_injection_pipeline 

30from .source_injection_help_formatter import SourceInjectionHelpFormatter 

31 

32 

def build_argparser() -> ArgumentParser:
    """Build an argument parser for this script.

    The parser is created with ``add_help=False`` and the ``-h/--help``
    option is registered explicitly as the final argument. It is also created
    with ``argument_default=SUPPRESS``, so any option that has no explicit
    default and is not supplied on the command line is absent from the parsed
    namespace (rather than being present with a value of `None`). Only
    ``--excluded-tasks`` and ``--prefix`` declare explicit defaults.

    Returns
    -------
    parser : `argparse.ArgumentParser`
        The configured argument parser.
    """
    parser = ArgumentParser(
        description="""Make an expanded source injection pipeline.

This script takes a reference pipeline definition file in YAML format and
prefixes all post-injection dataset type names with the injected prefix. If an
optional injection pipeline definition YAML file is also provided, the
injection task will be merged into the pipeline.

Unless explicitly excluded, all subsets from the reference pipeline containing
the task which generates the injection dataset type will also be updated to
include the injection task. A series of new injection subsets will also be
constructed. These new subsets are copies of existent subsets, but with tasks
not directly impacted by source injection removed. Injected subsets will be the
original existent subset name with the 'injected_' prefix prepended.

When the injection pipeline is constructed, a check on all existing pipeline
contracts is performed. If any contracts are violated, they are removed from
the pipeline. A warning is logged for each contract that is removed.
""",
        formatter_class=SourceInjectionHelpFormatter,
        epilog="More information is available at https://pipelines.lsst.io.",
        add_help=False,
        argument_default=SUPPRESS,
    )
    parser.add_argument(
        "-t",
        "--dataset-type-name",
        type=str,
        help="Name of the dataset type being injected into.",
        required=True,
        metavar="TEXT",
    )
    parser.add_argument(
        "-r",
        "--reference-pipeline",
        type=str,
        help="Location of a reference pipeline definition YAML file.",
        required=True,
        metavar="FILE",
    )
    parser.add_argument(
        "-i",
        "--injection-pipeline",
        type=str,
        help="Location of an injection pipeline definition YAML file stub. If "
        "this is not explicitly provided, an attempt to infer the injection "
        "pipeline stub will be made using the injected dataset type name.",
        metavar="FILE",
    )
    parser.add_argument(
        "-e",
        "--exclude-subsets",
        help="Do not update pipeline subsets to include the injection task.",
        action="store_true",
    )
    parser.add_argument(
        "-x",
        "--excluded-tasks",
        type=str,
        help="Comma-separated set of task labels to exclude from the pipeline.",
        metavar="task",
        default="jointcal,gbdesAstrometricFit,fgcmBuildFromIsolatedStars,fgcmFitCycle,fgcmOutputProducts",
    )
    parser.add_argument(
        "-f",
        "--filename",
        help="Path to save a modified pipeline definition YAML file.",
        metavar="FILE",
    )
    parser.add_argument(
        "--overwrite",
        help="Overwrite the output saved pipeline definition file if it already exists.",
        action="store_true",
    )
    parser.add_argument(
        "--prefix",
        type=str,
        help="Prefix to prepend to each affected post-injection dataset type name.",
        default="injected_",
    )
    parser.add_argument(
        "--instrument",
        type=str,
        help="Add instrument overrides. Must be a fully qualified class name.",
        metavar="instrument",
    )
    parser.add_argument(
        "-c",
        "--config",
        type=str,
        help="Config override for a task, in the format 'label:key=value'.",
        action="append",
    )
    # Adjacent string literals are concatenated verbatim, so each fragment
    # below ends with a space to keep the sentences apart in the help output.
    parser.add_argument(
        "-a",
        "--additional-pipelines",
        type=str,
        help="Location(s) of additional input pipeline definition YAML file(s). "
        "Tasks from these additional pipelines will be added to the output injection pipeline.",
        metavar="FILE",
        nargs="+",
    )
    parser.add_argument(
        "-s",
        "--additional-subset",
        type=str,
        help="Subset for additional tasks, in the form 'name[:description]'. "
        "All tasks from any additional pipelines will be added into this subset. "
        "The subset will be created if it does not already exist. "
        "Description text is optional, and ignored if the subset already exists. "
        "This argument can be specified multiple times to add multiple subsets.",
        metavar="TEXT",
        action="append",
    )
    # The automatic help option was disabled above; register it by hand so it
    # appears last in the option listing.
    parser.add_argument(
        "-h",
        "--help",
        action="help",
        help="Show this help message and exit.",
    )
    return parser

156 

157 

def main():
    """Use this as the main entry point when calling from the command line.

    Command-line arguments are parsed with the parser from
    ``build_argparser`` and every parsed value except ``filename`` and
    ``overwrite`` is forwarded as a keyword argument to
    ``make_injection_pipeline``.

    If no output filename was given, the resulting pipeline is printed to
    standard output, preceded by a blank line. Otherwise the pipeline is
    written to the requested location and the resolved path is logged.

    Raises
    ------
    RuntimeError
        Raised if the output file already exists and ``--overwrite`` was not
        given.
    """
    # Log records carry a timestamp with milliseconds, followed by the offset
    # string reported by ``time.strftime("%z")``.
    utc_offset = time.strftime("%z")
    logging.basicConfig(
        format="%(levelname)s %(asctime)s.%(msecs)03d" + utc_offset + " - %(message)s",
        datefmt="%Y-%m-%dT%H:%M:%S",
    )
    log = logging.getLogger(__name__)
    log.setLevel(logging.DEBUG)

    namespace = build_argparser().parse_args()

    # Options that only control where the output goes are not passed on to
    # the pipeline builder.
    output_only = ("filename", "overwrite")
    builder_kwargs = {key: value for key, value in vars(namespace).items() if key not in output_only}

    # The parser suppresses unset options, so attribute presence (not
    # truthiness) tells us whether an option was given.
    if not hasattr(namespace, "filename"):
        pipeline = make_injection_pipeline(**builder_kwargs)
        print("\n", pipeline, sep="")
        return

    if os.path.exists(namespace.filename):
        if not hasattr(namespace, "overwrite"):
            raise RuntimeError(f"File {namespace.filename} already exists; use --overwrite to write anyway.")
        log.warning("File %s already exists; overwriting.", namespace.filename)

    pipeline = make_injection_pipeline(**builder_kwargs)
    pipeline.write_to_uri(namespace.filename)
    log.info(
        "Modified pipeline definition YAML file saved at %s.",
        os.path.realpath(namespace.filename),
    )