Coverage for python / lsst / source / injection / bin / make_injection_pipeline.py: 20%
39 statements
« prev ^ index » next coverage.py v7.13.5, created at 2026-04-25 08:57 +0000
« prev ^ index » next coverage.py v7.13.5, created at 2026-04-25 08:57 +0000
1# This file is part of source_injection.
2#
3# Developed for the LSST Data Management System.
4# This product includes software developed by the LSST Project
5# (https://www.lsst.org).
6# See the COPYRIGHT file at the top-level directory of this distribution
7# for details of code ownership.
8#
9# This program is free software: you can redistribute it and/or modify
10# it under the terms of the GNU General Public License as published by
11# the Free Software Foundation, either version 3 of the License, or
12# (at your option) any later version.
13#
14# This program is distributed in the hope that it will be useful,
15# but WITHOUT ANY WARRANTY; without even the implied warranty of
16# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
17# GNU General Public License for more details.
18#
19# You should have received a copy of the GNU General Public License
20# along with this program. If not, see <https://www.gnu.org/licenses/>.
22from __future__ import annotations
24import logging
25import os
26import time
27from argparse import SUPPRESS, ArgumentParser
29from ..utils import make_injection_pipeline
30from .source_injection_help_formatter import SourceInjectionHelpFormatter
33def build_argparser():
34 """Build an argument parser for this script."""
35 parser = ArgumentParser(
36 description="""Make an expanded source injection pipeline.
38This script takes a reference pipeline definition file in YAML format and
39prefixes all post-injection dataset type names with the injected prefix. If an
40optional injection pipeline definition YAML file is also provided, the
41injection task will be merged into the pipeline.
43Unless explicitly excluded, all subsets from the reference pipeline containing
44the task which generates the injection dataset type will also be updated to
45include the injection task. A series of new injection subsets will also be
46constructed. These new subsets are copies of existent subsets, but with tasks
47not directly impacted by source injection removed. Injected subsets will be the
48original existent subset name with the 'injected_' prefix prepended.
50When the injection pipeline is constructed, a check on all existing pipeline
51contracts is performed. If any contracts are violated, they are removed from
52the pipeline. A warning is logged for each contract that is removed.
53""",
54 formatter_class=SourceInjectionHelpFormatter,
55 epilog="More information is available at https://pipelines.lsst.io.",
56 add_help=False,
57 argument_default=SUPPRESS,
58 )
59 parser.add_argument(
60 "-t",
61 "--dataset-type-name",
62 type=str,
63 help="Name of the dataset type being injected into.",
64 required=True,
65 metavar="TEXT",
66 )
67 parser.add_argument(
68 "-r",
69 "--reference-pipeline",
70 type=str,
71 help="Location of a reference pipeline definition YAML file.",
72 required=True,
73 metavar="FILE",
74 )
75 parser.add_argument(
76 "-i",
77 "--injection-pipeline",
78 type=str,
79 help="Location of an injection pipeline definition YAML file stub. If "
80 "this is not explicitly provided, an attempt to infer the injection "
81 "pipeline stub will be made using the injected dataset type name.",
82 metavar="FILE",
83 )
84 parser.add_argument(
85 "-e",
86 "--exclude-subsets",
87 help="Do not update pipeline subsets to include the injection task.",
88 action="store_true",
89 )
90 parser.add_argument(
91 "-x",
92 "--excluded-tasks",
93 type=str,
94 help="Comma-separated set of task labels to exclude from the pipeline.",
95 metavar="task",
96 default="jointcal,gbdesAstrometricFit,fgcmBuildFromIsolatedStars,fgcmFitCycle,fgcmOutputProducts",
97 )
98 parser.add_argument(
99 "-f",
100 "--filename",
101 help="Path to save a modified pipeline definition YAML file.",
102 metavar="FILE",
103 )
104 parser.add_argument(
105 "--overwrite",
106 help="Overwrite the output saved pipeline definition file if it already exists.",
107 action="store_true",
108 )
109 parser.add_argument(
110 "--prefix",
111 type=str,
112 help="Prefix to prepend to each affected post-injection dataset type name.",
113 default="injected_",
114 )
115 parser.add_argument(
116 "--instrument",
117 type=str,
118 help="Add instrument overrides. Must be a fully qualified class name.",
119 metavar="instrument",
120 )
121 parser.add_argument(
122 "-c",
123 "--config",
124 type=str,
125 help="Config override for a task, in the format 'label:key=value'.",
126 action="append",
127 )
128 parser.add_argument(
129 "-a",
130 "--additional-pipelines",
131 type=str,
132 help="Location(s) of additional input pipeline definition YAML file(s)."
133 "Tasks from these additional pipelines will be added to the output injection pipeline.",
134 metavar="FILE",
135 nargs="+",
136 )
137 parser.add_argument(
138 "-s",
139 "--additional-subset",
140 type=str,
141 help="Subset for additional tasks, in the form 'name[:description]'."
142 "All tasks from any additional pipelines will be added into this subset."
143 "The subset will be created if it does not already exist."
144 "Description text is optional, and ignored if the subset already exists."
145 "This argument can be specified multiple times to add multiple subsets.",
146 metavar="TEXT",
147 action="append",
148 )
149 parser.add_argument(
150 "-h",
151 "--help",
152 action="help",
153 help="Show this help message and exit.",
154 )
155 return parser
158def main():
159 """Use this as the main entry point when calling from the command line."""
160 # Set up logging.
161 tz = time.strftime("%z")
162 logging.basicConfig(
163 format="%(levelname)s %(asctime)s.%(msecs)03d" + tz + " - %(message)s",
164 datefmt="%Y-%m-%dT%H:%M:%S",
165 )
166 logger = logging.getLogger(__name__)
167 logger.setLevel(logging.DEBUG)
169 args = build_argparser().parse_args()
170 if hasattr(args, "filename"):
171 if os.path.exists(args.filename):
172 if not hasattr(args, "overwrite"):
173 raise RuntimeError(f"File {args.filename} already exists; use --overwrite to write anyway.")
174 else:
175 logger.warning("File %s already exists; overwriting.", args.filename)
176 pipeline = make_injection_pipeline(
177 **{k: v for k, v in vars(args).items() if k not in ["filename", "overwrite"]}
178 )
179 pipeline.write_to_uri(args.filename)
180 logger.info(
181 "Modified pipeline definition YAML file saved at %s.",
182 os.path.realpath(args.filename),
183 )
184 else:
185 pipeline = make_injection_pipeline(
186 **{k: v for k, v in vars(args).items() if k not in ["filename", "overwrite"]}
187 )
188 print("\n", pipeline, sep="")