Coverage for python / lsst / ap / verify / workspace.py: 45%

101 statements  

« prev     ^ index     » next       coverage.py v7.13.5, created at 2026-04-28 09:25 +0000

1# 

2# This file is part of ap_verify. 

3# 

4# Developed for the LSST Data Management System. 

5# This product includes software developed by the LSST Project 

6# (http://www.lsst.org). 

7# See the COPYRIGHT file at the top-level directory of this distribution 

8# for details of code ownership. 

9# 

10# This program is free software: you can redistribute it and/or modify 

11# it under the terms of the GNU General Public License as published by 

12# the Free Software Foundation, either version 3 of the License, or 

13# (at your option) any later version. 

14# 

15# This program is distributed in the hope that it will be useful, 

16# but WITHOUT ANY WARRANTY; without even the implied warranty of 

17# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the 

18# GNU General Public License for more details. 

19# 

20# You should have received a copy of the GNU General Public License 

21# along with this program. If not, see <http://www.gnu.org/licenses/>. 

22# 

23 

24__all__ = ["Workspace", "WorkspaceGen3"] 

25 

26import abc 

27import os 

28import pathlib 

29import stat 

30 

31import lsst.daf.butler as dafButler 

32import lsst.obs.base as obsBase 

33 

34 

class Workspace(metaclass=abc.ABCMeta):
    """A directory used by ``ap_verify`` to handle data and outputs.

    Any object of this class represents a working directory containing
    (possibly empty) subdirectories for various purposes. Subclasses are
    typically specialized for particular workflows. Keeping such details in
    separate classes makes it easier to provide guarantees without forcing
    awkward directory structures on users.

    All Workspace classes must guarantee the existence of any subdirectories
    inside the workspace. Directories corresponding to repositories do not need
    to be initialized, since creating a valid repository usually requires
    external information.

    Two workspaces compare equal when they have exactly the same type and the
    same `workDir`. Hashing follows the same definition, so workspaces can be
    stored in sets and used as dictionary keys.

    Parameters
    ----------
    location : `str`
        The location on disk where the workspace will be set up. Will be
        created if it does not already exist.

    Raises
    ------
    EnvironmentError
        Raised if ``location`` or its ``config`` subdirectory cannot be
        created (propagated from `pathlib.Path.mkdir`).
    """
    def __init__(self, location):
        # Properties must be `str` for backwards compatibility
        self._location = str(pathlib.Path(location).resolve())

        self.mkdir(self._location)
        self.mkdir(self.configDir)

    @staticmethod
    def mkdir(directory):
        """Create a directory for the workspace.

        Missing parent directories are created as well, and an already
        existing directory is left untouched.

        This method is intended to be called only by subclasses, and should
        not be used by external code.

        Parameters
        ----------
        directory : `str`
            The directory to create.
        """
        mode = stat.S_IRWXU | stat.S_IRGRP | stat.S_IROTH  # a+r, u+rwx
        pathlib.Path(directory).mkdir(parents=True, exist_ok=True, mode=mode)

    def __eq__(self, other):
        """Test whether two workspaces are of the same type and have the
        same location.
        """
        return type(self) is type(other) and self.workDir == other.workDir

    def __hash__(self):
        """Hash on the same (type, location) pair that `__eq__` compares.

        Defining ``__eq__`` alone would make instances unhashable, so the
        hash is provided explicitly to keep equal workspaces interchangeable
        in sets and dictionaries.
        """
        return hash((type(self), self.workDir))

    def __repr__(self):
        """A string representation that can be used to reconstruct the Workspace.
        """
        return f"{type(self).__name__}({self.workDir!r})"

    @property
    def workDir(self):
        """The absolute location of the workspace as a whole
        (`str`, read-only).
        """
        return self._location

    @property
    def configDir(self):
        """The absolute location of a directory containing custom Task config
        files for use with the data (`str`, read-only).
        """
        return os.path.join(self._location, 'config')

    @property
    @abc.abstractmethod
    def dbLocation(self):
        """The default absolute location of the source association database to
        be created or updated by the pipeline (`str`, read-only).

        Shall be a pathname to a database suitable for the backend of `Apdb`.
        """

    @property
    @abc.abstractmethod
    def dbConfigLocation(self):
        """The absolute location of the config file for the source association
        database to be created or updated by the pipeline (`str`, read-only).

        The location is assumed to be a Python (`lsst.pex.config.Config`) file.
        """

    @property
    @abc.abstractmethod
    def alertLocation(self):
        """The absolute location of an output directory for persisted
        alert packets (`str`, read-only).
        """

    @property
    @abc.abstractmethod
    def workButler(self):
        """A Butler that can produce pipeline inputs and outputs (read-only).
        The type is class-dependent.
        """

    @property
    @abc.abstractmethod
    def analysisButler(self):
        """A Butler that can read pipeline outputs (read-only).
        The type is class-dependent.

        The Butler should be read-only, if its type supports the restriction.
        """

147 

148 

class WorkspaceGen3(Workspace):
    """An ``ap_verify`` working directory built around a Gen 3 repository.

    Each instance represents a working directory that holds one subdirectory
    for a Butler repository plus subdirectories and files for everything that
    lives outside the repository. Creating a WorkspaceGen3 only creates the
    directories; it does not *initialize* the repository, because doing so
    requires external information.

    Parameters
    ----------
    location : `str`
        The location on disk where the workspace will be set up. Will be
        created if it does not already exist.

    Raises
    ------
    EnvironmentError
        Raised if the workspace directories cannot be created.
    """

    def __init__(self, location):
        super().__init__(location)

        for subdirectory in (self.repo, self.pipelineDir):
            self.mkdir(subdirectory)

        # Name of the Gen 3 output collection
        self.outputName = "ap_verify-output"

        # Butlers are built on first access and cached afterwards
        self._workButler = None
        self._analysisButler = None

    @property
    def repo(self):
        """The absolute path/URI of the Butler repository used for AP
        pipeline processing (`str`, read-only).
        """
        return os.path.join(self._location, "repo")

    @property
    def pipelineDir(self):
        """The absolute location of the directory holding custom pipeline
        files for use with the data (`str`, read-only).
        """
        return os.path.join(self._location, "pipelines")

    @property
    def dbLocation(self):
        """The absolute location of ``association.db`` inside the workspace
        (`str`, read-only).
        """
        return os.path.join(self._location, "association.db")

    @property
    def dbConfigLocation(self):
        """The absolute location of ``apdb.py`` inside the workspace
        (`str`, read-only).
        """
        return os.path.join(self._location, "apdb.py")

    @property
    def alertLocation(self):
        """The absolute location of the ``alerts`` directory inside the
        workspace (`str`, read-only).
        """
        return os.path.join(self._location, "alerts")

    def _ensureCollection(self, registry, name, collectionType):
        """Register a collection unless the registry already has it.

        Parameters
        ----------
        registry : `lsst.daf.butler.Registry`
            The repository to which to add the collection.
        name : `str`
            The name of the collection to test for and add.
        collectionType : `lsst.daf.butler.CollectionType`
            The type of collection to add. Only used when registering; the
            existence test looks at the name alone.
        """
        if self._doesCollectionExist(registry, name):
            return
        registry.registerCollection(name, type=collectionType)

    def _doesCollectionExist(self, registry, name):
        """Report whether the registry knows a collection of the given name.

        Parameters
        ----------
        registry : `lsst.daf.butler.Registry`
            The repository that may contain the collection.
        name : `str`
            The name of the collection to check for existence.

        Returns
        -------
        exists : `bool`
            `True` if querying the registry for ``name`` yields at least one
            collection; `False` if it yields none or the registry reports
            the collection as missing.
        """
        try:
            found = list(registry.queryCollections(name))
        except dafButler.MissingCollectionError:
            return False
        return bool(found)

    @property
    def workButler(self):
        """A Butler that can read and write to a Gen 3 repository (`lsst.daf.butler.Butler`, read-only).

        The Butler is constructed on first access and reused afterwards.

        Notes
        -----
        Assumes `repo` has been initialized.

        Raises
        ------
        RuntimeError
            Raised if an `OSError` occurs while opening or preparing the
            repository.
        """
        if self._workButler is not None:
            return self._workButler

        try:
            # Must be writeable, since the cached work butler is derived from it.
            baseButler = dafButler.Butler(self.repo, writeable=True)
            registry = baseButler.registry

            # Preloaded datasets all live in <instrument>/defaults, but raws
            # are not preloaded and need their own run added to the inputs.
            chainMembers = []
            for dataId in registry.queryDataIds("instrument"):
                instrument = obsBase.Instrument.fromName(dataId["instrument"], registry)
                defaultName = instrument.makeCollectionName("defaults")
                rawName = instrument.makeDefaultRawIngestRunName()
                chainMembers += [defaultName, rawName]
                self._ensureCollection(registry, rawName, dafButler.CollectionType.RUN)

            # Define the output chain up front so the work butler can see it.
            # This definition is compatible with what pipetask --output uses.
            if not self._doesCollectionExist(registry, self.outputName):
                registry.registerCollection(self.outputName, dafButler.CollectionType.CHAINED)
                registry.setCollectionChain(self.outputName, chainMembers)

            self._workButler = dafButler.Butler(butler=baseButler, collections=self.outputName)
        except OSError as err:
            raise RuntimeError(f"{self.repo} is not a Gen 3 repository") from err
        return self._workButler

    @property
    def analysisButler(self):
        """A Butler that can read from a Gen 3 repository with outputs (`lsst.daf.butler.Butler`, read-only).

        The Butler is constructed on first access and reused afterwards.

        Notes
        -----
        Assumes `repo` has been initialized.

        Raises
        ------
        RuntimeError
            Raised if an `OSError` occurs while opening the repository.
        """
        if self._analysisButler is not None:
            return self._analysisButler

        try:
            readOnlyButler = dafButler.Butler(self.repo, collections=self.outputName, writeable=False)
        except OSError as err:
            raise RuntimeError(f"{self.repo} is not a Gen 3 repository") from err
        self._analysisButler = readOnlyButler
        return self._analysisButler