Coverage for python/lsst/ap/verify/ingestion.py: 26% (109 statements)
#
# This file is part of ap_verify.
#
# Developed for the LSST Data Management System.
# This product includes software developed by the LSST Project
# (http://www.lsst.org).
# See the COPYRIGHT file at the top-level directory of this distribution
# for details of code ownership.
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
#
24"""Data ingestion for ap_verify.
26This module handles ingestion of an ap_verify dataset into an appropriate repository, so
27that pipeline code need not be aware of the dataset framework.
28"""
30__all__ = ["Gen3DatasetIngestConfig", "ingestDatasetGen3"]

import argparse
import fnmatch
import os
import shutil
from glob import glob
import logging

from lsst.utils.argparsing import AppendDict
import lsst.pex.config as pexConfig
import lsst.pipe.base as pipeBase

import lsst.daf.butler
import lsst.obs.base

_LOG = logging.getLogger(__name__)


class IngestionParser(argparse.ArgumentParser):
    """An argument parser for data needed by ingestion.

    This parser is not complete, and is designed to be passed to another
    parser using the ``parents`` parameter.
    """

    def __init__(self):
        # Help and documentation will be handled by main program's parser
        argparse.ArgumentParser.__init__(self, add_help=False)

        self.add_argument('--namespace', dest='namespace', default=None,
                          help='The Sasquatch namespace to use for the ap_verify metrics upload.')
        self.add_argument('--restProxyUrl', dest='restProxyUrl', default=None,
                          help='The Sasquatch URL to use for the ap_verify metrics upload.')
        self.add_argument("--extra", action=AppendDict,
                          help="Extra field (in the form key=value) to be added to any records "
                               "uploaded to Sasquatch. See SasquatchDispatcher.dispatch and "
                               ".dispatchRef for more details. The --extra argument can be passed "
                               "multiple times.")


class Gen3DatasetIngestConfig(pexConfig.Config):
    """Settings and defaults for `Gen3DatasetIngestTask`.

    The correct target for `ingester` can be found in the documentation of
    the appropriate ``obs`` package.
    """

    ingester = pexConfig.ConfigurableField(
        target=lsst.obs.base.RawIngestTask,
        doc="Task used to perform raw data ingestion.",
    )
    visitDefiner = pexConfig.ConfigurableField(
        target=lsst.obs.base.DefineVisitsTask,
        doc="Task used to organize raw exposures into visits.",
    )
    # Normally file patterns should be user input, but put them in a config so
    # the ap_verify dataset can configure them
    dataFiles = pexConfig.ListField(
        dtype=str,
        default=["*.fits", "*.fz", "*.fits.gz"],
        doc="Names of raw science files (no path; wildcards allowed) to ingest from the ap_verify dataset.",
    )
    dataBadFiles = pexConfig.ListField(
        dtype=str,
        default=[],
        doc="Names of raw science files (no path; wildcards allowed) to not ingest, "
            "supersedes ``dataFiles``.",
    )

    def setDefaults(self):
        super().setDefaults()
        self.ingester.transfer = "copy"
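

# A dataset's config override file might adjust the file patterns, for
# example (hypothetical patterns):
#
#     config.dataFiles = ["raw_*.fits"]
#     config.dataBadFiles = ["raw_broken_*.fits"]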


class Gen3DatasetIngestTask(pipeBase.Task):
    """Task for automating ingestion of an ap_verify dataset.

    Each dataset configures this task as appropriate for the files it provides
    and the target instrument. Therefore, this task takes no input besides the
    ap_verify dataset to load and the repositories to ingest to.

    Parameters
    ----------
    dataset : `lsst.ap.verify.dataset.Dataset`
        The ``ap_verify`` dataset to be ingested.
    workspace : `lsst.ap.verify.workspace.WorkspaceGen3`
        The abstract location for all ``ap_verify`` outputs, including
        a Gen 3 repository.
    namespace : `str`, optional
        The Sasquatch namespace to which to upload analysis_tools metrics. If
        omitted, no metrics are uploaded.
    url : `str`, optional
        The Sasquatch server to which to upload analysis_tools metrics. Must be
        provided if ``namespace`` is.
    extra : `dict`, optional
        Extra parameters for the SasquatchDatastore, needed to post ap_verify
        metrics. Should be provided if ``namespace`` is.
    """

    ConfigClass = Gen3DatasetIngestConfig
    # Suffix is de-facto convention for distinguishing Gen 2 and Gen 3 config overrides
    _DefaultName = "datasetIngest-gen3"

    def __init__(self, dataset, workspace, namespace=None, url=None, extra=None, *args, **kwargs):
        super().__init__(*args, **kwargs)
        self.workspace = workspace
        self.dataset = dataset
        self.namespace = namespace
        self.extra = extra if extra is not None else {}
        self.url = url
        # workspace.workButler is undefined until the repository is created
        self.dataset.makeCompatibleRepoGen3(self.workspace.repo, self.namespace, self.url, self.extra)
        if self.url is not None:
            self.transferMode = "copy"
        self.makeSubtask("ingester", butler=self.workspace.workButler)
        self.makeSubtask("visitDefiner", butler=self.workspace.workButler)

    def _reduce_kwargs(self):
        # Add extra constructor parameters to the pickled state
        return dict(**super()._reduce_kwargs(), dataset=self.dataset,
                    workspace=self.workspace, namespace=self.namespace, url=self.url)

    def run(self, processes=1):
        """Ingest the contents of a dataset into a Butler repository.

        Parameters
        ----------
        processes : `int`
            The number of processes to use for ingestion.
        """
        self._ensureRaws(processes=processes)
        self._defineVisits(processes=processes)
        self._copyConfigs()

    def _ensureRaws(self, processes):
        """Ensure that the repository in ``workspace`` has raws ingested.

        After this method returns, this task's repository contains all science
        data from this task's ap_verify dataset. Butler operations on the
        repository are not able to modify ``dataset`` in any way.

        Parameters
        ----------
        processes : `int`
            The number of processes to use for ingestion, if ingestion must
            be run.

        Raises
        ------
        RuntimeError
            Raised if there are no files to ingest.
        """
        try:
            collectionName = self.dataset.instrument.makeDefaultRawIngestRunName()
            rawCollections = list(self.workspace.workButler.registry.queryCollections(collectionName))
        except lsst.daf.butler.MissingCollectionError:
            rawCollections = []

        if rawCollections:
            rawData = list(self.workspace.workButler.registry.queryDatasets(
                'raw',
                collections=rawCollections,
                dataId={"instrument": self.dataset.instrument.getName()}))
        else:
            rawData = []

        if rawData:
            self.log.info("Raw images for %s were previously ingested, skipping...",
                          self.dataset.instrument.getName())
        else:
            self.log.info("Ingesting raw images...")
            dataFiles = _findMatchingFiles(self.dataset.rawLocation, self.config.dataFiles,
                                           exclude=self.config.dataBadFiles)
            if dataFiles:
                self._ingestRaws(dataFiles, processes=processes)
                self.log.info("Images are now ingested in %s", self.workspace.repo)
            else:
                raise RuntimeError("No raw files found at %s." % self.dataset.rawLocation)

    def _ingestRaws(self, dataFiles, processes):
        """Ingest raw images into a repository.

        This task's repository is populated with the contents of
        ``dataFiles``; whether the files are copied or linked depends on the
        configured ``ingester.transfer`` mode.

        Parameters
        ----------
        dataFiles : `list` of `str`
            A list of filenames to ingest. May contain wildcards.
        processes : `int`
            The number of processes to use for ingestion.

        Raises
        ------
        RuntimeError
            Raised if ``dataFiles`` is empty or any file has already
            been ingested.
        """
        if not dataFiles:
            raise RuntimeError("No raw files to ingest (expected list of filenames, got %r)." % dataFiles)

        try:
            # run=None because we expect the ingester to name a new collection.
            # HACK: update_exposure_records=True to modernize exposure records
            # from old ap_verify datasets. Since the exposure records are
            # generated from the same files, the only changes should be
            # schema-related.
            self.ingester.run(dataFiles, run=None, processes=processes, update_exposure_records=True)
        except lsst.daf.butler.registry.ConflictingDefinitionError as detail:
            raise RuntimeError("Not all raw files are unique") from detail

    def _defineVisits(self, processes):
        """Map visits to the ingested exposures.

        This step is necessary to be able to run most pipelines on raw
        datasets.

        Parameters
        ----------
        processes : `int`
            The number of processes to use to define visits.

        Raises
        ------
        RuntimeError
            Raised if there are no exposures in the repository.
        """
        exposures = set(self.workspace.workButler.registry.queryDataIds(["exposure"]))
        if not exposures:
            raise RuntimeError(f"No exposures defined in {self.workspace.repo}.")

        exposureKeys = list(exposures)[0].dimensions
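        # Data IDs from the exposure+visit query carry extra dimensions;
        # project them down to the exposure-only dimensions so that the set
        # difference below compares like with like.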
        exposuresWithVisits = {x.subset(exposureKeys) for x in
                               self.workspace.workButler.registry.queryDataIds(["exposure", "visit"])}
        exposuresNoVisits = exposures - exposuresWithVisits
        if exposuresNoVisits:
            self.log.info("Defining visits...")
            self.visitDefiner.run(exposuresNoVisits)
        else:
            self.log.info("Visits were previously defined, skipping...")

    def _copyConfigs(self):
        """Give a workspace a copy of all configs associated with the
        ingested data.

        After this method returns, the config directory in the workspace
        contains all config files from the ap_verify dataset, and the
        pipelines directory in the workspace contains all pipeline files
        from the dataset.
        """
        if os.listdir(self.workspace.pipelineDir):
            self.log.info("Configs already copied, skipping...")
        else:
            self.log.info("Storing data-specific configs...")
            for configFile in _findMatchingFiles(self.dataset.configLocation, ['*.py']):
                shutil.copy2(configFile, self.workspace.configDir)
            self.log.info("Configs are now stored in %s.", self.workspace.configDir)
            for pipelineFile in _findMatchingFiles(self.dataset.pipelineLocation, ['*.yaml']):
                shutil.copy2(pipelineFile, self.workspace.pipelineDir)
            self.log.info("Pipelines are now stored in %s.", self.workspace.pipelineDir)


def ingestDatasetGen3(dataset, workspace, sasquatchNamespace=None, sasquatchUrl=None, extra=None,
                      processes=1):
    """Ingest the contents of an ap_verify dataset into a Gen 3 Butler repository.

    The original data directory is not modified.

    Parameters
    ----------
    dataset : `lsst.ap.verify.dataset.Dataset`
        The ap_verify dataset to be ingested.
    workspace : `lsst.ap.verify.workspace.WorkspaceGen3`
        The abstract location where the repository is to be created, if it
        does not already exist.
    sasquatchNamespace : `str`, optional
        The name of the namespace to post the ap_verify metrics to.
    sasquatchUrl : `str`, optional
        The URL of the server to post the ap_verify metrics to.
    extra : `dict`, optional
        Extra parameters needed to post ap_verify metrics to Sasquatch.
    processes : `int`, optional
        The number of processes to use for ingestion.
    """
    log = _LOG.getChild("ingestDataset")

    ingester = Gen3DatasetIngestTask(
        dataset, workspace, sasquatchNamespace, sasquatchUrl, extra,
        config=_getConfig(Gen3DatasetIngestTask, dataset)
    )
    ingester.run(processes=processes)
    log.info("Data ingested")
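

# A minimal driver sketch (the dataset and workspace constructors shown here
# are assumptions, not part of this module):
#
#     from lsst.ap.verify.dataset import Dataset
#     from lsst.ap.verify.workspace import WorkspaceGen3
#
#     dataset = Dataset("dataset_name")
#     workspace = WorkspaceGen3("/path/to/workspace")
#     ingestDatasetGen3(dataset, workspace,
#                       sasquatchNamespace="lsst.example",
#                       sasquatchUrl="https://example.test/sasquatch-rest-proxy",
#                       processes=4)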


def _getConfig(task, dataset):
    """Return the ingestion config associated with a specific dataset.

    Parameters
    ----------
    task : `lsst.pipe.base.Task`-type
        The task whose config is needed.
    dataset : `lsst.ap.verify.dataset.Dataset`
        The dataset whose ingestion config is desired.

    Returns
    -------
    config : ``task.ConfigClass``
        The config for running ``task`` on ``dataset``.
    """
    config = task.ConfigClass()
    dataset.instrument.applyConfigOverrides(task._DefaultName, config)
    return config


def _findMatchingFiles(basePath, include, exclude=None):
    """Recursively identify files matching one set of patterns and not
    matching another.

    Parameters
    ----------
    basePath : `str`
        The path on disk where the files in ``include`` are located.
    include : iterable of `str`
        A collection of filenames (with wildcards) to include. Must not
        contain paths.
    exclude : iterable of `str`, optional
        A collection of filenames (with wildcards) to exclude. Must not
        contain paths. If omitted, all files matching ``include`` are
        returned.

    Returns
    -------
    files : `set` of `str`
        The files in ``basePath`` or any subdirectory that match ``include``
        but not ``exclude``.
    """
    _exclude = exclude if exclude is not None else []

    allFiles = set()
    for pattern in include:
        allFiles.update(glob(os.path.join(basePath, '**', pattern), recursive=True))

    for pattern in _exclude:
        excludedFiles = [f for f in allFiles if fnmatch.fnmatch(os.path.basename(f), pattern)]
        allFiles.difference_update(excludedFiles)
    return allFiles
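

# A usage sketch for the helper above (hypothetical paths and patterns):
#
#     fits = _findMatchingFiles("/data/dataset/raw", ["*.fits", "*.fz"],
#                               exclude=["*_bad.fits"])
#     # ``fits`` now holds every matching file under /data/dataset/raw,
#     # minus any whose basename matches an exclude pattern.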