# This file is part of pipe_base.
#
# Developed for the LSST Data Management System.
# This product includes software developed by the LSST Project
# (http://www.lsst.org).
# See the COPYRIGHT file at the top-level directory of this distribution
# for details of code ownership.
#
# This software is dual licensed under the GNU General Public License and also
# under a 3-clause BSD license. Recipients may choose which of these licenses
# to use; please see the files gpl-3.0.txt and/or bsd_license.txt,
# respectively. If you choose the GPL option then the following text applies
# (but note that there is still no warranty even if you opt for BSD instead):
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.

"""A tool for ingesting provenance quantum graphs (written by the `aggregator`
module) and [re-]ingesting other datasets (metadata/logs/configs) backed by the
same file. This "finalizes" the RUN collection, prohibiting (at least
conceptually) further processing.

This always proceeds in three steps, so we can resume efficiently:

1. First we ask the butler to "forget" any metadata/log/config datasets that
   exist in the output RUN collection, removing any record of them from the
   butler database while preserving their files.

2. Next we ingest the ``run_provenance`` graph dataset itself.

3. Finally, in batches of quanta, we use a
   `~lsst.daf.butler.QuantumBackedButler` to delete the original
   metadata/log/config files and ingest new versions of those datasets into the
   butler.

Thus, at any point, if the ``run_provenance`` dataset has not been ingested,
we know any metadata/log/config datasets that have been ingested are backed by
the original files.

Moreover, if the ``run_provenance`` dataset has been ingested, any existing
metadata/log/config datasets must be backed by the graph file, and the original
files for those datasets will have been deleted.

We also know that at all times the metadata/log/config *content* is safely
present in either the original files in the butler storage or in an
already-ingested ``run_provenance`` dataset.
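
For example, a resumed ingest can tell which of these states it is in with a
single existence check. This is only an illustrative sketch (``butler`` and
``output_run`` stand in for the repository client and RUN collection at
hand)::

    if butler.find_dataset("run_provenance", collections=[output_run]) is not None:
        # Steps 1 and 2 are complete; only step 3 may need to be resumed.
        ...
    else:
        # The graph dataset is absent, so it is safe to restart from step 1.
        ...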

"""

from __future__ import annotations

__all__ = ("ingest_graph",)

import dataclasses
import itertools
import os
import uuid
from collections.abc import Iterator
from contextlib import contextmanager

from lsst.daf.butler import (
    Butler,
    Config,
    DataCoordinate,
    DatasetRef,
    DatasetType,
    FileDataset,
    QuantumBackedButler,
)
from lsst.daf.butler.registry.sql_registry import SqlRegistry
from lsst.resources import ResourcePath, ResourcePathExpression
from lsst.utils.logging import getLogger

from ..automatic_connection_constants import PROVENANCE_DATASET_TYPE_NAME, PROVENANCE_STORAGE_CLASS
from ._provenance import (
    ProvenanceDatasetInfo,
    ProvenanceInitQuantumInfo,
    ProvenanceQuantumGraph,
    ProvenanceQuantumGraphReader,
    ProvenanceQuantumInfo,
)
from .formatter import ProvenanceFormatter

_LOG = getLogger(__name__)


def ingest_graph(
    butler_config: str | Config,
    uri: ResourcePathExpression | None = None,
    *,
    transfer: str | None = "move",
    batch_size: int = 10000,
    output_run: str | None = None,
) -> None:
    """Ingest a provenance graph into a butler repository.

    Parameters
    ----------
    butler_config : `str` or `~lsst.daf.butler.Config`
        Path or alias for the butler repository, or a butler repository
        config object.
    uri : `lsst.resources.ResourcePathExpression` or `None`, optional
        Location of the provenance quantum graph to ingest. `None` indicates
        that the quantum graph has already been ingested, but other ingests
        and/or deletions failed and need to be resumed.
    transfer : `str` or `None`, optional
        Transfer mode to use when ingesting the graph. Matches those
        supported by `lsst.resources.ResourcePath.transfer_from`.
    batch_size : `int`, optional
        Number of datasets to process in each transaction.
    output_run : `str`, optional
        Output `~lsst.daf.butler.CollectionType.RUN` collection name. Only
        needs to be provided if ``uri`` is `None`. If it is provided, the
        output run in the graph is checked against it.

    Notes
    -----
    After this operation, any further processing done in the
    `~lsst.daf.butler.CollectionType.RUN` collection will not be included in
    the provenance.

    If this process is interrupted, it can pick up where it left off if run
    again (at the cost of some duplicate work to figure out how much progress
    it had made).
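
    Examples
    --------
    A sketch of typical use; the repository path, graph file name, and RUN
    collection name below are hypothetical::

        from lsst.pipe.base.quantum_graph.ingest_graph import ingest_graph

        # Fresh ingest: move the graph file into the repository, then
        # re-ingest the metadata/log/config datasets it backs.
        ingest_graph("/repo/main", "run_provenance.qg")

        # Resume an interrupted ingest in which the graph dataset itself
        # had already been ingested (note ``uri=None``).
        ingest_graph("/repo/main", None, output_run="u/someone/run")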

    """
    with _GraphIngester.open(butler_config, uri, output_run) as helper:
        helper.fetch_already_ingested_datasets()
        if not helper.graph_already_ingested:
            assert uri is not None
            helper.forget_ingested_datasets(batch_size=batch_size)
            helper.ingest_graph_dataset(uri, transfer=transfer)
        helper.clean_and_reingest_datasets(batch_size=batch_size)
        if helper.directories_to_delete:
            _LOG.info(
                "Deleting %d directories after checking that they are empty.",
                len(helper.directories_to_delete),
            )
            n_deleted: int = 0
            for top in sorted(helper.directories_to_delete):
                # Walk bottom-up, deleting empty subdirectories as we go and
                # recording any directory that still holds files (directly or
                # via a surviving subdirectory) as nonempty.
                nonempty: set[str] = set()
                for root, dirnames, filenames in os.walk(top, topdown=False):
                    if filenames:
                        nonempty.add(root)
                    for dirname in dirnames:
                        dirpath = os.path.join(root, dirname)
                        if dirpath in nonempty:
                            # A nonempty child makes its parent nonempty.
                            nonempty.add(root)
                        else:
                            os.rmdir(dirpath)
                if nonempty:
                    _LOG.warning(
                        "Directory %r was not deleted because it unexpectedly still had files in it.",
                        top,
                    )
                else:
                    # With topdown=False, the last root visited is ``top``
                    # itself.
                    os.rmdir(root)
                    n_deleted += 1
            _LOG.info("Deleted %d directories.", n_deleted)


@dataclasses.dataclass
class _GraphIngester:
    butler_config: str | Config
    butler: Butler
    graph: ProvenanceQuantumGraph
    graph_already_ingested: bool
    n_datasets: int
    datasets_already_ingested: set[uuid.UUID] = dataclasses.field(default_factory=set)
    directories_to_delete: set[str] = dataclasses.field(default_factory=set)

    @property
    def output_run(self) -> str:
        return self.graph.header.output_run

    @classmethod
    @contextmanager
    def open(
        cls,
        butler_config: str | Config,
        uri: ResourcePathExpression | None,
        output_run: str | None,
    ) -> Iterator[_GraphIngester]:
        with Butler.from_config(butler_config, collections=output_run, writeable=True) as butler:
            butler.registry.registerDatasetType(
                DatasetType(PROVENANCE_DATASET_TYPE_NAME, butler.dimensions.empty, PROVENANCE_STORAGE_CLASS)
            )
            graph, graph_already_ingested = cls.read_graph(butler, uri)
            if output_run is not None and graph.header.output_run != output_run:
                raise ValueError(
                    f"Given output run {output_run!r} does not match the graph "
                    f"header {graph.header.output_run!r}."
                )
            # Two datasets (metadata and log) per quantum, plus one config
            # dataset per init quantum.
            n_datasets = 2 * len(graph.quantum_only_xgraph) + len(graph.init_quanta)
            yield cls(
                butler_config=butler_config,
                butler=butler,
                graph=graph,
                graph_already_ingested=graph_already_ingested,
                n_datasets=n_datasets,
            )

    @staticmethod
    def read_graph(
        butler: Butler,
        uri: ResourcePathExpression | None,
    ) -> tuple[ProvenanceQuantumGraph, bool]:
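        """Read the provenance graph, either from the given file or (when
        ``uri`` is `None`) from the already-ingested ``run_provenance``
        dataset, returning the graph and whether it was already ingested.
        """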

        if uri is not None:
            _LOG.info("Reading the pre-ingest provenance graph.")
            with ProvenanceQuantumGraphReader.open(uri) as reader:
                reader.read_quanta()
                reader.read_init_quanta()
                graph = reader.graph
            already_ingested = (
                butler.find_dataset(PROVENANCE_DATASET_TYPE_NAME, collections=[graph.header.output_run])
                is not None
            )
            return graph, already_ingested
        else:
            _LOG.info("Reading the already-ingested provenance graph.")
            parameters = {"datasets": [], "read_init_quanta": True}
            return butler.get(PROVENANCE_DATASET_TYPE_NAME, parameters=parameters), True

    def fetch_already_ingested_datasets(self) -> None:
        _LOG.info("Querying for existing datasets in %r.", self.output_run)
        self.datasets_already_ingested.update(self.butler.registry._fetch_run_dataset_ids(self.output_run))

    def iter_datasets(self) -> Iterator[tuple[uuid.UUID, ProvenanceDatasetInfo]]:
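        """Iterate over the metadata, log, and config datasets in the graph.

        Yields
        ------
        dataset_id : `uuid.UUID`
            ID of a metadata, log, or config dataset: the metadata and log
            datasets of each quantum (task by task), then the config dataset
            of each init quantum.
        dataset_info : `ProvenanceDatasetInfo`
            Graph node information for that dataset.
        """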

        xgraph = self.graph.bipartite_xgraph
        for task_label, quanta_for_task in self.graph.quanta_by_task.items():
            _LOG.verbose(
                "Batching up metadata and log datasets from %d %s quanta.", len(quanta_for_task), task_label
            )
            for quantum_id in quanta_for_task.values():
                quantum_info: ProvenanceQuantumInfo = xgraph.nodes[quantum_id]
                metadata_id = quantum_info["metadata_id"]
                yield metadata_id, xgraph.nodes[metadata_id]
                log_id = quantum_info["log_id"]
                yield log_id, xgraph.nodes[log_id]
        _LOG.verbose("Batching up config datasets from %d tasks.", len(self.graph.init_quanta))
        for task_label, quantum_id in self.graph.init_quanta.items():
            init_quantum_info: ProvenanceInitQuantumInfo = xgraph.nodes[quantum_id]
            config_id = init_quantum_info["config_id"]
            yield config_id, xgraph.nodes[config_id]

    def forget_ingested_datasets(self, batch_size: int) -> None:
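        """Drop database records for metadata/log/config datasets that are
        still backed by their original files, leaving those files in place.
        """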

        _LOG.info(
            "Dropping database records for metadata/log/config datasets backed by their original files."
        )
        to_forget: list[DatasetRef] = []
        n_forgotten: int = 0
        n_skipped: int = 0
        for dataset_id, dataset_info in self.iter_datasets():
            if dataset_info["produced"] and dataset_id in self.datasets_already_ingested:
                to_forget.append(self._make_ref_from_info(dataset_id, dataset_info))
                self.datasets_already_ingested.remove(dataset_id)
                if len(to_forget) >= batch_size:
                    n_forgotten += self._run_forget(to_forget, n_forgotten + n_skipped)
            else:
                n_skipped += 1
        n_forgotten += self._run_forget(to_forget, n_forgotten + n_skipped)
        _LOG.info(
            "Removed database records for %d metadata/log/config datasets, while %d were already absent.",
            n_forgotten,
            n_skipped,
        )

    def _run_forget(self, to_forget: list[DatasetRef], n_current: int) -> int:
        if to_forget:
            _LOG.verbose(
                "Forgetting a %d-dataset batch; %d/%d forgotten so far or already absent.",
                len(to_forget),
                n_current,
                self.n_datasets,
            )
            with self.butler.registry.transaction():
                self.butler._datastore.forget(to_forget)
                self.butler.registry.removeDatasets(to_forget)
        n = len(to_forget)
        to_forget.clear()
        return n

    def ingest_graph_dataset(self, uri: ResourcePathExpression, transfer: str | None) -> None:
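        """Ingest the provenance quantum graph file itself as the
        ``run_provenance`` dataset of the output run.
        """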

        _LOG.info("Ingesting the provenance quantum graph.")
        dataset_type = DatasetType(
            PROVENANCE_DATASET_TYPE_NAME, self.butler.dimensions.empty, PROVENANCE_STORAGE_CLASS
        )
        self.butler.registry.registerDatasetType(dataset_type)
        ref = DatasetRef(dataset_type, DataCoordinate.make_empty(self.butler.dimensions), run=self.output_run)
        uri = ResourcePath(uri)
        self.butler.ingest(
            # We use .abspath() since butler assumes paths are relative to the
            # repo root, while users expect them to be relative to the CWD in
            # this context.
            FileDataset(refs=[ref], path=uri.abspath(), formatter=ProvenanceFormatter),
            transfer=transfer,
        )

    def clean_and_reingest_datasets(self, batch_size: int) -> None:
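        """Delete the original files of the produced metadata/log/config
        datasets and re-ingest those datasets backed by the provenance
        graph file.
        """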

        _LOG.info(
            "Deleting original metadata/log/config files and re-ingesting them with provenance graph backing."
        )
        direct_uri = self.butler.getURI(PROVENANCE_DATASET_TYPE_NAME, collections=[self.output_run])
        qbb = self.make_qbb()
        to_process: list[DatasetRef] = []
        n_processed: int = 0
        n_skipped: int = 0
        n_not_produced: int = 0
        for dataset_id, dataset_info in self.iter_datasets():
            if not dataset_info["produced"]:
                n_not_produced += 1
            elif dataset_id not in self.datasets_already_ingested:
                to_process.append(self._make_ref_from_info(dataset_id, dataset_info))
                if len(to_process) >= batch_size:
                    n_processed += self._run_clean_and_ingest(
                        qbb, direct_uri, to_process, n_processed + n_skipped
                    )
            else:
                n_skipped += 1
        n_processed += self._run_clean_and_ingest(qbb, direct_uri, to_process, n_processed + n_skipped)
        _LOG.info(
            "Deleted and re-ingested %d metadata/log/config datasets "
            "(%d had already been processed, %d were not produced).",
            n_processed,
            n_skipped,
            n_not_produced,
        )

    def _run_clean_and_ingest(
        self, qbb: QuantumBackedButler, direct_uri: ResourcePath, to_process: list[DatasetRef], n_current: int
    ) -> int:
        if not to_process:
            return 0
        _LOG.verbose(
            "Deleting and re-ingesting a %d-dataset batch; %d/%d complete.",
            len(to_process),
            n_current,
            self.n_datasets,
        )
        sql_registry: SqlRegistry = self.butler._registry  # type: ignore[attr-defined]
        expanded_refs = sql_registry.expand_refs(to_process)
        # We need to pass predict=True to keep QBB/FileDatastore from wasting
        # time doing existence checks, since ResourcePath.mremove will ignore
        # nonexistent files anyway.
        original_uris = list(
            itertools.chain.from_iterable(
                ref_uris.iter_all() for ref_uris in qbb.get_many_uris(expanded_refs, predict=True).values()
            )
        )
        removal_status = ResourcePath.mremove(original_uris, do_raise=False)
        for path, status in removal_status.items():
            if not status.success and not isinstance(status.exception, FileNotFoundError):
                assert status.exception is not None, "Exception should be set if success=False."
                status.exception.add_note(f"Attempting to delete original file at {path}.")
                raise status.exception
        file_dataset = FileDataset(refs=expanded_refs, path=direct_uri, formatter=ProvenanceFormatter)
        self.butler.ingest(file_dataset, transfer=None)
        if len(original_uris) == len(expanded_refs):
            for uri, ref in zip(original_uris, expanded_refs):
                if uri.isLocal:
                    if (
                        parent_dir := self.find_dataset_type_directory(uri.ospath, ref.datasetType.name)
                    ) is not None:
                        self.directories_to_delete.add(parent_dir)
        elif any(uri.isLocal for uri in original_uris):
            _LOG.warning(
                "Not attempting to delete empty metadata/log/config directories because the number "
                "of paths (%s) did not match the number of datasets (%s).",
                len(original_uris),
                len(expanded_refs),
            )
        n = len(to_process)
        to_process.clear()
        return n

    @staticmethod
    def _make_ref_from_info(dataset_id: uuid.UUID, dataset_info: ProvenanceDatasetInfo) -> DatasetRef:
        return DatasetRef(
            dataset_info["pipeline_node"].dataset_type,
            dataset_info["data_id"],
            run=dataset_info["run"],
            id=dataset_id,
        )

    def make_qbb(self) -> QuantumBackedButler:
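        """Construct a `~lsst.daf.butler.QuantumBackedButler` with no
        predicted inputs or outputs, used only to map dataset refs to the
        URIs of their original files.
        """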

        dataset_types = {d.name: d.dataset_type for d in self.graph.pipeline_graph.dataset_types.values()}
        return QuantumBackedButler.from_predicted(
            config=self.butler_config,
            predicted_inputs=(),
            predicted_outputs=(),
            dimensions=self.butler.dimensions,
            datastore_records={},
            dataset_types=dataset_types,
        )

    def find_dataset_type_directory(self, ospath: str, dataset_type: str) -> str | None:
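        """Return the highest directory in ``ospath`` whose name contains
        ``dataset_type``, or `None` if there is no such directory.

        For example (with a hypothetical repository layout), given
        ``ospath="/repo/run/isr_log/r/isr_log_r_42.json"`` and
        ``dataset_type="isr_log"``, this returns ``"/repo/run/isr_log"``.
        """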

        dir_components: list[str] = []
        for component in os.path.dirname(ospath).split(os.path.sep):
            dir_components.append(component)
            # If the full dataset type name is in a single directory path
            # component, we guess that directory can only have datasets of
            # that type.
            if dataset_type in component:
                return os.path.sep.join(dir_components)
        return None