Coverage for python / lsst / ap / verify / workspace.py: 45%
101 statements
« prev ^ index » next coverage.py v7.13.5, created at 2026-05-04 17:46 +0000
« prev ^ index » next coverage.py v7.13.5, created at 2026-05-04 17:46 +0000
1#
2# This file is part of ap_verify.
3#
4# Developed for the LSST Data Management System.
5# This product includes software developed by the LSST Project
6# (http://www.lsst.org).
7# See the COPYRIGHT file at the top-level directory of this distribution
8# for details of code ownership.
9#
10# This program is free software: you can redistribute it and/or modify
11# it under the terms of the GNU General Public License as published by
12# the Free Software Foundation, either version 3 of the License, or
13# (at your option) any later version.
14#
15# This program is distributed in the hope that it will be useful,
16# but WITHOUT ANY WARRANTY; without even the implied warranty of
17# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
18# GNU General Public License for more details.
19#
20# You should have received a copy of the GNU General Public License
21# along with this program. If not, see <http://www.gnu.org/licenses/>.
22#
# Public names exported by ``from ... import *``: the abstract base class
# `Workspace` and its Gen 3 implementation `WorkspaceGen3`.
__all__ = ["Workspace", "WorkspaceGen3"]
26import abc
27import os
28import pathlib
29import stat
31import lsst.daf.butler as dafButler
32import lsst.obs.base as obsBase
class Workspace(metaclass=abc.ABCMeta):
    """A directory used by ``ap_verify`` to handle data and outputs.

    Any object of this class represents a working directory containing
    (possibly empty) subdirectories for various purposes. Subclasses are
    typically specialized for particular workflows. Keeping such details in
    separate classes makes it easier to provide guarantees without forcing
    awkward directory structures on users.

    All Workspace classes must guarantee the existence of any subdirectories
    inside the workspace. Directories corresponding to repositories do not need
    to be initialized, since creating a valid repository usually requires
    external information.

    Two workspaces compare equal (and hash equal) if and only if they are of
    exactly the same class and have the same absolute location.

    Parameters
    ----------
    location : `str`
        The location on disk where the workspace will be set up. Will be
        created if it does not already exist.

    Raises
    ------
    EnvironmentError
        Raised (as `OSError`, of which `EnvironmentError` is an alias) if
        ``location`` or its ``config`` subdirectory cannot be created, for
        example because of insufficient permissions.
    """
    def __init__(self, location):
        # Properties must be `str` for backwards compatibility.
        # Resolving makes the location absolute, so equality and hashing
        # compare canonical paths rather than user spellings.
        self._location = str(pathlib.Path(location).resolve())

        self.mkdir(self._location)
        self.mkdir(self.configDir)

    @staticmethod
    def mkdir(directory):
        """Create a directory for the workspace.

        This method is intended to be called only by subclasses, and should
        not be used by external code. It is a no-op if ``directory`` already
        exists.

        Parameters
        ----------
        directory : `str`
            The directory to create.

        Notes
        -----
        The permission bits are applied only to ``directory`` itself; any
        missing parents are created with default permissions, and the
        process umask is applied on top (see `pathlib.Path.mkdir`).
        """
        # NOTE(review): group/other get read but not search (x) permission,
        # so they can list entry names but cannot enter the directory or
        # access its contents. Confirm this restriction is intended.
        mode = stat.S_IRWXU | stat.S_IRGRP | stat.S_IROTH  # a+r, u+rwx
        pathlib.Path(directory).mkdir(parents=True, exist_ok=True, mode=mode)

    def __eq__(self, other):
        """Test whether two workspaces are of the same type and have the
        same location.

        Returns `NotImplemented` for objects that are not workspaces, so that
        Python can fall back to the other operand's comparison.
        """
        if not isinstance(other, Workspace):
            return NotImplemented
        return type(self) is type(other) and self.workDir == other.workDir

    def __hash__(self):
        """Hash consistently with `__eq__` (exact type plus location).

        Defining `__eq__` alone would implicitly make instances unhashable.
        """
        return hash((type(self), self.workDir))

    def __repr__(self):
        """A string representation that can be used to reconstruct the Workspace.
        """
        return f"{type(self).__name__}({self.workDir!r})"

    @property
    def workDir(self):
        """The absolute location of the workspace as a whole
        (`str`, read-only).
        """
        return self._location

    @property
    def configDir(self):
        """The absolute location of a directory containing custom Task config
        files for use with the data (`str`, read-only).
        """
        return os.path.join(self._location, 'config')

    @property
    @abc.abstractmethod
    def dbLocation(self):
        """The default absolute location of the source association database to
        be created or updated by the pipeline (`str`, read-only).

        Shall be a pathname to a database suitable for the backend of `Apdb`.
        """

    @property
    @abc.abstractmethod
    def dbConfigLocation(self):
        """The absolute location of the config file for the source association
        database to be created or updated by the pipeline (`str`, read-only).

        The location is assumed to be a Python (`lsst.pex.config.Config`) file.
        """

    @property
    @abc.abstractmethod
    def alertLocation(self):
        """The absolute location of an output directory for persisted
        alert packets (`str`, read-only).
        """

    @property
    @abc.abstractmethod
    def workButler(self):
        """A Butler that can produce pipeline inputs and outputs (read-only).
        The type is class-dependent.
        """

    @property
    @abc.abstractmethod
    def analysisButler(self):
        """A Butler that can read pipeline outputs (read-only).
        The type is class-dependent.

        The Butler should be read-only, if its type supports the restriction.
        """
class WorkspaceGen3(Workspace):
    """A directory used by ``ap_verify`` to handle data.

    Each instance stands for a working directory holding a Butler repository
    subdirectory alongside subdirectories for non-repository files. Creating
    a WorkspaceGen3 only lays out the directories; it does not *initialize*
    the repository, since that needs information from outside the workspace.

    Parameters
    ----------
    location : `str`
        The location on disk where the workspace will be set up. Will be
        created if it does not already exist.

    Raises
    ------
    EnvironmentError
        Raised if ``location`` is not readable or not writeable
    """

    def __init__(self, location):
        super().__init__(location)

        # On top of what the base class creates, a Gen 3 workspace needs a
        # repository directory and a directory for custom pipelines.
        for subdirectory in (self.repo, self.pipelineDir):
            self.mkdir(subdirectory)

        # Gen 3 name of the output collection.
        self.outputName = "ap_verify-output"

        # Butlers are built on first access and cached in these attributes.
        self._workButler = None
        self._analysisButler = None

    @property
    def repo(self):
        """Absolute path/URI of the Butler repository used for AP pipeline
        processing (`str`, read-only).
        """
        return os.path.join(self._location, "repo")

    @property
    def pipelineDir(self):
        """Absolute location of the directory holding custom pipeline files
        for use with the data (`str`, read-only).
        """
        return os.path.join(self._location, "pipelines")

    @property
    def dbLocation(self):
        """Absolute path of ``association.db`` inside the workspace
        (`str`, read-only).
        """
        return os.path.join(self._location, "association.db")

    @property
    def dbConfigLocation(self):
        """Absolute path of ``apdb.py`` inside the workspace
        (`str`, read-only).
        """
        return os.path.join(self._location, "apdb.py")

    @property
    def alertLocation(self):
        """Absolute path of the ``alerts`` directory inside the workspace
        (`str`, read-only).
        """
        return os.path.join(self._location, "alerts")

    def _ensureCollection(self, registry, name, collectionType):
        """Register a collection unless one with that name already exists.

        Parameters
        ----------
        registry : `lsst.daf.butler.Registry`
            Repository that should end up containing the collection.
        name : `str`
            Name of the collection to look up and, if absent, register.
        collectionType : `lsst.daf.butler.CollectionType`
            Type used when registering. Not consulted when checking whether
            the collection already exists.
        """
        if self._doesCollectionExist(registry, name):
            return
        registry.registerCollection(name, type=collectionType)

    def _doesCollectionExist(self, registry, name):
        """Report whether ``registry`` already knows a collection ``name``.

        Parameters
        ----------
        registry : `lsst.daf.butler.Registry`
            Repository to search.
        name : `str`
            Name of the collection to look for.

        Returns
        -------
        exists : `bool`
            `True` if the query yields at least one collection, `False` if it
            yields none or reports the collection as missing.
        """
        try:
            found = list(registry.queryCollections(name))
        except dafButler.MissingCollectionError:
            return False
        return bool(found)

    @property
    def workButler(self):
        """A Butler that can read and write to a Gen 3 repository (`lsst.daf.butler.Butler`, read-only).

        Notes
        -----
        Assumes `repo` has been initialized.
        """
        if self._workButler is None:
            try:
                self._workButler = self._makeWorkButler()
            except OSError as e:
                raise RuntimeError(f"{self.repo} is not a Gen 3 repository") from e
        return self._workButler

    def _makeWorkButler(self):
        """Construct the read-write Butler returned by `workButler`.

        For every instrument in the repository, the ``defaults`` collection
        and the default raw-ingest run are queued as inputs; the raw run is
        registered if missing. If the output chain does not exist yet, it is
        created with those inputs as its children.

        Returns
        -------
        butler : `lsst.daf.butler.Butler`
            A Butler whose default collections are `outputName`.
        """
        # Writeable, because the returned Butler is derived from this one.
        queryButler = dafButler.Butler(self.repo, writeable=True)
        registry = queryButler.registry

        # Dataset generation puts all preloaded datasets in <instrument>/defaults.
        # However, this definition excludes raws, which are not preloaded.
        inputs = []
        for dataId in registry.queryDataIds('instrument'):
            instrument = obsBase.Instrument.fromName(dataId["instrument"], registry)
            defaultsCollection = instrument.makeCollectionName("defaults")
            rawRun = instrument.makeDefaultRawIngestRunName()
            inputs.extend([defaultsCollection, rawRun])
            self._ensureCollection(registry, rawRun, dafButler.CollectionType.RUN)

        # The output chain is made here so that workButler can see it; the
        # definition is compatible with what pipetask --output uses.
        if not self._doesCollectionExist(registry, self.outputName):
            registry.registerCollection(self.outputName, dafButler.CollectionType.CHAINED)
            registry.setCollectionChain(self.outputName, inputs)

        return dafButler.Butler(butler=queryButler, collections=self.outputName)

    @property
    def analysisButler(self):
        """A Butler that can read from a Gen 3 repository with outputs (`lsst.daf.butler.Butler`, read-only).

        Notes
        -----
        Assumes `repo` has been initialized.
        """
        if self._analysisButler is not None:
            return self._analysisButler
        try:
            butler = dafButler.Butler(self.repo, writeable=False, collections=self.outputName)
        except OSError as e:
            raise RuntimeError(f"{self.repo} is not a Gen 3 repository") from e
        self._analysisButler = butler
        return butler