Coverage for python / lsst / ap / verify / pipeline_driver.py: 17%

82 statements  

« prev     ^ index     » next       coverage.py v7.13.5, created at 2026-04-26 09:40 +0000

1# 

2# This file is part of ap_verify. 

3# 

4# Developed for the LSST Data Management System. 

5# This product includes software developed by the LSST Project 

6# (http://www.lsst.org). 

7# See the COPYRIGHT file at the top-level directory of this distribution 

8# for details of code ownership. 

9# 

10# This program is free software: you can redistribute it and/or modify 

11# it under the terms of the GNU General Public License as published by 

12# the Free Software Foundation, either version 3 of the License, or 

13# (at your option) any later version. 

14# 

15# This program is distributed in the hope that it will be useful, 

16# but WITHOUT ANY WARRANTY; without even the implied warranty of 

17# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the 

18# GNU General Public License for more details. 

19# 

20# You should have received a copy of the GNU General Public License 

21# along with this program. If not, see <http://www.gnu.org/licenses/>. 

22# 

23 

24"""Interface between `ap_verify` and `ap_pipe`. 

25 

26This module handles calling `ap_pipe` and converting any information 

27as needed. 

28""" 

29 

30__all__ = ["ApPipeParser", "runApPipeGen3"] 

31 

32import argparse 

33import os 

34import re 

35import subprocess 

36import logging 

37 

38import lsst.dax.apdb as daxApdb 

39import lsst.pipe.base.exec_fixup_data_id # not lifted to package scope intentionally 

40 

41_LOG = logging.getLogger(__name__) 

42 

43 

class ApPipeParser(argparse.ArgumentParser):
    """An argument parser for data needed by ``ap_pipe`` activities.

    This parser is not complete, and is designed to be passed to another parser
    using the `parent` parameter.
    """

    def __init__(self):
        # The main program's parser is responsible for help and documentation.
        super().__init__(add_help=False)
        # namespace.dataIds is guaranteed to be a (possibly empty) list of
        # nonempty strings, whatever the user passes on the command line.
        # TODO: in Python 3.8+, action='extend' handles nargs='?' more naturally than 'append'.
        self.add_argument(
            '-d', '--data-query',
            dest='dataIds', action='append', default=[],
            help='An identifier for the data to process.')
        self.add_argument(
            "-p", "--pipeline",
            default=None,
            help="A custom version of the ap_verify pipeline (e.g., with different metrics). "
                 "Defaults to the ApVerify.yaml within --dataset.")
        self.add_argument(
            "--db", "--db_url",
            default=None,
            help="A location for the AP database, formatted as if for apdb-cli create-sql. "
                 "Defaults to an SQLite file in the --output directory.")
        self.add_argument(
            "--skip-pipeline",
            action="store_true",
            help="Do not run the AP pipeline itself. This argument is useful "
                 "for testing metrics on a fixed data set.")
        self.add_argument(
            "--clean-run",
            action="store_true",
            help="Run the pipeline with a new run collection, "
                 "even if one already exists.")

70 

71 

def runApPipeGen3(workspace, parsedCmdLine, processes=1):
    """Run `ap_pipe` on this object's dataset.

    Parameters
    ----------
    workspace : `lsst.ap.verify.workspace.WorkspaceGen3`
        The abstract location containing input and output repositories.
    parsedCmdLine : `argparse.Namespace`
        Command-line arguments, including all arguments supported by `ApPipeParser`.
    processes : `int`
        The number of processes with which to call the AP pipeline

    Returns
    -------
    code : `int`
        An error code that is zero if the pipeline ran without problems, or
        nonzero if there were errors. The exact meaning of nonzero values
        is an implementation detail.

    Raises
    ------
    RuntimeError
        Raised if the dataset registers zero or more than one instrument.
    """
    log = _LOG.getChild('runApPipeGen3')

    # ``dataId`` rather than ``id`` so the builtin is not shadowed.
    instruments = {dataId["instrument"]
                   for dataId in workspace.workButler.registry.queryDataIds("instrument")}
    # Check emptiness explicitly; otherwise pop() raises an opaque KeyError.
    if not instruments:
        raise RuntimeError("No instrument is registered in the ap_verify dataset.")
    if len(instruments) > 1:
        raise RuntimeError("Only one instrument is allowed in an ap_verify dataset.")
    instrument = instruments.pop()
    _makeApdb(workspace, _getApdbArguments(workspace, parsedCmdLine), instrument)

    pipelineFile = _getPipelineFile(workspace, parsedCmdLine)
    pipelineArgs = ["pipetask", "--long-log", "run",
                    # fail-fast to ensure processing errors are obvious, and
                    # to compensate for the extra interconnections added by
                    # --graph-fixup (further down).
                    "--fail-fast",
                    "--butler-config", workspace.repo,
                    "--pipeline", pipelineFile,
                    ]
    # TODO: workaround for inability to generate crosstalk sources in main
    # processing pipeline (DM-31492).
    if instrument == "DECam":
        crosstalkPipeline = "${AP_PIPE_DIR}/pipelines/DECam/RunIsrForCrosstalkSources.yaml"
        crosstalkArgs = ["pipetask", "run",
                         "--butler-config", workspace.repo,
                         "--pipeline", crosstalkPipeline,
                         ]
        crosstalkArgs.extend(_getCollectionArguments(workspace, reuse=(not parsedCmdLine.clean_run)))
        for singleId in parsedCmdLine.dataIds:
            crosstalkArgs.extend(["--data-query", singleId])
        crosstalkArgs.extend(["--processes", str(processes)])
        crosstalkArgs.extend(["--register-dataset-types"])
        # Best-effort prerun: failures here surface in the main pipeline run,
        # so the return code is deliberately not checked.
        subprocess.run(crosstalkArgs, capture_output=False, shell=False, check=False)

        # Force same output run for crosstalk and main processing.
        pipelineArgs.extend(_getCollectionArguments(workspace, reuse=True))
    else:
        # TODO: collections should be determined exclusively by Workspace.workButler,
        # but I can't find a way to hook that up to the graph builder. So use the CLI
        # for now and revisit once DM-26239 is done.
        pipelineArgs.extend(_getCollectionArguments(workspace, reuse=(not parsedCmdLine.clean_run)))

    pipelineArgs.extend(_getConfigArgumentsGen3(workspace, parsedCmdLine))
    # Iterating an empty dataIds list is a no-op, so no guard is needed.
    for singleId in parsedCmdLine.dataIds:
        pipelineArgs.extend(["--data-query", singleId])
    pipelineArgs.extend(["--processes", str(processes)])
    pipelineArgs.extend(["--register-dataset-types"])
    pipelineArgs.extend(["--graph-fixup", "lsst.ap.verify.pipeline_driver._getExecOrder"])

    if parsedCmdLine.skip_pipeline:
        # Return 0 explicitly so the documented int contract holds even when
        # the pipeline is skipped (previously returned None).
        log.info('Skipping AP pipeline entirely.')
        return 0

    # subprocess is an unsafe workaround for DM-26239
    # TODO: generalize this code in DM-26028
    # TODO: work off of workspace.workButler after DM-26239
    log.info("Running pipeline:")
    log.info(' '.join(pipelineArgs))
    results = subprocess.run(pipelineArgs, capture_output=False, shell=False, check=False)
    log.info('Pipeline complete.')
    return results.returncode

151 

152 

def _getExecOrder():
    """Return any constraints on the Gen 3 execution order.

    The current constraints are that executions of DiaPipelineTask must be
    ordered by visit ID, but this is subject to change.

    Returns
    -------
    order : `lsst.pipe.base.exec_fixup_data_id.ExecutionGraphFixup`
        An object encoding the desired execution order as an algorithm for
        modifying inter-quantum dependencies.

    Notes
    -----
    This function must be importable, but need not be public.
    """
    # Source association is not time-symmetric, so force association
    # (through DiaPipelineTask) to run in ascending visit order.
    fixup = lsst.pipe.base.exec_fixup_data_id.ExecFixupDataId(
        taskLabel="associateApdb",
        dimensions=["visit"],
        reverse=False,
    )
    return fixup

173 

174 

175def _getPipelineFile(workspace, parsed): 

176 """Return the pipeline to be run. 

177 

178 Parameters 

179 ---------- 

180 workspace : `lsst.ap.verify.workspace.Workspace` 

181 A Workspace whose pipeline directory may contain an ApVerify pipeline. 

182 parsed : `argparse.Namespace` 

183 Command-line arguments, including all arguments supported by `ApPipeParser`. 

184 

185 Returns 

186 ------- 

187 pipeline : `str` 

188 The location of the pipeline file to use for running ap_verify. 

189 """ 

190 if parsed.pipeline: 

191 return parsed.pipeline 

192 else: 

193 customPipeline = os.path.join(workspace.pipelineDir, "ApVerify.yaml") 

194 if os.path.exists(customPipeline): 

195 return customPipeline 

196 else: 

197 return os.path.join("${AP_VERIFY_DIR}", "pipelines", "ApVerify.yaml") 

198 

199 

200def _getApdbArguments(workspace, parsed): 

201 """Return the arguments for running apdb-cli create-sql on this workspace, 

202 as key-value pairs. 

203 

204 Parameters 

205 ---------- 

206 workspace : `lsst.ap.verify.workspace.Workspace` 

207 A Workspace whose config directory may contain an 

208 `~lsst.ap.pipe.ApPipeTask` config. 

209 parsed : `argparse.Namespace` 

210 Command-line arguments, including all arguments supported by `ApPipeParser`. 

211 

212 Returns 

213 ------- 

214 args : mapping [`str`] 

215 Arguments to `lsst.dax.apdb.sql.Apdb.init_database`. 

216 """ 

217 if not parsed.db: 

218 parsed.db = "sqlite:///" + workspace.dbLocation 

219 

220 args = {"db_url": parsed.db, 

221 } 

222 

223 return args 

224 

225 

226def _getConfigArgumentsGen3(workspace, parsed): 

227 """Return the config options for running the Gen 3 AP Pipeline on this 

228 workspace, as command-line arguments. 

229 

230 Parameters 

231 ---------- 

232 workspace : `lsst.ap.verify.workspace.WorkspaceGen3` 

233 A Workspace whose config directory may contain various configs. 

234 parsed : `argparse.Namespace` 

235 Command-line arguments, including all arguments supported by `ApPipeParser`. 

236 

237 Returns 

238 ------- 

239 args : `list` of `str` 

240 Command-line arguments calling ``--config`` or ``--config-file``, 

241 following the conventions of `sys.argv`. 

242 """ 

243 return [ 

244 # APDB config should have been stored in the workspace. 

245 "--config", "parameters:apdb_config=" + workspace.dbConfigLocation, 

246 # Put output alerts into the workspace. 

247 "--config", "associateApdb:alertPackager.alertWriteLocation=" + workspace.alertLocation, 

248 ] 

249 

250 

251def _getCollectionArguments(workspace, reuse): 

252 """Return the collections for running the Gen 3 AP Pipeline on this 

253 workspace, as command-line arguments. 

254 

255 Parameters 

256 ---------- 

257 workspace : `lsst.ap.verify.workspace.WorkspaceGen3` 

258 A Workspace with a Gen 3 repository. 

259 reuse : `bool` 

260 If true, use the previous run collection if one exists. Otherwise, 

261 create a new run. 

262 

263 Returns 

264 ------- 

265 args : `list` of `str` 

266 Command-line arguments calling ``--input`` or ``--output``, 

267 following the conventions of `sys.argv`. 

268 """ 

269 # workspace.outputName is a chained collection containing all inputs 

270 args = ["--output", workspace.outputName, 

271 "--clobber-outputs", 

272 ] 

273 

274 registry = workspace.workButler.registry 

275 # Should refresh registry to see crosstalk run from DM-31492, but this 

276 # currently leads to a bug involving --skip-existing. The only downside of 

277 # the cached registry is that, with two runs for DECam datasets, a rerun of 

278 # ap_verify will re-run crosstalk sources in the second run. Using 

279 # skip-existing-in would work around that, but would lead to a worse bug in 

280 # the case that the user is alternating runs with and without --clean-run. 

281 # registry.refresh() 

282 collectionPattern = re.compile(workspace.outputName + r"/\d+T\d+Z") 

283 oldRuns = list(registry.queryCollections(workspace.outputName + "/*")) 

284 oldRuns = [run for run in oldRuns if collectionPattern.fullmatch(run)] 

285 

286 if reuse and oldRuns: 

287 args.extend(["--extend-run", "--skip-existing"]) 

288 return args 

289 

290 

def _makeApdb(workspace, args, instrument):
    """Create an APDB and store its config for future use.

    Parameters
    ----------
    workspace : `lsst.ap.verify.workspace.Workspace`
        A Workspace in which to store the database config.
    args : mapping [`str`]
        Arguments to `lsst.dax.apdb.sql.Apdb.init_database`.
    instrument : `str`
        Short name of the instrument this APDB will store.
    """
    # Persist the config so downstream pipeline stages can locate the APDB.
    dbConfig = daxApdb.ApdbSql.init_database(**args)
    dbConfig.save(workspace.dbConfigLocation)

    # Record the instrument in the database's own metadata.
    database = daxApdb.ApdbSql(dbConfig)
    database.metadata.set("instrument", instrument)