Coverage for python / lsst / pipe / base / quantum_graph / formatter.py: 28%

99 statements  

« prev     ^ index     » next       coverage.py v7.13.5, created at 2026-04-18 08:44 +0000

1# This file is part of pipe_base. 

2# 

3# Developed for the LSST Data Management System. 

4# This product includes software developed by the LSST Project 

5# (http://www.lsst.org). 

6# See the COPYRIGHT file at the top-level directory of this distribution 

7# for details of code ownership. 

8# 

9# This software is dual licensed under the GNU General Public License and also 

10# under a 3-clause BSD license. Recipients may choose which of these licenses 

11# to use; please see the files gpl-3.0.txt and/or bsd_license.txt, 

12# respectively. If you choose the GPL option then the following text applies 

13# (but note that there is still no warranty even if you opt for BSD instead): 

14# 

15# This program is free software: you can redistribute it and/or modify 

16# it under the terms of the GNU General Public License as published by 

17# the Free Software Foundation, either version 3 of the License, or 

18# (at your option) any later version. 

19# 

20# This program is distributed in the hope that it will be useful, 

21# but WITHOUT ANY WARRANTY; without even the implied warranty of 

22# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the 

23# GNU General Public License for more details. 

24# 

25# You should have received a copy of the GNU General Public License 

26# along with this program. If not, see <http://www.gnu.org/licenses/>. 

27 

28from __future__ import annotations 

29 

30__all__ = ("ProvenanceFormatter",) 

31 

32import uuid 

33from typing import Any, ClassVar 

34 

35import pydantic 

36 

37from lsst.daf.butler import FormatterV2 

38from lsst.daf.butler.logging import ButlerLogRecords 

39from lsst.pex.config import Config 

40from lsst.resources import ResourcePath 

41from lsst.utils.logging import getLogger 

42from lsst.utils.packages import Packages 

43 

44from .._task_metadata import TaskMetadata 

45from ..pipeline_graph import TaskImportMode 

46from ._provenance import ProvenanceQuantumGraphReader 

47 

# Logger named after this module so it participates in the normal
# ``lsst.pipe.base`` logger hierarchy; ``__file__`` would have produced a
# filesystem path as the logger name, which logging configuration cannot
# address hierarchically.
_LOG = getLogger(__name__)

49 

50 

class _ProvenanceFormatterParameters(pydantic.BaseModel):
    """A Pydantic model for validating and applying defaults to the
    read parameters of `ProvenanceFormatter`.
    """

    # How tasks should be imported when reconstructing the pipeline graph.
    import_mode: TaskImportMode = TaskImportMode.DO_NOT_IMPORT
    # Quantum IDs to restrict the read to, or `None` for no restriction.
    quanta: list[uuid.UUID] | None = None
    # Dataset IDs to restrict the read to, or `None` for no restriction.
    datasets: list[uuid.UUID] | None = None
    # Whether to also read the special init quanta when reading the full graph.
    read_init_quanta: bool = True

    # A single validator handles both ID fields; the two original validators
    # were byte-identical duplicates.
    @pydantic.field_validator("quanta", "datasets", mode="before")
    @classmethod
    def _ids_to_list(cls, v: Any) -> list[uuid.UUID] | None:
        # Accept any iterable of IDs (set, tuple, generator) by coercing to a
        # list before regular field validation; pass `None` straight through.
        return list(v) if v is not None else None

    @property
    def nodes(self) -> list[uuid.UUID]:
        """All requested node IDs: quanta and datasets combined.

        Raises
        ------
        ValueError
            Raised if neither 'quanta' nor 'datasets' was provided.
        """
        if self.quanta is not None:
            if self.datasets is not None:
                return self.quanta + self.datasets
            return self.quanta
        if self.datasets is not None:
            return self.datasets
        raise ValueError("'datasets' and/or 'quanta' parameters are required for this component")

81 

82 

83class ProvenanceFormatter(FormatterV2): 

84 """Butler interface for reading `ProvenanceQuantumGraph` objects.""" 

85 

86 default_extension: ClassVar[str] = ".qg" 

87 can_read_from_uri: ClassVar[bool] = True 

88 

89 def read_from_uri(self, uri: ResourcePath, component: str | None = None, expected_size: int = -1) -> Any: 

90 match self._dataset_ref.datasetType.storageClass_name: 

91 case "TaskMetadata" | "PropertySet": 

92 return self._read_metadata(uri) 

93 case "ButlerLogRecords": 

94 return self._read_log(uri) 

95 case "Config": 

96 return self._read_config(uri) 

97 case "ProvenanceQuantumGraph": 

98 pass 

99 case unexpected: 

100 raise ValueError(f"Unsupported storage class {unexpected!r} for ProvenanceFormatter.") 

101 parameters = _ProvenanceFormatterParameters.model_validate(self.file_descriptor.parameters or {}) 

102 with ProvenanceQuantumGraphReader.open(uri, import_mode=parameters.import_mode) as reader: 

103 match component: 

104 case None: 

105 if parameters.read_init_quanta: 

106 reader.read_init_quanta() 

107 reader.read_quanta(parameters.quanta) 

108 reader.read_datasets(parameters.datasets) 

109 return reader.graph 

110 case "metadata": 

111 return reader.fetch_metadata(parameters.nodes) 

112 case "logs": 

113 return reader.fetch_logs(parameters.nodes) 

114 case "packages": 

115 return reader.fetch_packages() 

116 raise AssertionError(f"Unexpected component {component!r}.") 

117 

118 def _read_metadata(self, uri: ResourcePath) -> TaskMetadata: 

119 with ProvenanceQuantumGraphReader.open(uri, import_mode=TaskImportMode.DO_NOT_IMPORT) as reader: 

120 try: 

121 attempts = reader.fetch_metadata([self._dataset_ref.id])[self._dataset_ref.id] 

122 except LookupError: 

123 raise FileNotFoundError( 

124 f"No dataset with ID {self._dataset_ref.id} present in this graph." 

125 ) from None 

126 if not attempts: 

127 raise FileNotFoundError( 

128 f"No metadata dataset {self._dataset_ref} stored in this graph " 

129 "(no attempts for this quantum)." 

130 ) 

131 if attempts[-1] is None: 

132 raise FileNotFoundError( 

133 f"No metadata dataset {self._dataset_ref} stored in this graph " 

134 "(most recent attempt failed and did not write metadata)." 

135 ) 

136 return attempts[-1] 

137 

138 def _read_log(self, uri: ResourcePath) -> ButlerLogRecords: 

139 with ProvenanceQuantumGraphReader.open(uri, import_mode=TaskImportMode.DO_NOT_IMPORT) as reader: 

140 try: 

141 attempts = reader.fetch_logs([self._dataset_ref.id])[self._dataset_ref.id] 

142 except LookupError: 

143 raise FileNotFoundError( 

144 f"No dataset with ID {self._dataset_ref.id} present in this graph." 

145 ) from None 

146 if not attempts: 

147 raise FileNotFoundError( 

148 f"No log dataset {self._dataset_ref} stored in this graph (no attempts for this quantum)." 

149 ) 

150 if attempts[-1] is None: 

151 raise FileNotFoundError( 

152 f"No log dataset {self._dataset_ref} stored in this graph " 

153 "(most recent attempt failed and did not write logs)." 

154 ) 

155 return attempts[-1] 

156 

157 def _read_packages(self, uri: ResourcePath) -> Packages: 

158 with ProvenanceQuantumGraphReader.open(uri, import_mode=TaskImportMode.DO_NOT_IMPORT) as reader: 

159 return reader.fetch_packages() 

160 

161 def _read_config(self, uri: ResourcePath) -> Config: 

162 task_label = self._dataset_ref.datasetType.name.removesuffix("_config") 

163 with ProvenanceQuantumGraphReader.open( 

164 uri, import_mode=TaskImportMode.ASSUME_CONSISTENT_EDGES 

165 ) as reader: 

166 try: 

167 return reader.pipeline_graph.tasks[task_label].config.copy() 

168 except KeyError: 

169 raise FileNotFoundError( 

170 f"No task with label {task_label!r} found in the pipeline graph." 

171 ) from None