Coverage for python / lsst / pipe / base / quantum_graph / formatter.py: 28%
99 statements
« prev ^ index » next coverage.py v7.13.5, created at 2026-04-18 08:44 +0000
1# This file is part of pipe_base.
2#
3# Developed for the LSST Data Management System.
4# This product includes software developed by the LSST Project
5# (http://www.lsst.org).
6# See the COPYRIGHT file at the top-level directory of this distribution
7# for details of code ownership.
8#
9# This software is dual licensed under the GNU General Public License and also
10# under a 3-clause BSD license. Recipients may choose which of these licenses
11# to use; please see the files gpl-3.0.txt and/or bsd_license.txt,
12# respectively. If you choose the GPL option then the following text applies
13# (but note that there is still no warranty even if you opt for BSD instead):
14#
15# This program is free software: you can redistribute it and/or modify
16# it under the terms of the GNU General Public License as published by
17# the Free Software Foundation, either version 3 of the License, or
18# (at your option) any later version.
19#
20# This program is distributed in the hope that it will be useful,
21# but WITHOUT ANY WARRANTY; without even the implied warranty of
22# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
23# GNU General Public License for more details.
24#
25# You should have received a copy of the GNU General Public License
26# along with this program. If not, see <http://www.gnu.org/licenses/>.
28from __future__ import annotations
30__all__ = ("ProvenanceFormatter",)
32import uuid
33from typing import Any, ClassVar
35import pydantic
37from lsst.daf.butler import FormatterV2
38from lsst.daf.butler.logging import ButlerLogRecords
39from lsst.pex.config import Config
40from lsst.resources import ResourcePath
41from lsst.utils.logging import getLogger
42from lsst.utils.packages import Packages
44from .._task_metadata import TaskMetadata
45from ..pipeline_graph import TaskImportMode
46from ._provenance import ProvenanceQuantumGraphReader
48_LOG = getLogger(__file__)
class _ProvenanceFormatterParameters(pydantic.BaseModel):
    """A Pydantic model for validating and applying defaults to the
    read parameters of `ProvenanceFormatter`.
    """

    # How (or whether) to import task classes referenced by the graph.
    import_mode: TaskImportMode = TaskImportMode.DO_NOT_IMPORT
    # IDs of the quanta to read; `None` means no explicit restriction.
    quanta: list[uuid.UUID] | None = None
    # IDs of the datasets to read; `None` means no explicit restriction.
    datasets: list[uuid.UUID] | None = None
    # Whether to read the special init quanta when reading the full graph.
    read_init_quanta: bool = True

    # A single "before" validator registered for both ID fields replaces the
    # two previously-duplicated per-field validators.
    @pydantic.field_validator("quanta", "datasets", mode="before")
    @classmethod
    def _coerce_ids_to_list(cls, v: Any) -> list[uuid.UUID] | None:
        # Accept any iterable of UUIDs (set, tuple, generator) by coercing
        # to a list, while preserving `None` as "not specified".
        return list(v) if v is not None else None

    @property
    def nodes(self) -> list[uuid.UUID]:
        """The combined list of quantum and dataset IDs.

        Raises
        ------
        ValueError
            Raised if neither ``quanta`` nor ``datasets`` was provided.
        """
        if self.quanta is None and self.datasets is None:
            raise ValueError("'datasets' and/or 'quanta' parameters are required for this component")
        # Treating `None` as empty here yields exactly the same results as
        # the original four-way branch.
        return (self.quanta or []) + (self.datasets or [])
83class ProvenanceFormatter(FormatterV2):
84 """Butler interface for reading `ProvenanceQuantumGraph` objects."""
86 default_extension: ClassVar[str] = ".qg"
87 can_read_from_uri: ClassVar[bool] = True
89 def read_from_uri(self, uri: ResourcePath, component: str | None = None, expected_size: int = -1) -> Any:
90 match self._dataset_ref.datasetType.storageClass_name:
91 case "TaskMetadata" | "PropertySet":
92 return self._read_metadata(uri)
93 case "ButlerLogRecords":
94 return self._read_log(uri)
95 case "Config":
96 return self._read_config(uri)
97 case "ProvenanceQuantumGraph":
98 pass
99 case unexpected:
100 raise ValueError(f"Unsupported storage class {unexpected!r} for ProvenanceFormatter.")
101 parameters = _ProvenanceFormatterParameters.model_validate(self.file_descriptor.parameters or {})
102 with ProvenanceQuantumGraphReader.open(uri, import_mode=parameters.import_mode) as reader:
103 match component:
104 case None:
105 if parameters.read_init_quanta:
106 reader.read_init_quanta()
107 reader.read_quanta(parameters.quanta)
108 reader.read_datasets(parameters.datasets)
109 return reader.graph
110 case "metadata":
111 return reader.fetch_metadata(parameters.nodes)
112 case "logs":
113 return reader.fetch_logs(parameters.nodes)
114 case "packages":
115 return reader.fetch_packages()
116 raise AssertionError(f"Unexpected component {component!r}.")
118 def _read_metadata(self, uri: ResourcePath) -> TaskMetadata:
119 with ProvenanceQuantumGraphReader.open(uri, import_mode=TaskImportMode.DO_NOT_IMPORT) as reader:
120 try:
121 attempts = reader.fetch_metadata([self._dataset_ref.id])[self._dataset_ref.id]
122 except LookupError:
123 raise FileNotFoundError(
124 f"No dataset with ID {self._dataset_ref.id} present in this graph."
125 ) from None
126 if not attempts:
127 raise FileNotFoundError(
128 f"No metadata dataset {self._dataset_ref} stored in this graph "
129 "(no attempts for this quantum)."
130 )
131 if attempts[-1] is None:
132 raise FileNotFoundError(
133 f"No metadata dataset {self._dataset_ref} stored in this graph "
134 "(most recent attempt failed and did not write metadata)."
135 )
136 return attempts[-1]
138 def _read_log(self, uri: ResourcePath) -> ButlerLogRecords:
139 with ProvenanceQuantumGraphReader.open(uri, import_mode=TaskImportMode.DO_NOT_IMPORT) as reader:
140 try:
141 attempts = reader.fetch_logs([self._dataset_ref.id])[self._dataset_ref.id]
142 except LookupError:
143 raise FileNotFoundError(
144 f"No dataset with ID {self._dataset_ref.id} present in this graph."
145 ) from None
146 if not attempts:
147 raise FileNotFoundError(
148 f"No log dataset {self._dataset_ref} stored in this graph (no attempts for this quantum)."
149 )
150 if attempts[-1] is None:
151 raise FileNotFoundError(
152 f"No log dataset {self._dataset_ref} stored in this graph "
153 "(most recent attempt failed and did not write logs)."
154 )
155 return attempts[-1]
157 def _read_packages(self, uri: ResourcePath) -> Packages:
158 with ProvenanceQuantumGraphReader.open(uri, import_mode=TaskImportMode.DO_NOT_IMPORT) as reader:
159 return reader.fetch_packages()
161 def _read_config(self, uri: ResourcePath) -> Config:
162 task_label = self._dataset_ref.datasetType.name.removesuffix("_config")
163 with ProvenanceQuantumGraphReader.open(
164 uri, import_mode=TaskImportMode.ASSUME_CONSISTENT_EDGES
165 ) as reader:
166 try:
167 return reader.pipeline_graph.tasks[task_label].config.copy()
168 except KeyError:
169 raise FileNotFoundError(
170 f"No task with label {task_label!r} found in the pipeline graph."
171 ) from None