Coverage for python / lsst / ap / verify / pipeline_driver.py: 17%
82 statements
« prev ^ index » next coverage.py v7.13.5, created at 2026-04-21 10:59 +0000
1#
2# This file is part of ap_verify.
3#
4# Developed for the LSST Data Management System.
5# This product includes software developed by the LSST Project
6# (http://www.lsst.org).
7# See the COPYRIGHT file at the top-level directory of this distribution
8# for details of code ownership.
9#
10# This program is free software: you can redistribute it and/or modify
11# it under the terms of the GNU General Public License as published by
12# the Free Software Foundation, either version 3 of the License, or
13# (at your option) any later version.
14#
15# This program is distributed in the hope that it will be useful,
16# but WITHOUT ANY WARRANTY; without even the implied warranty of
17# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
18# GNU General Public License for more details.
19#
20# You should have received a copy of the GNU General Public License
21# along with this program. If not, see <http://www.gnu.org/licenses/>.
22#
24"""Interface between `ap_verify` and `ap_pipe`.
26This module handles calling `ap_pipe` and converting any information
27as needed.
28"""
30__all__ = ["ApPipeParser", "runApPipeGen3"]
32import argparse
33import os
34import re
35import subprocess
36import logging
38import lsst.dax.apdb as daxApdb
39import lsst.pipe.base.exec_fixup_data_id # not lifted to package scope intentionally
41_LOG = logging.getLogger(__name__)
class ApPipeParser(argparse.ArgumentParser):
    """Argument parser for the options consumed by ``ap_pipe`` activities.

    This parser is deliberately incomplete; it is meant to be supplied to a
    main parser through the `parent` parameter rather than used on its own.
    """

    def __init__(self):
        # The main program's parser provides help and documentation.
        super().__init__(add_help=False)
        # namespace.dataIds will always be a list of 0 or more nonempty strings, regardless of inputs.
        # TODO: in Python 3.8+, action='extend' handles nargs='?' more naturally than 'append'.
        self.add_argument(
            "-d", "--data-query", dest="dataIds", action="append", default=[],
            help="An identifier for the data to process.")
        self.add_argument(
            "-p", "--pipeline", default=None,
            help="A custom version of the ap_verify pipeline (e.g., with different metrics). "
                 "Defaults to the ApVerify.yaml within --dataset.")
        self.add_argument(
            "--db", "--db_url", default=None,
            help="A location for the AP database, formatted as if for apdb-cli create-sql. "
                 "Defaults to an SQLite file in the --output directory.")
        self.add_argument(
            "--skip-pipeline", action="store_true",
            help="Do not run the AP pipeline itself. This argument is useful "
                 "for testing metrics on a fixed data set.")
        self.add_argument(
            "--clean-run", action="store_true",
            help="Run the pipeline with a new run collection, "
                 "even if one already exists.")
def runApPipeGen3(workspace, parsedCmdLine, processes=1):
    """Run `ap_pipe` on this object's dataset.

    Parameters
    ----------
    workspace : `lsst.ap.verify.workspace.WorkspaceGen3`
        The abstract location containing input and output repositories.
    parsedCmdLine : `argparse.Namespace`
        Command-line arguments, including all arguments supported by `ApPipeParser`.
    processes : `int`
        The number of processes with which to call the AP pipeline.

    Returns
    -------
    code : `int`
        An error code that is zero if the pipeline ran (or was deliberately
        skipped) without problems, or nonzero if there were errors. The exact
        meaning of nonzero values is an implementation detail.
    """
    log = _LOG.getChild('runApPipeGen3')

    # An ap_verify dataset must contain exactly one instrument; more than one
    # indicates a malformed input repository.
    instruments = {dataId["instrument"]
                   for dataId in workspace.workButler.registry.queryDataIds("instrument")}
    if len(instruments) > 1:
        raise RuntimeError("Only one instrument is allowed in an ap_verify dataset.")
    instrument = instruments.pop()
    _makeApdb(workspace, _getApdbArguments(workspace, parsedCmdLine), instrument)

    pipelineFile = _getPipelineFile(workspace, parsedCmdLine)
    pipelineArgs = ["pipetask", "--long-log", "run",
                    # fail-fast to ensure processing errors are obvious, and
                    # to compensate for the extra interconnections added by
                    # --graph-fixup (further down).
                    "--fail-fast",
                    "--butler-config", workspace.repo,
                    "--pipeline", pipelineFile,
                    ]

    # TODO: workaround for inability to generate crosstalk sources in main
    # processing pipeline (DM-31492).
    if instrument == "DECam":
        crosstalkPipeline = "${AP_PIPE_DIR}/pipelines/DECam/RunIsrForCrosstalkSources.yaml"
        crosstalkArgs = ["pipetask", "run",
                         "--butler-config", workspace.repo,
                         "--pipeline", crosstalkPipeline,
                         ]
        crosstalkArgs.extend(_getCollectionArguments(workspace, reuse=(not parsedCmdLine.clean_run)))
        for singleId in parsedCmdLine.dataIds:
            crosstalkArgs.extend(["--data-query", singleId])
        crosstalkArgs.extend(["--processes", str(processes)])
        crosstalkArgs.extend(["--register-dataset-types"])
        # check=False: a crosstalk failure will surface as missing inputs in
        # the main run, which reports its own error code.
        subprocess.run(crosstalkArgs, capture_output=False, shell=False, check=False)

        # Force same output run for crosstalk and main processing.
        pipelineArgs.extend(_getCollectionArguments(workspace, reuse=True))
    else:
        # TODO: collections should be determined exclusively by Workspace.workButler,
        # but I can't find a way to hook that up to the graph builder. So use the CLI
        # for now and revisit once DM-26239 is done.
        pipelineArgs.extend(_getCollectionArguments(workspace, reuse=(not parsedCmdLine.clean_run)))

    pipelineArgs.extend(_getConfigArgumentsGen3(workspace, parsedCmdLine))
    for singleId in parsedCmdLine.dataIds:
        pipelineArgs.extend(["--data-query", singleId])
    pipelineArgs.extend(["--processes", str(processes)])
    pipelineArgs.extend(["--register-dataset-types"])
    pipelineArgs.extend(["--graph-fixup", "lsst.ap.verify.pipeline_driver._getExecOrder"])

    if parsedCmdLine.skip_pipeline:
        log.info('Skipping AP pipeline entirely.')
        # Bug fix: this branch previously returned None implicitly, but the
        # documented contract is an integer error code; skipping is a success.
        return 0

    # subprocess is an unsafe workaround for DM-26239
    # TODO: generalize this code in DM-26028
    # TODO: work off of workspace.workButler after DM-26239
    log.info("Running pipeline:")
    log.info(' '.join(pipelineArgs))
    results = subprocess.run(pipelineArgs, capture_output=False, shell=False, check=False)
    log.info('Pipeline complete.')
    return results.returncode
def _getExecOrder():
    """Return any constraints on the Gen 3 execution order.

    The current constraints are that executions of DiaPipelineTask must be
    ordered by visit ID, but this is subject to change.

    Returns
    -------
    order : `lsst.pipe.base.exec_fixup_data_id.ExecutionGraphFixup`
        An object encoding the desired execution order as an algorithm for
        modifying inter-quantum dependencies.

    Notes
    -----
    This function must be importable, but need not be public.
    """
    # The source association algorithm is not time-symmetric, so quanta of
    # DiaPipelineTask must execute in ascending visit order.
    fixup = lsst.pipe.base.exec_fixup_data_id.ExecFixupDataId(
        taskLabel="associateApdb",
        dimensions=["visit"],
        reverse=False,
    )
    return fixup
175def _getPipelineFile(workspace, parsed):
176 """Return the pipeline to be run.
178 Parameters
179 ----------
180 workspace : `lsst.ap.verify.workspace.Workspace`
181 A Workspace whose pipeline directory may contain an ApVerify pipeline.
182 parsed : `argparse.Namespace`
183 Command-line arguments, including all arguments supported by `ApPipeParser`.
185 Returns
186 -------
187 pipeline : `str`
188 The location of the pipeline file to use for running ap_verify.
189 """
190 if parsed.pipeline:
191 return parsed.pipeline
192 else:
193 customPipeline = os.path.join(workspace.pipelineDir, "ApVerify.yaml")
194 if os.path.exists(customPipeline):
195 return customPipeline
196 else:
197 return os.path.join("${AP_VERIFY_DIR}", "pipelines", "ApVerify.yaml")
200def _getApdbArguments(workspace, parsed):
201 """Return the arguments for running apdb-cli create-sql on this workspace,
202 as key-value pairs.
204 Parameters
205 ----------
206 workspace : `lsst.ap.verify.workspace.Workspace`
207 A Workspace whose config directory may contain an
208 `~lsst.ap.pipe.ApPipeTask` config.
209 parsed : `argparse.Namespace`
210 Command-line arguments, including all arguments supported by `ApPipeParser`.
212 Returns
213 -------
214 args : mapping [`str`]
215 Arguments to `lsst.dax.apdb.sql.Apdb.init_database`.
216 """
217 if not parsed.db:
218 parsed.db = "sqlite:///" + workspace.dbLocation
220 args = {"db_url": parsed.db,
221 }
223 return args
226def _getConfigArgumentsGen3(workspace, parsed):
227 """Return the config options for running the Gen 3 AP Pipeline on this
228 workspace, as command-line arguments.
230 Parameters
231 ----------
232 workspace : `lsst.ap.verify.workspace.WorkspaceGen3`
233 A Workspace whose config directory may contain various configs.
234 parsed : `argparse.Namespace`
235 Command-line arguments, including all arguments supported by `ApPipeParser`.
237 Returns
238 -------
239 args : `list` of `str`
240 Command-line arguments calling ``--config`` or ``--config-file``,
241 following the conventions of `sys.argv`.
242 """
243 return [
244 # APDB config should have been stored in the workspace.
245 "--config", "parameters:apdb_config=" + workspace.dbConfigLocation,
246 # Put output alerts into the workspace.
247 "--config", "associateApdb:alertPackager.alertWriteLocation=" + workspace.alertLocation,
248 ]
251def _getCollectionArguments(workspace, reuse):
252 """Return the collections for running the Gen 3 AP Pipeline on this
253 workspace, as command-line arguments.
255 Parameters
256 ----------
257 workspace : `lsst.ap.verify.workspace.WorkspaceGen3`
258 A Workspace with a Gen 3 repository.
259 reuse : `bool`
260 If true, use the previous run collection if one exists. Otherwise,
261 create a new run.
263 Returns
264 -------
265 args : `list` of `str`
266 Command-line arguments calling ``--input`` or ``--output``,
267 following the conventions of `sys.argv`.
268 """
269 # workspace.outputName is a chained collection containing all inputs
270 args = ["--output", workspace.outputName,
271 "--clobber-outputs",
272 ]
274 registry = workspace.workButler.registry
275 # Should refresh registry to see crosstalk run from DM-31492, but this
276 # currently leads to a bug involving --skip-existing. The only downside of
277 # the cached registry is that, with two runs for DECam datasets, a rerun of
278 # ap_verify will re-run crosstalk sources in the second run. Using
279 # skip-existing-in would work around that, but would lead to a worse bug in
280 # the case that the user is alternating runs with and without --clean-run.
281 # registry.refresh()
282 collectionPattern = re.compile(workspace.outputName + r"/\d+T\d+Z")
283 oldRuns = list(registry.queryCollections(workspace.outputName + "/*"))
284 oldRuns = [run for run in oldRuns if collectionPattern.fullmatch(run)]
286 if reuse and oldRuns:
287 args.extend(["--extend-run", "--skip-existing"])
288 return args
def _makeApdb(workspace, args, instrument):
    """Create an APDB and store its config for future use.

    Parameters
    ----------
    workspace : `lsst.ap.verify.workspace.Workspace`
        A Workspace in which to store the database config.
    args : mapping [`str`]
        Arguments to `lsst.dax.apdb.sql.Apdb.init_database`.
    instrument : `str`
        Short name of the instrument this APDB will store.
    """
    apdbConfig = daxApdb.ApdbSql.init_database(**args)
    # Persist the config so later stages can connect to the same database.
    apdbConfig.save(workspace.dbConfigLocation)

    # Record the instrument in the database's own metadata.
    database = daxApdb.ApdbSql(apdbConfig)
    database.metadata.set("instrument", instrument)