Coverage for python / lsst / daf / butler / transfers / _context.py: 12%

112 statements  

« prev     ^ index     » next       coverage.py v7.13.5, created at 2026-04-26 08:49 +0000

1# This file is part of daf_butler. 

2# 

3# Developed for the LSST Data Management System. 

4# This product includes software developed by the LSST Project 

5# (http://www.lsst.org). 

6# See the COPYRIGHT file at the top-level directory of this distribution 

7# for details of code ownership. 

8# 

9# This software is dual licensed under the GNU General Public License and also 

10# under a 3-clause BSD license. Recipients may choose which of these licenses 

11# to use; please see the files gpl-3.0.txt and/or bsd_license.txt, 

12# respectively. If you choose the GPL option then the following text applies 

13# (but note that there is still no warranty even if you opt for BSD instead): 

14# 

15# This program is free software: you can redistribute it and/or modify 

16# it under the terms of the GNU General Public License as published by 

17# the Free Software Foundation, either version 3 of the License, or 

18# (at your option) any later version. 

19# 

20# This program is distributed in the hope that it will be useful, 

21# but WITHOUT ANY WARRANTY; without even the implied warranty of 

22# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the 

23# GNU General Public License for more details. 

24# 

25# You should have received a copy of the GNU General Public License 

26# along with this program. If not, see <http://www.gnu.org/licenses/>. 

27 

28from __future__ import annotations 

29 

30__all__ = ["RepoExportContext"] 

31 

32from collections import defaultdict 

33from collections.abc import Callable, Iterable, Set 

34from typing import TYPE_CHECKING 

35 

36from .._collection_type import CollectionType 

37from .._dataset_association import DatasetAssociation 

38from .._dataset_ref import DatasetId, DatasetRef 

39from .._dataset_type import DatasetType 

40from .._file_dataset import FileDataset 

41from ..dimensions import DataCoordinate, DimensionElement, DimensionRecord 

42from ..registry.interfaces import ChainedCollectionRecord, CollectionRecord 

43 

44if TYPE_CHECKING: 

45 from lsst.resources import ResourcePathExpression 

46 

47 from ..direct_butler import DirectButler 

48 from ._interfaces import RepoExportBackend 

49 

50 

class RepoExportContext:
    """Public interface for exporting a subset of a data repository.

    Instances of this class are obtained by calling `Butler.export` as the
    value returned by that context manager::

        with butler.export(filename="export.yaml") as export:
            export.saveDataIds(...)
            export.saveDatasets(...)

    Parameters
    ----------
    butler : `lsst.daf.butler.direct_butler.DirectButler`
        Butler to export from.
    backend : `RepoExportBackend`
        Implementation class for a particular export file format.
    directory : `~lsst.resources.ResourcePathExpression`, optional
        Directory to pass to `Datastore.export`. Can be `None` to use
        the current working directory.
    transfer : `str`, optional
        Transfer mode to pass to `Datastore.export`.
    """

    def __init__(
        self,
        butler: DirectButler,  # Requires butler._registry to work for now.
        backend: RepoExportBackend,
        *,
        directory: ResourcePathExpression | None = None,
        transfer: str | None = None,
    ):
        self._butler = butler
        self._backend = backend
        self._directory = directory
        self._transfer = transfer
        # Dimension records to export, grouped by element and keyed by data ID
        # so duplicates collapse naturally.
        self._records: dict[DimensionElement, dict[DataCoordinate, DimensionRecord]] = defaultdict(dict)
        # IDs of datasets already queued for export; used to skip duplicates.
        self._dataset_ids: set[DatasetId] = set()
        # Exported file datasets, grouped by dataset type and then RUN name.
        self._datasets: dict[DatasetType, dict[str, list[FileDataset]]] = defaultdict(
            lambda: defaultdict(list)
        )
        # Collection records to export, keyed by collection name.
        self._collections: dict[str, CollectionRecord] = {}

    def saveCollection(self, name: str) -> None:
        """Export the given collection.

        Parameters
        ----------
        name : `str`
            Name of the collection.

        Notes
        -----
        `~CollectionType.RUN` collections are also exported automatically when
        any dataset referencing them is exported.  They may also be explicitly
        exported via this method to export the collection with no datasets.
        Duplicate exports of collections are ignored.

        Exporting a `~CollectionType.TAGGED` or `~CollectionType.CALIBRATION`
        collection will cause its associations with exported datasets to also
        be exported, but it does not export those datasets automatically.

        Exporting a `~CollectionType.CHAINED` collection does not automatically
        export its child collections; these must be explicitly exported or
        already be present in the repository they are being imported into.
        """
        self._collections[name] = self._butler._registry.get_collection_record(name)

    def saveDimensionData(
        self, element: str | DimensionElement, records: Iterable[dict | DimensionRecord]
    ) -> None:
        """Export the given dimension records associated with one or more data
        IDs.

        Parameters
        ----------
        element : `str` or `DimensionElement`
            `DimensionElement` or `str` indicating the logical table these
            records are from.
        records : `~collections.abc.Iterable` [ `DimensionRecord` or `dict` ]
            Records to export, as an iterable containing `DimensionRecord` or
            `dict` instances.

        Raises
        ------
        ValueError
            Raised if a `DimensionRecord` does not belong to ``element``.
        """
        if not isinstance(element, DimensionElement):
            element = self._butler.dimensions[element]
        for record in records:
            if not isinstance(record, DimensionRecord):
                # Plain dicts are converted to records of this element.
                record = element.RecordClass(**record)
            elif record.definition != element:
                raise ValueError(
                    f"Mismatch between element={element.name} and "
                    f"dimension record with definition={record.definition.name}."
                )
            # setdefault keeps the first record seen for each data ID, so
            # duplicate exports are ignored.
            self._records[element].setdefault(record.dataId, record)

    def saveDataIds(
        self,
        dataIds: Iterable[DataCoordinate],
        *,
        elements: Iterable[str | DimensionElement] | None = None,
    ) -> None:
        """Export the dimension records associated with one or more data IDs.

        Parameters
        ----------
        dataIds : `~collections.abc.Iterable` of `DataCoordinate`
            Data IDs to export.  For large numbers of data IDs obtained by
            calls to `Registry.queryDataIds`, it will be much more efficient if
            these are expanded to include records (i.e.
            `DataCoordinate.hasRecords` returns `True`) prior to the call to
            `saveDataIds` via e.g. ``Registry.queryDataIds(...).expanded()``.
        elements : `~collections.abc.Iterable` [`DimensionElement`] or `str`,\
                optional
            Dimension elements whose records should be exported.  If `None`,
            records for all dimensions will be exported.
        """
        standardized_elements: Set[DimensionElement]
        if elements is None:
            # Default to every element that has its own table; others have no
            # records to export.
            standardized_elements = frozenset(
                element for element in self._butler.dimensions.elements if element.has_own_table
            )
        else:
            standardized_elements = set()
            for element in elements:
                if not isinstance(element, DimensionElement):
                    element = self._butler.dimensions[element]
                if element.has_own_table:
                    standardized_elements.add(element)

        dimension_records = self._butler._extract_all_dimension_records_from_data_ids(
            self._butler, set(dataIds), frozenset(standardized_elements)
        )
        for element, record_mapping in dimension_records.items():
            if element in standardized_elements:
                for record in record_mapping.values():
                    # First record seen for a data ID wins; duplicates ignored.
                    self._records[element].setdefault(record.dataId, record)

    def saveDatasets(
        self,
        refs: Iterable[DatasetRef],
        *,
        elements: Iterable[str | DimensionElement] | None = None,
        rewrite: Callable[[FileDataset], FileDataset] | None = None,
    ) -> None:
        """Export one or more datasets.

        This automatically exports any `DatasetType`, `~CollectionType.RUN`
        collections, and dimension records associated with the datasets.

        Parameters
        ----------
        refs : `~collections.abc.Iterable` of `DatasetRef`
            References to the datasets to export.  Their `DatasetRef.id`
            attributes must not be `None`.  Duplicates are automatically
            ignored.  Nested data IDs must have `DataCoordinate.hasRecords`
            return `True`.  If any reference is to a component dataset, the
            parent will be exported instead.
        elements : `~collections.abc.Iterable` [`DimensionElement`] \
                or `str`, optional
            Dimension elements whose records should be exported; this is
            forwarded to `saveDataIds` when exporting the data IDs of the
            given datasets.
        rewrite : `~collections.abc.Callable`, optional
            A callable that takes a single `FileDataset` argument and returns
            a modified `FileDataset`.  This is typically used to rewrite the
            path generated by the datastore.  If `None`, the `FileDataset`
            returned by `Datastore.export` will be used directly.

        Notes
        -----
        At present, this only associates datasets with `~CollectionType.RUN`
        collections.  Other collections will be included in the export in the
        future (once `Registry` provides a way to look up that information).
        """
        data_ids = set()
        refs_to_export = {}
        # Sort for deterministic export order.
        for ref in sorted(refs):
            dataset_id = ref.id
            # The query interfaces that are often used to generate the refs
            # passed here often don't remove duplicates, so do that here for
            # convenience.
            if dataset_id in self._dataset_ids or dataset_id in refs_to_export:
                continue
            # Also convert components to composites.
            if ref.isComponent():
                ref = ref.makeCompositeRef()
            data_ids.add(ref.dataId)
            refs_to_export[dataset_id] = ref
        # Do a vectorized datastore export, which might be a lot faster than
        # one-by-one.
        exports = self._butler._datastore.export(
            refs_to_export.values(),
            directory=self._directory,
            transfer=self._transfer,
        )
        # Export associated data IDs.
        self.saveDataIds(data_ids, elements=elements)
        # Rewrite export filenames if desired, and then save them to the
        # data structure we'll write in `_finish`.
        # If a single exported FileDataset has multiple DatasetRefs, we save
        # it with each of them.
        for file_dataset in exports:
            if rewrite is not None:
                file_dataset = rewrite(file_dataset)
            for ref in file_dataset.refs:
                assert ref.run is not None
                self._datasets[ref.datasetType][ref.run].append(file_dataset)
        # Only mark the datasets exported after everything above succeeded.
        self._dataset_ids.update(refs_to_export.keys())

    def _finish(self) -> None:
        """Delegate to the backend to finish the export process.

        For use by `Butler.export` only.
        """
        for element in self._butler.dimensions.sorted(self._records.keys()):
            # To make export deterministic sort the DataCoordinate instances.
            r = self._records[element]
            self._backend.saveDimensionData(element, *[r[dataId] for dataId in sorted(r.keys())])
        # RUN collections referenced by exported datasets are exported
        # implicitly; fetch their records now.
        for datasetsByRun in self._datasets.values():
            for run in datasetsByRun:
                self._collections[run] = self._butler._registry.get_collection_record(run)
        for collectionName in self._computeSortedCollections():
            doc = self._butler.registry.getCollectionDocumentation(collectionName)
            self._backend.saveCollection(self._collections[collectionName], doc)
        # Sort the dataset types and runs before exporting to ensure
        # reproducible order in export file.
        for datasetType in sorted(self._datasets.keys()):
            for run in sorted(self._datasets[datasetType].keys()):
                # Sort the FileDataset
                records = sorted(self._datasets[datasetType][run])
                self._backend.saveDatasets(datasetType, run, *records)
        # Export associations between datasets and collections.  These need to
        # be sorted (at two levels; they're dicts) or created more
        # deterministically, too, which probably involves more data ID sorting.
        datasetAssociations = self._computeDatasetAssociations()
        for collection in sorted(datasetAssociations):
            self._backend.saveDatasetAssociations(
                collection, self._collections[collection].type, sorted(datasetAssociations[collection])
            )
        self._backend.finish()

    def _computeSortedCollections(self) -> list[str]:
        """Sort collections in a way that is both deterministic and safe
        for registering them in a new repo in the presence of nested chains.

        This method is intended for internal use by `RepoExportContext` only.

        Returns
        -------
        names: `List` [ `str` ]
            Ordered list of collection names.

        Raises
        ------
        RuntimeError
            Raised if the CHAINED collections to export contain a dependency
            cycle.
        """
        # Split collections into CHAINED and everything else, and just
        # sort "everything else" lexicographically since there are no
        # dependencies.
        chains: dict[str, list[str]] = {}
        result: list[str] = []
        for record in self._collections.values():
            if record.type is CollectionType.CHAINED:
                assert isinstance(record, ChainedCollectionRecord)
                chains[record.name] = list(record.children)
            else:
                result.append(record.name)
        result.sort()
        # Sort all chains topologically, breaking ties lexicographically.
        # Append these to 'result' and remove them from 'chains' as we go.
        while chains:
            unblocked = {
                parent
                for parent, children in chains.items()
                if not any(child in chains for child in children)
            }
            if not unblocked:
                # ``unblocked`` is empty here by construction; report the
                # chains that remain stuck instead.
                raise RuntimeError(
                    f"Apparent cycle in CHAINED collection dependencies involving {set(chains)}."
                )
            result.extend(sorted(unblocked))
            for name in unblocked:
                del chains[name]
        return result

    def _computeDatasetAssociations(self) -> dict[str, list[DatasetAssociation]]:
        """Return datasets-collection associations, grouped by association.

        This queries for all associations between exported datasets and
        exported TAGGED or CALIBRATION collections and is intended to be run
        only by `_finish`, as this ensures all collections and all datasets
        have already been exported and hence the order in which they are
        exported does not matter.

        Returns
        -------
        associations : `dict` [ `str`, `list` [ `DatasetAssociation` ] ]
            Dictionary keyed by collection name, with values lists of structs
            representing an association between that collection and a dataset.
        """
        results: dict[str, list[DatasetAssociation]] = defaultdict(list)
        for datasetType in self._datasets:
            # We query for _all_ datasets of each dataset type we export, in
            # the specific collections we are exporting.  The worst-case
            # efficiency of this is _awful_ (i.e. big repo, exporting a tiny
            # subset).  But we don't have any better options right now; we need
            # a way to query for a _lot_ of explicitly given dataset_ids, and
            # the only way to make that scale up is to either upload them to a
            # temporary table or recognize when they are already in one because
            # the user passed us a QueryResult object.  That's blocked by (at
            # least) DM-26692.
            collectionTypes = {CollectionType.TAGGED}
            if datasetType.isCalibration():
                collectionTypes.add(CollectionType.CALIBRATION)
            for association in self._butler.registry.queryDatasetAssociations(
                datasetType, self._collections.keys(), collectionTypes=collectionTypes, flattenChains=False
            ):
                # Only keep associations for datasets we actually exported.
                if association.ref.id in self._dataset_ids:
                    results[association.collection].append(association)
        return results