Coverage for python/lsst/daf/butler/_dataset_provenance.py: 11%
184 statements
« prev ^ index » next coverage.py v7.13.5, created at 2026-04-14 23:37 +0000
# This file is part of daf_butler.
#
# Developed for the LSST Data Management System.
# This product includes software developed by the LSST Project
# (http://www.lsst.org).
# See the COPYRIGHT file at the top-level directory of this distribution
# for details of code ownership.
#
# This software is dual licensed under the GNU General Public License and also
# under a 3-clause BSD license. Recipients may choose which of these licenses
# to use; please see the files gpl-3.0.txt and/or bsd_license.txt,
# respectively. If you choose the GPL option then the following text applies
# (but note that there is still no warranty even if you opt for BSD instead):
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
28from __future__ import annotations
30__all__ = ("DatasetProvenance",)
32import re
33import uuid
34from typing import TYPE_CHECKING, Any, Self, TypeAlias
36import pydantic
38from ._dataset_ref import DatasetRef, SerializedDatasetRef
39from .dimensions import DataIdValue
41if TYPE_CHECKING:
42 from collections.abc import Mapping, MutableMapping
44 from ._butler import Butler
46# Types that can be stored in a provenance dictionary.
47_PROV_TYPES: TypeAlias = str | int | float | bool | DataIdValue | uuid.UUID
class DatasetProvenance(pydantic.BaseModel):
    """Provenance of a single `DatasetRef`."""

    inputs: list[SerializedDatasetRef] = pydantic.Field(default_factory=list)
    """The input datasets."""

    quantum_id: uuid.UUID | None = None
    """Identifier of the Quantum that was executed."""

    extras: dict[uuid.UUID, dict[str, _PROV_TYPES]] = pydantic.Field(default_factory=dict)
    """Extra provenance information associated with a particular dataset."""

    # Cache of known input IDs for O(1) duplicate detection; rebuilt by the
    # model validator after (de)serialization.
    _uuids: set[uuid.UUID] = pydantic.PrivateAttr(default_factory=set)

    @pydantic.model_validator(mode="after")
    def populate_cache(self) -> Self:
        # Repopulate the private ID cache whenever the model is validated
        # (e.g. after construction from JSON), since private attributes are
        # not part of the serialized form.
        for ref in self.inputs:
            self._uuids.add(ref.id)
        return self

    def add_input(self, ref: DatasetRef) -> None:
        """Add an input dataset to the provenance.

        Parameters
        ----------
        ref : `DatasetRef`
            A dataset to register as an input.
        """
        if ref.id in self._uuids:
            # Already registered.
            return
        self._uuids.add(ref.id)
        self.inputs.append(ref.to_simple())

    def add_extra_provenance(self, dataset_id: uuid.UUID, extra: Mapping[str, _PROV_TYPES]) -> None:
        """Attach extra provenance to a specific dataset.

        Parameters
        ----------
        dataset_id : `uuid.UUID`
            The ID of the dataset to receive this provenance.
        extra : `~collections.abc.Mapping` [ `str`, `typing.Any` ]
            The extra provenance information as a dictionary. The values
            must be simple Python scalars or scalars that can be serialized
            by Pydantic and convert to a simple string value.

        Notes
        -----
        The keys in the extra provenance can not include provenance keys of
        ``run``, ``id``, or ``datasettype`` (in upper or lower case).

        Raises
        ------
        ValueError
            Raised if the dataset ID is not a registered input, or if a
            reserved key is present in ``extra``.
        """
        if dataset_id not in self._uuids:
            raise ValueError(f"The given dataset ID {dataset_id} is not known to this provenance instance.")
        # Reserved keys are checked case-insensitively.
        extra_keys = {k.lower() for k in extra.keys()}
        if overlap := (extra_keys & {"run", "id", "datasettype"}):
            raise ValueError(f"Extra provenance includes a reserved provenance key: {overlap}")

        self.extras.setdefault(dataset_id, {}).update(extra)

    def to_flat_dict(
        self,
        ref: DatasetRef | None,
        /,
        *,
        prefix: str = "",
        sep: str = ".",
        simple_types: bool = False,
        use_upper: bool | None = None,
        max_inputs: int | None = None,
        store_minimalist_inputs: bool = False,
    ) -> dict[str, _PROV_TYPES]:
        """Return provenance as a flattened dictionary.

        Parameters
        ----------
        ref : `DatasetRef` or `None`
            If given, a dataset for which this provenance is relevant and
            should be included.
        prefix : `str`, optional
            A prefix to use for each key in the provenance dictionary.
        sep : `str`, optional
            Separator to use to represent hierarchy. Must be a single
            character. Can not be a number, letter, or underscore (to avoid
            confusion with provenance keys themselves).
        simple_types : `bool`, optional
            If `True` only simple Python types will be used in the returned
            dictionary, specifically UUIDs will be returned as `str`. If
            `False`, UUIDs will be returned as `uuid.UUID`. Complex types
            found in `DatasetProvenance.extras` will be cast to a `str`
            if `True`.
        use_upper : `bool` or `None`, optional
            If `None` the case of the keys matches the case of the first
            character of the prefix (defined by whether `str.isupper` returns
            true, else they will be lower case). If `False` the case will be
            lower case, and if `True` the case will be upper case.
        max_inputs : `int` or `None`, optional
            Maximum number of inputs to be recorded in provenance. `None`
            results in all inputs being recorded. If the number of inputs
            exceeds this value no input provenance will be recorded.
        store_minimalist_inputs : `bool`, optional
            If `True` only the ID of the input is stored along with explicit
            extras. If `False` the run and dataset type are also recorded.

        Returns
        -------
        prov : `dict`
            Dictionary representing the provenance. The keys are defined
            in the notes below.

        Notes
        -----
        Keys from the given dataset (all optional if no dataset is given):

        :id: UUID of the given dataset.
        :run: Run of the given dataset.
        :datasettype: Dataset type of the given dataset.
        :dataid x: An entry for each required dimension, "x", in the data ID.

        Each input dataset will have the ``id``, ``run``, and ``datasettype``
        keys as defined above (but no ``dataid`` key) with an ``input N``
        prefix where ``N`` starts counting at 0. It is possible to drop
        the ``datasettype`` and ``run`` to save space by using the
        ``store_minimalist_inputs`` flag.

        If there are too many inputs (see the ``max_inputs`` parameters)
        no inputs will be recorded. The number of inputs is always recorded
        to indicate that the inputs were dropped.

        The quantum ID, if present, will use key ``quantum``.

        Examples
        --------
        >>> provenance.to_flat_dict(
        ...     ref, prefix="lsst.butler", sep=".", simple_types=True
        ... )
        {
            "lsst.butler.id": "ae0fa83d-cc89-41dd-9680-f97ede49f01e",
            "lsst.butler.run": "test_run",
            "lsst.butler.datasettype": "data",
            "lsst.butler.dataid.detector": 10,
            "lsst.butler.dataid.instrument": "LSSTCam",
            "lsst.butler.quantum": "d93a735b-08f0-477d-bc95-2cc32d6d898b",
            "lsst.butler.n_inputs": 2,
            "lsst.butler.input.0.id": "3dfd7ba5-5e35-4565-9d87-4b33880ed06c",
            "lsst.butler.input.0.run": "other_run",
            "lsst.butler.input.0.datasettype": "astropy_parquet",
            "lsst.butler.input.1.id": "7a99f6e9-4035-3d68-842e-58ecce1dc935",
            "lsst.butler.input.1.run": "other_run",
            "lsst.butler.input.1.datasettype": "astropy_parquet",
        }

        Raises
        ------
        ValueError
            Raised if the separator is not a single character.
        """
        if len(sep) != 1:
            raise ValueError(f"Separator for provenance keys must be a single character. Got {sep!r}.")
        # \w already covers letters, digits, and underscore.
        if re.match(r"\w$", sep):
            raise ValueError(
                f"Separator for provenance keys can not be word character or underscore. Got {sep!r}."
            )

        def _make_key(*keys: str | int) -> str:
            """Make the key in the correct form with simpler API."""
            return self._make_provenance_key(prefix, sep, use_upper, *keys)

        prov: dict[str, _PROV_TYPES] = {}
        if ref is not None:
            prov[_make_key("id")] = ref.id if not simple_types else str(ref.id)
            prov[_make_key("run")] = ref.run
            prov[_make_key("datasettype")] = ref.datasetType.name
            for k, v in sorted(ref.dataId.required.items()):
                prov[_make_key("dataid", k)] = v

        if self.quantum_id is not None:
            prov[_make_key("quantum")] = self.quantum_id if not simple_types else str(self.quantum_id)

        # Record the number of inputs so that people can determine how many
        # there were even if they were dropped because they exceeded the
        # allowed maximum. Do not record the count if we have a null provenance
        # state with no ref and no inputs.
        if ref is not None or len(self.inputs) > 0:
            prov[_make_key("n_inputs")] = len(self.inputs)

        # Remove all inputs if the maximum is exceeded. Truncating to the
        # maximum (or auto switching to minimalist mode and increasing the
        # maximum by 3) is not preferred.
        inputs = self.inputs if max_inputs is None or len(self.inputs) <= max_inputs else []
        for i, in_ref in enumerate(inputs):
            prov[_make_key("input", i, "id")] = in_ref.id if not simple_types else str(in_ref.id)
            if not store_minimalist_inputs:
                if in_ref.run is not None:  # for mypy
                    prov[_make_key("input", i, "run")] = in_ref.run
                if in_ref.datasetType is not None:  # for mypy
                    prov[_make_key("input", i, "datasettype")] = in_ref.datasetType.name

            if in_ref.id in self.extras:
                for xk, xv in self.extras[in_ref.id].items():
                    if simple_types and not isinstance(xv, str | float | int | bool):
                        xv = str(xv)
                    prov[_make_key("input", i, xk)] = xv

        return prov

    @staticmethod
    def _make_provenance_key(prefix: str, sep: str, use_upper: bool | None, *keys: str | int) -> str:
        """Construct provenance key from prefix and separator.

        Parameters
        ----------
        prefix : `str`
            A prefix to use for each key in the provenance dictionary.
        sep : `str`
            Separator to use to represent hierarchy. Must be a single
            character.
        use_upper : `bool` or `None`
            If `True` use upper case for provenance keys, if `False` use lower
            case, if `None` match the case of the prefix.
        *keys : `tuple` of `str` | `int`
            Components of key to combine with prefix and separator.

        Returns
        -------
        key : `str`
            Key to use in dictionary. Case of result will match case of
            prefix (defaulting to lower case if the first character of
            prefix has no case).
        """
        if use_upper is None:
            use_upper = prefix[0].isupper() if prefix else False
        if prefix:
            prefix += sep
        k = sep.join(str(kk) for kk in keys)
        if use_upper:
            k = k.upper()
        return f"{prefix}{k}"

    @staticmethod
    def _find_prefix_and_sep(prov_dict: Mapping[str, Any]) -> tuple[str, str] | tuple[None, None]:
        """Given a mapping try to determine the prefix and separator for
        provenance keys.

        Parameters
        ----------
        prov_dict : `collections.abc.Mapping`
            Mapping to scan. Assumed to include keys populated by
            `to_flat_dict`.

        Returns
        -------
        prefix : `str`
            Prefix given to `to_flat_dict`. `None` if no provenance headers
            were found.
        sep : `str`
            Separator given to `to_flat_dict`. `None` if no provenance headers
            were found.

        Raises
        ------
        ValueError
            Raised if more than one value was found for either the prefix or
            separator.
        """
        # Best keys to look for are dataid and input 0.
        # dataid is only used in ref provenance and always has a separator.
        # input 0 is always present if there is any input provenance.

        def _update_matches(match: re.Match, prefixes: set[str], separators: set[str]) -> None:
            prefix, *seps = match.groups()
            if prefix:
                # Will have a separator at the end.
                prefix, sep = prefix[:-1], prefix[-1]
                separators.add(sep)
            prefixes.add(prefix)
            separators |= set(seps)

        separators: set[str] = set()
        prefixes: set[str] = set()

        # It is possible for there to be no inputs and just a reference
        # dataset. If that reference dataset has no DATAID then the simple
        # logic will not work. In that scenario look for the presence
        # of RUN, DATASETTYPE and ID to spot that provenance exists.
        # In this case it may not be possible to determine a sep value.
        backup = {}

        for k in prov_dict:
            if match := re.match("(.*)input(.)0(.)(?:id|datasettype|run)$", k, flags=re.IGNORECASE):
                _update_matches(match, prefixes, separators)
            elif match := re.match(
                # Data coordinates are a-z or underscore.
                "(.*)dataid(.)[a-z_]+$",
                k,
                flags=re.IGNORECASE,
            ):
                _update_matches(match, prefixes, separators)
            elif match := re.match(r"(.*)\b(id|datasettype|run)$", k, flags=re.IGNORECASE):
                prefix, key = match.groups()
                backup[key.lower()] = prefix

        if not prefixes:
            if "run" in backup and "datasettype" in backup and "id" in backup:
                # Looks like there is a provenance after all. All 3 must be
                # present.
                for prefix in backup.values():
                    if prefix:
                        prefix, sep = prefix[:-1], prefix[-1]
                    else:
                        sep = " "  # Will not be used.
                    prefixes.add(prefix)
                    separators.add(sep)

        if not prefixes:
            return None, None

        if len(separators) > 1:
            raise ValueError(
                f"Inconsistent values found for separators in provenance header. Got {separators}."
            )
        if len(prefixes) > 1:
            raise ValueError(f"Inconsistent values for prefix found in provenance headers. Got {prefixes}.")
        return prefixes.pop(), separators.pop()

    @classmethod
    def _find_provenance_keys_in_flat_dict(cls, prov_dict: Mapping[str, Any]) -> dict[str, str]:
        """Find the provenance keys in a dictionary.

        Parameters
        ----------
        prov_dict : `collections.abc.Mapping`
            Dictionary to be analyzed. Assumed to have been populated by
            `to_flat_dict`.

        Returns
        -------
        prov_keys : `dict` [ `str`, `str` ]
            Provenance key as found in the given header mapping to the
            standardized provenance key (with prefix removed and "."
            separator).
        """
        prefix, sep = cls._find_prefix_and_sep(prov_dict)

        if prefix is None:
            return {}
        # for mypy which can not work out that the above method returns
        # str, str or None, None.
        if sep is None:
            return {}
        if prefix:
            # Prefix will always include the separator if it is defined.
            prefix += sep

        core_provenance = tuple(
            f"{prefix}{k}".lower() for k in ("run", "id", "datasettype", "quantum", "n_inputs")
        )

        # Need to escape the prefix and separator for regex usage.
        esc_sep = re.escape(sep)
        esc_prefix = re.escape(prefix)
        prov_keys: dict[str, str] = {}
        for k in list(prov_dict):
            # the input provenance can include extra keys that we cannot
            # know so have to match solely on INPUT N.
            found_key = False
            if re.match(rf"{esc_prefix}input{esc_sep}(\d+){esc_sep}(.*)$", k, flags=re.IGNORECASE):
                found_key = True
            elif k.lower() in core_provenance:
                found_key = True
            elif re.match(f"{esc_prefix}dataid{esc_sep}[a-z_]+$", k, flags=re.IGNORECASE):
                found_key = True

            if found_key:
                standard = k.removeprefix(prefix)
                standard = standard.replace(sep, ".")
                prov_keys[k] = standard.lower()

        return prov_keys

    @classmethod
    def strip_provenance_from_flat_dict(cls, prov_dict: MutableMapping[str, Any]) -> None:
        """Remove provenance keys from a mapping that had been populated
        by `to_flat_dict`.

        Parameters
        ----------
        prov_dict : `collections.abc.MutableMapping`
            Dictionary to modify.
        """
        for prov_key in cls._find_provenance_keys_in_flat_dict(prov_dict):
            del prov_dict[prov_key]

    @classmethod
    def from_flat_dict(cls, prov_dict: Mapping[str, Any], butler: Butler) -> tuple[Self, DatasetRef | None]:
        """Create a provenance object from a provenance dictionary.

        Parameters
        ----------
        prov_dict : `collections.abc.Mapping`
            Dictionary populated by `to_flat_dict`.
        butler : `lsst.daf.butler.Butler`
            Butler to query to find references datasets.

        Returns
        -------
        prov : `DatasetProvenance`
            Provenance extracted from this object.
        ref : `DatasetRef` or `None`
            Dataset associated with this provenance. Can be `None` if no
            provenance found.

        Raises
        ------
        ValueError
            Raised if no provenance values are found in the dictionary.
        RuntimeError
            Raised if a referenced dataset is not known to the given butler.
        """
        prov_keys = cls._find_provenance_keys_in_flat_dict(prov_dict)
        if not prov_keys:
            raise ValueError("No provenance information found in header.")

        def _coerce_id(id_: str | uuid.UUID) -> uuid.UUID:
            if isinstance(id_, uuid.UUID):
                return id_
            return uuid.UUID(hex=id_)

        quantum_id = None
        ref_id = None
        input_ids: dict[int, uuid.UUID] = {}
        extras: dict[int, dict[str, Any]] = {}

        for k, standard in prov_keys.items():
            if standard == "id":
                ref_id = _coerce_id(prov_dict[k])
            elif standard == "quantum":
                quantum_id = _coerce_id(prov_dict[k])
            elif match := re.match(r"input.(\d+).([a-z_]+)$", standard):
                input_num = int(match.group(1))
                subkey = match.group(2)
                if subkey == "id":
                    input_ids[input_num] = _coerce_id(prov_dict[k])
                elif subkey not in ("datasettype", "run"):
                    # Extra information. Can not know the original case
                    # but can match to the original dictionary.
                    if k[0].isupper():
                        subkey = subkey.upper()
                    extras.setdefault(input_num, {})[subkey] = prov_dict[k]

        ref = None
        if ref_id is not None:
            ref = butler.get_dataset(ref_id)
            if ref is None:
                raise ValueError(
                    f"Dataset associated with this provenance ({ref_id}) is not known to this butler."
                )

        provenance = cls(quantum_id=quantum_id)

        input_refs = {ref.id: ref for ref in butler.get_many_datasets(input_ids.values())}
        for i in sorted(input_ids):
            input_ref = input_refs.get(input_ids[i])
            if input_ref is None:
                raise ValueError(f"Input dataset ({input_ids[i]}) is not known to this butler.")
            provenance.add_input(input_ref)
            if i in extras:
                provenance.add_extra_provenance(input_ref.id, extras[i])

        return provenance, ref