Coverage for python/lsst/ap/verify/ingestion.py: 26%

109 statements  


#
# This file is part of ap_verify.
#
# Developed for the LSST Data Management System.
# This product includes software developed by the LSST Project
# (http://www.lsst.org).
# See the COPYRIGHT file at the top-level directory of this distribution
# for details of code ownership.
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
#

"""Data ingestion for ap_verify.

This module handles ingestion of an ap_verify dataset into an appropriate
repository, so that pipeline code need not be aware of the dataset framework.
"""

__all__ = ["Gen3DatasetIngestConfig", "ingestDatasetGen3"]

import argparse
import fnmatch
import os
import shutil
from glob import glob
import logging

from lsst.utils.argparsing import AppendDict
import lsst.pex.config as pexConfig
import lsst.pipe.base as pipeBase

import lsst.daf.butler
import lsst.obs.base

_LOG = logging.getLogger(__name__)


class IngestionParser(argparse.ArgumentParser):
    """An argument parser for data needed by ingestion.

    This parser is not complete, and is designed to be passed to another
    parser using the ``parents`` parameter.
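
    Examples
    --------
    A minimal sketch of composing this parser into a program's own parser;
    the program name is hypothetical.

    >>> parser = argparse.ArgumentParser(prog="ap_verify",
    ...                                  parents=[IngestionParser()])
    >>> args = parser.parse_args(["--namespace", "lsst.example"])
    >>> args.namespace
    'lsst.example'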

    """

    def __init__(self):
        # Help and documentation will be handled by the main program's parser
        argparse.ArgumentParser.__init__(self, add_help=False)

        self.add_argument('--namespace', dest='namespace', default=None,
                          help='The Sasquatch namespace to use for the ap_verify metrics upload.')

        self.add_argument('--restProxyUrl', dest='restProxyUrl', default=None,
                          help='The Sasquatch URL to use for the ap_verify metrics upload.')

        self.add_argument("--extra", action=AppendDict,
                          help="Extra field (in the form key=value) to be added to any records "
                               "uploaded to Sasquatch. See SasquatchDispatcher.dispatch and "
                               ".dispatchRef for more details. The --extra argument can be passed "
                               "multiple times.")


class Gen3DatasetIngestConfig(pexConfig.Config):
    """Settings and defaults for `Gen3DatasetIngestTask`.

    The correct target for `ingester` can be found in the documentation of
    the appropriate ``obs`` package.
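
    Examples
    --------
    A sketch of how an ap_verify dataset might override these fields in a
    config override file (the file patterns are illustrative)::

        config.dataFiles = ["raw_*.fits"]
        config.dataBadFiles = ["raw_bad_*.fits"]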

    """

    ingester = pexConfig.ConfigurableField(
        target=lsst.obs.base.RawIngestTask,
        doc="Task used to perform raw data ingestion.",
    )
    visitDefiner = pexConfig.ConfigurableField(
        target=lsst.obs.base.DefineVisitsTask,
        doc="Task used to organize raw exposures into visits.",
    )
    # Normally file patterns should be user input, but put them in a config
    # so the ap_verify dataset can configure them.
    dataFiles = pexConfig.ListField(
        dtype=str,
        default=["*.fits", "*.fz", "*.fits.gz"],
        doc="Names of raw science files (no path; wildcards allowed) to ingest from the ap_verify dataset.",
    )
    dataBadFiles = pexConfig.ListField(
        dtype=str,
        default=[],
        doc="Names of raw science files (no path; wildcards allowed) to not ingest; "
            "supersedes ``dataFiles``.",
    )

    def setDefaults(self):
        super().setDefaults()
        self.ingester.transfer = "copy"


class Gen3DatasetIngestTask(pipeBase.Task):
    """Task for automating ingestion of an ap_verify dataset.

    Each dataset configures this task as appropriate for the files it provides
    and the target instrument. Therefore, this task takes no input besides the
    ap_verify dataset to load and the repositories to ingest to.

    Parameters
    ----------
    dataset : `lsst.ap.verify.dataset.Dataset`
        The ``ap_verify`` dataset to be ingested.
    workspace : `lsst.ap.verify.workspace.WorkspaceGen3`
        The abstract location for all ``ap_verify`` outputs, including
        a Gen 3 repository.
    namespace : `str`, optional
        The Sasquatch namespace to which to upload analysis_tools metrics. If
        omitted, no metrics are uploaded.
    url : `str`, optional
        The Sasquatch server to which to upload analysis_tools metrics. Must be
        provided if ``namespace`` is.
    extra : `dict`, optional
        Extra parameters for the SasquatchDatastore, needed to post
        ap_verify metrics. Should be provided if ``namespace`` is.
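
    Examples
    --------
    A sketch of direct construction and use, assuming ``dataset`` and
    ``workspace`` already exist; the namespace, URL, and extra values
    are illustrative.

    >>> task = Gen3DatasetIngestTask(
    ...     dataset, workspace,
    ...     namespace="lsst.example",
    ...     url="https://example.com/sasquatch-rest-proxy/",
    ...     extra={"dataset_tag": "test"},
    ... )  # doctest: +SKIP
    >>> task.run(processes=2)  # doctest: +SKIP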

    """

    ConfigClass = Gen3DatasetIngestConfig
    # Suffix is de facto convention for distinguishing Gen 2 and Gen 3 config overrides
    _DefaultName = "datasetIngest-gen3"

    def __init__(self, dataset, workspace, namespace=None, url=None, extra=None, *args, **kwargs):
        super().__init__(*args, **kwargs)
        self.workspace = workspace
        self.dataset = dataset
        self.namespace = namespace
        self.extra = extra if extra is not None else {}
        self.url = url
        # workspace.workButler is undefined until the repository is created
        self.dataset.makeCompatibleRepoGen3(self.workspace.repo, self.namespace, self.url, self.extra)
        if self.url is not None:
            self.transferMode = "copy"
        self.makeSubtask("ingester", butler=self.workspace.workButler)
        self.makeSubtask("visitDefiner", butler=self.workspace.workButler)

    def _reduce_kwargs(self):
        # Add extra parameters to pickle
        return dict(**super()._reduce_kwargs(), dataset=self.dataset,
                    workspace=self.workspace, namespace=self.namespace, url=self.url)

    def run(self, processes=1):
        """Ingest the contents of a dataset into a Butler repository.

        Parameters
        ----------
        processes : `int`
            The number of processes to use to ingest.
        """
        self._ensureRaws(processes=processes)
        self._defineVisits(processes=processes)
        self._copyConfigs()

    def _ensureRaws(self, processes):
        """Ensure that the repository in ``workspace`` has raws ingested.

        After this method returns, this task's repository contains all science
        data from this task's ap_verify dataset. Butler operations on the
        repository are not able to modify ``dataset`` in any way.

        Parameters
        ----------
        processes : `int`
            The number of processes to use to ingest, if ingestion must be run.

        Raises
        ------
        RuntimeError
            Raised if there are no files to ingest.
        """
        # Probe the collection where raws would have been ingested; if it
        # does not exist, nothing has been ingested yet.
        try:
            collectionName = self.dataset.instrument.makeDefaultRawIngestRunName()
            rawCollections = list(self.workspace.workButler.registry.queryCollections(collectionName))
        except lsst.daf.butler.MissingCollectionError:
            rawCollections = []

        rawData = list(self.workspace.workButler.registry.queryDatasets(
            'raw',
            collections=rawCollections,
            dataId={"instrument": self.dataset.instrument.getName()})) \
            if rawCollections else []

        if rawData:
            self.log.info("Raw images for %s were previously ingested, skipping...",
                          self.dataset.instrument.getName())
        else:
            self.log.info("Ingesting raw images...")
            dataFiles = _findMatchingFiles(self.dataset.rawLocation, self.config.dataFiles,
                                           exclude=self.config.dataBadFiles)
            if dataFiles:
                self._ingestRaws(dataFiles, processes=processes)
                self.log.info("Images are now ingested in %s", self.workspace.repo)
            else:
                raise RuntimeError("No raw files found at %s." % self.dataset.rawLocation)

    def _ingestRaws(self, dataFiles, processes):
        """Ingest raw images into a repository.

        This task's repository is populated with copies of ``dataFiles`` by
        default; the transfer mode is governed by the ``ingester.transfer``
        config option.

        Parameters
        ----------
        dataFiles : `list` of `str`
            A list of filenames to ingest. May contain wildcards.
        processes : `int`
            The number of processes to use to ingest.

        Raises
        ------
        RuntimeError
            Raised if ``dataFiles`` is empty or any file has already been ingested.
        """
        if not dataFiles:
            raise RuntimeError("No raw files to ingest (expected list of filenames, got %r)." % dataFiles)

        try:
            # run=None because we expect the ingester to name a new collection.
            # HACK: update_exposure_records=True to modernize exposure records
            # from old ap_verify datasets. Since the exposure records are
            # generated from the same files, the only changes should be
            # schema-related.
            self.ingester.run(dataFiles, run=None, processes=processes, update_exposure_records=True)
        except lsst.daf.butler.registry.ConflictingDefinitionError as detail:
            raise RuntimeError("Not all raw files are unique") from detail

    def _defineVisits(self, processes):
        """Map visits to the ingested exposures.

        This step is necessary to be able to run most pipelines on raw datasets.

        Parameters
        ----------
        processes : `int`
            The number of processes to use to define visits.

        Raises
        ------
        RuntimeError
            Raised if there are no exposures in the repository.
        """
        exposures = set(self.workspace.workButler.registry.queryDataIds(["exposure"]))
        if not exposures:
            raise RuntimeError(f"No exposures defined in {self.workspace.repo}.")

        # Find the exposures that already have visits, comparing data IDs on
        # the exposure dimensions only, then define visits for the remainder.
        exposureKeys = list(exposures)[0].dimensions
        exposuresWithVisits = {x.subset(exposureKeys) for x in
                               self.workspace.workButler.registry.queryDataIds(["exposure", "visit"])}
        exposuresNoVisits = exposures - exposuresWithVisits
        if exposuresNoVisits:
            self.log.info("Defining visits...")
            self.visitDefiner.run(exposuresNoVisits)
        else:
            self.log.info("Visits were previously defined, skipping...")

    def _copyConfigs(self):
        """Give a workspace a copy of all configs associated with the
        ingested data.

        After this method returns, the config directory in the workspace
        contains all config files from the ap_verify dataset, and the
        pipelines directory in the workspace contains all pipeline files
        from the dataset.
        """
        if os.listdir(self.workspace.pipelineDir):
            self.log.info("Configs already copied, skipping...")
        else:
            self.log.info("Storing data-specific configs...")
            for configFile in _findMatchingFiles(self.dataset.configLocation, ['*.py']):
                shutil.copy2(configFile, self.workspace.configDir)
            self.log.info("Configs are now stored in %s.", self.workspace.configDir)
            for pipelineFile in _findMatchingFiles(self.dataset.pipelineLocation, ['*.yaml']):
                shutil.copy2(pipelineFile, self.workspace.pipelineDir)
            self.log.info("Pipelines are now stored in %s.", self.workspace.pipelineDir)


def ingestDatasetGen3(dataset, workspace, sasquatchNamespace=None, sasquatchUrl=None, extra=None,
                      processes=1):
    """Ingest the contents of an ap_verify dataset into a Gen 3 Butler repository.

    The original data directory is not modified.

    Parameters
    ----------
    dataset : `lsst.ap.verify.dataset.Dataset`
        The ap_verify dataset to be ingested.
    workspace : `lsst.ap.verify.workspace.WorkspaceGen3`
        The abstract location where the repository will be created, if it
        does not already exist.
    sasquatchNamespace : `str`, optional
        The name of the namespace to post the ap_verify metrics to.
    sasquatchUrl : `str`, optional
        The URL of the server to post the ap_verify metrics to.
    extra : `dict`, optional
        Extra parameters needed to post ap_verify metrics to Sasquatch.
    processes : `int`, optional
        The number of processes to use to ingest.
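
    Examples
    --------
    A sketch of a typical call, assuming ``dataset`` and ``workspace`` have
    already been constructed; the namespace and URL are illustrative.

    >>> ingestDatasetGen3(
    ...     dataset, workspace,
    ...     sasquatchNamespace="lsst.example",
    ...     sasquatchUrl="https://example.com/sasquatch-rest-proxy/",
    ...     processes=4,
    ... )  # doctest: +SKIP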

    """
    log = _LOG.getChild("ingestDataset")

    ingester = Gen3DatasetIngestTask(
        dataset, workspace, sasquatchNamespace, sasquatchUrl, extra,
        config=_getConfig(Gen3DatasetIngestTask, dataset)
    )
    ingester.run(processes=processes)
    log.info("Data ingested")


def _getConfig(task, dataset):
    """Return the ingestion config associated with a specific dataset.

    Parameters
    ----------
    task : `lsst.pipe.base.Task`-type
        The task whose config is needed.
    dataset : `lsst.ap.verify.dataset.Dataset`
        The dataset whose ingestion config is desired.

    Returns
    -------
    config : ``task.ConfigClass``
        The config for running ``task`` on ``dataset``.
    """
    config = task.ConfigClass()
    dataset.instrument.applyConfigOverrides(task._DefaultName, config)
    return config


def _findMatchingFiles(basePath, include, exclude=None):
    """Recursively identify files matching one set of patterns and not matching another.

    Parameters
    ----------
    basePath : `str`
        The path on disk where the files in ``include`` are located.
    include : iterable of `str`
        A collection of filenames (with wildcards) to include. Must not
        contain paths.
    exclude : iterable of `str`, optional
        A collection of filenames (with wildcards) to exclude. Must not
        contain paths. If omitted, all files matching ``include`` are returned.

    Returns
    -------
    files : `set` of `str`
        The files in ``basePath`` or any subdirectory that match ``include``
        but not ``exclude``.
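
    Examples
    --------
    A sketch with a hypothetical directory layout; the paths and the
    result are illustrative.

    >>> _findMatchingFiles("/data/raws", ["*.fits"],
    ...                    exclude=["*_bad.fits"])  # doctest: +SKIP
    {'/data/raws/visit1/frame01.fits', '/data/raws/visit2/frame02.fits'}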

    """
    _exclude = exclude if exclude is not None else []

    allFiles = set()
    for pattern in include:
        allFiles.update(glob(os.path.join(basePath, '**', pattern), recursive=True))

    for pattern in _exclude:
        excludedFiles = [f for f in allFiles if fnmatch.fnmatch(os.path.basename(f), pattern)]
        allFiles.difference_update(excludedFiles)
    return allFiles