Coverage for python/lsst/source/injection/utils/_make_injection_pipeline.py: 4% (161 statements)
# This file is part of source_injection.
#
# Developed for the LSST Data Management System.
# This product includes software developed by the LSST Project
# (https://www.lsst.org).
# See the COPYRIGHT file at the top-level directory of this distribution
# for details of code ownership.
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <https://www.gnu.org/licenses/>.

from __future__ import annotations

__all__ = ["make_injection_pipeline"]

import itertools
import logging

from lsst.analysis.tools.interfaces import AnalysisPipelineTask
from lsst.pipe.base import LabelSpecifier, Pipeline
from lsst.pipe.base.pipelineIR import ContractError


def _parse_config_override(config_override: str) -> tuple[str, str, str]:
    """Parse a config override string into a label, a key and a value.

    Parameters
    ----------
    config_override : `str`
        Config override string to parse, in the format 'label:key=value'.

    Returns
    -------
    label : `str`
        Label to override.
    key : `str`
        Key to override.
    value : `str`
        Value to override.

    Raises
    ------
    TypeError
        If the config override string cannot be parsed.
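
    Examples
    --------
    An illustrative round trip; the label and key here are arbitrary and not
    tied to any particular task:

    >>> _parse_config_override("inject_coadd:process_all_data_ids=True")
    ('inject_coadd', 'process_all_data_ids', 'True')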
55 """
56 try:
57 label, keyvalue = config_override.split(":", 1)
58 except ValueError:
59 raise TypeError(
60 f"Unrecognized syntax for option 'config': '{config_override}' (does not match pattern "
61 "(?P<label>.+):(?P<value>.+=.+))"
62 ) from None
63 try:
64 key, value = keyvalue.split("=", 1)
65 except ValueError as e:
66 raise TypeError(
67 f"Could not parse key-value pair '{config_override}' using separator '=', with multiple values "
68 f"not allowed: {e}"
69 ) from None
70 return label, key, value


def make_injection_pipeline(
    dataset_type_name: str,
    reference_pipeline: Pipeline | str,
    injection_pipeline: Pipeline | str | None = None,
    exclude_subsets: bool = False,
    excluded_tasks: set[str] | str = {
        "jointcal",
        "gbdesAstrometricFit",
        "fgcmBuildFromIsolatedStars",
        "fgcmFitCycle",
        "fgcmOutputProducts",
    },
    prefix: str = "injected_",
    instrument: str | None = None,
    config: str | list[str] | None = None,
    additional_pipelines: list[Pipeline] | list[str] | None = None,
    additional_subset: list[str] | str | None = None,
    log_level: int = logging.INFO,
) -> Pipeline:
92 """Make an expanded source injection pipeline.
94 This function takes a reference pipeline definition file in YAML format and
95 prefixes all post-injection dataset type names with the injected prefix. If
96 an optional injection pipeline definition YAML file is also provided, the
97 injection task will be merged into the pipeline.
99 Unless explicitly excluded, all subsets from the reference pipeline
100 containing the task which generates the injection dataset type will also be
101 updated to include the injection task. A series of new injected subsets
102 will also be created. These new subsets are copies of existent subsets, but
103 containing only the tasks which are affected by source injection. New
104 injected subsets will be the original subset name with the prefix
105 'injected_' prepended.
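    For example, tasks from an existing subset named 'step3' which are
    affected by source injection would be gathered into a new
    'injected_step3' subset.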

    When the injection pipeline is constructed, a check on all existing
    pipeline contracts is performed. If any contracts are violated, they are
    removed from the pipeline. A warning is logged for each contract that is
    removed.

    Parameters
    ----------
    dataset_type_name : `str`
        Name of the dataset type being injected into.
    reference_pipeline : `Pipeline` | `str`
        Location of a reference pipeline definition YAML file.
    injection_pipeline : `Pipeline` | `str`, optional
        Location of an injection pipeline definition YAML file stub. If not
        provided, an attempt to infer the injection pipeline will be made
        based on the injected dataset type name.
    exclude_subsets : `bool`, optional
        If `True`, do not update pipeline subsets to include the injection
        task.
    excluded_tasks : `set` [`str`] | `str`, optional
        Set or comma-separated string of task labels to exclude from the
        injection pipeline.
    prefix : `str`, optional
        Prefix to prepend to each affected post-injection dataset type name.
    instrument : `str`, optional
        Add instrument overrides. Must be a fully qualified class name.
    config : `str` | `list` [`str`], optional
        Config override(s) for a task, in the format 'label:key=value'.
    additional_pipelines : `list` [`Pipeline`] | `list` [`str`], optional
        Location(s) of additional input pipeline definition YAML file(s).
        Tasks from these additional pipelines will be added to the output
        injection pipeline.
    additional_subset : `list` [`str`] | `str`, optional
        One or more subset definitions in the form
        'subset_name[:subset_description]'. These subsets will be created if
        they do not already exist, and all tasks from ``additional_pipelines``
        will be added to them.
    log_level : `int`, optional
        The log level to use for logging.

    Returns
    -------
    pipeline : `lsst.pipe.base.Pipeline`
        An expanded source injection pipeline.
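
    Examples
    --------
    A minimal sketch of intended usage; the reference pipeline location below
    is illustrative only and assumes the drp_pipe package is set up:

    >>> from lsst.source.injection import make_injection_pipeline
    >>> pipeline = make_injection_pipeline(
    ...     dataset_type_name="deepCoadd",
    ...     reference_pipeline="$DRP_PIPE_DIR/pipelines/LSSTCam-imSim/DRP.yaml",
    ... )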
149 """
150 # Instantiate logger.
151 logger = logging.getLogger(__name__)
152 logger.setLevel(log_level)
154 retry_config_overrides = []
155 # Load the pipeline and apply config overrides, if supplied.
156 if isinstance(reference_pipeline, str):
157 pipeline = Pipeline.fromFile(reference_pipeline)
158 else:
159 pipeline = reference_pipeline
160 if config:
161 if isinstance(config, str):
162 config = [config]
163 for conf in config:
164 config_label, config_key, config_value = _parse_config_override(conf)
165 try:
166 pipeline.addConfigOverride(config_label, config_key, config_value)
167 except LookupError:
168 logger.debug(
169 "Config override '%s' for label '%s' not found in the reference "
170 "pipeline, either due to a typo or the label not existing in "
171 "the reference pipeline. Retrying after the injection task is added.",
172 conf,
173 config_label,
174 )
175 retry_config_overrides.append([config_label, config_key, config_value])
177 # Add an instrument override, if provided.
178 if instrument:
179 pipeline.addInstrument(instrument)
181 # Remove all tasks which are not to be included in the injection pipeline.
182 if isinstance(excluded_tasks, str):
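        # e.g. "jointcal,fgcmFitCycle" becomes {"jointcal", "fgcmFitCycle"}.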
        excluded_tasks = set(excluded_tasks.split(","))
    all_tasks = set(pipeline.task_labels)
    preserved_tasks = all_tasks - excluded_tasks
    label_specifier = LabelSpecifier(labels=preserved_tasks)
    # EDIT mode removes tasks from parent subsets but keeps the subset itself.
    pipeline = pipeline.subsetFromLabels(label_specifier, pipeline.PipelineSubsetCtrl.EDIT)
    if len(not_found_tasks := excluded_tasks - all_tasks) > 0:
        grammar = "Task" if len(not_found_tasks) == 1 else "Tasks"
        logger.warning(
            "%s marked for exclusion not found in the reference pipeline: %s.",
            grammar,
            ", ".join(sorted(not_found_tasks)),
        )

    # Check for any empty subsets and remove them.
    removed_subsets = set()
    for subset_label, subset_tasks in pipeline.subsets.items():
        if not subset_tasks:
            removed_subsets.add(subset_label)
            pipeline.removeLabeledSubset(subset_label)
    if (removed_subsets_count := len(removed_subsets)) > 0:
        grammar = "subset" if removed_subsets_count == 1 else "subsets"
        logger.warning(
            "Removed %d empty %s from the pipeline: %s.",
            removed_subsets_count,
            grammar,
            ", ".join(sorted(removed_subsets)),
        )

    # Determine the set of dataset type names affected by source injection.
    injected_tasks = set()
    all_connection_type_names = set()
    injected_types = {dataset_type_name}
    precursor_injection_task_labels = set()
    # Loop over all tasks in the pipeline.
    for task_node in pipeline.to_graph().tasks.values():
        # Add override for Analysis Tools task outputs (but not inputs).
        # Connections in Analysis Tools are dynamically assigned, and so are
        # not able to be modified in the same way as a static connection.
        # Instead, we add an override to the connections.outputName field.
        # This field is prepended to all Analysis Tools connections, and so
        # will prepend the injection prefix to all plot/metric outputs.
        if isAnalysisPipelineTask := issubclass(task_node.task_class, AnalysisPipelineTask):
            pipeline.addConfigOverride(
                task_node.label,
                "connections.outputName",
                prefix + task_node.config.connections.outputName,
            )

        input_types = {
            read_edge.parent_dataset_type_name
            for read_edge in itertools.chain(task_node.inputs.values(), task_node.init.inputs.values())
        }
        output_types = {
            write_edge.parent_dataset_type_name
            for write_edge in itertools.chain(task_node.outputs.values(), task_node.init.outputs.values())
        }

        all_connection_type_names |= input_types | output_types
        # Identify the precursor task: allows appending the injection task to
        # its subsets.
        if dataset_type_name in output_types:
            precursor_injection_task_labels.add(task_node.label)
        # If the task has any injected dataset type names as inputs, add the
        # task to a set of tasks touched by injection, and add all of the
        # outputs of this task to the set of injected types.
        if len(input_types & injected_types) > 0:
            injected_tasks |= {task_node.label}
            injected_types |= output_types
            # Add the injection prefix to all affected dataset type names.
            for edge in itertools.chain(
                task_node.init.inputs.values(),
                task_node.inputs.values(),
                task_node.init.outputs.values(),
                task_node.outputs.values(),
            ):
                # Skip if this is an analysis task and the edge is an output.
                if isAnalysisPipelineTask and (
                    edge in set(task_node.init.outputs.values()) | set(task_node.outputs.values())
                ):
                    continue
                if hasattr(task_node.config.connections.ConnectionsClass, edge.connection_name):
                    # If the connection type is not dynamic, modify as usual.
                    if edge.parent_dataset_type_name in injected_types:
                        pipeline.addConfigOverride(
                            task_node.label,
                            "connections." + edge.connection_name,
                            prefix + edge.dataset_type_name,
                        )
                else:
                    # Log a warning if the connection type is dynamic.
                    logger.warning(
                        "Dynamic connection %s in task %s is not supported here. This connection will "
                        "neither be modified nor merged into the output injection pipeline.",
                        edge.connection_name,
                        task_node.label,
                    )
    # Raise if the injected dataset type does not exist in the pipeline.
    if dataset_type_name not in all_connection_type_names:
        raise RuntimeError(
            f"Dataset type '{dataset_type_name}' not found in the reference pipeline; "
            "no connection type edits to be made."
        )

    # Attempt to infer the injection pipeline from the dataset type name.
    if not injection_pipeline:
        match dataset_type_name:
            case "postISRCCD" | "post_isr_image":
                injection_pipeline = "$SOURCE_INJECTION_DIR/pipelines/inject_exposure.yaml"
            case "icExp" | "calexp" | "initial_pvi" | "pvi" | "preliminary_visit_image" | "visit_image":
                injection_pipeline = "$SOURCE_INJECTION_DIR/pipelines/inject_visit.yaml"
            case (
                "deepCoadd"
                | "deepCoadd_calexp"
                | "goodSeeingCoadd"
                | "deep_coadd_predetection"
                | "deep_coadd"
                | "template_coadd"
            ):
                injection_pipeline = "$SOURCE_INJECTION_DIR/pipelines/inject_coadd.yaml"
            case _:
                # Log a warning rather than raising, as the user may wish to
                # edit connection names without merging an injection pipeline.
                logger.warning(
                    "Unable to infer injection pipeline stub from dataset type name '%s' and none was "
                    "provided. No injection pipeline will be merged into the output pipeline.",
                    dataset_type_name,
                )
        if injection_pipeline:
            logger.info(
                "Injected dataset type '%s' used to infer injection pipeline: %s",
                dataset_type_name,
                injection_pipeline,
            )

    # Merge the injection pipeline into the modified pipeline, if provided.
    if injection_pipeline:
        if isinstance(injection_pipeline, str):
            injection_pipeline = Pipeline.fromFile(injection_pipeline)
        if len(injection_pipeline) != 1:
            raise RuntimeError(
                f"The injection pipeline contains {len(injection_pipeline)} tasks; only 1 task is allowed."
            )
        pipeline.mergePipeline(injection_pipeline)
        # Loop over all injection tasks and modify the connection names.
        for injection_task_label in injection_pipeline.task_labels:
            injected_tasks.add(injection_task_label)
            pipeline.addConfigOverride(injection_task_label, "connections.input_exposure", dataset_type_name)
            pipeline.addConfigOverride(
                injection_task_label, "connections.output_exposure", prefix + dataset_type_name
            )
            pipeline.addConfigOverride(
                injection_task_label, "connections.output_catalog", prefix + dataset_type_name + "_catalog"
            )
            # Optionally update subsets to include the injection task.
            if not exclude_subsets:
                for label in precursor_injection_task_labels:
                    precursor_subsets = pipeline.findSubsetsWithLabel(label)
                    for subset in precursor_subsets:
                        pipeline.addLabelToSubset(subset, injection_task_label)
        if retry_config_overrides:
            # Retry config overrides that were not found in the pipeline
            # before the injection task was added.
            for config_label, config_key, config_value in retry_config_overrides:
                pipeline.addConfigOverride(config_label, config_key, config_value)

    # Create injected subsets.
    injected_label_specifier = LabelSpecifier(labels=injected_tasks)
    injected_pipeline = pipeline.subsetFromLabels(injected_label_specifier, pipeline.PipelineSubsetCtrl.EDIT)
    injected_subset_labels = set()
    for injected_subset in injected_pipeline.subsets.keys():
        injected_subset_label = "injected_" + injected_subset
        injected_subset_description = (
            "All tasks from the '" + injected_subset + "' subset impacted by source injection."
        )
        if len(injected_subset_tasks := injected_pipeline.subsets[injected_subset]) > 0:
            injected_subset_labels |= {injected_subset_label}
            pipeline.addLabeledSubset(
                injected_subset_label, injected_subset_description, injected_subset_tasks
            )

    grammar1 = "task" if len(pipeline) == 1 else "tasks"
    grammar2 = "subset" if len(injected_subset_labels) == 1 else "subsets"
    logger.info(
        "Made an injection pipeline containing %d %s and %d new injected %s.",
        len(pipeline),
        grammar1,
        len(injected_subset_labels),
        grammar2,
    )

    # Optionally include additional tasks in the injection pipeline.
    if additional_pipelines:
        additional_tasks: set[str] = set()
        # Record all input task labels and merge all input pipelines into the
        # injection pipeline.
        for additional_pipeline in additional_pipelines:
            if isinstance(additional_pipeline, str):
                additional_pipeline = Pipeline.fromFile(additional_pipeline)
            additional_tasks.update(additional_pipeline.task_labels)
            pipeline.mergePipeline(additional_pipeline)

        # Add all tasks to each named subset; if a subset does not exist,
        # create it. Guard against no subsets having been requested.
        if not isinstance(additional_subset, list) and additional_subset is not None:
            additional_subset = [additional_subset]
        for subset in additional_subset or []:
            if ":" in subset:
                subset_name, subset_description = subset.split(":", 1)
            else:
                subset_name = subset
                subset_description = ""

            if subset_name in pipeline.subsets.keys():
                for additional_task in additional_tasks:
                    pipeline.addLabelToSubset(subset_name, additional_task)
                subset_grammar = f"the existing subset {subset_name}"
            else:
                pipeline.addLabeledSubset(subset_name, subset_description, additional_tasks)
                subset_grammar = f"a new subset {subset_name}"

            # Logging info.
            task_grammar = "task" if len(additional_tasks) == 1 else "tasks"
            logger.info(
                "Added %d %s to %s.",
                len(additional_tasks),
                task_grammar,
                subset_grammar,
            )

    # Validate contracts and remove any that are violated.
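    # Note: contracts are toggled via the private ``_pipelineIR`` attribute
    # below, as Pipeline does not appear to expose a public API for removing
    # individual contracts.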
    try:
        _ = pipeline.to_graph()
    except ContractError:
        contracts_initial = pipeline._pipelineIR.contracts
        pipeline._pipelineIR.contracts = []
        contracts_passed = []
        contracts_failed = []
        for contract in contracts_initial:
            pipeline._pipelineIR.contracts = [contract]
            try:
                _ = pipeline.to_graph()
            except ContractError:
                contracts_failed.append(contract)
                continue
            contracts_passed.append(contract)
        pipeline._pipelineIR.contracts = contracts_passed
        if contracts_failed:
            logger.warning(
                "The following contracts were violated and have been removed: \n%s",
                "\n".join([str(contract) for contract in contracts_failed]),
            )

    return pipeline